Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,5 @@ By contributing, you agree that your contributions will be licensed under the sa
We sincerely thank the following contributors for their valuable contributions:
- [Ahmad Tarraf](https://github.com/a-tarraf)
- [Jean-Baptiste Besnard](https://github.com/besnardjb): Metric proxy integration
- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score
- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score
- [Tim Dieringer](https://github.com/Tim-Dieringer): bachelor thesis: Additional integration for Metric Proxy
128 changes: 128 additions & 0 deletions ftio/api/metric_proxy/proxy_zmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import math
import time
import numpy as np
import zmq
import msgpack
from rich.console import Console

from multiprocessing import Pool, cpu_count
from ftio.api.metric_proxy.parallel_proxy import execute, execute_parallel
from ftio.prediction.tasks import ftio_metric_task, ftio_metric_task_save

from ftio.api.metric_proxy.parse_proxy import filter_metrics
from ftio.freq.helper import MyConsole
import signal

# Module-wide console used for informational messages (e.g. timing output).
CONSOLE = MyConsole()
CONSOLE.set(True)  # presumably switches console output on -- confirm against MyConsole

# Address the server socket is bound to. Set by main() on start-up and replaced
# by handle_request() when a b"New Address: ..." message arrives.
CURRENT_ADDRESS = None
# Number of seconds without any request after which main() stops the server.
IDLE_TIMEOUT = 100
# time.time() stamp of the most recent request; initialised at import time.
last_request = time.time()

def sanitize(obj):
    """Recursively convert *obj* into plain Python types that msgpack can pack.

    NumPy arrays become (nested) lists and NumPy scalars (``np.int64``,
    ``np.float32``, ``np.bool_`` ...) become the matching built-in scalar.
    Dictionaries, lists and tuples are walked recursively; tuples are
    returned as lists, which is also how msgpack encodes them. Dictionary
    keys are left untouched. Any other object is returned unchanged.

    Args:
        obj: Arbitrary, possibly nested, value.

    Returns:
        A structure equivalent to *obj* without NumPy arrays or scalars in
        its values.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        # NumPy scalars are not (all) subclasses of int/float/bool, so they
        # have to be unwrapped explicitly before serialization.
        return obj.item()
    if isinstance(obj, dict):
        return {k: sanitize(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [sanitize(v) for v in obj]
    return obj


def handle_request(msg: bytes) -> bytes:
    """Handle one FTIO request received over ZMQ and build the reply.

    Three kinds of messages are understood:

    * ``b"ping"`` is answered with ``b"pong"``.
    * ``b"New Address: <addr>"`` stores ``<addr>`` in the module-level
      ``CURRENT_ADDRESS`` and is answered with ``b"Address updated"``.
      An address that is empty or not valid UTF-8 is rejected with a
      msgpack-encoded ``{"error": ...}`` map and leaves ``CURRENT_ADDRESS``
      unchanged.
    * Anything else is decoded as a msgpack map with the optional keys
      ``"argv"``, ``"metrics"`` and ``"disable_parallel"``. The metrics are
      filtered, ``-e no`` is appended to ``argv`` and the metrics are
      processed with ``execute`` (when ``disable_parallel`` is true) or
      ``execute_parallel``.

    Args:
        msg: Raw request payload.

    Returns:
        The raw reply payload: one of the literal byte strings above, the
        msgpack-encoded result list, or a msgpack-encoded ``{"error": ...}``
        map when the request is invalid or processing fails.
    """
    global CURRENT_ADDRESS

    if msg == b"ping":
        return b"pong"

    address_prefix = b"New Address: "
    if msg.startswith(address_prefix):
        try:
            new_address = msg[len(address_prefix):].decode()
        except UnicodeDecodeError as e:
            # Must not escape: the caller owes the client exactly one reply.
            return msgpack.packb({"error": f"Invalid address: {e}"}, use_bin_type=True)
        if not new_address:
            # An empty address cannot be bound; refuse it instead of letting
            # the subsequent bind fail.
            return msgpack.packb({"error": "Invalid address: empty"}, use_bin_type=True)
        CURRENT_ADDRESS = new_address
        return b"Address updated"

    try:
        req = msgpack.unpackb(msg, raw=False)
        argv = req.get("argv", [])
        raw_metrics = req.get("metrics", [])

        metrics = filter_metrics(raw_metrics, filter_deriv=False)
        print(f"Processing {len(metrics)} metrics")

        print(f"With Arguments: {argv}")
        argv.extend(["-e", "no"])

        disable_parallel = req.get("disable_parallel", False)

        # Rank count handed to execute()/execute_parallel(); currently fixed.
        ranks = 32
    except Exception as e:
        # Server boundary: any malformed request is reported to the client.
        return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True)

    try:
        # Wall-clock timer: process_time() only counts CPU time of this
        # process and would miss the work done by execute_parallel().
        t = time.perf_counter()
        if disable_parallel:
            data = execute(metrics, argv, ranks, False)
        else:
            data = execute_parallel(metrics, argv, ranks)
        elapsed_time = time.perf_counter() - t
        CONSOLE.info(f"[blue]Calculation time: {elapsed_time} s[/]")

        # Strip NumPy types so the result can be packed.
        native_data = sanitize(list(data))

        return msgpack.packb(native_data, use_bin_type=True)

    except Exception as e:
        # Server boundary: report the failure instead of dropping the reply.
        print(f"Error during processing: {e}")
        return msgpack.packb({"error": str(e)}, use_bin_type=True)


def main(address: str = "tcp://*:0"):
    """FTIO ZMQ Server entrypoint for Metric Proxy.

    Binds a REP socket to *address*, prints the resolved endpoint as the
    first line on stdout and then answers requests through
    ``handle_request`` until SIGTERM/SIGINT is received or no request has
    arrived for ``IDLE_TIMEOUT`` seconds. When a request changes
    ``CURRENT_ADDRESS`` the socket is closed and a new one is bound to the
    new address.

    Args:
        address: ZMQ endpoint to bind to. The default lets the system pick
            a free TCP port.
    """
    global CURRENT_ADDRESS, last_request
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(address)
    CURRENT_ADDRESS = address
    # Start the idle clock now rather than at import time of the module.
    last_request = time.time()

    signal.signal(signal.SIGTERM, shutdown_handler)
    signal.signal(signal.SIGINT, shutdown_handler)

    endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode()
    print(endpoint, flush=True)

    console = Console()
    console.print(f"[green]FTIO ZMQ Server listening on {endpoint}[/]")

    try:
        while True:
            if not socket.poll(timeout=1000):
                if time.time() - last_request > IDLE_TIMEOUT:
                    console.print("Idle timeout reached, shutting down server")
                    break
                continue

            msg = socket.recv()
            console.print(f"[cyan]Received request ({len(msg)} bytes)[/]")
            try:
                reply = handle_request(msg)
            except Exception as e:
                # A REP socket must answer every request it received;
                # report the failure instead of tearing the server down.
                console.print(f"Error during processing: {e}", markup=False)
                reply = msgpack.packb({"error": str(e)}, use_bin_type=True)
            socket.send(reply)
            # Stamp after replying so that time spent computing is not
            # counted as idle time.
            last_request = time.time()

            if reply == b"Address updated":
                console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]")
                # Bounded linger: give the reply a moment to leave, but do
                # not let an unreachable client block context.term() later.
                socket.close(linger=1000)
                socket = context.socket(zmq.REP)
                socket.bind(CURRENT_ADDRESS)
    finally:
        socket.close(linger=0)
        context.term()


def shutdown_handler(signum, frame):
    """Signal handler that stops the server by raising ``SystemExit``.

    Both parameters are part of the handler signature expected by
    ``signal.signal`` and are not used.
    """
    raise SystemExit()



if __name__ == "__main__":
main()
34 changes: 31 additions & 3 deletions ftio/prediction/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# from ftio.prediction.helper import get_dominant
# from ftio.plot.freq_plot import convert_and_plot
from ftio.freq.helper import MyConsole
from ftio.freq.prediction import Prediction
from ftio.parse.args import parse_args
from ftio.plot.freq_plot import convert_and_plot
from ftio.processing.print_output import display_prediction
Expand Down Expand Up @@ -71,7 +72,33 @@ def ftio_metric_task_save(
) -> None:
prediction = ftio_metric_task(metric, arrays, argv, ranks, show)
# freq = get_dominant(prediction) #just get a single dominant value
if prediction:
names = []
if prediction.top_freqs:
freqs = prediction.top_freqs["freq"]
amps = prediction.top_freqs["amp"]
phis = prediction.top_freqs["phi"]

for f, a, p in zip(freqs, amps, phis):
names.append(prediction.get_wave_name(f, a, p))

data.append(
{
"metric": f"{metric}",
"dominant_freq": prediction.dominant_freq,
"conf": prediction.conf,
"amp": prediction.amp,
"phi": prediction.phi,
"t_start": prediction.t_start,
"t_end": prediction.t_end,
"total_bytes": prediction.total_bytes,
"ranks": prediction.ranks,
"freq": float(prediction.freq),
"top_freq": prediction.top_freqs,
"n_samples": prediction.n_samples,
"wave_names": names,
}
)
#if prediction:
# data.append(
# {
# "metric": f"{metric}",
Expand All @@ -87,7 +114,8 @@ def ftio_metric_task_save(
# "top_freq": prediction.top_freqs,
# }
# )
prediction.metric = metric
data.append(prediction)
# caused issues with msgpack serialization
#prediction.metric = metric
#data.append(prediction)
else:
CONSOLE.info(f"\n[yellow underline]Warning: {metric} returned {prediction}[/]")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ convert_trace = "ftio.util.convert_old_trace:main"
trace_ftio = "ftio.api.trace_analysis.trace_ftio_v2:main"
trace_analysis = "ftio.api.trace_analysis.trace_analysis:main"
admire_proxy_invoke_ftio = "ftio.api.metric_proxy.proxy_invoke_ftio:main"
admire_proxy_zmq = "ftio.api.metric_proxy.proxy_zmq:main"
jit_plot = "ftio.api.gekkoFs.jit.jit_plot:main"
server_ftio = "ftio.util.server_ftio:main_cli"

Expand Down