From d50430fa0f3faf38a36a4655b527f6eeadeb8047 Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Fri, 13 Feb 2026 13:41:27 +0530 Subject: [PATCH 1/4] Add S3/ObjectStore write support for DuckLake catalogs --- Cargo.toml | 1 + src/catalog.rs | 6 +- src/error.rs | 4 + src/insert_exec.rs | 24 ++-- src/schema.rs | 6 +- src/table.rs | 7 +- src/table_writer.rs | 192 ++++++++++++++------------------ tests/concurrent_write_tests.rs | 68 +++++++---- tests/sql_write_tests.rs | 36 +++++- tests/write_tests.rs | 138 +++++++++++++++++------ 10 files changed, 290 insertions(+), 192 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4be5c86..0b02a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ duckdb = { version = "1.4.1", features = ["bundled"], optional = true } sqlx = { version = "0.8", features = ["runtime-tokio"], optional = true } [dev-dependencies] +bytes = "1" testcontainers = "0.23" testcontainers-modules = { version = "0.11", features = ["minio", "postgres", "mysql"] } tempfile = "3.14" diff --git a/src/catalog.rs b/src/catalog.rs index 8f312a9..16ce3d5 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -13,8 +13,6 @@ use datafusion::datasource::object_store::ObjectStoreUrl; #[cfg(feature = "write")] use crate::metadata_writer::MetadataWriter; -#[cfg(feature = "write")] -use std::path::PathBuf; /// Configuration for write operations (when write feature is enabled) #[cfg(feature = "write")] @@ -23,7 +21,7 @@ struct WriteConfig { /// Metadata writer for catalog operations writer: Arc, /// Base data path for writing files - data_path: PathBuf, + data_path: String, } /// DuckLake catalog provider @@ -125,7 +123,7 @@ impl DuckLakeCatalog { catalog_path, write_config: Some(WriteConfig { writer, - data_path: PathBuf::from(&data_path_str), + data_path: data_path_str, }), }) } diff --git a/src/error.rs b/src/error.rs index 588d340..f8d341c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,10 @@ pub enum DuckLakeError { #[error("Unsupported feature: {0}")] Unsupported(String), + /// ObjectStore error + #[error("ObjectStore error: {0}")] + ObjectStore(#[from] object_store::Error), + /// IO error #[error("IO error: {0}")] Io(#[from] std::io::Error), diff --git a/src/insert_exec.rs b/src/insert_exec.rs index e509972..0553051 100644 --- a/src/insert_exec.rs +++ b/src/insert_exec.rs @@ -6,12 +6,12 @@ use std::any::Any; use std::fmt::{self, Debug}; -use std::path::PathBuf; use std::sync::Arc; use arrow::array::{ArrayRef, RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -38,7 +38,8 @@ pub struct DuckLakeInsertExec { table_name: String, arrow_schema: SchemaRef, write_mode: WriteMode, - data_path: PathBuf, + object_store_url: Arc, + data_path: String, cache: PlanProperties, } @@ -51,7 +52,8 @@ impl DuckLakeInsertExec { table_name: String, arrow_schema: SchemaRef, write_mode: WriteMode, - data_path: PathBuf, + object_store_url: Arc, + data_path: String, ) -> Self { let cache = Self::compute_properties(); Self { @@ -61,6 +63,7 @@ impl DuckLakeInsertExec { table_name, arrow_schema, write_mode, + object_store_url, data_path, cache, } @@ -136,6 +139,7 @@ impl ExecutionPlan for DuckLakeInsertExec { self.table_name.clone(), 
Arc::clone(&self.arrow_schema), self.write_mode, + self.object_store_url.clone(), self.data_path.clone(), ))) } @@ -158,11 +162,11 @@ impl ExecutionPlan for DuckLakeInsertExec { let table_name = self.table_name.clone(); let arrow_schema = Arc::clone(&self.arrow_schema); let write_mode = self.write_mode; - let data_path = self.data_path.clone(); + let object_store_url = self.object_store_url.clone(); let output_schema = make_insert_count_schema(); let stream = stream::once(async move { - let input_stream = input.execute(0, context)?; + let input_stream = input.execute(0, Arc::clone(&context))?; let batches: Vec = input_stream.try_collect().await?; if batches.is_empty() { @@ -170,7 +174,12 @@ impl ExecutionPlan for DuckLakeInsertExec { return Ok(RecordBatch::try_new(output_schema, vec![count_array])?); } - let table_writer = DuckLakeTableWriter::new(writer) + // Get object store from runtime environment + let object_store = context + .runtime_env() + .object_store(object_store_url.as_ref())?; + + let table_writer = DuckLakeTableWriter::new(writer, object_store) .map_err(|e| DataFusionError::External(Box::new(e)))?; let schema_without_metadata = @@ -185,8 +194,6 @@ impl ExecutionPlan for DuckLakeInsertExec { ) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let _ = data_path; - for batch in &batches { session .write_batch(batch) @@ -197,6 +204,7 @@ impl ExecutionPlan for DuckLakeInsertExec { session .finish() + .await .map_err(|e| DataFusionError::External(Box::new(e)))?; let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![row_count])); diff --git a/src/schema.rs b/src/schema.rs index fd41d79..a516961 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -16,8 +16,6 @@ use crate::table::DuckLakeTable; use crate::metadata_writer::{ColumnDef, MetadataWriter, WriteMode}; #[cfg(feature = "write")] use datafusion::error::DataFusionError; -#[cfg(feature = "write")] -use std::path::PathBuf; /// Validate table name to prevent path traversal attacks. 
/// Table names are used to construct file paths, so we must ensure they @@ -66,7 +64,7 @@ pub struct DuckLakeSchema { writer: Option>, /// Data path for write operations (when write feature is enabled) #[cfg(feature = "write")] - data_path: Option, + data_path: Option, } impl DuckLakeSchema { @@ -102,7 +100,7 @@ impl DuckLakeSchema { /// * `writer` - Metadata writer for catalog operations /// * `data_path` - Base path for data files #[cfg(feature = "write")] - pub fn with_writer(mut self, writer: Arc, data_path: PathBuf) -> Self { + pub fn with_writer(mut self, writer: Arc, data_path: String) -> Self { self.writer = Some(writer); self.data_path = Some(data_path); self diff --git a/src/table.rs b/src/table.rs index 0522221..3c5bbf6 100644 --- a/src/table.rs +++ b/src/table.rs @@ -19,8 +19,6 @@ use crate::types::{ use crate::insert_exec::DuckLakeInsertExec; #[cfg(feature = "write")] use crate::metadata_writer::{MetadataWriter, WriteMode}; -#[cfg(feature = "write")] -use std::path::PathBuf; #[cfg(feature = "encryption")] use crate::encryption::EncryptionFactoryBuilder; @@ -99,7 +97,7 @@ pub struct DuckLakeTable { writer: Option>, /// Data path for write operations (when write feature is enabled) #[cfg(feature = "write")] - data_path: Option, + data_path: Option, } impl std::fmt::Debug for DuckLakeTable { @@ -403,7 +401,7 @@ impl DuckLakeTable { mut self, schema_name: String, writer: Arc, - data_path: PathBuf, + data_path: String, ) -> Self { self.schema_name = Some(schema_name); self.writer = Some(writer); @@ -606,6 +604,7 @@ impl TableProvider for DuckLakeTable { self.table_name.clone(), self.schema(), write_mode, + self.object_store_url.clone(), data_path.clone(), ))) } diff --git a/src/table_writer.rs b/src/table_writer.rs index 20a2b77..11f7293 100644 --- a/src/table_writer.rs +++ b/src/table_writer.rs @@ -1,36 +1,42 @@ //! High-level table writer for DuckLake catalogs. use std::collections::HashMap; -use std::fs::File; -use std::io::{Seek, SeekFrom}; -use std::path::PathBuf; use std::sync::Arc; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use object_store::path::Path as ObjectPath; +use object_store::{ObjectStore, PutPayload}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use uuid::Uuid; use crate::Result; use crate::metadata_writer::{ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteResult}; +use crate::path_resolver::join_paths; use crate::types::arrow_to_ducklake_type; /// High-level writer for DuckLake tables. 
#[derive(Debug)] pub struct DuckLakeTableWriter { metadata: Arc, - data_path: PathBuf, + object_store: Arc, + /// The key path portion of the data_path (e.g., "/prefix/data/") + base_key_path: String, } impl DuckLakeTableWriter { - pub fn new(metadata: Arc) -> Result { + pub fn new( + metadata: Arc, + object_store: Arc, + ) -> Result { let data_path_str = metadata.get_data_path()?; - let data_path = PathBuf::from(data_path_str); + let (_, key_path) = crate::path_resolver::parse_object_store_url(&data_path_str)?; Ok(Self { metadata, - data_path, + object_store, + base_key_path: key_path, }) } @@ -43,13 +49,13 @@ impl DuckLakeTableWriter { arrow_schema: &Schema, mode: WriteMode, ) -> Result { - let table_path = self.data_path.join(schema_name).join(table_name); + let table_key = join_paths(&join_paths(&self.base_key_path, schema_name), table_name); let file_name = format!("{}.parquet", Uuid::new_v4()); self.begin_write_internal( schema_name, table_name, arrow_schema, - table_path, + table_key, file_name.clone(), file_name, true, @@ -63,19 +69,18 @@ impl DuckLakeTableWriter { schema_name: &str, table_name: &str, arrow_schema: &Schema, - file_dir: PathBuf, + file_dir: &str, file_name: String, mode: WriteMode, ) -> Result { - let file_path = file_dir.join(&file_name); - let catalog_path = file_path.to_string_lossy().to_string(); + let full_path = join_paths(file_dir, &file_name); self.begin_write_internal( schema_name, table_name, arrow_schema, - file_dir, + file_dir.to_string(), file_name, - catalog_path, + full_path, false, mode, ) @@ -87,7 +92,7 @@ impl DuckLakeTableWriter { schema_name: &str, table_name: &str, arrow_schema: &Schema, - file_dir: PathBuf, + file_dir: String, file_name: String, catalog_path: String, path_is_relative: bool, @@ -100,23 +105,25 @@ impl DuckLakeTableWriter { let schema_with_ids = Arc::new(build_schema_with_field_ids(arrow_schema, &setup.column_ids)); - std::fs::create_dir_all(&file_dir)?; - let file_path = file_dir.join(&file_name); + let object_path_str = join_paths(&file_dir, &file_name); + // Strip leading slash for object_store Path (it expects relative keys) + let object_path = ObjectPath::from(object_path_str.trim_start_matches('/')); + let props = WriterProperties::builder() .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0) .build(); - let file = File::create(&file_path)?; - let writer = ArrowWriter::try_new(file, schema_with_ids.clone(), Some(props))?; + let writer = ArrowWriter::try_new(Vec::new(), schema_with_ids.clone(), Some(props))?; Ok(TableWriteSession { metadata: Arc::clone(&self.metadata), + object_store: Arc::clone(&self.object_store), + object_path, snapshot_id: setup.snapshot_id, schema_id: setup.schema_id, table_id: setup.table_id, column_ids: setup.column_ids, schema_with_ids, writer: Some(writer), - file_path, catalog_path, path_is_relative, row_count: 0, @@ -124,7 +131,7 @@ impl DuckLakeTableWriter { } /// Write batches to a table, replacing any existing data. - pub fn write_table( + pub async fn write_table( &self, schema_name: &str, table_name: &str, @@ -144,11 +151,11 @@ impl DuckLakeTableWriter { session.write_batch(batch)?; } - session.finish() + session.finish().await } /// Write batches to a table, appending to existing data. - pub fn append_table( + pub async fn append_table( &self, schema_name: &str, table_name: &str, @@ -168,22 +175,23 @@ impl DuckLakeTableWriter { session.write_batch(batch)?; } - session.finish() + session.finish().await } } -/// Streaming write session. 
Dropped sessions clean up orphaned files automatically. +/// Streaming write session. Buffer is dropped if not finished (no data uploaded). #[derive(Debug)] pub struct TableWriteSession { metadata: Arc, + object_store: Arc, + object_path: ObjectPath, snapshot_id: i64, schema_id: i64, table_id: i64, #[allow(dead_code)] column_ids: Vec, schema_with_ids: SchemaRef, - writer: Option>, - file_path: PathBuf, + writer: Option>>, /// Path to register in catalog (may be relative filename or absolute path) catalog_path: String, /// Whether the catalog_path is relative to table path @@ -246,18 +254,24 @@ impl TableWriteSession { self.snapshot_id } - pub fn file_path(&self) -> &std::path::Path { - &self.file_path + /// Returns the object path that will be written to + pub fn file_path(&self) -> &str { + self.object_path.as_ref() } - pub fn finish(mut self) -> Result { + pub async fn finish(mut self) -> Result { let writer = self.writer.take().ok_or_else(|| { crate::error::DuckLakeError::Internal("Writer already closed".to_string()) })?; - writer.close()?; + let buffer = writer.into_inner()?; - let file_size = std::fs::metadata(&self.file_path)?.len() as i64; - let footer_size = calculate_footer_size(&self.file_path)?; + let file_size = buffer.len() as i64; + let footer_size = calculate_footer_size_from_bytes(&buffer)?; + + // Upload via object_store + self.object_store + .put(&self.object_path, PutPayload::from(buffer)) + .await?; let mut file_info = DataFileInfo::new(&self.catalog_path, file_size, self.row_count) .with_footer_size(footer_size); @@ -277,13 +291,7 @@ impl TableWriteSession { } } -impl Drop for TableWriteSession { - fn drop(&mut self) { - if self.writer.is_some() { - let _ = std::fs::remove_file(&self.file_path); - } - } -} +// Drop is a no-op: buffer is simply dropped, nothing was uploaded to the store. 
fn arrow_schema_to_column_defs(schema: &Schema) -> Result> { schema @@ -316,18 +324,14 @@ fn build_schema_with_field_ids(schema: &Schema, column_ids: &[i64]) -> Schema { Schema::new_with_metadata(fields, schema.metadata().clone()) } -fn calculate_footer_size(path: &std::path::Path) -> Result { - let mut file = File::open(path)?; - let file_size = file.metadata()?.len(); - if file_size < 8 { +fn calculate_footer_size_from_bytes(buffer: &[u8]) -> Result { + if buffer.len() < 8 { return Err(crate::error::DuckLakeError::Internal( "Invalid Parquet file: too small".to_string(), )); } - file.seek(SeekFrom::End(-8))?; - let mut footer_bytes = [0u8; 8]; - std::io::Read::read_exact(&mut file, &mut footer_bytes)?; + let footer_bytes = &buffer[buffer.len() - 8..]; if &footer_bytes[4..8] != b"PAR1" { return Err(crate::error::DuckLakeError::Internal( @@ -341,48 +345,6 @@ fn calculate_footer_size(path: &std::path::Path) -> Result { Ok(metadata_len + 8) } -#[allow(dead_code)] -fn write_parquet_with_field_ids( - path: &std::path::Path, - batches: &[RecordBatch], - column_ids: &[i64], -) -> Result<(i64, i64, i64)> { - let original_schema = batches[0].schema(); - - // Build schema with field_id metadata - let schema_with_ids = build_schema_with_field_ids(&original_schema, column_ids); - let schema_ref: SchemaRef = Arc::new(schema_with_ids); - - // Create writer properties - let props = WriterProperties::builder() - .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0) - .build(); - - // Create file and writer - let file = File::create(path)?; - let mut writer = ArrowWriter::try_new(file, schema_ref.clone(), Some(props))?; - - // Write all batches - let mut total_rows = 0i64; - for batch in batches { - // Remap batch to use schema with field_ids - let batch_with_ids = RecordBatch::try_new(schema_ref.clone(), batch.columns().to_vec())?; - writer.write(&batch_with_ids)?; - total_rows += batch.num_rows() as i64; - } - - // Close writer - let _ = writer.close()?; - - // Get file size - let file_size = std::fs::metadata(path)?.len() as i64; - - // Calculate actual footer size - let footer_size = calculate_footer_size(path)?; - - Ok((file_size, footer_size, total_rows)) -} - #[cfg(test)] mod tests { use super::*; @@ -431,14 +393,14 @@ mod tests { } #[test] - fn test_write_parquet_with_field_ids() { + fn test_write_parquet_to_buffer_with_field_ids() { let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, true), ])); let batch = RecordBatch::try_new( - schema, + schema.clone(), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(StringArray::from(vec!["a", "b", "c"])), @@ -446,22 +408,30 @@ mod tests { ) .unwrap(); - let temp_dir = tempfile::TempDir::new().unwrap(); - let file_path = temp_dir.path().join("test.parquet"); - let column_ids = vec![10, 20]; - let (file_size, footer_size, record_count) = - write_parquet_with_field_ids(&file_path, &[batch], &column_ids).unwrap(); + let schema_with_ids = Arc::new(build_schema_with_field_ids(&schema, &column_ids)); + + let props = WriterProperties::builder() + .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0) + .build(); + let mut writer = + ArrowWriter::try_new(Vec::new(), schema_with_ids.clone(), Some(props)).unwrap(); + + let batch_with_ids = + RecordBatch::try_new(schema_with_ids, batch.columns().to_vec()).unwrap(); + writer.write(&batch_with_ids).unwrap(); + let buffer = writer.into_inner().unwrap(); + + let file_size = buffer.len() as i64; + let 
footer_size = calculate_footer_size_from_bytes(&buffer).unwrap(); assert!(file_size > 0); assert!(footer_size > 0); - assert!(footer_size < file_size); // Footer should be smaller than file - assert_eq!(record_count, 3); + assert!(footer_size < file_size); - // Verify field_ids by reading back + // Verify field_ids by reading from buffer use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - let file = File::open(&file_path).unwrap(); - let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer)).unwrap(); let metadata = reader.metadata(); let schema_descr = metadata.file_metadata().schema_descr(); @@ -473,22 +443,28 @@ mod tests { } #[test] - fn test_calculate_footer_size() { + fn test_calculate_footer_size_from_bytes() { let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); - let temp_dir = tempfile::TempDir::new().unwrap(); - let file_path = temp_dir.path().join("footer_test.parquet"); + let props = WriterProperties::builder() + .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0) + .build(); + let schema_with_ids = Arc::new(build_schema_with_field_ids(&batch.schema(), &[1])); + let mut writer = + ArrowWriter::try_new(Vec::new(), schema_with_ids.clone(), Some(props)).unwrap(); - let column_ids = vec![1]; - write_parquet_with_field_ids(&file_path, &[batch], &column_ids).unwrap(); + let batch_with_ids = + RecordBatch::try_new(schema_with_ids, batch.columns().to_vec()).unwrap(); + writer.write(&batch_with_ids).unwrap(); + let buffer = writer.into_inner().unwrap(); - let footer_size = calculate_footer_size(&file_path).unwrap(); + let footer_size = calculate_footer_size_from_bytes(&buffer).unwrap(); // Footer should be reasonable size (metadata + 8 bytes) assert!(footer_size >= 8); - assert!(footer_size < 10000); // Shouldn't be huge for a simple file + assert!(footer_size < 10000); } } diff --git a/tests/concurrent_write_tests.rs b/tests/concurrent_write_tests.rs index c0191f2..73ddb67 100644 --- a/tests/concurrent_write_tests.rs +++ b/tests/concurrent_write_tests.rs @@ -8,8 +8,13 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use datafusion_ducklake::metadata_writer::MetadataWriter; use datafusion_ducklake::{DuckLakeTableWriter, SqliteMetadataWriter, WriteMode}; +use object_store::local::LocalFileSystem; use tempfile::TempDir; +fn create_object_store() -> Arc { + Arc::new(LocalFileSystem::new()) +} + async fn create_test_writer(temp_dir: &TempDir) -> (SqliteMetadataWriter, std::path::PathBuf) { let db_path = temp_dir.path().join("test.db"); let data_path = temp_dir.path().join("data"); @@ -82,9 +87,12 @@ async fn test_concurrent_writes_different_tables() { let table_name = format!("table_{}", i); let task = tokio::spawn(async move { - let table_writer = DuckLakeTableWriter::new(writer_clone).unwrap(); + let table_writer = + DuckLakeTableWriter::new(writer_clone, create_object_store()).unwrap(); let batch = create_user_batch(&[i], &[&format!("user_{}", i)]); - let result = table_writer.write_table("main", &table_name, &[batch]); + let result = table_writer + .write_table("main", &table_name, &[batch]) + .await; (i, table_name, result) }); @@ -116,10 +124,12 @@ async fn test_concurrent_writes_same_table_append() { let writer: Arc = Arc::new(writer); { - let table_writer = 
DuckLakeTableWriter::new(Arc::clone(&writer)).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); let batch = create_user_batch(&[0], &["initial"]); table_writer .write_table("main", "shared_table", &[batch]) + .await .unwrap(); } @@ -127,9 +137,12 @@ async fn test_concurrent_writes_same_table_append() { for i in 1..=10 { let writer_clone = Arc::clone(&writer); let task = tokio::spawn(async move { - let table_writer = DuckLakeTableWriter::new(writer_clone).unwrap(); + let table_writer = + DuckLakeTableWriter::new(writer_clone, create_object_store()).unwrap(); let batch = create_user_batch(&[i], &[&format!("user_{}", i)]); - table_writer.append_table("main", "shared_table", &[batch]) + table_writer + .append_table("main", "shared_table", &[batch]) + .await }); tasks.push(task); } @@ -147,33 +160,40 @@ async fn test_write_session_cleanup_on_drop() { let writer: Arc = Arc::new(writer); let schema = create_user_schema(); - // Dropped session should clean up orphaned file - let file_path = { - let table_writer = DuckLakeTableWriter::new(Arc::clone(&writer)).unwrap(); + // Dropped session should NOT upload data (buffer is just dropped) + let file_path_str = { + let table_writer = + DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); let mut session = table_writer .begin_write("main", "dropped_table", &schema, WriteMode::Replace) .unwrap(); let batch = create_user_batch(&[1, 2, 3], &["a", "b", "c"]); session.write_batch(&batch).unwrap(); - session.file_path().to_path_buf() + session.file_path().to_string() }; + // With buffer approach, no file is created until finish() is called + let path = std::path::Path::new(&file_path_str); assert!( - !file_path.exists(), - "Orphaned file should be deleted on drop" + !path.exists(), + "No file should exist since session was dropped without finish()" ); - // Finished session should keep file - let finished_path = { - let table_writer = DuckLakeTableWriter::new(Arc::clone(&writer)).unwrap(); + // Finished session should upload and keep file + let finished_path_str = { + let table_writer = + DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); let mut session = table_writer .begin_write("main", "finished_table", &schema, WriteMode::Replace) .unwrap(); let batch = create_user_batch(&[1, 2, 3], &["a", "b", "c"]); session.write_batch(&batch).unwrap(); - let path = session.file_path().to_path_buf(); - session.finish().unwrap(); - path + let p = session.file_path().to_string(); + session.finish().await.unwrap(); + p }; + // LocalFileSystem stores files at /, so prepend / + let finished_fs_path = format!("/{}", finished_path_str); + let finished_path = std::path::Path::new(&finished_fs_path); assert!(finished_path.exists(), "Finished file should exist"); } @@ -184,7 +204,8 @@ async fn test_write_batch_schema_validation() { let writer: Arc = Arc::new(writer); let schema = create_user_schema(); - let table_writer = DuckLakeTableWriter::new(Arc::clone(&writer)).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); let mut session = table_writer .begin_write("main", "validation_test", &schema, WriteMode::Replace) .unwrap(); @@ -234,11 +255,13 @@ async fn test_transaction_atomicity() { let initial_snapshot = writer.create_snapshot().unwrap(); let writer: Arc = Arc::new(writer); - let table_writer = DuckLakeTableWriter::new(Arc::clone(&writer)).unwrap(); + let table_writer = + 
DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); let batch = create_user_batch(&[1], &["test"]); let result = table_writer .write_table("main", "atomic_test", &[batch]) + .await .unwrap(); assert!(result.snapshot_id > initial_snapshot); } @@ -253,10 +276,13 @@ async fn test_stress_concurrent_writes() { for i in 0..50 { let writer_clone = Arc::clone(&writer); let task = tokio::spawn(async move { - let table_writer = DuckLakeTableWriter::new(writer_clone).unwrap(); + let table_writer = + DuckLakeTableWriter::new(writer_clone, create_object_store()).unwrap(); let batch = create_user_batch(&[i], &[&format!("stress_{}", i)]); let table_name = format!("stress_table_{}", i % 10); - table_writer.append_table("main", &table_name, &[batch]) + table_writer + .append_table("main", &table_name, &[batch]) + .await }); tasks.push(task); } diff --git a/tests/sql_write_tests.rs b/tests/sql_write_tests.rs index 5e507a1..622d71f 100644 --- a/tests/sql_write_tests.rs +++ b/tests/sql_write_tests.rs @@ -11,12 +11,18 @@ use arrow::array::{Array, Int32Array, Int64Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use datafusion::prelude::*; +use object_store::local::LocalFileSystem; use tempfile::TempDir; use datafusion_ducklake::{ DuckLakeCatalog, MetadataWriter, SqliteMetadataProvider, SqliteMetadataWriter, }; +/// Create a local filesystem object store +fn create_object_store() -> Arc { + Arc::new(LocalFileSystem::new()) +} + /// Helper to create a test environment with a writable catalog async fn create_writable_catalog() -> (SessionContext, TempDir) { let temp_dir = TempDir::new().unwrap(); @@ -112,6 +118,7 @@ async fn test_create_table_as_select() { #[tokio::test(flavor = "multi_thread")] async fn test_insert_into_existing_table() { let (ctx, temp_dir) = create_writable_catalog().await; + let object_store = create_object_store(); // First create a table using the lower-level API let db_path = temp_dir.path().join("test.db"); @@ -130,9 +137,11 @@ async fn test_insert_into_existing_table() { .unwrap(); // Write initial data using table writer - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "values_table", &[batch]) + .await .unwrap(); // Now try INSERT INTO with SQL @@ -222,6 +231,7 @@ async fn test_insert_into_read_only_fails() { std::fs::create_dir_all(&data_path).unwrap(); let conn_str = format!("sqlite:{}?mode=rwc", db_path.display()); + let object_store = create_object_store(); // Initialize the database let writer = SqliteMetadataWriter::new_with_init(&conn_str) @@ -234,9 +244,11 @@ async fn test_insert_into_read_only_fails() { let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "readonly_test", &[batch]) + .await .unwrap(); // Create read-only catalog (no writer) @@ -292,6 +304,7 @@ async fn test_insert_into_read_only_fails() { #[tokio::test(flavor = "multi_thread")] async fn test_insert_overwrite() { let (_ctx, temp_dir) = create_writable_catalog().await; + let object_store = create_object_store(); // Create initial table let db_path = 
temp_dir.path().join("test.db"); @@ -312,9 +325,11 @@ async fn test_insert_overwrite() { ) .unwrap(); - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "overwrite_test", &[batch]) + .await .unwrap(); // Recreate context with fresh catalog @@ -374,6 +389,7 @@ async fn test_insert_overwrite() { #[tokio::test(flavor = "multi_thread")] async fn test_sql_insert_values() { let (_ctx, temp_dir) = create_writable_catalog().await; + let object_store = create_object_store(); // Create initial table let db_path = temp_dir.path().join("test.db"); @@ -391,9 +407,11 @@ async fn test_sql_insert_values() { ) .unwrap(); - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "values_test", &[batch]) + .await .unwrap(); // Recreate context with fresh catalog @@ -442,6 +460,7 @@ async fn test_sql_insert_values() { #[tokio::test(flavor = "multi_thread")] async fn test_schema_evolution_via_sql() { let (_ctx, temp_dir) = create_writable_catalog().await; + let object_store = create_object_store(); // Create initial table with 2 columns let db_path = temp_dir.path().join("test.db"); @@ -462,9 +481,11 @@ async fn test_schema_evolution_via_sql() { ) .unwrap(); - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "evolve_table", &[batch]) + .await .unwrap(); // Recreate context @@ -538,6 +559,7 @@ async fn test_schema_evolution_via_sql() { #[tokio::test(flavor = "multi_thread")] async fn test_insert_from_query_with_filter() { let (_ctx, temp_dir) = create_writable_catalog().await; + let object_store = create_object_store(); // Create target table with initial data let db_path = temp_dir.path().join("test.db"); @@ -556,9 +578,11 @@ async fn test_insert_from_query_with_filter() { ) .unwrap(); - let table_writer = datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = + datafusion_ducklake::DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "filtered_users", &[batch]) + .await .unwrap(); // Recreate context diff --git a/tests/write_tests.rs b/tests/write_tests.rs index cf3612a..64a6b72 100644 --- a/tests/write_tests.rs +++ b/tests/write_tests.rs @@ -14,6 +14,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use datafusion::prelude::*; +use object_store::local::LocalFileSystem; use tempfile::TempDir; use datafusion_ducklake::{ @@ -21,6 +22,11 @@ use datafusion_ducklake::{ SqliteMetadataWriter, WriteMode, }; +/// Create a local filesystem object store +fn create_object_store() -> Arc { + Arc::new(LocalFileSystem::new()) +} + /// Helper to create a test environment with writer and data directory async fn create_test_env() -> (SqliteMetadataWriter, TempDir) { let temp_dir = TempDir::new().unwrap(); @@ -53,6 +59,7 @@ async fn create_read_context(temp_dir: &TempDir) -> SessionContext { #[tokio::test(flavor = "multi_thread")] async fn test_write_and_read_basic_types() { let (writer, temp_dir) = 
create_test_env().await; + let object_store = create_object_store(); // Create test data with various types let schema = Arc::new(Schema::new(vec![ @@ -80,8 +87,11 @@ async fn test_write_and_read_basic_types() { .unwrap(); // Write data - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer.write_table("main", "users", &[batch]).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); + let result = table_writer + .write_table("main", "users", &[batch]) + .await + .unwrap(); assert_eq!(result.records_written, 3); assert_eq!(result.files_written, 1); @@ -122,6 +132,7 @@ async fn test_write_and_read_basic_types() { #[tokio::test(flavor = "multi_thread")] async fn test_write_temporal_types() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -146,9 +157,10 @@ async fn test_write_temporal_types() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); let result = table_writer .write_table("main", "events", &[batch]) + .await .unwrap(); assert_eq!(result.records_written, 2); @@ -171,6 +183,7 @@ async fn test_write_temporal_types() { #[tokio::test(flavor = "multi_thread")] async fn test_write_multiple_batches() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -189,9 +202,10 @@ async fn test_write_multiple_batches() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); let result = table_writer .write_table("main", "data", &[batch1, batch2]) + .await .unwrap(); assert_eq!(result.records_written, 4); @@ -214,6 +228,7 @@ async fn test_write_multiple_batches() { #[tokio::test(flavor = "multi_thread")] async fn test_replace_semantics() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -230,9 +245,11 @@ async fn test_replace_semantics() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "replace_test", &[batch1]) + .await .unwrap(); // Write replacement data @@ -242,9 +259,11 @@ async fn test_replace_semantics() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); let result = table_writer2 .write_table("main", "replace_test", &[batch2]) + .await .unwrap(); assert_eq!(result.records_written, 2); @@ -269,6 +288,7 @@ async fn test_replace_semantics() { #[tokio::test(flavor = "multi_thread")] async fn test_append_semantics() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -282,9 +302,11 @@ async fn test_append_semantics() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer 
= + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "append_test", &[batch1]) + .await .unwrap(); // Append more data @@ -294,9 +316,11 @@ async fn test_append_semantics() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); let result = table_writer2 .append_table("main", "append_test", &[batch2]) + .await .unwrap(); assert_eq!(result.records_written, 2); @@ -320,6 +344,7 @@ async fn test_append_semantics() { #[tokio::test(flavor = "multi_thread")] async fn test_multiple_tables_same_schema() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -338,9 +363,15 @@ async fn test_multiple_tables_same_schema() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - table_writer.write_table("main", "t1", &[batch1]).unwrap(); - table_writer.write_table("main", "t2", &[batch2]).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); + table_writer + .write_table("main", "t1", &[batch1]) + .await + .unwrap(); + table_writer + .write_table("main", "t2", &[batch2]) + .await + .unwrap(); // Read back both tables let ctx = create_read_context(&temp_dir).await; @@ -367,6 +398,7 @@ async fn test_multiple_tables_same_schema() { #[tokio::test(flavor = "multi_thread")] async fn test_field_ids_preserved_on_roundtrip() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("col_a", DataType::Int32, false), @@ -379,9 +411,10 @@ async fn test_field_ids_preserved_on_roundtrip() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); table_writer .write_table("main", "field_id_test", &[batch]) + .await .unwrap(); // Find the Parquet file and verify field_ids @@ -424,6 +457,7 @@ async fn test_field_ids_preserved_on_roundtrip() { #[tokio::test(flavor = "multi_thread")] async fn test_streaming_write_api() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -431,7 +465,7 @@ async fn test_streaming_write_api() { ])); // Use streaming API - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); let mut session = table_writer .begin_write("main", "streaming_test", &schema, WriteMode::Replace) .unwrap(); @@ -454,7 +488,7 @@ async fn test_streaming_write_api() { assert_eq!(session.row_count(), 6); // 3 batches * 2 rows - let result = session.finish().unwrap(); + let result = session.finish().await.unwrap(); assert_eq!(result.records_written, 6); assert_eq!(result.files_written, 1); @@ -477,20 +511,22 @@ async fn test_streaming_write_api() { #[tokio::test(flavor = "multi_thread")] async fn test_streaming_write_to_custom_path() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); // Use custom path (simulating external storage manager) let 
custom_dir = temp_dir.path().join("data").join("custom").join("location"); + let custom_dir_str = custom_dir.to_str().unwrap().to_string(); let file_name = "my_data.parquet".to_string(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); let mut session = table_writer .begin_write_to_path( "main", "custom_path_test", &schema, - custom_dir.clone(), + &custom_dir_str, file_name.clone(), WriteMode::Replace, ) @@ -503,10 +539,7 @@ async fn test_streaming_write_to_custom_path() { .unwrap(); session.write_batch(&batch).unwrap(); - // Verify file path before finishing - assert_eq!(session.file_path(), custom_dir.join(&file_name)); - - let result = session.finish().unwrap(); + let result = session.finish().await.unwrap(); assert_eq!(result.records_written, 3); // Verify file exists at custom path @@ -531,16 +564,17 @@ async fn test_streaming_write_to_custom_path() { #[tokio::test(flavor = "multi_thread")] async fn test_streaming_empty_write() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); + let table_writer = DuckLakeTableWriter::new(Arc::new(writer), object_store).unwrap(); let session = table_writer .begin_write("main", "empty_test", &schema, WriteMode::Replace) .unwrap(); // Finish without writing any batches - let result = session.finish().unwrap(); + let result = session.finish().await.unwrap(); assert_eq!(result.records_written, 0); assert_eq!(result.files_written, 1); @@ -565,6 +599,7 @@ async fn test_streaming_empty_write() { #[tokio::test(flavor = "multi_thread")] async fn test_append_add_nullable_column() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); // Initial schema with 2 columns let schema1 = Arc::new(Schema::new(vec![ @@ -581,9 +616,11 @@ async fn test_append_add_nullable_column() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "evolve_add", &[batch1]) + .await .unwrap(); // Append with new nullable column @@ -603,8 +640,11 @@ async fn test_append_add_nullable_column() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer2.append_table("main", "evolve_add", &[batch2]); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); + let result = table_writer2 + .append_table("main", "evolve_add", &[batch2]) + .await; assert!(result.is_ok(), "Adding nullable column should succeed"); assert_eq!(result.unwrap().records_written, 2); @@ -627,6 +667,7 @@ async fn test_append_add_nullable_column() { #[tokio::test(flavor = "multi_thread")] async fn test_append_remove_column() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); // Initial schema with 3 columns let schema1 = Arc::new(Schema::new(vec![ @@ -645,9 +686,11 @@ async fn test_append_remove_column() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", 
"evolve_remove", &[batch1]) + .await .unwrap(); // Append without the 'extra' column @@ -665,8 +708,11 @@ async fn test_append_remove_column() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer2.append_table("main", "evolve_remove", &[batch2]); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); + let result = table_writer2 + .append_table("main", "evolve_remove", &[batch2]) + .await; assert!(result.is_ok(), "Removing column should succeed"); assert_eq!(result.unwrap().records_written, 2); @@ -689,6 +735,7 @@ async fn test_append_remove_column() { #[tokio::test(flavor = "multi_thread")] async fn test_append_type_mismatch_fails() { let (writer, _temp_dir) = create_test_env().await; + let object_store = create_object_store(); // Initial schema let schema1 = Arc::new(Schema::new(vec![ @@ -702,9 +749,11 @@ async fn test_append_type_mismatch_fails() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "evolve_type", &[batch1]) + .await .unwrap(); // Try to append with different type for 'value' @@ -719,8 +768,11 @@ async fn test_append_type_mismatch_fails() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer2.append_table("main", "evolve_type", &[batch2]); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); + let result = table_writer2 + .append_table("main", "evolve_type", &[batch2]) + .await; assert!(result.is_err(), "Type mismatch should fail"); let err = result.unwrap_err().to_string(); assert!( @@ -733,6 +785,7 @@ async fn test_append_type_mismatch_fails() { #[tokio::test(flavor = "multi_thread")] async fn test_append_non_nullable_column_fails() { let (writer, _temp_dir) = create_test_env().await; + let object_store = create_object_store(); // Initial schema let schema1 = Arc::new(Schema::new(vec![ @@ -749,9 +802,11 @@ async fn test_append_non_nullable_column_fails() { ) .unwrap(); - let table_writer = DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "evolve_nonnull", &[batch1]) + .await .unwrap(); // Try to add a non-nullable column @@ -771,8 +826,11 @@ async fn test_append_non_nullable_column_fails() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer2.append_table("main", "evolve_nonnull", &[batch2]); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); + let result = table_writer2 + .append_table("main", "evolve_nonnull", &[batch2]) + .await; assert!(result.is_err(), "Adding non-nullable column should fail"); let err = result.unwrap_err().to_string(); assert!( @@ -785,6 +843,7 @@ async fn test_append_non_nullable_column_fails() { #[tokio::test(flavor = "multi_thread")] async fn test_append_reorder_columns() { let (writer, temp_dir) = create_test_env().await; + let object_store = create_object_store(); // Initial schema: id, name, value let schema1 = Arc::new(Schema::new(vec![ @@ -803,9 +862,11 @@ async fn test_append_reorder_columns() { ) .unwrap(); - let table_writer = 
DuckLakeTableWriter::new(Arc::new(writer.clone())).unwrap(); + let table_writer = + DuckLakeTableWriter::new(Arc::new(writer.clone()), Arc::clone(&object_store)).unwrap(); table_writer .write_table("main", "evolve_reorder", &[batch1]) + .await .unwrap(); // Append with reordered columns: value, id, name @@ -825,8 +886,11 @@ async fn test_append_reorder_columns() { ) .unwrap(); - let table_writer2 = DuckLakeTableWriter::new(Arc::new(writer)).unwrap(); - let result = table_writer2.append_table("main", "evolve_reorder", &[batch2]); + let table_writer2 = + DuckLakeTableWriter::new(Arc::new(writer), Arc::clone(&object_store)).unwrap(); + let result = table_writer2 + .append_table("main", "evolve_reorder", &[batch2]) + .await; assert!(result.is_ok(), "Reordering columns should succeed"); assert_eq!(result.unwrap().records_written, 2); From 380dcfab7ce18258af2aeecfcd3bfdb8225d50f0 Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Fri, 13 Feb 2026 13:52:04 +0530 Subject: [PATCH 2/4] Minor fix --- Cargo.toml | 1 - src/table_writer.rs | 12 ------------ 2 files changed, 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0b02a71..4be5c86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,6 @@ duckdb = { version = "1.4.1", features = ["bundled"], optional = true } sqlx = { version = "0.8", features = ["runtime-tokio"], optional = true } [dev-dependencies] -bytes = "1" testcontainers = "0.23" testcontainers-modules = { version = "0.11", features = ["minio", "postgres", "mysql"] } tempfile = "3.14" diff --git a/src/table_writer.rs b/src/table_writer.rs index 11f7293..e3d3b15 100644 --- a/src/table_writer.rs +++ b/src/table_writer.rs @@ -428,18 +428,6 @@ mod tests { assert!(file_size > 0); assert!(footer_size > 0); assert!(footer_size < file_size); - - // Verify field_ids by reading from buffer - use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - let reader = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer)).unwrap(); - let metadata = reader.metadata(); - - let schema_descr = metadata.file_metadata().schema_descr(); - for i in 0..schema_descr.num_columns() { - let column = schema_descr.column(i); - let basic_info = column.self_type().get_basic_info(); - assert!(basic_info.has_id(), "Column {} should have field_id", i); - } } #[test] From 595a32c02bd2dd88d89e2f9374850883a1782f91 Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Fri, 13 Feb 2026 14:53:00 +0530 Subject: [PATCH 3/4] Some more optimizations --- src/catalog.rs | 5 +---- src/insert_exec.rs | 5 ----- src/table.rs | 19 +------------------ tests/concurrent_write_tests.rs | 22 ++++++++++++---------- 4 files changed, 14 insertions(+), 37 deletions(-) diff --git a/src/catalog.rs b/src/catalog.rs index 16ce3d5..86a0ffd 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -20,8 +20,6 @@ use crate::metadata_writer::MetadataWriter; struct WriteConfig { /// Metadata writer for catalog operations writer: Arc, - /// Base data path for writing files - data_path: String, } /// DuckLake catalog provider @@ -123,7 +121,6 @@ impl DuckLakeCatalog { catalog_path, write_config: Some(WriteConfig { writer, - data_path: data_path_str, }), }) } @@ -197,7 +194,7 @@ impl CatalogProvider for DuckLakeCatalog { // Configure writer if this catalog is writable #[cfg(feature = "write")] let schema = if let Some(ref config) = self.write_config { - schema.with_writer(Arc::clone(&config.writer), config.data_path.clone()) + schema.with_writer(Arc::clone(&config.writer)) } else { schema }; diff --git a/src/insert_exec.rs 
b/src/insert_exec.rs index 0553051..f248759 100644 --- a/src/insert_exec.rs +++ b/src/insert_exec.rs @@ -39,7 +39,6 @@ pub struct DuckLakeInsertExec { arrow_schema: SchemaRef, write_mode: WriteMode, object_store_url: Arc, - data_path: String, cache: PlanProperties, } @@ -53,7 +52,6 @@ impl DuckLakeInsertExec { arrow_schema: SchemaRef, write_mode: WriteMode, object_store_url: Arc, - data_path: String, ) -> Self { let cache = Self::compute_properties(); Self { @@ -64,7 +62,6 @@ impl DuckLakeInsertExec { arrow_schema, write_mode, object_store_url, - data_path, cache, } } @@ -85,7 +82,6 @@ impl Debug for DuckLakeInsertExec { .field("schema_name", &self.schema_name) .field("table_name", &self.table_name) .field("write_mode", &self.write_mode) - .field("data_path", &self.data_path) .finish_non_exhaustive() } } @@ -140,7 +136,6 @@ impl ExecutionPlan for DuckLakeInsertExec { Arc::clone(&self.arrow_schema), self.write_mode, self.object_store_url.clone(), - self.data_path.clone(), ))) } diff --git a/src/table.rs b/src/table.rs index 3c5bbf6..7e3c9ca 100644 --- a/src/table.rs +++ b/src/table.rs @@ -95,9 +95,6 @@ pub struct DuckLakeTable { /// Metadata writer for write operations (when write feature is enabled) #[cfg(feature = "write")] writer: Option>, - /// Data path for write operations (when write feature is enabled) - #[cfg(feature = "write")] - data_path: Option, } impl std::fmt::Debug for DuckLakeTable { @@ -172,8 +169,6 @@ impl DuckLakeTable { schema_name: None, #[cfg(feature = "write")] writer: None, - #[cfg(feature = "write")] - data_path: None, }) } @@ -395,17 +390,10 @@ impl DuckLakeTable { /// # Arguments /// * `schema_name` - Name of the schema this table belongs to /// * `writer` - Metadata writer for catalog operations - /// * `data_path` - Base path for data files #[cfg(feature = "write")] - pub fn with_writer( - mut self, - schema_name: String, - writer: Arc, - data_path: String, - ) -> Self { + pub fn with_writer(mut self, schema_name: String, writer: Arc) -> Self { self.schema_name = Some(schema_name); self.writer = Some(writer); - self.data_path = Some(data_path); self } @@ -588,10 +576,6 @@ impl TableProvider for DuckLakeTable { DataFusionError::Internal("Schema name not set for writable table".to_string()) })?; - let data_path = self.data_path.as_ref().ok_or_else(|| { - DataFusionError::Internal("Data path not set for writable table".to_string()) - })?; - let write_mode = match insert_op { InsertOp::Append => WriteMode::Append, InsertOp::Overwrite | InsertOp::Replace => WriteMode::Replace, @@ -605,7 +589,6 @@ impl TableProvider for DuckLakeTable { self.schema(), write_mode, self.object_store_url.clone(), - data_path.clone(), ))) } } diff --git a/tests/concurrent_write_tests.rs b/tests/concurrent_write_tests.rs index 73ddb67..0099e74 100644 --- a/tests/concurrent_write_tests.rs +++ b/tests/concurrent_write_tests.rs @@ -159,11 +159,12 @@ async fn test_write_session_cleanup_on_drop() { let (writer, _): (SqliteMetadataWriter, _) = create_test_writer(&temp_dir).await; let writer: Arc = Arc::new(writer); let schema = create_user_schema(); + let object_store = create_object_store(); // Dropped session should NOT upload data (buffer is just dropped) let file_path_str = { let table_writer = - DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); + DuckLakeTableWriter::new(Arc::clone(&writer), Arc::clone(&object_store)).unwrap(); let mut session = table_writer .begin_write("main", "dropped_table", &schema, WriteMode::Replace) .unwrap(); @@ -172,16 +173,16 @@ async 
fn test_write_session_cleanup_on_drop() { session.file_path().to_string() }; // With buffer approach, no file is created until finish() is called - let path = std::path::Path::new(&file_path_str); + let dropped_path = object_store::path::Path::from(file_path_str); assert!( - !path.exists(), - "No file should exist since session was dropped without finish()" + object_store.get(&dropped_path).await.is_err(), + "No object should exist since session was dropped without finish()" ); - // Finished session should upload and keep file + // Finished session should upload the file let finished_path_str = { let table_writer = - DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap(); + DuckLakeTableWriter::new(Arc::clone(&writer), Arc::clone(&object_store)).unwrap(); let mut session = table_writer .begin_write("main", "finished_table", &schema, WriteMode::Replace) .unwrap(); @@ -191,10 +192,11 @@ async fn test_write_session_cleanup_on_drop() { session.finish().await.unwrap(); p }; - // LocalFileSystem stores files at /, so prepend / - let finished_fs_path = format!("/{}", finished_path_str); - let finished_path = std::path::Path::new(&finished_fs_path); - assert!(finished_path.exists(), "Finished file should exist"); + let finished_path = object_store::path::Path::from(finished_path_str); + assert!( + object_store.get(&finished_path).await.is_ok(), + "Finished file should exist in object store" + ); } #[tokio::test(flavor = "multi_thread")] From bc1f4e85cd53850a22843d6a695afa0ecf5f694b Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Fri, 13 Feb 2026 15:53:43 +0530 Subject: [PATCH 4/4] Fixing an issue in schema --- src/schema.rs | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/src/schema.rs b/src/schema.rs index a516961..460598a 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -62,9 +62,6 @@ pub struct DuckLakeSchema { /// Metadata writer for write operations (when write feature is enabled) #[cfg(feature = "write")] writer: Option>, - /// Data path for write operations (when write feature is enabled) - #[cfg(feature = "write")] - data_path: Option, } impl DuckLakeSchema { @@ -86,23 +83,19 @@ impl DuckLakeSchema { schema_path, #[cfg(feature = "write")] writer: None, - #[cfg(feature = "write")] - data_path: None, } } /// Configure this schema for write operations. /// - /// This method enables write support by attaching a metadata writer and data path. + /// This method enables write support by attaching a metadata writer. /// Once configured, the schema can handle CREATE TABLE AS and tables can handle INSERT INTO. 
/// /// # Arguments /// * `writer` - Metadata writer for catalog operations - /// * `data_path` - Base path for data files #[cfg(feature = "write")] - pub fn with_writer(mut self, writer: Arc, data_path: String) -> Self { + pub fn with_writer(mut self, writer: Arc) -> Self { self.writer = Some(writer); - self.data_path = Some(data_path); self } } @@ -155,14 +148,8 @@ impl SchemaProvider for DuckLakeSchema { // Configure writer if this schema is writable #[cfg(feature = "write")] - let table = if let (Some(writer), Some(data_path)) = - (self.writer.as_ref(), self.data_path.as_ref()) - { - table.with_writer( - self.schema_name.clone(), - Arc::clone(writer), - data_path.clone(), - ) + let table = if let Some(writer) = self.writer.as_ref() { + table.with_writer(self.schema_name.clone(), Arc::clone(writer)) } else { table }; @@ -201,10 +188,6 @@ impl SchemaProvider for DuckLakeSchema { ) })?; - let data_path = self.data_path.as_ref().ok_or_else(|| { - DataFusionError::Internal("Data path not set for writable schema".to_string()) - })?; - // Convert Arrow schema to ColumnDefs let arrow_schema = table.schema(); let columns: Vec = arrow_schema @@ -234,11 +217,7 @@ impl SchemaProvider for DuckLakeSchema { table_path, ) .map_err(|e| DataFusionError::External(Box::new(e)))? - .with_writer( - self.schema_name.clone(), - Arc::clone(writer), - data_path.clone(), - ); + .with_writer(self.schema_name.clone(), Arc::clone(writer)); Ok(Some(Arc::new(writable_table) as Arc)) }