Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ def reset_event_state() -> None:
EventContextConfig,
_event_context_config,
_event_id_stack,
reset_last_event_id,
)

reset_emission_counter()
reset_last_event_id() # Added missing call to reset last event ID
_event_id_stack.set(())
_event_context_config.set(EventContextConfig())

Expand Down
14 changes: 14 additions & 0 deletions lib/crewai/src/crewai/memory/storage/mem0_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ def _create_filter_for_search(self):
elif agent_id:
filter["AND"].append({"agent_id": agent_id})

# Flatten filter for local Memory instances with vector stores that don't support AND/OR structure
# (e.g., Valkey, Redis). If there's only one condition, return it directly.
# Only apply flattening for local Memory, not cloud MemoryClient.
if isinstance(self.memory, Memory):
local_mem0_config = self.config.get("local_mem0_config")
vector_store_config = {}
if local_mem0_config and isinstance(local_mem0_config, dict):
vector_store_config = local_mem0_config.get("vector_store") or {}
provider = vector_store_config.get("provider", "") if isinstance(vector_store_config, dict) else ""

if provider in {"valkey", "redis"}:
if len(filter.get("AND", [])) == 1 and "OR" not in filter:
return filter["AND"][0]

return filter

def save(self, value: Any, metadata: dict[str, Any]) -> None:
Expand Down
327 changes: 327 additions & 0 deletions lib/crewai/tests/storage/test_mem0_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,49 @@ def test_search_method_with_memory_oss(mem0_storage_with_mocked_config):

assert len(results) == 2
assert results[0]["content"] == "Result 1"

def test_search_method_with_memory_oss_with_valkey():
"""Test search method with Valkey provider - filters should be flattened"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
{"score": 0.4, "memory": "Result 2"},
]
}

config = {
"user_id": "test_user",
"run_id": "my_run_id",
"local_mem0_config": {
"vector_store": {
"provider": "valkey",
"config": {
"valkey_url": "valkey://localhost:6379",
"collection_name": "test_collection",
"embedding_model_dims": 1536,
},
},
},
}

with patch("mem0.Memory.from_config", return_value=mock_memory):
mem0_storage = Mem0Storage(type="short_term", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

# For Valkey with a single AND condition, the filter should be flattened
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
user_id="test_user",
filters={"run_id": "my_run_id"}, # Flattened from {"AND": [{"run_id": "my_run_id"}]}
threshold=0.5,
)

assert len(results) == 2
assert results[0]["content"] == "Result 1"


def test_search_method_with_memory_client(
Expand Down Expand Up @@ -502,3 +545,287 @@ def test_search_method_with_agent_id_and_user_id():

assert len(results) == 2
assert results[0]["content"] == "Result 1"


def test_search_method_with_redis_provider():
"""Test search method with Redis provider - filters should be flattened"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

config = {
"agent_id": "agent-123",
"local_mem0_config": {
"vector_store": {
"provider": "redis",
"config": {
"redis_url": "redis://localhost:6379",
"collection_name": "test_collection",
"embedding_model_dims": 1536,
},
},
},
}

