diff --git a/Cargo.lock b/Cargo.lock
index 3c14964..8cc8072 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2645,6 +2645,26 @@ version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2b99e13947667b36ad713549237362afb054b2d8f8cc447751e23ec61202db07"
 
+[[package]]
+name = "datafusion-ducklake"
+version = "0.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d611b5aef6cdbe0287352376813ec422367437474e3d3033414d98913dd72d0e"
+dependencies = [
+ "arrow 57.2.0",
+ "async-trait",
+ "datafusion",
+ "duckdb",
+ "futures",
+ "object_store 0.12.4",
+ "parquet 57.2.0",
+ "sqlx",
+ "thiserror 2.0.17",
+ "tokio",
+ "tracing",
+ "url",
+]
+
 [[package]]
 name = "datafusion-execution"
 version = "51.0.0"
@@ -6311,6 +6331,7 @@ dependencies = [
  "clap",
  "config",
  "datafusion",
+ "datafusion-ducklake",
  "datafusion-tracing",
  "duckdb",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index 68aa187..7e61e82 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,6 +59,7 @@ arrow-array-54 = { package = "arrow-array", version = "54" }
 arrow-ipc-54 = { package = "arrow-ipc", version = "54" }
 # Arrow 56 for duckdb compatibility (duckdb uses arrow 56, datafusion uses arrow 57)
 arrow-ipc-56 = { package = "arrow-ipc", version = "56" }
+datafusion-ducklake = { version = "0.0.6", features = ["metadata-postgres"] }
 liquid-cache-client = { git = "https://github.com/XiangpengHao/liquid-cache", rev = "5e63d811132230a0e15fb6d4311be2eb5551cb4d" }
 redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
 sha2 = "0.10"
diff --git a/src/datafetch/native/ducklake.rs b/src/datafetch/native/ducklake.rs
new file mode 100644
index 0000000..e13f2e4
--- /dev/null
+++ b/src/datafetch/native/ducklake.rs
@@ -0,0 +1,321 @@
+//! DuckLake native driver implementation using datafusion-ducklake
+
+use std::sync::Arc;
+
+use datafusion::catalog::CatalogProvider;
+use datafusion::prelude::SessionContext;
+use datafusion_ducklake::{DuckLakeCatalog, PostgresMetadataProvider};
+use object_store::aws::AmazonS3Builder;
+use serde::Deserialize;
+use url::Url;
+
+use crate::datafetch::batch_writer::BatchWriter;
+use crate::datafetch::{ColumnMetadata, DataFetchError, TableMetadata};
+use crate::secrets::SecretManager;
+use crate::source::Source;
+
+/// Credentials extracted from the credentials_json secret.
+#[derive(Deserialize)]
+struct DucklakeCredentials {
+    catalog_db_password: String,
+    s3_access_key_id: Option<String>,
+    s3_secret_access_key: Option<String>,
+}
+
+/// Build the full catalog URL by injecting the password into the connection string.
+fn build_catalog_url_with_password(
+    catalog_url: &str,
+    password: &str,
+) -> Result<String, DataFetchError> {
+    let mut parsed = Url::parse(catalog_url)
+        .map_err(|e| DataFetchError::Connection(format!("Invalid catalog_url: {}", e)))?;
+    parsed.set_password(Some(password)).map_err(|_| {
+        DataFetchError::Connection("Cannot set password on catalog_url".to_string())
+    })?;
+    Ok(parsed.to_string())
+}
+
+/// Build a DuckLakeCatalog from source configuration.
+async fn build_catalog(
+    source: &Source,
+    secrets: &SecretManager,
+) -> Result<(Arc<DuckLakeCatalog>, DucklakeCredentials), DataFetchError> {
+    let (catalog_url, credential) = match source {
+        Source::Ducklake {
+            catalog_url,
+            credential,
+            ..
+        } => (catalog_url.as_str(), credential),
+        _ => {
+            return Err(DataFetchError::Connection(
+                "Expected DuckLake source".to_string(),
+            ))
+        }
+    };
+
+    let creds_json = credential
+        .resolve(secrets)
+        .await
+        .map_err(|e| DataFetchError::Connection(e.to_string()))?;
+
+    let creds: DucklakeCredentials = serde_json::from_str(&creds_json).map_err(|e| {
+        DataFetchError::Connection(format!("Invalid DuckLake credentials JSON: {}", e))
+    })?;
+
+    let full_url = build_catalog_url_with_password(catalog_url, &creds.catalog_db_password)?;
+
+    let provider = PostgresMetadataProvider::new(&full_url)
+        .await
+        .map_err(|e| {
+            DataFetchError::Connection(format!("Failed to connect to catalog DB: {}", e))
+        })?;
+
+    let catalog = DuckLakeCatalog::new(provider).map_err(|e| {
+        DataFetchError::Connection(format!("Failed to create DuckLake catalog: {}", e))
+    })?;
+
+    Ok((Arc::new(catalog), creds))
+}
+
+/// Discover tables and columns from a DuckLake catalog.
+pub async fn discover_tables(
+    source: &Source,
+    secrets: &SecretManager,
+) -> Result<Vec<TableMetadata>, DataFetchError> {
+    let (catalog, _creds) = build_catalog(source, secrets).await?;
+
+    let schema_names = catalog.schema_names();
+
+    let mut tables = Vec::new();
+
+    for schema_name in schema_names {
+        // Skip system schemas
+        if schema_name == "information_schema" {
+            continue;
+        }
+
+        let schema = match catalog.schema(&schema_name) {
+            Some(s) => s,
+            None => continue,
+        };
+
+        let table_names = schema.table_names();
+
+        for table_name in table_names {
+            let table = match schema.table(&table_name).await {
+                Ok(Some(t)) => t,
+                _ => continue,
+            };
+
+            let arrow_schema = table.schema();
+            let columns = arrow_schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| ColumnMetadata {
+                    name: field.name().clone(),
+                    data_type: field.data_type().clone(),
+                    nullable: field.is_nullable(),
+                    ordinal_position: i as i32,
+                })
+                .collect();
+
+            tables.push(TableMetadata {
+                catalog_name: None,
+                schema_name: schema_name.clone(),
+                table_name,
+                table_type: "BASE TABLE".to_string(),
+                columns,
+            });
+        }
+    }
+
+    Ok(tables)
+}
+
+/// Fetch table data from DuckLake and write to the batch writer.
+pub async fn fetch_table(
+    source: &Source,
+    secrets: &SecretManager,
+    _catalog: Option<&str>,
+    schema: &str,
+    table: &str,
+    writer: &mut dyn BatchWriter,
+) -> Result<(), DataFetchError> {
+    let (s3_endpoint, s3_region) = match source {
+        Source::Ducklake {
+            s3_endpoint,
+            s3_region,
+            ..
+        } => (s3_endpoint.clone(), s3_region.clone()),
+        _ => {
+            return Err(DataFetchError::Connection(
+                "Expected DuckLake source".to_string(),
+            ))
+        }
+    };
+
+    let (catalog, creds) = build_catalog(source, secrets).await?;
+
+    // Build a SessionContext and register the catalog
+    let ctx = SessionContext::new();
+    ctx.register_catalog("ducklake", catalog.clone());
+
+    // Get the data path from the catalog to determine the S3 bucket
+    let data_path = catalog
+        .provider()
+        .get_data_path()
+        .map_err(|e| DataFetchError::Connection(format!("Failed to get data path: {}", e)))?;
+
+    // If the data path is an S3 URL, register an object store for the bucket
+    if data_path.starts_with("s3://") {
+        let parsed_data_url = Url::parse(&data_path)
+            .map_err(|e| DataFetchError::Connection(format!("Invalid data path URL: {}", e)))?;
+        let bucket = parsed_data_url.host_str().ok_or_else(|| {
+            DataFetchError::Connection(format!("No bucket name in data path: {}", data_path))
+        })?;
+
+        let mut s3_builder = AmazonS3Builder::new()
+            .with_bucket_name(bucket)
+            .with_allow_http(true);
+
+        if let Some(access_key) = &creds.s3_access_key_id {
+            s3_builder = s3_builder.with_access_key_id(access_key);
+        }
+        if let Some(secret_key) = &creds.s3_secret_access_key {
+            s3_builder = s3_builder.with_secret_access_key(secret_key);
+        }
+        if let Some(endpoint) = &s3_endpoint {
+            s3_builder = s3_builder.with_endpoint(endpoint);
+        }
+        if let Some(region) = &s3_region {
+            s3_builder = s3_builder.with_region(region);
+        }
+
+        let s3_store = s3_builder
+            .build()
+            .map_err(|e| DataFetchError::Connection(format!("Failed to build S3 store: {}", e)))?;
+
+        let s3_url = Url::parse(&format!("s3://{}/", bucket))
+            .map_err(|e| DataFetchError::Connection(e.to_string()))?;
+        ctx.register_object_store(&s3_url, Arc::new(s3_store));
+    }
+
+    // Execute SELECT * FROM ducklake.{schema}.{table}
+    let sql = format!(
+        "SELECT * FROM ducklake.{}.{}",
+        sanitize_identifier(schema),
+        sanitize_identifier(table),
+    );
+
+    let df = ctx
+        .sql(&sql)
+        .await
+        .map_err(|e| DataFetchError::Query(format!("DuckLake query failed: {}", e)))?;
+
+    let batches = df
+        .collect()
+        .await
+        .map_err(|e| DataFetchError::Query(format!("DuckLake collect failed: {}", e)))?;
+
+    if batches.is_empty() {
+        // Get schema from the table provider directly for empty tables
+        let catalog_provider = ctx
+            .catalog("ducklake")
+            .ok_or_else(|| DataFetchError::Query("DuckLake catalog not found".to_string()))?;
+        let schema_provider = catalog_provider
+            .schema(schema)
+            .ok_or_else(|| DataFetchError::Query(format!("Schema '{}' not found", schema)))?;
+        let table_provider = schema_provider
+            .table(table)
+            .await
+            .map_err(|e| DataFetchError::Query(e.to_string()))?
+            .ok_or_else(|| {
+                DataFetchError::Query(format!("Table '{}.{}' not found", schema, table))
+            })?;
+        writer.init(table_provider.schema().as_ref())?;
+    } else {
+        writer.init(batches[0].schema().as_ref())?;
+        for batch in &batches {
+            writer.write_batch(batch)?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Sanitize a SQL identifier by quoting it with double quotes.
+fn sanitize_identifier(name: &str) -> String {
+    format!("\"{}\"", name.replace('"', "\"\""))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_build_catalog_url_with_password() {
+        let url = build_catalog_url_with_password(
+            "postgresql://user@localhost:5432/ducklake",
+            "secret123",
+        )
+        .unwrap();
+        assert_eq!(url, "postgresql://user:secret123@localhost:5432/ducklake");
+    }
+
+    #[test]
+    fn test_build_catalog_url_with_special_chars_password() {
+        let url = build_catalog_url_with_password(
+            "postgresql://user@localhost:5432/ducklake",
+            "p@ss w0rd!&=",
+        )
+        .unwrap();
+        // Verify the password round-trips correctly through URL parsing
+        let parsed = Url::parse(&url).unwrap();
+        assert_eq!(parsed.username(), "user");
+        // The url crate percent-encodes some special chars but not all
+        // Verify the password is present and decodes correctly
+        let decoded = urlencoding::decode(parsed.password().unwrap()).unwrap();
+        assert_eq!(decoded, "p@ss w0rd!&=");
+    }
+
+    #[test]
+    fn test_build_catalog_url_replaces_existing_password() {
+        let url = build_catalog_url_with_password(
+            "postgresql://user:oldpass@localhost:5432/ducklake",
+            "newpass",
+        )
+        .unwrap();
+        assert_eq!(url, "postgresql://user:newpass@localhost:5432/ducklake");
+    }
+
+    #[test]
+    fn test_build_catalog_url_invalid_url() {
+        let result = build_catalog_url_with_password("not-a-url", "pass");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_sanitize_identifier() {
+        assert_eq!(sanitize_identifier("my_table"), "\"my_table\"");
+        assert_eq!(sanitize_identifier("has\"quote"), "\"has\"\"quote\"");
+    }
+
+    #[test]
+    fn test_ducklake_credentials_deserialization() {
+        let json = r#"{"catalog_db_password": "secret", "s3_access_key_id": "AKIA...", "s3_secret_access_key": "wJal..."}"#;
+        let creds: DucklakeCredentials = serde_json::from_str(json).unwrap();
+        assert_eq!(creds.catalog_db_password, "secret");
+        assert_eq!(creds.s3_access_key_id.as_deref(), Some("AKIA..."));
+        assert_eq!(creds.s3_secret_access_key.as_deref(), Some("wJal..."));
+    }
+
+    #[test]
+    fn test_ducklake_credentials_without_s3() {
+        let json = r#"{"catalog_db_password": "secret"}"#;
+        let creds: DucklakeCredentials = serde_json::from_str(json).unwrap();
+        assert_eq!(creds.catalog_db_password, "secret");
+        assert!(creds.s3_access_key_id.is_none());
+        assert!(creds.s3_secret_access_key.is_none());
+    }
+}
diff --git a/src/datafetch/native/mod.rs b/src/datafetch/native/mod.rs
index 2d59a1f..b6fd987 100644
--- a/src/datafetch/native/mod.rs
+++ b/src/datafetch/native/mod.rs
@@ -2,6 +2,7 @@
 // The type mapping functions are the authoritative implementations that tests validate against.
 pub mod bigquery;
 pub mod duckdb;
+pub mod ducklake;
 pub mod iceberg;
 pub mod mysql;
 mod parquet_writer;
@@ -180,6 +181,7 @@ impl DataFetcher for NativeFetcher {
             Source::Mysql { .. } => mysql::discover_tables(source, secrets).await,
             Source::Snowflake { .. } => snowflake::discover_tables(source, secrets).await,
             Source::Bigquery { .. } => bigquery::discover_tables(source, secrets).await,
+            Source::Ducklake { .. } => ducklake::discover_tables(source, secrets).await,
         }?;
         tracing::Span::current().record("runtimedb.tables_found", tables.len());
         Ok(tables)
@@ -222,6 +224,9 @@ impl DataFetcher for NativeFetcher {
             Source::Bigquery { .. } => {
                 bigquery::fetch_table(source, secrets, catalog, schema, table, writer).await
             }
+            Source::Ducklake { .. } => {
+                ducklake::fetch_table(source, secrets, catalog, schema, table, writer).await
+            }
         }
     }
 }
diff --git a/src/source.rs b/src/source.rs
index 9181c19..6f8fdbf 100644
--- a/src/source.rs
+++ b/src/source.rs
@@ -139,6 +139,16 @@ pub enum Source {
         #[serde(default)]
         credential: Credential,
     },
+    Ducklake {
+        /// Catalog database URL (e.g., "postgresql://user@host:5432/dbname")
+        catalog_url: String,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        s3_endpoint: Option<String>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        s3_region: Option<String>,
+        #[serde(default)]
+        credential: Credential,
+    },
 }
 
 impl Source {
@@ -152,6 +162,7 @@
             Source::Iceberg { .. } => "iceberg",
             Source::Mysql { .. } => "mysql",
             Source::Bigquery { .. } => "bigquery",
+            Source::Ducklake { .. } => "ducklake",
         }
     }
 
