From 78b2fd89cd7e21e8fd8efa485a0b94f76f309c2b Mon Sep 17 00:00:00 2001 From: kingwill101 Date: Thu, 26 Feb 2026 11:39:37 -0500 Subject: [PATCH] feat(dashboard): expand observability UI and microservice demo --- packages/dashboard/CHANGELOG.md | 11 + packages/dashboard/README.md | 32 + packages/dashboard/bin/dashboard.dart | 2 + packages/dashboard/lib/dashboard.dart | 10 +- packages/dashboard/lib/src/config/config.dart | 81 + packages/dashboard/lib/src/server.dart | 1085 +++++++++- .../dashboard/lib/src/services/models.dart | 740 ++++++- .../lib/src/services/stem_service.dart | 660 +++++- .../lib/src/state/dashboard_state.dart | 327 ++- packages/dashboard/lib/src/ui/content.dart | 643 +----- .../dashboard/lib/src/ui/event_templates.dart | 14 +- packages/dashboard/lib/src/ui/layout.dart | 1920 ++++++++++++++--- packages/dashboard/pubspec.yaml | 14 +- .../test/dashboard_browser_test.dart | 95 +- .../test/dashboard_state_property_test.dart | 40 +- packages/dashboard/test/server_test.dart | 588 ++++- .../sqlite_dashboard_service_test.dart | 64 + packages/stem/CHANGELOG.md | 12 + packages/stem/example/microservice/README.md | 31 +- .../stem/example/microservice/beat/Dockerfile | 3 +- .../example/microservice/beat/bin/beat.dart | 3 + .../example/microservice/beat/pubspec.yaml | 2 + .../example/microservice/dashboard/Dockerfile | 3 +- .../example/microservice/docker-compose.yml | 46 +- .../example/microservice/enqueuer/Dockerfile | 3 +- .../microservice/enqueuer/bin/main.dart | 316 ++- .../microservice/enqueuer/pubspec.yaml | 4 +- .../microservice/grafana-datasources.yml | 8 +- 28 files changed, 5740 insertions(+), 1017 deletions(-) diff --git a/packages/dashboard/CHANGELOG.md b/packages/dashboard/CHANGELOG.md index 9b244c58..b9e5c2bf 100644 --- a/packages/dashboard/CHANGELOG.md +++ b/packages/dashboard/CHANGELOG.md @@ -2,4 +2,15 @@ ## 0.1.0 +- Reworked the dashboard into a richer operations console with dedicated views + for tasks, jobs, workflows, workers, failures, 
audit, events, namespaces, and + search. +- Refactored UI rendering into modular page components and shared table/layout + primitives for better maintainability. +- Introduced a full Tailwind-based styling system and updated responsive layout + behavior for sidebar/header/content rendering. +- Improved navigation and Turbo frame behavior to reduce stale-content flashes + during page switches. +- Expanded dashboard state/service/server models and test coverage to support + the new views and metadata-rich rendering paths. - Initial release of the `stem_dashboard` package. diff --git a/packages/dashboard/README.md b/packages/dashboard/README.md index 79cea7c4..59ba1183 100644 --- a/packages/dashboard/README.md +++ b/packages/dashboard/README.md @@ -36,6 +36,7 @@ Environment variables mirror the Stem CLI: - `STEM_RESULT_BACKEND_URL` (defaults to the broker URL when omitted) - `STEM_NAMESPACE` / `STEM_DASHBOARD_NAMESPACE` (defaults to `stem`) - `STEM_TLS_*` for TLS-enabled Redis endpoints +- `DASHBOARD_BASE_PATH` (optional mount prefix such as `/dashboard`) Because the dashboard reuses `StemConfig`, any broker/result backend supported by Stem (`redis://`, `rediss://`, `postgres://`, `postgresql://`, `memory://`) @@ -45,6 +46,37 @@ The events page keeps a websocket open to `/dash/streams` so new queue/worker deltas appear instantly without refreshing. Tasks and workers pages use Turbo Frames for navigation and sorting. 
+## Library Embedding + +`stem_dashboard` can run standalone (via `runDashboardServer`) or be mounted +into an existing `routed` engine: + +```dart +import 'package:routed/routed.dart'; +import 'package:stem_dashboard/dashboard.dart'; + +Future main() async { + final service = await StemDashboardService.connect(); + final state = DashboardState(service: service); + await state.start(); + + final engine = Engine(); + mountDashboard( + engine: engine, + service: service, + state: state, + options: const DashboardMountOptions(basePath: '/dashboard'), + ); + + await engine.serve(host: '127.0.0.1', port: 8080); +} +``` + +For embedded usage, the host app owns lifecycle: + +- call `state.start()` before serving. +- call `state.dispose()` and `service.close()` on shutdown. + ### Local dependency overrides `pubspec.yaml` contains overrides pointing at the local Stem packages so the diff --git a/packages/dashboard/bin/dashboard.dart b/packages/dashboard/bin/dashboard.dart index 547cccff..b75ce116 100644 --- a/packages/dashboard/bin/dashboard.dart +++ b/packages/dashboard/bin/dashboard.dart @@ -6,6 +6,7 @@ Future main(List args) async { final host = Platform.environment['DASHBOARD_HOST']?.trim(); final portRaw = Platform.environment['DASHBOARD_PORT']?.trim(); final echoRaw = Platform.environment['DASHBOARD_ECHO_ROUTES']?.trim(); + final basePath = Platform.environment['DASHBOARD_BASE_PATH']?.trim(); final resolvedHost = host != null && host.isNotEmpty ? host : '127.0.0.1'; final resolvedPort = int.tryParse(portRaw ?? '') ?? 3080; @@ -17,6 +18,7 @@ Future main(List args) async { host: resolvedHost, port: resolvedPort, echoRoutes: echoRoutes, + basePath: basePath ?? 
'', ), ); } diff --git a/packages/dashboard/lib/dashboard.dart b/packages/dashboard/lib/dashboard.dart index c0a9329b..70ac12e2 100644 --- a/packages/dashboard/lib/dashboard.dart +++ b/packages/dashboard/lib/dashboard.dart @@ -1,3 +1,11 @@ -export 'src/server.dart' show DashboardServerOptions, runDashboardServer; +export 'src/server.dart' + show + DashboardMountOptions, + DashboardServerOptions, + buildDashboardEngine, + mountDashboard, + registerDashboardRoutes, + runDashboardServer; export 'src/services/stem_service.dart' show DashboardDataSource, StemDashboardService; +export 'src/state/dashboard_state.dart' show DashboardState; diff --git a/packages/dashboard/lib/src/config/config.dart b/packages/dashboard/lib/src/config/config.dart index d4cc7d5a..94968562 100644 --- a/packages/dashboard/lib/src/config/config.dart +++ b/packages/dashboard/lib/src/config/config.dart @@ -11,6 +11,11 @@ class DashboardConfig { required this.stem, required this.namespace, required this.routing, + required this.alertWebhookUrls, + required this.alertBacklogThreshold, + required this.alertFailedTaskThreshold, + required this.alertOfflineWorkerThreshold, + required this.alertCooldown, }); /// Loads a dashboard config from the provided environment map. @@ -29,12 +34,37 @@ class DashboardConfig { final routing = RoutingConfigLoader( StemRoutingContext.fromConfig(stemConfig), ).load(); + final webhookUrls = _parseCsv( + env['STEM_DASHBOARD_ALERT_WEBHOOK_URLS'] ?? 
+ env['STEM_DASHBOARD_WEBHOOK_URLS'], + ); + final backlogThreshold = _parsePositiveInt( + env['STEM_DASHBOARD_ALERT_BACKLOG_THRESHOLD'], + fallback: 500, + ); + final failedThreshold = _parsePositiveInt( + env['STEM_DASHBOARD_ALERT_FAILED_TASK_THRESHOLD'], + fallback: 25, + ); + final offlineThreshold = _parsePositiveInt( + env['STEM_DASHBOARD_ALERT_OFFLINE_WORKER_THRESHOLD'], + fallback: 1, + ); + final cooldown = _parseDuration( + env['STEM_DASHBOARD_ALERT_COOLDOWN'], + fallback: const Duration(minutes: 5), + ); return DashboardConfig._( environment: Map.unmodifiable(env), stem: stemConfig, namespace: namespace, routing: routing, + alertWebhookUrls: webhookUrls, + alertBacklogThreshold: backlogThreshold, + alertFailedTaskThreshold: failedThreshold, + alertOfflineWorkerThreshold: offlineThreshold, + alertCooldown: cooldown, ); } @@ -54,6 +84,21 @@ class DashboardConfig { /// Routing registry resolved for this dashboard session. final RoutingRegistry routing; + /// Alert webhook URLs. + final List alertWebhookUrls; + + /// Backlog alert threshold. + final int alertBacklogThreshold; + + /// Failed task alert threshold. + final int alertFailedTaskThreshold; + + /// Offline worker alert threshold. + final int alertOfflineWorkerThreshold; + + /// Alert cooldown. + final Duration alertCooldown; + /// Broker URL resolved from the underlying Stem config. String get brokerUrl => stem.brokerUrl; @@ -63,3 +108,39 @@ class DashboardConfig { /// TLS configuration resolved from the underlying Stem config. TlsConfig get tls => stem.tls; } + +List _parseCsv(String? raw) { + if (raw == null || raw.trim().isEmpty) return const []; + return raw + .split(',') + .map((value) => value.trim()) + .where((value) => value.isNotEmpty) + .toList(growable: false); +} + +int _parsePositiveInt(String? 
raw, {required int fallback}) { + if (raw == null || raw.trim().isEmpty) return fallback; + final parsed = int.tryParse(raw.trim()); + if (parsed == null || parsed <= 0) return fallback; + return parsed; +} + +Duration _parseDuration(String? raw, {required Duration fallback}) { + if (raw == null || raw.trim().isEmpty) return fallback; + final value = raw.trim(); + final match = RegExp(r'^(\d+)(ms|s|m|h)$').firstMatch(value); + if (match == null) return fallback; + final amount = int.tryParse(match.group(1) ?? ''); + if (amount == null || amount <= 0) return fallback; + switch (match.group(2)) { + case 'ms': + return Duration(milliseconds: amount); + case 's': + return Duration(seconds: amount); + case 'm': + return Duration(minutes: amount); + case 'h': + return Duration(hours: amount); + } + return fallback; +} diff --git a/packages/dashboard/lib/src/server.dart b/packages/dashboard/lib/src/server.dart index d1767abe..6d66b1f6 100644 --- a/packages/dashboard/lib/src/server.dart +++ b/packages/dashboard/lib/src/server.dart @@ -1,9 +1,11 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:routed/routed.dart'; import 'package:routed_hotwire/routed_hotwire.dart'; -import 'package:stem/stem.dart' show generateEnvelopeId; +import 'package:stem/stem.dart' + show TaskState, generateEnvelopeId, stemLogContext, stemLogger; import 'package:stem_dashboard/src/config/config.dart'; import 'package:stem_dashboard/src/services/models.dart'; import 'package:stem_dashboard/src/services/stem_service.dart'; @@ -11,6 +13,19 @@ import 'package:stem_dashboard/src/state/dashboard_state.dart'; import 'package:stem_dashboard/src/stem/control_messages.dart'; import 'package:stem_dashboard/src/ui/content.dart'; import 'package:stem_dashboard/src/ui/layout.dart'; +import 'package:stem_dashboard/src/ui/overview.dart'; +import 'package:stem_dashboard/src/ui/paths.dart'; + +/// Mount options for embedding the dashboard in a host app. 
+class DashboardMountOptions { + /// Creates mount options. + const DashboardMountOptions({this.basePath = ''}); + + /// Prefix path used when mounting routes into a host app. + /// + /// Examples: `''` (root), `'/dashboard'`. + final String basePath; +} /// Options controlling how the dashboard server binds to the network. class DashboardServerOptions { @@ -19,6 +34,7 @@ class DashboardServerOptions { this.host = '127.0.0.1', this.port = 3080, this.echoRoutes = false, + this.basePath = '', }); /// Hostname or IP address for the HTTP server. @@ -30,12 +46,21 @@ class DashboardServerOptions { /// Whether to log each registered route on startup. final bool echoRoutes; + /// Prefix path used when serving the dashboard from a sub-route. + final String basePath; + /// Returns a copy with the provided fields replaced. - DashboardServerOptions copyWith({String? host, int? port, bool? echoRoutes}) { + DashboardServerOptions copyWith({ + String? host, + int? port, + bool? echoRoutes, + String? basePath, + }) { return DashboardServerOptions( host: host ?? this.host, port: port ?? this.port, echoRoutes: echoRoutes ?? this.echoRoutes, + basePath: basePath ?? this.basePath, ); } } @@ -53,7 +78,19 @@ Future runDashboardServer({ final dashboardService = service ?? await StemDashboardService.connect(resolvedConfig!); final stateOwner = state == null; - final dashboardState = state ?? DashboardState(service: dashboardService); + final dashboardState = + state ?? + DashboardState( + service: dashboardService, + alertWebhookUrls: resolvedConfig?.alertWebhookUrls ?? const [], + alertBacklogThreshold: resolvedConfig?.alertBacklogThreshold ?? 500, + alertFailedTaskThreshold: + resolvedConfig?.alertFailedTaskThreshold ?? 25, + alertOfflineWorkerThreshold: + resolvedConfig?.alertOfflineWorkerThreshold ?? 1, + alertCooldown: + resolvedConfig?.alertCooldown ?? 
const Duration(minutes: 5), + ); if (stateOwner) { await dashboardState.start(); @@ -61,10 +98,22 @@ Future runDashboardServer({ final engine = buildDashboardEngine( service: dashboardService, state: dashboardState, + basePath: options.basePath, ); + final resolvedBasePath = normalizeDashboardBasePath(options.basePath); + final dashboardUrlPath = dashboardRoute(resolvedBasePath, '/'); - stdout.writeln( - '[stem-dashboard] Starting on http://${options.host}:${options.port}', + stemLogger.info( + 'Starting dashboard server', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'host': options.host, + 'port': options.port, + 'basePath': dashboardUrlPath, + }, + ), ); try { @@ -73,7 +122,9 @@ Future runDashboardServer({ port: options.port, echo: options.echoRoutes, ); + await _waitForShutdownSignal(); } finally { + await engine.close(); if (stateOwner) { await dashboardState.dispose(); } @@ -83,15 +134,75 @@ Future runDashboardServer({ } } +Future _waitForShutdownSignal() async { + final completer = Completer(); + final subscriptions = >[]; + + void complete(ProcessSignal signal) { + stemLogger.info( + 'Shutdown signal received', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: {'signal': signal.toString()}, + ), + ); + if (!completer.isCompleted) { + completer.complete(); + } + } + + void watch(ProcessSignal signal) { + subscriptions.add(signal.watch().listen(complete)); + } + + watch(ProcessSignal.sigint); + if (!Platform.isWindows) { + watch(ProcessSignal.sigterm); + } + + try { + await completer.future; + } finally { + for (final subscription in subscriptions) { + await subscription.cancel(); + } + } +} + /// Constructs the dashboard engine with routes and Turbo streaming. 
Engine buildDashboardEngine({ required DashboardDataSource service, required DashboardState state, + String basePath = '', }) { final engine = Engine(); - _registerRoutes(engine, service, state); + mountDashboard( + engine: engine, + service: service, + state: state, + options: DashboardMountOptions(basePath: basePath), + ); + return engine; +} + +/// Mounts dashboard routes and websocket streams into an existing [engine]. +void mountDashboard({ + required Engine engine, + required DashboardDataSource service, + required DashboardState state, + DashboardMountOptions options = const DashboardMountOptions(), +}) { + final resolvedBasePath = normalizeDashboardBasePath(options.basePath); + registerDashboardRoutes( + engine, + service, + state, + basePath: resolvedBasePath, + ); + final streamPath = dashboardRoute(resolvedBasePath, '/dash/streams'); engine.ws( - '/dash/streams', + streamPath, TurboStreamSocketHandler( hub: state.hub, topicResolver: (context) => @@ -99,84 +210,414 @@ Engine buildDashboardEngine({ const ['stem-dashboard:events'], ), ); - return engine; } -void _registerRoutes( +/// Registers the dashboard HTTP routes on [engine]. 
+void registerDashboardRoutes( Engine engine, DashboardDataSource service, - DashboardState state, -) { + DashboardState state, { + String basePath = '', +}) { engine ..get( - '/', - (ctx) => _renderPage(ctx, DashboardPage.overview, service, state), + dashboardRoute(basePath, '/'), + (ctx) => _renderPage( + ctx, + DashboardPage.overview, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/tasks'), + (ctx) => _renderPage( + ctx, + DashboardPage.tasks, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/tasks/detail'), + (ctx) => _renderPage( + ctx, + DashboardPage.taskDetail, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/tasks/inline'), + (ctx) => _renderTaskInline(ctx, service, basePath: basePath), + ) + ..get( + dashboardRoute(basePath, '/failures'), + (ctx) => _renderPage( + ctx, + DashboardPage.failures, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/search'), + (ctx) => _renderPage( + ctx, + DashboardPage.search, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/audit'), + (ctx) => _renderPage( + ctx, + DashboardPage.audit, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/events'), + (ctx) => _renderPage( + ctx, + DashboardPage.events, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/namespaces'), + (ctx) => _renderPage( + ctx, + DashboardPage.namespaces, + service, + state, + basePath: basePath, + ), ) ..get( - '/tasks', - (ctx) => _renderPage(ctx, DashboardPage.tasks, service, state), + dashboardRoute(basePath, '/workflows'), + (ctx) => _renderPage( + ctx, + DashboardPage.workflows, + service, + state, + basePath: basePath, + ), ) ..get( - '/events', - (ctx) => _renderPage(ctx, DashboardPage.events, service, state), + dashboardRoute(basePath, '/jobs'), + (ctx) => _renderPage( + ctx, + 
DashboardPage.jobs, + service, + state, + basePath: basePath, + ), ) ..get( - '/workers', - (ctx) => _renderPage(ctx, DashboardPage.workers, service, state), + dashboardRoute(basePath, '/workers'), + (ctx) => _renderPage( + ctx, + DashboardPage.workers, + service, + state, + basePath: basePath, + ), + ) + ..get( + dashboardRoute(basePath, '/partials/overview'), + (ctx) => _renderOverviewPartials(ctx, service, state, basePath: basePath), + ) + ..post( + dashboardRoute(basePath, '/tasks/enqueue'), + (ctx) => _enqueueTask(ctx, service, state, basePath: basePath), ) - ..post('/tasks/enqueue', (ctx) => _enqueueTask(ctx, service)) - ..post('/workers/control', (ctx) => _controlWorkers(ctx, service)) - ..post('/queues/replay', (ctx) => _replayDeadLetters(ctx, service)); + ..post( + dashboardRoute(basePath, '/tasks/action'), + (ctx) => _taskAction(ctx, service, state, basePath: basePath), + ) + ..post( + dashboardRoute(basePath, '/workers/control'), + (ctx) => _controlWorkers(ctx, service, state, basePath: basePath), + ) + ..post( + dashboardRoute(basePath, '/queues/replay'), + (ctx) => _replayDeadLetters(ctx, service, state, basePath: basePath), + ); +} + +Future _renderOverviewPartials( + EngineContext ctx, + DashboardDataSource service, + DashboardState state, { + required String basePath, +}) async { + try { + final queues = await service.fetchQueueSummaries(); + final workers = await service.fetchWorkerStatuses(); + final taskStatuses = await service.fetchTaskStatuses(limit: 300); + final sections = buildOverviewSections( + queues, + workers, + state.throughput, + taskStatuses, + defaultNamespace: _resolveDefaultNamespace(workers, taskStatuses), + ); + + final updates = [ + turboStreamReplace( + target: 'overview-metrics', + html: prefixDashboardUrlAttributes(sections.metrics, basePath), + ), + turboStreamReplace( + target: 'overview-namespaces', + html: prefixDashboardUrlAttributes(sections.namespaces, basePath), + ), + turboStreamReplace( + target: 
'overview-queue-table', + html: prefixDashboardUrlAttributes(sections.topQueues, basePath), + ), + turboStreamReplace( + target: 'overview-workflows', + html: prefixDashboardUrlAttributes(sections.workflows, basePath), + ), + turboStreamReplace( + target: 'overview-jobs', + html: prefixDashboardUrlAttributes(sections.jobs, basePath), + ), + turboStreamReplace( + target: 'overview-latency-table', + html: prefixDashboardUrlAttributes(sections.latency, basePath), + ), + turboStreamReplace( + target: 'overview-recent-tasks', + html: prefixDashboardUrlAttributes(sections.recentTasks, basePath), + ), + ].join('\n'); + + return ctx.turboStream(updates); + } on Object catch (error, stack) { + stemLogger.error( + 'Failed to render overview partials', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); + return ctx.turboHtml( + '
Failed to refresh overview metrics.
', + statusCode: HttpStatus.internalServerError, + ); + } } Future _renderPage( EngineContext ctx, DashboardPage page, DashboardDataSource service, - DashboardState state, -) async { + DashboardState state, { + required String basePath, +}) async { final turbo = ctx.turbo; try { - final queues = page == DashboardPage.events - ? const [] - : await service.fetchQueueSummaries(); + final needsQueues = + page == DashboardPage.overview || + page == DashboardPage.tasks || + page == DashboardPage.workers || + page == DashboardPage.search || + page == DashboardPage.namespaces; + final queues = needsQueues + ? await service.fetchQueueSummaries() + : const []; final workers = - page == DashboardPage.overview || page == DashboardPage.workers + page == DashboardPage.overview || + page == DashboardPage.workers || + page == DashboardPage.search || + page == DashboardPage.namespaces ? await service.fetchWorkerStatuses() : const []; - final tasksOptions = page == DashboardPage.tasks + var tasksOptions = page == DashboardPage.tasks ? _parseTasksOptions(ctx.uri.queryParameters) : const TasksPageOptions(); + final failuresOptions = page == DashboardPage.failures + ? _parseFailuresOptions(ctx.uri.queryParameters) + : const FailuresPageOptions(); + + final searchOptions = page == DashboardPage.search + ? _parseSearchOptions(ctx.uri.queryParameters) + : const SearchPageOptions(); + final namespacesOptions = page == DashboardPage.namespaces + ? _parseNamespacesOptions(ctx.uri.queryParameters) + : const NamespacesPageOptions(); + final workflowsOptions = page == DashboardPage.workflows + ? _parseWorkflowsOptions(ctx.uri.queryParameters) + : const WorkflowsPageOptions(); + final jobsOptions = page == DashboardPage.jobs + ? _parseJobsOptions(ctx.uri.queryParameters) + : const JobsPageOptions(); final workersOptions = page == DashboardPage.workers ? 
_parseWorkersOptions(ctx.uri.queryParameters) : const WorkersPageOptions(); + List taskStatuses; + if (page == DashboardPage.tasks) { + final localFilteringNeeded = + tasksOptions.hasNamespaceFilter || + tasksOptions.hasTaskFilter || + tasksOptions.hasRunIdFilter; + if (!localFilteringNeeded) { + final pageRequest = await service.fetchTaskStatuses( + state: tasksOptions.stateFilter, + queue: tasksOptions.filter, + limit: tasksOptions.pageSize + 1, + offset: tasksOptions.offset, + ); + final hasNextPage = pageRequest.length > tasksOptions.pageSize; + taskStatuses = hasNextPage + ? pageRequest.take(tasksOptions.pageSize).toList(growable: false) + : pageRequest; + tasksOptions = tasksOptions.copyWith( + hasNextPage: hasNextPage, + hasPreviousPage: tasksOptions.page > 1, + ); + } else { + final source = tasksOptions.hasRunIdFilter + ? await service.fetchTaskStatusesForRun( + tasksOptions.runId!, + limit: 1000, + ) + : await service.fetchTaskStatuses( + state: tasksOptions.stateFilter, + queue: tasksOptions.filter, + limit: 1000, + ); + final filtered = _applyTaskViewFilters(source, tasksOptions); + final pageItems = filtered + .skip(tasksOptions.offset) + .take(tasksOptions.pageSize) + .toList(growable: false); + final hasNextPage = + filtered.length > tasksOptions.offset + pageItems.length; + taskStatuses = pageItems; + tasksOptions = tasksOptions.copyWith( + hasNextPage: hasNextPage, + hasPreviousPage: tasksOptions.page > 1, + ); + } + } else if (page == DashboardPage.failures) { + taskStatuses = await service.fetchTaskStatuses( + state: TaskState.failed, + queue: failuresOptions.queue, + limit: 300, + ); + } else if (page == DashboardPage.overview) { + taskStatuses = await service.fetchTaskStatuses(limit: 300); + } else if (page == DashboardPage.search) { + taskStatuses = await service.fetchTaskStatuses(limit: 500); + } else if (page == DashboardPage.namespaces) { + taskStatuses = await service.fetchTaskStatuses(limit: 600); + } else if (page == 
DashboardPage.workflows) { + taskStatuses = await service.fetchTaskStatuses(limit: 700); + } else if (page == DashboardPage.jobs) { + taskStatuses = await service.fetchTaskStatuses(limit: 700); + } else { + taskStatuses = const []; + } + + final taskDetail = page == DashboardPage.taskDetail + ? await service.fetchTaskStatus(ctx.uri.queryParameters['id'] ?? '') + : null; + final runId = ctx.uri.queryParameters['runId']?.trim().isNotEmpty ?? false + ? ctx.uri.queryParameters['runId']!.trim() + : taskDetail?.runId; + final runTimeline = page == DashboardPage.taskDetail && runId != null + ? await service.fetchTaskStatusesForRun(runId, limit: 250) + : const []; + final workflowRun = page == DashboardPage.taskDetail && runId != null + ? await service.fetchWorkflowRun(runId) + : null; + final workflowSteps = page == DashboardPage.taskDetail && runId != null + ? await service.fetchWorkflowSteps(runId) + : const []; + final content = buildPageContent( page: page, queues: queues, workers: workers, + taskStatuses: taskStatuses, + taskDetail: taskDetail, + runTimeline: runTimeline, + workflowRun: workflowRun, + workflowSteps: workflowSteps, + auditEntries: page == DashboardPage.search || page == DashboardPage.audit + ? state.auditEntries + : const [], throughput: page == DashboardPage.overview ? state.throughput : null, events: page == DashboardPage.events ? 
state.events : const [], + defaultNamespace: _resolveDefaultNamespace(workers, taskStatuses), tasksOptions: tasksOptions, workersOptions: workersOptions, + failuresOptions: failuresOptions, + searchOptions: searchOptions, + namespacesOptions: namespacesOptions, + workflowsOptions: workflowsOptions, + jobsOptions: jobsOptions, ); + final contentWithBasePath = prefixDashboardUrlAttributes(content, basePath); + final streamPath = dashboardRoute(basePath, '/dash/streams'); if (turbo.isFrameRequest) { - return ctx.turboFrame(renderFrame(page, content)); + return ctx.turboFrame(renderFrame(page, contentWithBasePath)); } - return ctx.turboHtml(renderLayout(page, content)); + return ctx.turboHtml( + renderLayout( + page, + contentWithBasePath, + basePath: basePath, + streamPath: streamPath, + ), + ); } on Object catch (error, stack) { - stderr - ..writeln( - '[stem-dashboard] Failed to render ${page.name} page: $error', - ) - ..writeln(stack); + stemLogger.error( + 'Failed to render dashboard page', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'page': page.name, + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); final errorContent = _renderErrorPanel(error); if (turbo.isFrameRequest) { - return ctx.turboFrame(renderFrame(page, errorContent)); + return ctx.turboFrame( + renderFrame(page, prefixDashboardUrlAttributes(errorContent, basePath)), + ); } - return ctx.turboHtml(renderLayout(page, errorContent)); + return ctx.turboHtml( + renderLayout( + page, + prefixDashboardUrlAttributes(errorContent, basePath), + basePath: basePath, + ), + ); } } @@ -193,15 +634,60 @@ String _renderErrorPanel(Object error) { '''; } +Future _renderTaskInline( + EngineContext ctx, + DashboardDataSource service, { + required String basePath, +}) async { + final taskId = (ctx.uri.queryParameters['id'] ?? '').trim(); + final target = _sanitizeDomTarget(ctx.uri.queryParameters['target'] ?? 
''); + if (target.isEmpty) { + return ctx.turboHtml( + '
Missing inline target.
', + statusCode: HttpStatus.badRequest, + ); + } + + DashboardTaskStatusEntry? task; + if (taskId.isNotEmpty) { + task = await service.fetchTaskStatus(taskId); + } + + final content = prefixDashboardUrlAttributes( + buildTaskInlineContent(task), + basePath, + ); + final payload = + '
$content
'; + return ctx.turboStream(turboStreamReplace(target: target, html: payload)); +} + +String _sanitizeDomTarget(String raw) { + final trimmed = raw.trim(); + if (trimmed.isEmpty) return ''; + final validPattern = RegExp(r'^[A-Za-z][A-Za-z0-9:_-]*$'); + return validPattern.hasMatch(trimmed) ? trimmed : ''; +} + Future _enqueueTask( EngineContext ctx, DashboardDataSource service, -) async { + DashboardState state, { + required String basePath, +}) async { + final tasksPath = dashboardRoute(basePath, '/tasks'); try { final queue = (await ctx.postForm('queue')).trim(); final task = (await ctx.postForm('task')).trim(); if (queue.isEmpty || task.isEmpty) { - return ctx.turboSeeOther('/tasks?error=missing-fields'); + state.recordAudit( + kind: 'action', + action: 'task.enqueue', + status: 'error', + actor: 'dashboard', + summary: 'Task enqueue rejected: queue/task missing.', + ); + return ctx.turboSeeOther('$tasksPath?error=missing-fields'); } final payloadText = (await ctx.postForm('payload')).trim(); @@ -212,10 +698,24 @@ Future _enqueueTask( if (decoded is Map) { args = decoded; } else { - return ctx.turboSeeOther('/tasks?error=invalid-payload'); + state.recordAudit( + kind: 'action', + action: 'task.enqueue', + status: 'error', + actor: 'dashboard', + summary: 'Task enqueue rejected: payload not a JSON object.', + ); + return ctx.turboSeeOther('$tasksPath?error=invalid-payload'); } } on Object { - return ctx.turboSeeOther('/tasks?error=invalid-payload'); + state.recordAudit( + kind: 'action', + action: 'task.enqueue', + status: 'error', + actor: 'dashboard', + summary: 'Task enqueue rejected: invalid JSON payload.', + ); + return ctx.turboSeeOther('$tasksPath?error=invalid-payload'); } } @@ -235,12 +735,176 @@ Future _enqueueTask( maxRetries: maxRetries, ), ); - return ctx.turboSeeOther('/tasks?flash=queued'); + state.recordAudit( + kind: 'action', + action: 'task.enqueue', + status: 'ok', + actor: 'dashboard', + summary: 'Queued task "$task" on "$queue".', + 
metadata: {'queue': queue, 'task': task}, + ); + return ctx.turboSeeOther('$tasksPath?flash=queued'); + } on Object catch (error, stack) { + stemLogger.error( + 'Dashboard enqueue failed', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); + state.recordAudit( + kind: 'action', + action: 'task.enqueue', + status: 'error', + actor: 'dashboard', + summary: 'Task enqueue failed: $error', + ); + return ctx.turboSeeOther('$tasksPath?error=enqueue-failed'); + } +} + +Future _taskAction( + EngineContext ctx, + DashboardDataSource service, + DashboardState state, { + required String basePath, +}) async { + final redirect = _resolveRedirectPath( + await ctx.defaultPostForm('redirect', dashboardRoute(basePath, '/tasks')), + fallbackPath: dashboardRoute(basePath, '/tasks'), + ); + try { + final action = (await ctx.postForm('action')).trim().toLowerCase(); + final taskId = (await ctx.postForm('taskId')).trim(); + final queueRaw = (await ctx.defaultPostForm('queue', '')).trim(); + final queue = queueRaw.isEmpty ? null : queueRaw; + + if (taskId.isEmpty) { + state.recordAudit( + kind: 'action', + action: 'task.action', + status: 'error', + actor: 'dashboard', + summary: 'Task action rejected: missing task id.', + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, {'error': 'Task ID is required.'}), + ); + } + + switch (action) { + case 'cancel': + final reasonRaw = (await ctx.defaultPostForm( + 'reason', + 'Cancelled from dashboard.', + )).trim(); + final terminate = _isTruthy( + (await ctx.defaultPostForm('terminate', 'false')).trim(), + ); + final revoked = await service.revokeTask( + taskId, + terminate: terminate, + reason: reasonRaw.isEmpty ? 
null : reasonRaw, + ); + if (!revoked) { + state.recordAudit( + kind: 'action', + action: 'task.cancel', + status: 'error', + actor: 'dashboard', + summary: 'Failed to revoke task $taskId.', + metadata: {'taskId': taskId, 'queue': ?queue}, + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'error': 'Unable to revoke task $taskId.', + }), + ); + } + state.recordAudit( + kind: 'action', + action: 'task.cancel', + status: 'ok', + actor: 'dashboard', + summary: 'Revocation requested for $taskId.', + metadata: {'taskId': taskId, 'queue': ?queue}, + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'flash': 'Revocation requested for task $taskId.', + }), + ); + case 'replay': + final replayed = await service.replayTaskById(taskId, queue: queue); + if (!replayed) { + state.recordAudit( + kind: 'action', + action: 'task.replay', + status: 'error', + actor: 'dashboard', + summary: 'Task $taskId was not found in dead letters.', + metadata: {'taskId': taskId, 'queue': ?queue}, + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'error': 'Task $taskId was not found in dead letters.', + }), + ); + } + state.recordAudit( + kind: 'action', + action: 'task.replay', + status: 'ok', + actor: 'dashboard', + summary: 'Replayed dead-letter task $taskId.', + metadata: {'taskId': taskId, 'queue': ?queue}, + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'flash': 'Replayed dead-letter task $taskId as a new envelope.', + }), + ); + default: + state.recordAudit( + kind: 'action', + action: 'task.action', + status: 'error', + actor: 'dashboard', + summary: 'Unsupported task action "$action".', + metadata: {'taskId': taskId}, + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'error': 'Unsupported task action "$action".', + }), + ); + } } on Object catch (error, stack) { - stderr - ..writeln('[stem-dashboard] enqueue failed: $error') - ..writeln(stack); - return 
ctx.turboSeeOther('/tasks?error=enqueue-failed'); + stemLogger.error( + 'Dashboard task action failed', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); + state.recordAudit( + kind: 'action', + action: 'task.action', + status: 'error', + actor: 'dashboard', + summary: 'Task action failed: $error', + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, {'error': 'Task action failed.'}), + ); } } @@ -256,10 +920,38 @@ TasksPageOptions _parseTasksOptions(Map params) { final descending = direction == 'desc'; final filterRaw = params['queue']?.trim(); final filter = filterRaw == null || filterRaw.isEmpty ? null : filterRaw; + final namespaceRaw = params['namespace']?.trim(); + final namespaceFilter = namespaceRaw == null || namespaceRaw.isEmpty + ? null + : namespaceRaw; + final taskRaw = params['task']?.trim(); + final taskFilter = taskRaw == null || taskRaw.isEmpty ? null : taskRaw; + final runRaw = params['runId']?.trim(); + final runId = runRaw == null || runRaw.isEmpty ? null : runRaw; + final stateRaw = params['state']?.trim().toLowerCase(); + final stateFilter = switch (stateRaw) { + 'queued' => TaskState.queued, + 'running' => TaskState.running, + 'succeeded' => TaskState.succeeded, + 'failed' => TaskState.failed, + 'retried' => TaskState.retried, + 'cancelled' => TaskState.cancelled, + _ => null, + }; + final pageRaw = int.tryParse((params['page'] ?? '1').trim()); + final page = pageRaw == null || pageRaw < 1 ? 1 : pageRaw; + final pageSizeRaw = int.tryParse((params['pageSize'] ?? '25').trim()); + final pageSize = (pageSizeRaw ?? 25).clamp(25, 200); return TasksPageOptions( sortKey: sortKey, descending: descending, filter: filter, + namespaceFilter: namespaceFilter, + taskFilter: taskFilter, + runId: runId, + stateFilter: stateFilter, + page: page, + pageSize: pageSize, flashKey: params['flash']?.trim().isEmpty ?? false ? 
null : params['flash'], errorKey: params['error']?.trim().isEmpty ?? false ? null : params['error'], ); @@ -269,22 +961,145 @@ WorkersPageOptions _parseWorkersOptions(Map params) { final flash = params['flash']?.trim(); final error = params['error']?.trim(); final target = params['scope']?.trim(); + final namespace = params['namespace']?.trim(); return WorkersPageOptions( flashMessage: flash?.isNotEmpty ?? false ? flash : null, errorMessage: error?.isNotEmpty ?? false ? error : null, scope: target?.isNotEmpty ?? false ? target : null, + namespaceFilter: namespace?.isNotEmpty ?? false ? namespace : null, + ); +} + +FailuresPageOptions _parseFailuresOptions(Map params) { + final queue = params['queue']?.trim(); + final flash = params['flash']?.trim(); + final error = params['error']?.trim(); + return FailuresPageOptions( + queue: queue?.isEmpty ?? true ? null : queue, + flashMessage: flash?.isEmpty ?? true ? null : flash, + errorMessage: error?.isEmpty ?? true ? null : error, + ); +} + +SearchPageOptions _parseSearchOptions(Map params) { + final query = params['q']?.trim(); + final scopeRaw = (params['scope'] ?? 'all').trim().toLowerCase(); + final scope = switch (scopeRaw) { + 'tasks' => 'tasks', + 'workers' => 'workers', + 'queues' => 'queues', + 'audit' => 'audit', + _ => 'all', + }; + return SearchPageOptions( + query: query?.isEmpty ?? true ? null : query, + scope: scope, + ); +} + +NamespacesPageOptions _parseNamespacesOptions(Map params) { + final namespace = params['namespace']?.trim(); + return NamespacesPageOptions( + namespace: namespace?.isNotEmpty ?? false ? namespace : null, ); } +WorkflowsPageOptions _parseWorkflowsOptions(Map params) { + final workflow = params['workflow']?.trim(); + final runId = params['runId']?.trim(); + return WorkflowsPageOptions( + workflow: workflow?.isNotEmpty ?? false ? workflow : null, + runId: runId?.isNotEmpty ?? false ? 
runId : null, + ); +} + +JobsPageOptions _parseJobsOptions(Map params) { + final task = params['task']?.trim(); + final queue = params['queue']?.trim(); + return JobsPageOptions( + task: task?.isNotEmpty ?? false ? task : null, + queue: queue?.isNotEmpty ?? false ? queue : null, + ); +} + +List _applyTaskViewFilters( + List tasks, + TasksPageOptions options, +) { + final queueFilter = options.filter?.toLowerCase(); + final namespaceFilter = options.namespaceFilter?.toLowerCase(); + final taskFilter = options.taskFilter?.toLowerCase(); + final runFilter = options.runId?.toLowerCase(); + return tasks.where((entry) { + if (options.hasFilter) { + final queue = entry.queue.toLowerCase(); + if (!(queueFilter != null && queue.contains(queueFilter))) { + return false; + } + } + if (options.hasNamespaceFilter && + entry.namespace.toLowerCase() != namespaceFilter) { + return false; + } + if (options.hasTaskFilter) { + final name = entry.taskName.toLowerCase(); + if (!(taskFilter != null && name.contains(taskFilter))) { + return false; + } + } + if (options.hasRunIdFilter) { + final runId = entry.runId?.toLowerCase() ?? ''; + if (!(runFilter != null && runId.contains(runFilter))) { + return false; + } + } + if (options.hasStateFilter && entry.state != options.stateFilter) { + return false; + } + return true; + }).toList(growable: false); +} + +String _resolveDefaultNamespace( + List workers, + List tasks, +) { + for (final worker in workers) { + final value = worker.namespace.trim(); + if (value.isNotEmpty) return value; + } + for (final task in tasks) { + final value = task.namespace.trim(); + if (value.isNotEmpty) return value; + } + return 'stem'; +} + Future _controlWorkers( EngineContext ctx, DashboardDataSource service, -) async { + DashboardState state, { + required String basePath, +}) async { + final namespaceFilter = (await ctx.defaultPostForm('namespace', '')).trim(); + final workersPath = namespaceFilter.isEmpty + ? 
dashboardRoute(basePath, '/workers') + : _appendRedirectQuery( + dashboardRoute(basePath, '/workers'), + {'namespace': namespaceFilter}, + ); try { final rawAction = (await ctx.postForm('action')).trim().toLowerCase(); if (rawAction.isEmpty) { + state.recordAudit( + kind: 'action', + action: 'worker.control', + status: 'error', + actor: 'dashboard', + summary: 'Control action missing.', + ); return ctx.turboSeeOther( - '/workers?error=${Uri.encodeComponent('Control action missing.')}', + '$workersPath?error=${Uri.encodeComponent('Control action missing.')}', ); } @@ -313,10 +1128,17 @@ Future _controlWorkers( }; if (commandType == null) { + state.recordAudit( + kind: 'action', + action: 'worker.control', + status: 'error', + actor: 'dashboard', + summary: 'Unsupported control action "$rawAction".', + ); final encodedError = Uri.encodeComponent( 'Unsupported control action "$rawAction".', ); - return ctx.turboSeeOther('/workers?error=$encodedError'); + return ctx.turboSeeOther('$workersPath?error=$encodedError'); } final payload = {}; @@ -363,10 +1185,18 @@ Future _controlWorkers( if (primaryError is String && primaryError.isNotEmpty) { message.write(' Example: $primaryError'); } + state.recordAudit( + kind: 'action', + action: 'worker.control.$rawAction', + status: 'error', + actor: 'dashboard', + summary: + '$label command reached $scope with $errorReplies error replies.', + ); final encodedMessage = Uri.encodeComponent(message.toString()); final encodedScope = Uri.encodeComponent(scope); return ctx.turboSeeOther( - '/workers?error=$encodedMessage&scope=$encodedScope', + '$workersPath?error=$encodedMessage&scope=$encodedScope', ); } @@ -374,17 +1204,39 @@ Future _controlWorkers( final message = replies.isEmpty ? '$label command sent to $scope.' 
: '$label command acknowledged by $okReplies $ackLabel from $scope.'; + state.recordAudit( + kind: 'action', + action: 'worker.control.$rawAction', + status: 'ok', + actor: 'dashboard', + summary: message, + ); final encodedMessage = Uri.encodeComponent(message); final encodedScope = Uri.encodeComponent(scope); return ctx.turboSeeOther( - '/workers?flash=$encodedMessage&scope=$encodedScope', + '$workersPath?flash=$encodedMessage&scope=$encodedScope', ); } on Object catch (error, stack) { - stderr - ..writeln('[stem-dashboard] control command failed: $error') - ..writeln(stack); + stemLogger.error( + 'Dashboard control command failed', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); + state.recordAudit( + kind: 'action', + action: 'worker.control', + status: 'error', + actor: 'dashboard', + summary: 'Control command failed: $error', + ); return ctx.turboSeeOther( - '/workers?error=${Uri.encodeComponent('Control command failed.')}', + '$workersPath?error=${Uri.encodeComponent('Control command failed.')}', ); } } @@ -392,14 +1244,28 @@ Future _controlWorkers( Future _replayDeadLetters( EngineContext ctx, DashboardDataSource service, -) async { + DashboardState state, { + required String basePath, +}) async { + final redirect = _resolveRedirectPath( + await ctx.defaultPostForm('redirect', dashboardRoute(basePath, '/workers')), + fallbackPath: dashboardRoute(basePath, '/workers'), + ); try { final queue = (await ctx.postForm('queue')).trim(); if (queue.isEmpty) { - final encodedError = Uri.encodeComponent( - 'Queue name is required for replay.', + state.recordAudit( + kind: 'action', + action: 'queue.replay', + status: 'error', + actor: 'dashboard', + summary: 'Replay rejected: missing queue name.', + ); + return ctx.turboSeeOther( + _appendRedirectQuery(redirect, { + 'error': 'Queue name is required for replay.', + }), ); - return 
ctx.turboSeeOther('/workers?error=$encodedError'); } final limitInput = (await ctx.defaultPostForm('limit', '50')).trim(); final limit = int.tryParse(limitInput)?.clamp(1, 500) ?? 50; @@ -421,10 +1287,20 @@ Future _replayDeadLetters( final message = dryRun ? 'Dry run replay found no dead letters for "$queue".' : 'No dead letters replayed for "$queue".'; - final encodedMessage = Uri.encodeComponent(message); - final encodedScope = Uri.encodeComponent(scope); + state.recordAudit( + kind: 'action', + action: 'queue.replay', + status: 'ok', + actor: 'dashboard', + summary: message, + metadata: {'queue': queue, 'dryRun': dryRun}, + ); return ctx.turboSeeOther( - '/workers?flash=$encodedMessage&scope=$encodedScope', + _appendRedirectQuery(redirect, { + 'flash': message, + 'scope': scope, + if (redirect == '/failures') 'queue': queue, + }), ); } @@ -434,17 +1310,86 @@ Future _replayDeadLetters( ? 'Dry run replay would consider $entryCount dead letter$entrySuffix ' 'for "$queue".' : 'Replayed $entryCount dead letter$entrySuffix for "$queue".'; - final encodedMessage = Uri.encodeComponent(message); - final encodedScope = Uri.encodeComponent(scope); + state.recordAudit( + kind: 'action', + action: 'queue.replay', + status: 'ok', + actor: 'dashboard', + summary: message, + metadata: {'queue': queue, 'entries': entryCount, 'dryRun': dryRun}, + ); return ctx.turboSeeOther( - '/workers?flash=$encodedMessage&scope=$encodedScope', + _appendRedirectQuery(redirect, { + 'flash': message, + 'scope': scope, + if (redirect == '/failures') 'queue': queue, + }), ); } on Object catch (error, stack) { - stderr - ..writeln('[stem-dashboard] DLQ replay failed: $error') - ..writeln(stack); + stemLogger.error( + 'Dashboard dead-letter replay failed', + stemLogContext( + component: 'dashboard', + subsystem: 'server', + fields: { + 'error': error.toString(), + 'stack': stack.toString(), + }, + ), + ); + state.recordAudit( + kind: 'action', + action: 'queue.replay', + status: 'error', + actor: 
'dashboard', + summary: 'Dead-letter replay failed: $error', + ); return ctx.turboSeeOther( - '/workers?error=${Uri.encodeComponent('Failed to replay dead letters.')}', + _appendRedirectQuery(redirect, { + 'error': 'Failed to replay dead letters.', + }), ); } } + +String _resolveRedirectPath( + String? raw, { + required String fallbackPath, +}) { + final value = raw?.trim() ?? ''; + if (value.isEmpty || !value.startsWith('/')) { + return fallbackPath; + } + final uri = Uri.tryParse(value); + if (uri == null || uri.host.isNotEmpty || uri.scheme.isNotEmpty) { + return fallbackPath; + } + return value; +} + +String _appendRedirectQuery( + String path, + Map params, +) { + final uri = Uri.parse(path); + final merged = Map.from(uri.queryParameters); + for (final entry in params.entries) { + if (entry.value.trim().isEmpty) continue; + merged[entry.key] = entry.value; + } + final query = merged.entries + .map( + (entry) { + final key = Uri.encodeQueryComponent(entry.key); + final value = Uri.encodeQueryComponent(entry.value); + return '$key=$value'; + }, + ) + .join('&'); + return query.isEmpty ? uri.path : '${uri.path}?$query'; +} + +bool _isTruthy(String value) { + final normalized = value.trim().toLowerCase(); + return normalized == 'true' || normalized == '1' || normalized == 'yes'; +} diff --git a/packages/dashboard/lib/src/services/models.dart b/packages/dashboard/lib/src/services/models.dart index 54730d0f..42681efd 100644 --- a/packages/dashboard/lib/src/services/models.dart +++ b/packages/dashboard/lib/src/services/models.dart @@ -1,4 +1,14 @@ -import 'package:stem/stem.dart' show QueueHeartbeat, WorkerHeartbeat, stemNow; +import 'package:stem/stem.dart' + show + QueueHeartbeat, + RunState, + TaskState, + TaskStatus, + TaskStatusRecord, + WorkerHeartbeat, + WorkflowStatus, + WorkflowStepEntry, + stemNow; /// Aggregate counts for a queue at a point in time. 
class QueueSummary { @@ -181,6 +191,604 @@ class DashboardEvent { final Map metadata; } +/// Audit log entry for operator actions and automated alerts. +class DashboardAuditEntry { + /// Creates an audit log entry. + const DashboardAuditEntry({ + required this.id, + required this.timestamp, + required this.kind, + required this.action, + required this.status, + this.actor, + this.summary, + this.metadata = const {}, + }); + + /// Stable entry identifier. + final String id; + + /// Event timestamp. + final DateTime timestamp; + + /// Entry kind: `action` or `alert`. + final String kind; + + /// Action/event type identifier. + final String action; + + /// Status marker (`ok`, `error`, `sent`, `skipped`, etc.). + final String status; + + /// Actor identifier where applicable. + final String? actor; + + /// Human-readable summary. + final String? summary; + + /// Optional metadata payload. + final Map metadata; +} + +/// Dashboard-friendly projection of a persisted task status record. +class DashboardTaskStatusEntry { + /// Creates a task status entry. + const DashboardTaskStatusEntry({ + required this.id, + required this.state, + required this.attempt, + required this.createdAt, + required this.updatedAt, + required this.queue, + required this.taskName, + this.errorMessage, + this.errorType, + this.errorStack, + this.payload, + this.meta = const {}, + this.runId, + this.workflowName, + this.workflowStep, + this.workflowStepIndex, + this.workflowIteration, + this.retryable = false, + }); + + /// Builds a dashboard task entry from a [TaskStatusRecord]. 
+ factory DashboardTaskStatusEntry.fromRecord(TaskStatusRecord record) { + final status = record.status; + final meta = status.meta; + final error = status.error; + final queue = _readQueue(meta); + final taskName = _readTaskName(meta); + return DashboardTaskStatusEntry( + id: status.id, + state: status.state, + attempt: status.attempt, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + queue: queue, + taskName: taskName, + errorMessage: error?.message, + errorType: error?.type, + errorStack: error?.stack, + payload: status.payload, + meta: meta, + runId: meta['stem.workflow.runId']?.toString(), + workflowName: meta['stem.workflow.name']?.toString(), + workflowStep: meta['stem.workflow.step']?.toString(), + workflowStepIndex: _readInt(meta['stem.workflow.stepIndex']), + workflowIteration: _readInt(meta['stem.workflow.iteration']), + retryable: error?.retryable ?? false, + ); + } + + /// Builds a dashboard task entry from a plain [TaskStatus]. + /// + /// Use this when the result backend can return the current status but not + /// the persisted record timestamps. + factory DashboardTaskStatusEntry.fromStatus( + TaskStatus status, { + DateTime? observedAt, + }) { + final seenAt = observedAt?.toUtc() ?? 
stemNow().toUtc(); + final meta = status.meta; + final queue = _readQueue(meta); + final taskName = _readTaskName(meta); + final error = status.error; + return DashboardTaskStatusEntry( + id: status.id, + state: status.state, + attempt: status.attempt, + createdAt: seenAt, + updatedAt: seenAt, + queue: queue, + taskName: taskName, + errorMessage: error?.message, + errorType: error?.type, + errorStack: error?.stack, + payload: status.payload, + meta: meta, + runId: meta['stem.workflow.runId']?.toString(), + workflowName: meta['stem.workflow.name']?.toString(), + workflowStep: meta['stem.workflow.step']?.toString(), + workflowStepIndex: _readInt(meta['stem.workflow.stepIndex']), + workflowIteration: _readInt(meta['stem.workflow.iteration']), + retryable: error?.retryable ?? false, + ); + } + + /// Task identifier. + final String id; + + /// Current lifecycle state. + final TaskState state; + + /// Attempt count for this status. + final int attempt; + + /// Record creation timestamp. + final DateTime createdAt; + + /// Record update timestamp. + final DateTime updatedAt; + + /// Queue associated with the task. + final String queue; + + /// Task handler name if available. + final String taskName; + + /// Failure message when [state] is failed/retried. + final String? errorMessage; + + /// Failure type when [state] is failed/retried. + final String? errorType; + + /// Failure stack trace when captured by the backend. + final String? errorStack; + + /// Persisted task result payload. + final Object? payload; + + /// Raw task metadata from the result backend. + final Map meta; + + /// Workflow run identifier, when this task is part of a workflow. + final String? runId; + + /// Workflow name, when present. + final String? workflowName; + + /// Workflow step name, when present. + final String? workflowStep; + + /// Workflow step index, when present. + final int? workflowStepIndex; + + /// Workflow iteration, when present. + final int? 
workflowIteration; + + /// Whether the failure is marked retryable. + final bool retryable; + + /// Namespace reported by task metadata, or `stem` when unavailable. + String get namespace => _readNamespace(meta); + + /// Whether this entry represents a workflow task. + bool get isWorkflowTask => + runId != null || + taskName.startsWith('stem.workflow.') || + taskName.contains('workflow'); + + /// Whether this entry is in a failed terminal state. + bool get isFailure => + state == TaskState.failed || state == TaskState.cancelled; + + /// Fingerprint used to group related failures in diagnostics views. + String get errorFingerprint { + final type = (errorType ?? 'Unknown').trim(); + final message = (errorMessage ?? 'No message').trim(); + return '$type: $message'; + } + + /// Task processing start timestamp, when recorded by workers. + DateTime? get startedAt => _readDate(meta['startedAt']); + + /// Task completion/failure timestamp, when recorded by workers. + DateTime? get finishedAt => + _readDate(meta['completedAt']) ?? _readDate(meta['failedAt']); + + /// Estimated queue wait from persisted record creation to processing start. + Duration? get queueWait { + final started = startedAt; + if (started == null) return null; + final value = started.difference(createdAt.toUtc()); + if (value.isNegative) return Duration.zero; + return value; + } + + /// Estimated processing time from start to finish/last update. + Duration? get processingTime { + final started = startedAt; + if (started == null) return null; + final end = finishedAt ?? updatedAt.toUtc(); + final value = end.difference(started); + if (value.isNegative) return Duration.zero; + return value; + } +} + +/// App-focused namespace summary for dashboard observability. +class DashboardNamespaceSnapshot { + /// Creates a namespace summary. 
+ const DashboardNamespaceSnapshot({ + required this.namespace, + required this.queueCount, + required this.workerCount, + required this.pending, + required this.inflight, + required this.deadLetters, + required this.runningTasks, + required this.failedTasks, + required this.workflowRuns, + }); + + /// Namespace identifier. + final String namespace; + + /// Number of distinct queues seen for this namespace. + final int queueCount; + + /// Number of active workers in this namespace. + final int workerCount; + + /// Pending queue depth. + final int pending; + + /// In-flight envelope count. + final int inflight; + + /// Dead-letter count. + final int deadLetters; + + /// Running task statuses. + final int runningTasks; + + /// Failed terminal task statuses. + final int failedTasks; + + /// Distinct workflow run ids observed in task metadata. + final int workflowRuns; +} + +/// Aggregate task summary grouped by task name. +class DashboardJobSummary { + /// Creates a task/job summary. + const DashboardJobSummary({ + required this.taskName, + required this.sampleQueue, + required this.total, + required this.running, + required this.succeeded, + required this.failed, + required this.retried, + required this.cancelled, + required this.lastUpdated, + }); + + /// Task handler name. + final String taskName; + + /// Queue most commonly associated with this task in sampled statuses. + final String sampleQueue; + + /// Total sampled statuses for this task. + final int total; + + /// Running count. + final int running; + + /// Success count. + final int succeeded; + + /// Failure count. + final int failed; + + /// Retried count. + final int retried; + + /// Cancelled count. + final int cancelled; + + /// Most recent update timestamp across sampled statuses. + final DateTime lastUpdated; + + /// Failure ratio in sampled statuses. + double get failureRatio => total <= 0 ? 0 : failed / total; +} + +/// Workflow run summary projected from task status metadata. 
+class DashboardWorkflowRunSummary { + /// Creates a workflow summary. + const DashboardWorkflowRunSummary({ + required this.runId, + required this.workflowName, + required this.lastStep, + required this.total, + required this.queued, + required this.running, + required this.succeeded, + required this.failed, + required this.cancelled, + required this.lastUpdated, + }); + + /// Workflow run id. + final String runId; + + /// Workflow name, when available. + final String workflowName; + + /// Most recent step marker, when available. + final String? lastStep; + + /// Total sampled statuses for this run. + final int total; + + /// Queued count. + final int queued; + + /// Running count. + final int running; + + /// Succeeded count. + final int succeeded; + + /// Failed count. + final int failed; + + /// Cancelled count. + final int cancelled; + + /// Most recent update timestamp. + final DateTime lastUpdated; +} + +/// Builds app-focused namespace summaries from sampled runtime state. +List buildNamespaceSnapshots({ + required List queues, + required List workers, + required List tasks, + String defaultNamespace = 'stem', +}) { + final queueNamesByNamespace = >{}; + final pendingByNamespace = {}; + final inflightByNamespace = {}; + final deadByNamespace = {}; + final workerCountByNamespace = {}; + final runningByNamespace = {}; + final failedByNamespace = {}; + final runsByNamespace = >{}; + + for (final queue in queues) { + queueNamesByNamespace.putIfAbsent(defaultNamespace, () => {}).add( + queue.queue, + ); + pendingByNamespace[defaultNamespace] = + (pendingByNamespace[defaultNamespace] ?? 0) + queue.pending; + inflightByNamespace[defaultNamespace] = + (inflightByNamespace[defaultNamespace] ?? 0) + queue.inflight; + deadByNamespace[defaultNamespace] = + (deadByNamespace[defaultNamespace] ?? 0) + queue.deadLetters; + } + + for (final worker in workers) { + final namespace = worker.namespace.trim().isEmpty + ? 
defaultNamespace + : worker.namespace.trim(); + workerCountByNamespace[namespace] = + (workerCountByNamespace[namespace] ?? 0) + 1; + final names = queueNamesByNamespace.putIfAbsent( + namespace, + () => {}, + ); + for (final queue in worker.queues) { + names.add(queue.name); + } + } + + for (final task in tasks) { + final namespace = task.namespace.trim().isEmpty + ? defaultNamespace + : task.namespace.trim(); + queueNamesByNamespace.putIfAbsent(namespace, () => {}).add( + task.queue, + ); + if (task.state == TaskState.running) { + runningByNamespace[namespace] = (runningByNamespace[namespace] ?? 0) + 1; + } + if (task.isFailure) { + failedByNamespace[namespace] = (failedByNamespace[namespace] ?? 0) + 1; + } + if (task.runId != null && task.runId!.isNotEmpty) { + runsByNamespace.putIfAbsent(namespace, () => {}).add(task.runId!); + } + } + + final namespaces = { + ...queueNamesByNamespace.keys, + ...workerCountByNamespace.keys, + ...runningByNamespace.keys, + ...failedByNamespace.keys, + ...runsByNamespace.keys, + }.toList(growable: false) + ..sort(); + + return namespaces.map((namespace) { + return DashboardNamespaceSnapshot( + namespace: namespace, + queueCount: queueNamesByNamespace[namespace]?.length ?? 0, + workerCount: workerCountByNamespace[namespace] ?? 0, + pending: pendingByNamespace[namespace] ?? 0, + inflight: inflightByNamespace[namespace] ?? 0, + deadLetters: deadByNamespace[namespace] ?? 0, + runningTasks: runningByNamespace[namespace] ?? 0, + failedTasks: failedByNamespace[namespace] ?? 0, + workflowRuns: runsByNamespace[namespace]?.length ?? 0, + ); + }).toList(growable: false); +} + +/// Builds task/job summaries grouped by task name. 
+List buildJobSummaries( + List tasks, { + int limit = 20, +}) { + final buckets = {}; + for (final task in tasks) { + buckets + .putIfAbsent( + task.taskName, + () => _DashboardJobSummaryBuilder(taskName: task.taskName), + ) + .add(task); + } + final results = buckets.values.map((bucket) => bucket.build()).toList() + ..sort((a, b) { + final byTotal = b.total.compareTo(a.total); + if (byTotal != 0) return byTotal; + return b.lastUpdated.compareTo(a.lastUpdated); + }); + final bounded = limit < 1 ? 1 : limit; + return results.take(bounded).toList(growable: false); +} + +/// Builds workflow run summaries grouped by run id. +List buildWorkflowRunSummaries( + List tasks, { + int limit = 20, +}) { + final buckets = {}; + for (final task in tasks) { + final runId = task.runId?.trim(); + if (runId == null || runId.isEmpty) continue; + buckets + .putIfAbsent(runId, () => _DashboardWorkflowSummaryBuilder(runId)) + .add(task); + } + final results = buckets.values.map((bucket) => bucket.build()).toList() + ..sort((a, b) => b.lastUpdated.compareTo(a.lastUpdated)); + final bounded = limit < 1 ? 1 : limit; + return results.take(bounded).toList(growable: false); +} + +/// Projection of a workflow run snapshot for dashboard rendering. +class DashboardWorkflowRunSnapshot { + /// Creates a workflow run snapshot. + const DashboardWorkflowRunSnapshot({ + required this.id, + required this.workflow, + required this.status, + required this.cursor, + required this.createdAt, + this.updatedAt, + this.waitTopic, + this.resumeAt, + this.ownerId, + this.leaseExpiresAt, + this.lastError, + this.result, + }); + + /// Builds a dashboard workflow run snapshot from [RunState]. 
+ factory DashboardWorkflowRunSnapshot.fromRunState(RunState state) { + return DashboardWorkflowRunSnapshot( + id: state.id, + workflow: state.workflow, + status: state.status, + cursor: state.cursor, + createdAt: state.createdAt, + updatedAt: state.updatedAt, + waitTopic: state.waitTopic, + resumeAt: state.resumeAt, + ownerId: state.ownerId, + leaseExpiresAt: state.leaseExpiresAt, + lastError: state.lastError, + result: state.result, + ); + } + + /// Run identifier. + final String id; + + /// Workflow name. + final String workflow; + + /// Current lifecycle state. + final WorkflowStatus status; + + /// Next step cursor. + final int cursor; + + /// Run creation timestamp. + final DateTime createdAt; + + /// Most recent mutation timestamp. + final DateTime? updatedAt; + + /// Topic currently awaited by this run, when suspended. + final String? waitTopic; + + /// Resume deadline for suspended runs. + final DateTime? resumeAt; + + /// Owner of the active lease when running. + final String? ownerId; + + /// Lease expiration if the run is claimed. + final DateTime? leaseExpiresAt; + + /// Last error payload recorded by the workflow runtime. + final Map? lastError; + + /// Final workflow result payload when completed. + final Object? result; +} + +/// Projection of a persisted workflow step checkpoint. +class DashboardWorkflowStepSnapshot { + /// Creates a workflow step snapshot. + const DashboardWorkflowStepSnapshot({ + required this.name, + required this.position, + required this.value, + this.completedAt, + }); + + /// Builds a workflow step snapshot from [WorkflowStepEntry]. + factory DashboardWorkflowStepSnapshot.fromEntry(WorkflowStepEntry entry) { + return DashboardWorkflowStepSnapshot( + name: entry.name, + position: entry.position, + value: entry.value, + completedAt: entry.completedAt, + ); + } + + /// Step name. + final String name; + + /// Step ordering position. + final int position; + + /// Persisted checkpoint value. + final Object? 
value; + + /// Completion timestamp if available. + final DateTime? completedAt; +} + /// Task request submitted from the dashboard UI. class EnqueueRequest { /// Creates a task enqueue request. @@ -207,3 +815,133 @@ class EnqueueRequest { /// Maximum retry count for the task. final int maxRetries; } + +int? _readInt(Object? value) { + if (value == null) return null; + if (value is int) return value; + if (value is num) return value.toInt(); + return int.tryParse(value.toString()); +} + +DateTime? _readDate(Object? value) { + if (value == null) return null; + if (value is DateTime) return value.toUtc(); + return DateTime.tryParse(value.toString())?.toUtc(); +} + +String _readTaskName(Map meta) { + return meta['task']?.toString() ?? + meta['stem.task']?.toString() ?? + meta['name']?.toString() ?? + meta['taskName']?.toString() ?? + 'unknown'; +} + +String _readQueue(Map meta) { + return meta['queue']?.toString() ?? + meta['stem.queue']?.toString() ?? + 'default'; +} + +String _readNamespace(Map meta) { + return meta['namespace']?.toString() ?? + meta['stem.namespace']?.toString() ?? + 'stem'; +} + +class _DashboardJobSummaryBuilder { + _DashboardJobSummaryBuilder({required this.taskName}); + + final String taskName; + final Map _queueHits = {}; + var _total = 0; + var _running = 0; + var _succeeded = 0; + var _failed = 0; + var _retried = 0; + var _cancelled = 0; + DateTime _lastUpdated = DateTime.fromMillisecondsSinceEpoch(0, isUtc: true); + + void add(DashboardTaskStatusEntry task) { + _total += 1; + _queueHits[task.queue] = (_queueHits[task.queue] ?? 
0) + 1; + if (task.state == TaskState.running) _running += 1; + if (task.state == TaskState.succeeded) _succeeded += 1; + if (task.state == TaskState.failed) _failed += 1; + if (task.state == TaskState.retried) _retried += 1; + if (task.state == TaskState.cancelled) _cancelled += 1; + if (task.updatedAt.toUtc().isAfter(_lastUpdated)) { + _lastUpdated = task.updatedAt.toUtc(); + } + } + + DashboardJobSummary build() { + final sampleQueue = _queueHits.entries.isEmpty + ? 'default' + : (_queueHits.entries.toList() + ..sort((a, b) => b.value.compareTo(a.value))) + .first + .key; + return DashboardJobSummary( + taskName: taskName, + sampleQueue: sampleQueue, + total: _total, + running: _running, + succeeded: _succeeded, + failed: _failed, + retried: _retried, + cancelled: _cancelled, + lastUpdated: _lastUpdated, + ); + } +} + +class _DashboardWorkflowSummaryBuilder { + _DashboardWorkflowSummaryBuilder(this.runId); + + final String runId; + String _workflowName = 'workflow'; + String? _lastStep; + var _total = 0; + var _queued = 0; + var _running = 0; + var _succeeded = 0; + var _failed = 0; + var _cancelled = 0; + DateTime _lastUpdated = DateTime.fromMillisecondsSinceEpoch(0, isUtc: true); + + void add(DashboardTaskStatusEntry task) { + _total += 1; + if (task.workflowName != null && task.workflowName!.isNotEmpty) { + _workflowName = task.workflowName!; + } + if (task.workflowStep != null && task.workflowStep!.isNotEmpty) { + _lastStep = task.workflowStep; + } + if (task.state == TaskState.queued || task.state == TaskState.retried) { + _queued += 1; + } + if (task.state == TaskState.running) _running += 1; + if (task.state == TaskState.succeeded) _succeeded += 1; + if (task.state == TaskState.failed) _failed += 1; + if (task.state == TaskState.cancelled) _cancelled += 1; + if (task.updatedAt.toUtc().isAfter(_lastUpdated)) { + _lastUpdated = task.updatedAt.toUtc(); + } + } + + DashboardWorkflowRunSummary build() { + return DashboardWorkflowRunSummary( + runId: runId, + 
workflowName: _workflowName, + lastStep: _lastStep, + total: _total, + queued: _queued, + running: _running, + succeeded: _succeeded, + failed: _failed, + cancelled: _cancelled, + lastUpdated: _lastUpdated, + ); + } +} diff --git a/packages/dashboard/lib/src/services/stem_service.dart b/packages/dashboard/lib/src/services/stem_service.dart index 4eb872d1..b90ded04 100644 --- a/packages/dashboard/lib/src/services/stem_service.dart +++ b/packages/dashboard/lib/src/services/stem_service.dart @@ -1,10 +1,13 @@ import 'dart:async'; +import 'dart:io'; import 'package:stem/stem.dart'; import 'package:stem_cli/stem_cli.dart'; - import 'package:stem_dashboard/src/config/config.dart'; import 'package:stem_dashboard/src/services/models.dart'; +import 'package:stem_postgres/stem_postgres.dart'; +import 'package:stem_redis/stem_redis.dart'; +import 'package:stem_sqlite/stem_sqlite.dart'; /// Contract for dashboard services that load queue and worker data. abstract class DashboardDataSource { @@ -14,6 +17,29 @@ abstract class DashboardDataSource { /// Fetches current worker status snapshots. Future> fetchWorkerStatuses(); + /// Fetches persisted task statuses for observability views. + Future> fetchTaskStatuses({ + TaskState? state, + String? queue, + int limit = 100, + int offset = 0, + }); + + /// Fetches a single task status by [taskId]. + Future fetchTaskStatus(String taskId); + + /// Fetches statuses belonging to a workflow [runId]. + Future> fetchTaskStatusesForRun( + String runId, { + int limit = 200, + }); + + /// Fetches persisted workflow run snapshot, if a workflow store is available. + Future fetchWorkflowRun(String runId); + + /// Fetches persisted workflow checkpoints, if a workflow store is available. + Future> fetchWorkflowSteps(String runId); + /// Enqueues a task request through the backing broker. 
Future enqueueTask(EnqueueRequest request); @@ -24,6 +50,20 @@ abstract class DashboardDataSource { bool dryRun = false, }); + /// Replays a specific dead-letter task by [taskId]. + /// + /// Returns `true` when the entry was found and replayed. + Future replayTaskById(String taskId, {String? queue}); + + /// Requests revocation for [taskId]. + /// + /// Returns `true` when a revoke store is configured and the request is saved. + Future revokeTask( + String taskId, { + bool terminate = false, + String? reason, + }); + /// Sends a control command and returns any replies collected. Future> sendControlCommand( ControlCommandMessage command, { @@ -40,28 +80,57 @@ class StemDashboardService implements DashboardDataSource { required DashboardConfig config, required Broker broker, ResultBackend? backend, + WorkflowStore? workflowStore, + RevokeStore? revokeStore, + Future Function()? disposeContext, + Future<_DashboardRuntimeContext> Function()? reloadRuntimeContext, + bool ownsWorkflowStore = false, }) : _config = config, _namespace = config.namespace, + _signer = PayloadSigner.maybe(config.stem.signing), _broker = broker, - _backend = backend; + _backend = backend, + _workflowStore = workflowStore, + _revokeStore = revokeStore, + _disposeContext = disposeContext, + _reloadRuntimeContext = reloadRuntimeContext, + _ownsWorkflowStore = ownsWorkflowStore; final DashboardConfig _config; final String _namespace; - final Broker _broker; - final ResultBackend? _backend; + final PayloadSigner? _signer; + Broker _broker; + ResultBackend? _backend; + final WorkflowStore? _workflowStore; + RevokeStore? _revokeStore; + Future Function()? _disposeContext; + final Future<_DashboardRuntimeContext> Function()? _reloadRuntimeContext; + Future? _runtimeReconnectFuture; + Future _runtimeOperationQueue = Future.value(); + final bool _ownsWorkflowStore; + var _closed = false; /// Creates a dashboard service using [config]. 
/// /// Uses [createDefaultContext] to set up broker and backend from environment. static Future connect(DashboardConfig config) async { - final ctx = await createDefaultContext( - environment: Map.from(config.environment), + final runtimeContext = await _createRuntimeContext(config); + + final workflowStore = await _connectWorkflowStore( + config.environment['STEM_WORKFLOW_STORE_URL'], + namespace: _resolveWorkflowNamespace(config), + tls: config.tls, ); return StemDashboardService._( config: config, - broker: ctx.broker, - backend: ctx.backend, + broker: runtimeContext.broker, + backend: runtimeContext.backend, + workflowStore: workflowStore, + revokeStore: runtimeContext.revokeStore, + disposeContext: runtimeContext.dispose, + reloadRuntimeContext: () => _createRuntimeContext(config), + ownsWorkflowStore: true, ); } @@ -73,47 +142,164 @@ class StemDashboardService implements DashboardDataSource { required DashboardConfig config, required Broker broker, ResultBackend? backend, + WorkflowStore? workflowStore, + RevokeStore? revokeStore, }) async { return StemDashboardService._( config: config, broker: broker, backend: backend, + workflowStore: workflowStore, + revokeStore: revokeStore, ); } @override Future> fetchQueueSummaries() async { - final queues = await _discoverQueues(); - final summaries = []; + try { + return await _withRuntimeReconnectRetry(_fetchQueueSummariesImpl); + } on Object catch (error, stack) { + _logReadFailure('fetchQueueSummaries', error, stack); + return const []; + } + } - for (final queue in queues) { - final pending = await _broker.pendingCount(queue) ?? 0; - final inflight = await _broker.inflightCount(queue) ?? 
0; - final dead = await _deadLetterCount(queue); + @override + Future> fetchWorkerStatuses() async { + try { + final heartbeats = await _withRuntimeReconnectRetry(() async { + final backend = _backend; + if (backend == null) return const []; + return backend.listWorkerHeartbeats(); + }); + return heartbeats.map(WorkerStatus.fromHeartbeat).toList(growable: false) + ..sort((a, b) => a.workerId.compareTo(b.workerId)); + } on Object catch (error, stack) { + _logReadFailure('fetchWorkerStatuses', error, stack); + return const []; + } + } - summaries.add( - QueueSummary( - queue: queue, - pending: pending, - inflight: inflight, - deadLetters: dead, - ), - ); + @override + Future> fetchTaskStatuses({ + TaskState? state, + String? queue, + int limit = 100, + int offset = 0, + }) async { + final resolvedQueue = queue?.trim(); + final boundedLimit = limit.clamp(1, 500); + final boundedOffset = offset < 0 ? 0 : offset; + try { + final page = await _withRuntimeReconnectRetry(() async { + final backend = _backend; + if (backend == null) { + return const TaskStatusPage(items: []); + } + return backend.listTaskStatuses( + TaskStatusListRequest( + state: state, + queue: resolvedQueue == null || resolvedQueue.isEmpty + ? 
null + : resolvedQueue, + limit: boundedLimit, + offset: boundedOffset, + ), + ); + }); + return page.items + .map(DashboardTaskStatusEntry.fromRecord) + .toList(growable: false); + } on Object catch (error, stack) { + _logReadFailure('fetchTaskStatuses', error, stack); + return const []; } + } - summaries.sort((a, b) => a.queue.compareTo(b.queue)); - return summaries; + @override + Future fetchTaskStatus(String taskId) async { + final trimmed = taskId.trim(); + if (trimmed.isEmpty) return null; + + try { + final record = await _findTaskStatusRecord(trimmed); + if (record != null) { + return DashboardTaskStatusEntry.fromRecord(record); + } + final backend = _backend; + if (backend == null) return null; + final status = await backend.get(trimmed); + if (status == null) { + return null; + } + return DashboardTaskStatusEntry.fromStatus(status); + } on Object { + return null; + } } @override - Future> fetchWorkerStatuses() async { - final backend = _backend; - if (backend == null) return const []; + Future> fetchTaskStatusesForRun( + String runId, { + int limit = 200, + }) async { + final trimmed = runId.trim(); + if (trimmed.isEmpty) return const []; try { - final heartbeats = await backend.listWorkerHeartbeats(); - return heartbeats.map(WorkerStatus.fromHeartbeat).toList(growable: false) - ..sort((a, b) => a.workerId.compareTo(b.workerId)); + final page = await _withRuntimeReconnectRetry(() async { + final backend = _backend; + if (backend == null) { + return const TaskStatusPage(items: []); + } + return backend.listTaskStatuses( + TaskStatusListRequest( + meta: {'stem.workflow.runId': trimmed}, + limit: limit.clamp(1, 500), + ), + ); + }); + return page.items + .map(DashboardTaskStatusEntry.fromRecord) + .toList(growable: false); + } on Object { + return const []; + } + } + + @override + Future fetchWorkflowRun(String runId) async { + final store = _workflowStore; + if (store == null) return null; + + final trimmed = runId.trim(); + if (trimmed.isEmpty) return null; 
+ + try { + final run = await store.get(trimmed); + if (run == null) return null; + return DashboardWorkflowRunSnapshot.fromRunState(run); + } on Object { + return null; + } + } + + @override + Future> fetchWorkflowSteps( + String runId, + ) async { + final store = _workflowStore; + if (store == null) return const []; + + final trimmed = runId.trim(); + if (trimmed.isEmpty) return const []; + + try { + final steps = await store.listSteps(trimmed); + return steps + .map(DashboardWorkflowStepSnapshot.fromEntry) + .toList(growable: false) + ..sort((a, b) => a.position.compareTo(b.position)); } on Object { return const []; } @@ -130,7 +316,7 @@ class StemDashboardService implements DashboardDataSource { maxRetries: request.maxRetries, meta: const {'source': 'dashboard'}, ); - await _broker.publish(envelope); + await _publishEnvelope(envelope); } @override @@ -140,7 +326,91 @@ class StemDashboardService implements DashboardDataSource { bool dryRun = false, }) async { final bounded = limit.clamp(1, 500); - return _broker.replayDeadLetters(queue, limit: bounded, dryRun: dryRun); + return _withRuntimeReconnectRetry( + () => _broker.replayDeadLetters(queue, limit: bounded, dryRun: dryRun), + ); + } + + @override + Future replayTaskById(String taskId, {String? 
queue}) async { + final trimmedTask = taskId.trim(); + if (trimmedTask.isEmpty) return false; + + final resolvedQueue = await _resolveReplayQueue(trimmedTask, queue: queue); + if (resolvedQueue == null) { + return false; + } + + final deadLetter = await _withRuntimeReconnectRetry( + () => _broker.getDeadLetter(resolvedQueue, trimmedTask), + ); + if (deadLetter == null) { + return false; + } + + final now = stemNow().toUtc(); + final original = deadLetter.envelope; + final replayMeta = Map.from(original.meta) + ..['source'] = 'dashboard' + ..['dashboard.replayFromTaskId'] = trimmedTask + ..['dashboard.replayedAt'] = now.toIso8601String(); + final replay = original.copyWith( + id: generateEnvelopeId(), + attempt: 0, + enqueuedAt: now, + meta: replayMeta, + ); + await _publishEnvelope(replay); + + final backend = _backend; + if (backend != null) { + final queuedMeta = { + 'queue': replay.queue, + 'task': replay.name, + ...replayMeta, + }; + await backend.set( + replay.id, + TaskState.queued, + attempt: 0, + meta: queuedMeta, + ); + } + return true; + } + + @override + Future revokeTask( + String taskId, { + bool terminate = false, + String? reason, + }) async { + final trimmedTask = taskId.trim(); + if (trimmedTask.isEmpty) return false; + + final now = stemNow().toUtc(); + final trimmedReason = reason?.trim(); + final entry = RevokeEntry( + namespace: _namespace, + taskId: trimmedTask, + version: generateRevokeVersion(), + issuedAt: now, + terminate: terminate, + reason: trimmedReason == null || trimmedReason.isEmpty + ? 
null + : trimmedReason, + requestedBy: 'dashboard', + ); + try { + final store = _revokeStore; + if (store == null) { + return false; + } + await store.upsertAll([entry]); + return true; + } on Object { + return false; + } } @override @@ -172,7 +442,7 @@ class StemDashboardService implements DashboardDataSource { meta: const {'source': 'dashboard'}, enqueuedAt: now, ); - await _broker.publish(envelope); + await _publishEnvelope(envelope); } final expectedReplies = command.targets.isEmpty @@ -180,14 +450,18 @@ class StemDashboardService implements DashboardDataSource { : command.targets.length; final prefetch = expectedReplies == null ? 8 : expectedReplies.clamp(1, 32); - final subscription = _broker.consume( - RoutingSubscription.singleQueue(replyQueue), - consumerGroup: _controlConsumerGroup, - consumerName: 'dashboard-${command.requestId}', - prefetch: prefetch, + final subscription = await _withRuntimeReconnectRetry>( + () async { + return _broker.consume( + RoutingSubscription.singleQueue(replyQueue), + consumerGroup: _controlConsumerGroup, + consumerName: 'dashboard-${command.requestId}', + prefetch: prefetch, + ); + }, ); - final iterator = StreamIterator(subscription); + final iterator = StreamIterator(subscription); final replies = []; final deadline = stemNow().add(timeout); @@ -210,9 +484,11 @@ class StemDashboardService implements DashboardDataSource { try { final reply = controlReplyFromEnvelope(delivery.envelope); replies.add(reply); - await _broker.ack(delivery); + await _withRuntimeReconnectRetry(() => _broker.ack(delivery)); } on Object { - await _broker.nack(delivery, requeue: false); + await _withRuntimeReconnectRetry( + () => _broker.nack(delivery, requeue: false), + ); } if (expectedReplies != null && replies.length >= expectedReplies) { @@ -229,9 +505,155 @@ class StemDashboardService implements DashboardDataSource { @override Future close() async { - // Note: The broker and backend will be closed when the context is disposed. 
- // Since we got them from createDefaultContext, we don't own their - // lifecycle. + if (_closed) return; + _closed = true; + + if (_ownsWorkflowStore) { + await _disposeWorkflowStore(_workflowStore); + } + + await _disposeRuntimeContext(); + } + + Future> _fetchQueueSummariesImpl() async { + final queues = await _discoverQueues(); + final summaries = []; + + for (final queue in queues) { + final pending = await _broker.pendingCount(queue) ?? 0; + final inflight = await _broker.inflightCount(queue) ?? 0; + final dead = await _deadLetterCount(queue); + + summaries.add( + QueueSummary( + queue: queue, + pending: pending, + inflight: inflight, + deadLetters: dead, + ), + ); + } + + summaries.sort((a, b) => a.queue.compareTo(b.queue)); + return summaries; + } + + Future _withRuntimeReconnectRetry(Future Function() action) { + return _serializeRuntimeAccess(() async { + try { + return await action(); + } on Object catch (error) { + final recovered = await _recoverRuntimeContextIfNeeded(error); + if (!recovered) { + rethrow; + } + return action(); + } + }); + } + + Future _recoverRuntimeContextIfNeeded(Object error) async { + if (_closed || !_isRecoverableConnectionError(error)) { + return false; + } + + final reloadRuntimeContext = _reloadRuntimeContext; + if (reloadRuntimeContext == null) { + return false; + } + + try { + await _reconnectRuntimeContext(reloadRuntimeContext); + return true; + } on Object { + return false; + } + } + + bool _isRecoverableConnectionError(Object error) { + if (error is SocketException || + error is IOException || + error is StateError) { + return true; + } + final message = '$error'.toLowerCase(); + return message.contains('streamsink is closed') || + message.contains('stream is closed') || + message.contains('connection closed') || + message.contains('not connected') || + message.contains('connection refused') || + message.contains('socket is closed') || + message.contains('broken pipe') || + message.contains('timed out') || + 
message.contains('connection reset'); + } + + Future _serializeRuntimeAccess(Future Function() action) { + final completer = Completer(); + _runtimeOperationQueue = _runtimeOperationQueue.catchError((_) {}).then(( + _, + ) async { + try { + completer.complete(await action()); + } on Object catch (error, stack) { + completer.completeError(error, stack); + } + }); + return completer.future; + } + + void _logReadFailure(String operation, Object error, StackTrace stack) { + stemLogger.warning( + 'Dashboard data read failed', + stemLogContext( + component: 'dashboard', + subsystem: 'service', + fields: { + 'operation': operation, + 'error': '$error', + 'stack': '$stack', + }, + ), + ); + } + + Future _reconnectRuntimeContext( + Future<_DashboardRuntimeContext> Function() reloadRuntimeContext, + ) async { + if (_runtimeReconnectFuture != null) { + return _runtimeReconnectFuture!; + } + final completer = Completer(); + _runtimeReconnectFuture = completer.future; + try { + final nextContext = await reloadRuntimeContext(); + final previousDispose = _disposeContext; + _broker = nextContext.broker; + _backend = nextContext.backend; + _revokeStore = nextContext.revokeStore; + _disposeContext = nextContext.dispose; + if (previousDispose != null) { + try { + await previousDispose(); + } on Object { + // Ignore disposal failures while recovering from connection errors. 
+ } + } + completer.complete(); + } on Object catch (error, stack) { + completer.completeError(error, stack); + rethrow; + } finally { + _runtimeReconnectFuture = null; + } + } + + Future _disposeRuntimeContext() async { + final disposeContext = _disposeContext; + _disposeContext = null; + if (disposeContext != null) { + await disposeContext(); + } } Future> _discoverQueues() async { @@ -294,11 +716,159 @@ class StemDashboardService implements DashboardDataSource { Future _purgeQueue(String queue) async { try { - await _broker.purge(queue); + await _withRuntimeReconnectRetry(() => _broker.purge(queue)); } on Object { // Some brokers may not support purge; ignore failures. } } + Future _publishEnvelope(Envelope envelope) async { + final signer = _signer; + final payload = signer == null ? envelope : await signer.sign(envelope); + await _withRuntimeReconnectRetry(() => _broker.publish(payload)); + } + + Future _findTaskStatusRecord(String taskId) async { + final backend = _backend; + if (backend == null) return null; + + var offset = 0; + const pageSize = 200; + const maxPages = 10; + + for (var pageIndex = 0; pageIndex < maxPages; pageIndex++) { + final page = await backend.listTaskStatuses( + TaskStatusListRequest(limit: pageSize, offset: offset), + ); + for (final item in page.items) { + if (item.status.id == taskId) { + return item; + } + } + final nextOffset = page.nextOffset; + if (nextOffset == null) { + break; + } + offset = nextOffset; + } + return null; + } + + Future _resolveReplayQueue( + String taskId, { + String? 
queue, + }) async { + final explicit = queue?.trim(); + if (explicit != null && explicit.isNotEmpty) { + return explicit; + } + + final status = await fetchTaskStatus(taskId); + final statusQueue = status?.queue.trim(); + if (statusQueue != null && statusQueue.isNotEmpty) { + return statusQueue; + } + + final queues = await _discoverQueues(); + for (final candidate in queues) { + final entry = await _broker.getDeadLetter(candidate, taskId); + if (entry != null) { + return candidate; + } + } + return null; + } + + static String _resolveWorkflowNamespace(DashboardConfig config) { + final raw = config.environment['STEM_WORKFLOW_NAMESPACE']?.trim(); + if (raw != null && raw.isNotEmpty) { + return raw; + } + return config.namespace; + } + + static Future _connectWorkflowStore( + String? url, { + required String namespace, + required TlsConfig tls, + }) async { + final trimmed = url?.trim(); + if (trimmed == null || trimmed.isEmpty) { + return null; + } + + final uri = Uri.parse(trimmed); + switch (uri.scheme) { + case 'redis': + case 'rediss': + return RedisWorkflowStore.connect( + trimmed, + namespace: namespace, + tls: tls, + ); + case 'postgres': + case 'postgresql': + case 'postgresql+ssl': + case 'postgres+ssl': + return PostgresWorkflowStore.connect( + trimmed, + namespace: namespace, + applicationName: 'stem-dashboard-workflow', + tls: tls, + ); + case 'sqlite': + final path = uri.path.isNotEmpty ? uri.path : 'workflow.sqlite'; + return SqliteWorkflowStore.open(File(path)); + case 'file': + return SqliteWorkflowStore.open(File(uri.toFilePath())); + case 'memory': + return InMemoryWorkflowStore(); + default: + return null; + } + } + + static Future _disposeWorkflowStore(WorkflowStore? 
store) async { + if (store is RedisWorkflowStore) { + await store.close(); + return; + } + if (store is PostgresWorkflowStore) { + await store.close(); + return; + } + if (store is SqliteWorkflowStore) { + await store.close(); + } + } + + static Future<_DashboardRuntimeContext> _createRuntimeContext( + DashboardConfig config, + ) async { + final context = await createDefaultContext( + environment: Map.from(config.environment), + ); + return _DashboardRuntimeContext( + broker: context.broker, + backend: context.backend, + revokeStore: context.revokeStore, + dispose: context.dispose, + ); + } + static const _controlConsumerGroup = 'stem-dashboard-control'; } + +class _DashboardRuntimeContext { + const _DashboardRuntimeContext({ + required this.broker, + required this.backend, + required this.revokeStore, + required this.dispose, + }); + + final Broker broker; + final ResultBackend? backend; + final RevokeStore? revokeStore; + final Future Function() dispose; +} diff --git a/packages/dashboard/lib/src/state/dashboard_state.dart b/packages/dashboard/lib/src/state/dashboard_state.dart index 1b4e193a..0dd8696d 100644 --- a/packages/dashboard/lib/src/state/dashboard_state.dart +++ b/packages/dashboard/lib/src/state/dashboard_state.dart @@ -1,8 +1,10 @@ import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; import 'package:meta/meta.dart'; import 'package:routed_hotwire/routed_hotwire.dart'; -import 'package:stem/stem.dart' show stemNow; +import 'package:stem/stem.dart' show TaskState, stemNow; import 'package:stem_dashboard/src/services/models.dart'; import 'package:stem_dashboard/src/services/stem_service.dart'; import 'package:stem_dashboard/src/ui/event_templates.dart'; @@ -14,8 +16,16 @@ class DashboardState { required this.service, this.pollInterval = const Duration(seconds: 5), this.eventLimit = 200, + this.auditLimit = 300, + this.alertWebhookUrls = const [], + this.alertBacklogThreshold = 500, + this.alertFailedTaskThreshold = 25, + 
this.alertOfflineWorkerThreshold = 1, + this.alertCooldown = const Duration(minutes: 5), }) : hub = TurboStreamHub(); + static const _alertWebhookTimeout = Duration(seconds: 5); + /// Data source used to fetch queues and workers. final DashboardDataSource service; @@ -28,10 +38,34 @@ class DashboardState { /// Maximum number of events retained in memory. final int eventLimit; + /// Maximum number of audit entries retained in memory. + final int auditLimit; + + /// Webhook URLs used for alert delivery. + final List alertWebhookUrls; + + /// Backlog threshold triggering an alert. + final int alertBacklogThreshold; + + /// Failed-task threshold triggering an alert. + final int alertFailedTaskThreshold; + + /// Offline-worker threshold triggering an alert. + final int alertOfflineWorkerThreshold; + + /// Minimum duration between repeated alerts of the same type. + final Duration alertCooldown; + Timer? _timer; List _previousQueues = const []; Map _previousWorkers = const {}; + String _previousQueueSignature = ''; + String _previousWorkerSignature = ''; + String _previousTaskSignature = ''; + var _hasPrimedRefresh = false; final _events = []; + final _auditEntries = []; + final _lastAlertAt = {}; Future _polling = Future.value(); DateTime? _lastPollAt; DashboardThroughput _throughput = const DashboardThroughput( @@ -46,6 +80,10 @@ class DashboardState { /// Most recent throughput calculation. DashboardThroughput get throughput => _throughput; + /// Recent audit entries in reverse chronological order. + List get auditEntries => + List.unmodifiable(_auditEntries); + /// Starts the polling loop and emits initial state. 
Future start() async { await _runPoll(); @@ -67,14 +105,36 @@ class DashboardState { Future runOnce() => _poll(); Future _poll() async { - final queues = await service.fetchQueueSummaries(); - final workers = await service.fetchWorkerStatuses(); + final results = await Future.wait([ + service.fetchQueueSummaries(), + service.fetchWorkerStatuses(), + service.fetchTaskStatuses(limit: 120), + ]); + final queues = results[0] as List; + final workers = results[1] as List; + final tasks = results[2] as List; _updateThroughput(queues); _generateQueueEvents(_previousQueues, queues); _generateWorkerEvents(_previousWorkers, { for (final worker in workers) worker.workerId: worker, }); + await _evaluateAlerts(queues: queues, workers: workers, tasks: tasks); + + final queueSignature = _queueSignature(queues); + final workerSignature = _workerSignature(workers); + final taskSignature = _taskSignature(tasks); + final changed = + queueSignature != _previousQueueSignature || + workerSignature != _previousWorkerSignature || + taskSignature != _previousTaskSignature; + if (_hasPrimedRefresh && changed) { + _broadcastRefreshSignal(); + } + _hasPrimedRefresh = true; + _previousQueueSignature = queueSignature; + _previousWorkerSignature = workerSignature; + _previousTaskSignature = taskSignature; _previousQueues = queues; _previousWorkers = {for (final worker in workers) worker.workerId: worker}; @@ -256,4 +316,265 @@ class DashboardState { if (delta < 0) return 'decreased by ${delta.abs()}'; return 'unchanged'; } + + String _queueSignature(List queues) { + final sorted = List.from(queues) + ..sort((a, b) => a.queue.compareTo(b.queue)); + return sorted + .map( + (queue) { + return '${queue.queue}:${queue.pending}:' + '${queue.inflight}:${queue.deadLetters}'; + }, + ) + .join('|'); + } + + String _workerSignature(List workers) { + final sorted = List.from(workers) + ..sort((a, b) => a.workerId.compareTo(b.workerId)); + return sorted + .map( + (worker) { + final stamp = 
worker.timestamp.toUtc().toIso8601String(); + return '${worker.workerId}:${worker.inflight}:$stamp'; + }, + ) + .join('|'); + } + + String _taskSignature(List tasks) { + return tasks + .map( + (task) { + final stamp = task.updatedAt.toUtc().toIso8601String(); + return '${task.id}:${task.state.name}:${task.attempt}:$stamp'; + }, + ) + .join('|'); + } + + void _broadcastRefreshSignal() { + final payload = turboStreamReplace( + target: 'dashboard-refresh-signal', + html: '${stemNow().toUtc().toIso8601String()}', + ); + hub.broadcast('stem-dashboard:refresh', [payload]); + } + + /// Records an audit entry. + void recordAudit({ + required String kind, + required String action, + required String status, + String? actor, + String? summary, + Map metadata = const {}, + }) { + final entry = DashboardAuditEntry( + id: 'audit-${stemNow().toUtc().microsecondsSinceEpoch}', + timestamp: stemNow().toUtc(), + kind: kind, + action: action, + status: status, + actor: actor, + summary: summary, + metadata: metadata, + ); + _auditEntries.insert(0, entry); + if (_auditEntries.length > auditLimit) { + _auditEntries.removeRange(auditLimit, _auditEntries.length); + } + _broadcastRefreshSignal(); + } + + Future _evaluateAlerts({ + required List queues, + required List workers, + required List tasks, + }) async { + final totalPending = queues.fold( + 0, + (total, queue) => total + queue.pending, + ); + if (totalPending >= alertBacklogThreshold) { + await _emitAlert( + key: 'queue.backlog.high', + summary: + 'Backlog threshold exceeded: ' + '$totalPending >= $alertBacklogThreshold.', + metadata: { + 'pendingTotal': totalPending, + 'threshold': alertBacklogThreshold, + }, + ); + } + + final failedCount = tasks.where((task) { + return task.state == TaskState.failed || + task.state == TaskState.cancelled; + }).length; + if (failedCount >= alertFailedTaskThreshold) { + await _emitAlert( + key: 'tasks.failed.high', + summary: + 'Failed task threshold exceeded: ' + '$failedCount >= 
$alertFailedTaskThreshold.', + metadata: { + 'failedCount': failedCount, + 'threshold': alertFailedTaskThreshold, + }, + ); + } + + final offlineWorkers = workers.where( + (worker) => worker.age > const Duration(minutes: 2), + ); + if (offlineWorkers.length >= alertOfflineWorkerThreshold) { + await _emitAlert( + key: 'workers.offline.high', + summary: + 'Offline workers threshold exceeded: ${offlineWorkers.length} >= ' + '$alertOfflineWorkerThreshold.', + metadata: { + 'offlineWorkers': offlineWorkers + .map((worker) => worker.workerId) + .toList( + growable: false, + ), + 'threshold': alertOfflineWorkerThreshold, + }, + ); + } + } + + Future _emitAlert({ + required String key, + required String summary, + Map metadata = const {}, + }) async { + final now = stemNow().toUtc(); + final last = _lastAlertAt[key]; + if (last != null && now.difference(last) < alertCooldown) { + return; + } + _lastAlertAt[key] = now; + + recordAudit( + kind: 'alert', + action: key, + status: 'triggered', + actor: 'system', + summary: summary, + metadata: metadata, + ); + _recordEvent( + DashboardEvent( + title: 'Alert: $key', + timestamp: now, + summary: summary, + metadata: metadata, + ), + ); + + if (alertWebhookUrls.isEmpty) { + recordAudit( + kind: 'alert', + action: key, + status: 'skipped', + actor: 'system', + summary: 'No alert webhook URLs configured.', + ); + return; + } + await _sendAlertWebhooks(key: key, summary: summary, metadata: metadata); + } + + Future _sendAlertWebhooks({ + required String key, + required String summary, + required Map metadata, + }) async { + final payload = { + 'kind': 'stem-dashboard-alert', + 'key': key, + 'summary': summary, + 'timestamp': stemNow().toUtc().toIso8601String(), + 'metadata': metadata, + }; + + for (final rawUrl in alertWebhookUrls) { + final url = rawUrl.trim(); + if (url.isEmpty) continue; + final uri = Uri.tryParse(url); + if (uri == null || !uri.hasScheme || uri.host.isEmpty) { + recordAudit( + kind: 'alert', + action: key, + 
status: 'error', + actor: 'system', + summary: 'Invalid webhook URL: $url', + ); + continue; + } + + final client = HttpClient()..connectionTimeout = _alertWebhookTimeout; + HttpClientRequest? request; + var shouldAbortRequest = false; + try { + request = await client.postUrl(uri).timeout(_alertWebhookTimeout); + shouldAbortRequest = true; + request.headers.contentType = ContentType.json; + request.add(utf8.encode(jsonEncode(payload))); + final response = await request.close().timeout(_alertWebhookTimeout); + shouldAbortRequest = false; + try { + await response.drain().timeout(_alertWebhookTimeout); + } on Object { + // Response body is optional for webhook auditing. + } + if (response.statusCode >= 200 && response.statusCode < 300) { + recordAudit( + kind: 'alert', + action: key, + status: 'sent', + actor: 'system', + summary: 'Alert delivered to $url.', + ); + } else { + recordAudit( + kind: 'alert', + action: key, + status: 'error', + actor: 'system', + summary: 'Webhook returned HTTP ${response.statusCode} for $url.', + ); + } + } on TimeoutException { + if (shouldAbortRequest) { + request?.abort(); + } + recordAudit( + kind: 'alert', + action: key, + status: 'error', + actor: 'system', + summary: 'Webhook delivery timed out for $url.', + ); + } on Object catch (error) { + if (shouldAbortRequest) { + request?.abort(); + } + recordAudit( + kind: 'alert', + action: key, + status: 'error', + actor: 'system', + summary: 'Webhook delivery failed for $url: $error', + ); + } finally { + client.close(force: true); + } + } + } } diff --git a/packages/dashboard/lib/src/ui/content.dart b/packages/dashboard/lib/src/ui/content.dart index b5f3ba7e..b88cc745 100644 --- a/packages/dashboard/lib/src/ui/content.dart +++ b/packages/dashboard/lib/src/ui/content.dart @@ -1,594 +1,95 @@ -import 'package:intl/intl.dart'; -import 'package:stem/stem.dart' show stemNow; -// HTML template strings are kept on single lines for readability. 
-// ignore_for_file: lines_longer_than_80_chars - import 'package:stem_dashboard/src/services/models.dart'; -import 'package:stem_dashboard/src/ui/event_templates.dart'; +import 'package:stem_dashboard/src/ui/audit.dart'; +import 'package:stem_dashboard/src/ui/events.dart'; +import 'package:stem_dashboard/src/ui/failures.dart'; +import 'package:stem_dashboard/src/ui/jobs.dart'; import 'package:stem_dashboard/src/ui/layout.dart'; +import 'package:stem_dashboard/src/ui/namespaces.dart'; +import 'package:stem_dashboard/src/ui/options.dart'; +import 'package:stem_dashboard/src/ui/overview.dart'; +import 'package:stem_dashboard/src/ui/search.dart'; +import 'package:stem_dashboard/src/ui/task_detail.dart'; +import 'package:stem_dashboard/src/ui/tasks.dart'; +import 'package:stem_dashboard/src/ui/workers.dart'; +import 'package:stem_dashboard/src/ui/workflows.dart'; -final _numberFormat = NumberFormat.decimalPattern(); - -/// View options used by the tasks page renderer. -class TasksPageOptions { - /// Creates task page options with optional overrides. - const TasksPageOptions({ - this.sortKey = 'queue', - this.descending = false, - this.filter, - this.flashKey, - this.errorKey, - }); - - /// Sort key used for queue ordering. - final String sortKey; - - /// Whether sorting should be descending. - final bool descending; - - /// Optional queue filter text. - final String? filter; - - /// Optional flash message key for UI alerts. - final String? flashKey; - - /// Optional error message key for UI alerts. - final String? errorKey; - - /// Whether a non-empty filter value is set. - bool get hasFilter => filter != null && filter!.isNotEmpty; -} - -/// View options used by the workers page renderer. -class WorkersPageOptions { - /// Creates worker page options with optional overrides. - const WorkersPageOptions({this.flashMessage, this.errorMessage, this.scope}); - - /// Optional flash message for the UI. - final String? flashMessage; - - /// Optional error message for the UI. 
- final String? errorMessage; - - /// Optional worker scope filter. - final String? scope; - - /// Whether a non-empty flash message is set. - bool get hasFlash => flashMessage != null && flashMessage!.isNotEmpty; - - /// Whether a non-empty error message is set. - bool get hasError => errorMessage != null && errorMessage!.isNotEmpty; - - /// Whether a non-empty scope value is set. - bool get hasScope => scope != null && scope!.isNotEmpty; -} +export 'package:stem_dashboard/src/ui/options.dart'; /// Builds the HTML for the specified dashboard [page]. String buildPageContent({ required DashboardPage page, required List queues, required List workers, + List taskStatuses = const [], + DashboardTaskStatusEntry? taskDetail, + List runTimeline = const [], + DashboardWorkflowRunSnapshot? workflowRun, + List workflowSteps = const [], + List auditEntries = const [], DashboardThroughput? throughput, List events = const [], + String defaultNamespace = 'stem', TasksPageOptions tasksOptions = const TasksPageOptions(), WorkersPageOptions workersOptions = const WorkersPageOptions(), + FailuresPageOptions failuresOptions = const FailuresPageOptions(), + SearchPageOptions searchOptions = const SearchPageOptions(), + NamespacesPageOptions namespacesOptions = const NamespacesPageOptions(), + WorkflowsPageOptions workflowsOptions = const WorkflowsPageOptions(), + JobsPageOptions jobsOptions = const JobsPageOptions(), }) { switch (page) { case DashboardPage.overview: - return _overviewContent(queues, workers, throughput); + return buildOverviewContent( + queues, + workers, + throughput, + taskStatuses, + defaultNamespace, + ); case DashboardPage.tasks: - return _tasksContent(queues, tasksOptions); + return buildTasksContent(queues, tasksOptions, taskStatuses); + case DashboardPage.taskDetail: + return buildTaskDetailContent( + taskDetail, + runTimeline, + workflowRun, + workflowSteps, + ); + case DashboardPage.failures: + return buildFailuresContent(taskStatuses, failuresOptions); + 
case DashboardPage.search: + return buildSearchContent( + options: searchOptions, + queues: queues, + workers: workers, + taskStatuses: taskStatuses, + auditEntries: auditEntries, + ); + case DashboardPage.audit: + return buildAuditContent(auditEntries); case DashboardPage.events: - return _eventsContent(events); + return buildEventsContent(events); case DashboardPage.workers: - return _workersContent(workers, queues, workersOptions); - } -} - -String _overviewContent( - List queues, - List workers, - DashboardThroughput? throughput, -) { - final totalPending = queues.fold( - 0, - (total, summary) => total + summary.pending, - ); - final totalInflight = queues.fold( - 0, - (total, summary) => total + summary.inflight, - ); - final totalDead = queues.fold( - 0, - (total, summary) => total + summary.deadLetters, - ); - final activeWorkers = workers.length; - final busiest = List.of( - queues, - )..sort((a, b) => (b.pending + b.inflight).compareTo(a.pending + a.inflight)); - final topQueues = busiest.take(5).toList(); - - final processedPerMin = throughput?.processedPerMinute ?? 0; - final enqueuedPerMin = throughput?.enqueuedPerMinute ?? 0; - final throughputHint = throughput == null - ? 'Waiting for another snapshot to estimate rate.' - : 'Net change over the last ${throughput.interval.inSeconds}s.'; - - return ''' - - -
- ${_metricCard('Backlog (lag)', _formatInt(totalPending), 'Undelivered tasks waiting across all queues.')} - ${_metricCard('Processing', _formatInt(totalInflight), 'Active envelopes currently being executed.')} - ${_metricCard('Processed / min', _formatRate(processedPerMin), throughputHint)} - ${_metricCard('Enqueued / min', _formatRate(enqueuedPerMin), throughputHint)} - ${_metricCard('Dead letters', _formatInt(totalDead), 'Items held in dead letter queues.')} - ${_metricCard('Active workers', _formatInt(activeWorkers), 'Workers that published heartbeats within the retention window.')} -
- -
- - - - - - - - - - - ${topQueues.isEmpty ? _emptyQueuesRow('No queues detected yet.') : topQueues.map(_queueTableRow).join()} - -
QueuePendingIn-flightDead letters
-
-'''; -} - -String _tasksContent(List queues, TasksPageOptions options) { - var filtered = - options.hasFilter - ? queues - .where( - (summary) => summary.queue.toLowerCase().contains( - options.filter!.toLowerCase(), - ), - ) - .toList() - : List.of(queues) - ..sort((a, b) => _compareQueues(a, b, options)); - if (options.descending) { - filtered = filtered.reversed.toList(); - } - - final totalQueues = filtered.length; - final dlqTotal = filtered.fold( - 0, - (total, summary) => total + summary.deadLetters, - ); - - return ''' - - -${_renderTasksAlert(options)} - -
- ${_metricCard('Tracked queues', _formatInt(totalQueues), 'Queues discovered via Redis stream prefixes.')} - ${_metricCard('Dead letter size', _formatInt(dlqTotal), 'Aggregate items across all dead letter queues.')} -
- -
- - - - - - ${options.hasFilter ? 'Clear' : ''} -
- -
- - - - - - - - - - - ${filtered.isEmpty ? _emptyQueuesRow('No streams found for the configured namespace.') : filtered.map(_queueTableRow).join()} - -
${_sortableHeader('Queue', 'queue', options)}${_sortableHeader('Pending', 'pending', options)}${_sortableHeader('In-flight', 'inflight', options)}${_sortableHeader('Dead letters', 'dead', options)}
-
- -
-
-

Ad-hoc enqueue

-
-
- - - - - -
- -
-
-
-'''; -} - -String _formatRate(double value) { - if (value <= 0) return '0'; - if (value < 1) return value.toStringAsFixed(2); - return _numberFormat.format(value.round()); -} - -String _eventsContent(List events) { - final items = events.isEmpty - ? ''' -
-

No events captured yet

-

- Configure the dashboard event bridge to stream Stem signals (enqueue, start, retry, completion) into Redis. - Once connected, updates will appear here automatically via Turbo Streams. -

-
- ''' - : events.map(renderEventItem).join(); - - return ''' - - -
- $items -
-'''; -} - -String _workersContent( - List workers, - List queues, - WorkersPageOptions options, -) { - final healthyWorkers = workers.where((worker) { - return worker.age <= const Duration(minutes: 2); - }).length; - - final busy = workers.where((worker) => worker.inflight > 0).length; - final queueMap = {for (final summary in queues) summary.queue: summary}; - - return ''' - - -${_renderWorkersAlert(options)} - -
- ${_metricCard('Healthy workers', _formatInt(healthyWorkers), 'Heartbeats received within the last two minutes.')} - ${_metricCard('Busy workers', _formatInt(busy), 'Workers currently processing at least one task.')} - ${_metricCard('Isolates in use', _formatInt(_totalIsolates(workers)), 'Sum of worker isolates across the cluster.')} -
- -
- - - - - - - - - - - - ${workers.isEmpty ? ''' - - - - ''' : workers.map(_workerRow).join()} - -
WorkerQueuesInflightLast heartbeatActions
No heartbeats detected for namespace "${workers.isEmpty ? 'stem' : workers.first.namespace}".
-
- -${_clusterControls()} - -${_queueRecoverySection(queueMap)} -'''; -} - -String _queueTableRow(QueueSummary summary) { - return ''' - - ${summary.queue} - ${_formatInt(summary.pending)} - ${_formatInt(summary.inflight)} - ${_formatInt(summary.deadLetters)} - - - -
-
Pending ${_formatInt(summary.pending)}
-
In-flight ${_formatInt(summary.inflight)}
-
Dead letters ${_formatInt(summary.deadLetters)}
-
Detailed DLQ previews render here once the replay control is wired.
-
- - -'''; -} - -String _workerRow(WorkerStatus status) { - final queues = status.queues.isEmpty - ? '—' - : status.queues - .map((queue) => '${queue.name}') - .join(' '); - return ''' - - ${status.workerId} - $queues - ${_formatInt(status.inflight)} - ${_formatRelative(status.timestamp)} - -
- ${_workerActionButton('Ping', 'ping', status.workerId)} - ${_workerActionButton('Pause', 'pause', status.workerId)} - ${_workerActionButton('Shutdown', 'shutdown', status.workerId)} -
- - -'''; -} - -String _workerActionButton(String label, String action, String workerId) { - return ''' -
- - - -
-'''; -} - -String _clusterControls() { - return ''' -
-

Cluster controls

-
- ${_clusterActionButton('Ping all workers', 'ping')} - ${_clusterActionButton('Pause all workers', 'pause')} - ${_clusterActionButton('Shutdown all workers', 'shutdown')} -
-
-'''; -} - -String _clusterActionButton(String label, String action) { - return ''' -
- - - -
-'''; -} - -String _queueRecoverySection(Map queues) { - if (queues.isEmpty) return ''; - final rows = queues.values.toList() - ..sort((a, b) => a.queue.compareTo(b.queue)); - return ''' -
- - - - - - - - - - - ${rows.map(_queueRecoveryRow).join()} - -
QueuePendingDead lettersReplay
-
-'''; -} - -String _queueRecoveryRow(QueueSummary summary) { - final limitDefault = summary.deadLetters <= 0 - ? 50 - : summary.deadLetters.clamp(1, 50); - final action = summary.deadLetters == 0 - ? 'No dead letters' - : ''' -
- - - -
- '''; - return ''' - - ${_escapeHtml(summary.queue)} - ${_formatInt(summary.pending)} - ${_formatInt(summary.deadLetters)} - $action - -'''; -} - -String _metricCard(String title, String value, String caption) { - return ''' -
-
$title
-
$value
-

$caption

-
-'''; -} - -String _emptyQueuesRow(String message) { - return ''' - - $message - -'''; -} - -String _renderTasksAlert(TasksPageOptions options) { - String? message; - var type = 'success'; - switch (options.flashKey) { - case 'queued': - message = 'Task enqueued successfully.'; - } - switch (options.errorKey) { - case 'missing-fields': - message = 'Queue and task name are required.'; - type = 'error'; - case 'invalid-payload': - message = 'Payload must be valid JSON describing an object.'; - type = 'error'; - case 'enqueue-failed': - message = - 'Failed to enqueue the task. Check the dashboard logs for details.'; - type = 'error'; - } - - if (message == null) return ''; - return '
${_escapeHtml(message)}
'; -} - -String _renderWorkersAlert(WorkersPageOptions options) { - if (options.hasError) { - final scope = options.hasScope - ? '
Target: ${_escapeHtml(options.scope!)}.
' - : ''; - return ''' -
- ${_escapeHtml(options.errorMessage!)} - $scope -
-'''; - } - if (options.hasFlash) { - final scope = options.hasScope - ? '
Target: ${_escapeHtml(options.scope!)}.
' - : ''; - return ''' -
- ${_escapeHtml(options.flashMessage!)} - $scope -
-'''; - } - return ''; -} - -int _compareQueues(QueueSummary a, QueueSummary b, TasksPageOptions options) { - switch (options.sortKey) { - case 'pending': - return a.pending.compareTo(b.pending); - case 'inflight': - return a.inflight.compareTo(b.inflight); - case 'dead': - return a.deadLetters.compareTo(b.deadLetters); - case 'queue': - default: - return a.queue.toLowerCase().compareTo(b.queue.toLowerCase()); + return buildWorkersContent(workers, queues, workersOptions); + case DashboardPage.namespaces: + return buildNamespacesContent( + queues: queues, + workers: workers, + taskStatuses: taskStatuses, + options: namespacesOptions, + defaultNamespace: defaultNamespace, + ); + case DashboardPage.workflows: + return buildWorkflowsContent( + taskStatuses: taskStatuses, + options: workflowsOptions, + ); + case DashboardPage.jobs: + return buildJobsContent(taskStatuses: taskStatuses, options: jobsOptions); } } -String _sortableHeader(String label, String key, TasksPageOptions options) { - final isActive = options.sortKey == key; - final descendingNext = isActive ? !options.descending : key != 'queue'; - final params = { - 'sort': key, - 'direction': descendingNext ? 'desc' : 'asc', - }; - if (options.hasFilter) { - params['queue'] = options.filter!; - } - final query = _buildQuery(params); - final indicator = isActive ? (options.descending ? '↓' : '↑') : ''; - final classes = isActive ? 
'sort-link active' : 'sort-link'; - return '$label $indicator'; -} - -String _buildQuery(Map params) { - return params.entries - .map( - (entry) => - '${Uri.encodeQueryComponent(entry.key)}=${Uri.encodeQueryComponent(entry.value)}', - ) - .join('&'); -} - -String _escapeHtml(String value) { - return value - .replaceAll('&', '&') - .replaceAll('<', '<') - .replaceAll('>', '>') - .replaceAll('"', '"') - .replaceAll("'", '''); -} - -int _totalIsolates(List workers) { - return workers.fold(0, (total, status) => total + status.isolateCount); -} - -String _formatInt(int value) => _numberFormat.format(value); - -String _formatRelative(DateTime timestamp) { - final now = stemNow().toUtc(); - final diff = now.difference(timestamp.toUtc()); - if (diff < const Duration(seconds: 30)) return 'just now'; - if (diff < const Duration(minutes: 1)) { - return '${diff.inSeconds}s ago'; - } - if (diff < const Duration(hours: 1)) { - return '${diff.inMinutes}m ago'; - } - if (diff < const Duration(days: 1)) { - return '${diff.inHours}h ago'; - } - return '${diff.inDays}d ago'; +/// Builds inline expandable-row content for task table details. +String buildTaskInlineContent(DashboardTaskStatusEntry? task) { + return buildTaskInlinePanel(task); } diff --git a/packages/dashboard/lib/src/ui/event_templates.dart b/packages/dashboard/lib/src/ui/event_templates.dart index 013454e7..b796f58c 100644 --- a/packages/dashboard/lib/src/ui/event_templates.dart +++ b/packages/dashboard/lib/src/ui/event_templates.dart @@ -1,23 +1,25 @@ import 'package:intl/intl.dart'; import 'package:stem_dashboard/src/services/models.dart'; +import 'package:stem_dashboard/src/ui/shared.dart' show escapeHtml; final _eventTimeFormat = DateFormat('HH:mm:ss'); /// Renders a dashboard event as an HTML list item. 
String renderEventItem(DashboardEvent event) { final timestamp = _eventTimeFormat.format(event.timestamp.toLocal()); - final metadataItems = event.metadata.entries - .map((entry) => '${entry.key}: ${entry.value}') - .join(); + final metadataItems = event.metadata.entries.map((entry) { + final value = entry.value == null ? 'null' : entry.value.toString(); + return '${escapeHtml(entry.key)}: ${escapeHtml(value)}'; + }).join(); final summary = event.summary != null && event.summary!.isNotEmpty - ? '

${event.summary}

' + ? '

${escapeHtml(event.summary!)}

' : ''; return ''' -
+
- ${event.title} + ${escapeHtml(event.title)} $timestamp $summary diff --git a/packages/dashboard/lib/src/ui/layout.dart b/packages/dashboard/lib/src/ui/layout.dart index 9b051deb..6f34c387 100644 --- a/packages/dashboard/lib/src/ui/layout.dart +++ b/packages/dashboard/lib/src/ui/layout.dart @@ -1,3 +1,5 @@ +import 'package:stem_dashboard/src/ui/paths.dart'; + /// Pages supported by the dashboard UI. enum DashboardPage { /// Overview landing page. @@ -6,9 +8,30 @@ enum DashboardPage { /// Task and queue details page. tasks('/tasks'), + /// Detailed view for a single task / workflow run. + taskDetail('/tasks/detail'), + + /// Failure diagnostics and grouped retry controls. + failures('/failures'), + + /// Global search and saved operational views. + search('/search'), + + /// Audit log for actions and alert deliveries. + audit('/audit'), + /// Event feed page. events('/events'), + /// Namespace-centric operational summary. + namespaces('/namespaces'), + + /// Workflow run-centric operational summary. + workflows('/workflows'), + + /// Task family/job-centric operational summary. + jobs('/jobs'), + /// Worker status page. workers('/workers'); @@ -25,26 +48,76 @@ enum DashboardPage { return 'Overview'; case DashboardPage.tasks: return 'Tasks'; + case DashboardPage.taskDetail: + return 'Task Detail'; + case DashboardPage.failures: + return 'Failures'; + case DashboardPage.search: + return 'Search'; + case DashboardPage.audit: + return 'Audit'; case DashboardPage.events: return 'Events'; + case DashboardPage.namespaces: + return 'Namespaces'; + case DashboardPage.workflows: + return 'Workflows'; + case DashboardPage.jobs: + return 'Jobs'; case DashboardPage.workers: return 'Workers'; } } + /// Whether this page should appear in sidebar navigation. + bool get showInNav => this != DashboardPage.taskDetail; + /// Browser title for this page. String get title => 'Stem Dashboard · $label'; } /// Renders the full HTML layout for a dashboard page. 
-String renderLayout(DashboardPage page, String content) { +String renderLayout( + DashboardPage page, + String content, { + String basePath = '', + String? streamPath, +}) { + final resolvedBasePath = normalizeDashboardBasePath(basePath); + final resolvedStreamPath = + streamPath ?? dashboardRoute(basePath, '/dash/streams'); return ''' - + ${page.title} + + + + + +
-
@@ -661,16 +2055,20 @@ $content '''; } -String _renderNav(DashboardPage active) { - return DashboardPage.values.map((page) => _navLink(page, active)).join('\n'); +String _renderNav(DashboardPage active, String basePath) { + return DashboardPage.values + .where((page) => page.showInNav) + .map((page) => _navLink(page, active, basePath)) + .join('\n'); } -String _navLink(DashboardPage page, DashboardPage active) { +String _navLink(DashboardPage page, DashboardPage active, String basePath) { final isActive = page == active; final classes = ['nav-link', if (isActive) 'active'].join(' '); final aria = isActive ? ' aria-current="page"' : ''; + final route = dashboardRoute(basePath, page.path); return ''' - + ${page.label} '''; diff --git a/packages/dashboard/pubspec.yaml b/packages/dashboard/pubspec.yaml index d702f2c0..083a814e 100644 --- a/packages/dashboard/pubspec.yaml +++ b/packages/dashboard/pubspec.yaml @@ -5,6 +5,7 @@ publish_to: "none" environment: sdk: ">=3.9.2 <4.0.0" +resolution: workspace dependencies: intl: ^0.20.2 meta: ^1.18.0 @@ -27,14 +28,5 @@ dev_dependencies: dependency_overrides: analyzer: ^10.0.1 - stem: - path: ../stem - stem_cli: - path: ../stem_cli - stem_postgres: - path: ../stem_postgres - stem_redis: - path: ../stem_redis - stem_sqlite: - path: ../stem_sqlite - timezone: 0.11.0 + artisanal: ^0.2.0 + diff --git a/packages/dashboard/test/dashboard_browser_test.dart b/packages/dashboard/test/dashboard_browser_test.dart index 9c315334..a073ba29 100644 --- a/packages/dashboard/test/dashboard_browser_test.dart +++ b/packages/dashboard/test/dashboard_browser_test.dart @@ -6,7 +6,8 @@ import 'package:server_testing/src/browser/bootstrap/driver/driver_manager.dart' as driver_manager; import 'package:server_testing/src/browser/interfaces/browser_type.dart' show BrowserLaunchOptions; -import 'package:stem/stem.dart' show DeadLetterEntry, DeadLetterReplayResult; +import 'package:stem/stem.dart' + show DeadLetterEntry, DeadLetterReplayResult, TaskState; 
import 'package:stem_dashboard/src/server.dart'; import 'package:stem_dashboard/src/services/models.dart'; import 'package:stem_dashboard/src/services/stem_service.dart'; @@ -17,16 +18,23 @@ class _FakeDashboardService implements DashboardDataSource { _FakeDashboardService({ required List queues, required List workers, + List taskStatuses = const [], }) : _queues = queues, - _workers = workers; + _workers = workers, + _taskStatuses = taskStatuses; List _queues; List _workers; + List _taskStatuses; EnqueueRequest? lastEnqueue; final List controlCommands = []; String? lastReplayQueue; int? lastReplayLimit; bool? lastReplayDryRun; + String? lastReplayTaskId; + String? lastRevokeTaskId; + bool replayTaskSuccess = true; + bool revokeTaskSuccess = true; DeadLetterReplayResult replayResult = const DeadLetterReplayResult( entries: [], dryRun: false, @@ -43,6 +51,11 @@ class _FakeDashboardService implements DashboardDataSource { _workers = List.unmodifiable(values); } + List get taskStatuses => _taskStatuses; + set taskStatuses(List values) { + _taskStatuses = List.unmodifiable(values); + } + @override Future> fetchQueueSummaries() async => List.from(_queues); @@ -51,6 +64,51 @@ class _FakeDashboardService implements DashboardDataSource { Future> fetchWorkerStatuses() async => List.from(_workers); + @override + Future> fetchTaskStatuses({ + TaskState? state, + String? 
queue, + int limit = 100, + int offset = 0, + }) async { + final matches = _taskStatuses.where((entry) { + if (state != null && entry.state != state) return false; + if (queue != null && queue.isNotEmpty && entry.queue != queue) { + return false; + } + return true; + }); + return matches.skip(offset).take(limit).toList(growable: false); + } + + @override + Future fetchTaskStatus(String taskId) async { + for (final entry in _taskStatuses) { + if (entry.id == taskId) { + return entry; + } + } + return null; + } + + @override + Future> fetchTaskStatusesForRun( + String runId, { + int limit = 200, + }) async { + final matches = _taskStatuses.where((entry) => entry.runId == runId); + return matches.take(limit).toList(growable: false); + } + + @override + Future fetchWorkflowRun(String runId) async => + null; + + @override + Future> fetchWorkflowSteps( + String runId, + ) async => const []; + @override Future enqueueTask(EnqueueRequest request) async { lastEnqueue = request; @@ -68,6 +126,25 @@ class _FakeDashboardService implements DashboardDataSource { return replayResult; } + @override + Future replayTaskById(String taskId, {String? queue}) async { + lastReplayTaskId = taskId; + if (queue != null && queue.isNotEmpty) { + lastReplayQueue = queue; + } + return replayTaskSuccess; + } + + @override + Future revokeTask( + String taskId, { + bool terminate = false, + String? 
reason, + }) async { + lastRevokeTaskId = taskId; + return revokeTaskSuccess; + } + @override Future> sendControlCommand( ControlCommandMessage command, { @@ -86,6 +163,10 @@ class _FakeDashboardService implements DashboardDataSource { lastReplayQueue = null; lastReplayLimit = null; lastReplayDryRun = null; + lastReplayTaskId = null; + lastRevokeTaskId = null; + replayTaskSuccess = true; + revokeTaskSuccess = true; replayResult = const DeadLetterReplayResult( entries: [], dryRun: false, @@ -283,4 +364,14 @@ return fetch('/queues/replay', { expect(service.lastReplayLimit, 5); expect(service.lastReplayDryRun, isFalse); }); + + _dashboardBrowserTest('search page renders saved views and query results', ( + browser, + ) async { + await browser.visit('/search?q=default&scope=all'); + await browser.waiter.waitFor('.table-card'); + await browser.assertSee('Search'); + await browser.assertSee('Saved Views'); + await browser.assertSee('Backlog hotspots'); + }); } diff --git a/packages/dashboard/test/dashboard_state_property_test.dart b/packages/dashboard/test/dashboard_state_property_test.dart index e537d2ae..405079c0 100644 --- a/packages/dashboard/test/dashboard_state_property_test.dart +++ b/packages/dashboard/test/dashboard_state_property_test.dart @@ -1,5 +1,6 @@ import 'package:property_testing/property_testing.dart'; -import 'package:stem/stem.dart' show DeadLetterEntry, DeadLetterReplayResult; +import 'package:stem/stem.dart' + show DeadLetterEntry, DeadLetterReplayResult, TaskState; import 'package:stem_dashboard/src/services/models.dart'; import 'package:stem_dashboard/src/services/stem_service.dart'; import 'package:stem_dashboard/src/state/dashboard_state.dart'; @@ -76,6 +77,33 @@ class _SequenceDashboardService implements DashboardDataSource { return _workerSnapshots[_workerIndex++]; } + @override + Future> fetchTaskStatuses({ + TaskState? state, + String? 
queue, + int limit = 100, + int offset = 0, + }) async => const []; + + @override + Future fetchTaskStatus(String taskId) async => + null; + + @override + Future> fetchTaskStatusesForRun( + String runId, { + int limit = 200, + }) async => const []; + + @override + Future fetchWorkflowRun(String runId) async => + null; + + @override + Future> fetchWorkflowSteps( + String runId, + ) async => const []; + @override Future enqueueTask(EnqueueRequest request) async {} @@ -87,6 +115,16 @@ class _SequenceDashboardService implements DashboardDataSource { }) async => const DeadLetterReplayResult(entries: [], dryRun: false); + @override + Future replayTaskById(String taskId, {String? queue}) async => false; + + @override + Future revokeTask( + String taskId, { + bool terminate = false, + String? reason, + }) async => false; + @override Future> sendControlCommand( ControlCommandMessage command, { diff --git a/packages/dashboard/test/server_test.dart b/packages/dashboard/test/server_test.dart index 165126c2..a0f13451 100644 --- a/packages/dashboard/test/server_test.dart +++ b/packages/dashboard/test/server_test.dart @@ -1,7 +1,7 @@ import 'package:routed_testing/routed_testing.dart'; import 'package:server_testing/server_testing.dart'; import 'package:stem/stem.dart' - show DeadLetterEntry, DeadLetterReplayResult, Envelope; + show DeadLetterEntry, DeadLetterReplayResult, Envelope, TaskState; import 'package:stem_dashboard/src/server.dart'; import 'package:stem_dashboard/src/services/models.dart'; import 'package:stem_dashboard/src/services/stem_service.dart'; @@ -9,10 +9,15 @@ import 'package:stem_dashboard/src/state/dashboard_state.dart'; import 'package:stem_dashboard/src/stem/control_messages.dart'; class _RecordingService implements DashboardDataSource { - _RecordingService({this.queues = const [], this.workers = const []}); + _RecordingService({ + this.queues = const [], + this.workers = const [], + this.taskStatuses = const [], + }); final List queues; final List workers; 
+ final List taskStatuses; EnqueueRequest? lastEnqueue; final List controlCommands = []; @@ -20,6 +25,12 @@ class _RecordingService implements DashboardDataSource { String? lastReplayQueue; int? lastReplayLimit; bool? lastReplayDryRun; + String? lastReplayTaskId; + String? lastRevokeTaskId; + bool? lastRevokeTerminate; + String? lastRevokeReason; + bool replayTaskSuccess = true; + bool revokeTaskSuccess = true; DeadLetterReplayResult replayResult = const DeadLetterReplayResult( entries: [], dryRun: false, @@ -31,6 +42,61 @@ class _RecordingService implements DashboardDataSource { @override Future> fetchWorkerStatuses() async => workers; + @override + Future> fetchTaskStatuses({ + TaskState? state, + String? queue, + int limit = 100, + int offset = 0, + }) async { + final filtered = taskStatuses + .where((entry) { + if (state != null && entry.state != state) { + return false; + } + if (queue != null && + queue.trim().isNotEmpty && + entry.queue != queue) { + return false; + } + return true; + }) + .skip(offset) + .take(limit); + return filtered.toList(growable: false); + } + + @override + Future fetchTaskStatus(String taskId) async { + for (final entry in taskStatuses) { + if (entry.id == taskId) { + return entry; + } + } + return null; + } + + @override + Future> fetchTaskStatusesForRun( + String runId, { + int limit = 200, + }) async { + final filtered = taskStatuses + .where((entry) => entry.runId == runId) + .take(limit) + .toList(growable: false); + return filtered; + } + + @override + Future fetchWorkflowRun(String runId) async => + null; + + @override + Future> fetchWorkflowSteps( + String runId, + ) async => const []; + @override Future enqueueTask(EnqueueRequest request) async { lastEnqueue = request; @@ -48,6 +114,27 @@ class _RecordingService implements DashboardDataSource { return replayResult; } + @override + Future replayTaskById(String taskId, {String? 
queue}) async { + lastReplayTaskId = taskId; + if (queue != null && queue.isNotEmpty) { + lastReplayQueue = queue; + } + return replayTaskSuccess; + } + + @override + Future revokeTask( + String taskId, { + bool terminate = false, + String? reason, + }) async { + lastRevokeTaskId = taskId; + lastRevokeTerminate = terminate; + lastRevokeReason = reason; + return revokeTaskSuccess; + } + @override Future> sendControlCommand( ControlCommandMessage command, { @@ -63,10 +150,15 @@ class _RecordingService implements DashboardDataSource { Future _buildClient( _RecordingService service, - DashboardState state, -) async { + DashboardState state, { + String basePath = '', +}) async { await state.runOnce(); - final engine = buildDashboardEngine(service: service, state: state); + final engine = buildDashboardEngine( + service: service, + state: state, + basePath: basePath, + ); final handler = RoutedRequestHandler(engine, true); addTearDown(() async { await handler.close(); @@ -100,6 +192,58 @@ void main() { ..assertBodyContains('critical'); }); + test('GET /partials/overview renders turbo stream section updates', () async { + final service = _RecordingService( + queues: const [ + QueueSummary(queue: 'default', pending: 2, inflight: 1, deadLetters: 0), + ], + workers: [ + WorkerStatus( + workerId: 'worker-1', + namespace: 'stem', + timestamp: DateTime.utc(2026), + inflight: 1, + isolateCount: 2, + queues: const [], + ), + ], + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-1', + state: TaskState.running, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'default', + taskName: 'demo.run', + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get( + '/partials/overview', + headers: { + 'accept': ['text/vnd.turbo-stream.html'], + }, + ); + response + ..assertStatus(200) + ..assertBodyContains( + '.generate(55, (index) { + 
final position = index + 1; + return DashboardTaskStatusEntry( + id: 'task-$position', + state: TaskState.succeeded, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, position), + queue: 'alpha', + taskName: 'demo.task.$position', + ); + }); + final service = _RecordingService( + queues: const [ + QueueSummary(queue: 'alpha', pending: 0, inflight: 0, deadLetters: 0), + ], + taskStatuses: statuses, + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get('/tasks?page=2&pageSize=25'); + response + ..assertStatus(200) + ..assertBodyContains('Page 2') + ..assertBodyContains('task-26') + ..assertBodyContains('Previous') + ..assertBodyContains('Next'); + }, + ); + + test('GET /tasks applies namespace/task/run filters', () async { + final service = _RecordingService( + queues: const [ + QueueSummary(queue: 'alpha', pending: 0, inflight: 0, deadLetters: 0), + ], + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-a', + state: TaskState.running, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'alpha', + taskName: 'greeting.send', + runId: 'run-1', + meta: const {'namespace': 'stem'}, + ), + DashboardTaskStatusEntry( + id: 'task-b', + state: TaskState.running, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 2), + queue: 'alpha', + taskName: 'greeting.send', + runId: 'run-2', + meta: const {'namespace': 'tenant-a'}, + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get( + '/tasks?namespace=tenant-a&task=greeting&runId=run-2', + ); + response + ..assertStatus(200) + ..assertBodyContains('task-b'); + expect(response.body, isNot(contains('task-a'))); + }); + + test('GET /namespaces renders namespace rollup table', () async { + final service = 
_RecordingService( + queues: const [ + QueueSummary(queue: 'alpha', pending: 2, inflight: 1, deadLetters: 0), + ], + workers: [ + WorkerStatus( + workerId: 'worker-1', + namespace: 'stem', + timestamp: DateTime.utc(2026), + isolateCount: 2, + inflight: 1, + queues: const [WorkerQueueInfo(name: 'alpha', inflight: 1)], + ), + ], + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-ns-1', + state: TaskState.running, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'alpha', + taskName: 'demo.task', + meta: const {'namespace': 'stem'}, + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get('/namespaces'); + response + ..assertStatus(200) + ..assertBodyContains('Namespaces') + ..assertBodyContains('Namespace Summary') + ..assertBodyContains('stem'); + }); + + test('GET /workflows renders workflow run summaries', () async { + final service = _RecordingService( + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-wf-1', + state: TaskState.running, + attempt: 0, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'alpha', + taskName: 'workflow.step', + runId: 'run-xyz', + workflowName: 'greetingFlow', + workflowStep: 'stepA', + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get('/workflows'); + response + ..assertStatus(200) + ..assertBodyContains('Workflow Runs') + ..assertBodyContains('run-xyz') + ..assertBodyContains('greetingFlow'); + }); + + test('GET /jobs renders job family summary', () async { + final service = _RecordingService( + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-job-1', + state: TaskState.failed, + attempt: 1, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'alpha', + taskName: 'greeting.send', + 
errorMessage: 'boom', + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get('/jobs'); + response + ..assertStatus(200) + ..assertBodyContains('Job Summary') + ..assertBodyContains('greeting.send'); + }); + + test('GET /tasks/inline renders lazy task panel as turbo stream', () async { + final service = _RecordingService( + taskStatuses: [ + DashboardTaskStatusEntry( + id: 'task-inline-1', + state: TaskState.failed, + attempt: 1, + createdAt: DateTime.utc(2026), + updatedAt: DateTime.utc(2026, 1, 1, 0, 1), + queue: 'alpha', + taskName: 'demo.inline', + errorMessage: 'boom', + meta: const {'stem.task': 'demo.inline'}, + ), + ], + ); + final state = DashboardState(service: service); + final client = await _buildClient(service, state); + + final response = await client.get( + '/tasks/inline?id=task-inline-1&target=task-inline-task-inline-1', + headers: { + 'accept': ['text/vnd.turbo-stream.html'], + }, + ); + response + ..assertStatus(200) + ..assertBodyContains(' entry.id == 'task-failed'); + expect(failed.queue, 'critical'); + expect(failed.taskName, 'demo.fail'); + expect(failed.state, TaskState.failed); + expect(failed.errorMessage, 'boom'); + + final stemMeta = all.firstWhere((entry) => entry.id == 'task-stem-meta'); + expect(stemMeta.queue, 'stem-only'); + expect(stemMeta.taskName, 'demo.stem.meta'); + expect(stemMeta.state, TaskState.running); + + final failedOnly = await service.fetchTaskStatuses(state: TaskState.failed); + expect(failedOnly, hasLength(1)); + expect(failedOnly.first.id, 'task-failed'); + + final queueOnly = await service.fetchTaskStatuses(queue: 'default'); + expect(queueOnly, hasLength(1)); + expect(queueOnly.first.id, 'task-ok'); + + final detail = await service.fetchTaskStatus('task-failed'); + expect(detail, isNotNull); + expect(detail!.errorType, 'StateError'); + expect(detail.errorMessage, 'boom'); + expect(detail.runId, 'run-1'); + + 
final runStatuses = await service.fetchTaskStatusesForRun('run-1'); + expect(runStatuses.length, 3); + }); } diff --git a/packages/stem/CHANGELOG.md b/packages/stem/CHANGELOG.md index f3e05269..f9620a28 100644 --- a/packages/stem/CHANGELOG.md +++ b/packages/stem/CHANGELOG.md @@ -2,6 +2,18 @@ ## 0.1.1 +- Expanded span attribution across enqueue/consume/execute with task identity, + queue, worker, host, lineage, namespace, and workflow step metadata + (`run_id`, `step`, `step_id`, `step_index`, `step_attempt`, `iteration`). +- Improved worker retry republish behavior to preserve optional payload signing + when retrying deliveries. +- Added workflow metadata quality-of-life getters and watcher/run-state helpers + to make workflow introspection easier from task metadata. +- Strengthened tracing and workflow-related test coverage for metadata + propagation and contract behavior. +- Expanded the microservice example with richer workload generation, queue + diversity, updated scheduler/demo flows, and full local observability wiring + for Jaeger/Prometheus/Grafana through nginx. - Improved bootstrap DX with explicit fail-fast errors across broker/backend/ workflow/schedule/lock/revoke resolution paths in `StemStack.fromUrl`, including actionable hints when adapters support a URL but do not implement diff --git a/packages/stem/example/microservice/README.md b/packages/stem/example/microservice/README.md index 73b104f1..9dde5fc1 100644 --- a/packages/stem/example/microservice/README.md +++ b/packages/stem/example/microservice/README.md @@ -22,6 +22,10 @@ All services expect the following environment variables (see `.env.example` for | `STEM_TLS_CLIENT_CERT` | _(optional)_ | mTLS client certificate used by enqueuers/workers. | | `STEM_TLS_CLIENT_KEY` | _(optional)_ | Private key associated with the client certificate. | | `PORT` | `8081` | HTTP port for the enqueue API (nginx fronts it on `api.localhost:8080`). 
| +| `ENQUEUER_AUTOFILL_ENABLED` | `true` | Enables a background demo producer that keeps the dashboard populated. | +| `ENQUEUER_AUTOFILL_INTERVAL_MS` | `2500` | Interval between auto-fill publish cycles. | +| `ENQUEUER_AUTOFILL_BATCH_SIZE` | `2` | Number of demo tasks published per auto-fill cycle (a synthetic failing task may be among them). | +| `ENQUEUER_AUTOFILL_FAILURE_EVERY` | `8` | Every Nth cycle enqueues one synthetic failing task. | | `STEM_SCHEDULE_FILE` | `/config/schedules.yaml` | Optional YAML file the beat service uses to seed schedules. | | `STEM_METRIC_EXPORTERS` | `otlp:http://otel-collector:4318/v1/metrics` | Comma-separated list of metrics exporters enabled for workers (OTLP by default). | | `STEM_OTLP_ENDPOINT` | `http://otel-collector:4318/v1/traces` | Default OTLP endpoint used when exporters do not specify a destination. | @@ -58,7 +62,7 @@ If you want to run the stack with TLS enabled, generate certificates and switch ## Running with Docker Compose ```bash -cd examples/microservice +cd packages/stem/example/microservice cp .env.hmac_tls .env # or .env.hmac / .env.ed25519_tls docker compose up --build ``` @@ -74,14 +78,20 @@ The stack now brings up Redis, the enqueue API, three workers, the beat schedule - **Local overrides:** The compose file expects a routed ecosystem checkout next to this repo (sibling directory at `../routed_ecosystem`) so the dashboard overrides resolve correctly. If your local path differs, update the `volumes` entries in `docker-compose.yml` accordingly. -Workers emit metrics to the collector via OTLP; Prometheus scrapes the collector and Grafana ships with a pre-provisioned datasource so you can build dashboards immediately. Jaeger receives spans published through the collector, allowing you to trace enqueue and worker execution paths without extra configuration. 
+Workers emit metrics to the collector via OTLP; Prometheus scrapes the collector and Grafana ships with pre-provisioned datasources and Stem dashboards: + +- `Stem Overview` +- `Stem Workers & Queues` +- `Stem Scheduler` + +Jaeger receives spans published through the collector, allowing you to trace enqueue and worker execution paths without extra configuration. Enqueue a task: ```bash curl -X POST http://api.localhost:8080/enqueue \ -H 'content-type: application/json' \ - -d '{"name": "Ada"}' + -d '{"name": "Ada", "task": "greeting.send"}' ``` Fan out work with the canvas helper: @@ -106,6 +116,15 @@ stem schedule list stem schedule dry-run --id greetings-reminder --count 3 ``` +The demo stack now auto-generates background traffic across `greetings`, +`billing`, and `reporting` queues, including synthetic workflow metadata +(`stem.workflow.runId`, `stem.workflow.name`, `stem.workflow.step`) so the +Workflows/Jobs/Namespaces dashboard views stay populated. Disable this with: + +```bash +ENQUEUER_AUTOFILL_ENABLED=false docker compose up --build +``` + Stop the stack with `docker compose down`. ## Running locally with Dart @@ -130,7 +149,7 @@ Stop the stack with `docker compose down`. 3. Run the worker: ```bash - cd examples/microservice/worker + cd packages/stem/example/microservice/worker dart pub get dart run bin/worker.dart ``` @@ -138,7 +157,7 @@ Stop the stack with `docker compose down`. 4. In another terminal, run the enqueue API: ```bash - cd examples/microservice/enqueuer + cd packages/stem/example/microservice/enqueuer dart pub get dart run bin/main.dart ``` @@ -146,7 +165,7 @@ Stop the stack with `docker compose down`. The worker logs progress for each greeting task, demonstrating isolate execution, heartbeats, and result backend updates. 
Start the beat service in a third terminal to dispatch scheduled jobs: ```bash -cd examples/microservice/beat +cd packages/stem/example/microservice/beat dart pub get dart run bin/beat.dart ``` diff --git a/packages/stem/example/microservice/beat/Dockerfile b/packages/stem/example/microservice/beat/Dockerfile index ad1a206f..ef4cd4af 100644 --- a/packages/stem/example/microservice/beat/Dockerfile +++ b/packages/stem/example/microservice/beat/Dockerfile @@ -3,6 +3,7 @@ FROM dart:stable WORKDIR /workspace COPY . /workspace -WORKDIR /workspace/example/microservice/beat +WORKDIR /workspace/packages/stem/example/microservice/beat ENV DART_PUB_CACHE=/tmp/.dart_pub_cache +ENV PATH=/usr/lib/dart/bin:$PATH CMD ["sh", "-c", "dart pub get && dart run bin/beat.dart"] diff --git a/packages/stem/example/microservice/beat/bin/beat.dart b/packages/stem/example/microservice/beat/bin/beat.dart index eabb16b6..8a8195a0 100644 --- a/packages/stem/example/microservice/beat/bin/beat.dart +++ b/packages/stem/example/microservice/beat/bin/beat.dart @@ -11,7 +11,10 @@ const _deepEquals = DeepCollectionEquality(); Future main(List args) async { // #region signing-beat-config final config = StemConfig.fromEnvironment(); + final observability = ObservabilityConfig.fromEnvironment(); // #endregion signing-beat-config + observability.applyMetricExporters(); + observability.applySignalConfiguration(); final broker = await RedisStreamsBroker.connect( config.brokerUrl, diff --git a/packages/stem/example/microservice/beat/pubspec.yaml b/packages/stem/example/microservice/beat/pubspec.yaml index ec8769c7..c842d7eb 100644 --- a/packages/stem/example/microservice/beat/pubspec.yaml +++ b/packages/stem/example/microservice/beat/pubspec.yaml @@ -18,3 +18,5 @@ dev_dependencies: dependency_overrides: stem: path: ../../.. 
+ stem_memory: + path: ../../../../stem_memory diff --git a/packages/stem/example/microservice/dashboard/Dockerfile b/packages/stem/example/microservice/dashboard/Dockerfile index 400543ce..70032857 100644 --- a/packages/stem/example/microservice/dashboard/Dockerfile +++ b/packages/stem/example/microservice/dashboard/Dockerfile @@ -3,6 +3,7 @@ FROM dart:stable WORKDIR /workspace COPY . /workspace -WORKDIR /workspace/dashboard +WORKDIR /workspace/packages/dashboard ENV DART_PUB_CACHE=/tmp/.dart_pub_cache +ENV PATH=/usr/lib/dart/bin:$PATH CMD ["sh", "-c", "dart pub get && dart run bin/dashboard.dart"] diff --git a/packages/stem/example/microservice/docker-compose.yml b/packages/stem/example/microservice/docker-compose.yml index 9ccfa769..4ba0758e 100644 --- a/packages/stem/example/microservice/docker-compose.yml +++ b/packages/stem/example/microservice/docker-compose.yml @@ -16,6 +16,7 @@ services: image: jaegertracing/all-in-one:1.56 environment: COLLECTOR_OTLP_ENABLED: "true" + QUERY_BASE_PATH: /jaeger restart: unless-stopped prometheus: @@ -36,12 +37,13 @@ services: environment: GF_SECURITY_ADMIN_USER: admin GF_SECURITY_ADMIN_PASSWORD: admin - GF_SERVER_DOMAIN: localhost - GF_SERVER_ROOT_URL: "%(protocol)s://%(domain)s:%(http_port)s/grafana/" + GF_SERVER_ROOT_URL: "http://localhost:8080/grafana/" GF_SERVER_SERVE_FROM_SUB_PATH: "true" volumes: - grafana-data:/var/lib/grafana - ./grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yaml:ro + - ./grafana/provisioning/dashboards/stem.yml:/etc/grafana/provisioning/dashboards/stem.yaml:ro + - ./grafana/dashboards:/etc/grafana/dashboards/stem:ro depends_on: - prometheus restart: unless-stopped @@ -61,8 +63,8 @@ services: dashboard: build: - context: ../.. - dockerfile: example/microservice/dashboard/Dockerfile + context: ../../../.. 
+ dockerfile: packages/stem/example/microservice/dashboard/Dockerfile env_file: - .env.example environment: @@ -74,12 +76,16 @@ services: enqueuer: build: - context: ../.. - dockerfile: example/microservice/enqueuer/Dockerfile + context: ../../../.. + dockerfile: packages/stem/example/microservice/enqueuer/Dockerfile env_file: - .env.example environment: PORT: ${PORT:-8081} + ENQUEUER_AUTOFILL_ENABLED: ${ENQUEUER_AUTOFILL_ENABLED:-true} + ENQUEUER_AUTOFILL_INTERVAL_MS: ${ENQUEUER_AUTOFILL_INTERVAL_MS:-2500} + ENQUEUER_AUTOFILL_BATCH_SIZE: ${ENQUEUER_AUTOFILL_BATCH_SIZE:-2} + ENQUEUER_AUTOFILL_FAILURE_EVERY: ${ENQUEUER_AUTOFILL_FAILURE_EVERY:-8} depends_on: - redis - otel-collector @@ -87,10 +93,14 @@ services: worker: build: - context: ../.. - dockerfile: example/microservice/worker/Dockerfile + context: ../../../.. + dockerfile: packages/stem/example/microservice/worker/Dockerfile env_file: - .env.example + environment: + STEM_WORKER_NAME: microservice-worker-1 + STEM_WORKER_QUEUE: greetings + STEM_WORKER_NAMESPACE: customer-experience depends_on: - redis - otel-collector @@ -98,10 +108,14 @@ services: worker2: build: - context: ../.. - dockerfile: example/microservice/worker/Dockerfile + context: ../../../.. + dockerfile: packages/stem/example/microservice/worker/Dockerfile env_file: - .env.example + environment: + STEM_WORKER_NAME: microservice-worker-2 + STEM_WORKER_QUEUE: billing + STEM_WORKER_NAMESPACE: revenue depends_on: - redis - otel-collector @@ -109,10 +123,14 @@ services: worker3: build: - context: ../.. - dockerfile: example/microservice/worker/Dockerfile + context: ../../../.. + dockerfile: packages/stem/example/microservice/worker/Dockerfile env_file: - .env.example + environment: + STEM_WORKER_NAME: microservice-worker-3 + STEM_WORKER_QUEUE: reporting + STEM_WORKER_NAMESPACE: analytics depends_on: - redis - otel-collector @@ -120,8 +138,8 @@ services: beat: build: - context: ../.. 
- dockerfile: example/microservice/beat/Dockerfile + context: ../../../.. + dockerfile: packages/stem/example/microservice/beat/Dockerfile env_file: - .env.example environment: diff --git a/packages/stem/example/microservice/enqueuer/Dockerfile b/packages/stem/example/microservice/enqueuer/Dockerfile index 9ab07e2e..814c179f 100644 --- a/packages/stem/example/microservice/enqueuer/Dockerfile +++ b/packages/stem/example/microservice/enqueuer/Dockerfile @@ -3,8 +3,9 @@ FROM dart:stable WORKDIR /workspace COPY . /workspace -WORKDIR /workspace/example/microservice/enqueuer +WORKDIR /workspace/packages/stem/example/microservice/enqueuer ENV DART_PUB_CACHE=/tmp/.dart_pub_cache +ENV PATH=/usr/lib/dart/bin:$PATH ENV PORT=8081 EXPOSE 8081 CMD ["sh", "-c", "dart pub get && dart run bin/main.dart"] diff --git a/packages/stem/example/microservice/enqueuer/bin/main.dart b/packages/stem/example/microservice/enqueuer/bin/main.dart index c5e06bf1..20c4a2de 100644 --- a/packages/stem/example/microservice/enqueuer/bin/main.dart +++ b/packages/stem/example/microservice/enqueuer/bin/main.dart @@ -8,10 +8,86 @@ import 'package:shelf_router/shelf_router.dart'; import 'package:stem/stem.dart'; import 'package:stem_redis/stem_redis.dart'; +const _defaultTaskName = 'greeting.send'; + +const _demoTaskSpecs = <_DemoTaskSpec>[ + _DemoTaskSpec( + name: 'greeting.send', + queue: 'greetings', + namespace: 'customer-experience', + maxRetries: 5, + ), + _DemoTaskSpec( + name: 'customer.followup', + queue: 'greetings', + namespace: 'customer-experience', + maxRetries: 4, + ), + _DemoTaskSpec( + name: 'billing.charge', + queue: 'billing', + namespace: 'revenue', + maxRetries: 5, + ), + _DemoTaskSpec( + name: 'billing.settlement', + queue: 'billing', + namespace: 'revenue', + maxRetries: 3, + ), + _DemoTaskSpec( + name: 'reports.aggregate', + queue: 'reporting', + namespace: 'analytics', + maxRetries: 2, + ), + _DemoTaskSpec( + name: 'reports.publish', + queue: 'reporting', + namespace: 'analytics', 
+ maxRetries: 2, + ), +]; + +final _demoTaskByName = { + for (final spec in _demoTaskSpecs) spec.name: spec, +}; + +const _workflowTemplates = <_WorkflowTemplate>[ + _WorkflowTemplate( + name: 'onboarding.v1', + steps: [ + _WorkflowStep(taskName: 'greeting.send', stepName: 'prepare-message'), + _WorkflowStep(taskName: 'billing.charge', stepName: 'charge-account'), + _WorkflowStep(taskName: 'reports.publish', stepName: 'publish-summary'), + ], + ), + _WorkflowTemplate( + name: 'billing.closeout', + steps: [ + _WorkflowStep(taskName: 'billing.charge', stepName: 'capture'), + _WorkflowStep(taskName: 'billing.settlement', stepName: 'settle'), + _WorkflowStep(taskName: 'reports.aggregate', stepName: 'rollup'), + ], + ), + _WorkflowTemplate( + name: 'customer.reengagement', + steps: [ + _WorkflowStep(taskName: 'customer.followup', stepName: 'hydrate-profile'), + _WorkflowStep(taskName: 'greeting.send', stepName: 'send-message'), + _WorkflowStep(taskName: 'reports.publish', stepName: 'write-audit'), + ], + ), +]; + Future main(List args) async { // #region signing-producer-config final config = StemConfig.fromEnvironment(); + final observability = ObservabilityConfig.fromEnvironment(); // #endregion signing-producer-config + observability.applyMetricExporters(); + observability.applySignalConfiguration(); + final broker = await RedisStreamsBroker.connect( config.brokerUrl, tls: config.tls, @@ -31,14 +107,16 @@ Future main(List args) async { // #endregion signing-producer-signer final httpContext = _buildHttpSecurityContext(); - final registry = SimpleTaskRegistry() - ..register( + final registry = SimpleTaskRegistry(); + for (final spec in _demoTaskSpecs) { + registry.register( FunctionTaskHandler( - name: 'greeting.send', + name: spec.name, entrypoint: _placeholderEntrypoint, - options: const TaskOptions(queue: 'greetings', maxRetries: 5), + options: TaskOptions(queue: spec.queue, maxRetries: spec.maxRetries), ), ); + } // #region signing-producer-stem final stem = Stem( 
@@ -53,23 +131,68 @@ Future main(List args) async { backend: backend, registry: registry, ); + final autoFill = _AutoFillController( + stem: stem, + enabled: _boolFromEnv( + Platform.environment['ENQUEUER_AUTOFILL_ENABLED'], + defaultValue: true, + ), + interval: Duration( + milliseconds: _intFromEnv( + Platform.environment['ENQUEUER_AUTOFILL_INTERVAL_MS'], + defaultValue: 2500, + ), + ), + batchSize: _intFromEnv( + Platform.environment['ENQUEUER_AUTOFILL_BATCH_SIZE'], + defaultValue: 2, + ), + failureEvery: _intFromEnv( + Platform.environment['ENQUEUER_AUTOFILL_FAILURE_EVERY'], + defaultValue: 8, + ), + )..start(); final router = Router() ..post('/enqueue', (Request request) async { final body = jsonDecode(await request.readAsString()) as Map; - final name = (body['name'] as String?)?.trim(); - if (name == null || name.isEmpty) { - return Response.badRequest( - body: jsonEncode({'error': 'Missing "name" field'}), + final requestedTask = (body['task'] as String?)?.trim(); + final taskName = (requestedTask == null || requestedTask.isEmpty) + ? _defaultTaskName + : requestedTask; + final taskSpec = _demoTaskByName[taskName]; + if (taskSpec == null) { + return Response( + HttpStatus.badRequest, + body: jsonEncode({ + 'error': 'Unknown task "$taskName".', + 'knownTasks': _demoTaskSpecs.map((entry) => entry.name).toList(), + }), + headers: {'content-type': 'application/json'}, ); } + final name = (body['name'] as String?)?.trim(); + final entity = (name == null || name.isEmpty) ? 
'friend' : name; final taskId = await stem.enqueue( - 'greeting.send', - args: {'name': name}, - options: const TaskOptions(queue: 'greetings'), + taskSpec.name, + args: { + 'name': entity, + if (body['delayMs'] is num) + 'delayMs': (body['delayMs'] as num).toInt(), + if (body['fail'] is bool) 'fail': body['fail'] as bool, + }, + options: TaskOptions( + queue: taskSpec.queue, + maxRetries: taskSpec.maxRetries, + ), + meta: { + 'namespace': taskSpec.namespace, + 'stem.namespace': taskSpec.namespace, + 'demo.source': 'http.enqueue', + }, ); return Response.ok( - jsonEncode({'taskId': taskId}), + jsonEncode({'taskId': taskId, 'task': taskSpec.name}), headers: {'content-type': 'application/json'}, ); }) @@ -142,6 +265,7 @@ Future main(List args) async { Future shutdown(ProcessSignal signal) async { stdout.writeln('Shutting down enqueue service ($signal)...'); + autoFill.stop(); await server.close(force: true); await broker.close(); await backend.close(); @@ -174,3 +298,171 @@ SecurityContext? _buildHttpSecurityContext() { } return context; } + +class _AutoFillController { + _AutoFillController({ + required this.stem, + required this.enabled, + required this.interval, + required this.batchSize, + required this.failureEvery, + }); + + final Stem stem; + final bool enabled; + final Duration interval; + final int batchSize; + final int failureEvery; + + Timer? 
_timer; + var _tick = 0; + var _running = false; + + void start() { + if (!enabled) return; + stdout.writeln( + 'Auto-fill enabled (interval=${interval.inMilliseconds}ms, ' + 'batchSize=$batchSize, failureEvery=$failureEvery).', + ); + _timer = Timer.periodic(interval, (_) { + if (_running) return; + _running = true; + unawaited(_produce().whenComplete(() => _running = false)); + }); + } + + void stop() { + _timer?.cancel(); + } + + Future _produce() async { + _tick++; + for (var index = 0; index < batchSize; index++) { + final spec = _demoTaskSpecs[(_tick + index) % _demoTaskSpecs.length]; + final shouldFail = failureEvery > 0 && + _tick % failureEvery == 0 && + index == 0 && + (spec.queue == 'greetings' || spec.queue == 'billing'); + final delayMs = 220 + ((_tick + index) % 8) * 160; + final taskId = await _enqueueTask( + spec, + label: 'demo-${_tick.toString().padLeft(4, '0')}-$index', + delayMs: delayMs, + shouldFail: shouldFail, + extraMeta: const {'demo.kind': 'standalone'}, + ); + stdout.writeln( + 'Auto-filled standalone task $taskId ' + '(${spec.name} queue=${spec.queue} delayMs=$delayMs fail=$shouldFail).', + ); + } + + if (_tick.isEven) { + await _enqueueWorkflowSample(); + } + } + + Future _enqueueTask( + _DemoTaskSpec spec, { + required String label, + required int delayMs, + required bool shouldFail, + Map extraMeta = const {}, + }) { + return stem.enqueue( + spec.name, + args: { + 'name': label, + 'delayMs': delayMs, + if (shouldFail) 'fail': true, + }, + options: TaskOptions(queue: spec.queue, maxRetries: spec.maxRetries), + meta: { + 'namespace': spec.namespace, + 'stem.namespace': spec.namespace, + ...extraMeta, + }, + ); + } + + Future _enqueueWorkflowSample() async { + final template = _workflowTemplates[_tick % _workflowTemplates.length]; + final runId = 'wf-${_tick.toString().padLeft(6, '0')}'; + final forceFailure = failureEvery > 0 && _tick % failureEvery == 0; + for (var index = 0; index < template.steps.length; index++) { + final step = 
template.steps[index]; + final spec = _demoTaskByName[step.taskName]; + if (spec == null) { + continue; + } + final shouldFail = forceFailure && index == template.steps.length - 1; + final delayMs = 280 + ((_tick + index) % 6) * 140; + final taskId = await _enqueueTask( + spec, + label: '$runId-${step.stepName}', + delayMs: delayMs, + shouldFail: shouldFail, + extraMeta: { + 'demo.kind': 'workflow-step', + 'demo.workflow': template.name, + 'stem.workflow.runId': runId, + 'stem.workflow.name': template.name, + 'stem.workflow.step': step.stepName, + 'stem.workflow.stepIndex': index, + 'stem.workflow.iteration': 0, + }, + ); + stdout.writeln( + 'Auto-filled workflow step $taskId ' + '(run=$runId workflow=${template.name} step=${step.stepName}).', + ); + } + } +} + +class _DemoTaskSpec { + const _DemoTaskSpec({ + required this.name, + required this.queue, + required this.namespace, + required this.maxRetries, + }); + + final String name; + final String queue; + final String namespace; + final int maxRetries; +} + +class _WorkflowTemplate { + const _WorkflowTemplate({required this.name, required this.steps}); + + final String name; + final List<_WorkflowStep> steps; +} + +class _WorkflowStep { + const _WorkflowStep({required this.taskName, required this.stepName}); + + final String taskName; + final String stepName; +} + +bool _boolFromEnv(String? value, {required bool defaultValue}) { + final normalized = value?.trim().toLowerCase(); + if (normalized == null || normalized.isEmpty) { + return defaultValue; + } + return normalized == '1' || + normalized == 'true' || + normalized == 'yes' || + normalized == 'on'; +} + +int _intFromEnv(String? value, {required int defaultValue}) { + final parsed = int.tryParse(value?.trim() ?? 
''); + if (parsed == null || parsed <= 0) { + return defaultValue; + } + return parsed; +} diff --git a/packages/stem/example/microservice/enqueuer/pubspec.yaml b/packages/stem/example/microservice/enqueuer/pubspec.yaml index a0cc865a..23a48edb 100644 --- a/packages/stem/example/microservice/enqueuer/pubspec.yaml +++ b/packages/stem/example/microservice/enqueuer/pubspec.yaml @@ -18,4 +18,6 @@ dependency_overrides: stem: path: ../../.. stem_redis: - path: ../../../../stem_redis \ No newline at end of file + path: ../../../../stem_redis + stem_memory: + path: ../../../../stem_memory diff --git a/packages/stem/example/microservice/grafana-datasources.yml b/packages/stem/example/microservice/grafana-datasources.yml index 64d3212c..ef926e7a 100644 --- a/packages/stem/example/microservice/grafana-datasources.yml +++ b/packages/stem/example/microservice/grafana-datasources.yml @@ -1,13 +1,13 @@ apiVersion: 1 datasources: - name: Prometheus + uid: stem-prometheus type: prometheus access: proxy - url: http://prometheus:9090 + url: http://prometheus:9090/prometheus isDefault: true - jsonData: - timeInterval: 15s - name: Jaeger + uid: stem-jaeger type: jaeger access: proxy - url: http://jaeger:16686 + url: http://jaeger:16686/jaeger