diff --git a/apps/inference/src/claudebench_inference/main.py b/apps/inference/src/claudebench_inference/main.py index 75ab330..5551f37 100644 --- a/apps/inference/src/claudebench_inference/main.py +++ b/apps/inference/src/claudebench_inference/main.py @@ -159,17 +159,25 @@ async def decompose_task(request: DecompositionRequest): logger.info(f"Decomposition request for session {request.sessionId}: {request.task[:50]}...") try: + # Extract working directory from context if provided + working_directory = None + if request.context and hasattr(request.context, 'workingDirectory'): + working_directory = request.context.workingDirectory + if working_directory: + logger.info(f"Using working directory for decomposition: {working_directory}") + # Build the prompt prompt = prompt_builder.build_decomposition_prompt( task=request.task, context=request.context ) - # Perform sampling + # Perform sampling with working directory result = await sampling_engine.sample_json( prompt=prompt, max_tokens=8192, - temperature=0.7 + temperature=0.7, + working_directory=working_directory ) # Validate and return response @@ -222,6 +230,13 @@ async def generate_context(request: ContextRequest): logger.info(f"Context generation for subtask {request.subtaskId}") try: + # Extract working directory from subtask metadata if provided + working_directory = None + if request.subtask and isinstance(request.subtask, dict): + working_directory = request.subtask.get('workingDirectory') + if working_directory: + logger.info(f"Using working directory: {working_directory}") + # Build the prompt prompt = prompt_builder.build_context_prompt( subtaskId=request.subtaskId, @@ -229,11 +244,12 @@ async def generate_context(request: ContextRequest): subtask=request.subtask ) - # Perform sampling + # Perform sampling with working directory result = await sampling_engine.sample_json( prompt=prompt, max_tokens=16384, - temperature=0.5 # Lower temperature for more focused context + temperature=0.5, # Lower temperature for more focused context + working_directory=working_directory ) # Validate and return response diff --git a/apps/inference/src/claudebench_inference/models.py b/apps/inference/src/claudebench_inference/models.py index 0f80dcb..5490a0e 100644 --- a/apps/inference/src/claudebench_inference/models.py +++ b/apps/inference/src/claudebench_inference/models.py @@ -52,6 +52,7 @@ class DecompositionContext(BaseModel): specialists: List[Specialist] priority: int = Field(ge=0, le=100) constraints: Optional[List[str]] = Field(default_factory=list) + workingDirectory: Optional[str] = None # Working directory for codebase exploration class DecompositionRequest(BaseModel): diff --git a/apps/inference/src/claudebench_inference/sampling.py b/apps/inference/src/claudebench_inference/sampling.py index 6e772ca..577d49f 100644 --- a/apps/inference/src/claudebench_inference/sampling.py +++ b/apps/inference/src/claudebench_inference/sampling.py @@ -50,7 +50,8 @@ async def sample( max_tokens: int = 2000, # Note: not directly used by SDK temperature: float = 0.7, # Note: not directly used by SDK system_prompt: Optional[str] = None, - max_turns: int = 50 # Allow extensive exploration + max_turns: int = 50, # Allow extensive exploration + working_directory: Optional[str] = None # Working directory for the context ) -> str: """ Perform sampling using claude-code-sdk @@ -61,6 +62,7 @@ async def sample( temperature: Sampling temperature (not used by SDK) system_prompt: Override the default system prompt max_turns: Maximum number of turns for tool usage + working_directory: Working directory to run the SDK in Returns: The response text from Claude @@ -71,11 +73,16 @@ async def sample( try: self.stats["total_requests"] += 1 + # Log the working directory if provided + if working_directory: + logger.info(f"Using working directory: {working_directory}") + options = ClaudeCodeOptions( max_turns=max_turns, # Allow multiple turns for exploration system_prompt=system_prompt or self.default_system_prompt, allowed_tools=self.allowed_tools, - permission_mode='bypassPermissions' # Allow tool use without prompting + permission_mode='bypassPermissions', # Allow tool use without prompting + cwd=working_directory # Set the working directory for the SDK ) response_text = "" @@ -133,7 +140,8 @@ async def sample_json( max_tokens: int = 2000, temperature: float = 0.7, system_prompt: Optional[str] = None, - max_turns: int = 50 # Allow extensive exploration + max_turns: int = 50, # Allow extensive exploration + working_directory: Optional[str] = None ) -> Dict[str, Any]: """ Sample and parse JSON response @@ -144,6 +152,7 @@ async def sample_json( temperature: Sampling temperature system_prompt: Override the default system prompt max_turns: Maximum number of turns for tool usage + working_directory: Working directory to run the SDK in Returns: Parsed JSON response as dictionary @@ -151,7 +160,7 @@ async def sample_json( Raises: Exception: If sampling or parsing fails """ - response = await self.sample(prompt, max_tokens, temperature, system_prompt, max_turns) + response = await self.sample(prompt, max_tokens, temperature, system_prompt, max_turns, working_directory) return self.extract_json(response) def get_stats(self) -> Dict[str, Any]: diff --git a/apps/server/src/core/lua-scripts.ts b/apps/server/src/core/lua-scripts.ts index bfa9678..f7df8c1 100644 --- a/apps/server/src/core/lua-scripts.ts +++ b/apps/server/src/core/lua-scripts.ts @@ -842,6 +842,7 @@ local instance_id = ARGV[1] local roles_json = ARGV[2] local timestamp = ARGV[3] local ttl = tonumber(ARGV[4]) +local metadata_json = ARGV[5] or '{}' -- Register instance redis.call('hset', instance_key, @@ -849,7 +850,8 @@ redis.call('hset', instance_key, 'roles', roles_json, 'health', 'healthy', 'status', 'ACTIVE', - 'lastSeen', timestamp + 'lastSeen', timestamp, + 'metadata', metadata_json ) redis.call('expire', instance_key, ttl) diff --git a/apps/server/src/core/redis-scripts.ts b/apps/server/src/core/redis-scripts.ts index fea0ed0..00e1529 100644 --- a/apps/server/src/core/redis-scripts.ts +++ b/apps/server/src/core/redis-scripts.ts @@ -354,7 +354,8 @@ export class RedisScriptExecutor { async registerInstance( instanceId: string, roles: string[], - ttl: number + ttl: number, + metadata?: any ): Promise<{ success: boolean; becameLeader: boolean }> { const result = await this.redis.stream.eval( scripts.INSTANCE_REGISTER, @@ -364,7 +365,8 @@ export class RedisScriptExecutor { instanceId, JSON.stringify(roles), Date.now().toString(), - ttl.toString() + ttl.toString(), + JSON.stringify(metadata || {}) ) as [number, number]; return { diff --git a/apps/server/src/core/sampling.ts b/apps/server/src/core/sampling.ts index 46ea0ec..2e6d0b1 100644 --- a/apps/server/src/core/sampling.ts +++ b/apps/server/src/core/sampling.ts @@ -16,6 +16,7 @@ export interface DecompositionContext { }>; priority: number; constraints?: string[]; + workingDirectory?: string; } export interface Decomposition { diff --git a/apps/server/src/handlers/swarm/swarm.decompose.handler.ts b/apps/server/src/handlers/swarm/swarm.decompose.handler.ts index 3644705..c99f798 100644 --- a/apps/server/src/handlers/swarm/swarm.decompose.handler.ts +++ b/apps/server/src/handlers/swarm/swarm.decompose.handler.ts @@ -3,6 +3,7 @@ import type { EventContext } from "@/core/context"; import { swarmDecomposeInput, swarmDecomposeOutput } from "@/schemas/swarm.schema"; import type { SwarmDecomposeInput, SwarmDecomposeOutput } from "@/schemas/swarm.schema"; import { redisScripts } from "@/core/redis-scripts"; +import { getRedis } from "@/core/redis"; import { getSamplingService } from "@/core/sampling"; import { registry } from "@/core/registry"; @@ -82,6 +83,23 @@ export class SwarmDecomposeHandler { throw new Error("No session ID available for sampling"); } + // Get worker's working directory from instance metadata + const redis = getRedis(); + let workingDirectory: string | undefined; + if (ctx.instanceId) { + const instanceKey = `cb:instance:${ctx.instanceId}`; + const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata'); + if (instanceMetadata) { + try { + const metadata = JSON.parse(instanceMetadata); + workingDirectory = metadata.workingDirectory; + console.log(`[SwarmDecompose] Using working directory from instance ${ctx.instanceId}: ${workingDirectory}`); + } catch (e) { + console.warn(`[SwarmDecompose] Failed to parse instance metadata:`, e); + } + } + } + // Request decomposition via sampling const decomposition = await samplingService.requestDecomposition( sessionId, @@ -89,7 +107,8 @@ export class SwarmDecomposeHandler { { specialists, priority: input.priority, - constraints: input.constraints + constraints: input.constraints, + workingDirectory } ); diff --git a/apps/server/src/handlers/system/system.register.handler.ts b/apps/server/src/handlers/system/system.register.handler.ts index d3f6fc7..a84ec70 100644 --- a/apps/server/src/handlers/system/system.register.handler.ts +++ b/apps/server/src/handlers/system/system.register.handler.ts @@ -33,7 +33,8 @@ export class SystemRegisterHandler { const result = await redisScripts.registerInstance( input.id, input.roles, - ttl + ttl, + input.metadata ); console.log(`[SystemRegister] Registration result for ${input.id}:`, result); diff --git a/apps/server/src/handlers/task/task.context.handler.ts b/apps/server/src/handlers/task/task.context.handler.ts index 27c0a61..abf3685 100644 --- a/apps/server/src/handlers/task/task.context.handler.ts +++ b/apps/server/src/handlers/task/task.context.handler.ts @@ -148,7 +148,37 @@ export class TaskContextHandler { url: att.url || null })); - // Prepare task info for context generation + // Generate context via sampling service + const samplingService = getSamplingService(); + const sessionId = ctx.metadata?.sessionId || ctx.metadata?.clientId || ctx.instanceId; + + if (!sessionId) { + throw new Error("No session ID available for sampling"); + } + + // Get worker's working directory from instance metadata + let workingDirectory: string | undefined; + // First check if a specific worker was requested in the input metadata + const requestedWorkerId = input.metadata?.workerId; + const workerId = requestedWorkerId || ctx.instanceId; + + if (workerId) { + const instanceKey = `cb:instance:${workerId}`; + const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata'); + if (instanceMetadata) { + try { + const metadata = JSON.parse(instanceMetadata); + workingDirectory = metadata.workingDirectory; + console.log(`[TaskContext] Using working directory from instance ${workerId}: ${workingDirectory}`); + } catch (e) { + console.warn(`[TaskContext] Failed to parse instance metadata for ${workerId}:`, e); + } + } else if (requestedWorkerId) { + console.warn(`[TaskContext] Requested worker ${requestedWorkerId} not found or has no metadata`); + } + } + + // Prepare task info for context generation - include workingDirectory const taskInfo = { id: input.taskId, description: input.customDescription || taskData.text || "No description", @@ -160,17 +190,10 @@ export class TaskContextHandler { constraints: input.constraints || [], requirements: input.requirements || [], existingFiles: input.existingFiles || [], - additionalContext: input.additionalContext || "" + additionalContext: input.additionalContext || "", + workingDirectory: workingDirectory // Now properly typed in the object }; - // Generate context via sampling service - const samplingService = getSamplingService(); - const sessionId = ctx.metadata?.sessionId || ctx.metadata?.clientId || ctx.instanceId; - - if (!sessionId) { - throw new Error("No session ID available for sampling"); - } - // Call the context generation endpoint with task info const response = await samplingService.generateContext( sessionId, diff --git a/apps/server/src/handlers/task/task.create_project.handler.ts b/apps/server/src/handlers/task/task.create_project.handler.ts index 9e39853..f9b3be6 100644 --- a/apps/server/src/handlers/task/task.create_project.handler.ts +++ b/apps/server/src/handlers/task/task.create_project.handler.ts @@ -178,7 +178,8 @@ export class TaskCreateProjectHandler { sessionId: sessionId, metadata: { projectId: projectId, - source: "task.create_project" + source: "task.create_project", + workerId: input.metadata?.workerId // Pass through the workerId } }, ctx.metadata?.clientId); diff --git a/apps/server/src/handlers/task/task.decompose.handler.ts b/apps/server/src/handlers/task/task.decompose.handler.ts index 8bc3007..65111bf 100644 --- a/apps/server/src/handlers/task/task.decompose.handler.ts +++ b/apps/server/src/handlers/task/task.decompose.handler.ts @@ -115,6 +115,22 @@ export class TaskDecomposeHandler { throw new Error("No session ID available for sampling"); } + // Get worker's working directory from instance metadata + let workingDirectory: string | undefined; + if (ctx.instanceId) { + const instanceKey = `cb:instance:${ctx.instanceId}`; + const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata'); + if (instanceMetadata) { + try { + const metadata = JSON.parse(instanceMetadata); + workingDirectory = metadata.workingDirectory; + console.log(`[TaskDecompose] Using working directory from instance ${ctx.instanceId}: ${workingDirectory}`); + } catch (e) { + console.warn(`[TaskDecompose] Failed to parse instance metadata:`, e); + } + } + } + // Request decomposition via sampling const decomposition = await samplingService.requestDecomposition( sessionId, @@ -122,7 +138,8 @@ export class TaskDecomposeHandler { { specialists, priority: input.priority, - constraints: input.constraints + constraints: input.constraints, + workingDirectory } ); diff --git a/apps/server/src/schemas/system.schema.ts b/apps/server/src/schemas/system.schema.ts index 2ce3176..92317fc 100644 --- a/apps/server/src/schemas/system.schema.ts +++ b/apps/server/src/schemas/system.schema.ts @@ -18,6 +18,9 @@ export const systemHealthOutput = z.object({ export const systemRegisterInput = z.object({ id: z.string().min(1), roles: z.array(z.string()), + metadata: z.object({ + workingDirectory: z.string().optional(), + }).optional(), }); export const systemRegisterOutput = z.object({ diff --git a/apps/web/src/components/ContextGenerationDialog.tsx b/apps/web/src/components/ContextGenerationDialog.tsx index 1ecc86f..bc686c9 100644 --- a/apps/web/src/components/ContextGenerationDialog.tsx +++ b/apps/web/src/components/ContextGenerationDialog.tsx @@ -1,4 +1,4 @@ -import { useState } from "react"; +import { useState, useEffect } from "react"; import { Dialog, DialogContent, @@ -33,8 +33,10 @@ import { Zap, CheckCircle, XCircle, + User, + FolderOpen, } from "lucide-react"; -import { useGenerateContext } from "@/services/event-client"; +import { useGenerateContext, useSystemState } from "@/services/event-client"; interface Task { id: string; @@ -57,6 +59,7 @@ export function ContextGenerationDialog({ onSuccess, }: ContextGenerationDialogProps) { const [specialist, setSpecialist] = useState("general"); + const [selectedWorker, setSelectedWorker] = useState(""); const [customDescription, setCustomDescription] = useState(""); const [constraints, setConstraints] = useState(""); const [requirements, setRequirements] = useState(""); @@ -67,6 +70,21 @@ export function ContextGenerationDialog({ const [error, setError] = useState(null); const generateContextMutation = useGenerateContext(); + const { data: systemState } = useSystemState(); + + // Extract active workers from system state + const activeWorkers = systemState?.instances?.filter((instance: any) => + instance.roles?.includes("worker") || + instance.roles?.includes("relay") || + instance.id?.startsWith("worker-") + ) || []; + + // Set default worker when available + useEffect(() => { + if (activeWorkers.length > 0 && !selectedWorker) { + setSelectedWorker(activeWorkers[0].id); + } + }, [activeWorkers, selectedWorker]); const handleGenerateContext = async () => { if (!task) return; @@ -89,6 +107,9 @@ export function ContextGenerationDialog({ requirements: requirementsList, existingFiles: filesList, additionalContext: additionalContext || undefined, + metadata: { + workerId: selectedWorker || undefined, + }, }); setGeneratedContext(result); @@ -333,45 +354,91 @@ export function ContextGenerationDialog({ -
- - +
+
+ + +
+ +
+ + + {selectedWorker && ( +

+ Context will be generated using this worker's environment +

+ )} +
diff --git a/apps/web/src/components/InstanceManager.tsx b/apps/web/src/components/InstanceManager.tsx index e014ff5..dc03002 100644 --- a/apps/web/src/components/InstanceManager.tsx +++ b/apps/web/src/components/InstanceManager.tsx @@ -43,6 +43,7 @@ import { Server, Zap, Heart, + FolderOpen, } from "lucide-react"; import { useEventMutation, useEventQuery } from "@/services/event-client"; import { formatDistanceToNow } from "date-fns"; @@ -54,6 +55,8 @@ interface Instance { health?: string; lastSeen?: string; taskCount?: number; + workingDirectory?: string; + metadata?: any; } interface InstanceManagerProps { @@ -112,6 +115,21 @@ export function InstanceManager({ onInstancesChange, className }: InstanceManage } } } + + // Extract working directory from metadata + let workingDirectory: string | undefined; + let metadata = inst.metadata; + if (metadata) { + if (typeof metadata === 'string') { + try { + metadata = JSON.parse(metadata); + } catch { + // Keep as is if not valid JSON + } + } + workingDirectory = metadata?.workingDirectory; + } + return { id: inst.id || inst.instanceId, roles, @@ -119,6 +137,8 @@ export function InstanceManager({ onInstancesChange, className }: InstanceManage health: inst.health || "healthy", lastSeen: inst.lastSeen || inst.lastHeartbeat || new Date().toISOString(), taskCount: inst.taskCount || 0, + workingDirectory, + metadata, }; }); setInstances(instanceList); @@ -444,6 +464,14 @@ export function InstanceManager({ onInstancesChange, className }: InstanceManage
)}
+ {instance.workingDirectory && ( +
+ + + {instance.workingDirectory} + +
+ )}
+ {/* Worker Selection */} +
+ + +

+ The project will be created and decomposed using this worker's environment. +

+
+ {/* Constraints */}