diff --git a/.gitignore b/.gitignore index 22dd177..e6fac60 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,6 @@ criterion/ # Testnet data testnet/ + +.serena +.claude \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 1a55921..a936952 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,10 @@ bitvec = "1.0" # Storage rocksdb = { version = "0.21", default-features = false, features = ["lz4"] } +reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +reth-codecs = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } # Networking futures = "0.3" diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index b91f8b2..55e3c1d 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -30,6 +30,14 @@ tracing = { workspace = true } # Storage backend (for InMemoryStore) parking_lot = "0.12" +# MDBX storage backend (reth-db) +reth-db = { workspace = true, optional = true } +reth-db-api = { workspace = true, optional = true } +reth-codecs = { workspace = true, optional = true } + +# BitVec for attestation bitmaps +bitvec = { workspace = true } + [dev-dependencies] tempfile = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } @@ -37,3 +45,4 @@ rand = { workspace = true } [features] default = [] +mdbx = ["reth-db", "reth-db-api", "reth-codecs"] diff --git a/crates/storage/INTEGRATION_PLAN.md b/crates/storage/INTEGRATION_PLAN.md new file mode 100644 index 0000000..853cc0c --- /dev/null +++ b/crates/storage/INTEGRATION_PLAN.md @@ -0,0 +1,314 @@ +# Execution Layer + Storage Layer Integration Plan + +## Overview + +This document outlines the integration plan between the Execution Layer (`crates/execution`) and Storage Layer (`crates/storage`) for CipherBFT. + +--- + +## Current State Analysis + +### Execution Layer (feat/el-integration branch) + +| Component | Status | Description | +|-----------|--------|-------------| +| `ExecutionEngine` | Done | Core execution engine with EVM | +| `Provider` trait | Done | Storage abstraction interface | +| `InMemoryProvider` | Done | In-memory implementation (for tests) | +| `StateManager

