Merged
19 commits
a4bb8b8
feat(migrations): add saved queries and sql_snapshots
zfarrell Feb 8, 2026
058d684
feat(catalog): add SqlSnapshot entity and saved query types
zfarrell Feb 8, 2026
c85413e
feat(catalog): implement sql_snapshots and saved queries
zfarrell Feb 8, 2026
7aab321
feat(engine): wire snapshot creation into query execution
zfarrell Feb 8, 2026
2797f96
feat(http): add saved queries CRUD and execute endpoints
zfarrell Feb 8, 2026
744f737
test: add saved queries and sql_snapshots test coverage
zfarrell Feb 8, 2026
3bda42e
fix(catalog): use BEGIN IMMEDIATE for SQLite write transactions
zfarrell Feb 8, 2026
b598116
fix(migrations): drop pgcrypto dep, document backfill ID format
zfarrell Feb 8, 2026
601895b
perf(engine): skip version creation on no-op updates
zfarrell Feb 8, 2026
d413f45
refactor(http): clean up validation and accept empty execute body
zfarrell Feb 8, 2026
f87076a
feat(catalog): paginate versions, dedup mock snapshots
zfarrell Feb 8, 2026
fc3cc09
fix(catalog): move no-op version check inside transaction
zfarrell Feb 10, 2026
539680c
refactor(catalog): add raw-length guard, safe i64 casts
zfarrell Feb 10, 2026
130856a
fix(catalog): checked version increment, stable pagination, name-only…
zfarrell Feb 12, 2026
04e3450
test(catalog): add no-op, name-only, dup-name, empty-body tests
zfarrell Feb 12, 2026
f93b182
refactor(catalog): optional sql on update, skip redundant snapshot lo…
zfarrell Feb 12, 2026
6853196
test(catalog): add sql-less rename and empty-body update tests
zfarrell Feb 12, 2026
402fb31
refactor(http): extract detail helper, add tracing to all handlers
zfarrell Feb 18, 2026
e271cea
test(catalog): add overflow, pagination, nonexistent version tests
zfarrell Feb 18, 2026
71 changes: 71 additions & 0 deletions migrations/postgres/v6.sql
@@ -0,0 +1,71 @@
-- Saved queries with versioned SQL snapshots.
-- SQL text is stored in a content-addressable sql_snapshots table,
-- referenced by ID from saved_query_versions and query_runs.

-- 1) Create sql_snapshots table (content-addressable SQL store)
CREATE TABLE sql_snapshots (
id TEXT PRIMARY KEY,
sql_hash TEXT NOT NULL,
sql_text TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX ux_sql_snapshots_hash_text ON sql_snapshots (sql_hash, sql_text);
CREATE INDEX idx_sql_snapshots_hash ON sql_snapshots (sql_hash);

-- 2) Create saved_queries table
-- Note: name is intentionally NOT UNIQUE — duplicate names are allowed.
CREATE TABLE saved_queries (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
latest_version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 3) Create saved_query_versions table
CREATE TABLE saved_query_versions (
saved_query_id TEXT NOT NULL REFERENCES saved_queries(id) ON DELETE CASCADE,
version INTEGER NOT NULL,
snapshot_id TEXT NOT NULL REFERENCES sql_snapshots(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (saved_query_id, version)
);

-- 4) Backfill sql_snapshots from existing query_runs.
-- IDs use the 'snap' prefix + 26 hex chars from md5, matching the 30-char
-- length of runtime nanoid-based IDs. The hex charset (0-9a-f) is narrower
-- than the full nanoid alphabet but functionally equivalent since IDs are
-- opaque. random() provides uniqueness without requiring pgcrypto.
INSERT INTO sql_snapshots (id, sql_hash, sql_text, created_at)
SELECT
'snap' || substr(md5(sql_hash || sql_text || random()::text), 1, 26),
sql_hash,
sql_text,
MIN(created_at)
FROM query_runs
WHERE sql_text IS NOT NULL
GROUP BY sql_hash, sql_text;

-- 5) Add snapshot_id to query_runs and populate
ALTER TABLE query_runs ADD COLUMN snapshot_id TEXT;

UPDATE query_runs qr
SET snapshot_id = s.id
FROM sql_snapshots s
WHERE s.sql_hash = qr.sql_hash AND s.sql_text = qr.sql_text;

-- Make snapshot_id NOT NULL and add FK
ALTER TABLE query_runs ALTER COLUMN snapshot_id SET NOT NULL;
ALTER TABLE query_runs
ADD CONSTRAINT fk_qr_snapshot FOREIGN KEY (snapshot_id) REFERENCES sql_snapshots(id);

-- 6) Drop old columns from query_runs
ALTER TABLE query_runs DROP COLUMN sql_text;
ALTER TABLE query_runs DROP COLUMN sql_hash;
DROP INDEX IF EXISTS idx_query_runs_sql_hash_created;

-- 7) Add new indexes
ALTER TABLE query_runs ADD COLUMN saved_query_id TEXT;
ALTER TABLE query_runs ADD COLUMN saved_query_version INTEGER;
CREATE INDEX idx_query_runs_saved_query_id ON query_runs(saved_query_id);
CREATE INDEX idx_query_runs_snapshot_id ON query_runs(snapshot_id);
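
For context, a minimal sketch of the get-or-create flow this schema supports, assuming the synchronous postgres crate plus sha2, hex, and nanoid for illustration. The project's actual driver, hash algorithm, and ID generator are not part of this diff, so the function below is a hypothetical example rather than the catalog's real implementation.

use postgres::{Client, NoTls};
use sha2::{Digest, Sha256};

// Hypothetical sketch: one row per distinct SQL statement, addressed by
// content. The unique index on (sql_hash, sql_text) makes the insert
// race-safe via ON CONFLICT DO NOTHING.
fn get_or_create_snapshot(client: &mut Client, sql_text: &str) -> Result<String, postgres::Error> {
    // Content address for the SQL text (hash algorithm assumed).
    let sql_hash = hex::encode(Sha256::digest(sql_text.as_bytes()));

    if let Some(row) = client.query_opt(
        "SELECT id FROM sql_snapshots WHERE sql_hash = $1 AND sql_text = $2",
        &[&sql_hash, &sql_text],
    )? {
        return Ok(row.get(0));
    }

    // 'snap' + 26 random chars mirrors the 30-char ID format described in
    // the backfill comment above (exact alphabet assumed).
    let id = format!("snap{}", nanoid::nanoid!(26));
    client.execute(
        "INSERT INTO sql_snapshots (id, sql_hash, sql_text) VALUES ($1, $2, $3) \
         ON CONFLICT (sql_hash, sql_text) DO NOTHING",
        &[&id, &sql_hash, &sql_text],
    )?;

    // Re-read so a concurrent writer that won the race still yields one ID.
    let row = client.query_one(
        "SELECT id FROM sql_snapshots WHERE sql_hash = $1 AND sql_text = $2",
        &[&sql_hash, &sql_text],
    )?;
    Ok(row.get(0))
}

fn main() -> Result<(), postgres::Error> {
    let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
    let id = get_or_create_snapshot(&mut client, "SELECT 1")?;
    println!("snapshot id: {id}");
    Ok(())
}
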
87 changes: 87 additions & 0 deletions migrations/sqlite/v6.sql
@@ -0,0 +1,87 @@
-- Saved queries with versioned SQL snapshots.
-- SQL text is stored in a content-addressable sql_snapshots table,
-- referenced by ID from saved_query_versions and query_runs.

-- 1) Create sql_snapshots table (content-addressable SQL store)
CREATE TABLE sql_snapshots (
id TEXT PRIMARY KEY,
sql_hash TEXT NOT NULL,
sql_text TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX ux_sql_snapshots_hash_text ON sql_snapshots (sql_hash, sql_text);
CREATE INDEX idx_sql_snapshots_hash ON sql_snapshots (sql_hash);

-- 2) Create saved_queries table
-- Note: name is intentionally NOT UNIQUE — duplicate names are allowed.
CREATE TABLE saved_queries (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
latest_version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 3) Create saved_query_versions table
CREATE TABLE saved_query_versions (
saved_query_id TEXT NOT NULL REFERENCES saved_queries(id),
version INTEGER NOT NULL,
snapshot_id TEXT NOT NULL REFERENCES sql_snapshots(id),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (saved_query_id, version)
);

-- 4) Backfill sql_snapshots from existing query_runs.
-- IDs use the 'snap' prefix + 26 hex chars from randomblob, matching the
-- 30-char length of runtime nanoid-based IDs. The hex charset (0-9a-f) is
-- narrower than the full nanoid alphabet but functionally equivalent since
-- IDs are opaque.
INSERT INTO sql_snapshots (id, sql_hash, sql_text, created_at)
SELECT
'snap' || lower(hex(randomblob(13))),
sql_hash,
sql_text,
MIN(created_at)
FROM query_runs
WHERE sql_text IS NOT NULL
GROUP BY sql_hash, sql_text;

-- 5) Recreate query_runs with snapshot_id, without sql_text/sql_hash
CREATE TABLE query_runs_new (
id TEXT PRIMARY KEY,
snapshot_id TEXT NOT NULL REFERENCES sql_snapshots(id),
trace_id TEXT,
status TEXT NOT NULL,
result_id TEXT,
error_message TEXT,
warning_message TEXT,
row_count BIGINT,
execution_time_ms BIGINT,
saved_query_id TEXT,
saved_query_version INTEGER,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
);

INSERT INTO query_runs_new (
id, snapshot_id, trace_id, status, result_id,
error_message, warning_message, row_count, execution_time_ms,
saved_query_id, saved_query_version, created_at, completed_at
)
SELECT
qr.id,
s.id,
qr.trace_id, qr.status, qr.result_id,
qr.error_message, qr.warning_message, qr.row_count, qr.execution_time_ms,
NULL, NULL, qr.created_at, qr.completed_at
FROM query_runs qr
JOIN sql_snapshots s ON s.sql_hash = qr.sql_hash AND s.sql_text = qr.sql_text;

DROP TABLE query_runs;
ALTER TABLE query_runs_new RENAME TO query_runs;

-- Recreate indexes on query_runs
CREATE INDEX idx_query_runs_created_id ON query_runs(created_at DESC, id DESC);
CREATE INDEX idx_query_runs_trace_id ON query_runs(trace_id);
CREATE INDEX idx_query_runs_saved_query_id ON query_runs(saved_query_id);
CREATE INDEX idx_query_runs_snapshot_id ON query_runs(snapshot_id);
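
For context, a rough sketch of the BEGIN IMMEDIATE write pattern referenced by commit 3bda42e, using the rusqlite crate for illustration. The project's actual SQLite driver, error handling, and version-bump logic are not visible in this diff, so the function below is an assumption-laden example, not the real code.

use rusqlite::{params, Connection, TransactionBehavior};

// IMMEDIATE takes the write lock at BEGIN, so contention surfaces as an
// immediate busy error instead of a non-retryable lock-upgrade failure
// midway through a read-then-write transaction.
fn bump_saved_query_version(
    conn: &mut Connection,
    saved_query_id: &str,
    snapshot_id: &str,
) -> rusqlite::Result<i64> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Read and increment inside the same write transaction so two callers
    // cannot both observe the same latest_version.
    let next: i64 = tx.query_row(
        "SELECT latest_version + 1 FROM saved_queries WHERE id = ?1",
        params![saved_query_id],
        |row| row.get(0),
    )?;
    tx.execute(
        "INSERT INTO saved_query_versions (saved_query_id, version, snapshot_id) \
         VALUES (?1, ?2, ?3)",
        params![saved_query_id, next, snapshot_id],
    )?;
    tx.execute(
        "UPDATE saved_queries \
         SET latest_version = ?2, updated_at = CURRENT_TIMESTAMP \
         WHERE id = ?1",
        params![saved_query_id, next],
    )?;
    tx.commit()?;
    Ok(next)
}
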
62 changes: 60 additions & 2 deletions src/catalog/caching_manager.rs
@@ -6,8 +6,8 @@

use super::{
CatalogManager, ConnectionInfo, CreateQueryRun, DatasetInfo, OptimisticLock, PendingDeletion,
QueryResult, QueryRun, QueryRunCursor, QueryRunUpdate, ResultStatus, ResultUpdate, TableInfo,
UploadInfo,
QueryResult, QueryRun, QueryRunCursor, QueryRunUpdate, ResultStatus, ResultUpdate, SavedQuery,
SavedQueryVersion, SqlSnapshot, TableInfo, UploadInfo,
};
use crate::config::CacheConfig;
use crate::secrets::{SecretMetadata, SecretStatus};
@@ -1124,6 +1124,64 @@ impl CatalogManager for CachingCatalogManager {
self.inner().release_upload(id).await
}

// SQL snapshot methods (pass-through, no caching)

async fn get_or_create_snapshot(&self, sql_text: &str) -> Result<SqlSnapshot> {
self.inner().get_or_create_snapshot(sql_text).await
}

// Saved query methods (pass-through, no caching)

async fn create_saved_query(&self, name: &str, snapshot_id: &str) -> Result<SavedQuery> {
self.inner().create_saved_query(name, snapshot_id).await
}

async fn get_saved_query(&self, id: &str) -> Result<Option<SavedQuery>> {
self.inner().get_saved_query(id).await
}

async fn list_saved_queries(
&self,
limit: usize,
offset: usize,
) -> Result<(Vec<SavedQuery>, bool)> {
self.inner().list_saved_queries(limit, offset).await
}

async fn update_saved_query(
&self,
id: &str,
name: Option<&str>,
snapshot_id: &str,
) -> Result<Option<SavedQuery>> {
self.inner().update_saved_query(id, name, snapshot_id).await
}

async fn delete_saved_query(&self, id: &str) -> Result<bool> {
self.inner().delete_saved_query(id).await
}

async fn get_saved_query_version(
&self,
saved_query_id: &str,
version: i32,
) -> Result<Option<SavedQueryVersion>> {
self.inner()
.get_saved_query_version(saved_query_id, version)
.await
}

async fn list_saved_query_versions(
&self,
saved_query_id: &str,
limit: usize,
offset: usize,
) -> Result<(Vec<SavedQueryVersion>, bool)> {
self.inner()
.list_saved_query_versions(saved_query_id, limit, offset)
.await
}

// Dataset management methods (with cache invalidation for list_dataset_table_names)

#[tracing::instrument(name = "catalog.create_dataset", skip(self, dataset), fields(dataset_id = %dataset.id))]
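
For context, a hypothetical caller of the new CatalogManager methods, sketching how the pass-throughs compose: identical SQL deduplicates to one snapshot, creating a saved query records version 1, and an update with a different snapshot appends version 2 (a matching snapshot is skipped as a no-op, per the commit history). It assumes async-trait-style dyn dispatch, an anyhow-compatible error type, and id fields on the returned structs; none of that is confirmed by this diff.

use crate::catalog::CatalogManager;

// Hypothetical usage sketch; not part of this PR.
async fn save_and_revise(catalog: &dyn CatalogManager) -> anyhow::Result<()> {
    // Identical SQL text resolves to the same snapshot row.
    let snapshot = catalog.get_or_create_snapshot("SELECT 1").await?;

    // Creation records version 1 against that snapshot.
    let saved = catalog.create_saved_query("smoke test", &snapshot.id).await?;

    // A new snapshot on update appends the next version.
    let revised = catalog.get_or_create_snapshot("SELECT 2").await?;
    catalog
        .update_saved_query(&saved.id, Some("smoke test v2"), &revised.id)
        .await?;

    // Expect the initial version plus the revision.
    let (versions, _has_more) = catalog.list_saved_query_versions(&saved.id, 10, 0).await?;
    assert_eq!(versions.len(), 2);
    Ok(())
}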