diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index c7adcbe092..afb337a2db 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -210,6 +210,10 @@ def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: Returns: Dictionary with agent output. """ + # Reset state for fresh execution + self.messages.clear() + self.iterations = 0 + self._setup_messages(inputs) self._inject_multimodal_files(inputs) @@ -987,6 +991,10 @@ async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]: Returns: Dictionary with agent output. """ + # Reset state for fresh execution + self.messages.clear() + self.iterations = 0 + self._setup_messages(inputs) await self._ainject_multimodal_files(inputs) diff --git a/lib/crewai/tests/agents/test_async_agent_executor.py b/lib/crewai/tests/agents/test_async_agent_executor.py index 4dc72ab2a9..e33cae5b9a 100644 --- a/lib/crewai/tests/agents/test_async_agent_executor.py +++ b/lib/crewai/tests/agents/test_async_agent_executor.py @@ -83,9 +83,17 @@ class TestAsyncAgentExecutor: @pytest.mark.asyncio async def test_ainvoke_returns_output(self, executor: CrewAgentExecutor) -> None: - """Test that ainvoke returns the expected output.""" + """Test that ainvoke returns the expected output and resets stale state.""" expected_output = "Final answer from agent" + # Pre-populate stale state from a previous task execution to verify + # that ainvoke() resets it before starting the new execution. + executor.messages = [ + {"role": "user", "content": "stale prompt from task 1"}, + {"role": "assistant", "content": "stale response from task 1"}, + ] + executor.iterations = 5 + with patch.object( executor, "_ainvoke_loop", @@ -108,6 +116,13 @@ async def test_ainvoke_returns_output(self, executor: CrewAgentExecutor) -> None assert result == {"output": expected_output} + # Stale state must have been cleared before execution + all_content = " ".join( + str(m.get("content", "")) for m in executor.messages + ) + assert "stale" not in all_content + assert executor.iterations == 0 + @pytest.mark.asyncio async def test_ainvoke_loop_calls_aget_llm_response( self, executor: CrewAgentExecutor @@ -294,6 +309,85 @@ async def delayed_response(*args: Any, **kwargs: Any) -> str: assert all("output" in r for r in results) assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}" + def test_sequential_invoke_produces_isolated_messages( + self, executor: CrewAgentExecutor + ) -> None: + """Test that consecutive invoke() calls on the same executor produce + isolated message histories, as happens when Crew reuses an agent + for sequential tasks. + """ + # --- First task --- + def loop_task1() -> AgentFinish: + executor.messages.append( + {"role": "assistant", "content": "task1 answer"} + ) + executor.iterations = 3 + return AgentFinish( + thought="", output="Task 1 done", text="Task 1 done" + ) + + with patch.object(executor, "_invoke_loop", side_effect=loop_task1): + with patch.object(executor, "_show_start_logs"): + with patch.object(executor, "_create_short_term_memory"): + with patch.object(executor, "_create_long_term_memory"): + with patch.object(executor, "_create_external_memory"): + executor.invoke( + { + "input": "first task", + "tool_names": "", + "tools": "", + } + ) + + # Sanity: task 1 state is present + assert any( + "task1" in str(m.get("content", "")) for m in executor.messages + ) + task1_iterations = executor.iterations + + # --- Second task (same executor, simulating Crew reuse) --- + messages_at_loop_start: list[dict[str, Any]] = [] + + def loop_task2() -> AgentFinish: + messages_at_loop_start.extend( + [dict(m) for m in executor.messages] + ) + return AgentFinish( + thought="", output="Task 2 done", text="Task 2 done" + ) + + with patch.object(executor, "_invoke_loop", side_effect=loop_task2): + with patch.object(executor, "_show_start_logs"): + with patch.object(executor, "_create_short_term_memory"): + with patch.object(executor, "_create_long_term_memory"): + with patch.object(executor, "_create_external_memory"): + executor.invoke( + { + "input": "second task", + "tool_names": "", + "tools": "", + } + ) + + # No task 1 messages should be visible when task 2's loop starts + for msg in messages_at_loop_start: + content = str(msg.get("content", "")) + assert "task1" not in content, ( + f"Task 1 message leaked into task 2: {content}" + ) + assert "first task" not in content, ( + f"Task 1 prompt leaked into task 2: {content}" + ) + + # Task 2 should see its own prompt + assert any( + "second task" in str(m.get("content", "")) + for m in messages_at_loop_start + ) + + # Iterations must not carry over + assert executor.iterations < task1_iterations + class TestAsyncLLMResponseHelper: """Tests for aget_llm_response helper function."""