fix(presence): add TTL-based expiration cleanup using Redis keyspace notifications

This commit is contained in:
nvms 2025-04-21 08:37:05 -04:00
parent 6e153b1b44
commit 58980e9f09
6 changed files with 188 additions and 50 deletions

View File

@ -323,57 +323,42 @@ Mesh provides a built-in presence system that tracks which connections are prese
### Server configuration ### Server configuration
Enable presence tracking for specific rooms using exact names or regex patterns: Enable presence tracking for specific rooms using exact names or regex patterns. You can optionally customize the TTL or restrict access with a guard.
```ts ```ts
// track presence for all rooms matching a pattern // track presence for all rooms matching a pattern
server.trackPresence(/^room:.*$/); server.trackPresence(/^room:.*$/);
// track presence for a specific room // track presence for a specific room with a custom TTL
server.trackPresence("lobby"); server.trackPresence("game-room", {
ttl: 60_000, // time in ms before presence entry expires if not refreshed
// guard who can see presence.
// clients who attempt to subscribe to the presence of this room
// will be rejected if the guard returns false
server.trackPresence("admin-room", async (conn, roomName) => {
const meta = await server.connectionManager.getMetadata(conn);
return meta?.isAdmin === true;
}); });
// custom TTL // restrict visibility to admins
server.trackPresence("game-room", { ttl: 60_000 }); // ms server.trackPresence("admin-room", {
// guard and TTL
server.trackPresence("vip-room", {
ttl: 30_000,
guard: async (conn, roomName) => { guard: async (conn, roomName) => {
const meta = await server.connectionManager.getMetadata(conn); const meta = await server.connectionManager.getMetadata(conn);
return meta?.isVIP === true; return meta?.isAdmin === true;
} }
}); });
``` ```
When presence tracking is enabled for a room, Mesh automatically: When presence tracking is enabled:
- Detects and records the connection IDs of clients joining the room. - Mesh stores connection IDs in Redis with a TTL
- Emits realtime “join” and “leave” events to subscribed clients. - As long as a client remains active, the TTL is automatically refreshed
- Automatically refreshes each connections presence using a configurable TTL as long as the client remains active. - When the TTL expires (e.g. due to disconnect or inactivity), Mesh **automatically marks the connection offline** and emits a `leave` event
- Cleans up expired or disconnected entries to maintain an up-to-date presence list.
### Getting presence information (server-side) > [!INFO]
> Under the hood, this uses Redis keyspace notifications to detect expiration events and trigger cleanup. This behavior is enabled by default and can be disabled via the `enablePresenceExpirationEvents` server option.
```ts
// get all connections currently present in a room
const connectionIds = await server.presenceManager.getPresentConnections("lobby");
```
### Client usage ### Client usage
Subscribe to presence updates for a room: Subscribe to presence updates:
```ts ```ts
const { success, present } = await client.subscribePresence( const { success, present } = await client.subscribePresence(
"lobby", "room:lobby",
(update) => { (update) => {
if (update.type === "join") { if (update.type === "join") {
console.log("User joined:", update.connectionId); console.log("User joined:", update.connectionId);
@ -387,12 +372,38 @@ const { success, present } = await client.subscribePresence(
console.log("Currently present:", present); // ["conn1", "conn2", ...] console.log("Currently present:", present); // ["conn1", "conn2", ...]
``` ```
Unsubscribe when no longer needed: You'll receive:
- The current list of `connectionId`s as `present`
- Real-time `"join"` and `"leave"` events as users come and go (or TTL expires)
Unsubscribe when done:
```ts ```ts
await client.unsubscribePresence("lobby"); await client.unsubscribePresence("room:lobby");
``` ```
### Getting presence information (server-side)
```ts
const ids = await server.presenceManager.getPresentConnections("room:lobby");
// ["abc123", "def456", ...]
```
### Disabling auto-cleanup (optional)
If for some reason you don't want TTL expirations to trigger `leave` events, you can disable it in your `MeshServer` options:
```ts
const server = new MeshServer({
port: 8080,
redisOptions: { host: "localhost", port: 6379 },
enablePresenceExpirationEvents: false,
});
```
This disables Redis keyspace notifications and requires you to manage stale connections yourself (not recommended).
### Combining presence with user info ### Combining presence with user info
Presence is most useful when combined with connection metadata. For example: Presence is most useful when combined with connection metadata. For example:
@ -407,15 +418,14 @@ server.onConnection(async (connection) => {
avatar: "https://example.com/avatar.png" avatar: "https://example.com/avatar.png"
}); });
}); });
```
// client: subscribe to presence and resolve metadata Then on the client:
```ts
const { success, present } = await client.subscribePresence( const { success, present } = await client.subscribePresence(
"lobby", "lobby",
async (update) => { async (update) => {
// fetch metadata for the connection that joined/left.
//
// since clients cannot access `getAllMetadataForRoom()` directly (it's just a server API),
// you can expose it via a custom command like `get-user-metadata`:
const metadata = await client.command("get-user-metadata", { const metadata = await client.command("get-user-metadata", {
connectionId: update.connectionId connectionId: update.connectionId
}); });
@ -427,16 +437,21 @@ const { success, present } = await client.subscribePresence(
} }
} }
); );
// initial presence - fetch metadata for all present connections
const allMetadata = await Promise.all(
present.map(connectionId =>
client.command("get-user-metadata", { connectionId })
)
);
console.log("Users in lobby:", allMetadata);
``` ```
To resolve all present users:
```ts
const allMetadata = await Promise.all(
present.map((connectionId) => client.command("get-user-metadata", { connectionId }))
);
// [{ userId: "user123", username: "Alice", avatar: "..." }, ...]
```
> [!TIP]
> You can expose a `get-user-metadata` command on the server that reads from `connectionManager.getMetadata(...)` to support this.
### Metadata ### Metadata
You can associate data like user IDs, tokens, or custom attributes with a connection using the `setMetadata` method. This metadata is stored in Redis, making it ideal for identifying users, managing permissions, or persisting session-related data across a distributed setup. Since it lives in Redis, its accessible from any server instance. You can associate data like user IDs, tokens, or custom attributes with a connection using the `setMetadata` method. This metadata is stored in Redis, making it ideal for identifying users, managing permissions, or persisting session-related data across a distributed setup. Since it lives in Redis, its accessible from any server instance.

