From 729c37d2fb67aa94837c94264e875a94a1391856 Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 16:35:12 +0100 Subject: [PATCH 1/8] fix: folded all methods into connection class --- src/realtime/connection.ts | 176 +++++++++++++++---------------------- src/sharedb.ts | 5 -- 2 files changed, 73 insertions(+), 108 deletions(-) delete mode 100644 src/sharedb.ts diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index faeb7d2..8a6ccde 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -1,12 +1,20 @@ import { Events } from '@playcanvas/observer'; +import { type } from 'ot-text'; +import * as share from 'sharedb/lib/client/index'; +share.types.register(type); import { globals as api } from '../globals'; import { Realtime } from '../realtime'; -import { share } from '../sharedb'; const RECONNECT_INTERVAL = 3; const MAX_ATTEMPTS = 3; +type ShareDb = share.Connection & { + bindToSocket: (socket: WebSocket) => void; + startBulk: () => void; + endBulk: () => void; +}; + /** * Handles connecting and communicating with the Realtime server. * @@ -19,7 +27,7 @@ class RealtimeConnection extends Events { private _socket: WebSocket; - private _sharedb: any; + private _sharedb: ShareDb; private _reconnectAttempts: number; @@ -31,7 +39,7 @@ class RealtimeConnection extends Events { private _sendQueue: (string | ArrayBuffer | Blob | ArrayBufferView)[] = []; - private _domEvtVisibilityChange: any; + private _domEvtVisibilityChange: () => void; /** * Constructor @@ -49,7 +57,15 @@ class RealtimeConnection extends Events { this._reconnectInterval = RECONNECT_INTERVAL; this._connected = false; this._authenticated = false; - this._domEvtVisibilityChange = this._onVisibilityChange.bind(this); + + this._domEvtVisibilityChange = () => { + if (document.hidden) { + return; + } + if (!this.connected && this._url) { + this.connect(this._url); + } + }; } /** @@ -71,11 +87,28 @@ class RealtimeConnection extends Events { // create new socket this._socket = new WebSocket(url); // create new sharedb connection - this._sharedb = new share.Connection(this._socket); + this._sharedb = new share.Connection(this._socket) as ShareDb; - this._sharedb.on('connected', this._onConnect.bind(this)); - this._sharedb.on('error', this._onError.bind(this)); - this._sharedb.on('bs error', this._onBulkSubscribeError.bind(this)); + this._sharedb.on('connected', () => { + this._connected = true; + this._reconnectAttempts = 0; + this._reconnectInterval = RECONNECT_INTERVAL; + + this.sendMessage('auth', { + accessToken: api.accessToken + }); + + this._realtime.emit('connected'); + }); + this._sharedb.on('error', (msg: any) => { + if (this._sharedb.state === 'connected') { + return; + } + this._realtime.emit('error', msg); + }); + this._sharedb.on('bs error' as any, (msg: any) => { + this._realtime.emit('error:bs', msg); + }); const send = this._socket.send; this._socket.send = (data) => { @@ -88,9 +121,26 @@ class RealtimeConnection extends Events { const onmessage = this._socket.onmessage; this._socket.onmessage = (msg) => { - if (!this._onMessage(msg)) { - onmessage.call(this._socket, msg); + try { + // handle auth + if (msg.data.startsWith('auth')) { + if (!this._authenticated) { + this._authenticated = true; + this._realtime.emit('authenticated'); + } + return; + } + + // handle selection + if (msg.data.startsWith('selection')) { + const data = msg.data.slice('selection:'.length); + this._realtime.emit('selection', data); + return; + } + } catch (err) { + console.error(err); } + onmessage.call(this._socket, msg); }; const onopen = this._socket.onopen; @@ -103,7 +153,19 @@ class RealtimeConnection extends Events { const onclose = this._socket.onclose; this._socket.onclose = (reason) => { - this._onClose(reason, onclose.bind(this._socket)); + this._connected = false; + this._authenticated = false; + + this._realtime.emit('disconnect', reason); + onclose.call(this._socket, reason); + + this._realtime.emit('nextAttempt', this._reconnectInterval); + + if (!document.hidden) { + setTimeout(() => { + this.connect(this._url); + }, this._reconnectInterval * 1000); + } }; document.addEventListener('visibilitychange', this._domEvtVisibilityChange); @@ -166,98 +228,6 @@ class RealtimeConnection extends Events { this._sharedb.endBulk(); } - private _onVisibilityChange() { - if (document.hidden) return; - if (!this.connected && this._url) { - this.connect(this._url); - } - } - - private _onConnect() { - this._connected = true; - this._reconnectAttempts = 0; - this._reconnectInterval = RECONNECT_INTERVAL; - this._sendAuth(); - this._realtime.emit('connected'); - } - - private _onError(msg: any) { - if (this._sharedb.state === 'connected') return; - this._realtime.emit('error', msg); - } - - private _onBulkSubscribeError(msg: any) { - this._realtime.emit('error:bs', msg); - } - - private _onClose(reason: CloseEvent, originalOnClose: () => void) { - this._connected = false; - this._authenticated = false; - - this._realtime.emit('disconnect', reason); - originalOnClose(); - - this._realtime.emit('nextAttempt', this._reconnectInterval); - - if (!document.hidden) { - setTimeout(() => { - this.connect(this._url); - }, this._reconnectInterval * 1000); - } - } - - private _onMessage(msg: MessageEvent) { - try { - if (msg.data.startsWith('auth')) { - return this._onMessageAuth(msg); - } else if (msg.data.startsWith('selection')) { - return this._onMessageSelection(msg); - } - - return false; - } catch (err) { - console.error(err); - } - } - - private _onMessageAuth(_msg: MessageEvent) { - if (!this._authenticated) { - this._authenticated = true; - this._realtime.emit('authenticated'); - } - - return true; - } - - private _onMessageSelection(msg: MessageEvent) { - const data = msg.data.slice('selection:'.length); - this._realtime.emit('selection', data); - return true; - } - - private _onMessageFs(msg: { data: any; }) { - let data = msg.data.slice('fs:'.length); - const ind = data.indexOf(':'); - if (ind !== -1) { - const op = data.slice(0, ind); - if (op === 'paths') { - data = JSON.parse(data.slice(ind + 1)); - this._realtime.emit('fs:paths', data); - } - - return true; - } - - return false; - } - - private _sendAuth() { - this.sendMessage('auth', { - accessToken: api.accessToken - }); - } - - /** * Whether the user is connected to the server */ diff --git a/src/sharedb.ts b/src/sharedb.ts deleted file mode 100644 index 102de17..0000000 --- a/src/sharedb.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { type } from 'ot-text'; -import * as share from 'sharedb/lib/client/index'; -share.types.register(type); - -export { share }; From 4e007babdc1ca23dcff3912dc060ecd46225a68b Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 16:46:24 +0100 Subject: [PATCH 2/8] fix: implement Deferred class for managing asynchronous operations in RealtimeConnection --- src/deferred.ts | 16 ++++++++++++++++ src/realtime/connection.ts | 26 ++++++++------------------ 2 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 src/deferred.ts diff --git a/src/deferred.ts b/src/deferred.ts new file mode 100644 index 0000000..9cd6bf4 --- /dev/null +++ b/src/deferred.ts @@ -0,0 +1,16 @@ +class Deferred { + promise: Promise; + + resolve!: (value: T | PromiseLike) => void; + + reject!: (reason?: Error) => void; + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + +export { Deferred }; diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index 8a6ccde..7232524 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -3,6 +3,7 @@ import { type } from 'ot-text'; import * as share from 'sharedb/lib/client/index'; share.types.register(type); +import { Deferred } from '../deferred'; import { globals as api } from '../globals'; import { Realtime } from '../realtime'; @@ -37,7 +38,7 @@ class RealtimeConnection extends Events { private _authenticated: boolean; - private _sendQueue: (string | ArrayBuffer | Blob | ArrayBufferView)[] = []; + private _sendPromise: Deferred; private _domEvtVisibilityChange: () => void; @@ -92,7 +93,6 @@ class RealtimeConnection extends Events { this._sharedb.on('connected', () => { this._connected = true; this._reconnectAttempts = 0; - this._reconnectInterval = RECONNECT_INTERVAL; this.sendMessage('auth', { accessToken: api.accessToken @@ -110,15 +110,6 @@ class RealtimeConnection extends Events { this._realtime.emit('error:bs', msg); }); - const send = this._socket.send; - this._socket.send = (data) => { - if (this._socket.readyState === WebSocket.OPEN) { - send.call(this._socket, data); - } else { - this._sendQueue.push(data); - } - }; - const onmessage = this._socket.onmessage; this._socket.onmessage = (msg) => { try { @@ -145,14 +136,14 @@ class RealtimeConnection extends Events { const onopen = this._socket.onopen; this._socket.onopen = (ev) => { - while (this._sendQueue.length) { - this._socket.send(this._sendQueue.shift()); - } + this._sendPromise.resolve(); onopen.call(this._socket, ev); }; const onclose = this._socket.onclose; this._socket.onclose = (reason) => { + this._sendPromise = new Deferred(); + this._connected = false; this._authenticated = false; @@ -197,10 +188,9 @@ class RealtimeConnection extends Events { * * @param data - The message data */ - send(data: string) { - if (this._socket && this._socket.readyState === WebSocket.OPEN) { - this._socket.send(data); - } + async send(data: string) { + await this._sendPromise; + this._socket.send(data); } /** From 72d4117bc62223ede6f9857554d6929f782c19bb Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 16:48:03 +0100 Subject: [PATCH 3/8] fix: initialize private properties in RealtimeConnection class --- src/realtime/connection.ts | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index 7232524..bcbd327 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -30,15 +30,15 @@ class RealtimeConnection extends Events { private _sharedb: ShareDb; - private _reconnectAttempts: number; + private _reconnectAttempts: number = 0; - private _reconnectInterval: number; + private _reconnectInterval: number = RECONNECT_INTERVAL; - private _connected: boolean; + private _connected: boolean = false; - private _authenticated: boolean; + private _authenticated: boolean = false; - private _sendPromise: Deferred; + private _sendPromise: Deferred = new Deferred(); private _domEvtVisibilityChange: () => void; @@ -50,15 +50,7 @@ class RealtimeConnection extends Events { constructor(realtime: Realtime) { super(); - this._url = null; this._realtime = realtime; - this._socket = null; - this._sharedb = null; - this._reconnectAttempts = 0; - this._reconnectInterval = RECONNECT_INTERVAL; - this._connected = false; - this._authenticated = false; - this._domEvtVisibilityChange = () => { if (document.hidden) { return; From 79a3ae160711a88cf29f2ab65a5a9e6d4a196898 Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 16:49:18 +0100 Subject: [PATCH 4/8] fix: update socket event handlers to manage message sending state --- src/realtime/connection.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index bcbd327..7b4dbb1 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -128,12 +128,15 @@ class RealtimeConnection extends Events { const onopen = this._socket.onopen; this._socket.onopen = (ev) => { + // allow sending messages this._sendPromise.resolve(); + onopen.call(this._socket, ev); }; const onclose = this._socket.onclose; this._socket.onclose = (reason) => { + // block sending messages this._sendPromise = new Deferred(); this._connected = false; From a77273274d8584bf49ff83388e95a7bf69319c8f Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 18:00:00 +0100 Subject: [PATCH 5/8] feat: reworked realtime connection to prioritise auth --- src/realtime/connection.ts | 139 ++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 55 deletions(-) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index 7b4dbb1..e9da3dd 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -38,10 +38,12 @@ class RealtimeConnection extends Events { private _authenticated: boolean = false; - private _sendPromise: Deferred = new Deferred(); + private _active: Deferred = new Deferred(); private _domEvtVisibilityChange: () => void; + private _alive: ReturnType | null = null; + /** * Constructor * @@ -61,6 +63,63 @@ class RealtimeConnection extends Events { }; } + private _onauth(socket: WebSocket) { + if (this._sharedb) { + this._sharedb.bindToSocket(socket); + } else { + this._sharedb = new share.Connection(socket) as ShareDb; + this._sharedb.on('error', (err) => { + if (this._sharedb?.state === 'connected') { + return; + } + this._realtime.emit('error', err); + }); + this._sharedb.on('bs error' as any, (msg: any) => { + this._realtime.emit('error:bs', msg); + }); + } + + // reset keep alive + if (this._alive) { + clearInterval(this._alive); + } + this._alive = setInterval(() => { + this._sharedb?.ping(); + }, 1000); + + // intercept messages + const onmessage = socket.onmessage; + socket.onmessage = (msg) => { + try { + // handle auth + if (msg.data.startsWith('auth')) { + if (!this._authenticated) { + this._authenticated = true; + this._realtime.emit('authenticated'); + } + return; + } + + // handle selection + if (msg.data.startsWith('selection')) { + const data = msg.data.slice('selection:'.length); + this._realtime.emit('selection', data); + return; + } + } catch (err) { + console.error(err); + } + onmessage.call(socket, msg); + }; + + // allow sending messages + this._active.resolve(socket); + + // mark as authenticated + this._authenticated = true; + this._realtime.emit('authenticated'); + } + /** * Connect to the realtime server * @@ -78,72 +137,40 @@ class RealtimeConnection extends Events { this._realtime.emit('connecting', this._reconnectAttempts); // create new socket - this._socket = new WebSocket(url); - // create new sharedb connection - this._sharedb = new share.Connection(this._socket) as ShareDb; + const socket = new WebSocket(url); - this._sharedb.on('connected', () => { + socket.onopen = () => { this._connected = true; this._reconnectAttempts = 0; - this.sendMessage('auth', { - accessToken: api.accessToken - }); + socket.send(`auth${JSON.stringify({ accessToken: api.accessToken })}`); this._realtime.emit('connected'); - }); - this._sharedb.on('error', (msg: any) => { - if (this._sharedb.state === 'connected') { - return; - } - this._realtime.emit('error', msg); - }); - this._sharedb.on('bs error' as any, (msg: any) => { - this._realtime.emit('error:bs', msg); - }); - - const onmessage = this._socket.onmessage; - this._socket.onmessage = (msg) => { - try { - // handle auth - if (msg.data.startsWith('auth')) { - if (!this._authenticated) { - this._authenticated = true; - this._realtime.emit('authenticated'); - } - return; - } - - // handle selection - if (msg.data.startsWith('selection')) { - const data = msg.data.slice('selection:'.length); - this._realtime.emit('selection', data); - return; - } - } catch (err) { - console.error(err); - } - onmessage.call(this._socket, msg); }; - const onopen = this._socket.onopen; - this._socket.onopen = (ev) => { - // allow sending messages - this._sendPromise.resolve(); + socket.onmessage = (msg) => { + if (msg.data.toString().startsWith('auth')) { - onopen.call(this._socket, ev); + socket.onmessage = null; // clear this handler + + this._onauth(socket); + } }; - const onclose = this._socket.onclose; - this._socket.onclose = (reason) => { + socket.onclose = (reason) => { // block sending messages - this._sendPromise = new Deferred(); + this._active = new Deferred(); + + // clear keep alive + if (this._alive) { + clearInterval(this._alive); + this._alive = null; + } this._connected = false; this._authenticated = false; this._realtime.emit('disconnect', reason); - onclose.call(this._socket, reason); this._realtime.emit('nextAttempt', this._reconnectInterval); @@ -155,15 +182,17 @@ class RealtimeConnection extends Events { }; document.addEventListener('visibilitychange', this._domEvtVisibilityChange); + + this._socket = socket; } /** * Disconnect from the server */ disconnect() { - if (this._socket) { - this._socket.close(); - } + this._sharedb?.close(); + + this._socket?.close(); document.removeEventListener('visibilitychange', this._domEvtVisibilityChange); } @@ -184,8 +213,8 @@ class RealtimeConnection extends Events { * @param data - The message data */ async send(data: string) { - await this._sendPromise; - this._socket.send(data); + const socket = await this._active.promise; + socket.send(data); } /** From bbab7475bcb782bf119ec13d6a5e1f95c6548779 Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 19:06:42 +0100 Subject: [PATCH 6/8] fix: clear socket onmessage handler after authentication --- src/realtime/connection.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index e9da3dd..44b6057 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -150,9 +150,7 @@ class RealtimeConnection extends Events { socket.onmessage = (msg) => { if (msg.data.toString().startsWith('auth')) { - socket.onmessage = null; // clear this handler - this._onauth(socket); } }; From 1c87a29ff075ade62fe717bdc6daf523546dd4a3 Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 19:07:02 +0100 Subject: [PATCH 7/8] fix: clarify socket onmessage handler clearing after authentication --- src/realtime/connection.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index 44b6057..a2d1474 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -150,7 +150,9 @@ class RealtimeConnection extends Events { socket.onmessage = (msg) => { if (msg.data.toString().startsWith('auth')) { - socket.onmessage = null; // clear this handler + // clear this handler + socket.onmessage = null; + this._onauth(socket); } }; From 859cc63840bfe7709247ab4dbe4064373f0361af Mon Sep 17 00:00:00 2001 From: kpal Date: Thu, 23 Oct 2025 19:08:27 +0100 Subject: [PATCH 8/8] fix: prevent pinging when sharedb is not connected --- src/realtime/connection.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index a2d1474..557c4dd 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -84,7 +84,12 @@ class RealtimeConnection extends Events { clearInterval(this._alive); } this._alive = setInterval(() => { - this._sharedb?.ping(); + if (!this._sharedb) { + return; + } + if (this._sharedb.state === 'connected') { + this._sharedb.ping(); + } }, 1000); // intercept messages