` | Done | State root computation & snapshots | +| `CipherBftEvmConfig` | Done | EVM configuration (Cancun fork) | +| `StakingPrecompile` | Done | Staking precompile at 0x100 | +| `ExecutionLayer` (lib.rs) | Placeholder | Public API wrapper (Phase 2) | + +#### Provider Trait Interface + +```rust +pub trait Provider: Send + Sync { + fn get_account(&self, address: Address) -> Result>; + fn get_code(&self, code_hash: B256) -> Result>; + fn get_storage(&self, address: Address, slot: U256) -> Result; + fn get_block_hash(&self, number: u64) -> Result>; + fn set_account(&self, address: Address, account: Account) -> Result<()>; + fn set_code(&self, code_hash: B256, bytecode: Bytecode) -> Result<()>; + fn set_storage(&self, address: Address, slot: U256, value: U256) -> Result<()>; + fn set_block_hash(&self, number: u64, hash: B256) -> Result<()>; +} +``` + +### Storage Layer (kyrie/storage-layer branch) + +| Component | Status | Description | +|-----------|--------|-------------| +| `DclStore` trait | Done | Consensus data storage interface | +| `MdbxDclStore` | Done | MDBX-based implementation | +| `DclStoreTx` | Done | Transaction support | +| WAL (Write-Ahead Log) | Done | Crash recovery | +| Pruning Service | Done | Garbage collection | + +#### Current Tables (Consensus Data Only) + +- `Batches`, `Cars`, `CarsByHash` - Batch/CAR data +- `Attestations` - Attestations +- `PendingCuts`, `FinalizedCuts` - Cuts +- `ConsensusWal`, `ConsensusState` - Consensus state +- `ValidatorSets`, `Votes`, `Proposals` - Validator/voting data + +--- + +## Known Issues + +### 1. Dependency Conflict (c-kzg version) + +``` +execution layer: alloy 1.x + revm 33 + c-kzg 2.x +storage layer: reth v1.1.0 -> alloy 0.4.x + c-kzg 1.x +``` + +**Solution:** Upgrade reth to a version that uses alloy 1.x and c-kzg 2.x + +```toml +# Current (Cargo.toml workspace) +reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } + +# Required: Find reth version compatible with alloy 1.x +``` + +### 2. Missing MdbxProvider + +Execution layer only has `InMemoryProvider`. Need to implement `MdbxProvider` that uses storage layer's MDBX backend for persistence. + +--- + +## Integration Architecture + +``` ++-----------------------------------------------------------+ +| crates/storage | ++--------------------------+--------------------------------+ +| [Existing] DclStore | [NEW] EvmStore | +| - Batches, Cars, Cuts | - Accounts | +| - Attestations | - Code | +| - ConsensusState | - Storage | +| | - BlockHashes | ++--------------------------+--------------------------------+ + | + v ++-----------------------------------------------------------+ +| crates/execution | +| MdbxProvider implements Provider trait | +| (uses storage layer's EvmStore) | ++-----------------------------------------------------------+ + | + v ++-----------------------------------------------------------+ +| ExecutionEngine | +| - execute_block() | +| - validate_block() | +| - seal_block() | ++-----------------------------------------------------------+ +``` + +--- + +## Implementation Steps + +### Phase 1: Resolve Dependency Conflict + +- [ ] Research reth versions compatible with alloy 1.x +- [ ] Update workspace Cargo.toml with new reth version +- [ ] Verify storage layer builds with updated dependencies +- [ ] Verify execution layer builds +- [ ] Verify both crates build together + +### Phase 2: Add EVM Tables to Storage Layer + +**File:** `crates/storage/src/mdbx/tables.rs` + +```rust +// New tables for EVM state +pub struct EvmAccounts; // Address -> Account +pub struct EvmCode; // CodeHash -> Bytecode +pub struct EvmStorage; // (Address, Slot) -> Value +pub struct EvmBlockHashes; // BlockNumber -> Hash + +// New tables for Staking Precompile state +pub struct StakingValidators; // Address -> ValidatorInfo +pub struct StakingMetadata; // () -> StakingMetadata (total_stake, epoch) +``` + +### Phase 2.5: Staking Precompile Storage Integration + +**Problem:** `StakingPrecompile` currently stores state in memory only: + +```rust +// Current (in-memory, lost on restart) +pub struct StakingPrecompile { + state: Arc>, +} +``` + +**Solution:** Integrate with storage layer for persistence: + +```rust +// New (persistent) +pub struct StakingPrecompile { + store: Arc, + cache: Arc>, // Optional: in-memory cache +} + +pub trait StakingStore: Send + Sync { + fn get_validator(&self, address: Address) -> Result>; + fn set_validator(&self, address: Address, info: ValidatorInfo) -> Result<()>; + fn delete_validator(&self, address: Address) -> Result<()>; + fn get_all_validators(&self) -> Result>; + fn get_total_stake(&self) -> Result; + fn set_total_stake(&self, stake: U256) -> Result<()>; + fn get_epoch(&self) -> Result; + fn set_epoch(&self, epoch: u64) -> Result<()>; +} +``` + +**Data to persist:** +- `ValidatorInfo` (address, bls_pubkey, stake, registered_at, pending_exit) +- `total_stake` (U256) +- `epoch` (u64) + +### Phase 3: Implement EvmStore Trait + +**File:** `crates/storage/src/evm.rs` (new) + +```rust +pub trait EvmStore: Send + Sync { + fn get_account(&self, address: Address) -> Result>; + fn set_account(&self, address: Address, account: Account) -> Result<()>; + fn get_code(&self, code_hash: B256) -> Result>; + fn set_code(&self, code_hash: B256, bytecode: Bytecode) -> Result<()>; + fn get_storage(&self, address: Address, slot: U256) -> Result; + fn set_storage(&self, address: Address, slot: U256, value: U256) -> Result<()>; + fn get_block_hash(&self, number: u64) -> Result>; + fn set_block_hash(&self, number: u64, hash: B256) -> Result<()>; +} +``` + +### Phase 4: Implement MdbxEvmStore + +**File:** `crates/storage/src/mdbx/evm.rs` (new) + +```rust +pub struct MdbxEvmStore { + db: Arc, +} + +impl EvmStore for MdbxEvmStore { + // MDBX-based implementation +} +``` + +### Phase 5: Implement MdbxProvider in Execution Layer + +**File:** `crates/execution/src/database.rs` (add) + +```rust +use cipherbft_storage::EvmStore; + +pub struct MdbxProvider { + store: Arc, +} + +impl Provider for MdbxProvider { + // Delegate to EvmStore +} +``` + +### Phase 6: Integration Testing + +- [ ] Unit tests for MdbxEvmStore +- [ ] Unit tests for MdbxProvider +- [ ] Integration tests: ExecutionEngine +- [ ] End-to-end test: block execution with persistence + +--- + +## File Changes Summary + +### Storage Layer (crates/storage) + +| File | Action | Description | +|------|--------|-------------| +| `src/mdbx/tables.rs` | Modify | Add EVM + Staking tables | +| `src/evm.rs` | Create | EvmStore trait | +| `src/staking.rs` | Create | StakingStore trait | +| `src/mdbx/evm.rs` | Create | MdbxEvmStore implementation | +| `src/mdbx/staking.rs` | Create | MdbxStakingStore implementation | +| `src/lib.rs` | Modify | Export new modules | + +### Execution Layer (crates/execution) + +| File | Action | Description | +|------|--------|-------------| +| `Cargo.toml` | Modify | Add cipherbft-storage dependency | +| `src/database.rs` | Modify | Add MdbxProvider | +| `src/precompiles/staking.rs` | Modify | Add StakingStore generic, persistence | +| `src/lib.rs` | Modify | Export MdbxProvider, updated StakingPrecompile | + +### Workspace (root) + +| File | Action | Description | +|------|--------|-------------| +| `Cargo.toml` | Modify | Update reth version | + +--- + +## Testing Strategy + +``` +Unit Tests + | + +-- MdbxEvmStore (storage layer) + | +-- test_account_operations + | +-- test_code_operations + | +-- test_storage_operations + | +-- test_block_hash_operations + | + +-- MdbxStakingStore (storage layer) + | +-- test_validator_crud + | +-- test_total_stake_operations + | +-- test_epoch_operations + | +-- test_get_all_validators + | + +-- MdbxProvider (execution layer) + | +-- test_provider_get_account + | +-- test_provider_set_account + | +-- ... + | + +-- StakingPrecompile (execution layer) + +-- test_register_validator_persistent + +-- test_deregister_validator_persistent + +-- test_staking_state_recovery + +Integration Tests + | + +-- ExecutionEngine + +-- test_execute_block_with_persistence + +-- test_state_recovery_after_restart + +-- test_rollback_with_persistence + +-- test_staking_precompile_with_persistence +``` + +--- + +## Open Questions + +1. **Transaction Boundaries:** Should EVM state changes be in the same MDBX transaction as consensus data? + +2. **Snapshot Strategy:** How to handle state snapshots for rollbacks with MDBX? + +3. **Migration:** How to migrate existing InMemoryProvider test data to MdbxProvider tests? + +--- + +## References + +- Execution Layer Design: `crates/execution/DESIGN.md` (in feat/el-integration branch) +- Storage Layer ADR: `docs/adr/adr-010-storage-design.md` +- reth-db documentation: https://github.com/paradigmxyz/reth diff --git a/crates/storage/MDBX_IMPLEMENTATION.md b/crates/storage/MDBX_IMPLEMENTATION.md new file mode 100644 index 0000000..9da5b85 --- /dev/null +++ b/crates/storage/MDBX_IMPLEMENTATION.md @@ -0,0 +1,212 @@ +# MDBX Storage Layer Implementation + +This document summarizes the current state of the MDBX storage backend implementation and outlines remaining work. + +## Overview + +The MDBX storage backend provides persistent storage for CipherBFT using [reth-db](https://github.com/paradigmxyz/reth), which wraps LMDB/MDBX. This implementation follows [ADR-010: Storage Design](../../docs/architecture/adr-010-storage-design.md). + +## Architecture + +``` +crates/storage/src/mdbx/ +├── mod.rs # Module definition and re-exports +├── database.rs # Database wrapper (DatabaseConfig, DatabaseEnv) +├── tables.rs # Table key/value type definitions +├── provider.rs # MdbxDclStore (DclStore trait implementation) +└── wal.rs # MdbxWal (Wal trait implementation) +``` + +## Current Status + +### Completed + +| Component | Status | Description | +|-----------|--------|-------------| +| `DatabaseConfig` | Done | Configuration for DB path, size limits, read-only mode | +| `Database` | Done | Wrapper around reth-db MDBX environment | +| `MdbxDclStore` | **Done** | Full DclStore trait implementation with MDBX operations | +| `MdbxWal` | **Done** | Full Wal trait implementation with MDBX persistence | +| Table Key Types | Done | `CarTableKey`, `HeightRoundKey`, `HashKey`, `HeightKey`, `UnitKey` with Encode/Decode | +| Stored Value Types | Done | `StoredBatch`, `StoredCar`, `StoredCut`, etc. with Serialize/Deserialize | +| **Table Definitions** | **Done** | All 11 tables defined with reth-db `Table` trait | +| **CRUD Operations** | **Done** | All put/get/delete/has methods implemented | +| **Range Queries** | **Done** | Cursor-based range scans for Cars and Cuts | +| **Secondary Indexes** | **Done** | CarsByHash index maintained on put/delete | + +### Feature Flag + +The MDBX backend requires the `mdbx` feature: + +```toml +[dependencies] +cipherbft-storage = { version = "0.1", features = ["mdbx"] } +``` + +## Usage + +```rust +use cipherbft_storage::mdbx::{Database, DatabaseConfig, MdbxDclStore}; +use std::sync::Arc; + +// Open database +let config = DatabaseConfig::new("/path/to/db"); +let db = Arc::new(Database::open(config)?); + +// Create store +let store = MdbxDclStore::new(db); + +// Use DclStore trait methods +store.put_batch(batch).await?; +store.put_car(car).await?; +``` + +## Table Definitions + +All 11 tables are now defined in `tables.rs` using the reth-db `Table` trait: + +### DCL Tables + +| Table | Key Type | Value Type | Description | +|-------|----------|------------|-------------| +| `Batches` | `HashKey` | `BincodeValue` | Transaction batches from Workers | +| `Cars` | `CarTableKey` | `BincodeValue` | CARs indexed by (validator, position) | +| `CarsByHash` | `HashKey` | `CarTableKey` | Secondary index for Car lookup by hash | +| `Attestations` | `HashKey` | `BincodeValue` | Aggregated BLS attestations | +| `PendingCuts` | `HeightKey` | `BincodeValue` | Cuts awaiting consensus | +| `FinalizedCuts` | `HeightKey` | `BincodeValue` | Finalized Cuts | + +### Consensus Tables + +| Table | Key Type | Value Type | Description | +|-------|----------|------------|-------------| +| `ConsensusState` | `UnitKey` | `BincodeValue` | Singleton current state | +| `ConsensusWal` | `HeightKey` | `BincodeValue` | WAL entries | +| `ValidatorSets` | `HeightKey` | `BincodeValue` | Validator sets by epoch | +| `Votes` | `HeightRoundKey` | `BincodeValue` | Votes by (height, round) | +| `Proposals` | `HeightRoundKey` | `BincodeValue` | Proposals by (height, round) | + +### Key Types + +| Key Type | Size | Description | +|----------|------|-------------| +| `HashKey` | 32 bytes | 32-byte hash, big-endian encoded | +| `CarTableKey` | 28 bytes | (validator_prefix[20] + position[8]) | +| `HeightKey` | 8 bytes | u64 height, big-endian for sorted iteration | +| `HeightRoundKey` | 12 bytes | (height[8] + round[4]) | +| `UnitKey` | 1 byte | Singleton key for single-row tables | + +### Value Encoding + +All values use `BincodeValue` wrapper which: +- Implements `Compress` trait (serializes with bincode) +- Implements `Decompress` trait (deserializes with bincode) +- Provides compact binary representation + +## TODO + +### Phase 1: Core MDBX Operations ✅ COMPLETED + +- [x] **Define tables using reth-db Table trait** + - ~~Use `define_tables!` macro for type-safe table definitions~~ + - Implemented using direct `Table` trait implementation + - All 11 tables defined: `Batches`, `Cars`, `CarsByHash`, `Attestations`, `PendingCuts`, `FinalizedCuts`, `ConsensusState`, `ConsensusWal`, `ValidatorSets`, `Votes`, `Proposals` + +- [x] **Implement actual MDBX read/write in MdbxDclStore** + - All CRUD operations implemented with proper transaction handling + - Implemented `put_*`, `get_*`, `delete_*`, `has_*` for all data types + - Secondary index `CarsByHash` maintained on put/delete operations + +- [x] **Implement cursor-based queries** + - `get_cars_range`: Range scan for Cars by validator ✅ + - `get_finalized_cuts_range`: Range scan for Cuts by height ✅ + - `get_highest_car_position`: Reverse scan to find max position ✅ + - `get_latest_finalized_cut`: Reverse scan for latest Cut ✅ + - `get_all_pending_cuts`: Full table scan ✅ + - `prune_before`: Cursor-based deletion with count tracking ✅ + - `stats`: Table entry counting ✅ + +### Phase 2: WAL and Recovery ✅ COMPLETED + +- [x] **Implement persistent WAL in MdbxWal** + - Store WAL entries in `ConsensusWal` table ✅ + - Implement `append`, `replay_from`, `truncate_before` ✅ + - Implement `last_checkpoint` with reverse scan ✅ + - Implement `load_next_index` for restart recovery ✅ + - MDBX provides durable writes by default + +- [x] **RecoveryManager already exists** + - `WalRecovery` in `wal.rs` handles recovery + - Finds last checkpoint and replays entries + - Reconstructs `RecoveredState` with all data types + +### Phase 3: Transactions ✅ COMPLETED + +- [x] **Implement DclStoreTx trait** + - `MdbxDclStoreTx` wraps MDBX write transactions ✅ + - Support atomic batch operations with all put methods ✅ + - Implement `commit()` and `abort()` ✅ + - `finalize_cut()` atomically moves pending → finalized ✅ + +- [x] **Implement DclStoreExt trait** + - Factory method `begin_tx()` for creating transactions ✅ + - Returns `MdbxDclStoreTx` for batch operations + +### Phase 4: Garbage Collection ✅ COMPLETED + +- [x] **Implement prune_before()** + - Delete finalized Cuts before threshold height ✅ + - Delete unreferenced Cars (not in any retained Cut) ✅ + - Delete unreferenced Attestations ✅ + - Delete unreferenced Batches ✅ + - Track and return pruned entry count ✅ + - Reference tracking: collects retained Car/Batch hashes from Cuts >= threshold + +- [x] **Background pruning task** + - `PruningConfig` with configurable retention and interval ✅ + - `PruningTask` and `PruningHandle` for control ✅ + - Default: retain 100,000 blocks, run every 1,000 blocks ✅ + - `spawn_pruning_task()` and `spawn_pruning_task_with_config()` helpers ✅ + +### Phase 5: Testing and Integration + +- [ ] **Integration tests** + - Test with temporary databases + - Verify data persistence across restarts + - Test concurrent access patterns + +- [ ] **Benchmarks** + - Write throughput (batch inserts) + - Read latency (point queries, range scans) + - Storage efficiency (compression ratios) + +- [ ] **Crash recovery tests** + - Simulate crashes at various points + - Verify WAL replay correctness + +## Dependencies + +```toml +[dependencies] +reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +reth-codecs = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.0" } +``` + +## Design Decisions + +1. **Reth Compatibility**: Reuse reth-db for MDBX wrapper to leverage battle-tested code and maintain ecosystem compatibility. + +2. **Single Database**: All data stored in one MDBX environment for atomic cross-table operations. + +3. **Bincode Serialization**: Values serialized with bincode for compact binary representation. + +4. **Secondary Indexes**: `CarsByHash` table provides O(1) lookup by Car hash, maintained manually on writes. + +5. **Optional Feature**: MDBX backend is opt-in via feature flag to keep default builds lightweight. + +## References + +- [ADR-010: Storage Design](../../docs/architecture/adr-010-storage-design.md) +- [reth-db documentation](https://github.com/paradigmxyz/reth/tree/main/crates/storage/db) +- [MDBX documentation](https://erthink.github.io/libmdbx/) diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index edd05b9..1663c81 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -12,7 +12,7 @@ //! The storage layer uses trait-based abstractions to allow multiple backends: //! - [`DclStore`]: Main trait for DCL storage operations //! - [`InMemoryStore`]: In-memory implementation for testing -//! - Future: RocksDB/MDBX implementation for production +//! - [`mdbx::MdbxDclStore`]: MDBX-backed implementation for production (requires `mdbx` feature) //! //! # Write-Ahead Log (WAL) //! @@ -22,6 +22,8 @@ //! //! # Usage //! +//! ## In-Memory Store (Testing) +//! //! ```ignore //! use cipherbft_storage::{DclStore, InMemoryStore}; //! @@ -29,14 +31,42 @@ //! store.put_batch(batch).await?; //! store.put_car(car).await?; //! ``` +//! +//! ## MDBX Store (Production) +//! +//! Requires the `mdbx` feature: +//! +//! ```ignore +//! use cipherbft_storage::mdbx::{Database, DatabaseConfig, MdbxDclStore}; +//! use std::sync::Arc; +//! +//! let config = DatabaseConfig::new("/path/to/db"); +//! let db = Arc::new(Database::open(config)?); +//! let store = MdbxDclStore::new(db); +//! store.put_batch(batch).await?; +//! ``` +//! +//! # Feature Flags +//! +//! - `mdbx`: Enables the MDBX storage backend using reth-db pub mod dcl; pub mod error; pub mod memory; +pub mod pruning; pub mod tables; pub mod wal; +// MDBX backend (requires feature flag) +#[cfg(feature = "mdbx")] +pub mod mdbx; + pub use dcl::DclStore; pub use error::StorageError; pub use memory::InMemoryStore; +pub use pruning::{PruningConfig, PruningHandle, PruningTask}; pub use wal::{Wal, WalEntry}; + +// Re-export MDBX types when feature is enabled +#[cfg(feature = "mdbx")] +pub use mdbx::{Database, DatabaseConfig, MdbxDclStore, MdbxWal}; diff --git a/crates/storage/src/mdbx/database.rs b/crates/storage/src/mdbx/database.rs new file mode 100644 index 0000000..01f5597 --- /dev/null +++ b/crates/storage/src/mdbx/database.rs @@ -0,0 +1,221 @@ +//! Database wrapper for MDBX backend +//! +//! This module provides a high-level wrapper around reth-db's MDBX database, +//! handling initialization, configuration, and table creation. + +use crate::error::{Result, StorageError}; +use reth_db::{mdbx::DatabaseArguments, ClientVersion, DatabaseEnv as RethDatabaseEnv}; +use reth_db_api::database::Database as DatabaseTrait; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tracing::{debug, info}; + +/// Database configuration +#[derive(Debug, Clone)] +pub struct DatabaseConfig { + /// Path to the database directory + pub path: PathBuf, + /// Maximum database size in bytes (default: 1TB) + pub max_size: usize, + /// Maximum number of readers (default: 256) + pub max_readers: u32, + /// Grow step when database needs more space (default: 4GB) + pub growth_step: usize, + /// Enable read-only mode + pub read_only: bool, +} + +impl Default for DatabaseConfig { + fn default() -> Self { + Self { + path: PathBuf::from("data/cipherbft"), + max_size: 1024 * 1024 * 1024 * 1024, // 1TB + max_readers: 256, + growth_step: 4 * 1024 * 1024 * 1024, // 4GB + read_only: false, + } + } +} + +impl DatabaseConfig { + /// Create a new config with the given path + pub fn new(path: impl Into) -> Self { + Self { + path: path.into(), + ..Default::default() + } + } + + /// Set read-only mode + pub fn read_only(mut self, read_only: bool) -> Self { + self.read_only = read_only; + self + } + + /// Set maximum database size + pub fn max_size(mut self, size: usize) -> Self { + self.max_size = size; + self + } +} + +/// Database environment wrapper type +/// +/// Uses WriteMap for writable databases, NoWriteMap for read-only. +pub type DatabaseEnv = RethDatabaseEnv; + +/// CipherBFT database wrapper +/// +/// Wraps reth-db's MDBX environment and provides high-level operations. +pub struct Database { + /// The underlying reth-db environment + env: Arc, + /// Configuration used to open the database + config: DatabaseConfig, +} + +impl Database { + /// Open a database at the specified path + pub fn open(config: DatabaseConfig) -> Result { + info!(path = %config.path.display(), "Opening CipherBFT database"); + + // Ensure directory exists + if !config.path.exists() { + std::fs::create_dir_all(&config.path)?; + } + + // Build database arguments + let args = DatabaseArguments::new(ClientVersion::default()) + .with_max_read_transaction_duration(Some( + reth_db::mdbx::MaxReadTransactionDuration::Set(std::time::Duration::from_secs(60)), + )); + + // Open the environment + let env = reth_db::init_db(&config.path, args) + .map_err(|e| StorageError::Database(format!("Failed to open database: {e}")))?; + + debug!("Database opened successfully"); + + Ok(Self { + env: Arc::new(env), + config, + }) + } + + /// Open a database for testing with a temporary directory + #[cfg(test)] + pub fn open_temp() -> Result<(Self, tempfile::TempDir)> { + let temp_dir = tempfile::tempdir()?; + + let config = DatabaseConfig::new(temp_dir.path()); + let db = Self::open(config)?; + + Ok((db, temp_dir)) + } + + /// Get the underlying database environment + pub fn env(&self) -> &Arc { + &self.env + } + + /// Get the database path + pub fn path(&self) -> &Path { + &self.config.path + } + + /// Check if the database is read-only + pub fn is_read_only(&self) -> bool { + self.config.read_only + } + + /// Create a read transaction + pub fn tx(&self) -> Result { + self.env + .tx() + .map_err(|e| StorageError::Database(format!("Failed to create read transaction: {e}"))) + } + + /// Create a write transaction + pub fn tx_mut( + &self, + ) -> Result { + if self.config.read_only { + return Err(StorageError::Database( + "Cannot create write transaction on read-only database".into(), + )); + } + + self.env + .tx_mut() + .map_err(|e| StorageError::Database(format!("Failed to create write transaction: {e}"))) + } + + /// Get database statistics + pub fn stats(&self) -> Result { + let stat = self + .env + .stat() + .map_err(|e| StorageError::Database(format!("Failed to get database stats: {e}")))?; + + Ok(DatabaseStats { + page_size: stat.page_size(), + tree_depth: stat.depth(), + branch_pages: stat.branch_pages() as u64, + leaf_pages: stat.leaf_pages() as u64, + overflow_pages: stat.overflow_pages() as u64, + entries: stat.entries() as u64, + }) + } +} + +impl Clone for Database { + fn clone(&self) -> Self { + Self { + env: Arc::clone(&self.env), + config: self.config.clone(), + } + } +} + +/// Database statistics +#[derive(Debug, Clone, Default)] +pub struct DatabaseStats { + /// Page size in bytes + pub page_size: u32, + /// B-tree depth + pub tree_depth: u32, + /// Number of branch pages + pub branch_pages: u64, + /// Number of leaf pages + pub leaf_pages: u64, + /// Number of overflow pages + pub overflow_pages: u64, + /// Number of entries + pub entries: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_database_config_default() { + let config = DatabaseConfig::default(); + assert_eq!(config.max_readers, 256); + assert!(!config.read_only); + } + + #[test] + fn test_database_config_builder() { + let config = DatabaseConfig::new("/tmp/test") + .read_only(true) + .max_size(1024); + + assert_eq!(config.path, PathBuf::from("/tmp/test")); + assert!(config.read_only); + assert_eq!(config.max_size, 1024); + } + + // Note: Actual database tests require the mdbx feature to be enabled + // and are integration tests rather than unit tests. +} diff --git a/crates/storage/src/mdbx/mod.rs b/crates/storage/src/mdbx/mod.rs new file mode 100644 index 0000000..a27887d --- /dev/null +++ b/crates/storage/src/mdbx/mod.rs @@ -0,0 +1,62 @@ +//! MDBX storage backend for CipherBFT +//! +//! This module provides persistent storage using reth-db (MDBX) per ADR-010. +//! +//! # Architecture +//! +//! The MDBX backend consists of: +//! - [`Database`]: Main database wrapper around reth-db +//! - [`Tables`]: Custom table definitions for DCL and consensus data +//! - [`MdbxDclStore`]: Implementation of [`DclStore`] trait +//! - [`MdbxWal`]: Persistent WAL implementation +//! +//! # Feature Flag +//! +//! This module is only available when the `mdbx` feature is enabled: +//! ```toml +//! cipherbft-storage = { version = "0.1", features = ["mdbx"] } +//! ``` + +mod database; +mod provider; +mod tables; +mod wal; + +pub use database::{Database, DatabaseConfig, DatabaseEnv}; +pub use provider::{MdbxDclStore, MdbxDclStoreTx}; +pub use tables::{ + // Table types + Attestations, + Batches, + // Key types + CarTableKey, + Cars, + CarsByHash, + ConsensusState, + ConsensusWal, + FinalizedCuts, + HashKey, + HeightKey, + HeightRoundKey, + PendingCuts, + Proposals, + // Value types + StoredAggregatedAttestation, + StoredBatch, + StoredBatchDigest, + StoredCar, + StoredCarEntry, + StoredConsensusState, + StoredCut, + StoredProposal, + StoredValidator, + StoredValidatorSet, + StoredVote, + StoredVotes, + StoredWalEntry, + Tables, + UnitKey, + ValidatorSets, + Votes, +}; +pub use wal::MdbxWal; diff --git a/crates/storage/src/mdbx/provider.rs b/crates/storage/src/mdbx/provider.rs new file mode 100644 index 0000000..6b05610 --- /dev/null +++ b/crates/storage/src/mdbx/provider.rs @@ -0,0 +1,1328 @@ +//! MDBX implementation of DclStore +//! +//! This module provides a persistent implementation of the [`DclStore`] trait +//! using MDBX as the storage backend. + +use crate::dcl::{DclStore, StorageStats}; +use crate::error::{Result, StorageError}; +use crate::tables::{CarRange, CutRange}; +use async_trait::async_trait; +use cipherbft_data_chain::{AggregatedAttestation, Batch, BatchDigest, Car, Cut}; +use cipherbft_types::{Hash, ValidatorId}; +use reth_db_api::transaction::DbTx; +use std::sync::Arc; +use tracing::{debug, trace}; + +use super::database::Database; +use super::tables::{ + CarTableKey, HashKey, StoredAggregatedAttestation, StoredBatch, StoredBatchDigest, StoredCar, + StoredCut, +}; + +/// MDBX-backed DCL store +/// +/// Provides persistent storage for all DCL data types using MDBX. +/// Thread-safe and suitable for concurrent access. +pub struct MdbxDclStore { + /// The underlying database + db: Arc, +} + +impl MdbxDclStore { + /// Create a new MDBX DCL store + pub fn new(db: Arc) -> Self { + Self { db } + } + + /// Get the underlying database + pub fn db(&self) -> &Arc { + &self.db + } + + // ============================================================ + // Conversion helpers + // ============================================================ + + #[allow(dead_code)] + fn batch_to_stored(batch: &Batch) -> StoredBatch { + StoredBatch { + worker_id: batch.worker_id, + transactions: batch.transactions.clone(), + timestamp: batch.timestamp, + } + } + + #[allow(dead_code)] + fn stored_to_batch(stored: StoredBatch, _hash: Hash) -> Batch { + Batch { + worker_id: stored.worker_id, + transactions: stored.transactions, + timestamp: stored.timestamp, + } + } + + #[allow(dead_code)] + fn car_to_stored(car: &Car) -> StoredCar { + StoredCar { + proposer: car.proposer.as_bytes().to_vec(), + position: car.position, + batch_digests: car + .batch_digests + .iter() + .map(|bd| StoredBatchDigest { + worker_id: bd.worker_id, + hash: *bd.digest.as_bytes(), + tx_count: bd.tx_count, + size_bytes: bd.byte_size as u64, + }) + .collect(), + parent_ref: car.parent_ref.map(|h| *h.as_bytes()), + signature: car.signature.to_bytes().to_vec(), + hash: *car.hash().as_bytes(), + } + } + + #[allow(dead_code)] + fn stored_to_car(stored: StoredCar) -> Result { + let proposer = ValidatorId::from_bytes( + stored + .proposer + .as_slice() + .try_into() + .map_err(|_| StorageError::Database("Invalid validator ID length".into()))?, + ); + + let batch_digests: Vec = stored + .batch_digests + .into_iter() + .map(|bd| BatchDigest { + worker_id: bd.worker_id, + digest: Hash::from_bytes(bd.hash), + tx_count: bd.tx_count, + byte_size: bd.size_bytes as u32, + }) + .collect(); + + let parent_ref = stored.parent_ref.map(Hash::from_bytes); + + let sig_bytes: [u8; 96] = stored + .signature + .as_slice() + .try_into() + .map_err(|_| StorageError::Database("Invalid BLS signature length".into()))?; + let signature = cipherbft_crypto::BlsSignature::from_bytes(&sig_bytes) + .map_err(|e| StorageError::Database(format!("Invalid BLS signature: {e}")))?; + + Ok(Car { + proposer, + position: stored.position, + batch_digests, + parent_ref, + signature, + }) + } + + #[allow(dead_code)] + fn attestation_to_stored(att: &AggregatedAttestation) -> StoredAggregatedAttestation { + StoredAggregatedAttestation { + car_hash: *att.car_hash.as_bytes(), + car_position: att.car_position, + car_proposer: att.car_proposer.as_bytes().to_vec(), + aggregated_signature: att.aggregated_signature.to_bytes().to_vec(), + signers_bitvec: att.validators.as_raw_slice().to_vec(), + signer_count: att.count() as u32, + } + } + + #[allow(dead_code)] + fn stored_to_attestation(stored: StoredAggregatedAttestation) -> Result { + use bitvec::prelude::*; + + let car_proposer = ValidatorId::from_bytes( + stored + .car_proposer + .as_slice() + .try_into() + .map_err(|_| StorageError::Database("Invalid validator ID length".into()))?, + ); + + let agg_sig_bytes: [u8; 96] = stored + .aggregated_signature + .as_slice() + .try_into() + .map_err(|_| StorageError::Database("Invalid aggregate signature length".into()))?; + let aggregated_signature = + cipherbft_crypto::BlsAggregateSignature::from_bytes(&agg_sig_bytes) + .map_err(|e| StorageError::Database(format!("Invalid BLS signature: {e}")))?; + + // Reconstruct bitvec from raw bytes + let validators = BitVec::::from_vec(stored.signers_bitvec); + + Ok(AggregatedAttestation { + car_hash: Hash::from_bytes(stored.car_hash), + car_position: stored.car_position, + car_proposer, + aggregated_signature, + validators, + }) + } + + #[allow(dead_code)] + fn cut_to_stored(cut: &Cut) -> StoredCut { + StoredCut { + height: cut.height, + cars: cut + .cars + .iter() + .map(|(vid, car)| { + let car_hash = car.hash(); + let attestation = cut + .attestations + .get(&car_hash) + .map(Self::attestation_to_stored); + super::tables::StoredCarEntry { + validator: vid.as_bytes().to_vec(), + car: Self::car_to_stored(car), + attestation, + } + }) + .collect(), + } + } + + #[allow(dead_code)] + fn stored_to_cut(stored: StoredCut) -> Result { + let mut cut = Cut::new(stored.height); + + for entry in stored.cars { + let validator = + ValidatorId::from_bytes( + entry.validator.as_slice().try_into().map_err(|_| { + StorageError::Database("Invalid validator ID length".into()) + })?, + ); + + let car = Self::stored_to_car(entry.car)?; + let car_hash = car.hash(); + + if let Some(stored_att) = entry.attestation { + let attestation = Self::stored_to_attestation(stored_att)?; + cut.attestations.insert(car_hash, attestation); + } + + cut.cars.insert(validator, car); + } + + Ok(cut) + } +} + +#[async_trait] +impl DclStore for MdbxDclStore { + // ============================================================ + // Batch Operations + // ============================================================ + + async fn put_batch(&self, batch: Batch) -> Result<()> { + use super::tables::{Batches, BincodeValue}; + use reth_db_api::transaction::DbTxMut; + + let hash = batch.hash(); + let stored = Self::batch_to_stored(&batch); + let key = HashKey::from_slice(hash.as_bytes()); + + trace!(?hash, "Storing batch"); + + let tx = self.db.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put batch: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit batch: {e}")))?; + + debug!(?hash, "Batch stored"); + Ok(()) + } + + async fn get_batch(&self, hash: &Hash) -> Result> { + use super::tables::Batches; + use reth_db_api::transaction::DbTx; + + let key = HashKey::from_slice(hash.as_bytes()); + + trace!(?hash, "Getting batch"); + + let tx = self.db.tx()?; + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get batch: {e}")))?; + + match result { + Some(bincode_value) => { + let batch = Self::stored_to_batch(bincode_value.0, *hash); + Ok(Some(batch)) + } + None => Ok(None), + } + } + + async fn has_batch(&self, hash: &Hash) -> Result { + Ok(self.get_batch(hash).await?.is_some()) + } + + async fn delete_batch(&self, hash: &Hash) -> Result { + use super::tables::Batches; + use reth_db_api::transaction::DbTxMut; + + let key = HashKey::from_slice(hash.as_bytes()); + + trace!(?hash, "Deleting batch"); + + let tx = self.db.tx_mut()?; + let existed = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to check batch: {e}")))? + .is_some(); + + if existed { + tx.delete::(key, None) + .map_err(|e| StorageError::Database(format!("Failed to delete batch: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit delete: {e}")))?; + debug!(?hash, "Batch deleted"); + } + + Ok(existed) + } + + // ============================================================ + // Car Operations + // ============================================================ + + async fn put_car(&self, car: Car) -> Result<()> { + use super::tables::{BincodeValue, Cars, CarsByHash}; + use reth_db_api::transaction::DbTxMut; + + let hash = car.hash(); + let key = CarTableKey::new(car.proposer.as_bytes(), car.position); + let stored = Self::car_to_stored(&car); + let hash_key = HashKey::from_slice(hash.as_bytes()); + + trace!(proposer = ?car.proposer, position = car.position, "Storing car"); + + let tx = self.db.tx_mut()?; + + // Store the car + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put car: {e}")))?; + + // Maintain secondary index (CarsByHash) + tx.put::(hash_key, key) + .map_err(|e| StorageError::Database(format!("Failed to put car index: {e}")))?; + + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit car: {e}")))?; + + debug!(?hash, "Car stored"); + Ok(()) + } + + async fn get_car(&self, validator: &ValidatorId, position: u64) -> Result> { + use super::tables::Cars; + use reth_db_api::transaction::DbTx; + + let key = CarTableKey::new(validator.as_bytes(), position); + + trace!(?validator, position, "Getting car"); + + let tx = self.db.tx()?; + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get car: {e}")))?; + + match result { + Some(bincode_value) => { + let car = Self::stored_to_car(bincode_value.0)?; + Ok(Some(car)) + } + None => Ok(None), + } + } + + async fn get_car_by_hash(&self, hash: &Hash) -> Result> { + use super::tables::{Cars, CarsByHash}; + use reth_db_api::transaction::DbTx; + + let hash_key = HashKey::from_slice(hash.as_bytes()); + + trace!(?hash, "Getting car by hash"); + + let tx = self.db.tx()?; + + // Look up the car key in the secondary index + let car_key = tx + .get::(hash_key) + .map_err(|e| StorageError::Database(format!("Failed to get car index: {e}")))?; + + match car_key { + Some(key) => { + // Fetch the actual car + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get car: {e}")))?; + + match result { + Some(bincode_value) => { + let car = Self::stored_to_car(bincode_value.0)?; + Ok(Some(car)) + } + None => Ok(None), + } + } + None => Ok(None), + } + } + + async fn get_highest_car_position(&self, validator: &ValidatorId) -> Result> { + use super::tables::Cars; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + trace!(?validator, "Getting highest car position"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + // Create a key with max position for the validator to seek backwards + let validator_prefix: [u8; 20] = { + let bytes = validator.as_bytes(); + let mut arr = [0u8; 20]; + let copy_len = bytes.len().min(20); + arr[..copy_len].copy_from_slice(&bytes[..copy_len]); + arr + }; + + // Create key for next validator (to set upper bound) + let mut next_prefix = validator_prefix; + let mut carry = true; + for i in (0..20).rev() { + if carry { + if next_prefix[i] == 0xFF { + next_prefix[i] = 0; + } else { + next_prefix[i] += 1; + carry = false; + } + } + } + + // Seek to the position just before the next validator + let seek_key = CarTableKey { + validator_prefix: next_prefix, + position: 0, + }; + + // Use prev to find the last entry for this validator + if cursor + .seek(seek_key) + .map_err(|e| StorageError::Database(format!("Cursor seek failed: {e}")))? + .is_some() + { + // Go to previous entry + if let Some((key, _)) = cursor + .prev() + .map_err(|e| StorageError::Database(format!("Cursor prev failed: {e}")))? + { + if key.validator_prefix == validator_prefix { + return Ok(Some(key.position)); + } + } + } else { + // We're at the end, try last + if let Some((key, _)) = cursor + .last() + .map_err(|e| StorageError::Database(format!("Cursor last failed: {e}")))? + { + if key.validator_prefix == validator_prefix { + return Ok(Some(key.position)); + } + } + } + + Ok(None) + } + + async fn get_cars_range(&self, range: CarRange) -> Result> { + use super::tables::Cars; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + trace!(?range.validator_id, start = range.start, end = ?range.end, "Getting cars range"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let start_key = CarTableKey::new(range.validator_id.as_bytes(), range.start); + let end_position = range.end.unwrap_or(u64::MAX); + + let mut cars = Vec::new(); + + // Seek to start position + let mut current = cursor + .seek(start_key) + .map_err(|e| StorageError::Database(format!("Cursor seek failed: {e}")))?; + + let validator_prefix: [u8; 20] = { + let bytes = range.validator_id.as_bytes(); + let mut arr = [0u8; 20]; + let copy_len = bytes.len().min(20); + arr[..copy_len].copy_from_slice(&bytes[..copy_len]); + arr + }; + + while let Some((key, value)) = current { + // Check if we're still within the same validator + if key.validator_prefix != validator_prefix { + break; + } + + // Check if we're past the end position + if key.position > end_position { + break; + } + + // Convert and add the car + let car = Self::stored_to_car(value.0)?; + cars.push(car); + + // Move to next + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + + Ok(cars) + } + + async fn has_car(&self, validator: &ValidatorId, position: u64) -> Result { + Ok(self.get_car(validator, position).await?.is_some()) + } + + async fn delete_car(&self, validator: &ValidatorId, position: u64) -> Result { + use super::tables::{Cars, CarsByHash}; + use reth_db_api::transaction::DbTxMut; + + let key = CarTableKey::new(validator.as_bytes(), position); + + trace!(?validator, position, "Deleting car"); + + let tx = self.db.tx_mut()?; + + // Get the car first to find its hash for index cleanup + let car_result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get car: {e}")))?; + + match car_result { + Some(bincode_value) => { + let hash = bincode_value.0.hash; + let hash_key = HashKey::from_slice(&hash); + + // Delete from Cars table + tx.delete::(key, None) + .map_err(|e| StorageError::Database(format!("Failed to delete car: {e}")))?; + + // Delete from secondary index + tx.delete::(hash_key, None).map_err(|e| { + StorageError::Database(format!("Failed to delete car index: {e}")) + })?; + + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit delete: {e}")))?; + + debug!(?validator, position, "Car deleted"); + Ok(true) + } + None => Ok(false), + } + } + + // ============================================================ + // Attestation Operations + // ============================================================ + + async fn put_attestation(&self, attestation: AggregatedAttestation) -> Result<()> { + use super::tables::{Attestations, BincodeValue}; + use reth_db_api::transaction::DbTxMut; + + let key = HashKey::from_slice(attestation.car_hash.as_bytes()); + let stored = Self::attestation_to_stored(&attestation); + + trace!(car_hash = ?attestation.car_hash, "Storing attestation"); + + let tx = self.db.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put attestation: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit attestation: {e}")))?; + + debug!(car_hash = ?attestation.car_hash, "Attestation stored"); + Ok(()) + } + + async fn get_attestation(&self, car_hash: &Hash) -> Result> { + use super::tables::Attestations; + use reth_db_api::transaction::DbTx; + + let key = HashKey::from_slice(car_hash.as_bytes()); + + trace!(?car_hash, "Getting attestation"); + + let tx = self.db.tx()?; + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get attestation: {e}")))?; + + match result { + Some(bincode_value) => { + let attestation = Self::stored_to_attestation(bincode_value.0)?; + Ok(Some(attestation)) + } + None => Ok(None), + } + } + + async fn has_attestation(&self, car_hash: &Hash) -> Result { + Ok(self.get_attestation(car_hash).await?.is_some()) + } + + async fn delete_attestation(&self, car_hash: &Hash) -> Result { + use super::tables::Attestations; + use reth_db_api::transaction::DbTxMut; + + let key = HashKey::from_slice(car_hash.as_bytes()); + + trace!(?car_hash, "Deleting attestation"); + + let tx = self.db.tx_mut()?; + let existed = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to check attestation: {e}")))? + .is_some(); + + if existed { + tx.delete::(key, None).map_err(|e| { + StorageError::Database(format!("Failed to delete attestation: {e}")) + })?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit delete: {e}")))?; + debug!(?car_hash, "Attestation deleted"); + } + + Ok(existed) + } + + // ============================================================ + // Cut Operations + // ============================================================ + + async fn put_pending_cut(&self, cut: Cut) -> Result<()> { + use super::tables::{BincodeValue, HeightKey, PendingCuts}; + use reth_db_api::transaction::DbTxMut; + + let stored = Self::cut_to_stored(&cut); + let key = HeightKey::new(cut.height); + + trace!(height = cut.height, "Storing pending cut"); + + let tx = self.db.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put pending cut: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit pending cut: {e}")))?; + + debug!(height = cut.height, "Pending cut stored"); + Ok(()) + } + + async fn get_pending_cut(&self, height: u64) -> Result> { + use super::tables::{HeightKey, PendingCuts}; + use reth_db_api::transaction::DbTx; + + let key = HeightKey::new(height); + + trace!(height, "Getting pending cut"); + + let tx = self.db.tx()?; + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get pending cut: {e}")))?; + + match result { + Some(bincode_value) => { + let cut = Self::stored_to_cut(bincode_value.0)?; + Ok(Some(cut)) + } + None => Ok(None), + } + } + + async fn get_all_pending_cuts(&self) -> Result> { + use super::tables::PendingCuts; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + trace!("Getting all pending cuts"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let mut cuts = Vec::new(); + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor first failed: {e}")))?; + + while let Some((_, value)) = current { + let cut = Self::stored_to_cut(value.0)?; + cuts.push(cut); + + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + + Ok(cuts) + } + + async fn finalize_cut(&self, height: u64) -> Result> { + trace!(height, "Finalizing cut"); + + // Get pending cut + let cut = match self.get_pending_cut(height).await? { + Some(cut) => cut, + None => return Ok(None), + }; + + // Delete from pending + self.delete_pending_cut(height).await?; + + // Insert into finalized + self.put_finalized_cut(cut.clone()).await?; + + Ok(Some(cut)) + } + + async fn delete_pending_cut(&self, height: u64) -> Result { + use super::tables::{HeightKey, PendingCuts}; + use reth_db_api::transaction::DbTxMut; + + let key = HeightKey::new(height); + + trace!(height, "Deleting pending cut"); + + let tx = self.db.tx_mut()?; + let existed = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to check pending cut: {e}")))? + .is_some(); + + if existed { + tx.delete::(key, None).map_err(|e| { + StorageError::Database(format!("Failed to delete pending cut: {e}")) + })?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit delete: {e}")))?; + debug!(height, "Pending cut deleted"); + } + + Ok(existed) + } + + async fn put_finalized_cut(&self, cut: Cut) -> Result<()> { + use super::tables::{BincodeValue, FinalizedCuts, HeightKey}; + use reth_db_api::transaction::DbTxMut; + + let stored = Self::cut_to_stored(&cut); + let key = HeightKey::new(cut.height); + + trace!(height = cut.height, "Storing finalized cut"); + + let tx = self.db.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put finalized cut: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit finalized cut: {e}")))?; + + debug!(height = cut.height, "Finalized cut stored"); + Ok(()) + } + + async fn get_finalized_cut(&self, height: u64) -> Result> { + use super::tables::{FinalizedCuts, HeightKey}; + use reth_db_api::transaction::DbTx; + + let key = HeightKey::new(height); + + trace!(height, "Getting finalized cut"); + + let tx = self.db.tx()?; + let result = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get finalized cut: {e}")))?; + + match result { + Some(bincode_value) => { + let cut = Self::stored_to_cut(bincode_value.0)?; + Ok(Some(cut)) + } + None => Ok(None), + } + } + + async fn get_latest_finalized_cut(&self) -> Result> { + use super::tables::FinalizedCuts; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + trace!("Getting latest finalized cut"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + // Get the last entry (highest height due to big-endian ordering) + let result = cursor + .last() + .map_err(|e| StorageError::Database(format!("Cursor last failed: {e}")))?; + + match result { + Some((_, value)) => { + let cut = Self::stored_to_cut(value.0)?; + Ok(Some(cut)) + } + None => Ok(None), + } + } + + async fn get_finalized_cuts_range(&self, range: CutRange) -> Result> { + use super::tables::{FinalizedCuts, HeightKey}; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + trace!(start = range.start, end = ?range.end, "Getting finalized cuts range"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let start_key = HeightKey::new(range.start); + let end_height = range.end.unwrap_or(u64::MAX); + + let mut cuts = Vec::new(); + + // Seek to start position + let mut current = cursor + .seek(start_key) + .map_err(|e| StorageError::Database(format!("Cursor seek failed: {e}")))?; + + while let Some((key, value)) = current { + // Check if we're past the end height + if key.0 > end_height { + break; + } + + // Convert and add the cut + let cut = Self::stored_to_cut(value.0)?; + cuts.push(cut); + + // Move to next + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + + Ok(cuts) + } + + // ============================================================ + // Garbage Collection + // ============================================================ + + async fn prune_before(&self, height: u64) -> Result { + use super::tables::{ + Attestations, Batches, Cars, CarsByHash, FinalizedCuts, HashKey, HeightKey, + }; + use reth_db_api::cursor::{DbCursorRO, DbCursorRW}; + use reth_db_api::transaction::DbTxMut; + use std::collections::HashSet; + + trace!(height, "Pruning before height with reference tracking"); + + // Phase 1: Collect all referenced Car hashes and Batch hashes from retained Cuts + let mut retained_car_hashes: HashSet<[u8; 32]> = HashSet::new(); + let mut retained_batch_hashes: HashSet<[u8; 32]> = HashSet::new(); + + { + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + // Seek to the threshold height + let mut current = cursor + .seek(HeightKey::new(height)) + .map_err(|e| StorageError::Database(format!("Cursor seek failed: {e}")))?; + + // Collect all references from retained Cuts (height >= threshold) + while let Some((_, value)) = current { + for car_entry in &value.0.cars { + retained_car_hashes.insert(car_entry.car.hash); + for batch_digest in &car_entry.car.batch_digests { + retained_batch_hashes.insert(batch_digest.hash); + } + } + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + } + + debug!( + retained_cars = retained_car_hashes.len(), + retained_batches = retained_batch_hashes.len(), + "Collected retained references" + ); + + // Phase 2: Collect Car hashes and Batch hashes to prune from Cuts below threshold + let mut car_hashes_to_prune: HashSet<[u8; 32]> = HashSet::new(); + let mut batch_hashes_to_prune: HashSet<[u8; 32]> = HashSet::new(); + let mut car_keys_to_delete: Vec = Vec::new(); + + { + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor first failed: {e}")))?; + + while let Some((key, value)) = current { + if key.0 >= height { + break; + } + + for car_entry in &value.0.cars { + let car_hash = car_entry.car.hash; + + // Only prune if not retained by any higher Cut + if !retained_car_hashes.contains(&car_hash) { + car_hashes_to_prune.insert(car_hash); + + // Build car key for deletion + let validator_bytes = &car_entry.validator; + if validator_bytes.len() >= 20 { + let mut validator_prefix = [0u8; 20]; + validator_prefix.copy_from_slice(&validator_bytes[..20]); + car_keys_to_delete.push(CarTableKey { + validator_prefix, + position: car_entry.car.position, + }); + } + + // Collect batch hashes for pruning + for batch_digest in &car_entry.car.batch_digests { + if !retained_batch_hashes.contains(&batch_digest.hash) { + batch_hashes_to_prune.insert(batch_digest.hash); + } + } + } + } + + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + } + + debug!( + cars_to_prune = car_hashes_to_prune.len(), + batches_to_prune = batch_hashes_to_prune.len(), + "Collected items to prune" + ); + + // Phase 3: Perform deletions in a single transaction + let tx = self.db.tx_mut()?; + let mut pruned_count = 0u64; + + // 3a: Delete FinalizedCuts below threshold + { + let mut cursor = tx + .cursor_write::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor first failed: {e}")))?; + + while let Some((key, _)) = current { + if key.0 >= height { + break; + } + + cursor + .delete_current() + .map_err(|e| StorageError::Database(format!("Failed to delete cut: {e}")))?; + pruned_count += 1; + + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + } + + // 3b: Delete Cars and their CarsByHash index entries + for car_key in &car_keys_to_delete { + // Delete from Cars table + tx.delete::(*car_key, None) + .map_err(|e| StorageError::Database(format!("Failed to delete car: {e}")))?; + pruned_count += 1; + } + + for car_hash in &car_hashes_to_prune { + // Delete from CarsByHash index + tx.delete::(HashKey(*car_hash), None) + .map_err(|e| { + StorageError::Database(format!("Failed to delete car hash index: {e}")) + })?; + } + + // 3c: Delete Attestations for pruned Cars + for car_hash in &car_hashes_to_prune { + tx.delete::(HashKey(*car_hash), None) + .map_err(|e| { + StorageError::Database(format!("Failed to delete attestation: {e}")) + })?; + pruned_count += 1; + } + + // 3d: Delete unreferenced Batches + for batch_hash in &batch_hashes_to_prune { + tx.delete::(HashKey(*batch_hash), None) + .map_err(|e| StorageError::Database(format!("Failed to delete batch: {e}")))?; + pruned_count += 1; + } + + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit prune: {e}")))?; + + debug!( + height, + pruned_count, + cuts_pruned = car_keys_to_delete.len(), + cars_pruned = car_hashes_to_prune.len(), + attestations_pruned = car_hashes_to_prune.len(), + batches_pruned = batch_hashes_to_prune.len(), + "Pruning completed with reference tracking" + ); + Ok(pruned_count) + } + + async fn stats(&self) -> Result { + use super::tables::{Attestations, Batches, Cars, FinalizedCuts, PendingCuts}; + use reth_db_api::cursor::DbCursorRO; + use reth_db_api::transaction::DbTx; + + let tx = self.db.tx()?; + + // Count entries in each table + let batch_count = { + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + let mut count = 0u64; + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + while current.is_some() { + count += 1; + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + } + count + }; + + let car_count = { + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + let mut count = 0u64; + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + while current.is_some() { + count += 1; + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + } + count + }; + + let attestation_count = { + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + let mut count = 0u64; + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + while current.is_some() { + count += 1; + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + } + count + }; + + let pending_cut_count = { + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + let mut count = 0u64; + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + while current.is_some() { + count += 1; + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + } + count + }; + + let finalized_cut_count = { + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + let mut count = 0u64; + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + while current.is_some() { + count += 1; + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor failed: {e}")))?; + } + count + }; + + // Get storage size from database stats + let db_stats = self.db.stats()?; + let storage_bytes = (db_stats.leaf_pages + db_stats.branch_pages + db_stats.overflow_pages) + * db_stats.page_size as u64; + + Ok(StorageStats { + batch_count, + car_count, + attestation_count, + pending_cut_count, + finalized_cut_count, + storage_bytes, + }) + } +} + +// ============================================================ +// Transaction Support +// ============================================================ + +use crate::dcl::{DclStoreExt, DclStoreTx}; +use reth_db::DatabaseEnv; +use reth_db_api::transaction::DbTxMut; + +type MdbxTx = ::TXMut; + +/// MDBX transaction wrapper for atomic batch operations +pub struct MdbxDclStoreTx { + /// The underlying MDBX write transaction + tx: Option, +} + +impl MdbxDclStoreTx { + /// Create a new transaction wrapper + fn new(tx: MdbxTx) -> Self { + Self { tx: Some(tx) } + } + + /// Get a mutable reference to the transaction + fn tx_mut(&mut self) -> Result<&mut MdbxTx> { + self.tx + .as_mut() + .ok_or_else(|| StorageError::Database("Transaction already consumed".into())) + } + + /// Take ownership of the transaction + fn take_tx(&mut self) -> Result { + self.tx + .take() + .ok_or_else(|| StorageError::Database("Transaction already consumed".into())) + } +} + +#[async_trait] +impl DclStoreTx for MdbxDclStoreTx { + async fn commit(mut self) -> Result<()> { + let tx = self.take_tx()?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit transaction: {e}")))?; + debug!("Transaction committed"); + Ok(()) + } + + async fn abort(mut self) -> Result<()> { + let tx = self.take_tx()?; + tx.abort(); + debug!("Transaction aborted"); + Ok(()) + } + + async fn put_batch(&mut self, batch: Batch) -> Result<()> { + use super::tables::{Batches, BincodeValue}; + + let hash = batch.hash(); + let stored = MdbxDclStore::batch_to_stored(&batch); + let key = HashKey::from_slice(hash.as_bytes()); + + let tx = self.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put batch: {e}")))?; + + trace!(?hash, "Batch added to transaction"); + Ok(()) + } + + async fn put_car(&mut self, car: Car) -> Result<()> { + use super::tables::{BincodeValue, Cars, CarsByHash}; + + let hash = car.hash(); + let key = CarTableKey::new(car.proposer.as_bytes(), car.position); + let stored = MdbxDclStore::car_to_stored(&car); + let hash_key = HashKey::from_slice(hash.as_bytes()); + + let tx = self.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put car: {e}")))?; + tx.put::(hash_key, key) + .map_err(|e| StorageError::Database(format!("Failed to put car index: {e}")))?; + + trace!(?hash, "Car added to transaction"); + Ok(()) + } + + async fn put_attestation(&mut self, attestation: AggregatedAttestation) -> Result<()> { + use super::tables::{Attestations, BincodeValue}; + + let key = HashKey::from_slice(attestation.car_hash.as_bytes()); + let stored = MdbxDclStore::attestation_to_stored(&attestation); + + let tx = self.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put attestation: {e}")))?; + + trace!(car_hash = ?attestation.car_hash, "Attestation added to transaction"); + Ok(()) + } + + async fn put_pending_cut(&mut self, cut: Cut) -> Result<()> { + use super::tables::{BincodeValue, HeightKey, PendingCuts}; + + let stored = MdbxDclStore::cut_to_stored(&cut); + let key = HeightKey::new(cut.height); + + let tx = self.tx_mut()?; + tx.put::(key, BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put pending cut: {e}")))?; + + trace!(height = cut.height, "Pending cut added to transaction"); + Ok(()) + } + + async fn finalize_cut(&mut self, height: u64) -> Result> { + use super::tables::{FinalizedCuts, HeightKey, PendingCuts}; + + let key = HeightKey::new(height); + let tx = self.tx_mut()?; + + // Get the pending cut + let pending = tx + .get::(key) + .map_err(|e| StorageError::Database(format!("Failed to get pending cut: {e}")))?; + + match pending { + Some(bincode_value) => { + // Delete from pending + tx.delete::(key, None).map_err(|e| { + StorageError::Database(format!("Failed to delete pending cut: {e}")) + })?; + + // Insert into finalized + tx.put::(key, bincode_value.clone()) + .map_err(|e| { + StorageError::Database(format!("Failed to put finalized cut: {e}")) + })?; + + let cut = MdbxDclStore::stored_to_cut(bincode_value.0)?; + trace!(height, "Cut finalized in transaction"); + Ok(Some(cut)) + } + None => Ok(None), + } + } +} + +#[async_trait] +impl DclStoreExt for MdbxDclStore { + type Transaction = MdbxDclStoreTx; + + async fn begin_tx(&self) -> Result { + use reth_db_api::database::Database as DbTrait; + + let tx = self + .db + .env() + .tx_mut() + .map_err(|e| StorageError::Database(format!("Failed to begin transaction: {e}")))?; + + debug!("Transaction started"); + Ok(MdbxDclStoreTx::new(tx)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Note: Full tests require the mdbx feature and a test database. + // These are integration tests that should be run separately. + + #[test] + fn test_car_table_key_creation() { + let validator_bytes = [1u8; 32]; + let key = CarTableKey::new(&validator_bytes, 42); + assert_eq!(key.position, 42); + assert_eq!(&key.validator_prefix[..], &validator_bytes[..20]); + } +} diff --git a/crates/storage/src/mdbx/tables.rs b/crates/storage/src/mdbx/tables.rs new file mode 100644 index 0000000..bf7100a --- /dev/null +++ b/crates/storage/src/mdbx/tables.rs @@ -0,0 +1,623 @@ +//! MDBX table definitions for CipherBFT per ADR-010 +//! +//! This module defines custom tables for DCL and consensus data using reth-db macros. +//! EVM state tables are reused from reth-db directly. +//! +//! # Table Categories +//! +//! ## DCL Tables (Custom) +//! - `Batches`: Transaction batches from Workers +//! - `Cars`: Certified Available Records indexed by (validator, position) +//! - `CarsByHash`: Secondary index for Car lookup by hash +//! - `Attestations`: Aggregated BLS attestations +//! - `PendingCuts`: Cuts awaiting consensus +//! - `FinalizedCuts`: Consensus-finalized Cuts +//! +//! ## Consensus Tables (Custom) +//! - `ConsensusState`: Current height/round/step +//! - `ConsensusWal`: Write-ahead log entries +//! - `ValidatorSets`: Validator sets by epoch +//! - `Votes`: Collected votes by (height, round) +//! - `Proposals`: Block proposals by (height, round) + +use reth_db_api::table::{Compress, Decode, Decompress, Encode}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +// ============================================================ +// Key Types +// ============================================================ + +/// Key for Cars table: (ValidatorId bytes, position) +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Default, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct CarTableKey { + /// First 20 bytes of validator ID (truncated for efficiency) + pub validator_prefix: [u8; 20], + /// Position in validator's lane + pub position: u64, +} + +impl CarTableKey { + /// Create a new car table key + pub fn new(validator_bytes: &[u8], position: u64) -> Self { + let mut validator_prefix = [0u8; 20]; + let copy_len = validator_bytes.len().min(20); + validator_prefix[..copy_len].copy_from_slice(&validator_bytes[..copy_len]); + Self { + validator_prefix, + position, + } + } +} + +impl Encode for CarTableKey { + type Encoded = [u8; 28]; // 20 + 8 + + fn encode(self) -> Self::Encoded { + let mut buf = [0u8; 28]; + buf[..20].copy_from_slice(&self.validator_prefix); + buf[20..28].copy_from_slice(&self.position.to_be_bytes()); + buf + } +} + +impl Decode for CarTableKey { + fn decode(value: &[u8]) -> Result { + if value.len() < 28 { + return Err(reth_db_api::DatabaseError::Decode); + } + let mut validator_prefix = [0u8; 20]; + validator_prefix.copy_from_slice(&value[..20]); + let position = u64::from_be_bytes(value[20..28].try_into().unwrap()); + Ok(Self { + validator_prefix, + position, + }) + } +} + +// CarTableKey is also used as a Value in CarsByHash table +impl Compress for CarTableKey { + type Compressed = Vec; + + fn compress(self) -> Self::Compressed { + self.encode().to_vec() + } + + fn compress_to_buf>(self, buf: &mut B) { + buf.put_slice(&self.encode()); + } +} + +impl Decompress for CarTableKey { + fn decompress(value: &[u8]) -> Result { + Self::decode(value) + } +} + +/// Key for Votes/Proposals table: (height, round) +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Default, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct HeightRoundKey { + /// Consensus height + pub height: u64, + /// Consensus round + pub round: u32, +} + +impl HeightRoundKey { + /// Create a new height-round key + pub fn new(height: u64, round: u32) -> Self { + Self { height, round } + } +} + +impl Encode for HeightRoundKey { + type Encoded = [u8; 12]; // 8 + 4 + + fn encode(self) -> Self::Encoded { + let mut buf = [0u8; 12]; + buf[..8].copy_from_slice(&self.height.to_be_bytes()); + buf[8..12].copy_from_slice(&self.round.to_be_bytes()); + buf + } +} + +impl Decode for HeightRoundKey { + fn decode(value: &[u8]) -> Result { + if value.len() < 12 { + return Err(reth_db_api::DatabaseError::Decode); + } + let height = u64::from_be_bytes(value[..8].try_into().unwrap()); + let round = u32::from_be_bytes(value[8..12].try_into().unwrap()); + Ok(Self { height, round }) + } +} + +/// 32-byte hash key +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Default, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct HashKey(pub [u8; 32]); + +impl HashKey { + /// Create from a slice + pub fn from_slice(slice: &[u8]) -> Self { + let mut bytes = [0u8; 32]; + let copy_len = slice.len().min(32); + bytes[..copy_len].copy_from_slice(&slice[..copy_len]); + Self(bytes) + } +} + +impl Encode for HashKey { + type Encoded = [u8; 32]; + + fn encode(self) -> Self::Encoded { + self.0 + } +} + +impl Decode for HashKey { + fn decode(value: &[u8]) -> Result { + if value.len() < 32 { + return Err(reth_db_api::DatabaseError::Decode); + } + let mut bytes = [0u8; 32]; + bytes.copy_from_slice(&value[..32]); + Ok(Self(bytes)) + } +} + +/// Height key (u64) for height-indexed tables +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Default, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct HeightKey(pub u64); + +impl HeightKey { + /// Create a new height key + pub fn new(height: u64) -> Self { + Self(height) + } +} + +impl Encode for HeightKey { + type Encoded = [u8; 8]; + + fn encode(self) -> Self::Encoded { + self.0.to_be_bytes() + } +} + +impl Decode for HeightKey { + fn decode(value: &[u8]) -> Result { + if value.len() < 8 { + return Err(reth_db_api::DatabaseError::Decode); + } + Ok(Self(u64::from_be_bytes(value[..8].try_into().unwrap()))) + } +} + +/// Unit key for singleton tables (e.g., ConsensusState) +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Default, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct UnitKey; + +impl Encode for UnitKey { + type Encoded = [u8; 1]; + + fn encode(self) -> Self::Encoded { + [0] + } +} + +impl Decode for UnitKey { + fn decode(_value: &[u8]) -> Result { + Ok(Self) + } +} + +// ============================================================ +// Value Types (stored as bincode-serialized bytes) +// ============================================================ + +/// Wrapper for bincode-serializable values +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BincodeValue(pub T); + +impl From for BincodeValue { + fn from(value: T) -> Self { + Self(value) + } +} + +impl Deserialize<'de> + Debug + Send + Sync> Compress for BincodeValue { + type Compressed = Vec; + + fn compress(self) -> Self::Compressed { + bincode::serialize(&self.0).expect("bincode serialization failed") + } + + fn compress_to_buf>(self, buf: &mut B) { + let serialized = bincode::serialize(&self.0).expect("bincode serialization failed"); + buf.put_slice(&serialized); + } +} + +impl Deserialize<'de> + Debug + Send + Sync> Decompress + for BincodeValue +{ + fn decompress(value: &[u8]) -> Result { + bincode::deserialize(value) + .map(BincodeValue) + .map_err(|_| reth_db_api::DatabaseError::Decode) + } +} + +// ============================================================ +// Stored Value Types +// ============================================================ + +/// Stored batch value +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredBatch { + /// Worker ID that created this batch + pub worker_id: u8, + /// Serialized transactions + pub transactions: Vec>, + /// Timestamp when batch was created + pub timestamp: u64, +} + +/// Stored Car value +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredCar { + /// Proposer validator ID bytes + pub proposer: Vec, + /// Position in lane + pub position: u64, + /// Batch digests included + pub batch_digests: Vec, + /// Parent Car hash (if not genesis) + pub parent_ref: Option<[u8; 32]>, + /// BLS signature bytes + pub signature: Vec, + /// Computed hash + pub hash: [u8; 32], +} + +/// Stored batch digest +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredBatchDigest { + /// Worker ID + pub worker_id: u8, + /// Batch hash + pub hash: [u8; 32], + /// Transaction count + pub tx_count: u32, + /// Total size in bytes + pub size_bytes: u64, +} + +/// Stored aggregated attestation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredAggregatedAttestation { + /// Car hash being attested + pub car_hash: [u8; 32], + /// Car position + pub car_position: u64, + /// Car proposer + pub car_proposer: Vec, + /// Aggregated BLS signature + pub aggregated_signature: Vec, + /// Bit vector of signers + pub signers_bitvec: Vec, + /// Number of signers + pub signer_count: u32, +} + +/// Stored Cut value +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredCut { + /// Consensus height + pub height: u64, + /// Car entries in this Cut (validator -> car + attestation) + pub cars: Vec, +} + +/// Stored Car entry in a Cut (includes full car and attestation) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredCarEntry { + /// Validator ID bytes + pub validator: Vec, + /// The full stored Car + pub car: StoredCar, + /// Attestation (if available) + pub attestation: Option, +} + +/// Stored consensus state +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredConsensusState { + /// Current height + pub height: u64, + /// Current round + pub round: u32, + /// Last committed height + pub last_committed_height: u64, + /// WAL index to start replay from + pub wal_replay_index: u64, +} + +/// Stored WAL entry +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredWalEntry { + /// Entry type tag + pub entry_type: u8, + /// Serialized entry data + pub data: Vec, +} + +/// Stored validator set +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredValidatorSet { + /// Epoch number + pub epoch: u64, + /// Validators in this set + pub validators: Vec, + /// Total voting power + pub total_power: u64, +} + +/// Stored validator info +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredValidator { + /// Validator ID bytes + pub id: Vec, + /// Ed25519 public key for consensus + pub ed25519_pubkey: Vec, + /// BLS public key for attestations + pub bls_pubkey: Vec, + /// Voting power + pub power: u64, +} + +/// Stored votes collection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredVotes { + /// Height + pub height: u64, + /// Round + pub round: u32, + /// Collected votes + pub votes: Vec, +} + +/// Stored vote +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredVote { + /// Vote type (prevote=0, precommit=1) + pub vote_type: u8, + /// Voter ID + pub voter: Vec, + /// Block hash (None for nil vote) + pub block_hash: Option<[u8; 32]>, + /// Signature + pub signature: Vec, +} + +/// Stored proposal +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredProposal { + /// Height + pub height: u64, + /// Round + pub round: u32, + /// Proposer ID + pub proposer: Vec, + /// Cut being proposed + pub cut: StoredCut, + /// Signature + pub signature: Vec, +} + +// ============================================================ +// Table Definitions using reth-db Table trait +// ============================================================ + +use reth_db_api::table::Table; + +/// Batches table: Hash -> StoredBatch +/// Stores transaction batches from Workers +#[derive(Debug, Clone, Copy, Default)] +pub struct Batches; + +impl Table for Batches { + const NAME: &'static str = "Batches"; + type Key = HashKey; + type Value = BincodeValue; +} + +/// Cars table: (ValidatorPrefix, Position) -> StoredCar +/// Stores Certified Available Records indexed by validator and position +#[derive(Debug, Clone, Copy, Default)] +pub struct Cars; + +impl Table for Cars { + const NAME: &'static str = "Cars"; + type Key = CarTableKey; + type Value = BincodeValue; +} + +/// CarsByHash table: Hash -> CarTableKey +/// Secondary index for Car lookup by hash +#[derive(Debug, Clone, Copy, Default)] +pub struct CarsByHash; + +impl Table for CarsByHash { + const NAME: &'static str = "CarsByHash"; + type Key = HashKey; + type Value = CarTableKey; +} + +/// Attestations table: CarHash -> StoredAggregatedAttestation +/// Stores aggregated BLS attestations +#[derive(Debug, Clone, Copy, Default)] +pub struct Attestations; + +impl Table for Attestations { + const NAME: &'static str = "Attestations"; + type Key = HashKey; + type Value = BincodeValue; +} + +/// PendingCuts table: Height -> StoredCut +/// Stores Cuts awaiting consensus finalization +#[derive(Debug, Clone, Copy, Default)] +pub struct PendingCuts; + +impl Table for PendingCuts { + const NAME: &'static str = "PendingCuts"; + type Key = HeightKey; + type Value = BincodeValue; +} + +/// FinalizedCuts table: Height -> StoredCut +/// Stores consensus-finalized Cuts +#[derive(Debug, Clone, Copy, Default)] +pub struct FinalizedCuts; + +impl Table for FinalizedCuts { + const NAME: &'static str = "FinalizedCuts"; + type Key = HeightKey; + type Value = BincodeValue; +} + +/// ConsensusWal table: Index -> WalEntry bytes +/// Write-ahead log for crash recovery +#[derive(Debug, Clone, Copy, Default)] +pub struct ConsensusWal; + +impl Table for ConsensusWal { + const NAME: &'static str = "ConsensusWal"; + type Key = HeightKey; + type Value = BincodeValue; +} + +/// ConsensusState table: () -> StoredConsensusState +/// Current consensus state (height, round, step) +#[derive(Debug, Clone, Copy, Default)] +pub struct ConsensusState; + +impl Table for ConsensusState { + const NAME: &'static str = "ConsensusState"; + type Key = UnitKey; + type Value = BincodeValue; +} + +/// ValidatorSets table: Epoch -> StoredValidatorSet +/// Validator sets by epoch +#[derive(Debug, Clone, Copy, Default)] +pub struct ValidatorSets; + +impl Table for ValidatorSets { + const NAME: &'static str = "ValidatorSets"; + type Key = HeightKey; + type Value = BincodeValue; +} + +/// Votes table: (Height, Round) -> StoredVotes +/// Collected votes by height and round +#[derive(Debug, Clone, Copy, Default)] +pub struct Votes; + +impl Table for Votes { + const NAME: &'static str = "Votes"; + type Key = HeightRoundKey; + type Value = BincodeValue; +} + +/// Proposals table: (Height, Round) -> StoredProposal +/// Block proposals by height and round +#[derive(Debug, Clone, Copy, Default)] +pub struct Proposals; + +impl Table for Proposals { + const NAME: &'static str = "Proposals"; + type Key = HeightRoundKey; + type Value = BincodeValue; +} + +/// All CipherBFT tables +pub struct Tables; + +impl Tables { + /// All table names (for iteration/creation) + pub const ALL: &'static [&'static str] = &[ + Batches::NAME, + Cars::NAME, + CarsByHash::NAME, + Attestations::NAME, + PendingCuts::NAME, + FinalizedCuts::NAME, + ConsensusWal::NAME, + ConsensusState::NAME, + ValidatorSets::NAME, + Votes::NAME, + Proposals::NAME, + ]; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_car_table_key_encode_decode() { + let validator = [1u8; 32]; + let key = CarTableKey::new(&validator, 42); + let encoded = key.encode(); + let decoded = CarTableKey::decode(&encoded).unwrap(); + assert_eq!(key.validator_prefix, decoded.validator_prefix); + assert_eq!(key.position, decoded.position); + } + + #[test] + fn test_height_round_key_encode_decode() { + let key = HeightRoundKey::new(100, 5); + let encoded = key.encode(); + let decoded = HeightRoundKey::decode(&encoded).unwrap(); + assert_eq!(key.height, decoded.height); + assert_eq!(key.round, decoded.round); + } + + #[test] + fn test_hash_key_encode_decode() { + let hash = [42u8; 32]; + let key = HashKey(hash); + let encoded = key.encode(); + let decoded = HashKey::decode(&encoded).unwrap(); + assert_eq!(key.0, decoded.0); + } + + #[test] + fn test_bincode_value_compress_decompress() { + let stored = StoredConsensusState { + height: 100, + round: 5, + last_committed_height: 99, + wal_replay_index: 1000, + }; + let value = BincodeValue(stored.clone()); + let compressed = value.compress(); + let decompressed: BincodeValue = + BincodeValue::decompress(&compressed).unwrap(); + assert_eq!(decompressed.0.height, stored.height); + assert_eq!(decompressed.0.round, stored.round); + } +} diff --git a/crates/storage/src/mdbx/wal.rs b/crates/storage/src/mdbx/wal.rs new file mode 100644 index 0000000..26cca82 --- /dev/null +++ b/crates/storage/src/mdbx/wal.rs @@ -0,0 +1,308 @@ +//! MDBX-backed Write-Ahead Log implementation +//! +//! Provides a persistent WAL for crash recovery using MDBX as the backend. +//! All consensus state changes are logged before being applied. + +use crate::error::{Result, StorageError}; +use crate::wal::{Wal, WalEntry}; +use async_trait::async_trait; +use reth_db_api::cursor::DbCursorRO; +use reth_db_api::transaction::DbTx; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use tracing::{debug, trace}; + +use super::database::Database; +use super::tables::{ConsensusWal as ConsensusWalTable, HeightKey, StoredWalEntry}; + +/// MDBX-backed WAL implementation +/// +/// Stores WAL entries persistently using MDBX, ensuring crash recovery. +pub struct MdbxWal { + /// The underlying database + db: Arc, + /// Next entry index (cached for performance) + next_index: AtomicU64, +} + +impl MdbxWal { + /// Create a new MDBX WAL + pub fn new(db: Arc) -> Result { + // Load the next index from the database + let next_index = Self::load_next_index(&db)?; + + debug!(next_index, "Initialized MDBX WAL"); + + Ok(Self { + db, + next_index: AtomicU64::new(next_index), + }) + } + + /// Load the next WAL index from database + fn load_next_index(db: &Database) -> Result { + let tx = db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + // Find the last entry to determine next index + match cursor + .last() + .map_err(|e| StorageError::Database(format!("Cursor last failed: {e}")))? + { + Some((key, _)) => Ok(key.0 + 1), + None => Ok(0), + } + } + + /// Get the underlying database + pub fn db(&self) -> &Arc { + &self.db + } + + /// Serialize a WAL entry for storage + #[allow(dead_code)] + fn serialize_entry(entry: &WalEntry) -> Result> { + bincode::serialize(entry).map_err(|e| { + crate::error::StorageError::Serialization(format!("Failed to serialize WAL entry: {e}")) + }) + } + + /// Deserialize a WAL entry from storage + #[allow(dead_code)] + fn deserialize_entry(data: &[u8]) -> Result { + bincode::deserialize(data).map_err(|e| { + crate::error::StorageError::Deserialization(format!( + "Failed to deserialize WAL entry: {e}" + )) + }) + } + + /// Get entry type tag for storage + fn entry_type_tag(entry: &WalEntry) -> u8 { + match entry { + WalEntry::BatchReceived(_) => 0, + WalEntry::CarCreated(_) => 1, + WalEntry::CarReceived(_) => 2, + WalEntry::AttestationAggregated(_) => 3, + WalEntry::CutProposed(_) => 4, + WalEntry::CutFinalized { .. } => 5, + WalEntry::Checkpoint { .. } => 6, + WalEntry::PipelineStageChanged { .. } => 7, + WalEntry::NextHeightAttestation { .. } => 8, + WalEntry::PreservedAttestedCars { .. } => 9, + } + } +} + +#[async_trait] +impl Wal for MdbxWal { + async fn append(&self, entry: WalEntry) -> Result { + use super::tables::BincodeValue; + use reth_db_api::transaction::DbTxMut; + + let index = self.next_index.fetch_add(1, Ordering::SeqCst); + let entry_type = entry.entry_type(); + + trace!(index, entry_type, "Appending WAL entry"); + + let serialized = Self::serialize_entry(&entry)?; + let stored = StoredWalEntry { + entry_type: Self::entry_type_tag(&entry), + data: serialized, + }; + + let tx = self.db.tx_mut()?; + tx.put::(HeightKey::new(index), BincodeValue(stored)) + .map_err(|e| StorageError::Database(format!("Failed to put WAL entry: {e}")))?; + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit WAL entry: {e}")))?; + + debug!(index, entry_type, "WAL entry appended"); + + Ok(index) + } + + async fn replay_from(&self, start_index: u64) -> Result> { + trace!(start_index, "Replaying WAL from index"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let mut entries = Vec::new(); + + // Seek to start index + let mut current = cursor + .seek(HeightKey::new(start_index)) + .map_err(|e| StorageError::Database(format!("Cursor seek failed: {e}")))?; + + while let Some((key, value)) = current { + let entry = Self::deserialize_entry(&value.0.data)?; + entries.push((key.0, entry)); + + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + + debug!(start_index, count = entries.len(), "WAL replay completed"); + + Ok(entries) + } + + async fn truncate_before(&self, before_index: u64) -> Result { + use reth_db_api::cursor::DbCursorRW; + use reth_db_api::transaction::DbTxMut; + + trace!(before_index, "Truncating WAL before index"); + + let tx = self.db.tx_mut()?; + let mut cursor = tx + .cursor_write::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + let mut deleted = 0u64; + + // Start from the beginning + let mut current = cursor + .first() + .map_err(|e| StorageError::Database(format!("Cursor first failed: {e}")))?; + + while let Some((key, _)) = current { + if key.0 >= before_index { + break; + } + + cursor + .delete_current() + .map_err(|e| StorageError::Database(format!("Failed to delete: {e}")))?; + deleted += 1; + + current = cursor + .next() + .map_err(|e| StorageError::Database(format!("Cursor next failed: {e}")))?; + } + + tx.commit() + .map_err(|e| StorageError::Database(format!("Failed to commit truncate: {e}")))?; + + debug!(before_index, deleted, "WAL truncation completed"); + + Ok(deleted) + } + + async fn next_index(&self) -> Result { + Ok(self.next_index.load(Ordering::SeqCst)) + } + + async fn sync(&self) -> Result<()> { + trace!("Syncing WAL to disk"); + + // MDBX provides durable writes by default with proper transaction commits + // The MDBX_SAFE_NOSYNC mode is not used, so writes are already durable + // No additional sync needed as commits are already synchronous + + Ok(()) + } + + async fn last_checkpoint(&self) -> Result> { + trace!("Finding last checkpoint"); + + let tx = self.db.tx()?; + let mut cursor = tx + .cursor_read::() + .map_err(|e| StorageError::Database(format!("Failed to create cursor: {e}")))?; + + // Iterate backwards to find the last checkpoint + let mut current = cursor + .last() + .map_err(|e| StorageError::Database(format!("Cursor last failed: {e}")))?; + + while let Some((key, value)) = current { + // Check if this is a checkpoint entry (entry_type == 6) + if value.0.entry_type == 6 { + return Ok(Some(key.0)); + } + + current = cursor + .prev() + .map_err(|e| StorageError::Database(format!("Cursor prev failed: {e}")))?; + } + + Ok(None) + } + + async fn checkpoint(&self, height: u64) -> Result { + let entry_count = self.next_index().await?; + + trace!(height, entry_count, "Creating checkpoint"); + + let entry = WalEntry::Checkpoint { + height, + entry_count, + }; + + self.append(entry).await + } +} + +/// WAL entry index key +#[allow(dead_code)] +#[derive(Debug, Clone, Copy, Default)] +pub struct WalIndexKey(pub u64); + +impl reth_db_api::table::Encode for WalIndexKey { + type Encoded = [u8; 8]; + + fn encode(self) -> Self::Encoded { + self.0.to_be_bytes() + } +} + +impl reth_db_api::table::Decode for WalIndexKey { + fn decode(value: &[u8]) -> std::result::Result { + if value.len() < 8 { + return Err(reth_db_api::DatabaseError::Decode); + } + Ok(Self(u64::from_be_bytes(value[..8].try_into().unwrap()))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_db_api::table::Decode; + + #[test] + fn test_wal_entry_serialization() { + let entry = WalEntry::Checkpoint { + height: 100, + entry_count: 50, + }; + + let serialized = MdbxWal::serialize_entry(&entry).unwrap(); + let deserialized = MdbxWal::deserialize_entry(&serialized).unwrap(); + + match deserialized { + WalEntry::Checkpoint { + height, + entry_count, + } => { + assert_eq!(height, 100); + assert_eq!(entry_count, 50); + } + _ => panic!("Wrong entry type"), + } + } + + #[test] + fn test_wal_index_key_encode_decode() { + let key = WalIndexKey(12345); + let encoded = reth_db_api::table::Encode::encode(key); + let decoded = WalIndexKey::decode(&encoded).unwrap(); + assert_eq!(key.0, decoded.0); + } +} diff --git a/crates/storage/src/memory.rs b/crates/storage/src/memory.rs index 18ed048..03454fd 100644 --- a/crates/storage/src/memory.rs +++ b/crates/storage/src/memory.rs @@ -82,7 +82,7 @@ impl DclStore for InMemoryStore { let mut batches = self.batches.write(); if batches.contains_key(&hash) { - return Err(StorageError::DuplicateEntry(format!("batch {}", hash))); + return Err(StorageError::DuplicateEntry(format!("batch {hash}"))); } batches.insert(hash, batch); diff --git a/crates/storage/src/pruning.rs b/crates/storage/src/pruning.rs new file mode 100644 index 0000000..da01860 --- /dev/null +++ b/crates/storage/src/pruning.rs @@ -0,0 +1,297 @@ +//! Background pruning service for storage garbage collection +//! +//! Provides automatic cleanup of old data based on configurable retention policies. +//! The pruning task runs periodically and removes: +//! - Finalized Cuts older than the retention threshold +//! - Unreferenced Cars, Attestations, and Batches + +use crate::dcl::DclStore; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; +use tokio::time::{interval, Duration}; +use tracing::{debug, error, info, trace}; + +/// Configuration for the background pruning task +#[derive(Debug, Clone)] +pub struct PruningConfig { + /// Number of blocks to retain (default: 100,000) + pub retention_blocks: u64, + /// Interval between pruning runs in blocks (default: 1,000) + pub prune_interval_blocks: u64, + /// Minimum interval between pruning runs (default: 60 seconds) + pub min_prune_interval: Duration, +} + +impl Default for PruningConfig { + fn default() -> Self { + Self { + retention_blocks: 100_000, + prune_interval_blocks: 1_000, + min_prune_interval: Duration::from_secs(60), + } + } +} + +impl PruningConfig { + /// Create a new pruning configuration + pub fn new(retention_blocks: u64, prune_interval_blocks: u64) -> Self { + Self { + retention_blocks, + prune_interval_blocks, + min_prune_interval: Duration::from_secs(60), + } + } + + /// Set the minimum interval between pruning runs + pub fn with_min_interval(mut self, interval: Duration) -> Self { + self.min_prune_interval = interval; + self + } +} + +/// Handle for controlling the background pruning task +pub struct PruningHandle { + /// Shutdown signal + shutdown: Arc, + /// Notify for immediate pruning trigger + trigger: Arc, + /// Last pruned height + last_pruned_height: Arc, + /// Current finalized height (updated externally) + current_height: Arc, +} + +impl PruningHandle { + /// Create a new pruning handle + fn new() -> Self { + Self { + shutdown: Arc::new(AtomicBool::new(false)), + trigger: Arc::new(Notify::new()), + last_pruned_height: Arc::new(AtomicU64::new(0)), + current_height: Arc::new(AtomicU64::new(0)), + } + } + + /// Signal shutdown to the pruning task + pub fn shutdown(&self) { + self.shutdown.store(true, Ordering::SeqCst); + self.trigger.notify_one(); + } + + /// Trigger immediate pruning + pub fn trigger_prune(&self) { + self.trigger.notify_one(); + } + + /// Update the current finalized height + /// + /// This should be called when new blocks are finalized. + /// The pruning task will use this to determine when to prune. + pub fn update_height(&self, height: u64) { + self.current_height.store(height, Ordering::SeqCst); + } + + /// Get the last height that was pruned + pub fn last_pruned_height(&self) -> u64 { + self.last_pruned_height.load(Ordering::SeqCst) + } + + /// Check if the pruning task is still running + pub fn is_running(&self) -> bool { + !self.shutdown.load(Ordering::SeqCst) + } +} + +impl Clone for PruningHandle { + fn clone(&self) -> Self { + Self { + shutdown: Arc::clone(&self.shutdown), + trigger: Arc::clone(&self.trigger), + last_pruned_height: Arc::clone(&self.last_pruned_height), + current_height: Arc::clone(&self.current_height), + } + } +} + +/// Background pruning task +/// +/// Spawns a tokio task that periodically prunes old data from storage. +pub struct PruningTask { + store: Arc, + config: PruningConfig, + handle: PruningHandle, +} + +impl PruningTask { + /// Create a new pruning task + pub fn new(store: Arc, config: PruningConfig) -> Self { + Self { + store, + config, + handle: PruningHandle::new(), + } + } + + /// Get a handle for controlling the pruning task + pub fn handle(&self) -> PruningHandle { + self.handle.clone() + } + + /// Start the background pruning task + /// + /// Returns a JoinHandle for the spawned task. + pub fn spawn(self) -> tokio::task::JoinHandle<()> { + let store = self.store; + let config = self.config; + let handle = self.handle; + + tokio::spawn(async move { + info!( + retention_blocks = config.retention_blocks, + prune_interval_blocks = config.prune_interval_blocks, + "Starting background pruning task" + ); + + let mut interval_timer = interval(config.min_prune_interval); + let mut last_checked_height = 0u64; + + loop { + tokio::select! { + _ = interval_timer.tick() => { + // Check if it's time to prune based on block interval + } + _ = handle.trigger.notified() => { + if handle.shutdown.load(Ordering::SeqCst) { + info!("Pruning task received shutdown signal"); + break; + } + // Immediate prune triggered + trace!("Immediate prune triggered"); + } + } + + if handle.shutdown.load(Ordering::SeqCst) { + break; + } + + let current_height = handle.current_height.load(Ordering::SeqCst); + + // Check if we've advanced enough blocks to warrant pruning + if current_height < last_checked_height + config.prune_interval_blocks { + continue; + } + + // Check if we have enough blocks to apply retention policy + if current_height < config.retention_blocks { + trace!( + current_height, + retention_blocks = config.retention_blocks, + "Not enough blocks to prune yet" + ); + continue; + } + + let prune_before_height = current_height.saturating_sub(config.retention_blocks); + let last_pruned = handle.last_pruned_height.load(Ordering::SeqCst); + + // Skip if we've already pruned up to this height + if prune_before_height <= last_pruned { + trace!( + prune_before_height, + last_pruned, + "Already pruned up to this height" + ); + last_checked_height = current_height; + continue; + } + + debug!(current_height, prune_before_height, "Running pruning cycle"); + + match store.prune_before(prune_before_height).await { + Ok(pruned_count) => { + if pruned_count > 0 { + info!(pruned_count, prune_before_height, "Pruning cycle completed"); + } else { + debug!(prune_before_height, "Pruning cycle completed (no entries)"); + } + handle + .last_pruned_height + .store(prune_before_height, Ordering::SeqCst); + } + Err(e) => { + error!(error = %e, prune_before_height, "Pruning cycle failed"); + } + } + + last_checked_height = current_height; + } + + info!("Background pruning task stopped"); + }) + } +} + +/// Convenience function to start a pruning task with default configuration +pub fn spawn_pruning_task( + store: Arc, +) -> (PruningHandle, tokio::task::JoinHandle<()>) { + spawn_pruning_task_with_config(store, PruningConfig::default()) +} + +/// Start a pruning task with custom configuration +pub fn spawn_pruning_task_with_config( + store: Arc, + config: PruningConfig, +) -> (PruningHandle, tokio::task::JoinHandle<()>) { + let task = PruningTask::new(store, config); + let handle = task.handle(); + let join_handle = task.spawn(); + (handle, join_handle) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memory::InMemoryStore; + use tokio::time::sleep; + + #[test] + fn test_pruning_config_default() { + let config = PruningConfig::default(); + assert_eq!(config.retention_blocks, 100_000); + assert_eq!(config.prune_interval_blocks, 1_000); + assert_eq!(config.min_prune_interval, Duration::from_secs(60)); + } + + #[test] + fn test_pruning_config_builder() { + let config = PruningConfig::new(50_000, 500).with_min_interval(Duration::from_secs(30)); + assert_eq!(config.retention_blocks, 50_000); + assert_eq!(config.prune_interval_blocks, 500); + assert_eq!(config.min_prune_interval, Duration::from_secs(30)); + } + + #[tokio::test] + async fn test_pruning_handle_lifecycle() { + let store = Arc::new(InMemoryStore::new()); + let config = PruningConfig::new(10, 5).with_min_interval(Duration::from_millis(10)); + + let (handle, join_handle) = spawn_pruning_task_with_config(store, config); + + assert!(handle.is_running()); + assert_eq!(handle.last_pruned_height(), 0); + + // Update height + handle.update_height(100); + + // Give task time to run + sleep(Duration::from_millis(50)).await; + + // Shutdown + handle.shutdown(); + join_handle.await.unwrap(); + + assert!(!handle.is_running()); + } +} diff --git a/deny.toml b/deny.toml index df5f7b6..cacda31 100644 --- a/deny.toml +++ b/deny.toml @@ -8,7 +8,18 @@ all-features = true [advisories] version = 2 db-path = "~/.cargo/advisory-db" -ignore = [] +ignore = [ + # paste - no longer maintained (RUSTSEC-2024-0436) + # This is a transitive dependency from reth-db/reth-libmdbx + # We cannot remove it until reth upstream removes this dependency + # See: https://rustsec.org/advisories/RUSTSEC-2024-0436 + "RUSTSEC-2024-0436", + # bincode - unmaintained (RUSTSEC-2025-0141) + # Transitive dependency from reth-nippy-jar, also used in our crates + # bincode 1.3.3 is considered "complete" by maintainers + # See: https://rustsec.org/advisories/RUSTSEC-2025-0141 + "RUSTSEC-2025-0141", +] [licenses] version = 2