diff --git a/src/catalog/caching_manager.rs b/src/catalog/caching_manager.rs
index d832994..54985e3 100644
--- a/src/catalog/caching_manager.rs
+++ b/src/catalog/caching_manager.rs
@@ -1098,6 +1098,10 @@ impl CatalogManager for CachingCatalogManager {
         self.inner().cleanup_stale_results(cutoff).await
     }
 
+    async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
+        self.inner().cleanup_stale_query_runs(cutoff).await
+    }
+
     // Upload management methods
 
     async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
diff --git a/src/catalog/manager.rs b/src/catalog/manager.rs
index 3ef88ed..a528286 100644
--- a/src/catalog/manager.rs
+++ b/src/catalog/manager.rs
@@ -729,6 +729,11 @@ pub trait CatalogManager: Debug + Send + Sync {
     /// Get a single query run by ID.
     async fn get_query_run(&self, id: &str) -> Result<Option<QueryRun>>;
 
+    /// Mark orphaned running query runs as failed.
+    /// Runs that have been in "running" status since before the cutoff are marked failed.
+    /// Returns the number of query runs cleaned up.
+    async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize>;
+
     // Query result persistence methods
 
     /// Create a new query result with the given initial status.
diff --git a/src/catalog/mock_catalog.rs b/src/catalog/mock_catalog.rs
index 0d4f501..36dd409 100644
--- a/src/catalog/mock_catalog.rs
+++ b/src/catalog/mock_catalog.rs
@@ -313,6 +313,10 @@ impl CatalogManager for MockCatalog {
         Ok(None)
     }
 
+    async fn cleanup_stale_query_runs(&self, _cutoff: DateTime<Utc>) -> Result<usize> {
+        Ok(0)
+    }
+
     async fn count_connections_by_secret_id(&self, _secret_id: &str) -> Result<usize> {
         Ok(0)
     }
diff --git a/src/catalog/postgres_manager.rs b/src/catalog/postgres_manager.rs
index ca2e65d..af57026 100644
--- a/src/catalog/postgres_manager.rs
+++ b/src/catalog/postgres_manager.rs
@@ -763,6 +763,26 @@ impl CatalogManager for PostgresCatalogManager {
         Ok(row.map(QueryRun::from))
     }
 
+    #[tracing::instrument(
+        name = "catalog_cleanup_stale_query_runs",
+        skip(self),
+        fields(db = "postgres")
+    )]
+    async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
+        let now = Utc::now();
+        let result = sqlx::query(
+            "UPDATE query_runs SET status = 'failed', \
+             error_message = 'Server interrupted before query completed', \
+             completed_at = $1 \
+             WHERE status = 'running' AND created_at < $2",
+        )
+        .bind(now)
+        .bind(cutoff)
+        .execute(self.backend.pool())
+        .await?;
+        Ok(result.rows_affected() as usize)
+    }
+
     #[tracing::instrument(
         name = "catalog_count_connections_by_secret_id",
         skip(self),
diff --git a/src/catalog/sqlite_manager.rs b/src/catalog/sqlite_manager.rs
index 8d20685..d751cbc 100644
--- a/src/catalog/sqlite_manager.rs
+++ b/src/catalog/sqlite_manager.rs
@@ -805,6 +805,27 @@ impl CatalogManager for SqliteCatalogManager {
         Ok(row.map(QueryRunRow::into_query_run))
     }
 
+    #[tracing::instrument(
+        name = "catalog_cleanup_stale_query_runs",
+        skip(self),
+        fields(db = "sqlite")
+    )]
+    async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
+        let now = Utc::now().to_rfc3339();
+        let cutoff_str = cutoff.to_rfc3339();
+        let result = sqlx::query(
+            "UPDATE query_runs SET status = 'failed', \
+             error_message = 'Server interrupted before query completed', \
+             completed_at = ? \
+             WHERE status = 'running' AND created_at < ?",
+        )
+        .bind(&now)
+        .bind(&cutoff_str)
+        .execute(self.backend.pool())
+        .await?;
+        Ok(result.rows_affected() as usize)
+    }
+
     #[tracing::instrument(
         name = "catalog_count_connections_by_secret_id",
         skip(self),
