[client] Make batch timeout configurable#371
Conversation
|
@Prajwal-banakar Thank you for PR. |
|
Hi @fresh-borzoni Thanks for review! Updated the python wiring |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar Ty, left some comments
-> We need to modify docs to mention new config.
bindings/cpp/src/lib.rs
Outdated
| writer_acks: config.writer_acks.to_string(), | ||
| writer_retries: config.writer_retries, | ||
| writer_batch_size: config.writer_batch_size, | ||
| writer_batch_timeout_ms: 100, |
There was a problem hiding this comment.
We need to pass the value from config ->
FfiConfig -Configuration(fluss.hpp) - ffi_converter.cpp changes
|
Hi @fresh-borzoni, I've resolved the merge conflicts with main and completed the wiring for the writer_batch_timeout_ms across both Python and C++ bindings. I also ran cargo fmt across the workspace to fix the formatting issues identified by the CI. Verified the build locally with cargo check -p fluss-cpp and cargo check -p fluss_python. Ready for your review! |
|
@Prajwal-banakar Ty, would you mind to update website docs to reference new config? |
bindings/cpp/src/lib.rs
Outdated
|
|
||
| // Connection implementation | ||
| fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { | ||
| // 1. Parse the bucket assigner type (the upstream feature) |
There was a problem hiding this comment.
we don't need this context, it's only rebase/conflict fix related
bindings/cpp/src/lib.rs
Outdated
| } | ||
| }; | ||
|
|
||
| // 2. Build the config with BOTH your timeout and the new assigner |
|
Hi @fresh-borzoni updated website docs and addressed feedback! PTAL! |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar Ty, LGTM
luoyuxia
left a comment
There was a problem hiding this comment.
@Prajwal-banakar Thanks. LGMT. Merging..
|
@fresh-borzoni @luoyuxia Thanks both for review and merging! Happy to help! |
Purpose
Linked issue: close #321
The purpose of this change is to make the client-side writer batch timeout configurable. Previously, this was hardcoded to 500ms, which differed from the Java client's default of 100ms and prevented users from tuning the latency/throughput trade-off.
Brief change log
Config Enhancement: Added writer_batch_timeout_ms to the Config struct in config.rs with a default value of 100ms.
Accumulator Update: Updated RecordAccumulator in accumulator.rs to initialize using the configuration value.
Python Binding Wiring: Added writer.batch-timeout-ms mapping to the Python Config, including getters and setters.
C++ Binding Wiring: Fully wired the timeout through the C++ FFI bridge, updated the FfiConfig struct, and ensured proper conversion in the FFI layer.
Merge & Sync: Successfully resolved merge conflicts with the main branch, integrating the new writer_bucket_no_key_assigner alongside the batch timeout.
Code Health: Applied cargo fmt across the workspace to ensure consistent styling and formatting.
Tests
Unit Tests: Ran cargo test --workspace. All 182 unit tests passed, including existing accumulator tests.
Integration Tests: Ran cargo test --features integration_tests --workspace. Verified that core logic remains sound despite local environment port contention in Docker.
Manual Verification: Used git diff to confirm the total removal of the hardcoded 500ms value.
API and Format
This change adds a new configuration option writer_batch_timeout_ms to the client Config. It does not affect the storage format or existing public method signatures.
Documentation
Website Update: Updated the official user guides for Rust, Python, and C++ in the website/docs directory to include the new writer_batch_timeout_ms (or its language-specific equivalent) in the configuration tables and examples.