diff --git a/README.md b/README.md index ecf9bc9..9d5804f 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,12 @@ pip install taskiq-faststream[nats] pip install taskiq-faststream[redis] ``` +For **OpenTelemetry** distributed tracing support: + +```bash +pip install taskiq-faststream[otel] +``` + ## Usage The package gives you two classes: `AppWrapper` and `BrokerWrapper` @@ -141,3 +147,37 @@ taskiq_broker.task( ..., ) ``` + +## OpenTelemetry Support + +**taskiq-faststream** supports distributed tracing with OpenTelemetry. To enable it, install the `otel` extra and pass `enable_otel=True` when creating the broker wrapper: + +```python +from faststream.nats import NatsBroker +from taskiq_faststream import BrokerWrapper + +broker = NatsBroker() + +# Enable OpenTelemetry middleware +taskiq_broker = BrokerWrapper(broker, enable_otel=True) +``` + +This will automatically add OpenTelemetry middleware to track task execution, providing insights into: +- Task execution spans +- Task dependencies and call chains +- Performance metrics +- Error tracking + +Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup. + +The same applies to `AppWrapper`: + +```python +from faststream import FastStream +from taskiq_faststream import AppWrapper + +app = FastStream(broker) + +# Enable OpenTelemetry middleware +taskiq_broker = AppWrapper(app, enable_otel=True) +``` diff --git a/pyproject.toml b/pyproject.toml index 4ba4061..2407514 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "taskiq-faststream" -version = "0.3.2" +version = "0.4.0" description = "FastStream - taskiq integration to schedule FastStream tasks" readme = "README.md" license = "MIT" @@ -40,7 +40,7 @@ classifiers = [ dependencies = [ - "taskiq>=0.11.0,<0.12.0", + "taskiq>=0.12.1,<0.13.0", "faststream>=0.3.14,<0.7", ] @@ -65,6 +65,10 @@ redis = [ "faststream[redis]" ] +otel = [ + "taskiq[opentelemetry]>=0.12.1,<0.13.0" +] + [dependency-groups] test = [ "taskiq-faststream[nats]", diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f5a6009..bb7739a 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -13,6 +13,11 @@ from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg +try: + from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware +except ImportError: + OpenTelemetryMiddleware = None # type: ignore[assignment,misc] + PublishParameters: TypeAlias = typing.Any @@ -30,11 +35,32 @@ class BrokerWrapper(AsyncBroker): task : Register FastStream scheduled task. """ - def __init__(self, broker: Any) -> None: + def __init__( + self, + broker: Any, + *, + enable_otel: bool = False, + ) -> None: + """Initialize BrokerWrapper. + + Args: + broker: FastStream broker instance to wrap. + enable_otel: Enable OpenTelemetry middleware for distributed tracing. + Requires taskiq[otel] to be installed. + """ super().__init__() self.formatter = PatchedFormatter() self.broker = broker + if enable_otel: + if OpenTelemetryMiddleware is None: + msg = ( + "OpenTelemetry middleware requires taskiq[otel] to be installed. " + "Install it with: pip install taskiq-faststream[otel]" + ) + raise ImportError(msg) + self.middlewares.append(OpenTelemetryMiddleware()) + async def startup(self) -> None: """Startup wrapped FastStream broker.""" await super().startup() @@ -105,11 +131,32 @@ class AppWrapper(BrokerWrapper): task : Register FastStream scheduled task. """ - def __init__(self, app: Application) -> None: + def __init__( + self, + app: Application, + *, + enable_otel: bool = False, + ) -> None: + """Initialize AppWrapper. + + Args: + app: FastStream application instance to wrap. + enable_otel: Enable OpenTelemetry middleware for distributed tracing. + Requires taskiq[otel] to be installed. + """ super(BrokerWrapper, self).__init__() self.formatter = PatchedFormatter() self.app = app + if enable_otel: + if OpenTelemetryMiddleware is None: + msg = ( + "OpenTelemetry middleware requires taskiq[otel] to be installed. " + "Install it with: pip install taskiq-faststream[otel]" + ) + raise ImportError(msg) + self.middlewares.append(OpenTelemetryMiddleware()) + async def startup(self) -> None: """Startup wrapped FastStream.""" await super(BrokerWrapper, self).startup()