Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/catalog/caching_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,10 @@ impl CatalogManager for CachingCatalogManager {
self.inner().cleanup_stale_results(cutoff).await
}

async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
    // Pure pass-through: forward the request to the wrapped manager and
    // report however many runs it marked as failed.
    let delegate = self.inner();
    delegate.cleanup_stale_query_runs(cutoff).await
}

// Upload management methods

async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
Expand Down
5 changes: 5 additions & 0 deletions src/catalog/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,11 @@ pub trait CatalogManager: Debug + Send + Sync {
/// Get a single query run by ID.
async fn get_query_run(&self, id: &str) -> Result<Option<QueryRun>>;

/// Mark orphaned running query runs as failed.
///
/// A run counts as stale when its status is still "running" but it was
/// created before `cutoff` — e.g. because the server was interrupted
/// before the query completed. Stale runs have their status set to
/// "failed" with an explanatory error message and a completion timestamp.
///
/// Returns the number of query runs that were marked failed.
async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize>;

// Query result persistence methods

/// Create a new query result with the given initial status.
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ impl CatalogManager for MockCatalog {
Ok(None)
}

async fn cleanup_stale_query_runs(&self, _cutoff: DateTime<Utc>) -> Result<usize> {
    // Mock implementation: performs no cleanup and reports zero runs affected.
    let cleaned: usize = 0;
    Ok(cleaned)
}

async fn count_connections_by_secret_id(&self, _secret_id: &str) -> Result<i64> {
Ok(0)
}
Expand Down
20 changes: 20 additions & 0 deletions src/catalog/postgres_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,26 @@ impl CatalogManager for PostgresCatalogManager {
Ok(row.map(QueryRun::from))
}

#[tracing::instrument(
    name = "catalog_cleanup_stale_query_runs",
    skip(self),
    fields(db = "postgres")
)]
async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
    // Flip every run still marked 'running' that was created before the
    // cutoff over to 'failed', stamping completed_at with the current time.
    let completed_at = Utc::now();
    let outcome = sqlx::query(
        "UPDATE query_runs SET status = 'failed', \
         error_message = 'Server interrupted before query completed', \
         completed_at = $1 \
         WHERE status = 'running' AND created_at < $2",
    )
    .bind(completed_at)
    .bind(cutoff)
    .execute(self.backend.pool())
    .await?;
    // rows_affected is the number of runs transitioned to 'failed'.
    Ok(outcome.rows_affected() as usize)
}

#[tracing::instrument(
name = "catalog_count_connections_by_secret_id",
skip(self),
Expand Down
21 changes: 21 additions & 0 deletions src/catalog/sqlite_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,27 @@ impl CatalogManager for SqliteCatalogManager {
Ok(row.map(QueryRunRow::into_query_run))
}

#[tracing::instrument(
    name = "catalog_cleanup_stale_query_runs",
    skip(self),
    fields(db = "sqlite")
)]
async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
    // Serialize both instants to RFC 3339 strings before binding, then mark
    // every pre-cutoff 'running' run as 'failed'.
    let completed_at = Utc::now().to_rfc3339();
    let cutoff_text = cutoff.to_rfc3339();
    let outcome = sqlx::query(
        "UPDATE query_runs SET status = 'failed', \
         error_message = 'Server interrupted before query completed', \
         completed_at = ? \
         WHERE status = 'running' AND created_at < ?",
    )
    .bind(&completed_at)
    .bind(&cutoff_text)
    .execute(self.backend.pool())
    .await?;
    // rows_affected is the number of runs transitioned to 'failed'.
    Ok(outcome.rows_affected() as usize)
}

