From 73c141e04e491d0792c5631e17e47ecafc39353a Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 22 Jan 2026 14:58:04 +0100 Subject: [PATCH] Hook otel instrumentation when available Signed-off-by: Albert Callarisa --- durabletask/aio/client.py | 7 ++++ durabletask/client.py | 6 ++++ durabletask/worker.py | 73 +++++++++++++++++++++++++++------------ 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/durabletask/aio/client.py b/durabletask/aio/client.py index 9b93b96d..a57fe019 100644 --- a/durabletask/aio/client.py +++ b/durabletask/aio/client.py @@ -24,6 +24,13 @@ new_orchestration_state, ) +# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor +try: + from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + GrpcInstrumentorClient().instrument() +except ImportError: + pass + class AsyncTaskHubGrpcClient: def __init__( diff --git a/durabletask/client.py b/durabletask/client.py index e3d391fc..7d86414f 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -21,6 +21,12 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") +# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor +try: + from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + GrpcInstrumentorClient().instrument() +except ImportError: + pass class OrchestrationStatus(Enum): """The status of an orchestration instance.""" diff --git a/durabletask/worker.py b/durabletask/worker.py index 5ad1cccb..7764f98a 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import asyncio +import contextlib import inspect import logging import os @@ -26,6 +27,17 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") +# If `opentelemetry-sdk` is available, enable the tracer +try: + from opentelemetry import trace + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + + otel_propagator = TraceContextTextMapPropagator() + otel_tracer = trace.get_tracer(__name__) +except ImportError: + otel_tracer = None + + def _log_all_threads(logger: logging.Logger, context: str = ""): """Helper function to log all currently active threads for debugging.""" @@ -759,31 +771,46 @@ def _execute_activity( completionToken, ): instance_id = req.orchestrationInstance.instanceId - try: - executor = _ActivityExecutor(self._registry, self._logger) - result = executor.execute(instance_id, req.name, req.taskId, req.input.value) - res = pb.ActivityResponse( - instanceId=instance_id, - taskId=req.taskId, - result=ph.get_string_value(result), - completionToken=completionToken, - ) - except Exception as ex: - res = pb.ActivityResponse( - instanceId=instance_id, - taskId=req.taskId, - failureDetails=ph.new_failure_details(ex), - completionToken=completionToken, - ) - try: - stub.CompleteActivityTask(res) - except grpc.RpcError as rpc_error: # type: ignore - self._handle_grpc_execution_error(rpc_error, "activity") - except Exception as ex: - self._logger.exception( - f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" + if otel_tracer is not None: + span_context = otel_tracer.start_as_current_span( + name=f'activity: {req.name}', + context=otel_propagator.extract(carrier={"traceparent": req.parentTraceContext.traceParent}), + attributes={ + "durabletask.task.instance_id": instance_id, + "durabletask.task.id": req.taskId, + "durabletask.activity.name": req.name, + } ) + else: + span_context = contextlib.nullcontext() + + with span_context: + try: + executor = _ActivityExecutor(self._registry, self._logger) + result = executor.execute(instance_id, req.name, req.taskId, req.input.value) + res = pb.ActivityResponse( + instanceId=instance_id, + taskId=req.taskId, + result=ph.get_string_value(result), + completionToken=completionToken, + ) + except Exception as ex: + res = pb.ActivityResponse( + instanceId=instance_id, + taskId=req.taskId, + failureDetails=ph.new_failure_details(ex), + completionToken=completionToken, + ) + + try: + stub.CompleteActivityTask(res) + except grpc.RpcError as rpc_error: # type: ignore + self._handle_grpc_execution_error(rpc_error, "activity") + except Exception as ex: + self._logger.exception( + f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" + ) class _RuntimeOrchestrationContext(