From 428457e5ef9e5174811c557d6cbcf0599554cd0b Mon Sep 17 00:00:00 2001 From: Joy He Date: Sun, 28 Sep 2025 15:28:20 -0700 Subject: [PATCH 1/9] upload --- src/upload.rs | 529 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 529 insertions(+) create mode 100644 src/upload.rs diff --git a/src/upload.rs b/src/upload.rs new file mode 100644 index 0000000..c357e3a --- /dev/null +++ b/src/upload.rs @@ -0,0 +1,529 @@ +use std::future::Future; +use std::io::Read; +use std::io::Seek; +use std::path::Path; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; + +use conjure_error::Error; +use conjure_http::client::AsyncWriteBody; +use conjure_http::private::Stream; +use conjure_object::BearerToken; +use conjure_object::ResourceIdentifier; +use conjure_object::SafeLong; +use conjure_runtime::BodyWriter; +use conjure_runtime::Client; +use futures::StreamExt; +use nominal_api::api::rids::WorkspaceRid; +use nominal_api::ingest::api::AvroStreamOpts; +use nominal_api::ingest::api::CompleteMultipartUploadResponse; +use nominal_api::ingest::api::DatasetIngestTarget; +use nominal_api::ingest::api::ExistingDatasetIngestDestination; +use nominal_api::ingest::api::IngestOptions; +use nominal_api::ingest::api::IngestRequest; +use nominal_api::ingest::api::IngestResponse; +use nominal_api::ingest::api::IngestServiceAsyncClient; +use nominal_api::ingest::api::IngestSource; +use nominal_api::ingest::api::InitiateMultipartUploadRequest; +use nominal_api::ingest::api::InitiateMultipartUploadResponse; +use nominal_api::ingest::api::Part; +use nominal_api::ingest::api::S3IngestSource; +use nominal_api::upload::api::UploadServiceAsyncClient; +use tokio::sync::Semaphore; +use tracing::error; +use tracing::info; + +use crate::client::NominalApiClients; +use crate::types::AuthProvider; + +const SMALL_FILE_SIZE_LIMIT: u64 = 256 * 1024 * 1024; // 256 MB + +#[derive(Clone)] +pub struct UploadManager { + pub upload_queue: async_channel::Receiver, +} + +impl UploadManager { + pub fn new( + clients: NominalApiClients, + http_client: reqwest::Client, + handle: tokio::runtime::Handle, + opts: UploaderOpts, + upload_queue: async_channel::Receiver, + auth_provider: impl AuthProvider + 'static, + data_source_rid: ResourceIdentifier, + ) -> Self { + let uploader = Uploader::new( + clients.upload, + clients.ingest, + http_client, + handle.clone(), + opts, + ); + + let upload_queue_clone = upload_queue.clone(); + + handle.spawn(async move { + Self::run(upload_queue_clone, uploader, auth_provider, data_source_rid).await; + }); + + UploadManager { upload_queue } + } + + pub async fn run( + upload_queue: async_channel::Receiver, + uploader: Uploader, + auth_provider: impl AuthProvider + 'static, + data_source_rid: ResourceIdentifier, + ) { + while let Ok(file_path) = upload_queue.recv().await { + let file_name = file_path.to_str().unwrap_or("nmstream_file"); + let file = std::fs::File::open(&file_path); + let Some(token) = auth_provider.token() else { + error!("Missing token for upload"); + continue; + }; + let Some(workspace_rid) = auth_provider.workspace_rid() else { + error!("Missing workspace RID for upload"); + continue; + }; + match file { + Ok(f) => { + match uploader + .upload(&token, f, file_name, workspace_rid.clone()) + .await + { + Ok(response) => { + match uploader + .ingest_avro(&token, &response, data_source_rid.clone()) + .await + { + Ok(ingest_response) => { + info!( + "Successfully uploaded and ingested file {}: {:?}", + file_name, ingest_response + ); + // remove file + if let Err(e) = std::fs::remove_file(&file_path) { + error!( + "Failed to remove file {}: {:?}", + file_path.display(), + e + ); + } else { + info!("Removed file {}", file_path.display()); + } + } + Err(e) => { + error!("Failed to ingest file {}: {:?}", file_name, e); + } + } + } + Err(e) => { + error!("Failed to upload file {}: {:?}", file_name, e); + } + } + } + Err(e) => { + error!("Failed to open file {}: {:?}", file_path.display(), e); + } + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum UploaderError { + #[error("Conjure error: {0}")] + Conjure(String), + #[error("Failed to initiate multipart upload: {0}")] + IOError(#[from] std::io::Error), + #[error("Failed to upload part: {0}")] + HTTPError(#[from] reqwest::Error), + #[error("Error executing upload tasks: {0}")] + TokioError(#[from] tokio::task::JoinError), + #[error("Error: {0}")] + Other(String), +} + +#[derive(Debug, Clone)] +pub struct UploaderOpts { + pub chunk_size: usize, + pub max_retries: usize, + pub max_concurrent_uploads: usize, +} + +impl Default for UploaderOpts { + fn default() -> Self { + UploaderOpts { + chunk_size: 64_000_000, // 128 MB + max_retries: 3, + max_concurrent_uploads: 8, + } + } +} + +pub struct FileWriteBody { + file: std::fs::File, +} + +impl FileWriteBody { + pub fn new(file: std::fs::File) -> Self { + FileWriteBody { file } + } +} + +impl AsyncWriteBody for FileWriteBody { + #[expect(clippy::manual_async_fn)] + fn write_body( + self: Pin<&mut Self>, + w: Pin<&mut BodyWriter>, + ) -> impl Future> + Send { + async move { + let mut file = self.file.try_clone().map_err(|e| { + Error::internal_safe(format!("Failed to clone file for upload: {e}")) + })?; + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).map_err(|e| { + Error::internal_safe(format!("Failed to read bytes from file: {e}")) + })?; + + w.write_bytes(buffer.into()) + .await + .map_err(|e| Error::internal_safe(format!("Failed to write bytes to body: {e}")))?; + + Ok(()) + } + } + + #[expect(clippy::manual_async_fn)] + fn reset(self: Pin<&mut Self>) -> impl Future + Send { + async move { + let Ok(mut file) = self.file.try_clone() else { + return false; + }; + + use std::io::SeekFrom; + + file.seek(SeekFrom::Start(0)).is_ok() + } + } +} + +#[derive(Clone)] +pub struct Uploader { + upload_client: UploadServiceAsyncClient, + ingest_client: IngestServiceAsyncClient, + http_client: reqwest::Client, + handle: tokio::runtime::Handle, + opts: UploaderOpts, +} + +impl Uploader { + pub fn new( + upload_client: UploadServiceAsyncClient, + ingest_client: IngestServiceAsyncClient, + http_client: reqwest::Client, + handle: tokio::runtime::Handle, + opts: UploaderOpts, + ) -> Self { + Uploader { + upload_client, + ingest_client, + http_client, + handle, + opts, + } + } + + pub async fn initiate_upload( + &self, + token: &BearerToken, + file_name: &str, + workspace_rid: ResourceIdentifier, + ) -> Result { + let request = InitiateMultipartUploadRequest::builder() + .filename(file_name) + .filetype("application/octet-stream") + .workspace(Some(WorkspaceRid::from(workspace_rid))) + .build(); + let response = self + .upload_client + .initiate_multipart_upload(token, &request) + .await + .map_err(|e| UploaderError::Conjure(format!("{e:?}")))?; + + info!("Initiated multipart upload for file: {}", file_name); + Ok(response) + } + + #[expect(clippy::too_many_arguments)] + async fn upload_part( + client: UploadServiceAsyncClient, + http_client: reqwest::Client, + token: BearerToken, + upload_id: String, + key: String, + part_number: i32, + chunk: Vec, + max_retries: usize, + ) -> Result { + let mut attempts = 0; + + loop { + attempts += 1; + match Self::try_upload_part( + client.clone(), + http_client.clone(), + &token, + &upload_id, + &key, + part_number, + chunk.clone(), + ) + .await + { + Ok(part) => return Ok(part), + Err(e) if attempts < max_retries => { + error!("Upload attempt {} failed, retrying: {}", attempts, e); + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + async fn try_upload_part( + client: UploadServiceAsyncClient, + http_client: reqwest::Client, + token: &BearerToken, + upload_id: &str, + key: &str, + part_number: i32, + chunk: Vec, + ) -> Result { + let response = client + .sign_part(token, upload_id, key, part_number) + .await + .map_err(|e| UploaderError::Conjure(format!("{e:?}")))?; + + let mut request_builder = http_client.put(response.url()).body(chunk); + + for (header_name, header_value) in response.headers() { + request_builder = request_builder.header(header_name, header_value); + } + + let http_response = request_builder.send().await?; + let headers = http_response.headers().clone(); + let status = http_response.status(); + + if !status.is_success() { + error!("Failed to upload body"); + return Err(UploaderError::Other(format!( + "Failed to upload part {part_number}: HTTP status {status}" + ))); + } + + let etag = headers + .get("etag") + .and_then(|v| v.to_str().ok()) + .unwrap_or("ignored-etag"); + + Ok(Part::new(part_number, etag)) + } + + pub async fn upload_parts( + &self, + token: &BearerToken, + reader: R, + key: &str, + upload_id: &str, + ) -> Result + where + R: Read + Send + 'static, + { + let chunks = ChunkedStreamReader::new(reader, self.opts.chunk_size); + + let parallel_part_uploads = Arc::new(Semaphore::new(self.opts.max_concurrent_uploads)); + let mut upload_futures = Vec::new(); + + futures::pin_mut!(chunks); + + while let Some(entry) = chunks.next().await { + let (index, chunk) = entry?; + let part_number = (index + 1) as i32; + + let token = token.clone(); + let key = key.to_string(); + let upload_id = upload_id.to_string(); + let parallel_part_uploads = Arc::clone(¶llel_part_uploads); + let client = self.upload_client.clone(); + let http_client = self.http_client.clone(); + let max_retries = self.opts.max_retries; + + upload_futures.push(self.handle.spawn(async move { + let _permit = parallel_part_uploads.acquire().await; + Self::upload_part( + client, + http_client, + token, + upload_id, + key, + part_number, + chunk, + max_retries, + ) + .await + })); + } + + let mut part_responses = futures::future::join_all(upload_futures) + .await + .into_iter() + .map(|result| result.map_err(UploaderError::TokioError)?) + .collect::, _>>()?; + + part_responses.sort_by_key(|part| part.part_number()); + + let response = self + .upload_client + .complete_multipart_upload(token, upload_id, key, &part_responses) + .await + .map_err(|e| UploaderError::Conjure(format!("{e:?}")))?; + + Ok(response) + } + + pub async fn upload_small_file( + &self, + token: &BearerToken, + file_name: &str, + size_bytes: i64, + workspace_rid: ResourceIdentifier, + file: std::fs::File, + ) -> Result { + let workspace_rid = WorkspaceRid::from(workspace_rid); + + let s3_path = self + .upload_client + .upload_file( + token, + file_name, + SafeLong::new(size_bytes).ok(), + Some(&workspace_rid), + FileWriteBody::new(file), + ) + .await + .map_err(|e| UploaderError::Conjure(format!("{e:?}")))?; + + Ok(s3_path.as_str().to_string()) + } + + pub async fn upload( + &self, + token: &BearerToken, + reader: R, + file_name: impl Into<&str>, + workspace_rid: ResourceIdentifier, + ) -> Result + where + R: Read + Send + 'static, + { + let file_name = file_name.into(); + let path = Path::new(file_name); + let file_size = std::fs::metadata(path)?.len(); + if file_size < SMALL_FILE_SIZE_LIMIT { + return self + .upload_small_file( + token, + file_name, + file_size as i64, + workspace_rid, + std::fs::File::open(path)?, + ) + .await; + } + + let initiate_response = self + .initiate_upload(token, file_name, workspace_rid) + .await?; + let upload_id = initiate_response.upload_id(); + let key = initiate_response.key(); + + let response = self.upload_parts(token, reader, key, upload_id).await?; + + let s3_path = response.location().ok_or_else(|| { + UploaderError::Other("Upload response did not contain a location".to_string()) + })?; + + Ok(s3_path.to_string()) + } + + pub async fn ingest_avro( + &self, + token: &BearerToken, + s3_path: &str, + data_source_rid: ResourceIdentifier, + ) -> Result { + let opts = IngestOptions::AvroStream( + AvroStreamOpts::builder() + .source(IngestSource::S3(S3IngestSource::new(s3_path))) + .target(DatasetIngestTarget::Existing( + ExistingDatasetIngestDestination::new(data_source_rid), + )) + .build(), + ); + + let request = IngestRequest::new(opts); + + self.ingest_client + .ingest(token, &request) + .await + .map_err(|e| UploaderError::Conjure(format!("{e:?}"))) + } +} + +pub struct ChunkedStreamReader { + reader: Box, + chunk_size: usize, + current_index: usize, +} + +impl ChunkedStreamReader { + pub fn new(reader: R, chunk_size: usize) -> Self + where + R: Read + Send + 'static, + { + Self { + reader: Box::new(reader), + chunk_size, + current_index: 0, + } + } +} + +impl Stream for ChunkedStreamReader { + type Item = Result<(usize, Vec), std::io::Error>; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut buffer = vec![0u8; self.chunk_size]; + + match self.reader.read(&mut buffer) { + Ok(0) => std::task::Poll::Ready(None), + Ok(n) => { + buffer.truncate(n); + let index = self.current_index; + self.current_index += 1; + std::task::Poll::Ready(Some(Ok((index, buffer)))) + } + Err(e) => std::task::Poll::Ready(Some(Err(e))), + } + } +} From 74846ba131e93766aa3ad14a8dce82929e0fb04c Mon Sep 17 00:00:00 2001 From: Joy He Date: Sun, 28 Sep 2025 15:30:28 -0700 Subject: [PATCH 2/9] mod --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 99533bd..3666a81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod client; pub mod consumer; pub mod notifier; pub mod stream; +pub mod upload; mod types; /// This includes the most common types in this crate, re-exported for your convenience. From 3989086351c7f6e1be2f0277f7ee6e9be304e3c7 Mon Sep 17 00:00:00 2001 From: Joy He Date: Sun, 28 Sep 2025 16:36:09 -0700 Subject: [PATCH 3/9] fix --- Cargo.lock | 390 +++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 5 +- src/client.rs | 103 ++++++++++--- src/consumer.rs | 18 +-- src/lib.rs | 2 - src/stream.rs | 5 +- src/types.rs | 6 +- 7 files changed, 484 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c146ce..059de5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,6 +153,18 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -314,7 +326,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -323,6 +335,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "conjure-codegen" version = "4.15.0" @@ -481,6 +502,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -601,6 +632,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum-ordinalize" version = "4.3.0" @@ -668,6 +708,27 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "exponential-decay-histogram" version = "0.1.13" @@ -712,6 +773,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -1021,12 +1097,29 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ + "base64", "bytes", "futures-channel", "futures-core", @@ -1034,12 +1127,16 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.0", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -1195,6 +1292,22 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1367,6 +1480,23 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nominal-api" version = "0.867.0" @@ -1387,6 +1517,7 @@ name = "nominal-streaming" version = "0.4.0" dependencies = [ "apache-avro", + "async-channel", "chrono", "conjure-error", "conjure-http", @@ -1394,9 +1525,11 @@ dependencies = [ "conjure-runtime", "crossbeam-channel", "derive_more", + "futures", "nominal-api", "parking_lot", "prost", + "reqwest", "serde_json", "snap", "test-log", @@ -1465,6 +1598,50 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1485,6 +1662,12 @@ dependencies = [ "serde", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.4" @@ -1556,6 +1739,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "potential_utf" version = "0.1.3" @@ -1790,6 +1979,46 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +[[package]] +name = "reqwest" +version = "0.12.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "js-sys", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-native-tls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "ring" version = "0.17.14" @@ -1885,12 +2114,44 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.1", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.219" @@ -1964,6 +2225,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2118,6 +2391,9 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -2130,6 +2406,27 @@ dependencies = [ "syn", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.21.0" @@ -2266,6 +2563,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" @@ -2403,6 +2710,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -2567,6 +2892,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2623,6 +2954,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.100" @@ -2655,6 +2999,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" @@ -2681,7 +3035,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", - "windows-link", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -2714,13 +3068,30 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" + +[[package]] +name = "windows-registry" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" +dependencies = [ + "windows-link 0.1.3", + "windows-result", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -2729,7 +3100,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -2759,6 +3130,15 @@ dependencies = [ "windows-targets 0.53.3", ] +[[package]] +name = "windows-sys" +version = "0.61.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f109e41dd4a3c848907eb83d5a42ea98b3769495597450cf6d153507b166f0f" +dependencies = [ + "windows-link 0.2.0", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -2781,7 +3161,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/Cargo.toml b/Cargo.toml index fbc9460..6d1aadd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,8 @@ conjure-runtime = "6" nominal-api = "0.867.0" prost = "0.13" snap = "1" -tokio = { version = "1", features = ["full", "tracing"] } +futures = "0.3" +tokio = { version = "1", features = ["full", "tracing", "io-util"] } url = "2.5.4" parking_lot = "0.12" tracing = "0.1" @@ -23,6 +24,8 @@ serde_json = "1.0.140" chrono = "0.4.41" thiserror = "2" crossbeam-channel = "0.5.15" +reqwest = "0.12.22" +async-channel = "2.5.0" [profile.release] debug = true diff --git a/src/client.rs b/src/client.rs index c35d57f..9be76f7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,11 @@ use std::fmt::Debug; -use std::fmt::Formatter; use std::io::Write; use std::sync::LazyLock; use conjure_error::Error; use conjure_http::client::AsyncClient; use conjure_http::client::AsyncRequestBody; +use conjure_http::client::AsyncService; use conjure_http::private::header::CONTENT_ENCODING; use conjure_http::private::header::CONTENT_TYPE; use conjure_http::private::Request; @@ -18,8 +18,9 @@ use conjure_runtime::Client; use conjure_runtime::Idempotency; use conjure_runtime::ResponseBody; use conjure_runtime::UserAgent; -use derive_more::From; use nominal_api::api::rids::NominalDataSourceOrDatasetRid; +use nominal_api::ingest::api::IngestServiceAsyncClient; +use nominal_api::upload::api::UploadServiceAsyncClient; use snap::write::FrameEncoder; use url::Url; @@ -32,27 +33,86 @@ pub mod conjure { pub use conjure_runtime as runtime; } +const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api"; +const STAGING_API_URL: &str = "https://api-staging.gov.nominal.io/api"; +const USER_AGENT: &str = "nominal-streaming"; + impl AuthProvider for BearerToken { fn token(&self) -> Option { Some(self.clone()) } } -pub static PRODUCTION_STREAMING_CLIENT: LazyLock = LazyLock::new(|| { - async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap()) - .expect("Failed to create client") +#[derive(Debug, Clone)] +pub struct TokenAndWorkspaceRid { + pub token: BearerToken, + pub workspace_rid: Option, +} + +impl AuthProvider for TokenAndWorkspaceRid { + fn token(&self) -> Option { + Some(self.token.clone()) + } + + fn workspace_rid(&self) -> Option { + self.workspace_rid.clone() + } +} + +#[derive(Clone)] +pub struct NominalApiClients { + pub streaming: Client, + pub upload: UploadServiceAsyncClient, + pub ingest: IngestServiceAsyncClient, +} + +impl Debug for NominalApiClients { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NominalApiClients") + .field("streaming", &"Client") + .field("upload", &"UploadServiceAsyncClient") + .field("ingest", &"IngestServiceAsyncClient") + .finish() + } +} + +impl NominalApiClients { + pub async fn send(&self, req: WriteRequest<'_>) -> Result, Error> { + self.streaming.send(req).await + } +} + +pub static PRODUCTION_CLIENTS: LazyLock = LazyLock::new(|| NominalApiClients { + streaming: async_conjure_streaming_client(PRODUCTION_API_URL.try_into().unwrap()) + .expect("Failed to create streaming client"), + upload: UploadServiceAsyncClient::new( + async_conjure_client("upload", PRODUCTION_API_URL.try_into().unwrap()) + .expect("Failed to create upload client"), + ), + ingest: IngestServiceAsyncClient::new( + async_conjure_client("ingest", PRODUCTION_API_URL.try_into().unwrap()) + .expect("Failed to create ingest client"), + ), }); -pub static STAGING_STREAMING_CLIENT: LazyLock = LazyLock::new(|| { - async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap()) - .expect("Failed to create client") +pub static STAGING_CLIENTS: LazyLock = LazyLock::new(|| NominalApiClients { + streaming: async_conjure_streaming_client(STAGING_API_URL.try_into().unwrap()) + .expect("Failed to create streaming client"), + upload: UploadServiceAsyncClient::new( + async_conjure_client("upload", STAGING_API_URL.try_into().unwrap()) + .expect("Failed to create upload client"), + ), + ingest: IngestServiceAsyncClient::new( + async_conjure_client("ingest", STAGING_API_URL.try_into().unwrap()) + .expect("Failed to create ingest client"), + ), }); -fn async_conjure_streaming_client(uri: Url) -> Result { +fn async_conjure_streaming_client(uri: Url) -> Result { Client::builder() .service("core-streaming-rs") .user_agent(UserAgent::new(Agent::new( - "core-streaming-rs", + USER_AGENT, env!("CARGO_PKG_VERSION"), ))) .uri(uri) @@ -64,22 +124,17 @@ fn async_conjure_streaming_client(uri: Url) -> Result { // enables retries for POST endpoints like the streaming ingest one .idempotency(Idempotency::Always) .build() - .map(|client| client.into()) -} - -#[derive(From, Clone)] -pub struct StreamingClient(Client); - -impl Debug for StreamingClient { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "StreamingClient") - } } -impl StreamingClient { - pub async fn send(&self, req: WriteRequest<'_>) -> Result, Error> { - self.0.send(req).await - } +fn async_conjure_client(service: &'static str, uri: Url) -> Result { + Client::builder() + .service(service) + .user_agent(UserAgent::new(Agent::new( + USER_AGENT, + env!("CARGO_PKG_VERSION"), + ))) + .uri(uri) + .build() } pub type WriteRequest<'a> = Request>; diff --git a/src/consumer.rs b/src/consumer.rs index f6e6c67..576af8b 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -19,7 +19,7 @@ use parking_lot::Mutex; use prost::Message; use tracing::warn; -use crate::client::StreamingClient; +use crate::client::NominalApiClients; use crate::client::{self}; use crate::notifier::NominalStreamListener; use crate::types::AuthProvider; @@ -45,24 +45,24 @@ pub trait WriteRequestConsumer: Send + Sync + Debug { } #[derive(Clone)] -pub struct NominalCoreConsumer { - client: StreamingClient, +pub struct NominalCoreConsumer { + client: NominalApiClients, handle: tokio::runtime::Handle, - token_provider: T, + auth_provider: A, data_source_rid: ResourceIdentifier, } -impl NominalCoreConsumer { +impl NominalCoreConsumer { pub fn new( - client: StreamingClient, + client: NominalApiClients, handle: tokio::runtime::Handle, - token_provider: T, + auth_provider: A, data_source_rid: ResourceIdentifier, ) -> Self { Self { client, handle, - token_provider, + auth_provider, data_source_rid, } } @@ -80,7 +80,7 @@ impl Debug for NominalCoreConsumer { impl WriteRequestConsumer for NominalCoreConsumer { fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> { let token = self - .token_provider + .auth_provider .token() .ok_or(ConsumerError::MissingTokenError)?; let write_request = diff --git a/src/lib.rs b/src/lib.rs index 3666a81..9846357 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,8 +18,6 @@ pub mod prelude { pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest; pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal; - pub use crate::client::PRODUCTION_STREAMING_CLIENT; - pub use crate::client::STAGING_STREAMING_CLIENT; pub use crate::consumer::NominalCoreConsumer; pub use crate::stream::NominalDatasetStream; pub use crate::stream::NominalDatasourceStream; diff --git a/src/stream.rs b/src/stream.rs index 941090d..4c39b65 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -28,8 +28,7 @@ use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; - -use crate::client::PRODUCTION_STREAMING_CLIENT; +use crate::client::PRODUCTION_CLIENTS; use crate::consumer::AvroFileConsumer; use crate::consumer::DualWriteRequestConsumer; use crate::consumer::ListeningWriteRequestConsumer; @@ -129,7 +128,7 @@ impl NominalDatasetStreamBuilder { .as_ref() .map(|(token, dataset, handle)| { NominalCoreConsumer::new( - PRODUCTION_STREAMING_CLIENT.clone(), + PRODUCTION_CLIENTS.clone(), handle.clone(), token.clone(), dataset.clone(), diff --git a/src/types.rs b/src/types.rs index 8ded0c5..bdd8beb 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::time::Duration; -use conjure_object::BearerToken; +use conjure_object::{BearerToken, ResourceIdentifier}; use nominal_api::tonic::google::protobuf::Timestamp; use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType; use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint; @@ -53,6 +53,10 @@ impl ChannelDescriptor { pub trait AuthProvider: Clone + Send + Sync { fn token(&self) -> Option; + + fn workspace_rid(&self) -> Option { + None + } } pub trait IntoPoints { From 65305950d213f406c06ea5e36f2b31bda9b9db88 Mon Sep 17 00:00:00 2001 From: Joy He Date: Sun, 28 Sep 2025 16:51:37 -0700 Subject: [PATCH 4/9] fmt --- src/lib.rs | 2 +- src/stream.rs | 1 + src/types.rs | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9846357..24ddb75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,8 @@ pub mod client; pub mod consumer; pub mod notifier; pub mod stream; -pub mod upload; mod types; +pub mod upload; /// This includes the most common types in this crate, re-exported for your convenience. pub mod prelude { diff --git a/src/stream.rs b/src/stream.rs index 0252649..e3b787e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -28,6 +28,7 @@ use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; + use crate::client::PRODUCTION_CLIENTS; use crate::consumer::AvroFileConsumer; use crate::consumer::DualWriteRequestConsumer; diff --git a/src/types.rs b/src/types.rs index bdd8beb..1b2313e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,8 @@ use std::collections::BTreeMap; use std::time::Duration; -use conjure_object::{BearerToken, ResourceIdentifier}; +use conjure_object::BearerToken; +use conjure_object::ResourceIdentifier; use nominal_api::tonic::google::protobuf::Timestamp; use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType; use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint; From 3647b8f739d7dd7b48d34d2866d998607a1d1f34 Mon Sep 17 00:00:00 2001 From: Joy He Date: Sun, 28 Sep 2025 16:54:20 -0700 Subject: [PATCH 5/9] fix --- Cargo.lock | 31 +++++++++++++++++++++++++------ src/stream.rs | 1 - 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 691137e..13d22ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -739,6 +739,12 @@ dependencies = [ "rand 0.9.2", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.0" @@ -1381,6 +1387,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "litemap" version = "0.8.0" @@ -1456,12 +1468,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "multimap" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" - [[package]] name = "native-tls" version = "0.2.14" @@ -1988,6 +1994,19 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustix" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.23.31" diff --git a/src/stream.rs b/src/stream.rs index e3b787e..4a0d8eb 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -27,7 +27,6 @@ use parking_lot::MutexGuard; use tracing::debug; use tracing::error; use tracing::info; -use tracing::warn; use crate::client::PRODUCTION_CLIENTS; use crate::consumer::AvroFileConsumer; From 5504297cd727bb073c57b651940cf6572c3dd44d Mon Sep 17 00:00:00 2001 From: Joy He Date: Thu, 2 Oct 2025 16:17:24 -0400 Subject: [PATCH 6/9] optional workspace rid --- src/upload.rs | 173 +++++++++++++++++++++++++++----------------------- 1 file changed, 92 insertions(+), 81 deletions(-) diff --git a/src/upload.rs b/src/upload.rs index c357e3a..81042c4 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,4 +1,3 @@ -use std::future::Future; use std::io::Read; use std::io::Seek; use std::path::Path; @@ -37,14 +36,14 @@ use tracing::info; use crate::client::NominalApiClients; use crate::types::AuthProvider; -const SMALL_FILE_SIZE_LIMIT: u64 = 256 * 1024 * 1024; // 256 MB +const SMALL_FILE_SIZE_LIMIT: u64 = 512 * 1024 * 1024; // 512 MB #[derive(Clone)] -pub struct UploadManager { +pub struct AvroIngestManager { pub upload_queue: async_channel::Receiver, } -impl UploadManager { +impl AvroIngestManager { pub fn new( clients: NominalApiClients, http_client: reqwest::Client, @@ -54,7 +53,7 @@ impl UploadManager { auth_provider: impl AuthProvider + 'static, data_source_rid: ResourceIdentifier, ) -> Self { - let uploader = Uploader::new( + let uploader = FileObjectStoreUploader::new( clients.upload, clients.ingest, http_client, @@ -68,12 +67,12 @@ impl UploadManager { Self::run(upload_queue_clone, uploader, auth_provider, data_source_rid).await; }); - UploadManager { upload_queue } + AvroIngestManager { upload_queue } } pub async fn run( upload_queue: async_channel::Receiver, - uploader: Uploader, + uploader: FileObjectStoreUploader, auth_provider: impl AuthProvider + 'static, data_source_rid: ResourceIdentifier, ) { @@ -84,44 +83,26 @@ impl UploadManager { error!("Missing token for upload"); continue; }; - let Some(workspace_rid) = auth_provider.workspace_rid() else { - error!("Missing workspace RID for upload"); - continue; - }; match file { Ok(f) => { - match uploader - .upload(&token, f, file_name, workspace_rid.clone()) - .await + match upload_and_ingest_file( + uploader.clone(), + &token, + auth_provider.workspace_rid().map(WorkspaceRid::from), + f, + file_name, + &file_path, + data_source_rid.clone(), + ) + .await { - Ok(response) => { - match uploader - .ingest_avro(&token, &response, data_source_rid.clone()) - .await - { - Ok(ingest_response) => { - info!( - "Successfully uploaded and ingested file {}: {:?}", - file_name, ingest_response - ); - // remove file - if let Err(e) = std::fs::remove_file(&file_path) { - error!( - "Failed to remove file {}: {:?}", - file_path.display(), - e - ); - } else { - info!("Removed file {}", file_path.display()); - } - } - Err(e) => { - error!("Failed to ingest file {}: {:?}", file_name, e); - } - } - } + Ok(()) => {} Err(e) => { - error!("Failed to upload file {}: {:?}", file_name, e); + error!( + "Error uploading and ingesting file {}: {}", + file_path.display(), + e + ); } } } @@ -133,6 +114,47 @@ impl UploadManager { } } +async fn upload_and_ingest_file( + uploader: FileObjectStoreUploader, + token: &BearerToken, + workspace_rid: Option, + file: std::fs::File, + file_name: &str, + file_path: &PathBuf, + data_source_rid: ResourceIdentifier, +) -> Result<(), String> { + match uploader + .upload(token, file, file_name, workspace_rid) + .await + { + Ok(response) => { + match uploader + .ingest_avro(token, &response, data_source_rid) + .await + { + Ok(ingest_response) => { + info!( + "Successfully uploaded and ingested file {}: {:?}", + file_name, ingest_response + ); + if let Err(e) = std::fs::remove_file(file_path) { + Err(format!( + "Failed to remove file {}: {:?}", + file_path.display(), + e + )) + } else { + info!("Removed file {}", file_path.display()); + Ok(()) + } + } + Err(e) => Err(format!("Failed to ingest file {file_name}: {e:?}")), + } + } + Err(e) => Err(format!("Failed to upload file {file_name}: {e:?}")), + } +} + #[derive(Debug, thiserror::Error)] pub enum UploaderError { #[error("Conjure error: {0}")] @@ -175,45 +197,36 @@ impl FileWriteBody { } impl AsyncWriteBody for FileWriteBody { - #[expect(clippy::manual_async_fn)] - fn write_body( - self: Pin<&mut Self>, - w: Pin<&mut BodyWriter>, - ) -> impl Future> + Send { - async move { - let mut file = self.file.try_clone().map_err(|e| { - Error::internal_safe(format!("Failed to clone file for upload: {e}")) - })?; - - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).map_err(|e| { - Error::internal_safe(format!("Failed to read bytes from file: {e}")) - })?; - - w.write_bytes(buffer.into()) - .await - .map_err(|e| Error::internal_safe(format!("Failed to write bytes to body: {e}")))?; + async fn write_body(self: Pin<&mut Self>, w: Pin<&mut BodyWriter>) -> Result<(), Error> { + let mut file = self + .file + .try_clone() + .map_err(|e| Error::internal_safe(format!("Failed to clone file for upload: {e}")))?; - Ok(()) - } + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer) + .map_err(|e| Error::internal_safe(format!("Failed to read bytes from file: {e}")))?; + + w.write_bytes(buffer.into()) + .await + .map_err(|e| Error::internal_safe(format!("Failed to write bytes to body: {e}")))?; + + Ok(()) } - #[expect(clippy::manual_async_fn)] - fn reset(self: Pin<&mut Self>) -> impl Future + Send { - async move { - let Ok(mut file) = self.file.try_clone() else { - return false; - }; + async fn reset(self: Pin<&mut Self>) -> bool { + let Ok(mut file) = self.file.try_clone() else { + return false; + }; - use std::io::SeekFrom; + use std::io::SeekFrom; - file.seek(SeekFrom::Start(0)).is_ok() - } + file.seek(SeekFrom::Start(0)).is_ok() } } #[derive(Clone)] -pub struct Uploader { +pub struct FileObjectStoreUploader { upload_client: UploadServiceAsyncClient, ingest_client: IngestServiceAsyncClient, http_client: reqwest::Client, @@ -221,7 +234,7 @@ pub struct Uploader { opts: UploaderOpts, } -impl Uploader { +impl FileObjectStoreUploader { pub fn new( upload_client: UploadServiceAsyncClient, ingest_client: IngestServiceAsyncClient, @@ -229,7 +242,7 @@ impl Uploader { handle: tokio::runtime::Handle, opts: UploaderOpts, ) -> Self { - Uploader { + FileObjectStoreUploader { upload_client, ingest_client, http_client, @@ -242,12 +255,12 @@ impl Uploader { &self, token: &BearerToken, file_name: &str, - workspace_rid: ResourceIdentifier, + workspace_rid: Option, ) -> Result { let request = InitiateMultipartUploadRequest::builder() .filename(file_name) .filetype("application/octet-stream") - .workspace(Some(WorkspaceRid::from(workspace_rid))) + .workspace(workspace_rid) .build(); let response = self .upload_client @@ -403,18 +416,16 @@ impl Uploader { token: &BearerToken, file_name: &str, size_bytes: i64, - workspace_rid: ResourceIdentifier, + workspace_rid: Option, file: std::fs::File, ) -> Result { - let workspace_rid = WorkspaceRid::from(workspace_rid); - let s3_path = self .upload_client .upload_file( token, file_name, SafeLong::new(size_bytes).ok(), - Some(&workspace_rid), + workspace_rid.as_ref(), FileWriteBody::new(file), ) .await @@ -428,7 +439,7 @@ impl Uploader { token: &BearerToken, reader: R, file_name: impl Into<&str>, - workspace_rid: ResourceIdentifier, + workspace_rid: Option, ) -> Result where R: Read + Send + 'static, @@ -449,7 +460,7 @@ impl Uploader { } let initiate_response = self - .initiate_upload(token, file_name, workspace_rid) + .initiate_upload(token, file_name, workspace_rid.map(WorkspaceRid::from)) .await?; let upload_id = initiate_response.upload_id(); let key = initiate_response.key(); From db7f12a8510e38da6736ea0edf4fe5c7e08b67c3 Mon Sep 17 00:00:00 2001 From: Joy He Date: Thu, 2 Oct 2025 16:22:51 -0400 Subject: [PATCH 7/9] change types --- src/client.rs | 6 +++--- src/types.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9be76f7..d18fade 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,7 +18,7 @@ use conjure_runtime::Client; use conjure_runtime::Idempotency; use conjure_runtime::ResponseBody; use conjure_runtime::UserAgent; -use nominal_api::api::rids::NominalDataSourceOrDatasetRid; +use nominal_api::api::rids::{NominalDataSourceOrDatasetRid, WorkspaceRid}; use nominal_api::ingest::api::IngestServiceAsyncClient; use nominal_api::upload::api::UploadServiceAsyncClient; use snap::write::FrameEncoder; @@ -46,7 +46,7 @@ impl AuthProvider for BearerToken { #[derive(Debug, Clone)] pub struct TokenAndWorkspaceRid { pub token: BearerToken, - pub workspace_rid: Option, + pub workspace_rid: Option, } impl AuthProvider for TokenAndWorkspaceRid { @@ -54,7 +54,7 @@ impl AuthProvider for TokenAndWorkspaceRid { Some(self.token.clone()) } - fn workspace_rid(&self) -> Option { + fn workspace_rid(&self) -> Option { self.workspace_rid.clone() } } diff --git a/src/types.rs b/src/types.rs index 1b2313e..b261479 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::time::Duration; use conjure_object::BearerToken; -use conjure_object::ResourceIdentifier; +use nominal_api::api::rids::WorkspaceRid; use nominal_api::tonic::google::protobuf::Timestamp; use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType; use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint; @@ -55,7 +55,7 @@ impl ChannelDescriptor { pub trait AuthProvider: Clone + Send + Sync { fn token(&self) -> Option; - fn workspace_rid(&self) -> Option { + fn workspace_rid(&self) -> Option { None } } From 1261c7acc1199a544ff5d7f9fbd319ccb82e766b Mon Sep 17 00:00:00 2001 From: Joy He Date: Thu, 2 Oct 2025 16:25:52 -0400 Subject: [PATCH 8/9] change defaults --- src/upload.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/upload.rs b/src/upload.rs index 81042c4..6d96a2e 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -179,9 +179,9 @@ pub struct UploaderOpts { impl Default for UploaderOpts { fn default() -> Self { UploaderOpts { - chunk_size: 64_000_000, // 128 MB + chunk_size: 512 * 1024 * 1024, // 512 MB max_retries: 3, - max_concurrent_uploads: 8, + max_concurrent_uploads: 1, } } } From 01fbf25760de021eda83a3fd944ca10ea397eefd Mon Sep 17 00:00:00 2001 From: Joy He Date: Thu, 2 Oct 2025 17:24:48 -0400 Subject: [PATCH 9/9] fmt --- src/client.rs | 3 ++- src/upload.rs | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index d18fade..89f4b9b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,7 +18,8 @@ use conjure_runtime::Client; use conjure_runtime::Idempotency; use conjure_runtime::ResponseBody; use conjure_runtime::UserAgent; -use nominal_api::api::rids::{NominalDataSourceOrDatasetRid, WorkspaceRid}; +use nominal_api::api::rids::NominalDataSourceOrDatasetRid; +use nominal_api::api::rids::WorkspaceRid; use nominal_api::ingest::api::IngestServiceAsyncClient; use nominal_api::upload::api::UploadServiceAsyncClient; use snap::write::FrameEncoder; diff --git a/src/upload.rs b/src/upload.rs index 6d96a2e..0c85c27 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -123,10 +123,7 @@ async fn upload_and_ingest_file( file_path: &PathBuf, data_source_rid: ResourceIdentifier, ) -> Result<(), String> { - match uploader - .upload(token, file, file_name, workspace_rid) - .await - { + match uploader.upload(token, file, file_name, workspace_rid).await { Ok(response) => { match uploader .ingest_avro(token, &response, data_source_rid)