diff --git a/src/engine.rs b/src/engine.rs
index a89e626..b45ae75 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -2733,6 +2733,19 @@ impl RuntimeEngine {
                     warn!(error = %e, "Failed to cleanup stale results");
                 }
             }
+            match catalog.cleanup_stale_query_runs(cutoff).await {
+                Ok(0) => {}
+                Ok(count) => {
+                    info!(
+                        count = count,
+                        cutoff = %cutoff,
+                        "Cleaned up stale query runs"
+                    );
+                }
+                Err(e) => {
+                    warn!(error = %e, "Failed to cleanup stale query runs");
+                }
+            }
         }
     }
 }
@@ -3111,6 +3124,21 @@ impl RuntimeEngineBuilder {
             warn!("Failed to process pending deletions on startup: {}", e);
         }
 
+        // Nothing can be in-flight on startup; clean up all orphaned running query runs
+        let cutoff = Utc::now() + chrono::Duration::seconds(1);
+        match engine.catalog.cleanup_stale_query_runs(cutoff).await {
+            Ok(0) => {}
+            Ok(count) => {
+                info!(
+                    count = count,
+                    "Cleaned up orphaned query runs from previous crash"
+                );
+            }
+            Err(e) => {
+                warn!(error = %e, "Failed to cleanup orphaned query runs on startup");
+            }
+        }
+
         Ok(engine)
     }
 }
diff --git a/tests/catalog_manager_suite.rs b/tests/catalog_manager_suite.rs
index 9b8df93..cfb1094 100644
--- a/tests/catalog_manager_suite.rs
+++ b/tests/catalog_manager_suite.rs
@@ -1299,6 +1299,70 @@ macro_rules! catalog_manager_tests {
                 assert_eq!(run.snapshot_id, snap.id);
             }
 