@@ -177,6 +188,7 @@
             },
             Source::Mysql { credential, .. } => credential,
             Source::Bigquery { credential, .. } => credential,
+            Source::Ducklake { credential, .. } => credential,
         }
     }
 
@@ -267,6 +279,17 @@
             region,
             credential,
         },
+        Source::Ducklake {
+            catalog_url,
+            s3_endpoint,
+            s3_region,
+            ..
+        } => Source::Ducklake {
+            catalog_url,
+            s3_endpoint,
+            s3_region,
+            credential,
+        },
         }
     }
 }
@@ -821,4 +844,89 @@ mod tests {
         };
         assert!(matches!(without_cred.credential(), Credential::None));
     }
+
+    #[test]
+    fn test_ducklake_serialization() {
+        let source = Source::Ducklake {
+            catalog_url: "postgresql://user@localhost:5432/ducklake".to_string(),
+            s3_endpoint: Some("http://localhost:9000".to_string()),
+            s3_region: Some("us-east-1".to_string()),
+            credential: Credential::SecretRef {
+                id: "secr_dl".to_string(),
+            },
+        };
+
+        let json = serde_json::to_string(&source).unwrap();
+        assert!(json.contains(r#""type":"ducklake""#));
+        assert!(json.contains(r#""catalog_url":"postgresql://user@localhost:5432/ducklake""#));
+        assert!(json.contains(r#""s3_endpoint":"http://localhost:9000""#));
+        assert!(json.contains(r#""s3_region":"us-east-1""#));
+
+        let parsed: Source = serde_json::from_str(&json).unwrap();
+        assert_eq!(source, parsed);
+    }
+
+    #[test]
+    fn test_ducklake_without_s3_fields() {
+        let source = Source::Ducklake {
+            catalog_url: "postgresql://user@localhost:5432/ducklake".to_string(),
+            s3_endpoint: None,
+            s3_region: None,
+            credential: Credential::None,
+        };
+
+        let json = serde_json::to_string(&source).unwrap();
+        assert!(!json.contains(r#""s3_endpoint""#));
+        assert!(!json.contains(r#""s3_region""#));
+
+        let parsed: Source = serde_json::from_str(&json).unwrap();
+        assert_eq!(source, parsed);
+    }
+
+    #[test]
+    fn test_ducklake_source_type() {
+        let dl = Source::Ducklake {
+            catalog_url: "postgresql://localhost/db".to_string(),
+            s3_endpoint: None,
+            s3_region: None,
+            credential: Credential::None,
+        };
+        assert_eq!(dl.source_type(), "ducklake");
+    }
+
+    #[test]
+    fn test_ducklake_catalog_returns_none() {
+        let dl = Source::Ducklake {
+            catalog_url: "postgresql://localhost/db".to_string(),
+            s3_endpoint: None,
+            s3_region: None,
+            credential: Credential::None,
+        };
+        assert_eq!(dl.catalog(), None);
+    }
+
+    #[test]
+    fn test_ducklake_credential_accessor() {
+        let with_secret = Source::Ducklake {
+            catalog_url: "postgresql://localhost/db".to_string(),
+            s3_endpoint: None,
+            s3_region: None,
+            credential: Credential::SecretRef {
+                id: "secr_dl".to_string(),
+            },
+        };
+        assert!(matches!(
+            with_secret.credential(),
+            Credential::SecretRef { id } if id == "secr_dl"
+        ));
+        assert_eq!(with_secret.secret_id(), Some("secr_dl"));
+
+        let without_cred = Source::Ducklake {
+            catalog_url: "postgresql://localhost/db".to_string(),
"postgresql://localhost/db".to_string(), + s3_endpoint: None, + s3_region: None, + credential: Credential::None, + }; + assert!(matches!(without_cred.credential(), Credential::None)); + } }