From bfc9c0c4af5df5759a70f113942dbce36aa7556a Mon Sep 17 00:00:00 2001
From: Pranav Prashant Thombre
Date: Wed, 14 Jan 2026 10:24:18 -0800
Subject: [PATCH 1/5] Temp fix for k8s issue

Signed-off-by: Pranav Prashant Thombre
---
 nemo_deploy/llm/megatronllm_deployable_ray.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 904ef1c4e3..0ec5900db4 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -192,9 +192,8 @@ def __init__(
                 deployment_node_id = node.get("NodeID")
                 break
 
-        rank_0_worker = ModelWorker.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=deployment_node_id, soft=False)
-        ).remote(
+        # Common arguments for rank 0 worker
+        rank_0_kwargs = dict(
             nemo_checkpoint_filepath=nemo_checkpoint_filepath,
             rank=0,
             world_size=num_gpus,
@@ -216,6 +215,14 @@ def __init__(
             micro_batch_size=micro_batch_size,
             **model_config_kwargs,
         )
+
+        # Use node affinity if we found a matching node, otherwise use default scheduling
+        if deployment_node_id is not None:
+            rank_0_worker = ModelWorker.options(
+                scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=deployment_node_id, soft=True)
+            ).remote(**rank_0_kwargs)
+        else:
+            rank_0_worker = ModelWorker.remote(**rank_0_kwargs)
         worker_futures.append(rank_0_worker)
 
         # Wait for rank 0 to start before creating other workers

From e3b0018f13b4a98538318e4044449846f86da670 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?oliver=20k=C3=B6nig?=
Date: Mon, 19 Jan 2026 17:33:33 +0000
Subject: [PATCH 2/5] fix: Pin master addr
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: oliver könig
---
 nemo_deploy/llm/megatronllm_deployable_ray.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 0ec5900db4..92f125217e 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -17,6 +17,7 @@
 import logging
 import os
 import random
+import socket
 import time
 from typing import Any, Dict, Optional
 
@@ -66,6 +67,8 @@ def __init__(
         **model_config_kwargs,
     ):
         # Use replica-specific environment variables to avoid conflicts
+        master_addr = "127.0.0.1"
+
         os.environ["MASTER_PORT"] = master_port
         # All ranks must use the SAME MASTER_ADDR (rank 0 node IP)
         os.environ["MASTER_ADDR"] = master_addr if master_addr else ray._private.services.get_node_ip_address()

From 51af26aa74a312a87a0dd0275a43837a71e090d1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?oliver=20k=C3=B6nig?=
Date: Mon, 19 Jan 2026 17:42:54 +0000
Subject: [PATCH 3/5] add delay between workers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: oliver könig
---
 nemo_deploy/llm/megatronllm_deployable_ray.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 92f125217e..5afb9c8ce2 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -85,6 +85,11 @@ def __init__(
         LOGGER.info(f"Replica {replica_id} - MASTER_PORT: {os.environ['MASTER_PORT']}")
         LOGGER.info(f"Replica {replica_id} - MASTER_ADDR: {os.environ['MASTER_ADDR']}")
 
+        if rank != 0:
+            sleep_time = 5 + rank  # Rank 1 waits 6s, Rank 2 waits 7s, etc.
+            LOGGER.info(f"Worker {rank}: Sleeping {sleep_time}s to avoid JIT lock contention...")
+            time.sleep(sleep_time)
+
         try:
             self.model = MegatronLLMDeployableNemo2(
                 nemo_checkpoint_filepath=nemo_checkpoint_filepath,

From 4b86332995a7bcb5d1924ff22f4e7441c4f7e77c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?oliver=20k=C3=B6nig?=
Date: Mon, 19 Jan 2026 17:49:41 +0000
Subject: [PATCH 4/5] socket
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: oliver könig
---
 nemo_deploy/llm/megatronllm_deployable_ray.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 5afb9c8ce2..2de2dae0a7 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -68,6 +68,10 @@ def __init__(
     ):
         # Use replica-specific environment variables to avoid conflicts
         master_addr = "127.0.0.1"
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(("", 0))  # Binding to port 0 lets the OS pick a free port
+            new_port = s.getsockname()[1]
+            self.master_port = str(new_port)
 
         os.environ["MASTER_PORT"] = master_port
         # All ranks must use the SAME MASTER_ADDR (rank 0 node IP)

From 7ed9663f7d70b70ff97825713a0e5b865c26e4d8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?oliver=20k=C3=B6nig?=
Date: Thu, 22 Jan 2026 18:49:16 +0000
Subject: [PATCH 5/5] cleanup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: oliver könig
---
 nemo_deploy/llm/megatronllm_deployable_ray.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 2de2dae0a7..4468b1cee5 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -67,11 +67,11 @@ def __init__(
         **model_config_kwargs,
     ):
         # Use replica-specific environment variables to avoid conflicts
-        master_addr = "127.0.0.1"
-        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            s.bind(("", 0))  # Binding to port 0 lets the OS pick a free port
-            new_port = s.getsockname()[1]
-            self.master_port = str(new_port)
+        # master_addr = "127.0.0.1"
+        # with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        #     s.bind(("", 0))  # Binding to port 0 lets the OS pick a free port
+        #     new_port = s.getsockname()[1]
+        #     self.master_port = str(new_port)
 
         os.environ["MASTER_PORT"] = master_port
         # All ranks must use the SAME MASTER_ADDR (rank 0 node IP)
@@ -89,10 +89,10 @@ def __init__(
         LOGGER.info(f"Replica {replica_id} - MASTER_PORT: {os.environ['MASTER_PORT']}")
         LOGGER.info(f"Replica {replica_id} - MASTER_ADDR: {os.environ['MASTER_ADDR']}")
 
-        if rank != 0:
-            sleep_time = 5 + rank  # Rank 1 waits 6s, Rank 2 waits 7s, etc.
-            LOGGER.info(f"Worker {rank}: Sleeping {sleep_time}s to avoid JIT lock contention...")
-            time.sleep(sleep_time)
+        # if rank != 0:
+        #     sleep_time = 5 + rank  # Rank 1 waits 6s, Rank 2 waits 7s, etc.
+        #     LOGGER.info(f"Worker {rank}: Sleeping {sleep_time}s to avoid JIT lock contention...")
+        #     time.sleep(sleep_time)
 
         try:
             self.model = MegatronLLMDeployableNemo2(
@@ -188,8 +188,12 @@ def __init__(
         # Use replica-specific port to avoid conflicts between replicas
         base_port = random.randint(29500, 29999) + (replica_id % 100) * 100
         deploy_node_ip = ray._private.services.get_node_ip_address()
+        LOGGER.error(f"Replica {ray._private.services.get_node_ip_address()}")
+        if not deploy_node_ip:
+            deploy_node_ip = socket.gethostbyname(socket.gethostname())
+
         master_port = str(find_available_port(base_port, deploy_node_ip))
-        LOGGER.info(f"Replica {replica_id} - Pre-allocated master port: {master_port}")
+        LOGGER.error(f"Replica {replica_id} - Pre-allocated master port: {master_port}")
 
         # Create workers with proper synchronization for distributed initialization
         # Rank 0 must be created first as it acts as the master in PyTorch distributed
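
Notes on the patterns in this series, with small standalone sketches. First, the conditional node-affinity scheduling from patch 1: rank 0 is pinned to the deployment node only when a node ID was actually resolved, and the switch from soft=False to soft=True lets Ray fall back to another node instead of failing if that node cannot host the actor. A minimal sketch of the same shape, assuming Ray's public scheduling API; WorkerSketch is a hypothetical stand-in for the real ModelWorker:

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote
class WorkerSketch:
    """Stand-in for ModelWorker; only here to make the sketch runnable."""

    def ping(self) -> str:
        return "ok"


def spawn_rank0(deployment_node_id=None):
    # Pin rank 0 to the resolved node; soft=True means "prefer this node,
    # but schedule elsewhere instead of hanging if it is infeasible".
    if deployment_node_id is not None:
        return WorkerSketch.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=deployment_node_id, soft=True
            )
        ).remote()
    # No node resolved: fall back to Ray's default scheduling.
    return WorkerSketch.remote()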
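
Second, the MASTER_ADDR/MASTER_PORT handling that patches 2, 4, and 5 iterate on. The invariant the comments state is that every rank in a replica must export the same address/port pair, pointing at rank 0's endpoint, because torch.distributed's env:// rendezvous reads these variables. Pinning 127.0.0.1 can only hold while all ranks share one node, and the port picked in patch 4 is stored on self.master_port but never exported, which is presumably why patch 5 comments both out. A sketch of the contract (export_rendezvous_env is an illustrative helper, not code from the diff):

import os


def export_rendezvous_env(rank: int, world_size: int, master_addr: str, master_port: str) -> None:
    """Hypothetical helper showing the env:// rendezvous contract."""
    # Every rank must see identical MASTER_ADDR/MASTER_PORT (rank 0's
    # endpoint); RANK and WORLD_SIZE differ per process. With these set,
    # torch.distributed.init_process_group("nccl", init_method="env://")
    # can rendezvous without an explicit store.
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)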
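
Finally, the port pre-allocation that patch 5 settles on: the deployer picks one free port up front and hands the same value to every worker. The diff calls a find_available_port(base_port, host) helper whose body is not shown; below is a plausible sketch of it, alongside the bind-to-port-0 variant from the commented-out code. Either way the port is only guaranteed free at probe time, so a small race window remains between probing and the actual rendezvous bind.

import socket


def pick_free_port() -> int:
    # Binding to port 0 lets the OS pick a free port (patch 4's approach).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]


def find_available_port_sketch(base_port: int, host: str, max_tries: int = 100) -> int:
    # Guessed shape of find_available_port: scan upward from base_port
    # until a bind succeeds, so all workers can be handed the same port.
    for port in range(base_port, base_port + max_tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind((host, port))
                return port
            except OSError:
                continue
    raise RuntimeError(f"no free port in [{base_port}, {base_port + max_tries})")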