From 17b51e89d9ce669bd76fb07e04b3a0ae0d50f0b5 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 6 Apr 2025 20:28:23 +0100 Subject: [PATCH 1/3] WIP feat(client): experimental support for streaming SSE in live mode --- packages/typescript-client/src/client.ts | 148 +++++++++++++++++- packages/typescript-client/src/constants.ts | 1 + .../typescript-client/test/client.test.ts | 2 +- .../typescript-client/tsconfig.build.json | 9 +- packages/typescript-client/tsconfig.json | 4 +- 5 files changed, 159 insertions(+), 5 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 58e5b68268..7724947d8e 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -39,6 +39,7 @@ import { TABLE_QUERY_PARAM, REPLICA_PARAM, FORCE_DISCONNECT_AND_REFRESH, + EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, } from './constants' const RESERVED_PARAMS: Set = new Set([ @@ -244,6 +245,11 @@ export interface ShapeStreamOptions { */ subscribe?: boolean + /** + * Experimental support for Server-Sent Events (SSE) for live updates. + */ + experimentalLiveSse?: boolean + signal?: AbortSignal fetchClient?: typeof fetch backoffOptions?: BackoffOptions @@ -281,8 +287,9 @@ export interface ShapeStreamInterface = Row> { } /** - * Reads updates to a shape from Electric using HTTP requests and long polling. Notifies subscribers - * when new messages come in. Doesn't maintain any history of the + * Reads updates to a shape from Electric using HTTP requests and long polling or + * Server-Sent Events (SSE). + * Notifies subscribers when new messages come in. Doesn't maintain any history of the * log but does keep track of the offset position and is the best way * to consume the HTTP `GET /v1/shape` api. 
* @@ -297,6 +304,14 @@ export interface ShapeStreamInterface = Row> { * }) * ``` * + * To use Server-Sent Events (SSE) for real-time updates: + * ``` + * const stream = new ShapeStream({ + * url: `http://localhost:3000/v1/shape`, + * liveMode: 'sse' + * }) + * ``` + * * To abort the stream, abort the `signal` * passed in via the `ShapeStreamOptions`. * ``` @@ -484,6 +499,26 @@ export class ShapeStream = Row> } } + // If using SSE mode we handle the connection differently using the + // this.#connectSSE method which wraps the EventSource API. + if (this.#isUpToDate && this.options.experimentalLiveSse) { + fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`) + try { + await this.#connectSSE(fetchUrl.toString()) + } catch (error) { + if (error instanceof SSEConnectionAborted) { + break + } + this.#sendErrorToSubscribers( + error instanceof Error ? error : new Error(String(error)) + ) + throw error + } + // TODO: What should we do here? Is this the behaviour we want? + // Skip the regular fetch and continue the loop to reconnect if needed + continue + } + let response!: Response try { response = await this.#fetchClient(fetchUrl.toString(), { @@ -714,6 +749,108 @@ export class ShapeStream = Row> this.#connected = false this.#schema = undefined } + + /** + * Connects to the server using Server-Sent Events. + * Returns a promise that resolves when the connection is closed. 
+ */ + async #connectSSE(url: string): Promise { + return new Promise((resolve, reject) => { + try { + if (!this.#requestAbortController) { + reject( + new Error( + `Request abort controller is not set - this should never happen` + ) + ) + return + } + + if (this.#requestAbortController.signal.aborted) { + reject( + new SSEConnectionAborted( + `Connection aborted before SSE connection established` + ) + ) + return + } + + // Create an EventSource instance + const eventSource = new EventSource(url) + + // Set up event handlers + eventSource.onopen = () => { + this.#connected = true + } + + eventSource.onmessage = async (event: MessageEvent) => { + try { + if (event.data) { + // Process the SSE message + // Provide an empty schema object if schema is undefined, which it + // should not be as we only get to SSE mode after being in normal mode + // and getting a schema from a header then. + // The event.data is a single JSON object, so we wrap it in an array + // to be consistent with the way we parse the response from the HTTP API. + // TODO: Is this needed? + const batch = this.#messageParser.parse( + `[${event.data}]`, + this.#schema || {} + ) + + if (batch.length > 0) { + const lastMessage = batch[batch.length - 1] + if (isUpToDateMessage(lastMessage)) { + const upToDateMsg = lastMessage as typeof lastMessage & { + headers: { global_last_seen_lsn: string } + } + this.#lastSyncedAt = Date.now() + this.#isUpToDate = true + this.#lastOffset = + `${upToDateMsg.headers.global_last_seen_lsn}_0` as Offset + // TODO: we also need the cache buster `cursor` value + } + + await this.#publish(batch) + } + } + } catch (error) { + // Handle parsing errors + this.#sendErrorToSubscribers( + error instanceof Error ? 
error : new Error(String(error)) + ) + } + } + + eventSource.onerror = (_error: Event) => { + // Connection was closed or errored + // EventSource would normally automatically reconnect but want to close the + // connection and reconnect on the next outer loop iteration with the new + // url and offset. + // TODO: It may be that some errors we should elevate to the user + eventSource.close() + resolve() + } + + // Listen for abort signals + const abortHandler = () => { + eventSource.close() + reject(new SSEConnectionAborted(`SSE connection aborted`)) + } + + this.#requestAbortController.signal.addEventListener( + `abort`, + abortHandler, + { once: true } + ) + } catch (error) { + this.#sendErrorToSubscribers( + error instanceof Error ? error : new Error(String(error)) + ) + reject(error) + } + }) + } } /** @@ -782,3 +919,10 @@ function convertWhereParamsToObj( } return allPgParams } + +class SSEConnectionAborted extends Error { + constructor(message: string) { + super(message) + this.name = `SSEConnectionAborted` + } +} diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index 77d381efff..2995ecd513 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -12,4 +12,5 @@ export const TABLE_QUERY_PARAM = `table` export const WHERE_QUERY_PARAM = `where` export const REPLICA_PARAM = `replica` export const WHERE_PARAMS_PARAM = `params` +export const EXPERIMENTAL_LIVE_SSE_QUERY_PARAM = `experimental_live_sse` export const FORCE_DISCONNECT_AND_REFRESH = `force-disconnect-and-refresh` diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index 0d927baa7b..a200d0c1f0 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -391,7 +391,7 @@ describe(`Shape`, () => { todo: `fail`, }, fetchClient: async (input, _init) => { - const url = new URL(input) + const url 
= new URL(input instanceof Request ? input.url : input) if (url.searchParams.get(`todo`) === `fail`) { return new Response(undefined, { status: 401, diff --git a/packages/typescript-client/tsconfig.build.json b/packages/typescript-client/tsconfig.build.json index c147f4334f..8cebc9df84 100644 --- a/packages/typescript-client/tsconfig.build.json +++ b/packages/typescript-client/tsconfig.build.json @@ -1,5 +1,12 @@ { "extends": "../../tsconfig.build.json", "include": ["src/**/*"], - "exclude": ["node_modules", "tests", "dist"] + "exclude": ["node_modules", "tests", "dist"], + "compilerOptions": { + "lib": [ + "ESNext", + "DOM", + "dom.iterable" + ] + } } diff --git a/packages/typescript-client/tsconfig.json b/packages/typescript-client/tsconfig.json index 87d0bec14c..d9139e486a 100644 --- a/packages/typescript-client/tsconfig.json +++ b/packages/typescript-client/tsconfig.json @@ -14,7 +14,9 @@ /* Language and Environment */ "target": "es2016" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */, "lib": [ - "ESNext" + "ESNext", + "DOM", + "dom.iterable" ] /* Specify a set of bundled library declaration files that describe the target runtime environment. */, "jsx": "preserve" /* Specify what JSX code is generated. */, // "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. 
*/ From 50d0f29f70b517eae6f6f3b1d1c63079124aa5ee Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 6 Apr 2025 20:55:54 +0100 Subject: [PATCH 2/3] Fix type in test --- packages/typescript-client/test/integration.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index cd29ec9062..5729bedb69 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -64,7 +64,7 @@ describe(`HTTP Sync`, () => { }) => { const urlsRequested: URL[] = [] const fetchWrapper = (...args: Parameters) => { - const url = new URL(args[0]) + const url = new URL(args[0] instanceof Request ? args[0].url : args[0]) urlsRequested.push(url) return fetch(...args) } From 572a93587cda94745a8b7865d9eb515e7035ca97 Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 9 Jun 2025 15:02:37 +0200 Subject: [PATCH 3/3] feat: support for Server-Sent Events (SSE) (#2776) This is a follow-up PR on https://github.com/electric-sql/electric/pull/2546 and https://github.com/electric-sql/electric/pull/2544. It solves a bug related to 409s (must refetch) in SSE mode and it replaces the EventSource browser API with the [fetch-event-source](https://github.com/Azure/fetch-event-source) library. I refactored the `ShapeStream.#start` method which was becoming very big and complex. To this end, I split the logic into helper methods that handle the different parts that need to happen (building the shape URL, making the request, parsing the response headers, handling the response body, etc.). I had to patch the [fetch-event-source](https://github.com/Azure/fetch-event-source) library because it relies on browser-specific features such as `document` and `window` (cf. https://github.com/Azure/fetch-event-source/pull/41). But we want our client to also work in server-side JS environments.
I also had to patch the `fetch-event-source` library because it does not abort the fetch when you pass an already aborted signal. A complete description of the bug and the fix can be found here: https://github.com/Azure/fetch-event-source/issues/98. --- package.json | 5 + .../sync-service/lib/electric/shapes/api.ex | 33 +- .../lib/electric/shapes/api/response.ex | 2 +- packages/typescript-client/package.json | 4 +- packages/typescript-client/src/client.ts | 559 +++--- packages/typescript-client/src/fetch.ts | 14 +- packages/typescript-client/src/helpers.ts | 14 +- packages/typescript-client/src/types.ts | 5 +- .../test/__snapshots__/client.test.ts.snap | 4 +- .../typescript-client/test/client.test.ts | 1736 +++++++++-------- .../test/integration.test.ts | 1065 +++++----- patches/@microsoft__fetch-event-source.patch | 124 ++ pnpm-lock.yaml | 14 + 13 files changed, 1958 insertions(+), 1621 deletions(-) create mode 100644 patches/@microsoft__fetch-event-source.patch diff --git a/package.json b/package.json index aff3bfbc53..e7012119ce 100644 --- a/package.json +++ b/package.json @@ -20,5 +20,10 @@ }, "devDependencies": { "glob": "^10.3.10" + }, + "pnpm": { + "patchedDependencies": { + "@microsoft/fetch-event-source": "patches/@microsoft__fetch-event-source.patch" + } } } diff --git a/packages/sync-service/lib/electric/shapes/api.ex b/packages/sync-service/lib/electric/shapes/api.ex index 3656c6d3ad..2232098c7f 100644 --- a/packages/sync-service/lib/electric/shapes/api.ex +++ b/packages/sync-service/lib/electric/shapes/api.ex @@ -317,8 +317,18 @@ defmodule Electric.Shapes.Api do # TODO: discuss returning a 307 redirect rather than a 409, the client # will have to detect this and throw out old data + + # In SSE mode we send the must refetch object as an event + # instead of a singleton array containing that object + must_refetch = + if request.params.experimental_live_sse do + hd(@must_refetch) + else + @must_refetch + end + {:error, - Response.error(request, 
@must_refetch, + Response.error(request, must_refetch, handle: active_shape_handle, status: 409 )} @@ -610,10 +620,9 @@ defmodule Electric.Shapes.Api do last_message_time: last_message_time, request: %{ - api: - %{ - keepalive_interval: keepalive_interval - } = api, + api: %{ + keepalive_interval: keepalive_interval + }, handle: shape_handle, new_changes_ref: ref } = request, @@ -664,7 +673,7 @@ defmodule Electric.Shapes.Api do {^ref, :shape_rotation} -> must_refetch = %{headers: %{control: "must-refetch"}} - message = encode_message(api, must_refetch) + message = encode_message(request, must_refetch) {message, %{state | mode: :done}} @@ -793,11 +802,19 @@ defmodule Electric.Shapes.Api do encode(api, :log, stream) end - @spec encode_message(Api.t() | Request.t(), term()) :: Enum.t() - def encode_message(%Api{} = api, message) do + # Error messages are encoded normally, even when using SSE + # because they are returned on the original fetch request + # with a status code that is not 2xx. 
+ @spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t() + def encode_error_message(%Api{} = api, message) do + encode(api, :message, message) + end + + def encode_error_message(%Request{api: api}, message) do encode(api, :message, message) end + @spec encode_message(Request.t(), term()) :: Enum.t() def encode_message( %Request{api: api, params: %{live: true, experimental_live_sse: true}}, message diff --git a/packages/sync-service/lib/electric/shapes/api/response.ex b/packages/sync-service/lib/electric/shapes/api/response.ex index 107d15c5a3..c856dc9b9d 100644 --- a/packages/sync-service/lib/electric/shapes/api/response.ex +++ b/packages/sync-service/lib/electric/shapes/api/response.ex @@ -86,7 +86,7 @@ defmodule Electric.Shapes.Api.Response do message end - Api.encode_message(api_or_request, body) + Api.encode_error_message(api_or_request, body) end @spec send(Plug.Conn.t(), t()) :: Plug.Conn.t() diff --git a/packages/typescript-client/package.json b/packages/typescript-client/package.json index c117affcde..47bf147ecb 100644 --- a/packages/typescript-client/package.json +++ b/packages/typescript-client/package.json @@ -6,7 +6,9 @@ "bugs": { "url": "https://github.com/electric-sql/electric/issues" }, - "dependencies": {}, + "dependencies": { + "@microsoft/fetch-event-source": "^2.0.1" + }, "devDependencies": { "@types/pg": "^8.11.6", "@types/uuid": "^10.0.0", diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 7724947d8e..817156865e 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -7,7 +7,7 @@ import { GetExtensions, } from './types' import { MessageParser, Parser } from './parser' -import { isUpToDateMessage } from './helpers' +import { getOffset, isUpToDateMessage } from './helpers' import { FetchError, FetchBackoffAbortError, @@ -41,6 +41,10 @@ import { FORCE_DISCONNECT_AND_REFRESH, EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, } from './constants' 
+import { + EventSourceMessage, + fetchEventSource, +} from '@microsoft/fetch-event-source' const RESERVED_PARAMS: Set = new Set([ LIVE_CACHE_BUSTER_QUERY_PARAM, @@ -308,7 +312,7 @@ export interface ShapeStreamInterface = Row> { * ``` * const stream = new ShapeStream({ * url: `http://localhost:3000/v1/shape`, - * liveMode: 'sse' + * experimentalLiveSse: true * }) * ``` * @@ -338,6 +342,7 @@ export class ShapeStream = Row> #error: unknown = null readonly #fetchClient: typeof fetch + readonly #sseFetchClient: typeof fetch readonly #messageParser: MessageParser readonly #subscribers = new Map< @@ -362,6 +367,7 @@ export class ShapeStream = Row> #tickPromise?: Promise #tickPromiseResolver?: () => void #tickPromiseRejecter?: (reason?: unknown) => void + #messageChain = Promise.resolve([]) // promise chain for incoming messages constructor(options: ShapeStreamOptions>) { this.options = { subscribe: true, ...options } @@ -376,19 +382,33 @@ export class ShapeStream = Row> options.fetchClient ?? ((...args: Parameters) => fetch(...args)) - const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, { + const backOffOpts = { ...(options.backoffOptions ?? 
BackoffDefaults), onFailedAttempt: () => { this.#connected = false options.backoffOptions?.onFailedAttempt?.() }, - }) + } + const fetchWithBackoffClient = createFetchWithBackoff( + baseFetchClient, + backOffOpts + ) this.#fetchClient = createFetchWithConsumedMessages( createFetchWithResponseHeadersCheck( createFetchWithChunkBuffer(fetchWithBackoffClient) ) ) + + const sseFetchWithBackoffClient = createFetchWithBackoff( + baseFetchClient, + backOffOpts, + true + ) + + this.#sseFetchClient = createFetchWithResponseHeadersCheck( + createFetchWithChunkBuffer(sseFetchWithBackoffClient) + ) } get shapeHandle() { @@ -417,121 +437,22 @@ export class ShapeStream = Row> this.options.subscribe ) { const { url, signal } = this.options + const { fetchUrl, requestHeaders } = await this.#constructUrl(url) + const abortListener = await this.#createAbortListener(signal) + const requestAbortController = this.#requestAbortController! // we know that it is not undefined because it is set by `this.#createAbortListener` - // Resolve headers and params in parallel - const [requestHeaders, params] = await Promise.all([ - resolveHeaders(this.options.headers), - this.options.params - ? 
toInternalParams(convertWhereParamsToObj(this.options.params)) - : undefined, - ]) - - // Validate params after resolution - if (params) { - validateParams(params) - } - - const fetchUrl = new URL(url) - - // Add PostgreSQL-specific parameters - if (params) { - if (params.table) - setQueryParam(fetchUrl, TABLE_QUERY_PARAM, params.table) - if (params.where) - setQueryParam(fetchUrl, WHERE_QUERY_PARAM, params.where) - if (params.columns) - setQueryParam(fetchUrl, COLUMNS_QUERY_PARAM, params.columns) - if (params.replica) - setQueryParam(fetchUrl, REPLICA_PARAM, params.replica) - if (params.params) - setQueryParam(fetchUrl, WHERE_PARAMS_PARAM, params.params) - - // Add any remaining custom parameters - const customParams = { ...params } - delete customParams.table - delete customParams.where - delete customParams.columns - delete customParams.replica - delete customParams.params - - for (const [key, value] of Object.entries(customParams)) { - setQueryParam(fetchUrl, key, value) - } - } - - // Add Electric's internal parameters - fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) - - if (this.#isUpToDate) { - if (!this.#isRefreshing) { - fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) - } - fetchUrl.searchParams.set( - LIVE_CACHE_BUSTER_QUERY_PARAM, - this.#liveCacheBuster - ) - } - - if (this.#shapeHandle) { - // This should probably be a header for better cache breaking? - fetchUrl.searchParams.set( - SHAPE_HANDLE_QUERY_PARAM, - this.#shapeHandle! 
- ) - } - - // sort query params in-place for stable URLs and improved cache hits - fetchUrl.searchParams.sort() - - // Create a new AbortController for this request - this.#requestAbortController = new AbortController() - - // If user provided a signal, listen to it and pass on the reason for the abort - let abortListener: (() => void) | undefined - if (signal) { - abortListener = () => { - this.#requestAbortController?.abort(signal.reason) - } - signal.addEventListener(`abort`, abortListener, { once: true }) - if (signal.aborted) { - // If the signal is already aborted, abort the request immediately - this.#requestAbortController?.abort(signal.reason) - } - } - - // If using SSE mode we handle the connection differently using the - // this.#connectSSE method which wraps the EventSource API. - if (this.#isUpToDate && this.options.experimentalLiveSse) { - fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`) - try { - await this.#connectSSE(fetchUrl.toString()) - } catch (error) { - if (error instanceof SSEConnectionAborted) { - break - } - this.#sendErrorToSubscribers( - error instanceof Error ? error : new Error(String(error)) - ) - throw error - } - // TODO: What should we do here? Is this the behaviour we want? 
- // Skip the regular fetch and continue the loop to reconnect if needed - continue - } - - let response!: Response try { - response = await this.#fetchClient(fetchUrl.toString(), { - signal: this.#requestAbortController.signal, + await this.#requestShape({ + fetchUrl, + requestAbortController, headers: requestHeaders, }) - this.#connected = true } catch (e) { // Handle abort error triggered by refresh if ( (e instanceof FetchError || e instanceof FetchBackoffAbortError) && - this.#requestAbortController.signal.aborted && - this.#requestAbortController.signal.reason === + requestAbortController.signal.aborted && + requestAbortController.signal.reason === FORCE_DISCONNECT_AND_REFRESH ) { // Loop back to the top of the while loop to start a new request @@ -564,50 +485,6 @@ export class ShapeStream = Row> this.#requestAbortController = undefined } - const { headers, status } = response - const shapeHandle = headers.get(SHAPE_HANDLE_HEADER) - if (shapeHandle) { - this.#shapeHandle = shapeHandle - } - - const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER) - if (lastOffset) { - this.#lastOffset = lastOffset as Offset - } - - const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER) - if (liveCacheBuster) { - this.#liveCacheBuster = liveCacheBuster - } - - const getSchema = (): Schema => { - const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) - return schemaHeader ? JSON.parse(schemaHeader) : {} - } - this.#schema = this.#schema ?? 
getSchema() - - // NOTE: 204s are deprecated, the Electric server should not - // send these in latest versions but this is here for backwards - // compatibility - if (status === 204) { - // There's no content so we are live and up to date - this.#lastSyncedAt = Date.now() - } - - const messages = (await response.text()) || `[]` - const batch = this.#messageParser.parse(messages, this.#schema) - - // Update isUpToDate - if (batch.length > 0) { - const lastMessage = batch[batch.length - 1] - if (isUpToDateMessage(lastMessage)) { - this.#lastSyncedAt = Date.now() - this.#isUpToDate = true - } - - await this.#publish(batch) - } - this.#tickPromiseResolver?.() } } catch (err) { @@ -640,6 +517,239 @@ export class ShapeStream = Row> } } + async #constructUrl(url: string) { + // Resolve headers and params in parallel + const [requestHeaders, params] = await Promise.all([ + resolveHeaders(this.options.headers), + this.options.params + ? toInternalParams(convertWhereParamsToObj(this.options.params)) + : undefined, + ]) + + // Validate params after resolution + if (params) { + validateParams(params) + } + + const fetchUrl = new URL(url) + + // Add PostgreSQL-specific parameters + if (params) { + if (params.table) setQueryParam(fetchUrl, TABLE_QUERY_PARAM, params.table) + if (params.where) setQueryParam(fetchUrl, WHERE_QUERY_PARAM, params.where) + if (params.columns) + setQueryParam(fetchUrl, COLUMNS_QUERY_PARAM, params.columns) + if (params.replica) setQueryParam(fetchUrl, REPLICA_PARAM, params.replica) + if (params.params) + setQueryParam(fetchUrl, WHERE_PARAMS_PARAM, params.params) + + // Add any remaining custom parameters + const customParams = { ...params } + delete customParams.table + delete customParams.where + delete customParams.columns + delete customParams.replica + delete customParams.params + + for (const [key, value] of Object.entries(customParams)) { + setQueryParam(fetchUrl, key, value) + } + } + + // Add Electric's internal parameters + 
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) + + if (this.#isUpToDate) { + if (!this.#isRefreshing) { + fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) + } + fetchUrl.searchParams.set( + LIVE_CACHE_BUSTER_QUERY_PARAM, + this.#liveCacheBuster + ) + } + + if (this.#shapeHandle) { + // This should probably be a header for better cache breaking? + fetchUrl.searchParams.set(SHAPE_HANDLE_QUERY_PARAM, this.#shapeHandle!) + } + + // sort query params in-place for stable URLs and improved cache hits + fetchUrl.searchParams.sort() + + return { + fetchUrl, + requestHeaders, + } + } + + async #createAbortListener(signal?: AbortSignal) { + // Create a new AbortController for this request + this.#requestAbortController = new AbortController() + + // If user provided a signal, listen to it and pass on the reason for the abort + if (signal) { + const abortListener = () => { + this.#requestAbortController?.abort(signal.reason) + } + + signal.addEventListener(`abort`, abortListener, { once: true }) + + if (signal.aborted) { + // If the signal is already aborted, abort the request immediately + this.#requestAbortController?.abort(signal.reason) + } + + return abortListener + } + } + + async #onInitialResponse(response: Response) { + const { headers, status } = response + const shapeHandle = headers.get(SHAPE_HANDLE_HEADER) + if (shapeHandle) { + this.#shapeHandle = shapeHandle + } + + const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER) + if (lastOffset) { + this.#lastOffset = lastOffset as Offset + } + + const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER) + if (liveCacheBuster) { + this.#liveCacheBuster = liveCacheBuster + } + + const getSchema = (): Schema => { + const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) + return schemaHeader ? JSON.parse(schemaHeader) : {} + } + this.#schema = this.#schema ?? 
getSchema() + + // NOTE: 204s are deprecated, the Electric server should not + // send these in latest versions but this is here for backwards + // compatibility + if (status === 204) { + // There's no content so we are live and up to date + this.#lastSyncedAt = Date.now() + } + } + + async #onMessages(messages: string, schema: Schema, isSseMessage = false) { + const batch = this.#messageParser.parse(messages, schema) + + // Update isUpToDate + if (batch.length > 0) { + const lastMessage = batch[batch.length - 1] + if (isUpToDateMessage(lastMessage)) { + if (isSseMessage) { + // Only use the offset from the up-to-date message if this was an SSE message. + // If we were to use this offset from a regular fetch, then it will be wrong + // and we will get an "offset is out of bounds for this shape" error + const offset = getOffset(lastMessage) + if (offset) { + this.#lastOffset = offset + } + } + this.#lastSyncedAt = Date.now() + this.#isUpToDate = true + } + + await this.#publish(batch) + } + } + + /** + * Requests the shape from the server using either long polling or SSE. + * Upon receiving a successful response, the #onInitialResponse method is called. + * Afterwards, the #onMessages method is called for all the incoming updates. + * @param opts - The options for the request. + * @returns A promise that resolves when the request is complete (i.e. the long poll receives a response or the SSE connection is closed).
+ */ + async #requestShape(opts: { + fetchUrl: URL + requestAbortController: AbortController + headers: Record + }): Promise { + if ( + this.#isUpToDate && + this.options.experimentalLiveSse && + !this.#isRefreshing + ) { + opts.fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`) + return this.#requestShapeSSE(opts) + } + + return this.#requestShapeLongPoll(opts) + } + + async #requestShapeLongPoll(opts: { + fetchUrl: URL + requestAbortController: AbortController + headers: Record + }): Promise { + const { fetchUrl, requestAbortController, headers } = opts + const response = await this.#fetchClient(fetchUrl.toString(), { + signal: requestAbortController.signal, + headers, + }) + + this.#connected = true + await this.#onInitialResponse(response) + + const schema = this.#schema! // we know that it is not undefined because it is set by `this.#onInitialResponse` + const res = await response.text() + const messages = res || `[]` + + await this.#onMessages(messages, schema) + } + + async #requestShapeSSE(opts: { + fetchUrl: URL + requestAbortController: AbortController + headers: Record + }): Promise { + const { fetchUrl, requestAbortController, headers } = opts + const fetch = this.#sseFetchClient + try { + await fetchEventSource(fetchUrl.toString(), { + headers, + fetch, + onopen: async (response: Response) => { + this.#connected = true + await this.#onInitialResponse(response) + }, + onmessage: (event: EventSourceMessage) => { + if (event.data) { + // Process the SSE message + // The event.data is a single JSON object, so we wrap it in an array + const messages = `[${event.data}]` + const schema = this.#schema! 
// we know that it is not undefined because it is set in onopen when we call this.#onInitialResponse + this.#onMessages(messages, schema, true) + } + }, + onerror: (error: Error) => { + // rethrow to close the SSE connection + throw error + }, + signal: requestAbortController.signal, + }) + } catch (error) { + if (requestAbortController.signal.aborted) { + // During an SSE request, the fetch might have succeeded + // and we are parsing the incoming stream. + // If the abort happens while we're parsing the stream, + // then it won't be caught by our `createFetchWithBackoff` wrapper + // and instead we will get a raw AbortError here + // which we need to turn into a `FetchBackoffAbortError` + // such that #start handles it correctly. + throw new FetchBackoffAbortError() + } + throw error + } + } + subscribe( callback: (messages: Message[]) => MaybePromise, onError: (error: Error) => void = () => {} @@ -717,18 +827,26 @@ export class ShapeStream = Row> this.#isRefreshing = false } - async #publish(messages: Message[]): Promise { - await Promise.all( - Array.from(this.#subscribers.values()).map(async ([callback, __]) => { - try { - await callback(messages) - } catch (err) { - queueMicrotask(() => { - throw err - }) - } - }) + async #publish(messages: Message[]): Promise { + // We process messages asynchronously + // but SSE's `onmessage` handler is synchronous. + // We use a promise chain to ensure that the handlers + // execute sequentially in the order the messages were received. + this.#messageChain = this.#messageChain.then(() => + Promise.all( + Array.from(this.#subscribers.values()).map(async ([callback, __]) => { + try { + await callback(messages) + } catch (err) { + queueMicrotask(() => { + throw err + }) + } + }) + ) ) + + return this.#messageChain } #sendErrorToSubscribers(error: Error) { @@ -749,108 +867,6 @@ export class ShapeStream = Row> this.#connected = false this.#schema = undefined } - - /** - * Connects to the server using Server-Sent Events. 
- * Returns a promise that resolves when the connection is closed. - */ - async #connectSSE(url: string): Promise { - return new Promise((resolve, reject) => { - try { - if (!this.#requestAbortController) { - reject( - new Error( - `Request abort controller is not set - this should never happen` - ) - ) - return - } - - if (this.#requestAbortController.signal.aborted) { - reject( - new SSEConnectionAborted( - `Connection aborted before SSE connection established` - ) - ) - return - } - - // Create an EventSource instance - const eventSource = new EventSource(url) - - // Set up event handlers - eventSource.onopen = () => { - this.#connected = true - } - - eventSource.onmessage = async (event: MessageEvent) => { - try { - if (event.data) { - // Process the SSE message - // Provide an empty schema object if schema is undefined, which it - // should not be as we only get to SSE mode after being in normal mode - // and getting a schema from a header then. - // The event.data is a single JSON object, so we wrap it in an array - // to be consistent with the way we parse the response from the HTTP API. - // TODO: Is this needed? - const batch = this.#messageParser.parse( - `[${event.data}]`, - this.#schema || {} - ) - - if (batch.length > 0) { - const lastMessage = batch[batch.length - 1] - if (isUpToDateMessage(lastMessage)) { - const upToDateMsg = lastMessage as typeof lastMessage & { - headers: { global_last_seen_lsn: string } - } - this.#lastSyncedAt = Date.now() - this.#isUpToDate = true - this.#lastOffset = - `${upToDateMsg.headers.global_last_seen_lsn}_0` as Offset - // TODO: we also need the cache buster `cursor` value - } - - await this.#publish(batch) - } - } - } catch (error) { - // Handle parsing errors - this.#sendErrorToSubscribers( - error instanceof Error ? 
error : new Error(String(error)) - ) - } - } - - eventSource.onerror = (_error: Event) => { - // Connection was closed or errored - // EventSource would normally automatically reconnect but want to close the - // connection and reconnect on the next outer loop iteration with the new - // url and offset. - // TODO: It may be that some errors we should elevate to the user - eventSource.close() - resolve() - } - - // Listen for abort signals - const abortHandler = () => { - eventSource.close() - reject(new SSEConnectionAborted(`SSE connection aborted`)) - } - - this.#requestAbortController.signal.addEventListener( - `abort`, - abortHandler, - { once: true } - ) - } catch (error) { - this.#sendErrorToSubscribers( - error instanceof Error ? error : new Error(String(error)) - ) - reject(error) - } - }) - } } /** @@ -919,10 +935,3 @@ function convertWhereParamsToObj( } return allPgParams } - -class SSEConnectionAborted extends Error { - constructor(message: string) { - super(message) - this.name = `SSEConnectionAborted` - } -} diff --git a/packages/typescript-client/src/fetch.ts b/packages/typescript-client/src/fetch.ts index 4818a97a52..9bd1bb211a 100644 --- a/packages/typescript-client/src/fetch.ts +++ b/packages/typescript-client/src/fetch.ts @@ -38,7 +38,8 @@ export const BackoffDefaults = { export function createFetchWithBackoff( fetchClient: typeof fetch, - backoffOptions: BackoffOptions = BackoffDefaults + backoffOptions: BackoffOptions = BackoffDefaults, + sseMode: boolean = false ): typeof fetch { const { initialDelay, @@ -64,7 +65,16 @@ export function createFetchWithBackoff( try { const result = await fetchClient(...args) if (result.ok) return result - else throw await FetchError.fromResponse(result, url.toString()) + + const err = await FetchError.fromResponse(result, url.toString()) + if (err.status === 409 && sseMode) { + // The json body is [ { headers: { control: 'must-refetch' } } ] in normal mode + // and is { headers: { control: 'must-refetch' } } in 
SSE mode + // So in SSE mode we need to wrap it in an array + err.json = [err.json] + } + + throw err } catch (e) { onFailedAttempt?.() if (options?.signal?.aborted) { diff --git a/packages/typescript-client/src/helpers.ts b/packages/typescript-client/src/helpers.ts index c30ed129f0..1c1aa21f6a 100644 --- a/packages/typescript-client/src/helpers.ts +++ b/packages/typescript-client/src/helpers.ts @@ -1,4 +1,4 @@ -import { ChangeMessage, ControlMessage, Message, Row } from './types' +import { ChangeMessage, ControlMessage, Message, Offset, Row } from './types' /** * Type guard for checking {@link Message} is {@link ChangeMessage}. @@ -51,3 +51,15 @@ export function isUpToDateMessage = Row>( ): message is ControlMessage & { up_to_date: true } { return isControlMessage(message) && message.headers.control === `up-to-date` } + +/** + * Parses the LSN from the up-to-date message and turns it into an offset. + * The LSN is only present in the up-to-date control message when in SSE mode. + * If we are not in SSE mode this function will return undefined. 
+ */ +export function getOffset(message: ControlMessage): Offset | undefined { + const lsn = Number(message.headers.global_last_seen_lsn) + if (lsn && !isNaN(lsn)) { + return `${lsn}_0` + } +} diff --git a/packages/typescript-client/src/types.ts b/packages/typescript-client/src/types.ts index 35eefc7a34..ce476c1af6 100644 --- a/packages/typescript-client/src/types.ts +++ b/packages/typescript-client/src/types.ts @@ -26,7 +26,10 @@ interface Header { export type Operation = `insert` | `update` | `delete` export type ControlMessage = { - headers: Header & { control: `up-to-date` | `must-refetch` } + headers: Header & { + control: `up-to-date` | `must-refetch` + global_last_seen_lsn?: string + } } export type ChangeMessage = Row> = { diff --git a/packages/typescript-client/test/__snapshots__/client.test.ts.snap b/packages/typescript-client/test/__snapshots__/client.test.ts.snap index 7c8bb2bc64..d42038b711 100644 --- a/packages/typescript-client/test/__snapshots__/client.test.ts.snap +++ b/packages/typescript-client/test/__snapshots__/client.test.ts.snap @@ -1,3 +1,5 @@ // Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html -exports[`Shape > should throw on a reserved parameter 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: live]`; +exports[`Shape > should throw on a reserved parameter (liveSSE=false) 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: live]`; + +exports[`Shape > should throw on a reserved parameter (liveSSE=true) 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: live]`; diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index a200d0c1f0..54230aeaae 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -9,882 +9,958 @@ import { resolveValue } from '../src' const BASE_URL = inject(`baseUrl`) +const 
fetchAndSse = [ + { experimentalLiveSse: false }, + { experimentalLiveSse: true }, +] + describe(`Shape`, () => { - it(`should sync an empty shape`, async ({ issuesTableUrl }) => { - const start = Date.now() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - }) - const shape = new Shape(shapeStream) - - expect(await shape.value).toEqual(new Map()) - expect(await shape.rows).toEqual([]) - expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) - expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) - }) - - it(`should throw on a reserved parameter`, async () => { - expect(() => { + it.for(fetchAndSse)( + `should sync an empty shape (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const start = Date.now() const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, params: { - table: `foo`, - live: `false`, - }, - }) - new Shape(shapeStream) - }).toThrowErrorMatchingSnapshot() - }) - - it(`should notify with the initial value`, async ({ - issuesTableUrl, - insertIssues, - aborter, - }) => { - const [id] = await insertIssues({ title: `test title` }) - - const start = Date.now() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) - const shape = new Shape(shapeStream) - - const rows = await new Promise((resolve) => { - shape.subscribe(({ rows }) => resolve(rows)) - }) - - expect(rows).toEqual([{ id: id, title: `test title`, priority: 10 }]) - expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) - expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) - }) - - it(`should continually sync a shape/table`, async ({ - issuesTableUrl, - insertIssues, - deleteIssue, - updateIssue, - waitForIssues, - aborter, - }) => 
{ - const [id] = await insertIssues({ title: `test title` }) - - const expectedValue1 = [ - { - id: id, - title: `test title`, - priority: 10, - }, - ] - - const start = Date.now() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) - const shape = new Shape(shapeStream) - const rows = await shape.rows - - expect(rows).toEqual(expectedValue1) - expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) - expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) - - await sleep(105) - expect(shape.lastSynced()).toBeGreaterThanOrEqual(100) - - // FIXME: might get notified before all changes are submitted - const intermediate = Date.now() - const hasNotified = new Promise((resolve) => { - shape.subscribe(resolve) - }) - const [id2] = await insertIssues({ title: `other title` }) - const [id3] = await insertIssues({ title: `other title2` }) - await deleteIssue({ id: id3, title: `other title2` }) - // Test an update too because we're sending patches that should be correctly merged in - await updateIssue({ id: id2, title: `new title` }) - await waitForIssues({ numChangesExpected: 5 }) - await vi.waitUntil(() => hasNotified) - - const expectedValue2 = [ - ...expectedValue1, - { - id: id2, - title: `new title`, - priority: 10, - }, - ] - - await vi.waitFor(() => expect(shape.currentRows).toEqual(expectedValue2)) - expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(intermediate) - expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - intermediate) - - shape.unsubscribeAll() - }) - - it(`should resync from scratch on a shape rotation`, async ({ - issuesTableUrl, - insertIssues, - deleteIssue, - waitForIssues, - clearIssuesShape, - aborter, - }) => { - const id1 = uuidv4() - const id2 = uuidv4() - await insertIssues({ id: id1, title: `foo1` }) - - 
const expectedValue1 = [ + table: issuesTableUrl, + }, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + + expect(await shape.value).toEqual(new Map()) + expect(await shape.rows).toEqual([]) + expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) + expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) + } + ) + + it.for(fetchAndSse)( + `should throw on a reserved parameter (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }) => { + expect(() => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: `foo`, + live: `false`, + }, + experimentalLiveSse, + }) + new Shape(shapeStream) + }).toThrowErrorMatchingSnapshot() + } + ) + + it.for(fetchAndSse)( + `should notify with the initial value (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { issuesTableUrl, insertIssues, aborter } + ) => { + const [id] = await insertIssues({ title: `test title` }) + + const start = Date.now() + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + + const rows = await new Promise((resolve) => { + shape.subscribe(({ rows }) => resolve(rows)) + }) + + expect(rows).toEqual([{ id: id, title: `test title`, priority: 10 }]) + expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) + expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) + } + ) + + it.for(fetchAndSse)( + `should continually sync a shape/table (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, { - id: id1, - title: `foo1`, - priority: 10, - }, - ] + issuesTableUrl, + insertIssues, + deleteIssue, + updateIssue, + waitForIssues, + aborter, + } + ) => { + const [id] = await insertIssues({ title: `test title` }) + 
+ const expectedValue1 = [ + { + id: id, + title: `test title`, + priority: 10, + }, + ] + + const start = Date.now() + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + const rows = await shape.rows + + expect(rows).toEqual(expectedValue1) + expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) + expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) + + await sleep(105) + expect(shape.lastSynced()).toBeGreaterThanOrEqual(100) + + // FIXME: might get notified before all changes are submitted + const intermediate = Date.now() + const hasNotified = new Promise((resolve) => { + shape.subscribe(resolve) + }) + const [id2] = await insertIssues({ title: `other title` }) + const [id3] = await insertIssues({ title: `other title2` }) + await deleteIssue({ id: id3, title: `other title2` }) + // Test an update too because we're sending patches that should be correctly merged in + await updateIssue({ id: id2, title: `new title` }) + await waitForIssues({ numChangesExpected: 5 }) + await vi.waitUntil(() => hasNotified) + + const expectedValue2 = [ + ...expectedValue1, + { + id: id2, + title: `new title`, + priority: 10, + }, + ] - const expectedValue2 = [ + await vi.waitFor(() => expect(shape.currentRows).toEqual(expectedValue2)) + expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(intermediate) + expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - intermediate) + + shape.unsubscribeAll() + } + ) + + it.for(fetchAndSse)( + `should resync from scratch on a shape rotation (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, { - id: id2, - title: `foo2`, - priority: 10, - }, - ] - - const start = Date.now() - let rotationTime: number = Infinity - let 
fetchPausePromise = Promise.resolve() - const fetchWrapper = async (...args: Parameters) => { - await fetchPausePromise - return await fetch(...args) + issuesTableUrl, + insertIssues, + deleteIssue, + waitForIssues, + clearIssuesShape, + aborter, + } + ) => { + const id1 = uuidv4() + const id2 = uuidv4() + await insertIssues({ id: id1, title: `foo1` }) + + const expectedValue1 = [ + { + id: id1, + title: `foo1`, + priority: 10, + }, + ] + + const expectedValue2 = [ + { + id: id2, + title: `foo2`, + priority: 10, + }, + ] + + const start = Date.now() + let rotationTime: number = Infinity + let fetchPausePromise = Promise.resolve() + const fetchWrapper = async (...args: Parameters) => { + await fetchPausePromise + return await fetch(...args) + } + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + fetchClient: fetchWrapper, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + let dataUpdateCount = 0 + await new Promise((resolve, reject) => { + setTimeout(() => reject(`Timed out waiting for data changes`), 1000) + shape.subscribe(async ({ rows }) => { + dataUpdateCount++ + if (dataUpdateCount === 1) { + expect(rows).toEqual(expectedValue1) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) + + // clear the shape and modify the data after the initial request + fetchPausePromise = Promise.resolve().then(async () => { + await deleteIssue({ id: id1, title: `foo1` }) + await insertIssues({ id: id2, title: `foo2` }) + await waitForIssues({ numChangesExpected: 3 }) + await clearIssuesShape(shapeStream.shapeHandle) + }) + + rotationTime = Date.now() + return + } else if (dataUpdateCount === 2) { + expect(rows).toEqual(expectedValue2) + expect(shape.lastSynced()).toBeLessThanOrEqual( + Date.now() - rotationTime + ) + return resolve() + } + throw new Error(`Received more data updates than expected`) + }) + }) } + ) - const shapeStream = new ShapeStream({ - 
url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - fetchClient: fetchWrapper, - }) - const shape = new Shape(shapeStream) - let dataUpdateCount = 0 - await new Promise((resolve, reject) => { - setTimeout(() => reject(`Timed out waiting for data changes`), 1000) - shape.subscribe(async ({ rows }) => { - dataUpdateCount++ - if (dataUpdateCount === 1) { - expect(rows).toEqual(expectedValue1) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) - - // clear the shape and modify the data after the initial request - fetchPausePromise = Promise.resolve().then(async () => { - await deleteIssue({ id: id1, title: `foo1` }) - await insertIssues({ id: id2, title: `foo2` }) - await waitForIssues({ numChangesExpected: 3 }) - await clearIssuesShape(shapeStream.shapeHandle) - }) + it.for(fetchAndSse)( + `should notify subscribers when the value changes (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { issuesTableUrl, insertIssues, aborter } + ) => { + const [id] = await insertIssues({ title: `test title` }) - rotationTime = Date.now() - return - } else if (dataUpdateCount === 2) { - expect(rows).toEqual(expectedValue2) - expect(shape.lastSynced()).toBeLessThanOrEqual( - Date.now() - rotationTime + const start = Date.now() + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + + const hasNotified = new Promise((resolve) => { + shape.subscribe(({ rows }) => resolve(rows)) + }) + + const [id2] = await insertIssues({ title: `other title` }) + + const value = await hasNotified + const expectedValue = [ + { + id: id, + title: `test title`, + priority: 10, + }, + { + id: id2, + title: `other title`, + priority: 10, + }, + ] + expect(value).toEqual(expectedValue) + expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) + 
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) + expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) + + shape.unsubscribeAll() + } + ) + + it.for(fetchAndSse)( + `should support unsubscribe (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + experimentalLiveSse, + }) + await waitForFetch(shapeStream) + const shape = new Shape(shapeStream) + + const subFn = vi.fn((_) => void 0) + + const unsubscribeFn = shape.subscribe(subFn) + unsubscribeFn() + + expect(shape.numSubscribers).toBe(0) + expect(subFn).not.toHaveBeenCalled() + } + ) + + it.for(fetchAndSse)( + `should expose connection status (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const aborter = new AbortController() + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + + // give some time for the initial fetch to complete + await waitForFetch(shapeStream) + expect(shapeStream.isConnected()).true + + const shape = new Shape(shapeStream) + await shape.rows + + expect(shapeStream.isConnected()).true + + // Abort the shape stream and check connectivity status + aborter.abort() + await vi.waitFor(() => expect(shapeStream.isConnected()).false) + } + ) + + it.for(fetchAndSse)( + `should set isConnected to false on fetch error and back on true when fetch succeeds again (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + let fetchShouldFail = false + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (_input, _init) => { + if (fetchShouldFail) + throw new FetchError( + 500, + `Artifical fetch error.`, + undefined, + {}, + ``, + undefined + ) + await 
sleep(50) + return new Response( + JSON.stringify([{ headers: { control: `up-to-date` } }]), + { + status: 200, + headers: new Headers({ + [`electric-offset`]: `0_0`, + [`electric-handle`]: `foo`, + [`electric-schema`]: ``, + [`electric-cursor`]: `123`, + }), + } ) - return resolve() + }, + experimentalLiveSse, + }) + + const unsubscribe = shapeStream.subscribe(() => unsubscribe()) + + await vi.waitFor(() => expect(shapeStream.isConnected()).true) + + // Now make fetch fail and check the status + fetchShouldFail = true + await vi.waitFor(() => expect(shapeStream.isConnected()).false) + + fetchShouldFail = false + await vi.waitFor(() => expect(shapeStream.isConnected()).true) + } + ) + + it.for(fetchAndSse)( + `should not throw error if an error handler is provided (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const mockErrorHandler = vi.fn() + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (_input, _init) => { + return new Response(undefined, { + status: 401, + }) + }, + experimentalLiveSse, + onError: mockErrorHandler, + }) + + await waitForFetch(shapeStream) + expect(mockErrorHandler.mock.calls.length).toBe(1) + expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) + } + ) + + it.for(fetchAndSse)( + `should retry on error if error handler returns modified params (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + // This test creates a shapestream but provides wrong query params + // the fetch client therefore returns a 401 status code + // the custom error handler handles it by correcting the query param + // after which the fetch succeeds + + const mockErrorHandler = vi.fn().mockImplementation((error) => { + if (error instanceof FetchError && error.status === 401) { + return { + params: { + todo: `pass`, + }, + } } - throw new Error(`Received more data updates than 
expected`) - }) - }) - }) - - it(`should notify subscribers when the value changes`, async ({ - issuesTableUrl, - insertIssues, - aborter, - }) => { - const [id] = await insertIssues({ title: `test title` }) - - const start = Date.now() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) - const shape = new Shape(shapeStream) - - const hasNotified = new Promise((resolve) => { - shape.subscribe(({ rows }) => resolve(rows)) - }) - - const [id2] = await insertIssues({ title: `other title` }) - - const value = await hasNotified - const expectedValue = [ - { - id: id, - title: `test title`, - priority: 10, - }, - { - id: id2, - title: `other title`, - priority: 10, - }, - ] - expect(value).toEqual(expectedValue) - expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start) - expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now()) - expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start) - - shape.unsubscribeAll() - }) - - it(`should support unsubscribe`, async ({ issuesTableUrl }) => { - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - }) - await waitForFetch(shapeStream) - const shape = new Shape(shapeStream) - - const subFn = vi.fn((_) => void 0) - - const unsubscribeFn = shape.subscribe(subFn) - unsubscribeFn() - - expect(shape.numSubscribers).toBe(0) - expect(subFn).not.toHaveBeenCalled() - }) - - it(`should expose connection status`, async ({ issuesTableUrl }) => { - const aborter = new AbortController() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) - - // give some time for the initial fetch to complete - await waitForFetch(shapeStream) - expect(shapeStream.isConnected()).true - - const shape = new Shape(shapeStream) - await shape.rows - - expect(shapeStream.isConnected()).true - - // Abort the shape 
stream and check connectivity status - aborter.abort() - await vi.waitFor(() => expect(shapeStream.isConnected()).false) - }) - - it(`should set isConnected to false on fetch error and back on true when fetch succeeds again`, async ({ - issuesTableUrl, - }) => { - let fetchShouldFail = false - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (_input, _init) => { - if (fetchShouldFail) - throw new FetchError( - 500, - `Artifical fetch error.`, - undefined, - {}, - ``, - undefined + }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + todo: `fail`, + }, + fetchClient: async (input, _init) => { + const url = new URL(input instanceof Request ? input.url : input) + if (url.searchParams.get(`todo`) === `fail`) { + return new Response(undefined, { + status: 401, + }) + } + + return new Response( + JSON.stringify([{ headers: { control: `up-to-date` } }]), + { status: 200 } ) - await sleep(50) - return new Response( - JSON.stringify([{ headers: { control: `up-to-date` } }]), - { - status: 200, - headers: new Headers({ - [`electric-offset`]: `0_0`, - [`electric-handle`]: `foo`, - [`electric-schema`]: ``, - [`electric-cursor`]: `123`, - }), + }, + experimentalLiveSse, + onError: mockErrorHandler, + }) + + await waitForFetch(shapeStream) + expect(mockErrorHandler.mock.calls.length).toBe(1) + expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) + } + ) + + it.for(fetchAndSse)( + `should retry on error if error handler returns modified headers (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + // This test creates a shapestream but provides invalid auth credentials + // the fetch client therefore returns a 401 status code + // the custom error handler handles it by replacing the credentials with valid credentials + // after which the fetch succeeds + + const mockErrorHandler = 
vi.fn().mockImplementation((error) => { + if (error instanceof FetchError && error.status === 401) { + return { + headers: { + Authorization: `valid credentials`, + }, } - ) - }, - }) - - const unsubscribe = shapeStream.subscribe(() => unsubscribe()) - - await vi.waitFor(() => expect(shapeStream.isConnected()).true) - - // Now make fetch fail and check the status - fetchShouldFail = true - await vi.waitFor(() => expect(shapeStream.isConnected()).false) - - fetchShouldFail = false - await vi.waitFor(() => expect(shapeStream.isConnected()).true) - }) - - it(`should not throw error if an error handler is provided`, async ({ - issuesTableUrl, - }) => { - const mockErrorHandler = vi.fn() - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (_input, _init) => { - return new Response(undefined, { - status: 401, - }) - }, - onError: mockErrorHandler, - }) - - await waitForFetch(shapeStream) - expect(mockErrorHandler.mock.calls.length).toBe(1) - expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) - }) - - it(`should retry on error if error handler returns modified params`, async ({ - issuesTableUrl, - }) => { - // This test creates a shapestream but provides wrong query params - // the fetch client therefore returns a 401 status code - // the custom error handler handles it by correcting the query param - // after which the fetch succeeds - - const mockErrorHandler = vi.fn().mockImplementation((error) => { - if (error instanceof FetchError && error.status === 401) { - return { - params: { - todo: `pass`, - }, } - } - }) - - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - todo: `fail`, - }, - fetchClient: async (input, _init) => { - const url = new URL(input instanceof Request ? 
input.url : input) - if (url.searchParams.get(`todo`) === `fail`) { + }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + headers: { + Authorization: `invalid credentials`, + }, + fetchClient: async (input, init) => { + const headers = init?.headers as Record + if (headers && headers.Authorization === `valid credentials`) { + return fetch(input, init) + } + return new Response(undefined, { status: 401, }) + }, + experimentalLiveSse, + onError: mockErrorHandler, + }) + + await waitForFetch(shapeStream) + expect(mockErrorHandler.mock.calls.length).toBe(1) + expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) + } + ) + + it.for(fetchAndSse)( + `should support async error handler (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + let authChanged: () => void + const authChangePromise = new Promise((res) => { + authChanged = res + }) + const mockErrorHandler = vi.fn().mockImplementation(async (error) => { + if (error instanceof FetchError && error.status === 401) { + authChanged() + return { + headers: { + Authorization: `valid credentials`, + }, + } } + }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + headers: { + Authorization: `invalid credentials`, + }, + fetchClient: async (input, init) => { + const headers = init?.headers as Record + if (headers && headers.Authorization === `valid credentials`) { + return fetch(input, init) + } + + return new Response(undefined, { + status: 401, + }) + }, + experimentalLiveSse, + onError: mockErrorHandler, + }) - return new Response( - JSON.stringify([{ headers: { control: `up-to-date` } }]), - { status: 200 } + await waitForFetch(shapeStream) + expect(mockErrorHandler.mock.calls.length).toBe(1) + expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) + expect(shapeStream.isConnected()).toBe(false) + + await 
authChangePromise + // give some time for the error handler to modify the authorization header + await vi.waitFor(() => expect(shapeStream.isConnected()).true) + } + ) + + it.for(fetchAndSse)( + `should stop fetching and report an error if response is missing required headers (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + let url: string = `` + let error1: Error, error2: Error + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (input, _init) => { + url = input.toString() + const headers = new Headers() + headers.set(`electric-offset`, `0_0`) + return new Response(``, { status: 200, headers }) + }, + onError: (err) => { + error1 = err + }, + experimentalLiveSse, + }) + + const unsub = shapeStream.subscribe(() => unsub()) + expect(shapeStream.isConnected()).false + + await vi.waitFor(() => { + const expectedErrorMessage = new MissingHeadersError(url, [ + `electric-handle`, + `electric-schema`, + ]).message + expect(error1!.message).equals(expectedErrorMessage) + expect((shapeStream.error as Error).message).equals( + expectedErrorMessage ) - }, - onError: mockErrorHandler, - }) - - await waitForFetch(shapeStream) - expect(mockErrorHandler.mock.calls.length).toBe(1) - expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) - }) - - it(`should retry on error if error handler returns modified headers`, async ({ - issuesTableUrl, - }) => { - // This test creates a shapestream but provides invalid auth credentials - // the fetch client therefore returns a 401 status code - // the custom error handler handles it by replacing the credentials with valid credentials - // after which the fetch succeeds - - const mockErrorHandler = vi.fn().mockImplementation((error) => { - if (error instanceof FetchError && error.status === 401) { - return { - headers: { - Authorization: `valid credentials`, - }, - } - } - }) - - const shapeStream = new 
ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - headers: { - Authorization: `invalid credentials`, - }, - fetchClient: async (input, init) => { - const headers = init?.headers as Record - if (headers && headers.Authorization === `valid credentials`) { - return fetch(input, init) - } + }) - return new Response(undefined, { - status: 401, - }) - }, - onError: mockErrorHandler, - }) - - await waitForFetch(shapeStream) - expect(mockErrorHandler.mock.calls.length).toBe(1) - expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) - }) - - it(`should support async error handler`, async ({ issuesTableUrl }) => { - let authChanged: () => void - const authChangePromise = new Promise((res) => { - authChanged = res - }) - const mockErrorHandler = vi.fn().mockImplementation(async (error) => { - if (error instanceof FetchError && error.status === 401) { - authChanged() - return { - headers: { - Authorization: `valid credentials`, - }, - } - } - }) - - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - headers: { - Authorization: `invalid credentials`, - }, - fetchClient: async (input, init) => { - const headers = init?.headers as Record - if (headers && headers.Authorization === `valid credentials`) { - return fetch(input, init) - } + expect(shapeStream.isConnected()).false - return new Response(undefined, { - status: 401, - }) - }, - onError: mockErrorHandler, - }) - - await waitForFetch(shapeStream) - expect(mockErrorHandler.mock.calls.length).toBe(1) - expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) - expect(shapeStream.isConnected()).toBe(false) - - await authChangePromise - // give some time for the error handler to modify the authorization header - await vi.waitFor(() => expect(shapeStream.isConnected()).true) - }) - - it(`should stop fetching and report an error if response is missing required headers`, async ({ - issuesTableUrl, - }) => 
{ - let url: string = `` - let error1: Error, error2: Error - - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (input, _init) => { - url = input.toString() - const headers = new Headers() - headers.set(`electric-offset`, `0_0`) - return new Response(``, { status: 200, headers }) - }, - onError: (err) => { - error1 = err - }, - }) - - const unsub = shapeStream.subscribe(() => unsub()) - expect(shapeStream.isConnected()).false - - await vi.waitFor(() => { - const expectedErrorMessage = new MissingHeadersError(url, [ - `electric-handle`, - `electric-schema`, - ]).message - expect(error1!.message).equals(expectedErrorMessage) - expect((shapeStream.error as Error).message).equals(expectedErrorMessage) - }) - - expect(shapeStream.isConnected()).false - - // Also check that electric-cursor is a required header for responses to live queries - const shapeStreamLive = new ShapeStream({ - url: `${BASE_URL}/v1/shape?live=true`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (input, _init) => { - url = input.toString() - const headers = new Headers() - headers.set(`electric-offset`, `0_0`) - return new Response(undefined, { status: 200, headers }) - }, - onError: (err) => { - error2 = err - }, - }) - - const unsubLive = shapeStreamLive.subscribe(() => unsubLive()) - expect(shapeStreamLive.isConnected()).false - - await vi.waitFor(() => { - const expectedErrorMessageLive = new MissingHeadersError(url, [ - `electric-handle`, - `electric-cursor`, - ]).message - expect(error2!.message).equals(expectedErrorMessageLive) - expect((shapeStreamLive.error as Error).message).equals( - expectedErrorMessageLive - ) - }) - expect(shapeStreamLive.isConnected()).false - }) - - it(`should set isConnected to false after fetch if not subscribed`, async ({ - issuesTableUrl, - }) => { - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - 
subscribe: false, - }) - - await waitForFetch(shapeStream) - - // We should no longer be connected because - // the initial fetch finished and we've not subscribed to changes - await vi.waitFor(() => expect(shapeStream.isConnected()).false) - }) - - it(`should expose isLoading status`, async ({ issuesTableUrl }) => { - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (input, init) => { - await sleep(20) - return fetch(input, init) - }, - }) - - expect(shapeStream.isLoading()).true - - await waitForFetch(shapeStream) - - expect(shapeStream.isLoading()).false - }) - - it(`should expose lastOffset`, async ({ issuesTableUrl }) => { - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (input, init) => { - await sleep(20) - return fetch(input, init) - }, - }) - const shape = new Shape(shapeStream) - - expect(shapeStream.lastOffset).toBe(`-1`) - expect(shape.lastOffset).toBe(shapeStream.lastOffset) - await waitForFetch(shapeStream) - - shape.unsubscribeAll() - }) - - it(`should honour replica: full`, async ({ - issuesTableUrl, - insertIssues, - updateIssue, - clearIssuesShape, - aborter, - }) => { - const [id] = await insertIssues({ title: `first title` }) - - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - replica: `full`, - }, - signal: aborter.signal, - }) - - let unsub: () => void = () => {} - try { - const lastMsgs: Message[] = [] - unsub = shapeStream.subscribe((msgs) => { - lastMsgs.push(...msgs) + // Also check that electric-cursor is a required header for responses to live queries + const shapeStreamLive = new ShapeStream({ + url: `${BASE_URL}/v1/shape?live=true`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (input, _init) => { + url = input.toString() + const headers = new Headers() + headers.set(`electric-offset`, `0_0`) + 
return new Response(undefined, { status: 200, headers }) + }, + onError: (err) => { + error2 = err + }, + experimentalLiveSse, }) + const unsubLive = shapeStreamLive.subscribe(() => unsubLive()) + expect(shapeStreamLive.isConnected()).false + await vi.waitFor(() => { - const msg = lastMsgs.shift() - expect(msg?.headers.control).toEqual(`up-to-date`) + const expectedErrorMessageLive = new MissingHeadersError(url, [ + `electric-handle`, + `electric-cursor`, + ]).message + expect(error2!.message).equals(expectedErrorMessageLive) + expect((shapeStreamLive.error as Error).message).equals( + expectedErrorMessageLive + ) }) + expect(shapeStreamLive.isConnected()).false + } + ) - const expectedValue = { - id: id, - title: `updated title`, - // because we're sending the full row, the update will include the - // unchanged `priority` column - priority: 10, - } - await updateIssue({ id: id, title: `updated title` }) + it.for(fetchAndSse)( + `should set isConnected to false after fetch if not subscribed (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + subscribe: false, + experimentalLiveSse, + }) - await vi.waitFor( - () => { - const msg = lastMsgs.shift() - if (!msg) throw new Error(`Update message not yet received`) - const changeMsg: ChangeMessage = msg as ChangeMessage - expect(changeMsg.headers.operation).toEqual(`update`) - expect(changeMsg.value).toEqual(expectedValue) - }, - { timeout: 2000 } - ) - } finally { - unsub() - // the normal cleanup doesn't work because our shape definition is - // changed by the updates: 'full' param - await clearIssuesShape(shapeStream.shapeHandle) + await waitForFetch(shapeStream) + + // We should no longer be connected because + // the initial fetch finished and we've not subscribed to changes + await vi.waitFor(() => expect(shapeStream.isConnected()).false) } - }) - - it(`should 
support function-based params and headers`, async ({ - issuesTableUrl, - }) => { - const mockParamFn = vi.fn().mockReturnValue(`test-value`) - const mockAsyncParamFn = vi.fn().mockResolvedValue(`test-value`) - const mockHeaderFn = vi.fn().mockReturnValue(`test-value`) - const mockAsyncHeaderFn = vi.fn().mockResolvedValue(`test-value`) - - // Test with synchronous functions - const shapeStream1 = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - customParam: mockParamFn, - }, - headers: { - 'X-Custom-Header': mockHeaderFn, - }, - }) - const shape1 = new Shape(shapeStream1) - await shape1.value - - expect(mockParamFn).toHaveBeenCalled() - expect(mockHeaderFn).toHaveBeenCalled() - - // Test with async functions - const shapeStream2 = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - customParam: mockAsyncParamFn, - }, - headers: { - 'X-Custom-Header': mockAsyncHeaderFn, - }, - }) - const shape2 = new Shape(shapeStream2) - await shape2.value - - expect(mockAsyncParamFn).toHaveBeenCalled() - expect(mockAsyncHeaderFn).toHaveBeenCalled() - - // Verify the resolved values - expect(await resolveValue(mockParamFn())).toBe(`test-value`) - expect(await resolveValue(mockAsyncParamFn())).toBe(`test-value`) - }) - - it(`should support forceDisconnectAndRefresh() to force a sync`, async ({ - issuesTableUrl, - insertIssues, - updateIssue, - waitForIssues, - aborter, - }) => { - // Create initial data - const [id] = await insertIssues({ title: `initial title` }) - await waitForIssues({ numChangesExpected: 1 }) - - // Track fetch requests - const pendingRequests: Array< - [string | URL | Request, () => Promise] - > = [] - - const resolveRequests = async () => { - await Promise.all(pendingRequests.map(([_, doFetch]) => doFetch())) - pendingRequests.length = 0 // Clear the array + ) + + it.for(fetchAndSse)( + `should expose isLoading status (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { 
issuesTableUrl }) => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (input, init) => { + await sleep(20) + return fetch(input, init) + }, + experimentalLiveSse, + }) + + expect(shapeStream.isLoading()).true + + await waitForFetch(shapeStream) + + expect(shapeStream.isLoading()).false } + ) - const fetchClient = async ( - input: string | URL | Request, - init?: RequestInit + it.for(fetchAndSse)( + `should expose lastOffset (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (input, init) => { + await sleep(20) + return fetch(input, init) + }, + experimentalLiveSse, + }) + const shape = new Shape(shapeStream) + + expect(shapeStream.lastOffset).toBe(`-1`) + expect(shape.lastOffset).toBe(shapeStream.lastOffset) + await waitForFetch(shapeStream) + + shape.unsubscribeAll() + } + ) + + it.for(fetchAndSse)( + `should honour replica: full (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { issuesTableUrl, insertIssues, updateIssue, clearIssuesShape, aborter } ) => { - const signal = init?.signal - return new Promise((resolve, reject) => { - signal?.addEventListener( - `abort`, + const [id] = await insertIssues({ title: `first title` }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + replica: `full`, + }, + experimentalLiveSse, + signal: aborter.signal, + }) + + let unsub: () => void = () => {} + try { + const lastMsgs: Message[] = [] + unsub = shapeStream.subscribe((msgs) => { + lastMsgs.push(...msgs) + }) + + await vi.waitFor(() => { + const msg = lastMsgs.shift() + expect(msg?.headers.control).toEqual(`up-to-date`) + }) + + const expectedValue = { + id: id, + title: `updated title`, + // because we're sending the full row, the 
update will include the + // unchanged `priority` column + priority: 10, + } + await updateIssue({ id: id, title: `updated title` }) + + await vi.waitFor( () => { - reject(new Error(`AbortError`)) + const msg = lastMsgs.shift() + if (!msg) throw new Error(`Update message not yet received`) + const changeMsg: ChangeMessage = msg as ChangeMessage + expect(changeMsg.headers.operation).toEqual(`update`) + expect(changeMsg.value).toEqual(expectedValue) }, - { once: true } + { timeout: 2000 } ) - console.log(input) - pendingRequests.push([ - input, - async () => { - try { - const response = await fetch(input, init) - resolve(response) - } catch (e) { - reject(e) - } - }, - ]) + } finally { + unsub() + // the normal cleanup doesn't work because our shape definition is + // changed by the updates: 'full' param + await clearIssuesShape(shapeStream.shapeHandle) + } + } + ) + + it.for(fetchAndSse)( + `should support function-based params and headers (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const mockParamFn = vi.fn().mockReturnValue(`test-value`) + const mockAsyncParamFn = vi.fn().mockResolvedValue(`test-value`) + const mockHeaderFn = vi.fn().mockReturnValue(`test-value`) + const mockAsyncHeaderFn = vi.fn().mockResolvedValue(`test-value`) + + // Test with synchronous functions + const shapeStream1 = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + customParam: mockParamFn, + }, + headers: { + 'X-Custom-Header': mockHeaderFn, + }, + experimentalLiveSse, }) + const shape1 = new Shape(shapeStream1) + await shape1.value + + expect(mockParamFn).toHaveBeenCalled() + expect(mockHeaderFn).toHaveBeenCalled() + + // Test with async functions + const shapeStream2 = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + customParam: mockAsyncParamFn, + }, + headers: { + 'X-Custom-Header': mockAsyncHeaderFn, + }, + experimentalLiveSse, + }) + const shape2 = new 
Shape(shapeStream2) + await shape2.value + + expect(mockAsyncParamFn).toHaveBeenCalled() + expect(mockAsyncHeaderFn).toHaveBeenCalled() + + // Verify the resolved values + expect(await resolveValue(mockParamFn())).toBe(`test-value`) + expect(await resolveValue(mockAsyncParamFn())).toBe(`test-value`) } + ) - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - fetchClient, - }) - - // Subscribe to start the stream - const shape = new Shape(shapeStream) - - // Wait for initial fetch to start: offset: -1 - await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) - expect(pendingRequests[0][0].toString()).toContain(`offset=-1`) - - // Complete initial fetch - await resolveRequests() - - // Wait for second fetch to start: offset: 0_0 - await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) - expect(pendingRequests[0][0].toString()).toContain(`offset=0_0`) - - // Complete second fetch - await resolveRequests() - - // We should be in live mode - await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) - expect(pendingRequests[0][0].toString()).toContain(`live=true`) - - // Update data while stream is long polling and ensure it has been processed - await updateIssue({ id, title: `updated title` }) - await waitForIssues({ - numChangesExpected: 1, - shapeStreamOptions: { - offset: shapeStream.lastOffset, - handle: shapeStream.shapeHandle, - }, - }) - - // Start refresh - const refreshPromise = shapeStream.forceDisconnectAndRefresh() - - // Verify the long polling request was aborted and a new request started - await vi.waitFor(() => expect(pendingRequests.length).toBe(2)) - expect(pendingRequests.length).toBe(2) // Aborted long poll + refresh request - expect(pendingRequests[0][0].toString()).toContain(`live=true`) // The aborted long poll - expect(pendingRequests[1][0].toString()).not.toContain(`live=true`) // The refresh request - - // Complete refresh request - // 
This will abort the long poll and start a new one - await resolveRequests() - - // Wait for the refresh to complete, this resolves once the next request - // after calling forceDisconnectAndRefresh() has completed - await refreshPromise - - // Verify we got the updated data - expect(shape.currentRows).toEqual([ - { - id, - title: `updated title`, - priority: 10, - }, - ]) - - // Verify we return to normal processing (long polling) - await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) // New long poll - expect(pendingRequests[0][0].toString()).toContain(`live=true`) - }) + it.for(fetchAndSse)( + `should support forceDisconnectAndRefresh() to force a sync (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { issuesTableUrl, insertIssues, updateIssue, waitForIssues, aborter } + ) => { + // Create initial data + const [id] = await insertIssues({ title: `initial title` }) + await waitForIssues({ numChangesExpected: 1 }) + + // Track fetch requests + let pendingRequests: Array< + [string | URL | Request, () => Promise] + > = [] + + const resolveRequests = async () => { + for (const [_, doFetch] of pendingRequests) { + await doFetch() + } + pendingRequests = [] // clear the array + } + + const fetchClient = async ( + input: string | URL | Request, + init?: RequestInit + ) => { + const signal = init?.signal + return new Promise((resolve, reject) => { + signal?.addEventListener( + `abort`, + () => { + reject(new Error(`AbortError`)) + }, + { once: true } + ) + pendingRequests.push([ + input, + async () => { + try { + const response = await fetch(input, init) + resolve(response) + } catch (e) { + reject(e) + } + }, + ]) + }) + } + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + fetchClient, + experimentalLiveSse, + }) + + // Subscribe to start the stream + const shape = new Shape(shapeStream) + + // Wait for initial fetch to start: offset: -1 + await 
vi.waitFor(() => expect(pendingRequests.length).toBe(1)) + expect(pendingRequests[0][0].toString()).toContain(`offset=-1`) + + // Complete initial fetch + await resolveRequests() + + // Wait for second fetch to start: offset: 0_0 + await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) + expect(pendingRequests[0][0].toString()).toContain(`offset=0_0`) + + // Complete second fetch + await resolveRequests() + + // We should be in live mode + await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) + expect(pendingRequests[0][0].toString()).toContain(`live=true`) + + // Update data while stream is long polling and ensure it has been processed + await updateIssue({ id, title: `updated title` }) + await waitForIssues({ + numChangesExpected: 1, + shapeStreamOptions: { + offset: shapeStream.lastOffset, + handle: shapeStream.shapeHandle, + }, + }) + + // Start refresh + const refreshPromise = shapeStream.forceDisconnectAndRefresh() + + // Verify the long polling request was aborted and a new request started + await vi.waitFor(() => expect(pendingRequests.length).toBe(2)) + expect(pendingRequests.length).toBe(2) // Aborted long poll + refresh request + expect(pendingRequests[0][0].toString()).toContain(`live=true`) // The aborted long poll + expect(pendingRequests[1][0].toString()).not.toContain(`live=true`) // The refresh request + + // Complete refresh request + // This will abort the long poll and start a new one + await resolveRequests() + + // Wait for the refresh to complete, this resolves once the next request + // after calling forceDisconnectAndRefresh() has completed + await refreshPromise + + // Verify we got the updated data + expect(shape.currentRows).toEqual([ + { + id, + title: `updated title`, + priority: 10, + }, + ]) + + // Verify we return to normal processing (long polling) + await vi.waitFor(() => expect(pendingRequests.length).toBe(1)) // New long poll + expect(pendingRequests[0][0].toString()).toContain(`live=true`) + } + ) }) 
describe(`Shape - backwards compatible`, () => { - it(`should set isConnected to false on fetch error and back on true when fetch succeeds again`, async ({ - issuesTableUrl, - }) => { - const shapeStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - fetchClient: async (_input, _init) => { - await sleep(20) - return new Response(null, { - status: 204, - headers: new Headers({ - [`electric-offset`]: `0_0`, - [`electric-handle`]: `foo`, - [`electric-schema`]: ``, - [`electric-cursor`]: `123`, - }), - }) - }, - }) + it.for(fetchAndSse)( + `should set isConnected to false on fetch error and back on true when fetch succeeds again (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl }) => { + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + fetchClient: async (_input, _init) => { + await sleep(20) + return new Response(null, { + status: 204, + headers: new Headers({ + [`electric-offset`]: `0_0`, + [`electric-handle`]: `foo`, + [`electric-schema`]: ``, + [`electric-cursor`]: `123`, + }), + }) + }, + experimentalLiveSse, + }) - const unsubscribe = shapeStream.subscribe(() => unsubscribe()) + const unsubscribe = shapeStream.subscribe(() => unsubscribe()) - await vi.waitFor(() => expect(shapeStream.isConnected()).true) - expect(shapeStream.lastSyncedAt()).closeTo(Date.now(), 200) + await vi.waitFor(() => expect(shapeStream.isConnected()).true) + expect(shapeStream.lastSyncedAt()).closeTo(Date.now(), 200) - await sleep(400) + await sleep(400) - expect(shapeStream.lastSyncedAt()).closeTo(Date.now(), 200) - }) + expect(shapeStream.lastSyncedAt()).closeTo(Date.now(), 200) + } + ) }) function waitForFetch(stream: ShapeStream): Promise { diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 5729bedb69..c125c2e857 100644 --- 
a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -18,6 +18,11 @@ import * as h from './support/test-helpers' const BASE_URL = inject(`baseUrl`) +const fetchAndSse = [ + { experimentalLiveSse: false }, + { experimentalLiveSse: true }, +] + it(`sanity check`, async ({ dbClient, issuesTableSql }) => { const result = await dbClient.query(`SELECT * FROM ${issuesTableSql}`) @@ -25,102 +30,123 @@ it(`sanity check`, async ({ dbClient, issuesTableSql }) => { }) describe(`HTTP Sync`, () => { - it(`should work with empty shape/table`, async ({ - issuesTableUrl, - aborter, - }) => { - // Get initial data - const shapeData = new Map() - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - subscribe: false, - signal: aborter.signal, - }) - - await new Promise((resolve, reject) => { - issueStream.subscribe((messages) => { - messages.forEach((message) => { - if (isChangeMessage(message)) { - shapeData.set(message.key, message.value) - } - if (isUpToDateMessage(message)) { - aborter.abort() - return resolve() - } - }) - }, reject) - }) - const values = [...shapeData.values()] + it.for(fetchAndSse)( + `should work with empty shape/table (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl, aborter }) => { + // Get initial data + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + subscribe: false, + signal: aborter.signal, + experimentalLiveSse, + }) - expect(values).toHaveLength(0) - }) + await new Promise((resolve, reject) => { + issueStream.subscribe((messages) => { + messages.forEach((message) => { + if (isChangeMessage(message)) { + shapeData.set(message.key, message.value) + } + if (isUpToDateMessage(message)) { + aborter.abort() + return resolve() + } + }) + }, reject) + }) + const values = [...shapeData.values()] - it(`should wait 
properly for updates on an empty shape/table`, async ({ - issuesTableUrl, - aborter, - }) => { - const urlsRequested: URL[] = [] - const fetchWrapper = (...args: Parameters) => { - const url = new URL(args[0] instanceof Request ? args[0].url : args[0]) - urlsRequested.push(url) - return fetch(...args) + expect(values).toHaveLength(0) } + ) - // Get initial data - const shapeData = new Map() - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - fetchClient: fetchWrapper, - }) + it.for(fetchAndSse)( + `should wait properly for updates on an empty shape/table (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { issuesTableUrl, aborter }) => { + const urlsRequested: URL[] = [] + const fetchWrapper = async (...args: Parameters) => { + //console.log('fetch sse', experimentalLiveSse) + const url = new URL(args[0] instanceof Request ? args[0].url : args[0]) + //console.log("url", url) + urlsRequested.push(url) + const res = await fetch(...args) + //console.log("res", res) + return res + } - let upToDateMessageCount = 0 + // Get initial data + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + fetchClient: fetchWrapper, + experimentalLiveSse, + }) - await new Promise((resolve, reject) => { - issueStream.subscribe((messages) => { - messages.forEach((message) => { - if (isChangeMessage(message)) { - shapeData.set(message.key, message.value) - } - if (isUpToDateMessage(message)) { - upToDateMessageCount += 1 - } - }) - }, reject) - - // count updates received over 1 second - proper long polling - // should wait for far longer than this time period - setTimeout(() => { - aborter.abort() - resolve() - }, 1000) - }) + let upToDateMessageCount = 0 + + // TODO: this test fails in SSE mode because we don't use the provided fetchWrapper + // SSE uses the built-in 
fetch. + // Should fix that + + await new Promise((resolve, reject) => { + issueStream.subscribe((messages) => { + //console.log("sse", experimentalLiveSse) + //console.log("messages", messages) + messages.forEach((message) => { + if (isChangeMessage(message)) { + shapeData.set(message.key, message.value) + } + if (isUpToDateMessage(message)) { + upToDateMessageCount += 1 + } + }) + }, reject) + + // count updates received over 1 second - proper long polling + // should wait for far longer than this time period + setTimeout(() => { + aborter.abort() + resolve() + }, 1000) + }) - // first request was -1, last requests should be live ones - const numRequests = urlsRequested.length - expect(numRequests).toBeGreaterThan(2) - expect(urlsRequested[0].searchParams.get(`offset`)).toBe(`-1`) - expect(urlsRequested[0].searchParams.has(`live`)).false - expect(urlsRequested[numRequests - 1].searchParams.get(`offset`)).not.toBe( - `-1` - ) - expect(urlsRequested[numRequests - 1].searchParams.has(`live`)).true - expect(urlsRequested[numRequests - 1].searchParams.has(`cursor`)).true + // first request was -1, last requests should be live ones + const numRequests = urlsRequested.length + //console.log("urlsRequested", urlsRequested) - // first request comes back immediately and is up to date, second one - // should hang while waiting for updates - expect(upToDateMessageCount).toBe(1) + if (experimentalLiveSse) { + // We expect 3 requests: 2 requests for the initial fetch and the live request (which is 1 request streaming all updates) + expect(numRequests).toBe(3) + } else { + // We expect more than 2 requests: the initial fetch + 1 request per live update + expect(numRequests).toBeGreaterThan(2) + } - // data should be 0 - const values = [...shapeData.values()] - expect(values).toHaveLength(0) - }) + expect(urlsRequested[0].searchParams.get(`offset`)).toBe(`-1`) + expect(urlsRequested[0].searchParams.has(`live`)).false + expect( + urlsRequested[numRequests - 
1].searchParams.get(`offset`) + ).not.toBe(`-1`) + expect(urlsRequested[numRequests - 1].searchParams.has(`live`)).true + expect(urlsRequested[numRequests - 1].searchParams.has(`cursor`)).true + + // first request comes back immediately and is up to date, second one + // should hang while waiting for updates + expect(upToDateMessageCount).toBe(1) + + // data should be 0 + const values = [...shapeData.values()] + expect(values).toHaveLength(0) + } + ) it(`returns a header with the server shape handle`, async ({ issuesTableUrl, @@ -144,46 +170,52 @@ describe(`HTTP Sync`, () => { expect(lastOffset).to.exist }) - it(`should get initial data`, async ({ - insertIssues, - issuesTableUrl, - aborter, - }) => { - // Add an initial row. - const uuid = uuidv4() - await insertIssues({ id: uuid, title: `foo + ${uuid}` }) - - // Get initial data - const shapeData = new Map() - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) + it.for(fetchAndSse)( + `should get initial data (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { insertIssues, issuesTableUrl, aborter } + ) => { + // Add an initial row. 
+ const uuid = uuidv4() + await insertIssues({ id: uuid, title: `foo + ${uuid}` }) - await new Promise((resolve) => { - issueStream.subscribe((messages) => { - messages.forEach((message) => { - if (isChangeMessage(message)) { - shapeData.set(message.key, message.value) - } - if (isUpToDateMessage(message)) { - aborter.abort() - return resolve() - } + // Get initial data + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + + await new Promise((resolve) => { + issueStream.subscribe((messages) => { + messages.forEach((message) => { + if (isChangeMessage(message)) { + shapeData.set(message.key, message.value) + } + if (isUpToDateMessage(message)) { + aborter.abort() + return resolve() + } + }) }) }) - }) - const values = [...shapeData.values()] + const values = [...shapeData.values()] - expect(values).toMatchObject([{ title: `foo + ${uuid}` }]) - }) + expect(values).toMatchObject([{ title: `foo + ${uuid}` }]) + } + ) - mit( - `should parse incoming data`, - async ({ dbClient, aborter, tableSql, tableUrl }) => { + mit.for(fetchAndSse)( + `should parse incoming data (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { dbClient, aborter, tableSql, tableUrl } + ) => { // Create a table with data we want to be parsed await dbClient.query( ` @@ -239,6 +271,7 @@ describe(`HTTP Sync`, () => { table: tableUrl, }, signal: aborter.signal, + experimentalLiveSse, }) const client = new Shape(issueStream) const rows = await client.rows @@ -354,205 +387,221 @@ describe(`HTTP Sync`, () => { } ) - it(`should get initial data and then receive updates`, async ({ - aborter, - issuesTableUrl, - issuesTableKey, - updateIssue, - insertIssues, - }) => { - // With initial data - const rowId = uuidv4() - await insertIssues({ id: rowId, title: `original insert` }) - - const shapeData = new Map() - const issueStream = new 
ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - }) - let secondRowId = `` - await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - if (!isChangeMessage(msg)) return - shapeData.set(msg.key, msg.value) - - if (nth === 0) { - updateIssue({ id: rowId, title: `foo1` }) - } else if (nth === 1) { - ;[secondRowId] = await insertIssues({ title: `foo2` }) - } else if (nth === 2) { - res() - } - }) + it.for(fetchAndSse)( + `should get initial data and then receive updates (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { aborter, issuesTableUrl, issuesTableKey, updateIssue, insertIssues } + ) => { + // With initial data + const rowId = uuidv4() + await insertIssues({ id: rowId, title: `original insert` }) - // Only initial insert has the full row, the update contains only PK & changed columns. - // This test doesn't merge in updates, so we don't have `priority` on the row. - expect(shapeData).toEqual( - new Map([ - [`${issuesTableKey}/"${rowId}"`, { id: rowId, title: `foo1` }], - [ - `${issuesTableKey}/"${secondRowId}"`, - { id: secondRowId, title: `foo2`, priority: 10 }, - ], - ]) - ) - }) - - it(`should wait for processing before advancing stream`, async ({ - aborter, - issuesTableUrl, - insertIssues, - waitForIssues, - }) => { - // With initial data - await insertIssues({ id: uuidv4(), title: `original insert` }) + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + experimentalLiveSse, + }) + let secondRowId = `` + await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { + //console.log("GOT msg:", msg) + //console.log("nth", nth) + //console.log('isChangeMessage', isChangeMessage(msg)) + if (!isChangeMessage(msg)) return + shapeData.set(msg.key, msg.value) - const fetchWrapper = vi - .fn() - .mockImplementation((...args: Parameters) 
=> { - return fetch(...args) + if (nth === 0) { + await updateIssue({ id: rowId, title: `foo1` }) + } else if (nth === 1) { + ;[secondRowId] = await insertIssues({ title: `foo2` }) + } else if (nth === 2) { + res() + } }) - const shapeData = new Map() - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter.signal, - fetchClient: fetchWrapper, - }) + // Only initial insert has the full row, the update contains only PK & changed columns. + // This test doesn't merge in updates, so we don't have `priority` on the row. + expect(shapeData).toEqual( + new Map([ + [`${issuesTableKey}/"${rowId}"`, { id: rowId, title: `foo1` }], + [ + `${issuesTableKey}/"${secondRowId}"`, + { id: secondRowId, title: `foo2`, priority: 10 }, + ], + ]) + ) + } + ) - let numFetchCalls = 0 + it.for(fetchAndSse)( + `should wait for processing before advancing stream (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { aborter, issuesTableUrl, insertIssues, waitForIssues } + ) => { + // With initial data + await insertIssues({ id: uuidv4(), title: `original insert` }) + + const fetchWrapper = vi + .fn() + .mockImplementation((...args: Parameters) => { + return fetch(...args) + }) - await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - if (!isChangeMessage(msg)) return - shapeData.set(msg.key, msg.value) + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter.signal, + fetchClient: fetchWrapper, + experimentalLiveSse, + }) - if (nth === 0) { - await sleep(100) - numFetchCalls = fetchWrapper.mock.calls.length + let numFetchCalls = 0 - // ensure fetch has not been called again while - // waiting for processing - await insertIssues({ title: `foo1` }) + await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { + if (!isChangeMessage(msg)) return + shapeData.set(msg.key, 
msg.value) - // independent stream should be able to see this item, - // but the stream we have is waiting - await waitForIssues({ numChangesExpected: 1 }) - expect(fetchWrapper).toHaveBeenCalledTimes(numFetchCalls) - } else if (nth === 1) { - expect(fetchWrapper.mock.calls.length).greaterThan(numFetchCalls) - res() - } - }) - }) + if (nth === 0) { + await sleep(100) + numFetchCalls = fetchWrapper.mock.calls.length - it(`multiple clients can get the same data in parallel`, async ({ - issuesTableUrl, - updateIssue, - insertIssues, - }) => { - const rowId = uuidv4(), - rowId2 = uuidv4() - await insertIssues( - { id: rowId, title: `first original insert` }, - { id: rowId2, title: `second original insert` } - ) + // ensure fetch has not been called again while + // waiting for processing + await insertIssues({ title: `foo1` }) - const shapeData1 = new Map() - const aborter1 = new AbortController() - const issueStream1 = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter1.signal, - }) + // independent stream should be able to see this item, + // but the stream we have is waiting + await waitForIssues({ numChangesExpected: 1 }) + expect(fetchWrapper).toHaveBeenCalledTimes(numFetchCalls) + } else if (nth === 1) { + expect(fetchWrapper.mock.calls.length).greaterThan(numFetchCalls) + res() + } + }) + } + ) - const shapeData2 = new Map() - const aborter2 = new AbortController() - const issueStream2 = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - signal: aborter2.signal, - }) + it.for(fetchAndSse)( + `multiple clients can get the same data in parallel (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { issuesTableUrl, updateIssue, insertIssues } + ) => { + const rowId = uuidv4(), + rowId2 = uuidv4() + await insertIssues( + { id: rowId, title: `first original insert` }, + { id: rowId2, title: `second original insert` } + ) - const p1 = 
h.forEachMessage(issueStream1, aborter1, (res, msg, nth) => { - if (!isChangeMessage(msg)) return - shapeData1.set(msg.key, msg.value) + const shapeData1 = new Map() + const aborter1 = new AbortController() + const issueStream1 = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter1.signal, + experimentalLiveSse, + }) - if (nth === 1) { - setTimeout(() => updateIssue({ id: rowId, title: `foo3` }), 50) - } else if (nth === 2) { - return res() - } - }) + const shapeData2 = new Map() + const aborter2 = new AbortController() + const issueStream2 = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + signal: aborter2.signal, + experimentalLiveSse, + }) - const p2 = h.forEachMessage(issueStream2, aborter2, (res, msg, nth) => { - if (!isChangeMessage(msg)) return - shapeData2.set(msg.key, msg.value) + const p1 = h.forEachMessage(issueStream1, aborter1, (res, msg, nth) => { + if (!isChangeMessage(msg)) return + shapeData1.set(msg.key, msg.value) - if (nth === 2) { - return res() - } - }) + if (nth === 1) { + setTimeout(() => updateIssue({ id: rowId, title: `foo3` }), 50) + } else if (nth === 2) { + return res() + } + }) - await Promise.all([p1, p2]) + const p2 = h.forEachMessage(issueStream2, aborter2, (res, msg, nth) => { + if (!isChangeMessage(msg)) return + shapeData2.set(msg.key, msg.value) - expect(shapeData1).toEqual(shapeData2) - }) + if (nth === 2) { + return res() + } + }) - it(`can go offline and then catchup`, async ({ - aborter, - issuesTableUrl, - insertIssues, - waitForIssues, - }) => { - // initialize storage for the cases where persisted shape streams are tested - await insertIssues({ title: `foo1` }, { title: `foo2` }, { title: `foo3` }) + await Promise.all([p1, p2]) - const streamState = await waitForIssues({ numChangesExpected: 3 }) + expect(shapeData1).toEqual(shapeData2) + } + ) - const numIssuesToAdd = 9 - await insertIssues( - ...Array.from({ length: 
numIssuesToAdd }, (_, i) => ({ - title: `foo${i + 5}`, - })) - ) + it.for(fetchAndSse)( + `can go offline and then catchup (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { aborter, issuesTableUrl, insertIssues, waitForIssues } + ) => { + // initialize storage for the cases where persisted shape streams are tested + await insertIssues( + { title: `foo1` }, + { title: `foo2` }, + { title: `foo3` } + ) - // And wait until it's definitely seen - await waitForIssues({ - shapeStreamOptions: streamState, - numChangesExpected: numIssuesToAdd, - }) + const streamState = await waitForIssues({ numChangesExpected: 3 }) - let catchupOpsCount = 0 - const newIssueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - subscribe: true, - signal: aborter.signal, - offset: streamState.offset, - handle: streamState.handle, - }) + const numIssuesToAdd = 9 + await insertIssues( + ...Array.from({ length: numIssuesToAdd }, (_, i) => ({ + title: `foo${i + 5}`, + })) + ) - await h.forEachMessage(newIssueStream, aborter, (res, msg, nth) => { - if (isUpToDateMessage(msg)) { - res() - } else { - catchupOpsCount = nth + 1 - } - }) + // And wait until it's definitely seen + await waitForIssues({ + shapeStreamOptions: streamState, + numChangesExpected: numIssuesToAdd, + }) - expect(catchupOpsCount).toBe(9) - }) + let catchupOpsCount = 0 + const newIssueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + subscribe: true, + signal: aborter.signal, + offset: streamState.offset, + handle: streamState.handle, + experimentalLiveSse, + }) + + await h.forEachMessage(newIssueStream, aborter, (res, msg, nth) => { + if (isUpToDateMessage(msg)) { + res() + } else { + catchupOpsCount = nth + 1 + } + }) + + expect(catchupOpsCount).toBe(9) + } + ) it(`should return correct caching headers`, async ({ issuesTableUrl, @@ -673,53 +722,63 @@ describe(`HTTP Sync`, () => { 
expect(catchupStatus).toEqual(304) }) - it(`should correctly use a where clause for initial sync and updates`, async ({ - insertIssues, - updateIssue, - issuesTableUrl, - issuesTableKey, - clearShape, - aborter, - }) => { - // Add an initial rows - const id1 = uuidv4() - const id2 = uuidv4() - - await insertIssues({ id: id1, title: `foo` }, { id: id2, title: `bar` }) - - // Get initial data - const shapeData = new Map() - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - where: `title LIKE 'foo%'`, - }, - signal: aborter.signal, - }) + it.for(fetchAndSse)( + `should correctly use a where clause for initial sync and updates (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { + insertIssues, + updateIssue, + issuesTableUrl, + issuesTableKey, + clearShape, + aborter, + } + ) => { + // Add an initial rows + const id1 = uuidv4() + const id2 = uuidv4() - await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - if (!isChangeMessage(msg)) return - shapeData.set(msg.key, msg.value) + await insertIssues({ id: id1, title: `foo` }, { id: id2, title: `bar` }) - if (nth === 0) { - updateIssue({ id: id1, title: `foo1` }) - updateIssue({ id: id2, title: `bar1` }) - } else if (nth === 1) { - res() - } - }) + // Get initial data + const shapeData = new Map() + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + where: `title LIKE 'foo%'`, + }, + signal: aborter.signal, + experimentalLiveSse, + }) - await clearShape(issuesTableUrl, { handle: issueStream.shapeHandle! 
}) + await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { + if (!isChangeMessage(msg)) return + shapeData.set(msg.key, msg.value) - expect(shapeData).toEqual( - new Map([[`${issuesTableKey}/"${id1}"`, { id: id1, title: `foo1` }]]) - ) - }) + if (nth === 0) { + updateIssue({ id: id1, title: `foo1` }) + updateIssue({ id: id2, title: `bar1` }) + } else if (nth === 1) { + res() + } + }) + + await clearShape(issuesTableUrl, { handle: issueStream.shapeHandle! }) + + expect(shapeData).toEqual( + new Map([[`${issuesTableKey}/"${id1}"`, { id: id1, title: `foo1` }]]) + ) + } + ) - mit( - `should correctly select columns for initial sync and updates`, - async ({ dbClient, aborter, tableSql, tableUrl }) => { + mit.for(fetchAndSse)( + `should correctly select columns for initial sync and updates (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { dbClient, aborter, tableSql, tableUrl } + ) => { await dbClient.query( `INSERT INTO ${tableSql} (txt, i2, i4, i8) VALUES ($1, $2, $3, $4)`, [`test1`, 1, 10, 100] @@ -734,6 +793,7 @@ describe(`HTTP Sync`, () => { columns: [`txt`, `i2`, `i4`], }, signal: aborter.signal, + experimentalLiveSse, }) await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { if (!isChangeMessage(msg)) return @@ -828,189 +888,192 @@ describe(`HTTP Sync`, () => { } }) - it(`should handle invalid requests by terminating stream`, async ({ - expect, - issuesTableUrl, - aborter, - waitForIssues, - }) => { - const streamState = await waitForIssues({ numChangesExpected: 0 }) - - let error: Error - const invalidIssueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - where: `1 x 1`, // invalid SQL - }, - signal: aborter.signal, - handle: streamState.handle, - onError: (err) => { - error = err - }, - }) + it.for(fetchAndSse)( + `should handle invalid requests by terminating stream (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { expect, 
issuesTableUrl, aborter, waitForIssues } + ) => { + const streamState = await waitForIssues({ numChangesExpected: 0 }) - const errorSubscriberPromise = new Promise((_, reject) => - invalidIssueStream.subscribe(() => {}, reject) - ) + let error: Error + const invalidIssueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + where: `1 x 1`, // invalid SQL + }, + signal: aborter.signal, + handle: streamState.handle, + onError: (err) => { + error = err + }, + experimentalLiveSse, + }) - await expect(errorSubscriberPromise).rejects.toThrow(FetchError) - expect(invalidIssueStream.error).instanceOf(FetchError) - expect((invalidIssueStream.error! as FetchError).status).toBe(400) - expect(invalidIssueStream.isConnected()).false - expect((error! as FetchError).json).toStrictEqual({ - message: `Invalid request`, - errors: { - where: [`At location 17: syntax error at or near "x"`], - }, - }) - }) + const errorSubscriberPromise = new Promise((_, reject) => + invalidIssueStream.subscribe(() => {}, reject) + ) - it(`should handle invalid requests by terminating stream`, async ({ - expect, - issuesTableUrl, - aborter, - }) => { - let error: Error - const invalidIssueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - where: `1=1`, - }, - signal: aborter.signal, - // handle: streamState.handle, - onError: (err) => { - error = err - }, - fetchClient: async (...args) => { - const res = await fetch(...args) - await res.text() - return res - }, - }) + await expect(errorSubscriberPromise).rejects.toThrow(FetchError) + expect(invalidIssueStream.error).instanceOf(FetchError) + expect((invalidIssueStream.error! as FetchError).status).toBe(400) + expect(invalidIssueStream.isConnected()).false + expect((error! 
as FetchError).json).toStrictEqual({ + message: `Invalid request`, + errors: { + where: [`At location 17: syntax error at or near "x"`], + }, + }) + } + ) - const errorSubscriberPromise = new Promise((_, reject) => - invalidIssueStream.subscribe(() => {}, reject) - ) + it.for(fetchAndSse)( + `should handle invalid requests by terminating stream (liveSSE=$experimentalLiveSse)`, + async ({ experimentalLiveSse }, { expect, issuesTableUrl, aborter }) => { + let error: Error + const invalidIssueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + where: `1=1`, + }, + signal: aborter.signal, + // handle: streamState.handle, + onError: (err) => { + error = err + }, + fetchClient: async (...args) => { + const res = await fetch(...args) + await res.text() + return res + }, + experimentalLiveSse, + }) - await expect(errorSubscriberPromise).rejects.toThrow(FetchError) - expect(invalidIssueStream.error).instanceOf(FetchError) - expect(invalidIssueStream.isConnected()).false - expect(error!.message).contains( - `Body is unusable: Body has already been read` - ) - }) + const errorSubscriberPromise = new Promise((_, reject) => + invalidIssueStream.subscribe(() => {}, reject) + ) - it(`should detect shape deprecation and restart syncing`, async ({ - expect, - insertIssues, - issuesTableUrl, - aborter, - clearIssuesShape, - }) => { - // With initial data - const rowId = uuidv4(), - rowId2 = uuidv4() - await insertIssues({ id: rowId, title: `foo1` }) + await expect(errorSubscriberPromise).rejects.toThrow(FetchError) + expect(invalidIssueStream.error).instanceOf(FetchError) + expect(invalidIssueStream.isConnected()).false + expect(error!.message).contains( + `Body is unusable: Body has already been read` + ) + } + ) - const statusCodesReceived: number[] = [] - let numRequests = 0 + it.for(fetchAndSse)( + `should detect shape deprecation and restart syncing (liveSSE=$experimentalLiveSse)`, + async ( + { experimentalLiveSse }, + { expect, 
insertIssues, issuesTableUrl, aborter, clearIssuesShape } + ) => { + // With initial data + const rowId = uuidv4(), + rowId2 = uuidv4() + await insertIssues({ id: rowId, title: `foo1` }) + + const statusCodesReceived: number[] = [] + let numRequests = 0 + + const fetchWrapper = async (...args: Parameters) => { + // before any subsequent requests after the initial one, ensure + // that the existing shape is deleted and some more data is inserted + if (numRequests === 2) { + await insertIssues({ id: rowId2, title: `foo2` }) + await clearIssuesShape(issueStream.shapeHandle) + } - const fetchWrapper = async (...args: Parameters) => { - // before any subsequent requests after the initial one, ensure - // that the existing shape is deleted and some more data is inserted - if (numRequests === 2) { - await insertIssues({ id: rowId2, title: `foo2` }) - await clearIssuesShape(issueStream.shapeHandle) - } + numRequests++ + const response = await fetch(...args) - numRequests++ - const response = await fetch(...args) + if (response.status < 500) { + statusCodesReceived.push(response.status) + } - if (response.status < 500) { - statusCodesReceived.push(response.status) + return response } - return response - } + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { + table: issuesTableUrl, + }, + subscribe: true, + signal: aborter.signal, + fetchClient: fetchWrapper, + experimentalLiveSse, + }) - const issueStream = new ShapeStream({ - url: `${BASE_URL}/v1/shape`, - params: { - table: issuesTableUrl, - }, - subscribe: true, - signal: aborter.signal, - fetchClient: fetchWrapper, - }) + expect.assertions(12) - expect.assertions(12) - - let originalShapeHandle: string | undefined - let upToDateReachedCount = 0 - await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { - // shapeData.set(msg.key, msg.value) - if (isUpToDateMessage(msg)) { - upToDateReachedCount++ - if (upToDateReachedCount === 1) { - // upon reaching up to date initially, we 
have one - // response with the initial data - expect(statusCodesReceived).toHaveLength(2) - expect(statusCodesReceived[0]).toBe(200) - expect(statusCodesReceived[1]).toBe(200) - } else if (upToDateReachedCount === 2) { - // the next up to date message should have had - // a 409 interleaved before it that instructed the - // client to go and fetch data from scratch - expect(statusCodesReceived.length).greaterThanOrEqual(5) - expect(statusCodesReceived[2]).toBe(409) - expect(statusCodesReceived[3]).toBe(200) - return res() + let originalShapeHandle: string | undefined + let upToDateReachedCount = 0 + await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => { + // shapeData.set(msg.key, msg.value) + if (isUpToDateMessage(msg)) { + upToDateReachedCount++ + if (upToDateReachedCount === 1) { + // upon reaching up to date initially, we have one + // response with the initial data + expect(statusCodesReceived).toHaveLength(2) + expect(statusCodesReceived[0]).toBe(200) + expect(statusCodesReceived[1]).toBe(200) + } else if (upToDateReachedCount === 2) { + // the next up to date message should have had + // a 409 interleaved before it that instructed the + // client to go and fetch data from scratch + expect(statusCodesReceived.length).greaterThanOrEqual(5) + expect(statusCodesReceived[2]).toBe(409) + expect(statusCodesReceived[3]).toBe(200) + return res() + } + return } - return - } - if (!isChangeMessage(msg)) return + if (!isChangeMessage(msg)) return - switch (nth) { - case 0: - // first message is the initial row - expect(msg.value).toEqual({ - id: rowId, - title: `foo1`, - priority: 10, - }) - expect(issueStream.shapeHandle).to.exist - originalShapeHandle = issueStream.shapeHandle - break - case 1: - case 2: - // Second snapshot queries PG without `ORDER BY`, so check that it's generally correct. - // We're checking that both messages arrive by using `expect.assertions(N)` above. 
- - if (msg.value.id == rowId) { - // message is the initial row again as it is a new shape - // with different shape handle + switch (nth) { + case 0: + // first message is the initial row expect(msg.value).toEqual({ id: rowId, title: `foo1`, priority: 10, }) - expect(issueStream.shapeHandle).not.toBe(originalShapeHandle) - } else { - // should get the second row as well with the new shape handle - expect(msg.value).toEqual({ - id: rowId2, - title: `foo2`, - priority: 10, - }) - expect(issueStream.shapeHandle).not.toBe(originalShapeHandle) - } - break - default: - expect.unreachable(`Received more messages than expected`) - } - }) - }) + expect(issueStream.shapeHandle).to.exist + originalShapeHandle = issueStream.shapeHandle + break + case 1: + case 2: + // Second snapshot queries PG without `ORDER BY`, so check that it's generally correct. + // We're checking that both messages arrive by using `expect.assertions(N)` above. + + if (msg.value.id == rowId) { + // message is the initial row again as it is a new shape + // with different shape handle + expect(msg.value).toEqual({ + id: rowId, + title: `foo1`, + priority: 10, + }) + expect(issueStream.shapeHandle).not.toBe(originalShapeHandle) + } else { + // should get the second row as well with the new shape handle + expect(msg.value).toEqual({ + id: rowId2, + title: `foo2`, + priority: 10, + }) + expect(issueStream.shapeHandle).not.toBe(originalShapeHandle) + } + break + default: + expect.unreachable(`Received more messages than expected`) + } + }) + } + ) }) diff --git a/patches/@microsoft__fetch-event-source.patch b/patches/@microsoft__fetch-event-source.patch new file mode 100644 index 0000000000..2eae7c4855 --- /dev/null +++ b/patches/@microsoft__fetch-event-source.patch @@ -0,0 +1,124 @@ +diff --git a/lib/cjs/fetch.js b/lib/cjs/fetch.js +index ab40f1eeff0ec0a30043e45478f81cd1dc845adb..47be859185ab0276954300f06b1c74eafae62ba7 100644 +--- a/lib/cjs/fetch.js ++++ b/lib/cjs/fetch.js +@@ -26,31 +26,33 @@ function 
fetchEventSource(input, _a) { + let curRequestController; + function onVisibilityChange() { + curRequestController.abort(); +- if (!document.hidden) { ++ if (typeof document !== 'undefined' && !document.hidden) { + create(); + } + } +- if (!openWhenHidden) { ++ if (typeof document !== 'undefined' && !openWhenHidden) { + document.addEventListener('visibilitychange', onVisibilityChange); + } + let retryInterval = DefaultRetryInterval; + let retryTimer = 0; + function dispose() { +- document.removeEventListener('visibilitychange', onVisibilityChange); +- window.clearTimeout(retryTimer); ++ if (typeof document !== 'undefined') { ++ document.removeEventListener('visibilitychange', onVisibilityChange); ++ } ++ clearTimeout(retryTimer); + curRequestController.abort(); + } + inputSignal === null || inputSignal === void 0 ? void 0 : inputSignal.addEventListener('abort', () => { + dispose(); +- resolve(); + }); + const fetch = inputFetch !== null && inputFetch !== void 0 ? inputFetch : window.fetch; + const onopen = inputOnOpen !== null && inputOnOpen !== void 0 ? inputOnOpen : defaultOnOpen; + async function create() { + var _a; + curRequestController = new AbortController(); ++ const sig = inputSignal.aborted ? inputSignal : curRequestController.signal + try { +- const response = await fetch(input, Object.assign(Object.assign({}, rest), { headers, signal: curRequestController.signal })); ++ const response = await fetch(input, Object.assign(Object.assign({}, rest), { headers, signal: sig })); + await onopen(response); + await parse_1.getBytes(response.body, parse_1.getLines(parse_1.getMessages(id => { + if (id) { +@@ -67,11 +69,14 @@ function fetchEventSource(input, _a) { + resolve(); + } + catch (err) { +- if (!curRequestController.signal.aborted) { ++ if (sig.aborted) { ++ dispose(); ++ reject(err); ++ } else if (!curRequestController.signal.aborted) { + try { + const interval = (_a = onerror === null || onerror === void 0 ? 
void 0 : onerror(err)) !== null && _a !== void 0 ? _a : retryInterval; +- window.clearTimeout(retryTimer); +- retryTimer = window.setTimeout(create, interval); ++ clearTimeout(retryTimer); ++ retryTimer = setTimeout(create, interval); + } + catch (innerErr) { + dispose(); +diff --git a/lib/esm/fetch.js b/lib/esm/fetch.js +index 31eb2278da53ba79d9fc78ea32d69f2d15f325ff..5c56bd9909f89156e4176b14f07c4e35edf91220 100644 +--- a/lib/esm/fetch.js ++++ b/lib/esm/fetch.js +@@ -23,31 +23,33 @@ export function fetchEventSource(input, _a) { + let curRequestController; + function onVisibilityChange() { + curRequestController.abort(); +- if (!document.hidden) { ++ if (typeof document !== 'undefined' && !document.hidden) { + create(); + } + } +- if (!openWhenHidden) { ++ if (typeof document !== 'undefined' && !openWhenHidden) { + document.addEventListener('visibilitychange', onVisibilityChange); + } + let retryInterval = DefaultRetryInterval; + let retryTimer = 0; + function dispose() { +- document.removeEventListener('visibilitychange', onVisibilityChange); +- window.clearTimeout(retryTimer); ++ if (typeof document !== 'undefined') { ++ document.removeEventListener('visibilitychange', onVisibilityChange); ++ } ++ clearTimeout(retryTimer); + curRequestController.abort(); + } + inputSignal === null || inputSignal === void 0 ? void 0 : inputSignal.addEventListener('abort', () => { + dispose(); +- resolve(); + }); + const fetch = inputFetch !== null && inputFetch !== void 0 ? inputFetch : window.fetch; + const onopen = inputOnOpen !== null && inputOnOpen !== void 0 ? inputOnOpen : defaultOnOpen; + async function create() { + var _a; + curRequestController = new AbortController(); ++ const sig = inputSignal.aborted ? 
inputSignal : curRequestController.signal + try { +- const response = await fetch(input, Object.assign(Object.assign({}, rest), { headers, signal: curRequestController.signal })); ++ const response = await fetch(input, Object.assign(Object.assign({}, rest), { headers, signal: sig })); + await onopen(response); + await getBytes(response.body, getLines(getMessages(id => { + if (id) { +@@ -64,11 +66,14 @@ export function fetchEventSource(input, _a) { + resolve(); + } + catch (err) { +- if (!curRequestController.signal.aborted) { ++ if (sig.aborted) { ++ dispose(); ++ reject(err); ++ } else if (!curRequestController.signal.aborted) { + try { + const interval = (_a = onerror === null || onerror === void 0 ? void 0 : onerror(err)) !== null && _a !== void 0 ? _a : retryInterval; +- window.clearTimeout(retryTimer); +- retryTimer = window.setTimeout(create, interval); ++ clearTimeout(retryTimer); ++ retryTimer = setTimeout(create, interval); + } + catch (innerErr) { + dispose(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9a1f1ddc67..7c586b9d67 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4,6 +4,11 @@ settings: autoInstallPeers: true excludeLinksFromLockfile: false +patchedDependencies: + '@microsoft/fetch-event-source': + hash: lgwcujj3mimdfutlwueisfm32u + path: patches/@microsoft__fetch-event-source.patch + importers: .: @@ -1317,6 +1322,10 @@ importers: packages/sync-service: {} packages/typescript-client: + dependencies: + '@microsoft/fetch-event-source': + specifier: ^2.0.1 + version: 2.0.1(patch_hash=lgwcujj3mimdfutlwueisfm32u) optionalDependencies: '@rollup/rollup-darwin-arm64': specifier: ^4.18.1 @@ -3123,6 +3132,9 @@ packages: '@mdx-js/mdx@2.3.0': resolution: {integrity: sha512-jLuwRlz8DQfQNiUCJR50Y09CGPq3fLtmtUQfVrj79E0JWu3dvsVcxVIcfhR5h0iXu+/z++zDrYeiJqifRynJkA==} + '@microsoft/fetch-event-source@2.0.1': + resolution: {integrity: sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==} + 
'@neondatabase/serverless@0.10.4': resolution: {integrity: sha512-2nZuh3VUO9voBauuh+IGYRhGU/MskWHt1IuZvHcJw6GLjDgtqj/KViKo7SIrLdGLdot7vFbiRRw+BgEy3wT9HA==} @@ -11887,6 +11899,8 @@ snapshots: transitivePeerDependencies: - supports-color + '@microsoft/fetch-event-source@2.0.1(patch_hash=lgwcujj3mimdfutlwueisfm32u)': {} + '@neondatabase/serverless@0.10.4': dependencies: '@types/pg': 8.11.6