diff --git a/src/deferred.ts b/src/deferred.ts new file mode 100644 index 0000000..9cd6bf4 --- /dev/null +++ b/src/deferred.ts @@ -0,0 +1,16 @@ +class Deferred { + promise: Promise; + + resolve!: (value: T | PromiseLike) => void; + + reject!: (reason?: Error) => void; + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + +export { Deferred }; diff --git a/src/realtime/connection.ts b/src/realtime/connection.ts index faeb7d2..557c4dd 100644 --- a/src/realtime/connection.ts +++ b/src/realtime/connection.ts @@ -1,12 +1,21 @@ import { Events } from '@playcanvas/observer'; +import { type } from 'ot-text'; +import * as share from 'sharedb/lib/client/index'; +share.types.register(type); +import { Deferred } from '../deferred'; import { globals as api } from '../globals'; import { Realtime } from '../realtime'; -import { share } from '../sharedb'; const RECONNECT_INTERVAL = 3; const MAX_ATTEMPTS = 3; +type ShareDb = share.Connection & { + bindToSocket: (socket: WebSocket) => void; + startBulk: () => void; + endBulk: () => void; +}; + /** * Handles connecting and communicating with the Realtime server. * @@ -19,19 +28,21 @@ class RealtimeConnection extends Events { private _socket: WebSocket; - private _sharedb: any; + private _sharedb: ShareDb; + + private _reconnectAttempts: number = 0; - private _reconnectAttempts: number; + private _reconnectInterval: number = RECONNECT_INTERVAL; - private _reconnectInterval: number; + private _connected: boolean = false; - private _connected: boolean; + private _authenticated: boolean = false; - private _authenticated: boolean; + private _active: Deferred = new Deferred(); - private _sendQueue: (string | ArrayBuffer | Blob | ArrayBufferView)[] = []; + private _domEvtVisibilityChange: () => void; - private _domEvtVisibilityChange: any; + private _alive: ReturnType | null = null; /** * Constructor @@ -41,15 +52,77 @@ class RealtimeConnection extends Events { constructor(realtime: Realtime) { super(); - this._url = null; this._realtime = realtime; - this._socket = null; - this._sharedb = null; - this._reconnectAttempts = 0; - this._reconnectInterval = RECONNECT_INTERVAL; - this._connected = false; - this._authenticated = false; - this._domEvtVisibilityChange = this._onVisibilityChange.bind(this); + this._domEvtVisibilityChange = () => { + if (document.hidden) { + return; + } + if (!this.connected && this._url) { + this.connect(this._url); + } + }; + } + + private _onauth(socket: WebSocket) { + if (this._sharedb) { + this._sharedb.bindToSocket(socket); + } else { + this._sharedb = new share.Connection(socket) as ShareDb; + this._sharedb.on('error', (err) => { + if (this._sharedb?.state === 'connected') { + return; + } + this._realtime.emit('error', err); + }); + this._sharedb.on('bs error' as any, (msg: any) => { + this._realtime.emit('error:bs', msg); + }); + } + + // reset keep alive + if (this._alive) { + clearInterval(this._alive); + } + this._alive = setInterval(() => { + if (!this._sharedb) { + return; + } + if (this._sharedb.state === 'connected') { + this._sharedb.ping(); + } + }, 1000); + + // intercept messages + const onmessage = socket.onmessage; + socket.onmessage = (msg) => { + try { + // handle auth + if (msg.data.startsWith('auth')) { + if (!this._authenticated) { + this._authenticated = true; + this._realtime.emit('authenticated'); + } + return; + } + + // handle selection + if (msg.data.startsWith('selection')) { + const data = msg.data.slice('selection:'.length); + this._realtime.emit('selection', data); + return; + } + } catch (err) { + console.error(err); + } + onmessage.call(socket, msg); + }; + + // allow sending messages + this._active.resolve(socket); + + // mark as authenticated + this._authenticated = true; + this._realtime.emit('authenticated'); } /** @@ -69,53 +142,62 @@ class RealtimeConnection extends Events { this._realtime.emit('connecting', this._reconnectAttempts); // create new socket - this._socket = new WebSocket(url); - // create new sharedb connection - this._sharedb = new share.Connection(this._socket); - - this._sharedb.on('connected', this._onConnect.bind(this)); - this._sharedb.on('error', this._onError.bind(this)); - this._sharedb.on('bs error', this._onBulkSubscribeError.bind(this)); - - const send = this._socket.send; - this._socket.send = (data) => { - if (this._socket.readyState === WebSocket.OPEN) { - send.call(this._socket, data); - } else { - this._sendQueue.push(data); - } + const socket = new WebSocket(url); + + socket.onopen = () => { + this._connected = true; + this._reconnectAttempts = 0; + + socket.send(`auth${JSON.stringify({ accessToken: api.accessToken })}`); + + this._realtime.emit('connected'); }; - const onmessage = this._socket.onmessage; - this._socket.onmessage = (msg) => { - if (!this._onMessage(msg)) { - onmessage.call(this._socket, msg); + socket.onmessage = (msg) => { + if (msg.data.toString().startsWith('auth')) { + // clear this handler + socket.onmessage = null; + + this._onauth(socket); } }; - const onopen = this._socket.onopen; - this._socket.onopen = (ev) => { - while (this._sendQueue.length) { - this._socket.send(this._sendQueue.shift()); + socket.onclose = (reason) => { + // block sending messages + this._active = new Deferred(); + + // clear keep alive + if (this._alive) { + clearInterval(this._alive); + this._alive = null; } - onopen.call(this._socket, ev); - }; - const onclose = this._socket.onclose; - this._socket.onclose = (reason) => { - this._onClose(reason, onclose.bind(this._socket)); + this._connected = false; + this._authenticated = false; + + this._realtime.emit('disconnect', reason); + + this._realtime.emit('nextAttempt', this._reconnectInterval); + + if (!document.hidden) { + setTimeout(() => { + this.connect(this._url); + }, this._reconnectInterval * 1000); + } }; document.addEventListener('visibilitychange', this._domEvtVisibilityChange); + + this._socket = socket; } /** * Disconnect from the server */ disconnect() { - if (this._socket) { - this._socket.close(); - } + this._sharedb?.close(); + + this._socket?.close(); document.removeEventListener('visibilitychange', this._domEvtVisibilityChange); } @@ -135,10 +217,9 @@ class RealtimeConnection extends Events { * * @param data - The message data */ - send(data: string) { - if (this._socket && this._socket.readyState === WebSocket.OPEN) { - this._socket.send(data); - } + async send(data: string) { + const socket = await this._active.promise; + socket.send(data); } /** @@ -166,98 +247,6 @@ class RealtimeConnection extends Events { this._sharedb.endBulk(); } - private _onVisibilityChange() { - if (document.hidden) return; - if (!this.connected && this._url) { - this.connect(this._url); - } - } - - private _onConnect() { - this._connected = true; - this._reconnectAttempts = 0; - this._reconnectInterval = RECONNECT_INTERVAL; - this._sendAuth(); - this._realtime.emit('connected'); - } - - private _onError(msg: any) { - if (this._sharedb.state === 'connected') return; - this._realtime.emit('error', msg); - } - - private _onBulkSubscribeError(msg: any) { - this._realtime.emit('error:bs', msg); - } - - private _onClose(reason: CloseEvent, originalOnClose: () => void) { - this._connected = false; - this._authenticated = false; - - this._realtime.emit('disconnect', reason); - originalOnClose(); - - this._realtime.emit('nextAttempt', this._reconnectInterval); - - if (!document.hidden) { - setTimeout(() => { - this.connect(this._url); - }, this._reconnectInterval * 1000); - } - } - - private _onMessage(msg: MessageEvent) { - try { - if (msg.data.startsWith('auth')) { - return this._onMessageAuth(msg); - } else if (msg.data.startsWith('selection')) { - return this._onMessageSelection(msg); - } - - return false; - } catch (err) { - console.error(err); - } - } - - private _onMessageAuth(_msg: MessageEvent) { - if (!this._authenticated) { - this._authenticated = true; - this._realtime.emit('authenticated'); - } - - return true; - } - - private _onMessageSelection(msg: MessageEvent) { - const data = msg.data.slice('selection:'.length); - this._realtime.emit('selection', data); - return true; - } - - private _onMessageFs(msg: { data: any; }) { - let data = msg.data.slice('fs:'.length); - const ind = data.indexOf(':'); - if (ind !== -1) { - const op = data.slice(0, ind); - if (op === 'paths') { - data = JSON.parse(data.slice(ind + 1)); - this._realtime.emit('fs:paths', data); - } - - return true; - } - - return false; - } - - private _sendAuth() { - this.sendMessage('auth', { - accessToken: api.accessToken - }); - } - - /** * Whether the user is connected to the server */ diff --git a/src/sharedb.ts b/src/sharedb.ts deleted file mode 100644 index 102de17..0000000 --- a/src/sharedb.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { type } from 'ot-text'; -import * as share from 'sharedb/lib/client/index'; -share.types.register(type); - -export { share };