From beca33ade633fbc119fb53fa74ff3f7c13c87cbb Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Sun, 4 Jan 2026 12:02:16 +0100 Subject: [PATCH 1/3] Websocket api --- Cargo.lock | 393 +++++++++++++++++++++++++++++++- Cargo.toml | 5 +- crates/fetcher-api/Cargo.toml | 12 + crates/fetcher-api/src/lib.rs | 22 ++ crates/fetcher/Cargo.toml | 4 + crates/fetcher/src/lib.rs | 2 + crates/fetcher/src/websocket.rs | 97 ++++++++ 7 files changed, 526 insertions(+), 9 deletions(-) create mode 100644 crates/fetcher-api/Cargo.toml create mode 100644 crates/fetcher-api/src/lib.rs create mode 100644 crates/fetcher/src/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index 7dfe700..abc8074 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,7 +274,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.5.2", "tracing", "url", "wasmtimer", @@ -409,7 +409,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", - "tower", + "tower 0.5.2", "tracing", "url", "wasmtimer", @@ -425,7 +425,7 @@ dependencies = [ "alloy-transport", "reqwest", "serde_json", - "tower", + "tower 0.5.2", "tracing", "url", ] @@ -856,7 +856,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -906,7 +906,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "tokio", - "tower", + "tower 0.5.2", "url", ] @@ -929,7 +929,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tower", + "tower 0.5.2", "tower-http", "tracing", "tracing-subscriber", @@ -1158,6 +1158,12 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" @@ -1239,6 +1245,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "const-hex" version = "1.17.0" @@ -1763,15 +1779,30 @@ version = "0.1.0" dependencies = [ "clap", "eyre", + "fetcher-api", "flate2", + "jsonrpsee", "reqwest", "sha2", "tar", + "tokio", "tracing", "tracing-subscriber", "url", ] +[[package]] +name = "fetcher-api" +version = "0.1.0" +dependencies = [ + "eyre", + "jsonrpsee", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "ff" version = "0.13.1" @@ -1947,6 +1978,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2516,6 +2553,28 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -2536,6 +2595,135 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpsee" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e281ae70cc3b98dac15fced3366a880949e65fc66e345ce857a5682d152f3e62" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-proc-macros", + "jsonrpsee-server", + "jsonrpsee-types", + "jsonrpsee-ws-client", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-client-transport" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc4280b709ac3bb5e16cf3bad5056a0ec8df55fa89edfe996361219aadc2c7ea" +dependencies = [ + "base64", + "futures-util", + "http", + "jsonrpsee-core", + "pin-project", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348ee569eaed52926b5e740aae20863762b16596476e943c9e415a6479021622" +dependencies = [ + "async-trait", + "bytes", + "futures-timer", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types", + "parking_lot", + "pin-project", + "rand 0.8.5", + "rustc-hash", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7398cddf5013cca4702862a2692b66c48a3bd6cf6ec681a47453c93d63cf8de5" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.112", +] + +[[package]] +name = "jsonrpsee-server" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21429bcdda37dcf2d43b68621b994adede0e28061f816b038b0f18c70c143d51" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.4.13", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f05e0028e55b15dbd2107163b3c744cd3bb4474f193f95d9708acbf5677e44" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "jsonrpsee-ws-client" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78fc744f17e7926d57f478cf9ca6e1ee5d8332bf0514860b1a3cdf1742e614cc" +dependencies = [ + "http", + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types", + "url", +] + [[package]] name = "k256" version = "0.13.4" @@ -3484,7 +3672,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", - "tower", + "tower 0.5.2", "tower-http", "tower-service", "url", @@ -3537,6 +3725,12 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "ruint" version = "1.17.2" @@ -3662,6 +3856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -3692,6 +3887,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework 3.5.1", + "security-framework-sys", + "webpki-root-certs 0.26.11", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -3728,6 +3950,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -3960,6 +4191,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.9" @@ -4062,6 +4304,22 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "soketto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" +dependencies = [ + "base64", + "bytes", + "futures", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", +] + [[package]] name = "spec" version = "0.1.0" @@ -4442,6 +4700,7 @@ checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -4477,6 +4736,21 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -4506,7 +4780,7 @@ dependencies = [ "http-body", "iri-string", "pin-project-lite", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -4743,6 +5017,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4859,6 +5143,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-root-certs" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" +dependencies = [ + "webpki-root-certs 1.0.5", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "1.0.4" @@ -4884,6 +5186,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -4960,6 +5271,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -4987,6 +5307,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -5020,6 +5355,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -5032,6 +5373,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -5044,6 +5391,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -5068,6 +5421,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -5080,6 +5439,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -5092,6 +5457,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -5104,6 +5475,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 0b25326..375a7b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,8 @@ members = [ "crates/catalog", "crates/template", "crates/babel", - "crates/fetcher" + "crates/fetcher", + "crates/fetcher-api" ] [workspace.dependencies] @@ -17,6 +18,7 @@ runtime-docker-compose = { path = "crates/runtime-docker-compose" } runtime-trait = { path = "crates/runtime-trait" } catalog = { path = "crates/catalog" } template = { path = "crates/template" } +fetcher-api = { path = "crates/fetcher-api" } serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9.34" @@ -31,3 +33,4 @@ tinytemplate = "1.2" askama = "0.14.0" clap = { version = "4.5", features = ["derive"] } reqwest = { version = "0.12", default-features = false } +jsonrpsee = { version = "0.24", features = ["server", "ws-client", "macros"] } diff --git a/crates/fetcher-api/Cargo.toml b/crates/fetcher-api/Cargo.toml new file mode 100644 index 0000000..aca33c7 --- /dev/null +++ b/crates/fetcher-api/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fetcher-api" +version = "0.1.0" +edition = "2021" + +[dependencies] +jsonrpsee = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +eyre = { workspace = true } +tracing = "0.1" diff --git a/crates/fetcher-api/src/lib.rs b/crates/fetcher-api/src/lib.rs new file mode 100644 index 0000000..8e8ce02 --- /dev/null +++ b/crates/fetcher-api/src/lib.rs @@ -0,0 +1,22 @@ +use jsonrpsee::proc_macros::rpc; +use serde::{Deserialize, Serialize}; + +/// Progress message sent over WebSocket +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ProgressMessage { + /// Total size is known + SetTotal { total: u64 }, + /// Progress update + Update { downloaded: u64, total: Option }, + /// Download finished + Finished, +} + +/// JSON-RPC API for fetcher progress tracking +#[rpc(server, client)] +pub trait FetcherProgressApi { + /// Subscribe to download progress updates via WebSocket + #[subscription(name = "subscribeProgress", item = ProgressMessage)] + async fn subscribe_progress(&self); +} diff --git a/crates/fetcher/Cargo.toml b/crates/fetcher/Cargo.toml index 5b9226b..2538acf 100644 --- a/crates/fetcher/Cargo.toml +++ b/crates/fetcher/Cargo.toml @@ -11,6 +11,10 @@ path = "bin/main.rs" clap = { workspace = true, features = ["derive"] } reqwest = { workspace = true, features = ["blocking", "rustls-tls"] } eyre = { workspace = true } +fetcher-api = { workspace = true } +tokio = { workspace = true } +jsonrpsee = { workspace = true } + url = "2.5" flate2 = "1.0" tar = "0.4" diff --git a/crates/fetcher/src/lib.rs b/crates/fetcher/src/lib.rs index 296b5b0..2aaa607 100644 --- a/crates/fetcher/src/lib.rs +++ b/crates/fetcher/src/lib.rs @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf}; use tar::Archive; use url::Url; +pub mod websocket; + #[derive(Debug, Clone, Copy, PartialEq)] enum ArchiveFormat { TarGz, diff --git a/crates/fetcher/src/websocket.rs b/crates/fetcher/src/websocket.rs new file mode 100644 index 0000000..396505b --- /dev/null +++ b/crates/fetcher/src/websocket.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; +use tokio::sync::broadcast; + +use fetcher_api::{FetcherProgressApiServer, ProgressMessage}; + +use crate::ProgressTracker; + +/// Progress tracker that sends updates to a WebSocket broadcast channel +pub struct WebSocketProgressTracker { + total: Option, + tx: Arc>, +} + +impl WebSocketProgressTracker { + pub fn new(tx: Arc>) -> Self { + Self { total: None, tx } + } +} + +impl ProgressTracker for WebSocketProgressTracker { + fn set_total(&mut self, total: u64) { + self.total = Some(total); + let _ = self.tx.send(ProgressMessage::SetTotal { total }); + } + + fn update(&mut self, downloaded: u64) { + let _ = self.tx.send(ProgressMessage::Update { + downloaded, + total: self.total, + }); + } + + fn finish(&mut self) { + let _ = self.tx.send(ProgressMessage::Finished); + } +} + +/// Implementation of the Fetcher Progress RPC server +pub struct FetcherProgressServer { + progress_tx: Arc>, +} + +impl FetcherProgressServer { + pub fn new(progress_tx: Arc>) -> Self { + Self { progress_tx } + } +} + +#[jsonrpsee::core::async_trait] +impl FetcherProgressApiServer for FetcherProgressServer { + async fn subscribe_progress(&self, pending: jsonrpsee::PendingSubscriptionSink) { + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(e) => { + tracing::error!("Failed to accept subscription: {:?}", e); + return; + } + }; + + let mut rx = self.progress_tx.subscribe(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(msg) => { + if sink.send(jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap()).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("Progress subscriber lagged by {} messages", n); + } + Err(broadcast::error::RecvError::Closed) => { + break; + } + } + } + }); + } +} + +/// Start the WebSocket progress server +pub async fn start_progress_server( + addr: std::net::SocketAddr, + progress_tx: Arc>, +) -> Result> { + use jsonrpsee::server::Server; + + let server = Server::builder().build(addr).await?; + + let progress_server = FetcherProgressServer::new(progress_tx); + let handle = server.start(progress_server.into_rpc()); + + tracing::info!("Progress WebSocket server listening on {}", addr); + + Ok(handle) +} From 002a492471fee593563dff137d1bfcbcb846a766 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Sun, 11 Jan 2026 17:21:50 +0800 Subject: [PATCH 2/3] Update stuff --- Cargo.lock | 58 +++++++++++++++++++++++---------- Cargo.toml | 2 +- crates/fetcher/Cargo.toml | 1 - crates/fetcher/README.md | 11 +++++++ crates/fetcher/bin/main.rs | 42 ++++++++++++++++++++++-- crates/fetcher/src/lib.rs | 38 +++++++++++++++++++++ crates/fetcher/src/websocket.rs | 12 +++---- 7 files changed, 137 insertions(+), 27 deletions(-) create mode 100644 crates/fetcher/README.md diff --git a/Cargo.lock b/Cargo.lock index abc8074..feaad2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,11 +971,12 @@ dependencies = [ "clap", "eyre", "runtime-docker-compose", - "runtime-trait", "serde", "serde_json", "spec", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1130,16 +1131,11 @@ name = "catalog" version = "0.1.0" dependencies = [ "askama", - "base64", - "ed25519-dalek", + "cosmos-keys", "eyre", - "hex", - "k256", - "rand 0.8.5", "reqwest", "serde", "serde_json", - "sha3", "spec", "template", "tinytemplate", @@ -1338,6 +1334,21 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cosmos-keys" +version = "0.1.0" +dependencies = [ + "base64", + "ed25519-dalek", + "eyre", + "hex", + "k256", + "rand 0.8.5", + "serde", + "serde_json", + "sha3", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1777,18 +1788,24 @@ dependencies = [ name = "fetcher" version = "0.1.0" dependencies = [ + "bytes", "clap", "eyre", "fetcher-api", "flate2", + "futures-util", "jsonrpsee", + "pin-project-lite", "reqwest", + "serde", + "serde_json", "sha2", "tar", "tokio", "tracing", "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -3672,12 +3689,14 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower 0.5.2", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", ] @@ -3774,21 +3793,12 @@ dependencies = [ "eyre", "futures-util", "include_dir", - "runtime-trait", "serde", "serde_yaml", "spec", "tinytemplate", "tokio", -] - -[[package]] -name = "runtime-trait" -version = "0.1.0" -dependencies = [ - "async-trait", - "eyre", - "spec", + "tracing", ] [[package]] @@ -4986,6 +4996,7 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ + "getrandom 0.3.4", "js-sys", "wasm-bindgen", ] @@ -5109,6 +5120,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmtimer" version = "0.4.3" diff --git a/Cargo.toml b/Cargo.toml index c43c67b..39c02e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "crates/template", "crates/babel", "crates/fetcher", - "crates/fetcher-api" + "crates/fetcher-api", "crates/cosmos-keys" ] diff --git a/crates/fetcher/Cargo.toml b/crates/fetcher/Cargo.toml index 11ec421..6fd61b4 100644 --- a/crates/fetcher/Cargo.toml +++ b/crates/fetcher/Cargo.toml @@ -17,7 +17,6 @@ bytes = "1.0" uuid = { version = "1.0", features = ["v4"] } eyre = { workspace = true } fetcher-api = { workspace = true } -tokio = { workspace = true } jsonrpsee = { workspace = true } url = "2.5" diff --git a/crates/fetcher/README.md b/crates/fetcher/README.md new file mode 100644 index 0000000..77723c8 --- /dev/null +++ b/crates/fetcher/README.md @@ -0,0 +1,11 @@ +# Fetcher + +A lightweight tool to fetch files from HTTP/HTTPS sources with progress tracking, checksum verification, and automatic extraction of tar.gz archives. + +## WebSocket Progress Server + +Enable the WebSocket server with `--ws` to stream real-time progress updates via JSON-RPC (default: `127.0.0.1:7070`). + +```bash +echo '{"jsonrpc":"2.0","method":"subscribeProgress","params":[],"id":1}' | websocat ws://127.0.0.1:7070 +``` diff --git a/crates/fetcher/bin/main.rs b/crates/fetcher/bin/main.rs index c888561..9e58ce3 100644 --- a/crates/fetcher/bin/main.rs +++ b/crates/fetcher/bin/main.rs @@ -1,7 +1,8 @@ use clap::Parser; -use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum}; +use fetcher::{ConsoleProgressTracker, fetch_with_progress, verify_checksum, MultiProgressTracker}; use std::path::PathBuf; use std::process; +use std::sync::Arc; #[derive(Parser, Debug)] #[command(name = "fetcher")] @@ -20,6 +21,18 @@ struct Args { /// Skip download if file exists and checksum matches (requires --checksum) #[arg(long, requires = "checksum")] skip_if_valid_checksum: bool, + + /// Enable WebSocket progress server + #[arg(long)] + ws: bool, + + /// Port for WebSocket server (default: 7070) + #[arg(long, default_value = "7070")] + ws_port: u16, + + /// Bind address for WebSocket server (default: 127.0.0.1) + #[arg(long, default_value = "0.0.0.0")] + ws_bind_address: String, } #[tokio::main] @@ -55,7 +68,32 @@ async fn main() { } } - let mut progress = ConsoleProgressTracker::new(); + // Build progress tracker with console, and optionally WebSocket + let mut progress = MultiProgressTracker::new().add_tracker(ConsoleProgressTracker::new()); + + // Start WebSocket server if enabled + let _ws_server_handle = if args.ws { + use fetcher::websocket::{start_progress_server, WebSocketProgressTracker}; + use fetcher_api::ProgressMessage; + use tokio::sync::broadcast; + + let addr = format!("{}:{}", args.ws_bind_address, args.ws_port) + .parse() + .expect("Invalid bind address"); + + let (tx, _rx) = broadcast::channel::(100); + let tx = Arc::new(tx); + + let handle = start_progress_server(addr, tx.clone()) + .await + .expect("Failed to start WebSocket server"); + + progress = progress.add_tracker(WebSocketProgressTracker::new(tx)); + + Some(handle) + } else { + None + }; if let Err(e) = fetch_with_progress( &args.source, diff --git a/crates/fetcher/src/lib.rs b/crates/fetcher/src/lib.rs index f64baed..c154b6f 100644 --- a/crates/fetcher/src/lib.rs +++ b/crates/fetcher/src/lib.rs @@ -97,6 +97,44 @@ impl ProgressTracker for ConsoleProgressTracker { } } +/// A progress tracker that forwards updates to multiple trackers +pub struct MultiProgressTracker { + trackers: Vec>, +} + +impl MultiProgressTracker { + pub fn new() -> Self { + Self { + trackers: Vec::new(), + } + } + + pub fn add_tracker(mut self, tracker: T) -> Self { + self.trackers.push(Box::new(tracker)); + self + } +} + +impl ProgressTracker for MultiProgressTracker { + fn set_total(&mut self, total: u64) { + for tracker in &mut self.trackers { + tracker.set_total(total); + } + } + + fn update(&mut self, downloaded: u64) { + for tracker in &mut self.trackers { + tracker.update(downloaded); + } + } + + fn finish(&mut self) { + for tracker in &mut self.trackers { + tracker.finish(); + } + } +} + pub async fn fetch(source: &str, destination: &PathBuf, checksum: Option) -> Result<()> { fetch_with_progress(source, destination, &mut NoOpProgressTracker, checksum).await } diff --git a/crates/fetcher/src/websocket.rs b/crates/fetcher/src/websocket.rs index 396505b..f9948e6 100644 --- a/crates/fetcher/src/websocket.rs +++ b/crates/fetcher/src/websocket.rs @@ -51,19 +51,18 @@ impl FetcherProgressApiServer for FetcherProgressServer { async fn subscribe_progress(&self, pending: jsonrpsee::PendingSubscriptionSink) { let sink = match pending.accept().await { Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return; - } + Err(_) => return, }; let mut rx = self.progress_tx.subscribe(); - tokio::spawn(async move { + let _handle = tokio::spawn(async move { loop { match rx.recv().await { Ok(msg) => { - if sink.send(jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap()).await.is_err() { + let subscription_msg = jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap(); + let result = sink.send(subscription_msg).await; + if result.is_err() { break; } } @@ -75,6 +74,7 @@ impl FetcherProgressApiServer for FetcherProgressServer { } } } + tracing::debug!("Progress subscription ended"); }); } } From eba2e4c9df33b28332e1a4f9893309f24216c78d Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Sun, 11 Jan 2026 17:35:06 +0800 Subject: [PATCH 3/3] Fix ws --- crates/fetcher-api/src/lib.rs | 4 ++-- crates/fetcher/src/websocket.rs | 21 +++++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/crates/fetcher-api/src/lib.rs b/crates/fetcher-api/src/lib.rs index 8e8ce02..07635ae 100644 --- a/crates/fetcher-api/src/lib.rs +++ b/crates/fetcher-api/src/lib.rs @@ -1,4 +1,4 @@ -use jsonrpsee::proc_macros::rpc; +use jsonrpsee::{core::SubscriptionResult, proc_macros::rpc}; use serde::{Deserialize, Serialize}; /// Progress message sent over WebSocket @@ -18,5 +18,5 @@ pub enum ProgressMessage { pub trait FetcherProgressApi { /// Subscribe to download progress updates via WebSocket #[subscription(name = "subscribeProgress", item = ProgressMessage)] - async fn subscribe_progress(&self); + async fn subscribe_progress(&self) -> SubscriptionResult; } diff --git a/crates/fetcher/src/websocket.rs b/crates/fetcher/src/websocket.rs index f9948e6..1a429ae 100644 --- a/crates/fetcher/src/websocket.rs +++ b/crates/fetcher/src/websocket.rs @@ -1,8 +1,8 @@ +use fetcher_api::{FetcherProgressApiServer, ProgressMessage}; +use jsonrpsee::core::SubscriptionResult; use std::sync::Arc; use tokio::sync::broadcast; -use fetcher_api::{FetcherProgressApiServer, ProgressMessage}; - use crate::ProgressTracker; /// Progress tracker that sends updates to a WebSocket broadcast channel @@ -48,19 +48,26 @@ impl FetcherProgressServer { #[jsonrpsee::core::async_trait] impl FetcherProgressApiServer for FetcherProgressServer { - async fn subscribe_progress(&self, pending: jsonrpsee::PendingSubscriptionSink) { + async fn subscribe_progress( + &self, + pending: jsonrpsee::PendingSubscriptionSink, + ) -> SubscriptionResult { let sink = match pending.accept().await { Ok(sink) => sink, - Err(_) => return, + Err(err) => { + tracing::error!("failed to accept subscription: {err}"); + return Err(err.to_string().into()); + } }; let mut rx = self.progress_tx.subscribe(); - let _handle = tokio::spawn(async move { + tokio::spawn(async move { loop { match rx.recv().await { Ok(msg) => { - let subscription_msg = jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap(); + let subscription_msg = + jsonrpsee::SubscriptionMessage::from_json(&msg).unwrap(); let result = sink.send(subscription_msg).await; if result.is_err() { break; @@ -76,6 +83,8 @@ impl FetcherProgressApiServer for FetcherProgressServer { } tracing::debug!("Progress subscription ended"); }); + + Ok(()) } }