with patch("mem0.Memory.from_config", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

# For Redis with a single AND condition, the filter should be flattened
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
filters={"agent_id": "agent-123"}, # Flattened from {"AND": [{"agent_id": "agent-123"}]}
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_search_method_with_valkey_and_or_filters():
"""Test search method with Valkey and OR filters - should NOT be flattened for multiple conditions"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

config = {
"agent_id": "agent-123",
"user_id": "user-123",
"local_mem0_config": {
"vector_store": {
"provider": "valkey",
"config": {
"valkey_url": "valkey://localhost:6379",
"collection_name": "test_collection",
"embedding_model_dims": 1536,
},
},
},
}

with patch("mem0.Memory.from_config", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

# For Valkey with OR filters containing multiple conditions, keep the structure
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
user_id="user-123",
filters={"OR": [{"user_id": "user-123"}, {"agent_id": "agent-123"}]},
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_search_method_with_non_valkey_redis_provider():
"""Test search method with non-Valkey/Redis provider - filters should NOT be flattened"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

config = {
"agent_id": "agent-123",
"local_mem0_config": {
"vector_store": {
"provider": "qdrant",
"config": {"host": "localhost", "port": 6333},
},
},
}

with patch.object(Memory, "__new__", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

# For non-Valkey/Redis providers, keep the AND structure
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
filters={"AND": [{"agent_id": "agent-123"}]},
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_search_method_with_none_local_mem0_config():
"""Test that search handles None values in local_mem0_config without raising AttributeError"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

# Test with local_mem0_config explicitly set to None
config = {
"agent_id": "agent-123",
"local_mem0_config": None,
}

with patch.object(Memory, "__new__", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

# This should not raise AttributeError
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
filters={"AND": [{"agent_id": "agent-123"}]},
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_search_method_with_none_vector_store():
"""Test that search handles None values in vector_store without raising AttributeError"""
mock_memory = MagicMock(spec=Memory)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

# Test with vector_store explicitly set to None in config
# This simulates YAML: local_mem0_config: { vector_store: } or JSON: "vector_store": null
config = {
"agent_id": "agent-123",
"local_mem0_config": {
"vector_store": None,
},
}

# Mock Memory initialization to bypass validation
with patch("mem0.Memory.from_config", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

# This should not raise AttributeError when accessing vector_store config
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
filters={"AND": [{"agent_id": "agent-123"}]},
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_search_method_with_memory_client_and_valkey_config():
"""Test that MemoryClient (cloud) with valkey config does NOT flatten filters"""
mock_memory_client = MagicMock(spec=MemoryClient)
mock_results = {
"results": [
{"score": 0.9, "memory": "Result 1"},
]
}

# This is the problematic scenario: MEM0_API_KEY is set (so MemoryClient is used)
# but local_mem0_config with valkey provider is also present
config = {
"agent_id": "agent-123",
"api_key": "test-api-key", # This causes MemoryClient to be used
"local_mem0_config": {
"vector_store": {
"provider": "valkey", # This would trigger flattening in the old code
"config": {
"valkey_url": "valkey://localhost:6379",
"collection_name": "test_collection",
"embedding_model_dims": 1536,
},
},
},
}

with patch.object(MemoryClient, "__new__", return_value=mock_memory_client):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.memory.search = MagicMock(return_value=mock_results)

results = mem0_storage.search("test query", limit=5, score_threshold=0.5)

# For MemoryClient (cloud), filters should NOT be flattened even with valkey config
# The cloud API expects AND/OR structure
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
metadata={"type": "external"},
version="v2",
output_format="v1.1",
filters={"AND": [{"agent_id": "agent-123"}]}, # Should NOT be flattened
threshold=0.5,
)

assert len(results) == 1
assert results[0]["content"] == "Result 1"


def test_magicmock_spec_isinstance_behavior():
"""
Verify that MagicMock with spec parameter passes isinstance checks.

This is a critical assumption for our test suite - we use MagicMock(spec=Memory)
to mock Memory instances, and the production code uses isinstance(self.memory, Memory)
to guard flattening logic. This test ensures our mocking strategy is valid.
"""
# Test that MagicMock with spec passes isinstance check
mock_memory = MagicMock(spec=Memory)
assert isinstance(mock_memory, Memory), "MagicMock(spec=Memory) should pass isinstance check"

# Test that regular MagicMock does NOT pass isinstance check
regular_mock = MagicMock()
assert not isinstance(regular_mock, Memory), "Regular MagicMock should NOT pass isinstance check"

# Test that MagicMock with spec passes isinstance check for MemoryClient too
mock_memory_client = MagicMock(spec=MemoryClient)
assert isinstance(mock_memory_client, MemoryClient), "MagicMock(spec=MemoryClient) should pass isinstance check"

# Verify the flattening logic works correctly with MagicMock(spec=Memory)
config = {
"agent_id": "test-agent",
"local_mem0_config": {
"vector_store": {
"provider": "valkey",
"config": {"valkey_url": "valkey://localhost:6379"},
},
},
}

with patch("mem0.Memory.from_config", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)

# Verify that isinstance check works in the actual storage class
assert isinstance(mem0_storage.memory, Memory), "Storage should recognize mock as Memory instance"

# Test that the filter flattening logic is triggered
filter_result = mem0_storage._create_filter_for_search()

# For valkey provider with single AND condition, filter should be flattened
expected_flattened = {"agent_id": "test-agent"}
assert filter_result == expected_flattened, f"Filter should be flattened to {expected_flattened}, got {filter_result}"
Loading