Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/abci/src/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ impl ChainState {
self.deposit_intents.clone()
}

pub fn remove_deposit_intent(&mut self, intent: &DepositIntent) {
self.deposit_intents
.retain(|i| i.deposit_address != intent.deposit_address);
}

#[must_use]
pub fn get_deposit_intent_by_address(&self, address: &str) -> Option<&DepositIntent> {
self.deposit_intents
Expand All @@ -115,6 +120,11 @@ impl ChainState {
}

pub fn add_transaction_to_block(&mut self, transaction: Transaction) {
// Check if the transaction is already in the block
if self.proposed_transactions.contains(&transaction) {
return;
}

self.proposed_transactions.push(transaction);
}

Expand Down
1 change: 1 addition & 0 deletions crates/abci/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub trait Db: Send + Sync {
fn insert_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError>;
fn get_deposit_intent(&self, tracking_id: &str) -> Result<Option<DepositIntent>, NodeError>;
fn get_all_deposit_intents(&self) -> Result<Vec<DepositIntent>, NodeError>;
fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError>;
fn get_deposit_intent_by_address(
&self,
address: &str,
Expand Down
12 changes: 12 additions & 0 deletions crates/abci/src/db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,16 @@ impl Db for RocksDb {

Ok(utxos)
}

fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError> {
self.db.delete_cf(
self.db.cf_handle("deposit_intents").unwrap(),
&intent.deposit_tracking_id,
)?;
self.db.delete_cf(
self.db.cf_handle("deposit_intents").unwrap(),
format!("addr:{}", intent.deposit_address),
)?;
Ok(())
}
}
58 changes: 48 additions & 10 deletions crates/abci/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub trait ChainInterface: Send + Sync {
fn insert_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError>;
fn get_all_deposit_intents(&self) -> Result<Vec<DepositIntent>, NodeError>;
fn get_deposit_intent_by_address(&self, address: &str) -> Option<DepositIntent>;
fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError>;

fn create_genesis_block(
&mut self,
Expand Down Expand Up @@ -68,20 +69,51 @@ pub enum ChainMessage {
},
GetPendingTransactions,
GetChainState,
GetChainInfo,
RemoveDepositIntent {
intent: DepositIntent,
},
}

#[derive(Clone)]
pub enum ChainResponse {
InsertDepositIntent { error: Option<NodeError> },
GetAccount { account: Option<Account> },
GetAllDepositIntents { intents: Vec<DepositIntent> },
GetDepositIntentByAddress { intent: Option<DepositIntent> },
CreateGenesisBlock { error: Option<NodeError> },
AddTransactionToBlock { error: Option<NodeError> },
GetProposedBlock { block: Block },
FinalizeAndStoreBlock { error: Option<NodeError> },
GetPendingTransactions { transactions: Vec<Transaction> },
GetChainState { state: chain_state::ChainState },
InsertDepositIntent {
error: Option<NodeError>,
},
GetAccount {
account: Option<Account>,
},
GetAllDepositIntents {
intents: Vec<DepositIntent>,
},
GetDepositIntentByAddress {
intent: Option<DepositIntent>,
},
CreateGenesisBlock {
error: Option<NodeError>,
},
AddTransactionToBlock {
error: Option<NodeError>,
},
GetProposedBlock {
block: Block,
},
FinalizeAndStoreBlock {
error: Option<NodeError>,
},
GetPendingTransactions {
transactions: Vec<Transaction>,
},
GetChainState {
state: chain_state::ChainState,
},
GetChainInfo {
height: u64,
pending_transactions: usize,
},
RemoveDepositIntent {
error: Option<NodeError>,
},
}

pub struct ChainInterfaceImpl {
Expand Down Expand Up @@ -132,6 +164,12 @@ impl ChainInterface for ChainInterfaceImpl {
.cloned()
}

fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError> {
self.chain_state.remove_deposit_intent(&intent);
self.db.remove_deposit_intent(intent)?;
Ok(())
}

fn create_genesis_block(
&mut self,
validators: Vec<ValidatorInfo>,
Expand Down
9 changes: 9 additions & 0 deletions crates/abci/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl ChainInterfaceImpl {
error: self.insert_deposit_intent(intent).err(),
}
}
ChainMessage::RemoveDepositIntent { intent } => {
ChainResponse::RemoveDepositIntent {
error: self.remove_deposit_intent(intent).err(),
}
}
ChainMessage::GetAccount { address } => ChainResponse::GetAccount {
account: self.get_account(&address),
},
Expand Down Expand Up @@ -79,6 +84,10 @@ impl ChainInterfaceImpl {
ChainMessage::GetChainState => ChainResponse::GetChainState {
state: self.get_chain_state(),
},
ChainMessage::GetChainInfo => ChainResponse::GetChainInfo {
height: self.get_chain_state().get_block_height(),
pending_transactions: self.get_pending_transactions().len(),
},
};
response_tx
.send(response)
Expand Down
36 changes: 36 additions & 0 deletions crates/consensus/src/consensus_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,42 @@ impl ConsensusInterfaceImpl {
self.max_validators = Some(max_validators);
}

pub async fn initialize_from_chain_state(&mut self) -> Result<(), NodeError> {
if let Some(chain_tx) = &mut self.chain_interface_tx {
match chain_tx
.send_message_with_response(abci::ChainMessage::GetChainInfo)
.await
{
Ok(abci::ChainResponse::GetChainInfo {
height,
pending_transactions: _,
}) => {
self.state.current_height = height;

// Set round to 0 for the current height (fresh start for this height)
self.state.current_round = 0;

info!(
"✅ Initialized consensus from chain state: height={}, round={}",
self.state.current_height, self.state.current_round
);

Ok(())
}
Ok(_) => Err(NodeError::Error(
"Unexpected response from chain interface for GetChainInfo".to_string(),
)),
Err(e) => {
warn!("Failed to get chain info, using default values: {}", e);
Ok(()) // Don't fail initialization, just use defaults
}
}
} else {
warn!("Chain interface not available during initialization, using default values");
Ok(())
}
}

fn send_broadcast(&self, message: BroadcastMessage) -> Result<(), NodeError> {
if let Some(sender) = &self.network_events_tx {
sender
Expand Down
30 changes: 29 additions & 1 deletion crates/node/src/handlers/deposit/create_deposit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl DepositIntentState {
}
}

pub async fn update_user_balance<N: Network, W: Wallet>(
pub async fn insert_pending_deposit_transaction<N: Network, W: Wallet>(
&mut self,
node: &mut NodeState<N, W>,
tx: &BitcoinTransaction,
Expand Down Expand Up @@ -199,6 +199,34 @@ impl DepositIntentState {

info!("✅ Transaction successfully added to block");

// Remove the transaction from the deposit addresses
self.deposit_addresses.remove(&addr_str);
self.processed_txids.insert(tx.compute_txid());
let remove_intent_response = node
.chain_interface_tx
.send_message_with_response(ChainMessage::RemoveDepositIntent {
intent: intent.clone(),
})
.await?;

match remove_intent_response {
ChainResponse::RemoveDepositIntent { error: None } => {
info!("✅ Deposit intent removed");
}
ChainResponse::RemoveDepositIntent { error: Some(e) } => {
error!("Failed to remove deposit intent: {}", e);
return Err(NodeError::Error(
"Failed to remove deposit intent".to_string(),
));
}
_ => {
error!("Failed to remove deposit intent");
return Err(NodeError::Error(
"Failed to remove deposit intent".to_string(),
));
}
}

// Broadcast the transaction to all other nodes
match bincode::encode_to_vec(&transaction, bincode::config::standard()) {
Ok(transaction_data) => {
Expand Down
5 changes: 4 additions & 1 deletion crates/node/src/handlers/deposit/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ impl<N: Network, W: Wallet> Handler<N, W> for DepositIntentState {
request: SelfRequest::ConfirmDeposit { confirmed_tx },
..
} => {
if let Err(e) = self.update_user_balance(node, &confirmed_tx).await {
if let Err(e) = self
.insert_pending_deposit_transaction(node, &confirmed_tx)
.await
{
info!("❌ Failed to update user balance: {}", e);
} else {
info!(
Expand Down
5 changes: 5 additions & 0 deletions crates/node/src/start_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ pub async fn start_node(
})
.await;

// Initialize consensus state from current chain state
if let Err(e) = consensus_interface.initialize_from_chain_state().await {
tracing::error!("Failed to initialize consensus from chain state: {}", e);
}

let consensus_interface_handle = tokio::spawn(async move {
consensus_interface.start().await;
});
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/utils/swarm_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ pub fn build_swarm(
.mesh_outbound_min(mesh_n_low)
.gossip_lazy(std::cmp::max(3, mesh_n_low))
.max_transmit_size(64 * 1024)
.flood_publish(false)
.flood_publish(true)
.build()
.map_err(io::Error::other)?;

Expand Down
4 changes: 2 additions & 2 deletions tests/src/deposit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ mod deposit_tests {
};

state
.update_user_balance(node, &tx)
.insert_pending_deposit_transaction(node, &tx)
.await
.expect("balance update failed");

Expand Down Expand Up @@ -408,7 +408,7 @@ mod deposit_tests {
};

state
.update_user_balance(node, &tx)
.insert_pending_deposit_transaction(node, &tx)
.await
.expect("balance update failed");

Expand Down
6 changes: 6 additions & 0 deletions tests/src/mocks/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ impl ChainInterface for MockChainInterface {
fn get_chain_state(&self) -> ChainState {
self.chain_state.clone()
}

fn remove_deposit_intent(&mut self, intent: DepositIntent) -> Result<(), NodeError> {
self.chain_state.remove_deposit_intent(&intent);
self.db.remove_deposit_intent(intent)?;
Ok(())
}
}

impl TestChainInterface for MockChainInterface {
Expand Down
6 changes: 6 additions & 0 deletions tests/src/mocks/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,10 @@ impl Db for MockDb {
let utxos = self.utxos.read().unwrap();
Ok(utxos.values().cloned().collect())
}

fn remove_deposit_intent(&self, intent: DepositIntent) -> Result<(), NodeError> {
let mut deposit_intents = self.deposit_intents.write().unwrap();
deposit_intents.remove(&intent.deposit_tracking_id);
Ok(())
}
}
Loading