Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/crewai/src/crewai/agents/crew_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
Returns:
Dictionary with agent output.
"""
# Reset state for fresh execution
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lovely!

self.messages.clear()
self.iterations = 0

self._setup_messages(inputs)

self._inject_multimodal_files(inputs)
Expand Down Expand Up @@ -987,6 +991,10 @@ async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
Returns:
Dictionary with agent output.
"""
# Reset state for fresh execution
self.messages.clear()
self.iterations = 0

self._setup_messages(inputs)

await self._ainject_multimodal_files(inputs)
Expand Down
96 changes: 95 additions & 1 deletion lib/crewai/tests/agents/test_async_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,17 @@ class TestAsyncAgentExecutor:

@pytest.mark.asyncio
async def test_ainvoke_returns_output(self, executor: CrewAgentExecutor) -> None:
"""Test that ainvoke returns the expected output."""
"""Test that ainvoke returns the expected output and resets stale state."""
expected_output = "Final answer from agent"

# Pre-populate stale state from a previous task execution to verify
# that ainvoke() resets it before starting the new execution.
executor.messages = [
{"role": "user", "content": "stale prompt from task 1"},
{"role": "assistant", "content": "stale response from task 1"},
]
executor.iterations = 5

with patch.object(
executor,
"_ainvoke_loop",
Expand All @@ -108,6 +116,13 @@ async def test_ainvoke_returns_output(self, executor: CrewAgentExecutor) -> None

assert result == {"output": expected_output}

# Stale state must have been cleared before execution
all_content = " ".join(
str(m.get("content", "")) for m in executor.messages
)
assert "stale" not in all_content
assert executor.iterations == 0

@pytest.mark.asyncio
async def test_ainvoke_loop_calls_aget_llm_response(
self, executor: CrewAgentExecutor
Expand Down Expand Up @@ -294,6 +309,85 @@ async def delayed_response(*args: Any, **kwargs: Any) -> str:
assert all("output" in r for r in results)
assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}"

def test_sequential_invoke_produces_isolated_messages(
self, executor: CrewAgentExecutor
) -> None:
"""Test that consecutive invoke() calls on the same executor produce
isolated message histories, as happens when Crew reuses an agent
for sequential tasks.
"""
# --- First task ---
def loop_task1() -> AgentFinish:
executor.messages.append(
{"role": "assistant", "content": "task1 answer"}
)
executor.iterations = 3
return AgentFinish(
thought="", output="Task 1 done", text="Task 1 done"
)

with patch.object(executor, "_invoke_loop", side_effect=loop_task1):
with patch.object(executor, "_show_start_logs"):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
executor.invoke(
{
"input": "first task",
"tool_names": "",
"tools": "",
}
)

# Sanity: task 1 state is present
assert any(
"task1" in str(m.get("content", "")) for m in executor.messages
)
task1_iterations = executor.iterations

# --- Second task (same executor, simulating Crew reuse) ---
messages_at_loop_start: list[dict[str, Any]] = []

def loop_task2() -> AgentFinish:
messages_at_loop_start.extend(
[dict(m) for m in executor.messages]
)
return AgentFinish(
thought="", output="Task 2 done", text="Task 2 done"
)

with patch.object(executor, "_invoke_loop", side_effect=loop_task2):
with patch.object(executor, "_show_start_logs"):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
executor.invoke(
{
"input": "second task",
"tool_names": "",
"tools": "",
}
)

# No task 1 messages should be visible when task 2's loop starts
for msg in messages_at_loop_start:
content = str(msg.get("content", ""))
assert "task1" not in content, (
f"Task 1 message leaked into task 2: {content}"
)
assert "first task" not in content, (
f"Task 1 prompt leaked into task 2: {content}"
)

# Task 2 should see its own prompt
assert any(
"second task" in str(m.get("content", ""))
for m in messages_at_loop_start
)

# Iterations must not carry over
assert executor.iterations < task1_iterations


class TestAsyncLLMResponseHelper:
"""Tests for aget_llm_response helper function."""
Expand Down