From e8e74b54f80cc565b61d85b5a2f3c96a1d70a4ab Mon Sep 17 00:00:00 2001
From: Shefeek Jinnah
Date: Thu, 19 Feb 2026 17:24:10 +0530
Subject: [PATCH 1/2] feat(engine): add query result retention policy (#123)

---
 src/catalog/caching_manager.rs    |   4 +
 src/catalog/manager.rs            |   4 +
 src/catalog/mock_catalog.rs       |  20 +++
 src/catalog/postgres_manager.rs   |  18 +++
 src/catalog/sqlite_manager.rs     |  31 +++++
 src/config/mod.rs                 |  46 ++++++
 src/engine.rs                     | 224 ++++++++++++++++++++++++++++++
 tests/result_persistence_tests.rs |   7 +
 8 files changed, 354 insertions(+)

diff --git a/src/catalog/caching_manager.rs b/src/catalog/caching_manager.rs
index d832994..7ee2bac 100644
--- a/src/catalog/caching_manager.rs
+++ b/src/catalog/caching_manager.rs
@@ -1098,6 +1098,10 @@ impl CatalogManager for CachingCatalogManager {
         self.inner().cleanup_stale_results(cutoff).await
     }
 
+    async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
+        self.inner().delete_expired_results(cutoff).await
+    }
+
     // Upload management methods
 
     async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
diff --git a/src/catalog/manager.rs b/src/catalog/manager.rs
index 3ef88ed..26489ff 100644
--- a/src/catalog/manager.rs
+++ b/src/catalog/manager.rs
@@ -759,6 +759,10 @@ pub trait CatalogManager: Debug + Send + Sync {
     /// Returns the number of results cleaned up.
     async fn cleanup_stale_results(&self, cutoff: DateTime<Utc>) -> Result<usize>;
 
+    /// Delete expired terminal results (ready/failed) older than the cutoff.
+    /// Returns the deleted results so the caller can schedule file deletion for any parquet paths.
+    async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>>;
+
     // Upload management methods
 
     /// Create a new upload record.
diff --git a/src/catalog/mock_catalog.rs b/src/catalog/mock_catalog.rs
index 0d4f501..96b933c 100644
--- a/src/catalog/mock_catalog.rs
+++ b/src/catalog/mock_catalog.rs
@@ -293,6 +293,26 @@ impl CatalogManager for MockCatalog {
         Ok(count)
     }
 
+    async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
+        let mut results = self.results.write().unwrap();
+        let expired_ids: Vec<String> = results
+            .values()
+            .filter(|r| {
+                (r.status == ResultStatus::Ready || r.status == ResultStatus::Failed)
+                    && r.created_at < cutoff
+            })
+            .map(|r| r.id.clone())
+            .collect();
+
+        let mut deleted = Vec::new();
+        for id in expired_ids {
+            if let Some(result) = results.remove(&id) {
+                deleted.push(result);
+            }
+        }
+        Ok(deleted)
+    }
+
     async fn create_query_run(&self, params: CreateQueryRun<'_>) -> Result<String> {
         Ok(params.id.to_string())
     }
diff --git a/src/catalog/postgres_manager.rs b/src/catalog/postgres_manager.rs
index ca2e65d..bdf305c 100644
--- a/src/catalog/postgres_manager.rs
+++ b/src/catalog/postgres_manager.rs
@@ -628,6 +628,24 @@ impl CatalogManager for PostgresCatalogManager {
         Ok(result.rows_affected() as usize)
     }
 
+    #[tracing::instrument(
+        name = "catalog_delete_expired_results",
+        skip(self),
+        fields(db = "postgres")
+    )]
+    async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
+        let rows: Vec = sqlx::query_as(
+            "DELETE FROM results
+             WHERE status IN ('ready', 'failed') AND created_at < $1
+             RETURNING id, parquet_path, status, error_message, created_at",
+        )
+        .bind(cutoff)
+        .fetch_all(self.backend.pool())
+        .await?;
+
+        Ok(rows.into_iter().map(QueryResult::from).collect())
+    }
+
     // Query run history methods
 
     #[tracing::instrument(
diff --git a/src/catalog/sqlite_manager.rs b/src/catalog/sqlite_manager.rs
index 8d20685..e45ffca 100644
--- a/src/catalog/sqlite_manager.rs
+++ b/src/catalog/sqlite_manager.rs
@@ -664,6 +664,37 @@ impl CatalogManager for SqliteCatalogManager {
         Ok(result.rows_affected() as usize)
     }
 
+    #[tracing::instrument(
+        name = "catalog_delete_expired_results",
+        skip(self),
+        fields(db = "sqlite")
+    )]
+    async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
+        // SQLite doesn't reliably support RETURNING, so use SELECT then DELETE
+        let rows: Vec = sqlx::query_as(
+            "SELECT id, parquet_path, status, error_message, created_at
+             FROM results
+             WHERE status IN ('ready', 'failed') AND created_at < ?",
+        )
+        .bind(cutoff)
+        .fetch_all(self.backend.pool())
+        .await?;
+
+        if rows.is_empty() {
+            return Ok(vec![]);
+        }
+
+        sqlx::query(
+            "DELETE FROM results
+             WHERE status IN ('ready', 'failed') AND created_at < ?",
+        )
+        .bind(cutoff)
+        .execute(self.backend.pool())
+        .await?;
+
+        Ok(rows.into_iter().map(QueryResult::from).collect())
+    }
+
     // Query run history methods
 
     #[tracing::instrument(
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 9218b43..6a3517f 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -110,6 +110,13 @@ pub struct EngineConfig {
     /// Default: 300 (5 minutes).
     #[serde(default = "default_stale_result_timeout_secs")]
     pub stale_result_timeout_secs: u64,
+
+    /// Number of days to retain query results before automatic cleanup.
+    /// Results older than this (in ready/failed status) are deleted along with their parquet files.
+    /// Set to 0 to disable automatic cleanup (keep results indefinitely).
+    /// Default: 7.
+    #[serde(default = "default_result_retention_days")]
+    pub result_retention_days: u64,
 }
 
 impl Default for EngineConfig {
@@ -118,6 +125,7 @@
             max_concurrent_persistence: default_max_concurrent_persistence(),
             stale_result_cleanup_interval_secs: default_stale_result_cleanup_interval_secs(),
             stale_result_timeout_secs: default_stale_result_timeout_secs(),
+            result_retention_days: default_result_retention_days(),
         }
     }
 }
@@ -134,6 +142,10 @@ fn default_stale_result_timeout_secs() -> u64 {
     300
 }
 
+fn default_result_retention_days() -> u64 {
+    7
+}
+
 #[derive(Debug, Clone, Deserialize, Serialize, Default)]
 pub struct CacheConfig {
     /// Redis connection URL. None = caching disabled.
@@ -287,4 +299,38 @@ type = "filesystem"
         std::env::remove_var("RUNTIMEDB_STORAGE__AUTHORIZATION_HEADER");
         std::env::remove_var("RUNTIMEDB_SERVER__PORT");
     }
+
+    #[test]
+    fn test_engine_config_result_retention_days_default() {
+        let config = EngineConfig::default();
+        assert_eq!(config.result_retention_days, 7);
+    }
+
+    #[test]
+    fn test_engine_config_result_retention_days_from_env() {
+        let _guard = ENV_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap();
+        let temp_file = tempfile::Builder::new().suffix(".toml").tempfile().unwrap();
+        std::fs::write(
+            temp_file.path(),
+            r#"
+[server]
+host = "127.0.0.1"
+port = 3000
+
+[catalog]
+type = "sqlite"
+
+[storage]
+type = "filesystem"
+"#,
+        )
+        .unwrap();
+
+        std::env::set_var("RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS", "30");
+
+        let config = AppConfig::load(temp_file.path().to_str().unwrap()).unwrap();
+        assert_eq!(config.engine.result_retention_days, 30);
+
+        std::env::remove_var("RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS");
+    }
 }
diff --git a/src/engine.rs b/src/engine.rs
index a89e626..6e7c331 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -63,6 +63,12 @@ const DEFAULT_DELETION_WORKER_INTERVAL_SECS: u64 = 30;
 
 /// Default interval (in seconds) between stale result cleanup runs.
 const DEFAULT_STALE_RESULT_CLEANUP_INTERVAL_SECS: u64 = 60;
+/// Default number of days to retain query results before automatic cleanup.
+const DEFAULT_RESULT_RETENTION_DAYS: u64 = 7;
+
+/// Interval (in seconds) between result retention cleanup runs (1 hour).
+const RESULT_RETENTION_CLEANUP_INTERVAL_SECS: u64 = 3600;
+
 /// Default timeout (in seconds) after which pending/processing results are considered stale.
 const DEFAULT_STALE_RESULT_TIMEOUT_SECS: u64 = 300;
 
@@ -173,6 +179,8 @@ pub struct RuntimeEngine {
     persistence_tasks: Mutex<JoinSet<()>>,
     /// Handle for the stale result cleanup worker task.
     stale_result_cleanup_handle: Mutex<Option<JoinHandle<()>>>,
+    /// Handle for the result retention cleanup worker task.
+    result_retention_handle: Mutex<Option<JoinHandle<()>>>,
 }
 
 /// Build a reader-compatible schema where geometry (Binary) columns are replaced with Utf8.
@@ -348,6 +356,7 @@ impl RuntimeEngine {
         ));
         builder = builder
             .stale_result_timeout(Duration::from_secs(config.engine.stale_result_timeout_secs));
+        builder = builder.result_retention_days(config.engine.result_retention_days);
 
         builder.build().await
     }
@@ -1400,6 +1409,11 @@ impl RuntimeEngine {
             let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
         }
 
+        // Wait for the result retention worker to finish
+        if let Some(handle) = self.result_retention_handle.lock().await.take() {
+            let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
+        }
+
         // Wait for in-flight persistence tasks to complete
         let mut tasks = self.persistence_tasks.lock().await;
         let task_count = tasks.len();
@@ -2738,6 +2752,78 @@ impl RuntimeEngine {
             }
         }))
     }
+
+    /// Start background task that deletes expired query results (ready/failed) and schedules
+    /// their parquet files for deletion. Returns None if retention_days is 0 (disabled).
+    fn start_result_retention_worker(
+        catalog: Arc<dyn CatalogManager>,
+        shutdown_token: CancellationToken,
+        retention_days: u64,
+    ) -> Option<JoinHandle<()>> {
+        if retention_days == 0 {
+            info!("Result retention worker disabled (retention_days is 0)");
+            return None;
+        }
+
+        info!(
+            retention_days = retention_days,
+            interval_secs = RESULT_RETENTION_CLEANUP_INTERVAL_SECS,
+            "Starting result retention worker"
+        );
+
+        Some(tokio::spawn(async move {
+            let mut interval = tokio::time::interval(Duration::from_secs(
+                RESULT_RETENTION_CLEANUP_INTERVAL_SECS,
+            ));
+            loop {
+                tokio::select! {
+                    _ = shutdown_token.cancelled() => {
+                        info!("Result retention worker received shutdown signal");
+                        break;
+                    }
+                    _ = interval.tick() => {
+                        let cutoff = Utc::now() - chrono::Duration::days(retention_days as i64);
+                        match catalog.delete_expired_results(cutoff).await {
+                            Ok(deleted) if deleted.is_empty() => {
+                                // Nothing expired — normal case, don't log
+                            }
+                            Ok(deleted) => {
+                                let count = deleted.len();
+                                let now = Utc::now();
+                                for result in &deleted {
+                                    if let Some(ref parquet_path) = result.parquet_path {
+                                        // Strip "/data.parquet" suffix to get directory path
+                                        let dir_path = parquet_path
+                                            .strip_suffix("/data.parquet")
+                                            .unwrap_or(parquet_path);
+                                        if let Err(e) = catalog
+                                            .schedule_file_deletion(dir_path, now)
+                                            .await
+                                        {
+                                            warn!(
+                                                result_id = %result.id,
+                                                path = %dir_path,
+                                                error = %e,
+                                                "Failed to schedule file deletion for expired result"
+                                            );
+                                        }
+                                    }
+                                }
+                                info!(
+                                    count = count,
+                                    cutoff = %cutoff,
+                                    "Deleted expired results"
+                                );
+                            }
+                            Err(e) => {
+                                warn!(error = %e, "Failed to delete expired results");
+                            }
+                        }
+                    }
+                }
+            }
+        }))
+    }
 }
 
 impl Drop for RuntimeEngine {
@@ -2791,6 +2877,7 @@ pub struct RuntimeEngineBuilder {
     cache_config: Option<CacheConfig>,
     stale_result_cleanup_interval: Duration,
     stale_result_timeout: Duration,
+    result_retention_days: u64,
 }
 
 impl Default for RuntimeEngineBuilder {
@@ -2820,6 +2907,7 @@ impl RuntimeEngineBuilder {
                 DEFAULT_STALE_RESULT_CLEANUP_INTERVAL_SECS,
             ),
             stale_result_timeout: Duration::from_secs(DEFAULT_STALE_RESULT_TIMEOUT_SECS),
+            result_retention_days: DEFAULT_RESULT_RETENTION_DAYS,
         }
     }
 
@@ -2924,6 +3012,14 @@ impl RuntimeEngineBuilder {
         self
     }
 
+    /// Set the number of days to retain query results before automatic cleanup.
+    /// Results older than this (in ready/failed status) are deleted along with their parquet files.
+    /// Set to 0 to disable automatic cleanup. Defaults to 7.
+    pub fn result_retention_days(mut self, days: u64) -> Self {
+        self.result_retention_days = days;
+        self
+    }
+
     /// Resolve the base directory, using default if not set.
     fn resolve_base_dir(&self) -> PathBuf {
         self.base_dir.clone().unwrap_or_else(|| {
@@ -3075,6 +3171,13 @@
             self.stale_result_timeout,
         );
 
+        // Start background result retention worker
+        let result_retention_handle = RuntimeEngine::start_result_retention_worker(
+            catalog.clone(),
+            shutdown_token.clone(),
+            self.result_retention_days,
+        );
+
         // Initialize catalog (starts warmup loop if configured)
         catalog.init().await?;
 
@@ -3101,6 +3204,7 @@
             persistence_semaphore: Arc::new(Semaphore::new(self.max_concurrent_persistence)),
             persistence_tasks: Mutex::new(JoinSet::new()),
             stale_result_cleanup_handle: Mutex::new(stale_result_cleanup_handle),
+            result_retention_handle: Mutex::new(result_retention_handle),
         };
 
         // Note: All catalogs (connections, datasets, runtimedb) are now resolved on-demand
@@ -3713,4 +3817,124 @@ mod tests {
             .unwrap();
         assert_eq!(name_col.value(0), "Alice");
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_result_retention_deletes_expired_ready_and_failed() {
+        use crate::catalog::ResultUpdate;
+
+        let temp_dir = TempDir::new().unwrap();
+        let base_dir = temp_dir.path().to_path_buf();
+
+        let catalog = Arc::new(
+            SqliteCatalogManager::new(base_dir.join("catalog.db").to_str().unwrap())
+                .await
+                .unwrap(),
+        );
+        catalog.run_migrations().await.unwrap();
+
+        // Create a result in ready state with a fake parquet_path
+        let r1 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        catalog
+            .update_result(&r1, ResultUpdate::Ready { parquet_path: "cache/_runtimedb_internal/runtimedb_results/r1/data.parquet" })
+            .await
+            .unwrap();
+
+        // Create a result in failed state
+        let r2 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        catalog
+            .update_result(&r2, ResultUpdate::Failed { error_message: Some("test error") })
+            .await
+            .unwrap();
+
+        // Create a result still in pending state
+        let r3 = catalog.create_result(ResultStatus::Pending).await.unwrap();
+
+        // Create a result in processing state
+        let r4 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+
+        // Delete expired results with cutoff in the future (everything is expired)
+        let cutoff = Utc::now() + chrono::Duration::seconds(60);
+        let deleted = catalog.delete_expired_results(cutoff).await.unwrap();
+
+        // Should only delete ready and failed results
+        assert_eq!(deleted.len(), 2, "Should delete 2 terminal results");
+        let deleted_ids: Vec<&str> = deleted.iter().map(|r| r.id.as_str()).collect();
+        assert!(deleted_ids.contains(&r1.as_str()));
+        assert!(deleted_ids.contains(&r2.as_str()));
+
+        // Pending and processing should remain
+        assert!(catalog.get_result(&r3).await.unwrap().is_some(), "Pending result should remain");
+        assert!(catalog.get_result(&r4).await.unwrap().is_some(), "Processing result should remain");
+
+        // Deleted results should be gone from DB
+        assert!(catalog.get_result(&r1).await.unwrap().is_none(), "Ready result should be deleted");
+        assert!(catalog.get_result(&r2).await.unwrap().is_none(), "Failed result should be deleted");
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_result_retention_does_not_delete_non_expired() {
+        let temp_dir = TempDir::new().unwrap();
+        let base_dir = temp_dir.path().to_path_buf();
+
+        let catalog = Arc::new(
+            SqliteCatalogManager::new(base_dir.join("catalog.db").to_str().unwrap())
+                .await
+                .unwrap(),
+        );
+        catalog.run_migrations().await.unwrap();
+
+        // Create a result in ready state
+        let r1 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        catalog
+            .update_result(&r1, ResultUpdate::Ready { parquet_path: "some/path/data.parquet" })
+            .await
+            .unwrap();
+
+        // Use cutoff in the past (nothing should be expired since results were just created)
+        let cutoff = Utc::now() - chrono::Duration::days(1);
+        let deleted = catalog.delete_expired_results(cutoff).await.unwrap();
+
+        assert!(deleted.is_empty(), "No results should be deleted when cutoff is in the past");
+        assert!(catalog.get_result(&r1).await.unwrap().is_some(), "Result should still exist");
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_result_retention_worker_disabled_when_days_zero() {
+        let temp_dir = TempDir::new().unwrap();
+        let base_dir = temp_dir.path().to_path_buf();
+
+        let engine = RuntimeEngine::builder()
+            .base_dir(base_dir)
+            .secret_key(test_secret_key())
+            .result_retention_days(0)
+            .build()
+            .await
+            .unwrap();
+
+        assert!(
+            engine.result_retention_handle.lock().await.is_none(),
+            "Retention worker should be None when disabled"
+        );
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_result_retention_worker_enabled_when_days_nonzero() {
+        let temp_dir = TempDir::new().unwrap();
+        let base_dir = temp_dir.path().to_path_buf();
+
+        let engine = RuntimeEngine::builder()
+            .base_dir(base_dir)
+            .secret_key(test_secret_key())
+            .result_retention_days(30)
+            .build()
+            .await
+            .unwrap();
+
+        assert!(
+            engine.result_retention_handle.lock().await.is_some(),
+            "Retention worker should be running when days > 0"
+        );
+
+        engine.shutdown().await.unwrap();
+    }
 }
diff --git a/tests/result_persistence_tests.rs b/tests/result_persistence_tests.rs
index 154d14e..8aee9fc 100644
--- a/tests/result_persistence_tests.rs
+++ b/tests/result_persistence_tests.rs
@@ -259,6 +259,13 @@ impl CatalogManager for FailingCatalog {
         self.inner.cleanup_stale_results(cutoff).await
     }
 
+    async fn delete_expired_results(
+        &self,
+        cutoff: DateTime<Utc>,
+    ) -> Result<Vec<QueryResult>> {
+        self.inner.delete_expired_results(cutoff).await
+    }
+
     // Upload management methods
 
     async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {

From ae99390f050663135c699afccb644648d401a8eb Mon Sep 17 00:00:00 2001
From: Shefeek Jinnah
Date: Thu, 19 Feb 2026 19:30:48 +0530
Subject: [PATCH 2/2] Fixing clippy and fmt issues

---
 src/catalog/sqlite_manager.rs | 50 +++++++++++++--------
 src/engine.rs                 | 84 +++++++++++++++++++++++++++--------
 2 files changed, 98 insertions(+), 36 deletions(-)

diff --git a/src/catalog/sqlite_manager.rs b/src/catalog/sqlite_manager.rs
index e45ffca..9932dff 100644
--- a/src/catalog/sqlite_manager.rs
+++ b/src/catalog/sqlite_manager.rs
@@ -671,28 +671,42 @@ impl CatalogManager for SqliteCatalogManager {
     )]
     async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
         // SQLite doesn't reliably support RETURNING, so use SELECT then DELETE
-        let rows: Vec = sqlx::query_as(
-            "SELECT id, parquet_path, status, error_message, created_at
-             FROM results
-             WHERE status IN ('ready', 'failed') AND created_at < ?",
-        )
-        .bind(cutoff)
-        .fetch_all(self.backend.pool())
-        .await?;
+        // inside a BEGIN IMMEDIATE transaction to prevent concurrent modifications.
+        let mut conn = self.backend.pool().acquire().await?;
+        sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await?;
 
-        if rows.is_empty() {
-            return Ok(vec![]);
+        let result: Result<Vec<QueryResult>> = async {
+            let rows: Vec = sqlx::query_as(
+                "SELECT id, parquet_path, status, error_message, created_at
+                 FROM results
+                 WHERE status IN ('ready', 'failed') AND created_at < ?",
+            )
+            .bind(cutoff)
+            .fetch_all(&mut *conn)
+            .await?;
+
+            if rows.is_empty() {
+                return Ok(vec![]);
+            }
+
+            sqlx::query(
+                "DELETE FROM results
+                 WHERE status IN ('ready', 'failed') AND created_at < ?",
+            )
+            .bind(cutoff)
+            .execute(&mut *conn)
+            .await?;
+
+            Ok(rows.into_iter().map(QueryResult::from).collect())
         }
+        .await;
 
-        sqlx::query(
-            "DELETE FROM results
-             WHERE status IN ('ready', 'failed') AND created_at < ?",
-        )
-        .bind(cutoff)
-        .execute(self.backend.pool())
-        .await?;
+        match &result {
+            Ok(_) => sqlx::query("COMMIT").execute(&mut *conn).await?,
+            Err(_) => sqlx::query("ROLLBACK").execute(&mut *conn).await?,
+        };
 
-        Ok(rows.into_iter().map(QueryResult::from).collect())
+        result
     }
 
     // Query run history methods
 
     #[tracing::instrument(
diff --git a/src/engine.rs b/src/engine.rs
index 6e7c331..4eb82c9 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -2759,6 +2759,7 @@ impl RuntimeEngine {
         catalog: Arc<dyn CatalogManager>,
         shutdown_token: CancellationToken,
         retention_days: u64,
+        deletion_grace_period: Duration,
     ) -> Option<JoinHandle<()>> {
         if retention_days == 0 {
             info!("Result retention worker disabled (retention_days is 0)");
@@ -2772,9 +2773,8 @@
         );
 
         Some(tokio::spawn(async move {
-            let mut interval = tokio::time::interval(Duration::from_secs(
-                RESULT_RETENTION_CLEANUP_INTERVAL_SECS,
-            ));
+            let mut interval =
+                tokio::time::interval(Duration::from_secs(RESULT_RETENTION_CLEANUP_INTERVAL_SECS));
             loop {
                 tokio::select! {
                     _ = shutdown_token.cancelled() => {
@@ -2789,7 +2789,9 @@
                             }
                             Ok(deleted) => {
                                 let count = deleted.len();
-                                let now = Utc::now();
+                                let grace = chrono::Duration::from_std(deletion_grace_period)
+                                    .unwrap_or(chrono::Duration::seconds(60));
+                                let delete_after = Utc::now() + grace;
                                 for result in &deleted {
                                     if let Some(ref parquet_path) = result.parquet_path {
                                         // Strip "/data.parquet" suffix to get directory path
@@ -2797,7 +2799,7 @@
                                             .strip_suffix("/data.parquet")
                                             .unwrap_or(parquet_path);
                                         if let Err(e) = catalog
-                                            .schedule_file_deletion(dir_path, now)
+                                            .schedule_file_deletion(dir_path, delete_after)
                                             .await
                                         {
                                             warn!(
@@ -3176,6 +3178,7 @@
             catalog.clone(),
             shutdown_token.clone(),
             self.result_retention_days,
+            self.deletion_grace_period,
         );
 
         // Initialize catalog (starts warmup loop if configured)
@@ -3833,16 +3836,32 @@
         catalog.run_migrations().await.unwrap();
 
         // Create a result in ready state with a fake parquet_path
-        let r1 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        let r1 = catalog
+            .create_result(ResultStatus::Processing)
+            .await
+            .unwrap();
         catalog
-            .update_result(&r1, ResultUpdate::Ready { parquet_path: "cache/_runtimedb_internal/runtimedb_results/r1/data.parquet" })
+            .update_result(
+                &r1,
+                ResultUpdate::Ready {
+                    parquet_path: "cache/_runtimedb_internal/runtimedb_results/r1/data.parquet",
+                },
+            )
             .await
             .unwrap();
 
         // Create a result in failed state
-        let r2 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        let r2 = catalog
+            .create_result(ResultStatus::Processing)
+            .await
+            .unwrap();
         catalog
-            .update_result(&r2, ResultUpdate::Failed { error_message: Some("test error") })
+            .update_result(
+                &r2,
+                ResultUpdate::Failed {
+                    error_message: Some("test error"),
+                },
+            )
             .await
             .unwrap();
 
@@ -3850,7 +3869,10 @@
         let r3 = catalog.create_result(ResultStatus::Pending).await.unwrap();
 
         // Create a result in processing state
-        let r4 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        let r4 = catalog
+            .create_result(ResultStatus::Processing)
+            .await
+            .unwrap();
 
         // Delete expired results with cutoff in the future (everything is expired)
         let cutoff = Utc::now() + chrono::Duration::seconds(60);
@@ -3863,12 +3885,24 @@
         assert!(deleted_ids.contains(&r2.as_str()));
 
         // Pending and processing should remain
-        assert!(catalog.get_result(&r3).await.unwrap().is_some(), "Pending result should remain");
-        assert!(catalog.get_result(&r4).await.unwrap().is_some(), "Processing result should remain");
+        assert!(
+            catalog.get_result(&r3).await.unwrap().is_some(),
+            "Pending result should remain"
+        );
+        assert!(
+            catalog.get_result(&r4).await.unwrap().is_some(),
+            "Processing result should remain"
+        );
 
         // Deleted results should be gone from DB
-        assert!(catalog.get_result(&r1).await.unwrap().is_none(), "Ready result should be deleted");
-        assert!(catalog.get_result(&r2).await.unwrap().is_none(), "Failed result should be deleted");
+        assert!(
+            catalog.get_result(&r1).await.unwrap().is_none(),
+            "Ready result should be deleted"
+        );
+        assert!(
+            catalog.get_result(&r2).await.unwrap().is_none(),
+            "Failed result should be deleted"
+        );
     }
 
     #[tokio::test(flavor = "multi_thread")]
@@ -3884,9 +3918,17 @@
         catalog.run_migrations().await.unwrap();
 
        // Create a result in ready state
-        let r1 = catalog.create_result(ResultStatus::Processing).await.unwrap();
+        let r1 = catalog
+            .create_result(ResultStatus::Processing)
+            .await
+            .unwrap();
         catalog
-            .update_result(&r1, ResultUpdate::Ready { parquet_path: "some/path/data.parquet" })
+            .update_result(
+                &r1,
+                ResultUpdate::Ready {
+                    parquet_path: "some/path/data.parquet",
+                },
+            )
             .await
             .unwrap();
 
@@ -3894,8 +3936,14 @@ // Use cutoff in the past (nothing should be expired since results were just created)
         let cutoff = Utc::now() - chrono::Duration::days(1);
         let deleted = catalog.delete_expired_results(cutoff).await.unwrap();
 
-        assert!(deleted.is_empty(), "No results should be deleted when cutoff is in the past");
-        assert!(catalog.get_result(&r1).await.unwrap().is_some(), "Result should still exist");
+        assert!(
+            deleted.is_empty(),
+            "No results should be deleted when cutoff is in the past"
+        );
+        assert!(
+            catalog.get_result(&r1).await.unwrap().is_some(),
+            "Result should still exist"
+        );
     }
 
     #[tokio::test(flavor = "multi_thread")]
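
A minimal usage sketch for the knob these patches add (not taken from the diffs themselves): the [engine] table name is inferred from config.engine.result_retention_days and the RUNTIMEDB_ENGINE__ prefix exercised in the config tests above, so treat the exact config-file layout as an assumption.

    [engine]
    # Drop ready/failed results (and their parquet directories) after 30 days;
    # 0 keeps results indefinitely and disables the retention worker.
    result_retention_days = 30

The same setting can be overridden through the environment, as test_engine_config_result_retention_days_from_env does with RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS=30.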