Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/catalog/caching_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,10 @@ impl CatalogManager for CachingCatalogManager {
self.inner().cleanup_stale_results(cutoff).await
}

/// Forwards expired-result deletion to the wrapped catalog manager unchanged.
async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
    let delegate = self.inner();
    delegate.delete_expired_results(cutoff).await
}

// Upload management methods

async fn create_upload(&self, upload: &UploadInfo) -> Result<()> {
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,10 @@ pub trait CatalogManager: Debug + Send + Sync {
/// Returns the number of results cleaned up.
async fn cleanup_stale_results(&self, cutoff: DateTime<Utc>) -> Result<usize>;

/// Delete expired terminal results: rows whose status is `ready` or `failed` and whose
/// `created_at` is strictly earlier than `cutoff`.
///
/// Returns the deleted results so the caller can schedule file deletion for any parquet paths.
async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>>;

// Upload management methods

/// Create a new upload record.
Expand Down
20 changes: 20 additions & 0 deletions src/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,26 @@ impl CatalogManager for MockCatalog {
Ok(count)
}

/// Removes every in-memory result that is `Ready` or `Failed` and was created
/// strictly before `cutoff`, returning the removed entries.
async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
    let mut store = self.results.write().unwrap();

    // Snapshot the matching ids first: entries cannot be removed while the
    // map is still borrowed by the iterator.
    let doomed: Vec<String> = store
        .values()
        .filter(|entry| {
            let terminal = matches!(entry.status, ResultStatus::Ready | ResultStatus::Failed);
            terminal && entry.created_at < cutoff
        })
        .map(|entry| entry.id.clone())
        .collect();

    let removed: Vec<QueryResult> = doomed
        .into_iter()
        .filter_map(|id| store.remove(&id))
        .collect();
    Ok(removed)
}

/// Mock implementation: returns the requested run id as a `String` without storing anything.
async fn create_query_run(&self, params: CreateQueryRun<'_>) -> Result<String> {
    let run_id = params.id.to_string();
    Ok(run_id)
}
Expand Down
18 changes: 18 additions & 0 deletions src/catalog/postgres_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,24 @@ impl CatalogManager for PostgresCatalogManager {
Ok(result.rows_affected() as usize)
}

/// Deletes `ready`/`failed` rows created before `cutoff` in a single
/// `DELETE ... RETURNING` statement and returns the removed rows as `QueryResult`s.
#[tracing::instrument(
    name = "catalog_delete_expired_results",
    skip(self),
    fields(db = "postgres")
)]
async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
    let pool = self.backend.pool();

    // One statement both deletes and reports the rows, so no explicit
    // transaction is opened here.
    let deleted_rows: Vec<QueryResultRow> = sqlx::query_as(
        "DELETE FROM results
WHERE status IN ('ready', 'failed') AND created_at < $1
RETURNING id, parquet_path, status, error_message, created_at",
    )
    .bind(cutoff)
    .fetch_all(pool)
    .await?;

    let mut deleted = Vec::with_capacity(deleted_rows.len());
    for row in deleted_rows {
        deleted.push(QueryResult::from(row));
    }
    Ok(deleted)
}

// Query run history methods

#[tracing::instrument(
Expand Down
45 changes: 45 additions & 0 deletions src/catalog/sqlite_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,51 @@ impl CatalogManager for SqliteCatalogManager {
Ok(result.rows_affected() as usize)
}

