Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# CI workflow: runs the pytest suite for pull requests that target `main`.
name: Tests

on:
  pull_request:
    branches:
      - main

# Least privilege: the steps below only check out and read the repository,
# so the workflow token does not need any write scopes.
permissions:
  contents: read

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          # Turn on the action's cache option (see the setup-uv documentation).
          enable-cache: true

      # Install the project with every optional extra plus the dev dependencies.
      - name: Install dependencies
        run: uv sync --all-extras --dev

      - name: Run tests
        run: uv run pytest
15 changes: 9 additions & 6 deletions braintrust_migrate/btql.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""BTQL helpers (query execution + resilient paging).

Streaming migrators use native BTQL queries sorted by `_pagination_key` and need
to be resilient to backend timeouts (504) and internal errors (500) that
correlate with large LIMIT values.

Native BTQL syntax is used instead of SQL for compatibility with data planes
that don't yet support SQL mode.
"""

from __future__ import annotations
Expand Down Expand Up @@ -38,11 +41,11 @@ async def find_first_pagination_key_for_created_after(

def _query_text_for_limit(n: int) -> str:
    """Build the native BTQL query that selects up to `n` pagination keys.

    Reads `from_expr` and `created_after` from the enclosing scope. The query
    keeps rows with `created >= created_after` and sorts them by
    `_pagination_key` ascending.

    Args:
        n: Row limit; coerced with `int()` before it is interpolated into the
            query text.

    Returns:
        Query text made of native BTQL clauses
        (select:/from:/filter:/sort:/limit:), one clause per line.
    """
    # Only the native BTQL clauses are emitted. Keeping the old SQL literals
    # next to them would concatenate two queries into one malformed string.
    return (
        "select: _pagination_key\n"
        f"from: {from_expr}\n"
        f"filter: created >= '{btql_quote(created_after)}'\n"
        "sort: _pagination_key asc\n"
        f"limit: {int(n)}"
    )

out = await fetch_btql_sorted_page_with_retries(
Expand Down
4 changes: 2 additions & 2 deletions braintrust_migrate/resources/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,12 @@ async def _fetch_dataset_events_page_btql_sorted(
limit: int,
state: EventsStreamState,
) -> dict[str, Any]:
"""Fetch one page via POST /btql using SQL syntax, sorted by _pagination_key."""
"""Fetch one page via POST /btql using native BTQL syntax, sorted by _pagination_key."""
last_pagination_key = state.btql_min_pagination_key

def _query_text_for_limit(n: int) -> str:
return build_btql_sorted_page_query(
from_expr=f"dataset('{btql_quote(dataset_id)}', shape => 'spans')",
from_expr=f"dataset('{btql_quote(dataset_id)}') spans",
limit=n,
last_pagination_key=last_pagination_key,
select="*",
Expand Down
6 changes: 3 additions & 3 deletions braintrust_migrate/resources/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ async def _migrate_experiment_events(
) -> None:
"""Migrate events from source experiment to destination experiment.

This uses BTQL (SQL) pagination ordered by `_pagination_key` and bounded inserts
This uses native BTQL pagination ordered by `_pagination_key` and bounded inserts
so it can scale to very large experiments (potentially comparable to project logs).

Deleted events (`_object_delete=true`) are skipped.
Expand Down Expand Up @@ -555,12 +555,12 @@ async def _fetch_experiment_events_page_btql_sorted(
limit: int,
state: EventsStreamState,
) -> dict[str, Any]:
"""Fetch one page via POST /btql using SQL syntax, sorted by _pagination_key."""
"""Fetch one page via POST /btql using native BTQL syntax, sorted by _pagination_key."""
last_pagination_key = state.btql_min_pagination_key

