Conversation

@xinyuangui2 xinyuangui2 commented Jan 12, 2026

How to test

python3 examples/data_preprocess/gsm8k.py --local_save_dir ~/data/gsm8k --distributed

python run_on_all_nodes.py "python3 -c \"import transformers; transformers.pipeline('text-generation', model='Qwen/Qwen2.5-0.5B-Instruct')\"" --timeout 600

PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \
 data.train_files=$HOME/data/gsm8k/train.parquet \
 data.val_files=$HOME/data/gsm8k/test.parquet \
 data.train_batch_size=256 \
 data.max_prompt_length=512 \
 data.max_response_length=512 \
 actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
 actor_rollout_ref.actor.optim.lr=1e-6 \
 actor_rollout_ref.actor.ppo_mini_batch_size=64 \
 actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
 actor_rollout_ref.rollout.name=vllm \
 actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
 actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
 actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
 actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
 critic.optim.lr=1e-5 \
 critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
 critic.ppo_micro_batch_size_per_gpu=4 \
 algorithm.kl_ctrl.kl_coef=0.001 \
 trainer.logger=console \
 trainer.val_before_train=False \
 trainer.n_gpus_per_node=1 \
 trainer.nnodes=1 \
 trainer.save_freq=10 \
 trainer.test_freq=10 \
 trainer.total_epochs=15 2>&1 | tee verl_demo.log

Thinking: can we move most of these steps into the existing worker components and fuse some of them, so that less deserialized data is needed on the driver?
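
A rough sketch of that direction (illustrative only, not part of this PR; preprocess_shard is a hypothetical helper and process_fn is a minimal stand-in for make_map_fn in gsm8k.py): each Ray worker maps and writes its own shard, so only a small path string ever comes back to the driver.

import os

import ray


@ray.remote
def preprocess_shard(shard, save_dir, split):
    # Map and write entirely on the worker; only the output path (a small
    # string) is returned, so the driver never holds the deserialized rows.
    def process_fn(example, idx):
        # Minimal stand-in for make_map_fn(split) in gsm8k.py.
        return {"split": split, "index": idx, **example}

    processed = shard.map(process_fn, with_indices=True)
    out_dir = os.path.expanduser(save_dir)
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"{split}.parquet")
    processed.to_parquet(out_path)
    return out_path

# e.g. paths = ray.get([preprocess_shard.remote(s, "~/data/gsm8k", "train") for s in shards])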

Signed-off-by: xinyuangui2 <xinyuan.gui95@gmail.com>
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces distributed data preprocessing for gsm8k.py using Ray and adds a new utility script run_on_all_nodes.py to execute commands across a Ray cluster. The overall approach is sound, but I have identified two high-severity issues. First, there is significant code duplication in gsm8k.py between the single-node and distributed code paths, which impacts maintainability. I've suggested a refactoring to unify these paths. Second, the new run_on_all_nodes.py script uses shell=True with subprocess.run, which introduces a command injection vulnerability. I've raised this as a security concern that needs to be addressed.

Comment on lines +156 to +210
else:
    # Original single-node behavior
    if local_dataset_path is not None:
        dataset = datasets.load_dataset(local_dataset_path, "main")
    else:
        dataset = datasets.load_dataset(data_source, "main")

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    instruction_following = 'Let\'s think step by step and output the final answer after "####".'

    # add a row to each data item that represents a unique id
    def make_map_fn(split):
        def process_fn(example, idx):
            question_raw = example.pop("question")

            question = question_raw + " " + instruction_following

            answer_raw = example.pop("answer")
            solution = extract_solution(answer_raw)
            data = {
                "data_source": data_source,
                "prompt": [
                    {
                        "role": "user",
                        "content": question,
                    }
                ],
                "ability": "math",
                "reward_model": {"style": "rule", "ground_truth": solution},
                "extra_info": {
                    "split": split,
                    "index": idx,
                    "answer": answer_raw,
                    "question": question_raw,
                },
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

    local_save_dir_expanded = os.path.expanduser(local_save_dir)
    os.makedirs(local_save_dir_expanded, exist_ok=True)

    train_dataset.to_parquet(os.path.join(local_save_dir_expanded, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_save_dir_expanded, "test.parquet"))

    if hdfs_dir is not None:
        makedirs(hdfs_dir)

        copy(src=local_save_dir_expanded, dst=hdfs_dir)

Severity: high

There is a large amount of code duplication between the single-node execution path (this else block) and the download_and_preprocess_on_node function. This makes the code harder to maintain, as any changes to the preprocessing logic will need to be applied in two places.

To adhere to the Don't Repeat Yourself (DRY) principle, you can refactor this to reuse the download_and_preprocess_on_node function for the single-node case as well. This can be done by treating the single-node case as a distributed execution on a single-node Ray cluster.

Suggested change
-else:
-    # Original single-node behavior
-    if local_dataset_path is not None:
-        dataset = datasets.load_dataset(local_dataset_path, "main")
-    else:
-        dataset = datasets.load_dataset(data_source, "main")
-    train_dataset = dataset["train"]
-    test_dataset = dataset["test"]
-    instruction_following = 'Let\'s think step by step and output the final answer after "####".'
-    # add a row to each data item that represents a unique id
-    def make_map_fn(split):
-        def process_fn(example, idx):
-            question_raw = example.pop("question")
-            question = question_raw + " " + instruction_following
-            answer_raw = example.pop("answer")
-            solution = extract_solution(answer_raw)
-            data = {
-                "data_source": data_source,
-                "prompt": [
-                    {
-                        "role": "user",
-                        "content": question,
-                    }
-                ],
-                "ability": "math",
-                "reward_model": {"style": "rule", "ground_truth": solution},
-                "extra_info": {
-                    "split": split,
-                    "index": idx,
-                    "answer": answer_raw,
-                    "question": question_raw,
-                },
-            }
-            return data
-        return process_fn
-    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
-    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)
-    local_save_dir_expanded = os.path.expanduser(local_save_dir)
-    os.makedirs(local_save_dir_expanded, exist_ok=True)
-    train_dataset.to_parquet(os.path.join(local_save_dir_expanded, "train.parquet"))
-    test_dataset.to_parquet(os.path.join(local_save_dir_expanded, "test.parquet"))
-    if hdfs_dir is not None:
-        makedirs(hdfs_dir)
-        copy(src=local_save_dir_expanded, dst=hdfs_dir)
+else:
+    # Original single-node behavior. To avoid code duplication, we can run the Ray task on the local node.
+    if not ray.is_initialized():
+        # Initialize Ray for a single-node execution.
+        ray.init(num_cpus=1)
+    print("Running in single-node mode.")
+    # Run the task and wait for it to complete.
+    completed_node = ray.get(
+        download_and_preprocess_on_node.remote(data_source, local_dataset_path, local_save_dir, hdfs_dir)
+    )
+    print(f"\nDataset successfully downloaded and preprocessed on node: {completed_node}")

Comment on lines +37 to +43
result = subprocess.run(
    command,
    shell=True,
    capture_output=True,
    text=True,
    timeout=timeout,
)

Severity: high

Using subprocess.run with shell=True is a security risk, as it can lead to shell injection vulnerabilities. Even though the command is passed from the command line by a trusted user, it's easy to make a mistake that could have severe consequences on all nodes of the cluster (e.g., rm -rf /).

To mitigate this, it's recommended to avoid shell=True. If you need to support shell features like pipes or redirection, consider parsing the command more carefully or explicitly documenting the risks and perhaps adding a confirmation step for potentially destructive commands.
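
For example, a minimal sketch (assuming the script receives the command as a single string, as it does today) that tokenizes with shlex instead of handing the string to a shell:

import shlex
import subprocess


def run_command(command, timeout=None):
    # shlex.split respects quoting but never interprets pipes, redirection,
    # or variable expansion, so the resulting argv is executed directly.
    argv = shlex.split(command)
    return subprocess.run(
        argv,
        shell=False,
        capture_output=True,
        text=True,
        timeout=timeout,
    )

Commands that genuinely need shell features could still be run deliberately via ["bash", "-c", command], which at least makes the shell use explicit and auditable.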
