diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index f17cafc1..0f980b73 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -987,6 +987,7 @@ struct Configuration { size_t remote_file_download_thread_num{3}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; + int64_t writer_batch_timeout_ms{100}; }; class Connection { diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index a2e7fa26..e3763435 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -55,6 +55,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; + ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; return ffi_config; } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 9fbdc8ff..b5699576 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -47,6 +47,7 @@ mod ffi { scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, scanner_log_max_poll_records: usize, + writer_batch_timeout_ms: i64, } struct FfiResult { @@ -623,6 +624,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { writer_acks: config.writer_acks.to_string(), writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, + writer_batch_timeout_ms: config.writer_batch_timeout_ms, writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, remote_file_download_thread_num: config.remote_file_download_thread_num, diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 
75056a52..5b7f2d37 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -60,6 +60,11 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "writer.batch-timeout-ms" => { + config.writer_batch_timeout_ms = value.parse::<i64>().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "scanner.remote-log.prefetch-num" => { config.scanner_remote_log_prefetch_num = value.parse::<usize>().map_err(|e| { @@ -200,6 +205,18 @@ impl Config { fn set_scanner_log_max_poll_records(&mut self, num: usize) { self.inner.scanner_log_max_poll_records = num; } + + /// Get the writer batch timeout in milliseconds + #[getter] + fn writer_batch_timeout_ms(&self) -> i64 { + self.inner.writer_batch_timeout_ms + } + + /// Set the writer batch timeout in milliseconds + #[setter] + fn set_writer_batch_timeout_ms(&mut self, timeout: i64) { + self.inner.writer_batch_timeout_ms = timeout; + } } impl Config { diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 2c364524..0cf501cb 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -50,11 +50,12 @@ pub struct RecordAccumulator { impl RecordAccumulator { pub fn new(config: Config) -> Self { + let batch_timeout_ms = config.writer_batch_timeout_ms; RecordAccumulator { config, write_batches: Default::default(), incomplete_batches: Default::default(), - batch_timeout_ms: 500, + batch_timeout_ms, closed: Default::default(), flushes_in_progress: Default::default(), appends_in_progress: Default::default(), diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 6ff4327f..30264932 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -26,6 +26,7 @@ const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; const DEFAULT_MAX_POLL_RECORDS: 
usize = 500; +const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_ACKS: &str = "all"; @@ -84,6 +85,11 @@ pub struct Config { /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS) #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)] pub scanner_log_max_poll_records: usize, + + /// The maximum time to wait for a batch to be completed in milliseconds. + /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT) + #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)] + pub writer_batch_timeout_ms: i64, } impl Default for Config { @@ -98,6 +104,7 @@ impl Default for Config { scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, + writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, } } } diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md index 715e3c63..d73661ae 100644 --- a/website/docs/user-guide/cpp/example/configuration.md +++ b/website/docs/user-guide/cpp/example/configuration.md @@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10 M config.writer_acks = "all"; // Wait for all replicas config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on failure config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB) +config.writer_batch_timeout_ms = 100; // Max time to wait for a batch to fill config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin" config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count config.remote_file_download_thread_num = 3; // Download threads diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 466bf0dd..71e71994 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -23,15 
+23,16 @@ with await fluss.FlussConnection.create(config) as conn: | Key | Description | Default | |------------------------------------|--------------------------------------------------------------------------------------|--------------------| -| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | -| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | -| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | -| `writer.retries` | Number of retries on failure | `2147483647` | -| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | -| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | -| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | -| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | -| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | +| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | +| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | +| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | +| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. 
| `100` | Remember to close the connection when done: diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index 92b9bf2f..7d7cc939 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -25,3 +25,4 @@ let conn = FlussConnection::new(config).await?; | `writer_retries` | Number of retries on failure | `i32::MAX` | | `writer_batch_size` | Batch size for writes | 2 MB | | `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `writer_batch_timeout_ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` |