add batch upload and download with concurrency control #74
Conversation
Reviewer's Guide

Adds batch upload and download capabilities with concurrency and memory backpressure control to the Lighthouse AI SDK and MCP server, including new batch tool endpoints, types, and a more robust single-file download implementation.

Sequence diagram for batch upload via MCP tool with concurrency and memory backpressure

sequenceDiagram
actor User
participant Tool as LighthouseBatchUploadTool
participant Service as LighthouseService
participant SDK as LighthouseAISDK
participant MM as MemoryManager
participant BP as BatchProcessor
participant IPFS as Lighthouse_IPFS
User->>Tool: execute({ filePaths, concurrency, encrypt, ... })
Tool->>Tool: validateParams()
Tool-->>User: error (invalid params)
note over Tool,User: optional early return
User->>Tool: execute(valid params)
Tool->>Service: batchUploadFiles(filePaths, BatchUploadOptions)
Service->>SDK: batchUpload(BatchUploadInput[], BatchUploadOptions)
SDK-->>User: emit batch:upload:start
SDK->>BP: new BatchProcessor(uploadHandler, { concurrency, maxRetries, ... })
SDK->>BP: addBatch(operations)
loop per file (concurrent up to concurrency)
BP->>SDK: uploadHandler(BatchUploadInput)
SDK->>MM: isUnderBackpressure()
alt under backpressure
SDK-->>User: emit batch:backpressure(waiting true)
SDK->>MM: waitForRelief(30000)
SDK-->>User: emit batch:backpressure(waiting false)
end
SDK->>fs: validateFile(filePath)
SDK-->>SDK: fileStats
SDK->>MM: track(memoryId, fileStats.size, meta)
SDK->>SDK: uploadFile(filePath, uploadOptions)
SDK->>IPFS: upload stream
IPFS-->>SDK: upload result (FileInfo)
SDK->>MM: untrack(memoryId)
BP-->>SDK: per file result
SDK-->>User: emit batch:upload:progress(completed, total, failures)
end
BP-->>SDK: batchResults
SDK-->>Service: BatchOperationResult
SDK-->>User: emit batch:upload:complete
Service->>Service: cache and persist successful files
Service-->>Tool: BatchOperationResult
Tool-->>User: formatted batch upload summary
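For context on the loop above, the following is a minimal sketch of the backpressure-aware per-file handler pattern the diagram describes. It is illustrative only: the `MemoryManagerLike` interface, the event names, and the `uploadFile` callback are assumptions inferred from the diagram, not the SDK's actual code.

```typescript
// Sketch only: illustrates the backpressure-aware upload handler shown in the
// diagram. All names (MemoryManagerLike, emitter, uploadFile) are assumptions.
import { EventEmitter } from "node:events";
import { stat } from "node:fs/promises";

interface MemoryManagerLike {
  isUnderBackpressure(): boolean;
  waitForRelief(timeoutMs: number): Promise<void>;
  track(id: string, size: number, meta?: unknown): void;
  untrack(id: string): void;
}

async function uploadHandler(
  filePath: string,
  memoryManager: MemoryManagerLike,
  emitter: EventEmitter,
  uploadFile: (path: string) => Promise<{ cid: string }>
): Promise<{ cid: string }> {
  // Pause this worker while the process is under memory pressure.
  if (memoryManager.isUnderBackpressure()) {
    emitter.emit("batch:backpressure", { waiting: true });
    await memoryManager.waitForRelief(30_000);
    emitter.emit("batch:backpressure", { waiting: false });
  }

  // Reserve memory for the file, upload, then release the reservation.
  const fileStats = await stat(filePath);
  const memoryId = `upload:${filePath}`;
  memoryManager.track(memoryId, fileStats.size, { filePath });
  try {
    return await uploadFile(filePath);
  } finally {
    memoryManager.untrack(memoryId);
  }
}
```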
Sequence diagram for batch download via MCP tool with concurrency and memory backpressure

sequenceDiagram
actor User
participant Tool as LighthouseBatchDownloadTool
participant Service as LighthouseService
participant SDK as LighthouseAISDK
participant MM as MemoryManager
participant BP as BatchProcessor
participant Gateway as Lighthouse_IPFS_Gateway
participant FS as FileSystem
User->>Tool: execute({ cids, outputDir, concurrency, decrypt })
Tool->>Tool: validateParams()
Tool-->>User: error (invalid params)
note over Tool,User: optional early return
User->>Tool: execute(valid params)
Tool->>Service: batchDownloadFiles(cids, BatchDownloadOptions)
Service->>SDK: batchDownload(BatchDownloadInput[], BatchDownloadOptions)
SDK-->>User: emit batch:download:start
SDK->>BP: new BatchProcessor(downloadHandler, { concurrency, maxRetries, ... })
SDK->>BP: addBatch(operations)
loop per CID (concurrent up to concurrency)
BP->>SDK: downloadHandler(BatchDownloadInput)
SDK->>MM: isUnderBackpressure()
alt under backpressure
SDK-->>User: emit batch:backpressure(waiting true)
SDK->>MM: waitForRelief(30000)
SDK-->>User: emit batch:backpressure(waiting false)
end
SDK->>MM: track(memoryId, expectedSize, meta)
SDK->>SDK: downloadFile(cid, outputPath, DownloadOptions)
SDK->>Gateway: GET /ipfs/cid (stream)
Gateway-->>SDK: stream response
SDK->>FS: write stream to outputPath
FS-->>SDK: file written
SDK->>FS: stat(outputPath)
SDK-->>SDK: fileStats
SDK->>MM: untrack(memoryId)
BP-->>SDK: per file BatchDownloadFileResult
SDK-->>User: emit batch:download:progress(completed, total, failures)
end
BP-->>SDK: batchResults
SDK-->>Service: BatchOperationResult
SDK-->>User: emit batch:download:complete
Service-->>Tool: BatchOperationResult
Tool-->>User: formatted batch download summary
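The "write stream to outputPath" step in the diagram can be pictured roughly as below. This is a sketch rather than the SDK implementation; the gateway URL, default timeout, and return shape are assumptions.

```typescript
// Sketch only: stream a gateway response to a local file, as in the
// "write stream to outputPath" step above. URL and timeout are placeholders.
import axios from "axios";
import { createWriteStream } from "node:fs";
import { stat } from "node:fs/promises";
import { pipeline } from "node:stream/promises";

async function downloadToFile(cid: string, outputPath: string, timeoutMs = 120_000) {
  const response = await axios({
    method: "GET",
    url: `https://gateway.lighthouse.storage/ipfs/${cid}`, // assumed gateway URL
    responseType: "stream",
    timeout: timeoutMs,
  });

  // Pipe the response body to disk and wait for the write to finish.
  await pipeline(response.data, createWriteStream(outputPath));

  // Confirm the file landed and report its size.
  const { size } = await stat(outputPath);
  return { cid, filePath: outputPath, size };
}
```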
Class diagram for new batch operations and memory management

classDiagram
class LighthouseAISDK {
- AuthManager auth
- ProgressTracker progress
- CircuitBreaker circuitBreaker
- EncryptionManager encryption
- RateLimiter rateLimiter
- MemoryManager memoryManager
- LighthouseConfig config
+ batchUpload(files BatchUploadInput[], options BatchUploadOptions) BatchOperationResult~FileInfo~
+ batchDownload(files BatchDownloadInput[], options BatchDownloadOptions) BatchOperationResult~BatchDownloadFileResult~
+ downloadFile(cid string, outputPath string, options DownloadOptions) Promise~string~
+ getMemoryStats() MemoryStats
+ isUnderBackpressure() boolean
+ destroy() void
}
class MemoryManager {
+ track(id string, size number, meta any) void
+ untrack(id string) void
+ isUnderBackpressure() boolean
+ waitForRelief(timeoutMs number) Promise~void~
+ getStats() MemoryStats
+ destroy() void
}
class BatchProcessor~TInput, TResult~ {
+ constructor(handler function, options BatchProcessorOptions)
+ addBatch(operations BatchOperation~TInput~[]) Promise~BatchProcessorResult~TResult~[]~
+ destroy() void
}
class DownloadOptions {
+ onProgress(progress ProgressInfo) void
+ expectedSize number
+ decrypt boolean
+ timeout number
}
class BatchUploadOptions {
+ concurrency number
+ encrypt boolean
+ accessConditions AccessCondition[]
+ tags string[]
+ metadata Record~string, any~
+ onProgress(completed number, total number, failures number) void
+ continueOnError boolean
+ maxRetries number
}
class BatchDownloadOptions {
+ concurrency number
+ outputDir string
+ decrypt boolean
+ onProgress(completed number, total number, failures number) void
+ continueOnError boolean
+ maxRetries number
}
class BatchUploadInput {
+ filePath string
+ fileName string
+ metadata Record~string, any~
}
class BatchDownloadInput {
+ cid string
+ outputFileName string
+ expectedSize number
}
class BatchFileResult~T~ {
+ id string
+ success boolean
+ data T
+ error string
+ duration number
+ retries number
}
class BatchOperationResult~T~ {
+ total number
+ successful number
+ failed number
+ results BatchFileResult~T~[]
+ totalDuration number
+ averageDuration number
+ successRate number
}
class BatchDownloadFileResult {
+ cid string
+ filePath string
+ size number
+ decrypted boolean
}
class ILighthouseService {
<<interface>>
+ uploadFile(...) Promise~UploadResult~
+ fetchFile(...) Promise~DownloadResult~
+ batchUploadFiles(filePaths string[], options BatchUploadOptions) Promise~BatchOperationResult~FileInfo~~
+ batchDownloadFiles(cids string[], options BatchDownloadOptions) Promise~BatchOperationResult~BatchDownloadFileResult~~
}
class LighthouseService {
+ batchUploadFiles(filePaths string[], options BatchUploadOptions) Promise~BatchOperationResult~FileInfo~~
+ batchDownloadFiles(cids string[], options BatchDownloadOptions) Promise~BatchOperationResult~BatchDownloadFileResult~~
}
class LighthouseBatchUploadTool {
- ILighthouseService service
- Logger logger
+ constructor(service ILighthouseService, logger Logger)
+ execute(args Record~string, unknown~) Promise~ProgressAwareToolResult~
+ getDefinition() MCPToolDefinition$static
}
class LighthouseBatchDownloadTool {
- ILighthouseService service
- Logger logger
+ constructor(service ILighthouseService, logger Logger)
+ execute(args Record~string, unknown~) Promise~ProgressAwareToolResult~
+ getDefinition() MCPToolDefinition$static
}
LighthouseAISDK --> MemoryManager : uses
LighthouseAISDK --> BatchProcessor : uses
LighthouseAISDK --> BatchUploadOptions
LighthouseAISDK --> BatchDownloadOptions
LighthouseAISDK --> BatchUploadInput
LighthouseAISDK --> BatchDownloadInput
LighthouseAISDK --> BatchOperationResult
LighthouseAISDK --> BatchDownloadFileResult
LighthouseService ..|> ILighthouseService
LighthouseService --> LighthouseAISDK : uses
LighthouseBatchUploadTool --> ILighthouseService : uses
LighthouseBatchDownloadTool --> ILighthouseService : uses
BatchOperationResult --> BatchFileResult
BatchFileResult --> BatchDownloadFileResult
BatchFileResult --> FileInfo
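Based purely on the types sketched in the diagram above, a caller might drive the batch APIs roughly as follows. The minimal `BatchSdkLike` shape and the `Hash` field on `FileInfo` are assumptions, not confirmed SDK signatures.

```typescript
// Sketch only: option and result field names are taken from the class diagram;
// the FileInfo "Hash" field and this minimal SDK shape are assumptions.
interface BatchSdkLike {
  batchUpload(
    files: { filePath: string; fileName?: string }[],
    options: {
      concurrency?: number;
      continueOnError?: boolean;
      maxRetries?: number;
      onProgress?: (completed: number, total: number, failures: number) => void;
    }
  ): Promise<{
    total: number;
    successful: number;
    results: { success: boolean; data?: { Hash?: string } }[];
  }>;
  batchDownload(
    files: { cid: string; outputFileName?: string }[],
    options: { concurrency?: number; outputDir?: string }
  ): Promise<{ total: number; successful: number }>;
}

async function runBatch(sdk: BatchSdkLike) {
  const uploadResult = await sdk.batchUpload(
    [{ filePath: "./reports/q1.pdf" }, { filePath: "./reports/q2.pdf" }],
    {
      concurrency: 3,
      continueOnError: true,
      onProgress: (completed, total, failures) =>
        console.log(`upload ${completed}/${total} (${failures} failed)`),
    }
  );
  console.log(`uploaded ${uploadResult.successful}/${uploadResult.total}`);

  // Download everything that made it up, using the CIDs from the upload results.
  const cids = uploadResult.results
    .filter((r) => r.success && r.data?.Hash)
    .map((r) => r.data!.Hash!);

  const downloadResult = await sdk.batchDownload(
    cids.map((cid) => ({ cid })),
    { concurrency: 3, outputDir: "./downloads" }
  );
  console.log(`downloaded ${downloadResult.successful}/${downloadResult.total}`);
}
```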
File-Level Changes
Hey - I've found 2 issues, and left some high level feedback:
- In `LighthouseBatchDownloadTool.validateParams`, you're importing `fs` from `fs/promises` but using `fs.constants` in `fs.access`; `fs/promises` doesn't expose `constants`, so you should import `fs` (callbacks) for constants or pull `constants` from `node:fs` separately.
- In `batchDownload`, the output path is constructed with string interpolation (`${outputDir}/${fileName}`); consider using `path.join` to avoid issues on Windows and ensure correct path handling (see the sketch after this list).
- The new `DownloadOptions.timeout` field is not wired into `downloadFile` (you always derive `dynamicTimeout` from `expectedSize`); consider allowing an explicit `timeout` option to override the dynamic calculation so callers can control download timeouts more precisely.
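For the path-handling point, a minimal sketch of the suggested change (the helper name is hypothetical):

```typescript
import path from "node:path";

// Sketch of the suggested change: build the download target with path.join
// instead of string interpolation so separators are handled per platform.
function resolveOutputPath(outputDir: string, fileName: string): string {
  // Before: return `${outputDir}/${fileName}`;
  return path.join(outputDir, fileName);
}
```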
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `LighthouseBatchDownloadTool.validateParams`, you're importing `fs` from `fs/promises` but using `fs.constants` in `fs.access`; `fs/promises` doesn't expose `constants`, so you should import `fs` (callbacks) for constants or pull `constants` from `node:fs` separately.
- In `batchDownload`, the output path is constructed with string interpolation (`${outputDir}/${fileName}`); consider using `path.join` to avoid issues on Windows and ensure correct path handling.
- The new `DownloadOptions.timeout` field is not wired into `downloadFile` (you always derive `dynamicTimeout` from `expectedSize`); consider allowing an explicit `timeout` option to override the dynamic calculation so callers can control download timeouts more precisely.
## Individual Comments
### Comment 1
<location> `packages/sdk-wrapper/src/LighthouseAISDK.ts:462-471` </location>
<code_context>
+
+ // Calculate timeout based on expected size (minimum 2 minutes, +30s per 10MB)
+ const expectedSizeMB = (options.expectedSize || 10 * 1024 * 1024) / (1024 * 1024);
+ const dynamicTimeout = Math.max(120000, 120000 + (expectedSizeMB / 10) * 30000);
// Update progress to downloading phase
this.progress.updateProgress(operationId, 0, "downloading");
- // Create progress callback
- const progressCallback = this.progress.createProgressCallback(operationId);
+ // Download with progress tracking
+ const response = await axios({
+ method: "GET",
+ url: gatewayUrl,
+ responseType: "stream",
+ timeout: dynamicTimeout,
+ headers: {
+ "User-Agent": "LighthouseAISDK/1.0",
</code_context>
<issue_to_address>
**issue (bug_risk):** Download options timeout and decrypt flags are defined but not respected in this implementation.
This divergence between the interface and behavior can lead to unexpected timeouts and unused flags. Please either let `options.timeout` override `dynamicTimeout` when set, and hook `options.decrypt` into the decryption flow, or remove `decrypt` from `DownloadOptions` until it is supported.
</issue_to_address>
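One possible shape for the fix, mirroring the names in the snippet above; treat it as a sketch rather than the PR's actual code:

```typescript
import axios from "axios";

// Sketch only: lets an explicit options.timeout override the size-derived
// timeout from the reviewed snippet. Names mirror that snippet, not the SDK.
interface DownloadOptions {
  expectedSize?: number;
  timeout?: number;
}

async function fetchWithTimeout(gatewayUrl: string, options: DownloadOptions) {
  // Size-based fallback: minimum 2 minutes, +30s per 10MB of expected size.
  const expectedSizeMB = (options.expectedSize ?? 10 * 1024 * 1024) / (1024 * 1024);
  const dynamicTimeout = Math.max(120_000, 120_000 + (expectedSizeMB / 10) * 30_000);

  return axios({
    method: "GET",
    url: gatewayUrl,
    responseType: "stream",
    // An explicit timeout wins over the derived value.
    timeout: options.timeout ?? dynamicTimeout,
    headers: { "User-Agent": "LighthouseAISDK/1.0" },
  });
}
```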
### Comment 2
<location> `apps/mcp-server/src/tools/LighthouseBatchDownloadTool.ts:153` </location>
<code_context>
+ }
+
+ try {
+ await fs.access(params.outputDir, fs.constants.W_OK);
+ } catch {
+ // Try to create directory
</code_context>
<issue_to_address>
**issue (bug_risk):** Using fs.constants with the promises API will throw because constants is not available on fs/promises.
Since `fs` here comes from `"fs/promises"`, `constants` is not defined and the access mode will be invalid. Please either import `constants` (or `access`/`mkdir`) from `"fs"` and use `fsConstants.W_OK`, or import `fs` from `"fs"` for constants and `fsPromises` from `"fs/promises"` for the async calls, so output directory validation works correctly.
</issue_to_address>
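A minimal sketch of the suggested import split, assuming the validation only needs `access` and `mkdir`:

```typescript
// Sketch only: keep constants from node:fs and the async calls from
// node:fs/promises so the writability check works as intended.
import { constants } from "node:fs";
import { access, mkdir } from "node:fs/promises";

async function ensureWritableDir(outputDir: string): Promise<void> {
  try {
    await access(outputDir, constants.W_OK);
  } catch {
    // Directory missing or not writable: try to create it (and parents).
    await mkdir(outputDir, { recursive: true });
  }
}
```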
Please can you review and merge if it's all good? cc @Patrick-Ehimen
I will later today @bomanaps |
Pull Request
Description
#55
Type of change
Checklist
Related Issues
Screenshots (if applicable)
Summary by Sourcery
Add concurrent batch upload and download capabilities with memory-aware backpressure and improved single-file download handling across the SDK and MCP server.
New Features:
Enhancements: