From a72435f1ce7ec25399582dc55e374c1c14ca439d Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 10:00:43 -0800 Subject: [PATCH 01/10] feat: Implement hash on `PeerMessage` and child structs --- crates/libtortillas/src/protocol/messages.rs | 32 ++++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/crates/libtortillas/src/protocol/messages.rs b/crates/libtortillas/src/protocol/messages.rs index a037c38..fdf9715 100644 --- a/crates/libtortillas/src/protocol/messages.rs +++ b/crates/libtortillas/src/protocol/messages.rs @@ -1,3 +1,4 @@ +use core::hash; use std::{ collections::HashMap, fmt::Display, @@ -23,7 +24,7 @@ use crate::{ peer::{MAGIC_STRING, PeerId}, }; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[repr(u8)] /// Represents messages exchanged between peers in the BitTorrent protocol. /// @@ -366,7 +367,8 @@ impl PeerMessages { PartialEq, Eq, Deserialize_repr, - TryFromPrimitive + TryFromPrimitive, + Hash )] #[repr(u8)] pub enum ExtendedMessageType { @@ -456,6 +458,30 @@ pub struct ExtendedMessage { pub total_size: Option, } +impl hash::Hash for ExtendedMessage { + fn hash(&self, state: &mut H) { + if let Some(extensions) = &self.supported_extensions { + let mut pairs: Vec<_> = extensions.iter().collect(); + pairs.sort_by_key(|i| i.0); + + pairs.hash(state); + } + + self.local_port.hash(state); + self.version.hash(state); + self.your_ip.hash(state); + self.ipv6.hash(state); + self.ipv4.hash(state); + self.outstanding_requests.hash(state); + self.metadata_size.hash(state); + if let Some(msg_type) = &self.msg_type { + msg_type.hash(state); + } + self.piece.hash(state); + self.total_size.hash(state); + } +} + impl ExtendedMessage { pub fn new() -> Self { Self::default() @@ -502,7 +528,7 @@ impl ExtendedMessage { } /// BitTorrent Handshake message structure -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Handshake { /// Protocol identifier (typically "BitTorrent protocol") pub protocol: Bytes, From 5560caaf7e5798fe7926c2310092cb9d1d5838f7 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 10:46:53 -0800 Subject: [PATCH 02/10] feat: Create message `pending_message_queue` and `flush_queue` --- crates/libtortillas/src/peer/actor.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 232cd86..1e7544a 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -39,6 +39,7 @@ pub(crate) struct PeerActor { supervisor: ActorRef, pending_block_requests: Arc>, + pending_message_requests: Vec, } impl PeerActor { @@ -223,6 +224,25 @@ impl PeerActor { self.peer.set_am_interested(has_interesting_pieces); } + + /// Sends all queued messages to the peer. This sends synchronously, and will + /// not return until each message has been sent. This is because most of + /// the time we want the messages to be sent in their original order. + #[instrument(skip(self), fields(peer_addr = %self.stream, peer_id = %self.peer.id.unwrap()))] + async fn flush_queue(&mut self) { + let queued_messages = self.pending_message_requests.len(); + + while let Some(msg) = self.pending_message_requests.pop() { + self + .stream + .send(msg) + .await + .expect("Failed to send message to peer"); + } + + trace!(amount = queued_messages, "Flushed queued messages to peer"); + } + } impl Actor for PeerActor { @@ -251,6 +271,7 @@ impl Actor for PeerActor { stream, supervisor, pending_block_requests: Arc::new(DashSet::new()), + pending_message_requests: Vec::with_capacity(MAX_PENDING_MESSAGES), }) } From 8b61a20c863b06089b6bc675cf8e626381e42474 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 11:03:59 -0800 Subject: [PATCH 03/10] feat: Add `flush_block_requests` to retry requested blocks we were choked --- crates/libtortillas/src/peer/actor.rs | 33 +++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 1e7544a..940e96a 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -243,6 +243,39 @@ impl PeerActor { trace!(amount = queued_messages, "Flushed queued messages to peer"); } + /// Flushes/resends all pending block requests to the peer. + #[instrument(skip(self), fields(peer_addr = %self.stream, peer_id = %self.peer.id.unwrap()))] + async fn flush_block_requests(&mut self) { + let queued_block_requests = self.pending_block_requests.len(); + let mut completed = Vec::with_capacity(queued_block_requests); + + for request in self.pending_block_requests.iter() { + let (index, begin, length) = *request; + if let Ok(()) = self + .stream + .send(PeerMessages::Request( + index as u32, + begin as u32, + length as u32, + )) + .await + { + completed.push((index, begin, length)); + } + } + for (index, begin, length) in &completed { + self + .pending_block_requests + .remove(&(*index, *begin, *length)); + } + + trace!( + amount = queued_block_requests, + amount_succussful = completed.len(), + "Flushed queued block requests to peer" + ); + } + } impl Actor for PeerActor { From 8cd94592ec26d6c7ea11ed71433d42bc983ce3cb Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 11:04:36 -0800 Subject: [PATCH 04/10] refactor: Only send `Request` messages if we arent choked --- crates/libtortillas/src/peer/actor.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 940e96a..ba1d108 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -504,15 +504,17 @@ impl Message for PeerActor { return; } - self - .stream - .send(PeerMessages::Request( - index as u32, - begin as u32, - length as u32, - )) - .await - .expect("Failed to send piece request"); + if !self.peer.am_choked() { + self + .stream + .send(PeerMessages::Request( + index as u32, + begin as u32, + length as u32, + )) + .await + .expect("Failed to send piece request"); + } self.pending_block_requests.insert((index, begin, length)); trace!(piece_index = index, "Sent piece request to peer"); } From ddc11a50a5274fe383e8f4f88c7d488c7561a621 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 11:05:07 -0800 Subject: [PATCH 05/10] feat: Flush both the block and normal message queue on `Unchoke` --- crates/libtortillas/src/peer/actor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index ba1d108..3745d7f 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -390,6 +390,10 @@ impl Message for PeerActor { PeerMessages::Unchoke => { self.peer.update_last_optimistic_unchoke(); self.peer.set_am_choked(false); + + // Send all pending messages + self.flush_queue().await; + self.flush_block_requests().await; trace!("Peer unchoked us"); } PeerMessages::Interested => { From 0bc1f5b277fc2a9568c19075d9de4c0746118730 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 11:47:18 -0800 Subject: [PATCH 06/10] refactor: Move `pending_message_requests` to a `VecDeque` over vanilla `Vec` --- crates/libtortillas/src/peer/actor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 3745d7f..1624989 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + collections::{HashMap, VecDeque}, sync::{Arc, atomic::AtomicU8}, time::Instant, }; @@ -39,7 +40,7 @@ pub(crate) struct PeerActor { supervisor: ActorRef, pending_block_requests: Arc>, - pending_message_requests: Vec, + pending_message_requests: VecDeque, } impl PeerActor { @@ -232,7 +233,7 @@ impl PeerActor { async fn flush_queue(&mut self) { let queued_messages = self.pending_message_requests.len(); - while let Some(msg) = self.pending_message_requests.pop() { + while let Some(msg) = self.pending_message_requests.pop_back() { self .stream .send(msg) @@ -304,7 +305,7 @@ impl Actor for PeerActor { stream, supervisor, pending_block_requests: Arc::new(DashSet::new()), - pending_message_requests: Vec::with_capacity(MAX_PENDING_MESSAGES), + pending_message_requests: VecDeque::with_capacity(MAX_PENDING_MESSAGES), }) } From 787b99c68fbdfc0accc6dec9fee3d80b381c9b25 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 12:49:39 -0800 Subject: [PATCH 07/10] feat: Only run flushes if their respective `Vec` isn't empty --- crates/libtortillas/src/peer/actor.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 1624989..f2db5b1 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -231,6 +231,10 @@ impl PeerActor { /// the time we want the messages to be sent in their original order. #[instrument(skip(self), fields(peer_addr = %self.stream, peer_id = %self.peer.id.unwrap()))] async fn flush_queue(&mut self) { + if self.pending_message_requests.is_empty() { + return; + } + let queued_messages = self.pending_message_requests.len(); while let Some(msg) = self.pending_message_requests.pop_back() { @@ -247,6 +251,10 @@ impl PeerActor { /// Flushes/resends all pending block requests to the peer. #[instrument(skip(self), fields(peer_addr = %self.stream, peer_id = %self.peer.id.unwrap()))] async fn flush_block_requests(&mut self) { + if self.pending_block_requests.is_empty() { + return; + } + let queued_block_requests = self.pending_block_requests.len(); let mut completed = Vec::with_capacity(queued_block_requests); From 1bbf0bd5eeab2f540f3604614f8eb6efb9064b59 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 12:52:04 -0800 Subject: [PATCH 08/10] feat: Create a `send_message` method that proxies peer messages --- crates/libtortillas/src/peer/actor.rs | 28 +++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index f2db5b1..174a459 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -27,6 +27,8 @@ use crate::{ torrent::{TorrentActor, TorrentMessage, TorrentRequest, TorrentResponse}, }; +const MAX_PENDING_MESSAGES: usize = 8; + const PEER_KEEPALIVE_TIMEOUT: u64 = 10; const PEER_DISCONNECT_TIMEOUT: u64 = 20; @@ -285,6 +287,32 @@ impl PeerActor { ); } + /// Send a message to the peer. Checks if the peer is choked, and if so, + /// queues the message in [`self.pending_message_requests`]. This function + /// will NOT queue request messages since they have their own queue of + /// sorts. + /// + /// Unless you're doing something like a `KeepAlive` message or a piece + /// request, you should use this function over [`Self::stream.send`]. + #[instrument(skip(self), fields(peer_addr = %self.stream, peer_id = %self.peer.id.unwrap()))] + async fn send_message(&mut self, msg: PeerMessages) -> Result<(), PeerActorError> { + if self.peer.am_choked() { + // Only push the message if it's not a request + if matches!(msg, PeerMessages::Request(..)) { + return Ok(()); + } + if self.pending_message_requests.len() >= MAX_PENDING_MESSAGES { + self.pending_message_requests.pop_back(); + } + + self.pending_message_requests.push_front(msg); + trace!("Peer is choked, queueing message"); + + return Ok(()); + } + + self.stream.send(msg).await + } } impl Actor for PeerActor { From cfd2e964526cfc356b501e5b6b151885256640f7 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 12:52:25 -0800 Subject: [PATCH 09/10] feat: Move appropriate messages to `self.send_message` over `self.stream.send` --- crates/libtortillas/src/peer/actor.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 174a459..cfd9301 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, collections::{HashMap, VecDeque}, sync::{Arc, atomic::AtomicU8}, time::Instant, @@ -172,7 +171,7 @@ impl PeerActor { None, ); - if let Err(e) = self.stream.send(message).await { + if let Err(e) = self.send_message(message).await { trace!(error = %e, piece, "Failed to send metadata request"); } } else { @@ -582,14 +581,13 @@ impl Message for PeerActor { } PeerTell::HaveInfoDict(bitfield) => { self - .stream - .send(PeerMessages::Bitfield(bitfield)) + .send_message(PeerMessages::Bitfield(bitfield)) .await .expect("Failed to send bitfield"); trace!("Sent bitfield to peer"); } PeerTell::Have(piece) => { - if let Err(e) = self.stream.send(PeerMessages::Have(piece as u32)).await { + if let Err(e) = self.send_message(PeerMessages::Have(piece as u32)).await { trace!(piece_num = piece, error = %e, "Failed to send Have message to peer"); } } From 8ac653b2fffa7e55bf6bebd0f8c59bc1d5f954bc Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Mon, 10 Nov 2025 13:06:18 -0800 Subject: [PATCH 10/10] fix: Keep pending block requests until the piece arrives --- crates/libtortillas/src/peer/actor.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index cfd9301..9d8dc2b 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -257,11 +257,11 @@ impl PeerActor { } let queued_block_requests = self.pending_block_requests.len(); - let mut completed = Vec::with_capacity(queued_block_requests); + let mut completed = 0usize; for request in self.pending_block_requests.iter() { let (index, begin, length) = *request; - if let Ok(()) = self + if self .stream .send(PeerMessages::Request( index as u32, @@ -269,19 +269,14 @@ impl PeerActor { length as u32, )) .await + .is_ok() { - completed.push((index, begin, length)); + completed += 1; } } - for (index, begin, length) in &completed { - self - .pending_block_requests - .remove(&(*index, *begin, *length)); - } - trace!( amount = queued_block_requests, - amount_succussful = completed.len(), + amount_succussful = completed, "Flushed queued block requests to peer" ); }