Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 130 additions & 1 deletion crates/libtortillas/src/torrent/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
TorrentExport,
piece_manager::{FilePieceManager, PieceManager},
},
tracker::{Tracker, TrackerActor, udp::UdpServer},
tracker::{Event, Tracker, TrackerActor, TrackerMessage, TrackerUpdate, udp::UdpServer},
};
pub const BLOCK_SIZE: usize = 16 * 1024;

Expand Down Expand Up @@ -257,26 +257,48 @@ impl TorrentActor {
if self.is_full() {
self.state = TorrentState::Seeding;
info!(id = %self.info_hash(), "Torrent is now seeding");
self
.update_trackers(TrackerUpdate::Event(Event::Completed))
.await;
} else {
self.state = TorrentState::Downloading;
info!(id = %self.info_hash(), "Torrent is now downloading");

trace!(id = %self.info_hash(), peer_count = self.peers.len(), "Requesting first piece from peers");

self.next_piece = self.bitfield.first_zero().unwrap_or_default();
// Announce that we have started
self
.update_trackers(TrackerUpdate::Event(Event::Started))
.await;

// Force announce
self.broadcast_to_trackers(TrackerMessage::Announce).await;

// Now apperently we're supposed to set our event back to "empty" for the next
// announce (done via the interval), no clue why, just the way it's
// specified in the spec.
self
.update_trackers(TrackerUpdate::Event(Event::Empty))
.await;

// Request first piece from peers
self
.broadcast_to_peers(PeerTell::NeedPiece(self.next_piece, 0, BLOCK_SIZE))
.await;
self.start_time = Some(Instant::now());
}
// Send ready hook
if let Some(err) = self.ready_hook.take().and_then(|hook| hook.send(()).err()) {
error!(?err, "Failed to send ready hook");
}

let Some(info) = self.info.as_ref() else {
warn!(id = %self.info_hash(), "Start requested before info dict is available; deferring");
return;
};

// Start piece manager
self
.piece_manager
// Probably not the best to clone here, but should be fine for now
Expand All @@ -297,6 +319,60 @@ impl TorrentActor {
);
}

/// Calculates the total number of bytes downloaded by the torrent. Returns
/// None if the info dict is not present.
pub fn total_bytes_downloaded(&self) -> Option<usize> {
let info = self.info_dict()?;
let total_length = info.total_length();
let piece_length = info.piece_length as usize;

let num_pieces = self.bitfield.len();
let mut total_bytes = 0usize;

// Calculate the size of the last piece
let last_piece_len = if total_length % piece_length == 0 {
piece_length
} else {
total_length % piece_length
};

// Sum bytes from completed pieces
for piece_idx in 0..num_pieces {
if self.bitfield[piece_idx] {
let piece_size = if piece_idx == num_pieces - 1 {
last_piece_len
} else {
piece_length
};
total_bytes = total_bytes.saturating_add(piece_size);
}
}

// Sum bytes from incomplete pieces via block_map
for (piece_idx, block) in self.block_map.iter().enumerate() {
if piece_idx < num_pieces && !self.bitfield[piece_idx] {
let piece_size = if piece_idx == num_pieces - 1 {
last_piece_len
} else {
piece_length
};

let mut piece_offset = 0usize;
for block_idx in 0..block.len() {
if block[block_idx] {
let block_size = (piece_size - piece_offset).min(BLOCK_SIZE);
total_bytes = total_bytes.saturating_add(block_size);
piece_offset = piece_offset.saturating_add(block_size);
} else {
piece_offset =
piece_offset.saturating_add(BLOCK_SIZE.min(piece_size - piece_offset));
}
}
}
}

Some(total_bytes)
}
pub fn export(&self) -> TorrentExport {
TorrentExport {
info_hash: self.info_hash(),
Expand Down Expand Up @@ -463,6 +539,59 @@ impl TorrentActor {
}
// Returns immediately, without waiting for any peer responses
}

/// Broadcasts a [`TrackerUpdate`] to all trackers concurrently. similar to
/// [`Self::broadcast_to_peers`], but for trackers.
#[instrument(skip(self, message), fields(torrent_id = %self.info_hash()))]
pub(super) async fn update_trackers(&self, message: TrackerUpdate) {
let trackers = self.trackers.clone();

let actor_refs: Vec<(Tracker, ActorRef<TrackerActor>)> = trackers
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();

for (uri, actor) in actor_refs {
let msg = message.clone();
let trackers = trackers.clone();

tokio::spawn(async move {
if actor.is_alive() {
if let Err(e) = actor.tell(msg).await {
warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker");
}
} else {
trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set");
trackers.remove(&uri);
}
});
}
}

pub(super) async fn broadcast_to_trackers(&self, message: TrackerMessage) {
let trackers = self.trackers.clone();

let actor_refs: Vec<(Tracker, ActorRef<TrackerActor>)> = trackers
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();

for (uri, actor) in actor_refs {
let trackers = trackers.clone();

tokio::spawn(async move {
if actor.is_alive() {
if let Err(e) = actor.tell(message).await {
warn!(error = %e, tracker_uri = ?uri, "Failed to send to tracker");
}
} else {
trace!(tracker_uri = ?uri, "Tracker actor is dead, removing from trackers set");
trackers.remove(&uri);
}
});
}
}

/// Gets the path to a piece file based on the index. Only should be used
/// when the piece storage strategy is [`Disk`](PieceStorageStrategy::Disk),
/// this function will panic otherwise.
Expand Down
19 changes: 18 additions & 1 deletion crates/libtortillas/src/torrent/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
peer::{Peer, PeerId, PeerTell},
protocol::stream::PeerStream,
torrent::{PieceManagerProxy, piece_manager::PieceManager},
tracker::Tracker,
tracker::{Event, Tracker, TrackerMessage, TrackerUpdate},
};

