From 45c284d43f051c4d76a10841d5212581bd796009 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Tue, 27 Jan 2026 18:14:18 -0600 Subject: [PATCH 1/4] feat: add Workflows support Add bindings for Workflows including step primitives (do, sleep, sleepUntil), event/instance management, and a proc macro for defining workflow entrypoints. Includes example worker, sys-level bindings, and integration tests. --- Cargo.lock | 9 + Cargo.toml | 2 +- examples/workflow/Cargo.toml | 15 + examples/workflow/src/lib.rs | 154 +++++++++ examples/workflow/wrangler.toml | 13 + rust-toolchain.toml | 2 +- test/src/lib.rs | 1 + test/src/router.rs | 4 +- test/src/workflow.rs | 74 +++++ test/tests/mf.ts | 21 ++ test/tests/workflow.spec.ts | 37 +++ test/wrangler.toml | 5 + worker-build/src/main.rs | 100 ++++-- worker-macros/Cargo.toml | 1 + worker-macros/src/lib.rs | 35 ++ worker-macros/src/workflow.rs | 79 +++++ worker-sys/Cargo.toml | 1 + worker-sys/src/types.rs | 4 + worker-sys/src/types/workflow.rs | 87 +++++ worker/Cargo.toml | 1 + worker/src/env.rs | 8 + worker/src/lib.rs | 8 + worker/src/workflow.rs | 526 +++++++++++++++++++++++++++++++ 23 files changed, 1155 insertions(+), 32 deletions(-) create mode 100644 examples/workflow/Cargo.toml create mode 100644 examples/workflow/src/lib.rs create mode 100644 examples/workflow/wrangler.toml create mode 100644 test/src/workflow.rs create mode 100644 test/tests/workflow.spec.ts create mode 100644 worker-macros/src/workflow.rs create mode 100644 worker-sys/src/types/workflow.rs create mode 100644 worker/src/workflow.rs diff --git a/Cargo.lock b/Cargo.lock index e4372f752..45cfd40ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3736,6 +3736,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "workflow-example" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "worker 0.7.4", +] + [[package]] name = "writeable" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index 4763782ef..0c458d12d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ web-sys = { version = "0.3.85", features = [ "WritableStream", "WritableStreamDefaultWriter", ] } -worker = { version = "0.7.4", path = "worker", features = ["queue", "d1", "axum", "timezone"] } +worker = { version = "0.7.4", path = "worker", features = ["queue", "d1", "axum", "timezone", "workflow"] } worker-codegen = { path = "worker-codegen", version = "0.2.0" } worker-macros = { version = "0.7.4", path = "worker-macros", features = ["queue"] } worker-sys = { version = "0.7.4", path = "worker-sys", features = ["d1", "queue"] } diff --git a/examples/workflow/Cargo.toml b/examples/workflow/Cargo.toml new file mode 100644 index 000000000..41e4881b3 --- /dev/null +++ b/examples/workflow/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "workflow-example" +version = "0.1.0" +edition = "2021" + +[package.metadata.release] +release = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" +worker = { path = "../../worker", features = ["workflow"] } diff --git a/examples/workflow/src/lib.rs b/examples/workflow/src/lib.rs new file mode 100644 index 000000000..82a9e22c6 --- /dev/null +++ b/examples/workflow/src/lib.rs @@ -0,0 +1,154 @@ +use serde::{Deserialize, Serialize}; +use worker::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MyParams { + pub email: String, + pub name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MyOutput { + pub message: String, + pub steps_completed: u32, +} + +#[workflow] +pub struct MyWorkflow { + #[allow(dead_code)] + env: Env, +} + +impl WorkflowEntrypoint for MyWorkflow { + fn new(_ctx: Context, env: Env) -> Self { + Self { env } + } + + async fn run( + &self, + event: WorkflowEvent, + step: WorkflowStep, + ) -> Result { + console_log!("Workflow started with instance ID: {}", event.instance_id); + + let params: MyParams = + serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?; + + let name_for_step1 = params.name.clone(); + let step1_result = step + .do_("initial-processing", move || async move { + console_log!("Processing for user: {}", name_for_step1); + Ok(serde_json::json!({ + "processed": true, + "user": name_for_step1 + })) + }) + .await?; + + console_log!("Step 1 completed: {:?}", step1_result); + + console_log!("Step 2: Sleeping for 10 seconds..."); + step.sleep("wait-for-processing", "10 seconds").await?; + + let email_for_step3 = params.email.clone(); + let notification_result = step + .do_with_config( + "send-notification", + StepConfig { + retries: Some(RetryConfig { + limit: 3, + delay: "5 seconds".to_string(), + backoff: Some(Backoff::Exponential), + }), + timeout: Some("1 minute".to_string()), + }, + move || async move { + console_log!("Sending notification to: {}", email_for_step3); + Ok(serde_json::json!({ + "notification_sent": true, + "email": email_for_step3 + })) + }, + ) + .await?; + + console_log!("Step 3 completed: {:?}", notification_result); + + let output = MyOutput { + message: format!("Workflow completed for {}", params.name), + steps_completed: 3, + }; + + Ok(serde_json::to_value(output).unwrap()) + } +} + +#[event(fetch)] +async fn fetch(req: Request, env: Env, _ctx: Context) -> Result { + let url = req.url()?; + let path = url.path(); + let workflow = env.workflow("MY_WORKFLOW")?; + + match (req.method(), path) { + (Method::Post, "/workflow") => { + let params = MyParams { + email: "user@example.com".to_string(), + name: "Test User".to_string(), + }; + + let instance = workflow + .create(Some(CreateOptions { + id: None, + params: Some(params), + retention: None, + })) + .await?; + + Response::from_json(&serde_json::json!({ + "id": instance.id()?, + "message": "Workflow created" + })) + } + + (Method::Get, path) if path.starts_with("/workflow/") => { + let id = path.trim_start_matches("/workflow/"); + let instance = workflow.get(id).await?; + let status = instance.status().await?; + + Response::from_json(&serde_json::json!({ + "id": instance.id()?, + "status": format!("{:?}", status.status), + "error": status.error, + "output": status.output + })) + } + + (Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/pause") => { + let id = path + .trim_start_matches("/workflow/") + .trim_end_matches("/pause"); + let instance = workflow.get(id).await?; + instance.pause().await?; + + Response::from_json(&serde_json::json!({ + "id": instance.id()?, + "message": "Workflow paused" + })) + } + + (Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/resume") => { + let id = path + .trim_start_matches("/workflow/") + .trim_end_matches("/resume"); + let instance = workflow.get(id).await?; + instance.resume().await?; + + Response::from_json(&serde_json::json!({ + "id": instance.id()?, + "message": "Workflow resumed" + })) + } + + _ => Response::error("Not Found", 404), + } +} diff --git a/examples/workflow/wrangler.toml b/examples/workflow/wrangler.toml new file mode 100644 index 000000000..b20e6ef7c --- /dev/null +++ b/examples/workflow/wrangler.toml @@ -0,0 +1,13 @@ +name = "workflow-example" +main = "build/worker/shim.mjs" +compatibility_date = "2024-10-22" + +[build] +# For development: use local worker-build binary +# For production: command = "cargo install -q worker-build && worker-build --release" +command = "RUSTFLAGS='--cfg=web_sys_unstable_apis' ../../target/release/worker-build --release" + +[[workflows]] +name = "my-workflow" +binding = "MY_WORKFLOW" +class_name = "MyWorkflow" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 291696d0e..ab40f4f44 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.87.0" +channel = "1.88.0" profile = "default" diff --git a/test/src/lib.rs b/test/src/lib.rs index 4f28ca705..d32e50a08 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -37,6 +37,7 @@ mod socket; mod sql_counter; mod sql_iterator; mod user; +mod workflow; mod ws; #[derive(Deserialize, Serialize)] diff --git a/test/src/router.rs b/test/src/router.rs index 4ca07033e..8dd936375 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -1,7 +1,7 @@ use crate::{ alarm, analytics_engine, assets, auto_response, cache, container, counter, d1, durable, fetch, form, js_snippets, kv, put_raw, queue, r2, rate_limit, request, secret_store, service, socket, - sql_counter, sql_iterator, user, ws, SomeSharedData, GLOBAL_STATE, + sql_counter, sql_iterator, user, workflow, ws, SomeSharedData, GLOBAL_STATE, }; #[cfg(feature = "http")] use std::convert::TryInto; @@ -234,6 +234,8 @@ macro_rules! add_routes ( add_route!($obj, get, format_route!("/rate-limit/key/{}", "key"), rate_limit::handle_rate_limit_with_key); add_route!($obj, get, "/rate-limit/bulk-test", rate_limit::handle_rate_limit_bulk_test); add_route!($obj, get, "/rate-limit/reset", rate_limit::handle_rate_limit_reset); + add_route!($obj, post, "/workflow/create", workflow::handle_workflow_create); + add_route!($obj, get, format_route!("/workflow/status/{}", "id"), workflow::handle_workflow_status); }); #[cfg(feature = "http")] diff --git a/test/src/workflow.rs b/test/src/workflow.rs new file mode 100644 index 000000000..b1cc2b32c --- /dev/null +++ b/test/src/workflow.rs @@ -0,0 +1,74 @@ +use serde::{Deserialize, Serialize}; +use worker::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TestParams { + pub value: String, +} + +#[workflow] +pub struct TestWorkflow { + #[allow(dead_code)] + env: Env, +} + +impl WorkflowEntrypoint for TestWorkflow { + fn new(_ctx: Context, env: Env) -> Self { + Self { env } + } + + async fn run( + &self, + event: WorkflowEvent, + step: WorkflowStep, + ) -> Result { + let params: TestParams = + serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?; + + let result = step + .do_("process", move || async move { + Ok(serde_json::json!({ "processed": params.value })) + }) + .await?; + + Ok(result) + } +} + +pub async fn handle_workflow_create( + _req: Request, + env: Env, + _data: crate::SomeSharedData, +) -> Result { + let workflow = env.workflow("TEST_WORKFLOW")?; + let params = TestParams { + value: "hello".to_string(), + }; + let instance = workflow + .create(Some(CreateOptions { + params: Some(params), + ..Default::default() + })) + .await?; + + Response::from_json(&serde_json::json!({ "id": instance.id()? })) +} + +pub async fn handle_workflow_status( + req: Request, + env: Env, + _data: crate::SomeSharedData, +) -> Result { + let url = req.url()?; + let path = url.path(); + let id = path.trim_start_matches("/workflow/status/"); + let workflow = env.workflow("TEST_WORKFLOW")?; + let instance = workflow.get(id).await?; + let status = instance.status().await?; + + Response::from_json(&serde_json::json!({ + "status": format!("{:?}", status.status), + "output": status.output, + "error": status.error, + })) +} diff --git a/test/tests/mf.ts b/test/tests/mf.ts index 7a63c7d39..26ba3afe7 100644 --- a/test/tests/mf.ts +++ b/test/tests/mf.ts @@ -37,6 +37,7 @@ const mf_instance = new Miniflare({ kvPersist: false, r2Persist: false, cachePersist: false, + workflowsPersist: false, workers: [ { scriptPath: "./build/index.js", @@ -113,6 +114,15 @@ const mf_instance = new Miniflare({ scriptName: "mini-analytics-engine" // mock out analytics engine binding to the "mini-analytics-engine" worker } }, + // Workflow binding requires a separate worker via scriptName in + // the Miniflare JS API (wrangler dev handles this automatically). + workflows: { + TEST_WORKFLOW: { + name: "test-workflow", + className: "TestWorkflow", + scriptName: "workflow-worker", + }, + }, ratelimits: { TEST_RATE_LIMITER: { simple: { @@ -122,6 +132,17 @@ const mf_instance = new Miniflare({ } } }, + { + // Dedicated worker for TestWorkflow; uses the generated JS class wrapper. + name: "workflow-worker", + scriptPath: "./build/worker/shim.mjs", + modules: true, + modulesRules: [ + { type: "ESModule", include: ["**/*.js"], fallthrough: true }, + { type: "CompiledWasm", include: ["**/*.wasm"], fallthrough: true }, + ], + compatibilityDate: "2025-07-24", + }, { name: "mini-analytics-engine", modules: true, diff --git a/test/tests/workflow.spec.ts b/test/tests/workflow.spec.ts new file mode 100644 index 000000000..9e7cce89e --- /dev/null +++ b/test/tests/workflow.spec.ts @@ -0,0 +1,37 @@ +import { describe, test, expect } from "vitest"; +import { mf, mfUrl } from "./mf"; + +describe("workflow", () => { + test("create and poll status until completion", async () => { + const createResp = await mf.dispatchFetch(`${mfUrl}workflow/create`, { + method: "POST", + }); + expect(createResp.status).toBe(200); + const { id } = (await createResp.json()) as { id: string }; + expect(id).toBeDefined(); + expect(typeof id).toBe("string"); + + let status: string | undefined; + let output: unknown; + for (let i = 0; i < 30; i++) { + await new Promise((resolve) => setTimeout(resolve, 500)); + const statusResp = await mf.dispatchFetch( + `${mfUrl}workflow/status/${id}` + ); + expect(statusResp.status).toBe(200); + const body = (await statusResp.json()) as { + status: string; + output: unknown; + error: unknown; + }; + status = body.status; + output = body.output; + if (status === "Complete" || status === "Errored") { + break; + } + } + + expect(status).toBe("Complete"); + expect(output).toEqual({ processed: "hello" }); + }); +}); diff --git a/test/wrangler.toml b/test/wrangler.toml index 0729a2b90..85f141ab6 100644 --- a/test/wrangler.toml +++ b/test/wrangler.toml @@ -85,6 +85,11 @@ class_name = "EchoContainer" image = "./container-echo/Dockerfile" max_instances = 1 +[[workflows]] +name = "test-workflow" +binding = "TEST_WORKFLOW" +class_name = "TestWorkflow" + [[ratelimits]] name = "TEST_RATE_LIMITER" namespace_id = "1" diff --git a/worker-build/src/main.rs b/worker-build/src/main.rs index 597a463f1..80f222409 100644 --- a/worker-build/src/main.rs +++ b/worker-build/src/main.rs @@ -119,7 +119,7 @@ pub fn main() -> Result<()> { ); fs::write(output_path(&out_dir, "shim.js"), shim)?; - add_export_wrappers(&out_dir)?; + let has_workflows = add_export_wrappers(&out_dir)?; update_package_json(&out_dir)?; @@ -130,7 +130,9 @@ pub fn main() -> Result<()> { remove_unused_files(&out_dir)?; - create_wrapper_alias(&out_dir, false)?; + if !has_workflows { + create_wrapper_alias(&out_dir, false)?; + } } else { main_legacy::process(&out_dir)?; create_wrapper_alias(&out_dir, true)?; @@ -198,28 +200,83 @@ fn generate_handlers(out_dir: &Path) -> Result { static SYSTEM_FNS: &[&str] = &["__wbg_reset_state", "setPanicHook"]; -fn add_export_wrappers(out_dir: &Path) -> Result<()> { +/// Returns true if workflow classes were detected and a wrapper was generated +fn add_export_wrappers(out_dir: &Path) -> Result { let index_path = output_path(out_dir, "index.js"); let content = fs::read_to_string(&index_path)?; let mut class_names = Vec::new(); + let mut workflow_classes = Vec::new(); for line in content.lines() { if let Some(rest) = line.strip_prefix("export class ") { if let Some(brace_pos) = rest.find("{") { - let class_name = rest[..brace_pos].trim(); - class_names.push(class_name.to_string()); + class_names.push(rest[..brace_pos].trim().to_string()); + } + } else if let Some(rest) = line.strip_prefix("export function __wf_") { + if let Some(paren_pos) = rest.find('(') { + workflow_classes.push(rest[..paren_pos].trim().to_string()); + } + } else if let Some(rest) = line.strip_prefix("export { __wf_") { + if let Some(as_pos) = rest.find(" as ") { + workflow_classes.push(rest[..as_pos].trim().to_string()); } } } let shim_path = output_path(out_dir, "shim.js"); let mut output = fs::read_to_string(&shim_path)?; - for class_name in class_names { - output.push_str(&format!( - "export const {class_name} = new Proxy(exports.{class_name}, classProxyHooks);\n" - )); + + for class_name in &class_names { + if workflow_classes.contains(class_name) { + output.push_str(&format!( + "export const {class_name} = exports.{class_name};\n" + )); + } else { + output.push_str(&format!( + "export const {class_name} = new Proxy(exports.{class_name}, classProxyHooks);\n" + )); + } } fs::write(&shim_path, output)?; + + // Workflows need a JS wrapper that extends WorkflowEntrypoint from cloudflare:workers + let has_workflows = !workflow_classes.is_empty(); + if has_workflows { + generate_workflow_wrapper(out_dir, &workflow_classes)?; + } + + Ok(has_workflows) +} + +fn generate_workflow_wrapper(out_dir: &Path, workflow_classes: &[String]) -> Result<()> { + let mut wrapper = String::from( + r#"import { WorkflowEntrypoint } from "cloudflare:workers"; +import * as wasm from "../index.js"; +export * from "../index.js"; +export { default } from "../index.js"; + +"#, + ); + + for class_name in workflow_classes { + wrapper.push_str(&format!( + r#"export class {class_name} extends WorkflowEntrypoint {{ + constructor(ctx, env) {{ + super(ctx, env); + this.inner = new wasm.{class_name}(ctx, env); + }} + async run(event, step) {{ + return await this.inner.run(event, step); + }} +}} + +"# + )); + } + + fs::create_dir_all(output_path(out_dir, "worker"))?; + fs::write(output_path(out_dir, "worker/shim.mjs"), wrapper)?; + Ok(()) } @@ -289,29 +346,14 @@ fn wasm_coredump(out_dir: &Path) -> Result<()> { } fn create_wrapper_alias(out_dir: &Path, legacy: bool) -> Result<()> { - let msg = if !legacy { - "// Use index.js directly, this file provided for backwards compat -// with former shim.mjs only. -" - } else { - "" - }; - let path = if !legacy { - "../index.js" + if legacy { + let shim_content = + "export * from './worker/shim.mjs';\nexport { default } from './worker/shim.mjs';\n"; + fs::write(output_path(out_dir, "index.js"), shim_content)?; } else { - "./worker/shim.mjs" - }; - let shim_content = format!( - "{msg}export * from '{path}'; -export {{ default }} from '{path}'; -" - ); - - if !legacy { + let shim_content = "// Use index.js directly, this file provided for backwards compat\n// with former shim.mjs only.\nexport * from '../index.js';\nexport { default } from '../index.js';\n"; fs::create_dir_all(output_path(out_dir, "worker"))?; fs::write(output_path(out_dir, "worker/shim.mjs"), shim_content)?; - } else { - fs::write(output_path(out_dir, "index.js"), shim_content)?; } Ok(()) } diff --git a/worker-macros/Cargo.toml b/worker-macros/Cargo.toml index 4628974b4..668088fa9 100644 --- a/worker-macros/Cargo.toml +++ b/worker-macros/Cargo.toml @@ -24,3 +24,4 @@ quote.workspace = true [features] queue = [] http = [] +workflow = [] diff --git a/worker-macros/src/lib.rs b/worker-macros/src/lib.rs index c6f2cd376..ed3bb9ff9 100644 --- a/worker-macros/src/lib.rs +++ b/worker-macros/src/lib.rs @@ -1,6 +1,8 @@ mod durable_object; mod event; mod send; +#[cfg(feature = "workflow")] +mod workflow; use proc_macro::TokenStream; @@ -141,3 +143,36 @@ pub fn send(attr: TokenStream, stream: TokenStream) -> TokenStream { pub fn consume(_: TokenStream, _: TokenStream) -> TokenStream { TokenStream::new() } + +/// Integrate the struct with the Workers Runtime as a Workflow Entrypoint. +/// Requires the `WorkflowEntrypoint` trait with the workflow attribute macro on the struct. +/// +/// ## Example +/// +/// ```rust,ignore +/// #[workflow] +/// pub struct MyWorkflow { +/// env: Env, +/// } +/// +/// impl WorkflowEntrypoint for MyWorkflow { +/// fn new(ctx: Context, env: Env) -> Self { +/// Self { env } +/// } +/// +/// async fn run(&self, event: WorkflowEvent, step: WorkflowStep) -> Result { +/// let result = step.do_("my step", || async { +/// Ok(serde_json::json!({"data": "value"})) +/// }).await?; +/// +/// Ok(result) +/// } +/// } +/// ``` +#[cfg(feature = "workflow")] +#[proc_macro_attribute] +pub fn workflow(_attr: TokenStream, item: TokenStream) -> TokenStream { + workflow::expand_macro(item.into()) + .unwrap_or_else(syn::Error::into_compile_error) + .into() +} diff --git a/worker-macros/src/workflow.rs b/worker-macros/src/workflow.rs new file mode 100644 index 000000000..45fbea7ef --- /dev/null +++ b/worker-macros/src/workflow.rs @@ -0,0 +1,79 @@ +use proc_macro2::TokenStream; +use quote::{format_ident, quote}; +use syn::{Error, ItemImpl, ItemStruct}; + +pub fn expand_macro(tokens: TokenStream) -> syn::Result { + if syn::parse2::(tokens.clone()).is_ok() { + return Err(Error::new( + proc_macro2::Span::call_site(), + "#[workflow] should only be applied to struct definitions, not impl blocks", + )); + } + + let target = syn::parse2::(tokens)?; + let target_name = &target.ident; + let marker_fn_name = format_ident!("__wf_{}", target_name); + let marker_js_name = format!("__wf_{}", target_name); + let target_name_str = target_name.to_string(); + + Ok(quote! { + #target + + impl ::worker::HasWorkflowAttribute for #target_name {} + + const _: () = { + use ::worker::wasm_bindgen::prelude::*; + #[allow(unused_imports)] + use ::worker::WorkflowEntrypoint; + + #[allow(non_snake_case)] + #[wasm_bindgen(js_name = #marker_js_name, wasm_bindgen=::worker::wasm_bindgen)] + pub fn #marker_fn_name() -> ::worker::js_sys::JsString { + ::worker::js_sys::JsString::from(#target_name_str) + } + + #[wasm_bindgen(wasm_bindgen=::worker::wasm_bindgen)] + #[::worker::consume] + #target + + #[wasm_bindgen(wasm_bindgen=::worker::wasm_bindgen)] + impl #target_name { + #[wasm_bindgen(constructor, wasm_bindgen=::worker::wasm_bindgen)] + pub fn new( + ctx: ::worker::worker_sys::Context, + env: ::worker::Env + ) -> Self { + ::new( + ::worker::Context::new(ctx), + env + ) + } + + #[wasm_bindgen(js_name = run, wasm_bindgen=::worker::wasm_bindgen)] + pub fn run( + &self, + event: ::worker::wasm_bindgen::JsValue, + step: ::worker::worker_sys::WorkflowStep + ) -> ::worker::js_sys::Promise { + // SAFETY: The Cloudflare Workers runtime manages the Workflow instance + // lifecycle. The runtime guarantees that: + // 1. The instance is created before run() is called + // 2. The instance is not destroyed while any Promise returned by run() is pending + // 3. WASM execution is single-threaded, so no concurrent access is possible + // This is the same lifecycle model used by Durable Objects and WorkerEntrypoint. + let static_self: &'static Self = unsafe { &*(self as *const _) }; + + ::worker::wasm_bindgen_futures::future_to_promise(::std::panic::AssertUnwindSafe(async move { + let event = ::worker::WorkflowEvent::from_js(event) + .map_err(|e| ::worker::wasm_bindgen::JsValue::from_str(&e.to_string()))?; + let step = ::worker::WorkflowStep::from(step); + let result = ::run(static_self, event, step).await + .map_err(::worker::wasm_bindgen::JsValue::from)?; + ::worker::serialize_as_object(&result) + .map_err(|e| ::worker::wasm_bindgen::JsValue::from_str(&e.to_string())) + })) + } + } + }; + }) +} diff --git a/worker-sys/Cargo.toml b/worker-sys/Cargo.toml index 5ce54720e..25763204c 100644 --- a/worker-sys/Cargo.toml +++ b/worker-sys/Cargo.toml @@ -16,3 +16,4 @@ web-sys.workspace = true [features] d1 = [] queue = [] +workflow = [] diff --git a/worker-sys/src/types.rs b/worker-sys/src/types.rs index d6e63d4db..f874a4ea6 100644 --- a/worker-sys/src/types.rs +++ b/worker-sys/src/types.rs @@ -21,6 +21,8 @@ mod tls_client_auth; mod version; mod websocket_pair; mod websocket_request_response_pair; +#[cfg(feature = "workflow")] +mod workflow; pub use ai::*; pub use analytics_engine::*; @@ -45,3 +47,5 @@ pub use tls_client_auth::*; pub use version::*; pub use websocket_pair::*; pub use websocket_request_response_pair::*; +#[cfg(feature = "workflow")] +pub use workflow::*; diff --git a/worker-sys/src/types/workflow.rs b/worker-sys/src/types/workflow.rs new file mode 100644 index 000000000..45454edb0 --- /dev/null +++ b/worker-sys/src/types/workflow.rs @@ -0,0 +1,87 @@ +use wasm_bindgen::prelude::*; + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Debug, Clone, PartialEq, Eq)] + pub type WorkflowStep; + + #[wasm_bindgen(method, catch, js_name = "do")] + pub fn do_( + this: &WorkflowStep, + name: &str, + callback: &js_sys::Function, + ) -> Result; + + #[wasm_bindgen(method, catch, js_name = "do")] + pub fn do_with_config( + this: &WorkflowStep, + name: &str, + config: JsValue, + callback: &js_sys::Function, + ) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn sleep( + this: &WorkflowStep, + name: &str, + duration: JsValue, + ) -> Result; + + #[wasm_bindgen(method, catch, js_name = sleepUntil)] + pub fn sleep_until( + this: &WorkflowStep, + name: &str, + timestamp: JsValue, + ) -> Result; + + #[wasm_bindgen(method, catch, js_name = waitForEvent)] + pub fn wait_for_event( + this: &WorkflowStep, + name: &str, + options: JsValue, + ) -> Result; + + /// Workflow binding type - may be a Workflow object, WorkflowImpl, or Fetcher (RPC stub). + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Debug, Clone, PartialEq, Eq)] + pub type WorkflowBinding; + + #[wasm_bindgen(method, catch)] + pub fn get(this: &WorkflowBinding, id: &str) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn create(this: &WorkflowBinding, options: JsValue) -> Result; + + #[wasm_bindgen(method, catch, js_name = createBatch)] + pub fn create_batch( + this: &WorkflowBinding, + batch: &js_sys::Array, + ) -> Result; + + /// Workflow instance handle - may be an RPC stub in Miniflare. + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Debug, Clone, PartialEq, Eq)] + pub type WorkflowInstanceSys; + + #[wasm_bindgen(method, catch)] + pub fn pause(this: &WorkflowInstanceSys) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn resume(this: &WorkflowInstanceSys) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn terminate(this: &WorkflowInstanceSys) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn restart(this: &WorkflowInstanceSys) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn status(this: &WorkflowInstanceSys) -> Result; + + #[wasm_bindgen(method, catch, js_name = sendEvent)] + pub fn send_event( + this: &WorkflowInstanceSys, + event: JsValue, + ) -> Result; +} diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 2de23f3b2..3aa81c87b 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -47,6 +47,7 @@ axum = { version = "0.8", optional = true, default-features = false } [features] queue = ["worker-macros/queue", "worker-sys/queue"] d1 = ["worker-sys/d1"] +workflow = ["worker-macros/workflow", "worker-sys/workflow"] http = ["worker-macros/http"] axum = ["dep:axum"] timezone = ["dep:chrono-tz"] diff --git a/worker/src/env.rs b/worker/src/env.rs index 780c2585b..396fbb9b7 100644 --- a/worker/src/env.rs +++ b/worker/src/env.rs @@ -8,6 +8,8 @@ use crate::rate_limit::RateLimiter; use crate::Ai; #[cfg(feature = "queue")] use crate::Queue; +#[cfg(feature = "workflow")] +use crate::Workflow; use crate::{durable::ObjectNamespace, Bucket, DynamicDispatcher, Fetcher, Result, SecretStore}; use crate::{error::Error, hyperdrive::Hyperdrive}; @@ -99,6 +101,12 @@ impl Env { self.get_binding(binding) } + #[cfg(feature = "workflow")] + /// Access a Workflow by the binding name configured in your wrangler.toml file. + pub fn workflow(&self, binding: &str) -> Result { + self.get_binding(binding) + } + /// Access an R2 Bucket by the binding name configured in your wrangler.toml file. pub fn bucket(&self, binding: &str) -> Result { self.get_binding(binding) diff --git a/worker/src/lib.rs b/worker/src/lib.rs index d4f902bdf..0a5ee5a5b 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -149,12 +149,16 @@ use std::result::Result as StdResult; #[doc(hidden)] pub use async_trait; pub use js_sys; +pub use serde_json; +pub use serde_wasm_bindgen; pub use url::Url; pub use wasm_bindgen; pub use wasm_bindgen_futures; pub use web_sys; pub use cf::{Cf, CfResponseProperties, TlsClientAuth}; +#[cfg(feature = "workflow")] +pub use worker_macros::workflow; pub use worker_macros::{consume, durable_object, event, send}; #[doc(hidden)] pub use worker_sys; @@ -196,6 +200,8 @@ pub use crate::socket::*; pub use crate::streams::*; pub use crate::version::*; pub use crate::websocket::*; +#[cfg(feature = "workflow")] +pub use crate::workflow::*; mod abort; mod ai; @@ -240,6 +246,8 @@ mod sql; mod streams; mod version; mod websocket; +#[cfg(feature = "workflow")] +mod workflow; /// A `Result` alias defaulting to [`Error`]. pub type Result = StdResult; diff --git a/worker/src/workflow.rs b/worker/src/workflow.rs new file mode 100644 index 000000000..a3caf619b --- /dev/null +++ b/worker/src/workflow.rs @@ -0,0 +1,526 @@ +//! Cloudflare Workflows support for Rust Workers. + +use std::future::Future; +use std::panic::AssertUnwindSafe; + +use js_sys::{Object, Reflect}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::{future_to_promise, JsFuture}; +use worker_sys::types::WorkflowBinding as WorkflowBindingSys; +use worker_sys::types::WorkflowInstanceSys; +use worker_sys::types::WorkflowStep as WorkflowStepSys; + +use crate::env::EnvBinding; +use crate::Result; + +#[doc(hidden)] +pub fn serialize_as_object( + value: &T, +) -> std::result::Result { + value.serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true)) +} + +fn get_property(target: &JsValue, name: &str) -> Result { + Reflect::get(target, &JsValue::from_str(name)) + .map_err(|e| crate::Error::JsError(format!("failed to get property '{name}': {e:?}"))) +} + +fn get_string_property(target: &JsValue, name: &str) -> Result { + get_property(target, name)? + .as_string() + .ok_or_else(|| crate::Error::JsError(format!("{name} is not a string"))) +} + +fn get_timestamp_property(target: &JsValue, name: &str) -> Result { + let val = get_property(target, name)?; + Ok(crate::Date::from(js_sys::Date::from(val))) +} + +/// A Workflow binding for creating and managing workflow instances. +#[derive(Debug, Clone)] +pub struct Workflow { + inner: WorkflowBindingSys, +} + +// SAFETY: WASM is single-threaded. These types wrap JS objects that are only +// accessed from the main thread. Send/Sync are implemented to satisfy Rust's +// async machinery (e.g., holding references across await points), but actual +// cross-thread access is impossible in the Workers runtime. +unsafe impl Send for Workflow {} +unsafe impl Sync for Workflow {} + +impl Workflow { + /// Get a handle to an existing workflow instance by ID. + pub async fn get(&self, id: &str) -> Result { + let promise = self.inner.get(id)?; + let result = JsFuture::from(promise).await?; + Ok(WorkflowInstance::from_js(result)) + } + + /// Create a new workflow instance. + pub async fn create( + &self, + options: Option>, + ) -> Result { + let js_options = match options { + Some(opts) => serde_wasm_bindgen::to_value(&opts)?, + None => JsValue::UNDEFINED, + }; + let promise = self.inner.create(js_options)?; + let result = JsFuture::from(promise).await?; + Ok(WorkflowInstance::from_js(result)) + } + + /// Create a batch of workflow instances (limited to 100 at a time). + pub async fn create_batch( + &self, + batch: Vec>, + ) -> Result> { + let js_array = js_sys::Array::new(); + for opts in batch { + js_array.push(&serde_wasm_bindgen::to_value(&opts)?); + } + let promise = self.inner.create_batch(&js_array)?; + let result = JsFuture::from(promise).await?; + let result_array: js_sys::Array = result.unchecked_into(); + + let mut instances = Vec::with_capacity(result_array.length() as usize); + for i in 0..result_array.length() { + instances.push(WorkflowInstance::from_js(result_array.get(i))); + } + Ok(instances) + } +} + +impl EnvBinding for Workflow { + const TYPE_NAME: &'static str = "Workflow"; + + fn get(val: JsValue) -> Result { + let obj = Object::from(val); + let constructor_name = obj.constructor().name(); + if constructor_name == Self::TYPE_NAME + || constructor_name == "WorkflowImpl" + || constructor_name == "Fetcher" + { + Ok(Self { + inner: obj.unchecked_into(), + }) + } else { + Err(format!( + "Binding cannot be cast to the type {} from {}", + Self::TYPE_NAME, + constructor_name + ) + .into()) + } + } +} + +impl JsCast for Workflow { + fn instanceof(_val: &JsValue) -> bool { + true + } + + fn unchecked_from_js(val: JsValue) -> Self { + Self { + inner: val.unchecked_into(), + } + } + + fn unchecked_from_js_ref(val: &JsValue) -> &Self { + unsafe { &*(val as *const JsValue as *const Self) } + } +} + +impl From for JsValue { + fn from(workflow: Workflow) -> Self { + workflow.inner.into() + } +} + +impl AsRef for Workflow { + fn as_ref(&self) -> &JsValue { + self.inner.as_ref() + } +} + +/// Options for creating a new workflow instance. +#[derive(Debug, Clone, Serialize)] +pub struct CreateOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub retention: Option, +} + +impl Default for CreateOptions { + fn default() -> Self { + Self { + id: None, + params: None, + retention: None, + } + } +} + +/// Retention policy for workflow instances. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RetentionOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub success_retention: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error_retention: Option, +} + +/// A handle to a workflow instance. +#[derive(Debug, Clone)] +pub struct WorkflowInstance { + inner: WorkflowInstanceSys, +} + +// SAFETY: See Workflow for rationale - WASM is single-threaded. +unsafe impl Send for WorkflowInstance {} +unsafe impl Sync for WorkflowInstance {} + +impl WorkflowInstance { + fn from_js(val: JsValue) -> Self { + Self { + inner: val.unchecked_into(), + } + } + + /// The unique ID of this workflow instance. + pub fn id(&self) -> Result { + get_string_property(self.inner.as_ref(), "id") + } + + /// Pause the workflow instance. + pub async fn pause(&self) -> Result<()> { + JsFuture::from(self.inner.pause()?).await?; + Ok(()) + } + + /// Resume a paused workflow instance. + pub async fn resume(&self) -> Result<()> { + JsFuture::from(self.inner.resume()?).await?; + Ok(()) + } + + /// Terminate the workflow instance. + pub async fn terminate(&self) -> Result<()> { + JsFuture::from(self.inner.terminate()?).await?; + Ok(()) + } + + /// Restart the workflow instance. + pub async fn restart(&self) -> Result<()> { + JsFuture::from(self.inner.restart()?).await?; + Ok(()) + } + + /// Get the current status of the workflow instance. + pub async fn status(&self) -> Result { + let result = JsFuture::from(self.inner.status()?).await?; + Ok(serde_wasm_bindgen::from_value(result)?) + } + + /// Send an event to the workflow instance to trigger `step.wait_for_event()` calls. + pub async fn send_event(&self, event_type: &str, payload: T) -> Result<()> { + let event = Object::new(); + Reflect::set(&event, &"type".into(), &event_type.into())?; + Reflect::set( + &event, + &"payload".into(), + &serde_wasm_bindgen::to_value(&payload)?, + )?; + JsFuture::from(self.inner.send_event(event.into())?).await?; + Ok(()) + } +} + +/// The status of a workflow instance. +#[derive(Debug, Clone, Deserialize)] +pub struct InstanceStatus { + pub status: InstanceStatusKind, + #[serde(default)] + pub error: Option, + #[serde(default)] + pub output: Option, +} + +/// The possible status values for a workflow instance. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum InstanceStatusKind { + Queued, + Running, + Paused, + Errored, + Terminated, + Complete, + Waiting, + WaitingForPause, + Unknown, +} + +/// Error information for a failed workflow. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InstanceError { + pub name: String, + pub message: String, +} + +/// Provides methods for executing durable workflow steps. +#[derive(Debug)] +pub struct WorkflowStep(WorkflowStepSys); + +// SAFETY: See Workflow for rationale - WASM is single-threaded. +unsafe impl Send for WorkflowStep {} +unsafe impl Sync for WorkflowStep {} + +impl WorkflowStep { + fn wrap_callback( + callback: F, + ) -> wasm_bindgen::closure::Closure js_sys::Promise> + where + T: Serialize + 'static, + F: FnOnce() -> Fut + 'static, + Fut: Future> + 'static, + { + wasm_bindgen::closure::Closure::once(move || -> js_sys::Promise { + future_to_promise(AssertUnwindSafe(async move { + let result = callback().await.map_err(JsValue::from)?; + serialize_as_object(&result).map_err(|e| JsValue::from_str(&e.to_string())) + })) + }) + } + + /// Execute a named step. The callback's return value is persisted and + /// returned without re-executing on replay. + pub async fn do_(&self, name: &str, callback: F) -> Result + where + T: Serialize + DeserializeOwned + 'static, + F: FnOnce() -> Fut + 'static, + Fut: Future> + 'static, + { + let closure = Self::wrap_callback(callback); + let js_fn = closure.as_ref().unchecked_ref::(); + let promise = self.0.do_(name, js_fn)?; + let result = JsFuture::from(promise).await?; + Ok(serde_wasm_bindgen::from_value(result)?) + } + + /// Execute a named step with retry and timeout configuration. + pub async fn do_with_config( + &self, + name: &str, + config: StepConfig, + callback: F, + ) -> Result + where + T: Serialize + DeserializeOwned + 'static, + F: FnOnce() -> Fut + 'static, + Fut: Future> + 'static, + { + let config_js = serde_wasm_bindgen::to_value(&config)?; + let closure = Self::wrap_callback(callback); + let js_fn = closure.as_ref().unchecked_ref::(); + let promise = self.0.do_with_config(name, config_js, js_fn)?; + let result = JsFuture::from(promise).await?; + Ok(serde_wasm_bindgen::from_value(result)?) + } + + /// Sleep for a specified duration (e.g., "1 minute", "5 seconds"). + pub async fn sleep(&self, name: &str, duration: impl Into) -> Result<()> { + let duration_js = duration.into().to_js_value(); + JsFuture::from(self.0.sleep(name, duration_js)?).await?; + Ok(()) + } + + /// Sleep until a specific timestamp. + pub async fn sleep_until(&self, name: &str, timestamp: impl Into) -> Result<()> { + let date: crate::Date = timestamp.into(); + let ts_ms = date.as_millis() as f64; + JsFuture::from(self.0.sleep_until(name, ts_ms.into())?).await?; + Ok(()) + } + + /// Wait for an external event sent via `WorkflowInstance::send_event()`. + pub async fn wait_for_event( + &self, + name: &str, + options: WaitForEventOptions, + ) -> Result> { + let options_js = serde_wasm_bindgen::to_value(&options)?; + let result = JsFuture::from(self.0.wait_for_event(name, options_js)?).await?; + WorkflowStepEvent::from_js(result) + } +} + +impl From for WorkflowStep { + fn from(inner: WorkflowStepSys) -> Self { + Self(inner) + } +} + +/// Configuration for a workflow step. +#[derive(Debug, Clone, Default, Serialize)] +pub struct StepConfig { + #[serde(skip_serializing_if = "Option::is_none")] + pub retries: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timeout: Option, +} + +/// Retry configuration for a workflow step. +#[derive(Debug, Clone, Serialize)] +pub struct RetryConfig { + pub limit: u32, + pub delay: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub backoff: Option, +} + +/// Backoff strategy for retries. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Backoff { + Constant, + Linear, + Exponential, +} + +/// Options for waiting for an external event. +#[derive(Debug, Clone, Serialize)] +pub struct WaitForEventOptions { + #[serde(rename = "type")] + pub event_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub timeout: Option, +} + +/// An event received from `wait_for_event`. +#[derive(Debug, Clone)] +pub struct WorkflowStepEvent { + pub payload: T, + pub timestamp: crate::Date, + pub event_type: String, +} + +impl WorkflowStepEvent { + fn from_js(value: JsValue) -> Result { + Ok(Self { + payload: serde_wasm_bindgen::from_value(get_property(&value, "payload")?)?, + timestamp: get_timestamp_property(&value, "timestamp")?, + event_type: get_string_property(&value, "type")?, + }) + } +} + +/// The event passed to a workflow's run method. +#[derive(Debug, Clone)] +pub struct WorkflowEvent { + pub payload: T, + pub timestamp: crate::Date, + pub instance_id: String, +} + +impl WorkflowEvent { + pub fn from_js(value: JsValue) -> Result { + Ok(Self { + payload: serde_wasm_bindgen::from_value(get_property(&value, "payload")?)?, + timestamp: get_timestamp_property(&value, "timestamp")?, + instance_id: get_string_property(&value, "instanceId")?, + }) + } +} + +/// Duration type for workflow sleep operations. +#[derive(Debug, Clone)] +pub enum WorkflowDuration { + Milliseconds(u64), + String(String), +} + +impl WorkflowDuration { + fn to_js_value(&self) -> JsValue { + match self { + Self::Milliseconds(ms) => JsValue::from_f64(*ms as f64), + Self::String(s) => JsValue::from_str(s), + } + } +} + +impl From<&str> for WorkflowDuration { + fn from(s: &str) -> Self { + Self::String(s.to_string()) + } +} + +impl From for WorkflowDuration { + fn from(s: String) -> Self { + Self::String(s) + } +} + +impl From for WorkflowDuration { + fn from(d: std::time::Duration) -> Self { + Self::Milliseconds(d.as_millis() as u64) + } +} + +/// Error type for non-retryable workflow errors. +#[derive(Debug)] +pub struct NonRetryableError { + message: String, + name: Option, +} + +impl NonRetryableError { + pub fn new(message: impl Into) -> Self { + Self { + message: message.into(), + name: None, + } + } + + pub fn with_name(message: impl Into, name: impl Into) -> Self { + Self { + message: message.into(), + name: Some(name.into()), + } + } +} + +impl std::fmt::Display for NonRetryableError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(name) = &self.name { + write!(f, "{}: {}", name, self.message) + } else { + write!(f, "{}", self.message) + } + } +} + +impl std::error::Error for NonRetryableError {} + +/// Marker trait implemented by the `#[workflow]` macro. +#[doc(hidden)] +pub trait HasWorkflowAttribute {} + +/// Trait for implementing a Workflow entrypoint. +#[allow(async_fn_in_trait)] +pub trait WorkflowEntrypoint: HasWorkflowAttribute { + fn new(ctx: crate::Context, env: crate::Env) -> Self; + + async fn run( + &self, + event: WorkflowEvent, + step: WorkflowStep, + ) -> Result; +} From 93f3c5ad8054d20ee81d66fea68765597228054d Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 28 Jan 2026 08:18:09 -0600 Subject: [PATCH 2/4] clippy? clippy fix picked up more things. unclear why the build just flagged these --- worker-macros/src/event.rs | 2 +- worker-macros/src/workflow.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/worker-macros/src/event.rs b/worker-macros/src/event.rs index f7c9374d8..104a4e7e2 100644 --- a/worker-macros/src/event.rs +++ b/worker-macros/src/event.rs @@ -28,7 +28,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { "respond_with_errors" => { respond_with_errors = true; } - _ => panic!("Invalid attribute: {}", attr), + _ => panic!("Invalid attribute: {attr}"), } } let handler_type = handler_type.expect( diff --git a/worker-macros/src/workflow.rs b/worker-macros/src/workflow.rs index 45fbea7ef..2b3807e72 100644 --- a/worker-macros/src/workflow.rs +++ b/worker-macros/src/workflow.rs @@ -13,7 +13,7 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result { let target = syn::parse2::(tokens)?; let target_name = &target.ident; let marker_fn_name = format_ident!("__wf_{}", target_name); - let marker_js_name = format!("__wf_{}", target_name); + let marker_js_name = format!("__wf_{target_name}"); let target_name_str = target_name.to_string(); Ok(quote! { From d19624f3fced5c7d7f1df089b13acc8e76d8f620 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 28 Jan 2026 18:04:31 -0600 Subject: [PATCH 3/4] backout toolchain bump --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index ab40f4f44..291696d0e 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.88.0" +channel = "1.87.0" profile = "default" From 4536c0cb902de193364b62c948d8efbaf19ac4f5 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 28 Jan 2026 18:16:25 -0600 Subject: [PATCH 4/4] fix: wrap workflow JsFuture calls with SendFuture for axum compatibility Workflow async methods were failing to compile with the `http` feature because JsFuture is not Send. Wrap all JsFuture calls with SendFuture and extract promises to separate variables to avoid holding non-Send types across await points. --- worker/src/workflow.rs | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/worker/src/workflow.rs b/worker/src/workflow.rs index a3caf619b..03010704f 100644 --- a/worker/src/workflow.rs +++ b/worker/src/workflow.rs @@ -13,6 +13,7 @@ use worker_sys::types::WorkflowInstanceSys; use worker_sys::types::WorkflowStep as WorkflowStepSys; use crate::env::EnvBinding; +use crate::send::SendFuture; use crate::Result; #[doc(hidden)] @@ -55,7 +56,7 @@ impl Workflow { /// Get a handle to an existing workflow instance by ID. pub async fn get(&self, id: &str) -> Result { let promise = self.inner.get(id)?; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; Ok(WorkflowInstance::from_js(result)) } @@ -69,7 +70,7 @@ impl Workflow { None => JsValue::UNDEFINED, }; let promise = self.inner.create(js_options)?; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; Ok(WorkflowInstance::from_js(result)) } @@ -83,7 +84,7 @@ impl Workflow { js_array.push(&serde_wasm_bindgen::to_value(&opts)?); } let promise = self.inner.create_batch(&js_array)?; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; let result_array: js_sys::Array = result.unchecked_into(); let mut instances = Vec::with_capacity(result_array.length() as usize); @@ -201,31 +202,36 @@ impl WorkflowInstance { /// Pause the workflow instance. pub async fn pause(&self) -> Result<()> { - JsFuture::from(self.inner.pause()?).await?; + let promise = self.inner.pause()?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } /// Resume a paused workflow instance. pub async fn resume(&self) -> Result<()> { - JsFuture::from(self.inner.resume()?).await?; + let promise = self.inner.resume()?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } /// Terminate the workflow instance. pub async fn terminate(&self) -> Result<()> { - JsFuture::from(self.inner.terminate()?).await?; + let promise = self.inner.terminate()?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } /// Restart the workflow instance. pub async fn restart(&self) -> Result<()> { - JsFuture::from(self.inner.restart()?).await?; + let promise = self.inner.restart()?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } /// Get the current status of the workflow instance. pub async fn status(&self) -> Result { - let result = JsFuture::from(self.inner.status()?).await?; + let promise = self.inner.status()?; + let result = SendFuture::new(JsFuture::from(promise)).await?; Ok(serde_wasm_bindgen::from_value(result)?) } @@ -238,7 +244,8 @@ impl WorkflowInstance { &"payload".into(), &serde_wasm_bindgen::to_value(&payload)?, )?; - JsFuture::from(self.inner.send_event(event.into())?).await?; + let promise = self.inner.send_event(event.into())?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } } @@ -311,7 +318,7 @@ impl WorkflowStep { let closure = Self::wrap_callback(callback); let js_fn = closure.as_ref().unchecked_ref::(); let promise = self.0.do_(name, js_fn)?; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; Ok(serde_wasm_bindgen::from_value(result)?) } @@ -331,14 +338,15 @@ impl WorkflowStep { let closure = Self::wrap_callback(callback); let js_fn = closure.as_ref().unchecked_ref::(); let promise = self.0.do_with_config(name, config_js, js_fn)?; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; Ok(serde_wasm_bindgen::from_value(result)?) } /// Sleep for a specified duration (e.g., "1 minute", "5 seconds"). pub async fn sleep(&self, name: &str, duration: impl Into) -> Result<()> { let duration_js = duration.into().to_js_value(); - JsFuture::from(self.0.sleep(name, duration_js)?).await?; + let promise = self.0.sleep(name, duration_js)?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } @@ -346,7 +354,8 @@ impl WorkflowStep { pub async fn sleep_until(&self, name: &str, timestamp: impl Into) -> Result<()> { let date: crate::Date = timestamp.into(); let ts_ms = date.as_millis() as f64; - JsFuture::from(self.0.sleep_until(name, ts_ms.into())?).await?; + let promise = self.0.sleep_until(name, ts_ms.into())?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } @@ -357,7 +366,8 @@ impl WorkflowStep { options: WaitForEventOptions, ) -> Result> { let options_js = serde_wasm_bindgen::to_value(&options)?; - let result = JsFuture::from(self.0.wait_for_event(name, options_js)?).await?; + let promise = self.0.wait_for_event(name, options_js)?; + let result = SendFuture::new(JsFuture::from(promise)).await?; WorkflowStepEvent::from_js(result) } }