Merged

src/catalog.rs (1 addition, 6 deletions)

@@ -13,17 +13,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 
 #[cfg(feature = "write")]
 use crate::metadata_writer::MetadataWriter;
-#[cfg(feature = "write")]
-use std::path::PathBuf;
 
 /// Configuration for write operations (when write feature is enabled)
 #[cfg(feature = "write")]
 #[derive(Debug, Clone)]
 struct WriteConfig {
     /// Metadata writer for catalog operations
     writer: Arc<dyn MetadataWriter>,
-    /// Base data path for writing files
-    data_path: PathBuf,
 }
 
 /// DuckLake catalog provider
@@ -125,7 +121,6 @@ impl DuckLakeCatalog {
             catalog_path,
             write_config: Some(WriteConfig {
                 writer,
-                data_path: PathBuf::from(&data_path_str),
             }),
         })
     }
@@ -199,7 +194,7 @@ impl CatalogProvider for DuckLakeCatalog {
         // Configure writer if this catalog is writable
         #[cfg(feature = "write")]
         let schema = if let Some(ref config) = self.write_config {
-            schema.with_writer(Arc::clone(&config.writer), config.data_path.clone())
+            schema.with_writer(Arc::clone(&config.writer))
         } else {
             schema
         };

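Note on the removed `data_path`: the hunk context shows `ObjectStoreUrl` is already imported in catalog.rs, and that is what takes over the job of locating data. A hedged sketch of the split this implies (the helper, bucket, and file names below are illustrative and not part of the crate): an `ObjectStoreUrl` identifies which store to talk to, while the location of an object inside that store travels separately as an `object_store` path.

```rust
use datafusion::datasource::object_store::ObjectStoreUrl;
use object_store::path::Path;

// Illustrative split between "which store" and "which object".
// An ObjectStoreUrl is expected to carry only a scheme plus authority;
// the location inside the store is a separate object_store::path::Path.
fn split_location() -> datafusion::error::Result<()> {
    let store_url = ObjectStoreUrl::parse("s3://example-bucket")?;
    let object = Path::from("warehouse/orders/data_0.parquet");
    println!("store = {}, object = {}", store_url.as_str(), object);
    Ok(())
}
```
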
src/error.rs (4 additions, 0 deletions)

@@ -55,6 +55,10 @@ pub enum DuckLakeError {
     #[error("Unsupported feature: {0}")]
     Unsupported(String),
 
+    /// ObjectStore error
+    #[error("ObjectStore error: {0}")]
+    ObjectStore(#[from] object_store::Error),
+
     /// IO error
     #[error("IO error: {0}")]
     Io(#[from] std::io::Error),

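The new variant follows the same `#[from]` pattern as the existing `Io` variant, so `?` on an `object_store::Error` now converts into `DuckLakeError` automatically. A minimal, self-contained sketch of that mechanism (it mirrors the variants above but is not the crate's full error enum, and `head_example` is a made-up helper):

```rust
use thiserror::Error;

// With `#[from]`, thiserror generates a `From` impl, so `?` on an
// `object_store::Error` converts into `DuckLakeError::ObjectStore`.
#[derive(Debug, Error)]
pub enum DuckLakeError {
    #[error("ObjectStore error: {0}")]
    ObjectStore(#[from] object_store::Error),

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}

async fn head_example(
    store: &dyn object_store::ObjectStore,
    path: &object_store::path::Path,
) -> Result<object_store::ObjectMeta, DuckLakeError> {
    // `head` returns `Result<_, object_store::Error>`; the `?` below
    // relies on the generated `From` conversion.
    Ok(store.head(path).await?)
}
```
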
src/insert_exec.rs (14 additions, 11 deletions)

@@ -6,12 +6,12 @@
 
 use std::any::Any;
 use std::fmt::{self, Debug};
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -38,7 +38,7 @@ pub struct DuckLakeInsertExec {
     table_name: String,
     arrow_schema: SchemaRef,
     write_mode: WriteMode,
-    data_path: PathBuf,
+    object_store_url: Arc<ObjectStoreUrl>,
     cache: PlanProperties,
 }
 
@@ -51,7 +51,7 @@ impl DuckLakeInsertExec {
         table_name: String,
         arrow_schema: SchemaRef,
         write_mode: WriteMode,
-        data_path: PathBuf,
+        object_store_url: Arc<ObjectStoreUrl>,
     ) -> Self {
         let cache = Self::compute_properties();
         Self {
@@ -61,7 +61,7 @@ impl DuckLakeInsertExec {
             table_name,
             arrow_schema,
             write_mode,
-            data_path,
+            object_store_url,
             cache,
         }
     }
@@ -82,7 +82,6 @@ impl Debug for DuckLakeInsertExec {
             .field("schema_name", &self.schema_name)
             .field("table_name", &self.table_name)
             .field("write_mode", &self.write_mode)
-            .field("data_path", &self.data_path)
             .finish_non_exhaustive()
     }
 }
@@ -136,7 +135,7 @@ impl ExecutionPlan for DuckLakeInsertExec {
             self.table_name.clone(),
             Arc::clone(&self.arrow_schema),
             self.write_mode,
-            self.data_path.clone(),
+            self.object_store_url.clone(),
         )))
     }
 
@@ -158,19 +157,24 @@ impl ExecutionPlan for DuckLakeInsertExec {
         let table_name = self.table_name.clone();
         let arrow_schema = Arc::clone(&self.arrow_schema);
         let write_mode = self.write_mode;
-        let data_path = self.data_path.clone();
+        let object_store_url = self.object_store_url.clone();
         let output_schema = make_insert_count_schema();
 
         let stream = stream::once(async move {
-            let input_stream = input.execute(0, context)?;
+            let input_stream = input.execute(0, Arc::clone(&context))?;
             let batches: Vec<RecordBatch> = input_stream.try_collect().await?;
 
             if batches.is_empty() {
                 let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![0u64]));
                 return Ok(RecordBatch::try_new(output_schema, vec![count_array])?);
             }
 
-            let table_writer = DuckLakeTableWriter::new(writer)
+            // Get object store from runtime environment
+            let object_store = context
+                .runtime_env()
+                .object_store(object_store_url.as_ref())?;
+
+            let table_writer = DuckLakeTableWriter::new(writer, object_store)
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
             let schema_without_metadata =
@@ -185,8 +189,6 @@
                 )
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
-            let _ = data_path;
-
             for batch in &batches {
                 session
                     .write_batch(batch)
@@ -197,6 +199,7 @@
 
             session
                 .finish()
+                .await
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
             let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![row_count]));

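The execute path now resolves the store from the task context's runtime environment instead of writing to a local `PathBuf`, which means a store must be registered under the same URL before the plan runs. A hedged sketch of that pairing, assuming a local filesystem store and DataFusion's public registration API (the URL choice is illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;

// Register a store under a URL, then resolve it the same way the
// execute path does via the runtime environment.
fn register_and_resolve() -> Result<()> {
    let ctx = SessionContext::new();

    let url = ObjectStoreUrl::parse("file://")?;
    ctx.register_object_store(url.as_ref(), Arc::new(LocalFileSystem::new()));

    // Mirrors `context.runtime_env().object_store(...)` in the diff above.
    let _store = ctx.runtime_env().object_store(&url)?;
    Ok(())
}
```
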
src/schema.rs (5 additions, 28 deletions)

@@ -16,8 +16,6 @@ use crate::table::DuckLakeTable;
 use crate::metadata_writer::{ColumnDef, MetadataWriter, WriteMode};
 #[cfg(feature = "write")]
 use datafusion::error::DataFusionError;
-#[cfg(feature = "write")]
-use std::path::PathBuf;
 
 /// Validate table name to prevent path traversal attacks.
 /// Table names are used to construct file paths, so we must ensure they
@@ -64,9 +62,6 @@ pub struct DuckLakeSchema {
     /// Metadata writer for write operations (when write feature is enabled)
     #[cfg(feature = "write")]
     writer: Option<Arc<dyn MetadataWriter>>,
-    /// Data path for write operations (when write feature is enabled)
-    #[cfg(feature = "write")]
-    data_path: Option<PathBuf>,
 }
 
 impl DuckLakeSchema {
@@ -88,23 +83,19 @@ impl DuckLakeSchema {
             schema_path,
             #[cfg(feature = "write")]
             writer: None,
-            #[cfg(feature = "write")]
-            data_path: None,
         }
     }
 
     /// Configure this schema for write operations.
     ///
-    /// This method enables write support by attaching a metadata writer and data path.
+    /// This method enables write support by attaching a metadata writer.
     /// Once configured, the schema can handle CREATE TABLE AS and tables can handle INSERT INTO.
     ///
     /// # Arguments
     /// * `writer` - Metadata writer for catalog operations
-    /// * `data_path` - Base path for data files
     #[cfg(feature = "write")]
-    pub fn with_writer(mut self, writer: Arc<dyn MetadataWriter>, data_path: PathBuf) -> Self {
+    pub fn with_writer(mut self, writer: Arc<dyn MetadataWriter>) -> Self {
         self.writer = Some(writer);
-        self.data_path = Some(data_path);
         self
     }
 }
@@ -157,14 +148,8 @@ impl SchemaProvider for DuckLakeSchema {
 
         // Configure writer if this schema is writable
        #[cfg(feature = "write")]
-        let table = if let (Some(writer), Some(data_path)) =
-            (self.writer.as_ref(), self.data_path.as_ref())
-        {
-            table.with_writer(
-                self.schema_name.clone(),
-                Arc::clone(writer),
-                data_path.clone(),
-            )
+        let table = if let Some(writer) = self.writer.as_ref() {
+            table.with_writer(self.schema_name.clone(), Arc::clone(writer))
         } else {
             table
         };
@@ -203,10 +188,6 @@
             )
         })?;
 
-        let data_path = self.data_path.as_ref().ok_or_else(|| {
-            DataFusionError::Internal("Data path not set for writable schema".to_string())
-        })?;
-
         // Convert Arrow schema to ColumnDefs
         let arrow_schema = table.schema();
         let columns: Vec<ColumnDef> = arrow_schema
@@ -236,11 +217,7 @@
             table_path,
         )
         .map_err(|e| DataFusionError::External(Box::new(e)))?
-        .with_writer(
-            self.schema_name.clone(),
-            Arc::clone(writer),
-            data_path.clone(),
-        );
+        .with_writer(self.schema_name.clone(), Arc::clone(writer));
 
         Ok(Some(Arc::new(writable_table) as Arc<dyn TableProvider>))
     }

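Separate from the writer change itself, the context lines above reference a table-name validator that guards against path traversal. Its body is not part of this diff; the following is only a minimal sketch of the kind of check that doc comment describes, not the crate's implementation.

```rust
// Reject names that could escape the schema directory when used to
// build file paths (sketch only; error type and rules are illustrative).
fn validate_table_name(name: &str) -> Result<(), String> {
    if name.is_empty() {
        return Err("table name must not be empty".to_string());
    }
    // A name like "../secrets" or "a/b" would traverse outside the
    // intended directory, so separators and ".." are rejected.
    if name.contains('/') || name.contains('\\') || name.contains("..") {
        return Err(format!("invalid table name: {name}"));
    }
    Ok(())
}
```
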
src/table.rs (2 additions, 20 deletions)

@@ -19,8 +19,6 @@ use crate::types::{
 use crate::insert_exec::DuckLakeInsertExec;
 #[cfg(feature = "write")]
 use crate::metadata_writer::{MetadataWriter, WriteMode};
-#[cfg(feature = "write")]
-use std::path::PathBuf;
 
 #[cfg(feature = "encryption")]
 use crate::encryption::EncryptionFactoryBuilder;
@@ -97,9 +95,6 @@ pub struct DuckLakeTable {
     /// Metadata writer for write operations (when write feature is enabled)
     #[cfg(feature = "write")]
     writer: Option<Arc<dyn MetadataWriter>>,
-    /// Data path for write operations (when write feature is enabled)
-    #[cfg(feature = "write")]
-    data_path: Option<PathBuf>,
 }
 
 impl std::fmt::Debug for DuckLakeTable {
@@ -174,8 +169,6 @@ impl DuckLakeTable {
             schema_name: None,
             #[cfg(feature = "write")]
             writer: None,
-            #[cfg(feature = "write")]
-            data_path: None,
         })
     }
 
@@ -397,17 +390,10 @@ impl DuckLakeTable {
     /// # Arguments
     /// * `schema_name` - Name of the schema this table belongs to
     /// * `writer` - Metadata writer for catalog operations
-    /// * `data_path` - Base path for data files
     #[cfg(feature = "write")]
-    pub fn with_writer(
-        mut self,
-        schema_name: String,
-        writer: Arc<dyn MetadataWriter>,
-        data_path: PathBuf,
-    ) -> Self {
+    pub fn with_writer(mut self, schema_name: String, writer: Arc<dyn MetadataWriter>) -> Self {
         self.schema_name = Some(schema_name);
         self.writer = Some(writer);
-        self.data_path = Some(data_path);
         self
     }
 
@@ -590,10 +576,6 @@ impl TableProvider for DuckLakeTable {
             DataFusionError::Internal("Schema name not set for writable table".to_string())
         })?;
 
-        let data_path = self.data_path.as_ref().ok_or_else(|| {
-            DataFusionError::Internal("Data path not set for writable table".to_string())
-        })?;
-
         let write_mode = match insert_op {
             InsertOp::Append => WriteMode::Append,
             InsertOp::Overwrite | InsertOp::Replace => WriteMode::Replace,
@@ -606,7 +588,7 @@ impl TableProvider for DuckLakeTable {
             self.table_name.clone(),
             self.schema(),
             write_mode,
-            data_path.clone(),
+            self.object_store_url.clone(),
         )))
     }
 }

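For context on where `insert_into` fires: DataFusion routes SQL `INSERT INTO` through `TableProvider::insert_into`, which now hands the table's object store URL to `DuckLakeInsertExec`. A hedged usage sketch, assuming an already-constructed catalog value (the catalog, schema, and table names and the SQL are placeholders):

```rust
use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// `catalog` is assumed to be a DuckLakeCatalog built elsewhere; its
// construction is elided because it is not part of this diff.
async fn insert_example(catalog: Arc<dyn CatalogProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_catalog("ducklake", catalog);

    // INSERT INTO reaches TableProvider::insert_into, which builds a
    // DuckLakeInsertExec carrying the table's object store URL.
    ctx.sql("INSERT INTO ducklake.main.events VALUES (1, 'hello')")
        .await?
        .collect()
        .await?;
    Ok(())
}
```
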