diff --git a/Cargo.toml b/Cargo.toml index 1a55921..fbdcae7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "crates/crypto", "crates/data-chain", "crates/storage", - "crates/node", + "crates/node", "crates/mempool", ] resolver = "2" diff --git a/crates/mempool/Cargo.toml b/crates/mempool/Cargo.toml new file mode 100644 index 0000000..f4e1082 --- /dev/null +++ b/crates/mempool/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "mempool" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +# Reth transaction pool and primitives +reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0" } +reth-primitives = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0" } +reth-storage-api = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0" } +reth-chainspec = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0" } + +# Ethereum types (alloy suite - use Reth compatible version) +alloy-primitives = "0.8" +alloy-eips = "0.4" + +# Serialization +serde = { workspace = true } +serde_json = { workspace = true } + +# Error handling +thiserror = { workspace = true } + +# Async +tokio = { workspace = true } +async-trait = { workspace = true } + +# Logging +tracing = { workspace = true } + +[dev-dependencies] +reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0", features = ["test-utils"] } +reth-provider = { git = "https://github.com/paradigmxyz/reth", rev = "v1.1.0", features = ["test-utils"] } diff --git a/crates/mempool/README.md b/crates/mempool/README.md new file mode 100644 index 0000000..05498d7 --- /dev/null +++ b/crates/mempool/README.md @@ -0,0 +1,88 @@ +# CipherBFT Mempool + +This crate wraps Reth's transaction pool and adds CipherBFT-specific validation. + +## What is implemented + +- `CipherBftPool

