Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions
Redis Streams pub/sub (XADD/XREAD BLOCK) with Last-Event-ID replay, bounded backpressure queue, 25s heartbeat, and full retry/abort handling. Publishes RealtimeEvents from Baserow webhook handler after cache invalidation. 380 tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
357 lines
12 KiB
TypeScript
357 lines
12 KiB
TypeScript
/**
 * Tests for the GET /api/events/sse route — R3.1.b.
 *
 * Strategy: build a test Hono app around a fake container.
 * `subscribeToStream` is mocked so that controlled events can be injected.
 * The SSE ReadableStream is then read and the received events are verified.
 *
 * Vitest fake timers make it possible to trigger the heartbeat without waiting 25s.
 */
|
|
|
|
import { Hono } from 'hono';
|
|
import { logger as honoLogger } from 'hono/logger';
|
|
import type { Redis } from 'ioredis';
|
|
import pino from 'pino';
|
|
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
|
import type { RedisCache } from '../../src/adapters/redis-cache.js';
|
|
import type { RealtimeEvent } from '../../src/domain/event.js';
|
|
import type { Container } from '../../src/lib/container.js';
|
|
import { setContainer } from '../../src/lib/container.js';
|
|
import { type AuthVariables, authMiddleware } from '../../src/middleware/auth.js';
|
|
import { errorHandler } from '../../src/middleware/error-handler.js';
|
|
import { eventsRoutes } from '../../src/routes/events.js';
|
|
|
|
// ---------------------------------------------------------------------------
// event-bus mock — gives the tests full control over subscribeToStream
// ---------------------------------------------------------------------------

// Every real export of the event-bus module is kept as-is; only
// `subscribeToStream` is swapped for a bare `vi.fn()`.
vi.mock('../../src/lib/event-bus.js', async (importOriginal) => {
  const realEventBus = await importOriginal<typeof import('../../src/lib/event-bus.js')>();
  const subscribeToStreamStub = vi.fn();
  return { ...realEventBus, subscribeToStream: subscribeToStreamStub };
});

import { subscribeToStream } from '../../src/lib/event-bus.js';

// Handle on the mocked export, used to script its behaviour and inspect its calls.
const mockSubscribeToStream = vi.mocked(subscribeToStream);
|
|
|
|
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Pino logger created at level `silent`; passed to the container and the auth middleware. */
const silentLogger = pino({ level: 'silent' });

/** Bearer token registered in the test container with the `admin:*` scope. */
const ADMIN_TOKEN = 'brg_admin_events_test';

/** Bearer token registered in the test container with only the `read:tables` scope. */
const NO_SCOPE_TOKEN = 'brg_no_scope_events_test';
|
|
|
|
/**
 * Builds a minimal stand-in for `RedisCache`.
 *
 * Only three members are stubbed: `getClient` returns a fresh empty object cast
 * to `Redis`, `invalidatePattern` resolves to 0 and `checkRateLimit` resolves to
 * true. The object is force-cast, so any other member is absent at runtime.
 *
 * @returns The stub, typed as `RedisCache`.
 */
function buildFakeRedis(): RedisCache {
  const stub = {
    getClient(): Redis {
      return {} as Redis;
    },
    async invalidatePattern(): Promise<number> {
      return 0;
    },
    async checkRateLimit(): Promise<boolean> {
      return true;
    },
  };

  return stub as unknown as RedisCache;
}
|
|
|
|
/**
 * Assembles the fake dependency container used by the events tests.
 *
 * Two API tokens are registered: `ADMIN_TOKEN` (scope `admin:*`) and
 * `NO_SCOPE_TOKEN` (scope `read:tables`). `baserow` and `repos` are empty
 * placeholders, and the whole object is force-cast to `Container`.
 *
 * @returns The fake container.
 */
function buildTestContainer(): Container {
  const adminEntry = { token: ADMIN_TOKEN, name: 'test-admin', scopes: ['admin:*'] };
  const limitedEntry = { token: NO_SCOPE_TOKEN, name: 'test-no-scope', scopes: ['read:tables'] };
  const tokens = new Map([
    [ADMIN_TOKEN, adminEntry],
    [NO_SCOPE_TOKEN, limitedEntry],
  ]);

  const config = {
    nodeEnv: 'test',
    port: 0,
    logLevel: 'fatal',
    baserowApiUrl: 'http://localhost',
    baserowApiToken: 'fake',
    redisUrl: 'redis://localhost',
    baserowWebhookSecret: 'fake_secret_at_least_16_chars',
    rateLimitGlobalMax: 10_000,
    rateLimitGlobalWindow: 60,
    rateLimitMutationMax: 10_000,
    rateLimitMutationWindow: 60,
    streamMaxLen: 10_000,
  };

  const fakeContainer = {
    config,
    baserow: {} as unknown,
    redis: buildFakeRedis(),
    repos: {} as unknown,
    tokens,
    oidc: null,
    docmostJwt: null,
    groupsScopesMap: {},
    logger: silentLogger,
  };

  return fakeContainer as unknown as Container;
}
|
|
|
|
/**
 * Creates the Hono application under test.
 *
 * A fresh fake container is built and installed via `setContainer`. The app
 * gets the Hono request logger and the shared error handler, and mounts
 * `eventsRoutes` at `/api/events` behind the auth middleware.
 *
 * @returns The configured application.
 */
