5 changes: 5 additions & 0 deletions crates/libtortillas/src/peer/actor.rs
@@ -330,6 +330,11 @@ impl Actor for PeerActor {
stream.send(PeerMessages::Bitfield(bitfield)).await?;
}

supervisor
.tell(TorrentMessage::PeerReady(peer.id.unwrap()))
.await
.map_err(|e| PeerActorError::SupervisorCommunicationFailed(e.to_string()))?;

Ok(Self {
peer,
stream,
238 changes: 237 additions & 1 deletion crates/libtortillas/src/torrent/actor.rs
@@ -14,7 +14,7 @@ use dashmap::DashMap;
use kameo::{Actor, actor::ActorRef, mailbox};
use librqbit_utp::UtpSocketUdp;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::{fs, sync::oneshot};
use tracing::{debug, error, info, instrument, trace, warn};

use super::util;
@@ -253,6 +253,242 @@ impl TorrentActor {
self.pending_start = false;
}

/// Handles an incoming piece block from a peer. This is the main entry point
/// that orchestrates receiving, validating, and storing piece blocks. If
/// all blocks for a piece are received, it triggers piece completion
/// logic.
pub async fn incoming_piece(&mut self, index: usize, offset: usize, block: Bytes) {
let info_dict = match &self.info {
Some(info) => info,
None => {
warn!("Received piece block before info dict was available");
return;
}
};

let piece_length = info_dict.piece_length as usize;
let expected_blocks = piece_length.div_ceil(BLOCK_SIZE);

let block_index = offset / BLOCK_SIZE;
if block_index >= expected_blocks {
warn!("Received piece block with invalid offset");
return;
}

if self.is_duplicate_block(index, block_index) {
trace!("Received duplicate piece block");
return;
}

self.initialize_and_mark_block(index, block_index);

self
.broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len()))
.await;

self.write_block_to_storage(index, offset, block).await;

if self.is_piece_complete(index) {
self.piece_completed(index).await;
} else {
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index);
self
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
.await;
trace!(piece = piece_idx, "Requested next block");
}
}
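
// A worked example of the offset check above, as a standalone sketch. It
// assumes the conventional 16 KiB block size (BLOCK_SIZE = 16_384) and a
// 256 KiB piece; the helper name is illustrative, not part of this crate.
fn block_index_for_offset(offset: usize, piece_length: usize, block_size: usize) -> Option<usize> {
let expected_blocks = piece_length.div_ceil(block_size);
let block_index = offset / block_size;
(block_index < expected_blocks).then_some(block_index)
}
// block_index_for_offset(32_768, 262_144, 16_384) == Some(2)
// block_index_for_offset(262_144, 262_144, 16_384) == None, i.e. past the piece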

/// Initializes the block map for a piece if it doesn't exist yet and marks
/// the given block as received. The duplicate check itself happens earlier,
/// in `is_duplicate_block`.
fn initialize_and_mark_block(&mut self, index: usize, block_index: usize) {
if !self.block_map.contains_key(&index) {
let info_dict = self
.info_dict()
.expect("Can't receive piece without info dict");

let piece_length = info_dict.piece_length as usize;
let total_blocks = piece_length.div_ceil(BLOCK_SIZE);
let mut vec = BitVec::with_capacity(total_blocks);
vec.resize(total_blocks, false);
self.block_map.insert(index, vec);
}

self
.block_map
.get_mut(&index)
.unwrap()
.set(block_index, true);
}
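
// A simplified stand-in for the pattern above, using std's HashMap and
// Vec<bool> in place of the DashMap/BitVec pair; the helper is only a
// sketch. The entry API collapses the contains_key / insert / get_mut
// sequence into a single call.
fn mark_block_simplified(
blocks_by_piece: &mut std::collections::HashMap<usize, Vec<bool>>, piece: usize,
block_index: usize, total_blocks: usize,
) {
let blocks = blocks_by_piece
.entry(piece)
.or_insert_with(|| vec![false; total_blocks]);
blocks[block_index] = true;
}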

/// Writes a block to the appropriate storage location based on the
/// configured storage strategy. Currently only disk-based storage is
/// supported; in-file storage is not yet implemented.
async fn write_block_to_storage(&self, index: usize, offset: usize, block: Bytes) {
match &self.piece_storage {
PieceStorageStrategy::Disk(_) => {
let path = self
.get_piece_path(index)
.expect("Failed to get piece path");
util::write_block_to_file(path, offset, block)
.await
.expect("Failed to write block to file")
}
PieceStorageStrategy::InFile => {
unimplemented!()
}
}
}
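
// A minimal sketch of what writing a block at a byte offset can look like
// with tokio. This is not the actual util::write_block_to_file, just an
// illustration; it assumes the piece file may be created on demand.
async fn write_block_at(path: &std::path::Path, offset: u64, block: &[u8]) -> std::io::Result<()> {
use std::io::SeekFrom;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(path)
.await?;
file.seek(SeekFrom::Start(offset)).await?;
file.write_all(block).await?;
file.flush().await?;
Ok(())
}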

/// Handles the completion of a full piece. This validates the piece hash,
/// sends it to the piece manager, updates the bitfield, notifies peers,
/// updates trackers, and either requests the next piece or transitions to
/// seeding mode if done.
async fn piece_completed(&mut self, index: usize) {
let info_dict = self
.info_dict()
.expect("Can't receive piece without info dict");

let previous_blocks = self.block_map.remove(&index);
let cur_piece = self.next_piece;
let piece_count = info_dict.piece_count();
let total_length = info_dict.total_length();

if !self.validate_and_send_piece(index, previous_blocks).await {
return;
}

self.next_piece += 1;
self.bitfield.set_aliased(index, true);

debug!(
piece_index = index,
pieces_left = piece_count.saturating_sub(index + 1),
"Piece is now complete"
);

self.broadcast_to_peers(PeerTell::Have(cur_piece)).await;

if let Some(total_downloaded) = self.total_bytes_downloaded() {
let total_bytes_left = total_length - total_downloaded;
self
.update_trackers(TrackerUpdate::Left(total_bytes_left))
.await;
}

if self.next_piece >= piece_count {
self.state = TorrentState::Seeding;
self
.update_trackers(TrackerUpdate::Event(Event::Completed))
.await;
self.broadcast_to_trackers(TrackerMessage::Announce).await;
info!("Torrenting process completed, switching to seeding mode");
} else {
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(self.next_piece);
self
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
.await;
}
}
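
// The "left" figure reported to trackers is the total length minus the bytes
// already downloaded. A rough sketch of that arithmetic, assuming every
// completed piece is full-length (the real total_bytes_downloaded may adjust
// for a shorter final piece):
fn approx_bytes_left(total_length: usize, piece_length: usize, completed_pieces: usize) -> usize {
total_length.saturating_sub(completed_pieces * piece_length)
}
// e.g. 10 pieces of 256 KiB with 3 complete:
// approx_bytes_left(2_621_440, 262_144, 3) == 1_835_008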

/// Validates a completed piece by checking its hash and sends it to the
/// piece manager. Returns `false` if hash validation fails (the on-disk
/// piece file is deleted) or if the piece manager rejects the piece (its
/// blocks are reset and re-requested). Returns `true` if the piece is
/// successfully validated and stored.
async fn validate_and_send_piece(
&mut self, index: usize, previous_blocks: Option<(usize, BitVec)>,
) -> bool {
let info_dict = self
.info_dict()
.expect("Can't receive piece without info dict");

match &self.piece_storage {
PieceStorageStrategy::Disk(_) => {
let path = self
.get_piece_path(index)
.expect("Failed to get piece path");

if util::validate_piece_file(path.clone(), info_dict.pieces[index])
.await
.is_err()
{
warn!(path = %path.display(), index, "Piece file is invalid, clearing it");
let path_clone = path.clone();

tokio::spawn(async move {
fs::remove_file(&path_clone).await.unwrap_or_else(|_| {
error!(path = ?path_clone.display(), "Failed to delete piece file");
});
});
return false;
}

let data = fs::read(&path).await.unwrap().into();
if let Err(err) = self.piece_manager.recv(index, data).await {
warn!(?err, index, path = %path.display(), "Piece manager rejected piece; re-requesting");
if let Some((_, mut blocks)) = previous_blocks {
blocks.fill(false);
self.block_map.insert(index, blocks);
}
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index);
self
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
.await;
return false;
}
}
PieceStorageStrategy::InFile => {
unimplemented!()
}
}
true
}
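
// A sketch of the hash check that validate_piece_file presumably performs:
// hash the on-disk piece with SHA-1 and compare it to the 20-byte digest
// from the info dict. Assumes the sha1 crate; not the actual util code.
async fn piece_file_matches_hash(path: &std::path::Path, expected: [u8; 20]) -> std::io::Result<bool> {
use sha1::{Digest, Sha1};

let data = tokio::fs::read(path).await?;
let digest = Sha1::digest(&data);
Ok(digest.as_slice() == expected.as_slice())
}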

/// Calculates the coordinates of the next block to request for a given
/// piece. Returns a tuple of (piece_index, offset, block_length) where
/// the offset points to the next unreceived block and the length accounts
/// for the final block potentially being smaller than the standard block
/// size.
pub fn next_block_coordinates(&self, piece_index: usize) -> (usize, usize, usize) {
let info_dict = self
.info_dict()
.expect("Can't receive piece without info dict");

let piece_length = info_dict.piece_length as usize;

let next_block_index = self
.block_map
.get(&piece_index)
.and_then(|blocks| blocks.iter().position(|b| !*b))
.unwrap_or(0);

let offset = next_block_index * BLOCK_SIZE;
let is_overflowing = offset + BLOCK_SIZE > piece_length;
let block_length = if is_overflowing {
piece_length - offset
} else {
BLOCK_SIZE
};

(piece_index, offset, block_length)
}
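
// Concrete numbers for the overflow branch above: with BLOCK_SIZE = 16_384
// and piece_length = 40_000, blocks start at offsets 0, 16_384 and 32_768,
// so the final block is 40_000 - 32_768 = 7_232 bytes. The helper below only
// illustrates that arithmetic.
fn final_block_length(piece_length: usize, block_size: usize) -> usize {
let last_offset = ((piece_length - 1) / block_size) * block_size;
piece_length - last_offset
}
// final_block_length(40_000, 16_384) == 7_232
// final_block_length(32_768, 16_384) == 16_384 (the piece divides evenly)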

/// Returns true if the given block of the given piece has already been
/// marked as received in the block map.
fn is_duplicate_block(&self, index: usize, block_index: usize) -> bool {
self
.block_map
.get(&index)
.and_then(|block_map| block_map.get(block_index).as_deref().copied())
.unwrap_or(false)
}

/// Returns true if every block of the given piece has been received.
fn is_piece_complete(&self, index: usize) -> bool {
self
.block_map
.get(&index)
.map(|blocks| blocks.iter().all(|b| *b))
.unwrap_or(false)
}
pub async fn start(&mut self) {
if self.is_full() {
self.state = TorrentState::Seeding;