test on avoiding unnecessary object deserialization #4887
Conversation
Signed-off-by: xinyuangui2 <xinyuan.gui95@gmail.com>
Code Review
This pull request introduces distributed data preprocessing for gsm8k.py using Ray and adds a new utility script run_on_all_nodes.py to execute commands across a Ray cluster. The overall approach is sound, but I have identified two high-severity issues. First, there is significant code duplication in gsm8k.py between the single-node and distributed code paths, which impacts maintainability. I've suggested a refactoring to unify these paths. Second, the new run_on_all_nodes.py script uses shell=True with subprocess.run, which introduces a command injection vulnerability. I've raised this as a security concern that needs to be addressed.
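For context, the behavior the review describes for run_on_all_nodes.py can be sketched roughly as follows. This is not the PR's actual implementation; the function names and structure below are illustrative only, and the sketch simply pins one zero-CPU task to every alive Ray node via NodeAffinitySchedulingStrategy (it also mirrors the shell=True pattern flagged later in this review):

import subprocess

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(num_cpus=0)
def _run_command(command: str) -> str:
    # shell=True mirrors the snippet reviewed below, along with its risks.
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout


def run_on_all_nodes(command: str) -> list[str]:
    # Pin one task to every alive node so the command executes cluster-wide.
    refs = []
    for node in ray.nodes():
        if not node["Alive"]:
            continue
        strategy = NodeAffinitySchedulingStrategy(node_id=node["NodeID"], soft=False)
        refs.append(_run_command.options(scheduling_strategy=strategy).remote(command))
    return ray.get(refs)


if __name__ == "__main__":
    ray.init(address="auto")
    for output in run_on_all_nodes("hostname"):
        print(output.strip())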
else:
    # Original single-node behavior
    if local_dataset_path is not None:
        dataset = datasets.load_dataset(local_dataset_path, "main")
    else:
        dataset = datasets.load_dataset(data_source, "main")

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    instruction_following = 'Let\'s think step by step and output the final answer after "####".'

    # add a row to each data item that represents a unique id
    def make_map_fn(split):
        def process_fn(example, idx):
            question_raw = example.pop("question")

            question = question_raw + " " + instruction_following

            answer_raw = example.pop("answer")
            solution = extract_solution(answer_raw)
            data = {
                "data_source": data_source,
                "prompt": [
                    {
                        "role": "user",
                        "content": question,
                    }
                ],
                "ability": "math",
                "reward_model": {"style": "rule", "ground_truth": solution},
                "extra_info": {
                    "split": split,
                    "index": idx,
                    "answer": answer_raw,
                    "question": question_raw,
                },
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

    local_save_dir_expanded = os.path.expanduser(local_save_dir)
    os.makedirs(local_save_dir_expanded, exist_ok=True)

    train_dataset.to_parquet(os.path.join(local_save_dir_expanded, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_save_dir_expanded, "test.parquet"))

    if hdfs_dir is not None:
        makedirs(hdfs_dir)

        copy(src=local_save_dir_expanded, dst=hdfs_dir)
There is a large amount of code duplication between the single-node execution path (this else block) and the download_and_preprocess_on_node function. This makes the code harder to maintain, as any changes to the preprocessing logic will need to be applied in two places.
To adhere to the Don't Repeat Yourself (DRY) principle, you can refactor this to reuse the download_and_preprocess_on_node function for the single-node case as well. This can be done by treating the single-node case as a distributed execution on a single-node Ray cluster.
Suggested change (replacing the entire single-node else block shown above):

else:
    # Original single-node behavior. To avoid code duplication, we can run the Ray task on the local node.
    if not ray.is_initialized():
        # Initialize Ray for a single-node execution.
        ray.init(num_cpus=1)
    print("Running in single-node mode.")
    # Run the task and wait for it to complete.
    completed_node = ray.get(
        download_and_preprocess_on_node.remote(data_source, local_dataset_path, local_save_dir, hdfs_dir)
    )
    print(f"\nDataset successfully downloaded and preprocessed on node: {completed_node}")
result = subprocess.run(
    command,
    shell=True,
    capture_output=True,
    text=True,
    timeout=timeout,
)
Using subprocess.run with shell=True is a security risk, as it can lead to shell injection vulnerabilities. Even though the command is passed from the command line by a trusted user, it's easy to make a mistake that could have severe consequences on all nodes of the cluster (e.g., rm -rf /).
To mitigate this, it's recommended to avoid shell=True. If you need to support shell features like pipes or redirection, consider parsing the command more carefully or explicitly documenting the risks and perhaps adding a confirmation step for potentially destructive commands.
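As an illustration only (not code from this PR), one way to drop shell=True for simple commands is to split the string with shlex and gate obviously destructive programs behind a confirmation prompt. run_safely and the small deny-list below are hypothetical names, and this sketch assumes the command is a single program invocation without pipes or redirection:

import shlex
import subprocess

# Programs we refuse to run without an explicit confirmation; purely illustrative.
DESTRUCTIVE = {"rm", "dd", "mkfs"}


def run_safely(command: str, timeout: float | None = None):
    argv = shlex.split(command)  # split into argv instead of handing the string to a shell
    if argv and argv[0] in DESTRUCTIVE:
        answer = input(f"Run potentially destructive command {command!r} on all nodes? [y/N] ")
        if answer.strip().lower() != "y":
            return None
    return subprocess.run(
        argv,  # no shell=True: arguments are passed directly to the program
        capture_output=True,
        text=True,
        timeout=timeout,
    )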
How to test
thinking: can we move most of this work into the existing worker components and fuse some of the steps there, so that less deserialized data is needed on the driver?
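One hedged sketch of that idea, unrelated to the PR's actual files: if the driver passes ObjectRefs straight into downstream tasks instead of calling ray.get on them, Ray deserializes the data only on the worker that consumes it, so the driver never materializes the intermediate objects. The function names below are made up for illustration:

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
def preprocess_shard(shard: list[int]) -> list[int]:
    # Stand-in for per-node preprocessing work.
    return [x * 2 for x in shard]


@ray.remote
def consume(*shards: list[int]) -> int:
    # Ray resolves the ObjectRefs here, on the worker, not on the driver.
    return sum(len(s) for s in shards)


refs = [preprocess_shard.remote(list(range(1_000))) for _ in range(4)]
# Passing refs (rather than ray.get(refs)) keeps the shards out of driver memory.
total = ray.get(consume.remote(*refs))
print(total)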