`: thin wrapper over Reth's pool (`pool.rs`) +- `CipherBftValidator`: wrapper for Reth validator (`validator.rs`) +- BFT policy checks: min gas price + nonce gap (inside `validate_bft_policy`) +- `MempoolConfig -> PoolConfig` mapping (`config.rs`) +- Worker adapter: pending/queued/batch helpers (`CipherBftPoolAdapter`) + +## Pool creation + +### Recommended (build internally) + +`CipherBftPool::new(...)` builds the Reth pool and validator internally. + +```rust +use cipherbft_mempool::{CipherBftPool, MempoolConfig}; +use reth_chainspec::ChainSpec; +use reth_provider::StateProviderFactory; +use reth_transaction_pool::blobstore::InMemoryBlobStore; +use std::sync::Arc; + +let chain_spec: Arc = Arc::new(/* ... */); +let client: Arc = Arc::new(/* ... */); +let blob_store = InMemoryBlobStore::default(); +let config = MempoolConfig::default(); + +let pool = CipherBftPool::new(chain_spec, client, blob_store, config)?; +``` + +### Wrap an existing Reth pool + +Use this when you already constructed a `Pool`. + +```rust +use cipherbft_mempool::{CipherBftPool, CipherBftValidator, MempoolConfig}; +use reth_transaction_pool::{Pool, CoinbaseTipOrdering, PoolConfig}; + +let mempool_config = MempoolConfig::default(); +let validator = CipherBftValidator::new(chain_spec, client, blob_store.clone()); +let pool_config: PoolConfig = mempool_config.clone().into(); +let reth_pool = Pool::new( + validator, + CoinbaseTipOrdering::default(), + blob_store, + pool_config, +); + +let pool = CipherBftPool::wrap(reth_pool, mempool_config, client); +``` + +## Transaction insertion + +`add_transaction` accepts several input types (`TransactionSigned`, recovered, pooled, raw bytes): + +```rust +use reth_transaction_pool::TransactionOrigin; + +pool.add_transaction(TransactionOrigin::External, tx_signed).await?; +pool.add_transaction(TransactionOrigin::External, tx_recovered).await?; +pool.add_transaction(TransactionOrigin::External, pooled).await?; +``` + +Or insert pooled transactions directly: + +```rust +pool.add_pooled_transaction(TransactionOrigin::Local, pooled_tx).await?; +``` + +## Adapter helpers (worker-facing) + +```rust +let adapter = pool.adapter(); +let pending = adapter.pending_transactions(); +let queued = adapter.queued_transactions(); +let batch = adapter.get_transactions_for_batch(100, 30_000_000); +let stats = adapter.stats(); +adapter.remove_finalized(&tx_hashes); +``` + +## Notes + +- BFT policy checks are enforced before handing transactions to Reth. +- Standard Ethereum validation is delegated to Reth. +- Worker integration is not yet wired in the node. diff --git a/crates/mempool/src/config.rs b/crates/mempool/src/config.rs new file mode 100644 index 0000000..644e4f1 --- /dev/null +++ b/crates/mempool/src/config.rs @@ -0,0 +1,91 @@ +//! Mempool configuration +//! +//! We primarily rely on Reth's `PoolConfig`, but expose a CipherBFT-friendly +//! wrapper so higher layers can express their preferences without depending +//! on Reth types directly. + +use reth_transaction_pool::PoolConfig; +use serde::{Deserialize, Serialize}; + +/// Mempool configuration with CipherBFT-specific knobs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MempoolConfig { + /// Maximum executable transactions to keep in the pending sub-pool. + pub max_pending: usize, + + /// Maximum queued transactions per sender (maps to Reth's account slots). + pub max_queued_per_account: usize, + + /// Maximum nonce gap before the transaction is considered queued. + pub max_nonce_gap: u64, + + /// Minimum gas price (in wei) we allow into the pool. + pub min_gas_price: u128, + + /// Default price bump (in %) required to replace a transaction. + pub default_price_bump: u128, + + /// Price bump (in %) required to replace a blob transaction. + pub replace_blob_tx_price_bump: u128, +} + +impl Default for MempoolConfig { + fn default() -> Self { + Self { + max_pending: 10_000, + max_queued_per_account: 100, + max_nonce_gap: 16, + min_gas_price: 1_000_000_000, // 1 gwei + default_price_bump: 10, + replace_blob_tx_price_bump: 100, + } + } +} + +impl From for PoolConfig { + fn from(cfg: MempoolConfig) -> Self { + let mut pool_cfg = PoolConfig::default(); + pool_cfg.pending_limit.max_txs = cfg.max_pending; + pool_cfg.queued_limit.max_txs = cfg.max_pending; + pool_cfg.max_account_slots = cfg.max_queued_per_account; + pool_cfg.price_bumps.default_price_bump = cfg.default_price_bump; + pool_cfg.price_bumps.replace_blob_tx_price_bump = cfg.replace_blob_tx_price_bump; + + // TODO: map min_gas_price and max_nonce_gap once upstream hooks are wired. + pool_cfg + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let cfg = MempoolConfig::default(); + assert_eq!(cfg.max_pending, 10_000); + assert_eq!(cfg.max_queued_per_account, 100); + assert_eq!(cfg.max_nonce_gap, 16); + assert_eq!(cfg.min_gas_price, 1_000_000_000); + assert_eq!(cfg.default_price_bump, 10); + assert_eq!(cfg.replace_blob_tx_price_bump, 100); + } + + #[test] + fn test_reth_conversion() { + let cfg = MempoolConfig { + max_pending: 5_000, + max_queued_per_account: 42, + default_price_bump: 25, + replace_blob_tx_price_bump: 150, + ..Default::default() + }; + + let reth_cfg: PoolConfig = cfg.into(); + assert_eq!(reth_cfg.pending_limit.max_txs, 5_000); + assert_eq!(reth_cfg.queued_limit.max_txs, 5_000); + assert_eq!(reth_cfg.max_account_slots, 42); + assert_eq!(reth_cfg.price_bumps.default_price_bump, 25); + assert_eq!(reth_cfg.price_bumps.replace_blob_tx_price_bump, 150); + } +} diff --git a/crates/mempool/src/error.rs b/crates/mempool/src/error.rs new file mode 100644 index 0000000..0815d18 --- /dev/null +++ b/crates/mempool/src/error.rs @@ -0,0 +1,51 @@ +//! Error types for mempool operations +//! +//! MP-1: keep the data structure minimal, wrap Reth's PoolError, and expose +//! only the BFT policy errors we need. + +use reth_transaction_pool::error::PoolError; +use thiserror::Error; + +/// Mempool error types (expanded for MP-2) +#[derive(Error, Debug)] +pub enum MempoolError { + /// Bubble up errors coming from the underlying Reth pool. + #[error(transparent)] + Pool(#[from] PoolError), + + /// Gas price below policy threshold. + #[error("Insufficient gas price: got {got}, min {min}")] + InsufficientGasPrice { got: u128, min: u128 }, + + /// Nonce gap exceeds configured maximum. + #[error("Nonce gap exceeds maximum: gap={gap} > max={max}")] + NonceGapExceeded { gap: u64, max: u64 }, + + /// Invalid signature. + #[error("Invalid transaction signature")] + InvalidSignature, + + /// Nonce too low (already executed). + #[error("Nonce too low: tx nonce {tx_nonce}, current {current_nonce}")] + NonceTooLow { tx_nonce: u64, current_nonce: u64 }, + + /// Insufficient balance for gas. + #[error("Insufficient balance: need {need}, have {have}")] + InsufficientBalance { need: u128, have: u128 }, + + /// Gas limit too high. + #[error("Gas limit too high: {gas_limit} > {max}")] + GasLimitTooHigh { gas_limit: u64, max: u64 }, + + /// Transaction size exceeds limit. + #[error("Transaction too large: {size} > {max}")] + OversizedTransaction { size: usize, max: usize }, + + /// Failed to convert into pool-specific transaction type. + #[error("Transaction conversion failed: {0}")] + Conversion(String), + + /// Internal error marker. + #[error("Internal error: {0}")] + Internal(String), +} diff --git a/crates/mempool/src/lib.rs b/crates/mempool/src/lib.rs new file mode 100644 index 0000000..0a09584 --- /dev/null +++ b/crates/mempool/src/lib.rs @@ -0,0 +1,26 @@ +//! CipherBFT Mempool - Transaction mempool based on Reth's TransactionPool +//! +//! # Architecture +//! +//! The mempool is organized around Reth's transaction pool with CipherBFT-specific +//! wrapping for DCL/CL integration. +//! +//! ## Modules +//! +//! - `error`: Error types for mempool operations +//! - `config`: Configuration for the mempool +//! - `transaction`: Transaction metadata tracking +//! - `account`: Per-account state management +//! - `pool`: Main pool adapter over Reth's TransactionPool + +pub mod config; +pub mod error; +pub mod pool; +pub mod transaction; +pub mod validator; + +pub use config::MempoolConfig; +pub use error::MempoolError; +pub use pool::CipherBftPool; +pub use transaction::TransactionOrdering; +pub use validator::CipherBftValidator; diff --git a/crates/mempool/src/pool.rs b/crates/mempool/src/pool.rs new file mode 100644 index 0000000..6da9415 --- /dev/null +++ b/crates/mempool/src/pool.rs @@ -0,0 +1,552 @@ +//! CipherBFT mempool wrapper over Reth's Pool +//! +//! MP-1: Basic pool wrapper and config delegation to Reth +//! MP-2: Transaction validation (signature, nonce, gas, balance) +//! +//! ## Design Decision: Direct Reth Pool Usage +//! +//! Per ADR-006, we **directly use Reth's Pool implementation**: +//! ```ignore +//! type CipherBftPool = Pool; +//! ``` +//! +//! We add a thin wrapper ONLY for BFT-specific pre-validation: +//! 1. Minimum gas price enforcement (spam prevention) +//! 2. Maximum nonce gap enforcement (queue bloat prevention) +//! +//! All standard Ethereum validation is delegated to Reth: +//! - Signature verification +//! - Nonce ordering +//! - Balance checks +//! - Gas limits +//! - Transaction size +//! - Replace-by-fee logic +//! +//! ## Integration Status +//! +//! Generic `P: TransactionPool` is temporary until we can instantiate Reth's Pool: +//! +//! ```ignore +//! // Target implementation after EL/CL integration: +//! use reth_transaction_pool::{Pool, CoinbaseTipOrdering, maintain::LocalTransactionConfig}; +//! +//! let pool = Pool::new( +//! eth_pool_validator, // From EL - provides StateProvider for nonce/balance +//! CoinbaseTipOrdering::default(), // Reth's gas price ordering +//! blob_store, // Node-provided BlobStore (optional if not using blobs) +//! pool_config, // From our MempoolConfig.to_reth_config() +//! ); +//! ``` + +use crate::config::MempoolConfig; +use crate::error::MempoolError; +use crate::validator::CipherBftValidator; +use alloy_eips::eip2718::Decodable2718; +use alloy_primitives::{Bytes, TxHash}; +use reth_primitives::{ + PooledTransactionsElement, PooledTransactionsElementEcRecovered, TransactionSigned, + TransactionSignedEcRecovered, TransactionSignedNoHash, +}; +use reth_storage_api::StateProviderFactory; +use reth_transaction_pool::{ + blobstore::BlobStore, validate::EthTransactionValidator, CoinbaseTipOrdering, + EthPooledTransaction, Pool, PoolTransaction, TransactionOrigin, TransactionPool, +}; +use std::sync::Arc; +use tracing::{debug, warn}; + +/// Mempool wrapper that adds BFT-specific pre-validation to Reth's Pool +/// +/// Generic `P: TransactionPool` will be replaced with concrete Reth Pool type: +/// `Pool` once EL/ST/CL provide required components. +/// +/// This is NOT abstraction for abstraction's sake - it's a temporary measure +/// until we have all dependencies ready for Pool instantiation. +pub struct CipherBftPool { + /// Reth's Pool implementation (to be: Pool) + pool: P, + /// BFT-specific config + config: MempoolConfig, + /// State provider factory for BFT policy validation (nonce queries) + /// Use latest() per validation to avoid stale state snapshots. + state_provider_factory: Arc, +} + +/// Concrete Reth pool type used by CipherBFT. +pub type CipherBftRethPool = Pool< + CipherBftValidator, EthPooledTransaction>>, + CoinbaseTipOrdering, + S, +>; + +impl CipherBftPool

