From 1b97711e3d258e16f986bddb9be52af3f78919e1 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sat, 10 Jan 2026 16:03:57 -0500 Subject: [PATCH 01/21] fix --- components/src/dynamo/vllm/handlers.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index b9a7ba6baa5..63b77a52b42 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1300,16 +1300,21 @@ async def _generate_token_mode(self, request, context, request_id): async def _generate_text_mode(self, request, context, request_id): """Generate text using OpenAI-compatible format (text-in-text-out).""" + # Get text input using InputParamManager input_data = self.input_param_manager.get_input_param( request, use_tokenizer=True ) + multi_modal_data = await self._extract_multimodal_data(request) + # Build prompt for vLLM if isinstance(input_data, list): - prompt = TokensPrompt(prompt_token_ids=input_data) + prompt = TokensPrompt( + prompt_token_ids=input_data, multi_modal_data=multi_modal_data + ) else: - prompt = TextPrompt(prompt=input_data) + prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) # Build sampling params from OpenAI-style request sampling_params = build_sampling_params_openai( From e3254ad11b1fbdc23197a33308145d526aca68cf Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Fri, 9 Jan 2026 14:39:47 -0800 Subject: [PATCH 02/21] ups Signed-off-by: Qidong Su From b251962a8505c30e6364fc7e2d714d8874f02aca Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 11 Jan 2026 00:00:14 -0500 Subject: [PATCH 03/21] upd --- components/src/dynamo/vllm/handlers.py | 56 +++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index 63b77a52b42..e757c7b1429 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -893,6 +893,59 @@ async def _extract_multimodal_data( return vllm_mm_data if vllm_mm_data else None + async def _extract_multimodal_from_openai_messages( + self, request: Dict[str, Any] + ) -> Dict[str, Any] | None: + messages = request.get("messages") + if not messages: + return None + + image_urls = [] + for message in messages: + content = message.get("content") + if not isinstance(content, list): + continue + + for item in content: + if not isinstance(item, dict) or item.get("type") != "image_url": + continue + + image_url_data = item.get("image_url") + if isinstance(image_url_data, dict): + url = image_url_data.get("url") + elif isinstance(image_url_data, str): + url = image_url_data + else: + continue + + if url: + image_urls.append(url) + + if not image_urls: + return None + + if not self.enable_multimodal: + raise ValueError( + "Received multimodal data but multimodal processing is not enabled. " + "Use --enable-multimodal flag to enable multimodal processing." 
+ ) + + images = [] + for url in image_urls: + try: + image = await self.image_loader.load_image(url) + images.append(image) + logger.debug(f"Loaded image from OpenAI message: {url[:80]}...") + except Exception: + logger.exception(f"Failed to load image from {url[:80]}...") + raise + + vllm_mm_data = {"image": images[0] if len(images) == 1 else images} + logger.debug( + f"Extracted {len(images)} image(s) from OpenAI messages for multimodal processing" + ) + return vllm_mm_data + def _build_prompt_from_request( self, request: Dict[str, Any], @@ -1306,7 +1359,8 @@ async def _generate_text_mode(self, request, context, request_id): request, use_tokenizer=True ) - multi_modal_data = await self._extract_multimodal_data(request) + # Extract multimodal data + multi_modal_data = await self._extract_multimodal_from_openai_messages(request) # Build prompt for vLLM if isinstance(input_data, list): From 11db62bab46d3a7c803e1e0b742d8fbfb1affef9 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Mon, 12 Jan 2026 14:42:04 -0500 Subject: [PATCH 04/21] update --- components/src/dynamo/vllm/handlers.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index e757c7b1429..c3daf0a50ae 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -174,6 +174,7 @@ def build_sampling_params( def build_sampling_params_openai( request: Dict[str, Any], default_sampling_params: Dict[str, Any], + model_max_len: int | None = None, ) -> SamplingParams: """ Build SamplingParams from an OpenAI-compatible request format. @@ -181,7 +182,8 @@ def build_sampling_params_openai( Args: request: The OpenAI-style request dict with parameters like temperature, max_tokens, etc. 
default_sampling_params: Default sampling parameters to initialize with - + model_max_len: Maximum model context length for computing dynamic max_tokens default + Returns: SamplingParams configured from the request """ @@ -210,6 +212,9 @@ def build_sampling_params_openai( # Handle max_tokens if "max_tokens" in request and request["max_tokens"] is not None: sampling_params.max_tokens = request["max_tokens"] + elif model_max_len is not None: + # Match token mode behavior: generate until context limit + sampling_params.max_tokens = model_max_len # Handle stop sequences if "stop" in request and request["stop"] is not None: @@ -1372,7 +1377,7 @@ async def _generate_text_mode(self, request, context, request_id): # Build sampling params from OpenAI-style request sampling_params = build_sampling_params_openai( - request, self.default_sampling_params + request, self.default_sampling_params, self.model_max_len ) dp_rank = request.get("dp_rank", None) From 986f699465f44a0d60945617d08647d35990ed50 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Mon, 12 Jan 2026 14:58:01 -0500 Subject: [PATCH 05/21] fix --- components/src/dynamo/vllm/handlers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index c3daf0a50ae..b84bcbef039 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -210,8 +210,8 @@ def build_sampling_params_openai( setattr(sampling_params, param_key, request[req_key]) # Handle max_tokens - if "max_tokens" in request and request["max_tokens"] is not None: - sampling_params.max_tokens = request["max_tokens"] + if (provided_max_tokens := request.get("max_tokens")) is not None: + sampling_params.max_tokens = provided_max_tokens elif model_max_len is not None: # Match token mode behavior: generate until context limit sampling_params.max_tokens = model_max_len From c3794c5779751d2b3975a5bd34d7f1e372167bab Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 1 Feb 2026 22:12:35 -0500 Subject: [PATCH 06/21] upd --- components/src/dynamo/vllm/handlers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index b84bcbef039..c1053f17f16 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -210,11 +210,11 @@ def build_sampling_params_openai( setattr(sampling_params, param_key, request[req_key]) # Handle max_tokens - if (provided_max_tokens := request.get("max_tokens")) is not None: - sampling_params.max_tokens = provided_max_tokens - elif model_max_len is not None: - # Match token mode behavior: generate until context limit - sampling_params.max_tokens = model_max_len + provided_max_tokens = request.get("max_tokens") + model_config_max_tokens = default_sampling_params.get("max_tokens") + + sampling_params.max_tokens = min(filter(lambda x: x is not None, + [provided_max_tokens, model_max_len, model_config_max_tokens])) # Handle stop sequences if "stop" in request and request["stop"] is not None: From ddde01b414c81eeed6f9c452ad005b160887e509 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Mon, 2 Feb 2026 16:27:22 -0500 Subject: [PATCH 07/21] fix --- components/src/dynamo/vllm/handlers.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index c1053f17f16..b6daf581294 100644 --- 
a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -166,8 +166,12 @@ def build_sampling_params( input_length = len(token_ids) dynamic_default = max(1, model_max_len - input_length) model_config_max_tokens = default_sampling_params.get("max_tokens") - sampling_params.max_tokens = min(filter(lambda x: x is not None, - [provided_max_tokens, dynamic_default, model_config_max_tokens])) + sampling_params.max_tokens = min( + filter( + lambda x: x is not None, + [provided_max_tokens, dynamic_default, model_config_max_tokens], + ) + ) return sampling_params @@ -183,7 +187,7 @@ def build_sampling_params_openai( request: The OpenAI-style request dict with parameters like temperature, max_tokens, etc. default_sampling_params: Default sampling parameters to initialize with model_max_len: Maximum model context length for computing dynamic max_tokens default - + Returns: SamplingParams configured from the request """ @@ -212,9 +216,13 @@ def build_sampling_params_openai( # Handle max_tokens provided_max_tokens = request.get("max_tokens") model_config_max_tokens = default_sampling_params.get("max_tokens") - - sampling_params.max_tokens = min(filter(lambda x: x is not None, - [provided_max_tokens, model_max_len, model_config_max_tokens])) + + sampling_params.max_tokens = min( + filter( + lambda x: x is not None, + [provided_max_tokens, model_max_len, model_config_max_tokens], + ) + ) # Handle stop sequences if "stop" in request and request["stop"] is not None: From b2c511014e145cd6911b7ef535ab6321b96ceb19 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Wed, 4 Feb 2026 23:12:28 -0500 Subject: [PATCH 08/21] upd --- .../vllm/multimodal_utils/image_loader.py | 206 ++++++++++++++---- 1 file changed, 163 insertions(+), 43 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index d34fd9863e1..3571a78ff04 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -14,31 +14,175 @@ # limitations under the License. 
import asyncio -import base64 -import binascii import logging +import threading from io import BytesIO +from typing import TypeAlias, Union from urllib.parse import urlparse import httpx +import pybase64 +import torch from PIL import Image from .http_client import get_http_client logger = logging.getLogger(__name__) +# Image output can be either PIL Image or Tensor (from nvimgcodec) +ImageOutput: TypeAlias = Union[Image.Image, torch.Tensor] + +# Thread-local storage for nvimgcodec decoders +_thread_local = threading.local() + +# Lazy import for nvimgcodec +_nvimgcodec = None + + +def _get_nvimgcodec(): + """Lazy import nvimgcodec to avoid import errors if not installed.""" + global _nvimgcodec + if _nvimgcodec is None: + from nvidia import nvimgcodec + _nvimgcodec = nvimgcodec + return _nvimgcodec + + +def get_decoder(): + """Get or create a thread-local nvimgcodec decoder instance.""" + if not hasattr(_thread_local, "decoder"): + nvimgcodec = _get_nvimgcodec() + _thread_local.decoder = nvimgcodec.Decoder() + logger.info("nvimgcodec decoder initialized for thread") + return _thread_local.decoder + class ImageLoader: CACHE_SIZE_MAXIMUM = 8 def __init__( - self, cache_size: int = CACHE_SIZE_MAXIMUM, http_timeout: float = 30.0 + self, + cache_size: int = CACHE_SIZE_MAXIMUM, + http_timeout: float = 30.0, + use_nvimgcodec: bool = True, + image_mode: str = "RGB", ): + """ + Initialize the ImageLoader. + + Args: + cache_size: Maximum number of images to cache + http_timeout: Timeout for HTTP requests + use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding + (returns 4D torch.Tensor). If False, use PIL (returns Image.Image) + image_mode: Target image mode for PIL conversion (default: "RGB") + """ self._http_timeout = http_timeout - self._image_cache: dict[str, Image.Image] = {} + self._use_nvimgcodec = use_nvimgcodec + self._image_mode = image_mode + self._image_cache: dict[str, ImageOutput] = {} self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) - async def load_image(self, image_url: str) -> Image.Image: + def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: + """ + Decode image bytes using nvimgcodec for GPU-accelerated decoding. + + Args: + data: Raw image bytes + + Returns: + torch.Tensor in NCHW format (4D) on CUDA device. + Shape: (1, C, H, W) - batch dimension added so vLLM treats it as + a batch of images, not as embeddings. + """ + decoder = get_decoder() + decoded = decoder.decode(data) + + device = torch.device("cuda", torch.cuda.current_device()) + tensor = torch.as_tensor(decoded, device=device) + # HWC -> CHW + tensor = tensor.permute(2, 0, 1) + # Add batch dimension: CHW -> NCHW (1, C, H, W) + # This is critical: 3D tensors are interpreted as embeddings by vLLM, + # but 4D tensors are interpreted as a batch of images. + tensor = tensor.unsqueeze(0) + + return tensor + + def _decode_with_pil(self, data: bytes) -> Image.Image: + """ + Decode image bytes using PIL. + + Args: + data: Raw image bytes + + Returns: + PIL Image converted to the target image mode + """ + image = Image.open(BytesIO(data)) + + # Validate image format + if image.format not in ("JPEG", "PNG", "WEBP", "GIF"): + raise ValueError(f"Unsupported image format: {image.format}") + + # Convert to target mode + if image.mode != self._image_mode: + image = image.convert(self._image_mode) + + return image + + async def _fetch_image_bytes(self, image_url: str) -> bytes: + """ + Fetch image bytes from a URL or data URI. 
+ + Args: + image_url: URL (http/https) or data URI (data:image/...;base64,...) + + Returns: + Raw image bytes + """ + parsed_url = urlparse(image_url) + + if parsed_url.scheme == "data": + # Parse data URL format: data:[][;base64], + if not parsed_url.path.startswith("image/"): + raise ValueError("Data URL must be an image type") + + # Split the path into media type and data + media_type, data = parsed_url.path.split(",", 1) + if ";base64" not in media_type: + raise ValueError("Data URL must be base64 encoded") + + try: + # Use pybase64 for faster base64 decoding + return pybase64.b64decode(data, validate=True) + except Exception as e: + raise ValueError(f"Invalid base64 encoding: {e}") + + elif parsed_url.scheme in ("http", "https"): + http_client = get_http_client(self._http_timeout) + + response = await http_client.get(image_url) + response.raise_for_status() + + if not response.content: + raise ValueError("Empty response content from image URL") + + return response.content + + else: + raise ValueError(f"Invalid image source scheme: {parsed_url.scheme}") + + async def load_image(self, image_url: str) -> ImageOutput: + """ + Load an image from a URL or data URI. + + Args: + image_url: URL (http/https) or data URI (data:image/...;base64,...) + + Returns: + torch.Tensor in NCHW format (if use_nvimgcodec=True) or PIL Image + """ parsed_url = urlparse(image_url) # For HTTP(S) URLs, check cache first @@ -49,55 +193,31 @@ async def load_image(self, image_url: str) -> Image.Image: return self._image_cache[image_url_lower] try: - if parsed_url.scheme == "data": - # Parse data URL format: data:[][;base64], - if not parsed_url.path.startswith("image/"): - raise ValueError("Data URL must be an image type") - - # Split the path into media type and data - media_type, data = parsed_url.path.split(",", 1) - if ";base64" not in media_type: - raise ValueError("Data URL must be base64 encoded") - - try: - image_bytes = base64.b64decode(data) - image_data = BytesIO(image_bytes) - except binascii.Error as e: - raise ValueError(f"Invalid base64 encoding: {e}") - elif parsed_url.scheme in ("http", "https"): - http_client = get_http_client(self._http_timeout) - - response = await http_client.get(image_url) - response.raise_for_status() - - if not response.content: - raise ValueError("Empty response content from image URL") - - image_data = BytesIO(response.content) - else: - raise ValueError(f"Invalid image source scheme: {parsed_url.scheme}") - - # PIL is sync, so offload to a thread to avoid blocking the event loop - image = await asyncio.to_thread(Image.open, image_data) + # Fetch image bytes + image_bytes = await self._fetch_image_bytes(image_url) - # Validate image format and convert to RGB - if image.format not in ("JPEG", "PNG", "WEBP"): - raise ValueError(f"Unsupported image format: {image.format}") - - image_converted = image.convert("RGB") + # Decode the image + if self._use_nvimgcodec: + # nvimgcodec decoding (GPU-accelerated, returns 4D tensor) + image_result = self._decode_with_nvimgcodec(image_bytes) + else: + # PIL decoding (CPU-bound, offload to thread) + image_result = await asyncio.to_thread( + self._decode_with_pil, image_bytes + ) # Cache HTTP(S) URLs if parsed_url.scheme in ("http", "https"): image_url_lower = image_url.lower() - # Cache the image for future use, and evict the oldest image if the cache is full + # Cache the image for future use, and evict the oldest image if full if self._cache_queue.full(): oldest_image_url = await self._cache_queue.get() del 
self._image_cache[oldest_image_url] - self._image_cache[image_url_lower] = image_converted + self._image_cache[image_url_lower] = image_result await self._cache_queue.put(image_url_lower) - return image_converted + return image_result except httpx.HTTPError as e: logger.error(f"HTTP error loading image: {e}") From f10b42040a69b8dffda75b02dfde282404b93418 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Thu, 5 Feb 2026 14:02:09 -0500 Subject: [PATCH 09/21] upd --- .../dynamo/vllm/multimodal_utils/__init__.py | 3 +- .../vllm/multimodal_utils/image_loader.py | 107 +++++++++++++++++- 2 files changed, 103 insertions(+), 7 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_utils/__init__.py b/components/src/dynamo/vllm/multimodal_utils/__init__.py index e9f79e025c1..399a7fafed2 100644 --- a/components/src/dynamo/vllm/multimodal_utils/__init__.py +++ b/components/src/dynamo/vllm/multimodal_utils/__init__.py @@ -13,7 +13,7 @@ get_encoder_components, ) from dynamo.vllm.multimodal_utils.http_client import get_http_client -from dynamo.vllm.multimodal_utils.image_loader import ImageLoader +from dynamo.vllm.multimodal_utils.image_loader import ImageLoader, get_decode_thread_pool from dynamo.vllm.multimodal_utils.model import ( SupportedModels, construct_mm_data, @@ -37,6 +37,7 @@ "encode_image_embeddings", "extract_user_text", "get_encoder_components", + "get_decode_thread_pool", "get_http_client", "ImageLoader", "SupportedModels", diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index 3571a78ff04..8ce9ab47a4a 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -14,11 +14,13 @@ # limitations under the License. import asyncio +import atexit import logging +import os import threading +from concurrent.futures import ThreadPoolExecutor from io import BytesIO from typing import TypeAlias, Union -from urllib.parse import urlparse import httpx import pybase64 @@ -38,6 +40,54 @@ # Lazy import for nvimgcodec _nvimgcodec = None +# Global thread pool for image decoding operations +# Default to 8 workers, configurable via DYN_IMAGE_DECODE_WORKERS env var +_DEFAULT_DECODE_WORKERS = 8 +_decode_thread_pool: ThreadPoolExecutor | None = None +_decode_pool_lock = threading.Lock() + + +def get_decode_thread_pool(max_workers: int | None = None) -> ThreadPoolExecutor: + """ + Get or create the global thread pool for image decoding. + + Args: + max_workers: Maximum number of worker threads. If None, uses + DYN_IMAGE_DECODE_WORKERS env var or defaults to 8. 
+ + Returns: + ThreadPoolExecutor for image decoding operations + """ + global _decode_thread_pool + + if _decode_thread_pool is None: + with _decode_pool_lock: + # Double-check after acquiring lock + if _decode_thread_pool is None: + if max_workers is None: + max_workers = int( + os.environ.get("DYN_IMAGE_DECODE_WORKERS", _DEFAULT_DECODE_WORKERS) + ) + _decode_thread_pool = ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="image_decode_", + ) + logger.info( + f"Initialized global image decode thread pool with {max_workers} workers" + ) + # Register cleanup on exit + atexit.register(_shutdown_decode_pool) + + return _decode_thread_pool + + +def _shutdown_decode_pool(): + """Shutdown the global decode thread pool on exit.""" + global _decode_thread_pool + if _decode_thread_pool is not None: + _decode_thread_pool.shutdown(wait=False) + logger.debug("Shutdown global image decode thread pool") + def _get_nvimgcodec(): """Lazy import nvimgcodec to avoid import errors if not installed.""" @@ -66,6 +116,7 @@ def __init__( http_timeout: float = 30.0, use_nvimgcodec: bool = True, image_mode: str = "RGB", + decode_workers: int | None = None, ): """ Initialize the ImageLoader. @@ -76,12 +127,16 @@ def __init__( use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding (returns 4D torch.Tensor). If False, use PIL (returns Image.Image) image_mode: Target image mode for PIL conversion (default: "RGB") + decode_workers: Number of worker threads for image decoding. If None, + uses DYN_IMAGE_DECODE_WORKERS env var or defaults to 8. """ self._http_timeout = http_timeout self._use_nvimgcodec = use_nvimgcodec self._image_mode = image_mode self._image_cache: dict[str, ImageOutput] = {} self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) + # Initialize the thread pool (uses global pool) + self._thread_pool = get_decode_thread_pool(decode_workers) def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: """ @@ -196,14 +251,18 @@ async def load_image(self, image_url: str) -> ImageOutput: # Fetch image bytes image_bytes = await self._fetch_image_bytes(image_url) - # Decode the image + # Decode the image using thread pool to avoid blocking event loop + loop = asyncio.get_running_loop() if self._use_nvimgcodec: # nvimgcodec decoding (GPU-accelerated, returns 4D tensor) - image_result = self._decode_with_nvimgcodec(image_bytes) + # Offload to thread pool to avoid blocking the event loop + image_result = await loop.run_in_executor( + self._thread_pool, self._decode_with_nvimgcodec, image_bytes + ) else: - # PIL decoding (CPU-bound, offload to thread) - image_result = await asyncio.to_thread( - self._decode_with_pil, image_bytes + # PIL decoding (CPU-bound, offload to thread pool) + image_result = await loop.run_in_executor( + self._thread_pool, self._decode_with_pil, image_bytes ) # Cache HTTP(S) URLs @@ -225,3 +284,39 @@ async def load_image(self, image_url: str) -> ImageOutput: except Exception as e: logger.error(f"Error loading image: {e}") raise ValueError(f"Failed to load image: {e}") + + async def load_images(self, image_urls: list[str]) -> list[ImageOutput]: + """ + Load multiple images concurrently. + + This method fetches and decodes multiple images in parallel, utilizing + the thread pool for decoding operations. This is more efficient than + calling load_image() sequentially for each image. + + Args: + image_urls: List of URLs (http/https) or data URIs + + Returns: + List of decoded images in the same order as input URLs. 
+ Each element is torch.Tensor (if use_nvimgcodec=True) or PIL Image. + + Raises: + ValueError: If any image fails to load (all-or-nothing semantics) + """ + if not image_urls: + return [] + + if len(image_urls) == 1: + return [await self.load_image(image_urls[0])] + + # Load all images concurrently + tasks = [self.load_image(url) for url in image_urls] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check for exceptions and raise the first one found + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.error(f"Failed to load image at index {i} ({image_urls[i]}): {result}") + raise result + + return results From d63223bb9ac5233ae4bfd76678f9e2daefea2b0c Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Thu, 5 Feb 2026 14:12:59 -0500 Subject: [PATCH 10/21] upd --- .../dynamo/vllm/multimodal_utils/__init__.py | 3 +- .../vllm/multimodal_utils/image_loader.py | 97 ++----------------- 2 files changed, 9 insertions(+), 91 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_utils/__init__.py b/components/src/dynamo/vllm/multimodal_utils/__init__.py index 399a7fafed2..e9f79e025c1 100644 --- a/components/src/dynamo/vllm/multimodal_utils/__init__.py +++ b/components/src/dynamo/vllm/multimodal_utils/__init__.py @@ -13,7 +13,7 @@ get_encoder_components, ) from dynamo.vllm.multimodal_utils.http_client import get_http_client -from dynamo.vllm.multimodal_utils.image_loader import ImageLoader, get_decode_thread_pool +from dynamo.vllm.multimodal_utils.image_loader import ImageLoader from dynamo.vllm.multimodal_utils.model import ( SupportedModels, construct_mm_data, @@ -37,7 +37,6 @@ "encode_image_embeddings", "extract_user_text", "get_encoder_components", - "get_decode_thread_pool", "get_http_client", "ImageLoader", "SupportedModels", diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index 8ce9ab47a4a..3c6c07d76bf 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -14,13 +14,13 @@ # limitations under the License. import asyncio -import atexit import logging import os import threading from concurrent.futures import ThreadPoolExecutor from io import BytesIO from typing import TypeAlias, Union +from urllib.parse import urlparse import httpx import pybase64 @@ -42,51 +42,11 @@ # Global thread pool for image decoding operations # Default to 8 workers, configurable via DYN_IMAGE_DECODE_WORKERS env var -_DEFAULT_DECODE_WORKERS = 8 -_decode_thread_pool: ThreadPoolExecutor | None = None -_decode_pool_lock = threading.Lock() - - -def get_decode_thread_pool(max_workers: int | None = None) -> ThreadPoolExecutor: - """ - Get or create the global thread pool for image decoding. - - Args: - max_workers: Maximum number of worker threads. If None, uses - DYN_IMAGE_DECODE_WORKERS env var or defaults to 8. 
- - Returns: - ThreadPoolExecutor for image decoding operations - """ - global _decode_thread_pool - - if _decode_thread_pool is None: - with _decode_pool_lock: - # Double-check after acquiring lock - if _decode_thread_pool is None: - if max_workers is None: - max_workers = int( - os.environ.get("DYN_IMAGE_DECODE_WORKERS", _DEFAULT_DECODE_WORKERS) - ) - _decode_thread_pool = ThreadPoolExecutor( - max_workers=max_workers, - thread_name_prefix="image_decode_", - ) - logger.info( - f"Initialized global image decode thread pool with {max_workers} workers" - ) - # Register cleanup on exit - atexit.register(_shutdown_decode_pool) - - return _decode_thread_pool - - -def _shutdown_decode_pool(): - """Shutdown the global decode thread pool on exit.""" - global _decode_thread_pool - if _decode_thread_pool is not None: - _decode_thread_pool.shutdown(wait=False) - logger.debug("Shutdown global image decode thread pool") +_IMAGE_DECODE_WORKERS = int(os.environ.get("DYN_IMAGE_DECODE_WORKERS", 8)) +_decode_thread_pool = ThreadPoolExecutor( + max_workers=_IMAGE_DECODE_WORKERS, + thread_name_prefix="image_decode_", +) def _get_nvimgcodec(): @@ -116,7 +76,6 @@ def __init__( http_timeout: float = 30.0, use_nvimgcodec: bool = True, image_mode: str = "RGB", - decode_workers: int | None = None, ): """ Initialize the ImageLoader. @@ -127,16 +86,12 @@ def __init__( use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding (returns 4D torch.Tensor). If False, use PIL (returns Image.Image) image_mode: Target image mode for PIL conversion (default: "RGB") - decode_workers: Number of worker threads for image decoding. If None, - uses DYN_IMAGE_DECODE_WORKERS env var or defaults to 8. """ self._http_timeout = http_timeout self._use_nvimgcodec = use_nvimgcodec self._image_mode = image_mode self._image_cache: dict[str, ImageOutput] = {} self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) - # Initialize the thread pool (uses global pool) - self._thread_pool = get_decode_thread_pool(decode_workers) def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: """ @@ -257,12 +212,12 @@ async def load_image(self, image_url: str) -> ImageOutput: # nvimgcodec decoding (GPU-accelerated, returns 4D tensor) # Offload to thread pool to avoid blocking the event loop image_result = await loop.run_in_executor( - self._thread_pool, self._decode_with_nvimgcodec, image_bytes + _decode_thread_pool, self._decode_with_nvimgcodec, image_bytes ) else: # PIL decoding (CPU-bound, offload to thread pool) image_result = await loop.run_in_executor( - self._thread_pool, self._decode_with_pil, image_bytes + _decode_thread_pool, self._decode_with_pil, image_bytes ) # Cache HTTP(S) URLs @@ -284,39 +239,3 @@ async def load_image(self, image_url: str) -> ImageOutput: except Exception as e: logger.error(f"Error loading image: {e}") raise ValueError(f"Failed to load image: {e}") - - async def load_images(self, image_urls: list[str]) -> list[ImageOutput]: - """ - Load multiple images concurrently. - - This method fetches and decodes multiple images in parallel, utilizing - the thread pool for decoding operations. This is more efficient than - calling load_image() sequentially for each image. - - Args: - image_urls: List of URLs (http/https) or data URIs - - Returns: - List of decoded images in the same order as input URLs. - Each element is torch.Tensor (if use_nvimgcodec=True) or PIL Image. 
- - Raises: - ValueError: If any image fails to load (all-or-nothing semantics) - """ - if not image_urls: - return [] - - if len(image_urls) == 1: - return [await self.load_image(image_urls[0])] - - # Load all images concurrently - tasks = [self.load_image(url) for url in image_urls] - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Check for exceptions and raise the first one found - for i, result in enumerate(results): - if isinstance(result, Exception): - logger.error(f"Failed to load image at index {i} ({image_urls[i]}): {result}") - raise result - - return results From d80adb58bc2670d36d0e2e4aa556e358790a0222 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Thu, 5 Feb 2026 22:42:41 -0500 Subject: [PATCH 11/21] upd --- components/src/dynamo/vllm/multimodal_utils/image_loader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index 3c6c07d76bf..c34c8a5353a 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -105,8 +105,10 @@ def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: Shape: (1, C, H, W) - batch dimension added so vLLM treats it as a batch of images, not as embeddings. """ + nvimgcodec = _get_nvimgcodec() decoder = get_decoder() - decoded = decoder.decode(data) + code_stream = nvimgcodec.CodeStream(data) + decoded = decoder.decode(code_stream) device = torch.device("cuda", torch.cuda.current_device()) tensor = torch.as_tensor(decoded, device=device) From 128a1dacda1eaead218e9d9fec6e89774e76c7f0 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sat, 7 Feb 2026 16:27:28 -0500 Subject: [PATCH 12/21] add limit --- .../multimodal_handlers/worker_handler.py | 118 ++++++++++-------- .../vllm/multimodal_utils/image_loader.py | 28 +++++ 2 files changed, 91 insertions(+), 55 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py b/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py index d423860936f..978c78ecd10 100644 --- a/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py +++ b/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py @@ -170,6 +170,7 @@ async def generate(self, request: vLLMMultimodalRequest, context): logger.debug(f"Received PD request: {{ id: {request.request_id} }}.") multi_modal_data = defaultdict(list) + num_loaded_images = 0 for mi in request.multimodal_inputs: # ECConnector consumer mode: vLLM loads embeddings automatically from disk # We need to pass multimodal_input so vLLM can generate mm_hash and look up cache @@ -273,6 +274,8 @@ async def generate(self, request: vLLMMultimodalRequest, context): await self.image_loader.load_image(mi.multimodal_input.image_url) ) + num_loaded_images += 1 + # Remove the image features from the request as they are not required request.multimodal_inputs = None @@ -304,61 +307,66 @@ async def generate(self, request: vLLMMultimodalRequest, context): request_id=pd_request.request_id, ) - if self.enable_disagg and self.decode_worker_client: - decode_request = copy.deepcopy(request) - async for prefill_response in gen: - # For Qwen VL models with mRoPE: Keep the ORIGINAL unexpanded prompt. - # The decode worker will pass multi_modal_data which causes vLLM to - # expand the prompt identically to prefill, ensuring block counts match. - # - # For other models: Use the expanded prompt from prefill response. 
- # These models don't pass multi_modal_data in decode, so they need - # the already-expanded prompt to match the KV cache layout. - if not is_qwen_vl_model(self.config.model): - decode_request.engine_prompt[ - "prompt_token_ids" - ] = prefill_response.prompt_token_ids - logger.debug( - f"Prefill response kv_transfer_params: {prefill_response.kv_transfer_params}" - ) - extra_args = decode_request.sampling_params.extra_args or {} - extra_args["kv_transfer_params"] = prefill_response.kv_transfer_params - extra_args.pop("serialized_request", None) - decode_request.sampling_params.extra_args = extra_args - logger.debug("Decode request: %s", decode_request) - async for ( - decode_response - ) in await self.decode_worker_client.round_robin( - decode_request.model_dump_json() - ): - output = MyRequestOutput.model_validate_json(decode_response.data()) + try: + if self.enable_disagg and self.decode_worker_client: + decode_request = copy.deepcopy(request) + async for prefill_response in gen: + # For Qwen VL models with mRoPE: Keep the ORIGINAL unexpanded prompt. + # The decode worker will pass multi_modal_data which causes vLLM to + # expand the prompt identically to prefill, ensuring block counts match. + # + # For other models: Use the expanded prompt from prefill response. + # These models don't pass multi_modal_data in decode, so they need + # the already-expanded prompt to match the KV cache layout. + if not is_qwen_vl_model(self.config.model): + decode_request.engine_prompt[ + "prompt_token_ids" + ] = prefill_response.prompt_token_ids + logger.debug( + f"Prefill response kv_transfer_params: {prefill_response.kv_transfer_params}" + ) + extra_args = decode_request.sampling_params.extra_args or {} + extra_args["kv_transfer_params"] = prefill_response.kv_transfer_params + extra_args.pop("serialized_request", None) + decode_request.sampling_params.extra_args = extra_args + logger.debug("Decode request: %s", decode_request) + async for ( + decode_response + ) in await self.decode_worker_client.round_robin( + decode_request.model_dump_json() + ): + output = MyRequestOutput.model_validate_json(decode_response.data()) + yield MyRequestOutput( + request_id=output.request_id, + prompt=output.prompt, + prompt_token_ids=output.prompt_token_ids, + prompt_logprobs=output.prompt_logprobs, + outputs=output.outputs, + finished=output.finished, + metrics=output.metrics, + kv_transfer_params=output.kv_transfer_params, + ).model_dump_json() + + else: + async for response in gen: + logger.debug( + f"Response kv_transfer_params: {response.kv_transfer_params}" + ) + logger.debug( + f"length of expanded prompt ids: {len(response.prompt_token_ids)}" + ) + # logger.info(f"Response outputs: {response.outputs}") yield MyRequestOutput( - request_id=output.request_id, - prompt=output.prompt, - prompt_token_ids=output.prompt_token_ids, - prompt_logprobs=output.prompt_logprobs, - outputs=output.outputs, - finished=output.finished, - metrics=output.metrics, - kv_transfer_params=output.kv_transfer_params, + request_id=response.request_id, + prompt=response.prompt, + prompt_token_ids=response.prompt_token_ids, + prompt_logprobs=response.prompt_logprobs, + outputs=response.outputs, + finished=response.finished, + metrics=response.metrics, + kv_transfer_params=response.kv_transfer_params, ).model_dump_json() - else: - async for response in gen: - logger.debug( - f"Response kv_transfer_params: {response.kv_transfer_params}" - ) - logger.debug( - f"length of expanded prompt ids: {len(response.prompt_token_ids)}" - ) - # 
logger.info(f"Response outputs: {response.outputs}") - yield MyRequestOutput( - request_id=response.request_id, - prompt=response.prompt, - prompt_token_ids=response.prompt_token_ids, - prompt_logprobs=response.prompt_logprobs, - outputs=response.outputs, - finished=response.finished, - metrics=response.metrics, - kv_transfer_params=response.kv_transfer_params, - ).model_dump_json() + finally: + if num_loaded_images > 0: + self.image_loader.mark_consumed(num_loaded_images) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index c34c8a5353a..f9ae7614a72 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -69,6 +69,7 @@ def get_decoder(): class ImageLoader: CACHE_SIZE_MAXIMUM = 8 + DEFAULT_MAX_PENDING = 64 def __init__( self, @@ -76,6 +77,7 @@ def __init__( http_timeout: float = 30.0, use_nvimgcodec: bool = True, image_mode: str = "RGB", + max_pending: int | None = None, ): """ Initialize the ImageLoader. @@ -86,6 +88,10 @@ def __init__( use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding (returns 4D torch.Tensor). If False, use PIL (returns Image.Image) image_mode: Target image mode for PIL conversion (default: "RGB") + max_pending: Maximum number of decoded images waiting for the vLLM + scheduler to consume them. Decode will block if this + limit is reached. Defaults to DYN_IMAGE_MAX_PENDING + env var, or 64. """ self._http_timeout = http_timeout self._use_nvimgcodec = use_nvimgcodec @@ -93,6 +99,24 @@ def __init__( self._image_cache: dict[str, ImageOutput] = {} self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) + if max_pending is None: + max_pending = int( + os.environ.get("DYN_IMAGE_MAX_PENDING", self.DEFAULT_MAX_PENDING) + ) + self._pending_semaphore = asyncio.Semaphore(max_pending) + self._max_pending = max_pending + + def mark_consumed(self, count: int = 1): + """ + Signal that decoded images have been consumed by the vLLM prefill batch. + Call this after the prefill batch completes to allow more images to be decoded. + + Args: + count: Number of images consumed (default: 1) + """ + for _ in range(count): + self._pending_semaphore.release() + def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: """ Decode image bytes using nvimgcodec for GPU-accelerated decoding. @@ -208,6 +232,10 @@ async def load_image(self, image_url: str) -> ImageOutput: # Fetch image bytes image_bytes = await self._fetch_image_bytes(image_url) + # Wait if too many decoded images are pending in the vLLM scheduler. + # Released when the caller invokes mark_consumed() after prefill. 
+ await self._pending_semaphore.acquire() + # Decode the image using thread pool to avoid blocking event loop loop = asyncio.get_running_loop() if self._use_nvimgcodec: From e722e41f0f22339b4cd62d3081df2f1c6612df2e Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sat, 7 Feb 2026 16:51:09 -0500 Subject: [PATCH 13/21] simplify --- .../multimodal_handlers/worker_handler.py | 117 +++++++++--------- 1 file changed, 57 insertions(+), 60 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py b/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py index 978c78ecd10..ed9e084b290 100644 --- a/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py +++ b/components/src/dynamo/vllm/multimodal_handlers/worker_handler.py @@ -307,66 +307,63 @@ async def generate(self, request: vLLMMultimodalRequest, context): request_id=pd_request.request_id, ) - try: - if self.enable_disagg and self.decode_worker_client: - decode_request = copy.deepcopy(request) - async for prefill_response in gen: - # For Qwen VL models with mRoPE: Keep the ORIGINAL unexpanded prompt. - # The decode worker will pass multi_modal_data which causes vLLM to - # expand the prompt identically to prefill, ensuring block counts match. - # - # For other models: Use the expanded prompt from prefill response. - # These models don't pass multi_modal_data in decode, so they need - # the already-expanded prompt to match the KV cache layout. - if not is_qwen_vl_model(self.config.model): - decode_request.engine_prompt[ - "prompt_token_ids" - ] = prefill_response.prompt_token_ids - logger.debug( - f"Prefill response kv_transfer_params: {prefill_response.kv_transfer_params}" - ) - extra_args = decode_request.sampling_params.extra_args or {} - extra_args["kv_transfer_params"] = prefill_response.kv_transfer_params - extra_args.pop("serialized_request", None) - decode_request.sampling_params.extra_args = extra_args - logger.debug("Decode request: %s", decode_request) - async for ( - decode_response - ) in await self.decode_worker_client.round_robin( - decode_request.model_dump_json() - ): - output = MyRequestOutput.model_validate_json(decode_response.data()) - yield MyRequestOutput( - request_id=output.request_id, - prompt=output.prompt, - prompt_token_ids=output.prompt_token_ids, - prompt_logprobs=output.prompt_logprobs, - outputs=output.outputs, - finished=output.finished, - metrics=output.metrics, - kv_transfer_params=output.kv_transfer_params, - ).model_dump_json() - - else: - async for response in gen: - logger.debug( - f"Response kv_transfer_params: {response.kv_transfer_params}" - ) - logger.debug( - f"length of expanded prompt ids: {len(response.prompt_token_ids)}" - ) - # logger.info(f"Response outputs: {response.outputs}") + if self.enable_disagg and self.decode_worker_client: + decode_request = copy.deepcopy(request) + async for prefill_response in gen: + # For Qwen VL models with mRoPE: Keep the ORIGINAL unexpanded prompt. + # The decode worker will pass multi_modal_data which causes vLLM to + # expand the prompt identically to prefill, ensuring block counts match. + # + # For other models: Use the expanded prompt from prefill response. + # These models don't pass multi_modal_data in decode, so they need + # the already-expanded prompt to match the KV cache layout. 
+ if not is_qwen_vl_model(self.config.model): + decode_request.engine_prompt[ + "prompt_token_ids" + ] = prefill_response.prompt_token_ids + logger.debug( + f"Prefill response kv_transfer_params: {prefill_response.kv_transfer_params}" + ) + extra_args = decode_request.sampling_params.extra_args or {} + extra_args["kv_transfer_params"] = prefill_response.kv_transfer_params + extra_args.pop("serialized_request", None) + decode_request.sampling_params.extra_args = extra_args + logger.debug("Decode request: %s", decode_request) + async for ( + decode_response + ) in await self.decode_worker_client.round_robin( + decode_request.model_dump_json() + ): + output = MyRequestOutput.model_validate_json(decode_response.data()) yield MyRequestOutput( - request_id=response.request_id, - prompt=response.prompt, - prompt_token_ids=response.prompt_token_ids, - prompt_logprobs=response.prompt_logprobs, - outputs=response.outputs, - finished=response.finished, - metrics=response.metrics, - kv_transfer_params=response.kv_transfer_params, + request_id=output.request_id, + prompt=output.prompt, + prompt_token_ids=output.prompt_token_ids, + prompt_logprobs=output.prompt_logprobs, + outputs=output.outputs, + finished=output.finished, + metrics=output.metrics, + kv_transfer_params=output.kv_transfer_params, ).model_dump_json() - finally: - if num_loaded_images > 0: - self.image_loader.mark_consumed(num_loaded_images) + else: + async for response in gen: + logger.debug( + f"Response kv_transfer_params: {response.kv_transfer_params}" + ) + logger.debug( + f"length of expanded prompt ids: {len(response.prompt_token_ids)}" + ) + # logger.info(f"Response outputs: {response.outputs}") + yield MyRequestOutput( + request_id=response.request_id, + prompt=response.prompt, + prompt_token_ids=response.prompt_token_ids, + prompt_logprobs=response.prompt_logprobs, + outputs=response.outputs, + finished=response.finished, + metrics=response.metrics, + kv_transfer_params=response.kv_transfer_params, + ).model_dump_json() + + self.image_loader.mark_consumed(num_loaded_images) From 013a77b69ea6fa8105a0bbc91b33fd094a08f097 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sat, 7 Feb 2026 19:42:44 -0500 Subject: [PATCH 14/21] fix --- components/src/dynamo/vllm/handlers.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index b6daf581294..e9072e12675 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1363,6 +1363,9 @@ async def _generate_token_mode(self, request, context, request_id): logger.warning("Initiating Dynamo Runtime shutdown.") self.runtime.shutdown() os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() async def _generate_text_mode(self, request, context, request_id): """Generate text using OpenAI-compatible format (text-in-text-out).""" @@ -1455,6 +1458,9 @@ async def _generate_text_mode(self, request, context, request_id): logger.warning("Initiating Dynamo Runtime shutdown.") self.runtime.shutdown() os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() class PrefillWorkerHandler(BaseWorkerHandler): From 2ec7adacdae18b512fb9baa3dfe2baa71bcfaf2b Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 22:58:35 -0500 Subject: [PATCH 15/21] upd --- components/src/dynamo/vllm/handlers.py | 284 +++++++++--------- .../encode_worker_handler.py | 2 +- .../vllm/multimodal_utils/image_loader.py | 43 +-- 3 files 
changed, 152 insertions(+), 177 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index e9072e12675..4f31373fd4c 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1286,86 +1286,87 @@ async def _generate_token_mode(self, request, context, request_id): # Extract and decode multimodal data if present multi_modal_data = await self._extract_multimodal_data(request) - # Build prompt from request (handles both prompt_embeds and token_ids) - prompt, embedding_sequence_length, error = self._build_prompt_from_request( - request, request_id, multi_modal_data - ) - if error is not None: - yield error - return - - # Build sampling params from request - sampling_params = build_sampling_params( - request, self.default_sampling_params, self.model_max_len - ) - - prefill_result = request.get("prefill_result") - if prefill_result and isinstance(prefill_result, dict): - kv_params = prefill_result.get("disaggregated_params", {}).get( - "kv_transfer_params" + try: + # Build prompt from request (handles both prompt_embeds and token_ids) + prompt, embedding_sequence_length, error = self._build_prompt_from_request( + request, request_id, multi_modal_data ) - else: - kv_params = None + if error is not None: + yield error + return - if kv_params is not None: - if sampling_params.extra_args is None: - sampling_params.extra_args = {} - sampling_params.extra_args["kv_transfer_params"] = kv_params - logger.debug( - f"Using disaggregated params from prefill for request {request_id}" + # Build sampling params from request + sampling_params = build_sampling_params( + request, self.default_sampling_params, self.model_max_len ) - prefill_prompt_tokens_details = ( - prefill_result.get("prompt_tokens_details") if prefill_result else None - ) - - # Extract LoRA request if present - # Check if model name matches a loaded LoRA adapter - lora_request = None - model_name = request.get("model") - if model_name and model_name in self.lora_id_for_name: - lora_id = self.lora_id_for_name[model_name] - lora_request = LoRARequest( - lora_name=model_name, - lora_int_id=lora_id, - lora_path=self.lora_name_to_path[model_name], - ) - logger.info( - f"Decode request {request_id} will use LoRA adapter: {model_name} (ID: {lora_id})" - ) - else: - logger.debug( - f"Decode request {request_id} has no LoRA specified (model: {model_name})" + prefill_result = request.get("prefill_result") + if prefill_result and isinstance(prefill_result, dict): + kv_params = prefill_result.get("disaggregated_params", {}).get( + "kv_transfer_params" + ) + else: + kv_params = None + + if kv_params is not None: + if sampling_params.extra_args is None: + sampling_params.extra_args = {} + sampling_params.extra_args["kv_transfer_params"] = kv_params + logger.debug( + f"Using disaggregated params from prefill for request {request_id}" + ) + prefill_prompt_tokens_details = ( + prefill_result.get("prompt_tokens_details") if prefill_result else None ) - dp_rank = request.get("dp_rank", None) + # Extract LoRA request if present + # Check if model name matches a loaded LoRA adapter + lora_request = None + model_name = request.get("model") + + if model_name and model_name in self.lora_id_for_name: + lora_id = self.lora_id_for_name[model_name] + lora_request = LoRARequest( + lora_name=model_name, + lora_int_id=lora_id, + lora_path=self.lora_name_to_path[model_name], + ) + logger.info( + f"Decode request {request_id} will use LoRA adapter: {model_name} (ID: {lora_id})" + ) + 
else: + logger.debug( + f"Decode request {request_id} has no LoRA specified (model: {model_name})" + ) - trace_headers = build_trace_headers(context) + dp_rank = request.get("dp_rank", None) - async with self._abort_monitor(context, request_id): - try: - async for tok in self.generate_tokens( - prompt, - sampling_params, - request_id, - data_parallel_rank=dp_rank, - lora_request=lora_request, - embedding_sequence_length=embedding_sequence_length, - trace_headers=trace_headers, - ): - if prefill_result is not None and "completion_usage" in tok: - tok["completion_usage"][ - "prompt_tokens_details" - ] = prefill_prompt_tokens_details - yield tok - except EngineDeadError as e: - logger.error(f"vLLM EngineDeadError: {e}") - logger.warning("Initiating Dynamo Runtime shutdown.") - self.runtime.shutdown() - os._exit(1) - finally: - if multi_modal_data is not None: - self.image_loader.mark_consumed() + trace_headers = build_trace_headers(context) + + async with self._abort_monitor(context, request_id): + try: + async for tok in self.generate_tokens( + prompt, + sampling_params, + request_id, + data_parallel_rank=dp_rank, + lora_request=lora_request, + embedding_sequence_length=embedding_sequence_length, + trace_headers=trace_headers, + ): + if prefill_result is not None and "completion_usage" in tok: + tok["completion_usage"][ + "prompt_tokens_details" + ] = prefill_prompt_tokens_details + yield tok + except EngineDeadError as e: + logger.error(f"vLLM EngineDeadError: {e}") + logger.warning("Initiating Dynamo Runtime shutdown.") + self.runtime.shutdown() + os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() async def _generate_text_mode(self, request, context, request_id): """Generate text using OpenAI-compatible format (text-in-text-out).""" @@ -1378,89 +1379,90 @@ async def _generate_text_mode(self, request, context, request_id): # Extract multimodal data multi_modal_data = await self._extract_multimodal_from_openai_messages(request) - # Build prompt for vLLM - if isinstance(input_data, list): - prompt = TokensPrompt( - prompt_token_ids=input_data, multi_modal_data=multi_modal_data + try: + # Build prompt for vLLM + if isinstance(input_data, list): + prompt = TokensPrompt( + prompt_token_ids=input_data, multi_modal_data=multi_modal_data + ) + else: + prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) + + # Build sampling params from OpenAI-style request + sampling_params = build_sampling_params_openai( + request, self.default_sampling_params, self.model_max_len ) - else: - prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) - # Build sampling params from OpenAI-style request - sampling_params = build_sampling_params_openai( - request, self.default_sampling_params, self.model_max_len - ) + dp_rank = request.get("dp_rank", None) + openai_request_id = request.get("id") or request.get("request_id", request_id) + previous_text = "" - dp_rank = request.get("dp_rank", None) - openai_request_id = request.get("id") or request.get("request_id", request_id) - previous_text = "" + trace_headers = build_trace_headers(context) - trace_headers = build_trace_headers(context) + async with self._abort_monitor(context, request_id): + try: + gen = self.engine_client.generate( + prompt, + sampling_params, + request_id, + data_parallel_rank=dp_rank, + trace_headers=trace_headers, + ) - async with self._abort_monitor(context, request_id): - try: - gen = self.engine_client.generate( - prompt, - sampling_params, - request_id, - 
data_parallel_rank=dp_rank, - trace_headers=trace_headers, - ) + async for res in gen: + if not res.outputs: + yield { + "id": openai_request_id, + "created": int(time.time()), + "object": "chat.completion.chunk", + "model": "unknown", + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": ""}, + "finish_reason": "error", + } + ], + } + break + + output = res.outputs[0] + # Calculate the delta text (new text since last chunk) + delta_text = output.text[len(previous_text) :] + previous_text = output.text + + choice_data = { + "index": 0, + "delta": { + "role": "assistant", + "content": delta_text, + }, + "finish_reason": output.finish_reason, + } - async for res in gen: - if not res.outputs: - yield { + chunk = { "id": openai_request_id, "created": int(time.time()), "object": "chat.completion.chunk", "model": "unknown", - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "content": ""}, - "finish_reason": "error", - } - ], + "choices": [choice_data], } - break - - output = res.outputs[0] - # Calculate the delta text (new text since last chunk) - delta_text = output.text[len(previous_text) :] - previous_text = output.text - - choice_data = { - "index": 0, - "delta": { - "role": "assistant", - "content": delta_text, - }, - "finish_reason": output.finish_reason, - } - chunk = { - "id": openai_request_id, - "created": int(time.time()), - "object": "chat.completion.chunk", - "model": "unknown", - "choices": [choice_data], - } - - if output.finish_reason: - chunk["usage"] = BaseWorkerHandler._build_completion_usage( - request_output=res, - ) + if output.finish_reason: + chunk["usage"] = BaseWorkerHandler._build_completion_usage( + request_output=res, + ) - yield chunk + yield chunk - except EngineDeadError as e: - logger.error(f"vLLM EngineDeadError: {e}") - logger.warning("Initiating Dynamo Runtime shutdown.") - self.runtime.shutdown() - os._exit(1) - finally: - if multi_modal_data is not None: - self.image_loader.mark_consumed() + except EngineDeadError as e: + logger.error(f"vLLM EngineDeadError: {e}") + logger.warning("Initiating Dynamo Runtime shutdown.") + self.runtime.shutdown() + os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() class PrefillWorkerHandler(BaseWorkerHandler): diff --git a/components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py b/components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py index 2348e8a838a..782bf69ba99 100644 --- a/components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py +++ b/components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py @@ -58,7 +58,7 @@ def __init__( self.engine_args = engine_args self.model = self.engine_args.model - self.image_loader = ImageLoader(cache_size=CACHE_SIZE_MAXIMUM) + self.image_loader = ImageLoader() self.image_processor = AutoImageProcessor.from_pretrained( self.model, trust_remote_code=True ) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index f9ae7614a72..0e86b40e07d 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -68,12 +68,10 @@ def get_decoder(): class ImageLoader: - CACHE_SIZE_MAXIMUM = 8 DEFAULT_MAX_PENDING = 64 def __init__( self, - cache_size: int = CACHE_SIZE_MAXIMUM, http_timeout: float = 30.0, use_nvimgcodec: bool = True, image_mode: str = "RGB", @@ -83,7 +81,6 @@ def __init__( Initialize the 
ImageLoader. Args: - cache_size: Maximum number of images to cache http_timeout: Timeout for HTTP requests use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding (returns 4D torch.Tensor). If False, use PIL (returns Image.Image) @@ -96,8 +93,6 @@ def __init__( self._http_timeout = http_timeout self._use_nvimgcodec = use_nvimgcodec self._image_mode = image_mode - self._image_cache: dict[str, ImageOutput] = {} - self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) if max_pending is None: max_pending = int( @@ -219,23 +214,14 @@ async def load_image(self, image_url: str) -> ImageOutput: Returns: torch.Tensor in NCHW format (if use_nvimgcodec=True) or PIL Image """ - parsed_url = urlparse(image_url) + # Fetch image bytes (before acquiring semaphore so fetch errors don't leak) + image_bytes = await self._fetch_image_bytes(image_url) - # For HTTP(S) URLs, check cache first - if parsed_url.scheme in ("http", "https"): - image_url_lower = image_url.lower() - if image_url_lower in self._image_cache: - logger.debug(f"Image found in cache for URL: {image_url}") - return self._image_cache[image_url_lower] + # Wait if too many decoded images are pending in the vLLM scheduler. + # Released when the caller invokes mark_consumed() after prefill. + await self._pending_semaphore.acquire() try: - # Fetch image bytes - image_bytes = await self._fetch_image_bytes(image_url) - - # Wait if too many decoded images are pending in the vLLM scheduler. - # Released when the caller invokes mark_consumed() after prefill. - await self._pending_semaphore.acquire() - # Decode the image using thread pool to avoid blocking event loop loop = asyncio.get_running_loop() if self._use_nvimgcodec: @@ -250,22 +236,9 @@ async def load_image(self, image_url: str) -> ImageOutput: _decode_thread_pool, self._decode_with_pil, image_bytes ) - # Cache HTTP(S) URLs - if parsed_url.scheme in ("http", "https"): - image_url_lower = image_url.lower() - # Cache the image for future use, and evict the oldest image if full - if self._cache_queue.full(): - oldest_image_url = await self._cache_queue.get() - del self._image_cache[oldest_image_url] - - self._image_cache[image_url_lower] = image_result - await self._cache_queue.put(image_url_lower) - return image_result - except httpx.HTTPError as e: - logger.error(f"HTTP error loading image: {e}") + except Exception: + # Release semaphore on decode failure to prevent leak + self._pending_semaphore.release() raise - except Exception as e: - logger.error(f"Error loading image: {e}") - raise ValueError(f"Failed to load image: {e}") From f0c7a372b560c302bf5aa38772caf026d2bdd2d1 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:00:16 -0500 Subject: [PATCH 16/21] fix --- components/src/dynamo/vllm/handlers.py | 8 ++++++-- .../src/dynamo/vllm/multimodal_utils/image_loader.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index 4f31373fd4c..2dfb1e4eb9f 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1386,7 +1386,9 @@ async def _generate_text_mode(self, request, context, request_id): prompt_token_ids=input_data, multi_modal_data=multi_modal_data ) else: - prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) + prompt = TextPrompt( + prompt=input_data, multi_modal_data=multi_modal_data + ) # Build sampling params from OpenAI-style request sampling_params = build_sampling_params_openai( @@ 
-1394,7 +1396,9 @@ async def _generate_text_mode(self, request, context, request_id): ) dp_rank = request.get("dp_rank", None) - openai_request_id = request.get("id") or request.get("request_id", request_id) + openai_request_id = request.get("id") or request.get( + "request_id", request_id + ) previous_text = "" trace_headers = build_trace_headers(context) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index 0e86b40e07d..97af076ca98 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -22,7 +22,6 @@ from typing import TypeAlias, Union from urllib.parse import urlparse -import httpx import pybase64 import torch from PIL import Image @@ -54,6 +53,7 @@ def _get_nvimgcodec(): global _nvimgcodec if _nvimgcodec is None: from nvidia import nvimgcodec + _nvimgcodec = nvimgcodec return _nvimgcodec From 9ee90d7491385db1cc28e743d57fc96239bc9e5f Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:12:17 -0500 Subject: [PATCH 17/21] fix --- components/src/dynamo/vllm/handlers.py | 31 -------------------------- 1 file changed, 31 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index 96fc5aff54b..2dfb1e4eb9f 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1378,7 +1378,6 @@ async def _generate_text_mode(self, request, context, request_id): # Extract multimodal data multi_modal_data = await self._extract_multimodal_from_openai_messages(request) -<<<<<<< HEAD try: # Build prompt for vLLM @@ -1389,36 +1388,6 @@ async def _generate_text_mode(self, request, context, request_id): else: prompt = TextPrompt( prompt=input_data, multi_modal_data=multi_modal_data -======= - - # Build prompt for vLLM - if isinstance(input_data, list): - prompt = TokensPrompt( - prompt_token_ids=input_data, multi_modal_data=multi_modal_data - ) - else: - prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) - - # Build sampling params from OpenAI-style request - sampling_params = build_sampling_params_openai( - request, self.default_sampling_params, self.model_max_len - ) - - dp_rank = request.get("dp_rank", None) - openai_request_id = request.get("id") or request.get("request_id", request_id) - previous_text = "" - - trace_headers = build_trace_headers(context) - - async with self._abort_monitor(context, request_id): - try: - gen = self.engine_client.generate( - prompt, - sampling_params, - request_id, - data_parallel_rank=dp_rank, - trace_headers=trace_headers, ->>>>>>> 876e6d62c943608a6f42cb20805b82b544bcedd5 ) # Build sampling params from OpenAI-style request From b7f344b7b210bba34e7ea1ce6aab8bf378a79c1c Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:17:43 -0500 Subject: [PATCH 18/21] clean --- components/src/dynamo/vllm/handlers.py | 292 ++++++++++++------------- 1 file changed, 145 insertions(+), 147 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index 2dfb1e4eb9f..a1b315b396d 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1286,87 +1286,86 @@ async def _generate_token_mode(self, request, context, request_id): # Extract and decode multimodal data if present multi_modal_data = await self._extract_multimodal_data(request) - try: - # Build prompt from request (handles both prompt_embeds 
and token_ids) - prompt, embedding_sequence_length, error = self._build_prompt_from_request( - request, request_id, multi_modal_data - ) - if error is not None: - yield error - return + # Build prompt from request (handles both prompt_embeds and token_ids) + prompt, embedding_sequence_length, error = self._build_prompt_from_request( + request, request_id, multi_modal_data + ) + if error is not None: + yield error + return + + # Build sampling params from request + sampling_params = build_sampling_params( + request, self.default_sampling_params, self.model_max_len + ) - # Build sampling params from request - sampling_params = build_sampling_params( - request, self.default_sampling_params, self.model_max_len + prefill_result = request.get("prefill_result") + if prefill_result and isinstance(prefill_result, dict): + kv_params = prefill_result.get("disaggregated_params", {}).get( + "kv_transfer_params" ) + else: + kv_params = None - prefill_result = request.get("prefill_result") - if prefill_result and isinstance(prefill_result, dict): - kv_params = prefill_result.get("disaggregated_params", {}).get( - "kv_transfer_params" - ) - else: - kv_params = None - - if kv_params is not None: - if sampling_params.extra_args is None: - sampling_params.extra_args = {} - sampling_params.extra_args["kv_transfer_params"] = kv_params - logger.debug( - f"Using disaggregated params from prefill for request {request_id}" - ) - prefill_prompt_tokens_details = ( - prefill_result.get("prompt_tokens_details") if prefill_result else None + if kv_params is not None: + if sampling_params.extra_args is None: + sampling_params.extra_args = {} + sampling_params.extra_args["kv_transfer_params"] = kv_params + logger.debug( + f"Using disaggregated params from prefill for request {request_id}" ) + prefill_prompt_tokens_details = ( + prefill_result.get("prompt_tokens_details") if prefill_result else None + ) - # Extract LoRA request if present - # Check if model name matches a loaded LoRA adapter - lora_request = None - model_name = request.get("model") - - if model_name and model_name in self.lora_id_for_name: - lora_id = self.lora_id_for_name[model_name] - lora_request = LoRARequest( - lora_name=model_name, - lora_int_id=lora_id, - lora_path=self.lora_name_to_path[model_name], - ) - logger.info( - f"Decode request {request_id} will use LoRA adapter: {model_name} (ID: {lora_id})" - ) - else: - logger.debug( - f"Decode request {request_id} has no LoRA specified (model: {model_name})" - ) + # Extract LoRA request if present + # Check if model name matches a loaded LoRA adapter + lora_request = None + model_name = request.get("model") - dp_rank = request.get("dp_rank", None) + if model_name and model_name in self.lora_id_for_name: + lora_id = self.lora_id_for_name[model_name] + lora_request = LoRARequest( + lora_name=model_name, + lora_int_id=lora_id, + lora_path=self.lora_name_to_path[model_name], + ) + logger.info( + f"Decode request {request_id} will use LoRA adapter: {model_name} (ID: {lora_id})" + ) + else: + logger.debug( + f"Decode request {request_id} has no LoRA specified (model: {model_name})" + ) - trace_headers = build_trace_headers(context) + dp_rank = request.get("dp_rank", None) - async with self._abort_monitor(context, request_id): - try: - async for tok in self.generate_tokens( - prompt, - sampling_params, - request_id, - data_parallel_rank=dp_rank, - lora_request=lora_request, - embedding_sequence_length=embedding_sequence_length, - trace_headers=trace_headers, - ): - if prefill_result is not None and 
"completion_usage" in tok: - tok["completion_usage"][ - "prompt_tokens_details" - ] = prefill_prompt_tokens_details - yield tok - except EngineDeadError as e: - logger.error(f"vLLM EngineDeadError: {e}") - logger.warning("Initiating Dynamo Runtime shutdown.") - self.runtime.shutdown() - os._exit(1) - finally: - if multi_modal_data is not None: - self.image_loader.mark_consumed() + trace_headers = build_trace_headers(context) + + async with self._abort_monitor(context, request_id): + try: + async for tok in self.generate_tokens( + prompt, + sampling_params, + request_id, + data_parallel_rank=dp_rank, + lora_request=lora_request, + embedding_sequence_length=embedding_sequence_length, + trace_headers=trace_headers, + ): + if prefill_result is not None and "completion_usage" in tok: + tok["completion_usage"][ + "prompt_tokens_details" + ] = prefill_prompt_tokens_details + yield tok + except EngineDeadError as e: + logger.error(f"vLLM EngineDeadError: {e}") + logger.warning("Initiating Dynamo Runtime shutdown.") + self.runtime.shutdown() + os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() async def _generate_text_mode(self, request, context, request_id): """Generate text using OpenAI-compatible format (text-in-text-out).""" @@ -1379,94 +1378,93 @@ async def _generate_text_mode(self, request, context, request_id): # Extract multimodal data multi_modal_data = await self._extract_multimodal_from_openai_messages(request) - try: - # Build prompt for vLLM - if isinstance(input_data, list): - prompt = TokensPrompt( - prompt_token_ids=input_data, multi_modal_data=multi_modal_data - ) - else: - prompt = TextPrompt( - prompt=input_data, multi_modal_data=multi_modal_data - ) - - # Build sampling params from OpenAI-style request - sampling_params = build_sampling_params_openai( - request, self.default_sampling_params, self.model_max_len + # Build prompt for vLLM + if isinstance(input_data, list): + prompt = TokensPrompt( + prompt_token_ids=input_data, multi_modal_data=multi_modal_data ) - - dp_rank = request.get("dp_rank", None) - openai_request_id = request.get("id") or request.get( - "request_id", request_id + else: + prompt = TextPrompt( + prompt=input_data, multi_modal_data=multi_modal_data ) - previous_text = "" - trace_headers = build_trace_headers(context) + # Build sampling params from OpenAI-style request + sampling_params = build_sampling_params_openai( + request, self.default_sampling_params, self.model_max_len + ) - async with self._abort_monitor(context, request_id): - try: - gen = self.engine_client.generate( - prompt, - sampling_params, - request_id, - data_parallel_rank=dp_rank, - trace_headers=trace_headers, - ) + dp_rank = request.get("dp_rank", None) + openai_request_id = request.get("id") or request.get( + "request_id", request_id + ) + previous_text = "" - async for res in gen: - if not res.outputs: - yield { - "id": openai_request_id, - "created": int(time.time()), - "object": "chat.completion.chunk", - "model": "unknown", - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "content": ""}, - "finish_reason": "error", - } - ], - } - break - - output = res.outputs[0] - # Calculate the delta text (new text since last chunk) - delta_text = output.text[len(previous_text) :] - previous_text = output.text - - choice_data = { - "index": 0, - "delta": { - "role": "assistant", - "content": delta_text, - }, - "finish_reason": output.finish_reason, - } + trace_headers = build_trace_headers(context) - chunk = { + async with 
self._abort_monitor(context, request_id): + try: + gen = self.engine_client.generate( + prompt, + sampling_params, + request_id, + data_parallel_rank=dp_rank, + trace_headers=trace_headers, + ) + + async for res in gen: + if not res.outputs: + yield { "id": openai_request_id, "created": int(time.time()), "object": "chat.completion.chunk", "model": "unknown", - "choices": [choice_data], + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": ""}, + "finish_reason": "error", + } + ], } + break - if output.finish_reason: - chunk["usage"] = BaseWorkerHandler._build_completion_usage( - request_output=res, - ) + output = res.outputs[0] + # Calculate the delta text (new text since last chunk) + delta_text = output.text[len(previous_text) :] + previous_text = output.text + + choice_data = { + "index": 0, + "delta": { + "role": "assistant", + "content": delta_text, + }, + "finish_reason": output.finish_reason, + } - yield chunk + chunk = { + "id": openai_request_id, + "created": int(time.time()), + "object": "chat.completion.chunk", + "model": "unknown", + "choices": [choice_data], + } - except EngineDeadError as e: - logger.error(f"vLLM EngineDeadError: {e}") - logger.warning("Initiating Dynamo Runtime shutdown.") - self.runtime.shutdown() - os._exit(1) - finally: - if multi_modal_data is not None: - self.image_loader.mark_consumed() + if output.finish_reason: + chunk["usage"] = BaseWorkerHandler._build_completion_usage( + request_output=res, + ) + + yield chunk + + except EngineDeadError as e: + logger.error(f"vLLM EngineDeadError: {e}") + logger.warning("Initiating Dynamo Runtime shutdown.") + self.runtime.shutdown() + os._exit(1) + finally: + if multi_modal_data is not None: + self.image_loader.mark_consumed() class PrefillWorkerHandler(BaseWorkerHandler): From a85101c62c23748726acd15cf40e89bccb7dbb77 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:18:55 -0500 Subject: [PATCH 19/21] format --- components/src/dynamo/vllm/handlers.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index a1b315b396d..e9072e12675 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1384,9 +1384,7 @@ async def _generate_text_mode(self, request, context, request_id): prompt_token_ids=input_data, multi_modal_data=multi_modal_data ) else: - prompt = TextPrompt( - prompt=input_data, multi_modal_data=multi_modal_data - ) + prompt = TextPrompt(prompt=input_data, multi_modal_data=multi_modal_data) # Build sampling params from OpenAI-style request sampling_params = build_sampling_params_openai( @@ -1394,9 +1392,7 @@ async def _generate_text_mode(self, request, context, request_id): ) dp_rank = request.get("dp_rank", None) - openai_request_id = request.get("id") or request.get( - "request_id", request_id - ) + openai_request_id = request.get("id") or request.get("request_id", request_id) previous_text = "" trace_headers = build_trace_headers(context) From c97184b57a099996a917a49197d8222bcf8d3783 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:24:43 -0500 Subject: [PATCH 20/21] clean --- components/src/dynamo/vllm/handlers.py | 13 +- .../vllm/multimodal_utils/image_loader.py | 203 ++++++++---------- 2 files changed, 99 insertions(+), 117 deletions(-) diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index e9072e12675..884608f7e1d 100644 --- 
a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -1365,7 +1365,9 @@ async def _generate_token_mode(self, request, context, request_id): os._exit(1) finally: if multi_modal_data is not None: - self.image_loader.mark_consumed() + images = multi_modal_data.get("image") + count = len(images) if isinstance(images, list) else 1 + self.image_loader.mark_consumed(count) async def _generate_text_mode(self, request, context, request_id): """Generate text using OpenAI-compatible format (text-in-text-out).""" @@ -1460,7 +1462,9 @@ async def _generate_text_mode(self, request, context, request_id): os._exit(1) finally: if multi_modal_data is not None: - self.image_loader.mark_consumed() + images = multi_modal_data.get("image") + count = len(images) if isinstance(images, list) else 1 + self.image_loader.mark_consumed(count) class PrefillWorkerHandler(BaseWorkerHandler): @@ -1614,3 +1618,8 @@ async def _generate_token_mode(self, request, context, request_id): raise GeneratorExit( "Prefill engine was shut down during token generation" ) from None + finally: + if multi_modal_data is not None: + images = multi_modal_data.get("image") + count = len(images) if isinstance(images, list) else 1 + self.image_loader.mark_consumed(count) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index a04e9f9f037..01a817ad495 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -14,6 +14,8 @@ # limitations under the License. import asyncio +import base64 +import binascii import logging import os import threading @@ -22,7 +24,7 @@ from typing import TypeAlias, Union from urllib.parse import urlparse -import pybase64 +import httpx import torch from PIL import Image @@ -40,7 +42,7 @@ _nvimgcodec = None _nvimgcodec_available: bool | None = None # None = not yet probed -# Global thread pool for image decoding operations +# Global thread pool for nvimgcodec decoding operations # Default to 8 workers, configurable via DYN_IMAGE_DECODE_WORKERS env var _IMAGE_DECODE_WORKERS = int(os.environ.get("DYN_IMAGE_DECODE_WORKERS", 8)) _decode_thread_pool = ThreadPoolExecutor( @@ -81,31 +83,19 @@ def get_decoder(): class ImageLoader: + CACHE_SIZE_MAXIMUM = 8 DEFAULT_MAX_PENDING = 64 def __init__( self, + cache_size: int = CACHE_SIZE_MAXIMUM, http_timeout: float = 30.0, use_nvimgcodec: bool = True, - image_mode: str = "RGB", max_pending: int | None = None, ): - """ - Initialize the ImageLoader. - - Args: - http_timeout: Timeout for HTTP requests - use_nvimgcodec: If True, use nvimgcodec for GPU-accelerated decoding - (returns 4D torch.Tensor). If False, use PIL (returns Image.Image). - Falls back to PIL automatically if nvimgcodec is not installed. - image_mode: Target image mode for PIL conversion (default: "RGB") - max_pending: Maximum number of decoded images waiting for the vLLM - scheduler to consume them. Decode will block if this - limit is reached. Defaults to DYN_IMAGE_MAX_PENDING - env var, or 64. 
- """ self._http_timeout = http_timeout - self._image_mode = image_mode + self._image_cache: dict[str, Image.Image] = {} + self._cache_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=cache_size) # Fall back to PIL if nvimgcodec was requested but is not installed if use_nvimgcodec and not _is_nvimgcodec_available(): @@ -138,9 +128,6 @@ def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: """ Decode image bytes using nvimgcodec for GPU-accelerated decoding. - Args: - data: Raw image bytes - Returns: torch.Tensor in NCHW format (4D) on CUDA device. Shape: (1, C, H, W) - batch dimension added so vLLM treats it as @@ -162,105 +149,91 @@ def _decode_with_nvimgcodec(self, data: bytes) -> torch.Tensor: return tensor - def _decode_with_pil(self, data: bytes) -> Image.Image: - """ - Decode image bytes using PIL. - - Args: - data: Raw image bytes - - Returns: - PIL Image converted to the target image mode - """ - image = Image.open(BytesIO(data)) - - # Validate image format - if image.format not in ("JPEG", "PNG", "WEBP", "GIF"): - raise ValueError(f"Unsupported image format: {image.format}") - - # Convert to target mode - if image.mode != self._image_mode: - image = image.convert(self._image_mode) - - return image - - async def _fetch_image_bytes(self, image_url: str) -> bytes: - """ - Fetch image bytes from a URL or data URI. - - Args: - image_url: URL (http/https) or data URI (data:image/...;base64,...) - - Returns: - Raw image bytes - """ - parsed_url = urlparse(image_url) - - if parsed_url.scheme == "data": - # Parse data URL format: data:[][;base64], - if not parsed_url.path.startswith("image/"): - raise ValueError("Data URL must be an image type") - - # Split the path into media type and data - media_type, data = parsed_url.path.split(",", 1) - if ";base64" not in media_type: - raise ValueError("Data URL must be base64 encoded") - - try: - # Use pybase64 for faster base64 decoding - return pybase64.b64decode(data, validate=True) - except Exception as e: - raise ValueError(f"Invalid base64 encoding: {e}") - - elif parsed_url.scheme in ("http", "https"): - http_client = get_http_client(self._http_timeout) - - response = await http_client.get(image_url) - response.raise_for_status() - - if not response.content: - raise ValueError("Empty response content from image URL") - - return response.content - - else: - raise ValueError(f"Invalid image source scheme: {parsed_url.scheme}") - async def load_image(self, image_url: str) -> ImageOutput: - """ - Load an image from a URL or data URI. - - Args: - image_url: URL (http/https) or data URI (data:image/...;base64,...) - - Returns: - torch.Tensor in NCHW format (if use_nvimgcodec=True) or PIL Image - """ - # Fetch image bytes (before acquiring semaphore so fetch errors don't leak) - image_bytes = await self._fetch_image_bytes(image_url) + """Load an image from a URL or data URI.""" + parsed_url = urlparse(image_url) - # Wait if too many decoded images are pending in the vLLM scheduler. - # Released when the caller invokes mark_consumed() after prefill. 
- await self._pending_semaphore.acquire() + # For HTTP(S) URLs, check cache first (PIL path only) + if not self._use_nvimgcodec and parsed_url.scheme in ("http", "https"): + image_url_lower = image_url.lower() + if image_url_lower in self._image_cache: + logger.debug(f"Image found in cache for URL: {image_url}") + return self._image_cache[image_url_lower] try: - # Decode the image using thread pool to avoid blocking event loop - loop = asyncio.get_running_loop() - if self._use_nvimgcodec: - # nvimgcodec decoding (GPU-accelerated, returns 4D tensor) - # Offload to thread pool to avoid blocking the event loop - image_result = await loop.run_in_executor( - _decode_thread_pool, self._decode_with_nvimgcodec, image_bytes - ) + if parsed_url.scheme == "data": + # Parse data URL format: data:[][;base64], + if not parsed_url.path.startswith("image/"): + raise ValueError("Data URL must be an image type") + + # Split the path into media type and data + media_type, data = parsed_url.path.split(",", 1) + if ";base64" not in media_type: + raise ValueError("Data URL must be base64 encoded") + + try: + image_bytes = base64.b64decode(data) + except binascii.Error as e: + raise ValueError(f"Invalid base64 encoding: {e}") + elif parsed_url.scheme in ("http", "https"): + http_client = get_http_client(self._http_timeout) + + response = await http_client.get(image_url) + response.raise_for_status() + + if not response.content: + raise ValueError("Empty response content from image URL") + + image_bytes = response.content else: - # PIL decoding (CPU-bound, offload to thread pool) - image_result = await loop.run_in_executor( - _decode_thread_pool, self._decode_with_pil, image_bytes - ) + raise ValueError(f"Invalid image source scheme: {parsed_url.scheme}") - return image_result + # Wait if too many decoded images are pending in the vLLM scheduler. + # Released when the caller invokes mark_consumed() after prefill. 
+ await self._pending_semaphore.acquire() - except Exception: - # Release semaphore on decode failure to prevent leak - self._pending_semaphore.release() + try: + if self._use_nvimgcodec: + # nvimgcodec decoding (GPU-accelerated, returns 4D tensor) + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + _decode_thread_pool, + self._decode_with_nvimgcodec, + image_bytes, + ) + else: + # Original PIL path + image_data = BytesIO(image_bytes) + image = await asyncio.to_thread(Image.open, image_data) + + # Validate image format and convert to RGB + if image.format not in ("JPEG", "PNG", "WEBP"): + raise ValueError( + f"Unsupported image format: {image.format}" + ) + + image_converted = image.convert("RGB") + + # Cache HTTP(S) URLs + if parsed_url.scheme in ("http", "https"): + image_url_lower = image_url.lower() + if self._cache_queue.full(): + oldest_image_url = await self._cache_queue.get() + del self._image_cache[oldest_image_url] + + self._image_cache[image_url_lower] = image_converted + await self._cache_queue.put(image_url_lower) + + return image_converted + + except Exception: + # Release semaphore on decode failure to prevent leak + self._pending_semaphore.release() + raise + + except httpx.HTTPError as e: + logger.error(f"HTTP error loading image: {e}") raise + except Exception as e: + logger.error(f"Error loading image: {e}") + raise ValueError(f"Failed to load image: {e}") From d7e326f321c01869c1e6aa9bb54ed4ed4d06c085 Mon Sep 17 00:00:00 2001 From: Qidong Su Date: Sun, 8 Feb 2026 23:25:37 -0500 Subject: [PATCH 21/21] upd --- components/src/dynamo/vllm/multimodal_utils/image_loader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/components/src/dynamo/vllm/multimodal_utils/image_loader.py b/components/src/dynamo/vllm/multimodal_utils/image_loader.py index 01a817ad495..0e3c68f5b8f 100644 --- a/components/src/dynamo/vllm/multimodal_utils/image_loader.py +++ b/components/src/dynamo/vllm/multimodal_utils/image_loader.py @@ -208,9 +208,7 @@ async def load_image(self, image_url: str) -> ImageOutput: # Validate image format and convert to RGB if image.format not in ("JPEG", "PNG", "WEBP"): - raise ValueError( - f"Unsupported image format: {image.format}" - ) + raise ValueError(f"Unsupported image format: {image.format}") image_converted = image.convert("RGB")
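
For reference, the back-pressure pattern these later patches introduce works as follows: ImageLoader.load_image() acquires a slot on a pending-image semaphore once the image bytes are available (and releases the slot again if decoding fails), while the worker handlers hand the slot(s) back via mark_consumed(count) in their finally blocks once the engine has consumed the multimodal data; the default cap is DEFAULT_MAX_PENDING = 64. The standalone Python sketch below illustrates only that pattern. DemoImageLoader, its placeholder fetch/decode strings, and the example URLs are illustrative stand-ins, not the actual ImageLoader API.

import asyncio


class DemoImageLoader:
    """Illustrative stand-in for the pending-image back-pressure behaviour."""

    def __init__(self, max_pending: int = 2) -> None:
        # One slot per decoded image that the engine has not yet consumed.
        self._pending = asyncio.Semaphore(max_pending)

    async def load_image(self, url: str) -> str:
        data = f"bytes-from:{url}"     # stands in for the HTTP / data-URL fetch
        await self._pending.acquire()  # blocks once max_pending images are outstanding
        try:
            return f"decoded:{data}"   # stands in for the nvimgcodec / PIL decode
        except Exception:
            self._pending.release()    # give the slot back if decoding fails
            raise

    def mark_consumed(self, count: int = 1) -> None:
        # Called by the handler after the engine has consumed the image(s);
        # releases `count` slots so further loads can proceed.
        for _ in range(count):
            self._pending.release()


async def main() -> None:
    loader = DemoImageLoader(max_pending=2)
    first = await loader.load_image("https://example.com/1.png")
    second = await loader.load_image("https://example.com/2.png")

    # Both slots are now held, so a third load blocks until mark_consumed() runs.
    try:
        await asyncio.wait_for(loader.load_image("https://example.com/3.png"), timeout=0.1)
    except asyncio.TimeoutError:
        print("third load_image() blocked, as intended")

    loader.mark_consumed(2)            # the handler consumed both images
    third = await loader.load_image("https://example.com/3.png")
    print(first, second, third)


if __name__ == "__main__":
    asyncio.run(main())

Run directly, the sketch shows the third load_image() call blocking until mark_consumed(2) returns the two outstanding slots, which is the behaviour the handlers rely on when they call mark_consumed() in their finally blocks.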