feat: enhance storage manager with parallel data handling #11
Conversation
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the repository's CodeRabbit configuration file. You can disable this status message in the CodeRabbit configuration.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough: The pull request includes a minor typing fix in transfer_queue/client.py and reworked parallel data handling in transfer_queue/storage.py.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) · ✅ Passed checks (2 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Pull Request Overview
This PR enhances the storage manager to use parallel thread execution for data handling operations, replacing the previous async/await pattern with concurrent.futures.ThreadPoolExecutor. The change aims to improve performance by processing multiple storage units simultaneously.
Key Changes:
- Replaced `asyncio.gather()` with `ThreadPoolExecutor` for parallel execution in put and get operations
- Added `concurrent.futures` import to support the new threading implementation
- Updated type hint to use string literal for forward reference
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| transfer_queue/storage.py | Refactored put_data and get_data methods to use ThreadPoolExecutor instead of asyncio.gather for parallel storage unit operations |
| transfer_queue/client.py | Added string literal quotes around type hint for forward reference compatibility |
```python
return asyncio.run(
    self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
)
```
Copilot AI · Oct 25, 2025
Using `asyncio.run()` inside a thread within an async function is problematic. This creates a new event loop in each thread, which can cause issues if the async methods (`_put_to_single_storage_unit`) expect to run in the same event loop context as the parent. Consider using `asyncio.create_task()` or restructuring to maintain a single event loop.
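For illustration, a minimal standalone sketch (not this repository's code; `put_one` and the storage IDs are hypothetical) of how worker threads can reuse the parent event loop via `asyncio.run_coroutine_threadsafe` instead of spinning up a loop per thread with `asyncio.run()`:

```python
import asyncio
import concurrent.futures

async def put_one(storage_id: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real storage I/O
    return storage_id

async def main() -> None:
    loop = asyncio.get_running_loop()

    def worker(storage_id: str) -> str:
        # Reuse the parent event loop rather than calling asyncio.run(),
        # which would spin up a fresh loop inside this thread.
        fut = asyncio.run_coroutine_threadsafe(put_one(storage_id), loop)
        return fut.result()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futs = [loop.run_in_executor(pool, worker, sid) for sid in ("s0", "s1")]
        print(await asyncio.gather(*futs))

asyncio.run(main())
```

That said, if the work is pure async I/O, dropping the threads entirely in favor of `asyncio.gather` (as suggested later in this review) is simpler.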
```python
# Wait for all threads to complete and collect results
results = [future.result() for future in concurrent.futures.as_completed(futures)]
```
Copilot AI · Oct 25, 2025
Using `as_completed(futures)` returns futures in completion order, not submission order. This breaks the correspondence between `storage_meta_groups.items()` and `results`, potentially causing data to be associated with the wrong storage unit. Use `results = [future.result() for future in futures]` to maintain order.
```diff
-# Wait for all threads to complete and collect results
-results = [future.result() for future in concurrent.futures.as_completed(futures)]
+# Wait for all threads to complete and collect results in submission order
+results = [future.result() for future in futures]
```
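To make the ordering hazard concrete, here is a small self-contained demo (illustrative only, not code from this PR); the staggered sleeps make later submissions finish first:

```python
import concurrent.futures
import time

def task(i: int) -> int:
    time.sleep(0.1 * (3 - i))  # later submissions finish first
    return i

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(task, i) for i in range(3)]
    by_completion = [f.result() for f in concurrent.futures.as_completed(futures)]
    by_submission = [f.result() for f in futures]

print(by_completion)  # completion order, e.g. [2, 1, 0]
print(by_submission)  # submission order, always [0, 1, 2]
```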
```python
return asyncio.run(
    self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
)
```
Copilot AI · Oct 25, 2025
Using `asyncio.run()` inside a thread within an async function is problematic. This creates a new event loop in each thread, which can cause issues if the async methods (`_get_from_single_storage_unit`) expect to run in the same event loop context as the parent. Consider using `asyncio.create_task()` or restructuring to maintain a single event loop.
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- transfer_queue/client.py (1 hunks)
- transfer_queue/storage.py (3 hunks)
🔇 Additional comments (1)
transfer_queue/storage.py (1)
16-16: Import added for problematic threading approach. While the import is syntactically correct, it's being used to implement an anti-pattern (see comments below on `put_data` and `get_data`).
```diff
 def process_zmq_server_info(
-    handlers: dict[Any, TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit],
+    handlers: dict[Any, "TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit"],
```
🧩 Analysis chain
Verify if forward reference is necessary.
The union type has been changed to a forward reference (quoted string), but all three classes (TransferQueueController, TransferQueueStorageManager, SimpleStorageUnit) are already imported at the top of the file. Forward references are typically needed to avoid circular imports or forward-declaration issues.
Run this script to check if there are circular import issues that justify this change:
If no circular imports exist and the Python version supports the | union syntax (3.10+), the forward reference may be unnecessary. However, if this resolves a circular import or you're targeting Python 3.9, the change is appropriate.
🏁 Script executed:
```bash
#!/bin/bash
# Check for potential circular imports between client.py and the referenced modules
echo "=== Checking imports in client.py ==="
rg -n "^from|^import" transfer_queue/client.py
echo -e "\n=== Checking if controller.py imports client ==="
rg -n "from.*client|import.*client" transfer_queue/controller.py
echo -e "\n=== Checking if storage.py imports client ==="
rg -n "from.*client|import.*client" transfer_queue/storage.py
echo -e "\n=== Checking Python version (PEP 604 '|' union syntax requires 3.10+) ==="
rg -n "python_requires|Programming Language :: Python :: 3\." setup.py pyproject.toml 2>/dev/null || echo "Version info not found in setup.py/pyproject.toml"
```
Length of output: 1419
I need to check the Python version requirement in alternative locations and examine the actual code context around line 579.
I need to verify whether the codebase uses `from __future__ import annotations` and check the Python version requirement more thoroughly.
Let me check if client.py has `from __future__ import annotations` and search for Python version configuration more thoroughly.
🏁 Script executed:
```bash
#!/bin/bash
echo "=== Checking first 30 lines of client.py for future imports ==="
head -30 transfer_queue/client.py
echo -e "\n=== Searching for Python version in all config files ==="
find . -maxdepth 2 -type f \( -name "*.cfg" -o -name "*.toml" -o -name ".python-version" -o -name "tox.ini" -o -name "setup.cfg" \) -exec grep -l "python" {} \;
echo -e "\n=== Checking for python_requires in setup.cfg (if exists) ==="
if [ -f "setup.cfg" ]; then
  grep -i "python_requires\|version" setup.cfg | head -10
fi
echo -e "\n=== Checking for python versions in any YAML/GitHub Actions workflows ==="
find . -type f -name "*.yml" -o -name "*.yaml" | head -5 | xargs grep -i "python" 2>/dev/null | head -20 || echo "No YAML files found"
```
Length of output: 2614
Now let me check the actual pyproject.toml content and examine line 579 with context.
🏁 Script executed:
```bash
#!/bin/bash
echo "=== Reading pyproject.toml ==="
cat pyproject.toml | grep -A 5 -B 5 "requires-python\|python"
echo -e "\n=== Viewing line 579 and surrounding context in client.py ==="
sed -n '570,590p' transfer_queue/client.py
```
Length of output: 1120
Remove the forward reference quotes on line 579.
Verification confirms the forward reference is unnecessary. The codebase requires Python 3.10+ (`requires-python = ">=3.10"`), which natively supports PEP 604 union syntax (`|`) without quotes. There are no circular imports between client.py and the referenced modules (controller.py and storage.py don't import client.py), and all three classes are already imported at the top of the file. Without `from __future__ import annotations`, the quotes provide no functional benefit.
Change:
```python
handlers: dict[Any, TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit],
```
🤖 Prompt for AI Agents
In transfer_queue/client.py around line 579, the type annotation uses
unnecessary forward-reference quotes for the union of TransferQueueController,
TransferQueueStorageManager, and SimpleStorageUnit; remove the surrounding
quotes so the annotation reads using native PEP 604 union syntax (handlers:
dict[Any, TransferQueueController | TransferQueueStorageManager |
SimpleStorageUnit],) ensuring the three classes are imported at top and no other
changes are required.
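As background for the quoting rule, a minimal standalone illustration (hypothetical `Controller`/`StorageManager`/`Node` classes, not this repository's API):

```python
from typing import Any

class Controller: ...
class StorageManager: ...

# Python 3.10+: PEP 604 unions work unquoted when the names are already defined/imported.
def process(handlers: dict[Any, Controller | StorageManager]) -> None: ...

# Quotes (or `from __future__ import annotations`) are only needed when a name
# is not yet bound at annotation time, e.g. a class referencing itself:
class Node:
    def link(self, other: "Node") -> None: ...
```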
```python
# send data to each storage unit using parallel threads for each meta_group
def put_data_single(storage_id, meta_group):
    return asyncio.run(
        self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
    )

# Create and run threads for each meta_group
with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
    futures = [
        executor.submit(put_data_single, storage_id, meta_group)
        for storage_id, meta_group in storage_meta_groups.items()
    ]
    # Wait for all threads to complete
    concurrent.futures.wait(futures)
    # Check for exceptions
    for future in futures:
        future.result()  # This will raise any exception that occurred
```
Critical: Replace thread-based parallelism with native async concurrency.
This implementation has several critical issues:
1. Event loop anti-pattern: Calling `asyncio.run()` in each thread creates a new event loop per thread, which violates asyncio design principles and is explicitly discouraged in the documentation.
2. Blocks the event loop: The `put_data` method is `async`, but now blocks on `concurrent.futures.wait()`, defeating the purpose of async I/O and preventing other tasks from running.
3. Thread-safety concerns: ZMQ sockets/contexts created by the `@dynamic_storage_manager_socket` decorator may not be thread-safe when accessed from multiple threads concurrently.
4. Performance regression: This approach creates OS threads (expensive) that block waiting for async operations, which is slower than the original `asyncio.gather()` approach that efficiently multiplexes I/O operations on a single thread.
The original `asyncio.gather()` approach is the correct pattern for parallelizing async operations. Please revert to:
```diff
-# send data to each storage unit using parallel threads for each meta_group
-def put_data_single(storage_id, meta_group):
-    return asyncio.run(
-        self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
-    )
-
-# Create and run threads for each meta_group
-with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
-    futures = [
-        executor.submit(put_data_single, storage_id, meta_group)
-        for storage_id, meta_group in storage_meta_groups.items()
-    ]
-    # Wait for all threads to complete
-    concurrent.futures.wait(futures)
-    # Check for exceptions
-    for future in futures:
-        future.result()  # This will raise any exception that occurred
+# send data to each storage unit in parallel using asyncio.gather
+tasks = [
+    self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
+    for storage_id, meta_group in storage_meta_groups.items()
+]
+await asyncio.gather(*tasks)
```
🤖 Prompt for AI Agents
In transfer_queue/storage.py around lines 1052 to 1068, the code spawns OS
threads and calls asyncio.run() inside them which creates per-thread event
loops, blocks the async flow with concurrent.futures.wait(), and risks
thread-safety issues for ZMQ sockets; replace this with native async concurrency
by building a list of coroutines calling
self._put_to_single_storage_unit(get_transfer_data(meta_group, data),
target_storage_unit=storage_id) for each storage_id/meta_group, then await
asyncio.gather(*coros) (or await asyncio.gather(*coros, return_exceptions=True)
and handle exceptions centrally), avoid creating threads or calling
asyncio.run(), and if you need to limit concurrency use an asyncio.Semaphore or
asyncio.TaskGroup to throttle tasks while preserving single-event-loop execution
and avoiding use of threads for ZMQ sockets.
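To sketch the centralized error handling this prompt mentions, a minimal standalone example (the `put_unit` coroutine is hypothetical, not this repository's API) using `return_exceptions=True`:

```python
import asyncio

async def put_unit(storage_id: str) -> str:
    if storage_id == "s1":
        raise ConnectionError(f"unit {storage_id} unreachable")
    await asyncio.sleep(0.01)  # stand-in for real storage I/O
    return storage_id

async def put_all(storage_ids: list[str]) -> None:
    results = await asyncio.gather(
        *(put_unit(sid) for sid in storage_ids),
        return_exceptions=True,  # collect failures instead of cancelling siblings
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        raise failures[0]  # handle all failures centrally, e.g. re-raise the first

asyncio.run(put_all(["s0", "s2"]))
```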
```diff
+# retrieve data using parallel threads for each meta_group
+def get_data_single(storage_id, meta_group):
+    return asyncio.run(
+        self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
+    )
+
-results = await asyncio.gather(*tasks)
+# Create and run threads for each meta_group
+with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
+    futures = [
+        executor.submit(get_data_single, storage_id, meta_group)
+        for storage_id, meta_group in storage_meta_groups.items()
+    ]
+    # Wait for all threads to complete and collect results
+    results = [future.result() for future in concurrent.futures.as_completed(futures)]
```
Critical: Same threading anti-pattern as in `put_data`.
This suffers from the same critical issues as `put_data` (lines 1052-1068): calling `asyncio.run()` in threads is an anti-pattern that breaks async semantics, introduces thread-safety concerns, and degrades performance.
Revert to the native async approach:
```diff
-# retrieve data using parallel threads for each meta_group
-def get_data_single(storage_id, meta_group):
-    return asyncio.run(
-        self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
-    )
-
-# Create and run threads for each meta_group
-with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
-    futures = [
-        executor.submit(get_data_single, storage_id, meta_group)
-        for storage_id, meta_group in storage_meta_groups.items()
-    ]
-    # Wait for all threads to complete and collect results
-    results = [future.result() for future in concurrent.futures.as_completed(futures)]
+# retrieve data from each storage unit in parallel using asyncio.gather
+tasks = [
+    self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
+    for storage_id, meta_group in storage_meta_groups.items()
+]
+results = await asyncio.gather(*tasks)
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In transfer_queue/storage.py around lines 1145-1158, the code spawns threads
that call asyncio.run() per thread which is an anti-pattern; replace this with a
native async pattern: remove ThreadPoolExecutor and the get_data_single wrapper,
create async tasks that call self._get_from_single_storage_unit(...) for each
storage_id/meta_group, optionally guard concurrency with an asyncio.Semaphore
(using max_workers as the limit), await asyncio.gather on those tasks to collect
results, and ensure the surrounding function remains async so you await the
gather call rather than running event loops inside threads.
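And a minimal standalone sketch of the semaphore throttling suggested above (hypothetical `fetch_unit` coroutine, with `max_concurrency` standing in for the old `max_workers` limit):

```python
import asyncio

async def fetch_unit(storage_id: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real storage-unit I/O
    return storage_id

async def get_all(storage_ids: list[str], max_concurrency: int = 4) -> list[str]:
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(sid: str) -> str:
        async with sem:  # at most max_concurrency requests in flight
            return await fetch_unit(sid)

    # gather preserves submission order in its result list
    return await asyncio.gather(*(guarded(sid) for sid in storage_ids))

print(asyncio.run(get_all([f"s{i}" for i in range(10)])))
```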
[BREAKING] Refactor: Provide a general storage abstraction layer
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Force-pushed from e594093 to d61e218