3 changes: 3 additions & 0 deletions Cargo.lock


11 changes: 11 additions & 0 deletions crates/examples/Cargo.toml
@@ -25,9 +25,12 @@ rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg-datafusion = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[[example]]
@@ -43,6 +46,14 @@ name = "oss-backend"
path = "src/oss_backend.rs"
required-features = ["storage-oss"]

[[example]]
name = "datafusion-integration"
path = "src/datafusion_integration.rs"

[[example]]
name = "datafusion-external-table"
path = "src/datafusion_external_table.rs"

[features]
default = []
storage-oss = ["iceberg/storage-oss"]
80 changes: 80 additions & 0 deletions crates/examples/src/datafusion_external_table.rs
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example demonstrating external Iceberg table access via DataFusion.
//!
//! This example shows how to use `IcebergTableProviderFactory` to read
//! existing Iceberg tables via `CREATE EXTERNAL TABLE` syntax.
//!
//! Note: External tables are read-only. For write operations, use
//! `IcebergCatalogProvider` instead (see `datafusion_integration.rs`).

use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use datafusion::execution::session_state::SessionStateBuilder;
use iceberg_datafusion::IcebergTableProviderFactory;

// ANCHOR: external_table_setup
/// Set up a DataFusion session with IcebergTableProviderFactory registered.
///
/// This allows reading existing Iceberg tables via `CREATE EXTERNAL TABLE` syntax.
fn setup_external_table_support() -> SessionContext {
// Create a session state with the Iceberg table factory registered
let mut state = SessionStateBuilder::new().with_default_features().build();

// Register the IcebergTableProviderFactory to handle "ICEBERG" file type
state.table_factories_mut().insert(
"ICEBERG".to_string(),
Arc::new(IcebergTableProviderFactory::new()),
);

SessionContext::new_with_state(state)
}
// ANCHOR_END: external_table_setup

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = setup_external_table_support();

// ANCHOR: external_table_query
// Example SQL for creating and querying an external Iceberg table:
//
// CREATE EXTERNAL TABLE my_table
// STORED AS ICEBERG
// LOCATION '/path/to/iceberg/metadata/v1.metadata.json';
//
// SELECT * FROM my_table WHERE column > 100;
// ANCHOR_END: external_table_query

println!("External table support configured.");
println!("Use CREATE EXTERNAL TABLE ... STORED AS ICEBERG to read existing tables.");
println!();
println!("Example:");
println!(" CREATE EXTERNAL TABLE my_table");
println!(" STORED AS ICEBERG");
println!(" LOCATION '/path/to/iceberg/metadata/v1.metadata.json';");

// This example requires an actual Iceberg table to query.
// For a complete working example with table creation, see datafusion_integration.rs

// Verify the session is configured correctly
let tables = ctx.catalog_names();
println!("\nRegistered catalogs: {tables:?}");

Ok(())
}
134 changes: 134 additions & 0 deletions crates/examples/src/datafusion_integration.rs
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example demonstrating DataFusion integration with Apache Iceberg.
//!
//! This example shows how to:
//! - Set up an Iceberg catalog with DataFusion
//! - Create tables using SQL
//! - Insert and query data
//! - Query metadata tables

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
use iceberg_datafusion::IcebergCatalogProvider;
use tempfile::TempDir;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a temporary directory for the warehouse
let temp_dir = TempDir::new()?;
let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

// ANCHOR: catalog_setup
// Create an in-memory Iceberg catalog
let iceberg_catalog = MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path)]),
)
.await?;

// Create a namespace for our tables
let namespace = NamespaceIdent::new("demo".to_string());
iceberg_catalog
.create_namespace(&namespace, HashMap::new())
.await?;

// Create the IcebergCatalogProvider and register it with DataFusion
let catalog_provider =
Arc::new(IcebergCatalogProvider::try_new(Arc::new(iceberg_catalog)).await?);

let ctx = SessionContext::new();
ctx.register_catalog("my_catalog", catalog_provider);
// ANCHOR_END: catalog_setup

// ANCHOR: create_table
// Create a table using SQL
ctx.sql(
"CREATE TABLE my_catalog.demo.users (
id INT NOT NULL,
name STRING NOT NULL,
email STRING
)",
)
.await?;

println!("Table 'users' created successfully.");
// ANCHOR_END: create_table

// ANCHOR: insert_data
// Insert data into the table
let result = ctx
.sql(
"INSERT INTO my_catalog.demo.users VALUES
(1, 'Alice', 'alice@example.com'),
(2, 'Bob', 'bob@example.com'),
(3, 'Charlie', NULL)",
)
.await?
.collect()
.await?;

// The result contains the number of rows inserted
println!("Inserted {} rows.", result[0].num_rows());
// ANCHOR_END: insert_data

// ANCHOR: query_data
// Query the data with filtering
println!("\nQuerying users with email:");
let df = ctx
.sql("SELECT id, name, email FROM my_catalog.demo.users WHERE email IS NOT NULL")
.await?;

df.show().await?;

// Query with projection (only specific columns)
println!("\nQuerying only names:");
let df = ctx
.sql("SELECT name FROM my_catalog.demo.users ORDER BY id")
.await?;

df.show().await?;
// ANCHOR_END: query_data

// ANCHOR: metadata_tables
// Query the snapshots metadata table
println!("\nTable snapshots:");
let df = ctx
.sql("SELECT snapshot_id, operation FROM my_catalog.demo.users$snapshots")
.await?;

df.show().await?;

// Query the manifests metadata table
println!("\nTable manifests:");
let df = ctx
.sql("SELECT path, added_data_files_count FROM my_catalog.demo.users$manifests")
.await?;

df.show().await?;
// ANCHOR_END: metadata_tables

println!("\nDataFusion integration example completed successfully!");

Ok(())
}
1 change: 1 addition & 0 deletions website/src/SUMMARY.md
@@ -24,6 +24,7 @@
- [Install](./install.md)
- [Download](./download.md)
- [API](./api.md)
- [DataFusion Integration](./datafusion.md)

# Developer Guide

123 changes: 123 additions & 0 deletions website/src/datafusion.md
@@ -0,0 +1,123 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

# DataFusion Integration

The `iceberg-datafusion` crate provides integration between Apache Iceberg and [DataFusion](https://datafusion.apache.org/), enabling SQL queries on Iceberg tables.

## Features

- **SQL DDL/DML**: `CREATE TABLE`, `INSERT INTO`, `SELECT`
- **Metadata Tables**: Query snapshots and manifests

## Dependencies

Add the following to your `Cargo.toml`:

```toml
[dependencies]
iceberg = "0.8"
iceberg-datafusion = "0.8"
datafusion = "51"
tokio = { version = "1", features = ["full"] }
```

## Catalog-Based Access

The recommended way to use Iceberg with DataFusion is through `IcebergCatalogProvider`, which integrates an Iceberg catalog with DataFusion's catalog system.

### Setup

```rust,no_run,noplayground
{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:catalog_setup}}
```

### Creating Tables

Once the catalog is registered, you can create tables using SQL:

```rust,no_run,noplayground
{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:create_table}}
```

Supported column types include:
- `INT`, `BIGINT` - Integer types
- `FLOAT`, `DOUBLE` - Floating point types
- `STRING` - String/text type
- `BOOLEAN` - Boolean type
- `DATE`, `TIMESTAMP` - Date/time types

> **Note**: `CREATE TABLE AS SELECT` is not currently supported. Create the table first, then use `INSERT INTO`.
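
As a quick illustration, a single statement can exercise several of these types at once. This is a minimal sketch: the table name and columns are hypothetical, and it assumes a `SessionContext` named `ctx` with an Iceberg catalog registered under `my_catalog` and a `demo` namespace, as in the setup above:

```rust,ignore
// A hypothetical DDL statement combining the supported column types.
ctx.sql(
    "CREATE TABLE my_catalog.demo.events (
        id BIGINT NOT NULL,
        score DOUBLE,
        label STRING,
        active BOOLEAN,
        event_date DATE,
        created_at TIMESTAMP
    )",
)
.await?;
```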

### Inserting Data

```rust,no_run,noplayground
{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:insert_data}}
```

For nested structs, use the `named_struct()` function:

```sql
INSERT INTO catalog.namespace.table
SELECT
1 as id,
named_struct('street', '123 Main St', 'city', 'NYC') as address
```

### Querying Data

```rust,no_run,noplayground
{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:query_data}}
```

## Metadata Tables

Iceberg metadata tables can be queried using the `$` suffix syntax (following the Flink convention):

```rust,no_run,noplayground
{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:metadata_tables}}
```

Available metadata tables:
- `table$snapshots` - Table snapshot history
- `table$manifests` - Manifest file information

## Configuration Options

These table properties control write behavior. They must be set when creating the table via the Iceberg catalog API, since DataFusion SQL does not support `ALTER TABLE` for property changes; a sketch of setting a property this way follows the table below.

| Property | Default | Description |
|----------|---------|-------------|
| `write.target-file-size-bytes` | `536870912` (512 MB) | Target size for data files |
| `write.format.default` | `parquet` | Default file format for new data files |

## Current Limitations

- `CREATE TABLE AS SELECT` is not supported
- Metadata tables are limited to `$snapshots` and `$manifests`
- `ALTER TABLE` and `DROP TABLE` via SQL are not supported (use catalog API)
- Schema evolution through SQL is not supported

## Running the Example

A complete example is available in the repository:

```bash
cargo run -p iceberg-examples --example datafusion-integration
```
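
The read-only external table example can be run the same way:

```bash
cargo run -p iceberg-examples --example datafusion-external-table
```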