Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ web-sys = { version = "0.3.85", features = [
"WritableStream",
"WritableStreamDefaultWriter",
] }
worker = { version = "0.7.4", path = "worker", features = ["queue", "d1", "axum", "timezone"] }
worker = { version = "0.7.4", path = "worker", features = ["queue", "d1", "axum", "timezone", "workflow"] }
worker-codegen = { path = "worker-codegen", version = "0.2.0" }
worker-macros = { version = "0.7.4", path = "worker-macros", features = ["queue"] }
worker-sys = { version = "0.7.4", path = "worker-sys", features = ["d1", "queue"] }
Expand Down
15 changes: 15 additions & 0 deletions examples/workflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "workflow-example"
version = "0.1.0"
edition = "2021"

[package.metadata.release]
release = false

[lib]
crate-type = ["cdylib"]

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
worker = { path = "../../worker", features = ["workflow"] }
154 changes: 154 additions & 0 deletions examples/workflow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use serde::{Deserialize, Serialize};
use worker::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyParams {
pub email: String,
pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyOutput {
pub message: String,
pub steps_completed: u32,
}

#[workflow]
pub struct MyWorkflow {
#[allow(dead_code)]
env: Env,
}

impl WorkflowEntrypoint for MyWorkflow {
fn new(_ctx: Context, env: Env) -> Self {
Self { env }
}

async fn run(
&self,
event: WorkflowEvent<serde_json::Value>,
step: WorkflowStep,
) -> Result<serde_json::Value> {
console_log!("Workflow started with instance ID: {}", event.instance_id);

let params: MyParams =
serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?;

let name_for_step1 = params.name.clone();
let step1_result = step
.do_("initial-processing", move || async move {
console_log!("Processing for user: {}", name_for_step1);
Ok(serde_json::json!({
"processed": true,
"user": name_for_step1
}))
})
.await?;

console_log!("Step 1 completed: {:?}", step1_result);

console_log!("Step 2: Sleeping for 10 seconds...");
step.sleep("wait-for-processing", "10 seconds").await?;

let email_for_step3 = params.email.clone();
let notification_result = step
.do_with_config(
"send-notification",
StepConfig {
retries: Some(RetryConfig {
limit: 3,
delay: "5 seconds".to_string(),
backoff: Some(Backoff::Exponential),
}),
timeout: Some("1 minute".to_string()),
},
move || async move {
console_log!("Sending notification to: {}", email_for_step3);
Ok(serde_json::json!({
"notification_sent": true,
"email": email_for_step3
}))
},
)
.await?;

console_log!("Step 3 completed: {:?}", notification_result);

let output = MyOutput {
message: format!("Workflow completed for {}", params.name),
steps_completed: 3,
};

Ok(serde_json::to_value(output).unwrap())
}
}

#[event(fetch)]
async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
let url = req.url()?;
let path = url.path();
let workflow = env.workflow("MY_WORKFLOW")?;

match (req.method(), path) {
(Method::Post, "/workflow") => {
let params = MyParams {
email: "user@example.com".to_string(),
name: "Test User".to_string(),
};

let instance = workflow
.create(Some(CreateOptions {
id: None,
params: Some(params),
retention: None,
}))
.await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow created"
}))
}

(Method::Get, path) if path.starts_with("/workflow/") => {
let id = path.trim_start_matches("/workflow/");
let instance = workflow.get(id).await?;
let status = instance.status().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"status": format!("{:?}", status.status),
"error": status.error,
"output": status.output
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/pause") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/pause");
let instance = workflow.get(id).await?;
instance.pause().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow paused"
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/resume") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/resume");
let instance = workflow.get(id).await?;
instance.resume().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow resumed"
}))
}

_ => Response::error("Not Found", 404),
}
}
13 changes: 13 additions & 0 deletions examples/workflow/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name = "workflow-example"
main = "build/worker/shim.mjs"
compatibility_date = "2024-10-22"

[build]
# For development: use local worker-build binary
# For production: command = "cargo install -q worker-build && worker-build --release"
command = "RUSTFLAGS='--cfg=web_sys_unstable_apis' ../../target/release/worker-build --release"

[[workflows]]
name = "my-workflow"
binding = "MY_WORKFLOW"
class_name = "MyWorkflow"
1 change: 1 addition & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod socket;
mod sql_counter;
mod sql_iterator;
mod user;
mod workflow;
mod ws;

