diff --git a/lite_bootstrap/__init__.py b/lite_bootstrap/__init__.py index 8dc9383..59156c8 100644 --- a/lite_bootstrap/__init__.py +++ b/lite_bootstrap/__init__.py @@ -1,4 +1,5 @@ from lite_bootstrap.bootstrappers.fastapi_bootstrapper import FastAPIBootstrapper, FastAPIConfig +from lite_bootstrap.bootstrappers.faststream_bootstrapper import FastStreamBootstrapper, FastStreamConfig from lite_bootstrap.bootstrappers.free_bootstrapper import FreeBootstrapper, FreeBootstrapperConfig from lite_bootstrap.bootstrappers.litestar_bootstrapper import LitestarBootstrapper, LitestarConfig @@ -6,6 +7,8 @@ __all__ = [ "FastAPIBootstrapper", "FastAPIConfig", + "FastStreamBootstrapper", + "FastStreamConfig", "FreeBootstrapper", "FreeBootstrapperConfig", "LitestarBootstrapper", diff --git a/lite_bootstrap/bootstrappers/fastapi_bootstrapper.py b/lite_bootstrap/bootstrappers/fastapi_bootstrapper.py index 9e8fcd1..682750f 100644 --- a/lite_bootstrap/bootstrappers/fastapi_bootstrapper.py +++ b/lite_bootstrap/bootstrappers/fastapi_bootstrapper.py @@ -112,8 +112,5 @@ class FastAPIBootstrapper(BaseBootstrapper[fastapi.FastAPI]): bootstrap_config: FastAPIConfig __slots__ = "bootstrap_config", "instruments" - def __init__(self, bootstrap_config: FastAPIConfig) -> None: - super().__init__(bootstrap_config) - def _prepare_application(self) -> fastapi.FastAPI: return self.bootstrap_config.application diff --git a/lite_bootstrap/bootstrappers/faststream_bootstrapper.py b/lite_bootstrap/bootstrappers/faststream_bootstrapper.py new file mode 100644 index 0000000..686edf1 --- /dev/null +++ b/lite_bootstrap/bootstrappers/faststream_bootstrapper.py @@ -0,0 +1,143 @@ +from __future__ import annotations +import contextlib +import dataclasses +import json +import typing + +from lite_bootstrap.bootstrappers.base import BaseBootstrapper +from lite_bootstrap.instruments.healthchecks_instrument import HealthChecksConfig, HealthChecksInstrument +from lite_bootstrap.instruments.logging_instrument import LoggingConfig, LoggingInstrument +from lite_bootstrap.instruments.opentelemetry_instrument import OpentelemetryConfig, OpenTelemetryInstrument +from lite_bootstrap.instruments.prometheus_instrument import PrometheusConfig, PrometheusInstrument +from lite_bootstrap.instruments.sentry_instrument import SentryConfig, SentryInstrument + + +with contextlib.suppress(ImportError): + import faststream + import prometheus_client + from faststream.asgi import AsgiFastStream, AsgiResponse + from faststream.asgi import get as handle_get + from faststream.broker.core.usecase import BrokerUsecase + from opentelemetry.metrics import Meter, MeterProvider + from opentelemetry.trace import TracerProvider, get_tracer_provider + + +@typing.runtime_checkable +class FastStreamTelemetryMiddlewareProtocol(typing.Protocol): + def __init__( + self, + *, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + meter: Meter | None = None, + ) -> None: ... + def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401 + + +@typing.runtime_checkable +class FastStreamPrometheusMiddlewareProtocol(typing.Protocol): + def __init__( + self, + *, + registry: prometheus_client.CollectorRegistry, + app_name: str = ..., + metrics_prefix: str = "faststream", + received_messages_size_buckets: typing.Sequence[float] | None = None, + ) -> None: ... + def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401 + + +@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) +class FastStreamConfig(HealthChecksConfig, LoggingConfig, OpentelemetryConfig, PrometheusConfig, SentryConfig): + application: AsgiFastStream = dataclasses.field(default_factory=AsgiFastStream) + broker: BrokerUsecase[typing.Any, typing.Any] | None = None + opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = None + prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = None + + +@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) +class FastStreamHealthChecksInstrument(HealthChecksInstrument): + bootstrap_config: FastStreamConfig + + def bootstrap(self) -> None: + @handle_get + async def check_health(_: object) -> AsgiResponse: + return ( + AsgiResponse( + json.dumps(self.render_health_check_data()).encode(), 200, headers={"content-type": "text/plain"} + ) + if await self._define_health_status() + else AsgiResponse(b"Service is unhealthy", 500, headers={"content-type": "application/json"}) + ) + + self.bootstrap_config.application.mount(self.bootstrap_config.health_checks_path, check_health) + + async def _define_health_status(self) -> bool: + if not self.bootstrap_config.application or not self.bootstrap_config.application.broker: + return False + + return await self.bootstrap_config.application.broker.ping(timeout=5) + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class FastStreamLoggingInstrument(LoggingInstrument): + bootstrap_config: FastStreamConfig + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class FastStreamOpenTelemetryInstrument(OpenTelemetryInstrument): + bootstrap_config: FastStreamConfig + + def is_ready(self) -> bool: + return bool(self.bootstrap_config.opentelemetry_middleware_cls and super().is_ready()) + + def bootstrap(self) -> None: + if self.bootstrap_config.opentelemetry_middleware_cls and self.bootstrap_config.application.broker: + self.bootstrap_config.application.broker.add_middleware( + self.bootstrap_config.opentelemetry_middleware_cls(tracer_provider=get_tracer_provider()) + ) + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class FastStreamSentryInstrument(SentryInstrument): + bootstrap_config: FastStreamConfig + + +@dataclasses.dataclass(kw_only=True, frozen=True) +class FastStreamPrometheusInstrument(PrometheusInstrument): + bootstrap_config: FastStreamConfig + collector_registry: prometheus_client.CollectorRegistry = dataclasses.field( + default_factory=prometheus_client.CollectorRegistry, init=False + ) + + def is_ready(self) -> bool: + return bool(self.bootstrap_config.prometheus_middleware_cls and super().is_ready()) + + def bootstrap(self) -> None: + self.bootstrap_config.application.mount( + self.bootstrap_config.prometheus_metrics_path, prometheus_client.make_asgi_app(self.collector_registry) + ) + if self.bootstrap_config.prometheus_middleware_cls and self.bootstrap_config.application.broker: + self.bootstrap_config.application.broker.add_middleware( + self.bootstrap_config.prometheus_middleware_cls(registry=self.collector_registry) + ) + + +class FastStreamBootstrapper(BaseBootstrapper[AsgiFastStream]): + instruments_types: typing.ClassVar = [ + FastStreamOpenTelemetryInstrument, + FastStreamSentryInstrument, + FastStreamHealthChecksInstrument, + FastStreamLoggingInstrument, + FastStreamPrometheusInstrument, + ] + bootstrap_config: FastStreamConfig + __slots__ = "bootstrap_config", "instruments" + + def __init__(self, bootstrap_config: FastStreamConfig) -> None: + super().__init__(bootstrap_config) + if self.bootstrap_config.broker: + self.bootstrap_config.application.broker = self.bootstrap_config.broker + + def _prepare_application(self) -> AsgiFastStream: + return self.bootstrap_config.application diff --git a/pyproject.toml b/pyproject.toml index 71b5434..1ce78ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,15 +67,29 @@ litestar = [ litestar-otl = [ "opentelemetry-instrumentation-asgi>=0.46b0", ] +litestar-metrics = [ + "prometheus-client>=0.20", +] litestar-all = [ - "lite-bootstrap[sentry,otl,logging,litestar,litestar-otl]" + "lite-bootstrap[sentry,otl,logging,litestar,litestar-otl,litestar-metrics]" +] +faststream = [ + "faststream", +] +faststream-metrics = [ + "prometheus-client>=0.20", +] +faststream-all = [ + "lite-bootstrap[sentry,otl,logging,faststream,faststream-metrics]" ] [dependency-groups] dev = [ "pytest", "pytest-cov", + "pytest-asyncio", "httpx", # for test client + "redis>=5.2.1", "mypy", "ruff", ] @@ -117,6 +131,8 @@ isort.no-lines-before = ["standard-library", "local-folder"] [tool.pytest.ini_options] addopts = "--cov=. --cov-report term-missing" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" [tool.coverage.report] exclude_also = ["if typing.TYPE_CHECKING:"] diff --git a/tests/test_faststream_bootstrap.py b/tests/test_faststream_bootstrap.py new file mode 100644 index 0000000..16d8ff2 --- /dev/null +++ b/tests/test_faststream_bootstrap.py @@ -0,0 +1,78 @@ +import pytest +import structlog +from faststream.redis import RedisBroker, TestRedisBroker +from faststream.redis.opentelemetry import RedisTelemetryMiddleware +from faststream.redis.prometheus import RedisPrometheusMiddleware +from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from starlette import status +from starlette.testclient import TestClient + +from lite_bootstrap import FastStreamBootstrapper, FastStreamConfig +from tests.conftest import CustomInstrumentor + + +logger = structlog.getLogger(__name__) + + +@pytest.fixture +def broker() -> RedisBroker: + return RedisBroker() + + +async def test_faststream_bootstrap(broker: RedisBroker) -> None: + prometheus_metrics_path = "/test-metrics-path" + health_check_path = "/custom-health-check-path" + bootstrapper = FastStreamBootstrapper( + bootstrap_config=FastStreamConfig( + broker=broker, + service_name="microservice", + service_version="2.0.0", + service_environment="test", + service_debug=False, + opentelemetry_endpoint="otl", + opentelemetry_instrumentors=[CustomInstrumentor()], + opentelemetry_span_exporter=ConsoleSpanExporter(), + opentelemetry_middleware_cls=RedisTelemetryMiddleware, + prometheus_metrics_path=prometheus_metrics_path, + prometheus_middleware_cls=RedisPrometheusMiddleware, + sentry_dsn="https://testdsn@localhost/1", + health_checks_path=health_check_path, + logging_buffer_capacity=0, + ), + ) + application = bootstrapper.bootstrap() + logger.info("testing logging", key="value") + test_client = TestClient(app=application) + + async with TestRedisBroker(broker): + response = test_client.get(prometheus_metrics_path) + assert response.status_code == status.HTTP_200_OK + + response = test_client.get(health_check_path) + assert response.status_code == status.HTTP_200_OK + assert response.json() == {"health_status": True, "service_name": "microservice", "service_version": "2.0.0"} + + +async def test_faststream_bootstrap_health_check_wo_broker() -> None: + health_check_path = "/custom-health-check-path" + bootstrapper = FastStreamBootstrapper( + bootstrap_config=FastStreamConfig( + service_name="microservice", + service_version="2.0.0", + service_environment="test", + service_debug=False, + opentelemetry_endpoint="otl", + opentelemetry_instrumentors=[CustomInstrumentor()], + opentelemetry_span_exporter=ConsoleSpanExporter(), + sentry_dsn="https://testdsn@localhost/1", + health_checks_path=health_check_path, + logging_buffer_capacity=0, + ), + ) + application = bootstrapper.bootstrap() + logger.info("testing logging", key="value") + test_client = TestClient(app=application) + + response = test_client.get(health_check_path) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.text == "Service is unhealthy"