Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ struct Configuration {
size_t scanner_remote_log_prefetch_num{4};
// Number of threads for downloading remote log data
size_t remote_file_download_thread_num{3};
// Remote log read concurrency within one file (streaming read path)
size_t scanner_remote_log_read_concurrency{4};
// Maximum number of records returned in a single call to Poll() for LogScanner
size_t scanner_log_max_poll_records{500};
int64_t writer_batch_timeout_ms{100};
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner);
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency;
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
return ffi_config;
Expand Down
6 changes: 4 additions & 2 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod ffi {
writer_bucket_no_key_assigner: String,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
scanner_remote_log_read_concurrency: usize,
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
}
Expand Down Expand Up @@ -618,7 +619,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
));
}
};
let config = fluss::config::Config {
let config_core = fluss::config::Config {
bootstrap_servers: config.bootstrap_servers.to_string(),
writer_request_max_size: config.writer_request_max_size,
writer_acks: config.writer_acks.to_string(),
Expand All @@ -628,10 +629,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num: config.remote_file_download_thread_num,
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
};

let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await });
let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await });

match conn {
Ok(c) => {
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class Config:
@remote_file_download_thread_num.setter
def remote_file_download_thread_num(self, num: int) -> None: ...
@property
def scanner_remote_log_read_concurrency(self) -> int: ...
@scanner_remote_log_read_concurrency.setter
def scanner_remote_log_read_concurrency(self, num: int) -> None: ...
@property
def scanner_log_max_poll_records(self) -> int: ...
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
Expand Down
20 changes: 20 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ impl Config {
))
})?;
}
"scanner.remote-log.read-concurrency" => {
config.scanner_remote_log_read_concurrency =
value.parse::<usize>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"scanner.log.max-poll-records" => {
config.scanner_log_max_poll_records =
value.parse::<usize>().map_err(|e| {
Expand Down Expand Up @@ -194,6 +202,18 @@ impl Config {
self.inner.remote_file_download_thread_num = num;
}

    /// Get the scanner remote log read concurrency.
    ///
    /// This is the intra-file concurrency used by the streaming reader when
    /// downloading a single remote log segment (config key
    /// `scanner.remote-log.read-concurrency`).
    #[getter]
    fn scanner_remote_log_read_concurrency(&self) -> usize {
        self.inner.scanner_remote_log_read_concurrency
    }

    /// Set the scanner remote log read concurrency.
    ///
    /// Stores the value as-is; no validation is performed here.
    /// NOTE(review): the core downloader appears to clamp this to at least 1
    /// when it is used — confirm whether a value of 0 should be rejected here.
    #[setter]
    fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) {
        self.inner.scanner_remote_log_read_concurrency = num;
    }

/// Get the scanner log max poll records
#[getter]
fn scanner_log_max_poll_records(&self) -> usize {
Expand Down
103 changes: 62 additions & 41 deletions crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use futures::TryStreamExt;
use parking_lot::Mutex;
use std::{
cmp::{Ordering, Reverse, min},
cmp::{Ordering, Reverse},
collections::{BinaryHeap, HashMap},
future::Future,
io, mem,
Expand Down Expand Up @@ -293,6 +294,7 @@ enum DownloadResult {
struct ProductionFetcher {
credentials_rx: CredentialsReceiver,
local_log_dir: Arc<TempDir>,
remote_log_read_concurrency: usize,
}

impl RemoteLogFetcher for ProductionFetcher {
Expand All @@ -302,6 +304,7 @@ impl RemoteLogFetcher for ProductionFetcher {
) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
let mut credentials_rx = self.credentials_rx.clone();
let local_log_dir = self.local_log_dir.clone();
let remote_log_read_concurrency = self.remote_log_read_concurrency;

// Clone data needed for async operation to avoid lifetime issues
let segment = request.segment.clone();
Expand Down Expand Up @@ -361,6 +364,7 @@ impl RemoteLogFetcher for ProductionFetcher {
&remote_path,
&local_file_path,
&remote_fs_props,
remote_log_read_concurrency,
)
.await?;

Expand Down Expand Up @@ -768,11 +772,13 @@ impl RemoteLogDownloader {
local_log_dir: TempDir,
max_prefetch_segments: usize,
max_concurrent_downloads: usize,
remote_log_read_concurrency: usize,
credentials_rx: CredentialsReceiver,
) -> Result<Self> {
let fetcher = Arc::new(ProductionFetcher {
credentials_rx,
local_log_dir: Arc::new(local_log_dir),
remote_log_read_concurrency: remote_log_read_concurrency.max(1),
});

Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)
Expand Down Expand Up @@ -848,12 +854,13 @@ impl Drop for RemoteLogDownloader {
}

impl RemoteLogDownloader {
/// Download a file from remote storage to local using streaming read/write
/// Download a file from remote storage to local using streaming read/write.
async fn download_file(
remote_log_tablet_dir: &str,
remote_path: &str,
local_path: &Path,
remote_fs_props: &HashMap<String, String>,
remote_log_read_concurrency: usize,
) -> Result<PathBuf> {
Comment on lines 860 to 864
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment at line 862 states "using streaming read/write" but the function now supports both streaming and range-based downloads depending on the streaming_read parameter. Consider updating the comment to reflect this, such as "Download a file from remote storage to local using either streaming or range-based read/write".

Copilot uses AI. Check for mistakes.
// Handle both URL (e.g., "s3://bucket/path") and local file paths
// If the path doesn't contain "://", treat it as a local file path
Expand Down Expand Up @@ -886,56 +893,70 @@ impl RemoteLogDownloader {

// Timeout for remote storage operations (30 seconds)
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB

Self::download_file_streaming(
&op,
relative_path,
remote_path,
local_path,
CHUNK_SIZE,
remote_log_read_concurrency,
REMOTE_OP_TIMEOUT,
)
.await?;

// Get file metadata to know the size with timeout
let meta = op.stat(relative_path).await?;
let file_size = meta.content_length();
Ok(local_path.to_path_buf())
}

// Create local file for writing
async fn download_file_streaming(
op: &opendal::Operator,
relative_path: &str,
remote_path: &str,
local_path: &Path,
chunk_size: usize,
streaming_read_concurrency: usize,
remote_op_timeout: Duration,
) -> Result<()> {
let mut local_file = tokio::fs::File::create(local_path).await?;

// Stream data from remote to local file in chunks
// opendal::Reader::read accepts a range, so we read in chunks
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
let mut offset = 0u64;
let mut chunk_count = 0u64;
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
let reader_future = op
.reader_with(relative_path)
.chunk(chunk_size)
.concurrent(streaming_read_concurrency);
let reader = tokio::time::timeout(remote_op_timeout, reader_future)
.await
.map_err(|e| Error::IoUnexpectedError {
message: format!("Timeout creating streaming reader for {remote_path}: {e}."),
source: io::ErrorKind::TimedOut.into(),
})??;

let mut stream = tokio::time::timeout(remote_op_timeout, reader.into_bytes_stream(..))
.await
.map_err(|e| Error::IoUnexpectedError {
message: format!("Timeout creating streaming bytes stream for {remote_path}: {e}."),
source: io::ErrorKind::TimedOut.into(),
})??;

while offset < file_size {
let end = min(offset + CHUNK_SIZE, file_size);
let range = offset..end;
let mut chunk_count = 0u64;
while let Some(chunk) = tokio::time::timeout(remote_op_timeout, stream.try_next())
.await
.map_err(|e| Error::IoUnexpectedError {
message: format!(
"Timeout streaming chunk from remote storage: {remote_path}, exception: {e}."
),
source: io::ErrorKind::TimedOut.into(),
})??
{
chunk_count += 1;

if chunk_count <= 3 || chunk_count % 10 == 0 {
log::debug!(
"Remote log download: reading chunk {chunk_count}/{total_chunks} (offset {offset})"
);
log::debug!("Remote log streaming download: chunk #{chunk_count} ({remote_path})");
}

// Read chunk from remote storage with timeout
let read_future = op.read_with(relative_path).range(range.clone());
let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
.await
.map_err(|e| {
Error::IoUnexpectedError {
message: format!(
"Timeout reading chunk from remote storage: {remote_path} at offset {offset}, exception: {e}."
),
source: io::ErrorKind::TimedOut.into(),
}
})??;
let bytes = chunk.to_bytes();

// Write chunk to local file
local_file.write_all(&bytes).await?;

offset = end;
local_file.write_all(&chunk).await?;
}

// Ensure all data is flushed to disk
local_file.sync_all().await?;

Ok(local_path.to_path_buf())
Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ impl LogFetcher {
tmp_dir,
config.scanner_remote_log_prefetch_num,
config.remote_file_download_thread_num,
config.scanner_remote_log_read_concurrency,
credentials_rx,
)?);

Expand Down
7 changes: 7 additions & 0 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;

Expand Down Expand Up @@ -81,6 +82,11 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
pub remote_file_download_thread_num: usize,

    /// Intra-file remote log read concurrency for each remote segment download.
    /// The download path always uses a streaming reader.
#[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
pub scanner_remote_log_read_concurrency: usize,

/// Maximum number of records returned in a single call to poll() for LogScanner.
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
Expand All @@ -103,6 +109,7 @@ impl Default for Config {
writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
}
Expand Down
2 changes: 2 additions & 0 deletions website/docs/user-guide/cpp/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ config.writer_batch_timeout_ms = 100; // Max time to wait for a batch
config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
config.scanner_remote_log_read_concurrency = 4; // In-file remote log read concurrency
config.scanner_log_max_poll_records = 500; // Max records returned per poll()
```
25 changes: 13 additions & 12 deletions website/docs/user-guide/python/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ with await fluss.FlussConnection.create(config) as conn:

## Connection Configurations

| Key | Description | Default |
|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` |
| Key | Description | Default |
|---------------------------------------|---------------------------------------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer.retries` | Number of retries on failure | `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` |
| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |

Remember to close the connection when done:

Expand Down
6 changes: 5 additions & 1 deletion website/docs/user-guide/rust/example/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@ let conn = FlussConnection::new(config).await?;
| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer_retries` | Number of retries on failure | `i32::MAX` |
| `writer_batch_size` | Batch size for writes | 2 MB |
| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `writer_batch_timeout_ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` |
| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner_remote_log_prefetch_num` | Number of remote log segments to prefetch | `4` |
| `remote_file_download_thread_num` | Number of concurrent remote log file downloads | `3` |
| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a remote log file | `4` |
| `scanner_log_max_poll_records` | Maximum records returned in a single `poll()` | `500` |
Loading