3 changes: 3 additions & 0 deletions nemo_deploy/deploy_ray.py
@@ -190,6 +190,7 @@ def deploy_inframework_model(
model_type: str = "gpt",
model_format: str = "nemo",
micro_batch_size: Optional[int] = None,
tokenizer_path: Optional[str] = None,
**model_config_kwargs,
):
"""Deploy an inframework NeMo/Megatron model using Ray Serve.
@@ -218,6 +219,7 @@ def deploy_inframework_model(
model_type (str, optional): Type of model to load. Defaults to "gpt".
model_format (str, optional): Format of model to load. Defaults to "nemo".
micro_batch_size (Optional[int], optional): Micro batch size for model execution. Defaults to None.
tokenizer_path (Optional[str], optional): Path to the tokenizer model file. If provided, overrides checkpoint tokenizer. Defaults to None.

Raises:
SystemExit: If parallelism configuration is invalid.
@@ -260,6 +262,7 @@ def deploy_inframework_model(
model_type=model_type,
model_format=model_format,
micro_batch_size=micro_batch_size,
tokenizer_path=tokenizer_path,
**model_config_kwargs,
)

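For context, a minimal sketch of how the new keyword is meant to be passed from Python. Only model_type, model_format, micro_batch_size, and tokenizer_path are taken from this diff; the remaining required deployment arguments are not part of the hunk and are deliberately omitted, and the paths are placeholders:

    from nemo_deploy.deploy_ray import deploy_inframework_model

    # Illustrative call only: the required arguments (checkpoint path, serving
    # options, ...) are not shown in this hunk and are omitted from the sketch.
    deploy_inframework_model(
        model_type="gpt",
        model_format="nemo",
        micro_batch_size=1,
        tokenizer_path="/path/to/tokenizer.model",  # overrides the checkpoint tokenizer
    )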
32 changes: 31 additions & 1 deletion nemo_deploy/llm/inference/inference_base.py
@@ -217,6 +217,7 @@ def setup_megatron_model_and_tokenizer_for_inference(
expert_model_parallel_size: Optional[int] = None,
micro_batch_size: Optional[int] = None,
model_type: str = "gpt",
tokenizer_path: Optional[str] = None,
) -> Tuple[List[MegatronModule], MegatronTokenizer]:
"""Initialize a Megatron model and tokenizer for inference from a Megatron-LM/MBridge checkpoint.

@@ -236,6 +237,7 @@ def setup_megatron_model_and_tokenizer_for_inference(
to the checkpoint value when not provided.
micro_batch_size (Optional[int]): Micro-batch size to use during runtime initialization.
model_type (str): Model family to build (for example, "gpt").
tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides checkpoint tokenizer.

Returns:
Tuple[List[MegatronModule], MegatronTokenizer, Any]:
@@ -245,7 +247,28 @@
"""
dist_config = DistributedInitConfig(distributed_backend="nccl")
torch_distributed_init(dist_config)

model_config, mlm_args = load_model_config(checkpoint_path)

# Use the provided tokenizer_path if available, otherwise use checkpoint tokenizer
new_tokenizer_path = None
if tokenizer_path:
# User explicitly provided a tokenizer path, use it
new_tokenizer_path = tokenizer_path
if hasattr(mlm_args, "tokenizer_model"):
mlm_args.tokenizer_model = tokenizer_path
elif hasattr(mlm_args, "tokenizer_model") and mlm_args.tokenizer_model:
tokenizer_model_path = Path(mlm_args.tokenizer_model)
if not tokenizer_model_path.exists():
# Attempt to reconstruct tokenizer path from checkpoint_path
checkpoint_dir = Path(checkpoint_path)
if checkpoint_dir.is_file():
checkpoint_dir = checkpoint_dir.parent
# Use the filename of the original tokenizer_model (if possible)
tokenizer_filename = tokenizer_model_path.name
new_tokenizer_path = checkpoint_dir / tokenizer_filename
mlm_args.tokenizer_model = str(new_tokenizer_path)

if tensor_model_parallel_size is not None:
model_config.tensor_model_parallel_size = tensor_model_parallel_size
if pipeline_model_parallel_size is not None:
@@ -254,6 +277,7 @@ def setup_megatron_model_and_tokenizer_for_inference(
model_config.context_parallel_size = context_parallel_size
if expert_model_parallel_size is not None:
model_config.expert_model_parallel_size = expert_model_parallel_size

# Initialize Megatron for inference
rng_config = RNGConfig(inference_rng_tracker=True)
initialize_megatron_for_inference(model_config, dist_config, rng_config, micro_batch_size)
@@ -264,7 +288,10 @@ def setup_megatron_model_and_tokenizer_for_inference(
megatron_args=mlm_args,
use_cpu_init=False,
)
tokenizer = load_tokenizer(checkpoint_path)
if new_tokenizer_path:
tokenizer = load_tokenizer(checkpoint_path, tokenizer_model=str(new_tokenizer_path))
else:
tokenizer = load_tokenizer(checkpoint_path)
return model, tokenizer, mlm_args


@@ -437,6 +464,7 @@ def create_mcore_engine(
model_type: str = "gpt",
model_format: str = "nemo",
micro_batch_size: Optional[int] = None,
tokenizer_path: Optional[str] = None,
**model_config_kwargs,
) -> Tuple[MCoreEngineWithCleanup, GPTInferenceWrapper, Union[MCoreTokenizerWrappper, MegatronTokenizer]]:
"""Set up the model, tokenizer and MCoreEngine for inference.
@@ -458,6 +486,7 @@ def create_mcore_engine(
model_type (str): Type of model to load (default: "gpt")
model_format (str): Format of model to load (default: "nemo")
micro_batch_size (Optional[int]): Micro batch size for model execution
tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides checkpoint tokenizer
Returns:
Tuple[MCoreEngineWithCleanup, GPTInferenceWrapper, Union[MCoreTokenizerWrappper, MegatronTokenizer]]: Tuple containing:
- MCoreEngineWithCleanup: Engine for text generation with proper cleanup
@@ -497,6 +526,7 @@ def create_mcore_engine(
expert_model_parallel_size=expert_model_parallel_size,
micro_batch_size=micro_batch_size,
model_type=model_type,
tokenizer_path=tokenizer_path,
)
model = modelList[0]
if mlm_args is not None:
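The resolution order added above can be summarized as: an explicit tokenizer_path always wins; otherwise the tokenizer_model recorded in the checkpoint args is kept if it still exists on disk; if it does not, a file with the same name is looked up next to the checkpoint. A minimal standalone sketch of that precedence (the helper name and signature are illustrative, not part of the change):

    from pathlib import Path
    from typing import Optional

    def resolve_tokenizer_model(
        checkpoint_path: str,
        tokenizer_path: Optional[str],
        recorded_tokenizer_model: Optional[str],
    ) -> Optional[str]:
        """Return an override path for load_tokenizer, or None to use the checkpoint as-is."""
        # 1. An explicitly provided tokenizer always wins.
        if tokenizer_path:
            return tokenizer_path
        # 2. Keep the tokenizer recorded in the checkpoint args if it still exists on disk.
        if recorded_tokenizer_model:
            recorded = Path(recorded_tokenizer_model)
            if recorded.exists():
                return None
            # 3. Otherwise look for a file with the same name next to the checkpoint.
            checkpoint_dir = Path(checkpoint_path)
            if checkpoint_dir.is_file():
                checkpoint_dir = checkpoint_dir.parent
            return str(checkpoint_dir / recorded.name)
        return None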
3 changes: 3 additions & 0 deletions nemo_deploy/llm/megatronllm_deployable.py
@@ -140,6 +140,7 @@ class MegatronLLMDeployableNemo2(ITritonDeployable):
model_type (str): type of model to load. Defaults to "gpt".(Only for Megatron models)
model_format (str): format of model to load. Defaults to "nemo".
micro_batch_size (Optional[int]): micro batch size for model execution. Defaults to None.(Only for Megatron models)
tokenizer_path (Optional[str]): path to the tokenizer model file. If provided, overrides checkpoint tokenizer. Defaults to None.
"""

def __init__(
@@ -163,6 +164,7 @@ def __init__(
model_type: str = "gpt",
model_format: str = "nemo",
micro_batch_size: Optional[int] = None,
tokenizer_path: Optional[str] = None,
**model_config_kwargs,
):
if not HAVE_TRITON:
@@ -196,6 +198,7 @@ def __init__(
model_type=model_type,
model_format=model_format,
micro_batch_size=micro_batch_size,
tokenizer_path=tokenizer_path,
**model_config_kwargs,
)
self.enable_cuda_graphs = enable_cuda_graphs
6 changes: 6 additions & 0 deletions nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -63,6 +63,7 @@ def __init__(
model_type: str = "gpt",
model_format: str = "nemo",
micro_batch_size: Optional[int] = None,
tokenizer_path: Optional[str] = None,
**model_config_kwargs,
):
# Use replica-specific environment variables to avoid conflicts
@@ -100,6 +101,7 @@ def __init__(
model_type=model_type,
model_format=model_format,
micro_batch_size=micro_batch_size,
tokenizer_path=tokenizer_path,
**model_config_kwargs,
)
if rank != 0:
@@ -144,6 +146,7 @@ def __init__(
model_type: str = "gpt",
model_format: str = "nemo",
micro_batch_size: Optional[int] = None,
tokenizer_path: Optional[str] = None,
**model_config_kwargs,
):
"""Initialize the distributed Megatron LLM model deployment.
@@ -165,6 +168,7 @@ def __init__(
model_type (str): Type of model to load.
model_format (str): Format of model to load.
micro_batch_size (Optional[int]): Micro batch size for model execution.
tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides checkpoint tokenizer.
"""
try:
self.model_id = model_id
@@ -214,6 +218,7 @@ def __init__(
model_type=model_type,
model_format=model_format,
micro_batch_size=micro_batch_size,
tokenizer_path=tokenizer_path,
**model_config_kwargs,
)
worker_futures.append(rank_0_worker)
@@ -244,6 +249,7 @@ def __init__(
model_type=model_type,
model_format=model_format,
micro_batch_size=micro_batch_size,
tokenizer_path=tokenizer_path,
**model_config_kwargs,
)
worker_futures.append(worker)
8 changes: 8 additions & 0 deletions scripts/deploy/nlp/deploy_inframework_triton.py
@@ -224,6 +224,13 @@ def get_args(argv):
default=None,
help="Micro batch size for model execution",
)
parser.add_argument(
"-tp",
"--tokenizer_path",
type=str,
default=None,
help="Path to the tokenizer model file (optional, overrides checkpoint tokenizer)",
)
args = parser.parse_args(argv)
return args

@@ -276,6 +283,7 @@ def nemo_deploy(argv):
model_type=args.model_type,
model_format=args.model_format,
micro_batch_size=args.micro_batch_size,
tokenizer_path=args.tokenizer_path,
**model_config_kwargs,
)

7 changes: 7 additions & 0 deletions scripts/deploy/nlp/deploy_ray_inframework.py
@@ -185,6 +185,12 @@ def parse_args():
default=None,
help="Micro batch size for model execution",
)
parser.add_argument(
"--tokenizer_path",
type=str,
default=None,
help="Path to the tokenizer model file (optional, overrides checkpoint tokenizer)",
)
parser.add_argument(
"--runtime_env",
type=dict,
@@ -250,6 +256,7 @@ def main():
model_type=args.model_type,
model_format=model_format,
micro_batch_size=args.micro_batch_size,
tokenizer_path=args.tokenizer_path,
**model_config_kwargs,
)

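On the command line, the new flag is simply appended to the existing Ray deployment invocation. A sketch mirroring the integration test below, with placeholder checkpoint and tokenizer paths:

    python scripts/deploy/nlp/deploy_ray_inframework.py \
        --megatron_checkpoint /path/to/checkpoint \
        --model_id llama \
        --num_gpus 1 \
        --host 0.0.0.0 \
        --port 8000 \
        --tokenizer_path /path/to/tokenizer.model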
@@ -103,3 +103,72 @@ def test_deploy_ray(self):
if self.deploy_proc is not None:
terminate_deployment_process(self.deploy_proc)
self.deploy_proc = None

def test_deploy_ray_with_tokenizer_path(self):
mlm_checkpoint_path = "/home/TestData/megatron_bridge/checkpoints/llama3_145m-mlm_saved-distckpt"
tokenizer_path = "/home/TestData/megatron_bridge/checkpoints/llama3_145m-mlm_saved-distckpt/tokenizer.model"

try:
# Run Ray deployment with tokenizer_path
self.deploy_proc = subprocess.Popen(
[
"coverage",
"run",
"--data-file=/workspace/.coverage",
"--source=/workspace/",
"--parallel-mode",
"scripts/deploy/nlp/deploy_ray_inframework.py",
"--megatron_checkpoint",
mlm_checkpoint_path,
"--model_id",
"llama",
"--num_gpus",
str(1),
"--host",
"0.0.0.0",
"--port",
str(8000),
"--cuda_visible_devices",
"0",
"--tokenizer_path",
tokenizer_path,
]
)
logging.info("Deployment with tokenizer_path started. Waiting for it to be ready...")

# Wait for deployment to be ready
if not wait_for_deployment_ready(host="0.0.0.0", port=8000, max_wait_time=180):
assert False, "Deployment failed to become ready within timeout"

time.sleep(120)

output = query_ray_deployment(
host="0.0.0.0",
port=8000,
model_id="llama",
prompt="What is the color of a banana? ",
max_tokens=20,
)

print(output)

# Check if deployment was successful
# assert output != "", "First prediction is empty"

# Send a second request using the chat endpoint
output_chat = query_ray_deployment(
host="0.0.0.0",
port=8000,
model_id="llama",
prompt="Hello, how are you? ",
max_tokens=20,
use_chat=True,
)
print(output_chat)
# Check if deployment was successful
# assert output_chat != "", "Second prediction (chat) is empty"
finally:
# Ensure the deployment is terminated as soon as queries complete or on failure
if self.deploy_proc is not None:
terminate_deployment_process(self.deploy_proc)
self.deploy_proc = None