From 4e71eb238b32f99ddbd2de6f16d1f5c14ce5cb21 Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Tue, 24 Feb 2026 12:46:37 +0000 Subject: [PATCH 1/4] make writer batch timeout configurable --- bindings/cpp/src/lib.rs | 1 + crates/fluss/src/client/write/accumulator.rs | 3 ++- crates/fluss/src/config.rs | 7 +++++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 5a1b3dbd..cdbd98c2 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -613,6 +613,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { writer_acks: config.writer_acks.to_string(), writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, + writer_batch_timeout_ms: 100, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, remote_file_download_thread_num: config.remote_file_download_thread_num, scanner_log_max_poll_records: config.scanner_log_max_poll_records, diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 2c364524..0cf501cb 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -50,11 +50,12 @@ pub struct RecordAccumulator { impl RecordAccumulator { pub fn new(config: Config) -> Self { + let batch_timeout_ms = config.writer_batch_timeout_ms; RecordAccumulator { config, write_batches: Default::default(), incomplete_batches: Default::default(), - batch_timeout_ms: 500, + batch_timeout_ms, closed: Default::default(), flushes_in_progress: Default::default(), appends_in_progress: Default::default(), diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index ecf7e122..a36d9a20 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -25,6 +25,7 @@ const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; const DEFAULT_MAX_POLL_RECORDS: usize = 500; +const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_ACKS: &str = "all"; @@ -60,6 +61,11 @@ pub struct Config { /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS) #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)] pub scanner_log_max_poll_records: usize, + + /// The maximum time to wait for a writer batch to fill up before sending. + /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT) + #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)] + pub writer_batch_timeout_ms: i64, } impl Default for Config { @@ -73,6 +79,7 @@ impl Default for Config { scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, + writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, } } } From 68d1659b852d4d4d6e5338883fe33757e23d2c08 Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Tue, 24 Feb 2026 16:00:17 +0000 Subject: [PATCH 2/4] add python wiring for writer batch timeout --- bindings/python/src/config.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index fdf90b7e..fe20de93 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -60,6 +60,11 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "writer.batch-timeout-ms" => { + config.writer_batch_timeout_ms = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "scanner.remote-log.prefetch-num" => { config.scanner_remote_log_prefetch_num = value.parse::().map_err(|e| { @@ -189,6 +194,18 @@ impl Config { fn set_scanner_log_max_poll_records(&mut self, num: usize) { self.inner.scanner_log_max_poll_records = num; } + + /// Get the writer batch timeout in milliseconds + #[getter] + fn writer_batch_timeout_ms(&self) -> i64 { + self.inner.writer_batch_timeout_ms + } + + /// Set the writer batch timeout in milliseconds + #[setter] + fn set_writer_batch_timeout_ms(&mut self, timeout: i64) { + self.inner.writer_batch_timeout_ms = timeout; + } } impl Config { From 68bc98398fb1a0c94a88929687120d925df3a725 Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Wed, 25 Feb 2026 05:55:32 +0000 Subject: [PATCH 3/4] complete cross-binding wiring and docs --- bindings/cpp/include/fluss.hpp | 1 + bindings/cpp/src/ffi_converter.hpp | 1 + bindings/cpp/src/lib.rs | 3 ++- bindings/python/src/config.rs | 22 +++++++++++----------- crates/fluss/src/config.rs | 2 +- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 6b56ba20..2c02cf3a 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -985,6 +985,7 @@ struct Configuration { size_t remote_file_download_thread_num{3}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; + int64_t writer_batch_timeout_ms{100}; }; class Connection { diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 370429bc..23c17fda 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -54,6 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; + ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; return ffi_config; } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index cdbd98c2..7b659460 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -46,6 +46,7 @@ mod ffi { scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, scanner_log_max_poll_records: usize, + writer_batch_timeout_ms: i64, } struct FfiResult { @@ -613,7 +614,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { writer_acks: config.writer_acks.to_string(), writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, - writer_batch_timeout_ms: 100, + writer_batch_timeout_ms: config.writer_batch_timeout_ms, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, remote_file_download_thread_num: config.remote_file_download_thread_num, scanner_log_max_poll_records: config.scanner_log_max_poll_records, diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index fe20de93..10bf0a33 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -195,17 +195,17 @@ impl Config { self.inner.scanner_log_max_poll_records = num; } - /// Get the writer batch timeout in milliseconds - #[getter] - fn writer_batch_timeout_ms(&self) -> i64 { - self.inner.writer_batch_timeout_ms - } - - /// Set the writer batch timeout in milliseconds - #[setter] - fn set_writer_batch_timeout_ms(&mut self, timeout: i64) { - self.inner.writer_batch_timeout_ms = timeout; - } + /// Get the writer batch timeout in milliseconds + #[getter] + fn writer_batch_timeout_ms(&self) -> i64 { + self.inner.writer_batch_timeout_ms + } + + /// Set the writer batch timeout in milliseconds + #[setter] + fn set_writer_batch_timeout_ms(&mut self, timeout: i64) { + self.inner.writer_batch_timeout_ms = timeout; + } } impl Config { diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index a36d9a20..d72f499f 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -62,7 +62,7 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)] pub scanner_log_max_poll_records: usize, - /// The maximum time to wait for a writer batch to fill up before sending. + /// The maximum time to wait for a batch to be completed in milliseconds. /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT) #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)] pub writer_batch_timeout_ms: i64, From c42f07dc83125201c87e296ec67c4721a2a4697c Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Wed, 25 Feb 2026 15:23:37 +0000 Subject: [PATCH 4/4] finalize website guides and fix formatting in C++ bindings --- bindings/cpp/src/lib.rs | 3 --- .../user-guide/cpp/example/configuration.md | 1 + .../python/example/configuration.md | 19 ++++++++++--------- .../user-guide/rust/example/configuration.md | 1 + 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 92998805..b5699576 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -609,7 +609,6 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult { // Connection implementation fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { - // 1. Parse the bucket assigner type (the upstream feature) let assigner_type = match config.writer_bucket_no_key_assigner.as_str() { "round_robin" => fluss::config::NoKeyAssigner::RoundRobin, "sticky" => fluss::config::NoKeyAssigner::Sticky, @@ -619,8 +618,6 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { )); } }; - - // 2. Build the config with BOTH your timeout and the new assigner let config = fluss::config::Config { bootstrap_servers: config.bootstrap_servers.to_string(), writer_request_max_size: config.writer_request_max_size, diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md index 715e3c63..d73661ae 100644 --- a/website/docs/user-guide/cpp/example/configuration.md +++ b/website/docs/user-guide/cpp/example/configuration.md @@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10 M config.writer_acks = "all"; // Wait for all replicas config.writer_retries = std::numeric_limits::max(); // Retry on failure config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB) +config.writer_batch_timeout_ms = 100; // Max time to wait for a batch to fill config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin" config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count config.remote_file_download_thread_num = 3; // Download threads diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 466bf0dd..71e71994 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -23,15 +23,16 @@ with await fluss.FlussConnection.create(config) as conn: | Key | Description | Default | |------------------------------------|--------------------------------------------------------------------------------------|--------------------| -| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | -| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | -| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | -| `writer.retries` | Number of retries on failure | `2147483647` | -| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | -| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | -| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | -| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | -| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | +| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | +| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | +| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | +| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` | Remember to close the connection when done: diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index 92b9bf2f..7d7cc939 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -25,3 +25,4 @@ let conn = FlussConnection::new(config).await?; | `writer_retries` | Number of retries on failure | `i32::MAX` | | `writer_batch_size` | Batch size for writes | 2 MB | | `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `writer_batch_timeout_ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` |