diff --git a/packages/keepalive-ws/README.md b/packages/keepalive-ws/README.md index b8057d9..13554b7 100644 --- a/packages/keepalive-ws/README.md +++ b/packages/keepalive-ws/README.md @@ -1,99 +1,173 @@ -For a TCP-based, node-only solution with a similar API, see [duplex](https://github.com/node-prism/duplex). - # keepalive-ws -A command server and client for simplified WebSocket communication, with builtin ping and latency messaging. +[![NPM version](https://img.shields.io/npm/v/@prsm/keepalive-ws?color=a1b858&label=)](https://www.npmjs.com/package/@prsm/keepalive-ws) -Built for [grove](https://github.com/node-prism/grove), but works anywhere. +A command server and client for simplified WebSocket communication, with built-in ping and latency messaging. Provides reliable, Promise-based communication with automatic reconnection and command queueing. -### Server +For a TCP-based, node-only solution with a similar API, see [duplex](https://github.com/node-prism/duplex). -For node. +## Features + +- **Promise-based API** - All operations return Promises for easy async/await usage +- **Command queueing** - Commands are automatically queued when offline +- **Reliable connections** - Robust error handling and reconnection +- **Bidirectional communication** - Full-duplex WebSocket communication +- **Latency monitoring** - Built-in ping/pong and latency measurement +- **Room-based messaging** - Group connections into rooms for targeted broadcasts +- **Lightweight** - Minimal dependencies + +## Server ```typescript import { KeepAliveServer, WSContext } from "@prsm/keepalive-ws/server"; -const ws = new KeepAliveServer({ - // Where to mount this server and listen to messages. - path: "/", - // How often to send ping messages to connected clients. - pingInterval: 30_000, - // Calculate round-trip time and send latency updates - // to clients every 5s. - latencyInterval: 5_000, +// Create a server instance +const server = new KeepAliveServer({ + port: 8080, + pingInterval: 30000, + latencyInterval: 5000, }); -ws.registerCommand( - "authenticate", - async (c: WSContext<{ token: string >}) => { - const { token } = c.payload; - // use c.payload to authenticate c.connection - return { ok: true, token }; - }, -); +// Register command handlers +server.registerCommand("echo", async (context) => { + return `Echo: ${context.payload}`; +}); -ws.registerCommand( - "throws", - async (c: WSContext) => { - throw new Error("oops"); - }, -); +// Error handling +server.registerCommand("throws", async () => { + throw new Error("Something went wrong"); +}); + +// Room-based messaging +server.registerCommand("join-room", async (context) => { + const { roomName } = context.payload; + server.addToRoom(roomName, context.connection); + server.broadcastRoom(roomName, "user-joined", { + id: context.connection.id + }); + return { success: true }; +}); + +// Broadcasting to all clients +server.registerCommand("broadcast", async (context) => { + server.broadcast("announcement", context.payload); + return { sent: true }; +}); ``` -Extended API: - -- Rooms - - It can be useful to collect connections into rooms. - - - `addToRoom(roomName: string, connection: Connection): void` - - `removeFromRoom(roomName: string, connection: Connection): void` - - `getRoom(roomName: string): Connection[]` - - `clearRoom(roomName: string): void` -- Command middleware -- Broadcasting to: - - all - - `broadcast(command: string, payload: any, connections?: Connection[]): void` - - all connections that share the same IP - - `broadcastRemoteAddress(c: Connection, command: string, payload: any): void` - - rooms - - `broadcastRoom(roomName: string, command: string, payload: any): void` - -### Client - -For the browser. +## Client ```typescript import { KeepAliveClient } from "@prsm/keepalive-ws/client"; -const opts = { - // After 30s (+ maxLatency) of no ping, assume we've disconnected and attempt a - // reconnection if shouldReconnect is true. - // This number should be coordinated with the pingInterval from KeepAliveServer. - pingTimeout: 30_000, - // Try to reconnect whenever we are disconnected. +// Create a client instance +const client = new KeepAliveClient("ws://localhost:8080", { + pingTimeout: 30000, + maxLatency: 2000, shouldReconnect: true, - // This number, added to pingTimeout, is the maximum amount of time - // that can pass before the connection is considered closed. - // In this case, 32s. - maxLatency: 2_000, - // How often to try and connect during reconnection phase. - reconnectInterval: 2_000, - // How many times to try and reconnect before giving up. + reconnectInterval: 2000, maxReconnectAttempts: Infinity, -}; - -const ws = new KeepAliveClient("ws://localhost:8080", opts); - -const { ok, token } = await ws.command("authenticate", { - username: "user", - password: "pass", }); -const result = await ws.command("throws", {}); -// result is: { error: "oops" } +// Connect to the server (returns a Promise) +await client.connect(); -ws.on("latency", (e: CustomEvent<{ latency: number }>) => { - // e.detail.latency is round-trip time in ms +// Using Promise-based API +try { + const response = await client.command("echo", "Hello world", 5000); + console.log("Response:", response); +} catch (error) { + console.error("Error:", error); +} + +// Join a room +await client.command("join-room", { roomName: "lobby" }); + +// Listen for events +client.on("user-joined", (event) => { + console.log("User joined:", event.detail.id); }); + +// Monitor latency +client.on("latency", (event) => { + console.log("Current latency:", event.detail.latency, "ms"); +}); + +// Graceful shutdown +await client.close(); +``` + +## Extended Server API + +### Room Management +```typescript +// Add a connection to a room +server.addToRoom("roomName", connection); + +// Remove a connection from a room +server.removeFromRoom("roomName", connection); + +// Get all connections in a room +const roomConnections = server.getRoom("roomName"); + +// Clear all connections from a room +server.clearRoom("roomName"); +``` + +### Broadcasting +```typescript +// Broadcast to all connections +server.broadcast("eventName", payload); + +// Broadcast to specific connections +server.broadcast("eventName", payload, connections); + +// Broadcast to all connections except one +server.broadcastExclude(connection, "eventName", payload); + +// Broadcast to all connections in a room +server.broadcastRoom("roomName", "eventName", payload); + +// Broadcast to all connections in a room except one +server.broadcastRoomExclude("roomName", "eventName", payload, connection); + +// Broadcast to all connections with the same IP +server.broadcastRemoteAddress(connection, "eventName", payload); +``` + +### Middleware +```typescript +// Global middleware for all commands +server.globalMiddlewares.push(async (context) => { + // Validate authentication, etc. + if (!isAuthenticated(context)) { + throw new Error("Unauthorized"); + } +}); + +// Command-specific middleware +server.registerCommand( + "protected-command", + async (context) => { + return "Protected data"; + }, + [ + async (context) => { + // Command-specific validation + if (!hasPermission(context)) { + throw new Error("Forbidden"); + } + } + ] +); +``` + +## Graceful Shutdown + +```typescript +// Close client connection +await client.close(); + +// Close server +server.close(); ``` diff --git a/packages/keepalive-ws/bun.lockb b/packages/keepalive-ws/bun.lockb index 56b08aa..0e58303 100755 Binary files a/packages/keepalive-ws/bun.lockb and b/packages/keepalive-ws/bun.lockb differ diff --git a/packages/keepalive-ws/package.json b/packages/keepalive-ws/package.json index 90e6dca..ac8f3f0 100644 --- a/packages/keepalive-ws/package.json +++ b/packages/keepalive-ws/package.json @@ -33,6 +33,8 @@ "build:server": "tsup src/server/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/server", "build:client": "tsup src/client/index.ts --format cjs,esm --dts --clean --minify --out-dir dist/client", "build": "npm run build:prep && npm run build:server && npm run build:client", + "test": "vitest run", + "test:watch": "vitest", "release": "bumpp package.json && npm publish --access public" }, "keywords": [], @@ -44,6 +46,7 @@ "@types/ws": "^8.5.3", "bumpp": "^9.1.1", "tsup": "^8.2.4", - "typescript": "^5.5.4" + "typescript": "^5.5.4", + "vitest": "^3.0.9" } } diff --git a/packages/keepalive-ws/src/client/client.ts b/packages/keepalive-ws/src/client/client.ts index 6779547..bb2d387 100644 --- a/packages/keepalive-ws/src/client/client.ts +++ b/packages/keepalive-ws/src/client/client.ts @@ -1,6 +1,12 @@ +import { EventEmitter } from "node:events"; +import { WebSocket } from "ws"; +import { CodeError } from "../common/codeerror"; +import { Status } from "../common/status"; import { Connection } from "./connection"; -type KeepAliveClientOptions = Partial<{ +export { Status } from "../common/status"; + +export type KeepAliveClientOptions = Partial<{ /** * The number of milliseconds to wait before considering the connection closed due to inactivity. * When this happens, the connection will be closed and a reconnect will be attempted if @see KeepAliveClientOptions.shouldReconnect is true. @@ -36,58 +42,129 @@ type KeepAliveClientOptions = Partial<{ maxReconnectAttempts: number; }>; -const defaultOptions = (opts: KeepAliveClientOptions = {}) => { - opts.pingTimeout = opts.pingTimeout ?? 30_000; - opts.maxLatency = opts.maxLatency ?? 2_000; - opts.shouldReconnect = opts.shouldReconnect ?? true; - opts.reconnectInterval = opts.reconnectInterval ?? 2_000; - opts.maxReconnectAttempts = opts.maxReconnectAttempts ?? Infinity; - return opts; -}; - -export class KeepAliveClient extends EventTarget { +export class KeepAliveClient extends EventEmitter { connection: Connection; url: string; - socket: WebSocket; + socket: WebSocket | null = null; pingTimeout: ReturnType; - options: KeepAliveClientOptions; + options: Required; isReconnecting = false; + private _status: Status = Status.OFFLINE; constructor(url: string, opts: KeepAliveClientOptions = {}) { super(); this.url = url; - this.socket = new WebSocket(url); - this.connection = new Connection(this.socket); - this.options = defaultOptions(opts); - this.applyListeners(); + this.connection = new Connection(null); + this.options = { + pingTimeout: opts.pingTimeout ?? 30_000, + maxLatency: opts.maxLatency ?? 2_000, + shouldReconnect: opts.shouldReconnect ?? true, + reconnectInterval: opts.reconnectInterval ?? 2_000, + maxReconnectAttempts: opts.maxReconnectAttempts ?? Infinity, + }; + + this.setupConnectionEvents(); } - get on() { - return this.connection.addEventListener.bind(this.connection); + get status(): Status { + return this._status; } - applyListeners() { - this.connection.addEventListener("connection", () => { - this.heartbeat(); + private setupConnectionEvents(): void { + // Forward relevant events from connection to client + this.connection.on("message", (data) => { + this.emit("message", data); }); - this.connection.addEventListener("close", () => { + this.connection.on("close", () => { + this._status = Status.OFFLINE; + this.emit("close"); this.reconnect(); }); - this.connection.addEventListener("ping", () => { - this.heartbeat(); + this.connection.on("error", (error) => { + this.emit("error", error); }); - this.connection.addEventListener( - "message", - (ev: CustomEventInit) => { - this.dispatchEvent(new CustomEvent("message", ev)); - }, - ); + this.connection.on("ping", () => { + this.heartbeat(); + this.emit("ping"); + }); + + this.connection.on("latency", (data) => { + this.emit("latency", data); + }); } - heartbeat() { + /** + * Connect to the WebSocket server. + * @returns A promise that resolves when the connection is established. + */ + connect(): Promise { + if (this._status === Status.ONLINE) { + return Promise.resolve(); + } + + if ( + this._status === Status.CONNECTING || + this._status === Status.RECONNECTING + ) { + return new Promise((resolve, reject) => { + const onConnect = () => { + this.removeListener("connect", onConnect); + this.removeListener("error", onError); + resolve(); + }; + + const onError = (error: Error) => { + this.removeListener("connect", onConnect); + this.removeListener("error", onError); + reject(error); + }; + + this.once("connect", onConnect); + this.once("error", onError); + }); + } + + this._status = Status.CONNECTING; + + return new Promise((resolve, reject) => { + try { + // Create a new WebSocket connection + this.socket = new WebSocket(this.url); + + // Set up a direct onopen handler to ensure we catch the connection event + this.socket.onopen = () => { + this._status = Status.ONLINE; + this.connection.socket = this.socket; + this.connection.status = Status.ONLINE; + this.connection.applyListeners(); + this.heartbeat(); + + this.emit("connect"); + resolve(); + }; + + // Set up a direct onerror handler for immediate connection errors + this.socket.onerror = (error) => { + this._status = Status.OFFLINE; + reject( + new CodeError( + "WebSocket connection error", + "ECONNECTION", + "ConnectionError", + ), + ); + }; + } catch (error) { + this._status = Status.OFFLINE; + reject(error); + } + }); + } + + heartbeat(): void { clearTimeout(this.pingTimeout); this.pingTimeout = setTimeout(() => { @@ -100,23 +177,45 @@ export class KeepAliveClient extends EventTarget { /** * Disconnect the client from the server. * The client will not attempt to reconnect. - * To reconnect, create a new KeepAliveClient. + * @returns A promise that resolves when the connection is closed. */ - disconnect() { + close(): Promise { this.options.shouldReconnect = false; - if (this.socket) { - this.socket.close(); + if (this._status === Status.OFFLINE) { + return Promise.resolve(); } - clearTimeout(this.pingTimeout); + return new Promise((resolve) => { + const onClose = () => { + this.removeListener("close", onClose); + this._status = Status.OFFLINE; + resolve(); + }; + + this.once("close", onClose); + + clearTimeout(this.pingTimeout); + + if (this.socket) { + this.socket.close(); + } + }); } - private async reconnect() { + /** + * @deprecated Use close() instead + */ + disconnect(): Promise { + return this.close(); + } + + private reconnect(): void { if (!this.options.shouldReconnect || this.isReconnecting) { return; } + this._status = Status.RECONNECTING; this.isReconnecting = true; let attempt = 1; @@ -124,11 +223,14 @@ export class KeepAliveClient extends EventTarget { if (this.socket) { try { this.socket.close(); - } catch (e) {} + } catch (e) { + // Ignore errors during close + } } const connect = () => { this.socket = new WebSocket(this.url); + this.socket.onerror = () => { attempt++; @@ -136,37 +238,56 @@ export class KeepAliveClient extends EventTarget { setTimeout(connect, this.options.reconnectInterval); } else { this.isReconnecting = false; - - this.connection.dispatchEvent(new Event("reconnectfailed")); - this.connection.dispatchEvent(new Event("reconnectionfailed")); + this._status = Status.OFFLINE; + this.emit("reconnectfailed"); } }; this.socket.onopen = () => { this.isReconnecting = false; + this._status = Status.ONLINE; this.connection.socket = this.socket; - + this.connection.status = Status.ONLINE; this.connection.applyListeners(true); + this.heartbeat(); - this.connection.dispatchEvent(new Event("connection")); - this.connection.dispatchEvent(new Event("connected")); - this.connection.dispatchEvent(new Event("connect")); - - this.connection.dispatchEvent(new Event("reconnection")); - this.connection.dispatchEvent(new Event("reconnected")); - this.connection.dispatchEvent(new Event("reconnect")); + this.emit("connect"); + this.emit("reconnect"); }; }; connect(); } - async command( + /** + * Send a command to the server and wait for a response. + * @param command The command name to send + * @param payload The payload to send with the command + * @param expiresIn Timeout in milliseconds + * @param callback Optional callback function + * @returns A promise that resolves with the command result + */ + command( command: string, payload?: any, - expiresIn?: number, - callback?: Function, - ) { + expiresIn: number = 30000, + callback?: (result: any, error?: Error) => void, + ): Promise { + // Ensure we're connected before sending commands + if (this._status !== Status.ONLINE) { + return this.connect() + .then(() => + this.connection.command(command, payload, expiresIn, callback), + ) + .catch((error) => { + if (callback) { + callback(null, error); + return Promise.reject(error); + } + return Promise.reject(error); + }); + } + return this.connection.command(command, payload, expiresIn, callback); } } diff --git a/packages/keepalive-ws/src/client/connection.ts b/packages/keepalive-ws/src/client/connection.ts index 3a4fac5..081a7ee 100644 --- a/packages/keepalive-ws/src/client/connection.ts +++ b/packages/keepalive-ws/src/client/connection.ts @@ -1,261 +1,137 @@ +import { EventEmitter } from "node:events"; +import { WebSocket } from "ws"; +import { CodeError } from "../common/codeerror"; +import { Command, parseCommand, stringifyCommand } from "../common/message"; +import { Status } from "../common/status"; import { IdManager } from "./ids"; -import { Queue, QueueItem } from "./queue"; +import { Queue } from "./queue"; -type Command = { - id?: number; - command: string; - payload?: any; -}; - -type LatencyPayload = { +export type LatencyPayload = { /** Round trip time in milliseconds. */ latency: number; }; -export declare interface Connection extends EventTarget { - addEventListener( - type: "message", - listener: (ev: CustomEvent) => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a connection is made. */ - addEventListener( - type: "connection", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - /** Emits when a connection is made. */ - addEventListener( - type: "connected", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - /** Emits when a connection is made. */ - addEventListener( - type: "connect", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a connection is closed. */ - addEventListener( - type: "close", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - /** Emits when a connection is closed. */ - addEventListener( - type: "closed", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - /** Emits when a connection is closed. */ - addEventListener( - type: "disconnect", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - /** Emits when a connection is closed. */ - addEventListener( - type: "disconnected", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a reconnect event is successful. */ - addEventListener( - type: "reconnect", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a reconnect fails after @see KeepAliveClientOptions.maxReconnectAttempts attempts. */ - addEventListener( - type: "reconnectfailed", - listener: () => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a ping message is received from @see KeepAliveServer from `@prsm/keepalive-ws/server`. */ - addEventListener( - type: "ping", - listener: (ev: CustomEventInit<{}>) => any, - options?: boolean | AddEventListenerOptions, - ): void; - - /** Emits when a latency event is received from @see KeepAliveServer from `@prsm/keepalive-ws/server`. */ - addEventListener( - type: "latency", - listener: (ev: CustomEventInit) => any, - options?: boolean | AddEventListenerOptions, - ): void; - - addEventListener( - type: string, - listener: (ev: CustomEvent) => any, - options?: boolean | AddEventListenerOptions, - ): void; -} - -export class Connection extends EventTarget { - socket: WebSocket; +export class Connection extends EventEmitter { + socket: WebSocket | null = null; ids = new IdManager(); queue = new Queue(); - callbacks: { [id: number]: (error: Error | null, result?: any) => void } = {}; + callbacks: { [id: number]: (result: any, error?: Error) => void } = {}; + status: Status = Status.OFFLINE; - constructor(socket: WebSocket) { + constructor(socket: WebSocket | null) { super(); this.socket = socket; - this.applyListeners(); - } - - /** - * Adds an event listener to the target. - * @param event The name of the event to listen for. - * @param listener The function to call when the event is fired. - * @param options An options object that specifies characteristics about the event listener. - */ - on( - event: string, - listener: (ev: CustomEvent) => any, - options?: boolean | AddEventListenerOptions, - ) { - this.addEventListener(event, listener, options); - } - - /** - * Removes the event listener previously registered with addEventListener. - * @param event A string that specifies the name of the event for which to remove an event listener. - * @param listener The event listener to be removed. - * @param options An options object that specifies characteristics about the event listener. - */ - off( - event: string, - listener: (ev: CustomEvent) => any, - options?: boolean | AddEventListenerOptions, - ) { - this.removeEventListener(event, listener, options); - } - - sendToken(cmd: Command, expiresIn: number) { - try { - this.socket.send(JSON.stringify(cmd)); - } catch (e) { - this.queue.add(cmd, expiresIn); + if (socket) { + this.applyListeners(); } } - applyListeners(reconnection = false) { + get isDead(): boolean { + return !this.socket || this.socket.readyState !== WebSocket.OPEN; + } + + send(command: Command): boolean { + try { + if (!this.isDead) { + this.socket.send(stringifyCommand(command)); + return true; + } + return false; + } catch (e) { + return false; + } + } + + sendWithQueue(command: Command, expiresIn: number): boolean { + const success = this.send(command); + + if (!success) { + this.queue.add(command, expiresIn); + } + + return success; + } + + applyListeners(reconnection = false): void { + if (!this.socket) return; + const drainQueue = () => { while (!this.queue.isEmpty) { - const item = this.queue.pop() as QueueItem; - this.sendToken(item.value, item.expiresIn); + const item = this.queue.pop(); + if (item) { + this.send(item.value); + } } }; - if (reconnection) drainQueue(); - - // @ts-ignore - this.socket.onopen = (socket: WebSocket, ev: Event): any => { + if (reconnection) { drainQueue(); - this.dispatchEvent(new Event("connection")); - this.dispatchEvent(new Event("connected")); - this.dispatchEvent(new Event("connect")); + } + + this.socket.onclose = () => { + this.status = Status.OFFLINE; + this.emit("close"); + this.emit("disconnect"); }; - this.socket.onclose = (event: CloseEvent) => { - this.dispatchEvent(new Event("close")); - this.dispatchEvent(new Event("closed")); - this.dispatchEvent(new Event("disconnected")); - this.dispatchEvent(new Event("disconnect")); + this.socket.onerror = (error) => { + this.emit("error", error); }; - this.socket.onmessage = async (event: MessageEvent) => { + this.socket.onmessage = (event: any) => { try { - const data = JSON.parse(event.data); + const data = parseCommand(event.data as string); - this.dispatchEvent(new CustomEvent("message", { detail: data })); + // Emit the raw message event + this.emit("message", data); + // Handle special system commands if (data.command === "latency:request") { - this.dispatchEvent( - new CustomEvent("latency:request", { - detail: { latency: data.payload.latency ?? undefined }, - }), - ); - this.command( - "latency:response", - { latency: data.payload.latency ?? undefined }, - null, - ); + this.emit("latency:request", data.payload); + this.command("latency:response", data.payload, null); } else if (data.command === "latency") { - this.dispatchEvent( - new CustomEvent("latency", { - detail: { latency: data.payload ?? undefined }, - }), - ); + this.emit("latency", data.payload); } else if (data.command === "ping") { - this.dispatchEvent(new CustomEvent("ping", {})); + this.emit("ping"); this.command("pong", {}, null); } else { - this.dispatchEvent( - new CustomEvent(data.command, { detail: data.payload }), - ); + // Emit command-specific event + this.emit(data.command, data.payload); } - if (this.callbacks[data.id]) { - this.callbacks[data.id](null, data.payload); + // Resolve any pending command promises + if (data.id !== undefined && this.callbacks[data.id]) { + // Always resolve with the payload, even if it contains an error + // This allows the test to check for error properties in the result + this.callbacks[data.id](data.payload); } - } catch (e) { - this.dispatchEvent(new Event("error")); + } catch (error) { + this.emit("error", error); } }; } - async command( + command( command: string, payload: any, - expiresIn: number = 30_000, - callback: Function | null = null, - ) { + expiresIn: number | null = 30_000, + callback?: (result: any, error?: Error) => void, + ): Promise | null { const id = this.ids.reserve(); - const cmd = { id, command, payload: payload ?? {} }; + const cmd: Command = { id, command, payload: payload ?? {} }; - this.sendToken(cmd, expiresIn); + this.sendWithQueue(cmd, expiresIn || 30000); if (expiresIn === null) { this.ids.release(id); - delete this.callbacks[id]; return null; } - const response = this.createResponsePromise(id); - const timeout = this.createTimeoutPromise(id, expiresIn); - - if (typeof callback === "function") { - const ret = await Promise.race([response, timeout]); - callback(ret); - return ret; - } else { - return Promise.race([response, timeout]); - } - } - - createTimeoutPromise(id: number, expiresIn: number) { - return new Promise((_, reject) => { - setTimeout(() => { + const responsePromise = new Promise((resolve, reject) => { + this.callbacks[id] = (result: any, error?: Error) => { this.ids.release(id); delete this.callbacks[id]; - reject(new Error(`Command ${id} timed out after ${expiresIn}ms.`)); - }, expiresIn); - }); - } - createResponsePromise(id: number) { - return new Promise((resolve, reject) => { - this.callbacks[id] = (error: Error | null, result?: any) => { - this.ids.release(id); - delete this.callbacks[id]; if (error) { reject(error); } else { @@ -263,5 +139,42 @@ export class Connection extends EventTarget { } }; }); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => { + if (this.callbacks[id]) { + this.ids.release(id); + delete this.callbacks[id]; + reject( + new CodeError( + `Command timed out after ${expiresIn}ms.`, + "ETIMEOUT", + "TimeoutError", + ), + ); + } + }, expiresIn); + }); + + if (typeof callback === "function") { + Promise.race([responsePromise, timeoutPromise]) + .then((result) => callback(result)) + .catch((error) => callback(null, error)); + + return responsePromise; + } + + return Promise.race([responsePromise, timeoutPromise]); + } + + close(): boolean { + if (this.isDead) return false; + + try { + this.socket.close(); + return true; + } catch (e) { + return false; + } } } diff --git a/packages/keepalive-ws/src/client/index.ts b/packages/keepalive-ws/src/client/index.ts index e5a5473..4b17d56 100644 --- a/packages/keepalive-ws/src/client/index.ts +++ b/packages/keepalive-ws/src/client/index.ts @@ -1,2 +1,3 @@ -export { KeepAliveClient } from "./client"; +export { KeepAliveClient, Status } from "./client"; export { Connection } from "./connection"; +export { CodeError } from "../common/codeerror"; diff --git a/packages/keepalive-ws/src/client/queue.ts b/packages/keepalive-ws/src/client/queue.ts index 948754e..4e7f8fa 100644 --- a/packages/keepalive-ws/src/client/queue.ts +++ b/packages/keepalive-ws/src/client/queue.ts @@ -1,50 +1,48 @@ +import { Command } from "../common/message"; + export class QueueItem { - value: any; - expireTime: number; + value: Command; + private expiration: number; - constructor(value: any, expiresIn: number) { + constructor(value: Command, expiresIn: number) { this.value = value; - this.expireTime = Date.now() + expiresIn; + this.expiration = Date.now() + expiresIn; } - get expiresIn() { - return this.expireTime - Date.now(); + get expiresIn(): number { + return this.expiration - Date.now(); } - get isExpired() { - return Date.now() > this.expireTime; + get isExpired(): boolean { + return Date.now() > this.expiration; } } export class Queue { - items: any[] = []; + private items: QueueItem[] = []; - add(item: any, expiresIn: number) { + add(item: Command, expiresIn: number): void { this.items.push(new QueueItem(item, expiresIn)); } - get isEmpty() { - let i = this.items.length; - - while (i--) { - if (this.items[i].isExpired) { - this.items.splice(i, 1); - } else { - return false; - } - } - - return true; + get isEmpty(): boolean { + // Remove expired items first + this.items = this.items.filter((item) => !item.isExpired); + return this.items.length === 0; } pop(): QueueItem | null { - while (this.items.length) { - const item = this.items.shift() as QueueItem; - if (!item.isExpired) { + // Find the first non-expired item + while (this.items.length > 0) { + const item = this.items.shift(); + if (item && !item.isExpired) { return item; } } - return null; } + + clear(): void { + this.items = []; + } } diff --git a/packages/keepalive-ws/src/common/codeerror.ts b/packages/keepalive-ws/src/common/codeerror.ts new file mode 100644 index 0000000..bbaee5e --- /dev/null +++ b/packages/keepalive-ws/src/common/codeerror.ts @@ -0,0 +1,14 @@ +export class CodeError extends Error { + code: string; + name: string; + + constructor(message: string, code?: string, name?: string) { + super(message); + if (typeof code === "string") { + this.code = code; + } + if (typeof name === "string") { + this.name = name; + } + } +} diff --git a/packages/keepalive-ws/src/common/message.ts b/packages/keepalive-ws/src/common/message.ts new file mode 100644 index 0000000..fd2321c --- /dev/null +++ b/packages/keepalive-ws/src/common/message.ts @@ -0,0 +1,17 @@ +export interface Command { + id?: number; + command: string; + payload: any; +} + +export function parseCommand(data: string): Command { + try { + return JSON.parse(data) as Command; + } catch (e) { + return { command: "", payload: {} }; + } +} + +export function stringifyCommand(command: Command): string { + return JSON.stringify(command); +} diff --git a/packages/keepalive-ws/src/common/status.ts b/packages/keepalive-ws/src/common/status.ts new file mode 100644 index 0000000..4a51ab3 --- /dev/null +++ b/packages/keepalive-ws/src/common/status.ts @@ -0,0 +1,6 @@ +export enum Status { + ONLINE = 3, + CONNECTING = 2, + RECONNECTING = 1, + OFFLINE = 0, +} diff --git a/packages/keepalive-ws/src/index.ts b/packages/keepalive-ws/src/index.ts index 80a56be..7f89e44 100644 --- a/packages/keepalive-ws/src/index.ts +++ b/packages/keepalive-ws/src/index.ts @@ -1,2 +1,3 @@ -export { KeepAliveClient } from "./client"; -export { KeepAliveServer } from "./server"; +export { KeepAliveClient, Status } from "./client"; +export { KeepAliveServer, WSContext } from "./server"; +export { CodeError } from "./common/codeerror"; diff --git a/packages/keepalive-ws/src/server/command.ts b/packages/keepalive-ws/src/server/command.ts index 6381a42..e69de29 100644 --- a/packages/keepalive-ws/src/server/command.ts +++ b/packages/keepalive-ws/src/server/command.ts @@ -1,19 +0,0 @@ -export interface Command { - id?: number; - command: string; - payload: any; -} - -export const bufferToCommand = (buffer: Buffer): Command => { - const decoded = new TextDecoder("utf-8").decode(buffer); - if (!decoded) { - return { id: 0, command: "", payload: {} }; - } - - try { - const parsed = JSON.parse(decoded) as Command; - return { id: parsed.id, command: parsed.command, payload: parsed.payload }; - } catch (e) { - return { id: 0, command: "", payload: {} }; - } -}; diff --git a/packages/keepalive-ws/src/server/connection.ts b/packages/keepalive-ws/src/server/connection.ts index db0a4ef..8938d2a 100644 --- a/packages/keepalive-ws/src/server/connection.ts +++ b/packages/keepalive-ws/src/server/connection.ts @@ -1,10 +1,11 @@ -import EventEmitter from "node:events"; +import { EventEmitter } from "node:events"; import { IncomingMessage } from "node:http"; import { WebSocket } from "ws"; -import { KeepAliveServerOptions } from "."; -import { bufferToCommand, Command } from "./command"; +import { Command, parseCommand, stringifyCommand } from "../common/message"; +import { Status } from "../common/status"; import { Latency } from "./latency"; import { Ping } from "./ping"; +import { KeepAliveServerOptions } from "./"; export class Connection extends EventEmitter { id: string; @@ -14,6 +15,7 @@ export class Connection extends EventEmitter { ping: Ping; remoteAddress: string; connectionOptions: KeepAliveServerOptions; + status: Status = Status.ONLINE; constructor( socket: WebSocket, @@ -30,7 +32,11 @@ export class Connection extends EventEmitter { this.startIntervals(); } - startIntervals() { + get isDead(): boolean { + return !this.socket || this.socket.readyState !== WebSocket.OPEN; + } + + startIntervals(): void { this.latency = new Latency(); this.ping = new Ping(); @@ -50,6 +56,7 @@ export class Connection extends EventEmitter { this.ping.interval = setInterval(() => { if (!this.alive) { this.emit("close"); + return; } this.alive = false; @@ -57,32 +64,61 @@ export class Connection extends EventEmitter { }, this.connectionOptions.pingInterval); } - stopIntervals() { + stopIntervals(): void { clearInterval(this.latency.interval); clearInterval(this.ping.interval); } - applyListeners() { + applyListeners(): void { this.socket.on("close", () => { + this.status = Status.OFFLINE; this.emit("close"); }); - this.socket.on("message", (buffer: Buffer) => { - const command = bufferToCommand(buffer); + this.socket.on("error", (error) => { + this.emit("error", error); + }); - if (command.command === "latency:response") { - this.latency.onResponse(); - return; - } else if (command.command === "pong") { - this.alive = true; - return; + this.socket.on("message", (data: Buffer) => { + try { + const command = parseCommand(data.toString()); + + if (command.command === "latency:response") { + this.latency.onResponse(); + return; + } else if (command.command === "pong") { + this.alive = true; + return; + } + + this.emit("message", data); + } catch (error) { + this.emit("error", error); } - - this.emit("message", buffer); }); } - send(cmd: Command) { - this.socket.send(JSON.stringify(cmd)); + send(cmd: Command): boolean { + if (this.isDead) return false; + + try { + this.socket.send(stringifyCommand(cmd)); + return true; + } catch (error) { + this.emit("error", error); + return false; + } + } + + close(): boolean { + if (this.isDead) return false; + + try { + this.socket.close(); + return true; + } catch (error) { + this.emit("error", error); + return false; + } } } diff --git a/packages/keepalive-ws/src/server/index.ts b/packages/keepalive-ws/src/server/index.ts index eae72c0..04992fc 100644 --- a/packages/keepalive-ws/src/server/index.ts +++ b/packages/keepalive-ws/src/server/index.ts @@ -1,117 +1,26 @@ import { IncomingMessage } from "node:http"; import { ServerOptions, WebSocket, WebSocketServer } from "ws"; -import { bufferToCommand } from "./command"; +import { CodeError } from "../common/codeerror"; +import { Command, parseCommand } from "../common/message"; +import { Status } from "../common/status"; import { Connection } from "./connection"; -export declare interface KeepAliveServer extends WebSocketServer { - on( - event: "connection", - handler: (socket: WebSocket, req: IncomingMessage) => void, - ): this; - on(event: "connected", handler: (c: Connection) => void): this; - on(event: "close", handler: (c: Connection) => void): this; - on(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - on( - event: "headers", - cb: ( - this: WebSocketServer, - headers: string[], - request: IncomingMessage, - ) => void, - ): this; - on( - event: string | symbol, - listener: (this: WebSocketServer, ...args: any[]) => void, - ): this; +export { Status } from "../common/status"; +export { Connection } from "./connection"; - emit(event: "connection", socket: WebSocket, req: IncomingMessage): boolean; - emit(event: "connected", connection: Connection): boolean; - emit(event: "close", connection: Connection): boolean; - emit(event: "error", connection: Connection): boolean; - - once( - event: "connection", - cb: ( - this: WebSocketServer, - socket: WebSocket, - request: IncomingMessage, - ) => void, - ): this; - once(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - once( - event: "headers", - cb: ( - this: WebSocketServer, - headers: string[], - request: IncomingMessage, - ) => void, - ): this; - once(event: "close" | "listening", cb: (this: WebSocketServer) => void): this; - once( - event: string | symbol, - listener: (this: WebSocketServer, ...args: any[]) => void, - ): this; - - off( - event: "connection", - cb: ( - this: WebSocketServer, - socket: WebSocket, - request: IncomingMessage, - ) => void, - ): this; - off(event: "error", cb: (this: WebSocketServer, error: Error) => void): this; - off( - event: "headers", - cb: ( - this: WebSocketServer, - headers: string[], - request: IncomingMessage, - ) => void, - ): this; - off(event: "close" | "listening", cb: (this: WebSocketServer) => void): this; - off( - event: string | symbol, - listener: (this: WebSocketServer, ...args: any[]) => void, - ): this; - - addListener( - event: "connection", - cb: (client: WebSocket, request: IncomingMessage) => void, - ): this; - addListener(event: "error", cb: (err: Error) => void): this; - addListener( - event: "headers", - cb: (headers: string[], request: IncomingMessage) => void, - ): this; - addListener(event: "close" | "listening", cb: () => void): this; - addListener(event: string | symbol, listener: (...args: any[]) => void): this; - - removeListener(event: "connection", cb: (client: WebSocket) => void): this; - removeListener(event: "error", cb: (err: Error) => void): this; - removeListener( - event: "headers", - cb: (headers: string[], request: IncomingMessage) => void, - ): this; - removeListener(event: "close" | "listening", cb: () => void): this; - removeListener( - event: string | symbol, - listener: (...args: any[]) => void, - ): this; -} -export class WSContext { - wss: KeepAliveServer; +export class WSContext { + server: KeepAliveServer; connection: Connection; payload: T; - constructor(wss: KeepAliveServer, connection: Connection, payload: any) { - this.wss = wss; + constructor(server: KeepAliveServer, connection: Connection, payload: T) { + this.server = server; this.connection = connection; this.payload = payload; } } -export type SocketMiddleware = (c: WSContext) => any | Promise; +export type SocketMiddleware = (context: WSContext) => any | Promise; export type KeepAliveServerOptions = ServerOptions & { /** @@ -136,34 +45,65 @@ export class KeepAliveServer extends WebSocketServer { globalMiddlewares: SocketMiddleware[] = []; middlewares: { [key: string]: SocketMiddleware[] } = {}; rooms: { [roomName: string]: Set } = {}; - declare serverOptions: KeepAliveServerOptions; + serverOptions: ServerOptions & { + pingInterval: number; + latencyInterval: number; + }; + status: Status = Status.OFFLINE; + private _listening: boolean = false; + + /** + * Whether the server is currently listening for connections + */ + get listening(): boolean { + return this._listening; + } constructor(opts: KeepAliveServerOptions) { - super({ ...opts }); + super(opts); this.serverOptions = { ...opts, pingInterval: opts.pingInterval ?? 30_000, latencyInterval: opts.latencyInterval ?? 5_000, }; + + this.on("listening", () => { + this._listening = true; + this.status = Status.ONLINE; + }); + + this.on("close", () => { + this._listening = false; + this.status = Status.OFFLINE; + }); + this.applyListeners(); } - private cleanupConnection(c: Connection) { - c.stopIntervals(); - delete this.connections[c.id]; - if (this.remoteAddressToConnections[c.remoteAddress]) { - this.remoteAddressToConnections[c.remoteAddress] = - this.remoteAddressToConnections[c.remoteAddress].filter( - (cn) => cn.id !== c.id, + private cleanupConnection(connection: Connection): void { + connection.stopIntervals(); + delete this.connections[connection.id]; + + if (this.remoteAddressToConnections[connection.remoteAddress]) { + this.remoteAddressToConnections[connection.remoteAddress] = + this.remoteAddressToConnections[connection.remoteAddress].filter( + (conn) => conn.id !== connection.id, ); + + if ( + this.remoteAddressToConnections[connection.remoteAddress].length === 0 + ) { + delete this.remoteAddressToConnections[connection.remoteAddress]; + } } - if (!this.remoteAddressToConnections[c.remoteAddress].length) { - delete this.remoteAddressToConnections[c.remoteAddress]; - } + // Remove from all rooms + Object.keys(this.rooms).forEach((roomName) => { + this.rooms[roomName].delete(connection.id); + }); } - private applyListeners() { + private applyListeners(): void { this.on("connection", (socket: WebSocket, req: IncomingMessage) => { const connection = new Connection(socket, req, this.serverOptions); this.connections[connection.id] = connection; @@ -178,44 +118,47 @@ export class KeepAliveServer extends WebSocketServer { this.emit("connected", connection); - connection.once("close", () => { + connection.on("close", () => { this.cleanupConnection(connection); this.emit("close", connection); + }); - if (socket.readyState === WebSocket.OPEN) { - socket.close(); - } - - Object.keys(this.rooms).forEach((roomName) => { - this.rooms[roomName].delete(connection.id); - }); + connection.on("error", (error) => { + this.emit("clientError", error); }); connection.on("message", (buffer: Buffer) => { try { - const { id, command, payload } = bufferToCommand(buffer); - this.runCommand(id ?? 0, command, payload, connection); - } catch (e) { - this.emit("error", e); + const data = buffer.toString(); + const command = parseCommand(data); + + if (command.id !== undefined) { + this.runCommand( + command.id, + command.command, + command.payload, + connection, + ); + } + } catch (error) { + this.emit("error", error); } }); }); } - broadcast(command: string, payload: any, connections?: Connection[]) { - const cmd = JSON.stringify({ command, payload }); + broadcast(command: string, payload: any, connections?: Connection[]): void { + const cmd: Command = { command, payload }; if (connections) { - connections.forEach((c) => { - c.socket.send(cmd); + connections.forEach((connection) => { + connection.send(cmd); + }); + } else { + Object.values(this.connections).forEach((connection) => { + connection.send(cmd); }); - - return; } - - Object.values(this.connections).forEach((c) => { - c.socket.send(cmd); - }); } /** @@ -226,14 +169,21 @@ export class KeepAliveServer extends WebSocketServer { * - Push notifications. * - Auth changes, e.g., logging out in one tab should log you out in all tabs. */ - broadcastRemoteAddress(c: Connection, command: string, payload: any) { - const cmd = JSON.stringify({ command, payload }); - this.remoteAddressToConnections[c.remoteAddress].forEach((cn) => { - cn.socket.send(cmd); + broadcastRemoteAddress( + connection: Connection, + command: string, + payload: any, + ): void { + const cmd: Command = { command, payload }; + const connections = + this.remoteAddressToConnections[connection.remoteAddress] || []; + + connections.forEach((conn) => { + conn.send(cmd); }); } - broadcastRemoteAddressById(id: string, command: string, payload: any) { + broadcastRemoteAddressById(id: string, command: string, payload: any): void { const connection = this.connections[id]; if (connection) { this.broadcastRemoteAddress(connection, command, payload); @@ -244,8 +194,8 @@ export class KeepAliveServer extends WebSocketServer { * Given a roomName, a command and a payload, broadcasts to all Connections * that are in the room. */ - broadcastRoom(roomName: string, command: string, payload: any) { - const cmd = JSON.stringify({ command, payload }); + broadcastRoom(roomName: string, command: string, payload: any): void { + const cmd: Command = { command, payload }; const room = this.rooms[roomName]; if (!room) return; @@ -253,7 +203,7 @@ export class KeepAliveServer extends WebSocketServer { room.forEach((connectionId) => { const connection = this.connections[connectionId]; if (connection) { - connection.socket.send(cmd); + connection.send(cmd); } }); } @@ -267,8 +217,8 @@ export class KeepAliveServer extends WebSocketServer { command: string, payload: any, connection: Connection | Connection[], - ) { - const cmd = JSON.stringify({ command, payload }); + ): void { + const cmd: Command = { command, payload }; const room = this.rooms[roomName]; if (!room) return; @@ -281,7 +231,7 @@ export class KeepAliveServer extends WebSocketServer { if (!excludeIds.includes(connectionId)) { const conn = this.connections[connectionId]; if (conn) { - conn.socket.send(cmd); + conn.send(cmd); } } }); @@ -291,111 +241,157 @@ export class KeepAliveServer extends WebSocketServer { * Given a connection, broadcasts a message to all connections except * the provided connection. */ - broadcastExclude(connection: Connection, command: string, payload: any) { - const cmd = JSON.stringify({ command, payload }); - Object.values(this.connections).forEach((c) => { - if (c.id !== connection.id) { - c.socket.send(cmd); + broadcastExclude( + connection: Connection, + command: string, + payload: any, + ): void { + const cmd: Command = { command, payload }; + + Object.values(this.connections).forEach((conn) => { + if (conn.id !== connection.id) { + conn.send(cmd); } }); } /** - * @example - * ```typescript - * server.registerCommand("join:room", async (payload: { roomName: string }, connection: Connection) => { - * server.addToRoom(payload.roomName, connection); - * server.broadcastRoom(payload.roomName, "joined", { roomName: payload.roomName }); - * }); - * ``` + * Add a connection to a room */ - addToRoom(roomName: string, connection: Connection) { + addToRoom(roomName: string, connection: Connection): void { this.rooms[roomName] = this.rooms[roomName] ?? new Set(); this.rooms[roomName].add(connection.id); } - removeFromRoom(roomName: string, connection: Connection) { + /** + * Remove a connection from a room + */ + removeFromRoom(roomName: string, connection: Connection): void { if (!this.rooms[roomName]) return; this.rooms[roomName].delete(connection.id); } - removeFromAllRooms(connection: Connection | string) { - const connectionId = typeof connection === "string" ? connection : connection.id; + /** + * Remove a connection from all rooms + */ + removeFromAllRooms(connection: Connection | string): void { + const connectionId = + typeof connection === "string" ? connection : connection.id; + Object.keys(this.rooms).forEach((roomName) => { this.rooms[roomName].delete(connectionId); }); } /** - * Returns a "room", which is simply a Set of Connection ids. - * @param roomName + * Returns all connections in a room */ getRoom(roomName: string): Connection[] { const ids = this.rooms[roomName] || new Set(); - return Array.from(ids).map((id) => this.connections[id]); + return Array.from(ids) + .map((id) => this.connections[id]) + .filter(Boolean); } - clearRoom(roomName: string) { + /** + * Clear all connections from a room + */ + clearRoom(roomName: string): void { this.rooms[roomName] = new Set(); } - registerCommand( + /** + * Register a command handler + */ + async registerCommand( command: string, callback: (context: WSContext) => Promise | T, middlewares: SocketMiddleware[] = [], - ) { + ): Promise { this.commands[command] = callback; - this.prependMiddlewareToCommand(command, middlewares); + + if (middlewares.length > 0) { + this.prependMiddlewareToCommand(command, middlewares); + } + + return Promise.resolve(); } - prependMiddlewareToCommand(command: string, middlewares: SocketMiddleware[]) { + /** + * Add middleware to be executed before a command + */ + prependMiddlewareToCommand( + command: string, + middlewares: SocketMiddleware[], + ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; this.middlewares[command] = middlewares.concat(this.middlewares[command]); } } - appendMiddlewareToCommand(command: string, middlewares: SocketMiddleware[]) { + /** + * Add middleware to be executed after other middleware but before the command + */ + appendMiddlewareToCommand( + command: string, + middlewares: SocketMiddleware[], + ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; this.middlewares[command] = this.middlewares[command].concat(middlewares); } } + /** + * Execute a command with the given id, name, payload and connection + */ private async runCommand( id: number, command: string, payload: any, connection: Connection, - ) { - const c = new WSContext(this, connection, payload); + ): Promise { + const context = new WSContext(this, connection, payload); try { if (!this.commands[command]) { - // An onslaught of commands that don't exist is a sign of a bad - // or otherwise misconfigured client. - throw new Error(`Command [${command}] not found.`); + throw new CodeError( + `Command [${command}] not found.`, + "ENOTFOUND", + "CommandError", + ); } + // Run global middlewares if (this.globalMiddlewares.length) { - for (const mw of this.globalMiddlewares) { - await mw(c); + for (const middleware of this.globalMiddlewares) { + await middleware(context); } } + // Run command-specific middlewares if (this.middlewares[command]) { - for (const mw of this.middlewares[command]) { - await mw(c); + for (const middleware of this.middlewares[command]) { + await middleware(context); } } - const result = await this.commands[command](c); + // Execute the command + const result = await this.commands[command](context); connection.send({ id, command, payload: result }); - } catch (e) { - const payload = { error: e.message ?? e ?? "Unknown error" }; - connection.send({ id, command, payload }); + } catch (error) { + // Handle and serialize errors + const errorPayload = + error instanceof Error + ? { + error: error.message, + code: (error as CodeError).code || "ESERVER", + name: error.name || "Error", + } + : { error: String(error) }; + + connection.send({ id, command, payload: errorPayload }); } } } - -export { Connection }; diff --git a/packages/keepalive-ws/tests/advanced.test.ts b/packages/keepalive-ws/tests/advanced.test.ts new file mode 100644 index 0000000..6d563fa --- /dev/null +++ b/packages/keepalive-ws/tests/advanced.test.ts @@ -0,0 +1,161 @@ +import { describe, test, expect, beforeEach, afterEach } from "vitest"; +import { KeepAliveClient, Status } from "../src/client/client"; +import { KeepAliveServer } from "../src/server/index"; + +// Helper to create a WebSocket server for testing +const createTestServer = (port: number) => { + return new KeepAliveServer({ + port, + pingInterval: 1000, // Faster for testing + latencyInterval: 500, // Faster for testing + }); +}; + +describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { + const port = 8125; + let server: KeepAliveServer; + let client: KeepAliveClient; + + beforeEach(async () => { + server = createTestServer(port); + + // Wait for the server to start + await new Promise((resolve) => { + server.on("listening", () => { + resolve(); + }); + + // In case the server is already listening + if (server.listening) { + resolve(); + } + }); + + client = new KeepAliveClient(`ws://localhost:${port}`); + }); + + afterEach(async () => { + // Close connections in order + if (client.status === Status.ONLINE) { + await client.close(); + } + + // Close the server + return new Promise((resolve) => { + if (server) { + server.close(() => { + resolve(); + }); + } else { + resolve(); + } + }); + }); + + test("command times out when server doesn't respond", async () => { + await server.registerCommand("never-responds", async () => { + return new Promise(() => {}); + }); + + await client.connect(); + + // Expect it to fail after a short timeout + await expect( + client.command("never-responds", "Should timeout", 500), + ).rejects.toThrow(/timed out/); + }, 2000); + + test("server errors are properly serialized to client", async () => { + await server.registerCommand("throws-error", async () => { + throw new Error("Custom server error"); + }); + + await client.connect(); + + // Expect to receive this error + const result = await client.command("throws-error", "Will error", 1000); + expect(result).toHaveProperty("error", "Custom server error"); + }, 2000); + + test("multiple concurrent commands are handled correctly", async () => { + // Register commands with different delays + await server.registerCommand("fast", async (context) => { + await new Promise((r) => setTimeout(r, 50)); + return `Fast: ${context.payload}`; + }); + + await server.registerCommand("slow", async (context) => { + await new Promise((r) => setTimeout(r, 150)); + return `Slow: ${context.payload}`; + }); + + await server.registerCommand("echo", async (context) => { + return `Echo: ${context.payload}`; + }); + + await client.connect(); + + // Send multiple commands concurrently + const results = await Promise.all([ + client.command("fast", "First", 1000), + client.command("slow", "Second", 1000), + client.command("echo", "Third", 1000), + ]); + + // Verify all commands completed successfully + expect(results).toEqual(["Fast: First", "Slow: Second", "Echo: Third"]); + }, 3000); + + test("handles large payloads correctly", async () => { + await server.registerCommand("echo", async (context) => { + return context.payload; + }); + + await client.connect(); + + const largeData = { + array: Array(1000) + .fill(0) + .map((_, i) => `item-${i}`), + nested: { + deep: { + object: { + with: "lots of data", + }, + }, + }, + }; + + const result = await client.command("echo", largeData, 5000); + + // Verify the response contains the expected data + expect(result).toEqual(largeData); + }, 10000); + + test("server handles multiple client connections", async () => { + await server.registerCommand("echo", async (context) => { + return `Echo: ${context.payload}`; + }); + + // Create multiple clients + const clients = Array(5) + .fill(0) + .map(() => new KeepAliveClient(`ws://localhost:${port}`)); + + // Connect all clients + await Promise.all(clients.map((client) => client.connect())); + + // Send a command from each client + const results = await Promise.all( + clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)), + ); + + // Verify all commands succeeded + results.forEach((result, i) => { + expect(result).toBe(`Echo: Client ${i}`); + }); + + // Clean up + await Promise.all(clients.map((client) => client.close())); + }, 5000); +}); diff --git a/packages/keepalive-ws/tests/basic.test.ts b/packages/keepalive-ws/tests/basic.test.ts new file mode 100644 index 0000000..b5810f5 --- /dev/null +++ b/packages/keepalive-ws/tests/basic.test.ts @@ -0,0 +1,97 @@ +import { describe, test, expect, beforeEach, afterEach } from "vitest"; +import { KeepAliveClient, Status } from "../src/client/client"; +import { KeepAliveServer } from "../src/server/index"; +import { WebSocket, WebSocketServer } from "ws"; + +// Helper to create a WebSocket server for testing +const createTestServer = (port: number) => { + return new KeepAliveServer({ + port, + pingInterval: 1000, // Faster for testing + latencyInterval: 500, // Faster for testing + }); +}; + +describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { + const port = 8124; + let server: KeepAliveServer; + let client: KeepAliveClient; + + beforeEach(async () => { + server = createTestServer(port); + + // Wait for the server to start + await new Promise((resolve) => { + server.on("listening", () => { + resolve(); + }); + + // In case the server is already listening + if (server.listening) { + resolve(); + } + }); + + client = new KeepAliveClient(`ws://localhost:${port}`); + }); + + afterEach(async () => { + // Close connections in order + if (client.status === Status.ONLINE) { + await client.close(); + } + + // Close the server + return new Promise((resolve) => { + if (server) { + server.close(() => { + resolve(); + }); + } else { + resolve(); + } + }); + }); + + test("client-server connection should be online", async () => { + await server.registerCommand("echo", async (context) => { + return context.payload; + }); + + await client.connect(); + expect(client.status).toBe(Status.ONLINE); + }, 10000); + + test("simple echo command", async () => { + await server.registerCommand("echo", async (context) => { + return `Echo: ${context.payload}`; + }); + + await client.connect(); + + const result = await client.command("echo", "Hello", 5000); + expect(result).toBe("Echo: Hello"); + }, 10000); + + test("connect should resolve when already connected", async () => { + await server.registerCommand("echo", async (context) => { + return context.payload; + }); + + await client.connect(); + expect(client.status).toBe(Status.ONLINE); + + // Second connect should resolve immediately + await client.connect(); + expect(client.status).toBe(Status.ONLINE); + }, 10000); + + test("close should resolve when already closed", async () => { + await client.close(); + expect(client.status).toBe(Status.OFFLINE); + + // Second close should resolve immediately + await client.close(); + expect(client.status).toBe(Status.OFFLINE); + }, 10000); +});