Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions cortex_on/TRACING_IMPLEMENTATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# CortexON Tracing System Implementation

## Overview

Successfully implemented a lightweight execution tracing and structured logging system for the CortexON multi-agent workflow system. The implementation meets all specified requirements and integrates cleanly with the existing FastAPI + PydanticAI architecture.

## ✅ Requirements Met

### 1. Unique Request Tracking
- ✅ Every user request generates a unique `run_id` via FastAPI middleware
- ✅ `run_id` is propagated across all async boundaries using `contextvars`
- ✅ `X-Run-ID` header is automatically added to HTTP responses

### 2. Step-Level Tracing
- ✅ Every agent action/tool execution generates a unique `step_id`
- ✅ Step lifecycle managed via helper functions and context managers
- ✅ Automatic step start/end with success/error tracking

### 3. Structured Logging
- ✅ JSON-friendly structured logs with automatic trace metadata injection
- ✅ Logs include `run_id`, `step_id`, `agent_name`, `tool_name`
- ✅ Compatible with Pydantic Logfire
- ✅ Graceful fallback when Logfire is not available

### 4. Async-Safe Context Propagation
- ✅ Uses `contextvars.ContextVar` for proper async context propagation
- ✅ Context automatically propagates across async function calls
- ✅ No context leakage between concurrent requests

### 5. Clean Integration
- ✅ Integrates with existing FastAPI + PydanticAI code
- ✅ No breaking changes to existing APIs
- ✅ Minimal code changes required for integration

### 6. Configuration & Performance
- ✅ Environment-based configuration (`TRACING_ENABLED`, `LOG_FORMAT`)
- ✅ Can be disabled with near-zero overhead
- ✅ No persistence required (logs only)

## 📁 File Structure

```
cortex_on/tracing/
├── __init__.py # Main exports and public API
├── README.md # Comprehensive documentation
├── context.py # TraceContext and contextvars management
├── middleware.py # FastAPI middleware for run_id generation
├── steps.py # Step lifecycle helper functions
├── logging.py # Structured logging with trace metadata
├── decorators.py # Automatic tracing decorators
└── config.py # Environment-based configuration

cortex_on/tests/
└── test_tracing.py # Comprehensive test suite

cortex_on/examples/
└── tracing_demo.py # Complete demonstration script
```

## 🔧 Integration Points

### FastAPI Application (`main.py`)
```python
from tracing import TracingMiddleware
app.add_middleware(TracingMiddleware)
```

### Orchestrator Agent (`agents/orchestrator_agent.py`)
```python
from tracing import trace_agent_tool, log as trace_log

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="plan_task")
async def plan_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
trace_log.info("Planning task", task=task)
# ... existing logic
```

### System Instructor (`instructor.py`)
```python
from tracing import start_step, end_step, log as trace_log

async def run(self, task: str, websocket: WebSocket) -> List[Dict[str, Any]]:
start_step("SystemInstructor", "orchestrate")
trace_log.info("Starting orchestration", task=task)
# ... existing logic
```

## 🎯 Key Features

### 1. TraceContext
- Immutable context object with `run_id`, `step_id`, `agent_name`, `tool_name`
- Factory methods for creating run and step contexts
- Serializable to dictionary for logging

### 2. Automatic Middleware
- Creates unique `run_id` per HTTP request
- Sets trace context for async propagation
- Adds `X-Run-ID` to response headers
- Can be disabled via configuration

### 3. Step Lifecycle Management
```python
# Manual management
start_step("AgentName", "tool_name")
end_step("success")

# Context manager (recommended)
async with trace_step("AgentName", "tool_name"):
# Your logic here
pass

# Decorator (automatic)
@trace_agent_tool(agent_name="MyAgent", tool_name="my_tool")
async def my_tool():
pass
```

### 4. Structured Logging
```python
from tracing import log

# Automatically includes trace metadata
log.info("Processing request", user_id=123, action="login")
# Output: {"level": "INFO", "message": "Processing request",
# "run_id": "...", "step_id": "...", "agent_name": "...",
# "user_id": 123, "action": "login"}
```

### 5. Configuration
```bash
# Environment variables
export TRACING_ENABLED=true
export LOG_FORMAT=json
export TRACING_INCLUDE_SENSITIVE=false
export TRACING_MAX_STEP_DEPTH=10
```

## 🧪 Testing

Comprehensive test suite covering:
- TraceContext functionality
- Context propagation across async boundaries
- FastAPI middleware integration
- Step lifecycle management
- Structured logging
- Decorator functionality
- Error handling
- End-to-end integration

## 📊 Example Log Output

With tracing enabled, logs automatically include trace metadata:

```json
{
"level": "INFO",
"message": "Planning task",
"run_id": "550e8400-e29b-41d4-a716-446655440000",
"step_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
"agent_name": "Orchestrator",
"tool_name": "plan_task",
"task": "Create a new user account"
}
```

