mirror of
https://github.com/nvms/prsm.git
synced 2025-12-16 08:00:53 +00:00
add some record subscription tests
This commit is contained in:
parent
bbd48020de
commit
c6cb0da27c
@ -531,3 +531,196 @@ describe("Record Subscription", () => {
|
|||||||
expect(serverState.version).toBe(2);
|
expect(serverState.version).toBe(2);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("Record Subscription (Multiple Instances)", () => {
|
||||||
|
let serverA: MeshServer;
|
||||||
|
let serverB: MeshServer;
|
||||||
|
let clientA: MeshClient;
|
||||||
|
let clientB: MeshClient;
|
||||||
|
|
||||||
|
const portA = 8131;
|
||||||
|
const portB = 8132;
|
||||||
|
const recordId = "test:record:multi-instance";
|
||||||
|
const writableRecordId = "writable:record:multi-instance";
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await flushRedis();
|
||||||
|
|
||||||
|
serverA = createTestServer(portA);
|
||||||
|
serverB = createTestServer(portB);
|
||||||
|
|
||||||
|
[serverA, serverB].forEach((server) => {
|
||||||
|
server.exposeRecord(/^test:record:.*/);
|
||||||
|
server.exposeWritableRecord(/^writable:record:.*/);
|
||||||
|
});
|
||||||
|
|
||||||
|
await serverA.ready();
|
||||||
|
await serverB.ready();
|
||||||
|
|
||||||
|
clientA = new MeshClient(`ws://localhost:${portA}`);
|
||||||
|
clientB = new MeshClient(`ws://localhost:${portB}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await clientA.close();
|
||||||
|
await clientB.close();
|
||||||
|
await serverA.close();
|
||||||
|
await serverB.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
test("server-published update propagates across instances (full mode)", async () => {
|
||||||
|
await clientA.connect();
|
||||||
|
await clientB.connect();
|
||||||
|
|
||||||
|
const callbackA = vi.fn();
|
||||||
|
const callbackB = vi.fn();
|
||||||
|
|
||||||
|
await clientA.subscribeRecord(recordId, callbackA);
|
||||||
|
await clientB.subscribeRecord(recordId, callbackB);
|
||||||
|
|
||||||
|
await wait(50);
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(1);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(1);
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId,
|
||||||
|
full: null,
|
||||||
|
version: 0,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId,
|
||||||
|
full: null,
|
||||||
|
version: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
const data1 = { value: "update1" };
|
||||||
|
// server A publishes, should reach both clients
|
||||||
|
await serverA.publishRecordUpdate(recordId, data1);
|
||||||
|
|
||||||
|
await wait(150);
|
||||||
|
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(2);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(2);
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId,
|
||||||
|
full: data1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId,
|
||||||
|
full: data1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
}, 10000);
|
||||||
|
|
||||||
|
test("client-published update propagates across instances (full mode)", async () => {
|
||||||
|
await clientA.connect();
|
||||||
|
await clientB.connect();
|
||||||
|
|
||||||
|
const callbackA = vi.fn();
|
||||||
|
const callbackB = vi.fn();
|
||||||
|
|
||||||
|
await clientA.subscribeRecord(writableRecordId, callbackA);
|
||||||
|
await clientB.subscribeRecord(writableRecordId, callbackB);
|
||||||
|
|
||||||
|
await wait(50);
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(1);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
const data1 = { message: "hello from client A" };
|
||||||
|
// client A writes, should propagate to client B via server B
|
||||||
|
const writeSuccess = await clientA.publishRecordUpdate(
|
||||||
|
writableRecordId,
|
||||||
|
data1
|
||||||
|
);
|
||||||
|
expect(writeSuccess).toBe(true);
|
||||||
|
|
||||||
|
await wait(150);
|
||||||
|
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(2);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(2);
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
full: data1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
full: data1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
}, 10000);
|
||||||
|
|
||||||
|
test("client-published update propagates across instances (patch mode)", async () => {
|
||||||
|
await clientA.connect();
|
||||||
|
await clientB.connect();
|
||||||
|
|
||||||
|
const callbackA = vi.fn();
|
||||||
|
const callbackB = vi.fn();
|
||||||
|
|
||||||
|
await clientA.subscribeRecord(writableRecordId, callbackA, {
|
||||||
|
mode: "patch",
|
||||||
|
});
|
||||||
|
await clientB.subscribeRecord(writableRecordId, callbackB, {
|
||||||
|
mode: "patch",
|
||||||
|
});
|
||||||
|
|
||||||
|
await wait(50);
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(1);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(1);
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
full: null,
|
||||||
|
version: 0,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
full: null,
|
||||||
|
version: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
const data1 = { count: 1 };
|
||||||
|
// client A writes first update
|
||||||
|
let writeSuccess = await clientA.publishRecordUpdate(
|
||||||
|
writableRecordId,
|
||||||
|
data1
|
||||||
|
);
|
||||||
|
expect(writeSuccess).toBe(true);
|
||||||
|
|
||||||
|
await wait(150);
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(2);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
const patch1 = [{ op: "add", path: "/count", value: 1 }];
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
patch: patch1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
patch: patch1,
|
||||||
|
version: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
const data2 = { count: 1, name: "added" };
|
||||||
|
// client A writes second update
|
||||||
|
writeSuccess = await clientA.publishRecordUpdate(writableRecordId, data2);
|
||||||
|
expect(writeSuccess).toBe(true);
|
||||||
|
|
||||||
|
await wait(150);
|
||||||
|
expect(callbackA).toHaveBeenCalledTimes(3);
|
||||||
|
expect(callbackB).toHaveBeenCalledTimes(3);
|
||||||
|
|
||||||
|
const patch2 = [{ op: "add", path: "/name", value: "added" }];
|
||||||
|
expect(callbackA).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
patch: patch2,
|
||||||
|
version: 2,
|
||||||
|
});
|
||||||
|
expect(callbackB).toHaveBeenCalledWith({
|
||||||
|
recordId: writableRecordId,
|
||||||
|
patch: patch2,
|
||||||
|
version: 2,
|
||||||
|
});
|
||||||
|
}, 10000);
|
||||||
|
});
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user