Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions
Redis Streams pub/sub (XADD/XREAD BLOCK) with Last-Event-ID replay, bounded backpressure queue, 25s heartbeat, and full retry/abort handling. Publishes RealtimeEvents from Baserow webhook handler after cache invalidation. 380 tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
476 lines
16 KiB
TypeScript
476 lines
16 KiB
TypeScript
/**
|
|
* Tests event-bus — Redis Streams publish/subscribe.
|
|
*
|
|
* Stratégie : testcontainers Redis 7-alpine pour les tests d'intégration
|
|
* (publish, subscribe, replay Last-Event-ID). Les tests unitaires (parseStreamEntry,
|
|
* passesFilter) sont purement en mémoire sans Redis.
|
|
*/
|
|
|
|
import IORedis, { type Redis } from 'ioredis';
|
|
import pino from 'pino';
|
|
import { GenericContainer, type StartedTestContainer } from 'testcontainers';
|
|
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from 'vitest';
|
|
import type { RealtimeEvent } from '../../src/domain/event.js';
|
|
import {
|
|
STREAM_KEY,
|
|
parseStreamEntry,
|
|
passesFilter,
|
|
publishEvent,
|
|
subscribeToStream,
|
|
} from '../../src/lib/event-bus.js';
|
|
|
|
const silentLogger = pino({ level: 'silent' });
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Unitaires (pas de Redis)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
describe('parseStreamEntry', () => {
|
|
it('parse un entry rows.created minimal', () => {
|
|
const fields = ['type', 'row.created', 'tableId', '42', 'rowId', '100'];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
expect(result).toEqual({ type: 'row.created', tableId: 42, rowId: 100 });
|
|
});
|
|
|
|
it('parse un entry avec viewId', () => {
|
|
const fields = ['type', 'view.updated', 'tableId', '5', 'viewId', '200'];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
expect(result).toEqual({ type: 'view.updated', tableId: 5, viewId: 200 });
|
|
});
|
|
|
|
it('parse un entry avec row JSON', () => {
|
|
const row = { name: 'Alice', age: 30 };
|
|
const fields = [
|
|
'type',
|
|
'row.updated',
|
|
'tableId',
|
|
'7',
|
|
'rowId',
|
|
'1',
|
|
'row',
|
|
JSON.stringify(row),
|
|
];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
expect(result?.row).toEqual(row);
|
|
});
|
|
|
|
it('retourne null sur type inconnu', () => {
|
|
const fields = ['type', 'unknown.event', 'tableId', '1'];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
expect(result).toBeNull();
|
|
});
|
|
|
|
it('retourne null si tableId manquant', () => {
|
|
const fields = ['type', 'row.created'];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
expect(result).toBeNull();
|
|
});
|
|
|
|
it('ignore row JSON invalide et livre l event sans row', () => {
|
|
const fields = ['type', 'row.created', 'tableId', '1', 'rowId', '2', 'row', 'NOT_JSON'];
|
|
const result = parseStreamEntry(fields, silentLogger);
|
|
// L'event est livré sans le payload row malformé.
|
|
expect(result).not.toBeNull();
|
|
expect(result?.row).toBeUndefined();
|
|
expect(result?.type).toBe('row.created');
|
|
});
|
|
});
|
|
|
|
describe('passesFilter', () => {
|
|
const event: RealtimeEvent = { type: 'row.created', tableId: 10, viewId: 20, rowId: 5 };
|
|
|
|
it('filtre vide -> passe toujours', () => {
|
|
expect(passesFilter(event, {})).toBe(true);
|
|
expect(passesFilter(event, { tableIds: new Set(), viewIds: new Set() })).toBe(true);
|
|
});
|
|
|
|
it('filtre tableIds match -> passe', () => {
|
|
expect(passesFilter(event, { tableIds: new Set([10]) })).toBe(true);
|
|
});
|
|
|
|
it('filtre tableIds no match -> rejecte', () => {
|
|
expect(passesFilter(event, { tableIds: new Set([99]) })).toBe(false);
|
|
});
|
|
|
|
it('filtre viewIds match -> passe', () => {
|
|
expect(passesFilter(event, { viewIds: new Set([20]) })).toBe(true);
|
|
});
|
|
|
|
it('filtre viewIds no match -> rejecte', () => {
|
|
expect(passesFilter(event, { viewIds: new Set([99]) })).toBe(false);
|
|
});
|
|
|
|
it('event sans viewId rejecte sur filtre viewIds strict', () => {
|
|
const noView: RealtimeEvent = { type: 'row.created', tableId: 10 };
|
|
expect(passesFilter(noView, { viewIds: new Set([20]) })).toBe(false);
|
|
});
|
|
|
|
it('filtre tableIds + viewIds combinés -> passe ssi les deux matchent', () => {
|
|
expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([20]) })).toBe(true);
|
|
expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([99]) })).toBe(false);
|
|
expect(passesFilter(event, { tableIds: new Set([99]), viewIds: new Set([20]) })).toBe(false);
|
|
});
|
|
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Unitaires avec fake Redis (retry logic, xrevrange failure)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
describe('subscribeToStream — retry logic (fake Redis)', () => {
|
|
it('retente sur erreur XREAD transitoire et livre les events ensuite', async () => {
|
|
// Premier xread lance une erreur, second retourne un event.
|
|
const fields = ['type', 'row.created', 'tableId', '7', 'rowId', '1'];
|
|
let callCount = 0;
|
|
const received: Array<{ id: string; event: RealtimeEvent }> = [];
|
|
const ac = new AbortController();
|
|
|
|
const fakeRedis = {
|
|
xrevrange: vi.fn().mockResolvedValue([['999-0', []]]),
|
|
xread: vi.fn().mockImplementation(async () => {
|
|
callCount++;
|
|
if (callCount === 1) throw new Error('ECONNRESET');
|
|
// Deuxième appel : retourne l'event (on aborts depuis onEvent après réception).
|
|
return [['events:stream', [['1000-0', fields]]]];
|
|
}),
|
|
} as unknown as Redis;
|
|
|
|
await subscribeToStream({
|
|
redis: fakeRedis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async (id, ev) => {
|
|
received.push({ id, event: ev });
|
|
// Abort après avoir reçu l'event pour sortir de la boucle.
|
|
ac.abort();
|
|
},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
maxRetries: 3,
|
|
blockMs: 50,
|
|
});
|
|
|
|
// callCount >= 2 : au moins un retry a eu lieu.
|
|
expect(callCount).toBeGreaterThanOrEqual(2);
|
|
expect(received).toHaveLength(1);
|
|
expect(received[0].event.type).toBe('row.created');
|
|
}, 10_000);
|
|
|
|
it('throw si max retries depasse', async () => {
|
|
const ac = new AbortController();
|
|
const fakeRedis = {
|
|
xrevrange: vi.fn().mockResolvedValue([]),
|
|
xread: vi.fn().mockRejectedValue(new Error('Redis unavailable')),
|
|
} as unknown as Redis;
|
|
|
|
await expect(
|
|
subscribeToStream({
|
|
redis: fakeRedis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async () => {},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
maxRetries: 1,
|
|
blockMs: 10,
|
|
}),
|
|
).rejects.toThrow('Redis unavailable');
|
|
}, 10_000);
|
|
|
|
it('sort proprement si le signal est abort avant le premier XREAD', async () => {
|
|
const ac = new AbortController();
|
|
// Abort avant que subscribeToStream ne commence.
|
|
ac.abort();
|
|
const fakeRedis = {
|
|
xrevrange: vi.fn().mockResolvedValue([]),
|
|
xread: vi.fn(),
|
|
} as unknown as Redis;
|
|
|
|
await expect(
|
|
subscribeToStream({
|
|
redis: fakeRedis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async () => {},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
blockMs: 10,
|
|
}),
|
|
).resolves.toBeUndefined();
|
|
|
|
// xread ne doit jamais avoir été appelé.
|
|
expect(fakeRedis.xread).not.toHaveBeenCalled();
|
|
}, 5_000);
|
|
|
|
it('sort proprement si le signal est abort pendant le retry backoff', async () => {
|
|
let callCount = 0;
|
|
const ac = new AbortController();
|
|
const fakeRedis = {
|
|
xrevrange: vi.fn().mockResolvedValue([]),
|
|
xread: vi.fn().mockImplementation(async () => {
|
|
callCount++;
|
|
// Abort le signal pendant le backoff pour couvrir la branche abort dans le catch.
|
|
setTimeout(() => ac.abort(), 10);
|
|
throw new Error('ECONNRESET during retry');
|
|
}),
|
|
} as unknown as Redis;
|
|
|
|
await expect(
|
|
subscribeToStream({
|
|
redis: fakeRedis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async () => {},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
maxRetries: 5,
|
|
blockMs: 10,
|
|
}),
|
|
).resolves.toBeUndefined();
|
|
|
|
// Exactement 1 XREAD avant l'abort (abort coupe le backoff et sort de la boucle).
|
|
expect(callCount).toBe(1);
|
|
}, 10_000);
|
|
|
|
it('fromId null avec xrevrange qui throw utilise Date.now() comme cursor', async () => {
|
|
const ac = new AbortController();
|
|
const beforeNow = Date.now();
|
|
|
|
const fakeRedis = {
|
|
// xrevrange lance une erreur — couvre le catch lines 182-185.
|
|
xrevrange: vi.fn().mockRejectedValue(new Error('xrevrange failed')),
|
|
xread: vi.fn().mockImplementation(async () => {
|
|
// On vérifie que subscribeToStream a bien initialisé un cursor proche de now.
|
|
ac.abort();
|
|
return null;
|
|
}),
|
|
} as unknown as Redis;
|
|
|
|
await subscribeToStream({
|
|
redis: fakeRedis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async () => {},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
blockMs: 10,
|
|
});
|
|
|
|
const afterNow = Date.now();
|
|
// xread a bien été appelé (curseur initialisé), avec un argument cursor ~ Date.now().
|
|
expect(fakeRedis.xread).toHaveBeenCalledOnce();
|
|
const xreadArgs = vi.mocked(fakeRedis.xread).mock.calls[0] as unknown[];
|
|
// Args : 'COUNT', 100, 'BLOCK', blockMs, 'STREAMS', STREAM_KEY, cursor
|
|
const cursor = String(xreadArgs[xreadArgs.length - 1]);
|
|
const ts = Number(cursor.split('-')[0]);
|
|
expect(ts).toBeGreaterThanOrEqual(beforeNow);
|
|
expect(ts).toBeLessThanOrEqual(afterNow + 100);
|
|
}, 5_000);
|
|
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Integration (testcontainers Redis)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
describe('event-bus integration (Redis Streams)', () => {
|
|
let container: StartedTestContainer;
|
|
let redis: Redis;
|
|
|
|
beforeAll(async () => {
|
|
container = await new GenericContainer('redis:7-alpine').withExposedPorts(6379).start();
|
|
const host = container.getHost();
|
|
const port = container.getMappedPort(6379);
|
|
redis = new IORedis(`redis://${host}:${port}`, {
|
|
lazyConnect: false,
|
|
maxRetriesPerRequest: 3,
|
|
});
|
|
await new Promise<void>((resolve, reject) => {
|
|
const timer = setTimeout(() => reject(new Error('redis ready timeout')), 10_000);
|
|
if (redis.status === 'ready') {
|
|
clearTimeout(timer);
|
|
resolve();
|
|
return;
|
|
}
|
|
redis.once('ready', () => {
|
|
clearTimeout(timer);
|
|
resolve();
|
|
});
|
|
});
|
|
}, 60_000);
|
|
|
|
afterAll(async () => {
|
|
await redis?.quit();
|
|
await container?.stop();
|
|
}, 30_000);
|
|
|
|
afterEach(async () => {
|
|
await redis.del(STREAM_KEY);
|
|
});
|
|
|
|
it('publishEvent ajoute une entry au stream et retourne un ID', async () => {
|
|
const event: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 42 };
|
|
const id = await publishEvent(redis, event);
|
|
expect(id).toMatch(/^\d+-\d+$/);
|
|
|
|
// Vérifie que l'entry est bien dans le stream.
|
|
const entries = await redis.xrange(STREAM_KEY, '-', '+');
|
|
expect(entries).toHaveLength(1);
|
|
expect(entries[0][0]).toBe(id);
|
|
});
|
|
|
|
it('publishEvent sérialise les champs optionnels', async () => {
|
|
const row = { field1: 'hello', num: 42 };
|
|
const event: RealtimeEvent = {
|
|
type: 'row.updated',
|
|
tableId: 5,
|
|
viewId: 10,
|
|
rowId: 99,
|
|
row,
|
|
};
|
|
const id = await publishEvent(redis, event);
|
|
const entries = await redis.xrange(STREAM_KEY, '-', '+');
|
|
expect(entries).toHaveLength(1);
|
|
|
|
// Reconstruit le hash depuis les fields.
|
|
const fields = entries[0][1];
|
|
const map: Record<string, string> = {};
|
|
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
|
|
expect(map.type).toBe('row.updated');
|
|
expect(map.tableId).toBe('5');
|
|
expect(map.viewId).toBe('10');
|
|
expect(map.rowId).toBe('99');
|
|
expect(JSON.parse(map.row)).toEqual(row);
|
|
|
|
// Vérifie que parseStreamEntry round-trip fonctionne.
|
|
const parsed = parseStreamEntry(fields, silentLogger);
|
|
expect(parsed).toEqual({ type: 'row.updated', tableId: 5, viewId: 10, rowId: 99, row });
|
|
|
|
void id; // Utilisé pour la vérification ci-dessus.
|
|
});
|
|
|
|
it('MAXLEN borne le stream (approximatif)', async () => {
|
|
const maxLen = 100;
|
|
// Publie 500 events pour déclencher le trimming.
|
|
for (let i = 0; i < 500; i++) {
|
|
await publishEvent(redis, { type: 'row.created', tableId: i + 1 }, maxLen);
|
|
}
|
|
const count = await redis.xlen(STREAM_KEY);
|
|
// MAXLEN ~ est approximatif — Redis conserve maxLen + quelques entries (radix node boundary).
|
|
// En pratique, on vérifie que le stream est significativement plus petit que 500.
|
|
expect(count).toBeLessThan(300);
|
|
});
|
|
|
|
it('subscribeToStream livre les events publiés après la connexion', async () => {
|
|
const received: Array<{ id: string; event: RealtimeEvent }> = [];
|
|
const ac = new AbortController();
|
|
|
|
const subscribePromise = subscribeToStream({
|
|
redis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async (id, event) => {
|
|
received.push({ id, event });
|
|
if (received.length >= 2) ac.abort();
|
|
},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
// BLOCK court pour que les tests ne bloquent pas 2s par cycle.
|
|
blockMs: 200,
|
|
});
|
|
|
|
// Attend que le subscriber soit prêt (curseur initialisé + 1er XREAD lancé).
|
|
// 300ms pour absorber la latence réseau testcontainer.
|
|
await new Promise((r) => setTimeout(r, 300));
|
|
|
|
const e1: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 1 };
|
|
const e2: RealtimeEvent = { type: 'row.updated', tableId: 2, rowId: 2 };
|
|
await publishEvent(redis, e1);
|
|
await publishEvent(redis, e2);
|
|
|
|
await subscribePromise;
|
|
|
|
expect(received).toHaveLength(2);
|
|
expect(received[0].event.type).toBe('row.created');
|
|
expect(received[1].event.type).toBe('row.updated');
|
|
}, 15_000);
|
|
|
|
it('subscribeToStream filtre les events par tableIds', async () => {
|
|
const received: Array<{ id: string; event: RealtimeEvent }> = [];
|
|
const ac = new AbortController();
|
|
|
|
const subscribePromise = subscribeToStream({
|
|
redis,
|
|
filter: { tableIds: new Set([42]) },
|
|
fromId: null,
|
|
onEvent: async (id, event) => {
|
|
received.push({ id, event });
|
|
if (received.length >= 1) ac.abort();
|
|
},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
blockMs: 200,
|
|
});
|
|
|
|
await new Promise((r) => setTimeout(r, 300));
|
|
|
|
// Publie un event hors filtre et un event dans le filtre.
|
|
await publishEvent(redis, { type: 'row.created', tableId: 99, rowId: 1 });
|
|
await publishEvent(redis, { type: 'row.created', tableId: 42, rowId: 2 });
|
|
|
|
await subscribePromise;
|
|
|
|
expect(received).toHaveLength(1);
|
|
expect(received[0].event.tableId).toBe(42);
|
|
}, 15_000);
|
|
|
|
it('subscribeToStream supporte le replay Last-Event-ID', async () => {
|
|
// Publie 3 events avant la connexion.
|
|
const id1 = await publishEvent(redis, { type: 'row.created', tableId: 1, rowId: 1 });
|
|
const id2 = await publishEvent(redis, { type: 'row.updated', tableId: 1, rowId: 2 });
|
|
await publishEvent(redis, { type: 'row.deleted', tableId: 1, rowId: 3 });
|
|
|
|
// Simule une reconnexion depuis id1 : on doit recevoir id2 et id3.
|
|
const received: Array<{ id: string; event: RealtimeEvent }> = [];
|
|
const ac = new AbortController();
|
|
|
|
await subscribeToStream({
|
|
redis,
|
|
filter: {},
|
|
fromId: id1, // Replay depuis id1 exclu (XREAD strict).
|
|
onEvent: async (id, event) => {
|
|
received.push({ id, event });
|
|
// id2 + id3 = 2 events.
|
|
if (received.length >= 2) ac.abort();
|
|
},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
blockMs: 200,
|
|
});
|
|
|
|
expect(received).toHaveLength(2);
|
|
expect(received[0].id).toBe(id2);
|
|
|
|
void id2; // Référencé ci-dessus.
|
|
}, 15_000);
|
|
|
|
it('subscribeToStream s arrête proprement sur abort signal', async () => {
|
|
const ac = new AbortController();
|
|
let callCount = 0;
|
|
|
|
const subscribePromise = subscribeToStream({
|
|
redis,
|
|
filter: {},
|
|
fromId: null,
|
|
onEvent: async () => {
|
|
callCount++;
|
|
},
|
|
signal: ac.signal,
|
|
logger: silentLogger,
|
|
blockMs: 200,
|
|
});
|
|
|
|
// Abort immédiat — la promesse doit résoudre sans erreur.
|
|
ac.abort();
|
|
await expect(subscribePromise).resolves.toBeUndefined();
|
|
expect(callCount).toBe(0);
|
|
}, 10_000);
|
|
});
|