diff --git a/examples/interceptors_examples.py b/examples/interceptors_examples.py new file mode 100644 index 0000000..cee1102 --- /dev/null +++ b/examples/interceptors_examples.py @@ -0,0 +1,22 @@ +from tracely import Interceptor +from tracely import init_tracing +from tracely import trace_event + + +class ExampleInterceptor(Interceptor): + def before_call(self, span, context, *args, **kwargs): + span.set_attribute("custom_attribute", 1) + + def after_call(self, span, context, *args, **kwargs): + pass + + def on_exception(self, span, context, exception) -> bool: + pass + + +init_tracing(exporter_type="console", interceptors=[ExampleInterceptor()]) + + +@trace_event() +def func(data: str) -> str: + return data diff --git a/examples/openai_manual_tracing.py b/examples/openai_manual_tracing.py index 597a7b6..341faaf 100644 --- a/examples/openai_manual_tracing.py +++ b/examples/openai_manual_tracing.py @@ -77,9 +77,9 @@ def call_with_user_id_explicit(input: str): ) print(call_openai("What is LLM?")) - print(call_openai_with_helper("What is LLM?")) - print(call_openai_with_context("What is LLM?")) - print(multiple_calls_openai("What is LLM?")) - print(call_with_user_id_param("What is LLM?", "user_id")) - print(call_with_user_id_explicit("What is LLM?")) + # print(call_openai_with_helper("What is LLM?")) + # print(call_openai_with_context("What is LLM?")) + # print(multiple_calls_openai("What is LLM?")) + # print(call_with_user_id_param("What is LLM?", "user_id")) + # print(call_with_user_id_explicit("What is LLM?")) sleep(1) diff --git a/tracely/src/tracely/__init__.py b/tracely/src/tracely/__init__.py index f34b4e3..e663b02 100644 --- a/tracely/src/tracely/__init__.py +++ b/tracely/src/tracely/__init__.py @@ -1,10 +1,15 @@ from ._tracer_provider import UsageDetails from ._tracer_provider import init_tracing -from ._tracer_provider import get_info +from ._context import get_info +from ._context import get_interceptors +from ._context import get_tracer from .decorators import trace_event from .context import create_trace_event from .context import bind_to_trace -from .proxy import get_current_span +from .interceptors import Interceptor +from .proxy import SpanObject +from ._runtime_context import get_current_span +from ._runtime_context import RuntimeContext from ._version import __version__ @@ -13,8 +18,13 @@ "create_trace_event", "get_current_span", "get_info", + "get_tracer", + "get_interceptors", "init_tracing", "bind_to_trace", + "Interceptor", "trace_event", + "SpanObject", + "RuntimeContext", "__version__", ] diff --git a/tracely/src/tracely/_context.py b/tracely/src/tracely/_context.py new file mode 100644 index 0000000..f316078 --- /dev/null +++ b/tracely/src/tracely/_context.py @@ -0,0 +1,83 @@ +import dataclasses +import typing +import uuid +from typing import Dict +from typing import List +from typing import Optional +from typing import Union + +import opentelemetry +from opentelemetry import trace +from opentelemetry.context import Context +from opentelemetry.trace import NonRecordingSpan +from opentelemetry.trace import SpanContext +from opentelemetry.trace import TraceFlags + +if typing.TYPE_CHECKING: + from .interceptors import Interceptor + + +@dataclasses.dataclass +class UsageDetails: + cost_per_token: Dict[str, float] + + +class DataContext: + export_id: Union[str, uuid.UUID] + project_id: Union[str, uuid.UUID] + default_usage_details: Optional[UsageDetails] + usage_details_by_model_id: Optional[Dict[str, UsageDetails]] + interceptors: List["Interceptor"] + + def __init__( + self, + export_id: str, + project_id: str, + default_usage_details: Optional[UsageDetails] = None, + usage_details_by_model_id: Optional[Dict[str, UsageDetails]] = None, + interceptors: Optional[List["Interceptor"]] = None, + ): + self.export_id = export_id + self.project_id = project_id + self.default_usage_details = default_usage_details + self.usage_details_by_model_id = usage_details_by_model_id + self.interceptors = interceptors or [] + + def get_model_usage_details(self, model_id: str) -> Optional[UsageDetails]: + if self.usage_details_by_model_id is None: + return self.default_usage_details + return self.usage_details_by_model_id.get(model_id, self.default_usage_details) + + +_tracer: Optional[trace.Tracer] = None +_context: Optional[Context] = None +_data_context: DataContext = DataContext("", "") + + +def set_tracer(new_tracer: trace.Tracer) -> None: + global _tracer + _tracer = new_tracer + + +def get_tracer() -> Optional[trace.Tracer]: + return _tracer + + +def get_info(): + return { + "export_id": _data_context.export_id, + "project_id": _data_context.project_id, + } + + +def get_interceptors() -> List["Interceptor"]: + return _data_context.interceptors + + +def create_context(trace_id: int, parent_span_id: Optional[int]): + if parent_span_id is None: + generator = opentelemetry.sdk.trace.RandomIdGenerator() + parent_span_id = generator.generate_span_id() + span_context = SpanContext(trace_id=trace_id, span_id=parent_span_id, is_remote=True, trace_flags=TraceFlags(0x01)) + context = opentelemetry.trace.set_span_in_context(NonRecordingSpan(span_context)) + return context diff --git a/tracely/src/tracely/_runtime_context.py b/tracely/src/tracely/_runtime_context.py new file mode 100644 index 0000000..cdfa364 --- /dev/null +++ b/tracely/src/tracely/_runtime_context.py @@ -0,0 +1,40 @@ +from typing import Optional + +from .proxy import SpanObject + + +class RuntimeContext: + def __init__(self): + self.span = None + + def set_current_span(self, span: Optional[SpanObject]): + self.span = span + + def get_current_span(self) -> Optional[SpanObject]: + return self.span + + def reset_span(self): + self.span = None + + +_DEFAULT_CONTEXT = RuntimeContext() + + +def get_current_span(context: Optional[RuntimeContext] = None) -> Optional[SpanObject]: + if context is None: + return _DEFAULT_CONTEXT.get_current_span() + return context.get_current_span() + + +def set_current_span(span: Optional[SpanObject], context: Optional[RuntimeContext] = None): + if context is None: + _DEFAULT_CONTEXT.set_current_span(span) + else: + context.set_current_span(span) + + +def reset_span(context: Optional[RuntimeContext] = None): + if context is None: + _DEFAULT_CONTEXT.reset_span() + else: + context.reset_span() diff --git a/tracely/src/tracely/_tracer_provider.py b/tracely/src/tracely/_tracer_provider.py index d5fd180..90b1bfd 100644 --- a/tracely/src/tracely/_tracer_provider.py +++ b/tracely/src/tracely/_tracer_provider.py @@ -1,14 +1,13 @@ -import dataclasses import urllib.parse import uuid from typing import Dict +from typing import List from typing import Optional from typing import Union import opentelemetry.trace import requests from opentelemetry import trace -from opentelemetry.context import Context from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SimpleSpanProcessor @@ -16,6 +15,9 @@ from opentelemetry.trace import SpanContext from opentelemetry.trace import TraceFlags +from ._context import UsageDetails +from ._context import _data_context +from ._context import set_tracer from ._env import ( _EVIDENTLY_API_KEY, _TRACE_COLLECTOR_ADDRESS, @@ -25,40 +27,7 @@ _TRACE_COLLECTOR_PROJECT_ID, ) from .evidently_cloud_client import EvidentlyCloudClient - - -@dataclasses.dataclass -class UsageDetails: - cost_per_token: Dict[str, float] - - -class DataContext: - export_id: Union[str, uuid.UUID] - project_id: Union[str, uuid.UUID] - default_usage_details: Optional[UsageDetails] - usage_details_by_model_id: Optional[Dict[str, UsageDetails]] - - def __init__( - self, - export_id: str, - project_id: str, - default_usage_details: Optional[UsageDetails] = None, - usage_details_by_model_id: Optional[Dict[str, UsageDetails]] = None, - ): - self.export_id = export_id - self.project_id = project_id - self.default_usage_details = default_usage_details - self.usage_details_by_model_id = usage_details_by_model_id - - def get_model_usage_details(self, model_id: str) -> Optional[UsageDetails]: - if self.usage_details_by_model_id is None: - return self.default_usage_details - return self.usage_details_by_model_id.get(model_id, self.default_usage_details) - - -_tracer: Optional[trace.Tracer] = None -_context: Optional[Context] = None -_data_context: DataContext = DataContext("", "") +from .interceptors import Interceptor def _create_tracer_provider( @@ -70,6 +39,7 @@ def _create_tracer_provider( export_name: Optional[str] = None, default_usage_details: Optional[UsageDetails] = None, usage_details_by_model_id: Optional[Dict[str, UsageDetails]] = None, + interceptors: Optional[List[Interceptor]] = None, ) -> trace.TracerProvider: """ Creates Evidently telemetry tracer provider which would be used for sending traces. @@ -80,7 +50,6 @@ def _create_tracer_provider( project_id: id of project in Evidently Cloud export_name: string name of exported data, all data with same id would be grouped into single dataset """ - global _tracer # noqa: PLW0603 _address = address or _TRACE_COLLECTOR_ADDRESS if len(_address) == 0: @@ -140,6 +109,7 @@ def _create_tracer_provider( _data_context.project_id = uuid.UUID(_project_id) _data_context.default_usage_details = default_usage_details _data_context.usage_details_by_model_id = usage_details_by_model_id + _data_context.interceptors = interceptors or [] tracer_provider = TracerProvider( resource=Resource.create( @@ -181,7 +151,7 @@ def _create_tracer_provider( tracer_provider.add_span_processor(SimpleSpanProcessor(exporter)) else: raise ValueError(f"Unexpected processor type: {processor_type}. Expected values: batch or simple") - _tracer = tracer_provider.get_tracer("evidently") + set_tracer(tracer_provider.get_tracer("evidently")) return tracer_provider @@ -196,6 +166,7 @@ def init_tracing( processor_type: str = "batch", default_usage_details: Optional[UsageDetails] = None, usage_details_by_model_id: Optional[Dict[str, UsageDetails]] = None, + interceptors: Optional[List[Interceptor]] = None, ) -> trace.TracerProvider: """ Initialize Evidently tracing @@ -213,9 +184,9 @@ def init_tracing( 'simple' - upload traces synchronously as it is reported, can cause performance issues. default_usage_details: usage data for tokens usage_details_by_model_id: usage data for tokens by model id (if provided) + interceptors: list of interceptors to use """ - global _tracer # noqa: PLW0603 provider = _create_tracer_provider( address, exporter_type, @@ -225,22 +196,17 @@ def init_tracing( export_name, default_usage_details, usage_details_by_model_id, + interceptors, ) if as_global: trace.set_tracer_provider(provider) - _tracer = trace.get_tracer("evidently") + set_tracer(trace.get_tracer("evidently")) else: - _tracer = provider.get_tracer("evidently") + set_tracer(provider.get_tracer("evidently")) return provider -def get_tracer() -> trace.Tracer: - if _tracer is None: - raise ValueError("TracerProvider not initialized, use init_tracer()") - return _tracer - - def create_context(trace_id: int, parent_span_id: Optional[int]): if parent_span_id is None: generator = opentelemetry.sdk.trace.RandomIdGenerator() @@ -248,10 +214,3 @@ def create_context(trace_id: int, parent_span_id: Optional[int]): span_context = SpanContext(trace_id=trace_id, span_id=parent_span_id, is_remote=True, trace_flags=TraceFlags(0x01)) context = opentelemetry.trace.set_span_in_context(NonRecordingSpan(span_context)) return context - - -def get_info(): - return { - "export_id": _data_context.export_id, - "project_id": _data_context.project_id, - } diff --git a/tracely/src/tracely/context.py b/tracely/src/tracely/context.py index e4dd6e8..4e1f8dd 100644 --- a/tracely/src/tracely/context.py +++ b/tracely/src/tracely/context.py @@ -4,12 +4,15 @@ import opentelemetry.sdk.trace -from . import _tracer_provider -from .proxy import _ProxySpanObject +from ._context import get_tracer +from ._context import create_context +from ._runtime_context import get_current_span +from ._runtime_context import set_current_span +from .proxy import SpanObject @contextmanager -def create_trace_event(name: str, **params) -> Generator[_ProxySpanObject, None, None]: +def create_trace_event(name: str, **params) -> Generator[SpanObject, None, None]: """ Create a span with given name. @@ -20,19 +23,25 @@ def create_trace_event(name: str, **params) -> Generator[_ProxySpanObject, None, Returns: span object to work with """ - _tracer = _tracer_provider.get_tracer() + _tracer = get_tracer() + if _tracer is None: + raise ValueError("tracer not initialized") with _tracer.start_as_current_span(f"{name}") as span: - obj = _ProxySpanObject(span) + obj = SpanObject(span) + prev_span = get_current_span() + set_current_span(obj) + try: yield obj finally: for attr, value in params.items(): span.set_attribute(attr, value) + set_current_span(prev_span) @contextmanager def bind_to_trace(trace_id: int, parent_span_id: Optional[int] = None): - context = _tracer_provider.create_context(trace_id, parent_span_id) + context = create_context(trace_id, parent_span_id) token = opentelemetry.sdk.trace.context_api.attach(context) try: yield diff --git a/tracely/src/tracely/decorators.py b/tracely/src/tracely/decorators.py index 110da41..d0fa42a 100644 --- a/tracely/src/tracely/decorators.py +++ b/tracely/src/tracely/decorators.py @@ -5,9 +5,13 @@ from opentelemetry.trace import StatusCode import tracely -from . import _tracer_provider +from ._context import get_interceptors +from ._context import get_tracer +from .proxy import SpanObject from .proxy import set_result -from .proxy import _ProxySpanObject +from ._runtime_context import get_current_span +from ._runtime_context import set_current_span +from .interceptors import InterceptorContext def _fill_span_from_signature( @@ -15,7 +19,7 @@ def _fill_span_from_signature( ignore_args: Optional[List[str]], sign: Signature, bind: BoundArguments, - span: _ProxySpanObject, + span: SpanObject, ): final_args = track_args if final_args is None: @@ -60,16 +64,25 @@ async def func(*args, **kwargs): sign = inspect.signature(f) bind = sign.bind(*args, **kwargs) + interceptor_context = InterceptorContext() with tracely.create_trace_event(f"{span_name or f.__name__}", parse_output) as span: _fill_span_from_signature(track_args, ignore_args, bind.signature, bind, span) + for interceptor in get_interceptors(): + interceptor.before_call(span, interceptor_context, *args, **kwargs) try: result = await f(*args, **kwargs) if result is not None and track_output: span.set_result(result) + for interceptor in get_interceptors(): + interceptor.after_call(span, interceptor_context, result) span.set_status(StatusCode.OK) except Exception as e: - span.set_attribute("exception", str(e)) - span.set_status(StatusCode.ERROR) + processed = False + for interceptor in get_interceptors(): + processed = processed or interceptor.on_exception(span, interceptor_context, e) + if not processed: + span.set_attribute("exception", str(e)) + span.set_status(StatusCode.ERROR) raise return result @@ -80,20 +93,34 @@ async def func(*args, **kwargs): def func(*args, **kwargs): import inspect - _tracer = _tracer_provider.get_tracer() + _tracer = get_tracer() sign = inspect.signature(f) bind = sign.bind(*args, **kwargs) - with _tracer.start_as_current_span(f"{span_name or f.__name__}") as span: + interceptor_context = InterceptorContext() + with _tracer.start_as_current_span(f"{span_name or f.__name__}") as otel_span: + prev_span = get_current_span() + span = SpanObject(otel_span) + set_current_span(span) _fill_span_from_signature(track_args, ignore_args, bind.signature, bind, span) + for interceptor in get_interceptors(): + interceptor.before_call(span, interceptor_context, *args, **kwargs) try: result = f(*args, **kwargs) if result is not None and track_output: set_result(span, result, parse_output) + for interceptor in get_interceptors(): + interceptor.after_call(span, interceptor_context, result) span.set_status(StatusCode.OK) except Exception as e: - span.set_attribute("exception", str(e)) - span.set_status(StatusCode.ERROR) + processed = False + for interceptor in get_interceptors(): + processed = processed or interceptor.on_exception(span, interceptor_context, e) + if not processed: + span.set_attribute("exception", str(e)) + span.set_status(StatusCode.ERROR) raise + finally: + set_current_span(prev_span) return result return func diff --git a/tracely/src/tracely/interceptors.py b/tracely/src/tracely/interceptors.py new file mode 100644 index 0000000..8f44079 --- /dev/null +++ b/tracely/src/tracely/interceptors.py @@ -0,0 +1,32 @@ +import abc +from typing import Any +from typing import Dict + +from tracely.proxy import SpanObject + + +class InterceptorContext: + data: Dict[str, Any] + + def __init__(self): + self.data = {} + + def set(self, key: str, value): + self.data[key] = value + + def get(self, key: str): + return self.data.get(key) + + +class Interceptor: + @abc.abstractmethod + def before_call(self, span: SpanObject, context: InterceptorContext, *args, **kwargs): + pass + + @abc.abstractmethod + def after_call(self, span: SpanObject, context: InterceptorContext, return_value): + pass + + @abc.abstractmethod + def on_exception(self, span: SpanObject, context: InterceptorContext, ex: Exception) -> bool: + pass diff --git a/tracely/src/tracely/proxy.py b/tracely/src/tracely/proxy.py index 4660731..061d00f 100644 --- a/tracely/src/tracely/proxy.py +++ b/tracely/src/tracely/proxy.py @@ -10,8 +10,11 @@ from openai.types.responses import ResponseUsage -class _ProxySpanObject: +class SpanObject: + context: Dict[str, typing.Any] + def __init__(self, span: Optional[opentelemetry.trace.Span] = None): + self.context = {} if span is None: self.span = opentelemetry.trace.get_current_span() else: @@ -45,6 +48,15 @@ def update_usage( raise ValueError("Must specify either tokens or usage") self._update_usage(tokens=tokens, costs=costs) + def set_context_value(self, key, value): + self.context[key] = value + + def get_context_value(self, key): + return self.context.get(key) + + def get_context(self): + return self.context + def _update_usage( self, *, @@ -73,9 +85,8 @@ def _update_usage_openai(self, usage: "ResponseUsage"): }, ) - -def get_current_span(): - return _ProxySpanObject() + def set_status(self, status): + self.span.set_status(status) def set_result(span, result, parse_output: bool): diff --git a/tracely/tests/test_interceptors.py b/tracely/tests/test_interceptors.py new file mode 100644 index 0000000..e0d9e15 --- /dev/null +++ b/tracely/tests/test_interceptors.py @@ -0,0 +1,109 @@ +from functools import wraps +from uuid import UUID + +import pytest +import opentelemetry +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from tracely import init_tracing +from tracely import trace_event +from tracely import get_current_span +from tracely.interceptors import Interceptor + + +class MyException(Exception): + message = "Exception message" + + def __init__(self, value): + self.message = value + + +class AddAttributeBeforeCallInterceptor(Interceptor): + def before_call(self, span, context, *args, **kwargs): + pass + + def after_call(self, span, context, *args, **kwargs): + value = span.get_context_value("deco") + if value is not None: + span.set_attribute("deco", value) + span.set_attribute("status", "passed") + + def on_exception(self, span, context, exception) -> bool: + value = span.get_context_value("deco") + if value is not None: + if isinstance(exception, MyException): + span.set_attribute("deco", value) + span.set_attribute("status", "failed") + span.set_attribute("error", exception.message) + return True + return False + + +@pytest.fixture +def exporter(): + provider = init_tracing( + exporter_type="console", + processor_type="simple", + project_id=UUID(int=0), + export_name="test", + as_global=False, + interceptors=[AddAttributeBeforeCallInterceptor()], + ) + exporter = InMemorySpanExporter() + if isinstance(provider, opentelemetry.sdk.trace.TracerProvider): + provider.add_span_processor(SimpleSpanProcessor(exporter)) + return exporter + + +def stub_deco(fail, message): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + span = get_current_span() + if span: + span.set_context_value("deco", "test_deco") + if fail: + raise MyException(message) + return func(*args, **kwargs) + + return wrapper + + return decorator + + +@trace_event(track_output=True) +@stub_deco(False, "message") +def trace_func_with_output(): + return 100 + + +@trace_event(track_output=True) +@stub_deco(True, "message") +def trace_func_with_output_failed_deco(): + return 100 + + +def test_trace_func_with_output(exporter): + result = trace_func_with_output() + + spans = exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "trace_func_with_output" + assert span.attributes["deco"] == "test_deco" + assert span.attributes["status"] == "passed" + assert span.attributes["result"] == result + + +def test_trace_func_with_output_failed_deco(exporter): + with pytest.raises(MyException): + trace_func_with_output_failed_deco() + + spans = exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "trace_func_with_output_failed_deco" + assert span.attributes["deco"] == "test_deco" + assert span.attributes["status"] == "failed" + assert span.attributes["error"] == "message" diff --git a/tracely/tests/test_spans.py b/tracely/tests/test_spans.py index d21e294..30b8f56 100644 --- a/tracely/tests/test_spans.py +++ b/tracely/tests/test_spans.py @@ -24,6 +24,11 @@ def trace_func_with_output_struct(): } +@trace_event() +def trace_func_with_inner_trace(): + return trace_func_with_output() + + @trace_event(track_output=True, parse_output=True) def trace_func_with_tokens(): span = tracely.get_current_span() @@ -155,3 +160,12 @@ def test_trace_func_with_tokens(exporter): assert span.attributes["tokens.output"] == 200 assert span.attributes["cost.input"] == 0.1 assert span.attributes["cost.output"] == 1.0 + + +def test_trace_func_with_inner_trace(exporter): + trace_func_with_inner_trace() + + spans = exporter.get_finished_spans() + assert len(spans) == 2 + assert spans[0].name == "trace_func_with_output" + assert spans[1].name == "trace_func_with_inner_trace"