diff --git a/agentex/src/adapters/streams/adapter_redis.py b/agentex/src/adapters/streams/adapter_redis.py index d283dc8..7af40d5 100644 --- a/agentex/src/adapters/streams/adapter_redis.py +++ b/agentex/src/adapters/streams/adapter_redis.py @@ -54,11 +54,13 @@ async def send_data(self, topic: str, data: dict[str, Any]) -> str: logger.info(f"Publishing data to stream {topic}, data: {data_json}") - # Add to Redis stream with a reasonable max length + # Add to Redis stream with maxlen to prevent unbounded growth await self.send_redis_connection_metrics() message_id = await self.redis.xadd( name=topic, fields={"data": data_json}, + maxlen=self.environment_variables.REDIS_STREAM_MAXLEN, + approximate=True, # Use ~ for better performance (O(1) vs O(N)) ) return message_id except Exception as e: diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 1222b61..f82743b 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -40,6 +40,7 @@ class EnvVarKeys(str, Enum): REDIS_MAX_CONNECTIONS = "REDIS_MAX_CONNECTIONS" REDIS_CONNECTION_TIMEOUT = "REDIS_CONNECTION_TIMEOUT" REDIS_SOCKET_TIMEOUT = "REDIS_SOCKET_TIMEOUT" + REDIS_STREAM_MAXLEN = "REDIS_STREAM_MAXLEN" IMAGE_PULL_SECRET_NAME = "IMAGE_PULL_SECRET_NAME" AGENTEX_AUTH_URL = "AGENTEX_AUTH_URL" ALLOWED_ORIGINS = "ALLOWED_ORIGINS" @@ -88,6 +89,9 @@ class EnvironmentVariables(BaseModel): REDIS_MAX_CONNECTIONS: int = 50 # Increased for SSE streaming REDIS_CONNECTION_TIMEOUT: int = 60 # Connection timeout in seconds REDIS_SOCKET_TIMEOUT: int = 30 # Socket timeout in seconds + REDIS_STREAM_MAXLEN: int = ( + 10000 # Max entries per Redis stream to prevent unbounded growth + ) IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None ALLOWED_ORIGINS: str | None = None @@ -146,6 +150,9 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: REDIS_SOCKET_TIMEOUT=int( os.environ.get(EnvVarKeys.REDIS_SOCKET_TIMEOUT, "30") ), + REDIS_STREAM_MAXLEN=int( + os.environ.get(EnvVarKeys.REDIS_STREAM_MAXLEN, "10000") + ), IMAGE_PULL_SECRET_NAME=os.environ.get(EnvVarKeys.IMAGE_PULL_SECRET_NAME), AGENTEX_AUTH_URL=os.environ.get(EnvVarKeys.AGENTEX_AUTH_URL), ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"), diff --git a/agentex/tests/integration/test_redis_stream_maxlen.py b/agentex/tests/integration/test_redis_stream_maxlen.py new file mode 100644 index 0000000..f47819d --- /dev/null +++ b/agentex/tests/integration/test_redis_stream_maxlen.py @@ -0,0 +1,115 @@ +""" +Integration test to verify Redis stream max length behavior. + +This test validates whether the RedisStreamRepository enforces any +max length trimming on stream messages. +""" + +import pytest + +from tests.fixtures.repositories import create_redis_stream_repository + +# Test configuration +NUM_MESSAGES = 10500 # Test beyond 10,000 to check for hidden limits +TEST_STREAM_TOPIC = "test:maxlen:verification" + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_redis_stream_no_maxlen_enforcement(base_redis_client): + """ + Test that Redis streams do NOT enforce a max length. + + This verifies the current implementation in RedisStreamRepository.send_data() + does not use MAXLEN parameter, meaning streams grow unbounded. + """ + # Create repository using the existing factory + repository = create_redis_stream_repository(base_redis_client) + + # Clean up any existing test stream + await repository.cleanup_stream(TEST_STREAM_TOPIC) + + # Send messages through the actual API + print(f"\nAdding {NUM_MESSAGES} messages via RedisStreamRepository.send_data()...") + for i in range(NUM_MESSAGES): + await repository.send_data( + topic=TEST_STREAM_TOPIC, data={"index": i, "message": f"test_message_{i}"} + ) + if (i + 1) % 1000 == 0: + print(f" Sent {i + 1} messages...") + + # Check stream length using underlying Redis client + stream_length = await repository.redis.xlen(TEST_STREAM_TOPIC) + + print(f"\n{'=' * 50}") + print("RESULTS:") + print(f"{'=' * 50}") + print(f"Messages sent: {NUM_MESSAGES}") + print(f"Stream length: {stream_length}") + print(f"Messages trimmed: {NUM_MESSAGES - stream_length}") + + # Get stream info for details + info = await repository.redis.xinfo_stream(TEST_STREAM_TOPIC) + print("\nSTREAM INFO:") + print(f"Length: {info.get('length')}") + print(f"First entry ID: {info.get('first-entry', [b'N/A'])[0]}") + print(f"Last entry ID: {info.get('last-entry', [b'N/A'])[0]}") + + # Memory usage + memory = await repository.redis.memory_usage(TEST_STREAM_TOPIC) + print(f"Memory usage: {memory} bytes ({memory / 1024:.2f} KB)") + + # Cleanup + await repository.cleanup_stream(TEST_STREAM_TOPIC) + + # Assert no trimming occurred + assert stream_length == NUM_MESSAGES, ( + f"Expected {NUM_MESSAGES} messages but found {stream_length}. " + f"Redis may be enforcing a max length!" + ) + + print(f"\n✓ PASSED: No max length enforced - all {NUM_MESSAGES} messages retained") + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_redis_stream_info_after_messages(base_redis_client): + """ + Test to gather detailed stream information after adding messages. + + This is useful for understanding Redis stream behavior and memory characteristics. + """ + repository = create_redis_stream_repository(base_redis_client) + topic = "test:stream:info" + + await repository.cleanup_stream(topic) + + # Add a smaller batch of messages + message_count = 100 + for i in range(message_count): + await repository.send_data( + topic=topic, + data={ + "index": i, + "payload": "x" * 100, # 100 byte payload + "nested": {"key": f"value_{i}"}, + }, + ) + + # Get comprehensive stream info + info = await repository.redis.xinfo_stream(topic) + length = await repository.redis.xlen(topic) + memory = await repository.redis.memory_usage(topic) + + print(f"\n{'=' * 50}") + print(f"STREAM ANALYSIS: {topic}") + print(f"{'=' * 50}") + print(f"Total messages: {length}") + print(f"Memory usage: {memory} bytes") + print(f"Bytes per message: {memory / length:.2f}") + print(f"Radix tree keys: {info.get('radix-tree-keys')}") + print(f"Radix tree nodes: {info.get('radix-tree-nodes')}") + + await repository.cleanup_stream(topic) + + assert length == message_count