Dart-native background job platform
Queues, retries, scheduling, workflows, and observability — all in pure Dart.
- Pure Dart — No external worker processes, no FFI bindings. Runs anywhere Dart runs.
- Pluggable backends — Swap between SQLite, Redis, or Postgres with a single line.
- Battle-tested patterns — Retries with backoff, rate limiting, dead-letter queues, and priority scheduling.
- Workflows — Durable, checkpointed execution for complex multi-step processes.
- Canvas API — Compose tasks into groups, chains, and chords.
- Observability — Built-in OpenTelemetry integration for traces and metrics.
import 'dart:async';
import 'package:stem/stem.dart';
class EmailTask extends TaskHandler<String> {
@override
String get name => 'email.send';
@override
Future<String> call(TaskContext ctx, Map<String, Object?> args) async {
final to = args['to'] as String;
return 'sent to $to';
}
}
Future<void> main() async {
final client = await StemClient.inMemory(tasks: [EmailTask()]);
final worker = await client.createWorker();
unawaited(worker.start());
final taskId = await client.stem.enqueue(
'email.send',
args: {'to': 'hello@example.com'},
);
final result = await client.stem.waitForTask<String>(taskId);
print('Result: ${result?.value}');
await worker.shutdown();
await client.close();
} PRODUCERS
┌───────────────────┬───────────────────┬──────────────────┐
│ │ │ │
v v v v
┌─────────┐ ┌──────────┐ ┌───────────┐ ┌──────────┐
│ Stem │ │ Canvas │ │ Workflow │ │ Client │
│ Client │ │ (chains, │ │ API │ │ SDKs │
└────┬────┘ │ groups) │ └─────┬─────┘ └────┬─────┘
│ └────┬─────┘ │ │
└──────────────────┼────────────────────┼────────────────────┘
│
v
┌──────────────────────────────────────────────────────────┐
│ BROKER │
│ queues / leases / acks / nack / delayed / dlq │
└───────────────┬───────────────────────────┬──────────────┘
│ │
v v
┌──────────────────┐ ┌─────────────────────┐
│ Workflow Engine │ │ Workers │
│ claim runs & │ │ (many, independent)│
│ schedule steps │ └───────┬───────┬─────┘
└───────┬──────────┘ │ │
│ │ │
enqueue steps ──────────────────────────┘ │
│ │
v v
┌──────────────────┐ ┌──────────────────┐
│ Workflow Store │ │ Task Registry │
│ (runs/steps) │ │ & Handlers │
└────────┬─────────┘ └────────┬─────────┘
│ │
v v
┌──────────────────┐ ┌──────────────────┐
│ Event Bus │ │ Result Backend │
└──────────────────┘ └──────────────────┘
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
SCHEDULING
┌────────────────┐ ┌────────────────┐
│ Beat Scheduler │---->│ Schedule Store │
│ (cron) │ └────────────────┘
└───────┬────────┘ │
│ │
v v
┌────────┐ ┌────────────┐
│ Broker │<---------│ Lock Store │
└────────┘ └────────────┘
(enqueues scheduled tasks / lease guards)
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
ADAPTERS
┌────────────┐ ┌────────────┐ ┌────────────┐
│ SQLite │ │ Redis │ │ Postgres │
│ (local) │ │ (streams) │ │ (durable) │
└────────────┘ └────────────┘ └────────────┘
| Package | Description | pub.dev |
|---|---|---|
stem |
Core runtime: contracts, worker, scheduler, in-memory adapters, signals, Canvas, workflows | |
stem_cli |
Command-line tooling (stem executable) and CLI utilities |
|
stem_redis |
Redis Streams broker, result backend, and watchdog helpers | |
stem_postgres |
Postgres broker, result backend, and scheduler stores | |
stem_sqlite |
SQLite broker and result backend for local dev/testing | |
stem_builder |
Build-time code generator for annotated tasks and workflows | |
stem_adapter_tests |
Shared contract test suites for adapter implementations | |
stem_memory |
In-memory adapter package (broker/backend/workflow/scheduler factories) | |
stem_dashboard |
Hotwire-based operations dashboard (experimental) | — |
TaskOptions(
queue: 'high-priority', // Target queue
maxRetries: 5, // Retry on failure
priority: 10, // Higher = processed first
rateLimit: '100/m', // Rate limiting
softTimeLimit: Duration(seconds: 30),
hardTimeLimit: Duration(minutes: 2),
visibilityTimeout: Duration(minutes: 5),
)// Chain: sequential execution, results flow forward
await canvas.chain([
task('resize', args: {'file': 'img.png'}),
task('upload', args: {'bucket': 's3://photos'}),
task('notify', args: {'channel': 'slack'}),
]);
// Group: parallel execution
await canvas.group([
task('resize', args: {'size': 'small'}),
task('resize', args: {'size': 'medium'}),
task('resize', args: {'size': 'large'}),
]);
// Chord: parallel tasks + callback when all complete
await canvas.chord(
[task('fetch.a'), task('fetch.b'), task('fetch.c')],
callback: task('aggregate'),
);final workflow = WorkflowScript('order.process', (wf) async {
final validated = await wf.activity('validate', args: order);
final charged = await wf.activity('charge', args: validated);
await wf.sleep(Duration(hours: 24)); // Durable sleep!
await wf.activity('ship', args: charged);
});final beat = BeatScheduler(
broker: broker,
scheduleStore: store,
entries: [
ScheduleEntry(
name: 'daily-report',
cron: '0 9 * * *', // Every day at 9 AM
task: task('reports.generate'),
),
],
);# Run a worker
stem worker --queue default --concurrency 8
# Run the beat scheduler
stem schedule list
# Inspect dead-letter queue
stem dlq list
stem dlq retry <task-id>
# List registered tasks
stem tasks ls
# Health check
stem health- Dart 3.9.2+
- Docker (for adapter integration tests)
# Clone the repository
git clone https://github.com/kingwill101/stem.git
cd stem
# Install dependencies
dart pub get
# Run quality gates
dart format --output=none --set-exit-if-changed .
dart analyze
task test:no-envIntegration tests require the Docker test stack:
# Run all package tests with Docker-backed integration env
task test
# Run coverage workflow for core adapters/runtime packages
task coverage
# Run targeted adapter suites (auto-bootstraps integration env)
task test:contract
task test:redis
task test:postgresTargeted adapter tasks now bootstrap integration environment automatically. If bootstrap still fails (for example Docker unavailable), run:
source ./packages/stem_cli/_init_test_envCapability flags and skip behavior for adapter contract suites are documented in
packages/stem_adapter_tests/README.md.
Contributions are welcome! Please read the contribution guidelines before submitting a PR.
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
MIT License — see LICENSE for details.