#[derive(Deserialize, Serialize)]
Expand Down
4 changes: 3 additions & 1 deletion test/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
alarm, analytics_engine, assets, auto_response, cache, container, counter, d1, durable, fetch,
form, js_snippets, kv, put_raw, queue, r2, rate_limit, request, secret_store, service, socket,
sql_counter, sql_iterator, user, ws, SomeSharedData, GLOBAL_STATE,
sql_counter, sql_iterator, user, workflow, ws, SomeSharedData, GLOBAL_STATE,
};
#[cfg(feature = "http")]
use std::convert::TryInto;
Expand Down Expand Up @@ -234,6 +234,8 @@ macro_rules! add_routes (
add_route!($obj, get, format_route!("/rate-limit/key/{}", "key"), rate_limit::handle_rate_limit_with_key);
add_route!($obj, get, "/rate-limit/bulk-test", rate_limit::handle_rate_limit_bulk_test);
add_route!($obj, get, "/rate-limit/reset", rate_limit::handle_rate_limit_reset);
add_route!($obj, post, "/workflow/create", workflow::handle_workflow_create);
add_route!($obj, get, format_route!("/workflow/status/{}", "id"), workflow::handle_workflow_status);
});

#[cfg(feature = "http")]
Expand Down
74 changes: 74 additions & 0 deletions test/src/workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use serde::{Deserialize, Serialize};
use worker::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestParams {
pub value: String,
}

#[workflow]
pub struct TestWorkflow {
#[allow(dead_code)]
env: Env,
}

impl WorkflowEntrypoint for TestWorkflow {
fn new(_ctx: Context, env: Env) -> Self {
Self { env }
}

async fn run(
&self,
event: WorkflowEvent<serde_json::Value>,
step: WorkflowStep,
) -> Result<serde_json::Value> {
let params: TestParams =
serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?;

let result = step
.do_("process", move || async move {
Ok(serde_json::json!({ "processed": params.value }))
})
.await?;

Ok(result)
}
}

pub async fn handle_workflow_create(
_req: Request,
env: Env,
_data: crate::SomeSharedData,
) -> Result<Response> {
let workflow = env.workflow("TEST_WORKFLOW")?;
let params = TestParams {
value: "hello".to_string(),
};
let instance = workflow
.create(Some(CreateOptions {
params: Some(params),
..Default::default()
}))
.await?;

Response::from_json(&serde_json::json!({ "id": instance.id()? }))
}

pub async fn handle_workflow_status(
req: Request,
env: Env,
_data: crate::SomeSharedData,
) -> Result<Response> {
let url = req.url()?;
let path = url.path();
let id = path.trim_start_matches("/workflow/status/");
let workflow = env.workflow("TEST_WORKFLOW")?;
let instance = workflow.get(id).await?;
let status = instance.status().await?;

Response::from_json(&serde_json::json!({
"status": format!("{:?}", status.status),
"output": status.output,
"error": status.error,
}))
}
21 changes: 21 additions & 0 deletions test/tests/mf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const mf_instance = new Miniflare({
kvPersist: false,
r2Persist: false,
cachePersist: false,
workflowsPersist: false,
workers: [
{
scriptPath: "./build/index.js",
Expand Down Expand Up @@ -113,6 +114,15 @@ const mf_instance = new Miniflare({
scriptName: "mini-analytics-engine" // mock out analytics engine binding to the "mini-analytics-engine" worker
}
},
// Workflow binding requires a separate worker via scriptName in
// the Miniflare JS API (wrangler dev handles this automatically).
workflows: {
TEST_WORKFLOW: {
name: "test-workflow",
className: "TestWorkflow",
scriptName: "workflow-worker",
},
},
ratelimits: {
TEST_RATE_LIMITER: {
simple: {
Expand All @@ -122,6 +132,17 @@ const mf_instance = new Miniflare({
}
}
},
{
// Dedicated worker for TestWorkflow; uses the generated JS class wrapper.
name: "workflow-worker",
scriptPath: "./build/worker/shim.mjs",
modules: true,
modulesRules: [
{ type: "ESModule", include: ["**/*.js"], fallthrough: true },
{ type: "CompiledWasm", include: ["**/*.wasm"], fallthrough: true },
],
compatibilityDate: "2025-07-24",
},
{
name: "mini-analytics-engine",
modules: true,
Expand Down
37 changes: 37 additions & 0 deletions test/tests/workflow.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { describe, test, expect } from "vitest";
import { mf, mfUrl } from "./mf";

describe("workflow", () => {
test("create and poll status until completion", async () => {
const createResp = await mf.dispatchFetch(`${mfUrl}workflow/create`, {
method: "POST",
});
expect(createResp.status).toBe(200);
const { id } = (await createResp.json()) as { id: string };
expect(id).toBeDefined();
expect(typeof id).toBe("string");

let status: string | undefined;
let output: unknown;
for (let i = 0; i < 30; i++) {
await new Promise((resolve) => setTimeout(resolve, 500));
const statusResp = await mf.dispatchFetch(
`${mfUrl}workflow/status/${id}`
);
expect(statusResp.status).toBe(200);
const body = (await statusResp.json()) as {
status: string;
output: unknown;
error: unknown;
};
status = body.status;
output = body.output;
if (status === "Complete" || status === "Errored") {
break;
}
}

expect(status).toBe("Complete");
expect(output).toEqual({ processed: "hello" });
});
});
Loading
Loading