Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agentex/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ dependencies = [
"ddtrace>=3.13.0",
"json_log_formatter>=1.1.1",
"datadog>=0.52.1",
"opentelemetry-api>=1.27.0",
"opentelemetry-sdk>=1.27.0",
"opentelemetry-instrumentation-sqlalchemy>=0.48b0",
]

[dependency-groups]
Expand Down
10 changes: 10 additions & 0 deletions agentex/src/config/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from temporalio.client import Client as TemporalClient

from src.config.environment_variables import Environment, EnvironmentVariables
from src.config.otel_instrumentation import instrument_engine, uninstrument_all
from src.utils.database import async_db_engine_creator
from src.utils.logging import make_logger

Expand Down Expand Up @@ -113,6 +114,12 @@ async def load(self):
pool_recycle=3600, # Recycle connections after 1 hour
)

# Instrument SQLAlchemy engines with OpenTelemetry
instrument_engine(self.database_async_read_write_engine, "main")
instrument_engine(
self.database_async_middleware_read_write_engine, "middleware"
)

# Initialize MongoDB client and database
try:
mongodb_uri = self.environment_variables.MONGODB_URI
Expand Down Expand Up @@ -231,6 +238,9 @@ def shutdown():


async def async_shutdown():
# Uninstrument SQLAlchemy engines before disposal
uninstrument_all()

global_dependencies = GlobalDependencies()
run_concurrently = []
if global_dependencies.database_async_read_only_engine:
Expand Down
57 changes: 57 additions & 0 deletions agentex/src/config/otel_instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
OpenTelemetry instrumentation for SQLAlchemy engines.
"""

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy.ext.asyncio import AsyncEngine

from src.utils.logging import make_logger

logger = make_logger(__name__)

_instrumented_engines: set[int] = set()


def instrument_engine(
engine: AsyncEngine | None,
engine_name: str = "default",
) -> None:
"""
Instrument a SQLAlchemy async engine with OpenTelemetry.

For async engines, we instrument the underlying sync_engine since
SQLAlchemyInstrumentor works at the sync engine level.

Args:
engine: The async SQLAlchemy engine to instrument
engine_name: A name identifier for logging
"""
if engine is None:
logger.warning(f"Cannot instrument {engine_name}: engine is None")
return

engine_id = id(engine)
if engine_id in _instrumented_engines:
logger.debug(f"Engine {engine_name} already instrumented")
return

try:
SQLAlchemyInstrumentor().instrument(
engine=engine.sync_engine,
)
_instrumented_engines.add(engine_id)
logger.info(
f"Instrumented SQLAlchemy engine '{engine_name}' with OpenTelemetry"
)
except Exception as e:
logger.error(f"Failed to instrument SQLAlchemy engine '{engine_name}': {e}")


def uninstrument_all() -> None:
"""Remove instrumentation from all engines."""
try:
SQLAlchemyInstrumentor().uninstrument()
_instrumented_engines.clear()
logger.info("Uninstrumented all SQLAlchemy engines")
except Exception as e:
logger.error(f"Failed to uninstrument SQLAlchemy engines: {e}")
66 changes: 66 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading