diff --git a/nemo_deploy/deploy_ray.py b/nemo_deploy/deploy_ray.py
index 66115f5d16..d6891edd5f 100644
--- a/nemo_deploy/deploy_ray.py
+++ b/nemo_deploy/deploy_ray.py
@@ -190,6 +190,7 @@ def deploy_inframework_model(
     model_type: str = "gpt",
     model_format: str = "nemo",
     micro_batch_size: Optional[int] = None,
+    tokenizer_path: Optional[str] = None,
     **model_config_kwargs,
 ):
     """Deploy an inframework NeMo/Megatron model using Ray Serve.
@@ -218,6 +219,7 @@ def deploy_inframework_model(
         model_type (str, optional): Type of model to load. Defaults to "gpt".
         model_format (str, optional): Format of model to load. Defaults to "nemo".
         micro_batch_size (Optional[int], optional): Micro batch size for model execution. Defaults to None.
+        tokenizer_path (Optional[str], optional): Path to the tokenizer model file. If provided, overrides the checkpoint tokenizer. Defaults to None.
 
     Raises:
         SystemExit: If parallelism configuration is invalid.
@@ -260,6 +262,7 @@ def deploy_inframework_model(
         model_type=model_type,
         model_format=model_format,
         micro_batch_size=micro_batch_size,
+        tokenizer_path=tokenizer_path,
         **model_config_kwargs,
     )
diff --git a/nemo_deploy/llm/inference/inference_base.py b/nemo_deploy/llm/inference/inference_base.py
index 4aed3f086e..d287405a02 100644
--- a/nemo_deploy/llm/inference/inference_base.py
+++ b/nemo_deploy/llm/inference/inference_base.py
@@ -217,6 +217,7 @@ def setup_megatron_model_and_tokenizer_for_inference(
     expert_model_parallel_size: Optional[int] = None,
     micro_batch_size: Optional[int] = None,
     model_type: str = "gpt",
+    tokenizer_path: Optional[str] = None,
 ) -> Tuple[List[MegatronModule], MegatronTokenizer]:
     """Initialize a Megatron model and tokenizer for inference from a Megatron-LM/MBridge checkpoint.
 
@@ -236,6 +237,7 @@ def setup_megatron_model_and_tokenizer_for_inference(
             to the checkpoint value when not provided.
         micro_batch_size (Optional[int]): Micro-batch size to use during runtime initialization.
         model_type (str): Model family to build (for example, "gpt").
+        tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides the checkpoint tokenizer.
 
     Returns:
         Tuple[List[MegatronModule], MegatronTokenizer, Any]:
@@ -245,7 +247,28 @@
     """
     dist_config = DistributedInitConfig(distributed_backend="nccl")
     torch_distributed_init(dist_config)
+
     model_config, mlm_args = load_model_config(checkpoint_path)
+
+    # Use the provided tokenizer_path if given; otherwise fall back to the tokenizer recorded in the checkpoint
+    new_tokenizer_path = None
+    if tokenizer_path:
+        # User explicitly provided a tokenizer path, use it
+        new_tokenizer_path = tokenizer_path
+        if hasattr(mlm_args, "tokenizer_model"):
+            mlm_args.tokenizer_model = tokenizer_path
+    elif hasattr(mlm_args, "tokenizer_model") and mlm_args.tokenizer_model:
+        tokenizer_model_path = Path(mlm_args.tokenizer_model)
+        if not tokenizer_model_path.exists():
+            # Attempt to reconstruct the tokenizer path from checkpoint_path
+            checkpoint_dir = Path(checkpoint_path)
+            if checkpoint_dir.is_file():
+                checkpoint_dir = checkpoint_dir.parent
+            # Reuse the filename of the original tokenizer_model (if possible)
+            tokenizer_filename = tokenizer_model_path.name
+            new_tokenizer_path = checkpoint_dir / tokenizer_filename
+            mlm_args.tokenizer_model = str(new_tokenizer_path)
+
     if tensor_model_parallel_size is not None:
         model_config.tensor_model_parallel_size = tensor_model_parallel_size
     if pipeline_model_parallel_size is not None:
@@ -254,6 +277,7 @@
         model_config.context_parallel_size = context_parallel_size
     if expert_model_parallel_size is not None:
         model_config.expert_model_parallel_size = expert_model_parallel_size
+
     # Initialize Megatron for inference
     rng_config = RNGConfig(inference_rng_tracker=True)
     initialize_megatron_for_inference(model_config, dist_config, rng_config, micro_batch_size)
@@ -264,7 +288,10 @@
         megatron_args=mlm_args,
         use_cpu_init=False,
     )
-    tokenizer = load_tokenizer(checkpoint_path)
+    if new_tokenizer_path:
+        tokenizer = load_tokenizer(checkpoint_path, tokenizer_model=str(new_tokenizer_path))
+    else:
+        tokenizer = load_tokenizer(checkpoint_path)
     return model, tokenizer, mlm_args
 
 
@@ -437,6 +464,7 @@
     model_type: str = "gpt",
     model_format: str = "nemo",
     micro_batch_size: Optional[int] = None,
+    tokenizer_path: Optional[str] = None,
     **model_config_kwargs,
 ) -> Tuple[MCoreEngineWithCleanup, GPTInferenceWrapper, Union[MCoreTokenizerWrappper, MegatronTokenizer]]:
     """Set up the model, tokenizer and MCoreEngine for inference.
@@ -458,6 +486,7 @@
         model_type (str): Type of model to load (default: "gpt")
         model_format (str): Format of model to load (default: "nemo")
         micro_batch_size (Optional[int]): Micro batch size for model execution
+        tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides the checkpoint tokenizer.
     Returns:
         Tuple[MCoreEngineWithCleanup, GPTInferenceWrapper, Union[MCoreTokenizerWrappper, MegatronTokenizer]]: Tuple containing:
             - MCoreEngineWithCleanup: Engine for text generation with proper cleanup
@@ -497,6 +526,7 @@
         expert_model_parallel_size=expert_model_parallel_size,
         micro_batch_size=micro_batch_size,
         model_type=model_type,
+        tokenizer_path=tokenizer_path,
     )
     model = modelList[0]
     if mlm_args is not None:
diff --git a/nemo_deploy/llm/megatronllm_deployable.py b/nemo_deploy/llm/megatronllm_deployable.py
index 8cd59d50e1..3013111804 100755
--- a/nemo_deploy/llm/megatronllm_deployable.py
+++ b/nemo_deploy/llm/megatronllm_deployable.py
@@ -140,6 +140,7 @@ class MegatronLLMDeployableNemo2(ITritonDeployable):
         model_type (str): type of model to load. Defaults to "gpt".(Only for Megatron models)
         model_format (str): format of model to load. Defaults to "nemo".
         micro_batch_size (Optional[int]): micro batch size for model execution. Defaults to None.(Only for Megatron models)
+        tokenizer_path (Optional[str]): path to the tokenizer model file. If provided, overrides the checkpoint tokenizer. Defaults to None.
     """
 
     def __init__(
@@ -163,6 +164,7 @@ def __init__(
         model_type: str = "gpt",
         model_format: str = "nemo",
         micro_batch_size: Optional[int] = None,
+        tokenizer_path: Optional[str] = None,
         **model_config_kwargs,
     ):
         if not HAVE_TRITON:
@@ -196,6 +198,7 @@ def __init__(
            model_type=model_type,
            model_format=model_format,
            micro_batch_size=micro_batch_size,
+            tokenizer_path=tokenizer_path,
            **model_config_kwargs,
        )
        self.enable_cuda_graphs = enable_cuda_graphs
diff --git a/nemo_deploy/llm/megatronllm_deployable_ray.py b/nemo_deploy/llm/megatronllm_deployable_ray.py
index 904ef1c4e3..7acb4900bc 100644
--- a/nemo_deploy/llm/megatronllm_deployable_ray.py
+++ b/nemo_deploy/llm/megatronllm_deployable_ray.py
@@ -63,6 +63,7 @@ def __init__(
         model_type: str = "gpt",
         model_format: str = "nemo",
         micro_batch_size: Optional[int] = None,
+        tokenizer_path: Optional[str] = None,
         **model_config_kwargs,
     ):
         # Use replica-specific environment variables to avoid conflicts
@@ -100,6 +101,7 @@ def __init__(
             model_type=model_type,
             model_format=model_format,
             micro_batch_size=micro_batch_size,
+            tokenizer_path=tokenizer_path,
             **model_config_kwargs,
         )
         if rank != 0:
@@ -144,6 +146,7 @@ def __init__(
         model_type: str = "gpt",
         model_format: str = "nemo",
         micro_batch_size: Optional[int] = None,
+        tokenizer_path: Optional[str] = None,
         **model_config_kwargs,
     ):
         """Initialize the distributed Megatron LLM model deployment.
@@ -165,6 +168,7 @@ def __init__(
             model_type (str): Type of model to load.
             model_format (str): Format of model to load.
             micro_batch_size (Optional[int]): Micro batch size for model execution.
+            tokenizer_path (Optional[str]): Path to the tokenizer model file. If provided, overrides the checkpoint tokenizer.
         """
         try:
             self.model_id = model_id
@@ -214,6 +218,7 @@ def __init__(
                 model_type=model_type,
                 model_format=model_format,
                 micro_batch_size=micro_batch_size,
+                tokenizer_path=tokenizer_path,
                 **model_config_kwargs,
             )
             worker_futures.append(rank_0_worker)
@@ -244,6 +249,7 @@ def __init__(
                 model_type=model_type,
                 model_format=model_format,
                 micro_batch_size=micro_batch_size,
+                tokenizer_path=tokenizer_path,
                 **model_config_kwargs,
             )
             worker_futures.append(worker)
diff --git a/scripts/deploy/nlp/deploy_inframework_triton.py b/scripts/deploy/nlp/deploy_inframework_triton.py
index 571d855ed5..c15094fc54 100755
--- a/scripts/deploy/nlp/deploy_inframework_triton.py
+++ b/scripts/deploy/nlp/deploy_inframework_triton.py
@@ -224,6 +224,13 @@ def get_args(argv):
         default=None,
         help="Micro batch size for model execution",
     )
+    parser.add_argument(
+        "-tp",
+        "--tokenizer_path",
+        type=str,
+        default=None,
+        help="Path to the tokenizer model file (optional; overrides the checkpoint tokenizer)",
+    )
     args = parser.parse_args(argv)
     return args
 
@@ -276,6 +283,7 @@ def nemo_deploy(argv):
         model_type=args.model_type,
         model_format=args.model_format,
         micro_batch_size=args.micro_batch_size,
+        tokenizer_path=args.tokenizer_path,
         **model_config_kwargs,
     )
diff --git a/scripts/deploy/nlp/deploy_ray_inframework.py b/scripts/deploy/nlp/deploy_ray_inframework.py
index d76857070e..bba18fe28f 100644
--- a/scripts/deploy/nlp/deploy_ray_inframework.py
+++ b/scripts/deploy/nlp/deploy_ray_inframework.py
@@ -185,6 +185,12 @@ def parse_args():
         default=None,
         help="Micro batch size for model execution",
     )
+    parser.add_argument(
+        "--tokenizer_path",
+        type=str,
+        default=None,
+        help="Path to the tokenizer model file (optional; overrides the checkpoint tokenizer)",
+    )
     parser.add_argument(
         "--runtime_env",
         type=dict,
@@ -250,6 +256,7 @@ def main():
         model_type=args.model_type,
         model_format=model_format,
         micro_batch_size=args.micro_batch_size,
+        tokenizer_path=args.tokenizer_path,
         **model_config_kwargs,
     )
diff --git a/tests/functional_tests/tests_inframework/test_deploy_query_mlm_ray.py b/tests/functional_tests/tests_inframework/test_deploy_query_mlm_ray.py
index a10eb5452a..5d91456d8b 100644
--- a/tests/functional_tests/tests_inframework/test_deploy_query_mlm_ray.py
+++ b/tests/functional_tests/tests_inframework/test_deploy_query_mlm_ray.py
@@ -103,3 +103,72 @@ def test_deploy_ray(self):
             if self.deploy_proc is not None:
                 terminate_deployment_process(self.deploy_proc)
                 self.deploy_proc = None
+
+    def test_deploy_ray_with_tokenizer_path(self):
+        mlm_checkpoint_path = "/home/TestData/megatron_bridge/checkpoints/llama3_145m-mlm_saved-distckpt"
+        tokenizer_path = "/home/TestData/megatron_bridge/checkpoints/llama3_145m-mlm_saved-distckpt/tokenizer.model"
+
+        try:
+            # Run Ray deployment with tokenizer_path
+            self.deploy_proc = subprocess.Popen(
+                [
+                    "coverage",
+                    "run",
+                    "--data-file=/workspace/.coverage",
+                    "--source=/workspace/",
+                    "--parallel-mode",
+                    "scripts/deploy/nlp/deploy_ray_inframework.py",
+                    "--megatron_checkpoint",
+                    mlm_checkpoint_path,
+                    "--model_id",
+                    "llama",
+                    "--num_gpus",
+                    str(1),
+                    "--host",
+                    "0.0.0.0",
+                    "--port",
+                    str(8000),
+                    "--cuda_visible_devices",
+                    "0",
+                    "--tokenizer_path",
+                    tokenizer_path,
+                ]
+            )
+            logging.info("Deployment with tokenizer_path started. Waiting for it to be ready...")
+
+            # Wait for deployment to be ready
+            if not wait_for_deployment_ready(host="0.0.0.0", port=8000, max_wait_time=180):
+                assert False, "Deployment failed to become ready within timeout"
+
+            time.sleep(120)
+
+            output = query_ray_deployment(
+                host="0.0.0.0",
+                port=8000,
+                model_id="llama",
+                prompt="What is the color of a banana? ",
+                max_tokens=20,
+            )
+
+            print(output)
+
+            # Check if deployment was successful
+            # assert output != "", "First prediction is empty"
+
+            # Send a second request using the chat endpoint
+            output_chat = query_ray_deployment(
+                host="0.0.0.0",
+                port=8000,
+                model_id="llama",
+                prompt="Hello, how are you? ",
+                max_tokens=20,
+                use_chat=True,
+            )
+            print(output_chat)
+            # Check if deployment was successful
+            # assert output_chat != "", "Second prediction (chat) is empty"
+        finally:
+            # Ensure the deployment is terminated as soon as queries complete or on failure
+            if self.deploy_proc is not None:
+                terminate_deployment_process(self.deploy_proc)
+                self.deploy_proc = None
diff --git a/tests/unit_tests/deploy/test_inference_base.py b/tests/unit_tests/deploy/test_inference_base.py
index 317b9338a3..8441747796 100644
--- a/tests/unit_tests/deploy/test_inference_base.py
+++ b/tests/unit_tests/deploy/test_inference_base.py
@@ -34,6 +34,7 @@
     initialize_megatron_for_inference,
     load_nemo_checkpoint_to_tron_model,
     peel,
+    setup_megatron_model_and_tokenizer_for_inference,
     setup_model_and_tokenizer_for_inference,
 )
 from nemo_deploy.llm.inference.tron_utils import DistributedInitConfig, RNGConfig
@@ -461,6 +462,127 @@ def test_create_mcore_engine_unavailable_nemo_raises(self):
         with self.assertRaises(UnavailableError):
             create_mcore_engine(path=self.mock_path)
+
+    @patch("nemo_deploy.llm.inference.inference_base.load_tokenizer")
+    @patch("nemo_deploy.llm.inference.inference_base.build_and_load_model")
+    @patch("nemo_deploy.llm.inference.inference_base.initialize_megatron_for_inference")
+    @patch("nemo_deploy.llm.inference.inference_base.load_model_config")
+    @patch("nemo_deploy.llm.inference.inference_base.torch_distributed_init")
+    def test_setup_megatron_tokenizer_path_provided(
+        self,
+        mock_torch_dist_init,
+        mock_load_config,
+        mock_init_megatron,
+        mock_build_model,
+        mock_load_tokenizer,
+    ):
+        """Test that when tokenizer_path is provided, it overrides checkpoint tokenizer."""
+        # Setup mocks
+        mock_mlm_args = MagicMock()
+        mock_mlm_args.tokenizer_model = "/checkpoint/tokenizer.model"
+        mock_model_config = self.model_config
+        mock_load_config.return_value = (mock_model_config, mock_mlm_args)
+        mock_build_model.return_value = self.mock_model_list
+        mock_load_tokenizer.return_value = self.mock_tokenizer
+
+        # Custom tokenizer path
+        custom_tokenizer_path = "/custom/path/tokenizer.model"
+
+        # Call the function with tokenizer_path
+        result = setup_megatron_model_and_tokenizer_for_inference(
+            checkpoint_path=self.mock_path,
+            tokenizer_path=custom_tokenizer_path,
+        )
+
+        # Verify that mlm_args.tokenizer_model was updated to custom path
+        self.assertEqual(mock_mlm_args.tokenizer_model, custom_tokenizer_path)
+
+        # Verify load_tokenizer was called with the custom tokenizer path
+        mock_load_tokenizer.assert_called_once_with(self.mock_path, tokenizer_model=custom_tokenizer_path)
+
+        # Verify result contains model list and tokenizer
+        self.assertEqual(result[0], self.mock_model_list)
+        self.assertEqual(result[1], self.mock_tokenizer)
+        self.assertEqual(result[2], mock_mlm_args)
+
+    @patch("nemo_deploy.llm.inference.inference_base.load_tokenizer")
+    @patch("nemo_deploy.llm.inference.inference_base.build_and_load_model")
+    @patch("nemo_deploy.llm.inference.inference_base.initialize_megatron_for_inference")
+    @patch("nemo_deploy.llm.inference.inference_base.load_model_config")
+    @patch("nemo_deploy.llm.inference.inference_base.torch_distributed_init")
+    @patch("pathlib.Path.exists")
+    def test_setup_megatron_tokenizer_path_none_checkpoint_exists(
+        self,
+        mock_path_exists,
+        mock_torch_dist_init,
+        mock_load_config,
+        mock_init_megatron,
+        mock_build_model,
+        mock_load_tokenizer,
+    ):
+        """Test that when tokenizer_path is None and the checkpoint tokenizer exists, the checkpoint tokenizer is used."""
+        # Setup mocks
+        mock_mlm_args = MagicMock()
+        mock_mlm_args.tokenizer_model = "/checkpoint/tokenizer.model"
+        mock_model_config = self.model_config
+        mock_load_config.return_value = (mock_model_config, mock_mlm_args)
+        mock_build_model.return_value = self.mock_model_list
+        mock_load_tokenizer.return_value = self.mock_tokenizer
+        mock_path_exists.return_value = True  # Tokenizer exists
+
+        # Call the function without tokenizer_path
+        result = setup_megatron_model_and_tokenizer_for_inference(
+            checkpoint_path=self.mock_path,
+            tokenizer_path=None,
+        )
+
+        # Verify that mlm_args.tokenizer_model was NOT changed
+        self.assertEqual(mock_mlm_args.tokenizer_model, "/checkpoint/tokenizer.model")
+
+        # Verify load_tokenizer was called without a custom tokenizer path
+        mock_load_tokenizer.assert_called_once_with(self.mock_path)
+
+        # Verify result contains model list and tokenizer
+        self.assertEqual(result[0], self.mock_model_list)
+        self.assertEqual(result[1], self.mock_tokenizer)
+        self.assertEqual(result[2], mock_mlm_args)
+
+    @patch("nemo_deploy.llm.inference.inference_base.load_tokenizer")
+    @patch("nemo_deploy.llm.inference.inference_base.build_and_load_model")
+    @patch("nemo_deploy.llm.inference.inference_base.initialize_megatron_for_inference")
+    @patch("nemo_deploy.llm.inference.inference_base.load_model_config")
+    @patch("nemo_deploy.llm.inference.inference_base.torch_distributed_init")
+    def test_setup_megatron_tokenizer_path_no_tokenizer_in_mlm_args(
+        self,
+        mock_torch_dist_init,
+        mock_load_config,
+        mock_init_megatron,
+        mock_build_model,
+        mock_load_tokenizer,
+    ):
+        """Test that a provided tokenizer_path is still used when mlm_args has no tokenizer_model attribute."""
+        # Setup mocks - mlm_args without tokenizer_model attribute
+        mock_mlm_args = MagicMock(spec=[])  # Empty spec means no attributes
+        mock_model_config = self.model_config
+        mock_load_config.return_value = (mock_model_config, mock_mlm_args)
+        mock_build_model.return_value = self.mock_model_list
+        mock_load_tokenizer.return_value = self.mock_tokenizer
+
+        # Custom tokenizer path
+        custom_tokenizer_path = "/custom/path/tokenizer.model"
+
+        # Call the function with tokenizer_path
+        result = setup_megatron_model_and_tokenizer_for_inference(
+            checkpoint_path=self.mock_path,
+            tokenizer_path=custom_tokenizer_path,
+        )
+
+        # Verify load_tokenizer was called with the custom tokenizer path
+        mock_load_tokenizer.assert_called_once_with(self.mock_path, tokenizer_model=custom_tokenizer_path)
+
+        # Verify result contains model list and tokenizer
+        self.assertEqual(result[0], self.mock_model_list)
+        self.assertEqual(result[1], self.mock_tokenizer)
+
 
 if __name__ == "__main__":
     unittest.main()
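Example usage (a sketch, not part of the patch): the new --tokenizer_path flag is optional and, when given, overrides the tokenizer packaged with the checkpoint; otherwise the path recorded in the checkpoint is used, with a fallback that looks for the tokenizer file next to the checkpoint if the recorded path no longer exists. The flags below mirror the functional test above; the checkpoint and tokenizer paths are placeholders.

    python scripts/deploy/nlp/deploy_ray_inframework.py \
        --megatron_checkpoint /path/to/mlm_checkpoint \
        --model_id llama \
        --num_gpus 1 \
        --host 0.0.0.0 \
        --port 8000 \
        --tokenizer_path /path/to/tokenizer.model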