Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pip install taskiq-faststream[nats]
pip install taskiq-faststream[redis]
```

For **OpenTelemetry** distributed tracing support:

```bash
pip install taskiq-faststream[otel]
```

## Usage

The package gives you two classes: `AppWrapper` and `BrokerWrapper`
Expand Down Expand Up @@ -141,3 +147,37 @@ taskiq_broker.task(
...,
)
```

## OpenTelemetry Support

**taskiq-faststream** supports distributed tracing with OpenTelemetry. To enable it, install the `otel` extra and pass `enable_otel=True` when creating the broker wrapper:

```python
from faststream.nats import NatsBroker
from taskiq_faststream import BrokerWrapper

broker = NatsBroker()

# Enable OpenTelemetry middleware
taskiq_broker = BrokerWrapper(broker, enable_otel=True)
```

This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
- Task execution spans
- Task dependencies and call chains
- Performance metrics
- Error tracking

Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup.

The same applies to `AppWrapper`:

```python
from faststream import FastStream
from taskiq_faststream import AppWrapper

app = FastStream(broker)

# Enable OpenTelemetry middleware
taskiq_broker = AppWrapper(app, enable_otel=True)
```
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "taskiq-faststream"
version = "0.3.2"
version = "0.4.0"
description = "FastStream - taskiq integration to schedule FastStream tasks"
readme = "README.md"
license = "MIT"
Expand Down Expand Up @@ -40,7 +40,7 @@ classifiers = [


dependencies = [
"taskiq>=0.11.0,<0.12.0",
"taskiq>=0.12.1,<0.13.0",
"faststream>=0.3.14,<0.7",
]

Expand All @@ -65,6 +65,10 @@ redis = [
"faststream[redis]"
]

otel = [
"taskiq[opentelemetry]>=0.12.1,<0.13.0"
]

[dependency-groups]
test = [
"taskiq-faststream[nats]",
Expand Down
51 changes: 49 additions & 2 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from taskiq_faststream.types import ScheduledTask
from taskiq_faststream.utils import resolve_msg

try:
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
except ImportError:
OpenTelemetryMiddleware = None # type: ignore[assignment,misc]

PublishParameters: TypeAlias = typing.Any


Expand All @@ -30,11 +35,32 @@ class BrokerWrapper(AsyncBroker):
task : Register FastStream scheduled task.
"""

def __init__(self, broker: Any) -> None:
def __init__(
self,
broker: Any,
*,
enable_otel: bool = False,
) -> None:
"""Initialize BrokerWrapper.

Args:
broker: FastStream broker instance to wrap.
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
Requires taskiq[otel] to be installed.
"""
super().__init__()
self.formatter = PatchedFormatter()
self.broker = broker

if enable_otel:
if OpenTelemetryMiddleware is None:
msg = (
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
"Install it with: pip install taskiq-faststream[otel]"
)
raise ImportError(msg)
self.middlewares.append(OpenTelemetryMiddleware())

async def startup(self) -> None:
"""Startup wrapped FastStream broker."""
await super().startup()
Expand Down Expand Up @@ -105,11 +131,32 @@ class AppWrapper(BrokerWrapper):
task : Register FastStream scheduled task.
"""

def __init__(self, app: Application) -> None:
def __init__(
self,
app: Application,
*,
enable_otel: bool = False,
) -> None:
"""Initialize AppWrapper.

Args:
app: FastStream application instance to wrap.
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
Requires taskiq[otel] to be installed.
"""
super(BrokerWrapper, self).__init__()
self.formatter = PatchedFormatter()
self.app = app

if enable_otel:
if OpenTelemetryMiddleware is None:
msg = (
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
"Install it with: pip install taskiq-faststream[otel]"
)
raise ImportError(msg)
self.middlewares.append(OpenTelemetryMiddleware())

async def startup(self) -> None:
"""Startup wrapped FastStream."""
await super(BrokerWrapper, self).startup()
Expand Down