diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs
index 9ee8f1fd16..69e2aa8a66 100644
--- a/crates/autopilot/src/arguments.rs
+++ b/crates/autopilot/src/arguments.rs
@@ -51,6 +51,11 @@ pub struct Arguments {
     #[clap(long, env)]
     pub ethflow_indexing_start: Option<u64>,
 
+    /// Enable the transfer listener that cancels orders when tokens are
+    /// transferred away.
+    #[clap(long, env, action = clap::ArgAction::Set, default_value = "false")]
+    pub transfer_listener_enabled: bool,
+
     /// A tracing Ethereum node URL to connect to, allowing a separate node URL
     /// to be used exclusively for tracing calls.
     #[clap(long, env)]
@@ -369,6 +374,7 @@ impl std::fmt::Display for Arguments {
             tracing_node_url,
             ethflow_contracts,
             ethflow_indexing_start,
+            transfer_listener_enabled,
             metrics_address,
             skip_event_sync,
             allowed_tokens,
@@ -416,6 +422,7 @@ impl std::fmt::Display for Arguments {
         display_option(f, "tracing_node_url", tracing_node_url)?;
         writeln!(f, "ethflow_contracts: {ethflow_contracts:?}")?;
         writeln!(f, "ethflow_indexing_start: {ethflow_indexing_start:?}")?;
+        writeln!(f, "transfer_listener_enabled: {transfer_listener_enabled}")?;
         writeln!(f, "metrics_address: {metrics_address}")?;
         display_secret_option(f, "db_write_url", Some(&db_write_url))?;
         writeln!(f, "skip_event_sync: {skip_event_sync}")?;
diff --git a/crates/autopilot/src/database/mod.rs b/crates/autopilot/src/database/mod.rs
index 5f37629277..04bc972964 100644
--- a/crates/autopilot/src/database/mod.rs
+++ b/crates/autopilot/src/database/mod.rs
@@ -14,6 +14,7 @@ pub mod fee_policies;
 pub mod onchain_order_events;
 pub mod order_events;
 mod quotes;
+pub mod transfer_listener;
 
 #[derive(Debug, Clone)]
 pub struct Config {
diff --git a/crates/autopilot/src/database/transfer_listener/listener.rs b/crates/autopilot/src/database/transfer_listener/listener.rs
new file mode 100644
index 0000000000..75994fdd18
--- /dev/null
+++ b/crates/autopilot/src/database/transfer_listener/listener.rs
@@ -0,0 +1,337 @@
+//! Real-time Transfer event listener that cancels orders when tokens are
+//! transferred away.
+
+use {
+    crate::database::{Metrics, Postgres},
+    alloy::{
+        primitives::{Address, B256, b256},
+        providers::Provider,
+        rpc::types::Log,
+    },
+    anyhow::{Result, anyhow},
+    chrono::{DateTime, Utc},
+    database::{
+        OrderUid,
+        byte_array::ByteArray,
+        order_events::{OrderEvent, OrderEventLabel, insert_order_event},
+    },
+    shared::ethrpc::Web3,
+    sqlx::{PgConnection, QueryBuilder},
+};
+
+/// The ERC20 Transfer event signature hash:
+/// keccak256("Transfer(address,address,uint256)")
+const TRANSFER_SIGNATURE: B256 =
+    b256!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef");
+
+/// A decoded Transfer event
+#[derive(Clone, Copy, Debug)]
+pub struct TransferEvent {
+    /// The address sending tokens
+    pub from: Address,
+    /// The address receiving tokens
+    pub to: Address,
+    /// The token contract address
+    pub token: Address,
+    /// Block number in which the transfer occurred
+    pub block_number: u64,
+}
+
+impl TransferEvent {
+    /// Decode a Transfer event from a log
+    pub fn from_log(log: &Log) -> Option<Self> {
+        // A Transfer event has 3 topics: [signature_hash, from (indexed), to
+        // (indexed)], and the data contains the value (uint256).
+        let topics = log.topics();
+        if topics.len() < 3 {
+            return None;
+        }
+
+        // Verify it's a Transfer event
+        if topics[0] != TRANSFER_SIGNATURE {
+            return None;
+        }
+
+        // Extract addresses from topics. Topics are padded to 32 bytes (B256),
+        // so indexed address topics have the address in the last 20 bytes.
+        // `from_word` handles the conversion from a 32-byte word to an address.
+        let from = Address::from_word(topics[1]);
+        let to = Address::from_word(topics[2]);
+        let token = log.address();
+        let block_number = log.block_number?;
+
+        Some(TransferEvent {
+            from,
+            to,
+            token,
+            block_number,
+        })
+    }
+}
+
+pub struct TransferListener {
+    db: Postgres,
+    web3: Web3,
+    /// Addresses to exclude from transfer event processing
+    /// (e.g. settlement contract, vault relayer)
+    ignored_addresses: std::collections::HashSet<Address>,
+}
+
+impl TransferListener {
+    pub fn new(db: Postgres, web3: Web3, ignored_addresses: Vec<Address>) -> Self {
+        Self {
+            db,
+            web3,
+            ignored_addresses: ignored_addresses.into_iter().collect(),
+        }
+    }
+
+    /// Fetch and process Transfer events from a specific block
+    pub async fn process_block(&self, block_number: u64) -> Result<()> {
+        // Fetch all receipts for this block (which include all logs). This is
+        // more efficient than filtering on the RPC side.
+        let receipts = self
+            .web3
+            .alloy
+            .get_block_receipts(block_number.into())
+            .await?;
+
+        // Collect all logs from the receipts and manually filter for Transfer
+        // events that are *NOT* related to our ignored contracts (settlement,
+        // vault relayer, etc.)
+        let logs: Vec<Log> = receipts
+            .into_iter()
+            .flat_map(|receipts| {
+                receipts.into_iter().flat_map(|receipt| {
+                    receipt
+                        .logs()
+                        .iter()
+                        .filter_map(|l| {
+                            if l.topic0() == Some(&TRANSFER_SIGNATURE) {
+                                // Extract the from and to addresses from topics
+                                let topics = l.topics();
+                                if topics.len() < 3 {
+                                    return None;
+                                }
+
+                                let from = Address::from_word(topics[1]);
+                                let to = Address::from_word(topics[2]);
+
+                                // Exclude transfers involving any ignored addresses
+                                if self.ignored_addresses.contains(&from)
+                                    || self.ignored_addresses.contains(&to)
+                                {
+                                    return None;
+                                }
+
+                                Some(l.clone())
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<_>>()
+                })
+            })
+            .collect();
+
+        tracing::debug!(logs_count = logs.len(), "cancelling orders matching transfer events");
+
+        if !logs.is_empty() {
+            self.process_transfer_events(logs).await
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Process Transfer events and cancel matching orders.
+    /// All transfers from a block are processed in a single batch transaction.
+    pub async fn process_transfer_events(&self, logs: Vec<Log>) -> Result<()> {
+        // Decode all transfer events
+        let transfers: Vec<TransferEvent> =
+            logs.iter().filter_map(TransferEvent::from_log).collect();
+
+        if transfers.is_empty() {
+            tracing::warn!("transfers empty after parsing logs");
+            return Ok(());
+        }
+
+        tracing::debug!(
+            transfers_count = transfers.len(),
+            "processing transfer events"
+        );
+
+        // Find and cancel all matching live orders in a single transaction.
+        // The timer must be started before the database work so that the
+        // metric actually measures the queries.
+        let _timer = Metrics::get()
+            .database_queries
+            .with_label_values(&["transfer_listener_cancel_orders"])
+            .start_timer();
+        let mut ex = self.db.pool.begin().await?;
+        let cancelled_count = cancel_matching_orders(&mut ex, &transfers).await?;
+        ex.commit().await?;
+
+        if cancelled_count > 0 {
+            tracing::debug!(
+                "Transfer listener: cancelled {} orders from {} transfer events",
+                cancelled_count,
+                transfers.len()
+            );
+        } else {
+            tracing::debug!("no orders were cancelled for {} transfers", transfers.len());
+        }
+
+        Ok(())
+    }
+}
+
+/// Cancel all live orders matching the given transfers in a single batch
+/// transaction. Uses the same `live_orders` logic as `solvable_orders` to
+/// ensure we only cancel truly live orders (not already cancelled, not
+/// invalidated, etc).
+async fn cancel_matching_orders(
+    ex: &mut PgConnection,
+    transfers: &[TransferEvent],
+) -> Result<u64> {
+    if transfers.is_empty() {
+        return Ok(0);
+    }
+
+    let now = Utc::now();
+
+    // Build the list of (owner, sell_token) pairs to match against
+    let owner_tokens: Vec<_> = transfers
+        .iter()
+        .map(|transfer| (transfer.from, transfer.token))
+        .collect();
+
+    // Find all live orders matching any of the (owner, sell_token) pairs
+    let order_uids = find_live_orders_to_cancel(ex, &owner_tokens).await?;
+
+    if order_uids.is_empty() {
+        tracing::debug!("no live orders matched any transfer events");
+        return Ok(0);
+    }
+
+    // Update all orders' cancellation_timestamp in a single query
+    update_cancellation_timestamps(ex, &order_uids, now).await?;
+
+    // Insert cancellation events in a single batch
+    insert_cancellation_events(ex, &order_uids, now).await?;
+
+    Ok(order_uids.len() as u64)
+}
+
+/// Find all live orders that match any of the (owner, sell_token) pairs.
+/// A live order is one that:
+/// - has NOT been cancelled via the API (cancellation_timestamp IS NULL)
+/// - has NOT been invalidated (various invalidation tables)
+/// - for ethflow orders, has NOT expired according to ethflow-specific logic
+async fn find_live_orders_to_cancel(
+    ex: &mut PgConnection,
+    owner_token_pairs: &[(Address, Address)],
+) -> Result<Vec<OrderUid>> {
+    // Collect all owners and tokens into separate vectors explicitly typed as
+    // bytea
+    let owners: Vec<Vec<u8>> = owner_token_pairs
+        .iter()
+        .map(|(o, _)| o.0.to_vec())
+        .collect();
+    let tokens: Vec<Vec<u8>> = owner_token_pairs
+        .iter()
+        .map(|(_, t)| t.0.to_vec())
+        .collect();
+
+    // Note: we don't filter by valid_to here because transfers can happen at
+    // any time, and an expired order should still be cancelled to reflect the
+    // user's intent.
+    const QUERY: &str = r#"
+SELECT o.uid
+FROM orders o
+WHERE o.cancellation_timestamp IS NULL
+    AND (o.owner, o.sell_token) IN (
+        SELECT DISTINCT sp.owner, sp.sell_token
+        FROM (
+            SELECT UNNEST($1::bytea[]) AS owner, UNNEST($2::bytea[]) AS sell_token
+        ) sp
+    )
+    AND NOT EXISTS (SELECT 1 FROM invalidations i WHERE i.order_uid = o.uid)
+    AND NOT EXISTS (SELECT 1 FROM onchain_order_invalidations oi WHERE oi.uid = o.uid)
+    AND NOT EXISTS (SELECT 1 FROM onchain_placed_orders op WHERE op.uid = o.uid AND op.placement_error IS NOT NULL)
+    AND (
+        NOT EXISTS (SELECT 1 FROM ethflow_orders e WHERE e.uid = o.uid)
+        OR EXISTS (
+            SELECT 1 FROM ethflow_orders e
+            WHERE e.uid = o.uid
+            AND (e.valid_to IS NULL OR e.valid_to >= EXTRACT(EPOCH FROM NOW())::bigint)
+        )
+    )
+    "#;
+
+    let rows: Vec<(Vec<u8>,)> = sqlx::query_as(QUERY)
+        .bind(&owners as &[Vec<u8>])
+        .bind(&tokens as &[Vec<u8>])
+        .fetch_all(ex)
+        .await?;
+
+    rows.into_iter()
+        .map(|(uid_bytes,)| {
+            let array: [u8; 56] = uid_bytes
+                .try_into()
+                .map_err(|_| anyhow!("invalid order UID length"))?;
+            Ok(ByteArray(array))
+        })
+        .collect()
+}
+
+/// Update the cancellation timestamp for all given orders
+async fn update_cancellation_timestamps(
+    ex: &mut PgConnection,
+    order_uids: &[OrderUid],
+    timestamp: DateTime<Utc>,
+) -> Result<()> {
+    if order_uids.is_empty() {
+        return Ok(());
+    }
+
+    let mut query_builder: QueryBuilder<sqlx::Postgres> =
+        QueryBuilder::new("UPDATE orders SET cancellation_timestamp = ");
+    query_builder.push_bind(timestamp);
+    query_builder.push(" WHERE uid IN (");
+
+    let mut separated = query_builder.separated(", ");
+    for order_uid in order_uids {
+        separated.push_bind(order_uid.0.as_ref());
+    }
+    query_builder.push(")");
+
+    query_builder.build().execute(ex).await?;
+
+    Ok(())
+}
+
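+// A minimal test sketch (an editorial illustration, not part of the original
+// change): it pins TRANSFER_SIGNATURE to the actual keccak256 hash of the
+// event signature and round-trips a synthetic log through
+// `TransferEvent::from_log`. It assumes `alloy::primitives::keccak256`,
+// `alloy::primitives::Log::new_unchecked`, and a `Default` impl on the RPC
+// `Log` type are available in this workspace.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn transfer_signature_matches_keccak256() {
+        // A typo in the hard-coded constant would silently drop every event.
+        let expected = alloy::primitives::keccak256(b"Transfer(address,address,uint256)");
+        assert_eq!(TRANSFER_SIGNATURE, expected);
+    }
+
+    #[test]
+    fn decodes_transfer_event_from_log() {
+        let from = Address::repeat_byte(0x11);
+        let to = Address::repeat_byte(0x22);
+        let token = Address::repeat_byte(0x33);
+        // Indexed address topics are the 20-byte address left-padded to a
+        // 32-byte word, which is exactly what `into_word` produces.
+        let inner = alloy::primitives::Log::new_unchecked(
+            token,
+            vec![TRANSFER_SIGNATURE, from.into_word(), to.into_word()],
+            Default::default(),
+        );
+        let log = Log {
+            inner,
+            block_number: Some(42),
+            ..Default::default()
+        };
+        let event = TransferEvent::from_log(&log).unwrap();
+        assert_eq!(event.from, from);
+        assert_eq!(event.to, to);
+        assert_eq!(event.token, token);
+        assert_eq!(event.block_number, 42);
+    }
+}
+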
+/// Insert cancellation events for all given orders
+async fn insert_cancellation_events(
+    ex: &mut PgConnection,
+    order_uids: &[OrderUid],
+    timestamp: DateTime<Utc>,
+) -> Result<()> {
+    if order_uids.is_empty() {
+        return Ok(());
+    }
+
+    // Use the standard insert_order_event function for each order to respect
+    // the deduplication logic (don't insert if the last event is already
+    // Cancelled).
+    for order_uid in order_uids {
+        insert_order_event(
+            ex,
+            &OrderEvent {
+                order_uid: *order_uid,
+                timestamp,
+                label: OrderEventLabel::Cancelled,
+            },
+        )
+        .await?;
+
+        tracing::debug!(
+            ?order_uid,
+            "order cancelled due to transfer of the sell token away from the owner"
+        );
+    }
+
+    Ok(())
+}
diff --git a/crates/autopilot/src/database/transfer_listener/mod.rs b/crates/autopilot/src/database/transfer_listener/mod.rs
new file mode 100644
index 0000000000..8eecd7cba2
--- /dev/null
+++ b/crates/autopilot/src/database/transfer_listener/mod.rs
@@ -0,0 +1,6 @@
+//! Listens for ERC20 Transfer events and cancels orders whose owners have
+//! transferred the sell token away.
+
+pub mod listener;
+
+pub use listener::TransferListener;
diff --git a/crates/autopilot/src/maintenance.rs b/crates/autopilot/src/maintenance.rs
index 1344c6ba67..54b585fb91 100644
--- a/crates/autopilot/src/maintenance.rs
+++ b/crates/autopilot/src/maintenance.rs
@@ -116,6 +116,33 @@ impl Maintenance {
         });
     }
 
+    /// Spawns the Transfer event listener that cancels orders when tokens are
+    /// transferred away.
+    pub fn spawn_transfer_listener(
+        transfer_listener: crate::database::transfer_listener::TransferListener,
+        current_block: CurrentBlockWatcher,
+    ) {
+        tracing::info!(?current_block, "spawning transfer listener task");
+        tokio::task::spawn(async move {
+            let mut stream = into_stream(current_block);
+            while let Some(block) = stream.next().await {
+                if let Err(err) = Self::timed_future(
+                    "transfer_listener",
+                    transfer_listener.process_block(block.number),
+                )
+                .await
+                {
+                    tracing::warn!(?err, "transfer listener failed an iteration");
+                }
+            }
+            tracing::error!("block stream terminated unexpectedly");
+        });
+    }
+
     pub fn with_cow_amms(&mut self, registry: &cow_amm::Registry) {
         self.cow_amm_indexer = registry.maintenance_tasks().clone();
     }
diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs
index be521e3a9d..768e362b20 100644
--- a/crates/autopilot/src/run.rs
+++ b/crates/autopilot/src/run.rs
@@ -639,6 +639,16 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
         );
     }
 
+    if args.transfer_listener_enabled {
+        let ignored_addresses = vec![*eth.contracts().settlement().address(), vault_relayer];
+        let transfer_listener = crate::database::transfer_listener::TransferListener::new(
+            db_write.clone(),
+            web3.clone(),
+            ignored_addresses,
+        );
+        Maintenance::spawn_transfer_listener(transfer_listener, eth.current_block().clone());
+    }
+
     let run_loop_config = run_loop::Config {
         submission_deadline: args.submission_deadline as u64,
         max_settlement_transaction_wait: args.max_settlement_transaction_wait,
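A note on enabling the feature, assuming clap's defaults: because the argument is declared with `action = clap::ArgAction::Set`, the flag takes an explicit value, so the listener would be switched on with `--transfer-listener-enabled true` on the command line or, via the derive-level `env` attribute's default upper-snake-case mapping, with `TRANSFER_LISTENER_ENABLED=true` in the environment.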