diff --git a/cmd/roborev/tui.go b/cmd/roborev/tui.go index 9dc671c..efb5876 100644 --- a/cmd/roborev/tui.go +++ b/cmd/roborev/tui.go @@ -95,6 +95,7 @@ const ( tuiViewComment tuiViewCommitMsg tuiViewHelp + tuiViewTail ) // repoFilterItem represents a repo (or group of repos with same display name) in the filter modal @@ -178,6 +179,14 @@ type tuiModel struct { // Help view state helpFromView tuiView // View to return to after closing help + + // Tail view state + tailJobID int64 // Job being tailed + tailLines []tailLine // Buffer of output lines + tailScroll int // Scroll position + tailStreaming bool // True if job is still running + tailFromView tuiView // View to return to + tailFollow bool // True if auto-scrolling to bottom (follow mode) } // pendingState tracks a pending addressed toggle with sequence number @@ -186,6 +195,23 @@ type pendingState struct { seq uint64 } +// tailLine represents a single line of agent output in the tail view +type tailLine struct { + timestamp time.Time + text string + lineType string // "text", "tool", "thinking", "error" +} + +// tuiTailOutputMsg delivers output lines from the daemon +type tuiTailOutputMsg struct { + lines []tailLine + hasMore bool // true if job is still running + err error +} + +// tuiTailTickMsg triggers a refresh of the tail output +type tuiTailTickMsg struct{} + type tuiTickMsg time.Time type tuiJobsMsg struct { jobs []storage.ReviewJob @@ -647,6 +673,46 @@ func (m tuiModel) fetchReviewForPrompt(jobID int64) tea.Cmd { } } +func (m tuiModel) fetchTailOutput(jobID int64) tea.Cmd { + return func() tea.Msg { + resp, err := m.client.Get(fmt.Sprintf("%s/api/job/output?job_id=%d", m.serverAddr, jobID)) + if err != nil { + return tuiTailOutputMsg{err: err} + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return tuiTailOutputMsg{err: fmt.Errorf("fetch output: %s", resp.Status)} + } + + var result struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []struct { + TS 
string `json:"ts"` + Text string `json:"text"` + LineType string `json:"line_type"` + } `json:"lines"` + HasMore bool `json:"has_more"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return tuiTailOutputMsg{err: err} + } + + lines := make([]tailLine, len(result.Lines)) + for i, l := range result.Lines { + ts, err := time.Parse(time.RFC3339Nano, l.TS) + if err != nil { + // Fallback to current time if timestamp is invalid + ts = time.Now() + } + lines[i] = tailLine{timestamp: ts, text: l.Text, lineType: l.LineType} + } + + return tuiTailOutputMsg{lines: lines, hasMore: result.HasMore} + } +} + // formatClipboardContent prepares review content for clipboard with a header line func formatClipboardContent(review *storage.Review) string { if review == nil || review.Output == "" { @@ -1355,6 +1421,102 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } } + // Handle tail view + if m.currentView == tuiViewTail { + switch msg.String() { + case "ctrl+c": + return m, tea.Quit + case "esc", "q": + m.currentView = m.tailFromView + m.tailStreaming = false + return m, nil + case "x": + // Cancel the job from tail view + if m.tailJobID > 0 && m.tailStreaming { + for i := range m.jobs { + if m.jobs[i].ID == m.tailJobID { + job := &m.jobs[i] + if job.Status == storage.JobStatusRunning { + oldStatus := job.Status + oldFinishedAt := job.FinishedAt + job.Status = storage.JobStatusCanceled + now := time.Now() + job.FinishedAt = &now + m.tailStreaming = false + return m, m.cancelJob(job.ID, oldStatus, oldFinishedAt) + } + break + } + } + } + return m, nil + case "up", "k": + m.tailFollow = false // Stop auto-scroll when user scrolls up + if m.tailScroll > 0 { + m.tailScroll-- + } + return m, nil + case "down", "j": + m.tailScroll++ + return m, nil + case "pgup": + m.tailFollow = false // Stop auto-scroll when user scrolls up + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + 
m.tailScroll -= visibleLines + if m.tailScroll < 0 { + m.tailScroll = 0 + } + return m, tea.ClearScreen + case "pgdown": + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + m.tailScroll += visibleLines + return m, tea.ClearScreen + case "home": + m.tailFollow = false // Stop auto-scroll when going to top + m.tailScroll = 0 + return m, nil + case "end": + m.tailFollow = true // Resume auto-scroll when going to bottom + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + m.tailScroll = maxScroll + return m, nil + case "g", "G": + // Toggle between top and bottom + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + if m.tailScroll == 0 { + // At top, go to bottom + m.tailFollow = true + m.tailScroll = maxScroll + } else { + // Not at top, go to top + m.tailFollow = false + m.tailScroll = 0 + } + return m, tea.ClearScreen + } + return m, nil + } + switch msg.String() { case "ctrl+c", "q": if m.currentView == tuiViewReview { @@ -1714,6 +1876,26 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } } + case "t": + // Tail running job output + if m.currentView == tuiViewQueue && len(m.jobs) > 0 && m.selectedIdx >= 0 && m.selectedIdx < len(m.jobs) { + job := m.jobs[m.selectedIdx] + if job.Status == storage.JobStatusRunning { + m.tailJobID = job.ID + m.tailLines = nil + m.tailScroll = 0 + m.tailStreaming = true + m.tailFollow = true // Start in follow mode (auto-scroll) + m.tailFromView = tuiViewQueue + m.currentView = tuiViewTail + return m, tea.Batch(tea.ClearScreen, m.fetchTailOutput(job.ID)) + } else if job.Status == storage.JobStatusQueued { + m.flashMessage = "Job is queued - not yet running" 
+ m.flashExpiresAt = time.Now().Add(2 * time.Second) + m.flashView = tuiViewQueue + } + } + case "f": // Open filter modal if m.currentView == tuiViewQueue { @@ -1933,6 +2115,12 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // initial load or user-initiated actions (filter changes, etc.) return m, tea.Batch(m.tick(), m.fetchJobs(), m.fetchStatus()) + case tuiTailTickMsg: + // Refresh tail output if still in tail view and streaming + if m.currentView == tuiViewTail && m.tailStreaming && m.tailJobID > 0 { + return m, m.fetchTailOutput(m.tailJobID) + } + case tuiJobsMsg: m.loadingMore = false if !msg.append { @@ -2106,6 +2294,39 @@ func (m tuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.currentView = tuiViewPrompt m.promptScroll = 0 + case tuiTailOutputMsg: + m.consecutiveErrors = 0 + if msg.err != nil { + m.err = msg.err + return m, nil + } + if m.currentView == tuiViewTail { + // Only update lines if we have new content, or if job is still streaming + // This preserves the output when job completes (buffer gets closed on server) + if len(msg.lines) > 0 || msg.hasMore { + m.tailLines = msg.lines + } + m.tailStreaming = msg.hasMore + // Auto-scroll to bottom only if in follow mode + if m.tailFollow && len(m.tailLines) > 0 { + visibleLines := m.height - 4 // Match renderTailView reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + m.tailScroll = maxScroll + } + // Continue polling if still streaming + if m.tailStreaming { + return m, tea.Tick(500*time.Millisecond, func(t time.Time) tea.Msg { + return tuiTailTickMsg{} + }) + } + } + case tuiAddressedMsg: if m.currentReview != nil { m.currentReview.Addressed = bool(msg) @@ -2333,6 +2554,9 @@ func (m tuiModel) View() string { if m.currentView == tuiViewHelp { return m.renderHelpView() } + if m.currentView == tuiViewTail { + return m.renderTailView() + } if m.currentView == tuiViewPrompt && 
m.currentReview != nil { return m.renderPromptView() } @@ -2883,6 +3107,9 @@ func (m tuiModel) renderReviewView() string { func (m tuiModel) renderPromptView() string { var b strings.Builder + // Clear screen and move cursor to home position to prevent artifacts on scroll + b.WriteString("\x1b[2J\x1b[H") + review := m.currentReview if review.Job != nil { ref := shortJobRef(*review.Job) @@ -3245,6 +3472,131 @@ func (m tuiModel) renderCommitMsgView() string { return b.String() } +func (m tuiModel) renderTailView() string { + var b strings.Builder + + // Title with job info + var title string + for _, job := range m.jobs { + if job.ID == m.tailJobID { + repoName := m.getDisplayName(job.RepoPath, job.RepoName) + shortRef := job.GitRef + if len(shortRef) > 7 { + shortRef = shortRef[:7] + } + title = fmt.Sprintf("Tail: %s %s (#%d)", repoName, shortRef, job.ID) + break + } + } + if title == "" { + title = fmt.Sprintf("Tail: Job #%d", m.tailJobID) + } + if m.tailStreaming { + title += " " + tuiRunningStyle.Render("● live") + } else { + title += " " + tuiDoneStyle.Render("● complete") + } + b.WriteString(tuiTitleStyle.Render(title)) + b.WriteString("\x1b[K\n") + + // Calculate visible area + reservedLines := 4 // title + separator + status + help + visibleLines := m.height - reservedLines + if visibleLines < 1 { + visibleLines = 1 + } + + // Clamp scroll + maxScroll := len(m.tailLines) - visibleLines + if maxScroll < 0 { + maxScroll = 0 + } + scroll := m.tailScroll + if scroll > maxScroll { + scroll = maxScroll + } + if scroll < 0 { + scroll = 0 + } + + // Separator + b.WriteString(strings.Repeat("─", m.width)) + b.WriteString("\x1b[K\n") + + // Render lines + linesWritten := 0 + if len(m.tailLines) == 0 { + b.WriteString(tuiStatusStyle.Render("Waiting for output...")) + b.WriteString("\x1b[K\n") + linesWritten++ + } else { + end := scroll + visibleLines + if end > len(m.tailLines) { + end = len(m.tailLines) + } + for i := scroll; i < end; i++ { + line := m.tailLines[i] + 
// Format with timestamp + ts := line.timestamp.Format("15:04:05") + + // Truncate raw text BEFORE styling to avoid cutting ANSI codes + // Account for timestamp prefix (8 chars + 1 space = 9) + lineText := line.text + maxTextWidth := m.width - 9 + if maxTextWidth > 3 && runewidth.StringWidth(lineText) > maxTextWidth { + lineText = runewidth.Truncate(lineText, maxTextWidth-3, "...") + } + + var text string + switch line.lineType { + case "tool": + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiQueuedStyle.Render(lineText)) + case "error": + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), tuiFailedStyle.Render(lineText)) + default: + text = fmt.Sprintf("%s %s", tuiStatusStyle.Render(ts), lineText) + } + b.WriteString(text) + b.WriteString("\x1b[K\n") + linesWritten++ + } + } + + // Pad remaining lines + for linesWritten < visibleLines { + b.WriteString("\x1b[K\n") + linesWritten++ + } + + // Status line with position and follow mode + var status string + if len(m.tailLines) > visibleLines { + // Calculate actual displayed range (not including padding) + displayEnd := scroll + visibleLines + if displayEnd > len(m.tailLines) { + displayEnd = len(m.tailLines) + } + status = fmt.Sprintf("[%d-%d of %d lines]", scroll+1, displayEnd, len(m.tailLines)) + } else { + status = fmt.Sprintf("[%d lines]", len(m.tailLines)) + } + if m.tailFollow { + status += " " + tuiRunningStyle.Render("[following]") + } else { + status += " " + tuiStatusStyle.Render("[paused - G to follow]") + } + b.WriteString(tuiStatusStyle.Render(status)) + b.WriteString("\x1b[K\n") + + // Help + help := "↑/↓: scroll | g: toggle top/bottom | x: cancel | esc/q: back" + b.WriteString(tuiHelpStyle.Render(help)) + b.WriteString("\x1b[K") + b.WriteString("\x1b[J") // Clear to end of screen + + return b.String() +} + func (m tuiModel) renderHelpView() string { var b strings.Builder @@ -3272,6 +3624,7 @@ func (m tuiModel) renderHelpView() string { keys: []struct{ key, desc string }{ {"a", "Mark as 
addressed"}, {"c", "Add comment"}, + {"t", "Tail running job output"}, {"x", "Cancel job"}, {"r", "Re-run job"}, {"y", "Copy review to clipboard"}, diff --git a/cmd/roborev/tui_test.go b/cmd/roborev/tui_test.go index 5c2b4ca..ebb8c8b 100644 --- a/cmd/roborev/tui_test.go +++ b/cmd/roborev/tui_test.go @@ -6742,3 +6742,114 @@ func TestSanitizeForDisplay(t *testing.T) { }) } } + +func TestTUITailOutputPreservesLinesOnEmptyResponse(t *testing.T) { + // Test that when a job completes and the server returns empty lines + // (because the buffer was closed), the TUI preserves the existing lines. + m := newTuiModel("http://localhost") + m.currentView = tuiViewTail + m.tailJobID = 1 + m.tailStreaming = true + m.height = 30 + + // Set up initial lines as if we had been streaming output + m.tailLines = []tailLine{ + {timestamp: time.Now(), text: "Line 1", lineType: "text"}, + {timestamp: time.Now(), text: "Line 2", lineType: "text"}, + {timestamp: time.Now(), text: "Line 3", lineType: "text"}, + } + + // Simulate job completion: server returns empty lines, hasMore=false + emptyMsg := tuiTailOutputMsg{ + lines: []tailLine{}, + hasMore: false, + err: nil, + } + + updated, _ := m.Update(emptyMsg) + m2 := updated.(tuiModel) + + // Lines should be preserved (not cleared) + if len(m2.tailLines) != 3 { + t.Fatalf("Expected 3 lines preserved, got %d", len(m2.tailLines)) + } + + // Streaming should stop + if m2.tailStreaming { + t.Error("Expected tailStreaming to be false after job completes") + } + + // Verify the original content is still there + if m2.tailLines[0].text != "Line 1" { + t.Errorf("Expected 'Line 1', got %q", m2.tailLines[0].text) + } +} + +func TestTUITailOutputUpdatesLinesWhenStreaming(t *testing.T) { + // Test that when streaming and new lines arrive, they are updated + m := newTuiModel("http://localhost") + m.currentView = tuiViewTail + m.tailJobID = 1 + m.tailStreaming = true + m.height = 30 + + // Set up initial lines + m.tailLines = []tailLine{ + {timestamp: 
time.Now(), text: "Old line", lineType: "text"}, + } + + // New lines arrive while still streaming + newMsg := tuiTailOutputMsg{ + lines: []tailLine{ + {timestamp: time.Now(), text: "Old line", lineType: "text"}, + {timestamp: time.Now(), text: "New line", lineType: "text"}, + }, + hasMore: true, // Still streaming + err: nil, + } + + updated, _ := m.Update(newMsg) + m2 := updated.(tuiModel) + + // Lines should be updated + if len(m2.tailLines) != 2 { + t.Errorf("Expected 2 lines, got %d", len(m2.tailLines)) + } + + // Streaming should continue + if !m2.tailStreaming { + t.Error("Expected tailStreaming to be true while job is running") + } +} + +func TestTUITailOutputIgnoredWhenNotInTailView(t *testing.T) { + // Test that tail output messages are ignored when not in tail view + m := newTuiModel("http://localhost") + m.currentView = tuiViewQueue // Not in tail view + m.tailJobID = 1 + + // Existing lines from a previous tail session + m.tailLines = []tailLine{ + {timestamp: time.Now(), text: "Previous session line", lineType: "text"}, + } + + // New lines arrive (stale message from previous tail) + msg := tuiTailOutputMsg{ + lines: []tailLine{ + {timestamp: time.Now(), text: "Should be ignored", lineType: "text"}, + }, + hasMore: false, + err: nil, + } + + updated, _ := m.Update(msg) + m2 := updated.(tuiModel) + + // Lines should not be updated since we're not in tail view + if len(m2.tailLines) != 1 { + t.Fatalf("Expected 1 line (unchanged), got %d", len(m2.tailLines)) + } + if m2.tailLines[0].text != "Previous session line" { + t.Errorf("Lines should not be updated when not in tail view") + } +} diff --git a/internal/daemon/normalize.go b/internal/daemon/normalize.go new file mode 100644 index 0000000..ff68e75 --- /dev/null +++ b/internal/daemon/normalize.go @@ -0,0 +1,227 @@ +package daemon + +import ( + "encoding/json" + "regexp" + "strings" +) + +// ansiEscapePattern matches ANSI escape sequences (colors, cursor movement, etc.) 
+var ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;?]*[a-zA-Z]|\x1b\]([^\x07\x1b]|\x1b[^\\])*(\x07|\x1b\\)`) + +// GetNormalizer returns the appropriate normalizer for an agent. +func GetNormalizer(agentName string) OutputNormalizer { + switch agentName { + case "claude-code": + return NormalizeClaudeOutput + case "opencode": + return NormalizeOpenCodeOutput + default: + return NormalizeGenericOutput + } +} + +// claudeNoisePatterns are status messages from Claude CLI that aren't useful progress info +var claudeNoisePatterns = []string{ + "mcp startup:", + "Initializing", + "Connected to", + "Session started", +} + +// isClaudeNoise returns true if the line is a Claude CLI status message to filter out +func isClaudeNoise(line string) bool { + for _, pattern := range claudeNoisePatterns { + if strings.Contains(line, pattern) { + return true + } + } + return false +} + +// NormalizeClaudeOutput parses Claude's stream-json format and extracts readable content. +func NormalizeClaudeOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Filter out Claude CLI noise/status messages + if isClaudeNoise(line) { + return nil + } + + // Try to parse as JSON + var msg struct { + Type string `json:"type"` + Subtype string `json:"subtype,omitempty"` + Message struct { + Content string `json:"content,omitempty"` + } `json:"message,omitempty"` + Result string `json:"result,omitempty"` + SessionID string `json:"session_id,omitempty"` + + // Tool-related fields + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` + + // Content delta for streaming + ContentBlockDelta struct { + Delta struct { + Text string `json:"text,omitempty"` + } `json:"delta,omitempty"` + } `json:"content_block_delta,omitempty"` + } + + if err := json.Unmarshal([]byte(line), &msg); err != nil { + // Not JSON - return as raw text (shouldn't happen with Claude) + return &OutputLine{Text: stripANSI(line), Type: "text"} + } 
+ + switch msg.Type { + case "assistant": + // Full assistant message with content + if msg.Message.Content != "" { + // Replace embedded newlines with spaces to keep output on single line + text := strings.ReplaceAll(msg.Message.Content, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + return &OutputLine{Text: text, Type: "text"} + } + // Skip empty assistant messages (e.g., start of response) + return nil + + case "result": + // Final result summary + if msg.Result != "" { + // Replace embedded newlines with spaces + text := strings.ReplaceAll(msg.Result, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + return &OutputLine{Text: text, Type: "text"} + } + return nil + + case "tool_use": + // Tool being called + if msg.Name != "" { + return &OutputLine{Text: "[Tool: " + msg.Name + "]", Type: "tool"} + } + return nil + + case "tool_result": + // Tool finished - could show brief indicator + return &OutputLine{Text: "[Tool completed]", Type: "tool"} + + case "content_block_start": + // Start of a content block - skip + return nil + + case "content_block_delta": + // Streaming text delta + if msg.ContentBlockDelta.Delta.Text != "" { + // Replace embedded newlines with spaces + text := strings.ReplaceAll(msg.ContentBlockDelta.Delta.Text, "\n", " ") + text = strings.ReplaceAll(text, "\r", "") + if text == "" || text == " " { + return nil + } + return &OutputLine{Text: text, Type: "text"} + } + return nil + + case "content_block_stop": + // End of content block - skip + return nil + + case "message_start", "message_delta", "message_stop": + // Message lifecycle events - skip + return nil + + case "system": + // System messages (e.g., init) + if msg.Subtype == "init" { + if msg.SessionID != "" { + sessionPrefix := msg.SessionID + if len(sessionPrefix) > 8 { + sessionPrefix = sessionPrefix[:8] + } + return &OutputLine{Text: "[Session: " + sessionPrefix + "...]", Type: "text"} + } + } + return nil + + case "error": + // Error message + return &OutputLine{Text: 
"[Error in stream]", Type: "error"} + + default: + // Unknown type - skip to avoid noise + return nil + } +} + +// NormalizeOpenCodeOutput normalizes OpenCode output (plain text with ANSI codes). +func NormalizeOpenCodeOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Filter tool call JSON lines (same logic as in agent package) + if isToolCallJSON(line) { + return &OutputLine{Text: "[Tool call]", Type: "tool"} + } + + // Strip ANSI codes for clean display + text := stripANSI(line) + if text == "" { + return nil + } + + return &OutputLine{Text: text, Type: "text"} +} + +// NormalizeGenericOutput is the default normalizer for other agents. +func NormalizeGenericOutput(line string) *OutputLine { + line = strings.TrimSpace(line) + if line == "" { + return nil + } + + // Strip ANSI codes + text := stripANSI(line) + if text == "" { + return nil + } + + // Filter tool call JSON if present + if isToolCallJSON(text) { + return &OutputLine{Text: "[Tool call]", Type: "tool"} + } + + return &OutputLine{Text: text, Type: "text"} +} + +// stripANSI removes ANSI escape sequences from a string. +func stripANSI(s string) string { + return ansiEscapePattern.ReplaceAllString(s, "") +} + +// isToolCallJSON checks if a line is a tool call JSON object. +// Tool calls have exactly "name" and "arguments" keys. 
+func isToolCallJSON(line string) bool { + trimmed := strings.TrimSpace(line) + if !strings.HasPrefix(trimmed, "{") { + return false + } + var m map[string]json.RawMessage + if err := json.Unmarshal([]byte(trimmed), &m); err != nil { + return false + } + // Tool calls have exactly "name" and "arguments" keys + if len(m) != 2 { + return false + } + _, hasName := m["name"] + _, hasArgs := m["arguments"] + return hasName && hasArgs +} diff --git a/internal/daemon/normalize_test.go b/internal/daemon/normalize_test.go new file mode 100644 index 0000000..c8153aa --- /dev/null +++ b/internal/daemon/normalize_test.go @@ -0,0 +1,284 @@ +package daemon + +import ( + "testing" +) + +func TestNormalizeClaudeOutput_AssistantMessage(t *testing.T) { + line := `{"type":"assistant","message":{"content":"Hello, I will review this code."}}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Hello, I will review this code." { + t.Errorf("expected message content, got %q", result.Text) + } + if result.Type != "text" { + t.Errorf("expected type 'text', got %q", result.Type) + } +} + +func TestNormalizeClaudeOutput_Result(t *testing.T) { + line := `{"type":"result","result":"Review complete: PASS"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Review complete: PASS" { + t.Errorf("expected result text, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_ToolUse(t *testing.T) { + line := `{"type":"tool_use","name":"Read","input":{"file_path":"/foo/bar.go"}}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool: Read]" { + t.Errorf("expected tool indicator, got %q", result.Text) + } + if result.Type != "tool" { + t.Errorf("expected type 'tool', got %q", result.Type) + } +} + +func TestNormalizeClaudeOutput_ToolResult(t *testing.T) { + line := 
`{"type":"tool_result","content":"file contents here"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool completed]" { + t.Errorf("expected tool completed indicator, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_SkipsLifecycleEvents(t *testing.T) { + cases := []string{ + `{"type":"message_start"}`, + `{"type":"message_delta"}`, + `{"type":"message_stop"}`, + `{"type":"content_block_start"}`, + `{"type":"content_block_stop"}`, + } + + for _, line := range cases { + result := NormalizeClaudeOutput(line) + if result != nil { + t.Errorf("expected nil for %s, got %+v", line, result) + } + } +} + +func TestNormalizeClaudeOutput_EmptyAssistant(t *testing.T) { + line := `{"type":"assistant","message":{}}` + result := NormalizeClaudeOutput(line) + + if result != nil { + t.Errorf("expected nil for empty assistant message, got %+v", result) + } +} + +func TestNormalizeClaudeOutput_SystemInit(t *testing.T) { + line := `{"type":"system","subtype":"init","session_id":"abc123def456"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Session: abc123de...]" { + t.Errorf("expected truncated session ID, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_SystemInitShortSessionID(t *testing.T) { + // Test that short session IDs don't panic + line := `{"type":"system","subtype":"init","session_id":"abc"}` + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Session: abc...]" { + t.Errorf("expected short session ID preserved, got %q", result.Text) + } +} + +func TestNormalizeClaudeOutput_InvalidJSON(t *testing.T) { + line := "not json at all" + result := NormalizeClaudeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result for non-JSON") + } + if result.Text != "not json at all" { + t.Errorf("expected raw text, got 
%q", result.Text) + } +} + +func TestNormalizeClaudeOutput_EmptyLine(t *testing.T) { + result := NormalizeClaudeOutput("") + if result != nil { + t.Errorf("expected nil for empty line, got %+v", result) + } + + result = NormalizeClaudeOutput(" ") + if result != nil { + t.Errorf("expected nil for whitespace line, got %+v", result) + } +} + +func TestNormalizeOpenCodeOutput_PlainText(t *testing.T) { + line := "Reviewing the code changes..." + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Reviewing the code changes..." { + t.Errorf("expected plain text, got %q", result.Text) + } + if result.Type != "text" { + t.Errorf("expected type 'text', got %q", result.Type) + } +} + +func TestNormalizeOpenCodeOutput_StripsANSI(t *testing.T) { + line := "\x1b[32mGreen text\x1b[0m" + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Green text" { + t.Errorf("expected ANSI stripped, got %q", result.Text) + } +} + +func TestNormalizeOpenCodeOutput_FilterToolCall(t *testing.T) { + line := `{"name":"read","arguments":{"path":"/foo"}}` + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "[Tool call]" { + t.Errorf("expected tool call indicator, got %q", result.Text) + } + if result.Type != "tool" { + t.Errorf("expected type 'tool', got %q", result.Type) + } +} + +func TestNormalizeOpenCodeOutput_PreservesJSONWithExtraKeys(t *testing.T) { + // JSON with more than just name/arguments should be preserved + line := `{"name":"test","arguments":{},"extra":"key"}` + result := NormalizeOpenCodeOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + // Should not be filtered as tool call + if result.Type == "tool" { + t.Error("expected non-tool type for JSON with extra keys") + } +} + +func TestNormalizeOpenCodeOutput_EmptyLine(t *testing.T) { + 
result := NormalizeOpenCodeOutput("") + if result != nil { + t.Errorf("expected nil for empty line, got %+v", result) + } +} + +func TestNormalizeGenericOutput_PlainText(t *testing.T) { + line := "Some agent output" + result := NormalizeGenericOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Some agent output" { + t.Errorf("expected plain text, got %q", result.Text) + } +} + +func TestNormalizeGenericOutput_StripsANSI(t *testing.T) { + line := "\x1b[1;31mBold red\x1b[0m normal" + result := NormalizeGenericOutput(line) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Text != "Bold red normal" { + t.Errorf("expected ANSI stripped, got %q", result.Text) + } +} + +func TestGetNormalizer(t *testing.T) { + cases := []struct { + agent string + expected string + }{ + {"claude-code", "claude"}, + {"opencode", "opencode"}, + {"codex", "generic"}, + {"gemini", "generic"}, + {"unknown", "generic"}, + } + + for _, tc := range cases { + normalizer := GetNormalizer(tc.agent) + if normalizer == nil { + t.Errorf("GetNormalizer(%q) returned nil", tc.agent) + } + } +} + +func TestStripANSI(t *testing.T) { + cases := []struct { + input string + expected string + }{ + {"plain text", "plain text"}, + {"\x1b[32mgreen\x1b[0m", "green"}, + {"\x1b[1;31;40mcomplex\x1b[0m", "complex"}, + {"\x1b[?25hcursor\x1b[?25l", "cursor"}, + {"no escape", "no escape"}, + {"\x1b]0;title\x07text", "text"}, // OSC sequence + } + + for _, tc := range cases { + result := stripANSI(tc.input) + if result != tc.expected { + t.Errorf("stripANSI(%q) = %q, want %q", tc.input, result, tc.expected) + } + } +} + +func TestIsToolCallJSON(t *testing.T) { + cases := []struct { + input string + expected bool + }{ + {`{"name":"read","arguments":{}}`, true}, + {`{"name":"write","arguments":{"path":"/foo"}}`, true}, + {`{"name":"test","arguments":{},"extra":true}`, false}, // extra key + {`{"name":"only"}`, false}, // missing arguments + 
{`{"arguments":{}}`, false}, // missing name + {`not json`, false}, + {`{"other":"object"}`, false}, + } + + for _, tc := range cases { + result := isToolCallJSON(tc.input) + if result != tc.expected { + t.Errorf("isToolCallJSON(%q) = %v, want %v", tc.input, result, tc.expected) + } + } +} diff --git a/internal/daemon/outputbuffer.go b/internal/daemon/outputbuffer.go new file mode 100644 index 0000000..9d0c0ef --- /dev/null +++ b/internal/daemon/outputbuffer.go @@ -0,0 +1,303 @@ +package daemon + +import ( + "bytes" + "strings" + "sync" + "time" +) + +// OutputLine represents a single line of normalized output +type OutputLine struct { + Timestamp time.Time `json:"ts"` + Text string `json:"text"` + Type string `json:"line_type"` // "text", "tool", "thinking", "error" +} + +// JobOutput stores output for a single job +type JobOutput struct { + mu sync.RWMutex + lines []OutputLine + totalBytes int + startTime time.Time + closed bool + subs []chan OutputLine // Subscribers for streaming +} + +// OutputBuffer stores streaming output for running jobs with memory limits. +type OutputBuffer struct { + mu sync.RWMutex + buffers map[int64]*JobOutput + maxPerJob int // max bytes per job + maxTotal int // max total bytes across all jobs + totalBytes int +} + +// NewOutputBuffer creates a new output buffer with the given limits. +func NewOutputBuffer(maxPerJob, maxTotal int) *OutputBuffer { + return &OutputBuffer{ + buffers: make(map[int64]*JobOutput), + maxPerJob: maxPerJob, + maxTotal: maxTotal, + } +} + +// getOrCreate returns the JobOutput for a job, creating if needed. +func (ob *OutputBuffer) getOrCreate(jobID int64) *JobOutput { + ob.mu.Lock() + defer ob.mu.Unlock() + + if jo, ok := ob.buffers[jobID]; ok { + return jo + } + + jo := &JobOutput{ + lines: make([]OutputLine, 0, 100), + startTime: time.Now(), + } + ob.buffers[jobID] = jo + return jo +} + +// Append adds a line to the job's output buffer. 
+func (ob *OutputBuffer) Append(jobID int64, line OutputLine) { + jo := ob.getOrCreate(jobID) + + jo.mu.Lock() + defer jo.mu.Unlock() + + if jo.closed { + return + } + + lineBytes := len(line.Text) + + // Drop oversized lines that exceed per-job limit on their own + if lineBytes > ob.maxPerJob { + return + } + + // Calculate how many bytes would be evicted for per-job limit + evictBytes := 0 + tempTotal := jo.totalBytes + evictCount := 0 + for tempTotal+lineBytes > ob.maxPerJob && evictCount < len(jo.lines) { + evictBytes += len(jo.lines[evictCount].Text) + tempTotal -= len(jo.lines[evictCount].Text) + evictCount++ + } + + // Check global memory limit BEFORE evicting - if we'd still exceed + // maxTotal after eviction, reject without losing existing lines + ob.mu.Lock() + if ob.totalBytes-evictBytes+lineBytes > ob.maxTotal { + ob.mu.Unlock() + return // Drop line to enforce global memory limit + } + + // Perform the eviction and add BEFORE updating global total + // to keep ob.totalBytes in sync with actual memory usage + if evictCount > 0 { + jo.lines = jo.lines[evictCount:] + jo.totalBytes -= evictBytes + } + + // Add the line + jo.lines = append(jo.lines, line) + jo.totalBytes += lineBytes + + // Update global total after eviction/add - now reflects actual state + ob.totalBytes = ob.totalBytes - evictBytes + lineBytes + ob.mu.Unlock() + + // Notify subscribers + for _, ch := range jo.subs { + select { + case ch <- line: + default: + // Drop if subscriber is slow + } + } +} + +// GetLines returns all lines for a job. +func (ob *OutputBuffer) GetLines(jobID int64) []OutputLine { + ob.mu.RLock() + jo, ok := ob.buffers[jobID] + ob.mu.RUnlock() + + if !ok { + return nil + } + + jo.mu.RLock() + defer jo.mu.RUnlock() + + result := make([]OutputLine, len(jo.lines)) + copy(result, jo.lines) + return result +} + +// Subscribe returns existing lines and a channel for new lines. +// Call the returned cancel function when done. 
+func (ob *OutputBuffer) Subscribe(jobID int64) ([]OutputLine, <-chan OutputLine, func()) { + jo := ob.getOrCreate(jobID) + + jo.mu.Lock() + defer jo.mu.Unlock() + + // Copy existing lines + initial := make([]OutputLine, len(jo.lines)) + copy(initial, jo.lines) + + // Create subscription channel + ch := make(chan OutputLine, 100) + jo.subs = append(jo.subs, ch) + + // Return cancel function + cancel := func() { + jo.mu.Lock() + defer jo.mu.Unlock() + for i, sub := range jo.subs { + if sub == ch { + jo.subs = append(jo.subs[:i], jo.subs[i+1:]...) + close(ch) + break + } + } + } + + return initial, ch, cancel +} + +// CloseJob marks a job as complete and removes its buffer. +func (ob *OutputBuffer) CloseJob(jobID int64) { + ob.mu.Lock() + jo, ok := ob.buffers[jobID] + if !ok { + ob.mu.Unlock() + return + } + delete(ob.buffers, jobID) + ob.mu.Unlock() + + jo.mu.Lock() + defer jo.mu.Unlock() + + jo.closed = true + ob.mu.Lock() + ob.totalBytes -= jo.totalBytes + ob.mu.Unlock() + + // Close all subscriber channels + for _, ch := range jo.subs { + close(ch) + } + jo.subs = nil +} + +// IsActive returns true if there's an active buffer for this job. +func (ob *OutputBuffer) IsActive(jobID int64) bool { + ob.mu.RLock() + defer ob.mu.RUnlock() + _, ok := ob.buffers[jobID] + return ok +} + +// OutputNormalizer converts agent-specific output to normalized OutputLines. +type OutputNormalizer func(line string) *OutputLine + +// outputWriter implements io.Writer and normalizes output to the buffer. 
+type outputWriter struct { + buffer *OutputBuffer + jobID int64 + normalize OutputNormalizer + lineBuf bytes.Buffer + maxLine int // Max line size before forced flush (prevents unbounded growth) + discarding bool // True when discarding bytes until next newline (after truncation) +} + +func (w *outputWriter) Write(p []byte) (n int, err error) { + w.lineBuf.Write(p) + + // Process complete lines + for { + data := w.lineBuf.String() + idx := strings.Index(data, "\n") + + // If discarding, skip all data until newline + if w.discarding { + if idx < 0 { + // No newline yet, discard everything + w.lineBuf.Reset() + break + } + // Found newline, stop discarding and keep remainder + w.lineBuf.Reset() + if idx+1 < len(data) { + w.lineBuf.WriteString(data[idx+1:]) + } + w.discarding = false + continue + } + + if idx < 0 { + // No complete line yet - check if buffer exceeds max line size + if w.maxLine > 0 && w.lineBuf.Len() > w.maxLine { + // Force flush truncated line to prevent unbounded growth + var line string + if w.maxLine >= 4 { + // Room for content + "..." suffix + line = data[:w.maxLine-3] + "..." + } else { + // Too small for ellipsis, just truncate + line = data[:w.maxLine] + } + w.lineBuf.Reset() + // Enter discard mode - drop bytes until next newline + w.discarding = true + if normalized := w.normalize(line); normalized != nil { + normalized.Timestamp = time.Now() + w.buffer.Append(w.jobID, *normalized) + } + continue + } + break + } + // Extract line and update buffer + line := data[:idx] + w.lineBuf.Reset() + if idx+1 < len(data) { + w.lineBuf.WriteString(data[idx+1:]) + } + line = strings.TrimSuffix(line, "\r") + if normalized := w.normalize(line); normalized != nil { + normalized.Timestamp = time.Now() + w.buffer.Append(w.jobID, *normalized) + } + } + return len(p), nil +} + +// Flush processes any remaining buffered content. 
+func (w *outputWriter) Flush() { + if w.lineBuf.Len() > 0 { + line := w.lineBuf.String() + w.lineBuf.Reset() + if normalized := w.normalize(line); normalized != nil { + normalized.Timestamp = time.Now() + w.buffer.Append(w.jobID, *normalized) + } + } +} + +// Writer returns an io.Writer that normalizes and stores output for a job. +// Lines exceeding maxPerJob will be truncated to prevent unbounded buffer growth. +func (ob *OutputBuffer) Writer(jobID int64, normalize OutputNormalizer) *outputWriter { + return &outputWriter{ + buffer: ob, + jobID: jobID, + normalize: normalize, + maxLine: ob.maxPerJob, + } +} diff --git a/internal/daemon/outputbuffer_test.go b/internal/daemon/outputbuffer_test.go new file mode 100644 index 0000000..bfb5a04 --- /dev/null +++ b/internal/daemon/outputbuffer_test.go @@ -0,0 +1,536 @@ +package daemon + +import ( + "strings" + "sync" + "testing" + "time" +) + +func TestOutputBuffer_Append(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "line 1", Type: "text"}) + ob.Append(1, OutputLine{Text: "line 2", Type: "tool"}) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "line 1" { + t.Errorf("expected 'line 1', got %q", lines[0].Text) + } + if lines[1].Type != "tool" { + t.Errorf("expected type 'tool', got %q", lines[1].Type) + } +} + +func TestOutputBuffer_GetLinesEmpty(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + lines := ob.GetLines(999) + if lines != nil { + t.Errorf("expected nil for non-existent job, got %v", lines) + } +} + +func TestOutputBuffer_PerJobLimit(t *testing.T) { + // Small limit: 50 bytes per job + ob := NewOutputBuffer(50, 1000) + + // Add lines that exceed the limit + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: 
"text"}) // 20 bytes - should evict first + + lines := ob.GetLines(1) + // First line should be evicted to make room + if len(lines) != 2 { + t.Fatalf("expected 2 lines after eviction, got %d", len(lines)) + } +} + +func TestOutputBuffer_GlobalLimit(t *testing.T) { + // Small global limit: 50 bytes total, 30 bytes per job + ob := NewOutputBuffer(30, 50) + + // Add lines across multiple jobs + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=20 + ob.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=40 + ob.Append(3, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes - would exceed 50, dropped + + // Job 3's line should be dropped due to global limit + lines1 := ob.GetLines(1) + lines2 := ob.GetLines(2) + lines3 := ob.GetLines(3) + + if len(lines1) != 1 { + t.Errorf("expected 1 line for job 1, got %d", len(lines1)) + } + if len(lines2) != 1 { + t.Errorf("expected 1 line for job 2, got %d", len(lines2)) + } + if len(lines3) != 0 { + t.Errorf("expected 0 lines for job 3 (global limit exceeded), got %d", len(lines3)) + } +} + +func TestOutputBuffer_OversizedLine(t *testing.T) { + // Per-job limit: 20 bytes + ob := NewOutputBuffer(20, 1000) + + // Try to add a line larger than per-job limit + ob.Append(1, OutputLine{Text: "this line is way too long to fit in buffer", Type: "text"}) // 43 bytes > 20 + + // Line should be dropped + lines := ob.GetLines(1) + if len(lines) != 0 { + t.Errorf("expected 0 lines (oversized dropped), got %d", len(lines)) + } + + // Normal sized lines should still work + ob.Append(1, OutputLine{Text: "short", Type: "text"}) // 5 bytes + lines = ob.GetLines(1) + if len(lines) != 1 { + t.Errorf("expected 1 line after normal append, got %d", len(lines)) + } +} + +func TestOutputBuffer_GlobalLimitPreservesExistingLines(t *testing.T) { + // Scenario: job has lines, per-job eviction would occur, but global limit rejects. 
+ // Existing lines should be preserved (not evicted for nothing). + // Per-job: 50 bytes, Global: 80 bytes + ob := NewOutputBuffer(50, 80) + + // Job 1: add 40 bytes (two 20-byte lines) + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + ob.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, job1=40, total=40 + + // Job 2: add 30 bytes + ob.Append(2, OutputLine{Text: "123456789012345678901234567890", Type: "text"}) // 30 bytes, total=70 + + // Verify initial state + lines1 := ob.GetLines(1) + lines2 := ob.GetLines(2) + if len(lines1) != 2 { + t.Fatalf("expected 2 lines for job 1, got %d", len(lines1)) + } + if len(lines2) != 1 { + t.Fatalf("expected 1 line for job 2, got %d", len(lines2)) + } + + // Now try to add 20 bytes to job 1 + // Per-job: 40+20=60 > 50, would need to evict 20 bytes (1 line) + // After eviction: job1=40, but total would be 70-20+20=70, still under 80 + // This SHOULD succeed + ob.Append(1, OutputLine{Text: "AAAAAAAAAAAAAAAAAAAA", Type: "text"}) // 20 bytes + + lines1 = ob.GetLines(1) + if len(lines1) != 2 { + t.Errorf("expected 2 lines for job 1 after eviction+add, got %d", len(lines1)) + } + + // Now global is at 70. Try to add 20 more bytes to job 1. 
+ // Per-job: 40+20=60 > 50, would evict 20 bytes + // After eviction: total would be 70-20+20=70, under 80 + // This SHOULD succeed + ob.Append(1, OutputLine{Text: "BBBBBBBBBBBBBBBBBBBB", Type: "text"}) // 20 bytes + + lines1 = ob.GetLines(1) + if len(lines1) != 2 { + t.Errorf("expected 2 lines for job 1 after second eviction+add, got %d", len(lines1)) + } + + // Now try to add 15 bytes to job 2 (total would be 70+15=85 > 80) + // Per-job: 30+15=45 < 50, no eviction + // Global: 70+15=85 > 80, REJECTED + // Job 2 should keep its original line + ob.Append(2, OutputLine{Text: "123456789012345", Type: "text"}) // 15 bytes - rejected + + lines2 = ob.GetLines(2) + if len(lines2) != 1 { + t.Errorf("expected job 2 to keep 1 line after global rejection, got %d", len(lines2)) + } + if lines2[0].Text != "123456789012345678901234567890" { + t.Errorf("job 2 original line was modified: %q", lines2[0].Text) + } +} + +func TestOutputBuffer_CloseJob(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "test", Type: "text"}) + if !ob.IsActive(1) { + t.Error("expected job to be active") + } + + ob.CloseJob(1) + + if ob.IsActive(1) { + t.Error("expected job to be inactive after close") + } + + lines := ob.GetLines(1) + if lines != nil { + t.Error("expected nil lines after close") + } +} + +func TestOutputBuffer_Subscribe(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + // Add initial line + ob.Append(1, OutputLine{Text: "initial", Type: "text"}) + + // Subscribe + initial, ch, cancel := ob.Subscribe(1) + defer cancel() + + if len(initial) != 1 { + t.Fatalf("expected 1 initial line, got %d", len(initial)) + } + if initial[0].Text != "initial" { + t.Errorf("expected 'initial', got %q", initial[0].Text) + } + + // Add more lines after subscription + go func() { + time.Sleep(10 * time.Millisecond) + ob.Append(1, OutputLine{Text: "new", Type: "text"}) + }() + + select { + case line := <-ch: + if line.Text != "new" { + t.Errorf("expected 'new', got 
%q", line.Text) + } + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for subscribed line") + } +} + +func TestOutputBuffer_SubscribeCancel(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + _, ch, cancel := ob.Subscribe(1) + cancel() + + // Channel should be closed + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed") + } + default: + // Channel closed, as expected + } +} + +func TestOutputBuffer_CloseJobClosesSubscribers(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + + ob.Append(1, OutputLine{Text: "test", Type: "text"}) + _, ch, _ := ob.Subscribe(1) + + ob.CloseJob(1) + + // Channel should be closed + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed after CloseJob") + } + case <-time.After(100 * time.Millisecond): + t.Error("channel not closed after CloseJob") + } +} + +func TestOutputWriter_Write(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write with newline + w.Write([]byte("hello\n")) + w.Write([]byte("world\n")) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "hello" { + t.Errorf("expected 'hello', got %q", lines[0].Text) + } + if lines[1].Text != "world" { + t.Errorf("expected 'world', got %q", lines[1].Text) + } +} + +func TestOutputWriter_WritePartialLines(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write partial line + w.Write([]byte("hel")) + w.Write([]byte("lo\nwor")) + w.Write([]byte("ld\n")) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d", len(lines)) + } + if lines[0].Text != "hello" { + t.Errorf("expected 'hello', got %q", lines[0].Text) + } + if 
lines[1].Text != "world" { + t.Errorf("expected 'world', got %q", lines[1].Text) + } +} + +func TestOutputWriter_Flush(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write without newline + w.Write([]byte("incomplete")) + + // Should not appear yet + lines := ob.GetLines(1) + if len(lines) != 0 { + t.Fatalf("expected 0 lines before flush, got %d", len(lines)) + } + + // Flush should process remaining + w.Flush() + + lines = ob.GetLines(1) + if len(lines) != 1 { + t.Fatalf("expected 1 line after flush, got %d", len(lines)) + } + if lines[0].Text != "incomplete" { + t.Errorf("expected 'incomplete', got %q", lines[0].Text) + } +} + +func TestOutputWriter_NormalizeFilters(t *testing.T) { + ob := NewOutputBuffer(1024, 4096) + // Normalizer that filters out empty lines + normalize := func(line string) *OutputLine { + if line == "" { + return nil + } + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + w.Write([]byte("keep\n\nskip empty\n")) + + lines := ob.GetLines(1) + if len(lines) != 2 { + t.Fatalf("expected 2 lines (empty filtered), got %d", len(lines)) + } +} + +func TestOutputWriter_LongLineWithoutNewline(t *testing.T) { + // Per-job limit: 50 bytes + ob := NewOutputBuffer(50, 1000) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write a very long line without newline - should be force-flushed with truncation + longLine := strings.Repeat("x", 100) + w.Write([]byte(longLine)) + + lines := ob.GetLines(1) + // Should have at least one line from forced flush + if len(lines) == 0 { + t.Fatalf("expected at least 1 line after forced flush, got 0") + } + + // First line should be truncated to maxLine-3 + "..." 
= 50 bytes total + if len(lines[0].Text) != 50 { + t.Errorf("expected truncated line to be 50 bytes, got %d bytes: %q", len(lines[0].Text), lines[0].Text) + } + if !strings.HasSuffix(lines[0].Text, "...") { + t.Errorf("expected line to end with '...', got %q", lines[0].Text) + } +} + +func TestOutputWriter_SmallMaxLine(t *testing.T) { + // Test that truncation works correctly with very small maxLine values + // where there's no room for "..." suffix + tests := []struct { + name string + maxLine int + input string + expectLen int + expectNoEll bool // true if no ellipsis expected + }{ + {"maxLine=3", 3, "abcdefgh", 3, true}, // No room for ellipsis + {"maxLine=4", 4, "abcdefgh", 4, false}, // Just enough: 1 char + "..." + {"maxLine=5", 5, "abcdefgh", 5, false}, // 2 chars + "..." + {"maxLine=10", 10, "abcdefghijklmn", 10, false}, // 7 chars + "..." + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ob := NewOutputBuffer(tt.maxLine, 10000) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + w.Write([]byte(tt.input)) // No newline, triggers truncation + + lines := ob.GetLines(1) + if len(lines) != 1 { + t.Fatalf("expected 1 line, got %d", len(lines)) + } + + if len(lines[0].Text) != tt.expectLen { + t.Errorf("expected line length %d, got %d: %q", tt.expectLen, len(lines[0].Text), lines[0].Text) + } + + hasEllipsis := strings.HasSuffix(lines[0].Text, "...") + if tt.expectNoEll && hasEllipsis { + t.Errorf("expected no ellipsis for maxLine=%d, got %q", tt.maxLine, lines[0].Text) + } + if !tt.expectNoEll && !hasEllipsis { + t.Errorf("expected ellipsis for maxLine=%d, got %q", tt.maxLine, lines[0].Text) + } + }) + } +} + +func TestOutputWriter_MultiWriteLongLineDiscard(t *testing.T) { + // Test that after truncating a long line, subsequent writes for the + // same line are discarded until a newline is seen. 
+ // Key invariant: repeated writes without newlines produce at most ONE truncated line + ob := NewOutputBuffer(100, 10000) + normalize := func(line string) *OutputLine { + return &OutputLine{Text: line, Type: "text"} + } + + w := ob.Writer(1, normalize) + + // Write data exceeding maxLine (100 bytes) multiple times WITHOUT a newline + // This simulates a single very long line being written in chunks + for i := 0; i < 5; i++ { + w.Write([]byte(strings.Repeat("x", 50))) // 5 * 50 = 250 bytes total + } + + // Should only have 1 line (the truncated one), not 5 fragments + lines := ob.GetLines(1) + if len(lines) != 1 { + t.Fatalf("expected exactly 1 truncated line (not multiple fragments), got %d", len(lines)) + } + + // Verify it's truncated + if !strings.HasSuffix(lines[0].Text, "...") { + t.Errorf("expected truncated line to end with '...', got %q", lines[0].Text) + } +} + +func TestOutputBuffer_PerJobEvictionBlockedByGlobal(t *testing.T) { + // Test the case where per-job eviction would be needed but global limit + // would still be exceeded after eviction - no eviction should occur + // Per-job: 40 bytes, Global: 50 bytes + ob := NewOutputBuffer(40, 50) + + // Job 1: add 30 bytes + ob.Append(1, OutputLine{Text: "123456789012345678901234567890", Type: "text"}) // 30 bytes, total=30 + + // Job 2: add 20 bytes + ob.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=50 + + // Verify initial state + lines1 := ob.GetLines(1) + lines2 := ob.GetLines(2) + if len(lines1) != 1 || len(lines2) != 1 { + t.Fatalf("expected 1 line each, got job1=%d, job2=%d", len(lines1), len(lines2)) + } + + // Now try to add 25 bytes to job 1 + // Per-job: 30+25=55 > 40, would need to evict 30 bytes (the existing line) + // After eviction: job1=0, total=20, adding 25 would make total=45 < 50 + // BUT: after eviction total=50-30=20, +25=45 < 50, so this should succeed + // Actually wait - let me recalculate: + // current job1=30, total=50 + // new line=25 bytes 
+ // per-job check: 30+25=55 > 40, need to evict 15+ bytes, evict whole line (30 bytes) + // after eviction: job1=0, total=50-30=20 + // global check: 20+25=45 < 50, OK + // This case should succeed, so it's not the right test case + + // Let me create a case where eviction wouldn't help: + // Per-job: 30, Global: 40 + ob2 := NewOutputBuffer(30, 40) + + // Job 1: add 20 bytes + ob2.Append(1, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes + + // Job 2: add 20 bytes + ob2.Append(2, OutputLine{Text: "12345678901234567890", Type: "text"}) // 20 bytes, total=40 + + // Now try to add 25 bytes to job 1 + // Per-job: 20+25=45 > 30, would evict 20 bytes (the existing line) + // After eviction: job1=0, total=40-20=20, +25=45 > 40 - REJECTED + // Existing line should be preserved + + lines1Before := ob2.GetLines(1) + if len(lines1Before) != 1 { + t.Fatalf("expected 1 line for job 1 before, got %d", len(lines1Before)) + } + + // Try to add a line that would exceed global limit even after eviction + ob2.Append(1, OutputLine{Text: "1234567890123456789012345", Type: "text"}) // 25 bytes + + lines1After := ob2.GetLines(1) + if len(lines1After) != 1 { + t.Fatalf("expected 1 line for job 1 (preserved), got %d", len(lines1After)) + } + // Original line should still be there + if lines1After[0].Text != "12345678901234567890" { + t.Errorf("original line should be preserved, got %q", lines1After[0].Text) + } +} + +func TestOutputBuffer_Concurrent(t *testing.T) { + ob := NewOutputBuffer(10240, 40960) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(jobID int64) { + defer wg.Done() + for j := 0; j < 100; j++ { + ob.Append(jobID, OutputLine{Text: "test", Type: "text"}) + } + }(int64(i)) + } + + wg.Wait() + + // All jobs should have lines + for i := 0; i < 10; i++ { + lines := ob.GetLines(int64(i)) + if len(lines) == 0 { + t.Errorf("job %d has no lines", i) + } + } +} diff --git a/internal/daemon/runtime_test.go 
b/internal/daemon/runtime_test.go index 5ea8d99..9e025d4 100644 --- a/internal/daemon/runtime_test.go +++ b/internal/daemon/runtime_test.go @@ -95,9 +95,9 @@ func TestKillDaemonSkipsHTTPForNonLoopback(t *testing.T) { t.Error("KillDaemon should return true for non-existent PID") } - // Should complete quickly (no HTTP call). Allow 200ms for process checks. - // If HTTP was attempted, it would take at least 500ms (client timeout). - if elapsed > 200*time.Millisecond { + // Should complete quickly (no HTTP call). Allow 300ms for process checks + // (Windows CI can be slow). If HTTP was attempted, it would take 500ms+. + if elapsed > 300*time.Millisecond { t.Errorf("KillDaemon took %v, suggesting HTTP was attempted to non-loopback address", elapsed) } } diff --git a/internal/daemon/server.go b/internal/daemon/server.go index eae2bdb..0b0f465 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -65,6 +65,7 @@ func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server { mux.HandleFunc("/api/health", s.handleHealth) mux.HandleFunc("/api/jobs", s.handleListJobs) mux.HandleFunc("/api/job/cancel", s.handleCancelJob) + mux.HandleFunc("/api/job/output", s.handleJobOutput) mux.HandleFunc("/api/job/rerun", s.handleRerunJob) mux.HandleFunc("/api/repos", s.handleListRepos) mux.HandleFunc("/api/review", s.handleGetReview) @@ -646,6 +647,129 @@ func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]interface{}{"success": true}) } +// JobOutputResponse is the response for /api/job/output +type JobOutputResponse struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []OutputLine `json:"lines"` + HasMore bool `json:"has_more"` +} + +func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + jobIDStr := r.URL.Query().Get("job_id") + 
if jobIDStr == "" { + writeError(w, http.StatusBadRequest, "job_id required") + return + } + + var jobID int64 + if _, err := fmt.Sscanf(jobIDStr, "%d", &jobID); err != nil { + writeError(w, http.StatusBadRequest, "invalid job_id") + return + } + + // Check job exists + job, err := s.db.GetJobByID(jobID) + if err != nil { + writeError(w, http.StatusNotFound, "job not found") + return + } + + // Check if streaming mode requested + stream := r.URL.Query().Get("stream") == "1" + + if !stream { + // Return current buffer (polling mode) + lines := s.workerPool.GetJobOutput(jobID) + if lines == nil { + lines = []OutputLine{} + } + + resp := JobOutputResponse{ + JobID: jobID, + Status: string(job.Status), + Lines: lines, + HasMore: job.Status == storage.JobStatusRunning, + } + writeJSON(w, http.StatusOK, resp) + return + } + + // Streaming mode via SSE + // Don't stream for non-running jobs - they have no active buffer producer + // and would hang forever waiting for data + if job.Status != storage.JobStatusRunning { + w.Header().Set("Content-Type", "application/x-ndjson") + encoder := json.NewEncoder(w) + encoder.Encode(map[string]interface{}{ + "type": "complete", + "status": string(job.Status), + }) + return + } + + w.Header().Set("Content-Type", "application/x-ndjson") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + // Subscribe to output + initial, ch, cancel := s.workerPool.SubscribeJobOutput(jobID) + defer cancel() + + encoder := json.NewEncoder(w) + + // Send initial lines + for _, line := range initial { + encoder.Encode(map[string]interface{}{ + "type": "line", + "ts": line.Timestamp.Format(time.RFC3339Nano), + "text": line.Text, + "line_type": line.Type, + }) + } + flusher.Flush() + + // Stream new lines until job completes or client disconnects + for { + select { + case 
<-r.Context().Done(): + return + case line, ok := <-ch: + if !ok { + // Job finished - channel closed, fetch actual status + finalStatus := "done" + if finalJob, err := s.db.GetJobByID(jobID); err == nil { + finalStatus = string(finalJob.Status) + } + encoder.Encode(map[string]interface{}{ + "type": "complete", + "status": finalStatus, + }) + flusher.Flush() + return + } + encoder.Encode(map[string]interface{}{ + "type": "line", + "ts": line.Timestamp.Format(time.RFC3339Nano), + "text": line.Text, + "line_type": line.Type, + }) + flusher.Flush() + } + } +} + type RerunJobRequest struct { JobID int64 `json:"job_id"` } diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index b37157a..a2c9a53 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -2171,3 +2171,379 @@ func TestGetMachineID_CachingBehavior(t *testing.T) { } }) } + +// TestHandleAddCommentToJobStates tests that comments can be added to jobs +// in any state: queued, running, done, failed, and canceled. 
+func TestHandleAddCommentToJobStates(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create repo and commit + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test commit", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + + testCases := []struct { + name string + setupQuery string // SQL to set job to specific state + }{ + {"queued job", ""}, + {"running job", `UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`}, + {"completed job", `UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`}, + {"failed job", `UPDATE review_jobs SET status = 'failed', started_at = datetime('now'), finished_at = datetime('now'), error = 'test error' WHERE id = ?`}, + {"canceled job", `UPDATE review_jobs SET status = 'canceled', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a job + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Set job to desired state + if tc.setupQuery != "" { + if _, err := db.Exec(tc.setupQuery, job.ID); err != nil { + t.Fatalf("Failed to set job state: %v", err) + } + } + + // Add comment via API + reqData := map[string]interface{}{ + "job_id": job.ID, + "commenter": "test-user", + "comment": "Test comment for " + tc.name, + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != 
http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify response contains the comment + var resp storage.Response + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + if resp.Responder != "test-user" { + t.Errorf("Expected responder 'test-user', got %q", resp.Responder) + } + }) + } +} + +// TestHandleAddCommentToNonExistentJob tests that adding a comment to a +// non-existent job returns 404. +func TestHandleAddCommentToNonExistentJob(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + reqData := map[string]interface{}{ + "job_id": 99999, + "commenter": "test-user", + "comment": "This should fail", + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "job not found") { + t.Errorf("Expected 'job not found' error, got: %s", w.Body.String()) + } +} + +// TestHandleAddCommentWithoutReview tests that comments can be added to jobs +// that don't have a review yet (job exists but hasn't completed). 
+func TestHandleAddCommentWithoutReview(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create repo, commit, and job (but NO review) + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test commit", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + + // Set job to running (no review exists yet) + if _, err := db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID); err != nil { + t.Fatalf("Failed to set job to running: %v", err) + } + + // Verify no review exists + if _, err := db.GetReviewByJobID(job.ID); err == nil { + t.Fatal("Expected no review to exist for job") + } + + // Add comment - should succeed even without a review + reqData := map[string]interface{}{ + "job_id": job.ID, + "commenter": "test-user", + "comment": "Comment on in-progress job without review", + } + reqBody, _ := json.Marshal(reqData) + req := httptest.NewRequest(http.MethodPost, "/api/comment", bytes.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleAddComment(w, req) + + if w.Code != http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify comment was stored + comments, err := db.GetCommentsForJob(job.ID) + if err != nil { + t.Fatalf("GetCommentsForJob failed: %v", err) + } + if len(comments) != 1 { + t.Fatalf("Expected 1 comment, got %d", len(comments)) + } + if comments[0].Response != "Comment on in-progress job without review" { + t.Errorf("Unexpected comment: %q", comments[0].Response) + } +} + +// 
TestHandleJobOutput_InvalidJobID tests that invalid job_id returns 400. +func TestHandleJobOutput_InvalidJobID(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=notanumber", nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestHandleJobOutput_NonExistentJob tests that non-existent job returns 404. +func TestHandleJobOutput_NonExistentJob(t *testing.T) { + db, _ := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=99999", nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestHandleJobOutput_PollingRunningJob tests polling mode for a running job. 
+func TestHandleJobOutput_PollingRunningJob(t *testing.T) { + db, tmpDir := testutil.OpenTestDBWithDir(t) + cfg := config.DefaultConfig() + server := NewServer(db, cfg, "") + + // Create a running job + repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo")) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit failed: %v", err) + } + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "") + if err != nil { + t.Fatalf("EnqueueJob failed: %v", err) + } + result, err := db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID) + if err != nil { + t.Fatalf("Exec failed: %v", err) + } + if n, _ := result.RowsAffected(); n != 1 { + t.Fatalf("Expected 1 row affected, got %d", n) + } + + req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) + w := httptest.NewRecorder() + + server.handleJobOutput(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []struct { + TS string `json:"ts"` + Text string `json:"text"` + LineType string `json:"line_type"` + } `json:"lines"` + HasMore bool `json:"has_more"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("Failed to unmarshal response: %v", err) + } + + if resp.JobID != job.ID { + t.Errorf("Expected job_id %d, got %d", job.ID, resp.JobID) + } + if resp.Status != "running" { + t.Errorf("Expected status 'running', got %q", resp.Status) + } + if !resp.HasMore { + t.Error("Expected has_more=true for running job") + } +} + +// TestHandleJobOutput_PollingCompletedJob tests polling mode for a completed job. 
+func TestHandleJobOutput_PollingCompletedJob(t *testing.T) {
+	db, tmpDir := testutil.OpenTestDBWithDir(t)
+	cfg := config.DefaultConfig()
+	server := NewServer(db, cfg, "")
+
+	// Create a completed job
+	repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo"))
+	if err != nil {
+		t.Fatalf("GetOrCreateRepo failed: %v", err)
+	}
+	commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now())
+	if err != nil {
+		t.Fatalf("GetOrCreateCommit failed: %v", err)
+	}
+	job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "")
+	if err != nil {
+		t.Fatalf("EnqueueJob failed: %v", err)
+	}
+	result, err := db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID)
+	if err != nil {
+		t.Fatalf("Exec failed: %v", err)
+	}
+	if n, _ := result.RowsAffected(); n != 1 {
+		t.Fatalf("Expected 1 row affected, got %d", n)
+	}
+
+	req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil)
+	w := httptest.NewRecorder()
+
+	server.handleJobOutput(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String())
+	}
+
+	var resp struct {
+		JobID   int64  `json:"job_id"`
+		Status  string `json:"status"`
+		HasMore bool   `json:"has_more"`
+	}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		t.Fatalf("Failed to unmarshal response: %v", err)
+	}
+
+	if resp.Status != "done" {
+		t.Errorf("Expected status 'done', got %q", resp.Status)
+	}
+	if resp.HasMore {
+		t.Error("Expected has_more=false for completed job")
+	}
+}
+
+// TestHandleJobOutput_StreamingCompletedJob tests that streaming mode for a
+// completed job returns an immediate complete response instead of hanging.
+func TestHandleJobOutput_StreamingCompletedJob(t *testing.T) {
+	db, tmpDir := testutil.OpenTestDBWithDir(t)
+	cfg := config.DefaultConfig()
+	server := NewServer(db, cfg, "")
+
+	// Create a completed job
+	repo, err := db.GetOrCreateRepo(filepath.Join(tmpDir, "test-repo"))
+	if err != nil {
+		t.Fatalf("GetOrCreateRepo failed: %v", err)
+	}
+	commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Test", time.Now())
+	if err != nil {
+		t.Fatalf("GetOrCreateCommit failed: %v", err)
+	}
+	job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test-agent", "", "")
+	if err != nil {
+		t.Fatalf("EnqueueJob failed: %v", err)
+	}
+	result, err := db.Exec(`UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`, job.ID)
+	if err != nil {
+		t.Fatalf("Exec failed: %v", err)
+	}
+	if n, _ := result.RowsAffected(); n != 1 {
+		t.Fatalf("Expected 1 row affected, got %d", n)
+	}
+
+	req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d&stream=1", job.ID), nil)
+	w := httptest.NewRecorder()
+
+	server.handleJobOutput(w, req)
+
+	// Should return immediately with complete message, not hang
+	if w.Code != http.StatusOK {
+		t.Fatalf("Expected status 200, got %d: %s", w.Code, w.Body.String())
+	}
+
+	var resp struct {
+		Type   string `json:"type"`
+		Status string `json:"status"`
+	}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		t.Fatalf("Failed to unmarshal response: %v", err)
+	}
+
+	if resp.Type != "complete" {
+		t.Errorf("Expected type 'complete', got %q", resp.Type)
+	}
+	if resp.Status != "done" {
+		t.Errorf("Expected status 'done', got %q", resp.Status)
+	}
+}
+
+// TestHandleJobOutput_MissingJobID tests that missing job_id returns 400.
+func TestHandleJobOutput_MissingJobID(t *testing.T) {
+	db, _ := testutil.OpenTestDBWithDir(t)
+	cfg := config.DefaultConfig()
+	server := NewServer(db, cfg, "")
+
+	req := httptest.NewRequest(http.MethodGet, "/api/job/output", nil)
+	w := httptest.NewRecorder()
+
+	server.handleJobOutput(w, req)
+
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String())
+	}
+}
diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go
index 58f8a54..90cfbcf 100644
--- a/internal/daemon/worker.go
+++ b/internal/daemon/worker.go
@@ -33,6 +33,9 @@ type WorkerPool struct {
 	pendingCancels map[int64]bool // Jobs canceled before registered
 	runningJobsMu  sync.Mutex
 
+	// Output capture for tail command
+	outputBuffers *OutputBuffer
+
 	// Test hooks for deterministic synchronization (nil in production)
 	testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup
 }
