From b5fddf7bd1b973076013447fe184ea40c4c4c8a3 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Tue, 11 Nov 2025 22:58:25 -0800 Subject: [PATCH 1/9] refactor: Split up `IncomingPiece` code to their own functions --- crates/libtortillas/src/torrent/actor.rs | 221 +++++++++++++++++++- crates/libtortillas/src/torrent/messages.rs | 159 +------------- 2 files changed, 226 insertions(+), 154 deletions(-) diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index 9795064..7980c5e 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -14,7 +14,7 @@ use dashmap::DashMap; use kameo::{Actor, actor::ActorRef, mailbox}; use librqbit_utp::UtpSocketUdp; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{fs, sync::oneshot}; use tracing::{debug, error, info, instrument, trace, warn}; use super::util; @@ -253,6 +253,225 @@ impl TorrentActor { self.pending_start = false; } + /// Handles an incoming piece block from a peer. This is the main entry point + /// that orchestrates receiving, validating, and storing piece blocks. If + /// all blocks for a piece are received, it triggers piece completion + /// logic. + pub async fn incoming_piece(&mut self, index: usize, offset: usize, block: Bytes) { + let block_index = offset / BLOCK_SIZE; + + if self.is_duplicate_block(index, block_index) { + self + .broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len())) + .await; + trace!("Received duplicate piece block"); + return; + } + + self.initialize_and_mark_block(index, block_index); + + self.write_block_to_storage(index, offset, block).await; + + if self.is_piece_complete(index) { + self.piece_completed(index).await; + } else { + let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index); + self + .broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length)) + .await; + trace!(piece = piece_idx, "Requested next block"); + } + } + + /// Checks if a block has already been received and initializes the block map + /// for a piece if it doesn't exist yet. Also marks the current block as + /// received in the block map. + fn initialize_and_mark_block(&mut self, index: usize, block_index: usize) { + if !self.block_map.contains_key(&index) { + let info_dict = self + .info_dict() + .expect("Can't receive piece without info dict"); + + let piece_length = info_dict.piece_length as usize; + let total_blocks = piece_length.div_ceil(BLOCK_SIZE); + let mut vec = BitVec::with_capacity(total_blocks); + vec.resize(total_blocks, false); + self.block_map.insert(index, vec); + } + + self + .block_map + .get_mut(&index) + .unwrap() + .set(block_index, true); + } + + /// Writes a block to the appropriate storage location based on the + /// configured storage strategy. Currently supports disk-based storage + /// with file-based storage unimplemented. + async fn write_block_to_storage(&self, index: usize, offset: usize, block: Bytes) { + match &self.piece_storage { + PieceStorageStrategy::Disk(_) => { + let path = self + .get_piece_path(index) + .expect("Failed to get piece path"); + util::write_block_to_file(path, offset, block) + .await + .expect("Failed to write block to file") + } + PieceStorageStrategy::InFile => { + unimplemented!() + } + } + } + + /// Handles the completion of a full piece. This validates the piece hash, + /// sends it to the piece manager, updates the bitfield, notifies peers, + /// updates trackers, and either requests the next piece or transitions to + /// seeding mode if done. + async fn piece_completed(&mut self, index: usize) { + let info_dict = self + .info_dict() + .expect("Can't receive piece without info dict"); + + let previous_blocks = self.block_map.remove(&index); + let cur_piece = self.next_piece; + let piece_count = info_dict.piece_count(); + let total_length = info_dict.total_length(); + + if !self.validate_and_send_piece(index, previous_blocks).await { + return; + } + + self.next_piece += 1; + self.bitfield.set_aliased(index, true); + debug!( + piece_index = index, + pieces_left = piece_count.saturating_sub(index + 1), + "Piece is now complete" + ); + + self.broadcast_to_peers(PeerTell::Have(cur_piece)).await; + + if let Some(total_downloaded) = self.total_bytes_downloaded() { + let total_bytes_left = total_length - total_downloaded; + self + .update_trackers(TrackerUpdate::Left(total_bytes_left)) + .await; + } + + if self.next_piece >= piece_count - 1 { + self.state = TorrentState::Seeding; + self + .update_trackers(TrackerUpdate::Event(Event::Completed)) + .await; + self.broadcast_to_trackers(TrackerMessage::Announce).await; + info!("Torrenting process completed, switching to seeding mode"); + } else { + let (piece_idx, block_offset, block_length) = self.next_block_coordinates(self.next_piece); + self + .broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length)) + .await; + } + } + + /// Validates a completed piece by checking its hash and sends it to the + /// piece manager. Returns false if validation fails or the piece manager + /// rejects it, which triggers a re-request of the piece. Returns true if + /// the piece is successfully validated and stored. + async fn validate_and_send_piece( + &mut self, index: usize, previous_blocks: Option<(usize, BitVec)>, + ) -> bool { + let info_dict = self + .info_dict() + .expect("Can't receive piece without info dict"); + + match &self.piece_storage { + PieceStorageStrategy::Disk(_) => { + let path = self + .get_piece_path(index) + .expect("Failed to get piece path"); + + if util::validate_piece_file(path.clone(), info_dict.pieces[index]) + .await + .is_err() + { + warn!(path = %path.display(), index, "Piece file is invalid, clearing it"); + let path_clone = path.clone(); + + tokio::spawn(async move { + fs::remove_file(&path_clone).await.unwrap_or_else(|_| { + error!(path = ?path_clone.display(), "Failed to delete file piece"); + }); + }); + return false; + } + + let data = fs::read(&path).await.unwrap().into(); + if let Err(err) = self.piece_manager.recv(index, data).await { + warn!(?err, index, path = %path.display(), "Piece manager rejected piece; re-requesting"); + if let Some((_, mut blocks)) = previous_blocks { + blocks.fill(false); + self.block_map.insert(index, blocks); + } + let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index); + self + .broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length)) + .await; + return false; + } + } + PieceStorageStrategy::InFile => { + unimplemented!() + } + } + true + } + + /// Calculates the coordinates of the next block to request for a given + /// piece. Returns a tuple of (piece_index, offset, block_length) where + /// the offset points to the next unreceived block and the length accounts + /// for the final block potentially being smaller than the standard block + /// size. + pub fn next_block_coordinates(&self, piece_index: usize) -> (usize, usize, usize) { + let info_dict = self + .info_dict() + .expect("Can't receive piece without info dict"); + + let piece_length = info_dict.piece_length as usize; + + let next_block_index = self + .block_map + .get(&piece_index) + .and_then(|blocks| blocks.iter().position(|b| !*b)) + .unwrap_or(0); + + let offset = next_block_index * BLOCK_SIZE; + let is_overflowing = offset + BLOCK_SIZE > piece_length; + let block_length = if is_overflowing { + piece_length - offset + } else { + BLOCK_SIZE + }; + + (piece_index, offset, block_length) + } + + fn is_duplicate_block(&self, index: usize, block_index: usize) -> bool { + self + .block_map + .get(&index) + .map(|block_map| block_map[block_index]) + .unwrap_or(false) + } + + fn is_piece_complete(&self, index: usize) -> bool { + self + .block_map + .get(&index) + .map(|blocks| blocks.iter().all(|b| *b)) + .unwrap_or(false) + } pub async fn start(&mut self) { if self.is_full() { self.state = TorrentState::Seeding; diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index ca4ed50..f1160fe 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -75,6 +75,9 @@ pub(crate) enum TorrentMessage { /// /// Only should be used internally. ReadyHook(ReadyHook), + + /// Sent after the `PeerActor::on_start` is ran + PeerReady(PeerId), } impl fmt::Debug for TorrentMessage { @@ -191,161 +194,11 @@ impl Message for TorrentActor { warn!("Received kill tracker message for unknown tracker"); } } - TorrentMessage::IncomingPiece(index, offset, block) => { - let info_dict = self - .info_dict() - .expect("Can't receive piece without info dict"); - - let total_length = info_dict.total_length(); - - let block_index = offset / BLOCK_SIZE; - let piece_count = info_dict.piece_count(); - - if let Some(block_map) = &self.block_map.get_mut(&index) { - self - .broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len())) - .await; - if block_map[block_index] { - trace!("Received duplicate piece block"); - return; - } } else { - let total_blocks = (info_dict.piece_length as usize).div_ceil(BLOCK_SIZE); - let mut vec = BitVec::with_capacity(total_blocks); - vec.resize(total_blocks, false); - self.block_map.insert(index, vec); - }; - - self - .block_map - .get_mut(&index) - .unwrap() // Unwrap is safe because we just inserted the block map - .set(block_index, true); - - let block_len = block.len(); - - let is_piece_complete = self - .block_map - .get(&index) - .map(|blocks| blocks.iter().all(|b| *b)) - .unwrap_or(false); - - match &self.piece_storage { - PieceStorageStrategy::Disk(_) => { - let path = self - .get_piece_path(index) - .expect("Failed to get piece path"); - util::write_block_to_file(path, offset, block) - .await - .expect("Failed to write block to file") - } - PieceStorageStrategy::InFile => { - unimplemented!() - } - }; - - // We now have the full piece - if is_piece_complete { - let previous_blocks = self.block_map.remove(&index); - let cur_piece = self.next_piece; - - match &self.piece_storage { - PieceStorageStrategy::Disk(_) => { - let path = self - .get_piece_path(index) - .expect("Failed to get piece path"); - if util::validate_piece_file(path.clone(), info_dict.pieces[index]) - .await - .is_err() - { - warn!(path = %path.display(), index, "Piece file is invalid, clearing it"); - let path_clone = path.clone(); - - // Clears the piece on a new thread - tokio::spawn(async move { - fs::remove_file(&path_clone).await.unwrap_or_else(|_| { - error!("Failed to delete file for piece {}", &path_clone.display()); - }); - }); - return; - } - - let data = fs::read(&path).await.unwrap().into(); - if let Err(err) = self.piece_manager.recv(index, data).await { - warn!(?err, index, path = %path.display(), "Piece manager rejected piece; re-requesting"); - if let Some((_, mut blocks)) = previous_blocks { - blocks.fill(false); - self.block_map.insert(index, blocks); - } - self - .broadcast_to_peers(PeerTell::NeedPiece(index, 0, BLOCK_SIZE)) - .await; - return; - } - } - PieceStorageStrategy::InFile => { - unimplemented!() - } - } - - self.next_piece += 1; - self.bitfield.set_aliased(index, true); - debug!( - piece_index = index, - pieces_left = piece_count.saturating_sub(index + 1), - "Piece is now complete" - ); - - // Announce to peers that we have this piece - self.broadcast_to_peers(PeerTell::Have(cur_piece)).await; - - if let Some(total_downloaded) = self.total_bytes_downloaded() { - let total_bytes_left = total_length - total_downloaded; - self - .update_trackers(TrackerUpdate::Left(total_bytes_left)) - .await; - } - - if self.next_piece >= piece_count - 1 { - // Handle end of torrenting process - self.state = TorrentState::Seeding; - - // Announce to trackers that we have completed the torrent - self - .update_trackers(TrackerUpdate::Event(Event::Completed)) - .await; - self.broadcast_to_trackers(TrackerMessage::Announce).await; - - info!("Torrenting process completed, switching to seeding mode"); - } else { - self - .broadcast_to_peers(PeerTell::NeedPiece(self.next_piece, 0, BLOCK_SIZE)) - .await; - } - } else { - // We need more blocks - // Requests the next block at the next offset - let offset = offset + block_len; - - // Check if we're overflowing the piece, this is only when we're requesting the - // last block. This happens because if a piece is lets say 100 bytes, and we - // request 40 bytes per block, when we're on piece 2, we'll - // overflow and request 120 bytes instargo ad of 100. This checks if we're - // overflowing and if so, we'll request the remaining bytes of - // the piece - let is_overflowing = offset + BLOCK_SIZE > info_dict.piece_length as usize; - let next_block_len = if is_overflowing { - info_dict.piece_length as usize - offset - } else { - BLOCK_SIZE - }; - - self - .broadcast_to_peers(PeerTell::NeedPiece(index, offset, next_block_len)) - .await; - trace!(piece = index, "Requested next block"); - }; + TorrentMessage::IncomingPiece(index, offset, block) => { + self.incoming_piece(index, offset, block).await } + TorrentMessage::PieceStorage(strategy) => { if !self.is_empty() { // Intentional panic because this is unintended behavior From b248cdf956d377469f595f67ad55fd1f9c86fbdc Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 26 Nov 2025 13:26:27 -0800 Subject: [PATCH 2/9] fix: Send `Cancel` requests immediately after receiving a block --- crates/libtortillas/src/torrent/actor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index 7980c5e..5bfc393 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -261,15 +261,16 @@ impl TorrentActor { let block_index = offset / BLOCK_SIZE; if self.is_duplicate_block(index, block_index) { - self - .broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len())) - .await; trace!("Received duplicate piece block"); return; } self.initialize_and_mark_block(index, block_index); + self + .broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len())) + .await; + self.write_block_to_storage(index, offset, block).await; if self.is_piece_complete(index) { From a02744644430da48a93c71e8e6d95d0e6da1edc5 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 26 Nov 2025 15:03:53 -0800 Subject: [PATCH 3/9] feat: Request needed pieces from new peers on connection --- crates/libtortillas/src/peer/actor.rs | 5 +++++ crates/libtortillas/src/torrent/messages.rs | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/crates/libtortillas/src/peer/actor.rs b/crates/libtortillas/src/peer/actor.rs index 9d8dc2b..65da9db 100644 --- a/crates/libtortillas/src/peer/actor.rs +++ b/crates/libtortillas/src/peer/actor.rs @@ -330,6 +330,11 @@ impl Actor for PeerActor { stream.send(PeerMessages::Bitfield(bitfield)).await?; } + supervisor + .tell(TorrentMessage::PeerReady(peer.id.unwrap())) + .await + .map_err(|e| PeerActorError::SupervisorCommunicationFailed(e.to_string()))?; + Ok(Self { peer, stream, diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index f1160fe..5ac7db4 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -194,7 +194,25 @@ impl Message for TorrentActor { warn!("Received kill tracker message for unknown tracker"); } } + // If we're in the downloading state, send a piece request to the peer + // that just connected + TorrentMessage::PeerReady(id) => { + if let Some(actor) = self.peers.get(&id) + && actor.is_alive() + && self.state == TorrentState::Downloading + && self.is_ready() + { + let (piece_idx, block_offset, block_length) = + self.next_block_coordinates(self.next_piece); + actor + .tell(PeerTell::NeedPiece(piece_idx, block_offset, block_length)) + .await + .expect("Failed to send piece request to peer"); + trace!(peer_id = %id, piece_idx, block_offset, block_length, "Requested piece form new peer"); } else { + warn!("Received peer ready message for unknown peer"); + } + } TorrentMessage::IncomingPiece(index, offset, block) => { self.incoming_piece(index, offset, block).await } From 874db36dfd90bcfe25ad4d4e519fd789e2228d0b Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 12:47:24 -0800 Subject: [PATCH 4/9] fix: Potential panic from malicious or malformed block offset --- crates/libtortillas/src/torrent/actor.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index 5bfc393..3daa327 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -258,7 +258,22 @@ impl TorrentActor { /// all blocks for a piece are received, it triggers piece completion /// logic. pub async fn incoming_piece(&mut self, index: usize, offset: usize, block: Bytes) { + let info_dict = match &self.info { + Some(info) => info, + None => { + warn!("Received piece block before info dict was available"); + return; + } + }; + + let piece_length = info_dict.piece_length as usize; + let expected_blocks = piece_length.div_ceil(BLOCK_SIZE); + let block_index = offset / BLOCK_SIZE; + if block_index >= expected_blocks { + warn!("Received piece block with invalid offset"); + return; + } if self.is_duplicate_block(index, block_index) { trace!("Received duplicate piece block"); From a7331a148fe94d5245290c3628f0d028abce8567 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 12:55:54 -0800 Subject: [PATCH 5/9] fix: Off-by-one error in torrent completion check --- crates/libtortillas/src/torrent/actor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index 3daa327..db3275c 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -361,6 +361,7 @@ impl TorrentActor { self.next_piece += 1; self.bitfield.set_aliased(index, true); + debug!( piece_index = index, pieces_left = piece_count.saturating_sub(index + 1), @@ -376,7 +377,7 @@ impl TorrentActor { .await; } - if self.next_piece >= piece_count - 1 { + if self.next_piece >= piece_count { self.state = TorrentState::Seeding; self .update_trackers(TrackerUpdate::Event(Event::Completed)) From adc82f18b8fdb5ca1eb02c2004424baff3e205cd Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 12:57:01 -0800 Subject: [PATCH 6/9] fix: Possible panic in `is_duplicate_block` --- crates/libtortillas/src/torrent/actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index db3275c..feae30e 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -478,7 +478,7 @@ impl TorrentActor { self .block_map .get(&index) - .map(|block_map| block_map[block_index]) + .and_then(|block_map| block_map.get(block_index).as_deref().copied()) .unwrap_or(false) } From 7d3d936c5587e1b4df71c98e24e8e66010aa2fcf Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 12:58:37 -0800 Subject: [PATCH 7/9] refactor: Add more clear and verbose logging on unexpected `PeerReady` --- crates/libtortillas/src/torrent/messages.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index 5ac7db4..07625ba 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -210,7 +210,7 @@ impl Message for TorrentActor { .expect("Failed to send piece request to peer"); trace!(peer_id = %id, piece_idx, block_offset, block_length, "Requested piece form new peer"); } else { - warn!("Received peer ready message for unknown peer"); + trace!(peer_id = %id, state = ?self.state, ready = self.is_ready(), "Ignoring PeerReady: peer unknown, dead, or torrent not in download state"); } } TorrentMessage::IncomingPiece(index, offset, block) => { From 0ac1b65351d91cbda707e16edbcc825b1db17f97 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 17:47:20 -0800 Subject: [PATCH 8/9] style: Fix clippy errors --- crates/libtortillas/src/torrent/messages.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index 07625ba..eafa11e 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -12,11 +12,10 @@ use kameo::{ prelude::{Context, Message}, }; use sha1::{Digest, Sha1}; -use tokio::fs; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{info, instrument, trace, warn}; use super::{ - BLOCK_SIZE, PieceStorageStrategy, ReadyHook, TorrentActor, TorrentExport, TorrentState, util, + PieceStorageStrategy, ReadyHook, TorrentActor, TorrentExport, TorrentState, util, }; use crate::{ actor_request_response, @@ -25,7 +24,7 @@ use crate::{ peer::{Peer, PeerId, PeerTell}, protocol::stream::PeerStream, torrent::{PieceManagerProxy, piece_manager::PieceManager}, - tracker::{Event, Tracker, TrackerMessage, TrackerUpdate}, + tracker::Tracker, }; /// For incoming from outside sources (e.g Peers, Trackers and Engine) From 5f62cadb7e83aad4a5c29763282c5d94f3e92380 Mon Sep 17 00:00:00 2001 From: artrixdotdev Date: Wed, 3 Dec 2025 17:49:04 -0800 Subject: [PATCH 9/9] style: Fix typo in logs --- crates/libtortillas/src/torrent/messages.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index eafa11e..51cc13b 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -14,9 +14,7 @@ use kameo::{ use sha1::{Digest, Sha1}; use tracing::{info, instrument, trace, warn}; -use super::{ - PieceStorageStrategy, ReadyHook, TorrentActor, TorrentExport, TorrentState, util, -}; +use super::{PieceStorageStrategy, ReadyHook, TorrentActor, TorrentExport, TorrentState, util}; use crate::{ actor_request_response, hashes::InfoHash, @@ -207,7 +205,7 @@ impl Message for TorrentActor { .tell(PeerTell::NeedPiece(piece_idx, block_offset, block_length)) .await .expect("Failed to send piece request to peer"); - trace!(peer_id = %id, piece_idx, block_offset, block_length, "Requested piece form new peer"); + trace!(peer_id = %id, piece_idx, block_offset, block_length, "Requested piece from new peer"); } else { trace!(peer_id = %id, state = ?self.state, ready = self.is_ready(), "Ignoring PeerReady: peer unknown, dead, or torrent not in download state"); }