From c3382012d5894f472d09059bd5f2f69f08d950d8 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 5 Feb 2026 14:06:04 -0800 Subject: [PATCH] [Contrib] Agent-OS Governance Extension Adds kernel-level governance for AutoGen multi-agent conversations. Features: - GovernancePolicy: Define rules for agent behavior - GovernedAgent: Wrap individual agents with policy enforcement - GovernedTeam: Govern entire agent teams - Content filtering with blocked patterns - Tool call limits and filtering - Full audit trail Integration with Agent-OS kernel for enterprise governance. See: https://github.com/imran-siddique/agent-os --- .../src/autogen_ext/governance/README.md | 123 +++++++ .../src/autogen_ext/governance/__init__.py | 47 +++ .../src/autogen_ext/governance/_governance.py | 341 ++++++++++++++++++ 3 files changed, 511 insertions(+) create mode 100644 python/packages/autogen-ext/src/autogen_ext/governance/README.md create mode 100644 python/packages/autogen-ext/src/autogen_ext/governance/__init__.py create mode 100644 python/packages/autogen-ext/src/autogen_ext/governance/_governance.py diff --git a/python/packages/autogen-ext/src/autogen_ext/governance/README.md b/python/packages/autogen-ext/src/autogen_ext/governance/README.md new file mode 100644 index 000000000000..e5a59e5e4729 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/governance/README.md @@ -0,0 +1,123 @@ +# Agent-OS Governance Extension for AutoGen + +This extension provides kernel-level governance for AutoGen multi-agent conversations using [Agent-OS](https://github.com/imran-siddique/agent-os). + +## Features + +- **Policy Enforcement**: Define rules for agent behavior +- **Tool Filtering**: Control which tools agents can use +- **Content Filtering**: Block dangerous patterns (SQL injection, shell commands) +- **Rate Limiting**: Limit messages and tool calls +- **Audit Trail**: Full logging of all agent interactions + +## Installation + +```bash +pip install autogen-ext[governance] +# or +pip install agent-os-kernel +``` + +## Quick Start + +```python +from autogen_ext.governance import GovernedTeam, GovernancePolicy +from autogen_agentchat.agents import AssistantAgent +from autogen_ext.models.openai import OpenAIChatCompletionClient + +# Create policy +policy = GovernancePolicy( + max_tool_calls=10, + max_messages=50, + blocked_patterns=["DROP TABLE", "rm -rf", "DELETE FROM"], + blocked_tools=["shell_execute"], + require_human_approval=False, +) + +# Create agents +model = OpenAIChatCompletionClient(model="gpt-4o") +analyst = AssistantAgent("analyst", model_client=model) +reviewer = AssistantAgent("reviewer", model_client=model) + +# Create governed team +team = GovernedTeam( + agents=[analyst, reviewer], + policy=policy, +) + +# Run with governance +result = await team.run("Analyze Q4 sales data") + +# Get audit log +audit = team.get_audit_log() +print(f"Total events: {len(audit)}") +``` + +## Policy Options + +```python +GovernancePolicy( + # Limits + max_messages=100, # Max messages per session + max_tool_calls=50, # Max tool invocations + timeout_seconds=300, # Session timeout + + # Tool Control + allowed_tools=["code_executor", "web_search"], # Whitelist + blocked_tools=["shell_execute"], # Blacklist + + # Content Filtering + blocked_patterns=["DROP TABLE", "rm -rf"], + max_message_length=50000, + + # Approval + require_human_approval=False, + approval_tools=["database_write"], # Tools needing approval + + # Audit + log_all_messages=True, +) +``` + +## Handling Violations + +```python +def on_violation(error): + print(f"BLOCKED: {error.policy_name} - {error.description}") + # Send alert, log to SIEM, etc. + +team = GovernedTeam( + agents=[agent1, agent2], + policy=policy, + on_violation=on_violation, +) +``` + +## Integration with Agent-OS Kernel + +For full kernel-level governance with signals, checkpoints, and policy languages: + +```python +from agent_os import KernelSpace +from agent_os.policies import SQLPolicy, CostControlPolicy + +# Create kernel with policies +kernel = KernelSpace(policy=[ + SQLPolicy(allow=["SELECT"], deny=["DROP", "DELETE"]), + CostControlPolicy(max_cost_usd=100), +]) + +# Wrap AutoGen team in kernel +@kernel.register +async def run_team(task: str): + return await team.run(task) + +# Execute with full governance +result = await kernel.execute(run_team, "Analyze data") +``` + +## Links + +- [Agent-OS GitHub](https://github.com/imran-siddique/agent-os) +- [AutoGen Documentation](https://microsoft.github.io/autogen/) +- [Governance Best Practices](https://github.com/imran-siddique/agent-os/blob/main/docs/governance.md) diff --git a/python/packages/autogen-ext/src/autogen_ext/governance/__init__.py b/python/packages/autogen-ext/src/autogen_ext/governance/__init__.py new file mode 100644 index 000000000000..3964c1a0c63a --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/governance/__init__.py @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Agent-OS Governance Extension for AutoGen +========================================== + +Provides kernel-level governance for AutoGen multi-agent conversations. + +Features: +- Policy enforcement for agent messages +- Tool call filtering and limits +- Content pattern blocking +- Human approval workflows +- Full audit trail + +Example: + >>> from autogen_ext.governance import GovernedTeam, GovernancePolicy + >>> from autogen_agentchat.agents import AssistantAgent + >>> + >>> policy = GovernancePolicy( + ... max_tool_calls=10, + ... blocked_patterns=["DROP TABLE", "rm -rf"], + ... require_human_approval=False, + ... ) + >>> + >>> team = GovernedTeam( + ... agents=[agent1, agent2], + ... policy=policy, + ... ) + >>> result = await team.run("Analyze this data") +""" + +from ._governance import ( + GovernancePolicy, + GovernedAgent, + GovernedTeam, + PolicyViolationError, + ExecutionContext, +) + +__all__ = [ + "GovernancePolicy", + "GovernedAgent", + "GovernedTeam", + "PolicyViolationError", + "ExecutionContext", +] diff --git a/python/packages/autogen-ext/src/autogen_ext/governance/_governance.py b/python/packages/autogen-ext/src/autogen_ext/governance/_governance.py new file mode 100644 index 000000000000..8cbcce5cf968 --- /dev/null +++ b/python/packages/autogen-ext/src/autogen_ext/governance/_governance.py @@ -0,0 +1,341 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Agent-OS Governance Implementation for AutoGen +=============================================== + +Kernel-level governance for AutoGen multi-agent conversations. +""" + +from __future__ import annotations + +import logging +import uuid +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Sequence + +logger = logging.getLogger(__name__) + + +@dataclass +class GovernancePolicy: + """Policy configuration for governed agents.""" + + # Message limits + max_messages: int = 100 + max_tool_calls: int = 50 + timeout_seconds: int = 300 + + # Tool filtering + allowed_tools: List[str] = field(default_factory=list) + blocked_tools: List[str] = field(default_factory=list) + + # Content filtering + blocked_patterns: List[str] = field(default_factory=list) + max_message_length: int = 50000 + + # Approval flows + require_human_approval: bool = False + approval_tools: List[str] = field(default_factory=list) + + # Audit + log_all_messages: bool = True + + +@dataclass +class ExecutionContext: + """Runtime context for governed execution.""" + + session_id: str + policy: GovernancePolicy + started_at: datetime = field(default_factory=datetime.utcnow) + + # Counters + message_count: int = 0 + tool_calls: int = 0 + + # Audit trail + events: List[Dict[str, Any]] = field(default_factory=list) + + def record_event(self, event_type: str, data: Dict[str, Any]) -> None: + """Record an audit event.""" + self.events.append( + { + "type": event_type, + "timestamp": datetime.utcnow().isoformat(), + "data": data, + } + ) + + +class PolicyViolationError(Exception): + """Raised when a policy violation is detected.""" + + def __init__(self, policy_name: str, description: str, severity: str = "high"): + self.policy_name = policy_name + self.description = description + self.severity = severity + super().__init__(f"Policy violation ({policy_name}): {description}") + + +class GovernedAgent: + """ + Wrapper that adds governance to any AutoGen agent. + + Intercepts messages and tool calls to enforce policies. + """ + + def __init__( + self, + agent: Any, + policy: GovernancePolicy, + on_violation: Optional[Callable[[PolicyViolationError], None]] = None, + ): + self._agent = agent + self._policy = policy + self._on_violation = on_violation or self._default_violation_handler + self._context = ExecutionContext( + session_id=str(uuid.uuid4())[:8], + policy=policy, + ) + + def _default_violation_handler(self, error: PolicyViolationError) -> None: + """Default handler logs violations.""" + logger.error(f"Policy violation: {error}") + + @property + def name(self) -> str: + """Get agent name.""" + return getattr(self._agent, "name", "unknown") + + @property + def original(self) -> Any: + """Get original unwrapped agent.""" + return self._agent + + def _check_content(self, content: str) -> tuple[bool, str]: + """Check content against blocked patterns.""" + if len(content) > self._policy.max_message_length: + return False, f"Message exceeds max length ({len(content)} > {self._policy.max_message_length})" + + content_lower = content.lower() + for pattern in self._policy.blocked_patterns: + if pattern.lower() in content_lower: + return False, f"Content matches blocked pattern: {pattern}" + + return True, "" + + def _check_tool(self, tool_name: str) -> tuple[bool, str]: + """Check if tool is allowed.""" + if tool_name in self._policy.blocked_tools: + return False, f"Tool '{tool_name}' is blocked" + + if self._policy.allowed_tools and tool_name not in self._policy.allowed_tools: + return False, f"Tool '{tool_name}' not in allowed list" + + if self._context.tool_calls >= self._policy.max_tool_calls: + return False, f"Tool call limit ({self._policy.max_tool_calls}) exceeded" + + return True, "" + + async def on_messages( + self, + messages: Sequence[Any], + cancellation_token: Optional[Any] = None, + ) -> Any: + """Handle incoming messages with governance.""" + # Check message count + if self._context.message_count >= self._policy.max_messages: + error = PolicyViolationError( + "message_limit", + f"Message limit ({self._policy.max_messages}) exceeded", + ) + self._on_violation(error) + raise error + + # Check each message content + for msg in messages: + content = getattr(msg, "content", str(msg)) + if isinstance(content, str): + ok, reason = self._check_content(content) + if not ok: + error = PolicyViolationError("content_filter", reason) + self._on_violation(error) + raise error + + self._context.message_count += len(messages) + self._context.record_event( + "messages_received", + {"count": len(messages)}, + ) + + # Forward to original agent + if hasattr(self._agent, "on_messages"): + return await self._agent.on_messages(messages, cancellation_token) + + return None + + async def on_messages_stream( + self, + messages: Sequence[Any], + cancellation_token: Optional[Any] = None, + ) -> AsyncGenerator[Any, None]: + """Handle streaming messages with governance.""" + # Pre-check + for msg in messages: + content = getattr(msg, "content", str(msg)) + if isinstance(content, str): + ok, reason = self._check_content(content) + if not ok: + error = PolicyViolationError("content_filter", reason) + self._on_violation(error) + raise error + + self._context.message_count += len(messages) + + # Stream from original + if hasattr(self._agent, "on_messages_stream"): + async for chunk in self._agent.on_messages_stream(messages, cancellation_token): + yield chunk + + def __getattr__(self, name: str) -> Any: + """Forward unknown attributes to original agent.""" + return getattr(self._agent, name) + + +class GovernedTeam: + """ + Governed team of AutoGen agents. + + Wraps a team to enforce policies across all agent interactions. + """ + + def __init__( + self, + agents: List[Any], + policy: Optional[GovernancePolicy] = None, + termination_condition: Optional[Any] = None, + on_violation: Optional[Callable[[PolicyViolationError], None]] = None, + ): + self._policy = policy or GovernancePolicy() + self._on_violation = on_violation + + # Wrap all agents + self._governed_agents = [ + GovernedAgent(agent, self._policy, on_violation) for agent in agents + ] + + self._termination_condition = termination_condition + self._context = ExecutionContext( + session_id=str(uuid.uuid4())[:8], + policy=self._policy, + ) + + @property + def agents(self) -> List[GovernedAgent]: + """Get governed agents.""" + return self._governed_agents + + async def run( + self, + task: str, + cancellation_token: Optional[Any] = None, + ) -> Any: + """Run team with governance.""" + # Check task content + ok, reason = self._check_content(task) + if not ok: + error = PolicyViolationError("content_filter", reason) + if self._on_violation: + self._on_violation(error) + raise error + + self._context.record_event("team_run_start", {"task_length": len(task)}) + + try: + # Import RoundRobinGroupChat dynamically to avoid hard dependency + from autogen_agentchat.teams import RoundRobinGroupChat + + # Create team with governed agents + original_agents = [ga.original for ga in self._governed_agents] + team = RoundRobinGroupChat( + original_agents, + termination_condition=self._termination_condition, + ) + + result = await team.run(task=task, cancellation_token=cancellation_token) + + self._context.record_event("team_run_complete", {"success": True}) + return result + + except ImportError: + # Fallback: just run first agent + logger.warning("autogen_agentchat not available, running first agent only") + if self._governed_agents: + return await self._governed_agents[0].on_messages([task], cancellation_token) + return None + + async def run_stream( + self, + task: str, + cancellation_token: Optional[Any] = None, + ) -> AsyncGenerator[Any, None]: + """Run team with streaming and governance.""" + ok, reason = self._check_content(task) + if not ok: + error = PolicyViolationError("content_filter", reason) + if self._on_violation: + self._on_violation(error) + raise error + + try: + from autogen_agentchat.teams import RoundRobinGroupChat + + original_agents = [ga.original for ga in self._governed_agents] + team = RoundRobinGroupChat( + original_agents, + termination_condition=self._termination_condition, + ) + + async for chunk in team.run_stream(task=task, cancellation_token=cancellation_token): + yield chunk + + except ImportError: + logger.warning("autogen_agentchat not available") + + def _check_content(self, content: str) -> tuple[bool, str]: + """Check content against policy.""" + if len(content) > self._policy.max_message_length: + return False, f"Content exceeds max length" + + content_lower = content.lower() + for pattern in self._policy.blocked_patterns: + if pattern.lower() in content_lower: + return False, f"Content matches blocked pattern: {pattern}" + + return True, "" + + def get_audit_log(self) -> List[Dict[str, Any]]: + """Get combined audit log from team and all agents.""" + events = list(self._context.events) + for agent in self._governed_agents: + events.extend(agent._context.events) + return sorted(events, key=lambda e: e["timestamp"]) + + def get_stats(self) -> Dict[str, Any]: + """Get governance statistics.""" + total_messages = sum(a._context.message_count for a in self._governed_agents) + total_tool_calls = sum(a._context.tool_calls for a in self._governed_agents) + + return { + "session_id": self._context.session_id, + "agent_count": len(self._governed_agents), + "total_messages": total_messages, + "total_tool_calls": total_tool_calls, + "policy": { + "max_messages": self._policy.max_messages, + "max_tool_calls": self._policy.max_tool_calls, + "blocked_patterns_count": len(self._policy.blocked_patterns), + }, + }