From 300e15455091dcc9dbd30fce89c1718ea35c1f4f Mon Sep 17 00:00:00 2001 From: jtornert Date: Wed, 10 Sep 2025 14:00:11 +0200 Subject: [PATCH] add yielder type to axum examples --- examples/axum-activity-feed.rs | 106 ++++++++++---------- examples/axum-hello.rs | 24 ++--- examples/axum-live-reload.rs | 48 ++++++---- examples/axum-test-suite.rs | 170 +++++++++++++++++---------------- 4 files changed, 181 insertions(+), 167 deletions(-) diff --git a/examples/axum-activity-feed.rs b/examples/axum-activity-feed.rs index 919efdb..d419dfe 100644 --- a/examples/axum-activity-feed.rs +++ b/examples/axum-activity-feed.rs @@ -1,9 +1,9 @@ use { - asynk_strim::stream_fn, + asynk_strim::{Yielder, stream_fn}, axum::{ Router, extract::Path, - response::{Html, IntoResponse, Sse}, + response::{Html, IntoResponse, Sse, sse::Event}, routing::{get, post}, }, core::{convert::Infallible, error::Error, time::Duration}, @@ -80,36 +80,38 @@ async fn generate(ReadSignals(signals): ReadSignals) -> impl IntoRespon let mut done = signals.done; // Start the SSE stream - Sse::new(stream_fn(move |mut yielder| async move { - // Signal event generation start - let patch = PatchSignals::new(r#"{{"generating": true}}"#); - let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - - // Yield the events elements and signals to the stream - for _ in 1..=signals.events { - total += 1; - done += 1; - // Append a new entry to the activity feed - let elements = event_entry(&Status::Done, total, "Auto"); - let patch = PatchElements::new(elements) - .selector("#feed") - .mode(ElementPatchMode::After); + Sse::new(stream_fn( + move |mut yielder: Yielder>| async move { + // Signal event generation start + let patch = PatchSignals::new(r#"{{"generating": true}}"#); let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - - // Update the event counts - let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#)); + yielder.yield_item(Ok(sse_event)).await; + + // Yield the events elements and signals to the stream + for _ in 1..=signals.events { + total += 1; + done += 1; + // Append a new entry to the activity feed + let elements = event_entry(&Status::Done, total, "Auto"); + let patch = PatchElements::new(elements) + .selector("#feed") + .mode(ElementPatchMode::After); + let sse_event = patch.write_as_axum_sse_event(); + yielder.yield_item(Ok(sse_event)).await; + + // Update the event counts + let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#)); + let sse_event = patch.write_as_axum_sse_event(); + yielder.yield_item(Ok(sse_event)).await; + tokio::time::sleep(Duration::from_millis(signals.interval)).await; + } + + // Signal event generation end + let patch = PatchSignals::new(r#"{{"generating": false}}"#); let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - tokio::time::sleep(Duration::from_millis(signals.interval)).await; - } - - // Signal event generation end - let patch = PatchSignals::new(r#"{{"generating": false}}"#); - let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - })) + yielder.yield_item(Ok(sse_event)).await; + }, + )) } /// Creates one event with a given status @@ -118,27 +120,29 @@ async fn event( ReadSignals(signals): ReadSignals, ) -> impl IntoResponse { // Create the event stream, since we're patching both an element and a signal. - Sse::new(stream_fn(move |mut yielder| async move { - // Signal the updated event counts - let total = signals.total + 1; - let signals = match status { - Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1), - Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1), - Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1), - Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1), - }; - let patch = PatchSignals::new(signals); - let sse_signal = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_signal)).await; - - // Patch an element and append it to the feed - let elements = event_entry(&status, total, "Manual"); - let patch = PatchElements::new(elements) - .selector("#feed") - .mode(ElementPatchMode::After); - let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - })) + Sse::new(stream_fn( + move |mut yielder: Yielder>| async move { + // Signal the updated event counts + let total = signals.total + 1; + let signals = match status { + Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1), + Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1), + Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1), + Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1), + }; + let patch = PatchSignals::new(signals); + let sse_signal = patch.write_as_axum_sse_event(); + yielder.yield_item(Ok(sse_signal)).await; + + // Patch an element and append it to the feed + let elements = event_entry(&status, total, "Manual"); + let patch = PatchElements::new(elements) + .selector("#feed") + .mode(ElementPatchMode::After); + let sse_event = patch.write_as_axum_sse_event(); + yielder.yield_item(Ok(sse_event)).await; + }, + )) } /// Returns an HTML string for the entry diff --git a/examples/axum-hello.rs b/examples/axum-hello.rs index 40a6057..a300d31 100644 --- a/examples/axum-hello.rs +++ b/examples/axum-hello.rs @@ -1,8 +1,8 @@ use { - asynk_strim::stream_fn, + asynk_strim::{Yielder, stream_fn}, axum::{ Router, - response::{Html, IntoResponse, Sse}, + response::{Html, IntoResponse, Sse, sse::Event}, routing::get, }, core::{convert::Infallible, error::Error, time::Duration}, @@ -49,15 +49,17 @@ pub struct Signals { } async fn hello_world(ReadSignals(signals): ReadSignals) -> impl IntoResponse { - Sse::new(stream_fn(move |mut yielder| async move { - for i in 0..MESSAGE.len() { - let elements = format!("
{}
", &MESSAGE[0..i + 1]); - let patch = PatchElements::new(elements); - let sse_event = patch.write_as_axum_sse_event(); + Sse::new(stream_fn( + move |mut yielder: Yielder>| async move { + for i in 0..MESSAGE.len() { + let elements = format!("
{}
", &MESSAGE[0..i + 1]); + let patch = PatchElements::new(elements); + let sse_event = patch.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; + yielder.yield_item(Ok(sse_event)).await; - tokio::time::sleep(Duration::from_millis(signals.delay)).await; - } - })) + tokio::time::sleep(Duration::from_millis(signals.delay)).await; + } + }, + )) } diff --git a/examples/axum-live-reload.rs b/examples/axum-live-reload.rs index acdcd7f..36f53a7 100644 --- a/examples/axum-live-reload.rs +++ b/examples/axum-live-reload.rs @@ -1,8 +1,8 @@ use { - asynk_strim::stream_fn, + asynk_strim::{Yielder, stream_fn}, axum::{ Router, - response::{Html, IntoResponse, Sse}, + response::{Html, IntoResponse, Sse, sse::Event}, routing::get, }, core::{convert::Infallible, error::Error, time::Duration}, @@ -72,17 +72,21 @@ async fn hotreload() -> impl IntoResponse { // tracking against a date or version stored in a cookie // or by some other means. + use asynk_strim::Yielder; + use axum::response::sse; use datastar::prelude::ExecuteScript; static ONCE: atomic::AtomicBool = atomic::AtomicBool::new(false); - Sse::new(stream_fn(move |mut yielder| async move { - if !ONCE.swap(true, atomic::Ordering::SeqCst) { - let script = ExecuteScript::new("window.location.reload()"); - let sse_event = script.write_as_axum_sse_event(); - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - } - std::future::pending().await - })) + Sse::new(stream_fn( + |mut yielder: Yielder>| async move { + if !ONCE.swap(true, atomic::Ordering::SeqCst) { + let script = ExecuteScript::new("window.location.reload()"); + let sse_event = script.write_as_axum_sse_event(); + yielder.yield_item(Ok(sse_event)).await; + } + std::future::pending().await + }, + )) } const MESSAGE: &str = "Hello, world!"; @@ -93,15 +97,17 @@ pub struct Signals { } async fn hello_world(ReadSignals(signals): ReadSignals) -> impl IntoResponse { - Sse::new(stream_fn(move |mut yielder| async move { - for i in 0..MESSAGE.len() { - let elements = format!("
{}
", &MESSAGE[0..i + 1]); - let patch = PatchElements::new(elements); - let sse_event = patch.write_as_axum_sse_event(); - - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - - tokio::time::sleep(Duration::from_millis(signals.delay)).await; - } - })) + Sse::new(stream_fn( + move |mut yielder: Yielder>| async move { + for i in 0..MESSAGE.len() { + let elements = format!("
{}
", &MESSAGE[0..i + 1]); + let patch = PatchElements::new(elements); + let sse_event = patch.write_as_axum_sse_event(); + + yielder.yield_item(Ok(sse_event)).await; + + tokio::time::sleep(Duration::from_millis(signals.delay)).await; + } + }, + )) } diff --git a/examples/axum-test-suite.rs b/examples/axum-test-suite.rs index 77e9568..0141b2e 100644 --- a/examples/axum-test-suite.rs +++ b/examples/axum-test-suite.rs @@ -1,8 +1,8 @@ use { - asynk_strim::stream_fn, + asynk_strim::{Yielder, stream_fn}, axum::{ Router, - response::{IntoResponse, Sse}, + response::{IntoResponse, Sse, sse::Event}, routing::{MethodFilter, on}, }, core::{convert::Infallible, error::Error, time::Duration}, @@ -87,87 +87,89 @@ pub enum TestCaseEvent { } async fn test(ReadSignals(test_case): ReadSignals) -> impl IntoResponse { - Sse::new(stream_fn(move |mut yielder| async move { - for event in test_case.events { - let sse_event = match event { - TestCaseEvent::ExecuteScript { - script, - event_id, - retry_duration, - attributes, - auto_remove, - } => ExecuteScript { - script, - id: event_id, - retry: Duration::from_millis( - retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), - ), - auto_remove, - attributes: attributes - .map(|attributes| { - attributes - .into_iter() - .map(|(key, value)| { - format!("{key}=\"{}\"", value.to_string().trim_matches('"')) - }) - .collect() - }) - .unwrap_or_default(), - } - .into_datastar_event() - .write_as_axum_sse_event(), - TestCaseEvent::PatchElements { - elements, - event_id, - retry_duration, - mode, - selector, - use_view_transition, - } => PatchElements { - id: event_id, - retry: Duration::from_millis( - retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), - ), - elements, - selector, - mode: match mode.as_deref().unwrap_or_default() { - "outer" => consts::ElementPatchMode::Outer, - "inner" => consts::ElementPatchMode::Inner, - "remove" => consts::ElementPatchMode::Remove, - "replace" => consts::ElementPatchMode::Replace, - "prepend" => consts::ElementPatchMode::Prepend, - "append" => consts::ElementPatchMode::Append, - "before" => consts::ElementPatchMode::Before, - "after" => consts::ElementPatchMode::After, - _ => consts::ElementPatchMode::Outer, - }, - use_view_transition: use_view_transition.unwrap_or_default(), - } - .into_datastar_event() - .write_as_axum_sse_event(), - TestCaseEvent::PatchSignals { - signals, - signals_raw, - event_id, - retry_duration, - only_if_missing, - } => PatchSignals { - id: event_id, - retry: Duration::from_millis( - retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), - ), - signals: signals_raw.unwrap_or_else(|| { - signals - .map(|s| serde_json::to_string(&s).unwrap_or_default()) - .unwrap_or_default() - }), - only_if_missing: only_if_missing.unwrap_or_default(), - } - .into_datastar_event() - .write_as_axum_sse_event(), - }; + Sse::new(stream_fn( + |mut yielder: Yielder>| async move { + for event in test_case.events { + let sse_event = match event { + TestCaseEvent::ExecuteScript { + script, + event_id, + retry_duration, + attributes, + auto_remove, + } => ExecuteScript { + script, + id: event_id, + retry: Duration::from_millis( + retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), + ), + auto_remove, + attributes: attributes + .map(|attributes| { + attributes + .into_iter() + .map(|(key, value)| { + format!("{key}=\"{}\"", value.to_string().trim_matches('"')) + }) + .collect() + }) + .unwrap_or_default(), + } + .into_datastar_event() + .write_as_axum_sse_event(), + TestCaseEvent::PatchElements { + elements, + event_id, + retry_duration, + mode, + selector, + use_view_transition, + } => PatchElements { + id: event_id, + retry: Duration::from_millis( + retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), + ), + elements, + selector, + mode: match mode.as_deref().unwrap_or_default() { + "outer" => consts::ElementPatchMode::Outer, + "inner" => consts::ElementPatchMode::Inner, + "remove" => consts::ElementPatchMode::Remove, + "replace" => consts::ElementPatchMode::Replace, + "prepend" => consts::ElementPatchMode::Prepend, + "append" => consts::ElementPatchMode::Append, + "before" => consts::ElementPatchMode::Before, + "after" => consts::ElementPatchMode::After, + _ => consts::ElementPatchMode::Outer, + }, + use_view_transition: use_view_transition.unwrap_or_default(), + } + .into_datastar_event() + .write_as_axum_sse_event(), + TestCaseEvent::PatchSignals { + signals, + signals_raw, + event_id, + retry_duration, + only_if_missing, + } => PatchSignals { + id: event_id, + retry: Duration::from_millis( + retry_duration.unwrap_or(consts::DEFAULT_SSE_RETRY_DURATION), + ), + signals: signals_raw.unwrap_or_else(|| { + signals + .map(|s| serde_json::to_string(&s).unwrap_or_default()) + .unwrap_or_default() + }), + only_if_missing: only_if_missing.unwrap_or_default(), + } + .into_datastar_event() + .write_as_axum_sse_event(), + }; - yielder.yield_item(Ok::<_, Infallible>(sse_event)).await; - } - })) + yielder.yield_item(Ok(sse_event)).await; + } + }, + )) }