backend/btrixcloud/db.py (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@
) = object


CURR_DB_VERSION = "0055"
CURR_DB_VERSION = "0057"

MIN_DB_VERSION = 7.0

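The bump of CURR_DB_VERSION from "0055" to "0057" is what signals that the two new migrations below (0056 and 0057) still need to be applied on upgrade. As a purely illustrative sketch (not the actual btrixcloud runner, and the registry name is hypothetical), a version-gated loop only needs to apply every registered migration whose zero-padded version string sorts above the stored version and at or below CURR_DB_VERSION:

```python
# Illustrative only -- assumes a hypothetical registry mapping version strings
# to migration coroutines; btrixcloud's real migration runner may differ.
CURR_DB_VERSION = "0057"


async def run_pending_migrations(stored_version: str, registry: dict) -> str:
    """Apply every registered migration newer than stored_version, lowest first."""
    for version in sorted(registry):
        # Zero-padded strings compare correctly: "0055" < "0056" < "0057".
        if stored_version < version <= CURR_DB_VERSION:
            await registry[version]()  # e.g. Migration(mdb).migrate_up()
            stored_version = version
    return stored_version
```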
backend/btrixcloud/migrations/migration_0056_rerun_bg_jobs.py (60 changes: 60 additions & 0 deletions)
@@ -0,0 +1,60 @@
"""
Migration 0056 - Fix failed background jobs with success and finished unset
"""

from datetime import timedelta
import os

from btrixcloud.migrations import BaseMigration
from btrixcloud.utils import dt_now


MIGRATION_VERSION = "0056"


class Migration(BaseMigration):
"""Migration class."""

# pylint: disable=unused-argument
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

async def migrate_up(self):
"""Perform migration up.

        Identify background jobs that failed but never had finished or success
        updated in the database, and correct them so that they can be
        restarted via the retry endpoints.

        We don't want to modify jobs that are still in progress or subject to
        the replica deletion delay, so only target jobs that are at least
        (replica deletion delay + 1) days or 7 days old, whichever is greater.
"""
jobs_mdb = self.mdb["jobs"]

replica_deletion_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
days_delta = max(replica_deletion_days + 1, 7)
started_before = dt_now() - timedelta(days=days_delta)

match_query = {
"finished": None,
"success": None,
"started": {"$lte": started_before},
}

try:
await jobs_mdb.update_many(
match_query,
{
"$set": {
"success": False,
"finished": started_before,
}
},
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error updating failed background jobs: {err}",
flush=True,
)
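Migration 0056 marks as failed any job that recorded neither finished nor success and that started at least max(replica deletion delay + 1, 7) days ago. A minimal preview sketch of the same cutoff and match query, assuming a Motor client, a database named "btrixcloud" (hypothetical here), and the same REPLICA_DELETION_DELAY_DAYS environment variable, can show how many jobs would be affected before the migration runs:

```python
# Hypothetical preview script -- not part of the migration itself. Assumes a
# Motor client, a database named "btrixcloud", and the same
# REPLICA_DELETION_DELAY_DAYS environment variable used by migration 0056.
import asyncio
import os
from datetime import datetime, timedelta, timezone

from motor.motor_asyncio import AsyncIOMotorClient


async def preview_stuck_jobs(mongo_url: str = "mongodb://localhost:27017") -> None:
    jobs = AsyncIOMotorClient(mongo_url)["btrixcloud"]["jobs"]

    replica_deletion_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
    days_delta = max(replica_deletion_days + 1, 7)
    started_before = datetime.now(timezone.utc) - timedelta(days=days_delta)

    # Same match as the migration: jobs that never recorded an outcome and are
    # old enough that they cannot still be in progress.
    match_query = {
        "finished": None,
        "success": None,
        "started": {"$lte": started_before},
    }
    count = await jobs.count_documents(match_query)
    print(f"{count} background jobs would be marked failed (cutoff: {started_before})")


if __name__ == "__main__":
    asyncio.run(preview_stuck_jobs())
```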
backend/btrixcloud/migrations/migration_0057_replicate_files.py (117 changes: 117 additions & 0 deletions)
@@ -0,0 +1,117 @@
"""
Migration 0057 - Replicate any unreplicated crawl and profile files
"""

from btrixcloud.migrations import BaseMigration
from btrixcloud.models import BaseCrawl, Profile, BgJobType


MIGRATION_VERSION = "0057"


class Migration(BaseMigration):
"""Migration class."""

# pylint: disable=unused-argument
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

self.background_job_ops = kwargs.get("background_job_ops")

# pylint: disable=too-many-locals
async def migrate_up(self):
"""Perform migration up.

Identify files from archived items and profiles that should have been
        replicated but weren't, and start new background jobs to replicate
        them if there isn't already an in-progress job doing the same.
"""
orgs_mdb = self.mdb["organizations"]
jobs_mdb = self.mdb["jobs"]
crawls_mdb = self.mdb["crawls"]
profiles_mdb = self.mdb["profiles"]

if self.background_job_ops is None:
print(
"Unable to replicate unreplicated files, missing required ops",
flush=True,
)
return

# Future-proof in anticipation of custom storage - do not attempt to
# replicate files for orgs that don't have a replica location configured
orgs_with_replicas = []
async for org in orgs_mdb.find(
{"storageReplicas.0": {"$exists": True}}, projection=["_id"]
):
orgs_with_replicas.append(org["_id"])

# Archived items

crawls_match_query = {
"oid": {"$in": orgs_with_replicas},
"files": {"$elemMatch": {"replicas": {"$in": [None, []]}}},
}
async for crawl_raw in crawls_mdb.find(crawls_match_query):
crawl = BaseCrawl.from_dict(crawl_raw)
for file_ in crawl.files:
if not file_.replicas:
# Check that there isn't an in-progress job for this file
                    if await jobs_mdb.find_one(
{
"type": BgJobType.CREATE_REPLICA.value,
"object_id": crawl.id,
"object_type": crawl.type,
"file_path": file_.filename,
"started": {"$ne": None},
"finished": None,
}
):
continue

try:
await self.background_job_ops.create_replica_jobs(
crawl.oid, file_, crawl.id, crawl.type
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error replicating unreplicated file for item {crawl.id}: {err}",
flush=True,
)

# Profiles

profiles_match_query = {
"oid": {"$in": orgs_with_replicas},
"resource.replicas": {"$in": [None, []]},
}
async for profile_raw in profiles_mdb.find(profiles_match_query):
profile = Profile.from_dict(profile_raw)

if not profile.resource:
continue

# Check there isn't already an in-progress job for this profile
            if await jobs_mdb.find_one(
{
"type": BgJobType.CREATE_REPLICA.value,
"object_id": profile.id,
"object_type": "profile",
"file_path": profile.resource.filename,
"started": {"$ne": None},
"finished": None,
}
):
continue

try:
await self.background_job_ops.create_replica_jobs(
profile.oid, profile.resource, profile.id, "profile"
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error replicating unreplicated file for profile {profile.id}: {err}",
flush=True,
)
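Both loops in migration 0057 skip a file when a create-replica job for it has already started but not finished. A hedged sketch of that check as a standalone helper, assuming job documents carry the same type, object_id, object_type, and file_path fields queried above (the helper name itself is hypothetical):

```python
# Hypothetical helper mirroring the in-progress check above; the field names
# match the job documents queried by the migration, but this function is
# illustrative and not part of btrixcloud.
from typing import Any

from motor.motor_asyncio import AsyncIOMotorCollection


async def replica_job_in_progress(
    jobs_mdb: AsyncIOMotorCollection,
    job_type: str,
    object_id: Any,
    object_type: str,
    file_path: str,
) -> bool:
    """Return True if a create-replica job for this file has started but not finished."""
    existing = await jobs_mdb.find_one(
        {
            "type": job_type,
            "object_id": object_id,
            "object_type": object_type,
            "file_path": file_path,
            "started": {"$ne": None},
            "finished": None,
        }
    )
    return existing is not None
```

Using find_one keeps the check cheap: a single matching document is enough to skip the file, and None means it is safe to enqueue a new replica job.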