Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
392 changes: 384 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/template",
"crates/babel",
"crates/fetcher",
"crates/fetcher-api",
"crates/cosmos-keys"
]

Expand All @@ -17,6 +18,7 @@ runtime-docker-compose = { path = "crates/runtime-docker-compose" }
runtime-trait = { path = "crates/runtime-trait" }
catalog = { path = "crates/catalog" }
template = { path = "crates/template" }
fetcher-api = { path = "crates/fetcher-api" }
cosmos-keys = { path = "crates/cosmos-keys" }

serde = { version = "1.0", features = ["derive"] }
Expand All @@ -32,5 +34,6 @@ tinytemplate = "1.2"
askama = "0.14.0"
clap = { version = "4.5", features = ["derive"] }
reqwest = { version = "0.12", default-features = false }
jsonrpsee = { version = "0.24", features = ["server", "ws-client", "macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
12 changes: 12 additions & 0 deletions crates/fetcher-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "fetcher-api"
version = "0.1.0"
edition = "2021"

[dependencies]
jsonrpsee = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
eyre = { workspace = true }
tracing = "0.1"
22 changes: 22 additions & 0 deletions crates/fetcher-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use jsonrpsee::{core::SubscriptionResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};

/// Progress message sent over WebSocket
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ProgressMessage {
/// Total size is known
SetTotal { total: u64 },
/// Progress update
Update { downloaded: u64, total: Option<u64> },
/// Download finished
Finished,
}

/// JSON-RPC API for fetcher progress tracking
#[rpc(server, client)]
pub trait FetcherProgressApi {
/// Subscribe to download progress updates via WebSocket
#[subscription(name = "subscribeProgress", item = ProgressMessage)]
async fn subscribe_progress(&self) -> SubscriptionResult;
}
3 changes: 3 additions & 0 deletions crates/fetcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pin-project-lite = "0.2"
bytes = "1.0"
uuid = { version = "1.0", features = ["v4"] }
eyre = { workspace = true }
fetcher-api = { workspace = true }
jsonrpsee = { workspace = true }

url = "2.5"
flate2 = "1.0"
tar = "0.4"
Expand Down
11 changes: 11 additions & 0 deletions crates/fetcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Fetcher

A lightweight tool to fetch files from HTTP/HTTPS sources with progress tracking, checksum verification, and automatic extraction of tar.gz archives.

## WebSocket Progress Server

Enable the WebSocket server with `--ws` to stream real-time progress updates via JSON-RPC (default: `127.0.0.1:7070`).

```bash
echo '{"jsonrpc":"2.0","method":"subscribeProgress","params":[],"id":1}' | websocat ws://127.0.0.1:7070
```
42 changes: 40 additions & 2 deletions crates/fetcher/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use clap::Parser;
use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum};
use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum, MultiProgressTracker};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;

#[derive(Parser, Debug)]
#[command(name = "fetcher")]
Expand All @@ -20,6 +21,18 @@ struct Args {
/// Skip download if file exists and checksum matches (requires --checksum)
#[arg(long, requires = "checksum")]
skip_if_valid_checksum: bool,

/// Enable WebSocket progress server
#[arg(long)]
ws: bool,

/// Port for WebSocket server (default: 7070)
#[arg(long, default_value = "7070")]
ws_port: u16,

/// Bind address for WebSocket server (default: 127.0.0.1)
#[arg(long, default_value = "0.0.0.0")]
ws_bind_address: String,
}

#[tokio::main]
Expand Down Expand Up @@ -55,7 +68,32 @@ async fn main() {
}
}

let mut progress = ConsoleProgressTracker::new();
// Build progress tracker with console, and optionally WebSocket
let mut progress = MultiProgressTracker::new().add_tracker(ConsoleProgressTracker::new());

// Start WebSocket server if enabled
let _ws_server_handle = if args.ws {
use fetcher::websocket::{start_progress_server, WebSocketProgressTracker};
use fetcher_api::ProgressMessage;
use tokio::sync::broadcast;

let addr = format!("{}:{}", args.ws_bind_address, args.ws_port)
.parse()
.expect("Invalid bind address");

let (tx, _rx) = broadcast::channel::<ProgressMessage>(100);
let tx = Arc::new(tx);

let handle = start_progress_server(addr, tx.clone())
.await
.expect("Failed to start WebSocket server");

progress = progress.add_tracker(WebSocketProgressTracker::new(tx));

Some(handle)
} else {
None
};

