-
Notifications
You must be signed in to change notification settings - Fork 0
Description
Summary
Add documentation (or a sample utility) showing how to surface Pydantic-AI agent progress events back to a client via Server-Sent Events.
Motivation
Our voice-driven Slack agent needs to keep the mobile user informed ("transcribing", "posting to Slack", etc.). Pydantic-AI already exposes rich AgentStreamEvents; we just need an example that maps those events to user-friendly updates and streams them to the app.
Proposal
Document or ship an example FastAPI endpoint that wraps agent.run_stream() with an event_stream_handler, queues translated events, and feeds them to the client over SSE. A full sketch of the pattern we use:
# app/routes/slack_agent.py
import asyncio
import uuid
from collections.abc import AsyncIterable, AsyncIterator
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.stream import AgentStreamEvent
# Router for all voice endpoints; mounted by the app under the /voice prefix.
router = APIRouter(prefix="/voice", tags=["voice"])
# Module-level Pydantic-AI agent shared by all requests.
# NOTE(review): the name is plural but it holds a single Agent instance.
agents = Agent("openai:gpt-4o") # existing agent, simplified here
class RunRequest(BaseModel):
    """Request body for POST /voice/runs."""

    # URL of the recorded audio clip to process.
    audio_url: str
    # Pre-computed transcript; the endpoint currently rejects requests without it
    # (see the 400 raised in voice_run).
    transcription: str | None = None
class ClientEvent(BaseModel):
    """One user-facing progress event, serialized to JSON and sent over SSE."""

    # ISO-8601 UTC timestamp of when the event was enqueued.
    timestamp: str
    # Conversation thread identifier (camelCase to match the client-side schema).
    threadId: str
    # Identifier of this specific agent run.
    runId: str
    # Event name, e.g. "agent_started", "model_delta", "tool_call",
    # "final_output", "stream_complete".
    event: str
    # Event-specific details; shape varies per event name.
    payload: dict[str, Any]
async def translate_events(
    ctx: RunContext,
    events: AsyncIterable[AgentStreamEvent],
    queue: "asyncio.Queue[ClientEvent]",
    thread_id: str,
    run_id: str,
) -> None:
    """Map raw agent stream events onto user-facing ClientEvents.

    Consumes the agent's event stream and pushes a translated ClientEvent
    onto *queue* for each recognized event type; unrecognized event types
    are ignored. After the stream is exhausted, a final "stream_complete"
    sentinel is enqueued so the SSE loop knows to disconnect.
    """

    async def publish(name: str, payload: dict[str, Any]) -> None:
        # Every outgoing event carries a fresh UTC timestamp plus the
        # thread/run identifiers so the client can correlate updates.
        await queue.put(
            ClientEvent(
                timestamp=datetime.now(timezone.utc).isoformat(),
                threadId=thread_id,
                runId=run_id,
                event=name,
                payload=payload,
            )
        )

    async for raw in events:
        kind = raw.type
        if kind == "run-started":
            await publish("agent_started", {"agent": ctx.agent.name})
        elif kind == "part-delta":
            await publish("model_delta", {"delta": raw.delta})
        elif kind == "tool-call-start":
            await publish(
                "tool_call",
                {
                    "toolCallId": raw.tool_call.id,
                    "toolName": raw.tool_call.name,
                    "arguments": raw.tool_call.args,
                },
            )
        elif kind == "tool-call-finish":
            await publish(
                "tool_result",
                {
                    "toolCallId": raw.tool_call.id,
                    "resultSummary": raw.tool_call.result_summary,
                },
            )
        elif kind == "run-finished":
            await publish("agent_finished", {"status": "success"})

    # Sentinel: tells sse_stream that no further events will arrive.
    await publish("stream_complete", {})
async def sse_stream(queue: "asyncio.Queue[ClientEvent]") -> AsyncIterator[bytes]:
    """Drain *queue* and yield each event as an SSE-formatted data frame.

    Every yielded chunk is a complete `data: <json>\n\n` frame. The
    generator emits the "stream_complete" event itself and then stops.
    """
    finished = False
    while not finished:
        message = await queue.get()
        # Decide termination before yielding so the sentinel frame is still
        # delivered to the client as the last chunk.
        finished = message.event == "stream_complete"
        yield f"data: {message.model_dump_json()}\n\n".encode()
@router.post("/runs")
async def voice_run(req: RunRequest, request: Request) -> StreamingResponse:
    """Start an agent run and stream user-facing progress events over SSE.

    Spawns the agent run as a background task that produces ClientEvents
    into a queue; the returned StreamingResponse drains that queue via
    sse_stream until the "stream_complete" sentinel arrives.

    Raises:
        HTTPException: 400 when no transcription is supplied.
    """
    # Audio-only requests are not handled yet; a transcript must be provided.
    if not req.transcription:
        raise HTTPException(status_code=400, detail="transcription required for now")
    thread_id = f"mobile-{uuid.uuid4()}"
    run_id = f"run-{uuid.uuid4()}"
    # Bridge between the agent task (producer) and the SSE response (consumer).
    queue: asyncio.Queue[ClientEvent] = asyncio.Queue()

    async def run_agent() -> None:
        # event_stream_handler routes raw agent events through
        # translate_events, which enqueues translated ClientEvents.
        async with agents.run_stream(
            req.transcription,
            event_stream_handler=lambda ctx, events: translate_events(
                ctx, events, queue, thread_id, run_id
            ),
        ) as run:
            # Forward the model's text output to the client chunk-by-chunk.
            async for chunk in run.stream_text():
                await queue.put(
                    ClientEvent(
                        timestamp=datetime.now(timezone.utc).isoformat(),
                        threadId=thread_id,
                        runId=run_id,
                        event="model_delta",
                        payload={"delta": chunk},
                    )
                )
            # Emit the complete final output once text streaming finishes.
            # NOTE(review): translate_events enqueues "stream_complete" when
            # the raw event stream ends; if that happens before this put,
            # sse_stream will disconnect before "final_output" is delivered —
            # verify the ordering against pydantic-ai's handler lifecycle.
            await queue.put(
                ClientEvent(
                    timestamp=datetime.now(timezone.utc).isoformat(),
                    threadId=thread_id,
                    runId=run_id,
                    event="final_output",
                    payload={"text": run.output},
                )
            )

    task = asyncio.create_task(run_agent())
    # NOTE(review): stashing the task in request.scope keeps a reference so it
    # isn't garbage-collected, but Starlette expects a BackgroundTask under the
    # "background" key — confirm this actually awaits/cleans up as intended.
    request.scope["background"] = task
    return StreamingResponse(sse_stream(queue), media_type="text/event-stream")

A documented pattern (or helper) like this would make it straightforward to surface safe, user-facing progress logs without exposing internal observability tooling.