From 5fc3624299571953e4837bb7732457a1e1697906 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Thu, 12 Feb 2026 17:01:30 +0530 Subject: [PATCH 1/5] feat(connections): add health check endpoint Add GET /connections/{connection_id}/health to validate remote database connectivity. Each backend reuses its existing connection code and runs a minimal probe (SELECT 1 or list_namespaces). Returns 200 with {healthy, error, latency_ms}; HTTP 404 only for unknown connection_id. Includes a 10s timeout at the dispatch level and a 5s connect_timeout for Postgres to prevent hanging on unreachable hosts. --- src/datafetch/fetcher.rs | 8 ++++ src/datafetch/native/bigquery.rs | 25 +++++++++++ src/datafetch/native/duckdb.rs | 17 ++++++++ src/datafetch/native/iceberg.rs | 13 ++++++ src/datafetch/native/mod.rs | 37 ++++++++++++++++ src/datafetch/native/mysql.rs | 14 ++++++ src/datafetch/native/postgres.rs | 20 +++++++++ src/datafetch/native/snowflake.rs | 13 ++++++ src/datafetch/orchestrator.rs | 16 +++++++ src/engine.rs | 24 +++++++++++ src/http/app_server.rs | 18 +++++--- .../controllers/connections_controller.rs | 43 ++++++++++++++++++- src/http/controllers/mod.rs | 5 ++- src/http/models.rs | 10 +++++ 14 files changed, 253 insertions(+), 10 deletions(-) diff --git a/src/datafetch/fetcher.rs b/src/datafetch/fetcher.rs index 7ee6f40..4f160b9 100644 --- a/src/datafetch/fetcher.rs +++ b/src/datafetch/fetcher.rs @@ -27,4 +27,12 @@ pub trait DataFetcher: Send + Sync + std::fmt::Debug { table: &str, writer: &mut dyn BatchWriter, ) -> Result<(), DataFetchError>; + + /// Check connectivity to the remote source. + /// Returns Ok(()) if the connection is healthy, or an error describing the failure. + async fn check_health( + &self, + source: &Source, + secrets: &SecretManager, + ) -> Result<(), DataFetchError>; } diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index 5416457..670a18c 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -45,6 +45,31 @@ async fn build_client(source: &Source, secrets: &SecretManager) -> Result Result<(), DataFetchError> { + let client = build_client(source, secrets).await?; + + let project_id = match source { + Source::Bigquery { project_id, .. } => project_id.as_str(), + _ => { + return Err(DataFetchError::Connection( + "Expected BigQuery source".to_string(), + )) + } + }; + + let query_request = QueryRequest::new("SELECT 1"); + client + .job() + .query(project_id, query_request) + .await + .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?; + Ok(()) +} + /// Discover tables and columns from BigQuery pub async fn discover_tables( source: &Source, diff --git a/src/datafetch/native/duckdb.rs b/src/datafetch/native/duckdb.rs index a4a88b1..7e119a9 100644 --- a/src/datafetch/native/duckdb.rs +++ b/src/datafetch/native/duckdb.rs @@ -15,6 +15,23 @@ use crate::source::Source; // Arrow 56 IPC for duckdb compatibility (duckdb uses arrow 56) use arrow_ipc_56 as arrow56_ipc; +/// Check connectivity to a DuckDB/MotherDuck source +pub async fn check_health( + source: &Source, + secrets: &SecretManager, +) -> Result<(), DataFetchError> { + let connection_string = resolve_connection_string(source, secrets).await?; + tokio::task::spawn_blocking(move || { + let conn = Connection::open(&connection_string) + .map_err(|e| DataFetchError::Connection(e.to_string()))?; + conn.execute("SELECT 1", []) + .map_err(|e| DataFetchError::Query(e.to_string()))?; + Ok(()) + }) + .await + .map_err(|e| DataFetchError::Connection(e.to_string()))? +} + /// Discover tables and columns from DuckDB/MotherDuck pub async fn discover_tables( source: &Source, diff --git a/src/datafetch/native/iceberg.rs b/src/datafetch/native/iceberg.rs index 48c8506..baed2f4 100644 --- a/src/datafetch/native/iceberg.rs +++ b/src/datafetch/native/iceberg.rs @@ -103,6 +103,19 @@ async fn build_catalog( } } +/// Check connectivity to an Iceberg catalog +pub async fn check_health( + source: &Source, + secrets: &SecretManager, +) -> Result<(), DataFetchError> { + let catalog = build_catalog(source, secrets).await?; + catalog + .list_namespaces(None) + .await + .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?; + Ok(()) +} + /// Discover tables and columns from an Iceberg catalog. /// Recursively discovers nested namespaces. pub async fn discover_tables( diff --git a/src/datafetch/native/mod.rs b/src/datafetch/native/mod.rs index b6fd987..c806fe0 100644 --- a/src/datafetch/native/mod.rs +++ b/src/datafetch/native/mod.rs @@ -142,11 +142,17 @@ mod tests { } } +use std::time::Duration; + use crate::datafetch::batch_writer::BatchWriter; use crate::datafetch::{DataFetchError, DataFetcher, TableMetadata}; use crate::secrets::SecretManager; use crate::source::Source; +/// Timeout for health check operations. +/// Keeps health checks fast-failing for unreachable hosts. +const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10); + /// Native Rust driver-based data fetcher #[derive(Debug, Default)] pub struct NativeFetcher; @@ -229,4 +235,35 @@ impl DataFetcher for NativeFetcher { } } } + + #[tracing::instrument( + name = "check_health", + skip(self, source, secrets), + fields(runtimedb.backend = %source.source_type()) + )] + async fn check_health( + &self, + source: &Source, + secrets: &SecretManager, + ) -> Result<(), DataFetchError> { + tokio::time::timeout(HEALTH_CHECK_TIMEOUT, async { + match source { + Source::Duckdb { .. } | Source::Motherduck { .. } => { + duckdb::check_health(source, secrets).await + } + Source::Postgres { .. } => postgres::check_health(source, secrets).await, + Source::Iceberg { .. } => iceberg::check_health(source, secrets).await, + Source::Mysql { .. } => mysql::check_health(source, secrets).await, + Source::Snowflake { .. } => snowflake::check_health(source, secrets).await, + Source::Bigquery { .. } => bigquery::check_health(source, secrets).await, + } + }) + .await + .unwrap_or_else(|_| { + Err(DataFetchError::Connection(format!( + "health check timed out after {}s", + HEALTH_CHECK_TIMEOUT.as_secs() + ))) + }) + } } diff --git a/src/datafetch/native/mysql.rs b/src/datafetch/native/mysql.rs index af2b3d4..756c407 100644 --- a/src/datafetch/native/mysql.rs +++ b/src/datafetch/native/mysql.rs @@ -84,6 +84,20 @@ async fn connect_with_ssl_retry( } } +/// Check connectivity to a MySQL source +pub async fn check_health( + source: &Source, + secrets: &SecretManager, +) -> Result<(), DataFetchError> { + let options = resolve_connect_options(source, secrets).await?; + let mut conn = connect_with_ssl_retry(options).await?; + sqlx::query("SELECT 1") + .execute(&mut conn) + .await + .map_err(|e| DataFetchError::Query(e.to_string()))?; + Ok(()) +} + /// Discover tables and columns from MySQL pub async fn discover_tables( source: &Source, diff --git a/src/datafetch/native/postgres.rs b/src/datafetch/native/postgres.rs index b2d5785..0d3be98 100644 --- a/src/datafetch/native/postgres.rs +++ b/src/datafetch/native/postgres.rs @@ -93,6 +93,26 @@ async fn connect_with_ssl_retry(connection_string: &str) -> Result Result<(), DataFetchError> { + let mut connection_string = resolve_connection_string(source, secrets).await?; + // Set a short connect timeout so we don't hang on unreachable hosts + if connection_string.contains('?') { + connection_string.push_str("&connect_timeout=5"); + } else { + connection_string.push_str("?connect_timeout=5"); + } + let mut conn = connect_with_ssl_retry(&connection_string).await?; + sqlx::query("SELECT 1") + .execute(&mut conn) + .await + .map_err(|e| DataFetchError::Query(e.to_string()))?; + Ok(()) +} + /// Discover tables and columns from PostgreSQL pub async fn discover_tables( source: &Source, diff --git a/src/datafetch/native/snowflake.rs b/src/datafetch/native/snowflake.rs index 39d8732..a6cac08 100644 --- a/src/datafetch/native/snowflake.rs +++ b/src/datafetch/native/snowflake.rs @@ -98,6 +98,19 @@ async fn build_client( Ok(client) } +/// Check connectivity to a Snowflake source +pub async fn check_health( + source: &Source, + secrets: &SecretManager, +) -> Result<(), DataFetchError> { + let client = build_client(source, secrets).await?; + client + .exec("SELECT 1") + .await + .map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?; + Ok(()) +} + /// Discover tables and columns from Snowflake pub async fn discover_tables( source: &Source, diff --git a/src/datafetch/orchestrator.rs b/src/datafetch/orchestrator.rs index db7bdb8..f589d8b 100644 --- a/src/datafetch/orchestrator.rs +++ b/src/datafetch/orchestrator.rs @@ -119,6 +119,14 @@ impl FetchOrchestrator { Ok((parquet_url, row_count)) } + /// Check connectivity to a remote source. + /// Delegates to the underlying fetcher. + pub async fn check_health(&self, source: &Source) -> Result<(), DataFetchError> { + self.fetcher + .check_health(source, &self.secret_manager) + .await + } + /// Discover tables from a remote source. /// Delegates to the underlying fetcher. pub async fn discover_tables( @@ -299,6 +307,14 @@ mod tests { }]) } + async fn check_health( + &self, + _source: &Source, + _secrets: &SecretManager, + ) -> Result<(), DataFetchError> { + Ok(()) + } + async fn fetch_table( &self, _source: &Source, diff --git a/src/engine.rs b/src/engine.rs index a9f890d..24bcfc8 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -461,6 +461,30 @@ impl RuntimeEngine { self.catalog.clone() } + /// Check health of a connection by attempting to connect to the remote source. + /// Returns Ok(()) if healthy, or an error describing the failure. + pub async fn check_connection_health( + &self, + connection_id: &str, + ) -> Result<(), crate::datafetch::DataFetchError> { + let conn = self + .catalog + .get_connection(connection_id) + .await + .map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))? + .ok_or_else(|| { + crate::datafetch::DataFetchError::Connection(format!( + "Connection '{}' not found", + connection_id + )) + })?; + + let source: Source = serde_json::from_str(&conn.config_json) + .map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))?; + + self.orchestrator.check_health(&source).await + } + /// List all configured connections. pub async fn list_connections(&self) -> Result> { self.catalog.list_connections().await diff --git a/src/http/app_server.rs b/src/http/app_server.rs index 108b0ec..0a9422b 100644 --- a/src/http/app_server.rs +++ b/src/http/app_server.rs @@ -1,10 +1,11 @@ use crate::http::controllers::{ - create_connection_handler, create_dataset, create_secret_handler, delete_connection_handler, - delete_dataset, delete_secret_handler, get_connection_handler, get_dataset, get_result_handler, - get_secret_handler, health_handler, information_schema_handler, list_connections_handler, - list_datasets, list_query_runs_handler, list_results_handler, list_secrets_handler, - list_uploads, purge_connection_cache_handler, purge_table_cache_handler, query_handler, - refresh_handler, update_dataset, update_secret_handler, upload_file, MAX_UPLOAD_SIZE, + check_connection_health_handler, create_connection_handler, create_dataset, + create_secret_handler, delete_connection_handler, delete_dataset, delete_secret_handler, + get_connection_handler, get_dataset, get_result_handler, get_secret_handler, health_handler, + information_schema_handler, list_connections_handler, list_datasets, list_query_runs_handler, + list_results_handler, list_secrets_handler, list_uploads, purge_connection_cache_handler, + purge_table_cache_handler, query_handler, refresh_handler, update_dataset, + update_secret_handler, upload_file, MAX_UPLOAD_SIZE, }; use crate::RuntimeEngine; use axum::extract::DefaultBodyLimit; @@ -80,6 +81,7 @@ pub const PATH_HEALTH: &str = "/health"; pub const PATH_REFRESH: &str = "/refresh"; pub const PATH_CONNECTIONS: &str = "/connections"; pub const PATH_CONNECTION: &str = "/connections/{connection_id}"; +pub const PATH_CONNECTION_HEALTH: &str = "/connections/{connection_id}/health"; pub const PATH_CONNECTION_CACHE: &str = "/connections/{connection_id}/cache"; pub const PATH_TABLE_CACHE: &str = "/connections/{connection_id}/tables/{schema}/{table}/cache"; pub const PATH_SECRETS: &str = "/secrets"; @@ -108,6 +110,10 @@ impl AppServer { PATH_CONNECTION, get(get_connection_handler).delete(delete_connection_handler), ) + .route( + PATH_CONNECTION_HEALTH, + get(check_connection_health_handler), + ) .route( PATH_CONNECTION_CACHE, delete(purge_connection_cache_handler), diff --git a/src/http/controllers/connections_controller.rs b/src/http/controllers/connections_controller.rs index 3bccb75..8f979e8 100644 --- a/src/http/controllers/connections_controller.rs +++ b/src/http/controllers/connections_controller.rs @@ -1,7 +1,7 @@ use crate::http::error::ApiError; use crate::http::models::{ - ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, DiscoveryStatus, - GetConnectionResponse, ListConnectionsResponse, + ConnectionHealthResponse, ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, + DiscoveryStatus, GetConnectionResponse, ListConnectionsResponse, }; use crate::source::{Credential, Source}; use crate::RuntimeEngine; @@ -12,6 +12,7 @@ use axum::{ }; use serde::Deserialize; use std::sync::Arc; +use std::time::Instant; use tracing::error; /// Handler for POST /connections @@ -468,3 +469,41 @@ pub async fn purge_table_cache_handler( Ok(StatusCode::NO_CONTENT) } + +/// Handler for GET /connections/{connection_id}/health +#[tracing::instrument( + name = "handler_check_connection_health", + skip(engine), + fields(runtimedb.connection_id = %connection_id) +)] +pub async fn check_connection_health_handler( + State(engine): State>, + Path(connection_id): Path, +) -> Result, ApiError> { + // Verify connection exists (return 404 if not) + engine + .catalog() + .get_connection(&connection_id) + .await + .map_err(|e| ApiError::internal_error(e.to_string()))? + .ok_or_else(|| ApiError::not_found(format!("Connection '{}' not found", connection_id)))?; + + let start = Instant::now(); + let result = engine.check_connection_health(&connection_id).await; + let latency_ms = start.elapsed().as_millis() as u64; + + match result { + Ok(()) => Ok(Json(ConnectionHealthResponse { + connection_id, + healthy: true, + error: None, + latency_ms, + })), + Err(e) => Ok(Json(ConnectionHealthResponse { + connection_id, + healthy: false, + error: Some(e.to_string()), + latency_ms, + })), + } +} diff --git a/src/http/controllers/mod.rs b/src/http/controllers/mod.rs index 12cbed5..a6b58a9 100644 --- a/src/http/controllers/mod.rs +++ b/src/http/controllers/mod.rs @@ -10,8 +10,9 @@ pub mod secrets_controller; pub mod uploads_controller; pub use connections_controller::{ - create_connection_handler, delete_connection_handler, get_connection_handler, - list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler, + check_connection_health_handler, create_connection_handler, delete_connection_handler, + get_connection_handler, list_connections_handler, purge_connection_cache_handler, + purge_table_cache_handler, }; pub use datasets_controller::{ create_dataset, delete_dataset, get_dataset, list_datasets, update_dataset, diff --git a/src/http/models.rs b/src/http/models.rs index 2d4c84d..562bbed 100644 --- a/src/http/models.rs +++ b/src/http/models.rs @@ -204,6 +204,16 @@ pub struct ListConnectionsResponse { pub connections: Vec, } +/// Response body for GET /connections/{connection_id}/health +#[derive(Debug, Serialize)] +pub struct ConnectionHealthResponse { + pub connection_id: String, + pub healthy: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + pub latency_ms: u64, +} + /// Response body for GET /connections/{connection_id} #[derive(Debug, Serialize)] pub struct GetConnectionResponse { From da6b8a0b096bdf546ff7a2574bb5b96338563ad9 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Thu, 12 Feb 2026 17:11:53 +0530 Subject: [PATCH 2/5] fix(connections): drop redundant postgres connect_timeout The 10s tokio::time::timeout wrapper handles all backends uniformly. --- src/datafetch/native/postgres.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/datafetch/native/postgres.rs b/src/datafetch/native/postgres.rs index 0d3be98..16971ba 100644 --- a/src/datafetch/native/postgres.rs +++ b/src/datafetch/native/postgres.rs @@ -98,13 +98,7 @@ pub async fn check_health( source: &Source, secrets: &SecretManager, ) -> Result<(), DataFetchError> { - let mut connection_string = resolve_connection_string(source, secrets).await?; - // Set a short connect timeout so we don't hang on unreachable hosts - if connection_string.contains('?') { - connection_string.push_str("&connect_timeout=5"); - } else { - connection_string.push_str("?connect_timeout=5"); - } + let connection_string = resolve_connection_string(source, secrets).await?; let mut conn = connect_with_ssl_retry(&connection_string).await?; sqlx::query("SELECT 1") .execute(&mut conn) From f8cc7e9aa0391c6967ddab3ef6625085d4cc294a Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Mon, 16 Feb 2026 11:04:53 +0530 Subject: [PATCH 3/5] feat(connections): add health check for DuckLake datasource --- src/datafetch/native/ducklake.rs | 40 +++++++++++++++++++++++++------- src/datafetch/native/mod.rs | 1 + 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/datafetch/native/ducklake.rs b/src/datafetch/native/ducklake.rs index e13f2e4..0828997 100644 --- a/src/datafetch/native/ducklake.rs +++ b/src/datafetch/native/ducklake.rs @@ -133,15 +133,11 @@ pub async fn discover_tables( Ok(tables) } -/// Fetch table data from DuckLake and write to the batch writer. -pub async fn fetch_table( +/// Build a SessionContext with the DuckLake catalog and S3 object store registered. +async fn build_session_context( source: &Source, secrets: &SecretManager, - _catalog: Option<&str>, - schema: &str, - table: &str, - writer: &mut dyn BatchWriter, -) -> Result<(), DataFetchError> { +) -> Result { let (s3_endpoint, s3_region) = match source { Source::Ducklake { s3_endpoint, @@ -157,7 +153,6 @@ pub async fn fetch_table( let (catalog, creds) = build_catalog(source, secrets).await?; - // Build a SessionContext and register the catalog let ctx = SessionContext::new(); ctx.register_catalog("ducklake", catalog.clone()); @@ -201,6 +196,35 @@ pub async fn fetch_table( ctx.register_object_store(&s3_url, Arc::new(s3_store)); } + Ok(ctx) +} + +/// Check connectivity to the DuckLake catalog database and object store. +pub async fn check_health( + source: &Source, + secrets: &SecretManager, +) -> Result<(), DataFetchError> { + let ctx = build_session_context(source, secrets).await?; + ctx.sql("SELECT 1") + .await + .map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))? + .collect() + .await + .map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))?; + Ok(()) +} + +/// Fetch table data from DuckLake and write to the batch writer. +pub async fn fetch_table( + source: &Source, + secrets: &SecretManager, + _catalog: Option<&str>, + schema: &str, + table: &str, + writer: &mut dyn BatchWriter, +) -> Result<(), DataFetchError> { + let ctx = build_session_context(source, secrets).await?; + // Execute SELECT * FROM ducklake.{schema}.{table} let sql = format!( "SELECT * FROM ducklake.{}.{}", diff --git a/src/datafetch/native/mod.rs b/src/datafetch/native/mod.rs index c806fe0..f3ce2b2 100644 --- a/src/datafetch/native/mod.rs +++ b/src/datafetch/native/mod.rs @@ -256,6 +256,7 @@ impl DataFetcher for NativeFetcher { Source::Mysql { .. } => mysql::check_health(source, secrets).await, Source::Snowflake { .. } => snowflake::check_health(source, secrets).await, Source::Bigquery { .. } => bigquery::check_health(source, secrets).await, + Source::Ducklake { .. } => ducklake::check_health(source, secrets).await, } }) .await From 6ed90e0da503d0d1f48e5407c551c365f4a6300c Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Mon, 16 Feb 2026 12:07:34 +0530 Subject: [PATCH 4/5] refactor(connections): use sqlx ping for mysql and postgres health checks --- src/datafetch/native/mysql.rs | 7 +++---- src/datafetch/native/postgres.rs | 5 ++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/datafetch/native/mysql.rs b/src/datafetch/native/mysql.rs index 756c407..be18866 100644 --- a/src/datafetch/native/mysql.rs +++ b/src/datafetch/native/mysql.rs @@ -9,7 +9,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; use futures::StreamExt; use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlRow, MySqlSslMode}; -use sqlx::{ConnectOptions, Row}; +use sqlx::{ConnectOptions, Connection, Row}; use std::sync::Arc; use tracing::warn; @@ -91,10 +91,9 @@ pub async fn check_health( ) -> Result<(), DataFetchError> { let options = resolve_connect_options(source, secrets).await?; let mut conn = connect_with_ssl_retry(options).await?; - sqlx::query("SELECT 1") - .execute(&mut conn) + conn.ping() .await - .map_err(|e| DataFetchError::Query(e.to_string()))?; + .map_err(|e| DataFetchError::Connection(e.to_string()))?; Ok(()) } diff --git a/src/datafetch/native/postgres.rs b/src/datafetch/native/postgres.rs index 16971ba..48996d8 100644 --- a/src/datafetch/native/postgres.rs +++ b/src/datafetch/native/postgres.rs @@ -100,10 +100,9 @@ pub async fn check_health( ) -> Result<(), DataFetchError> { let connection_string = resolve_connection_string(source, secrets).await?; let mut conn = connect_with_ssl_retry(&connection_string).await?; - sqlx::query("SELECT 1") - .execute(&mut conn) + conn.ping() .await - .map_err(|e| DataFetchError::Query(e.to_string()))?; + .map_err(|e| DataFetchError::Connection(e.to_string()))?; Ok(()) } From c009066899566cb77a17f78a368646086b6d4b17 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Mon, 16 Feb 2026 14:59:55 +0530 Subject: [PATCH 5/5] style(connections): apply cargo fmt formatting --- src/datafetch/native/bigquery.rs | 5 +---- src/datafetch/native/duckdb.rs | 5 +---- src/datafetch/native/ducklake.rs | 5 +---- src/datafetch/native/iceberg.rs | 5 +---- src/datafetch/native/mysql.rs | 5 +---- src/datafetch/native/postgres.rs | 5 +---- src/datafetch/native/snowflake.rs | 5 +---- src/http/app_server.rs | 5 +---- 8 files changed, 8 insertions(+), 32 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index 670a18c..0a6678c 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -46,10 +46,7 @@ async fn build_client(source: &Source, secrets: &SecretManager) -> Result Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let client = build_client(source, secrets).await?; let project_id = match source { diff --git a/src/datafetch/native/duckdb.rs b/src/datafetch/native/duckdb.rs index 7e119a9..e2bd36d 100644 --- a/src/datafetch/native/duckdb.rs +++ b/src/datafetch/native/duckdb.rs @@ -16,10 +16,7 @@ use crate::source::Source; use arrow_ipc_56 as arrow56_ipc; /// Check connectivity to a DuckDB/MotherDuck source -pub async fn check_health( - source: &Source, - secrets: &SecretManager, -) -> Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let connection_string = resolve_connection_string(source, secrets).await?; tokio::task::spawn_blocking(move || { let conn = Connection::open(&connection_string) diff --git a/src/datafetch/native/ducklake.rs b/src/datafetch/native/ducklake.rs index 0828997..bb9ccc5 100644 --- a/src/datafetch/native/ducklake.rs +++ b/src/datafetch/native/ducklake.rs @@ -200,10 +200,7 @@ async fn build_session_context( } /// Check connectivity to the DuckLake catalog database and object store. -pub async fn check_health( - source: &Source, - secrets: &SecretManager, -) -> Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let ctx = build_session_context(source, secrets).await?; ctx.sql("SELECT 1") .await diff --git a/src/datafetch/native/iceberg.rs b/src/datafetch/native/iceberg.rs index baed2f4..58116ad 100644 --- a/src/datafetch/native/iceberg.rs +++ b/src/datafetch/native/iceberg.rs @@ -104,10 +104,7 @@ async fn build_catalog( } /// Check connectivity to an Iceberg catalog -pub async fn check_health( - source: &Source, - secrets: &SecretManager, -) -> Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let catalog = build_catalog(source, secrets).await?; catalog .list_namespaces(None) diff --git a/src/datafetch/native/mysql.rs b/src/datafetch/native/mysql.rs index be18866..afe5223 100644 --- a/src/datafetch/native/mysql.rs +++ b/src/datafetch/native/mysql.rs @@ -85,10 +85,7 @@ async fn connect_with_ssl_retry( } /// Check connectivity to a MySQL source -pub async fn check_health( - source: &Source, - secrets: &SecretManager, -) -> Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let options = resolve_connect_options(source, secrets).await?; let mut conn = connect_with_ssl_retry(options).await?; conn.ping() diff --git a/src/datafetch/native/postgres.rs b/src/datafetch/native/postgres.rs index 48996d8..8abfabd 100644 --- a/src/datafetch/native/postgres.rs +++ b/src/datafetch/native/postgres.rs @@ -94,10 +94,7 @@ async fn connect_with_ssl_retry(connection_string: &str) -> Result Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let connection_string = resolve_connection_string(source, secrets).await?; let mut conn = connect_with_ssl_retry(&connection_string).await?; conn.ping() diff --git a/src/datafetch/native/snowflake.rs b/src/datafetch/native/snowflake.rs index a6cac08..ce69a0a 100644 --- a/src/datafetch/native/snowflake.rs +++ b/src/datafetch/native/snowflake.rs @@ -99,10 +99,7 @@ async fn build_client( } /// Check connectivity to a Snowflake source -pub async fn check_health( - source: &Source, - secrets: &SecretManager, -) -> Result<(), DataFetchError> { +pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> { let client = build_client(source, secrets).await?; client .exec("SELECT 1") diff --git a/src/http/app_server.rs b/src/http/app_server.rs index 0a9422b..a78b49b 100644 --- a/src/http/app_server.rs +++ b/src/http/app_server.rs @@ -110,10 +110,7 @@ impl AppServer { PATH_CONNECTION, get(get_connection_handler).delete(delete_connection_handler), ) - .route( - PATH_CONNECTION_HEALTH, - get(check_connection_health_handler), - ) + .route(PATH_CONNECTION_HEALTH, get(check_connection_health_handler)) .route( PATH_CONNECTION_CACHE, delete(purge_connection_cache_handler),