diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index 7f826f4..c16bbd6 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -11,6 +11,7 @@ config: worker-shutdown: 10 # seconds node-creation: 130 # seconds reset: 30 + collect: 1800 # 30 minutes retry: data-collection: 30 # seconds logs-sink: @@ -39,3 +40,6 @@ config: deque-length: 10000 interval: 10 logging-enabled: false + file-transfer: + chunk-size: 500000 # bytes + max-chunks: 2 # Number of chunks to send at once diff --git a/chimerapy/engine/manager/artifacts_collector_service.py b/chimerapy/engine/manager/artifacts_collector_service.py new file mode 100644 index 0000000..335093a --- /dev/null +++ b/chimerapy/engine/manager/artifacts_collector_service.py @@ -0,0 +1,155 @@ +import logging +import pathlib +from typing import Any, Dict, Optional + +import aioshutil +import zmq.asyncio + +import chimerapy.engine.config as cpe_config +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.zmq_file_transfer_client import ZMQFileClient +from chimerapy.engine.utils import get_progress_bar + +from ..eventbus import Event, EventBus, TypedObserver +from ..service import Service +from ..states import ManagerState +from .events import UpdateSendArchiveEvent + + +class ArtifactsCollectorService(Service): + def __init__( + self, + name: str, + eventbus: EventBus, + state: ManagerState, + parent_logger: Optional[logging.Logger] = None, + ): + super().__init__(name=name) + self.eventbus = eventbus + self.observers: Dict[str, TypedObserver] = {} + self.clients: Dict[str, ZMQFileClient] = {} + self.state = state + self.progressbar = get_progress_bar() + + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + + self.logger = fork(parent_logger, self.__class__.__name__) + + async def async_init(self): + self.observers = { + "artifacts_transfer_ready": TypedObserver( + "artifacts_transfer_ready", on_asend=self.collect, handle_event="pass" + ) + } + + for name, observer in self.observers.items(): # noqa: B007 + await self.eventbus.asubscribe(observer) + + async def collect(self, event: Event) -> None: + assert event.data is not None + method = event.data["method"] + if method == "zmq": + self.logger.debug("Collecting artifacts over ZMQ") + await self._collect_zmq( + worker_id=event.data["worker_id"], + host=event.data["ip"], + port=event.data["port"], + artifacts=event.data["data"], + ) + else: + self.logger.debug("Collecting artifacts locally") + await self._collect_local( + worker_id=event.data["worker_id"], artifacts=event.data["data"] + ) + + async def _collect_zmq( + self, worker_id: str, host: str, port: int, artifacts: Dict[str, Any] + ): + files = {} + self.logger.debug("Preparing files to download") + for node_id, artifact_details in artifacts.items(): + out_dir = self._create_node_dir(worker_id, node_id) + for artifact in artifact_details: + key = f"{node_id}-{artifact['name']}" + files[key] = { + "name": artifact["filename"], + "size": artifact["size"], + "outdir": out_dir, + } + context = zmq.asyncio.Context.instance() + client = ZMQFileClient( + context=context, + host=host, + port=port, + credit=cpe_config.get("file-transfer.max-chunks"), + chunk_size=cpe_config.get("file-transfer.chunk-size"), + files=files, + parent_logger=self.logger, + progressbar=self.progressbar, + ) + self.clients[worker_id] = client + try: + await client.async_init() + await client.download_files() + event_data = 
UpdateSendArchiveEvent(worker_id=worker_id, success=True)
+            self.logger.info(f"Successfully collected artifacts for worker {worker_id}")
+        except Exception as e:
+            event_data = UpdateSendArchiveEvent(
+                worker_id=worker_id,
+                success=False,
+            )
+            self.logger.error(
+                f"Error while collecting artifacts for worker {worker_id}: {e}"
+            )
+        finally:
+            await self.eventbus.asend(Event("update_send_archive", event_data))
+
+    async def _collect_local(self, worker_id: str, artifacts: Dict[str, Any]) -> None:
+        try:
+            for node_id, node_artifacts in artifacts.items():
+                node_dir = self._create_node_dir(worker_id, node_id)
+
+                for artifact in node_artifacts:
+                    artifact_path = pathlib.Path(artifact["path"])
+                    self.logger.debug(f"Copying {artifact_path} to {node_dir}")
+                    await aioshutil.copyfile(
+                        artifact_path, node_dir / artifact["filename"]
+                    )
+
+            event_data = UpdateSendArchiveEvent(
+                worker_id=worker_id,
+                success=True,
+            )
+            self.logger.info(f"Successfully collected artifacts for worker {worker_id}")
+        except Exception as e:
+            event_data = UpdateSendArchiveEvent(
+                worker_id=worker_id,
+                success=False,
+            )
+            self.logger.error(
+                f"Error while collecting artifacts for worker {worker_id}: {e}"
+            )
+        finally:
+            await self.eventbus.asend(Event("update_send_archive", event_data))
+
+    def _create_worker_dir(self, worker_id):
+        worker_name = self.state.workers[worker_id].name
+        worker_dir = self.state.logdir / f"{worker_name}-{worker_id[:10]}"
+        worker_dir.mkdir(parents=True, exist_ok=True)
+
+        return worker_dir
+
+    def _create_node_dir(self, worker_id, node_id):
+        worker_dir = self._create_worker_dir(worker_id)
+        nodes = self.state.workers[worker_id].nodes
+        node_dir = worker_dir / nodes[node_id].name
+        node_dir.mkdir(parents=True, exist_ok=True)
+
+        return node_dir
diff --git a/chimerapy/engine/manager/http_server_service.py b/chimerapy/engine/manager/http_server_service.py
index 1d1557b..9359004 100644
--- a/chimerapy/engine/manager/http_server_service.py
+++ b/chimerapy/engine/manager/http_server_service.py
@@ -55,6 +55,10 @@ def __init__(
                 web.post("/workers/deregister", self._deregister_worker_route),
                 web.post("/workers/node_status", self._update_nodes_status),
                 web.post("/workers/send_archive", self._update_send_archive),
+                web.post(
+                    "/workers/artifacts_transfer_ready",
+                    self._file_transfer_server_ready,
+                ),
             ],
         )
 
@@ -192,6 +196,12 @@ async def _update_send_archive(self, request: web.Request):
         await self.eventbus.asend(Event("update_send_archive", event_data))
         return web.HTTPOk()
 
+    async def _file_transfer_server_ready(self, request: web.Request):
+        msg = await request.json()
+        event_data = msg
+        await self.eventbus.asend(Event("artifacts_transfer_ready", event_data))
+        return web.HTTPOk()
+
     #####################################################################################
     ## Front-End API
     #####################################################################################
diff --git a/chimerapy/engine/manager/manager.py b/chimerapy/engine/manager/manager.py
index 2b9b655..608c04b 100644
--- a/chimerapy/engine/manager/manager.py
+++ b/chimerapy/engine/manager/manager.py
@@ -15,6 +15,7 @@
 # Eventbus
 from ..eventbus import Event, EventBus, make_evented
 from ..networking.async_loop_thread import AsyncLoopThread
+from .artifacts_collector_service import ArtifactsCollectorService
 from .distributed_logging_service import DistributedLoggingService
 
 # Services
@@ -110,6 +111,12 @@ async def aserve(self) -> bool:
             state=self.state,
             # **self.kwargs,
         )
+        self.artifacts_collector = ArtifactsCollectorService(
+            name="artifacts_collector",
+            eventbus=self.eventbus,
+            state=self.state,
+            parent_logger=logger,
+        )
 
         # Initialize services
         await self.http_server.async_init()
@@ -117,6 +124,7 @@ async def aserve(self) -> bool:
         await self.zeroconf_service.async_init()
         await self.session_record.async_init()
         await self.distributed_logging.async_init()
+        await self.artifacts_collector.async_init()
 
         # Start all services
         await self.eventbus.asend(Event("start"))
@@ -336,6 +344,9 @@ async def async_stop(self) -> bool:
     async def async_collect(self) -> bool:
         return await self.worker_handler.collect()
 
+    async def async_collect_v2(self) -> bool:
+        return await self.worker_handler.collect_v2()
+
     async def async_reset(self, keep_workers: bool = True):
         return await self.worker_handler.reset(keep_workers)
 
@@ -479,6 +490,18 @@ def collect(self) -> Future[bool]:
         """
         return self._exec_coro(self.async_collect())
 
+    def collect_v2(self) -> Future[bool]:
+        """Collect data from the Workers.
+
+        First, wait until all the Nodes have finished saving their data.\
+        Then, the Manager collects the saved artifacts from the Workers.
+
+        Returns:
+            Future[bool]: Future of success in collecting data from the Workers
+
+        """
+        return self._exec_coro(self.async_collect_v2())
+
     def reset(
         self, keep_workers: bool = True, blocking: bool = True
     ) -> Union[bool, Future[bool]]:
diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py
index e70382e..796e051 100644
--- a/chimerapy/engine/manager/worker_handler_service.py
+++ b/chimerapy/engine/manager/worker_handler_service.py
@@ -12,6 +12,7 @@
 import dill
 import networkx as nx
 
+import chimerapy.engine.config as cpe_config
 from chimerapy.engine import _logger, config
 
 from ..data_protocols import NodePubTable
@@ -824,6 +825,24 @@ async def collect(self) -> bool:
         await self.eventbus.asend(Event("save_meta"))
         return all(results)
 
+    def workers_collected(self):
+        for worker_id in self.state.workers:
+            if worker_id not in self.collected_workers:
+                return False
+        return True
+
+    async def collect_v2(self) -> bool:
+        await self._broadcast_request("post", "/nodes/request_collect")
+        await async_waiting_for(
+            self.workers_collected, timeout=cpe_config.get("manager.timeout.collect")
+        )
+
+        for worker_id in self.collected_workers:
+            if not self.collected_workers[worker_id]:
+                return False
+
+        return True
+
     async def reset(self, keep_workers: bool = True):
 
         # Destroy Nodes safely
diff --git a/chimerapy/engine/networking/enums.py b/chimerapy/engine/networking/enums.py
index 04b9015..17058d5 100644
--- a/chimerapy/engine/networking/enums.py
+++ b/chimerapy/engine/networking/enums.py
@@ -36,3 +36,4 @@ class NODE_MESSAGE(Enum):
     REPORT_SAVING = 52
     REPORT_RESULTS = 53
     DIAGNOSTICS = 54
+    ARTIFACTS = 55
diff --git a/chimerapy/engine/networking/utils.py b/chimerapy/engine/networking/utils.py
new file mode 100644
index 0000000..892e893
--- /dev/null
+++ b/chimerapy/engine/networking/utils.py
@@ -0,0 +1,30 @@
+import os
+
+
+class ZMQFileChunk:
+    """A ZeroMQ File Chunk."""
+
+    def __init__(self, data: bytes) -> None:
+        self.data = data
+
+    def write_into(self, handle) -> None:
+        handle.write(self.data)
+
+    async def awrite_into(self, ahandle) -> None:
+        await ahandle.write(self.data)
+
+    @classmethod
+    def from_bytes(cls, data) -> "ZMQFileChunk":
+        return cls(data=data)
+
+    @classmethod
+    def read_from(cls, handle, offset, chunk_size) ->
"ZMQFileChunk": + handle.seek(offset, os.SEEK_SET) + data = handle.read(chunk_size) + return cls.from_bytes(data=data) + + @classmethod + async def aread_from(cls, ahandle, offset, chunk_size) -> "ZMQFileChunk": + await ahandle.seek(offset, os.SEEK_SET) + data = await ahandle.read(chunk_size) + return cls.from_bytes(data=data) diff --git a/chimerapy/engine/networking/zmq_file_transfer_client.py b/chimerapy/engine/networking/zmq_file_transfer_client.py new file mode 100644 index 0000000..14d957d --- /dev/null +++ b/chimerapy/engine/networking/zmq_file_transfer_client.py @@ -0,0 +1,122 @@ +import logging +import random +from typing import Any, Dict, Optional + +import aiofiles +import zmq.asyncio +from rich.progress import Progress + +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.utils import ZMQFileChunk + + +class ZMQFileClient: + def __init__( + self, + context: zmq.asyncio.Context, + host: str, + port: int, + credit: int, + chunk_size: int, + files: Dict[str, Any], + parent_logger: Optional[logging.Logger] = None, + progressbar: Optional[Progress] = None, + ) -> None: + self.host = host + self.port = port + self.context = context + self.files = files + self.credits = credit + self.chunk_size = chunk_size + self.progressbar = progressbar + self.socket = None + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + self.logger = fork(parent_logger, "ZMQFileClient") + + async def async_init(self): + self.socket = self.context.socket(zmq.DEALER) + self.socket.sndhwm = self.socket.rcvhwm = 1 + self.socket.connect(f"tcp://{self.host}:{self.port}") + self.logger.info(f"Connected to tcp://{self.host}:{self.port}") + + async def download_files(self): # noqa: C901 + handles = {} + offsets = {} + progressbar = self.progressbar + for name, details in self.files.items(): + fname = details["name"] + outdir = details["outdir"] + handles[name] = await aiofiles.open(outdir / fname, "wb") + offsets[name] = 0 + + download_tasks = {} + + credit = self.credits + chunk_size = self.chunk_size + completed = [] + + total = 0 + seq_no = 0 + + progressbar.start() + while True: + while credit: + selected_file = random.choice(list(offsets.keys())) + await self.socket.send_multipart( + [ + b"fetch", + selected_file.encode(), + b"%i" % offsets[selected_file], + b"%i" % chunk_size, + b"%i" % seq_no, + ] + ) + offsets[selected_file] = offsets[selected_file] + chunk_size + seq_no += 1 + credit -= 1 + try: + filekey, chunk, seq_no_recv_str = await self.socket.recv_multipart() + filekey_str = filekey.decode() + zmq_file_chunk = ZMQFileChunk.from_bytes(chunk) + + if filekey_str not in download_tasks and progressbar is not None: + human_size = ( + f"{round(self.files[filekey_str]['size'] / 1024 / 1024, 2)} MB" + ) + fname = self.files[filekey_str]["name"] + download_tasks[filekey_str] = progressbar.add_task( + f"Downloading({fname}|{human_size})", total=100 + ) + + complete = ( + offsets[filekey_str] / self.files[filekey_str]["size"] + ) * 100 + + if progressbar is not None: + progressbar.update(download_tasks[filekey_str], completed=complete) + + await zmq_file_chunk.awrite_into(ahandle=handles[filekey_str]) + + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise e + + credit += 1 + size = len(chunk) + total += size + if size < chunk_size: + completed.append(filekey_str) + progressbar.update(download_tasks[filekey_str], completed=100) + del offsets[filekey_str] + + if len(completed) == len(self.files): + for name, handle in 
handles.items(): # noqa: B007 + await handle.close() + progressbar.stop() + break + + async def close(self): + await self.socket.close() diff --git a/chimerapy/engine/networking/zmq_file_transfer_server.py b/chimerapy/engine/networking/zmq_file_transfer_server.py new file mode 100644 index 0000000..b213db5 --- /dev/null +++ b/chimerapy/engine/networking/zmq_file_transfer_server.py @@ -0,0 +1,133 @@ +import asyncio +import logging +import os +from typing import Dict, Optional + +import aiofiles +import zmq +import zmq.asyncio +from aiofiles.threadpool.binary import AsyncBufferedReader + +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.utils import ZMQFileChunk +from chimerapy.engine.utils import get_progress_bar + + +class ZMQFileServer: + def __init__( + self, + context: zmq.asyncio.Context, + paths, + credit: int, + host: str = "*", + port: int = 0, + parent_logger: Optional[logging.Logger] = None, + ) -> None: + self.context = context + self.host = host + self.port = port + self.paths = paths + self.handles: Dict[str, AsyncBufferedReader] = {} + self.credits = credit + self.socket: Optional[zmq.Socket] = None + self.send_task: Optional[asyncio.Task] = None + + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine-networking") + + self.logger = fork(parent_logger, "ZMQFileServer") + + async def async_init(self) -> None: + """Initialize the server.""" + await self._initialize_file_handlers() + self.socket = self.context.socket(zmq.ROUTER) + self.socket.sndhwm = self.socket.rcvhwm = self.credits + + if self.port != 0: + self.socket.bind(f"tcp://{self.host}:{self.port}") + else: + self.port = self.socket.bind_to_random_port(f"tcp://{self.host}") + + self.logger.info(f"Listening on tcp://{self.host}:{self.port}") + self.send_task = asyncio.create_task(self._send()) + self.progress_bar = None + + async def _initialize_file_handlers(self) -> None: + """Initialize the file handlers.""" + for name, file in self.paths.items(): + assert file.exists() + assert file.is_file() + self.handles[name] = await aiofiles.open(file, mode="rb") + + async def _send(self): + """Send the file chunks.""" + assert self.socket is not None + router = self.socket + self.progress_bar = get_progress_bar() + progress_bar = self.progress_bar + progress_bar.start() + upload_tasks = {} + + for name, file in self.paths.items(): + file_name = file.name + # round to 2 decimal places + human_size = f"{round(os.path.getsize(file) / 1024 / 1024, 2)} MB" + upload_tasks[name] = progress_bar.add_task( + f"Uploading({file_name}|{human_size})", total=100 + ) + uploaded = set() + while True: + try: + msg = await router.recv_multipart() + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + return + else: + raise e + + identity, command, fname, offset_str, chunksz_str, seq_nostr = msg + fname_str = fname.decode() + + if fname_str in self.handles: + assert command == b"fetch" + offset = int(offset_str) + chunksz = int(chunksz_str) + seq_no = int(seq_nostr) + handle = self.handles[fname_str] + zmq_chunk = await ZMQFileChunk.aread_from( + ahandle=handle, offset=offset, chunk_size=chunksz + ) + + data = zmq_chunk.data + if not data: + progress_bar.update(upload_tasks[fname_str], completed=100) + uploaded.add(fname_str) + + if uploaded == set(self.paths.keys()): + progress_bar.stop() + break + continue + else: + await router.send_multipart([identity, fname, data, b"%i" % seq_no]) + completed = ( + await handle.tell() / os.path.getsize(self.paths[fname_str]) + ) * 100 + 
progress_bar.update(upload_tasks[fname_str], completed=completed) + + async def ashutdown(self): + """Shutdown the server.""" + if self.progress_bar is not None: + self.progress_bar.stop() + + if self.send_task is not None: + await self.send_task + self.send_task = None + + if self.socket is not None: + self.socket.close() + self.socket = None + + for handle in self.handles.values(): + await handle.close() + + self.handles = {} diff --git a/chimerapy/engine/node/events.py b/chimerapy/engine/node/events.py index 2f59001..8c5a9ed 100644 --- a/chimerapy/engine/node/events.py +++ b/chimerapy/engine/node/events.py @@ -1,5 +1,6 @@ +import pathlib from dataclasses import dataclass -from typing import Any, Dict +from typing import Any, Dict, Optional from ..data_protocols import NodeDiagnostics, NodePubTable from ..networking.client import Client @@ -41,3 +42,12 @@ class GatherEvent: @dataclass class DiagnosticsReportEvent: # diagnostics_report diagnostics: NodeDiagnostics + + +@dataclass +class Artifact: + name: str + path: pathlib.Path + mime_type: str + size: Optional[int] = None + glob: Optional[str] = None diff --git a/chimerapy/engine/node/record_service.py b/chimerapy/engine/node/record_service.py index 4769fbf..3f3c4ab 100644 --- a/chimerapy/engine/node/record_service.py +++ b/chimerapy/engine/node/record_service.py @@ -18,6 +18,8 @@ ) from ..service import Service from ..states import NodeState +from .events import Artifact +from ..eventbus import Event logger = _logger.getLogger("chimerapy-engine") @@ -149,12 +151,31 @@ def run(self): # self.logger.debug(f"{self}: Closed all entries") - def collect(self): + async def collect(self): # self.logger.debug(f"{self}: collecting recording") # Signal to stop and save self.is_running.clear() + artifacts = {} if self._record_thread: self._record_thread.join() + for name, entry in self.records.items(): + artifacts[name] = entry.get_meta() + + all_artifacts = [] + self.logger.info("Sending artifacts event") + for name, artifact in artifacts.items(): + event_data = Artifact( + name=artifact["name"], + mime_type=artifact["mime_type"], + path=artifact["path"], + glob=artifact["glob"], + size=artifact["path"].stat().st_size, + ) + all_artifacts.append(event_data) + + await self.eventbus.asend(Event("artifacts", data=all_artifacts)) + self.logger.debug("Sent artifacts event") + # self.logger.debug(f"{self}: Finish saving records") diff --git a/chimerapy/engine/node/worker_comms_service.py b/chimerapy/engine/node/worker_comms_service.py index 1f055fd..e264346 100644 --- a/chimerapy/engine/node/worker_comms_service.py +++ b/chimerapy/engine/node/worker_comms_service.py @@ -1,7 +1,7 @@ import logging import pathlib import tempfile -from typing import Dict, Optional +from typing import Dict, List, Optional from ..data_protocols import NodeDiagnostics, NodePubTable from ..eventbus import Event, EventBus, TypedObserver @@ -10,6 +10,7 @@ from ..service import Service from ..states import NodeState from .events import ( + Artifact, DiagnosticsReportEvent, EnableDiagnosticsEvent, GatherEvent, @@ -76,6 +77,9 @@ async def async_init(self): "teardown": TypedObserver( "teardown", on_asend=self.teardown, handle_event="drop" ), + "artifacts": TypedObserver( + "artifacts", on_asend=self.send_artifacts_info, handle_event="pass" + ), } for ob in observers.values(): await self.eventbus.asubscribe(ob) @@ -130,6 +134,30 @@ async def setup(self): # Send publisher port and host information await self.send_state() + async def send_artifacts_info(self, artifacts_event: Event): + 
assert artifacts_event.data is not None + artifacts: List[Artifact] = artifacts_event.data # type: ignore + + assert self.state and self.eventbus and self.logger + if self.client: + pass + data = { + "node_id": self.state.id, + "artifacts": [ + { + "name": artifact.name, + "path": str(artifact.path), + "filename": artifact.path.name, + "mime_type": artifact.mime_type, + "glob": artifact.glob, + "size": artifact.size, + } + for artifact in artifacts + ], + } + assert self.client is not None + await self.client.async_send(signal=NODE_MESSAGE.ARTIFACTS, data=data) + async def teardown(self): # Shutdown the client diff --git a/chimerapy/engine/records/audio_record.py b/chimerapy/engine/records/audio_record.py index 72d8806..657ffc9 100644 --- a/chimerapy/engine/records/audio_record.py +++ b/chimerapy/engine/records/audio_record.py @@ -65,3 +65,11 @@ def close(self): # Close the audio writer self.audio_writer.close() + + def get_meta(self): + return { + "name": self.name, + "path": self.audio_file_path, + "glob": None, + "mime_type": "audio/wav" + } diff --git a/chimerapy/engine/records/image_record.py b/chimerapy/engine/records/image_record.py index b60a019..cc7b860 100644 --- a/chimerapy/engine/records/image_record.py +++ b/chimerapy/engine/records/image_record.py @@ -1,6 +1,7 @@ # Built-in Imports import os import pathlib +import shutil from typing import Any, Dict # Third-party Imports @@ -24,6 +25,7 @@ def __init__( # For image entry, need to save to a new directory self.save_loc = self.dir / self.name + self.zip_loc = None os.makedirs(self.save_loc, exist_ok=True) self.index = 0 @@ -37,4 +39,14 @@ def write(self, data_chunk: Dict[str, Any]): self.index += 1 def close(self): - ... + save_name = self.save_loc.parent / self.name + shutil.make_archive(str(save_name), "zip", str(self.save_loc)) + self.zip_loc = save_name.with_suffix(".zip") + + def get_meta(self): + return { + "name": self.name, + "path": self.zip_loc, + "glob": None, + "mime_type": "application/zip" + } diff --git a/chimerapy/engine/records/json_record.py b/chimerapy/engine/records/json_record.py index 5f87574..7cdbea8 100644 --- a/chimerapy/engine/records/json_record.py +++ b/chimerapy/engine/records/json_record.py @@ -42,3 +42,11 @@ def close(self): if self.file_handler is not None: self.file_handler.close() self.file_handler = None + + def get_meta(self): + return { + "name": self.name, + "path": self.jsonl_path, + "glob": None, + "mime_type": "text/plain" + } diff --git a/chimerapy/engine/records/record.py b/chimerapy/engine/records/record.py index 4335f1b..bdb70ad 100644 --- a/chimerapy/engine/records/record.py +++ b/chimerapy/engine/records/record.py @@ -32,3 +32,7 @@ def close(self): """ raise NotImplementedError("``close`` needs to be implemented.") + + def get_meta(self): + """Get metadata.""" + raise NotImplementedError("``get_meta`` needs to be implemented.") diff --git a/chimerapy/engine/records/tabular_record.py b/chimerapy/engine/records/tabular_record.py index 911f089..b6cffc1 100644 --- a/chimerapy/engine/records/tabular_record.py +++ b/chimerapy/engine/records/tabular_record.py @@ -52,3 +52,11 @@ def write(self, data_chunk: Dict[str, Any]): def close(self): ... 
+ + def get_meta(self): + return { + "name": self.name, + "path": self.tabular_file_path, + "glob": None, + "mime_type": "text/csv" + } diff --git a/chimerapy/engine/records/text_record.py b/chimerapy/engine/records/text_record.py index d036bbc..2c2c128 100644 --- a/chimerapy/engine/records/text_record.py +++ b/chimerapy/engine/records/text_record.py @@ -27,12 +27,14 @@ def __init__( self.name = name self.first_frame = False self.file_handler: Optional[IO[str]] = None + self.save_path: Optional[pathlib.Path] = None def write(self, data_chunk: Dict[str, Any]): if not self.first_frame: self.file_handler = (self.dir / f"{self.name}.{data_chunk['suffix']}").open( "w" ) + self.save_path = self.dir / f"{self.name}.{data_chunk['suffix']}" self.first_frame = True text_data = data_chunk["data"] @@ -43,3 +45,11 @@ def close(self): if self.file_handler is not None: self.file_handler.close() self.file_handler = None + + def get_meta(self): + return { + "name": self.name, + "path": self.save_path, + "glob": None, + "mime_type": "text/plain" + } diff --git a/chimerapy/engine/records/video_record.py b/chimerapy/engine/records/video_record.py index 01d020d..d5c483d 100644 --- a/chimerapy/engine/records/video_record.py +++ b/chimerapy/engine/records/video_record.py @@ -100,3 +100,11 @@ def close(self): # Close the video self.video_writer.release() + + def get_meta(self): + return { + "name": self.name, + "path": self.video_file_path, + "glob": None, + "mime_type": "video/mp4" + } diff --git a/chimerapy/engine/utils.py b/chimerapy/engine/utils.py index 92c42d7..512e23c 100644 --- a/chimerapy/engine/utils.py +++ b/chimerapy/engine/utils.py @@ -10,6 +10,8 @@ from concurrent.futures import Future from typing import Any, Callable, Coroutine, Dict, Optional, Tuple, Union +from rich.progress import BarColumn, Progress, TimeElapsedColumn + # Third-party # Internal from chimerapy.engine import _logger @@ -19,6 +21,19 @@ BYTES_PER_MB = 1024 * 1024 +def get_progress_bar() -> Progress: + """Get a progress bar.""" + columns = [ + "[progress.description]{task.description}", + "[progress.percentage]{task.percentage:>3.0f}%", + BarColumn(bar_width=None), + TimeElapsedColumn(), + ] + + progress = Progress(*columns) + return progress + + def clear_queue(input_queue: queue.Queue): """Clear a queue. 
Args: diff --git a/chimerapy/engine/worker/artifacts_transfer_service.py b/chimerapy/engine/worker/artifacts_transfer_service.py new file mode 100644 index 0000000..0fb1f47 --- /dev/null +++ b/chimerapy/engine/worker/artifacts_transfer_service.py @@ -0,0 +1,114 @@ +import logging +import pathlib +from typing import Dict, Optional, Tuple + +import zmq.asyncio + +import chimerapy.engine.config as cpe_config +from chimerapy.engine._logger import fork, getLogger +from chimerapy.engine.networking.zmq_file_transfer_server import ZMQFileServer +from chimerapy.engine.utils import get_ip_address + +from ..eventbus import Event, EventBus, TypedObserver +from ..service import Service +from ..states import WorkerState + + +class ArtifactsTransferService(Service): + def __init__( + self, + name: str, + event_bus: EventBus, + state: WorkerState, + parent_logger: Optional[logging.Logger] = None, + ): + super().__init__(name=name) + self.event_bus = event_bus + self.observers: Dict[str, TypedObserver] = {} + self.server: Optional[ZMQFileServer] = None + self.state = state + self.addr: Optional[Tuple] = None + + if parent_logger is None: + parent_logger = getLogger("chimerapy-engine") + + self.logger = fork(parent_logger, self.__class__.__name__) + + async def async_init(self): + self.observers = { + "initiate_transfer": TypedObserver( + "initiate_transfer", + on_asend=self._initiate_artifacts_transfer, + handle_event="pass", + ), + "connected": TypedObserver( + "connected", on_asend=self._on_connect, handle_event="unpack" + ), + "disconnected": TypedObserver( + "disconnected", on_asend=self._on_disconnect, handle_event="drop" + ), + "shutdown": TypedObserver( + "shutdown", on_asend=self.shutdown, handle_event="drop" + ), + } + + for name, observer in self.observers.items(): # noqa: B007 + await self.event_bus.asubscribe(observer) + + async def _on_connect(self, manager_host: str, manager_port: int) -> None: + self.addr = (manager_host, manager_port) + + async def _on_disconnect(self) -> None: + self.addr = None + + def _is_remote_worker(self) -> bool: + return self.addr is not None and self.addr[0] != get_ip_address() + + async def _initiate_artifacts_transfer(self, event: Event) -> None: + artifacts_data = event.data + assert artifacts_data is not None + if not self._is_remote_worker(): + self.logger.info("Initiating local artifacts transfer") + await self.event_bus.asend( + Event( + "artifacts_transfer_ready", + data={ + "port": None, + "ip": None, + "data": artifacts_data, + "method": "file_copy", + }, + ) + ) + return + + files = {} + for node_id, data in artifacts_data.items(): + for artifact in data: + files[f"{node_id}-{artifact['name']}"] = pathlib.Path(artifact["path"]) + + self.server = ZMQFileServer( + context=zmq.asyncio.Context(), + paths=files, + credit=cpe_config.get("file-transfer.max-chunks"), + host=get_ip_address(), + port=0, + parent_logger=self.logger, + ) + + await self.server.async_init() + self.logger.info("Initiating remote artifacts transfer") + await self.event_bus.asend( + Event( + "artifacts_transfer_ready", + data={ + "port": self.server.port, + "ip": self.server.host, + "data": artifacts_data, + "method": "zmq", + }, + ) + ) + + async def shutdown(self) -> None: + await self.server.ashutdown() if self.server else None diff --git a/chimerapy/engine/worker/events.py b/chimerapy/engine/worker/events.py index e140349..e5ea0de 100644 --- a/chimerapy/engine/worker/events.py +++ b/chimerapy/engine/worker/events.py @@ -62,3 +62,14 @@ class UpdateResultsEvent: @dataclass class 
SendArchiveEvent: path: pathlib.Path + + +@dataclass +class ConnectedEvent: + manager_host: str + manager_port: int + + +@dataclass +class DisconnectedEvent: + pass diff --git a/chimerapy/engine/worker/http_client_service.py b/chimerapy/engine/worker/http_client_service.py index 26e2a68..c43bcb1 100644 --- a/chimerapy/engine/worker/http_client_service.py +++ b/chimerapy/engine/worker/http_client_service.py @@ -14,13 +14,13 @@ from chimerapy.engine import _logger, config -from ..eventbus import EventBus, TypedObserver +from ..eventbus import Event, EventBus, TypedObserver from ..logger.zmq_handlers import NodeIDZMQPullListener from ..networking import Client from ..service import Service from ..states import WorkerState from ..utils import get_ip_address -from .events import SendArchiveEvent +from .events import ConnectedEvent, DisconnectedEvent, SendArchiveEvent from .zeroconf_listener import ZeroconfListener @@ -50,6 +50,9 @@ def __init__( # Services self.http_client = aiohttp.ClientSession() + # Debounce + self._debounce_delay = 500 # ms + self._update_task: Optional[asyncio.Task] = None async def async_init(self): @@ -60,7 +63,7 @@ async def async_init(self): ), "WorkerState.changed": TypedObserver( "WorkerState.changed", - on_asend=self._async_node_status_update, + on_asend=self._debounced_async_node_status_update, handle_event="drop", ), "send_archive": TypedObserver( @@ -69,6 +72,11 @@ async def async_init(self): on_asend=self._send_archive, handle_event="unpack", ), + "artifacts_transfer_ready": TypedObserver( + "artifacts_transfer_ready", + on_asend=self._signal_manager_artifacts_transfer_ready, + handle_event="pass", + ), } for ob in self.observers.values(): await self.eventbus.asubscribe(ob) @@ -155,6 +163,7 @@ async def async_deregister(self) -> bool: if resp.ok: self.connected_to_manager = False + await self.eventbus.asend(Event("disconnected", DisconnectedEvent())) return resp.ok @@ -207,6 +216,12 @@ async def _async_connect_via_ip( self.logger.info( f"{self}: connection successful to Manager @ {host}:{port}." 
) + await self.eventbus.asend( + Event( + "connected", + ConnectedEvent(manager_host=host, manager_port=port), + ) + ) return True return False @@ -292,6 +307,24 @@ async def _send_archive(self, path: pathlib.Path) -> bool: return success + async def _signal_manager_artifacts_transfer_ready(self, event: Event) -> bool: + manager_host, manager_port = self.get_address() + assert event.data is not None + + data = { + "ip": event.data["ip"], + "port": event.data["port"], + "worker_id": self.state.id, + "data": event.data["data"], + "method": event.data["method"], + } + + async with self.http_client.post( + f"http://{manager_host}:{manager_port}/workers/artifacts_transfer_ready", + data=json.dumps(data), + ) as resp: + return resp.ok + async def _send_archive_locally(self, path: pathlib.Path) -> pathlib.Path: self.logger.debug(f"{self}: sending archive locally") @@ -338,8 +371,23 @@ async def _send_archive_remotely(self, host: str, port: int) -> bool: return False - async def _async_node_status_update(self) -> bool: + async def _debounced_async_node_status_update(self) -> bool: + if not self.connected_to_manager: + return False + + # Debounce + if self._update_task is not None: + self._update_task.cancel() + + self._update_task = asyncio.ensure_future(self._delayed_node_status_update()) + return True + + async def _delayed_node_status_update(self) -> bool: + await asyncio.sleep(self._debounce_delay / 1000) + return await self._async_node_status_update() + + async def _async_node_status_update(self) -> bool: if not self.connected_to_manager: return False @@ -348,6 +396,6 @@ async def _async_node_status_update(self) -> bool: self.manager_url + "/workers/node_status", data=self.state.to_json() ) as resp: return resp.ok - except aiohttp.client_exceptions.ClientOSError: + except Exception: self.logger.error(traceback.format_exc()) return False diff --git a/chimerapy/engine/worker/http_server_service.py b/chimerapy/engine/worker/http_server_service.py index 8eaba1d..47fd5d9 100644 --- a/chimerapy/engine/worker/http_server_service.py +++ b/chimerapy/engine/worker/http_server_service.py @@ -1,12 +1,19 @@ import asyncio import enum import logging +import os import pathlib import pickle -from typing import Dict, List +from typing import Any, Dict, List +import aiofiles +import zmq +import zmq.asyncio from aiohttp import web +from chimerapy.engine import config +from chimerapy.engine.utils import async_waiting_for, get_progress_bar + from ..data_protocols import ( NodeDiagnostics, NodePubEntry, @@ -49,6 +56,7 @@ def __init__( # Containers self.tasks: List[asyncio.Task] = [] + self.artifacts_data: Dict[str, Any] = {} # Create server self.server = Server( @@ -60,6 +68,7 @@ def __init__( web.get("/nodes/pub_table", self._async_get_node_pub_table), web.post("/nodes/pub_table", self._async_process_node_pub_table), web.get("/nodes/gather", self._async_report_node_gather), + web.post("/nodes/request_collect", self._async_request_collect), web.post("/nodes/collect", self._async_collect), web.post("/nodes/step", self._async_step_route), web.post("/nodes/start", self._async_start_nodes_route), @@ -67,6 +76,7 @@ def __init__( web.post("/nodes/registered_methods", self._async_request_method_route), web.post("/nodes/stop", self._async_stop_nodes_route), web.post("/nodes/diagnostics", self._async_diagnostics_route), + web.get("/nodes/artifacts", self._async_get_artifacts_route), # web.post("/packages/load", self._async_load_sent_packages), web.post("/shutdown", self._async_shutdown_route), ], @@ -75,6 +85,7 @@ def 
__init__( NODE_MESSAGE.REPORT_GATHER: self._async_node_report_gather, NODE_MESSAGE.REPORT_RESULTS: self._async_node_report_results, NODE_MESSAGE.DIAGNOSTICS: self._async_node_diagnostics, + NODE_MESSAGE.ARTIFACTS: self._async_node_artifacts, }, parent_logger=self.logger, ) @@ -290,6 +301,23 @@ async def _async_collect(self, request: web.Request) -> web.Response: asyncio.create_task(self._collect_and_send(pathlib.Path(data["path"]))) return web.HTTPOk() + def _have_nodes_saved(self): + node_fsm = list(node.fsm for node in self.state.nodes.values()) + return all(fsm == "SAVED" for fsm in node_fsm) + + async def _async_request_collect(self, request: web.Request) -> web.Response: + data = await request.json() + await self.eventbus.asend(Event("collect")) + await async_waiting_for(self._have_nodes_saved, timeout=100) + await self.eventbus.asend(Event("initiate_transfer", data=self.artifacts_data)) + return web.HTTPOk() + + async def _zip_direcotry(self, path: pathlib.Path) -> pathlib.Path: + import aioshutil + + zip_path = await aioshutil.make_archive(path, "zip", path.parent, path.name) + return zip_path + async def _async_diagnostics_route(self, request: web.Request) -> web.Response: data = await request.json() @@ -298,6 +326,9 @@ async def _async_diagnostics_route(self, request: web.Request) -> web.Response: await self.eventbus.asend(Event("diagnostics", event_data)) return web.HTTPOk() + async def _async_get_artifacts_route(self, request: web.Request) -> web.Response: + return web.json_response(self.artifacts_data) + async def _async_shutdown_route(self, request: web.Request) -> web.Response: # Execute shutdown after returning HTTPOk (prevent Manager stuck waiting) self.tasks.append(asyncio.create_task(self.eventbus.asend(Event("shutdown")))) @@ -352,3 +383,8 @@ async def _async_node_diagnostics(self, msg: Dict, ws: web.WebSocketResponse): diag = NodeDiagnostics.from_dict(msg["data"]["diagnostics"]) if node_id in self.state.nodes: self.state.nodes[node_id].diagnostics = diag + + async def _async_node_artifacts(self, msg: Dict, ws: web.WebSocketResponse): + data = msg["data"] + self.artifacts_data[data["node_id"]] = data["artifacts"] + await self.eventbus.asend(Event("artifacts", data=self.artifacts_data)) diff --git a/chimerapy/engine/worker/worker.py b/chimerapy/engine/worker/worker.py index 4463eaa..49f070a 100644 --- a/chimerapy/engine/worker/worker.py +++ b/chimerapy/engine/worker/worker.py @@ -16,6 +16,7 @@ from ..networking.async_loop_thread import AsyncLoopThread from ..node import NodeConfig from ..states import NodeState, WorkerState +from .artifacts_transfer_service import ArtifactsTransferService from .http_client_service import HttpClientService from .http_server_service import HttpServerService from .node_handler_service import NodeHandlerService @@ -106,9 +107,17 @@ async def aserve(self) -> bool: logreceiver=self.logreceiver, ) + self.artifacts_transfer = ArtifactsTransferService( + name="artifacts_transfer", + event_bus=self.eventbus, + state=self.state, + parent_logger=self.logger, + ) + await self.http_client.async_init() await self.http_server.async_init() await self.node_handler.async_init() + await self.artifacts_transfer.async_init() # Start all services await self.eventbus.asend(Event("start")) diff --git a/test/file_transfer/.gitignore b/test/file_transfer/.gitignore new file mode 100644 index 0000000..01c9b85 --- /dev/null +++ b/test/file_transfer/.gitignore @@ -0,0 +1 @@ +cp-logs \ No newline at end of file diff --git 
a/test/file_transfer/configs/random-multi-file-creator.json b/test/file_transfer/configs/random-multi-file-creator.json new file mode 100644 index 0000000..39e9203 --- /dev/null +++ b/test/file_transfer/configs/random-multi-file-creator.json @@ -0,0 +1,73 @@ +{ + "mode": "record", + "runtime": 100, + "workers": { + "manager_ip": "129.59.104.153", + "manager_port": 9001, + "instances": [ + { + "name": "remote-mac", + "id": "remote-mac", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + }, + { + "name": "local", + "id": "local", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + }, + { + "name": "remote-win", + "id": "remote-win", + "remote": true, + "description": "local worker for the MMLA pipeline demo with a video node" + } + ] + }, + "nodes": [ + { + "registry_name": "FileCreator", + "name": "file-creator-win", + "kwargs": { + "filename": "random-win.bin", + "per_step_mb": 10 + }, + "package": "ft" + }, + { + "registry_name": "FileCreator", + "name": "file-creator-mac", + "kwargs": { + "filename": "random-mac.bin", + "per_step_mb": 10 + }, + "package": "ft" + }, + { + "registry_name": "FileCreator", + "name": "file-creator-local", + "kwargs": { + "filename": "random-local.bin", + "per_step_mb": 10 + }, + "package": "ft" + } + ], + "adj": [], + "manager_config": { + "logdir": "cp-logs", + "port": 9001 + }, + "mappings": { + "local": [ + "file-creator-local" + ], + "remote-mac": [ + "file-creator-mac" + ], + "remote-win": [ + "file-creator-win" + ] + } +} diff --git a/test/file_transfer/ft/__init__.py b/test/file_transfer/ft/__init__.py new file mode 100644 index 0000000..bfea4a7 --- /dev/null +++ b/test/file_transfer/ft/__init__.py @@ -0,0 +1,7 @@ +def register_nodes_metadata(): + return { + "description": "Test File Upload Nodes", + "nodes": [ + "ft.file_creator:FileCreator" + ] + } diff --git a/test/file_transfer/ft/file_creator.py b/test/file_transfer/ft/file_creator.py new file mode 100644 index 0000000..a0539ff --- /dev/null +++ b/test/file_transfer/ft/file_creator.py @@ -0,0 +1,43 @@ +import os +import time +import json +import hashlib + +from chimerapy.orchestrator import sink_node +from chimerapy.engine.node import Node + + +@sink_node(name="FileCreator", add_to_registry=False) +class FileCreator(Node): + """A utility class to upload files to the manager.""" + def __init__(self, filename, per_step_mb = 1, name="FileUploader"): + super().__init__(name=name) + self.per_step_mb = per_step_mb + self.filename = filename + self.file = None + + def step(self) -> None: + if self.file is None: + self.file = open(self.state.logdir / self.filename, "wb") + + self.file.write(os.urandom(self.per_step_mb * 1024 * 1024)) + time.sleep(1.0) + self.logger.info(f"Written {self.per_step_mb}MB to {self.filename}") + if self.state.fsm == "STOPPED": + self.file.close() + self.calculate_md5() + + def calculate_md5(self): + md5 = hashlib.md5() + with open(self.state.logdir / self.filename, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + md5.update(chunk) + + summary = { + "filename": self.filename, + "md5": md5.hexdigest() + } + + with open(self.state.logdir / "summary.json", "w") as f: + json.dump(summary, f, indent=2) + diff --git a/test/file_transfer/setup.py b/test/file_transfer/setup.py new file mode 100644 index 0000000..0341db7 --- /dev/null +++ b/test/file_transfer/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup, find_packages + +setup( + name="ft", + version="0.0.1", + 
packages=find_packages(), + install_requires=[ + "chimerapy-engine", + "chimerapy-orchestrator" + ], + entry_points={ + "chimerapy.orchestrator.nodes_registry": [ + "get_nodes_registry = ft:register_nodes_metadata" + ] + } +) \ No newline at end of file
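
Usage sketch (illustrative, not part of the patch): a minimal standalone exercise of the new credit-based ZMQ file transfer pair, mirroring what ArtifactsTransferService sets up on the worker and ArtifactsCollectorService runs on the manager. The demo.bin / downloads paths, the "demo" file key, localhost, and the two-process layout are assumptions made for the sketch (each side drives its own rich progress bar, so they are kept in separate processes, as in a real deployment); the hard-coded credit=2 and chunk_size=500_000 simply mirror the new file-transfer config defaults.

    import asyncio
    import multiprocessing as mp
    import os
    import pathlib

    import zmq.asyncio

    from chimerapy.engine.networking.zmq_file_transfer_client import ZMQFileClient
    from chimerapy.engine.networking.zmq_file_transfer_server import ZMQFileServer
    from chimerapy.engine.utils import get_progress_bar


    def serve(src: pathlib.Path, port_queue) -> None:
        # Worker side: serve the file until every chunk has been fetched.
        async def _run() -> None:
            server = ZMQFileServer(
                context=zmq.asyncio.Context(),
                paths={"demo": src},  # keys must match the client's `files` keys
                credit=2,             # file-transfer.max-chunks
                port=0,               # bind to a random port
            )
            await server.async_init()
            port_queue.put(server.port)  # hand the chosen port to the manager side
            await server.send_task       # returns once all files are uploaded
            await server.ashutdown()

        asyncio.run(_run())


    def download(port: int, src: pathlib.Path, out_dir: pathlib.Path) -> None:
        # Manager side: fetch chunks with `credit` requests kept in flight.
        async def _run() -> None:
            client = ZMQFileClient(
                context=zmq.asyncio.Context(),
                host="localhost",
                port=port,
                credit=2,            # file-transfer.max-chunks
                chunk_size=500_000,  # file-transfer.chunk-size
                files={
                    "demo": {
                        "name": src.name,
                        "size": src.stat().st_size,
                        "outdir": out_dir,
                    }
                },
                progressbar=get_progress_bar(),  # download_files() drives this bar
            )
            await client.async_init()
            await client.download_files()  # writes out_dir / "demo.bin"

        asyncio.run(_run())


    if __name__ == "__main__":
        src = pathlib.Path("demo.bin")
        src.write_bytes(os.urandom(2 * 1024 * 1024))  # 2 MB of random test data
        out_dir = pathlib.Path("downloads")
        out_dir.mkdir(exist_ok=True)

        port_queue = mp.Queue()
        worker_side = mp.Process(target=serve, args=(src, port_queue))
        worker_side.start()

        download(port_queue.get(), src, out_dir)
        worker_side.join()

In the engine itself this wiring is hidden behind Manager.collect_v2(), which broadcasts /nodes/request_collect; the resulting /workers/artifacts_transfer_ready callbacks are handled by ArtifactsCollectorService, which pulls each remote worker's files as above or copies them with aioshutil when the worker is local.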