## 🚀 Usage Examples

### Basic HTTP Request Tracing
```bash
curl -X GET http://localhost:8000/agent/chat?task="Hello"
# Response includes: X-Run-ID: 550e8400-e29b-41d4-a716-446655440000
```

### Agent Tool Tracing
```python
@trace_agent_tool(agent_name="DataProcessor", tool_name="validate_data")
async def validate_data(data):
log.info("Validating data", size=len(data))
# Validation logic
return is_valid
```

### Multi-Step Workflow
```python
async with trace_step("Orchestrator", "plan_workflow"):
plan = await create_plan(task)

for step in plan.steps:
async with trace_step("Executor", f"execute_{step.name}"):
result = await execute_step(step)
log.info("Step completed", step=step.name, result=result)
```

## 🎉 Benefits Achieved

1. **Complete Request Visibility**: Every user request can be traced from start to finish
2. **Agent Action Tracking**: Every agent tool execution is automatically logged
3. **Structured Observability**: Rich, searchable logs with consistent metadata
4. **Zero Breaking Changes**: Existing code continues to work unchanged
5. **Performance Conscious**: Can be disabled with minimal overhead
6. **Developer Friendly**: Simple APIs with comprehensive documentation
7. **Production Ready**: Robust error handling and configuration options

## 🔄 Next Steps

The tracing system is fully implemented and ready for use. To enable:

1. Set `TRACING_ENABLED=true` in environment
2. Optionally set `LOG_FORMAT=json` for structured logs
3. The system will automatically start tracing all requests and agent actions

For additional agent integration, simply add the `@trace_agent_tool` decorator to agent tools and use the `log` object for structured logging.

## 📝 Documentation

- **README.md**: Comprehensive user guide with examples
- **API Documentation**: Inline docstrings throughout codebase
- **Test Suite**: Demonstrates usage patterns and validates functionality
- **Demo Script**: Complete working examples of all features

The implementation is complete, tested, and ready for production use!



10 changes: 9 additions & 1 deletion cortex_on/agents/orchestrator_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from agents.planner_agent import planner_agent, update_todo_status
from agents.code_agent import coder_agent, CoderAgentDeps
from utils.ant_client import get_client
from tracing import trace_agent_tool, log as trace_log

@dataclass
class orchestrator_deps:
Expand Down Expand Up @@ -171,10 +172,12 @@ class orchestrator_deps:
)

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="plan_task")
async def plan_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
"""Plans the task and assigns it to the appropriate agents"""
try:
logfire.info(f"Planning task: {task}")
trace_log.info("Planning task", task=task)

# Create a new StreamResponse for Planner Agent
planner_stream_output = StreamResponse(
Expand Down Expand Up @@ -228,10 +231,12 @@ async def plan_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
return f"Failed to plan task: {error_msg}"

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="coder_task")
async def coder_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
"""Assigns coding tasks to the coder agent"""
try:
logfire.info(f"Assigning coding task: {task}")
trace_log.info("Assigning coding task", task=task)

# Create a new StreamResponse for Coder Agent
coder_stream_output = StreamResponse(
Expand Down Expand Up @@ -286,6 +291,7 @@ async def coder_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
return f"Failed to assign coding task: {error_msg}"

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="web_surfer_task")
async def web_surfer_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
"""Assigns web surfing tasks to the web surfer agent"""
try:
Expand Down Expand Up @@ -347,6 +353,7 @@ async def web_surfer_task(ctx: RunContext[orchestrator_deps], task: str) -> str:
return f"Failed to assign web surfing task: {error_msg}"

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="ask_human")
async def ask_human(ctx: RunContext[orchestrator_deps], question: str) -> str:
"""Sends a question to the frontend and waits for human input"""
try:
Expand Down Expand Up @@ -394,6 +401,7 @@ async def ask_human(ctx: RunContext[orchestrator_deps], question: str) -> str:
return f"Failed to get human input: {error_msg}"

@orchestrator_agent.tool
@trace_agent_tool(agent_name="Orchestrator", tool_name="planner_agent_update")
async def planner_agent_update(ctx: RunContext[orchestrator_deps], completed_task: str) -> str:
"""
Updates the todo.md file to mark a task as completed and returns the full updated plan.
Expand Down Expand Up @@ -484,7 +492,7 @@ async def planner_agent_update(ctx: RunContext[orchestrator_deps], completed_tas
logfire.error(error_msg, exc_info=True)

planner_stream_output.steps.append(f"Plan update failed: {str(e)}")
planner_stream_output.status_code = a500
planner_stream_output.status_code = 500
await _safe_websocket_send(ctx.deps.websocket, planner_stream_output)

return f"Failed to update the plan: {error_msg}"
Expand Down
Loading