README.md (17 changes: 8 additions & 9 deletions)
@@ -409,16 +409,16 @@ nm.deploy()
nm.serve()
```

### Deploy NeMo Multimodal Models Directly with Triton Inference Server
### Deploy Megatron Multimodal Models Directly with Triton Inference Server

You can also deploy NeMo multimodal models directly using Triton Inference Server without exporting to TensorRT-LLM. This provides a simpler deployment path while still leveraging Triton's scalable serving capabilities.
You can also deploy Megatron multimodal models directly using Triton Inference Server without exporting to TensorRT-LLM. This provides a simpler deployment path while still leveraging Triton's scalable serving capabilities.

```python
from nemo_deploy import DeployPyTriton
from nemo_deploy.multimodal import NeMoMultimodalDeployable
from nemo_deploy.multimodal import MegatronMultimodalDeployable

model = NeMoMultimodalDeployable(
nemo_checkpoint_filepath="/path/to/model.nemo",
model = MegatronMultimodalDeployable(
megatron_checkpoint_filepath="/path/to/model.nemo",
tensor_parallel_size=1,
pipeline_parallel_size=1,
)
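The hunk is cut off before the serving step. Based on the `nm.deploy()` / `nm.serve()` calls shown as context above, a minimal sketch of serving the deployable might look like the following; the `DeployPyTriton` argument names here are assumptions drawn from the surrounding README examples and the deploy script's `--triton_model_name` flag, not from this diff:

```python
from nemo_deploy import DeployPyTriton

# Hypothetical sketch: wrap the deployable and serve it with Triton.
# Argument names are assumed; check DeployPyTriton's actual signature in nemo_deploy.
nm = DeployPyTriton(model=model, triton_model_name="qwen")
nm.deploy()
nm.serve()
```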
@@ -458,18 +458,17 @@ output = nq.query(
print(output)
```

### Query Directly Deployed NeMo Multimodal Models
### Query Directly Deployed Megatron Multimodal Models

For multimodal models deployed directly with `NeMoMultimodalDeployable`, use the `NemoQueryMultimodalPytorch` class:
For multimodal models deployed directly with `MegatronMultimodalDeployable`, use the `NemoQueryMultimodalPytorch` class:

```python
from nemo_deploy.multimodal import NemoQueryMultimodalPytorch
from PIL import Image

nq = NemoQueryMultimodalPytorch(url="localhost:8000", model_name="qwen")
output = nq.query_multimodal(
prompts=["What is in this image?"],
images=[Image.open("/path/to/image.jpg")],
images=["https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen-VL/assets/demo.jpeg"],
max_length=100,
top_k=1,
top_p=0.0,
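Besides plain HTTP(S) URLs, the changes to `query_multimodal.py` further down also document a base64 data-URI form for `images`. A minimal sketch of querying a locally stored image that way, reusing the README example above (the data-URI prefix follows the docstring in that file; the path is a placeholder):

```python
import base64

from nemo_deploy.multimodal import NemoQueryMultimodalPytorch

# Encode a local image as a "data:image;base64,..." string, as documented
# in query_multimodal.py below, and pass it instead of an HTTP URL.
with open("/path/to/image.jpg", "rb") as f:
    image_b64 = "data:image;base64," + base64.b64encode(f.read()).decode("utf-8")

nq = NemoQueryMultimodalPytorch(url="localhost:8000", model_name="qwen")
output = nq.query_multimodal(
    prompts=["What is in this image?"],
    images=[image_b64],
    max_length=100,
)
print(output)
```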
nemo_deploy/multimodal/megatron_multimodal_deployable.py
@@ -24,19 +24,19 @@
from nemo_deploy import ITritonDeployable
from nemo_deploy.utils import cast_output, str_ndarray2list
from nemo_export_deploy_common.import_utils import (
MISSING_NEMO_MSG,
MISSING_MBRIDGE_MSG,
MISSING_TRITON_MSG,
UnavailableError,
null_decorator,
)

try:
from nemo.collections.vlm.inference.base import generate, setup_model_and_tokenizer
from nemo.collections.vlm.inference.qwenvl_inference_wrapper import QwenVLInferenceWrapper
from megatron.bridge.inference.vlm.base import generate, setup_model_and_tokenizer
from megatron.bridge.inference.vlm.qwenvl_inference_wrapper import QwenVLInferenceWrapper

HAVE_NEMO = True
HAVE_MBRIDGE = True
except (ImportError, ModuleNotFoundError):
HAVE_NEMO = False
HAVE_MBRIDGE = False
from typing import Any

generate = Any
@@ -67,42 +67,46 @@ def dict_to_str(messages):
return json.dumps(messages)


class NeMoMultimodalDeployable(ITritonDeployable):
"""Triton inference server compatible deploy class for a NeMo multimodal model file.
class MegatronMultimodalDeployable(ITritonDeployable):
"""Triton inference server compatible deploy class for a Megatron multimodal model file.

Args:
nemo_checkpoint_filepath (str): path for the nemo checkpoint.
tensor_parallel_size (int): tensor parallelism.
pipeline_parallel_size (int): pipeline parallelism.
megatron_checkpoint_filepath (str): path for the megatron checkpoint.
tensor_model_parallel_size (int): tensor parallelism.
pipeline_model_parallel_size (int): pipeline parallelism.
params_dtype (torch.dtype): data type for model parameters.
inference_batch_times_seqlen_threshold (int): sequence threshold.
inference_max_seq_length (int): maximum sequence length for inference.
"""

def __init__(
self,
nemo_checkpoint_filepath: str = None,
tensor_parallel_size: int = 1,
pipeline_parallel_size: int = 1,
megatron_checkpoint_filepath: str,
tensor_model_parallel_size: int = 1,
pipeline_model_parallel_size: int = 1,
params_dtype: torch.dtype = torch.bfloat16,
inference_batch_times_seqlen_threshold: int = 1000,
inference_max_seq_length: int = 8192,
):
if not HAVE_TRITON:
raise UnavailableError(MISSING_TRITON_MSG)
if not HAVE_NEMO:
raise UnavailableError(MISSING_NEMO_MSG)
if not HAVE_MBRIDGE:
raise UnavailableError(MISSING_MBRIDGE_MSG)

self.nemo_checkpoint_filepath = nemo_checkpoint_filepath
self.tensor_parallel_size = tensor_parallel_size
self.pipeline_parallel_size = pipeline_parallel_size
self.megatron_checkpoint_filepath = megatron_checkpoint_filepath
self.tensor_model_parallel_size = tensor_model_parallel_size
self.pipeline_model_parallel_size = pipeline_model_parallel_size
self.params_dtype = params_dtype
self.inference_batch_times_seqlen_threshold = inference_batch_times_seqlen_threshold
self.inference_max_seq_length = inference_max_seq_length

self.inference_wrapped_model, self.processor = setup_model_and_tokenizer(
path=nemo_checkpoint_filepath,
tp_size=tensor_parallel_size,
pp_size=pipeline_parallel_size,
megatron_model_path=megatron_checkpoint_filepath,
tp=tensor_model_parallel_size,
pp=pipeline_model_parallel_size,
params_dtype=params_dtype,
inference_batch_times_seqlen_threshold=inference_batch_times_seqlen_threshold,
inference_max_seq_length=inference_max_seq_length,
)

def generate(
@@ -157,16 +161,24 @@ def apply_chat_template(self, messages, add_generation_prompt=True):
)
return text

def base64_to_image(self, image_base64):
"""Convert base64-encoded image to PIL Image."""
def process_image_input(self, image_source):
"""Process image input from base64-encoded string or HTTP URL.

Args:
image_source (str): Image source - either base64-encoded image string with data URI prefix
(e.g., "data:image;base64,...") or HTTP/HTTPS URL (e.g., "http://example.com/image.jpg")

Returns:
Processed image content suitable for model inference.
"""
if isinstance(self.inference_wrapped_model, QwenVLInferenceWrapper):
from qwen_vl_utils import process_vision_info

messages = [
{
"role": "user",
"content": [
{"type": "image", "image": f"data:image;base64,{image_base64}"},
{"type": "image", "image": image_source},
],
}
]
@@ -259,14 +271,20 @@ def _infer_fn(
Returns:
dict: sentences.
"""
# Handle temperature=0.0 for greedy decoding
if temperature == 0.0:
LOGGER.warning("temperature=0.0 detected. Setting top_k=1 for greedy sampling.")
top_k = 1
top_p = 0.0

inference_params = CommonInferenceParams(
temperature=float(temperature),
top_k=int(top_k),
top_p=float(top_p),
num_tokens_to_generate=num_tokens_to_generate,
)

images = [self.base64_to_image(img_b64) for img_b64 in images]
images = [self.process_image_input(image_source) for image_source in images]

results = self.generate(
prompts,
nemo_deploy/multimodal/query_multimodal.py (14 changes: 11 additions & 3 deletions)
@@ -195,9 +195,16 @@ class NemoQueryMultimodalPytorch:

nq = NemoQueryMultimodalPytorch(url="localhost", model_name="qwen")

# Encode image to base64
# Option 1: Use HTTP URL directly
output = nq.query_multimodal(
prompts=["Describe this image"],
images=["http://example.com/image.jpg"],
max_length=100,
)

# Option 2: Encode image to base64 with data URI prefix
with open("image.jpg", "rb") as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
image_base64 = "data:image;base64," + base64.b64encode(f.read()).decode('utf-8')

output = nq.query_multimodal(
prompts=["Describe this image"],
@@ -231,7 +238,8 @@ def query_multimodal(

Args:
prompts (List[str]): List of input text prompts.
images (List[str]): List of base64-encoded image strings.
images (List[str]): List of image strings - either base64-encoded with data URI prefix
(e.g., "data:image;base64,...") or HTTP/HTTPS URLs (e.g., "http://example.com/image.jpg").
max_length (Optional[int]): Maximum number of tokens to generate.
max_batch_size (Optional[int]): Maximum batch size for inference.
top_k (Optional[int]): Limits to the top K tokens to consider at each step.
nemo_deploy/service/fastapi_interface_to_pytriton_multimodal.py (35 changes: 24 additions & 11 deletions)
@@ -19,7 +19,7 @@
import numpy as np
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, model_validator
from pydantic import BaseModel
from pydantic_settings import BaseSettings

from nemo_deploy.multimodal.query_multimodal import NemoQueryMultimodalPytorch
@@ -82,18 +82,10 @@ class BaseMultimodalRequest(BaseModel):
max_tokens: int = 50
temperature: float = 1.0
top_p: float = 0.0
top_k: int = 1
top_k: int = 0
random_seed: Optional[int] = None
max_batch_size: int = 4

@model_validator(mode="after")
def set_greedy_params(self):
"""Validate parameters for greedy decoding."""
if self.temperature == 0 and self.top_p == 0:
logging.warning("Both temperature and top_p are 0. Setting top_k to 1 to ensure greedy sampling.")
self.top_k = 1
return self


class MultimodalCompletionRequest(BaseMultimodalRequest):
"""Represents a request for multimodal text completion.
@@ -290,12 +282,33 @@ def dict_to_str(messages):

@app.post("/v1/chat/completions/")
async def chat_completions_v1(request: MultimodalChatCompletionRequest):
"""Defines the multimodal chat completions endpoint and queries the model deployed on PyTriton server."""
"""Defines the multimodal chat completions endpoint and queries the model deployed on PyTriton server.

Supports two image content formats (normalized internally to format 1):
1. {"type": "image", "image": "url_or_base64"}
2. {"type": "image_url", "image_url": {"url": "url_or_base64"}} (OpenAI-style, converted to format 1)
"""
url = f"http://{triton_settings.triton_service_ip}:{triton_settings.triton_service_port}"

prompts = request.messages
if not isinstance(request.messages, list):
prompts = [request.messages]

# Normalize image_url format to image format for consistent processing
for message in prompts:
for content in message["content"]:
if content["type"] == "image_url":
# Convert OpenAI-style image_url to standard image format
if isinstance(content.get("image_url"), dict):
image_data = content["image_url"]["url"]
else:
image_data = content["image_url"]
# Transform to image format
content["type"] = "image"
content["image"] = image_data
# Remove image_url field
content.pop("image_url", None)

    # Serialize the dictionary to a JSON string representation to be able to convert to numpy array
# (str_list2numpy) and back to list (str_ndarray2list) as required by PyTriton. Using the dictionaries directly
# with these methods is not possible as they expect string type.
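A hypothetical client-side sketch of the two image content formats described in the endpoint docstring above. The service host and port, and any request fields beyond those visible in this diff (`messages`, `max_tokens`, `temperature`, `top_p`, `top_k`), are assumptions:

```python
import requests

# Both content entries below should be accepted; the endpoint normalizes the
# OpenAI-style "image_url" form to the plain "image" form before querying Triton.
payload = {
    "messages": [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image"},
                # Format 1: plain "image" entry (HTTP URL or base64 data URI)
                {"type": "image", "image": "http://example.com/image.jpg"},
                # Format 2: OpenAI-style entry, normalized internally to format 1
                # {"type": "image_url", "image_url": {"url": "http://example.com/image.jpg"}},
            ],
        }
    ],
    "max_tokens": 64,
}
# Host and port are placeholders for wherever the FastAPI service is running.
resp = requests.post("http://localhost:8000/v1/chat/completions/", json=payload)
print(resp.json())
```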
nemo_export_deploy_common/import_utils.py (3 changes: 3 additions & 0 deletions)
@@ -39,6 +39,9 @@
MISSING_TENSORRT_LLM_MSG = "tensorrt_llm is not available. Please install it with `pip install tensorrt-llm`."
MISSING_TENSORRT_MSG = "tensorrt is not available. Please install it with `pip install nvidia-tensorrt`."
MISSING_NEMO_MSG = "nemo is not available. Please install it with `pip install nemo`."
MISSING_MBRIDGE_MSG = (
"megatron.bridge is not available. Please install it from https://github.com/NVIDIA-NeMo/Megatron-Bridge"
)
MISSING_TORCHVISION_MSG = "torchvision is not available. Please install it with `pip install torchvision`."
MISSING_MODELOPT_MSG = "modelopt is not available. Please install it with `pip install nvidia-modelopt[torch]`."
MISSING_RAY_MSG = "ray is not available. Please install it with `pip install ray`."
scripts/deploy/multimodal/deploy_inframework_triton.py (32 changes: 20 additions & 12 deletions)
@@ -31,18 +31,18 @@

multimodal_supported = True
try:
from nemo_deploy.multimodal.nemo_multimodal_deployable import NeMoMultimodalDeployable
from nemo_deploy.multimodal.megatron_multimodal_deployable import MegatronMultimodalDeployable
except Exception as e:
LOGGER.warning(f"Cannot import NeMoMultimodalDeployable, it will not be available. {type(e).__name__}: {e}")
LOGGER.warning(f"Cannot import MegatronMultimodalDeployable, it will not be available. {type(e).__name__}: {e}")
multimodal_supported = False


def get_args(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Deploy nemo multimodal models to Triton",
description="Deploy megatron multimodal models to Triton",
)
parser.add_argument("-nc", "--nemo_checkpoint", type=str, help="Source .nemo file")
parser.add_argument("-mc", "--megatron_checkpoint", type=str, help="Source megatron checkpoint path")
parser.add_argument(
"-tmn",
"--triton_model_name",
@@ -88,14 +88,14 @@ def get_args(argv):

parser.add_argument(
"-tps",
"--tensor_parallel_size",
"--tensor_model_parallel_size",
default=1,
type=int,
help="Tensor parallelism size",
)
parser.add_argument(
"-pps",
"--pipeline_parallel_size",
"--pipeline_model_parallel_size",
default=1,
type=int,
help="Pipeline parallelism size",
@@ -130,6 +130,13 @@
type=int,
help="Inference batch times sequence length threshold",
)
parser.add_argument(
"-imsl",
"--inference_max_seq_length",
default=8192,
type=int,
help="Maximum sequence length for inference",
)
args = parser.parse_args(argv)
return args

@@ -147,9 +154,9 @@ def nemo_deploy(argv):
LOGGER.info(args)

if not multimodal_supported:
raise ValueError("NeMoMultimodalDeployable is not supported in this environment.")
raise ValueError("MegatronMultimodalDeployable is not supported in this environment.")

if args.nemo_checkpoint is None:
if args.megatron_checkpoint is None:
raise ValueError("In-Framework deployment requires a checkpoint folder.")

# Convert dtype string to torch dtype
@@ -160,12 +167,13 @@
}
params_dtype = dtype_map[args.params_dtype]

model = NeMoMultimodalDeployable(
nemo_checkpoint_filepath=args.nemo_checkpoint,
tensor_parallel_size=args.tensor_parallel_size,
pipeline_parallel_size=args.pipeline_parallel_size,
model = MegatronMultimodalDeployable(
megatron_checkpoint_filepath=args.megatron_checkpoint,
tensor_model_parallel_size=args.tensor_model_parallel_size,
pipeline_model_parallel_size=args.pipeline_model_parallel_size,
params_dtype=params_dtype,
inference_batch_times_seqlen_threshold=args.inference_batch_times_seqlen_threshold,
inference_max_seq_length=args.inference_max_seq_length,
)

if torch.distributed.is_initialized():
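Under the renamed flags above, a hypothetical invocation of this script might look like `python scripts/deploy/multimodal/deploy_inframework_triton.py --megatron_checkpoint /path/to/checkpoint --triton_model_name qwen --tensor_model_parallel_size 1 --pipeline_model_parallel_size 1 --inference_max_seq_length 8192`; any launcher requirements for multi-GPU parallelism (for example `torchrun`) are not shown in this diff.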