Skip to content
This repository was archived by the owner on Dec 18, 2025. It is now read-only.
16 changes: 16 additions & 0 deletions src/deferred.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class Deferred<T> {
promise: Promise<T>;

resolve!: (value: T | PromiseLike<T>) => void;

reject!: (reason?: Error) => void;

constructor() {
this.promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
}

export { Deferred };
275 changes: 132 additions & 143 deletions src/realtime/connection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { Events } from '@playcanvas/observer';
import { type } from 'ot-text';
import * as share from 'sharedb/lib/client/index';
share.types.register(type);

import { Deferred } from '../deferred';
import { globals as api } from '../globals';
import { Realtime } from '../realtime';
import { share } from '../sharedb';

const RECONNECT_INTERVAL = 3;
const MAX_ATTEMPTS = 3;

type ShareDb = share.Connection & {
bindToSocket: (socket: WebSocket) => void;
startBulk: () => void;
endBulk: () => void;
};

/**
* Handles connecting and communicating with the Realtime server.
*
Expand All @@ -19,19 +28,21 @@ class RealtimeConnection extends Events {

private _socket: WebSocket;

private _sharedb: any;
private _sharedb: ShareDb;

private _reconnectAttempts: number = 0;

private _reconnectAttempts: number;
private _reconnectInterval: number = RECONNECT_INTERVAL;

private _reconnectInterval: number;
private _connected: boolean = false;

private _connected: boolean;
private _authenticated: boolean = false;

private _authenticated: boolean;
private _active: Deferred<WebSocket> = new Deferred<WebSocket>();

private _sendQueue: (string | ArrayBuffer | Blob | ArrayBufferView)[] = [];
private _domEvtVisibilityChange: () => void;

private _domEvtVisibilityChange: any;
private _alive: ReturnType<typeof setInterval> | null = null;

/**
* Constructor
Expand All @@ -41,15 +52,77 @@ class RealtimeConnection extends Events {
constructor(realtime: Realtime) {
super();

this._url = null;
this._realtime = realtime;
this._socket = null;
this._sharedb = null;
this._reconnectAttempts = 0;
this._reconnectInterval = RECONNECT_INTERVAL;
this._connected = false;
this._authenticated = false;
this._domEvtVisibilityChange = this._onVisibilityChange.bind(this);
this._domEvtVisibilityChange = () => {
if (document.hidden) {
return;
}
if (!this.connected && this._url) {
this.connect(this._url);
}
};
}

private _onauth(socket: WebSocket) {
if (this._sharedb) {
this._sharedb.bindToSocket(socket);
} else {
this._sharedb = new share.Connection(socket) as ShareDb;
this._sharedb.on('error', (err) => {
if (this._sharedb?.state === 'connected') {
return;
}
this._realtime.emit('error', err);
});
this._sharedb.on('bs error' as any, (msg: any) => {
this._realtime.emit('error:bs', msg);
});
}

// reset keep alive
if (this._alive) {
clearInterval(this._alive);
}
this._alive = setInterval(() => {
if (!this._sharedb) {
return;
}
if (this._sharedb.state === 'connected') {
this._sharedb.ping();
}
}, 1000);

// intercept messages
const onmessage = socket.onmessage;
socket.onmessage = (msg) => {
try {
// handle auth
if (msg.data.startsWith('auth')) {
if (!this._authenticated) {
this._authenticated = true;
this._realtime.emit('authenticated');
}
return;
}

// handle selection
if (msg.data.startsWith('selection')) {
const data = msg.data.slice('selection:'.length);
this._realtime.emit('selection', data);
return;
}
} catch (err) {
console.error(err);
}
onmessage.call(socket, msg);
};

// allow sending messages
this._active.resolve(socket);

// mark as authenticated
this._authenticated = true;
this._realtime.emit('authenticated');
}

/**
Expand All @@ -69,53 +142,62 @@ class RealtimeConnection extends Events {
this._realtime.emit('connecting', this._reconnectAttempts);

// create new socket
this._socket = new WebSocket(url);
// create new sharedb connection
this._sharedb = new share.Connection(this._socket);

this._sharedb.on('connected', this._onConnect.bind(this));
this._sharedb.on('error', this._onError.bind(this));
this._sharedb.on('bs error', this._onBulkSubscribeError.bind(this));

const send = this._socket.send;
this._socket.send = (data) => {
if (this._socket.readyState === WebSocket.OPEN) {
send.call(this._socket, data);
} else {
this._sendQueue.push(data);
}
const socket = new WebSocket(url);

socket.onopen = () => {
this._connected = true;
this._reconnectAttempts = 0;

socket.send(`auth${JSON.stringify({ accessToken: api.accessToken })}`);

this._realtime.emit('connected');
};

const onmessage = this._socket.onmessage;
this._socket.onmessage = (msg) => {
if (!this._onMessage(msg)) {
onmessage.call(this._socket, msg);
socket.onmessage = (msg) => {
if (msg.data.toString().startsWith('auth')) {
// clear this handler
socket.onmessage = null;

this._onauth(socket);
}
};

const onopen = this._socket.onopen;
this._socket.onopen = (ev) => {
while (this._sendQueue.length) {
this._socket.send(this._sendQueue.shift());
socket.onclose = (reason) => {
// block sending messages
this._active = new Deferred<WebSocket>();

// clear keep alive
if (this._alive) {
clearInterval(this._alive);
this._alive = null;
}
onopen.call(this._socket, ev);
};

const onclose = this._socket.onclose;
this._socket.onclose = (reason) => {
this._onClose(reason, onclose.bind(this._socket));
this._connected = false;
this._authenticated = false;

this._realtime.emit('disconnect', reason);

this._realtime.emit('nextAttempt', this._reconnectInterval);

if (!document.hidden) {
setTimeout(() => {
this.connect(this._url);
}, this._reconnectInterval * 1000);
}
};

document.addEventListener('visibilitychange', this._domEvtVisibilityChange);

this._socket = socket;
}

/**
* Disconnect from the server
*/
disconnect() {
if (this._socket) {
this._socket.close();
}
this._sharedb?.close();

this._socket?.close();

document.removeEventListener('visibilitychange', this._domEvtVisibilityChange);
}
Expand All @@ -135,10 +217,9 @@ class RealtimeConnection extends Events {
*
* @param data - The message data
*/
send(data: string) {
if (this._socket && this._socket.readyState === WebSocket.OPEN) {
this._socket.send(data);
}
async send(data: string) {
const socket = await this._active.promise;
socket.send(data);
}

/**
Expand Down Expand Up @@ -166,98 +247,6 @@ class RealtimeConnection extends Events {
this._sharedb.endBulk();
}

private _onVisibilityChange() {
if (document.hidden) return;
if (!this.connected && this._url) {
this.connect(this._url);
}
}

private _onConnect() {
this._connected = true;
this._reconnectAttempts = 0;
this._reconnectInterval = RECONNECT_INTERVAL;
this._sendAuth();
this._realtime.emit('connected');
}

private _onError(msg: any) {
if (this._sharedb.state === 'connected') return;
this._realtime.emit('error', msg);
}

private _onBulkSubscribeError(msg: any) {
this._realtime.emit('error:bs', msg);
}

private _onClose(reason: CloseEvent, originalOnClose: () => void) {
this._connected = false;
this._authenticated = false;

this._realtime.emit('disconnect', reason);
originalOnClose();

this._realtime.emit('nextAttempt', this._reconnectInterval);

if (!document.hidden) {
setTimeout(() => {
this.connect(this._url);
}, this._reconnectInterval * 1000);
}
}

private _onMessage(msg: MessageEvent<any>) {
try {
if (msg.data.startsWith('auth')) {
return this._onMessageAuth(msg);
} else if (msg.data.startsWith('selection')) {
return this._onMessageSelection(msg);
}

return false;
} catch (err) {
console.error(err);
}
}

private _onMessageAuth(_msg: MessageEvent<any>) {
if (!this._authenticated) {
this._authenticated = true;
this._realtime.emit('authenticated');
}

return true;
}

private _onMessageSelection(msg: MessageEvent<any>) {
const data = msg.data.slice('selection:'.length);
this._realtime.emit('selection', data);
return true;
}

private _onMessageFs(msg: { data: any; }) {
let data = msg.data.slice('fs:'.length);
const ind = data.indexOf(':');
if (ind !== -1) {
const op = data.slice(0, ind);
if (op === 'paths') {
data = JSON.parse(data.slice(ind + 1));
this._realtime.emit('fs:paths', data);
}

return true;
}

return false;
}

private _sendAuth() {
this.sendMessage('auth', {
accessToken: api.accessToken
});
}


/**
* Whether the user is connected to the server
*/
Expand Down
5 changes: 0 additions & 5 deletions src/sharedb.ts

This file was deleted.