/// For incoming from outside sources (e.g Peers, Trackers and Engine)
Expand Down Expand Up @@ -196,6 +196,8 @@ impl Message<TorrentMessage> for TorrentActor {
.info_dict()
.expect("Can't receive piece without info dict");

let total_length = info_dict.total_length();

let block_index = offset / BLOCK_SIZE;
let piece_count = info_dict.piece_count();

Expand Down Expand Up @@ -296,9 +298,24 @@ impl Message<TorrentMessage> for TorrentActor {

// Announce to peers that we have this piece
self.broadcast_to_peers(PeerTell::Have(cur_piece)).await;

if let Some(total_downloaded) = self.total_bytes_downloaded() {
let total_bytes_left = total_length - total_downloaded;
self
.update_trackers(TrackerUpdate::Left(total_bytes_left))
.await;
}

if self.next_piece >= piece_count - 1 {
// Handle end of torrenting process
self.state = TorrentState::Seeding;

// Announce to trackers that we have completed the torrent
self
.update_trackers(TrackerUpdate::Event(Event::Completed))
.await;
self.broadcast_to_trackers(TrackerMessage::Announce).await;

info!("Torrenting process completed, switching to seeding mode");
} else {
self
Expand Down
7 changes: 5 additions & 2 deletions crates/libtortillas/src/tracker/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ impl TrackerRequest {
if let Some(left) = self.left {
params.push(format!("left={left}"));
}
let event_str = format!("{:?}", self.event).to_lowercase(); // Hack to get the string representation of the enum

params.push(format!("event={event_str}"));
// Don't include event if it's empty
if self.event != Event::Empty {
params.push(format!("event={:?}", self.event).to_lowercase());
}

params.push(format!("compact={}", self.compact.unwrap_or(true) as u8));

params.join("&")
Expand Down
29 changes: 19 additions & 10 deletions crates/libtortillas/src/tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,27 +291,26 @@ impl Actor for TrackerActor {
}

async fn next(
&mut self, _: kameo::prelude::WeakActorRef<Self>,
&mut self, actor_ref: kameo::prelude::WeakActorRef<Self>,
mailbox_rx: &mut kameo::prelude::MailboxReceiver<Self>,
) -> Option<Signal<Self>> {
tokio::select! {
signal = mailbox_rx.recv() => signal,
// Waits for the next interval to tick
_ = self.interval.tick() => {
if let Ok(peers) = self.tracker.announce().await {
let _ = self.supervisor.tell(TorrentMessage::Announce(peers)).await;
}
let duration = Duration::from_secs(self.tracker.interval() as u64);
self.interval = interval(duration);

None
}
let msg = TrackerMessage::Announce;
Some(Signal::Message{ message: Box::new(msg),
actor_ref: actor_ref.upgrade()?.clone(),
reply: None,
sent_within_actor: true,
})}
}
}
}

/// A message from an outside source.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub(crate) enum TrackerMessage {
/// Forces the tracker to make an announce request. By default, announce
/// requests are made on an interval.
Expand All @@ -333,6 +332,15 @@ impl Message<TrackerMessage> for TrackerActor {
Ok(peers) => {
if let Err(e) = self.supervisor.tell(TorrentMessage::Announce(peers)).await {
error!("Failed to send announce to supervisor: {}", e);
} else {
// Ensure we don't have a delay of 0
let delay = self.tracker.interval().max(1) as u64;

self.interval = interval(Duration::from_secs(delay));
// Tick because when starting a new interval, it will tick immediately and
// cause a never ending loop, adding this doesn't add any delay and fixes that
// issue
self.interval.tick().await;
}
}
Err(e) => {
Expand All @@ -347,6 +355,7 @@ impl Message<TrackerMessage> for TrackerActor {
}

/// Updates the tracker's announce fields
#[derive(Debug, Clone)]
pub enum TrackerUpdate {
/// The amount of data uploaded, in bytes
Uploaded(usize),
Expand Down Expand Up @@ -383,8 +392,8 @@ impl Message<TrackerUpdate> for TrackerActor {
)]
#[repr(u32)]
pub enum Event {
Empty = 0,
#[default]
Empty = 0,
Started = 1,
Completed = 2,
Stopped = 3,
Expand Down
Loading