8 changes: 8 additions & 0 deletions src/datafetch/fetcher.rs
@@ -27,4 +27,12 @@ pub trait DataFetcher: Send + Sync + std::fmt::Debug {
table: &str,
writer: &mut dyn BatchWriter,
) -> Result<(), DataFetchError>;

/// Check connectivity to the remote source.
/// Returns Ok(()) if the connection is healthy, or an error describing the failure.
async fn check_health(
&self,
source: &Source,
secrets: &SecretManager,
) -> Result<(), DataFetchError>;
}
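
A caller-side sketch of the new trait method (illustrative only, not part of this PR; `fetcher`, `source`, and `secrets` are assumed to be in scope):

// Probe the source before scheduling work; any Err short-circuits.
if let Err(e) = fetcher.check_health(&source, &secrets).await {
    tracing::warn!(error = %e, "source unhealthy, skipping fetch");
    return Err(e);
}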
22 changes: 22 additions & 0 deletions src/datafetch/native/bigquery.rs
@@ -45,6 +45,28 @@ async fn build_client(source: &Source, secrets: &SecretManager) -> Result<Client
.map_err(|e| DataFetchError::Connection(format!("Failed to create BigQuery client: {}", e)))
}

/// Check connectivity to a BigQuery source
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let client = build_client(source, secrets).await?;

let project_id = match source {
Source::Bigquery { project_id, .. } => project_id.as_str(),
_ => {
return Err(DataFetchError::Connection(
"Expected BigQuery source".to_string(),
))
}
};

let query_request = QueryRequest::new("SELECT 1");
client
.job()
.query(project_id, query_request)
.await
.map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
Ok(())
}

/// Discover tables and columns from BigQuery
pub async fn discover_tables(
source: &Source,
14 changes: 14 additions & 0 deletions src/datafetch/native/duckdb.rs
@@ -15,6 +15,20 @@ use crate::source::Source;
// Arrow 56 IPC for duckdb compatibility (duckdb uses arrow 56)
use arrow_ipc_56 as arrow56_ipc;

/// Check connectivity to a DuckDB/MotherDuck source
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let connection_string = resolve_connection_string(source, secrets).await?;
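// duckdb's Connection is a synchronous API, so the probe runs on the
// blocking thread pool instead of stalling the async executor.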
tokio::task::spawn_blocking(move || {
let conn = Connection::open(&connection_string)
.map_err(|e| DataFetchError::Connection(e.to_string()))?;
conn.execute("SELECT 1", [])
.map_err(|e| DataFetchError::Query(e.to_string()))?;
Ok(())
})
.await
.map_err(|e| DataFetchError::Connection(e.to_string()))?
}

/// Discover tables and columns from DuckDB/MotherDuck
pub async fn discover_tables(
source: &Source,
37 changes: 29 additions & 8 deletions src/datafetch/native/ducklake.rs
@@ -133,15 +133,11 @@ pub async fn discover_tables(
Ok(tables)
}

-/// Fetch table data from DuckLake and write to the batch writer.
-pub async fn fetch_table(
+/// Build a SessionContext with the DuckLake catalog and S3 object store registered.
+async fn build_session_context(
source: &Source,
secrets: &SecretManager,
-_catalog: Option<&str>,
-schema: &str,
-table: &str,
-writer: &mut dyn BatchWriter,
-) -> Result<(), DataFetchError> {
+) -> Result<SessionContext, DataFetchError> {
let (s3_endpoint, s3_region) = match source {
Source::Ducklake {
s3_endpoint,
@@ -157,7 +153,6 @@ pub async fn fetch_table(

let (catalog, creds) = build_catalog(source, secrets).await?;

-// Build a SessionContext and register the catalog
let ctx = SessionContext::new();
ctx.register_catalog("ducklake", catalog.clone());

@@ -201,6 +196,32 @@ pub async fn fetch_table(
ctx.register_object_store(&s3_url, Arc::new(s3_store));
}

Ok(ctx)
}

/// Check connectivity to the DuckLake catalog database and object store.
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let ctx = build_session_context(source, secrets).await?;
ctx.sql("SELECT 1")
.await
.map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))?
.collect()
.await
.map_err(|e| DataFetchError::Query(format!("DuckLake health check failed: {}", e)))?;
Ok(())
}

/// Fetch table data from DuckLake and write to the batch writer.
pub async fn fetch_table(
source: &Source,
secrets: &SecretManager,
_catalog: Option<&str>,
schema: &str,
table: &str,
writer: &mut dyn BatchWriter,
) -> Result<(), DataFetchError> {
let ctx = build_session_context(source, secrets).await?;

// Execute SELECT * FROM ducklake.{schema}.{table}
let sql = format!(
"SELECT * FROM ducklake.{}.{}",
10 changes: 10 additions & 0 deletions src/datafetch/native/iceberg.rs
@@ -103,6 +103,16 @@ async fn build_catalog(
}
}

/// Check connectivity to an Iceberg catalog
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let catalog = build_catalog(source, secrets).await?;
catalog
.list_namespaces(None)
.await
.map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
Ok(())
}

/// Discover tables and columns from an Iceberg catalog.
/// Recursively discovers nested namespaces.
pub async fn discover_tables(
38 changes: 38 additions & 0 deletions src/datafetch/native/mod.rs
@@ -142,11 +142,17 @@ mod tests {
}
}

use std::time::Duration;

use crate::datafetch::batch_writer::BatchWriter;
use crate::datafetch::{DataFetchError, DataFetcher, TableMetadata};
use crate::secrets::SecretManager;
use crate::source::Source;

/// Timeout for health check operations.
/// Keeps health checks fast-failing for unreachable hosts.
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10);

/// Native Rust driver-based data fetcher
#[derive(Debug, Default)]
pub struct NativeFetcher;
@@ -229,4 +235,36 @@ impl DataFetcher for NativeFetcher {
}
}
}

#[tracing::instrument(
name = "check_health",
skip(self, source, secrets),
fields(runtimedb.backend = %source.source_type())
)]
async fn check_health(
&self,
source: &Source,
secrets: &SecretManager,
) -> Result<(), DataFetchError> {
tokio::time::timeout(HEALTH_CHECK_TIMEOUT, async {
match source {
Source::Duckdb { .. } | Source::Motherduck { .. } => {
duckdb::check_health(source, secrets).await
}
Source::Postgres { .. } => postgres::check_health(source, secrets).await,
Source::Iceberg { .. } => iceberg::check_health(source, secrets).await,
Source::Mysql { .. } => mysql::check_health(source, secrets).await,
Source::Snowflake { .. } => snowflake::check_health(source, secrets).await,
Source::Bigquery { .. } => bigquery::check_health(source, secrets).await,
Source::Ducklake { .. } => ducklake::check_health(source, secrets).await,
}
})
.await
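// timeout() yields Err(Elapsed) once the deadline passes; fold that
// into a DataFetchError::Connection so callers see one error type.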
.unwrap_or_else(|_| {
Err(DataFetchError::Connection(format!(
"health check timed out after {}s",
HEALTH_CHECK_TIMEOUT.as_secs()
)))
})
}
}
12 changes: 11 additions & 1 deletion src/datafetch/native/mysql.rs
@@ -9,7 +9,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlRow, MySqlSslMode};
-use sqlx::{ConnectOptions, Row};
+use sqlx::{ConnectOptions, Connection, Row};
use std::sync::Arc;
use tracing::warn;

@@ -84,6 +84,16 @@ async fn connect_with_ssl_retry(
}
}

/// Check connectivity to a MySQL source
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let options = resolve_connect_options(source, secrets).await?;
let mut conn = connect_with_ssl_retry(options).await?;
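// Connection::ping (hence the new sqlx::Connection import above) does a
// lightweight round-trip to check the session is still alive.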
conn.ping()
.await
.map_err(|e| DataFetchError::Connection(e.to_string()))?;
Ok(())
}

/// Discover tables and columns from MySQL
pub async fn discover_tables(
source: &Source,
10 changes: 10 additions & 0 deletions src/datafetch/native/postgres.rs
@@ -93,6 +93,16 @@ async fn connect_with_ssl_retry(connection_string: &str) -> Result<PgConnection,
}
}

/// Check connectivity to a PostgreSQL source
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let connection_string = resolve_connection_string(source, secrets).await?;
let mut conn = connect_with_ssl_retry(&connection_string).await?;
conn.ping()
.await
.map_err(|e| DataFetchError::Connection(e.to_string()))?;
Ok(())
}

/// Discover tables and columns from PostgreSQL
pub async fn discover_tables(
source: &Source,
10 changes: 10 additions & 0 deletions src/datafetch/native/snowflake.rs
@@ -98,6 +98,16 @@ async fn build_client(
Ok(client)
}

/// Check connectivity to a Snowflake source
pub async fn check_health(source: &Source, secrets: &SecretManager) -> Result<(), DataFetchError> {
let client = build_client(source, secrets).await?;
client
.exec("SELECT 1")
.await
.map_err(|e| DataFetchError::Query(format!("Health check failed: {}", e)))?;
Ok(())
}

/// Discover tables and columns from Snowflake
pub async fn discover_tables(
source: &Source,
16 changes: 16 additions & 0 deletions src/datafetch/orchestrator.rs
@@ -119,6 +119,14 @@ impl FetchOrchestrator {
Ok((parquet_url, row_count))
}

/// Check connectivity to a remote source.
/// Delegates to the underlying fetcher.
pub async fn check_health(&self, source: &Source) -> Result<(), DataFetchError> {
self.fetcher
.check_health(source, &self.secret_manager)
.await
}

/// Discover tables from a remote source.
/// Delegates to the underlying fetcher.
pub async fn discover_tables(
@@ -299,6 +307,14 @@ mod tests {
}])
}

async fn check_health(
&self,
_source: &Source,
_secrets: &SecretManager,
) -> Result<(), DataFetchError> {
Ok(())
}

async fn fetch_table(
&self,
_source: &Source,
24 changes: 24 additions & 0 deletions src/engine.rs
@@ -461,6 +461,30 @@ impl RuntimeEngine {
self.catalog.clone()
}

/// Check health of a connection by attempting to connect to the remote source.
/// Returns Ok(()) if healthy, or an error describing the failure.
pub async fn check_connection_health(
&self,
connection_id: &str,
) -> Result<(), crate::datafetch::DataFetchError> {
let conn = self
.catalog
.get_connection(connection_id)
.await
.map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))?
.ok_or_else(|| {
crate::datafetch::DataFetchError::Connection(format!(
"Connection '{}' not found",
connection_id
))
})?;

let source: Source = serde_json::from_str(&conn.config_json)
.map_err(|e| crate::datafetch::DataFetchError::Connection(e.to_string()))?;

self.orchestrator.check_health(&source).await
}

/// List all configured connections.
pub async fn list_connections(&self) -> Result<Vec<ConnectionInfo>> {
self.catalog.list_connections().await
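At the engine level the probe is addressed by connection id rather than by a Source value. A hypothetical usage sketch (the id "my-postgres" is invented for illustration):

match engine.check_connection_health("my-postgres").await {
    Ok(()) => println!("connection healthy"),
    Err(e) => eprintln!("health check failed: {e}"),
}
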
15 changes: 9 additions & 6 deletions src/http/app_server.rs
@@ -1,10 +1,11 @@
use crate::http::controllers::{
-create_connection_handler, create_dataset, create_secret_handler, delete_connection_handler,
-delete_dataset, delete_secret_handler, get_connection_handler, get_dataset, get_result_handler,
-get_secret_handler, health_handler, information_schema_handler, list_connections_handler,
-list_datasets, list_query_runs_handler, list_results_handler, list_secrets_handler,
-list_uploads, purge_connection_cache_handler, purge_table_cache_handler, query_handler,
-refresh_handler, update_dataset, update_secret_handler, upload_file, MAX_UPLOAD_SIZE,
+check_connection_health_handler, create_connection_handler, create_dataset,
+create_secret_handler, delete_connection_handler, delete_dataset, delete_secret_handler,
+get_connection_handler, get_dataset, get_result_handler, get_secret_handler, health_handler,
+information_schema_handler, list_connections_handler, list_datasets, list_query_runs_handler,
+list_results_handler, list_secrets_handler, list_uploads, purge_connection_cache_handler,
+purge_table_cache_handler, query_handler, refresh_handler, update_dataset,
+update_secret_handler, upload_file, MAX_UPLOAD_SIZE,
};
use crate::RuntimeEngine;
use axum::extract::DefaultBodyLimit;
@@ -80,6 +81,7 @@ pub const PATH_HEALTH: &str = "/health";
pub const PATH_REFRESH: &str = "/refresh";
pub const PATH_CONNECTIONS: &str = "/connections";
pub const PATH_CONNECTION: &str = "/connections/{connection_id}";
pub const PATH_CONNECTION_HEALTH: &str = "/connections/{connection_id}/health";
pub const PATH_CONNECTION_CACHE: &str = "/connections/{connection_id}/cache";
pub const PATH_TABLE_CACHE: &str = "/connections/{connection_id}/tables/{schema}/{table}/cache";
pub const PATH_SECRETS: &str = "/secrets";
@@ -108,6 +110,7 @@ impl AppServer {
PATH_CONNECTION,
get(get_connection_handler).delete(delete_connection_handler),
)
.route(PATH_CONNECTION_HEALTH, get(check_connection_health_handler))
.route(
PATH_CONNECTION_CACHE,
delete(purge_connection_cache_handler),
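
Once registered, GET /connections/{connection_id}/health exercises the whole chain above: check_connection_health_handler → RuntimeEngine::check_connection_health → FetchOrchestrator::check_health → the per-backend probe. The handler body is outside this diff, so the response encoding is not shown here.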