Skip to content

feat: uploader and upload manager#21

Merged
joyhe208 merged 10 commits intomainfrom
joy/upload
Oct 7, 2025
Merged

feat: uploader and upload manager#21
joyhe208 merged 10 commits intomainfrom
joy/upload

Conversation

@joyhe208
Copy link
Contributor

@joyhe208 joyhe208 commented Sep 28, 2025

No description provided.

Copy link
Contributor Author

joyhe208 commented Sep 28, 2025

@joyhe208 joyhe208 changed the title upload feat: uploader and upload manager Sep 28, 2025
@joyhe208 joyhe208 marked this pull request as ready for review September 28, 2025 23:11
src/client.rs Outdated
Comment on lines 57 to 60
fn workspace_rid(&self) -> Option<ResourceIdentifier> {
self.workspace_rid.clone()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which endpoints are we using that require a workspace rid? is it needed for any of the upload endpoints or do we use a listResource type endpoints now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need it for upload

src/types.rs Outdated
Comment on lines 58 to 61
fn workspace_rid(&self) -> Option<ResourceIdentifier> {
None
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this have a default? will behaviour changes if the auth provider doesn't have a workspace rid?

Comment on lines 91 to 132
match file {
Ok(f) => {
match uploader
.upload(&token, f, file_name, workspace_rid.clone())
.await
{
Ok(response) => {
match uploader
.ingest_avro(&token, &response, data_source_rid.clone())
.await
{
Ok(ingest_response) => {
info!(
"Successfully uploaded and ingested file {}: {:?}",
file_name, ingest_response
);
// remove file
if let Err(e) = std::fs::remove_file(&file_path) {
error!(
"Failed to remove file {}: {:?}",
file_path.display(),
e
);
} else {
info!("Removed file {}", file_path.display());
}
}
Err(e) => {
error!("Failed to ingest file {}: {:?}", file_name, e);
}
}
}
Err(e) => {
error!("Failed to upload file {}: {:?}", file_name, e);
}
}
}
Err(e) => {
error!("Failed to open file {}: {:?}", file_path.display(), e);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be wrapped into a function that returns Result<.., String> (or similar), then can use the ? operator + a single match on Ok(..) / Err for the logging?

src/upload.rs Outdated
Comment on lines 160 to 162
chunk_size: 64_000_000, // 128 MB
max_retries: 3,
max_concurrent_uploads: 8,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the use-case of the re-uploading failed points, feels like these are the wrong defaults - if retries fail, the network is probably down, and 8 concurrent uploads seems like a lot in parallel with actual streams

src/upload.rs Outdated
Comment on lines 188 to 197
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).map_err(|e| {
Error::internal_safe(format!("Failed to read bytes from file: {e}"))
})?;

w.write_bytes(buffer.into())
.await
.map_err(|e| Error::internal_safe(format!("Failed to write bytes to body: {e}")))?;

Ok(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for the "upload while ingesting" but reading the whole file into memory won't work so well for larger files - something to consider if we need to support backfill use cases some day

src/upload.rs Outdated
}
}

#[expect(clippy::manual_async_fn)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind me why these are necessary?

src/upload.rs Outdated
pub upload_queue: async_channel::Receiver<PathBuf>,
}

impl UploadManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we name this something more specific? e.g AvroIngestManager ? similar for Uploader - FileObjectStoreUploader ?

Comment on lines 74 to 80
pub async fn run(
upload_queue: async_channel::Receiver<PathBuf>,
uploader: Uploader,
auth_provider: impl AuthProvider + 'static,
data_source_rid: ResourceIdentifier,
) {
while let Ok(file_path) = upload_queue.recv().await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious if you tried a fire-and-forget task approach instead - rough gist would be, we spawn a new 'ingest file' task that either uploads (and deletes) the file, or fails and leaves it on-disk

src/upload.rs Outdated
let request = InitiateMultipartUploadRequest::builder()
.filename(file_name)
.filetype("application/octet-stream")
.workspace(Some(WorkspaceRid::from(workspace_rid)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this fail if workspace_rid is None? could we e.g default to checking the user's workspaces & if there's only one, use that else throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like using the auth endpoints?

Copy link
Contributor

alxhill commented Oct 1, 2025

okay next step for this PR is making the workspace rid optional even when doing the upload (as it's not required if there's a default workspace for the user account).

we can merge this as-is & explore the fire & forget upload task approach in a follow up

Copy link
Contributor

@alxhill alxhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approving (see notes for next steps)

@joyhe208 joyhe208 merged commit eacd595 into main Oct 7, 2025
3 checks passed
@nominal-bot nominal-bot mentioned this pull request Oct 7, 2025
@drake-nominal drake-nominal deleted the joy/upload branch October 23, 2025 00:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants