diff --git a/src/datafetch/fetcher.rs b/src/datafetch/fetcher.rs
index 7ee6f40..4f160b9 100644
--- a/src/datafetch/fetcher.rs
+++ b/src/datafetch/fetcher.rs
@@ -27,4 +27,12 @@ pub trait DataFetcher: Send + Sync + std::fmt::Debug {
         table: &str,
         writer: &mut dyn BatchWriter,
     ) -> Result<(), DataFetchError>;
+
+    /// Check connectivity to the remote source.
+    /// Returns Ok(()) if the connection is healthy, or an error describing the failure.
+    async fn check_health(
+        &self,
+        source: &Source,
+        secrets: &SecretManager,
+    ) -> Result<(), DataFetchError>;
 }
diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs
index 5416457..0a6678c 100644
--- a/src/datafetch/native/bigquery.rs
+++ b/src/datafetch/native/bigquery.rs
@@ -45,6 +45,28 @@ async fn build_client(source: &Source, secrets: &SecretManager) -> Result<Client, DataFetchError> {
     Ok(client)
 }
 
+/// Check connectivity to a BigQuery source
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let client = build_client(source, secrets).await?;
+
+    let project_id = match source {
+        Source::Bigquery { project_id, .. } => project_id.as_str(),
+        _ => {
+            return Err(DataFetchError::Connection(
+                "Expected BigQuery source".to_string(),
+            ))
+        }
+    };
+
+    let query_request = QueryRequest::new("SELECT 1");
+    client
+        .job()
+        .query(project_id, query_request)
+        .await
+        .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
+    Ok(())
+}
+
 /// Discover tables and columns from BigQuery
 pub async fn discover_tables(
     source: &Source,
diff --git a/src/datafetch/native/duckdb.rs b/src/datafetch/native/duckdb.rs
index a4a88b1..e2bd36d 100644
--- a/src/datafetch/native/duckdb.rs
+++ b/src/datafetch/native/duckdb.rs
@@ -15,6 +15,20 @@ use crate::source::Source;
 // Arrow 56 IPC for duckdb compatibility (duckdb uses arrow 56)
 use arrow_ipc_56 as arrow56_ipc;
 
+/// Check connectivity to a DuckDB/MotherDuck source
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let connection_string = resolve_connection_string(source, secrets).await?;
+    tokio::task::spawn_blocking(move || {
+        let conn = Connection::open(&connection_string)
+            .map_err(|e| DataFetchError::Connection(e.to_string()))?;
+        conn.execute("SELECT 1", [])
+            .map_err(|e| DataFetchError::Query(e.to_string()))?;
+        Ok(())
+    })
+    .await
+    .map_err(|e| DataFetchError::Connection(e.to_string()))?
+}
+
 /// Discover tables and columns from DuckDB/MotherDuck
 pub async fn discover_tables(
     source: &Source,
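The DuckDB check above is the one backend that bridges a synchronous driver into the async trait: `Connection::open` and the probe query run inside `tokio::task::spawn_blocking`, and the `JoinError` from the blocking task is folded into `DataFetchError::Connection`. A minimal standalone sketch of the same pattern (the error enum and `ping_duckdb` are illustrative stand-ins, not names from this codebase):

```rust
use duckdb::Connection;

/// Illustrative stand-in for this codebase's DataFetchError.
#[derive(Debug)]
enum HealthError {
    Connection(String),
    Query(String),
}

/// Run the blocking open + probe on a dedicated blocking thread so the
/// async runtime's worker threads are never stalled by driver I/O.
async fn ping_duckdb(path: String) -> Result<(), HealthError> {
    tokio::task::spawn_blocking(move || {
        let conn = Connection::open(&path)
            .map_err(|e| HealthError::Connection(e.to_string()))?;
        conn.execute("SELECT 1", [])
            .map_err(|e| HealthError::Query(e.to_string()))?;
        Ok(())
    })
    .await
    // The outer error is a JoinError (the task panicked or was cancelled);
    // `?` unwraps it, leaving the inner Result as the return value.
    .map_err(|e| HealthError::Connection(e.to_string()))?
}

#[tokio::main]
async fn main() {
    // An in-memory database always opens, so this prints "healthy".
    match ping_duckdb(":memory:".to_string()).await {
        Ok(()) => println!("healthy"),
        Err(e) => println!("unhealthy: {e:?}"),
    }
}
```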
diff --git a/src/datafetch/native/ducklake.rs b/src/datafetch/native/ducklake.rs
index e13f2e4..bb9ccc5 100644
--- a/src/datafetch/native/ducklake.rs
+++ b/src/datafetch/native/ducklake.rs
@@ -133,15 +133,11 @@ pub async fn discover_tables(
     Ok(tables)
 }
 
-/// Fetch table data from DuckLake and write to the batch writer.
-pub async fn fetch_table(
+/// Build a SessionContext with the DuckLake catalog and S3 object store registered.
+async fn build_session_context(
     source: &Source,
     secrets: &SecretManager,
-    _catalog: Option<&str>,
-    schema: &str,
-    table: &str,
-    writer: &mut dyn BatchWriter,
-) -> Result<(), DataFetchError> {
+) -> Result<SessionContext, DataFetchError> {
     let (s3_endpoint, s3_region) = match source {
         Source::Ducklake {
             s3_endpoint,
@@ -157,7 +153,6 @@ pub async fn fetch_table(
 
     let (catalog, creds) = build_catalog(source, secrets).await?;
 
-    // Build a SessionContext and register the catalog
     let ctx = SessionContext::new();
     ctx.register_catalog("ducklake", catalog.clone());
 
@@ -201,6 +196,32 @@ pub async fn fetch_table(
         ctx.register_object_store(&s3_url, Arc::new(s3_store));
     }
 
+    Ok(ctx)
+}
+
+/// Check connectivity to the DuckLake catalog database and object store.
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let ctx = build_session_context(source, secrets).await?;
+    ctx.sql("SELECT 1")
+        .await
+        .map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))?
+        .collect()
+        .await
+        .map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))?;
+    Ok(())
+}
+
+/// Fetch table data from DuckLake and write to the batch writer.
+pub async fn fetch_table(
+    source: &Source,
+    secrets: &SecretManager,
+    _catalog: Option<&str>,
+    schema: &str,
+    table: &str,
+    writer: &mut dyn BatchWriter,
+) -> Result<(), DataFetchError> {
+    let ctx = build_session_context(source, secrets).await?;
+
     // Execute SELECT * FROM ducklake.{schema}.{table}
     let sql = format!(
         "SELECT * FROM ducklake.{}.{}",
diff --git a/src/datafetch/native/iceberg.rs b/src/datafetch/native/iceberg.rs
index 48c8506..58116ad 100644
--- a/src/datafetch/native/iceberg.rs
+++ b/src/datafetch/native/iceberg.rs
@@ -103,6 +103,16 @@ async fn build_catalog(
     }
 }
+
+/// Check connectivity to an Iceberg catalog
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let catalog = build_catalog(source, secrets).await?;
+    catalog
+        .list_namespaces(None)
+        .await
+        .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
+    Ok(())
+}
 
 /// Discover tables and columns from an Iceberg catalog.
 /// Recursively discovers nested namespaces.
 pub async fn discover_tables(
diff --git a/src/datafetch/native/mod.rs b/src/datafetch/native/mod.rs
index b6fd987..f3ce2b2 100644
--- a/src/datafetch/native/mod.rs
+++ b/src/datafetch/native/mod.rs
@@ -142,11 +142,17 @@ mod tests {
     }
 }
 
+use std::time::Duration;
+
 use crate::datafetch::batch_writer::BatchWriter;
 use crate::datafetch::{DataFetchError, DataFetcher, TableMetadata};
 use crate::secrets::SecretManager;
 use crate::source::Source;
 
+/// Timeout for health check operations.
+/// Keeps health checks fast-failing for unreachable hosts.
+const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10);
+
 /// Native Rust driver-based data fetcher
 #[derive(Debug, Default)]
 pub struct NativeFetcher;
@@ -229,4 +235,36 @@ impl DataFetcher for NativeFetcher {
             }
         }
     }
+
+    #[tracing::instrument(
+        name = "check_health",
+        skip(self, source, secrets),
+        fields(runtimedb.backend = %source.source_type())
+    )]
+    async fn check_health(
+        &self,
+        source: &Source,
+        secrets: &SecretManager,
+    ) -> Result<(), DataFetchError> {
+        tokio::time::timeout(HEALTH_CHECK_TIMEOUT, async {
+            match source {
+                Source::Duckdb { .. } | Source::Motherduck { .. } => {
+                    duckdb::check_health(source, secrets).await
+                }
+                Source::Postgres { .. } => postgres::check_health(source, secrets).await,
+                Source::Iceberg { .. } => iceberg::check_health(source, secrets).await,
+                Source::Mysql { .. } => mysql::check_health(source, secrets).await,
+                Source::Snowflake { .. } => snowflake::check_health(source, secrets).await,
+                Source::Bigquery { .. } => bigquery::check_health(source, secrets).await,
+                Source::Ducklake { .. } => ducklake::check_health(source, secrets).await,
+            }
+        })
+        .await
+        .unwrap_or_else(|_| {
+            Err(DataFetchError::Connection(format!(
+                "health check timed out after {}s",
+                HEALTH_CHECK_TIMEOUT.as_secs()
+            )))
+        })
+    }
 }
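In the dispatcher above, every backend probe runs under `tokio::time::timeout`, and the `Elapsed` error is converted into a `DataFetchError::Connection` via `unwrap_or_else` rather than a panic. The same shape reduced to a self-contained sketch (the error type and `with_timeout` helper are illustrative, and the timeout is shortened so the demo returns quickly):

```rust
use std::future::Future;
use std::time::Duration;

/// Shortened from the 10s used above so the demo finishes promptly.
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(1);

#[derive(Debug)]
enum HealthError {
    Connection(String),
}

/// `tokio::time::timeout` yields Ok(inner_result) on completion and
/// Err(Elapsed) on timeout; unwrap_or_else maps the latter to a domain error.
async fn with_timeout<F>(probe: F) -> Result<(), HealthError>
where
    F: Future<Output = Result<(), HealthError>>,
{
    tokio::time::timeout(HEALTH_CHECK_TIMEOUT, probe)
        .await
        .unwrap_or_else(|_| {
            Err(HealthError::Connection(format!(
                "health check timed out after {}s",
                HEALTH_CHECK_TIMEOUT.as_secs()
            )))
        })
}

#[tokio::main]
async fn main() {
    // A probe that never resolves is reported as a timeout, not a hang.
    println!("{:?}", with_timeout(std::future::pending()).await);
    // A probe that finishes in time passes its result through untouched.
    println!("{:?}", with_timeout(async { Ok(()) }).await);
}
```

Because the timeout wraps the whole dispatch, it bounds secret resolution and client construction inside each backend's check, not just the probe query itself.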
diff --git a/src/datafetch/native/mysql.rs b/src/datafetch/native/mysql.rs
index af2b3d4..afe5223 100644
--- a/src/datafetch/native/mysql.rs
+++ b/src/datafetch/native/mysql.rs
@@ -9,7 +9,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
 use futures::StreamExt;
 use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlRow, MySqlSslMode};
-use sqlx::{ConnectOptions, Row};
+use sqlx::{ConnectOptions, Connection, Row};
 use std::sync::Arc;
 use tracing::warn;
 
@@ -84,6 +84,16 @@ async fn connect_with_ssl_retry(
     }
 }
 
+/// Check connectivity to a MySQL source
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let options = resolve_connect_options(source, secrets).await?;
+    let mut conn = connect_with_ssl_retry(options).await?;
+    conn.ping()
+        .await
+        .map_err(|e| DataFetchError::Connection(e.to_string()))?;
+    Ok(())
+}
+
 /// Discover tables and columns from MySQL
 pub async fn discover_tables(
diff --git a/src/datafetch/native/postgres.rs b/src/datafetch/native/postgres.rs
index b2d5785..8abfabd 100644
--- a/src/datafetch/native/postgres.rs
+++ b/src/datafetch/native/postgres.rs
@@ -93,6 +93,16 @@ async fn connect_with_ssl_retry(connection_string: &str) -> Result<PgConnection, DataFetchError> {
     }
 }
 
+/// Check connectivity to a PostgreSQL source
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let connection_string = resolve_connection_string(source, secrets).await?;
+    let mut conn = connect_with_ssl_retry(&connection_string).await?;
+    conn.ping()
+        .await
+        .map_err(|e| DataFetchError::Connection(e.to_string()))?;
+    Ok(())
+}
+
 /// Discover tables and columns from PostgreSQL
 pub async fn discover_tables(
diff --git a/src/datafetch/native/snowflake.rs b/src/datafetch/native/snowflake.rs
index 39d8732..ce69a0a 100644
--- a/src/datafetch/native/snowflake.rs
+++ b/src/datafetch/native/snowflake.rs
@@ -98,6 +98,16 @@ async fn build_client(
     Ok(client)
 }
 
+/// Check connectivity to a Snowflake source
+pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
+    let client = build_client(source, secrets).await?;
+    client
+        .exec("SELECT 1")
+        .await
+        .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
+    Ok(())
+}
+
 /// Discover tables and columns from Snowflake
 pub async fn discover_tables(
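The MySQL and PostgreSQL checks use sqlx's `Connection::ping` rather than a `SELECT 1`, which exercises the wire protocol without planning a query; that trait method is also why mysql.rs adds `Connection` to its sqlx imports. A standalone sketch of the connect-and-ping flow (sqlx with the `postgres` feature enabled; the DSN is a placeholder):

```rust
use sqlx::{Connection, PgConnection};

/// Connect, ping, and close. `ping()` lives on the `sqlx::Connection`
/// trait, so the trait must be in scope -- the reason for the import
/// change in mysql.rs above.
async fn ping_postgres(dsn: &str) -> Result<(), sqlx::Error> {
    let mut conn = PgConnection::connect(dsn).await?;
    conn.ping().await?;
    // Dropping the connection would also work; close() sends the
    // protocol's termination message explicitly.
    conn.close().await
}

#[tokio::main]
async fn main() {
    // Placeholder DSN; point it at a reachable server to get Ok(()).
    match ping_postgres("postgres://user:pass@localhost:5432/db").await {
        Ok(()) => println!("healthy"),
        Err(e) => println!("unhealthy: {e}"),
    }
}
```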
diff --git a/src/datafetch/orchestrator.rs b/src/datafetch/orchestrator.rs
index db7bdb8..f589d8b 100644
--- a/src/datafetch/orchestrator.rs
+++ b/src/datafetch/orchestrator.rs
@@ -119,6 +119,14 @@ impl FetchOrchestrator {
         Ok((parquet_url, row_count))
     }
 
+    /// Check connectivity to a remote source.
+    /// Delegates to the underlying fetcher.
+    pub async fn check_health(&self, source: &Source) -> Result<(), DataFetchError> {
+        self.fetcher
+            .check_health(source, &self.secret_manager)
+            .await
+    }
+
     /// Discover tables from a remote source.
     /// Delegates to the underlying fetcher.
     pub async fn discover_tables(
@@ -299,6 +307,14 @@ mod tests {
             }])
         }
 
+        async fn check_health(
+            &self,
+            _source: &Source,
+            _secrets: &SecretManager,
+        ) -> Result<(), DataFetchError> {
+            Ok(())
+        }
+
         async fn fetch_table(
             &self,
             _source: &Source,
diff --git a/src/engine.rs b/src/engine.rs
index a9f890d..24bcfc8 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -461,6 +461,30 @@ impl RuntimeEngine {
         self.catalog.clone()
     }
 
+    /// Check health of a connection by attempting to connect to the remote source.
+    /// Returns Ok(()) if healthy, or an error describing the failure.
+    pub async fn check_connection_health(
+        &self,
+        connection_id: &str,
+    ) -> Result<(), crate::datafetch::DataFetchError> {
+        let conn = self
+            .catalog
+            .get_connection(connection_id)
+            .await
+            .map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))?
+            .ok_or_else(|| {
+                crate::datafetch::DataFetchError::Connection(format!(
+                    "Connection '{}' not found",
+                    connection_id
+                ))
+            })?;
+
+        let source: Source = serde_json::from_str(&conn.config_json)
+            .map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))?;
+
+        self.orchestrator.check_health(&source).await
+    }
+
     /// List all configured connections.
     pub async fn list_connections(&self) -> Result<Vec<Connection>> {
         self.catalog.list_connections().await
diff --git a/src/http/app_server.rs b/src/http/app_server.rs
index 108b0ec..a78b49b 100644
--- a/src/http/app_server.rs
+++ b/src/http/app_server.rs
@@ -1,10 +1,11 @@
 use crate::http::controllers::{
-    create_connection_handler, create_dataset, create_secret_handler, delete_connection_handler,
-    delete_dataset, delete_secret_handler, get_connection_handler, get_dataset, get_result_handler,
-    get_secret_handler, health_handler, information_schema_handler, list_connections_handler,
-    list_datasets, list_query_runs_handler, list_results_handler, list_secrets_handler,
-    list_uploads, purge_connection_cache_handler, purge_table_cache_handler, query_handler,
-    refresh_handler, update_dataset, update_secret_handler, upload_file, MAX_UPLOAD_SIZE,
+    check_connection_health_handler, create_connection_handler, create_dataset,
+    create_secret_handler, delete_connection_handler, delete_dataset, delete_secret_handler,
+    get_connection_handler, get_dataset, get_result_handler, get_secret_handler, health_handler,
+    information_schema_handler, list_connections_handler, list_datasets, list_query_runs_handler,
+    list_results_handler, list_secrets_handler, list_uploads, purge_connection_cache_handler,
+    purge_table_cache_handler, query_handler, refresh_handler, update_dataset,
+    update_secret_handler, upload_file, MAX_UPLOAD_SIZE,
 };
 use crate::RuntimeEngine;
 use axum::extract::DefaultBodyLimit;
@@ -80,6 +81,7 @@ pub const PATH_HEALTH: &str = "/health";
 pub const PATH_REFRESH: &str = "/refresh";
 pub const PATH_CONNECTIONS: &str = "/connections";
 pub const PATH_CONNECTION: &str = "/connections/{connection_id}";
+pub const PATH_CONNECTION_HEALTH: &str = "/connections/{connection_id}/health";
 pub const PATH_CONNECTION_CACHE: &str = "/connections/{connection_id}/cache";
 pub const PATH_TABLE_CACHE: &str = "/connections/{connection_id}/tables/{schema}/{table}/cache";
 pub const PATH_SECRETS: &str = "/secrets";
@@ -108,6 +110,7 @@ impl AppServer {
                 PATH_CONNECTION,
                 get(get_connection_handler).delete(delete_connection_handler),
             )
+            .route(PATH_CONNECTION_HEALTH, get(check_connection_health_handler))
             .route(
                 PATH_CONNECTION_CACHE,
                 delete(purge_connection_cache_handler),
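The new route uses the same brace-style path parameters as the existing `PATH_*` constants (axum 0.8 syntax). A reduced sketch of how such a route, shared state, and path extractor fit together (the `Engine` struct and `health` handler here are placeholders, not the real types):

```rust
use std::sync::Arc;

use axum::extract::{Path, State};
use axum::routing::get;
use axum::{Json, Router};
use serde::Serialize;

/// Placeholder for RuntimeEngine in this sketch.
struct Engine;

#[derive(Serialize)]
struct Health {
    connection_id: String,
    healthy: bool,
}

/// Shape of the real handler: shared state plus a path parameter,
/// both supplied by axum's extractors.
async fn health(
    State(_engine): State<Arc<Engine>>,
    Path(connection_id): Path<String>,
) -> Json<Health> {
    Json(Health {
        connection_id,
        healthy: true,
    })
}

#[tokio::main]
async fn main() {
    let app: Router = Router::new()
        .route("/connections/{connection_id}/health", get(health))
        .with_state(Arc::new(Engine));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```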
diff --git a/src/http/controllers/connections_controller.rs b/src/http/controllers/connections_controller.rs
index 3bccb75..8f979e8 100644
--- a/src/http/controllers/connections_controller.rs
+++ b/src/http/controllers/connections_controller.rs
@@ -1,7 +1,7 @@
 use crate::http::error::ApiError;
 use crate::http::models::{
-    ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, DiscoveryStatus,
-    GetConnectionResponse, ListConnectionsResponse,
+    ConnectionHealthResponse, ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse,
+    DiscoveryStatus, GetConnectionResponse, ListConnectionsResponse,
 };
 use crate::source::{Credential, Source};
 use crate::RuntimeEngine;
@@ -12,6 +12,7 @@ use axum::{
 };
 use serde::Deserialize;
 use std::sync::Arc;
+use std::time::Instant;
 use tracing::error;
 
 /// Handler for POST /connections
@@ -468,3 +469,41 @@ pub async fn purge_table_cache_handler(
 
     Ok(StatusCode::NO_CONTENT)
 }
+
+/// Handler for GET /connections/{connection_id}/health
+#[tracing::instrument(
+    name = "handler_check_connection_health",
+    skip(engine),
+    fields(runtimedb.connection_id = %connection_id)
+)]
+pub async fn check_connection_health_handler(
+    State(engine): State<Arc<RuntimeEngine>>,
+    Path(connection_id): Path<String>,
+) -> Result<Json<ConnectionHealthResponse>, ApiError> {
+    // Verify connection exists (return 404 if not)
+    engine
+        .catalog()
+        .get_connection(&connection_id)
+        .await
+        .map_err(|e| ApiError::internal_error(e.to_string()))?
+        .ok_or_else(|| ApiError::not_found(format!("Connection '{}' not found", connection_id)))?;
+
+    let start = Instant::now();
+    let result = engine.check_connection_health(&connection_id).await;
+    let latency_ms = start.elapsed().as_millis() as u64;
+
+    match result {
+        Ok(()) => Ok(Json(ConnectionHealthResponse {
+            connection_id,
+            healthy: true,
+            error: None,
+            latency_ms,
+        })),
+        Err(e) => Ok(Json(ConnectionHealthResponse {
+            connection_id,
+            healthy: false,
+            error: Some(e.to_string()),
+            latency_ms,
+        })),
+    }
+}
diff --git a/src/http/controllers/mod.rs b/src/http/controllers/mod.rs
index 12cbed5..a6b58a9 100644
--- a/src/http/controllers/mod.rs
+++ b/src/http/controllers/mod.rs
@@ -10,8 +10,9 @@ pub mod secrets_controller;
 pub mod uploads_controller;
 
 pub use connections_controller::{
-    create_connection_handler, delete_connection_handler, get_connection_handler,
-    list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
+    check_connection_health_handler, create_connection_handler, delete_connection_handler,
+    get_connection_handler, list_connections_handler, purge_connection_cache_handler,
+    purge_table_cache_handler,
 };
 pub use datasets_controller::{
     create_dataset, delete_dataset, get_dataset, list_datasets, update_dataset,
diff --git a/src/http/models.rs b/src/http/models.rs
index 2d4c84d..562bbed 100644
--- a/src/http/models.rs
+++ b/src/http/models.rs
@@ -204,6 +204,16 @@ pub struct ListConnectionsResponse {
     pub connections: Vec<ConnectionInfo>,
 }
 
+/// Response body for GET /connections/{connection_id}/health
+#[derive(Debug, Serialize)]
+pub struct ConnectionHealthResponse {
+    pub connection_id: String,
+    pub healthy: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    pub latency_ms: u64,
+}
+
 /// Response body for GET /connections/{connection_id}
 #[derive(Debug, Serialize)]
 pub struct GetConnectionResponse {
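End to end, a client hits `GET /connections/{connection_id}/health` and deserializes the body defined in models.rs. Note the handler only returns 404 for an unknown connection id; a failed backend probe still answers 200 with `healthy: false` and the error message in the body. A hedged client-side sketch (the base URL and connection id are hypothetical, and `reqwest` with its `json` feature is assumed rather than shown anywhere in this diff):

```rust
use serde::Deserialize;

/// Client-side mirror of ConnectionHealthResponse; `error` is omitted
/// from the JSON on success (skip_serializing_if), which Option handles.
#[derive(Debug, Deserialize)]
struct ConnectionHealth {
    connection_id: String,
    healthy: bool,
    error: Option<String>,
    latency_ms: u64,
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical base URL and connection id.
    let url = "http://localhost:3000/connections/my-postgres/health";
    let health: ConnectionHealth = reqwest::get(url).await?.json().await?;

    if health.healthy {
        println!("{} ok in {}ms", health.connection_id, health.latency_ms);
    } else {
        println!(
            "{} unhealthy: {}",
            health.connection_id,
            health.error.unwrap_or_default()
        );
    }
    Ok(())
}
```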