diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 904ef1c4e3..4468b1cee5 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -17,6 +17,7 @@
 import logging
 import os
 import random
+import socket
 import time
 
 from typing import Any, Dict, Optional
@@ -66,6 +67,12 @@ def __init__(
         **model_config_kwargs,
     ):
         # Use replica-specific environment variables to avoid conflicts
+        # master_addr = "127.0.0.1"
+        # with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        #     s.bind(("", 0))  # Bind to port 0 lets OS pick a free port
+        #     new_port = s.getsockname()[1]
+        # self.master_port = str(new_port)
+
         os.environ["MASTER_PORT"] = master_port
         # All ranks must use the SAME MASTER_ADDR (rank 0 node IP)
         os.environ["MASTER_ADDR"] = master_addr if master_addr else ray._private.services.get_node_ip_address()
@@ -82,6 +89,11 @@ def __init__(
         LOGGER.info(f"Replica {replica_id} - MASTER_PORT: {os.environ['MASTER_PORT']}")
         LOGGER.info(f"Replica {replica_id} - MASTER_ADDR: {os.environ['MASTER_ADDR']}")
 
+        # if rank != 0:
+        #     sleep_time = 5 + rank  # Rank 1 waits 6s, Rank 2 waits 7s, etc.
+        #     LOGGER.info(f"Worker {rank}: Sleeping {sleep_time}s to avoid JIT lock contention...")
+        #     time.sleep(sleep_time)
+
         try:
             self.model = MegatronLLMDeployableNemo2(
                 nemo_checkpoint_filepath=nemo_checkpoint_filepath,
@@ -176,8 +188,12 @@ def __init__(
         # Use replica-specific port to avoid conflicts between replicas
         base_port = random.randint(29500, 29999) + (replica_id % 100) * 100
         deploy_node_ip = ray._private.services.get_node_ip_address()
+        LOGGER.error(f"Replica {ray._private.services.get_node_ip_address()}")
+        if not deploy_node_ip:
+            deploy_node_ip = socket.gethostbyname(socket.gethostname())
+
         master_port = str(find_available_port(base_port, deploy_node_ip))
-        LOGGER.info(f"Replica {replica_id} - Pre-allocated master port: {master_port}")
+        LOGGER.error(f"Replica {replica_id} - Pre-allocated master port: {master_port}")
 
         # Create workers with proper synchronization for distributed initialization
         # Rank 0 must be created first as it acts as the master in PyTorch distributed
@@ -192,9 +208,8 @@ def __init__(
                 deployment_node_id = node.get("NodeID")
                 break
 
-        rank_0_worker = ModelWorker.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=deployment_node_id, soft=False)
-        ).remote(
+        # Common arguments for rank 0 worker
+        rank_0_kwargs = dict(
             nemo_checkpoint_filepath=nemo_checkpoint_filepath,
             rank=0,
             world_size=num_gpus,
@@ -216,6 +231,14 @@ def __init__(
             micro_batch_size=micro_batch_size,
             **model_config_kwargs,
         )
+
+        # Use node affinity if we found a matching node, otherwise use default scheduling
+        if deployment_node_id is not None:
+            rank_0_worker = ModelWorker.options(
+                scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=deployment_node_id, soft=True)
+            ).remote(**rank_0_kwargs)
+        else:
+            rank_0_worker = ModelWorker.remote(**rank_0_kwargs)
         worker_futures.append(rank_0_worker)
 
         # Wait for rank 0 to start before creating other workers