diff --git a/src/memos/api/handlers/chat_handler.py b/src/memos/api/handlers/chat_handler.py index bcc3669b6..3e9d1e5ec 100644 --- a/src/memos/api/handlers/chat_handler.py +++ b/src/memos/api/handlers/chat_handler.py @@ -99,15 +99,13 @@ def __init__( def handle_chat_complete(self, chat_req: APIChatCompleteRequest) -> dict[str, Any]: """ - Chat with MemOS for complete response (non-streaming). - - This implementation directly uses search/add handlers instead of mos_server. + Chat with MemOS for chat complete response (non-streaming). Args: chat_req: Chat complete request Returns: - Dictionary with response and references + Dictionary with chat complete response and reasoning Raises: HTTPException: If chat fails @@ -161,7 +159,7 @@ def handle_chat_complete(self, chat_req: APIChatCompleteRequest) -> dict[str, An {"role": "user", "content": chat_req.query}, ] - self.logger.info("Starting to generate complete response...") + self.logger.info("[Cloud Service] Starting to generate chat complete response...") # Step 3: Generate complete response from LLM if chat_req.model_name_or_path and chat_req.model_name_or_path not in self.chat_llms: @@ -172,11 +170,23 @@ def handle_chat_complete(self, chat_req: APIChatCompleteRequest) -> dict[str, An model = chat_req.model_name_or_path or next(iter(self.chat_llms.keys())) - self.logger.info(f"[Cloud Service Chat Complete Model]: {model}") + self.logger.info(f"[Cloud Service] Chat Complete Model: {model}") strat = time.time() response = self.chat_llms[model].generate(current_messages, model_name_or_path=model) end = time.time() - self.logger.info(f"[Cloud Service Chat Complete Time]: {end - strat} seconds") + self.logger.info(f"[Cloud Service] Chat Complete Time: {end - strat} seconds") + + if not response: + self.logger.error( + f"[Cloud Service] Chat Complete Failed, LLM response is {response}" + ) + raise HTTPException( + status_code=500, detail="Chat complete failed, LLM response is None" + ) + + self.logger.info( + f"[Cloud Service] Chat Complete LLM Input: {json.dumps(current_messages, ensure_ascii=False)} Chat Complete LLM Response: {response}" + ) # Step 4: start add after chat asynchronously if chat_req.add_message_on_answer: @@ -192,7 +202,7 @@ def handle_chat_complete(self, chat_req: APIChatCompleteRequest) -> dict[str, An async_mode="async", ) end = time.time() - self.logger.info(f"[Cloud Service Chat Add Time]: {end - start} seconds") + self.logger.info(f"[Cloud Service] Chat Add Time: {end - start} seconds") match = re.search(r"<think>([\s\S]*?)</think>", response) reasoning_text = match.group(1) if match else None @@ -208,14 +218,12 @@ def handle_chat_complete(self, chat_req: APIChatCompleteRequest) -> dict[str, An except ValueError as err: raise HTTPException(status_code=404, detail=str(traceback.format_exc())) from err except Exception as err: - self.logger.error(f"Failed to complete chat: {traceback.format_exc()}") + self.logger.error(f"[Cloud Service] Failed to chat complete: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=str(traceback.format_exc())) from err def handle_chat_stream(self, chat_req: ChatRequest) -> StreamingResponse: """ - Chat with MemOS via Server-Sent Events (SSE) stream using search/add handlers. - - This implementation directly uses search_handler and add_handler. + Chat with MemOS via Server-Sent Events (SSE) stream for chat stream response.
Args: chat_req: Chat stream request @@ -229,7 +237,7 @@ def handle_chat_stream(self, chat_req: ChatRequest) -> StreamingResponse: try: def generate_chat_response() -> Generator[str, None, None]: - """Generate chat response as SSE stream.""" + """Generate chat stream response as SSE stream.""" try: # Resolve readable cube IDs (for search) readable_cube_ids = chat_req.readable_cube_ids or ( @@ -289,7 +297,7 @@ def generate_chat_response() -> Generator[str, None, None]: ] self.logger.info( - f"user_id: {chat_req.user_id}, readable_cube_ids: {readable_cube_ids}, " + f"[Cloud Service] chat stream user_id: {chat_req.user_id}, readable_cube_ids: {readable_cube_ids}, " f"current_system_prompt: {system_prompt}" ) @@ -304,14 +312,12 @@ def generate_chat_response() -> Generator[str, None, None]: ) model = chat_req.model_name_or_path or next(iter(self.chat_llms.keys())) - self.logger.info(f"[Cloud Service Chat Stream Model]: {model}") + self.logger.info(f"[Cloud Service] Chat Stream Model: {model}") start = time.time() response_stream = self.chat_llms[model].generate_stream( current_messages, model_name_or_path=model ) - end = time.time() - self.logger.info(f"[Cloud Service Chat Stream Time]: {end - start} seconds") # Stream the response buffer = "" @@ -337,6 +343,13 @@ def generate_chat_response() -> Generator[str, None, None]: chunk_data = f"data: {json.dumps({'type': 'text', 'data': chunk}, ensure_ascii=False)}\n\n" yield chunk_data + end = time.time() + self.logger.info(f"[Cloud Service] Chat Stream Time: {end - start} seconds") + + self.logger.info( + f"[Cloud Service] Chat Stream LLM Input: {json.dumps(current_messages, ensure_ascii=False)} Chat Stream LLM Response: {full_response}" + ) + current_messages.append({"role": "assistant", "content": full_response}) if chat_req.add_message_on_answer: # Resolve writable cube IDs (for add) @@ -354,10 +367,10 @@ def generate_chat_response() -> Generator[str, None, None]: ) end = time.time() self.logger.info( - f"[Cloud Service Chat Stream Add Time]: {end - start} seconds" + f"[Cloud Service] Chat Stream Add Time: {end - start} seconds" ) except Exception as e: - self.logger.error(f"Error in chat stream: {e}", exc_info=True) + self.logger.error(f"[Cloud Service] Error in chat stream: {e}", exc_info=True) error_data = f"data: {json.dumps({'type': 'error', 'content': str(traceback.format_exc())})}\n\n" yield error_data @@ -377,14 +390,14 @@ def generate_chat_response() -> Generator[str, None, None]: except ValueError as err: raise HTTPException(status_code=404, detail=str(traceback.format_exc())) from err except Exception as err: - self.logger.error(f"Failed to start chat stream: {traceback.format_exc()}") + self.logger.error( + f"[Cloud Service] Failed to start chat stream: {traceback.format_exc()}" + ) raise HTTPException(status_code=500, detail=str(traceback.format_exc())) from err def handle_chat_stream_playground(self, chat_req: ChatPlaygroundRequest) -> StreamingResponse: """ - Chat with MemOS via Server-Sent Events (SSE) stream using search/add handlers. - - This implementation directly uses search_handler and add_handler. + Chat with MemOS via Server-Sent Events (SSE) stream for playground chat stream response. 
Args: chat_req: Chat stream request @@ -398,7 +411,7 @@ def handle_chat_stream_playground(self, chat_req: ChatPlaygroundRequest) -> Stre try: def generate_chat_response() -> Generator[str, None, None]: - """Generate chat response as SSE stream.""" + """Generate playground chat stream response as SSE stream.""" try: import time @@ -434,7 +447,9 @@ def generate_chat_response() -> Generator[str, None, None]: start_time = time.time() search_response = self.search_handler.handle_search_memories(search_req) end_time = time.time() - self.logger.info(f"first search time: {end_time - start_time}") + self.logger.info( + f"[PLAYGROUND CHAT] first search time: {end_time - start_time}" + ) yield f"data: {json.dumps({'type': 'status', 'data': '1'})}\n\n" @@ -481,7 +496,7 @@ def generate_chat_response() -> Generator[str, None, None]: conversation=chat_req.history, mode="fine", ) - self.logger.info(f"[PLAYGROUND chat parsed_goal]: {parsed_goal}") + self.logger.info(f"[PLAYGROUND CHAT] parsed_goal: {parsed_goal}") if chat_req.beginner_guide_step == "first": chat_req.internet_search = False @@ -512,12 +527,14 @@ def generate_chat_response() -> Generator[str, None, None]: search_tool_memory=False, ) - self.logger.info(f"[PLAYGROUND second search query]: {search_req.query}") + self.logger.info(f"[PLAYGROUND CHAT] second search query: {search_req.query}") start_time = time.time() search_response = self.search_handler.handle_search_memories(search_req) end_time = time.time() - self.logger.info(f"second search time: {end_time - start_time}") + self.logger.info( + f"[PLAYGROUND CHAT] second search time: {end_time - start_time}" + ) # for playground, add the query to memory without response self._start_add_to_memory( @@ -578,13 +595,15 @@ def generate_chat_response() -> Generator[str, None, None]: ] self.logger.info( - f"user_id: {chat_req.user_id}, readable_cube_ids: {readable_cube_ids}, " + f"[PLAYGROUND CHAT] user_id: {chat_req.user_id}, readable_cube_ids: {readable_cube_ids}, " f"current_system_prompt: {system_prompt}" ) # Step 3: Generate streaming response from LLM try: model = next(iter(self.chat_llms.keys())) + self.logger.info(f"[PLAYGROUND CHAT] Chat Playground Stream Model: {model}") + start = time.time() response_stream = self.chat_llms[model].generate_stream( current_messages, model_name_or_path=model ) @@ -629,10 +648,19 @@ def generate_chat_response() -> Generator[str, None, None]: chunk_data = f"data: {json.dumps({'type': 'text', 'data': processed_chunk}, ensure_ascii=False)}\n\n" yield chunk_data + end = time.time() + self.logger.info( + f"[PLAYGROUND CHAT] Chat Playground Stream Time: {end - start} seconds" + ) + self.logger.info( + f"[PLAYGROUND CHAT] Chat Playground Stream LLM Input: {json.dumps(current_messages, ensure_ascii=False)} Chat Playground Stream LLM Response: {full_response}" + ) + except Exception as llm_error: # Log the error self.logger.error( - f"Error during LLM generation: {llm_error}", exc_info=True + f"[PLAYGROUND CHAT] Error during LLM generation: {llm_error}", + exc_info=True, ) # Send error message to client error_msg = f"模型生成错误: {llm_error!s}" @@ -654,7 +682,7 @@ def generate_chat_response() -> Generator[str, None, None]: # Get further suggestion current_messages.append({"role": "assistant", "content": full_response}) further_suggestion = self._get_further_suggestion(current_messages) - self.logger.info(f"further_suggestion: {further_suggestion}") + self.logger.info(f"[PLAYGROUND CHAT] further_suggestion: {further_suggestion}") yield f"data: {json.dumps({'type': 
'suggestion', 'data': further_suggestion})}\n\n" yield f"data: {json.dumps({'type': 'end'})}\n\n" @@ -685,7 +713,9 @@ def generate_chat_response() -> Generator[str, None, None]: ) except Exception as e: - self.logger.error(f"Error in chat stream: {e}", exc_info=True) + self.logger.error( + f"[PLAYGROUND CHAT] Error in playground chat stream: {e}", exc_info=True + ) error_data = f"data: {json.dumps({'type': 'error', 'content': str(traceback.format_exc())})}\n\n" yield error_data @@ -705,7 +735,9 @@ def generate_chat_response() -> Generator[str, None, None]: except ValueError as err: raise HTTPException(status_code=404, detail=str(traceback.format_exc())) from err except Exception as err: - self.logger.error(f"Failed to start chat stream: {traceback.format_exc()}") + self.logger.error( + f"[PLAYGROUND CHAT] Failed to start playground chat stream: {traceback.format_exc()}" + ) raise HTTPException(status_code=500, detail=str(traceback.format_exc())) from err def _dedup_and_supplement_memories( diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 7af3afe74..56f8ac195 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -177,7 +177,11 @@ def init_server() -> dict[str, Any]: else None ) llm = LLMFactory.from_config(llm_config) - chat_llms = _init_chat_llms(chat_llm_config) + chat_llms = ( + _init_chat_llms(chat_llm_config) + if os.getenv("ENABLE_CHAT_API", "false") == "true" + else None + ) embedder = EmbedderFactory.from_config(embedder_config) mem_reader = MemReaderFactory.from_config(mem_reader_config) reranker = RerankerFactory.from_config(reranker_config) diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index fcb70a64c..37ca361ea 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -15,7 +15,7 @@ import random as _random import socket -from fastapi import APIRouter, Query +from fastapi import APIRouter, HTTPException, Query from memos.api import handlers from memos.api.handlers.add_handler import AddHandler @@ -64,12 +64,16 @@ # Initialize all handlers with dependency injection search_handler = SearchHandler(dependencies) add_handler = AddHandler(dependencies) -chat_handler = ChatHandler( - dependencies, - components["chat_llms"], - search_handler, - add_handler, - online_bot=components.get("online_bot"), +chat_handler = ( + ChatHandler( + dependencies, + components["chat_llms"], + search_handler, + add_handler, + online_bot=components.get("online_bot"), + ) + if os.getenv("ENABLE_CHAT_API", "false") == "true" + else None ) feedback_handler = FeedbackHandler(dependencies) # Extract commonly used components for function-based handlers @@ -201,6 +205,10 @@ def chat_complete(chat_req: APIChatCompleteRequest): This endpoint uses the class-based ChatHandler. """ + if chat_handler is None: + raise HTTPException( + status_code=503, detail="Chat service is not available. Chat handler not initialized." + ) return chat_handler.handle_chat_complete(chat_req) @@ -212,6 +220,10 @@ def chat_stream(chat_req: ChatRequest): This endpoint uses the class-based ChatHandler which internally composes SearchHandler and AddHandler for a clean architecture. """ + if chat_handler is None: + raise HTTPException( + status_code=503, detail="Chat service is not available. Chat handler not initialized." 
+ ) return chat_handler.handle_chat_stream(chat_req) @@ -223,6 +235,10 @@ def chat_stream_playground(chat_req: ChatPlaygroundRequest): This endpoint uses the class-based ChatHandler which internally composes SearchHandler and AddHandler for a clean architecture. """ + if chat_handler is None: + raise HTTPException( + status_code=503, detail="Chat service is not available. Chat handler not initialized." + ) return chat_handler.handle_chat_stream_playground(chat_req) diff --git a/src/memos/memories/textual/prefer_text_memory/extractor.py b/src/memos/memories/textual/prefer_text_memory/extractor.py index 3404c6d4c..0c6e5339d 100644 --- a/src/memos/memories/textual/prefer_text_memory/extractor.py +++ b/src/memos/memories/textual/prefer_text_memory/extractor.py @@ -70,6 +70,9 @@ def extract_explicit_preference(self, qa_pair: MessageList | str) -> dict[str, A try: response = self.llm_provider.generate([{"role": "user", "content": prompt}]) if not response: + logger.error( + f"[prefer_extractor]: (Error) LLM response content is {response} when extracting explicit preference" + ) return None response = response.strip().replace("```json", "").replace("```", "").strip() result = json.loads(response) @@ -95,6 +98,9 @@ def extract_implicit_preference(self, qa_pair: MessageList | str) -> dict[str, A try: response = self.llm_provider.generate([{"role": "user", "content": prompt}]) if not response: + logger.error( + f"[prefer_extractor]: (Error) LLM response content is {response} when extracting implicit preference" + ) return None response = response.strip().replace("```json", "").replace("```", "").strip() result = json.loads(response) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index ab3d0ce03..a920f7b0e 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -57,6 +57,7 @@ class SingleCubeView(MemCubeView): feedback_server: Any | None = None deepsearch_agent: Any | None = None + @timed def add_memories(self, add_req: APIADDRequest) -> list[dict[str, Any]]: """ This is basically your current handle_add_memories logic, @@ -103,6 +104,7 @@ def add_memories(self, add_req: APIADDRequest) -> list[dict[str, Any]]: return all_memories + @timed def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]: # Create UserContext object user_context = UserContext( @@ -150,6 +152,7 @@ def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]: self.logger.info(f"Search {len(memories_result)} memories.") return memories_result + @timed def feedback_memories(self, feedback_req: APIFeedbackRequest) -> dict[str, Any]: target_session_id = feedback_req.session_id or "default_session" if feedback_req.async_mode == "async": @@ -554,6 +557,7 @@ def _schedule_memory_tasks( ) self.mem_scheduler.submit_messages(messages=[message_item_add]) + @timed def _process_pref_mem( self, add_req: APIADDRequest, @@ -732,6 +736,7 @@ def add_before_search( self.logger.error(f"[add_before_search] LLM execution error: {e}") return memory_list + @timed def _process_text_mem( self, add_req: APIADDRequest,
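Note on the `@timed` decorators added to single_cube.py above: the patch applies `@timed` to `add_memories`, `search_memories`, `feedback_memories`, `_process_pref_mem`, and `_process_text_mem`, but the decorator itself is not defined in this diff, so it is presumably an existing timing utility elsewhere in the codebase. A minimal sketch of such a decorator, assuming it only measures and logs wall-clock duration (the logger name and message format here are illustrative, not the project's actual implementation):

import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    """Log the wall-clock duration of each call to the wrapped function.

    Hypothetical sketch; the real `timed` utility in the codebase may log
    differently or report to a metrics backend.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logger.info(f"[timed] {func.__qualname__} took {elapsed:.3f} seconds")

    return wrapper

Used as in the diff, this surfaces per-call latency for the cube-level add/search/feedback paths without touching the method bodies, mirroring the manual start/end timing already done in chat_handler.py.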