Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lite_bootstrap/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from lite_bootstrap.bootstrappers.fastapi_bootstrapper import FastAPIBootstrapper, FastAPIConfig
from lite_bootstrap.bootstrappers.faststream_bootstrapper import FastStreamBootstrapper, FastStreamConfig
from lite_bootstrap.bootstrappers.free_bootstrapper import FreeBootstrapper, FreeBootstrapperConfig
from lite_bootstrap.bootstrappers.litestar_bootstrapper import LitestarBootstrapper, LitestarConfig


__all__ = [
"FastAPIBootstrapper",
"FastAPIConfig",
"FastStreamBootstrapper",
"FastStreamConfig",
"FreeBootstrapper",
"FreeBootstrapperConfig",
"LitestarBootstrapper",
Expand Down
3 changes: 0 additions & 3 deletions lite_bootstrap/bootstrappers/fastapi_bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,5 @@ class FastAPIBootstrapper(BaseBootstrapper[fastapi.FastAPI]):
bootstrap_config: FastAPIConfig
__slots__ = "bootstrap_config", "instruments"

def __init__(self, bootstrap_config: FastAPIConfig) -> None:
super().__init__(bootstrap_config)

def _prepare_application(self) -> fastapi.FastAPI:
return self.bootstrap_config.application
143 changes: 143 additions & 0 deletions lite_bootstrap/bootstrappers/faststream_bootstrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations
import contextlib
import dataclasses
import json
import typing

from lite_bootstrap.bootstrappers.base import BaseBootstrapper
from lite_bootstrap.instruments.healthchecks_instrument import HealthChecksConfig, HealthChecksInstrument
from lite_bootstrap.instruments.logging_instrument import LoggingConfig, LoggingInstrument
from lite_bootstrap.instruments.opentelemetry_instrument import OpentelemetryConfig, OpenTelemetryInstrument
from lite_bootstrap.instruments.prometheus_instrument import PrometheusConfig, PrometheusInstrument
from lite_bootstrap.instruments.sentry_instrument import SentryConfig, SentryInstrument


with contextlib.suppress(ImportError):
import faststream
import prometheus_client
from faststream.asgi import AsgiFastStream, AsgiResponse
from faststream.asgi import get as handle_get
from faststream.broker.core.usecase import BrokerUsecase
from opentelemetry.metrics import Meter, MeterProvider
from opentelemetry.trace import TracerProvider, get_tracer_provider


@typing.runtime_checkable
class FastStreamTelemetryMiddlewareProtocol(typing.Protocol):
def __init__(
self,
*,
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
meter: Meter | None = None,
) -> None: ...
def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401


@typing.runtime_checkable
class FastStreamPrometheusMiddlewareProtocol(typing.Protocol):
def __init__(
self,
*,
registry: prometheus_client.CollectorRegistry,
app_name: str = ...,
metrics_prefix: str = "faststream",
received_messages_size_buckets: typing.Sequence[float] | None = None,
) -> None: ...
def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401


@dataclasses.dataclass(kw_only=True, slots=True, frozen=True)
class FastStreamConfig(HealthChecksConfig, LoggingConfig, OpentelemetryConfig, PrometheusConfig, SentryConfig):
application: AsgiFastStream = dataclasses.field(default_factory=AsgiFastStream)
broker: BrokerUsecase[typing.Any, typing.Any] | None = None
opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = None
prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = None


@dataclasses.dataclass(kw_only=True, slots=True, frozen=True)
class FastStreamHealthChecksInstrument(HealthChecksInstrument):
bootstrap_config: FastStreamConfig

def bootstrap(self) -> None:
@handle_get
async def check_health(_: object) -> AsgiResponse:
return (
AsgiResponse(
json.dumps(self.render_health_check_data()).encode(), 200, headers={"content-type": "text/plain"}
)
if await self._define_health_status()
else AsgiResponse(b"Service is unhealthy", 500, headers={"content-type": "application/json"})
)

self.bootstrap_config.application.mount(self.bootstrap_config.health_checks_path, check_health)

async def _define_health_status(self) -> bool:
if not self.bootstrap_config.application or not self.bootstrap_config.application.broker:
return False

return await self.bootstrap_config.application.broker.ping(timeout=5)


@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamLoggingInstrument(LoggingInstrument):
bootstrap_config: FastStreamConfig


@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamOpenTelemetryInstrument(OpenTelemetryInstrument):
bootstrap_config: FastStreamConfig

def is_ready(self) -> bool:
return bool(self.bootstrap_config.opentelemetry_middleware_cls and super().is_ready())

def bootstrap(self) -> None:
if self.bootstrap_config.opentelemetry_middleware_cls and self.bootstrap_config.application.broker:
self.bootstrap_config.application.broker.add_middleware(
self.bootstrap_config.opentelemetry_middleware_cls(tracer_provider=get_tracer_provider())
)


@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamSentryInstrument(SentryInstrument):
bootstrap_config: FastStreamConfig


@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamPrometheusInstrument(PrometheusInstrument):
bootstrap_config: FastStreamConfig
collector_registry: prometheus_client.CollectorRegistry = dataclasses.field(
default_factory=prometheus_client.CollectorRegistry, init=False
)

def is_ready(self) -> bool:
return bool(self.bootstrap_config.prometheus_middleware_cls and super().is_ready())

def bootstrap(self) -> None:
self.bootstrap_config.application.mount(
self.bootstrap_config.prometheus_metrics_path, prometheus_client.make_asgi_app(self.collector_registry)
)
if self.bootstrap_config.prometheus_middleware_cls and self.bootstrap_config.application.broker:
self.bootstrap_config.application.broker.add_middleware(
self.bootstrap_config.prometheus_middleware_cls(registry=self.collector_registry)
)


class FastStreamBootstrapper(BaseBootstrapper[AsgiFastStream]):
instruments_types: typing.ClassVar = [
FastStreamOpenTelemetryInstrument,
FastStreamSentryInstrument,
FastStreamHealthChecksInstrument,
FastStreamLoggingInstrument,
FastStreamPrometheusInstrument,
]
bootstrap_config: FastStreamConfig
__slots__ = "bootstrap_config", "instruments"

def __init__(self, bootstrap_config: FastStreamConfig) -> None:
super().__init__(bootstrap_config)
if self.bootstrap_config.broker:
self.bootstrap_config.application.broker = self.bootstrap_config.broker

def _prepare_application(self) -> AsgiFastStream:
return self.bootstrap_config.application
18 changes: 17 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,29 @@ litestar = [
litestar-otl = [
"opentelemetry-instrumentation-asgi>=0.46b0",
]
litestar-metrics = [
"prometheus-client>=0.20",
]
litestar-all = [
"lite-bootstrap[sentry,otl,logging,litestar,litestar-otl]"
"lite-bootstrap[sentry,otl,logging,litestar,litestar-otl,litestar-metrics]"
]
faststream = [
"faststream",
]
faststream-metrics = [
"prometheus-client>=0.20",
]
faststream-all = [
"lite-bootstrap[sentry,otl,logging,faststream,faststream-metrics]"
]

[dependency-groups]
dev = [
"pytest",
"pytest-cov",
"pytest-asyncio",
"httpx", # for test client
"redis>=5.2.1",
"mypy",
"ruff",
]
Expand Down Expand Up @@ -117,6 +131,8 @@ isort.no-lines-before = ["standard-library", "local-folder"]

[tool.pytest.ini_options]
addopts = "--cov=. --cov-report term-missing"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.coverage.report]
exclude_also = ["if typing.TYPE_CHECKING:"]
78 changes: 78 additions & 0 deletions tests/test_faststream_bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pytest
import structlog
from faststream.redis import RedisBroker, TestRedisBroker
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
from faststream.redis.prometheus import RedisPrometheusMiddleware
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from starlette import status
from starlette.testclient import TestClient

from lite_bootstrap import FastStreamBootstrapper, FastStreamConfig
from tests.conftest import CustomInstrumentor


logger = structlog.getLogger(__name__)


@pytest.fixture
def broker() -> RedisBroker:
return RedisBroker()


async def test_faststream_bootstrap(broker: RedisBroker) -> None:
prometheus_metrics_path = "/test-metrics-path"
health_check_path = "/custom-health-check-path"
bootstrapper = FastStreamBootstrapper(
bootstrap_config=FastStreamConfig(
broker=broker,
service_name="microservice",
service_version="2.0.0",
service_environment="test",
service_debug=False,
opentelemetry_endpoint="otl",
opentelemetry_instrumentors=[CustomInstrumentor()],
opentelemetry_span_exporter=ConsoleSpanExporter(),
opentelemetry_middleware_cls=RedisTelemetryMiddleware,
prometheus_metrics_path=prometheus_metrics_path,
prometheus_middleware_cls=RedisPrometheusMiddleware,
sentry_dsn="https://testdsn@localhost/1",
health_checks_path=health_check_path,
logging_buffer_capacity=0,
),
)
application = bootstrapper.bootstrap()
logger.info("testing logging", key="value")
test_client = TestClient(app=application)

async with TestRedisBroker(broker):
response = test_client.get(prometheus_metrics_path)
assert response.status_code == status.HTTP_200_OK

response = test_client.get(health_check_path)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"health_status": True, "service_name": "microservice", "service_version": "2.0.0"}


async def test_faststream_bootstrap_health_check_wo_broker() -> None:
health_check_path = "/custom-health-check-path"
bootstrapper = FastStreamBootstrapper(
bootstrap_config=FastStreamConfig(
service_name="microservice",
service_version="2.0.0",
service_environment="test",
service_debug=False,
opentelemetry_endpoint="otl",
opentelemetry_instrumentors=[CustomInstrumentor()],
opentelemetry_span_exporter=ConsoleSpanExporter(),
sentry_dsn="https://testdsn@localhost/1",
health_checks_path=health_check_path,
logging_buffer_capacity=0,
),
)
application = bootstrapper.bootstrap()
logger.info("testing logging", key="value")
test_client = TestClient(app=application)

response = test_client.get(health_check_path)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert response.text == "Service is unhealthy"