From 93619faccec35e9239f067f4ee0efb09bada0a6b Mon Sep 17 00:00:00 2001 From: yann-combarnous <39089766+yann-combarnous@users.noreply.github.com> Date: Wed, 24 Dec 2025 11:25:24 +0100 Subject: [PATCH 1/4] Update pyproject --- pyproject.toml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4ba4061..65aab82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "taskiq-faststream" -version = "0.3.2" +version = "0.4.0" description = "FastStream - taskiq integration to schedule FastStream tasks" readme = "README.md" license = "MIT" @@ -40,7 +40,7 @@ classifiers = [ dependencies = [ - "taskiq>=0.11.0,<0.12.0", + "taskiq>=0.12.1,<0.13.0", "faststream>=0.3.14,<0.7", ] @@ -65,6 +65,10 @@ redis = [ "faststream[redis]" ] +otel = [ + "taskiq[otel]>=0.12.1,<0.13.0" +] + [dependency-groups] test = [ "taskiq-faststream[nats]", From 5574e57763ab9f2acfde979321551b6b0cb0bbbe Mon Sep 17 00:00:00 2001 From: yann-combarnous <39089766+yann-combarnous@users.noreply.github.com> Date: Wed, 24 Dec 2025 11:25:58 +0100 Subject: [PATCH 2/4] Update broker.py --- taskiq_faststream/broker.py | 51 +++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f5a6009..bb7739a 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -13,6 +13,11 @@ from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg +try: + from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware +except ImportError: + OpenTelemetryMiddleware = None # type: ignore[assignment,misc] + PublishParameters: TypeAlias = typing.Any @@ -30,11 +35,32 @@ class BrokerWrapper(AsyncBroker): task : Register FastStream scheduled task. """ - def __init__(self, broker: Any) -> None: + def __init__( + self, + broker: Any, + *, + enable_otel: bool = False, + ) -> None: + """Initialize BrokerWrapper. + + Args: + broker: FastStream broker instance to wrap. + enable_otel: Enable OpenTelemetry middleware for distributed tracing. + Requires taskiq[otel] to be installed. + """ super().__init__() self.formatter = PatchedFormatter() self.broker = broker + if enable_otel: + if OpenTelemetryMiddleware is None: + msg = ( + "OpenTelemetry middleware requires taskiq[otel] to be installed. " + "Install it with: pip install taskiq-faststream[otel]" + ) + raise ImportError(msg) + self.middlewares.append(OpenTelemetryMiddleware()) + async def startup(self) -> None: """Startup wrapped FastStream broker.""" await super().startup() @@ -105,11 +131,32 @@ class AppWrapper(BrokerWrapper): task : Register FastStream scheduled task. """ - def __init__(self, app: Application) -> None: + def __init__( + self, + app: Application, + *, + enable_otel: bool = False, + ) -> None: + """Initialize AppWrapper. + + Args: + app: FastStream application instance to wrap. + enable_otel: Enable OpenTelemetry middleware for distributed tracing. + Requires taskiq[otel] to be installed. + """ super(BrokerWrapper, self).__init__() self.formatter = PatchedFormatter() self.app = app + if enable_otel: + if OpenTelemetryMiddleware is None: + msg = ( + "OpenTelemetry middleware requires taskiq[otel] to be installed. " + "Install it with: pip install taskiq-faststream[otel]" + ) + raise ImportError(msg) + self.middlewares.append(OpenTelemetryMiddleware()) + async def startup(self) -> None: """Startup wrapped FastStream.""" await super(BrokerWrapper, self).startup() From c70d755a87198f0c2c62b7c12c60ceb14b0b03c8 Mon Sep 17 00:00:00 2001 From: yann-combarnous <39089766+yann-combarnous@users.noreply.github.com> Date: Wed, 24 Dec 2025 11:28:34 +0100 Subject: [PATCH 3/4] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 65aab82..2407514 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ redis = [ ] otel = [ - "taskiq[otel]>=0.12.1,<0.13.0" + "taskiq[opentelemetry]>=0.12.1,<0.13.0" ] [dependency-groups] From 3b492c3d09ae2480bda20409e60d279098ae6dcb Mon Sep 17 00:00:00 2001 From: yann-combarnous <39089766+yann-combarnous@users.noreply.github.com> Date: Wed, 24 Dec 2025 11:31:03 +0100 Subject: [PATCH 4/4] Update README.md --- README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/README.md b/README.md index ecf9bc9..9d5804f 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,12 @@ pip install taskiq-faststream[nats] pip install taskiq-faststream[redis] ``` +For **OpenTelemetry** distributed tracing support: + +```bash +pip install taskiq-faststream[otel] +``` + ## Usage The package gives you two classes: `AppWrapper` and `BrokerWrapper` @@ -141,3 +147,37 @@ taskiq_broker.task( ..., ) ``` + +## OpenTelemetry Support + +**taskiq-faststream** supports distributed tracing with OpenTelemetry. To enable it, install the `otel` extra and pass `enable_otel=True` when creating the broker wrapper: + +```python +from faststream.nats import NatsBroker +from taskiq_faststream import BrokerWrapper + +broker = NatsBroker() + +# Enable OpenTelemetry middleware +taskiq_broker = BrokerWrapper(broker, enable_otel=True) +``` + +This will automatically add OpenTelemetry middleware to track task execution, providing insights into: +- Task execution spans +- Task dependencies and call chains +- Performance metrics +- Error tracking + +Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup. + +The same applies to `AppWrapper`: + +```python +from faststream import FastStream +from taskiq_faststream import AppWrapper + +app = FastStream(broker) + +# Enable OpenTelemetry middleware +taskiq_broker = AppWrapper(app, enable_otel=True) +```