diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 0f980b73..0a62af91 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -985,6 +985,8 @@ struct Configuration { size_t scanner_remote_log_prefetch_num{4}; // Number of threads for downloading remote log data size_t remote_file_download_thread_num{3}; + // Remote log read concurrency within one file (streaming read path) + size_t scanner_remote_log_read_concurrency{4}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; int64_t writer_batch_timeout_ms{100}; diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index e3763435..90200277 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -54,6 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner); ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; + ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency; ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; return ffi_config; diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index ea1307e2..9b01d322 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -46,6 +46,7 @@ mod ffi { writer_bucket_no_key_assigner: String, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, + scanner_remote_log_read_concurrency: usize, scanner_log_max_poll_records: usize, writer_batch_timeout_ms: i64, } @@ -618,7 +619,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { )); } }; - let config = 
fluss::config::Config { + let config_core = fluss::config::Config { bootstrap_servers: config.bootstrap_servers.to_string(), writer_request_max_size: config.writer_request_max_size, writer_acks: config.writer_acks.to_string(), @@ -628,10 +629,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, remote_file_download_thread_num: config.remote_file_download_thread_num, + scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, scanner_log_max_poll_records: config.scanner_log_max_poll_records, }; - let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await }); + let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); match conn { Ok(c) => { diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 6f9ae0b3..514d011a 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -162,6 +162,10 @@ class Config: @remote_file_download_thread_num.setter def remote_file_download_thread_num(self, num: int) -> None: ... @property + def scanner_remote_log_read_concurrency(self) -> int: ... + @scanner_remote_log_read_concurrency.setter + def scanner_remote_log_read_concurrency(self, num: int) -> None: ... + @property def scanner_log_max_poll_records(self) -> int: ... @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... 
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 5b7f2d37..9c0059e0 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -81,6 +81,14 @@ impl Config { )) })?; } + "scanner.remote-log.read-concurrency" => { + config.scanner_remote_log_read_concurrency = + value.parse::<usize>().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; + } "scanner.log.max-poll-records" => { config.scanner_log_max_poll_records = value.parse::<usize>().map_err(|e| { @@ -194,6 +202,18 @@ impl Config { self.inner.remote_file_download_thread_num = num; } + /// Get the scanner remote log read concurrency + #[getter] + fn scanner_remote_log_read_concurrency(&self) -> usize { + self.inner.scanner_remote_log_read_concurrency + } + + /// Set the scanner remote log read concurrency + #[setter] + fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) { + self.inner.scanner_remote_log_read_concurrency = num; + } + /// Get the scanner log max poll records #[getter] fn scanner_log_max_poll_records(&self) -> usize { diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 02820d93..6bc95512 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -19,9 +19,10 @@ use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use futures::TryStreamExt; use parking_lot::Mutex; use std::{ - cmp::{Ordering, Reverse, min}, + cmp::{Ordering, Reverse}, collections::{BinaryHeap, HashMap}, future::Future, io, mem, @@ -293,6 +294,7 @@ enum DownloadResult { struct ProductionFetcher { credentials_rx: CredentialsReceiver, local_log_dir: Arc<TempDir>, + remote_log_read_concurrency: usize, } impl RemoteLogFetcher for ProductionFetcher { @@ -302,6 +304,7 @@ impl RemoteLogFetcher for ProductionFetcher { ) -> Pin<Box<dyn Future<Output = Result<DownloadResult>> + 
Send>> { let mut credentials_rx = self.credentials_rx.clone(); let local_log_dir = self.local_log_dir.clone(); + let remote_log_read_concurrency = self.remote_log_read_concurrency; // Clone data needed for async operation to avoid lifetime issues let segment = request.segment.clone(); @@ -361,6 +364,7 @@ impl RemoteLogFetcher for ProductionFetcher { &remote_path, &local_file_path, &remote_fs_props, + remote_log_read_concurrency, ) .await?; @@ -768,11 +772,13 @@ impl RemoteLogDownloader { local_log_dir: TempDir, max_prefetch_segments: usize, max_concurrent_downloads: usize, + remote_log_read_concurrency: usize, credentials_rx: CredentialsReceiver, ) -> Result<Self> { let fetcher = Arc::new(ProductionFetcher { credentials_rx, local_log_dir: Arc::new(local_log_dir), + remote_log_read_concurrency: remote_log_read_concurrency.max(1), }); Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) @@ -848,12 +854,13 @@ impl Drop for RemoteLogDownloader { } impl RemoteLogDownloader { - /// Download a file from remote storage to local using streaming read/write + /// Download a file from remote storage to local using streaming read/write. 
async fn download_file( remote_log_tablet_dir: &str, remote_path: &str, local_path: &Path, remote_fs_props: &HashMap<String, String>, + remote_log_read_concurrency: usize, ) -> Result<PathBuf> { // Handle both URL (e.g., "s3://bucket/path") and local file paths // If the path doesn't contain "://", treat it as a local file path @@ -886,56 +893,70 @@ impl RemoteLogDownloader { // Timeout for remote storage operations (30 seconds) const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30); + const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB + + Self::download_file_streaming( + &op, + relative_path, + remote_path, + local_path, + CHUNK_SIZE, + remote_log_read_concurrency, + REMOTE_OP_TIMEOUT, + ) + .await?; - // Get file metadata to know the size with timeout - let meta = op.stat(relative_path).await?; - let file_size = meta.content_length(); + Ok(local_path.to_path_buf()) + } - // Create local file for writing + async fn download_file_streaming( + op: &opendal::Operator, + relative_path: &str, + remote_path: &str, + local_path: &Path, + chunk_size: usize, + streaming_read_concurrency: usize, + remote_op_timeout: Duration, + ) -> Result<()> { let mut local_file = tokio::fs::File::create(local_path).await?; - // Stream data from remote to local file in chunks - // opendal::Reader::read accepts a range, so we read in chunks - const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading - let mut offset = 0u64; - let mut chunk_count = 0u64; - let total_chunks = file_size.div_ceil(CHUNK_SIZE); + let reader_future = op + .reader_with(relative_path) + .chunk(chunk_size) + .concurrent(streaming_read_concurrency); + let reader = tokio::time::timeout(remote_op_timeout, reader_future) + .await + .map_err(|e| Error::IoUnexpectedError { + message: format!("Timeout creating streaming reader for {remote_path}: {e}."), + source: io::ErrorKind::TimedOut.into(), + })??; + + let mut stream = tokio::time::timeout(remote_op_timeout, reader.into_bytes_stream(..)) + .await + .map_err(|e| 
Error::IoUnexpectedError { + message: format!("Timeout creating streaming bytes stream for {remote_path}: {e}."), + source: io::ErrorKind::TimedOut.into(), + })??; - while offset < file_size { - let end = min(offset + CHUNK_SIZE, file_size); - let range = offset..end; + let mut chunk_count = 0u64; + while let Some(chunk) = tokio::time::timeout(remote_op_timeout, stream.try_next()) + .await + .map_err(|e| Error::IoUnexpectedError { + message: format!( + "Timeout streaming chunk from remote storage: {remote_path}, exception: {e}." + ), + source: io::ErrorKind::TimedOut.into(), + })?? + { chunk_count += 1; - if chunk_count <= 3 || chunk_count % 10 == 0 { - log::debug!( - "Remote log download: reading chunk {chunk_count}/{total_chunks} (offset {offset})" - ); + log::debug!("Remote log streaming download: chunk #{chunk_count} ({remote_path})"); } - - // Read chunk from remote storage with timeout - let read_future = op.read_with(relative_path).range(range.clone()); - let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future) - .await - .map_err(|e| { - Error::IoUnexpectedError { - message: format!( - "Timeout reading chunk from remote storage: {remote_path} at offset {offset}, exception: {e}." 
- ), - source: io::ErrorKind::TimedOut.into(), - } - })??; - let bytes = chunk.to_bytes(); - - // Write chunk to local file - local_file.write_all(&bytes).await?; - - offset = end; + local_file.write_all(&chunk).await?; } - // Ensure all data is flushed to disk local_file.sync_all().await?; - - Ok(local_path.to_path_buf()) + Ok(()) } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 3ec9106d..e837ba76 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -677,6 +677,7 @@ impl LogFetcher { tmp_dir, config.scanner_remote_log_prefetch_num, config.remote_file_download_thread_num, + config.scanner_remote_log_read_concurrency, credentials_rx, )?); diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 30264932..a0d7e707 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -25,6 +25,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; +const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4; const DEFAULT_MAX_POLL_RECORDS: usize = 500; const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; @@ -81,6 +82,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)] pub remote_file_download_thread_num: usize, + /// Intra-file remote log read concurrency for each remote segment download. + /// Download path always uses streaming reader. + #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)] + pub scanner_remote_log_read_concurrency: usize, + /// Maximum number of records returned in a single call to poll() for LogScanner. 
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS) #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)] @@ -103,6 +109,7 @@ impl Default for Config { writer_bucket_no_key_assigner: NoKeyAssigner::Sticky, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, + scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY, scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, } diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md index d73661ae..2245ee1b 100644 --- a/website/docs/user-guide/cpp/example/configuration.md +++ b/website/docs/user-guide/cpp/example/configuration.md @@ -34,4 +34,6 @@ config.writer_batch_timeout_ms = 100; // Max time to wait for a batch config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin" config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count config.remote_file_download_thread_num = 3; // Download threads +config.scanner_remote_log_read_concurrency = 4; // In-file remote log read concurrency +config.scanner_log_max_poll_records = 500; // Max records returned per poll() ``` diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 71e71994..39c53be4 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -21,18 +21,19 @@ with await fluss.FlussConnection.create(config) as conn: ## Connection Configurations -| Key | Description | Default | -|------------------------------------|--------------------------------------------------------------------------------------|--------------------| -| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | -| `writer.request-max-size` | Maximum request size in bytes | 
`10485760` (10 MB) | -| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | -| `writer.retries` | Number of retries on failure | `2147483647` | -| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | -| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | -| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | -| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | -| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | -| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` | +| Key | Description | Default | +|---------------------------------------|---------------------------------------------------------------------------------------|--------------------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) | +| `writer.batch-timeout-ms` | The maximum time to wait for a writer batch to fill up before sending. 
| `100` | +| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` | +| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | +| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` | +| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | Remember to close the connection when done: diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index 7d7cc939..a2f52dc1 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -24,5 +24,9 @@ let conn = FlussConnection::new(config).await?; | `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | | `writer_retries` | Number of retries on failure | `i32::MAX` | | `writer_batch_size` | Batch size for writes | 2 MB | -| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | | `writer_batch_timeout_ms` | The maximum time to wait for a writer batch to fill up before sending. | `100` | +| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` | +| `scanner_remote_log_prefetch_num` | Number of remote log segments to prefetch | `4` | +| `remote_file_download_thread_num` | Number of concurrent remote log file downloads | `3` | +| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a remote log file | `4` | +| `scanner_log_max_poll_records` | Maximum records returned in a single `poll()` | `500` |