diff --git a/research/docs/2026-02-15-ralph-dag-orchestration-blockedby.md b/research/docs/2026-02-15-ralph-dag-orchestration-blockedby.md new file mode 100644 index 0000000..a58065e --- /dev/null +++ b/research/docs/2026-02-15-ralph-dag-orchestration-blockedby.md @@ -0,0 +1,238 @@ +--- +date: 2026-02-15 00:00:00 UTC +researcher: Claude Opus 4.6 +git_commit: d5c8a4e3ee33dbfae60da8e6df15af549403fb9f +branch: main +repository: atomic +topic: "Ralph DAG-Based Orchestration with blockedBy Dependency Enforcement" +tags: [research, codebase, ralph, dag, orchestration, blockedBy, parallel-workers, topological-sort, task-management, workflow] +status: complete +last_updated: 2026-02-15 +last_updated_by: Claude Opus 4.6 +--- + +# Research: Ralph DAG-Based Orchestration with blockedBy Dependency Enforcement + +## Research Question + +How to modify the current ralph implementation so that `blockedBy` is properly enforced during task execution (not just UI display), worker sub-agents can mark tasks as complete with immediate UI reflection, and multiple workers are dispatched in parallel using a DAG-based topological traversal with round-robin execution. + +## Summary + +The `blockedBy` dependency system exists across the entire data model (TodoWrite schema, normalization pipeline, topological sort, UI rendering) but is **never enforced during task execution**. The worker loop in `workflow-commands.ts` is sequential: it spawns one worker at a time via `context.spawnSubagent()`, which blocks on a single `streamCompletionResolverRef` slot in `chat.tsx`. Workers select tasks by "highest priority" heuristic without checking `blockedBy`. The infrastructure for parallel sub-agent execution exists (`SubagentGraphBridge.spawnParallel()` using `Promise.allSettled()`) but is unused by ralph. The UI already updates reactively via `fs.watch` on `tasks.json`, so workers writing directly to `tasks.json` would immediately update the persistent `TaskListPanel`. 
Key gaps to address: (1) dependency-aware task selection, (2) parallel worker dispatch replacing the serial loop, (3) dynamic DAG mutation when workers insert bug-fix tasks, (4) file locking for concurrent `tasks.json` writes, and (5) deadlock detection. + +## Detailed Findings + +### 1. Current Worker Loop: Sequential and Dependency-Unaware + +The ralph worker loop exists in two places (fresh start and resume), both following the same pattern: + +**File**: [`src/ui/commands/workflow-commands.ts:796-807`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L796-L807) + +```typescript +// Worker loop: spawn worker sub-agent per iteration until all tasks are done +const maxIterations = tasks.length * 2; // safety limit +for (let i = 0; i < maxIterations; i++) { + // Read current task state from disk + const currentTasks = await readTasksFromDisk(sessionDir); + const pending = currentTasks.filter(t => t.status !== "completed"); + if (pending.length === 0) break; + + const message = buildTaskListPreamble(currentTasks); + const result = await context.spawnSubagent({ name: "worker", message }); + if (!result.success) break; +} +``` + +**Resume path** (identical pattern): [`workflow-commands.ts:748-757`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L748-L757) + +**Key problems**: +1. **No `blockedBy` check**: The loop filters only on `status !== "completed"` (line 801). Tasks with unsatisfied dependencies are included in `pending` and presented to the worker. +2. **Serial execution**: `context.spawnSubagent()` blocks until the worker stream completes, so only one worker runs at a time. +3. **Worker self-selection**: The full task list (including blocked tasks) is sent to the worker via `buildTaskListPreamble()`. The worker picks "highest priority" without dependency checking. + +### 2. 
Why `spawnSubagent` Is Serial (Single-Slot Blocking) + +**File**: [`src/ui/chat.tsx:3359-3374`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/chat.tsx#L3359-L3374) + +```typescript +spawnSubagent: async (options) => { + const agentName = options.name ?? options.model ?? "general-purpose"; + const task = options.message; + const instruction = `Use the ${agentName} sub-agent to handle this task: ${task}`; + const result = await new Promise((resolve) => { + streamCompletionResolverRef.current = resolve; + context.sendSilentMessage(instruction); + }); + return { success: !result.wasInterrupted, output: result.content }; +}, +``` + +The `streamCompletionResolverRef` (declared at [`chat.tsx:1897`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/chat.tsx#L1897)) is a `useRef` holding a single resolver function. Each call to `spawnSubagent` overwrites it. This means calling `spawnSubagent` a second time before the first resolves would **silently drop** the first stream's result. This is the fundamental architectural barrier to parallel worker dispatch via the current `CommandContext` interface. + +### 3. 
The blockedBy Data Model: Complete but Unenforced + +The `blockedBy` field flows through the entire system but is only used for **display purposes**: + +| Layer | File | Usage | +|-------|------|-------| +| **Schema** | [`src/sdk/tools/todo-write.ts:40-44`](https://github.com/flora131/atomic/blob/d5c8a4e/src/sdk/tools/todo-write.ts#L40-L44) | `blockedBy` field in TodoWrite JSON schema | +| **Type** | [`src/sdk/tools/todo-write.ts:58`](https://github.com/flora131/atomic/blob/d5c8a4e/src/sdk/tools/todo-write.ts#L58) | `blockedBy?: string[]` on `TodoItem` interface | +| **Normalization** | [`src/ui/utils/task-status.ts:69-80`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/utils/task-status.ts#L69-L80) | `normalizeBlockedBy()` filters/stringifies array | +| **Prompt generation** | [`src/graph/nodes/ralph.ts:50`](https://github.com/flora131/atomic/blob/d5c8a4e/src/graph/nodes/ralph.ts#L50) | LLM instructed to generate `blockedBy` arrays | +| **Topological sort** | [`src/ui/components/task-order.ts:19-122`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/components/task-order.ts#L19-L122) | `sortTasksTopologically()` using Kahn's algorithm | +| **UI rendering** | [`src/ui/components/task-list-indicator.tsx:117-119`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/components/task-list-indicator.tsx#L117-L119) | Renders `> blocked by #1, #2` annotations | +| **Worker agent** | [`.claude/agents/worker.md:84-96`](https://github.com/flora131/atomic/blob/d5c8a4e/.claude/agents/worker.md#L84-L96) | Bug handling instructs writing `blockedBy` on affected tasks | +| **State snapshots** | [`src/ui/utils/ralph-task-state.ts:34-38`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/utils/ralph-task-state.ts#L34-L38) | `snapshotTaskItems()` preserves `blockedBy` | +| **Worker loop** | [`workflow-commands.ts:801`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L801) | **NOT USED** - only checks `status !== "completed"` | 
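The enforcement gap in the table's last row could be closed with a small dependency-aware filter ahead of dispatch. A minimal sketch — `getReadyTasks` is a hypothetical helper, not present in the codebase, and `Task` is an abridged stand-in for the `TodoItem` shape:

```typescript
// Abridged stand-in for the TodoItem shape (hypothetical; real interface
// lives in src/sdk/tools/todo-write.ts and has more fields).
interface Task {
  id: string;
  status: "pending" | "in_progress" | "completed";
  blockedBy?: string[];
}

// Strip a leading "#" so "#1" and "1" compare equal, mirroring the ID
// normalization that task-order.ts performs for display sorting.
const normalizeId = (id: string): string => id.replace(/^#/, "");

// The "ready set": pending tasks whose every blocker is completed.
// An orchestrator would dispatch exactly these tasks to workers.
function getReadyTasks(tasks: Task[]): Task[] {
  const completed = new Set(
    tasks.filter((t) => t.status === "completed").map((t) => normalizeId(t.id)),
  );
  return tasks.filter(
    (t) =>
      t.status === "pending" &&
      (t.blockedBy ?? []).every((dep) => completed.has(normalizeId(dep))),
  );
}
```

If this returns an empty set while pending tasks remain and no worker is running, the DAG is deadlocked (a cycle, or a blocker that will never complete) and the loop should abort rather than spin.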
+ +The topological sort in `task-order.ts` implements Kahn's algorithm (BFS) with cycle detection, but it is only consumed by `TaskListPanel` for **display ordering**, not by the worker loop for **execution scheduling**. + +### 4. Existing Parallel Execution Infrastructure + +**File**: [`src/graph/subagent-bridge.ts:184-208`](https://github.com/flora131/atomic/blob/d5c8a4e/src/graph/subagent-bridge.ts#L184-L208) + +```typescript +async spawnParallel( + agents: SubagentSpawnOptions[], +): Promise { + const results = await Promise.allSettled( + agents.map((agent) => this.spawn(agent)) + ); + return results.map((result, i) => { + if (result.status === "fulfilled") return result.value; + // ... error handling for rejected promises + }); +} +``` + +This uses `Promise.allSettled()` so one agent's failure doesn't cancel others. Each sub-agent in `spawnParallel` gets its own independent session via `SubagentGraphBridge.spawn()`. This infrastructure **already supports true parallel sub-agent execution** and could replace the serial `context.spawnSubagent()` loop. + +Additional parallel primitives exist in [`src/graph/nodes.ts`](https://github.com/flora131/atomic/blob/d5c8a4e/src/graph/nodes.ts): +- `parallelNode()` (line 988): Graph node with parallel branches, strategy, and merge function +- `parallelSubagentNode()` (line 1802): Spawns multiple sub-agents concurrently within graph execution + +### 5. Task List UI: Already Reactive via File Watcher + +The UI update pipeline is already file-driven and would work with parallel workers: + +1. **Worker writes `tasks.json`** via `saveTasksToActiveSession()` ([`workflow-commands.ts:141-163`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L141-L163)) which uses `Bun.write()` +2. 
**File watcher detects change** via `watchTasksJson()` ([`workflow-commands.ts:818-837`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L818-L837)) using Node's `fs.watch` on the session directory
+3. **TaskListPanel re-renders** ([`task-list-panel.tsx:48-64`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/components/task-list-panel.tsx#L48-L64)) with topologically sorted tasks
+
+Three write paths exist for `tasks.json`:
+- **Orchestrator** writes after task decomposition ([`workflow-commands.ts:789`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/commands/workflow-commands.ts#L789))
+- **TodoWrite interception** in chat.tsx persists when ralph is active ([`chat.tsx:2145-2146`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/chat.tsx#L2145-L2146) and [`chat.tsx:2254-2255`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/chat.tsx#L2254-L2255))
+- **Worker agent** writes directly to `tasks.json` at `~/.atomic/workflows/{session_id}/tasks.json` (per worker.md instructions)
+
+Since the `TaskListPanel` uses `watchTasksJson()` to react to file changes, workers that write directly to `tasks.json` already trigger immediate UI updates; no round-trip through the main agent is required.
+
+### 6. Session and File Concurrency Concerns
+
+**File**: [`src/workflows/session.ts:32-49`](https://github.com/flora131/atomic/blob/d5c8a4e/src/workflows/session.ts#L32-L49)
+
+```
+~/.atomic/workflows/sessions/{sessionId}/
+  ├── tasks.json            ← shared state file
+  ├── progress.txt          ← append-only log
+  ├── session.json          ← session metadata
+  └── subagent-outputs/     ← individual agent output files
+```
+
+**No file locking mechanism exists.** The current `saveTasksToActiveSession()` uses `Bun.write()` which is a full file overwrite. With parallel workers, two workers could:
+1. Both read `tasks.json` at the same time
+2. Each modify their respective task status
+3. 
The second write overwrites the first, losing that worker's status update
+
+This is a classic read-modify-write race condition. A solution requires one of:
+- **Centralized coordinator** that serializes all `tasks.json` mutations (recommended)
+- **File locking** via `flock` or advisory locks
+- **Atomic compare-and-swap** using versioned writes
+
+### 7. Worker Agent: Missing Dependency Awareness
+
+**File**: [`.claude/agents/worker.md`](https://github.com/flora131/atomic/blob/d5c8a4e/.claude/agents/worker.md)
+
+The worker agent prompt instructs (line 9):
+> "Only work on the SINGLE highest priority task that is not yet marked as complete."
+
+It does **not** instruct the worker to:
+- Check `blockedBy` before selecting a task
+- Skip tasks whose `blockedBy` contains incomplete task IDs
+- Verify dependency completion before starting work
+
+The worker **does** understand `blockedBy` for bug handling (lines 84-96): when a bug is found, it adds the bug-fix task and updates downstream `blockedBy` arrays. But this is write-only — the worker never reads `blockedBy` for task selection.
+
+Additionally, the worker references the wrong path format: `~/.atomic/workflows/{session_id}` (line 13) vs the actual path `~/.atomic/workflows/sessions/{session_id}` ([`session.ts:32-35`](https://github.com/flora131/atomic/blob/d5c8a4e/src/workflows/session.ts#L32-L35)).
+
+### 8. Topological Sort Implementation (Reusable for Execution Scheduling)
+
+**File**: [`src/ui/components/task-order.ts:19-122`](https://github.com/flora131/atomic/blob/d5c8a4e/src/ui/components/task-order.ts#L19-L122)
+
+`sortTasksTopologically()` implements Kahn's algorithm:
+1. Normalize task IDs (strip leading `#`, handle duplicates)
+2. Build adjacency list from `blockedBy` → dependents edges
+3. Compute in-degree for each task
+4. BFS from zero-in-degree tasks (no dependencies)
+5. 
Append tasks with unresolvable metadata (missing IDs, unknown blockers, cycles) to tail + +This function could be adapted for **execution scheduling** (not just display ordering) by: +- Extracting the "ready set" (zero in-degree, status = pending) as tasks to dispatch +- After a task completes, decrementing in-degree of dependents and checking for newly-ready tasks +- Detecting deadlock when no tasks are ready but uncompleted tasks remain + +### 9. Dynamic DAG Mutation (Bug-Fix Insertion) + +Per [`specs/ralph-loop-enhancements.md`](https://github.com/flora131/atomic/blob/d5c8a4e/specs/ralph-loop-enhancements.md) (Section 5.1.4), workers can dynamically mutate the task DAG by: +1. Inserting a new bug-fix task (e.g., `#0`) at the top +2. Adding `#0` to the `blockedBy` array of affected downstream tasks +3. Writing the modified `tasks.json` to disk + +A DAG orchestrator must handle this by: +- Re-reading `tasks.json` after each worker completes (or on file change) +- Rebuilding the dependency graph to detect new edges +- Pausing dispatch of tasks that now have new blockers + +## Architecture Gaps Summary + +| Gap | Current State | Required Change | +|-----|--------------|-----------------| +| **Dependency enforcement** | `blockedBy` exists but worker loop only checks `status !== "completed"` | Orchestrator must compute "ready set" (pending + all blockers completed) before dispatch | +| **Parallel dispatch** | Serial `for` loop with single `streamCompletionResolverRef` | Use `SubagentGraphBridge.spawnParallel()` or equivalent concurrent dispatch | +| **Worker task selection** | Worker picks "highest priority" without checking blockers | Either (a) orchestrator assigns specific task to each worker, or (b) worker prompt updated to check `blockedBy` | +| **File concurrency** | No locking; `Bun.write()` full overwrite | Centralized coordinator serializes `tasks.json` mutations | +| **Deadlock detection** | Not implemented | Detect when remaining tasks all have unsatisfied 
`blockedBy` and no worker is running | +| **Worker path** | Worker.md references `~/.atomic/workflows/{session_id}` | Should be `~/.atomic/workflows/sessions/{session_id}` | + +## Key Code References + +| Component | File:Line | Description | +|-----------|-----------|-------------| +| Worker loop (fresh) | `src/ui/commands/workflow-commands.ts:796-807` | Serial `for` loop spawning one worker at a time | +| Worker loop (resume) | `src/ui/commands/workflow-commands.ts:748-757` | Same pattern for resume path | +| `spawnSubagent` impl | `src/ui/chat.tsx:3359-3374` | Single-slot `streamCompletionResolverRef` blocking | +| `streamCompletionResolverRef` | `src/ui/chat.tsx:1897` | `useRef` holding single resolver — prevents parallelism | +| `saveTasksToActiveSession` | `src/ui/commands/workflow-commands.ts:141-163` | Writes tasks to `tasks.json` via `Bun.write()` | +| `readTasksFromDisk` | `src/ui/commands/workflow-commands.ts:166-176` | Reads/normalizes tasks from disk | +| `watchTasksJson` | `src/ui/commands/workflow-commands.ts:818-837` | File watcher for live UI updates | +| `buildSpecToTasksPrompt` | `src/graph/nodes/ralph.ts:19-58` | Prompt instructing LLM to generate `blockedBy` | +| `buildTaskListPreamble` | `src/graph/nodes/ralph.ts:66-81` | Serializes full task list for worker context | +| `sortTasksTopologically` | `src/ui/components/task-order.ts:19-122` | Kahn's algorithm (display only, reusable for scheduling) | +| `normalizeBlockedBy` | `src/ui/utils/task-status.ts:69-80` | Normalizes `blockedBy` arrays | +| `TaskListPanel` | `src/ui/components/task-list-panel.tsx:39-94` | Persistent file-driven task list UI | +| `TaskListIndicator` | `src/ui/components/task-list-indicator.tsx:85-134` | Renders task items with blocked-by annotations | +| `SubagentGraphBridge.spawnParallel` | `src/graph/subagent-bridge.ts:184-208` | Parallel sub-agent execution via `Promise.allSettled()` | +| `parallelSubagentNode` | `src/graph/nodes.ts:1802-1838` | Graph node for concurrent 
sub-agent spawning | +| Worker agent definition | `.claude/agents/worker.md` | Worker prompt — no `blockedBy` check for task selection | +| TodoWrite tool | `src/sdk/tools/todo-write.ts:53-59` | TodoItem interface with `blockedBy` field | +| TodoWrite interception | `src/ui/chat.tsx:2145-2146, 2254-2255` | Persists to `tasks.json` when ralph is active | +| Ralph session state | `src/ui/chat.tsx:1904-1907` | `ralphSessionDir`/`ralphSessionId` React state | +| Session directory | `src/workflows/session.ts:32-49` | `~/.atomic/workflows/sessions/{sessionId}/` | +| Ralph task state helpers | `src/ui/utils/ralph-task-state.ts:17-25` | `normalizeInterruptedTasks()` preserves `blockedBy` | + +## Related Research Documents + +- [`research/docs/2026-02-09-163-ralph-loop-enhancements.md`](https://github.com/flora131/atomic/blob/d5c8a4e/research/docs/2026-02-09-163-ralph-loop-enhancements.md) - Original ralph loop enhancement research (Issue #163) +- [`research/docs/2026-02-13-ralph-task-list-ui.md`](https://github.com/flora131/atomic/blob/d5c8a4e/research/docs/2026-02-13-ralph-task-list-ui.md) - Persistent task list UI implementation research +- [`specs/ralph-loop-enhancements.md`](https://github.com/flora131/atomic/blob/d5c8a4e/specs/ralph-loop-enhancements.md) - Detailed design spec including dependency resolution (Section 5.1.3) and dynamic DAG mutations (Section 5.1.4) +- [`specs/ralph-task-list-ui.md`](https://github.com/flora131/atomic/blob/d5c8a4e/specs/ralph-task-list-ui.md) - Task list UI spec with file-driven reactive pattern diff --git a/src/ui/chat.tsx b/src/ui/chat.tsx index c9b387b..1de9322 100644 --- a/src/ui/chat.tsx +++ b/src/ui/chat.tsx @@ -1282,12 +1282,16 @@ function buildContentSegments( tasksOffset?: number, tasksExpanded?: boolean, ): ContentSegment[] { - // Separate HITL tools from regular tools: + // Separate HITL tools and sub-agent Task tools from regular tools: // - Running/pending HITL tools are hidden (the dialog handles display) // - Completed 
HITL tools are shown as compact inline question records + // - Task tools are hidden — sub-agents are shown via ParallelAgentsTree; + // individual tool traces are available in the ctrl+o detail view only. const isHitlTool = (name: string) => name === "AskUserQuestion" || name === "question" || name === "ask_user"; - const visibleToolCalls = toolCalls.filter(tc => !isHitlTool(tc.toolName)); + const isSubAgentTool = (name: string) => + name === "Task" || name === "task"; + const visibleToolCalls = toolCalls.filter(tc => !isHitlTool(tc.toolName) && !isSubAgentTool(tc.toolName)); const completedHitlCalls = toolCalls.filter(tc => isHitlTool(tc.toolName) && tc.status === "completed"); // Build unified list of insertion points @@ -1317,13 +1321,38 @@ function buildContentSegments( }); } - // Add agents tree insertion (if agents exist and offset is defined) - if (agents && agents.length > 0 && agentsOffset !== undefined) { - insertions.push({ - offset: agentsOffset, - segment: { type: "agents", agents, key: "agents-tree" }, - consumesText: false, - }); + // Add agents tree insertion(s). When sub-agents are spawned sequentially + // (with text between invocations), each group of concurrent agents is + // rendered as a separate tree at its chronological content offset. + if (agents && agents.length > 0) { + // Build a map from agent ID → content offset using the Task tool calls + const taskToolOffsets = new Map(); + for (const tc of toolCalls) { + if (tc.toolName === "Task" || tc.toolName === "task") { + taskToolOffsets.set(tc.id, tc.contentOffsetAtStart ?? agentsOffset ?? 0); + } + } + + // Group agents by their content offset + const groups = new Map(); + for (const agent of agents) { + const offset = taskToolOffsets.get(agent.id) ?? agentsOffset ?? 
0;
+        const group = groups.get(offset);
+        if (group) {
+          group.push(agent);
+        } else {
+          groups.set(offset, [agent]);
+        }
+      }
+
+      // Create a tree insertion for each group
+      for (const [offset, groupAgents] of groups) {
+        insertions.push({
+          offset,
+          segment: { type: "agents", agents: groupAgents, key: `agents-tree-${offset}` },
+          consumesText: false,
+        });
+      }
+  }
 
   // Add task list insertion (if tasks exist and offset is defined)
@@ -1921,6 +1950,10 @@ export function ChatApp({
   // Store current input when entering history mode
   const savedInputRef = useRef("");
 
+  // Track skills that have already shown the "loaded" UI indicator this session.
+  // Once a skill is loaded, subsequent invocations should not show the indicator again.
+  const loadedSkillsRef = useRef<Set<string>>(new Set());
+
   // Refs for streaming message updates
   const streamingMessageIdRef = useRef(null);
   // Ref to track when streaming started for duration calculation
@@ -2261,6 +2294,10 @@ export function ChatApp({
     skillName: string,
     _skillPath?: string
   ) => {
+    // Only show "loaded" indicator on the first invocation per session
+    if (loadedSkillsRef.current.has(skillName)) return;
+    loadedSkillsRef.current.add(skillName);
+
     const skillLoad: MessageSkillLoad = {
       skillName,
       status: "loaded",
@@ -3152,6 +3189,23 @@ export function ChatApp({
     }
     // Handle streaming response if handler provided
     if (onStreamMessage) {
+      // Finalize any previous streaming message before starting a new one.
+      // This prevents duplicate "Generating..." spinners when sendSilentMessage
+      // is called from an @mention handler that already created a placeholder.
+      const prevStreamingId = streamingMessageIdRef.current;
+      if (prevStreamingId) {
+        setMessagesWindowed((prev: ChatMessage[]) =>
+          prev.map((msg: ChatMessage) =>
+            msg.id === prevStreamingId && msg.streaming
+              ? 
{ ...msg, streaming: false } + : msg + ).filter((msg: ChatMessage) => + // Remove the previous placeholder if it has no content + !(msg.id === prevStreamingId && !msg.content.trim()) + ) + ); + } + // Increment stream generation so stale handleComplete callbacks become no-ops const currentGeneration = ++streamGenerationRef.current; isStreamingRef.current = true; @@ -3449,6 +3503,7 @@ export function ChatApp({ setTranscriptMode(false); clearHistoryBuffer(); setTrimmedMessageCount(0); + loadedSkillsRef.current.clear(); } // Handle clearMessages flag — persist history before clearing @@ -3508,8 +3563,12 @@ export function ChatApp({ addMessage("assistant", result.message); } - // Track skill load in message for UI indicator - if (result.skillLoaded) { + // Track skill load in message for UI indicator (only on first successful load per session; + // errors are always shown so the user sees the failure) + if (result.skillLoaded && (result.skillLoadError || !loadedSkillsRef.current.has(result.skillLoaded))) { + if (!result.skillLoadError) { + loadedSkillsRef.current.add(result.skillLoaded); + } const skillLoad: MessageSkillLoad = { skillName: result.skillLoaded, status: result.skillLoadError ? "error" : "loaded", diff --git a/src/ui/commands/skill-commands.ts b/src/ui/commands/skill-commands.ts index 1259cdf..035c4a6 100644 --- a/src/ui/commands/skill-commands.ts +++ b/src/ui/commands/skill-commands.ts @@ -1689,7 +1689,15 @@ function createDiskSkillCommand(skill: DiskSkillDefinition): CommandDefinition { return { success: true, skillLoaded: skill.name }; } const expandedPrompt = expandArguments(body, skillArgs); - context.sendSilentMessage(expandedPrompt); + // Prepend a directive so the model acts on the already-expanded + // skill content rather than re-loading the raw skill via the SDK's + // built-in "skill" tool (which would lose the $ARGUMENTS expansion). 
+ const directive = + `\n` + + `The "${skill.name}" skill has already been loaded with the user's arguments below. ` + + `Do NOT invoke the Skill tool for "${skill.name}" — follow the instructions directly.\n` + + `\n\n`; + context.sendSilentMessage(directive + expandedPrompt); return { success: true, skillLoaded: skill.name }; }, }; diff --git a/src/ui/components/parallel-agents-tree.tsx b/src/ui/components/parallel-agents-tree.tsx index e925bc7..c5b59a9 100644 --- a/src/ui/components/parallel-agents-tree.tsx +++ b/src/ui/components/parallel-agents-tree.tsx @@ -411,16 +411,6 @@ function AgentRow({ agent, isLast, compact, themeColors }: AgentRowProps): React )} - {/* Result summary for completed agents */} - {isCompleted && agent.result && ( - - - {continuationPrefix}{SUB_STATUS_PAD} - - {CONNECTOR.subStatus} {truncateText(agent.result, 60)} - - - )} ); } @@ -459,16 +449,6 @@ function AgentRow({ agent, isLast, compact, themeColors }: AgentRowProps): React )} - {/* Result summary for completed agents */} - {isCompleted && agent.result && ( - - - {continuationPrefix}{SUB_STATUS_PAD} - - {CONNECTOR.subStatus} {truncateText(agent.result, 60)} - - - )} ); } diff --git a/src/ui/components/tool-result.tsx b/src/ui/components/tool-result.tsx index c38e0c5..c248ac8 100644 --- a/src/ui/components/tool-result.tsx +++ b/src/ui/components/tool-result.tsx @@ -249,7 +249,7 @@ export function ToolResult({ // Skill tool: render SkillLoadIndicator directly, bypassing standard tool result layout const normalizedToolName = toolName.toLowerCase(); if (normalizedToolName === "skill") { - const skillName = (input.skill as string) || "unknown"; + const skillName = (input.skill as string) || (input.name as string) || "unknown"; const skillStatus: SkillLoadStatus = status === "completed" ? "loaded" : status === "error" ? "error" : "loading"; const errorMessage = status === "error" && typeof output === "string" ? 
output : undefined;
diff --git a/src/ui/index.ts b/src/ui/index.ts
index a4f3a34..3d89b17 100644
--- a/src/ui/index.ts
+++ b/src/ui/index.ts
@@ -508,6 +508,26 @@ export async function startChatUI(
       const input = data.toolInput as Record<string, unknown>;
       const prompt = (input.prompt as string) ?? (input.description as string) ?? "";
       pendingTaskEntries.push({ toolId, prompt: prompt || undefined });
+
+      // Eagerly create a ParallelAgent so the tree appears immediately
+      // instead of waiting for the SDK's subagent.start event (which may
+      // arrive late or not at all). When subagent.start fires later, the
+      // entry is updated in-place with the real subagentId.
+      if (state.parallelAgentHandler) {
+        const agentType = (input.subagent_type as string) ?? (input.agent_type as string) ?? "agent";
+        const taskDesc = (input.description as string) ?? prompt ?? "Sub-agent task";
+        const newAgent: ParallelAgent = {
+          id: toolId,
+          name: agentType,
+          task: taskDesc,
+          status: "running",
+          startedAt: new Date().toISOString(),
+          currentTool: `Starting ${agentType}…`,
+        };
+        state.parallelAgents = [...state.parallelAgents, newAgent];
+        state.parallelAgentHandler(state.parallelAgents);
+        toolCallToAgentMap.set(toolId, toolId);
+      }
     }
 
     // Reset post-task text suppression when the model invokes a new tool —
@@ -625,21 +645,44 @@ export async function startChatUI(
         || toolCallToAgentMap.get(toolId);
 
       if (agentId) {
+        // Set result AND finalize status — if subagent.complete never
+        // fired (eager agent path), this ensures the agent transitions
+        // from "running" → "completed" when the Task tool returns.
         state.parallelAgents = state.parallelAgents.map((a) =>
-          a.id === agentId ? { ...a, result: resultStr } : a
+          a.id === agentId
+            ? {
+                ...a,
+                result: resultStr,
+                status: a.status === "running" || a.status === "pending"
+                  ? "completed" as const
+                  : a.status,
+                currentTool: a.status === "running" || a.status === "pending"
+                  ? undefined
+                  : a.currentTool,
+                durationMs: a.durationMs ?? 
(Date.now() - new Date(a.startedAt).getTime()), + } + : a ); state.parallelAgentHandler(state.parallelAgents); // Clean up consumed mappings if (sdkCorrelationId) toolCallToAgentMap.delete(sdkCorrelationId); toolCallToAgentMap.delete(toolId); } else { - // Fallback: find the last completed agent without a result + // Fallback: find the last completed-or-running agent without a result const agentToUpdate = [...state.parallelAgents] .reverse() - .find((a) => a.status === "completed" && !a.result); + .find((a) => (a.status === "completed" || a.status === "running") && !a.result); if (agentToUpdate) { state.parallelAgents = state.parallelAgents.map((a) => - a.id === agentToUpdate.id ? { ...a, result: resultStr } : a + a.id === agentToUpdate.id + ? { + ...a, + result: resultStr, + status: "completed" as const, + currentTool: undefined, + durationMs: a.durationMs ?? (Date.now() - new Date(a.startedAt).getTime()), + } + : a ); state.parallelAgentHandler(state.parallelAgents); } @@ -650,6 +693,28 @@ export async function startChatUI( // streaming text — we suppress text that matches the result but // allow the model's real follow-up response through. state.suppressPostTaskResult = resultStr; + } else if ( + isTaskTool && + state.parallelAgentHandler && + state.parallelAgents.length > 0 + ) { + // Task tool completed without a result — still finalize any + // eagerly-created agent that hasn't been marked completed yet. + const agentId = toolCallToAgentMap.get(toolId); + if (agentId) { + state.parallelAgents = state.parallelAgents.map((a) => + a.id === agentId && (a.status === "running" || a.status === "pending") + ? { + ...a, + status: "completed" as const, + currentTool: undefined, + durationMs: a.durationMs ?? 
(Date.now() - new Date(a.startedAt).getTime()), + } + : a + ); + state.parallelAgentHandler(state.parallelAgents); + toolCallToAgentMap.delete(toolId); + } } // Clean up tracking @@ -733,29 +798,54 @@ export async function startChatUI( || data.subagentType || "Sub-agent"; const agentTypeName = data.subagentType ?? "agent"; - const newAgent: ParallelAgent = { - id: data.subagentId, - name: agentTypeName, - task, - status: "running", - startedAt: event.timestamp ?? new Date().toISOString(), - // Set initial currentTool so the agent shows activity immediately - // instead of just "Initializing..." until tool events arrive - currentTool: `Running ${agentTypeName}…`, - }; - state.parallelAgents = [...state.parallelAgents, newAgent]; + + // Check if an eager agent was already created from tool.start. + // If so, update it in-place with the real subagentId instead of + // creating a duplicate entry. + const eagerToolId = pendingTaskEntry?.toolId; + const hasEagerAgent = eagerToolId + ? state.parallelAgents.some(a => a.id === eagerToolId) + : false; + + if (hasEagerAgent && eagerToolId) { + // Merge: update existing eager agent with real subagentId + state.parallelAgents = state.parallelAgents.map(a => + a.id === eagerToolId + ? { + ...a, + id: data.subagentId!, + name: agentTypeName, + task: data.task || a.task, + currentTool: `Running ${agentTypeName}…`, + } + : a + ); + // Re-point correlation: toolId now maps to the real subagentId + toolCallToAgentMap.set(eagerToolId, data.subagentId!); + } else { + // No eager agent — create fresh (backward compat for non-Task subagents) + const newAgent: ParallelAgent = { + id: data.subagentId, + name: agentTypeName, + task, + status: "running", + startedAt: event.timestamp ?? 
new Date().toISOString(), + currentTool: `Running ${agentTypeName}…`, + }; + state.parallelAgents = [...state.parallelAgents, newAgent]; + } state.parallelAgentHandler(state.parallelAgents); // Build correlation mapping: SDK-level ID → agentId // This allows tool.complete to attribute results to the correct agent. const sdkCorrelationId = data.toolUseID ?? data.toolCallId; if (sdkCorrelationId) { - toolCallToAgentMap.set(sdkCorrelationId, data.subagentId); + toolCallToAgentMap.set(sdkCorrelationId, data.subagentId!); } // FIFO fallback: consume pending Task toolId and map it to this agent const fifoToolId = pendingTaskEntry?.toolId; if (fifoToolId) { - toolCallToAgentMap.set(fifoToolId, data.subagentId); + toolCallToAgentMap.set(fifoToolId, data.subagentId!); } } }); @@ -900,6 +990,13 @@ export async function startChatUI( // Reset the suppress state at the start of each stream state.suppressPostTaskResult = null; + // Prefix-based accumulator for post-task text suppression. + // Tracks accumulated text so we can check if the model is echoing the + // sub-agent result (text arrives sequentially matching from the start) + // vs. generating genuine follow-up content. + let suppressAccumulator = ""; + let suppressTarget: string | null = null; + for await (const message of abortableStream) { // Handle text content if (message.type === "text" && typeof message.content === "string") { @@ -911,24 +1008,43 @@ export async function startChatUI( // After a Task tool completes, the SDK model may echo back the raw // tool_response as streaming text. Suppress only text that looks - // like the echoed result (starts with JSON delimiters or is a - // substring of the stored result). Once non-echo text arrives, - // clear the suppression so the model's real response flows through. + // like the echoed result (starts with JSON delimiters or sequentially + // matches the stored result from the beginning). 
Once non-echo text + // arrives, clear the suppression so the model's real response flows. const cachedResult = state.suppressPostTaskResult; + // Reset accumulator when suppression target changes + if (cachedResult !== suppressTarget) { + suppressAccumulator = ""; + suppressTarget = cachedResult; + } if (cachedResult !== null) { const trimmed = message.content.trim(); if (trimmed.length === 0) { - // Skip empty/whitespace chunks while suppression is active + // Accumulate whitespace while suppression is active + suppressAccumulator += message.content; continue; } const isJsonEcho = trimmed.startsWith("{") || trimmed.startsWith("["); - const isResultSubstring = (cachedResult as string).indexOf(trimmed) !== -1; - if (isJsonEcho || isResultSubstring) { + if (isJsonEcho) { + continue; + } + // Check if accumulated text + current chunk is a prefix of the + // cached result. When the model echoes a result, text arrives + // sequentially matching from the start of the result string. + // The old substring check (`cachedResult.indexOf(trimmed) !== -1`) + // was too aggressive — small streaming chunks (single words) are + // almost always found as substrings of long result strings, which + // incorrectly suppressed the model's genuine follow-up text. + const candidate = (suppressAccumulator + message.content).trimStart(); + if ((cachedResult as string).startsWith(candidate)) { + suppressAccumulator += message.content; continue; } - // Non-echo text arrived — model is generating real output. - // Clear suppression and let this chunk (and all future) through. + // Not an echo — clear suppression, let this chunk through. + // Accumulated text was part of the echo prefix and stays suppressed. 
state.suppressPostTaskResult = null; + suppressTarget = null; + suppressAccumulator = ""; } if (message.content.length > 0) { diff --git a/src/ui/tools/registry.ts b/src/ui/tools/registry.ts index a295b27..69a0e7e 100644 --- a/src/ui/tools/registry.ts +++ b/src/ui/tools/registry.ts @@ -758,12 +758,12 @@ export const skillToolRenderer: ToolRenderer = { icon: STATUS.active, getTitle(props: ToolRenderProps): string { - const skillName = (props.input.skill as string) || "unknown"; + const skillName = (props.input.skill as string) || (props.input.name as string) || "unknown"; return `Skill(${skillName})`; }, render(props: ToolRenderProps): ToolRenderResult { - const skillName = (props.input.skill as string) || "unknown"; + const skillName = (props.input.skill as string) || (props.input.name as string) || "unknown"; return { title: `Skill(${skillName})`, content: ["Successfully loaded skill"],