From 5a591827758ebbe9a5045355fa38a3385b5f95b4 Mon Sep 17 00:00:00 2001 From: nvms Date: Fri, 18 Apr 2025 09:22:51 -0400 Subject: [PATCH] include the record identifier in the `subscribeRecord` callback --- packages/mesh/README.md | 13 ++++-- packages/mesh/package.json | 2 +- packages/mesh/src/client/client.ts | 12 ++++-- .../src/tests/record-subscription.test.ts | 41 +++++++++++++------ 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/packages/mesh/README.md b/packages/mesh/README.md index 253373f..0043b83 100644 --- a/packages/mesh/README.md +++ b/packages/mesh/README.md @@ -308,8 +308,12 @@ let userProfile = {}; const { success, record, version } = await client.subscribeRecord( "user:123", (update) => { + // update contains { recordId, full, version } userProfile = update.full; - console.log(`Received full update v${update.version}:`, update.full); + console.log( + `Received full update for ${update.recordId} v${update.version}:`, + update.full + ); } ); @@ -330,14 +334,17 @@ let productData = {}; const { success, record, version } = await client.subscribeRecord( "product:456", (update) => { + // update contains { recordId, patch?, full?, version } if (update.patch) { // normally you’ll receive `patch`, but if the client falls out of sync, // the server will send a full update instead to resynchronize. applyPatch(productData, update.patch); - console.log(`Applied patch v${update.version}`); + console.log(`Applied patch for ${update.recordId} v${update.version}`); } else { productData = update.full; - console.log(`Received full (resync) v${update.version}`); + console.log( + `Received full (resync) for ${update.recordId} v${update.version}` + ); } }, { mode: "patch" } diff --git a/packages/mesh/package.json b/packages/mesh/package.json index d79c8f0..5d5faad 100644 --- a/packages/mesh/package.json +++ b/packages/mesh/package.json @@ -1,6 +1,6 @@ { "name": "@prsm/mesh", - "version": "1.0.2", + "version": "1.0.3", "type": "module", "exports": { "./server": { diff --git a/packages/mesh/src/client/client.ts b/packages/mesh/src/client/client.ts index 3795bc3..6c3a26f 100644 --- a/packages/mesh/src/client/client.ts +++ b/packages/mesh/src/client/client.ts @@ -66,6 +66,7 @@ export class MeshClient extends EventEmitter { string, // recordId { callback: (update: { + recordId: string; full?: any; patch?: Operation[]; version: number; @@ -409,14 +410,14 @@ export class MeshClient extends EventEmitter { } subscription.localVersion = version; - await subscription.callback({ patch, version }); + await subscription.callback({ recordId, patch, version }); return; } if (full !== undefined) { subscription.localVersion = version; - await subscription.callback({ full, version }); + await subscription.callback({ recordId, full, version }); } } @@ -484,6 +485,7 @@ export class MeshClient extends EventEmitter { async subscribeRecord( recordId: string, callback: (update: { + recordId: string; full?: any; patch?: Operation[]; version: number; @@ -502,7 +504,11 @@ export class MeshClient extends EventEmitter { mode, }); // Immediately call callback with the initial full record - await callback({ full: result.record, version: result.version }); + await callback({ + recordId, + full: result.record, + version: result.version, + }); } return { diff --git a/packages/mesh/src/tests/record-subscription.test.ts b/packages/mesh/src/tests/record-subscription.test.ts index 1fc13e6..2f53343 100644 --- a/packages/mesh/src/tests/record-subscription.test.ts +++ b/packages/mesh/src/tests/record-subscription.test.ts @@ -70,7 +70,11 @@ describe("Record Subscription", () => { // callback is called once initially with the full record expect(callback).toHaveBeenCalledTimes(1); - expect(callback).toHaveBeenCalledWith({ full: initialData, version: 1 }); + expect(callback).toHaveBeenCalledWith({ + recordId, + full: initialData, + version: 1, + }); }); test("client cannot subscribe to an unexposed record", async () => { @@ -109,6 +113,11 @@ describe("Record Subscription", () => { expect(result1.version).toBe(0); // nothing published yet expect(result1.record).toBeNull(); expect(callback1).toHaveBeenCalledTimes(1); // initial call with null + expect(callback1).toHaveBeenCalledWith({ + recordId: "guarded:record", + full: null, + version: 0, + }); expect(result2.success).toBe(false); expect(result2.version).toBe(0); @@ -136,9 +145,9 @@ describe("Record Subscription", () => { await wait(50); expect(updates.length).toBe(3); // initial + 2 updates - expect(updates[0]).toEqual({ full: null, version: 0 }); - expect(updates[1]).toEqual({ full: data1, version: 1 }); - expect(updates[2]).toEqual({ full: data2, version: 2 }); + expect(updates[0]).toEqual({ recordId, full: null, version: 0 }); + expect(updates[1]).toEqual({ recordId, full: data1, version: 1 }); + expect(updates[2]).toEqual({ recordId, full: data2, version: 2 }); }); test("client receives patch updates when mode is 'patch'", async () => { @@ -165,16 +174,19 @@ describe("Record Subscription", () => { await wait(50); expect(updates.length).toBe(4); - expect(updates[0]).toEqual({ full: null, version: 0 }); + expect(updates[0]).toEqual({ recordId, full: null, version: 0 }); expect(updates[1]).toEqual({ + recordId, patch: [{ op: "add", path: "/count", value: 1 }], version: 1, }); expect(updates[2]).toEqual({ + recordId, patch: [{ op: "add", path: "/name", value: "added" }], version: 2, }); expect(updates[3]).toEqual({ + recordId, patch: [{ op: "remove", path: "/count" }], version: 3, }); @@ -207,18 +219,20 @@ describe("Record Subscription", () => { // client 1 wants full updates expect(updates1.length).toBe(3); - expect(updates1[0]).toEqual({ full: null, version: 0 }); - expect(updates1[1]).toEqual({ full: data1, version: 1 }); - expect(updates1[2]).toEqual({ full: data2, version: 2 }); + expect(updates1[0]).toEqual({ recordId, full: null, version: 0 }); + expect(updates1[1]).toEqual({ recordId, full: data1, version: 1 }); + expect(updates1[2]).toEqual({ recordId, full: data2, version: 2 }); // client 2 wants patches expect(updates2.length).toBe(3); - expect(updates2[0]).toEqual({ full: null, version: 0 }); + expect(updates2[0]).toEqual({ recordId, full: null, version: 0 }); expect(updates2[1]).toEqual({ + recordId, patch: [{ op: "add", path: "/value", value: "a" }], version: 1, }); expect(updates2[2]).toEqual({ + recordId, patch: [{ op: "replace", path: "/value", value: "b" }], version: 2, }); @@ -245,8 +259,8 @@ describe("Record Subscription", () => { await wait(50); expect(updates.length).toBe(2); - expect(updates[0]).toEqual({ full: null, version: 0 }); - expect(updates[1]).toEqual({ full: { count: 1 }, version: 1 }); + expect(updates[0]).toEqual({ recordId, full: null, version: 0 }); + expect(updates[1]).toEqual({ recordId, full: { count: 1 }, version: 1 }); }); test("desync detection triggers resubscribe (patch mode)", async () => { @@ -283,13 +297,14 @@ describe("Record Subscription", () => { await wait(100); // allocate time for desync handling expect(callback).toHaveBeenCalledTimes(3); // v0, v1, v4 - expect(updates[0]).toEqual({ full: null, version: 0 }); + expect(updates[0]).toEqual({ recordId, full: null, version: 0 }); expect(updates[1]).toEqual({ + recordId, patch: [{ op: "add", path: "/count", value: 1 }], version: 1, }); // third call is the full record after resync - expect(updates[2]).toEqual({ full: data4, version: 4 }); + expect(updates[2]).toEqual({ recordId, full: data4, version: 4 }); // verify unsubscribe and subscribe were called for resync expect(commandSpy).toHaveBeenCalledWith(