diff --git a/CHANGELOG.md b/CHANGELOG.md
index 66849da2..ef9d16e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+# 7.6.0 - 2026-01-15
+
+feat(ai): Add OpenAI Agents SDK integration
+
+Automatic tracing for agent workflows, handoffs, tool calls, guardrails, and custom spans. Includes `$ai_total_tokens`, `$ai_error_type` categorization, and `$ai_framework` property.
+
 # 7.5.1 - 2026-01-07
 
 fix: avoid return from finally block to fix Python 3.14 SyntaxWarning (#361) - thanks @jodal
diff --git a/posthog/ai/openai_agents/__init__.py b/posthog/ai/openai_agents/__init__.py
new file mode 100644
index 00000000..49e4186e
--- /dev/null
+++ b/posthog/ai/openai_agents/__init__.py
@@ -0,0 +1,67 @@
+try:
+    import agents  # noqa: F401
+except ImportError:
+    raise ModuleNotFoundError(
+        "Please install the OpenAI Agents SDK to use this feature: 'pip install openai-agents'"
+    )
+
+from posthog.ai.openai_agents.processor import PostHogTracingProcessor
+
+__all__ = ["PostHogTracingProcessor", "instrument"]
+
+
+def instrument(
+    client=None,
+    distinct_id=None,
+    privacy_mode: bool = False,
+    groups=None,
+    properties=None,
+):
+    """
+    One-liner to instrument the OpenAI Agents SDK with PostHog tracing.
+
+    This registers a PostHogTracingProcessor with the OpenAI Agents SDK,
+    automatically capturing traces, spans, and LLM generations.
+
+    Args:
+        client: Optional PostHog client instance. If not provided, uses the default client.
+        distinct_id: Optional distinct ID to associate with all traces.
+            Can also be a callable that takes a trace and returns a distinct ID.
+        privacy_mode: If True, redacts input/output content from events.
+        groups: Optional PostHog groups to associate with events.
+        properties: Optional additional properties to include with all events.
+
+    Returns:
+        PostHogTracingProcessor: The registered processor instance.
+
+    Example:
+        ```python
+        from posthog.ai.openai_agents import instrument
+
+        # Simple setup
+        instrument(distinct_id="user@example.com")
+
+        # With custom properties
+        instrument(
+            distinct_id="user@example.com",
+            privacy_mode=True,
+            properties={"environment": "production"}
+        )
+
+        # Now run agents as normal - traces automatically sent to PostHog
+        from agents import Agent, Runner
+        agent = Agent(name="Assistant", instructions="You are helpful.")
+        result = Runner.run_sync(agent, "Hello!")
+        ```
+    """
+    from agents.tracing import add_trace_processor
+
+    processor = PostHogTracingProcessor(
+        client=client,
+        distinct_id=distinct_id,
+        privacy_mode=privacy_mode,
+        groups=groups,
+        properties=properties,
+    )
+    add_trace_processor(processor)
+    return processor
diff --git a/posthog/ai/openai_agents/processor.py b/posthog/ai/openai_agents/processor.py
new file mode 100644
index 00000000..07385d2c
--- /dev/null
+++ b/posthog/ai/openai_agents/processor.py
@@ -0,0 +1,746 @@
+import json
+import logging
+import time
+from datetime import datetime
+from typing import Any, Callable, Dict, Optional, Union
+
+from agents.tracing import Span, Trace
+from agents.tracing.processor_interface import TracingProcessor
+from agents.tracing.span_data import (
+    AgentSpanData,
+    CustomSpanData,
+    FunctionSpanData,
+    GenerationSpanData,
+    GuardrailSpanData,
+    HandoffSpanData,
+    MCPListToolsSpanData,
+    ResponseSpanData,
+    SpeechGroupSpanData,
+    SpeechSpanData,
+    TranscriptionSpanData,
+)
+
+from posthog import setup
+from posthog.client import Client
+
+log = logging.getLogger("posthog")
+
+
+def _safe_json(obj: Any) -> Any:
+    """Safely convert object to JSON-serializable format."""
+    if obj is None:
+        return None
+    try:
+        json.dumps(obj)
+        return obj
+    except (TypeError, ValueError):
+        return str(obj)
+
+
+def _parse_iso_timestamp(iso_str: Optional[str]) -> Optional[float]:
+    """Parse ISO timestamp to Unix timestamp."""
+    if not iso_str:
+        return None
+    try:
+        dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
+        return dt.timestamp()
+    except (ValueError, AttributeError):
+        return None
+
+
+class PostHogTracingProcessor(TracingProcessor):
+    """
+    A tracing processor that sends OpenAI Agents SDK traces to PostHog.
+
+    This processor implements the TracingProcessor interface from the OpenAI Agents SDK
+    and maps agent traces, spans, and generations to PostHog's LLM analytics events.
+
+    Example:
+        ```python
+        from agents import Agent, Runner
+        from agents.tracing import add_trace_processor
+        from posthog.ai.openai_agents import PostHogTracingProcessor
+
+        # Create and register the processor
+        processor = PostHogTracingProcessor(
+            distinct_id="user@example.com",
+            privacy_mode=False,
+        )
+        add_trace_processor(processor)
+
+        # Run agents as normal - traces automatically sent to PostHog
+        agent = Agent(name="Assistant", instructions="You are helpful.")
+        result = Runner.run_sync(agent, "Hello!")
+        ```
+    """
+
+    def __init__(
+        self,
+        client: Optional[Client] = None,
+        distinct_id: Optional[Union[str, Callable[[Trace], Optional[str]]]] = None,
+        privacy_mode: bool = False,
+        groups: Optional[Dict[str, Any]] = None,
+        properties: Optional[Dict[str, Any]] = None,
+    ):
+        """
+        Initialize the PostHog tracing processor.
+
+        Args:
+            client: Optional PostHog client instance. If not provided, uses the default client.
+            distinct_id: Either a string distinct ID or a callable that takes a Trace
+                and returns a distinct ID. If not provided, uses the trace_id.
+            privacy_mode: If True, redacts input/output content from events.
+            groups: Optional PostHog groups to associate with all events.
+            properties: Optional additional properties to include with all events.
+        """
+        self._client = client or setup()
+        self._distinct_id = distinct_id
+        self._privacy_mode = privacy_mode
+        self._groups = groups or {}
+        self._properties = properties or {}
+
+        # Track span start times for latency calculation
+        self._span_start_times: Dict[str, float] = {}
+
+        # Track trace metadata for associating with spans
+        self._trace_metadata: Dict[str, Dict[str, Any]] = {}
+
+    def _get_distinct_id(self, trace: Optional[Trace]) -> str:
+        """Resolve the distinct ID for a trace."""
+        if callable(self._distinct_id):
+            if trace:
+                result = self._distinct_id(trace)
+                if result:
+                    return str(result)
+            return trace.trace_id if trace else "unknown"
+        elif self._distinct_id:
+            return str(self._distinct_id)
+        elif trace:
+            return trace.trace_id
+        return "unknown"
+
+    def _with_privacy_mode(self, value: Any) -> Any:
+        """Apply privacy mode redaction if enabled."""
+        if self._privacy_mode or (
+            hasattr(self._client, "privacy_mode") and self._client.privacy_mode
+        ):
+            return None
+        return value
+
+    def _get_group_id(self, trace_id: str) -> Optional[str]:
+        """Get the group_id for a trace from stored metadata."""
+        if trace_id in self._trace_metadata:
+            return self._trace_metadata[trace_id].get("group_id")
+        return None
+
+    def _capture_event(
+        self,
+        event: str,
+        properties: Dict[str, Any],
+        distinct_id: Optional[str] = None,
+    ) -> None:
+        """Capture an event to PostHog with error handling."""
+        try:
+            if not hasattr(self._client, "capture") or not callable(self._client.capture):
+                return
+
+            final_distinct_id = distinct_id or "unknown"
+            final_properties = {
+                **properties,
+                **self._properties,
+            }
+
+            # Don't process person profile if no distinct_id
+            if distinct_id is None:
+                final_properties["$process_person_profile"] = False
+
+            self._client.capture(
+                distinct_id=final_distinct_id,
+                event=event,
+                properties=final_properties,
+                groups=self._groups,
+            )
+        except Exception as e:
+            log.debug(f"Failed to capture PostHog event: {e}")
+
+    def on_trace_start(self, trace: Trace) -> None:
+        """Called when a new trace begins."""
+        try:
+            trace_id = trace.trace_id
+            trace_name = trace.name
+            group_id = getattr(trace, "group_id", None)
+            metadata = getattr(trace, "metadata", None)
+
+            # Store trace metadata for later (used by spans)
+            self._trace_metadata[trace_id] = {
+                "name": trace_name,
+                "group_id": group_id,
+                "metadata": metadata,
+            }
+
+            distinct_id = self._get_distinct_id(trace)
+
+            properties = {
+                "$ai_trace_id": trace_id,
+                "$ai_trace_name": trace_name,
+                "$ai_provider": "openai",
+                "$ai_framework": "openai-agents",
+            }
+
+            # Include group_id for linking related traces (e.g., conversation threads)
+            if group_id:
+                properties["$ai_group_id"] = group_id
+
+            # Include trace metadata if present
+            if metadata:
+                properties["$ai_trace_metadata"] = _safe_json(metadata)
+
+            self._capture_event(
+                event="$ai_trace",
+                distinct_id=distinct_id,
+                properties=properties,
+            )
+        except Exception as e:
+            log.debug(f"Error in on_trace_start: {e}")
+
+    def on_trace_end(self, trace: Trace) -> None:
+        """Called when a trace completes."""
+        try:
+            trace_id = trace.trace_id
+
+            # Clean up stored metadata
+            self._trace_metadata.pop(trace_id, None)
+        except Exception as e:
+            log.debug(f"Error in on_trace_end: {e}")
+
+    def on_span_start(self, span: Span[Any]) -> None:
+        """Called when a new span begins."""
+        try:
+            span_id = span.span_id
+            self._span_start_times[span_id] = time.time()
+        except Exception as e:
+            log.debug(f"Error in on_span_start: {e}")
+
+    def on_span_end(self, span: Span[Any]) -> None:
+        """Called when a span completes."""
+        try:
+            span_id = span.span_id
+            trace_id = span.trace_id
+            parent_id = span.parent_id
+            span_data = span.span_data
+
+            # Calculate latency
+            start_time = self._span_start_times.pop(span_id, None)
+            if start_time:
+                latency = time.time() - start_time
+            else:
+                # Fall back to parsing timestamps
+                started = _parse_iso_timestamp(span.started_at)
+                ended = _parse_iso_timestamp(span.ended_at)
+                latency = (ended - started) if (started and ended) else 0
+
+            # Get distinct ID from trace metadata or default
+            distinct_id = self._get_distinct_id(None)
+
+            # Get group_id from trace metadata for linking
+            group_id = self._get_group_id(trace_id)
+
+            # Get error info if present
+            error_info = span.error
+            error_properties = {}
+            if error_info:
+                error_message = error_info.get("message", str(error_info))
+                error_type_raw = error_info.get("type", "")
+
+                # Categorize error type for cross-provider filtering/alerting
+                error_type = "unknown"
+                if "ModelBehaviorError" in error_type_raw or "ModelBehaviorError" in error_message:
+                    error_type = "model_behavior_error"
+                elif "UserError" in error_type_raw or "UserError" in error_message:
+                    error_type = "user_error"
+                elif "InputGuardrailTripwireTriggered" in error_message:
+                    error_type = "input_guardrail_triggered"
+                elif "OutputGuardrailTripwireTriggered" in error_message:
+                    error_type = "output_guardrail_triggered"
+                elif "MaxTurnsExceeded" in error_message:
+                    error_type = "max_turns_exceeded"
+
+                error_properties = {
+                    "$ai_is_error": True,
+                    "$ai_error": error_message,
+                    "$ai_error_type": error_type,
+                }
+
+            # Dispatch based on span data type
+            if isinstance(span_data, GenerationSpanData):
+                self._handle_generation_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, FunctionSpanData):
+                self._handle_function_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, AgentSpanData):
+                self._handle_agent_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, HandoffSpanData):
+                self._handle_handoff_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, GuardrailSpanData):
+                self._handle_guardrail_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, ResponseSpanData):
+                self._handle_response_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, CustomSpanData):
+                self._handle_custom_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, (TranscriptionSpanData, SpeechSpanData, SpeechGroupSpanData)):
+                self._handle_audio_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            elif isinstance(span_data, MCPListToolsSpanData):
+                self._handle_mcp_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+            else:
+                # Unknown span type - capture as generic span
+                self._handle_generic_span(
+                    span_data, trace_id, span_id, parent_id, latency, distinct_id, group_id, error_properties
+                )
+
+        except Exception as e:
+            log.debug(f"Error in on_span_end: {e}")
+
+    def _handle_generation_span(
+        self,
+        span_data: GenerationSpanData,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle LLM generation spans - maps to $ai_generation event."""
+        # Extract token usage
+        usage = span_data.usage or {}
+        input_tokens = usage.get("input_tokens") or usage.get("prompt_tokens", 0)
+        output_tokens = usage.get("output_tokens") or usage.get("completion_tokens", 0)
+
+        # Extract model config parameters
+        model_config = span_data.model_config or {}
+        model_params = {}
+        for param in ["temperature", "max_tokens", "top_p", "frequency_penalty", "presence_penalty"]:
+            if param in model_config:
+                model_params[param] = model_config[param]
+
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_model": span_data.model,
+            "$ai_model_parameters": model_params if model_params else None,
+            "$ai_input": self._with_privacy_mode(_safe_json(span_data.input)),
+            "$ai_output_choices": self._with_privacy_mode(_safe_json(span_data.output)),
+            "$ai_input_tokens": input_tokens,
+            "$ai_output_tokens": output_tokens,
+            "$ai_total_tokens": input_tokens + output_tokens,
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        # Add optional token fields if present
+        if usage.get("reasoning_tokens"):
+            properties["$ai_reasoning_tokens"] = usage["reasoning_tokens"]
+        if usage.get("cache_read_input_tokens"):
+            properties["$ai_cache_read_input_tokens"] = usage["cache_read_input_tokens"]
+        if usage.get("cache_creation_input_tokens"):
+            properties["$ai_cache_creation_input_tokens"] = usage["cache_creation_input_tokens"]
+
+        self._capture_event("$ai_generation", properties, distinct_id)
+
+    def _handle_function_span(
+        self,
+        span_data: FunctionSpanData,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle function/tool call spans - maps to $ai_span event."""
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_span_name": span_data.name,
+            "$ai_span_type": "tool",
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_input_state": self._with_privacy_mode(_safe_json(span_data.input)),
+            "$ai_output_state": self._with_privacy_mode(_safe_json(span_data.output)),
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        # Add MCP data if present
+        if span_data.mcp_data:
+            properties["$ai_mcp_data"] = _safe_json(span_data.mcp_data)
+
+        self._capture_event("$ai_span", properties, distinct_id)
+
+    def _handle_agent_span(
+        self,
+        span_data: AgentSpanData,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle agent execution spans - maps to $ai_span event."""
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
"$ai_span_name": span_data.name, + "$ai_span_type": "agent", + "$ai_provider": "openai", + "$ai_framework": "openai-agents", + "$ai_latency": latency, + **error_properties, + } + + # Include group_id for linking related traces + if group_id: + properties["$ai_group_id"] = group_id + + # Add agent-specific metadata + if span_data.handoffs: + properties["$ai_agent_handoffs"] = span_data.handoffs + if span_data.tools: + properties["$ai_agent_tools"] = span_data.tools + if span_data.output_type: + properties["$ai_agent_output_type"] = span_data.output_type + + self._capture_event("$ai_span", properties, distinct_id) + + def _handle_handoff_span( + self, + span_data: HandoffSpanData, + trace_id: str, + span_id: str, + parent_id: Optional[str], + latency: float, + distinct_id: str, + group_id: Optional[str], + error_properties: Dict[str, Any], + ) -> None: + """Handle agent handoff spans - maps to $ai_span event.""" + properties = { + "$ai_trace_id": trace_id, + "$ai_span_id": span_id, + "$ai_parent_id": parent_id, + "$ai_span_name": f"{span_data.from_agent} -> {span_data.to_agent}", + "$ai_span_type": "handoff", + "$ai_provider": "openai", + "$ai_framework": "openai-agents", + "$ai_handoff_from_agent": span_data.from_agent, + "$ai_handoff_to_agent": span_data.to_agent, + "$ai_latency": latency, + **error_properties, + } + + # Include group_id for linking related traces + if group_id: + properties["$ai_group_id"] = group_id + + self._capture_event("$ai_span", properties, distinct_id) + + def _handle_guardrail_span( + self, + span_data: GuardrailSpanData, + trace_id: str, + span_id: str, + parent_id: Optional[str], + latency: float, + distinct_id: str, + group_id: Optional[str], + error_properties: Dict[str, Any], + ) -> None: + """Handle guardrail execution spans - maps to $ai_span event.""" + properties = { + "$ai_trace_id": trace_id, + "$ai_span_id": span_id, + "$ai_parent_id": parent_id, + "$ai_span_name": span_data.name, + "$ai_span_type": "guardrail", + "$ai_provider": "openai", + "$ai_framework": "openai-agents", + "$ai_guardrail_triggered": span_data.triggered, + "$ai_latency": latency, + **error_properties, + } + + # Include group_id for linking related traces + if group_id: + properties["$ai_group_id"] = group_id + + self._capture_event("$ai_span", properties, distinct_id) + + def _handle_response_span( + self, + span_data: ResponseSpanData, + trace_id: str, + span_id: str, + parent_id: Optional[str], + latency: float, + distinct_id: str, + group_id: Optional[str], + error_properties: Dict[str, Any], + ) -> None: + """Handle OpenAI Response API spans - maps to $ai_generation event.""" + response = span_data.response + response_id = response.id if response else None + + # Try to extract usage from response + usage = getattr(response, "usage", None) if response else None + input_tokens = 0 + output_tokens = 0 + if usage: + input_tokens = getattr(usage, "input_tokens", 0) or 0 + output_tokens = getattr(usage, "output_tokens", 0) or 0 + + # Try to extract model from response + model = getattr(response, "model", None) if response else None + + properties = { + "$ai_trace_id": trace_id, + "$ai_span_id": span_id, + "$ai_parent_id": parent_id, + "$ai_provider": "openai", + "$ai_framework": "openai-agents", + "$ai_model": model, + "$ai_response_id": response_id, + "$ai_input": self._with_privacy_mode(_safe_json(span_data.input)), + "$ai_input_tokens": input_tokens, + "$ai_output_tokens": output_tokens, + "$ai_total_tokens": input_tokens + output_tokens, + "$ai_latency": latency, + 
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        # Extract output content from response
+        if response:
+            output_items = getattr(response, "output", None)
+            if output_items:
+                properties["$ai_output_choices"] = self._with_privacy_mode(_safe_json(output_items))
+
+        self._capture_event("$ai_generation", properties, distinct_id)
+
+    def _handle_custom_span(
+        self,
+        span_data: CustomSpanData,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle custom user-defined spans - maps to $ai_span event."""
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_span_name": span_data.name,
+            "$ai_span_type": "custom",
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_custom_data": self._with_privacy_mode(_safe_json(span_data.data)),
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        self._capture_event("$ai_span", properties, distinct_id)
+
+    def _handle_audio_span(
+        self,
+        span_data: Union[TranscriptionSpanData, SpeechSpanData, SpeechGroupSpanData],
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle audio-related spans (transcription, speech) - maps to $ai_span event."""
+        span_type = span_data.type  # "transcription", "speech", or "speech_group"
+
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_span_name": span_type,
+            "$ai_span_type": span_type,
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        # Add model info if available
+        if hasattr(span_data, "model") and span_data.model:
+            properties["$ai_model"] = span_data.model
+
+        # Add model config if available (pass-through property)
+        if hasattr(span_data, "model_config") and span_data.model_config:
+            properties["model_config"] = _safe_json(span_data.model_config)
+
+        # Add time to first audio byte for speech spans (pass-through property)
+        if hasattr(span_data, "first_content_at") and span_data.first_content_at:
+            properties["first_content_at"] = span_data.first_content_at
+
+        # Add audio format info (pass-through properties)
+        if hasattr(span_data, "input_format"):
+            properties["audio_input_format"] = span_data.input_format
+        if hasattr(span_data, "output_format"):
+            properties["audio_output_format"] = span_data.output_format
+
+        # Add text input for TTS
+        if hasattr(span_data, "input") and span_data.input and isinstance(span_data.input, str):
+            properties["$ai_input"] = self._with_privacy_mode(span_data.input)
+
+        # Don't include audio data (base64) - just metadata
+        if hasattr(span_data, "output") and isinstance(span_data.output, str):
+            # For transcription, output is the text
+            properties["$ai_output_state"] = self._with_privacy_mode(span_data.output)
+
+        self._capture_event("$ai_span", properties, distinct_id)
+
+    def _handle_mcp_span(
+        self,
+        span_data: MCPListToolsSpanData,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle MCP (Model Context Protocol) spans - maps to $ai_span event."""
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_span_name": f"mcp:{span_data.server}",
+            "$ai_span_type": "mcp_tools",
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_mcp_server": span_data.server,
+            "$ai_mcp_tools": span_data.result,
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        self._capture_event("$ai_span", properties, distinct_id)
+
+    def _handle_generic_span(
+        self,
+        span_data: Any,
+        trace_id: str,
+        span_id: str,
+        parent_id: Optional[str],
+        latency: float,
+        distinct_id: str,
+        group_id: Optional[str],
+        error_properties: Dict[str, Any],
+    ) -> None:
+        """Handle unknown span types - maps to $ai_span event."""
+        span_type = getattr(span_data, "type", "unknown")
+
+        properties = {
+            "$ai_trace_id": trace_id,
+            "$ai_span_id": span_id,
+            "$ai_parent_id": parent_id,
+            "$ai_span_name": span_type,
+            "$ai_span_type": span_type,
+            "$ai_provider": "openai",
+            "$ai_framework": "openai-agents",
+            "$ai_latency": latency,
+            **error_properties,
+        }
+
+        # Include group_id for linking related traces
+        if group_id:
+            properties["$ai_group_id"] = group_id
+
+        # Try to export span data
+        if hasattr(span_data, "export"):
+            try:
+                exported = span_data.export()
+                properties["$ai_span_data"] = _safe_json(exported)
+            except Exception:
+                pass
+
+        self._capture_event("$ai_span", properties, distinct_id)
+
+    def shutdown(self) -> None:
+        """Clean up resources when the application stops."""
+        try:
+            self._span_start_times.clear()
+            self._trace_metadata.clear()
+
+            # Flush the PostHog client if possible
+            if hasattr(self._client, "flush") and callable(self._client.flush):
+                self._client.flush()
+        except Exception as e:
+            log.debug(f"Error in shutdown: {e}")
+
+    def force_flush(self) -> None:
+        """Force immediate processing of any queued events."""
+        try:
+            if hasattr(self._client, "flush") and callable(self._client.flush):
+                self._client.flush()
+        except Exception as e:
+            log.debug(f"Error in force_flush: {e}")
diff --git a/posthog/test/ai/openai_agents/__init__.py b/posthog/test/ai/openai_agents/__init__.py
new file mode 100644
index 00000000..1a28a2a4
--- /dev/null
+++ b/posthog/test/ai/openai_agents/__init__.py
@@ -0,0 +1 @@
+# Tests for OpenAI Agents SDK integration
diff --git a/posthog/test/ai/openai_agents/test_processor.py b/posthog/test/ai/openai_agents/test_processor.py
new file mode 100644
index 00000000..b5509340
--- /dev/null
+++ b/posthog/test/ai/openai_agents/test_processor.py
@@ -0,0 +1,583 @@
+import logging
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+try:
+    from agents.tracing.span_data import (
+        AgentSpanData,
+        CustomSpanData,
+        FunctionSpanData,
+        GenerationSpanData,
+        GuardrailSpanData,
+        HandoffSpanData,
+        ResponseSpanData,
+        SpeechSpanData,
+        TranscriptionSpanData,
+    )
+
+    from posthog.ai.openai_agents import PostHogTracingProcessor, instrument
+
+    OPENAI_AGENTS_AVAILABLE = True
+except ImportError:
+    OPENAI_AGENTS_AVAILABLE = False
+
+
+# Skip all tests if OpenAI Agents SDK is not available
+pytestmark = pytest.mark.skipif(
+    not OPENAI_AGENTS_AVAILABLE, reason="OpenAI Agents SDK is not available"
+)
+
+
+@pytest.fixture(scope="function")
+def mock_client():
+    client = MagicMock()
+    client.privacy_mode = False
+    logging.getLogger("posthog").setLevel(logging.DEBUG)
+    return client
+
+
+@pytest.fixture(scope="function")
+def processor(mock_client):
+    return PostHogTracingProcessor(
+        client=mock_client,
+        distinct_id="test-user",
+        privacy_mode=False,
+    )
+
+
+@pytest.fixture
+def mock_trace():
+    trace = MagicMock()
+    trace.trace_id = "trace_123456789"
+    trace.name = "Test Workflow"
+    trace.group_id = "group_123"
+    trace.metadata = {"key": "value"}
+    return trace
+
+
+@pytest.fixture
+def mock_span():
+    span = MagicMock()
+    span.trace_id = "trace_123456789"
+    span.span_id = "span_987654321"
+    span.parent_id = None
+    span.started_at = "2024-01-01T00:00:00Z"
+    span.ended_at = "2024-01-01T00:00:01Z"
+    span.error = None
+    return span
+
+
+class TestPostHogTracingProcessor:
+    """Tests for the PostHogTracingProcessor class."""
+
+    def test_initialization(self, mock_client):
+        """Test processor initializes correctly."""
+        processor = PostHogTracingProcessor(
+            client=mock_client,
+            distinct_id="user@example.com",
+            privacy_mode=True,
+            groups={"company": "acme"},
+            properties={"env": "test"},
+        )
+
+        assert processor._client == mock_client
+        assert processor._distinct_id == "user@example.com"
+        assert processor._privacy_mode is True
+        assert processor._groups == {"company": "acme"}
+        assert processor._properties == {"env": "test"}
+
+    def test_initialization_with_callable_distinct_id(self, mock_client, mock_trace):
+        """Test processor with callable distinct_id resolver."""
+        resolver = lambda trace: trace.metadata.get("user_id", "default")
+        processor = PostHogTracingProcessor(
+            client=mock_client,
+            distinct_id=resolver,
+        )
+
+        mock_trace.metadata = {"user_id": "resolved-user"}
+        distinct_id = processor._get_distinct_id(mock_trace)
+        assert distinct_id == "resolved-user"
+
+    def test_on_trace_start(self, processor, mock_client, mock_trace):
+        """Test that on_trace_start captures $ai_trace event."""
+        processor.on_trace_start(mock_trace)
+
+        mock_client.capture.assert_called_once()
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_trace"
+        assert call_kwargs["distinct_id"] == "test-user"
+        assert call_kwargs["properties"]["$ai_trace_id"] == "trace_123456789"
+        assert call_kwargs["properties"]["$ai_trace_name"] == "Test Workflow"
+        assert call_kwargs["properties"]["$ai_provider"] == "openai"
+        assert call_kwargs["properties"]["$ai_framework"] == "openai-agents"
+
+    def test_on_trace_end_clears_metadata(self, processor, mock_trace):
+        """Test that on_trace_end clears stored trace metadata."""
+        processor.on_trace_start(mock_trace)
+        assert mock_trace.trace_id in processor._trace_metadata
+
+        processor.on_trace_end(mock_trace)
+        assert mock_trace.trace_id not in processor._trace_metadata
+
+    def test_on_span_start_tracks_time(self, processor, mock_span):
+        """Test that on_span_start records start time."""
+        processor.on_span_start(mock_span)
+        assert mock_span.span_id in processor._span_start_times
+
+    def test_generation_span_mapping(self, processor, mock_client, mock_span):
+        """Test GenerationSpanData maps to $ai_generation event."""
+        span_data = GenerationSpanData(
+            input=[{"role": "user", "content": "Hello"}],
+            output=[{"role": "assistant", "content": "Hi there!"}],
+            model="gpt-4o",
+            model_config={"temperature": 0.7, "max_tokens": 100},
+            usage={"input_tokens": 10, "output_tokens": 20},
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        mock_client.capture.assert_called_once()
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_generation"
+        assert call_kwargs["properties"]["$ai_trace_id"] == "trace_123456789"
+        assert call_kwargs["properties"]["$ai_span_id"] == "span_987654321"
+        assert call_kwargs["properties"]["$ai_provider"] == "openai"
+        assert call_kwargs["properties"]["$ai_framework"] == "openai-agents"
+        assert call_kwargs["properties"]["$ai_model"] == "gpt-4o"
+        assert call_kwargs["properties"]["$ai_input_tokens"] == 10
+        assert call_kwargs["properties"]["$ai_output_tokens"] == 20
+        assert call_kwargs["properties"]["$ai_input"] == [
+            {"role": "user", "content": "Hello"}
+        ]
+        assert call_kwargs["properties"]["$ai_output_choices"] == [
+            {"role": "assistant", "content": "Hi there!"}
+        ]
+
+    def test_generation_span_with_reasoning_tokens(self, processor, mock_client, mock_span):
+        """Test GenerationSpanData includes reasoning tokens when present."""
+        span_data = GenerationSpanData(
+            model="o1-preview",
+            usage={
+                "input_tokens": 100,
+                "output_tokens": 500,
+                "reasoning_tokens": 400,
+            },
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_reasoning_tokens"] == 400
+
+    def test_function_span_mapping(self, processor, mock_client, mock_span):
+        """Test FunctionSpanData maps to $ai_span event with type=tool."""
+        span_data = FunctionSpanData(
+            name="get_weather",
+            input='{"city": "San Francisco"}',
+            output="Sunny, 72F",
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_name"] == "get_weather"
+        assert call_kwargs["properties"]["$ai_span_type"] == "tool"
+        assert call_kwargs["properties"]["$ai_input_state"] == '{"city": "San Francisco"}'
+        assert call_kwargs["properties"]["$ai_output_state"] == "Sunny, 72F"
+
+    def test_agent_span_mapping(self, processor, mock_client, mock_span):
+        """Test AgentSpanData maps to $ai_span event with type=agent."""
+        span_data = AgentSpanData(
+            name="CustomerServiceAgent",
+            handoffs=["TechnicalAgent", "BillingAgent"],
+            tools=["search", "get_order"],
+            output_type="str",
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_name"] == "CustomerServiceAgent"
+        assert call_kwargs["properties"]["$ai_span_type"] == "agent"
+        assert call_kwargs["properties"]["$ai_agent_handoffs"] == [
+            "TechnicalAgent",
+            "BillingAgent",
+        ]
+        assert call_kwargs["properties"]["$ai_agent_tools"] == ["search", "get_order"]
+
+    def test_handoff_span_mapping(self, processor, mock_client, mock_span):
+        """Test HandoffSpanData maps to $ai_span event with type=handoff."""
+        span_data = HandoffSpanData(
+            from_agent="TriageAgent",
+            to_agent="TechnicalAgent",
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_type"] == "handoff"
+        assert call_kwargs["properties"]["$ai_handoff_from_agent"] == "TriageAgent"
+        assert call_kwargs["properties"]["$ai_handoff_to_agent"] == "TechnicalAgent"
+        assert (
+            call_kwargs["properties"]["$ai_span_name"]
+            == "TriageAgent -> TechnicalAgent"
+        )
+
+    def test_guardrail_span_mapping(self, processor, mock_client, mock_span):
+        """Test GuardrailSpanData maps to $ai_span event with type=guardrail."""
+        span_data = GuardrailSpanData(
+            name="ContentFilter",
+            triggered=True,
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_name"] == "ContentFilter"
+        assert call_kwargs["properties"]["$ai_span_type"] == "guardrail"
+        assert call_kwargs["properties"]["$ai_guardrail_triggered"] is True
+
+    def test_custom_span_mapping(self, processor, mock_client, mock_span):
+        """Test CustomSpanData maps to $ai_span event with type=custom."""
+        span_data = CustomSpanData(
+            name="database_query",
+            data={"query": "SELECT * FROM users", "rows": 100},
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_name"] == "database_query"
+        assert call_kwargs["properties"]["$ai_span_type"] == "custom"
+        assert call_kwargs["properties"]["$ai_custom_data"] == {
+            "query": "SELECT * FROM users",
+            "rows": 100,
+        }
+
+    def test_privacy_mode_redacts_content(self, mock_client, mock_span):
+        """Test that privacy_mode redacts input/output content."""
+        processor = PostHogTracingProcessor(
+            client=mock_client,
+            distinct_id="test-user",
+            privacy_mode=True,
+        )
+
+        span_data = GenerationSpanData(
+            input=[{"role": "user", "content": "Secret message"}],
+            output=[{"role": "assistant", "content": "Secret response"}],
+            model="gpt-4o",
+            usage={"input_tokens": 10, "output_tokens": 20},
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        # Content should be redacted
+        assert call_kwargs["properties"]["$ai_input"] is None
+        assert call_kwargs["properties"]["$ai_output_choices"] is None
+        # Token counts should still be present
+        assert call_kwargs["properties"]["$ai_input_tokens"] == 10
+        assert call_kwargs["properties"]["$ai_output_tokens"] == 20
+
+    def test_error_handling_in_span(self, processor, mock_client, mock_span):
+        """Test that span errors are captured correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "Rate limit exceeded", "data": {"code": 429}}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["properties"]["$ai_is_error"] is True
+        assert call_kwargs["properties"]["$ai_error"] == "Rate limit exceeded"
+
+    def test_generation_span_includes_total_tokens(self, processor, mock_client, mock_span):
+        """Test that $ai_total_tokens is calculated and included."""
+        span_data = GenerationSpanData(
+            model="gpt-4o",
+            usage={"input_tokens": 100, "output_tokens": 50},
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_total_tokens"] == 150
+
+    def test_error_type_categorization_model_behavior(self, processor, mock_client, mock_span):
+        """Test that ModelBehaviorError is categorized correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "ModelBehaviorError: Invalid JSON output", "type": "ModelBehaviorError"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "model_behavior_error"
+
+    def test_error_type_categorization_user_error(self, processor, mock_client, mock_span):
+        """Test that UserError is categorized correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "UserError: Tool failed", "type": "UserError"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "user_error"
+
+    def test_error_type_categorization_input_guardrail(self, processor, mock_client, mock_span):
+        """Test that InputGuardrailTripwireTriggered is categorized correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "InputGuardrailTripwireTriggered: Content blocked"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "input_guardrail_triggered"
+
+    def test_error_type_categorization_output_guardrail(self, processor, mock_client, mock_span):
+        """Test that OutputGuardrailTripwireTriggered is categorized correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "OutputGuardrailTripwireTriggered: Response blocked"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "output_guardrail_triggered"
+
+    def test_error_type_categorization_max_turns(self, processor, mock_client, mock_span):
+        """Test that MaxTurnsExceeded is categorized correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "MaxTurnsExceeded: Agent exceeded maximum turns"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "max_turns_exceeded"
+
+    def test_error_type_categorization_unknown(self, processor, mock_client, mock_span):
+        """Test that unknown errors are categorized as unknown."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+        mock_span.error = {"message": "Some random error occurred"}
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_error_type"] == "unknown"
+
+    def test_response_span_with_output_and_total_tokens(self, processor, mock_client, mock_span):
+        """Test ResponseSpanData includes output choices and total tokens."""
+        # Create a mock response object
+        mock_response = MagicMock()
+        mock_response.id = "resp_123"
+        mock_response.model = "gpt-4o"
+        mock_response.output = [{"type": "message", "content": "Hello!"}]
+        mock_response.usage = MagicMock()
+        mock_response.usage.input_tokens = 25
+        mock_response.usage.output_tokens = 10
+
+        span_data = ResponseSpanData(
+            response=mock_response,
+            input="Hello, world!",
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_generation"
+        assert call_kwargs["properties"]["$ai_total_tokens"] == 35
+        assert call_kwargs["properties"]["$ai_output_choices"] == [{"type": "message", "content": "Hello!"}]
+        assert call_kwargs["properties"]["$ai_response_id"] == "resp_123"
+
+    def test_speech_span_with_pass_through_properties(self, processor, mock_client, mock_span):
+        """Test SpeechSpanData includes pass-through properties."""
+        span_data = SpeechSpanData(
+            input="Hello, how can I help you?",
+            output="base64_audio_data",
+            output_format="pcm",
+            model="tts-1",
+            model_config={"voice": "alloy", "speed": 1.0},
+            first_content_at="2024-01-01T00:00:00.500Z",
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_type"] == "speech"
+        assert call_kwargs["properties"]["$ai_model"] == "tts-1"
+        # Pass-through properties (no $ai_ prefix)
+        assert call_kwargs["properties"]["first_content_at"] == "2024-01-01T00:00:00.500Z"
+        assert call_kwargs["properties"]["audio_output_format"] == "pcm"
+        assert call_kwargs["properties"]["model_config"] == {"voice": "alloy", "speed": 1.0}
+        # Text input should be captured
+        assert call_kwargs["properties"]["$ai_input"] == "Hello, how can I help you?"
+
+    def test_transcription_span_with_pass_through_properties(self, processor, mock_client, mock_span):
+        """Test TranscriptionSpanData includes pass-through properties."""
+        span_data = TranscriptionSpanData(
+            input="base64_audio_data",
+            input_format="pcm",
+            output="This is the transcribed text.",
+            model="whisper-1",
+            model_config={"language": "en"},
+        )
+        mock_span.span_data = span_data
+
+        processor.on_span_start(mock_span)
+        processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+
+        assert call_kwargs["event"] == "$ai_span"
+        assert call_kwargs["properties"]["$ai_span_type"] == "transcription"
+        assert call_kwargs["properties"]["$ai_model"] == "whisper-1"
+        # Pass-through properties (no $ai_ prefix)
+        assert call_kwargs["properties"]["audio_input_format"] == "pcm"
+        assert call_kwargs["properties"]["model_config"] == {"language": "en"}
+        # Transcription output should be captured
+        assert call_kwargs["properties"]["$ai_output_state"] == "This is the transcribed text."
+
+    def test_latency_calculation(self, processor, mock_client, mock_span):
+        """Test that latency is calculated correctly."""
+        span_data = GenerationSpanData(model="gpt-4o")
+        mock_span.span_data = span_data
+
+        with patch("time.time") as mock_time:
+            mock_time.return_value = 1000.0
+            processor.on_span_start(mock_span)
+
+            mock_time.return_value = 1001.5  # 1.5 seconds later
+            processor.on_span_end(mock_span)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["$ai_latency"] == pytest.approx(1.5, rel=0.01)
+
+    def test_groups_included_in_events(self, mock_client, mock_trace, mock_span):
+        """Test that groups are included in captured events."""
+        processor = PostHogTracingProcessor(
+            client=mock_client,
+            distinct_id="test-user",
+            groups={"company": "acme", "team": "engineering"},
+        )
+
+        processor.on_trace_start(mock_trace)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["groups"] == {"company": "acme", "team": "engineering"}
+
+    def test_additional_properties_included(self, mock_client, mock_trace):
+        """Test that additional properties are included in events."""
+        processor = PostHogTracingProcessor(
+            client=mock_client,
+            distinct_id="test-user",
+            properties={"environment": "production", "version": "1.0"},
+        )
+
+        processor.on_trace_start(mock_trace)
+
+        call_kwargs = mock_client.capture.call_args[1]
+        assert call_kwargs["properties"]["environment"] == "production"
+        assert call_kwargs["properties"]["version"] == "1.0"
+
+    def test_shutdown_clears_state(self, processor):
+        """Test that shutdown clears internal state."""
+        processor._span_start_times["span_1"] = 1000.0
+        processor._trace_metadata["trace_1"] = {"name": "test"}
+
+        processor.shutdown()
+
+        assert len(processor._span_start_times) == 0
+        assert len(processor._trace_metadata) == 0
+
+    def test_force_flush_calls_client_flush(self, processor, mock_client):
+        """Test that force_flush calls client.flush()."""
+        processor.force_flush()
+        mock_client.flush.assert_called_once()
+
+
+class TestInstrumentHelper:
+    """Tests for the instrument() convenience function."""
+
+    def test_instrument_registers_processor(self, mock_client):
+        """Test that instrument() registers a processor."""
+        with patch("agents.tracing.add_trace_processor") as mock_add:
+            processor = instrument(
+                client=mock_client,
+                distinct_id="test-user",
+            )
+
+            mock_add.assert_called_once_with(processor)
+            assert isinstance(processor, PostHogTracingProcessor)
+
+    def test_instrument_with_privacy_mode(self, mock_client):
+        """Test instrument() respects privacy_mode."""
+        with patch("agents.tracing.add_trace_processor"):
+            processor = instrument(
+                client=mock_client,
+                privacy_mode=True,
+            )
+
+        assert processor._privacy_mode is True
+
+    def test_instrument_with_groups_and_properties(self, mock_client):
+        """Test instrument() accepts groups and properties."""
+        with patch("agents.tracing.add_trace_processor"):
+            processor = instrument(
+                client=mock_client,
+                groups={"company": "acme"},
+                properties={"env": "test"},
+            )
+
+        assert processor._groups == {"company": "acme"}
+        assert processor._properties == {"env": "test"}
diff --git a/posthog/version.py b/posthog/version.py
index 353acdce..05114bf5 100644
--- a/posthog/version.py
+++ b/posthog/version.py
@@ -1,4 +1,4 @@
-VERSION = "7.5.1"
+VERSION = "7.6.0"
 
 if __name__ == "__main__":
     print(VERSION, end="")  # noqa: T201
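
End-to-end usage, tying the pieces above together: a minimal sketch of wiring `instrument()` to an explicit client and a tool-using agent, so one run produces the full event set ($ai_trace, agent/tool $ai_span events, and an $ai_generation). The `WeatherAgent`, `get_weather` tool, API key, and host below are illustrative placeholders, not part of this diff; the `Posthog` client class and `function_tool` decorator come from the posthog and openai-agents packages respectively.

```python
from posthog import Posthog

from agents import Agent, Runner, function_tool

from posthog.ai.openai_agents import instrument

# Explicit PostHog client; key and host are placeholders
posthog_client = Posthog("phc_your_project_key", host="https://us.i.posthog.com")

# Register the tracing processor once at application startup
instrument(
    client=posthog_client,
    distinct_id="user@example.com",
    properties={"environment": "staging"},
)


@function_tool
def get_weather(city: str) -> str:
    """Return a canned forecast for the given city."""
    return f"Sunny in {city}"


agent = Agent(
    name="WeatherAgent",
    instructions="Answer weather questions using the tool.",
    tools=[get_weather],
)

# One run emits a $ai_trace event plus $ai_span / $ai_generation events
# for the agent span, the tool call, and the LLM generation.
result = Runner.run_sync(agent, "What's the weather in Berlin?")
print(result.final_output)

# Flush queued events before the process exits (short-lived scripts)
posthog_client.flush()
```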