diff --git a/ai/analyzer.py b/ai/analyzer.py
index ba786d2..9e32dea 100644
--- a/ai/analyzer.py
+++ b/ai/analyzer.py
@@ -18,7 +18,7 @@
     search_vectors as _search_vectors,
     get_chunk_text as _get_chunk_text,
 )
-from .openai import get_embedding_for_text, call_coding_api
+from .openai import call_coding_api, EmbeddingClient
 from llama_index.core import Document
 from utils.logger import get_logger
 from utils import compute_file_hash, chunk_text, norm, cosine
@@ -59,15 +59,18 @@
 logger = get_logger(__name__)

+# Initialize EmbeddingClient for structured logging and retry logic
+_embedding_client = EmbeddingClient()

-def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, model: Optional[str] = None):
+
+def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "", chunk_index: int = 0, model: Optional[str] = None):
     """
     Wrapper to acquire semaphore inside executor task to avoid deadlock.
     The semaphore is acquired in the worker thread, not the main thread.
     """
     semaphore.acquire()
     try:
-        return get_embedding_for_text(text, model)
+        return _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)  # "model" kept for compatibility; currently unused
     finally:
         semaphore.release()


@@ -192,7 +195,7 @@ def _process_file_sync(
         for idx, chunk_doc in batch:
             # Submit task to executor; semaphore will be acquired inside the worker
             embedding_start_time = time.time()
-            future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, embedding_model)
+            future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model)
             embedding_futures.append((idx, chunk_doc, future, embedding_start_time))

         # Wait for batch to complete and store results
@@ -434,7 +437,7 @@ def search_semantic(query: str, database_path: str, top_k: int = 5):
     Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks
     and returns a list of {file_id, path, chunk_index, score}.
     """
-    q_emb = get_embedding_for_text(query)
+    q_emb = _embedding_client.embed_text(query, file_path="", chunk_index=0)
     if not q_emb:
         return []
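The hunks above keep semaphore acquisition inside the executor worker. A minimal self-contained sketch of that pattern follows; the pool sizes and the `fake_embed` helper are hypothetical stand-ins, not code from this repository:

    # Sketch: acquire the semaphore in the worker, not in the submitting thread.
    # Per the docstring above, acquiring in the main thread before submit risks
    # deadlock; acquiring inside the task keeps the submitter from blocking
    # while permits are held by queued work.
    import threading
    from concurrent.futures import ThreadPoolExecutor

    _SEMAPHORE = threading.Semaphore(4)           # cap on concurrent API calls
    _EXECUTOR = ThreadPoolExecutor(max_workers=8)

    def fake_embed(text):
        return [0.0]                              # hypothetical stand-in for the API call

    def worker(text):
        _SEMAPHORE.acquire()                      # acquired inside the worker thread
        try:
            return fake_embed(text)
        finally:
            _SEMAPHORE.release()

    futures = [_EXECUTOR.submit(worker, t) for t in ("a", "b", "c")]
    print([f.result() for f in futures])
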
""" - q_emb = get_embedding_for_text(query) + q_emb = _embedding_client.embed_text(query, file_path="", chunk_index=0) if not q_emb: return [] diff --git a/ai/llama_integration.py b/ai/llama_integration.py index 155fd23..bf8d18e 100644 --- a/ai/llama_integration.py +++ b/ai/llama_integration.py @@ -4,11 +4,14 @@ from typing import List from llama_index.core import Document -from .openai import get_embedding_for_text +from .openai import EmbeddingClient from utils.logger import get_logger logger = get_logger(__name__) +# Create a module-level embedding client instance +_embedding_client = EmbeddingClient() + def llama_index_retrieve_documents(query: str, database_path: str, top_k: int = 5, search_func=None, get_chunk_func=None) -> List[Document]: @@ -28,7 +31,7 @@ def llama_index_retrieve_documents(query: str, database_path: str, top_k: int = if search_func is None or get_chunk_func is None: raise ValueError("search_func and get_chunk_func must be provided") - q_emb = get_embedding_for_text(query) + q_emb = _embedding_client.embed_text(query, file_path="", chunk_index=0) if not q_emb: return [] diff --git a/ai/openai.py b/ai/openai.py index f0be269..4a75bd2 100644 --- a/ai/openai.py +++ b/ai/openai.py @@ -1,8 +1,13 @@ -from typing import Optional +from typing import Optional, List, Dict, Any import os import time +import uuid +import json +import logging +import traceback import threading from openai import OpenAI +import requests from utils.config import CFG @@ -13,6 +18,9 @@ DEFAULT_EMBEDDING_MODEL = CFG.get("embedding_model") DEFAULT_CODING_MODEL = CFG.get("coding_model") +# Embedding client logger +_embedding_logger = logging.getLogger("ai.analyzer.embedding") + # Rate limiting configuration _RATE_LIMIT_CALLS = 100 # max calls per minute _RATE_LIMIT_WINDOW = 60.0 # seconds @@ -100,24 +108,192 @@ def _retry_with_backoff(func, *args, **kwargs): time.sleep(delay) -def get_embedding_for_text(text: str, model: Optional[str] = None): +class EmbeddingError(Exception): + """Custom exception for embedding failures""" + pass + + +class EmbeddingClient: """ - Return embedding vector (list[float]) using the new OpenAI client. - Includes rate limiting, retry logic with exponential backoff, and circuit breaker. - model: optional model id; if not provided, uses DEFAULT_EMBEDDING_MODEL from CFG. + Embedding client with detailed logging, retry logic, and configurable timeouts. + Provides better debugging for embedding API failures. """ - model_to_use = model or DEFAULT_EMBEDDING_MODEL - if not model_to_use: - raise RuntimeError("No embedding model configured. 
diff --git a/ai/openai.py b/ai/openai.py
index f0be269..4a75bd2 100644
--- a/ai/openai.py
+++ b/ai/openai.py
@@ -1,8 +1,13 @@
-from typing import Optional
+from typing import Optional, List, Dict, Any
 import os
 import time
+import uuid
+import json
+import logging
+import traceback
 import threading
 from openai import OpenAI
+import requests

 from utils.config import CFG
@@ -13,6 +18,9 @@
 DEFAULT_EMBEDDING_MODEL = CFG.get("embedding_model")
 DEFAULT_CODING_MODEL = CFG.get("coding_model")

+# Embedding client logger
+_embedding_logger = logging.getLogger("ai.analyzer.embedding")
+
 # Rate limiting configuration
 _RATE_LIMIT_CALLS = 100  # max calls per minute
 _RATE_LIMIT_WINDOW = 60.0  # seconds
@@ -100,24 +108,192 @@ def _retry_with_backoff(func, *args, **kwargs):
             time.sleep(delay)


-def get_embedding_for_text(text: str, model: Optional[str] = None):
+class EmbeddingError(Exception):
+    """Custom exception for embedding failures"""
+    pass
+
+
+class EmbeddingClient:
     """
-    Return embedding vector (list[float]) using the new OpenAI client.
-    Includes rate limiting, retry logic with exponential backoff, and circuit breaker.
-    model: optional model id; if not provided, uses DEFAULT_EMBEDDING_MODEL from CFG.
+    Embedding client with detailed logging, retry logic, and configurable timeouts.
+    Provides better debugging for embedding API failures.
     """
-    model_to_use = model or DEFAULT_EMBEDDING_MODEL
-    if not model_to_use:
-        raise RuntimeError("No embedding model configured. Set EMBEDDING_MODEL in .env or pass model argument.")
+
+    def __init__(self,
+                 api_url: Optional[str] = None,
+                 api_key: Optional[str] = None,
+                 model: Optional[str] = None,
+                 timeout: float = 30.0,
+                 max_retries: int = 2,
+                 backoff: float = 1.5):
+        self.api_url = api_url or CFG.get("api_url")
+        self.api_key = api_key or CFG.get("api_key")
+        self.model = model or DEFAULT_EMBEDDING_MODEL or "text-embedding-3-small"
+        self.timeout = timeout
+        self.max_retries = max_retries
+        self.backoff = backoff
+        self.session = requests.Session()
+        if self.api_key:
+            self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})
+        self.session.headers.update({"Content-Type": "application/json"})

-    def _get_embedding():
-        resp = _client.embeddings.create(model=model_to_use, input=text)
-        return resp.data[0].embedding
-
-    try:
-        return _retry_with_backoff(_get_embedding)
-    except Exception as e:
-        raise RuntimeError(f"Failed to obtain embedding from OpenAI client: {e}") from e
+    def _log_request_start(self, request_id: str, file_path: str, chunk_index: int, chunk_len: int):
+        _embedding_logger.debug(
+            "Embedding request START",
+            extra={
+                "request_id": request_id,
+                "file": file_path,
+                "chunk_index": chunk_index,
+                "chunk_length": chunk_len,
+                "model": self.model,
+                "api_url": self.api_url,
+                "timeout": self.timeout,
+            },
+        )
+
+    def _log_request_end(self, request_id: str, elapsed: float, status: Optional[int], response_body_preview: str):
+        _embedding_logger.debug(
+            "Embedding request END",
+            extra={
+                "request_id": request_id,
+                "elapsed_s": elapsed,
+                "status": status,
+                "response_preview": response_body_preview,
+            },
+        )
+
+    def embed_text(self, text: str, file_path: str = "", chunk_index: int = 0) -> List[float]:
+        """
+        Embed a single chunk of text. Returns the embedding vector.
+        Raises EmbeddingError on failure.
+        """
+        request_id = str(uuid.uuid4())
+        chunk_len = len(text)
+        self._log_request_start(request_id, file_path, chunk_index, chunk_len)
+
+        payload = {
+            "model": self.model,
+            "input": text,
+        }
+
+        attempt = 0
+        err_msg = ""
+        while True:
+            attempt += 1
+            start = time.perf_counter()
+            try:
+                resp = self.session.post(
+                    self.api_url,
+                    data=json.dumps(payload),
+                    timeout=self.timeout,
+                )
+                elapsed = time.perf_counter() - start
+
+                # Try to parse JSON safely
+                try:
+                    resp_json = resp.json()
+                except Exception:
+                    resp_json = None
+
+                if resp_json is not None:
+                    preview = json.dumps(resp_json)[:1000]
+                else:
+                    preview = (resp.text or "")[:1000]
+
+                self._log_request_end(request_id, elapsed, resp.status_code, preview)
+
+                if 200 <= resp.status_code < 300:
+                    # expected format: {"data": [{"embedding": [...]}], ...}
+                    if not resp_json:
+                        raise EmbeddingError(f"Empty JSON response (status={resp.status_code})")
+                    try:
+                        # tolerant extraction
+                        data = resp_json.get("data") if isinstance(resp_json, dict) else None
+                        if data and isinstance(data, list) and len(data) > 0:
+                            emb = data[0].get("embedding")
+                            if emb and isinstance(emb, list):
+                                _embedding_logger.info(
+                                    "Embedding succeeded",
+                                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index},
+                                )
+                                return emb
+                        # Fallback: maybe top-level "embedding" key
+                        if isinstance(resp_json, dict) and "embedding" in resp_json:
+                            emb = resp_json["embedding"]
+                            if isinstance(emb, list):
+                                return emb
+                        raise EmbeddingError(f"Unexpected embedding response shape: {resp_json}")
+                    except KeyError as e:
+                        raise EmbeddingError(f"Missing keys in embedding response: {e}")
+                else:
+                    # Non-2xx
+                    _embedding_logger.warning(
+                        "Embedding API returned non-2xx",
+                        extra={
+                            "request_id": request_id,
+                            "status_code": resp.status_code,
+                            "file": file_path,
+                            "chunk_index": chunk_index,
+                            "attempt": attempt,
+                            "body_preview": preview,
+                        },
+                    )
+                    # fall through to retry logic
+                    err_msg = f"Status {resp.status_code}: {preview}"
+
+            except requests.Timeout as e:
+                elapsed = time.perf_counter() - start
+                err_msg = f"Timeout after {elapsed:.2f}s: {e}"
+                _embedding_logger.error("Embedding API timeout", extra={"request_id": request_id, "error": str(e)})
+            except requests.RequestException as e:
+                elapsed = time.perf_counter() - start
+                err_msg = f"RequestException after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
+                _embedding_logger.error("Embedding request exception", extra={"request_id": request_id, "error": err_msg})
+            except Exception as e:  # also catches the EmbeddingErrors raised above, so shape errors are retried too
+                elapsed = time.perf_counter() - start
+                err_msg = f"Unexpected error after {elapsed:.2f}s: {e}\n{traceback.format_exc()}"
+                _embedding_logger.exception("Unexpected embedding exception", extra={"request_id": request_id})
+
+            # Retry logic
+            if attempt > self.max_retries:
+                _embedding_logger.error(
+                    "Max retries exceeded for embedding request",
+                    extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index, "attempts": attempt},
+                )
+                raise EmbeddingError(f"Failed to get embedding after {attempt} attempts. Last error: {err_msg}")
+
+            # Backoff and retry
+            sleep_for = self.backoff * (2 ** (attempt - 1))
+            _embedding_logger.info(
+                "Retrying embedding request",
+                extra={
+                    "request_id": request_id,
+                    "file": file_path,
+                    "chunk_index": chunk_index,
+                    "attempt": attempt,
+                    "sleep_s": sleep_for,
+                },
+            )
+            time.sleep(sleep_for)
+
+    def embed_multiple(self, chunks: List[str], file_path: str = "") -> List[Dict[str, Any]]:
+        """
+        Embed a list of text chunks.
+        Returns list of dicts: {"chunk_index": i, "embedding": [...]}.
+        This method logs progress and errors for each chunk.
+        """
+        results = []
+        for i, chunk in enumerate(chunks):
+            try:
+                emb = self.embed_text(chunk, file_path=file_path, chunk_index=i)
+                results.append({"chunk_index": i, "embedding": emb})
+            except EmbeddingError as e:
+                _embedding_logger.error(
+                    "Failed to embed chunk",
+                    extra={"file": file_path, "chunk_index": i, "error": str(e)},
+                )
+                # append a failure marker or skip depending on desired behavior
+                results.append({"chunk_index": i, "embedding": None, "error": str(e)})
+        return results


 def call_coding_api(prompt: str, model: Optional[str] = None, max_tokens: int = 1024):
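
To close, a usage sketch of the new client. The endpoint URL is an assumption: `EmbeddingClient` posts to `api_url` verbatim, so whatever `CFG` supplies must be the full embeddings route, not a bare API base. The key value is a placeholder:

    from ai.openai import EmbeddingClient, EmbeddingError

    client = EmbeddingClient(
        api_url="https://api.openai.com/v1/embeddings",  # assumption: full embeddings route
        api_key="sk-...",                                # placeholder
        model="text-embedding-3-small",
        timeout=10.0,
        max_retries=1,
    )

    # embed_text raises EmbeddingError once retries are exhausted
    try:
        vec = client.embed_text("def foo(): pass", file_path="demo.py", chunk_index=0)
        print(len(vec))
    except EmbeddingError as exc:
        print(f"embedding failed: {exc}")

    # embed_multiple never raises; failed chunks carry embedding=None plus an error
    rows = client.embed_multiple(["chunk one", "chunk two"], file_path="demo.py")
    for row in rows:
        print(row["chunk_index"], "ok" if row["embedding"] is not None else row["error"])

Per the retry loop above, each failed attempt sleeps `backoff * 2 ** (attempt - 1)` seconds before the next try, so `max_retries=1` means at most two requests per chunk.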