
Conversation

Contributor

@jianjunzhong jianjunzhong commented Oct 25, 2025

Summary by CodeRabbit

  • Refactor
    • Enhanced internal type annotations for improved code reliability.
    • Optimized multi-storage data operations by switching to thread-based concurrent execution, potentially improving performance.


coderabbitai bot commented Oct 25, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

The pull request includes a minor typing fix in client.py using forward-reference annotations and a refactoring in storage.py that replaces asyncio.gather with ThreadPoolExecutor for concurrent multi-storage-unit operations in put_data and get_data methods.

Changes

Typing annotation fix — transfer_queue/client.py
Updated the process_zmq_server_info method signature: the handlers parameter type annotation changed from an unquoted union (TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit) to a quoted forward-reference union to defer type evaluation.

Concurrency paradigm shift — transfer_queue/storage.py
Replaced asyncio.gather-based parallelism with ThreadPoolExecutor-based concurrency in the put_data and get_data methods. Each per-storage-unit task now wraps async calls within asyncio.run inside a thread, with explicit thread synchronization via futures and result collection. Added the concurrent.futures import.
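
For orientation, a minimal standalone sketch of the thread-per-group pattern described above (hypothetical stand-in names; the real methods operate on ZMQ-backed storage units):

import asyncio
import concurrent.futures


async def _put_to_single_storage_unit(payload: bytes, target_storage_unit: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for the real async ZMQ send
    return target_storage_unit


def put_data_single(storage_id: str, payload: bytes) -> str:
    # Each worker thread drives its own event loop for one storage unit.
    return asyncio.run(_put_to_single_storage_unit(payload, target_storage_unit=storage_id))


storage_meta_groups = {"unit-0": b"a", "unit-1": b"b"}
with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
    futures = [executor.submit(put_data_single, sid, payload) for sid, payload in storage_meta_groups.items()]
    results = [f.result() for f in futures]  # .result() re-raises any worker exception
print(results)  # ['unit-0', 'unit-1']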

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Extra attention areas:
    • Thread safety and potential race conditions in the ThreadPoolExecutor implementation
    • Exception handling and propagation from futures vs. previous asyncio exception handling behavior
    • Performance implications of switching from async/await to thread pool (context switching overhead, GIL interactions)
    • Correctness of asyncio.run calls within threads and event loop management

Poem

🐰 From async chains to threaded streams,
Our types now forward-reference in dreams,
The storage units dance in parallel flight,
With thread pools glowing, futures so bright!
✨🧵

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Docstring Coverage — ⚠️ Warning: Docstring coverage is 60.00%, which is insufficient; the required threshold is 80.00%. Resolution: you can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
Title Check — ✅ Passed: The pull request title "feat: enhance storage manager with parallel data handling" is directly related to the main change in the changeset. The primary modification replaces asyncio.gather calls with ThreadPoolExecutor-based parallelism in transfer_queue/storage.py to enable concurrent execution for multi-storage-unit operations. The title accurately identifies both the component (storage manager) and the nature of the enhancement (parallel data handling). While the PR also includes a minor typing fix in client.py, the title correctly focuses on the substantive change and is specific enough to convey the primary intent without unnecessary detail.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@0oshowero0 0oshowero0 requested a review from Copilot October 25, 2025 02:55
@0oshowero0
Owner

@coderabbitai review


coderabbitai bot commented Oct 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

Copilot AI left a comment


Pull Request Overview

This PR enhances the storage manager to use parallel thread execution for data handling operations, replacing the previous async/await pattern with concurrent.futures.ThreadPoolExecutor. The change aims to improve performance by processing multiple storage units simultaneously.

Key Changes:

  • Replaced asyncio.gather() with ThreadPoolExecutor for parallel execution in put and get operations
  • Added concurrent.futures import to support the new threading implementation
  • Updated type hint to use string literal for forward reference

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

transfer_queue/storage.py — Refactored the put_data and get_data methods to use ThreadPoolExecutor instead of asyncio.gather for parallel storage unit operations.
transfer_queue/client.py — Added string literal quotes around the type hint for forward reference compatibility.


Comment on lines +1054 to +1056
            return asyncio.run(
                self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
            )

Copilot AI Oct 25, 2025


Using asyncio.run() inside a thread within an async function is problematic. This creates a new event loop in each thread, which can cause issues if the async methods (_put_to_single_storage_unit) expect to run in the same event loop context as the parent. Consider using asyncio.create_task() or restructuring to maintain a single event loop.
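
If worker threads are kept, one way to preserve a single event loop is to hand each coroutine back to the parent loop with asyncio.run_coroutine_threadsafe rather than calling asyncio.run per thread. A minimal sketch with hypothetical names (not the PR's code):

import asyncio


async def put_one(storage_id: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for the real async storage call
    return storage_id


async def main() -> None:
    loop = asyncio.get_running_loop()

    def worker(storage_id: str) -> str:
        # Schedule the coroutine on the parent loop and block this thread until it finishes.
        return asyncio.run_coroutine_threadsafe(put_one(storage_id), loop).result()

    # asyncio.to_thread keeps the parent loop free to service the scheduled coroutines.
    results = await asyncio.gather(*(asyncio.to_thread(worker, sid) for sid in ("unit-0", "unit-1")))
    print(results)  # ['unit-0', 'unit-1']


asyncio.run(main())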

Comment on lines +1157 to +1158
            # Wait for all threads to complete and collect results
            results = [future.result() for future in concurrent.futures.as_completed(futures)]

Copilot AI Oct 25, 2025


Using as_completed(futures) returns futures in completion order, not submission order. This breaks the correspondence between storage_meta_groups.items() and results, potentially causing data to be associated with the wrong storage unit. Use results = [future.result() for future in futures] to maintain order.

Suggested change
-            # Wait for all threads to complete and collect results
-            results = [future.result() for future in concurrent.futures.as_completed(futures)]
+            # Wait for all threads to complete and collect results in submission order
+            results = [future.result() for future in futures]
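
A small standalone demonstration (not from the PR) of the ordering difference:

import concurrent.futures
import time


def job(name: str, delay: float) -> str:
    time.sleep(delay)
    return name


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, "slow", 0.2), executor.submit(job, "fast", 0.0)]
    by_completion = [f.result() for f in concurrent.futures.as_completed(futures)]
    by_submission = [f.result() for f in futures]

print(by_completion)  # typically ['fast', 'slow'] -- no longer matches the submitted groups
print(by_submission)  # ['slow', 'fast'] -- matches submission order, as the suggestion above does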

Comment on lines +1147 to +1149
            return asyncio.run(
                self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
            )

Copilot AI Oct 25, 2025


Using asyncio.run() inside a thread within an async function is problematic. This creates a new event loop in each thread, which can cause issues if the async methods (_get_from_single_storage_unit) expect to run in the same event loop context as the parent. Consider using asyncio.create_task() or restructuring to maintain a single event loop.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6042b7b and e594093.

📒 Files selected for processing (2)
  • transfer_queue/client.py (1 hunks)
  • transfer_queue/storage.py (3 hunks)
🔇 Additional comments (1)
transfer_queue/storage.py (1)

16-16: Import added for problematic threading approach.

While the import is syntactically correct, it's being used to implement an anti-pattern (see comments below on put_data and get_data).


 def process_zmq_server_info(
-    handlers: dict[Any, TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit],
+    handlers: dict[Any, "TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit"],


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify if forward reference is necessary.

The union type has been changed to a forward reference (quoted string), but all three classes (TransferQueueController, TransferQueueStorageManager, SimpleStorageUnit) are already imported at the top of the file. Forward references are typically needed to avoid circular imports or forward-declaration issues.

Run this script to check if there are circular import issues that justify this change:

If no circular imports exist and the Python version supports the | union syntax (3.10+), the forward reference may be unnecessary. However, if this resolves a circular import or you're targeting Python 3.9, the change is appropriate.


🏁 Script executed:

#!/bin/bash
# Check for potential circular imports between client.py and the referenced modules

echo "=== Checking imports in client.py ==="
rg -n "^from|^import" transfer_queue/client.py

echo -e "\n=== Checking if controller.py imports client ==="
rg -n "from.*client|import.*client" transfer_queue/controller.py

echo -e "\n=== Checking if storage.py imports client ==="
rg -n "from.*client|import.*client" transfer_queue/storage.py

echo -e "\n=== Checking Python version (PEP 604 '|' union syntax requires 3.10+) ==="
rg -n "python_requires|Programming Language :: Python :: 3\." setup.py pyproject.toml 2>/dev/null || echo "Version info not found in setup.py/pyproject.toml"

Length of output: 1419


I need to check the Python version requirement in alternative locations and examine the actual code context around line 579.

I need to verify whether the codebase uses from __future__ import annotations and check the Python version requirement more thoroughly.

Let me check if client.py has from __future__ import annotations and search for Python version configuration more thoroughly.


🏁 Script executed:

#!/bin/bash

echo "=== Checking first 30 lines of client.py for future imports ==="
head -30 transfer_queue/client.py

echo -e "\n=== Searching for Python version in all config files ==="
find . -maxdepth 2 -type f \( -name "*.cfg" -o -name "*.toml" -o -name ".python-version" -o -name "tox.ini" -o -name "setup.cfg" \) -exec grep -l "python" {} \;

echo -e "\n=== Checking for python_requires in setup.cfg (if exists) ==="
if [ -f "setup.cfg" ]; then
  grep -i "python_requires\|version" setup.cfg | head -10
fi

echo -e "\n=== Checking for python versions in any YAML/GitHub Actions workflows ==="
find . -type f -name "*.yml" -o -name "*.yaml" | head -5 | xargs grep -i "python" 2>/dev/null | head -20 || echo "No YAML files found"

Length of output: 2614


Now let me check the actual pyproject.toml content and examine line 579 with context.


🏁 Script executed:

#!/bin/bash

echo "=== Reading pyproject.toml ==="
cat pyproject.toml | grep -A 5 -B 5 "requires-python\|python"

echo -e "\n=== Viewing line 579 and surrounding context in client.py ==="
sed -n '570,590p' transfer_queue/client.py

Length of output: 1120


Remove the forward reference quotes on line 579.

Verification confirms the forward reference is unnecessary. The codebase requires Python 3.10+ (requires-python = ">=3.10"), which natively supports PEP 604 union syntax (|) without quotes. There are no circular imports between client.py and the referenced modules (controller.py and storage.py don't import client.py), and all three classes are already imported at the top of the file. Without from __future__ import annotations, the quotes provide no functional benefit.

Change:

handlers: dict[Any, TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit],
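
For reference, a self-contained sketch (stub classes standing in for the real ones) of when quoting matters:

from typing import Any


class TransferQueueController: ...
class TransferQueueStorageManager: ...
class SimpleStorageUnit: ...


# Python 3.10+ with the names already imported/defined: the unquoted
# PEP 604 union evaluates at definition time without issue.
def process_zmq_server_info(
    handlers: dict[Any, TransferQueueController | TransferQueueStorageManager | SimpleStorageUnit],
) -> None: ...


# Quotes (or `from __future__ import annotations`) are only needed when the
# name is not yet bound at evaluation time, e.g. a self-referencing class.
class Node:
    def clone(self) -> "Node": ...
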
🤖 Prompt for AI Agents
In transfer_queue/client.py around line 579, the type annotation uses
unnecessary forward-reference quotes for the union of TransferQueueController,
TransferQueueStorageManager, and SimpleStorageUnit; remove the surrounding
quotes so the annotation reads using native PEP 604 union syntax (handlers:
dict[Any, TransferQueueController | TransferQueueStorageManager |
SimpleStorageUnit],) ensuring the three classes are imported at top and no other
changes are required.

Comment on lines +1052 to +1068
        # send data to each storage unit using parallel threads for each meta_group
        def put_data_single(storage_id, meta_group):
            return asyncio.run(
                self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
            )

        # Create and run threads for each meta_group
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
            futures = [
                executor.submit(put_data_single, storage_id, meta_group)
                for storage_id, meta_group in storage_meta_groups.items()
            ]
            # Wait for all threads to complete
            concurrent.futures.wait(futures)
            # Check for exceptions
            for future in futures:
                future.result()  # This will raise any exception that occurred

⚠️ Potential issue | 🔴 Critical

Critical: Replace thread-based parallelism with native async concurrency.

This implementation has several critical issues:

  1. Event loop anti-pattern: Calling asyncio.run() in each thread creates a new event loop per thread, which violates asyncio design principles and is explicitly discouraged in the documentation.

  2. Blocks the event loop: The put_data method is async, but now blocks on concurrent.futures.wait(), defeating the purpose of async I/O and preventing other tasks from running.

  3. Thread-safety concerns: ZMQ sockets/contexts created by the @dynamic_storage_manager_socket decorator may not be thread-safe when accessed from multiple threads concurrently.

  4. Performance regression: This approach creates OS threads (expensive) that block waiting for async operations, which is slower than the original asyncio.gather() approach that efficiently multiplexes I/O operations on a single thread.

The original asyncio.gather() approach is the correct pattern for parallelizing async operations. Please revert to:

-        # send data to each storage unit using parallel threads for each meta_group
-        def put_data_single(storage_id, meta_group):
-            return asyncio.run(
-                self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
-            )
-
-        # Create and run threads for each meta_group
-        with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
-            futures = [
-                executor.submit(put_data_single, storage_id, meta_group)
-                for storage_id, meta_group in storage_meta_groups.items()
-            ]
-            # Wait for all threads to complete
-            concurrent.futures.wait(futures)
-            # Check for exceptions
-            for future in futures:
-                future.result()  # This will raise any exception that occurred
+        # send data to each storage unit in parallel using asyncio.gather
+        tasks = [
+            self._put_to_single_storage_unit(get_transfer_data(meta_group, data), target_storage_unit=storage_id)
+            for storage_id, meta_group in storage_meta_groups.items()
+        ]
+        await asyncio.gather(*tasks)
🤖 Prompt for AI Agents
In transfer_queue/storage.py around lines 1052 to 1068, the code spawns OS
threads and calls asyncio.run() inside them which creates per-thread event
loops, blocks the async flow with concurrent.futures.wait(), and risks
thread-safety issues for ZMQ sockets; replace this with native async concurrency
by building a list of coroutines calling
self._put_to_single_storage_unit(get_transfer_data(meta_group, data),
target_storage_unit=storage_id) for each storage_id/meta_group, then await
asyncio.gather(*coros) (or await asyncio.gather(*coros, return_exceptions=True)
and handle exceptions centrally), avoid creating threads or calling
asyncio.run(), and if you need to limit concurrency use an asyncio.Semaphore or
asyncio.TaskGroup to throttle tasks while preserving single-event-loop execution
and avoiding use of threads for ZMQ sockets.
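
A hedged sketch of the throttled variant mentioned above, using hypothetical helper names rather than the repository's API:

import asyncio


async def put_to_unit(storage_id: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for the real async put
    return storage_id


async def put_data(storage_ids: list[str], max_concurrency: int = 4) -> list[str]:
    sem = asyncio.Semaphore(max_concurrency)

    async def throttled(storage_id: str) -> str:
        async with sem:
            return await put_to_unit(storage_id)

    # gather keeps results in the same order the coroutines were passed in.
    return await asyncio.gather(*(throttled(sid) for sid in storage_ids))


print(asyncio.run(put_data(["unit-0", "unit-1", "unit-2"])))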

Comment on lines 1145 to 1158
+        # retrieve data using parallel threads for each meta_group
+        def get_data_single(storage_id, meta_group):
+            return asyncio.run(
+                self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
+            )
+
-        results = await asyncio.gather(*tasks)
+        # Create and run threads for each meta_group
+        with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
+            futures = [
+                executor.submit(get_data_single, storage_id, meta_group)
+                for storage_id, meta_group in storage_meta_groups.items()
+            ]
+            # Wait for all threads to complete and collect results
+            results = [future.result() for future in concurrent.futures.as_completed(futures)]

⚠️ Potential issue | 🔴 Critical

Critical: Same threading anti-pattern as in put_data.

This suffers from the same critical issues as put_data (lines 1052-1068): calling asyncio.run() in threads is an anti-pattern that breaks async semantics, introduces thread-safety concerns, and degrades performance.

Revert to the native async approach:

-        # retrieve data using parallel threads for each meta_group
-        def get_data_single(storage_id, meta_group):
-            return asyncio.run(
-                self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
-            )
-
-        # Create and run threads for each meta_group
-        with concurrent.futures.ThreadPoolExecutor(max_workers=len(storage_meta_groups)) as executor:
-            futures = [
-                executor.submit(get_data_single, storage_id, meta_group)
-                for storage_id, meta_group in storage_meta_groups.items()
-            ]
-            # Wait for all threads to complete and collect results
-            results = [future.result() for future in concurrent.futures.as_completed(futures)]
+        # retrieve data from each storage unit in parallel using asyncio.gather
+        tasks = [
+            self._get_from_single_storage_unit(meta_group.get_transfer_data(), target_storage_unit=storage_id)
+            for storage_id, meta_group in storage_meta_groups.items()
+        ]
+        results = await asyncio.gather(*tasks)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In transfer_queue/storage.py around lines 1145-1158, the code spawns threads
that call asyncio.run() per thread which is an anti-pattern; replace this with a
native async pattern: remove ThreadPoolExecutor and the get_data_single wrapper,
create async tasks that call self._get_from_single_storage_unit(...) for each
storage_id/meta_group, optionally guard concurrency with an asyncio.Semaphore
(using max_workers as the limit), await asyncio.gather on those tasks to collect
results, and ensure the surrounding function remains async so you await the
gather call rather than running event loops inside threads.
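
The asyncio.TaskGroup alternative mentioned in the prompt would look roughly like this (Python 3.11+, hypothetical names):

import asyncio


async def get_from_unit(storage_id: str) -> bytes:
    await asyncio.sleep(0.01)  # stand-in for the real async get
    return b"payload-" + storage_id.encode()


async def get_data(storage_ids: list[str]) -> dict[str, bytes]:
    # TaskGroup cancels remaining tasks and re-raises if any task fails.
    async with asyncio.TaskGroup() as tg:
        tasks = {sid: tg.create_task(get_from_unit(sid)) for sid in storage_ids}
    return {sid: task.result() for sid, task in tasks.items()}


print(asyncio.run(get_data(["unit-0", "unit-1"])))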

0oshowero0 and others added 2 commits October 25, 2025 15:42
[BREAKING] Refactor: Provide a general storage abstraction layer
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
@0oshowero0 0oshowero0 deleted the branch 0oshowero0:han/unified_storage_abstract October 29, 2025 06:13
@0oshowero0 0oshowero0 closed this Oct 29, 2025