Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
src/client.rs
Outdated
| fn workspace_rid(&self) -> Option<ResourceIdentifier> { | ||
| self.workspace_rid.clone() | ||
| } | ||
| } |
There was a problem hiding this comment.
which endpoints are we using that require a workspace rid? is it needed for any of the upload endpoints or do we use a listResource type endpoints now?
There was a problem hiding this comment.
we need it for upload
src/types.rs
Outdated
| fn workspace_rid(&self) -> Option<ResourceIdentifier> { | ||
| None | ||
| } | ||
| } |
There was a problem hiding this comment.
why does this have a default? will behaviour changes if the auth provider doesn't have a workspace rid?
| match file { | ||
| Ok(f) => { | ||
| match uploader | ||
| .upload(&token, f, file_name, workspace_rid.clone()) | ||
| .await | ||
| { | ||
| Ok(response) => { | ||
| match uploader | ||
| .ingest_avro(&token, &response, data_source_rid.clone()) | ||
| .await | ||
| { | ||
| Ok(ingest_response) => { | ||
| info!( | ||
| "Successfully uploaded and ingested file {}: {:?}", | ||
| file_name, ingest_response | ||
| ); | ||
| // remove file | ||
| if let Err(e) = std::fs::remove_file(&file_path) { | ||
| error!( | ||
| "Failed to remove file {}: {:?}", | ||
| file_path.display(), | ||
| e | ||
| ); | ||
| } else { | ||
| info!("Removed file {}", file_path.display()); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| error!("Failed to ingest file {}: {:?}", file_name, e); | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| error!("Failed to upload file {}: {:?}", file_name, e); | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| error!("Failed to open file {}: {:?}", file_path.display(), e); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
could this be wrapped into a function that returns Result<.., String> (or similar), then can use the ? operator + a single match on Ok(..) / Err for the logging?
src/upload.rs
Outdated
| chunk_size: 64_000_000, // 128 MB | ||
| max_retries: 3, | ||
| max_concurrent_uploads: 8, |
There was a problem hiding this comment.
for the use-case of the re-uploading failed points, feels like these are the wrong defaults - if retries fail, the network is probably down, and 8 concurrent uploads seems like a lot in parallel with actual streams
src/upload.rs
Outdated
| let mut buffer = Vec::new(); | ||
| file.read_to_end(&mut buffer).map_err(|e| { | ||
| Error::internal_safe(format!("Failed to read bytes from file: {e}")) | ||
| })?; | ||
|
|
||
| w.write_bytes(buffer.into()) | ||
| .await | ||
| .map_err(|e| Error::internal_safe(format!("Failed to write bytes to body: {e}")))?; | ||
|
|
||
| Ok(()) |
There was a problem hiding this comment.
This is fine for the "upload while ingesting" but reading the whole file into memory won't work so well for larger files - something to consider if we need to support backfill use cases some day
src/upload.rs
Outdated
| } | ||
| } | ||
|
|
||
| #[expect(clippy::manual_async_fn)] |
There was a problem hiding this comment.
remind me why these are necessary?
src/upload.rs
Outdated
| pub upload_queue: async_channel::Receiver<PathBuf>, | ||
| } | ||
|
|
||
| impl UploadManager { |
There was a problem hiding this comment.
nit: can we name this something more specific? e.g AvroIngestManager ? similar for Uploader - FileObjectStoreUploader ?
| pub async fn run( | ||
| upload_queue: async_channel::Receiver<PathBuf>, | ||
| uploader: Uploader, | ||
| auth_provider: impl AuthProvider + 'static, | ||
| data_source_rid: ResourceIdentifier, | ||
| ) { | ||
| while let Ok(file_path) = upload_queue.recv().await { |
There was a problem hiding this comment.
curious if you tried a fire-and-forget task approach instead - rough gist would be, we spawn a new 'ingest file' task that either uploads (and deletes) the file, or fails and leaves it on-disk
src/upload.rs
Outdated
| let request = InitiateMultipartUploadRequest::builder() | ||
| .filename(file_name) | ||
| .filetype("application/octet-stream") | ||
| .workspace(Some(WorkspaceRid::from(workspace_rid))) |
There was a problem hiding this comment.
does this fail if workspace_rid is None? could we e.g default to checking the user's workspaces & if there's only one, use that else throw?
There was a problem hiding this comment.
Like using the auth endpoints?
|
okay next step for this PR is making the workspace rid optional even when doing the upload (as it's not required if there's a default workspace for the user account). we can merge this as-is & explore the fire & forget upload task approach in a follow up |
alxhill
left a comment
There was a problem hiding this comment.
approving (see notes for next steps)

No description provided.