From 9fbd947ad1d44282b025b5e0fddd260cee05125a Mon Sep 17 00:00:00 2001 From: nvms Date: Sun, 20 Apr 2025 17:05:32 -0400 Subject: [PATCH] refactor for maintainability and modularity --- packages/mesh/src/server/index.ts | 1293 +---------------- .../mesh/src/server/managers/broadcast.ts | 212 +++ packages/mesh/src/server/managers/channel.ts | 200 +++ packages/mesh/src/server/managers/command.ts | 144 ++ .../connection.ts} | 4 +- .../presence.ts} | 4 +- packages/mesh/src/server/managers/pubsub.ts | 223 +++ .../server/managers/record-subscription.ts | 270 ++++ .../{record-manager.ts => managers/record.ts} | 0 packages/mesh/src/server/managers/redis.ts | 120 ++ .../{room-manager.ts => managers/room.ts} | 2 +- packages/mesh/src/server/mesh-context.ts | 21 + packages/mesh/src/server/mesh-server.ts | 770 ++++++++++ packages/mesh/src/server/types.ts | 55 + packages/mesh/src/server/utils/constants.ts | 2 + packages/mesh/src/tests/basic.test.ts | 7 - 16 files changed, 2034 insertions(+), 1293 deletions(-) create mode 100644 packages/mesh/src/server/managers/broadcast.ts create mode 100644 packages/mesh/src/server/managers/channel.ts create mode 100644 packages/mesh/src/server/managers/command.ts rename packages/mesh/src/server/{connection-manager.ts => managers/connection.ts} (98%) rename packages/mesh/src/server/{presence-manager.ts => managers/presence.ts} (97%) create mode 100644 packages/mesh/src/server/managers/pubsub.ts create mode 100644 packages/mesh/src/server/managers/record-subscription.ts rename packages/mesh/src/server/{record-manager.ts => managers/record.ts} (100%) create mode 100644 packages/mesh/src/server/managers/redis.ts rename packages/mesh/src/server/{room-manager.ts => managers/room.ts} (99%) create mode 100644 packages/mesh/src/server/mesh-context.ts create mode 100644 packages/mesh/src/server/mesh-server.ts create mode 100644 packages/mesh/src/server/types.ts create mode 100644 packages/mesh/src/server/utils/constants.ts diff --git a/packages/mesh/src/server/index.ts b/packages/mesh/src/server/index.ts index 3504147..a6a544b 100644 --- a/packages/mesh/src/server/index.ts +++ b/packages/mesh/src/server/index.ts @@ -1,1281 +1,12 @@ -import { IncomingMessage } from "node:http"; -import { v4 as uuidv4 } from "uuid"; -import { Redis, type RedisOptions } from "ioredis"; -import { WebSocket, WebSocketServer, type ServerOptions } from "ws"; -import { RoomManager } from "./room-manager"; -import { RecordManager } from "./record-manager"; -import { ConnectionManager } from "./connection-manager"; -import { PresenceManager } from "./presence-manager"; -import { CodeError, Status } from "../client"; -import { Connection } from "./connection"; -import { parseCommand, type Command } from "../common/message"; -import type { Operation } from "fast-json-patch"; - -const PUB_SUB_CHANNEL_PREFIX = "mesh:pubsub:"; -const RECORD_PUB_SUB_CHANNEL = "mesh:record-updates"; - -export class MeshContext { - server: MeshServer; - command: string; - connection: Connection; - payload: T; - - constructor( - server: MeshServer, - command: string, - connection: Connection, - payload: T - ) { - this.server = server; - this.command = command; - this.connection = connection; - this.payload = payload; - } -} - -export type SocketMiddleware = ( - context: MeshContext -) => any | Promise; - -type PubSubMessagePayload = { - targetConnectionIds: string[]; - command: Command; -}; - -type RecordUpdatePubSubPayload = { - recordId: string; - newValue?: any; - patch?: Operation[]; - version: number; -}; - -export type MeshServerOptions = ServerOptions & { - /** - * The interval at which to send ping messages to the client. - * - * @default 30000 - */ - pingInterval?: number; - - /** - * The interval at which to send both latency requests and updates to the client. - * - * @default 5000 - */ - latencyInterval?: number; - redisOptions: RedisOptions; - - /** - * The maximum number of consecutive ping intervals the server will wait - * for a pong response before considering the client disconnected. - * A value of 1 means the client must respond within roughly 2 * pingInterval - * before being disconnected. Setting it to 0 is not recommended as it will - * immediately disconnect the client if it doesn't respond to the first ping in - * exactly `pingInterval` milliseconds, which doesn't provide wiggle room for - * network latency. - * - * @see pingInterval - * @default 1 - */ - maxMissedPongs?: number; -}; - -type ChannelPattern = string | RegExp; - -export class MeshServer extends WebSocketServer { - readonly instanceId: string; - redis: Redis; - pubClient: Redis; - subClient: Redis; - roomManager: RoomManager; - recordManager: RecordManager; - connectionManager: ConnectionManager; - presenceManager: PresenceManager; - serverOptions: MeshServerOptions; - status: Status = Status.OFFLINE; - private exposedChannels: ChannelPattern[] = []; - private exposedRecords: ChannelPattern[] = []; - private exposedWritableRecords: ChannelPattern[] = []; // New: Track writable records - private channelSubscriptions: { [channel: string]: Set } = {}; - private recordSubscriptions: Map< - string, // recordId - Map // connectionId -> mode - > = new Map(); - private channelGuards: Map< - ChannelPattern, - (connection: Connection, channel: string) => Promise | boolean - > = new Map(); - private recordGuards: Map< - ChannelPattern, - (connection: Connection, recordId: string) => Promise | boolean - > = new Map(); - private writableRecordGuards: Map< - // New: Guards for writable records - ChannelPattern, - (connection: Connection, recordId: string) => Promise | boolean - > = new Map(); - private _isShuttingDown = false; - - commands: { - [command: string]: (context: MeshContext) => Promise | any; - } = {}; - private globalMiddlewares: SocketMiddleware[] = []; - middlewares: { [key: string]: SocketMiddleware[] } = {}; - - private _listening = false; - private _subscriptionPromise!: Promise; - - get listening(): boolean { - return this._listening; - } - - set listening(value: boolean) { - this._listening = value; - this.status = value ? Status.ONLINE : Status.OFFLINE; - } - - constructor(opts: MeshServerOptions) { - super(opts); - - this.instanceId = uuidv4(); - - this.redis = new Redis({ - retryStrategy: (times: number) => { - if (this._isShuttingDown) { - return null; - } - - if (times > 10) { - return null; - } - - return Math.min(1000 * Math.pow(2, times), 30000); - }, - ...opts.redisOptions, - }); - this.redis.on("error", (err) => console.error("Redis error:", err)); - - this.serverOptions = { - ...opts, - pingInterval: opts.pingInterval ?? 30_000, - latencyInterval: opts.latencyInterval ?? 5_000, - maxMissedPongs: opts.maxMissedPongs ?? 1, - }; - - this.pubClient = this.redis.duplicate(); - this.subClient = this.redis.duplicate(); - - this.roomManager = new RoomManager(this.redis); - this.recordManager = new RecordManager(this.redis); - this.connectionManager = new ConnectionManager( - this.pubClient, - this.instanceId, - this.roomManager - ); - this.presenceManager = new PresenceManager(this.redis, this.roomManager); - - this.subscribeToInstanceChannel(); - - this.on("listening", () => { - this.listening = true; - }); - - this.on("error", (err) => { - console.error(`[MeshServer] Error: ${err}`); - }); - - this.on("close", () => { - this.listening = false; - }); - - this.registerBuiltinCommands(); - this.registerRecordCommands(); // Add this line - this.applyListeners(); - } - - /** - * Waits until the service is ready by ensuring it is listening and the instance channel subscription is established. - * - * @returns {Promise} A promise that resolves when the service is fully ready. - * @throws {Error} If the readiness process fails or if any awaited promise rejects. - */ - async ready(): Promise { - const listeningPromise = this.listening - ? Promise.resolve() - : new Promise((resolve) => this.once("listening", resolve)); - - await Promise.all([listeningPromise, this._subscriptionPromise]); - } - - private subscribeToInstanceChannel(): Promise { - const channel = `${PUB_SUB_CHANNEL_PREFIX}${this.instanceId}`; - - this._subscriptionPromise = new Promise((resolve, reject) => { - this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL); - this.subClient.psubscribe("mesh:presence:updates:*", (err) => { - if (err) { - if (!this._isShuttingDown) { - console.error( - `Failed to subscribe to channels ${channel}, ${RECORD_PUB_SUB_CHANNEL}:`, - err - ); - } - reject(err); - return; - } - resolve(); - }); - }); - - this.subClient.on("message", async (channel, message) => { - if (channel.startsWith(PUB_SUB_CHANNEL_PREFIX)) { - this.handleInstancePubSubMessage(channel, message); - } else if (channel === RECORD_PUB_SUB_CHANNEL) { - this.handleRecordUpdatePubSubMessage(message); - } else if (this.channelSubscriptions[channel]) { - for (const connection of this.channelSubscriptions[channel]) { - if (!connection.isDead) { - connection.send({ - command: "mesh/subscription-message", - payload: { channel, message }, - }); - } - } - } - }); - - this.subClient.on("pmessage", async (pattern, channel, message) => { - if (pattern === "mesh:presence:updates:*") { - // channel here is the actual channel, e.g., mesh:presence:updates:roomName - const subscribers = this.channelSubscriptions[channel]; - if (subscribers) { - try { - const payload = JSON.parse(message); - subscribers.forEach((connection) => { - if (!connection.isDead) { - connection.send({ - command: "mesh/presence-update", - payload: payload, - }); - } else { - // clean up dead connections from subscription list - subscribers.delete(connection); - } - }); - } catch (e) { - this.emit( - "error", - new Error(`Failed to parse presence update: ${message}`) - ); - } - } - } - }); - - return this._subscriptionPromise; - } - - private handleInstancePubSubMessage(channel: string, message: string) { - try { - const parsedMessage = JSON.parse(message) as PubSubMessagePayload; - - if ( - !parsedMessage || - !Array.isArray(parsedMessage.targetConnectionIds) || - !parsedMessage.command || - typeof parsedMessage.command.command !== "string" - ) { - throw new Error("Invalid message format"); - } - - const { targetConnectionIds, command } = parsedMessage; - - targetConnectionIds.forEach((connectionId) => { - const connection = - this.connectionManager.getLocalConnection(connectionId); - - if (connection && !connection.isDead) { - connection.send(command); - } - }); - } catch (err) { - this.emit("error", new Error(`Failed to parse message: ${message}`)); - } - } - - private handleRecordUpdatePubSubMessage(message: string) { - try { - const parsedMessage = JSON.parse(message) as RecordUpdatePubSubPayload; - const { recordId, newValue, patch, version } = parsedMessage; - - if (!recordId || typeof version !== "number") { - throw new Error("Invalid record update message format"); - } - - const subscribers = this.recordSubscriptions.get(recordId); - - if (!subscribers) { - return; - } - - subscribers.forEach((mode, connectionId) => { - const connection = - this.connectionManager.getLocalConnection(connectionId); - if (connection && !connection.isDead) { - if (mode === "patch" && patch) { - connection.send({ - command: "mesh/record-update", - payload: { recordId, patch, version }, - }); - } else if (mode === "full" && newValue !== undefined) { - connection.send({ - command: "mesh/record-update", - payload: { recordId, full: newValue, version }, - }); - } - } else if (!connection) { - subscribers.delete(connectionId); - if (subscribers.size === 0) { - this.recordSubscriptions.delete(recordId); - } - } - }); - } catch (err) { - this.emit( - "error", - new Error(`Failed to parse record update message: ${message}`) - ); - } - } - - private applyListeners() { - this.on("connection", async (socket: WebSocket, req: IncomingMessage) => { - const connection = new Connection(socket, req, this.serverOptions); - - connection.on("message", (buffer: Buffer) => { - try { - const data = buffer.toString(); - const command = parseCommand(data); - - if ( - command.id !== undefined && - !["latency:response", "pong"].includes(command.command) - ) { - this.runCommand( - command.id, - command.command, - command.payload, - connection - ); - } - } catch (err) { - this.emit("error", err); - } - }); - - try { - await this.connectionManager.registerConnection(connection); - } catch (error) { - connection.close(); - return; - } - - this.emit("connected", connection); - - connection.on("close", async () => { - await this.cleanupConnection(connection); - this.emit("disconnected", connection); - }); - - connection.on("error", (err) => { - this.emit("clientError", err, connection); - }); - - connection.on("pong", async (connectionId) => { - try { - const rooms = await this.roomManager.getRoomsForConnection( - connectionId - ); - for (const roomName of rooms) { - if (await this.presenceManager.isRoomTracked(roomName)) { - await this.presenceManager.refreshPresence( - connectionId, - roomName - ); - } - } - } catch (err) { - this.emit("error", new Error(`Failed to refresh presence: ${err}`)); - } - }); - }); - } - - /** - * Exposes a channel for external access and optionally associates a guard function - * to control access to that channel. The guard function determines whether a given - * connection is permitted to access the channel. - * - * @param {ChannelPattern} channel - The channel or pattern to expose. - * @param {(connection: Connection, channel: string) => Promise | boolean} [guard] - - * Optional guard function that receives the connection and channel name, returning - * a boolean or a promise that resolves to a boolean indicating whether access is allowed. - * @returns {void} - */ - exposeChannel( - channel: ChannelPattern, - guard?: ( - connection: Connection, - channel: string - ) => Promise | boolean - ): void { - this.exposedChannels.push(channel); - if (guard) { - this.channelGuards.set(channel, guard); - } - } - - private async isChannelExposed( - channel: string, - connection: Connection - ): Promise { - const matchedPattern = this.exposedChannels.find((pattern) => - typeof pattern === "string" ? pattern === channel : pattern.test(channel) - ); - - if (!matchedPattern) { - return false; - } - - const guard = this.channelGuards.get(matchedPattern); - if (guard) { - try { - return await Promise.resolve(guard(connection, channel)); - } catch (e) { - return false; - } - } - - return true; - } - - /** - * Exposes a record or pattern for client subscriptions, optionally adding a guard function. - * - * @param {ChannelPattern} recordPattern - The record ID or pattern to expose. - * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. - */ - exposeRecord( - recordPattern: ChannelPattern, - guard?: ( - connection: Connection, - recordId: string - ) => Promise | boolean - ): void { - this.exposedRecords.push(recordPattern); - if (guard) { - this.recordGuards.set(recordPattern, guard); - } - } - - private async isRecordExposed( - recordId: string, - connection: Connection - ): Promise { - const readPattern = this.exposedRecords.find((pattern) => - typeof pattern === "string" - ? pattern === recordId - : pattern.test(recordId) - ); - - let canRead = false; - if (readPattern) { - const guard = this.recordGuards.get(readPattern); - if (guard) { - try { - canRead = await Promise.resolve(guard(connection, recordId)); - } catch (e) { - canRead = false; - } - } else { - canRead = true; - } - } - - if (canRead) { - return true; - } - - // if exposed as writable, it is implicitly readable - const writePattern = this.exposedWritableRecords.find((pattern) => - typeof pattern === "string" - ? pattern === recordId - : pattern.test(recordId) - ); - - // If exposed as writable, it's readable. No need to check the *write* guard here. - if (writePattern) { - return true; - } - - return false; - } - - /** - * Exposes a record or pattern for client writes, optionally adding a guard function. - * - * @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable. - * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. - */ - exposeWritableRecord( - recordPattern: ChannelPattern, - guard?: ( - connection: Connection, - recordId: string - ) => Promise | boolean - ): void { - this.exposedWritableRecords.push(recordPattern); - if (guard) { - this.writableRecordGuards.set(recordPattern, guard); - } - } - - private async isRecordWritable( - recordId: string, - connection: Connection - ): Promise { - const matchedPattern = this.exposedWritableRecords.find((pattern) => - typeof pattern === "string" - ? pattern === recordId - : pattern.test(recordId) - ); - - if (!matchedPattern) { - return false; - } - - const guard = this.writableRecordGuards.get(matchedPattern); - if (guard) { - try { - return await Promise.resolve(guard(connection, recordId)); - } catch (e) { - return false; - } - } - - return true; - } - - /** - * Publishes a message to a specified channel and optionally maintains a history of messages. - * - * @param {string} channel - The name of the channel to which the message will be published. - * @param {any} message - The message to be published. Will not be stringified automatically for you. You need to do that yourself. - * @param {number} [history=0] - The number of historical messages to retain for the channel. Defaults to 0, meaning no history is retained. - * If greater than 0, the message will be added to the channel's history and the history will be trimmed to the specified size. - * @returns {Promise} A Promise that resolves once the message has been published and, if applicable, the history has been updated. - * @throws {Error} This function may throw an error if the underlying `pubClient` operations (e.g., `lpush`, `ltrim`, `publish`) fail. - */ - async publishToChannel( - channel: string, - message: any, - history: number = 0 - ): Promise { - const parsedHistory = parseInt(history as any, 10); - if (!isNaN(parsedHistory) && parsedHistory > 0) { - await this.pubClient.lpush(`history:${channel}`, message); - await this.pubClient.ltrim(`history:${channel}`, 0, parsedHistory); - } - await this.pubClient.publish(channel, message); - } - - /** - * Updates a record, persists it to Redis, increments its version, computes a patch, - * and publishes the update via Redis pub/sub. - * - * @param {string} recordId - The ID of the record to update. - * @param {any} newValue - The new value for the record. - * @returns {Promise} - * @throws {Error} If the update fails. - */ - async publishRecordUpdate(recordId: string, newValue: any): Promise { - const updateResult = await this.recordManager.publishUpdate( - recordId, - newValue - ); - - if (!updateResult) { - return; - } - - const { patch, version } = updateResult; - - const messagePayload: RecordUpdatePubSubPayload = { - recordId, - newValue, - patch, - version, - }; - - try { - await this.pubClient.publish( - RECORD_PUB_SUB_CHANNEL, - JSON.stringify(messagePayload) - ); - } catch (err) { - this.emit( - "error", - new Error(`Failed to publish record update for "${recordId}": ${err}`) - ); - } - } - - /** - * Adds one or more middleware functions to the global middleware stack. - * - * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added. Each middleware - * is expected to conform to the `SocketMiddleware` type. - * @returns {void} - * @throws {Error} If the provided middlewares are not valid or fail validation (if applicable). - */ - addMiddleware(...middlewares: SocketMiddleware[]): void { - this.globalMiddlewares.push(...middlewares); - } - - /** - * Registers a command with an associated callback and optional middleware. - * - * @template T The type for `MeshContext.payload`. Defaults to `any`. - * @template U The command's return value type. Defaults to `any`. - * @param {string} command - The unique identifier for the command to register. - * @param {(context: MeshContext) => Promise | U} callback - The function to execute when the command is invoked. It receives a `MeshContext` of type `T` and may return a value of type `U` or a `Promise` resolving to `U`. - * @param {SocketMiddleware[]} [middlewares=[]] - An optional array of middleware functions to apply to the command. Defaults to an empty array. - * @throws {Error} May throw an error if the command registration or middleware addition fails. - */ - registerCommand( - command: string, - callback: (context: MeshContext) => Promise | U, - middlewares: SocketMiddleware[] = [] - ) { - this.commands[command] = callback; - - if (middlewares.length > 0) { - this.addMiddlewareToCommand(command, middlewares); - } - } - - trackPresence( - roomPattern: string | RegExp, - guardOrOptions?: - | (( - connection: Connection, - roomName: string - ) => Promise | boolean) - | { - ttl?: number; - guard?: ( - connection: Connection, - roomName: string - ) => Promise | boolean; - } - ): void { - this.presenceManager.trackRoom(roomPattern, guardOrOptions); - } - - private registerBuiltinCommands() { - this.registerCommand< - { channel: string; historyLimit?: number }, - { success: boolean; history?: string[] } - >("mesh/subscribe-channel", async (ctx) => { - const { channel, historyLimit } = ctx.payload; - - if (!(await this.isChannelExposed(channel, ctx.connection))) { - return { success: false, history: [] }; - } - - try { - if (!this.channelSubscriptions[channel]) { - this.channelSubscriptions[channel] = new Set(); - await new Promise((resolve, reject) => { - this.subClient.subscribe(channel, (err) => { - if (err) reject(err); - else resolve(); - }); - }); - } - this.channelSubscriptions[channel].add(ctx.connection); - - let history: string[] = []; - if (historyLimit && historyLimit > 0) { - const historyKey = `history:${channel}`; - history = await this.redis.lrange(historyKey, 0, historyLimit - 1); - } - - return { - success: true, - history, - }; - } catch (e) { - return { success: false, history: [] }; - } - }); - - this.registerCommand<{ channel: string }, boolean>( - "mesh/unsubscribe-channel", - async (ctx) => { - const { channel } = ctx.payload; - if (this.channelSubscriptions[channel]) { - this.channelSubscriptions[channel].delete(ctx.connection); - if (this.channelSubscriptions[channel].size === 0) { - await new Promise((resolve, reject) => { - this.subClient.unsubscribe(channel, (err) => { - if (err) reject(err); - else resolve(); - }); - }); - delete this.channelSubscriptions[channel]; - } - return true; - } - return false; - } - ); - - this.registerCommand< - { roomName: string }, - { success: boolean; present: string[] } - >("mesh/join-room", async (ctx) => { - const { roomName } = ctx.payload; - await this.addToRoom(roomName, ctx.connection); - const present = await this.presenceManager.getPresentConnections( - roomName - ); - return { success: true, present }; - }); - - this.registerCommand<{ roomName: string }, { success: boolean }>( - "mesh/leave-room", - async (ctx) => { - const { roomName } = ctx.payload; - await this.removeFromRoom(roomName, ctx.connection); - return { success: true }; - } - ); - } - - private registerRecordCommands() { - this.registerCommand< - { recordId: string; mode?: "patch" | "full" }, - { success: boolean; record?: any; version?: number } - >("mesh/subscribe-record", async (ctx) => { - const { recordId, mode = "full" } = ctx.payload; - const connectionId = ctx.connection.id; - - if (!(await this.isRecordExposed(recordId, ctx.connection))) { - return { success: false }; - } - - try { - const { record, version } = - await this.recordManager.getRecordAndVersion(recordId); - - if (!this.recordSubscriptions.has(recordId)) { - this.recordSubscriptions.set(recordId, new Map()); - } - this.recordSubscriptions.get(recordId)!.set(connectionId, mode); - - return { success: true, record, version }; - } catch (e) { - console.error(`Failed to subscribe to record ${recordId}:`, e); - return { success: false }; - } - }); - - this.registerCommand<{ recordId: string }, boolean>( - "mesh/unsubscribe-record", - async (ctx) => { - const { recordId } = ctx.payload; - const connectionId = ctx.connection.id; - const recordSubs = this.recordSubscriptions.get(recordId); - - if (recordSubs?.has(connectionId)) { - recordSubs.delete(connectionId); - if (recordSubs.size === 0) { - this.recordSubscriptions.delete(recordId); - } - return true; - } - return false; - } - ); - - // New command for client-initiated record updates - this.registerCommand< - { recordId: string; newValue: any }, - { success: boolean } - >("mesh/publish-record-update", async (ctx) => { - const { recordId, newValue } = ctx.payload; - - if (!(await this.isRecordWritable(recordId, ctx.connection))) { - throw new CodeError( - `Record "${recordId}" is not writable by this connection.`, - "EACCESS", - "PermissionError" - ); - } - - try { - await this.publishRecordUpdate(recordId, newValue); - return { success: true }; - } catch (e: any) { - throw new CodeError( - `Failed to publish update for record "${recordId}": ${e.message}`, - "EUPDATE", - "UpdateError" - ); - } - }); - - this.registerCommand< - { roomName: string }, - { success: boolean; present: string[] } - >("mesh/subscribe-presence", async (ctx) => { - const { roomName } = ctx.payload; - const connectionId = ctx.connection.id; - - if ( - !(await this.presenceManager.isRoomTracked(roomName, ctx.connection)) - ) { - return { success: false, present: [] }; - } - - try { - const presenceChannel = `mesh:presence:updates:${roomName}`; - - if (!this.channelSubscriptions[presenceChannel]) { - this.channelSubscriptions[presenceChannel] = new Set(); - } - - this.channelSubscriptions[presenceChannel].add(ctx.connection); - - const present = await this.presenceManager.getPresentConnections( - roomName - ); - - return { success: true, present }; - } catch (e) { - console.error( - `Failed to subscribe to presence for room ${roomName}:`, - e - ); - return { success: false, present: [] }; - } - }); - - this.registerCommand<{ roomName: string }, boolean>( - "mesh/unsubscribe-presence", - async (ctx) => { - const { roomName } = ctx.payload; - const presenceChannel = `mesh:presence:updates:${roomName}`; - - if (this.channelSubscriptions[presenceChannel]) { - this.channelSubscriptions[presenceChannel].delete(ctx.connection); - - if (this.channelSubscriptions[presenceChannel].size === 0) { - delete this.channelSubscriptions[presenceChannel]; - } - - return true; - } - - return false; - } - ); - } - - /** - * Adds an array of middleware functions to a specific command. - * - * @param {string} command - The name of the command to associate the middleware with. - * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added to the command. - * @returns {void} - */ - addMiddlewareToCommand( - command: string, - middlewares: SocketMiddleware[] - ): void { - if (middlewares.length) { - this.middlewares[command] = this.middlewares[command] || []; - this.middlewares[command] = middlewares.concat(this.middlewares[command]); - } - } - - private async cleanupRecordSubscriptions(connection: Connection) { - const connectionId = connection.id; - this.recordSubscriptions.forEach((subscribers, recordId) => { - if (subscribers.has(connectionId)) { - subscribers.delete(connectionId); - if (subscribers.size === 0) { - this.recordSubscriptions.delete(recordId); - } - } - }); - } - - private async runCommand( - id: number, - commandName: string, - payload: any, - connection: Connection - ) { - const context = new MeshContext(this, commandName, connection, payload); - - try { - if (!this.commands[commandName]) { - throw new CodeError( - `Command "${commandName}" not found`, - "ENOTFOUND", - "CommandError" - ); - } - - if (this.globalMiddlewares.length) { - for (const middleware of this.globalMiddlewares) { - await middleware(context); - } - } - - if (this.middlewares[commandName]) { - for (const middleware of this.middlewares[commandName]) { - await middleware(context); - } - } - - const result = await this.commands[commandName](context); - connection.send({ id, command: commandName, payload: result }); - } catch (err) { - const errorPayload = - err instanceof Error - ? { - error: err.message, - code: (err as CodeError).code || "ESERVER", - name: err.name || "Error", - } - : { error: String(err), code: "EUNKNOWN", name: "UnknownError" }; - - connection.send({ id, command: commandName, payload: errorPayload }); - } - } - - /** - * Broadcasts a command and payload to a set of connections or all available connections. - * - * @param {string} command - The command to be broadcasted. - * @param {any} payload - The data associated with the command. - * @param {Connection[]=} connections - (Optional) A specific list of connections to broadcast to. If not provided, the command will be sent to all connections. - * - * @throws {Error} Emits an "error" event if broadcasting fails. - */ - async broadcast(command: string, payload: any, connections?: Connection[]) { - const cmd: Command = { command, payload }; - - try { - if (connections) { - const allConnectionIds = connections.map(({ id }) => id); - const connectionIds = - await this.connectionManager.getAllConnectionIds(); - const filteredIds = allConnectionIds.filter((id) => - connectionIds.includes(id) - ); - await this.publishOrSend(filteredIds, cmd); - } else { - const allConnectionIds = - await this.connectionManager.getAllConnectionIds(); - await this.publishOrSend(allConnectionIds, cmd); - } - } catch (err) { - this.emit( - "error", - new Error(`Failed to broadcast command "${command}": ${err}`) - ); - } - } - - /** - * Broadcasts a command and associated payload to all active connections within the specified room. - * - * @param {string} roomName - The name of the room whose connections will receive the broadcast. - * @param {string} command - The command to be broadcasted to the connections. - * @param {unknown} payload - The data payload associated with the command. - * @returns {Promise} A promise that resolves when the broadcast operation is complete. - * @throws {Error} If the broadcast operation fails, an error is thrown and the promise is rejected. - */ - async broadcastRoom( - roomName: string, - command: string, - payload: any - ): Promise { - const connectionIds = await this.roomManager.getRoomConnectionIds(roomName); - - try { - await this.publishOrSend(connectionIds, { command, payload }); - } catch (err) { - this.emit( - "error", - new Error(`Failed to broadcast command "${command}": ${err}`) - ); - } - } - - /** - * Broadcasts a command and payload to all active connections except for the specified one(s). - * Excludes the provided connection(s) from receiving the broadcast. - * - * @param {string} command - The command to broadcast to connections. - * @param {any} payload - The payload to send along with the command. - * @param {Connection | Connection[]} exclude - A single connection or an array of connections to exclude from the broadcast. - * @returns {Promise} A promise that resolves when the broadcast is complete. - * @emits {Error} Emits an "error" event if broadcasting the command fails. - */ - async broadcastExclude( - command: string, - payload: any, - exclude: Connection | Connection[] - ): Promise { - const excludedIds = new Set( - (Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id) - ); - - try { - const connectionIds = ( - await this.connectionManager.getAllConnectionIds() - ).filter((id) => !excludedIds.has(id)); - await this.publishOrSend(connectionIds, { command, payload }); - } catch (err) { - this.emit( - "error", - new Error(`Failed to broadcast command "${command}": ${err}`) - ); - } - } - - /** - * Broadcasts a command with a payload to all connections in a specified room, - * excluding one or more given connections. If the broadcast fails, emits an error event. - * - * @param {string} roomName - The name of the room to broadcast to. - * @param {string} command - The command to broadcast. - * @param {any} payload - The payload to send with the command. - * @param {Connection | Connection[]} exclude - A connection or array of connections to exclude from the broadcast. - * @returns {Promise} A promise that resolves when the broadcast is complete. - * @emits {Error} Emits an error event if broadcasting fails. - */ - async broadcastRoomExclude( - roomName: string, - command: string, - payload: any, - exclude: Connection | Connection[] - ): Promise { - const excludedIds = new Set( - (Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id) - ); - - try { - const connectionIds = ( - await this.roomManager.getRoomConnectionIds(roomName) - ).filter((id) => !excludedIds.has(id)); - await this.publishOrSend(connectionIds, { command, payload }); - } catch (err) { - this.emit( - "error", - new Error(`Failed to broadcast command "${command}": ${err}`) - ); - } - } - - private async publishOrSend(connectionIds: string[], command: Command) { - if (connectionIds.length === 0) { - return; - } - - // get instance mapping for the target connection IDs - const connectionInstanceMapping = - await this.connectionManager.getInstanceIdsForConnections(connectionIds); - const instanceMap: { [instanceId: string]: string[] } = {}; - - // group connection IDs by instance ID - for (const connectionId of connectionIds) { - const instanceId = connectionInstanceMapping[connectionId]; - - if (instanceId) { - if (!instanceMap[instanceId]) { - instanceMap[instanceId] = []; - } - - instanceMap[instanceId].push(connectionId); - } - } - - // publish command to each instance - for (const [instanceId, targetConnectionIds] of Object.entries( - instanceMap - )) { - if (targetConnectionIds.length === 0) continue; - - if (instanceId === this.instanceId) { - // send locally - targetConnectionIds.forEach((connectionId) => { - const connection = - this.connectionManager.getLocalConnection(connectionId); - if (connection && !connection.isDead) { - connection.send(command); - } - }); - } else { - // publish to remote instance via pubsub - const messagePayload: PubSubMessagePayload = { - targetConnectionIds, - command, - }; - const message = JSON.stringify(messagePayload); - - try { - await this.pubClient.publish( - this.getPubSubChannel(instanceId), - message - ); - } catch (err) { - this.emit( - "error", - new Error(`Failed to publish command "${command.command}": ${err}`) - ); - } - } - } - } - - async isInRoom(roomName: string, connection: Connection | string) { - const connectionId = - typeof connection === "string" ? connection : connection.id; - return this.roomManager.connectionIsInRoom(roomName, connectionId); - } - - async addToRoom(roomName: string, connection: Connection | string) { - const connectionId = - typeof connection === "string" ? connection : connection.id; - await this.roomManager.addToRoom(roomName, connection); - - if (await this.presenceManager.isRoomTracked(roomName)) { - await this.presenceManager.markOnline(connectionId, roomName); - } - } - - async removeFromRoom(roomName: string, connection: Connection | string) { - const connectionId = - typeof connection === "string" ? connection : connection.id; - - if (await this.presenceManager.isRoomTracked(roomName)) { - await this.presenceManager.markOffline(connectionId, roomName); - } - - return this.roomManager.removeFromRoom(roomName, connection); - } - - async removeFromAllRooms(connection: Connection | string) { - return this.roomManager.removeFromAllRooms(connection); - } - - async clearRoom(roomName: string) { - return this.roomManager.clearRoom(roomName); - } - - async getRoomMembers(roomName: string): Promise { - return this.roomManager.getRoomConnectionIds(roomName); - } - - private getPubSubChannel(instanceId: string): string { - return `${PUB_SUB_CHANNEL_PREFIX}${instanceId}`; - } - - private async cleanupConnection(connection: Connection) { - connection.stopIntervals(); - - try { - await this.presenceManager.cleanupConnection(connection); - await this.connectionManager.cleanupConnection(connection); - await this.roomManager.cleanupConnection(connection); - await this.cleanupRecordSubscriptions(connection); - } catch (err) { - this.emit("error", new Error(`Failed to clean up connection: ${err}`)); - } - } - - /** - * Gracefully closes all active connections, cleans up resources, - * and shuts down the service. Optionally accepts a callback function - * that will be invoked once shutdown is complete or if an error occurs. - * - * @param {((err?: Error) => void)=} callback - Optional callback to be invoked when closing is complete or if an error occurs. - * @returns {Promise} A promise that resolves when shutdown is complete. - * @throws {Error} If an error occurs during shutdown, the promise will be rejected with the error. - */ - async close(callback?: (err?: Error) => void): Promise { - this._isShuttingDown = true; - - const connections = Object.values( - this.connectionManager.getLocalConnections() - ); - await Promise.all( - connections.map(async (connection) => { - if (!connection.isDead) { - await connection.close(); - } - await this.cleanupConnection(connection); - }) - ); - - await new Promise((resolve, reject) => { - super.close((err?: Error) => { - if (err) reject(err); - else resolve(); - }); - }); - - this.pubClient.disconnect(); - this.subClient.disconnect(); - this.redis.disconnect(); - - this.listening = false; - this.removeAllListeners(); - - if (callback) { - callback(); - } - } - - /** - * Registers a callback function to be executed when a new connection is established. - * - * @param {(connection: Connection) => Promise | void} callback - The function to execute when a new connection is established. - * @returns {MeshServer} The server instance for method chaining. - */ - onConnection( - callback: (connection: Connection) => Promise | void - ): MeshServer { - this.on("connected", callback); - return this; - } - - /** - * Registers a callback function to be executed when a connection is closed. - * - * @param {(connection: Connection) => Promise | void} callback - The function to execute when a connection is closed. - * @returns {MeshServer} The server instance for method chaining. - */ - onDisconnection( - callback: (connection: Connection) => Promise | void - ): MeshServer { - this.on("disconnected", callback); - return this; - } -} +export { MeshServer } from "./mesh-server"; +export { MeshContext } from "./mesh-context"; +export type { + SocketMiddleware, + MeshServerOptions, + ChannelPattern, +} from "./types"; +export { RoomManager } from "./managers/room"; +export { RecordManager } from "./managers/record"; +export { ConnectionManager } from "./managers/connection"; +export { PresenceManager } from "./managers/presence"; +export { Connection } from "./connection"; diff --git a/packages/mesh/src/server/managers/broadcast.ts b/packages/mesh/src/server/managers/broadcast.ts new file mode 100644 index 0000000..c7b9489 --- /dev/null +++ b/packages/mesh/src/server/managers/broadcast.ts @@ -0,0 +1,212 @@ +import type { Connection } from "../connection"; +import type { Command } from "../../common/message"; +import type { ConnectionManager } from "./connection"; +import type { RoomManager } from "./room"; +import type Redis from "ioredis"; + +export class BroadcastManager { + private connectionManager: ConnectionManager; + private roomManager: RoomManager; + private instanceId: string; + private pubClient: Redis; + private getPubSubChannel: (instanceId: string) => string; + private emitError: (error: Error) => void; + + constructor( + connectionManager: ConnectionManager, + roomManager: RoomManager, + instanceId: string, + pubClient: any, + getPubSubChannel: (instanceId: string) => string, + emitError: (error: Error) => void + ) { + this.connectionManager = connectionManager; + this.roomManager = roomManager; + this.instanceId = instanceId; + this.pubClient = pubClient; + this.getPubSubChannel = getPubSubChannel; + this.emitError = emitError; + } + + /** + * Broadcasts a command and payload to a set of connections or all available connections. + * + * @param {string} command - The command to be broadcasted. + * @param {any} payload - The data associated with the command. + * @param {Connection[]=} connections - (Optional) A specific list of connections to broadcast to. If not provided, the command will be sent to all connections. + * + * @throws {Error} Emits an "error" event if broadcasting fails. + */ + async broadcast(command: string, payload: any, connections?: Connection[]) { + const cmd: Command = { command, payload }; + + try { + if (connections) { + const allConnectionIds = connections.map(({ id }) => id); + const connectionIds = + await this.connectionManager.getAllConnectionIds(); + const filteredIds = allConnectionIds.filter((id) => + connectionIds.includes(id) + ); + await this.publishOrSend(filteredIds, cmd); + } else { + const allConnectionIds = + await this.connectionManager.getAllConnectionIds(); + await this.publishOrSend(allConnectionIds, cmd); + } + } catch (err) { + this.emitError( + new Error(`Failed to broadcast command "${command}": ${err}`) + ); + } + } + + /** + * Broadcasts a command and associated payload to all active connections within the specified room. + * + * @param {string} roomName - The name of the room whose connections will receive the broadcast. + * @param {string} command - The command to be broadcasted to the connections. + * @param {unknown} payload - The data payload associated with the command. + * @returns {Promise} A promise that resolves when the broadcast operation is complete. + * @throws {Error} If the broadcast operation fails, an error is thrown and the promise is rejected. + */ + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + const connectionIds = await this.roomManager.getRoomConnectionIds(roomName); + + try { + await this.publishOrSend(connectionIds, { command, payload }); + } catch (err) { + this.emitError( + new Error(`Failed to broadcast command "${command}": ${err}`) + ); + } + } + + /** + * Broadcasts a command and payload to all active connections except for the specified one(s). + * Excludes the provided connection(s) from receiving the broadcast. + * + * @param {string} command - The command to broadcast to connections. + * @param {any} payload - The payload to send along with the command. + * @param {Connection | Connection[]} exclude - A single connection or an array of connections to exclude from the broadcast. + * @returns {Promise} A promise that resolves when the broadcast is complete. + * @emits {Error} Emits an "error" event if broadcasting the command fails. + */ + async broadcastExclude( + command: string, + payload: any, + exclude: Connection | Connection[] + ): Promise { + const excludedIds = new Set( + (Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id) + ); + + try { + const connectionIds = ( + await this.connectionManager.getAllConnectionIds() + ).filter((id: string) => !excludedIds.has(id)); + await this.publishOrSend(connectionIds, { command, payload }); + } catch (err) { + this.emitError( + new Error(`Failed to broadcast command "${command}": ${err}`) + ); + } + } + + /** + * Broadcasts a command with a payload to all connections in a specified room, + * excluding one or more given connections. If the broadcast fails, emits an error event. + * + * @param {string} roomName - The name of the room to broadcast to. + * @param {string} command - The command to broadcast. + * @param {any} payload - The payload to send with the command. + * @param {Connection | Connection[]} exclude - A connection or array of connections to exclude from the broadcast. + * @returns {Promise} A promise that resolves when the broadcast is complete. + * @emits {Error} Emits an error event if broadcasting fails. + */ + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + exclude: Connection | Connection[] + ): Promise { + const excludedIds = new Set( + (Array.isArray(exclude) ? exclude : [exclude]).map(({ id }) => id) + ); + + try { + const connectionIds = ( + await this.roomManager.getRoomConnectionIds(roomName) + ).filter((id: string) => !excludedIds.has(id)); + await this.publishOrSend(connectionIds, { command, payload }); + } catch (err) { + this.emitError( + new Error(`Failed to broadcast command "${command}": ${err}`) + ); + } + } + + private async publishOrSend(connectionIds: string[], command: Command) { + if (connectionIds.length === 0) { + return; + } + + // get instance mapping for the target connection IDs + const connectionInstanceMapping = + await this.connectionManager.getInstanceIdsForConnections(connectionIds); + const instanceMap: { [instanceId: string]: string[] } = {}; + + // group connection IDs by instance ID + for (const connectionId of connectionIds) { + const instanceId = connectionInstanceMapping[connectionId]; + + if (instanceId) { + if (!instanceMap[instanceId]) { + instanceMap[instanceId] = []; + } + + instanceMap[instanceId].push(connectionId); + } + } + + // publish command to each instance + for (const [instanceId, targetConnectionIds] of Object.entries( + instanceMap + )) { + if (targetConnectionIds.length === 0) continue; + + if (instanceId === this.instanceId) { + // send locally + targetConnectionIds.forEach((connectionId) => { + const connection = + this.connectionManager.getLocalConnection(connectionId); + if (connection && !connection.isDead) { + connection.send(command); + } + }); + } else { + // publish to remote instance via pubsub + const messagePayload = { + targetConnectionIds, + command, + }; + const message = JSON.stringify(messagePayload); + + try { + await this.pubClient.publish( + this.getPubSubChannel(instanceId), + message + ); + } catch (err) { + this.emitError( + new Error(`Failed to publish command "${command.command}": ${err}`) + ); + } + } + } + } +} diff --git a/packages/mesh/src/server/managers/channel.ts b/packages/mesh/src/server/managers/channel.ts new file mode 100644 index 0000000..5022ac3 --- /dev/null +++ b/packages/mesh/src/server/managers/channel.ts @@ -0,0 +1,200 @@ +import type { Redis } from "ioredis"; +import type { Connection } from "../connection"; +import type { ChannelPattern } from "../types"; + +export class ChannelManager { + private redis: Redis; + private pubClient: Redis; + private subClient: Redis; + private exposedChannels: ChannelPattern[] = []; + private channelGuards: Map< + ChannelPattern, + (connection: Connection, channel: string) => Promise | boolean + > = new Map(); + private channelSubscriptions: { [channel: string]: Set } = {}; + private emitError: (error: Error) => void; + + constructor( + redis: Redis, + pubClient: Redis, + subClient: Redis, + emitError: (error: Error) => void + ) { + this.redis = redis; + this.pubClient = pubClient; + this.subClient = subClient; + this.emitError = emitError; + } + + /** + * Exposes a channel for external access and optionally associates a guard function + * to control access to that channel. The guard function determines whether a given + * connection is permitted to access the channel. + * + * @param {ChannelPattern} channel - The channel or pattern to expose. + * @param {(connection: Connection, channel: string) => Promise | boolean} [guard] - + * Optional guard function that receives the connection and channel name, returning + * a boolean or a promise that resolves to a boolean indicating whether access is allowed. + * @returns {void} + */ + exposeChannel( + channel: ChannelPattern, + guard?: ( + connection: Connection, + channel: string + ) => Promise | boolean + ): void { + this.exposedChannels.push(channel); + if (guard) { + this.channelGuards.set(channel, guard); + } + } + + /** + * Checks if a channel is exposed and if the connection has access to it. + * + * @param channel - The channel to check + * @param connection - The connection requesting access + * @returns A promise that resolves to true if the channel is exposed and the connection has access + */ + async isChannelExposed( + channel: string, + connection: Connection + ): Promise { + const matchedPattern = this.exposedChannels.find((pattern) => + typeof pattern === "string" ? pattern === channel : pattern.test(channel) + ); + + if (!matchedPattern) { + return false; + } + + const guard = this.channelGuards.get(matchedPattern); + if (guard) { + try { + return await Promise.resolve(guard(connection, channel)); + } catch (e) { + return false; + } + } + + return true; + } + + /** + * Publishes a message to a specified channel and optionally maintains a history of messages. + * + * @param {string} channel - The name of the channel to which the message will be published. + * @param {any} message - The message to be published. Will not be stringified automatically for you. You need to do that yourself. + * @param {number} [history=0] - The number of historical messages to retain for the channel. Defaults to 0, meaning no history is retained. + * If greater than 0, the message will be added to the channel's history and the history will be trimmed to the specified size. + * @returns {Promise} A Promise that resolves once the message has been published and, if applicable, the history has been updated. + * @throws {Error} This function may throw an error if the underlying `pubClient` operations (e.g., `lpush`, `ltrim`, `publish`) fail. + */ + async publishToChannel( + channel: string, + message: any, + history: number = 0 + ): Promise { + const parsedHistory = parseInt(history as any, 10); + if (!isNaN(parsedHistory) && parsedHistory > 0) { + await this.pubClient.lpush(`history:${channel}`, message); + await this.pubClient.ltrim(`history:${channel}`, 0, parsedHistory); + } + await this.pubClient.publish(channel, message); + } + + /** + * Subscribes a connection to a channel + * + * @param channel - The channel to subscribe to + * @param connection - The connection to subscribe + */ + addSubscription(channel: string, connection: Connection): void { + if (!this.channelSubscriptions[channel]) { + this.channelSubscriptions[channel] = new Set(); + } + this.channelSubscriptions[channel].add(connection); + } + + /** + * Unsubscribes a connection from a channel + * + * @param channel - The channel to unsubscribe from + * @param connection - The connection to unsubscribe + * @returns true if the connection was subscribed and is now unsubscribed, false otherwise + */ + removeSubscription(channel: string, connection: Connection): boolean { + if (this.channelSubscriptions[channel]) { + this.channelSubscriptions[channel].delete(connection); + if (this.channelSubscriptions[channel].size === 0) { + delete this.channelSubscriptions[channel]; + } + return true; + } + return false; + } + + /** + * Gets all subscribers for a channel + * + * @param channel - The channel to get subscribers for + * @returns A set of connections subscribed to the channel, or undefined if none + */ + getSubscribers(channel: string): Set | undefined { + return this.channelSubscriptions[channel]; + } + + /** + * Subscribes to a Redis channel + * + * @param channel - The channel to subscribe to + * @returns A promise that resolves when the subscription is complete + */ + async subscribeToRedisChannel(channel: string): Promise { + return new Promise((resolve, reject) => { + this.subClient.subscribe(channel, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + } + + /** + * Unsubscribes from a Redis channel + * + * @param channel - The channel to unsubscribe from + * @returns A promise that resolves when the unsubscription is complete + */ + async unsubscribeFromRedisChannel(channel: string): Promise { + return new Promise((resolve, reject) => { + this.subClient.unsubscribe(channel, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + } + + /** + * Gets channel history from Redis + * + * @param channel - The channel to get history for + * @param limit - The maximum number of history items to retrieve + * @returns A promise that resolves to an array of history items + */ + async getChannelHistory(channel: string, limit: number): Promise { + const historyKey = `history:${channel}`; + return this.redis.lrange(historyKey, 0, limit - 1); + } + + /** + * Cleans up all subscriptions for a connection + * + * @param connection - The connection to clean up + */ + cleanupConnection(connection: Connection): void { + for (const channel in this.channelSubscriptions) { + this.removeSubscription(channel, connection); + } + } +} diff --git a/packages/mesh/src/server/managers/command.ts b/packages/mesh/src/server/managers/command.ts new file mode 100644 index 0000000..5a715ae --- /dev/null +++ b/packages/mesh/src/server/managers/command.ts @@ -0,0 +1,144 @@ +import { CodeError } from "../../client"; +import { MeshContext } from "../mesh-context"; +import type { Connection } from "../connection"; +import type { SocketMiddleware } from "../types"; + +export class CommandManager { + private commands: { + [command: string]: (context: MeshContext) => Promise | any; + } = {}; + private globalMiddlewares: SocketMiddleware[] = []; + private middlewares: { [key: string]: SocketMiddleware[] } = {}; + private emitError: (error: Error) => void; + + constructor(emitError: (error: Error) => void) { + this.emitError = emitError; + } + + /** + * Registers a command with an associated callback and optional middleware. + * + * @template T The type for `MeshContext.payload`. Defaults to `any`. + * @template U The command's return value type. Defaults to `any`. + * @param {string} command - The unique identifier for the command to register. + * @param {(context: MeshContext) => Promise | U} callback - The function to execute when the command is invoked. It receives a `MeshContext` of type `T` and may return a value of type `U` or a `Promise` resolving to `U`. + * @param {SocketMiddleware[]} [middlewares=[]] - An optional array of middleware functions to apply to the command. Defaults to an empty array. + * @throws {Error} May throw an error if the command registration or middleware addition fails. + */ + registerCommand( + command: string, + callback: (context: MeshContext) => Promise | U, + middlewares: SocketMiddleware[] = [] + ) { + this.commands[command] = callback; + + if (middlewares.length > 0) { + this.addMiddlewareToCommand(command, middlewares); + } + } + + /** + * Adds one or more middleware functions to the global middleware stack. + * + * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added. Each middleware + * is expected to conform to the `SocketMiddleware` type. + * @returns {void} + * @throws {Error} If the provided middlewares are not valid or fail validation (if applicable). + */ + addMiddleware(...middlewares: SocketMiddleware[]): void { + this.globalMiddlewares.push(...middlewares); + } + + /** + * Adds an array of middleware functions to a specific command. + * + * @param {string} command - The name of the command to associate the middleware with. + * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added to the command. + * @returns {void} + */ + addMiddlewareToCommand( + command: string, + middlewares: SocketMiddleware[] + ): void { + if (middlewares.length) { + this.middlewares[command] = this.middlewares[command] || []; + this.middlewares[command] = middlewares.concat(this.middlewares[command]); + } + } + + /** + * Runs a command with the given parameters + * + * @param id - The command ID + * @param commandName - The name of the command to run + * @param payload - The payload for the command + * @param connection - The connection that initiated the command + * @param server - The server instance + */ + async runCommand( + id: number, + commandName: string, + payload: any, + connection: Connection, + server: any + ) { + const context = new MeshContext(server, commandName, connection, payload); + + try { + if (!this.commands[commandName]) { + throw new CodeError( + `Command "${commandName}" not found`, + "ENOTFOUND", + "CommandError" + ); + } + + if (this.globalMiddlewares.length) { + for (const middleware of this.globalMiddlewares) { + await middleware(context); + } + } + + if (this.middlewares[commandName]) { + for (const middleware of this.middlewares[commandName]) { + await middleware(context); + } + } + + const result = await this.commands[commandName](context); + connection.send({ id, command: commandName, payload: result }); + } catch (err) { + const errorPayload = + err instanceof Error + ? { + error: err.message, + code: (err as CodeError).code || "ESERVER", + name: err.name || "Error", + } + : { error: String(err), code: "EUNKNOWN", name: "UnknownError" }; + + connection.send({ id, command: commandName, payload: errorPayload }); + } + } + + /** + * Gets all registered commands + * + * @returns An object mapping command names to their handler functions + */ + getCommands(): { + [command: string]: (context: MeshContext) => Promise | any; + } { + return this.commands; + } + + /** + * Checks if a command is registered + * + * @param commandName - The name of the command to check + * @returns true if the command is registered, false otherwise + */ + hasCommand(commandName: string): boolean { + return !!this.commands[commandName]; + } +} diff --git a/packages/mesh/src/server/connection-manager.ts b/packages/mesh/src/server/managers/connection.ts similarity index 98% rename from packages/mesh/src/server/connection-manager.ts rename to packages/mesh/src/server/managers/connection.ts index 3c758f2..c83f05f 100644 --- a/packages/mesh/src/server/connection-manager.ts +++ b/packages/mesh/src/server/managers/connection.ts @@ -1,6 +1,6 @@ import type Redis from "ioredis"; -import type { Connection } from "./connection"; -import type { RoomManager } from "./room-manager"; +import type { Connection } from "../connection"; +import type { RoomManager } from "./room"; const CONNECTIONS_HASH_KEY = "mesh:connections"; const INSTANCE_CONNECTIONS_KEY_PREFIX = "mesh:connections:"; diff --git a/packages/mesh/src/server/presence-manager.ts b/packages/mesh/src/server/managers/presence.ts similarity index 97% rename from packages/mesh/src/server/presence-manager.ts rename to packages/mesh/src/server/managers/presence.ts index b607a47..d72f6e2 100644 --- a/packages/mesh/src/server/presence-manager.ts +++ b/packages/mesh/src/server/managers/presence.ts @@ -1,6 +1,6 @@ import type { Redis } from "ioredis"; -import type { Connection } from "./connection"; -import type { RoomManager } from "./room-manager"; +import type { Connection } from "../connection"; +import type { RoomManager } from "./room"; type ChannelPattern = string | RegExp; diff --git a/packages/mesh/src/server/managers/pubsub.ts b/packages/mesh/src/server/managers/pubsub.ts new file mode 100644 index 0000000..172fc03 --- /dev/null +++ b/packages/mesh/src/server/managers/pubsub.ts @@ -0,0 +1,223 @@ +import type { Redis } from "ioredis"; +import type { Connection } from "../connection"; +import type { ConnectionManager } from "./connection"; +import type { PubSubMessagePayload, RecordUpdatePubSubPayload } from "../types"; +import { + PUB_SUB_CHANNEL_PREFIX, + RECORD_PUB_SUB_CHANNEL, +} from "../utils/constants"; + +export class PubSubManager { + private subClient: Redis; + private instanceId: string; + private connectionManager: ConnectionManager; + private recordSubscriptions: Map< + string, // recordId + Map // connectionId -> mode + >; + private getChannelSubscriptions: ( + channel: string + ) => Set | undefined; + private emitError: (error: Error) => void; + private _subscriptionPromise!: Promise; + + constructor( + subClient: Redis, + instanceId: string, + connectionManager: ConnectionManager, + recordSubscriptions: Map>, + getChannelSubscriptions: (channel: string) => Set | undefined, + emitError: (error: Error) => void + ) { + this.subClient = subClient; + this.instanceId = instanceId; + this.connectionManager = connectionManager; + this.recordSubscriptions = recordSubscriptions; + this.getChannelSubscriptions = getChannelSubscriptions; + this.emitError = emitError; + } + + /** + * Subscribes to the instance channel and sets up message handlers + * + * @returns A promise that resolves when the subscription is complete + */ + subscribeToInstanceChannel(): Promise { + const channel = `${PUB_SUB_CHANNEL_PREFIX}${this.instanceId}`; + + this._subscriptionPromise = new Promise((resolve, reject) => { + this.subClient.subscribe(channel, RECORD_PUB_SUB_CHANNEL); + this.subClient.psubscribe("mesh:presence:updates:*", (err) => { + if (err) { + this.emitError( + new Error( + `Failed to subscribe to channels ${channel}, ${RECORD_PUB_SUB_CHANNEL}:`, + { cause: err } + ) + ); + reject(err); + return; + } + resolve(); + }); + }); + + this.setupMessageHandlers(); + + return this._subscriptionPromise; + } + + /** + * Sets up message handlers for the subscribed channels + */ + private setupMessageHandlers(): void { + this.subClient.on("message", async (channel, message) => { + if (channel.startsWith(PUB_SUB_CHANNEL_PREFIX)) { + this.handleInstancePubSubMessage(channel, message); + } else if (channel === RECORD_PUB_SUB_CHANNEL) { + this.handleRecordUpdatePubSubMessage(message); + } else { + const subscribers = this.getChannelSubscriptions(channel); + if (subscribers) { + for (const connection of subscribers) { + if (!connection.isDead) { + connection.send({ + command: "mesh/subscription-message", + payload: { channel, message }, + }); + } + } + } + } + }); + + this.subClient.on("pmessage", async (pattern, channel, message) => { + if (pattern === "mesh:presence:updates:*") { + // channel here is the actual channel, e.g., mesh:presence:updates:roomName + const subscribers = this.getChannelSubscriptions(channel); + if (subscribers) { + try { + const payload = JSON.parse(message); + subscribers.forEach((connection: Connection) => { + if (!connection.isDead) { + connection.send({ + command: "mesh/presence-update", + payload: payload, + }); + } else { + // clean up dead connections from subscription list + subscribers.delete(connection); + } + }); + } catch (e) { + this.emitError( + new Error(`Failed to parse presence update: ${message}`) + ); + } + } + } + }); + } + + /** + * Handles messages from the instance PubSub channel + * + * @param channel - The channel the message was received on + * @param message - The message content + */ + private handleInstancePubSubMessage(channel: string, message: string) { + try { + const parsedMessage = JSON.parse(message) as PubSubMessagePayload; + + if ( + !parsedMessage || + !Array.isArray(parsedMessage.targetConnectionIds) || + !parsedMessage.command || + typeof parsedMessage.command.command !== "string" + ) { + throw new Error("Invalid message format"); + } + + const { targetConnectionIds, command } = parsedMessage; + + targetConnectionIds.forEach((connectionId) => { + const connection = + this.connectionManager.getLocalConnection(connectionId); + + if (connection && !connection.isDead) { + connection.send(command); + } + }); + } catch (err) { + this.emitError(new Error(`Failed to parse message: ${message}`)); + } + } + + /** + * Handles record update messages from the record PubSub channel + * + * @param message - The message content + */ + private handleRecordUpdatePubSubMessage(message: string) { + try { + const parsedMessage = JSON.parse(message) as RecordUpdatePubSubPayload; + const { recordId, newValue, patch, version } = parsedMessage; + + if (!recordId || typeof version !== "number") { + throw new Error("Invalid record update message format"); + } + + const subscribers = this.recordSubscriptions.get(recordId); + + if (!subscribers) { + return; + } + + subscribers.forEach((mode, connectionId) => { + const connection = + this.connectionManager.getLocalConnection(connectionId); + if (connection && !connection.isDead) { + if (mode === "patch" && patch) { + connection.send({ + command: "mesh/record-update", + payload: { recordId, patch, version }, + }); + } else if (mode === "full" && newValue !== undefined) { + connection.send({ + command: "mesh/record-update", + payload: { recordId, full: newValue, version }, + }); + } + } else if (!connection) { + subscribers.delete(connectionId); + if (subscribers.size === 0) { + this.recordSubscriptions.delete(recordId); + } + } + }); + } catch (err) { + this.emitError( + new Error(`Failed to parse record update message: ${message}`) + ); + } + } + + /** + * Gets the subscription promise + * + * @returns The subscription promise + */ + getSubscriptionPromise(): Promise { + return this._subscriptionPromise; + } + + /** + * Gets the PubSub channel for an instance + * + * @param instanceId - The instance ID + * @returns The PubSub channel name + */ + getPubSubChannel(instanceId: string): string { + return `${PUB_SUB_CHANNEL_PREFIX}${instanceId}`; + } +} diff --git a/packages/mesh/src/server/managers/record-subscription.ts b/packages/mesh/src/server/managers/record-subscription.ts new file mode 100644 index 0000000..b956f94 --- /dev/null +++ b/packages/mesh/src/server/managers/record-subscription.ts @@ -0,0 +1,270 @@ +import type { Redis } from "ioredis"; +import type { Connection } from "../connection"; +import type { ChannelPattern } from "../types"; +import type { RecordManager } from "./record"; +import type { Operation } from "fast-json-patch"; +import { RECORD_PUB_SUB_CHANNEL } from "../utils/constants"; + +export class RecordSubscriptionManager { + private pubClient: Redis; + private recordManager: RecordManager; + private exposedRecords: ChannelPattern[] = []; + private exposedWritableRecords: ChannelPattern[] = []; + private recordGuards: Map< + ChannelPattern, + (connection: Connection, recordId: string) => Promise | boolean + > = new Map(); + private writableRecordGuards: Map< + ChannelPattern, + (connection: Connection, recordId: string) => Promise | boolean + > = new Map(); + private recordSubscriptions: Map< + string, // recordId + Map // connectionId -> mode + > = new Map(); + private emitError: (error: Error) => void; + + constructor( + pubClient: Redis, + recordManager: RecordManager, + emitError: (error: Error) => void + ) { + this.pubClient = pubClient; + this.recordManager = recordManager; + this.emitError = emitError; + } + + /** + * Exposes a record or pattern for client subscriptions, optionally adding a guard function. + * + * @param {ChannelPattern} recordPattern - The record ID or pattern to expose. + * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. + */ + exposeRecord( + recordPattern: ChannelPattern, + guard?: ( + connection: Connection, + recordId: string + ) => Promise | boolean + ): void { + this.exposedRecords.push(recordPattern); + if (guard) { + this.recordGuards.set(recordPattern, guard); + } + } + + /** + * Exposes a record or pattern for client writes, optionally adding a guard function. + * + * @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable. + * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. + */ + exposeWritableRecord( + recordPattern: ChannelPattern, + guard?: ( + connection: Connection, + recordId: string + ) => Promise | boolean + ): void { + this.exposedWritableRecords.push(recordPattern); + if (guard) { + this.writableRecordGuards.set(recordPattern, guard); + } + } + + /** + * Checks if a record is exposed for reading + * + * @param recordId - The record ID to check + * @param connection - The connection requesting access + * @returns A promise that resolves to true if the record is exposed and the connection has access + */ + async isRecordExposed( + recordId: string, + connection: Connection + ): Promise { + const readPattern = this.exposedRecords.find((pattern) => + typeof pattern === "string" + ? pattern === recordId + : pattern.test(recordId) + ); + + let canRead = false; + if (readPattern) { + const guard = this.recordGuards.get(readPattern); + if (guard) { + try { + canRead = await Promise.resolve(guard(connection, recordId)); + } catch (e) { + canRead = false; + } + } else { + canRead = true; + } + } + + if (canRead) { + return true; + } + + // if exposed as writable, it is implicitly readable + const writePattern = this.exposedWritableRecords.find((pattern) => + typeof pattern === "string" + ? pattern === recordId + : pattern.test(recordId) + ); + + // If exposed as writable, it's readable. No need to check the *write* guard here. + if (writePattern) { + return true; + } + + return false; + } + + /** + * Checks if a record is exposed for writing + * + * @param recordId - The record ID to check + * @param connection - The connection requesting access + * @returns A promise that resolves to true if the record is writable and the connection has access + */ + async isRecordWritable( + recordId: string, + connection: Connection + ): Promise { + const matchedPattern = this.exposedWritableRecords.find((pattern) => + typeof pattern === "string" + ? pattern === recordId + : pattern.test(recordId) + ); + + if (!matchedPattern) { + return false; + } + + const guard = this.writableRecordGuards.get(matchedPattern); + if (guard) { + try { + return await Promise.resolve(guard(connection, recordId)); + } catch (e) { + return false; + } + } + + return true; + } + + /** + * Subscribes a connection to a record + * + * @param recordId - The record ID to subscribe to + * @param connectionId - The connection ID to subscribe + * @param mode - The subscription mode (patch or full) + */ + addSubscription( + recordId: string, + connectionId: string, + mode: "patch" | "full" + ): void { + if (!this.recordSubscriptions.has(recordId)) { + this.recordSubscriptions.set(recordId, new Map()); + } + this.recordSubscriptions.get(recordId)!.set(connectionId, mode); + } + + /** + * Unsubscribes a connection from a record + * + * @param recordId - The record ID to unsubscribe from + * @param connectionId - The connection ID to unsubscribe + * @returns true if the connection was subscribed and is now unsubscribed, false otherwise + */ + removeSubscription(recordId: string, connectionId: string): boolean { + const recordSubs = this.recordSubscriptions.get(recordId); + if (recordSubs?.has(connectionId)) { + recordSubs.delete(connectionId); + if (recordSubs.size === 0) { + this.recordSubscriptions.delete(recordId); + } + return true; + } + return false; + } + + /** + * Gets all subscribers for a record + * + * @param recordId - The record ID to get subscribers for + * @returns A map of connection IDs to subscription modes, or undefined if none + */ + getSubscribers(recordId: string): Map | undefined { + return this.recordSubscriptions.get(recordId); + } + + /** + * Updates a record, persists it to Redis, increments its version, computes a patch, + * and publishes the update via Redis pub/sub. + * + * @param {string} recordId - The ID of the record to update. + * @param {any} newValue - The new value for the record. + * @returns {Promise} + * @throws {Error} If the update fails. + */ + async publishRecordUpdate(recordId: string, newValue: any): Promise { + const updateResult = await this.recordManager.publishUpdate( + recordId, + newValue + ); + + if (!updateResult) { + return; + } + + const { patch, version } = updateResult; + + const messagePayload = { + recordId, + newValue, + patch, + version, + }; + + try { + await this.pubClient.publish( + RECORD_PUB_SUB_CHANNEL, + JSON.stringify(messagePayload) + ); + } catch (err) { + this.emitError( + new Error(`Failed to publish record update for "${recordId}": ${err}`) + ); + } + } + + /** + * Cleans up all subscriptions for a connection + * + * @param connection - The connection to clean up + */ + cleanupConnection(connection: Connection): void { + const connectionId = connection.id; + this.recordSubscriptions.forEach((subscribers, recordId) => { + if (subscribers.has(connectionId)) { + subscribers.delete(connectionId); + if (subscribers.size === 0) { + this.recordSubscriptions.delete(recordId); + } + } + }); + } + + /** + * Gets all record subscriptions + * + * @returns The record subscriptions map + */ + getRecordSubscriptions(): Map> { + return this.recordSubscriptions; + } +} diff --git a/packages/mesh/src/server/record-manager.ts b/packages/mesh/src/server/managers/record.ts similarity index 100% rename from packages/mesh/src/server/record-manager.ts rename to packages/mesh/src/server/managers/record.ts diff --git a/packages/mesh/src/server/managers/redis.ts b/packages/mesh/src/server/managers/redis.ts new file mode 100644 index 0000000..cee7915 --- /dev/null +++ b/packages/mesh/src/server/managers/redis.ts @@ -0,0 +1,120 @@ +import { Redis, type RedisOptions } from "ioredis"; + +export class RedisManager { + private _redis: Redis | null = null; + private _pubClient: Redis | null = null; + private _subClient: Redis | null = null; + private _isShuttingDown = false; + private _options: RedisOptions | null = null; + + /** + * Initializes Redis connections with the provided options + * + * @param options - Redis connection options + * @param onError - Error handler callback + */ + initialize(options: RedisOptions, onError: (err: Error) => void): void { + this._options = options; + + this._redis = new Redis({ + retryStrategy: (times: number) => { + if (this._isShuttingDown) { + return null; + } + + if (times > 10) { + return null; + } + + return Math.min(1000 * Math.pow(2, times), 30000); + }, + ...options, + }); + + this._redis.on("error", (err) => { + onError(new Error(`Redis error: ${err}`)); + }); + + this._pubClient = this._redis.duplicate(); + this._subClient = this._redis.duplicate(); + } + + /** + * Gets the main Redis client + * + * @returns The Redis client + * @throws Error if Redis is not initialized + */ + get redis(): Redis { + if (!this._redis) { + throw new Error("Redis not initialized"); + } + return this._redis; + } + + /** + * Gets the Redis client for publishing + * + * @returns The publishing Redis client + * @throws Error if Redis is not initialized + */ + get pubClient(): Redis { + if (!this._pubClient) { + throw new Error("Redis pub client not initialized"); + } + return this._pubClient; + } + + /** + * Gets the Redis client for subscribing + * + * @returns The subscribing Redis client + * @throws Error if Redis is not initialized + */ + get subClient(): Redis { + if (!this._subClient) { + throw new Error("Redis sub client not initialized"); + } + return this._subClient; + } + + /** + * Disconnects all Redis clients + */ + disconnect(): void { + this._isShuttingDown = true; + + if (this._pubClient) { + this._pubClient.disconnect(); + this._pubClient = null; + } + + if (this._subClient) { + this._subClient.disconnect(); + this._subClient = null; + } + + if (this._redis) { + this._redis.disconnect(); + this._redis = null; + } + } + + /** + * Checks if Redis is shutting down + * + * @returns true if Redis is shutting down, false otherwise + */ + get isShuttingDown(): boolean { + return this._isShuttingDown; + } + + /** + * Sets the shutting down state + * + * @param value - The new shutting down state + */ + set isShuttingDown(value: boolean) { + this._isShuttingDown = value; + } +} diff --git a/packages/mesh/src/server/room-manager.ts b/packages/mesh/src/server/managers/room.ts similarity index 99% rename from packages/mesh/src/server/room-manager.ts rename to packages/mesh/src/server/managers/room.ts index a8e3871..31d2380 100644 --- a/packages/mesh/src/server/room-manager.ts +++ b/packages/mesh/src/server/managers/room.ts @@ -1,5 +1,5 @@ import Redis from "ioredis"; -import type { Connection } from "./connection"; +import type { Connection } from "../connection"; export class RoomManager { private redis: Redis; diff --git a/packages/mesh/src/server/mesh-context.ts b/packages/mesh/src/server/mesh-context.ts new file mode 100644 index 0000000..ab56edd --- /dev/null +++ b/packages/mesh/src/server/mesh-context.ts @@ -0,0 +1,21 @@ +import type { Connection } from "./connection"; +import type { MeshServer } from "./mesh-server"; + +export class MeshContext { + server: MeshServer; + command: string; + connection: Connection; + payload: T; + + constructor( + server: MeshServer, + command: string, + connection: Connection, + payload: T + ) { + this.server = server; + this.command = command; + this.connection = connection; + this.payload = payload; + } +} diff --git a/packages/mesh/src/server/mesh-server.ts b/packages/mesh/src/server/mesh-server.ts new file mode 100644 index 0000000..5c6b32c --- /dev/null +++ b/packages/mesh/src/server/mesh-server.ts @@ -0,0 +1,770 @@ +import { IncomingMessage } from "node:http"; +import { v4 as uuidv4 } from "uuid"; +import { WebSocketServer } from "ws"; +import { Status } from "../client"; +import { parseCommand } from "../common/message"; +import { Connection } from "./connection"; +import { MeshContext } from "./mesh-context"; +import { ConnectionManager } from "./managers/connection"; +import { PresenceManager } from "./managers/presence"; +import { RecordManager } from "./managers/record"; +import { RoomManager } from "./managers/room"; +import { BroadcastManager } from "./managers/broadcast"; +import { ChannelManager } from "./managers/channel"; +import { CommandManager } from "./managers/command"; +import { PubSubManager } from "./managers/pubsub"; +import { RecordSubscriptionManager } from "./managers/record-subscription"; +import { RedisManager } from "./managers/redis"; +import type { + ChannelPattern, + MeshServerOptions, + SocketMiddleware, +} from "./types"; +import { PUB_SUB_CHANNEL_PREFIX } from "./utils/constants"; + +export class MeshServer extends WebSocketServer { + readonly instanceId: string; + + private redisManager: RedisManager; + private commandManager: CommandManager; + private channelManager: ChannelManager; + private pubSubManager: PubSubManager; + private recordSubscriptionManager: RecordSubscriptionManager; + private broadcastManager: BroadcastManager; + roomManager: RoomManager; + recordManager: RecordManager; + connectionManager: ConnectionManager; + presenceManager: PresenceManager; + + serverOptions: MeshServerOptions; + status: Status = Status.OFFLINE; + private _listening = false; + + get listening(): boolean { + return this._listening; + } + + set listening(value: boolean) { + this._listening = value; + this.status = value ? Status.ONLINE : Status.OFFLINE; + } + + constructor(opts: MeshServerOptions) { + super(opts); + + this.instanceId = uuidv4(); + this.serverOptions = { + ...opts, + pingInterval: opts.pingInterval ?? 30_000, + latencyInterval: opts.latencyInterval ?? 5_000, + maxMissedPongs: opts.maxMissedPongs ?? 1, + }; + + this.redisManager = new RedisManager(); + this.redisManager.initialize(opts.redisOptions, (err) => + this.emit("error", err) + ); + + this.roomManager = new RoomManager(this.redisManager.redis); + this.recordManager = new RecordManager(this.redisManager.redis); + this.connectionManager = new ConnectionManager( + this.redisManager.pubClient, + this.instanceId, + this.roomManager + ); + this.presenceManager = new PresenceManager( + this.redisManager.redis, + this.roomManager + ); + this.commandManager = new CommandManager((err) => this.emit("error", err)); + this.channelManager = new ChannelManager( + this.redisManager.redis, + this.redisManager.pubClient, + this.redisManager.subClient, + (err) => this.emit("error", err) + ); + this.recordSubscriptionManager = new RecordSubscriptionManager( + this.redisManager.pubClient, + this.recordManager, + (err) => this.emit("error", err) + ); + this.pubSubManager = new PubSubManager( + this.redisManager.subClient, + this.instanceId, + this.connectionManager, + this.recordSubscriptionManager.getRecordSubscriptions(), + this.channelManager.getSubscribers.bind(this.channelManager), + (err) => this.emit("error", err) + ); + this.broadcastManager = new BroadcastManager( + this.connectionManager, + this.roomManager, + this.instanceId, + this.redisManager.pubClient, + (instanceId) => `${PUB_SUB_CHANNEL_PREFIX}${instanceId}`, + (err) => this.emit("error", err) + ); + + this.on("listening", () => { + this.listening = true; + }); + + this.on("error", (err) => { + console.error(`[MeshServer] Error: ${err}`); + }); + + this.on("close", () => { + this.listening = false; + }); + + this.pubSubManager.subscribeToInstanceChannel(); + this.registerBuiltinCommands(); + this.registerRecordCommands(); + this.applyListeners(); + } + + /** + * Waits until the service is ready by ensuring it is listening and the instance channel subscription is established. + * + * @returns {Promise} A promise that resolves when the service is fully ready. + * @throws {Error} If the readiness process fails or if any awaited promise rejects. + */ + async ready(): Promise { + const listeningPromise = this.listening + ? Promise.resolve() + : new Promise((resolve) => this.once("listening", resolve)); + + await Promise.all([ + listeningPromise, + this.pubSubManager.getSubscriptionPromise(), + ]); + } + + private applyListeners() { + this.on("connection", async (socket, req: IncomingMessage) => { + const connection = new Connection(socket, req, this.serverOptions); + + connection.on("message", (buffer: Buffer) => { + try { + const data = buffer.toString(); + const command = parseCommand(data); + + if ( + command.id !== undefined && + !["latency:response", "pong"].includes(command.command) + ) { + this.commandManager.runCommand( + command.id, + command.command, + command.payload, + connection, + this + ); + } + } catch (err) { + this.emit("error", err); + } + }); + + try { + await this.connectionManager.registerConnection(connection); + } catch (error) { + connection.close(); + return; + } + + this.emit("connected", connection); + + connection.on("close", async () => { + await this.cleanupConnection(connection); + this.emit("disconnected", connection); + }); + + connection.on("error", (err) => { + this.emit("clientError", err, connection); + }); + + connection.on("pong", async (connectionId) => { + try { + const rooms = await this.roomManager.getRoomsForConnection( + connectionId + ); + for (const roomName of rooms) { + if (await this.presenceManager.isRoomTracked(roomName)) { + await this.presenceManager.refreshPresence( + connectionId, + roomName + ); + } + } + } catch (err) { + this.emit("error", new Error(`Failed to refresh presence: ${err}`)); + } + }); + }); + } + + // #region Command Management + + /** + * Registers a command with an associated callback and optional middleware. + * + * @template T The type for `MeshContext.payload`. Defaults to `any`. + * @template U The command's return value type. Defaults to `any`. + * @param {string} command - The unique identifier for the command to register. + * @param {(context: MeshContext) => Promise | U} callback - The function to execute when the command is invoked. It receives a `MeshContext` of type `T` and may return a value of type `U` or a `Promise` resolving to `U`. + * @param {SocketMiddleware[]} [middlewares=[]] - An optional array of middleware functions to apply to the command. Defaults to an empty array. + * @throws {Error} May throw an error if the command registration or middleware addition fails. + */ + registerCommand( + command: string, + callback: (context: MeshContext) => Promise | U, + middlewares: SocketMiddleware[] = [] + ) { + this.commandManager.registerCommand(command, callback, middlewares); + } + + /** + * Adds one or more middleware functions to the global middleware stack. + * + * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added. Each middleware + * is expected to conform to the `SocketMiddleware` type. + * @returns {void} + * @throws {Error} If the provided middlewares are not valid or fail validation (if applicable). + */ + addMiddleware(...middlewares: SocketMiddleware[]): void { + this.commandManager.addMiddleware(...middlewares); + } + + /** + * Adds an array of middleware functions to a specific command. + * + * @param {string} command - The name of the command to associate the middleware with. + * @param {SocketMiddleware[]} middlewares - An array of middleware functions to be added to the command. + * @returns {void} + */ + addMiddlewareToCommand( + command: string, + middlewares: SocketMiddleware[] + ): void { + this.commandManager.addMiddlewareToCommand(command, middlewares); + } + + // #endregion + + // #region Channel Management + + /** + * Exposes a channel for external access and optionally associates a guard function + * to control access to that channel. The guard function determines whether a given + * connection is permitted to access the channel. + * + * @param {ChannelPattern} channel - The channel or pattern to expose. + * @param {(connection: Connection, channel: string) => Promise | boolean} [guard] - + * Optional guard function that receives the connection and channel name, returning + * a boolean or a promise that resolves to a boolean indicating whether access is allowed. + * @returns {void} + */ + exposeChannel( + channel: ChannelPattern, + guard?: ( + connection: Connection, + channel: string + ) => Promise | boolean + ): void { + this.channelManager.exposeChannel(channel, guard); + } + + /** + * Publishes a message to a specified channel and optionally maintains a history of messages. + * + * @param {string} channel - The name of the channel to which the message will be published. + * @param {any} message - The message to be published. Will not be stringified automatically for you. You need to do that yourself. + * @param {number} [history=0] - The number of historical messages to retain for the channel. Defaults to 0, meaning no history is retained. + * If greater than 0, the message will be added to the channel's history and the history will be trimmed to the specified size. + * @returns {Promise} A Promise that resolves once the message has been published and, if applicable, the history has been updated. + * @throws {Error} This function may throw an error if the underlying `pubClient` operations (e.g., `lpush`, `ltrim`, `publish`) fail. + */ + async publishToChannel( + channel: string, + message: any, + history: number = 0 + ): Promise { + return this.channelManager.publishToChannel(channel, message, history); + } + + // #endregion + + // #region Record Management + + /** + * Exposes a record or pattern for client subscriptions, optionally adding a guard function. + * + * @param {ChannelPattern} recordPattern - The record ID or pattern to expose. + * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. + */ + exposeRecord( + recordPattern: ChannelPattern, + guard?: ( + connection: Connection, + recordId: string + ) => Promise | boolean + ): void { + this.recordSubscriptionManager.exposeRecord(recordPattern, guard); + } + + /** + * Exposes a record or pattern for client writes, optionally adding a guard function. + * + * @param {ChannelPattern} recordPattern - The record ID or pattern to expose as writable. + * @param {(connection: Connection, recordId: string) => Promise | boolean} [guard] - Optional guard function. + */ + exposeWritableRecord( + recordPattern: ChannelPattern, + guard?: ( + connection: Connection, + recordId: string + ) => Promise | boolean + ): void { + this.recordSubscriptionManager.exposeWritableRecord(recordPattern, guard); + } + + /** + * Updates a record, persists it to Redis, increments its version, computes a patch, + * and publishes the update via Redis pub/sub. + * + * @param {string} recordId - The ID of the record to update. + * @param {any} newValue - The new value for the record. + * @returns {Promise} + * @throws {Error} If the update fails. + */ + async publishRecordUpdate(recordId: string, newValue: any): Promise { + return this.recordSubscriptionManager.publishRecordUpdate( + recordId, + newValue + ); + } + + // #endregion + + // #region Room Management + + async isInRoom(roomName: string, connection: Connection | string) { + const connectionId = + typeof connection === "string" ? connection : connection.id; + return this.roomManager.connectionIsInRoom(roomName, connectionId); + } + + async addToRoom(roomName: string, connection: Connection | string) { + const connectionId = + typeof connection === "string" ? connection : connection.id; + await this.roomManager.addToRoom(roomName, connection); + + if (await this.presenceManager.isRoomTracked(roomName)) { + await this.presenceManager.markOnline(connectionId, roomName); + } + } + + async removeFromRoom(roomName: string, connection: Connection | string) { + const connectionId = + typeof connection === "string" ? connection : connection.id; + + if (await this.presenceManager.isRoomTracked(roomName)) { + await this.presenceManager.markOffline(connectionId, roomName); + } + + return this.roomManager.removeFromRoom(roomName, connection); + } + + async removeFromAllRooms(connection: Connection | string) { + return this.roomManager.removeFromAllRooms(connection); + } + + async clearRoom(roomName: string) { + return this.roomManager.clearRoom(roomName); + } + + async getRoomMembers(roomName: string): Promise { + return this.roomManager.getRoomConnectionIds(roomName); + } + + // #endregion + + // #region Broadcasting + + /** + * Broadcasts a command and payload to a set of connections or all available connections. + * + * @param {string} command - The command to be broadcasted. + * @param {any} payload - The data associated with the command. + * @param {Connection[]=} connections - (Optional) A specific list of connections to broadcast to. If not provided, the command will be sent to all connections. + * + * @throws {Error} Emits an "error" event if broadcasting fails. + */ + async broadcast(command: string, payload: any, connections?: Connection[]) { + return this.broadcastManager.broadcast(command, payload, connections); + } + + /** + * Broadcasts a command and associated payload to all active connections within the specified room. + * + * @param {string} roomName - The name of the room whose connections will receive the broadcast. + * @param {string} command - The command to be broadcasted to the connections. + * @param {unknown} payload - The data payload associated with the command. + * @returns {Promise} A promise that resolves when the broadcast operation is complete. + * @throws {Error} If the broadcast operation fails, an error is thrown and the promise is rejected. + */ + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + return this.broadcastManager.broadcastRoom(roomName, command, payload); + } + + /** + * Broadcasts a command and payload to all active connections except for the specified one(s). + * Excludes the provided connection(s) from receiving the broadcast. + * + * @param {string} command - The command to broadcast to connections. + * @param {any} payload - The payload to send along with the command. + * @param {Connection | Connection[]} exclude - A single connection or an array of connections to exclude from the broadcast. + * @returns {Promise} A promise that resolves when the broadcast is complete. + * @emits {Error} Emits an "error" event if broadcasting the command fails. + */ + async broadcastExclude( + command: string, + payload: any, + exclude: Connection | Connection[] + ): Promise { + return this.broadcastManager.broadcastExclude(command, payload, exclude); + } + + /** + * Broadcasts a command with a payload to all connections in a specified room, + * excluding one or more given connections. If the broadcast fails, emits an error event. + * + * @param {string} roomName - The name of the room to broadcast to. + * @param {string} command - The command to broadcast. + * @param {any} payload - The payload to send with the command. + * @param {Connection | Connection[]} exclude - A connection or array of connections to exclude from the broadcast. + * @returns {Promise} A promise that resolves when the broadcast is complete. + * @emits {Error} Emits an error event if broadcasting fails. + */ + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + exclude: Connection | Connection[] + ): Promise { + return this.broadcastManager.broadcastRoomExclude( + roomName, + command, + payload, + exclude + ); + } + + // #endregion + + // #region Presence Management + + trackPresence( + roomPattern: string | RegExp, + guardOrOptions?: + | (( + connection: Connection, + roomName: string + ) => Promise | boolean) + | { + ttl?: number; + guard?: ( + connection: Connection, + roomName: string + ) => Promise | boolean; + } + ): void { + this.presenceManager.trackRoom(roomPattern, guardOrOptions); + } + + // #endregion + + // #region Command Registration + + private registerBuiltinCommands() { + this.registerCommand< + { channel: string; historyLimit?: number }, + { success: boolean; history?: string[] } + >("mesh/subscribe-channel", async (ctx) => { + const { channel, historyLimit } = ctx.payload; + + if ( + !(await this.channelManager.isChannelExposed(channel, ctx.connection)) + ) { + return { success: false, history: [] }; + } + + try { + if (!this.channelManager.getSubscribers(channel)) { + await this.channelManager.subscribeToRedisChannel(channel); + } + this.channelManager.addSubscription(channel, ctx.connection); + + let history: string[] = []; + if (historyLimit && historyLimit > 0) { + history = await this.channelManager.getChannelHistory( + channel, + historyLimit + ); + } + + return { + success: true, + history, + }; + } catch (e) { + return { success: false, history: [] }; + } + }); + + this.registerCommand<{ channel: string }, boolean>( + "mesh/unsubscribe-channel", + async (ctx) => { + const { channel } = ctx.payload; + const wasSubscribed = this.channelManager.removeSubscription( + channel, + ctx.connection + ); + + if (wasSubscribed && !this.channelManager.getSubscribers(channel)) { + await this.channelManager.unsubscribeFromRedisChannel(channel); + } + + return wasSubscribed; + } + ); + + this.registerCommand< + { roomName: string }, + { success: boolean; present: string[] } + >("mesh/join-room", async (ctx) => { + const { roomName } = ctx.payload; + await this.addToRoom(roomName, ctx.connection); + const present = await this.presenceManager.getPresentConnections( + roomName + ); + return { success: true, present }; + }); + + this.registerCommand<{ roomName: string }, { success: boolean }>( + "mesh/leave-room", + async (ctx) => { + const { roomName } = ctx.payload; + await this.removeFromRoom(roomName, ctx.connection); + return { success: true }; + } + ); + } + + private registerRecordCommands() { + this.registerCommand< + { recordId: string; mode?: "patch" | "full" }, + { success: boolean; record?: any; version?: number } + >("mesh/subscribe-record", async (ctx) => { + const { recordId, mode = "full" } = ctx.payload; + const connectionId = ctx.connection.id; + + if ( + !(await this.recordSubscriptionManager.isRecordExposed( + recordId, + ctx.connection + )) + ) { + return { success: false }; + } + + try { + const { record, version } = + await this.recordManager.getRecordAndVersion(recordId); + + this.recordSubscriptionManager.addSubscription( + recordId, + connectionId, + mode + ); + + return { success: true, record, version }; + } catch (e) { + console.error(`Failed to subscribe to record ${recordId}:`, e); + return { success: false }; + } + }); + + this.registerCommand<{ recordId: string }, boolean>( + "mesh/unsubscribe-record", + async (ctx) => { + const { recordId } = ctx.payload; + const connectionId = ctx.connection.id; + return this.recordSubscriptionManager.removeSubscription( + recordId, + connectionId + ); + } + ); + + this.registerCommand< + { recordId: string; newValue: any }, + { success: boolean } + >("mesh/publish-record-update", async (ctx) => { + const { recordId, newValue } = ctx.payload; + + if ( + !(await this.recordSubscriptionManager.isRecordWritable( + recordId, + ctx.connection + )) + ) { + throw new Error( + `Record "${recordId}" is not writable by this connection.` + ); + } + + try { + await this.publishRecordUpdate(recordId, newValue); + return { success: true }; + } catch (e: any) { + throw new Error( + `Failed to publish update for record "${recordId}": ${e.message}` + ); + } + }); + + this.registerCommand< + { roomName: string }, + { success: boolean; present: string[] } + >("mesh/subscribe-presence", async (ctx) => { + const { roomName } = ctx.payload; + + if ( + !(await this.presenceManager.isRoomTracked(roomName, ctx.connection)) + ) { + return { success: false, present: [] }; + } + + try { + const presenceChannel = `mesh:presence:updates:${roomName}`; + + if (!this.channelManager.getSubscribers(presenceChannel)) { + this.channelManager.addSubscription(presenceChannel, ctx.connection); + } + + const present = await this.presenceManager.getPresentConnections( + roomName + ); + + return { success: true, present }; + } catch (e) { + console.error( + `Failed to subscribe to presence for room ${roomName}:`, + e + ); + return { success: false, present: [] }; + } + }); + + this.registerCommand<{ roomName: string }, boolean>( + "mesh/unsubscribe-presence", + async (ctx) => { + const { roomName } = ctx.payload; + const presenceChannel = `mesh:presence:updates:${roomName}`; + return this.channelManager.removeSubscription( + presenceChannel, + ctx.connection + ); + } + ); + } + + // #endregion + + private async cleanupConnection(connection: Connection) { + connection.stopIntervals(); + + try { + await this.presenceManager.cleanupConnection(connection); + await this.connectionManager.cleanupConnection(connection); + await this.roomManager.cleanupConnection(connection); + this.recordSubscriptionManager.cleanupConnection(connection); + this.channelManager.cleanupConnection(connection); + } catch (err) { + this.emit("error", new Error(`Failed to clean up connection: ${err}`)); + } + } + + /** + * Gracefully closes all active connections, cleans up resources, + * and shuts down the service. Optionally accepts a callback function + * that will be invoked once shutdown is complete or if an error occurs. + * + * @param {((err?: Error) => void)=} callback - Optional callback to be invoked when closing is complete or if an error occurs. + * @returns {Promise} A promise that resolves when shutdown is complete. + * @throws {Error} If an error occurs during shutdown, the promise will be rejected with the error. + */ + async close(callback?: (err?: Error) => void): Promise { + this.redisManager.isShuttingDown = true; + + const connections = Object.values( + this.connectionManager.getLocalConnections() + ); + await Promise.all( + connections.map(async (connection) => { + if (!connection.isDead) { + await connection.close(); + } + await this.cleanupConnection(connection); + }) + ); + + await new Promise((resolve, reject) => { + super.close((err?: Error) => { + if (err) reject(err); + else resolve(); + }); + }); + + this.redisManager.disconnect(); + + this.listening = false; + this.removeAllListeners(); + + if (callback) { + callback(); + } + } + + /** + * Registers a callback function to be executed when a new connection is established. + * + * @param {(connection: Connection) => Promise | void} callback - The function to execute when a new connection is established. + * @returns {MeshServer} The server instance for method chaining. + */ + onConnection( + callback: (connection: Connection) => Promise | void + ): MeshServer { + this.on("connected", callback); + return this; + } + + /** + * Registers a callback function to be executed when a connection is closed. + * + * @param {(connection: Connection) => Promise | void} callback - The function to execute when a connection is closed. + * @returns {MeshServer} The server instance for method chaining. + */ + onDisconnection( + callback: (connection: Connection) => Promise | void + ): MeshServer { + this.on("disconnected", callback); + return this; + } +} diff --git a/packages/mesh/src/server/types.ts b/packages/mesh/src/server/types.ts new file mode 100644 index 0000000..589bc1a --- /dev/null +++ b/packages/mesh/src/server/types.ts @@ -0,0 +1,55 @@ +import type { ServerOptions } from "ws"; +import type { RedisOptions } from "ioredis"; +import type { Operation } from "fast-json-patch"; +import type { Connection } from "./connection"; +import type { Command } from "../common/message"; +import type { MeshContext } from "./mesh-context"; + +export type SocketMiddleware = ( + context: MeshContext +) => any | Promise; + +export type PubSubMessagePayload = { + targetConnectionIds: string[]; + command: Command; +}; + +export type RecordUpdatePubSubPayload = { + recordId: string; + newValue?: any; + patch?: Operation[]; + version: number; +}; + +export type MeshServerOptions = ServerOptions & { + /** + * The interval at which to send ping messages to the client. + * + * @default 30000 + */ + pingInterval?: number; + + /** + * The interval at which to send both latency requests and updates to the client. + * + * @default 5000 + */ + latencyInterval?: number; + redisOptions: RedisOptions; + + /** + * The maximum number of consecutive ping intervals the server will wait + * for a pong response before considering the client disconnected. + * A value of 1 means the client must respond within roughly 2 * pingInterval + * before being disconnected. Setting it to 0 is not recommended as it will + * immediately disconnect the client if it doesn't respond to the first ping in + * exactly `pingInterval` milliseconds, which doesn't provide wiggle room for + * network latency. + * + * @see pingInterval + * @default 1 + */ + maxMissedPongs?: number; +}; + +export type ChannelPattern = string | RegExp; diff --git a/packages/mesh/src/server/utils/constants.ts b/packages/mesh/src/server/utils/constants.ts new file mode 100644 index 0000000..6aac51f --- /dev/null +++ b/packages/mesh/src/server/utils/constants.ts @@ -0,0 +1,2 @@ +export const PUB_SUB_CHANNEL_PREFIX = "mesh:pubsub:"; +export const RECORD_PUB_SUB_CHANNEL = "mesh:record-updates"; diff --git a/packages/mesh/src/tests/basic.test.ts b/packages/mesh/src/tests/basic.test.ts index 29e5ee0..2a9b7a2 100644 --- a/packages/mesh/src/tests/basic.test.ts +++ b/packages/mesh/src/tests/basic.test.ts @@ -48,13 +48,6 @@ describe("MeshServer", () => { await server.close(); }); - test("should create a server instance", () => { - expect(server).toBeInstanceOf(MeshServer); - expect(server.redis).toBeInstanceOf(Redis); - expect(server.roomManager).toBeDefined(); - expect(server.connectionManager).toBeDefined(); - }); - test("clients can connect to the server", async () => { await clientA.connect(); expect(clientA.status).toBe(Status.ONLINE);