From 62fb26c9c992f9dcd9706f3053dd1430759f034d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Jan 2026 23:04:56 -0500 Subject: [PATCH 01/13] Add tail command to view streaming agent output (#97) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 't' key binding in the TUI to view live output from running agents. This helps users monitor agent progress and identify when an agent is stuck or going in circles, allowing early cancellation to save tokens. New components: - OutputBuffer: Thread-safe ring buffer for capturing agent output with memory limits (512KB per job, 4MB total) - Output normalizers: Convert agent-specific formats to readable text - Claude: Parses stream-json, extracts content, shows [Tool: X] indicators - OpenCode: Strips ANSI codes, filters tool call JSON - Generic: Default handler for other agents - API endpoint: GET /api/job/output for polling and streaming modes - TUI tail view with scrolling, cancel support, and auto-scroll Key bindings in tail view: - ↑/↓/j/k: Scroll - PgUp/PgDn: Page scroll - g/G: Top/bottom - x: Cancel the running job - esc/q: Return to queue Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui.go | 315 +++++++++++++++++++++++++++ docs/plans/issue-97-tail-command.md | 252 +++++++++++++++++++++ internal/daemon/normalize.go | 223 +++++++++++++++++++ internal/daemon/normalize_test.go | 271 +++++++++++++++++++++++ internal/daemon/outputbuffer.go | 242 ++++++++++++++++++++ internal/daemon/outputbuffer_test.go | 262 ++++++++++++++++++++++ internal/daemon/server.go | 108 +++++++++ internal/daemon/worker.go | 30 ++- 8 files changed, 1702 insertions(+), 1 deletion(-) create mode 100644 docs/plans/issue-97-tail-command.md create mode 100644 internal/daemon/normalize.go create mode 100644 internal/daemon/normalize_test.go create mode 100644 internal/daemon/outputbuffer.go create mode 100644 internal/daemon/outputbuffer_test.go diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go 
index 9dc671c..75a3646 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -95,6 +95,7 @@ const ( tuiViewComment tuiViewCommitMsg tuiViewHelp + tuiViewTail ) // repoFilterItem represents a repo (or group of repos with same display name) in the filter modal @@ -178,6 +179,14 @@ type tuiModel struct { // Help view state helpFromView tuiView // View to return to after closing help + + // Tail view state + tailJobID int64 // Job being tailed + tailLines []tailLine // Buffer of output lines + tailScroll int // Scroll position + tailStreaming bool // True if job is still running + tailFromView tuiView // View to return to + tailFollow bool // True if auto-scrolling to bottom (follow mode) } // pendingState tracks a pending addressed toggle with sequence number @@ -186,6 +195,23 @@ type pendingState struct { seq uint64 } +// tailLine represents a single line of agent output in the tail view +type tailLine struct { + timestamp time.Time + text string + lineType string // "text", "tool", "thinking", "error" +} + +// tuiTailOutputMsg delivers output lines from the daemon +type tuiTailOutputMsg struct { + lines []tailLine + hasMore bool // true if job is still running + err error +} + +// tuiTailTickMsg triggers a refresh of the tail output +type tuiTailTickMsg struct{} + type tuiTickMsg time.Time type tuiJobsMsg struct { jobs []storage.ReviewJob @@ -647,6 +673,42 @@ func (m tuiModel) fetchReviewForPrompt(jobID int64) tea.Cmd { } } +func (m tuiModel) fetchTailOutput(jobID int64) tea.Cmd { + return func() tea.Msg { + resp, err := m.client.Get(fmt.Sprintf("%s/api/job/output?job_id=%d", m.serverAddr, jobID)) + if err != nil { + return tuiTailOutputMsg{err: err} + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return tuiTailOutputMsg{err: fmt.Errorf("fetch output: %s", resp.Status)} + } + + var result struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []struct { + TS string `json:"ts"` + Text string `json:"text"` + 
LineType string `json:"line_type"` + } `json:"lines"` + HasMore bool `json:"has_more"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return tuiTailOutputMsg{err: err} + } + + lines := make([]tailLine, len(result.Lines)) + for i, l := range result.Lines { + ts, _ := time.Parse(time.RFC3339Nano, l.TS) + lines[i] = tailLine{timestamp: ts, text: l.Text, lineType: l.LineType} + } + + return tuiTailOutputMsg{lines: lines, hasMore: result.HasMore} + } +} + // formatClipboardContent prepares review content for clipboard with a header line func formatClipboardContent(review *storage.Review) string { if review == nil || review.Output == "" { @@ -1355,6 +1417,82 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } } + // Handle tail view + if m.currentView == tuiViewTail { + switch msg.String() { + case "ctrl+c": + return m, tea.Quit + case "esc", "q": + m.currentView = m.tailFromView + m.tailStreaming = false + return m, nil + case "x": + // Cancel the job from tail view + if m.tailJobID > 0 && m.tailStreaming { + for i := range m.jobs { + if m.jobs[i].ID == m.tailJobID { + job := &m.jobs[i] + if job.Status == storage.JobStatusRunning { + oldStatus := job.Status + oldFinishedAt := job.FinishedAt + job.Status = storage.JobStatusCanceled + now := time.Now() + job.FinishedAt = &now + m.tailStreaming = false + return m, m.cancelJob(job.ID, oldStatus, oldFinishedAt) + } + break + } + } + } + return m, nil + case "up", "k": + m.tailFollow = false // Stop auto-scroll when user scrolls up + if m.tailScroll > 0 { + m.tailScroll-- + } + return m, nil + case "down", "j": + m.tailScroll++ + return m, nil + case "pgup": + m.tailFollow = false // Stop auto-scroll when user scrolls up + visibleLines := m.height - 5 + if visibleLines < 1 { + visibleLines = 1 + } + m.tailScroll -= visibleLines + if m.tailScroll < 0 { + m.tailScroll = 0 + } + return m, tea.ClearScreen + case "pgdown": + visibleLines := m.height - 5 + if visibleLines < 1 { + 
visibleLines = 1 + } + m.tailScroll += visibleLines + return m, tea.ClearScreen + case "home", "g": + m.tailFollow = false // Stop auto-scroll when going to top + m.tailScroll = 0 + return m, nil + case "end", "G": + m.tailFollow = true // Resume auto-scroll when going to bottom + visibleLines := m.height - 5 + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + m.tailScroll = maxScroll + return m, nil + } + return m, nil + } + switch msg.String() { case "ctrl+c", "q": if m.currentView == tuiViewReview { @@ -1714,6 +1852,26 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } } + case "t": + // Tail running job output + if m.currentView == tuiViewQueue && len(m.jobs) > 0 && m.selectedIdx >= 0 && m.selectedIdx < len(m.jobs) { + job := m.jobs[m.selectedIdx] + if job.Status == storage.JobStatusRunning { + m.tailJobID = job.ID + m.tailLines = nil + m.tailScroll = 0 + m.tailStreaming = true + m.tailFollow = true // Start in follow mode (auto-scroll) + m.tailFromView = tuiViewQueue + m.currentView = tuiViewTail + return m, tea.Batch(tea.ClearScreen, m.fetchTailOutput(job.ID)) + } else if job.Status == storage.JobStatusQueued { + m.flashMessage = "Job is queued - not yet running" + m.flashExpiresAt = time.Now().Add(2 * time.Second) + m.flashView = tuiViewQueue + } + } + case "f": // Open filter modal if m.currentView == tuiViewQueue { @@ -1933,6 +2091,12 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // initial load or user-initiated actions (filter changes, etc.) 
return m, tea.Batch(m.tick(), m.fetchJobs(), m.fetchStatus()) + case tuiTailTickMsg: + // Refresh tail output if still in tail view and streaming + if m.currentView == tuiViewTail && m.tailStreaming && m.tailJobID > 0 { + return m, m.fetchTailOutput(m.tailJobID) + } + case tuiJobsMsg: m.loadingMore = false if !msg.append { @@ -2106,6 +2270,35 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.currentView = tuiViewPrompt m.promptScroll = 0 + case tuiTailOutputMsg: + m.consecutiveErrors = 0 + if msg.err != nil { + m.err = msg.err + return m, nil + } + if m.currentView == tuiViewTail { + m.tailLines = msg.lines + m.tailStreaming = msg.hasMore + // Auto-scroll to bottom only if in follow mode + if m.tailFollow && len(m.tailLines) > 0 { + visibleLines := m.height - 5 + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + m.tailScroll = maxScroll + } + // Continue polling if still streaming + if m.tailStreaming { + return m, tea.Tick(500*time.Millisecond, func(t time.Time) tea.Msg { + return tuiTailTickMsg{} + }) + } + } + case tuiAddressedMsg: if m.currentReview != nil { m.currentReview.Addressed = bool(msg) @@ -2333,6 +2526,9 @@ func (m tuiModel) View() string { if m.currentView == tuiViewHelp { return m.renderHelpView() } + if m.currentView == tuiViewTail { + return m.renderTailView() + } if m.currentView == tuiViewPrompt && m.currentReview != nil { return m.renderPromptView() } @@ -2883,6 +3079,9 @@ func (m tuiModel) renderReviewView() string { func (m tuiModel) renderPromptView() string { var b strings.Builder + // Clear screen and move cursor to home position to prevent artifacts on scroll + b.WriteString("\x1b[2J\x1b[H") + review := m.currentReview if review.Job != nil { ref := shortJobRef(*review.Job) @@ -3245,6 +3444,121 @@ func (m tuiModel) renderCommitMsgView() string { return b.String() } +func (m tuiModel) renderTailView() string { + var b strings.Builder + + // 
Title with job info + var title string + for _, job := range m.jobs { + if job.ID == m.tailJobID { + repoName := m.getDisplayName(job.RepoPath, job.RepoName) + shortRef := job.GitRef + if len(shortRef) > 7 { + shortRef = shortRef[:7] + } + title = fmt.Sprintf("Tail: %s %s (#%d)", repoName, shortRef, job.ID) + break + } + } + if title == "" { + title = fmt.Sprintf("Tail: Job #%d", m.tailJobID) + } + if m.tailStreaming { + title += " " + tuiRunningStyle.Render("● live") + } else { + title += " " + tuiDoneStyle.Render("● complete") + } + b.WriteString(tuiTitleStyle.Render(title)) + b.WriteString("\x1b[K\n") + + // Calculate visible area + reservedLines := 4 // title + separator + status + help + visibleLines := m.height - reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + + // Clamp scroll + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + scroll := m.tailScroll + if scroll > maxScroll { + scroll = maxScroll + } + if scroll < 0 { + scroll = 0 + } + + // Separator + b.WriteString(strings.Repeat("─", m.width)) + b.WriteString("\x1b[K\n") + + // Render lines + linesWritten := 0 + if len(m.tailLines) == 0 { + b.WriteString(tuiStatusStyle.Render("Waiting for output...")) + b.WriteString("\x1b[K\n") + linesWritten++ + } else { + end := scroll + visibleLines + if end > len(m.tailLines) { + end = len(m.tailLines) + } + for i := scroll; i < end; i++ { + line := m.tailLines[i] + // Format with timestamp + ts := line.timestamp.Format("15:04:05") + var text string + switch line.lineType { + case "tool": + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiQueuedStyle.Render(line.text)) + case "error": + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiFailedStyle.Render(line.text)) + default: + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), line.text) + } + // Truncate to width + if runewidth.StringWidth(text) > m.width { + text = runewidth.Truncate(text, m.width-3, "...") + } + b.WriteString(text) + 
b.WriteString("\x1b[K\n") + linesWritten++ + } + } + + // Pad remaining lines + for linesWritten < visibleLines { + b.WriteString("\x1b[K\n") + linesWritten++ + } + + // Status line with position and follow mode + var status string + if len(m.tailLines) > visibleLines { + status = fmt.Sprintf("[%d-%d of %d lines]", scroll+1, scroll+linesWritten, len(m.tailLines)) + } else { + status = fmt.Sprintf("[%d lines]", len(m.tailLines)) + } + if m.tailFollow { + status += " " + tuiRunningStyle.Render("[following]") + } else { + status += " " + tuiStatusStyle.Render("[paused - G to follow]") + } + b.WriteString(tuiStatusStyle.Render(status)) + b.WriteString("\x1b[K\n") + + // Help + help := "↑/↓: scroll | g/G: top/follow | x: cancel | esc/q: back" + b.WriteString(tuiHelpStyle.Render(help)) + b.WriteString("\x1b[K") + b.WriteString("\x1b[J") // Clear to end of screen + + return b.String() +} + func (m tuiModel) renderHelpView() string { var b strings.Builder @@ -3272,6 +3586,7 @@ func (m tuiModel) renderHelpView() string { keys: []struct{ key, desc string }{ {"a", "Mark as addressed"}, {"c", "Add comment"}, + {"t", "Tail running job output"}, {"x", "Cancel job"}, {"r", "Re-run job"}, {"y", "Copy review to clipboard"}, diff --git a/docs/plans/issue-97-tail-command.md b/docs/plans/issue-97-tail-command.md new file mode 100644 index 0000000..2127831 --- /dev/null +++ b/docs/plans/issue-97-tail-command.md @@ -0,0 +1,252 @@ +# Implementation Plan: Issue #97 - Tail Command for Streaming Agent Output + +## Overview + +This feature adds a `t` key binding in the TUI to view streaming output of running agents in real-time, with `x` to cancel the job. The implementation requires: + +1. **Output capture infrastructure** in the daemon worker +2. **Output buffer** with memory limits and thread-safe access +3. **Output normalization** to convert agent-specific formats (especially Claude's stream-json) to readable text +4. **New API endpoint** to stream/fetch captured output +5. 
**TUI tail view** with scrolling and cancel support + +--- + +## 1. Output Buffer Design + +### Location: `internal/daemon/outputbuffer.go` (new file) + +Create a thread-safe ring buffer for capturing agent output per job, with memory limits. + +### Design: + +```go +// OutputBuffer stores streaming output for running jobs with memory limits. +type OutputBuffer struct { + mu sync.RWMutex + buffers map[int64]*JobOutput // jobID -> output + maxPerJob int // max bytes per job (default: 512KB) + maxTotal int // max total bytes across all jobs (default: 4MB) + totalBytes int // current total bytes +} + +// JobOutput stores output for a single job +type JobOutput struct { + lines []OutputLine // Ring buffer of lines + writeIdx int // Next write position + count int // Lines in buffer (up to maxLines) + totalBytes int // Bytes used by this job + startTime time.Time // When output capture started +} + +// OutputLine represents a single line of normalized output +type OutputLine struct { + Timestamp time.Time `json:"ts"` + Text string `json:"text"` + Type string `json:"type"` // "text", "tool", "thinking", "error" +} +``` + +### Key features: +- **Per-job limit**: 512KB default, configurable +- **Total limit**: 4MB default across all jobs +- **Ring buffer**: Oldest lines evicted when limit reached +- **Line-based**: Store by lines for efficient scrolling +- **Automatic cleanup**: Remove buffer when job completes/fails + +--- + +## 2. Output Normalization Strategy + +### Location: `internal/daemon/normalize.go` (new file) + +Different agents produce output in different formats. Normalize to human-readable text. 
+ +### Claude Code (stream-json format) + +The Claude agent outputs JSON like: +```json +{"type": "assistant", "message": {"content": "..."}} +{"type": "result", "result": "..."} +{"type": "tool_use", "name": "Read", ...} +``` + +**Normalization:** +- `assistant` messages → extract content as text +- `result` messages → extract result as text +- `tool_use` → `[Tool: Read]` indicator +- `tool_result` → `[Tool result: N bytes]` (abbreviated) + +### OpenCode / Other Agents + +Plain text with ANSI codes: +- Strip ANSI escape sequences +- Filter tool call JSON lines +- Pass through readable content + +### Normalizer Registry + +```go +var normalizers = map[string]OutputNormalizer{ + "claude-code": NormalizeClaudeOutput, + "opencode": NormalizeOpenCodeOutput, + // Default for others: strip ANSI, pass through +} +``` + +--- + +## 3. Worker Integration + +### Location: `internal/daemon/worker.go` + +### Changes: +1. Add `outputBuffers *OutputBuffer` to WorkerPool struct +2. Initialize in NewWorkerPool with memory limits +3. In `processJob()` at line 320, create output writer and pass to `a.Review()`: + +```go +// Create output writer for tail command +outputWriter := wp.outputBuffers.Writer(job.ID, agentName) +defer wp.outputBuffers.CloseJob(job.ID) + +output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, outputWriter) +``` + +4. Add `GetJobOutput(jobID)` method to expose output for API + +--- + +## 4. 
API Endpoint Design + +### Location: `internal/daemon/server.go` + +### New endpoint: `GET /api/job/output?job_id=123` + +**Query Parameters:** +- `job_id` (required): Job ID to tail +- `stream` (optional): "1" for SSE streaming + +**Non-streaming response:** +```json +{ + "job_id": 123, + "status": "running", + "lines": [ + {"ts": "2024-01-25T10:30:00Z", "text": "Analyzing code...", "type": "text"}, + {"ts": "2024-01-25T10:30:01Z", "text": "[Tool: Read]", "type": "tool"} + ], + "has_more": true +} +``` + +**Streaming response (newline-delimited JSON):** +```json +{"type": "line", "ts": "...", "text": "...", "line_type": "text"} +{"type": "complete", "status": "done"} +``` + +--- + +## 5. TUI Tail View Implementation + +### Location: `cmd/roborev/tui.go` + +### New view type: +```go +const ( + // ... existing views ... + tuiViewTail // NEW +) +``` + +### New model fields: +```go +tailJobID int64 // Job being tailed +tailLines []tailLine // Buffer of output lines +tailScroll int // Scroll position +tailStreaming bool // True if actively streaming +tailFromView tuiView // View to return to +``` + +### Key bindings: +- `t` on running job → enter tail view +- `x` in tail view → cancel job +- `↑/↓/j/k` → scroll +- `pgup/pgdn` → page scroll +- `g/G` → top/bottom +- `q/esc` → return to queue + +### Rendering: +- Title showing job ID and streaming status +- Timestamped output lines with type indicators +- Status line showing position and live/complete state +- Help bar with available keys + +--- + +## 6. Thread Safety Considerations + +- **OutputBuffer**: RWMutex for buffer map, per-job mutex for fine-grained locking +- **Worker**: Output writer created per-job, single goroutine owner +- **API**: Channel-based subscription for streaming +- **TUI**: bubbletea handles all UI on single goroutine, async HTTP via tea.Cmd + +--- + +## 7. File Changes Summary + +### New Files +1. `internal/daemon/outputbuffer.go` - Buffer implementation +2. 
`internal/daemon/normalize.go` - Output normalizers +3. `internal/daemon/outputbuffer_test.go` - Buffer tests +4. `internal/daemon/normalize_test.go` - Normalizer tests + +### Modified Files +5. `internal/daemon/worker.go` - Add output capture +6. `internal/daemon/server.go` - Add API endpoint +7. `cmd/roborev/tui.go` - Add tail view + +--- + +## 8. Implementation Order + +### Phase 1: Output Buffer +- Implement core buffer with memory limits +- Unit tests for buffer operations + +### Phase 2: Normalization +- Implement normalizers for Claude, OpenCode, generic +- Unit tests for parsing edge cases + +### Phase 3: Worker Integration +- Wire up output capture in processJob +- Test with real agents + +### Phase 4: API Endpoint +- Implement polling mode first +- Add streaming mode +- Integration tests + +### Phase 5: TUI View +- Implement basic view with polling +- Add scrolling and navigation +- Polish UI + +--- + +## 9. Testing Strategy + +### Unit Tests +- OutputBuffer: append, eviction, subscribe, memory limits +- Normalizers: Claude stream-json parsing, ANSI stripping + +### Integration Tests +- API endpoint: polling and streaming modes +- Worker: output capture during real agent execution + +### Manual Testing +- Test with Claude Code (stream-json) +- Test with OpenCode (plain text + ANSI) +- Test memory limits with large output +- Test cancel during tail diff --git a/internal/daemon/normalize.go b/internal/daemon/normalize.go new file mode 100644 index 0000000..4cc575b --- /dev/null +++ b/internal/daemon/normalize.go @@ -0,0 +1,223 @@ +package daemon + +import ( + "encoding/json" + "regexp" + "strings" +) + +// ansiEscapePattern matches ANSI escape sequences (colors, cursor movement, etc.) +var ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;?]*[a-zA-Z]|\x1b\]([^\x07\x1b]|\x1b[^\\])*(\x07|\x1b\\)`) + +// GetNormalizer returns the appropriate normalizer for an agent. 
+func GetNormalizer(agentName string) OutputNormalizer { + switch agentName { + case "claude-code": + return NormalizeClaudeOutput + case "opencode": + return NormalizeOpenCodeOutput + default: + return NormalizeGenericOutput + } +} + +// claudeNoisePatterns are status messages from Claude CLI that aren't useful progress info +var claudeNoisePatterns = []string{ + "mcp startup:", + "Initializing", + "Connected to", + "Session started", +} + +// isClaudeNoise returns true if the line is a Claude CLI status message to filter out +func isClaudeNoise(line string) bool { + for _, pattern := range claudeNoisePatterns { + if strings.Contains(line, pattern) { + return true + } + } + return false +} + +// NormalizeClaudeOutput parses Claude's stream-json format and extracts readable content. +func NormalizeClaudeOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Filter out Claude CLI noise/status messages + if isClaudeNoise(line) { + return nil + } + + // Try to parse as JSON + var msg struct { + Type string `json:"type"` + Subtype string `json:"subtype,omitempty"` + Message struct { + Content string `json:"content,omitempty"` + } `json:"message,omitempty"` + Result string `json:"result,omitempty"` + SessionID string `json:"session_id,omitempty"` + + // Tool-related fields + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` + + // Content delta for streaming + ContentBlockDelta struct { + Delta struct { + Text string `json:"text,omitempty"` + } `json:"delta,omitempty"` + } `json:"content_block_delta,omitempty"` + } + + if err := json.Unmarshal([]byte(line), &msg); err != nil { + // Not JSON - return as raw text (shouldn't happen with Claude) + return &OutputLine{Text: stripANSI(line), Type: "text"} + } + + switch msg.Type { + case "assistant": + // Full assistant message with content + if msg.Message.Content != "" { + // Replace embedded newlines with spaces to keep output on single 
line + text := strings.ReplaceAll(msg.Message.Content, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + return &OutputLine{Text: text, Type: "text"} + } + // Skip empty assistant messages (e.g., start of response) + return nil + + case "result": + // Final result summary + if msg.Result != "" { + // Replace embedded newlines with spaces + text := strings.ReplaceAll(msg.Result, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + return &OutputLine{Text: text, Type: "text"} + } + return nil + + case "tool_use": + // Tool being called + if msg.Name != "" { + return &OutputLine{Text: "[Tool: " + msg.Name + "]", Type: "tool"} + } + return nil + + case "tool_result": + // Tool finished - could show brief indicator + return &OutputLine{Text: "[Tool completed]", Type: "tool"} + + case "content_block_start": + // Start of a content block - skip + return nil + + case "content_block_delta": + // Streaming text delta + if msg.ContentBlockDelta.Delta.Text != "" { + // Replace embedded newlines with spaces + text := strings.ReplaceAll(msg.ContentBlockDelta.Delta.Text, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + if text == "" || text == " " { + return nil + } + return &OutputLine{Text: text, Type: "text"} + } + return nil + + case "content_block_stop": + // End of content block - skip + return nil + + case "message_start", "message_delta", "message_stop": + // Message lifecycle events - skip + return nil + + case "system": + // System messages (e.g., init) + if msg.Subtype == "init" { + if msg.SessionID != "" { + return &OutputLine{Text: "[Session: " + msg.SessionID[:8] + "...]", Type: "text"} + } + } + return nil + + case "error": + // Error message + return &OutputLine{Text: "[Error in stream]", Type: "error"} + + default: + // Unknown type - skip to avoid noise + return nil + } +} + +// NormalizeOpenCodeOutput normalizes OpenCode output (plain text with ANSI codes). 
+func NormalizeOpenCodeOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Filter tool call JSON lines (same logic as in agent package) + if isToolCallJSON(line) { + return &OutputLine{Text: "[Tool call]", Type: "tool"} + } + + // Strip ANSI codes for clean display + text := stripANSI(line) + if text == "" { + return nil + } + + return &OutputLine{Text: text, Type: "text"} +} + +// NormalizeGenericOutput is the default normalizer for other agents. +func NormalizeGenericOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Strip ANSI codes + text := stripANSI(line) + if text == "" { + return nil + } + + // Filter tool call JSON if present + if isToolCallJSON(text) { + return &OutputLine{Text: "[Tool call]", Type: "tool"} + } + + return &OutputLine{Text: text, Type: "text"} +} + +// stripANSI removes ANSI escape sequences from a string. +func stripANSI(s string) string { + return ansiEscapePattern.ReplaceAllString(s, "") +} + +// isToolCallJSON checks if a line is a tool call JSON object. +// Tool calls have exactly "name" and "arguments" keys. 
+func isToolCallJSON(line string) bool { + trimmed := strings.TrimSpace(line) + if !strings.HasPrefix(trimmed, "{") { + return false + } + var m map[string]json.RawMessage + if err := json.Unmarshal([]byte(trimmed), &m); err != nil { + return false + } + // Tool calls have exactly "name" and "arguments" keys + if len(m) != 2 { + return false + } + _, hasName := m["name"] + _, hasArgs := m["arguments"] + return hasName && hasArgs +} diff --git a/internal/daemon/normalize_test.go b/internal/daemon/normalize_test.go new file mode 100644 index 0000000..2632b37 --- /dev/null +++ b/internal/daemon/normalize_test.go @@ -0,0 +1,271 @@ +package daemon + +import ( + "testing" +) + +func TestNormalizeClaudeOutput_AssistantMessage(t *testing.T) { + line := `{"type":"assistant","message":{"content":"Hello, I will review this code."}}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Hello, I will review this code." { + t.Errorf("expected message content, got %q", result.Text) + } + if result.Type != "text" { + t.Errorf("expected type 'text', got %q", result.Type) + } +} + +func TestNormalizeClaudeOutput_Result(t *testing.T) { + line := `{"type":"result","result":"Review complete: PASS"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Review complete: PASS" { + t.Errorf("expected result text, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_ToolUse(t *testing.T) { + line := `{"type":"tool_use","name":"Read","input":{"file_path":"/foo/bar.go"}}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool: Read]" { + t.Errorf("expected tool indicator, got %q", result.Text) + } + if result.Type != "tool" { + t.Errorf("expected type 'tool', got %q", result.Type) + } +} + +func TestNormalizeClaudeOutput_ToolResult(t *testing.T) { + line := 
`{"type":"tool_result","content":"file contents here"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool completed]" { + t.Errorf("expected tool completed indicator, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_SkipsLifecycleEvents(t *testing.T) { + cases := []string{ + `{"type":"message_start"}`, + `{"type":"message_delta"}`, + `{"type":"message_stop"}`, + `{"type":"content_block_start"}`, + `{"type":"content_block_stop"}`, + } + + for _, line := range cases { + result := NormalizeClaudeOutput(line) + if result != nil { + t.Errorf("expected nil for %s, got %+v", line, result) + } + } +} + +func TestNormalizeClaudeOutput_EmptyAssistant(t *testing.T) { + line := `{"type":"assistant","message":{}}` + result := NormalizeClaudeOutput(line) + + if result != nil { + t.Errorf("expected nil for empty assistant message, got %+v", result) + } +} + +func TestNormalizeClaudeOutput_SystemInit(t *testing.T) { + line := `{"type":"system","subtype":"init","session_id":"abc123def456"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Session: abc123de...]" { + t.Errorf("expected truncated session ID, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_InvalidJSON(t *testing.T) { + line := "not json at all" + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result for non-JSON") + } + if result.Text != "not json at all" { + t.Errorf("expected raw text, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_EmptyLine(t *testing.T) { + result := NormalizeClaudeOutput("") + if result != nil { + t.Errorf("expected nil for empty line, got %+v", result) + } + + result = NormalizeClaudeOutput(" ") + if result != nil { + t.Errorf("expected nil for whitespace line, got %+v", result) + } +} + +func TestNormalizeOpenCodeOutput_PlainText(t *testing.T) { + line := 
"Reviewing the code changes..." + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Reviewing the code changes..." { + t.Errorf("expected plain text, got %q", result.Text) + } + if result.Type != "text" { + t.Errorf("expected type 'text', got %q", result.Type) + } +} + +func TestNormalizeOpenCodeOutput_StripsANSI(t *testing.T) { + line := "\x1b[32mGreen text\x1b[0m" + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Green text" { + t.Errorf("expected ANSI stripped, got %q", result.Text) + } +} + +func TestNormalizeOpenCodeOutput_FilterToolCall(t *testing.T) { + line := `{"name":"read","arguments":{"path":"/foo"}}` + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool call]" { + t.Errorf("expected tool call indicator, got %q", result.Text) + } + if result.Type != "tool" { + t.Errorf("expected type 'tool', got %q", result.Type) + } +} + +func TestNormalizeOpenCodeOutput_PreservesJSONWithExtraKeys(t *testing.T) { + // JSON with more than just name/arguments should be preserved + line := `{"name":"test","arguments":{},"extra":"key"}` + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + // Should not be filtered as tool call + if result.Type == "tool" { + t.Error("expected non-tool type for JSON with extra keys") + } +} + +func TestNormalizeOpenCodeOutput_EmptyLine(t *testing.T) { + result := NormalizeOpenCodeOutput("") + if result != nil { + t.Errorf("expected nil for empty line, got %+v", result) + } +} + +func TestNormalizeGenericOutput_PlainText(t *testing.T) { + line := "Some agent output" + result := NormalizeGenericOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Some agent output" { + t.Errorf("expected plain text, got %q", 
result.Text) + } +} + +func TestNormalizeGenericOutput_StripsANSI(t *testing.T) { + line := "\x1b[1;31mBold red\x1b[0m normal" + result := NormalizeGenericOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Bold red normal" { + t.Errorf("expected ANSI stripped, got %q", result.Text) + } +} + +func TestGetNormalizer(t *testing.T) { + cases := []struct { + agent string + expected string + }{ + {"claude-code", "claude"}, + {"opencode", "opencode"}, + {"codex", "generic"}, + {"gemini", "generic"}, + {"unknown", "generic"}, + } + + for _, tc := range cases { + normalizer := GetNormalizer(tc.agent) + if normalizer == nil { + t.Errorf("GetNormalizer(%q) returned nil", tc.agent) + } + } +} + +func TestStripANSI(t *testing.T) { + cases := []struct { + input string + expected string + }{ + {"plain text", "plain text"}, + {"\x1b[32mgreen\x1b[0m", "green"}, + {"\x1b[1;31;40mcomplex\x1b[0m", "complex"}, + {"\x1b[?25hcursor\x1b[?25l", "cursor"}, + {"no escape", "no escape"}, + {"\x1b]0;title\x07text", "text"}, // OSC sequence + } + + for _, tc := range cases { + result := stripANSI(tc.input) + if result != tc.expected { + t.Errorf("stripANSI(%q) = %q, want %q", tc.input, result, tc.expected) + } + } +} + +func TestIsToolCallJSON(t *testing.T) { + cases := []struct { + input string + expected bool + }{ + {`{"name":"read","arguments":{}}`, true}, + {`{"name":"write","arguments":{"path":"/foo"}}`, true}, + {`{"name":"test","arguments":{},"extra":true}`, false}, // extra key + {`{"name":"only"}`, false}, // missing arguments + {`{"arguments":{}}`, false}, // missing name + {`not json`, false}, + {`{"other":"object"}`, false}, + } + + for _, tc := range cases { + result := isToolCallJSON(tc.input) + if result != tc.expected { + t.Errorf("isToolCallJSON(%q) = %v, want %v", tc.input, result, tc.expected) + } + } +} diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go new file mode 100644 index 0000000..77048a1 --- 
// internal/daemon/outputbuffer.go — package daemon; imports: bytes, strings, sync, time.

// OutputLine is a single normalized line of agent output, as exposed to
// the TUI tail view and the /api/job/output endpoint.
type OutputLine struct {
	Timestamp time.Time `json:"ts"`
	Text      string    `json:"text"`
	Type      string    `json:"type"` // one of: "text", "tool", "thinking", "error"
}

// JobOutput accumulates the captured output of one job.
//
// mu guards every field below: lines/totalBytes form the bounded buffer of
// captured output, and subs holds the channels of live streaming subscribers.
type JobOutput struct {
	mu         sync.RWMutex
	lines      []OutputLine
	totalBytes int // sum of len(Text) over lines
	startTime  time.Time
	closed     bool              // set by CloseJob; further appends are ignored
	subs       []chan OutputLine // subscriber channels for streaming
}

// OutputBuffer stores streaming output for running jobs, bounded by a
// per-job byte limit and a global byte limit across all jobs.
//
// Lock ordering: when both locks are held, jo.mu is acquired before ob.mu.
type OutputBuffer struct {
	mu         sync.RWMutex
	buffers    map[int64]*JobOutput
	maxPerJob  int // max bytes buffered for any single job
	maxTotal   int // max bytes buffered across all jobs
	totalBytes int // current global total, maintained by Append/CloseJob
}

// NewOutputBuffer returns an empty OutputBuffer with the given limits.
func NewOutputBuffer(maxPerJob, maxTotal int) *OutputBuffer {
	return &OutputBuffer{
		buffers:   make(map[int64]*JobOutput),
		maxPerJob: maxPerJob,
		maxTotal:  maxTotal,
	}
}

// getOrCreate returns the JobOutput for jobID, lazily allocating it.
func (ob *OutputBuffer) getOrCreate(jobID int64) *JobOutput {
	ob.mu.Lock()
	defer ob.mu.Unlock()

	jo := ob.buffers[jobID]
	if jo == nil {
		jo = &JobOutput{
			lines:     make([]OutputLine, 0, 100),
			startTime: time.Now(),
		}
		ob.buffers[jobID] = jo
	}
	return jo
}

// Append adds a line to the job's output buffer and fans it out to any
// streaming subscribers. It is a no-op once the job has been closed.
//
// Memory policy: the job's oldest lines are evicted until the new line fits
// under maxPerJob; if the global maxTotal budget is still exceeded after
// local eviction, the line is delivered to subscribers but NOT buffered, so
// total memory stays bounded. (Previously maxTotal was tracked but never
// enforced.)
func (ob *OutputBuffer) Append(jobID int64, line OutputLine) {
	jo := ob.getOrCreate(jobID)

	jo.mu.Lock()
	defer jo.mu.Unlock()

	if jo.closed {
		return
	}

	lineBytes := len(line.Text)

	// Evict this job's oldest lines to make room under the per-job cap.
	// Freed bytes are applied to the global counter in one shot below,
	// instead of locking ob.mu once per evicted line.
	freed := 0
	for jo.totalBytes-freed+lineBytes > ob.maxPerJob && len(jo.lines) > 0 {
		freed += len(jo.lines[0].Text)
		jo.lines = jo.lines[1:]
	}
	jo.totalBytes -= freed

	// Settle the global counter and enforce the global budget.
	buffered := true
	ob.mu.Lock()
	ob.totalBytes -= freed
	if ob.totalBytes+lineBytes > ob.maxTotal {
		buffered = false // over global budget: drop instead of buffering
	} else {
		ob.totalBytes += lineBytes
	}
	ob.mu.Unlock()

	if buffered {
		jo.lines = append(jo.lines, line)
		jo.totalBytes += lineBytes
	}

	// Notify subscribers; a slow subscriber's line is dropped rather than
	// blocking the worker goroutine.
	for _, ch := range jo.subs {
		select {
		case ch <- line:
		default:
		}
	}
}

// GetLines returns a snapshot copy of all buffered lines for a job, or nil
// if the job has no active buffer.
func (ob *OutputBuffer) GetLines(jobID int64) []OutputLine {
	ob.mu.RLock()
	jo, ok := ob.buffers[jobID]
	ob.mu.RUnlock()

	if !ok {
		return nil
	}

	jo.mu.RLock()
	defer jo.mu.RUnlock()

	snapshot := make([]OutputLine, len(jo.lines))
	copy(snapshot, jo.lines)
	return snapshot
}

// Subscribe returns a snapshot of the lines buffered so far plus a channel
// that receives each subsequently appended line. The channel is closed when
// the job is closed. Callers must invoke the returned cancel function when
// done (it is safe to call after the job has already closed).
func (ob *OutputBuffer) Subscribe(jobID int64) ([]OutputLine, <-chan OutputLine, func()) {
	jo := ob.getOrCreate(jobID)

	jo.mu.Lock()
	defer jo.mu.Unlock()

	initial := make([]OutputLine, len(jo.lines))
	copy(initial, jo.lines)

	// Buffered so Append's non-blocking send tolerates a briefly slow reader.
	ch := make(chan OutputLine, 100)
	jo.subs = append(jo.subs, ch)

	cancel := func() {
		jo.mu.Lock()
		defer jo.mu.Unlock()
		for i, sub := range jo.subs {
			if sub == ch {
				jo.subs = append(jo.subs[:i], jo.subs[i+1:]...)
				close(ch)
				break
			}
		}
	}

	return initial, ch, cancel
}

// CloseJob marks a job as complete: its buffer is dropped, its bytes are
// returned to the global budget, and all subscriber channels are closed.
//
// NOTE(review): there is a small window after the map delete in which a
// concurrent Append could recreate an entry for this job that is never
// cleaned up. The worker flushes its writer before calling CloseJob, so no
// append should race this in practice — confirm if other callers appear.
func (ob *OutputBuffer) CloseJob(jobID int64) {
	ob.mu.Lock()
	jo, ok := ob.buffers[jobID]
	if ok {
		delete(ob.buffers, jobID)
	}
	ob.mu.Unlock()
	if !ok {
		return
	}

	jo.mu.Lock()
	defer jo.mu.Unlock()
	jo.closed = true

	// Return this job's bytes to the global accounting.
	ob.mu.Lock()
	ob.totalBytes -= jo.totalBytes
	ob.mu.Unlock()

	// Closing the channels tells streaming readers the job is finished.
	for _, ch := range jo.subs {
		close(ch)
	}
	jo.subs = nil
}

// IsActive reports whether output capture is currently live for jobID.
func (ob *OutputBuffer) IsActive(jobID int64) bool {
	ob.mu.RLock()
	defer ob.mu.RUnlock()
	_, ok := ob.buffers[jobID]
	return ok
}

// OutputNormalizer converts one raw agent output line into a normalized
// OutputLine, or returns nil to filter the line out entirely.
type OutputNormalizer func(line string) *OutputLine

// outputWriter adapts an OutputBuffer to io.Writer: bytes written are split
// into lines, normalized, timestamped, and appended to the buffer.
type outputWriter struct {
	buffer    *OutputBuffer
	jobID     int64
	normalize OutputNormalizer
	lineBuf   bytes.Buffer // holds any trailing partial line between writes
}

// Write buffers p and emits every complete ("\n"-terminated) line through
// the normalizer. It never fails and always reports len(p) consumed.
func (w *outputWriter) Write(p []byte) (int, error) {
	w.lineBuf.Write(p)

	for {
		pending := w.lineBuf.Bytes()
		nl := bytes.IndexByte(pending, '\n')
		if nl < 0 {
			break // incomplete line stays buffered until the next Write/Flush
		}
		line := strings.TrimSuffix(string(pending[:nl]), "\r") // tolerate CRLF
		w.lineBuf.Next(nl + 1)                                 // consume line + newline
		w.emit(line)
	}
	return len(p), nil
}

// Flush emits whatever partial (newline-less) line is still buffered.
func (w *outputWriter) Flush() {
	if w.lineBuf.Len() == 0 {
		return
	}
	line := w.lineBuf.String()
	w.lineBuf.Reset()
	w.emit(line)
}

// emit runs one line through the normalizer and stores the result.
func (w *outputWriter) emit(line string) {
	if normalized := w.normalize(line); normalized != nil {
		normalized.Timestamp = time.Now()
		w.buffer.Append(w.jobID, *normalized)
	}
}

// Writer returns an io.Writer that normalizes and stores output for a job.
+func (ob *OutputBuffer) Writer(jobID int64, normalize OutputNormalizer) *outputWriter { + return &outputWriter{ + buffer: ob, + jobID: jobID, + normalize: normalize, + } +} diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go new file mode 100644 index 0000000..fd481c9 --- /dev/null +++ b/internal/daemon/outputbuffer_test.go @@ -0,0 +1,262 @@ +package daemon + +import ( + "sync" + "testing" + "time" +) + +func TestOutputBuffer_Append(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "line 1", Type: "text"}) + ob.Append(1, OutputLine{Text: "line 2", Type: "tool"}) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "line 1" { + t.Errorf("expected 'line 1', got %q", lines[0].Text) + } + if lines[1].Type != "tool" { + t.Errorf("expected type 'tool', got %q", lines[1].Type) + } +} + +func TestOutputBuffer_GetLinesEmpty(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + lines := ob.GetLines(999) + if lines != nil { + t.Errorf("expected nil for non-existent job, got %v", lines) + } +} + +func TestOutputBuffer_PerJobLimit(t *testing.T) { + // Small limit: 50 bytes per job + ob := NewOutputBuffer(50, 1000) + + // Add lines that exceed the limit + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes - should evict first + + lines := ob.GetLines(1) + // First line should be evicted to make room + if len(lines) != 2 { + t.Fatalf("expected 2 lines after eviction, got %d", len(lines)) + } +} + +func TestOutputBuffer_CloseJob(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "test", Type: "text"}) + if !ob.IsActive(1) { + t.Error("expected job to be active") + } + + ob.CloseJob(1) + + if ob.IsActive(1) 
{ + t.Error("expected job to be inactive after close") + } + + lines := ob.GetLines(1) + if lines != nil { + t.Error("expected nil lines after close") + } +} + +func TestOutputBuffer_Subscribe(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + // Add initial line + ob.Append(1, OutputLine{Text: "initial", Type: "text"}) + + // Subscribe + initial, ch, cancel := ob.Subscribe(1) + defer cancel() + + if len(initial) != 1 { + t.Fatalf("expected 1 initial line, got %d", len(initial)) + } + if initial[0].Text != "initial" { + t.Errorf("expected 'initial', got %q", initial[0].Text) + } + + // Add more lines after subscription + go func() { + time.Sleep(10 * time.Millisecond) + ob.Append(1, OutputLine{Text: "new", Type: "text"}) + }() + + select { + case line := <-ch: + if line.Text != "new" { + t.Errorf("expected 'new', got %q", line.Text) + } + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for subscribed line") + } +} + +func TestOutputBuffer_SubscribeCancel(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + _, ch, cancel := ob.Subscribe(1) + cancel() + + // Channel should be closed + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed") + } + default: + // Channel closed, as expected + } +} + +func TestOutputBuffer_CloseJobClosesSubscribers(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "test", Type: "text"}) + _, ch, _ := ob.Subscribe(1) + + ob.CloseJob(1) + + // Channel should be closed + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed after CloseJob") + } + case <-time.After(100 * time.Millisecond): + t.Error("channel not closed after CloseJob") + } +} + +func TestOutputWriter_Write(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write with newline + w.Write([]byte("hello\n")) + 
w.Write([]byte("world\n")) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "hello" { + t.Errorf("expected 'hello', got %q", lines[0].Text) + } + if lines[1].Text != "world" { + t.Errorf("expected 'world', got %q", lines[1].Text) + } +} + +func TestOutputWriter_WritePartialLines(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write partial line + w.Write([]byte("hel")) + w.Write([]byte("lo\nwor")) + w.Write([]byte("ld\n")) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "hello" { + t.Errorf("expected 'hello', got %q", lines[0].Text) + } + if lines[1].Text != "world" { + t.Errorf("expected 'world', got %q", lines[1].Text) + } +} + +func TestOutputWriter_Flush(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write without newline + w.Write([]byte("incomplete")) + + // Should not appear yet + lines := ob.GetLines(1) + if len(lines) != 0 { + t.Fatalf("expected 0 lines before flush, got %d", len(lines)) + } + + // Flush should process remaining + w.Flush() + + lines = ob.GetLines(1) + if len(lines) != 1 { + t.Fatalf("expected 1 line after flush, got %d", len(lines)) + } + if lines[0].Text != "incomplete" { + t.Errorf("expected 'incomplete', got %q", lines[0].Text) + } +} + +func TestOutputWriter_NormalizeFilters(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + // Normalizer that filters out empty lines + normalize := func(line string) *OutputLine { + if line == "" { + return nil + } + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + w.Write([]byte("keep\n\nskip empty\n")) + + lines := 
ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines (empty filtered), got %d", len(lines)) + } +} + +func TestOutputBuffer_Concurrent(t *testing.T) { + ob := NewOutputBuffer(10240, 40960) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(jobID int64) { + defer wg.Done() + for j := 0; j < 100; j++ { + ob.Append(jobID, OutputLine{Text: "test", Type: "text"}) + } + }(int64(i)) + } + + wg.Wait() + + // All jobs should have lines + for i := 0; i < 10; i++ { + lines := ob.GetLines(int64(i)) + if len(lines) == 0 { + t.Errorf("job %d has no lines", i) + } + } +} diff --git a/internal/daemon/server.go b/internal/daemon/server.go index eae2bdb..a862b4f 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -65,6 +65,7 @@ func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server { mux.HandleFunc("/api/health", s.handleHealth) mux.HandleFunc("/api/jobs", s.handleListJobs) mux.HandleFunc("/api/job/cancel", s.handleCancelJob) + mux.HandleFunc("/api/job/output", s.handleJobOutput) mux.HandleFunc("/api/job/rerun", s.handleRerunJob) mux.HandleFunc("/api/repos", s.handleListRepos) mux.HandleFunc("/api/review", s.handleGetReview) @@ -646,6 +647,113 @@ func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]interface{}{"success": true}) } +// JobOutputResponse is the response for /api/job/output +type JobOutputResponse struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []OutputLine `json:"lines"` + HasMore bool `json:"has_more"` +} + +func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + jobIDStr := r.URL.Query().Get("job_id") + if jobIDStr == "" { + writeError(w, http.StatusBadRequest, "job_id required") + return + } + + var jobID int64 + if _, err := fmt.Sscanf(jobIDStr, "%d", 
&jobID); err != nil { + writeError(w, http.StatusBadRequest, "invalid job_id") + return + } + + // Check job exists + job, err := s.db.GetJobByID(jobID) + if err != nil { + writeError(w, http.StatusNotFound, "job not found") + return + } + + // Check if streaming mode requested + stream := r.URL.Query().Get("stream") == "1" + + if !stream { + // Return current buffer (polling mode) + lines := s.workerPool.GetJobOutput(jobID) + if lines == nil { + lines = []OutputLine{} + } + + resp := JobOutputResponse{ + JobID: jobID, + Status: string(job.Status), + Lines: lines, + HasMore: job.Status == storage.JobStatusRunning, + } + writeJSON(w, http.StatusOK, resp) + return + } + + // Streaming mode via SSE + w.Header().Set("Content-Type", "application/x-ndjson") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + // Subscribe to output + initial, ch, cancel := s.workerPool.SubscribeJobOutput(jobID) + defer cancel() + + encoder := json.NewEncoder(w) + + // Send initial lines + for _, line := range initial { + encoder.Encode(map[string]interface{}{ + "type": "line", + "ts": line.Timestamp.Format(time.RFC3339Nano), + "text": line.Text, + "line_type": line.Type, + }) + } + flusher.Flush() + + // Stream new lines until job completes or client disconnects + for { + select { + case <-r.Context().Done(): + return + case line, ok := <-ch: + if !ok { + // Job finished - channel closed + encoder.Encode(map[string]interface{}{ + "type": "complete", + "status": "done", + }) + flusher.Flush() + return + } + encoder.Encode(map[string]interface{}{ + "type": "line", + "ts": line.Timestamp.Format(time.RFC3339Nano), + "text": line.Text, + "line_type": line.Type, + }) + flusher.Flush() + } + } +} + type RerunJobRequest struct { JobID int64 `json:"job_id"` } diff --git a/internal/daemon/worker.go 
b/internal/daemon/worker.go index 58f8a54..90cfbcf 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -33,6 +33,9 @@ type WorkerPool struct { pendingCancels map[int64]bool // Jobs canceled before registered runningJobsMu sync.Mutex + // Output capture for tail command + outputBuffers *OutputBuffer + // Test hooks for deterministic synchronization (nil in production) testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup } @@ -49,6 +52,7 @@ func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broad stopCh: make(chan struct{}), runningJobs: make(map[int64]context.CancelFunc), pendingCancels: make(map[int64]bool), + outputBuffers: NewOutputBuffer(512*1024, 4*1024*1024), // 512KB/job, 4MB total } } @@ -80,6 +84,22 @@ func (wp *WorkerPool) MaxWorkers() int { return wp.numWorkers } +// GetJobOutput returns the current output lines for a job. +func (wp *WorkerPool) GetJobOutput(jobID int64) []OutputLine { + return wp.outputBuffers.GetLines(jobID) +} + +// SubscribeJobOutput returns initial lines and a channel for new output. +// Call cancel when done to unsubscribe. +func (wp *WorkerPool) SubscribeJobOutput(jobID int64) ([]OutputLine, <-chan OutputLine, func()) { + return wp.outputBuffers.Subscribe(jobID) +} + +// HasJobOutput returns true if there's active output capture for a job. +func (wp *WorkerPool) HasJobOutput(jobID int64) bool { + return wp.outputBuffers.IsActive(jobID) +} + // CancelJob cancels a running job by its ID, killing the subprocess. // Returns true if the job was canceled or marked for pending cancellation. // Returns false only if the job doesn't exist or isn't in a cancellable state. 
@@ -315,9 +335,17 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { Agent: agentName, }) + // Create output writer for tail command + normalizer := GetNormalizer(agentName) + outputWriter := wp.outputBuffers.Writer(job.ID, normalizer) + defer func() { + outputWriter.Flush() + wp.outputBuffers.CloseJob(job.ID) + }() + // Run the review log.Printf("[%s] Running %s review...", workerID, agentName) - output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, nil) + output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, outputWriter) if err != nil { // Check if this was a cancellation if ctx.Err() == context.Canceled { From daaf753c45df98d3182e1ef58eb6f560e2b4f7e3 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 07:14:33 -0500 Subject: [PATCH 02/13] Fix tail view rendering and add comment API tests - Fix tail view content being cleared when job completes (preserve lines when server returns empty response after buffer closure) - Fix inconsistent visibleLines calculation (use m.height - 4 consistently) - Add tests for AddCommentToJob with all job states (queued, running, done, failed, canceled) - Add API endpoint tests for commenting on in-progress/failed jobs Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui.go | 18 ++- internal/daemon/server_test.go | 160 ++++++++++++++++++++ internal/storage/reviews_test.go | 247 +++++++++++++++++++++++++++++++ 3 files changed, 418 insertions(+), 7 deletions(-) create mode 100644 internal/storage/reviews_test.go diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go index 75a3646..76f19d4 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -1457,7 +1457,7 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, nil case "pgup": m.tailFollow = false // Stop auto-scroll when user scrolls up - visibleLines := m.height - 5 + visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { visibleLines = 1 } @@ -1465,21 
+1465,21 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { if m.tailScroll < 0 { m.tailScroll = 0 } - return m, tea.ClearScreen + return m, nil case "pgdown": - visibleLines := m.height - 5 + visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { visibleLines = 1 } m.tailScroll += visibleLines - return m, tea.ClearScreen + return m, nil case "home", "g": m.tailFollow = false // Stop auto-scroll when going to top m.tailScroll = 0 return m, nil case "end", "G": m.tailFollow = true // Resume auto-scroll when going to bottom - visibleLines := m.height - 5 + visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { visibleLines = 1 } @@ -2277,11 +2277,15 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, nil } if m.currentView == tuiViewTail { - m.tailLines = msg.lines + // Only update lines if we have new content, or if job is still streaming + // This preserves the output when job completes (buffer gets closed on server) + if len(msg.lines) > 0 || msg.hasMore { + m.tailLines = msg.lines + } m.tailStreaming = msg.hasMore // Auto-scroll to bottom only if in follow mode if m.tailFollow && len(m.tailLines) > 0 { - visibleLines := m.height - 5 + visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { visibleLines = 1 } diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index b37157a..e2c6f20 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -2171,3 +2171,163 @@ func TestGetMachineID_CachingBehavior(t *testing.T) { } }) } + +// TestHandleAddCommentToJobStates tests that comments can be added to jobs +// in any state: queued, running, done, failed, and canceled. 
+func TestHandleAddCommentToJobStates(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create repo and commit + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test commit", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + + testCases := []struct { + name string + setupQuery string // SQL to set job to specific state + }{ + {"queued job", ""}, + {"running job", `UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`}, + {"completed job", `UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`}, + {"failed job", `UPDATE review_jobs SET status = 'failed', started_at = datetime('now'), finished_at = datetime('now'), error = 'test error' WHERE id = ?`}, + {"canceled job", `UPDATE review_jobs SET status = 'canceled', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a job + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Set job to desired state + if tc.setupQuery != "" { + if _, err := db.Exec(tc.setupQuery, job.ID); err != nil { + t.Fatalf("Failed to set job state: %v", err) + } + } + + // Add comment via API + reqData := map[string]interface{}{ + "job_id": job.ID, + "commenter": "test-user", + "comment": "Test comment for " + tc.name, + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != 
http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify response contains the comment + var resp storage.Response + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + if resp.Responder != "test-user" { + t.Errorf("Expected responder 'test-user', got %q", resp.Responder) + } + }) + } +} + +// TestHandleAddCommentToNonExistentJob tests that adding a comment to a +// non-existent job returns 404. +func TestHandleAddCommentToNonExistentJob(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + reqData := map[string]interface{}{ + "job_id": 99999, + "commenter": "test-user", + "comment": "This should fail", + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "job not found") { + t.Errorf("Expected 'job not found' error, got: %s", w.Body.String()) + } +} + +// TestHandleAddCommentWithoutReview tests that comments can be added to jobs +// that don't have a review yet (job exists but hasn't completed). 
+func TestHandleAddCommentWithoutReview(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create repo, commit, and job (but NO review) + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test commit", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Set job to running (no review exists yet) + if _, err := db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID); err != nil { + t.Fatalf("Failed to set job to running: %v", err) + } + + // Verify no review exists + if _, err := db.GetReviewByJobID(job.ID); err == nil { + t.Fatal("Expected no review to exist for job") + } + + // Add comment - should succeed even without a review + reqData := map[string]interface{}{ + "job_id": job.ID, + "commenter": "test-user", + "comment": "Comment on in-progress job without review", + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify comment was stored + comments, err := db.GetCommentsForJob(job.ID) + if err != nil { + t.Fatalf("GetCommentsForJob failed: %v", err) + } + if len(comments) != 1 { + t.Fatalf("Expected 1 comment, got %d", len(comments)) + } + if comments[0].Response != "Comment on in-progress job without review" { + t.Errorf("Unexpected comment: %q", comments[0].Response) + } +} diff --git 
a/internal/storage/reviews_test.go b/internal/storage/reviews_test.go new file mode 100644 index 0000000..7b34820 --- /dev/null +++ b/internal/storage/reviews_test.go @@ -0,0 +1,247 @@ +package storage + +import ( + "database/sql" + "path/filepath" + "testing" + "time" +) + +// TestAddCommentToJobAllStates verifies that comments can be added to jobs +// in any state: queued, running, done, failed, and canceled. +func TestAddCommentToJobAllStates(t *testing.T) { + db := openReviewsTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/tmp/test-repo") + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + + testCases := []struct { + name string + status JobStatus + setupQuery string // SQL to set the job to a specific state + }{ + { + name: "queued job", + status: JobStatusQueued, + setupQuery: "", // Default status is queued + }, + { + name: "running job", + status: JobStatusRunning, + setupQuery: `UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, + }, + { + name: "completed job", + status: JobStatusDone, + setupQuery: `UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, + }, + { + name: "failed job", + status: JobStatusFailed, + setupQuery: `UPDATE review_jobs SET status = 'failed', started_at = datetime('now'), finished_at = datetime('now'), error = 'test error' WHERE id = ?`, + }, + { + name: "canceled job", + status: JobStatusCanceled, + setupQuery: `UPDATE review_jobs SET status = 'canceled', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a job for this test case + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough") + if err 
!= nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Set job to the desired state + if tc.setupQuery != "" { + _, err = db.Exec(tc.setupQuery, job.ID) + if err != nil { + t.Fatalf("Failed to set job status to %s: %v", tc.status, err) + } + } + + // Verify job is in expected state + var actualStatus string + err = db.QueryRow(`SELECT status FROM review_jobs WHERE id = ?`, job.ID).Scan(&actualStatus) + if err != nil { + t.Fatalf("Failed to verify job status: %v", err) + } + if JobStatus(actualStatus) != tc.status { + t.Fatalf("Expected job status %s, got %s", tc.status, actualStatus) + } + + // Add a comment to the job + comment := "Test comment for " + tc.name + resp, err := db.AddCommentToJob(job.ID, "test-user", comment) + if err != nil { + t.Fatalf("AddCommentToJob failed for %s: %v", tc.name, err) + } + + // Verify the comment was added + if resp == nil { + t.Fatal("Expected non-nil response") + } + if resp.Responder != "test-user" { + t.Errorf("Expected responder 'test-user', got %q", resp.Responder) + } + if resp.Response != comment { + t.Errorf("Expected response %q, got %q", comment, resp.Response) + } + if resp.JobID == nil || *resp.JobID != job.ID { + t.Errorf("Expected job ID %d, got %v", job.ID, resp.JobID) + } + + // Verify we can retrieve the comment + comments, err := db.GetCommentsForJob(job.ID) + if err != nil { + t.Fatalf("GetCommentsForJob failed: %v", err) + } + if len(comments) != 1 { + t.Fatalf("Expected 1 comment, got %d", len(comments)) + } + if comments[0].Response != comment { + t.Errorf("Retrieved comment mismatch: expected %q, got %q", comment, comments[0].Response) + } + }) + } +} + +// TestAddCommentToJobNonExistent verifies that adding a comment to a +// non-existent job returns an appropriate error. 
+func TestAddCommentToJobNonExistent(t *testing.T) { + db := openReviewsTestDB(t) + defer db.Close() + + // Try to add a comment to a job that doesn't exist + _, err := db.AddCommentToJob(99999, "test-user", "This should fail") + if err == nil { + t.Fatal("Expected error when adding comment to non-existent job") + } + if err != sql.ErrNoRows { + t.Errorf("Expected sql.ErrNoRows, got: %v", err) + } +} + +// TestAddCommentToJobMultipleComments verifies that multiple comments +// can be added to the same job. +func TestAddCommentToJobMultipleComments(t *testing.T) { + db := openReviewsTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/tmp/test-repo") + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + + // Create a job and set it to running (in-progress) + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + _, err = db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID) + if err != nil { + t.Fatalf("Failed to set job to running: %v", err) + } + + // Add multiple comments from different users + comments := []struct { + user string + message string + }{ + {"alice", "First comment while job is running"}, + {"bob", "Second comment from another user"}, + {"alice", "Third comment from alice again"}, + } + + for _, c := range comments { + _, err := db.AddCommentToJob(job.ID, c.user, c.message) + if err != nil { + t.Fatalf("AddCommentToJob failed for %s: %v", c.user, err) + } + } + + // Verify all comments were added + retrieved, err := db.GetCommentsForJob(job.ID) + if err != nil { + t.Fatalf("GetCommentsForJob failed: %v", err) + } + if len(retrieved) != len(comments) { + t.Fatalf("Expected %d comments, got %d", len(comments), 
len(retrieved)) + } + + // Verify comments are in order + for i, c := range comments { + if retrieved[i].Responder != c.user { + t.Errorf("Comment %d: expected responder %q, got %q", i, c.user, retrieved[i].Responder) + } + if retrieved[i].Response != c.message { + t.Errorf("Comment %d: expected message %q, got %q", i, c.message, retrieved[i].Response) + } + } +} + +// TestAddCommentToJobWithNoReview verifies that comments can be added +// to jobs that have no review (i.e., job exists but has no review record yet). +func TestAddCommentToJobWithNoReview(t *testing.T) { + db := openReviewsTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/tmp/test-repo") + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + + // Create a job (no review is created yet - job is just queued) + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Verify no review exists for this job + _, err = db.GetReviewByJobID(job.ID) + if err == nil { + t.Fatal("Expected error getting review for job with no review") + } + + // Add a comment to the job (should succeed even without a review) + resp, err := db.AddCommentToJob(job.ID, "test-user", "Comment on job without review") + if err != nil { + t.Fatalf("AddCommentToJob failed: %v", err) + } + if resp == nil { + t.Fatal("Expected non-nil response") + } + if resp.Response != "Comment on job without review" { + t.Errorf("Unexpected response: %q", resp.Response) + } +} + +func openReviewsTestDB(t *testing.T) *DB { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "test.db") + + db, err := Open(dbPath) + if err != nil { + t.Fatalf("Failed to open test DB: %v", err) + } + + return db +} From 0954a5048763976368eac781e23e06e8ae1a0f25 Mon Sep 17 00:00:00 2001 
From: Wes McKinney Date: Mon, 26 Jan 2026 07:17:16 -0500 Subject: [PATCH 03/13] Fix output buffer bugs from review #2577 - Fix potential panic when SessionID < 8 chars in normalize.go - Enforce maxTotal global memory limit (was tracked but not enforced) - Add tests for short session IDs and global memory limit Co-Authored-By: Claude Opus 4.5 --- internal/daemon/normalize.go | 6 +++++- internal/daemon/normalize_test.go | 13 +++++++++++++ internal/daemon/outputbuffer.go | 12 +++++++++--- internal/daemon/outputbuffer_test.go | 25 +++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/internal/daemon/normalize.go b/internal/daemon/normalize.go index 4cc575b..ff68e75 100644 --- a/internal/daemon/normalize.go +++ b/internal/daemon/normalize.go @@ -140,7 +140,11 @@ func NormalizeClaudeOutput(line string) *OutputLine { // System messages (e.g., init) if msg.Subtype == "init" { if msg.SessionID != "" { - return &OutputLine{Text: "[Session: " + msg.SessionID[:8] + "...]", Type: "text"} + sessionPrefix := msg.SessionID + if len(sessionPrefix) > 8 { + sessionPrefix = sessionPrefix[:8] + } + return &OutputLine{Text: "[Session: " + sessionPrefix + "...]", Type: "text"} } } return nil diff --git a/internal/daemon/normalize_test.go b/internal/daemon/normalize_test.go index 2632b37..c8153aa 100644 --- a/internal/daemon/normalize_test.go +++ b/internal/daemon/normalize_test.go @@ -96,6 +96,19 @@ func TestNormalizeClaudeOutput_SystemInit(t *testing.T) { } } +func TestNormalizeClaudeOutput_SystemInitShortSessionID(t *testing.T) { + // Test that short session IDs don't panic + line := `{"type":"system","subtype":"init","session_id":"abc"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Session: abc...]" { + t.Errorf("expected short session ID preserved, got %q", result.Text) + } +} + func TestNormalizeClaudeOutput_InvalidJSON(t *testing.T) { line := "not json at all" result 
:= NormalizeClaudeOutput(line) diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index 77048a1..4fe956c 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -82,13 +82,19 @@ func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { ob.mu.Unlock() } - // Add the line - jo.lines = append(jo.lines, line) - jo.totalBytes += lineBytes + // Check global memory limit - drop line if we'd exceed maxTotal ob.mu.Lock() + if ob.totalBytes+lineBytes > ob.maxTotal { + ob.mu.Unlock() + return // Drop line to enforce global memory limit + } ob.totalBytes += lineBytes ob.mu.Unlock() + // Add the line + jo.lines = append(jo.lines, line) + jo.totalBytes += lineBytes + // Notify subscribers for _, ch := range jo.subs { select { diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index fd481c9..452187a 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -49,6 +49,31 @@ func TestOutputBuffer_PerJobLimit(t *testing.T) { } } +func TestOutputBuffer_GlobalLimit(t *testing.T) { + // Small global limit: 50 bytes total, 30 bytes per job + ob := NewOutputBuffer(30, 50) + + // Add lines across multiple jobs + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=20 + ob.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=40 + ob.Append(3, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes - would exceed 50, dropped + + // Job 3's line should be dropped due to global limit + lines1 := ob.GetLines(1) + lines2 := ob.GetLines(2) + lines3 := ob.GetLines(3) + + if len(lines1) != 1 { + t.Errorf("expected 1 line for job 1, got %d", len(lines1)) + } + if len(lines2) != 1 { + t.Errorf("expected 1 line for job 2, got %d", len(lines2)) + } + if len(lines3) != 0 { + t.Errorf("expected 0 lines for job 3 (global limit exceeded), got %d", len(lines3)) + } +} + func 
TestOutputBuffer_CloseJob(t *testing.T) { ob := NewOutputBuffer(1024, 4096) From 5548b3decfd67f4e7918b4d312db539d635d3149 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 07:24:27 -0500 Subject: [PATCH 04/13] Fix tail view status range to show actual lines not padded count The status line was showing ranges past actual line count (e.g., "[6-15 of 12 lines]") because it used linesWritten which includes padding. Now computes displayEnd correctly. Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go index 76f19d4..e6ff124 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -3542,7 +3542,12 @@ func (m tuiModel) renderTailView() string { // Status line with position and follow mode var status string if len(m.tailLines) > visibleLines { - status = fmt.Sprintf("[%d-%d of %d lines]", scroll+1, scroll+linesWritten, len(m.tailLines)) + // Calculate actual displayed range (not including padding) + displayEnd := scroll + visibleLines + if displayEnd > len(m.tailLines) { + displayEnd = len(m.tailLines) + } + status = fmt.Sprintf("[%d-%d of %d lines]", scroll+1, displayEnd, len(m.tailLines)) } else { status = fmt.Sprintf("[%d lines]", len(m.tailLines)) } From 8aaee30e41d91a38925e16c9cb4cabc90697904e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 07:28:55 -0500 Subject: [PATCH 05/13] Fix additional output buffer and streaming issues - Fix JSON field mismatch: change OutputLine.Type tag from "type" to "line_type" to match what TUI expects (tool/error styling now works) - Prevent streaming hang for completed jobs: return immediate complete response instead of subscribing to non-existent buffer - Drop oversized lines that exceed per-job limit on their own - Add test for oversized line handling Co-Authored-By: Claude Opus 4.5 --- internal/daemon/outputbuffer.go | 7 ++++++- internal/daemon/outputbuffer_test.go | 21 
+++++++++++++++++++++ internal/daemon/server.go | 12 ++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index 4fe956c..33d00f1 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -11,7 +11,7 @@ import ( type OutputLine struct { Timestamp time.Time `json:"ts"` Text string `json:"text"` - Type string `json:"type"` // "text", "tool", "thinking", "error" + Type string `json:"line_type"` // "text", "tool", "thinking", "error" } // JobOutput stores output for a single job @@ -72,6 +72,11 @@ func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { lineBytes := len(line.Text) + // Drop oversized lines that exceed per-job limit on their own + if lineBytes > ob.maxPerJob { + return + } + // Evict oldest lines if this job exceeds its limit for jo.totalBytes+lineBytes > ob.maxPerJob && len(jo.lines) > 0 { evicted := jo.lines[0] diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index 452187a..8a0a5aa 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -74,6 +74,27 @@ func TestOutputBuffer_GlobalLimit(t *testing.T) { } } +func TestOutputBuffer_OversizedLine(t *testing.T) { + // Per-job limit: 20 bytes + ob := NewOutputBuffer(20, 1000) + + // Try to add a line larger than per-job limit + ob.Append(1, OutputLine{Text: "this line is way too long to fit in buffer", Type: "text"}) // 43 bytes > 20 + + // Line should be dropped + lines := ob.GetLines(1) + if len(lines) != 0 { + t.Errorf("expected 0 lines (oversized dropped), got %d", len(lines)) + } + + // Normal sized lines should still work + ob.Append(1, OutputLine{Text: "short", Type: "text"}) // 5 bytes + lines = ob.GetLines(1) + if len(lines) != 1 { + t.Errorf("expected 1 line after normal append, got %d", len(lines)) + } +} + func TestOutputBuffer_CloseJob(t *testing.T) { ob := NewOutputBuffer(1024, 4096) diff --git 
a/internal/daemon/server.go b/internal/daemon/server.go index a862b4f..e845a5a 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -701,6 +701,18 @@ func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { } // Streaming mode via SSE + // Don't stream for non-running jobs - they have no active buffer producer + // and would hang forever waiting for data + if job.Status != storage.JobStatusRunning { + w.Header().Set("Content-Type", "application/x-ndjson") + encoder := json.NewEncoder(w) + encoder.Encode(map[string]interface{}{ + "type": "complete", + "status": string(job.Status), + }) + return + } + w.Header().Set("Content-Type", "application/x-ndjson") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") From a832c148726cd85789c475239c051058d531e987 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 08:04:19 -0500 Subject: [PATCH 06/13] Add tests for /api/job/output endpoint - Test invalid job_id returns 400 - Test non-existent job returns 404 - Test missing job_id returns 400 - Test polling mode for running job (has_more=true) - Test polling mode for completed job (has_more=false) - Test streaming mode for completed job returns immediate complete response (verifies fix for streaming hang) Co-Authored-By: Claude Opus 4.5 --- internal/daemon/server_test.go | 171 +++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index e2c6f20..515dfb6 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -2331,3 +2331,174 @@ func TestHandleAddCommentWithoutReview(t *testing.T) { t.Errorf("Unexpected comment: %q", comments[0].Response) } } + +// TestHandleJobOutput_InvalidJobID tests that invalid job_id returns 400. 
+func TestHandleJobOutput_InvalidJobID(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=notanumber", nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestHandleJobOutput_NonExistentJob tests that non-existent job returns 404. +func TestHandleJobOutput_NonExistentJob(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=99999", nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestHandleJobOutput_PollingRunningJob tests polling mode for a running job. 
+func TestHandleJobOutput_PollingRunningJob(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create a running job + repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID) + + req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []struct { + TS string `json:"ts"` + Text string `json:"text"` + LineType string `json:"line_type"` + } `json:"lines"` + HasMore bool `json:"has_more"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if resp.JobID != job.ID { + t.Errorf("Expected job_id %d, got %d", job.ID, resp.JobID) + } + if resp.Status != "running" { + t.Errorf("Expected status 'running', got %q", resp.Status) + } + if !resp.HasMore { + t.Error("Expected has_more=true for running job") + } +} + +// TestHandleJobOutput_PollingCompletedJob tests polling mode for a completed job. 
+func TestHandleJobOutput_PollingCompletedJob(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create a completed job + repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + + req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + HasMore bool `json:"has_more"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if resp.Status != "done" { + t.Errorf("Expected status 'done', got %q", resp.Status) + } + if resp.HasMore { + t.Error("Expected has_more=false for completed job") + } +} + +// TestHandleJobOutput_StreamingCompletedJob tests that streaming mode for a +// completed job returns an immediate complete response instead of hanging. 
+func TestHandleJobOutput_StreamingCompletedJob(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create a completed job + repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + + req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d&stream=1", job.ID), nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + // Should return immediately with complete message, not hang + if w.Code != http.StatusOK { + t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + Type string `json:"type"` + Status string `json:"status"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if resp.Type != "complete" { + t.Errorf("Expected type 'complete', got %q", resp.Type) + } + if resp.Status != "done" { + t.Errorf("Expected status 'done', got %q", resp.Status) + } +} + +// TestHandleJobOutput_MissingJobID tests that missing job_id returns 400. 
+func TestHandleJobOutput_MissingJobID(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + req := httptest.NewRequest(http.MethodGet, "/api/job/output", nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} From 871318dd6fba9f431b9a7767ad5218fce47762bf Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 08:09:57 -0500 Subject: [PATCH 07/13] test(daemon): Address code review #2616 Fix error handling in TestHandleJobOutput tests: - Check errors from GetOrCreateRepo, GetOrCreateCommit, and EnqueueJob instead of ignoring them with blank identifiers - Capture db.Exec result and verify RowsAffected == 1 to ensure the UPDATE actually modified the job status This prevents silent setup failures from causing misleading test behavior. Co-Authored-By: Claude Opus 4.5 --- internal/daemon/server_test.go | 69 ++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 515dfb6..a2c9a53 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -2371,10 +2371,25 @@ func TestHandleJobOutput_PollingRunningJob(t *testing.T) { server := NewServer(db, cfg, "") // Create a running job - repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) - commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") - db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID) + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) 
+ if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + result, err := db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID) + if err != nil { + t.Fatalf("Exec failed: %v", err) + } + if n, _ := result.RowsAffected(); n != 1 { + t.Fatalf("Expected 1 row affected, got %d", n) + } req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) w := httptest.NewRecorder() @@ -2417,10 +2432,25 @@ func TestHandleJobOutput_PollingCompletedJob(t *testing.T) { server := NewServer(db, cfg, "") // Create a completed job - repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) - commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") - db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + result, err := db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + if err != nil { + t.Fatalf("Exec failed: %v", err) + } + if n, _ := result.RowsAffected(); n != 1 { + t.Fatalf("Expected 1 row affected, got %d", n) + } req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) w := httptest.NewRecorder() @@ 
-2456,10 +2486,25 @@ func TestHandleJobOutput_StreamingCompletedJob(t *testing.T) { server := NewServer(db, cfg, "") // Create a completed job - repo, _ := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) - commit, _ := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") - db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + result, err := db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID) + if err != nil { + t.Fatalf("Exec failed: %v", err) + } + if n, _ := result.RowsAffected(); n != 1 { + t.Fatalf("Expected 1 row affected, got %d", n) + } req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d&stream=1", job.ID), nil) w := httptest.NewRecorder() From fb451a38d63006238650bea72f993b8e7e348835 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 08:15:26 -0500 Subject: [PATCH 08/13] Fix output buffer eviction and add tail output preservation tests - Fix per-job eviction to check global limit before evicting lines, preserving existing content when new lines would exceed global limit - Add TUI tests for tail output preservation when job completes - Remove completed plan document Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui_test.go | 111 ++++++++++++ docs/plans/issue-97-tail-command.md | 252 --------------------------- 
internal/daemon/outputbuffer.go | 31 ++-- internal/daemon/outputbuffer_test.go | 60 +++++++ 4 files changed, 191 insertions(+), 263 deletions(-) delete mode 100644 docs/plans/issue-97-tail-command.md diff --git a/cmd/roborev/tui_test.go b/cmd/roborev/tui_test.go index 5c2b4ca..5089c6a 100644 --- a/cmd/roborev/tui_test.go +++ b/cmd/roborev/tui_test.go @@ -6742,3 +6742,114 @@ func TestSanitizeForDisplay(t *testing.T) { }) } } + +func TestTUITailOutputPreservesLinesOnEmptyResponse(t *testing.T) { + // Test that when a job completes and the server returns empty lines + // (because the buffer was closed), the TUI preserves the existing lines. + m := newTuiModel("http://localhost") + m.currentView = tuiViewTail + m.tailJobID = 1 + m.tailStreaming = true + m.height = 30 + + // Set up initial lines as if we had been streaming output + m.tailLines = []tailLine{ + {timestamp: time.Now(), text: "Line 1", lineType: "text"}, + {timestamp: time.Now(), text: "Line 2", lineType: "text"}, + {timestamp: time.Now(), text: "Line 3", lineType: "text"}, + } + + // Simulate job completion: server returns empty lines, hasMore=false + emptyMsg := tuiTailOutputMsg{ + lines: []tailLine{}, + hasMore: false, + err: nil, + } + + updated, _ := m.Update(emptyMsg) + m2 := updated.(tuiModel) + + // Lines should be preserved (not cleared) + if len(m2.tailLines) != 3 { + t.Errorf("Expected 3 lines preserved, got %d", len(m2.tailLines)) + } + + // Streaming should stop + if m2.tailStreaming { + t.Error("Expected tailStreaming to be false after job completes") + } + + // Verify the original content is still there + if m2.tailLines[0].text != "Line 1" { + t.Errorf("Expected 'Line 1', got %q", m2.tailLines[0].text) + } +} + +func TestTUITailOutputUpdatesLinesWhenStreaming(t *testing.T) { + // Test that when streaming and new lines arrive, they are updated + m := newTuiModel("http://localhost") + m.currentView = tuiViewTail + m.tailJobID = 1 + m.tailStreaming = true + m.height = 30 + + // Set up initial 
lines + m.tailLines = []tailLine{ + {timestamp: time.Now(), text: "Old line", lineType: "text"}, + } + + // New lines arrive while still streaming + newMsg := tuiTailOutputMsg{ + lines: []tailLine{ + {timestamp: time.Now(), text: "Old line", lineType: "text"}, + {timestamp: time.Now(), text: "New line", lineType: "text"}, + }, + hasMore: true, // Still streaming + err: nil, + } + + updated, _ := m.Update(newMsg) + m2 := updated.(tuiModel) + + // Lines should be updated + if len(m2.tailLines) != 2 { + t.Errorf("Expected 2 lines, got %d", len(m2.tailLines)) + } + + // Streaming should continue + if !m2.tailStreaming { + t.Error("Expected tailStreaming to be true while job is running") + } +} + +func TestTUITailOutputIgnoredWhenNotInTailView(t *testing.T) { + // Test that tail output messages are ignored when not in tail view + m := newTuiModel("http://localhost") + m.currentView = tuiViewQueue // Not in tail view + m.tailJobID = 1 + + // Existing lines from a previous tail session + m.tailLines = []tailLine{ + {timestamp: time.Now(), text: "Previous session line", lineType: "text"}, + } + + // New lines arrive (stale message from previous tail) + msg := tuiTailOutputMsg{ + lines: []tailLine{ + {timestamp: time.Now(), text: "Should be ignored", lineType: "text"}, + }, + hasMore: false, + err: nil, + } + + updated, _ := m.Update(msg) + m2 := updated.(tuiModel) + + // Lines should not be updated since we're not in tail view + if len(m2.tailLines) != 1 { + t.Errorf("Expected 1 line (unchanged), got %d", len(m2.tailLines)) + } + if m2.tailLines[0].text != "Previous session line" { + t.Errorf("Lines should not be updated when not in tail view") + } +} diff --git a/docs/plans/issue-97-tail-command.md b/docs/plans/issue-97-tail-command.md deleted file mode 100644 index 2127831..0000000 --- a/docs/plans/issue-97-tail-command.md +++ /dev/null @@ -1,252 +0,0 @@ -# Implementation Plan: Issue #97 - Tail Command for Streaming Agent Output - -## Overview - -This feature adds a `t` 
key binding in the TUI to view streaming output of running agents in real-time, with `x` to cancel the job. The implementation requires: - -1. **Output capture infrastructure** in the daemon worker -2. **Output buffer** with memory limits and thread-safe access -3. **Output normalization** to convert agent-specific formats (especially Claude's stream-json) to readable text -4. **New API endpoint** to stream/fetch captured output -5. **TUI tail view** with scrolling and cancel support - ---- - -## 1. Output Buffer Design - -### Location: `internal/daemon/outputbuffer.go` (new file) - -Create a thread-safe ring buffer for capturing agent output per job, with memory limits. - -### Design: - -```go -// OutputBuffer stores streaming output for running jobs with memory limits. -type OutputBuffer struct { - mu sync.RWMutex - buffers map[int64]*JobOutput // jobID -> output - maxPerJob int // max bytes per job (default: 512KB) - maxTotal int // max total bytes across all jobs (default: 4MB) - totalBytes int // current total bytes -} - -// JobOutput stores output for a single job -type JobOutput struct { - lines []OutputLine // Ring buffer of lines - writeIdx int // Next write position - count int // Lines in buffer (up to maxLines) - totalBytes int // Bytes used by this job - startTime time.Time // When output capture started -} - -// OutputLine represents a single line of normalized output -type OutputLine struct { - Timestamp time.Time `json:"ts"` - Text string `json:"text"` - Type string `json:"type"` // "text", "tool", "thinking", "error" -} -``` - -### Key features: -- **Per-job limit**: 512KB default, configurable -- **Total limit**: 4MB default across all jobs -- **Ring buffer**: Oldest lines evicted when limit reached -- **Line-based**: Store by lines for efficient scrolling -- **Automatic cleanup**: Remove buffer when job completes/fails - ---- - -## 2. 
Output Normalization Strategy - -### Location: `internal/daemon/normalize.go` (new file) - -Different agents produce output in different formats. Normalize to human-readable text. - -### Claude Code (stream-json format) - -The Claude agent outputs JSON like: -```json -{"type": "assistant", "message": {"content": "..."}} -{"type": "result", "result": "..."} -{"type": "tool_use", "name": "Read", ...} -``` - -**Normalization:** -- `assistant` messages → extract content as text -- `result` messages → extract result as text -- `tool_use` → `[Tool: Read]` indicator -- `tool_result` → `[Tool result: N bytes]` (abbreviated) - -### OpenCode / Other Agents - -Plain text with ANSI codes: -- Strip ANSI escape sequences -- Filter tool call JSON lines -- Pass through readable content - -### Normalizer Registry - -```go -var normalizers = map[string]OutputNormalizer{ - "claude-code": NormalizeClaudeOutput, - "opencode": NormalizeOpenCodeOutput, - // Default for others: strip ANSI, pass through -} -``` - ---- - -## 3. Worker Integration - -### Location: `internal/daemon/worker.go` - -### Changes: -1. Add `outputBuffers *OutputBuffer` to WorkerPool struct -2. Initialize in NewWorkerPool with memory limits -3. In `processJob()` at line 320, create output writer and pass to `a.Review()`: - -```go -// Create output writer for tail command -outputWriter := wp.outputBuffers.Writer(job.ID, agentName) -defer wp.outputBuffers.CloseJob(job.ID) - -output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, outputWriter) -``` - -4. Add `GetJobOutput(jobID)` method to expose output for API - ---- - -## 4. 
API Endpoint Design - -### Location: `internal/daemon/server.go` - -### New endpoint: `GET /api/job/output?job_id=123` - -**Query Parameters:** -- `job_id` (required): Job ID to tail -- `stream` (optional): "1" for SSE streaming - -**Non-streaming response:** -```json -{ - "job_id": 123, - "status": "running", - "lines": [ - {"ts": "2024-01-25T10:30:00Z", "text": "Analyzing code...", "type": "text"}, - {"ts": "2024-01-25T10:30:01Z", "text": "[Tool: Read]", "type": "tool"} - ], - "has_more": true -} -``` - -**Streaming response (newline-delimited JSON):** -```json -{"type": "line", "ts": "...", "text": "...", "line_type": "text"} -{"type": "complete", "status": "done"} -``` - ---- - -## 5. TUI Tail View Implementation - -### Location: `cmd/roborev/tui.go` - -### New view type: -```go -const ( - // ... existing views ... - tuiViewTail // NEW -) -``` - -### New model fields: -```go -tailJobID int64 // Job being tailed -tailLines []tailLine // Buffer of output lines -tailScroll int // Scroll position -tailStreaming bool // True if actively streaming -tailFromView tuiView // View to return to -``` - -### Key bindings: -- `t` on running job → enter tail view -- `x` in tail view → cancel job -- `↑/↓/j/k` → scroll -- `pgup/pgdn` → page scroll -- `g/G` → top/bottom -- `q/esc` → return to queue - -### Rendering: -- Title showing job ID and streaming status -- Timestamped output lines with type indicators -- Status line showing position and live/complete state -- Help bar with available keys - ---- - -## 6. Thread Safety Considerations - -- **OutputBuffer**: RWMutex for buffer map, per-job mutex for fine-grained locking -- **Worker**: Output writer created per-job, single goroutine owner -- **API**: Channel-based subscription for streaming -- **TUI**: bubbletea handles all UI on single goroutine, async HTTP via tea.Cmd - ---- - -## 7. File Changes Summary - -### New Files -1. `internal/daemon/outputbuffer.go` - Buffer implementation -2. 
`internal/daemon/normalize.go` - Output normalizers -3. `internal/daemon/outputbuffer_test.go` - Buffer tests -4. `internal/daemon/normalize_test.go` - Normalizer tests - -### Modified Files -5. `internal/daemon/worker.go` - Add output capture -6. `internal/daemon/server.go` - Add API endpoint -7. `cmd/roborev/tui.go` - Add tail view - ---- - -## 8. Implementation Order - -### Phase 1: Output Buffer -- Implement core buffer with memory limits -- Unit tests for buffer operations - -### Phase 2: Normalization -- Implement normalizers for Claude, OpenCode, generic -- Unit tests for parsing edge cases - -### Phase 3: Worker Integration -- Wire up output capture in processJob -- Test with real agents - -### Phase 4: API Endpoint -- Implement polling mode first -- Add streaming mode -- Integration tests - -### Phase 5: TUI View -- Implement basic view with polling -- Add scrolling and navigation -- Polish UI - ---- - -## 9. Testing Strategy - -### Unit Tests -- OutputBuffer: append, eviction, subscribe, memory limits -- Normalizers: Claude stream-json parsing, ANSI stripping - -### Integration Tests -- API endpoint: polling and streaming modes -- Worker: output capture during real agent execution - -### Manual Testing -- Test with Claude Code (stream-json) -- Test with OpenCode (plain text + ANSI) -- Test memory limits with large output -- Test cancel during tail diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index 33d00f1..eb83819 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -77,25 +77,34 @@ func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { return } - // Evict oldest lines if this job exceeds its limit - for jo.totalBytes+lineBytes > ob.maxPerJob && len(jo.lines) > 0 { - evicted := jo.lines[0] - jo.lines = jo.lines[1:] - jo.totalBytes -= len(evicted.Text) - ob.mu.Lock() - ob.totalBytes -= len(evicted.Text) - ob.mu.Unlock() + // Calculate how many bytes would be evicted for per-job 
limit + evictBytes := 0 + tempTotal := jo.totalBytes + evictCount := 0 + for tempTotal+lineBytes > ob.maxPerJob && evictCount < len(jo.lines) { + evictBytes += len(jo.lines[evictCount].Text) + tempTotal -= len(jo.lines[evictCount].Text) + evictCount++ } - // Check global memory limit - drop line if we'd exceed maxTotal + // Check global memory limit BEFORE evicting - if we'd still exceed + // maxTotal after eviction, reject without losing existing lines ob.mu.Lock() - if ob.totalBytes+lineBytes > ob.maxTotal { + if ob.totalBytes-evictBytes+lineBytes > ob.maxTotal { ob.mu.Unlock() return // Drop line to enforce global memory limit } - ob.totalBytes += lineBytes + + // Now safe to evict and add - update global total + ob.totalBytes = ob.totalBytes - evictBytes + lineBytes ob.mu.Unlock() + // Perform the eviction + if evictCount > 0 { + jo.lines = jo.lines[evictCount:] + jo.totalBytes -= evictBytes + } + // Add the line jo.lines = append(jo.lines, line) jo.totalBytes += lineBytes diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index 8a0a5aa..6de703d 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -95,6 +95,66 @@ func TestOutputBuffer_OversizedLine(t *testing.T) { } } +func TestOutputBuffer_GlobalLimitPreservesExistingLines(t *testing.T) { + // Scenario: job has lines, per-job eviction would occur, but global limit rejects. + // Existing lines should be preserved (not evicted for nothing). 
+ // Per-job: 50 bytes, Global: 80 bytes + ob := NewOutputBuffer(50, 80) + + // Job 1: add 40 bytes (two 20-byte lines) + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, job1=40, total=40 + + // Job 2: add 30 bytes + ob.Append(2, OutputLine{Text: "123456789012345678901234567890", Type: "text"}) // 30 bytes, total=70 + + // Verify initial state + lines1 := ob.GetLines(1) + lines2 := ob.GetLines(2) + if len(lines1) != 2 { + t.Fatalf("expected 2 lines for job 1, got %d", len(lines1)) + } + if len(lines2) != 1 { + t.Fatalf("expected 1 line for job 2, got %d", len(lines2)) + } + + // Now try to add 20 bytes to job 1 + // Per-job: 40+20=60 > 50, would need to evict 20 bytes (1 line) + // After eviction: job1=40, but total would be 70-20+20=70, still under 80 + // This SHOULD succeed + ob.Append(1, OutputLine{Text: "AAAAAAAAAAAAAAAAAAAA", Type: "text"}) // 20 bytes + + lines1 = ob.GetLines(1) + if len(lines1) != 2 { + t.Errorf("expected 2 lines for job 1 after eviction+add, got %d", len(lines1)) + } + + // Now global is at 70. Try to add 20 more bytes to job 1. 
+ // Per-job: 40+20=60 > 50, would evict 20 bytes + // After eviction: total would be 70-20+20=70, under 80 + // This SHOULD succeed + ob.Append(1, OutputLine{Text: "BBBBBBBBBBBBBBBBBBBB", Type: "text"}) // 20 bytes + + lines1 = ob.GetLines(1) + if len(lines1) != 2 { + t.Errorf("expected 2 lines for job 1 after second eviction+add, got %d", len(lines1)) + } + + // Now try to add 15 bytes to job 2 (total would be 70+15=85 > 80) + // Per-job: 30+15=45 < 50, no eviction + // Global: 70+15=85 > 80, REJECTED + // Job 2 should keep its original line + ob.Append(2, OutputLine{Text: "123456789012345", Type: "text"}) // 15 bytes - rejected + + lines2 = ob.GetLines(2) + if len(lines2) != 1 { + t.Errorf("expected job 2 to keep 1 line after global rejection, got %d", len(lines2)) + } + if lines2[0].Text != "123456789012345678901234567890" { + t.Errorf("job 2 original line was modified: %q", lines2[0].Text) + } +} + func TestOutputBuffer_CloseJob(t *testing.T) { ob := NewOutputBuffer(1024, 4096) From b66e144bea59c6c38f8d02270a2c2d3b05cfd7e8 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 08:30:57 -0500 Subject: [PATCH 09/13] Address code review findings for tail command - Change g key to toggle between top/bottom in tail view (user request) - Fix stream completion status to reflect actual job status (failed/canceled) - Cap outputWriter line buffer to maxPerJob to prevent unbounded growth - Fix ANSI-aware truncation by truncating text before applying styles - Handle time.Parse errors with fallback to current time Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui.go | 51 ++++++++++++++++++++++------ internal/daemon/outputbuffer.go | 22 +++++++++++- internal/daemon/outputbuffer_test.go | 29 ++++++++++++++++ internal/daemon/server.go | 8 +++-- 4 files changed, 96 insertions(+), 14 deletions(-) diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go index e6ff124..0ad0152 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -701,7 +701,11 @@ func (m 
tuiModel) fetchTailOutput(jobID int64) tea.Cmd { lines := make([]tailLine, len(result.Lines)) for i, l := range result.Lines { - ts, _ := time.Parse(time.RFC3339Nano, l.TS) + ts, err := time.Parse(time.RFC3339Nano, l.TS) + if err != nil { + // Fallback to current time if timestamp is invalid + ts = time.Now() + } lines[i] = tailLine{timestamp: ts, text: l.Text, lineType: l.LineType} } @@ -1473,11 +1477,11 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } m.tailScroll += visibleLines return m, nil - case "home", "g": + case "home": m.tailFollow = false // Stop auto-scroll when going to top m.tailScroll = 0 return m, nil - case "end", "G": + case "end": m.tailFollow = true // Resume auto-scroll when going to bottom visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { @@ -1489,6 +1493,26 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } m.tailScroll = maxScroll return m, nil + case "g", "G": + // Toggle between top and bottom + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + if m.tailScroll == 0 { + // At top, go to bottom + m.tailFollow = true + m.tailScroll = maxScroll + } else { + // Not at top, go to top + m.tailFollow = false + m.tailScroll = 0 + } + return m, nil } return m, nil } @@ -3514,18 +3538,23 @@ func (m tuiModel) renderTailView() string { line := m.tailLines[i] // Format with timestamp ts := line.timestamp.Format("15:04:05") + + // Truncate raw text BEFORE styling to avoid cutting ANSI codes + // Account for timestamp prefix (8 chars + 1 space = 9) + lineText := line.text + maxTextWidth := m.width - 9 + if maxTextWidth > 3 && runewidth.StringWidth(lineText) > maxTextWidth { + lineText = runewidth.Truncate(lineText, maxTextWidth-3, "...") + } + var text string switch line.lineType { case "tool": - text = fmt.Sprintf("%s %s", 
tuiStatusStyle.Render(ts), tuiQueuedStyle.Render(line.text)) + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiQueuedStyle.Render(lineText)) case "error": - text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiFailedStyle.Render(line.text)) + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiFailedStyle.Render(lineText)) default: - text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), line.text) - } - // Truncate to width - if runewidth.StringWidth(text) > m.width { - text = runewidth.Truncate(text, m.width-3, "...") + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), lineText) } b.WriteString(text) b.WriteString("\x1b[K\n") @@ -3560,7 +3589,7 @@ func (m tuiModel) renderTailView() string { b.WriteString("\x1b[K\n") // Help - help := "↑/↓: scroll | g/G: top/follow | x: cancel | esc/q: back" + help := "↑/↓: scroll | g: toggle top/bottom | x: cancel | esc/q: back" b.WriteString(tuiHelpStyle.Render(help)) b.WriteString("\x1b[K") b.WriteString("\x1b[J") // Clear to end of screen diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index eb83819..b43c7ba 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -212,6 +212,7 @@ type outputWriter struct { jobID int64 normalize OutputNormalizer lineBuf bytes.Buffer + maxLine int // Max line size before forced flush (prevents unbounded growth) } func (w *outputWriter) Write(p []byte) (n int, err error) { @@ -222,7 +223,24 @@ func (w *outputWriter) Write(p []byte) (n int, err error) { data := w.lineBuf.String() idx := strings.Index(data, "\n") if idx < 0 { - // No complete line yet + // No complete line yet - check if buffer exceeds max line size + if w.maxLine > 0 && w.lineBuf.Len() > w.maxLine { + // Force flush truncated line to prevent unbounded growth + // Truncate to maxLine-3 to leave room for "..." 
suffix + truncLen := w.maxLine - 3 + if truncLen < 1 { + truncLen = 1 + } + line := data[:truncLen] + w.lineBuf.Reset() + // Discard the rest of the oversized chunk until next newline + // Don't carry forward - it would just accumulate again + if normalized := w.normalize(line + "..."); normalized != nil { + normalized.Timestamp = time.Now() + w.buffer.Append(w.jobID, *normalized) + } + continue + } break } // Extract line and update buffer @@ -253,10 +271,12 @@ func (w *outputWriter) Flush() { } // Writer returns an io.Writer that normalizes and stores output for a job. +// Lines exceeding maxPerJob will be truncated to prevent unbounded buffer growth. func (ob *OutputBuffer) Writer(jobID int64, normalize OutputNormalizer) *outputWriter { return &outputWriter{ buffer: ob, jobID: jobID, normalize: normalize, + maxLine: ob.maxPerJob, } } diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index 6de703d..a41e001 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -1,6 +1,7 @@ package daemon import ( + "strings" "sync" "testing" "time" @@ -342,6 +343,34 @@ func TestOutputWriter_NormalizeFilters(t *testing.T) { } } +func TestOutputWriter_LongLineWithoutNewline(t *testing.T) { + // Per-job limit: 50 bytes + ob := NewOutputBuffer(50, 1000) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write a very long line without newline - should be force-flushed with truncation + longLine := strings.Repeat("x", 100) + w.Write([]byte(longLine)) + + lines := ob.GetLines(1) + // Should have at least one line from forced flush + if len(lines) == 0 { + t.Fatalf("expected at least 1 line after forced flush, got 0") + } + + // First line should be truncated to maxLine-3 + "..." 
= 50 bytes total + if len(lines[0].Text) != 50 { + t.Errorf("expected truncated line to be 50 bytes, got %d bytes: %q", len(lines[0].Text), lines[0].Text) + } + if !strings.HasSuffix(lines[0].Text, "...") { + t.Errorf("expected line to end with '...', got %q", lines[0].Text) + } +} + func TestOutputBuffer_Concurrent(t *testing.T) { ob := NewOutputBuffer(10240, 40960) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index e845a5a..0b0f465 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -747,10 +747,14 @@ func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { return case line, ok := <-ch: if !ok { - // Job finished - channel closed + // Job finished - channel closed, fetch actual status + finalStatus := "done" + if finalJob, err := s.db.GetJobByID(jobID); err == nil { + finalStatus = string(finalJob.Status) + } encoder.Encode(map[string]interface{}{ "type": "complete", - "status": "done", + "status": finalStatus, }) flusher.Flush() return From 843403cb2983e32abac13ae89bf4a1e08639e814 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 08:45:04 -0500 Subject: [PATCH 10/13] Fix output buffer race condition and add discard-until-newline - Move global total update after eviction to prevent concurrent overshoot - Add discard mode to outputWriter to prevent repeated truncated fragments - Fix test assertions to use Fatalf before indexing (prevents panic) - Add tests for multi-write long line discard and per-job eviction edge case Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui_test.go | 4 +- internal/daemon/outputbuffer.go | 43 +++++++++---- internal/daemon/outputbuffer_test.go | 93 ++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 14 deletions(-) diff --git a/cmd/roborev/tui_test.go b/cmd/roborev/tui_test.go index 5089c6a..ebb8c8b 100644 --- a/cmd/roborev/tui_test.go +++ b/cmd/roborev/tui_test.go @@ -6771,7 +6771,7 @@ func TestTUITailOutputPreservesLinesOnEmptyResponse(t 
*testing.T) { // Lines should be preserved (not cleared) if len(m2.tailLines) != 3 { - t.Errorf("Expected 3 lines preserved, got %d", len(m2.tailLines)) + t.Fatalf("Expected 3 lines preserved, got %d", len(m2.tailLines)) } // Streaming should stop @@ -6847,7 +6847,7 @@ func TestTUITailOutputIgnoredWhenNotInTailView(t *testing.T) { // Lines should not be updated since we're not in tail view if len(m2.tailLines) != 1 { - t.Errorf("Expected 1 line (unchanged), got %d", len(m2.tailLines)) + t.Fatalf("Expected 1 line (unchanged), got %d", len(m2.tailLines)) } if m2.tailLines[0].text != "Previous session line" { t.Errorf("Lines should not be updated when not in tail view") diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index b43c7ba..8267eff 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -95,11 +95,8 @@ func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { return // Drop line to enforce global memory limit } - // Now safe to evict and add - update global total - ob.totalBytes = ob.totalBytes - evictBytes + lineBytes - ob.mu.Unlock() - - // Perform the eviction + // Perform the eviction and add BEFORE updating global total + // to keep ob.totalBytes in sync with actual memory usage if evictCount > 0 { jo.lines = jo.lines[evictCount:] jo.totalBytes -= evictBytes @@ -109,6 +106,10 @@ func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { jo.lines = append(jo.lines, line) jo.totalBytes += lineBytes + // Update global total after eviction/add - now reflects actual state + ob.totalBytes = ob.totalBytes - evictBytes + lineBytes + ob.mu.Unlock() + // Notify subscribers for _, ch := range jo.subs { select { @@ -208,11 +209,12 @@ type OutputNormalizer func(line string) *OutputLine // outputWriter implements io.Writer and normalizes output to the buffer. 
type outputWriter struct { - buffer *OutputBuffer - jobID int64 - normalize OutputNormalizer - lineBuf bytes.Buffer - maxLine int // Max line size before forced flush (prevents unbounded growth) + buffer *OutputBuffer + jobID int64 + normalize OutputNormalizer + lineBuf bytes.Buffer + maxLine int // Max line size before forced flush (prevents unbounded growth) + discarding bool // True when discarding bytes until next newline (after truncation) } func (w *outputWriter) Write(p []byte) (n int, err error) { @@ -222,6 +224,23 @@ func (w *outputWriter) Write(p []byte) (n int, err error) { for { data := w.lineBuf.String() idx := strings.Index(data, "\n") + + // If discarding, skip all data until newline + if w.discarding { + if idx < 0 { + // No newline yet, discard everything + w.lineBuf.Reset() + break + } + // Found newline, stop discarding and keep remainder + w.lineBuf.Reset() + if idx+1 < len(data) { + w.lineBuf.WriteString(data[idx+1:]) + } + w.discarding = false + continue + } + if idx < 0 { // No complete line yet - check if buffer exceeds max line size if w.maxLine > 0 && w.lineBuf.Len() > w.maxLine { @@ -233,8 +252,8 @@ func (w *outputWriter) Write(p []byte) (n int, err error) { } line := data[:truncLen] w.lineBuf.Reset() - // Discard the rest of the oversized chunk until next newline - // Don't carry forward - it would just accumulate again + // Enter discard mode - drop bytes until next newline + w.discarding = true if normalized := w.normalize(line + "..."); normalized != nil { normalized.Timestamp = time.Now() w.buffer.Append(w.jobID, *normalized) diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index a41e001..96e30fe 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -371,6 +371,99 @@ func TestOutputWriter_LongLineWithoutNewline(t *testing.T) { } } +func TestOutputWriter_MultiWriteLongLineDiscard(t *testing.T) { + // Test that after truncating a long line, subsequent 
writes for the
+	// same line are discarded until a newline is seen.
+	// Key invariant: repeated writes without newlines produce at most ONE truncated line
+	ob := NewOutputBuffer(100, 10000)
+	normalize := func(line string) *OutputLine {
+		return &OutputLine{Text: line, Type: "text"}
+	}
+
+	w := ob.Writer(1, normalize)
+
+	// Write data exceeding maxLine (100 bytes) multiple times WITHOUT a newline
+	// This simulates a single very long line being written in chunks
+	for i := 0; i < 5; i++ {
+		w.Write([]byte(strings.Repeat("x", 50))) // 5 * 50 = 250 bytes total
+	}
+
+	// Should only have 1 line (the truncated one), not 5 fragments
+	lines := ob.GetLines(1)
+	if len(lines) != 1 {
+		t.Fatalf("expected exactly 1 truncated line (not multiple fragments), got %d", len(lines))
+	}
+
+	// Verify it's truncated
+	if !strings.HasSuffix(lines[0].Text, "...") {
+		t.Errorf("expected truncated line to end with '...', got %q", lines[0].Text)
+	}
+}
+
+func TestOutputBuffer_PerJobEvictionBlockedByGlobal(t *testing.T) {
+	// Test the case where per-job eviction would be needed but global limit
+	// would still be exceeded after eviction - no eviction should occur
+	// Per-job: 40 bytes, Global: 50 bytes
+	ob := NewOutputBuffer(40, 50)
+
+	// Job 1: add 30 bytes
+	ob.Append(1, OutputLine{Text: "123456789012345678901234567890", Type: "text"}) // 30 bytes, total=30
+
+	// Job 2: add 20 bytes
+	ob.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=50
+
+	// Verify initial state
+	lines1 := ob.GetLines(1)
+	lines2 := ob.GetLines(2)
+	if len(lines1) != 1 || len(lines2) != 1 {
+		t.Fatalf("expected 1 line each, got job1=%d, job2=%d", len(lines1), len(lines2))
+	}
+
+	// Note: the (40, 50) buffer above cannot demonstrate a global-limit
+	// rejection. Adding 25 bytes to job 1 gives per-job 30+25=55 > 40, so
+	// the whole 30-byte line is evicted; the global total then drops to
+	// 50-30=20, and 20+25=45 < 50, so the append is accepted.
+	// Walking through the arithmetic:
+	//   current job1=30, total=50; new line=25 bytes
+	//   per-job check: 30+25=55 > 40 -> evict the whole 30-byte line
+	//   after eviction: job1=0, total=50-30=20
+	//   global check: 20+25=45 < 50 -> accepted
+	// So a tighter configuration is needed to hit the rejection path.
+	// (That is why the test continues with a second, smaller buffer.)
+
+	// Second buffer where eviction cannot satisfy the global limit:
+	// Per-job: 30, Global: 40
+	ob2 := NewOutputBuffer(30, 40)
+
+	// Job 1: add 20 bytes
+	ob2.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes
+
+	// Job 2: add 20 bytes
+	ob2.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=40
+
+	// Now try to add 25 bytes to job 1
+	// Per-job: 20+25=45 > 30, would evict 20 bytes (the existing line)
+	// After eviction: job1=0, total=40-20=20, +25=45 > 40 - REJECTED
+	// Existing line should be preserved
+
+	lines1Before := ob2.GetLines(1)
+	if len(lines1Before) != 1 {
+		t.Fatalf("expected 1 line for job 1 before, got %d", len(lines1Before))
+	}
+
+	// Try to add a line that would exceed global limit even after eviction
+	ob2.Append(1, OutputLine{Text: "1234567890123456789012345", Type: "text"}) // 25 bytes
+
+	lines1After := ob2.GetLines(1)
+	if len(lines1After) != 1 {
+		t.Fatalf("expected 1 line for job 1 (preserved), got %d", len(lines1After))
+	}
+	// Original line should still be there
+	if lines1After[0].Text != "12345678901234567890" {
+		t.Errorf("original line should be preserved, got %q", lines1After[0].Text)
+	}
+}
+
 func TestOutputBuffer_Concurrent(t *testing.T) {
 	ob := NewOutputBuffer(10240, 40960)

From 7058a80a1856a1e9f6e5fdad9d04be6f64a2b200 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Mon, 26 Jan 2026 08:59:00 -0500
Subject: [PATCH 11/13] Fix tail view rendering artifacts on scroll

Return tea.ClearScreen for page up/down and g toggle to force Bubbletea
to do a full repaint, preventing ghosting artifacts when scrolling.
Co-Authored-By: Claude Opus 4.5 --- cmd/roborev/tui.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go index 0ad0152..efb5876 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -1469,14 +1469,14 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { if m.tailScroll < 0 { m.tailScroll = 0 } - return m, nil + return m, tea.ClearScreen case "pgdown": visibleLines := m.height - 4 // Match renderTailView reservedLines if visibleLines < 1 { visibleLines = 1 } m.tailScroll += visibleLines - return m, nil + return m, tea.ClearScreen case "home": m.tailFollow = false // Stop auto-scroll when going to top m.tailScroll = 0 @@ -1512,7 +1512,7 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.tailFollow = false m.tailScroll = 0 } - return m, nil + return m, tea.ClearScreen } return m, nil } From a397243a3d6461b1ff7957c7d04d1c395fd3ef7d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 09:06:05 -0500 Subject: [PATCH 12/13] Fix truncation overflow with small maxLine values Skip ellipsis when maxLine < 4 (no room for content + "..."). Add test covering maxLine values 3, 4, 5, and 10. Co-Authored-By: Claude Opus 4.5 --- internal/daemon/outputbuffer.go | 14 +++++---- internal/daemon/outputbuffer_test.go | 46 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go index 8267eff..9d0c0ef 100644 --- a/internal/daemon/outputbuffer.go +++ b/internal/daemon/outputbuffer.go @@ -245,16 +245,18 @@ func (w *outputWriter) Write(p []byte) (n int, err error) { // No complete line yet - check if buffer exceeds max line size if w.maxLine > 0 && w.lineBuf.Len() > w.maxLine { // Force flush truncated line to prevent unbounded growth - // Truncate to maxLine-3 to leave room for "..." 
suffix - truncLen := w.maxLine - 3 - if truncLen < 1 { - truncLen = 1 + var line string + if w.maxLine >= 4 { + // Room for content + "..." suffix + line = data[:w.maxLine-3] + "..." + } else { + // Too small for ellipsis, just truncate + line = data[:w.maxLine] } - line := data[:truncLen] w.lineBuf.Reset() // Enter discard mode - drop bytes until next newline w.discarding = true - if normalized := w.normalize(line + "..."); normalized != nil { + if normalized := w.normalize(line); normalized != nil { normalized.Timestamp = time.Now() w.buffer.Append(w.jobID, *normalized) } diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go index 96e30fe..bfb5a04 100644 --- a/internal/daemon/outputbuffer_test.go +++ b/internal/daemon/outputbuffer_test.go @@ -371,6 +371,52 @@ func TestOutputWriter_LongLineWithoutNewline(t *testing.T) { } } +func TestOutputWriter_SmallMaxLine(t *testing.T) { + // Test that truncation works correctly with very small maxLine values + // where there's no room for "..." suffix + tests := []struct { + name string + maxLine int + input string + expectLen int + expectNoEll bool // true if no ellipsis expected + }{ + {"maxLine=3", 3, "abcdefgh", 3, true}, // No room for ellipsis + {"maxLine=4", 4, "abcdefgh", 4, false}, // Just enough: 1 char + "..." + {"maxLine=5", 5, "abcdefgh", 5, false}, // 2 chars + "..." + {"maxLine=10", 10, "abcdefghijklmn", 10, false}, // 7 chars + "..." 
+ } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ob := NewOutputBuffer(tt.maxLine, 10000) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + w.Write([]byte(tt.input)) // No newline, triggers truncation + + lines := ob.GetLines(1) + if len(lines) != 1 { + t.Fatalf("expected 1 line, got %d", len(lines)) + } + + if len(lines[0].Text) != tt.expectLen { + t.Errorf("expected line length %d, got %d: %q", tt.expectLen, len(lines[0].Text), lines[0].Text) + } + + hasEllipsis := strings.HasSuffix(lines[0].Text, "...") + if tt.expectNoEll && hasEllipsis { + t.Errorf("expected no ellipsis for maxLine=%d, got %q", tt.maxLine, lines[0].Text) + } + if !tt.expectNoEll && !hasEllipsis { + t.Errorf("expected ellipsis for maxLine=%d, got %q", tt.maxLine, lines[0].Text) + } + }) + } +} + func TestOutputWriter_MultiWriteLongLineDiscard(t *testing.T) { // Test that after truncating a long line, subsequent writes for the // same line are discarded until a newline is seen. From a73cc8b6037d54df27294d37f4201fc5276c8b00 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 26 Jan 2026 09:27:59 -0500 Subject: [PATCH 13/13] Increase timing threshold for flaky Windows CI test TestKillDaemonSkipsHTTPForNonLoopback was failing on Windows CI at 202ms with a 200ms threshold. Increase to 300ms to account for CI variability. Co-Authored-By: Claude Opus 4.5 --- internal/daemon/runtime_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/daemon/runtime_test.go b/internal/daemon/runtime_test.go index 5ea8d99..9e025d4 100644 --- a/internal/daemon/runtime_test.go +++ b/internal/daemon/runtime_test.go @@ -95,9 +95,9 @@ func TestKillDaemonSkipsHTTPForNonLoopback(t *testing.T) { t.Error("KillDaemon should return true for non-existent PID") } - // Should complete quickly (no HTTP call). Allow 200ms for process checks. 
- // If HTTP was attempted, it would take at least 500ms (client timeout). - if elapsed > 200*time.Millisecond { + // Should complete quickly (no HTTP call). Allow 300ms for process checks + // (Windows CI can be slow). If HTTP was attempted, it would take 500ms+. + if elapsed > 300*time.Millisecond { t.Errorf("KillDaemon took %v, suggesting HTTP was attempted to non-loopback address", elapsed) } }