/** * Tests de la route GET /api/events/sse — R3.1.b. * * Stratégie : on construit une app Hono de test avec un container fake. * Le `subscribeToStream` est mocké pour injecter des events contrôlés. * On lit le ReadableStream SSE et on vérifie les events reçus. * * Vitest fake timers permettent de déclencher le heartbeat sans attendre 25s. */ import { Hono } from 'hono'; import { logger as honoLogger } from 'hono/logger'; import type { Redis } from 'ioredis'; import pino from 'pino'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import type { RedisCache } from '../../src/adapters/redis-cache.js'; import type { RealtimeEvent } from '../../src/domain/event.js'; import type { Container } from '../../src/lib/container.js'; import { setContainer } from '../../src/lib/container.js'; import { type AuthVariables, authMiddleware } from '../../src/middleware/auth.js'; import { errorHandler } from '../../src/middleware/error-handler.js'; import { eventsRoutes } from '../../src/routes/events.js'; // --------------------------------------------------------------------------- // Mock event-bus — contrôle complet sur subscribeToStream // --------------------------------------------------------------------------- vi.mock('../../src/lib/event-bus.js', async (importOriginal) => { const original = await importOriginal(); return { ...original, subscribeToStream: vi.fn(), }; }); import { subscribeToStream } from '../../src/lib/event-bus.js'; const mockSubscribeToStream = vi.mocked(subscribeToStream); // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- const silentLogger = pino({ level: 'silent' }); const ADMIN_TOKEN = 'brg_admin_events_test'; const NO_SCOPE_TOKEN = 'brg_no_scope_events_test'; function buildFakeRedis(): RedisCache { return { getClient: () => ({}) as Redis, invalidatePattern: async () => 0, checkRateLimit: async () => true, } as unknown as RedisCache; } function buildTestContainer(): Container { const tokensMap = new Map([ [ADMIN_TOKEN, { token: ADMIN_TOKEN, name: 'test-admin', scopes: ['admin:*'] }], [NO_SCOPE_TOKEN, { token: NO_SCOPE_TOKEN, name: 'test-no-scope', scopes: ['read:tables'] }], ]); return { config: { nodeEnv: 'test', port: 0, logLevel: 'fatal', baserowApiUrl: 'http://localhost', baserowApiToken: 'fake', redisUrl: 'redis://localhost', baserowWebhookSecret: 'fake_secret_at_least_16_chars', rateLimitGlobalMax: 10_000, rateLimitGlobalWindow: 60, rateLimitMutationMax: 10_000, rateLimitMutationWindow: 60, streamMaxLen: 10_000, }, baserow: {} as unknown, redis: buildFakeRedis(), repos: {} as unknown, tokens: tokensMap, oidc: null, docmostJwt: null, groupsScopesMap: {}, logger: silentLogger, } as unknown as Container; } function buildEventsApp(): Hono<{ Variables: AuthVariables }> { const container = buildTestContainer(); setContainer(container); const app = new Hono<{ Variables: AuthVariables }>(); app.use('*', honoLogger()); app.onError(errorHandler); const eventsRouter = new Hono<{ Variables: AuthVariables }>(); eventsRouter.use( '*', authMiddleware({ tokens: container.tokens, oidc: null, docmostJwt: null, groupsScopesMap: {}, logger: silentLogger, }), ); eventsRouter.route('/events', eventsRoutes); app.route('/api', eventsRouter); return app; } /** * Lit N events SSE depuis un ReadableStream. Timeout après `timeoutMs` ms. * Retourne les events parsés (lignes `event:` + `data:` extraites). */ async function readSseEvents( stream: ReadableStream, n: number, timeoutMs = 3_000, ): Promise> { const decoder = new TextDecoder(); const events: Array<{ event: string; data: string; id?: string }> = []; const reader = stream.getReader(); let buffer = ''; let currentEvent = ''; let currentData = ''; let currentId: string | undefined; const deadline = Date.now() + timeoutMs; try { while (events.length < n && Date.now() < deadline) { const { value, done } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); // Parse SSE format : blocs séparés par \n\n. const parts = buffer.split('\n\n'); buffer = parts.pop() ?? ''; for (const block of parts) { currentEvent = ''; currentData = ''; currentId = undefined; for (const line of block.split('\n')) { if (line.startsWith('event: ')) currentEvent = line.slice(7); else if (line.startsWith('data: ')) currentData = line.slice(6); else if (line.startsWith('id: ')) currentId = line.slice(4); } if (currentEvent) { events.push({ event: currentEvent, data: currentData, id: currentId }); } } } } finally { reader.cancel(); } return events; } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- describe('GET /api/events/sse', () => { let app: Hono<{ Variables: AuthVariables }>; beforeEach(() => { app = buildEventsApp(); mockSubscribeToStream.mockReset(); }); afterEach(() => { setContainer(null); vi.restoreAllMocks(); }); it('retourne 401 sans auth', async () => { const res = await app.request('/api/events/sse'); expect(res.status).toBe(401); }); it('retourne les headers SSE corrects sur connexion authentifiée', async () => { // subscribeToStream se termine immédiatement (pas d'events). mockSubscribeToStream.mockResolvedValue(undefined); const res = await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); expect(res.status).toBe(200); expect(res.headers.get('Content-Type')).toContain('text/event-stream'); expect(res.headers.get('Cache-Control')).toBe('no-cache'); expect(res.headers.get('Connection')).toBe('keep-alive'); }); it('livre les events publiés via subscribeToStream', async () => { const events: RealtimeEvent[] = [ { type: 'row.created', tableId: 1, rowId: 10 }, { type: 'row.updated', tableId: 1, rowId: 10, row: { name: 'Alice' } }, ]; mockSubscribeToStream.mockImplementation(async (opts) => { for (let i = 0; i < events.length; i++) { await opts.onEvent(`event-id-${i + 1}`, events[i]); } }); const res = await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); expect(res.status).toBe(200); if (!res.body) throw new Error('SSE response has no body'); const received = await readSseEvents(res.body, 2); expect(received).toHaveLength(2); expect(received[0].event).toBe('row.created'); expect(received[0].id).toBe('event-id-1'); const data0 = JSON.parse(received[0].data); expect(data0.tableId).toBe(1); expect(data0.rowId).toBe(10); expect(received[1].event).toBe('row.updated'); const data1 = JSON.parse(received[1].data); expect(data1.row).toEqual({ name: 'Alice' }); }); it('passe le filtre ?tables= à subscribeToStream', async () => { mockSubscribeToStream.mockResolvedValue(undefined); await app.request('/api/events/sse?tables=10,20', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); expect(mockSubscribeToStream).toHaveBeenCalledOnce(); const callOpts = mockSubscribeToStream.mock.calls[0][0]; expect(callOpts.filter.tableIds).toEqual(new Set([10, 20])); }); it('passe le filtre ?views= à subscribeToStream', async () => { mockSubscribeToStream.mockResolvedValue(undefined); await app.request('/api/events/sse?views=5,6', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); const callOpts = mockSubscribeToStream.mock.calls[0][0]; expect(callOpts.filter.viewIds).toEqual(new Set([5, 6])); }); it('passe Last-Event-ID à subscribeToStream comme fromId', async () => { mockSubscribeToStream.mockResolvedValue(undefined); await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}`, 'Last-Event-ID': '1234567890-0', }, }); const callOpts = mockSubscribeToStream.mock.calls[0][0]; expect(callOpts.fromId).toBe('1234567890-0'); }); it('fromId est null quand pas de Last-Event-ID', async () => { mockSubscribeToStream.mockResolvedValue(undefined); await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); const callOpts = mockSubscribeToStream.mock.calls[0][0]; expect(callOpts.fromId).toBeNull(); }); it('envoie un event error SSE si subscribeToStream throw une erreur fatale', async () => { mockSubscribeToStream.mockRejectedValue(new Error('Redis max retries exceeded')); const res = await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); expect(res.status).toBe(200); // SSE stream reste 200 même en cas d'erreur. if (!res.body) throw new Error('SSE response has no body'); const received = await readSseEvents(res.body, 1, 2_000); const errorEvent = received.find((e) => e.event === 'error'); expect(errorEvent).toBeDefined(); const data = JSON.parse(errorEvent?.data ?? '{}'); expect(data.message).toContain('reconnect'); }); it('heartbeat : vérifie que subscribeToStream reçoit un signal valide', async () => { // On ne peut pas tester le heartbeat sans fake timers sur setInterval. // On vérifie que l'abort signal est proprement transmis et que la fn est appelée. mockSubscribeToStream.mockImplementation(async (opts) => { // Vérifie que le signal est un AbortSignal. expect(opts.signal).toBeInstanceOf(AbortSignal); }); await app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); expect(mockSubscribeToStream).toHaveBeenCalledOnce(); }); it('filtre ?tables= avec valeurs invalides ignorées', async () => { mockSubscribeToStream.mockResolvedValue(undefined); await app.request('/api/events/sse?tables=10,abc,0,-5,20', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); const callOpts = mockSubscribeToStream.mock.calls[0][0]; // Seules les valeurs entières positives sont conservées. expect(callOpts.filter.tableIds).toEqual(new Set([10, 20])); }); it('heartbeat envoie un event ping via setInterval', async () => { // Utilise les fake timers pour déclencher setInterval sans attendre 25s réels. vi.useFakeTimers(); // subscribeToStream bloque le temps que l'on avance les timers + reçoit le ping. let resolveSubscribe!: () => void; const subscribePromise = new Promise((resolve) => { resolveSubscribe = resolve; }); mockSubscribeToStream.mockImplementation(async () => { // Attend que le test avance les timers. await subscribePromise; }); // Collecte les events SSE reçus en parallèle. const responsePromise = app.request('/api/events/sse', { headers: { Authorization: `Bearer ${ADMIN_TOKEN}` }, }); // Avance les timers de 25s pour déclencher le heartbeat. await vi.advanceTimersByTimeAsync(25_000); // Libère subscribeToStream pour fermer le stream proprement. resolveSubscribe(); const res = await responsePromise; vi.useRealTimers(); expect(res.status).toBe(200); if (!res.body) throw new Error('SSE response has no body'); const received = await readSseEvents(res.body, 1, 2_000); const pingEvent = received.find((e) => e.event === 'ping'); expect(pingEvent).toBeDefined(); }); });