if let Err(e) = fetch_with_progress(
&args.source,
Expand Down
40 changes: 40 additions & 0 deletions crates/fetcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use tokio::io::{AsyncRead, ReadBuf};
use url::Url;
use uuid::Uuid;

pub mod websocket;

#[derive(Debug, Clone, Copy, PartialEq)]
enum ArchiveFormat {
TarGz,
Expand Down Expand Up @@ -95,6 +97,44 @@ impl ProgressTracker for ConsoleProgressTracker {
}
}

/// A progress tracker that forwards updates to multiple trackers
pub struct MultiProgressTracker {
trackers: Vec<Box<dyn ProgressTracker>>,
}

impl MultiProgressTracker {
pub fn new() -> Self {
Self {
trackers: Vec::new(),
}
}

pub fn add_tracker<T: ProgressTracker + 'static>(mut self, tracker: T) -> Self {
self.trackers.push(Box::new(tracker));
self
}
}

impl ProgressTracker for MultiProgressTracker {
fn set_total(&mut self, total: u64) {
for tracker in &mut self.trackers {
tracker.set_total(total);
}
}

fn update(&mut self, downloaded: u64) {
for tracker in &mut self.trackers {
tracker.update(downloaded);
}
}

fn finish(&mut self) {
for tracker in &mut self.trackers {
tracker.finish();
}
}
}

pub async fn fetch(source: &str, destination: &PathBuf, checksum: Option<String>) -> Result<()> {
fetch_with_progress(source, destination, &mut NoOpProgressTracker, checksum).await
}
Expand Down
106 changes: 106 additions & 0 deletions crates/fetcher/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use fetcher_api::{FetcherProgressApiServer, ProgressMessage};
use jsonrpsee::core::SubscriptionResult;
use std::sync::Arc;
use tokio::sync::broadcast;

use crate::ProgressTracker;

/// Progress tracker that sends updates to a WebSocket broadcast channel
pub struct WebSocketProgressTracker {
total: Option<u64>,
tx: Arc<broadcast::Sender<ProgressMessage>>,
}

impl WebSocketProgressTracker {
pub fn new(tx: Arc<broadcast::Sender<ProgressMessage>>) -> Self {
Self { total: None, tx }
}
}

impl ProgressTracker for WebSocketProgressTracker {
fn set_total(&mut self, total: u64) {
self.total = Some(total);
let _ = self.tx.send(ProgressMessage::SetTotal { total });
}

fn update(&mut self, downloaded: u64) {
let _ = self.tx.send(ProgressMessage::Update {
downloaded,
total: self.total,
});
}

fn finish(&mut self) {
let _ = self.tx.send(ProgressMessage::Finished);
}
}

/// Implementation of the Fetcher Progress RPC server
pub struct FetcherProgressServer {
progress_tx: Arc<broadcast::Sender<ProgressMessage>>,
}

impl FetcherProgressServer {
pub fn new(progress_tx: Arc<broadcast::Sender<ProgressMessage>>) -> Self {
Self { progress_tx }
}
}

#[jsonrpsee::core::async_trait]
impl FetcherProgressApiServer for FetcherProgressServer {
async fn subscribe_progress(
&self,
pending: jsonrpsee::PendingSubscriptionSink,
) -> SubscriptionResult {
let sink = match pending.accept().await {
Ok(sink) => sink,
Err(err) => {
tracing::error!("failed to accept subscription: {err}");
return Err(err.to_string().into());
}
};

let mut rx = self.progress_tx.subscribe();

tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
let subscription_msg =
jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap();
let result = sink.send(subscription_msg).await;
if result.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Progress subscriber lagged by {} messages", n);
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
}
}
tracing::debug!("Progress subscription ended");
});

Ok(())
}
}

/// Start the WebSocket progress server
pub async fn start_progress_server(
addr: std::net::SocketAddr,
progress_tx: Arc<broadcast::Sender<ProgressMessage>>,
) -> Result<jsonrpsee::server::ServerHandle, Box<dyn std::error::Error>> {
use jsonrpsee::server::Server;

let server = Server::builder().build(addr).await?;

let progress_server = FetcherProgressServer::new(progress_tx);
let handle = server.start(progress_server.into_rpc());

tracing::info!("Progress WebSocket server listening on {}", addr);

Ok(handle)
}
Loading