{ + /// Create new mempool wrapper + /// + /// Once DCL/EL/ST are ready, instantiate with Reth's Pool: + /// ```ignore + /// // In DCL Worker initialization: + /// let reth_pool = Pool::new( + /// eth_pool_validator, // EL's validator with StateProvider + /// CoinbaseTipOrdering::default(), + /// blob_store, // ST's blob storage + /// config.to_reth_config(), + /// ); + /// // Mempool lives in DCL, uses EL's StateProvider for validation + /// CipherBftPool::wrap(reth_pool, config, state_provider_factory) + /// ``` + pub fn wrap( + pool: P, + config: MempoolConfig, + state_provider_factory: Arc, + ) -> Self { + Self { + pool, + config, + state_provider_factory, + } + } + + /// Get reference to Reth pool + pub fn pool(&self) -> &P { + &self.pool + } + + /// Get BFT config + pub fn config(&self) -> &MempoolConfig { + &self.config + } + + /// Borrow a worker-facing adapter over the underlying pool (ADR-006) + pub fn adapter(&self) -> CipherBftPoolAdapter<'_, P> { + CipherBftPoolAdapter::new(&self.pool) + } + + /// Recover and pre-validate a transaction before handing it to Reth + /// + /// Returns the recovered transaction if it passes CipherBFT policies. + pub async fn recover_and_validate( + &self, + tx: TransactionSigned, + ) -> Result { + let tx_recovered = tx.try_ecrecovered().ok_or(MempoolError::InvalidSignature)?; + + let sender = tx_recovered.signer(); + let tx_ref = tx_recovered.as_ref(); + + debug!( + "Pre-validating transaction from {:?}, nonce={}, gas_price={}", + sender, + tx_ref.nonce(), + tx_ref.max_fee_per_gas() + ); + + self.validate_bft_policy(&tx_recovered).await?; + + debug!("Transaction {:?} passed BFT policy checks", tx_ref.hash()); + + Ok(tx_recovered) + } + + /// Validate BFT-specific policies only (MP-2) + /// + /// This method performs ONLY CipherBFT-specific checks. + /// All standard Ethereum validation is delegated to Reth: + /// - Signature verification → Reth + /// - Nonce ordering (too low/duplicate) → Reth + /// - Balance sufficiency → Reth + /// - Gas limit vs chain limit → Reth (from chain spec) + /// - Transaction size limits → Reth + /// + /// CipherBFT adds: + /// 1. Minimum gas price (prevent spam) + /// 2. Maximum nonce gap (prevent queue bloat) + async fn validate_bft_policy( + &self, + tx: &TransactionSignedEcRecovered, + ) -> Result<(), MempoolError> { + let sender = tx.signer(); + let tx_ref = tx.as_ref(); + + // BFT Policy 1: Minimum gas price enforcement + let effective_gas_price = tx_ref.max_fee_per_gas(); + if effective_gas_price < self.config.min_gas_price { + warn!( + "Transaction from {:?} rejected: gas price {} < min {}", + sender, effective_gas_price, self.config.min_gas_price + ); + return Err(MempoolError::InsufficientGasPrice { + got: effective_gas_price, + min: self.config.min_gas_price, + }); + } + + // BFT Policy 2: Nonce gap enforcement + // Prevents attackers from bloating the queued pool with distant-future nonces + let state_provider = self + .state_provider_factory + .latest() + .map_err(|e| MempoolError::Internal(format!("Failed to get state provider: {e}")))?; + let current_nonce = state_provider + .account_nonce(sender) + .map_err(|e| MempoolError::Internal(format!("Failed to get nonce: {}", e)))? + .unwrap_or(0); // Default to 0 if account doesn't exist yet + let tx_nonce = tx_ref.nonce(); + + if tx_nonce > current_nonce { + let gap = tx_nonce - current_nonce - 1; + if gap > self.config.max_nonce_gap { + return Err(MempoolError::NonceGapExceeded { + gap, + max: self.config.max_nonce_gap, + }); + } + } + + Ok(()) + } +} + +impl CipherBftPool> +where + Client: StateProviderFactory + 'static, + S: BlobStore + Clone, +{ + /// Create a CipherBFT mempool that builds the underlying Reth pool internally. + pub fn new( + chain_spec: Arc, + client: Arc, + blob_store: S, + config: MempoolConfig, + ) -> Result { + let state_provider_factory: Arc = client.clone(); + let validator = + CipherBftValidator::new(chain_spec, Arc::clone(&client), blob_store.clone()); + let pool_config: reth_transaction_pool::PoolConfig = config.clone().into(); + let pool = Pool::new( + validator, + CoinbaseTipOrdering::default(), + blob_store, + pool_config, + ); + Ok(Self::wrap(pool, config, state_provider_factory)) + } +} + +/// Worker-facing adapter that surfaces pool operations (ADR-006) +pub struct CipherBftPoolAdapter<'a, P: TransactionPool> { + pool: &'a P, +} + +impl<'a, P: TransactionPool> CipherBftPoolAdapter<'a, P> { + fn new(pool: &'a P) -> Self { + Self { pool } + } + + /// Get best transactions for a Worker batch (ADR-006) + pub fn get_transactions_for_batch( + &self, + limit: usize, + gas_limit: u64, + ) -> Vec { + let mut selected = Vec::with_capacity(limit); + let mut gas_used = 0u64; + + for tx in self.pool.best_transactions() { + if selected.len() >= limit { + break; + } + let tx_gas = tx.gas_limit(); + if gas_used + tx_gas > gas_limit { + continue; + } + gas_used += tx_gas; + let signed = tx.to_recovered_transaction().into_signed(); + selected.push(signed); + } + + selected + } + + /// Remove transactions that were finalized in a committed block (ADR-006) + pub fn remove_finalized(&self, tx_hashes: &[TxHash]) { + let _ = self.pool.remove_transactions(tx_hashes.to_vec()); + } + + /// Get pool statistics for metrics (ADR-006) + pub fn stats(&self) -> PoolStats { + let size = self.pool.pool_size(); + PoolStats { + pending: size.pending, + queued: size.queued, + total: size.pending + size.queued, + } + } + + /// Get pending (executable) transactions in pool order (MP-5). + pub fn pending_transactions(&self) -> Vec { + self.pool + .pending_transactions() + .into_iter() + .map(|tx| tx.to_recovered_transaction().into_signed()) + .collect() + } + + /// Get queued (nonce-gap) transactions in pool order (MP-5). + pub fn queued_transactions(&self) -> Vec { + self.pool + .queued_transactions() + .into_iter() + .map(|tx| tx.to_recovered_transaction().into_signed()) + .collect() + } +} + +impl

CipherBftPool

+where + P: TransactionPool, + P::Transaction: PoolTransaction + TryFrom, + ::Consensus: Into, + ::Pooled: From, + >::Error: std::fmt::Display, +{ + /// Add a transaction to the pool with CipherBFT validation (MP-2) + pub async fn add_transaction( + &self, + origin: TransactionOrigin, + tx: T, + ) -> Result<(), MempoolError> + where + T: IntoPoolTransactionInput, + { + let pooled_tx = match tx.into_input()? { + PoolTransactionInput::Signed(tx) => { + let tx_recovered = self.recover_and_validate(tx).await?; + P::Transaction::try_from(tx_recovered) + .map_err(|err| MempoolError::Conversion(err.to_string()))? + } + PoolTransactionInput::Recovered(tx_recovered) => { + self.validate_bft_policy(&tx_recovered).await?; + P::Transaction::try_from(tx_recovered) + .map_err(|err| MempoolError::Conversion(err.to_string()))? + } + PoolTransactionInput::Pooled(PoolTx(pooled_tx)) => { + let tx_recovered: TransactionSignedEcRecovered = + pooled_tx.clone().into_consensus().into(); + self.validate_bft_policy(&tx_recovered).await?; + pooled_tx + } + PoolTransactionInput::PooledEcRecovered(pooled) => { + let pooled_tx = P::Transaction::from_pooled(pooled.into()); + let tx_recovered: TransactionSignedEcRecovered = + pooled_tx.clone().into_consensus().into(); + self.validate_bft_policy(&tx_recovered).await?; + pooled_tx + } + PoolTransactionInput::PooledRaw(pooled) => { + let pooled = pooled + .try_into_ecrecovered() + .map_err(|_| MempoolError::InvalidSignature)?; + let pooled_tx = P::Transaction::from_pooled(pooled.into()); + let tx_recovered: TransactionSignedEcRecovered = + pooled_tx.clone().into_consensus().into(); + self.validate_bft_policy(&tx_recovered).await?; + pooled_tx + } + }; + self.pool.add_transaction(origin, pooled_tx).await?; + Ok(()) + } + + /// Add a pooled transaction directly. + pub async fn add_pooled_transaction( + &self, + origin: TransactionOrigin, + tx: P::Transaction, + ) -> Result<(), MempoolError> { + self.add_transaction(origin, PoolTx::new(tx)).await + } +} + +pub enum PoolTransactionInput { + Signed(TransactionSigned), + Recovered(TransactionSignedEcRecovered), + Pooled(PoolTx), + PooledEcRecovered(PooledTransactionsElementEcRecovered), + PooledRaw(PooledTransactionsElement), +} + +pub trait IntoPoolTransactionInput { + fn into_input(self) -> Result, MempoolError>; +} + +pub struct PoolTx(pub Tx); + +impl PoolTx { + pub fn new(tx: Tx) -> Self { + Self(tx) + } +} + +impl IntoPoolTransactionInput for TransactionSigned { + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::Signed(self)) + } +} + +impl IntoPoolTransactionInput for TransactionSignedEcRecovered { + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::Recovered(self)) + } +} + +impl IntoPoolTransactionInput for PoolTx { + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::Pooled(self)) + } +} + +impl IntoPoolTransactionInput for PooledTransactionsElementEcRecovered +where + Tx: PoolTransaction, + Tx::Pooled: From, +{ + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::PooledEcRecovered(self)) + } +} + +impl IntoPoolTransactionInput for PooledTransactionsElement +where + Tx: PoolTransaction, + Tx::Pooled: From, +{ + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::PooledRaw(self)) + } +} + +impl IntoPoolTransactionInput for TransactionSignedNoHash { + fn into_input(self) -> Result, MempoolError> { + Ok(PoolTransactionInput::Signed(self.into())) + } +} + +impl IntoPoolTransactionInput for Bytes +where + Tx: PoolTransaction, + Tx::Pooled: From, +{ + fn into_input(self) -> Result, MempoolError> { + decode_pooled_from_bytes(&self).map(PoolTransactionInput::PooledRaw) + } +} + +impl IntoPoolTransactionInput for Vec +where + Tx: PoolTransaction, + Tx::Pooled: From, +{ + fn into_input(self) -> Result, MempoolError> { + decode_pooled_from_bytes(&self).map(PoolTransactionInput::PooledRaw) + } +} + +impl IntoPoolTransactionInput for &[u8] +where + Tx: PoolTransaction, + Tx::Pooled: From, +{ + fn into_input(self) -> Result, MempoolError> { + decode_pooled_from_bytes(self).map(PoolTransactionInput::PooledRaw) + } +} + +fn decode_pooled_from_bytes(bytes: &[u8]) -> Result { + let mut slice = bytes; + PooledTransactionsElement::decode_2718(&mut slice) + .map_err(|err| MempoolError::Conversion(err.to_string())) +} + +/// Simplified pool stats view for metrics (ADR-006) +pub struct PoolStats { + pub pending: usize, + pub queued: usize, + pub total: usize, +} + +// Note: We use Reth's standard StateProvider trait from reth-storage-api. +// EL provides the implementation, but DCL Worker uses it for mempool validation. +// This follows ADR-006: Mempool integrates with DCL Workers for batch creation, +// while EL manages account state (nonce, balance) for transaction validation. + +#[cfg(test)] +mod tests { + use super::*; + // Note: Tests simplified - mock StateProvider removed. + // Real StateProvider comes from EL integration. + + #[test] + fn test_config_defaults() { + let config = MempoolConfig::default(); + assert_eq!(config.max_pending, 10_000); + assert_eq!(config.max_queued_per_account, 100); + assert_eq!(config.max_nonce_gap, 16); + assert_eq!(config.min_gas_price, 1_000_000_000); + } + + #[test] + fn test_bft_policy_min_gas_price() { + // BFT Policy: Minimum gas price enforcement + let min_gas_price = 1_000_000_000u128; + + // Below minimum - rejected by CipherBFT + let gas_price = 500_000_000u128; + assert!(gas_price < min_gas_price); + + // At minimum - accepted + let gas_price = 1_000_000_000u128; + assert!(gas_price >= min_gas_price); + + // Above minimum - accepted + let gas_price = 2_000_000_000u128; + assert!(gas_price >= min_gas_price); + } + + #[test] + fn test_bft_policy_nonce_gap() { + // BFT Policy: Max nonce gap prevents queue bloat + // Reth handles nonce ordering (too low, duplicates) + // CipherBFT adds gap limit for far-future nonces + let current_nonce = 10u64; + let max_gap = 16u64; + + // No gap (next nonce) - accepted + let tx_nonce = 11u64; + let gap = tx_nonce - current_nonce - 1; + assert_eq!(gap, 0); + assert!(gap <= max_gap); + + // Gap within limit - accepted + let tx_nonce = 26u64; + let gap = tx_nonce - current_nonce - 1; + assert_eq!(gap, 15); + assert!(gap <= max_gap); + + // Gap exceeds limit - rejected by CipherBFT + let tx_nonce = 28u64; + let gap = tx_nonce - current_nonce - 1; + assert_eq!(gap, 17); + assert!(gap > max_gap); + } + + // Note: The following validations are delegated to Reth and NOT tested here: + // - Balance sufficiency: Reth checks sender has enough for gas + value + // - Transaction size limits: Reth enforces based on protocol rules + // - Gas limit vs block limit: Reth validates against chain spec + // - Nonce ordering (too low): Reth maintains nonce sequence per account + // - Signature verification: Reth validates ECDSA signatures + // + // CipherBFT only adds BFT-specific policies tested above: + // - Minimum gas price (spam prevention) + // - Maximum nonce gap (queue bloat prevention) +} diff --git a/crates/mempool/src/transaction.rs b/crates/mempool/src/transaction.rs new file mode 100644 index 0000000..ab3805d --- /dev/null +++ b/crates/mempool/src/transaction.rs @@ -0,0 +1,55 @@ +//! Transaction ordering helpers built on top of Reth's pool traits. + +use reth_transaction_pool::PoolTransaction; + +/// Ordering key derived from a pool transaction. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TransactionOrdering { + pub effective_gas_price: u128, + pub nonce: u64, +} + +impl TransactionOrdering { + /// Build an ordering key from any Reth pool transaction. + pub fn from_pool_transaction(tx: &T) -> Self { + Self { + effective_gas_price: tx.priority_fee_or_price(), + nonce: tx.nonce(), + } + } +} + +impl Ord for TransactionOrdering { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Higher gas price comes first + match other.effective_gas_price.cmp(&self.effective_gas_price) { + std::cmp::Ordering::Equal => self.nonce.cmp(&other.nonce), + ordering => ordering, + } + } +} + +impl PartialOrd for TransactionOrdering { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ordering_manual() { + let high = TransactionOrdering { + effective_gas_price: 200, + nonce: 1, + }; + let low = TransactionOrdering { + effective_gas_price: 100, + nonce: 0, + }; + + assert!(high < low); + } +} diff --git a/crates/mempool/src/validator.rs b/crates/mempool/src/validator.rs new file mode 100644 index 0000000..c15c5ff --- /dev/null +++ b/crates/mempool/src/validator.rs @@ -0,0 +1,120 @@ +//! CipherBFT transaction validator wrapper for Reth's pool. + +use reth_storage_api::StateProviderFactory; +use reth_transaction_pool::{ + blobstore::BlobStore, + validate::{EthTransactionValidator, EthTransactionValidatorBuilder}, + EthPoolTransaction, TransactionOrigin, TransactionValidationOutcome, TransactionValidator, +}; +use std::sync::Arc; + +/// CipherBFT-specific validation that wraps a Reth `TransactionValidator`. +#[derive(Debug, Clone)] +pub struct CipherBftValidator { + inner: V, +} + +impl CipherBftValidator { + /// Create a new wrapper around the given validator. + pub fn wrap(inner: V) -> Self { + Self { inner } + } +} + +impl CipherBftValidator, Tx>> +where + Client: StateProviderFactory, + Tx: EthPoolTransaction, +{ + /// Build a wrapper with a reth EthTransactionValidator. + pub fn new( + chain_spec: Arc, + client: Arc, + blob_store: S, + ) -> Self + where + S: BlobStore, + { + let validator = EthTransactionValidatorBuilder::new(chain_spec).build(client, blob_store); + Self::wrap(validator) + } +} +// to add custom validation logic, modify the validate_transaction method +impl TransactionValidator for CipherBftValidator { + type Transaction = V::Transaction; + + async fn validate_transaction( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> TransactionValidationOutcome { + self.inner.validate_transaction(origin, transaction).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{Address, U256}; + use reth_chainspec::{Chain, ChainSpecBuilder, MAINNET}; + use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; + use reth_transaction_pool::blobstore::InMemoryBlobStore; + use reth_transaction_pool::test_utils::TransactionBuilder; + use reth_transaction_pool::validate::EthTransactionValidator; + use reth_transaction_pool::EthPooledTransaction; + use reth_transaction_pool::PoolTransaction; + use std::sync::Arc; + + fn build_test_pooled_tx(chain_id: u64) -> EthPooledTransaction { + TransactionBuilder::default() + .chain_id(chain_id) + .to(Address::ZERO) + .gas_limit(100_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000_000) + .into_eip1559() + .into_ecrecovered() + .expect("recover signed transaction") + .try_into() + .expect("convert to pooled transaction") + } + + fn build_test_validator( + chain_id: u64, + tx: &EthPooledTransaction, + ) -> CipherBftValidator, EthPooledTransaction>> + { + let chain_spec = ChainSpecBuilder::mainnet() + .chain(Chain::from_id(chain_id)) + .build(); + let provider = Arc::new(MockEthProvider::default()); + provider.add_account(tx.sender(), ExtendedAccount::new(tx.nonce(), U256::MAX)); + let blob_store = InMemoryBlobStore::default(); + CipherBftValidator::new(Arc::new(chain_spec), provider, blob_store) + } + + #[tokio::test] + async fn test_chain_id_mismatch_invalid() { + let tx = build_test_pooled_tx(MAINNET.chain.id()); + let validator = build_test_validator(2, &tx); + + let outcome = validator + .validate_transaction(TransactionOrigin::External, tx) + .await; + + assert!(outcome.is_invalid()); + } + + #[tokio::test] + async fn test_chain_id_match_valid() { + let chain_id = MAINNET.chain.id(); + let tx = build_test_pooled_tx(chain_id); + let validator = build_test_validator(chain_id, &tx); + + let outcome = validator + .validate_transaction(TransactionOrigin::External, tx) + .await; + + assert!(outcome.is_valid()); + } +} diff --git a/crates/mempool/tests/mempool_tests.rs b/crates/mempool/tests/mempool_tests.rs new file mode 100644 index 0000000..04875d5 --- /dev/null +++ b/crates/mempool/tests/mempool_tests.rs @@ -0,0 +1,170 @@ +use alloy_primitives::address; +use mempool::{CipherBftPool, MempoolConfig}; +use reth_primitives::TransactionSignedEcRecovered; +use reth_provider::test_utils::NoopProvider; +use reth_transaction_pool::test_utils::{MockTransaction, TestPoolBuilder}; +use reth_transaction_pool::{PoolConfig, SubPoolLimit, TransactionOrigin}; +use std::sync::Arc; + +fn noop_state_provider() -> Arc { + Arc::new(NoopProvider::default()) +} + +fn recovered_tx( + sender: alloy_primitives::Address, + nonce: u64, + gas_price: u128, +) -> TransactionSignedEcRecovered { + let mock = MockTransaction::legacy() + .with_sender(sender) + .with_nonce(nonce) + .with_gas_price(gas_price); + TransactionSignedEcRecovered::from(mock) +} + +#[tokio::test] +async fn test_transaction_insertion_and_retrieval() { + let pool: reth_transaction_pool::test_utils::TestPool = TestPoolBuilder::default().into(); + let mempool: CipherBftPool = + CipherBftPool::wrap(pool, MempoolConfig::default(), noop_state_provider()); + + let tx = recovered_tx( + address!("1000000000000000000000000000000000000001"), + 0, + 2_000_000_000, + ); + let tx_hash = *tx.clone().into_signed().hash(); + + mempool + .add_transaction(TransactionOrigin::External, tx) + .await + .unwrap(); + + let pending = mempool.adapter().pending_transactions(); + assert_eq!(pending.len(), 1); + assert_eq!(*pending[0].hash(), tx_hash); +} + +#[tokio::test] +async fn test_priority_ordering_by_gas_price() { + let pool: reth_transaction_pool::test_utils::TestPool = TestPoolBuilder::default().into(); + let mempool: CipherBftPool = + CipherBftPool::wrap(pool, MempoolConfig::default(), noop_state_provider()); + + let low = recovered_tx( + address!("1000000000000000000000000000000000000002"), + 0, + 1_500_000_000, + ); + let high = recovered_tx( + address!("1000000000000000000000000000000000000003"), + 0, + 3_000_000_000, + ); + let high_hash = *high.clone().into_signed().hash(); + + mempool + .add_transaction(TransactionOrigin::External, low) + .await + .unwrap(); + mempool + .add_transaction(TransactionOrigin::External, high) + .await + .unwrap(); + + let batch = mempool + .adapter() + .get_transactions_for_batch(2, 1_000_000_000); + assert_eq!(batch.len(), 2); + assert_eq!(*batch[0].hash(), high_hash); +} + +#[tokio::test] +async fn test_replacement_logic() { + let pool: reth_transaction_pool::test_utils::TestPool = TestPoolBuilder::default().into(); + let mempool: CipherBftPool = + CipherBftPool::wrap(pool, MempoolConfig::default(), noop_state_provider()); + + let sender = address!("1000000000000000000000000000000000000004"); + let low = recovered_tx(sender, 0, 1_000_000_000); + let high = recovered_tx(sender, 0, 2_000_000_000); + let high_hash = *high.clone().into_signed().hash(); + + mempool + .add_transaction(TransactionOrigin::External, low) + .await + .unwrap(); + mempool + .add_transaction(TransactionOrigin::External, high) + .await + .unwrap(); + + let pending = mempool.adapter().pending_transactions(); + assert_eq!(pending.len(), 1); + assert_eq!(*pending[0].hash(), high_hash); +} + +#[tokio::test] +async fn test_pending_queued_promotion() { + let pool: reth_transaction_pool::test_utils::TestPool = TestPoolBuilder::default().into(); + let mempool: CipherBftPool = + CipherBftPool::wrap(pool, MempoolConfig::default(), noop_state_provider()); + + let sender = address!("1000000000000000000000000000000000000005"); + let nonce_one = recovered_tx(sender, 1, 2_000_000_000); + let nonce_zero = recovered_tx(sender, 0, 2_000_000_000); + + mempool + .add_transaction(TransactionOrigin::External, nonce_one) + .await + .unwrap(); + assert_eq!(mempool.adapter().pending_transactions().len(), 0); + assert_eq!(mempool.adapter().queued_transactions().len(), 1); + + mempool + .add_transaction(TransactionOrigin::External, nonce_zero) + .await + .unwrap(); + assert_eq!(mempool.adapter().queued_transactions().len(), 0); + assert_eq!(mempool.adapter().pending_transactions().len(), 2); +} + +#[tokio::test] +async fn test_eviction_under_pressure() { + let config = PoolConfig { + pending_limit: SubPoolLimit::new(1, usize::MAX), + basefee_limit: SubPoolLimit::new(0, usize::MAX), + queued_limit: SubPoolLimit::new(0, usize::MAX), + blob_limit: SubPoolLimit::new(0, usize::MAX), + max_account_slots: 1, + ..Default::default() + }; + + let pool: reth_transaction_pool::test_utils::TestPool = + TestPoolBuilder::default().with_config(config).into(); + let mempool: CipherBftPool = + CipherBftPool::wrap(pool, MempoolConfig::default(), noop_state_provider()); + + let low = recovered_tx( + address!("1000000000000000000000000000000000000010"), + 0, + 1_000_000_000, + ); + let high = recovered_tx( + address!("1000000000000000000000000000000000000011"), + 0, + 2_000_000_000, + ); + + mempool + .add_transaction(TransactionOrigin::External, low) + .await + .unwrap(); + mempool + .add_transaction(TransactionOrigin::External, high) + .await + .unwrap(); + + let pending = mempool.adapter().pending_transactions(); + assert_eq!(pending.len(), 1); +} diff --git a/deny.toml b/deny.toml index df5f7b6..4aaaceb 100644 --- a/deny.toml +++ b/deny.toml @@ -8,7 +8,17 @@ all-features = true [advisories] version = 2 db-path = "~/.cargo/advisory-db" -ignore = [] +ignore = [ + # paste crate unmaintained - transitive dependency from alloy-primitives 1.x + # Used as proc-macro only, not a runtime security concern + "RUSTSEC-2024-0436", + # proc-macro-error unmaintained - transitive dependency via reth tooling + # Used as proc-macro only, not a runtime security concern + "RUSTSEC-2024-0370", + # bincode unmaintained - transitive dependency in core crates + # No maintained upgrade available; revisit if replacement is planned. + "RUSTSEC-2025-0141", +] [licenses] version = 2