#[tracing::instrument(
name = "catalog_count_connections_by_secret_id",
skip(self),
Expand Down
28 changes: 28 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2733,6 +2733,19 @@ impl RuntimeEngine {
warn!(error = %e, "Failed to cleanup stale results");
}
}
match catalog.cleanup_stale_query_runs(cutoff).await {
Ok(0) => {}
Ok(count) => {
info!(
count = count,
cutoff = %cutoff,
"Cleaned up stale query runs"
);
}
Err(e) => {
warn!(error = %e, "Failed to cleanup stale query runs");
}
}
}
}
}
Expand Down Expand Up @@ -3111,6 +3124,21 @@ impl RuntimeEngineBuilder {
warn!("Failed to process pending deletions on startup: {}", e);
}

// Nothing can be in-flight on startup; clean up all orphaned running query runs
let cutoff = Utc::now() + chrono::Duration::seconds(1);
match engine.catalog.cleanup_stale_query_runs(cutoff).await {
Ok(0) => {}
Ok(count) => {
info!(
count = count,
"Cleaned up orphaned query runs from previous crash"
);
}
Err(e) => {
warn!(error = %e, "Failed to cleanup orphaned query runs on startup");
}
}

Ok(engine)
}
}
Expand Down
64 changes: 64 additions & 0 deletions tests/catalog_manager_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,70 @@ macro_rules! catalog_manager_tests {
assert_eq!(run.snapshot_id, snap.id);
}

#[tokio::test]
async fn cleanup_stale_query_runs() {
    let ctx = super::$setup_fn().await;
    let catalog = ctx.manager();

    let snap = catalog.get_or_create_snapshot("SELECT 1").await.unwrap();

    // One run is left in "running" state — this is what cleanup should catch.
    let stale_id = runtimedb::id::generate_query_run_id();
    catalog
        .create_query_run(CreateQueryRun {
            id: &stale_id,
            snapshot_id: &snap.id,
            saved_query_id: None,
            saved_query_version: None,
            trace_id: None,
        })
        .await
        .unwrap();

    // A second run is driven to "succeeded" and must survive cleanup untouched.
    let finished_id = runtimedb::id::generate_query_run_id();
    catalog
        .create_query_run(CreateQueryRun {
            id: &finished_id,
            snapshot_id: &snap.id,
            saved_query_id: None,
            saved_query_version: None,
            trace_id: None,
        })
        .await
        .unwrap();
    catalog
        .update_query_run(
            &finished_id,
            QueryRunUpdate::Succeeded {
                result_id: None,
                row_count: 1,
                execution_time_ms: 10,
                warning_message: None,
            },
        )
        .await
        .unwrap();

    // A cutoff 60s in the future guarantees the running run counts as stale.
    let cutoff = chrono::Utc::now() + chrono::Duration::seconds(60);
    let cleaned = catalog.cleanup_stale_query_runs(cutoff).await.unwrap();
    assert_eq!(cleaned, 1);

    // The stale run is now failed, with the standard interruption message
    // and a completion timestamp.
    let run = catalog.get_query_run(&stale_id).await.unwrap().unwrap();
    assert_eq!(run.status, QueryRunStatus::Failed);
    assert_eq!(
        run.error_message.as_deref(),
        Some("Server interrupted before query completed")
    );
    assert!(run.completed_at.is_some());

    // The completed run keeps its succeeded status.
    let run = catalog.get_query_run(&finished_id).await.unwrap().unwrap();
    assert_eq!(run.status, QueryRunStatus::Succeeded);
}

