From 87fb82746a1374e1bfceff637419f3dda5450857 Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Thu, 9 Oct 2025 17:41:05 +0200 Subject: [PATCH 1/6] Initial zmq communication for metric proxy --- ftio/api/metric_proxy/proxy_zmq.py | 68 ++++++++++++++++++++++++++++++ ftio/prediction/tasks.py | 3 +- pyproject.toml | 1 + 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100755 ftio/api/metric_proxy/proxy_zmq.py diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py new file mode 100755 index 0000000..7c996d9 --- /dev/null +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -0,0 +1,68 @@ +import json +import zmq +from rich.console import Console + +from ftio.api.metric_proxy.helper import NpArrayEncode, data_to_json +from ftio.api.metric_proxy.parse_proxy import filter_metrics, load_proxy_trace_stdin +from ftio.freq.helper import MyConsole +from ftio.parse.args import parse_args +from ftio.api.metric_proxy.parallel_proxy import execute_parallel, execute + +CONSOLE = MyConsole() +CONSOLE.set(True) + + +def handle_request(msg: str) -> str: + """Handle one FTIO request via ZMQ.""" + if msg == "ping": + return "pong" + + try: + req = json.loads(msg) + argv = req.get("argv", []) + raw_metrics = req.get("metrics", []) + + metrics = filter_metrics(raw_metrics, filter_deriv=False) + + print(f"Arguments: {argv}") + argv.extend(["-e", "no"]) + + + disable_parallel = req.get("disable_parallel", False) + ranks = 32 + + except (KeyError, json.JSONDecodeError) as e: + return json.dumps({"error": f"Invalid request: {e}"}) + + try: + if disable_parallel: + data = execute(metrics, argv, ranks) + else: + data = execute_parallel(metrics, argv, ranks) + + native_data = list(data) if not isinstance(data, list) else data + + return json.dumps(native_data, cls=NpArrayEncode) + + except Exception as e: + return json.dumps({"error": str(e)}) + + +def main(address: str = "tcp://*:5555"): + """FTIO ZMQ Server entrypoint for Metric Proxy.""" + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(address) + + console = Console() + console.print(f"[green]FTIO ZMQ Server listening on {address}[/]") + + while True: + msg = socket.recv_string() + console.print(f"[cyan]Received request ({len(msg)} bytes)[/]") + reply = handle_request(msg) + socket.send_string(reply) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ftio/prediction/tasks.py b/ftio/prediction/tasks.py index e749575..f680394 100644 --- a/ftio/prediction/tasks.py +++ b/ftio/prediction/tasks.py @@ -83,8 +83,9 @@ def ftio_metric_task_save( "t_end": prediction.t_end, "total_bytes": prediction.total_bytes, "ranks": prediction.ranks, - "freq": prediction.freq, + "freq": float(prediction.freq), "top_freq": prediction.top_freqs, + "n_samples": prediction.n_samples, } ) else: diff --git a/pyproject.toml b/pyproject.toml index 73aaaa4..b23b5e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,6 +85,7 @@ convert_trace = "ftio.util.convert_old_trace:main" trace_ftio = "ftio.api.trace_analysis.trace_ftio_v2:main" trace_analysis = "ftio.api.trace_analysis.trace_analysis:main" admire_proxy_invoke_ftio = "ftio.api.metric_proxy.proxy_invoke_ftio:main" +admire_proxy_zmq = "ftio.api.metric_proxy.proxy_zmq:main" jit_plot = "ftio.api.gekkoFs.jit.jit_plot:main" server_ftio = "ftio.util.server_ftio:main_cli" From 604ea4ab23aee33a57f6643a7a842b579be05e79 Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Mon, 20 Oct 2025 15:50:36 +0200 Subject: [PATCH 2/6] Changed proxy communication to msgpack from JSON --- ftio/api/metric_proxy/proxy_zmq.py | 41 ++++++++++++++++++------------ 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py index 7c996d9..081aa97 100755 --- a/ftio/api/metric_proxy/proxy_zmq.py +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -1,24 +1,32 @@ -import json +import numpy as np import zmq +import msgpack from rich.console import Console -from ftio.api.metric_proxy.helper import NpArrayEncode, data_to_json -from ftio.api.metric_proxy.parse_proxy import filter_metrics, load_proxy_trace_stdin +from ftio.api.metric_proxy.parse_proxy import filter_metrics from ftio.freq.helper import MyConsole -from ftio.parse.args import parse_args from ftio.api.metric_proxy.parallel_proxy import execute_parallel, execute CONSOLE = MyConsole() CONSOLE.set(True) +def sanitize(obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, dict): + return {k: sanitize(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [sanitize(v) for v in obj] + return obj -def handle_request(msg: str) -> str: + +def handle_request(msg: bytes) -> bytes: """Handle one FTIO request via ZMQ.""" - if msg == "ping": - return "pong" + if msg == b"ping": + return b"pong" try: - req = json.loads(msg) + req = msgpack.unpackb(msg, raw=False) argv = req.get("argv", []) raw_metrics = req.get("metrics", []) @@ -27,25 +35,26 @@ def handle_request(msg: str) -> str: print(f"Arguments: {argv}") argv.extend(["-e", "no"]) - disable_parallel = req.get("disable_parallel", False) ranks = 32 - except (KeyError, json.JSONDecodeError) as e: - return json.dumps({"error": f"Invalid request: {e}"}) + except Exception as e: + return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True) try: if disable_parallel: - data = execute(metrics, argv, ranks) + data = execute(metrics, argv, ranks, show=False) else: data = execute_parallel(metrics, argv, ranks) native_data = list(data) if not isinstance(data, list) else data + native_data = sanitize(native_data) - return json.dumps(native_data, cls=NpArrayEncode) + return msgpack.packb(native_data, use_bin_type=True) except Exception as e: - return json.dumps({"error": str(e)}) + print(f"Error during processing: {e}") + return msgpack.packb({"error": str(e)}, use_bin_type=True) def main(address: str = "tcp://*:5555"): @@ -58,10 +67,10 @@ def main(address: str = "tcp://*:5555"): console.print(f"[green]FTIO ZMQ Server listening on {address}[/]") while True: - msg = socket.recv_string() + msg = socket.recv() console.print(f"[cyan]Received request ({len(msg)} bytes)[/]") reply = handle_request(msg) - socket.send_string(reply) + socket.send(reply) if __name__ == "__main__": From 2e623c96ba05944ea3b63a852d99f2120182855c Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Tue, 18 Nov 2025 14:39:55 +0100 Subject: [PATCH 3/6] Added wave names to ftio task output --- ftio/prediction/tasks.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/ftio/prediction/tasks.py b/ftio/prediction/tasks.py index f680394..ccffdfe 100644 --- a/ftio/prediction/tasks.py +++ b/ftio/prediction/tasks.py @@ -5,6 +5,7 @@ # from ftio.prediction.helper import get_dominant # from ftio.plot.freq_plot import convert_and_plot from ftio.freq.helper import MyConsole +from ftio.freq.prediction import Prediction from ftio.parse.args import parse_args from ftio.plot.freq_plot import convert_and_plot from ftio.processing.print_output import display_prediction @@ -71,7 +72,15 @@ def ftio_metric_task_save( ) -> None: prediction = ftio_metric_task(metric, arrays, argv, ranks, show) # freq = get_dominant(prediction) #just get a single dominant value - if prediction: + names = [] + if prediction.top_freqs: + freqs = prediction.top_freqs["freq"] + amps = prediction.top_freqs["amp"] + phis = prediction.top_freqs["phi"] + + for f, a, p in zip(freqs, amps, phis): + names.append(prediction.get_wave_name(f, a, p)) + data.append( { "metric": f"{metric}", @@ -86,6 +95,7 @@ def ftio_metric_task_save( "freq": float(prediction.freq), "top_freq": prediction.top_freqs, "n_samples": prediction.n_samples, + "wave_names": names, } ) else: From 6ed20b3b0d52ade9deac19078926a7004bcd56fd Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Tue, 2 Dec 2025 13:54:24 +0100 Subject: [PATCH 4/6] Added dynamic initial port and ability to change port at runtime --- ftio/api/metric_proxy/proxy_zmq.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py index 081aa97..36f7567 100755 --- a/ftio/api/metric_proxy/proxy_zmq.py +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -10,6 +10,8 @@ CONSOLE = MyConsole() CONSOLE.set(True) +CURRENT_ADDRESS = None + def sanitize(obj): if isinstance(obj, np.ndarray): return obj.tolist() @@ -22,9 +24,16 @@ def sanitize(obj): def handle_request(msg: bytes) -> bytes: """Handle one FTIO request via ZMQ.""" + global CURRENT_ADDRESS + if msg == b"ping": return b"pong" + if msg.startswith(b"New Address: "): + new_address = msg[len(b"New Address: "):].decode() + CURRENT_ADDRESS = new_address + return b"Address updated" + try: req = msgpack.unpackb(msg, raw=False) argv = req.get("argv", []) @@ -57,14 +66,19 @@ def handle_request(msg: bytes) -> bytes: return msgpack.packb({"error": str(e)}, use_bin_type=True) -def main(address: str = "tcp://*:5555"): +def main(address: str = "tcp://*:0"): """FTIO ZMQ Server entrypoint for Metric Proxy.""" + global CURRENT_ADDRESS context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(address) + CURRENT_ADDRESS = address + + endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode() + print(endpoint, flush=True) console = Console() - console.print(f"[green]FTIO ZMQ Server listening on {address}[/]") + console.print(f"[green]FTIO ZMQ Server listening on {endpoint}[/]") while True: msg = socket.recv() @@ -72,6 +86,12 @@ def main(address: str = "tcp://*:5555"): reply = handle_request(msg) socket.send(reply) + if reply == b"Address updated": + console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]") + socket.close() + socket = context.socket(zmq.REP) + socket.bind(CURRENT_ADDRESS) + if __name__ == "__main__": main() \ No newline at end of file From 3d4eb007a48c025affae77d213e169ad66b22902 Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Thu, 18 Dec 2025 14:05:40 +0100 Subject: [PATCH 5/6] Optimized metric processing for high core counts --- ftio/api/metric_proxy/proxy_zmq.py | 140 ++++++++++++++++++++++++----- 1 file changed, 117 insertions(+), 23 deletions(-) diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py index 36f7567..6e15126 100755 --- a/ftio/api/metric_proxy/proxy_zmq.py +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -1,16 +1,25 @@ +import math +import time import numpy as np import zmq import msgpack from rich.console import Console +from multiprocessing import Pool, cpu_count +from ftio.prediction.tasks import ftio_metric_task, ftio_metric_task_save + from ftio.api.metric_proxy.parse_proxy import filter_metrics from ftio.freq.helper import MyConsole -from ftio.api.metric_proxy.parallel_proxy import execute_parallel, execute +import signal CONSOLE = MyConsole() CONSOLE.set(True) CURRENT_ADDRESS = None +IDLE_TIMEOUT = 100 +last_request = time.time() + +POOL = None def sanitize(obj): if isinstance(obj, np.ndarray): @@ -40,24 +49,22 @@ def handle_request(msg: bytes) -> bytes: raw_metrics = req.get("metrics", []) metrics = filter_metrics(raw_metrics, filter_deriv=False) + print(f"Processing {len(metrics)} metrics") - print(f"Arguments: {argv}") + print(f"With Arguments: {argv}") argv.extend(["-e", "no"]) - disable_parallel = req.get("disable_parallel", False) - ranks = 32 except Exception as e: return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True) try: - if disable_parallel: - data = execute(metrics, argv, ranks, show=False) - else: - data = execute_parallel(metrics, argv, ranks) - - native_data = list(data) if not isinstance(data, list) else data - native_data = sanitize(native_data) + t = time.process_time() + data = execute_parallel(metrics, argv) + elapsed_time = time.process_time() - t + CONSOLE.info(f"[blue]Calculation time: {elapsed_time} s[/]") + + native_data = sanitize(data) return msgpack.packb(native_data, use_bin_type=True) @@ -65,32 +72,119 @@ def handle_request(msg: bytes) -> bytes: print(f"Error during processing: {e}") return msgpack.packb({"error": str(e)}, use_bin_type=True) +def execute_parallel(metrics: dict, argv: list): + global POOL + + cpu_workers = max(1, cpu_count() - 2) + batch_size = max(1, math.ceil(len(metrics) / cpu_workers)) + results = [] + + metric_items = list(metrics.items()) + batches = [metric_items[i:i+batch_size] for i in range(0, len(metric_items), batch_size)] + + batch_results = POOL.starmap( + ftio_metric_task_batch, + [(batch, argv) for batch in batches] + ) + + for br in batch_results: + results.extend(br) + + return results + +def ftio_metric_task_batch(batch, argv): + batch_results = [] + for metric, arrays in batch: + batch_results.extend(ftio_metric_task_save(metric, arrays, argv)) + return batch_results + +def ftio_metric_task_save( + metric: str, + arrays: np.ndarray, + argv: list, + show: bool = False, +) -> None: + ranks = 32 + prediction = ftio_metric_task(metric, arrays, argv, ranks, show) + names = [] + result = [] + if prediction.top_freqs: + freqs = prediction.top_freqs["freq"] + amps = prediction.top_freqs["amp"] + phis = prediction.top_freqs["phi"] + + for f, a, p in zip(freqs, amps, phis): + names.append(prediction.get_wave_name(f, a, p)) + + result.append( + { + "metric": f"{metric}", + "dominant_freq": prediction.dominant_freq, + "conf": prediction.conf, + "amp": prediction.amp, + "phi": prediction.phi, + "t_start": prediction.t_start, + "t_end": prediction.t_end, + "total_bytes": prediction.total_bytes, + "ranks": prediction.ranks, + "freq": float(prediction.freq), + "top_freq": prediction.top_freqs, + "n_samples": prediction.n_samples, + "wave_names": names, + } + ) + else: + CONSOLE.info(f"\n[yellow underline]Warning: {metric} returned {prediction}[/]") + + return result def main(address: str = "tcp://*:0"): """FTIO ZMQ Server entrypoint for Metric Proxy.""" - global CURRENT_ADDRESS + global CURRENT_ADDRESS, last_request, POOL context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(address) CURRENT_ADDRESS = address + POOL = Pool(processes=cpu_count() - 2, maxtasksperchild=500) + + signal.signal(signal.SIGTERM, shutdown_handler) + signal.signal(signal.SIGINT, shutdown_handler) + endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode() print(endpoint, flush=True) console = Console() console.print(f"[green]FTIO ZMQ Server listening on {endpoint}[/]") - while True: - msg = socket.recv() - console.print(f"[cyan]Received request ({len(msg)} bytes)[/]") - reply = handle_request(msg) - socket.send(reply) - - if reply == b"Address updated": - console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]") - socket.close() - socket = context.socket(zmq.REP) - socket.bind(CURRENT_ADDRESS) + try: + while True: + if socket.poll(timeout=1000): + msg = socket.recv() + console.print(f"[cyan]Received request ({len(msg)} bytes)[/]") + last_request = time.time() + reply = handle_request(msg) + socket.send(reply) + + if reply == b"Address updated": + console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]") + socket.close() + socket = context.socket(zmq.REP) + socket.bind(CURRENT_ADDRESS) + else: + if time.time() - last_request > IDLE_TIMEOUT: + console.print("Idle timeout reached, shutting down server") + break + finally: + POOL.close() + POOL.join() + socket.close(linger=0) + context.term() + + +def shutdown_handler(signum, frame): + raise SystemExit + if __name__ == "__main__": From 7a0ac99a87c862fbc3dc75958982cafb62d2af7f Mon Sep 17 00:00:00 2001 From: Tim Dieringer Date: Mon, 22 Dec 2025 16:57:50 +0100 Subject: [PATCH 6/6] Simplified code and added information to contributing.md --- docs/contributing.md | 3 +- ftio/api/metric_proxy/proxy_zmq.py | 83 ++++-------------------------- 2 files changed, 12 insertions(+), 74 deletions(-) diff --git a/docs/contributing.md b/docs/contributing.md index 89ce639..10007e4 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -91,4 +91,5 @@ By contributing, you agree that your contributions will be licensed under the sa We sincerely thank the following contributors for their valuable contributions: - [Ahmad Tarraf](https://github.com/a-tarraf) - [Jean-Baptiste Bensard](https://github.com/besnardjb): Metric proxy integration -- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score \ No newline at end of file +- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score +- [Tim Dieringer](https://github.com/Tim-Dieringer): bachelor thesis: Additional integration for Metric Proxy \ No newline at end of file diff --git a/ftio/api/metric_proxy/proxy_zmq.py b/ftio/api/metric_proxy/proxy_zmq.py index 6e15126..d1014ea 100755 --- a/ftio/api/metric_proxy/proxy_zmq.py +++ b/ftio/api/metric_proxy/proxy_zmq.py @@ -6,6 +6,7 @@ from rich.console import Console from multiprocessing import Pool, cpu_count +from ftio.api.metric_proxy.parallel_proxy import execute, execute_parallel from ftio.prediction.tasks import ftio_metric_task, ftio_metric_task_save from ftio.api.metric_proxy.parse_proxy import filter_metrics @@ -19,8 +20,6 @@ IDLE_TIMEOUT = 100 last_request = time.time() -POOL = None - def sanitize(obj): if isinstance(obj, np.ndarray): return obj.tolist() @@ -54,17 +53,24 @@ def handle_request(msg: bytes) -> bytes: print(f"With Arguments: {argv}") argv.extend(["-e", "no"]) + disable_parallel = req.get("disable_parallel", False) + + ranks = 32 + except Exception as e: return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True) try: t = time.process_time() - data = execute_parallel(metrics, argv) + if disable_parallel: + data = execute(metrics, argv, ranks, False) + else: + data = execute_parallel(metrics, argv, ranks) elapsed_time = time.process_time() - t CONSOLE.info(f"[blue]Calculation time: {elapsed_time} s[/]") - native_data = sanitize(data) + native_data = sanitize(list(data)) return msgpack.packb(native_data, use_bin_type=True) @@ -72,71 +78,6 @@ def handle_request(msg: bytes) -> bytes: print(f"Error during processing: {e}") return msgpack.packb({"error": str(e)}, use_bin_type=True) -def execute_parallel(metrics: dict, argv: list): - global POOL - - cpu_workers = max(1, cpu_count() - 2) - batch_size = max(1, math.ceil(len(metrics) / cpu_workers)) - results = [] - - metric_items = list(metrics.items()) - batches = [metric_items[i:i+batch_size] for i in range(0, len(metric_items), batch_size)] - - batch_results = POOL.starmap( - ftio_metric_task_batch, - [(batch, argv) for batch in batches] - ) - - for br in batch_results: - results.extend(br) - - return results - -def ftio_metric_task_batch(batch, argv): - batch_results = [] - for metric, arrays in batch: - batch_results.extend(ftio_metric_task_save(metric, arrays, argv)) - return batch_results - -def ftio_metric_task_save( - metric: str, - arrays: np.ndarray, - argv: list, - show: bool = False, -) -> None: - ranks = 32 - prediction = ftio_metric_task(metric, arrays, argv, ranks, show) - names = [] - result = [] - if prediction.top_freqs: - freqs = prediction.top_freqs["freq"] - amps = prediction.top_freqs["amp"] - phis = prediction.top_freqs["phi"] - - for f, a, p in zip(freqs, amps, phis): - names.append(prediction.get_wave_name(f, a, p)) - - result.append( - { - "metric": f"{metric}", - "dominant_freq": prediction.dominant_freq, - "conf": prediction.conf, - "amp": prediction.amp, - "phi": prediction.phi, - "t_start": prediction.t_start, - "t_end": prediction.t_end, - "total_bytes": prediction.total_bytes, - "ranks": prediction.ranks, - "freq": float(prediction.freq), - "top_freq": prediction.top_freqs, - "n_samples": prediction.n_samples, - "wave_names": names, - } - ) - else: - CONSOLE.info(f"\n[yellow underline]Warning: {metric} returned {prediction}[/]") - - return result def main(address: str = "tcp://*:0"): """FTIO ZMQ Server entrypoint for Metric Proxy.""" @@ -146,8 +87,6 @@ def main(address: str = "tcp://*:0"): socket.bind(address) CURRENT_ADDRESS = address - POOL = Pool(processes=cpu_count() - 2, maxtasksperchild=500) - signal.signal(signal.SIGTERM, shutdown_handler) signal.signal(signal.SIGINT, shutdown_handler) @@ -176,8 +115,6 @@ def main(address: str = "tcp://*:0"): console.print("Idle timeout reached, shutting down server") break finally: - POOL.close() - POOL.join() socket.close(linger=0) context.term()