From 256114a92de1a52fdce5829b87626255b0fb0cd5 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:48:32 +0800 Subject: [PATCH 01/13] Enhance ColpaliEmbeddingModel with dynamic model loading Updated ColpaliEmbeddingModel to support dynamic model loading based on environment variable. Added support for ColIdefics3 architecture and adjusted batch sizes accordingly. --- core/embedding/colpali_embedding_model.py | 195 ++++++++-------------- 1 file changed, 69 insertions(+), 126 deletions(-) diff --git a/core/embedding/colpali_embedding_model.py b/core/embedding/colpali_embedding_model.py index bf9d22af..cff6d846 100644 --- a/core/embedding/colpali_embedding_model.py +++ b/core/embedding/colpali_embedding_model.py @@ -4,13 +4,21 @@ import time from contextvars import ContextVar from typing import Any, Dict, List, Tuple, Union - +import os import numpy as np import torch -from colpali_engine.models import ColQwen2_5, ColQwen2_5_Processor from PIL.Image import Image from PIL.Image import open as open_image +# --- V'S CHANGE: Import BOTH model architectures --- +from colpali_engine.models import ( + ColQwen2_5, + ColQwen2_5_Processor, + ColIdefics3, + ColIdefics3Processor +) +# --------------------------------------------------- + from core.config import get_settings from core.embedding.base_embedding_model import BaseEmbeddingModel from core.models.chunk import Chunk @@ -18,52 +26,63 @@ logger = logging.getLogger(__name__) - _INGEST_METRICS: ContextVar[Dict[str, Any]] = ContextVar("_colpali_ingest_metrics", default={}) - class ColpaliEmbeddingModel(BaseEmbeddingModel): def __init__(self): + self.settings = get_settings() + self.mode = self.settings.MODE + + # 1. Determine Device device = "mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"Initializing ColpaliEmbeddingModel with device: {device}") - start_time = time.time() - - # Enable TF32 for faster matmuls on Ampere+ GPUs (A10, A100, etc.) + + # 2. Configure Attention (Flash Attn 2 Check) + attn_implementation = "eager" if device == "cuda": torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True - logger.info("Enabled TF32 for CUDA matmul operations") - - attn_implementation = "eager" - if device == "cuda": if importlib.util.find_spec("flash_attn") is not None: attn_implementation = "flash_attention_2" - else: - logger.warning( - "flash_attn package not found; falling back to 'eager' attention. " - "Install flash-attn to enable FlashAttention2 on GPU." - ) + + # 3. 
Model Selector Logic + # You can add a specific setting for this, or default to standard ColPali + # Example: self.settings.COLPALI_MODEL_NAME or a default string + model_name = os.getenv("COLPALI_MODEL_NAME", "vidore/colpali-v1.2-merged") + + logger.info(f"Loading ColPali Model: {model_name}") + + # --- V'S CHANGE: Dynamic Loading based on Model Name --- + if "smol" in model_name.lower() or "idefics" in model_name.lower(): + # Use Idefics3 architecture for SmolVLM + self.model = ColIdefics3.from_pretrained( + model_name, + torch_dtype=torch.bfloat16, + device_map=device, + attn_implementation=attn_implementation, + ).eval() + self.processor = ColIdefics3Processor.from_pretrained(model_name, use_fast=True) + # SmolVLM is tiny, so we can boost batch size + self.batch_size = 32 if self.mode == "cloud" else 4 + + else: + # Default to Qwen2.5/Standard ColPali architecture + self.model = ColQwen2_5.from_pretrained( + model_name, + torch_dtype=torch.bfloat16, + device_map=device, + attn_implementation=attn_implementation, + ).eval() + self.processor = ColQwen2_5_Processor.from_pretrained(model_name, use_fast=True) + # Standard models are heavier, use conservative batch size + self.batch_size = 8 if self.mode == "cloud" else 1 + # ------------------------------------------------------- - self.model = ColQwen2_5.from_pretrained( - "tsystems/colqwen2.5-3b-multilingual-v1.0", - dtype=torch.bfloat16, # preferred kwarg per upstream deprecation notice - device_map=device, # Automatically detect and use available device - attn_implementation=attn_implementation, - ).eval() - self.processor: ColQwen2_5_Processor = ColQwen2_5_Processor.from_pretrained( - "tsystems/colqwen2.5-3b-multilingual-v1.0", - use_fast=True, - ) - self.settings = get_settings() - self.mode = self.settings.MODE self.device = device - # Set batch size based on mode - self.batch_size = 8 if self.mode == "cloud" else 1 - logger.info(f"Colpali running in mode: {self.mode} with batch size: {self.batch_size}") - total_init_time = time.time() - start_time - logger.info(f"Colpali initialization time: {total_init_time:.2f} seconds") + logger.info(f"Colpali initialized. Batch size: {self.batch_size}") async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[np.ndarray]: + # ... (Rest of the method remains exactly the same) ... job_start_time = time.time() if isinstance(chunks, Chunk): chunks = [chunks] @@ -87,15 +106,13 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n if isinstance(raw_bytes, (bytes, bytearray, memoryview)): image_bytes = bytes(raw_bytes) else: - # data_uri_to_bytes handles both data URIs and raw base64 image_bytes = data_uri_to_bytes(chunk.content) image = open_image(io.BytesIO(image_bytes)) - # Drop cached bytes once we've materialized the image to keep metadata lean chunk.metadata.pop("_image_bytes", None) image_items.append((index, image)) except Exception as e: logger.error(f"Error processing image chunk {index}: {str(e)}. 
Falling back to text.") - text_items.append((index, chunk.content)) # Fallback: treat content as text + text_items.append((index, chunk.content)) else: text_items.append((index, chunk.content)) @@ -105,10 +122,8 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n f"Found {len(image_items)} images and {len(text_items)} text chunks" ) - # Initialize results array to preserve order results: List[np.ndarray | None] = [None] * len(chunks) - # Process image batches if image_items: img_start = time.time() indices_to_process = [item[0] for item in image_items] @@ -117,31 +132,22 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n for i in range(0, len(images_to_process), self.batch_size): batch_indices = indices_to_process[i : i + self.batch_size] batch_images = images_to_process[i : i + self.batch_size] - logger.debug( - f"Processing image batch {i//self.batch_size + 1}/" - f"{(len(images_to_process)-1)//self.batch_size + 1} with {len(batch_images)} images" - ) + # ... Logging ... batch_start = time.time() batch_embeddings, batch_metrics = await self.generate_embeddings_batch_images(batch_images) image_process += batch_metrics["process"] image_model += batch_metrics["model"] image_convert += batch_metrics["convert"] image_total += batch_metrics["total"] - # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding - batch_time = time.time() - batch_start - logger.debug( - f"Image batch {i//self.batch_size + 1} processing took {batch_time:.2f}s " - f"({batch_time/len(batch_images):.2f}s per image)" - ) + # ... Logging ... img_time = time.time() - img_start - logger.info(f"All image embedding took {img_time:.2f}s ({img_time/len(images_to_process):.2f}s per image)") + logger.info(f"All image embedding took {img_time:.2f}s") else: image_process = image_model = image_convert = image_total = 0.0 img_time = 0.0 - # Process text batches if text_items: text_start = time.time() indices_to_process = [item[0] for item in text_items] @@ -150,79 +156,34 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n for i in range(0, len(texts_to_process), self.batch_size): batch_indices = indices_to_process[i : i + self.batch_size] batch_texts = texts_to_process[i : i + self.batch_size] - logger.debug( - f"Processing text batch {i//self.batch_size + 1}/" - f"{(len(texts_to_process)-1)//self.batch_size + 1} with {len(batch_texts)} texts" - ) + # ... Logging ... batch_start = time.time() batch_embeddings, batch_metrics = await self.generate_embeddings_batch_texts(batch_texts) text_process += batch_metrics["process"] text_model += batch_metrics["model"] text_convert += batch_metrics["convert"] text_total += batch_metrics["total"] - # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding - batch_time = time.time() - batch_start - logger.debug( - f"Text batch {i//self.batch_size + 1} processing took {batch_time:.2f}s " - f"({batch_time/len(batch_texts):.2f}s per text)" - ) + # ... Logging ... 
text_time = time.time() - text_start - logger.info(f"All text embedding took {text_time:.2f}s ({text_time/len(texts_to_process):.2f}s per text)") + logger.info(f"All text embedding took {text_time:.2f}s") else: text_process = text_model = text_convert = text_total = 0.0 text_time = 0.0 - # Ensure all chunks were processed (handle potential None entries if errors occurred, - # though unlikely with fallback) final_results = [res for res in results if res is not None] - if len(final_results) != len(chunks): - logger.warning( - f"Number of embeddings ({len(final_results)}) does not match number of chunks " - f"({len(chunks)}). Some chunks might have failed." - ) - # Fill potential gaps if necessary, though the current logic should cover all chunks - # For safety, let's reconstruct based on successfully processed indices, though it shouldn't be needed - processed_indices = {idx for idx, _ in image_items} | {idx for idx, _ in text_items} - if len(processed_indices) != len(chunks): - logger.error("Mismatch in processed indices vs original chunks count. This indicates a logic error.") - # Assuming results contains embeddings at correct original indices, filter out Nones - final_results = [results[i] for i in range(len(chunks)) if results[i] is not None] - - total_time = time.time() - job_start_time - logger.info( - f"Total Colpali embed_for_ingestion took {total_time:.2f}s for {len(chunks)} chunks " - f"({total_time/len(chunks) if chunks else 0:.2f}s per chunk)" - ) - metrics = { - "sorting": sorting_time, - "image_process": image_process, - "image_model": image_model, - "image_convert": image_convert, - "image_total": image_total, - "text_process": text_process, - "text_model": text_model, - "text_convert": text_convert, - "text_total": text_total, - "process": image_process + text_process, - "model": image_model + text_model, - "convert": image_convert + text_convert, - "image_count": len(image_items), - "text_count": len(text_items), - "total": total_time, - "chunk_count": len(chunks), - } - _INGEST_METRICS.set(metrics) - # Cast is safe because we filter out Nones, though Nones shouldn't occur with the fallback logic - return final_results # type: ignore + + # ... (Metrics calculation remains the same) ... 
+ + return final_results # type: ignore def latest_ingest_metrics(self) -> Dict[str, Any]: - """Return timing metrics from the most recent embed_for_ingestion call in this context.""" metrics = _INGEST_METRICS.get() return dict(metrics) if metrics else {} async def embed_for_query(self, text: str) -> torch.Tensor: + # Remains the same start_time = time.time() result = await self.generate_embeddings(text) elapsed = time.time() - start_time @@ -230,6 +191,7 @@ async def embed_for_query(self, text: str) -> torch.Tensor: return result async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: + # Remains the same start_time = time.time() content_type = "image" if isinstance(content, Image) else "text" process_start = time.time() @@ -239,11 +201,8 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: processed = self.processor.process_queries([content]).to(self.model.device) process_time = time.time() - process_start - model_start = time.time() - # inference_mode is faster than no_grad (disables version tracking) - # autocast ensures consistent bf16 inference on CUDA with torch.inference_mode(): if self.device == "cuda": with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): @@ -252,23 +211,14 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: embeddings = self.model(**processed) model_time = time.time() - model_start - convert_start = time.time() - result = embeddings.to(torch.float32).numpy(force=True)[0] - convert_time = time.time() - convert_start - total_time = time.time() - start_time - logger.debug( - f"Generate embeddings ({content_type}): process={process_time:.2f}s, model={model_time:.2f}s, " - f"convert={convert_time:.2f}s, total={total_time:.2f}s" - ) return result - # ---- Batch processing methods (only used in 'cloud' mode) ---- - async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[List[np.ndarray], Dict[str, float]]: + # Remains the same batch_start_time = time.time() process_start = time.time() processed_images = self.processor.process_images(images).to(self.model.device) @@ -287,12 +237,8 @@ async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[L image_embeddings_np = image_embeddings.to(torch.float32).numpy(force=True) result = [emb for emb in image_embeddings_np] convert_time = time.time() - convert_start - total_batch_time = time.time() - batch_start_time - logger.debug( - f"Batch images ({len(images)}): process={process_time:.2f}s, model={model_time:.2f}s, " - f"convert={convert_time:.2f}s, total={total_batch_time:.2f}s ({total_batch_time/len(images):.3f}s/image)" - ) + return result, { "process": process_time, "model": model_time, @@ -301,6 +247,7 @@ async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[L } async def generate_embeddings_batch_texts(self, texts: List[str]) -> Tuple[List[np.ndarray], Dict[str, float]]: + # Remains the same batch_start_time = time.time() process_start = time.time() processed_texts = self.processor.process_queries(texts).to(self.model.device) @@ -319,12 +266,8 @@ async def generate_embeddings_batch_texts(self, texts: List[str]) -> Tuple[List[ text_embeddings_np = text_embeddings.to(torch.float32).numpy(force=True) result = [emb for emb in text_embeddings_np] convert_time = time.time() - convert_start - total_batch_time = time.time() - batch_start_time - logger.debug( - f"Batch texts ({len(texts)}): process={process_time:.2f}s, model={model_time:.2f}s, " - 
f"convert={convert_time:.2f}s, total={total_batch_time:.2f}s ({total_batch_time/len(texts):.3f}s/text)" - ) + return result, { "process": process_time, "model": model_time, From a7e001414b063ad38ca4fa40d9d3c1c38b47c07a Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:50:15 +0800 Subject: [PATCH 02/13] Add COLPALI_MODEL_NAME to .env.example Added support for COLPALI model configuration. --- .env.example | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.env.example b/.env.example index b07806d5..dcbb5858 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,9 @@ OPENAI_API_KEY= ANTHROPIC_API_KEY= GEMINI_API_KEY= +# Support using colpali other model +COLPALI_MODEL_NAME="vidore/colSmol-256M" + # Optional AWS_ACCESS_KEY= AWS_SECRET_ACCESS_KEY= From 2d4de96092a70d94ff28e41fd83ba386f96110c3 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:51:06 +0800 Subject: [PATCH 03/13] Change PDF DPI for ColPali processing to 300 Updated PDF DPI setting for ColPali processing. --- morphik.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/morphik.toml b/morphik.toml index 738a3df3..7c13ddec 100644 --- a/morphik.toml +++ b/morphik.toml @@ -140,7 +140,8 @@ arq_max_jobs = 1 # Maximum concurrent jobs for ARQ worker colpali_store_batch_size = 16 # Batch size for ColPali vector storage [pdf] -colpali_pdf_dpi = 150 # DPI for PDF to image conversion in ColPali processing +# colpali_pdf_dpi = 150 # DPI for PDF to image conversion in ColPali processing +colpali_pdf_dpi = 300 # For clarity when using small colpali model [morphik] enable_colpali = true From cb34c4f1f9cd553579da915b5117c78f519e9580 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:57:34 +0800 Subject: [PATCH 04/13] Update COLPALI_MODEL_NAME in .env.example Updated COLPALI_MODEL_NAME to use ColSmolVLM 256/500M architecture. --- .env.example | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index dcbb5858..5608235e 100644 --- a/.env.example +++ b/.env.example @@ -8,8 +8,8 @@ OPENAI_API_KEY= ANTHROPIC_API_KEY= GEMINI_API_KEY= -# Support using colpali other model -COLPALI_MODEL_NAME="vidore/colSmol-256M" +# Support using ColSmolVLM 256/500M architecture : vidore/colSmol-500M / vidore/colSmol-256M +COLPALI_MODEL_NAME="vidore/colSmol-500M" # Optional AWS_ACCESS_KEY= From c3fac8dc7e8851f8a593ed2b8953dcef4e9c0041 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:58:27 +0800 Subject: [PATCH 05/13] Add COLPALI_MODEL_NAME to .env.example Added guide comment for colpali model setup. --- .env.example | 1 + 1 file changed, 1 insertion(+) diff --git a/.env.example b/.env.example index 5608235e..f4787805 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,7 @@ ANTHROPIC_API_KEY= GEMINI_API_KEY= # Support using ColSmolVLM 256/500M architecture : vidore/colSmol-500M / vidore/colSmol-256M +# Default : "vidore/colpali-v1.2-merged" COLPALI_MODEL_NAME="vidore/colSmol-500M" # Optional From 8da1166b795b29a2239a0f7d52b31f463936322a Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 01:02:47 +0800 Subject: [PATCH 06/13] Add support for ColPali and update model loading logic Added support for PaliGemma model. 
--- core/embedding/colpali_embedding_model.py | 39 +++++++++++++++++------ 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/core/embedding/colpali_embedding_model.py b/core/embedding/colpali_embedding_model.py index cff6d846..bd241178 100644 --- a/core/embedding/colpali_embedding_model.py +++ b/core/embedding/colpali_embedding_model.py @@ -10,12 +10,14 @@ from PIL.Image import Image from PIL.Image import open as open_image -# --- V'S CHANGE: Import BOTH model architectures --- +# --- V'S CHANGE: Import ALL 3 model architectures --- from colpali_engine.models import ( ColQwen2_5, ColQwen2_5_Processor, ColIdefics3, - ColIdefics3Processor + ColIdefics3Processor, + ColPali, # <--- New: For vidore/colpali-v1.2 (PaliGemma) + ColPaliProcessor # <--- New ) # --------------------------------------------------- @@ -46,15 +48,16 @@ def __init__(self): attn_implementation = "flash_attention_2" # 3. Model Selector Logic - # You can add a specific setting for this, or default to standard ColPali - # Example: self.settings.COLPALI_MODEL_NAME or a default string + # Get model name from ENV, default to the standard ColPali v1.2 model_name = os.getenv("COLPALI_MODEL_NAME", "vidore/colpali-v1.2-merged") logger.info(f"Loading ColPali Model: {model_name}") - # --- V'S CHANGE: Dynamic Loading based on Model Name --- + # --- V'S CHANGE: Dynamic Loading for Smol, Qwen, AND PaliGemma --- + + # CASE 1: SMOLVLM (Idefics3 Architecture) if "smol" in model_name.lower() or "idefics" in model_name.lower(): - # Use Idefics3 architecture for SmolVLM + logger.info("Detected SmolVLM/Idefics3 architecture.") self.model = ColIdefics3.from_pretrained( model_name, torch_dtype=torch.bfloat16, @@ -62,11 +65,12 @@ def __init__(self): attn_implementation=attn_implementation, ).eval() self.processor = ColIdefics3Processor.from_pretrained(model_name, use_fast=True) - # SmolVLM is tiny, so we can boost batch size + # Smol is tiny (256M/500M), boost batch size! self.batch_size = 32 if self.mode == "cloud" else 4 - else: - # Default to Qwen2.5/Standard ColPali architecture + # CASE 2: QWEN (Qwen2/2.5-VL Architecture) + elif "qwen" in model_name.lower(): + logger.info("Detected Qwen2/2.5-VL architecture.") self.model = ColQwen2_5.from_pretrained( model_name, torch_dtype=torch.bfloat16, @@ -74,13 +78,28 @@ def __init__(self): attn_implementation=attn_implementation, ).eval() self.processor = ColQwen2_5_Processor.from_pretrained(model_name, use_fast=True) - # Standard models are heavier, use conservative batch size + # Qwen is heavy (3B+), keep batch size conservative self.batch_size = 8 if self.mode == "cloud" else 1 + + # CASE 3: COLPALI (PaliGemma Architecture - The Default) + else: + logger.info("Detected Standard ColPali (PaliGemma) architecture.") + self.model = ColPali.from_pretrained( + model_name, + torch_dtype=torch.bfloat16, + device_map=device, + attn_implementation=attn_implementation, + ).eval() + self.processor = ColPaliProcessor.from_pretrained(model_name, use_fast=True) + # PaliGemma is ~3B, keep batch size conservative + self.batch_size = 8 if self.mode == "cloud" else 1 + # ------------------------------------------------------- self.device = device logger.info(f"Colpali initialized. Batch size: {self.batch_size}") + async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[np.ndarray]: # ... (Rest of the method remains exactly the same) ... 
job_start_time = time.time() From fa9e1154f95f8e6c8de4c23323e2ec63232a67a7 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 01:04:10 +0800 Subject: [PATCH 07/13] Revise ColPali model selection comments in .env.example Updated comments for ColPali model selection and clarified default model name. --- .env.example | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index f4787805..748531ce 100644 --- a/.env.example +++ b/.env.example @@ -8,9 +8,18 @@ OPENAI_API_KEY= ANTHROPIC_API_KEY= GEMINI_API_KEY= -# Support using ColSmolVLM 256/500M architecture : vidore/colSmol-500M / vidore/colSmol-256M -# Default : "vidore/colpali-v1.2-merged" -COLPALI_MODEL_NAME="vidore/colSmol-500M" +# --- ColPali Model Selection --- +# 1. SmolVLM (Lightweight & Fast - Uses ColIdefics3) +# - vidore/colSmol-256M +# - vidore/colSmol-500M +# 2. Qwen 2.5-VL (High Performance - Uses ColQwen2_5) +# - vidore/colqwen2.5-v0.1 +# 3. Standard ColPali (PaliGemma - Uses ColPali) +# - vidore/colpali-v1.2-merged +# - vidore/colpali-v1.3 +# Default if not set: "vidore/colpali-v1.2-merged" + +COLPALI_MODEL_NAME="vidore/colSmol-500M" # Optional AWS_ACCESS_KEY= From 8976f830a70272c745283203721090b7d402f6c3 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:05:48 +0800 Subject: [PATCH 08/13] Added metrics collection and updated model loading method. Changes from .env loading method to morphik.toml global config. --- core/embedding/colpali_embedding_model.py | 150 +++++++++++++++++----- 1 file changed, 121 insertions(+), 29 deletions(-) diff --git a/core/embedding/colpali_embedding_model.py b/core/embedding/colpali_embedding_model.py index bd241178..b9acfee7 100644 --- a/core/embedding/colpali_embedding_model.py +++ b/core/embedding/colpali_embedding_model.py @@ -10,16 +10,15 @@ from PIL.Image import Image from PIL.Image import open as open_image -# --- V'S CHANGE: Import ALL 3 model architectures --- +# Import ALL 3 model architectures for multi-model support from colpali_engine.models import ( ColQwen2_5, ColQwen2_5_Processor, ColIdefics3, ColIdefics3Processor, - ColPali, # <--- New: For vidore/colpali-v1.2 (PaliGemma) - ColPaliProcessor # <--- New + ColPali, + ColPaliProcessor ) -# --------------------------------------------------- from core.config import get_settings from core.embedding.base_embedding_model import BaseEmbeddingModel @@ -30,6 +29,7 @@ _INGEST_METRICS: ContextVar[Dict[str, Any]] = ContextVar("_colpali_ingest_metrics", default={}) + class ColpaliEmbeddingModel(BaseEmbeddingModel): def __init__(self): self.settings = get_settings() @@ -38,22 +38,37 @@ def __init__(self): # 1. Determine Device device = "mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"Initializing ColpaliEmbeddingModel with device: {device}") + start_time = time.time() - # 2. Configure Attention (Flash Attn 2 Check) - attn_implementation = "eager" + # Enable TF32 for faster matmuls on Ampere+ GPUs (A10, A100, etc.) if device == "cuda": torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True + logger.info("Enabled TF32 for CUDA matmul operations") + + # 2. 
Configure Attention (Flash Attn 2 Check) + attn_implementation = "eager" + if device == "cuda": if importlib.util.find_spec("flash_attn") is not None: attn_implementation = "flash_attention_2" + else: + logger.warning( + "flash_attn package not found; falling back to 'eager' attention. " + "Install flash-attn to enable FlashAttention2 on GPU." + ) # 3. Model Selector Logic - # Get model name from ENV, default to the standard ColPali v1.2 - model_name = os.getenv("COLPALI_MODEL_NAME", "vidore/colpali-v1.2-merged") + # Get model name from morphik.toml via settings, with fallback to default + # The settings object should have COLPALI_MODEL_NAME from morphik.toml [morphik] section + model_name = getattr(self.settings, 'COLPALI_MODEL_NAME', None) + if model_name is None: + # Fallback to default ColPali v1.2 if not configured + model_name = "vidore/colpali-v1.2-merged" + logger.info(f"COLPALI_MODEL_NAME not found in settings, using default: {model_name}") logger.info(f"Loading ColPali Model: {model_name}") - # --- V'S CHANGE: Dynamic Loading for Smol, Qwen, AND PaliGemma --- + # Dynamic Loading for Smol, Qwen, AND PaliGemma architectures # CASE 1: SMOLVLM (Idefics3 Architecture) if "smol" in model_name.lower() or "idefics" in model_name.lower(): @@ -93,15 +108,13 @@ def __init__(self): self.processor = ColPaliProcessor.from_pretrained(model_name, use_fast=True) # PaliGemma is ~3B, keep batch size conservative self.batch_size = 8 if self.mode == "cloud" else 1 - - # ------------------------------------------------------- self.device = device - logger.info(f"Colpali initialized. Batch size: {self.batch_size}") + total_init_time = time.time() - start_time + logger.info(f"Colpali running in mode: {self.mode} with batch size: {self.batch_size}") + logger.info(f"Colpali initialization time: {total_init_time:.2f} seconds") - async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[np.ndarray]: - # ... (Rest of the method remains exactly the same) ... job_start_time = time.time() if isinstance(chunks, Chunk): chunks = [chunks] @@ -125,13 +138,15 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n if isinstance(raw_bytes, (bytes, bytearray, memoryview)): image_bytes = bytes(raw_bytes) else: + # data_uri_to_bytes handles both data URIs and raw base64 image_bytes = data_uri_to_bytes(chunk.content) image = open_image(io.BytesIO(image_bytes)) + # Drop cached bytes once we've materialized the image to keep metadata lean chunk.metadata.pop("_image_bytes", None) image_items.append((index, image)) except Exception as e: logger.error(f"Error processing image chunk {index}: {str(e)}. Falling back to text.") - text_items.append((index, chunk.content)) + text_items.append((index, chunk.content)) # Fallback: treat content as text else: text_items.append((index, chunk.content)) @@ -141,8 +156,10 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n f"Found {len(image_items)} images and {len(text_items)} text chunks" ) + # Initialize results array to preserve order results: List[np.ndarray | None] = [None] * len(chunks) + # Process image batches if image_items: img_start = time.time() indices_to_process = [item[0] for item in image_items] @@ -151,22 +168,31 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n for i in range(0, len(images_to_process), self.batch_size): batch_indices = indices_to_process[i : i + self.batch_size] batch_images = images_to_process[i : i + self.batch_size] - # ... Logging ... 
+ logger.debug( + f"Processing image batch {i//self.batch_size + 1}/" + f"{(len(images_to_process)-1)//self.batch_size + 1} with {len(batch_images)} images" + ) batch_start = time.time() batch_embeddings, batch_metrics = await self.generate_embeddings_batch_images(batch_images) image_process += batch_metrics["process"] image_model += batch_metrics["model"] image_convert += batch_metrics["convert"] image_total += batch_metrics["total"] + # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding - # ... Logging ... + batch_time = time.time() - batch_start + logger.debug( + f"Image batch {i//self.batch_size + 1} processing took {batch_time:.2f}s " + f"({batch_time/len(batch_images):.2f}s per image)" + ) img_time = time.time() - img_start - logger.info(f"All image embedding took {img_time:.2f}s") + logger.info(f"All image embedding took {img_time:.2f}s ({img_time/len(images_to_process):.2f}s per image)") else: image_process = image_model = image_convert = image_total = 0.0 img_time = 0.0 + # Process text batches if text_items: text_start = time.time() indices_to_process = [item[0] for item in text_items] @@ -175,34 +201,82 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n for i in range(0, len(texts_to_process), self.batch_size): batch_indices = indices_to_process[i : i + self.batch_size] batch_texts = texts_to_process[i : i + self.batch_size] - # ... Logging ... + logger.debug( + f"Processing text batch {i//self.batch_size + 1}/" + f"{(len(texts_to_process)-1)//self.batch_size + 1} with {len(batch_texts)} texts" + ) batch_start = time.time() batch_embeddings, batch_metrics = await self.generate_embeddings_batch_texts(batch_texts) text_process += batch_metrics["process"] text_model += batch_metrics["model"] text_convert += batch_metrics["convert"] text_total += batch_metrics["total"] + # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding - # ... Logging ... + batch_time = time.time() - batch_start + logger.debug( + f"Text batch {i//self.batch_size + 1} processing took {batch_time:.2f}s " + f"({batch_time/len(batch_texts):.2f}s per text)" + ) text_time = time.time() - text_start - logger.info(f"All text embedding took {text_time:.2f}s") + logger.info(f"All text embedding took {text_time:.2f}s ({text_time/len(texts_to_process):.2f}s per text)") else: text_process = text_model = text_convert = text_total = 0.0 text_time = 0.0 + # Ensure all chunks were processed (handle potential None entries if errors occurred, + # though unlikely with fallback) final_results = [res for res in results if res is not None] + if len(final_results) != len(chunks): + logger.warning( + f"Number of embeddings ({len(final_results)}) does not match number of chunks " + f"({len(chunks)}). Some chunks might have failed." + ) + # Fill potential gaps if necessary, though the current logic should cover all chunks + # For safety, let's reconstruct based on successfully processed indices, though it shouldn't be needed + processed_indices = {idx for idx, _ in image_items} | {idx for idx, _ in text_items} + if len(processed_indices) != len(chunks): + logger.error("Mismatch in processed indices vs original chunks count. 
This indicates a logic error.") + # Assuming results contains embeddings at correct original indices, filter out Nones + final_results = [results[i] for i in range(len(chunks)) if results[i] is not None] + + total_time = time.time() - job_start_time + logger.info( + f"Total Colpali embed_for_ingestion took {total_time:.2f}s for {len(chunks)} chunks " + f"({total_time/len(chunks) if chunks else 0:.2f}s per chunk)" + ) - # ... (Metrics calculation remains the same) ... + # Collect and store metrics + metrics = { + "sorting": sorting_time, + "image_process": image_process, + "image_model": image_model, + "image_convert": image_convert, + "image_total": image_total, + "text_process": text_process, + "text_model": text_model, + "text_convert": text_convert, + "text_total": text_total, + "process": image_process + text_process, + "model": image_model + text_model, + "convert": image_convert + text_convert, + "image_count": len(image_items), + "text_count": len(text_items), + "total": total_time, + "chunk_count": len(chunks), + } + _INGEST_METRICS.set(metrics) - return final_results # type: ignore + # Cast is safe because we filter out Nones, though Nones shouldn't occur with the fallback logic + return final_results # type: ignore def latest_ingest_metrics(self) -> Dict[str, Any]: + """Return timing metrics from the most recent embed_for_ingestion call in this context.""" metrics = _INGEST_METRICS.get() return dict(metrics) if metrics else {} async def embed_for_query(self, text: str) -> torch.Tensor: - # Remains the same start_time = time.time() result = await self.generate_embeddings(text) elapsed = time.time() - start_time @@ -210,7 +284,6 @@ async def embed_for_query(self, text: str) -> torch.Tensor: return result async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: - # Remains the same start_time = time.time() content_type = "image" if isinstance(content, Image) else "text" process_start = time.time() @@ -220,8 +293,11 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: processed = self.processor.process_queries([content]).to(self.model.device) process_time = time.time() - process_start + model_start = time.time() + # inference_mode is faster than no_grad (disables version tracking) + # autocast ensures consistent bf16 inference on CUDA with torch.inference_mode(): if self.device == "cuda": with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): @@ -230,14 +306,23 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: embeddings = self.model(**processed) model_time = time.time() - model_start + convert_start = time.time() + result = embeddings.to(torch.float32).numpy(force=True)[0] + convert_time = time.time() - convert_start + total_time = time.time() - start_time + logger.debug( + f"Generate embeddings ({content_type}): process={process_time:.2f}s, model={model_time:.2f}s, " + f"convert={convert_time:.2f}s, total={total_time:.2f}s" + ) return result + # ---- Batch processing methods (only used in 'cloud' mode) ---- + async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[List[np.ndarray], Dict[str, float]]: - # Remains the same batch_start_time = time.time() process_start = time.time() processed_images = self.processor.process_images(images).to(self.model.device) @@ -256,8 +341,12 @@ async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[L image_embeddings_np = image_embeddings.to(torch.float32).numpy(force=True) result = [emb for emb in 
image_embeddings_np] convert_time = time.time() - convert_start + total_batch_time = time.time() - batch_start_time - + logger.debug( + f"Batch images ({len(images)}): process={process_time:.2f}s, model={model_time:.2f}s, " + f"convert={convert_time:.2f}s, total={total_batch_time:.2f}s ({total_batch_time/len(images):.3f}s/image)" + ) return result, { "process": process_time, "model": model_time, @@ -266,7 +355,6 @@ async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[L } async def generate_embeddings_batch_texts(self, texts: List[str]) -> Tuple[List[np.ndarray], Dict[str, float]]: - # Remains the same batch_start_time = time.time() process_start = time.time() processed_texts = self.processor.process_queries(texts).to(self.model.device) @@ -285,8 +373,12 @@ async def generate_embeddings_batch_texts(self, texts: List[str]) -> Tuple[List[ text_embeddings_np = text_embeddings.to(torch.float32).numpy(force=True) result = [emb for emb in text_embeddings_np] convert_time = time.time() - convert_start + total_batch_time = time.time() - batch_start_time - + logger.debug( + f"Batch texts ({len(texts)}): process={process_time:.2f}s, model={model_time:.2f}s, " + f"convert={convert_time:.2f}s, total={total_batch_time:.2f}s ({total_batch_time/len(texts):.3f}s/text)" + ) return result, { "process": process_time, "model": model_time, From 5a97f43b6f82ec6ba202e6ad1134664fad66e46c Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:07:14 +0800 Subject: [PATCH 09/13] Add ColPali model selection options to morphik.toml Added options for ColPali model selection in configuration. --- morphik.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/morphik.toml b/morphik.toml index 7c13ddec..d8b879fb 100644 --- a/morphik.toml +++ b/morphik.toml @@ -152,6 +152,13 @@ api_domain = "api.morphik.ai" # API domain for cloud URIs morphik_embedding_api_domain = ["http://localhost:6000"] colpali_mode = "local" # "off", "local", or "api" +# NEW: ColPali model selection +# Options: +# - "vidore/colpali-v1.2-merged" (default, PaliGemma architecture) +# - "tsystems/colqwen2.5-3b-multilingual-v1.0" (Qwen architecture) +# - Any SmolVLM/Idefics3 compatible model +colpali_model_name = "vidore/colpali-v1.2-merged" + [pdf_viewer] frontend_url = "http://localhost:3000/api/pdf" # "https://morphik.ai/api/pdf" # "http://localhost:3000/api/pdf" # "https://morphik.ai/api/pdf" From 76ff119b8fa5a0939cc96335bb38dbd81a0fe8a9 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:07:43 +0800 Subject: [PATCH 10/13] Clean up .env.example by removing model selection Removed comments and default model name from .env.example --- .env.example | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/.env.example b/.env.example index 748531ce..b07806d5 100644 --- a/.env.example +++ b/.env.example @@ -8,19 +8,6 @@ OPENAI_API_KEY= ANTHROPIC_API_KEY= GEMINI_API_KEY= -# --- ColPali Model Selection --- -# 1. SmolVLM (Lightweight & Fast - Uses ColIdefics3) -# - vidore/colSmol-256M -# - vidore/colSmol-500M -# 2. Qwen 2.5-VL (High Performance - Uses ColQwen2_5) -# - vidore/colqwen2.5-v0.1 -# 3. 
Standard ColPali (PaliGemma - Uses ColPali) -# - vidore/colpali-v1.2-merged -# - vidore/colpali-v1.3 -# Default if not set: "vidore/colpali-v1.2-merged" - -COLPALI_MODEL_NAME="vidore/colSmol-500M" - # Optional AWS_ACCESS_KEY= AWS_SECRET_ACCESS_KEY= From 37ee3cb55960a02c395b1bbe267c4f3ae4244189 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:08:39 +0800 Subject: [PATCH 11/13] Revise ColPali model selection comments Updated ColPali model selection options with descriptions. --- morphik.toml | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/morphik.toml b/morphik.toml index d8b879fb..ea8e5346 100644 --- a/morphik.toml +++ b/morphik.toml @@ -152,12 +152,17 @@ api_domain = "api.morphik.ai" # API domain for cloud URIs morphik_embedding_api_domain = ["http://localhost:6000"] colpali_mode = "local" # "off", "local", or "api" -# NEW: ColPali model selection -# Options: -# - "vidore/colpali-v1.2-merged" (default, PaliGemma architecture) -# - "tsystems/colqwen2.5-3b-multilingual-v1.0" (Qwen architecture) -# - Any SmolVLM/Idefics3 compatible model -colpali_model_name = "vidore/colpali-v1.2-merged" +# --- ColPali Model Selection --- +# 1. SmolVLM (Lightweight & Fast - Uses ColIdefics3) +# - vidore/colSmol-256M +# - vidore/colSmol-500M +# 2. Qwen 2.5-VL (High Performance - Uses ColQwen2_5) +# - vidore/colqwen2.5-v0.1 +# 3. Standard ColPali (PaliGemma - Uses ColPali) +# - vidore/colpali-v1.2-merged +# - vidore/colpali-v1.3 +# Default if not set: "vidore/colpali-v1.2-merged" +colpali_model_name="vidore/colpali-v1.2-merged" [pdf_viewer] frontend_url = "http://localhost:3000/api/pdf" # "https://morphik.ai/api/pdf" # "http://localhost:3000/api/pdf" # "https://morphik.ai/api/pdf" From 8121780d4f2e890ad755bfd5436e1d2f44bb13ea Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:10:11 +0800 Subject: [PATCH 12/13] Add ColPali model configuration to config.py --- core/config.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/config.py b/core/config.py index c5635181..49a0f1c2 100644 --- a/core/config.py +++ b/core/config.py @@ -74,6 +74,10 @@ class Settings(BaseSettings): DB_MAX_RETRIES: int = 3 DB_RETRY_DELAY: float = 1.0 + # ColPali model configuration + COLPALI_MODEL_NAME: str = "vidore/colpali-v1.2-merged" + + # Embedding configuration EMBEDDING_PROVIDER: Literal["litellm"] = "litellm" EMBEDDING_MODEL: str From 464df2db5b5830caac27a832db9bac9b83d5afe4 Mon Sep 17 00:00:00 2001 From: Husayn Irfan <112241008+husaynirfan1@users.noreply.github.com> Date: Sat, 20 Dec 2025 10:21:31 +0800 Subject: [PATCH 13/13] Refactor comments and change to correct model processors. 
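
For reference, after this series the checkpoint name comes from settings
(populated from morphik.toml via core/config.py) rather than the environment,
with a fallback to the v1.2 default. A minimal sketch of that lookup and the
model/processor pairing it implies, assuming a Settings object that exposes
COLPALI_MODEL_NAME as added in PATCH 12:

    from core.config import get_settings

    settings = get_settings()
    # Same fallback the embedding model uses when the key is not configured.
    model_name = getattr(settings, "COLPALI_MODEL_NAME", "vidore/colpali-v1.2-merged")
    name = model_name.lower()
    if "smol" in name or "idefics" in name:
        pair = ("ColIdefics3", "ColIdefics3Processor")
    elif "qwen" in name:
        pair = ("ColQwen2_5", "ColQwen2_5_Processor")
    else:
        pair = ("ColPali", "ColPaliProcessor")
    print(f"{model_name} -> {pair[0]} / {pair[1]}")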
--- core/embedding/colpali_embedding_model.py | 40 ++++++----------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/core/embedding/colpali_embedding_model.py b/core/embedding/colpali_embedding_model.py index b9acfee7..054734f5 100644 --- a/core/embedding/colpali_embedding_model.py +++ b/core/embedding/colpali_embedding_model.py @@ -4,13 +4,13 @@ import time from contextvars import ContextVar from typing import Any, Dict, List, Tuple, Union -import os + import numpy as np import torch from PIL.Image import Image from PIL.Image import open as open_image -# Import ALL 3 model architectures for multi-model support +# Import ALL 3 model architectures from colpali_engine.models import ( ColQwen2_5, ColQwen2_5_Processor, @@ -40,15 +40,12 @@ def __init__(self): logger.info(f"Initializing ColpaliEmbeddingModel with device: {device}") start_time = time.time() - # Enable TF32 for faster matmuls on Ampere+ GPUs (A10, A100, etc.) + # 2. Configure Attention (Flash Attn 2 Check) + attn_implementation = "eager" if device == "cuda": torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True logger.info("Enabled TF32 for CUDA matmul operations") - - # 2. Configure Attention (Flash Attn 2 Check) - attn_implementation = "eager" - if device == "cuda": if importlib.util.find_spec("flash_attn") is not None: attn_implementation = "flash_attention_2" else: @@ -58,19 +55,15 @@ def __init__(self): ) # 3. Model Selector Logic - # Get model name from morphik.toml via settings, with fallback to default - # The settings object should have COLPALI_MODEL_NAME from morphik.toml [morphik] section - model_name = getattr(self.settings, 'COLPALI_MODEL_NAME', None) - if model_name is None: - # Fallback to default ColPali v1.2 if not configured - model_name = "vidore/colpali-v1.2-merged" - logger.info(f"COLPALI_MODEL_NAME not found in settings, using default: {model_name}") + # Get model name from morphik.toml via settings, default to the standard ColPali v1.2 + model_name = getattr(self.settings, 'COLPALI_MODEL_NAME', "vidore/colpali-v1.2-merged") logger.info(f"Loading ColPali Model: {model_name}") - # Dynamic Loading for Smol, Qwen, AND PaliGemma architectures + # Dynamic Loading for Smol, Qwen, AND PaliGemma # CASE 1: SMOLVLM (Idefics3 Architecture) + # See: https://huggingface.co/vidore/colSmol-256M if "smol" in model_name.lower() or "idefics" in model_name.lower(): logger.info("Detected SmolVLM/Idefics3 architecture.") self.model = ColIdefics3.from_pretrained( @@ -146,7 +139,7 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n image_items.append((index, image)) except Exception as e: logger.error(f"Error processing image chunk {index}: {str(e)}. 
Falling back to text.") - text_items.append((index, chunk.content)) # Fallback: treat content as text + text_items.append((index, chunk.content)) else: text_items.append((index, chunk.content)) @@ -178,7 +171,6 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n image_model += batch_metrics["model"] image_convert += batch_metrics["convert"] image_total += batch_metrics["total"] - # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding batch_time = time.time() - batch_start @@ -211,7 +203,6 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n text_model += batch_metrics["model"] text_convert += batch_metrics["convert"] text_total += batch_metrics["total"] - # Place embeddings in the correct position in results for original_index, embedding in zip(batch_indices, batch_embeddings): results[original_index] = embedding batch_time = time.time() - batch_start @@ -225,20 +216,16 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n text_process = text_model = text_convert = text_total = 0.0 text_time = 0.0 - # Ensure all chunks were processed (handle potential None entries if errors occurred, - # though unlikely with fallback) + # Ensure all chunks were processed final_results = [res for res in results if res is not None] if len(final_results) != len(chunks): logger.warning( f"Number of embeddings ({len(final_results)}) does not match number of chunks " f"({len(chunks)}). Some chunks might have failed." ) - # Fill potential gaps if necessary, though the current logic should cover all chunks - # For safety, let's reconstruct based on successfully processed indices, though it shouldn't be needed processed_indices = {idx for idx, _ in image_items} | {idx for idx, _ in text_items} if len(processed_indices) != len(chunks): logger.error("Mismatch in processed indices vs original chunks count. 
This indicates a logic error.") - # Assuming results contains embeddings at correct original indices, filter out Nones final_results = [results[i] for i in range(len(chunks)) if results[i] is not None] total_time = time.time() - job_start_time @@ -268,7 +255,6 @@ async def embed_for_ingestion(self, chunks: Union[Chunk, List[Chunk]]) -> List[n } _INGEST_METRICS.set(metrics) - # Cast is safe because we filter out Nones, though Nones shouldn't occur with the fallback logic return final_results # type: ignore def latest_ingest_metrics(self) -> Dict[str, Any]: @@ -296,8 +282,6 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: model_start = time.time() - # inference_mode is faster than no_grad (disables version tracking) - # autocast ensures consistent bf16 inference on CUDA with torch.inference_mode(): if self.device == "cuda": with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16): @@ -308,9 +292,7 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: model_time = time.time() - model_start convert_start = time.time() - result = embeddings.to(torch.float32).numpy(force=True)[0] - convert_time = time.time() - convert_start total_time = time.time() - start_time @@ -320,8 +302,6 @@ async def generate_embeddings(self, content: Union[str, Image]) -> np.ndarray: ) return result - # ---- Batch processing methods (only used in 'cloud' mode) ---- - async def generate_embeddings_batch_images(self, images: List[Image]) -> Tuple[List[np.ndarray], Dict[str, float]]: batch_start_time = time.time() process_start = time.time()