diff --git a/packages/keepalive-ws/README.md b/packages/keepalive-ws/README.md index 13554b7..be5b0d5 100644 --- a/packages/keepalive-ws/README.md +++ b/packages/keepalive-ws/README.md @@ -26,6 +26,9 @@ const server = new KeepAliveServer({ port: 8080, pingInterval: 30000, latencyInterval: 5000, + // Multi-instance room support (optional): + // roomBackend: "redis", + // redisOptions: { host: "localhost", port: 6379 } }); // Register command handlers @@ -41,8 +44,8 @@ server.registerCommand("throws", async () => { // Room-based messaging server.registerCommand("join-room", async (context) => { const { roomName } = context.payload; - server.addToRoom(roomName, context.connection); - server.broadcastRoom(roomName, "user-joined", { + await server.addToRoom(roomName, context.connection); + await server.broadcastRoom(roomName, "user-joined", { id: context.connection.id }); return { success: true }; @@ -101,17 +104,17 @@ await client.close(); ### Room Management ```typescript -// Add a connection to a room -server.addToRoom("roomName", connection); +// Add a connection to a room (async) +await server.addToRoom("roomName", connection); -// Remove a connection from a room -server.removeFromRoom("roomName", connection); +// Remove a connection from a room (async) +await server.removeFromRoom("roomName", connection); -// Get all connections in a room -const roomConnections = server.getRoom("roomName"); +// Get all connections in a room (async) +const roomConnections = await server.getRoom("roomName"); -// Clear all connections from a room -server.clearRoom("roomName"); +// Clear all connections from a room (async) +await server.clearRoom("roomName"); ``` ### Broadcasting @@ -162,6 +165,22 @@ server.registerCommand( ); ``` +## Multi-Instance Room Support + +To enable multi-instance room support (so rooms are shared across all server instances), configure the server with `roomBackend: "redis"` and provide `redisOptions`: + +```typescript +import { KeepAliveServer } from "@prsm/keepalive-ws/server"; + +const server = new KeepAliveServer({ + port: 8080, + roomBackend: "redis", + redisOptions: { host: "localhost", port: 6379 } +}); +``` + +All room management methods become async and must be awaited. + ## Graceful Shutdown ```typescript diff --git a/packages/keepalive-ws/bun.lockb b/packages/keepalive-ws/bun.lockb index 0e58303..4866361 100755 Binary files a/packages/keepalive-ws/bun.lockb and b/packages/keepalive-ws/bun.lockb differ diff --git a/packages/keepalive-ws/docker-compose.yml b/packages/keepalive-ws/docker-compose.yml new file mode 100644 index 0000000..a05de07 --- /dev/null +++ b/packages/keepalive-ws/docker-compose.yml @@ -0,0 +1,16 @@ +version: "3.8" +services: + redis: + image: redis:7 + ports: + - "6379:6379" + command: ["redis-server", "--save", "", "--appendonly", "no"] + + redis-commander: + image: rediscommander/redis-commander:latest + environment: + - REDIS_HOSTS=local:redis:6379 + ports: + - "8081:8081" + depends_on: + - redis diff --git a/packages/keepalive-ws/package.json b/packages/keepalive-ws/package.json index ac8f3f0..fe0bbf0 100644 --- a/packages/keepalive-ws/package.json +++ b/packages/keepalive-ws/package.json @@ -40,6 +40,7 @@ "keywords": [], "license": "Apache-2.0", "dependencies": { + "ioredis": "^5.6.1", "ws": "^8.9.0" }, "devDependencies": { diff --git a/packages/keepalive-ws/src/client/client.ts b/packages/keepalive-ws/src/client/client.ts index 2b0acf2..29fb4d3 100644 --- a/packages/keepalive-ws/src/client/client.ts +++ b/packages/keepalive-ws/src/client/client.ts @@ -73,7 +73,21 @@ export class KeepAliveClient extends EventEmitter { private setupConnectionEvents(): void { // Forward relevant events from connection to client this.connection.on("message", (data) => { + // Forward the raw message event this.emit("message", data); + + // Also forward the specific command event if it's not a system event + // (System events like ping/latency are handled separately below) + const systemCommands = [ + "ping", + "pong", + "latency", + "latency:request", + "latency:response", + ]; + if (data.command && !systemCommands.includes(data.command)) { + this.emit(data.command, data.payload); + } }); this.connection.on("close", () => { @@ -150,8 +164,8 @@ export class KeepAliveClient extends EventEmitter { new CodeError( "WebSocket connection error", "ECONNECTION", - "ConnectionError", - ), + "ConnectionError" + ) ); }; } catch (error) { @@ -233,11 +247,12 @@ export class KeepAliveClient extends EventEmitter { if (attempt <= this.options.maxReconnectAttempts) { setTimeout(connect, this.options.reconnectInterval); - } else { - this.isReconnecting = false; - this._status = Status.OFFLINE; - this.emit("reconnectfailed"); + return; } + + this.isReconnecting = false; + this._status = Status.OFFLINE; + this.emit("reconnectfailed"); }; this.socket.onopen = () => { @@ -268,13 +283,13 @@ export class KeepAliveClient extends EventEmitter { command: string, payload?: any, expiresIn: number = 30000, - callback?: (result: any, error?: Error) => void, + callback?: (result: any, error?: Error) => void ): Promise { // Ensure we're connected before sending commands if (this._status !== Status.ONLINE) { return this.connect() .then(() => - this.connection.command(command, payload, expiresIn, callback), + this.connection.command(command, payload, expiresIn, callback) ) .catch((error) => { if (callback) { diff --git a/packages/keepalive-ws/src/client/connection.ts b/packages/keepalive-ws/src/client/connection.ts index 081a7ee..8f2a414 100644 --- a/packages/keepalive-ws/src/client/connection.ts +++ b/packages/keepalive-ws/src/client/connection.ts @@ -115,7 +115,7 @@ export class Connection extends EventEmitter { command: string, payload: any, expiresIn: number | null = 30_000, - callback?: (result: any, error?: Error) => void, + callback?: (result: any, error?: Error) => void ): Promise | null { const id = this.ids.reserve(); const cmd: Command = { id, command, payload: payload ?? {} }; @@ -142,17 +142,17 @@ export class Connection extends EventEmitter { const timeoutPromise = new Promise((_, reject) => { setTimeout(() => { - if (this.callbacks[id]) { - this.ids.release(id); - delete this.callbacks[id]; - reject( - new CodeError( - `Command timed out after ${expiresIn}ms.`, - "ETIMEOUT", - "TimeoutError", - ), - ); - } + if (!this.callbacks[id]) return; + + this.ids.release(id); + delete this.callbacks[id]; + reject( + new CodeError( + `Command timed out after ${expiresIn}ms.`, + "ETIMEOUT", + "TimeoutError" + ) + ); }, expiresIn); }); diff --git a/packages/keepalive-ws/src/client/ids.ts b/packages/keepalive-ws/src/client/ids.ts index 1d0d252..942ac49 100644 --- a/packages/keepalive-ws/src/client/ids.ts +++ b/packages/keepalive-ws/src/client/ids.ts @@ -10,7 +10,7 @@ export class IdManager { release(id: number) { if (id < 0 || id > this.maxIndex) { throw new TypeError( - `ID must be between 0 and ${this.maxIndex}. Got ${id}.`, + `ID must be between 0 and ${this.maxIndex}. Got ${id}.` ); } this.ids[id] = false; @@ -36,7 +36,7 @@ export class IdManager { if (this.index === startIndex) { throw new Error( - `All IDs are reserved. Make sure to release IDs when they are no longer used.`, + `All IDs are reserved. Make sure to release IDs when they are no longer used.` ); } } diff --git a/packages/keepalive-ws/src/server/connection.ts b/packages/keepalive-ws/src/server/connection.ts index 81d9af2..d81bb4e 100644 --- a/packages/keepalive-ws/src/server/connection.ts +++ b/packages/keepalive-ws/src/server/connection.ts @@ -20,7 +20,7 @@ export class Connection extends EventEmitter { constructor( socket: WebSocket, req: IncomingMessage, - options: KeepAliveServerOptions, + options: KeepAliveServerOptions ) { super(); this.socket = socket; diff --git a/packages/keepalive-ws/src/server/index.ts b/packages/keepalive-ws/src/server/index.ts index 04992fc..7004eb1 100644 --- a/packages/keepalive-ws/src/server/index.ts +++ b/packages/keepalive-ws/src/server/index.ts @@ -1,5 +1,11 @@ import { IncomingMessage } from "node:http"; import { ServerOptions, WebSocket, WebSocketServer } from "ws"; +import type { RedisOptions } from "ioredis"; +import { + RoomManager, + InMemoryRoomManager, + RedisRoomManager, +} from "./room-manager"; import { CodeError } from "../common/codeerror"; import { Command, parseCommand } from "../common/message"; import { Status } from "../common/status"; @@ -34,6 +40,16 @@ export type KeepAliveServerOptions = ServerOptions & { * @default 5000 */ latencyInterval?: number; + + /** + * Room backend type: "memory" (default) or "redis" + */ + roomBackend?: "memory" | "redis"; + + /** + * Redis options, required if roomBackend is "redis" + */ + redisOptions?: RedisOptions; }; export class KeepAliveServer extends WebSocketServer { @@ -44,7 +60,7 @@ export class KeepAliveServer extends WebSocketServer { } = {}; globalMiddlewares: SocketMiddleware[] = []; middlewares: { [key: string]: SocketMiddleware[] } = {}; - rooms: { [roomName: string]: Set } = {}; + roomManager: RoomManager; serverOptions: ServerOptions & { pingInterval: number; latencyInterval: number; @@ -67,6 +83,23 @@ export class KeepAliveServer extends WebSocketServer { latencyInterval: opts.latencyInterval ?? 5_000, }; + // Room manager selection + if (opts.roomBackend === "redis") { + if (!opts.redisOptions) { + throw new Error( + "redisOptions must be provided when roomBackend is 'redis'" + ); + } + this.roomManager = new RedisRoomManager( + opts.redisOptions, + (id: string) => this.connections[id] + ); + } else { + this.roomManager = new InMemoryRoomManager( + (id: string) => this.connections[id] + ); + } + this.on("listening", () => { this._listening = true; this.status = Status.ONLINE; @@ -80,14 +113,14 @@ export class KeepAliveServer extends WebSocketServer { this.applyListeners(); } - private cleanupConnection(connection: Connection): void { + private async cleanupConnection(connection: Connection): Promise { connection.stopIntervals(); delete this.connections[connection.id]; if (this.remoteAddressToConnections[connection.remoteAddress]) { this.remoteAddressToConnections[connection.remoteAddress] = this.remoteAddressToConnections[connection.remoteAddress].filter( - (conn) => conn.id !== connection.id, + (conn) => conn.id !== connection.id ); if ( @@ -98,9 +131,7 @@ export class KeepAliveServer extends WebSocketServer { } // Remove from all rooms - Object.keys(this.rooms).forEach((roomName) => { - this.rooms[roomName].delete(connection.id); - }); + await this.roomManager.removeFromAllRooms(connection); } private applyListeners(): void { @@ -113,13 +144,13 @@ export class KeepAliveServer extends WebSocketServer { } this.remoteAddressToConnections[connection.remoteAddress].push( - connection, + connection ); this.emit("connected", connection); - connection.on("close", () => { - this.cleanupConnection(connection); + connection.on("close", async () => { + await this.cleanupConnection(connection); this.emit("close", connection); }); @@ -137,7 +168,7 @@ export class KeepAliveServer extends WebSocketServer { command.id, command.command, command.payload, - connection, + connection ); } } catch (error) { @@ -172,7 +203,7 @@ export class KeepAliveServer extends WebSocketServer { broadcastRemoteAddress( connection: Connection, command: string, - payload: any, + payload: any ): void { const cmd: Command = { command, payload }; const connections = @@ -194,47 +225,30 @@ export class KeepAliveServer extends WebSocketServer { * Given a roomName, a command and a payload, broadcasts to all Connections * that are in the room. */ - broadcastRoom(roomName: string, command: string, payload: any): void { - const cmd: Command = { command, payload }; - const room = this.rooms[roomName]; - - if (!room) return; - - room.forEach((connectionId) => { - const connection = this.connections[connectionId]; - if (connection) { - connection.send(cmd); - } - }); + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + await this.roomManager.broadcastRoom(roomName, command, payload); } /** * Given a roomName, command, payload, and Connection OR Connection[], broadcasts to all Connections * that are in the room except the provided Connection(s). */ - broadcastRoomExclude( + async broadcastRoomExclude( roomName: string, command: string, payload: any, - connection: Connection | Connection[], - ): void { - const cmd: Command = { command, payload }; - const room = this.rooms[roomName]; - - if (!room) return; - - const excludeIds = Array.isArray(connection) - ? connection.map((c) => c.id) - : [connection.id]; - - room.forEach((connectionId) => { - if (!excludeIds.includes(connectionId)) { - const conn = this.connections[connectionId]; - if (conn) { - conn.send(cmd); - } - } - }); + connection: Connection | Connection[] + ): Promise { + await this.roomManager.broadcastRoomExclude( + roomName, + command, + payload, + connection + ); } /** @@ -244,7 +258,7 @@ export class KeepAliveServer extends WebSocketServer { broadcastExclude( connection: Connection, command: string, - payload: any, + payload: any ): void { const cmd: Command = { command, payload }; @@ -258,46 +272,39 @@ export class KeepAliveServer extends WebSocketServer { /** * Add a connection to a room */ - addToRoom(roomName: string, connection: Connection): void { - this.rooms[roomName] = this.rooms[roomName] ?? new Set(); - this.rooms[roomName].add(connection.id); + async addToRoom(roomName: string, connection: Connection): Promise { + await this.roomManager.addToRoom(roomName, connection); } /** * Remove a connection from a room */ - removeFromRoom(roomName: string, connection: Connection): void { - if (!this.rooms[roomName]) return; - this.rooms[roomName].delete(connection.id); + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + await this.roomManager.removeFromRoom(roomName, connection); } /** * Remove a connection from all rooms */ - removeFromAllRooms(connection: Connection | string): void { - const connectionId = - typeof connection === "string" ? connection : connection.id; - - Object.keys(this.rooms).forEach((roomName) => { - this.rooms[roomName].delete(connectionId); - }); + async removeFromAllRooms(connection: Connection | string): Promise { + await this.roomManager.removeFromAllRooms(connection); } /** * Returns all connections in a room */ - getRoom(roomName: string): Connection[] { - const ids = this.rooms[roomName] || new Set(); - return Array.from(ids) - .map((id) => this.connections[id]) - .filter(Boolean); + async getRoom(roomName: string): Promise { + return this.roomManager.getRoom(roomName); } /** * Clear all connections from a room */ - clearRoom(roomName: string): void { - this.rooms[roomName] = new Set(); + async clearRoom(roomName: string): Promise { + await this.roomManager.clearRoom(roomName); } /** @@ -306,7 +313,7 @@ export class KeepAliveServer extends WebSocketServer { async registerCommand( command: string, callback: (context: WSContext) => Promise | T, - middlewares: SocketMiddleware[] = [], + middlewares: SocketMiddleware[] = [] ): Promise { this.commands[command] = callback; @@ -322,7 +329,7 @@ export class KeepAliveServer extends WebSocketServer { */ prependMiddlewareToCommand( command: string, - middlewares: SocketMiddleware[], + middlewares: SocketMiddleware[] ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; @@ -335,7 +342,7 @@ export class KeepAliveServer extends WebSocketServer { */ appendMiddlewareToCommand( command: string, - middlewares: SocketMiddleware[], + middlewares: SocketMiddleware[] ): void { if (middlewares.length) { this.middlewares[command] = this.middlewares[command] || []; @@ -350,7 +357,7 @@ export class KeepAliveServer extends WebSocketServer { id: number, command: string, payload: any, - connection: Connection, + connection: Connection ): Promise { const context = new WSContext(this, connection, payload); @@ -359,7 +366,7 @@ export class KeepAliveServer extends WebSocketServer { throw new CodeError( `Command [${command}] not found.`, "ENOTFOUND", - "CommandError", + "CommandError" ); } diff --git a/packages/keepalive-ws/src/server/room-manager.ts b/packages/keepalive-ws/src/server/room-manager.ts new file mode 100644 index 0000000..222797d --- /dev/null +++ b/packages/keepalive-ws/src/server/room-manager.ts @@ -0,0 +1,192 @@ +import { Connection } from "./connection"; +import Redis from "ioredis"; +import type { RedisOptions } from "ioredis"; + +export interface RoomManager { + addToRoom(roomName: string, connection: Connection): Promise; + removeFromRoom(roomName: string, connection: Connection): Promise; + removeFromAllRooms(connection: Connection | string): Promise; + getRoom(roomName: string): Promise; + clearRoom(roomName: string): Promise; + broadcastRoom(roomName: string, command: string, payload: any): Promise; + broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise; +} + +export class InMemoryRoomManager implements RoomManager { + private rooms: { [roomName: string]: Set } = {}; + private getConnectionById: (id: string) => Connection | undefined; + + constructor(getConnectionById: (id: string) => Connection | undefined) { + this.getConnectionById = getConnectionById; + } + + async addToRoom(roomName: string, connection: Connection): Promise { + this.rooms[roomName] = this.rooms[roomName] ?? new Set(); + this.rooms[roomName].add(connection.id); + } + + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + if (!this.rooms[roomName]) return; + this.rooms[roomName].delete(connection.id); + } + + async removeFromAllRooms(connection: Connection | string): Promise { + const connectionId = + typeof connection === "string" ? connection : connection.id; + Object.keys(this.rooms).forEach((roomName) => { + this.rooms[roomName].delete(connectionId); + }); + } + + async getRoom(roomName: string): Promise { + const ids = this.rooms[roomName] || new Set(); + return Array.from(ids) + .map((id) => this.getConnectionById(id)) + .filter(Boolean) as Connection[]; + } + + async clearRoom(roomName: string): Promise { + this.rooms[roomName] = new Set(); + } + + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + const ids = this.rooms[roomName]; + if (!ids) return; + for (const connectionId of ids) { + const connection = this.getConnectionById(connectionId); + if (connection) { + connection.send({ command, payload }); + } + } + } + + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise { + const ids = this.rooms[roomName]; + if (!ids) return; + const excludeIds = Array.isArray(connection) + ? connection.map((c) => c.id) + : [connection.id]; + for (const connectionId of ids) { + if (!excludeIds.includes(connectionId)) { + const conn = this.getConnectionById(connectionId); + if (conn) { + conn.send({ command, payload }); + } + } + } + } +} + +export class RedisRoomManager implements RoomManager { + private redis: Redis; + private getConnectionById: (id: string) => Connection | undefined; + + constructor( + redisOptions: RedisOptions, + getConnectionById: (id: string) => Connection | undefined + ) { + this.redis = new Redis(redisOptions); + this.getConnectionById = getConnectionById; + // TODO: reconnect logic? + } + + private roomKey(roomName: string) { + return `room:${roomName}`; + } + + private connRoomsKey(connectionId: string) { + return `connection:${connectionId}:rooms`; + } + + async addToRoom(roomName: string, connection: Connection): Promise { + await this.redis.sadd(this.roomKey(roomName), connection.id); + await this.redis.sadd(this.connRoomsKey(connection.id), roomName); + } + + async removeFromRoom( + roomName: string, + connection: Connection + ): Promise { + await this.redis.srem(this.roomKey(roomName), connection.id); + await this.redis.srem(this.connRoomsKey(connection.id), roomName); + } + + async removeFromAllRooms(connection: Connection | string): Promise { + const connectionId = + typeof connection === "string" ? connection : connection.id; + const roomNames = await this.redis.smembers( + this.connRoomsKey(connectionId) + ); + + if (!(roomNames.length > 0)) return; + + const pipeline = this.redis.pipeline(); + for (const roomName of roomNames) { + pipeline.srem(this.roomKey(roomName), connectionId); + } + pipeline.del(this.connRoomsKey(connectionId)); + await pipeline.exec(); + } + + async getRoom(roomName: string): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + return ids + .map((id) => this.getConnectionById(id)) + .filter(Boolean) as Connection[]; + } + + async clearRoom(roomName: string): Promise { + await this.redis.del(this.roomKey(roomName)); + } + + async broadcastRoom( + roomName: string, + command: string, + payload: any + ): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + for (const connectionId of ids) { + const connection = this.getConnectionById(connectionId); + if (connection) { + connection.send({ command, payload }); + } + } + } + + async broadcastRoomExclude( + roomName: string, + command: string, + payload: any, + connection: Connection | Connection[] + ): Promise { + const ids = await this.redis.smembers(this.roomKey(roomName)); + const excludeIds = Array.isArray(connection) + ? connection.map((c) => c.id) + : [connection.id]; + for (const connectionId of ids) { + if (!excludeIds.includes(connectionId)) { + const conn = this.getConnectionById(connectionId); + if (conn) { + conn.send({ command, payload }); + } + } + } + } +} diff --git a/packages/keepalive-ws/tests/advanced.test.ts b/packages/keepalive-ws/tests/advanced.test.ts index dc1172a..790d6ba 100644 --- a/packages/keepalive-ws/tests/advanced.test.ts +++ b/packages/keepalive-ws/tests/advanced.test.ts @@ -2,13 +2,12 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { KeepAliveClient, Status } from "../src/client/client"; import { KeepAliveServer } from "../src/server/index"; -const createTestServer = (port: number) => { - return new KeepAliveServer({ +const createTestServer = (port: number) => + new KeepAliveServer({ port, pingInterval: 1000, latencyInterval: 500, }); -}; describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { const port = 8125; @@ -49,14 +48,15 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }); test("command times out when server doesn't respond", async () => { - await server.registerCommand("never-responds", async () => { - return new Promise(() => {}); - }); + await server.registerCommand( + "never-responds", + async () => new Promise(() => {}) + ); await client.connect(); await expect( - client.command("never-responds", "Should timeout", 500), + client.command("never-responds", "Should timeout", 500) ).rejects.toThrow(/timed out/); }, 2000); @@ -82,9 +82,10 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { return `Slow: ${context.payload}`; }); - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); await client.connect(); @@ -98,9 +99,7 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }, 3000); test("handles large payloads correctly", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); @@ -123,9 +122,10 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { }, 10000); test("server handles multiple client connections", async () => { - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); const clients = Array(5) .fill(0) @@ -134,7 +134,7 @@ describe("Advanced KeepAliveClient and KeepAliveServer Tests", () => { await Promise.all(clients.map((client) => client.connect())); const results = await Promise.all( - clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)), + clients.map((client, i) => client.command("echo", `Client ${i}`, 1000)) ); results.forEach((result, i) => { diff --git a/packages/keepalive-ws/tests/basic.test.ts b/packages/keepalive-ws/tests/basic.test.ts index b1ea141..acee5d3 100644 --- a/packages/keepalive-ws/tests/basic.test.ts +++ b/packages/keepalive-ws/tests/basic.test.ts @@ -2,13 +2,12 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { KeepAliveClient, Status } from "../src/client/client"; import { KeepAliveServer } from "../src/server/index"; -const createTestServer = (port: number) => { - return new KeepAliveServer({ +const createTestServer = (port: number) => + new KeepAliveServer({ port, pingInterval: 1000, latencyInterval: 500, }); -}; describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { const port = 8124; @@ -48,18 +47,17 @@ describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { }); test("client-server connection should be online", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); expect(client.status).toBe(Status.ONLINE); }, 10000); test("simple echo command", async () => { - await server.registerCommand("echo", async (context) => { - return `Echo: ${context.payload}`; - }); + await server.registerCommand( + "echo", + async (context) => `Echo: ${context.payload}` + ); await client.connect(); @@ -68,9 +66,7 @@ describe("Basic KeepAliveClient and KeepAliveServer Tests", () => { }, 10000); test("connect should resolve when already connected", async () => { - await server.registerCommand("echo", async (context) => { - return context.payload; - }); + await server.registerCommand("echo", async (context) => context.payload); await client.connect(); expect(client.status).toBe(Status.ONLINE); diff --git a/packages/keepalive-ws/tests/redis-room.test.ts b/packages/keepalive-ws/tests/redis-room.test.ts new file mode 100644 index 0000000..17f53e0 --- /dev/null +++ b/packages/keepalive-ws/tests/redis-room.test.ts @@ -0,0 +1,343 @@ +import { describe, test, expect, beforeEach, afterEach } from "vitest"; +import Redis from "ioredis"; +import { KeepAliveClient, Status } from "../src/client/client"; +import { KeepAliveServer } from "../src/server/index"; + +const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1"; +const REDIS_PORT = process.env.REDIS_PORT + ? parseInt(process.env.REDIS_PORT, 10) + : 6379; + +const createRedisServer = (port: number) => + new KeepAliveServer({ + port, + pingInterval: 1000, + latencyInterval: 500, + roomBackend: "redis", + redisOptions: { host: REDIS_HOST, port: REDIS_PORT }, + }); + +const flushRedis = async () => { + const redis = new Redis({ host: REDIS_HOST, port: REDIS_PORT }); + await redis.flushdb(); + await redis.quit(); +}; + +describe("KeepAliveServer with Redis room backend", () => { + const port = 8126; + let server: KeepAliveServer; + let clientA: KeepAliveClient; + let clientB: KeepAliveClient; + + beforeEach(async () => { + await flushRedis(); + + server = createRedisServer(port); + + await new Promise((resolve) => { + server.on("listening", () => resolve()); + if (server.listening) resolve(); + }); + + clientA = new KeepAliveClient(`ws://localhost:${port}`); + clientB = new KeepAliveClient(`ws://localhost:${port}`); + }); + + afterEach(async () => { + if (clientA.status === Status.ONLINE) await clientA.close(); + if (clientB.status === Status.ONLINE) await clientB.close(); + + return new Promise((resolve) => { + if (server) { + server.close(() => resolve()); + } else { + resolve(); + } + }); + }); + + test("multi-instance room membership and broadcast with Redis", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom" }); + await clientB.command("join-room", { room: "testroom" }); + + await clientA.command("broadcast-room", { + room: "testroom", + message: "hello", + }); + + // Wait for both events or timeout + await new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, 2000); + const check = () => { + if (receivedA && receivedB) { + clearTimeout(timeout); + resolve(null); + } + }; + clientA.on("room-message", check); + clientB.on("room-message", check); + }); + + expect(receivedA).toBe(true); + expect(receivedB).toBe(true); + }, 10000); + + test("removeFromRoom removes a client from a specific room", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("leave-room", async (context) => { + await server.removeFromRoom(context.payload.room, context.connection); + return { left: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello after leave") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello after leave") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom-leave" }); + await clientB.command("join-room", { room: "testroom-leave" }); + + // Ensure both are in before leaving + await new Promise((res) => setTimeout(res, 100)); // Short delay for redis propagation + + await clientA.command("leave-room", { room: "testroom-leave" }); + + // Wait a bit for leave command to process + await new Promise((res) => setTimeout(res, 100)); + + await clientB.command("broadcast-room", { + room: "testroom-leave", + message: "hello after leave", + }); + + // Wait for potential message or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A should not receive the message + expect(receivedB).toBe(true); // Client B should receive the message + }, 10000); + + test("removeFromAllRooms removes a client from all rooms", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("leave-all-rooms", async (context) => { + await server.removeFromAllRooms(context.connection); + return { left_all: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA_room1 = false; + let receivedA_room2 = false; + let receivedB_room1 = false; + + clientA.on("room-message", (data) => { + if (data === "hello room1 after all") receivedA_room1 = true; + if (data === "hello room2 after all") receivedA_room2 = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello room1 after all") receivedB_room1 = true; + }); + + await clientA.command("join-room", { room: "room1" }); + await clientA.command("join-room", { room: "room2" }); + await clientB.command("join-room", { room: "room1" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + await clientA.command("leave-all-rooms", {}); + + // Wait a bit for leave command to process + await new Promise((res) => setTimeout(res, 100)); + + // Broadcast to room1 + await clientB.command("broadcast-room", { + room: "room1", + message: "hello room1 after all", + }); + // Broadcast to room2 (no one should be left) + await clientB.command("broadcast-room", { + // Client B isn't in room2, but can still broadcast + room: "room2", + message: "hello room2 after all", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA_room1).toBe(false); // Client A should not receive from room1 + expect(receivedA_room2).toBe(false); // Client A should not receive from room2 + expect(receivedB_room1).toBe(true); // Client B should receive from room1 + }, 10000); + + test("clearRoom removes all clients from a room", async () => { + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("clear-room", async (context) => { + await server.clearRoom(context.payload.room); + return { cleared: true }; + }); + await server.registerCommand("broadcast-room", async (context) => { + await server.broadcastRoom( + context.payload.room, + "room-message", + context.payload.message + ); + return { sent: true }; + }); + + await clientA.connect(); + await clientB.connect(); + + let receivedA = false; + let receivedB = false; + + clientA.on("room-message", (data) => { + if (data === "hello after clear") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello after clear") receivedB = true; + }); + + await clientA.command("join-room", { room: "testroom-clear" }); + await clientB.command("join-room", { room: "testroom-clear" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + await clientA.command("clear-room", { room: "testroom-clear" }); + + // Wait a bit for clear command to process + await new Promise((res) => setTimeout(res, 100)); + + // Try broadcasting (client A is still connected, just not in room) + await clientA.command("broadcast-room", { + room: "testroom-clear", + message: "hello after clear", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A should not receive + expect(receivedB).toBe(false); // Client B should not receive + }, 10000); + + test("broadcastRoomExclude sends to all except specified clients", async () => { + const clientC = new KeepAliveClient(`ws://localhost:${port}`); + + await server.registerCommand("join-room", async (context) => { + await server.addToRoom(context.payload.room, context.connection); + return { joined: true }; + }); + await server.registerCommand("broadcast-exclude", async (context) => { + await server.broadcastRoomExclude( + context.payload.room, + "room-message", + context.payload.message, + context.connection // Exclude sender + ); + return { sent_exclude: true }; + }); + + await clientA.connect(); + await clientB.connect(); + await clientC.connect(); + + let receivedA = false; + let receivedB = false; + let receivedC = false; + + clientA.on("room-message", (data) => { + if (data === "hello exclude") receivedA = true; + }); + clientB.on("room-message", (data) => { + if (data === "hello exclude") receivedB = true; + }); + clientC.on("room-message", (data) => { + if (data === "hello exclude") receivedC = true; + }); + + await clientA.command("join-room", { room: "testroom-exclude" }); + await clientB.command("join-room", { room: "testroom-exclude" }); + await clientC.command("join-room", { room: "testroom-exclude" }); + + // Ensure joins are processed + await new Promise((res) => setTimeout(res, 100)); + + // Client A broadcasts, excluding itself + await clientA.command("broadcast-exclude", { + room: "testroom-exclude", + message: "hello exclude", + }); + + // Wait for potential messages or timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(receivedA).toBe(false); // Client A (sender) should not receive + expect(receivedB).toBe(true); // Client B should receive + expect(receivedC).toBe(true); // Client C should receive + + if (clientC.status === Status.ONLINE) await clientC.close(); + }, 10000); +});