From f100c1dd93d304ea01fcbe3284d6c83a5bbfcc42 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Wed, 10 Dec 2025 10:25:44 +0800 Subject: [PATCH 01/15] feat: timer false --- src/memos/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/utils.py b/src/memos/utils.py index a29eaf99d..cb50aab8d 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -80,7 +80,7 @@ def wrapper(*args, **kwargs): return decorator(func) -def timed(func=None, *, log=True, log_prefix=""): +def timed(func=None, *, log=False, log_prefix=""): def decorator(fn): def wrapper(*args, **kwargs): start = time.perf_counter() From 51a782b7db318a6862ab1012f49a715ed99bf048 Mon Sep 17 00:00:00 2001 From: Dubberman <48425266+whipser030@users.noreply.github.com> Date: Thu, 8 Jan 2026 16:25:02 +0800 Subject: [PATCH 02/15] fix: knowledge base adopt raw text (#836) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * update reader and search strategy * set strategy reader and search config * fix install problem * fix * fix test * turn off graph recall * turn off graph recall * turn off graph recall * fix Searcher input bug * fix Searcher * fix Search * fix bug * adjust strategy reader * adjust strategy reader * adjust search config input * reformat code * re pr * format repair * fix time issue * develop feedback process * feedback handler configuration * upgrade feedback using * add threshold * update prompt * update prompt * fix handler * add feedback scheduler * add handler change node update * add handler change node update * add handler change node update * add handler change node update * fix interface input * add chunk and ratio filter * update stopwords * fix messages queue * add seach_by_keywords_LIKE * add doc filter * add retrieve query * add retrieve queies * patch info filter * add log and make embedding safety net * add log and make embedding safety net * deduplicate add objects * use _add_memories_parallel * delete Special characters * delete Special characters * delete Special characters * delete Special characters * add source_doc_id * add source_doc_id * add reranker in init com.. 
* fix circle import * add feedback judgement * add feedback judgement * add pref feedback * add pref feedback * patch: get_memory func filter user id and make page chunk * add total num * add total num * add milvus pagination * fix merge implicit explicit pref * fix merge implicit explicit pref * fix merge implicit explicit pref * fix json load bug * knowledge raw_text replace memory * knowledge raw_text replace memory * knowledge raw_text replace memory --------- Co-authored-by: 黑布林 <11641432+heiheiyouyou@user.noreply.gitee.com> Co-authored-by: CaralHsi Co-authored-by: chunyu li <78344051+fridayL@users.noreply.github.com> --- src/memos/api/handlers/formatters_handler.py | 104 ++++++++++++++++++- src/memos/api/handlers/memory_handler.py | 2 +- src/memos/api/handlers/search_handler.py | 15 +++ src/memos/reranker/concat.py | 12 ++- src/memos/reranker/http_bge.py | 16 +-- 5 files changed, 138 insertions(+), 11 deletions(-) diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 94988295b..ca87d95d2 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -7,9 +7,13 @@ from typing import Any +from memos.log import get_logger from memos.templates.instruction_completion import instruct_completion +logger = get_logger(__name__) + + def to_iter(running: Any) -> list[Any]: """ Normalize running tasks to a list of task objects. @@ -29,7 +33,9 @@ def to_iter(running: Any) -> list[Any]: return list(running) if running else [] -def format_memory_item(memory_data: Any, include_embedding: bool = False) -> dict[str, Any]: +def format_memory_item( + memory_data: Any, include_embedding: bool = False, save_sources: bool = True +) -> dict[str, Any]: """ Format a single memory item for API response. @@ -49,7 +55,8 @@ def format_memory_item(memory_data: Any, include_embedding: bool = False) -> dic memory["ref_id"] = ref_id if not include_embedding: memory["metadata"]["embedding"] = [] - memory["metadata"]["sources"] = [] + if not save_sources: + memory["metadata"]["sources"] = [] memory["metadata"]["usage"] = [] memory["metadata"]["ref_id"] = ref_id memory["metadata"]["id"] = memory_id @@ -125,3 +132,96 @@ def post_process_textual_mem( } ) return memories_result + + +def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]): + """ + Separate knowledge and conversation memories from retrieval results. + """ + knowledge_mem = [] + conversation_mem = [] + for item in memories: + sources = item["metadata"]["sources"] + if ( + len(sources) > 0 + and "type" in sources[0] + and sources[0]["type"] == "file" + and "content" in sources[0] + and sources[0]["content"] != "" + ): # TODO change to memory_type + knowledge_mem.append(item) + else: + conversation_mem.append(item) + + logger.info( + f"Retrieval results number of knowledge_mem: {len(knowledge_mem)}, conversation_mem: {len(conversation_mem)}" + ) + return knowledge_mem, conversation_mem + + +def rerank_knowledge_mem( + reranker: Any, + query: str, + text_mem: list[dict[str, Any]], + top_k: int, + file_mem_proportion: float = 0.5, +) -> list[dict[str, Any]]: + """ + Rerank knowledge memories and keep conversation memories. 
+ """ + memid2cubeid = {} + memories_list = [] + for memory_group in text_mem: + cube_id = memory_group["cube_id"] + memories = memory_group["memories"] + memories_list.extend(memories) + for memory in memories: + memid2cubeid[memory["id"]] = cube_id + + knowledge_mem, conversation_mem = separate_knowledge_and_conversation_mem(memories_list) + knowledge_mem_top_k = max(int(top_k * file_mem_proportion), int(top_k - len(conversation_mem))) + reranked_knowledge_mem = reranker.rerank(query, knowledge_mem, top_k=len(knowledge_mem)) + reranked_knowledge_mem = [item[0] for item in reranked_knowledge_mem] + + # TODO revoke sources replace memory value + for item in reranked_knowledge_mem: + item["memory"] = item["metadata"]["sources"][0]["content"] + item["metadata"]["sources"] = [] + + for item in conversation_mem: + item["metadata"]["sources"] = [] + + # deduplicate: remove items with duplicate memory content + original_count = len(reranked_knowledge_mem) + seen_memories = set[Any]() + deduplicated_knowledge_mem = [] + for item in reranked_knowledge_mem: + memory_content = item.get("memory", "") + if memory_content and memory_content not in seen_memories: + seen_memories.add(memory_content) + deduplicated_knowledge_mem.append(item) + deduplicated_count = len(deduplicated_knowledge_mem) + logger.info( + f"After filtering duplicate knowledge base text from sources, count changed from {original_count} to {deduplicated_count}" + ) + + reranked_knowledge_mem = deduplicated_knowledge_mem[:knowledge_mem_top_k] + conversation_mem_top_k = top_k - len(reranked_knowledge_mem) + cubeid2memories = {} + text_mem_res = [] + + for memory in reranked_knowledge_mem + conversation_mem[:conversation_mem_top_k]: + cube_id = memid2cubeid[memory["id"]] + if cube_id not in cubeid2memories: + cubeid2memories[cube_id] = [] + cubeid2memories[cube_id].append(memory) + + for cube_id, memories in cubeid2memories.items(): + text_mem_res.append( + { + "cube_id": cube_id, + "memories": memories, + } + ) + + return text_mem_res diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py index ef829d757..14bb8eec5 100644 --- a/src/memos/api/handlers/memory_handler.py +++ b/src/memos/api/handlers/memory_handler.py @@ -204,7 +204,7 @@ def handle_get_memories( preferences, total_pref = naive_mem_cube.pref_mem.get_memory_by_filter( filter_params, page=get_mem_req.page, page_size=get_mem_req.page_size ) - format_preferences = [format_memory_item(item) for item in preferences] + format_preferences = [format_memory_item(item, save_sources=False) for item in preferences] return GetMemoryResponse( message="Memories retrieved successfully", diff --git a/src/memos/api/handlers/search_handler.py b/src/memos/api/handlers/search_handler.py index 3774410dc..32a970b22 100644 --- a/src/memos/api/handlers/search_handler.py +++ b/src/memos/api/handlers/search_handler.py @@ -5,9 +5,12 @@ using dependency injection for better modularity and testability. 
""" +import time + from typing import Any from memos.api.handlers.base_handler import BaseHandler, HandlerDependencies +from memos.api.handlers.formatters_handler import rerank_knowledge_mem from memos.api.product_models import APISearchRequest, SearchResponse from memos.log import get_logger from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import ( @@ -69,6 +72,18 @@ def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse # Restore original top_k for downstream logic or response metadata search_req.top_k = original_top_k + start_time = time.time() + text_mem = results["text_mem"] + results["text_mem"] = rerank_knowledge_mem( + self.reranker, + query=search_req.query, + text_mem=text_mem, + top_k=original_top_k, + file_mem_proportion=0.5, + ) + rerank_time = time.time() - start_time + self.logger.info(f"[Knowledge_replace_memory_time] Rerank time: {rerank_time} seconds") + self.logger.info( f"[SearchHandler] Final search results: count={len(results)} results={results}" ) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index 502af18b6..b39496a1c 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -83,10 +83,18 @@ def concat_original_source( merge_field = ["sources"] if rerank_source is None else rerank_source.split(",") documents = [] for item in graph_results: - memory = _TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m + m = item.get("memory") if isinstance(item, dict) else getattr(item, "memory", None) + + memory = _TAG1.sub("", m) if isinstance(m, str) else m + sources = [] for field in merge_field: - source = getattr(item.metadata, field, None) + if isinstance(item, dict): + metadata = item.get("metadata", {}) + source = metadata.get(field) if isinstance(metadata, dict) else None + else: + source = getattr(item.metadata, field, None) if hasattr(item, "metadata") else None + if source is None: continue sources.append((memory, source)) diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 4e9054f1e..32034cf6d 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -129,7 +129,7 @@ def __init__( def rerank( self, query: str, - graph_results: list[TextualMemoryItem], + graph_results: list[TextualMemoryItem] | list[dict[str, Any]], top_k: int, search_priority: dict | None = None, **kwargs, @@ -164,11 +164,15 @@ def rerank( if self.rerank_source: documents = concat_original_source(graph_results, self.rerank_source) else: - documents = [ - (_TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m) - for item in graph_results - ] - documents = [d for d in documents if isinstance(d, str) and d] + documents = [] + filtered_graph_results = [] + for item in graph_results: + m = item.get("memory") if isinstance(item, dict) else getattr(item, "memory", None) + + if isinstance(m, str) and m: + documents.append(_TAG1.sub("", m)) + filtered_graph_results.append(item) + graph_results = filtered_graph_results logger.info(f"[HTTPBGERerankerSample] query: {query} , documents: {documents[:5]}...") From 7ffdb523e2fa6634903667c18120c495eb467bf9 Mon Sep 17 00:00:00 2001 From: Wang Daoji <75928131+Wang-Daoji@users.noreply.github.com> Date: Thu, 8 Jan 2026 17:46:24 +0800 Subject: [PATCH 03/15] Feat/optimize cloud service api (#839) * add get_user_names_by_memory_ids api * modify delete api * modify bug * add extract limit in implicit memory * close internet search in chat api, modify implicit pref prompt * 
modify bug
* add a new internal method for check cube id exist
* modify code
* add get memory by memory id api

---------

Co-authored-by: yuan.wang
---
 src/memos/api/handlers/memory_handler.py | 43 ++++++++++++++++++++++++
 src/memos/api/routers/server_router.py   |  8 +++++
 2 files changed, 51 insertions(+)

diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py
index 14bb8eec5..7110fae09 100644
--- a/src/memos/api/handlers/memory_handler.py
+++ b/src/memos/api/handlers/memory_handler.py
@@ -176,6 +176,49 @@ def handle_get_subgraph(
         raise
 
 
+def handle_get_memory(memory_id: str, naive_mem_cube: NaiveMemCube) -> GetMemoryResponse:
+    """
+    Handler for getting a single memory by its ID.
+
+    Tries to retrieve from text memory first, then preference memory if not found.
+
+    Args:
+        memory_id: The ID of the memory to retrieve
+        naive_mem_cube: Memory cube instance
+
+    Returns:
+        GetMemoryResponse with the memory data
+    """
+
+    try:
+        memory = naive_mem_cube.text_mem.get(memory_id)
+    except Exception:
+        memory = None
+
+    # If not found in text memory, try preference memory
+    pref = None
+    if memory is None and naive_mem_cube.pref_mem is not None:
+        collection_names = ["explicit_preference", "implicit_preference"]
+        for collection_name in collection_names:
+            try:
+                pref = naive_mem_cube.pref_mem.get_with_collection_name(collection_name, memory_id)
+                if pref is not None:
+                    break
+            except Exception:
+                continue
+
+    # Get the data from whichever memory source succeeded
+    data = (memory or pref).model_dump() if (memory or pref) else None
+
+    return GetMemoryResponse(
+        message="Memory retrieved successfully"
+        if data
+        else f"Memory with ID {memory_id} not found",
+        code=200,
+        data=data,
+    )
+
+
 def handle_get_memories(
     get_mem_req: GetMemoryRequest, naive_mem_cube: NaiveMemCube
 ) -> GetMemoryResponse:
diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index a4052d313..8371c41b9 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -314,6 +314,14 @@ def get_memories(memory_req: GetMemoryRequest):
     )
 
 
+@router.get("/get_memory/{memory_id}", summary="Get memory by id", response_model=GetMemoryResponse)
+def get_memory_by_id(memory_id: str):
+    return handlers.memory_handler.handle_get_memory(
+        memory_id=memory_id,
+        naive_mem_cube=naive_mem_cube,
+    )
+
+
 @router.post(
     "/delete_memory", summary="Delete memories for user", response_model=DeleteMemoryResponse
 )

From 5f811d4ca5737cdb197fddc06743cfe79da36861 Mon Sep 17 00:00:00 2001
From: Wang Daoji <75928131+Wang-Daoji@users.noreply.github.com>
Date: Thu, 8 Jan 2026 18:42:02 +0800
Subject: [PATCH 04/15] Feat/fix playground bug (#841)

* fix playground bug, internet search judge
* fix playground internet bug
* modify delete mem
* modify tool resp bug in multi cube
* fix bug in playground chat handle and search inter
* modify prompt
* fix bug in playground
* fix playground bug
* fix bug
* fix code
* fix model bug in playground
* modify plan b
* llm param modify
* add logger in playground
* modify code
* fix bug
* modify code
* modify code
* fix bug
* fix search bug in playground
* fix bug
* move scheduler to back
* modify pref location
* modify fast net search
* add tags and new package
* modify prompt fix bug
* remove nltk due to image problem
* prompt modify
* modify bug remove redundant field
* modify bug
* fix playground bug
* fix bug
* boost internet topk
* boost to 50
* fix bug cite
* modify search
* remote query add in playground
* modify bug
* modify pref bug
* move add position
* modify chat prompt
* modify overthinking
* add logger in playground chat
* modify mem
* remove must in prompt
* add logger
* add logger
* remove dedup in playground

---------

Co-authored-by: yuan.wang
Co-authored-by: chunyu li <78344051+fridayL@users.noreply.github.com>
Co-authored-by: CaralHsi
---
 src/memos/api/handlers/memory_handler.py | 13 -------------
 src/memos/api/routers/server_router.py   |  2 --
 2 files changed, 15 deletions(-)

diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py
index 7110fae09..a4f500560 100644
--- a/src/memos/api/handlers/memory_handler.py
+++ b/src/memos/api/handlers/memory_handler.py
@@ -23,10 +23,6 @@
     remove_embedding_recursive,
     sort_children_by_memory_type,
 )
-from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import (
-    cosine_similarity_matrix,
-    find_best_unrelated_subgroup,
-)
 
 
 if TYPE_CHECKING:
@@ -41,7 +37,6 @@ def handle_get_all_memories(
     mem_cube_id: str,
     memory_type: Literal["text_mem", "act_mem", "param_mem", "para_mem"],
     naive_mem_cube: Any,
-    embedder: Any,
 ) -> MemoryResponse:
     """
     Main handler for getting all memories.
@@ -64,14 +59,6 @@ def handle_get_all_memories(
             # Get all text memories from the graph database
             memories = naive_mem_cube.text_mem.get_all(user_name=mem_cube_id)
 
-            mems = [mem.get("memory", "") for mem in memories.get("nodes", [])]
-            embeddings = embedder.embed(mems)
-            similarity_matrix = cosine_similarity_matrix(embeddings)
-            selected_indices, _ = find_best_unrelated_subgroup(
-                embeddings, similarity_matrix, bar=0.9
-            )
-            memories["nodes"] = [memories["nodes"][i] for i in selected_indices]
-
             # Format and convert to tree structure
             memories_cleaned = remove_embedding_recursive(memories)
             custom_type_ratios = {
diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index 8371c41b9..86b75d73e 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -88,7 +88,6 @@
 naive_mem_cube = components["naive_mem_cube"]
 redis_client = components["redis_client"]
 status_tracker = TaskStatusTracker(redis_client=redis_client)
-embedder = components["embedder"]
 graph_db = components["graph_db"]
 vector_db = components["vector_db"]
 
@@ -302,7 +301,6 @@ def get_all_memories(memory_req: GetMemoryPlaygroundRequest):
         ),
         memory_type=memory_req.memory_type or "text_mem",
         naive_mem_cube=naive_mem_cube,
-        embedder=embedder,
     )
 

From 8b30a4414848b8b91ad857113db43984eefd8cb7 Mon Sep 17 00:00:00 2001
From: Travis Tang
Date: Fri, 9 Jan 2026 10:43:18 +0800
Subject: [PATCH 05/15] refactor&fix: fix a range of bugs in scheduler and
 revise fine add apis (#840)

* fix bugs: try to fix bugs in _submit_web_logs
* fix bugs: try to address bugs
* fix bugs
* refactor: modify examples
* revise add operation and fix an unbelievable bug
* address the bug issues
* the doc file has a format problem which has been fixed in this commit
* add a range of new feats for the add operation
* address the incompatibility issue of the local scheduler
* feat(scheduler): optimize redis queue consumer group management

- Proactively ensure consumer groups exist in '_refresh_stream_keys' for
  newly discovered streams.
- Remove redundant consumer group checks in '_read_new_messages_batch' to
  improve read performance.
- Clean up 'seen_streams' cache when streams are deleted to ensure correct
  group recreation.
- This change reduces unnecessary Redis calls during high-frequency polling.
* fix(tests): resolve AttributeError in SimpleStructMemReader tests

- Import 'parse_json_result' from 'memos.mem_reader.utils' instead of
  accessing it as an instance attribute.
- Fixes 'AttributeError: 'SimpleStructMemReader' object has no attribute
  'parse_json_result'' in 'test_parse_json_result_success' and
  'test_parse_json_result_failure'.
- Remove incorrect mock assignment of 'parse_json_result' in
  'test_process_chat_data'.

* fix(mem_reader): pass info dict to add_before_search for correct user_id usage

- Update 'add_before_search' signature in 'SimpleStructMemReader' to accept
  'info' dict.
- Pass 'info' (containing 'user_id' and 'session_id') to 'self.searcher.search'
  instead of using empty strings.
- Add 'test_add_before_search' to 'TestSimpleStructMemReader' to verify the fix
  and ensure 'searcher.search' receives the correct 'info'.
- This ensures that memory searches are scoped to the correct user and session.

* refactor add_before_search from mem_reader to SingleCubeView
* address bugs
* fix: fix the qsize bug of the task queue, and accept change from hotfix/scheduler
* fix: address some issues to run the old scheduler example and the kv cache example
* fix: address the issue of top-level import of the unavailable module 'torch'
* fix: resolve linting errors and make optional dependencies lazy loaded

- Fix ambiguous characters and commented-out code in examples/mem_scheduler/quick_start_examples.py
- Fix nested if statements in src/memos/mem_os/core.py
- Move torch and transformers imports to method scope in src/memos/llms/hf.py to support optional dependencies
- Update tests/llms/test_hf.py to patch transformers module directly

* refactor: revise the rewrite prompt to make it better
* refactor: update examples
* refactor: update examples for scheduler
* fix bugs: address the unsupported xautoclaim command when the redis version is older than 6.2.0 by adding a manual auto-claim feature that combines xpending + xclaim
* refactor: review settings
* refactor: adjust examples to make them run better for code debugging
* refactor: review slow add apis to get better performance on Halumen
* fix bugs: address the issue where the status_tracker is still used when use_redis_queue is set to false
* refactor: allow the code to run without rabbitmq
* refactor: create a _parse_pending_entry for the redis queue
* refactor: add a try/catch for status_tracker

---
 .gitignore                                    |   1 +
 docker/.env.example                           |   1 +
 .../mem_scheduler/quick_start_examples.py     |   8 +-
 .../scheduler_for_async_tasks.py              |   2 +-
 src/memos/mem_reader/simple_struct.py         |   4 +-
 src/memos/mem_scheduler/base_scheduler.py     |   9 +-
 .../task_schedule_modules/dispatcher.py       |  31 --
 .../task_schedule_modules/redis_queue.py      | 276 +++++++++++++++---
 .../webservice_modules/rabbitmq_service.py    |  42 ++-
 src/memos/templates/mem_reader_prompts.py     |  79 ++---
 10 files changed, 313 insertions(+), 140 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8319a4d2f..ac31eb41a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -204,6 +204,7 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
+.trae

 # VSCode
 .vscode*
diff --git a/docker/.env.example b/docker/.env.example
index ee26c7bcd..3674cd69b 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -123,6 +123,7 @@ API_SCHEDULER_ON=true
 API_SEARCH_WINDOW_SIZE=5
 # Specify how many rounds of previous conversations (history) to retrieve and consider during the 'hybrid search' (fast search+asynchronous fine search). This helps provide context aware search results
 API_SEARCH_HISTORY_TURNS=5
+MEMSCHEDULER_USE_REDIS_QUEUE=false

 ## Graph / vector stores
 # Neo4j database selection mode
diff --git a/examples/mem_scheduler/quick_start_examples.py b/examples/mem_scheduler/quick_start_examples.py
index fbfef4d76..724663be6 100644
--- a/examples/mem_scheduler/quick_start_examples.py
+++ b/examples/mem_scheduler/quick_start_examples.py
@@ -146,7 +146,9 @@ def kv_cache_only():

 def run_scheduler_example():
     # Load the main MOS (Memory-Oriented System) config file that uses MemScheduler
-    config = parse_yaml("./examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml")
+    config = parse_yaml(
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
+    )
     # Pass the parsed config dict to the MOSConfig constructor to build the config object
     mos_config = MOSConfig(**config)
     # Initialize the MOS system instance with the config object
@@ -159,12 +161,12 @@ def run_scheduler_example():

     # Load the general MemCube config from the YAML file
     config = GeneralMemCubeConfig.from_yaml_file(
-        "./examples/data/config/mem_scheduler/mem_cube_config.yaml"
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config.yaml"
     )
     # Define the unique identifier for the MemCube
     mem_cube_id = "mem_cube_5"
     # Define the local storage path for the MemCube (the path contains the user ID and MemCube ID)
-    mem_cube_name_or_path = f"./outputs/mem_scheduler/{user_id}/{mem_cube_id}"
+    mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"

     # If the path already exists, delete the old directory first
     if Path(mem_cube_name_or_path).exists():
diff --git a/examples/mem_scheduler/scheduler_for_async_tasks.py b/examples/mem_scheduler/scheduler_for_async_tasks.py
index a767b57c4..7f544c3da 100644
--- a/examples/mem_scheduler/scheduler_for_async_tasks.py
+++ b/examples/mem_scheduler/scheduler_for_async_tasks.py
@@ -57,7 +57,7 @@ def submit_tasks():
 TEST_HANDLER_LABEL = "test_handler"
 mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

-# 10s to restart
+# 5s to restart
 mem_scheduler.orchestrator.tasks_min_idle_ms[TEST_HANDLER_LABEL] = 5_000

 tmp_dir = Path("./tmp")
diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py
index 61a7d2b6d..fa72bd063 100644
--- a/src/memos/mem_reader/simple_struct.py
+++ b/src/memos/mem_reader/simple_struct.py
@@ -614,11 +614,9 @@ def _read_memory(
                 serialized_origin_memories = json.dumps(
                     [one.memory for one in original_memory_group], indent=2
                 )
-                revised_memory_list = self.rewrite_memories(
+                revised_memory_list = self.filter_hallucination_in_memories(
                     messages=combined_messages,
                     memory_list=original_memory_group,
-                    user_only=os.getenv("SIMPLE_STRUCT_REWRITE_USER_ONLY", "true").lower()
-                    == "false",
                 )
                 serialized_revised_memories = json.dumps(
                     [one.memory for one in revised_memory_list], indent=2
diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py
index 3f5c90b67..4c9310cbb 100644
--- a/src/memos/mem_scheduler/base_scheduler.py
+++ b/src/memos/mem_scheduler/base_scheduler.py
@@ -225,7 +225,7 @@ def initialize_modules(
         process_llm = chat_llm

         try:
-            if redis_client:
+            if redis_client and self.use_redis_queue:
                 self.status_tracker = TaskStatusTracker(redis_client)
                 if self.dispatcher:
                     self.dispatcher.status_tracker = self.status_tracker
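The hunk above gates the status tracker behind both a live Redis client and the `use_redis_queue` flag. A minimal sketch of that guard pattern, using hypothetical stand-ins rather than the real MemOS classes:

```python
class TaskStatusTracker:
    """Hypothetical stand-in for the Redis-backed tracker."""

    def __init__(self, redis_client):
        self.redis = redis_client


class Scheduler:
    """Hypothetical stand-in showing only the guarded initialization."""

    def __init__(self, use_redis_queue: bool, dispatcher=None):
        self.use_redis_queue = use_redis_queue
        self.dispatcher = dispatcher
        self.status_tracker = None

    def initialize_modules(self, redis_client):
        # Mirror of the patched condition: the tracker is only created when a
        # Redis client exists AND the Redis-backed queue is actually enabled.
        if redis_client and self.use_redis_queue:
            self.status_tracker = TaskStatusTracker(redis_client)
            if self.dispatcher:
                self.dispatcher.status_tracker = self.status_tracker
```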
@@ -305,7 +305,7 @@ def status_tracker(self) -> TaskStatusTracker | None: available via RedisSchedulerModule. This mirrors the lazy pattern used by `mem_cube` so downstream modules can safely access the tracker. """ - if self._status_tracker is None: + if self._status_tracker is None and self.use_redis_queue: try: self._status_tracker = TaskStatusTracker(self.redis) # Propagate to submodules when created lazily @@ -314,7 +314,8 @@ def status_tracker(self) -> TaskStatusTracker | None: if self.memos_message_queue: self.memos_message_queue.set_status_tracker(self._status_tracker) except Exception as e: - logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True) + logger.warning(f"Failed to lazy-initialize status_tracker: {e}", exc_info=True) + return self._status_tracker @status_tracker.setter @@ -869,6 +870,8 @@ def _submit_web_logs( messages = [messages] # transform single message to list for message in messages: + if self.rabbitmq_config is None: + return try: # Always call publish; the publisher now caches when offline and flushes after reconnect logger.info( diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index cdd491183..2099da5a1 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -108,8 +108,6 @@ def __init__( ) self.metrics = metrics - self._status_tracker: TaskStatusTracker | None = None - # Use setter to allow propagation and keep a single source of truth self.status_tracker = status_tracker self.submit_web_logs = submit_web_logs # ADDED @@ -118,35 +116,6 @@ def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None: return # This is handled in BaseScheduler now - @property - def status_tracker(self) -> TaskStatusTracker | None: - """Lazy-initialized status tracker for the dispatcher. - - If the tracker is None, attempt to initialize from the Redis-backed - components available to the dispatcher (queue or orchestrator). - """ - if self._status_tracker is None: - try: - self._status_tracker = TaskStatusTracker(self.redis) - # Propagate to submodules when created lazily - if self.memos_message_queue: - self.memos_message_queue.set_status_tracker(self._status_tracker) - except Exception as e: - logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True) - return self._status_tracker - - @status_tracker.setter - def status_tracker(self, value: TaskStatusTracker | None) -> None: - self._status_tracker = value - # Propagate to the queue if possible - try: - if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"): - self.memos_message_queue.status_tracker = value - except Exception as e: - logger.warning( - f"Failed to propagate dispatcher status_tracker to queue: {e}", exc_info=True - ) - def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem): """ Create a wrapper around the handler to track task execution and capture results. 
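The redis_queue.py diff below implements the manual auto-claim described in the commit message: on Redis servers older than 6.2, which lack XAUTOCLAIM, stale pending entries are found with XPENDING and re-owned with XCLAIM. A hedged sketch of the same combination using redis-py; the stream, group, and consumer names are illustrative, not the scheduler's real keys:

```python
import redis

r = redis.Redis(decode_responses=True)


def manual_autoclaim(stream: str, group: str, consumer: str,
                     min_idle_ms: int, count: int) -> list:
    """Approximate XAUTOCLAIM for Redis servers older than 6.2."""
    # 1. List pending entries; redis-py returns dicts with 'message_id'
    #    and 'time_since_delivered' (idle time in milliseconds).
    pending = r.xpending_range(stream, group, min="-", max="+", count=count * 3)

    # 2. Keep only entries that have been idle longer than the threshold.
    stale_ids = [p["message_id"] for p in pending
                 if p["time_since_delivered"] >= min_idle_ms][:count]
    if not stale_ids:
        return []

    # 3. Re-own the stale messages for this consumer, as XAUTOCLAIM would.
    return r.xclaim(stream, group, consumer, min_idle_ms, stale_ids)


# Example usage with illustrative names:
# manual_autoclaim("memos:stream:u1:cube1:add", "scheduler", "worker-1", 5_000, 10)
```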
diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index 557a45466..1c9683542 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -81,6 +81,7 @@ def __init__( # Consumer state self._is_listening = False self._message_handler: Callable[[ScheduleMessageItem], None] | None = None + self.supports_xautoclaim = False # Connection state self._is_connected = False @@ -105,6 +106,7 @@ def __init__( # Auto-initialize Redis connection if self.auto_initialize_redis(): self._is_connected = True + self._check_xautoclaim_support() self.seen_streams = set() @@ -143,6 +145,33 @@ def __init__( logger.debug(f"Initial stream keys refresh failed: {e}") self._start_stream_keys_refresh_thread() + def _check_xautoclaim_support(self): + """Check if the Redis server supports xautoclaim (v6.2+).""" + if not self._redis_conn: + return + + try: + info = self._redis_conn.info("server") + version_str = info.get("redis_version", "0.0.0") + # Simple version parsing + parts = [int(p) for p in version_str.split(".") if p.isdigit()] + while len(parts) < 3: + parts.append(0) + + major, minor, _ = parts[:3] + if major > 6 or (major == 6 and minor >= 2): + self.supports_xautoclaim = True + else: + self.supports_xautoclaim = False + + logger.info( + f"[REDIS_QUEUE] Redis version {version_str}. " + f"Supports xautoclaim: {self.supports_xautoclaim}" + ) + except Exception as e: + logger.warning(f"Failed to check Redis version: {e}") + self.supports_xautoclaim = False + def get_stream_key(self, user_id: str, mem_cube_id: str, task_label: str) -> str: stream_key = f"{self.stream_key_prefix}:{user_id}:{mem_cube_id}:{task_label}" return stream_key @@ -623,41 +652,67 @@ def _compute_pending_need( need_pending = max(0, batch_size - new_count) return need_pending if need_pending > 0 else 0 + def _parse_pending_entry(self, entry) -> tuple[str, int]: + """Extract message_id and idle_time from a pending entry (dict, tuple, or object).""" + if isinstance(entry, dict): + return entry.get("message_id"), entry.get("time_since_delivered") + elif isinstance(entry, tuple | list): + return entry[0], entry[2] + else: + # Assume object (redis-py 5.x+ PendingMessage) + return getattr(entry, "message_id", None), getattr(entry, "time_since_delivered", 0) + + def _manual_xautoclaim( + self, stream_key: str, min_idle_time: int, count: int + ) -> tuple[str, list[tuple[str, dict]], list[str]]: + """ + Simulate xautoclaim using xpending and xclaim for compatibility with older Redis versions. + """ + # 1. Get pending entries (fetch slightly more to increase chance of finding idle ones) + fetch_count = count * 3 + pending_entries = self._redis_conn.xpending_range( + stream_key, self.consumer_group, "-", "+", fetch_count + ) + + if not pending_entries: + return "0-0", [], [] + + claim_ids = [] + for entry in pending_entries: + # entry structure depends on redis-py version/decoding + # Assuming list of dicts: {'message_id': '...', 'time_since_delivered': ms, ...} + # or list of tuples + msg_id, idle_time = self._parse_pending_entry(entry) + + if idle_time >= min_idle_time: + claim_ids.append(msg_id) + if len(claim_ids) >= count: + break + + if not claim_ids: + return "0-0", [], [] + + # 2. 
Claim messages + claimed_messages = self._redis_conn.xclaim( + stream_key, self.consumer_group, self.consumer_name, min_idle_time, claim_ids + ) + + return "0-0", claimed_messages, [] + def _claim_pending_messages( self, stream_key: str, need_pending_count: int, task_label: str ) -> list[tuple[str, list[tuple[str, dict]]]]: """Claim pending messages exceeding idle threshold, with group existence handling.""" - try: - claimed_result = self._redis_conn.xautoclaim( - name=stream_key, - groupname=self.consumer_group, - consumername=self.consumer_name, - min_idle_time=self.orchestrator.get_task_idle_min(task_label=task_label), - start_id="0-0", - count=need_pending_count, - justid=False, - ) - if len(claimed_result) == 2: - next_id, claimed = claimed_result - deleted_ids = [] - elif len(claimed_result) == 3: - next_id, claimed, deleted_ids = claimed_result - else: - raise ValueError(f"Unexpected xautoclaim response length: {len(claimed_result)}") + min_idle = self.orchestrator.get_task_idle_min(task_label=task_label) - return [(stream_key, claimed)] if claimed else [] - except Exception as read_err: - err_msg = str(read_err).lower() - if "nogroup" in err_msg or "no such key" in err_msg: - logger.warning( - f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (xautoclaim)." - ) - self._ensure_consumer_group(stream_key=stream_key) + # Use native xautoclaim if supported (Redis 6.2+) + if self.supports_xautoclaim: + try: claimed_result = self._redis_conn.xautoclaim( name=stream_key, groupname=self.consumer_group, consumername=self.consumer_name, - min_idle_time=self.orchestrator.get_task_idle_min(task_label=task_label), + min_idle_time=min_idle, start_id="0-0", count=need_pending_count, justid=False, @@ -670,25 +725,64 @@ def _claim_pending_messages( else: raise ValueError( f"Unexpected xautoclaim response length: {len(claimed_result)}" - ) from read_err + ) return [(stream_key, claimed)] if claimed else [] - return [] - - def _batch_claim_pending_messages( - self, claims_spec: list[tuple[str, int, str]] - ) -> list[tuple[str, list[tuple[str, dict]]]]: - """Batch-claim pending messages across multiple streams. + except Exception as read_err: + err_msg = str(read_err).lower() + if "nogroup" in err_msg or "no such key" in err_msg: + logger.warning( + f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (xautoclaim)." + ) + self._ensure_consumer_group(stream_key=stream_key) + claimed_result = self._redis_conn.xautoclaim( + name=stream_key, + groupname=self.consumer_group, + consumername=self.consumer_name, + min_idle_time=min_idle, + start_id="0-0", + count=need_pending_count, + justid=False, + ) + if len(claimed_result) == 2: + next_id, claimed = claimed_result + deleted_ids = [] + elif len(claimed_result) == 3: + next_id, claimed, deleted_ids = claimed_result + else: + raise ValueError( + f"Unexpected xautoclaim response length: {len(claimed_result)}" + ) from read_err - Args: - claims_spec: List of tuples (stream_key, need_pending_count, task_label) + return [(stream_key, claimed)] if claimed else [] + return [] - Returns: - A list of (stream_key, claimed_entries) pairs for all successful claims. 
- """ - if not self._redis_conn or not claims_spec: + # Fallback to manual xautoclaim for older Redis versions + try: + _next, claimed, _deleted = self._manual_xautoclaim( + stream_key, min_idle, need_pending_count + ) + return [(stream_key, claimed)] if claimed else [] + except Exception as read_err: + err_msg = str(read_err).lower() + if "nogroup" in err_msg or "no such key" in err_msg: + logger.warning( + f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (manual xautoclaim)." + ) + self._ensure_consumer_group(stream_key=stream_key) + try: + _next, claimed, _deleted = self._manual_xautoclaim( + stream_key, min_idle, need_pending_count + ) + return [(stream_key, claimed)] if claimed else [] + except Exception: + return [] return [] + def _batch_claim_native( + self, claims_spec: list[tuple[str, int, str]] + ) -> list[tuple[str, list[tuple[str, dict]]]]: + """Batch-claim pending messages using Redis xautoclaim pipeline (Redis 6.2+).""" pipe = self._redis_conn.pipeline(transaction=False) for stream_key, need_count, label in claims_spec: pipe.xautoclaim( @@ -702,14 +796,11 @@ def _batch_claim_pending_messages( ) try: - # Execute with raise_on_error=False so we get exceptions in the results list - # instead of aborting the whole batch. results = pipe.execute(raise_on_error=False) except Exception as e: logger.error(f"Pipeline execution critical failure: {e}") results = [e] * len(claims_spec) - # Handle individual failures (e.g. NOGROUP) by retrying just that stream final_results = [] for i, res in enumerate(results): if isinstance(res, Exception): @@ -736,12 +827,8 @@ def _batch_claim_pending_messages( else: final_results.append(res) - results = final_results - - claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = [] - for (stream_key, _need_count, _label), claimed_result in zip( - claims_spec, results, strict=False - ): + claimed_pairs = [] + for (stream_key, _, _), claimed_result in zip(claims_spec, final_results, strict=False): try: if not claimed_result: continue @@ -760,6 +847,98 @@ def _batch_claim_pending_messages( return claimed_pairs + def _batch_claim_manual( + self, claims_spec: list[tuple[str, int, str]] + ) -> list[tuple[str, list[tuple[str, dict]]]]: + """Batch-claim pending messages using 2-phase pipeline (Redis < 6.2).""" + # Phase 1: Fetch pending messages for all streams + pending_pipe = self._redis_conn.pipeline(transaction=False) + for stream_key, need_count, _label in claims_spec: + fetch_count = need_count * 3 + pending_pipe.xpending_range(stream_key, self.consumer_group, "-", "+", fetch_count) + + try: + pending_results = pending_pipe.execute(raise_on_error=False) + except Exception as e: + logger.error(f"Pending fetch pipeline failed: {e}") + return [] + + # Phase 2: Filter and prepare claim pipeline + claim_pipe = self._redis_conn.pipeline(transaction=False) + streams_to_claim_indices = [] + claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = [] + + for i, (stream_key, need_count, label) in enumerate(claims_spec): + pending_res = pending_results[i] + min_idle = self.orchestrator.get_task_idle_min(task_label=label) + + if isinstance(pending_res, Exception): + err_msg = str(pending_res).lower() + if "nogroup" in err_msg or "no such key" in err_msg: + try: + self._ensure_consumer_group(stream_key) + _next, claimed, _ = self._manual_xautoclaim( + stream_key, min_idle, need_count + ) + if claimed: + claimed_pairs.append((stream_key, claimed)) + except Exception as retry_err: + logger.warning(f"Retry 
manual claim failed for {stream_key}: {retry_err}") + continue + + if not pending_res: + continue + + claim_ids = [] + for entry in pending_res: + msg_id, idle_time = self._parse_pending_entry(entry) + if idle_time >= min_idle: + claim_ids.append(msg_id) + if len(claim_ids) >= need_count: + break + + if claim_ids: + claim_pipe.xclaim( + stream_key, + self.consumer_group, + self.consumer_name, + min_idle, + claim_ids, + ) + streams_to_claim_indices.append(i) + + if streams_to_claim_indices: + try: + claim_results = claim_pipe.execute(raise_on_error=False) + for idx_in_results, original_idx in enumerate(streams_to_claim_indices): + res = claim_results[idx_in_results] + stream_key = claims_spec[original_idx][0] + if isinstance(res, list) and res: + claimed_pairs.append((stream_key, res)) + except Exception as e: + logger.error(f"Claim pipeline failed: {e}") + + return claimed_pairs + + def _batch_claim_pending_messages( + self, claims_spec: list[tuple[str, int, str]] + ) -> list[tuple[str, list[tuple[str, dict]]]]: + """Batch-claim pending messages across multiple streams. + + Args: + claims_spec: List of tuples (stream_key, need_pending_count, task_label) + + Returns: + A list of (stream_key, claimed_entries) pairs for all successful claims. + """ + if not self._redis_conn or not claims_spec: + return [] + + if self.supports_xautoclaim: + return self._batch_claim_native(claims_spec) + + return self._batch_claim_manual(claims_spec) + def _convert_messages( self, messages: list[tuple[str, list[tuple[str, dict]]]] ) -> list[ScheduleMessageItem]: @@ -994,6 +1173,7 @@ def connect(self) -> None: # Test the connection self._redis_conn.ping() self._is_connected = True + self._check_xautoclaim_support() logger.debug("Redis connection established successfully") # Start stream keys refresher when connected self._start_stream_keys_refresh_thread() diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index 5a94d2af2..a07934b8e 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -30,6 +30,7 @@ def __init__(self): Initialize RabbitMQ connection settings. """ super().__init__() + self.auth_config = None # RabbitMQ settings self.rabbitmq_config: RabbitMQConfig | None = None @@ -99,22 +100,35 @@ def initialize_rabbitmq( ) return + if self.is_rabbitmq_connected(): + logger.warning("RabbitMQ is already connected. 
Skipping initialization.") + return + from pika.adapters.select_connection import SelectConnection - if config is None: - if config_path is None and AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_config() - elif Path(config_path).exists(): - auth_config = AuthConfig.from_local_config(config_path=config_path) + if config is not None: + if isinstance(config, RabbitMQConfig): + self.rabbitmq_config = config + elif isinstance(config, dict): + self.rabbitmq_config = AuthConfig.from_dict(config).rabbitmq else: - auth_config = AuthConfig.from_local_env() - self.rabbitmq_config = auth_config.rabbitmq - elif isinstance(config, RabbitMQConfig): - self.rabbitmq_config = config - elif isinstance(config, dict): - self.rabbitmq_config = AuthConfig.from_dict(config).rabbitmq + logger.error(f"Unsupported config type: {type(config)}") + return + else: - logger.error("Not implemented") + if config_path is not None and Path(config_path).exists(): + self.auth_config = AuthConfig.from_local_config(config_path=config_path) + elif AuthConfig.default_config_exists(): + self.auth_config = AuthConfig.from_local_config() + else: + self.auth_config = AuthConfig.from_local_env() + self.rabbitmq_config = self.auth_config.rabbitmq + + if self.rabbitmq_config is None: + logger.error( + "Failed to load RabbitMQ configuration. Please check your config file or environment variables." + ) + return # Load exchange configuration from config if self.rabbitmq_config: @@ -140,7 +154,7 @@ def initialize_rabbitmq( self.rabbitmq_exchange_type = env_exchange_type logger.info(f"Using env exchange type override: {self.rabbitmq_exchange_type}") - # Start connection process + # Start connection process parameters = self.get_rabbitmq_connection_param() self.rabbitmq_connection = SelectConnection( parameters, @@ -156,7 +170,7 @@ def initialize_rabbitmq( self._io_loop_thread.start() logger.info("RabbitMQ connection process started") except Exception: - logger.error("Fail to initialize auth_config", exc_info=True) + logger.error("Failed to initialize RabbitMQ connection", exc_info=True) finally: with self._rabbitmq_lock: self._rabbitmq_initializing = False diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 9432d6303..20f8150b7 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -796,43 +796,48 @@ """ SIMPLE_STRUCT_HALLUCINATION_FILTER_PROMPT = """ -You are a strict memory validator. -Your task is to identify and delete hallucinated memories that are not explicitly stated by the user in the provided messages. - -Rules: -1. **User-Only Origin**: Verify facts against USER messages ONLY. If the Assistant repeats a User fact, it is VALID. If the Assistant introduces a new detail (e.g., 'philanthropy') that the User did not explicitly confirm, it is INVALID. -2. **No Inference Allowed**: Do NOT keep memories based on implication, emotion, preference, or generalization. Only verbatim or direct restatements of user-provided facts are valid. However, minor formatting corrections (e.g., adding missing spaces between names, fixing obvious typos) are ALLOWED. -3. **Hallucination = Deletion**: If a memory contains any detail not directly expressed by the user, mark it for deletion. -4. **Timestamp Exception**: Memories may include timestamps (e.g., dates like "On December 19, 2026") derived from conversation metadata. 
If the date in the memory is likely the conversation time (even if not shown in the `messages` list), do NOT treat it as a hallucination or require a rewrite. - -Examples: -Messages: -- [user]: I love coding in Python. -- [assistant]: That's great! I assume you also contribute to open source projects? -Memory: User enjoys Python and contributes to open source. -Result: {{"keep": false, "reason": "User never stated they contribute to open source; this came from Assistant's assumption."}} - -Messages: -- [user]: I am tired. -- [assistant]: I hear you are tired. Rest is important. -Memory: User stated they are tired. -Result: {{"keep": true, "reason": "Direct restatement of user input, even if Assistant repeated it."}} - -Inputs: -messages: -{messages_inline} - -memories: -{memories_inline} - -Output Format: -- Return a JSON object with string keys ("0", "1", "2", ...) matching the input memory indices. -- Each value must be: {{ "keep": boolean, "reason": string }} -- "keep": true only if the memory is a direct reflection of the user's explicit words. -- "reason": brief, factual, and cites missing or unsupported content. - -Important: Output **only** the JSON. No extra text, explanations, markdown, or fields. -""" + You are a strict memory validator. + Your task is to identify and delete hallucinated memories that are not explicitly stated by the user in the provided messages. + + Rules: + 1. **Explicit Denial & Inconsistency**: If a memory claims something that the user explicitly denied or is clearly inconsistent with the user's statements, mark it for deletion. + 2. **Timestamp Exception**: Memories may include timestamps (e.g., dates like "On December 19, 2026") derived from conversation metadata. If the date in the memory is likely the conversation time (even if not shown in the `messages` list), do NOT treat it as a hallucination or require a rewrite. + + Example: + Messages: + [user]: I'm planning a trip to Japan next month for about a week. + [assistant]: That sounds great! Are you planning to visit Tokyo Disneyland? + [user]: No, I won't be going to Tokyo this time. I plan to stay in Kyoto and Osaka to avoid crowds. + + Memories: + {{ + "0": "User plans to travel to Japan for a week next month.", + "1": "User intends to visit Tokyo Disneyland.", + "2": "User plans to stay in Kyoto and Osaka." + }} + + Output: + {{ + "0": {{ "keep": true, "reason": "Explicitly stated by user." }}, + "1": {{ "keep": false, "reason": "User explicitly denied visiting Tokyo." }}, + "2": {{ "keep": true, "reason": "Explicitly stated by user." }} + }} + + Inputs: + Messages: + {messages_inline} + + Memories: + {memories_inline} + + Output Format: + - Return a JSON object with string keys ("0", "1", "2", ...) matching the input memory indices. + - Each value must be: {{ "keep": boolean, "reason": string }} + - "keep": true only if the memory is a direct reflection of the user's explicit words. + - "reason": brief, factual, and cites missing or unsupported content. + + Important: Output **only** the JSON. No extra text, explanations, markdown, or fields. 
+ """ SIMPLE_STRUCT_ADD_BEFORE_SEARCH_PROMPT = """ From cfd9105ae1ede522a47ae86420687a81547d413b Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Mon, 12 Jan 2026 15:47:59 +0800 Subject: [PATCH 06/15] feat: delete result --- src/memos/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/utils.py b/src/memos/utils.py index 594180e8f..bec3927d4 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -87,7 +87,6 @@ def wrapper(*args, **kwargs): msg = ( f"[TIMER_WITH_STATUS] {log_prefix or fn.__name__} " f"took {elapsed_ms:.0f} ms{status_info}, args: {ctx_str}" - f", result: {result}" ) logger.info(msg) From 1b853e14e91dd2cc1828cf3b4326544807b05af1 Mon Sep 17 00:00:00 2001 From: Dubberman <48425266+whipser030@users.noreply.github.com> Date: Tue, 13 Jan 2026 09:52:18 +0800 Subject: [PATCH 07/15] fix: unuse rerank (#855) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * update reader and search strategy * set strategy reader and search config * fix install problem * fix * fix test * turn off graph recall * turn off graph recall * turn off graph recall * fix Searcher input bug * fix Searcher * fix Search * fix bug * adjust strategy reader * adjust strategy reader * adjust search config input * reformat code * re pr * format repair * fix time issue * develop feedback process * feedback handler configuration * upgrade feedback using * add threshold * update prompt * update prompt * fix handler * add feedback scheduler * add handler change node update * add handler change node update * add handler change node update * add handler change node update * fix interface input * add chunk and ratio filter * update stopwords * fix messages queue * add seach_by_keywords_LIKE * add doc filter * add retrieve query * add retrieve queies * patch info filter * add log and make embedding safety net * add log and make embedding safety net * deduplicate add objects * use _add_memories_parallel * delete Special characters * delete Special characters * delete Special characters * delete Special characters * add source_doc_id * add source_doc_id * add reranker in init com.. 
* fix circle import
* add feedback judgement
* add feedback judgement
* add pref feedback
* add pref feedback
* patch: get_memory func filter user id and make page chunk
* add total num
* add total num
* add milvus pagination
* fix merge implicit explicit pref
* fix merge implicit explicit pref
* fix merge implicit explicit pref
* fix json load bug
* knowledge raw_text replace memory
* knowledge raw_text replace memory
* knowledge raw_text replace memory
* unuse rerank

---------

Co-authored-by: 黑布林 <11641432+heiheiyouyou@user.noreply.gitee.com>
Co-authored-by: CaralHsi
Co-authored-by: chunyu li <78344051+fridayL@users.noreply.github.com>
---
 src/memos/api/handlers/formatters_handler.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py
index ca87d95d2..11a1ef71b 100644
--- a/src/memos/api/handlers/formatters_handler.py
+++ b/src/memos/api/handlers/formatters_handler.py
@@ -180,8 +180,8 @@ def rerank_knowledge_mem(

     knowledge_mem, conversation_mem = separate_knowledge_and_conversation_mem(memories_list)
     knowledge_mem_top_k = max(int(top_k * file_mem_proportion), int(top_k - len(conversation_mem)))
-    reranked_knowledge_mem = reranker.rerank(query, knowledge_mem, top_k=len(knowledge_mem))
-    reranked_knowledge_mem = [item[0] for item in reranked_knowledge_mem]
+    # reranking disabled for now; pass the knowledge memories through unchanged
+    reranked_knowledge_mem = knowledge_mem

     # TODO revoke sources replace memory value
     for item in reranked_knowledge_mem:

From 893a593b02c4a4f395ebf0bb6d2a064770c1c7d0 Mon Sep 17 00:00:00 2001
From: Dubberman <48425266+whipser030@users.noreply.github.com>
Date: Tue, 13 Jan 2026 14:01:25 +0800
Subject: [PATCH 08/15] fix: backtrack knowledge retrieval (#857)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* update reader and search strategy
* set strategy reader and search config
* fix install problem
* fix
* fix test
* turn off graph recall
* turn off graph recall
* turn off graph recall
* fix Searcher input bug
* fix Searcher
* fix Search
* fix bug
* adjust strategy reader
* adjust strategy reader
* adjust search config input
* reformat code
* re pr
* format repair
* fix time issue
* develop feedback process
* feedback handler configuration
* upgrade feedback using
* add threshold
* update prompt
* update prompt
* fix handler
* add feedback scheduler
* add handler change node update
* add handler change node update
* add handler change node update
* add handler change node update
* fix interface input
* add chunk and ratio filter
* update stopwords
* fix messages queue
* add seach_by_keywords_LIKE
* add doc filter
* add retrieve query
* add retrieve queies
* patch info filter
* add log and make embedding safety net
* add log and make embedding safety net
* deduplicate add objects
* use _add_memories_parallel
* delete Special characters
* delete Special characters
* delete Special characters
* delete Special characters
* add source_doc_id
* add source_doc_id
* add reranker in init com..
* fix circle import * add feedback judgement * add feedback judgement * add pref feedback * add pref feedback * patch: get_memory func filter user id and make page chunk * add total num * add total num * add milvus pagination * fix merge implicit explicit pref * fix merge implicit explicit pref * fix merge implicit explicit pref * fix json load bug * knowledge raw_text replace memory * knowledge raw_text replace memory * knowledge raw_text replace memory * unuse rerank * backtrack knowledge retrieval --------- Co-authored-by: 黑布林 <11641432+heiheiyouyou@user.noreply.gitee.com> Co-authored-by: CaralHsi Co-authored-by: chunyu li <78344051+fridayL@users.noreply.github.com> --- src/memos/api/handlers/formatters_handler.py | 2 +- src/memos/api/handlers/search_handler.py | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 11a1ef71b..0d77f46d9 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -34,7 +34,7 @@ def to_iter(running: Any) -> list[Any]: def format_memory_item( - memory_data: Any, include_embedding: bool = False, save_sources: bool = True + memory_data: Any, include_embedding: bool = False, save_sources: bool = False ) -> dict[str, Any]: """ Format a single memory item for API response. diff --git a/src/memos/api/handlers/search_handler.py b/src/memos/api/handlers/search_handler.py index 32a970b22..3774410dc 100644 --- a/src/memos/api/handlers/search_handler.py +++ b/src/memos/api/handlers/search_handler.py @@ -5,12 +5,9 @@ using dependency injection for better modularity and testability. """ -import time - from typing import Any from memos.api.handlers.base_handler import BaseHandler, HandlerDependencies -from memos.api.handlers.formatters_handler import rerank_knowledge_mem from memos.api.product_models import APISearchRequest, SearchResponse from memos.log import get_logger from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import ( @@ -72,18 +69,6 @@ def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse # Restore original top_k for downstream logic or response metadata search_req.top_k = original_top_k - start_time = time.time() - text_mem = results["text_mem"] - results["text_mem"] = rerank_knowledge_mem( - self.reranker, - query=search_req.query, - text_mem=text_mem, - top_k=original_top_k, - file_mem_proportion=0.5, - ) - rerank_time = time.time() - start_time - self.logger.info(f"[Knowledge_replace_memory_time] Rerank time: {rerank_time} seconds") - self.logger.info( f"[SearchHandler] Final search results: count={len(results)} results={results}" ) From c83107b00500e0448f1dc7533203db2231c46175 Mon Sep 17 00:00:00 2001 From: Hao <120852460@qq.com> Date: Tue, 13 Jan 2026 16:55:14 +0800 Subject: [PATCH 09/15] docs: fix server start cmd --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index f19b97cc1..bf89fe1e4 100644 --- a/README.md +++ b/README.md @@ -192,6 +192,7 @@ print(f"result: {res}") - Launch via the uvicorn command line interface (CLI) ###### Tips: Please ensure that Neo4j and Qdrant are running before executing the following command. ```bash + cd src uvicorn memos.api.server_api:app --host 0.0.0.0 --port 8001 --workers 1 ``` ##### For detailed integration steps, see the [`CLI Reference`](https://docs.openmem.net/open_source/getting_started/rest_api_server/#method-3client-install-with-CLI). 
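The README tip above assumes Neo4j and Qdrant are already running. A small preflight sketch that checks both services are reachable before launching uvicorn; the ports are the usual defaults (Neo4j Bolt on 7687, Qdrant on 6333) and may differ in your deployment:

```python
import socket


def reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


for name, port in [("Neo4j", 7687), ("Qdrant", 6333)]:
    status = "ok" if reachable("localhost", port) else "NOT reachable"
    print(f"{name} on :{port} -> {status}")
```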
From 68c9f5abf555ab2d4de16392361b5f439f6a9ff5 Mon Sep 17 00:00:00 2001 From: "yuan.wang" Date: Tue, 13 Jan 2026 17:18:16 +0800 Subject: [PATCH 10/15] squashed commit --- src/memos/mem_reader/multi_modal_struct.py | 48 +++-- .../read_multi_modal/system_parser.py | 166 ++++++++++++++++-- src/memos/templates/tool_mem_prompts.py | 121 +++++++++---- 3 files changed, 273 insertions(+), 62 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 2ed1af53e..3bf6d4927 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -1,5 +1,6 @@ import concurrent.futures import json +import re import traceback from typing import Any @@ -547,7 +548,11 @@ def _process_tool_trajectory_fine( for fast_item in fast_memory_items: # Extract memory text (string content) mem_str = fast_item.memory or "" - if not mem_str.strip() or "tool:" not in mem_str: + if not mem_str.strip() or ( + "tool:" not in mem_str + and "[tool_calls]:" not in mem_str + and not re.search(r".*?", mem_str, re.DOTALL) + ): continue try: resp = self._get_llm_tool_trajectory_response(mem_str) @@ -563,6 +568,8 @@ def _process_tool_trajectory_fine( value=m.get("trajectory", ""), info=info, memory_type=memory_type, + correctness=m.get("correctness", ""), + experience=m.get("experience", ""), tool_used_status=m.get("tool_used_status", []), ) fine_memory_items.append(node) @@ -606,16 +613,22 @@ def _process_multi_modal_data( if mode == "fast": return fast_memory_items else: - # Part A: call llm + # Part A: call llm in parallel using thread pool fine_memory_items = [] - fine_memory_items_string_parser = self._process_string_fine( - fast_memory_items, info, custom_tags - ) - fine_memory_items.extend(fine_memory_items_string_parser) - fine_memory_items_tool_trajectory_parser = self._process_tool_trajectory_fine( - fast_memory_items, info - ) + with ContextThreadPoolExecutor(max_workers=2) as executor: + future_string = executor.submit( + self._process_string_fine, fast_memory_items, info, custom_tags + ) + future_tool = executor.submit( + self._process_tool_trajectory_fine, fast_memory_items, info + ) + + # Collect results + fine_memory_items_string_parser = future_string.result() + fine_memory_items_tool_trajectory_parser = future_tool.result() + + fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) # Part B: get fine multimodal items @@ -658,13 +671,18 @@ def _process_transfer_multi_modal_data( } fine_memory_items = [] - # Part A: call llm - fine_memory_items_string_parser = self._process_string_fine([raw_node], info, custom_tags) - fine_memory_items.extend(fine_memory_items_string_parser) + # Part A: call llm in parallel using thread pool + with ContextThreadPoolExecutor(max_workers=2) as executor: + future_string = executor.submit( + self._process_string_fine, [raw_node], info, custom_tags + ) + future_tool = executor.submit(self._process_tool_trajectory_fine, [raw_node], info) - fine_memory_items_tool_trajectory_parser = self._process_tool_trajectory_fine( - [raw_node], info - ) + # Collect results + fine_memory_items_string_parser = future_string.result() + fine_memory_items_tool_trajectory_parser = future_tool.result() + + fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) # Part B: get fine multimodal items diff --git a/src/memos/mem_reader/read_multi_modal/system_parser.py 
b/src/memos/mem_reader/read_multi_modal/system_parser.py index deb2a9832..49264ce2c 100644 --- a/src/memos/mem_reader/read_multi_modal/system_parser.py +++ b/src/memos/mem_reader/read_multi_modal/system_parser.py @@ -42,9 +42,10 @@ def create_source( info: dict[str, Any], ) -> SourceMessage: """Create SourceMessage from system message.""" - content = message["content"] + + content = message.get("content", "") if isinstance(content, dict): - content = content["text"] + content = content.get("text", "") content_wo_tool_schema = re.sub( r"<tool_schema>(.*?)</tool_schema>", r"omitted", @@ -84,17 +85,154 @@ def parse_fast( info: dict[str, Any], **kwargs, ) -> list[TextualMemoryItem]: - content = message["content"] + content = message.get("content", "") if isinstance(content, dict): - content = content["text"] + content = content.get("text", "") - # Replace tool_schema content with "omitted" in remaining content - content_wo_tool_schema = re.sub( - r"<tool_schema>(.*?)</tool_schema>", - r"omitted", - content, - flags=re.DOTALL, - ) + # Find first tool_schema block + tool_schema_pattern = r"<tool_schema>(.*?)</tool_schema>" + match = re.search(tool_schema_pattern, content, flags=re.DOTALL) + + if match: + original_text = match.group(0) # Complete <tool_schema>...</tool_schema> block + schema_content = match.group(1) # Content between the tags + + # Parse tool schema + try: + tool_schema = json.loads(schema_content) + assert isinstance(tool_schema, list), "Tool schema must be a list[dict]" + except json.JSONDecodeError: + try: + tool_schema = ast.literal_eval(schema_content) + assert isinstance(tool_schema, list), "Tool schema must be a list[dict]" + except (ValueError, SyntaxError, AssertionError): + logger.warning( + f"[SystemParser] Failed to parse tool schema with both JSON and ast.literal_eval: {schema_content[:100]}..." + ) + tool_schema = None + except AssertionError: + logger.warning( + f"[SystemParser] Tool schema must be a list[dict]: {schema_content[:100]}..." 
+ ) + tool_schema = None + + # Process and replace + if tool_schema is not None: + + def remove_descriptions(obj): + """Recursively remove all 'description' keys from a nested dict/list structure.""" + if isinstance(obj, dict): + return { + k: remove_descriptions(v) for k, v in obj.items() if k != "description" + } + elif isinstance(obj, list): + return [remove_descriptions(item) for item in obj] + else: + return obj + + def keep_first_layer_params(obj): + """Only keep first layer parameter information, remove nested parameters.""" + if isinstance(obj, list): + return [keep_first_layer_params(item) for item in obj] + elif isinstance(obj, dict): + result = {} + for k, v in obj.items(): + if k == "properties" and isinstance(v, dict): + # For properties, only keep first layer parameter names and types + first_layer_props = {} + for param_name, param_info in v.items(): + if isinstance(param_info, dict): + # Only keep type and basic info, remove nested properties + first_layer_props[param_name] = { + key: val + for key, val in param_info.items() + if key in ["type", "enum", "required"] + and key != "properties" + } + else: + first_layer_props[param_name] = param_info + result[k] = first_layer_props + elif k == "parameters" and isinstance(v, dict): + # Process parameters object but only keep first layer + result[k] = keep_first_layer_params(v) + elif isinstance(v, dict | list) and k != "properties": + result[k] = keep_first_layer_params(v) + else: + result[k] = v + return result + else: + return obj + + def format_tool_schema_readable(tool_schema): + """Convert tool schema to readable format: tool_name: [param1 (type1), ...](required: ...)""" + lines = [] + for tool in tool_schema: + if not tool: + continue + + # Handle both new format and old-style OpenAI function format + if tool.get("type") == "function" and "function" in tool: + tool_info = tool.get("function") + if not tool_info: + continue + else: + tool_info = tool + + tool_name = tool_info.get("name", "unknown") + params_obj = tool_info.get("parameters", {}) + properties = params_obj.get("properties", {}) + required = params_obj.get("required", []) + + # Format parameters + param_strs = [] + for param_name, param_info in properties.items(): + if isinstance(param_info, dict): + param_type = param_info.get("type", "any") + # Handle enum + if "enum" in param_info and param_info["enum"] is not None: + # Ensure all enum values are strings + enum_values = [str(v) for v in param_info["enum"]] + param_type = f"{param_type}[{', '.join(enum_values)}]" + param_strs.append(f"{param_name} ({param_type})") + else: + param_strs.append(f"{param_name} (any)") + + # Format required parameters + # Ensure all required parameter names are strings + required_strs = [str(r) for r in required] if required else [] + required_str = ( + f"(required: {', '.join(required_strs)})" if required_strs else "" + ) + + # Construct the line + params_part = f"[{', '.join(param_strs)}]" if param_strs else "[]" + line = f"{tool_name}: {params_part}{required_str}" + lines.append(line) + + return "\n".join(lines) + + # Compression mode literal: ["compress", "omit"]. compress is core-information-preserving, omit is full omission. 
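To make the compression concrete before the mode switch that follows, here is a condensed, self-contained restatement of the readable format for a single OpenAI-style tool entry (the names `render_tool_line` and `weather_tool` are illustrative; the full helper above additionally guards against empty tools and non-dict parameter infos):

```python
def render_tool_line(tool: dict) -> str:
    """Condensed form of format_tool_schema_readable for one tool entry."""
    info = tool["function"] if tool.get("type") == "function" else tool
    params_obj = info.get("parameters", {})
    parts = []
    for name, p in params_obj.get("properties", {}).items():
        ptype = p.get("type", "any")
        if p.get("enum"):  # fold enum values into the type annotation
            ptype = f"{ptype}[{', '.join(str(v) for v in p['enum'])}]"
        parts.append(f"{name} ({ptype})")
    required = params_obj.get("required", []) or []
    req = f"(required: {', '.join(str(r) for r in required)})" if required else ""
    return f"{info.get('name', 'unknown')}: [{', '.join(parts)}]{req}"


weather_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "parameters": {
            "properties": {
                "city": {"type": "string"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
            },
            "required": ["city"],
        },
    },
}
print(render_tool_line(weather_tool))
# -> get_weather: [city (string), unit (string[celsius, fahrenheit])](required: city)
```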
+ compression_mode = "compress" + if compression_mode == "omit": + processed_text = "omitted" + elif compression_mode == "compress": + # First keep only first layer params, then remove descriptions + simple_tool_schema = keep_first_layer_params(tool_schema) + simple_tool_schema = remove_descriptions(simple_tool_schema) + # change to readable format + readable_schema = format_tool_schema_readable(simple_tool_schema) + + processed_text = f"{readable_schema}" + else: + raise ValueError(f"Unknown compression mode: {compression_mode}") + + content = content.replace(original_text, processed_text, 1) + + parts = ["system: "] + if message.get("chat_time"): + parts.append(f"[{message.get('chat_time')}]: ") + prefix = "".join(parts) + msg_line = f"{prefix}{content}\n" source = self.create_source(message, info) @@ -104,7 +242,7 @@ def parse_fast( session_id = info_.pop("session_id", "") # Split parsed text into chunks - content_chunks = self._split_text(content_wo_tool_schema) + content_chunks = self._split_text(msg_line) memory_items = [] for _chunk_idx, chunk_text in enumerate(content_chunks): @@ -132,9 +270,9 @@ def parse_fine( info: dict[str, Any], **kwargs, ) -> list[TextualMemoryItem]: - content = message["content"] + content = message.get("content", "") if isinstance(content, dict): - content = content["text"] + content = content.get("text", "") try: tool_schema = json.loads(content) assert isinstance(tool_schema, list), "Tool schema must be a list[dict]" diff --git a/src/memos/templates/tool_mem_prompts.py b/src/memos/templates/tool_mem_prompts.py index 7d5363956..2fe8840b7 100644 --- a/src/memos/templates/tool_mem_prompts.py +++ b/src/memos/templates/tool_mem_prompts.py @@ -1,26 +1,47 @@ TOOL_TRAJECTORY_PROMPT_ZH = """ -你是一个专业的工具调用轨迹提取专家。你的任务是从给定的对话消息中提取完整的工具调用轨迹经验。 +你是一个专业的工具经验提取专家。你的任务是从给定的对话消息中提取完整的工具调用轨迹经验。 -## 提取规则: -1. 只有当对话中存在有价值的工具调用过程时才进行提取 -2. 
有价值的轨迹至少包含以下元素: - - 用户的问题(user message) - - 助手的工具调用尝试(assistant message with tool_calls) - - 工具的执行结果(tool message with tool_call_id and content,无论成功或失败) - - 助手的响应(assistant message,无论是否给出最终答案) +## 分析判断步骤: +**步骤1:判断任务完成度** +根据用户反馈,判定correctness:success(成功)或 failed(失败),用户反馈决定权大于执行结果,用户反馈有误,则判定为failed + +**步骤2:成功轨迹(success)- 经验提炼** +从成功模式中提炼通用原则或规则,采用"when...then..."结构: +- when: 明确描述触发该经验的场景特征(任务类型、工具环境、参数特征等) +- then: 总结有效的参数模式、调用策略、最佳实践 +注意:经验是解决整个轨迹问题级别的,不仅仅针对单个工具 + +**步骤3:失败轨迹(failed)- 错误分析与经验提炼** +3.1 工具需求判断 + - 任务是否需要工具?(需要/直接回答/误调用) +3.2 工具调用检查 + - 工具存在性:是否在system中提供 + - 工具选择:是否选对工具 + - 参数正确性:是否符合类型定义 + - 幻觉检测:是否调用不存在的工具 +3.3 错误根因定位 + 结合消息中的错误反馈信息和上述分析,精准输出根本原因 +3.4 经验提炼(核心) + 从失败模式中提炼通用原则或规则,采用"when...then..."结构: + - when: 明确描述触发该经验的场景特征(任务类型、工具环境、参数特征等) + - then: 给出避免错误的通用策略、正确调用方式或决策规则 + 注意:经验是解决整个轨迹问题级别的,不仅仅针对单个工具 ## 输出格式: 返回一个JSON数组,格式如下: + ```json [ { - "trajectory": "自然语言输出包含'任务、使用的工具、工具观察、最终回答'的完整精炼的总结,体现顺序", + "correctness": "success 或 failed", + "trajectory": "精炼完整的自然语言总结,包含:[任务(用户任务) -> 执行动作(调用的工具/直接回答) -> 执行结果] (可能多轮) -> 最终回答", + "experience": "采用when...then...格式,例如:'when 遇到XX的任务时,应该YY'", "tool_used_status": [ { - "used_tool": "工具名1", + "used_tool": "工具名称(如果调用了工具)", "success_rate": "0.0-1.0之间的数值,表示该工具在本次轨迹中的成功率", "error_type": "调用失败时的错误类型和描述,成功时为空字符串", - "experience": "该工具的使用经验,比如常见的参数模式、执行特点、结果解读方式等" + "tool_experience": "调用该工具的经验,包括可能的前置条件和可能的后置效果" } ] } @@ -28,42 +49,72 @@ ``` ## 注意事项: -- 如果对话中没有完整的工具调用轨迹,返回空数组 - 每个轨迹必须是独立的完整过程 - 一个轨迹中可能涉及多个工具的使用,每个工具在tool_used_status中独立记录 +- 如果没有调用工具,tool_used_status为空数组[] +- 如果多条轨迹存在顺序依赖关系,需要将它们视为一条轨迹 - 只提取事实内容,不要添加任何解释或额外信息 - 确保返回的是有效的JSON格式 +- 输出的trajectory需要按照messages的发展顺序排列 +- experience必须是通用的、可复用的经验规则,而不是针对具体案例的描述 +- 无论成功或失败,都要提炼经验并使用when...then...格式 -请分析以下对话消息并提取工具调用轨迹: - +请分析以下对话消息并提取工具调用轨迹,基于以下对话消息: + {messages} - + """ TOOL_TRAJECTORY_PROMPT_EN = """ -You are a professional tool call trajectory extraction expert. Your task is to extract valuable tool call trajectory experiences from given conversation messages. +You are a professional tool experience extraction expert. Your task is to extract complete tool call trajectory experiences from given conversation messages. + +## Analysis and Judgment Steps: + +**Step 1: Assess Task Completion** +Determine correctness based on user feedback: success or failed, user feedback has higher priority than execution results, if user feedback is incorrect, then determine as failed + +**Step 2: Successful Trajectory (success) - Experience Extraction** +Extract general principles or rules from success patterns, using "when...then..." structure: +- when: clearly describe the scenario characteristics that trigger this experience (task type, tool environment, parameter characteristics, etc.) +- then: summarize effective parameter patterns, calling strategies, and best practices +Note: Experience is at the trajectory-level problem-solving, not just for a single tool -## Extraction Rules: -1. Only extract when there are valuable tool calling processes in the conversation -2. Valuable trajectories must contain at least the following elements: - - User's question (user message) - - Assistant's tool call attempt (assistant message with tool_calls) - - Tool execution results (tool message with tool_call_id and content, regardless of success or failure) - - Assistant's response (assistant message, whether or not a final answer is given) +**Step 3: Failed Trajectory (failed) - Error Analysis and Experience Extraction** + +3.1 Tool Requirement Assessment + - Does the task require tools? 
(required/direct answer/unnecessary call) + +3.2 Tool Call Verification + - Tool availability: provided in system? + - Tool selection: correct tool chosen? + - Parameter correctness: conform to type definitions? + - Hallucination detection: calling non-existent tools? + +3.3 Root Cause Identification + Combine error feedback from messages with above analysis to precisely output root cause + +3.4 Experience Extraction (Core) + Extract general principles or rules from failure patterns, using "when...then..." structure: + - when: clearly describe the scenario characteristics that trigger this experience (task type, tool environment, parameter characteristics, etc.) + - then: provide general strategies to avoid errors, correct calling approaches, or decision rules + Note: Experience is at the trajectory-level problem-solving, not just for a single tool ## Output Format: Return a JSON array in the following format: + ```json [ { - "trajectory": "Natural language summary containing 'task, tools used, tool observations, final answer' in a complete and refined manner, reflecting the sequence", + "correctness": "success or failed", + "trajectory": "Concise and complete natural language summary including: [task (user task) -> execution action (tool called/direct answer) -> execution result] (possibly multiple rounds) -> final answer", + "experience": "Use when...then... format, e.g., 'when encountering XX tasks, should do YY'", "tool_used_status": [ { - "used_tool": "Tool Name 1", - "success_rate": "Numerical value between 0.0-1.0, indicating the success rate of this tool in the current trajectory", + "used_tool": "Tool name (if tool was called)", + "success_rate": "Numerical value between 0.0-1.0, indicating the success rate of this tool in current trajectory", "error_type": "Error type and description when call fails, empty string when successful", - "experience": "Usage experience of this tool, such as common parameter patterns, execution characteristics, result interpretation methods, etc." + "tool_experience": "Experience of using this tool, including possible preconditions and possible post-effects" } ] } @@ -71,14 +122,18 @@ ``` ## Notes: -- If there are no complete tool call trajectories in the conversation, return an empty array - Each trajectory must be an independent complete process -- Multiple tools may be used in one trajectory, each tool is recorded independently in tool_used_status -- Only extract factual content, do not add any additional explanations or information +- A trajectory may involve multiple tools, each recorded independently in tool_used_status +- If no tool was called, tool_used_status is an empty array [] +- If multiple trajectories have sequential dependencies, treat them as one trajectory +- Only extract factual content, do not add any explanations or extra information - Ensure the returned content is valid JSON format +- The trajectory should be arranged according to the development order of messages +- Experience must be general and reusable rules, not descriptions specific to concrete cases +- Whether success or failed, always extract experience using when...then... 
format -Please analyze the following conversation messages and extract tool call trajectories: - +Please analyze the following conversation messages and extract tool call trajectories based on: + {messages} - + """ From fd0f2adf8df83dca0fb220037d35167952196630 Mon Sep 17 00:00:00 2001 From: "yuan.wang" Date: Tue, 13 Jan 2026 20:31:31 +0800 Subject: [PATCH 11/15] modify get tool memory, modify search tool memory field --- src/memos/api/handlers/formatters_handler.py | 3 + src/memos/api/handlers/memory_handler.py | 75 ++++++++++++------- src/memos/api/product_models.py | 4 +- .../read_multi_modal/system_parser.py | 19 ++++- src/memos/memories/textual/tree.py | 3 +- 5 files changed, 72 insertions(+), 32 deletions(-) diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 94988295b..f9e2022a9 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -84,6 +84,7 @@ def post_process_pref_mem( { "cube_id": mem_cube_id, "memories": pref_formatted_mem, + "total_nodes": len(pref_formatted_mem), } ) pref_instruction, pref_note = instruct_completion(pref_formatted_mem) @@ -116,12 +117,14 @@ def post_process_textual_mem( { "cube_id": mem_cube_id, "memories": fact_mem, + "total_nodes": len(fact_mem), } ) memories_result["tool_mem"].append( { "cube_id": mem_cube_id, "memories": tool_mem, + "total_nodes": len(tool_mem), } ) return memories_result diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py index f6f1402fc..d81d1aba2 100644 --- a/src/memos/api/handlers/memory_handler.py +++ b/src/memos/api/handlers/memory_handler.py @@ -6,7 +6,11 @@ from typing import TYPE_CHECKING, Any, Literal -from memos.api.handlers.formatters_handler import format_memory_item +from memos.api.handlers.formatters_handler import ( + format_memory_item, + post_process_pref_mem, + post_process_textual_mem, +) from memos.api.product_models import ( DeleteMemoryRequest, DeleteMemoryResponse, @@ -209,54 +213,67 @@ def handle_get_memory(memory_id: str, naive_mem_cube: NaiveMemCube) -> GetMemory def handle_get_memories( get_mem_req: GetMemoryRequest, naive_mem_cube: NaiveMemCube ) -> GetMemoryResponse: - # TODO: Implement get memory with filter + results: dict[str, Any] = {"text_mem": [], "pref_mem": [], "tool_mem": []} memories = naive_mem_cube.text_mem.get_all( user_name=get_mem_req.mem_cube_id, user_id=get_mem_req.user_id, page=get_mem_req.page, page_size=get_mem_req.page_size, - ) - total_nodes = memories["total_nodes"] - total_edges = memories["total_edges"] - del memories["total_nodes"] - del memories["total_edges"] + filter=get_mem_req.filter, + )["nodes"] + + results = post_process_textual_mem(results, memories, get_mem_req.mem_cube_id) + + if not get_mem_req.include_tool_memory: + results["tool_mem"] = [] preferences: list[TextualMemoryItem] = [] - total_pref = 0 + format_preferences = [] if get_mem_req.include_preference and naive_mem_cube.pref_mem is not None: filter_params: dict[str, Any] = {} if get_mem_req.user_id is not None: filter_params["user_id"] = get_mem_req.user_id if get_mem_req.mem_cube_id is not None: filter_params["mem_cube_id"] = get_mem_req.mem_cube_id - - preferences, total_pref = naive_mem_cube.pref_mem.get_memory_by_filter( + if get_mem_req.filter is not None: + # Check and remove user_id/mem_cube_id from filter if present + filter_copy = get_mem_req.filter.copy() + removed_fields = [] + + if "user_id" in filter_copy: + filter_copy.pop("user_id") + 
removed_fields.append("user_id") + if "mem_cube_id" in filter_copy: + filter_copy.pop("mem_cube_id") + removed_fields.append("mem_cube_id") + + if removed_fields: + logger.warning( + f"Fields {removed_fields} found in filter will be ignored. " + f"Use request-level user_id/mem_cube_id parameters instead." + ) + + filter_params.update(filter_copy) + + preferences, _ = naive_mem_cube.pref_mem.get_memory_by_filter( filter_params, page=get_mem_req.page, page_size=get_mem_req.page_size ) format_preferences = [format_memory_item(item) for item in preferences] - return GetMemoryResponse( - message="Memories retrieved successfully", - data={ - "text_mem": [ - { - "cube_id": get_mem_req.mem_cube_id, - "memories": memories, - "total_nodes": total_nodes, - "total_edges": total_edges, - } - ], - "pref_mem": [ - { - "cube_id": get_mem_req.mem_cube_id, - "memories": format_preferences, - "total_nodes": total_pref, - } - ], - }, + results = post_process_pref_mem( + results, format_preferences, get_mem_req.mem_cube_id, get_mem_req.include_preference ) + # Filter to only keep text_mem, pref_mem, tool_mem + filtered_results = { + "text_mem": results.get("text_mem", []), + "pref_mem": results.get("pref_mem", []), + "tool_mem": results.get("tool_mem", []), + } + + return GetMemoryResponse(message="Memories retrieved successfully", data=filtered_results) + def handle_delete_memories(delete_mem_req: DeleteMemoryRequest, naive_mem_cube: NaiveMemCube): logger.info( diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index d5f301c9d..b2f8a9fa3 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -771,7 +771,9 @@ class GetMemoryRequest(BaseRequest): mem_cube_id: str = Field(..., description="Cube ID") user_id: str | None = Field(None, description="User ID") - include_preference: bool = Field(True, description="Whether to handle preference memory") + include_preference: bool = Field(True, description="Whether to return preference memory") + include_tool_memory: bool = Field(False, description="Whether to return tool memory") + filter: dict[str, Any] | None = Field(None, description="Filter for the memory") page: int | None = Field( None, description="Page number (starts from 1). 
If None, exports all data without pagination.", diff --git a/src/memos/mem_reader/read_multi_modal/system_parser.py b/src/memos/mem_reader/read_multi_modal/system_parser.py index 49264ce2c..03a49afd8 100644 --- a/src/memos/mem_reader/read_multi_modal/system_parser.py +++ b/src/memos/mem_reader/read_multi_modal/system_parser.py @@ -1,6 +1,7 @@ """Parser for system messages.""" import ast +import hashlib import json import re import uuid @@ -293,6 +294,22 @@ def parse_fine( user_id = info_.pop("user_id", "") session_id = info_.pop("session_id", "") + # Deduplicate tool schemas based on memory content + # Use hash as key for efficiency, but store original string to handle collisions + seen_memories = {} # hash -> memory_str mapping + unique_schemas = [] + for schema in tool_schema: + memory_str = json.dumps(schema, ensure_ascii=False, sort_keys=True) + # Use SHA-256 for better collision resistance + memory_hash = hashlib.sha256(memory_str.encode("utf-8")).hexdigest() + + # Check if hash exists and verify the actual content (handle potential collision) + if memory_hash not in seen_memories: + seen_memories[memory_hash] = memory_str + unique_schemas.append(schema) + elif seen_memories[memory_hash] != memory_str: + unique_schemas.append(schema) + return [ TextualMemoryItem( id=str(uuid.uuid4()), @@ -306,5 +323,5 @@ def parse_fine( info=info_, ), ) - for schema in tool_schema + for schema in unique_schemas ] diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index c486e6cf6..b963cfa9b 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -327,13 +327,14 @@ def get_all( user_id: str | None = None, page: int | None = None, page_size: int | None = None, + filter: dict | None = None, ) -> dict: """Get all memories. Returns: list[TextualMemoryItem]: List of all memories. 
""" graph_output = self.graph_store.export_graph( - user_name=user_name, user_id=user_id, page=page, page_size=page_size + user_name=user_name, user_id=user_id, page=page, page_size=page_size, filter=filter ) return graph_output From 85c6a63372faf06fdbd988f2b92005ee735432a5 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 13 Jan 2026 20:37:19 +0800 Subject: [PATCH 12/15] chore: update python-tests.yml --- .github/workflows/python-tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 9fc53d5dd..de300c193 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -11,12 +11,14 @@ on: branches: - "main" - "dev" + - "dev*" - "feat/*" - "test" pull_request: branches: - "main" - "dev" + - "dev*" - "feat/*" - "test" From e4c67a433a6dda544d0d1972f35d6d5aba5c3275 Mon Sep 17 00:00:00 2001 From: OhhhhPi <55885746+OhhhhPi@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:15:42 +0800 Subject: [PATCH 13/15] fix: Qdrant empty when using neo4j-community (#843) * fix: Qdrant empty when using neo4j-community * fix: Qdrant empty when using neo4j-community * fix: Qdrant empty when using neo4j-community * format --------- Co-authored-by: CaralHsi --- src/memos/graph_dbs/neo4j_community.py | 104 ++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/src/memos/graph_dbs/neo4j_community.py b/src/memos/graph_dbs/neo4j_community.py index e943616da..f0be3d858 100644 --- a/src/memos/graph_dbs/neo4j_community.py +++ b/src/memos/graph_dbs/neo4j_community.py @@ -5,7 +5,7 @@ from typing import Any from memos.configs.graph_db import Neo4jGraphDBConfig -from memos.graph_dbs.neo4j import Neo4jGraphDB, _prepare_node_metadata +from memos.graph_dbs.neo4j import Neo4jGraphDB, _flatten_info_fields, _prepare_node_metadata from memos.log import get_logger from memos.vec_dbs.factory import VecDBFactory from memos.vec_dbs.item import VecDBItem @@ -104,6 +104,108 @@ def add_node( metadata=metadata, ) + def add_nodes_batch(self, nodes: list[dict[str, Any]], user_name: str | None = None) -> None: + if not nodes: + logger.warning("[add_nodes_batch] Empty nodes list, skipping") + return + + effective_user_name = user_name if user_name else self.config.user_name + + vec_items: list[VecDBItem] = [] + prepared_nodes: list[dict[str, Any]] = [] + + for node_data in nodes: + try: + node_id = node_data.get("id") + memory = node_data.get("memory") + metadata = node_data.get("metadata", {}) + + if node_id is None or memory is None: + logger.warning("[add_nodes_batch] Skip invalid node: missing id/memory") + continue + + if not self.config.use_multi_db and (self.config.user_name or effective_user_name): + metadata["user_name"] = effective_user_name + + metadata = _prepare_node_metadata(metadata) + metadata = _flatten_info_fields(metadata) + + embedding = metadata.pop("embedding", None) + if embedding is None: + raise ValueError(f"Missing 'embedding' in metadata for node {node_id}") + + vector_sync_status = "success" + vec_items.append( + VecDBItem( + id=node_id, + vector=embedding, + payload={ + "memory": memory, + "vector_sync": vector_sync_status, + **metadata, + }, + ) + ) + + created_at = metadata.pop("created_at") + updated_at = metadata.pop("updated_at") + metadata["vector_sync"] = vector_sync_status + + prepared_nodes.append( + { + "id": node_id, + "memory": memory, + "created_at": created_at, + "updated_at": updated_at, + "metadata": metadata, + } + ) + except Exception as e: + 
logger.error( + f"[add_nodes_batch] Failed to prepare node {node_data.get('id', 'unknown')}: {e}", + exc_info=True, + ) + continue + + if not prepared_nodes: + logger.warning("[add_nodes_batch] No valid nodes to insert after preparation") + return + + try: + self.vec_db.add(vec_items) + except Exception as e: + logger.warning(f"[VecDB] batch insert failed: {e}") + for node in prepared_nodes: + node["metadata"]["vector_sync"] = "failed" + + query = """ + UNWIND $nodes AS node + MERGE (n:Memory {id: node.id}) + SET n.memory = node.memory, + n.created_at = datetime(node.created_at), + n.updated_at = datetime(node.updated_at), + n += node.metadata + """ + + nodes_data = [ + { + "id": node["id"], + "memory": node["memory"], + "created_at": node["created_at"], + "updated_at": node["updated_at"], + "metadata": node["metadata"], + } + for node in prepared_nodes + ] + + try: + with self.driver.session(database=self.db_name) as session: + session.run(query, nodes=nodes_data) + logger.info(f"[add_nodes_batch] Successfully inserted {len(prepared_nodes)} nodes") + except Exception as e: + logger.error(f"[add_nodes_batch] Failed to add nodes: {e}", exc_info=True) + raise + def get_children_with_embeddings( self, id: str, user_name: str | None = None ) -> list[dict[str, Any]]: From 0a9398d52e6a409a6c34e94f6f9eece897bc7cdd Mon Sep 17 00:00:00 2001 From: Dubberman <48425266+whipser030@users.noreply.github.com> Date: Wed, 14 Jan 2026 14:35:54 +0800 Subject: [PATCH 14/15] fix: use knowledge embedding score rerank (#867) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * update reader and search strategy * set strategy reader and search config * fix install problem * fix * fix test * turn off graph recall * turn off graph recall * turn off graph recall * fix Searcher input bug * fix Searcher * fix Search * fix bug * adjust strategy reader * adjust strategy reader * adjust search config input * reformat code * re pr * format repair * fix time issue * develop feedback process * feedback handler configuration * upgrade feedback using * add threshold * update prompt * update prompt * fix handler * add feedback scheduler * add handler change node update * add handler change node update * add handler change node update * add handler change node update * fix interface input * add chunk and ratio filter * update stopwords * fix messages queue * add seach_by_keywords_LIKE * add doc filter * add retrieve query * add retrieve queies * patch info filter * add log and make embedding safety net * add log and make embedding safety net * deduplicate add objects * use _add_memories_parallel * delete Special characters * delete Special characters * delete Special characters * delete Special characters * add source_doc_id * add source_doc_id * add reranker in init com.. 
* fix circle import * add feedback judgement * add feedback judgement * add pref feedback * add pref feedback * patch: get_memory func filter user id and make page chunk * add total num * add total num * add milvus pagination * fix merge implicit explicit pref * fix merge implicit explicit pref * fix merge implicit explicit pref * fix json load bug * knowledge raw_text replace memory * knowledge raw_text replace memory * knowledge raw_text replace memory * unuse rerank * backtrack knowledge retrieval * norerank knowledge data * use embedding rerank knowledge --------- Co-authored-by: 黑布林 <11641432+heiheiyouyou@user.noreply.gitee.com> Co-authored-by: CaralHsi Co-authored-by: chunyu li <78344051+fridayL@users.noreply.github.com> --- src/memos/api/handlers/formatters_handler.py | 9 ++++++++- src/memos/api/handlers/search_handler.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 81b957005..29e376d33 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -34,7 +34,7 @@ def to_iter(running: Any) -> list[Any]: def format_memory_item( - memory_data: Any, include_embedding: bool = False, save_sources: bool = False + memory_data: Any, include_embedding: bool = False, save_sources: bool = True ) -> dict[str, Any]: """ Format a single memory item for API response. @@ -186,6 +186,13 @@ def rerank_knowledge_mem( # rerank set unuse reranked_knowledge_mem = knowledge_mem + # Sort by relativity in descending order + reranked_knowledge_mem = sorted( + reranked_knowledge_mem, + key=lambda item: item.get("metadata", {}).get("relativity", 0.0), + reverse=True, + ) + # TODO revoke sources replace memory value for item in reranked_knowledge_mem: item["memory"] = item["metadata"]["sources"][0]["content"] diff --git a/src/memos/api/handlers/search_handler.py b/src/memos/api/handlers/search_handler.py index 3774410dc..e5af52f87 100644 --- a/src/memos/api/handlers/search_handler.py +++ b/src/memos/api/handlers/search_handler.py @@ -5,9 +5,12 @@ using dependency injection for better modularity and testability. 
""" +import time + from typing import Any from memos.api.handlers.base_handler import BaseHandler, HandlerDependencies +from memos.api.handlers.formatters_handler import rerank_knowledge_mem from memos.api.product_models import APISearchRequest, SearchResponse from memos.log import get_logger from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import ( @@ -69,6 +72,18 @@ def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse # Restore original top_k for downstream logic or response metadata search_req.top_k = original_top_k + start_time = time.time() + text_mem = results["text_mem"] + results["text_mem"] = rerank_knowledge_mem( + self.reranker, + query=search_req.query, + text_mem=text_mem, + top_k=original_top_k, + file_mem_proportion=0.5, + ) + rerank_time = time.time() - start_time + + self.logger.info(f"[Knowledge_replace_memory_time] Rerank time: {rerank_time} seconds") self.logger.info( f"[SearchHandler] Final search results: count={len(results)} results={results}" ) From dc103a30abdaf4c6a5d4b8e11f1471de991e0af5 Mon Sep 17 00:00:00 2001 From: Wang Daoji <75928131+Wang-Daoji@users.noreply.github.com> Date: Wed, 14 Jan 2026 18:43:34 +0800 Subject: [PATCH 15/15] fix: optimization_and_fix (#868) Co-authored-by: yuan.wang --- .../textual/prefer_text_memory/extractor.py | 8 ++-- src/memos/templates/prefer_complete_prompt.py | 46 ++++++++++++++++++- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/memos/memories/textual/prefer_text_memory/extractor.py b/src/memos/memories/textual/prefer_text_memory/extractor.py index 0c6e5339d..aa4f3cb44 100644 --- a/src/memos/memories/textual/prefer_text_memory/extractor.py +++ b/src/memos/memories/textual/prefer_text_memory/extractor.py @@ -70,7 +70,7 @@ def extract_explicit_preference(self, qa_pair: MessageList | str) -> dict[str, A try: response = self.llm_provider.generate([{"role": "user", "content": prompt}]) if not response: - logger.error( + logger.info( f"[prefer_extractor]: (Error) LLM response content is {response} when extracting explicit preference" ) return None @@ -80,7 +80,7 @@ def extract_explicit_preference(self, qa_pair: MessageList | str) -> dict[str, A d["preference"] = d.pop("explicit_preference") return result except Exception as e: - logger.error(f"Error extracting explicit preference: {e}, return None") + logger.info(f"Error extracting explicit preference: {e}, return None") return None def extract_implicit_preference(self, qa_pair: MessageList | str) -> dict[str, Any] | None: @@ -98,7 +98,7 @@ def extract_implicit_preference(self, qa_pair: MessageList | str) -> dict[str, A try: response = self.llm_provider.generate([{"role": "user", "content": prompt}]) if not response: - logger.error( + logger.info( f"[prefer_extractor]: (Error) LLM response content is {response} when extracting implicit preference" ) return None @@ -108,7 +108,7 @@ def extract_implicit_preference(self, qa_pair: MessageList | str) -> dict[str, A d["preference"] = d.pop("implicit_preference") return result except Exception as e: - logger.error(f"Error extracting implicit preferences: {e}, return None") + logger.info(f"Error extracting implicit preferences: {e}, return None") return None def _process_single_chunk_explicit( diff --git a/src/memos/templates/prefer_complete_prompt.py b/src/memos/templates/prefer_complete_prompt.py index a67f0c12c..d5c73d11a 100644 --- a/src/memos/templates/prefer_complete_prompt.py +++ b/src/memos/templates/prefer_complete_prompt.py @@ -69,6 +69,28 @@ (preferences that the 
user did not explicitly state but can be reasonably inferred from their underlying motivations, behavioral patterns, decision-making logic, and latent needs). Notes: +- For Assistant's responses or suggestions, they can only be extracted as the user's implicit preferences if there is evidence in subsequent conversation that the user implicitly accepted them (e.g., adoption, agreement, acting on the suggestion, etc.). Assistant suggestions alone do not constitute user preferences. +- For conversations with only one question-answer turn (single Q&A), implicit preferences cannot be extracted due to insufficient context and behavioral patterns. Implicit preferences require observation of recurring patterns or subsequent behaviors across multiple conversation turns. + +Counter-examples: +【Counter-example 1 - Assistant suggestion not accepted by user】 +Conversation: +User: I want to buy a phone, any recommendations? +Assistant: I suggest considering the iPhone 15 Pro, it has powerful performance and great camera quality. +User: What about the iPhone 16? +Assistant: The iPhone 16 is expected to be released in September 2026, it will have a new design and features. + +Analysis: Although the Assistant recommended iPhone, the user showed no acceptance (e.g., "okay", "I'll consider it", or follow-up questions about iPhone), so this cannot be extracted as the user's implicit preference. +Result: Cannot extract implicit preference + +【Counter-example 2 - Single question-answer situation】 +Conversation: +User: Any good movies recently? +Assistant: "Dune 2" has good reviews, it's a sci-fi epic genre. + +Analysis: This is just a single simple Q&A exchange. The user provided no further feedback or behavior, lacking sufficient context to infer user preferences for sci-fi movies or other hidden tendencies. +Result: Cannot extract implicit preference + - Implicit preferences refer to user inclinations or choices that are not directly expressed, but can be deeply inferred by analyzing: * **Hidden motivations**: What underlying needs or goals might drive the user's behavior? * **Behavioral patterns**: What recurring patterns or tendencies can be observed? @@ -77,7 +99,6 @@ * **Contextual signals**: What do the user's choices, comparisons, exclusions, or scenario selections reveal about their deeper preferences? - Do not treat explicitly stated preferences as implicit preferences; this prompt is only for inferring preferences that are not directly mentioned. - Go beyond surface-level facts to understand the user's hidden possibilities and underlying logic. -- For Assistant's responses or suggestions, they can only be extracted as the user's implicit preferences if there is evidence in subsequent conversation that the user implicitly accepted them (e.g., adoption, agreement, acting on the suggestion, etc.). Assistant suggestions alone do not constitute user preferences. Requirements: 1. Only make inferences when there is sufficient evidence in the conversation; avoid unsupported or far-fetched guesses. @@ -110,6 +131,28 @@ (用户没有明确表述,但可以通过分析其潜在动机、行为模式、决策逻辑和隐藏需求深度推断出的偏好)。 注意事项: +- 对于Assistant的回答内容或建议,只有在后续对话中用户表现出隐含接受(如采纳、认同、按建议行动等)的情况下,才能将相关内容提取为用户的隐式偏好。单纯的Assistant建议本身不构成用户偏好。 +- 对于只有一轮问答(一问一答)的对话,由于缺乏足够的上下文和行为模式,不能提取隐式偏好。隐式偏好需要从多轮对话中观察到的重复模式或后续行为来推断。 + +反例示例: +【反例1 - 未被用户认可的Assistant建议】 +对话: +User: 我想买个手机,有什么推荐吗? +Assistant: 建议你考虑iPhone 15 Pro,性能强大,拍照效果好。 +User: iPhone 16 怎么样? 
+Assistant: iPhone 16 预计将在2026年9月发布,会有新的设计和功能。 + +分析:虽然Assistant推荐了iPhone,但用户没有表现出任何接受态度(如"好的"、"我会考虑"、后续询问iPhone相关问题等),因此不能提取为用户的隐式偏好。 +结果:无法提取隐式偏好 + +【反例2 - 只有一问一答的情况】 +对话: +User: 最近有什么好看的电影吗? +Assistant: 《沙丘2》口碑不错,是科幻史诗类型的。 + +分析:这只是一轮简单问答,用户没有进一步的反馈或行为,缺乏足够的上下文来推断用户对科幻电影的偏好或其他隐藏倾向。 +结果:无法提取隐式偏好 + - 隐式偏好是指用户未直接表达,但可以通过深入分析以下方面推断出的倾向或选择: * **隐藏动机**:什么样的潜在需求或目标可能驱动用户的行为? * **行为模式**:可以观察到什么样的重复模式或倾向? @@ -118,7 +161,6 @@ * **情境信号**:用户的选择、比较、排除或场景选择揭示了什么样的深层偏好? - 不要将明确陈述的偏好视为隐式偏好;此提示仅用于推断未直接提及的偏好。 - 超越表面事实,理解用户的隐藏可能性和背后的逻辑。 -- 对于Assistant的回答内容或建议,只有在后续对话中用户表现出隐含接受(如采纳、认同、按建议行动等)的情况下,才能将相关内容提取为用户的隐式偏好。单纯的Assistant建议本身不构成用户偏好。 要求: 1. 仅在对话中有充分证据时进行推断;避免无根据或牵强的猜测。
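The single-turn restriction that these counter-examples encode can also be enforced in code before the LLM is ever called. A hypothetical pre-filter in that spirit (the helper `has_multiple_user_turns` is illustrative and not part of this patch; in the patch itself the gate lives in the prompt):

```python
def has_multiple_user_turns(qa_pair: list[dict]) -> bool:
    """Illustrative guard: implicit preferences need more than one user turn."""
    return sum(1 for m in qa_pair if m.get("role") == "user") >= 2


qa_pair = [
    {"role": "user", "content": "Any good movies recently?"},
    {"role": "assistant", "content": "'Dune 2' has good reviews."},
]
if has_multiple_user_turns(qa_pair):
    pass  # proceed with extract_implicit_preference(qa_pair)
else:
    print("single Q&A turn: skip implicit preference extraction")
```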