diff --git a/crates/libtortillas/src/torrent/actor.rs b/crates/libtortillas/src/torrent/actor.rs index d82a328..9795064 100644 --- a/crates/libtortillas/src/torrent/actor.rs +++ b/crates/libtortillas/src/torrent/actor.rs @@ -31,7 +31,7 @@ use crate::{ TorrentExport, piece_manager::{FilePieceManager, PieceManager}, }, - tracker::{Tracker, TrackerActor, udp::UdpServer}, + tracker::{Event, Tracker, TrackerActor, TrackerMessage, TrackerUpdate, udp::UdpServer}, }; pub const BLOCK_SIZE: usize = 16 * 1024; @@ -257,6 +257,9 @@ impl TorrentActor { if self.is_full() { self.state = TorrentState::Seeding; info!(id = %self.info_hash(), "Torrent is now seeding"); + self + .update_trackers(TrackerUpdate::Event(Event::Completed)) + .await; } else { self.state = TorrentState::Downloading; info!(id = %self.info_hash(), "Torrent is now downloading"); @@ -264,19 +267,38 @@ impl TorrentActor { trace!(id = %self.info_hash(), peer_count = self.peers.len(), "Requesting first piece from peers"); self.next_piece = self.bitfield.first_zero().unwrap_or_default(); + // Announce that we have started + self + .update_trackers(TrackerUpdate::Event(Event::Started)) + .await; + + // Force announce + self.broadcast_to_trackers(TrackerMessage::Announce).await; + + // Now apperently we're supposed to set our event back to "empty" for the next + // announce (done via the interval), no clue why, just the way it's + // specified in the spec. + self + .update_trackers(TrackerUpdate::Event(Event::Empty)) + .await; + // Request first piece from peers self .broadcast_to_peers(PeerTell::NeedPiece(self.next_piece, 0, BLOCK_SIZE)) .await; self.start_time = Some(Instant::now()); } + // Send ready hook if let Some(err) = self.ready_hook.take().and_then(|hook| hook.send(()).err()) { error!(?err, "Failed to send ready hook"); } + let Some(info) = self.info.as_ref() else { warn!(id = %self.info_hash(), "Start requested before info dict is available; deferring"); return; }; + + // Start piece manager self .piece_manager // Probably not the best to clone here, but should be fine for now @@ -297,6 +319,60 @@ impl TorrentActor { ); } + /// Calculates the total number of bytes downloaded by the torrent. Returns + /// None if the info dict is not present. + pub fn total_bytes_downloaded(&self) -> Option { + let info = self.info_dict()?; + let total_length = info.total_length(); + let piece_length = info.piece_length as usize; + + let num_pieces = self.bitfield.len(); + let mut total_bytes = 0usize; + + // Calculate the size of the last piece + let last_piece_len = if total_length % piece_length == 0 { + piece_length + } else { + total_length % piece_length + }; + + // Sum bytes from completed pieces + for piece_idx in 0..num_pieces { + if self.bitfield[piece_idx] { + let piece_size = if piece_idx == num_pieces - 1 { + last_piece_len + } else { + piece_length + }; + total_bytes = total_bytes.saturating_add(piece_size); + } + } + + // Sum bytes from incomplete pieces via block_map + for (piece_idx, block) in self.block_map.iter().enumerate() { + if piece_idx < num_pieces && !self.bitfield[piece_idx] { + let piece_size = if piece_idx == num_pieces - 1 { + last_piece_len + } else { + piece_length + }; + + let mut piece_offset = 0usize; + for block_idx in 0..block.len() { + if block[block_idx] { + let block_size = (piece_size - piece_offset).min(BLOCK_SIZE); + total_bytes = total_bytes.saturating_add(block_size); + piece_offset = piece_offset.saturating_add(block_size); + } else { + piece_offset = + piece_offset.saturating_add(BLOCK_SIZE.min(piece_size - piece_offset)); + } + } + } + } + + Some(total_bytes) + } pub fn export(&self) -> TorrentExport { TorrentExport { info_hash: self.info_hash(), @@ -463,6 +539,59 @@ impl TorrentActor { } // Returns immediately, without waiting for any peer responses } + + /// Broadcasts a [`TrackerUpdate`] to all trackers concurrently. similar to + /// [`Self::broadcast_to_peers`], but for trackers. + #[instrument(skip(self, message), fields(torrent_id = %self.info_hash()))] + pub(super) async fn update_trackers(&self, message: TrackerUpdate) { + let trackers = self.trackers.clone(); + + let actor_refs: Vec<(Tracker, ActorRef)> = trackers + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + + for (uri, actor) in actor_refs { + let msg = message.clone(); + let trackers = trackers.clone(); + + tokio::spawn(async move { + if actor.is_alive() { + if let Err(e) = actor.tell(msg).await { + warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker"); + } + } else { + trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set"); + trackers.remove(&uri); + } + }); + } + } + + pub(super) async fn broadcast_to_trackers(&self, message: TrackerMessage) { + let trackers = self.trackers.clone(); + + let actor_refs: Vec<(Tracker, ActorRef)> = trackers + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + + for (uri, actor) in actor_refs { + let trackers = trackers.clone(); + + tokio::spawn(async move { + if actor.is_alive() { + if let Err(e) = actor.tell(message).await { + warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker"); + } + } else { + trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set"); + trackers.remove(&uri); + } + }); + } + } + /// Gets the path to a piece file based on the index. Only should be used /// when the piece storage strategy is [`Disk`](PieceStorageStrategy::Disk), /// this function will panic otherwise. diff --git a/crates/libtortillas/src/torrent/messages.rs b/crates/libtortillas/src/torrent/messages.rs index 88efa1e..ca4ed50 100644 --- a/crates/libtortillas/src/torrent/messages.rs +++ b/crates/libtortillas/src/torrent/messages.rs @@ -25,7 +25,7 @@ use crate::{ peer::{Peer, PeerId, PeerTell}, protocol::stream::PeerStream, torrent::{PieceManagerProxy, piece_manager::PieceManager}, - tracker::Tracker, + tracker::{Event, Tracker, TrackerMessage, TrackerUpdate}, }; /// For incoming from outside sources (e.g Peers, Trackers and Engine) @@ -196,6 +196,8 @@ impl Message for TorrentActor { .info_dict() .expect("Can't receive piece without info dict"); + let total_length = info_dict.total_length(); + let block_index = offset / BLOCK_SIZE; let piece_count = info_dict.piece_count(); @@ -296,9 +298,24 @@ impl Message for TorrentActor { // Announce to peers that we have this piece self.broadcast_to_peers(PeerTell::Have(cur_piece)).await; + + if let Some(total_downloaded) = self.total_bytes_downloaded() { + let total_bytes_left = total_length - total_downloaded; + self + .update_trackers(TrackerUpdate::Left(total_bytes_left)) + .await; + } + if self.next_piece >= piece_count - 1 { // Handle end of torrenting process self.state = TorrentState::Seeding; + + // Announce to trackers that we have completed the torrent + self + .update_trackers(TrackerUpdate::Event(Event::Completed)) + .await; + self.broadcast_to_trackers(TrackerMessage::Announce).await; + info!("Torrenting process completed, switching to seeding mode"); } else { self diff --git a/crates/libtortillas/src/tracker/http.rs b/crates/libtortillas/src/tracker/http.rs index 84b285f..fb99779 100644 --- a/crates/libtortillas/src/tracker/http.rs +++ b/crates/libtortillas/src/tracker/http.rs @@ -80,9 +80,12 @@ impl TrackerRequest { if let Some(left) = self.left { params.push(format!("left={left}")); } - let event_str = format!("{:?}", self.event).to_lowercase(); // Hack to get the string representation of the enum - params.push(format!("event={event_str}")); + // Don't include event if it's empty + if self.event != Event::Empty { + params.push(format!("event={:?}", self.event).to_lowercase()); + } + params.push(format!("compact={}", self.compact.unwrap_or(true) as u8)); params.join("&") diff --git a/crates/libtortillas/src/tracker/mod.rs b/crates/libtortillas/src/tracker/mod.rs index 275e345..0fdfac8 100644 --- a/crates/libtortillas/src/tracker/mod.rs +++ b/crates/libtortillas/src/tracker/mod.rs @@ -291,27 +291,26 @@ impl Actor for TrackerActor { } async fn next( - &mut self, _: kameo::prelude::WeakActorRef, + &mut self, actor_ref: kameo::prelude::WeakActorRef, mailbox_rx: &mut kameo::prelude::MailboxReceiver, ) -> Option> { tokio::select! { signal = mailbox_rx.recv() => signal, // Waits for the next interval to tick _ = self.interval.tick() => { - if let Ok(peers) = self.tracker.announce().await { - let _ = self.supervisor.tell(TorrentMessage::Announce(peers)).await; - } - let duration = Duration::from_secs(self.tracker.interval() as u64); - self.interval = interval(duration); - - None - } + let msg = TrackerMessage::Announce; + Some(Signal::Message{ message: Box::new(msg), + actor_ref: actor_ref.upgrade()?.clone(), + reply: None, + sent_within_actor: true, + })} } } } /// A message from an outside source. #[allow(dead_code)] +#[derive(Debug, Clone, Copy)] pub(crate) enum TrackerMessage { /// Forces the tracker to make an announce request. By default, announce /// requests are made on an interval. @@ -333,6 +332,15 @@ impl Message for TrackerActor { Ok(peers) => { if let Err(e) = self.supervisor.tell(TorrentMessage::Announce(peers)).await { error!("Failed to send announce to supervisor: {}", e); + } else { + // Ensure we don't have a delay of 0 + let delay = self.tracker.interval().max(1) as u64; + + self.interval = interval(Duration::from_secs(delay)); + // Tick because when starting a new interval, it will tick immediately and + // cause a never ending loop, adding this doesn't add any delay and fixes that + // issue + self.interval.tick().await; } } Err(e) => { @@ -347,6 +355,7 @@ impl Message for TrackerActor { } /// Updates the tracker's announce fields +#[derive(Debug, Clone)] pub enum TrackerUpdate { /// The amount of data uploaded, in bytes Uploaded(usize), @@ -383,8 +392,8 @@ impl Message for TrackerActor { )] #[repr(u32)] pub enum Event { - Empty = 0, #[default] + Empty = 0, Started = 1, Completed = 2, Stopped = 3,