End-to-End Framework for Building, Orchestrating & Observing Intelligent AI Agents
SynapSys is a modular, extensible system for building autonomous and semi-autonomous agents that can reason, act, and adapt.
It integrates workflow orchestration, observability, tool integrations, and Intel® optimizations to deliver real-world performance and reliability for intelligent automation systems.
This repository contains an Enterprise-grade implementation with: multi-tenancy, Pydantic validation, a policy system, PostgreSQL-backed Flow Registry, REST ingress, Docker Compose for local dev, CLI tooling, and extensive tests.
```
agent-framework/
├── sdk/                    # Core SDK for agent creation and workflow design
│   ├── models.py           # Pydantic Task/Flow models
│   ├── policy.py           # Policy interfaces and builtin policies
│   └── registry.py         # FlowRegistry client & helpers
├── orchestrator/           # Workflow orchestration engine (Airflow/state-machine)
│   ├── dags/               # Airflow DAG definitions generated from flows
│   └── state_machine.py    # Alternative state-machine runner
├── executor/               # Execution engine and workers
│   ├── executor_core.py
│   ├── retry_manager.py
│   └── tools/              # Tool adapters (LLM, HTTP, DB, OCR...)
├── ingress/                # REST API, webhooks, and ingress handlers
│   ├── api.py              # FastAPI application
│   └── auth.py             # JWT auth & tenant middleware
├── infra/                  # Docker, Kubernetes manifests, CI configs
│   ├── docker-compose.yml
│   └── k8s/
├── observability/          # Prometheus, Grafana, OpenTelemetry configs
├── demos/                  # Reference agents and example workflows
│   ├── doc_qna/
│   └── ticket_resolver/
├── docs/                   # Design docs, diagrams, screenshots, READMEs per module
├── tests/                  # Unit & integration tests (pytest)
└── tools/                  # CLI (agentctl) and helper scripts
```

- Pydantic Models: Type-safe Task and Flow models with automatic validation and helpful error messages.
- Policy System: Extensible policy framework supporting pre-task, post-task, and flow-level policies (for retries, timeouts, validation, tenant rules).
- Flow Registry: PostgreSQL-backed registry for versioned flow metadata, auditing, and search.
- REST API: FastAPI-based ingress API with JWT authentication, multi-tenant middleware, and webhook support.
- Docker Compose: Complete local development environment (Postgres, Redis, Kafka, Airflow, Grafana, Prometheus).
- CLI Tool: `agentctl` for validate/publish/list/get/delete operations on flows and viewing registry stats.
- Multi-tenancy: Tenant isolation via request middleware, storage partitioning, and policy scopes.
- Comprehensive Testing: Pytest test suite covering SDK, policies, registry, and end-to-end flows.
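To make the policy feature concrete, here is a minimal standalone sketch of the evaluate-and-report pattern the policy system uses. It mirrors the documented interface but is independent of the real `sdk.policy` module; the `PolicyResult` enum and the exact shape of the returned dict are illustrative assumptions:

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict

class PolicyResult(Enum):
    PASS = "pass"
    FAIL = "fail"

class Policy(ABC):
    @abstractmethod
    def evaluate(self, **kwargs) -> Dict[str, Any]: ...

class RetryLimitPolicy(Policy):
    """Pre-task policy: reject tasks whose retry budget exceeds a cap."""
    def __init__(self, max_retries: int):
        self.max_retries = max_retries

    def evaluate(self, task, **kwargs) -> Dict[str, Any]:
        # `task` is a plain dict here; the real SDK passes a Task model.
        ok = task.get("retries", 0) <= self.max_retries
        return {
            "result": PolicyResult.PASS if ok else PolicyResult.FAIL,
            "reason": None if ok else f"retries > {self.max_retries}",
        }

policy = RetryLimitPolicy(max_retries=5)
print(policy.evaluate(task={"retries": 3})["result"].value)  # pass
```

Each policy returns a verdict plus a machine-readable reason, which is what lets the orchestrator log and act on policy decisions uniformly.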
The easiest way to get started is using Docker Compose, which provides a complete local development environment:
```bash
# Start all services
docker-compose up -d

# View logs for the ingress API
docker-compose logs -f ingress-api

# Test the API health endpoint
curl http://localhost:8000/health

# Stop services
docker-compose down
```

- Ingress API (Port 8000): FastAPI service for flow management and ingress.
- PostgreSQL (Port 5432): Database for audit logs and flow registry.
- Redis (Port 6379): Caching layer and short-term memory.
- Kafka (Port 9092): Message broker for flow events and inter-agent messaging.
- Kafka UI (Port 8080): Web interface for Kafka monitoring.
- Airflow (Port 8081): DAG orchestration UI (optional, if using Airflow orchestrator).
- Grafana (Port 3000) & Prometheus (Port 9090): Observability stack.
For detailed Docker setup instructions, see docker/README.md.
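A quick way to check that the stack is healthy is to probe each published port. This is a hedged sketch — the ports come from the list above, and the service names are labels for display only:

```python
import socket

# Service ports from the list above; adjust if your compose file differs.
SERVICES = {
    "ingress-api": 8000,
    "postgres": 5432,
    "redis": 6379,
    "kafka": 9092,
    "kafka-ui": 8080,
    "airflow": 8081,
    "grafana": 3000,
    "prometheus": 9090,
}

def is_listening(port: int, host: str = "localhost", timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "up" if is_listening(port) else "down"
        print(f"{name:12} :{port} {status}")
```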
```bash
git clone https://github.com/your-username/agent-framework.git
cd agent-framework
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

If you use Docker Compose, many dependencies (DB, Kafka, Redis) are provided by the compose stack, so a local install is mainly needed for development tools and SDK usage.
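The SDK examples that follow attach a `condition` string such as `"user_count > 0"` to tasks. This README doesn't document how the executor evaluates those strings, but a safe, whitelist-based evaluation could look like this standalone sketch (function name and supported grammar are assumptions):

```python
import ast
import operator

# Whitelist of comparison operators we are willing to evaluate.
_OPS = {
    ast.Gt: operator.gt,
    ast.GtE: operator.ge,
    ast.Lt: operator.lt,
    ast.LtE: operator.le,
    ast.Eq: operator.eq,
    ast.NotEq: operator.ne,
}

def eval_condition(expr: str, context: dict) -> bool:
    """Evaluate a simple 'name <op> literal' condition against a context dict."""
    node = ast.parse(expr, mode="eval").body
    if not isinstance(node, ast.Compare) or len(node.ops) != 1:
        raise ValueError(f"unsupported condition: {expr!r}")
    left, op, right = node.left, node.ops[0], node.comparators[0]

    def value(n):
        if isinstance(n, ast.Name):
            return context[n.id]   # look up variables in the context
        if isinstance(n, ast.Constant):
            return n.value         # numeric/string literals
        raise ValueError(f"unsupported operand in {expr!r}")

    return _OPS[type(op)](value(left), value(right))
```

Parsing with `ast` instead of calling `eval()` keeps arbitrary code out of workflow definitions.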
```python
from sdk.models import Task, Flow

# Create tasks
task1 = Task(
    handler="data_processor",
    inputs={"source": "database", "table": "users"},
    retries=3,
    timeout=300,
    condition="user_count > 0"
)

task2 = Task(
    handler="ml_predictor",
    inputs={"model": "sentiment_analysis"},
    retries=2,
    timeout=600
)

# Create flow
flow = Flow(
    tasks=[task1, task2],
    version="1.0.0",
    type="DAG",
    tenant_id="tenant_123"
)

# Validate flow
validation_result = flow.validate()
if validation_result["valid"]:
    print("Flow is valid!")
else:
    print("Validation errors:", validation_result["errors"])
```

```python
from sdk.policy import RetryLimitPolicy, TimeoutPolicy, TenantIsolationPolicy

# Create policies
retry_policy = RetryLimitPolicy(max_retries=5)
timeout_policy = TimeoutPolicy(max_timeout=3600)
tenant_policy = TenantIsolationPolicy()

# Apply policies
for task in flow.tasks:
    retry_result = retry_policy.evaluate(task=task, flow=flow)
    timeout_result = timeout_policy.evaluate(task=task, flow=flow)
    print(f"Task {task.handler}: {retry_result['result'].value}")
```

```python
from sdk.registry import FlowRegistry

# Connect to registry
registry = FlowRegistry(
    host="localhost",
    database="agent_framework",
    username="postgres",
    password="password"
)

# Register flow
metadata = registry.register_flow(
    flow=flow,
    name="Data Processing Flow",
    description="Processes user data and generates predictions",
    tags=["data-processing", "ml"]
)

# List flows
flows = registry.list_flows(tenant_id="tenant_123")
for flow_metadata in flows:
    print(f"Flow: {flow_metadata.name} (v{flow_metadata.version})")
```

```bash
# Validate a flow
python agentctl validate my_flow.json

# Publish a flow to registry
python agentctl publish my_flow.json --name "My Flow" --tags "ml,data"

# List flows
python agentctl list --tenant-id tenant_123

# Get flow details
python agentctl get flow-123-456

# Show registry statistics
python agentctl stats
```

A single executable task in a workflow.
```python
Task(
    handler: str,                     # Handler/connector name
    inputs: Dict[str, Any] = {},      # Input parameters
    retries: int = 3,                 # Maximum retry attempts
    timeout: int = 300,               # Timeout in seconds
    condition: Optional[str] = None,  # Conditional execution
    metadata: Dict[str, Any] = {}     # Additional metadata
)
```

A complete workflow consisting of multiple tasks.
```python
Flow(
    tasks: List[Task],                              # List of tasks
    version: str = "1.0.0",                         # Flow version
    type: Literal["DAG", "STATE_MACHINE"] = "DAG",  # Execution type
    tenant_id: Optional[str] = None,                # Tenant identifier
    metadata: Dict[str, Any] = {}                   # Additional metadata
)
```

```python
class Policy(ABC):
    def evaluate(self, context: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        pass
```

Evaluated before task execution.
```python
class PreTaskPolicy(Policy):
    def evaluate(self, task: Task, flow: Flow, context: Optional[Dict] = None) -> Dict[str, Any]:
        pass
```

Evaluated after task execution.
```python
class PostTaskPolicy(Policy):
    def evaluate(self, task: Task, flow: Flow, result: Dict[str, Any], context: Optional[Dict] = None) -> Dict[str, Any]:
        pass
```

Evaluated at the flow level.
```python
class FlowPolicy(Policy):
    def evaluate(self, flow: Flow, context: Optional[Dict] = None) -> Dict[str, Any]:
        pass
```

PostgreSQL-backed registry for flow metadata.
```python
registry = FlowRegistry(
    connection_string: Optional[str] = None,
    host: str = "localhost",
    port: int = 5432,
    database: str = "agent_framework",
    username: str = "postgres",
    password: str = "password",
    schema: str = "public"
)
```

Methods:

- `register_flow(flow, name, description=None, tags=None, metadata=None)`
- `get_flow(flow_id)`
- `get_flow_metadata(flow_id)`
- `list_flows(tenant_id=None, name_filter=None, tag_filter=None, limit=100, offset=0)`
- `update_flow(flow, name=None, description=None, tags=None, metadata=None)`
- `delete_flow(flow_id)`
- `search_flows(query, tenant_id=None, limit=100)`
- `get_flow_statistics(tenant_id=None)`
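Since `list_flows` takes `limit` and `offset`, paging through a large registry can be driven client-side. A sketch, using an in-memory stand-in (`FakeRegistry` is hypothetical) in place of a live `FlowRegistry`:

```python
from typing import Iterator, List

class FakeRegistry:
    """In-memory stand-in for FlowRegistry, for illustration only."""
    def __init__(self, flows: List[str]):
        self._flows = flows

    def list_flows(self, tenant_id=None, limit: int = 100, offset: int = 0) -> List[str]:
        return self._flows[offset:offset + limit]

def iter_all_flows(registry, page_size: int = 100) -> Iterator[str]:
    """Page through list_flows until a short (or empty) page is returned."""
    offset = 0
    while True:
        page = registry.list_flows(limit=page_size, offset=offset)
        yield from page
        if len(page) < page_size:
            return
        offset += page_size

registry = FakeRegistry([f"flow-{i}" for i in range(250)])
print(sum(1 for _ in iter_all_flows(registry)))  # 250
```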
Enforces maximum retry limits on tasks.

```python
policy = RetryLimitPolicy(max_retries=5)
```

Enforces timeout limits on tasks.

```python
policy = TimeoutPolicy(max_timeout=3600)
```

Enforces tenant isolation in flows.

```python
policy = TenantIsolationPolicy()
```

Validates task execution results.

```python
policy = ResultValidationPolicy(required_fields=["status", "data"])
```

Run the test suite:
```bash
pytest tests/ -v
```

Run specific test modules:

```bash
pytest tests/sdk/test_models.py -v
pytest tests/sdk/test_policy.py -v
pytest tests/sdk/test_registry.py -v
```

See the examples/ directory for complete usage examples:
- `sdk_example.py`: Comprehensive example showing all SDK features
Validate a flow definition file.
```bash
python agentctl validate <flow.json> [--output results.json] [--format json|yaml|text]
```

Publish a flow to the registry.

```bash
python agentctl publish <flow.json> --name "Flow Name" [--description "Description"] [--tags "tag1,tag2"]
```

List flows in the registry.

```bash
python agentctl list [--tenant-id <id>] [--name-filter <name>] [--tags "tag1,tag2"] [--format table|json|yaml]
```

Retrieve a flow by ID.

```bash
python agentctl get <flow-id> [--output <file.json>]
```

Delete a flow from the registry.

```bash
python agentctl delete <flow-id> [--confirm]
```

Show registry statistics.

```bash
python agentctl stats
```

The registry requires a PostgreSQL database. Create the database and user:

```sql
CREATE DATABASE agent_framework;
CREATE USER agent_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE agent_framework TO agent_user;
```

The registry will automatically create the required tables on first use.
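`FlowRegistry` also accepts a `connection_string`. Assuming it follows the standard PostgreSQL URL form (this README doesn't specify the exact format, so treat this as a sketch), it can be assembled from the same pieces:

```python
from urllib.parse import quote_plus

def make_dsn(user: str, password: str, host: str = "localhost",
             port: int = 5432, database: str = "agent_framework") -> str:
    """Build a postgresql:// URL, escaping special characters in credentials."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

print(make_dsn("agent_user", "your_password"))
# postgresql://agent_user:your_password@localhost:5432/agent_framework
```

Escaping via `quote_plus` matters when passwords contain characters like `@` or `:` that would otherwise break URL parsing.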
You can configure the registry using environment variables:
```bash
export AGENT_DB_HOST=localhost
export AGENT_DB_PORT=5432
export AGENT_DB_NAME=agent_framework
export AGENT_DB_USER=agent_user
export AGENT_DB_PASSWORD=your_password
```

Use these links to jump to module-level documentation within this repository.
- 📦 docker/README.md — Docker Compose, environment, and deployment instructions.
- ⚙️ sdk/README.md — SDK reference, models, and code examples.
- 🧩 cli/README.md — CLI usage, flags, and examples (`agentctl`).
- 🌐 api/README.md — REST API endpoints, authentication, and webhook docs.
- 📊 observability/README.md — Grafana dashboards, Prometheus metrics, and tracing.
- 🧪 tests/README.md — Test strategy and running CI tests.
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request with clear description and changelog
This project is licensed under the MIT License - see the LICENSE file for details.
Built with ❤️ for the Intel Unnati Program 2025
Powered by FastAPI, PostgreSQL, and Docker.
© 2025 AI Agent Framework