#[tokio::test]
async fn saved_query_version_list_pagination() {
let ctx = super::$setup_fn().await;
Expand Down
84 changes: 82 additions & 2 deletions tests/result_persistence_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use datafusion::prelude::SessionContext;
use rand::RngCore;
use runtimedb::catalog::{
CatalogManager, ConnectionInfo, CreateQueryRun, DatasetInfo, OptimisticLock, PendingDeletion,
QueryResult, QueryRun, QueryRunCursor, QueryRunUpdate, ResultStatus, ResultUpdate, SavedQuery,
SavedQueryVersion, SqlSnapshot, SqliteCatalogManager, TableInfo, UploadInfo,
QueryResult, QueryRun, QueryRunCursor, QueryRunStatus, QueryRunUpdate, ResultStatus,
ResultUpdate, SavedQuery, SavedQueryVersion, SqlSnapshot, SqliteCatalogManager, TableInfo,
UploadInfo,
};
use runtimedb::http::app_server::{AppServer, PATH_QUERY, PATH_RESULT, PATH_RESULTS};
use runtimedb::secrets::{SecretMetadata, SecretStatus};
Expand Down Expand Up @@ -259,6 +260,10 @@ impl CatalogManager for FailingCatalog {
self.inner.cleanup_stale_results(cutoff).await
}

async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
    // Forward unchanged to the inner catalog.
    let target = &self.inner;
    target.cleanup_stale_query_runs(cutoff).await
}

// Upload management methods

async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
Expand Down Expand Up @@ -1973,3 +1978,78 @@ async fn test_get_result_missing_parquet_returns_failed_status() -> Result<()> {

Ok(())
}

/// Orphaned "running" query runs from a previous crash are marked failed on startup.
///
/// Boots a `RuntimeEngine` against a catalog that already contains a run stuck
/// in "running" status, then verifies the builder's startup cleanup marked it
/// failed while leaving a completed run untouched.
#[tokio::test]
async fn startup_cleans_up_orphaned_query_runs() -> Result<()> {
    // Fresh on-disk SQLite catalog in a temp dir so the engine sees the
    // pre-seeded rows when it boots.
    let temp_dir = tempfile::tempdir()?;
    let catalog_path = temp_dir.path().join("catalog.db");
    let catalog = SqliteCatalogManager::new(catalog_path.to_str().unwrap()).await?;
    catalog.run_migrations().await?;

    let snap = catalog.get_or_create_snapshot("SELECT 1").await?;

    // Create a running query run (simulates orphaned run from crash)
    let orphaned_id = runtimedb::id::generate_query_run_id();
    catalog
        .create_query_run(CreateQueryRun {
            id: &orphaned_id,
            snapshot_id: &snap.id,
            saved_query_id: None,
            saved_query_version: None,
            trace_id: None,
        })
        .await?;

    // Create a succeeded run (should not be affected)
    let succeeded_id = runtimedb::id::generate_query_run_id();
    catalog
        .create_query_run(CreateQueryRun {
            id: &succeeded_id,
            snapshot_id: &snap.id,
            saved_query_id: None,
            saved_query_version: None,
            trace_id: None,
        })
        .await?;
    catalog
        .update_query_run(
            &succeeded_id,
            QueryRunUpdate::Succeeded {
                result_id: None,
                row_count: 1,
                execution_time_ms: 10,
                warning_message: None,
            },
        )
        .await?;

    // Sanity check: the orphaned run really is "running" before the engine boots.
    assert_eq!(
        catalog.get_query_run(&orphaned_id).await?.unwrap().status,
        QueryRunStatus::Running
    );

    // Boot engine — startup cleanup should catch the orphaned run regardless of timeout
    let catalog = Arc::new(catalog);
    let engine = RuntimeEngine::builder()
        .base_dir(temp_dir.path())
        .catalog(catalog.clone())
        .secret_key(generate_test_secret_key())
        .build()
        .await?;

    // The orphaned run must now be failed, carry the standard interruption
    // message, and have a completion timestamp.
    let run = catalog.get_query_run(&orphaned_id).await?.unwrap();
    assert_eq!(run.status, QueryRunStatus::Failed);
    assert_eq!(
        run.error_message.as_deref(),
        Some("Server interrupted before query completed")
    );
    assert!(run.completed_at.is_some());

    // The completed run is untouched by startup cleanup.
    let run = catalog.get_query_run(&succeeded_id).await?.unwrap();
    assert_eq!(run.status, QueryRunStatus::Succeeded);

    engine.shutdown().await?;

    Ok(())
}
Loading