feat(index): add sorted parquet indexes with CREATE/DROP/SHOW INDEX support #117

Draft
shefeek-jinnah wants to merge 2 commits into main from shefeek/parquet_indexing

Conversation

shefeek-jinnah commented Feb 11, 2026

Status: DRAFT — Parked for review

Summary

Adds sorted parquet projection indexes to the caching layer. When a user creates an index on a cached table, the engine writes a second parquet file with rows physically sorted by the indexed column(s). At
query time, the planner selects the best available index based on WHERE clause filters, enabling row-group pruning on sorted statistics and sort-elimination for ORDER BY queries.

What's Done

Core indexing engine

  • CREATE INDEX idx ON catalog.schema.table (col1, col2, ...) — sorts cached parquet data by specified columns and writes a separate index parquet file
  • DROP INDEX idx ON catalog.schema.table — removes index metadata and parquet file
  • SHOW INDEXES ON catalog.schema.table — lists all indexes for a table
  • SQL interception in engine.rs (parsed before DataFusion, since these aren't standard DataFusion DDL)
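
For illustration, a minimal sketch of the interception idea (the function and enum names here are illustrative, not the actual engine.rs API):

```rust
/// Route non-DataFusion DDL before handing the statement to DataFusion.
/// Sketch only; the real engine.rs does proper statement parsing, not prefix matching.
enum Route {
    CreateIndex,
    DropIndex,
    ShowIndexes,
    DataFusion,
}

fn route_statement(sql: &str) -> Route {
    let upper = sql.trim_start().to_uppercase();
    if upper.starts_with("CREATE INDEX") {
        Route::CreateIndex
    } else if upper.starts_with("DROP INDEX") {
        Route::DropIndex
    } else if upper.starts_with("SHOW INDEXES") {
        Route::ShowIndexes
    } else {
        Route::DataFusion // everything else goes through normal planning
    }
}
```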

Sorted parquet writer (src/datafetch/sorted_parquet.rs)

  • Uses DataFusion's in-memory sort to reorder batches by index columns (ASC, NULLS FIRST)
  • Writes sorted projection as a standalone parquet file alongside the base table parquet
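
A minimal sketch of the sort step, assuming the cached batches are already in memory (the column name and error handling are illustrative, and the write-out of the sorted file is elided):

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Re-sort cached batches by the index column (ASC, NULLS FIRST) using
/// DataFusion's in-memory sort; the sorted batches are then written out
/// as the index parquet file by the project's writer (not shown).
async fn sort_for_index(batches: Vec<RecordBatch>) -> Result<Vec<RecordBatch>> {
    let ctx = SessionContext::new();
    let df = ctx.read_batches(batches)?;
    let sorted = df.sort(vec![col("l_shipdate").sort(true, true)])?; // asc, nulls_first
    sorted.collect().await
}
```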

In-memory batch collector (src/datafetch/collecting_writer.rs)

  • CollectingBatchWriter accumulates batches in memory for multi-pass processing (e.g., creating multiple index presets from the same base data)
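
Conceptually the collector is just an in-memory buffer of Arrow batches; a hedged sketch (the real type presumably implements the project's writer trait):

```rust
use datafusion::arrow::record_batch::RecordBatch;

/// Accumulates batches so several index files can be built from one read
/// of the base data. Sketch only; trait impls and memory accounting omitted.
#[derive(Default)]
struct CollectingBatchWriter {
    batches: Vec<RecordBatch>,
}

impl CollectingBatchWriter {
    fn write(&mut self, batch: RecordBatch) {
        self.batches.push(batch);
    }

    /// Reuse the collected batches for each index build without re-reading parquet.
    fn batches(&self) -> &[RecordBatch] {
        &self.batches
    }
}
```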

Index-aware query planning (src/datafusion/lazy_table_provider.rs)

  • Analyzes pushed-down filters to select the best index whose sort column matches a WHERE predicate
  • Declares output ordering on index parquet so DataFusion can eliminate redundant sorts for ORDER BY
  • Falls back to base parquet when no index benefits the query
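
The selection rule is roughly "first index whose leading sort column appears in a pushed-down filter"; a sketch with illustrative types (not the real lazy_table_provider.rs structures):

```rust
/// Illustrative metadata only; not the actual catalog types.
struct IndexMeta {
    name: String,
    sort_columns: Vec<String>,
}

/// Pick the first index whose leading sort column is referenced by a
/// pushed-down filter; `None` means scan the base parquet instead.
fn pick_index<'a>(filter_columns: &[String], indexes: &'a [IndexMeta]) -> Option<&'a IndexMeta> {
    indexes.iter().find(|idx| {
        idx.sort_columns
            .first()
            .map(|lead| filter_columns.iter().any(|c| c == lead))
            .unwrap_or(false)
    })
}
```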

Index preset registry (src/datafetch/index_presets.rs)

  • Programmatic API for pre-configuring indexes per table (used by with_tpch_index_presets())
  • Ships with a tpch_optimized preset defining 6 indexes across lineitem, orders, customer, partsupp
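
The preset is essentially a static list of (table, index, columns) entries; sketched below using the six benchmark indexes listed further down (the real registry types and API may differ):

```rust
/// Sketch of the tpch_optimized preset contents; mirrors the
/// "Indexes Used in Benchmark" table below, not the actual registry types.
struct IndexPreset {
    table: &'static str,
    index_name: &'static str,
    columns: &'static [&'static str],
}

const TPCH_OPTIMIZED: &[IndexPreset] = &[
    IndexPreset { table: "lineitem", index_name: "idx_shipdate",  columns: &["l_shipdate"] },
    IndexPreset { table: "orders",   index_name: "idx_orderdate", columns: &["o_orderdate"] },
    IndexPreset { table: "lineitem", index_name: "idx_partkey",   columns: &["l_partkey"] },
    IndexPreset { table: "orders",   index_name: "idx_custkey",   columns: &["o_custkey"] },
    IndexPreset { table: "customer", index_name: "idx_nationkey", columns: &["c_nationkey"] },
    IndexPreset { table: "partsupp", index_name: "idx_suppkey",   columns: &["ps_suppkey"] },
];
```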

Catalog persistence (migrations/sqlite/v5.sql, migrations/postgres/v5.sql)

  • New indexes table with unique constraint on (connection_id, schema_name, table_name, index_name)
  • Cascade delete when connection is removed
  • Index on (connection_id, schema_name, table_name) for efficient lookups
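
For reference, a hedged sketch of the table shape these constraints imply (SQLite dialect; the actual column list in migrations/sqlite/v5.sql may differ, and the connections(id) reference plus the columns field are assumptions):

```rust
/// Sketch only; not the real migration. The parent table name and the
/// `columns` field are assumptions, while the constraints match the bullets above.
const V5_SKETCH_SQL: &str = r#"
CREATE TABLE indexes (
    connection_id TEXT NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
    schema_name   TEXT NOT NULL,
    table_name    TEXT NOT NULL,
    index_name    TEXT NOT NULL,
    columns       TEXT NOT NULL,
    UNIQUE (connection_id, schema_name, table_name, index_name)
);
CREATE INDEX idx_indexes_lookup ON indexes (connection_id, schema_name, table_name);
"#;
```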

Configurable parquet settings (ParquetConfig)

  • max_row_group_size and bloom_filter_enabled are now configurable via engine builder
  • Indexes are automatically invalidated when a table is re-synced

DataFusion upgrade

  • Upgraded from DataFusion 51.0 → 52.1
  • liquid-cache-client temporarily disabled (waiting for DF 52 compatibility)

Benchmark harness (tests/tpch_benchmark_tests.rs)

  • Full TPC-H Q1–Q22 benchmark suite with DuckDB data generation
  • Tests: test_tpch_benchmark, test_tpch_benchmark_presets, test_index_performance, test_old_vs_current_config, test_rowgroup_size_comparison
  • Bloom filter and distinct index micro-benchmarks
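
Each benchmark result is a plain wall-clock measurement around a query run; the timing core is roughly the following (run_query here is a stand-in, not the harness's real helper):

```rust
use std::time::Instant;

/// Wall-clock timing around a single query run; the real harness in
/// tests/tpch_benchmark_tests.rs also drives DuckDB data generation
/// and runs the full Q1-Q22 suite.
async fn time_query<F, Fut>(label: &str, run_query: F) -> f64
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    let start = Instant::now();
    run_query().await;
    let ms = start.elapsed().as_secs_f64() * 1000.0;
    println!("{label}: {ms:.2} ms");
    ms
}
```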

Files Changed (22 files, +4870 / -191)

| Area | Files |
| --- | --- |
| Core engine | engine.rs (+952) |
| Index writer | sorted_parquet.rs, collecting_writer.rs, index_presets.rs |
| Query planner | lazy_table_provider.rs (+434), parquet_exec.rs |
| Parquet config | native/parquet_writer.rs, orchestrator.rs (+298) |
| Catalog | manager.rs, sqlite_manager.rs, postgres_manager.rs, caching_manager.rs, mock_catalog.rs |
| Migrations | sqlite/v5.sql, postgres/v5.sql |
| Tests | tpch_benchmark_tests.rs (+2210), result_persistence_tests.rs |
| Dependencies | Cargo.toml (DF 51→52, liquid-cache disabled) |

Configuration: Old vs New

| Setting | Old (main) | New (this branch) |
| --- | --- | --- |
| DataFusion version | 51.0 | 52.1 |
| Row group size | 1,000,000 | 100,000 |
| Bloom filters | Always enabled (FPP=0.01) | Disabled by default (configurable) |
| Compression | LZ4 | LZ4 (unchanged) |
| Parquet version | 2.0 | 2.0 (unchanged) |
| Sorted indexes | None | CREATE INDEX support |
| liquid-cache | Enabled | Temporarily disabled |

Writer config (old — main branch):

```rust
WriterProperties::builder()
    .set_writer_version(WriterVersion::PARQUET_2_0)
    .set_compression(Compression::LZ4_RAW)
    .set_bloom_filter_enabled(true)
    .set_bloom_filter_fpp(0.01)
    .build()
// Row group size: 1,000,000 (default)
```

Writer config (new — this branch):
```rust
ParquetConfig {
    max_row_group_size: 100_000,  // 10x smaller row groups → better pruning
    bloom_filter_enabled: false,  // disabled by default → smaller files
}
// + optional sorted index parquet files via CREATE INDEX
```
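
The mapping from ParquetConfig onto the parquet writer properties is straightforward; roughly (a sketch, assuming the settings above feed the existing WriterProperties builder):

```rust
use parquet::basic::Compression;
use parquet::file::properties::{WriterProperties, WriterVersion};

/// Build writer properties from the configurable settings; compression and
/// writer version are unchanged from main, only the row group size and the
/// bloom filter toggle are new knobs.
fn writer_props(max_row_group_size: usize, bloom_filter_enabled: bool) -> WriterProperties {
    WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::LZ4_RAW)
        .set_max_row_group_size(max_row_group_size)
        .set_bloom_filter_enabled(bloom_filter_enabled)
        .build()
}
```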

---
TPC-H Performance Comparison (SF=1.0, ~1GB dataset)

Old config: 1M row groups + bloom filters (FPP=0.01), no indexes
New config: 100K row groups, no bloom filters, 6 sorted indexes
| Query | Pattern | Old (ms) | New (ms) | Improvement |
| --- | --- | --- | --- | --- |
| Q1 | Aggregation | 64.01 | 62.87 | 2% faster |
| Q2 | Correlated subquery | 82.58 | 77.85 | 6% faster |
| Q3 | 3-way join | 54.81 | 46.88 | 14% faster |
| Q4 | EXISTS subquery | 27.84 | 16.47 | 41% faster |
| Q5 | 6-way join | 73.11 | 63.40 | 13% faster |
| Q6 | Scan + filter | 18.90 | 7.12 | 62% faster |
| Q7 | 6-way join + CASE | 111.03 | 111.60 | ~same |
| Q8 | 8-way join | 88.63 | 71.46 | 19% faster |
| Q9 | 6-way join + LIKE | 127.60 | 107.68 | 16% faster |
| Q10 | 4-way join | 88.78 | 80.82 | 9% faster |
| Q11 | Nested subquery | 58.34 | 41.03 | 30% faster |
| Q12 | 3-way join + CASE | 69.23 | 40.68 | 41% faster |
| Q13 | LEFT OUTER join | 86.43 | 28.82 | 67% faster |
| Q14 | 2-way join | 28.04 | 16.85 | 40% faster |
| Q15 | CTE + MAX | 30.79 | 17.67 | 43% faster |
| Q16 | NOT IN subquery | 30.46 | 17.99 | 41% faster |
| Q17 | AVG subquery | 93.04 | 100.31 | 8% slower |
| Q18 | IN + HAVING | 119.38 | 133.69 | 12% slower |
| Q19 | OR predicates | 58.81 | 44.05 | 25% faster |
| Q20 | IN + EXISTS | 47.78 | 40.20 | 16% faster |
| Q21 | EXISTS + NOT EXISTS | 113.84 | 109.54 | 4% faster |
| Q22 | NOT EXISTS + SUBSTRING | 17.98 | 12.68 | 29% faster |
| Total | | 1,491 ms | 1,250 ms | 16% faster (1.19x) |

Results Summary

- 20 of 22 queries faster or unchanged
- Best gains: Q13 (-67%), Q6 (-62%), Q15 (-43%), Q4/Q12/Q16 (-41%), Q14 (-40%)
- Regressions: Q17 (+8%), Q18 (+12%) — correlated subqueries with AVG/HAVING that don't benefit from the current index set
- Overall: 1.19x faster (1,491ms → 1,250ms)

Cache Size Trade-off

| Config | Cache Size |
| --- | --- |
| Old (1M row groups + bloom) | 688.7 MB |
| New base (100K row groups, no bloom) | 388.9 MB (43% smaller) |
| New + 6 sorted indexes | 973.9 MB |

The base cache without indexes is 43% smaller. Adding 6 sorted index files brings total storage to 973.9 MB. Indexes are opt-in via CREATE INDEX — only create them for columns that benefit your workload.

Indexes Used in Benchmark

| Index | Table | Column | Benefits Queries |
| --- | --- | --- | --- |
| idx_shipdate | lineitem | l_shipdate | Q1, Q3, Q6, Q7, Q14, Q15, Q20 |
| idx_orderdate | orders | o_orderdate | Q3, Q4, Q5, Q8, Q10 |
| idx_partkey | lineitem | l_partkey | Q2, Q9, Q14, Q17, Q19, Q20 |
| idx_custkey | orders | o_custkey | Q3, Q5, Q8, Q10, Q13, Q18, Q22 |
| idx_nationkey | customer | c_nationkey | Q5, Q7, Q8, Q10 |
| idx_suppkey | partsupp | ps_suppkey | Q2, Q9, Q11, Q16, Q20 |

---
How to Run Benchmarks

```bash
# Quick smoke test (SF=0.01, ~10MB)
cargo test tpch_benchmark --release -- --nocapture

# Full benchmark — old vs new comparison (SF=1.0, ~1GB)
TPCH_SCALE_FACTOR=1.0 cargo test test_old_vs_current_config --release -- --nocapture --test-threads=1

# Index performance — baseline vs indexed (SF=1.0)
TPCH_SCALE_FACTOR=1.0 cargo test test_index_performance --release -- --nocapture --test-threads=1

# Row group size — 1M vs 100K without indexes (SF=1.0)
TPCH_SCALE_FACTOR=1.0 cargo test test_rowgroup_size_comparison --release -- --nocapture --test-threads=1
```

---
Known Limitations

- liquid-cache-client temporarily disabled (incompatible with DataFusion 52)
- Index creation reads all cached parquet into memory and re-sorts — not suitable for very large tables without streaming sort
- No automatic index selection for JOIN keys (only WHERE clause filters are matched)
- Indexes are invalidated on table re-sync but not automatically rebuilt

Single Query Benchmark: ORDER BY with LIMIT

Query: SELECT * FROM tpch.main.orders ORDER BY o_orderdate DESC LIMIT 5                                                                                                                                        
Dataset: TPC-H SF=1.0 (~1.5M orders)

| Step | DataFusion | Config | Time | What Happened |
| --- | --- | --- | --- | --- |
| 1 | 51 | 1M row group, bloom, no index | 54ms | Full scan + sort |
| 2 | 51 | 1M row group, bloom, ASC index | 54ms | No sort pushdown in DF 51 — index ignored, same as step 1 |
| 3 | 52 | 1M row group, bloom, no index | 54ms | Same as step 1 — no index, no benefit from DF 52 |
| 4 | 52 | 1M row group, bloom, ASC index | 74ms | Sort pushdown activates, but reverse scan of all row groups — slower |
| 5 | 52 | 100K row group, no bloom, no index | 16ms | Smaller row groups = faster scan + sort |
| 6 | 52 | 100K row group, no bloom, ASC index | 20ms | Sort pushdown + reverse scan — slight overhead |
| 7 | 52 | 100K row group, no bloom, DESC index | 5ms | Sort eliminated, limit pushed down, early termination — 10.8x faster than step 1 |

Why DataFusion 52?

The sorted index feature depends on sort pushdown (apache/datafusion#10433), which landed in DataFusion 52 via apache/datafusion#19064.

It adds a PushdownSort optimizer rule that detects SortExec nodes in physical plans and pushes sorting requirements down to data sources via a new try_pushdown_sort() trait method.

The key concept is Inexact ordering. When DataFusion sees ORDER BY o_orderdate DESC LIMIT 5 on a parquet file:

  1. The PushdownSort rule asks the ParquetSource: "can you help with this sort?"
  2. ParquetSource checks row group statistics and sets reverse_scan_inexact=true — meaning it will reverse the order of row groups based on their min/max metadata
  3. It returns Inexact — telling the planner "I'll give you data in roughly the right order, but not perfectly sorted"
  4. Because the result is Inexact, the SortExec node stays in the plan for correctness, but now it receives data that's approximately sorted
  5. Combined with LIMIT 5, the TopK sort can terminate early — the first few row groups likely contain all the answers

The critical detail: it does NOT reverse rows within row groups. It only reorders which row groups are read first. That's why it's "Inexact" — row-group-level ordering, not row-level. The Sort operator above
it handles the final correctness.
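
A conceptual sketch of the three possible outcomes; this is not DataFusion's actual type, just the contract the steps above describe:

```rust
/// Conceptual only; illustrates the pushdown contract described above, not the real API.
enum SortPushdownOutcome {
    /// Output is fully sorted; the SortExec could be removed entirely.
    Exact,
    /// Row groups are reordered by min/max stats but rows inside them are not:
    /// SortExec stays for correctness, and with LIMIT the TopK can stop early.
    Inexact,
    /// The source cannot help; a full scan followed by an in-memory sort remains.
    Unsupported,
}
```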

Before this PR (DataFusion 51), there was no mechanism for the planner to tell a data source "I need this data sorted" — so sorted parquet files gave zero benefit. The sort always happened in memory after a
full scan.

Why isn't reverse-order scanning possible within a row group?

Rows within a row group are not individually addressable. They're stored as columnar chunks where each column is compressed and encoded (dictionary encoding, RLE, delta encoding, etc.). To read row N of a
column chunk, you have to decompress and decode all pages from the start of that chunk up to row N. There's no "seek to row 99,995" — the encoding is sequential.

So when DataFusion does reverse_row_groups=true on an ASC-sorted index for a DESC query:

  • It correctly reads the last row group first (the one with the highest dates)
  • But the rows inside that group are sorted ASC, and the 5 highest dates are at the end
  • To reach them, it must decompress and decode the entire column chunk (~100K or ~1M rows) from start to finish
  • It can't read the column chunk backwards because the encoding doesn't support it

That's why a DESC index solves this — the 5 highest dates are the first 5 rows in the first row group. Decompress one page, read 5 rows, done.
