Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 55 additions & 51 deletions examples/axum-activity-feed.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
asynk_strim::stream_fn,
asynk_strim::{Yielder, stream_fn},
axum::{
Router,
extract::Path,
response::{Html, IntoResponse, Sse},
response::{Html, IntoResponse, Sse, sse::Event},
routing::{get, post},
},
core::{convert::Infallible, error::Error, time::Duration},
Expand Down Expand Up @@ -80,36 +80,38 @@ async fn generate(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoRespon
let mut done = signals.done;

// Start the SSE stream
Sse::new(stream_fn(move |mut yielder| async move {
// Signal event generation start
let patch = PatchSignals::new(r#"{{"generating": true}}"#);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;

// Yield the events elements and signals to the stream
for _ in 1..=signals.events {
total += 1;
done += 1;
// Append a new entry to the activity feed
let elements = event_entry(&Status::Done, total, "Auto");
let patch = PatchElements::new(elements)
.selector("#feed")
.mode(ElementPatchMode::After);
Sse::new(stream_fn(
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
// Signal event generation start
let patch = PatchSignals::new(r#"{{"generating": true}}"#);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;

// Update the event counts
let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#));
yielder.yield_item(Ok(sse_event)).await;

// Yield the events elements and signals to the stream
for _ in 1..=signals.events {
total += 1;
done += 1;
// Append a new entry to the activity feed
let elements = event_entry(&Status::Done, total, "Auto");
let patch = PatchElements::new(elements)
.selector("#feed")
.mode(ElementPatchMode::After);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok(sse_event)).await;

// Update the event counts
let patch = PatchSignals::new(format!(r#"{{"total": {total}, "done": {done}}}"#));
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok(sse_event)).await;
tokio::time::sleep(Duration::from_millis(signals.interval)).await;
}

// Signal event generation end
let patch = PatchSignals::new(r#"{{"generating": false}}"#);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
tokio::time::sleep(Duration::from_millis(signals.interval)).await;
}

// Signal event generation end
let patch = PatchSignals::new(r#"{{"generating": false}}"#);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
}))
yielder.yield_item(Ok(sse_event)).await;
},
))
}

/// Creates one event with a given status
Expand All @@ -118,27 +120,29 @@ async fn event(
ReadSignals(signals): ReadSignals<Signals>,
) -> impl IntoResponse {
// Create the event stream, since we're patching both an element and a signal.
Sse::new(stream_fn(move |mut yielder| async move {
// Signal the updated event counts
let total = signals.total + 1;
let signals = match status {
Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1),
Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1),
Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1),
Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1),
};
let patch = PatchSignals::new(signals);
let sse_signal = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_signal)).await;

// Patch an element and append it to the feed
let elements = event_entry(&status, total, "Manual");
let patch = PatchElements::new(elements)
.selector("#feed")
.mode(ElementPatchMode::After);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
}))
Sse::new(stream_fn(
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
// Signal the updated event counts
let total = signals.total + 1;
let signals = match status {
Status::Done => format!(r#"{{"total": {total}, "done": {}}}"#, signals.done + 1),
Status::Warn => format!(r#"{{"total": {total}, "warn": {}}}"#, signals.warn + 1),
Status::Fail => format!(r#"{{"total": {total}, "fail": {}}}"#, signals.fail + 1),
Status::Info => format!(r#"{{"total": {total}, "info": {}}}"#, signals.info + 1),
};
let patch = PatchSignals::new(signals);
let sse_signal = patch.write_as_axum_sse_event();
yielder.yield_item(Ok(sse_signal)).await;

// Patch an element and append it to the feed
let elements = event_entry(&status, total, "Manual");
let patch = PatchElements::new(elements)
.selector("#feed")
.mode(ElementPatchMode::After);
let sse_event = patch.write_as_axum_sse_event();
yielder.yield_item(Ok(sse_event)).await;
},
))
}

/// Returns an HTML string for the entry
Expand Down
24 changes: 13 additions & 11 deletions examples/axum-hello.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use {
asynk_strim::stream_fn,
asynk_strim::{Yielder, stream_fn},
axum::{
Router,
response::{Html, IntoResponse, Sse},
response::{Html, IntoResponse, Sse, sse::Event},
routing::get,
},
core::{convert::Infallible, error::Error, time::Duration},
Expand Down Expand Up @@ -49,15 +49,17 @@ pub struct Signals {
}

async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
Sse::new(stream_fn(move |mut yielder| async move {
for i in 0..MESSAGE.len() {
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
let patch = PatchElements::new(elements);
let sse_event = patch.write_as_axum_sse_event();
Sse::new(stream_fn(
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
for i in 0..MESSAGE.len() {
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
let patch = PatchElements::new(elements);
let sse_event = patch.write_as_axum_sse_event();

yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
yielder.yield_item(Ok(sse_event)).await;

tokio::time::sleep(Duration::from_millis(signals.delay)).await;
}
}))
tokio::time::sleep(Duration::from_millis(signals.delay)).await;
}
},
))
}
48 changes: 27 additions & 21 deletions examples/axum-live-reload.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use {
asynk_strim::stream_fn,
asynk_strim::{Yielder, stream_fn},
axum::{
Router,
response::{Html, IntoResponse, Sse},
response::{Html, IntoResponse, Sse, sse::Event},
routing::get,
},
core::{convert::Infallible, error::Error, time::Duration},
Expand Down Expand Up @@ -72,17 +72,21 @@ async fn hotreload() -> impl IntoResponse {
// tracking against a date or version stored in a cookie
// or by some other means.

use asynk_strim::Yielder;
use axum::response::sse;
use datastar::prelude::ExecuteScript;
static ONCE: atomic::AtomicBool = atomic::AtomicBool::new(false);

Sse::new(stream_fn(move |mut yielder| async move {
if !ONCE.swap(true, atomic::Ordering::SeqCst) {
let script = ExecuteScript::new("window.location.reload()");
let sse_event = script.write_as_axum_sse_event();
yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;
}
std::future::pending().await
}))
Sse::new(stream_fn(
|mut yielder: Yielder<Result<sse::Event, Infallible>>| async move {
if !ONCE.swap(true, atomic::Ordering::SeqCst) {
let script = ExecuteScript::new("window.location.reload()");
let sse_event = script.write_as_axum_sse_event();
yielder.yield_item(Ok(sse_event)).await;
}
std::future::pending().await
},
))
}

const MESSAGE: &str = "Hello, world!";
Expand All @@ -93,15 +97,17 @@ pub struct Signals {
}

async fn hello_world(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoResponse {
Sse::new(stream_fn(move |mut yielder| async move {
for i in 0..MESSAGE.len() {
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
let patch = PatchElements::new(elements);
let sse_event = patch.write_as_axum_sse_event();

yielder.yield_item(Ok::<_, Infallible>(sse_event)).await;

tokio::time::sleep(Duration::from_millis(signals.delay)).await;
}
}))
Sse::new(stream_fn(
move |mut yielder: Yielder<Result<Event, Infallible>>| async move {
for i in 0..MESSAGE.len() {
let elements = format!("<div id='message'>{}</div>", &MESSAGE[0..i + 1]);
let patch = PatchElements::new(elements);
let sse_event = patch.write_as_axum_sse_event();

yielder.yield_item(Ok(sse_event)).await;

tokio::time::sleep(Duration::from_millis(signals.delay)).await;
}
},
))
}
Loading