Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,8 @@ struct Configuration {
int32_t writer_retries{std::numeric_limits<int32_t>::max()};
// Writer batch size in bytes (2 MB)
int32_t writer_batch_size{2 * 1024 * 1024};
// Bucket assigner for tables without bucket keys: "sticky" or "round_robin"
std::string writer_bucket_no_key_assigner{"sticky"};
// Number of remote log batches to prefetch during scanning
size_t scanner_remote_log_prefetch_num{4};
// Number of threads for downloading remote log data
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.writer_acks = rust::String(config.writer_acks);
ffi_config.writer_retries = config.writer_retries;
ffi_config.writer_batch_size = config.writer_batch_size;
ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner);
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
Expand Down
11 changes: 11 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod ffi {
writer_acks: String,
writer_retries: i32,
writer_batch_size: i32,
writer_bucket_no_key_assigner: String,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
scanner_log_max_poll_records: usize,
Expand Down Expand Up @@ -607,12 +608,22 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult {

// Connection implementation
fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
"round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
"sticky" => fluss::config::NoKeyAssigner::Sticky,
other => {
return Err(format!(
"Unknown bucket assigner type: '{other}', expected 'sticky' or 'round_robin'"
));
}
};
let config = fluss::config::Config {
bootstrap_servers: config.bootstrap_servers.to_string(),
writer_request_max_size: config.writer_request_max_size,
writer_acks: config.writer_acks.to_string(),
writer_retries: config.writer_retries,
writer_batch_size: config.writer_batch_size,
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num: config.remote_file_download_thread_num,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
Expand Down
11 changes: 11 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ impl Config {
))
})?;
}
"writer.bucket.no-key-assigner" => {
config.writer_bucket_no_key_assigner = match value.as_str() {
"round_robin" => fcore::config::NoKeyAssigner::RoundRobin,
"sticky" => fcore::config::NoKeyAssigner::Sticky,
other => {
return Err(FlussError::new_err(format!(
"Unknown bucket assigner type: {other}, expected 'sticky' or 'round_robin'"
)));
}
};
}
_ => {
return Err(FlussError::new_err(format!("Unknown property: {key}")));
}
Expand Down
66 changes: 66 additions & 0 deletions crates/fluss/src/client/write/bucket_assigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,44 @@ impl BucketAssigner for StickyBucketAssigner {
}
}

/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
/// in a rotating sequence, providing even data distribution across all buckets.
pub struct RoundRobinBucketAssigner {
    // Table this assigner serves; used to look up currently available buckets.
    table_path: Arc<PhysicalTablePath>,
    // Total logical bucket count, used as the modulus when no bucket is available.
    num_buckets: i32,
    // Rotating sequence number; seeded with a random value at construction.
    counter: AtomicI32,
}

impl RoundRobinBucketAssigner {
    /// Creates a round-robin assigner for `table_path` over `num_buckets` buckets.
    /// The rotation counter starts at a random value.
    pub fn new(table_path: Arc<PhysicalTablePath>, num_buckets: i32) -> Self {
        let start = rand::rng().random();
        Self {
            table_path,
            num_buckets,
            counter: AtomicI32::new(start),
        }
    }
}

impl BucketAssigner for RoundRobinBucketAssigner {
    /// Returns `false`: a full batch is never aborted for reassignment.
    fn abort_if_batch_full(&self) -> bool {
        false
    }

    /// Round-robin keeps no per-batch state, so a new batch is a no-op.
    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}

    /// Picks the next bucket in rotation. The bucket key is ignored.
    fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result<i32> {
        // `& i32::MAX` clears the sign bit so the modulo below stays
        // non-negative even after the counter wraps around.
        let seq = self.counter.fetch_add(1, Ordering::Relaxed) & i32::MAX;
        let candidates = cluster.get_available_buckets_for_table_path(&self.table_path);
        if candidates.is_empty() {
            // No bucket currently reported available: rotate over the
            // full logical bucket range instead.
            return Ok(seq % self.num_buckets);
        }
        let chosen = &candidates[(seq % candidates.len() as i32) as usize];
        Ok(chosen.bucket_id())
    }
}

/// A [BucketAssigner] which assigns based on a modulo hashing function
pub struct HashBucketAssigner {
num_buckets: i32,
Expand Down Expand Up @@ -173,6 +211,34 @@ mod tests {
assert!((0..2).contains(&next_bucket));
}

#[test]
fn round_robin_assigner_cycles_through_buckets() {
    let table_path = TablePath::new("db".to_string(), "tbl".to_string());
    let num_buckets = 3;
    let cluster = build_cluster(&table_path, 1, num_buckets);
    let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
    let assigner = RoundRobinBucketAssigner::new(physical, num_buckets);

    // Collect two full rotations worth of assignments.
    let assigned: Vec<i32> = (0..num_buckets * 2)
        .map(|_| assigner.assign_bucket(None, &cluster).expect("bucket"))
        .collect();

    for bucket in &assigned {
        assert!((0..num_buckets).contains(bucket));
    }
    // The second rotation must repeat the first, position for position.
    assert_eq!(assigned[..3], assigned[3..6]);
}

#[test]
fn round_robin_assigner_does_not_abort_on_batch_full() {
    // Round-robin switches buckets per record, so a full batch is never aborted.
    let physical = Arc::new(PhysicalTablePath::of(Arc::new(TablePath::new(
        "db".to_string(),
        "tbl".to_string(),
    ))));
    let assigner = RoundRobinBucketAssigner::new(physical, 3);
    assert!(!assigner.abort_if_batch_full());
}

#[test]
fn hash_bucket_assigner_requires_key() {
let assigner = HashBucketAssigner::new(3, <dyn BucketingFunction>::of(None));
Expand Down
30 changes: 23 additions & 7 deletions crates/fluss/src/client/write/writer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use crate::BucketId;
use crate::bucketing::BucketingFunction;
use crate::client::metadata::Metadata;
use crate::client::write::bucket_assigner::{
BucketAssigner, HashBucketAssigner, StickyBucketAssigner,
BucketAssigner, HashBucketAssigner, RoundRobinBucketAssigner, StickyBucketAssigner,
};
use crate::client::write::sender::Sender;
use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
use crate::config::Config;
use crate::config::NoKeyAssigner;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableInfo};
use bytes::Bytes;
Expand Down Expand Up @@ -99,7 +100,12 @@ impl WriterClient {
let (bucket_assigner, bucket_id) =
self.assign_bucket(&record.table_info, bucket_key, physical_table_path)?;

let mut result = self.accumulate.append(record, bucket_id, &cluster, true)?;
let mut result = self.accumulate.append(
record,
bucket_id,
&cluster,
bucket_assigner.abort_if_batch_full(),
)?;

if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
Expand All @@ -125,10 +131,14 @@ impl WriterClient {
if let Some(assigner) = self.bucket_assigners.get(table_path) {
assigner.clone()
} else {
let assigner =
Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
let assigner = Self::create_bucket_assigner(
table_info,
Arc::clone(table_path),
bucket_key,
&self.config,
)?;
self.bucket_assigners
.insert(Arc::clone(table_path), Arc::clone(&assigner.clone()));
.insert(Arc::clone(table_path), Arc::clone(&assigner));
assigner
}
};
Expand Down Expand Up @@ -164,6 +174,7 @@ impl WriterClient {
table_info: &Arc<TableInfo>,
table_path: Arc<PhysicalTablePath>,
bucket_key: Option<&Bytes>,
config: &Config,
) -> Result<Arc<dyn BucketAssigner>> {
if bucket_key.is_some() {
let datalake_format = table_info.get_table_config().get_datalake_format()?;
Expand All @@ -173,8 +184,13 @@ impl WriterClient {
function,
)))
} else {
// TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
Ok(Arc::new(StickyBucketAssigner::new(table_path)))
match config.writer_bucket_no_key_assigner {
NoKeyAssigner::Sticky => Ok(Arc::new(StickyBucketAssigner::new(table_path))),
NoKeyAssigner::RoundRobin => Ok(Arc::new(RoundRobinBucketAssigner::new(
table_path,
table_info.num_buckets,
))),
}
}
}
}
27 changes: 26 additions & 1 deletion crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use clap::Parser;
use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
use std::fmt;

