diff --git a/docs/contributing.md b/docs/contributing.md index 89ce639..10007e4 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -91,4 +91,5 @@ By contributing, you agree that your contributions will be licensed under the sa We sincerely thank the following contributors for their valuable contributions: - [Ahmad Tarraf](https://github.com/a-tarraf) - [Jean-Baptiste Bensard](https://github.com/besnardjb): Metric proxy integration -- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score \ No newline at end of file +- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score +- [Tim Dieringer](https://github.com/Tim-Dieringer): bachelor thesis: Additional integration for Metric Proxy \ No newline at end of file diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py new file mode 100755 index 0000000..d1014ea --- /dev/null +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -0,0 +1,128 @@ +import math +import time +import numpy as np +import zmq +import msgpack +from rich.console import Console + +from multiprocessing import Pool, cpu_count +from ftio.api.metric_proxy.parallel_proxy import execute, execute_parallel +from ftio.prediction.tasks import ftio_metric_task, ftio_metric_task_save + +from ftio.api.metric_proxy.parse_proxy import filter_metrics +from ftio.freq.helper import MyConsole +import signal + +CONSOLE = MyConsole() +CONSOLE.set(True) + +CURRENT_ADDRESS = None +IDLE_TIMEOUT = 100 +last_request = time.time() + +def sanitize(obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, dict): + return {k: sanitize(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [sanitize(v) for v in obj] + return obj + + +def handle_request(msg: bytes) -> bytes: + """Handle one FTIO request via ZMQ.""" + global CURRENT_ADDRESS + + if msg == b"ping": + return b"pong" + + if msg.startswith(b"New Address: "): + new_address = msg[len(b"New Address: "):].decode() + CURRENT_ADDRESS = new_address + return b"Address updated" + + try: + req = msgpack.unpackb(msg, raw=False) + argv = req.get("argv", []) + raw_metrics = req.get("metrics", []) + + metrics = filter_metrics(raw_metrics, filter_deriv=False) + print(f"Processing {len(metrics)} metrics") + + print(f"With Arguments: {argv}") + argv.extend(["-e", "no"]) + + disable_parallel = req.get("disable_parallel", False) + + ranks = 32 + + + except Exception as e: + return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True) + + try: + t = time.process_time() + if disable_parallel: + data = execute(metrics, argv, ranks, False) + else: + data = execute_parallel(metrics, argv, ranks) + elapsed_time = time.process_time() - t + CONSOLE.info(f"[blue]Calculation time: {elapsed_time} s[/]") + + native_data = sanitize(list(data)) + + return msgpack.packb(native_data, use_bin_type=True) + + except Exception as e: + print(f"Error during processing: {e}") + return msgpack.packb({"error": str(e)}, use_bin_type=True) + + +def main(address: str = "tcp://*:0"): + """FTIO ZMQ Server entrypoint for Metric Proxy.""" + global CURRENT_ADDRESS, last_request, POOL + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(address) + CURRENT_ADDRESS = address + + signal.signal(signal.SIGTERM, shutdown_handler) + signal.signal(signal.SIGINT, shutdown_handler) + + endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode() + print(endpoint, flush=True) + + console = Console() + console.print(f"[green]FTIO ZMQ Server listening on {endpoint}[/]") + + try: + while True: + if socket.poll(timeout=1000): + msg = socket.recv() + console.print(f"[cyan]Received request ({len(msg)} bytes)[/]") + last_request = time.time() + reply = handle_request(msg) + socket.send(reply) + + if reply == b"Address updated": + console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]") + socket.close() + socket = context.socket(zmq.REP) + socket.bind(CURRENT_ADDRESS) + else: + if time.time() - last_request > IDLE_TIMEOUT: + console.print("Idle timeout reached, shutting down server") + break + finally: + socket.close(linger=0) + context.term() + + +def shutdown_handler(signum, frame): + raise SystemExit + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ftio/prediction/tasks.py b/ftio/prediction/tasks.py index 73d74cb..79c6d30 100644 --- a/ftio/prediction/tasks.py +++ b/ftio/prediction/tasks.py @@ -5,6 +5,7 @@ # from ftio.prediction.helper import get_dominant # from ftio.plot.freq_plot import convert_and_plot from ftio.freq.helper import MyConsole +from ftio.freq.prediction import Prediction from ftio.parse.args import parse_args from ftio.plot.freq_plot import convert_and_plot from ftio.processing.print_output import display_prediction @@ -71,7 +72,33 @@ def ftio_metric_task_save( ) -> None: prediction = ftio_metric_task(metric, arrays, argv, ranks, show) # freq = get_dominant(prediction) #just get a single dominant value - if prediction: + names = [] + if prediction.top_freqs: + freqs = prediction.top_freqs["freq"] + amps = prediction.top_freqs["amp"] + phis = prediction.top_freqs["phi"] + + for f, a, p in zip(freqs, amps, phis): + names.append(prediction.get_wave_name(f, a, p)) + + data.append( + { + "metric": f"{metric}", + "dominant_freq": prediction.dominant_freq, + "conf": prediction.conf, + "amp": prediction.amp, + "phi": prediction.phi, + "t_start": prediction.t_start, + "t_end": prediction.t_end, + "total_bytes": prediction.total_bytes, + "ranks": prediction.ranks, + "freq": float(prediction.freq), + "top_freq": prediction.top_freqs, + "n_samples": prediction.n_samples, + "wave_names": names, + } + ) + #if prediction: # data.append( # { # "metric": f"{metric}", @@ -87,7 +114,8 @@ def ftio_metric_task_save( # "top_freq": prediction.top_freqs, # } # ) - prediction.metric = metric - data.append(prediction) + # caused issues with msgpack serialization + #prediction.metric = metric + #data.append(prediction) else: CONSOLE.info(f"\n[yellow underline]Warning: {metric} returned {prediction}[/]") diff --git a/pyproject.toml b/pyproject.toml index 18f62b8..73b52b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,6 +85,7 @@ convert_trace = "ftio.util.convert_old_trace:main" trace_ftio = "ftio.api.trace_analysis.trace_ftio_v2:main" trace_analysis = "ftio.api.trace_analysis.trace_analysis:main" admire_proxy_invoke_ftio = "ftio.api.metric_proxy.proxy_invoke_ftio:main" +admire_proxy_zmq = "ftio.api.metric_proxy.proxy_zmq:main" jit_plot = "ftio.api.gekkoFs.jit.jit_plot:main" server_ftio = "ftio.util.server_ftio:main_cli"