diff --git a/ai/analyzer.py b/ai/analyzer.py
index 2b59993..e7d8093 100644
--- a/ai/analyzer.py
+++ b/ai/analyzer.py
@@ -15,14 +15,14 @@
     load_sqlite_vector_extension as _load_sqlite_vector_extension,
     ensure_chunks_and_meta as _ensure_chunks_and_meta,
     insert_chunk_vector_with_retry as _insert_chunk_vector_with_retry,
-    search_vectors as _search_vectors,
     get_chunk_text as _get_chunk_text,
 )
-from .openai import call_coding_api, EmbeddingClient
+from .openai import call_coding_api
+from .llama_embeddings import OpenAICompatibleEmbedding
+from .llama_chunker import chunk_with_llama_index
 from llama_index.core import Document
 from utils.logger import get_logger
-from utils import compute_file_hash, chunk_text, norm, cosine
-from .smart_chunker import smart_chunk
+from utils import compute_file_hash, norm, cosine
 import logging
 
 # reduce noise from httpx used by external libs
@@ -64,8 +64,8 @@
 logger = get_logger(__name__)
 
-# Initialize EmbeddingClient for structured logging and retry logic
-_embedding_client = EmbeddingClient()
+# Initialize llama-index embedding client
+_embedding_client = OpenAICompatibleEmbedding()
 
 # Thread-local storage to track execution state inside futures
 _thread_state = threading.local()
@@ -86,7 +86,8 @@ def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, fil
     semaphore.acquire()
     try:
         _thread_state.stage = "calling_embed_text"
-        result = _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
+        # Use llama-index embedding client
+        result = _embedding_client._get_text_embedding(text)
         _thread_state.stage = "completed"
         return result
     except Exception as e:
