diff --git a/.env.example b/.env.example
index bb4a66a..4058bd9 100644
--- a/.env.example
+++ b/.env.example
@@ -7,6 +7,7 @@ TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-bundles-kafka-pro
 TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress
 TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE=/app/docker/ingress-audit-kafka-properties
 TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit
+TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE=/app/docker/ingress-user-operation-consumer-kafka-properties
 TIPS_INGRESS_LOG_LEVEL=info
 TIPS_INGRESS_LOG_FORMAT=pretty
 TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800
diff --git a/Cargo.lock b/Cargo.lock
index 49c509f..079de98 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -15,10 +15,13 @@ dependencies = [
  "async-trait",
  "jsonrpsee",
  "op-alloy-network",
+ "rdkafka",
  "reth-rpc-eth-types",
  "serde",
  "serde_json",
+ "tips-core",
  "tokio",
+ "tracing",
  "wiremock",
 ]
diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml
index 55277ca..39b1591 100644
--- a/crates/account-abstraction-core/Cargo.toml
+++ b/crates/account-abstraction-core/Cargo.toml
@@ -22,8 +22,11 @@ jsonrpsee.workspace = true
 async-trait = { workspace = true }
 alloy-sol-types.workspace= true
 anyhow.workspace = true
+rdkafka.workspace = true
+serde_json.workspace = true
+tips-core.workspace = true
+tracing.workspace = true
 
 [dev-dependencies]
 alloy-primitives.workspace = true
-serde_json.workspace = true
 wiremock.workspace = true
diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs
new file mode 100644
index 0000000..7165ef4
--- /dev/null
+++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs
@@ -0,0 +1,275 @@
+use crate::mempool::PoolConfig;
+use crate::mempool::{self, Mempool};
+use crate::types::WrappedUserOperation;
+use async_trait::async_trait;
+use rdkafka::{
+    ClientConfig, Message,
+    consumer::{Consumer, StreamConsumer},
+    message::OwnedMessage,
+};
+use serde::{Deserialize, Serialize};
+use serde_json;
+use std::sync::Arc;
+use tips_core::kafka::load_kafka_config_from_file;
+use tokio::sync::RwLock;
+use tracing::{info, warn};
+
+/// Mempool lifecycle events consumed from Kafka. With the adjacently tagged
+/// serde representation below, an event serializes as, e.g.,
+/// `{"event":"UserOpDropped","data":{"user_op":{...},"reason":"..."}}`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "event", content = "data")]
+pub enum KafkaEvent {
+    UserOpAdded {
+        user_op: WrappedUserOperation,
+    },
+    UserOpIncluded {
+        user_op: WrappedUserOperation,
+    },
+    UserOpDropped {
+        user_op: WrappedUserOperation,
+        reason: String,
+    },
+}
+
+#[async_trait]
+pub trait KafkaConsumer: Send + Sync {
+    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage>;
+}
+
+#[async_trait]
+impl KafkaConsumer for StreamConsumer {
+    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> {
+        Ok(self.recv().await?.detach())
+    }
+}
+
+pub struct KafkaMempoolEngine {
+    mempool: Arc<RwLock<mempool::MempoolImpl>>,
+    kafka_consumer: Arc<dyn KafkaConsumer>,
+}
+
+impl KafkaMempoolEngine {
+    pub fn new(
+        mempool: Arc<RwLock<mempool::MempoolImpl>>,
+        kafka_consumer: Arc<dyn KafkaConsumer>,
+    ) -> Self {
+        Self {
+            mempool,
+            kafka_consumer,
+        }
+    }
+
+    pub fn with_kafka_consumer(
+        kafka_consumer: Arc<dyn KafkaConsumer>,
+        pool_config: Option<PoolConfig>,
+    ) -> Self {
+        let pool_config = pool_config.unwrap_or_default();
+        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config)));
+        Self {
+            mempool,
+            kafka_consumer,
+        }
+    }
+
+    pub fn get_mempool(&self) -> Arc<RwLock<mempool::MempoolImpl>> {
+        self.mempool.clone()
+    }
+
+    pub async fn run(&self) {
+        loop {
+            if let Err(err) = self.process_next().await {
+                warn!(error = %err, "Kafka mempool engine error, continuing");
+            }
+        }
+    }
+
+    /// Process a single Kafka message (useful for tests and controlled loops).
+    pub async fn process_next(&self) -> anyhow::Result<()> {
+        let msg = self.kafka_consumer.recv_msg().await?;
+        let payload = msg
+            .payload()
+            .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?;
+        let event: KafkaEvent = serde_json::from_slice(payload)
+            .map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?;
+
+        self.handle_event(event).await
+    }
+
+    async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> {
+        info!(
+            event = ?event,
+            "Kafka mempool engine handling event"
+        );
+        match event {
+            KafkaEvent::UserOpAdded { user_op } => {
+                self.mempool.write().await.add_operation(&user_op)?;
+            }
+            KafkaEvent::UserOpIncluded { user_op } => {
+                self.mempool.write().await.remove_operation(&user_op.hash)?;
+            }
+            KafkaEvent::UserOpDropped { user_op, reason: _ } => {
+                self.mempool.write().await.remove_operation(&user_op.hash)?;
+            }
+        }
+        Ok(())
+    }
+}
+
+fn create_user_operation_consumer(
+    properties_file: &str,
+    topic: &str,
+    consumer_group_id: &str,
+) -> anyhow::Result<StreamConsumer> {
+    let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?);
+
+    client_config.set("group.id", consumer_group_id);
+    client_config.set("enable.auto.commit", "true");
+
+    let consumer: StreamConsumer = client_config.create()?;
+    consumer.subscribe(&[topic])?;
+
+    Ok(consumer)
+}
+
+pub fn create_mempool_engine(
+    properties_file: &str,
+    topic: &str,
+    consumer_group_id: &str,
+    pool_config: Option<PoolConfig>,
+) -> anyhow::Result<Arc<KafkaMempoolEngine>> {
+    let consumer: StreamConsumer =
+        create_user_operation_consumer(properties_file, topic, consumer_group_id)?;
+    Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer(
+        Arc::new(consumer),
+        pool_config,
+    )))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::mempool::PoolConfig;
+    use crate::types::VersionedUserOperation;
+    use alloy_primitives::{Address, FixedBytes, Uint};
+    use alloy_rpc_types::erc4337;
+    use rdkafka::Timestamp;
+    use tokio::sync::Mutex;
+
+    fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation {
+        let op = VersionedUserOperation::UserOperation(erc4337::UserOperation {
+            sender: Address::ZERO,
+            nonce: Uint::from(0u64),
+            init_code: Default::default(),
+            call_data: Default::default(),
+            call_gas_limit: Uint::from(100_000u64),
+            verification_gas_limit: Uint::from(100_000u64),
+            pre_verification_gas: Uint::from(21_000u64),
+            max_fee_per_gas: Uint::from(max_fee),
+            max_priority_fee_per_gas: Uint::from(max_fee),
+            paymaster_and_data: Default::default(),
+            signature: Default::default(),
+        });
+
+        WrappedUserOperation {
+            operation: op,
+            hash: FixedBytes::from(hash),
+        }
+    }
+
+    #[tokio::test]
+    async fn handle_add_operation() {
+        let mempool = Arc::new(RwLock::new(
+            mempool::MempoolImpl::new(PoolConfig::default()),
+        ));
+
+        let op_hash = [1u8; 32];
+        let wrapped = make_wrapped_op(1_000, op_hash);
+
+        let add_event = KafkaEvent::UserOpAdded {
+            user_op: wrapped.clone(),
+        };
+        let mock_consumer = Arc::new(MockConsumer::new(vec![OwnedMessage::new(
+            Some(serde_json::to_vec(&add_event).unwrap()),
+            None,
+            "topic".to_string(),
+            Timestamp::NotAvailable,
+            0,
+            0,
+            None,
+        )]));
+
+        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer);
+
+        // Process the add event deterministically, then assert it landed in the pool
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 1);
+        assert_eq!(items[0].hash, FixedBytes::from(op_hash));
+    }
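+
+    // Editor's sketch (not part of the original change): a malformed payload
+    // should surface as an `Err` from `process_next` rather than panic,
+    // exercising the `serde_json::from_slice` error path above. Uses the
+    // `MockConsumer` test double defined at the bottom of this module.
+    #[tokio::test]
+    async fn malformed_payload_returns_error() {
+        let mock_consumer = Arc::new(MockConsumer::new(vec![OwnedMessage::new(
+            Some(b"not json".to_vec()),
+            None,
+            "topic".to_string(),
+            Timestamp::NotAvailable,
+            0,
+            0,
+            None,
+        )]));
+        let engine = KafkaMempoolEngine::with_kafka_consumer(mock_consumer, None);
+
+        assert!(engine.process_next().await.is_err());
+    }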
+
+    #[tokio::test]
+    async fn remove_operation_should_remove_from_mempool() {
+        let mempool = Arc::new(RwLock::new(
+            mempool::MempoolImpl::new(PoolConfig::default()),
+        ));
+        let op_hash = [1u8; 32];
+        let wrapped = make_wrapped_op(1_000, op_hash);
+        let add_mempool = KafkaEvent::UserOpAdded {
+            user_op: wrapped.clone(),
+        };
+        let remove_mempool = KafkaEvent::UserOpDropped {
+            user_op: wrapped.clone(),
+            reason: "test".to_string(),
+        };
+        let mock_consumer = Arc::new(MockConsumer::new(vec![
+            OwnedMessage::new(
+                Some(serde_json::to_vec(&add_mempool).unwrap()),
+                None,
+                "topic".to_string(),
+                Timestamp::NotAvailable,
+                0,
+                0,
+                None,
+            ),
+            OwnedMessage::new(
+                Some(serde_json::to_vec(&remove_mempool).unwrap()),
+                None,
+                "topic".to_string(),
+                Timestamp::NotAvailable,
+                0,
+                0,
+                None,
+            ),
+        ]));
+
+        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer);
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 1);
+        assert_eq!(items[0].hash, FixedBytes::from(op_hash));
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 0);
+    }
+
+    struct MockConsumer {
+        msgs: Mutex<Vec<OwnedMessage>>,
+    }
+
+    impl MockConsumer {
+        fn new(msgs: Vec<OwnedMessage>) -> Self {
+            Self {
+                msgs: Mutex::new(msgs),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl KafkaConsumer for MockConsumer {
+        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> {
+            let mut guard = self.msgs.lock().await;
+            if guard.is_empty() {
+                Err(anyhow::anyhow!("no more messages"))
+            } else {
+                Ok(guard.remove(0))
+            }
+        }
+    }
+}
diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs
index fe08aa7..4fda184 100644
--- a/crates/account-abstraction-core/core/src/lib.rs
+++ b/crates/account-abstraction-core/core/src/lib.rs
@@ -3,3 +3,6 @@ pub mod entrypoints;
 pub mod types;
 pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl};
 pub use types::{SendUserOperationResponse, VersionedUserOperation};
+pub mod kafka_mempool_engine;
+pub mod mempool;
+pub mod reputation_service;
diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs
new file mode 100644
index 0000000..ecb46be
--- /dev/null
+++ b/crates/account-abstraction-core/core/src/mempool.rs
@@ -0,0 +1,481 @@
+use crate::types::{UserOpHash, WrappedUserOperation};
+use alloy_primitives::{Address, U256};
+use std::cmp::Ordering;
+use std::collections::{BTreeSet, HashMap};
+use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+use tracing::warn;
+
+pub struct PoolConfig {
+    minimum_max_fee_per_gas: u128,
+}
+
+impl Default for PoolConfig {
+    fn default() -> Self {
+        Self {
+            minimum_max_fee_per_gas: 0,
+        }
+    }
+}
+
+#[derive(Eq, PartialEq, Clone, Debug)]
+pub struct OrderedPoolOperation {
+    pub pool_operation: WrappedUserOperation,
+    pub submission_id: u64,
+}
+
+impl OrderedPoolOperation {
+    pub fn from_wrapped(operation: &WrappedUserOperation, submission_id: u64) -> Self {
+        Self {
+            pool_operation: operation.clone(),
+            submission_id,
+        }
+    }
+
+    pub fn sender(&self) -> Address {
+        self.pool_operation.operation.sender()
+    }
+}
+
+/// Ordering by max priority fee (desc), then submission id, then hash to ensure a total order
+#[derive(Clone, Debug)]
+pub struct ByMaxFeeAndSubmissionId(pub OrderedPoolOperation);
+
+impl PartialEq for ByMaxFeeAndSubmissionId {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.pool_operation.hash == other.0.pool_operation.hash
+    }
+}
+impl Eq for ByMaxFeeAndSubmissionId {}
+
+impl PartialOrd for ByMaxFeeAndSubmissionId {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for ByMaxFeeAndSubmissionId {
+    /// TODO: An operation can become invalid when the base fee plus its priority fee
+    /// exceeds its max fee per gas; such operations should be dropped from the
+    /// mempool as fee conditions change.
+    fn cmp(&self, other: &Self) -> Ordering {
+        other
+            .0
+            .pool_operation
+            .operation
+            .max_priority_fee_per_gas()
+            .cmp(&self.0.pool_operation.operation.max_priority_fee_per_gas())
+            .then_with(|| self.0.submission_id.cmp(&other.0.submission_id))
+            .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash))
+    }
+}
+
+/// Ordering by nonce (asc), then submission id, then hash to ensure a total order
+#[derive(Clone, Debug)]
+pub struct ByNonce(pub OrderedPoolOperation);
+
+impl PartialEq for ByNonce {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.pool_operation.hash == other.0.pool_operation.hash
+    }
+}
+impl Eq for ByNonce {}
+
+impl PartialOrd for ByNonce {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for ByNonce {
+    /// TODO: An operation can become invalid when the base fee plus its priority fee
+    /// exceeds its max fee per gas; such operations should be dropped from the
+    /// mempool as fee conditions change.
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.0
+            .pool_operation
+            .operation
+            .nonce()
+            .cmp(&other.0.pool_operation.operation.nonce())
+            .then_with(|| self.0.submission_id.cmp(&other.0.submission_id))
+            .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash))
+    }
+}
+
+pub trait Mempool {
+    fn add_operation(
+        &mut self,
+        operation: &WrappedUserOperation,
+    ) -> Result<Option<OrderedPoolOperation>, anyhow::Error>;
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = WrappedUserOperation>;
+    fn remove_operation(
+        &mut self,
+        operation_hash: &UserOpHash,
+    ) -> Result<Option<WrappedUserOperation>, anyhow::Error>;
+}
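+
+// Editor's usage sketch (not part of the original change), assuming the
+// `RwLock`-guarded pool used by `KafkaMempoolEngine` inside an async context:
+// a builder loop snapshots the best operations and removes the ones it
+// includes.
+//
+//     let ops: Vec<_> = pool.read().await.get_top_operations(16).collect();
+//     for op in &ops {
+//         pool.write().await.remove_operation(&op.hash)?;
+//     }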
+
+pub struct MempoolImpl {
+    config: PoolConfig,
+    best: BTreeSet<ByMaxFeeAndSubmissionId>,
+    hash_to_operation: HashMap<UserOpHash, OrderedPoolOperation>,
+    operations_by_account: HashMap<Address, BTreeSet<ByNonce>>,
+    submission_id_counter: AtomicU64,
+}
+
+impl Mempool for MempoolImpl {
+    fn add_operation(
+        &mut self,
+        operation: &WrappedUserOperation,
+    ) -> Result<Option<OrderedPoolOperation>, anyhow::Error> {
+        if operation.operation.max_fee_per_gas() < U256::from(self.config.minimum_max_fee_per_gas)
+        {
+            return Err(anyhow::anyhow!("max_fee_per_gas is below the pool minimum"));
+        }
+        let ordered_operation_result = self.handle_add_operation(operation)?;
+        Ok(ordered_operation_result)
+    }
+
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = WrappedUserOperation> {
+        // TODO: Operations that are not currently the lowest nonce for their
+        // account are skipped, so this may yield fewer than `n` operations
+        // even when the pool holds more.
+        self.best
+            .iter()
+            .filter_map(|op_by_fee| {
+                let lowest = self
+                    .operations_by_account
+                    .get(&op_by_fee.0.sender())
+                    .and_then(|set| set.first());
+
+                match lowest {
+                    Some(lowest)
+                        if lowest.0.pool_operation.hash == op_by_fee.0.pool_operation.hash =>
+                    {
+                        Some(op_by_fee.0.pool_operation.clone())
+                    }
+                    Some(_) => None,
+                    None => {
+                        warn!(
+                            "No operations found for account: {} but one was found in the best set",
+                            op_by_fee.0.sender()
+                        );
+                        None
+                    }
+                }
+            })
+            .take(n)
+    }
+
+    fn remove_operation(
+        &mut self,
+        operation_hash: &UserOpHash,
+    ) -> Result<Option<WrappedUserOperation>, anyhow::Error> {
+        if let Some(ordered_operation) = self.hash_to_operation.remove(operation_hash) {
+            self.best
+                .remove(&ByMaxFeeAndSubmissionId(ordered_operation.clone()));
+            if let Some(set) = self.operations_by_account.get_mut(&ordered_operation.sender()) {
+                set.remove(&ByNonce(ordered_operation.clone()));
+            }
+            Ok(Some(ordered_operation.pool_operation))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl MempoolImpl {
+    // When a user operation is added, first check by hash whether it is
+    // already in the pool; duplicates are ignored.
+    fn handle_add_operation(
+        &mut self,
+        operation: &WrappedUserOperation,
+    ) -> Result<Option<OrderedPoolOperation>, anyhow::Error> {
+        if self.hash_to_operation.contains_key(&operation.hash) {
+            return Ok(None);
+        }
+
+        let order = self.get_next_order_id();
+        let ordered_operation = OrderedPoolOperation::from_wrapped(operation, order);
+
+        self.best
+            .insert(ByMaxFeeAndSubmissionId(ordered_operation.clone()));
+        self.operations_by_account
+            .entry(ordered_operation.sender())
+            .or_default()
+            .insert(ByNonce(ordered_operation.clone()));
+        self.hash_to_operation
+            .insert(operation.hash, ordered_operation.clone());
+        Ok(Some(ordered_operation))
+    }
+
+    fn get_next_order_id(&self) -> u64 {
+        self.submission_id_counter
+            .fetch_add(1, AtomicOrdering::SeqCst)
+    }
+
+    pub fn new(config: PoolConfig) -> Self {
+        Self {
+            config,
+            best: BTreeSet::new(),
+            hash_to_operation: HashMap::new(),
+            operations_by_account: HashMap::new(),
+            submission_id_counter: AtomicU64::new(0),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::types::VersionedUserOperation;
+    use alloy_primitives::{Address, FixedBytes, Uint};
+    use alloy_rpc_types::erc4337;
+
+    fn create_test_user_operation(max_priority_fee_per_gas: u128) -> VersionedUserOperation {
+        VersionedUserOperation::UserOperation(erc4337::UserOperation {
+            sender: Address::random(),
+            nonce: Uint::from(0),
+            init_code: Default::default(),
+            call_data: Default::default(),
+            call_gas_limit: Uint::from(100000),
+            verification_gas_limit: Uint::from(100000),
+            pre_verification_gas: Uint::from(21000),
+            max_fee_per_gas: Uint::from(max_priority_fee_per_gas),
+            max_priority_fee_per_gas: Uint::from(max_priority_fee_per_gas),
+            paymaster_and_data: Default::default(),
+            signature: Default::default(),
+        })
+    }
+
+    fn create_wrapped_operation(
+        max_priority_fee_per_gas: u128,
+        hash: UserOpHash,
+    ) -> WrappedUserOperation {
+        WrappedUserOperation {
+            operation: create_test_user_operation(max_priority_fee_per_gas),
+            hash,
+        }
+    }
+
+    fn create_test_mempool(minimum_max_fee_per_gas: u128) -> MempoolImpl {
+        MempoolImpl::new(PoolConfig {
+            minimum_max_fee_per_gas,
+        })
+    }
+
+    // Tests successfully adding a valid operation to the mempool
+    #[test]
+    fn test_add_operation_success() {
+        let mut mempool = create_test_mempool(1000);
+        let hash = FixedBytes::from([1u8; 32]);
+        let operation = create_wrapped_operation(2000, hash);
+
+        let result = mempool.add_operation(&operation);
+
+        assert!(result.is_ok());
+        let ordered_op = result.unwrap();
+        assert!(ordered_op.is_some());
+        let ordered_op = ordered_op.unwrap();
+        assert_eq!(ordered_op.pool_operation.hash, hash);
+        assert_eq!(
+            ordered_op.pool_operation.operation.max_fee_per_gas(),
+            Uint::from(2000)
+        );
+    }
+
+    // Tests adding an operation whose max fee per gas is below the pool minimum
+    #[test]
+    fn test_add_operation_below_minimum_gas() {
+        let mut mempool = create_test_mempool(2000);
+        let hash = FixedBytes::from([1u8; 32]);
+        let operation = create_wrapped_operation(1000, hash);
+
+        let result = mempool.add_operation(&operation);
+
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("max_fee_per_gas is below the pool minimum")
+        );
+    }
+
+    // Tests adding multiple operations with different hashes
+    #[test]
+    fn test_add_multiple_operations_with_different_hashes() {
+        let mut mempool = create_test_mempool(1000);
+
+        let hash1 = FixedBytes::from([1u8; 32]);
+        let operation1 = create_wrapped_operation(2000, hash1);
+        let result1 = mempool.add_operation(&operation1);
+        assert!(result1.is_ok());
+        assert!(result1.unwrap().is_some());
+
+        let hash2 = FixedBytes::from([2u8; 32]);
+        let operation2 = create_wrapped_operation(3000, hash2);
+        let result2 = mempool.add_operation(&operation2);
+        assert!(result2.is_ok());
+        assert!(result2.unwrap().is_some());
+
+        let hash3 = FixedBytes::from([3u8; 32]);
+        let operation3 = create_wrapped_operation(1500, hash3);
+        let result3 = mempool.add_operation(&operation3);
+        assert!(result3.is_ok());
+        assert!(result3.unwrap().is_some());
+
+        assert_eq!(mempool.hash_to_operation.len(), 3);
+        assert_eq!(mempool.best.len(), 3);
+    }
+
+    // Tests removing an operation that is not in the mempool
+    #[test]
+    fn test_remove_operation_not_in_mempool() {
+        let mut mempool = create_test_mempool(1000);
+        let hash = FixedBytes::from([1u8; 32]);
+
+        let result = mempool.remove_operation(&hash);
+        assert!(result.is_ok());
+        assert!(result.unwrap().is_none());
+    }
+
+    // Tests removing an operation that exists in the mempool
+    #[test]
+    fn test_remove_operation_exists() {
+        let mut mempool = create_test_mempool(1000);
+        let hash = FixedBytes::from([1u8; 32]);
+        let operation = create_wrapped_operation(2000, hash);
+
+        mempool.add_operation(&operation).unwrap();
+
+        let result = mempool.remove_operation(&hash);
+        assert!(result.is_ok());
+        let removed = result.unwrap();
+        assert!(removed.is_some());
+        let removed_op = removed.unwrap();
+        assert_eq!(removed_op.hash, hash);
+        assert_eq!(removed_op.operation.max_fee_per_gas(), Uint::from(2000));
+    }
+
+    // Tests removing an operation and checking the best operations
+    #[test]
+    fn test_remove_operation_and_check_best() {
+        let mut mempool = create_test_mempool(1000);
+        let hash = FixedBytes::from([1u8; 32]);
+        let operation = create_wrapped_operation(2000, hash);
+
+        mempool.add_operation(&operation).unwrap();
+
+        let best_before: Vec<_> = mempool.get_top_operations(10).collect();
+        assert_eq!(best_before.len(), 1);
+        assert_eq!(best_before[0].hash, hash);
+
+        let result = mempool.remove_operation(&hash);
+        assert!(result.is_ok());
+        assert!(result.unwrap().is_some());
+
+        let best_after: Vec<_> = mempool.get_top_operations(10).collect();
+        assert_eq!(best_after.len(), 0);
+    }
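+
+    // Editor's sketch (not part of the original change): removing the lowest
+    // nonce for an account should promote that account's next nonce into the
+    // top-operations view, exercising the `operations_by_account` maintenance
+    // in `remove_operation`.
+    #[test]
+    fn test_remove_lowest_nonce_promotes_next() {
+        let mut mempool = create_test_mempool(1000);
+        let base_op = match create_test_user_operation(2000) {
+            VersionedUserOperation::UserOperation(op) => op,
+            _ => panic!("expected UserOperation variant"),
+        };
+
+        let hash1 = FixedBytes::from([7u8; 32]);
+        let operation1 = WrappedUserOperation {
+            operation: VersionedUserOperation::UserOperation(erc4337::UserOperation {
+                nonce: Uint::from(0),
+                ..base_op.clone()
+            }),
+            hash: hash1,
+        };
+        mempool.add_operation(&operation1).unwrap().unwrap();
+
+        let hash2 = FixedBytes::from([8u8; 32]);
+        let operation2 = WrappedUserOperation {
+            operation: VersionedUserOperation::UserOperation(erc4337::UserOperation {
+                nonce: Uint::from(1),
+                ..base_op.clone()
+            }),
+            hash: hash2,
+        };
+        mempool.add_operation(&operation2).unwrap().unwrap();
+
+        // Only the nonce-0 operation is visible initially.
+        let best: Vec<_> = mempool.get_top_operations(2).collect();
+        assert_eq!(best.len(), 1);
+        assert_eq!(best[0].hash, hash1);
+
+        // After removing it, the nonce-1 operation surfaces.
+        mempool.remove_operation(&hash1).unwrap();
+        let best: Vec<_> = mempool.get_top_operations(2).collect();
+        assert_eq!(best.len(), 1);
+        assert_eq!(best[0].hash, hash2);
+    }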
+
+    // Tests getting the top operations with ordering
+    #[test]
+    fn test_get_top_operations_ordering() {
+        let mut mempool = create_test_mempool(1000);
+
+        let hash1 = FixedBytes::from([1u8; 32]);
+        let operation1 = create_wrapped_operation(2000, hash1);
+        mempool.add_operation(&operation1).unwrap();
+
+        let hash2 = FixedBytes::from([2u8; 32]);
+        let operation2 = create_wrapped_operation(3000, hash2);
+        mempool.add_operation(&operation2).unwrap();
+
+        let hash3 = FixedBytes::from([3u8; 32]);
+        let operation3 = create_wrapped_operation(1500, hash3);
+        mempool.add_operation(&operation3).unwrap();
+
+        let best: Vec<_> = mempool.get_top_operations(10).collect();
+        assert_eq!(best.len(), 3);
+        assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000));
+        assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000));
+        assert_eq!(best[2].operation.max_fee_per_gas(), Uint::from(1500));
+    }
+
+    // Tests getting the top operations with a limit
+    #[test]
+    fn test_get_top_operations_limit() {
+        let mut mempool = create_test_mempool(1000);
+
+        let hash1 = FixedBytes::from([1u8; 32]);
+        let operation1 = create_wrapped_operation(2000, hash1);
+        mempool.add_operation(&operation1).unwrap();
+
+        let hash2 = FixedBytes::from([2u8; 32]);
+        let operation2 = create_wrapped_operation(3000, hash2);
+        mempool.add_operation(&operation2).unwrap();
+
+        let hash3 = FixedBytes::from([3u8; 32]);
+        let operation3 = create_wrapped_operation(1500, hash3);
+        mempool.add_operation(&operation3).unwrap();
+
+        let best: Vec<_> = mempool.get_top_operations(2).collect();
+        assert_eq!(best.len(), 2);
+        assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000));
+        assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000));
+    }
+
+    // Tests the top-operations tie breaker on submission id
+    #[test]
+    fn test_get_top_operations_submission_id_tie_breaker() {
+        let mut mempool = create_test_mempool(1000);
+
+        let hash1 = FixedBytes::from([1u8; 32]);
+        let operation1 = create_wrapped_operation(2000, hash1);
+        mempool.add_operation(&operation1).unwrap().unwrap();
+
+        let hash2 = FixedBytes::from([2u8; 32]);
+        let operation2 = create_wrapped_operation(2000, hash2);
+        mempool.add_operation(&operation2).unwrap().unwrap();
+
+        let best: Vec<_> = mempool.get_top_operations(2).collect();
+        assert_eq!(best.len(), 2);
+        assert_eq!(best[0].hash, hash1);
+        assert_eq!(best[1].hash, hash2);
+    }
+
+    #[test]
+    fn test_get_top_operations_should_return_the_lowest_nonce_operation_for_each_account() {
+        let mut mempool = create_test_mempool(1000);
+        let hash1 = FixedBytes::from([1u8; 32]);
+        let test_user_operation = create_test_user_operation(2000);
+
+        // Destructure to the inner struct, then update nonce
+        let base_op = match test_user_operation.clone() {
+            VersionedUserOperation::UserOperation(op) => op,
+            _ => panic!("expected UserOperation variant"),
+        };
+
+        let operation1 = WrappedUserOperation {
+            operation: VersionedUserOperation::UserOperation(erc4337::UserOperation {
+                nonce: Uint::from(0),
+                max_fee_per_gas: Uint::from(2000),
+                ..base_op.clone()
+            }),
+            hash: hash1,
+        };
+
+        mempool.add_operation(&operation1).unwrap().unwrap();
+        let hash2 = FixedBytes::from([2u8; 32]);
+        let operation2 = WrappedUserOperation {
+            operation: VersionedUserOperation::UserOperation(erc4337::UserOperation {
+                nonce: Uint::from(1),
+                max_fee_per_gas: Uint::from(10_000),
+                ..base_op.clone()
+            }),
+            hash: hash2,
+        };
+        mempool.add_operation(&operation2).unwrap().unwrap();
+
+        let best: Vec<_> = mempool.get_top_operations(2).collect();
+        assert_eq!(best.len(), 1);
+        assert_eq!(best[0].operation.nonce(), Uint::from(0));
+    }
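+
+    // Editor's sketch (not part of the original change): re-adding an
+    // operation with an existing hash is a no-op, mirroring the duplicate
+    // check in `handle_add_operation`, which returns `Ok(None)`.
+    #[test]
+    fn test_add_duplicate_hash_is_noop() {
+        let mut mempool = create_test_mempool(1000);
+        let hash = FixedBytes::from([9u8; 32]);
+        let operation = create_wrapped_operation(2000, hash);
+
+        assert!(mempool.add_operation(&operation).unwrap().is_some());
+        // Second insert with the same hash is ignored.
+        assert!(mempool.add_operation(&operation).unwrap().is_none());
+        assert_eq!(mempool.hash_to_operation.len(), 1);
+    }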
+}
diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs
new file mode 100644
index 0000000..c946073
--- /dev/null
+++ b/crates/account-abstraction-core/core/src/reputation_service.rs
@@ -0,0 +1,35 @@
+use crate::mempool;
+use alloy_primitives::Address;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+/// Reputation status for an entity
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum ReputationStatus {
+    /// Entity is not throttled or banned
+    Ok,
+    /// Entity is throttled
+    Throttled,
+    /// Entity is banned
+    Banned,
+}
+
+pub trait ReputationService {
+    fn get_reputation(&self, entity: &Address) -> ReputationStatus;
+}
+
+pub struct ReputationServiceImpl {
+    mempool: Arc<RwLock<mempool::MempoolImpl>>,
+}
+
+impl ReputationServiceImpl {
+    pub fn new(mempool: Arc<RwLock<mempool::MempoolImpl>>) -> Self {
+        Self { mempool }
+    }
+}
+
+impl ReputationService for ReputationServiceImpl {
+    // Stub implementation: every entity currently reports `Ok`.
+    fn get_reputation(&self, _entity: &Address) -> ReputationStatus {
+        ReputationStatus::Ok
+    }
+}
diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs
index 03e3eb2..3d79564 100644
--- a/crates/account-abstraction-core/core/src/types.rs
+++ b/crates/account-abstraction-core/core/src/types.rs
@@ -1,5 +1,5 @@
 use crate::entrypoints::{v06, v07, version::EntryPointVersion};
-use alloy_primitives::{Address, B256, ChainId, U256};
+use alloy_primitives::{Address, B256, ChainId, FixedBytes, U256};
 use alloy_rpc_types::erc4337;
 pub use alloy_rpc_types::erc4337::SendUserOperationResponse;
 use anyhow::Result;
@@ -11,6 +11,35 @@ pub enum VersionedUserOperation {
     UserOperation(erc4337::UserOperation),
     PackedUserOperation(erc4337::PackedUserOperation),
 }
+
+impl VersionedUserOperation {
+    pub fn max_fee_per_gas(&self) -> U256 {
+        match self {
+            VersionedUserOperation::UserOperation(op) => op.max_fee_per_gas,
+            VersionedUserOperation::PackedUserOperation(op) => op.max_fee_per_gas,
+        }
+    }
+
+    pub fn max_priority_fee_per_gas(&self) -> U256 {
+        match self {
+            VersionedUserOperation::UserOperation(op) => op.max_priority_fee_per_gas,
+            VersionedUserOperation::PackedUserOperation(op) => op.max_priority_fee_per_gas,
+        }
+    }
+
+    pub fn nonce(&self) -> U256 {
+        match self {
+            VersionedUserOperation::UserOperation(op) => op.nonce,
+            VersionedUserOperation::PackedUserOperation(op) => op.nonce,
+        }
+    }
+
+    pub fn sender(&self) -> Address {
+        match self {
+            VersionedUserOperation::UserOperation(op) => op.sender,
+            VersionedUserOperation::PackedUserOperation(op) => op.sender,
+        }
+    }
+}
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct UserOperationRequest {
     pub user_operation: VersionedUserOperation,
@@ -107,6 +136,20 @@ pub struct AggregatorInfo {
     pub stake_info: EntityStakeInfo,
 }
 
+pub type UserOpHash = FixedBytes<32>;
+
+#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct WrappedUserOperation {
+    pub operation: VersionedUserOperation,
+    pub hash: UserOpHash,
+}
+
+impl WrappedUserOperation {
+    pub fn has_higher_max_fee(&self, other: &WrappedUserOperation) -> bool {
+        self.operation.max_fee_per_gas() > other.operation.max_fee_per_gas()
+    }
+}
+
 // Tests
 #[cfg(test)]
 mod tests {
diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs
index 67da612..ef49082 100644
--- a/crates/ingress-rpc/src/bin/main.rs
+++ b/crates/ingress-rpc/src/bin/main.rs
@@ -1,3 +1,4 @@
+use account_abstraction_core::kafka_mempool_engine::create_mempool_engine;
 use alloy_provider::ProviderBuilder;
 use clap::Parser;
 use jsonrpsee::server::Server;
@@ -73,6 +74,20 @@ async fn main() -> anyhow::Result<()> {
     let (audit_tx, audit_rx) =
mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); + let user_op_properties_file = &config.user_operation_consumer_properties; + + let mempool_engine = create_mempool_engine( + user_op_properties_file, + &config.user_operation_topic, + &config.user_operation_consumer_group_id, + None, + )?; + + let mempool_engine_handle = { + let engine = mempool_engine.clone(); + tokio::spawn(async move { engine.run().await }) + }; + let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); @@ -95,6 +110,7 @@ async fn main() -> anyhow::Result<()> { audit_tx, builder_tx, builder_backrun_tx, + mempool_engine.clone(), cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); @@ -110,6 +126,7 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; health_handle.abort(); + mempool_engine_handle.abort(); Ok(()) } diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 649e120..b694d03 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -85,6 +85,21 @@ pub struct Config { )] pub audit_topic: String, + /// Kafka properties file for the user operation consumer + #[arg( + long, + env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" + )] + pub user_operation_consumer_properties: String, + + /// Consumer group id for user operation topic (set uniquely per deployment) + #[arg( + long, + env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_GROUP_ID", + default_value = "tips-user-operation" + )] + pub user_operation_consumer_group_id: String, + /// User operation topic for pushing valid user operations #[arg( long, diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index ee6063c..d5b0a91 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,7 @@ -use account_abstraction_core::types::VersionedUserOperation; +use account_abstraction_core::{ + kafka_mempool_engine::KafkaEvent, + types::{VersionedUserOperation, WrappedUserOperation}, +}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; @@ -79,9 +82,25 @@ impl UserOpQueuePublisher { pub async fn publish(&self, user_op: &VersionedUserOperation, hash: &B256) -> Result<()> { let key = hash.to_string(); - let payload = serde_json::to_vec(&user_op)?; + let event = self.create_user_op_added_event(user_op, hash); + let payload = serde_json::to_vec(&event)?; self.queue.publish(&self.topic, &key, &payload).await } + + fn create_user_op_added_event( + &self, + user_op: &VersionedUserOperation, + hash: &B256, + ) -> KafkaEvent { + let wrapped_user_op = WrappedUserOperation { + operation: user_op.clone(), + hash: *hash, + }; + + KafkaEvent::UserOpAdded { + user_op: wrapped_user_op, + } + } } pub struct BundleQueuePublisher { diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index f0d3341..7442562 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,3 +1,5 @@ +use account_abstraction_core::kafka_mempool_engine::KafkaMempoolEngine; +use account_abstraction_core::reputation_service::ReputationStatus; use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; use alloy_primitives::{Address, B256, Bytes, FixedBytes}; @@ -25,7 +27,10 @@ use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; use 
account_abstraction_core::entrypoints::version::EntryPointVersion;
 use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation};
-use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl};
+use account_abstraction_core::{
+    AccountAbstractionService, AccountAbstractionServiceImpl,
+    reputation_service::{ReputationService, ReputationServiceImpl},
+};
 use std::sync::Arc;
 
 /// RPC providers for different endpoints
@@ -69,6 +74,8 @@ pub struct IngressService {
     tx_submission_method: TxSubmissionMethod,
     bundle_queue_publisher: BundleQueuePublisher,
     user_op_queue_publisher: UserOpQueuePublisher,
+    reputation_service: Arc<ReputationServiceImpl>,
+    mempool_engine: Arc<KafkaMempoolEngine>,
     audit_channel: mpsc::UnboundedSender,
     send_transaction_default_lifetime_seconds: u64,
     metrics: Metrics,
@@ -86,6 +93,7 @@ impl IngressService {
         audit_channel: mpsc::UnboundedSender,
         builder_tx: broadcast::Sender,
         builder_backrun_tx: broadcast::Sender,
+        mempool_engine: Arc<KafkaMempoolEngine>,
         config: Config,
     ) -> Self {
         let mempool_provider = Arc::new(providers.mempool);
@@ -97,6 +105,7 @@
             config.validate_user_operation_timeout_ms,
         );
         let queue_connection = Arc::new(queue);
+
         Self {
             mempool_provider,
             simulation_provider,
@@ -111,6 +120,8 @@
                 queue_connection.clone(),
                 config.ingress_topic,
             ),
+            reputation_service: Arc::new(ReputationServiceImpl::new(mempool_engine.get_mempool())),
+            mempool_engine,
             audit_channel,
             send_transaction_default_lifetime_seconds: config
                 .send_transaction_default_lifetime_seconds,
@@ -347,6 +358,20 @@ impl IngressApiServer for IngressService {
             EthApiError::InvalidParams(e.to_string()).into_rpc_err()
         })?;
 
+        let reputation = self
+            .reputation_service
+            .get_reputation(&request.user_operation.sender());
+        if reputation == ReputationStatus::Banned {
+            return Err(
+                EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err(),
+            );
+        } else if reputation == ReputationStatus::Throttled {
+            return Err(
+                EthApiError::InvalidParams("User operation sender is throttled".into())
+                    .into_rpc_err(),
+            );
+        }
+
         let _ = self
             .account_abstraction_service
             .validate_user_operation(&request.user_operation, &entry_point)
@@ -493,6 +518,7 @@ impl IngressService {
 mod tests {
     use super::*;
     use crate::{Config, TxSubmissionMethod, queue::MessageQueue};
+    use account_abstraction_core::kafka_mempool_engine::KafkaConsumer;
    use alloy_provider::RootProvider;
     use anyhow::Result;
     use async_trait::async_trait;
@@ -500,6 +526,7 @@
     use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
     use jsonrpsee::server::{ServerBuilder, ServerHandle};
     use mockall::mock;
+    use rdkafka::message::OwnedMessage;
     use serde_json::json;
     use std::net::{IpAddr, SocketAddr};
     use std::str::FromStr;
@@ -516,6 +543,15 @@
         }
     }
 
+    struct NoopConsumer;
+
+    #[async_trait]
+    impl KafkaConsumer for NoopConsumer {
+        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> {
+            Err(anyhow::anyhow!("no messages"))
+        }
+    }
+
     fn create_test_config(mock_server: &MockServer) -> Config {
         Config {
             address: IpAddr::from([127, 0, 0, 1]),
@@ -526,6 +562,8 @@
             ingress_topic: String::new(),
             audit_kafka_properties: String::new(),
             audit_topic: String::new(),
+            user_operation_consumer_properties: String::new(),
+            user_operation_consumer_group_id: "tips-user-operation".to_string(),
             log_level: String::from("info"),
             log_format: tips_core::logger::LogFormat::Pretty,
             send_transaction_default_lifetime_seconds: 300,
@@ -652,8 +690,19 @@
         let (builder_tx, _builder_rx) =
broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); + let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); let bundle = Bundle::default(); @@ -711,8 +760,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); + let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); // Valid signed transaction bytes diff --git a/docker-compose.tips.yml b/docker-compose.tips.yml index 666802a..c73c226 100644 --- a/docker-compose.tips.yml +++ b/docker-compose.tips.yml @@ -15,6 +15,10 @@ services: volumes: - ./docker/ingress-bundles-kafka-properties:/app/docker/ingress-bundles-kafka-properties:ro - ./docker/ingress-audit-kafka-properties:/app/docker/ingress-audit-kafka-properties:ro + - ./docker/ingress-user-operation-consumer-kafka-properties:/app/docker/ingress-user-operation-consumer-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped audit: @@ -28,6 +32,9 @@ services: - .env.docker volumes: - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped ui: diff --git a/docker/ingress-user-operation-consumer-kafka-properties b/docker/ingress-user-operation-consumer-kafka-properties new file mode 100644 index 0000000..3bb02bf --- /dev/null +++ b/docker/ingress-user-operation-consumer-kafka-properties @@ -0,0 +1,9 @@ +# Kafka configuration properties for ingress user operation consumer +bootstrap.servers=host.docker.internal:9094 +message.timeout.ms=5000 +enable.partition.eof=false +session.timeout.ms=6000 +fetch.wait.max.ms=100 +fetch.min.bytes=1 +# Note: group.id and enable.auto.commit are set programmatically +