
Add Variant SELECT rewriter and comprehensive architecture docs #16

Open
tonyalaribe wants to merge 8 commits into master from optimize

Conversation

@tonyalaribe
Contributor

Summary

  • VariantSelectRewriter: Adds analyzer rule to automatically wrap Variant columns with variant_to_json() in SELECT projections, ensuring proper serialization for PostgreSQL wire protocol
  • Comprehensive Documentation: Adds three new architecture docs covering the entire system
  • Database improvements: Unified table storage model and DML integration updates
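
As a rough illustration of the SELECT rewrite described above (a sketch only; variant_to_json_udf stands in for however the registered ScalarUDF handle is obtained inside the rewriter), a Variant column in a projection becomes a UDF call that keeps the original output column name:

use datafusion::prelude::col;

// sketch: wrap a Variant column with variant_to_json() and preserve its name
let original = col("attributes");            // Variant column in the projection
let wrapped = variant_to_json_udf            // assumed ScalarUDF handle
    .call(vec![original])                    // variant_to_json(attributes)
    .alias("attributes");                    // keep the column name for the client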

Documentation Added

  • docs/ARCHITECTURE.md - Full system architecture overview with diagrams

    • Module structure
    • Data flow (Insert/Select/Flush paths)
    • Multi-tenant storage model
    • Key data structures (MemBuffer, WAL, Config)
    • Query transformation pipeline
    • Caching and safety mechanisms
  • docs/VARIANT_TYPE_SYSTEM.md - Variant type implementation

    • Struct representation
    • INSERT/SELECT automatic transformations
    • JSON path operators (-> and ->>)
    • UDFs and usage examples
  • docs/WAL.md - Write-ahead log implementation

    • Entry format and structure
    • Topic partitioning
    • Recovery process
    • Safety features

Code Changes

  • src/optimizers/variant_select_rewriter.rs - New analyzer rule
  • src/database.rs - Unified table storage improvements
  • src/dml.rs - Buffered layer integration
  • Schema and test updates

Test plan

  • Run cargo test for unit tests
  • Run cargo test --test integration_test for integration tests
  • Verify Variant column SELECT returns JSON strings via psql

- Consolidate WALRUS_DATA_DIR and FOYER_CACHE_DIR into single TIMEFUSION_DATA_DIR
  with derived subdirs (wal/, cache/)
- Add VariantConversionExec to convert string columns to Variant during INSERT
- Add VariantInsertRewriter analyzer rule to rewrite string literals for Variant columns
- Add is_schema_compatible_for_insert() for flexible INSERT type checking
- Split optimizers.rs into optimizers/ module directory
- Improve query sanitization for INSERT and long queries
- Add VariantSelectRewriter analyzer rule to wrap Variant columns with
  variant_to_json() in SELECT projections for PostgreSQL wire protocol
- Add comprehensive documentation:
  - docs/ARCHITECTURE.md: Full system architecture overview
  - docs/VARIANT_TYPE_SYSTEM.md: Variant type implementation details
  - docs/WAL.md: Write-ahead log implementation and recovery
- Update database.rs with unified table storage model improvements
- Update DML operations with buffered layer integration
- Align otel_logs_and_spans schema with monoscope
- Fix test configurations for new architecture
@claude

claude bot commented Feb 1, 2026

PR Review: Add Variant SELECT rewriter and comprehensive architecture docs

Summary

This is a well-structured PR that adds critical functionality for Variant type handling and significantly improves documentation. The implementation is thoughtful and addresses PostgreSQL wire protocol compatibility correctly.

✅ Strengths

Code Quality

  1. Excellent Variant Rewriter Design (src/optimizers/variant_select_rewriter.rs, variant_insert_rewriter.rs)

    • Clean separation of concerns with dedicated analyzer rules
    • Proper handling of already-wrapped expressions (prevents double-wrapping at lines 54-58)
    • Preserves column aliases correctly (lines 66-79 in select_rewriter.rs)
    • Good use of debug logging for observability
  2. Robust Database Refactoring (src/database.rs)

    • Unified table storage model simplifies multi-tenancy
    • Proper error handling with exponential backoff (line 552)
    • Safe conversion between JSON strings and Variant with detailed error messages (line 168)
    • Consistent use of Arc/RwLock for thread safety
  3. Configuration Improvements (src/config.rs)

    • New unified TIMEFUSION_DATA_DIR consolidates WAL and cache paths
    • Clean helper methods wal_dir() and cache_dir() (lines 240-246)
    • Good defaults and validation (e.g., min enforcement at line 440)
  4. Comprehensive Documentation

    • ARCHITECTURE.md: Excellent system overview with clear diagrams
    • VARIANT_TYPE_SYSTEM.md: Well-documented automatic transformations
    • WAL.md: Thorough explanation of durability guarantees

Best Practices

  • Proper use of DataFusion's TreeNode API for AST transformation
  • Fail-fast approach on invalid JSON (database.rs:168)
  • No backwards-compatibility hacks (clean removal of WALRUS_DATA_DIR and TIMEFUSION_FOYER_CACHE_DIR)
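
The data-directory consolidation mentioned above could look roughly like this (struct fields are assumptions based on the description; the wal_dir()/cache_dir() helper names and the wal/ and cache/ subdirectories come from the PR):

use std::path::PathBuf;

// sketch: one TIMEFUSION_DATA_DIR with derived subdirectories
struct Config {
    data_dir: PathBuf, // populated from TIMEFUSION_DATA_DIR
}

impl Config {
    fn wal_dir(&self) -> PathBuf {
        self.data_dir.join("wal")
    }

    fn cache_dir(&self) -> PathBuf {
        self.data_dir.join("cache")
    }
}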

🔍 Issues & Recommendations

Security Concerns

HIGH: Credentials in Database Schema (database.rs:570-571)

s3_access_key_id VARCHAR(500) NOT NULL,
s3_secret_access_key VARCHAR(500) NOT NULL,

Storing credentials in plaintext is a security vulnerability. Consider:

  • Encrypting credentials at rest using PostgreSQL's pgcrypto extension
  • Using AWS IAM roles or STS temporary credentials instead
  • Referencing AWS Secrets Manager or similar secret stores

MEDIUM: Potential SQL Injection in Dynamic Queries
While I don't see direct issues in this PR, ensure that any dynamic SQL construction (especially in DML operations) properly uses parameterized queries.

Code Quality Issues

1. Missing Error Context (variant_insert_rewriter.rs:32)

plan.transform_up(|node| rewrite_insert_node(node)).map(|t| t.data)

Consider adding error context to help with debugging:

plan.transform_up(|node| rewrite_insert_node(node))
    .map(|t| t.data)
    .map_err(|e| DataFusionError::Plan(format!("VariantInsertRewriter failed: {}", e)))

2. Recursive Transformation Could Stack Overflow (variant_insert_rewriter.rs:94-104)
The recursive call in rewrite_input_for_variant could overflow on deeply nested plans. Consider:

  • Adding a depth limit
  • Using an iterative approach with a stack
  • Adding a comment about expected maximum depth

3. Magic Number (optimizers/mod.rs:29)

let date_scalar = ScalarValue::Date32(Some(date.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp() as i32 / 86400));

The division by 86400 (seconds per day) should be a named constant:

const SECONDS_PER_DAY: i32 = 86400;

4. Unused Code Warning (optimizers/mod.rs:7-10)

// Remove unused imports warning - these are used by the submodules indirectly
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::scalar::ScalarValue;

These imports ARE used directly in the time_range_partition_pruner module below. The comment is misleading - just remove it.

Performance Considerations

1. Cloning in Hot Path (variant_select_rewriter.rs:40)

expr.clone()

While necessary for the TreeNode pattern, consider documenting that this is acceptable because:

  • DataFusion expressions use Arc internally
  • Clones are shallow for most expression types
  • The optimizer runs once per query, not per row

2. Batch Conversion Efficiency (database.rs:100-152)
The convert_variant_columns function processes columns sequentially. For large batches with multiple Variant columns, consider parallel conversion using rayon.
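
A parallel variant could be sketched with rayon along these lines (an assumption, not part of this PR; convert_one_column is a hypothetical helper wrapping the existing per-column logic):

use datafusion::arrow::array::ArrayRef;
use datafusion::error::DataFusionError;
use rayon::prelude::*;

// sketch: convert the Variant columns of a batch in parallel rather than sequentially
let converted: Result<Vec<ArrayRef>, DataFusionError> = variant_column_indices
    .par_iter()
    .map(|&idx| convert_one_column(batch.column(idx))) // hypothetical per-column helper
    .collect();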

Test Coverage

GOOD:

  • Integration test updates show the feature is tested end-to-end
  • Test configuration properly isolates tests with unique IDs

NEEDS IMPROVEMENT:

  • No dedicated unit tests for the new rewriter modules
  • Missing test cases:
    • Double-wrapping prevention (already wrapped columns)
    • Nested projections with Variant columns
    • INSERT with partial column lists (columns beyond batch length)
    • Invalid JSON error handling path

Recommendation: Add unit tests in src/optimizers/variant_insert_rewriter.rs and variant_select_rewriter.rs:

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::prelude::*;

    #[test]
    fn test_variant_select_no_double_wrap() {
        // Test that already-wrapped columns aren't wrapped again
    }

    #[test]
    fn test_variant_insert_with_partial_columns() {
        // Test INSERT with fewer columns than table schema
    }
}

Documentation

Minor Issues:

  1. VARIANT_TYPE_SYSTEM.md line 70: "Applies recursively to Values and Projection nodes" - Should clarify what happens with subqueries
  2. ARCHITECTURE.md: Missing information about how Variant types interact with the buffered write layer

🎯 Action Items

Before Merge (Required):

  1. ✅ Address the credentials storage security concern (use encryption or external secret management)
  2. Add unit tests for the rewriter modules
  3. Remove misleading comment in optimizers/mod.rs:7

Post-Merge (Recommended):

  1. Add depth limit to recursive rewrite_input_for_variant
  2. Extract magic number (86400) to named constant
  3. Consider parallel Variant column conversion for large batches
  4. Add more edge case tests for Variant handling

📊 Metrics

  • Files changed: 24
  • Additions: 1,839 lines (including 912 lines of documentation)
  • Deletions: 264 lines
  • Net impact: Significant improvement in code organization and documentation

Conclusion

This is high-quality work that significantly improves the Variant type system and documentation. The security concern around credential storage should be addressed before merging, but otherwise the implementation is solid. The new documentation is comprehensive and will help onboard contributors.

Recommendation: Approve with requested changes to credential storage

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

variant_columns_to_json() was using RecordBatch::try_new() which fails
when creating batches with 0 columns (empty projections used by COUNT(*))
because Arrow requires either columns or an explicit row count.

Changed to try_new_with_options() to preserve the original batch's row
count, fixing queries like SELECT COUNT(*) that don't need any columns.
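
For reference, a minimal sketch of that change using Arrow's batch options (the schema and column names here are illustrative; only try_new_with_options and the preserved row count come from the fix above):

use datafusion::arrow::record_batch::{RecordBatch, RecordBatchOptions};

// sketch: carry the source batch's row count so zero-column projections stay valid
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
let rewritten = RecordBatch::try_new_with_options(projected_schema, projected_columns, &options)?;
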
Transient network errors like "error sending request" were failing
immediately with no retries. Added:
- RetryConfig: 5 retries with exponential backoff (100ms-15s)
- ClientOptions: 30s connect timeout, 5min request timeout

This should resolve intermittent flush failures to R2/S3.
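
Assuming the object_store crate's retry and client-option types, the configuration is roughly as follows (only the retry count, backoff range, and timeouts come from the change; the backoff base and retry_timeout values are illustrative):

use std::time::Duration;
use object_store::{BackoffConfig, ClientOptions, RetryConfig};

// sketch: exponential backoff for transient request errors plus generous timeouts
let retry = RetryConfig {
    backoff: BackoffConfig {
        init_backoff: Duration::from_millis(100),
        max_backoff: Duration::from_secs(15),
        base: 2.0,                              // illustrative
    },
    max_retries: 5,
    retry_timeout: Duration::from_secs(300),    // illustrative
};

let client_options = ClientOptions::new()
    .with_connect_timeout(Duration::from_secs(30))
    .with_timeout(Duration::from_secs(300));
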
- Replace blocking std::thread::sleep with tokio::time::sleep in CAS
  retry loop to avoid starving the Tokio executor under contention
- Fix DML memory tracking: recalculate bucket memory_bytes after
  DELETE/UPDATE operations to prevent premature flush triggers
- Improve WAL recovery resilience: catch schema-incompatible entries
  instead of aborting recovery, add empty batch skip
- Add timestamp range filtering to MemBuffer queries: extract bounds
  from filter expressions and skip non-overlapping time buckets
- Switch from GreedyMemoryPool to FairSpillPool for per-query memory
  fairness and automatic spill-to-disk under pressure
- Make WAL fsync interval configurable via TIMEFUSION_WAL_FSYNC_MS
  env var (default 200ms)
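
The fsync knob could be read at startup roughly like this (a sketch; only the env var name and the 200ms default come from the change above):

use std::time::Duration;

// sketch: configurable WAL fsync interval with a 200ms default
let fsync_ms: u64 = std::env::var("TIMEFUSION_WAL_FSYNC_MS")
    .ok()
    .and_then(|v| v.parse().ok())
    .unwrap_or(200);
let wal_fsync_interval = Duration::from_millis(fsync_ms);
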
… optimization

- WAL serialization now uses Arrow IPC (v129) instead of custom CompactBatch,
  with automatic fallback to legacy v128 format for existing WAL entries
- MemBuffer compacts multiple small batches into a single RecordBatch before
  flush to reduce small file writes
- Optimization window, min file threshold, and light optimize target size are
  now configurable via maintenance config
- Switch TimeBucket from RwLock to parking_lot::Mutex for lower overhead
- Add compact-on-read in query()/query_partitioned(): first read compacts
  batches in-place, subsequent reads get pre-compacted single batch
- Remove insert-time compaction that caused +64% batch_api write regression
- Revert WAL serialization from Arrow IPC back to bincode CompactBatch
  (IPC schema preamble overhead caused +18-20% SQL insert regression)
- Keep IPC deserialization as fallback for backward compatibility
- Skip VariantToJsonExec wrapper for tables with no Variant columns
- Add bloom filter config (timefusion_bloom_filter_disabled), enabled by default
- Add WAL file monitoring and emergency flush on file count threshold
- Add criterion benchmarks for write, read, S3 flush, and S3 read paths
@claude

claude bot commented Feb 16, 2026

Code Review: PR #16 - Variant SELECT/INSERT Rewriter and Architecture Docs

Overview

This is a well-architected and production-ready PR that adds comprehensive Variant type support, extensive documentation, and performance optimizations. The implementation is solid with clean separation of concerns and proper integration into the DataFusion query pipeline.

Overall Assessment: Approve with minor recommendations


🎯 Key Strengths

1. Clean Implementation

  • Variant rewriters properly integrated into analyzer rule pipeline with correct ordering
  • VariantInsertRewriter runs BEFORE TypeCoercion (prevents type mismatches)
  • VariantSelectRewriter runs AFTER TypeCoercion (ensures correct output types)
  • Double-wrap prevention implemented correctly in both rewriters

2. Comprehensive Documentation

  • docs/ARCHITECTURE.md: Excellent system overview with clear diagrams
  • docs/VARIANT_TYPE_SYSTEM.md: Well-explained Variant representation and transformations
  • docs/WAL.md: Thorough coverage of WAL implementation and recovery

3. Performance Focus

  • New benchmark suite covers critical paths (write, read, S3 flush/read)
  • Compact-on-read optimization for MemBuffer queries
  • Configurable bloom filters and WAL fsync intervals
  • Proper retry/timeout configuration for S3 operations

4. Type Safety & Error Handling

  • Proper use of Result types throughout
  • JSON validation fails fast on invalid input
  • WAL corruption tracking with configurable thresholds
  • 100MB batch size limit prevents OOM attacks

⚠️ Areas for Improvement

High Priority

1. Missing Unit Tests for Variant Rewriters

Location: src/optimizers/variant_insert_rewriter.rs & variant_select_rewriter.rs

The rewriters have no dedicated unit tests - only indirect coverage through integration tests.

Recommendation: Add unit tests for:

// Test cases needed:
- Double-wrap prevention (SELECT with already-wrapped columns)
- Alias preservation in VariantSelectRewriter
- Nested expression handling in VariantInsertRewriter
- Edge case: Empty projections (COUNT(*))
- Edge case: Mixed Variant/non-Variant columns

2. Schema Evolution Not Addressed

Location: src/schema_loader.rs:18-26

The Variant type detection relies on exact struct pattern matching (metadata + value fields), but doesn't handle schema evolution:

pub fn is_variant_type(dt: &DataType) -> bool {
    matches!(dt, DataType::Struct(fields)
        if fields.len() == 2
        && fields.iter().any(|f| f.name() == "metadata")
        && fields.iter().any(|f| f.name() == "value"))
}

Issues:

  • If JSON structure within Variant changes (new fields added/removed), no migration path
  • If Parquet Variant spec evolves, detection could fail silently

Recommendation:

  • Document Parquet Variant version dependency explicitly
  • Consider adding version field or more flexible matching
  • Add migration framework for future schema changes
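
A slightly more tolerant check, for illustration (a sketch only; it drops the exact-two-fields requirement so fields added later don't silently break detection):

use datafusion::arrow::datatypes::DataType;

// sketch: require the metadata/value pair but tolerate extra fields
pub fn is_variant_type(dt: &DataType) -> bool {
    match dt {
        DataType::Struct(fields) => {
            fields.iter().any(|f| f.name() == "metadata")
                && fields.iter().any(|f| f.name() == "value")
        }
        _ => false,
    }
}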

3. JSON Conversion Error Handling

Location: src/database.rs:87-164

The json_strings_to_variant() function fails the entire batch on first invalid JSON:

let variant = VariantStruct::try_from_json(&json_str)
    .map_err(|e| DataFusionError::Execution(format!(
        "Failed to parse JSON at row {}: {}", i, e
    )))?;

Issue: No option for partial batch success or row-level error recovery

Recommendation: Consider adding a "skip invalid rows" mode with logging for production resilience.
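
A lenient mode could be layered on the existing call, for example (a sketch; the skip_invalid_rows flag is an assumption and log::warn! stands in for whatever logging facade the crate uses, while the other names are those shown above):

// sketch: optionally skip invalid rows instead of failing the whole batch
let variant = match VariantStruct::try_from_json(&json_str) {
    Ok(v) => Some(v),
    Err(e) if skip_invalid_rows => {
        log::warn!("skipping invalid JSON at row {}: {}", i, e);
        None // becomes NULL in the Variant column
    }
    Err(e) => {
        return Err(DataFusionError::Execution(format!(
            "Failed to parse JSON at row {}: {}", i, e
        )))
    }
};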


Medium Priority

4. Variant-Specific Benchmarks Missing

Location: benches/core_benchmarks.rs

Current benchmarks don't measure Variant-specific operations:

Missing coverage:

  • json_to_variant() conversion performance
  • variant_to_json() serialization overhead
  • Comparison: Variant columns vs. regular JSON columns
  • Large JSON document handling (e.g., 10KB+ objects)

Recommendation: Add Variant-focused benchmarks to track regression in conversion paths.

5. Batch Concatenation Performance

Location: src/mem_buffer.rs:395

The query path concatenates multiple batches per time bucket without size checks:

let combined_batch = concat_batches(&schema, &batches)?;

Concern: High-cardinality scenarios with many small batches could cause GC pressure

Recommendation: Profile this path and consider batch consolidation threshold.
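
One way to gate that, as a sketch (the threshold is illustrative; concat_batches is the same Arrow kernel used above):

use datafusion::arrow::compute::concat_batches;

// sketch: only pay the concatenation cost once enough small batches have piled up
const COMPACT_THRESHOLD: usize = 16; // illustrative
let batches = if batches.len() >= COMPACT_THRESHOLD {
    vec![concat_batches(&schema, &batches)?]
} else {
    batches
};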

6. Limited WAL Recovery Granularity

Location: src/wal.rs:357-385

Recovery is binary - either skip all corruption or fail completely. No graduated recovery option.

Recommendation: Add configurable partial recovery (e.g., "skip up to N corrupt entries, then fail").


Low Priority (Documentation)

7. Performance Implications Not Documented

Location: docs/VARIANT_TYPE_SYSTEM.md

Missing sections:

  • Performance overhead of Variant vs. native JSON types
  • When to use Variant vs. regular columns
  • Query optimization tips for Variant columns

8. Schema Evolution Limitations

Location: docs/VARIANT_TYPE_SYSTEM.md

Should document:

  • Current limitations with schema changes
  • Best practices for evolving Variant structures
  • Migration strategies

🔒 Security Assessment

No critical security issues found

Positive aspects:

  1. JSON validation prevents injection attacks
  2. 100MB batch size limit prevents OOM DoS
  3. WAL version detection prevents format confusion
  4. No SQL injection vectors (uses DataFusion's prepared execution)

Minor concern:

  • No rate limiting on Variant conversions (could DoS with massive JSON payloads)
  • Consider adding max JSON object size limit

📊 Test Coverage

Current coverage: Good integration test coverage through .slt files

Gaps identified:

  1. No unit tests for Variant rewriters (see issue 1 above)
  2. No DELETE/UPDATE performance benchmarks
  3. No memory pressure scenarios in benchmarks
  4. Limited edge case coverage for Variant conversion

Recommendation: Add the missing unit tests before merge.


🎨 Code Quality Notes

Excellent patterns observed:

  • Clean use of DataFusion's Transformed API in rewriters
  • Proper logging with variant column counts (variant_insert_rewriter.rs:152-155)
  • Safe WAL magic byte detection prevents format confusion
  • Thread-safe concurrent access with DashMap

Minor style suggestions:

src/optimizers/variant_select_rewriter.rs:66-69

// Current (works but slightly unclear):
let (qualifier, field) = new_expr_with_alias.into_parts();
// ...
new_expr_with_alias = wrapped.alias_if_changed(alias);

// Suggested (more explicit):
let (qualifier, original_field) = new_expr_with_alias.into_parts();
let wrapped = create_udf("variant_to_json", vec![original_field]);
new_expr_with_alias = wrapped.alias(alias);

src/database.rs:103 - Silent skip could mask bugs:

// Current:
if idx >= columns.len() { continue; }

// Suggested: Add debug logging
if idx >= columns.len() {
    debug!("Column index {} exceeds batch length {}, skipping", idx, columns.len());
    continue;
}

📝 Summary & Recommendations

Before Merge:

  • Add unit tests for VariantInsertRewriter and VariantSelectRewriter
  • Document Parquet Variant version dependencies in code comments
  • Add performance implications section to docs/VARIANT_TYPE_SYSTEM.md

Post-Merge (Future Work):

  • Add Variant-specific performance benchmarks
  • Implement schema evolution framework
  • Add batch recovery mode for JSON conversion errors
  • Profile batch concatenation performance
  • Add troubleshooting guide for Variant type issues

✅ Conclusion

This PR represents high-quality work with:

  • Solid architecture and implementation
  • Comprehensive documentation
  • Production-ready error handling and durability
  • Performance-conscious design

The identified issues are minor and mostly centered around test coverage and documentation completeness. The core implementation is sound and follows DataFusion best practices.

Recommendation: Approve with the understanding that unit tests for the rewriters should be added before or shortly after merge.

Great work on the comprehensive documentation - the architecture docs will be invaluable for future contributors! 🎉

@claude

claude bot commented Feb 16, 2026

Code Review - PR #16: Variant SELECT Rewriter and Architecture Documentation

Overall Assessment

Approval Status: ✅ Approve with minor suggestions

This is a high-quality PR with well-designed components. The VariantSelectRewriter and VariantInsertRewriter are clean, focused implementations that integrate well with DataFusion's analyzer pipeline.


Component Reviews

1. VariantSelectRewriter (src/optimizers/variant_select_rewriter.rs)

Quality: ✅ Excellent

What it does: Automatically wraps Variant columns with variant_to_json() in SELECT projections to ensure proper PostgreSQL wire protocol serialization.

Minor suggestion: Line 61 - Consider logging when get_type() fails:

expr.get_type(schema).map(|dt| is_variant_type(&dt)).unwrap_or(false)

The unwrap_or(false) safely defaults to not wrapping, but logging failures could help debug schema issues.
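
For example (a sketch; assumes the log crate is available in this module):

// sketch: surface type-resolution failures instead of silently defaulting to "don't wrap"
expr.get_type(schema)
    .map(|dt| is_variant_type(&dt))
    .unwrap_or_else(|e| {
        log::debug!("VariantSelectRewriter: get_type failed for {expr}: {e}; not wrapping");
        false
    })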


2. VariantInsertRewriter (src/optimizers/variant_insert_rewriter.rs)

Quality: ✅ Good overall

Potential Issues:

  1. Name-based column matching (lines 52-57): Relies on exact column name matching between input and target schemas. Verify this works correctly with case-sensitive column names and when column order differs.

  2. UTF-8 detection is conservative (lines 172-178): Only wraps literal strings and casts, not function calls returning strings. This is safe but might miss valid conversion opportunities. Acceptable design choice.

  3. JSON validation timing: Invalid JSON only fails at execution time in convert_variant_columns(). This is late but safe and provides good error messages.


3. Database.rs Changes

Quality: ✅ Excellent

Highlights:

  • Excellent error messages including row index and problematic values (lines 153-158)
  • Proper row count preservation with with_row_count() (line 196)
  • Comprehensive schema compatibility checks

Note: The fail-fast approach on invalid JSON is correct for ACID compliance, though one invalid row fails the entire batch. The error messages are detailed enough for users to fix issues.


4. DML.rs Changes

Quality: ✅ Excellent, with one verification needed

Security: ✅ Outstanding - Mandatory project_id filters properly enforce multi-tenancy

Verification needed: Lines 185-198, extract_project_id() uses val.to_string(). Verify that ScalarValue's Display implementation doesn't include quotes (e.g., ensure it produces project_123 not 'project_123').
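
One way to sidestep the Display question entirely is to match the scalar variant directly (a sketch; the function name is illustrative):

use datafusion::scalar::ScalarValue;

// sketch: pull the project_id out of the literal without going through to_string()
fn scalar_to_project_id(val: &ScalarValue) -> Option<String> {
    match val {
        ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => Some(s.clone()),
        _ => None,
    }
}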

Question: Lines 393-418 - The dual execution path runs operations on both buffered and committed data. If the same logical row exists in both, are updates counted twice? Please clarify the expected behavior.


Test Coverage Assessment

Overall: 🟡 Good but incomplete

Missing test cases:

  1. ❌ Invalid JSON insertion error handling
  2. ❌ NULL Variant values
  3. ❌ Deeply nested JSON objects
  4. ❌ Variant-specific integration test (INSERT/SELECT round-trip)
  5. ❌ Very large JSON objects (performance/memory)

Recommendation: Add at least tests #1, #2, and #4 before merging.


Performance Considerations

Positive:

  • ✅ Single-pass transformations
  • ✅ Early bailout optimizations
  • ✅ Batch processing

Considerations:

  • JSON parsing on every INSERT has overhead (unavoidable)
  • Large JSON batches could spike memory usage
  • Recommendation: Document any batch size limits or consider streaming conversion for very large batches

Security Assessment

Status: ✅ Excellent - No security issues found

  • ✅ No SQL injection vectors
  • ✅ Multi-tenancy properly enforced
  • ✅ No unsafe code
  • ✅ Proper input validation

Rust Best Practices

Assessment: ✅ Excellent adherence

  • Proper Result types throughout
  • No unwrap() in production paths
  • Strong typing and good separation of concerns
  • Clear documentation

Action Items

Before Merge:

  1. Add test for invalid JSON handling
  2. Add test for NULL Variant values
  3. Add Variant INSERT/SELECT round-trip integration test
  4. Verify ScalarValue::to_string() behavior in project_id extraction
  5. Clarify dual execution path behavior for UPDATE/DELETE row counting

Nice to Have:

  • Add logging when type resolution fails (variant_select_rewriter.rs:61)
  • Consider validating that project_id is a non-empty string, not just checking extraction success
  • Document batch size recommendations for large JSON objects
  • Add benchmark for Variant operations

Conclusion

The code is well-architected with proper error handling and security practices. The recommendations focus on test coverage and edge case handling rather than fixing bugs. Great work! 🎉


Review performed by Claude Code analyzing codebase patterns, Rust best practices, security, and performance implications.