def _query_text_for_limit(n: int) -> str:
return build_btql_sorted_page_query(
from_expr=f"experiment('{btql_quote(experiment_id)}', shape => 'spans')",
from_expr=f"experiment('{btql_quote(experiment_id)}') spans",
limit=n,
last_pagination_key=last_pagination_key,
select="*",
Expand Down
10 changes: 5 additions & 5 deletions braintrust_migrate/resources/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async def _fetch_page_btql_sorted(
project_id: str,
limit: int,
) -> dict[str, Any]:
"""Fetch one page via POST /btql using SQL syntax, sorted by _pagination_key.
"""Fetch one page via POST /btql using native BTQL syntax, sorted by _pagination_key.

This exists to allow inserting logs in created-ascending order, which makes
destination `_xact_id` (and thus UI default ordering by `_pagination_key`)
Expand All @@ -280,16 +280,16 @@ async def _fetch_page_btql_sorted(
# For sorted pagination, we must do offset-based pagination by filtering on
# the last sort key values from the previous page.

# Use native BTQL syntax (select:/from:/filter:/sort:/limit:) for compatibility
# with data planes that don't yet support SQL mode.
last_pagination_key = self._stream_state.btql_min_pagination_key
last_pagination_key_inclusive = bool(
self._stream_state.btql_min_pagination_key_inclusive
)
created_after = self._stream_state.created_after
created_before = self._stream_state.created_before

from_expr = f"project_logs('{btql_quote(project_id)}', shape => 'spans')"
from_expr = f"project_logs('{btql_quote(project_id)}') spans"

def _query_text_for_limit(n: int) -> str:
return build_btql_sorted_page_query(
Expand Down Expand Up @@ -522,7 +522,7 @@ async def migrate_all(self, project_id: str | None = None) -> dict[str, Any]:
start_pk = await find_first_pagination_key_for_created_after(
client=self.source_client,
from_expr=(
f"project_logs('{btql_quote(source_project_id)}', shape => 'spans')"
f"project_logs('{btql_quote(source_project_id)}') spans"
),
created_after=self._stream_state.created_after,
operation="btql_project_logs_created_after_start_pk",
Expand Down
21 changes: 12 additions & 9 deletions braintrust_migrate/streaming_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ def build_btql_sorted_page_query(
created_before: str | None = None,
select: str = "*",
) -> str:
"""Build a BTQL SQL query for stable sorted paging on `_pagination_key`.
"""Build a native BTQL query for stable sorted paging on `_pagination_key`.

Uses native BTQL syntax (select:/from:/filter:/sort:/limit:) instead of SQL
for compatibility with data planes that don't yet support SQL mode.

Args:
from_expr: The FROM expression (e.g., "project_logs('...', shape => 'spans')")
from_expr: The FROM expression (e.g., "project_logs('...') spans")
limit: Maximum number of rows to return
last_pagination_key: Resume pagination from this key
last_pagination_key_inclusive: If True, use >= instead of > for pagination key
Expand All @@ -128,7 +131,7 @@ def build_btql_sorted_page_query(
select: Fields to select (default "*")

Returns:
BTQL SQL query string
Native BTQL query string
"""
conditions: list[str] = []
if isinstance(created_after, str) and created_after:
Expand All @@ -138,14 +141,14 @@ def build_btql_sorted_page_query(
if isinstance(last_pagination_key, str) and last_pagination_key:
op = ">=" if last_pagination_key_inclusive else ">"
conditions.append(f"_pagination_key {op} '{btql_quote(last_pagination_key)}'")
where = f"WHERE {' AND '.join(conditions)}\n" if conditions else ""
filter_clause = f"filter: {' and '.join(conditions)}\n" if conditions else ""

return (
f"SELECT {select}\n"
f"FROM {from_expr}\n"
f"{where}"
"ORDER BY _pagination_key ASC\n"
f"LIMIT {int(limit)}"
f"select: {select}\n"
f"from: {from_expr}\n"
f"{filter_clause}"
"sort: _pagination_key asc\n"
f"limit: {int(limit)}"
)


Expand Down
Loading