80 changes: 78 additions & 2 deletions src/catalog.rs
@@ -11,6 +11,21 @@ use crate::schema::DuckLakeSchema;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::datasource::object_store::ObjectStoreUrl;

#[cfg(feature = "write")]
use crate::metadata_writer::MetadataWriter;
#[cfg(feature = "write")]
use std::path::PathBuf;

/// Configuration for write operations (when write feature is enabled)
#[cfg(feature = "write")]
#[derive(Debug, Clone)]
struct WriteConfig {
/// Metadata writer for catalog operations
writer: Arc<dyn MetadataWriter>,
/// Base data path for writing files
data_path: PathBuf,
}

/// DuckLake catalog provider
///
/// Connects to a DuckLake catalog database and provides access to schemas and tables.
@@ -26,6 +41,9 @@ pub struct DuckLakeCatalog {
object_store_url: Arc<ObjectStoreUrl>,
/// Catalog base path component for resolving relative schema paths (e.g., /prefix/)
catalog_path: String,
/// Write configuration (when write feature is enabled)
#[cfg(feature = "write")]
write_config: Option<WriteConfig>,
}

impl DuckLakeCatalog {
@@ -44,6 +62,8 @@ impl DuckLakeCatalog {
snapshot_id,
object_store_url: Arc::new(object_store_url),
catalog_path,
#[cfg(feature = "write")]
write_config: None,
})
}

@@ -61,6 +81,52 @@ impl DuckLakeCatalog {
snapshot_id,
object_store_url: Arc::new(object_store_url),
catalog_path,
#[cfg(feature = "write")]
write_config: None,
})
}

/// Create a catalog with write support.
///
/// This constructor enables write operations (INSERT INTO, CREATE TABLE AS)
/// by attaching a metadata writer. The catalog will pass the writer to all
/// schemas and tables it creates.
///
/// # Arguments
/// * `provider` - Metadata provider for reading catalog metadata
/// * `writer` - Metadata writer for write operations
///
/// # Example
/// ```no_run
/// # async fn example() -> datafusion_ducklake::Result<()> {
/// use datafusion_ducklake::{DuckLakeCatalog, SqliteMetadataProvider, SqliteMetadataWriter};
/// use std::sync::Arc;
///
/// let provider = SqliteMetadataProvider::new("sqlite:catalog.db?mode=rwc").await?;
/// let writer = SqliteMetadataWriter::new("sqlite:catalog.db?mode=rwc").await?;
///
/// let catalog = DuckLakeCatalog::with_writer(Arc::new(provider), Arc::new(writer))?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "write")]
pub fn with_writer(
provider: Arc<dyn MetadataProvider>,
writer: Arc<dyn MetadataWriter>,
) -> Result<Self> {
let snapshot_id = provider.get_current_snapshot()?;
let data_path_str = provider.get_data_path()?;
let (object_store_url, catalog_path) = parse_object_store_url(&data_path_str)?;

Ok(Self {
provider,
snapshot_id,
object_store_url: Arc::new(object_store_url),
catalog_path,
write_config: Some(WriteConfig {
writer,
data_path: PathBuf::from(&data_path_str),
}),
})
}

@@ -121,14 +187,24 @@ impl CatalogProvider for DuckLakeCatalog {
resolve_path(&self.catalog_path, &meta.path, meta.path_is_relative);

// Pass the pinned snapshot_id to schema
Some(Arc::new(DuckLakeSchema::new(
let schema = DuckLakeSchema::new(
meta.schema_id,
meta.schema_name,
Arc::clone(&self.provider),
self.snapshot_id, // Propagate pinned snapshot_id
self.object_store_url.clone(),
schema_path,
)) as Arc<dyn SchemaProvider>)
);

// Configure writer if this catalog is writable
#[cfg(feature = "write")]
let schema = if let Some(ref config) = self.write_config {
schema.with_writer(Arc::clone(&config.writer), config.data_path.clone())
} else {
schema
};

Some(Arc::new(schema) as Arc<dyn SchemaProvider>)
},
_ => None,
}
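For context beyond this diff: a catalog built with `with_writer` is meant to be registered with a DataFusion `SessionContext`, so that `INSERT INTO` statements are planned through the writable schemas it produces. A minimal sketch, assuming the catalog is registered as `ducklake` and a table `main.events` already exists (both names are illustrative; the table-provider wiring that routes the INSERT to `DuckLakeInsertExec` is not shown in this excerpt):

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_ducklake::{DuckLakeCatalog, SqliteMetadataProvider, SqliteMetadataWriter};

async fn insert_example() -> Result<(), Box<dyn std::error::Error>> {
    let provider = SqliteMetadataProvider::new("sqlite:catalog.db?mode=rwc").await?;
    let writer = SqliteMetadataWriter::new("sqlite:catalog.db?mode=rwc").await?;
    let catalog = DuckLakeCatalog::with_writer(Arc::new(provider), Arc::new(writer))?;

    // Register the writable catalog; schemas it creates carry the write config.
    let ctx = SessionContext::new();
    ctx.register_catalog("ducklake", Arc::new(catalog));

    // Hypothetical table; the INSERT result is a single batch with one UInt64
    // `count` column (see make_insert_count_schema in src/insert_exec.rs).
    let df = ctx.sql("INSERT INTO ducklake.main.events VALUES (1, 'a')").await?;
    let _batches = df.collect().await?;
    Ok(())
}
```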
224 changes: 224 additions & 0 deletions src/insert_exec.rs
@@ -0,0 +1,224 @@
//! DuckLake INSERT execution plan.
//!
//! Limitations:
//! - Collects all batches into memory before writing (no streaming yet)
//! - Single partition only (partition 0)

use std::any::Any;
use std::fmt::{self, Debug};
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::stream::{self, TryStreamExt};

use crate::metadata_writer::{MetadataWriter, WriteMode};
use crate::table_writer::DuckLakeTableWriter;

/// Schema for the output of insert operations (count of rows inserted)
fn make_insert_count_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new(
"count",
DataType::UInt64,
false,
)]))
}

/// Execution plan that writes input data to a DuckLake table.
pub struct DuckLakeInsertExec {
input: Arc<dyn ExecutionPlan>,
writer: Arc<dyn MetadataWriter>,
schema_name: String,
table_name: String,
arrow_schema: SchemaRef,
write_mode: WriteMode,
data_path: PathBuf,
cache: PlanProperties,
}

impl DuckLakeInsertExec {
/// Create a new DuckLakeInsertExec
pub fn new(
input: Arc<dyn ExecutionPlan>,
writer: Arc<dyn MetadataWriter>,
schema_name: String,
table_name: String,
arrow_schema: SchemaRef,
write_mode: WriteMode,
data_path: PathBuf,
) -> Self {
let cache = Self::compute_properties();
Self {
input,
writer,
schema_name,
table_name,
arrow_schema,
write_mode,
data_path,
cache,
}
}

fn compute_properties() -> PlanProperties {
PlanProperties::new(
EquivalenceProperties::new(make_insert_count_schema()),
Partitioning::UnknownPartitioning(1),
datafusion::physical_plan::execution_plan::EmissionType::Final,
datafusion::physical_plan::execution_plan::Boundedness::Bounded,
)
}
}

impl Debug for DuckLakeInsertExec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DuckLakeInsertExec")
.field("schema_name", &self.schema_name)
.field("table_name", &self.table_name)
.field("write_mode", &self.write_mode)
.field("data_path", &self.data_path)
.finish_non_exhaustive()
}
}

impl DisplayAs for DuckLakeInsertExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default
| DisplayFormatType::Verbose
| DisplayFormatType::TreeRender => {
write!(
f,
"DuckLakeInsertExec: schema={}, table={}, mode={:?}",
self.schema_name, self.table_name, self.write_mode
)
},
}
}
}

impl ExecutionPlan for DuckLakeInsertExec {
fn name(&self) -> &str {
"DuckLakeInsertExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
&self.cache
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
return Err(DataFusionError::Plan(
"DuckLakeInsertExec requires exactly one child".to_string(),
));
}
Ok(Arc::new(Self::new(
Arc::clone(&children[0]),
Arc::clone(&self.writer),
self.schema_name.clone(),
self.table_name.clone(),
Arc::clone(&self.arrow_schema),
self.write_mode,
self.data_path.clone(),
)))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(format!(
"DuckLakeInsertExec only supports partition 0, got {}",
partition
)));
}

let input = Arc::clone(&self.input);
let writer = Arc::clone(&self.writer);
let schema_name = self.schema_name.clone();
let table_name = self.table_name.clone();
let arrow_schema = Arc::clone(&self.arrow_schema);
let write_mode = self.write_mode;
let data_path = self.data_path.clone();
let output_schema = make_insert_count_schema();

let stream = stream::once(async move {
let input_stream = input.execute(0, context)?;
let batches: Vec<RecordBatch> = input_stream.try_collect().await?;

if batches.is_empty() {
let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![0u64]));
return Ok(RecordBatch::try_new(output_schema, vec![count_array])?);
}

let table_writer = DuckLakeTableWriter::new(writer)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let schema_without_metadata =
Schema::new(arrow_schema.fields().iter().cloned().collect::<Vec<_>>());

let mut session = table_writer
.begin_write(
&schema_name,
&table_name,
&schema_without_metadata,
write_mode,
)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let _ = data_path;

for batch in &batches {
session
.write_batch(batch)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}

let row_count = session.row_count() as u64;

session
.finish()
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![row_count]));
Ok(RecordBatch::try_new(output_schema, vec![count_array])?)
});

Ok(Box::pin(RecordBatchStreamAdapter::new(
make_insert_count_schema(),
stream.map_err(|e: DataFusionError| e),
)))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_insert_count_schema() {
let schema = make_insert_count_schema();
assert_eq!(schema.fields().len(), 1);
assert_eq!(schema.field(0).name(), "count");
assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
}
}
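As a usage note (not part of the PR), the row count emitted by `DuckLakeInsertExec` can be read back by executing partition 0 and collecting the stream. A sketch under the assumption that the plan has already been constructed; construction is omitted because the `WriteMode` variants are not visible in this diff:

```rust
use std::sync::Arc;

use arrow::array::UInt64Array;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{common, ExecutionPlan};

/// Read the single `count` value emitted by an insert plan.
async fn rows_inserted(exec: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<u64> {
    // Only partition 0 is supported; other partitions return an Internal error.
    let stream = exec.execute(0, ctx)?;
    let batches = common::collect(stream).await?;
    let counts = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("count column is UInt64 per make_insert_count_schema");
    Ok(counts.value(0))
}
```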
4 changes: 4 additions & 0 deletions src/lib.rs
@@ -62,6 +62,8 @@ pub mod metadata_provider_sqlite;

// Write support (feature-gated)
#[cfg(feature = "write")]
pub mod insert_exec;
#[cfg(feature = "write")]
pub mod metadata_writer;
#[cfg(feature = "write-sqlite")]
pub mod metadata_writer_sqlite;
@@ -91,6 +93,8 @@ pub use metadata_provider_sqlite::SqliteMetadataProvider;

// Re-export write types (feature-gated)
#[cfg(feature = "write")]
pub use insert_exec::DuckLakeInsertExec;
#[cfg(feature = "write")]
pub use metadata_writer::{
ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteResult, WriteSetupResult,
};