From 4a37fd7363930715bf4bea143501c96158e6817a Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 5 Feb 2026 14:22:17 -0800 Subject: [PATCH 1/2] [Contrib] Agent-OS Governance Guardrails Adds kernel-level guardrails for OpenAI Agents SDK. Features: - GovernanceGuardrail: Output guardrail with policy enforcement - GovernancePolicy: Configure blocked patterns, tools, limits - GovernedRunner: Wrapper with automatic guardrail injection - Input validation for pre-processing - Violation callbacks for alerting Integration with Agent-OS kernel for enterprise governance. See: https://github.com/imran-siddique/agent-os --- src/agents/contrib/README.md | 156 +++++++++++++ src/agents/contrib/__init__.py | 22 ++ src/agents/contrib/_governance.py | 376 ++++++++++++++++++++++++++++++ 3 files changed, 554 insertions(+) create mode 100644 src/agents/contrib/README.md create mode 100644 src/agents/contrib/__init__.py create mode 100644 src/agents/contrib/_governance.py diff --git a/src/agents/contrib/README.md b/src/agents/contrib/README.md new file mode 100644 index 000000000..b7e6f29ce --- /dev/null +++ b/src/agents/contrib/README.md @@ -0,0 +1,156 @@ +# Agent-OS Governance for OpenAI Agents SDK + +Kernel-level guardrails and policy enforcement for OpenAI Agents SDK using [Agent-OS](https://github.com/imran-siddique/agent-os). + +## Features + +- **Output Guardrails**: Block dangerous patterns in agent outputs +- **Input Validation**: Filter malicious inputs before processing +- **Tool Control**: Limit which tools agents can use +- **Rate Limiting**: Cap tool invocations per run +- **Violation Handling**: Callbacks for policy violations + +## Installation + +```bash +pip install openai-agents[governance] +# or +pip install agent-os-kernel +``` + +## Quick Start + +```python +from agents import Agent, Runner +from agents.contrib import create_governance_guardrail + +# Create guardrail with simple config +guardrail = create_governance_guardrail( + blocked_patterns=["DROP TABLE", "rm -rf", "DELETE FROM"], + blocked_tools=["shell_execute"], + max_tool_calls=10, +) + +# Create agent with guardrail +agent = Agent( + name="analyst", + instructions="Analyze data safely", + output_guardrails=[guardrail], +) + +# Run agent +result = await Runner.run(agent, "Analyze Q4 sales data") +``` + +## Advanced Usage + +### Full Policy Configuration + +```python +from agents.contrib import GovernanceGuardrail, GovernancePolicy + +policy = GovernancePolicy( + # Content Filtering + blocked_patterns=["DROP TABLE", "rm -rf", "DELETE FROM"], + max_output_length=100_000, + + # Tool Control + blocked_tools=["shell_execute", "file_delete"], + allowed_tools=["search", "calculator", "code_executor"], + max_tool_calls=20, + + # Approval + require_human_approval=False, + approval_tools=["database_write"], +) + +guardrail = GovernanceGuardrail(policy) +``` + +### Handling Violations + +```python +def on_violation(violation): + print(f"BLOCKED: {violation.policy_name}") + print(f" Reason: {violation.description}") + # Send alert, log to SIEM, etc. 
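    # One illustrative option (standard library only; the logger name
    # "governance" is an arbitrary choice): route violations through
    # logging so existing log-based alerting can pick them up.
    import logging
    logging.getLogger("governance").warning(
        "Policy violation [%s]: %s", violation.policy_name, violation.description
    )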
+ +guardrail = GovernanceGuardrail(policy, on_violation=on_violation) +``` + +### Using GovernedRunner + +```python +from agents import Agent +from agents.contrib import GovernedRunner, GovernancePolicy + +policy = GovernancePolicy( + blocked_patterns=["DROP TABLE"], + max_tool_calls=10, +) + +runner = GovernedRunner(policy) + +agent = Agent( + name="analyst", + instructions="Analyze data", +) + +# Runner handles guardrail injection +result = await runner.run(agent, "Analyze Q4 sales") + +# Check violations +print(f"Violations: {len(runner.violations)}") +for v in runner.violations: + print(f" - {v.description}") +``` + +### Input Validation + +```python +from agents.contrib import GovernanceGuardrail, GovernancePolicy + +policy = GovernancePolicy( + blocked_patterns=["DROP TABLE", "rm -rf"], +) + +guardrail = GovernanceGuardrail(policy) + +# Check input before sending to agent +user_input = "Delete the users table" +violation = guardrail.check_input(user_input) + +if violation: + print(f"Input blocked: {violation.description}") +else: + result = await Runner.run(agent, user_input) +``` + +## Integration with Agent-OS Kernel + +For full kernel-level governance: + +```python +from agent_os import KernelSpace +from agent_os.policies import SQLPolicy, CostControlPolicy +from agents import Agent, Runner + +# Create kernel with policies +kernel = KernelSpace(policy=[ + SQLPolicy(allow=["SELECT"], deny=["DROP", "DELETE"]), + CostControlPolicy(max_cost_usd=100), +]) + +# Wrap agent execution in kernel +@kernel.register +async def run_agent(input_text): + return await Runner.run(agent, input_text) + +# Execute with full governance +result = await kernel.execute(run_agent, "Analyze data") +``` + +## Links + +- [Agent-OS GitHub](https://github.com/imran-siddique/agent-os) +- [OpenAI Agents SDK Documentation](https://openai.github.io/openai-agents-python/) diff --git a/src/agents/contrib/__init__.py b/src/agents/contrib/__init__.py new file mode 100644 index 000000000..a2afdc3cf --- /dev/null +++ b/src/agents/contrib/__init__.py @@ -0,0 +1,22 @@ +# Copyright (c) Agent-OS Contributors. All rights reserved. +# Licensed under the MIT License. +"""Agent-OS Governance Integration for OpenAI Agents SDK. + +Provides kernel-level guardrails and policy enforcement. +""" + +from ._governance import ( + GovernanceGuardrail, + GovernancePolicy, + GovernedRunner, + PolicyViolation, + create_governance_guardrail, +) + +__all__ = [ + "GovernanceGuardrail", + "GovernancePolicy", + "GovernedRunner", + "PolicyViolation", + "create_governance_guardrail", +] diff --git a/src/agents/contrib/_governance.py b/src/agents/contrib/_governance.py new file mode 100644 index 000000000..7dc91316d --- /dev/null +++ b/src/agents/contrib/_governance.py @@ -0,0 +1,376 @@ +# Copyright (c) Agent-OS Contributors. All rights reserved. +# Licensed under the MIT License. 
+"""Kernel-level governance for OpenAI Agents SDK.""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, Union + + +class ViolationType(Enum): + """Types of policy violations.""" + + TOOL_BLOCKED = "tool_blocked" + TOOL_LIMIT_EXCEEDED = "tool_limit_exceeded" + CONTENT_FILTERED = "content_filtered" + OUTPUT_BLOCKED = "output_blocked" + INPUT_BLOCKED = "input_blocked" + TIMEOUT = "timeout" + + +@dataclass +class PolicyViolation: + """Represents a policy violation event.""" + + violation_type: ViolationType + policy_name: str + description: str + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + details: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class GovernancePolicy: + """Policy configuration for agent governance. + + Attributes: + blocked_patterns: Regex patterns to block in inputs/outputs. + blocked_tools: Tool names that cannot be used. + allowed_tools: If set, only these tools can be used. + max_tool_calls: Maximum tool invocations per run. + max_output_length: Maximum output length in characters. + require_human_approval: Require approval for certain actions. + approval_tools: Tools requiring human approval. + """ + + blocked_patterns: List[str] = field(default_factory=list) + blocked_tools: List[str] = field(default_factory=list) + allowed_tools: Optional[List[str]] = None + max_tool_calls: int = 50 + max_output_length: int = 100_000 + require_human_approval: bool = False + approval_tools: List[str] = field(default_factory=list) + + def __post_init__(self): + """Compile regex patterns.""" + self._compiled_patterns = [ + re.compile(p, re.IGNORECASE) for p in self.blocked_patterns + ] + + +class GovernanceGuardrail: + """Guardrail that enforces Agent-OS governance policies. + + Implements the OpenAI Agents SDK guardrail interface to provide + kernel-level policy enforcement. + + Example: + ```python + from agents import Agent, Runner + from agents.contrib import GovernanceGuardrail, GovernancePolicy + + policy = GovernancePolicy( + blocked_patterns=["DROP TABLE", "rm -rf"], + blocked_tools=["shell_execute"], + max_tool_calls=10, + ) + + guardrail = GovernanceGuardrail(policy) + + agent = Agent( + name="analyst", + instructions="Analyze data safely", + output_guardrails=[guardrail], + ) + ``` + """ + + def __init__( + self, + policy: GovernancePolicy, + on_violation: Optional[Callable[[PolicyViolation], None]] = None, + ): + """Initialize governance guardrail. + + Args: + policy: Governance policy to enforce. + on_violation: Callback when violations occur. + """ + self.policy = policy + self.on_violation = on_violation + self._tool_calls = 0 + self._violations: List[PolicyViolation] = [] + + async def run(self, context: Any, agent: Any, output: Any) -> Any: + """Execute guardrail check on agent output. + + Args: + context: Run context. + agent: Agent that produced output. + output: Output to check. + + Returns: + GuardrailFunctionOutput with tripwire if violation detected. 
+ """ + # Import here to avoid circular dependency + try: + from agents.guardrail import GuardrailFunctionOutput + except ImportError: + # Fallback for testing + @dataclass + class GuardrailFunctionOutput: + output_info: Any = None + tripwire_triggered: bool = False + + output_str = str(output) if output else "" + + # Check patterns + for pattern in self.policy._compiled_patterns: + if pattern.search(output_str): + violation = self._record_violation( + ViolationType.OUTPUT_BLOCKED, + f"Output contains blocked pattern: {pattern.pattern}", + pattern=pattern.pattern, + ) + return GuardrailFunctionOutput( + output_info=f"BLOCKED: {violation.description}", + tripwire_triggered=True, + ) + + # Check length + if len(output_str) > self.policy.max_output_length: + violation = self._record_violation( + ViolationType.OUTPUT_BLOCKED, + f"Output exceeds max length ({len(output_str)} > {self.policy.max_output_length})", + ) + return GuardrailFunctionOutput( + output_info=f"BLOCKED: {violation.description}", + tripwire_triggered=True, + ) + + return GuardrailFunctionOutput(tripwire_triggered=False) + + def check_tool(self, tool_name: str) -> Optional[PolicyViolation]: + """Check if a tool is allowed by policy. + + Args: + tool_name: Name of tool to check. + + Returns: + PolicyViolation if blocked, None if allowed. + """ + # Check blocked list + if tool_name in self.policy.blocked_tools: + return self._record_violation( + ViolationType.TOOL_BLOCKED, + f"Tool '{tool_name}' is blocked by policy", + tool_name=tool_name, + ) + + # Check allowed list + if ( + self.policy.allowed_tools is not None + and tool_name not in self.policy.allowed_tools + ): + return self._record_violation( + ViolationType.TOOL_BLOCKED, + f"Tool '{tool_name}' not in allowed list", + tool_name=tool_name, + ) + + # Check limit + self._tool_calls += 1 + if self._tool_calls > self.policy.max_tool_calls: + return self._record_violation( + ViolationType.TOOL_LIMIT_EXCEEDED, + f"Tool call limit exceeded ({self._tool_calls} > {self.policy.max_tool_calls})", + ) + + return None + + def check_input(self, input_text: str) -> Optional[PolicyViolation]: + """Check input text for policy violations. + + Args: + input_text: Input to check. + + Returns: + PolicyViolation if blocked, None if allowed. + """ + for pattern in self.policy._compiled_patterns: + if pattern.search(input_text): + return self._record_violation( + ViolationType.INPUT_BLOCKED, + f"Input contains blocked pattern: {pattern.pattern}", + pattern=pattern.pattern, + ) + return None + + def _record_violation( + self, + violation_type: ViolationType, + description: str, + **details: Any, + ) -> PolicyViolation: + """Record a policy violation.""" + violation = PolicyViolation( + violation_type=violation_type, + policy_name=violation_type.value, + description=description, + details=details, + ) + self._violations.append(violation) + + if self.on_violation: + self.on_violation(violation) + + return violation + + @property + def violations(self) -> List[PolicyViolation]: + """Get all violations.""" + return self._violations.copy() + + def reset(self): + """Reset guardrail state for new run.""" + self._tool_calls = 0 + self._violations = [] + + +class GovernedRunner: + """Runner wrapper with governance enforcement. + + Wraps the standard Runner to enforce policies on all operations. 

    Example:
        ```python
        from agents import Agent
        from agents.contrib import GovernedRunner, GovernancePolicy

        policy = GovernancePolicy(
            blocked_patterns=["DROP TABLE"],
            max_tool_calls=10,
        )

        runner = GovernedRunner(policy)

        agent = Agent(
            name="analyst",
            instructions="Analyze data",
        )

        result = await runner.run(agent, "Analyze Q4 sales")
        print(f"Violations: {len(runner.violations)}")
        ```
    """

    def __init__(
        self,
        policy: GovernancePolicy,
        on_violation: Optional[Callable[[PolicyViolation], None]] = None,
    ):
        """Initialize governed runner.

        Args:
            policy: Governance policy to enforce.
            on_violation: Callback when violations occur.
        """
        self.policy = policy
        self.guardrail = GovernanceGuardrail(policy, on_violation)

    async def run(
        self,
        agent: Any,
        input_text: str,
        **kwargs: Any,
    ) -> Any:
        """Run agent with governance.

        Args:
            agent: Agent to run.
            input_text: Input text.
            **kwargs: Additional arguments passed to Runner.run.

        Returns:
            Agent result.

        Raises:
            ValueError: If input violates policy.
            ImportError: If the OpenAI Agents SDK is not installed.
        """
        # Import here to avoid circular dependency
        try:
            from agents import Runner
        except ImportError as e:
            raise ImportError("OpenAI Agents SDK not installed") from e

        # Check input
        violation = self.guardrail.check_input(input_text)
        if violation:
            raise ValueError(f"Input blocked: {violation.description}")

        # Reset guardrail state
        self.guardrail.reset()

        # Add guardrail to agent if not already present. The attribute may be
        # None, so normalize it to a list before the membership check.
        if hasattr(agent, "output_guardrails"):
            guardrails = list(agent.output_guardrails or [])
            if self.guardrail not in guardrails:
                guardrails.append(self.guardrail)
                agent.output_guardrails = guardrails

        # Run agent
        result = await Runner.run(agent, input_text, **kwargs)

        return result

    @property
    def violations(self) -> List[PolicyViolation]:
        """Get all violations."""
        return self.guardrail.violations


def create_governance_guardrail(
    blocked_patterns: Optional[List[str]] = None,
    blocked_tools: Optional[List[str]] = None,
    max_tool_calls: int = 50,
    on_violation: Optional[Callable[[PolicyViolation], None]] = None,
) -> GovernanceGuardrail:
    """Factory function to create a governance guardrail.

    Convenience function for common use cases.

    Args:
        blocked_patterns: Regex patterns to block.
        blocked_tools: Tool names to block.
        max_tool_calls: Maximum tool invocations.
        on_violation: Callback for violations.

    Returns:
        Configured GovernanceGuardrail.
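
    Note:
        Patterns in blocked_patterns are compiled with re.IGNORECASE, so
        matching is case-insensitive.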
+ + Example: + ```python + from agents import Agent + from agents.contrib import create_governance_guardrail + + guardrail = create_governance_guardrail( + blocked_patterns=["DROP TABLE", "rm -rf"], + blocked_tools=["shell"], + max_tool_calls=10, + ) + + agent = Agent( + name="analyst", + output_guardrails=[guardrail], + ) + ``` + """ + policy = GovernancePolicy( + blocked_patterns=blocked_patterns or [], + blocked_tools=blocked_tools or [], + max_tool_calls=max_tool_calls, + ) + return GovernanceGuardrail(policy, on_violation) From 2ab584c4c5b596330a7232ee6a624bb5538b9c7a Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 5 Feb 2026 14:38:04 -0800 Subject: [PATCH 2/2] fix: Address review comments - Add output_info=None when guardrail passes (fixes TypeError) - Clarify that tool policies require manual check_tool() calls - Add Tool Policy Enforcement section to README --- src/agents/contrib/README.md | 34 +++++++++++++++++++++++++------ src/agents/contrib/_governance.py | 2 +- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/agents/contrib/README.md b/src/agents/contrib/README.md index b7e6f29ce..b0ca69b44 100644 --- a/src/agents/contrib/README.md +++ b/src/agents/contrib/README.md @@ -4,10 +4,9 @@ Kernel-level guardrails and policy enforcement for OpenAI Agents SDK using [Agen ## Features -- **Output Guardrails**: Block dangerous patterns in agent outputs -- **Input Validation**: Filter malicious inputs before processing -- **Tool Control**: Limit which tools agents can use -- **Rate Limiting**: Cap tool invocations per run +- **Output Guardrails**: Block dangerous patterns in agent outputs (automatic) +- **Input Validation**: Filter malicious inputs before processing (automatic) +- **Tool Policy**: Define allowed/blocked tools (requires manual `check_tool()` calls) - **Violation Handling**: Callbacks for policy violations ## Installation @@ -27,8 +26,8 @@ from agents.contrib import create_governance_guardrail # Create guardrail with simple config guardrail = create_governance_guardrail( blocked_patterns=["DROP TABLE", "rm -rf", "DELETE FROM"], - blocked_tools=["shell_execute"], - max_tool_calls=10, + blocked_tools=["shell_execute"], # Use with check_tool() for enforcement + max_tool_calls=10, # Use with check_tool() for enforcement ) # Create agent with guardrail @@ -126,6 +125,29 @@ else: result = await Runner.run(agent, user_input) ``` +### Tool Policy Enforcement + +Tool policies (`blocked_tools`, `allowed_tools`, `max_tool_calls`) are not automatically +enforced during agent execution. Use `check_tool()` in your tool implementations: + +```python +from agents.contrib import GovernanceGuardrail, GovernancePolicy + +policy = GovernancePolicy( + blocked_tools=["dangerous_tool"], + max_tool_calls=10, +) + +guardrail = GovernanceGuardrail(policy) + +# In your tool implementation: +def my_tool(name: str, args: dict): + violation = guardrail.check_tool(name) + if violation: + raise ValueError(f"Tool blocked: {violation.description}") + # ... 
execute tool +``` + ## Integration with Agent-OS Kernel For full kernel-level governance: diff --git a/src/agents/contrib/_governance.py b/src/agents/contrib/_governance.py index 7dc91316d..6045dc490 100644 --- a/src/agents/contrib/_governance.py +++ b/src/agents/contrib/_governance.py @@ -152,7 +152,7 @@ class GuardrailFunctionOutput: tripwire_triggered=True, ) - return GuardrailFunctionOutput(tripwire_triggered=False) + return GuardrailFunctionOutput(output_info=None, tripwire_triggered=False) def check_tool(self, tool_name: str) -> Optional[PolicyViolation]: """Check if a tool is allowed by policy.