function buildEventsApp(): Hono<{ Variables: AuthVariables }> {
  const container = buildTestContainer();
  setContainer(container);

  const authenticate = authMiddleware({
    tokens: container.tokens,
    oidc: null,
    docmostJwt: null,
    groupsScopesMap: {},
    logger: silentLogger,
  });

  // Sub-router: authentication first, then the events routes.
  const apiRouter = new Hono<{ Variables: AuthVariables }>();
  apiRouter.use('*', authenticate);
  apiRouter.route('/events', eventsRoutes);

  const app = new Hono<{ Variables: AuthVariables }>();
  app.use('*', honoLogger());
  app.onError(errorHandler);
  app.route('/api', apiRouter);

  return app;
}
|
|
|
|
/**
 * Reads SSE events from a ReadableStream until at least `n` events have been
 * collected, the stream ends, or `timeoutMs` milliseconds have elapsed.
 *
 * Parsing rules:
 * - event blocks are separated by a blank line (`\n\n`);
 * - inside a block, only `event: `, `data: ` and `id: ` lines are interpreted;
 * - several `data: ` lines in one block are joined with `\n`;
 * - blocks without an `event: ` line are ignored.
 *
 * The timeout is enforced even while a read is pending, so a stream that stays
 * open without sending anything cannot hang the caller. It relies on
 * `setTimeout`, so real timers must be active when this helper runs.
 *
 * @param stream    SSE response body to consume; its reader is cancelled on exit.
 * @param n         Number of events to wait for. More can be returned when a
 *                  single chunk carries several events.
 * @param timeoutMs Overall time budget in milliseconds.
 * @returns The events parsed so far (possibly fewer than `n` on timeout or
 *          end of stream).
 */
