Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions durabletask/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
new_orchestration_state,
)

# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
try:
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
GrpcInstrumentorClient().instrument()
except ImportError:
pass


class AsyncTaskHubGrpcClient:
def __init__(
Expand Down
6 changes: 6 additions & 0 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")

# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
try:
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
GrpcInstrumentorClient().instrument()
except ImportError:
pass

class OrchestrationStatus(Enum):
"""The status of an orchestration instance."""
Expand Down
73 changes: 50 additions & 23 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

import asyncio
import contextlib
import inspect
import logging
import os
Expand All @@ -26,6 +27,17 @@
TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")

# If `opentelemetry-sdk` is available, enable the tracer
try:
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

otel_propagator = TraceContextTextMapPropagator()
otel_tracer = trace.get_tracer(__name__)
except ImportError:
otel_tracer = None



def _log_all_threads(logger: logging.Logger, context: str = ""):
"""Helper function to log all currently active threads for debugging."""
Expand Down Expand Up @@ -759,31 +771,46 @@ def _execute_activity(
completionToken,
):
instance_id = req.orchestrationInstance.instanceId
try:
executor = _ActivityExecutor(self._registry, self._logger)
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
result=ph.get_string_value(result),
completionToken=completionToken,
)
except Exception as ex:
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
failureDetails=ph.new_failure_details(ex),
completionToken=completionToken,
)

try:
stub.CompleteActivityTask(res)
except grpc.RpcError as rpc_error: # type: ignore
self._handle_grpc_execution_error(rpc_error, "activity")
except Exception as ex:
self._logger.exception(
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
if otel_tracer is not None:
span_context = otel_tracer.start_as_current_span(
name=f'activity: {req.name}',
context=otel_propagator.extract(carrier={"traceparent": req.parentTraceContext.traceParent}),
attributes={
"durabletask.task.instance_id": instance_id,
"durabletask.task.id": req.taskId,
"durabletask.activity.name": req.name,
}
)
else:
span_context = contextlib.nullcontext()

with span_context:
try:
executor = _ActivityExecutor(self._registry, self._logger)
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
result=ph.get_string_value(result),
completionToken=completionToken,
)
except Exception as ex:
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
failureDetails=ph.new_failure_details(ex),
completionToken=completionToken,
)

try:
stub.CompleteActivityTask(res)
except grpc.RpcError as rpc_error: # type: ignore
self._handle_grpc_execution_error(rpc_error, "activity")
except Exception as ex:
self._logger.exception(
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
)


class _RuntimeOrchestrationContext(
Expand Down
Loading