Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 123 additions & 21 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2502,21 +2502,37 @@ def clear(self, user_name: str | None = None) -> None:

@timed
def export_graph(
self, include_embedding: bool = False, user_name: str | None = None
self,
include_embedding: bool = False,
user_name: str | None = None,
page: int = 1,
page_size: int = 10,
) -> dict[str, Any]:
"""
Export one page of graph nodes and edges in a structured form (see page / page_size).
Args:
include_embedding (bool): Whether to include the large embedding field.
user_name (str, optional): User name for filtering in non-multi-db mode
page (int): Page number (starts from 1). Default is 1.
page_size (int): Number of items per page. Default is 10.

Returns:
{
"nodes": [ { "id": ..., "memory": ..., "metadata": {...} }, ... ],
"edges": [ { "source": ..., "target": ..., "type": ... }, ... ]
}
"""
logger.info(
f"[export_graph] include_embedding: {include_embedding}, user_name: {user_name}, page: {page}, page_size: {page_size}"
)
user_name = user_name if user_name else self._get_config_value("user_name")

# Validate pagination parameters
if page < 1:
page = 1
if page_size < 1:
page_size = 10

conn = None
try:
conn = self._get_connection()
Expand All @@ -2526,14 +2542,18 @@ def export_graph(
SELECT id, properties, embedding
FROM "{self.db_name}_graph"."Memory"
WHERE ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = '\"{user_name}\"'::agtype
ORDER BY id
LIMIT {page_size} OFFSET {(page - 1) * page_size}
"""
else:
node_query = f"""
SELECT id, properties
FROM "{self.db_name}_graph"."Memory"
WHERE ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = '\"{user_name}\"'::agtype
ORDER BY id
LIMIT {page_size} OFFSET {(page - 1) * page_size}
"""

logger.info(f"[export_graph nodes] Query: {node_query}")
with conn.cursor() as cursor:
cursor.execute(node_query)
node_results = cursor.fetchall()
Expand Down Expand Up @@ -2580,14 +2600,19 @@ def export_graph(
try:
conn = self._get_connection()
# Export edges using cypher query
# Note: Apache AGE Cypher may not support SKIP, so we use SQL LIMIT/OFFSET on the subquery
edge_query = f"""
SELECT * FROM cypher('{self.db_name}_graph', $$
MATCH (a:Memory)-[r]->(b:Memory)
WHERE a.user_name = '{user_name}' AND b.user_name = '{user_name}'
RETURN a.id AS source, b.id AS target, type(r) as edge
$$) AS (source agtype, target agtype, edge agtype)
SELECT source, target, edge FROM (
SELECT * FROM cypher('{self.db_name}_graph', $$
MATCH (a:Memory)-[r]->(b:Memory)
WHERE a.user_name = '{user_name}' AND b.user_name = '{user_name}'
RETURN a.id AS source, b.id AS target, type(r) as edge
ORDER BY a.id, b.id
$$) AS (source agtype, target agtype, edge agtype)
) AS edges
LIMIT {page_size} OFFSET {(page - 1) * page_size}
"""

logger.info(f"[export_graph edges] Query: {edge_query}")
with conn.cursor() as cursor:
cursor.execute(edge_query)
edge_results = cursor.fetchall()
Expand Down Expand Up @@ -4580,28 +4605,105 @@ def build_filter_condition(condition_dict: dict) -> str:
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {op_value}::agtype"
)
elif op == "contains":
# Handle contains operator (for string fields only)
# Check if agtype contains value (using @> operator)
if not isinstance(op_value, str):
raise ValueError(
f"contains operator only supports string format. "
f"Use {{'{key}': {{'contains': '{op_value}'}}}} instead of {{'{key}': {{'contains': {op_value}}}}}"
)
# Handle contains operator
# For array fields: check if array contains the value using @> operator
# For string fields: check if string contains the value using @> operator
# Check if key starts with "info." prefix
if key.startswith("info."):
info_field = key[5:] # Remove "info." prefix
# String contains: use @> operator for agtype contains
escaped_value = escape_sql_string(op_value)
escaped_value = escape_sql_string(str(op_value))
# For array fields, use @> with array format: '["value"]'::agtype
# For string fields, use @> with string format: '"value"'::agtype
# We'll use array format for contains to check if array contains the value
condition_parts.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '\"{escaped_value}\"'::agtype"
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '[\"{escaped_value}\"]'::agtype"
)
else:
# Direct property access
# String contains: use @> operator for agtype contains
escaped_value = escape_sql_string(op_value)
escaped_value = escape_sql_string(str(op_value))
# For array fields, use @> with array format
condition_parts.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '\"{escaped_value}\"'::agtype"
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype"
)
elif op == "in":
# Handle in operator (for checking if field value is in a list)
# Supports array format: {"field": {"in": ["value1", "value2"]}}
if not isinstance(op_value, list):
raise ValueError(
f"in operator only supports array format. "
f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}"
)
# Check if key starts with "info." prefix
if key.startswith("info."):
info_field = key[5:] # Remove "info." prefix
# Build OR conditions for nested properties
if len(op_value) == 0:
# Empty list means no match
condition_parts.append("false")
elif len(op_value) == 1:
# Single value, use equality
item = op_value[0]
if isinstance(item, str):
escaped_value = escape_sql_string(item)
condition_parts.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype"
)
else:
condition_parts.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype"
)
else:
# Multiple values, use OR conditions
or_conditions = []
for item in op_value:
if isinstance(item, str):
escaped_value = escape_sql_string(item)
or_conditions.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype"
)
else:
or_conditions.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype"
)
if or_conditions:
condition_parts.append(
f"({' OR '.join(or_conditions)})"
)
else:
# Direct property access
# Build OR conditions
if len(op_value) == 0:
# Empty list means no match
condition_parts.append("false")
elif len(op_value) == 1:
# Single value, use equality
item = op_value[0]
if isinstance(item, str):
escaped_value = escape_sql_string(item)
condition_parts.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype"
)
else:
condition_parts.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype"
)
else:
# Multiple values, use OR conditions
or_conditions = []
for item in op_value:
if isinstance(item, str):
escaped_value = escape_sql_string(item)
or_conditions.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype"
)
else:
or_conditions.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype"
)
if or_conditions:
condition_parts.append(
f"({' OR '.join(or_conditions)})"
)
elif op == "like":
# Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%')
# Check if key starts with "info." prefix
Expand Down