Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,7 @@ struct Configuration {
size_t remote_file_download_thread_num{3};
// Maximum number of records returned in a single call to Poll() for LogScanner
size_t scanner_log_max_poll_records{500};
int64_t writer_batch_timeout_ms{100};
};

class Connection {
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
return ffi_config;
}

Expand Down
2 changes: 2 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod ffi {
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
}

struct FfiResult {
Expand Down Expand Up @@ -623,6 +624,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
writer_acks: config.writer_acks.to_string(),
writer_retries: config.writer_retries,
writer_batch_size: config.writer_batch_size,
writer_batch_timeout_ms: config.writer_batch_timeout_ms,
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num: config.remote_file_download_thread_num,
Expand Down
17 changes: 17 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ impl Config {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"writer.batch-timeout-ms" => {
config.writer_batch_timeout_ms = value.parse::<i64>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"scanner.remote-log.prefetch-num" => {
config.scanner_remote_log_prefetch_num =
value.parse::<usize>().map_err(|e| {
Expand Down Expand Up @@ -200,6 +205,18 @@ impl Config {
fn set_scanner_log_max_poll_records(&mut self, num: usize) {
self.inner.scanner_log_max_poll_records = num;
}

/// Get the writer batch timeout in milliseconds
#[getter]
fn writer_batch_timeout_ms(&self) -> i64 {
self.inner.writer_batch_timeout_ms
}

/// Set the writer batch timeout in milliseconds
#[setter]
fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
self.inner.writer_batch_timeout_ms = timeout;
}
}

impl Config {
Expand Down
3 changes: 2 additions & 1 deletion crates/fluss/src/client/write/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ pub struct RecordAccumulator {

impl RecordAccumulator {
pub fn new(config: Config) -> Self {
let batch_timeout_ms = config.writer_batch_timeout_ms;
RecordAccumulator {
config,
write_batches: Default::default(),
incomplete_batches: Default::default(),
batch_timeout_ms: 500,
batch_timeout_ms,
closed: Default::default(),
flushes_in_progress: Default::default(),
appends_in_progress: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;

const DEFAULT_ACKS: &str = "all";

Expand Down Expand Up @@ -84,6 +85,11 @@ pub struct Config {
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
pub scanner_log_max_poll_records: usize,

/// The maximum time to wait for a batch to be completed in milliseconds.
/// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
pub writer_batch_timeout_ms: i64,
}

impl Default for Config {
Expand All @@ -98,6 +104,7 @@ impl Default for Config {
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
}
}
}
1 change: 1 addition & 0 deletions website/docs/user-guide/cpp/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10 M
config.writer_acks = "all"; // Wait for all replicas
config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on failure
config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB)
config.writer_batch_timeout_ms = 100; // Max time to wait for a batch to fill
config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
Expand Down
19 changes: 10 additions & 9 deletions website/docs/user-guide/python/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ with await fluss.FlussConnection.create(config) as conn:

| Key | Description | Default |
|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `writer.batch-timeout-ms`          | The maximum time to wait for a writer batch to fill up before sending                | `100`              |

Remember to close the connection when done:

Expand Down
1 change: 1 addition & 0 deletions website/docs/user-guide/rust/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ let conn = FlussConnection::new(config).await?;
| `writer_retries` | Number of retries on failure | `i32::MAX` |
| `writer_batch_size` | Batch size for writes | 2 MB |
| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `writer_batch_timeout_ms` | The maximum time to wait for a writer batch to fill up before sending | `100` ms |
Loading