@@ -49,6 +52,7 @@
 		stopCh:         make(chan struct{}),
 		runningJobs:    make(map[int64]context.CancelFunc),
 		pendingCancels: make(map[int64]bool),
+		outputBuffers:  NewOutputBuffer(512*1024, 4*1024*1024), // 512KB/job, 4MB total
 	}
 }
@@ -80,6 +84,22 @@
 func (wp *WorkerPool) MaxWorkers() int {
 	return wp.numWorkers
 }
+
+// GetJobOutput returns the current output lines for a job.
+func (wp *WorkerPool) GetJobOutput(jobID int64) []OutputLine {
+	return wp.outputBuffers.GetLines(jobID)
+}
+
+// SubscribeJobOutput returns initial lines and a channel for new output.
+// Call cancel when done to unsubscribe.
+func (wp *WorkerPool) SubscribeJobOutput(jobID int64) ([]OutputLine, <-chan OutputLine, func()) {
+	return wp.outputBuffers.Subscribe(jobID)
+}
+
+// HasJobOutput returns true if there's active output capture for a job.
+func (wp *WorkerPool) HasJobOutput(jobID int64) bool {
+	return wp.outputBuffers.IsActive(jobID)
+}
+
 // CancelJob cancels a running job by its ID, killing the subprocess.
 // Returns true if the job was canceled or marked for pending cancellation.
 // Returns false only if the job doesn't exist or isn't in a cancellable state.
@@ -315,9 +335,17 @@
 		Agent: agentName,
 	})
 
+	// Create output writer for tail command
+	normalizer := GetNormalizer(agentName)
+	outputWriter := wp.outputBuffers.Writer(job.ID, normalizer)
+	defer func() {
+		outputWriter.Flush()
+		wp.outputBuffers.CloseJob(job.ID)
+	}()
+
 	// Run the review
 	log.Printf("[%s] Running %s review...", workerID, agentName)
-	output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, nil)
+	output, err := a.Review(ctx, job.RepoPath, job.GitRef, reviewPrompt, outputWriter)
 	if err != nil {
 		// Check if this was a cancellation
 		if ctx.Err() == context.Canceled {
diff --git a/internal/storage/reviews_test.go b/internal/storage/reviews_test.go
new file mode 100644
index 0000000..7b34820
--- /dev/null
+++ b/internal/storage/reviews_test.go
@@ -0,0 +1,247 @@
+package storage
+
+import (
+	"database/sql"
+	"path/filepath"
+	"testing"
+	"time"
+)
+
+// TestAddCommentToJobAllStates verifies that comments can be added to jobs
+// in any state: queued, running, done, failed, and canceled.
+func TestAddCommentToJobAllStates(t *testing.T) {
+	db := openReviewsTestDB(t)
+	defer db.Close()
+
+	repo, err := db.GetOrCreateRepo("/tmp/test-repo")
+	if err != nil {
+		t.Fatalf("GetOrCreateRepo failed: %v", err)
+	}
+	commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now())
+	if err != nil {
+		t.Fatalf("GetOrCreateCommit failed: %v", err)
+	}
+
+	testCases := []struct {
+		name       string
+		status     JobStatus
+		setupQuery string // SQL to set the job to a specific state
+	}{
+		{
+			name:       "queued job",
+			status:     JobStatusQueued,
+			setupQuery: "", // Default status is queued
+		},
+		{
+			name:       "running job",
+			status:     JobStatusRunning,
+			setupQuery: `UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`,
+		},
+		{
+			name:       "completed job",
+			status:     JobStatusDone,
+			setupQuery: `UPDATE review_jobs SET status = 'done', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`,
+		},
+		{
+			name:       "failed job",
+			status:     JobStatusFailed,
+			setupQuery: `UPDATE review_jobs SET status = 'failed', started_at = datetime('now'), finished_at = datetime('now'), error = 'test error' WHERE id = ?`,
+		},
+		{
+			name:       "canceled job",
+			status:     JobStatusCanceled,
+			setupQuery: `UPDATE review_jobs SET status = 'canceled', started_at = datetime('now'), finished_at = datetime('now') WHERE id = ?`,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Create a job for this test case
+			job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough")
+			if err != nil {
+				t.Fatalf("EnqueueJob failed: %v", err)
+			}
+
+			// Set job to the desired state
+			if tc.setupQuery != "" {
+				_, err = db.Exec(tc.setupQuery, job.ID)
+				if err != nil {
+					t.Fatalf("Failed to set job status to %s: %v", tc.status, err)
+				}
+			}
+
+			// Verify job is in expected state
+			var actualStatus string
+			err = db.QueryRow(`SELECT status FROM review_jobs WHERE id = ?`, job.ID).Scan(&actualStatus)
+			if err != nil {
+				t.Fatalf("Failed to verify job status: %v", err)
+			}
+			if JobStatus(actualStatus) != tc.status {
+				t.Fatalf("Expected job status %s, got %s", tc.status, actualStatus)
+			}
+
+			// Add a comment to the job
+			comment := "Test comment for " + tc.name
+			resp, err := db.AddCommentToJob(job.ID, "test-user", comment)
+			if err != nil {
+				t.Fatalf("AddCommentToJob failed for %s: %v", tc.name, err)
+			}
+
+			// Verify the comment was added
+			if resp == nil {
+				t.Fatal("Expected non-nil response")
+			}
+			if resp.Responder != "test-user" {
+				t.Errorf("Expected responder 'test-user', got %q", resp.Responder)
+			}
+			if resp.Response != comment {
+				t.Errorf("Expected response %q, got %q", comment, resp.Response)
+			}
+			if resp.JobID == nil || *resp.JobID != job.ID {
+				t.Errorf("Expected job ID %d, got %v", job.ID, resp.JobID)
+			}
+
+			// Verify we can retrieve the comment
+			comments, err := db.GetCommentsForJob(job.ID)
+			if err != nil {
+				t.Fatalf("GetCommentsForJob failed: %v", err)
+			}
+			if len(comments) != 1 {
+				t.Fatalf("Expected 1 comment, got %d", len(comments))
+			}
+			if comments[0].Response != comment {
+				t.Errorf("Retrieved comment mismatch: expected %q, got %q", comment, comments[0].Response)
+			}
+		})
+	}
+}
+
+// TestAddCommentToJobNonExistent verifies that adding a comment to a
+// non-existent job returns an appropriate error.
+func TestAddCommentToJobNonExistent(t *testing.T) {
+	db := openReviewsTestDB(t)
+	defer db.Close()
+
+	// Try to add a comment to a job that doesn't exist
+	_, err := db.AddCommentToJob(99999, "test-user", "This should fail")
+	if err == nil {
+		t.Fatal("Expected error when adding comment to non-existent job")
+	}
+	if err != sql.ErrNoRows { // NOTE(review): pins the unwrapped sentinel; switch to errors.Is if AddCommentToJob ever wraps it
+		t.Errorf("Expected sql.ErrNoRows, got: %v", err)
+	}
+}
+
+// TestAddCommentToJobMultipleComments verifies that multiple comments
+// can be added to the same job.
+func TestAddCommentToJobMultipleComments(t *testing.T) {
+	db := openReviewsTestDB(t)
+	defer db.Close()
+
+	repo, err := db.GetOrCreateRepo("/tmp/test-repo")
+	if err != nil {
+		t.Fatalf("GetOrCreateRepo failed: %v", err)
+	}
+	commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now())
+	if err != nil {
+		t.Fatalf("GetOrCreateCommit failed: %v", err)
+	}
+
+	// Create a job and set it to running (in-progress)
+	job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough")
+	if err != nil {
+		t.Fatalf("EnqueueJob failed: %v", err)
+	}
+	_, err = db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job.ID)
+	if err != nil {
+		t.Fatalf("Failed to set job to running: %v", err)
+	}
+
+	// Add multiple comments from different users
+	comments := []struct {
+		user    string
+		message string
+	}{
+		{"alice", "First comment while job is running"},
+		{"bob", "Second comment from another user"},
+		{"alice", "Third comment from alice again"},
+	}
+
+	for _, c := range comments {
+		_, err := db.AddCommentToJob(job.ID, c.user, c.message)
+		if err != nil {
+			t.Fatalf("AddCommentToJob failed for %s: %v", c.user, err)
+		}
+	}
+
+	// Verify all comments were added
+	retrieved, err := db.GetCommentsForJob(job.ID)
+	if err != nil {
+		t.Fatalf("GetCommentsForJob failed: %v", err)
+	}
+	if len(retrieved) != len(comments) {
+		t.Fatalf("Expected %d comments, got %d", len(comments), len(retrieved))
+	}
+
+	// Verify comments are in order
+	for i, c := range comments {
+		if retrieved[i].Responder != c.user {
+			t.Errorf("Comment %d: expected responder %q, got %q", i, c.user, retrieved[i].Responder)
+		}
+		if retrieved[i].Response != c.message {
+			t.Errorf("Comment %d: expected message %q, got %q", i, c.message, retrieved[i].Response)
+		}
+	}
+}
+
+// TestAddCommentToJobWithNoReview verifies that comments can be added
+// to jobs that have no review (i.e., job exists but has no review record yet).
+func TestAddCommentToJobWithNoReview(t *testing.T) {
+	db := openReviewsTestDB(t)
+	defer db.Close()
+
+	repo, err := db.GetOrCreateRepo("/tmp/test-repo")
+	if err != nil {
+		t.Fatalf("GetOrCreateRepo failed: %v", err)
+	}
+	commit, err := db.GetOrCreateCommit(repo.ID, "abc123", "Author", "Subject", time.Now())
+	if err != nil {
+		t.Fatalf("GetOrCreateCommit failed: %v", err)
+	}
+
+	// Create a job (no review is created yet - job is just queued)
+	job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "claude-code", "", "thorough")
+	if err != nil {
+		t.Fatalf("EnqueueJob failed: %v", err)
+	}
+
+	// Verify no review exists for this job
+	_, err = db.GetReviewByJobID(job.ID)
+	if err == nil {
+		t.Fatal("Expected error getting review for job with no review")
+	}
+
+	// Add a comment to the job (should succeed even without a review)
+	resp, err := db.AddCommentToJob(job.ID, "test-user", "Comment on job without review")
+	if err != nil {
+		t.Fatalf("AddCommentToJob failed: %v", err)
+	}
+	if resp == nil {
+		t.Fatal("Expected non-nil response")
+	}
+	if resp.Response != "Comment on job without review" {
+		t.Errorf("Unexpected response: %q", resp.Response)
+	}
+}
+
+func openReviewsTestDB(t *testing.T) *DB {
+	tmpDir := t.TempDir()
+	dbPath := filepath.Join(tmpDir, "test.db")
+
+	db, err := Open(dbPath)
+	if err != nil {
+		t.Fatalf("Failed to open test DB: %v", err)
+	}
+
+	return db
+}