+            #[tokio::test]
+            async fn cleanup_stale_query_runs() {
+                let ctx = super::$setup_fn().await;
+                let catalog = ctx.manager();
+
+                let snap = catalog.get_or_create_snapshot("SELECT 1").await.unwrap();
+
+                // Create a running query run
+                let running_id = runtimedb::id::generate_query_run_id();
+                catalog
+                    .create_query_run(CreateQueryRun {
+                        id: &running_id,
+                        snapshot_id: &snap.id,
+                        saved_query_id: None,
+                        saved_query_version: None,
+                        trace_id: None,
+                    })
+                    .await
+                    .unwrap();
+
+                // Create a succeeded query run
+                let succeeded_id = runtimedb::id::generate_query_run_id();
+                catalog
+                    .create_query_run(CreateQueryRun {
+                        id: &succeeded_id,
+                        snapshot_id: &snap.id,
+                        saved_query_id: None,
+                        saved_query_version: None,
+                        trace_id: None,
+                    })
+                    .await
+                    .unwrap();
+                catalog
+                    .update_query_run(
+                        &succeeded_id,
+                        QueryRunUpdate::Succeeded {
+                            result_id: None,
+                            row_count: 1,
+                            execution_time_ms: 10,
+                            warning_message: None,
+                        },
+                    )
+                    .await
+                    .unwrap();
+
+                // Cleanup with a cutoff in the future (should catch the running run)
+                let cutoff = chrono::Utc::now() + chrono::Duration::seconds(60);
+                let cleaned = catalog.cleanup_stale_query_runs(cutoff).await.unwrap();
+                assert_eq!(cleaned, 1);
+
+                // Verify the running run is now failed
+                let run = catalog.get_query_run(&running_id).await.unwrap().unwrap();
+                assert_eq!(run.status, QueryRunStatus::Failed);
+                assert_eq!(
+                    run.error_message.as_deref(),
+                    Some("Server interrupted before query completed")
+                );
+                assert!(run.completed_at.is_some());
+
+                // Verify the succeeded run is unchanged
+                let run = catalog.get_query_run(&succeeded_id).await.unwrap().unwrap();
+                assert_eq!(run.status, QueryRunStatus::Succeeded);
+            }
+
             #[tokio::test]
             async fn saved_query_version_list_pagination() {
                 let ctx = super::$setup_fn().await;
diff --git a/tests/result_persistence_tests.rs b/tests/result_persistence_tests.rs
index 154d14e..a80c929 100644
--- a/tests/result_persistence_tests.rs
+++ b/tests/result_persistence_tests.rs
@@ -10,8 +10,9 @@ use datafusion::prelude::SessionContext;
 use rand::RngCore;
 use runtimedb::catalog::{
     CatalogManager, ConnectionInfo, CreateQueryRun, DatasetInfo, OptimisticLock, PendingDeletion,
-    QueryResult, QueryRun, QueryRunCursor, QueryRunUpdate, ResultStatus, ResultUpdate, SavedQuery,
-    SavedQueryVersion, SqlSnapshot, SqliteCatalogManager, TableInfo, UploadInfo,
+    QueryResult, QueryRun, QueryRunCursor, QueryRunStatus, QueryRunUpdate, ResultStatus,
+    ResultUpdate, SavedQuery, SavedQueryVersion, SqlSnapshot, SqliteCatalogManager, TableInfo,
+    UploadInfo,
 };
 use runtimedb::http::app_server::{AppServer, PATH_QUERY, PATH_RESULT, PATH_RESULTS};
 use runtimedb::secrets::{SecretMetadata, SecretStatus};
@@ -259,6 +260,10 @@ impl CatalogManager for FailingCatalog {
         self.inner.cleanup_stale_results(cutoff).await
     }
 
+    async fn cleanup_stale_query_runs(&self, cutoff: DateTime<Utc>) -> Result<usize> {
+        self.inner.cleanup_stale_query_runs(cutoff).await
+    }
+
     // Upload management methods
 
     async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
@@ -1973,3 +1978,78 @@ async fn test_get_result_missing_parquet_returns_failed_status() -> Result<()> {
 
     Ok(())
 }
+
+/// Orphaned "running" query runs from a previous crash are marked failed on startup.
+#[tokio::test]
+async fn startup_cleans_up_orphaned_query_runs() -> Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let catalog_path = temp_dir.path().join("catalog.db");
+    let catalog = SqliteCatalogManager::new(catalog_path.to_str().unwrap()).await?;
+    catalog.run_migrations().await?;
+
+    let snap = catalog.get_or_create_snapshot("SELECT 1").await?;
+
+    // Create a running query run (simulates orphaned run from crash)
+    let orphaned_id = runtimedb::id::generate_query_run_id();
+    catalog
+        .create_query_run(CreateQueryRun {
+            id: &orphaned_id,
+            snapshot_id: &snap.id,
+            saved_query_id: None,
+            saved_query_version: None,
+            trace_id: None,
+        })
+        .await?;
+
+    // Create a succeeded run (should not be affected)
+    let succeeded_id = runtimedb::id::generate_query_run_id();
+    catalog
+        .create_query_run(CreateQueryRun {
+            id: &succeeded_id,
+            snapshot_id: &snap.id,
+            saved_query_id: None,
+            saved_query_version: None,
+            trace_id: None,
+        })
+        .await?;
+    catalog
+        .update_query_run(
+            &succeeded_id,
+            QueryRunUpdate::Succeeded {
+                result_id: None,
+                row_count: 1,
+                execution_time_ms: 10,
+                warning_message: None,
+            },
+        )
+        .await?;
+
+    assert_eq!(
+        catalog.get_query_run(&orphaned_id).await?.unwrap().status,
+        QueryRunStatus::Running
+    );
+
+    // Boot engine — startup cleanup should catch the orphaned run regardless of timeout
+    let catalog = Arc::new(catalog);
+    let engine = RuntimeEngine::builder()
+        .base_dir(temp_dir.path())
+        .catalog(catalog.clone())
+        .secret_key(generate_test_secret_key())
+        .build()
+        .await?;
+
+    let run = catalog.get_query_run(&orphaned_id).await?.unwrap();
+    assert_eq!(run.status, QueryRunStatus::Failed);
+    assert_eq!(
+        run.error_message.as_deref(),
+        Some("Server interrupted before query completed")
+    );
+    assert!(run.completed_at.is_some());
+
+    let run = catalog.get_query_run(&succeeded_id).await?.unwrap();
+    assert_eq!(run.status, QueryRunStatus::Succeeded);
+
+    engine.shutdown().await?;
+
+    Ok(())
+}