diff --git a/.changeset/silent-ears-agree.md b/.changeset/silent-ears-agree.md new file mode 100644 index 0000000000..ba9e6ec3db --- /dev/null +++ b/.changeset/silent-ears-agree.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": patch +--- + +Buffer SSE messages until up-to-date message to avoid duplicate operations from being published on the shape stream. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 5e028702fe..60033fc607 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -398,22 +398,12 @@ export class ShapeStream = Row> backOffOpts ) - this.#fetchClient = createFetchWithConsumedMessages( - createFetchWithResponseHeadersCheck( - createFetchWithChunkBuffer(fetchWithBackoffClient) - ) - ) - - const sseFetchWithBackoffClient = createFetchWithBackoff( - baseFetchClient, - backOffOpts, - true - ) - this.#sseFetchClient = createFetchWithResponseHeadersCheck( - createFetchWithChunkBuffer(sseFetchWithBackoffClient) + createFetchWithChunkBuffer(fetchWithBackoffClient) ) + this.#fetchClient = createFetchWithConsumedMessages(this.#sseFetchClient) + this.#subscribeToVisibilityChanges() } @@ -498,7 +488,7 @@ export class ShapeStream = Row> fetchUrl, requestAbortController, headers: requestHeaders, - resumingFromPause: true, + resumingFromPause, }) } catch (e) { // Handle abort error triggered by refresh @@ -671,9 +661,7 @@ export class ShapeStream = Row> } } - async #onMessages(messages: string, schema: Schema, isSseMessage = false) { - const batch = this.#messageParser.parse(messages, schema) - + async #onMessages(batch: Array>, isSseMessage = false) { // Update isUpToDate if (batch.length > 0) { const lastMessage = batch[batch.length - 1] @@ -738,8 +726,9 @@ export class ShapeStream = Row> const schema = this.#schema! // we know that it is not undefined because it is set by `this.#onInitialResponse` const res = await response.text() const messages = res || `[]` + const batch = this.#messageParser.parse>>(messages, schema) - await this.#onMessages(messages, schema) + await this.#onMessages(batch) } async #requestShapeSSE(opts: { @@ -750,6 +739,7 @@ export class ShapeStream = Row> const { fetchUrl, requestAbortController, headers } = opts const fetch = this.#sseFetchClient try { + let buffer: Array> = [] await fetchEventSource(fetchUrl.toString(), { headers, fetch, @@ -759,11 +749,20 @@ export class ShapeStream = Row> }, onmessage: (event: EventSourceMessage) => { if (event.data) { - // Process the SSE message - // The event.data is a single JSON object, so we wrap it in an array - const messages = `[${event.data}]` + // event.data is a single JSON object const schema = this.#schema! // we know that it is not undefined because it is set in onopen when we call this.#onInitialResponse - this.#onMessages(messages, schema, true) + const message = this.#messageParser.parse>( + event.data, + schema + ) + buffer.push(message) + + if (isUpToDateMessage(message)) { + // Flush the buffer on up-to-date message. + // Ensures that we only process complete batches of operations. + this.#onMessages(buffer, true) + buffer = [] + } } }, onerror: (error: Error) => { diff --git a/packages/typescript-client/src/fetch.ts b/packages/typescript-client/src/fetch.ts index 9bd1bb211a..b13d637261 100644 --- a/packages/typescript-client/src/fetch.ts +++ b/packages/typescript-client/src/fetch.ts @@ -38,8 +38,7 @@ export const BackoffDefaults = { export function createFetchWithBackoff( fetchClient: typeof fetch, - backoffOptions: BackoffOptions = BackoffDefaults, - sseMode: boolean = false + backoffOptions: BackoffOptions = BackoffDefaults ): typeof fetch { const { initialDelay, @@ -67,12 +66,6 @@ export function createFetchWithBackoff( if (result.ok) return result const err = await FetchError.fromResponse(result, url.toString()) - if (err.status === 409 && sseMode) { - // The json body is [ { headers: { control: 'must-refetch' } } ] in normal mode - // and is { headers: { control: 'must-refetch' } } in SSE mode - // So in SSE mode we need to wrap it in an array - err.json = [err.json] - } throw err } catch (e) { diff --git a/packages/typescript-client/src/helpers.ts b/packages/typescript-client/src/helpers.ts index 1c1aa21f6a..a672602255 100644 --- a/packages/typescript-client/src/helpers.ts +++ b/packages/typescript-client/src/helpers.ts @@ -58,8 +58,9 @@ export function isUpToDateMessage = Row>( * If we are not in SSE mode this function will return undefined. */ export function getOffset(message: ControlMessage): Offset | undefined { - const lsn = Number(message.headers.global_last_seen_lsn) - if (lsn && !isNaN(lsn)) { - return `${lsn}_0` + const lsn = message.headers.global_last_seen_lsn + if (!lsn) { + return } + return `${lsn}_0` as Offset } diff --git a/packages/typescript-client/src/parser.ts b/packages/typescript-client/src/parser.ts index efa8d85055..27daa29500 100644 --- a/packages/typescript-client/src/parser.ts +++ b/packages/typescript-client/src/parser.ts @@ -1,4 +1,4 @@ -import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types' +import { ColumnInfo, GetExtensions, Row, Schema, Value } from './types' import { ParserNullValueError } from './error' type NullToken = null | `NULL` @@ -98,7 +98,7 @@ export class MessageParser> { this.parser = { ...defaultParser, ...parser } } - parse(messages: string, schema: Schema): Message[] { + parse(messages: string, schema: Schema): Result { return JSON.parse(messages, (key, value) => { // typeof value === `object` && value !== null // is needed because there could be a column named `value` @@ -117,7 +117,7 @@ export class MessageParser> { }) } return value - }) as Message[] + }) as Result } // Parses the message values using the provided parser based on the schema information diff --git a/packages/typescript-client/src/types.ts b/packages/typescript-client/src/types.ts index ce476c1af6..396928ef84 100644 --- a/packages/typescript-client/src/types.ts +++ b/packages/typescript-client/src/types.ts @@ -17,7 +17,7 @@ export type Row = Record> export type GetExtensions> = T extends Row ? Extensions : never -export type Offset = `-1` | `${number}_${number}` +export type Offset = `-1` | `${number}_${number}` | `${bigint}_${number}` interface Header { [key: Exclude]: Value diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index 039702803c..fef8d5d686 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -2,7 +2,13 @@ import { describe, expect, inject, vi } from 'vitest' import { v4 as uuidv4 } from 'uuid' import { setTimeout as sleep } from 'node:timers/promises' import { testWithIssuesTable as it } from './support/test-context' -import { ShapeStream, Shape, FetchError } from '../src' +import { + ShapeStream, + Shape, + FetchError, + isChangeMessage, + isControlMessage, +} from '../src' import { Message, Row, ChangeMessage } from '../src/types' import { MissingHeadersError } from '../src/error' import { resolveValue } from '../src' @@ -1099,6 +1105,170 @@ describe.for(fetchAndSse)( } ) +describe(`Shape - SSE`, () => { + it(`should handle SSE messages in batches`, async ({ + issuesTableUrl, + insertIssues, + aborter, + }) => { + // Create some initial data + const [id1] = await insertIssues({ title: `initial title` }) + + // Track if we've already thrown an error to ensure we only throw once + let hasThrownError = false + + let resolveRefresh: () => void = () => {} + const refreshPromise = new Promise((resolve) => { + resolveRefresh = resolve + }) + + // Custom fetch client that intercepts SSE messages + const customFetchClient = async ( + input: string | URL | Request, + init?: RequestInit + ) => { + const url = input.toString() + + // Only intercept SSE requests (those with experimental_live_sse=true) + if (url.includes(`experimental_live_sse=true`)) { + // Create a custom response that intercepts the SSE stream + const response = await fetch(input, init) + + // Create a custom readable stream that intercepts messages + const originalBody = response.body + if (!originalBody) { + throw new Error(`No response body`) + } + + const filteredStream = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough( + createSSEFilterStream((event) => { + const data = event.slice(6) // remove 'data: ' prefix + + let message + try { + message = JSON.parse(data) + } catch (parseError) { + // Ignore JSON parse errors for non-JSON lines + } + + // Check if this is the first up-to-date message + if ( + message.headers?.control === `up-to-date` && + !hasThrownError + ) { + hasThrownError = true + + // Force a refresh to interrupt the stream + shapeStream.forceDisconnectAndRefresh().then(resolveRefresh) + + // Filter it out + return false + } + + return true + }) + ) + .pipeThrough(new TextEncoderStream()) + + // Return a new response with our custom stream + return new Response(filteredStream, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }) + } + + // For non-SSE requests, just forward to the real fetch + return fetch(input, init) + } + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { table: issuesTableUrl }, + signal: aborter.signal, + experimentalLiveSse: true, + fetchClient: customFetchClient, + }) + + // Track received messages to ensure no duplicates + const receivedRows: Array = [] + const messageIds = new Set() + + let resolveInitialSync: () => void = () => {} + const initialSyncComplete = new Promise((resolve) => { + resolveInitialSync = resolve + }) + + // Subscribe to the shape stream + const unsubscribe = shapeStream.subscribe((messages) => { + for (const message of messages) { + if (isChangeMessage(message)) { + // Check for duplicates + const rowId = message.key + if (messageIds.has(rowId)) { + throw new Error(`Duplicate message received for id: ${rowId}`) + } + messageIds.add(rowId) + receivedRows.push(message.value) + } + + if ( + isControlMessage(message) && + message.headers.control === `up-to-date` + ) { + resolveInitialSync() + } + } + }) + + // Wait for initial sync + await initialSyncComplete + + // Insert another issue to trigger an update + const [id2] = await insertIssues({ title: `second title` }) + + // Wait for the update to be processed + await vi.waitFor( + () => { + expect(receivedRows.length).toBe(2) + }, + { timeout: 5000 } + ) + + // Verify we received both messages without duplicates + expect(receivedRows).toEqual([ + { + id: id1, + title: `initial title`, + priority: 10, + }, + { + id: id2, + title: `second title`, + priority: 10, + }, + ]) + + // Check that we interrupted the stream + expect(hasThrownError).toBe(true) + + // Await the refresh to complete + await refreshPromise + + // Verify that there are no duplicates after the refresh + expect(receivedRows.length).toBe(2) + expect(messageIds.size).toBe(2) + + // Verify the stream is connected and up to date + expect(shapeStream.isConnected()).toBe(true) + expect(shapeStream.isUpToDate).toBe(true) + + unsubscribe() + }) +}) + function waitForFetch(stream: ShapeStream): Promise { let unsub = () => {} return new Promise((resolve) => { @@ -1108,3 +1278,33 @@ function waitForFetch(stream: ShapeStream): Promise { ) }).finally(() => unsub()) } + +// Simple SSE parser that buffers lines until an event is complete +// And filters out events that don't pass the filter function +function createSSEFilterStream(filterFn: (event: string) => boolean) { + let buffer = `` + return new TransformStream({ + transform(chunk, controller) { + buffer += chunk + const lines = buffer.split(`\n`) + buffer = lines.pop() || `` // Keep the last incomplete line + let currentEvent = `` + for (const line of lines) { + currentEvent += line + `\n` + if (line.trim() === ``) { + // End of event + if (filterFn(currentEvent)) { + controller.enqueue(currentEvent) + } + currentEvent = `` + } + } + }, + flush(controller) { + // Emit any remaining buffered event + if (buffer && filterFn(buffer)) { + controller.enqueue(buffer) + } + }, + }) +}