From fe1b1ac553b649b35e738774507d9e7cff521e07 Mon Sep 17 00:00:00 2001 From: Dustin Townsend Date: Mon, 9 Feb 2026 15:01:03 -0500 Subject: [PATCH] fix(mongodb): prevent stream close race from dropping webhook responses --- packages/mongodb/src/streamer.ts | 40 +++++++++---- packages/mongodb/test/streamer-race.test.ts | 63 +++++++++++++++++++++ 2 files changed, 91 insertions(+), 12 deletions(-) create mode 100644 packages/mongodb/test/streamer-race.test.ts diff --git a/packages/mongodb/src/streamer.ts b/packages/mongodb/src/streamer.ts index 4340fd7..4e5de28 100644 --- a/packages/mongodb/src/streamer.ts +++ b/packages/mongodb/src/streamer.ts @@ -241,6 +241,8 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ // Buffer for chunks that arrive during initial load const bufferedEventChunks: StreamChunk[] = []; let isLoadingFromStorage = true; + let closeRequested = false; + let isClosed = false; // Helper to convert MongoDB Binary to Uint8Array const toUint8Array = (data: Uint8Array | unknown): Uint8Array => { @@ -248,8 +250,24 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ return new Uint8Array((data as any).buffer || data); }; + const closeController = () => { + if (isClosed) { + return; + } + isClosed = true; + cleanup(); + try { + controller.close(); + } catch { + // Ignore if already closed + } + }; + // Handler for new chunks (real-time) const chunkHandler = (chunk: StreamChunk) => { + if (isClosed) { + return; + } // Skip if already delivered if (deliveredChunkIds.has(chunk.chunkId)) { return; @@ -278,12 +296,13 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ // Handler for stream close const closeHandler = () => { - cleanup(); - try { - controller.close(); - } catch { - // Ignore if already closed + // If close arrives while we're still loading persisted chunks, + // defer closure until we flush buffered data. + if (isLoadingFromStorage) { + closeRequested = true; + return; } + closeController(); }; // Cleanup function @@ -307,8 +326,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ // Check for EOF if (chunk.eof) { - cleanup(); - controller.close(); + closeController(); return; } @@ -338,8 +356,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ for (const chunk of bufferedEventChunks) { if (chunk.eof) { - cleanup(); - controller.close(); + closeController(); return; } const data = toUint8Array(chunk.data); @@ -350,9 +367,8 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{ // Check if we already received EOF while loading const lastChunk = existingChunks[existingChunks.length - 1]; - if (lastChunk?.eof) { - cleanup(); - controller.close(); + if (closeRequested || lastChunk?.eof) { + closeController(); } }, diff --git a/packages/mongodb/test/streamer-race.test.ts b/packages/mongodb/test/streamer-race.test.ts new file mode 100644 index 0000000..c774dc3 --- /dev/null +++ b/packages/mongodb/test/streamer-race.test.ts @@ -0,0 +1,63 @@ +import type { Streamer } from '@workflow/world'; +import { beforeAll, describe, expect, test } from 'vitest'; +import { createStreamer } from './setup.js'; + +async function readText(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const chunks: Uint8Array[] = []; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + } finally { + reader.releaseLock(); + } + + const bytes = new Uint8Array(chunks.flatMap((chunk) => Array.from(chunk))); + return new TextDecoder().decode(bytes); +} + +function withTimeout(promise: Promise, timeoutMs: number): Promise { + let timeout: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new Error(`Timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }); + return Promise.race([promise, timeoutPromise]).finally(() => { + if (timeout) { + clearTimeout(timeout); + } + }); +} + +describe('streamer race conditions', () => { + let streamer: Streamer; + + beforeAll(async () => { + ({ streamer } = await createStreamer()); + }); + + test('does not drop chunk when close happens immediately after write', async () => { + const expected = 'Hello from webhook!'; + const encoder = new TextEncoder(); + + for (let i = 0; i < 50; i++) { + const id = `${Date.now()}-${i}`; + const streamName = `test-stream-race-${id}`; + const runId = `wrun_test-${id}`; + + const readable = await streamer.readFromStream(streamName); + const readPromise = withTimeout(readText(readable), 5000); + + await streamer.writeToStream(streamName, runId, encoder.encode(expected)); + await streamer.closeStream(streamName, runId); + + const result = await readPromise; + expect(result).toBe(expected); + } + }); +});