From 42f6ba45788cafeb3b23eb8c364766ef47333ef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Mon, 16 Feb 2026 12:55:34 +0800 Subject: [PATCH 1/7] perf: add configurable streaming remote-log download --- bindings/cpp/src/lib.rs | 21 ++- crates/fluss/src/client/table/remote_log.rs | 138 ++++++++++++++++---- crates/fluss/src/client/table/scanner.rs | 2 + crates/fluss/src/config.rs | 26 ++++ 4 files changed, 148 insertions(+), 39 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index fab8edff..441bb7af 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -485,17 +485,16 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult { // Connection implementation fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { - let config = fluss::config::Config { - bootstrap_servers: config.bootstrap_servers.to_string(), - writer_request_max_size: config.writer_request_max_size, - writer_acks: config.writer_acks.to_string(), - writer_retries: config.writer_retries, - writer_batch_size: config.writer_batch_size, - scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, - remote_file_download_thread_num: config.remote_file_download_thread_num, - }; - - let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await }); + let mut config_core = fluss::config::Config::default(); + config_core.bootstrap_servers = config.bootstrap_servers.to_string(); + config_core.writer_request_max_size = config.writer_request_max_size; + config_core.writer_acks = config.writer_acks.to_string(); + config_core.writer_retries = config.writer_retries; + config_core.writer_batch_size = config.writer_batch_size; + config_core.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; + config_core.remote_file_download_thread_num = config.remote_file_download_thread_num; + + let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); match conn { Ok(c) => { diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 02820d93..cef5b10d 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -19,6 +19,7 @@ use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use futures::TryStreamExt; use parking_lot::Mutex; use std::{ cmp::{Ordering, Reverse, min}, @@ -293,6 +294,8 @@ enum DownloadResult { struct ProductionFetcher { credentials_rx: CredentialsReceiver, local_log_dir: Arc, + streaming_read: bool, + streaming_read_concurrency: usize, } impl RemoteLogFetcher for ProductionFetcher { @@ -302,6 +305,8 @@ impl RemoteLogFetcher for ProductionFetcher { ) -> Pin> + Send>> { let mut credentials_rx = self.credentials_rx.clone(); let local_log_dir = self.local_log_dir.clone(); + let streaming_read = self.streaming_read; + let streaming_read_concurrency = self.streaming_read_concurrency; // Clone data needed for async operation to avoid lifetime issues let segment = request.segment.clone(); @@ -361,6 +366,8 @@ impl RemoteLogFetcher for ProductionFetcher { &remote_path, &local_file_path, &remote_fs_props, + streaming_read, + streaming_read_concurrency, ) .await?; @@ -768,11 +775,15 @@ impl RemoteLogDownloader { local_log_dir: TempDir, max_prefetch_segments: usize, max_concurrent_downloads: usize, + streaming_read: bool, + streaming_read_concurrency: usize, credentials_rx: CredentialsReceiver, ) -> Result { let fetcher = Arc::new(ProductionFetcher { credentials_rx, local_log_dir: Arc::new(local_log_dir), + streaming_read, + streaming_read_concurrency: streaming_read_concurrency.max(1), }); Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) @@ -854,6 +865,8 @@ impl RemoteLogDownloader { remote_path: &str, local_path: &Path, remote_fs_props: &HashMap, + streaming_read: bool, + streaming_read_concurrency: usize, ) -> Result { // Handle both URL (e.g., "s3://bucket/path") and local file paths // If the path doesn't contain "://", treat it as a local file path @@ -886,56 +899,125 @@ impl RemoteLogDownloader { // Timeout for remote storage operations (30 seconds) const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30); + const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB + + if streaming_read { + Self::download_file_streaming( + &op, + relative_path, + remote_path, + local_path, + CHUNK_SIZE, + streaming_read_concurrency.max(1), + REMOTE_OP_TIMEOUT, + ) + .await?; + } else { + Self::download_file_by_range( + &op, + relative_path, + remote_path, + local_path, + CHUNK_SIZE as u64, + REMOTE_OP_TIMEOUT, + ) + .await?; + } - // Get file metadata to know the size with timeout - let meta = op.stat(relative_path).await?; - let file_size = meta.content_length(); + Ok(local_path.to_path_buf()) + } - // Create local file for writing + async fn download_file_streaming( + op: &opendal::Operator, + relative_path: &str, + remote_path: &str, + local_path: &Path, + chunk_size: usize, + streaming_read_concurrency: usize, + remote_op_timeout: Duration, + ) -> Result<()> { let mut local_file = tokio::fs::File::create(local_path).await?; - // Stream data from remote to local file in chunks - // opendal::Reader::read accepts a range, so we read in chunks - const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading + let reader_future = op + .reader_with(relative_path) + .chunk(chunk_size) + .concurrent(streaming_read_concurrency); + let reader = tokio::time::timeout(remote_op_timeout, reader_future) + .await + .map_err(|e| Error::IoUnexpectedError { + message: format!("Timeout creating streaming reader for {remote_path}: {e}."), + source: io::ErrorKind::TimedOut.into(), + })??; + + let mut stream = tokio::time::timeout(remote_op_timeout, reader.into_bytes_stream(..)) + .await + .map_err(|e| Error::IoUnexpectedError { + message: format!("Timeout creating streaming bytes stream for {remote_path}: {e}."), + source: io::ErrorKind::TimedOut.into(), + })??; + + let mut chunk_count = 0u64; + while let Some(chunk) = tokio::time::timeout(remote_op_timeout, stream.try_next()) + .await + .map_err(|e| Error::IoUnexpectedError { + message: format!( + "Timeout streaming chunk from remote storage: {remote_path}, exception: {e}." + ), + source: io::ErrorKind::TimedOut.into(), + })?? + { + chunk_count += 1; + if chunk_count <= 3 || chunk_count % 10 == 0 { + log::debug!("Remote log streaming download: chunk #{chunk_count} ({remote_path})"); + } + local_file.write_all(&chunk).await?; + } + + local_file.sync_all().await?; + Ok(()) + } + + async fn download_file_by_range( + op: &opendal::Operator, + relative_path: &str, + remote_path: &str, + local_path: &Path, + chunk_size: u64, + remote_op_timeout: Duration, + ) -> Result<()> { + let meta = op.stat(relative_path).await?; + let file_size = meta.content_length(); + let mut local_file = tokio::fs::File::create(local_path).await?; + let total_chunks = file_size.div_ceil(chunk_size); let mut offset = 0u64; let mut chunk_count = 0u64; - let total_chunks = file_size.div_ceil(CHUNK_SIZE); while offset < file_size { - let end = min(offset + CHUNK_SIZE, file_size); + let end = min(offset + chunk_size, file_size); let range = offset..end; chunk_count += 1; if chunk_count <= 3 || chunk_count % 10 == 0 { log::debug!( - "Remote log download: reading chunk {chunk_count}/{total_chunks} (offset {offset})" + "Remote log range download: reading chunk {chunk_count}/{total_chunks} (offset {offset})" ); } - // Read chunk from remote storage with timeout - let read_future = op.read_with(relative_path).range(range.clone()); - let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future) + let read_future = op.read_with(relative_path).range(range); + let chunk = tokio::time::timeout(remote_op_timeout, read_future) .await - .map_err(|e| { - Error::IoUnexpectedError { - message: format!( - "Timeout reading chunk from remote storage: {remote_path} at offset {offset}, exception: {e}." - ), - source: io::ErrorKind::TimedOut.into(), - } + .map_err(|e| Error::IoUnexpectedError { + message: format!( + "Timeout reading chunk from remote storage: {remote_path} at offset {offset}, exception: {e}." + ), + source: io::ErrorKind::TimedOut.into(), })??; - let bytes = chunk.to_bytes(); - - // Write chunk to local file - local_file.write_all(&bytes).await?; - + local_file.write_all(&chunk.to_bytes()).await?; offset = end; } - // Ensure all data is flushed to disk local_file.sync_all().await?; - - Ok(local_path.to_path_buf()) + Ok(()) } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 0900267b..03291745 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -676,6 +676,8 @@ impl LogFetcher { tmp_dir, config.scanner_remote_log_prefetch_num, config.remote_file_download_thread_num, + config.scanner_remote_log_streaming_read, + config.scanner_remote_log_streaming_read_concurrency, credentials_rx, )?); diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 92f0b0d9..4df15592 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -24,9 +24,19 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; +const DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ: bool = true; +const DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY: usize = 4; const DEFAULT_ACKS: &str = "all"; +fn default_scanner_remote_log_streaming_read() -> bool { + DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ +} + +fn default_scanner_remote_log_streaming_read_concurrency() -> usize { + DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY +} + #[derive(Parser, Debug, Clone, Deserialize, Serialize)] #[command(author, version, about, long_about = None)] pub struct Config { @@ -54,6 +64,19 @@ pub struct Config { /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM) #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)] pub remote_file_download_thread_num: usize, + + /// Whether to use opendal streaming reader path for remote log downloads. + #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ)] + #[serde(default = "default_scanner_remote_log_streaming_read")] + pub scanner_remote_log_streaming_read: bool, + + /// Intra-file streaming read concurrency for each remote segment download. + #[arg( + long, + default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY + )] + #[serde(default = "default_scanner_remote_log_streaming_read_concurrency")] + pub scanner_remote_log_streaming_read_concurrency: usize, } impl Default for Config { @@ -66,6 +89,9 @@ impl Default for Config { writer_batch_size: DEFAULT_WRITER_BATCH_SIZE, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, + scanner_remote_log_streaming_read: DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ, + scanner_remote_log_streaming_read_concurrency: + DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY, } } } From ebfd12d6b26f2df26a4c85f760357d4530b43785 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 27 Feb 2026 02:33:34 +0000 Subject: [PATCH 2/7] cpp: expose remote log streaming download config --- bindings/cpp/include/fluss.hpp | 4 ++++ bindings/cpp/src/ffi_converter.hpp | 4 ++++ bindings/cpp/src/lib.rs | 22 ++++++++++++++-------- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 30a8636b..23afb62a 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -954,6 +954,10 @@ struct Configuration { size_t scanner_remote_log_prefetch_num{4}; // Number of threads for downloading remote log data size_t remote_file_download_thread_num{3}; + // Whether to use streaming reads for remote log download + bool scanner_remote_log_streaming_read{true}; + // Streaming read concurrency within one remote log file + size_t scanner_remote_log_streaming_read_concurrency{4}; }; class Connection { diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 3c918e56..edbd7ace 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -136,6 +136,10 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_batch_size = config.writer_batch_size; ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; + ffi_config.scanner_remote_log_streaming_read = + config.scanner_remote_log_streaming_read; + ffi_config.scanner_remote_log_streaming_read_concurrency = + config.scanner_remote_log_streaming_read_concurrency; return ffi_config; } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 441bb7af..5596027c 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -45,6 +45,8 @@ mod ffi { writer_batch_size: i32, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, + scanner_remote_log_streaming_read: bool, + scanner_remote_log_streaming_read_concurrency: usize, } struct FfiResult { @@ -485,14 +487,18 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult { // Connection implementation fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { - let mut config_core = fluss::config::Config::default(); - config_core.bootstrap_servers = config.bootstrap_servers.to_string(); - config_core.writer_request_max_size = config.writer_request_max_size; - config_core.writer_acks = config.writer_acks.to_string(); - config_core.writer_retries = config.writer_retries; - config_core.writer_batch_size = config.writer_batch_size; - config_core.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; - config_core.remote_file_download_thread_num = config.remote_file_download_thread_num; + let config_core = fluss::config::Config { + bootstrap_servers: config.bootstrap_servers.to_string(), + writer_request_max_size: config.writer_request_max_size, + writer_acks: config.writer_acks.to_string(), + writer_retries: config.writer_retries, + writer_batch_size: config.writer_batch_size, + scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, + remote_file_download_thread_num: config.remote_file_download_thread_num, + scanner_remote_log_streaming_read: config.scanner_remote_log_streaming_read, + scanner_remote_log_streaming_read_concurrency: config + .scanner_remote_log_streaming_read_concurrency, + }; let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); From 5f6ab2b3185d34887af0db29609dbad8edc45a74 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 27 Feb 2026 03:03:37 +0000 Subject: [PATCH 3/7] config: allow explicit bool value for streaming read flag --- crates/fluss/src/config.rs | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 4df15592..cd85bca1 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use clap::Parser; +use clap::{ArgAction, Parser}; use serde::{Deserialize, Serialize}; const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123"; @@ -66,7 +66,13 @@ pub struct Config { pub remote_file_download_thread_num: usize, /// Whether to use opendal streaming reader path for remote log downloads. - #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ)] + #[arg( + long, + default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ, + action = ArgAction::Set, + num_args = 0..=1, + default_missing_value = "true" + )] #[serde(default = "default_scanner_remote_log_streaming_read")] pub scanner_remote_log_streaming_read: bool, @@ -95,3 +101,27 @@ impl Default for Config { } } } + +#[cfg(test)] +mod tests { + use super::Config; + use clap::Parser; + + #[test] + fn parse_streaming_read_defaults_to_true() { + let config = Config::parse_from(["prog"]); + assert!(config.scanner_remote_log_streaming_read); + } + + #[test] + fn parse_streaming_read_accepts_false() { + let config = Config::parse_from(["prog", "--scanner-remote-log-streaming-read", "false"]); + assert!(!config.scanner_remote_log_streaming_read); + } + + #[test] + fn parse_streaming_read_flag_without_value_means_true() { + let config = Config::parse_from(["prog", "--scanner-remote-log-streaming-read"]); + assert!(config.scanner_remote_log_streaming_read); + } +} From 21d869ef18adf7a1e2f340d4f142c2ed331f65ab Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 27 Feb 2026 03:24:45 +0000 Subject: [PATCH 4/7] address comments --- crates/fluss/src/client/table/remote_log.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index cef5b10d..e0fea0ab 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -859,7 +859,7 @@ impl Drop for RemoteLogDownloader { } impl RemoteLogDownloader { - /// Download a file from remote storage to local using streaming read/write + /// Download a file from remote storage to local using either streaming or range read/write. async fn download_file( remote_log_tablet_dir: &str, remote_path: &str, @@ -908,7 +908,7 @@ impl RemoteLogDownloader { remote_path, local_path, CHUNK_SIZE, - streaming_read_concurrency.max(1), + streaming_read_concurrency, REMOTE_OP_TIMEOUT, ) .await?; From 0b405dfa15fabcfa2c443a45026a58ac5ae4752d Mon Sep 17 00:00:00 2001 From: AlexZhao Date: Fri, 27 Feb 2026 12:41:35 +0800 Subject: [PATCH 5/7] Update bindings/cpp/include/fluss.hpp Co-authored-by: yuxia Luo --- bindings/cpp/include/fluss.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 884cec81..78bf1754 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -988,7 +988,7 @@ struct Configuration { // Whether to use streaming reads for remote log download bool scanner_remote_log_streaming_read{true}; // Streaming read concurrency within one remote log file - size_t scanner_remote_log_streaming_read_concurrency{4}; + size_t scanner_remote_log_read_concurrency{4}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; }; From 61132ab5ad7e0ee92efc0e684a83f76343cce3a0 Mon Sep 17 00:00:00 2001 From: zhaohaidao Date: Fri, 27 Feb 2026 04:37:25 +0000 Subject: [PATCH 6/7] address comments --- bindings/cpp/include/fluss.hpp | 4 +- bindings/cpp/src/ffi_converter.hpp | 5 +- bindings/cpp/src/lib.rs | 7 +- bindings/python/fluss/__init__.pyi | 4 + bindings/python/src/config.rs | 20 ++++ crates/fluss/src/client/table/remote_log.rs | 97 ++++--------------- crates/fluss/src/client/table/scanner.rs | 3 +- crates/fluss/src/config.rs | 56 +++-------- .../user-guide/cpp/example/configuration.md | 2 + .../python/example/configuration.md | 1 + .../user-guide/rust/example/configuration.md | 4 + 11 files changed, 69 insertions(+), 134 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 78bf1754..d47e2943 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -985,9 +985,7 @@ struct Configuration { size_t scanner_remote_log_prefetch_num{4}; // Number of threads for downloading remote log data size_t remote_file_download_thread_num{3}; - // Whether to use streaming reads for remote log download - bool scanner_remote_log_streaming_read{true}; - // Streaming read concurrency within one remote log file + // Remote log read concurrency within one file (streaming read path) size_t scanner_remote_log_read_concurrency{4}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index babf5bc2..c6ba039e 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -54,10 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner); ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; - ffi_config.scanner_remote_log_streaming_read = - config.scanner_remote_log_streaming_read; - ffi_config.scanner_remote_log_streaming_read_concurrency = - config.scanner_remote_log_streaming_read_concurrency; + ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency; ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; return ffi_config; } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index a17090f8..11558ee5 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -46,8 +46,7 @@ mod ffi { writer_bucket_no_key_assigner: String, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, - scanner_remote_log_streaming_read: bool, - scanner_remote_log_streaming_read_concurrency: usize, + scanner_remote_log_read_concurrency: usize, scanner_log_max_poll_records: usize, } @@ -628,9 +627,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, remote_file_download_thread_num: config.remote_file_download_thread_num, - scanner_remote_log_streaming_read: config.scanner_remote_log_streaming_read, - scanner_remote_log_streaming_read_concurrency: config - .scanner_remote_log_streaming_read_concurrency, + scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, scanner_log_max_poll_records: config.scanner_log_max_poll_records, }; diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 6f9ae0b3..514d011a 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -162,6 +162,10 @@ class Config: @remote_file_download_thread_num.setter def remote_file_download_thread_num(self, num: int) -> None: ... @property + def scanner_remote_log_read_concurrency(self) -> int: ... + @scanner_remote_log_read_concurrency.setter + def scanner_remote_log_read_concurrency(self, num: int) -> None: ... + @property def scanner_log_max_poll_records(self) -> int: ... @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 75056a52..e36c4e07 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -76,6 +76,14 @@ impl Config { )) })?; } + "scanner.remote-log.read-concurrency" => { + config.scanner_remote_log_read_concurrency = + value.parse::().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; + } "scanner.log.max-poll-records" => { config.scanner_log_max_poll_records = value.parse::().map_err(|e| { @@ -189,6 +197,18 @@ impl Config { self.inner.remote_file_download_thread_num = num; } + /// Get the scanner remote log read concurrency + #[getter] + fn scanner_remote_log_read_concurrency(&self) -> usize { + self.inner.scanner_remote_log_read_concurrency + } + + /// Set the scanner remote log read concurrency + #[setter] + fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) { + self.inner.scanner_remote_log_read_concurrency = num; + } + /// Get the scanner log max poll records #[getter] fn scanner_log_max_poll_records(&self) -> usize { diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index e0fea0ab..6bc95512 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -22,7 +22,7 @@ use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; use futures::TryStreamExt; use parking_lot::Mutex; use std::{ - cmp::{Ordering, Reverse, min}, + cmp::{Ordering, Reverse}, collections::{BinaryHeap, HashMap}, future::Future, io, mem, @@ -294,8 +294,7 @@ enum DownloadResult { struct ProductionFetcher { credentials_rx: CredentialsReceiver, local_log_dir: Arc, - streaming_read: bool, - streaming_read_concurrency: usize, + remote_log_read_concurrency: usize, } impl RemoteLogFetcher for ProductionFetcher { @@ -305,8 +304,7 @@ impl RemoteLogFetcher for ProductionFetcher { ) -> Pin> + Send>> { let mut credentials_rx = self.credentials_rx.clone(); let local_log_dir = self.local_log_dir.clone(); - let streaming_read = self.streaming_read; - let streaming_read_concurrency = self.streaming_read_concurrency; + let remote_log_read_concurrency = self.remote_log_read_concurrency; // Clone data needed for async operation to avoid lifetime issues let segment = request.segment.clone(); @@ -366,8 +364,7 @@ impl RemoteLogFetcher for ProductionFetcher { &remote_path, &local_file_path, &remote_fs_props, - streaming_read, - streaming_read_concurrency, + remote_log_read_concurrency, ) .await?; @@ -775,15 +772,13 @@ impl RemoteLogDownloader { local_log_dir: TempDir, max_prefetch_segments: usize, max_concurrent_downloads: usize, - streaming_read: bool, - streaming_read_concurrency: usize, + remote_log_read_concurrency: usize, credentials_rx: CredentialsReceiver, ) -> Result { let fetcher = Arc::new(ProductionFetcher { credentials_rx, local_log_dir: Arc::new(local_log_dir), - streaming_read, - streaming_read_concurrency: streaming_read_concurrency.max(1), + remote_log_read_concurrency: remote_log_read_concurrency.max(1), }); Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) @@ -859,14 +854,13 @@ impl Drop for RemoteLogDownloader { } impl RemoteLogDownloader { - /// Download a file from remote storage to local using either streaming or range read/write. + /// Download a file from remote storage to local using streaming read/write. async fn download_file( remote_log_tablet_dir: &str, remote_path: &str, local_path: &Path, remote_fs_props: &HashMap, - streaming_read: bool, - streaming_read_concurrency: usize, + remote_log_read_concurrency: usize, ) -> Result { // Handle both URL (e.g., "s3://bucket/path") and local file paths // If the path doesn't contain "://", treat it as a local file path @@ -901,28 +895,16 @@ impl RemoteLogDownloader { const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30); const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB - if streaming_read { - Self::download_file_streaming( - &op, - relative_path, - remote_path, - local_path, - CHUNK_SIZE, - streaming_read_concurrency, - REMOTE_OP_TIMEOUT, - ) - .await?; - } else { - Self::download_file_by_range( - &op, - relative_path, - remote_path, - local_path, - CHUNK_SIZE as u64, - REMOTE_OP_TIMEOUT, - ) - .await?; - } + Self::download_file_streaming( + &op, + relative_path, + remote_path, + local_path, + CHUNK_SIZE, + remote_log_read_concurrency, + REMOTE_OP_TIMEOUT, + ) + .await?; Ok(local_path.to_path_buf()) } @@ -976,49 +958,6 @@ impl RemoteLogDownloader { local_file.sync_all().await?; Ok(()) } - - async fn download_file_by_range( - op: &opendal::Operator, - relative_path: &str, - remote_path: &str, - local_path: &Path, - chunk_size: u64, - remote_op_timeout: Duration, - ) -> Result<()> { - let meta = op.stat(relative_path).await?; - let file_size = meta.content_length(); - let mut local_file = tokio::fs::File::create(local_path).await?; - let total_chunks = file_size.div_ceil(chunk_size); - let mut offset = 0u64; - let mut chunk_count = 0u64; - - while offset < file_size { - let end = min(offset + chunk_size, file_size); - let range = offset..end; - chunk_count += 1; - - if chunk_count <= 3 || chunk_count % 10 == 0 { - log::debug!( - "Remote log range download: reading chunk {chunk_count}/{total_chunks} (offset {offset})" - ); - } - - let read_future = op.read_with(relative_path).range(range); - let chunk = tokio::time::timeout(remote_op_timeout, read_future) - .await - .map_err(|e| Error::IoUnexpectedError { - message: format!( - "Timeout reading chunk from remote storage: {remote_path} at offset {offset}, exception: {e}." - ), - source: io::ErrorKind::TimedOut.into(), - })??; - local_file.write_all(&chunk.to_bytes()).await?; - offset = end; - } - - local_file.sync_all().await?; - Ok(()) - } } #[cfg(test)] diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 738ed48d..e837ba76 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -677,8 +677,7 @@ impl LogFetcher { tmp_dir, config.scanner_remote_log_prefetch_num, config.remote_file_download_thread_num, - config.scanner_remote_log_streaming_read, - config.scanner_remote_log_streaming_read_concurrency, + config.scanner_remote_log_read_concurrency, credentials_rx, )?); diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 20b2c5c1..749851a0 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use clap::{ArgAction, Parser, ValueEnum}; +use clap::{Parser, ValueEnum}; use serde::{Deserialize, Serialize}; use std::fmt; @@ -25,18 +25,13 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; -const DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ: bool = true; -const DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY: usize = 4; +const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4; const DEFAULT_MAX_POLL_RECORDS: usize = 500; const DEFAULT_ACKS: &str = "all"; -fn default_scanner_remote_log_streaming_read() -> bool { - DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ -} - -fn default_scanner_remote_log_streaming_read_concurrency() -> usize { - DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY +fn default_scanner_remote_log_read_concurrency() -> usize { + DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY } /// Bucket assigner strategy for tables without bucket keys. @@ -90,24 +85,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)] pub remote_file_download_thread_num: usize, - /// Whether to use opendal streaming reader path for remote log downloads. - #[arg( - long, - default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ, - action = ArgAction::Set, - num_args = 0..=1, - default_missing_value = "true" - )] - #[serde(default = "default_scanner_remote_log_streaming_read")] - pub scanner_remote_log_streaming_read: bool, - - /// Intra-file streaming read concurrency for each remote segment download. - #[arg( - long, - default_value_t = DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY - )] - #[serde(default = "default_scanner_remote_log_streaming_read_concurrency")] - pub scanner_remote_log_streaming_read_concurrency: usize, + /// Intra-file remote log read concurrency for each remote segment download. + /// Download path always uses streaming reader. + #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)] + #[serde(default = "default_scanner_remote_log_read_concurrency")] + pub scanner_remote_log_read_concurrency: usize, /// Maximum number of records returned in a single call to poll() for LogScanner. /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS) @@ -126,9 +108,7 @@ impl Default for Config { writer_bucket_no_key_assigner: NoKeyAssigner::Sticky, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, - scanner_remote_log_streaming_read: DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ, - scanner_remote_log_streaming_read_concurrency: - DEFAULT_SCANNER_REMOTE_LOG_STREAMING_READ_CONCURRENCY, + scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY, scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, } } @@ -140,20 +120,14 @@ mod tests { use clap::Parser; #[test] - fn parse_streaming_read_defaults_to_true() { + fn parse_remote_log_read_concurrency_defaults_to_four() { let config = Config::parse_from(["prog"]); - assert!(config.scanner_remote_log_streaming_read); - } - - #[test] - fn parse_streaming_read_accepts_false() { - let config = Config::parse_from(["prog", "--scanner-remote-log-streaming-read", "false"]); - assert!(!config.scanner_remote_log_streaming_read); + assert_eq!(config.scanner_remote_log_read_concurrency, 4); } #[test] - fn parse_streaming_read_flag_without_value_means_true() { - let config = Config::parse_from(["prog", "--scanner-remote-log-streaming-read"]); - assert!(config.scanner_remote_log_streaming_read); + fn parse_remote_log_read_concurrency() { + let config = Config::parse_from(["prog", "--scanner-remote-log-read-concurrency", "8"]); + assert_eq!(config.scanner_remote_log_read_concurrency, 8); } } diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md index 715e3c63..be2fad9b 100644 --- a/website/docs/user-guide/cpp/example/configuration.md +++ b/website/docs/user-guide/cpp/example/configuration.md @@ -33,4 +33,6 @@ config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB) config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin" config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count config.remote_file_download_thread_num = 3; // Download threads +config.scanner_remote_log_read_concurrency = 4; // In-file remote log read concurrency +config.scanner_log_max_poll_records = 500; // Max records returned per poll() ``` diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 466bf0dd..458014ed 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -31,6 +31,7 @@ with await fluss.FlussConnection.create(config) as conn: | `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | | `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | | `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | +| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` | | `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | Remember to close the connection when done: diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index 92b9bf2f..bc95ce99 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -25,3 +25,7 @@ let conn = FlussConnection::new(config).await?; | `writer_retries` | Number of retries on failure | `i32::MAX` | | `writer_batch_size` | Batch size for writes | 2 MB | | `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `scanner_remote_log_prefetch_num` | Number of remote log segments to prefetch | `4` | +| `remote_file_download_thread_num` | Number of concurrent remote log file downloads | `3` | +| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a remote log file | `4` | +| `scanner_log_max_poll_records` | Maximum records returned in a single `poll()` | `500` | From 0eb784cdf59920883b8fe8dbf69cef659c256c34 Mon Sep 17 00:00:00 2001 From: zhaohaidao Date: Fri, 27 Feb 2026 12:23:52 +0000 Subject: [PATCH 7/7] address comments --- crates/fluss/src/config.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 0a15a5a6..a0d7e707 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -31,10 +31,6 @@ const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_ACKS: &str = "all"; -fn default_scanner_remote_log_read_concurrency() -> usize { - DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY -} - /// Bucket assigner strategy for tables without bucket keys. /// Matches Java `client.writer.bucket.no-key-assigner`. #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize)] @@ -89,7 +85,6 @@ pub struct Config { /// Intra-file remote log read concurrency for each remote segment download. /// Download path always uses streaming reader. #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)] - #[serde(default = "default_scanner_remote_log_read_concurrency")] pub scanner_remote_log_read_concurrency: usize, /// Maximum number of records returned in a single call to poll() for LogScanner. @@ -120,21 +115,3 @@ impl Default for Config { } } } - -#[cfg(test)] -mod tests { - use super::Config; - use clap::Parser; - - #[test] - fn parse_remote_log_read_concurrency_defaults_to_four() { - let config = Config::parse_from(["prog"]); - assert_eq!(config.scanner_remote_log_read_concurrency, 4); - } - - #[test] - fn parse_remote_log_read_concurrency() { - let config = Config::parse_from(["prog", "--scanner-remote-log-read-concurrency", "8"]); - assert_eq!(config.scanner_remote_log_read_concurrency, 8); - } -}