Wiki/bridge/tests/routes/events.test.ts
Corentin JOGUET c998c0d761
Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions
feat(bridge): add SSE realtime stream for R3.1.b database-view live updates
Redis Streams pub/sub (XADD/XREAD BLOCK) with Last-Event-ID replay, bounded
backpressure queue, 25s heartbeat, and full retry/abort handling. Publishes
RealtimeEvents from Baserow webhook handler after cache invalidation. 380 tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-07 23:55:06 +02:00

357 lines
12 KiB
TypeScript

/**
* Tests de la route GET /api/events/sse — R3.1.b.
*
* Stratégie : on construit une app Hono de test avec un container fake.
* Le `subscribeToStream` est mocké pour injecter des events contrôlés.
* On lit le ReadableStream SSE et on vérifie les events reçus.
*
* Vitest fake timers permettent de déclencher le heartbeat sans attendre 25s.
*/
import { Hono } from 'hono';
import { logger as honoLogger } from 'hono/logger';
import type { Redis } from 'ioredis';
import pino from 'pino';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type { RedisCache } from '../../src/adapters/redis-cache.js';
import type { RealtimeEvent } from '../../src/domain/event.js';
import type { Container } from '../../src/lib/container.js';
import { setContainer } from '../../src/lib/container.js';
import { type AuthVariables, authMiddleware } from '../../src/middleware/auth.js';
import { errorHandler } from '../../src/middleware/error-handler.js';
import { eventsRoutes } from '../../src/routes/events.js';
// ---------------------------------------------------------------------------
// Mock event-bus — contrôle complet sur subscribeToStream
// ---------------------------------------------------------------------------
vi.mock('../../src/lib/event-bus.js', async (importOriginal) => {
const original = await importOriginal<typeof import('../../src/lib/event-bus.js')>();
return {
...original,
subscribeToStream: vi.fn(),
};
});
import { subscribeToStream } from '../../src/lib/event-bus.js';
const mockSubscribeToStream = vi.mocked(subscribeToStream);
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
const silentLogger = pino({ level: 'silent' });
const ADMIN_TOKEN = 'brg_admin_events_test';
const NO_SCOPE_TOKEN = 'brg_no_scope_events_test';
function buildFakeRedis(): RedisCache {
return {
getClient: () => ({}) as Redis,
invalidatePattern: async () => 0,
checkRateLimit: async () => true,
} as unknown as RedisCache;
}
function buildTestContainer(): Container {
const tokensMap = new Map([
[ADMIN_TOKEN, { token: ADMIN_TOKEN, name: 'test-admin', scopes: ['admin:*'] }],
[NO_SCOPE_TOKEN, { token: NO_SCOPE_TOKEN, name: 'test-no-scope', scopes: ['read:tables'] }],
]);
return {
config: {
nodeEnv: 'test',
port: 0,
logLevel: 'fatal',
baserowApiUrl: 'http://localhost',
baserowApiToken: 'fake',
redisUrl: 'redis://localhost',
baserowWebhookSecret: 'fake_secret_at_least_16_chars',
rateLimitGlobalMax: 10_000,
rateLimitGlobalWindow: 60,
rateLimitMutationMax: 10_000,
rateLimitMutationWindow: 60,
streamMaxLen: 10_000,
},
baserow: {} as unknown,
redis: buildFakeRedis(),
repos: {} as unknown,
tokens: tokensMap,
oidc: null,
docmostJwt: null,
groupsScopesMap: {},
logger: silentLogger,
} as unknown as Container;
}
function buildEventsApp(): Hono<{ Variables: AuthVariables }> {
const container = buildTestContainer();
setContainer(container);
const app = new Hono<{ Variables: AuthVariables }>();
app.use('*', honoLogger());
app.onError(errorHandler);
const eventsRouter = new Hono<{ Variables: AuthVariables }>();
eventsRouter.use(
'*',
authMiddleware({
tokens: container.tokens,
oidc: null,
docmostJwt: null,
groupsScopesMap: {},
logger: silentLogger,
}),
);
eventsRouter.route('/events', eventsRoutes);
app.route('/api', eventsRouter);
return app;
}
/**
* Lit N events SSE depuis un ReadableStream. Timeout après `timeoutMs` ms.
* Retourne les events parsés (lignes `event:` + `data:` extraites).
*/
async function readSseEvents(
stream: ReadableStream<Uint8Array>,
n: number,
timeoutMs = 3_000,
): Promise<Array<{ event: string; data: string; id?: string }>> {
const decoder = new TextDecoder();
const events: Array<{ event: string; data: string; id?: string }> = [];
const reader = stream.getReader();
let buffer = '';
let currentEvent = '';
let currentData = '';
let currentId: string | undefined;
const deadline = Date.now() + timeoutMs;
try {
while (events.length < n && Date.now() < deadline) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE format : blocs séparés par \n\n.
const parts = buffer.split('\n\n');
buffer = parts.pop() ?? '';
for (const block of parts) {
currentEvent = '';
currentData = '';
currentId = undefined;
for (const line of block.split('\n')) {
if (line.startsWith('event: ')) currentEvent = line.slice(7);
else if (line.startsWith('data: ')) currentData = line.slice(6);
else if (line.startsWith('id: ')) currentId = line.slice(4);
}
if (currentEvent) {
events.push({ event: currentEvent, data: currentData, id: currentId });
}
}
}
} finally {
reader.cancel();
}
return events;
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('GET /api/events/sse', () => {
let app: Hono<{ Variables: AuthVariables }>;
beforeEach(() => {
app = buildEventsApp();
mockSubscribeToStream.mockReset();
});
afterEach(() => {
setContainer(null);
vi.restoreAllMocks();
});
it('retourne 401 sans auth', async () => {
const res = await app.request('/api/events/sse');
expect(res.status).toBe(401);
});
it('retourne les headers SSE corrects sur connexion authentifiée', async () => {
// subscribeToStream se termine immédiatement (pas d'events).
mockSubscribeToStream.mockResolvedValue(undefined);
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200);
expect(res.headers.get('Content-Type')).toContain('text/event-stream');
expect(res.headers.get('Cache-Control')).toBe('no-cache');
expect(res.headers.get('Connection')).toBe('keep-alive');
});
it('livre les events publiés via subscribeToStream', async () => {
const events: RealtimeEvent[] = [
{ type: 'row.created', tableId: 1, rowId: 10 },
{ type: 'row.updated', tableId: 1, rowId: 10, row: { name: 'Alice' } },
];
mockSubscribeToStream.mockImplementation(async (opts) => {
for (let i = 0; i < events.length; i++) {
await opts.onEvent(`event-id-${i + 1}`, events[i]);
}
});
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200);
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 2);
expect(received).toHaveLength(2);
expect(received[0].event).toBe('row.created');
expect(received[0].id).toBe('event-id-1');
const data0 = JSON.parse(received[0].data);
expect(data0.tableId).toBe(1);
expect(data0.rowId).toBe(10);
expect(received[1].event).toBe('row.updated');
const data1 = JSON.parse(received[1].data);
expect(data1.row).toEqual({ name: 'Alice' });
});
it('passe le filtre ?tables= à subscribeToStream', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?tables=10,20', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(mockSubscribeToStream).toHaveBeenCalledOnce();
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
});
it('passe le filtre ?views= à subscribeToStream', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?views=5,6', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.filter.viewIds).toEqual(new Set([5, 6]));
});
it('passe Last-Event-ID à subscribeToStream comme fromId', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse', {
headers: {
Authorization: `Bearer ${ADMIN_TOKEN}`,
'Last-Event-ID': '1234567890-0',
},
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.fromId).toBe('1234567890-0');
});
it('fromId est null quand pas de Last-Event-ID', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.fromId).toBeNull();
});
it('envoie un event error SSE si subscribeToStream throw une erreur fatale', async () => {
mockSubscribeToStream.mockRejectedValue(new Error('Redis max retries exceeded'));
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200); // SSE stream reste 200 même en cas d'erreur.
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 1, 2_000);
const errorEvent = received.find((e) => e.event === 'error');
expect(errorEvent).toBeDefined();
const data = JSON.parse(errorEvent?.data ?? '{}');
expect(data.message).toContain('reconnect');
});
it('heartbeat : vérifie que subscribeToStream reçoit un signal valide', async () => {
// On ne peut pas tester le heartbeat sans fake timers sur setInterval.
// On vérifie que l'abort signal est proprement transmis et que la fn est appelée.
mockSubscribeToStream.mockImplementation(async (opts) => {
// Vérifie que le signal est un AbortSignal.
expect(opts.signal).toBeInstanceOf(AbortSignal);
});
await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(mockSubscribeToStream).toHaveBeenCalledOnce();
});
it('filtre ?tables= avec valeurs invalides ignorées', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?tables=10,abc,0,-5,20', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
// Seules les valeurs entières positives sont conservées.
expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
});
it('heartbeat envoie un event ping via setInterval', async () => {
// Utilise les fake timers pour déclencher setInterval sans attendre 25s réels.
vi.useFakeTimers();
// subscribeToStream bloque le temps que l'on avance les timers + reçoit le ping.
let resolveSubscribe!: () => void;
const subscribePromise = new Promise<void>((resolve) => {
resolveSubscribe = resolve;
});
mockSubscribeToStream.mockImplementation(async () => {
// Attend que le test avance les timers.
await subscribePromise;
});
// Collecte les events SSE reçus en parallèle.
const responsePromise = app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
// Avance les timers de 25s pour déclencher le heartbeat.
await vi.advanceTimersByTimeAsync(25_000);
// Libère subscribeToStream pour fermer le stream proprement.
resolveSubscribe();
const res = await responsePromise;
vi.useRealTimers();
expect(res.status).toBe(200);
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 1, 2_000);
const pingEvent = received.find((e) => e.event === 'ping');
expect(pingEvent).toBeDefined();
});
});