View File

@ -1,12 +1,22 @@
import type { Redis } from "ioredis"; import type { Redis } from "ioredis";
import type { Connection } from "../connection"; import type { Connection } from "../connection";
import type { RoomManager } from "./room"; import type { RoomManager } from "./room";
import type { RedisManager } from "./redis";
type ChannelPattern = string | RegExp; type ChannelPattern = string | RegExp;
export class PresenceManager { export class PresenceManager {
private redis: Redis; private redis: Redis;
private roomManager: RoomManager; private roomManager: RoomManager;
private redisManager: RedisManager;
private presenceExpirationEventsEnabled: boolean;
private getExpiredEventsPattern(): string {
const dbIndex = (this.redis as any).options?.db ?? 0;
return `__keyevent@${dbIndex}__:expired`;
}
private readonly PRESENCE_KEY_PATTERN = /^mesh:presence:room:(.+):conn:(.+)$/;
private trackedRooms: ChannelPattern[] = []; private trackedRooms: ChannelPattern[] = [];
private roomGuards: Map< private roomGuards: Map<
ChannelPattern, ChannelPattern,
@ -15,9 +25,51 @@ export class PresenceManager {
private roomTTLs: Map<ChannelPattern, number> = new Map(); private roomTTLs: Map<ChannelPattern, number> = new Map();
private defaultTTL = 30_000; // 30 seconds default TTL private defaultTTL = 30_000; // 30 seconds default TTL
constructor(redis: Redis, roomManager: RoomManager) { constructor(
redis: Redis,
roomManager: RoomManager,
redisManager: RedisManager,
enableExpirationEvents: boolean = true
) {
this.redis = redis; this.redis = redis;
this.roomManager = roomManager; this.roomManager = roomManager;
this.redisManager = redisManager;
this.presenceExpirationEventsEnabled = enableExpirationEvents;
if (this.presenceExpirationEventsEnabled) {
this.subscribeToExpirationEvents();
}
}
/**
* Subscribes to Redis keyspace notifications for expired presence keys
*/
private subscribeToExpirationEvents(): void {
const { subClient } = this.redisManager;
const pattern = this.getExpiredEventsPattern();
subClient.psubscribe(pattern);
subClient.on("pmessage", (pattern, channel, key) => {
if (this.PRESENCE_KEY_PATTERN.test(key)) {
this.handleExpiredKey(key);
}
});
}
/**
* Handles an expired key notification
*/
private async handleExpiredKey(key: string): Promise<void> {
try {
const match = key.match(this.PRESENCE_KEY_PATTERN);
if (match && match[1] && match[2]) {
const roomName = match[1];
const connectionId = match[2];
await this.markOffline(connectionId, roomName);
}
} catch (err) {
console.error("[PresenceManager] Failed to handle expired key:", err);
}
} }
trackRoom( trackRoom(
@ -113,7 +165,8 @@ export class PresenceManager {
const pipeline = this.redis.pipeline(); const pipeline = this.redis.pipeline();
pipeline.sadd(roomKey, connectionId); pipeline.sadd(roomKey, connectionId);
pipeline.set(connKey, "", "EX", Math.floor(ttl / 1000)); const ttlSeconds = Math.max(1, Math.floor(ttl / 1000));
pipeline.set(connKey, "", "EX", ttlSeconds);
await pipeline.exec(); await pipeline.exec();
await this.publishPresenceUpdate(roomName, connectionId, "join"); await this.publishPresenceUpdate(roomName, connectionId, "join");
@ -134,8 +187,8 @@ export class PresenceManager {
async refreshPresence(connectionId: string, roomName: string): Promise<void> { async refreshPresence(connectionId: string, roomName: string): Promise<void> {
const connKey = this.presenceConnectionKey(roomName, connectionId); const connKey = this.presenceConnectionKey(roomName, connectionId);
const ttl = this.getRoomTTL(roomName); const ttl = this.getRoomTTL(roomName);
const ttlSeconds = Math.max(1, Math.floor(ttl / 1000));
await this.redis.set(connKey, "", "EX", Math.floor(ttl / 1000)); await this.redis.set(connKey, "", "EX", ttlSeconds);
} }
async getPresentConnections(roomName: string): Promise<string[]> { async getPresentConnections(roomName: string): Promise<string[]> {

View File

@ -117,4 +117,26 @@ export class RedisManager {
set isShuttingDown(value: boolean) { set isShuttingDown(value: boolean) {
this._isShuttingDown = value; this._isShuttingDown = value;
} }
/**
* Enables Redis keyspace notifications for expired events by updating the
* "notify-keyspace-events" configuration. Ensures that both keyevent ('E')
* and expired event ('x') notifications are enabled. If they are not already
* present, the method appends them to the current configuration.
*
* @returns {Promise<void>} A promise that resolves when the configuration has been updated.
* @throws {Error} If the Redis CONFIG commands fail or the connection encounters an error.
*/
async enableKeyspaceNotifications(): Promise<void> {
const result = await this.redis.config("GET", "notify-keyspace-events");
const currentConfig =
Array.isArray(result) && result.length > 1 ? result[1] : "";
// add expired events notification if not already enabled
// 'E' enables keyevent notifications, 'x' enables expired events
let newConfig = currentConfig || "";
if (!newConfig.includes("E")) newConfig += "E";
if (!newConfig.includes("x")) newConfig += "x";
await this.redis.config("SET", "notify-keyspace-events", newConfig);
}
} }

View File

@ -74,8 +74,14 @@ export class MeshServer extends WebSocketServer {
); );
this.presenceManager = new PresenceManager( this.presenceManager = new PresenceManager(
this.redisManager.redis, this.redisManager.redis,
this.roomManager this.roomManager,
this.redisManager,
this.serverOptions.enablePresenceExpirationEvents
); );
if (this.serverOptions.enablePresenceExpirationEvents) {
this.redisManager.enableKeyspaceNotifications()
.catch(err => this.emit("error", new Error(`Failed to enable keyspace notifications: ${err}`)));
}
this.commandManager = new CommandManager((err) => this.emit("error", err)); this.commandManager = new CommandManager((err) => this.emit("error", err));
this.channelManager = new ChannelManager( this.channelManager = new ChannelManager(
this.redisManager.redis, this.redisManager.redis,

View File

@ -37,6 +37,14 @@ export type MeshServerOptions = ServerOptions & {
latencyInterval?: number; latencyInterval?: number;
redisOptions: RedisOptions; redisOptions: RedisOptions;
/**
* Whether to enable Redis keyspace notifications for presence expiration.
* When enabled, connections will be automatically marked as offline when their presence TTL expires.
*
* @default true
*/
enablePresenceExpirationEvents?: boolean;
/** /**
* The maximum number of consecutive ping intervals the server will wait * The maximum number of consecutive ping intervals the server will wait
* for a pong response before considering the client disconnected. * for a pong response before considering the client disconnected.

View File

@ -17,6 +17,7 @@ const createTestServer = (port: number) =>
}, },
pingInterval: 1000, pingInterval: 1000,
latencyInterval: 500, latencyInterval: 500,
enablePresenceExpirationEvents: true,
}); });
const flushRedis = async () => { const flushRedis = async () => {
@ -243,6 +244,39 @@ describe("Presence Subscription", () => {
present = await server.presenceManager.getPresentConnections(roomName); present = await server.presenceManager.getPresentConnections(roomName);
expect(present).not.toContain(connection2.id); expect(present).not.toContain(connection2.id);
}); });
test("presence is automatically cleaned up when TTL expires", async () => {
const roomName = "test:room:auto-cleanup";
const shortTTL = 1000;
const testServer = createTestServer(port + 100);
await testServer.ready();
testServer.trackPresence(roomName, { ttl: shortTTL });
const testClient = new MeshClient(`ws://localhost:${port + 100}`);
await testClient.connect();
const connections = testServer.connectionManager.getLocalConnections();
const connection = connections[0]!;
await testServer.addToRoom(roomName, connection);
let present = await testServer.presenceManager.getPresentConnections(
roomName
);
expect(present).toContain(connection.id);
// wait for more than the TTL to allow the key to expire and notification to be processed
await wait(shortTTL * 3);
// the connection should be automatically marked as offline when the key expires
present = await testServer.presenceManager.getPresentConnections(roomName);
expect(present).not.toContain(connection.id);
await testClient.close();
await testServer.close();
}, 10000);
}); });
describe("Presence Subscription (Multiple Instances)", () => { describe("Presence Subscription (Multiple Instances)", () => {