Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions braintrust_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ def migrate(
typer.Option(
"--created-before",
help=(
"Only migrate data created before this date (exclusive). Format: YYYY-MM-DD (e.g. 2026-02-01). "
"Use with --created-after for date ranges (e.g., --created-after 2026-01-01 --created-before 2026-02-01)."
"Only migrate data created on or before this date. Format: YYYY-MM-DD (e.g. 2026-01-15). "
"Date-only values are treated as end-of-day (23:59:59.999999 UTC). "
"Can be combined with --created-after to define a date range."
),
envvar="MIGRATION_CREATED_BEFORE",
),
Expand Down
44 changes: 30 additions & 14 deletions braintrust_migrate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ async def with_retry(self, operation_name: str, coro_func):
attempts = int(self.migration_config.retry_attempts) + 1
base_delay = float(self.migration_config.retry_delay)
max_delay = 60.0
# For 429 rate limits, retry many more times (up to ~30 min of waiting)
rate_limit_max_attempts = 30

HTTP_STATUS_TOO_MANY_REQUESTS = 429

Expand Down Expand Up @@ -384,7 +386,8 @@ def _classify_exception(
retry_after = _parse_retry_after_seconds(exc.response)
if status == HTTP_STATUS_TOO_MANY_REQUESTS:
return True, status, retry_after
if status in {408, 409, 425, 500, 502, 503, 504}:
# 413 is retryable - bisect logic in insert_bisect.py will handle splitting batches
if status in {408, 409, 413, 425, 500, 502, 503, 504}:
return True, status, retry_after
return False, status, retry_after
if isinstance(exc, httpx.RequestError):
Expand Down Expand Up @@ -416,7 +419,8 @@ def _exc_context(exc: Exception) -> dict[str, Any]:
return ctx

last_exc: Exception | None = None
for attempt in range(1, attempts + 1):
attempt = 1
while True:
try:
self._logger.debug(
"Executing operation",
Expand All @@ -433,48 +437,60 @@ def _exc_context(exc: Exception) -> dict[str, Any]:
retryable, status, retry_after = _classify_exception(e)
exc_ctx = _exc_context(e)

if not retryable or attempt >= attempts:
# For 429 rate limits, be much more persistent
is_rate_limit = status == HTTP_STATUS_TOO_MANY_REQUESTS
effective_max_attempts = (
rate_limit_max_attempts if is_rate_limit else attempts
)

if not retryable or attempt >= effective_max_attempts:
self._logger.error(
"Operation failed",
operation=operation_name,
attempt=attempt,
max_attempts=attempts,
max_attempts=effective_max_attempts,
status_code=status,
error=str(e),
**exc_ctx,
)
raise

exp_delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
# small jitter to avoid thundering herd
jitter = random.uniform(0, min(1.0, exp_delay * 0.1))
delay = min(max_delay, exp_delay + jitter)
if retry_after is not None:
delay = min(max_delay, max(delay, retry_after))

# Concise message for rate limits (expected behavior), verbose for other errors
if status == HTTP_STATUS_TOO_MANY_REQUESTS:
if is_rate_limit:
# For rate limits: use full retry_after, add significant jitter
# to spread out requests and avoid thundering herd
base_wait = retry_after if retry_after is not None else 60.0
# Add 10-50% jitter to spread out concurrent requests
jitter = random.uniform(base_wait * 0.1, base_wait * 0.5)
delay = min(base_wait + jitter, max_delay)
self._logger.warning(
"Rate limited, retrying",
operation=operation_name,
retry_in_seconds=round(delay, 1),
attempt=f"{attempt}/{attempts}",
attempt=f"{attempt}/{effective_max_attempts}",
)
else:
# For other errors: use exponential backoff with small jitter
jitter = random.uniform(0, min(1.0, exp_delay * 0.1))
delay = min(max_delay, exp_delay + jitter)
if retry_after is not None:
delay = min(max_delay, max(delay, retry_after))
self._logger.warning(
"Operation failed, backing off and retrying",
operation=operation_name,
attempt=attempt,
max_attempts=attempts,
max_attempts=effective_max_attempts,
status_code=status,
retry_after_seconds=retry_after,
sleep_seconds=delay,
error=str(e),
**exc_ctx,
)
await asyncio.sleep(delay)
attempt += 1

# Should be unreachable
# Should be unreachable - loop exits via return or raise
if last_exc is not None:
raise last_exc
raise BraintrustAPIError(f"{operation_name} failed with unknown error")
Expand Down
50 changes: 37 additions & 13 deletions braintrust_migrate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
_DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def _canonicalize_datetime(value: str, field_name: str) -> str:
"""Normalize a user-supplied datetime string for BTQL filters.
def canonicalize_created_after(value: str) -> str:
"""Normalize a user-supplied datetime string for BTQL `created >= ...` filters.

Accepts:
- YYYY-MM-DD (treated as midnight UTC)
Expand All @@ -26,7 +26,7 @@ def _canonicalize_datetime(value: str, field_name: str) -> str:
"""
v = (value or "").strip()
if not v:
raise ValueError(f"{field_name} cannot be empty")
raise ValueError("created_after cannot be empty")

if _DATE_ONLY_RE.match(v):
dt = datetime.fromisoformat(v).replace(tzinfo=UTC)
Expand All @@ -37,7 +37,7 @@ def _canonicalize_datetime(value: str, field_name: str) -> str:
dt = datetime.fromisoformat(v_for_parse)
except Exception as e:
raise ValueError(
f"Invalid {field_name} date (expected YYYY-MM-DD like 2026-01-15): {value!r}"
f"Invalid created_after date (expected YYYY-MM-DD like 2026-01-15): {value!r}"
) from e

if dt.tzinfo is None:
Expand All @@ -46,14 +46,38 @@ def _canonicalize_datetime(value: str, field_name: str) -> str:
return dt.isoformat().replace("+00:00", "Z")


def canonicalize_created_after(value: str) -> str:
"""Normalize a user-supplied datetime string for BTQL `created >= ...` filters."""
return _canonicalize_datetime(value, "created_after")
def canonicalize_created_before(value: str) -> str:
    """Normalize a user-supplied datetime string for BTQL `created <= ...` filters.

    Accepts:
    - YYYY-MM-DD (treated as end-of-day 23:59:59.999999 UTC for inclusive range)
    - ISO-8601 datetimes, with or without timezone (naive treated as UTC)

    Args:
        value: Raw date or datetime string; surrounding whitespace is ignored.

    Returns:
        A canonical UTC ISO-8601 string ending in 'Z'.

    Raises:
        ValueError: If the value is empty or cannot be parsed as a date/datetime
            (including well-formed but impossible dates such as 2026-02-30).
    """
    v = (value or "").strip()
    if not v:
        raise ValueError("created_before cannot be empty")

    is_date_only = _DATE_ONLY_RE.match(v) is not None

    # Rewrite a trailing 'Z' as an explicit +00:00 offset before parsing.
    v_for_parse = v[:-1] + "+00:00" if v.endswith("Z") else v
    try:
        # Parse both date-only and full datetime inputs inside the same guard so
        # every malformed value surfaces with the same user-facing message.
        dt = datetime.fromisoformat(v_for_parse)
    except ValueError as e:
        raise ValueError(
            f"Invalid created_before date (expected YYYY-MM-DD like 2026-01-15): {value!r}"
        ) from e

    if is_date_only:
        # For date-only, use end-of-day to make it inclusive of the full day.
        dt = dt.replace(
            hour=23, minute=59, second=59, microsecond=999999, tzinfo=UTC
        )
    elif dt.tzinfo is None:
        # Naive datetimes are interpreted as UTC.
        dt = dt.replace(tzinfo=UTC)

    return dt.astimezone(UTC).isoformat().replace("+00:00", "Z")


class BraintrustOrgConfig(BaseModel):
Expand Down Expand Up @@ -156,10 +180,10 @@ class MigrationConfig(BaseModel):
default=None,
description=(
"Optional ISO-8601 timestamp filter. When set: "
"(1) project logs migration only migrates events with created < created_before, "
"(2) experiments migration only migrates experiments with created < created_before "
"(1) project logs migration only migrates events with created <= created_before, "
"(2) experiments migration only migrates experiments with created <= created_before "
"(and all their events). "
"Use with created_after for date ranges (e.g., --created-after 2026-01-01 --created-before 2026-02-01)."
"Can be combined with created_after to define a date range."
),
)

Expand Down
15 changes: 13 additions & 2 deletions braintrust_migrate/insert_bisect.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ async def insert_with_413_bisect(
is_http_413: Callable[[Exception], bool],
on_success: Callable[[list[T], R], Awaitable[None]] | None = None,
on_single_413: Callable[[T, Exception], Awaitable[None]] | None = None,
) -> None:
skip_single_413: bool = False,
) -> list[T]:
"""Insert items, bisecting on 413 to isolate oversized payloads.

- Preserves the original item order by always processing the left half first.
- If a singleton batch triggers 413, invokes `on_single_413` (if provided) and re-raises.
- If a singleton batch triggers 413, invokes `on_single_413` (if provided).
- If `skip_single_413` is True, the oversized item is skipped and migration
continues. If False (default), the exception is re-raised.

Returns:
List of items that were skipped due to being oversized (empty if none).
"""
skipped_oversize: list[T] = []
stack: list[list[T]] = [items]
while stack:
batch = stack.pop()
Expand All @@ -43,6 +50,9 @@ async def insert_with_413_bisect(
if len(batch) == 1:
if on_single_413 is not None:
await on_single_413(batch[0], e)
if skip_single_413:
skipped_oversize.append(batch[0])
continue
raise
mid = math.ceil(len(batch) / 2)
left = batch[:mid]
Expand All @@ -52,3 +62,4 @@ async def insert_with_413_bisect(
stack.append(left)
continue
raise
return skipped_oversize
4 changes: 4 additions & 0 deletions braintrust_migrate/resources/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,9 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None:
incr_skipped_seen=lambda n: setattr(
state, "skipped_seen", int(state.skipped_seen) + int(n)
),
incr_skipped_oversize=lambda n: setattr(
state, "skipped_oversize", int(state.skipped_oversize) + int(n)
),
incr_attachments_copied=lambda n: setattr(
state,
"attachments_copied",
Expand All @@ -702,6 +705,7 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None:
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted,
"skipped_seen_total": state.skipped_seen,
"skipped_oversize_total": state.skipped_oversize,
"attachments_copied_total": state.attachments_copied,
"cursor": (
(state.btql_min_pagination_key[:16] + "…")
Expand Down
13 changes: 13 additions & 0 deletions braintrust_migrate/resources/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,15 @@ def _event_to_insert(
if event.get("created") is not None:
origin["created"] = event.get("created")
out["origin"] = origin

# Braintrust API does not allow 'tags' on non-root spans.
# A span is a root span if span_id equals root_span_id (or root_span_id is absent).
span_id = event.get("span_id")
root_span_id = event.get("root_span_id")
is_root_span = root_span_id is None or span_id == root_span_id
if not is_root_span and "tags" in out:
del out["tags"]

return out

async def _fetch_experiment_events_page(
Expand Down Expand Up @@ -762,6 +771,9 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None:
incr_skipped_seen=lambda n: setattr(
state, "skipped_seen", int(state.skipped_seen) + int(n)
),
incr_skipped_oversize=lambda n: setattr(
state, "skipped_oversize", int(state.skipped_oversize) + int(n)
),
incr_attachments_copied=lambda n: setattr(
state,
"attachments_copied",
Expand All @@ -783,6 +795,7 @@ async def _on_single_413(event: dict[str, Any], err: Exception) -> None:
"inserted_bytes_total": state.inserted_bytes,
"skipped_deleted_total": state.skipped_deleted,
"skipped_seen_total": state.skipped_seen,
"skipped_oversize_total": state.skipped_oversize,
"attachments_copied_total": state.attachments_copied,
"cursor": (
(state.btql_min_pagination_key[:16] + "…")
Expand Down
Loading