diff --git a/packages/mesh/src/tests/record-subscription.test.ts b/packages/mesh/src/tests/record-subscription.test.ts index 3777677..88cbb7b 100644 --- a/packages/mesh/src/tests/record-subscription.test.ts +++ b/packages/mesh/src/tests/record-subscription.test.ts @@ -531,3 +531,196 @@ describe("Record Subscription", () => { expect(serverState.version).toBe(2); }); }); + +describe("Record Subscription (Multiple Instances)", () => { + let serverA: MeshServer; + let serverB: MeshServer; + let clientA: MeshClient; + let clientB: MeshClient; + + const portA = 8131; + const portB = 8132; + const recordId = "test:record:multi-instance"; + const writableRecordId = "writable:record:multi-instance"; + + beforeEach(async () => { + await flushRedis(); + + serverA = createTestServer(portA); + serverB = createTestServer(portB); + + [serverA, serverB].forEach((server) => { + server.exposeRecord(/^test:record:.*/); + server.exposeWritableRecord(/^writable:record:.*/); + }); + + await serverA.ready(); + await serverB.ready(); + + clientA = new MeshClient(`ws://localhost:${portA}`); + clientB = new MeshClient(`ws://localhost:${portB}`); + }); + + afterEach(async () => { + await clientA.close(); + await clientB.close(); + await serverA.close(); + await serverB.close(); + }); + + test("server-published update propagates across instances (full mode)", async () => { + await clientA.connect(); + await clientB.connect(); + + const callbackA = vi.fn(); + const callbackB = vi.fn(); + + await clientA.subscribeRecord(recordId, callbackA); + await clientB.subscribeRecord(recordId, callbackB); + + await wait(50); + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackB).toHaveBeenCalledTimes(1); + expect(callbackA).toHaveBeenCalledWith({ + recordId, + full: null, + version: 0, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId, + full: null, + version: 0, + }); + + const data1 = { value: "update1" }; + // server A publishes, should reach both clients + await serverA.publishRecordUpdate(recordId, data1); + + await wait(150); + + expect(callbackA).toHaveBeenCalledTimes(2); + expect(callbackB).toHaveBeenCalledTimes(2); + expect(callbackA).toHaveBeenCalledWith({ + recordId, + full: data1, + version: 1, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId, + full: data1, + version: 1, + }); + }, 10000); + + test("client-published update propagates across instances (full mode)", async () => { + await clientA.connect(); + await clientB.connect(); + + const callbackA = vi.fn(); + const callbackB = vi.fn(); + + await clientA.subscribeRecord(writableRecordId, callbackA); + await clientB.subscribeRecord(writableRecordId, callbackB); + + await wait(50); + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackB).toHaveBeenCalledTimes(1); + + const data1 = { message: "hello from client A" }; + // client A writes, should propagate to client B via server B + const writeSuccess = await clientA.publishRecordUpdate( + writableRecordId, + data1 + ); + expect(writeSuccess).toBe(true); + + await wait(150); + + expect(callbackA).toHaveBeenCalledTimes(2); + expect(callbackB).toHaveBeenCalledTimes(2); + expect(callbackA).toHaveBeenCalledWith({ + recordId: writableRecordId, + full: data1, + version: 1, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId: writableRecordId, + full: data1, + version: 1, + }); + }, 10000); + + test("client-published update propagates across instances (patch mode)", async () => { + await clientA.connect(); + await clientB.connect(); + + const callbackA = vi.fn(); + const callbackB = vi.fn(); + + await clientA.subscribeRecord(writableRecordId, callbackA, { + mode: "patch", + }); + await clientB.subscribeRecord(writableRecordId, callbackB, { + mode: "patch", + }); + + await wait(50); + expect(callbackA).toHaveBeenCalledTimes(1); + expect(callbackB).toHaveBeenCalledTimes(1); + expect(callbackA).toHaveBeenCalledWith({ + recordId: writableRecordId, + full: null, + version: 0, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId: writableRecordId, + full: null, + version: 0, + }); + + const data1 = { count: 1 }; + // client A writes first update + let writeSuccess = await clientA.publishRecordUpdate( + writableRecordId, + data1 + ); + expect(writeSuccess).toBe(true); + + await wait(150); + expect(callbackA).toHaveBeenCalledTimes(2); + expect(callbackB).toHaveBeenCalledTimes(2); + + const patch1 = [{ op: "add", path: "/count", value: 1 }]; + expect(callbackA).toHaveBeenCalledWith({ + recordId: writableRecordId, + patch: patch1, + version: 1, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId: writableRecordId, + patch: patch1, + version: 1, + }); + + const data2 = { count: 1, name: "added" }; + // client A writes second update + writeSuccess = await clientA.publishRecordUpdate(writableRecordId, data2); + expect(writeSuccess).toBe(true); + + await wait(150); + expect(callbackA).toHaveBeenCalledTimes(3); + expect(callbackB).toHaveBeenCalledTimes(3); + + const patch2 = [{ op: "add", path: "/name", value: "added" }]; + expect(callbackA).toHaveBeenCalledWith({ + recordId: writableRecordId, + patch: patch2, + version: 2, + }); + expect(callbackB).toHaveBeenCalledWith({ + recordId: writableRecordId, + patch: patch2, + version: 2, + }); + }, 10000); +});