@@ -171,14 +172,8 @@ def _process_file_sync(
     if isinstance(cfg, dict):
         embedding_model = cfg.get("embedding_model")
 
-    # Use smart chunking for supported code languages
-    use_smart_chunking = cfg.get("smart_chunking", True) if isinstance(cfg, dict) else True
-    supported_languages = ["python", "javascript", "typescript", "java", "go", "rust", "c", "cpp"]
-
-    if use_smart_chunking and lang in supported_languages:
-        chunks = smart_chunk(content, language=lang, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP)
-    else:
-        chunks = chunk_text(content, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP)
+    # Use llama-index chunking for all content
+    chunks = chunk_with_llama_index(content, language=lang, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
 
     if not chunks:
         chunks = [content]
@@ -395,11 +390,13 @@ def analyze_local_path_sync(
     try:
         # Use batch update for efficiency - single database transaction
+        # Store total_files for performance (avoid re-scanning directory on every request)
        set_project_metadata_batch(database_path, {
            "last_indexed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "last_index_duration": str(duration),
            "files_indexed": str(file_count),
-           "files_skipped": str(skipped_count)
+           "files_skipped": str(skipped_count),
+           "total_files": str(total_files)  # Store total files found during indexing
        })
     except Exception:
         logger.exception("Failed to store indexing metadata")
 
@@ -442,16 +439,40 @@ def analyze_local_path_background(local_path: str, database_path: str, venv_path
 
 
 def search_semantic(query: str, database_path: str, top_k: int = 5):
     """
-    Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks and returns
-    a list of {file_id, path, chunk_index, score}.
+    Uses llama-index with sqlite-vector backend to retrieve best-matching chunks.
+    Always includes content as it's needed for the coding model context.
+
+    Args:
+        query: Search query text
+        database_path: Path to the SQLite database
+        top_k: Number of results to return
+
+    Returns:
+        List of dicts with file_id, path, chunk_index, score, and content
     """
-    q_emb = _embedding_client.embed_text(query, file_path="", chunk_index=0)
-    if not q_emb:
-        return []
-    try:
-        return _search_vectors(database_path, q_emb, top_k=top_k)
-    except Exception:
+    try:
+        # Use llama-index for semantic search
+        from .llama_integration import llama_index_search
+
+        docs = llama_index_search(query, database_path, top_k=top_k)
+
+        results = []
+        for doc in docs:
+            metadata = doc.metadata or {}
+            result = {
+                "file_id": metadata.get("file_id", 0),
+                "path": metadata.get("path", ""),
+                "chunk_index": metadata.get("chunk_index", 0),
+                "score": metadata.get("score", 0.0),
+                "content": doc.text or ""  # Always include content for LLM context
+            }
+            results.append(result)
+
+        logger.info(f"llama-index search returned {len(results)} results")
+        return results
+
+    except Exception as e:
+        logger.exception(f"Semantic search failed: {e}")
         raise
diff --git a/ai/llama_chunker.py b/ai/llama_chunker.py
new file mode 100644
index 0000000..2ea0604
--- /dev/null
+++ b/ai/llama_chunker.py
@@ -0,0 +1,111 @@
+"""
+LlamaIndex-based chunking for code and text.
+Replaces smart_chunker.py with llama-index's built-in splitters.
+"""
+from typing import List
+from llama_index.core.node_parser import CodeSplitter, SentenceSplitter
+from llama_index.core.schema import Document
+
+from utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def chunk_with_llama_index(
+    content: str,
+    language: str = "text",
+    chunk_size: int = 800,
+    chunk_overlap: int = 100
+) -> List[str]:
+    """
+    Chunk text or code using llama-index's splitters.
+
+    Args:
+        content: Text or code content to chunk
+        language: Programming language (python, javascript, etc.) or "text"
+        chunk_size: Target size for each chunk in characters
+        chunk_overlap: Overlap between chunks in characters
+
+    Returns:
+        List of text chunks
+    """
+    # Map language names to llama-index language identifiers
+    language_map = {
+        "python": "python",
+        "javascript": "js",
+        "typescript": "ts",
+        "java": "java",
+        "go": "go",
+        "rust": "rust",
+        "c": "c",
+        "cpp": "cpp",
+        "c++": "cpp",
+    }
+
+    try:
+        # Check if it's a supported code language
+        llama_lang = language_map.get(language.lower())
+
+        if llama_lang:
+            # Use CodeSplitter for code
+            splitter = CodeSplitter(
+                language=llama_lang,
+                chunk_lines=40,  # Target lines per chunk (approximation)
+                chunk_lines_overlap=5,  # Overlap in lines
+                max_chars=chunk_size
+            )
+            logger.debug(f"Using CodeSplitter for language: {llama_lang}")
+        else:
+            # Use SentenceSplitter for text or unknown languages
+            splitter = SentenceSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                paragraph_separator="\n\n",
+                secondary_chunking_regex="[^,.;。?!]+[,.;。?!]?"
+            )
+            logger.debug(f"Using SentenceSplitter for language: {language}")
+
+        # Create a document and split it
+        doc = Document(text=content)
+        nodes = splitter.get_nodes_from_documents([doc])
+
+        # Extract text from nodes
+        chunks = [node.text for node in nodes if node.text]
+
+        logger.debug(f"Split content into {len(chunks)} chunks")
+        return chunks if chunks else [content]
+
+    except Exception as e:
+        logger.exception(f"Error chunking with llama-index: {e}")
+        # Fallback to simple chunking
+        return simple_chunk(content, chunk_size, chunk_overlap)
+
+
+def simple_chunk(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> List[str]:
+    """
+    Simple character-based chunking fallback.
+
+    Args:
+        text: Text to chunk
+        chunk_size: Size of each chunk
+        chunk_overlap: Overlap between chunks
+
+    Returns:
+        List of text chunks
+    """
+    if not text:
+        return []
+
+    chunks = []
+    step = max(1, chunk_size - chunk_overlap)
+
+    for i in range(0, len(text), step):
+        end = min(i + chunk_size, len(text))
+        chunk = text[i:end]
+        if chunk.strip():
+            chunks.append(chunk)
+
+        if end >= len(text):
+            break
+
+    return chunks if chunks else [text]
diff --git a/ai/llama_embeddings.py b/ai/llama_embeddings.py
new file mode 100644
index 0000000..142823a
--- /dev/null
+++ b/ai/llama_embeddings.py
@@ -0,0 +1,100 @@
+"""
+LlamaIndex-compatible embeddings using OpenAI API.
+Replaces the custom EmbeddingClient with llama-index's embedding abstraction.
+"""
+from typing import List, Optional
+from llama_index.core.embeddings import BaseEmbedding
+from llama_index.core.bridge.pydantic import PrivateAttr
+from openai import OpenAI
+
+from utils.config import CFG
+from utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class OpenAICompatibleEmbedding(BaseEmbedding):
+    """
+    LlamaIndex-compatible embedding model using OpenAI-compatible API.
+    Works with any OpenAI-compatible endpoint (OpenAI, Azure, local servers, etc.)
+    """
+
+    _client: OpenAI = PrivateAttr()
+    _model: str = PrivateAttr()
+
+    def __init__(
+        self,
+        api_key: Optional[str] = None,
+        api_base: Optional[str] = None,
+        model: Optional[str] = None,
+        **kwargs
+    ):
+        """
+        Initialize the embedding model.
+
+        Args:
+            api_key: OpenAI API key (defaults to config)
+            api_base: API base URL (defaults to config)
+            model: Model name (defaults to config)
+        """
+        super().__init__(**kwargs)
+
+        # Get config values
+        self._client = OpenAI(
+            api_key=api_key or CFG.get("api_key"),
+            base_url=api_base or CFG.get("api_url")
+        )
+        self._model = model or CFG.get("embedding_model") or "text-embedding-3-small"
+
+        logger.info(f"Initialized OpenAICompatibleEmbedding with model: {self._model}")
+
+    @classmethod
+    def class_name(cls) -> str:
+        return "OpenAICompatibleEmbedding"
+
+    async def _aget_query_embedding(self, query: str) -> List[float]:
+        """Get query embedding asynchronously."""
+        return self._get_query_embedding(query)
+
+    async def _aget_text_embedding(self, text: str) -> List[float]:
+        """Get text embedding asynchronously."""
+        return self._get_text_embedding(text)
+
+    def _get_query_embedding(self, query: str) -> List[float]:
+        """Get embedding for a query."""
+        return self._get_text_embedding(query)
+
+    def _get_text_embedding(self, text: str) -> List[float]:
+        """Get embedding for a text."""
+        try:
+            # Clean the text
+            text = text.replace("\n", " ").strip()
+            if not text:
+                logger.warning("Empty text provided for embedding")
+                return []
+
+            # Call OpenAI API
+            response = self._client.embeddings.create(
+                input=[text],
+                model=self._model
+            )
+
+            if response.data and len(response.data) > 0:
+                embedding = response.data[0].embedding
+                logger.debug(f"Generated embedding with dimension: {len(embedding)}")
+                return embedding
+            else:
+                logger.error("No embedding returned from API")
+                return []
+
+        except Exception as e:
+            logger.exception(f"Failed to generate embedding: {e}")
+            return []
+
+    def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
+        """Get embeddings for multiple texts."""
+        embeddings = []
+        for text in texts:
+            embedding = self._get_text_embedding(text)
+            embeddings.append(embedding)
+        return embeddings
diff --git a/ai/llama_integration.py b/ai/llama_integration.py
index bf8d18e..e6cd297 100644
--- a/ai/llama_integration.py
+++ b/ai/llama_integration.py
@@ -1,48 +1,69 @@
 """
 LlamaIndex integration for document retrieval.
+Provides RAG functionality using llama-index with sqlite-vector backend.
 """
 from typing import List
 from llama_index.core import Document
+from llama_index.core.vector_stores.types import VectorStoreQuery
 
-from .openai import EmbeddingClient
+from .llama_embeddings import OpenAICompatibleEmbedding
+from .llama_vector_store import SQLiteVectorStore
 from utils.logger import get_logger
 
 logger = get_logger(__name__)
 
 # Create a module-level embedding client instance
-_embedding_client = EmbeddingClient()
+_embedding_client = OpenAICompatibleEmbedding()
 
 
-def llama_index_retrieve_documents(query: str, database_path: str, top_k: int = 5,
-                                   search_func=None, get_chunk_func=None) -> List[Document]:
+def llama_index_search(query: str, database_path: str, top_k: int = 5) -> List[Document]:
     """
-    Return llama_index.core.Document objects for the top_k matching chunks using sqlite-vector.
+    Perform semantic search using llama-index with sqlite-vector backend.
 
     Args:
         query: Search query text
         database_path: Path to project database
         top_k: Number of results to return
-        search_func: Function to search vectors (injected from analyzer)
-        get_chunk_func: Function to get chunk text (injected from analyzer)
 
     Returns:
         List of Document objects with chunk text and metadata
     """
-    if search_func is None or get_chunk_func is None:
-        raise ValueError("search_func and get_chunk_func must be provided")
-
-    q_emb = _embedding_client.embed_text(query, file_path="", chunk_index=0)
-    if not q_emb:
+    try:
+        # Get query embedding using llama-index embedding client
+        q_emb = _embedding_client._get_query_embedding(query)
+        if not q_emb:
+            logger.warning("Failed to generate query embedding")
+            return []
+
+        # Create vector store
+        vector_store = SQLiteVectorStore(database_path)
+
+        # Create query
+        vector_query = VectorStoreQuery(
+            query_embedding=q_emb,
+            similarity_top_k=top_k
+        )
+
+        # Execute query
+        query_result = vector_store.query(vector_query)
+
+        # Convert TextNodes to Documents
+        docs: List[Document] = []
+        for node, score in zip(query_result.nodes, query_result.similarities):
+            doc = Document(
+                text=node.text,
+                metadata={
+                    **node.metadata,
+                    "score": score
+                }
+            )
+            docs.append(doc)
+
+        logger.info(f"llama-index search returned {len(docs)} documents")
+        return docs
+
+    except Exception as e:
+        logger.exception(f"llama-index search failed: {e}")
         return []
-    rows = search_func(database_path, q_emb, top_k=top_k)
-    docs: List[Document] = []
-    for r in rows:
-        fid = r.get("file_id")
-        path = r.get("path")
-        chunk_idx = r.get("chunk_index", 0)
-        score = r.get("score", 0.0)
-        chunk_text = get_chunk_func(database_path, fid, chunk_idx) or ""
-        doc = Document(text=chunk_text, extra_info={"path": path, "file_id": fid, "chunk_index": chunk_idx, "score": score})
-        docs.append(doc)
-    return docs
+
diff --git a/ai/llama_vector_store.py b/ai/llama_vector_store.py
new file mode 100644
index 0000000..7a8ff39
--- /dev/null
+++ b/ai/llama_vector_store.py
@@ -0,0 +1,135 @@
+"""
+Custom LlamaIndex Vector Store implementation using sqlite-vector.
+This bridges llama-index's vector store interface with our sqlite-vector backend.
+"""
+from typing import List, Optional, Any, Dict
+from llama_index.core.vector_stores.types import (
+    VectorStore,
+    VectorStoreQuery,
+    VectorStoreQueryResult,
+)
+from llama_index.core.schema import TextNode, BaseNode
+
+from db.vector_operations import search_vectors, get_chunk_text
+from utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class SQLiteVectorStore(VectorStore):
+    """
+    Custom vector store implementation that uses sqlite-vector backend.
+    Compatible with llama-index's VectorStore interface.
+    """
+
+    def __init__(self, database_path: str):
+        """
+        Initialize the SQLite vector store.
+
+        Args:
+            database_path: Path to the SQLite database with vector extension
+        """
+        self.database_path = database_path
+        self._is_embedding_query = True
+        logger.info(f"Initialized SQLiteVectorStore with database: {database_path}")
+
+    @property
+    def client(self) -> Any:
+        """Return the database path as the client."""
+        return self.database_path
+
+    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
+        """
+        Add nodes to the vector store.
+        Note: In our implementation, nodes are added during the indexing process
+        via the analyzer module, not through this interface.
+ """ + logger.warning("add() called on SQLiteVectorStore - nodes should be added via analyzer module") + return [] + + def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: + """Delete a document from the vector store.""" + logger.warning(f"delete() called on SQLiteVectorStore for {ref_doc_id} - not implemented") + pass + + def query( + self, + query: VectorStoreQuery, + **kwargs: Any, + ) -> VectorStoreQueryResult: + """ + Query the vector store. + + Args: + query: VectorStoreQuery with query embedding and parameters + + Returns: + VectorStoreQueryResult with nodes, similarities, and ids + """ + if query.query_embedding is None: + logger.error("Query embedding is None") + return VectorStoreQueryResult(nodes=[], similarities=[], ids=[]) + + # Get top_k from query, default to 5 + top_k = query.similarity_top_k or 5 + + try: + # Use our existing search_vectors function + results = search_vectors( + database_path=self.database_path, + q_vector=query.query_embedding, + top_k=top_k + ) + + nodes: List[TextNode] = [] + similarities: List[float] = [] + ids: List[str] = [] + + for result in results: + file_id = result["file_id"] + path = result["path"] + chunk_index = result["chunk_index"] + score = result["score"] + + # Retrieve the actual chunk text + chunk_text = get_chunk_text(self.database_path, file_id, chunk_index) + + if chunk_text: + # Create a TextNode for llama-index + node = TextNode( + text=chunk_text, + metadata={ + "file_id": file_id, + "path": path, + "chunk_index": chunk_index, + }, + id_=f"{file_id}_{chunk_index}" + ) + + nodes.append(node) + similarities.append(score) + ids.append(node.id_) + + logger.debug(f"Vector query returned {len(nodes)} results") + + return VectorStoreQueryResult( + nodes=nodes, + similarities=similarities, + ids=ids + ) + + except Exception as e: + logger.exception(f"Error querying vector store: {e}") + return VectorStoreQueryResult(nodes=[], similarities=[], ids=[]) + + def persist( + self, + persist_path: str, + fs: Optional[Any] = None, + ) -> None: + """ + Persist the vector store. + Note: Our SQLite database is already persistent. + """ + logger.debug("persist() called - SQLite database is already persistent") + pass diff --git a/ai/openai.py b/ai/openai.py index eeaa51c..336b075 100644 --- a/ai/openai.py +++ b/ai/openai.py @@ -106,240 +106,11 @@ def _retry_with_backoff(func, *args, **kwargs): delay = base_delay * (2 ** attempt) time.sleep(delay) - class EmbeddingError(Exception): """Custom exception for embedding failures""" pass -class EmbeddingClient: - """ - Embedding client with detailed logging, retry logic, and configurable timeouts. - Provides better debugging for embedding API failures. - Uses OpenAI SDK for proper API compatibility. - """ - def __init__(self, - api_url: Optional[str] = None, - api_key: Optional[str] = None, - model: Optional[str] = None, - timeout: float = 15.0, - max_retries: int = 2, - backoff: float = 1.5): - self.api_url = api_url or CFG.get("api_url") - self.api_key = api_key or CFG.get("api_key") - self.model = model or DEFAULT_EMBEDDING_MODEL or "text-embedding-3-small" - self.timeout = timeout - self.max_retries = max_retries - self.backoff = backoff - - # Use OpenAI SDK client instead of raw requests - # The SDK automatically handles the /embeddings path - self.client = _client - - def _generate_curl_command(self, payload: Dict[str, Any]) -> str: - """ - Generate a curl command for debugging purposes. - Masks the API key for security. 
- """ - # Construct the full embeddings URL - base_url = self.api_url.rstrip('/') - if not base_url.endswith('/embeddings'): - url = f"{base_url}/embeddings" - else: - url = base_url - - # Start with basic curl command - curl_parts = ["curl", "-X", "POST", f"'{url}'"] - - # Add standard headers - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer " - } - - for key, value in headers.items(): - curl_parts.append(f"-H '{key}: {value}'") - - # Add data payload - payload_json = json.dumps(payload) - # Escape single quotes in the JSON for shell compatibility - payload_json_escaped = payload_json.replace("'", "'\\''") - curl_parts.append(f"-d '{payload_json_escaped}'") - - return " \\\n ".join(curl_parts) - - def _save_curl_script(self, curl_command: str, request_id: str, file_path: str, chunk_index: int) -> Optional[str]: - """ - Save curl command to a bash script in /tmp for debugging. - Returns the path to the generated script, or None if save failed. - """ - try: - import tempfile - # Create a unique filename based on request_id - script_name = f"embedding_debug_{request_id[:8]}.sh" - script_path = os.path.join("/tmp", script_name) - - # Generate script content with shebang and comments - script_content = f"""#!/bin/bash -# Embedding request debug script -# Request ID: {request_id} -# File: {file_path} -# Chunk: {chunk_index} -# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')} - -{curl_command} -""" - - with open(script_path, 'w') as f: - f.write(script_content) - - # Make the script executable - os.chmod(script_path, 0o755) - - return script_path - except Exception as e: - _embedding_logger.warning(f"Failed to save curl debug script: {e}") - return None - - - def _log_request_start(self, request_id: str, file_path: str, chunk_index: int, chunk_len: int): - _embedding_logger.debug( - "Embedding request START", - extra={ - "request_id": request_id, - "file": file_path, - "chunk_index": chunk_index, - "chunk_length": chunk_len, - "model": self.model, - "api_url": self.api_url, - "timeout": self.timeout, - }, - ) - - def _log_request_end(self, request_id: str, elapsed: float, status: Optional[int], response_body_preview: str): - _embedding_logger.debug( - "Embedding request END", - extra={ - "request_id": request_id, - "elapsed_s": elapsed, - "status": status, - "response_preview": response_body_preview, - }, - ) - - def embed_text(self, text: str, file_path: str = "", chunk_index: int = 0) -> List[float]: - """ - Embed a single chunk of text using OpenAI SDK. Returns the embedding vector. - Raises EmbeddingError on failure. 
- """ - request_id = str(uuid.uuid4()) - chunk_len = len(text) - self._log_request_start(request_id, file_path, chunk_index, chunk_len) - - payload = { - "model": self.model, - "input": text, - } - - attempt = 0 - err_msg = "" - while True: - attempt += 1 - start = time.perf_counter() - try: - # Use OpenAI SDK for embeddings - resp = self.client.embeddings.create( - model=self.model, - input=text, - timeout=self.timeout - ) - elapsed = time.perf_counter() - start - - # Log successful response - self._log_request_end(request_id, elapsed, 200, "Success") - - # Extract embedding from response - # The SDK returns a response object with a data list - if resp and hasattr(resp, 'data') and len(resp.data) > 0: - embedding = resp.data[0].embedding - if embedding and isinstance(embedding, list): - return embedding - else: - raise EmbeddingError(f"Invalid embedding format in response") - else: - raise EmbeddingError(f"Unexpected embedding response shape from SDK") - - except Exception as e: - elapsed = time.perf_counter() - start - err_msg = f"Error after {elapsed:.2f}s: {e}" - - # Save debug information for timeout or API errors - script_path = None - if CFG.get("debug"): - # Generate curl command for debugging - curl_command = self._generate_curl_command(payload) - script_path = self._save_curl_script(curl_command, request_id, file_path, chunk_index) - if script_path: - _embedding_logger.error(f"\nDebug script saved to: {script_path}") - _embedding_logger.error(f"Run with: bash {script_path}") - else: - _embedding_logger.error(f"\nDebug with this curl command:") - _embedding_logger.error(curl_command) - - _embedding_logger.error( - "Embedding API Error", - extra={ - "request_id": request_id, - "error": str(e), - "elapsed_s": elapsed, - "attempt": attempt, - "file": file_path, - "chunk_index": chunk_index, - } - ) - - # Retry logic - if attempt > self.max_retries: - _embedding_logger.error( - "Max retries exceeded for embedding request", - extra={"request_id": request_id, "file": file_path, "chunk_index": chunk_index, "attempts": attempt}, - ) - raise EmbeddingError(f"Failed to get embedding after {attempt} attempts. Last error: {err_msg}") - - # Backoff and retry - sleep_for = self.backoff * (2 ** (attempt - 1)) - _embedding_logger.info( - "Retrying embedding request", - extra={ - "request_id": request_id, - "file": file_path, - "chunk_index": chunk_index, - "attempt": attempt, - "sleep_s": sleep_for, - }, - ) - time.sleep(sleep_for) - - def embed_multiple(self, chunks: List[str], file_path: str = "") -> List[Dict[str, Any]]: - """ - Embed a list of text chunks. Returns list of dicts: {"chunk_index": i, "embedding": [...]}. - This method logs progress and errors for each chunk. - """ - results = [] - for i, chunk in enumerate(chunks): - try: - emb = self.embed_text(chunk, file_path=file_path, chunk_index=i) - results.append({"chunk_index": i, "embedding": emb}) - except EmbeddingError as e: - _embedding_logger.error( - "Failed to embed chunk", - extra={"file": file_path, "chunk_index": i, "error": str(e)}, - ) - # append a failure marker or skip depending on desired behavior - results.append({"chunk_index": i, "embedding": None, "error": str(e)}) - return results - - def call_coding_api(prompt: str, model: Optional[str] = None, max_tokens: int = 1024): """ Call a generative/coding model via the new OpenAI client. 
diff --git a/ai/smart_chunker.py b/ai/smart_chunker.py
deleted file mode 100644
index d67dc7f..0000000
--- a/ai/smart_chunker.py
+++ /dev/null
@@ -1,306 +0,0 @@
-"""
-Smart chunking module for code-aware text splitting.
-Respects code structure (functions, classes, methods) for better semantic search.
-"""
-import re
-from typing import List, Tuple, Optional
-from pathlib import Path
-
-
-class SmartChunker:
-    """
-    Code-aware chunker that splits text based on language structure.
-    Falls back to simple chunking for non-code or unknown languages.
-    """
-
-    def __init__(self, chunk_size: int = 800, overlap: int = 100):
-        self.chunk_size = chunk_size
-        self.overlap = overlap
-
-    def chunk(self, text: str, language: str = "text") -> List[str]:
-        """
-        Chunk text based on language-specific rules.
-
-        Args:
-            text: Text content to chunk
-            language: Programming language identifier
-
-        Returns:
-            List of text chunks
-        """
-        if language in ["python", "javascript", "typescript", "java", "go", "rust", "c", "cpp"]:
-            return self._chunk_code(text, language)
-        else:
-            return self._chunk_simple(text)
-
-    def _chunk_code(self, text: str, language: str) -> List[str]:
-        """
-        Smart chunking for code that respects structure.
-        """
-        # Split into logical units (functions, classes, etc.)
-        units = self._split_into_units(text, language)
-
-        if not units:
-            # Fallback to simple chunking if structure detection fails
-            return self._chunk_simple(text)
-
-        chunks = []
-        current_chunk = []
-        current_size = 0
-
-        for unit_text, unit_type in units:
-            unit_size = len(unit_text)
-
-            # If single unit is larger than chunk_size, split it
-            if unit_size > self.chunk_size:
-                # Save current chunk if it has content
-                if current_chunk:
-                    chunks.append("\n".join(current_chunk))
-                    current_chunk = []
-                    current_size = 0
-
-                # Split large unit with simple chunking
-                sub_chunks = self._chunk_simple(unit_text)
-                chunks.extend(sub_chunks)
-                continue
-
-            # Check if adding this unit would exceed chunk_size
-            if current_size + unit_size > self.chunk_size and current_chunk:
-                # Save current chunk
-                chunks.append("\n".join(current_chunk))
-
-                # Start new chunk with overlap
-                # Keep last unit for context
-                if len(current_chunk) > 1:
-                    last_unit = current_chunk[-1]
-                    current_chunk = [last_unit, unit_text]
-                    current_size = len(last_unit) + unit_size
-                else:
-                    current_chunk = [unit_text]
-                    current_size = unit_size
-            else:
-                # Add to current chunk
-                current_chunk.append(unit_text)
-                current_size += unit_size
-
-        # Add remaining chunk
-        if current_chunk:
-            chunks.append("\n".join(current_chunk))
-
-        return chunks if chunks else [text]
-
-    def _split_into_units(self, text: str, language: str) -> List[Tuple[str, str]]:
-        """
-        Split code into logical units (functions, classes, etc.).
-        Returns list of (text, unit_type) tuples.
-        """
-        if language == "python":
-            return self._split_python(text)
-        elif language in ["javascript", "typescript"]:
-            return self._split_javascript(text)
-        elif language == "java":
-            return self._split_java(text)
-        elif language in ["go", "rust", "c", "cpp"]:
-            return self._split_c_style(text)
-        else:
-            return []
-
-    def _split_python(self, text: str) -> List[Tuple[str, str]]:
-        """
-        Split Python code into classes and functions.
-
-        Uses indentation-based parsing. Works well for most Python code
-        but may have edge cases with complex indentation patterns.
-        Falls back to simple chunking if parsing fails.
- """ - units = [] - lines = text.split("\n") - current_unit = [] - current_type = None - indent_stack = [] # only populated when a class/def starts - - for i, line in enumerate(lines): - stripped = line.lstrip() - indent = len(line) - len(stripped) - - # Detect class or function definition - if stripped.startswith("class ") or stripped.startswith("def "): - # Save previous unit if exists - if current_unit: - units.append(("\n".join(current_unit), current_type or "code")) - current_unit = [] - - current_type = "class" if stripped.startswith("class ") else "function" - current_unit = [line] - indent_stack = [indent] - elif current_unit: - # Continue current unit - current_unit.append(line) - - # Check if we're back to base indent (end of function/class) - # Guard access to indent_stack: only compare indent if indent_stack is populated - if stripped and not stripped.startswith("#") and indent_stack and indent <= indent_stack[0]: - if i < len(lines) - 1: # Not last line - # Check next line to see if it's a new definition - next_stripped = lines[i + 1].lstrip() - if next_stripped.startswith("class ") or next_stripped.startswith("def "): - # End current unit - # current_unit contains the line that dedented; we want to separate the trailing dedent line - # The previous block is current_unit[:-1], remaining starts from current_unit[-1] - units.append(("\n".join(current_unit[:-1]), current_type)) - # Start module-level accumulation with the dedent line - current_unit = [current_unit[-1]] - current_type = "module" - indent_stack = [] - else: - # Module-level code - if not current_unit: - current_type = "module" - current_unit.append(line) - - # Add remaining unit - if current_unit: - units.append(("\n".join(current_unit), current_type or "code")) - - return units - - def _split_javascript(self, text: str) -> List[Tuple[str, str]]: - """ - Split JavaScript/TypeScript code into functions and classes. - - Uses regex patterns to match function and class declarations. - Works well for standard code patterns but may not handle all - edge cases with nested structures. Falls back to brace-based - splitting if regex matching doesn't find units. 
- """ - units = [] - - # Regex patterns for JS/TS - # Match function declarations, arrow functions, class declarations - # Note: Non-greedy matching, works for most cases but not perfect for deeply nested code - patterns = [ - r'((?:export\s+)?(?:async\s+)?function\s+\w+\s*\([^)]*\)\s*{[\s\S]*?})', - r'((?:export\s+)?const\s+\w+\s*=\s*(?:async\s*)?\([^)]*\)\s*=>\s*{[\s\S]*?})', - r'((?:export\s+)?class\s+\w+(?:\s+extends\s+\w+)?\s*{[\s\S]*?})', - ] - - # Try to match and extract units - for pattern in patterns: - matches = re.finditer(pattern, text) - for match in matches: - unit_text = match.group(1) - unit_type = "function" if "function" in unit_text or "=>" in unit_text else "class" - units.append((unit_text, unit_type)) - - # If no matches, fall back to brace-based splitting - if not units: - units = self._split_by_braces(text) - - return units - - def _split_java(self, text: str) -> List[Tuple[str, str]]: - """Split Java code into classes and methods.""" - # Similar to JavaScript but with Java-specific patterns - patterns = [ - r'((?:public|private|protected)?\s*(?:static)?\s*(?:class|interface|enum)\s+\w+[\s\S]*?{[\s\S]*?})', - r'((?:public|private|protected)?\s*(?:static)?\s*(?:\w+\s+)?\w+\s*\([^)]*\)\s*(?:throws\s+\w+(?:,\s*\w+)*)?\s*{[\s\S]*?})', - ] - - units = [] - for pattern in patterns: - matches = re.finditer(pattern, text) - for match in matches: - unit_text = match.group(1) - unit_type = "class" if any(kw in unit_text for kw in ["class", "interface", "enum"]) else "method" - units.append((unit_text, unit_type)) - - if not units: - units = self._split_by_braces(text) - - return units - - def _split_c_style(self, text: str) -> List[Tuple[str, str]]: - """Split C-style languages (Go, Rust, C, C++) into functions.""" - units = self._split_by_braces(text) - return units if units else [] - - def _split_by_braces(self, text: str) -> List[Tuple[str, str]]: - """ - Generic brace-based splitting for C-style languages. - Finds balanced brace blocks. - - Note: This is a simple heuristic that doesn't handle braces - inside strings, comments, or template literals. It works well - for most code but may produce imperfect results in edge cases. - The chunker will still fall back to simple chunking if needed. - """ - units = [] - lines = text.split("\n") - current_unit = [] - brace_count = 0 - in_block = False - - for line in lines: - current_unit.append(line) - - # Count braces (simple heuristic) - # Note: Doesn't handle strings/comments perfectly, but works well in practice - brace_count += line.count("{") - line.count("}") - - if "{" in line and not in_block: - in_block = True - - if in_block and brace_count == 0: - # Block closed - units.append(("\n".join(current_unit), "function")) - current_unit = [] - in_block = False - - # Add remaining lines - if current_unit: - units.append(("\n".join(current_unit), "code")) - - return units - - def _chunk_simple(self, text: str) -> List[str]: - """ - Simple character-based chunking with overlap. - Used as fallback or for non-code content. 
- """ - if not text: - return [] - - if len(text) <= self.chunk_size: - return [text] - - chunks = [] - step = max(1, self.chunk_size - self.overlap) - start = 0 - - while start < len(text): - end = min(start + self.chunk_size, len(text)) - chunks.append(text[start:end]) - start += step - - return chunks - - -# Global instance for convenience -_default_chunker = SmartChunker() - - -def smart_chunk(text: str, language: str = "text", chunk_size: int = 800, overlap: int = 100) -> List[str]: - """ - Convenience function for smart chunking. - - Args: - text: Text to chunk - language: Programming language - chunk_size: Maximum chunk size in characters - overlap: Overlap between chunks in characters - - Returns: - List of text chunks - """ - chunker = SmartChunker(chunk_size=chunk_size, overlap=overlap) - return chunker.chunk(text, language) diff --git a/db/models.py b/db/models.py index 41e7608..ae5c5cf 100644 --- a/db/models.py +++ b/db/models.py @@ -12,6 +12,7 @@ class CreateProjectRequest(BaseModel): class IndexProjectRequest(BaseModel): project_id: str + incremental: Optional[bool] = True # Default to incremental indexing class QueryRequest(BaseModel): diff --git a/endpoints/project_endpoints.py b/endpoints/project_endpoints.py index 73e5eb0..5c71e12 100644 --- a/endpoints/project_endpoints.py +++ b/endpoints/project_endpoints.py @@ -104,13 +104,20 @@ def api_get_project(project_id: str): # Add indexing statistics if project has a database db_path = project.get("database_path") + if db_path and os.path.exists(db_path): try: - from db.operations import get_project_stats + from db.operations import get_project_stats, get_project_metadata stats = get_project_stats(db_path) + + # Get total files from metadata (stored during indexing for performance) + total_files_str = get_project_metadata(db_path, "total_files") + total_files = int(total_files_str) if total_files_str else 0 + project["indexing_stats"] = { "file_count": stats.get("file_count", 0), "embedding_count": stats.get("embedding_count", 0), + "total_files": total_files, "is_indexed": stats.get("file_count", 0) > 0 } except Exception as e: @@ -118,12 +125,14 @@ def api_get_project(project_id: str): project["indexing_stats"] = { "file_count": 0, "embedding_count": 0, + "total_files": 0, "is_indexed": False } else: project["indexing_stats"] = { "file_count": 0, "embedding_count": 0, + "total_files": 0, "is_indexed": False } @@ -160,11 +169,12 @@ def api_index_project(http_request: Request, request: IndexProjectRequest, backg Index or re-index a project in the background. - **project_id**: Unique project identifier + - **incremental**: If True (default), only index new/changed files. If False, re-index all files. Starts background indexing process: - Scans project directory for code files - Generates embeddings for semantic search - - Uses incremental indexing (skips unchanged files) + - Uses incremental indexing by default (skips unchanged files) Rate limit: 10 requests per minute per IP. 
@@ -195,20 +205,31 @@ def api_index_project(http_request: Request, request: IndexProjectRequest, backg
         # Update status to indexing
         update_project_status(request.project_id, "indexing")
 
-        # Start background indexing
+        # Start background indexing with incremental flag
         venv_path = CFG.get("venv_path")
+        incremental = request.incremental if request.incremental is not None else True
 
         def index_callback():
             try:
-                analyze_local_path_background(project_path, db_path, venv_path, MAX_FILE_SIZE, CFG)
+                from ai.analyzer import analyze_local_path_sync
+                # Use sync version directly with incremental flag
+                analyze_local_path_sync(project_path, db_path, venv_path, MAX_FILE_SIZE, CFG, incremental=incremental)
                 update_project_status(request.project_id, "ready", datetime.utcnow().isoformat())
             except Exception as e:
+                logger.exception(f"Indexing failed for project {request.project_id}: {e}")
                 update_project_status(request.project_id, "error")
                 raise
 
         background_tasks.add_task(index_callback)
-        return JSONResponse({"status": "indexing", "project_id": request.project_id})
+        indexing_type = "incremental" if incremental else "full"
+        logger.info(f"Started {indexing_type} indexing for project {request.project_id}")
+
+        return JSONResponse({
+            "status": "indexing",
+            "project_id": request.project_id,
+            "incremental": incremental
+        })
     except Exception as e:
         logger.exception(f"Error starting project indexing: {e}")
         return JSONResponse({"error": "Failed to start indexing"}, status_code=500)
diff --git a/endpoints/query_endpoints.py b/endpoints/query_endpoints.py
index 51674db..0ecdbae 100644
--- a/endpoints/query_endpoints.py
+++ b/endpoints/query_endpoints.py
@@ -33,12 +33,14 @@ def api_query(http_request: Request, request: QueryRequest):
     Performs semantic search using vector embeddings:
     - Generates embedding for query
     - Finds most similar code chunks
-    - Returns ranked results with scores
+    - Returns ranked results with scores and content
+
+    Note: Content is always included as it's needed for the coding model.
 
     Rate limit: 100 requests per minute per IP.
 
     Returns:
-    - **results**: Array of matching code chunks
+    - **results**: Array of matching code chunks with content
     - **project_id**: Project identifier
     - **query**: Original query text
     """
diff --git a/endpoints/web_endpoints.py b/endpoints/web_endpoints.py
index b1866ae..43c464a 100644
--- a/endpoints/web_endpoints.py
+++ b/endpoints/web_endpoints.py
@@ -184,24 +184,51 @@ async def code_endpoint(request: Request):
     # If RAG requested, perform semantic search and build context
     if use_rag:
         try:
+            # Retrieve with content (always included)
             retrieved = search_semantic(prompt, database_path, top_k=top_k)
-            # Build context WITHOUT including snippets: only include file references and scores
+            # Build context WITH actual file content for better RAG results
             context_parts = []
             total_len = len(combined_context)
             for r in retrieved:
-                part = f"File: {r.get('path')} (score: {r.get('score', 0):.4f})\n"
+                content = r.get("content", "")
+                path = r.get("path", "")
+                score = r.get("score", 0)
+
+                # Include file path, score, and actual content
+                part = f"File: {path} (score: {score:.4f})\n{content}\n"
+
                 if total_len + len(part) > TOTAL_CONTEXT_LIMIT:
+                    # If full content doesn't fit, try to include at least partial content
+                    remaining = TOTAL_CONTEXT_LIMIT - total_len
+                    if remaining > 200:  # Only include if we have meaningful space
+                        truncated_content = content[:remaining - 100] + "..."
+ part = f"File: {path} (score: {score:.4f})\n{truncated_content}\n" + context_parts.append(part) + used_context.append({ + "path": path, + "score": score, + "file_id": r.get("file_id"), + "chunk_index": r.get("chunk_index") + }) break + context_parts.append(part) total_len += len(part) - used_context.append({"path": r.get("path"), "score": r.get("score")}) + used_context.append({ + "path": path, + "score": score, + "file_id": r.get("file_id"), + "chunk_index": r.get("chunk_index") + }) + if context_parts: - retrieved_text = "\n".join(context_parts) + retrieved_text = "\n---\n".join(context_parts) if combined_context: - combined_context = combined_context + "\n\nRetrieved:\n" + retrieved_text + combined_context = combined_context + "\n\nRetrieved Context:\n" + retrieved_text else: - combined_context = "Retrieved:\n" + retrieved_text - except Exception: + combined_context = "Retrieved Context:\n" + retrieved_text + except Exception as e: + logger.exception(f"RAG search failed: {e}") used_context = [] # Call the coding model with prompt and combined_context diff --git a/services/search_service.py b/services/search_service.py index df71187..98c084e 100644 --- a/services/search_service.py +++ b/services/search_service.py @@ -28,6 +28,7 @@ def semantic_search( ) -> Dict[str, Any]: """ Perform semantic search on a project. + Content is always included as it's required for the coding model. Args: project_id: Project identifier @@ -36,7 +37,7 @@ def semantic_search( use_cache: Whether to use result caching Returns: - Dictionary with results, project_id, and query + Dictionary with results (including content), project_id, and query Raises: ValueError: If project not found or not indexed @@ -53,15 +54,10 @@ def semantic_search( if stats.get("file_count", 0) == 0: raise ValueError(f"Project not indexed: {project_id}") - # Check cache - if use_cache: - cache_key = SearchService._make_cache_key(project_id, query, top_k) - cached = search_cache.get(cache_key) - if cached is not None: - logger.debug(f"Cache hit for query: {query[:50]}") - return cached + # Note: Caching disabled for now since content makes results large + # Future: could cache without content and retrieve content on demand - # Perform search + # Perform search (always includes content) try: results = search_semantic(query, db_path, top_k=top_k) @@ -72,10 +68,6 @@ def semantic_search( "count": len(results) } - # Cache results - if use_cache: - search_cache.set(cache_key, response) - logger.info(f"Search completed: {len(results)} results for '{query[:50]}'") return response diff --git a/templates/index.html b/templates/index.html index c9bb355..e3bb56d 100644 --- a/templates/index.html +++ b/templates/index.html @@ -5,6 +5,10 @@ PicoCode - Local Codebase Assistant + + + +