Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lib/crewai/src/crewai/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,23 @@ def platform_context(integration_token: str) -> Generator[None, Any, None]:
yield
finally:
_platform_integration_token.reset(token)


_current_task_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"current_task_id", default=None
)


def set_current_task_id(task_id: str | None) -> contextvars.Token[str | None]:
"""Set the current task ID in the context. Returns a token for reset."""
return _current_task_id.set(task_id)


def reset_current_task_id(token: contextvars.Token[str | None]) -> None:
"""Reset the current task ID to its previous value."""
_current_task_id.reset(token)


def get_current_task_id() -> str | None:
"""Get the current task ID from the context."""
return _current_task_id.get()
1 change: 1 addition & 0 deletions lib/crewai/src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,7 @@ def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
self.token_usage = self.calculate_usage_metrics()
crewai_event_bus.flush()
crewai_event_bus.emit(
self,
CrewKickoffCompletedEvent(
Expand Down
51 changes: 36 additions & 15 deletions lib/crewai/src/crewai/experimental/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.flow.flow import Flow, StateProxy, listen, or_, router, start
from crewai.flow.types import FlowMethodName
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
Expand Down Expand Up @@ -225,7 +226,11 @@ def messages(self) -> list[LLMMessage]:
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Delegate to state for ExecutorContext conformance."""
self._state.messages = value
if self._flow_initialized and hasattr(self, "_state_lock"):
with self._state_lock:
self._state.messages = value
else:
self._state.messages = value

@property
def ask_for_human_input(self) -> bool:
Expand Down Expand Up @@ -353,6 +358,8 @@ def state(self) -> AgentReActState:
Flow initialization is deferred to prevent event emission during agent setup.
Returns the temporary state until invoke() is called.
"""
if self._flow_initialized and hasattr(self, "_state_lock"):
return StateProxy(self._state, self._state_lock) # type: ignore[return-value]
return self._state

@property
Expand Down Expand Up @@ -461,15 +468,14 @@ def call_llm_and_parse(self) -> Literal["parsed", "parser_error", "context_error
raise

@listen("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
def call_llm_native_tools(self) -> None:
"""Execute LLM call with native function calling.

Always calls the LLM so it can read reflection prompts and decide
whether to provide a final answer or request more tools.

Returns routing decision based on whether tool calls or final answer.
Note: This is a listener, not a router. The route_native_tool_result
router fires after this to determine the next step based on state.
"""
try:
# Clear pending tools - LLM will decide what to do next after reading
Expand Down Expand Up @@ -499,8 +505,7 @@ def call_llm_native_tools(
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer)

return "native_tool_calls"
return # Router will check pending_tool_calls

if isinstance(answer, BaseModel):
self.state.current_answer = AgentFinish(
Expand All @@ -510,7 +515,7 @@ def call_llm_native_tools(
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer.model_dump_json())
return "native_finished"
return # Router will check current_answer

# Text response - this is the final answer
if isinstance(answer, str):
Expand All @@ -521,8 +526,7 @@ def call_llm_native_tools(
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)

return "native_finished"
return # Router will check current_answer

# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
Expand All @@ -532,13 +536,12 @@ def call_llm_native_tools(
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))

return "native_finished"
# Router will check current_answer

except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
return # Router will check _last_context_error
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
Expand All @@ -551,6 +554,22 @@ def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
return "execute_tool"
return "agent_finished"

@router(call_llm_native_tools)
def route_native_tool_result(
    self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
    """Pick the next flow step after a native-tool-calling LLM turn.

    ``call_llm_native_tools`` is a plain listener, so its return value does
    not drive routing; only router return values trigger downstream
    listeners. This router inspects the state that listener left behind and
    emits the matching route label.
    """
    # A recorded context-length error takes priority over any other outcome.
    hit_context_error = self._last_context_error is not None
    if hit_context_error:
        return "context_error"
    # Tool calls queued by the LLM mean we keep iterating; otherwise the
    # answer recorded in state is final.
    return "native_tool_calls" if self.state.pending_tool_calls else "native_finished"

@listen("execute_tool")
def execute_tool_action(self) -> Literal["tool_completed", "tool_result_is_final"]:
"""Execute the tool action and handle the result."""
Expand Down Expand Up @@ -908,9 +927,11 @@ def increment_native_and_continue(self) -> Literal["initialized"]:
self.state.iterations += 1
return "initialized"

@listen("initialized")
@listen(or_("initialized", "tool_completed", "native_tool_completed"))
def continue_iteration(self) -> Literal["check_iteration"]:
    """Bridge listener that connects iteration loop back to iteration check."""
    # NOTE(review): presumably this discards the or_() trigger bookkeeping for
    # this listener so that a later emission of any of the three trigger
    # labels can fire it again within the same flow run — confirm against
    # _discard_or_listener's implementation. Skipped before the Flow is
    # initialized, when that bookkeeping does not yet exist.
    if self._flow_initialized:
        self._discard_or_listener(FlowMethodName("continue_iteration"))
    return "check_iteration"

@router(or_(initialize_reasoning, continue_iteration))
Expand Down
Loading
Loading