From 9023401e95eb29f9486199b87486f64fd8c10ac8 Mon Sep 17 00:00:00 2001 From: Dens Sumesh Date: Wed, 25 Jun 2025 16:15:57 -0700 Subject: [PATCH 1/2] cleanup: make consensus more stable after restart --- crates/abci/src/chain_state.rs | 10 ++++ crates/abci/src/db/mod.rs | 1 + crates/abci/src/db/rocksdb.rs | 12 ++++ crates/abci/src/lib.rs | 58 +++++++++++++++---- crates/abci/src/main_loop.rs | 9 +++ crates/consensus/src/consensus_interface.rs | 36 ++++++++++++ .../src/handlers/deposit/create_deposit.rs | 28 +++++++++ crates/node/src/start_node.rs | 5 ++ crates/node/src/utils/swarm_manager.rs | 2 +- tests/src/mocks/abci.rs | 6 ++ tests/src/mocks/db.rs | 6 ++ 11 files changed, 162 insertions(+), 11 deletions(-) diff --git a/crates/abci/src/chain_state.rs b/crates/abci/src/chain_state.rs index 5ca4d53f..dcdb34f6 100644 --- a/crates/abci/src/chain_state.rs +++ b/crates/abci/src/chain_state.rs @@ -102,6 +102,11 @@ impl ChainState { self.deposit_intents.clone() } + pub fn remove_deposit_intent(&mut self, intent: &DepositIntent) { + self.deposit_intents + .retain(|i| i.deposit_address != intent.deposit_address); + } + #[must_use] pub fn get_deposit_intent_by_address(&self, address: &str) -> Option<&DepositIntent> { self.deposit_intents @@ -115,6 +120,11 @@ impl ChainState { } pub fn add_transaction_to_block(&mut self, transaction: Transaction) { + // Check if the transaction is already in the block + if self.proposed_transactions.contains(&transaction) { + return; + } + self.proposed_transactions.push(transaction); } diff --git a/crates/abci/src/db/mod.rs b/crates/abci/src/db/mod.rs index 20f6af74..6dab5260 100644 --- a/crates/abci/src/db/mod.rs +++ b/crates/abci/src/db/mod.rs @@ -15,6 +15,7 @@ pub trait Db: Send + Sync { fn insert_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError>; fn get_deposit_intent(&self, tracking_id: &str) -> Result, NodeError>; fn get_all_deposit_intents(&self) -> Result, NodeError>; + fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError>; fn get_deposit_intent_by_address( &self, address: &str, diff --git a/crates/abci/src/db/rocksdb.rs b/crates/abci/src/db/rocksdb.rs index 6fb1c713..4acbd675 100644 --- a/crates/abci/src/db/rocksdb.rs +++ b/crates/abci/src/db/rocksdb.rs @@ -222,4 +222,16 @@ impl Db for RocksDb { Ok(utxos) } + + fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError> { + self.db.delete_cf( + self.db.cf_handle("deposit_intents").unwrap(), + &intent.deposit_tracking_id, + )?; + self.db.delete_cf( + self.db.cf_handle("deposit_intents").unwrap(), + format!("addr:{}", intent.deposit_address), + )?; + Ok(()) + } } diff --git a/crates/abci/src/lib.rs b/crates/abci/src/lib.rs index 32706efa..dfe96d1d 100644 --- a/crates/abci/src/lib.rs +++ b/crates/abci/src/lib.rs @@ -18,6 +18,7 @@ pub trait ChainInterface: Send + Sync { fn insert_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError>; fn get_all_deposit_intents(&self) -> Result, NodeError>; fn get_deposit_intent_by_address(&self, address: &str) -> Option; + fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError>; fn create_genesis_block( &mut self, @@ -68,20 +69,51 @@ pub enum ChainMessage { }, GetPendingTransactions, GetChainState, + GetChainInfo, + RemoveDepositIntent { + intent: DepositIntent, + }, } #[derive(Clone)] pub enum ChainResponse { - InsertDepositIntent { error: Option }, - GetAccount { account: Option }, - GetAllDepositIntents { intents: Vec }, - GetDepositIntentByAddress { intent: Option }, - CreateGenesisBlock { error: Option }, - AddTransactionToBlock { error: Option }, - GetProposedBlock { block: Block }, - FinalizeAndStoreBlock { error: Option }, - GetPendingTransactions { transactions: Vec }, - GetChainState { state: chain_state::ChainState }, + InsertDepositIntent { + error: Option, + }, + GetAccount { + account: Option, + }, + GetAllDepositIntents { + intents: Vec, + }, + GetDepositIntentByAddress { + intent: Option, + }, + CreateGenesisBlock { + error: Option, + }, + AddTransactionToBlock { + error: Option, + }, + GetProposedBlock { + block: Block, + }, + FinalizeAndStoreBlock { + error: Option, + }, + GetPendingTransactions { + transactions: Vec, + }, + GetChainState { + state: chain_state::ChainState, + }, + GetChainInfo { + height: u64, + pending_transactions: usize, + }, + RemoveDepositIntent { + error: Option, + }, } pub struct ChainInterfaceImpl { @@ -132,6 +164,12 @@ impl ChainInterface for ChainInterfaceImpl { .cloned() } + fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError> { + self.chain_state.remove_deposit_intent(&intent); + self.db.remove_deposit_intent(intent)?; + Ok(()) + } + fn create_genesis_block( &mut self, validators: Vec, diff --git a/crates/abci/src/main_loop.rs b/crates/abci/src/main_loop.rs index 1108d25d..ee521b6b 100644 --- a/crates/abci/src/main_loop.rs +++ b/crates/abci/src/main_loop.rs @@ -39,6 +39,11 @@ impl ChainInterfaceImpl { error: self.insert_deposit_intent(intent).err(), } } + ChainMessage::RemoveDepositIntent { intent } => { + ChainResponse::RemoveDepositIntent { + error: self.remove_deposit_intent(intent).err(), + } + } ChainMessage::GetAccount { address } => ChainResponse::GetAccount { account: self.get_account(&address), }, @@ -79,6 +84,10 @@ impl ChainInterfaceImpl { ChainMessage::GetChainState => ChainResponse::GetChainState { state: self.get_chain_state(), }, + ChainMessage::GetChainInfo => ChainResponse::GetChainInfo { + height: self.get_chain_state().get_block_height(), + pending_transactions: self.get_pending_transactions().len(), + }, }; response_tx .send(response) diff --git a/crates/consensus/src/consensus_interface.rs b/crates/consensus/src/consensus_interface.rs index 2dd72e82..b2e5f522 100644 --- a/crates/consensus/src/consensus_interface.rs +++ b/crates/consensus/src/consensus_interface.rs @@ -64,6 +64,42 @@ impl ConsensusInterfaceImpl { self.max_validators = Some(max_validators); } + pub async fn initialize_from_chain_state(&mut self) -> Result<(), NodeError> { + if let Some(chain_tx) = &mut self.chain_interface_tx { + match chain_tx + .send_message_with_response(abci::ChainMessage::GetChainInfo) + .await + { + Ok(abci::ChainResponse::GetChainInfo { + height, + pending_transactions: _, + }) => { + self.state.current_height = height; + + // Set round to 0 for the current height (fresh start for this height) + self.state.current_round = 0; + + info!( + "✅ Initialized consensus from chain state: height={}, round={}", + self.state.current_height, self.state.current_round + ); + + Ok(()) + } + Ok(_) => Err(NodeError::Error( + "Unexpected response from chain interface for GetChainInfo".to_string(), + )), + Err(e) => { + warn!("Failed to get chain info, using default values: {}", e); + Ok(()) // Don't fail initialization, just use defaults + } + } + } else { + warn!("Chain interface not available during initialization, using default values"); + Ok(()) + } + } + fn send_broadcast(&self, message: BroadcastMessage) -> Result<(), NodeError> { if let Some(sender) = &self.network_events_tx { sender diff --git a/crates/node/src/handlers/deposit/create_deposit.rs b/crates/node/src/handlers/deposit/create_deposit.rs index 70126c8d..6f71cca0 100644 --- a/crates/node/src/handlers/deposit/create_deposit.rs +++ b/crates/node/src/handlers/deposit/create_deposit.rs @@ -199,6 +199,34 @@ impl DepositIntentState { info!("✅ Transaction successfully added to block"); + // Remove the transaction from the deposit addresses + self.deposit_addresses.remove(&addr_str); + self.processed_txids.insert(tx.compute_txid()); + let remove_intent_response = node + .chain_interface_tx + .send_message_with_response(ChainMessage::RemoveDepositIntent { + intent: intent.clone(), + }) + .await?; + + match remove_intent_response { + ChainResponse::RemoveDepositIntent { error: None } => { + info!("✅ Deposit intent removed"); + } + ChainResponse::RemoveDepositIntent { error: Some(e) } => { + error!("Failed to remove deposit intent: {}", e); + return Err(NodeError::Error( + "Failed to remove deposit intent".to_string(), + )); + } + _ => { + error!("Failed to remove deposit intent"); + return Err(NodeError::Error( + "Failed to remove deposit intent".to_string(), + )); + } + } + // Broadcast the transaction to all other nodes match bincode::encode_to_vec(&transaction, bincode::config::standard()) { Ok(transaction_data) => { diff --git a/crates/node/src/start_node.rs b/crates/node/src/start_node.rs index 5bd88884..2f7aa00b 100644 --- a/crates/node/src/start_node.rs +++ b/crates/node/src/start_node.rs @@ -204,6 +204,11 @@ pub async fn start_node( }) .await; + // Initialize consensus state from current chain state + if let Err(e) = consensus_interface.initialize_from_chain_state().await { + tracing::error!("Failed to initialize consensus from chain state: {}", e); + } + let consensus_interface_handle = tokio::spawn(async move { consensus_interface.start().await; }); diff --git a/crates/node/src/utils/swarm_manager.rs b/crates/node/src/utils/swarm_manager.rs index 68044a92..c23099a0 100644 --- a/crates/node/src/utils/swarm_manager.rs +++ b/crates/node/src/utils/swarm_manager.rs @@ -340,7 +340,7 @@ pub fn build_swarm( .mesh_outbound_min(mesh_n_low) .gossip_lazy(std::cmp::max(3, mesh_n_low)) .max_transmit_size(64 * 1024) - .flood_publish(false) + .flood_publish(true) .build() .map_err(io::Error::other)?; diff --git a/tests/src/mocks/abci.rs b/tests/src/mocks/abci.rs index 349098e9..c86cb04a 100644 --- a/tests/src/mocks/abci.rs +++ b/tests/src/mocks/abci.rs @@ -255,6 +255,12 @@ impl ChainInterface for MockChainInterface { fn get_chain_state(&self) -> ChainState { self.chain_state.clone() } + + fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError> { + self.chain_state.remove_deposit_intent(&intent); + self.db.remove_deposit_intent(intent)?; + Ok(()) + } } impl TestChainInterface for MockChainInterface { diff --git a/tests/src/mocks/db.rs b/tests/src/mocks/db.rs index 6f3f2f4f..72805bbb 100644 --- a/tests/src/mocks/db.rs +++ b/tests/src/mocks/db.rs @@ -113,4 +113,10 @@ impl Db for MockDb { let utxos = self.utxos.read().unwrap(); Ok(utxos.values().cloned().collect()) } + + fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError> { + let mut deposit_intents = self.deposit_intents.write().unwrap(); + deposit_intents.remove(&intent.deposit_tracking_id); + Ok(()) + } } From 62b908d6d30f3c2dfd9eb917a6afd62cff0b6f47 Mon Sep 17 00:00:00 2001 From: Dens Sumesh Date: Wed, 25 Jun 2025 16:22:43 -0700 Subject: [PATCH 2/2] cleanup: rename update_user_balance to insert_pendign_deposit_transaction --- crates/node/src/handlers/deposit/create_deposit.rs | 2 +- crates/node/src/handlers/deposit/handler.rs | 5 ++++- tests/src/deposit/mod.rs | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/node/src/handlers/deposit/create_deposit.rs b/crates/node/src/handlers/deposit/create_deposit.rs index 6f71cca0..70ca59cc 100644 --- a/crates/node/src/handlers/deposit/create_deposit.rs +++ b/crates/node/src/handlers/deposit/create_deposit.rs @@ -138,7 +138,7 @@ impl DepositIntentState { } } - pub async fn update_user_balance( + pub async fn insert_pending_deposit_transaction( &mut self, node: &mut NodeState, tx: &BitcoinTransaction, diff --git a/crates/node/src/handlers/deposit/handler.rs b/crates/node/src/handlers/deposit/handler.rs index 9c3be3ec..4caae88a 100644 --- a/crates/node/src/handlers/deposit/handler.rs +++ b/crates/node/src/handlers/deposit/handler.rs @@ -84,7 +84,10 @@ impl Handler for DepositIntentState { request: SelfRequest::ConfirmDeposit { confirmed_tx }, .. } => { - if let Err(e) = self.update_user_balance(node, &confirmed_tx).await { + if let Err(e) = self + .insert_pending_deposit_transaction(node, &confirmed_tx) + .await + { info!("❌ Failed to update user balance: {}", e); } else { info!( diff --git a/tests/src/deposit/mod.rs b/tests/src/deposit/mod.rs index dddd8f78..c89a4b51 100644 --- a/tests/src/deposit/mod.rs +++ b/tests/src/deposit/mod.rs @@ -302,7 +302,7 @@ mod deposit_tests { }; state - .update_user_balance(node, &tx) + .insert_pending_deposit_transaction(node, &tx) .await .expect("balance update failed"); @@ -408,7 +408,7 @@ mod deposit_tests { }; state - .update_user_balance(node, &tx) + .insert_pending_deposit_transaction(node, &tx) .await .expect("balance update failed");