diff --git a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py index bfa9ea2b1..bb809e69d 100644 --- a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py +++ b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py @@ -41,15 +41,24 @@ def add_id_to_mysql(memory_id: str, mem_cube_id: str): skill_mysql_bearer = os.getenv("SKILLS_MYSQL_BEARER", "") if not skill_mysql_url or not skill_mysql_bearer: - logger.warning("SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set") + logger.warning("[PROCESS_SKILLS] SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set") return None headers = {"Authorization": skill_mysql_bearer, "Content-Type": "application/json"} data = {"memCubeId": mem_cube_id, "skillId": memory_id} try: response = requests.post(skill_mysql_url, headers=headers, json=data) + + logger.info(f"[PROCESS_SKILLS] response: \n\n{response.json()}") + logger.info(f"[PROCESS_SKILLS] memory_id: \n\n{memory_id}") + logger.info(f"[PROCESS_SKILLS] mem_cube_id: \n\n{mem_cube_id}") + logger.info(f"[PROCESS_SKILLS] skill_mysql_url: \n\n{skill_mysql_url}") + logger.info(f"[PROCESS_SKILLS] skill_mysql_bearer: \n\n{skill_mysql_bearer}") + logger.info(f"[PROCESS_SKILLS] headers: \n\n{headers}") + logger.info(f"[PROCESS_SKILLS] data: \n\n{data}") + return response.json() except Exception as e: - logger.warning(f"Error adding id to mysql: {e}") + logger.warning(f"[PROCESS_SKILLS] Error adding id to mysql: {e}") return None @@ -90,7 +99,7 @@ def _reconstruct_messages_from_memory_items(memory_items: list[TextualMemoryItem reconstructed_messages.append({"role": role, "content": content}) seen.add(message_key) except Exception as e: - logger.warning(f"Error reconstructing message: {e}") + logger.warning(f"[PROCESS_SKILLS] Error reconstructing message: {e}") continue return reconstructed_messages @@ -121,9 +130,11 @@ def _split_task_chunk_by_llm(llm: BaseLLM, messages: MessageList) -> dict[str, M response_json = json.loads(response_text.replace("```json", "").replace("```", "")) break except Exception as e: - logger.warning(f"LLM generate failed (attempt {attempt + 1}): {e}") + logger.warning(f"[PROCESS_SKILLS] LLM generate failed (attempt {attempt + 1}): {e}") if attempt == 2: - logger.warning("LLM generate failed after 3 retries, returning empty dict") + logger.warning( + "[PROCESS_SKILLS] LLM generate failed after 3 retries, returning empty dict" + ) response_json = [] break @@ -135,7 +146,7 @@ def _split_task_chunk_by_llm(llm: BaseLLM, messages: MessageList) -> dict[str, M # Validate that indices is a list/tuple with exactly 2 elements if not isinstance(indices, list | tuple) or len(indices) != 2: logger.warning( - f"Invalid message indices format for task '{task_name}': {indices}, skipping" + f"[PROCESS_SKILLS] Invalid message indices format for task '{task_name}': {indices}, skipping" ) continue start, end = indices @@ -196,21 +207,27 @@ def _extract_skill_memory_by_llm( # If LLM returns null (parsed as None), log and return None if skill_memory is None: - logger.info("No skill memory extracted from conversation (LLM returned null)") + logger.info( + "[PROCESS_SKILLS] No skill memory extracted from conversation (LLM returned null)" + ) return None return skill_memory except json.JSONDecodeError as e: - logger.warning(f"JSON decode failed (attempt {attempt + 1}): {e}") - logger.debug(f"Response text: {response_text}") + logger.warning(f"[PROCESS_SKILLS] JSON decode failed (attempt {attempt + 1}): {e}") + logger.debug(f"[PROCESS_SKILLS] Response text: {response_text}") if attempt == 2: - logger.warning("Failed to parse skill memory after 3 retries") + logger.warning("[PROCESS_SKILLS] Failed to parse skill memory after 3 retries") return None except Exception as e: - logger.warning(f"LLM skill memory extraction failed (attempt {attempt + 1}): {e}") + logger.warning( + f"[PROCESS_SKILLS] LLM skill memory extraction failed (attempt {attempt + 1}): {e}" + ) if attempt == 2: - logger.warning("LLM skill memory extraction failed after 3 retries") + logger.warning( + "[PROCESS_SKILLS] LLM skill memory extraction failed after 3 retries" + ) return None return None @@ -262,13 +279,15 @@ def _rewrite_query(task_type: str, messages: MessageList, llm: BaseLLM, rewrite_ response_text = llm.generate(prompt) # Clean up response (remove any markdown formatting if present) response_text = response_text.strip() - logger.info(f"Rewritten query for task '{task_type}': {response_text}") + logger.info(f"[PROCESS_SKILLS] Rewritten query for task '{task_type}': {response_text}") return response_text except Exception as e: - logger.warning(f"LLM query rewrite failed (attempt {attempt + 1}): {e}") + logger.warning( + f"[PROCESS_SKILLS] LLM query rewrite failed (attempt {attempt + 1}): {e}" + ) if attempt == 2: logger.warning( - "LLM query rewrite failed after 3 retries, returning first message content" + "[PROCESS_SKILLS] LLM query rewrite failed after 3 retries, returning first message content" ) return messages[0]["content"] if messages else "" @@ -292,7 +311,7 @@ def _upload_skills_to_oss(local_file_path: str, oss_file_path: str, client: Any) ) if result.status_code != 200: - logger.warning("Failed to upload skill to OSS") + logger.warning("[PROCESS_SKILLS] Failed to upload skill to OSS") return "" # Construct and return the URL @@ -440,7 +459,7 @@ def _write_skills_to_file( arcname = Path(skill_dir.name) / file_path.relative_to(skill_dir) zipf.write(str(file_path), str(arcname)) - logger.info(f"Created skill zip file: {zip_path}") + logger.info(f"[PROCESS_SKILLS] Created skill zip file: {zip_path}") return str(zip_path) @@ -509,11 +528,13 @@ def process_skill_memory_fine( ) -> list[TextualMemoryItem]: # Validate required configurations if not oss_config: - logger.warning("OSS configuration is required for skill memory processing") + logger.warning("[PROCESS_SKILLS] OSS configuration is required for skill memory processing") return [] if not skills_dir_config: - logger.warning("Skills directory configuration is required for skill memory processing") + logger.warning( + "[PROCESS_SKILLS] Skills directory configuration is required for skill memory processing" + ) return [] # Validate skills_dir has required keys @@ -521,13 +542,13 @@ def process_skill_memory_fine( missing_keys = [key for key in required_keys if key not in skills_dir_config] if missing_keys: logger.warning( - f"Skills directory configuration missing required keys: {', '.join(missing_keys)}" + f"[PROCESS_SKILLS] Skills directory configuration missing required keys: {', '.join(missing_keys)}" ) return [] oss_client = create_oss_client(oss_config) if not oss_client: - logger.warning("Failed to create OSS client") + logger.warning("[PROCESS_SKILLS] Failed to create OSS client") return [] messages = _reconstruct_messages_from_memory_items(fast_memory_items) @@ -535,7 +556,7 @@ def process_skill_memory_fine( task_chunks = _split_task_chunk_by_llm(llm, messages) if not task_chunks: - logger.warning("No task chunks found") + logger.warning("[PROCESS_SKILLS] No task chunks found") return [] # recall - get related skill memories for each task separately (parallel) @@ -560,7 +581,9 @@ def process_skill_memory_fine( related_memories = future.result() related_skill_memories_by_task[task_name] = related_memories except Exception as e: - logger.warning(f"Error recalling skill memories for task '{task_name}': {e}") + logger.warning( + f"[PROCESS_SKILLS] Error recalling skill memories for task '{task_name}': {e}" + ) related_skill_memories_by_task[task_name] = [] skill_memories = [] @@ -580,7 +603,7 @@ def process_skill_memory_fine( if skill_memory: # Only add non-None results skill_memories.append(skill_memory) except Exception as e: - logger.warning(f"Error extracting skill memory: {e}") + logger.warning(f"[PROCESS_SKILLS] Error extracting skill memory: {e}") continue # write skills to file and get zip paths @@ -598,7 +621,7 @@ def process_skill_memory_fine( skill_memory = futures[future] skill_memory_with_paths.append((skill_memory, zip_path)) except Exception as e: - logger.warning(f"Error writing skills to file: {e}") + logger.warning(f"[PROCESS_SKILLS] Error writing skills to file: {e}") continue # Create a mapping from old_memory_id to old memory for easy lookup @@ -630,14 +653,20 @@ def process_skill_memory_fine( Path(skills_dir_config["skills_oss_dir"]) / user_id / zip_filename ).as_posix() _delete_skills_from_oss(old_oss_path, oss_client) - logger.info(f"Deleted old skill from OSS: {old_oss_path}") + logger.info( + f"[PROCESS_SKILLS] Deleted old skill from OSS: {old_oss_path}" + ) except Exception as e: - logger.warning(f"Failed to delete old skill from OSS: {e}") + logger.warning( + f"[PROCESS_SKILLS] Failed to delete old skill from OSS: {e}" + ) # delete old skill from graph db if graph_db: graph_db.delete_node_by_prams(memory_ids=[old_memory_id]) - logger.info(f"Deleted old skill from graph db: {old_memory_id}") + logger.info( + f"[PROCESS_SKILLS] Deleted old skill from graph db: {old_memory_id}" + ) # Upload new skill to OSS # Use the same filename as the local zip file @@ -654,9 +683,9 @@ def process_skill_memory_fine( # Set URL directly to skill_memory skill_memory["url"] = url - logger.info(f"Uploaded skill to OSS: {url}") + logger.info(f"[PROCESS_SKILLS] Uploaded skill to OSS: {url}") except Exception as e: - logger.warning(f"Error uploading skill to OSS: {e}") + logger.warning(f"[PROCESS_SKILLS] Error uploading skill to OSS: {e}") skill_memory["url"] = "" # Set to empty string if upload fails finally: # Clean up local files after upload @@ -669,9 +698,9 @@ def process_skill_memory_fine( # Delete skill directory if skill_dir.exists(): shutil.rmtree(skill_dir) - logger.info(f"Cleaned up local files: {zip_path} and {skill_dir}") + logger.info(f"[PROCESS_SKILLS] Cleaned up local files: {zip_path} and {skill_dir}") except Exception as cleanup_error: - logger.warning(f"Error cleaning up local files: {cleanup_error}") + logger.warning(f"[PROCESS_SKILLS] Error cleaning up local files: {cleanup_error}") # Create TextualMemoryItem objects skill_memory_items = [] @@ -680,7 +709,7 @@ def process_skill_memory_fine( memory_item = create_skill_memory_item(skill_memory, info, embedder) skill_memory_items.append(memory_item) except Exception as e: - logger.warning(f"Error creating skill memory item: {e}") + logger.warning(f"[PROCESS_SKILLS] Error creating skill memory item: {e}") continue # TODO: deprecate this funtion and call