const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024;
Expand All @@ -28,6 +29,26 @@ const DEFAULT_MAX_POLL_RECORDS: usize = 500;

const DEFAULT_ACKS: &str = "all";

/// Bucket assigner strategy for tables without bucket keys.
/// Matches Java `client.writer.bucket.no-key-assigner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum NoKeyAssigner {
    /// Sticks to one bucket until the batch is full, then switches.
    Sticky,
    /// Assigns each record to the next bucket in a rotating sequence.
    RoundRobin,
}

impl fmt::Display for NoKeyAssigner {
    /// Renders the snake_case configuration form ("sticky" / "round_robin"),
    /// matching the serde rename rule above.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            NoKeyAssigner::Sticky => write!(f, "sticky"),
            NoKeyAssigner::RoundRobin => write!(f, "round_robin"),
        }
    }
}

impl std::str::FromStr for NoKeyAssigner {
    type Err = String;

    /// Parses the snake_case configuration value; the inverse of `Display`.
    ///
    /// Centralizing parsing here lets language bindings (C++ `lib.rs`,
    /// Python `config.rs`) reuse one implementation instead of duplicating
    /// the string match.
    ///
    /// # Errors
    /// Returns a descriptive message for any value other than
    /// `"sticky"` or `"round_robin"`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "sticky" => Ok(NoKeyAssigner::Sticky),
            "round_robin" => Ok(NoKeyAssigner::RoundRobin),
            other => Err(format!(
                "Unknown bucket assigner type: '{other}', expected 'sticky' or 'round_robin'"
            )),
        }
    }
}

#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {
Expand All @@ -46,6 +67,9 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
pub writer_batch_size: i32,

#[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
pub writer_bucket_no_key_assigner: NoKeyAssigner,

/// Maximum number of remote log segments to prefetch
/// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
#[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
Expand All @@ -70,6 +94,7 @@ impl Default for Config {
writer_acks: String::from(DEFAULT_ACKS),
writer_retries: i32::MAX,
writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
Expand Down
1 change: 1 addition & 0 deletions website/docs/user-guide/cpp/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10 M
config.writer_acks = "all"; // Wait for all replicas
config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on failure
config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB)
config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
```
21 changes: 11 additions & 10 deletions website/docs/user-guide/python/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ with await fluss.FlussConnection.create(config) as conn:

## Connection Configurations

| Key | Description | Default |
|---------------------|-------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| Key | Description | Default |
|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |

Remember to close the connection when done:

Expand Down
15 changes: 8 additions & 7 deletions website/docs/user-guide/rust/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ let conn = FlussConnection::new(config).await?;

## Connection Configurations

| Option | Description | Default |
|---------------------------|-------------------------------------------------------|------------------|
| `bootstrap_servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer_request_max_size` | Maximum request size in bytes | 10 MB |
| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer_retries` | Number of retries on failure | `i32::MAX` |
| `writer_batch_size` | Batch size for writes | 2 MB |
| Option | Description | Default |
|---------------------------------|--------------------------------------------------------------------------------------|------------------|
| `bootstrap_servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer_request_max_size` | Maximum request size in bytes | 10 MB |
| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer_retries` | Number of retries on failure | `i32::MAX` |
| `writer_batch_size` | Batch size for writes | 2 MB |
| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
Loading