From 9a1542b71fd632cdcf26453845a53c14a050c850 Mon Sep 17 00:00:00 2001 From: "james.delbarco" Date: Thu, 29 Jan 2026 15:07:32 -0800 Subject: [PATCH] feat: enhance migration resilience with improved error handling and filtering Add comprehensive resiliency improvements to handle edge cases during migration: - Add --created-before flag for inclusive end-date filtering - Extend rate limit (429) retry attempts to 30 with enhanced jitter (10-50%) to prevent thundering herd and allow recovery from sustained rate limiting - Make 413 (Payload Too Large) errors retryable with automatic batch bisection and optional oversized item skipping - Fix BTQL created_before filter from exclusive (<) to inclusive (<=) - Remove tags from non-root spans per API constraints - Add skipped_oversize tracking to migration state These changes significantly improve migration success rates for large datasets and high-traffic scenarios. --- README.md | 34 +- braintrust_migrate/cli.py | 5 +- braintrust_migrate/client.py | 44 ++- braintrust_migrate/config.py | 50 ++- braintrust_migrate/insert_bisect.py | 15 +- braintrust_migrate/resources/datasets.py | 4 + braintrust_migrate/resources/experiments.py | 13 + braintrust_migrate/resources/logs.py | 48 ++- braintrust_migrate/streaming_utils.py | 42 ++- tests/unit/test_created_after_canonicalize.py | 38 +- .../unit/test_created_before_canonicalize.py | 36 ++ tests/unit/test_logs_btql_sorted_fetch.py | 355 ++++++++++++++++++ 12 files changed, 576 insertions(+), 108 deletions(-) create mode 100644 tests/unit/test_created_before_canonicalize.py diff --git a/README.md b/README.md index 4c29701..3c6b9e3 100644 --- a/README.md +++ b/README.md @@ -148,11 +148,18 @@ MIGRATION_INSERT_MAX_REQUEST_BYTES=6291456 # 6MB max request size (default) MIGRATION_INSERT_REQUEST_HEADROOM_RATIO=0.75 # Use 75% of max → ~4.5MB effective limit # Optional time-based filtering (date range) -MIGRATION_CREATED_AFTER= # Inclusive start date (e.g. 2026-01-01) -MIGRATION_CREATED_BEFORE= # Exclusive end date (e.g. 2026-02-01) - # Together: migrates data where created >= after AND created < before - # - Logs: filters individual events by created date - # - Experiments: filters which experiments to migrate +MIGRATION_CREATED_AFTER= # Start date filter (e.g. 2026-01-15) + # - Logs: only migrate events created >= this date + # - Experiments: only migrate experiments created >= this date +MIGRATION_CREATED_BEFORE= # End date filter (e.g. 
2026-01-31) + # - Inclusive to end-of-day (23:59:59) + # - Logs: only migrate events created <= this date + # - Experiments: only migrate experiments created <= this date + +# Example: Define a date range by combining both filters +# MIGRATION_CREATED_AFTER=2026-01-01 +# MIGRATION_CREATED_BEFORE=2026-01-31 +# This migrates only logs/experiments created between Jan 1 and Jan 31, 2026 (inclusive) ``` ### Getting API Keys @@ -230,27 +237,20 @@ braintrust-migrate migrate --dry-run **Time-based Filtering:** ```bash -# Only migrate data created on or after a certain date (inclusive) +# Only migrate logs/experiments created on or after a certain date braintrust-migrate migrate --created-after 2026-01-15 -# Only migrate data created before a certain date (exclusive) -braintrust-migrate migrate --created-before 2026-02-01 +# Only migrate logs/experiments created on or before a certain date (inclusive to end-of-day) +braintrust-migrate migrate --created-before 2026-01-31 -# Date range: migrate all of January 2026 -# Uses half-open interval: [created-after, created-before) -braintrust-migrate migrate --created-after 2026-01-01 --created-before 2026-02-01 +# Combine both to define a date range +braintrust-migrate migrate --created-after 2026-01-01 --created-before 2026-01-31 # Applies to: # - Logs: filters individual events by created date # - Experiments: filters which experiments to migrate (all their events are included) ``` -**Date Filter Semantics:** -- `--created-after`: **Inclusive** — `created >= value` -- `--created-before`: **Exclusive** — `created < value` - -This half-open interval `[after, before)` makes it easy to specify clean date ranges without overlap or gaps. - ### CLI Reference ```bash diff --git a/braintrust_migrate/cli.py b/braintrust_migrate/cli.py index 4fcc7fc..2dcd75f 100644 --- a/braintrust_migrate/cli.py +++ b/braintrust_migrate/cli.py @@ -177,8 +177,9 @@ def migrate( typer.Option( "--created-before", help=( - "Only migrate data created before this date (exclusive). Format: YYYY-MM-DD (e.g. 2026-02-01). " - "Use with --created-after for date ranges (e.g., --created-after 2026-01-01 --created-before 2026-02-01)." + "Only migrate data created on or before this date. Format: YYYY-MM-DD (e.g. 2026-01-15). " + "Date-only values are treated as end-of-day (23:59:59.999999 UTC). " + "Can be combined with --created-after to define a date range." 
), envvar="MIGRATION_CREATED_BEFORE", ), diff --git a/braintrust_migrate/client.py b/braintrust_migrate/client.py index 4cfab2c..bc77313 100644 --- a/braintrust_migrate/client.py +++ b/braintrust_migrate/client.py @@ -347,6 +347,8 @@ async def with_retry(self, operation_name: str, coro_func): attempts = int(self.migration_config.retry_attempts) + 1 base_delay = float(self.migration_config.retry_delay) max_delay = 60.0 + # For 429 rate limits, retry many more times (up to ~30 min of waiting) + rate_limit_max_attempts = 30 HTTP_STATUS_TOO_MANY_REQUESTS = 429 @@ -384,7 +386,8 @@ def _classify_exception( retry_after = _parse_retry_after_seconds(exc.response) if status == HTTP_STATUS_TOO_MANY_REQUESTS: return True, status, retry_after - if status in {408, 409, 425, 500, 502, 503, 504}: + # 413 is retryable - bisect logic in insert_bisect.py will handle splitting batches + if status in {408, 409, 413, 425, 500, 502, 503, 504}: return True, status, retry_after return False, status, retry_after if isinstance(exc, httpx.RequestError): @@ -416,7 +419,8 @@ def _exc_context(exc: Exception) -> dict[str, Any]: return ctx last_exc: Exception | None = None - for attempt in range(1, attempts + 1): + attempt = 1 + while True: try: self._logger.debug( "Executing operation", @@ -433,12 +437,18 @@ def _exc_context(exc: Exception) -> dict[str, Any]: retryable, status, retry_after = _classify_exception(e) exc_ctx = _exc_context(e) - if not retryable or attempt >= attempts: + # For 429 rate limits, be much more persistent + is_rate_limit = status == HTTP_STATUS_TOO_MANY_REQUESTS + effective_max_attempts = ( + rate_limit_max_attempts if is_rate_limit else attempts + ) + + if not retryable or attempt >= effective_max_attempts: self._logger.error( "Operation failed", operation=operation_name, attempt=attempt, - max_attempts=attempts, + max_attempts=effective_max_attempts, status_code=status, error=str(e), **exc_ctx, @@ -446,26 +456,31 @@ def _exc_context(exc: Exception) -> dict[str, Any]: raise exp_delay = min(max_delay, base_delay * (2 ** (attempt - 1))) - # small jitter to avoid thundering herd - jitter = random.uniform(0, min(1.0, exp_delay * 0.1)) - delay = min(max_delay, exp_delay + jitter) - if retry_after is not None: - delay = min(max_delay, max(delay, retry_after)) - # Concise message for rate limits (expected behavior), verbose for other errors - if status == HTTP_STATUS_TOO_MANY_REQUESTS: + if is_rate_limit: + # For rate limits: use full retry_after, add significant jitter + # to spread out requests and avoid thundering herd + base_wait = retry_after if retry_after is not None else 60.0 + # Add 10-50% jitter to spread out concurrent requests + jitter = random.uniform(base_wait * 0.1, base_wait * 0.5) + delay = min(base_wait + jitter, max_delay) self._logger.warning( "Rate limited, retrying", operation=operation_name, retry_in_seconds=round(delay, 1), - attempt=f"{attempt}/{attempts}", + attempt=f"{attempt}/{effective_max_attempts}", ) else: + # For other errors: use exponential backoff with small jitter + jitter = random.uniform(0, min(1.0, exp_delay * 0.1)) + delay = min(max_delay, exp_delay + jitter) + if retry_after is not None: + delay = min(max_delay, max(delay, retry_after)) self._logger.warning( "Operation failed, backing off and retrying", operation=operation_name, attempt=attempt, - max_attempts=attempts, + max_attempts=effective_max_attempts, status_code=status, retry_after_seconds=retry_after, sleep_seconds=delay, @@ -473,8 +488,9 @@ def _exc_context(exc: Exception) -> dict[str, Any]: 
**exc_ctx, ) await asyncio.sleep(delay) + attempt += 1 - # Should be unreachable + # Should be unreachable - loop exits via return or raise if last_exc is not None: raise last_exc raise BraintrustAPIError(f"{operation_name} failed with unknown error") diff --git a/braintrust_migrate/config.py b/braintrust_migrate/config.py index c46d549..4e7faa3 100644 --- a/braintrust_migrate/config.py +++ b/braintrust_migrate/config.py @@ -15,8 +15,8 @@ _DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") -def _canonicalize_datetime(value: str, field_name: str) -> str: - """Normalize a user-supplied datetime string for BTQL filters. +def canonicalize_created_after(value: str) -> str: + """Normalize a user-supplied datetime string for BTQL `created >= ...` filters. Accepts: - YYYY-MM-DD (treated as midnight UTC) @@ -26,7 +26,7 @@ def _canonicalize_datetime(value: str, field_name: str) -> str: """ v = (value or "").strip() if not v: - raise ValueError(f"{field_name} cannot be empty") + raise ValueError("created_after cannot be empty") if _DATE_ONLY_RE.match(v): dt = datetime.fromisoformat(v).replace(tzinfo=UTC) @@ -37,7 +37,7 @@ def _canonicalize_datetime(value: str, field_name: str) -> str: dt = datetime.fromisoformat(v_for_parse) except Exception as e: raise ValueError( - f"Invalid {field_name} date (expected YYYY-MM-DD like 2026-01-15): {value!r}" + f"Invalid created_after date (expected YYYY-MM-DD like 2026-01-15): {value!r}" ) from e if dt.tzinfo is None: @@ -46,14 +46,38 @@ def _canonicalize_datetime(value: str, field_name: str) -> str: return dt.isoformat().replace("+00:00", "Z") -def canonicalize_created_after(value: str) -> str: - """Normalize a user-supplied datetime string for BTQL `created >= ...` filters.""" - return _canonicalize_datetime(value, "created_after") +def canonicalize_created_before(value: str) -> str: + """Normalize a user-supplied datetime string for BTQL `created <= ...` filters. + Accepts: + - YYYY-MM-DD (treated as end-of-day 23:59:59.999999 UTC for inclusive range) + - ISO-8601 datetimes, with or without timezone (naive treated as UTC) -def canonicalize_created_before(value: str) -> str: - """Normalize a user-supplied datetime string for BTQL `created < ...` filters.""" - return _canonicalize_datetime(value, "created_before") + Returns a canonical UTC ISO-8601 string ending in 'Z'. + """ + v = (value or "").strip() + if not v: + raise ValueError("created_before cannot be empty") + + if _DATE_ONLY_RE.match(v): + # For date-only, use end-of-day to make it inclusive of the full day + dt = datetime.fromisoformat(v).replace( + hour=23, minute=59, second=59, microsecond=999999, tzinfo=UTC + ) + return dt.isoformat().replace("+00:00", "Z") + + v_for_parse = v[:-1] + "+00:00" if v.endswith("Z") else v + try: + dt = datetime.fromisoformat(v_for_parse) + except Exception as e: + raise ValueError( + f"Invalid created_before date (expected YYYY-MM-DD like 2026-01-15): {value!r}" + ) from e + + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + dt = dt.astimezone(UTC) + return dt.isoformat().replace("+00:00", "Z") class BraintrustOrgConfig(BaseModel): @@ -156,10 +180,10 @@ class MigrationConfig(BaseModel): default=None, description=( "Optional ISO-8601 timestamp filter. 
When set: " - "(1) project logs migration only migrates events with created < created_before, " - "(2) experiments migration only migrates experiments with created < created_before " + "(1) project logs migration only migrates events with created <= created_before, " + "(2) experiments migration only migrates experiments with created <= created_before " "(and all their events). " - "Use with created_after for date ranges (e.g., --created-after 2026-01-01 --created-before 2026-02-01)." + "Can be combined with created_after to define a date range." ), ) diff --git a/braintrust_migrate/insert_bisect.py b/braintrust_migrate/insert_bisect.py index 3299074..545d777 100644 --- a/braintrust_migrate/insert_bisect.py +++ b/braintrust_migrate/insert_bisect.py @@ -23,12 +23,19 @@ async def insert_with_413_bisect( is_http_413: Callable[[Exception], bool], on_success: Callable[[list[T], R], Awaitable[None]] | None = None, on_single_413: Callable[[T, Exception], Awaitable[None]] | None = None, -) -> None: + skip_single_413: bool = False, +) -> list[T]: """Insert items, bisecting on 413 to isolate oversized payloads. - Preserves the original item order by always processing the left half first. - - If a singleton batch triggers 413, invokes `on_single_413` (if provided) and re-raises. + - If a singleton batch triggers 413, invokes `on_single_413` (if provided). + - If `skip_single_413` is True, the oversized item is skipped and migration + continues. If False (default), the exception is re-raised. + + Returns: + List of items that were skipped due to being oversized (empty if none). """ + skipped_oversize: list[T] = [] stack: list[list[T]] = [items] while stack: batch = stack.pop() @@ -43,6 +50,9 @@ async def insert_with_413_bisect( if len(batch) == 1: if on_single_413 is not None: await on_single_413(batch[0], e) + if skip_single_413: + skipped_oversize.append(batch[0]) + continue raise mid = math.ceil(len(batch) / 2) left = batch[:mid] @@ -52,3 +62,4 @@ async def insert_with_413_bisect( stack.append(left) continue raise + return skipped_oversize \ No newline at end of file diff --git a/braintrust_migrate/resources/datasets.py b/braintrust_migrate/resources/datasets.py index a9623e9..c37adde 100644 --- a/braintrust_migrate/resources/datasets.py +++ b/braintrust_migrate/resources/datasets.py @@ -681,6 +681,9 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: incr_skipped_seen=lambda n: setattr( state, "skipped_seen", int(state.skipped_seen) + int(n) ), + incr_skipped_oversize=lambda n: setattr( + state, "skipped_oversize", int(state.skipped_oversize) + int(n) + ), incr_attachments_copied=lambda n: setattr( state, "attachments_copied", @@ -702,6 +705,7 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: "inserted_bytes_total": state.inserted_bytes, "skipped_deleted_total": state.skipped_deleted, "skipped_seen_total": state.skipped_seen, + "skipped_oversize_total": state.skipped_oversize, "attachments_copied_total": state.attachments_copied, "cursor": ( (state.btql_min_pagination_key[:16] + "…") diff --git a/braintrust_migrate/resources/experiments.py b/braintrust_migrate/resources/experiments.py index a86d4fc..ffa733e 100644 --- a/braintrust_migrate/resources/experiments.py +++ b/braintrust_migrate/resources/experiments.py @@ -530,6 +530,15 @@ def _event_to_insert( if event.get("created") is not None: origin["created"] = event.get("created") out["origin"] = origin + + # Braintrust API does not allow 'tags' on non-root spans. 
+ # A span is a root span if span_id equals root_span_id (or root_span_id is absent). + span_id = event.get("span_id") + root_span_id = event.get("root_span_id") + is_root_span = root_span_id is None or span_id == root_span_id + if not is_root_span and "tags" in out: + del out["tags"] + return out async def _fetch_experiment_events_page( @@ -762,6 +771,9 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: incr_skipped_seen=lambda n: setattr( state, "skipped_seen", int(state.skipped_seen) + int(n) ), + incr_skipped_oversize=lambda n: setattr( + state, "skipped_oversize", int(state.skipped_oversize) + int(n) + ), incr_attachments_copied=lambda n: setattr( state, "attachments_copied", @@ -783,6 +795,7 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: "inserted_bytes_total": state.inserted_bytes, "skipped_deleted_total": state.skipped_deleted, "skipped_seen_total": state.skipped_seen, + "skipped_oversize_total": state.skipped_oversize, "attachments_copied_total": state.attachments_copied, "cursor": ( (state.btql_min_pagination_key[:16] + "…") diff --git a/braintrust_migrate/resources/logs.py b/braintrust_migrate/resources/logs.py index a2cbfe2..fa8f171 100644 --- a/braintrust_migrate/resources/logs.py +++ b/braintrust_migrate/resources/logs.py @@ -45,6 +45,7 @@ class _LogsStreamingState: inserted_events: int = 0 inserted_bytes: int = 0 skipped_seen: int = 0 + skipped_oversize: int = 0 failed_batches: int = 0 attachments_copied: int = 0 # When using BTQL-sorted fetch, we cannot use the opaque cursor. Instead we @@ -69,6 +70,7 @@ def from_path(cls, path: Path) -> _LogsStreamingState: inserted_events=int(data.get("inserted_events", 0)), inserted_bytes=int(data.get("inserted_bytes", 0)), skipped_seen=int(data.get("skipped_seen", 0)), + skipped_oversize=int(data.get("skipped_oversize", 0)), failed_batches=int(data.get("failed_batches", 0)), attachments_copied=int(data.get("attachments_copied", 0)), btql_min_pagination_key=data.get("btql_min_pagination_key"), @@ -89,6 +91,7 @@ def to_dict(self) -> dict[str, Any]: "inserted_events": self.inserted_events, "inserted_bytes": self.inserted_bytes, "skipped_seen": self.skipped_seen, + "skipped_oversize": self.skipped_oversize, "failed_batches": self.failed_batches, "attachments_copied": self.attachments_copied, "btql_min_pagination_key": self.btql_min_pagination_key, @@ -235,6 +238,14 @@ def _event_to_insert( if "id" in event: out["id"] = event["id"] + # Braintrust API does not allow 'tags' on non-root spans. + # A span is a root span if span_id equals root_span_id (or root_span_id is absent). + span_id = event.get("span_id") + root_span_id = event.get("root_span_id") + is_root_span = root_span_id is None or span_id == root_span_id + if not is_root_span and "tags" in out: + del out["tags"] + if "origin" not in out or out.get("origin") is None: origin: dict[str, Any] = { "object_type": "project_logs", @@ -451,12 +462,12 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: try: # BTQL-based streaming does not use version snapshots. - # Optional time filters for streaming queries (persisted for safe resume). - mig_cfg = getattr(self.source_client, "migration_config", None) - created_after_cfg = getattr(mig_cfg, "created_after", None) - created_before_cfg = getattr(mig_cfg, "created_before", None) - - # Validate and persist created_after filter + # Optional created-after filter for streaming queries (persisted for safe resume). 
+ created_after_cfg = getattr( + getattr(self.source_client, "migration_config", None), + "created_after", + None, + ) if ( self._stream_state.created_after is not None or created_after_cfg is not None @@ -467,7 +478,7 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: "migration_config.created_after must be a non-empty string when set" ) self._stream_state.created_after = created_after_cfg - self._stream_state.query_source = "btql_sorted_date_filter" + self._stream_state.query_source = "btql_sorted_created_after" self._save_stream_state() elif created_after_cfg is None: raise ValueError( @@ -484,7 +495,12 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: f"run={created_after_cfg!r}. Re-run with the checkpoint value or start a fresh checkpoint." ) - # Validate and persist created_before filter + # Optional created-before filter for streaming queries (persisted for safe resume). + created_before_cfg = getattr( + getattr(self.source_client, "migration_config", None), + "created_before", + None, + ) if ( self._stream_state.created_before is not None or created_before_cfg is not None @@ -495,7 +511,9 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: "migration_config.created_before must be a non-empty string when set" ) self._stream_state.created_before = created_before_cfg - self._stream_state.query_source = "btql_sorted_date_filter" + # If no query_source set yet (no created_after), set it now + if not self._stream_state.query_source: + self._stream_state.query_source = "btql_sorted_created_before" self._save_stream_state() elif created_before_cfg is None: raise ValueError( @@ -529,16 +547,14 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: log_fields={ "source_project_id": source_project_id, "created_after": self._stream_state.created_after, - "created_before": self._stream_state.created_before, }, ) if start_pk is None: self._logger.info( - "No project logs found in date range; finishing", + "No project logs found at/after created_after; finishing", source_project_id=source_project_id, dest_project_id=dest_project_id, created_after=self._stream_state.created_after, - created_before=self._stream_state.created_before, ) self._save_stream_state() return { @@ -558,7 +574,7 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]: } self._stream_state.btql_min_pagination_key = start_pk self._stream_state.btql_min_pagination_key_inclusive = True - self._stream_state.query_source = "btql_sorted_date_filter" + self._stream_state.query_source = "btql_sorted_created_after" self._save_stream_state() progress_hook = self._progress_hook @@ -792,6 +808,11 @@ def _set_last_pk(pk: str | None) -> None: "skipped_seen", int(self._stream_state.skipped_seen) + int(n), ), + incr_skipped_oversize=lambda n: setattr( + self._stream_state, + "skipped_oversize", + int(self._stream_state.skipped_oversize) + int(n), + ), incr_attachments_copied=lambda n: setattr( self._stream_state, "attachments_copied", @@ -813,6 +834,7 @@ def _set_last_pk(pk: str | None) -> None: "total": self._stream_state.fetched_events, "migrated": self._stream_state.inserted_events, "skipped": self._stream_state.skipped_seen, + "skipped_oversize": self._stream_state.skipped_oversize, "failed": self._stream_state.failed_batches, "errors": errors, "streaming": True, diff --git a/braintrust_migrate/streaming_utils.py b/braintrust_migrate/streaming_utils.py index e726791..dc6aa80 100644 --- 
a/braintrust_migrate/streaming_utils.py +++ b/braintrust_migrate/streaming_utils.py @@ -36,6 +36,7 @@ class EventsStreamState: inserted_bytes: int = 0 skipped_deleted: int = 0 skipped_seen: int = 0 + skipped_oversize: int = 0 attachments_copied: int = 0 @classmethod @@ -54,6 +55,7 @@ def from_path(cls, path: Path) -> EventsStreamState: inserted_bytes=int(data.get("inserted_bytes", 0)), skipped_deleted=int(data.get("skipped_deleted", 0)), skipped_seen=int(data.get("skipped_seen", 0)), + skipped_oversize=int(data.get("skipped_oversize", 0)), attachments_copied=int(data.get("attachments_copied", 0)), ) @@ -68,6 +70,7 @@ def to_dict(self) -> dict[str, Any]: "inserted_bytes": self.inserted_bytes, "skipped_deleted": self.skipped_deleted, "skipped_seen": self.skipped_seen, + "skipped_oversize": self.skipped_oversize, "attachments_copied": self.attachments_copied, } @@ -119,22 +122,23 @@ def build_btql_sorted_page_query( """Build a BTQL SQL query for stable sorted paging on `_pagination_key`. Args: - from_expr: The FROM expression (e.g., "project_logs('...', shape => 'spans')") - limit: Maximum number of rows to return - last_pagination_key: Resume pagination from this key - last_pagination_key_inclusive: If True, use >= instead of > for pagination key - created_after: Only include rows with created >= this value (inclusive) - created_before: Only include rows with created < this value (exclusive) - select: Fields to select (default "*") + from_expr: The BTQL FROM clause (e.g., 'logs(...)'). + limit: Maximum number of rows to return. + last_pagination_key: Resume from this pagination key. + last_pagination_key_inclusive: If True, use >=; otherwise >. + created_after: Optional filter for created >= this timestamp. + created_before: Optional filter for created <= this timestamp. + select: Columns to select (default '*'). Returns: - BTQL SQL query string + A BTQL query string. 
""" + conditions: list[str] = [] if isinstance(created_after, str) and created_after: conditions.append(f"created >= '{btql_quote(created_after)}'") if isinstance(created_before, str) and created_before: - conditions.append(f"created < '{btql_quote(created_before)}'") + conditions.append(f"created <= '{btql_quote(created_before)}'") if isinstance(last_pagination_key, str) and last_pagination_key: op = ">=" if last_pagination_key_inclusive else ">" conditions.append(f"_pagination_key {op} '{btql_quote(last_pagination_key)}'") @@ -157,6 +161,7 @@ class StreamHooks(TypedDict, total=False): on_page: Callable[[dict[str, Any]], None] on_done: Callable[[dict[str, Any]], None] on_batch_error: Callable[[dict[str, Any]], None] + on_skip_oversize: Callable[[dict[str, Any]], None] def _extract_ids(events: list[dict[str, Any]]) -> list[str]: @@ -195,6 +200,7 @@ async def stream_btql_sorted_events( incr_inserted_bytes: Callable[[int], None], incr_skipped_deleted: Callable[[int], None] | None, incr_skipped_seen: Callable[[int], None] | None, + incr_skipped_oversize: Callable[[int], None] | None, incr_attachments_copied: Callable[[int], None] | None, # Optional progress hooks hooks: StreamHooks | None = None, @@ -310,13 +316,29 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None: incr_attachments_copied(copied) try: - await insert_with_413_bisect( + skipped_oversize = await insert_with_413_bisect( batch, insert_fn=_insert_one, is_http_413=is_http_413, on_success=_on_success, on_single_413=_on_single_413, + skip_single_413=True, ) + # Track skipped oversize events (these were logged via on_single_413) + if skipped_oversize: + if incr_skipped_oversize is not None: + incr_skipped_oversize(len(skipped_oversize)) + # Mark as seen so they won't be retried on resume + if seen_db is not None: + seen_db.mark_seen(_extract_ids(skipped_oversize)) + if hooks and "on_skip_oversize" in hooks: + hooks["on_skip_oversize"]( + { + "page_num": page_num, + "skipped_count": len(skipped_oversize), + "skipped_ids": _extract_ids(skipped_oversize), + } + ) except Exception as e: if hooks and "on_batch_error" in hooks: hooks["on_batch_error"]( diff --git a/tests/unit/test_created_after_canonicalize.py b/tests/unit/test_created_after_canonicalize.py index 78c7ee3..db646aa 100644 --- a/tests/unit/test_created_after_canonicalize.py +++ b/tests/unit/test_created_after_canonicalize.py @@ -2,10 +2,7 @@ import pytest -from braintrust_migrate.config import canonicalize_created_after, canonicalize_created_before - - -# Tests for created_after +from braintrust_migrate.config import canonicalize_created_after def test_created_after_date_only_is_midnight_utc() -> None: @@ -36,36 +33,3 @@ def test_created_after_rejects_empty() -> None: def test_created_after_rejects_invalid() -> None: with pytest.raises(ValueError, match="Invalid created_after date"): canonicalize_created_after("not-a-date") - - -# Tests for created_before - - -def test_created_before_date_only_is_midnight_utc() -> None: - assert canonicalize_created_before("2020-01-02") == "2020-01-02T00:00:00Z" - - -def test_created_before_accepts_z_suffix() -> None: - assert canonicalize_created_before("2020-01-02T03:04:05Z") == "2020-01-02T03:04:05Z" - - -def test_created_before_converts_offset_to_utc() -> None: - # 03:00 at -05:00 is 08:00Z - assert ( - canonicalize_created_before("2020-01-02T03:00:00-05:00") - == "2020-01-02T08:00:00Z" - ) - - -def test_created_before_naive_datetime_treated_as_utc() -> None: - assert 
canonicalize_created_before("2020-01-02T03:04:05") == "2020-01-02T03:04:05Z" - - -def test_created_before_rejects_empty() -> None: - with pytest.raises(ValueError, match="created_before cannot be empty"): - canonicalize_created_before(" ") - - -def test_created_before_rejects_invalid() -> None: - with pytest.raises(ValueError, match="Invalid created_before date"): - canonicalize_created_before("not-a-date") diff --git a/tests/unit/test_created_before_canonicalize.py b/tests/unit/test_created_before_canonicalize.py new file mode 100644 index 0000000..8cd660a --- /dev/null +++ b/tests/unit/test_created_before_canonicalize.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import pytest + +from braintrust_migrate.config import canonicalize_created_before + + +def test_created_before_date_only_is_end_of_day_utc() -> None: + # For created_before, date-only values should use end-of-day for inclusive range + assert canonicalize_created_before("2020-01-02") == "2020-01-02T23:59:59.999999Z" + + +def test_created_before_accepts_z_suffix() -> None: + assert canonicalize_created_before("2020-01-02T03:04:05Z") == "2020-01-02T03:04:05Z" + + +def test_created_before_converts_offset_to_utc() -> None: + # 03:00 at -05:00 is 08:00Z + assert ( + canonicalize_created_before("2020-01-02T03:00:00-05:00") + == "2020-01-02T08:00:00Z" + ) + + +def test_created_before_naive_datetime_treated_as_utc() -> None: + assert canonicalize_created_before("2020-01-02T03:04:05") == "2020-01-02T03:04:05Z" + + +def test_created_before_rejects_empty() -> None: + with pytest.raises(ValueError, match="created_before cannot be empty"): + canonicalize_created_before(" ") + + +def test_created_before_rejects_invalid() -> None: + with pytest.raises(ValueError, match="Invalid created_before date"): + canonicalize_created_before("not-a-date") diff --git a/tests/unit/test_logs_btql_sorted_fetch.py b/tests/unit/test_logs_btql_sorted_fetch.py index 6a24a2b..1111f4b 100644 --- a/tests/unit/test_logs_btql_sorted_fetch.py +++ b/tests/unit/test_logs_btql_sorted_fetch.py @@ -326,3 +326,358 @@ async def test_logs_created_after_mismatch_with_checkpoint_errors( with pytest.raises(ValueError, match="created_after mismatch"): await migrator.migrate_all("proj-source") + + +class _SourceBtqlClientCreatedBefore: + def __init__( + self, pages: list[list[dict[str, Any]]], mig_cfg: MigrationConfig + ) -> None: + self._pages = pages + self.migration_config = mig_cfg + self.queries: list[str] = [] + self._fetch_calls = 0 + + async def with_retry(self, _operation_name: str, coro_func): + res = coro_func() + if hasattr(res, "__await__"): + return await res + return res + + async def raw_request( + self, method: str, path: str, *, json: Any = None, **kwargs: Any + ) -> Any: + _ = kwargs + assert method.upper() == "POST" + assert path == "/btql" + q = (json or {}).get("query") + assert isinstance(q, str) + self.queries.append(q) + + # Fetch queries select full rows. 
+ assert "SELECT *" in q + assert "FROM project_logs('proj-source', shape => 'spans')" in q + assert "ORDER BY _pagination_key" in q + assert "cursor:" not in q + + self._fetch_calls += 1 + if self._fetch_calls == 1: + # First fetch should have only the created_before filter + assert "WHERE created <= '2020-01-02T23:59:59.999999Z'" in q + if self._fetch_calls == 2: + assert ( + "WHERE created <= '2020-01-02T23:59:59.999999Z' AND _pagination_key > '1'" + in q + ) + + if self._pages: + return {"data": self._pages.pop(0)} + return {"data": []} + + +class _SourceBtqlClientCreatedRange: + """Client that validates both created_after and created_before are used.""" + + def __init__( + self, pages: list[list[dict[str, Any]]], mig_cfg: MigrationConfig + ) -> None: + self._pages = pages + self.migration_config = mig_cfg + self.queries: list[str] = [] + self._fetch_calls = 0 + self._preflight_calls = 0 + + async def with_retry(self, _operation_name: str, coro_func): + res = coro_func() + if hasattr(res, "__await__"): + return await res + return res + + async def raw_request( + self, method: str, path: str, *, json: Any = None, **kwargs: Any + ) -> Any: + _ = kwargs + assert method.upper() == "POST" + assert path == "/btql" + q = (json or {}).get("query") + assert isinstance(q, str) + self.queries.append(q) + + # Preflight query selects only _pagination_key. + if "SELECT _pagination_key" in q: + self._preflight_calls += 1 + assert "FROM project_logs('proj-source', shape => 'spans')" in q + assert "WHERE created >= '2020-01-01T00:00:00Z'" in q + assert "ORDER BY _pagination_key ASC" in q + assert "LIMIT 1" in q + return {"data": [{"_pagination_key": "1"}]} + + # Fetch queries select full rows. + assert "SELECT *" in q + assert "FROM project_logs('proj-source', shape => 'spans')" in q + assert "ORDER BY _pagination_key" in q + assert "cursor:" not in q + + self._fetch_calls += 1 + if self._fetch_calls == 1: + # Both created_after and created_before should be present + assert "created >= '2020-01-01T00:00:00Z'" in q + assert "created <= '2020-01-02T23:59:59.999999Z'" in q + + if self._pages: + return {"data": self._pages.pop(0)} + return {"data": []} + + +@pytest.mark.asyncio +async def test_logs_created_before_filters_query(tmp_path: Path) -> None: + # created_before=2020-01-02 (date-only) should canonicalize to end-of-day UTC. + source_cfg = MigrationConfig(created_before="2020-01-02") + dest_cfg = MigrationConfig(created_before="2020-01-02") + + pages = [ + [ + {"id": "a", "created": "2020-01-01T00:00:00Z", "_pagination_key": "1"}, + ], + [ + {"id": "b", "created": "2020-01-02T00:00:00Z", "_pagination_key": "2"}, + ], + ] + + source = _SourceBtqlClientCreatedBefore(pages, source_cfg) + dest = _DestInsertClient(dest_cfg) + + migrator = LogsMigrator( + source, # type: ignore[arg-type] + dest, # type: ignore[arg-type] + tmp_path, + page_limit=1, + insert_batch_size=10, + use_version_snapshot=False, + use_seen_db=False, + progress_hook=None, + ) + migrator.set_destination_project_id("proj-dest") + res = await migrator.migrate_all("proj-source") + + assert res["migrated"] == 2 + assert dest.inserted_ids == ["a", "b"] + + +@pytest.mark.asyncio +async def test_logs_created_before_mismatch_with_checkpoint_errors( + tmp_path: Path, +) -> None: + source_cfg = MigrationConfig(created_before="2020-01-02T23:59:59.999999Z") + dest_cfg = MigrationConfig(created_before="2020-01-02T23:59:59.999999Z") + + # Write a checkpoint state with a different created_before. 
+ (tmp_path / "logs_streaming_state.json").write_text( + '{"created_before":"2020-01-03T23:59:59.999999Z"}' + ) + + source = _SourceBtqlClient([], source_cfg) + dest = _DestInsertClient(dest_cfg) + + migrator = LogsMigrator( + source, # type: ignore[arg-type] + dest, # type: ignore[arg-type] + tmp_path, + page_limit=1, + insert_batch_size=10, + use_version_snapshot=False, + use_seen_db=False, + progress_hook=None, + ) + migrator.set_destination_project_id("proj-dest") + + with pytest.raises(ValueError, match="created_before mismatch"): + await migrator.migrate_all("proj-source") + + +@pytest.mark.asyncio +async def test_logs_created_range_uses_both_filters(tmp_path: Path) -> None: + # Test using both created_after and created_before together. + source_cfg = MigrationConfig( + created_after="2020-01-01", created_before="2020-01-02" + ) + dest_cfg = MigrationConfig(created_after="2020-01-01", created_before="2020-01-02") + + pages = [ + [ + {"id": "a", "created": "2020-01-01T12:00:00Z", "_pagination_key": "1"}, + ], + ] + + source = _SourceBtqlClientCreatedRange(pages, source_cfg) + dest = _DestInsertClient(dest_cfg) + + migrator = LogsMigrator( + source, # type: ignore[arg-type] + dest, # type: ignore[arg-type] + tmp_path, + page_limit=1, + insert_batch_size=10, + use_version_snapshot=False, + use_seen_db=False, + progress_hook=None, + ) + migrator.set_destination_project_id("proj-dest") + res = await migrator.migrate_all("proj-source") + + assert res["migrated"] == 1 + assert dest.inserted_ids == ["a"] + # Verify both filters were used + combined = "\n".join(source.queries) + assert "created >= '2020-01-01T00:00:00Z'" in combined + assert "created <= '2020-01-02T23:59:59.999999Z'" in combined + + +class _SourceBtqlClientSimple: + """Simplified source client that returns pages without strict pagination assertions.""" + + def __init__( + self, pages: list[list[dict[str, Any]]], mig_cfg: MigrationConfig + ) -> None: + self._pages = pages + self.migration_config = mig_cfg + self._call_index = 0 + + async def with_retry(self, _operation_name: str, coro_func): + res = coro_func() + if hasattr(res, "__await__"): + return await res + return res + + async def raw_request( + self, method: str, path: str, *, json: Any = None, **kwargs: Any + ) -> Any: + _ = kwargs + assert method.upper() == "POST" + assert path == "/btql" + q = (json or {}).get("query") + assert isinstance(q, str) + assert "SELECT *" in q + assert "FROM project_logs(" in q + + if self._pages: + return {"data": self._pages.pop(0)} + return {"data": []} + + +class _DestInsertClientCapturingEvents: + """Destination client that captures full event payloads for testing.""" + + def __init__(self, mig_cfg: MigrationConfig) -> None: + self.migration_config = mig_cfg + self.inserted_events: list[dict[str, Any]] = [] + + class _Views: + async def create(self, **kwargs: Any) -> Any: + _ = kwargs + return {"id": "v1"} + + class _ClientObj: + def __init__(self) -> None: + self.views = _Views() + + self.client = _ClientObj() + + async def with_retry(self, _operation_name: str, coro_func): + res = coro_func() + if hasattr(res, "__await__"): + return await res + return res + + async def raw_request( + self, method: str, path: str, *, json: Any = None, **kwargs: Any + ) -> Any: + _ = kwargs + assert method.upper() == "POST" + assert "/v1/project_logs/" in path + assert path.endswith("/insert") + events = (json or {}).get("events", []) + for e in events: + if isinstance(e, dict): + self.inserted_events.append(e) + return {"row_ids": [e.get("id") for e 
in events if isinstance(e, dict)]} + + +@pytest.mark.asyncio +async def test_logs_strips_tags_from_non_root_spans(tmp_path: Path) -> None: + """Test that tags are stripped from non-root spans but preserved on root spans. + + The Braintrust API does not allow 'tags' on non-root spans (child spans). + A span is a root span if: + - root_span_id is None/absent, OR + - span_id == root_span_id + """ + source_cfg = MigrationConfig() + dest_cfg = MigrationConfig() + + pages = [ + [ + # Root span: span_id == root_span_id (tags should be preserved) + { + "id": "root-span", + "created": "2020-01-01T00:00:00Z", + "_pagination_key": "1", + "span_id": "span-a", + "root_span_id": "span-a", + "tags": ["important", "production"], + }, + # Non-root span: span_id != root_span_id (tags should be STRIPPED) + { + "id": "child-span", + "created": "2020-01-02T00:00:00Z", + "_pagination_key": "2", + "span_id": "span-b", + "root_span_id": "span-a", + "tags": ["should-be-removed"], + }, + # Root span: root_span_id absent (tags should be preserved) + { + "id": "implicit-root", + "created": "2020-01-03T00:00:00Z", + "_pagination_key": "3", + "span_id": "span-c", + # No root_span_id means it's implicitly a root span + "tags": ["also-important"], + }, + ], + ] + + source = _SourceBtqlClientSimple(pages, source_cfg) + dest = _DestInsertClientCapturingEvents(dest_cfg) + + migrator = LogsMigrator( + source, # type: ignore[arg-type] + dest, # type: ignore[arg-type] + tmp_path, + page_limit=10, + insert_batch_size=10, + use_version_snapshot=False, + use_seen_db=False, + progress_hook=None, + ) + migrator.set_destination_project_id("proj-dest") + res = await migrator.migrate_all("proj-source") + + assert res["migrated"] == 3 + assert len(dest.inserted_events) == 3 + + # Find each inserted event by id + events_by_id = {e["id"]: e for e in dest.inserted_events} + + # Root span (span_id == root_span_id): tags should be preserved + root_event = events_by_id["root-span"] + assert "tags" in root_event + assert root_event["tags"] == ["important", "production"] + + # Non-root span (span_id != root_span_id): tags should be STRIPPED + child_event = events_by_id["child-span"] + assert "tags" not in child_event + + # Implicit root span (no root_span_id): tags should be preserved + implicit_root_event = events_by_id["implicit-root"] + assert "tags" in implicit_root_event + assert implicit_root_event["tags"] == ["also-important"]
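
The inclusive `--created-before` handling above hinges on how date-only values are canonicalized: `created_after` snaps to midnight UTC while `created_before` snaps to end-of-day, so a bare `YYYY-MM-DD` covers the whole day. A minimal standalone sketch of that behavior (the helper names are illustrative, not part of the package's API; assumes Python 3.11+ for `datetime.UTC`):

```python
from datetime import UTC, datetime


def start_of_day_utc(date_str: str) -> str:
    """Midnight UTC, mirroring canonicalize_created_after for date-only input."""
    dt = datetime.fromisoformat(date_str).replace(tzinfo=UTC)
    return dt.isoformat().replace("+00:00", "Z")


def end_of_day_utc(date_str: str) -> str:
    """End of day UTC, mirroring canonicalize_created_before for date-only input."""
    dt = datetime.fromisoformat(date_str).replace(
        hour=23, minute=59, second=59, microsecond=999999, tzinfo=UTC
    )
    return dt.isoformat().replace("+00:00", "Z")


print(start_of_day_utc("2026-01-01"))  # 2026-01-01T00:00:00Z
print(end_of_day_utc("2026-01-31"))    # 2026-01-31T23:59:59.999999Z
```

Together the two filters translate into `created >= '2026-01-01T00:00:00Z' AND created <= '2026-01-31T23:59:59.999999Z'` in the generated BTQL, which is what the date-range test above asserts.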
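For the 429 path, the delay is no longer exponential backoff plus a small jitter; it is the server's Retry-After (defaulting to 60s) plus 10-50% random jitter, capped at max_delay. A rough standalone restatement of that calculation (the helper name is hypothetical; the real logic lives inline in the retry loop in braintrust_migrate/client.py):

```python
import random


def rate_limit_delay(retry_after: float | None, max_delay: float = 60.0) -> float:
    """Jittered wait for a 429: 10-50% of the base wait is added at random."""
    base_wait = retry_after if retry_after is not None else 60.0
    jitter = random.uniform(base_wait * 0.1, base_wait * 0.5)
    return min(base_wait + jitter, max_delay)


# With Retry-After: 20, delays land between 22s and 30s, so concurrent
# workers spread out rather than retrying in lockstep; up to 30 attempts at
# the 60s cap gives roughly half an hour of headroom against sustained
# rate limiting.
print(round(rate_limit_delay(20.0), 1))
```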
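Finally, a hypothetical usage sketch of the new `skip_single_413=True` mode of `insert_with_413_bisect`: batches are split in half until an oversized item is isolated, and instead of re-raising, the item is returned so callers can feed it into the new `skipped_oversize` counters. The fake insert function and the ~8 KB cap below are stand-ins for the real insert endpoint and request-size limit:

```python
import asyncio

from braintrust_migrate.insert_bisect import insert_with_413_bisect


class PayloadTooLarge(Exception):
    """Stand-in for an HTTP 413 error."""


async def main() -> None:
    async def insert(batch: list[dict]) -> int:
        # Pretend the API rejects requests above ~8 KB with a 413.
        if sum(len(str(e)) for e in batch) > 8_000:
            raise PayloadTooLarge()
        return len(batch)

    events = [{"id": "small"}, {"id": "huge", "input": "x" * 20_000}]
    skipped = await insert_with_413_bisect(
        events,
        insert_fn=insert,
        is_http_413=lambda e: isinstance(e, PayloadTooLarge),
        skip_single_413=True,
    )
    # The oversized event is skipped and reported; the small one is inserted.
    assert [e["id"] for e in skipped] == ["huge"]


asyncio.run(main())
```

In the streaming path this return value is wired to `incr_skipped_oversize` and the seen-DB, so skipped items are counted once and not retried on resume.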