From 6afad1d2bba8a415e4ccd7ab01ab006de1281bd7 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Tue, 15 Apr 2025 14:50:02 +0000 Subject: [PATCH] fix: handle incomplete JSON chunks --- lib/ai/pattern-model.ts | 69 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 4 deletions(-) diff --git a/lib/ai/pattern-model.ts b/lib/ai/pattern-model.ts index f59b516..49f852e 100644 --- a/lib/ai/pattern-model.ts +++ b/lib/ai/pattern-model.ts @@ -11,6 +11,8 @@ import type { ToolStartEvent, } from '@/lib/ai/types'; +import { extractErrorMessageOrDefault } from '../utils'; + const textDecoder = new TextDecoder(); const SUPPORTED_EVENT_TYPES = [ @@ -80,6 +82,8 @@ export class PatternModel implements LanguageModelV1 { * to AI SDK supported chunks */ private getTransformStream() { + let incompleteJsonFragment = ''; + return new TransformStream({ transform: (chunk, controller) => { if (ArrayBuffer.isView(chunk)) { @@ -90,9 +94,64 @@ export class PatternModel implements LanguageModelV1 { chunk.byteLength, ); const parsedChunk = textDecoder.decode(chunkBuffer).trim(); - const events = parsedChunk - .split('\n') - .map((eventObject) => JSON.parse(eventObject)); + + /** + * Due to an issue with nginx, we have to handle the case where a + * chunk is not a complete JSON object + */ + const jsonLines = parsedChunk.split('\n'); + + const { events, incompleteJson } = jsonLines.reduce<{ + events: PatternStreamingResponseEvent[]; + incompleteJson: string; + isLastLine: boolean; + }>( + (acc, line, index) => { + if (!line.trim()) return acc; + + const jsonToTry = acc.incompleteJson + line; + const isLastLine = index === jsonLines.length - 1; + + try { + const event = JSON.parse(jsonToTry); + return { + // biome-ignore lint:‌ premature optimization + ...acc, + events: [...acc.events, event], + incompleteJson: '', + isLastLine, + }; + } catch (e) { + if (isLastLine) { + return { + // biome-ignore lint:‌ premature optimization + ...acc, + incompleteJson: jsonToTry, + isLastLine, + }; + } + + controller.enqueue({ + type: 'error', + error: `Failed to parse JSON: ${jsonToTry}`, + }); + + return { + // biome-ignore lint:‌ premature optimization + ...acc, + incompleteJson: '', + isLastLine, + }; + } + }, + { + events: [], + incompleteJson: incompleteJsonFragment, + isLastLine: false, + }, + ); + + incompleteJsonFragment = incompleteJson; events.forEach((event: PatternStreamingResponseEvent) => { if (event.type === 'token') { @@ -115,7 +174,8 @@ export class PatternModel implements LanguageModelV1 { } catch (error) { controller.enqueue({ type: 'error', - error: 'Cannot parse chunk due to corrupted data or invalid JSON', + error: + `Cannot parse chunk due to corrupted data or invalid JSON: ${extractErrorMessageOrDefault(error)}`, }); } } else { @@ -126,6 +186,7 @@ export class PatternModel implements LanguageModelV1 { } }, flush: (controller) => { + // Handle any remaining incomplete JSON if present controller.enqueue({ type: 'finish', finishReason: 'stop',