async function readSseEvents(
  stream: ReadableStream<Uint8Array>,
  n: number,
  timeoutMs = 3_000,
): Promise<Array<{ event: string; data: string; id?: string }>> {
  const decoder = new TextDecoder();
  const events: Array<{ event: string; data: string; id?: string }> = [];
  const reader = stream.getReader();
  const deadline = Date.now() + timeoutMs;

  // Holds the trailing, not yet terminated block between two chunks.
  let buffer = '';

  try {
    while (events.length < n) {
      const remainingMs = deadline - Date.now();
      if (remainingMs <= 0) break;

      // Race the read against the remaining budget: `reader.read()` alone would
      // wait forever on a stream that stays open but silent.
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timedOut = new Promise<null>((resolve) => {
        timer = setTimeout(() => resolve(null), remainingMs);
      });
      const chunk = await Promise.race([reader.read(), timedOut]).finally(() => {
        clearTimeout(timer);
      });
      if (chunk === null || chunk.done) break;

      buffer += decoder.decode(chunk.value, { stream: true });

      // Every element but the last is a complete block; the last one is kept
      // for the next chunk.
      const blocks = buffer.split('\n\n');
      buffer = blocks.pop() ?? '';

      for (const block of blocks) {
        let eventName = '';
        let eventId: string | undefined;
        const dataLines: string[] = [];

        for (const line of block.split('\n')) {
          if (line.startsWith('event: ')) eventName = line.slice(7);
          else if (line.startsWith('data: ')) dataLines.push(line.slice(6));
          else if (line.startsWith('id: ')) eventId = line.slice(4);
        }

        if (eventName) {
          events.push({ event: eventName, data: dataLines.join('\n'), id: eventId });
        }
      }
    }
  } finally {
    // Await the cancellation so it cannot surface later as an unhandled
    // rejection; a failure here must not mask the outcome of the read loop.
    await reader.cancel().catch(() => undefined);
  }

  return events;
}
|
|
|
|
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe('GET /api/events/sse', () => {
  let app: Hono<{ Variables: AuthVariables }>;

  beforeEach(() => {
    app = buildEventsApp();
    mockSubscribeToStream.mockReset();
  });

  afterEach(() => {
    // Safety net: if a test that enabled fake timers fails before restoring
    // them, the following tests must still run on real timers.
    vi.useRealTimers();
    setContainer(null);
    vi.restoreAllMocks();
  });

  it('retourne 401 sans auth', async () => {
    const res = await app.request('/api/events/sse');
    expect(res.status).toBe(401);
  });

  it('retourne les headers SSE corrects sur connexion authentifiée', async () => {
    // subscribeToStream finishes immediately (no events).
    mockSubscribeToStream.mockResolvedValue(undefined);

    const res = await app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    expect(res.status).toBe(200);
    expect(res.headers.get('Content-Type')).toContain('text/event-stream');
    expect(res.headers.get('Cache-Control')).toBe('no-cache');
    expect(res.headers.get('Connection')).toBe('keep-alive');
  });

  it('livre les events publiés via subscribeToStream', async () => {
    const events: RealtimeEvent[] = [
      { type: 'row.created', tableId: 1, rowId: 10 },
      { type: 'row.updated', tableId: 1, rowId: 10, row: { name: 'Alice' } },
    ];

    // Replay the fixture through the route's onEvent callback, with ids
    // event-id-1, event-id-2, ...
    mockSubscribeToStream.mockImplementation(async (opts) => {
      for (const [index, event] of events.entries()) {
        await opts.onEvent(`event-id-${index + 1}`, event);
      }
    });

    const res = await app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });
    expect(res.status).toBe(200);

    if (!res.body) throw new Error('SSE response has no body');
    const received = await readSseEvents(res.body, 2);
    expect(received).toHaveLength(2);
    expect(received[0].event).toBe('row.created');
    expect(received[0].id).toBe('event-id-1');
    const data0 = JSON.parse(received[0].data);
    expect(data0.tableId).toBe(1);
    expect(data0.rowId).toBe(10);

    expect(received[1].event).toBe('row.updated');
    const data1 = JSON.parse(received[1].data);
    expect(data1.row).toEqual({ name: 'Alice' });
  });

  it('passe le filtre ?tables= à subscribeToStream', async () => {
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse?tables=10,20', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    expect(mockSubscribeToStream).toHaveBeenCalledOnce();
    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
  });

  it('passe le filtre ?views= à subscribeToStream', async () => {
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse?views=5,6', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    expect(callOpts.filter.viewIds).toEqual(new Set([5, 6]));
  });

  it('passe Last-Event-ID à subscribeToStream comme fromId', async () => {
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse', {
      headers: {
        Authorization: `Bearer ${ADMIN_TOKEN}`,
        'Last-Event-ID': '1234567890-0',
      },
    });

    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    expect(callOpts.fromId).toBe('1234567890-0');
  });

  it('fromId est null quand pas de Last-Event-ID', async () => {
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    expect(callOpts.fromId).toBeNull();
  });

  it('envoie un event error SSE si subscribeToStream throw une erreur fatale', async () => {
    mockSubscribeToStream.mockRejectedValue(new Error('Redis max retries exceeded'));

    const res = await app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });
    expect(res.status).toBe(200); // The SSE stream stays 200 even when an error occurs.

    if (!res.body) throw new Error('SSE response has no body');
    const received = await readSseEvents(res.body, 1, 2_000);
    const errorEvent = received.find((e) => e.event === 'error');
    expect(errorEvent).toBeDefined();
    const data = JSON.parse(errorEvent?.data ?? '{}');
    expect(data.message).toContain('reconnect');
  });

  it('heartbeat : vérifie que subscribeToStream reçoit un signal valide', async () => {
    // This test does not exercise the heartbeat itself (that needs fake timers
    // on setInterval); it only checks that an AbortSignal is handed over.
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    expect(mockSubscribeToStream).toHaveBeenCalledOnce();
    // Assert on the recorded call rather than inside the mock: a failing
    // `expect` inside the mock would only reject subscribeToStream, which the
    // route reports as an SSE `error` event (see the fatal-error test above)
    // instead of failing this test.
    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    expect(callOpts.signal).toBeInstanceOf(AbortSignal);
  });

  it('filtre ?tables= avec valeurs invalides ignorées', async () => {
    mockSubscribeToStream.mockResolvedValue(undefined);

    await app.request('/api/events/sse?tables=10,abc,0,-5,20', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    const callOpts = mockSubscribeToStream.mock.calls[0][0];
    // Only strictly positive integer values are kept.
    expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
  });

  it('heartbeat envoie un event ping via setInterval', async () => {
    // Fake timers let setInterval fire without waiting 25 real seconds.
    vi.useFakeTimers();

    // subscribeToStream stays pending until the test releases it, which leaves
    // time to advance the timers.
    let resolveSubscribe: () => void = () => undefined;
    const subscribePromise = new Promise<void>((resolve) => {
      resolveSubscribe = resolve;
    });

    mockSubscribeToStream.mockImplementation(async () => {
      // Wait until the test has advanced the timers.
      await subscribePromise;
    });

    // Start the request without awaiting it yet.
    const responsePromise = app.request('/api/events/sse', {
      headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
    });

    // Advance the timers by 25s to trigger the heartbeat.
    await vi.advanceTimersByTimeAsync(25_000);

    // Release subscribeToStream so the stream can close cleanly.
    resolveSubscribe();
    const res = await responsePromise;

    // Back to real timers before reading: readSseEvents relies on them for its timeout.
    vi.useRealTimers();

    expect(res.status).toBe(200);
    if (!res.body) throw new Error('SSE response has no body');
    const received = await readSseEvents(res.body, 1, 2_000);
    const pingEvent = received.find((e) => e.event === 'ping');
    expect(pingEvent).toBeDefined();
  });
});
|