Production LLM Connection
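This changeset spans the GUI and the orchestration service: the production-connection endpoint and its GUI service/query-key plumbing gain optional filter and sort parameters; LLM connections gain platform-specific embedding credentials (AWS Bedrock access/secret keys; Azure deployment name, target URI, and API key) that are pushed to Vault only when a credential is actually re-entered; and the Python orchestration service is instrumented with Langfuse tracing (`@observe` spans plus token and cost reporting), with the Langfuse client credentials loaded from Vault.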
diff --git a/GUI/src/pages/TestProductionLLM/index.tsx b/GUI/src/pages/TestProductionLLM/index.tsx
index b5334c1..a9c1493 100644
--- a/GUI/src/pages/TestProductionLLM/index.tsx
+++ b/GUI/src/pages/TestProductionLLM/index.tsx
@@ -124,11 +124,11 @@ const TestProductionLLM: FC = () => {
setMessages(prev => [...prev, botMessage]);
// Show toast notification
- toast.open({
- type: botMessageType,
- title: t('errorOccurred'),
- message: t('errorMessage'),
- });
+ // toast.open({
+ // type: botMessageType,
+ // title: t('errorOccurred'),
+ // message: t('errorMessage'),
+ // });
} catch (error) {
console.error('Error sending message:', error);
diff --git a/GUI/src/services/llmConnections.ts b/GUI/src/services/llmConnections.ts
index 83882ab..5b3921c 100644
--- a/GUI/src/services/llmConnections.ts
+++ b/GUI/src/services/llmConnections.ts
@@ -30,6 +30,13 @@ export interface LLMConnection {
accessKey?: string;
// Embedding model credentials
embeddingModelApiKey?: string;
+ // Embedding AWS Bedrock credentials
+ embeddingAccessKey?: string;
+ embeddingSecretKey?: string;
+ // Embedding Azure credentials
+ embeddingDeploymentName?: string;
+ embeddingTargetUri?: string;
+ embeddingAzureApiKey?: string;
}
export interface LLMConnectionsResponse {
@@ -69,6 +76,16 @@ export interface LLMConnectionFilters {
environment?: string;
status?: string;
}
+
+export interface ProductionConnectionFilters {
+ llmPlatform?: string;
+ llmModel?: string;
+ embeddingPlatform?: string;
+ embeddingModel?: string;
+ connectionStatus?: string;
+ sortBy?: string;
+ sortOrder?: string;
+}
export interface LegacyLLMConnectionFilters {
page: number;
pageSize: number;
@@ -98,11 +115,18 @@ export interface LLMConnectionFormData {
accessKey?: string;
// Embedding model credentials
embeddingModelApiKey?: string;
+ // Embedding AWS Bedrock credentials
+ embeddingAccessKey?: string;
+ embeddingSecretKey?: string;
+ // Embedding Azure credentials
+ embeddingDeploymentName?: string;
+ embeddingTargetUri?: string;
+ embeddingAzureApiKey?: string;
}
// Vault secret service functions
async function createVaultSecret(connectionId: string, connectionData: LLMConnectionFormData): Promise<void> {
-
+
const payload = {
connectionId,
llmPlatform: connectionData.llmPlatform,
@@ -121,19 +145,29 @@ async function createVaultSecret(connectionId: string, connectionData: LLMConnec
targetUrl: connectionData.targetUri || '',
apiKey: connectionData.apiKey || '',
}),
- embeddingModelApiKey: connectionData.embeddingModelApiKey || '',
+ // Embedding AWS Bedrock credentials
+ ...(connectionData.embeddingModelPlatform === 'aws' && {
+ embeddingAccessKey: connectionData.embeddingAccessKey || '',
+ embeddingSecretKey: connectionData.embeddingSecretKey || '',
+ }),
+ // Embedding Azure credentials
+ ...(connectionData.embeddingModelPlatform === 'azure' && {
+ embeddingDeploymentName: connectionData.embeddingDeploymentName || '',
+ embeddingTargetUri: connectionData.embeddingTargetUri || '',
+ embeddingAzureApiKey: connectionData.embeddingAzureApiKey || '',
+ }),
};
await apiDev.post(vaultEndpoints.CREATE_VAULT_SECRET(), payload);
}
async function deleteVaultSecret(connectionId: string, connectionData: Partial<LLMConnectionFormData>): Promise<void> {
-
+
const payload = {
connectionId,
llmPlatform: connectionData.llmPlatform || '',
llmModel: connectionData.llmModel || '',
- embeddingModel: connectionData.embeddingModel || '',
+ embeddingModel: connectionData.embeddingModel || '',
embeddingPlatform: connectionData.embeddingModelPlatform || '',
deploymentEnvironment: connectionData.deploymentEnvironment?.toLowerCase() || '',
};
@@ -164,8 +198,22 @@ export async function getLLMConnection(id: string | number): Promise<LLMConnection> {
- const { data } = await apiDev.get(llmConnectionsEndpoints.GET_PRODUCTION_CONNECTION());
+export async function getProductionConnection(filters?: ProductionConnectionFilters): Promise<LLMConnection | null> {
+ const queryParams = new URLSearchParams();
+
+ if (filters?.llmPlatform) queryParams.append('llmPlatform', filters.llmPlatform);
+ if (filters?.llmModel) queryParams.append('llmModel', filters.llmModel);
+ if (filters?.embeddingPlatform) queryParams.append('embeddingPlatform', filters.embeddingPlatform);
+ if (filters?.embeddingModel) queryParams.append('embeddingModel', filters.embeddingModel);
+ if (filters?.connectionStatus) queryParams.append('connectionStatus', filters.connectionStatus);
+ if (filters?.sortBy) queryParams.append('sortBy', filters.sortBy);
+ if (filters?.sortOrder) queryParams.append('sortOrder', filters.sortOrder);
+
+ const url = queryParams.toString()
+ ? `${llmConnectionsEndpoints.GET_PRODUCTION_CONNECTION()}?${queryParams.toString()}`
+ : llmConnectionsEndpoints.GET_PRODUCTION_CONNECTION();
+
+ const { data } = await apiDev.get(url);
return data?.response?.[0] || null;
}
@@ -190,11 +238,17 @@ export async function createLLMConnection(connectionData: LLMConnectionFormData)
secret_key: maskSensitiveKey(connectionData.secretKey) || "",
access_key: maskSensitiveKey(connectionData.accessKey) || "",
// Embedding model credentials
- embedding_model_api_key: maskSensitiveKey(connectionData.embeddingModelApiKey) || "",
+ // Embedding AWS Bedrock credentials
+ embedding_access_key: maskSensitiveKey(connectionData.embeddingAccessKey) || "",
+ embedding_secret_key: maskSensitiveKey(connectionData.embeddingSecretKey) || "",
+ // Embedding Azure credentials
+ embedding_deployment_name: connectionData.embeddingDeploymentName || "",
+ embedding_target_uri: connectionData.embeddingTargetUri || "",
+ embedding_azure_api_key: maskSensitiveKey(connectionData.embeddingAzureApiKey) || "",
});
-
+
const connection = data?.response;
-
+
// After successful database creation, store secrets in vault
if (connection && connection.id) {
try {
@@ -205,7 +259,7 @@ export async function createLLMConnection(connectionData: LLMConnectionFormData)
// The connection is already created in the database
}
}
-
+
return connection;
}
@@ -233,22 +287,30 @@ export async function updateLLMConnection(
secret_key: maskSensitiveKey(connectionData.secretKey) || "",
access_key: maskSensitiveKey(connectionData.accessKey) || "",
// Embedding model credentials
- embedding_model_api_key: maskSensitiveKey(connectionData.embeddingModelApiKey) || "",
+ // Embedding AWS Bedrock credentials
+ embedding_access_key: maskSensitiveKey(connectionData.embeddingAccessKey) || "",
+ embedding_secret_key: maskSensitiveKey(connectionData.embeddingSecretKey) || "",
+ // Embedding Azure credentials
+ embedding_deployment_name: connectionData.embeddingDeploymentName || "",
+ embedding_target_uri: connectionData.embeddingTargetUri || "",
+ embedding_azure_api_key: maskSensitiveKey(connectionData.embeddingAzureApiKey) || "",
});
-
+
const connection = data?.response;
-
- // After successful database update, update secrets in vault
- if (connection) {
+
+ // Re-sync secrets to the vault only when a credential was actually re-entered;
+ // untouched credentials come back from the form masked with '*'.
+ const isNewSecret = (value?: string) => !!value && !value.includes('*');
+ if (connection && [connectionData.secretKey, connectionData.accessKey, connectionData.apiKey,
+ connectionData.embeddingAccessKey, connectionData.embeddingSecretKey,
+ connectionData.embeddingAzureApiKey].some(isNewSecret)) {
try {
await createVaultSecret(id.toString(), connectionData);
} catch (vaultError) {
console.error('Failed to update secrets in vault:', vaultError);
- // Note: We don't throw here to avoid breaking the connection update flow
- // The connection is already updated in the database
}
}
-
+
return connection;
}
@@ -260,12 +322,12 @@ export async function deleteLLMConnection(id: string | number): Promise<void> {
} catch (error) {
console.error('Failed to get connection data before deletion:', error);
}
-
+
// Delete from database
await apiDev.post(llmConnectionsEndpoints.DELETE_LLM_CONNECTION(), {
connection_id: id,
});
-
+
// After successful database deletion, delete secrets from vault
if (connectionToDelete) {
try {
@@ -293,9 +355,9 @@ export async function checkBudgetStatus(): Promise {
return null;
}
}
-
+
export async function updateLLMConnectionStatus(
- id: string | number,
+ id: string | number,
status: 'active' | 'inactive'
): Promise<LLMConnection> {
const { data } = await apiDev.post(llmConnectionsEndpoints.UPDATE_LLM_CONNECTION_STATUS(), {
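For reference, the new filter plumbing in `getProductionConnection` can be exercised directly. A minimal sketch (the wrapper function and the filter values are illustrative, not part of this change):

```typescript
import { getProductionConnection } from 'services/llmConnections';

// Resolves to GET .../llm/connections/production?llmPlatform=OpenAI&connectionStatus=active;
// with no filters passed, the unparameterized endpoint is called as before.
async function loadActiveOpenAIProductionConnection() {
  return getProductionConnection({ llmPlatform: 'OpenAI', connectionStatus: 'active' });
}
```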
diff --git a/GUI/src/utils/queryKeys.ts b/GUI/src/utils/queryKeys.ts
index e004497..e10462e 100644
--- a/GUI/src/utils/queryKeys.ts
+++ b/GUI/src/utils/queryKeys.ts
@@ -1,5 +1,5 @@
import { PaginationState, SortingState } from '@tanstack/react-table';
-import { LLMConnectionFilters, LegacyLLMConnectionFilters } from 'services/llmConnections';
+import { LLMConnectionFilters, LegacyLLMConnectionFilters, ProductionConnectionFilters } from 'services/llmConnections';
import { InferenceRequest } from 'services/inference';
@@ -30,7 +30,7 @@ export const llmConnectionsQueryKeys = {
details: () => [...llmConnectionsQueryKeys.all(), 'detail'] as const,
detail: (id: string | number) => [...llmConnectionsQueryKeys.details(), id] as const,
budgetStatus: () => [...llmConnectionsQueryKeys.all(), 'budget-status'] as const,
- production: () => [...llmConnectionsQueryKeys.all(), 'production'] as const,
+ production: (filters?: ProductionConnectionFilters) => [...llmConnectionsQueryKeys.all(), 'production', filters] as const,
};
export const inferenceQueryKeys = {
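Since the production query key now embeds the filters, any consumer keyed on it refetches when the filters change. A minimal sketch of a consuming hook, assuming the GUI uses @tanstack/react-query and the import aliases shown in the diff; the hook name is illustrative:

```typescript
import { useQuery } from '@tanstack/react-query';
import { getProductionConnection, ProductionConnectionFilters } from 'services/llmConnections';
import { llmConnectionsQueryKeys } from 'utils/queryKeys';

// Hypothetical hook: changing any filter value changes the query key,
// so the production connection is refetched automatically.
export function useProductionConnection(filters?: ProductionConnectionFilters) {
  return useQuery({
    queryKey: llmConnectionsQueryKeys.production(filters),
    queryFn: () => getProductionConnection(filters),
  });
}
```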
diff --git a/endpoints.md b/endpoints.md
index 6bd4fc9..262e81a 100644
--- a/endpoints.md
+++ b/endpoints.md
@@ -357,12 +357,41 @@ GET /ruuter-private/llm/connections/list
| `llmPlatform` | `string` | Filter by LLM platform |
| `llmModel` | `string` | Filter by LLM model |
| `deploymentEnvironment` | `string` | Filter by environment (Testing / Production) |
+| `pageNumber` | `number` | Page number (1-based) |
+| `pageSize` | `number` | Number of items per page |
+| `sortBy` | `string` | Field to sort by |
+| `sortOrder` | `string` | Sort order: 'asc' or 'desc' |
### Example Request
```http
GET /ruuter-private/llm/connections/list?llmPlatform=OpenAI&deploymentEnvironment=Testing&llmModel=GPT4
```
+---
+
+## 5. Get Production LLM Connection (with filters)
+
+### Endpoint
+```http
+GET /ruuter-private/llm/connections/production
+```
+
+### Query Parameters (all optional)
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `llmPlatform` | `string` | Filter by LLM platform |
+| `llmModel` | `string` | Filter by LLM model |
+| `embeddingPlatform` | `string` | Filter by embedding platform |
+| `embeddingModel` | `string` | Filter by embedding model |
+| `connectionStatus` | `string` | Filter by connection status |
+| `sortBy` | `string` | Field to sort by |
+| `sortOrder` | `string` | Sort order: 'asc' or 'desc' |
+
+### Example Request
+```http
+GET /ruuter-private/llm/connections/production?llmPlatform=OpenAI&connectionStatus=active
+```
+
### Response (200 OK)
```json
[
diff --git a/pyproject.toml b/pyproject.toml
index 760dbb7..774f8af 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,7 @@ dependencies = [
"anthropic>=0.69.0",
"nemoguardrails>=0.16.0",
"tiktoken>=0.11.0",
+ "langfuse>=3.8.1",
]
[tool.pyright]
diff --git a/src/contextual_retrieval/contextual_retriever.py b/src/contextual_retrieval/contextual_retriever.py
index e76165a..8ab5d24 100644
--- a/src/contextual_retrieval/contextual_retriever.py
+++ b/src/contextual_retrieval/contextual_retriever.py
@@ -14,7 +14,7 @@
from loguru import logger
import asyncio
import time
-
+from langfuse import observe
from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig
# Type checking import to avoid circular dependency at runtime
@@ -126,6 +126,7 @@ def _clear_session_cache(self):
logger.debug("Clearing session LLM service cache")
self._session_llm_service = None
+ @observe(name="retrieve_contextual_chunks", as_type="retriever")
async def retrieve_contextual_chunks(
self,
original_question: str,
diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py
index 08f3596..b5d5f7d 100644
--- a/src/llm_orchestration_service.py
+++ b/src/llm_orchestration_service.py
@@ -5,6 +5,7 @@
import asyncio
import os
from loguru import logger
+from langfuse import Langfuse, observe
from llm_orchestrator_config.llm_manager import LLMManager
from models.request_models import (
@@ -28,6 +29,36 @@
from src.contextual_retrieval import ContextualRetriever
+class LangfuseConfig:
+ """Configuration for Langfuse integration."""
+
+ def __init__(self):
+ self.langfuse_client: Optional[Langfuse] = None
+ self._initialize_langfuse()
+
+ def _initialize_langfuse(self):
+ """Initialize Langfuse client with Vault secrets."""
+ try:
+ from llm_orchestrator_config.vault.vault_client import VaultAgentClient
+
+ vault = VaultAgentClient()
+ if vault.is_vault_available():
+ langfuse_secrets = vault.get_secret("langfuse/config")
+ if langfuse_secrets:
+ self.langfuse_client = Langfuse(
+ public_key=langfuse_secrets.get("public_key"),
+ secret_key=langfuse_secrets.get("secret_key"),
+ host=langfuse_secrets.get("host", "http://langfuse-web:3000"),
+ )
+ logger.info("Langfuse client initialized successfully")
+ else:
+ logger.warning("Langfuse secrets not found in Vault")
+ else:
+ logger.warning("Vault not available, Langfuse tracing disabled")
+ except Exception as e:
+ logger.warning(f"Failed to initialize Langfuse: {e}")
+
+
class LLMOrchestrationService:
"""
Service class for handling LLM orchestration with integrated guardrails.
@@ -39,8 +70,9 @@ class LLMOrchestrationService:
def __init__(self) -> None:
"""Initialize the orchestration service."""
- pass
+ self.langfuse_config = LangfuseConfig()
+ @observe(name="orchestration_request", as_type="agent")
def process_orchestration_request(
self, request: OrchestrationRequest
) -> Union[OrchestrationResponse, TestOrchestrationResponse]:
@@ -82,6 +114,38 @@ def process_orchestration_request(
# Log final costs and return response
self._log_costs(costs_dict)
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ total_costs = calculate_total_costs(costs_dict)
+
+ total_input_tokens = sum(
+ c.get("total_prompt_tokens", 0) for c in costs_dict.values()
+ )
+ total_output_tokens = sum(
+ c.get("total_completion_tokens", 0) for c in costs_dict.values()
+ )
+
+ langfuse.update_current_generation(
+ model=components["llm_manager"]
+ .get_provider_info()
+ .get("model", "unknown"),
+ usage_details={
+ "input": total_input_tokens,
+ "output": total_output_tokens,
+ "total": total_costs.get("total_tokens", 0),
+ },
+ cost_details={
+ "total": total_costs.get("total_cost", 0.0),
+ },
+ metadata={
+ "total_calls": total_costs.get("total_calls", 0),
+ "cost_breakdown": costs_dict,
+ "chat_id": request.chatId,
+ "author_id": request.authorId,
+ "environment": request.environment,
+ },
+ )
+ langfuse.flush()
return response
except Exception as e:
@@ -89,9 +153,20 @@ def process_orchestration_request(
f"Error processing orchestration request for chatId: {request.chatId}, "
f"error: {str(e)}"
)
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ metadata={
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "response_type": "technical_issue",
+ }
+ )
+ langfuse.flush()
self._log_costs(costs_dict)
return self._create_error_response(request)
+ @observe(name="initialize_service_components", as_type="span")
def _initialize_service_components(
self, request: OrchestrationRequest
) -> Dict[str, Any]:
@@ -212,6 +287,7 @@ def _log_generator_status(self, components: Dict[str, Any]) -> None:
except Exception as e:
logger.warning(f" Generator: Status check failed - {str(e)}")
+ @observe(name="execute_orchestration_pipeline", as_type="span")
def _execute_orchestration_pipeline(
self,
request: OrchestrationRequest,
@@ -262,6 +338,7 @@ def _execute_orchestration_pipeline(
components["guardrails_adapter"], generated_response, request, costs_dict
)
+ @observe(name="safe_initialize_guardrails", as_type="span")
def _safe_initialize_guardrails(
self, environment: str, connection_id: Optional[str]
) -> Optional[NeMoRailsAdapter]:
@@ -275,6 +352,7 @@ def _safe_initialize_guardrails(
logger.warning("Continuing without guardrails protection")
return None
+ @observe(name="safe_initialize_contextual_retriever", as_type="span")
def _safe_initialize_contextual_retriever(
self, environment: str, connection_id: Optional[str]
) -> Optional[ContextualRetriever]:
@@ -292,6 +370,7 @@ def _safe_initialize_contextual_retriever(
logger.warning("Continuing without chunk retrieval capabilities")
return None
+ @observe(name="safe_initialize_response_generator", as_type="span")
def _safe_initialize_response_generator(
self, llm_manager: LLMManager
) -> Optional[ResponseGeneratorAgent]:
@@ -449,6 +528,7 @@ def _create_out_of_scope_response(
content=OUT_OF_SCOPE_MESSAGE,
)
+ @observe(name="initialize_guardrails", as_type="span")
def _initialize_guardrails(
self, environment: str, connection_id: Optional[str]
) -> NeMoRailsAdapter:
@@ -479,6 +559,7 @@ def _initialize_guardrails(
logger.error(f"Failed to initialize Guardrails adapter: {str(e)}")
raise
+ @observe(name="check_input_guardrails", as_type="span")
def _check_input_guardrails(
self,
guardrails_adapter: NeMoRailsAdapter,
@@ -503,7 +584,26 @@ def _check_input_guardrails(
# Store guardrail costs
costs_dict["input_guardrails"] = result.usage
-
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ input=user_message,
+ metadata={
+ "guardrail_type": "input",
+ "allowed": result.allowed,
+ "verdict": result.verdict,
+ "blocked_reason": result.reason if not result.allowed else None,
+ "error": result.error if result.error else None,
+ },
+ usage_details={
+ "input": result.usage.get("total_prompt_tokens", 0),
+ "output": result.usage.get("total_completion_tokens", 0),
+ "total": result.usage.get("total_tokens", 0),
+ }, # type: ignore
+ cost_details={
+ "total": result.usage.get("total_cost", 0.0),
+ },
+ )
logger.info(
f"Input guardrails check completed: allowed={result.allowed}, "
f"cost=${result.usage.get('total_cost', 0):.6f}"
@@ -513,6 +613,15 @@ def _check_input_guardrails(
except Exception as e:
logger.error(f"Input guardrails check failed: {str(e)}")
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ metadata={
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "guardrail_type": "input",
+ }
+ )
# Return conservative result on error
return GuardrailCheckResult(
allowed=False,
@@ -522,6 +631,7 @@ def _check_input_guardrails(
usage={},
)
+ @observe(name="check_output_guardrails", as_type="span")
def _check_output_guardrails(
self,
guardrails_adapter: NeMoRailsAdapter,
@@ -546,7 +656,28 @@ def _check_output_guardrails(
# Store guardrail costs
costs_dict["output_guardrails"] = result.usage
-
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ input=assistant_message[:500], # Truncate for readability
+ output=result.verdict,
+ metadata={
+ "guardrail_type": "output",
+ "allowed": result.allowed,
+ "verdict": result.verdict,
+ "reason": result.reason if not result.allowed else None,
+ "error": result.error if result.error else None,
+ "response_length": len(assistant_message),
+ },
+ usage_details={
+ "input": result.usage.get("total_prompt_tokens", 0),
+ "output": result.usage.get("total_completion_tokens", 0),
+ "total": result.usage.get("total_tokens", 0),
+ }, # type: ignore
+ cost_details={
+ "total": result.usage.get("total_cost", 0.0),
+ },
+ )
logger.info(
f"Output guardrails check completed: allowed={result.allowed}, "
f"cost=${result.usage.get('total_cost', 0):.6f}"
@@ -556,6 +687,15 @@ def _check_output_guardrails(
except Exception as e:
logger.error(f"Output guardrails check failed: {str(e)}")
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ metadata={
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "guardrail_type": "output",
+ }
+ )
# Return conservative result on error
return GuardrailCheckResult(
allowed=False,
@@ -631,6 +771,7 @@ def _log_costs(self, costs_dict: Dict[str, Dict[str, Any]]) -> None:
except Exception as e:
logger.warning(f"Failed to log costs: {str(e)}")
+ @observe(name="initialize_llm_manager", as_type="span")
def _initialize_llm_manager(
self, environment: str, connection_id: Optional[str]
) -> LLMManager:
@@ -660,6 +801,7 @@ def _initialize_llm_manager(
logger.error(f"Failed to initialize LLM Manager: {str(e)}")
raise
+ @observe(name="refine_user_prompt", as_type="chain")
def _refine_user_prompt(
self,
llm_manager: LLMManager,
@@ -725,7 +867,32 @@ def _refine_user_prompt(
raise ValueError(
f"Prompt refinement validation failed: {str(validation_error)}"
) from validation_error
-
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ refinement_applied = (
+ original_message.strip()
+ != validated_output.original_question.strip()
+ )
+ langfuse.update_current_generation(
+ model=llm_manager.get_provider_info().get("model", "unknown"),
+ input=original_message,
+ usage_details={
+ "input": usage_info.get("total_prompt_tokens", 0),
+ "output": usage_info.get("total_completion_tokens", 0),
+ "total": usage_info.get("total_tokens", 0),
+ },
+ cost_details={
+ "total": usage_info.get("total_cost", 0.0),
+ },
+ metadata={
+ "num_calls": usage_info.get("num_calls", 0),
+ "num_refined_questions": len(
+ validated_output.refined_questions
+ ),
+ "refinement_applied": refinement_applied,
+ "conversation_history_length": len(history),
+ }, # type: ignore
+ )
output_json = validated_output.model_dump()
logger.info(
f"Prompt refinement output: {json.dumps(output_json, indent=2)}"
@@ -738,9 +905,19 @@ def _refine_user_prompt(
raise
except Exception as e:
logger.error(f"Prompt refinement failed: {str(e)}")
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ metadata={
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "refinement_failed": True,
+ }
+ )
logger.error(f"Failed to refine message: {original_message}")
raise RuntimeError(f"Prompt refinement process failed: {str(e)}") from e
+ @observe(name="initialize_contextual_retriever", as_type="span")
def _initialize_contextual_retriever(
self, environment: str, connection_id: Optional[str]
) -> ContextualRetriever:
@@ -774,6 +951,7 @@ def _initialize_contextual_retriever(
logger.error(f"Failed to initialize contextual retriever: {str(e)}")
raise
+ @observe(name="initialize_response_generator", as_type="span")
def _initialize_response_generator(
self, llm_manager: LLMManager
) -> ResponseGeneratorAgent:
@@ -800,6 +978,7 @@ def _initialize_response_generator(
logger.error(f"Failed to initialize response generator: {str(e)}")
raise
+ @observe(name="generate_rag_response", as_type="generation")
def _generate_rag_response(
self,
llm_manager: LLMManager,
@@ -867,7 +1046,27 @@ def _generate_rag_response(
},
)
costs_dict["response_generator"] = generator_usage
-
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ model=llm_manager.get_provider_info().get("model", "unknown"),
+ usage_details={
+ "input": generator_usage.get("total_prompt_tokens", 0),
+ "output": generator_usage.get("total_completion_tokens", 0),
+ "total": generator_usage.get("total_tokens", 0),
+ },
+ cost_details={
+ "total": generator_usage.get("total_cost", 0.0),
+ },
+ metadata={
+ "num_calls": generator_usage.get("num_calls", 0),
+ "question_out_of_scope": question_out_of_scope,
+ "num_chunks_used": len(relevant_chunks)
+ if relevant_chunks
+ else 0,
+ },
+ output=answer,
+ )
if question_out_of_scope:
logger.info("Question determined out-of-scope – sending fixed message.")
if request.environment == "test":
@@ -910,6 +1109,16 @@ def _generate_rag_response(
except Exception as e:
logger.error(f"RAG Response generation failed: {str(e)}")
+ if self.langfuse_config.langfuse_client:
+ langfuse = self.langfuse_config.langfuse_client
+ langfuse.update_current_generation(
+ metadata={
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "response_type": "technical_issue",
+ "refinement_failed": False,
+ }
+ )
# Standardized technical issue; no second LLM call, no citations
if request.environment == "test":
logger.info(
@@ -933,7 +1142,7 @@ def _generate_rag_response(
# ========================================================================
# Vector Indexer Support Methods (Isolated from RAG Pipeline)
# ========================================================================
-
+ @observe(name="create_embeddings_for_indexer", as_type="span")
def create_embeddings_for_indexer(
self,
texts: List[str],
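For the Langfuse bootstrap in `LangfuseConfig` above to succeed, the Vault entry at `langfuse/config` has to expose the keys `_initialize_langfuse()` reads. A sketch of that secret's payload, written as a TypeScript literal for illustration only; the key names come from the diff, the values are placeholders:

```typescript
// Expected payload of the Vault secret at "langfuse/config" (placeholder values).
const langfuseConfigSecret = {
  public_key: 'pk-lf-...',          // Langfuse project public key
  secret_key: 'sk-lf-...',          // Langfuse project secret key
  host: 'http://langfuse-web:3000', // optional; this default applies when omitted
};
```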
diff --git a/src/llm_orchestrator_config/context_manager.py b/src/llm_orchestrator_config/context_manager.py
index d1e0358..a14447e 100644
--- a/src/llm_orchestrator_config/context_manager.py
+++ b/src/llm_orchestrator_config/context_manager.py
@@ -6,6 +6,7 @@
from src.llm_orchestrator_config.llm_manager import LLMManager
from src.models.request_models import ContextGenerationRequest
+from langfuse import observe
class ContextGenerationManager:
@@ -30,6 +31,7 @@ def __init__(self, llm_manager: LLMManager) -> None:
# Cache structure prepared for future prompt caching implementation
self._cache: Dict[str, Any] = {}
+ @observe(name="generate_context_with_caching", as_type="generation")
def generate_context_with_caching(
self, request: ContextGenerationRequest
) -> Dict[str, Any]:
diff --git a/uv.lock b/uv.lock
index 8653912..5f79bf1 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1104,6 +1104,27 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/58/0d/41a51b40d24ff0384ec4f7ab8dd3dcea8353c05c973836b5e289f1465d4f/langchain_text_splitters-0.3.11-py3-none-any.whl", hash = "sha256:cf079131166a487f1372c8ab5d0bfaa6c0a4291733d9c43a34a16ac9bcd6a393", size = 33845, upload-time = "2025-08-31T23:02:57.195Z" },
]
+[[package]]
+name = "langfuse"
+version = "3.8.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "backoff" },
+ { name = "httpx" },
+ { name = "openai" },
+ { name = "opentelemetry-api" },
+ { name = "opentelemetry-exporter-otlp-proto-http" },
+ { name = "opentelemetry-sdk" },
+ { name = "packaging" },
+ { name = "pydantic" },
+ { name = "requests" },
+ { name = "wrapt" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/ca/0b/81f9c6a982f79c112b7f10bfd6f3a4871e6fa3e4fe8d078b6112abfd3c08/langfuse-3.8.1.tar.gz", hash = "sha256:2464ae3f8386d80e1252a0e7406e3be4121e792a74f1b1c21d9950f658e5168d", size = 197401, upload-time = "2025-10-22T13:35:52.572Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/b2/f9/538af0fc4219eb2484ba319483bce3383146f7a0923d5f39e464ad9a504b/langfuse-3.8.1-py3-none-any.whl", hash = "sha256:5b94b66ec0b0de388a8ea1f078b32c1666b5825b36eab863a21fdee78c53b3bb", size = 364580, upload-time = "2025-10-22T13:35:50.597Z" },
+]
+
[[package]]
name = "langsmith"
version = "0.4.37"
@@ -1499,6 +1520,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/28/f0/bd831afbdba74ca2ce3982142a2fad707f8c487e8a3b6fef01f1d5945d1b/opentelemetry_exporter_otlp_proto_grpc-1.38.0-py3-none-any.whl", hash = "sha256:7c49fd9b4bd0dbe9ba13d91f764c2d20b0025649a6e4ac35792fb8d84d764bc7", size = 19695, upload-time = "2025-10-16T08:35:35.053Z" },
]
+[[package]]
+name = "opentelemetry-exporter-otlp-proto-http"
+version = "1.38.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "googleapis-common-protos" },
+ { name = "opentelemetry-api" },
+ { name = "opentelemetry-exporter-otlp-proto-common" },
+ { name = "opentelemetry-proto" },
+ { name = "opentelemetry-sdk" },
+ { name = "requests" },
+ { name = "typing-extensions" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/81/0a/debcdfb029fbd1ccd1563f7c287b89a6f7bef3b2902ade56797bfd020854/opentelemetry_exporter_otlp_proto_http-1.38.0.tar.gz", hash = "sha256:f16bd44baf15cbe07633c5112ffc68229d0edbeac7b37610be0b2def4e21e90b", size = 17282, upload-time = "2025-10-16T08:35:54.422Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/e5/77/154004c99fb9f291f74aa0822a2f5bbf565a72d8126b3a1b63ed8e5f83c7/opentelemetry_exporter_otlp_proto_http-1.38.0-py3-none-any.whl", hash = "sha256:84b937305edfc563f08ec69b9cb2298be8188371217e867c1854d77198d0825b", size = 19579, upload-time = "2025-10-16T08:35:36.269Z" },
+]
+
[[package]]
name = "opentelemetry-proto"
version = "1.38.0"
@@ -2083,6 +2122,7 @@ dependencies = [
{ name = "dspy" },
{ name = "fastapi" },
{ name = "hvac" },
+ { name = "langfuse" },
{ name = "loguru" },
{ name = "nemoguardrails" },
{ name = "numpy" },
@@ -2114,6 +2154,7 @@ requires-dist = [
{ name = "dspy", specifier = ">=3.0.3" },
{ name = "fastapi", specifier = ">=0.116.1" },
{ name = "hvac", specifier = ">=2.3.0" },
+ { name = "langfuse", specifier = ">=3.8.1" },
{ name = "loguru", specifier = ">=0.7.3" },
{ name = "nemoguardrails", specifier = ">=0.16.0" },
{ name = "numpy", specifier = ">=2.3.2" },
@@ -2663,23 +2704,21 @@ wheels = [
[[package]]
name = "wrapt"
-version = "2.0.0"
+version = "1.17.3"
source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/49/19/5e5bcd855d808892fe02d49219f97a50f64cd6d8313d75df3494ee97b1a3/wrapt-2.0.0.tar.gz", hash = "sha256:35a542cc7a962331d0279735c30995b024e852cf40481e384fd63caaa391cbb9", size = 81722, upload-time = "2025-10-19T23:47:54.07Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/3c/28/7f266b5bf50c3ad0c99c524d99faa0f7d6eecb045d950e7d2c9e1f0e1338/wrapt-2.0.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:73c6f734aecb1a030d9a265c13a425897e1ea821b73249bb14471445467ca71c", size = 78078, upload-time = "2025-10-19T23:45:58.855Z" },
- { url = "https://files.pythonhosted.org/packages/06/0c/bbdcad7eb535fae9d6b0fcfa3995c364797cd8e2b423bba5559ab2d88dcf/wrapt-2.0.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b4a7f8023b8ce8a36370154733c747f8d65c8697cb977d8b6efeb89291fff23e", size = 61158, upload-time = "2025-10-19T23:46:00.096Z" },
- { url = "https://files.pythonhosted.org/packages/d3/8a/bba3e7a4ebf4d1624103ee59d97b78a1fbb08fb5753ff5d1b69f5ef5e863/wrapt-2.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a1cb62f686c50e9dab5983c68f6c8e9cbf14a6007935e683662898a7d892fa69", size = 61646, upload-time = "2025-10-19T23:46:01.279Z" },
- { url = "https://files.pythonhosted.org/packages/ff/0c/0f565294897a72493dbafe7b46229b5f09f3776795a894d6b737e98387de/wrapt-2.0.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:43dc0550ae15e33e6bb45a82a5e1b5495be2587fbaa996244b509921810ee49f", size = 121442, upload-time = "2025-10-19T23:46:04.287Z" },
- { url = "https://files.pythonhosted.org/packages/da/80/7f03501a8a078ad79b19b1a888f9192a9494e62ddf8985267902766a4f30/wrapt-2.0.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:39c5b45b056d630545e40674d1f5e1b51864b3546f25ab6a4a331943de96262e", size = 123018, upload-time = "2025-10-19T23:46:06.052Z" },
- { url = "https://files.pythonhosted.org/packages/37/6b/ad0e1ff98359f13b4b0c2c52848e792841146fe79ac5f56899b9a028fc0d/wrapt-2.0.0-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:804e88f824b76240a1b670330637ccfd2d18b9efa3bb4f02eb20b2f64880b324", size = 117369, upload-time = "2025-10-19T23:46:02.53Z" },
- { url = "https://files.pythonhosted.org/packages/ac/6c/a90437bba8cb1ce2ed639af979515e09784678c2a7f4ffc79f2cf7de809e/wrapt-2.0.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c2c476aa3fc2b9899c3f7b20963fac4f952e7edb74a31fc92f7745389a2e3618", size = 121453, upload-time = "2025-10-19T23:46:07.747Z" },
- { url = "https://files.pythonhosted.org/packages/2c/a9/b3982f9bd15bd45857a23c48b7c36e47d05db4a4dcc5061c31f169238845/wrapt-2.0.0-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:8d851e526891216f89fcb7a1820dad9bd503ba3468fb9635ee28e93c781aa98e", size = 116250, upload-time = "2025-10-19T23:46:09.385Z" },
- { url = "https://files.pythonhosted.org/packages/73/e2/b7a8b1afac9f791d8f5eac0d9726559f1d7ec4a2b5a6b4e67ac145b007a5/wrapt-2.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:b95733c2360c4a8656ee93c7af78e84c0bd617da04a236d7a456c8faa34e7a2d", size = 120575, upload-time = "2025-10-19T23:46:11.882Z" },
- { url = "https://files.pythonhosted.org/packages/a2/0f/37920eeea96094f450ae35505d39f1135df951a2cdee0d4e01d4f843396a/wrapt-2.0.0-cp312-cp312-win32.whl", hash = "sha256:ea56817176834edf143df1109ae8fdaa087be82fdad3492648de0baa8ae82bf2", size = 58175, upload-time = "2025-10-19T23:46:15.678Z" },
- { url = "https://files.pythonhosted.org/packages/f0/db/b395f3b0c7f2c60d9219afacc54ceb699801ccf2d3d969ba556dc6d3af20/wrapt-2.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:3c7d3bee7be7a2665286103f4d1f15405c8074e6e1f89dac5774f9357c9a3809", size = 60415, upload-time = "2025-10-19T23:46:12.913Z" },
- { url = "https://files.pythonhosted.org/packages/86/22/33d660214548af47fc59d9eec8c0e0693bcedc5b3a0b52e8cbdd61f3b646/wrapt-2.0.0-cp312-cp312-win_arm64.whl", hash = "sha256:680f707e1d26acbc60926659799b15659f077df5897a6791c7c598a5d4a211c4", size = 58911, upload-time = "2025-10-19T23:46:13.889Z" },
- { url = "https://files.pythonhosted.org/packages/00/5c/c34575f96a0a038579683c7f10fca943c15c7946037d1d254ab9db1536ec/wrapt-2.0.0-py3-none-any.whl", hash = "sha256:02482fb0df89857e35427dfb844319417e14fae05878f295ee43fa3bf3b15502", size = 43998, upload-time = "2025-10-19T23:47:52.858Z" },
+sdist = { url = "https://files.pythonhosted.org/packages/95/8f/aeb76c5b46e273670962298c23e7ddde79916cb74db802131d49a85e4b7d/wrapt-1.17.3.tar.gz", hash = "sha256:f66eb08feaa410fe4eebd17f2a2c8e2e46d3476e9f8c783daa8e09e0faa666d0", size = 55547, upload-time = "2025-08-12T05:53:21.714Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/9f/41/cad1aba93e752f1f9268c77270da3c469883d56e2798e7df6240dcb2287b/wrapt-1.17.3-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:ab232e7fdb44cdfbf55fc3afa31bcdb0d8980b9b95c38b6405df2acb672af0e0", size = 53998, upload-time = "2025-08-12T05:51:47.138Z" },
+ { url = "https://files.pythonhosted.org/packages/60/f8/096a7cc13097a1869fe44efe68dace40d2a16ecb853141394047f0780b96/wrapt-1.17.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:9baa544e6acc91130e926e8c802a17f3b16fbea0fd441b5a60f5cf2cc5c3deba", size = 39020, upload-time = "2025-08-12T05:51:35.906Z" },
+ { url = "https://files.pythonhosted.org/packages/33/df/bdf864b8997aab4febb96a9ae5c124f700a5abd9b5e13d2a3214ec4be705/wrapt-1.17.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6b538e31eca1a7ea4605e44f81a48aa24c4632a277431a6ed3f328835901f4fd", size = 39098, upload-time = "2025-08-12T05:51:57.474Z" },
+ { url = "https://files.pythonhosted.org/packages/9f/81/5d931d78d0eb732b95dc3ddaeeb71c8bb572fb01356e9133916cd729ecdd/wrapt-1.17.3-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:042ec3bb8f319c147b1301f2393bc19dba6e176b7da446853406d041c36c7828", size = 88036, upload-time = "2025-08-12T05:52:34.784Z" },
+ { url = "https://files.pythonhosted.org/packages/ca/38/2e1785df03b3d72d34fc6252d91d9d12dc27a5c89caef3335a1bbb8908ca/wrapt-1.17.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3af60380ba0b7b5aeb329bc4e402acd25bd877e98b3727b0135cb5c2efdaefe9", size = 88156, upload-time = "2025-08-12T05:52:13.599Z" },
+ { url = "https://files.pythonhosted.org/packages/b3/8b/48cdb60fe0603e34e05cffda0b2a4adab81fd43718e11111a4b0100fd7c1/wrapt-1.17.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0b02e424deef65c9f7326d8c19220a2c9040c51dc165cddb732f16198c168396", size = 87102, upload-time = "2025-08-12T05:52:14.56Z" },
+ { url = "https://files.pythonhosted.org/packages/3c/51/d81abca783b58f40a154f1b2c56db1d2d9e0d04fa2d4224e357529f57a57/wrapt-1.17.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:74afa28374a3c3a11b3b5e5fca0ae03bef8450d6aa3ab3a1e2c30e3a75d023dc", size = 87732, upload-time = "2025-08-12T05:52:36.165Z" },
+ { url = "https://files.pythonhosted.org/packages/9e/b1/43b286ca1392a006d5336412d41663eeef1ad57485f3e52c767376ba7e5a/wrapt-1.17.3-cp312-cp312-win32.whl", hash = "sha256:4da9f45279fff3543c371d5ababc57a0384f70be244de7759c85a7f989cb4ebe", size = 36705, upload-time = "2025-08-12T05:53:07.123Z" },
+ { url = "https://files.pythonhosted.org/packages/28/de/49493f962bd3c586ab4b88066e967aa2e0703d6ef2c43aa28cb83bf7b507/wrapt-1.17.3-cp312-cp312-win_amd64.whl", hash = "sha256:e71d5c6ebac14875668a1e90baf2ea0ef5b7ac7918355850c0908ae82bcb297c", size = 38877, upload-time = "2025-08-12T05:53:05.436Z" },
+ { url = "https://files.pythonhosted.org/packages/f1/48/0f7102fe9cb1e8a5a77f80d4f0956d62d97034bbe88d33e94699f99d181d/wrapt-1.17.3-cp312-cp312-win_arm64.whl", hash = "sha256:604d076c55e2fdd4c1c03d06dc1a31b95130010517b5019db15365ec4a405fc6", size = 36885, upload-time = "2025-08-12T05:52:54.367Z" },
+ { url = "https://files.pythonhosted.org/packages/1f/f6/a933bd70f98e9cf3e08167fc5cd7aaaca49147e48411c0bd5ae701bb2194/wrapt-1.17.3-py3-none-any.whl", hash = "sha256:7171ae35d2c33d326ac19dd8facb1e82e5fd04ef8c6c0e394d7af55a55051c22", size = 23591, upload-time = "2025-08-12T05:53:20.674Z" },
]
[[package]]
diff --git a/vault/agent-out/pidfile b/vault/agent-out/pidfile
deleted file mode 100644
index e69de29..0000000