fix(fair-queue): prevent unbounded memory growth by cleaning up queue descriptor and cooloff state cache #2816
Walkthrough
This pull request spans three files with complementary changes to the batch processing and fair-queue systems. The first change refactors the BatchQueue.handleMessage flow to reduce nesting and consolidate control paths, with earlier metadata retrieval and adjusted telemetry instrumentation timing. The second change extends the FairQueue with new public methods to expose cache sizes and adds cleanup logic for in-memory caches (queueDescriptorCache and queueCooloffStates) when queues become empty across multiple code paths. The third change introduces an optional getDynamicAttributes callback to the BatchedSpanManagerOptions interface, allowing dynamic span attributes to be computed at span creation time, which FairQueue now uses to report cache sizes during telemetry.
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~22 minutes
Rationale
The diff involves three interdependent files with mixed complexity. File 1 contains control-flow refactoring with consolidated error handling and telemetry instrumentation changes. File 2 introduces new public methods and replicates cache cleanup logic across three similar code paths (claim results, message completion, and direct processing), requiring consistent verification across locations. File 3 adds interface-level callback support. The heterogeneous nature of the changes (refactoring, new public methods, cross-file integration, and logic additions) requires separate reasoning per file, though the repetitive cache cleanup pattern in File 2 reduces some complexity. The integration dependency between Files 2 and 3 adds moderate scrutiny overhead.
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)
588-709: Potential undefined `processedCount` if `recordFailure` throws in catch block.
If `recordFailure` at lines 685-694 throws an exception, `processedCount` remains uninitialized, causing a runtime error at line 709 when accessing `processedCount`. Consider wrapping the catch block's `recordFailure` in its own try/catch or using a default value.
🔎 Suggested fix
     } catch (error) {
       span?.setAttribute("batch.result", "unexpected_error");
       span?.setAttribute("batch.error", error instanceof Error ? error.message : String(error));

       // Unexpected error during processing
       // For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
    -  const payloadStr = await this.#startSpan(
    -    "BatchQueue.serializePayload",
    -    async (innerSpan) => {
    -      const str =
    -        typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
    -      innerSpan?.setAttribute("batch.payloadSize", str.length);
    -      return str;
    -    }
    -  );
    -
    -  processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
    -    return this.completionTracker.recordFailure(batchId, {
    -      index: itemIndex,
    -      taskIdentifier: item.task,
    -      payload: payloadStr,
    -      options: item.options,
    -      error: error instanceof Error ? error.message : String(error),
    -      errorCode: "UNEXPECTED_ERROR",
    +  try {
    +    const payloadStr = await this.#startSpan(
    +      "BatchQueue.serializePayload",
    +      async (innerSpan) => {
    +        const str =
    +          typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
    +        innerSpan?.setAttribute("batch.payloadSize", str.length);
    +        return str;
    +      }
    +    );
    +
    +    processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
    +      return this.completionTracker.recordFailure(batchId, {
    +        index: itemIndex,
    +        taskIdentifier: item.task,
    +        payload: payloadStr,
    +        options: item.options,
    +        error: error instanceof Error ? error.message : String(error),
    +        errorCode: "UNEXPECTED_ERROR",
    +      });
         });
    -  });
    +  } catch (recordError) {
    +    this.logger.error("Failed to record failure in completion tracker", {
    +      batchId,
    +      itemIndex,
    +      error: recordError instanceof Error ? recordError.message : String(recordError),
    +    });
    +    // Re-throw to ensure message is not incorrectly completed
    +    throw recordError;
    +  }
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- internal-packages/run-engine/src/batch-queue/index.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
- packages/redis-worker/src/fair-queue/telemetry.ts
- internal-packages/run-engine/src/batch-queue/index.ts
- packages/redis-worker/src/fair-queue/index.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
- packages/redis-worker/src/fair-queue/telemetry.ts
- internal-packages/run-engine/src/batch-queue/index.ts
- packages/redis-worker/src/fair-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
- packages/redis-worker/src/fair-queue/telemetry.ts
- internal-packages/run-engine/src/batch-queue/index.ts
- packages/redis-worker/src/fair-queue/index.ts
🧬 Code graph analysis (3)
packages/redis-worker/src/fair-queue/telemetry.ts (1)
- internal-packages/tracing/src/index.ts (1): Attributes (15-15)
internal-packages/run-engine/src/batch-queue/index.ts (2)
- packages/core/src/v3/apiClient/index.ts (1): batchId (419-537)
- internal-packages/run-engine/src/engine/systems/batchSystem.ts (1): batchId (39-137)
packages/redis-worker/src/fair-queue/index.ts (3)
- packages/redis-worker/src/fair-queue/keyProducer.ts (2): masterQueueKey (29-31), queueKey (37-39)
- packages/redis-worker/src/fair-queue/visibility.ts (1): queueId (366-368)
- packages/redis-worker/src/fair-queue/masterQueue.ts (1): queueId (195-197)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
- GitHub Check: Cursor Bugbot
- GitHub Check: typecheck / typecheck
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
internal-packages/run-engine/src/batch-queue/index.ts (1)
540-730: Well-structured refactoring of message handling flow.
The consolidation of the `#handleMessage` flow with earlier metadata retrieval, unified try/catch block, and improved telemetry instrumentation is a clean improvement. The ordering of operations (record success/failure before completing message) is correctly documented and implemented.
packages/redis-worker/src/fair-queue/telemetry.ts (2)
471-494: Clean addition of dynamic attributes callback.
The optional `getDynamicAttributes` callback is well-integrated into the existing `BatchedSpanManagerOptions` interface. The type signature `() => Attributes` is appropriate for a synchronous callback that returns span attributes.
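To illustrate how an optional `getDynamicAttributes` callback with the signature `() => Attributes` might be consumed at span creation time, here is a minimal sketch. The `Attributes` alias, the `SpanOptionsSketch` type, and the `buildSpanAttributes` helper are assumptions made for this example and are not taken from the PR; only the callback name and the merge order (dynamic attributes first, explicit attributes second) come from the review.

```typescript
// Hypothetical stand-in for the `Attributes` type exported by the tracing package.
type Attributes = Record<string, string | number | boolean>;

// Simplified options shape (assumption); only `getDynamicAttributes` is named in the review.
type SpanOptionsSketch = {
  name: string;
  getDynamicAttributes?: () => Attributes;
};

// Builds the attribute bag for a new span. Dynamic attributes are spread first,
// so explicitly passed attributes win when both define the same key.
function buildSpanAttributes(
  options: SpanOptionsSketch,
  attributes: Attributes = {}
): Attributes {
  const dynamicAttributes = options.getDynamicAttributes?.() ?? {};
  return { ...dynamicAttributes, ...attributes };
}

// The callback runs each time a span is created, so it can report current values.
const merged = buildSpanAttributes(
  {
    name: "fairqueue",
    getDynamicAttributes: () => ({ "cache.size": 3, source: "dynamic" }),
  },
  { source: "explicit" }
);

// `source` is taken from the explicit attributes; `cache.size` comes from the callback.
console.log(merged);
```

Because the callback is optional, callers that never set it get exactly the attributes they pass in.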
592-609: Good attribute precedence ordering.
The spread order `...dynamicAttributes, ...attributes` correctly allows explicitly passed attributes to override dynamic ones, providing flexibility while maintaining the default dynamic attribute behavior.
packages/redis-worker/src/fair-queue/index.ts (4)
166-169: Good integration of cache size telemetry.
Exposing cache sizes as dynamic span attributes enables monitoring of the in-memory cache growth that this PR aims to address. This provides operational visibility without adding significant overhead.
604-631: Well-documented cache size monitoring API.
The three methods provide flexible access to cache sizes for monitoring purposes. The JSDoc comments clearly explain the purpose and lifecycle of each cache, which aids in understanding and debugging.
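As a rough illustration of what a cache size monitoring surface could look like, the sketch below exposes the sizes of two in-memory maps. The cache names `queueDescriptorCache` and `queueCooloffStates` appear in the review, but the class name, the three method names, and the value shapes are hypothetical; the actual methods added by the PR are not shown in this conversation.

```typescript
// Hypothetical value shapes; the real descriptor and cooloff types are not shown in the review.
type QueueDescriptorSketch = { tenantId: string };
type CooloffStateSketch = { consecutiveFailures: number };

class FairQueueCachesSketch {
  private queueDescriptorCache = new Map<string, QueueDescriptorSketch>();
  private queueCooloffStates = new Map<string, CooloffStateSketch>();

  // Test helpers for this sketch only: populate the caches.
  rememberQueue(queueId: string, descriptor: QueueDescriptorSketch): void {
    this.queueDescriptorCache.set(queueId, descriptor);
  }

  markFailure(queueId: string): void {
    const current = this.queueCooloffStates.get(queueId)?.consecutiveFailures ?? 0;
    this.queueCooloffStates.set(queueId, { consecutiveFailures: current + 1 });
  }

  /** Number of queue descriptors currently held in memory (hypothetical method name). */
  getDescriptorCacheSize(): number {
    return this.queueDescriptorCache.size;
  }

  /** Number of queues with tracked cooloff state (hypothetical method name). */
  getCooloffStatesSize(): number {
    return this.queueCooloffStates.size;
  }

  /** Both sizes at once, convenient for attaching to a span (hypothetical method name). */
  getCacheSizes(): { descriptorCacheSize: number; cooloffStatesSize: number } {
    return {
      descriptorCacheSize: this.getDescriptorCacheSize(),
      cooloffStatesSize: this.getCooloffStatesSize(),
    };
  }
}

const caches = new FairQueueCachesSketch();
caches.rememberQueue("tenant-a:queue-1", { tenantId: "tenant-a" });
caches.rememberQueue("tenant-b:queue-1", { tenantId: "tenant-b" });
caches.markFailure("tenant-a:queue-1");
console.log(caches.getCacheSizes());
```

Reading `Map.prototype.size` is a constant-time property access, which is consistent with the review's remark that this visibility adds little overhead.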
933-938: Consistent cache cleanup across all empty-queue code paths.
The cache cleanup is correctly applied in all three locations where queues become empty:
- `#claimAndPushToWorkerQueue` (two-stage processing)
- `#processOneMessage` (direct processing)
- `#completeMessage` (after processing last message)
The condition `removed === 1` ensures cleanup only happens when the queue was actually removed from the master queue, preventing premature cleanup in race conditions.
Also applies to: 1274-1279, 1486-1491
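The `removed === 1` guard described above can be sketched as follows. This is a simplified in-memory model, not the PR's code: the master queue is a plain `Set` instead of a Redis structure, and the class, `addQueue`, `removeFromMasterQueue`, and `onQueueEmpty` names are all hypothetical. Only the two cache names and the guard condition come from the review.

```typescript
class EmptyQueueCleanupSketch {
  readonly queueDescriptorCache = new Map<string, { tenantId: string }>();
  readonly queueCooloffStates = new Map<string, { consecutiveFailures: number }>();

  // In-memory stand-in (assumption) for the Redis-backed master queue.
  private masterQueue = new Set<string>();

  addQueue(queueId: string, tenantId: string): void {
    this.masterQueue.add(queueId);
    this.queueDescriptorCache.set(queueId, { tenantId });
    this.queueCooloffStates.set(queueId, { consecutiveFailures: 0 });
  }

  // Hypothetical helper: reports how many entries were actually removed (0 or 1).
  private removeFromMasterQueue(queueId: string): number {
    return this.masterQueue.delete(queueId) ? 1 : 0;
  }

  // Called when a queue is observed to be empty. The caches are only cleared
  // when this call is the one that removed the queue from the master queue.
  onQueueEmpty(queueId: string): boolean {
    const removed = this.removeFromMasterQueue(queueId);
    if (removed === 1) {
      this.queueDescriptorCache.delete(queueId);
      this.queueCooloffStates.delete(queueId);
      return true;
    }
    return false;
  }
}

const fq = new EmptyQueueCleanupSketch();
fq.addQueue("tenant-a:queue-1", "tenant-a");

const firstCleanup = fq.onQueueEmpty("tenant-a:queue-1"); // removes the queue and clears both caches
const secondCleanup = fq.onQueueEmpty("tenant-a:queue-1"); // nothing left to remove, caches untouched
console.log(firstCleanup, secondCleanup, fq.queueDescriptorCache.size);
```

Tying the cache deletion to the removal result means a caller that loses the race (and sees `removed === 0`) leaves any state written by the other party alone.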
1746-1748: Note: `#resetCooloff` provides additional cleanup path.
The `#resetCooloff` method at line 1747 deletes from `queueCooloffStates` when processing succeeds, which means cooloff entries are cleaned up either on success or when the queue becomes empty. This dual cleanup path ensures cooloff state doesn't accumulate.
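A minimal sketch of the success-side cleanup path, assuming cooloff state is a per-queue failure counter kept in a `Map`. The `cooloffStatesSketch` map and the free functions `incrementCooloffSketch` and `resetCooloffSketch` are illustrative names; the real `#resetCooloff` is a private method whose body is not shown here beyond the fact that it deletes the queue's entry.

```typescript
// Hypothetical cooloff state shape; the real type is not shown in the review.
type CooloffSketch = { consecutiveFailures: number };

const cooloffStatesSketch = new Map<string, CooloffSketch>();

// On a processing failure, create or bump the queue's cooloff entry.
function incrementCooloffSketch(queueId: string): void {
  const previous = cooloffStatesSketch.get(queueId)?.consecutiveFailures ?? 0;
  cooloffStatesSketch.set(queueId, { consecutiveFailures: previous + 1 });
}

// On success, delete the entry outright instead of storing a zeroed counter,
// so healthy queues leave nothing behind in memory.
function resetCooloffSketch(queueId: string): void {
  cooloffStatesSketch.delete(queueId);
}

incrementCooloffSketch("tenant-a:queue-1");
incrementCooloffSketch("tenant-a:queue-1");
const failuresBeforeReset = cooloffStatesSketch.get("tenant-a:queue-1")?.consecutiveFailures;

resetCooloffSketch("tenant-a:queue-1");
console.log(failuresBeforeReset, cooloffStatesSketch.size);
```

Deleting on success covers queues that recover, while the empty-queue cleanup covers queues that drain without a later successful run.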
No description provided.