diff --git a/cmd/roborev/main.go b/cmd/roborev/main.go index e84ad3e2..750f0398 100644 --- a/cmd/roborev/main.go +++ b/cmd/roborev/main.go @@ -566,6 +566,7 @@ func reviewCmd() *cobra.Command { repoPath string sha string agent string + model string reasoning string quiet bool dirty bool @@ -748,6 +749,7 @@ Examples: "repo_path": root, "git_ref": gitRef, "agent": agent, + "model": model, "reasoning": reasoning, "diff_content": diffContent, }) @@ -808,6 +810,7 @@ Examples: cmd.Flags().StringVar(&repoPath, "repo", "", "path to git repository (default: current directory)") cmd.Flags().StringVar(&sha, "sha", "HEAD", "commit SHA to review (used when no positional args)") cmd.Flags().StringVar(&agent, "agent", "", "agent to use (codex, claude-code, gemini, copilot, opencode)") + cmd.Flags().StringVar(&model, "model", "", "model for agent (format varies: opencode uses provider/model, others use model name)") cmd.Flags().StringVar(&reasoning, "reasoning", "", "reasoning level: thorough (default), standard, or fast") cmd.Flags().BoolVarP(&quiet, "quiet", "q", false, "suppress output (for use in hooks)") cmd.Flags().BoolVar(&dirty, "dirty", false, "review uncommitted changes instead of a commit") diff --git a/cmd/roborev/main_test.go b/cmd/roborev/main_test.go index bca71055..76de432e 100644 --- a/cmd/roborev/main_test.go +++ b/cmd/roborev/main_test.go @@ -190,6 +190,7 @@ func (f failingAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st func (f failingAgent) WithReasoning(level agent.ReasoningLevel) agent.Agent { return f } func (f failingAgent) WithAgentic(agentic bool) agent.Agent { return f } +func (f failingAgent) WithModel(model string) agent.Agent { return f } func TestEnqueueReviewRefine(t *testing.T) { t.Run("returns job ID on success", func(t *testing.T) { @@ -367,7 +368,7 @@ func TestRunRefineSurfacesResponseErrors(t *testing.T) { } defer os.Chdir(origDir) - if err := runRefine("test", "", 1, true, false, false, ""); err == nil { + if err := runRefine("test", "", "", 1, true, false, false, ""); err == nil { t.Fatal("expected error, got nil") } } @@ -397,7 +398,7 @@ func TestRunRefineQuietNonTTYTimerOutput(t *testing.T) { defer func() { isTerminal = origIsTerminal }() output := captureStdout(t, func() { - if err := runRefine("test", "", 1, true, false, false, ""); err == nil { + if err := runRefine("test", "", "", 1, true, false, false, ""); err == nil { t.Fatal("expected error, got nil") } }) @@ -438,7 +439,7 @@ func TestRunRefineStopsLiveTimerOnAgentError(t *testing.T) { defer agent.Register(agent.NewTestAgent()) output := captureStdout(t, func() { - if err := runRefine("test", "", 1, true, false, false, ""); err == nil { + if err := runRefine("test", "", "", 1, true, false, false, ""); err == nil { t.Fatal("expected error, got nil") } }) @@ -484,7 +485,7 @@ func TestRunRefineAgentErrorRetriesWithoutApplyingChanges(t *testing.T) { output := captureStdout(t, func() { // With 2 iterations and a failing agent, should exhaust iterations - err := runRefine("test", "", 2, true, false, false, "") + err := runRefine("test", "", "", 2, true, false, false, "") if err == nil { t.Fatal("expected error after exhausting iterations, got nil") } @@ -1004,6 +1005,10 @@ func (a *changingAgent) WithAgentic(agentic bool) agent.Agent { return a } +func (a *changingAgent) WithModel(model string) agent.Agent { + return a +} + func TestRefineLoopStaysOnFailedFixChain(t *testing.T) { setupFastPolling(t) repoDir, _ := setupRefineRepo(t) @@ -1192,7 +1197,7 @@ func TestRefineLoopStaysOnFailedFixChain(t *testing.T) { agent.Register(changer) defer agent.Register(agent.NewTestAgent()) - if err := runRefine("test", "", 2, true, false, false, ""); err == nil { + if err := runRefine("test", "", "", 2, true, false, false, ""); err == nil { t.Fatal("expected error from reaching max iterations") } @@ -1392,7 +1397,7 @@ func TestRefinePendingJobWaitDoesNotConsumeIteration(t *testing.T) { // an iteration, this would fail with "max iterations reached". Since the // pending job transitions to Done with a passing review (and no failed // reviews exist), refine should succeed. - err = runRefine("test", "", 1, true, false, false, "") + err = runRefine("test", "", "", 1, true, false, false, "") // Should succeed - all reviews pass after waiting for the pending one if err != nil { diff --git a/cmd/roborev/refine.go b/cmd/roborev/refine.go index 514c7c6d..413f7c1b 100644 --- a/cmd/roborev/refine.go +++ b/cmd/roborev/refine.go @@ -31,6 +31,7 @@ var postCommitWaitDelay = 1 * time.Second func refineCmd() *cobra.Command { var ( agentName string + model string reasoning string maxIterations int quiet bool @@ -61,11 +62,12 @@ Use --since to specify a starting commit when on the main branch or to limit how far back to look for reviews to address.`, RunE: func(cmd *cobra.Command, args []string) error { unsafeFlagChanged := cmd.Flags().Changed("allow-unsafe-agents") - return runRefine(agentName, reasoning, maxIterations, quiet, allowUnsafeAgents, unsafeFlagChanged, since) + return runRefine(agentName, model, reasoning, maxIterations, quiet, allowUnsafeAgents, unsafeFlagChanged, since) }, } cmd.Flags().StringVar(&agentName, "agent", "", "agent to use for addressing findings (default: from config)") + cmd.Flags().StringVar(&model, "model", "", "model for agent (format varies: opencode uses provider/model, others use model name)") cmd.Flags().StringVar(&reasoning, "reasoning", "", "reasoning level: fast, standard (default), or thorough") cmd.Flags().IntVar(&maxIterations, "max-iterations", 10, "maximum refinement iterations") cmd.Flags().BoolVar(&quiet, "quiet", false, "suppress agent output, show elapsed time instead") @@ -185,7 +187,7 @@ func validateRefineContext(since string) (repoPath, currentBranch, defaultBranch return repoPath, currentBranch, defaultBranch, mergeBase, nil } -func runRefine(agentName, reasoningStr string, maxIterations int, quiet bool, allowUnsafeAgents bool, unsafeFlagChanged bool, since string) error { +func runRefine(agentName, modelStr, reasoningStr string, maxIterations int, quiet bool, allowUnsafeAgents bool, unsafeFlagChanged bool, since string) error { // 1. Validate git and branch context (before touching daemon) repoPath, currentBranch, defaultBranch, mergeBase, err := validateRefineContext(since) if err != nil { @@ -225,8 +227,11 @@ func runRefine(agentName, reasoningStr string, maxIterations int, quiet bool, al } reasoningLevel := agent.ParseReasoningLevel(resolvedReasoning) - // Get the agent with configured reasoning level - addressAgent, err := selectRefineAgent(resolvedAgent, reasoningLevel) + // Resolve model from CLI or config + resolvedModel := config.ResolveModel(modelStr, repoPath, cfg) + + // Get the agent with configured reasoning level and model + addressAgent, err := selectRefineAgent(resolvedAgent, reasoningLevel, resolvedModel) if err != nil { return fmt.Errorf("no agent available: %w", err) } @@ -777,18 +782,18 @@ func applyWorktreeChanges(repoPath, worktreePath string) error { return nil } -func selectRefineAgent(resolvedAgent string, reasoningLevel agent.ReasoningLevel) (agent.Agent, error) { +func selectRefineAgent(resolvedAgent string, reasoningLevel agent.ReasoningLevel, model string) (agent.Agent, error) { if resolvedAgent == "codex" && agent.IsAvailable("codex") { baseAgent, err := agent.Get("codex") if err != nil { return nil, err } - return baseAgent.WithReasoning(reasoningLevel), nil + return baseAgent.WithReasoning(reasoningLevel).WithModel(model), nil } baseAgent, err := agent.GetAvailable(resolvedAgent) if err != nil { return nil, err } - return baseAgent.WithReasoning(reasoningLevel), nil + return baseAgent.WithReasoning(reasoningLevel).WithModel(model), nil } diff --git a/cmd/roborev/refine_test.go b/cmd/roborev/refine_test.go index 4975c59c..ecdb0c1e 100644 --- a/cmd/roborev/refine_test.go +++ b/cmd/roborev/refine_test.go @@ -126,7 +126,7 @@ var _ daemon.Client = (*mockDaemonClient)(nil) func TestSelectRefineAgentCodexFallback(t *testing.T) { t.Setenv("PATH", "") - selected, err := selectRefineAgent("codex", agent.ReasoningFast) + selected, err := selectRefineAgent("codex", agent.ReasoningFast, "") if err != nil { t.Fatalf("selectRefineAgent failed: %v", err) } @@ -234,7 +234,7 @@ func TestSelectRefineAgentCodexUsesRequestedReasoning(t *testing.T) { t.Setenv("PATH", tmpDir) - selected, err := selectRefineAgent("codex", agent.ReasoningFast) + selected, err := selectRefineAgent("codex", agent.ReasoningFast, "") if err != nil { t.Fatalf("selectRefineAgent failed: %v", err) } @@ -267,7 +267,7 @@ func TestSelectRefineAgentCodexFallbackUsesRequestedReasoning(t *testing.T) { t.Setenv("PATH", tmpDir) // Request an unavailable agent (claude), codex should be used as fallback - selected, err := selectRefineAgent("claude", agent.ReasoningThorough) + selected, err := selectRefineAgent("claude", agent.ReasoningThorough, "") if err != nil { t.Fatalf("selectRefineAgent failed: %v", err) } diff --git a/cmd/roborev/run.go b/cmd/roborev/run.go index bb7c6805..ad3e0950 100644 --- a/cmd/roborev/run.go +++ b/cmd/roborev/run.go @@ -20,6 +20,7 @@ import ( func runCmd() *cobra.Command { var ( agentName string + model string reasoning string wait bool quiet bool @@ -58,11 +59,12 @@ Examples: cat instructions.txt | roborev run --wait `, RunE: func(cmd *cobra.Command, args []string) error { - return runPrompt(cmd, args, agentName, reasoning, wait, quiet, !noContext, agentic) + return runPrompt(cmd, args, agentName, model, reasoning, wait, quiet, !noContext, agentic) }, } cmd.Flags().StringVar(&agentName, "agent", "", "agent to use (default: from config)") + cmd.Flags().StringVar(&model, "model", "", "model for agent (format varies: opencode uses provider/model, others use model name)") cmd.Flags().StringVar(&reasoning, "reasoning", "", "reasoning level: fast, standard, or thorough (default)") cmd.Flags().BoolVar(&wait, "wait", false, "wait for job to complete and show result") cmd.Flags().BoolVarP(&quiet, "quiet", "q", false, "suppress output (just enqueue)") @@ -81,7 +83,7 @@ func promptCmd() *cobra.Command { return cmd } -func runPrompt(cmd *cobra.Command, args []string, agentName, reasoningStr string, wait, quiet, includeContext, agentic bool) error { +func runPrompt(cmd *cobra.Command, args []string, agentName, modelStr, reasoningStr string, wait, quiet, includeContext, agentic bool) error { // Get prompt from args or stdin var promptText string if len(args) > 0 { @@ -136,6 +138,7 @@ func runPrompt(cmd *cobra.Command, args []string, agentName, reasoningStr string "repo_path": repoRoot, "git_ref": "run", "agent": agentName, + "model": modelStr, "reasoning": reasoningStr, "custom_prompt": fullPrompt, "agentic": agentic, diff --git a/e2e_test.go b/e2e_test.go index fcc0dbc2..93b965b2 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -103,7 +103,7 @@ func TestDatabaseIntegration(t *testing.T) { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "codex", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "codex", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } diff --git a/internal/agent/agent.go b/internal/agent/agent.go index b3f1e738..b2f7cc48 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -52,6 +52,11 @@ type Agent interface { // In agentic mode, agents can edit files and run commands. // If false, agents operate in read-only review mode. WithAgentic(agentic bool) Agent + + // WithModel returns a copy of the agent configured to use the specified model. + // Agents that don't support model selection may return themselves unchanged. + // For opencode, the model format is "provider/model" (e.g., "anthropic/claude-sonnet-4-20250514"). + WithModel(model string) Agent } // CommandAgent is an agent that uses an external command diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index f16abb3f..3cf85d75 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "strings" "sync" "testing" "time" @@ -207,3 +208,210 @@ type failingWriter struct { func (w *failingWriter) Write(p []byte) (int, error) { return 0, w.err } + +// getAgentModel extracts the model from any agent type +func getAgentModel(a Agent) string { + switch v := a.(type) { + case *CodexAgent: + return v.Model + case *ClaudeAgent: + return v.Model + case *GeminiAgent: + return v.Model + case *CopilotAgent: + return v.Model + case *OpenCodeAgent: + return v.Model + default: + return "" + } +} + +func TestAgentWithModelPersistence(t *testing.T) { + tests := []struct { + name string + newAgent func() Agent + model string + wantModel string + }{ + {"codex", func() Agent { return NewCodexAgent("") }, "o3", "o3"}, + {"claude", func() Agent { return NewClaudeAgent("") }, "opus", "opus"}, + {"gemini", func() Agent { return NewGeminiAgent("") }, "gemini-2.5-pro", "gemini-2.5-pro"}, + {"copilot", func() Agent { return NewCopilotAgent("") }, "gpt-4o", "gpt-4o"}, + {"opencode", func() Agent { return NewOpenCodeAgent("") }, "anthropic/claude-sonnet-4", "anthropic/claude-sonnet-4"}, + } + + for _, tt := range tests { + t.Run(tt.name+"/WithModel sets model", func(t *testing.T) { + a := tt.newAgent().WithModel(tt.model) + if got := getAgentModel(a); got != tt.wantModel { + t.Errorf("got model %q, want %q", got, tt.wantModel) + } + }) + + t.Run(tt.name+"/model persists through WithReasoning", func(t *testing.T) { + a := tt.newAgent().WithModel(tt.model).WithReasoning(ReasoningThorough) + if got := getAgentModel(a); got != tt.wantModel { + t.Errorf("after WithReasoning: got model %q, want %q", got, tt.wantModel) + } + }) + + t.Run(tt.name+"/model persists through WithAgentic", func(t *testing.T) { + a := tt.newAgent().WithModel(tt.model).WithAgentic(true) + if got := getAgentModel(a); got != tt.wantModel { + t.Errorf("after WithAgentic: got model %q, want %q", got, tt.wantModel) + } + }) + + t.Run(tt.name+"/model persists through chained calls", func(t *testing.T) { + a := tt.newAgent().WithModel(tt.model).WithReasoning(ReasoningFast).WithAgentic(true) + if got := getAgentModel(a); got != tt.wantModel { + t.Errorf("after chained calls: got model %q, want %q", got, tt.wantModel) + } + }) + } +} + +// containsSequence checks if args contains needle1 immediately followed by needle2 +func containsSequence(args []string, needle1, needle2 string) bool { + for i := 0; i < len(args)-1; i++ { + if args[i] == needle1 && args[i+1] == needle2 { + return true + } + } + return false +} + +// containsArg checks if args contains the given argument +func containsArg(args []string, needle string) bool { + for _, arg := range args { + if arg == needle { + return true + } + } + return false +} + +func TestAgentBuildArgsWithModel(t *testing.T) { + tests := []struct { + name string + buildFn func(model string) []string + flag string // e.g. "-m" or "--model" + model string + wantFlag bool + }{ + { + name: "codex with model", + buildFn: func(model string) []string { + return (&CodexAgent{Model: model}).buildArgs("/tmp", "/tmp/out.txt", false, true) + }, + flag: "-m", model: "o3", wantFlag: true, + }, + { + name: "codex without model", + buildFn: func(model string) []string { + return (&CodexAgent{Model: model}).buildArgs("/tmp", "/tmp/out.txt", false, true) + }, + flag: "-m", model: "", wantFlag: false, + }, + { + name: "claude with model", + buildFn: func(model string) []string { + return (&ClaudeAgent{Model: model}).buildArgs(false) + }, + flag: "--model", model: "opus", wantFlag: true, + }, + { + name: "claude without model", + buildFn: func(model string) []string { + return (&ClaudeAgent{Model: model}).buildArgs(false) + }, + flag: "--model", model: "", wantFlag: false, + }, + { + name: "gemini with model", + buildFn: func(model string) []string { + return (&GeminiAgent{Model: model}).buildArgs(false) + }, + flag: "-m", model: "gemini-2.5-pro", wantFlag: true, + }, + { + name: "gemini without model", + buildFn: func(model string) []string { + return (&GeminiAgent{Model: model}).buildArgs(false) + }, + flag: "-m", model: "", wantFlag: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + args := tt.buildFn(tt.model) + hasFlag := containsArg(args, tt.flag) + if tt.wantFlag { + if !hasFlag { + t.Errorf("expected flag %q in args %v", tt.flag, args) + } + if !containsSequence(args, tt.flag, tt.model) { + t.Errorf("expected %q %q sequence in args %v", tt.flag, tt.model, args) + } + } else { + if hasFlag { + t.Errorf("expected no flag %q in args %v", tt.flag, args) + } + } + }) + } +} + +func TestCodexBuildArgsModelWithReasoning(t *testing.T) { + a := &CodexAgent{Model: "o4-mini", Reasoning: ReasoningThorough} + args := a.buildArgs("/tmp", "/tmp/out.txt", false, true) + + if !containsSequence(args, "-m", "o4-mini") { + t.Errorf("expected -m o4-mini in args %v", args) + } + if !containsSequence(args, "-c", `model_reasoning_effort="high"`) { + t.Errorf("expected reasoning effort config in args %v", args) + } +} + +func TestOpenCodeReviewPassesModelFlag(t *testing.T) { + skipIfWindows(t) + // Script that echoes its arguments so we can verify --model was passed + script := `#!/bin/sh +echo "args: $@" +` + cmdPath := writeTempCommand(t, script) + a := NewOpenCodeAgent(cmdPath).WithModel("anthropic/claude-sonnet-4") + result, err := a.Review(context.Background(), t.TempDir(), "head", "test prompt", nil) + if err != nil { + t.Fatalf("Review: %v", err) + } + if !strings.Contains(result, "--model") { + t.Errorf("expected --model in args, got: %q", result) + } + if !strings.Contains(result, "anthropic/claude-sonnet-4") { + t.Errorf("expected model value in args, got: %q", result) + } +} + +func TestCopilotReviewPassesModelFlag(t *testing.T) { + skipIfWindows(t) + // Script that echoes its arguments so we can verify --model was passed + script := `#!/bin/sh +echo "args: $@" +` + cmdPath := writeTempCommand(t, script) + a := NewCopilotAgent(cmdPath).WithModel("gpt-4o") + result, err := a.Review(context.Background(), t.TempDir(), "head", "test prompt", nil) + if err != nil { + t.Fatalf("Review: %v", err) + } + if !strings.Contains(result, "--model") { + t.Errorf("expected --model in args, got: %q", result) + } + if !strings.Contains(result, "gpt-4o") { + t.Errorf("expected model value in args, got: %q", result) + } +} diff --git a/internal/agent/claude.go b/internal/agent/claude.go index e0975f25..8e815214 100644 --- a/internal/agent/claude.go +++ b/internal/agent/claude.go @@ -16,6 +16,7 @@ import ( // ClaudeAgent runs code reviews using Claude Code CLI type ClaudeAgent struct { Command string // The claude command to run (default: "claude") + Model string // Model to use (e.g., "opus", "sonnet", or full name) Reasoning ReasoningLevel // Reasoning level (for future extended thinking support) Agentic bool // Whether agentic mode is enabled (allow file edits) } @@ -32,20 +33,36 @@ func NewClaudeAgent(command string) *ClaudeAgent { return &ClaudeAgent{Command: command, Reasoning: ReasoningStandard} } -// WithReasoning returns the agent unchanged (reasoning not supported). +// WithReasoning returns a copy of the agent with the model preserved (reasoning not yet supported). func (a *ClaudeAgent) WithReasoning(level ReasoningLevel) Agent { - return a + return &ClaudeAgent{ + Command: a.Command, + Model: a.Model, + Reasoning: level, + Agentic: a.Agentic, + } } // WithAgentic returns a copy of the agent configured for agentic mode. func (a *ClaudeAgent) WithAgentic(agentic bool) Agent { return &ClaudeAgent{ Command: a.Command, + Model: a.Model, Reasoning: a.Reasoning, Agentic: agentic, } } +// WithModel returns a copy of the agent configured to use the specified model. +func (a *ClaudeAgent) WithModel(model string) Agent { + return &ClaudeAgent{ + Command: a.Command, + Model: model, + Reasoning: a.Reasoning, + Agentic: a.Agentic, + } +} + func (a *ClaudeAgent) Name() string { return "claude-code" } @@ -59,6 +76,10 @@ func (a *ClaudeAgent) buildArgs(agenticMode bool) []string { // (following claude-code-action pattern from Anthropic) args := []string{"-p", "--verbose", "--output-format", "stream-json"} + if a.Model != "" { + args = append(args, "--model", a.Model) + } + if agenticMode { // Agentic mode: Claude can use tools and make file changes args = append(args, claudeDangerousFlag) diff --git a/internal/agent/codex.go b/internal/agent/codex.go index 9efacfd2..30305de1 100644 --- a/internal/agent/codex.go +++ b/internal/agent/codex.go @@ -14,6 +14,7 @@ import ( // CodexAgent runs code reviews using the Codex CLI type CodexAgent struct { Command string // The codex command to run (default: "codex") + Model string // Model to use (e.g., "o3", "o4-mini") Reasoning ReasoningLevel // Reasoning level for the agent Agentic bool // Whether agentic mode is enabled (allow file edits) } @@ -34,18 +35,29 @@ func NewCodexAgent(command string) *CodexAgent { // WithReasoning returns a copy of the agent with the specified reasoning level func (a *CodexAgent) WithReasoning(level ReasoningLevel) Agent { - return &CodexAgent{Command: a.Command, Reasoning: level, Agentic: a.Agentic} + return &CodexAgent{Command: a.Command, Model: a.Model, Reasoning: level, Agentic: a.Agentic} } // WithAgentic returns a copy of the agent configured for agentic mode. func (a *CodexAgent) WithAgentic(agentic bool) Agent { return &CodexAgent{ Command: a.Command, + Model: a.Model, Reasoning: a.Reasoning, Agentic: agentic, } } +// WithModel returns a copy of the agent configured to use the specified model. +func (a *CodexAgent) WithModel(model string) Agent { + return &CodexAgent{ + Command: a.Command, + Model: model, + Reasoning: a.Reasoning, + Agentic: a.Agentic, + } +} + // codexReasoningEffort maps ReasoningLevel to codex-specific effort values func (a *CodexAgent) codexReasoningEffort() string { switch a.Reasoning { @@ -80,6 +92,9 @@ func (a *CodexAgent) buildArgs(repoPath, outputFile string, agenticMode, autoApp "-C", repoPath, "-o", outputFile, ) + if a.Model != "" { + args = append(args, "-m", a.Model) + } if effort := a.codexReasoningEffort(); effort != "" { args = append(args, "-c", fmt.Sprintf(`model_reasoning_effort="%s"`, effort)) } diff --git a/internal/agent/copilot.go b/internal/agent/copilot.go index 537f8199..871bb283 100644 --- a/internal/agent/copilot.go +++ b/internal/agent/copilot.go @@ -11,6 +11,7 @@ import ( // CopilotAgent runs code reviews using the GitHub Copilot CLI type CopilotAgent struct { Command string // The copilot command to run (default: "copilot") + Model string // Model to use Reasoning ReasoningLevel // Reasoning level (for future support) Agentic bool // Whether agentic mode is enabled (note: Copilot requires manual approval for actions) } @@ -23,9 +24,14 @@ func NewCopilotAgent(command string) *CopilotAgent { return &CopilotAgent{Command: command, Reasoning: ReasoningStandard} } -// WithReasoning returns the agent unchanged (reasoning not supported). +// WithReasoning returns a copy of the agent with the model preserved (reasoning not yet supported). func (a *CopilotAgent) WithReasoning(level ReasoningLevel) Agent { - return a + return &CopilotAgent{ + Command: a.Command, + Model: a.Model, + Reasoning: level, + Agentic: a.Agentic, + } } // WithAgentic returns a copy of the agent configured for agentic mode. @@ -34,11 +40,22 @@ func (a *CopilotAgent) WithReasoning(level ReasoningLevel) Agent { func (a *CopilotAgent) WithAgentic(agentic bool) Agent { return &CopilotAgent{ Command: a.Command, + Model: a.Model, Reasoning: a.Reasoning, Agentic: agentic, } } +// WithModel returns a copy of the agent configured to use the specified model. +func (a *CopilotAgent) WithModel(model string) Agent { + return &CopilotAgent{ + Command: a.Command, + Model: model, + Reasoning: a.Reasoning, + Agentic: a.Agentic, + } +} + func (a *CopilotAgent) Name() string { return "copilot" } @@ -49,9 +66,11 @@ func (a *CopilotAgent) CommandName() string { func (a *CopilotAgent) Review(ctx context.Context, repoPath, commitSHA, prompt string, output io.Writer) (string, error) { // Use copilot with --prompt for non-interactive mode - args := []string{ - "--prompt", prompt, + args := []string{} + if a.Model != "" { + args = append(args, "--model", a.Model) } + args = append(args, "--prompt", prompt) cmd := exec.CommandContext(ctx, a.Command, args...) cmd.Dir = repoPath diff --git a/internal/agent/droid.go b/internal/agent/droid.go index 4f6f2571..e7aef733 100644 --- a/internal/agent/droid.go +++ b/internal/agent/droid.go @@ -37,6 +37,11 @@ func (a *DroidAgent) WithAgentic(agentic bool) Agent { } } +// WithModel returns the agent unchanged (model selection not supported for droid). +func (a *DroidAgent) WithModel(model string) Agent { + return a +} + // droidReasoningEffort maps ReasoningLevel to droid-specific effort values func (a *DroidAgent) droidReasoningEffort() string { switch a.Reasoning { diff --git a/internal/agent/gemini.go b/internal/agent/gemini.go index 5c021df6..a93f6e84 100644 --- a/internal/agent/gemini.go +++ b/internal/agent/gemini.go @@ -30,6 +30,7 @@ func truncateStderr(stderr string) string { // GeminiAgent runs code reviews using the Gemini CLI type GeminiAgent struct { Command string // The gemini command to run (default: "gemini") + Model string // Model to use (e.g., "gemini-2.5-pro") Reasoning ReasoningLevel // Reasoning level (for future support) Agentic bool // Whether agentic mode is enabled (allow file edits) } @@ -42,20 +43,36 @@ func NewGeminiAgent(command string) *GeminiAgent { return &GeminiAgent{Command: command, Reasoning: ReasoningStandard} } -// WithReasoning returns the agent unchanged (reasoning not supported). +// WithReasoning returns a copy of the agent with the model preserved (reasoning not yet supported). func (a *GeminiAgent) WithReasoning(level ReasoningLevel) Agent { - return a + return &GeminiAgent{ + Command: a.Command, + Model: a.Model, + Reasoning: level, + Agentic: a.Agentic, + } } // WithAgentic returns a copy of the agent configured for agentic mode. func (a *GeminiAgent) WithAgentic(agentic bool) Agent { return &GeminiAgent{ Command: a.Command, + Model: a.Model, Reasoning: a.Reasoning, Agentic: agentic, } } +// WithModel returns a copy of the agent configured to use the specified model. +func (a *GeminiAgent) WithModel(model string) Agent { + return &GeminiAgent{ + Command: a.Command, + Model: model, + Reasoning: a.Reasoning, + Agentic: a.Agentic, + } +} + func (a *GeminiAgent) Name() string { return "gemini" } @@ -68,6 +85,10 @@ func (a *GeminiAgent) buildArgs(agenticMode bool) []string { // Use stream-json output for parsing, prompt via stdin args := []string{"--output-format", "stream-json"} + if a.Model != "" { + args = append(args, "-m", a.Model) + } + if agenticMode { // Agentic mode: auto-approve all actions, allow write tools args = append(args, "--yolo") diff --git a/internal/agent/opencode.go b/internal/agent/opencode.go index fefe00b4..7a898aa3 100644 --- a/internal/agent/opencode.go +++ b/internal/agent/opencode.go @@ -3,14 +3,17 @@ package agent import ( "bytes" "context" + "encoding/json" "fmt" "io" "os/exec" + "strings" ) // OpenCodeAgent runs code reviews using the OpenCode CLI type OpenCodeAgent struct { Command string // The opencode command to run (default: "opencode") + Model string // Model to use (provider/model format, e.g., "anthropic/claude-sonnet-4-20250514") Reasoning ReasoningLevel // Reasoning level (for future support) Agentic bool // Whether agentic mode is enabled (OpenCode auto-approves in non-interactive mode) } @@ -23,9 +26,14 @@ func NewOpenCodeAgent(command string) *OpenCodeAgent { return &OpenCodeAgent{Command: command, Reasoning: ReasoningStandard} } -// WithReasoning returns the agent unchanged (reasoning not supported). +// WithReasoning returns a copy of the agent with the model preserved (reasoning not yet supported). func (a *OpenCodeAgent) WithReasoning(level ReasoningLevel) Agent { - return a + return &OpenCodeAgent{ + Command: a.Command, + Model: a.Model, + Reasoning: level, + Agentic: a.Agentic, + } } // WithAgentic returns a copy of the agent configured for agentic mode. @@ -34,11 +42,22 @@ func (a *OpenCodeAgent) WithReasoning(level ReasoningLevel) Agent { func (a *OpenCodeAgent) WithAgentic(agentic bool) Agent { return &OpenCodeAgent{ Command: a.Command, + Model: a.Model, Reasoning: a.Reasoning, Agentic: agentic, } } +// WithModel returns a copy of the agent configured to use the specified model. +func (a *OpenCodeAgent) WithModel(model string) Agent { + return &OpenCodeAgent{ + Command: a.Command, + Model: model, + Reasoning: a.Reasoning, + Agentic: a.Agentic, + } +} + func (a *OpenCodeAgent) Name() string { return "opencode" } @@ -47,6 +66,44 @@ func (a *OpenCodeAgent) CommandName() string { return a.Command } +// filterOpencodeToolCallLines removes LLM tool-call JSON lines that may appear in stdout. +// When LLM providers stream responses, raw tool calls in the standard format +// {"name":"...","arguments":{...}} (exactly 2 keys) may occasionally leak through +// to stdout. Normally OpenCode formats these with ANSI codes, but edge cases +// (streaming glitches, format failures) can expose the raw JSON. +// We filter lines matching this exact format while preserving legitimate JSON +// examples which would have additional keys. +func filterOpencodeToolCallLines(s string) string { + var out []string + for _, line := range strings.Split(s, "\n") { + if isOpencodeToolCallLine(line) { + continue + } + out = append(out, line) + } + // Only trim trailing newlines to preserve leading indentation in code blocks + return strings.TrimRight(strings.Join(out, "\n"), "\r\n") +} + +func isOpencodeToolCallLine(line string) bool { + trimmed := strings.TrimSpace(line) + if !strings.HasPrefix(trimmed, "{") { + return false + } + var m map[string]json.RawMessage + if err := json.Unmarshal([]byte(trimmed), &m); err != nil { + return false + } + // Tool calls have exactly "name" and "arguments" keys, nothing else. + // This avoids stripping legitimate JSON examples that happen to include these keys. + if len(m) != 2 { + return false + } + _, hasName := m["name"] + _, hasArgs := m["arguments"] + return hasName && hasArgs +} + func (a *OpenCodeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt string, output io.Writer) (string, error) { // OpenCode CLI supports a headless invocation via `opencode run [message..]`. // We run it from the repo root so it can use project context, and pass the full @@ -55,7 +112,11 @@ func (a *OpenCodeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt // Helpful reference: // opencode --help // opencode run --help - args := []string{"run", "--format", "default", prompt} + args := []string{"run", "--format", "default"} + if a.Model != "" { + args = append(args, "--model", a.Model) + } + args = append(args, prompt) cmd := exec.CommandContext(ctx, a.Command, args...) cmd.Dir = repoPath @@ -79,11 +140,10 @@ func (a *OpenCodeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt ) } - result := stdout.String() + result := filterOpencodeToolCallLines(stdout.String()) if len(result) == 0 { return "No review output generated", nil } - return result, nil } diff --git a/internal/agent/opencode_test.go b/internal/agent/opencode_test.go new file mode 100644 index 00000000..e73449d8 --- /dev/null +++ b/internal/agent/opencode_test.go @@ -0,0 +1,99 @@ +package agent + +import ( + "context" + "strings" + "testing" +) + +func TestFilterOpencodeToolCallLines(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "only tool-call lines", + input: `{"name":"read","arguments":{"path":"/foo"}}` + "\n" + `{"name":"edit","arguments":{}}`, + expected: "", + }, + { + name: "only normal text", + input: "**Review:** No issues.\nDone.", + expected: "**Review:** No issues.\nDone.", + }, + { + name: "mixed", + input: `{"name":"read","arguments":{}}` + "\n" + "Real text\n" + `{"name":"edit","arguments":{}}`, + expected: "Real text", + }, + { + name: "empty", + input: "", + expected: "", + }, + { + name: "only newlines", + input: "\n\n", + expected: "", + }, + { + name: "JSON without arguments", + input: `{"name":"foo"}`, + expected: `{"name":"foo"}`, + }, + { + name: "JSON without name", + input: `{"arguments":{}}`, + expected: `{"arguments":{}}`, + }, + { + name: "JSON with name and arguments plus extra keys preserved", + input: `{"name":"example","arguments":{"foo":"bar"},"description":"This is a JSON example"}`, + expected: `{"name":"example","arguments":{"foo":"bar"},"description":"This is a JSON example"}`, + }, + { + name: "leading indentation preserved", + input: " indented line\n more indented", + expected: " indented line\n more indented", + }, + { + name: "code block with JSON example preserved", + input: "Here's an example:\n```json\n{\"name\":\"test\",\"arguments\":{},\"extra\":true}\n```", + expected: "Here's an example:\n```json\n{\"name\":\"test\",\"arguments\":{},\"extra\":true}\n```", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := filterOpencodeToolCallLines(tt.input) + if got != tt.expected { + t.Errorf("filterOpencodeToolCallLines(%q) = %q, want %q", tt.input, got, tt.expected) + } + }) + } +} + +func TestOpenCodeReviewFiltersToolCallLines(t *testing.T) { + skipIfWindows(t) + script := `#!/bin/sh +printf '%s\n' '{"name":"read","arguments":{"path":"/foo"}}' +echo "**Review:** Fix the typo." +printf '%s\n' '{"name":"edit","arguments":{}}' +echo "Done." +` + cmdPath := writeTempCommand(t, script) + a := NewOpenCodeAgent(cmdPath) + result, err := a.Review(context.Background(), t.TempDir(), "head", "prompt", nil) + if err != nil { + t.Fatalf("Review: %v", err) + } + if !strings.Contains(result, "**Review:**") { + t.Errorf("result missing **Review:**: %q", result) + } + if !strings.Contains(result, "Done.") { + t.Errorf("result missing Done.: %q", result) + } + if strings.Contains(result, `"name":"read"`) { + t.Errorf("result should not contain tool-call JSON: %q", result) + } +} diff --git a/internal/agent/test_agent.go b/internal/agent/test_agent.go index 146bb4aa..9a052141 100644 --- a/internal/agent/test_agent.go +++ b/internal/agent/test_agent.go @@ -39,6 +39,11 @@ func (a *TestAgent) WithAgentic(agentic bool) Agent { return a } +// WithModel returns the agent unchanged (model selection not supported for test agent). +func (a *TestAgent) WithModel(model string) Agent { + return a +} + func (a *TestAgent) Name() string { return "test" } diff --git a/internal/config/config.go b/internal/config/config.go index 066d14c9..7545c353 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,6 +18,7 @@ type Config struct { MaxWorkers int `toml:"max_workers"` ReviewContextCount int `toml:"review_context_count"` DefaultAgent string `toml:"default_agent"` + DefaultModel string `toml:"default_model"` // Default model for agents (format varies by agent) JobTimeoutMinutes int `toml:"job_timeout_minutes"` AllowUnsafeAgents *bool `toml:"allow_unsafe_agents"` // nil = not set, allows commands to choose their own default @@ -96,6 +97,7 @@ func (c *SyncConfig) Validate() []string { // RepoConfig holds per-repo overrides type RepoConfig struct { Agent string `toml:"agent"` + Model string `toml:"model"` // Model for agents (format varies by agent) ReviewContextCount int `toml:"review_context_count"` ReviewGuidelines string `toml:"review_guidelines"` JobTimeoutMinutes int `toml:"job_timeout_minutes"` @@ -275,6 +277,27 @@ func ResolveRefineReasoning(explicit string, repoPath string) (string, error) { return "standard", nil // Default for refine: balanced analysis } +// ResolveModel determines which model to use based on config priority: +// 1. Explicit model parameter (if non-empty) +// 2. Per-repo config (model in .roborev.toml) +// 3. Global config (default_model in config.toml) +// 4. Default (empty string, agent uses its default) +func ResolveModel(explicit string, repoPath string, globalCfg *Config) string { + if strings.TrimSpace(explicit) != "" { + return strings.TrimSpace(explicit) + } + + if repoCfg, err := LoadRepoConfig(repoPath); err == nil && repoCfg != nil && strings.TrimSpace(repoCfg.Model) != "" { + return strings.TrimSpace(repoCfg.Model) + } + + if globalCfg != nil && strings.TrimSpace(globalCfg.DefaultModel) != "" { + return strings.TrimSpace(globalCfg.DefaultModel) + } + + return "" +} + // SaveGlobal saves the global configuration func SaveGlobal(cfg *Config) error { path := GlobalConfigPath() diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f7a56b81..3dc88a1b 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -942,6 +942,145 @@ func TestResolveRepoIdentity(t *testing.T) { }) } +func TestResolveModel(t *testing.T) { + t.Run("explicit model takes precedence", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("explicit-model", tmpDir, cfg) + if model != "explicit-model" { + t.Errorf("Expected 'explicit-model', got '%s'", model) + } + }) + + t.Run("explicit with whitespace is trimmed", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel(" explicit-model ", tmpDir, cfg) + if model != "explicit-model" { + t.Errorf("Expected 'explicit-model', got '%s'", model) + } + }) + + t.Run("empty explicit falls back to repo config", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`model = "repo-model"`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("", tmpDir, cfg) + if model != "repo-model" { + t.Errorf("Expected 'repo-model' from repo config, got '%s'", model) + } + }) + + t.Run("repo config with whitespace is trimmed", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`model = " repo-model "`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{} + model := ResolveModel("", tmpDir, cfg) + if model != "repo-model" { + t.Errorf("Expected 'repo-model', got '%s'", model) + } + }) + + t.Run("no repo config falls back to global config", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("", tmpDir, cfg) + if model != "global-model" { + t.Errorf("Expected 'global-model' from global config, got '%s'", model) + } + }) + + t.Run("global config with whitespace is trimmed", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &Config{DefaultModel: " global-model "} + model := ResolveModel("", tmpDir, cfg) + if model != "global-model" { + t.Errorf("Expected 'global-model', got '%s'", model) + } + }) + + t.Run("no config returns empty", func(t *testing.T) { + tmpDir := t.TempDir() + model := ResolveModel("", tmpDir, nil) + if model != "" { + t.Errorf("Expected empty string when no config, got '%s'", model) + } + }) + + t.Run("empty global config returns empty", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &Config{DefaultModel: ""} + model := ResolveModel("", tmpDir, cfg) + if model != "" { + t.Errorf("Expected empty string when global model is empty, got '%s'", model) + } + }) + + t.Run("whitespace-only explicit falls through to repo config", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`model = "repo-model"`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel(" ", tmpDir, cfg) + if model != "repo-model" { + t.Errorf("Expected 'repo-model' when explicit is whitespace, got '%s'", model) + } + }) + + t.Run("whitespace-only repo config falls through to global", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`model = " "`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("", tmpDir, cfg) + if model != "global-model" { + t.Errorf("Expected 'global-model' when repo model is whitespace, got '%s'", model) + } + }) + + t.Run("explicit overrides repo config", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`model = "repo-model"`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("explicit-model", tmpDir, cfg) + if model != "explicit-model" { + t.Errorf("Expected 'explicit-model', got '%s'", model) + } + }) + + t.Run("malformed repo config falls through to global", func(t *testing.T) { + tmpDir := t.TempDir() + repoConfig := filepath.Join(tmpDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`this is not valid toml {{{`), 0644); err != nil { + t.Fatalf("Failed to write repo config: %v", err) + } + + cfg := &Config{DefaultModel: "global-model"} + model := ResolveModel("", tmpDir, cfg) + if model != "global-model" { + t.Errorf("Expected 'global-model' when repo config is malformed, got '%s'", model) + } + }) +} + func TestStripURLCredentials(t *testing.T) { tests := []struct { name string diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 5d1db970..eae2bdb4 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -294,6 +294,7 @@ type EnqueueRequest struct { CommitSHA string `json:"commit_sha,omitempty"` // Single commit (for backwards compat) GitRef string `json:"git_ref,omitempty"` // Single commit, range like "abc..def", or "dirty" Agent string `json:"agent,omitempty"` + Model string `json:"model,omitempty"` // Model to use (for opencode: provider/model format) DiffContent string `json:"diff_content,omitempty"` // Pre-captured diff for dirty reviews Reasoning string `json:"reasoning,omitempty"` // Reasoning level: thorough, standard, fast CustomPrompt string `json:"custom_prompt,omitempty"` // Custom prompt for ad-hoc agent work @@ -396,6 +397,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Resolve agent (uses main repo root for config lookup) agentName := config.ResolveAgent(req.Agent, repoRoot, s.configWatcher.Config()) + // Resolve model (uses main repo root for config lookup) + model := config.ResolveModel(req.Model, repoRoot, s.configWatcher.Config()) + // Resolve reasoning level (uses main repo root for config lookup) reasoning, err := config.ResolveReviewReasoning(req.Reasoning, repoRoot) if err != nil { @@ -426,14 +430,14 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { var job *storage.ReviewJob if isPrompt { // Custom prompt job - use provided prompt directly - job, err = s.db.EnqueuePromptJob(repo.ID, agentName, reasoning, req.CustomPrompt, req.Agentic) + job, err = s.db.EnqueuePromptJob(repo.ID, agentName, model, reasoning, req.CustomPrompt, req.Agentic) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue prompt job: %v", err)) return } } else if isDirty { // Dirty review - use pre-captured diff - job, err = s.db.EnqueueDirtyJob(repo.ID, gitRef, agentName, reasoning, req.DiffContent) + job, err = s.db.EnqueueDirtyJob(repo.ID, gitRef, agentName, model, reasoning, req.DiffContent) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue dirty job: %v", err)) return @@ -455,7 +459,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Store as full SHA range fullRef := startSHA + ".." + endSHA - job, err = s.db.EnqueueRangeJob(repo.ID, fullRef, agentName, reasoning) + job, err = s.db.EnqueueRangeJob(repo.ID, fullRef, agentName, model, reasoning) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) return @@ -482,7 +486,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - job, err = s.db.EnqueueJob(repo.ID, commit.ID, sha, agentName, reasoning) + job, err = s.db.EnqueueJob(repo.ID, commit.ID, sha, agentName, model, reasoning) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) return diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 1cb652d6..b37157ab 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -193,7 +193,7 @@ func TestHandleListRepos(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "test", ""); err != nil { + if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "test", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -205,7 +205,7 @@ func TestHandleListRepos(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "test", ""); err != nil { + if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "test", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -284,7 +284,7 @@ func TestHandleListJobsWithFilter(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "test", ""); err != nil { + if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "test", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -296,7 +296,7 @@ func TestHandleListJobsWithFilter(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "test", ""); err != nil { + if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "test", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -582,7 +582,7 @@ func TestHandleCancelJob(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "canceltest", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "canceltest", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -661,7 +661,7 @@ func TestHandleCancelJob(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job2, err := db.EnqueueJob(repo.ID, commit2.ID, "cancelrunning", "test", "") + job2, err := db.EnqueueJob(repo.ID, commit2.ID, "cancelrunning", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -706,7 +706,7 @@ func TestListJobsPagination(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - _, err = db.EnqueueJob(repo.ID, commit.ID, fmt.Sprintf("sha%d", i), "test", "") + _, err = db.EnqueueJob(repo.ID, commit.ID, fmt.Sprintf("sha%d", i), "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -830,7 +830,7 @@ func TestListJobsWithGitRefFilter(t *testing.T) { refs := []string{"abc123", "def456", "abc123..def456"} for _, ref := range refs { commit, _ := db.GetOrCreateCommit(repo.ID, ref, "A", "S", time.Now()) - db.EnqueueJob(repo.ID, commit.ID, ref, "codex", "") + db.EnqueueJob(repo.ID, commit.ID, ref, "codex", "", "") } t.Run("git_ref filter returns matching job", func(t *testing.T) { @@ -1360,7 +1360,7 @@ func TestHandleRerunJob(t *testing.T) { t.Run("rerun failed job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-failed", "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-failed", "test", "", "") db.ClaimJob("worker-1") db.FailJob(job.ID, "some error") @@ -1385,7 +1385,7 @@ func TestHandleRerunJob(t *testing.T) { t.Run("rerun canceled job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-canceled", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-canceled", "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-canceled", "test", "", "") db.CancelJob(job.ID) reqBody, _ := json.Marshal(RerunJobRequest{JobID: job.ID}) @@ -1409,7 +1409,7 @@ func TestHandleRerunJob(t *testing.T) { t.Run("rerun done job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-done", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-done", "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-done", "test", "", "") // Claim and complete job var claimed *storage.ReviewJob for { @@ -1445,7 +1445,7 @@ func TestHandleRerunJob(t *testing.T) { t.Run("rerun queued job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-queued", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-queued", "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-queued", "test", "", "") reqBody, _ := json.Marshal(RerunJobRequest{JobID: job.ID}) req := httptest.NewRequest(http.MethodPost, "/api/job/rerun", bytes.NewReader(reqBody)) diff --git a/internal/daemon/stream_integration_test.go b/internal/daemon/stream_integration_test.go index 61c0f6ab..c3b520a8 100644 --- a/internal/daemon/stream_integration_test.go +++ b/internal/daemon/stream_integration_test.go @@ -183,7 +183,7 @@ func TestBroadcasterIntegrationWithWorker(t *testing.T) { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "testsha", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "testsha", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 5b54f1f3..58f8a545 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -296,7 +296,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { reasoning = "thorough" } reasoningLevel := agent.ParseReasoningLevel(reasoning) - a := baseAgent.WithReasoning(reasoningLevel).WithAgentic(job.Agentic) + a := baseAgent.WithReasoning(reasoningLevel).WithAgentic(job.Agentic).WithModel(job.Model) // Use the actual agent name (may differ from requested if fallback occurred) agentName := a.Name() diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 5a0530ef..4b338110 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -29,7 +29,7 @@ func TestWorkerPoolE2E(t *testing.T) { } // Enqueue a job with test agent - job, err := db.EnqueueJob(repo.ID, commit.ID, "testsha123", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "testsha123", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -88,7 +88,7 @@ func TestWorkerPoolConcurrency(t *testing.T) { for i := 0; i < 5; i++ { sha := "concurrentsha" + string(rune('0'+i)) commit, _ := db.GetOrCreateCommit(repo.ID, sha, "Author", "Subject", time.Now()) - db.EnqueueJob(repo.ID, commit.ID, sha, "test", "") + db.EnqueueJob(repo.ID, commit.ID, sha, "test", "", "") } broadcaster := NewBroadcaster() @@ -119,7 +119,7 @@ func TestWorkerPoolCancelRunningJob(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "cancelsha", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "cancelsha", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -190,7 +190,7 @@ func TestWorkerPoolPendingCancellation(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "pending-cancel", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "pending-cancel", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -250,7 +250,7 @@ func TestWorkerPoolPendingCancellationAfterDBCancel(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "api-cancel-race", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "api-cancel-race", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -341,7 +341,7 @@ func TestWorkerPoolCancelJobFinishedDuringWindow(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "finish-window", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "finish-window", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -395,7 +395,7 @@ func TestWorkerPoolCancelJobRegisteredDuringCheck(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "register-during", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "register-during", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -446,7 +446,7 @@ func TestWorkerPoolCancelJobConcurrentRegister(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, sha, "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, sha, "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -497,7 +497,7 @@ func TestWorkerPoolCancelJobFinalCheckDeadlockSafe(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "deadlock-test", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "deadlock-test", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } diff --git a/internal/prompt/prompt_test.go b/internal/prompt/prompt_test.go index 2ce907de..54f800e1 100644 --- a/internal/prompt/prompt_test.go +++ b/internal/prompt/prompt_test.go @@ -127,7 +127,7 @@ func TestBuildPromptWithPreviousReviews(t *testing.T) { // Create review for some commits if reviewText, ok := reviewTexts[i]; ok { - job, err := db.EnqueueJob(repo.ID, commit.ID, sha, "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, sha, "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -208,7 +208,7 @@ func TestBuildPromptWithPreviousReviewsAndResponses(t *testing.T) { // Create review for commit 3 (parent of commit 6) with responses parentSHA := commits[2] // commit 3 commit3, _ := db.GetOrCreateCommit(repo.ID, parentSHA, "Test", "commit 3", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit3.ID, parentSHA, "test", "") + job, _ := db.EnqueueJob(repo.ID, commit3.ID, parentSHA, "test", "", "") db.ClaimJob("test-worker") db.CompleteJob(job.ID, "test", "prompt", "Found potential memory leak in connection pool") @@ -339,7 +339,7 @@ func TestPromptContainsExpectedFormat(t *testing.T) { repo, _ := db.GetOrCreateRepo(repoPath) commit, _ := db.GetOrCreateCommit(repo.ID, commits[4], "Test", "test", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, commits[4], "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, commits[4], "test", "", "") db.ClaimJob("test-worker") db.CompleteJob(job.ID, "test", "prompt", "Found 1 issue:\n1. pkg/cache/store.go:112 - Race condition") @@ -523,7 +523,7 @@ func TestBuildPromptWithPreviousAttempts(t *testing.T) { } for _, reviewText := range reviewTexts { - job, err := db.EnqueueJob(repo.ID, commit.ID, targetSHA, "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, targetSHA, "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -581,7 +581,7 @@ func TestBuildPromptWithPreviousAttemptsAndResponses(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, targetSHA, "Test", "test", time.Now()) // Create a previous review - job, _ := db.EnqueueJob(repo.ID, commit.ID, targetSHA, "test", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, targetSHA, "test", "", "") db.ClaimJob("test-worker") db.CompleteJob(job.ID, "test", "prompt", "Found issue: missing null check") diff --git a/internal/storage/db.go b/internal/storage/db.go index 96d567f9..b4d14d7c 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -37,6 +37,7 @@ CREATE TABLE IF NOT EXISTS review_jobs ( commit_id INTEGER REFERENCES commits(id), git_ref TEXT NOT NULL, agent TEXT NOT NULL DEFAULT 'codex', + model TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -188,6 +189,18 @@ func (db *DB) migrate() error { } } + // Migration: add model column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'model'`).Scan(&count) + if err != nil { + return fmt.Errorf("check model column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN model TEXT`) + if err != nil { + return fmt.Errorf("add model column: %w", err) + } + } + // Migration: update CHECK constraint to include 'canceled' status // SQLite requires table recreation to modify CHECK constraints var tableSql string @@ -228,6 +241,7 @@ func (db *DB) migrate() error { commit_id INTEGER REFERENCES commits(id), git_ref TEXT NOT NULL, agent TEXT NOT NULL DEFAULT 'codex', + model TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -246,8 +260,8 @@ func (db *DB) migrate() error { } // Check which optional columns exist in source table - var hasDiffContent, hasReasoning, hasAgentic bool - checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic')`) + var hasDiffContent, hasReasoning, hasAgentic, hasModel bool + checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic', 'model')`) if checkErr == nil { for checkRows.Next() { var colName string @@ -259,6 +273,8 @@ func (db *DB) migrate() error { hasReasoning = true case "agentic": hasAgentic = true + case "model": + hasModel = true } } checkRows.Close() @@ -271,6 +287,14 @@ func (db *DB) migrate() error { if hasReasoning { cols = "id, repo_id, commit_id, git_ref, agent, reasoning, status, enqueued_at, started_at, finished_at, worker_id, error, prompt, retry_count" } + if hasModel { + // Insert model after agent + if hasReasoning { + cols = "id, repo_id, commit_id, git_ref, agent, model, reasoning, status, enqueued_at, started_at, finished_at, worker_id, error, prompt, retry_count" + } else { + cols = "id, repo_id, commit_id, git_ref, agent, model, status, enqueued_at, started_at, finished_at, worker_id, error, prompt, retry_count" + } + } if hasDiffContent { cols += ", diff_content" } diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index bf54d9c6..210f32bc 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -100,7 +100,7 @@ func TestJobLifecycle(t *testing.T) { } // Enqueue job - job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "codex", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "codex", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -162,7 +162,7 @@ func TestJobFailure(t *testing.T) { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "def456", "codex", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "def456", "codex", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -200,7 +200,7 @@ func TestReviewOperations(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "rev123", "codex", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "rev123", "codex", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -233,7 +233,7 @@ func TestReviewVerdictComputation(t *testing.T) { t.Run("verdict populated when output exists and no error", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-pass", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-pass", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-pass", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "the prompt", "No issues found. The code looks good.") @@ -251,7 +251,7 @@ func TestReviewVerdictComputation(t *testing.T) { t.Run("verdict nil when output is empty", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-empty", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-empty", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-empty", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "the prompt", "") // empty output @@ -266,7 +266,7 @@ func TestReviewVerdictComputation(t *testing.T) { t.Run("verdict nil when job has error", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-error", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-error", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-error", "codex", "", "") db.ClaimJob("worker-1") db.FailJob(job.ID, "API rate limit exceeded") @@ -287,7 +287,7 @@ func TestReviewVerdictComputation(t *testing.T) { t.Run("GetReviewByCommitSHA also respects verdict guard", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-sha", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-sha", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-sha", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "the prompt", "No issues found.") @@ -338,7 +338,7 @@ func TestMarkReviewAddressed(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/test-repo") commit, _ := db.GetOrCreateCommit(repo.ID, "addr123", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "addr123", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "addr123", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "prompt", "output") @@ -403,12 +403,12 @@ func TestJobCounts(t *testing.T) { for i := 0; i < 3; i++ { sha := fmt.Sprintf("queued%d", i) commit, _ := db.GetOrCreateCommit(repo.ID, sha, "A", "S", time.Now()) - db.EnqueueJob(repo.ID, commit.ID, sha, "codex", "") + db.EnqueueJob(repo.ID, commit.ID, sha, "codex", "", "") } // Create a job, claim it, and complete it commit, _ := db.GetOrCreateCommit(repo.ID, "done1", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "done1", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "done1", "codex", "", "") _, _ = db.ClaimJob("w1") // Claims oldest queued job (one of queued0-2) _, _ = db.ClaimJob("w1") // Claims next _, _ = db.ClaimJob("w1") // Claims next @@ -419,7 +419,7 @@ func TestJobCounts(t *testing.T) { // Create a job, claim it, and fail it commit2, _ := db.GetOrCreateCommit(repo.ID, "fail1", "A", "S", time.Now()) - _, _ = db.EnqueueJob(repo.ID, commit2.ID, "fail1", "codex", "") + _, _ = db.EnqueueJob(repo.ID, commit2.ID, "fail1", "codex", "", "") claimed2, _ := db.ClaimJob("w2") if claimed2 != nil { db.FailJob(claimed2.ID, "err") @@ -450,7 +450,7 @@ func TestCountStalledJobs(t *testing.T) { // Create a job and claim it (makes it running with current timestamp) commit1, _ := db.GetOrCreateCommit(repo.ID, "recent1", "A", "S", time.Now()) - db.EnqueueJob(repo.ID, commit1.ID, "recent1", "codex", "") + db.EnqueueJob(repo.ID, commit1.ID, "recent1", "codex", "", "") _, _ = db.ClaimJob("worker-1") // No stalled jobs yet (just started) @@ -465,7 +465,7 @@ func TestCountStalledJobs(t *testing.T) { // Create a job and manually set started_at to 1 hour ago (simulating stalled job) // Use UTC format (ends with Z) to test basic case commit2, _ := db.GetOrCreateCommit(repo.ID, "stalled1", "A", "S", time.Now()) - job2, _ := db.EnqueueJob(repo.ID, commit2.ID, "stalled1", "codex", "") + job2, _ := db.EnqueueJob(repo.ID, commit2.ID, "stalled1", "codex", "", "") oldTimeUTC := time.Now().Add(-1 * time.Hour).UTC().Format(time.RFC3339) _, err = db.Exec(`UPDATE review_jobs SET status = 'running', started_at = ? WHERE id = ?`, oldTimeUTC, job2.ID) if err != nil { @@ -484,7 +484,7 @@ func TestCountStalledJobs(t *testing.T) { // Create another stalled job with a non-UTC timezone offset to verify datetime() handles offsets // This exercises the fix for RFC3339 timestamps with timezone offsets like "-07:00" commit3, _ := db.GetOrCreateCommit(repo.ID, "stalled2", "A", "S", time.Now()) - job3, _ := db.EnqueueJob(repo.ID, commit3.ID, "stalled2", "codex", "") + job3, _ := db.EnqueueJob(repo.ID, commit3.ID, "stalled2", "codex", "", "") // Use a fixed timezone offset (e.g., UTC-7) instead of UTC tzMinus7 := time.FixedZone("UTC-7", -7*60*60) oldTimeWithOffset := time.Now().Add(-1 * time.Hour).In(tzMinus7).Format(time.RFC3339) @@ -518,7 +518,7 @@ func TestRetryJob(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/test-repo") commit, _ := db.GetOrCreateCommit(repo.ID, "retry123", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry123", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry123", "codex", "", "") // Claim the job (makes it running) _, _ = db.ClaimJob("worker-1") @@ -576,7 +576,7 @@ func TestRetryJobOnlyWorksForRunning(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/test-repo") commit, _ := db.GetOrCreateCommit(repo.ID, "retry-status", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry-status", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry-status", "codex", "", "") // Try to retry a queued job (should fail - not running) retried, err := db.RetryJob(job.ID, 3) @@ -606,7 +606,7 @@ func TestRetryJobAtomic(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/test-repo") commit, _ := db.GetOrCreateCommit(repo.ID, "retry-atomic", "Author", "Subject", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry-atomic", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "retry-atomic", "codex", "", "") _, _ = db.ClaimJob("worker-1") // Simulate two concurrent retries - only first should succeed @@ -636,7 +636,7 @@ func TestCancelJob(t *testing.T) { t.Run("cancel queued job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-queued", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-queued", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-queued", "codex", "", "") err := db.CancelJob(job.ID) if err != nil { @@ -651,7 +651,7 @@ func TestCancelJob(t *testing.T) { t.Run("cancel running job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-running", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-running", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-running", "codex", "", "") db.ClaimJob("worker-1") err := db.CancelJob(job.ID) @@ -667,7 +667,7 @@ func TestCancelJob(t *testing.T) { t.Run("cancel done job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-done", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-done", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-done", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "prompt", "output") @@ -679,7 +679,7 @@ func TestCancelJob(t *testing.T) { t.Run("cancel failed job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-failed", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-failed", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-failed", "codex", "", "") db.ClaimJob("worker-1") db.FailJob(job.ID, "some error") @@ -691,7 +691,7 @@ func TestCancelJob(t *testing.T) { t.Run("complete respects canceled status", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "complete-canceled", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "complete-canceled", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "complete-canceled", "codex", "", "") db.ClaimJob("worker-1") db.CancelJob(job.ID) @@ -714,7 +714,7 @@ func TestCancelJob(t *testing.T) { t.Run("fail respects canceled status", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "fail-canceled", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "fail-canceled", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "fail-canceled", "codex", "", "") db.ClaimJob("worker-1") db.CancelJob(job.ID) @@ -730,7 +730,7 @@ func TestCancelJob(t *testing.T) { t.Run("canceled jobs counted correctly", func(t *testing.T) { // Create and cancel a new job commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-count", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-count", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cancel-count", "codex", "", "") db.CancelJob(job.ID) _, _, _, _, canceled, err := db.GetJobCounts() @@ -844,7 +844,7 @@ func TestMigrationFromOldSchema(t *testing.T) { // Verify the new constraint allows 'canceled' status repo, _ := db.GetOrCreateRepo("/tmp/test2") commit, _ := db.GetOrCreateCommit(repo.ID, "def456", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "def456", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "def456", "codex", "", "") db.ClaimJob("worker-1") // This should succeed with new schema (would fail with old constraint) @@ -1046,7 +1046,7 @@ func TestMigrationWithAlterTableColumnOrder(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - newJob, err := db.EnqueueJob(repo2.ID, commit2.ID, "newsha", "codex", "") + newJob, err := db.EnqueueJob(repo2.ID, commit2.ID, "newsha", "codex", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1253,7 +1253,7 @@ func TestListReposWithReviewCounts(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "codex", ""); err != nil { + if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "codex", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -1265,7 +1265,7 @@ func TestListReposWithReviewCounts(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "codex", ""); err != nil { + if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "codex", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -1361,7 +1361,7 @@ func TestListJobsWithRepoFilter(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "codex", ""); err != nil { + if _, err := db.EnqueueJob(repo1.ID, commit.ID, sha, "codex", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -1373,7 +1373,7 @@ func TestListJobsWithRepoFilter(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "codex", ""); err != nil { + if _, err := db.EnqueueJob(repo2.ID, commit.ID, sha, "codex", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -1518,7 +1518,7 @@ func TestListJobsWithGitRefFilter(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - if _, err := db.EnqueueJob(repo.ID, commit.ID, ref, "codex", ""); err != nil { + if _, err := db.EnqueueJob(repo.ID, commit.ID, ref, "codex", "", ""); err != nil { t.Fatalf("EnqueueJob failed: %v", err) } } @@ -1588,7 +1588,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun failed job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-failed", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-failed", "codex", "", "") db.ClaimJob("worker-1") db.FailJob(job.ID, "some error") @@ -1614,7 +1614,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun canceled job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-canceled", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-canceled", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-canceled", "codex", "", "") db.CancelJob(job.ID) err := db.ReenqueueJob(job.ID) @@ -1630,7 +1630,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun done job", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-done", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-done", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-done", "codex", "", "") // ClaimJob returns the claimed job; keep claiming until we get ours var claimed *ReviewJob for { @@ -1659,7 +1659,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun queued job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-queued", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-queued", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-queued", "codex", "", "") err := db.ReenqueueJob(job.ID) if err == nil { @@ -1669,7 +1669,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun running job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-running", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-running", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "rerun-running", "codex", "", "") db.ClaimJob("worker-1") err := db.ReenqueueJob(job.ID) @@ -1692,7 +1692,7 @@ func TestReenqueueJob(t *testing.T) { isolatedRepo, _ := isolatedDB.GetOrCreateRepo("/tmp/isolated-repo") commit, _ := isolatedDB.GetOrCreateCommit(isolatedRepo.ID, "rerun-complete-cycle", "A", "S", time.Now()) - job, _ := isolatedDB.EnqueueJob(isolatedRepo.ID, commit.ID, "rerun-complete-cycle", "codex", "") + job, _ := isolatedDB.EnqueueJob(isolatedRepo.ID, commit.ID, "rerun-complete-cycle", "codex", "", "") // First completion cycle claimed, _ := isolatedDB.ClaimJob("worker-1") @@ -1758,7 +1758,7 @@ func TestListJobsAndGetJobByIDReturnAgentic(t *testing.T) { } // Enqueue a prompt job with agentic=true - job, err := db.EnqueuePromptJob(repo.ID, "test-agent", "thorough", "Review this code", true) + job, err := db.EnqueuePromptJob(repo.ID, "test-agent", "", "thorough", "Review this code", true) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -1805,7 +1805,7 @@ func TestListJobsAndGetJobByIDReturnAgentic(t *testing.T) { // Also test with agentic=false to ensure we're not just always returning true t.Run("non-agentic job returns Agentic=false", func(t *testing.T) { - nonAgenticJob, err := db.EnqueuePromptJob(repo.ID, "test-agent", "thorough", "Another review", false) + nonAgenticJob, err := db.EnqueuePromptJob(repo.ID, "test-agent", "", "thorough", "Another review", false) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index f13ebbf0..8dc346ff 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -557,7 +557,7 @@ func parseSQLiteTime(s string) time.Time { } // EnqueueJob creates a new review job for a single commit -func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, reasoning string) (*ReviewJob, error) { +func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, model, reasoning string) (*ReviewJob, error) { if reasoning == "" { reasoning = "thorough" } @@ -566,8 +566,8 @@ func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, reasoning string now := time.Now() nowStr := now.Format(time.RFC3339) - result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, reasoning, status, uuid, source_machine_id, updated_at) VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, ?)`, - repoID, commitID, gitRef, agent, reasoning, uuid, machineID, nowStr) + result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, model, reasoning, status, uuid, source_machine_id, updated_at) VALUES (?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?)`, + repoID, commitID, gitRef, agent, nullString(model), reasoning, uuid, machineID, nowStr) if err != nil { return nil, err } @@ -579,6 +579,7 @@ func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, reasoning string CommitID: &commitID, GitRef: gitRef, Agent: agent, + Model: model, Reasoning: reasoning, Status: JobStatusQueued, EnqueuedAt: now, @@ -589,7 +590,7 @@ func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, reasoning string } // EnqueueRangeJob creates a new review job for a commit range -func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, reasoning string) (*ReviewJob, error) { +func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, model, reasoning string) (*ReviewJob, error) { if reasoning == "" { reasoning = "thorough" } @@ -598,8 +599,8 @@ func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, reasoning string) (*R now := time.Now() nowStr := now.Format(time.RFC3339) - result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, reasoning, status, uuid, source_machine_id, updated_at) VALUES (?, NULL, ?, ?, ?, 'queued', ?, ?, ?)`, - repoID, gitRef, agent, reasoning, uuid, machineID, nowStr) + result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, model, reasoning, status, uuid, source_machine_id, updated_at) VALUES (?, NULL, ?, ?, ?, ?, 'queued', ?, ?, ?)`, + repoID, gitRef, agent, nullString(model), reasoning, uuid, machineID, nowStr) if err != nil { return nil, err } @@ -611,6 +612,7 @@ func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, reasoning string) (*R CommitID: nil, GitRef: gitRef, Agent: agent, + Model: model, Reasoning: reasoning, Status: JobStatusQueued, EnqueuedAt: now, @@ -622,7 +624,7 @@ func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, reasoning string) (*R // EnqueueDirtyJob creates a new review job for uncommitted (dirty) changes. // The diff is captured at enqueue time since the working tree may change. -func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, reasoning, diffContent string) (*ReviewJob, error) { +func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, model, reasoning, diffContent string) (*ReviewJob, error) { if reasoning == "" { reasoning = "thorough" } @@ -631,8 +633,8 @@ func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, reasoning, diffConten now := time.Now() nowStr := now.Format(time.RFC3339) - result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, reasoning, status, diff_content, uuid, source_machine_id, updated_at) VALUES (?, NULL, ?, ?, ?, 'queued', ?, ?, ?, ?)`, - repoID, gitRef, agent, reasoning, diffContent, uuid, machineID, nowStr) + result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, model, reasoning, status, diff_content, uuid, source_machine_id, updated_at) VALUES (?, NULL, ?, ?, ?, ?, 'queued', ?, ?, ?, ?)`, + repoID, gitRef, agent, nullString(model), reasoning, diffContent, uuid, machineID, nowStr) if err != nil { return nil, err } @@ -644,6 +646,7 @@ func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, reasoning, diffConten CommitID: nil, GitRef: gitRef, Agent: agent, + Model: model, Reasoning: reasoning, Status: JobStatusQueued, EnqueuedAt: now, @@ -657,7 +660,7 @@ func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, reasoning, diffConten // EnqueuePromptJob creates a new job with a custom prompt (not a git review). // The prompt is stored at enqueue time and used directly by the worker. // If agentic is true, the agent will be allowed to edit files and run commands. -func (db *DB) EnqueuePromptJob(repoID int64, agent, reasoning, customPrompt string, agentic bool) (*ReviewJob, error) { +func (db *DB) EnqueuePromptJob(repoID int64, agent, model, reasoning, customPrompt string, agentic bool) (*ReviewJob, error) { if reasoning == "" { reasoning = "thorough" } @@ -670,8 +673,8 @@ func (db *DB) EnqueuePromptJob(repoID int64, agent, reasoning, customPrompt stri now := time.Now() nowStr := now.Format(time.RFC3339) - result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, reasoning, status, prompt, agentic, uuid, source_machine_id, updated_at) VALUES (?, NULL, 'prompt', ?, ?, 'queued', ?, ?, ?, ?, ?)`, - repoID, agent, reasoning, customPrompt, agenticInt, uuid, machineID, nowStr) + result, err := db.Exec(`INSERT INTO review_jobs (repo_id, commit_id, git_ref, agent, model, reasoning, status, prompt, agentic, uuid, source_machine_id, updated_at) VALUES (?, NULL, 'prompt', ?, ?, ?, 'queued', ?, ?, ?, ?, ?)`, + repoID, agent, nullString(model), reasoning, customPrompt, agenticInt, uuid, machineID, nowStr) if err != nil { return nil, err } @@ -683,6 +686,7 @@ func (db *DB) EnqueuePromptJob(repoID int64, agent, reasoning, customPrompt stri CommitID: nil, GitRef: "prompt", Agent: agent, + Model: model, Reasoning: reasoning, Status: JobStatusQueued, EnqueuedAt: now, @@ -731,9 +735,10 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { var commitSubject sql.NullString var diffContent sql.NullString var prompt sql.NullString + var model sql.NullString var agenticInt int err = db.QueryRow(` - SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.agent, j.reasoning, j.status, j.enqueued_at, + SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.agent, j.model, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0) FROM review_jobs j JOIN repos r ON r.id = j.repo_id @@ -741,7 +746,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { WHERE j.worker_id = ? AND j.status = 'running' ORDER BY j.started_at DESC LIMIT 1 - `, workerID).Scan(&job.ID, &job.RepoID, &commitID, &job.GitRef, &job.Agent, &job.Reasoning, &job.Status, &enqueuedAt, + `, workerID).Scan(&job.ID, &job.RepoID, &commitID, &job.GitRef, &job.Agent, &model, &job.Reasoning, &job.Status, &enqueuedAt, &job.RepoPath, &job.RepoName, &commitSubject, &diffContent, &prompt, &agenticInt) if err != nil { return nil, err @@ -759,6 +764,9 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { if prompt.Valid { job.Prompt = prompt.String } + if model.Valid { + job.Model = model.String + } job.Agentic = agenticInt != 0 job.EnqueuedAt = parseSQLiteTime(enqueuedAt) job.Status = JobStatusRunning @@ -914,7 +922,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.addressed, rv.output, - j.source_machine_id, j.uuid + j.source_machine_id, j.uuid, j.model FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -962,7 +970,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int for rows.Next() { var j ReviewJob var enqueuedAt string - var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID sql.NullString + var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID, model sql.NullString var commitID sql.NullInt64 var commitSubject sql.NullString var addressed sql.NullInt64 @@ -971,7 +979,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int err := rows.Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &j.RetryCount, &agentic, &j.RepoPath, &j.RepoName, &commitSubject, &addressed, &output, - &sourceMachineID, &jobUUID) + &sourceMachineID, &jobUUID, &model) if err != nil { return nil, err } @@ -1007,6 +1015,9 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int if sourceMachineID.Valid { j.SourceMachineID = sourceMachineID.String } + if model.Valid { + j.Model = model.String + } if addressed.Valid { val := addressed.Int64 != 0 j.Addressed = &val @@ -1035,17 +1046,18 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { var commitSubject sql.NullString var agentic int + var model sql.NullString err := db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), - r.root_path, r.name, c.subject + r.root_path, r.name, c.subject, j.model FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id WHERE j.id = ? `, id).Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &agentic, - &j.RepoPath, &j.RepoName, &commitSubject) + &j.RepoPath, &j.RepoName, &commitSubject, &model) if err != nil { return nil, err } @@ -1075,6 +1087,9 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { if prompt.Valid { j.Prompt = prompt.String } + if model.Valid { + j.Model = model.String + } return &j, nil } diff --git a/internal/storage/models.go b/internal/storage/models.go index eedc3ec3..f3558e5e 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -36,6 +36,7 @@ type ReviewJob struct { CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges GitRef string `json:"git_ref"` // SHA or "start..end" for ranges Agent string `json:"agent"` + Model string `json:"model,omitempty"` // Model to use (for opencode: provider/model format) Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough) Status JobStatus `json:"status"` EnqueuedAt time.Time `json:"enqueued_at"` diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 00104d7e..237b19c2 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -2,8 +2,10 @@ package storage import ( "context" + _ "embed" "errors" "fmt" + "strings" "time" "github.com/jackc/pgx/v5" @@ -12,97 +14,38 @@ import ( ) // PostgreSQL schema version - increment when schema changes -const pgSchemaVersion = 1 +const pgSchemaVersion = 2 // pgSchemaName is the PostgreSQL schema used to isolate roborev tables const pgSchemaName = "roborev" -// pgSchemaStatements are the individual DDL statements for schema creation. -// Defined as a slice to avoid SQL parsing issues with semicolons, comments, etc. -// Note: Tables are created in the 'roborev' schema (set via search_path on connection). -var pgSchemaStatements = []string{ - // Create dedicated schema for roborev - `CREATE SCHEMA IF NOT EXISTS roborev`, - - `CREATE TABLE IF NOT EXISTS schema_version ( - version INTEGER PRIMARY KEY, - applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE TABLE IF NOT EXISTS machines ( - id SERIAL PRIMARY KEY, - machine_id UUID UNIQUE NOT NULL, - name TEXT, - last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE TABLE IF NOT EXISTS repos ( - id SERIAL PRIMARY KEY, - identity TEXT UNIQUE NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE TABLE IF NOT EXISTS commits ( - id SERIAL PRIMARY KEY, - repo_id INTEGER REFERENCES repos(id), - sha TEXT NOT NULL, - author TEXT NOT NULL, - subject TEXT NOT NULL, - timestamp TIMESTAMP WITH TIME ZONE NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - UNIQUE(repo_id, sha) -)`, - `CREATE TABLE IF NOT EXISTS review_jobs ( - id SERIAL PRIMARY KEY, - uuid UUID UNIQUE NOT NULL, - repo_id INTEGER NOT NULL REFERENCES repos(id), - commit_id INTEGER REFERENCES commits(id), - git_ref TEXT NOT NULL, - agent TEXT NOT NULL, - reasoning TEXT, - status TEXT NOT NULL CHECK(status IN ('done', 'failed', 'canceled')), - agentic BOOLEAN DEFAULT FALSE, - enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, - started_at TIMESTAMP WITH TIME ZONE, - finished_at TIMESTAMP WITH TIME ZONE, - prompt TEXT, - diff_content TEXT, - error TEXT, - source_machine_id UUID NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE TABLE IF NOT EXISTS reviews ( - id SERIAL PRIMARY KEY, - uuid UUID UNIQUE NOT NULL, - job_uuid UUID NOT NULL REFERENCES review_jobs(uuid), - agent TEXT NOT NULL, - prompt TEXT NOT NULL, - output TEXT NOT NULL, - addressed BOOLEAN NOT NULL DEFAULT FALSE, - updated_by_machine_id UUID NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE TABLE IF NOT EXISTS responses ( - id SERIAL PRIMARY KEY, - uuid UUID UNIQUE NOT NULL, - job_uuid UUID NOT NULL REFERENCES review_jobs(uuid), - responder TEXT NOT NULL, - response TEXT NOT NULL, - source_machine_id UUID NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() -)`, - `CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON review_jobs(source_machine_id)`, - `CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON review_jobs(updated_at)`, - `CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON reviews(job_uuid)`, - `CREATE INDEX IF NOT EXISTS idx_reviews_updated ON reviews(updated_at)`, - `CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON responses(job_uuid)`, - `CREATE INDEX IF NOT EXISTS idx_responses_id ON responses(id)`, - // sync_metadata stores database-level sync configuration - // database_id is a unique ID for this Postgres instance, used to detect - // when a client is syncing to a different database than before - `CREATE TABLE IF NOT EXISTS sync_metadata ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL -)`, +//go:embed schemas/postgres_v2.sql +var pgSchemaSQL string + +// pgSchemaStatements returns the individual DDL statements for schema creation. +// Parsed from the embedded SQL file. +func pgSchemaStatements() []string { + var stmts []string + for _, stmt := range strings.Split(pgSchemaSQL, ";") { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + // Skip pure comment lines + lines := strings.Split(stmt, "\n") + hasCode := false + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" && !strings.HasPrefix(line, "--") { + hasCode = true + break + } + } + if hasCode { + stmts = append(stmts, stmt) + } + } + return stmts } // PgPool wraps a pgx connection pool with reconnection logic @@ -211,7 +154,7 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { // Execute each schema statement individually since pgx prepared // statement mode doesn't support multi-statement execution - for _, stmt := range pgSchemaStatements { + for _, stmt := range pgSchemaStatements() { if _, err := p.pool.Exec(ctx, stmt); err != nil { return fmt.Errorf("create schema: %w", err) } @@ -232,8 +175,21 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { } } else if currentVersion > pgSchemaVersion { return fmt.Errorf("database schema version %d is newer than supported version %d", currentVersion, pgSchemaVersion) + } else if currentVersion < pgSchemaVersion { + // Run migrations + if currentVersion < 2 { + // Migration 1->2: Add model column to review_jobs + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS model TEXT`) + if err != nil { + return fmt.Errorf("migrate to v2 (add model column): %w", err) + } + } + // Update version + _, err = p.pool.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, pgSchemaVersion) + if err != nil { + return fmt.Errorf("update schema version: %w", err) + } } - // Note: migrations for version upgrades would go here return nil } @@ -468,16 +424,17 @@ func (p *PgPool) Tx(ctx context.Context, fn func(tx pgx.Tx) error) error { func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error { _, err := p.pool.Exec(ctx, ` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, agent, reasoning, status, agentic, + uuid, repo_id, commit_id, git_ref, agent, model, reasoning, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW()) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, error = EXCLUDED.error, + model = COALESCE(EXCLUDED.model, review_jobs.model), updated_at = NOW() - `, j.UUID, pgRepoID, pgCommitID, j.GitRef, j.Agent, nullString(j.Reasoning), + `, j.UUID, pgRepoID, pgCommitID, j.GitRef, j.Agent, nullString(j.Model), nullString(j.Reasoning), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, nullString(j.Prompt), j.DiffContent, nullString(j.Error), j.SourceMachineID) return err @@ -520,6 +477,7 @@ type PulledJob struct { CommitTimestamp time.Time GitRef string Agent string + Model string Reasoning string Status string Agentic bool @@ -551,7 +509,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s rows, err := p.pool.Query(ctx, ` SELECT j.uuid, r.identity, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, '1970-01-01'::timestamptz), - j.git_ref, j.agent, COALESCE(j.reasoning, ''), j.status, j.agentic, + j.git_ref, j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), j.status, j.agentic, j.enqueued_at, j.started_at, j.finished_at, COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), j.source_machine_id, j.updated_at, j.id @@ -578,7 +536,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s err := rows.Scan( &j.UUID, &j.RepoIdentity, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &j.CommitTimestamp, - &j.GitRef, &j.Agent, &j.Reasoning, &j.Status, &j.Agentic, + &j.GitRef, &j.Agent, &j.Model, &j.Reasoning, &j.Status, &j.Agentic, &j.EnqueuedAt, &j.StartedAt, &j.FinishedAt, &j.Prompt, &diffContent, &j.Error, &j.SourceMachineID, &j.UpdatedAt, &lastID, diff --git a/internal/storage/postgres_integration_test.go b/internal/storage/postgres_integration_test.go index e6197741..5dfe25fa 100644 --- a/internal/storage/postgres_integration_test.go +++ b/internal/storage/postgres_integration_test.go @@ -84,7 +84,7 @@ func TestIntegration_SyncFullCycle(t *testing.T) { } // Enqueue a job - job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123def456", "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123def456", "test", "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -204,8 +204,8 @@ func TestIntegration_SyncMultipleRepos(t *testing.T) { commit2, _ := db.GetOrCreateCommit(repo2.ID, sameSHA, "Author", "Subject", time.Now()) // Create jobs for both repos and complete them (only terminal jobs are synced) - job1, _ := db.EnqueueJob(repo1.ID, commit1.ID, sameSHA, "test", "") - job2, _ := db.EnqueueJob(repo2.ID, commit2.ID, sameSHA, "test", "") + job1, _ := db.EnqueueJob(repo1.ID, commit1.ID, sameSHA, "test", "", "") + job2, _ := db.EnqueueJob(repo2.ID, commit2.ID, sameSHA, "test", "", "") // Complete both jobs db.Exec(`UPDATE review_jobs SET status = 'running', started_at = datetime('now') WHERE id = ?`, job1.ID) @@ -382,7 +382,7 @@ func TestIntegration_FinalPush(t *testing.T) { // Create many jobs to test batch pushing (only terminal jobs are synced) // Use EnqueueRangeJob since we don't have actual commits in this test for i := 0; i < 150; i++ { - job, err := db.EnqueueRangeJob(repo.ID, "HEAD", "test", "") + job, err := db.EnqueueRangeJob(repo.ID, "HEAD", "test", "", "") if err != nil { t.Fatalf("EnqueueRangeJob %d failed: %v", i, err) } @@ -544,7 +544,7 @@ func TestIntegration_Multiplayer(t *testing.T) { t.Fatalf("Machine A: GetOrCreateCommit failed: %v", err) } - jobA, err := dbA.EnqueueJob(repoA.ID, commitA.ID, "aaaa1111", "test", "") + jobA, err := dbA.EnqueueJob(repoA.ID, commitA.ID, "aaaa1111", "test", "", "") if err != nil { t.Fatalf("Machine A: EnqueueJob failed: %v", err) } @@ -571,7 +571,7 @@ func TestIntegration_Multiplayer(t *testing.T) { t.Fatalf("Machine B: GetOrCreateCommit failed: %v", err) } - jobB, err := dbB.EnqueueJob(repoB.ID, commitB.ID, "bbbb2222", "test", "") + jobB, err := dbB.EnqueueJob(repoB.ID, commitB.ID, "bbbb2222", "test", "", "") if err != nil { t.Fatalf("Machine B: EnqueueJob failed: %v", err) } @@ -775,7 +775,7 @@ func TestIntegration_MultiplayerSameCommit(t *testing.T) { if err != nil { t.Fatalf("Machine A: GetOrCreateCommit failed: %v", err) } - jobA, err := dbA.EnqueueJob(repoA.ID, commitA.ID, sharedCommitSHA, "test", "") + jobA, err := dbA.EnqueueJob(repoA.ID, commitA.ID, sharedCommitSHA, "test", "", "") if err != nil { t.Fatalf("Machine A: EnqueueJob failed: %v", err) } @@ -798,7 +798,7 @@ func TestIntegration_MultiplayerSameCommit(t *testing.T) { if err != nil { t.Fatalf("Machine B: GetOrCreateCommit failed: %v", err) } - jobB, err := dbB.EnqueueJob(repoB.ID, commitB.ID, sharedCommitSHA, "test", "") + jobB, err := dbB.EnqueueJob(repoB.ID, commitB.ID, sharedCommitSHA, "test", "", "") if err != nil { t.Fatalf("Machine B: EnqueueJob failed: %v", err) } @@ -1034,7 +1034,7 @@ func TestIntegration_MultiplayerRealistic(t *testing.T) { if err != nil { return nil, fmt.Errorf("GetOrCreateCommit: %w", err) } - job, err := db.EnqueueJob(repoID, commit.ID, sha, "test", "") + job, err := db.EnqueueJob(repoID, commit.ID, sha, "test", "", "") if err != nil { return nil, fmt.Errorf("EnqueueJob: %w", err) } @@ -1492,7 +1492,7 @@ func TestIntegration_MultiplayerOfflineReconnect(t *testing.T) { if err != nil { t.Fatalf("Machine A: GetOrCreateCommit failed: %v", err) } - jobA1, err := dbA.EnqueueJob(repoA.ID, commitA1.ID, "dddd4444", "test", "") + jobA1, err := dbA.EnqueueJob(repoA.ID, commitA1.ID, "dddd4444", "test", "", "") if err != nil { t.Fatalf("Machine A: EnqueueJob failed: %v", err) } @@ -1533,7 +1533,7 @@ func TestIntegration_MultiplayerOfflineReconnect(t *testing.T) { if err != nil { t.Fatalf("Machine A offline: GetOrCreateCommit failed: %v", err) } - jobA2, err := dbA.EnqueueJob(repoA.ID, commitA2.ID, "eeee5555", "test", "") + jobA2, err := dbA.EnqueueJob(repoA.ID, commitA2.ID, "eeee5555", "test", "", "") if err != nil { t.Fatalf("Machine A offline: EnqueueJob failed: %v", err) } @@ -1548,7 +1548,7 @@ func TestIntegration_MultiplayerOfflineReconnect(t *testing.T) { if err != nil { t.Fatalf("Machine A offline: GetOrCreateCommit failed: %v", err) } - jobA3, err := dbA.EnqueueJob(repoA.ID, commitA3.ID, "ffff6666", "test", "") + jobA3, err := dbA.EnqueueJob(repoA.ID, commitA3.ID, "ffff6666", "test", "", "") if err != nil { t.Fatalf("Machine A offline: EnqueueJob failed: %v", err) } @@ -1698,7 +1698,7 @@ func TestIntegration_SyncNowPushesAllBatches(t *testing.T) { if err != nil { t.Fatalf("Failed to create commit %d: %v", i, err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, fmt.Sprintf("commit%03d", i), "test", "") + job, err := db.EnqueueJob(repo.ID, commit.ID, fmt.Sprintf("commit%03d", i), "test", "", "") if err != nil { t.Fatalf("Failed to enqueue job %d: %v", i, err) } diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index d0e645bf..64766155 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -1,6 +1,7 @@ package storage import ( + _ "embed" "fmt" "os" "strings" @@ -11,6 +12,9 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) +//go:embed schemas/postgres_v1.sql +var postgresV1Schema string + func TestDefaultPgPoolConfig(t *testing.T) { cfg := DefaultPgPoolConfig() @@ -32,22 +36,23 @@ func TestDefaultPgPoolConfig(t *testing.T) { } func TestPgSchemaStatementsContainsRequiredTables(t *testing.T) { - requiredTables := []string{ - "CREATE TABLE IF NOT EXISTS schema_version", - "CREATE TABLE IF NOT EXISTS machines", - "CREATE TABLE IF NOT EXISTS repos", - "CREATE TABLE IF NOT EXISTS commits", - "CREATE TABLE IF NOT EXISTS review_jobs", - "CREATE TABLE IF NOT EXISTS reviews", - "CREATE TABLE IF NOT EXISTS responses", + requiredStatements := []string{ + "CREATE SCHEMA IF NOT EXISTS roborev", + "CREATE TABLE IF NOT EXISTS roborev.schema_version", + "CREATE TABLE IF NOT EXISTS roborev.machines", + "CREATE TABLE IF NOT EXISTS roborev.repos", + "CREATE TABLE IF NOT EXISTS roborev.commits", + "CREATE TABLE IF NOT EXISTS roborev.review_jobs", + "CREATE TABLE IF NOT EXISTS roborev.reviews", + "CREATE TABLE IF NOT EXISTS roborev.responses", } // Join all statements to search across the actual executed schema - allStatements := strings.Join(pgSchemaStatements, "\n") + allStatements := strings.Join(pgSchemaStatements(), "\n") - for _, table := range requiredTables { - if !strings.Contains(allStatements, table) { - t.Errorf("Schema missing: %s", table) + for _, required := range requiredStatements { + if !strings.Contains(allStatements, required) { + t.Errorf("Schema missing: %s", required) } } } @@ -63,7 +68,7 @@ func TestPgSchemaStatementsContainsRequiredIndexes(t *testing.T) { } // Join all statements to search across the actual executed schema - allStatements := strings.Join(pgSchemaStatements, "\n") + allStatements := strings.Join(pgSchemaStatements(), "\n") for _, idx := range requiredIndexes { if !strings.Contains(allStatements, idx) { @@ -1093,7 +1098,7 @@ func TestIntegration_NewDatabaseClearsSyncedAt(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := sqliteDB.EnqueueJob(repo.ID, commit.ID, "test-sha", "test", "thorough") + job, err := sqliteDB.EnqueueJob(repo.ID, commit.ID, "test-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1477,3 +1482,234 @@ func TestIntegration_BatchOperations(t *testing.T) { } }) } + +func TestIntegration_EnsureSchema_MigratesV1ToV2(t *testing.T) { + // This test verifies that a v1 schema (without model column) gets migrated to v2 + connString := getTestPostgresURL(t) + ctx := t.Context() + + // Create a pool without AfterConnect to set up test state + cfg, err := pgxpool.ParseConfig(connString) + if err != nil { + t.Fatalf("Failed to parse config: %v", err) + } + setupPool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + t.Fatalf("Failed to create setup pool: %v", err) + } + + // Drop any existing schema to start fresh - this test needs to verify v1→v2 migration + _, err = setupPool.Exec(ctx, "DROP SCHEMA IF EXISTS roborev CASCADE") + if err != nil { + setupPool.Close() + t.Fatalf("Failed to drop existing schema: %v", err) + } + + // Cleanup after test + t.Cleanup(func() { + cleanupCfg, _ := pgxpool.ParseConfig(connString) + cleanupPool, _ := pgxpool.NewWithConfig(ctx, cleanupCfg) + if cleanupPool != nil { + cleanupPool.Exec(ctx, "DROP SCHEMA IF EXISTS roborev CASCADE") + cleanupPool.Close() + } + }) + + // Load and execute v1 schema from embedded SQL file + // Use same parsing logic as pgSchemaStatements() to handle comments correctly + for _, stmt := range strings.Split(postgresV1Schema, ";") { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + // Skip statements that are only comments + hasCode := false + for _, line := range strings.Split(stmt, "\n") { + line = strings.TrimSpace(line) + if line != "" && !strings.HasPrefix(line, "--") { + hasCode = true + break + } + } + if !hasCode { + continue + } + if _, err := setupPool.Exec(ctx, stmt); err != nil { + setupPool.Close() + t.Fatalf("Failed to execute v1 schema statement: %v\nStatement: %s", err, stmt) + } + } + + // Insert a test job to verify data survives migration + testJobUUID := uuid.NewString() + var repoID int64 + err = setupPool.QueryRow(ctx, ` + INSERT INTO roborev.repos (identity) VALUES ('test-repo-v1-migration') RETURNING id + `).Scan(&repoID) + if err != nil { + setupPool.Close() + t.Fatalf("Failed to insert test repo: %v", err) + } + _, err = setupPool.Exec(ctx, ` + INSERT INTO roborev.review_jobs (uuid, repo_id, git_ref, agent, status, source_machine_id, enqueued_at) + VALUES ($1, $2, 'HEAD', 'test-agent', 'done', '00000000-0000-0000-0000-000000000001', NOW()) + `, testJobUUID, repoID) + if err != nil { + setupPool.Close() + t.Fatalf("Failed to insert test job: %v", err) + } + + setupPool.Close() + + // Now connect with the normal pool and run EnsureSchema - should migrate v1→v2 + pool, err := NewPgPool(ctx, connString, DefaultPgPoolConfig()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer pool.Close() + + err = pool.EnsureSchema(ctx) + if err != nil { + t.Fatalf("EnsureSchema failed: %v", err) + } + + // Verify schema version advanced to 2 + var version int + err = pool.pool.QueryRow(ctx, `SELECT MAX(version) FROM schema_version`).Scan(&version) + if err != nil { + t.Fatalf("Failed to query schema version: %v", err) + } + if version != pgSchemaVersion { + t.Errorf("Expected schema version %d, got %d", pgSchemaVersion, version) + } + + // Verify model column was added + var hasModelColumn bool + err = pool.pool.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'roborev' AND table_name = 'review_jobs' AND column_name = 'model' + ) + `).Scan(&hasModelColumn) + if err != nil { + t.Fatalf("Failed to check for model column: %v", err) + } + if !hasModelColumn { + t.Error("Expected model column to exist after v1→v2 migration") + } + + // Verify pre-existing job survived migration with model=NULL + var jobAgent string + var jobModel *string + err = pool.pool.QueryRow(ctx, `SELECT agent, model FROM review_jobs WHERE uuid = $1`, testJobUUID).Scan(&jobAgent, &jobModel) + if err != nil { + t.Fatalf("Failed to query test job after migration: %v", err) + } + if jobAgent != "test-agent" { + t.Errorf("Expected agent 'test-agent', got %q", jobAgent) + } + if jobModel != nil { + t.Errorf("Expected model to be NULL for pre-migration job, got %q", *jobModel) + } +} + +func TestIntegration_UpsertJob_BackfillsModel(t *testing.T) { + // This test verifies that upserting a job with a model value backfills + // an existing job that has NULL model (COALESCE behavior) + connString := getTestPostgresURL(t) + ctx := t.Context() + + pool, err := NewPgPool(ctx, connString, DefaultPgPoolConfig()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer pool.Close() + + if err := pool.EnsureSchema(ctx); err != nil { + t.Fatalf("EnsureSchema failed: %v", err) + } + + // Create test data + machineID := uuid.NewString() + jobUUID := uuid.NewString() + repoIdentity := "test-repo-backfill-" + time.Now().Format("20060102150405") + + defer func() { + // Cleanup + pool.pool.Exec(ctx, `DELETE FROM review_jobs WHERE uuid = $1`, jobUUID) + pool.pool.Exec(ctx, `DELETE FROM repos WHERE identity = $1`, repoIdentity) + pool.pool.Exec(ctx, `DELETE FROM machines WHERE machine_id = $1`, machineID) + }() + + // Register machine and create repo + if err := pool.RegisterMachine(ctx, machineID, "test"); err != nil { + t.Fatalf("RegisterMachine failed: %v", err) + } + repoID, err := pool.GetOrCreateRepo(ctx, repoIdentity) + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + + // Insert job with NULL model directly + _, err = pool.pool.Exec(ctx, ` + INSERT INTO review_jobs (uuid, repo_id, git_ref, agent, status, source_machine_id, enqueued_at, created_at, updated_at) + VALUES ($1, $2, 'HEAD', 'test-agent', 'done', $3, NOW(), NOW(), NOW()) + `, jobUUID, repoID, machineID) + if err != nil { + t.Fatalf("Failed to insert job with NULL model: %v", err) + } + + // Verify model is NULL + var modelBefore *string + err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelBefore) + if err != nil { + t.Fatalf("Failed to query model before: %v", err) + } + if modelBefore != nil { + t.Fatalf("Expected model to be NULL before upsert, got %q", *modelBefore) + } + + // Upsert with a model value - should backfill + job := SyncableJob{ + UUID: jobUUID, + RepoIdentity: repoIdentity, + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", // Now providing a model + Status: "done", + SourceMachineID: machineID, + EnqueuedAt: time.Now(), + } + err = pool.UpsertJob(ctx, job, repoID, nil) + if err != nil { + t.Fatalf("UpsertJob failed: %v", err) + } + + // Verify model was backfilled + var modelAfter *string + err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelAfter) + if err != nil { + t.Fatalf("Failed to query model after: %v", err) + } + if modelAfter == nil { + t.Error("Expected model to be backfilled, but it's still NULL") + } else if *modelAfter != "gpt-4" { + t.Errorf("Expected model 'gpt-4', got %q", *modelAfter) + } + + // Also verify that upserting with empty model doesn't clear existing model + job.Model = "" // Empty model + err = pool.UpsertJob(ctx, job, repoID, nil) + if err != nil { + t.Fatalf("UpsertJob (empty model) failed: %v", err) + } + + var modelPreserved *string + err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelPreserved) + if err != nil { + t.Fatalf("Failed to query model preserved: %v", err) + } + if modelPreserved == nil || *modelPreserved != "gpt-4" { + t.Errorf("Expected model to be preserved as 'gpt-4' when upserting with empty model, got %v", modelPreserved) + } +} diff --git a/internal/storage/repos_test.go b/internal/storage/repos_test.go index 16613f10..7e1b3c10 100644 --- a/internal/storage/repos_test.go +++ b/internal/storage/repos_test.go @@ -18,7 +18,7 @@ func TestEnqueuePromptJob(t *testing.T) { t.Run("creates job with custom prompt", func(t *testing.T) { customPrompt := "Explain the architecture of this codebase" - job, err := db.EnqueuePromptJob(repo.ID, "claude-code", "thorough", customPrompt, false) + job, err := db.EnqueuePromptJob(repo.ID, "claude-code", "", "thorough", customPrompt, false) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -41,7 +41,7 @@ func TestEnqueuePromptJob(t *testing.T) { }) t.Run("defaults reasoning to thorough", func(t *testing.T) { - job, err := db.EnqueuePromptJob(repo.ID, "codex", "", "test prompt", false) + job, err := db.EnqueuePromptJob(repo.ID, "codex", "", "", "test prompt", false) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -63,7 +63,7 @@ func TestEnqueuePromptJob(t *testing.T) { } customPrompt := "Find security issues in the codebase" - _, err := db.EnqueuePromptJob(repo.ID, "claude-code", "standard", customPrompt, false) + _, err := db.EnqueuePromptJob(repo.ID, "claude-code", "", "standard", customPrompt, false) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -95,7 +95,7 @@ func TestEnqueuePromptJob(t *testing.T) { } // Enqueue with agentic=true - job, err := db.EnqueuePromptJob(repo.ID, "claude-code", "thorough", "Test agentic prompt", true) + job, err := db.EnqueuePromptJob(repo.ID, "claude-code", "", "thorough", "Test agentic prompt", true) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -138,7 +138,7 @@ func TestEnqueuePromptJob(t *testing.T) { } // Enqueue with agentic=false - job, err := db.EnqueuePromptJob(repo.ID, "codex", "standard", "Non-agentic prompt", false) + job, err := db.EnqueuePromptJob(repo.ID, "codex", "", "standard", "Non-agentic prompt", false) if err != nil { t.Fatalf("EnqueuePromptJob failed: %v", err) } @@ -379,13 +379,13 @@ func TestGetRepoStats(t *testing.T) { // Add some jobs commit1, _ := db.GetOrCreateCommit(repo.ID, "stats-sha1", "A", "S", time.Now()) - job1, _ := db.EnqueueJob(repo.ID, commit1.ID, "stats-sha1", "codex", "") + job1, _ := db.EnqueueJob(repo.ID, commit1.ID, "stats-sha1", "codex", "", "") commit2, _ := db.GetOrCreateCommit(repo.ID, "stats-sha2", "A", "S", time.Now()) - job2, _ := db.EnqueueJob(repo.ID, commit2.ID, "stats-sha2", "codex", "") + job2, _ := db.EnqueueJob(repo.ID, commit2.ID, "stats-sha2", "codex", "", "") commit3, _ := db.GetOrCreateCommit(repo.ID, "stats-sha3", "A", "S", time.Now()) - job3, _ := db.EnqueueJob(repo.ID, commit3.ID, "stats-sha3", "codex", "") + job3, _ := db.EnqueueJob(repo.ID, commit3.ID, "stats-sha3", "codex", "", "") // Complete job1 with PASS verdict db.ClaimJob("worker-1") @@ -436,12 +436,12 @@ func TestGetRepoStats(t *testing.T) { // Create a regular job with PASS verdict commit, _ := db.GetOrCreateCommit(repo.ID, "stats-prompt-sha1", "A", "S", time.Now()) - job1, _ := db.EnqueueJob(repo.ID, commit.ID, "stats-prompt-sha1", "codex", "") + job1, _ := db.EnqueueJob(repo.ID, commit.ID, "stats-prompt-sha1", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job1.ID, "codex", "prompt", "**Verdict: PASS**\nLooks good!") // Create a prompt job with output that contains verdict-like text - promptJob, _ := db.EnqueuePromptJob(repo.ID, "codex", "thorough", "Test prompt", false) + promptJob, _ := db.EnqueuePromptJob(repo.ID, "codex", "", "thorough", "Test prompt", false) db.ClaimJob("worker-1") // This has FAIL verdict text but should NOT count toward failed reviews db.CompleteJob(promptJob.ID, "codex", "prompt", "**Verdict: FAIL**\nSome issues found") @@ -495,7 +495,7 @@ func TestDeleteRepo(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/delete-with-jobs") commit, _ := db.GetOrCreateCommit(repo.ID, "delete-sha", "A", "S", time.Now()) - db.EnqueueJob(repo.ID, commit.ID, "delete-sha", "codex", "") + db.EnqueueJob(repo.ID, commit.ID, "delete-sha", "codex", "", "") // Without cascade, delete should return ErrRepoHasJobs err := db.DeleteRepo(repo.ID, false) @@ -519,7 +519,7 @@ func TestDeleteRepo(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/delete-cascade") commit, _ := db.GetOrCreateCommit(repo.ID, "cascade-sha", "A", "S", time.Now()) - job, _ := db.EnqueueJob(repo.ID, commit.ID, "cascade-sha", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "cascade-sha", "codex", "", "") db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "prompt", "output") @@ -570,13 +570,13 @@ func TestMergeRepos(t *testing.T) { // Create jobs in source commit1, _ := db.GetOrCreateCommit(source.ID, "merge-sha1", "A", "S", time.Now()) - db.EnqueueJob(source.ID, commit1.ID, "merge-sha1", "codex", "") + db.EnqueueJob(source.ID, commit1.ID, "merge-sha1", "codex", "", "") commit2, _ := db.GetOrCreateCommit(source.ID, "merge-sha2", "A", "S", time.Now()) - db.EnqueueJob(source.ID, commit2.ID, "merge-sha2", "codex", "") + db.EnqueueJob(source.ID, commit2.ID, "merge-sha2", "codex", "", "") // Create job in target commit3, _ := db.GetOrCreateCommit(target.ID, "merge-sha3", "A", "S", time.Now()) - db.EnqueueJob(target.ID, commit3.ID, "merge-sha3", "codex", "") + db.EnqueueJob(target.ID, commit3.ID, "merge-sha3", "codex", "", "") moved, err := db.MergeRepos(source.ID, target.ID) if err != nil { @@ -652,8 +652,8 @@ func TestMergeRepos(t *testing.T) { // Create commits in source commit1, _ := db.GetOrCreateCommit(source.ID, "commit-sha-1", "Author", "Subject 1", time.Now()) commit2, _ := db.GetOrCreateCommit(source.ID, "commit-sha-2", "Author", "Subject 2", time.Now()) - db.EnqueueJob(source.ID, commit1.ID, "commit-sha-1", "codex", "") - db.EnqueueJob(source.ID, commit2.ID, "commit-sha-2", "codex", "") + db.EnqueueJob(source.ID, commit1.ID, "commit-sha-1", "codex", "", "") + db.EnqueueJob(source.ID, commit2.ID, "commit-sha-2", "codex", "", "") // Verify commits belong to source before merge var sourceCommitCount int @@ -690,8 +690,8 @@ func TestDeleteRepoCascadeDeletesCommits(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/delete-commits-test") commit1, _ := db.GetOrCreateCommit(repo.ID, "del-commit-1", "A", "S1", time.Now()) commit2, _ := db.GetOrCreateCommit(repo.ID, "del-commit-2", "A", "S2", time.Now()) - db.EnqueueJob(repo.ID, commit1.ID, "del-commit-1", "codex", "") - db.EnqueueJob(repo.ID, commit2.ID, "del-commit-2", "codex", "") + db.EnqueueJob(repo.ID, commit1.ID, "del-commit-1", "codex", "", "") + db.EnqueueJob(repo.ID, commit2.ID, "del-commit-2", "codex", "", "") // Verify commits exist before delete var beforeCount int @@ -754,7 +754,7 @@ func TestVerdictSuppressionForPromptJobs(t *testing.T) { repo, _ := db.GetOrCreateRepo("/tmp/verdict-prompt-test") // Create a prompt job and complete it with output containing verdict-like text - promptJob, _ := db.EnqueuePromptJob(repo.ID, "codex", "thorough", "Test prompt", false) + promptJob, _ := db.EnqueuePromptJob(repo.ID, "codex", "", "thorough", "Test prompt", false) db.ClaimJob("worker-1") // Output that would normally be parsed as FAIL db.CompleteJob(promptJob.ID, "codex", "prompt", "Found issues:\n1. Problem A") @@ -786,7 +786,7 @@ func TestVerdictSuppressionForPromptJobs(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-sha", "Author", "Subject", time.Now()) // Create a regular job and complete it - job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-sha", "codex", "") + job, _ := db.EnqueueJob(repo.ID, commit.ID, "verdict-sha", "codex", "", "") db.ClaimJob("worker-1") // Output that should be parsed as PASS db.CompleteJob(job.ID, "codex", "prompt", "No issues found in this commit.") diff --git a/internal/storage/schemas/postgres_v1.sql b/internal/storage/schemas/postgres_v1.sql new file mode 100644 index 00000000..84bd76bb --- /dev/null +++ b/internal/storage/schemas/postgres_v1.sql @@ -0,0 +1,97 @@ +-- PostgreSQL schema version 1 +-- This is the original schema before the model column was added to review_jobs. +-- Used for migration testing. +-- +-- Note: This is the Postgres SYNC schema, which only stores completed jobs +-- (status IN 'done', 'failed', 'canceled'). The local SQLite database has +-- additional statuses ('queued', 'running') that are not synced to Postgres. + +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +INSERT INTO roborev.schema_version (version) VALUES (1); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + agent TEXT NOT NULL, + -- NOTE: no model column in v1 + reasoning TEXT, + status TEXT NOT NULL CHECK(status IN ('done', 'failed', 'canceled')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + output TEXT NOT NULL, + addressed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/schemas/postgres_v2.sql b/internal/storage/schemas/postgres_v2.sql new file mode 100644 index 00000000..418eed32 --- /dev/null +++ b/internal/storage/schemas/postgres_v2.sql @@ -0,0 +1,91 @@ +-- PostgreSQL schema version 2 +-- Added model column to review_jobs for specifying which model an agent should use. +-- Note: Version is managed by EnsureSchema(), not this file. + +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + agent TEXT NOT NULL, + model TEXT, + reasoning TEXT, + status TEXT NOT NULL CHECK(status IN ('done', 'failed', 'canceled')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + output TEXT NOT NULL, + addressed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/sync.go b/internal/storage/sync.go index 7a9e4012..5d8d8f8b 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -230,6 +230,7 @@ type SyncableJob struct { CommitTimestamp time.Time GitRef string Agent string + Model string Reasoning string Status string Agentic bool @@ -250,7 +251,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) SELECT j.id, j.uuid, j.repo_id, COALESCE(r.identity, ''), j.commit_id, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, ''), - j.git_ref, j.agent, COALESCE(j.reasoning, ''), j.status, j.agentic, + j.git_ref, j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), j.status, j.agentic, j.enqueued_at, COALESCE(j.started_at, ''), COALESCE(j.finished_at, ''), COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), j.source_machine_id, j.updated_at @@ -285,7 +286,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) err := rows.Scan( &j.ID, &j.UUID, &j.RepoID, &j.RepoIdentity, &commitID, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &commitTimestamp, - &j.GitRef, &j.Agent, &j.Reasoning, &j.Status, &j.Agentic, + &j.GitRef, &j.Agent, &j.Model, &j.Reasoning, &j.Status, &j.Agentic, &enqueuedAt, &startedAt, &finishedAt, &j.Prompt, &diffContent, &j.Error, &j.SourceMachineID, &updatedAt, @@ -529,17 +530,18 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error now := time.Now().UTC().Format(time.RFC3339) _, err := db.Exec(` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, agent, reasoning, status, agentic, + uuid, repo_id, commit_id, git_ref, agent, model, reasoning, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, source_machine_id, updated_at, synced_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(uuid) DO UPDATE SET status = excluded.status, finished_at = excluded.finished_at, error = excluded.error, + model = COALESCE(excluded.model, review_jobs.model), updated_at = excluded.updated_at, synced_at = ? - `, j.UUID, repoID, commitID, j.GitRef, j.Agent, j.Reasoning, // reasoning can be empty string, not NULL + `, j.UUID, repoID, commitID, j.GitRef, j.Agent, nullStr(j.Model), j.Reasoning, // reasoning can be empty string, not NULL j.Status, j.Agentic, j.EnqueuedAt.Format(time.RFC3339), nullTimeStr(j.StartedAt), nullTimeStr(j.FinishedAt), nullStr(j.Prompt), j.DiffContent, nullStr(j.Error), diff --git a/internal/storage/sync_test.go b/internal/storage/sync_test.go index 03bf5e3c..1c66f83f 100644 --- a/internal/storage/sync_test.go +++ b/internal/storage/sync_test.go @@ -126,7 +126,7 @@ func TestBackfillSourceMachineID(t *testing.T) { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test", "thorough") + job, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -514,11 +514,11 @@ func TestGetKnownJobUUIDs(t *testing.T) { } // Create two jobs with UUIDs - job1, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test", "thorough") + job1, err := db.EnqueueJob(repo.ID, commit.ID, "abc123", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } - job2, err := db.EnqueueJob(repo.ID, commit.ID, "def456", "test", "quick") + job2, err := db.EnqueueJob(repo.ID, commit.ID, "def456", "test", "", "quick") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1015,7 +1015,7 @@ func TestGetJobsToSync_TimestampComparison(t *testing.T) { } // Create a job and complete it - job, err := db.EnqueueJob(repo.ID, commit.ID, "sync-test-sha", "test", "thorough") + job, err := db.EnqueueJob(repo.ID, commit.ID, "sync-test-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1092,7 +1092,7 @@ func TestGetJobsToSync_TimestampComparison(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job2, err := db.EnqueueJob(repo.ID, commit2.ID, "mixed-format-sha", "test", "thorough") + job2, err := db.EnqueueJob(repo.ID, commit2.ID, "mixed-format-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1178,7 +1178,7 @@ func TestGetJobsToSync_TimestampComparison(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job3, err := tzDB.EnqueueJob(tzRepo.ID, commit3.ID, "tz-test-sha", "test", "thorough") + job3, err := tzDB.EnqueueJob(tzRepo.ID, commit3.ID, "tz-test-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1255,7 +1255,7 @@ func TestGetReviewsToSync_TimestampComparison(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "review-sync-sha", "test", "thorough") + job, err := db.EnqueueJob(repo.ID, commit.ID, "review-sync-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1410,7 +1410,7 @@ func TestGetReviewsToSync_TimestampComparison(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - tzJob, err := tzDB.EnqueueJob(tzRepo.ID, tzCommit.ID, "tz-review-sha", "test", "thorough") + tzJob, err := tzDB.EnqueueJob(tzRepo.ID, tzCommit.ID, "tz-review-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1502,7 +1502,7 @@ func TestGetCommentsToSync_LegacyCommentsExcluded(t *testing.T) { } // Create a job-based response (should be synced) - job, err := db.EnqueueJob(repo.ID, commit.ID, "legacy-resp-sha", "test", "thorough") + job, err := db.EnqueueJob(repo.ID, commit.ID, "legacy-resp-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1617,7 +1617,7 @@ func TestUpsertPulledResponse_WithParentJob(t *testing.T) { if err != nil { t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, "parent-job-sha", "test", "thorough") + job, err := db.EnqueueJob(repo.ID, commit.ID, "parent-job-sha", "test", "", "thorough") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -1913,7 +1913,7 @@ func (h *syncTestHelper) createCompletedJob(sha string) *ReviewJob { if err != nil { h.t.Fatalf("Failed to create commit: %v", err) } - job, err := h.db.EnqueueJob(h.repo.ID, commit.ID, sha, "test", "thorough") + job, err := h.db.EnqueueJob(h.repo.ID, commit.ID, sha, "test", "", "thorough") if err != nil { h.t.Fatalf("Failed to enqueue job: %v", err) } @@ -2157,3 +2157,86 @@ func TestSyncOrder_FullWorkflow(t *testing.T) { t.Errorf("Expected 3 responses to sync, got %d", len(responses)) } } + +func TestUpsertPulledJob_BackfillsModel(t *testing.T) { + // This test verifies that upserting a pulled job with a model value backfills + // an existing job that has NULL model (COALESCE behavior in SQLite) + dbPath := filepath.Join(t.TempDir(), "test.db") + db, err := Open(dbPath) + if err != nil { + t.Fatalf("Failed to open database: %v", err) + } + defer db.Close() + + // Create a repo + repo, err := db.GetOrCreateRepo("/test/repo") + if err != nil { + t.Fatalf("GetOrCreateRepo failed: %v", err) + } + + // Insert a job with NULL model using EnqueueJob (which sets model to empty string by default) + // We need to directly insert with NULL model to test the COALESCE behavior + jobUUID := "test-uuid-backfill-" + time.Now().Format("20060102150405") + _, err = db.Exec(` + INSERT INTO review_jobs (uuid, repo_id, git_ref, agent, status, enqueued_at) + VALUES (?, ?, 'HEAD', 'test-agent', 'done', datetime('now')) + `, jobUUID, repo.ID) + if err != nil { + t.Fatalf("Failed to insert job with NULL model: %v", err) + } + + // Verify model is NULL + var modelBefore sql.NullString + err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelBefore) + if err != nil { + t.Fatalf("Failed to query model before: %v", err) + } + if modelBefore.Valid { + t.Fatalf("Expected model to be NULL before upsert, got %q", modelBefore.String) + } + + // Upsert with a model value - should backfill + pulledJob := PulledJob{ + UUID: jobUUID, + RepoIdentity: "/test/repo", + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", // Now providing a model + Status: "done", + SourceMachineID: "test-machine", + EnqueuedAt: time.Now(), + UpdatedAt: time.Now(), + } + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + if err != nil { + t.Fatalf("UpsertPulledJob failed: %v", err) + } + + // Verify model was backfilled + var modelAfter sql.NullString + err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelAfter) + if err != nil { + t.Fatalf("Failed to query model after: %v", err) + } + if !modelAfter.Valid { + t.Error("Expected model to be backfilled, but it's still NULL") + } else if modelAfter.String != "gpt-4" { + t.Errorf("Expected model 'gpt-4', got %q", modelAfter.String) + } + + // Also verify that upserting with empty model doesn't clear existing model + pulledJob.Model = "" // Empty model + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + if err != nil { + t.Fatalf("UpsertPulledJob (empty model) failed: %v", err) + } + + var modelPreserved sql.NullString + err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelPreserved) + if err != nil { + t.Fatalf("Failed to query model preserved: %v", err) + } + if !modelPreserved.Valid || modelPreserved.String != "gpt-4" { + t.Errorf("Expected model to be preserved as 'gpt-4' when upserting with empty model, got %v", modelPreserved) + } +} diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 52406752..9f319bb5 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -81,7 +81,7 @@ func CreateTestJobs(t *testing.T, db *storage.DB, repo *storage.Repo, count int, t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, sha, agent, "") + job, err := db.EnqueueJob(repo.ID, commit.ID, sha, agent, "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) } @@ -101,7 +101,7 @@ func CreateTestJobWithSHA(t *testing.T, db *storage.DB, repo *storage.Repo, sha, t.Fatalf("GetOrCreateCommit failed: %v", err) } - job, err := db.EnqueueJob(repo.ID, commit.ID, sha, agent, "") + job, err := db.EnqueueJob(repo.ID, commit.ID, sha, agent, "", "") if err != nil { t.Fatalf("EnqueueJob failed: %v", err) }