Skip to content

Document SSE streaming example for agent progress #13

@fbraza

Description

@fbraza

Summary

Add documentation (or a sample utility) showing how to surface Pydantic-AI agent progress events back to a client via Server-Sent Events.

Motivation

Our voice-driven Slack agent needs to keep the mobile user informed ("transcribing", "posting to Slack", etc.). Pydantic AI already exposes a rich stream of `AgentStreamEvent`s; what is missing is an example that maps those events to user-friendly status updates and streams them to the app.

Proposal

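Document or ship an example FastAPI endpoint that wraps `agent.run_stream()` with an `event_stream_handler`, translates the agent's events into client-facing events, queues them, and streams them to the client over SSE. A sketch of the pattern we use follows (event kinds and attribute names should be checked against the installed Pydantic AI version):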

# app/routes/slack_agent.py
import asyncio
import logging
import uuid
from collections.abc import AsyncIterable, AsyncIterator
from datetime import datetime, timezone
from typing import Any

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from pydantic_ai import Agent, RunContext
from pydantic_ai.stream import AgentStreamEvent

# Every route in this module is mounted under /voice and grouped under the
# "voice" tag in the OpenAPI docs.
router = APIRouter(tags=["voice"], prefix="/voice")
# Stand-in for the real, fully configured agent (simplified for the example).
agents = Agent("openai:gpt-4o")

class RunRequest(BaseModel):
    """JSON body accepted by ``POST /voice/runs``.

    ``transcription`` is optional in the schema, but the route shown here
    rejects requests where it is missing or empty with HTTP 400.
    """

    # URL of the audio recording; the route shown here does not read it yet.
    audio_url: str
    # Text transcript of the audio, or None when the client did not send one.
    transcription: str | None = None

class ClientEvent(BaseModel):
    """One progress event serialized onto the SSE stream.

    Field names are camelCase because they form the JSON wire format the
    mobile client reads; do not rename them.
    """

    # ISO-8601 timestamp; every construction site in this module uses UTC.
    timestamp: str
    # Conversation identifier generated per request ("mobile-<uuid4>").
    threadId: str
    # Run identifier generated per request ("run-<uuid4>").
    runId: str
    # Client-facing event name, e.g. "tool_call" or "stream_complete".
    event: str
    # Event-specific, JSON-serializable details.
    payload: dict[str, Any]

def _client_event(
    thread_id: str, run_id: str, event: str, payload: dict[str, Any]
) -> ClientEvent:
    """Build a ``ClientEvent`` stamped with the current UTC time."""
    return ClientEvent(
        timestamp=datetime.now(timezone.utc).isoformat(),
        threadId=thread_id,
        runId=run_id,
        event=event,
        payload=payload,
    )


async def translate_events(
    ctx: RunContext,
    events: AsyncIterable[AgentStreamEvent],
    queue: "asyncio.Queue[ClientEvent]",
    thread_id: str,
    run_id: str,
    *,
    max_summary_chars: int = 200,
) -> None:
    """Translate Pydantic AI stream events into client-facing ``ClientEvent``s.

    Used as the ``event_stream_handler`` for ``run_stream``. Pydantic AI may
    invoke the handler more than once per run (e.g. per model request and per
    tool-execution step), so this function must NOT enqueue the
    ``stream_complete`` sentinel: doing so would end the SSE stream after the
    first step and drop every later event, including the final output.
    ``voice_run`` enqueues the sentinel exactly once when the run ends.

    Dispatch is on ``event.event_kind``. Attribute names follow the event
    dataclasses in ``pydantic_ai.messages``; verify against the installed
    version. Kinds not handled below (part starts, final-result markers) are
    intentionally not forwarded.

    Args:
        ctx: Run context supplied by Pydantic AI; accepted to match the handler
            signature but not used.
        events: The agent's event stream for one step of the run.
        queue: Destination queue consumed by ``sse_stream``.
        thread_id: Thread identifier copied onto every event.
        run_id: Run identifier copied onto every event.
        max_summary_chars: Non-negative cap on the length of a tool result
            summary sent to the client.
    """
    async for event in events:
        kind = event.event_kind
        if kind == "part_delta":
            delta = event.delta
            # Forward plain text only; thinking and tool-call-argument deltas are
            # internal details, not user-facing progress.
            if delta.part_delta_kind == "text" and delta.content_delta:
                await queue.put(
                    _client_event(
                        thread_id, run_id, "model_delta", {"delta": delta.content_delta}
                    )
                )
        elif kind == "function_tool_call":
            call = event.part
            await queue.put(
                _client_event(
                    thread_id,
                    run_id,
                    "tool_call",
                    {
                        "toolCallId": call.tool_call_id,
                        "toolName": call.tool_name,
                        # Forwarded verbatim; redact here if a tool takes sensitive input.
                        "arguments": call.args,
                    },
                )
            )
        elif kind == "function_tool_result":
            result = event.result
            await queue.put(
                _client_event(
                    thread_id,
                    run_id,
                    "tool_result",
                    {
                        "toolCallId": result.tool_call_id,
                        "resultSummary": str(result.content)[:max_summary_chars],
                    },
                )
            )


async def sse_stream(
    queue: "asyncio.Queue[ClientEvent]",
    producer: "asyncio.Task[None] | None" = None,
) -> AsyncIterator[bytes]:
    """Yield queued events as SSE ``data:`` frames until ``stream_complete``.

    Args:
        queue: Queue filled by the agent run.
        producer: Optional task filling ``queue``. Holding it here keeps a strong
            reference for the lifetime of the response, and it is cancelled if
            the generator is closed early (e.g. the client disconnected).
    """
    try:
        while True:
            event = await queue.get()
            yield f"data: {event.model_dump_json()}\n\n".encode()
            if event.event == "stream_complete":
                break
    finally:
        # Runs on normal completion and when the server closes the generator
        # after a disconnect; do not leave the agent running for nobody.
        if producer is not None and not producer.done():
            producer.cancel()


@router.post("/runs")
async def voice_run(req: RunRequest, request: Request) -> StreamingResponse:
    """Run the agent on a transcribed voice request and stream progress as SSE.

    The agent runs in a background task that feeds a queue; the response
    streams that queue. Event order: ``agent_started``; any ``model_delta`` /
    ``tool_call`` / ``tool_result`` events; then ``final_output`` followed by
    ``agent_finished`` with ``status="success"``, or only ``agent_finished``
    with ``status="error"`` if the run raised; and finally exactly one
    ``stream_complete``.

    Args:
        req: Request body; ``transcription`` must be non-empty.
        request: Unused; kept so the route signature is unchanged.

    Raises:
        HTTPException: 400 when ``req.transcription`` is missing or empty.
    """
    if not req.transcription:
        raise HTTPException(status_code=400, detail="transcription required for now")

    thread_id = f"mobile-{uuid.uuid4()}"
    run_id = f"run-{uuid.uuid4()}"
    queue: asyncio.Queue[ClientEvent] = asyncio.Queue()

    def emit(event: str, payload: dict[str, Any]) -> None:
        # The queue is unbounded, so put_nowait never raises QueueFull and is
        # safe to call from the finally clause even while being cancelled.
        queue.put_nowait(_client_event(thread_id, run_id, event, payload))

    async def run_agent() -> None:
        try:
            # The handler stream has no "run started" event, so announce it here.
            # `agents.name` may be None if the Agent was built without name=.
            emit("agent_started", {"agent": agents.name})
            chunks: list[str] = []
            async with agents.run_stream(
                req.transcription,
                event_stream_handler=lambda ctx, events: translate_events(
                    ctx, events, queue, thread_id, run_id
                ),
            ) as run:
                # delta=True yields only new text; the default yields the
                # cumulative text so far, which is not a delta.
                async for chunk in run.stream_text(delta=True):
                    chunks.append(chunk)
                    emit("model_delta", {"delta": chunk})
            emit("final_output", {"text": "".join(chunks)})
            emit("agent_finished", {"status": "success"})
        except Exception:
            # Report the failure to the client instead of losing it in an
            # unobserved task; details stay in the server log.
            logging.getLogger(__name__).exception("voice run %s failed", run_id)
            emit("agent_finished", {"status": "error"})
        finally:
            # Single, guaranteed terminator so sse_stream can never hang.
            emit("stream_complete", {})

    task = asyncio.create_task(run_agent())
    return StreamingResponse(
        sse_stream(queue, producer=task),
        media_type="text/event-stream",
        # Prevent caches and reverse proxies (e.g. nginx) from buffering events.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

A documented pattern (or helper) like this would make it straightforward to surface safe, user-facing progress logs without exposing internal observability tooling.

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions