From 68187be091e6bce7f45a9a21e7021aa1177bbbf7 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 01:27:49 -0500 Subject: [PATCH 01/13] CollectArtifactsV2 --- chimerapy/engine/chimerapyrc.yaml | 3 + .../manager/artifacts_collector_service.py | 116 ++++++++++++++++++ chimerapy/engine/manager/manager.py | 15 +++ .../engine/manager/worker_handler_service.py | 10 ++ .../engine/worker/http_server_service.py | 74 ++++++++++- 5 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 chimerapy/engine/manager/artifacts_collector_service.py diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index 7f826f48..a52a9a9a 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -39,3 +39,6 @@ config: deque-length: 10000 interval: 10 logging-enabled: false + file-transfer: + chunk-size: 250000 # bytes + max-chunks: 1 # Number of chunks to send at once diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py new file mode 100644 index 00000000..83f95851 --- /dev/null +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -0,0 +1,116 @@ +import asyncio +import json +import logging +import pathlib +from typing import Any, Dict, Optional + +import aiofiles +import aiohttp +import aioshutil +from aiohttp import ClientSession +from tqdm import tqdm + +from chimerapy.engine import config +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.states import ManagerState, NodeState +from chimerapy.engine.utils import async_waiting_for +import zmq +from zmq.asyncio import Context +from rich.progress import Progress + +PIPELINE = 1 + +CHUNK_SIZE = 250000 + + +async def client_task(context, ip, filename: pathlib.Path, expected_size): + dealer = context.socket(zmq.DEALER) + dealer.sndhwm = dealer.rcvhwm = PIPELINE + dealer.connect(f"tcp://{ip}:6000") + + f = open(filename, "wb") + credit = PIPELINE + + total = 0 + chunks = 0 + offset = 0 + seq_no = 0 + + # Create a progress bar + human_size = round(expected_size / 1024 / 1024, 2) + + with Progress() as progress: + task = progress.add_task(f"[cyan]Downloading ({filename.name}-{human_size}MB...)", total=100) + while True: + while credit: + await dealer.send_multipart([b"fetch", b"%i" % offset, b"%i" % CHUNK_SIZE, b"%i" % seq_no]) + offset += CHUNK_SIZE + seq_no += 1 + credit -= 1 + + try: + chunk, seq_no_recv_str = await dealer.recv_multipart() + seq_no_recv = int(seq_no_recv_str) + f.write(chunk) + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise + + chunks += 1 + credit += 1 + size = len(chunk) + total += size + # print(f"Receiving {total} {offset} {seq_no_recv} {size}") + progress.update(task, completed=(total / expected_size) * 100) + if size < CHUNK_SIZE: + f.close() + break + + + +class ArtifactsCollector: + """A utility class to collect artifacts recorded by the nodes.""" + + def __init__( + self, + state: ManagerState, + worker_id: str, + parent_logger: Optional[logging.Logger] = None, + unzip: bool = False, + ): + worker_state = state.workers[worker_id] + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + + self.logger = fork( + parent_logger, + f"ArtifactsCollector-[Worker({worker_state.name})]", + ) + + self.state = state + self.worker_id = worker_id + self.base_url = ( + f"http://{self.state.workers[self.worker_id].ip}:" + f"{self.state.workers[self.worker_id].port}" + ) + self.unzip = unzip + + async def _artifact_info(self, session: aiohttp.ClientSession): + self.logger.info(f"Requesting artifact info from {self.base_url}") + async with session.post("/nodes/request_collect") as resp: + if resp.ok: + data = await resp.json() + return data["zip_path"], data["port"], data["size"] + else: + raise Exception(f"Request failed: {resp.status} {resp.reason}") + + async def collect(self): + client_session = ClientSession(base_url=self.base_url) + zip_path, port, size = await self._artifact_info(client_session) + print(pathlib.Path(zip_path).name, self.state.logdir) + save_path = self.state.logdir / pathlib.Path(zip_path).name + self.logger.info(f"Downloading {zip_path} from {self.base_url}") + await client_task(Context(), self.state.workers[self.worker_id].ip, save_path, size) + diff --git a/chimerapy/engine/manager/manager.py b/chimerapy/engine/manager/manager.py index 2b9b6552..2c003504 100644 --- a/chimerapy/engine/manager/manager.py +++ b/chimerapy/engine/manager/manager.py @@ -336,6 +336,9 @@ async def async_stop(self) -> bool: async def async_collect(self) -> bool: return await self.worker_handler.collect() + async def async_collect_v2(self) -> bool: + return await self.worker_handler.collect_v2() + async def async_reset(self, keep_workers: bool = True): return await self.worker_handler.reset(keep_workers) @@ -479,6 +482,18 @@ def collect(self) -> Future[bool]: """ return self._exec_coro(self.async_collect()) + def collect_v2(self) -> Future[bool]: + """Collect data from the Workers + + First, we wait until all the Nodes have finished save their data.\ + Then, manager request that Nodes' from the Workers. + + Returns: + Future[bool]: Future of success in collect data from Workers + + """ + return self._exec_coro(self.async_collect_v2()) + def reset( self, keep_workers: bool = True, blocking: bool = True ) -> Union[bool, Future[bool]]: diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py index e70382e2..829ee2b4 100644 --- a/chimerapy/engine/manager/worker_handler_service.py +++ b/chimerapy/engine/manager/worker_handler_service.py @@ -824,6 +824,16 @@ async def collect(self) -> bool: await self.eventbus.asend(Event("save_meta")) return all(results) + async def collect_v2(self): + from .artifacts_collector_service import ArtifactsCollector + for worker_id in self.state.workers: + artifacts_collector = ArtifactsCollector( + worker_id=worker_id, + state=self.state, + parent_logger=logger, + ) + await artifacts_collector.collect() + async def reset(self, keep_workers: bool = True): # Destroy Nodes safely diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index 8eaba1d3..9008bc6a 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -3,9 +3,12 @@ import logging import pathlib import pickle -from typing import Dict, List +from typing import Dict, List, Tuple +import zmq +import zmq.asyncio from aiohttp import web +import os from ..data_protocols import ( NodeDiagnostics, @@ -30,6 +33,47 @@ UpdateGatherEvent, UpdateResultsEvent, ) +from chimerapy.engine import config + + +class ZMQFileServer: + def __init__(self, ctx: zmq.asyncio.Context, host="*", port=6000): + self.host = host + self.port = port + self.ctx = ctx + self.socket = None + + async def mount(self, file: pathlib.Path): + file = open(file, "rb") + router = self.ctx.socket(zmq.ROUTER) + + router.sndhwm = router.rcvhwm = config.get("file-transfer.max-chunks") + router.bind("tcp://*:6000") + + while True: + try: + msg = await router.recv_multipart() + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise + + identity, command, offset_str, chunksz_str, seq_nostr = msg + + assert command == b"fetch" + offset = int(offset_str) + chunksz = int(chunksz_str) + seq_no = int(seq_nostr) + file.seek(offset, os.SEEK_SET) + data = file.read(chunksz) + print(f"sending {offset} {chunksz} {seq_no} {len(data)}") + + if not data: + await asyncio.sleep(5) + break + + await router.send_multipart([identity, data, b"%i" % seq_no]) class HttpServerService(Service): @@ -60,6 +104,7 @@ def __init__( web.get("/nodes/pub_table", self._async_get_node_pub_table), web.post("/nodes/pub_table", self._async_process_node_pub_table), web.get("/nodes/gather", self._async_report_node_gather), + web.post("/nodes/request_collect", self._async_request_collect), web.post("/nodes/collect", self._async_collect), web.post("/nodes/step", self._async_step_route), web.post("/nodes/start", self._async_start_nodes_route), @@ -290,6 +335,33 @@ async def _async_collect(self, request: web.Request) -> web.Response: asyncio.create_task(self._collect_and_send(pathlib.Path(data["path"]))) return web.HTTPOk() + def _have_nodes_saved(self): + node_fsm = map(lambda node: node.fsm, self.state.nodes.values()) + return all(map(lambda fsm: fsm == "SAVED", node_fsm)) + + async def _async_request_collect(self, request: web.Request) -> web.Response: + print("Collecting") + await self.eventbus.asend(Event("collect")) + from chimerapy.engine.utils import async_waiting_for + await async_waiting_for(self._have_nodes_saved, timeout=10) + print("Starting File Transfer Server") + path = pathlib.Path(self.state.tempfolder) + zip_path, port = await self._start_file_transfer_server(path) + return web.json_response({ + "zip_path": zip_path, + "port": port, + "size": os.path.getsize(zip_path) + }) + + async def _start_file_transfer_server(self, path: pathlib.Path) -> Tuple[pathlib.Path, int]: + # Start file transfer server + context = zmq.asyncio.Context() + server = ZMQFileServer(context) + import aioshutil + zip_path = await aioshutil.make_archive(path, "zip", path.parent, path.name) + asyncio.create_task(server.mount(zip_path)) + return zip_path, server.port + async def _async_diagnostics_route(self, request: web.Request) -> web.Response: data = await request.json() From f7a8f10a4844a2d42c1a16e66b8e7187aaeb11a8 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 04:07:34 -0500 Subject: [PATCH 02/13] Cleanups --- .../manager/artifacts_collector_service.py | 165 ++++++++++++------ chimerapy/engine/manager/manager.py | 4 +- .../engine/manager/worker_handler_service.py | 18 +- chimerapy/engine/utils.py | 15 ++ .../engine/worker/http_server_service.py | 72 +++++--- 5 files changed, 187 insertions(+), 87 deletions(-) diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py index 83f95851..5670a7d7 100644 --- a/chimerapy/engine/manager/artifacts_collector_service.py +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -1,35 +1,39 @@ -import asyncio import json import logging +import os import pathlib -from typing import Any, Dict, Optional +from typing import Optional import aiofiles import aiohttp import aioshutil +import zmq from aiohttp import ClientSession -from tqdm import tqdm +from rich.progress import Progress +from zmq.asyncio import Context from chimerapy.engine import config from chimerapy.engine._logger import fork, getLogger -from chimerapy.engine.states import ManagerState, NodeState -from chimerapy.engine.utils import async_waiting_for -import zmq -from zmq.asyncio import Context -from rich.progress import Progress - -PIPELINE = 1 - -CHUNK_SIZE = 250000 - - -async def client_task(context, ip, filename: pathlib.Path, expected_size): +from chimerapy.engine.states import ManagerState +from chimerapy.engine.utils import get_ip_address + + +async def download_task( + context: zmq.Context, + ip: str, + port: int, + filename: pathlib.Path, + expected_size, + progress=None, +): + """Download a file from a worker.""" dealer = context.socket(zmq.DEALER) - dealer.sndhwm = dealer.rcvhwm = PIPELINE - dealer.connect(f"tcp://{ip}:6000") + dealer.sndhwm = dealer.rcvhwm = config.get("file-transfer.max-chunks") + dealer.connect(f"tcp://{ip}:{port}") - f = open(filename, "wb") - credit = PIPELINE + f = await aiofiles.open(filename, "wb") + credit = config.get("file-transfer.max-chunks") + chunk_size = config.get("file-transfer.chunk-size") total = 0 chunks = 0 @@ -38,36 +42,41 @@ async def client_task(context, ip, filename: pathlib.Path, expected_size): # Create a progress bar human_size = round(expected_size / 1024 / 1024, 2) + update_task = None + if progress: + update_task = progress.add_task( + f"[cyan]Downloading ({filename.name}-{human_size}MB...)", total=100 + ) - with Progress() as progress: - task = progress.add_task(f"[cyan]Downloading ({filename.name}-{human_size}MB...)", total=100) - while True: - while credit: - await dealer.send_multipart([b"fetch", b"%i" % offset, b"%i" % CHUNK_SIZE, b"%i" % seq_no]) - offset += CHUNK_SIZE - seq_no += 1 - credit -= 1 + while True: + while credit: + await dealer.send_multipart( + [b"fetch", b"%i" % offset, b"%i" % chunk_size, b"%i" % seq_no] + ) + offset += chunk_size + seq_no += 1 + credit -= 1 + + try: + chunk, seq_no_recv_str = await dealer.recv_multipart() + await f.write(chunk) + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise - try: - chunk, seq_no_recv_str = await dealer.recv_multipart() - seq_no_recv = int(seq_no_recv_str) - f.write(chunk) - except zmq.ZMQError as e: - if e.errno == zmq.ETERM: - return - else: - raise - - chunks += 1 - credit += 1 - size = len(chunk) - total += size - # print(f"Receiving {total} {offset} {seq_no_recv} {size}") - progress.update(task, completed=(total / expected_size) * 100) - if size < CHUNK_SIZE: - f.close() - break + chunks += 1 + credit += 1 + size = len(chunk) + total += size + if update_task: + progress.update(update_task, completed=(total / expected_size) * 100) + + if size < chunk_size: + await f.close() + break class ArtifactsCollector: @@ -79,6 +88,7 @@ def __init__( worker_id: str, parent_logger: Optional[logging.Logger] = None, unzip: bool = False, + progressbar: Optional[Progress] = None, ): worker_state = state.workers[worker_id] if parent_logger is None: @@ -96,21 +106,68 @@ def __init__( f"{self.state.workers[self.worker_id].port}" ) self.unzip = unzip + self.progressbar = progressbar async def _artifact_info(self, session: aiohttp.ClientSession): self.logger.info(f"Requesting artifact info from {self.base_url}") - async with session.post("/nodes/request_collect") as resp: + data = { + "initiate_remote_transfer": get_ip_address() + != self.state.workers[self.worker_id].ip + } + + async with session.post( + "/nodes/request_collect", + data=json.dumps(data), + ) as resp: if resp.ok: data = await resp.json() return data["zip_path"], data["port"], data["size"] else: - raise Exception(f"Request failed: {resp.status} {resp.reason}") + # FixMe: Handle this error properly + raise ConnectionError( + f"Artifacts Collection Failed: {resp.status} {resp.reason}" + ) - async def collect(self): + async def collect(self) -> bool: client_session = ClientSession(base_url=self.base_url) - zip_path, port, size = await self._artifact_info(client_session) - print(pathlib.Path(zip_path).name, self.state.logdir) - save_path = self.state.logdir / pathlib.Path(zip_path).name - self.logger.info(f"Downloading {zip_path} from {self.base_url}") - await client_task(Context(), self.state.workers[self.worker_id].ip, save_path, size) + try: + zip_path, port, size = await self._artifact_info(client_session) + except Exception as e: + self.logger.error(f"Failed to get artifact info: {e}") + return False + save_name = f"{self.state.workers[self.worker_id].name}_{self.worker_id[:8]}" + zip_save_path = self.state.logdir / f"{save_name}.zip" + if port is not None: + try: + await download_task( + Context(), + self.state.workers[self.worker_id].ip, + int(port), + zip_save_path, + size, + self.progressbar, + ) + except Exception as e: + self.logger.error(f"Failed to download artifacts: {e}") + return False + else: + self.logger.info(f"Copying {zip_path} to {zip_save_path}") + try: + await aioshutil.copyfile(zip_path, self.state.logdir / zip_save_path) + except Exception as e: + self.logger.error(f"Failed to copy artifacts: {e}") + return False + if not self.unzip: + self.logger.info(f"Unzipping {zip_save_path}") + try: + await aioshutil.unpack_archive( + zip_save_path, self.state.logdir / save_name + ) + self.logger.info(f"Removing {zip_save_path}") + os.remove(zip_save_path) + except Exception as e: + self.logger.error(f"Failed to unzip artifacts: {e}") + return False + + return True diff --git a/chimerapy/engine/manager/manager.py b/chimerapy/engine/manager/manager.py index 2c003504..efce38dc 100644 --- a/chimerapy/engine/manager/manager.py +++ b/chimerapy/engine/manager/manager.py @@ -336,8 +336,8 @@ async def async_stop(self) -> bool: async def async_collect(self) -> bool: return await self.worker_handler.collect() - async def async_collect_v2(self) -> bool: - return await self.worker_handler.collect_v2() + async def async_collect_v2(self, unzip=False) -> bool: + return await self.worker_handler.collect_v2(unzip) async def async_reset(self, keep_workers: bool = True): return await self.worker_handler.reset(keep_workers) diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py index 829ee2b4..1716e7e4 100644 --- a/chimerapy/engine/manager/worker_handler_service.py +++ b/chimerapy/engine/manager/worker_handler_service.py @@ -13,6 +13,7 @@ import networkx as nx from chimerapy.engine import _logger, config +from chimerapy.engine.utils import get_progress_bar from ..data_protocols import NodePubTable from ..eventbus import Event, EventBus, TypedObserver, make_evented @@ -824,15 +825,28 @@ async def collect(self) -> bool: await self.eventbus.asend(Event("save_meta")) return all(results) - async def collect_v2(self): + async def collect_v2(self, unzip: bool = False) -> bool: from .artifacts_collector_service import ArtifactsCollector + + futures = [] + progress_bar = get_progress_bar() + progress_bar.start() for worker_id in self.state.workers: artifacts_collector = ArtifactsCollector( worker_id=worker_id, state=self.state, parent_logger=logger, + unzip=unzip, + progressbar=progress_bar, ) - await artifacts_collector.collect() + future = asyncio.ensure_future(artifacts_collector.collect()) + futures.append(future) + + logger.info("Collecting artifacts from workers...") + results = await asyncio.gather(*futures) + progress_bar.stop() + + return all(results) async def reset(self, keep_workers: bool = True): diff --git a/chimerapy/engine/utils.py b/chimerapy/engine/utils.py index 92c42d74..512e23ca 100644 --- a/chimerapy/engine/utils.py +++ b/chimerapy/engine/utils.py @@ -10,6 +10,8 @@ from concurrent.futures import Future from typing import Any, Callable, Coroutine, Dict, Optional, Tuple, Union +from rich.progress import BarColumn, Progress, TimeElapsedColumn + # Third-party # Internal from chimerapy.engine import _logger @@ -19,6 +21,19 @@ BYTES_PER_MB = 1024 * 1024 +def get_progress_bar() -> Progress: + """Get a progress bar.""" + columns = [ + "[progress.description]{task.description}", + "[progress.percentage]{task.percentage:>3.0f}%", + BarColumn(bar_width=None), + TimeElapsedColumn(), + ] + + progress = Progress(*columns) + return progress + + def clear_queue(input_queue: queue.Queue): """Clear a queue. Args: diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index 9008bc6a..ffd8e171 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -1,14 +1,17 @@ import asyncio import enum import logging +import os import pathlib import pickle -from typing import Dict, List, Tuple +from typing import Dict, List import zmq import zmq.asyncio from aiohttp import web -import os + +from chimerapy.engine import config +from chimerapy.engine.utils import async_waiting_for from ..data_protocols import ( NodeDiagnostics, @@ -33,23 +36,25 @@ UpdateGatherEvent, UpdateResultsEvent, ) -from chimerapy.engine import config class ZMQFileServer: - def __init__(self, ctx: zmq.asyncio.Context, host="*", port=6000): + def __init__(self, ctx: zmq.asyncio.Context, host="*"): self.host = host - self.port = port self.ctx = ctx - self.socket = None + self.router = None - async def mount(self, file: pathlib.Path): - file = open(file, "rb") + async def ainit(self): router = self.ctx.socket(zmq.ROUTER) - router.sndhwm = router.rcvhwm = config.get("file-transfer.max-chunks") - router.bind("tcp://*:6000") - + port = router.bind_to_random_port(f"tcp://{self.host}", max_tries=100) + self.router = router + return port + + async def mount(self, file_path: pathlib.Path): + file = open(file_path, "rb") + assert self.router is not None + router = self.router while True: try: msg = await router.recv_multipart() @@ -67,7 +72,6 @@ async def mount(self, file: pathlib.Path): seq_no = int(seq_nostr) file.seek(offset, os.SEEK_SET) data = file.read(chunksz) - print(f"sending {offset} {chunksz} {seq_no} {len(data)}") if not data: await asyncio.sleep(5) @@ -336,31 +340,41 @@ async def _async_collect(self, request: web.Request) -> web.Response: return web.HTTPOk() def _have_nodes_saved(self): - node_fsm = map(lambda node: node.fsm, self.state.nodes.values()) - return all(map(lambda fsm: fsm == "SAVED", node_fsm)) + node_fsm = (node.fsm for node in self.state.nodes.values()) + return all(fsm == "SAVED" for fsm in node_fsm) async def _async_request_collect(self, request: web.Request) -> web.Response: - print("Collecting") + data = await request.json() await self.eventbus.asend(Event("collect")) - from chimerapy.engine.utils import async_waiting_for await async_waiting_for(self._have_nodes_saved, timeout=10) - print("Starting File Transfer Server") + + initiate_remote_transfer = data.get("initiate_remote_transfer", True) path = pathlib.Path(self.state.tempfolder) - zip_path, port = await self._start_file_transfer_server(path) - return web.json_response({ - "zip_path": zip_path, - "port": port, - "size": os.path.getsize(zip_path) - }) - - async def _start_file_transfer_server(self, path: pathlib.Path) -> Tuple[pathlib.Path, int]: + zip_path = await self._zip_direcotry(path) + port = None + + if initiate_remote_transfer: + self.logger.debug("Starting File Transfer Server") + port = await self._start_file_transfer_server(zip_path) + self.logger.info(f"File Transfer Server Started at {port}") + + return web.json_response( + {"zip_path": str(zip_path), "port": port, "size": os.path.getsize(zip_path)} + ) + + async def _zip_direcotry(self, path: pathlib.Path) -> pathlib.Path: + import aioshutil + + zip_path = await aioshutil.make_archive(path, "zip", path.parent, path.name) + return zip_path + + async def _start_file_transfer_server(self, path: pathlib.Path) -> int: # Start file transfer server context = zmq.asyncio.Context() server = ZMQFileServer(context) - import aioshutil - zip_path = await aioshutil.make_archive(path, "zip", path.parent, path.name) - asyncio.create_task(server.mount(zip_path)) - return zip_path, server.port + port = await server.ainit() + self.tasks.append(asyncio.create_task(server.mount(path))) + return port async def _async_diagnostics_route(self, request: web.Request) -> web.Response: data = await request.json() From b757d2fbea567af5552b4fb9016119e6bd826ea2 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 05:13:03 -0500 Subject: [PATCH 03/13] change if not unzip to if unzip --- chimerapy/engine/manager/artifacts_collector_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py index 5670a7d7..d38e255a 100644 --- a/chimerapy/engine/manager/artifacts_collector_service.py +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -158,7 +158,7 @@ async def collect(self) -> bool: self.logger.error(f"Failed to copy artifacts: {e}") return False - if not self.unzip: + if self.unzip: self.logger.info(f"Unzipping {zip_save_path}") try: await aioshutil.unpack_archive( From 862d53ef00b1c759dcf709a3044e7fcef9edca43 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 19:37:50 -0500 Subject: [PATCH 04/13] Add random file creator pipeline --- test/file_transfer/.gitignore | 1 + .../configs/random-file-creator.json | 73 +++++++++++++++++++ test/file_transfer/ft/__init__.py | 7 ++ test/file_transfer/ft/file_creator.py | 43 +++++++++++ test/file_transfer/setup.py | 16 ++++ 5 files changed, 140 insertions(+) create mode 100644 test/file_transfer/.gitignore create mode 100644 test/file_transfer/configs/random-file-creator.json create mode 100644 test/file_transfer/ft/__init__.py create mode 100644 test/file_transfer/ft/file_creator.py create mode 100644 test/file_transfer/setup.py diff --git a/test/file_transfer/.gitignore b/test/file_transfer/.gitignore new file mode 100644 index 00000000..01c9b85c --- /dev/null +++ b/test/file_transfer/.gitignore @@ -0,0 +1 @@ +cp-logs \ No newline at end of file diff --git a/test/file_transfer/configs/random-file-creator.json b/test/file_transfer/configs/random-file-creator.json new file mode 100644 index 00000000..b909f49f --- /dev/null +++ b/test/file_transfer/configs/random-file-creator.json @@ -0,0 +1,73 @@ +{ + "mode": "record", + "runtime": 25, + "workers": { + "manager_ip": "129.59.104.153", + "manager_port": 9001, + "instances": [ + { + "name": "remote-mac", + "id": "remote-mac", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + }, + { + "name": "local", + "id": "local", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + }, + { + "name": "remote-win", + "id": "remote-win", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + } + ] + }, + "nodes": [ + { + "registry_name": "FileCreator", + "name": "file-creator-win", + "kwargs": { + "filename": "random-win.bin", + "per_step_mb": 4 + }, + "package": "ft" + }, + { + "registry_name": "FileCreator", + "name": "file-creator-mac", + "kwargs": { + "filename": "random-mac.bin", + "per_step_mb": 4 + }, + "package": "ft" + }, + { + "registry_name": "FileCreator", + "name": "file-creator-local", + "kwargs": { + "filename": "random-local.bin", + "per_step_mb": 4 + }, + "package": "ft" + } + ], + "adj": [], + "manager_config": { + "logdir": "cp-logs", + "port": 9001 + }, + "mappings": { + "local": [ + "file-creator-local" + ], + "remote-mac": [ + "file-creator-mac" + ], + "remote-win": [ + "file-creator-win" + ] + } +} diff --git a/test/file_transfer/ft/__init__.py b/test/file_transfer/ft/__init__.py new file mode 100644 index 00000000..bfea4a71 --- /dev/null +++ b/test/file_transfer/ft/__init__.py @@ -0,0 +1,7 @@ +def register_nodes_metadata(): + return { + "description": "Test File Upload Nodes", + "nodes": [ + "ft.file_creator:FileCreator" + ] + } diff --git a/test/file_transfer/ft/file_creator.py b/test/file_transfer/ft/file_creator.py new file mode 100644 index 00000000..a0539ff9 --- /dev/null +++ b/test/file_transfer/ft/file_creator.py @@ -0,0 +1,43 @@ +import os +import time +import json +import hashlib + +from chimerapy.orchestrator import sink_node +from chimerapy.engine.node import Node + + +@sink_node(name="FileCreator", add_to_registry=False) +class FileCreator(Node): + """A utility class to upload files to the manager.""" + def __init__(self, filename, per_step_mb = 1, name="FileUploader"): + super().__init__(name=name) + self.per_step_mb = per_step_mb + self.filename = filename + self.file = None + + def step(self) -> None: + if self.file is None: + self.file = open(self.state.logdir / self.filename, "wb") + + self.file.write(os.urandom(self.per_step_mb * 1024 * 1024)) + time.sleep(1.0) + self.logger.info(f"Written {self.per_step_mb}MB to {self.filename}") + if self.state.fsm == "STOPPED": + self.file.close() + self.calculate_md5() + + def calculate_md5(self): + md5 = hashlib.md5() + with open(self.state.logdir / self.filename, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + md5.update(chunk) + + summary = { + "filename": self.filename, + "md5": md5.hexdigest() + } + + with open(self.state.logdir / "summary.json", "w") as f: + json.dump(summary, f, indent=2) + diff --git a/test/file_transfer/setup.py b/test/file_transfer/setup.py new file mode 100644 index 00000000..0341db7f --- /dev/null +++ b/test/file_transfer/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup, find_packages + +setup( + name="ft", + version="0.0.1", + packages=find_packages(), + install_requires=[ + "chimerapy-engine", + "chimerapy-orchestrator" + ], + entry_points={ + "chimerapy.orchestrator.nodes_registry": [ + "get_nodes_registry = ft:register_nodes_metadata" + ] + } +) \ No newline at end of file From 3adf05b844321d552b5f0372a127e0dbb5f19ac9 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 19:40:54 -0500 Subject: [PATCH 05/13] Generate 4 Megabytes --- test/file_transfer/configs/random-file-creator.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/file_transfer/configs/random-file-creator.json b/test/file_transfer/configs/random-file-creator.json index b909f49f..39e9203e 100644 --- a/test/file_transfer/configs/random-file-creator.json +++ b/test/file_transfer/configs/random-file-creator.json @@ -1,6 +1,6 @@ { "mode": "record", - "runtime": 25, + "runtime": 100, "workers": { "manager_ip": "129.59.104.153", "manager_port": 9001, @@ -31,7 +31,7 @@ "name": "file-creator-win", "kwargs": { "filename": "random-win.bin", - "per_step_mb": 4 + "per_step_mb": 10 }, "package": "ft" }, @@ -40,7 +40,7 @@ "name": "file-creator-mac", "kwargs": { "filename": "random-mac.bin", - "per_step_mb": 4 + "per_step_mb": 10 }, "package": "ft" }, @@ -49,7 +49,7 @@ "name": "file-creator-local", "kwargs": { "filename": "random-local.bin", - "per_step_mb": 4 + "per_step_mb": 10 }, "package": "ft" } From 6925431342b745500a7ed64ffe44db09801631a1 Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 20:40:57 -0500 Subject: [PATCH 06/13] Send Progressbar --- chimerapy/engine/chimerapyrc.yaml | 2 +- chimerapy/engine/worker/http_server_service.py | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index a52a9a9a..15e04166 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -41,4 +41,4 @@ config: logging-enabled: false file-transfer: chunk-size: 250000 # bytes - max-chunks: 1 # Number of chunks to send at once + max-chunks: 2 # Number of chunks to send at once diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index ffd8e171..68cfa261 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -6,12 +6,13 @@ import pickle from typing import Dict, List +import aiofiles import zmq import zmq.asyncio from aiohttp import web from chimerapy.engine import config -from chimerapy.engine.utils import async_waiting_for +from chimerapy.engine.utils import async_waiting_for, get_progress_bar from ..data_protocols import ( NodeDiagnostics, @@ -52,9 +53,17 @@ async def ainit(self): return port async def mount(self, file_path: pathlib.Path): - file = open(file_path, "rb") + file = await aiofiles.open(file_path, "rb") + progressbar = get_progress_bar() + size = os.path.getsize(file_path) + human_size = f"{size / 1024 / 1024:.2f} MB" assert self.router is not None router = self.router + task = None + if progressbar is not None: + task = progressbar.add_task( + f"Sending {file_path.name} {human_size}", total=100 + ) while True: try: msg = await router.recv_multipart() @@ -70,9 +79,12 @@ async def mount(self, file_path: pathlib.Path): offset = int(offset_str) chunksz = int(chunksz_str) seq_no = int(seq_nostr) - file.seek(offset, os.SEEK_SET) + await file.seek(offset, os.SEEK_SET) data = file.read(chunksz) + if task is not None: + progressbar.update(task, completed=(offset / size) * 100) + if not data: await asyncio.sleep(5) break From 1bcd55a014e65dedda11e3086bca4473c62095bf Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 18 Oct 2023 21:02:39 -0500 Subject: [PATCH 07/13] Log --- chimerapy/engine/worker/http_server_service.py | 17 +++++++++++------ ...ator.json => random-multi-file-creator.json} | 0 2 files changed, 11 insertions(+), 6 deletions(-) rename test/file_transfer/configs/{random-file-creator.json => random-multi-file-creator.json} (100%) diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index 68cfa261..2733fbb6 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -59,9 +59,9 @@ async def mount(self, file_path: pathlib.Path): human_size = f"{size / 1024 / 1024:.2f} MB" assert self.router is not None router = self.router - task = None + upload_task = None if progressbar is not None: - task = progressbar.add_task( + upload_task = progressbar.add_task( f"Sending {file_path.name} {human_size}", total=100 ) while True: @@ -82,8 +82,11 @@ async def mount(self, file_path: pathlib.Path): await file.seek(offset, os.SEEK_SET) data = file.read(chunksz) - if task is not None: - progressbar.update(task, completed=(offset / size) * 100) + if upload_task is not None: + print( + f"Sending {file_path.name} {human_size} {offset / size * 100:.2f}" + ) + progressbar.update(upload_task, completed=(offset / size) * 100) if not data: await asyncio.sleep(5) @@ -352,14 +355,15 @@ async def _async_collect(self, request: web.Request) -> web.Response: return web.HTTPOk() def _have_nodes_saved(self): - node_fsm = (node.fsm for node in self.state.nodes.values()) + node_fsm = list(node.fsm for node in self.state.nodes.values()) + print(node_fsm) return all(fsm == "SAVED" for fsm in node_fsm) async def _async_request_collect(self, request: web.Request) -> web.Response: data = await request.json() await self.eventbus.asend(Event("collect")) await async_waiting_for(self._have_nodes_saved, timeout=10) - + self.logger.info("All nodes saved") initiate_remote_transfer = data.get("initiate_remote_transfer", True) path = pathlib.Path(self.state.tempfolder) zip_path = await self._zip_direcotry(path) @@ -385,6 +389,7 @@ async def _start_file_transfer_server(self, path: pathlib.Path) -> int: context = zmq.asyncio.Context() server = ZMQFileServer(context) port = await server.ainit() + print("Initiated File Server") self.tasks.append(asyncio.create_task(server.mount(path))) return port diff --git a/test/file_transfer/configs/random-file-creator.json b/test/file_transfer/configs/random-multi-file-creator.json similarity index 100% rename from test/file_transfer/configs/random-file-creator.json rename to test/file_transfer/configs/random-multi-file-creator.json From 204724909dcb5a874df5791b5952aca4213aa76d Mon Sep 17 00:00:00 2001 From: Umesh Date: Thu, 19 Oct 2023 16:37:35 -0500 Subject: [PATCH 08/13] Refactor for event driven file uploads --- chimerapy/engine/networking/enums.py | 1 + .../engine/networking/file_transfer_client.py | 123 +++++++++++++++ .../engine/networking/file_transfer_server.py | 141 ++++++++++++++++++ chimerapy/engine/node/events.py | 12 +- chimerapy/engine/node/record_service.py | 23 ++- chimerapy/engine/node/worker_comms_service.py | 31 +++- chimerapy/engine/records/audio_record.py | 8 + chimerapy/engine/records/image_record.py | 14 +- chimerapy/engine/records/json_record.py | 8 + chimerapy/engine/records/record.py | 4 + chimerapy/engine/records/tabular_record.py | 8 + chimerapy/engine/records/text_record.py | 10 ++ chimerapy/engine/records/video_record.py | 8 + .../engine/worker/file_transfer_service.py | 34 +++++ .../engine/worker/http_server_service.py | 15 +- chimerapy/engine/worker/worker.py | 7 + 16 files changed, 442 insertions(+), 5 deletions(-) create mode 100644 chimerapy/engine/networking/file_transfer_client.py create mode 100644 chimerapy/engine/networking/file_transfer_server.py create mode 100644 chimerapy/engine/worker/file_transfer_service.py diff --git a/chimerapy/engine/networking/enums.py b/chimerapy/engine/networking/enums.py index 04b90152..17058d5c 100644 --- a/chimerapy/engine/networking/enums.py +++ b/chimerapy/engine/networking/enums.py @@ -36,3 +36,4 @@ class NODE_MESSAGE(Enum): REPORT_SAVING = 52 REPORT_RESULTS = 53 DIAGNOSTICS = 54 + ARTIFACTS = 55 diff --git a/chimerapy/engine/networking/file_transfer_client.py b/chimerapy/engine/networking/file_transfer_client.py new file mode 100644 index 00000000..c4ae04e7 --- /dev/null +++ b/chimerapy/engine/networking/file_transfer_client.py @@ -0,0 +1,123 @@ +import asyncio +from typing import Dict, Any + +import pathlib + +import aiofiles +import zmq.asyncio + +from chimerapy.engine.utils import get_progress_bar +import random + +class ZMQFileClient: + def __init__(self, + context: zmq.asyncio.Context, + host: str, + port: int, + outdir: pathlib.Path, + files: Dict[str, Any]): + self.host = host + self.port = port + self.context = context + self.outdir = outdir + self.files = files + self.socket = None + + async def async_init(self): + self.socket = self.context.socket(zmq.DEALER) + self.socket.sndhwm = self.socket.rcvhwm = 1 + self.socket.connect(f"tcp://{self.host}:{self.port}") + + + async def _download_task(self): + handles = {} + offsets = {} + progress_bar = get_progress_bar() + for name, details in self.files.items(): + fname = details["name"] + handles[name] = await aiofiles.open(self.outdir / fname, "wb") + offsets[name] = 0 + + download_tasks = {} + + credit = 1 + chunk_size = 250000 + completed = [] + + total = 0 + seq_no = 0 + + progress_bar.start() + while True: + while credit: + selected_file = random.choice(list(offsets.keys())) + await self.socket.send_multipart( + [b"fetch", selected_file.encode(), b"%i" % offsets[selected_file], b"%i" % chunk_size, b"%i" % seq_no] + ) + offsets[selected_file] = offsets[selected_file] + chunk_size + seq_no += 1 + credit -= 1 + try: + filename, chunk, seq_no_recv_str = await self.socket.recv_multipart() + filename_str = filename.decode() + + if filename_str not in download_tasks: + download_tasks[filename_str] = progress_bar.add_task(f"Downloading({filename_str})", total=100) + + complete = (offsets[filename_str] / self.files[filename_str]["size"]) * 100 + progress_bar.update(download_tasks[filename_str], completed=complete) + await handles[filename_str].write(chunk) + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise e + + credit += 1 + size = len(chunk) + total += size + if size < chunk_size: + completed.append(filename_str) + progress_bar.update(download_tasks[filename_str], completed=100) + del offsets[filename_str] + + if len(completed) == len(self.files): + for name, handle in handles.items(): + await handle.close() + + progress_bar.stop() + break + + +async def main(): + context = zmq.asyncio.Context() + host = "localhost" + port = 6000 + + outdir = pathlib.Path("dst") + files = { + "test1": { + "name": "test1.mp4", + "size": 31376170 + }, + "test2": { + "name": "test2.mp4", + "size": 36633129, + }, + "test3": { + "name": "test3.mp4", + "size": 39156488 + }, + "test4": { + "name": "test4.mp4", + "size": 33941417 + } + } + + client = ZMQFileClient(context, host, port, outdir, files) + await client.async_init() + await client._download_task() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/chimerapy/engine/networking/file_transfer_server.py b/chimerapy/engine/networking/file_transfer_server.py new file mode 100644 index 00000000..e6cc6c0f --- /dev/null +++ b/chimerapy/engine/networking/file_transfer_server.py @@ -0,0 +1,141 @@ +import asyncio +import os +import pathlib +import aiofiles +from chimerapy.engine.utils import get_progress_bar +import zmq +from typing import Optional, List, Dict, Any +import zmq.asyncio + + +class ZMQFileChunk: + def __init__(self, data: bytes): + self.data = data + + def write_into(self, handle): + handle.write(self.data) + + async def awrite_into(self, ahandle): + await ahandle.write(self.data) + + @classmethod + def from_bytes(cls, data): + return ZMQFileChunk(data=data) + + @classmethod + def read_from(cls, handle, offset, chunk_size): + handle.seek(offset, os.SEEK_SET) + data = handle.read(chunk_size) + return cls.from_bytes(data=data) + + @classmethod + async def aread_from(cls, ahandle, offset, chunk_size): + await ahandle.seek(offset, os.SEEK_SET) + data = await ahandle.read(chunk_size) + return cls.from_bytes(data=data) + + +class ZMQFileServer: + def __init__(self, context: zmq.asyncio.Context, paths, host="*", port=0): + self.context = context + self.host = host + self.port = port + self.paths = paths + self.handles = {} + self.socket: Optional[zmq.Socket] = None + self.send_task: Optional[asyncio.Task] = None + + async def async_init(self): + await self._initialize_file_handlers() + self.socket = self.context.socket(zmq.ROUTER) + self.socket.sndhwm = self.socket.rcvhwm = 1 + if self.port != 0: + self.socket.bind(f"tcp://{self.host}:{self.port}") + else: + self.port = self.socket.bind_to_random_port(f"tcp://{self.host}") + + self.send_task = asyncio.create_task(self._send()) + + async def _initialize_file_handlers(self): + for name, file in self.paths.items(): + assert file.exists() + assert file.is_file() + self.handles[name] = await aiofiles.open(file, mode="rb") + + async def _send(self): + assert self.socket is not None + router = self.socket + progress_bar = get_progress_bar() + progress_bar.start() + upload_tasks = {} + + for name, file in self.paths.items(): + upload_tasks[name] = progress_bar.add_task(f"Uploading({name})", total=100) + uploaded = set() + while True: + try: + msg = await router.recv_multipart() + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise + + identity, command, fname, offset_str, chunksz_str, seq_nostr = msg + fname_str = fname.decode() + if fname_str in self.handles: + assert command == b"fetch" + offset = int(offset_str) + chunksz = int(chunksz_str) + seq_no = int(seq_nostr) + handle = self.handles[fname_str] + zmq_chunk = await ZMQFileChunk.aread_from( + ahandle=handle, offset=offset, chunk_size=chunksz + ) + + data = zmq_chunk.data + if not data: + progress_bar.update(upload_tasks[fname_str], completed=100) + uploaded.add(fname_str) + print(f"Uploaded {fname_str}. {uploaded}") + if uploaded == set(self.paths.keys()): + progress_bar.stop() + break + continue + else: + await router.send_multipart([identity, fname, data, b"%i" % seq_no]) + completed = (await handle.tell() / os.path.getsize(self.paths[fname_str])) * 100 + progress_bar.update(upload_tasks[fname_str], completed=completed) + + + async def ashutdown(self): + if self.send_task is not None: + self.send_task.cancel() + await self.send_task + self.send_task = None + + if self.socket is not None: + self.socket.close() + self.socket = None + + for handle in self.handles.values(): + await handle.close() + + self.handles = {} + + +async def main(): + files = { + "test1": pathlib.Path("src/test1.mp4"), + "test2": pathlib.Path("src/test2.mp4"), + "test3": pathlib.Path("src/test3.mp4"), + "test4": pathlib.Path("src/test4.mp4") + } + server = ZMQFileServer(context=zmq.asyncio.Context(), paths=files, port=6000) + await server.async_init() + await server.send_task + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/chimerapy/engine/node/events.py b/chimerapy/engine/node/events.py index 2f590012..8c5a9ed0 100644 --- a/chimerapy/engine/node/events.py +++ b/chimerapy/engine/node/events.py @@ -1,5 +1,6 @@ +import pathlib from dataclasses import dataclass -from typing import Any, Dict +from typing import Any, Dict, Optional from ..data_protocols import NodeDiagnostics, NodePubTable from ..networking.client import Client @@ -41,3 +42,12 @@ class GatherEvent: @dataclass class DiagnosticsReportEvent: # diagnostics_report diagnostics: NodeDiagnostics + + +@dataclass +class Artifact: + name: str + path: pathlib.Path + mime_type: str + size: Optional[int] = None + glob: Optional[str] = None diff --git a/chimerapy/engine/node/record_service.py b/chimerapy/engine/node/record_service.py index 4769fbf3..3f3c4ab1 100644 --- a/chimerapy/engine/node/record_service.py +++ b/chimerapy/engine/node/record_service.py @@ -18,6 +18,8 @@ ) from ..service import Service from ..states import NodeState +from .events import Artifact +from ..eventbus import Event logger = _logger.getLogger("chimerapy-engine") @@ -149,12 +151,31 @@ def run(self): # self.logger.debug(f"{self}: Closed all entries") - def collect(self): + async def collect(self): # self.logger.debug(f"{self}: collecting recording") # Signal to stop and save self.is_running.clear() + artifacts = {} if self._record_thread: self._record_thread.join() + for name, entry in self.records.items(): + artifacts[name] = entry.get_meta() + + all_artifacts = [] + self.logger.info("Sending artifacts event") + for name, artifact in artifacts.items(): + event_data = Artifact( + name=artifact["name"], + mime_type=artifact["mime_type"], + path=artifact["path"], + glob=artifact["glob"], + size=artifact["path"].stat().st_size, + ) + all_artifacts.append(event_data) + + await self.eventbus.asend(Event("artifacts", data=all_artifacts)) + self.logger.debug("Sent artifacts event") + # self.logger.debug(f"{self}: Finish saving records") diff --git a/chimerapy/engine/node/worker_comms_service.py b/chimerapy/engine/node/worker_comms_service.py index 1f055fda..d01e56bb 100644 --- a/chimerapy/engine/node/worker_comms_service.py +++ b/chimerapy/engine/node/worker_comms_service.py @@ -1,7 +1,7 @@ import logging import pathlib import tempfile -from typing import Dict, Optional +from typing import Dict, Optional, List from ..data_protocols import NodeDiagnostics, NodePubTable from ..eventbus import Event, EventBus, TypedObserver @@ -15,6 +15,7 @@ GatherEvent, ProcessNodePubTableEvent, RegisteredMethodEvent, + Artifact ) from .node_config import NodeConfig @@ -76,6 +77,11 @@ async def async_init(self): "teardown": TypedObserver( "teardown", on_asend=self.teardown, handle_event="drop" ), + "artifacts": TypedObserver( + "artifacts", + on_asend=self.send_artifacts_info, + handle_event="pass" + ), } for ob in observers.values(): await self.eventbus.asubscribe(ob) @@ -130,6 +136,29 @@ async def setup(self): # Send publisher port and host information await self.send_state() + async def send_artifacts_info(self, artifacts_event: Event): + artifacts: List[Artifact] = artifacts_event.data + + assert self.state and self.eventbus and self.logger + if self.client: + pass + data = { + "node_id": self.state.id, + "artifacts": [ + { + "name": artifact.name, + "path": str(artifact.path), + "filename": artifact.path.name, + "mime_type": artifact.mime_type, + "glob": artifact.glob, + "size": artifact.size + } + for artifact in artifacts + ] + } + + await self.client.async_send(signal=NODE_MESSAGE.ARTIFACTS, data=data) + async def teardown(self): # Shutdown the client diff --git a/chimerapy/engine/records/audio_record.py b/chimerapy/engine/records/audio_record.py index 72d88063..657ffc9e 100644 --- a/chimerapy/engine/records/audio_record.py +++ b/chimerapy/engine/records/audio_record.py @@ -65,3 +65,11 @@ def close(self): # Close the audio writer self.audio_writer.close() + + def get_meta(self): + return { + "name": self.name, + "path": self.audio_file_path, + "glob": None, + "mime_type": "audio/wav" + } diff --git a/chimerapy/engine/records/image_record.py b/chimerapy/engine/records/image_record.py index b60a0198..cc7b8602 100644 --- a/chimerapy/engine/records/image_record.py +++ b/chimerapy/engine/records/image_record.py @@ -1,6 +1,7 @@ # Built-in Imports import os import pathlib +import shutil from typing import Any, Dict # Third-party Imports @@ -24,6 +25,7 @@ def __init__( # For image entry, need to save to a new directory self.save_loc = self.dir / self.name + self.zip_loc = None os.makedirs(self.save_loc, exist_ok=True) self.index = 0 @@ -37,4 +39,14 @@ def write(self, data_chunk: Dict[str, Any]): self.index += 1 def close(self): - ... + save_name = self.save_loc.parent / self.name + shutil.make_archive(str(save_name), "zip", str(self.save_loc)) + self.zip_loc = save_name.with_suffix(".zip") + + def get_meta(self): + return { + "name": self.name, + "path": self.zip_loc, + "glob": None, + "mime_type": "application/zip" + } diff --git a/chimerapy/engine/records/json_record.py b/chimerapy/engine/records/json_record.py index 5f875745..7cdbea8a 100644 --- a/chimerapy/engine/records/json_record.py +++ b/chimerapy/engine/records/json_record.py @@ -42,3 +42,11 @@ def close(self): if self.file_handler is not None: self.file_handler.close() self.file_handler = None + + def get_meta(self): + return { + "name": self.name, + "path": self.jsonl_path, + "glob": None, + "mime_type": "text/plain" + } diff --git a/chimerapy/engine/records/record.py b/chimerapy/engine/records/record.py index 4335f1b9..bdb70ada 100644 --- a/chimerapy/engine/records/record.py +++ b/chimerapy/engine/records/record.py @@ -32,3 +32,7 @@ def close(self): """ raise NotImplementedError("``close`` needs to be implemented.") + + def get_meta(self): + """Get metadata.""" + raise NotImplementedError("``get_meta`` needs to be implemented.") diff --git a/chimerapy/engine/records/tabular_record.py b/chimerapy/engine/records/tabular_record.py index 911f0890..b6cffc11 100644 --- a/chimerapy/engine/records/tabular_record.py +++ b/chimerapy/engine/records/tabular_record.py @@ -52,3 +52,11 @@ def write(self, data_chunk: Dict[str, Any]): def close(self): ... + + def get_meta(self): + return { + "name": self.name, + "path": self.tabular_file_path, + "glob": None, + "mime_type": "text/csv" + } diff --git a/chimerapy/engine/records/text_record.py b/chimerapy/engine/records/text_record.py index d036bbce..2c2c128e 100644 --- a/chimerapy/engine/records/text_record.py +++ b/chimerapy/engine/records/text_record.py @@ -27,12 +27,14 @@ def __init__( self.name = name self.first_frame = False self.file_handler: Optional[IO[str]] = None + self.save_path: Optional[pathlib.Path] = None def write(self, data_chunk: Dict[str, Any]): if not self.first_frame: self.file_handler = (self.dir / f"{self.name}.{data_chunk['suffix']}").open( "w" ) + self.save_path = self.dir / f"{self.name}.{data_chunk['suffix']}" self.first_frame = True text_data = data_chunk["data"] @@ -43,3 +45,11 @@ def close(self): if self.file_handler is not None: self.file_handler.close() self.file_handler = None + + def get_meta(self): + return { + "name": self.name, + "path": self.save_path, + "glob": None, + "mime_type": "text/plain" + } diff --git a/chimerapy/engine/records/video_record.py b/chimerapy/engine/records/video_record.py index 01d020d4..d5c483da 100644 --- a/chimerapy/engine/records/video_record.py +++ b/chimerapy/engine/records/video_record.py @@ -100,3 +100,11 @@ def close(self): # Close the video self.video_writer.release() + + def get_meta(self): + return { + "name": self.name, + "path": self.video_file_path, + "glob": None, + "mime_type": "video/mp4" + } diff --git a/chimerapy/engine/worker/file_transfer_service.py b/chimerapy/engine/worker/file_transfer_service.py new file mode 100644 index 00000000..79f1019a --- /dev/null +++ b/chimerapy/engine/worker/file_transfer_service.py @@ -0,0 +1,34 @@ +import logging +from typing import List, Dict + + +from ..service import Service +from ..eventbus import Event, EventBus, TypedObserver + +from chimerapy.engine._logger import fork + + + +class FileTransferService(Service): + def __init__(self, name: str, event_bus: EventBus, logger: logging.Logger): + super().__init__(name=name) + self.event_bus = event_bus + self.observers: Dict[str, TypedObserver] = {} + self.logger = logger + + async def async_init(self): + self.observers = { + "artifacts": TypedObserver( + "artifacts", + self._initiate_artifacts_transfer, + handle_event="pass" + ) + } + + for name, observer in self.observers.items(): + await self.event_bus.asubscribe(observer) + + async def _initiate_artifacts_transfer(self, event: Event): + artifacts_data = event.data + for name, data in artifacts_data.items(): + print(data, name) diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index 2733fbb6..cba12490 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -4,7 +4,7 @@ import os import pathlib import pickle -from typing import Dict, List +from typing import Dict, List, Any import aiofiles import zmq @@ -112,6 +112,7 @@ def __init__( # Containers self.tasks: List[asyncio.Task] = [] + self.artifacts_data: Dict[str, Any] = {} # Create server self.server = Server( @@ -131,6 +132,7 @@ def __init__( web.post("/nodes/registered_methods", self._async_request_method_route), web.post("/nodes/stop", self._async_stop_nodes_route), web.post("/nodes/diagnostics", self._async_diagnostics_route), + web.get("/nodes/artifacts", self._async_get_artifacts_route), # web.post("/packages/load", self._async_load_sent_packages), web.post("/shutdown", self._async_shutdown_route), ], @@ -139,6 +141,7 @@ def __init__( NODE_MESSAGE.REPORT_GATHER: self._async_node_report_gather, NODE_MESSAGE.REPORT_RESULTS: self._async_node_report_results, NODE_MESSAGE.DIAGNOSTICS: self._async_node_diagnostics, + NODE_MESSAGE.ARTIFACTS: self._async_node_artifacts, }, parent_logger=self.logger, ) @@ -401,6 +404,9 @@ async def _async_diagnostics_route(self, request: web.Request) -> web.Response: await self.eventbus.asend(Event("diagnostics", event_data)) return web.HTTPOk() + async def _async_get_artifacts_route(self, request: web.Request) -> web.Response: + return web.json_response(self.artifacts_data) + async def _async_shutdown_route(self, request: web.Request) -> web.Response: # Execute shutdown after returning HTTPOk (prevent Manager stuck waiting) self.tasks.append(asyncio.create_task(self.eventbus.asend(Event("shutdown")))) @@ -455,3 +461,10 @@ async def _async_node_diagnostics(self, msg: Dict, ws: web.WebSocketResponse): diag = NodeDiagnostics.from_dict(msg["data"]["diagnostics"]) if node_id in self.state.nodes: self.state.nodes[node_id].diagnostics = diag + + async def _async_node_artifacts(self, msg: Dict, ws: web.WebSocketResponse): + data = msg["data"] + self.artifacts_data[data["node_id"]] = data["artifacts"] + await self.eventbus.asend( + Event("artifacts", self.artifacts_data) + ) diff --git a/chimerapy/engine/worker/worker.py b/chimerapy/engine/worker/worker.py index 4463eaab..d9959509 100644 --- a/chimerapy/engine/worker/worker.py +++ b/chimerapy/engine/worker/worker.py @@ -19,6 +19,7 @@ from .http_client_service import HttpClientService from .http_server_service import HttpServerService from .node_handler_service import NodeHandlerService +from .file_transfer_service import FileTransferService class Worker: @@ -106,6 +107,12 @@ async def aserve(self) -> bool: logreceiver=self.logreceiver, ) + self.file_transfer = FileTransferService( + name="file_transfer", + event_bus=self.eventbus, + logger=self.logger, + ) + await self.http_client.async_init() await self.http_server.async_init() await self.node_handler.async_init() From e76e886834fab3c3893e056da33825b02defb5c7 Mon Sep 17 00:00:00 2001 From: Umesh Date: Thu, 19 Oct 2023 21:34:15 -0500 Subject: [PATCH 09/13] Rich Progress Handlers --- chimerapy/engine/chimerapyrc.yaml | 3 +- .../manager/artifacts_collector_service.py | 278 ++++++++---------- .../engine/manager/http_server_service.py | 10 + chimerapy/engine/manager/manager.py | 12 +- .../engine/manager/worker_handler_service.py | 31 +- .../engine/networking/file_transfer_client.py | 123 -------- chimerapy/engine/networking/utils.py | 30 ++ .../networking/zmq_file_transfer_client.py | 156 ++++++++++ ..._server.py => zmq_file_transfer_server.py} | 90 +++--- .../worker/artifacts_transfer_service.py | 114 +++++++ chimerapy/engine/worker/events.py | 11 + .../engine/worker/file_transfer_service.py | 34 --- .../engine/worker/http_client_service.py | 31 +- .../engine/worker/http_server_service.py | 90 +----- chimerapy/engine/worker/worker.py | 10 +- 15 files changed, 561 insertions(+), 462 deletions(-) delete mode 100644 chimerapy/engine/networking/file_transfer_client.py create mode 100644 chimerapy/engine/networking/utils.py create mode 100644 chimerapy/engine/networking/zmq_file_transfer_client.py rename chimerapy/engine/networking/{file_transfer_server.py => zmq_file_transfer_server.py} (67%) create mode 100644 chimerapy/engine/worker/artifacts_transfer_service.py delete mode 100644 chimerapy/engine/worker/file_transfer_service.py diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index 15e04166..8b6fc4ee 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -11,6 +11,7 @@ config: worker-shutdown: 10 # seconds node-creation: 130 # seconds reset: 30 + collect: 1800 # 30 minutes retry: data-collection: 30 # seconds logs-sink: @@ -41,4 +42,4 @@ config: logging-enabled: false file-transfer: chunk-size: 250000 # bytes - max-chunks: 2 # Number of chunks to send at once + max-chunks: 1 # Number of chunks to send at once diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py index d38e255a..6253560a 100644 --- a/chimerapy/engine/manager/artifacts_collector_service.py +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -1,173 +1,155 @@ -import json import logging -import os import pathlib -from typing import Optional +from typing import Any, Dict, Optional -import aiofiles -import aiohttp import aioshutil -import zmq -from aiohttp import ClientSession -from rich.progress import Progress -from zmq.asyncio import Context +import zmq.asyncio -from chimerapy.engine import config +import chimerapy.engine.config as cpe_config from chimerapy.engine._logger import fork, getLogger -from chimerapy.engine.states import ManagerState -from chimerapy.engine.utils import get_ip_address - - -async def download_task( - context: zmq.Context, - ip: str, - port: int, - filename: pathlib.Path, - expected_size, - progress=None, -): - """Download a file from a worker.""" - dealer = context.socket(zmq.DEALER) - dealer.sndhwm = dealer.rcvhwm = config.get("file-transfer.max-chunks") - dealer.connect(f"tcp://{ip}:{port}") - - f = await aiofiles.open(filename, "wb") - credit = config.get("file-transfer.max-chunks") - chunk_size = config.get("file-transfer.chunk-size") - - total = 0 - chunks = 0 - offset = 0 - seq_no = 0 - - # Create a progress bar - human_size = round(expected_size / 1024 / 1024, 2) - update_task = None - if progress: - update_task = progress.add_task( - f"[cyan]Downloading ({filename.name}-{human_size}MB...)", total=100 - ) - - while True: - while credit: - await dealer.send_multipart( - [b"fetch", b"%i" % offset, b"%i" % chunk_size, b"%i" % seq_no] - ) - offset += chunk_size - seq_no += 1 - credit -= 1 - - try: - chunk, seq_no_recv_str = await dealer.recv_multipart() - await f.write(chunk) - except zmq.ZMQError as e: - if e.errno == zmq.ETERM: - return - else: - raise +from chimerapy.engine.networking.zmq_file_transfer_client import ZMQFileClient +from chimerapy.engine.utils import get_progress_bar - chunks += 1 - credit += 1 - size = len(chunk) - total += size +from ..eventbus import Event, EventBus, TypedObserver +from ..service import Service +from ..states import ManagerState +from .events import UpdateSendArchiveEvent - if update_task: - progress.update(update_task, completed=(total / expected_size) * 100) - - if size < chunk_size: - await f.close() - break - - -class ArtifactsCollector: - """A utility class to collect artifacts recorded by the nodes.""" +class ArtifactsCollectorService(Service): def __init__( self, + name: str, + eventbus: EventBus, state: ManagerState, - worker_id: str, parent_logger: Optional[logging.Logger] = None, - unzip: bool = False, - progressbar: Optional[Progress] = None, ): - worker_state = state.workers[worker_id] + super().__init__(name=name) + self.eventbus = eventbus + self.observers: Dict[str, TypedObserver] = {} + self.clients: Dict[str, ZMQFileClient] = {} + self.state = state + self.progressbar = get_progress_bar() + if parent_logger is None: parent_logger = getLogger("chimerapy-engine") - self.logger = fork( - parent_logger, - f"ArtifactsCollector-[Worker({worker_state.name})]", - ) + self.logger = fork(parent_logger, self.__class__.__name__) - self.state = state - self.worker_id = worker_id - self.base_url = ( - f"http://{self.state.workers[self.worker_id].ip}:" - f"{self.state.workers[self.worker_id].port}" - ) - self.unzip = unzip - self.progressbar = progressbar - - async def _artifact_info(self, session: aiohttp.ClientSession): - self.logger.info(f"Requesting artifact info from {self.base_url}") - data = { - "initiate_remote_transfer": get_ip_address() - != self.state.workers[self.worker_id].ip + async def async_init(self): + self.observers = { + "artifacts_transfer_ready": TypedObserver( + "artifacts_transfer_ready", on_asend=self.collect, handle_event="pass" + ) } - async with session.post( - "/nodes/request_collect", - data=json.dumps(data), - ) as resp: - if resp.ok: - data = await resp.json() - return data["zip_path"], data["port"], data["size"] - else: - # FixMe: Handle this error properly - raise ConnectionError( - f"Artifacts Collection Failed: {resp.status} {resp.reason}" - ) + for name, observer in self.observers.items(): + await self.eventbus.asubscribe(observer) + + async def collect(self, event: Event) -> None: + method = event.data["method"] + print(event.data) + if method == "zmq": + self.logger.debug("Collecting artifacts over ZMQ") + await self._collect_zmq( + worker_id=event.data["worker_id"], + host=event.data["ip"], + port=event.data["port"], + artifacts=event.data["data"], + ) + else: + self.logger.debug("Collecting artifacts locally") + await self._collect_local( + worker_id=event.data["worker_id"], artifacts=event.data["data"] + ) - async def collect(self) -> bool: - client_session = ClientSession(base_url=self.base_url) + async def _collect_zmq( + self, worker_id: str, host: str, port: int, artifacts: Dict[str, Any] + ): + files = {} + self.logger.debug("Preparing files to download") + for node_id, artifacts in artifacts.items(): + out_dir = self._create_node_dir(worker_id, node_id) + for artifact in artifacts: + key = f"{node_id}-{artifact['name']}" + files[key] = { + "name": artifact["filename"], + "size": artifact["size"], + "outdir": out_dir, + } + context = zmq.asyncio.Context.instance() + client = ZMQFileClient( + context=context, + host=host, + port=port, + credit=cpe_config.get("file-transfer.max-chunks"), + chunk_size=cpe_config.get("file-transfer.chunk-size"), + files=files, + parent_logger=self.logger, + progressbar=self.progressbar, + ) + self.clients[worker_id] = client try: - zip_path, port, size = await self._artifact_info(client_session) + await client.async_init() + await client.download_files() + event_data = UpdateSendArchiveEvent(worker_id=worker_id, success=True) except Exception as e: - self.logger.error(f"Failed to get artifact info: {e}") - return False - save_name = f"{self.state.workers[self.worker_id].name}_{self.worker_id[:8]}" - zip_save_path = self.state.logdir / f"{save_name}.zip" - if port is not None: - try: - await download_task( - Context(), - self.state.workers[self.worker_id].ip, - int(port), - zip_save_path, - size, - self.progressbar, - ) - except Exception as e: - self.logger.error(f"Failed to download artifacts: {e}") - return False - else: - self.logger.info(f"Copying {zip_path} to {zip_save_path}") - try: - await aioshutil.copyfile(zip_path, self.state.logdir / zip_save_path) - except Exception as e: - self.logger.error(f"Failed to copy artifacts: {e}") - return False - - if self.unzip: - self.logger.info(f"Unzipping {zip_save_path}") - try: - await aioshutil.unpack_archive( - zip_save_path, self.state.logdir / save_name + event_data = UpdateSendArchiveEvent( + worker_id=worker_id, + success=False, + ) + self.logger.error( + f"Error while collecting artifacts for worker {worker_id}: {e}" + ) + finally: + await self.eventbus.asend(Event("update_send_archive", event_data)) + self.logger.info(f"Successfully collected artifacts for worker {worker_id}") + + async def _collect_local(self, worker_id: str, artifacts: Dict[str, Any]) -> None: + try: + for node_id, node_artifacts in artifacts.items(): + node_dir = self._create_node_dir(worker_id, node_id) + + for artifact in node_artifacts: + artifact_path = pathlib.Path(artifact["path"]) + self.logger.debug(f"Copying {artifact_path} to {node_dir}") + await aioshutil.copyfile( + artifact_path, node_dir / artifact["filename"] + ) + + await self.eventbus.asend( + Event( + "update_send_archive", + UpdateSendArchiveEvent(worker_id=worker_id, success=True), ) - self.logger.info(f"Removing {zip_save_path}") - os.remove(zip_save_path) - except Exception as e: - self.logger.error(f"Failed to unzip artifacts: {e}") - return False + ) + event_data = UpdateSendArchiveEvent( + worker_id=worker_id, + success=True, + ) + self.logger.info(f"Successfully collected artifacts for worker {worker_id}") + except Exception as e: + event_data = UpdateSendArchiveEvent( + worker_id=worker_id, + success=False, + ) + self.logger.error( + f"Error while collecting artifacts for worker {worker_id}: {e}" + ) + finally: + await self.eventbus.asend(Event("update_send_archive", event_data)) + + def _create_worker_dir(self, worker_id): + worker_name = self.state.workers[worker_id].name + worker_dir = self.state.logdir / f"{worker_name}-{worker_id[:10]}" + worker_dir.mkdir(parents=True, exist_ok=True) + + return worker_dir + + def _create_node_dir(self, worker_id, node_id): + worker_dir = self._create_worker_dir(worker_id) + nodes = self.state.workers[worker_id].nodes + node_dir = worker_dir / nodes[node_id].name + node_dir.mkdir(parents=True, exist_ok=True) - return True + return node_dir diff --git a/chimerapy/engine/manager/http_server_service.py b/chimerapy/engine/manager/http_server_service.py index 1d1557bb..93590048 100644 --- a/chimerapy/engine/manager/http_server_service.py +++ b/chimerapy/engine/manager/http_server_service.py @@ -55,6 +55,10 @@ def __init__( web.post("/workers/deregister", self._deregister_worker_route), web.post("/workers/node_status", self._update_nodes_status), web.post("/workers/send_archive", self._update_send_archive), + web.post( + "/workers/artifacts_transfer_ready", + self._file_transfer_server_ready, + ), ], ) @@ -192,6 +196,12 @@ async def _update_send_archive(self, request: web.Request): await self.eventbus.asend(Event("update_send_archive", event_data)) return web.HTTPOk() + async def _file_transfer_server_ready(self, request: web.Request): + msg = await request.json() + event_data = msg + await self.eventbus.asend(Event("artifacts_transfer_ready", event_data)) + return web.HTTPOk() + ##################################################################################### ## Front-End API ##################################################################################### diff --git a/chimerapy/engine/manager/manager.py b/chimerapy/engine/manager/manager.py index efce38dc..067e95c2 100644 --- a/chimerapy/engine/manager/manager.py +++ b/chimerapy/engine/manager/manager.py @@ -15,6 +15,7 @@ # Eventbus from ..eventbus import Event, EventBus, make_evented from ..networking.async_loop_thread import AsyncLoopThread +from .artifacts_collector_service import ArtifactsCollectorService from .distributed_logging_service import DistributedLoggingService # Services @@ -110,6 +111,12 @@ async def aserve(self) -> bool: state=self.state, # **self.kwargs, ) + self.artifacts_collector = ArtifactsCollectorService( + name="artifacts_collector", + eventbus=self.eventbus, + state=self.state, + parent_logger=logger, + ) # Initialize services await self.http_server.async_init() @@ -117,6 +124,7 @@ async def aserve(self) -> bool: await self.zeroconf_service.async_init() await self.session_record.async_init() await self.distributed_logging.async_init() + await self.artifacts_collector.async_init() # Start all services await self.eventbus.asend(Event("start")) @@ -336,8 +344,8 @@ async def async_stop(self) -> bool: async def async_collect(self) -> bool: return await self.worker_handler.collect() - async def async_collect_v2(self, unzip=False) -> bool: - return await self.worker_handler.collect_v2(unzip) + async def async_collect_v2(self) -> bool: + await self.worker_handler.collect_v2() async def async_reset(self, keep_workers: bool = True): return await self.worker_handler.reset(keep_workers) diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py index 1716e7e4..2bd229f4 100644 --- a/chimerapy/engine/manager/worker_handler_service.py +++ b/chimerapy/engine/manager/worker_handler_service.py @@ -12,8 +12,8 @@ import dill import networkx as nx +import chimerapy.engine.config as cpe_config from chimerapy.engine import _logger, config -from chimerapy.engine.utils import get_progress_bar from ..data_protocols import NodePubTable from ..eventbus import Event, EventBus, TypedObserver, make_evented @@ -825,28 +825,17 @@ async def collect(self) -> bool: await self.eventbus.asend(Event("save_meta")) return all(results) - async def collect_v2(self, unzip: bool = False) -> bool: - from .artifacts_collector_service import ArtifactsCollector - - futures = [] - progress_bar = get_progress_bar() - progress_bar.start() + def workers_collected(self): for worker_id in self.state.workers: - artifacts_collector = ArtifactsCollector( - worker_id=worker_id, - state=self.state, - parent_logger=logger, - unzip=unzip, - progressbar=progress_bar, - ) - future = asyncio.ensure_future(artifacts_collector.collect()) - futures.append(future) - - logger.info("Collecting artifacts from workers...") - results = await asyncio.gather(*futures) - progress_bar.stop() + if worker_id not in self.collected_workers: + return False + return True - return all(results) + async def collect_v2(self, unzip: bool = False) -> bool: + await self._broadcast_request("post", "/nodes/request_collect") + await async_waiting_for( + self.workers_collected, timeout=cpe_config.get("manager.timeout.collect") + ) async def reset(self, keep_workers: bool = True): diff --git a/chimerapy/engine/networking/file_transfer_client.py b/chimerapy/engine/networking/file_transfer_client.py deleted file mode 100644 index c4ae04e7..00000000 --- a/chimerapy/engine/networking/file_transfer_client.py +++ /dev/null @@ -1,123 +0,0 @@ -import asyncio -from typing import Dict, Any - -import pathlib - -import aiofiles -import zmq.asyncio - -from chimerapy.engine.utils import get_progress_bar -import random - -class ZMQFileClient: - def __init__(self, - context: zmq.asyncio.Context, - host: str, - port: int, - outdir: pathlib.Path, - files: Dict[str, Any]): - self.host = host - self.port = port - self.context = context - self.outdir = outdir - self.files = files - self.socket = None - - async def async_init(self): - self.socket = self.context.socket(zmq.DEALER) - self.socket.sndhwm = self.socket.rcvhwm = 1 - self.socket.connect(f"tcp://{self.host}:{self.port}") - - - async def _download_task(self): - handles = {} - offsets = {} - progress_bar = get_progress_bar() - for name, details in self.files.items(): - fname = details["name"] - handles[name] = await aiofiles.open(self.outdir / fname, "wb") - offsets[name] = 0 - - download_tasks = {} - - credit = 1 - chunk_size = 250000 - completed = [] - - total = 0 - seq_no = 0 - - progress_bar.start() - while True: - while credit: - selected_file = random.choice(list(offsets.keys())) - await self.socket.send_multipart( - [b"fetch", selected_file.encode(), b"%i" % offsets[selected_file], b"%i" % chunk_size, b"%i" % seq_no] - ) - offsets[selected_file] = offsets[selected_file] + chunk_size - seq_no += 1 - credit -= 1 - try: - filename, chunk, seq_no_recv_str = await self.socket.recv_multipart() - filename_str = filename.decode() - - if filename_str not in download_tasks: - download_tasks[filename_str] = progress_bar.add_task(f"Downloading({filename_str})", total=100) - - complete = (offsets[filename_str] / self.files[filename_str]["size"]) * 100 - progress_bar.update(download_tasks[filename_str], completed=complete) - await handles[filename_str].write(chunk) - except zmq.ZMQError as e: - if e.errno == zmq.ETERM: - return - else: - raise e - - credit += 1 - size = len(chunk) - total += size - if size < chunk_size: - completed.append(filename_str) - progress_bar.update(download_tasks[filename_str], completed=100) - del offsets[filename_str] - - if len(completed) == len(self.files): - for name, handle in handles.items(): - await handle.close() - - progress_bar.stop() - break - - -async def main(): - context = zmq.asyncio.Context() - host = "localhost" - port = 6000 - - outdir = pathlib.Path("dst") - files = { - "test1": { - "name": "test1.mp4", - "size": 31376170 - }, - "test2": { - "name": "test2.mp4", - "size": 36633129, - }, - "test3": { - "name": "test3.mp4", - "size": 39156488 - }, - "test4": { - "name": "test4.mp4", - "size": 33941417 - } - } - - client = ZMQFileClient(context, host, port, outdir, files) - await client.async_init() - await client._download_task() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/chimerapy/engine/networking/utils.py b/chimerapy/engine/networking/utils.py new file mode 100644 index 00000000..892e893e --- /dev/null +++ b/chimerapy/engine/networking/utils.py @@ -0,0 +1,30 @@ +import os + + +class ZMQFileChunk: + """A ZeroMQ File Chunk.""" + + def __init__(self, data: bytes) -> None: + self.data = data + + def write_into(self, handle) -> None: + handle.write(self.data) + + async def awrite_into(self, ahandle) -> None: + await ahandle.write(self.data) + + @classmethod + def from_bytes(cls, data) -> "ZMQFileChunk": + return cls(data=data) + + @classmethod + def read_from(cls, handle, offset, chunk_size) -> "ZMQFileChunk": + handle.seek(offset, os.SEEK_SET) + data = handle.read(chunk_size) + return cls.from_bytes(data=data) + + @classmethod + async def aread_from(cls, ahandle, offset, chunk_size) -> "ZMQFileChunk": + await ahandle.seek(offset, os.SEEK_SET) + data = await ahandle.read(chunk_size) + return cls.from_bytes(data=data) diff --git a/chimerapy/engine/networking/zmq_file_transfer_client.py b/chimerapy/engine/networking/zmq_file_transfer_client.py new file mode 100644 index 00000000..76e2ebd7 --- /dev/null +++ b/chimerapy/engine/networking/zmq_file_transfer_client.py @@ -0,0 +1,156 @@ +import asyncio +import logging +import pathlib +import random +from typing import Any, Dict, Optional + +import aiofiles +import zmq.asyncio +from rich.progress import Progress + +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.utils import ZMQFileChunk + + +class ZMQFileClient: + def __init__( + self, + context: zmq.asyncio.Context, + host: str, + port: int, + credit: int, + chunk_size: int, + files: Dict[str, Any], + parent_logger: Optional[logging.Logger] = None, + progressbar: Optional[Progress] = None, + ) -> None: + self.host = host + self.port = port + self.context = context + self.files = files + self.credits = credit + self.chunk_size = chunk_size + self.progressbar = progressbar + self.socket = None + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + self.logger = fork(parent_logger, "ZMQFileClient") + print(self.host, self.port) + + async def async_init(self): + self.socket = self.context.socket(zmq.DEALER) + self.socket.sndhwm = self.socket.rcvhwm = 1 + self.socket.connect(f"tcp://{self.host}:{self.port}") + self.logger.info(f"Connected to tcp://{self.host}:{self.port}") + + async def download_files(self): + handles = {} + offsets = {} + progressbar = self.progressbar + for name, details in self.files.items(): + fname = details["name"] + outdir = details["outdir"] + handles[name] = await aiofiles.open(outdir / fname, "wb") + offsets[name] = 0 + + download_tasks = {} + + credit = self.credits + chunk_size = self.chunk_size + completed = [] + + total = 0 + seq_no = 0 + + progressbar.start() + while True: + while credit: + selected_file = random.choice(list(offsets.keys())) + await self.socket.send_multipart( + [ + b"fetch", + selected_file.encode(), + b"%i" % offsets[selected_file], + b"%i" % chunk_size, + b"%i" % seq_no, + ] + ) + offsets[selected_file] = offsets[selected_file] + chunk_size + seq_no += 1 + credit -= 1 + try: + filename, chunk, seq_no_recv_str = await self.socket.recv_multipart() + filename_str = filename.decode() + zmq_file_chunk = ZMQFileChunk.from_bytes(chunk) + + if filename_str not in download_tasks and progressbar is not None: + download_tasks[filename_str] = progressbar.add_task( + f"Downloading({filename_str})", total=100 + ) + + complete = ( + offsets[filename_str] / self.files[filename_str]["size"] + ) * 100 + + if progressbar is not None: + progressbar.update(download_tasks[filename_str], completed=complete) + + await zmq_file_chunk.awrite_into(ahandle=handles[filename_str]) + + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise e + + credit += 1 + size = len(chunk) + total += size + if size < chunk_size: + completed.append(filename_str) + progressbar.update(download_tasks[filename_str], completed=100) + del offsets[filename_str] + + if len(completed) == len(self.files): + for name, handle in handles.items(): + await handle.close() + progressbar.stop() + break + + async def close(self): + await self.socket.close() + + +async def main(): + context = zmq.asyncio.Context() + host = "localhost" + port = 6000 + + files = { + "test1": {"name": "test1.mp4", "size": 31376170, "outdir": pathlib.Path("dst")}, + "test2": {"name": "test2.mp4", "size": 36633129, "outdir": pathlib.Path("dst")}, + "test3": {"name": "test3.mp4", "size": 39156488, "outdir": pathlib.Path("dst")}, + "test4": {"name": "test4.mp4", "size": 33941417, "outdir": pathlib.Path("dst")}, + "oele-11-webcam-video": { + "name": "oele-11-webcam-video.mp4", + "size": 351384728, + "outdir": pathlib.Path("dst"), + }, + } + from chimerapy.engine.utils import get_progress_bar + + client = ZMQFileClient( + context, + host, + port, + credit=1, + chunk_size=250000, + files=files, + progressbar=get_progress_bar(), + ) + await client.async_init() + await client.download_files() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/chimerapy/engine/networking/file_transfer_server.py b/chimerapy/engine/networking/zmq_file_transfer_server.py similarity index 67% rename from chimerapy/engine/networking/file_transfer_server.py rename to chimerapy/engine/networking/zmq_file_transfer_server.py index e6cc6c0f..51895caf 100644 --- a/chimerapy/engine/networking/file_transfer_server.py +++ b/chimerapy/engine/networking/zmq_file_transfer_server.py @@ -1,71 +1,70 @@ import asyncio +import logging import os import pathlib +from typing import Optional + import aiofiles -from chimerapy.engine.utils import get_progress_bar import zmq -from typing import Optional, List, Dict, Any import zmq.asyncio - -class ZMQFileChunk: - def __init__(self, data: bytes): - self.data = data - - def write_into(self, handle): - handle.write(self.data) - - async def awrite_into(self, ahandle): - await ahandle.write(self.data) - - @classmethod - def from_bytes(cls, data): - return ZMQFileChunk(data=data) - - @classmethod - def read_from(cls, handle, offset, chunk_size): - handle.seek(offset, os.SEEK_SET) - data = handle.read(chunk_size) - return cls.from_bytes(data=data) - - @classmethod - async def aread_from(cls, ahandle, offset, chunk_size): - await ahandle.seek(offset, os.SEEK_SET) - data = await ahandle.read(chunk_size) - return cls.from_bytes(data=data) +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.utils import ZMQFileChunk +from chimerapy.engine.utils import get_progress_bar class ZMQFileServer: - def __init__(self, context: zmq.asyncio.Context, paths, host="*", port=0): + def __init__( + self, + context: zmq.asyncio.Context, + paths, + credit: int, + host: str = "*", + port: int = 0, + parent_logger: Optional[logging.Logger] = None, + ) -> None: self.context = context self.host = host self.port = port self.paths = paths self.handles = {} + self.credits = credit self.socket: Optional[zmq.Socket] = None self.send_task: Optional[asyncio.Task] = None - async def async_init(self): + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine-networking") + + self.logger = fork(parent_logger, "ZMQFileServer") + + async def async_init(self) -> None: + """Initialize the server.""" await self._initialize_file_handlers() self.socket = self.context.socket(zmq.ROUTER) - self.socket.sndhwm = self.socket.rcvhwm = 1 + self.socket.sndhwm = self.socket.rcvhwm = self.credits + if self.port != 0: self.socket.bind(f"tcp://{self.host}:{self.port}") else: self.port = self.socket.bind_to_random_port(f"tcp://{self.host}") + self.logger.info(f"Listening on tcp://{self.host}:{self.port}") self.send_task = asyncio.create_task(self._send()) + self.progress_bar = None - async def _initialize_file_handlers(self): + async def _initialize_file_handlers(self) -> None: + """Initialize the file handlers.""" for name, file in self.paths.items(): assert file.exists() assert file.is_file() self.handles[name] = await aiofiles.open(file, mode="rb") async def _send(self): + """Send the file chunks.""" assert self.socket is not None router = self.socket - progress_bar = get_progress_bar() + self.progress_bar = get_progress_bar() + progress_bar = self.progress_bar progress_bar.start() upload_tasks = {} @@ -79,10 +78,11 @@ async def _send(self): if e.errno == zmq.ETERM: return else: - raise + raise e identity, command, fname, offset_str, chunksz_str, seq_nostr = msg fname_str = fname.decode() + if fname_str in self.handles: assert command == b"fetch" offset = int(offset_str) @@ -97,20 +97,24 @@ async def _send(self): if not data: progress_bar.update(upload_tasks[fname_str], completed=100) uploaded.add(fname_str) - print(f"Uploaded {fname_str}. {uploaded}") + if uploaded == set(self.paths.keys()): progress_bar.stop() break continue else: await router.send_multipart([identity, fname, data, b"%i" % seq_no]) - completed = (await handle.tell() / os.path.getsize(self.paths[fname_str])) * 100 + completed = ( + await handle.tell() / os.path.getsize(self.paths[fname_str]) + ) * 100 progress_bar.update(upload_tasks[fname_str], completed=completed) - async def ashutdown(self): + """Shutdown the server.""" + if self.progress_bar is not None: + self.progress_bar.stop() + if self.send_task is not None: - self.send_task.cancel() await self.send_task self.send_task = None @@ -129,13 +133,15 @@ async def main(): "test1": pathlib.Path("src/test1.mp4"), "test2": pathlib.Path("src/test2.mp4"), "test3": pathlib.Path("src/test3.mp4"), - "test4": pathlib.Path("src/test4.mp4") + "test4": pathlib.Path("src/test4.mp4"), + "oele-11-webcam-video": pathlib.Path("src/oele-11-webcam-video.mp4"), } - server = ZMQFileServer(context=zmq.asyncio.Context(), paths=files, port=6000) + server = ZMQFileServer( + context=zmq.asyncio.Context(), paths=files, port=6000, credit=1 + ) await server.async_init() await server.send_task + if __name__ == "__main__": asyncio.run(main()) - - diff --git a/chimerapy/engine/worker/artifacts_transfer_service.py b/chimerapy/engine/worker/artifacts_transfer_service.py new file mode 100644 index 00000000..d6013e0a --- /dev/null +++ b/chimerapy/engine/worker/artifacts_transfer_service.py @@ -0,0 +1,114 @@ +import logging +import pathlib +from typing import Dict, Optional, Tuple + +import zmq.asyncio + +import chimerapy.engine.config as cpe_config +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.zmq_file_transfer_server import ZMQFileServer +from chimerapy.engine.utils import get_ip_address + +from ..eventbus import Event, EventBus, TypedObserver +from ..service import Service +from ..states import WorkerState + + +class ArtifactsTransferService(Service): + def __init__( + self, + name: str, + event_bus: EventBus, + state: WorkerState, + parent_logger: Optional[logging.Logger] = None, + ): + super().__init__(name=name) + self.event_bus = event_bus + self.observers: Dict[str, TypedObserver] = {} + self.server: Optional[ZMQFileServer] = None + self.state = state + self.addr: Optional[Tuple] = None + + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + + self.logger = fork(parent_logger, self.__class__.__name__) + + async def async_init(self): + self.observers = { + "initiate_transfer": TypedObserver( + "initiate_transfer", + on_asend=self._initiate_artifacts_transfer, + handle_event="pass", + ), + "connected": TypedObserver( + "connected", on_asend=self._on_connect, handle_event="unpack" + ), + "disconnected": TypedObserver( + "disconnected", on_asend=self._on_disconnect, handle_event="drop" + ), + "shutdown": TypedObserver( + "shutdown", on_asend=self.shutdown, handle_event="drop" + ), + } + + for name, observer in self.observers.items(): + await self.event_bus.asubscribe(observer) + + async def _on_connect(self, manager_host: str, manager_port: int) -> None: + self.addr = (manager_host, manager_port) + + async def _on_disconnect(self) -> None: + self.addr = None + + def _is_remote_worker(self) -> bool: + return self.addr is not None and self.addr[0] != get_ip_address() + + async def _initiate_artifacts_transfer(self, event: Event) -> None: + artifacts_data = event.data + + if not self._is_remote_worker(): + self.logger.info("Initiating local artifacts transfer") + await self.event_bus.asend( + Event( + "artifacts_transfer_ready", + data={ + "port": None, + "ip": None, + "data": artifacts_data, + "method": "file_copy", + }, + ) + ) + return + + files = {} + for node_id, data in artifacts_data.items(): + for artifact in data: + files[f"{node_id}-{artifact['name']}"] = pathlib.Path(artifact["path"]) + + self.server = ZMQFileServer( + context=zmq.asyncio.Context(), + paths=files, + credit=cpe_config.get("file-transfer.max-chunks"), + host=get_ip_address(), + port=0, + parent_logger=self.logger, + ) + + await self.server.async_init() + self.logger.info("Initiating remote artifacts transfer") + await self.event_bus.asend( + Event( + "artifacts_transfer_ready", + data={ + "port": self.server.port, + "ip": self.server.host, + "data": artifacts_data, + "method": "zmq", + }, + ) + ) + + async def shutdown(self) -> None: + await self.server.ashutdown() if self.server else None diff --git a/chimerapy/engine/worker/events.py b/chimerapy/engine/worker/events.py index e1403497..e5ea0de7 100644 --- a/chimerapy/engine/worker/events.py +++ b/chimerapy/engine/worker/events.py @@ -62,3 +62,14 @@ class UpdateResultsEvent: @dataclass class SendArchiveEvent: path: pathlib.Path + + +@dataclass +class ConnectedEvent: + manager_host: str + manager_port: int + + +@dataclass +class DisconnectedEvent: + pass diff --git a/chimerapy/engine/worker/file_transfer_service.py b/chimerapy/engine/worker/file_transfer_service.py deleted file mode 100644 index 79f1019a..00000000 --- a/chimerapy/engine/worker/file_transfer_service.py +++ /dev/null @@ -1,34 +0,0 @@ -import logging -from typing import List, Dict - - -from ..service import Service -from ..eventbus import Event, EventBus, TypedObserver - -from chimerapy.engine._logger import fork - - - -class FileTransferService(Service): - def __init__(self, name: str, event_bus: EventBus, logger: logging.Logger): - super().__init__(name=name) - self.event_bus = event_bus - self.observers: Dict[str, TypedObserver] = {} - self.logger = logger - - async def async_init(self): - self.observers = { - "artifacts": TypedObserver( - "artifacts", - self._initiate_artifacts_transfer, - handle_event="pass" - ) - } - - for name, observer in self.observers.items(): - await self.event_bus.asubscribe(observer) - - async def _initiate_artifacts_transfer(self, event: Event): - artifacts_data = event.data - for name, data in artifacts_data.items(): - print(data, name) diff --git a/chimerapy/engine/worker/http_client_service.py b/chimerapy/engine/worker/http_client_service.py index 26e2a683..6c841811 100644 --- a/chimerapy/engine/worker/http_client_service.py +++ b/chimerapy/engine/worker/http_client_service.py @@ -14,13 +14,13 @@ from chimerapy.engine import _logger, config -from ..eventbus import EventBus, TypedObserver +from ..eventbus import Event, EventBus, TypedObserver from ..logger.zmq_handlers import NodeIDZMQPullListener from ..networking import Client from ..service import Service from ..states import WorkerState from ..utils import get_ip_address -from .events import SendArchiveEvent +from .events import ConnectedEvent, DisconnectedEvent, SendArchiveEvent from .zeroconf_listener import ZeroconfListener @@ -69,6 +69,11 @@ async def async_init(self): on_asend=self._send_archive, handle_event="unpack", ), + "artifacts_transfer_ready": TypedObserver( + "artifacts_transfer_ready", + on_asend=self._signal_manager_artifacts_transfer_ready, + handle_event="pass", + ), } for ob in self.observers.values(): await self.eventbus.asubscribe(ob) @@ -155,6 +160,7 @@ async def async_deregister(self) -> bool: if resp.ok: self.connected_to_manager = False + await self.eventbus.asend(Event("disconnected", DisconnectedEvent())) return resp.ok @@ -207,6 +213,12 @@ async def _async_connect_via_ip( self.logger.info( f"{self}: connection successful to Manager @ {host}:{port}." ) + await self.eventbus.asend( + Event( + "connected", + ConnectedEvent(manager_host=host, manager_port=port), + ) + ) return True return False @@ -292,6 +304,21 @@ async def _send_archive(self, path: pathlib.Path) -> bool: return success + async def _signal_manager_artifacts_transfer_ready(self, event: Event) -> bool: + manager_host, manager_port = self.get_address() + data = { + "ip": event.data["ip"], + "port": event.data["port"], + "worker_id": self.state.id, + "data": event.data["data"], + "method": event.data["method"], + } + async with self.http_client.post( + f"http://{manager_host}:{manager_port}/workers/artifacts_transfer_ready", + data=json.dumps(data), + ) as resp: + return resp.ok + async def _send_archive_locally(self, path: pathlib.Path) -> pathlib.Path: self.logger.debug(f"{self}: sending archive locally") diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index cba12490..47fd5d93 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -4,7 +4,7 @@ import os import pathlib import pickle -from typing import Dict, List, Any +from typing import Any, Dict, List import aiofiles import zmq @@ -39,62 +39,6 @@ ) -class ZMQFileServer: - def __init__(self, ctx: zmq.asyncio.Context, host="*"): - self.host = host - self.ctx = ctx - self.router = None - - async def ainit(self): - router = self.ctx.socket(zmq.ROUTER) - router.sndhwm = router.rcvhwm = config.get("file-transfer.max-chunks") - port = router.bind_to_random_port(f"tcp://{self.host}", max_tries=100) - self.router = router - return port - - async def mount(self, file_path: pathlib.Path): - file = await aiofiles.open(file_path, "rb") - progressbar = get_progress_bar() - size = os.path.getsize(file_path) - human_size = f"{size / 1024 / 1024:.2f} MB" - assert self.router is not None - router = self.router - upload_task = None - if progressbar is not None: - upload_task = progressbar.add_task( - f"Sending {file_path.name} {human_size}", total=100 - ) - while True: - try: - msg = await router.recv_multipart() - except zmq.ZMQError as e: - if e.errno == zmq.ETERM: - return - else: - raise - - identity, command, offset_str, chunksz_str, seq_nostr = msg - - assert command == b"fetch" - offset = int(offset_str) - chunksz = int(chunksz_str) - seq_no = int(seq_nostr) - await file.seek(offset, os.SEEK_SET) - data = file.read(chunksz) - - if upload_task is not None: - print( - f"Sending {file_path.name} {human_size} {offset / size * 100:.2f}" - ) - progressbar.update(upload_task, completed=(offset / size) * 100) - - if not data: - await asyncio.sleep(5) - break - - await router.send_multipart([identity, data, b"%i" % seq_no]) - - class HttpServerService(Service): def __init__( self, @@ -359,27 +303,14 @@ async def _async_collect(self, request: web.Request) -> web.Response: def _have_nodes_saved(self): node_fsm = list(node.fsm for node in self.state.nodes.values()) - print(node_fsm) return all(fsm == "SAVED" for fsm in node_fsm) async def _async_request_collect(self, request: web.Request) -> web.Response: data = await request.json() await self.eventbus.asend(Event("collect")) - await async_waiting_for(self._have_nodes_saved, timeout=10) - self.logger.info("All nodes saved") - initiate_remote_transfer = data.get("initiate_remote_transfer", True) - path = pathlib.Path(self.state.tempfolder) - zip_path = await self._zip_direcotry(path) - port = None - - if initiate_remote_transfer: - self.logger.debug("Starting File Transfer Server") - port = await self._start_file_transfer_server(zip_path) - self.logger.info(f"File Transfer Server Started at {port}") - - return web.json_response( - {"zip_path": str(zip_path), "port": port, "size": os.path.getsize(zip_path)} - ) + await async_waiting_for(self._have_nodes_saved, timeout=100) + await self.eventbus.asend(Event("initiate_transfer", data=self.artifacts_data)) + return web.HTTPOk() async def _zip_direcotry(self, path: pathlib.Path) -> pathlib.Path: import aioshutil @@ -387,15 +318,6 @@ async def _zip_direcotry(self, path: pathlib.Path) -> pathlib.Path: zip_path = await aioshutil.make_archive(path, "zip", path.parent, path.name) return zip_path - async def _start_file_transfer_server(self, path: pathlib.Path) -> int: - # Start file transfer server - context = zmq.asyncio.Context() - server = ZMQFileServer(context) - port = await server.ainit() - print("Initiated File Server") - self.tasks.append(asyncio.create_task(server.mount(path))) - return port - async def _async_diagnostics_route(self, request: web.Request) -> web.Response: data = await request.json() @@ -465,6 +387,4 @@ async def _async_node_diagnostics(self, msg: Dict, ws: web.WebSocketResponse): async def _async_node_artifacts(self, msg: Dict, ws: web.WebSocketResponse): data = msg["data"] self.artifacts_data[data["node_id"]] = data["artifacts"] - await self.eventbus.asend( - Event("artifacts", self.artifacts_data) - ) + await self.eventbus.asend(Event("artifacts", data=self.artifacts_data)) diff --git a/chimerapy/engine/worker/worker.py b/chimerapy/engine/worker/worker.py index d9959509..49f070a3 100644 --- a/chimerapy/engine/worker/worker.py +++ b/chimerapy/engine/worker/worker.py @@ -16,10 +16,10 @@ from ..networking.async_loop_thread import AsyncLoopThread from ..node import NodeConfig from ..states import NodeState, WorkerState +from .artifacts_transfer_service import ArtifactsTransferService from .http_client_service import HttpClientService from .http_server_service import HttpServerService from .node_handler_service import NodeHandlerService -from .file_transfer_service import FileTransferService class Worker: @@ -107,15 +107,17 @@ async def aserve(self) -> bool: logreceiver=self.logreceiver, ) - self.file_transfer = FileTransferService( - name="file_transfer", + self.artifacts_transfer = ArtifactsTransferService( + name="artifacts_transfer", event_bus=self.eventbus, - logger=self.logger, + state=self.state, + parent_logger=self.logger, ) await self.http_client.async_init() await self.http_server.async_init() await self.node_handler.async_init() + await self.artifacts_transfer.async_init() # Start all services await self.eventbus.asend(Event("start")) From 60137955d3e4189bac9dc5f4d1c8612274c64c84 Mon Sep 17 00:00:00 2001 From: Umesh Date: Thu, 19 Oct 2023 22:23:13 -0500 Subject: [PATCH 10/13] Larger Chunk Size --- chimerapy/engine/chimerapyrc.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index 8b6fc4ee..c16bbd66 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -41,5 +41,5 @@ config: interval: 10 logging-enabled: false file-transfer: - chunk-size: 250000 # bytes - max-chunks: 1 # Number of chunks to send at once + chunk-size: 500000 # bytes + max-chunks: 2 # Number of chunks to send at once From 7bb49aa293e0586a71d1fdf32ebaf871ab66a93d Mon Sep 17 00:00:00 2001 From: Umesh Date: Fri, 20 Oct 2023 19:12:10 -0500 Subject: [PATCH 11/13] Use human readable sizes --- .../networking/zmq_file_transfer_client.py | 37 +++++++++---------- .../networking/zmq_file_transfer_server.py | 11 +++--- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/chimerapy/engine/networking/zmq_file_transfer_client.py b/chimerapy/engine/networking/zmq_file_transfer_client.py index 76e2ebd7..1a0697d0 100644 --- a/chimerapy/engine/networking/zmq_file_transfer_client.py +++ b/chimerapy/engine/networking/zmq_file_transfer_client.py @@ -79,23 +79,25 @@ async def download_files(self): seq_no += 1 credit -= 1 try: - filename, chunk, seq_no_recv_str = await self.socket.recv_multipart() - filename_str = filename.decode() + filekey, chunk, seq_no_recv_str = await self.socket.recv_multipart() + filekey_str = filekey.decode() zmq_file_chunk = ZMQFileChunk.from_bytes(chunk) - if filename_str not in download_tasks and progressbar is not None: - download_tasks[filename_str] = progressbar.add_task( - f"Downloading({filename_str})", total=100 + if filekey_str not in download_tasks and progressbar is not None: + human_size = f"{round(self.files[filekey_str]['size'] / 1024 / 1024, 2)} MB" + fname = self.files[filekey_str]["name"] + download_tasks[filekey_str] = progressbar.add_task( + f"Downloading({fname}|{human_size})", total=100 ) complete = ( - offsets[filename_str] / self.files[filename_str]["size"] + offsets[filekey_str] / self.files[filekey_str]["size"] ) * 100 if progressbar is not None: - progressbar.update(download_tasks[filename_str], completed=complete) + progressbar.update(download_tasks[filekey_str], completed=complete) - await zmq_file_chunk.awrite_into(ahandle=handles[filename_str]) + await zmq_file_chunk.awrite_into(ahandle=handles[filekey_str]) except zmq.ZMQError as e: if e.errno == zmq.ETERM: @@ -107,9 +109,9 @@ async def download_files(self): size = len(chunk) total += size if size < chunk_size: - completed.append(filename_str) - progressbar.update(download_tasks[filename_str], completed=100) - del offsets[filename_str] + completed.append(filekey_str) + progressbar.update(download_tasks[filekey_str], completed=100) + del offsets[filekey_str] if len(completed) == len(self.files): for name, handle in handles.items(): @@ -127,15 +129,12 @@ async def main(): port = 6000 files = { - "test1": {"name": "test1.mp4", "size": 31376170, "outdir": pathlib.Path("dst")}, - "test2": {"name": "test2.mp4", "size": 36633129, "outdir": pathlib.Path("dst")}, - "test3": {"name": "test3.mp4", "size": 39156488, "outdir": pathlib.Path("dst")}, - "test4": {"name": "test4.mp4", "size": 33941417, "outdir": pathlib.Path("dst")}, - "oele-11-webcam-video": { - "name": "oele-11-webcam-video.mp4", - "size": 351384728, + "mac-capture": { + "name": "mac-capture.mp4", "outdir": pathlib.Path("dst"), - }, + "size": 2415932486 + + } } from chimerapy.engine.utils import get_progress_bar diff --git a/chimerapy/engine/networking/zmq_file_transfer_server.py b/chimerapy/engine/networking/zmq_file_transfer_server.py index 51895caf..e6e605a5 100644 --- a/chimerapy/engine/networking/zmq_file_transfer_server.py +++ b/chimerapy/engine/networking/zmq_file_transfer_server.py @@ -69,7 +69,10 @@ async def _send(self): upload_tasks = {} for name, file in self.paths.items(): - upload_tasks[name] = progress_bar.add_task(f"Uploading({name})", total=100) + file_name = file.name + # round to 2 decimal places + human_size = f"{round(os.path.getsize(file) / 1024 / 1024, 2)} MB" + upload_tasks[name] = progress_bar.add_task(f"Uploading({file_name}|{human_size})", total=100) uploaded = set() while True: try: @@ -130,11 +133,7 @@ async def ashutdown(self): async def main(): files = { - "test1": pathlib.Path("src/test1.mp4"), - "test2": pathlib.Path("src/test2.mp4"), - "test3": pathlib.Path("src/test3.mp4"), - "test4": pathlib.Path("src/test4.mp4"), - "oele-11-webcam-video": pathlib.Path("src/oele-11-webcam-video.mp4"), + "mac-capture": pathlib.Path("src/mac-capture.mp4"), } server = ZMQFileServer( context=zmq.asyncio.Context(), paths=files, port=6000, credit=1 From 96726c9532ffa1ec9101cb284bfb4aef7fc5478f Mon Sep 17 00:00:00 2001 From: Umesh Timalsina Date: Mon, 23 Oct 2023 13:15:43 -0500 Subject: [PATCH 12/13] Minor Cleanups --- .../manager/artifacts_collector_service.py | 8 ++-- chimerapy/engine/manager/manager.py | 2 +- .../engine/manager/worker_handler_service.py | 8 +++- .../networking/zmq_file_transfer_client.py | 43 +++---------------- .../networking/zmq_file_transfer_server.py | 25 +++-------- chimerapy/engine/node/worker_comms_service.py | 17 ++++---- .../worker/artifacts_transfer_service.py | 4 +- .../engine/worker/http_client_service.py | 3 ++ 8 files changed, 36 insertions(+), 74 deletions(-) diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py index 6253560a..335093a6 100644 --- a/chimerapy/engine/manager/artifacts_collector_service.py +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -43,12 +43,12 @@ async def async_init(self): ) } - for name, observer in self.observers.items(): + for name, observer in self.observers.items(): # noqa: B007 await self.eventbus.asubscribe(observer) async def collect(self, event: Event) -> None: + assert event.data is not None method = event.data["method"] - print(event.data) if method == "zmq": self.logger.debug("Collecting artifacts over ZMQ") await self._collect_zmq( @@ -68,9 +68,9 @@ async def _collect_zmq( ): files = {} self.logger.debug("Preparing files to download") - for node_id, artifacts in artifacts.items(): + for node_id, artifact_details in artifacts.items(): out_dir = self._create_node_dir(worker_id, node_id) - for artifact in artifacts: + for artifact in artifact_details: key = f"{node_id}-{artifact['name']}" files[key] = { "name": artifact["filename"], diff --git a/chimerapy/engine/manager/manager.py b/chimerapy/engine/manager/manager.py index 067e95c2..608c04b1 100644 --- a/chimerapy/engine/manager/manager.py +++ b/chimerapy/engine/manager/manager.py @@ -345,7 +345,7 @@ async def async_collect(self) -> bool: return await self.worker_handler.collect() async def async_collect_v2(self) -> bool: - await self.worker_handler.collect_v2() + return await self.worker_handler.collect_v2() async def async_reset(self, keep_workers: bool = True): return await self.worker_handler.reset(keep_workers) diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py index 2bd229f4..796e0515 100644 --- a/chimerapy/engine/manager/worker_handler_service.py +++ b/chimerapy/engine/manager/worker_handler_service.py @@ -831,12 +831,18 @@ def workers_collected(self): return False return True - async def collect_v2(self, unzip: bool = False) -> bool: + async def collect_v2(self) -> bool: await self._broadcast_request("post", "/nodes/request_collect") await async_waiting_for( self.workers_collected, timeout=cpe_config.get("manager.timeout.collect") ) + for worker_id in self.collected_workers: + if not self.collected_workers[worker_id]: + return False + + return True + async def reset(self, keep_workers: bool = True): # Destroy Nodes safely diff --git a/chimerapy/engine/networking/zmq_file_transfer_client.py b/chimerapy/engine/networking/zmq_file_transfer_client.py index 1a0697d0..14d957d1 100644 --- a/chimerapy/engine/networking/zmq_file_transfer_client.py +++ b/chimerapy/engine/networking/zmq_file_transfer_client.py @@ -1,6 +1,4 @@ -import asyncio import logging -import pathlib import random from typing import Any, Dict, Optional @@ -35,7 +33,6 @@ def __init__( if parent_logger is None: parent_logger = getLogger("chimerapy-engine") self.logger = fork(parent_logger, "ZMQFileClient") - print(self.host, self.port) async def async_init(self): self.socket = self.context.socket(zmq.DEALER) @@ -43,7 +40,7 @@ async def async_init(self): self.socket.connect(f"tcp://{self.host}:{self.port}") self.logger.info(f"Connected to tcp://{self.host}:{self.port}") - async def download_files(self): + async def download_files(self): # noqa: C901 handles = {} offsets = {} progressbar = self.progressbar @@ -84,7 +81,9 @@ async def download_files(self): zmq_file_chunk = ZMQFileChunk.from_bytes(chunk) if filekey_str not in download_tasks and progressbar is not None: - human_size = f"{round(self.files[filekey_str]['size'] / 1024 / 1024, 2)} MB" + human_size = ( + f"{round(self.files[filekey_str]['size'] / 1024 / 1024, 2)} MB" + ) fname = self.files[filekey_str]["name"] download_tasks[filekey_str] = progressbar.add_task( f"Downloading({fname}|{human_size})", total=100 @@ -114,42 +113,10 @@ async def download_files(self): del offsets[filekey_str] if len(completed) == len(self.files): - for name, handle in handles.items(): + for name, handle in handles.items(): # noqa: B007 await handle.close() progressbar.stop() break async def close(self): await self.socket.close() - - -async def main(): - context = zmq.asyncio.Context() - host = "localhost" - port = 6000 - - files = { - "mac-capture": { - "name": "mac-capture.mp4", - "outdir": pathlib.Path("dst"), - "size": 2415932486 - - } - } - from chimerapy.engine.utils import get_progress_bar - - client = ZMQFileClient( - context, - host, - port, - credit=1, - chunk_size=250000, - files=files, - progressbar=get_progress_bar(), - ) - await client.async_init() - await client.download_files() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/chimerapy/engine/networking/zmq_file_transfer_server.py b/chimerapy/engine/networking/zmq_file_transfer_server.py index e6e605a5..b213db56 100644 --- a/chimerapy/engine/networking/zmq_file_transfer_server.py +++ b/chimerapy/engine/networking/zmq_file_transfer_server.py @@ -1,12 +1,12 @@ import asyncio import logging import os -import pathlib -from typing import Optional +from typing import Dict, Optional import aiofiles import zmq import zmq.asyncio +from aiofiles.threadpool.binary import AsyncBufferedReader from chimerapy.engine._logger import fork, getLogger from chimerapy.engine.networking.utils import ZMQFileChunk @@ -27,7 +27,7 @@ def __init__( self.host = host self.port = port self.paths = paths - self.handles = {} + self.handles: Dict[str, AsyncBufferedReader] = {} self.credits = credit self.socket: Optional[zmq.Socket] = None self.send_task: Optional[asyncio.Task] = None @@ -72,7 +72,9 @@ async def _send(self): file_name = file.name # round to 2 decimal places human_size = f"{round(os.path.getsize(file) / 1024 / 1024, 2)} MB" - upload_tasks[name] = progress_bar.add_task(f"Uploading({file_name}|{human_size})", total=100) + upload_tasks[name] = progress_bar.add_task( + f"Uploading({file_name}|{human_size})", total=100 + ) uploaded = set() while True: try: @@ -129,18 +131,3 @@ async def ashutdown(self): await handle.close() self.handles = {} - - -async def main(): - files = { - "mac-capture": pathlib.Path("src/mac-capture.mp4"), - } - server = ZMQFileServer( - context=zmq.asyncio.Context(), paths=files, port=6000, credit=1 - ) - await server.async_init() - await server.send_task - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/chimerapy/engine/node/worker_comms_service.py b/chimerapy/engine/node/worker_comms_service.py index d01e56bb..e264346e 100644 --- a/chimerapy/engine/node/worker_comms_service.py +++ b/chimerapy/engine/node/worker_comms_service.py @@ -1,7 +1,7 @@ import logging import pathlib import tempfile -from typing import Dict, Optional, List +from typing import Dict, List, Optional from ..data_protocols import NodeDiagnostics, NodePubTable from ..eventbus import Event, EventBus, TypedObserver @@ -10,12 +10,12 @@ from ..service import Service from ..states import NodeState from .events import ( + Artifact, DiagnosticsReportEvent, EnableDiagnosticsEvent, GatherEvent, ProcessNodePubTableEvent, RegisteredMethodEvent, - Artifact ) from .node_config import NodeConfig @@ -78,9 +78,7 @@ async def async_init(self): "teardown", on_asend=self.teardown, handle_event="drop" ), "artifacts": TypedObserver( - "artifacts", - on_asend=self.send_artifacts_info, - handle_event="pass" + "artifacts", on_asend=self.send_artifacts_info, handle_event="pass" ), } for ob in observers.values(): @@ -137,7 +135,8 @@ async def setup(self): await self.send_state() async def send_artifacts_info(self, artifacts_event: Event): - artifacts: List[Artifact] = artifacts_event.data + assert artifacts_event.data is not None + artifacts: List[Artifact] = artifacts_event.data # type: ignore assert self.state and self.eventbus and self.logger if self.client: @@ -151,12 +150,12 @@ async def send_artifacts_info(self, artifacts_event: Event): "filename": artifact.path.name, "mime_type": artifact.mime_type, "glob": artifact.glob, - "size": artifact.size + "size": artifact.size, } for artifact in artifacts - ] + ], } - + assert self.client is not None await self.client.async_send(signal=NODE_MESSAGE.ARTIFACTS, data=data) async def teardown(self): diff --git a/chimerapy/engine/worker/artifacts_transfer_service.py b/chimerapy/engine/worker/artifacts_transfer_service.py index d6013e0a..0fb1f473 100644 --- a/chimerapy/engine/worker/artifacts_transfer_service.py +++ b/chimerapy/engine/worker/artifacts_transfer_service.py @@ -52,7 +52,7 @@ async def async_init(self): ), } - for name, observer in self.observers.items(): + for name, observer in self.observers.items(): # noqa: B007 await self.event_bus.asubscribe(observer) async def _on_connect(self, manager_host: str, manager_port: int) -> None: @@ -66,7 +66,7 @@ def _is_remote_worker(self) -> bool: async def _initiate_artifacts_transfer(self, event: Event) -> None: artifacts_data = event.data - + assert artifacts_data is not None if not self._is_remote_worker(): self.logger.info("Initiating local artifacts transfer") await self.event_bus.asend( diff --git a/chimerapy/engine/worker/http_client_service.py b/chimerapy/engine/worker/http_client_service.py index 6c841811..349cfef9 100644 --- a/chimerapy/engine/worker/http_client_service.py +++ b/chimerapy/engine/worker/http_client_service.py @@ -306,6 +306,8 @@ async def _send_archive(self, path: pathlib.Path) -> bool: async def _signal_manager_artifacts_transfer_ready(self, event: Event) -> bool: manager_host, manager_port = self.get_address() + assert event.data is not None + data = { "ip": event.data["ip"], "port": event.data["port"], @@ -313,6 +315,7 @@ async def _signal_manager_artifacts_transfer_ready(self, event: Event) -> bool: "data": event.data["data"], "method": event.data["method"], } + async with self.http_client.post( f"http://{manager_host}:{manager_port}/workers/artifacts_transfer_ready", data=json.dumps(data), From 0955d7752c8a1952611c36097c80cad519025cde Mon Sep 17 00:00:00 2001 From: Umesh Date: Thu, 26 Oct 2023 18:13:18 -0500 Subject: [PATCH 13/13] Add debounced node status updates. Closes #288 --- .../engine/worker/http_client_service.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/chimerapy/engine/worker/http_client_service.py b/chimerapy/engine/worker/http_client_service.py index 26e2a683..9ae66e90 100644 --- a/chimerapy/engine/worker/http_client_service.py +++ b/chimerapy/engine/worker/http_client_service.py @@ -50,6 +50,9 @@ def __init__( # Services self.http_client = aiohttp.ClientSession() + # Debounce + self._debounce_delay = 500 # ms + self._update_task: Optional[asyncio.Task] = None async def async_init(self): @@ -60,7 +63,7 @@ async def async_init(self): ), "WorkerState.changed": TypedObserver( "WorkerState.changed", - on_asend=self._async_node_status_update, + on_asend=self._debounced_async_node_status_update, handle_event="drop", ), "send_archive": TypedObserver( @@ -338,8 +341,23 @@ async def _send_archive_remotely(self, host: str, port: int) -> bool: return False - async def _async_node_status_update(self) -> bool: + async def _debounced_async_node_status_update(self) -> bool: + if not self.connected_to_manager: + return False + + # Debounce + if self._update_task is not None: + self._update_task.cancel() + + self._update_task = asyncio.ensure_future(self._delayed_node_status_update()) + return True + + async def _delayed_node_status_update(self) -> bool: + await asyncio.sleep(self._debounce_delay / 1000) + return await self._async_node_status_update() + + async def _async_node_status_update(self) -> bool: if not self.connected_to_manager: return False @@ -348,6 +366,6 @@ async def _async_node_status_update(self) -> bool: self.manager_url + "/workers/node_status", data=self.state.to_json() ) as resp: return resp.ok - except aiohttp.client_exceptions.ClientOSError: + except Exception: self.logger.error(traceback.format_exc()) return False