diff --git a/Cargo.lock b/Cargo.lock index 3de43e685b..24bd3b5f39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3540,9 +3540,12 @@ dependencies = [ name = "iceberg-examples" version = "0.8.0" dependencies = [ + "datafusion", "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-datafusion", + "tempfile", "tokio", ] diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index c7874d9a17..63c1fb1dc9 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -25,9 +25,12 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-datafusion = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } [[example]] @@ -43,6 +46,14 @@ name = "oss-backend" path = "src/oss_backend.rs" required-features = ["storage-oss"] +[[example]] +name = "datafusion-integration" +path = "src/datafusion_integration.rs" + +[[example]] +name = "datafusion-external-table" +path = "src/datafusion_external_table.rs" + [features] default = [] storage-oss = ["iceberg/storage-oss"] diff --git a/crates/examples/src/datafusion_external_table.rs b/crates/examples/src/datafusion_external_table.rs new file mode 100644 index 0000000000..dcd66515c8 --- /dev/null +++ b/crates/examples/src/datafusion_external_table.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Example demonstrating external Iceberg table access via DataFusion. +//! +//! This example shows how to use `IcebergTableProviderFactory` to read +//! existing Iceberg tables via `CREATE EXTERNAL TABLE` syntax. +//! +//! Note: External tables are read-only. For write operations, use +//! `IcebergCatalogProvider` instead (see `datafusion_integration.rs`). + +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use datafusion::execution::session_state::SessionStateBuilder; +use iceberg_datafusion::IcebergTableProviderFactory; + +// ANCHOR: external_table_setup +/// Set up a DataFusion session with IcebergTableProviderFactory registered. +/// +/// This allows reading existing Iceberg tables via `CREATE EXTERNAL TABLE` syntax. 
+fn setup_external_table_support() -> SessionContext { + // Create a session state with the Iceberg table factory registered + let mut state = SessionStateBuilder::new().with_default_features().build(); + + // Register the IcebergTableProviderFactory to handle "ICEBERG" file type + state.table_factories_mut().insert( + "ICEBERG".to_string(), + Arc::new(IcebergTableProviderFactory::new()), + ); + + SessionContext::new_with_state(state) +} +// ANCHOR_END: external_table_setup + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let ctx = setup_external_table_support(); + + // ANCHOR: external_table_query + // Example SQL for creating and querying an external Iceberg table: + // + // CREATE EXTERNAL TABLE my_table + // STORED AS ICEBERG + // LOCATION '/path/to/iceberg/metadata/v1.metadata.json'; + // + // SELECT * FROM my_table WHERE column > 100; + // ANCHOR_END: external_table_query + + println!("External table support configured."); + println!("Use CREATE EXTERNAL TABLE ... STORED AS ICEBERG to read existing tables."); + println!(); + println!("Example:"); + println!(" CREATE EXTERNAL TABLE my_table"); + println!(" STORED AS ICEBERG"); + println!(" LOCATION '/path/to/iceberg/metadata/v1.metadata.json';"); + + // This example requires an actual Iceberg table to query. + // For a complete working example with table creation, see datafusion_integration.rs + + // Verify the session is configured correctly + let tables = ctx.catalog_names(); + println!("\nRegistered catalogs: {tables:?}"); + + Ok(()) +} diff --git a/crates/examples/src/datafusion_integration.rs b/crates/examples/src/datafusion_integration.rs new file mode 100644 index 0000000000..b2c9ccad37 --- /dev/null +++ b/crates/examples/src/datafusion_integration.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Example demonstrating DataFusion integration with Apache Iceberg. +//! +//! This example shows how to: +//! - Set up an Iceberg catalog with DataFusion +//! - Create tables using SQL +//! - Insert and query data +//! - Query metadata tables + +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent}; +use iceberg_datafusion::IcebergCatalogProvider; +use tempfile::TempDir; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + // Create a temporary directory for the warehouse + let temp_dir = TempDir::new()?; + let warehouse_path = temp_dir.path().to_str().unwrap().to_string(); + + // ANCHOR: catalog_setup + // Create an in-memory Iceberg catalog + let iceberg_catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path)]), + ) + .await?; + + // Create a namespace for our tables + let namespace = NamespaceIdent::new("demo".to_string()); + iceberg_catalog + .create_namespace(&namespace, HashMap::new()) + .await?; + + // Create the IcebergCatalogProvider and register it with DataFusion + let catalog_provider = + Arc::new(IcebergCatalogProvider::try_new(Arc::new(iceberg_catalog)).await?); + + let ctx = SessionContext::new(); + 
ctx.register_catalog("my_catalog", catalog_provider); + // ANCHOR_END: catalog_setup + + // ANCHOR: create_table + // Create a table using SQL + ctx.sql( + "CREATE TABLE my_catalog.demo.users ( + id INT NOT NULL, + name STRING NOT NULL, + email STRING + )", + ) + .await?; + + println!("Table 'users' created successfully."); + // ANCHOR_END: create_table + + // ANCHOR: insert_data + // Insert data into the table + let result = ctx + .sql( + "INSERT INTO my_catalog.demo.users VALUES + (1, 'Alice', 'alice@example.com'), + (2, 'Bob', 'bob@example.com'), + (3, 'Charlie', NULL)", + ) + .await? + .collect() + .await?; + + // The result contains the number of rows inserted + println!("Inserted {} rows.", result[0].num_rows()); + // ANCHOR_END: insert_data + + // ANCHOR: query_data + // Query the data with filtering + println!("\nQuerying users with email:"); + let df = ctx + .sql("SELECT id, name, email FROM my_catalog.demo.users WHERE email IS NOT NULL") + .await?; + + df.show().await?; + + // Query with projection (only specific columns) + println!("\nQuerying only names:"); + let df = ctx + .sql("SELECT name FROM my_catalog.demo.users ORDER BY id") + .await?; + + df.show().await?; + // ANCHOR_END: query_data + + // ANCHOR: metadata_tables + // Query the snapshots metadata table + println!("\nTable snapshots:"); + let df = ctx + .sql("SELECT snapshot_id, operation FROM my_catalog.demo.users$snapshots") + .await?; + + df.show().await?; + + // Query the manifests metadata table + println!("\nTable manifests:"); + let df = ctx + .sql("SELECT path, added_data_files_count FROM my_catalog.demo.users$manifests") + .await?; + + df.show().await?; + // ANCHOR_END: metadata_tables + + println!("\nDataFusion integration example completed successfully!"); + + Ok(()) +} diff --git a/website/src/SUMMARY.md b/website/src/SUMMARY.md index da82364f58..21159497ec 100644 --- a/website/src/SUMMARY.md +++ b/website/src/SUMMARY.md @@ -24,6 +24,7 @@ - [Install](./install.md) - 
[Download](./download.md) - [API](./api.md) +- [DataFusion Integration](./datafusion.md) # Developer Guide diff --git a/website/src/datafusion.md b/website/src/datafusion.md new file mode 100644 index 0000000000..c3921720c6 --- /dev/null +++ b/website/src/datafusion.md @@ -0,0 +1,123 @@ + + +# DataFusion Integration + +The `iceberg-datafusion` crate provides integration between Apache Iceberg and [DataFusion](https://datafusion.apache.org/), enabling SQL queries on Iceberg tables. + +## Features + +- **SQL DDL/DML**: `CREATE TABLE`, `INSERT INTO`, `SELECT` +- **Metadata Tables**: Query snapshots and manifests + +## Dependencies + +Add the following to your `Cargo.toml`: + +```toml +[dependencies] +iceberg = "0.8" +iceberg-datafusion = "0.8" +datafusion = "51" +tokio = { version = "1", features = ["full"] } +``` + +## Catalog-Based Access + +The recommended way to use Iceberg with DataFusion is through `IcebergCatalogProvider`, which integrates an Iceberg catalog with DataFusion's catalog system. + +### Setup + +```rust,no_run,noplayground +{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:catalog_setup}} +``` + +### Creating Tables + +Once the catalog is registered, you can create tables using SQL: + +```rust,no_run,noplayground +{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:create_table}} +``` + +Supported column types include: +- `INT`, `BIGINT` - Integer types +- `FLOAT`, `DOUBLE` - Floating point types +- `STRING` - String/text type +- `BOOLEAN` - Boolean type +- `DATE`, `TIMESTAMP` - Date/time types + +> **Note**: `CREATE TABLE AS SELECT` is not currently supported. Create the table first, then use `INSERT INTO`. 
+ +### Inserting Data + +```rust,no_run,noplayground +{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:insert_data}} +``` + +For nested structs, use the `named_struct()` function: + +```sql +INSERT INTO catalog.namespace.table +SELECT + 1 as id, + named_struct('street', '123 Main St', 'city', 'NYC') as address +``` + +### Querying Data + +```rust,no_run,noplayground +{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:query_data}} +``` + +## Metadata Tables + +Iceberg metadata tables can be queried using the `$` syntax (following Flink convention): + +```rust,no_run,noplayground +{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:metadata_tables}} +``` + +Available metadata tables: +- `table$snapshots` - Table snapshot history +- `table$manifests` - Manifest file information + +## Configuration Options + +These table properties control write behavior. They must be set when creating the table via the Iceberg catalog API, as DataFusion SQL does not support `ALTER TABLE` for property changes. + +| Property | Default | Description | +|----------|---------|-------------| +| `write.target-file-size-bytes` | `536870912` (512MB) | Target size for data files | +| `write.format.default` | `parquet` | Default file format for new data files | + +## Current Limitations + +- `CREATE TABLE AS SELECT` is not supported +- Metadata tables are limited to `$snapshots` and `$manifests` +- `ALTER TABLE` and `DROP TABLE` via SQL are not supported (use catalog API) +- Schema evolution through SQL is not supported + +## Running the Example + +A complete example is available in the repository: + +```bash +cargo run -p iceberg-examples --example datafusion-integration +```