@elizaos/plugin-orchestrator

A demonstrable agentic task orchestrator for ElizaOS.

Why This Plugin Exists

ElizaOS agents need to execute multi-step tasks autonomously - tasks that require planning, executing actions, evaluating results, and adapting. The existing DefaultMessageService.runMultiStepCore handles this for synchronous message processing, but we needed something for background tasks that:

  1. Doesn't block - Long-running tasks shouldn't freeze the agent
  2. Is fair - Multiple tasks should share resources fairly
  3. Is observable - Users need to see progress and status
  4. Is resilient - Tasks should survive restarts and handle failures gracefully

This plugin provides that foundation.

Design Philosophy

Why an Agentic Loop?

Traditional task queues execute predefined steps. But AI agents need to think between steps:

Traditional: Step 1 → Step 2 → Step 3 → Done
Agentic:     Plan → Execute → Evaluate → Decide → (loop or done)

The evaluation step is what makes this "agentic" - after each action, the LLM assesses (see the sketch after this list):

  • Did we make progress toward the goal?
  • Should we continue, pivot, or bail?
  • Are we stuck in a loop?
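
As a rough illustration of that loop, here is a minimal driver sketch built on this plugin's exported runOneIteration and IterationResult (the types IAgentRuntime and Task are assumed to come from @elizaos/core). The tight while-loop is illustrative only; in practice the TaskService runs one iteration per scheduler tick, as described under Architecture below.

import { runOneIteration, type IterationResult } from '@elizaos/plugin-orchestrator';
import type { IAgentRuntime, Task } from '@elizaos/core';

// Illustrative only: drive a single task until it leaves the 'continue' state.
async function driveToCompletion(
  runtime: IAgentRuntime,
  task: Task
): Promise<IterationResult['status']> {
  while (true) {
    // One iteration = plan + execute + evaluate in a single pass.
    const result = await runOneIteration(runtime, task);
    if (result.status !== 'continue') {
      return result.status; // 'completed' | 'failed' | 'stalemate'
    }
    // Carry the updated bookkeeping (progress, staleCounter, ...) into the next pass.
    task = { ...task, metadata: { ...task.metadata, ...result.newMetadata } } as Task;
  }
}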

Why Single LLM Call Per Iteration?

We combine planning and progress estimation into one LLM call:

❌ Two calls: "What action?" + "How much progress?"  (2x cost, 2x latency)
✅ One call:  "What action and estimate progress"    (1x cost, 1x latency)

The prompt asks the LLM to return action, progress, and status together. This halves API costs and latency without losing accuracy.
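
For illustration, the combined decision could be parsed from one JSON reply along these lines. The field names here are assumptions for the sketch; the plugin's exported parseDecision encapsulates the actual format.

// Hypothetical shape of the single combined LLM reply (field names assumed).
interface IterationDecision {
  action: string;        // next action to execute, e.g. "create_file"
  actionInput?: string;  // optional arguments for that action
  progress: number;      // 0-100 estimate of progress toward the goal
  status: 'continue' | 'done' | 'stuck';
  reasoning: string;     // short justification, useful for history/logs
}

// Minimal parse sketch; parseDecision handles the real parsing and validation.
function parseCombinedReply(raw: string): IterationDecision {
  return JSON.parse(raw) as IterationDecision;
}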

Why Query-Derived Queue?

We don't maintain a separate queue data structure. Instead:

// The queue IS just a query over Tasks
const queue = await runtime.getTasks({ tags: ['orchestrator'] });

Why?

  • Simpler - No separate state to sync
  • Persistent - Tasks already survive restarts
  • Consistent - Single source of truth
  • Queryable - Filter by status, search by name, etc.

Why Round-Robin Scheduling?

With multiple tasks, we could:

  • Run them in parallel (exhausts LLM rate limits)
  • Run one to completion (starves other tasks)
  • Round-robin one iteration each (fair sharing)

We chose round-robin because:

  • Fair - Every task makes progress
  • Resource-controlled - One LLM call at a time
  • Predictable - Users know their task will be processed
  • Simple - Sort by lastProcessedAt, pick oldest (sketched below)
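
A minimal sketch of that pick-oldest step, assuming tasks carry the lastProcessedAt and status fields from OrchestratorTaskMetadata (see Types below) and that the core types come from @elizaos/core:

import type { IAgentRuntime, Task } from '@elizaos/core';

// Round-robin: of all runnable orchestrator tasks, process the one that was
// touched least recently, so every task keeps making progress.
async function pickNextTask(runtime: IAgentRuntime): Promise<Task | undefined> {
  const tasks = await runtime.getTasks({ tags: ['orchestrator'] });
  const runnable = tasks.filter((t) =>
    ['queued', 'running'].includes((t.metadata as any)?.status)
  );
  runnable.sort(
    (a, b) =>
      ((a.metadata as any)?.lastProcessedAt ?? 0) -
      ((b.metadata as any)?.lastProcessedAt ?? 0)
  );
  return runnable[0]; // oldest lastProcessedAt wins
}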

Why Smart Batched Notifications?

Naive notification approach:

Bot: Task #1 progress: 10%
Bot: Task #1 progress: 12%
Bot: Task #1 progress: 15%    ← Spam!
Bot: Task #1 progress: 18%

Our approach batches updates and shows deltas:

Bot: Task #1 "Build auth"
     Progress: 10% → 18%
     Last action: create_file ✓

Why batching?

  • No spam - Users aren't flooded
  • Context - "10% → 18%" is more meaningful than just "18%"
  • Immediate when needed - Completions/failures bypass batching

Why do some categories replace and others accumulate?

  • Progress updates: Latest wins (18% replaces 15%)
  • Errors: Accumulate (show every error)
  • Actions: Latest wins (only the most recent action matters; a batching sketch follows this list)
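
A minimal sketch of that batching rule, assuming per-task pending updates keyed by category, where errors accumulate and everything else keeps only the latest value (NotificationService implements the real behavior):

type Category = 'progress' | 'action' | 'error';

// Pending updates for one task, keyed by category.
const pending = new Map<Category, string[]>();

function queueUpdate(category: Category, message: string): void {
  if (category === 'error') {
    // Errors accumulate so none are lost between flushes.
    pending.set(category, [...(pending.get(category) ?? []), message]);
  } else {
    // Progress and action updates replace: latest wins.
    pending.set(category, [message]);
  }
}

// Flushed on an interval (default 5s); completions/failures bypass batching entirely.
function flushPending(): string[] {
  const lines = [...pending.values()].flat();
  pending.clear();
  return lines;
}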

Why Stalemate Detection?

LLMs can get stuck in loops, trying the same action repeatedly. We track:

  • staleCounter - Iterations without progress increase
  • When staleCounter >= 3, we bail out

Why 3? It's a balance:

  • Too low (1): Might give up too early on hard problems
  • Too high (10): Wastes resources on truly stuck tasks
  • 3 gives the LLM a few chances to try different approaches (see the sketch below)
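
A sketch of the stalemate bookkeeping, using the staleCounter, progressScore, and maxStaleIterations fields from the exported OrchestratorTaskMetadata type:

import type { OrchestratorTaskMetadata } from '@elizaos/plugin-orchestrator';

// Returns true when the task should bail out as a stalemate.
function updateStaleness(
  meta: OrchestratorTaskMetadata,
  newProgress: number
): boolean {
  if (newProgress > meta.progressScore) {
    meta.staleCounter = 0;             // progress was made: reset the counter
  } else {
    meta.staleCounter += 1;            // another iteration without progress
  }
  meta.previousProgress = meta.progressScore;
  meta.progressScore = newProgress;
  return meta.staleCounter >= meta.maxStaleIterations; // default: 3
}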

Why Numbered Tasks?

Tasks get sequential numbers: #1, #2, #3...

Why?

  • Human-friendly - "pause #2" is easier than "pause abc123-def456"
  • Unambiguous - Numbers don't conflict like names might
  • Memorable - Users remember "task 3" better than UUIDs
  • Demonstrable - Clean output for demos and screenshots

Architecture

flowchart TD
    subgraph human [Human Interface]
        H1["'create task Build auth'"]
        H2["'status #1'"]
        H3["'pause #1'"]
        H4["'list tasks'"]
    end

    subgraph db [Database - Tasks Table]
        T1["Task #1 status=running"]
        T2["Task #2 status=queued"]
        T3["Task #3 status=queued"]
    end

    subgraph worker [TaskService Worker - Every Tick]
        W1["Query: tags=['orchestrator']<br/>status in [running, queued]"]
        W2["Sort by lastProcessedAt"]
        W3["Pick oldest = fair!"]
        W4["runOneIteration()"]
    end

    subgraph iteration [One Iteration]
        I1["LLM: 'Given goal and history,<br/>what action? estimate progress'"]
        I2["Execute action via runtime"]
        I3["Check: progress > previous?"]
        I4{Decide}
    end

    subgraph outcomes [Outcomes]
        O1["Continue → update metadata"]
        O2["Done → mark completed"]
        O3["Stuck → mark failed"]
        O4["Stale x3 → mark stalemate"]
    end

    subgraph notify [Notification Service]
        N1["Collect updates by category"]
        N2["Every 5s: flush batched"]
        N3["Immediate: completion/failure"]
    end

    human --> db
    db --> worker
    W1 --> W2 --> W3 --> W4
    W4 --> iteration
    I1 --> I2 --> I3 --> I4
    I4 --> outcomes
    outcomes --> notify
    notify --> human

Installation

bun add @elizaos/plugin-orchestrator

Usage

import { agentOrchestratorPlugin } from '@elizaos/plugin-orchestrator';

const runtime = await createRuntime({
  plugins: [agentOrchestratorPlugin],
});

Commands

Command       Example                          Description
Create task   "work on building auth"          Creates Task #N with the goal
List tasks    "list tasks" or "show queue"     Shows all tasks with progress bars
Task status   "status #1"                      Detailed status of specific task
Pause         "pause #1"                       Pauses task (won't be scheduled)
Resume        "resume #1"                      Resumes paused task
Cancel        "cancel #1"                      Cancels task permanently
Search        "search tasks auth"              Find tasks by name

Output Examples

Progress Update (batched every 5s)

📋 Task #1 "Build auth feature"
   Progress: 15% → 35%
   Last action: `create_file src/auth/login.ts` ✓
   Iteration: 4

Task Completed

✅ Task #1 "Build auth feature" completed!
   Final progress: 100%
   Total steps: 12
   Duration: 8m 23s
   
   Summary: Created login/logout endpoints with JWT tokens,
   added password hashing, and integrated with user database.

Task Stalemate

⚠️ Task #2 "Fix database connection" stalled
   Progress stuck at: 40%
   Attempts without progress: 3
   Last action: `read_file config/db.ts` ✓
   
   The task couldn't make further progress. Consider:
   - Providing more context
   - Breaking into smaller tasks
   - Manual intervention

Queue Status

📊 Task Queue (3 tasks)

#1 "Build auth"     [████████░░] 80%  Running (iter 12)
#2 "Fix bug"        [██░░░░░░░░] 20%  Queued
#3 "Add tests"      [░░░░░░░░░░]  0%  Queued

Next: #2 "Fix bug" (fair scheduling)

Extending for Your Use Case

Why Composable Exports?

This plugin is designed to be a foundation, not a complete solution. Different domains need different:

  • Actions - Coding vs. research vs. data processing
  • Evaluation criteria - Build passing vs. accuracy metrics
  • Progress estimation - Different heuristics per domain

So we export the building blocks:

import {
  // The core iteration function
  runOneIteration,
  
  // Build your own prompts
  buildIterationPrompt,
  parseDecision,
  
  // Notification infrastructure
  NotificationService,
  formatDuration,
  createProgressBar,
  
  // Types for extending
  OrchestratorTaskMetadata,
  StepResult,
  IterationResult,
} from '@elizaos/plugin-orchestrator';

Example: Coding-Specific Orchestrator

import {
  runOneIteration,
  NotificationService,
  type IterationResult,
} from '@elizaos/plugin-orchestrator';
// Core runtime types (assumed to come from @elizaos/core)
import type { IAgentRuntime, Task } from '@elizaos/core';

/**
 * Extends the generic orchestrator with coding-specific evaluation.
 * 
 * WHY: Generic progress estimation doesn't know about builds/tests.
 * A coding task at "80% progress" should also pass the build.
 */
async function runCodingIteration(
  runtime: IAgentRuntime, 
  task: Task
): Promise<IterationResult> {
  // Run the generic iteration first
  const result = await runOneIteration(runtime, task);
  
  if (result.status === 'continue' && result.newMetadata) {
    // Add coding-specific checks
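    // NOTE: checkBuild() and runTests() below are placeholders for your own build/test helpers.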
    const buildPasses = await checkBuild();
    const testsPasses = await runTests();
    
    // Adjust progress: Can't be "done" if tests fail
    if (!testsPasses && result.newMetadata.progressScore > 90) {
      result.newMetadata.progressScore = 85; // Cap until tests pass
    }
    
    // Bonus progress for passing CI
    if (buildPasses && testsPasses) {
      result.newMetadata.progressScore = Math.min(
        100, 
        result.newMetadata.progressScore + 5
      );
    }
  }
  
  return result;
}

Configuration

Task Defaults

Setting              Default   Description
maxIterations        50        Maximum iterations before auto-stalemate
maxStaleIterations   3         Iterations without progress before bailing out
flushIntervalMs      5000      Notification batching window (ms)

Customizing via Task Metadata

// Create a task with custom limits (svc: the orchestrator service exposing createTask)
const task = await svc.createTask(name, goal, requestorId, roomId);

// Update limits if needed
await runtime.updateTask(task.id, {
  metadata: {
    ...task.metadata,
    maxIterations: 100,        // Allow more iterations
    maxStaleIterations: 5,     // More patience for hard tasks
  }
});

Types

OrchestratorTaskMetadata

interface OrchestratorTaskMetadata {
  // Identity (human-readable)
  number: number;              // Task #1, #2, #3...
  name: string;                // "Build auth feature"
  
  // Goal
  goal: string;                // What we're trying to achieve
  
  // Progress tracking
  steps: StepResult[];         // History of actions taken
  progressScore: number;       // 0-100, estimated by LLM
  previousProgress: number;    // For "was X → now Y" display
  staleCounter: number;        // Iterations without progress
  iteration: number;           // Current iteration number
  
  // Limits
  maxIterations: number;       // Auto-stalemate after this
  maxStaleIterations: number;  // Bail after N stale iterations
  
  // Requestor (for notifications)
  requestorId: string;         // Who asked for this task
  roomId: string;              // Where to send updates
  
  // Scheduling
  lastProcessedAt: number;     // For round-robin fairness
  
  // Status
  status: TaskStatus;          // queued|running|paused|completed|failed|stalemate
  
  // Timing
  createdAt: number;
  startedAt?: number;
  completedAt?: number;
  
  // Result
  summary?: string;            // Final summary when completed
  error?: string;              // Error message if failed
}

StepResult

interface StepResult {
  action: string;      // The action that was executed
  result: string;      // The output/result of the action
  success: boolean;    // Whether the action succeeded
  timestamp: number;   // When this step was executed
}

IterationResult

interface IterationResult {
  status: 'continue' | 'completed' | 'failed' | 'stalemate';
  message?: string;
  newMetadata?: Partial<OrchestratorTaskMetadata>;
}
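
For illustration, a worker might fold these statuses back into the stored task roughly as follows. This is a sketch: the status mapping is an assumption, and persistence reuses runtime.updateTask as shown under Configuration above.

import type { IAgentRuntime, Task } from '@elizaos/core';
import type { IterationResult } from '@elizaos/plugin-orchestrator';

// Sketch: apply one iteration's outcome to the stored task.
async function applyIterationResult(
  runtime: IAgentRuntime,
  task: Task,
  result: IterationResult
): Promise<void> {
  const status = result.status === 'continue' ? 'running' : result.status;
  await runtime.updateTask(task.id, {
    metadata: {
      ...task.metadata,
      ...result.newMetadata,
      status,                       // running | completed | failed | stalemate
      lastProcessedAt: Date.now(),  // keeps round-robin scheduling fair
    },
  });
}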

Comparison with Other Approaches

Approach           Pros                                     Cons                      When to Use
This plugin        Fair scheduling, observable, resilient   Single-threaded           Multiple background tasks
runMultiStepCore   Built-in, synchronous                    Blocks message handling   Single quick task in message flow
Custom service     Full control                             More code to maintain     Very specific requirements

License

MIT
