diff --git a/Cargo.lock b/Cargo.lock index 0e9fd42..feaad2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,7 +274,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.5.2", "tracing", "url", "wasmtimer", @@ -409,7 +409,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", - "tower", + "tower 0.5.2", "tracing", "url", "wasmtimer", @@ -425,7 +425,7 @@ dependencies = [ "alloy-transport", "reqwest", "serde_json", - "tower", + "tower 0.5.2", "tracing", "url", ] @@ -856,7 +856,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -906,7 +906,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "tokio", - "tower", + "tower 0.5.2", "url", ] @@ -929,7 +929,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tower", + "tower 0.5.2", "tower-http", "tracing", "tracing-subscriber", @@ -1154,6 +1154,12 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" @@ -1235,6 +1241,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "const-hex" version = "1.17.0" @@ -1775,8 +1791,10 @@ dependencies = [ "bytes", "clap", "eyre", + "fetcher-api", "flate2", "futures-util", + "jsonrpsee", "pin-project-lite", "reqwest", "serde", @@ -1790,6 +1808,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "fetcher-api" +version = "0.1.0" +dependencies = [ + "eyre", + "jsonrpsee", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "ff" version = "0.13.1" @@ -1965,6 +1995,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2534,6 +2570,28 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -2554,6 +2612,135 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpsee" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e281ae70cc3b98dac15fced3366a880949e65fc66e345ce857a5682d152f3e62" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-proc-macros", + "jsonrpsee-server", + "jsonrpsee-types", + "jsonrpsee-ws-client", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-client-transport" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc4280b709ac3bb5e16cf3bad5056a0ec8df55fa89edfe996361219aadc2c7ea" +dependencies = [ + "base64", + "futures-util", + "http", + "jsonrpsee-core", + "pin-project", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348ee569eaed52926b5e740aae20863762b16596476e943c9e415a6479021622" +dependencies = [ + "async-trait", + "bytes", + "futures-timer", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types", + "parking_lot", + "pin-project", + "rand 0.8.5", + "rustc-hash", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7398cddf5013cca4702862a2692b66c48a3bd6cf6ec681a47453c93d63cf8de5" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.112", +] + +[[package]] +name = "jsonrpsee-server" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21429bcdda37dcf2d43b68621b994adede0e28061f816b038b0f18c70c143d51" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.4.13", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f05e0028e55b15dbd2107163b3c744cd3bb4474f193f95d9708acbf5677e44" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "jsonrpsee-ws-client" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78fc744f17e7926d57f478cf9ca6e1ee5d8332bf0514860b1a3cdf1742e614cc" +dependencies = [ + "http", + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types", + "url", +] + [[package]] name = "k256" version = "0.13.4" @@ -3503,7 +3690,7 @@ dependencies = [ "tokio-native-tls", "tokio-rustls", "tokio-util", - "tower", + "tower 0.5.2", "tower-http", "tower-service", "url", @@ -3557,6 +3744,12 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "ruint" version = "1.17.2" @@ -3673,6 +3866,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -3703,6 +3897,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework 3.5.1", + "security-framework-sys", + "webpki-root-certs 0.26.11", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -3739,6 +3960,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -3971,6 +4201,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.9" @@ -4073,6 +4314,22 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "soketto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" +dependencies = [ + "base64", + "bytes", + "futures", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", +] + [[package]] name = "spec" version = "0.1.0" @@ -4453,6 +4710,7 @@ checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -4488,6 +4746,21 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -4517,7 +4790,7 @@ dependencies = [ "http-body", "iri-string", "pin-project-lite", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -4755,6 +5028,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4884,6 +5167,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-root-certs" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" +dependencies = [ + "webpki-root-certs 1.0.5", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "1.0.4" @@ -4909,6 +5210,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -4985,6 +5295,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -5012,6 +5331,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -5045,6 +5379,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -5057,6 +5397,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -5069,6 +5415,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -5093,6 +5445,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -5105,6 +5463,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -5117,6 +5481,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -5129,6 +5499,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index d43b332..39c02e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/template", "crates/babel", "crates/fetcher", + "crates/fetcher-api", "crates/cosmos-keys" ] @@ -17,6 +18,7 @@ runtime-docker-compose = { path = "crates/runtime-docker-compose" } runtime-trait = { path = "crates/runtime-trait" } catalog = { path = "crates/catalog" } template = { path = "crates/template" } +fetcher-api = { path = "crates/fetcher-api" } cosmos-keys = { path = "crates/cosmos-keys" } serde = { version = "1.0", features = ["derive"] } @@ -32,5 +34,6 @@ tinytemplate = "1.2" askama = "0.14.0" clap = { version = "4.5", features = ["derive"] } reqwest = { version = "0.12", default-features = false } +jsonrpsee = { version = "0.24", features = ["server", "ws-client", "macros"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/crates/fetcher-api/Cargo.toml b/crates/fetcher-api/Cargo.toml new file mode 100644 index 0000000..aca33c7 --- /dev/null +++ b/crates/fetcher-api/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fetcher-api" +version = "0.1.0" +edition = "2021" + +[dependencies] +jsonrpsee = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +eyre = { workspace = true } +tracing = "0.1" diff --git a/crates/fetcher-api/src/lib.rs b/crates/fetcher-api/src/lib.rs new file mode 100644 index 0000000..07635ae --- /dev/null +++ b/crates/fetcher-api/src/lib.rs @@ -0,0 +1,22 @@ +use jsonrpsee::{core::SubscriptionResult, proc_macros::rpc}; +use serde::{Deserialize, Serialize}; + +/// Progress message sent over WebSocket +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ProgressMessage { + /// Total size is known + SetTotal { total: u64 }, + /// Progress update + Update { downloaded: u64, total: Option }, + /// Download finished + Finished, +} + +/// JSON-RPC API for fetcher progress tracking +#[rpc(server, client)] +pub trait FetcherProgressApi { + /// Subscribe to download progress updates via WebSocket + #[subscription(name = "subscribeProgress", item = ProgressMessage)] + async fn subscribe_progress(&self) -> SubscriptionResult; +} diff --git a/crates/fetcher/Cargo.toml b/crates/fetcher/Cargo.toml index 0f1bc2c..6fd61b4 100644 --- a/crates/fetcher/Cargo.toml +++ b/crates/fetcher/Cargo.toml @@ -16,6 +16,9 @@ pin-project-lite = "0.2" bytes = "1.0" uuid = { version = "1.0", features = ["v4"] } eyre = { workspace = true } +fetcher-api = { workspace = true } +jsonrpsee = { workspace = true } + url = "2.5" flate2 = "1.0" tar = "0.4" diff --git a/crates/fetcher/README.md b/crates/fetcher/README.md new file mode 100644 index 0000000..77723c8 --- /dev/null +++ b/crates/fetcher/README.md @@ -0,0 +1,11 @@ +# Fetcher + +A lightweight tool to fetch files from HTTP/HTTPS sources with progress tracking, checksum verification, and automatic extraction of tar.gz archives. + +## WebSocket Progress Server + +Enable the WebSocket server with `--ws` to stream real-time progress updates via JSON-RPC (default: `127.0.0.1:7070`). + +```bash +echo '{"jsonrpc":"2.0","method":"subscribeProgress","params":[],"id":1}' | websocat ws://127.0.0.1:7070 +``` diff --git a/crates/fetcher/bin/main.rs b/crates/fetcher/bin/main.rs index c888561..9e58ce3 100644 --- a/crates/fetcher/bin/main.rs +++ b/crates/fetcher/bin/main.rs @@ -1,7 +1,8 @@ use clap::Parser; -use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum}; +use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum, MultiProgressTracker}; use std::path::PathBuf; use std::process; +use std::sync::Arc; #[derive(Parser, Debug)] #[command(name = "fetcher")] @@ -20,6 +21,18 @@ struct Args { /// Skip download if file exists and checksum matches (requires --checksum) #[arg(long, requires = "checksum")] skip_if_valid_checksum: bool, + + /// Enable WebSocket progress server + #[arg(long)] + ws: bool, + + /// Port for WebSocket server (default: 7070) + #[arg(long, default_value = "7070")] + ws_port: u16, + + /// Bind address for WebSocket server (default: 127.0.0.1) + #[arg(long, default_value = "0.0.0.0")] + ws_bind_address: String, } #[tokio::main] @@ -55,7 +68,32 @@ async fn main() { } } - let mut progress = ConsoleProgressTracker::new(); + // Build progress tracker with console, and optionally WebSocket + let mut progress = MultiProgressTracker::new().add_tracker(ConsoleProgressTracker::new()); + + // Start WebSocket server if enabled + let _ws_server_handle = if args.ws { + use fetcher::websocket::{start_progress_server, WebSocketProgressTracker}; + use fetcher_api::ProgressMessage; + use tokio::sync::broadcast; + + let addr = format!("{}:{}", args.ws_bind_address, args.ws_port) + .parse() + .expect("Invalid bind address"); + + let (tx, _rx) = broadcast::channel::(100); + let tx = Arc::new(tx); + + let handle = start_progress_server(addr, tx.clone()) + .await + .expect("Failed to start WebSocket server"); + + progress = progress.add_tracker(WebSocketProgressTracker::new(tx)); + + Some(handle) + } else { + None + }; if let Err(e) = fetch_with_progress( &args.source, diff --git a/crates/fetcher/src/lib.rs b/crates/fetcher/src/lib.rs index c2cbe14..c154b6f 100644 --- a/crates/fetcher/src/lib.rs +++ b/crates/fetcher/src/lib.rs @@ -12,6 +12,8 @@ use tokio::io::{AsyncRead, ReadBuf}; use url::Url; use uuid::Uuid; +pub mod websocket; + #[derive(Debug, Clone, Copy, PartialEq)] enum ArchiveFormat { TarGz, @@ -95,6 +97,44 @@ impl ProgressTracker for ConsoleProgressTracker { } } +/// A progress tracker that forwards updates to multiple trackers +pub struct MultiProgressTracker { + trackers: Vec>, +} + +impl MultiProgressTracker { + pub fn new() -> Self { + Self { + trackers: Vec::new(), + } + } + + pub fn add_tracker(mut self, tracker: T) -> Self { + self.trackers.push(Box::new(tracker)); + self + } +} + +impl ProgressTracker for MultiProgressTracker { + fn set_total(&mut self, total: u64) { + for tracker in &mut self.trackers { + tracker.set_total(total); + } + } + + fn update(&mut self, downloaded: u64) { + for tracker in &mut self.trackers { + tracker.update(downloaded); + } + } + + fn finish(&mut self) { + for tracker in &mut self.trackers { + tracker.finish(); + } + } +} + pub async fn fetch(source: &str, destination: &PathBuf, checksum: Option) -> Result<()> { fetch_with_progress(source, destination, &mut NoOpProgressTracker, checksum).await } diff --git a/crates/fetcher/src/websocket.rs b/crates/fetcher/src/websocket.rs new file mode 100644 index 0000000..1a429ae --- /dev/null +++ b/crates/fetcher/src/websocket.rs @@ -0,0 +1,106 @@ +use fetcher_api::{FetcherProgressApiServer, ProgressMessage}; +use jsonrpsee::core::SubscriptionResult; +use std::sync::Arc; +use tokio::sync::broadcast; + +use crate::ProgressTracker; + +/// Progress tracker that sends updates to a WebSocket broadcast channel +pub struct WebSocketProgressTracker { + total: Option, + tx: Arc>, +} + +impl WebSocketProgressTracker { + pub fn new(tx: Arc>) -> Self { + Self { total: None, tx } + } +} + +impl ProgressTracker for WebSocketProgressTracker { + fn set_total(&mut self, total: u64) { + self.total = Some(total); + let _ = self.tx.send(ProgressMessage::SetTotal { total }); + } + + fn update(&mut self, downloaded: u64) { + let _ = self.tx.send(ProgressMessage::Update { + downloaded, + total: self.total, + }); + } + + fn finish(&mut self) { + let _ = self.tx.send(ProgressMessage::Finished); + } +} + +/// Implementation of the Fetcher Progress RPC server +pub struct FetcherProgressServer { + progress_tx: Arc>, +} + +impl FetcherProgressServer { + pub fn new(progress_tx: Arc>) -> Self { + Self { progress_tx } + } +} + +#[jsonrpsee::core::async_trait] +impl FetcherProgressApiServer for FetcherProgressServer { + async fn subscribe_progress( + &self, + pending: jsonrpsee::PendingSubscriptionSink, + ) -> SubscriptionResult { + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(err) => { + tracing::error!("failed to accept subscription: {err}"); + return Err(err.to_string().into()); + } + }; + + let mut rx = self.progress_tx.subscribe(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(msg) => { + let subscription_msg = + jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap(); + let result = sink.send(subscription_msg).await; + if result.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("Progress subscriber lagged by {} messages", n); + } + Err(broadcast::error::RecvError::Closed) => { + break; + } + } + } + tracing::debug!("Progress subscription ended"); + }); + + Ok(()) + } +} + +/// Start the WebSocket progress server +pub async fn start_progress_server( + addr: std::net::SocketAddr, + progress_tx: Arc>, +) -> Result> { + use jsonrpsee::server::Server; + + let server = Server::builder().build(addr).await?; + + let progress_server = FetcherProgressServer::new(progress_tx); + let handle = server.start(progress_server.into_rpc()); + + tracing::info!("Progress WebSocket server listening on {}", addr); + + Ok(handle) +}