
Conversation

Collaborator

@bomanaps bomanaps commented Feb 10, 2026

Pull Request

Description

#55

Type of change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Other (describe):

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

Related Issues

Screenshots (if applicable)

Summary by Sourcery

Add concurrent batch upload and download capabilities with memory-aware backpressure and improved single-file download handling across the SDK and MCP server.

New Features:

  • Introduce batchUpload and batchDownload APIs in the SDK with configurable concurrency, retries, and progress events for bulk file operations.
  • Expose batch file upload and download capabilities via new MCP tools and corresponding Lighthouse service methods.
  • Add memory manager integration to track per-operation memory usage and apply backpressure during intensive upload/download batches.
  • Provide new batch operation type definitions for uploads and downloads, including per-file and aggregate result structures.

Enhancements:

  • Replace simple download implementation with streamed HTTP gateway downloads that include CID validation, dynamic timeouts, progress reporting, and richer error messages.
  • Add helper API on the SDK to surface memory manager stats and backpressure status.
  • Extend the MCP tool registry to register the new batch upload and download tools.
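As a rough illustration of the CID validation mentioned above, a heuristic check might look like this. This is a sketch only; the function name is hypothetical, and a production implementation would parse the multibase/multihash properly (e.g. with the `multiformats` library) rather than pattern-match:

```typescript
// Heuristic CID check: CIDv0 is "Qm" + 44 base58btc characters; CIDv1 in
// base32 starts with the multibase prefix "b". Not a substitute for real
// multiformats parsing.
function looksLikeCid(cid: string): boolean {
  const cidV0 = /^Qm[1-9A-HJ-NP-Za-km-z]{44}$/;
  const cidV1Base32 = /^b[a-z2-7]{20,}$/;
  return cidV0.test(cid) || cidV1Base32.test(cid);
}
```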


sourcery-ai bot commented Feb 10, 2026

Reviewer's Guide

Adds batch upload and download capabilities with concurrency and memory backpressure control to the Lighthouse AI SDK and MCP server, including new batch tool endpoints, types, and a more robust single-file download implementation.
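The concurrency control described here can be illustrated with a minimal worker-pool sketch. The actual BatchProcessor also handles retries, per-operation timing, and result aggregation; this simplified runner (function name hypothetical) only shows the "at most N handlers in flight" mechanism:

```typescript
// Run handler over items with at most `limit` concurrent invocations,
// preserving input order in the results array.
async function runWithConcurrency<T, R>(
  items: T[],
  limit: number,
  handler: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index. The claim
  // (next++) happens synchronously before any await, so no two workers
  // take the same item.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await handler(items[i]);
    }
  }

  const poolSize = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: poolSize }, () => worker()));
  return results;
}
```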

Sequence diagram for batch upload via MCP tool with concurrency and memory backpressure

sequenceDiagram
  actor User
  participant Tool as LighthouseBatchUploadTool
  participant Service as LighthouseService
  participant SDK as LighthouseAISDK
  participant MM as MemoryManager
  participant BP as BatchProcessor
  participant IPFS as Lighthouse_IPFS

  User->>Tool: execute({ filePaths, concurrency, encrypt, ... })
  Tool->>Tool: validateParams()
  Tool-->>User: error (invalid params)
  Note over Tool,User: optional early return

  User->>Tool: execute(valid params)
  Tool->>Service: batchUploadFiles(filePaths, BatchUploadOptions)
  Service->>SDK: batchUpload(BatchUploadInput[], BatchUploadOptions)
  SDK-->>User: emit batch:upload:start

  SDK->>BP: new BatchProcessor(uploadHandler, { concurrency, maxRetries, ... })
  SDK->>BP: addBatch(operations)

  loop per file (concurrent up to concurrency)
    BP->>SDK: uploadHandler(BatchUploadInput)
    SDK->>MM: isUnderBackpressure()
    alt under backpressure
      SDK-->>User: emit batch:backpressure(waiting true)
      SDK->>MM: waitForRelief(30000)
      SDK-->>User: emit batch:backpressure(waiting false)
    end

    SDK->>fs: validateFile(filePath)
    SDK-->>SDK: fileStats
    SDK->>MM: track(memoryId, fileStats.size, meta)

    SDK->>SDK: uploadFile(filePath, uploadOptions)
    SDK->>IPFS: upload stream
    IPFS-->>SDK: upload result (FileInfo)

    SDK->>MM: untrack(memoryId)
    BP-->>SDK: per file result
    SDK-->>User: emit batch:upload:progress(completed, total, failures)
  end

  BP-->>SDK: batchResults
  SDK-->>Service: BatchOperationResult
  SDK-->>User: emit batch:upload:complete
  Service->>Service: cache and persist successful files
  Service-->>Tool: BatchOperationResult
  Tool-->>User: formatted batch upload summary

Sequence diagram for batch download via MCP tool with concurrency and memory backpressure

sequenceDiagram
  actor User
  participant Tool as LighthouseBatchDownloadTool
  participant Service as LighthouseService
  participant SDK as LighthouseAISDK
  participant MM as MemoryManager
  participant BP as BatchProcessor
  participant Gateway as Lighthouse_IPFS_Gateway
  participant FS as FileSystem

  User->>Tool: execute({ cids, outputDir, concurrency, decrypt })
  Tool->>Tool: validateParams()
  Tool-->>User: error (invalid params)
  Note over Tool,User: optional early return

  User->>Tool: execute(valid params)
  Tool->>Service: batchDownloadFiles(cids, BatchDownloadOptions)
  Service->>SDK: batchDownload(BatchDownloadInput[], BatchDownloadOptions)
  SDK-->>User: emit batch:download:start

  SDK->>BP: new BatchProcessor(downloadHandler, { concurrency, maxRetries, ... })
  SDK->>BP: addBatch(operations)

  loop per CID (concurrent up to concurrency)
    BP->>SDK: downloadHandler(BatchDownloadInput)
    SDK->>MM: isUnderBackpressure()
    alt under backpressure
      SDK-->>User: emit batch:backpressure(waiting true)
      SDK->>MM: waitForRelief(30000)
      SDK-->>User: emit batch:backpressure(waiting false)
    end

    SDK->>MM: track(memoryId, expectedSize, meta)

    SDK->>SDK: downloadFile(cid, outputPath, DownloadOptions)
    SDK->>Gateway: GET /ipfs/cid (stream)
    Gateway-->>SDK: stream response
    SDK->>FS: write stream to outputPath
    FS-->>SDK: file written

    SDK->>FS: stat(outputPath)
    SDK-->>SDK: fileStats
    SDK->>MM: untrack(memoryId)
    BP-->>SDK: per file BatchDownloadFileResult
    SDK-->>User: emit batch:download:progress(completed, total, failures)
  end

  BP-->>SDK: batchResults
  SDK-->>Service: BatchOperationResult
  SDK-->>User: emit batch:download:complete
  Service-->>Tool: BatchOperationResult
  Tool-->>User: formatted batch download summary

Class diagram for new batch operations and memory management

classDiagram
  class LighthouseAISDK {
    - AuthManager auth
    - ProgressTracker progress
    - CircuitBreaker circuitBreaker
    - EncryptionManager encryption
    - RateLimiter rateLimiter
    - MemoryManager memoryManager
    - LighthouseConfig config
    + batchUpload(files BatchUploadInput[], options BatchUploadOptions) BatchOperationResult~FileInfo~
    + batchDownload(files BatchDownloadInput[], options BatchDownloadOptions) BatchOperationResult~BatchDownloadFileResult~
    + downloadFile(cid string, outputPath string, options DownloadOptions) Promise~string~
    + getMemoryStats() MemoryStats
    + isUnderBackpressure() boolean
    + destroy() void
  }

  class MemoryManager {
    + track(id string, size number, meta any) void
    + untrack(id string) void
    + isUnderBackpressure() boolean
    + waitForRelief(timeoutMs number) Promise~void~
    + getStats() MemoryStats
    + destroy() void
  }

  class BatchProcessor~TInput, TResult~ {
    + constructor(handler function, options BatchProcessorOptions)
    + addBatch(operations BatchOperation~TInput~[]) Promise~BatchProcessorResult~TResult~[]~
    + destroy() void
  }

  class DownloadOptions {
    + onProgress(progress ProgressInfo) void
    + expectedSize number
    + decrypt boolean
    + timeout number
  }

  class BatchUploadOptions {
    + concurrency number
    + encrypt boolean
    + accessConditions AccessCondition[]
    + tags string[]
    + metadata Record~string, any~
    + onProgress(completed number, total number, failures number) void
    + continueOnError boolean
    + maxRetries number
  }

  class BatchDownloadOptions {
    + concurrency number
    + outputDir string
    + decrypt boolean
    + onProgress(completed number, total number, failures number) void
    + continueOnError boolean
    + maxRetries number
  }

  class BatchUploadInput {
    + filePath string
    + fileName string
    + metadata Record~string, any~
  }

  class BatchDownloadInput {
    + cid string
    + outputFileName string
    + expectedSize number
  }

  class BatchFileResult~T~ {
    + id string
    + success boolean
    + data T
    + error string
    + duration number
    + retries number
  }

  class BatchOperationResult~T~ {
    + total number
    + successful number
    + failed number
    + results BatchFileResult~T~[]
    + totalDuration number
    + averageDuration number
    + successRate number
  }

  class BatchDownloadFileResult {
    + cid string
    + filePath string
    + size number
    + decrypted boolean
  }

  class ILighthouseService {
    <<interface>>
    + uploadFile(...) Promise~UploadResult~
    + fetchFile(...) Promise~DownloadResult~
    + batchUploadFiles(filePaths string[], options BatchUploadOptions) Promise~BatchOperationResult~FileInfo~~
    + batchDownloadFiles(cids string[], options BatchDownloadOptions) Promise~BatchOperationResult~BatchDownloadFileResult~~
  }

  class LighthouseService {
    + batchUploadFiles(filePaths string[], options BatchUploadOptions) Promise~BatchOperationResult~FileInfo~~
    + batchDownloadFiles(cids string[], options BatchDownloadOptions) Promise~BatchOperationResult~BatchDownloadFileResult~~
  }

  class LighthouseBatchUploadTool {
    - ILighthouseService service
    - Logger logger
    + constructor(service ILighthouseService, logger Logger)
    + execute(args Record~string, unknown~) Promise~ProgressAwareToolResult~
    + getDefinition() MCPToolDefinition$
  }

  class LighthouseBatchDownloadTool {
    - ILighthouseService service
    - Logger logger
    + constructor(service ILighthouseService, logger Logger)
    + execute(args Record~string, unknown~) Promise~ProgressAwareToolResult~
    + getDefinition() MCPToolDefinition$
  }

  LighthouseAISDK --> MemoryManager : uses
  LighthouseAISDK --> BatchProcessor : uses
  LighthouseAISDK --> BatchUploadOptions
  LighthouseAISDK --> BatchDownloadOptions
  LighthouseAISDK --> BatchUploadInput
  LighthouseAISDK --> BatchDownloadInput
  LighthouseAISDK --> BatchOperationResult
  LighthouseAISDK --> BatchDownloadFileResult

  LighthouseService ..|> ILighthouseService
  LighthouseService --> LighthouseAISDK : uses

  LighthouseBatchUploadTool --> ILighthouseService : uses
  LighthouseBatchDownloadTool --> ILighthouseService : uses

  BatchOperationResult --> BatchFileResult
  BatchFileResult --> BatchDownloadFileResult
  BatchFileResult --> FileInfo

File-Level Changes

Change Details Files
Integrate a memory manager into the SDK to support backpressure-aware operations and expose memory state helpers.
  • Instantiate MemoryManager in the SDK constructor with configurable thresholds and cleanup behavior.
  • Forward memory-related events (backpressure start/end, cleanup needed) from MemoryManager through the SDK event emitter.
  • Expose helper methods getMemoryStats and isUnderBackpressure for external monitoring.
  • Ensure MemoryManager is destroyed during SDK cleanup.
packages/sdk-wrapper/src/LighthouseAISDK.ts
Harden single-file download to stream from the Lighthouse gateway with validation, progress tracking, and detailed error handling.
  • Validate CID format and ensure output directory exists before downloading.
  • Use axios streaming from the Lighthouse IPFS gateway with dynamic timeout and progress forwarding to user callbacks.
  • Stream response to disk with robust error handling, cleanup of partial files, and file-size verification.
  • Map low-level errors (network, 404/not found, permissions) to clearer user-facing messages while integrating with existing progress/error handling.
packages/sdk-wrapper/src/LighthouseAISDK.ts
Add batch upload and batch download operations to the SDK with concurrency, retries, and memory backpressure support.
  • Define batchUpload and batchDownload methods that use a BatchProcessor to handle multiple files concurrently with configurable concurrency and maxRetries.
  • Integrate MemoryManager in each per-file operation to track expected memory usage and wait on backpressure before proceeding.
  • Emit detailed lifecycle events for batch operations (start, progress, complete, error, backpressure) and aggregate per-file results into BatchOperationResult.
  • Derive stats such as total/average duration and success rate for each batch operation.
packages/sdk-wrapper/src/LighthouseAISDK.ts
Introduce shared batch operation types for uploads and downloads in the SDK type definitions and re-export them from the package entrypoint.
  • Define BatchUploadOptions, BatchDownloadOptions, BatchUploadInput, BatchDownloadInput to configure and describe per-file batch behavior.
  • Define BatchFileResult and BatchOperationResult to capture per-file and aggregate batch outcomes, including retries and timing.
  • Define BatchDownloadFileResult to standardize batch download metadata (CID, path, size, decrypted flag).
  • Export the new batch-related types from the SDK wrapper index for external consumers.
packages/sdk-wrapper/src/types.ts
packages/sdk-wrapper/src/index.ts
Extend the MCP Lighthouse service to support batch upload and download operations and surface their results and logging.
  • Add optional batchUploadFiles and batchDownloadFiles methods to the ILighthouseService interface with appropriate batch types.
  • Implement batchUploadFiles and batchDownloadFiles in LighthouseService, delegating to SDK batch methods and logging timing and success metrics.
  • On batch upload, persist successful file metadata to storage and cache using the existing StoredFile shape.
apps/mcp-server/src/services/ILighthouseService.ts
apps/mcp-server/src/services/LighthouseService.ts
Expose new MCP tools for batch uploading and downloading files through the MCP server with parameter validation and structured results.
  • Register LighthouseBatchUploadTool and LighthouseBatchDownloadTool in the tools index and factory so they are discoverable by the MCP server.
  • Implement LighthouseBatchUploadTool with schema-validated inputs, file existence/size checks, and mapping of SDK batch results into a concise response payload.
  • Implement LighthouseBatchDownloadTool with CID validation, writable output directory checks, and mapping of SDK batch results into a concise response payload.
  • Ensure both tools respect concurrency limits, continueOnError semantics, and provide execution metadata such as timing and success rates.
apps/mcp-server/src/tools/index.ts
apps/mcp-server/src/tools/LighthouseBatchUploadTool.ts
apps/mcp-server/src/tools/LighthouseBatchDownloadTool.ts
Minor utility and safety improvements.
  • Adjust dataset ID generation to use String.slice instead of deprecated substr.
  • Extend DownloadOptions with decrypt and timeout fields to support richer download semantics.
packages/sdk-wrapper/src/LighthouseAISDK.ts
packages/sdk-wrapper/src/types.ts
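The aggregate statistics described above (total/average duration, success rate) could be derived roughly as follows. The field names mirror the BatchFileResult and BatchOperationResult shapes in the class diagram, but the computation itself is an assumption about how the SDK derives them:

```typescript
interface BatchFileResult<T> {
  id: string;
  success: boolean;
  data?: T;
  error?: string;
  duration: number; // milliseconds
  retries: number;
}

interface BatchOperationResult<T> {
  total: number;
  successful: number;
  failed: number;
  results: BatchFileResult<T>[];
  totalDuration: number;
  averageDuration: number;
  successRate: number;
}

// Fold per-file results into the aggregate batch summary (sketch).
function aggregateBatch<T>(results: BatchFileResult<T>[]): BatchOperationResult<T> {
  const total = results.length;
  const successful = results.filter((r) => r.success).length;
  const totalDuration = results.reduce((sum, r) => sum + r.duration, 0);
  return {
    total,
    successful,
    failed: total - successful,
    results,
    totalDuration,
    averageDuration: total > 0 ? totalDuration / total : 0,
    successRate: total > 0 ? successful / total : 0,
  };
}
```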



@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • In LighthouseBatchDownloadTool.validateParams, you're importing fs from fs/promises but using fs.constants in fs.access; fs/promises doesn't expose constants, so you should import fs (callbacks) for constants or pull constants from node:fs separately.
  • In batchDownload, the output path is constructed with string interpolation (${outputDir}/${fileName}); consider using path.join to avoid issues on Windows and ensure correct path handling.
  • The new DownloadOptions.timeout field is not wired into downloadFile (you always derive dynamicTimeout from expectedSize); consider allowing an explicit timeout option to override the dynamic calculation so callers can control download timeouts more precisely.
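The `path.join` suggestion from the second bullet can be illustrated as follows; the helper name is hypothetical and the real call sites differ:

```typescript
import * as path from "node:path";

// Joining with path.join instead of `${outputDir}/${fileName}` uses the
// platform-specific separator and normalizes redundant segments, so the
// same code works on both POSIX and Windows.
function resolveOutputPath(outputDir: string, fileName: string): string {
  return path.join(outputDir, fileName);
}
```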
Individual Comments

Comment 1
<location> `packages/sdk-wrapper/src/LighthouseAISDK.ts:462-471` </location>
<code_context>
+
+          // Calculate timeout based on expected size (minimum 2 minutes, +30s per 10MB)
+          const expectedSizeMB = (options.expectedSize || 10 * 1024 * 1024) / (1024 * 1024);
+          const dynamicTimeout = Math.max(120000, 120000 + (expectedSizeMB / 10) * 30000);

           // Update progress to downloading phase
           this.progress.updateProgress(operationId, 0, "downloading");

-          // Create progress callback
-          const progressCallback = this.progress.createProgressCallback(operationId);
+          // Download with progress tracking
+          const response = await axios({
+            method: "GET",
+            url: gatewayUrl,
+            responseType: "stream",
+            timeout: dynamicTimeout,
+            headers: {
+              "User-Agent": "LighthouseAISDK/1.0",
</code_context>

<issue_to_address>
**issue (bug_risk):** Download options timeout and decrypt flags are defined but not respected in this implementation.

This divergence between the interface and behavior can lead to unexpected timeouts and unused flags. Please either let `options.timeout` override `dynamicTimeout` when set, and hook `options.decrypt` into the decryption flow, or remove `decrypt` from `DownloadOptions` until it is supported.
</issue_to_address>

Comment 2
<location> `apps/mcp-server/src/tools/LighthouseBatchDownloadTool.ts:153` </location>
<code_context>
+      }
+
+      try {
+        await fs.access(params.outputDir, fs.constants.W_OK);
+      } catch {
+        // Try to create directory
</code_context>

<issue_to_address>
**issue (bug_risk):** Using fs.constants with the promises API will throw because constants is not available on fs/promises.

Since `fs` here comes from `"fs/promises"`, `constants` is not defined and the access mode will be invalid. Please either import `constants` (or `access`/`mkdir`) from `"fs"` and use `fsConstants.W_OK`, or import `fs` from `"fs"` for constants and `fsPromises` from `"fs/promises"` for the async calls, so output directory validation works correctly.
</issue_to_address>
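A sketch of the suggested fix, splitting the imports so the access mode always resolves (the helper name is hypothetical; on older Node versions `fs/promises` did not re-export `constants`, so importing `W_OK` from `node:fs` is the portable route):

```typescript
import { constants as fsConstants } from "node:fs";
import { access, mkdir } from "node:fs/promises";

// Check that the output directory is writable; if it is missing or not
// accessible, attempt to create it (including parents).
async function ensureWritableDir(dir: string): Promise<void> {
  try {
    await access(dir, fsConstants.W_OK);
  } catch {
    await mkdir(dir, { recursive: true });
  }
}
```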


@bomanaps
Collaborator Author

Please can you review and merge if its all good? cc @Patrick-Ehimen

@Patrick-Ehimen
Owner

Please can you review and merge if its all good? cc @Patrick-Ehimen

I will later today @bomanaps

