From 005f1149ab1a41178ad1c04bab54fae2482bb238 Mon Sep 17 00:00:00 2001 From: fusion44 Date: Thu, 1 Aug 2024 09:22:29 +0200 Subject: [PATCH] feat: replace SSE with a websockets Old: /sse/subscribe New: /ws refs #252 --- app/api/sse_manager.py | 66 ----- app/api/utils.py | 24 +- app/api/ws_manager.py | 54 ++++ app/apps/impl/native_python.py | 22 +- app/apps/impl/raspiblitz.py | 24 +- app/apps/router.py | 13 - app/apps/service.py | 6 +- app/bitcoind/router.py | 15 +- app/bitcoind/service.py | 6 +- app/external/__init__.py | 0 app/external/sse_starlette/LICENSE.md | 27 -- app/external/sse_starlette/__init__.py | 7 - app/external/sse_starlette/sse_starlette.py | 263 -------------------- app/lightning/impl/cln_grpc.py | 4 +- app/lightning/impl/cln_jrpc.py | 4 +- app/lightning/impl/lnd_grpc.py | 6 +- app/lightning/impl/protos/lnd/flake.nix | 17 +- app/lightning/service.py | 14 +- app/main.py | 112 ++++----- app/system/impl/raspiblitz.py | 6 +- app/system/router.py | 1 - app/system/service.py | 10 +- flake.nix | 51 ++-- poetry.lock | 83 +++++- pyproject.toml | 1 + scripts/sync_to_blitz.sh | 3 - 26 files changed, 286 insertions(+), 553 deletions(-) delete mode 100644 app/api/sse_manager.py create mode 100644 app/api/ws_manager.py delete mode 100644 app/external/__init__.py delete mode 100644 app/external/sse_starlette/LICENSE.md delete mode 100644 app/external/sse_starlette/__init__.py delete mode 100644 app/external/sse_starlette/sse_starlette.py diff --git a/app/api/sse_manager.py b/app/api/sse_manager.py deleted file mode 100644 index 9a3fdf6..0000000 --- a/app/api/sse_manager.py +++ /dev/null @@ -1,66 +0,0 @@ -import asyncio -from asyncio.log import logger -from typing import Tuple - -import async_timeout -from fastapi import Request - -from app.external.sse_starlette import EventSourceResponse, ServerSentEvent - - -class SSEManager: - _setup_finished = False - _num_connections = 0 - _connections = {} - _sse_queue = asyncio.Queue() - - def setup(self) -> None: - if self._setup_finished: - raise RuntimeError("SSEManager setup must not be called twice") - - loop = asyncio.get_event_loop() - loop.create_task(self._broadcast_data_sse()) - self._setup_finished = True - - def add_connection(self, request: Request) -> Tuple[EventSourceResponse, int]: - q = asyncio.Queue() - id = self._num_connections - self._num_connections += 1 - self._connections[id] = q - event_source = EventSourceResponse(self._subscribe(request, id, q)) - return (event_source, id) - - async def send_to_single(self, id: int, data: ServerSentEvent): - await self._connections[id].put(data) - - async def broadcast_to_all(self, data: ServerSentEvent): - await self._sse_queue.put(data) - - async def _subscribe(self, request: Request, id: int, q: asyncio.Queue): - try: - while True: - if await request.is_disconnected(): - logger.info(f"Client with ID {id} has disconnected") - self._connections.pop(id) - await request.close() - break - else: - data = await q.get() - yield data - except asyncio.CancelledError as e: - logger.info(f"CancelledError on client with ID {id}: {e}") - self._connections.pop(id) - await request.close() - - async def _broadcast_data_sse(self): - while True: - try: - async with async_timeout.timeout(1): - msg = await self._sse_queue.get() - if msg is not None: - for k in self._connections.keys(): - if self._connections.get(k): - await self._connections.get(k).put(msg) - await asyncio.sleep(0.01) - except asyncio.TimeoutError: - pass diff --git a/app/api/utils.py b/app/api/utils.py index d0ebbf3..39731e9 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -6,17 +6,15 @@ import re import time import warnings -from typing import Dict, Optional +from typing import Any, Dict, Optional from fastapi.encoders import jsonable_encoder from fastapi_plugins import redis_plugin from loguru import logger -from app.api.sse_manager import SSEManager -from app.external.sse_starlette import ServerSentEvent +from app.api.ws_manager import WebSocketManager -sse_mgr = SSEManager() -sse_mgr.setup() +ws_mgr = WebSocketManager() class ProcessResult: @@ -37,15 +35,8 @@ def __str__(self) -> str: ) -def build_sse_event(event: str, json_data: Optional[Dict]): - return ServerSentEvent( - event=event, - data=json.dumps(jsonable_encoder(json_data)), - ) - - -async def broadcast_sse_msg(event: str, json_data: Optional[Dict]): - """Broadcasts a message to all connected clients +async def broadcast_json_ws(event: str, json_data: dict[str, Any]): + """Broadcasts a json message to all connected clients Parameters ---------- @@ -54,8 +45,7 @@ async def broadcast_sse_msg(event: str, json_data: Optional[Dict]): data : dictionary, optional The data to include """ - - await sse_mgr.broadcast_to_all(build_sse_event(event, json_data)) + await ws_mgr.broadcast_json(json_data={"event": event, "data": json_data}) async def redis_get(key: str) -> str: @@ -108,6 +98,8 @@ class SSE: LN_FORWARD_SUCCESSES = "ln_forward_successes" WALLET_BALANCE = "wallet_balance" + SERVER_ERROR = "server_error" + # https://gist.github.com/risent/4cab3878d995bec7d1c2 # https://firebase.blog/posts/2015/02/the-2120-ways-to-ensure-unique_68 diff --git a/app/api/ws_manager.py b/app/api/ws_manager.py new file mode 100644 index 0000000..62b0658 --- /dev/null +++ b/app/api/ws_manager.py @@ -0,0 +1,54 @@ +from typing import Any + +from fastapi import WebSocket, WebSocketDisconnect +from loguru import logger + + +class WebSocketManager: + def __init__(self): + self._ctr = 0 + self.active_connections: dict[int, WebSocket] = {} + + async def connect(self, websocket: WebSocket) -> int: + await websocket.accept() + id = self._next_id() + self.active_connections[id] = websocket + logger.debug(f"Connecting Websocket with ID {id}") + + return id + + def disconnect(self, id: int): + del self.active_connections[id] + + async def send_json(self, id: int, data: dict[str, Any]): + websocket = self._get(id) + await websocket.send_json(data=data) + + async def send_personal_message(self, message: str, id: int): + await self._send(message, id) + + async def broadcast(self, message: str): + for id in self.active_connections.keys(): + await self._send(message, id) + + async def broadcast_json(self, json_data: dict[str, Any]): + for id in self.active_connections.keys(): + await self.send_json(id=id, data=json_data) + + async def _send(self, message: str, id: int): + try: + websocket = self._get(id) + await websocket.send_text(message) + except WebSocketDisconnect: + self.disconnect(id) + + def _get(self, id: int): + try: + return self.active_connections[id] + except Exception as e: + raise e + + def _next_id(self): + next_id = self._ctr + self._ctr += 1 + return next_id diff --git a/app/apps/impl/native_python.py b/app/apps/impl/native_python.py index ea099f8..0b3d1b8 100644 --- a/app/apps/impl/native_python.py +++ b/app/apps/impl/native_python.py @@ -1,31 +1,21 @@ -from fastapi import HTTPException, status - from app.apps.impl.apps_base import AppsBase -class _NotImplemented(HTTPException): - def __init__(self): - super().__init__( - status_code=status.HTTP_501_NOT_IMPLEMENTED, - detail="Not available in native python mode.", - ) - - class NativePythonApps(AppsBase): async def get_app_status_single(self, app_id: str): - raise _NotImplemented() + raise NotImplementedError() async def get_app_status(self): - raise _NotImplemented() + raise NotImplementedError() async def get_app_status_advanced(self, app_id: str): - raise _NotImplemented() + raise NotImplementedError() async def get_app_status_sub(self): - raise _NotImplemented() + raise NotImplementedError() async def install_app_sub(self, app_id: str): - raise _NotImplemented() + raise NotImplementedError() async def uninstall_app_sub(self, app_id: str, delete_data: bool): - raise _NotImplemented() + raise NotImplementedError() diff --git a/app/apps/impl/raspiblitz.py b/app/apps/impl/raspiblitz.py index 081594c..86effa4 100644 --- a/app/apps/impl/raspiblitz.py +++ b/app/apps/impl/raspiblitz.py @@ -11,7 +11,7 @@ from fastapi.encoders import jsonable_encoder from loguru import logger as logging -from app.api.utils import SSE, broadcast_sse_msg, call_sudo_script, parse_key_value_text +from app.api.utils import SSE, broadcast_json_ws, call_sudo_script, parse_key_value_text from app.apps.impl.apps_base import AppsBase available_app_ids = { @@ -196,7 +196,7 @@ async def install_app_sub(self, app_id: str): detail=app_id + "install script does not exist / is not supported", ) - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, {"id": app_id, "mode": "on", "result": "running", "details": ""}, ) @@ -212,7 +212,7 @@ async def uninstall_app_sub(self, app_id: str, delete_data: bool): status.HTTP_400_BAD_REQUEST, detail="script not exist/supported" ) - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, {"id": app_id, "mode": "off", "result": "running", "details": ""}, ) @@ -279,7 +279,7 @@ async def run_bonus_script(self, app_id: str, params: str): logging.error( f"FOUND `error=` returned by script: {stdoutData['error']}" ) - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, { "id": app_id, @@ -292,7 +292,7 @@ async def run_bonus_script(self, app_id: str, params: str): # stdout - consider also script had error elif "result" not in stdoutData: logging.error("NO `result=` returned by script:") - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, { "id": app_id, @@ -310,7 +310,7 @@ async def run_bonus_script(self, app_id: str, params: str): if updatedAppData["error"] != "": logging.warning("Error Detected ...") logging.warning(f"updatedAppData: {updatedAppData}") - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, { "id": app_id, @@ -324,7 +324,7 @@ async def run_bonus_script(self, app_id: str, params: str): elif mode == "on": if updatedAppData["installed"]: logging.info(f"WIN - install of {app_id} was effective") - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, { "id": app_id, @@ -335,14 +335,14 @@ async def run_bonus_script(self, app_id: str, params: str): "details": stdoutData["result"], }, ) - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALLED_APP_STATUS, [updatedAppData] ) else: logging.error(f"FAIL - {app_id} was not installed") logging.debug(f"updatedAppData: {updatedAppData}") logging.debug(f"params: {params}") - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, { "id": app_id, @@ -351,16 +351,16 @@ async def run_bonus_script(self, app_id: str, params: str): "details": "install was not effective", }, ) - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALLED_APP_STATUS, [updatedAppData] ) elif mode == "off": - await broadcast_sse_msg( + await broadcast_json_ws( SSE.INSTALL_APP, {"id": app_id, "mode": mode, "result": "win"}, ) - await broadcast_sse_msg(SSE.INSTALLED_APP_STATUS, [updatedAppData]) + await broadcast_json_ws(SSE.INSTALLED_APP_STATUS, [updatedAppData]) if not updatedAppData["installed"]: logging.info(f"WIN - uninstall of {app_id} was effective") diff --git a/app/apps/router.py b/app/apps/router.py index e34bae1..5528c8f 100644 --- a/app/apps/router.py +++ b/app/apps/router.py @@ -6,7 +6,6 @@ import app.apps.docs as docs import app.apps.service as repo from app.auth.auth_bearer import JWTBearer -from app.external.sse_starlette import EventSourceResponse _PREFIX = "apps" @@ -53,18 +52,6 @@ async def get_single_status_advanced(id: str = Path(..., required=True)): return await repo.get_app_status_advanced(id) -@router.get( - "/status-sub", - name=f"{_PREFIX}/status-sub", - summary="Subscribe to status changes of currently installed apps.", - response_description=docs.get_app_status_sub_response_docs, - dependencies=[Depends(JWTBearer())], -) -@logger.catch(exclude=(HTTPException,)) -async def get_status_sub(): - return EventSourceResponse(repo.get_app_status_sub()) - - @router.post( "/install/{name}", name=f"{_PREFIX}/install", diff --git a/app/apps/service.py b/app/apps/service.py index 12faba5..3685592 100644 --- a/app/apps/service.py +++ b/app/apps/service.py @@ -1,4 +1,5 @@ from decouple import config +from fastapi import HTTPException, status from app.system.models import APIPlatform @@ -21,7 +22,10 @@ async def get_app_status_single(app_id: str): async def get_app_status(): - return await apps.get_app_status() + try: + return await apps.get_app_status() + except NotImplementedError: + return HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED) async def get_app_status_advanced(app_id: str): diff --git a/app/bitcoind/router.py b/app/bitcoind/router.py index 156b762..ea6415c 100644 --- a/app/bitcoind/router.py +++ b/app/bitcoind/router.py @@ -19,7 +19,6 @@ handle_block_sub, ) from app.bitcoind.utils import bitcoin_rpc -from app.external.sse_starlette import EventSourceResponse _PREFIX = "bitcoin" @@ -132,18 +131,6 @@ async def getnetworkinfo(): async def get_raw_transaction_path( txid: str = Query( ..., min_length=64, max_length=64, description="The transaction id" - ) + ), ): return await get_raw_transaction(txid) - - -@router.get( - "/block-sub", - name=f"{_PREFIX}.block-sub", - summary="Subscribe to incoming blocks.", - description=blocks_sub_doc, - response_description="A JSON object with information about the new block.", - dependencies=[Depends(JWTBearer())], -) -async def zmq_sub(request: Request, verbosity: int = 1): - return EventSourceResponse(handle_block_sub(request, verbosity)) diff --git a/app/bitcoind/service.py b/app/bitcoind/service.py index 20e6a95..f47c9ed 100644 --- a/app/bitcoind/service.py +++ b/app/bitcoind/service.py @@ -10,7 +10,7 @@ from loguru import logger from starlette import status -from app.api.utils import SSE, broadcast_sse_msg +from app.api.utils import SSE, broadcast_json_ws from app.bitcoind.models import ( BlockchainInfo, BlockRpcFunc, @@ -187,7 +187,7 @@ async def handle_block_sub_redis(verbosity: int = 1) -> str: ) r = await bitcoin_rpc_async("getblock", [hash, verbosity]) - await broadcast_sse_msg(SSE.BTC_NEW_BLOC, r["result"]) + await broadcast_json_ws(SSE.BTC_NEW_BLOC, r["result"]) @logger.catch(exclude=(HTTPException,)) @@ -213,7 +213,7 @@ async def _handle_gather_bitcoin_status(): if last_info != info: # only send data if anything has changed - await broadcast_sse_msg(SSE.BTC_INFO, info.model_dump()) + await broadcast_json_ws(SSE.BTC_INFO, info.model_dump()) last_info = info await asyncio.sleep(2) diff --git a/app/external/__init__.py b/app/external/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/app/external/sse_starlette/LICENSE.md b/app/external/sse_starlette/LICENSE.md deleted file mode 100644 index 3d12971..0000000 --- a/app/external/sse_starlette/LICENSE.md +++ /dev/null @@ -1,27 +0,0 @@ -Copyright © 2020, [sysid](https://sysid.github.io/). -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/app/external/sse_starlette/__init__.py b/app/external/sse_starlette/__init__.py deleted file mode 100644 index d182987..0000000 --- a/app/external/sse_starlette/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from app.external.sse_starlette.sse_starlette import ( - EventSourceResponse, - ServerSentEvent, -) - -__all__ = ["ServerSentEvent", "EventSourceResponse"] -__version__ = "1.1.6" diff --git a/app/external/sse_starlette/sse_starlette.py b/app/external/sse_starlette/sse_starlette.py deleted file mode 100644 index e931e52..0000000 --- a/app/external/sse_starlette/sse_starlette.py +++ /dev/null @@ -1,263 +0,0 @@ -import enum -import inspect -import io -import logging -import re -from datetime import datetime -from functools import partial -from typing import Any, Callable, Coroutine, Dict, Optional, Union - -import anyio -from starlette.background import BackgroundTask -from starlette.concurrency import iterate_in_threadpool -from starlette.responses import Response -from starlette.types import Receive, Scope, Send - -_log = logging.getLogger(__name__) - - -# https://stackoverflow.com/questions/58133694/graceful-shutdown-of-uvicorn-starlette-app-with-websockets -class AppStatus: - """helper for monkey-patching the signal-handler of uvicorn""" - - should_exit = False - - @staticmethod - def handle_exit(*args, **kwargs): - AppStatus.should_exit = True - original_handler(*args, **kwargs) - - -try: - from uvicorn.main import Server # type: ignore - - original_handler = Server.handle_exit - Server.handle_exit = AppStatus.handle_exit - - def unpatch_uvicorn_signal_handler(): - """restores original signal-handler and rolls back monkey-patching. - Normally this should not be necessary. - """ - Server.handle_exit = original_handler - -except ModuleNotFoundError: - _log.debug("Uvicorn not used.") - - -class SseState(enum.Enum): - CONNECTING = 0 - OPENED = 1 - CLOSED = 2 - - -class ServerSentEvent: - def __init__( - self, - data: Optional[Any] = None, - *, - event: Optional[str] = None, - id: Optional[int] = None, - retry: Optional[int] = None, - comment: Optional[str] = None, - sep: Optional[str] = None, - ) -> None: - """Send data using EventSource protocol - :param str data: The data field for the message. - :param str id: The event ID to set the EventSource object's last - event ID value to. - :param str event: The event's type. If this is specified, an event will - be dispatched on the browser to the listener for the specified - event name; the web site would use addEventListener() to listen - for named events. The default event type is "message". - :param int retry: The reconnection time to use when attempting to send - the event. [What code handles this?] This must be an integer, - specifying the reconnection time in milliseconds. If a non-integer - value is specified, the field is ignored. - :param str comment: A colon as the first character of a line is essence - a comment, and is ignored. Usually used as a ping message to keep - connecting. f set, this will be a comment message. - """ - self.data = data - self.event = event - self.id = id - self.retry = retry - self.comment = comment - self.DEFAULT_SEPARATOR = "\r\n" - self.LINE_SEP_EXPR = re.compile(r"\r\n|\r|\n") - self._sep = sep if sep is not None else self.DEFAULT_SEPARATOR - - def encode(self) -> bytes: - buffer = io.StringIO() - if self.comment is not None: - for chunk in self.LINE_SEP_EXPR.split(str(self.comment)): - buffer.write(f": {chunk}") - buffer.write(self._sep) - return buffer.getvalue().encode("utf-8") - - if self.id is not None: - buffer.write(self.LINE_SEP_EXPR.sub("", f"id: {self.id}")) - buffer.write(self._sep) - - if self.event is not None: - buffer.write(self.LINE_SEP_EXPR.sub("", f"event: {self.event}")) - buffer.write(self._sep) - - for chunk in self.LINE_SEP_EXPR.split(str(self.data)): - buffer.write(f"data: {chunk}") - buffer.write(self._sep) - - if self.retry is not None: - if not isinstance(self.retry, int): - raise TypeError("retry argument must be int") - buffer.write(f"retry: {self.retry}") - buffer.write(self._sep) - - buffer.write(self._sep) - return buffer.getvalue().encode("utf-8") - - -def ensure_bytes(data: Union[bytes, dict, ServerSentEvent, Any]) -> bytes: - if isinstance(data, bytes): - return data - elif isinstance(data, ServerSentEvent): - return data.encode() - elif isinstance(data, dict): - return ServerSentEvent(**data).encode() - else: - return ServerSentEvent(str(data)).encode() - - -class EventSourceResponse(Response): - """Implements the ServerSentEvent Protocol: - https://www.w3.org/TR/2009/WD-eventsource-20090421/ - Responses must not be compressed by middleware in order to work. - """ - - DEFAULT_PING_INTERVAL = 15 - - # follow Starlette StreamingResponse - # noinspection PyMissingConstructor - def __init__( - self, - content: Any, - status_code: int = 200, - headers: Optional[Dict] = None, - media_type: str = "text/event-stream", - background: Optional[BackgroundTask] = None, - ping: Optional[int] = None, - sep: Optional[str] = None, - ping_message_factory: Optional[Callable[[], ServerSentEvent]] = None, - ) -> None: - self.sep = sep - self.ping_message_factory = ping_message_factory - if inspect.isasyncgen(content): - self.body_iterator = ( - content - ) # type: AsyncIterable[Union[Any,dict,ServerSentEvent]] - else: - self.body_iterator = iterate_in_threadpool(content) # type: ignore - self.status_code = status_code - self.media_type = self.media_type if media_type is None else media_type - self.background = background # type: ignore # follows https://github.com/encode/starlette/blob/master/starlette/responses.py - - _headers = {} - if headers is not None: # pragma: no cover - _headers.update(headers) - - # mandatory for servers-sent events headers - # allow cache control header to be set by user to support fan out proxies - # https://www.fastly.com/blog/server-sent-events-fastly - _headers.setdefault("Cache-Control", "no-cache") - _headers["Connection"] = "keep-alive" - _headers["X-Accel-Buffering"] = "no" - - self.init_headers(_headers) - - self.ping_interval = self.DEFAULT_PING_INTERVAL if ping is None else ping - self.active = True - - self._ping_task = None - - @staticmethod - async def listen_for_disconnect(receive: Receive) -> None: - while True: - message = await receive() - if message["type"] == "http.disconnect": - _log.debug("Got event: http.disconnect. Stop streaming.") - break - - @staticmethod - async def listen_for_exit_signal() -> None: - while not AppStatus.should_exit: - await anyio.sleep(1.0) - - async def stream_response(self, send) -> None: - await send( - { - "type": "http.response.start", - "status": self.status_code, - "headers": self.raw_headers, - } - ) - async for data in self.body_iterator: - chunk = ensure_bytes(data) - _log.debug(f"chunk: {chunk.decode()}") - await send({"type": "http.response.body", "body": chunk, "more_body": True}) - - await send({"type": "http.response.body", "body": b"", "more_body": False}) - - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: - async with anyio.create_task_group() as task_group: - # https://trio.readthedocs.io/en/latest/reference-core.html#custom-supervisors - async def wrap(func: Callable[[], Coroutine[None, None, None]]) -> None: - await func() - # noinspection PyAsyncCall - task_group.cancel_scope.cancel() - - task_group.start_soon(wrap, partial(self.stream_response, send)) - task_group.start_soon(wrap, partial(self._ping, send)) - task_group.start_soon(wrap, self.listen_for_exit_signal) - await wrap(partial(self.listen_for_disconnect, receive)) - - if self.background is not None: # pragma: no cover, tested in StreamResponse - await self.background() - - def enable_compression(self, force: bool = False) -> None: - raise NotImplementedError - - @property - def ping_interval(self) -> Union[int, float]: - """Time interval between two ping massages""" - return self._ping_interval - - @ping_interval.setter - def ping_interval(self, value: Union[int, float]) -> None: - """Setter for ping_interval property. - :param int value: interval in sec between two ping values. - """ - - if not isinstance(value, (int, float)): - raise TypeError("ping interval must be int") - if value < 0: - raise ValueError("ping interval must be greater then 0") - - self._ping_interval = value - - async def _ping(self, send: Send) -> None: - # Legacy proxy servers are known to, in certain cases, drop HTTP connections - # after a short timeout. To protect against such proxy servers, authors can - # send a custom (ping) event every 15 seconds or so. - # Alternatively one can send periodically a comment line - # (one starting with a ':' character) - while self.active: - await anyio.sleep(self._ping_interval) - if self.ping_message_factory: - # https://github.com/python/mypy/issues/6864 - assert isinstance(self.ping_message_factory, Callable) # type: ignore - ping = ( - ServerSentEvent(datetime.utcnow(), event="ping").encode() - if self.ping_message_factory is None - else ensure_bytes(self.ping_message_factory()) - ) - _log.debug(f"ping: {ping.decode()}") - await send({"type": "http.response.body", "body": ping, "more_body": True}) diff --git a/app/lightning/impl/cln_grpc.py b/app/lightning/impl/cln_grpc.py index 21e2ddb..de40beb 100644 --- a/app/lightning/impl/cln_grpc.py +++ b/app/lightning/impl/cln_grpc.py @@ -12,7 +12,7 @@ import app.lightning.impl.protos.cln.node_pb2 as ln import app.lightning.impl.protos.cln.node_pb2_grpc as clnrpc import app.lightning.impl.protos.cln.primitives_pb2 as lnp -from app.api.utils import SSE, broadcast_sse_msg, config_get_hex_str, next_push_id +from app.api.utils import SSE, broadcast_json_ws, config_get_hex_str, next_push_id from app.bitcoind.utils import bitcoin_rpc_async from app.lightning.exceptions import NodeNotFoundError from app.lightning.impl.cln_utils import cln_classify_fee_revenue, parse_cln_msat @@ -647,7 +647,7 @@ async def send_coins(self, input: SendCoinsInput) -> SendCoinsResponse: ) response = await self._cln_stub.Withdraw(req) r = SendCoinsResponse.from_cln_grpc(response, input) - await broadcast_sse_msg(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) + await broadcast_json_ws(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) return r except grpc.aio._call.AioRpcError as error: details = error.details() diff --git a/app/lightning/impl/cln_jrpc.py b/app/lightning/impl/cln_jrpc.py index 2f55785..a709050 100644 --- a/app/lightning/impl/cln_jrpc.py +++ b/app/lightning/impl/cln_jrpc.py @@ -10,7 +10,7 @@ from loguru import logger from starlette import status -from app.api.utils import SSE, broadcast_sse_msg, next_push_id +from app.api.utils import SSE, broadcast_json_ws, next_push_id from app.bitcoind.utils import bitcoin_rpc_async from app.lightning.exceptions import NodeNotFoundError from app.lightning.impl.cln_utils import ( @@ -536,7 +536,7 @@ async def send_coins(self, input: SendCoinsInput) -> SendCoinsResponse: if "error" not in res: res = res["result"] r = SendCoinsResponse.from_cln_json(res, input) - await broadcast_sse_msg(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) + await broadcast_json_ws(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) return r diff --git a/app/lightning/impl/lnd_grpc.py b/app/lightning/impl/lnd_grpc.py index 0d96eae..17e7988 100644 --- a/app/lightning/impl/lnd_grpc.py +++ b/app/lightning/impl/lnd_grpc.py @@ -15,7 +15,7 @@ import app.lightning.impl.protos.lnd.router_pb2_grpc as routerrpc import app.lightning.impl.protos.lnd.walletunlocker_pb2 as unlocker import app.lightning.impl.protos.lnd.walletunlocker_pb2_grpc as unlockerrpc -from app.api.utils import SSE, broadcast_sse_msg, config_get_hex_str +from app.api.utils import SSE, broadcast_json_ws, config_get_hex_str from app.lightning.exceptions import NodeNotFoundError from app.lightning.impl.ln_base import LightningNodeBase from app.lightning.models import ( @@ -572,7 +572,7 @@ async def send_coins(self, input: SendCoinsInput) -> SendCoinsResponse: break r = SendCoinsResponse.from_lnd_grpc(tx, input) - await broadcast_sse_msg(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) + await broadcast_json_ws(SSE.LN_ONCHAIN_PAYMENT_STATUS, r.model_dump()) return r except grpc.aio._call.AioRpcError as error: _check_if_locked(error) @@ -619,7 +619,7 @@ async def send_payment( p = None async for response in self._router_stub.SendPaymentV2(r): p = Payment.from_lnd_grpc(response) - await broadcast_sse_msg(SSE.LN_PAYMENT_STATUS, p.model_dump()) + await broadcast_json_ws(SSE.LN_PAYMENT_STATUS, p.model_dump()) return p except grpc.aio._call.AioRpcError as error: _check_if_locked(error) diff --git a/app/lightning/impl/protos/lnd/flake.nix b/app/lightning/impl/protos/lnd/flake.nix index 6f2d553..2bf7f7f 100644 --- a/app/lightning/impl/protos/lnd/flake.nix +++ b/app/lightning/impl/protos/lnd/flake.nix @@ -6,15 +6,18 @@ flake-utils.url = "github:numtide/flake-utils"; }; - outputs = { self, nixpkgs, flake-utils }: - flake-utils.lib.eachDefaultSystem (system: - let - pkgs = import nixpkgs { inherit system; }; - in - { + outputs = { + self, + nixpkgs, + flake-utils, + }: + flake-utils.lib.eachDefaultSystem ( + system: let + pkgs = import nixpkgs {inherit system;}; + in { devShell = pkgs.mkShell { buildInputs = with pkgs; [ - (python3.withPackages (ps: [ ps.grpcio ps.grpcio-tools ])) + (python3.withPackages (ps: [ps.grpcio ps.grpcio-tools])) curl jq ]; diff --git a/app/lightning/service.py b/app/lightning/service.py index b861c43..db538de 100644 --- a/app/lightning/service.py +++ b/app/lightning/service.py @@ -6,7 +6,7 @@ from fastapi.exceptions import HTTPException from loguru import logger -from app.api.utils import SSE, broadcast_sse_msg, redis_get +from app.api.utils import SSE, broadcast_json_ws, redis_get from app.lightning.models import ( Channel, FeeRevenue, @@ -220,13 +220,13 @@ async def _handle_info_listener(): info = await ln.get_ln_info() if last_info != info: - await broadcast_sse_msg(SSE.LN_INFO, info.model_dump()) + await broadcast_json_ws(SSE.LN_INFO, info.model_dump()) last_info = info info_lite = LightningInfoLite.from_lninfo(info) if last_info_lite != info_lite: - await broadcast_sse_msg(SSE.LN_INFO_LITE, info_lite.model_dump()) + await broadcast_json_ws(SSE.LN_INFO_LITE, info_lite.model_dump()) last_info_lite = info_lite await asyncio.sleep(GATHER_INFO_INTERVALL) @@ -234,7 +234,7 @@ async def _handle_info_listener(): async def _handle_invoice_listener(): async for i in ln.listen_invoices(): - await broadcast_sse_msg(SSE.LN_INVOICE_STATUS, i.model_dump()) + await broadcast_json_ws(SSE.LN_INVOICE_STATUS, i.model_dump()) _schedule_wallet_balance_update() @@ -254,11 +254,11 @@ async def _schedule_fwd_update(): if len(_fwd_successes) > 0: sending_successes = _fwd_successes _fwd_successes = [] - await broadcast_sse_msg(SSE.LN_FORWARD_SUCCESSES, sending_successes) + await broadcast_json_ws(SSE.LN_FORWARD_SUCCESSES, sending_successes) _schedule_wallet_balance_update() rev = await get_fee_revenue() - await broadcast_sse_msg(SSE.LN_FEE_REVENUE, rev.model_dump()) + await broadcast_json_ws(SSE.LN_FEE_REVENUE, rev.model_dump()) _fwd_update_scheduled = False @@ -281,7 +281,7 @@ async def _perform_update(): await asyncio.sleep(1.1) wb = await ln.get_wallet_balance() if _CACHE["wallet_balance"] != wb: - await broadcast_sse_msg(SSE.WALLET_BALANCE, wb.model_dump()) + await broadcast_json_ws(SSE.WALLET_BALANCE, wb.model_dump()) _CACHE["wallet_balance"] = wb _wallet_balance_update_scheduled = False diff --git a/app/main.py b/app/main.py index e9bc207..49132e0 100644 --- a/app/main.py +++ b/app/main.py @@ -3,8 +3,7 @@ from contextlib import asynccontextmanager from decouple import config as dconfig -from fastapi import FastAPI, Request -from fastapi.encoders import jsonable_encoder +from fastapi import Depends, FastAPI, Request, WebSocket from fastapi.exceptions import HTTPException from fastapi_plugins import ( RedisSettings, @@ -19,7 +18,7 @@ from starlette.responses import RedirectResponse from app.api.models import ApiStartupStatus, StartupState -from app.api.utils import SSE, broadcast_sse_msg, build_sse_event, sse_mgr +from app.api.utils import SSE, broadcast_json_ws, ws_mgr from app.api.warmup import ( get_bitcoin_client_warmup_data, get_full_client_warmup_data, @@ -86,7 +85,7 @@ async def lifespan(app: FastAPI): await redis_plugin.init_app(app, config=config) await redis_plugin.init() register_cookie_updater() - await broadcast_sse_msg(SSE.SYSTEM_STARTUP_INFO, api_startup_status.model_dump()) + await broadcast_json_ws(SSE.SYSTEM_STARTUP_INFO, api_startup_status.model_dump()) loop = asyncio.get_event_loop() loop.create_task(_initialize_bitcoin()) loop.create_task(_initialize_lightning()) @@ -145,7 +144,7 @@ async def _set_startup_status( loop = asyncio.get_event_loop() loop.create_task(warmup_new_connections()) - await broadcast_sse_msg(SSE.SYSTEM_STARTUP_INFO, api_startup_status.model_dump()) + await broadcast_json_ws(SSE.SYSTEM_STARTUP_INFO, api_startup_status.model_dump()) @logger.catch @@ -230,46 +229,31 @@ def index(req: Request): new_connections = [] -def _send_sse_event(id, event, data): - return sse_mgr.send_to_single(id, build_sse_event(event, data)) - - -@app.get( - "/sse/subscribe", - status_code=status.HTTP_200_OK, +@logger.catch(exclude=(HTTPException,)) +@app.websocket( + "/ws", + dependencies=[Depends(JWTBearer())], ) -async def stream(request: Request): - token = request.cookies.get("access_token") - if not token: - # No token in cookies found, try to get it from the Authorization header - token = request.headers.get("authorization") - - if not token: - # Raise an exception as there is not token found - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="No authorization code.", - ) - - token = token.replace("Bearer ", "") - if not JWTBearer().verify_jwt(jwtoken=token): - # token is invalid - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid authorization code.", +async def ws(websocket: WebSocket): + try: + id = await ws_mgr.connect(websocket) + + new_connections.append(id) + loop = asyncio.get_event_loop() + loop.create_task(warmup_new_connections()) + await ws_mgr.send_json( + id=id, + data={ + "event": SSE.SYSTEM_STARTUP_INFO, + "data": api_startup_status.model_dump(), + }, ) - event_source, id = sse_mgr.add_connection(request) - new_connections.append(id) - - await _send_sse_event( - id, SSE.SYSTEM_STARTUP_INFO, jsonable_encoder(api_startup_status.model_dump()) - ) - - loop = asyncio.get_event_loop() - loop.create_task(warmup_new_connections()) - - return event_source + while True: + await websocket.receive_text() + await websocket.send_text("Read only WebSocket") + except Exception as e: + logger.error(e) warmup_running = False @@ -281,14 +265,22 @@ async def warmup_new_connections(): # when the startup state changes. Especially the hardware info # is rather data intensive. This is OK for now, to keep the code simple. - async def _handle(id, event, res): + async def _send(id, event, res): if isinstance(res, BaseModel): - return await _send_sse_event(id, event, res.model_dump()) + e = {"event": event, "data": res.model_dump()} + return await ws_mgr.send_json(id, e) elif isinstance(res, dict) or isinstance(res, list): - return await _send_sse_event(id, event, res) + e = {"event": event, "data": res} + return await ws_mgr.send_json(id, e) + elif isinstance(res, HTTPException): + e = { + "event": event, + "data": {"error_code": res.status_code, "detail": res.detail}, + } + return await ws_mgr.send_json(id, e) logger.error(f"Error while fetching warmup_data for {event}: {res}") - return await _send_sse_event(id, event, {"error": f"{res}"}) + return await ws_mgr.send_json(id, {"event": SSE.SERVER_ERROR, "data": res}) global new_connections if len(new_connections) == 0: @@ -309,14 +301,14 @@ async def _handle(id, event, res): for id in new_connections: await asyncio.gather( *[ - _handle(id, SSE.SYSTEM_INFO, res[0]), - _handle(id, SSE.BTC_INFO, res[1]), - _handle(id, SSE.LN_INFO, res[2]), - _handle(id, SSE.LN_INFO_LITE, res[3]), - _handle(id, SSE.LN_FEE_REVENUE, res[4]), - _handle(id, SSE.WALLET_BALANCE, res[5]), - _handle(id, SSE.INSTALLED_APP_STATUS, res[6]), - _handle(id, SSE.HARDWARE_INFO, res[7]), + _send(id, SSE.SYSTEM_INFO, res[0]), + _send(id, SSE.BTC_INFO, res[1]), + _send(id, SSE.LN_INFO, res[2]), + _send(id, SSE.LN_INFO_LITE, res[3]), + _send(id, SSE.LN_FEE_REVENUE, res[4]), + _send(id, SSE.WALLET_BALANCE, res[5]), + _send(id, SSE.INSTALLED_APP_STATUS, res[6]), + _send(id, SSE.HARDWARE_INFO, res[7]), ] ) @@ -326,9 +318,9 @@ async def _handle(id, event, res): for id in new_connections: await asyncio.gather( *[ - _handle(id, SSE.SYSTEM_INFO, res[0]), - _handle(id, SSE.BTC_INFO, res[1]), - _handle(id, SSE.HARDWARE_INFO, res[2]), + _send(id, SSE.SYSTEM_INFO, res[0]), + _send(id, SSE.BTC_INFO, res[1]), + _send(id, SSE.HARDWARE_INFO, res[2]), ] ) @@ -342,8 +334,8 @@ async def _handle(id, event, res): for id in new_connections: await asyncio.gather( *[ - _handle(id, SSE.BTC_INFO, res[0]), - _handle(id, SSE.HARDWARE_INFO, res[1]), + _send(id, SSE.BTC_INFO, res[0]), + _send(id, SSE.HARDWARE_INFO, res[1]), ] ) @@ -357,7 +349,7 @@ async def _handle(id, event, res): # Bitcoin Core and Lightning running res = await get_hardware_info() for id in new_connections: - await _send_sse_event(id, SSE.HARDWARE_INFO, res) + await _send(id, SSE.HARDWARE_INFO, res) # don't clear new_connections, we'll try again later when api is initialized diff --git a/app/system/impl/raspiblitz.py b/app/system/impl/raspiblitz.py index 91dcae0..4323717 100644 --- a/app/system/impl/raspiblitz.py +++ b/app/system/impl/raspiblitz.py @@ -10,7 +10,7 @@ from app.api.constants import API_VERSION from app.api.utils import ( SSE, - broadcast_sse_msg, + broadcast_json_ws, call_script, call_sudo_script, parse_key_value_text, @@ -103,9 +103,9 @@ async def shutdown(self, reboot: bool) -> bool: if proc.returncode > 0: err = stderr.decode() if reboot: - await broadcast_sse_msg(SSE.SYSTEM_REBOOT_ERROR, {"error_message": err}) + await broadcast_json_ws(SSE.SYSTEM_REBOOT_ERROR, {"error_message": err}) else: - await broadcast_sse_msg( + await broadcast_json_ws( SSE.SYSTEM_SHUTDOWN_ERROR, {"error_message": err} ) diff --git a/app/system/router.py b/app/system/router.py index 6c0943c..bc929be 100644 --- a/app/system/router.py +++ b/app/system/router.py @@ -6,7 +6,6 @@ from app.api.utils import SSE from app.auth.auth_bearer import JWTBearer from app.auth.auth_handler import sign_jwt -from app.external.sse_starlette import EventSourceResponse from app.system.docs import ( get_debug_logs_raw_desc, get_debug_logs_raw_resp_desc, diff --git a/app/system/service.py b/app/system/service.py index 3b7e48b..4a00a9a 100644 --- a/app/system/service.py +++ b/app/system/service.py @@ -1,10 +1,10 @@ import asyncio -from typing import Dict, Optional +from typing import Any, Dict, Optional from decouple import config from fastapi import HTTPException, Request, status -from app.api.utils import SSE, broadcast_sse_msg +from app.api.utils import SSE, broadcast_json_ws from app.system.models import ( APIPlatform, ConnectionInfo, @@ -76,9 +76,9 @@ async def get_connection_info() -> ConnectionInfo: async def shutdown(reboot: bool) -> bool: if reboot: - await broadcast_sse_msg(SSE.SYSTEM_REBOOT_NOTICE, {"reboot": True}) + await broadcast_json_ws(SSE.SYSTEM_REBOOT_NOTICE, {"reboot": True}) else: - await broadcast_sse_msg(SSE.SYSTEM_SHUTDOWN_NOTICE, {"shutdown": True}) + await broadcast_json_ws(SSE.SYSTEM_SHUTDOWN_NOTICE, {"shutdown": True}) try: return await system.shutdown(reboot=reboot) @@ -111,7 +111,7 @@ async def _handle_gather_hardware_info(): while True: info = await get_hardware_info() if last_info != info: - await broadcast_sse_msg(SSE.HARDWARE_INFO, info) + await broadcast_json_ws(SSE.HARDWARE_INFO, info) last_info = info await asyncio.sleep(HW_INFO_YIELD_TIME) diff --git a/flake.nix b/flake.nix index 5b2a4a5..1eee30b 100644 --- a/flake.nix +++ b/flake.nix @@ -10,26 +10,35 @@ }; }; - outputs = { self, nixpkgs, flake-utils, poetry2nix }: - flake-utils.lib.eachDefaultSystem (system: - let - # see https://github.com/nix-community/poetry2nix/tree/master#api for more functions and examples. - pkgs = nixpkgs.legacyPackages.${system}; - inherit (poetry2nix.lib.mkPoetry2Nix { inherit pkgs; }) mkPoetryApplication; - in - { - packages = { - myapp = mkPoetryApplication { projectDir = self; }; - default = self.packages.${system}.myapp; - }; + outputs = { + self, + nixpkgs, + flake-utils, + poetry2nix, + }: + flake-utils.lib.eachDefaultSystem (system: let + # see https://github.com/nix-community/poetry2nix/tree/master#api for more functions and examples. + pkgs = nixpkgs.legacyPackages.${system}; + inherit (poetry2nix.lib.mkPoetry2Nix {inherit pkgs;}) mkPoetryApplication; + in { + packages = { + myapp = mkPoetryApplication {projectDir = self;}; + default = self.packages.${system}.myapp; + }; - devShells.default = pkgs.mkShell { - inputsFrom = [ self.packages.${system}.myapp ]; - packages = with pkgs; [ - poetry - pyright - sshpass - ]; - }; - }); + devShells.default = pkgs.mkShell { + inputsFrom = [self.packages.${system}.myapp]; + packages = with pkgs; [ + alejandra + claws # testing websockets: https://github.com/thehowl/claws + redis + poetry + pyright + sshpass + ]; + shellHook = '' + LD_LIBRARY_PATH="${pkgs.stdenv.cc.cc.lib}/lib"; + ''; + }; + }); } diff --git a/poetry.lock b/poetry.lock index f5eac79..738e84e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1983,6 +1983,87 @@ platformdirs = ">=3.9.1,<5" docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"] test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"] +[[package]] +name = "websockets" +version = "12.0" +description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)" +optional = false +python-versions = ">=3.8" +files = [ + {file = "websockets-12.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d554236b2a2006e0ce16315c16eaa0d628dab009c33b63ea03f41c6107958374"}, + {file = "websockets-12.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2d225bb6886591b1746b17c0573e29804619c8f755b5598d875bb4235ea639be"}, + {file = "websockets-12.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:eb809e816916a3b210bed3c82fb88eaf16e8afcf9c115ebb2bacede1797d2547"}, + {file = "websockets-12.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c588f6abc13f78a67044c6b1273a99e1cf31038ad51815b3b016ce699f0d75c2"}, + {file = "websockets-12.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5aa9348186d79a5f232115ed3fa9020eab66d6c3437d72f9d2c8ac0c6858c558"}, + {file = "websockets-12.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6350b14a40c95ddd53e775dbdbbbc59b124a5c8ecd6fbb09c2e52029f7a9f480"}, + {file = "websockets-12.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:70ec754cc2a769bcd218ed8d7209055667b30860ffecb8633a834dde27d6307c"}, + {file = "websockets-12.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:6e96f5ed1b83a8ddb07909b45bd94833b0710f738115751cdaa9da1fb0cb66e8"}, + {file = "websockets-12.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:4d87be612cbef86f994178d5186add3d94e9f31cc3cb499a0482b866ec477603"}, + {file = "websockets-12.0-cp310-cp310-win32.whl", hash = "sha256:befe90632d66caaf72e8b2ed4d7f02b348913813c8b0a32fae1cc5fe3730902f"}, + {file = "websockets-12.0-cp310-cp310-win_amd64.whl", hash = "sha256:363f57ca8bc8576195d0540c648aa58ac18cf85b76ad5202b9f976918f4219cf"}, + {file = "websockets-12.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:5d873c7de42dea355d73f170be0f23788cf3fa9f7bed718fd2830eefedce01b4"}, + {file = "websockets-12.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3f61726cae9f65b872502ff3c1496abc93ffbe31b278455c418492016e2afc8f"}, + {file = "websockets-12.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ed2fcf7a07334c77fc8a230755c2209223a7cc44fc27597729b8ef5425aa61a3"}, + {file = "websockets-12.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e332c210b14b57904869ca9f9bf4ca32f5427a03eeb625da9b616c85a3a506c"}, + {file = "websockets-12.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5693ef74233122f8ebab026817b1b37fe25c411ecfca084b29bc7d6efc548f45"}, + {file = "websockets-12.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6e9e7db18b4539a29cc5ad8c8b252738a30e2b13f033c2d6e9d0549b45841c04"}, + {file = "websockets-12.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6e2df67b8014767d0f785baa98393725739287684b9f8d8a1001eb2839031447"}, + {file = "websockets-12.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:bea88d71630c5900690fcb03161ab18f8f244805c59e2e0dc4ffadae0a7ee0ca"}, + {file = "websockets-12.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:dff6cdf35e31d1315790149fee351f9e52978130cef6c87c4b6c9b3baf78bc53"}, + {file = "websockets-12.0-cp311-cp311-win32.whl", hash = "sha256:3e3aa8c468af01d70332a382350ee95f6986db479ce7af14d5e81ec52aa2b402"}, + {file = "websockets-12.0-cp311-cp311-win_amd64.whl", hash = "sha256:25eb766c8ad27da0f79420b2af4b85d29914ba0edf69f547cc4f06ca6f1d403b"}, + {file = "websockets-12.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:0e6e2711d5a8e6e482cacb927a49a3d432345dfe7dea8ace7b5790df5932e4df"}, + {file = "websockets-12.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:dbcf72a37f0b3316e993e13ecf32f10c0e1259c28ffd0a85cee26e8549595fbc"}, + {file = "websockets-12.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:12743ab88ab2af1d17dd4acb4645677cb7063ef4db93abffbf164218a5d54c6b"}, + {file = "websockets-12.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7b645f491f3c48d3f8a00d1fce07445fab7347fec54a3e65f0725d730d5b99cb"}, + {file = "websockets-12.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9893d1aa45a7f8b3bc4510f6ccf8db8c3b62120917af15e3de247f0780294b92"}, + {file = "websockets-12.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f38a7b376117ef7aff996e737583172bdf535932c9ca021746573bce40165ed"}, + {file = "websockets-12.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:f764ba54e33daf20e167915edc443b6f88956f37fb606449b4a5b10ba42235a5"}, + {file = "websockets-12.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:1e4b3f8ea6a9cfa8be8484c9221ec0257508e3a1ec43c36acdefb2a9c3b00aa2"}, + {file = "websockets-12.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:9fdf06fd06c32205a07e47328ab49c40fc1407cdec801d698a7c41167ea45113"}, + {file = "websockets-12.0-cp312-cp312-win32.whl", hash = "sha256:baa386875b70cbd81798fa9f71be689c1bf484f65fd6fb08d051a0ee4e79924d"}, + {file = "websockets-12.0-cp312-cp312-win_amd64.whl", hash = "sha256:ae0a5da8f35a5be197f328d4727dbcfafa53d1824fac3d96cdd3a642fe09394f"}, + {file = "websockets-12.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:5f6ffe2c6598f7f7207eef9a1228b6f5c818f9f4d53ee920aacd35cec8110438"}, + {file = "websockets-12.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:9edf3fc590cc2ec20dc9d7a45108b5bbaf21c0d89f9fd3fd1685e223771dc0b2"}, + {file = "websockets-12.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8572132c7be52632201a35f5e08348137f658e5ffd21f51f94572ca6c05ea81d"}, + {file = "websockets-12.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:604428d1b87edbf02b233e2c207d7d528460fa978f9e391bd8aaf9c8311de137"}, + {file = "websockets-12.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1a9d160fd080c6285e202327aba140fc9a0d910b09e423afff4ae5cbbf1c7205"}, + {file = "websockets-12.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:87b4aafed34653e465eb77b7c93ef058516cb5acf3eb21e42f33928616172def"}, + {file = "websockets-12.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:b2ee7288b85959797970114deae81ab41b731f19ebcd3bd499ae9ca0e3f1d2c8"}, + {file = "websockets-12.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:7fa3d25e81bfe6a89718e9791128398a50dec6d57faf23770787ff441d851967"}, + {file = "websockets-12.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:a571f035a47212288e3b3519944f6bf4ac7bc7553243e41eac50dd48552b6df7"}, + {file = "websockets-12.0-cp38-cp38-win32.whl", hash = "sha256:3c6cc1360c10c17463aadd29dd3af332d4a1adaa8796f6b0e9f9df1fdb0bad62"}, + {file = "websockets-12.0-cp38-cp38-win_amd64.whl", hash = "sha256:1bf386089178ea69d720f8db6199a0504a406209a0fc23e603b27b300fdd6892"}, + {file = "websockets-12.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:ab3d732ad50a4fbd04a4490ef08acd0517b6ae6b77eb967251f4c263011a990d"}, + {file = "websockets-12.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a1d9697f3337a89691e3bd8dc56dea45a6f6d975f92e7d5f773bc715c15dde28"}, + {file = "websockets-12.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1df2fbd2c8a98d38a66f5238484405b8d1d16f929bb7a33ed73e4801222a6f53"}, + {file = "websockets-12.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:23509452b3bc38e3a057382c2e941d5ac2e01e251acce7adc74011d7d8de434c"}, + {file = "websockets-12.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e5fc14ec6ea568200ea4ef46545073da81900a2b67b3e666f04adf53ad452ec"}, + {file = "websockets-12.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:46e71dbbd12850224243f5d2aeec90f0aaa0f2dde5aeeb8fc8df21e04d99eff9"}, + {file = "websockets-12.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:b81f90dcc6c85a9b7f29873beb56c94c85d6f0dac2ea8b60d995bd18bf3e2aae"}, + {file = "websockets-12.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:a02413bc474feda2849c59ed2dfb2cddb4cd3d2f03a2fedec51d6e959d9b608b"}, + {file = "websockets-12.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:bbe6013f9f791944ed31ca08b077e26249309639313fff132bfbf3ba105673b9"}, + {file = "websockets-12.0-cp39-cp39-win32.whl", hash = "sha256:cbe83a6bbdf207ff0541de01e11904827540aa069293696dd528a6640bd6a5f6"}, + {file = "websockets-12.0-cp39-cp39-win_amd64.whl", hash = "sha256:fc4e7fa5414512b481a2483775a8e8be7803a35b30ca805afa4998a84f9fd9e8"}, + {file = "websockets-12.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:248d8e2446e13c1d4326e0a6a4e9629cb13a11195051a73acf414812700badbd"}, + {file = "websockets-12.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f44069528d45a933997a6fef143030d8ca8042f0dfaad753e2906398290e2870"}, + {file = "websockets-12.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c4e37d36f0d19f0a4413d3e18c0d03d0c268ada2061868c1e6f5ab1a6d575077"}, + {file = "websockets-12.0-pp310-pypy310_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d829f975fc2e527a3ef2f9c8f25e553eb7bc779c6665e8e1d52aa22800bb38b"}, + {file = "websockets-12.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:2c71bd45a777433dd9113847af751aae36e448bc6b8c361a566cb043eda6ec30"}, + {file = "websockets-12.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:0bee75f400895aef54157b36ed6d3b308fcab62e5260703add87f44cee9c82a6"}, + {file = "websockets-12.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:423fc1ed29f7512fceb727e2d2aecb952c46aa34895e9ed96071821309951123"}, + {file = "websockets-12.0-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:27a5e9964ef509016759f2ef3f2c1e13f403725a5e6a1775555994966a66e931"}, + {file = "websockets-12.0-pp38-pypy38_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c3181df4583c4d3994d31fb235dc681d2aaad744fbdbf94c4802485ececdecf2"}, + {file = "websockets-12.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:b067cb952ce8bf40115f6c19f478dc71c5e719b7fbaa511359795dfd9d1a6468"}, + {file = "websockets-12.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:00700340c6c7ab788f176d118775202aadea7602c5cc6be6ae127761c16d6b0b"}, + {file = "websockets-12.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e469d01137942849cff40517c97a30a93ae79917752b34029f0ec72df6b46399"}, + {file = "websockets-12.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ffefa1374cd508d633646d51a8e9277763a9b78ae71324183693959cf94635a7"}, + {file = "websockets-12.0-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba0cab91b3956dfa9f512147860783a1829a8d905ee218a9837c18f683239611"}, + {file = "websockets-12.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:2cb388a5bfb56df4d9a406783b7f9dbefb888c09b71629351cc6b036e9259370"}, + {file = "websockets-12.0-py3-none-any.whl", hash = "sha256:dc284bbc8d7c78a6c69e0c7325ab46ee5e40bb4d50e494d8131a07ef47500e9e"}, + {file = "websockets-12.0.tar.gz", hash = "sha256:81df9cbcbb6c260de1e007e58c011bfebe2dafc8435107b0537f393dd38c8b1b"}, +] + [[package]] name = "win32-setctime" version = "1.1.0" @@ -2103,4 +2184,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "50ad61bb8ac78338df35cae7dae4c641b1f2625bc3cd2f27458440933aa0ceda" +content-hash = "dbef7a2b2a50c765a011782d84692f793974238510fa4d564da2bbb281964c3f" diff --git a/pyproject.toml b/pyproject.toml index 78ee698..b90aa8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ googleapis-common-protos = "^1.62.0" protobuf = "^4.25.3" deepdiff = "^6.7.1" loguru = "^0.7.2" +websockets = "^12.0" [tool.poetry.group.dev.dependencies] black = "^24.3.0" diff --git a/scripts/sync_to_blitz.sh b/scripts/sync_to_blitz.sh index 9ea22c7..fb6b700 100644 --- a/scripts/sync_to_blitz.sh +++ b/scripts/sync_to_blitz.sh @@ -48,9 +48,6 @@ echo "# local cache delete .." rm -r ./app/__pycache__ 2>/dev/null rm -r ./app/repositories/ln_impl/protos/__pycache__ 2>/dev/null rm -r ./app/repositories/ln_impl/__pycache__ 2>/dev/null -rm -r ./app/external/fastapi_versioning/__pycache__ 2>/dev/null -rm -r ./app/external/sse_starlette/__pycache__ 2>/dev/null -rm -r ./app/external/__pycache__ 2>/dev/null rm -r ./app/models/__pycache__ 2>/dev/null rm -r ./app/repositories/__pycache__ 2>/dev/null rm -r ./app/routers/__pycache__ 2>/dev/null