diff --git a/packages/ws/.npmignore b/packages/ws/.npmignore new file mode 100644 index 00000000..f72befe7 --- /dev/null +++ b/packages/ws/.npmignore @@ -0,0 +1,3 @@ +__test__/ +out/** +src/** diff --git a/packages/ws/README.md b/packages/ws/README.md new file mode 100644 index 00000000..9efb2bce --- /dev/null +++ b/packages/ws/README.md @@ -0,0 +1 @@ +# [View the documentation here.](https://discloud.github.io/discloud.app/modules/_discloudapp_ws.html) diff --git a/packages/ws/esbuild.mjs b/packages/ws/esbuild.mjs new file mode 100644 index 00000000..e8b988f4 --- /dev/null +++ b/packages/ws/esbuild.mjs @@ -0,0 +1,35 @@ +import { context } from "esbuild"; +import { esbuildDefaultPlugins } from "../../esbuild.mjs"; + +async function main() { + const production = process.argv.includes("--production"); + const watch = process.argv.includes("--watch"); + + const ctx = await context({ + entryPoints: ["src/index.ts"], + bundle: true, + format: "cjs", + minify: production, + sourcemap: "inline", + sourcesContent: false, + platform: "node", + outdir: "dist", + logLevel: "warning", + packages: "external", + plugins: esbuildDefaultPlugins, + }); + + if (watch) { + await ctx.watch(); + } else { + await ctx.rebuild(); + await ctx.dispose(); + } +} + +try { + await main(); +} catch (error) { + console.error(error); + process.exit(1); +} diff --git a/packages/ws/package.json b/packages/ws/package.json new file mode 100644 index 00000000..0a481091 --- /dev/null +++ b/packages/ws/package.json @@ -0,0 +1,33 @@ +{ + "name": "@discloudapp/ws", + "version": "0.1.0", + "description": "A WebSocket for discloud.app", + "main": "dist", + "types": "dist/index.d.ts", + "scripts": { + "watch": "npm-run-all -p watch:*", + "watch:esbuild": "node esbuild.mjs --watch", + "watch:tsc": "tsc --noEmit --watch", + "prepublish": "node esbuild.mjs --production && tsc --emitDeclarationOnly --outDir dist", + "release:pre": "npm version pre --legacy-peer-deps --no-git-tag-version && npm run prepublish && npm publish --tag=beta", + "release": "npm run prepublish && npm publish", + "test": "tsc --noEmit && tsc && npm run test:node", + "test:node": "node --test" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/discloud/discloud.app.git" + }, + "keywords": [ + "discloud", + "discloud.app" + ], + "license": "Apache-2.0", + "dependencies": { + "@discloudapp/api-types": "^1.0.2", + "ws": "^8.18.2" + }, + "publishConfig": { + "access": "public" + } +} diff --git a/packages/ws/src/actions/commit.ts b/packages/ws/src/actions/commit.ts new file mode 100644 index 00000000..6b8c9d6b --- /dev/null +++ b/packages/ws/src/actions/commit.ts @@ -0,0 +1,50 @@ +import { type SocketClient } from "../client"; +import { SocketEvents } from "../enum"; +import type { SocketCommitActionOptions, SocketEventCommitData } from "../types"; + +/** *Note* Setting the `options.onError` property prevents promise rejection */ +export function commitAction( + socket: SocketClient, + buffer: Buffer, + options: SocketCommitActionOptions = {}, +) { + return new Promise((resolve, reject) => { + let success = false; + + function onError(error: any) { + if (typeof options.onError === "function") return options.onError(error); + socket.dispose(); + reject(error); + } + + socket + .on(SocketEvents.close, (code, reason) => { + if (typeof options.onClose === "function") options.onClose(code, reason); + resolve(success); + }) + .on(SocketEvents.error, onError) + .on(SocketEvents.connecting, () => { + if (typeof options.onConnecting === "function") options.onConnecting(); + }) + .on(SocketEvents.connected, async () => { + if (typeof options.onConnected === "function") options.onConnected(); + + try { + await socket.sendBuffer(buffer, options.onProgress); + } catch (error: any) { + onError(error); + } + }) + .on(SocketEvents.data, (data) => { + if (typeof options.onData === "function") options.onData(data); + + switch (data.statusCode) { + case 200: + success = true; + break; + } + }) + .connect() + .catch(onError); + }); +} diff --git a/packages/ws/src/actions/upload.ts b/packages/ws/src/actions/upload.ts new file mode 100644 index 00000000..50250103 --- /dev/null +++ b/packages/ws/src/actions/upload.ts @@ -0,0 +1,49 @@ +import { type ApiApp } from "@discloudapp/api-types/v2"; +import { type SocketClient } from "../client"; +import { SocketEvents } from "../enum"; +import type { SocketEventUploadData, SocketUploadActionOptions } from "../types"; + +/** *Note* Setting the `options.onError` property prevents promise rejection */ +export function uploadAction(socket: SocketClient, buffer: Buffer, options?: SocketUploadActionOptions): Promise +export function uploadAction(socket: SocketClient, buffer: Buffer, options?: SocketUploadActionOptions): Promise +export function uploadAction(socket: SocketClient, buffer: Buffer, options: SocketUploadActionOptions = {}) { + return new Promise((resolve, reject) => { + let app: ApiApp; + + function onError(error: any) { + if (typeof options.onError === "function") return options.onError(error); + socket.dispose(); + reject(error); + } + + socket + .on(SocketEvents.close, (code, reason) => { + if (typeof options.onClose === "function") options.onClose(code, reason); + resolve(app); + }) + .on(SocketEvents.error, onError) + .on(SocketEvents.connecting, () => { + if (typeof options.onConnecting === "function") options.onConnecting(); + }) + .on(SocketEvents.connected, async () => { + if (typeof options.onConnected === "function") options.onConnected(); + + try { + await socket.sendBuffer(buffer, options.onProgress); + } catch (error: any) { + onError(error); + } + }) + .on(SocketEvents.data, (data) => { + switch (data.statusCode) { + case 102: + if (typeof options.onData === "function") options.onData(data); + break; + } + + if (data.app) app = data.app; + }) + .connect() + .catch(onError); + }); +} diff --git a/packages/ws/src/client.ts b/packages/ws/src/client.ts new file mode 100644 index 00000000..39b3e6d5 --- /dev/null +++ b/packages/ws/src/client.ts @@ -0,0 +1,228 @@ +import { RouteBases, Routes } from "@discloudapp/api-types/v2"; +import EventEmitter from "events"; +import WebSocket from "ws"; +import { commitAction } from "./actions/commit"; +import { uploadAction } from "./actions/upload"; +import { DEFAULT_CHUNK_SIZE, MAX_FILE_SIZE, NETWORK_UNREACHABLE_CODE, SOCKET_ABNORMAL_CLOSURE, SOCKET_UNAUTHORIZED_CODE } from "./constants"; +import { SocketEvents } from "./enum"; +import { BufferOverflowError } from "./errors/BufferOverflow"; +import ClosedError from "./errors/Closed"; +import { NetworkUnreachableError } from "./errors/NetworkUnreachable"; +import { UnauthorizedError } from "./errors/Unauthorized"; +import { type BufferLike, type OnProgressCallback, type ProgressData, type SocketCommitActionOptions, type SocketEventCommitData, type SocketEventsMap, type SocketEventUploadData, type SocketOptions, type SocketUploadActionOptions } from "./types"; + +export class SocketClient | any[] = Record | any[]> + extends EventEmitter> + implements Disposable { + static async teamCommit(appId: string, buffer: Buffer, options: SocketCommitActionOptions) { + const url = new URL(`${RouteBases.api}/ws${Routes.teamCommit(appId)}`); + + const socket = new SocketClient(url); + + return commitAction(socket, buffer, options); + } + + static async userCommit(appId: string, buffer: Buffer, options: SocketCommitActionOptions) { + const url = new URL(`${RouteBases.api}/ws${Routes.appCommit(appId)}`); + + const socket = new SocketClient(url); + + return commitAction(socket, buffer, options); + } + + static async upload(buffer: Buffer, options: SocketUploadActionOptions) { + const url = new URL(`${RouteBases.api}/ws${Routes.upload()}`); + + const socket = new SocketClient(url); + + return uploadAction(socket, buffer, options); + } + + constructor(protected wsURL: URL, options?: SocketOptions) { + super({ captureRejections: true }); + + if (options) { + if (typeof options.chunkSize === "number") + this._chunkSize = options.chunkSize; + + if (options.connectingTimeout !== undefined) + this._connectingTimeout = options.connectingTimeout; + + if (options.headers) Object.assign(this._headers, options.headers); + } + } + + protected _chunkSize: number = DEFAULT_CHUNK_SIZE; + /** @internal */ + declare protected _connected: boolean; + protected readonly _connectingTimeout: number | null = 10_000; + protected readonly _headers: Record = {}; + declare protected _error?: any; + declare protected _socket?: WebSocket; + declare protected _ping: number; + declare protected _pong: number; + + get ping() { return this._pong; } + + get closed() { return !this._socket || this._socket.readyState === WebSocket.CLOSED; } + get closing() { return this._socket ? this._socket.readyState === WebSocket.CLOSING : false; } + get connected() { return this._socket ? this._socket.readyState === WebSocket.OPEN : false; } + get connecting() { return this._socket ? this._socket.readyState === WebSocket.CONNECTING : false; } + + disconnect() { + if (this._socket) { + this._socket.removeAllListeners().close(); + delete this._socket; + } + } + + dispose() { + this[Symbol.dispose](); + } + + async connect() { + await new Promise((resolve, reject) => { + if (this.connected) return resolve(); + this.#createWebSocket().then(resolve).catch(reject); + }); + } + + async sendAsync(data: BufferLike) { + if (!this.connected) await this.connect(); + + await new Promise((resolve, reject) => { + this._socket!.send(data, (err) => { + if (err) return reject(err); + resolve(); + }); + }); + } + + async sendJSON(value: Record | any[]): Promise { + await this.sendAsync(JSON.stringify(value)); + } + + async sendBuffer(buffer: Buffer, onProgress?: OnProgressCallback) { + if (buffer.length > MAX_FILE_SIZE) throw new BufferOverflowError(buffer.length, MAX_FILE_SIZE); + + /** Number of parts to be sent */ + const total = Math.ceil(buffer.length / this._chunkSize); + /** Size of each part to be sent */ + const chunkSize = Math.ceil(buffer.length / total); + + for (let i = 0; i < total;) { + const offset = chunkSize * i; + const end = offset + chunkSize; + const chunk = buffer.subarray(offset, end); + const current = ++i; + const pending = current < total; + + const data: ProgressData = { chunk, current, offset, pending, total }; + + await this.sendJSON(data); + + await onProgress?.(data); + } + } + + #createWebSocket() { + return new Promise((resolve, reject) => { + if (this.connecting) return this.#waitConnect().then(resolve).catch(reject); + + if (this.connected) return resolve(); + + this.emit(SocketEvents.connecting); + + const options: ConstructorParameters[2] = { + headers: this.#resolveHeaders(this._headers), + ...typeof this._connectingTimeout === "number" + ? { signal: AbortSignal.timeout(this._connectingTimeout) } + : {}, + }; + + this._socket = new WebSocket(this.wsURL, options) + .once("close", (code, reason) => { + queueMicrotask(() => this.dispose()); + + const isConnected = this._connected; + this._connected = false; + + switch (code) { + case SOCKET_ABNORMAL_CLOSURE: + if (isConnected) break; + return reject(new NetworkUnreachableError(reason)); + + case SOCKET_UNAUTHORIZED_CODE: + return reject(new UnauthorizedError(reason)); + } + + if (this._error) { + const error = this._error; + delete this._error; + + switch (error.code) { + case NETWORK_UNREACHABLE_CODE: + return reject(new NetworkUnreachableError(reason)); + } + } + + this.emit(SocketEvents.close, code, reason); + }) + .on("error", (error) => { + this.emit(SocketEvents.error, this._error = error); + }) + .on("message", (data) => { + try { this.emit(SocketEvents.data, JSON.parse(data.toString())); } + catch { this.emit(SocketEvents.message, data); } + }) + .once("open", () => { + this._connected = true; + delete this._error; + + this._ping = Date.now(); + this._socket!.ping(); + + this.emit(SocketEvents.connected); + + resolve(); + }) + .on("ping", () => { + this._ping = Date.now(); + this._socket!.ping(); + }) + .on("pong", () => { + this._pong = Date.now() - this._ping; + }); + }); + } + + async #waitConnect() { + await new Promise((resolve, reject) => { + if (this.connecting) { + const onConnected = () => { + this.off(SocketEvents.close, onClose); + resolve(); + }; + const onClose = (code: number, reason: Buffer) => { + this.off(SocketEvents.connected, onConnected); + reject(new ClosedError(code, reason)); + }; + return this.once(SocketEvents.connected, onConnected).once(SocketEvents.close, onClose); + } + if (this.connected) return resolve(); + reject(this._error); + }); + } + + #resolveHeaders(headers: Record) { + if (!headers["api-token"] && process.env.DISCLOUD_TOKEN) + headers["api-token"] = process.env.DISCLOUD_TOKEN; + + return headers; + } + + [Symbol.dispose]() { + this.disconnect(); + this.removeAllListeners(); + } +} diff --git a/packages/ws/src/constants.ts b/packages/ws/src/constants.ts new file mode 100644 index 00000000..b332c105 --- /dev/null +++ b/packages/ws/src/constants.ts @@ -0,0 +1,22 @@ +/** `256KB` */ +export const DEFAULT_CHUNK_SIZE = 262_144; + +/** `100MB` */ +export const MAX_BUFFER_SIZE = 104_857_600; + +/** `1MB` */ +export const MAX_CHUNK_SIZE = 1_048_576; + +/** `512MB` */ +export const MAX_FILE_SIZE = 536_870_912; + +/** `8KB` */ +export const MIN_CHUNK_SIZE = 8_192; + +export const NETWORK_UNREACHABLE_ERRNO = -3008 as const; + +export const NETWORK_UNREACHABLE_CODE = "ENOTFOUND" as const; + +export const SOCKET_ABNORMAL_CLOSURE = 1006 as const; + +export const SOCKET_UNAUTHORIZED_CODE = 3000 as const; diff --git a/packages/ws/src/enum/events.ts b/packages/ws/src/enum/events.ts new file mode 100644 index 00000000..15c853b7 --- /dev/null +++ b/packages/ws/src/enum/events.ts @@ -0,0 +1,10 @@ +export enum SocketEvents { + close = "close", + connected = "connected", + connecting = "connecting", + connectionFailed = "connectionFailed", + data = "data", + error = "error", + message = "message", + unauthorized = "unauthorized", +} diff --git a/packages/ws/src/enum/index.ts b/packages/ws/src/enum/index.ts new file mode 100644 index 00000000..1784004f --- /dev/null +++ b/packages/ws/src/enum/index.ts @@ -0,0 +1 @@ +export * from "./events"; diff --git a/packages/ws/src/errors/BufferOverflow.ts b/packages/ws/src/errors/BufferOverflow.ts new file mode 100644 index 00000000..2caad1f5 --- /dev/null +++ b/packages/ws/src/errors/BufferOverflow.ts @@ -0,0 +1,10 @@ +export class BufferOverflowError extends Error { + readonly name = "BufferOverflow"; + + constructor( + readonly size: number, + readonly max: number, + ) { + super("Buffer overflow"); + } +} diff --git a/packages/ws/src/errors/Closed.ts b/packages/ws/src/errors/Closed.ts new file mode 100644 index 00000000..c5f95f7f --- /dev/null +++ b/packages/ws/src/errors/Closed.ts @@ -0,0 +1,10 @@ +export default class ClosedError extends Error { + readonly name = "Closed"; + + constructor( + readonly code: number, + readonly reason: Buffer, + ) { + super(); + } +} diff --git a/packages/ws/src/errors/NetworkUnreachable.ts b/packages/ws/src/errors/NetworkUnreachable.ts new file mode 100644 index 00000000..7b707130 --- /dev/null +++ b/packages/ws/src/errors/NetworkUnreachable.ts @@ -0,0 +1,9 @@ +export class NetworkUnreachableError extends Error { + readonly name = "NetworkUnreachable"; + + constructor( + readonly reason: Buffer, + ) { + super("Network unreachable"); + } +} diff --git a/packages/ws/src/errors/Unauthorized.ts b/packages/ws/src/errors/Unauthorized.ts new file mode 100644 index 00000000..336158df --- /dev/null +++ b/packages/ws/src/errors/Unauthorized.ts @@ -0,0 +1,12 @@ +import { SOCKET_UNAUTHORIZED_CODE } from "../constants"; + +export class UnauthorizedError extends Error { + readonly code = SOCKET_UNAUTHORIZED_CODE; + readonly name = "Unauthorized"; + + constructor( + readonly reason: Buffer, + ) { + super("Unauthorized"); + } +} diff --git a/packages/ws/src/index.ts b/packages/ws/src/index.ts new file mode 100644 index 00000000..8829a8ae --- /dev/null +++ b/packages/ws/src/index.ts @@ -0,0 +1,10 @@ +export * from "./client"; +export * from "./constants"; +export * from "./errors/BufferOverflow"; +export * from "./errors/Closed"; +export * from "./errors/NetworkUnreachable"; +export * from "./errors/Unauthorized"; +export * from "./types"; +export * from "./utils"; + +export const version: string = "[VI]{{inject}}[/VI]"; diff --git a/packages/ws/src/types/actions.ts b/packages/ws/src/types/actions.ts new file mode 100644 index 00000000..26fa435b --- /dev/null +++ b/packages/ws/src/types/actions.ts @@ -0,0 +1,21 @@ +import type { OnProgressCallback, SocketEventCommitData, SocketEventUploadData } from "."; + +/** *Note* Setting the `onError` property prevents promise rejection */ +export interface SocketActionOptions { + onClose?: (code: number, reason: Buffer) => unknown + onConnected?: () => unknown + onConnecting?: () => unknown + onData?: (data: Data) => unknown + /** *Note* Setting this property prevents promise rejection */ + onError?: (error: Error) => unknown +} + +/** *Note* Setting the `onError` property prevents promise rejection */ +export interface SocketCommitActionOptions extends SocketActionOptions { + onProgress?: OnProgressCallback +} + +/** *Note* Setting the `onError` property prevents promise rejection */ +export interface SocketUploadActionOptions extends SocketActionOptions { + onProgress?: OnProgressCallback +} diff --git a/packages/ws/src/types/events.ts b/packages/ws/src/types/events.ts new file mode 100644 index 00000000..0f770f9e --- /dev/null +++ b/packages/ws/src/types/events.ts @@ -0,0 +1,10 @@ +import type { RawData } from "ws"; + +export interface SocketEventsMap | any[] = Record | any[]> { + close: [code: number, reason: Buffer] + connected: [] + connecting: [] + data: [data: Data] + error: [error: Error] + message: [data: RawData] +} diff --git a/packages/ws/src/types/index.ts b/packages/ws/src/types/index.ts new file mode 100644 index 00000000..eb3aacf0 --- /dev/null +++ b/packages/ws/src/types/index.ts @@ -0,0 +1,5 @@ +export * from "./actions"; +export * from "./events"; +export * from "./options"; +export * from "./payload"; + diff --git a/packages/ws/src/types/options.ts b/packages/ws/src/types/options.ts new file mode 100644 index 00000000..42ea01a9 --- /dev/null +++ b/packages/ws/src/types/options.ts @@ -0,0 +1,17 @@ +export interface SocketOptions { + /** + * Set the buffer chunk size per message + * + * Note that very large chunks may cause unexpected closure + * + * @default 262_144 (256KB) + */ + chunkSize?: number + /** + * Connecting timeout in milliseconds + * + * @default 10_000 (10 seconds) + */ + connectingTimeout?: number | null + headers?: Record +} diff --git a/packages/ws/src/types/payload.ts b/packages/ws/src/types/payload.ts new file mode 100644 index 00000000..3e598b31 --- /dev/null +++ b/packages/ws/src/types/payload.ts @@ -0,0 +1,36 @@ +import type { ApiApp } from "@discloudapp/api-types/v2"; + +export type BufferLike = Parameters[0] + +export interface SocketEventCommitData { + logs?: string + message: string | null + progress: SocketProgressData + status: "ok" | "error" + statusCode: number +} + +export interface SocketEventUploadData { + app?: ApiApp + logs?: string + message: string | null + progress: SocketProgressData + status: "ok" | "error" + statusCode: number +} + +export interface SocketProgressData { + /** `0 - 100` */ + bar: number + log: string +} + +export interface ProgressData { + chunk: Buffer + current: number + offset: number + pending: boolean + total: number +} + +export type OnProgressCallback = (data: ProgressData) => unknown | Promise diff --git a/packages/ws/src/utils/buffer.ts b/packages/ws/src/utils/buffer.ts new file mode 100644 index 00000000..b75b21b3 --- /dev/null +++ b/packages/ws/src/utils/buffer.ts @@ -0,0 +1,22 @@ +import { DEFAULT_CHUNK_SIZE, MAX_CHUNK_SIZE, MIN_CHUNK_SIZE } from "../constants"; + +/** + * This is a Buffer chunk generator + * + * ```js + * for (const chunk of chunkifyBuffer(buffer, chunkSize)) { + * // ... + * } + * ``` + * + * @param chunkSize + * Limited between `8_192` (`8KB`) and `1_048_576` (`1MB`) + * Default `262_144` (`256KB`) + */ +export function* chunkifyBuffer(buffer: Buffer, chunkSize: number = DEFAULT_CHUNK_SIZE) { + chunkSize = Math.max(MIN_CHUNK_SIZE, Math.min(MAX_CHUNK_SIZE, chunkSize)); + + for (let i = 0; i < buffer.length;) { + yield buffer.subarray(i, i += chunkSize); + } +} diff --git a/packages/ws/src/utils/index.ts b/packages/ws/src/utils/index.ts new file mode 100644 index 00000000..bfea5c78 --- /dev/null +++ b/packages/ws/src/utils/index.ts @@ -0,0 +1 @@ +export * from "./buffer"; diff --git a/packages/ws/tsconfig.json b/packages/ws/tsconfig.json new file mode 100644 index 00000000..659491e8 --- /dev/null +++ b/packages/ws/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.json", + "include": ["src"], + "compilerOptions": { + "outDir": "out" + } +} diff --git a/packages/ws/typedoc.json b/packages/ws/typedoc.json new file mode 100644 index 00000000..be8ce11f --- /dev/null +++ b/packages/ws/typedoc.json @@ -0,0 +1,3 @@ +{ + "entryPoints": ["./src/index.ts"] +} diff --git a/yarn.lock b/yarn.lock index 30caa993..b8646ee9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2713,6 +2713,11 @@ wrap-ansi@^8.1.0: string-width "^5.0.1" strip-ansi "^7.0.1" +ws@^8.18.2: + version "8.18.2" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.2.tgz#42738b2be57ced85f46154320aabb51ab003705a" + integrity sha512-DMricUmwGZUVr++AEAe2uiVM7UoO9MAVZMDu05UQOaUII0lp+zOzLLU4Xqh/JvTqklB1T4uELaaPBKyjE1r4fQ== + yaml@^2.8.1: version "2.8.1" resolved "https://registry.yarnpkg.com/yaml/-/yaml-2.8.1.tgz#1870aa02b631f7e8328b93f8bc574fac5d6c4d79"