Merged

29 commits
dbac035
feat: skill memory
Jan 23, 2026
36f626a
feat: split task chunks for skill memories
endxxxx Jan 23, 2026
ec9316d
fix: refine the returned format from llm and parsing
endxxxx Jan 23, 2026
7702670
Merge pull request #4 from Wang-Daoji/feat/split_chunks
Wang-Daoji Jan 24, 2026
0d33b1d
feat: add new pack oss
Jan 24, 2026
3a88419
feat: skill mem pipeline
Jan 25, 2026
2903152
feat: fill code
Jan 26, 2026
bd119d6
feat: modify code
Jan 26, 2026
4173f7b
feat: modify code
Jan 26, 2026
bccba71
feat: async add skill memory
Jan 27, 2026
14f85e0
feat: update ollama version
Jan 27, 2026
b3c79ac
feat: get memory return skill memory
Jan 27, 2026
76f1975
feat: get api add skill mem
Jan 27, 2026
687cf9d
feat: get api add skill mem
Jan 27, 2026
8555b1d
feat: modify env config
Jan 27, 2026
ae67378
feat: back set oss client
Jan 27, 2026
793b508
feat: delete tmp skill code
Jan 27, 2026
1824f5b
feat: merge main
Jan 27, 2026
e3ef4cc
feat: process new package import error
Jan 27, 2026
f14fd58
Merge remote-tracking branch 'upstream/dev-20260126-v2.0.4' into feat…
Jan 27, 2026
6ba55d3
feat: modify oss config
Jan 27, 2026
85e42d9
feat: modify prompt and add two api
Jan 28, 2026
be17f3f
feat: merge dev-20260126-v2.0.4
Jan 28, 2026
962f804
feat: modify prompt
Jan 28, 2026
aeeb27e
feat: merge
Jan 28, 2026
bbb6e79
feat: modify code
Jan 28, 2026
a32ec5f
Merge remote-tracking branch 'upstream/dev-20260126-v2.0.4' into feat…
Jan 28, 2026
dcfb772
feat: add logger
Jan 29, 2026
bc40783
Merge remote-tracking branch 'upstream/dev-20260126-v2.0.4' into feat…
Jan 29, 2026
95 changes: 62 additions & 33 deletions src/memos/mem_reader/read_skill_memory/process_skill_memory.py
@@ -41,15 +41,24 @@ def add_id_to_mysql(memory_id: str, mem_cube_id: str):
     skill_mysql_bearer = os.getenv("SKILLS_MYSQL_BEARER", "")
 
     if not skill_mysql_url or not skill_mysql_bearer:
-        logger.warning("SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set")
+        logger.warning("[PROCESS_SKILLS] SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set")
         return None
     headers = {"Authorization": skill_mysql_bearer, "Content-Type": "application/json"}
     data = {"memCubeId": mem_cube_id, "skillId": memory_id}
     try:
         response = requests.post(skill_mysql_url, headers=headers, json=data)
+
+        logger.info(f"[PROCESS_SKILLS] response: \n\n{response.json()}")
+        logger.info(f"[PROCESS_SKILLS] memory_id: \n\n{memory_id}")
+        logger.info(f"[PROCESS_SKILLS] mem_cube_id: \n\n{mem_cube_id}")
+        logger.info(f"[PROCESS_SKILLS] skill_mysql_url: \n\n{skill_mysql_url}")
+        logger.info(f"[PROCESS_SKILLS] skill_mysql_bearer: \n\n{skill_mysql_bearer}")
+        logger.info(f"[PROCESS_SKILLS] headers: \n\n{headers}")
+        logger.info(f"[PROCESS_SKILLS] data: \n\n{data}")
+
         return response.json()
     except Exception as e:
-        logger.warning(f"Error adding id to mysql: {e}")
+        logger.warning(f"[PROCESS_SKILLS] Error adding id to mysql: {e}")
         return None

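Aside: the hunk above registers a skill ID with a plain bearer-token POST. A minimal standalone sketch of that call pattern, assuming only what the diff shows (the env var names and the memCubeId/skillId payload); the example URL, token, and error handling are hypothetical:

import os

import requests

# Hypothetical values; the service reads these from its environment.
os.environ.setdefault("SKILLS_MYSQL_URL", "https://example.invalid/skills/register")
os.environ.setdefault("SKILLS_MYSQL_BEARER", "Bearer <token>")


def register_skill(memory_id: str, mem_cube_id: str) -> dict | None:
    url = os.getenv("SKILLS_MYSQL_URL", "")
    bearer = os.getenv("SKILLS_MYSQL_BEARER", "")
    if not url or not bearer:
        return None  # same guard as add_id_to_mysql
    headers = {"Authorization": bearer, "Content-Type": "application/json"}
    payload = {"memCubeId": mem_cube_id, "skillId": memory_id}
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError):
        return None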
@@ -90,7 +99,7 @@ def _reconstruct_messages_from_memory_items(memory_items: list[TextualMemoryItem
             reconstructed_messages.append({"role": role, "content": content})
             seen.add(message_key)
         except Exception as e:
-            logger.warning(f"Error reconstructing message: {e}")
+            logger.warning(f"[PROCESS_SKILLS] Error reconstructing message: {e}")
             continue
 
     return reconstructed_messages
@@ -121,9 +130,11 @@ def _split_task_chunk_by_llm(llm: BaseLLM, messages: MessageList) -> dict[str, M
             response_json = json.loads(response_text.replace("```json", "").replace("```", ""))
             break
         except Exception as e:
-            logger.warning(f"LLM generate failed (attempt {attempt + 1}): {e}")
+            logger.warning(f"[PROCESS_SKILLS] LLM generate failed (attempt {attempt + 1}): {e}")
             if attempt == 2:
-                logger.warning("LLM generate failed after 3 retries, returning empty dict")
+                logger.warning(
+                    "[PROCESS_SKILLS] LLM generate failed after 3 retries, returning empty dict"
+                )
                 response_json = []
                 break

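The split step above leans on a small retry-and-parse loop: up to three attempts, markdown fences stripped before json.loads, and an empty result on final failure. A standalone sketch of that pattern, with a stub standing in for BaseLLM.generate (the stubbed reply is hypothetical):

import json
from collections.abc import Callable


def generate_json(generate: Callable[[str], str], prompt: str, retries: int = 3):
    """Call an LLM, parse its JSON reply, and retry on malformed output."""
    for attempt in range(retries):
        try:
            text = generate(prompt)
            # Models often wrap JSON in ```json fences; strip them before parsing.
            return json.loads(text.replace("```json", "").replace("```", ""))
        except Exception:
            if attempt == retries - 1:
                return []  # final fallback, mirroring the pipeline's empty result
    return []


reply = '```json\n[{"task": "demo", "message_indices": [0, 2]}]\n```'
print(generate_json(lambda _: reply, "split the conversation into tasks"))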
@@ -135,7 +146,7 @@ def _split_task_chunk_by_llm(llm: BaseLLM, messages: MessageList) -> dict[str, M
         # Validate that indices is a list/tuple with exactly 2 elements
         if not isinstance(indices, list | tuple) or len(indices) != 2:
             logger.warning(
-                f"Invalid message indices format for task '{task_name}': {indices}, skipping"
+                f"[PROCESS_SKILLS] Invalid message indices format for task '{task_name}': {indices}, skipping"
             )
             continue
         start, end = indices
@@ -196,21 +207,27 @@ def _extract_skill_memory_by_llm(
 
             # If LLM returns null (parsed as None), log and return None
             if skill_memory is None:
-                logger.info("No skill memory extracted from conversation (LLM returned null)")
+                logger.info(
+                    "[PROCESS_SKILLS] No skill memory extracted from conversation (LLM returned null)"
+                )
                 return None
 
             return skill_memory
 
         except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode failed (attempt {attempt + 1}): {e}")
-            logger.debug(f"Response text: {response_text}")
+            logger.warning(f"[PROCESS_SKILLS] JSON decode failed (attempt {attempt + 1}): {e}")
+            logger.debug(f"[PROCESS_SKILLS] Response text: {response_text}")
             if attempt == 2:
-                logger.warning("Failed to parse skill memory after 3 retries")
+                logger.warning("[PROCESS_SKILLS] Failed to parse skill memory after 3 retries")
                 return None
         except Exception as e:
-            logger.warning(f"LLM skill memory extraction failed (attempt {attempt + 1}): {e}")
+            logger.warning(
+                f"[PROCESS_SKILLS] LLM skill memory extraction failed (attempt {attempt + 1}): {e}"
+            )
             if attempt == 2:
-                logger.warning("LLM skill memory extraction failed after 3 retries")
+                logger.warning(
+                    "[PROCESS_SKILLS] LLM skill memory extraction failed after 3 retries"
+                )
                 return None
 
     return None
@@ -262,13 +279,15 @@ def _rewrite_query(task_type: str, messages: MessageList, llm: BaseLLM, rewrite_
             response_text = llm.generate(prompt)
             # Clean up response (remove any markdown formatting if present)
             response_text = response_text.strip()
-            logger.info(f"Rewritten query for task '{task_type}': {response_text}")
+            logger.info(f"[PROCESS_SKILLS] Rewritten query for task '{task_type}': {response_text}")
             return response_text
         except Exception as e:
-            logger.warning(f"LLM query rewrite failed (attempt {attempt + 1}): {e}")
+            logger.warning(
+                f"[PROCESS_SKILLS] LLM query rewrite failed (attempt {attempt + 1}): {e}"
+            )
             if attempt == 2:
                 logger.warning(
-                    "LLM query rewrite failed after 3 retries, returning first message content"
+                    "[PROCESS_SKILLS] LLM query rewrite failed after 3 retries, returning first message content"
                 )
                 return messages[0]["content"] if messages else ""

@@ -292,7 +311,7 @@ def _upload_skills_to_oss(local_file_path: str, oss_file_path: str, client: Any)
     )
 
     if result.status_code != 200:
-        logger.warning("Failed to upload skill to OSS")
+        logger.warning("[PROCESS_SKILLS] Failed to upload skill to OSS")
         return ""
 
     # Construct and return the URL
@@ -440,7 +459,7 @@ def _write_skills_to_file(
             arcname = Path(skill_dir.name) / file_path.relative_to(skill_dir)
             zipf.write(str(file_path), str(arcname))
 
-    logger.info(f"Created skill zip file: {zip_path}")
+    logger.info(f"[PROCESS_SKILLS] Created skill zip file: {zip_path}")
     return str(zip_path)


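The zip step in the hunk above is plain standard-library work: walk the skill directory, keep the directory name as the archive root, and write one entry per file. A self-contained sketch of the same idea (paths are hypothetical):

import zipfile
from pathlib import Path


def zip_skill_dir(skill_dir: Path, zip_path: Path) -> Path:
    """Pack every file under skill_dir into zip_path, rooted at the dir name."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for file_path in skill_dir.rglob("*"):
            if file_path.is_file():
                arcname = Path(skill_dir.name) / file_path.relative_to(skill_dir)
                zipf.write(str(file_path), str(arcname))
    return zip_path


# Usage, assuming the directory exists (hypothetical paths):
# zip_skill_dir(Path("/tmp/skills/my_skill"), Path("/tmp/skills/my_skill.zip"))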
@@ -509,33 +528,35 @@ def process_skill_memory_fine(
 ) -> list[TextualMemoryItem]:
     # Validate required configurations
     if not oss_config:
-        logger.warning("OSS configuration is required for skill memory processing")
+        logger.warning("[PROCESS_SKILLS] OSS configuration is required for skill memory processing")
         return []
 
     if not skills_dir_config:
-        logger.warning("Skills directory configuration is required for skill memory processing")
+        logger.warning(
+            "[PROCESS_SKILLS] Skills directory configuration is required for skill memory processing"
+        )
         return []
 
     # Validate skills_dir has required keys
     required_keys = ["skills_local_dir", "skills_oss_dir"]
     missing_keys = [key for key in required_keys if key not in skills_dir_config]
     if missing_keys:
         logger.warning(
-            f"Skills directory configuration missing required keys: {', '.join(missing_keys)}"
+            f"[PROCESS_SKILLS] Skills directory configuration missing required keys: {', '.join(missing_keys)}"
         )
         return []
 
     oss_client = create_oss_client(oss_config)
     if not oss_client:
-        logger.warning("Failed to create OSS client")
+        logger.warning("[PROCESS_SKILLS] Failed to create OSS client")
         return []
 
     messages = _reconstruct_messages_from_memory_items(fast_memory_items)
     messages = _add_index_to_message(messages)
 
     task_chunks = _split_task_chunk_by_llm(llm, messages)
     if not task_chunks:
-        logger.warning("No task chunks found")
+        logger.warning("[PROCESS_SKILLS] No task chunks found")
         return []
 
     # recall - get related skill memories for each task separately (parallel)
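The guards at the top of this hunk imply a minimal shape for skills_dir_config: a mapping with a local staging directory and an OSS prefix. An illustrative example with hypothetical values; only the two required keys are checked:

# Hypothetical configuration; only the two keys below are validated.
skills_dir_config = {
    "skills_local_dir": "/tmp/memos/skills",  # where zip files are staged locally
    "skills_oss_dir": "skills/prod",          # key prefix inside the OSS bucket
}

required_keys = ["skills_local_dir", "skills_oss_dir"]
missing_keys = [key for key in required_keys if key not in skills_dir_config]
assert not missing_keys, f"missing required keys: {missing_keys}"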
@@ -560,7 +581,9 @@
                 related_memories = future.result()
                 related_skill_memories_by_task[task_name] = related_memories
             except Exception as e:
-                logger.warning(f"Error recalling skill memories for task '{task_name}': {e}")
+                logger.warning(
+                    f"[PROCESS_SKILLS] Error recalling skill memories for task '{task_name}': {e}"
+                )
                 related_skill_memories_by_task[task_name] = []
 
     skill_memories = []
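The recall fan-out above uses the standard futures-keyed-by-name pattern: submit one job per task, map each future back to its task name, and fall back to an empty list when a job fails. A minimal runnable sketch with a stub recall function (the task names are hypothetical):

from concurrent.futures import ThreadPoolExecutor, as_completed


def recall(task_name: str) -> list[str]:
    # Stand-in for the real skill-memory recall.
    return [f"memory-for-{task_name}"]


task_chunks = {"book_flight": [], "summarize_report": []}
results: dict[str, list[str]] = {}
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(recall, name): name for name in task_chunks}
    for future in as_completed(futures):
        name = futures[future]
        try:
            results[name] = future.result()
        except Exception:
            results[name] = []  # same fallback as the pipeline
print(results)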
@@ -580,7 +603,7 @@
                 if skill_memory:  # Only add non-None results
                     skill_memories.append(skill_memory)
             except Exception as e:
-                logger.warning(f"Error extracting skill memory: {e}")
+                logger.warning(f"[PROCESS_SKILLS] Error extracting skill memory: {e}")
                 continue
 
     # write skills to file and get zip paths
@@ -598,7 +621,7 @@
                 skill_memory = futures[future]
                 skill_memory_with_paths.append((skill_memory, zip_path))
             except Exception as e:
-                logger.warning(f"Error writing skills to file: {e}")
+                logger.warning(f"[PROCESS_SKILLS] Error writing skills to file: {e}")
                 continue
 
     # Create a mapping from old_memory_id to old memory for easy lookup
@@ -630,14 +653,20 @@
                             Path(skills_dir_config["skills_oss_dir"]) / user_id / zip_filename
                         ).as_posix()
                         _delete_skills_from_oss(old_oss_path, oss_client)
-                        logger.info(f"Deleted old skill from OSS: {old_oss_path}")
+                        logger.info(
+                            f"[PROCESS_SKILLS] Deleted old skill from OSS: {old_oss_path}"
+                        )
                     except Exception as e:
-                        logger.warning(f"Failed to delete old skill from OSS: {e}")
+                        logger.warning(
+                            f"[PROCESS_SKILLS] Failed to delete old skill from OSS: {e}"
+                        )
 
                 # delete old skill from graph db
                 if graph_db:
                     graph_db.delete_node_by_prams(memory_ids=[old_memory_id])
-                    logger.info(f"Deleted old skill from graph db: {old_memory_id}")
+                    logger.info(
+                        f"[PROCESS_SKILLS] Deleted old skill from graph db: {old_memory_id}"
+                    )
 
                 # Upload new skill to OSS
                 # Use the same filename as the local zip file
@@ -654,9 +683,9 @@
                     # Set URL directly to skill_memory
                     skill_memory["url"] = url
 
-                    logger.info(f"Uploaded skill to OSS: {url}")
+                    logger.info(f"[PROCESS_SKILLS] Uploaded skill to OSS: {url}")
                 except Exception as e:
-                    logger.warning(f"Error uploading skill to OSS: {e}")
+                    logger.warning(f"[PROCESS_SKILLS] Error uploading skill to OSS: {e}")
                     skill_memory["url"] = ""  # Set to empty string if upload fails
                 finally:
                     # Clean up local files after upload
@@ -669,9 +698,9 @@
                         # Delete skill directory
                         if skill_dir.exists():
                             shutil.rmtree(skill_dir)
-                        logger.info(f"Cleaned up local files: {zip_path} and {skill_dir}")
+                        logger.info(f"[PROCESS_SKILLS] Cleaned up local files: {zip_path} and {skill_dir}")
                     except Exception as cleanup_error:
-                        logger.warning(f"Error cleaning up local files: {cleanup_error}")
+                        logger.warning(f"[PROCESS_SKILLS] Error cleaning up local files: {cleanup_error}")
 
     # Create TextualMemoryItem objects
     skill_memory_items = []
@@ -680,7 +709,7 @@
             memory_item = create_skill_memory_item(skill_memory, info, embedder)
             skill_memory_items.append(memory_item)
         except Exception as e:
-            logger.warning(f"Error creating skill memory item: {e}")
+            logger.warning(f"[PROCESS_SKILLS] Error creating skill memory item: {e}")
             continue
 
     # TODO: deprecate this funtion and call