/// Deletes `ready`/`failed` rows created before `cutoff` and returns them.
///
/// The rows are read with a `SELECT` and then removed with a matching `DELETE`,
/// both inside one `BEGIN IMMEDIATE` transaction on a single pooled connection.
///
/// # Errors
///
/// Returns the first error hit while acquiring the connection, starting the
/// transaction, selecting, deleting, or committing. If any step after `BEGIN`
/// fails, a best-effort `ROLLBACK` is issued; a failure of that rollback is
/// logged and never replaces the original error.
#[tracing::instrument(
    name = "catalog_delete_expired_results",
    skip(self),
    fields(db = "sqlite")
)]
async fn delete_expired_results(&self, cutoff: DateTime<Utc>) -> Result<Vec<QueryResult>> {
    // RETURNING is not relied on here (it requires SQLite 3.35+), so use
    // SELECT then DELETE inside a BEGIN IMMEDIATE transaction to prevent
    // concurrent modifications between the two statements.
    let mut conn = self.backend.pool().acquire().await?;
    sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await?;

    // NOTE(review): if this future is dropped between BEGIN and COMMIT the
    // connection may return to the pool mid-transaction — confirm whether
    // callers can cancel this call.
    let outcome: Result<Vec<QueryResult>> = async {
        let rows: Vec<QueryResultRow> = sqlx::query_as(
            "SELECT id, parquet_path, status, error_message, created_at
FROM results
WHERE status IN ('ready', 'failed') AND created_at < ?",
        )
        .bind(cutoff)
        .fetch_all(&mut *conn)
        .await?;

        // Nothing matched: skip the DELETE but still close the transaction.
        if !rows.is_empty() {
            sqlx::query(
                "DELETE FROM results
WHERE status IN ('ready', 'failed') AND created_at < ?",
            )
            .bind(cutoff)
            .execute(&mut *conn)
            .await?;
        }

        // Commit inside the guarded section so that a failed COMMIT also
        // goes through the rollback path below.
        sqlx::query("COMMIT").execute(&mut *conn).await?;

        Ok(rows.into_iter().map(QueryResult::from).collect())
    }
    .await;

    if outcome.is_err() {
        // Best-effort: the original failure is what the caller needs to see,
        // so a rollback failure is only logged, never propagated.
        if let Err(rollback_err) = sqlx::query("ROLLBACK").execute(&mut *conn).await {
            tracing::warn!(
                error = %rollback_err,
                "failed to roll back expired-results cleanup transaction"
            );
        }
    }

    outcome
}

// Query run history methods

#[tracing::instrument(
Expand Down
46 changes: 46 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ pub struct EngineConfig {
/// Default: 300 (5 minutes).
#[serde(default = "default_stale_result_timeout_secs")]
pub stale_result_timeout_secs: u64,

/// Number of days to retain query results before automatic cleanup.
/// Results older than this (in ready/failed status) are deleted along with their parquet files.
/// Set to 0 to disable automatic cleanup (keep results indefinitely).
/// Default: 7.
#[serde(default = "default_result_retention_days")]
pub result_retention_days: u64,
}

impl Default for EngineConfig {
Expand All @@ -118,6 +125,7 @@ impl Default for EngineConfig {
max_concurrent_persistence: default_max_concurrent_persistence(),
stale_result_cleanup_interval_secs: default_stale_result_cleanup_interval_secs(),
stale_result_timeout_secs: default_stale_result_timeout_secs(),
result_retention_days: default_result_retention_days(),
}
}
}
Expand All @@ -134,6 +142,10 @@ fn default_stale_result_timeout_secs() -> u64 {
300
}

/// Serde default for `EngineConfig::result_retention_days`: one week, in days.
fn default_result_retention_days() -> u64 {
    const ONE_WEEK_IN_DAYS: u64 = 7;
    ONE_WEEK_IN_DAYS
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct CacheConfig {
/// Redis connection URL. None = caching disabled.
Expand Down Expand Up @@ -287,4 +299,38 @@ type = "filesystem"
std::env::remove_var("RUNTIMEDB_STORAGE__AUTHORIZATION_HEADER");
std::env::remove_var("RUNTIMEDB_SERVER__PORT");
}

/// `EngineConfig::default()` must retain results for seven days.
#[test]
fn test_engine_config_result_retention_days_default() {
    let retention_days = EngineConfig::default().result_retention_days;
    assert_eq!(retention_days, 7);
}

/// `RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS` must override the default retention
/// when the config file does not set it.
#[test]
fn test_engine_config_result_retention_days_from_env() {
    // The lock guards no data, only process-wide env access, so a poisoned
    // lock (another env test panicked) is safe to reuse.
    let _guard = ENV_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let temp_file = tempfile::Builder::new().suffix(".toml").tempfile().unwrap();
    std::fs::write(
        temp_file.path(),
        r#"
[server]
host = "127.0.0.1"
port = 3000

[catalog]
type = "sqlite"

[storage]
type = "filesystem"
"#,
    )
    .unwrap();
    // Resolve the path before touching the environment so a failure here
    // cannot leave the override set.
    let config_path = temp_file.path().to_str().unwrap();

    std::env::set_var("RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS", "30");
    let loaded = AppConfig::load(config_path);
    // Clear the override BEFORE unwrapping or asserting: a panic below must
    // not leak the variable into other tests in this process.
    std::env::remove_var("RUNTIMEDB_ENGINE__RESULT_RETENTION_DAYS");

    let config = loaded.unwrap();
    assert_eq!(config.engine.result_retention_days, 30);
}
}
Loading
Loading