Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions packages/mongodb/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,33 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
// Buffer for chunks that arrive during initial load
const bufferedEventChunks: StreamChunk[] = [];
let isLoadingFromStorage = true;
let closeRequested = false;
let isClosed = false;

// Helper to convert MongoDB Binary to Uint8Array
const toUint8Array = (data: Uint8Array | unknown): Uint8Array => {
if (data instanceof Uint8Array) return data;
return new Uint8Array((data as any).buffer || data);
};

const closeController = () => {
if (isClosed) {
return;
}
isClosed = true;
cleanup();
try {
controller.close();
} catch {
// Ignore if already closed
}
};

// Handler for new chunks (real-time)
const chunkHandler = (chunk: StreamChunk) => {
if (isClosed) {
return;
}
// Skip if already delivered
if (deliveredChunkIds.has(chunk.chunkId)) {
return;
Expand Down Expand Up @@ -278,12 +296,13 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{

// Handler for stream close
const closeHandler = () => {
cleanup();
try {
controller.close();
} catch {
// Ignore if already closed
// If close arrives while we're still loading persisted chunks,
// defer closure until we flush buffered data.
if (isLoadingFromStorage) {
closeRequested = true;
return;
}
closeController();
};

// Cleanup function
Expand All @@ -307,8 +326,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{

// Check for EOF
if (chunk.eof) {
cleanup();
controller.close();
closeController();
return;
}

Expand Down Expand Up @@ -338,8 +356,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{

for (const chunk of bufferedEventChunks) {
if (chunk.eof) {
cleanup();
controller.close();
closeController();
return;
}
const data = toUint8Array(chunk.data);
Expand All @@ -350,9 +367,8 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{

// Check if we already received EOF while loading
const lastChunk = existingChunks[existingChunks.length - 1];
if (lastChunk?.eof) {
cleanup();
controller.close();
if (closeRequested || lastChunk?.eof) {
closeController();
}
},

Expand Down
63 changes: 63 additions & 0 deletions packages/mongodb/test/streamer-race.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { Streamer } from '@workflow/world';
import { beforeAll, describe, expect, test } from 'vitest';
import { createStreamer } from './setup.js';

async function readText(stream: ReadableStream<Uint8Array>): Promise<string> {
const reader = stream.getReader();
const chunks: Uint8Array[] = [];

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
} finally {
reader.releaseLock();
}

const bytes = new Uint8Array(chunks.flatMap((chunk) => Array.from(chunk)));
return new TextDecoder().decode(bytes);
}

function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
let timeout: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new Error(`Timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
return Promise.race([promise, timeoutPromise]).finally(() => {
if (timeout) {
clearTimeout(timeout);
}
});
}

describe('streamer race conditions', () => {
let streamer: Streamer;

beforeAll(async () => {
({ streamer } = await createStreamer());
});

test('does not drop chunk when close happens immediately after write', async () => {
const expected = 'Hello from webhook!';
const encoder = new TextEncoder();

for (let i = 0; i < 50; i++) {
const id = `${Date.now()}-${i}`;
const streamName = `test-stream-race-${id}`;
const runId = `wrun_test-${id}`;

const readable = await streamer.readFromStream(streamName);
const readPromise = withTimeout(readText(readable), 5000);

await streamer.writeToStream(streamName, runId, encoder.encode(expected));
await streamer.closeStream(streamName, runId);

const result = await readPromise;
expect(result).toBe(expected);
}
});
});
Loading