Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion lib/crewai/src/crewai/agents/crew_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
raise

if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
formatted_answer = await self._ahandle_human_feedback(formatted_answer)

self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
Expand Down Expand Up @@ -1508,6 +1508,20 @@ def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
provider = get_provider()
return provider.handle_feedback(formatted_answer, self)

async def _ahandle_human_feedback(
self, formatted_answer: AgentFinish
) -> AgentFinish:
"""Process human feedback asynchronously via the configured provider.

Args:
formatted_answer: Initial agent result.

Returns:
Final answer after feedback.
"""
provider = get_provider()
return await provider.handle_feedback_async(formatted_answer, self)

def _is_training_mode(self) -> bool:
"""Check if training mode is active.

Expand Down
16 changes: 12 additions & 4 deletions lib/crewai/src/crewai/cli/plus_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from typing import Any
from urllib.parse import urljoin
import os

import httpx
import requests

from crewai.cli.config import Settings
Expand Down Expand Up @@ -33,7 +35,11 @@ def __init__(self, api_key: str) -> None:
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid

self.base_url = os.getenv("CREWAI_PLUS_URL") or str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
self.base_url = (
os.getenv("CREWAI_PLUS_URL")
or str(settings.enterprise_base_url)
or DEFAULT_CREWAI_ENTERPRISE_URL
)

def _make_request(
self, method: str, endpoint: str, **kwargs: Any
Expand All @@ -49,8 +55,10 @@ def login_to_tool_repository(self) -> requests.Response:
def get_tool(self, handle: str) -> requests.Response:
return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}")

def get_agent(self, handle: str) -> requests.Response:
return self._make_request("GET", f"{self.AGENTS_RESOURCE}/{handle}")
async def get_agent(self, handle: str) -> httpx.Response:
url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}")
async with httpx.AsyncClient() as client:
return await client.get(url, headers=self.headers)

def publish_tool(
self,
Expand Down
191 changes: 188 additions & 3 deletions lib/crewai/src/crewai/core/providers/human_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from __future__ import annotations

import asyncio
from contextvars import ContextVar, Token
import sys
from typing import TYPE_CHECKING, Protocol, runtime_checkable


Expand Down Expand Up @@ -46,13 +48,21 @@ def _format_feedback_message(self, feedback: str) -> LLMMessage:
...


class AsyncExecutorContext(ExecutorContext, Protocol):
    """Executor context variant whose agent loop can be awaited."""

    async def _ainvoke_loop(self) -> AgentFinish:
        """Run the agent loop via async execution and return its final answer."""
        ...


@runtime_checkable
class HumanInputProvider(Protocol):
"""Protocol for human input handling.

Implementations handle the full feedback flow:
- Sync: prompt user, loop until satisfied
- Async: raise exception for external handling
- Async: use non-blocking I/O and async invoke loop
"""

def setup_messages(self, context: ExecutorContext) -> bool:
Expand Down Expand Up @@ -86,7 +96,7 @@ def handle_feedback(
formatted_answer: AgentFinish,
context: ExecutorContext,
) -> AgentFinish:
"""Handle the full human feedback flow.
"""Handle the full human feedback flow synchronously.

Args:
formatted_answer: The agent's current answer.
Expand All @@ -100,6 +110,25 @@ def handle_feedback(
"""
...

async def handle_feedback_async(
self,
formatted_answer: AgentFinish,
context: AsyncExecutorContext,
) -> AgentFinish:
"""Handle the full human feedback flow asynchronously.

Uses non-blocking I/O for user prompts and async invoke loop
for agent re-execution.

Args:
formatted_answer: The agent's current answer.
context: Async executor context for callbacks.

Returns:
The final answer after feedback processing.
"""
...

