From 24b7d8b7d7798b1e02cd27a9852b0db6124bf5ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Tue, 23 Dec 2025 10:35:48 +0800 Subject: [PATCH 1/3] add export_graph log --- src/memos/graph_dbs/polardb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index c81e46804..0bf02f5ba 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2533,7 +2533,7 @@ def export_graph( FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = '\"{user_name}\"'::agtype """ - + logger.info(f"[export_graph nodes] Query: {node_query}") with conn.cursor() as cursor: cursor.execute(node_query) node_results = cursor.fetchall() @@ -2587,7 +2587,7 @@ def export_graph( RETURN a.id AS source, b.id AS target, type(r) as edge $$) AS (source agtype, target agtype, edge agtype) """ - + logger.info(f"[export_graph edges] Query: {edge_query}") with conn.cursor() as cursor: cursor.execute(edge_query) edge_results = cursor.fetchall() From 4a2da8290404adabe50b485a5060ec906d6bc793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Tue, 23 Dec 2025 14:23:48 +0800 Subject: [PATCH 2/3] fix search_by_embedding filter in --- src/memos/graph_dbs/polardb.py | 103 ++++++++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 13 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 0bf02f5ba..7fb60568f 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -4580,28 +4580,105 @@ def build_filter_condition(condition_dict: dict) -> str: f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {op_value}::agtype" ) elif op == "contains": - # Handle contains operator (for string fields only) - # Check if agtype contains value (using @> operator) - if not isinstance(op_value, str): - raise ValueError( - f"contains operator only supports string format. " - f"Use {{'{key}': {{'contains': '{op_value}'}}}} instead of {{'{key}': {{'contains': {op_value}}}}}" - ) + # Handle contains operator + # For array fields: check if array contains the value using @> operator + # For string fields: check if string contains the value using @> operator # Check if key starts with "info." prefix if key.startswith("info."): info_field = key[5:] # Remove "info." prefix - # String contains: use @> operator for agtype contains - escaped_value = escape_sql_string(op_value) + escaped_value = escape_sql_string(str(op_value)) + # For array fields, use @> with array format: '["value"]'::agtype + # For string fields, use @> with string format: '"value"'::agtype + # We'll use array format for contains to check if array contains the value condition_parts.append( - f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '[\"{escaped_value}\"]'::agtype" ) else: # Direct property access - # String contains: use @> operator for agtype contains - escaped_value = escape_sql_string(op_value) + escaped_value = escape_sql_string(str(op_value)) + # For array fields, use @> with array format condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" ) + elif op == "in": + # Handle in operator (for checking if field value is in a list) + # Supports array format: {"field": {"in": ["value1", "value2"]}} + if not isinstance(op_value, list): + raise ValueError( + f"in operator only supports array format. " + f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}" + ) + # Check if key starts with "info." prefix + if key.startswith("info."): + info_field = key[5:] # Remove "info." prefix + # Build OR conditions for nested properties + if len(op_value) == 0: + # Empty list means no match + condition_parts.append("false") + elif len(op_value) == 1: + # Single value, use equality + item = op_value[0] + if isinstance(item, str): + escaped_value = escape_sql_string(item) + condition_parts.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype" + ) + else: + condition_parts.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype" + ) + else: + # Multiple values, use OR conditions + or_conditions = [] + for item in op_value: + if isinstance(item, str): + escaped_value = escape_sql_string(item) + or_conditions.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype" + ) + else: + or_conditions.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype" + ) + if or_conditions: + condition_parts.append( + f"({' OR '.join(or_conditions)})" + ) + else: + # Direct property access + # Build OR conditions + if len(op_value) == 0: + # Empty list means no match + condition_parts.append("false") + elif len(op_value) == 1: + # Single value, use equality + item = op_value[0] + if isinstance(item, str): + escaped_value = escape_sql_string(item) + condition_parts.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + ) + else: + condition_parts.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" + ) + else: + # Multiple values, use OR conditions + or_conditions = [] + for item in op_value: + if isinstance(item, str): + escaped_value = escape_sql_string(item) + or_conditions.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + ) + else: + or_conditions.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" + ) + if or_conditions: + condition_parts.append( + f"({' OR '.join(or_conditions)})" + ) elif op == "like": # Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%') # Check if key starts with "info." prefix From 56193bf4b96676e90ff796cc7e4841814a87cc5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Tue, 23 Dec 2025 20:37:42 +0800 Subject: [PATCH 3/3] add export_graph page --- src/memos/graph_dbs/polardb.py | 37 ++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 7fb60568f..1d19dc98d 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2502,13 +2502,19 @@ def clear(self, user_name: str | None = None) -> None: @timed def export_graph( - self, include_embedding: bool = False, user_name: str | None = None + self, + include_embedding: bool = False, + user_name: str | None = None, + page: int = 1, + page_size: int = 10, ) -> dict[str, Any]: """ Export all graph nodes and edges in a structured form. Args: include_embedding (bool): Whether to include the large embedding field. user_name (str, optional): User name for filtering in non-multi-db mode + page (int): Page number (starts from 1). Default is 1. + page_size (int): Number of items per page. Default is 1000. Returns: { @@ -2516,7 +2522,17 @@ def export_graph( "edges": [ { "source": ..., "target": ..., "type": ... }, ... ] } """ + logger.info( + f"[export_graph] include_embedding: {include_embedding}, user_name: {user_name}, page: {page}, page_size: {page_size}" + ) user_name = user_name if user_name else self._get_config_value("user_name") + + # Validate pagination parameters + if page < 1: + page = 1 + if page_size < 1: + page_size = 10 + conn = None try: conn = self._get_connection() @@ -2526,12 +2542,16 @@ def export_graph( SELECT id, properties, embedding FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = '\"{user_name}\"'::agtype + ORDER BY id + LIMIT {page_size} OFFSET {(page - 1) * page_size} """ else: node_query = f""" SELECT id, properties FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = '\"{user_name}\"'::agtype + ORDER BY id + LIMIT {page_size} OFFSET {(page - 1) * page_size} """ logger.info(f"[export_graph nodes] Query: {node_query}") with conn.cursor() as cursor: @@ -2580,12 +2600,17 @@ def export_graph( try: conn = self._get_connection() # Export edges using cypher query + # Note: Apache AGE Cypher may not support SKIP, so we use SQL LIMIT/OFFSET on the subquery edge_query = f""" - SELECT * FROM cypher('{self.db_name}_graph', $$ - MATCH (a:Memory)-[r]->(b:Memory) - WHERE a.user_name = '{user_name}' AND b.user_name = '{user_name}' - RETURN a.id AS source, b.id AS target, type(r) as edge - $$) AS (source agtype, target agtype, edge agtype) + SELECT source, target, edge FROM ( + SELECT * FROM cypher('{self.db_name}_graph', $$ + MATCH (a:Memory)-[r]->(b:Memory) + WHERE a.user_name = '{user_name}' AND b.user_name = '{user_name}' + RETURN a.id AS source, b.id AS target, type(r) as edge + ORDER BY a.id, b.id + $$) AS (source agtype, target agtype, edge agtype) + ) AS edges + LIMIT {page_size} OFFSET {(page - 1) * page_size} """ logger.info(f"[export_graph edges] Query: {edge_query}") with conn.cursor() as cursor: