/** * Tests event-bus — Redis Streams publish/subscribe. * * Stratégie : testcontainers Redis 7-alpine pour les tests d'intégration * (publish, subscribe, replay Last-Event-ID). Les tests unitaires (parseStreamEntry, * passesFilter) sont purement en mémoire sans Redis. */ import IORedis, { type Redis } from 'ioredis'; import pino from 'pino'; import { GenericContainer, type StartedTestContainer } from 'testcontainers'; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from 'vitest'; import type { RealtimeEvent } from '../../src/domain/event.js'; import { STREAM_KEY, parseStreamEntry, passesFilter, publishEvent, subscribeToStream, } from '../../src/lib/event-bus.js'; const silentLogger = pino({ level: 'silent' }); // --------------------------------------------------------------------------- // Unitaires (pas de Redis) // --------------------------------------------------------------------------- describe('parseStreamEntry', () => { it('parse un entry rows.created minimal', () => { const fields = ['type', 'row.created', 'tableId', '42', 'rowId', '100']; const result = parseStreamEntry(fields, silentLogger); expect(result).toEqual({ type: 'row.created', tableId: 42, rowId: 100 }); }); it('parse un entry avec viewId', () => { const fields = ['type', 'view.updated', 'tableId', '5', 'viewId', '200']; const result = parseStreamEntry(fields, silentLogger); expect(result).toEqual({ type: 'view.updated', tableId: 5, viewId: 200 }); }); it('parse un entry avec row JSON', () => { const row = { name: 'Alice', age: 30 }; const fields = [ 'type', 'row.updated', 'tableId', '7', 'rowId', '1', 'row', JSON.stringify(row), ]; const result = parseStreamEntry(fields, silentLogger); expect(result?.row).toEqual(row); }); it('retourne null sur type inconnu', () => { const fields = ['type', 'unknown.event', 'tableId', '1']; const result = parseStreamEntry(fields, silentLogger); expect(result).toBeNull(); }); it('retourne null si tableId manquant', () => { const fields = ['type', 'row.created']; const result = parseStreamEntry(fields, silentLogger); expect(result).toBeNull(); }); it('ignore row JSON invalide et livre l event sans row', () => { const fields = ['type', 'row.created', 'tableId', '1', 'rowId', '2', 'row', 'NOT_JSON']; const result = parseStreamEntry(fields, silentLogger); // L'event est livré sans le payload row malformé. expect(result).not.toBeNull(); expect(result?.row).toBeUndefined(); expect(result?.type).toBe('row.created'); }); }); describe('passesFilter', () => { const event: RealtimeEvent = { type: 'row.created', tableId: 10, viewId: 20, rowId: 5 }; it('filtre vide -> passe toujours', () => { expect(passesFilter(event, {})).toBe(true); expect(passesFilter(event, { tableIds: new Set(), viewIds: new Set() })).toBe(true); }); it('filtre tableIds match -> passe', () => { expect(passesFilter(event, { tableIds: new Set([10]) })).toBe(true); }); it('filtre tableIds no match -> rejecte', () => { expect(passesFilter(event, { tableIds: new Set([99]) })).toBe(false); }); it('filtre viewIds match -> passe', () => { expect(passesFilter(event, { viewIds: new Set([20]) })).toBe(true); }); it('filtre viewIds no match -> rejecte', () => { expect(passesFilter(event, { viewIds: new Set([99]) })).toBe(false); }); it('event sans viewId rejecte sur filtre viewIds strict', () => { const noView: RealtimeEvent = { type: 'row.created', tableId: 10 }; expect(passesFilter(noView, { viewIds: new Set([20]) })).toBe(false); }); it('filtre tableIds + viewIds combinés -> passe ssi les deux matchent', () => { expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([20]) })).toBe(true); expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([99]) })).toBe(false); expect(passesFilter(event, { tableIds: new Set([99]), viewIds: new Set([20]) })).toBe(false); }); }); // --------------------------------------------------------------------------- // Unitaires avec fake Redis (retry logic, xrevrange failure) // --------------------------------------------------------------------------- describe('subscribeToStream — retry logic (fake Redis)', () => { it('retente sur erreur XREAD transitoire et livre les events ensuite', async () => { // Premier xread lance une erreur, second retourne un event. const fields = ['type', 'row.created', 'tableId', '7', 'rowId', '1']; let callCount = 0; const received: Array<{ id: string; event: RealtimeEvent }> = []; const ac = new AbortController(); const fakeRedis = { xrevrange: vi.fn().mockResolvedValue([['999-0', []]]), xread: vi.fn().mockImplementation(async () => { callCount++; if (callCount === 1) throw new Error('ECONNRESET'); // Deuxième appel : retourne l'event (on aborts depuis onEvent après réception). return [['events:stream', [['1000-0', fields]]]]; }), } as unknown as Redis; await subscribeToStream({ redis: fakeRedis, filter: {}, fromId: null, onEvent: async (id, ev) => { received.push({ id, event: ev }); // Abort après avoir reçu l'event pour sortir de la boucle. ac.abort(); }, signal: ac.signal, logger: silentLogger, maxRetries: 3, blockMs: 50, }); // callCount >= 2 : au moins un retry a eu lieu. expect(callCount).toBeGreaterThanOrEqual(2); expect(received).toHaveLength(1); expect(received[0].event.type).toBe('row.created'); }, 10_000); it('throw si max retries depasse', async () => { const ac = new AbortController(); const fakeRedis = { xrevrange: vi.fn().mockResolvedValue([]), xread: vi.fn().mockRejectedValue(new Error('Redis unavailable')), } as unknown as Redis; await expect( subscribeToStream({ redis: fakeRedis, filter: {}, fromId: null, onEvent: async () => {}, signal: ac.signal, logger: silentLogger, maxRetries: 1, blockMs: 10, }), ).rejects.toThrow('Redis unavailable'); }, 10_000); it('sort proprement si le signal est abort avant le premier XREAD', async () => { const ac = new AbortController(); // Abort avant que subscribeToStream ne commence. ac.abort(); const fakeRedis = { xrevrange: vi.fn().mockResolvedValue([]), xread: vi.fn(), } as unknown as Redis; await expect( subscribeToStream({ redis: fakeRedis, filter: {}, fromId: null, onEvent: async () => {}, signal: ac.signal, logger: silentLogger, blockMs: 10, }), ).resolves.toBeUndefined(); // xread ne doit jamais avoir été appelé. expect(fakeRedis.xread).not.toHaveBeenCalled(); }, 5_000); it('sort proprement si le signal est abort pendant le retry backoff', async () => { let callCount = 0; const ac = new AbortController(); const fakeRedis = { xrevrange: vi.fn().mockResolvedValue([]), xread: vi.fn().mockImplementation(async () => { callCount++; // Abort le signal pendant le backoff pour couvrir la branche abort dans le catch. setTimeout(() => ac.abort(), 10); throw new Error('ECONNRESET during retry'); }), } as unknown as Redis; await expect( subscribeToStream({ redis: fakeRedis, filter: {}, fromId: null, onEvent: async () => {}, signal: ac.signal, logger: silentLogger, maxRetries: 5, blockMs: 10, }), ).resolves.toBeUndefined(); // Exactement 1 XREAD avant l'abort (abort coupe le backoff et sort de la boucle). expect(callCount).toBe(1); }, 10_000); it('fromId null avec xrevrange qui throw utilise Date.now() comme cursor', async () => { const ac = new AbortController(); const beforeNow = Date.now(); const fakeRedis = { // xrevrange lance une erreur — couvre le catch lines 182-185. xrevrange: vi.fn().mockRejectedValue(new Error('xrevrange failed')), xread: vi.fn().mockImplementation(async () => { // On vérifie que subscribeToStream a bien initialisé un cursor proche de now. ac.abort(); return null; }), } as unknown as Redis; await subscribeToStream({ redis: fakeRedis, filter: {}, fromId: null, onEvent: async () => {}, signal: ac.signal, logger: silentLogger, blockMs: 10, }); const afterNow = Date.now(); // xread a bien été appelé (curseur initialisé), avec un argument cursor ~ Date.now(). expect(fakeRedis.xread).toHaveBeenCalledOnce(); const xreadArgs = vi.mocked(fakeRedis.xread).mock.calls[0] as unknown[]; // Args : 'COUNT', 100, 'BLOCK', blockMs, 'STREAMS', STREAM_KEY, cursor const cursor = String(xreadArgs[xreadArgs.length - 1]); const ts = Number(cursor.split('-')[0]); expect(ts).toBeGreaterThanOrEqual(beforeNow); expect(ts).toBeLessThanOrEqual(afterNow + 100); }, 5_000); }); // --------------------------------------------------------------------------- // Integration (testcontainers Redis) // --------------------------------------------------------------------------- describe('event-bus integration (Redis Streams)', () => { let container: StartedTestContainer; let redis: Redis; beforeAll(async () => { container = await new GenericContainer('redis:7-alpine').withExposedPorts(6379).start(); const host = container.getHost(); const port = container.getMappedPort(6379); redis = new IORedis(`redis://${host}:${port}`, { lazyConnect: false, maxRetriesPerRequest: 3, }); await new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error('redis ready timeout')), 10_000); if (redis.status === 'ready') { clearTimeout(timer); resolve(); return; } redis.once('ready', () => { clearTimeout(timer); resolve(); }); }); }, 60_000); afterAll(async () => { await redis?.quit(); await container?.stop(); }, 30_000); afterEach(async () => { await redis.del(STREAM_KEY); }); it('publishEvent ajoute une entry au stream et retourne un ID', async () => { const event: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 42 }; const id = await publishEvent(redis, event); expect(id).toMatch(/^\d+-\d+$/); // Vérifie que l'entry est bien dans le stream. const entries = await redis.xrange(STREAM_KEY, '-', '+'); expect(entries).toHaveLength(1); expect(entries[0][0]).toBe(id); }); it('publishEvent sérialise les champs optionnels', async () => { const row = { field1: 'hello', num: 42 }; const event: RealtimeEvent = { type: 'row.updated', tableId: 5, viewId: 10, rowId: 99, row, }; const id = await publishEvent(redis, event); const entries = await redis.xrange(STREAM_KEY, '-', '+'); expect(entries).toHaveLength(1); // Reconstruit le hash depuis les fields. const fields = entries[0][1]; const map: Record = {}; for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1]; expect(map.type).toBe('row.updated'); expect(map.tableId).toBe('5'); expect(map.viewId).toBe('10'); expect(map.rowId).toBe('99'); expect(JSON.parse(map.row)).toEqual(row); // Vérifie que parseStreamEntry round-trip fonctionne. const parsed = parseStreamEntry(fields, silentLogger); expect(parsed).toEqual({ type: 'row.updated', tableId: 5, viewId: 10, rowId: 99, row }); void id; // Utilisé pour la vérification ci-dessus. }); it('MAXLEN borne le stream (approximatif)', async () => { const maxLen = 100; // Publie 500 events pour déclencher le trimming. for (let i = 0; i < 500; i++) { await publishEvent(redis, { type: 'row.created', tableId: i + 1 }, maxLen); } const count = await redis.xlen(STREAM_KEY); // MAXLEN ~ est approximatif — Redis conserve maxLen + quelques entries (radix node boundary). // En pratique, on vérifie que le stream est significativement plus petit que 500. expect(count).toBeLessThan(300); }); it('subscribeToStream livre les events publiés après la connexion', async () => { const received: Array<{ id: string; event: RealtimeEvent }> = []; const ac = new AbortController(); const subscribePromise = subscribeToStream({ redis, filter: {}, fromId: null, onEvent: async (id, event) => { received.push({ id, event }); if (received.length >= 2) ac.abort(); }, signal: ac.signal, logger: silentLogger, // BLOCK court pour que les tests ne bloquent pas 2s par cycle. blockMs: 200, }); // Attend que le subscriber soit prêt (curseur initialisé + 1er XREAD lancé). // 300ms pour absorber la latence réseau testcontainer. await new Promise((r) => setTimeout(r, 300)); const e1: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 1 }; const e2: RealtimeEvent = { type: 'row.updated', tableId: 2, rowId: 2 }; await publishEvent(redis, e1); await publishEvent(redis, e2); await subscribePromise; expect(received).toHaveLength(2); expect(received[0].event.type).toBe('row.created'); expect(received[1].event.type).toBe('row.updated'); }, 15_000); it('subscribeToStream filtre les events par tableIds', async () => { const received: Array<{ id: string; event: RealtimeEvent }> = []; const ac = new AbortController(); const subscribePromise = subscribeToStream({ redis, filter: { tableIds: new Set([42]) }, fromId: null, onEvent: async (id, event) => { received.push({ id, event }); if (received.length >= 1) ac.abort(); }, signal: ac.signal, logger: silentLogger, blockMs: 200, }); await new Promise((r) => setTimeout(r, 300)); // Publie un event hors filtre et un event dans le filtre. await publishEvent(redis, { type: 'row.created', tableId: 99, rowId: 1 }); await publishEvent(redis, { type: 'row.created', tableId: 42, rowId: 2 }); await subscribePromise; expect(received).toHaveLength(1); expect(received[0].event.tableId).toBe(42); }, 15_000); it('subscribeToStream supporte le replay Last-Event-ID', async () => { // Publie 3 events avant la connexion. const id1 = await publishEvent(redis, { type: 'row.created', tableId: 1, rowId: 1 }); const id2 = await publishEvent(redis, { type: 'row.updated', tableId: 1, rowId: 2 }); await publishEvent(redis, { type: 'row.deleted', tableId: 1, rowId: 3 }); // Simule une reconnexion depuis id1 : on doit recevoir id2 et id3. const received: Array<{ id: string; event: RealtimeEvent }> = []; const ac = new AbortController(); await subscribeToStream({ redis, filter: {}, fromId: id1, // Replay depuis id1 exclu (XREAD strict). onEvent: async (id, event) => { received.push({ id, event }); // id2 + id3 = 2 events. if (received.length >= 2) ac.abort(); }, signal: ac.signal, logger: silentLogger, blockMs: 200, }); expect(received).toHaveLength(2); expect(received[0].id).toBe(id2); void id2; // Référencé ci-dessus. }, 15_000); it('subscribeToStream s arrête proprement sur abort signal', async () => { const ac = new AbortController(); let callCount = 0; const subscribePromise = subscribeToStream({ redis, filter: {}, fromId: null, onEvent: async () => { callCount++; }, signal: ac.signal, logger: silentLogger, blockMs: 200, }); // Abort immédiat — la promesse doit résoudre sans erreur. ac.abort(); await expect(subscribePromise).resolves.toBeUndefined(); expect(callCount).toBe(0); }, 10_000); });