From 0955d7752c8a1952611c36097c80cad519025cde Mon Sep 17 00:00:00 2001 From: Umesh Date: Thu, 26 Oct 2023 18:13:18 -0500 Subject: [PATCH] Add debounced node status updates. Closes #288 --- .../engine/worker/http_client_service.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/chimerapy/engine/worker/http_client_service.py b/chimerapy/engine/worker/http_client_service.py index 26e2a68..9ae66e9 100644 --- a/chimerapy/engine/worker/http_client_service.py +++ b/chimerapy/engine/worker/http_client_service.py @@ -50,6 +50,9 @@ def __init__( # Services self.http_client = aiohttp.ClientSession() + # Debounce + self._debounce_delay = 500 # ms + self._update_task: Optional[asyncio.Task] = None async def async_init(self): @@ -60,7 +63,7 @@ async def async_init(self): ), "WorkerState.changed": TypedObserver( "WorkerState.changed", - on_asend=self._async_node_status_update, + on_asend=self._debounced_async_node_status_update, handle_event="drop", ), "send_archive": TypedObserver( @@ -338,8 +341,23 @@ async def _send_archive_remotely(self, host: str, port: int) -> bool: return False - async def _async_node_status_update(self) -> bool: + async def _debounced_async_node_status_update(self) -> bool: + if not self.connected_to_manager: + return False + + # Debounce + if self._update_task is not None: + self._update_task.cancel() + + self._update_task = asyncio.ensure_future(self._delayed_node_status_update()) + return True + + async def _delayed_node_status_update(self) -> bool: + await asyncio.sleep(self._debounce_delay / 1000) + return await self._async_node_status_update() + + async def _async_node_status_update(self) -> bool: if not self.connected_to_manager: return False @@ -348,6 +366,6 @@ async def _async_node_status_update(self) -> bool: self.manager_url + "/workers/node_status", data=self.state.to_json() ) as resp: return resp.ok - except aiohttp.client_exceptions.ClientOSError: + except Exception: self.logger.error(traceback.format_exc()) return False