From ae854092db7b53b52d476c42c59437c0f353a731 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Tue, 18 Nov 2025 16:24:39 +0800 Subject: [PATCH 1/2] fix agent_loop TQ initialization Signed-off-by: 0oshowero0 --- verl/experimental/agent_loop/agent_loop.py | 8 ++++---- verl/utils/transferqueue_utils.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index d6c1319ea75..6fa8aa38c03 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -178,7 +178,7 @@ def __init__(self, config: DictConfig) -> None: class AgentLoopBase(ABC): - """An agent loop takes a input message, chat with OpenAI compatible LLM server and interact with various + """An agent loop takes an input message, chat with OpenAI compatible LLM server and interact with various environments.""" _class_initialized = False @@ -608,7 +608,7 @@ def _postprocess(self, inputs: list[_InternalAgentLoopOutput]) -> DataProto: meta_info={"metrics": metrics, "reward_extra_keys": reward_extra_keys}, ) - def create_transferqueue_client(self, controller_infos, storage_infos, role): + def create_transferqueue_client(self, controller_infos, role): """Create a client for data system(transfer queue).""" from verl.single_controller.ray.base import get_random_string from verl.utils.transferqueue_utils import create_transferqueue_client @@ -616,8 +616,8 @@ def create_transferqueue_client(self, controller_infos, storage_infos, role): client_name = get_random_string(length=6) create_transferqueue_client( client_id=f"{role}_worker_{client_name}", - controller_infos=controller_infos, - storage_infos=storage_infos, + controller_info=controller_infos, + config=self.config, ) diff --git a/verl/utils/transferqueue_utils.py b/verl/utils/transferqueue_utils.py index c692578e3a0..5002539e675 100644 --- a/verl/utils/transferqueue_utils.py +++ b/verl/utils/transferqueue_utils.py @@ -44,7 +44,7 @@ class BatchMeta: def create_transferqueue_client( client_id: str, - controller_info: dict[Any, "ZMQServerInfo"], + controller_info: "ZMQServerInfo", config, ) -> None: global _TRANSFER_QUEUE_CLIENT From c2142acbcb443de492fa97f195726f44c108de7c Mon Sep 17 00:00:00 2001 From: jianjunzhong Date: Tue, 18 Nov 2025 09:43:24 +0800 Subject: [PATCH 2/2] fix issues when running deepeyes using tq Signed-off-by: jianjunzhong --- recipe/deepeyes/deepeyes.py | 2 +- recipe/transfer_queue/agent_loop.py | 9 +++++---- verl/experimental/agent_loop/agent_loop.py | 6 ++++++ verl/utils/transferqueue_utils.py | 2 +- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/recipe/deepeyes/deepeyes.py b/recipe/deepeyes/deepeyes.py index 94c5535a043..7f5ead5b83b 100644 --- a/recipe/deepeyes/deepeyes.py +++ b/recipe/deepeyes/deepeyes.py @@ -179,7 +179,7 @@ def __getitem__(self, item): return row_dict -def compute_score(data_source: str, solution_str: str, ground_truth: str, extra_info=None) -> float: +def compute_score(data_source: str, solution_str: str, ground_truth: str, extra_info=None, **kwargs) -> float: """ Compute reward score for model solutions with robust handling of various formats. diff --git a/recipe/transfer_queue/agent_loop.py b/recipe/transfer_queue/agent_loop.py index 7f936e6730e..7e4ef2a398e 100644 --- a/recipe/transfer_queue/agent_loop.py +++ b/recipe/transfer_queue/agent_loop.py @@ -30,12 +30,11 @@ def generate_sequences(self, prompts: BatchMeta) -> BatchMeta: BatchMeta: Output batch metadata. 
""" - if self.rm_micro_batch_size and len(prompts) % self.rm_micro_batch_size != 0: - raise ValueError( - f"The length of prompts {len(prompts)} cannot divide the world size of rm_wg {self.rm_micro_batch_size}" - ) if self.config.actor_rollout_ref.rollout.free_cache_engine: self.wake_up() + if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine: + self.reward_model_manager.wake_up() + chunkes = prompts.chunk(len(self.agent_loop_workers)) outputs = ray.get( [ @@ -46,6 +45,8 @@ def generate_sequences(self, prompts: BatchMeta) -> BatchMeta: output = BatchMeta.concat(outputs) if self.config.actor_rollout_ref.rollout.free_cache_engine: self.sleep() + if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine: + self.reward_model_manager.sleep() # calculate performance metrics metrics = [output.extra_info.pop("metrics") for output in outputs] # List[List[Dict[str, str]]] diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index 6fa8aa38c03..b2c20574268 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -734,6 +734,11 @@ def _initialize_llm_servers(self): def _init_agent_loop_workers(self): self.agent_loop_workers = [] num_workers = self.config.actor_rollout_ref.rollout.agent.num_workers + runtime_env = { + "env_vars": { + "TRANSFER_QUEUE_ENABLE": "1" if self.config.transfer_queue.enable else "0", + } + } node_ids = [node["NodeID"] for node in ray.nodes() if node["Alive"] and node["Resources"].get("CPU", 0) > 0] for i in range(num_workers): @@ -745,6 +750,7 @@ def _init_agent_loop_workers(self): scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, soft=True ), + runtime_env=runtime_env, ).remote(self.config, self.server_handles, self.reward_router_address) ) diff --git a/verl/utils/transferqueue_utils.py b/verl/utils/transferqueue_utils.py index 5002539e675..2c0a08def43 100644 --- a/verl/utils/transferqueue_utils.py +++ b/verl/utils/transferqueue_utils.py @@ -39,7 +39,7 @@ class BatchMeta: _TRANSFER_QUEUE_CLIENT = None -is_transferqueue_enabled = os.environ.get("TRANSFER_QUEUE_ENABLE", False) +is_transferqueue_enabled = os.environ.get("TRANSFER_QUEUE_ENABLE", "0") == "1" def create_transferqueue_client(
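
Why the flag parsing in verl/utils/transferqueue_utils.py changed: environment variables are always strings, so the previous default-of-False lookup treated any set value, including "0", as truthy, while the new form only enables the feature for the exact string "1". A minimal standalone sketch of the difference (illustration only, not part of the applied diff):

    import os

    # Simulate a launcher that explicitly disables the transfer queue.
    os.environ["TRANSFER_QUEUE_ENABLE"] = "0"

    # Old check: the string "0" is truthy, so the feature looked enabled.
    old_check = bool(os.environ.get("TRANSFER_QUEUE_ENABLE", False))   # True (wrong)

    # New check: only the exact string "1" enables the feature.
    new_check = os.environ.get("TRANSFER_QUEUE_ENABLE", "0") == "1"    # False (intended)

    print(old_check, new_check)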
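
The **kwargs added to compute_score in recipe/deepeyes/deepeyes.py makes the scorer tolerant of any extra keyword arguments a reward manager may forward alongside the standard fields; without it, an unexpected key raises TypeError. A toy illustration of the calling convention (hypothetical extra key "step", simplified scoring logic):

    def compute_score(data_source, solution_str, ground_truth, extra_info=None, **kwargs):
        # Extra keyword arguments are absorbed by **kwargs and ignored here.
        return 1.0 if solution_str.strip() == ground_truth.strip() else 0.0

    # A caller can now pass additional metadata without breaking the scorer.
    score = compute_score("demo_source", "42", "42", extra_info=None, step=3)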