@staticmethod
def _get_output_string(answer: AgentFinish) -> str:
"""Extract output string from answer.
Expand All @@ -116,7 +145,7 @@ def _get_output_string(answer: AgentFinish) -> str:


class SyncHumanInputProvider(HumanInputProvider):
"""Default synchronous human input via terminal."""
"""Default human input provider with sync and async support."""

def setup_messages(self, context: ExecutorContext) -> bool:
"""Use standard message setup.
Expand Down Expand Up @@ -157,6 +186,33 @@ def handle_feedback(

return self._handle_regular_feedback(formatted_answer, feedback, context)

async def handle_feedback_async(
self,
formatted_answer: AgentFinish,
context: AsyncExecutorContext,
) -> AgentFinish:
"""Handle feedback asynchronously without blocking the event loop.

Args:
formatted_answer: The agent's current answer.
context: Async executor context for callbacks.

Returns:
The final answer after feedback processing.
"""
feedback = await self._prompt_input_async(context.crew)

if context._is_training_mode():
return await self._handle_training_feedback_async(
formatted_answer, feedback, context
)

return await self._handle_regular_feedback_async(
formatted_answer, feedback, context
)

# ── Sync helpers ──────────────────────────────────────────────────

@staticmethod
def _handle_training_feedback(
initial_answer: AgentFinish,
Expand Down Expand Up @@ -209,6 +265,62 @@ def _handle_regular_feedback(

return answer

# ── Async helpers ─────────────────────────────────────────────────

@staticmethod
async def _handle_training_feedback_async(
initial_answer: AgentFinish,
feedback: str,
context: AsyncExecutorContext,
) -> AgentFinish:
"""Process training feedback asynchronously (single iteration).

Args:
initial_answer: The agent's initial answer.
feedback: Human feedback string.
context: Async executor context for callbacks.

Returns:
Improved answer after processing feedback.
"""
context._handle_crew_training_output(initial_answer, feedback)
context.messages.append(context._format_feedback_message(feedback))
improved_answer = await context._ainvoke_loop()
context._handle_crew_training_output(improved_answer)
context.ask_for_human_input = False
return improved_answer

async def _handle_regular_feedback_async(
self,
current_answer: AgentFinish,
initial_feedback: str,
context: AsyncExecutorContext,
) -> AgentFinish:
"""Process regular feedback with async iteration loop.

Args:
current_answer: The agent's current answer.
initial_feedback: Initial human feedback string.
context: Async executor context for callbacks.

Returns:
Final answer after all feedback iterations.
"""
feedback = initial_feedback
answer = current_answer

while context.ask_for_human_input:
if feedback.strip() == "":
context.ask_for_human_input = False
else:
context.messages.append(context._format_feedback_message(feedback))
answer = await context._ainvoke_loop()
feedback = await self._prompt_input_async(context.crew)

return answer

# ── I/O ───────────────────────────────────────────────────────────

@staticmethod
def _prompt_input(crew: Crew | None) -> str:
"""Show rich panel and prompt for input.
Expand Down Expand Up @@ -262,6 +374,79 @@ def _prompt_input(crew: Crew | None) -> str:
finally:
formatter.resume_live_updates()

@staticmethod
async def _prompt_input_async(crew: Crew | None) -> str:
"""Show rich panel and prompt for input without blocking the event loop.

Args:
crew: The crew instance for context.

Returns:
User input string from terminal.
"""
from rich.panel import Panel
from rich.text import Text

from crewai.events.event_listener import event_listener

formatter = event_listener.formatter
formatter.pause_live_updates()

try:
if crew and getattr(crew, "_train", False):
prompt_text = (
"TRAINING MODE: Provide feedback to improve the agent's performance.\n\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process."
)
title = "🎓 Training Feedback Required"
else:
prompt_text = (
"Provide feedback on the Final Result above.\n\n"
"• If you are happy with the result, simply hit Enter without typing anything.\n"
"• Otherwise, provide specific improvement requests.\n"
"• You can provide multiple rounds of feedback until satisfied."
)
title = "💬 Human Feedback Required"

content = Text()
content.append(prompt_text, style="yellow")

prompt_panel = Panel(
content,
title=title,
border_style="yellow",
padding=(1, 2),
)
formatter.console.print(prompt_panel)

response = await _async_readline()
if response.strip() != "":
formatter.console.print("\n[cyan]Processing your feedback...[/cyan]")
return response
finally:
formatter.resume_live_updates()


async def _async_readline() -> str:
    """Read a line from stdin using the event loop's native I/O.

    Falls back to asyncio.to_thread on platforms where piping stdin
    is unsupported.

    Returns:
        The line read from stdin, with trailing newline stripped.
    """
    loop = asyncio.get_running_loop()
    try:
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        # Attach stdin to the running loop so readline() awaits instead of
        # blocking.  Raises OSError/NotImplementedError/ValueError when stdin
        # cannot be wrapped as a pipe (e.g. Windows Proactor limitations or a
        # closed/redirected stdin), which triggers the thread fallback below.
        # NOTE(review): the (transport, protocol) pair returned here is
        # discarded, so the pipe transport is never explicitly closed and
        # stdin may stay attached to the loop across calls — confirm repeated
        # invocations behave on all platforms before relying on this.
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        raw = await reader.readline()
        # readline() yields b"" at EOF, which decodes to "".
        return raw.decode().rstrip("\n")
    except (OSError, NotImplementedError, ValueError):
        # NOTE(review): input() raises EOFError at end-of-input instead of
        # returning "" like the pipe path — verify callers tolerate that
        # asymmetry between the two branches.
        return await asyncio.to_thread(input)


_provider: ContextVar[HumanInputProvider | None] = ContextVar(
"human_input_provider",
Expand Down
32 changes: 31 additions & 1 deletion lib/crewai/src/crewai/experimental/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ def _invoke_loop(self) -> AgentFinish:
raise RuntimeError("Agent loop did not produce a final answer")
return answer

async def _ainvoke_loop(self) -> AgentFinish:
"""Invoke the agent loop asynchronously and return the result.

Required by AsyncExecutorContext protocol.
"""
self._state.iterations = 0
self._state.is_finished = False
self._state.current_answer = None

await self.akickoff()

answer = self._state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Agent loop did not produce a final answer")
return answer

def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format feedback as a message for the LLM.

Expand Down Expand Up @@ -1173,7 +1189,7 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
)

if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
formatted_answer = await self._ahandle_human_feedback(formatted_answer)

self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
Expand Down Expand Up @@ -1390,6 +1406,20 @@ def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
provider = get_provider()
return provider.handle_feedback(formatted_answer, self)

async def _ahandle_human_feedback(
self, formatted_answer: AgentFinish
) -> AgentFinish:
"""Process human feedback asynchronously and refine answer.

Args:
formatted_answer: Initial agent result.

Returns:
Final answer after feedback.
"""
provider = get_provider()
return await provider.handle_feedback_async(formatted_answer, self)

def _is_training_mode(self) -> bool:
"""Check if training mode is active.

Expand Down
Loading
Loading