From 76cff8b8fb329458883abf7d082a409fffb953f3 Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Mon, 22 Dec 2025 18:39:34 -0500 Subject: [PATCH 1/5] Add migration to fix failed bg jobs with weird state Some background jobs previously failed and did not have success or finished fields set due to bugs. This migration targets those jobs to update these fields so that the existing API endpoints for retrying background jobs can be used. --- backend/btrixcloud/db.py | 2 +- .../migration_0056_rerun_bg_jobs.py | 60 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py index 746a279cbb..5f5e0f7312 100644 --- a/backend/btrixcloud/db.py +++ b/backend/btrixcloud/db.py @@ -44,7 +44,7 @@ ) = object -CURR_DB_VERSION = "0055" +CURR_DB_VERSION = "0056" MIN_DB_VERSION = 7.0 diff --git a/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py b/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py new file mode 100644 index 0000000000..50384ec2dd --- /dev/null +++ b/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py @@ -0,0 +1,60 @@ +""" +Migration 0056 - Fix failed background jobs with success and finished unset +""" + +from datetime import timedelta +import os + +from btrixcloud.migrations import BaseMigration +from btrixcloud.utils import dt_now + + +MIGRATION_VERSION = "0056" + + +class Migration(BaseMigration): + """Migration class.""" + + # pylint: disable=unused-argument + def __init__(self, mdb, **kwargs): + super().__init__(mdb, migration_version=MIGRATION_VERSION) + + async def migrate_up(self): + """Perform migration up. + + Identify background jobs that failed but never had finished or success + updated in database and correct them in the database so that they can + be restarted via the retry endpoints. + + We don't want to modify jobs that are still in process or subject to + the replica deletion delay, so target jobs that are either replica + delay deltion + 1 or 7 days old, whichever is greater. + """ + jobs_mdb = self.mdb["jobs"] + + replica_deletion_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0)) + days_delta = max(replica_deletion_days + 1, 7) + started_before = dt_now() - timedelta(days=days_delta) + + match_query = { + "finished": None, + "success": None, + "started": {"$lte": started_before}, + } + + try: + await jobs_mdb.update_many( + match_query, + { + "$set": { + "success": False, + "finished": started_before, + } + }, + ) + # pylint: disable=broad-exception-caught + except Exception as err: + print( + f"Error updating failed background jobs: {err}", + flush=True, + ) From e397365b381d5164f9f8c8b2b11c8e9ae48aad74 Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Thu, 8 Jan 2026 13:22:06 -0500 Subject: [PATCH 2/5] Fix typo --- backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py b/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py index 50384ec2dd..0be23be4ef 100644 --- a/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py +++ b/backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py @@ -27,8 +27,8 @@ async def migrate_up(self): be restarted via the retry endpoints. 
         We don't want to modify jobs that are still in process or subject to
-        the replica deletion delay, so target jobs that are either replica
-        delay deltion + 1 or 7 days old, whichever is greater.
+        the replica deletion delay, so target jobs that are either (replica
+        deletion delay + 1) or 7 days old, whichever is greater.
         """
         jobs_mdb = self.mdb["jobs"]
 

From 31481a8bc128e613fe7003c9d3a3f725af9b45c6 Mon Sep 17 00:00:00 2001
From: Tessa Walsh
Date: Tue, 13 Jan 2026 16:48:03 -0500
Subject: [PATCH 3/5] Add migration to look for unreplicated files and
 replicate them

This is preferable to simply retrying older failed replication background
jobs, as it's possible that the objects they correlate to have been deleted
or changed since and so those old background jobs would no longer be
applicable.
---
 backend/btrixcloud/db.py | 2 +-
 .../migration_0057_replicate_files.py | 121 ++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)
 create mode 100644 backend/btrixcloud/migrations/migration_0057_replicate_files.py

diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py
index 5f5e0f7312..ce18e12b5e 100644
--- a/backend/btrixcloud/db.py
+++ b/backend/btrixcloud/db.py
@@ -44,7 +44,7 @@
 ) = object
 
 
-CURR_DB_VERSION = "0056"
+CURR_DB_VERSION = "0057"
 
 
 MIN_DB_VERSION = 7.0

diff --git a/backend/btrixcloud/migrations/migration_0057_replicate_files.py b/backend/btrixcloud/migrations/migration_0057_replicate_files.py
new file mode 100644
index 0000000000..2691a8c5b3
--- /dev/null
+++ b/backend/btrixcloud/migrations/migration_0057_replicate_files.py
@@ -0,0 +1,121 @@
+"""
+Migration 0057 - Replicate any unreplicated crawl and profile files
+"""
+
+from btrixcloud.migrations import BaseMigration
+from btrixcloud.models import BaseCrawl, Profile, BgJobType
+
+
+MIGRATION_VERSION = "0057"
+
+
+class Migration(BaseMigration):
+    """Migration class."""
+
+    # pylint: disable=unused-argument
+    def __init__(self, mdb, **kwargs):
+        super().__init__(mdb, migration_version=MIGRATION_VERSION)
+
+        self.background_job_ops = kwargs.get("background_job_ops")
+
+    # pylint: disable=too-many-locals
+    async def migrate_up(self):
+        """Perform migration up.
+
+        Identify files from archived items and profiles that should have been
+        replicated but weren't, and start new background jobs to re-replicate
+        the files if there isn't already an in-progress job to do the same.
+ """ + orgs_mdb = self.mdb["organizations"] + jobs_mdb = self.mdb["jobs"] + crawls_mdb = self.mdb["crawls"] + profiles_mdb = self.mdb["profiles"] + + if self.background_job_ops is None: + print( + "Unable to replicate unreplicated files, missing required ops", + flush=True, + ) + return + + # Future-proof in anticipation of custom storage - do not attempt to + # replicate files for orgs that don't have a replica location configured + orgs_with_replicas = [] + async for org in orgs_mdb.find( + {"storageReplicas.0": {"$exists": True}}, projection=["_id"] + ): + orgs_with_replicas.append(org["_id"]) + + # Archived items + + crawls_match_query = { + "oid": {"$in": orgs_with_replicas}, + "files": {"$elemMatch": {"replicas": {"$in": [None, []]}}}, + } + async for crawl_raw in crawls_mdb.find(crawls_match_query): + crawl = BaseCrawl.from_dict(crawl_raw) + for file_ in crawl.files: + if not file_.replicas: + # Check that there isn't an in-progress job for this file + matching_job = await jobs_mdb.find( + { + "type": BgJobType.CREATE_REPLICA.value, + "object_id": crawl.id, + "object_type": crawl.type, + "file_path": file_.filename, + "started": {"$ne": None}, + "finished": None, + "success": None, + } + ) + if matching_job: + continue + + try: + await self.background_job_ops.create_replica_jobs( + crawl.oid, file_, crawl.id, crawl.type + ) + # pylint: disable=broad-exception-caught + except Exception as err: + print( + f"Error replicating unreplicated file for item {crawl.id}: {err}", + flush=True, + ) + + # Profiles + + profiles_match_query = { + "oid": {"$in": orgs_with_replicas}, + "resource.replicas": {"$in": [None, []]}, + } + async for profile_raw in profiles_mdb.find(profiles_match_query): + profile = Profile.from_dict(profile_raw) + + if not profile.resource: + continue + + # Check there isn't already an in-progress job for this profile + matching_job = await jobs_mdb.find( + { + "type": BgJobType.CREATE_REPLICA.value, + "object_id": profile.id, + "object_type": "profile", + "file_path": profile.resource.filename, + "started": {"$ne": None}, + "finished": None, + "success": None, + } + ) + if matching_job: + continue + + try: + await self.background_job_ops.create_replica_jobs( + profile.oid, profile.resource, profile.id, "profile" + ) + # pylint: disable=broad-exception-caught + except Exception as err: + print( + f"Error replicating unreplicated file for profile {profile.id}: {err}", + flush=True, + ) From f01095bf2e0bfe76cc418730cdf20c43d5227c2b Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Wed, 14 Jan 2026 17:59:24 -0500 Subject: [PATCH 4/5] Remove unnecessary success filter --- backend/btrixcloud/migrations/migration_0057_replicate_files.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/btrixcloud/migrations/migration_0057_replicate_files.py b/backend/btrixcloud/migrations/migration_0057_replicate_files.py index 2691a8c5b3..45bd68d8a2 100644 --- a/backend/btrixcloud/migrations/migration_0057_replicate_files.py +++ b/backend/btrixcloud/migrations/migration_0057_replicate_files.py @@ -65,7 +65,6 @@ async def migrate_up(self): "file_path": file_.filename, "started": {"$ne": None}, "finished": None, - "success": None, } ) if matching_job: @@ -103,7 +102,6 @@ async def migrate_up(self): "file_path": profile.resource.filename, "started": {"$ne": None}, "finished": None, - "success": None, } ) if matching_job: From 0bf8c120182eaa26ded2049f3b7b8b2383677f57 Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Thu, 15 Jan 2026 11:36:55 -0500 Subject: [PATCH 5/5] Remove unnecessary 
variable and use find_one

---
 .../migrations/migration_0057_replicate_files.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/backend/btrixcloud/migrations/migration_0057_replicate_files.py b/backend/btrixcloud/migrations/migration_0057_replicate_files.py
index 45bd68d8a2..16c03972b5 100644
--- a/backend/btrixcloud/migrations/migration_0057_replicate_files.py
+++ b/backend/btrixcloud/migrations/migration_0057_replicate_files.py
@@ -57,7 +57,7 @@ async def migrate_up(self):
             for file_ in crawl.files:
                 if not file_.replicas:
                     # Check that there isn't an in-progress job for this file
-                    matching_job = await jobs_mdb.find(
+                    if await jobs_mdb.find_one(
                         {
                             "type": BgJobType.CREATE_REPLICA.value,
                             "object_id": crawl.id,
@@ -66,8 +66,7 @@ async def migrate_up(self):
                             "started": {"$ne": None},
                             "finished": None,
                         }
-                    )
-                    if matching_job:
+                    ):
                         continue
 
                     try:
@@ -94,7 +93,7 @@ async def migrate_up(self):
                 continue
 
             # Check there isn't already an in-progress job for this profile
-            matching_job = await jobs_mdb.find(
+            if await jobs_mdb.find_one(
                 {
                     "type": BgJobType.CREATE_REPLICA.value,
                     "object_id": profile.id,
@@ -103,8 +102,7 @@ async def migrate_up(self):
                     "started": {"$ne": None},
                     "finished": None,
                 }
-            )
-            if matching_job:
+            ):
                 continue
 
             try:
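
A note on the duplicate-job check that migration 0057 relies on: find_one() is what makes the "is there already an in-progress job?" test work, because it is awaitable and returns either the matching document or None, whereas find() returns a cursor. Below is a minimal standalone sketch of that check against a jobs collection. The connection URL, database name, the "create-replica" type string, and the example arguments are illustrative assumptions rather than values taken from these patches; in the migration itself the collection handle comes from self.mdb and the type value from BgJobType.CREATE_REPLICA.value.

"""Minimal sketch: skip a file if a replica job for it is already in progress.

Assumptions (not taken from the patches above): a local MongoDB at
mongodb://localhost:27017, a database named "btrix", and "create-replica"
as the stored value of the job type field.
"""

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def has_in_progress_replica_job(jobs, object_id, object_type, file_path):
    """Return True if a replica job for this file has started but not finished."""
    # find_one() is awaitable and returns the matching document or None,
    # so its result can be used directly in a truth test.
    job = await jobs.find_one(
        {
            "type": "create-replica",  # assumed value of BgJobType.CREATE_REPLICA
            "object_id": object_id,
            "object_type": object_type,
            "file_path": file_path,
            "started": {"$ne": None},
            "finished": None,
        }
    )
    return job is not None


async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # assumed URL
    jobs = client["btrix"]["jobs"]  # assumed db name; "jobs" matches the migrations

    # Hypothetical arguments for illustration only.
    if await has_in_progress_replica_job(jobs, "crawl-id", "crawl", "example.wacz"):
        print("replica job already in progress, skipping")
    else:
        print("no in-progress job found, would queue replication here")


if __name__ == "__main__":
    asyncio.run(main())

Checking for an in-progress job before queuing a new one keeps the migration safe to re-run: a second pass will not schedule duplicate replication jobs for files that already have one in flight.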