69 changes: 65 additions & 4 deletions lib/ai/pattern-model.ts
@@ -11,6 +11,8 @@ import type {
ToolStartEvent,
} from '@/lib/ai/types';

import { extractErrorMessageOrDefault } from '../utils';

const textDecoder = new TextDecoder();

const SUPPORTED_EVENT_TYPES = [
@@ -80,6 +82,8 @@ export class PatternModel implements LanguageModelV1 {
* to AI SDK supported chunks
*/
private getTransformStream() {
let incompleteJsonFragment = '';

return new TransformStream<string, LanguageModelV1StreamPart>({
transform: (chunk, controller) => {
if (ArrayBuffer.isView(chunk)) {
@@ -90,9 +94,64 @@
chunk.byteLength,
);
const parsedChunk = textDecoder.decode(chunkBuffer).trim();
const events = parsedChunk
.split('\n')
.map((eventObject) => JSON.parse(eventObject));

/**
* Due to an issue with nginx, we have to handle the case where a
* chunk is not a complete JSON object
*/
const jsonLines = parsedChunk.split('\n');

const { events, incompleteJson } = jsonLines.reduce<{
events: PatternStreamingResponseEvent[];
incompleteJson: string;
isLastLine: boolean;
}>(
(acc, line, index) => {
if (!line.trim()) return acc;

const jsonToTry = acc.incompleteJson + line;
const isLastLine = index === jsonLines.length - 1;

try {
const event = JSON.parse(jsonToTry);
return {
// biome-ignore lint: premature optimization
...acc,
events: [...acc.events, event],
incompleteJson: '',
isLastLine,
};
} catch (e) {
if (isLastLine) {
return {
// biome-ignore lint: premature optimization
...acc,
incompleteJson: jsonToTry,
isLastLine,
};
}

controller.enqueue({
type: 'error',
error: `Failed to parse JSON: ${jsonToTry}`,
});

return {
// biome-ignore lint: premature optimization
...acc,
incompleteJson: '',
isLastLine,
};
}
},
{
events: [],
incompleteJson: incompleteJsonFragment,
isLastLine: false,
},
);

incompleteJsonFragment = incompleteJson;

events.forEach((event: PatternStreamingResponseEvent) => {
if (event.type === 'token') {
@@ -115,7 +174,8 @@
} catch (error) {
controller.enqueue({
type: 'error',
error: 'Cannot parse chunk due to corrupted data or invalid JSON',
error:
`Cannot parse chunk due to corrupted data or invalid JSON: ${extractErrorMessageOrDefault(error)}`,
});
}
} else {
@@ -126,6 +186,7 @@
}
},
flush: (controller) => {
// Handle any remaining incomplete JSON if present
controller.enqueue({
type: 'finish',
finishReason: 'stop',
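Below is a minimal standalone sketch of the buffering technique this PR applies: a TransformStream that holds on to a trailing, incomplete JSON line until the next chunk completes it. This is not the PR's code; the Part type and the newlineDelimitedJsonStream name are illustrative stand-ins for the AI SDK stream-part types and the PatternModel internals.

type Part = { type: 'json'; value: unknown } | { type: 'error'; error: string };

function newlineDelimitedJsonStream(): TransformStream<string, Part> {
  // Carries an incomplete trailing line from one chunk over to the next.
  let incompleteJsonFragment = '';

  return new TransformStream<string, Part>({
    transform(chunk, controller) {
      const lines = chunk.split('\n');

      lines.forEach((line, index) => {
        if (!line.trim()) return;

        const jsonToTry = incompleteJsonFragment + line;
        const isLastLine = index === lines.length - 1;

        try {
          controller.enqueue({ type: 'json', value: JSON.parse(jsonToTry) });
          incompleteJsonFragment = '';
        } catch {
          if (isLastLine) {
            // The chunk may have ended mid-object; keep the fragment and
            // retry once the next chunk arrives.
            incompleteJsonFragment = jsonToTry;
          } else {
            controller.enqueue({
              type: 'error',
              error: `Failed to parse JSON: ${jsonToTry}`,
            });
            incompleteJsonFragment = '';
          }
        }
      });
    },
  });
}

For example, if '{"type":"token","te' arrives in one chunk and 'xt":"hi"}' in the next, the parse failure on the first chunk's last line is deferred rather than surfaced as an error, and the reassembled object parses successfully when the second chunk is processed.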