feat(bridge): add SSE realtime stream for R3.1.b database-view live updates
Some checks are pending
CI / Lint bridge (Biome) (push) Waiting to run
CI / Type-check bridge (push) Blocked by required conditions
CI / Tests unit bridge (push) Blocked by required conditions
CI / Tests integration bridge (push) Blocked by required conditions
CI / Security scan (push) Waiting to run
CI / Docker build + healthcheck (push) Blocked by required conditions

Redis Streams pub/sub (XADD/XREAD BLOCK) with Last-Event-ID replay, bounded
backpressure queue, 25s heartbeat, and full retry/abort handling. Publishes
RealtimeEvents from Baserow webhook handler after cache invalidation. 380 tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Corentin JOGUET 2026-05-07 23:55:06 +02:00
parent 2ac2eccf9a
commit c998c0d761
14 changed files with 1709 additions and 9 deletions

View file

@ -118,4 +118,13 @@ export class RedisCache {
async close(): Promise<void> {
await this.client.quit();
}
/**
* Expose le client Redis sous-jacent pour les usages avancés qui nécessitent
* des commandes non exposées par RedisCache (ex: XADD/XREAD pour event-bus).
* Réservé aux modules internes du bridge ne pas exposer côté HTTP.
*/
getClient(): Redis {
return this.client;
}
}

View file

@ -0,0 +1,47 @@
/**
* RealtimeEvent schema normalise des events publiés sur le Redis Stream et
* exposés via SSE.
*
* Le schema est volontairement aligné avec ce que Baserow envoie en webhook
* (cf webhooks/types.ts) mais normalized : camelCase, tableId/viewId/rowId
* en number, payload row opaque.
*
* Les events view.* sont émis quand un utilisateur modifie les filtres/sorts/
* groupBys d'une vue utile pour invalider les renderers côté client.
*/
import { z } from 'zod';
export const REALTIME_EVENT_TYPES = [
'row.created',
'row.updated',
'row.deleted',
'view.created',
'view.updated',
'view.deleted',
'table.updated',
] as const;
export type RealtimeEventType = (typeof REALTIME_EVENT_TYPES)[number];
/**
* Payload complet d'un event realtime. Tous les champs sont optionnels sauf
* `type` et `tableId` certains events (table.updated) n'ont pas de viewId ni
* de rowId.
*/
export const RealtimeEventSchema = z.object({
/** Discriminant. */
type: z.enum(REALTIME_EVENT_TYPES),
tableId: z.number().int().positive(),
/** Présent pour les events row.* et view.*. */
viewId: z.number().int().positive().optional(),
/** Présent pour les events row.* uniquement. */
rowId: z.number().int().positive().optional(),
/**
* Payload brut de la row (fields opaque). Présent sur row.created et
* row.updated quand Baserow fournit les items. Absent sur row.deleted.
*/
row: z.record(z.unknown()).optional(),
});
export type RealtimeEvent = z.infer<typeof RealtimeEventSchema>;

View file

@ -17,6 +17,7 @@ import { logger } from './lib/logger.js';
import { type AuthVariables, authMiddleware } from './middleware/auth.js';
import { errorHandler } from './middleware/error-handler.js';
import { defaultRateLimitKey, rateLimit } from './middleware/rate-limit.js';
import { eventsRoutes } from './routes/events.js';
import { tablesRoutes } from './routes/tables.js';
import { viewsRoutes } from './routes/views.js';
import { webhooksRoutes } from './routes/webhooks.js';
@ -84,6 +85,22 @@ export async function buildApp(): Promise<Hono<{ Variables: AuthVariables }>> {
v1.route('/views', viewsRoutes);
app.route('/api/v1', v1);
// R3.1.b : SSE realtime stream — monté sur /api/events hors /api/v1 pour
// éviter le rate limit mutation. Utilise le même authMiddleware que /api/v1.
const eventsRouter = new Hono<{ Variables: AuthVariables }>();
eventsRouter.use(
'*',
authMiddleware({
tokens: ctn.tokens,
oidc: ctn.oidc,
docmostJwt: ctn.docmostJwt,
groupsScopesMap: ctn.groupsScopesMap,
logger: ctn.logger,
}),
);
eventsRouter.route('/events', eventsRoutes);
app.route('/api', eventsRouter);
app.notFound((c) => c.json({ error: { code: 'NOT_FOUND', message: 'Route not found' } }, 404));
return app;

View file

@ -35,6 +35,10 @@ const ConfigSchema = z.object({
rateLimitGlobalWindow: z.coerce.number().int().positive().default(60),
rateLimitMutationMax: z.coerce.number().int().positive().default(30),
rateLimitMutationWindow: z.coerce.number().int().positive().default(60),
// R3.1.b : Redis Streams event-bus. STREAM_MAXLEN borne la mémoire du stream
// global (XADD MAXLEN ~). Défaut 10 000 events. Les events plus anciens sont
// purgés automatiquement par Redis (mode ~ = approximatif, plus performant).
streamMaxLen: z.coerce.number().int().positive().default(10_000),
});
export type Config = z.infer<typeof ConfigSchema>;
@ -63,6 +67,7 @@ export function loadConfig(): Config {
rateLimitGlobalWindow: process.env.RATE_LIMIT_GLOBAL_WINDOW,
rateLimitMutationMax: process.env.RATE_LIMIT_MUTATION_MAX,
rateLimitMutationWindow: process.env.RATE_LIMIT_MUTATION_WINDOW,
streamMaxLen: process.env.STREAM_MAXLEN,
});
if (!parsed.success) {

243
bridge/src/lib/event-bus.ts Normal file
View file

@ -0,0 +1,243 @@
/**
* EventBus abstraction publish/subscribe sur Redis Streams (XADD/XREAD).
*
* Choix Redis Streams vs Pub/Sub :
* - Streams (XADD/XREAD) permettent le REPLAY via Last-Event-ID (reconnexion
* SSE). Pub/Sub est fire-and-forget : un client déconnecté 2s perd tous les
* events émis pendant son absence. Pour un Notion-like multi-onglets, la
* reconnexion sans perte est essentielle (prod-like).
* - MAXLEN borne la mémoire. Default 10 000 events, surchargeables via
* STREAM_MAXLEN env var.
* - Single stream global `events:stream` le filtrage tables/views se fait
* côté consumer. On évite la prolifération de streams par table (ioredis
* gère 1 connexion XREAD bloquante ; fan-out par table nécessiterait N
* connexions ou un consumer group complexe).
*
* Format Last-Event-ID : ID Redis Streams natif (`<timestamp>-<seq>`) passé
* en pass-through. Le client SSE renvoie ce même ID dans le header
* `Last-Event-ID` pour rejouer depuis ce point. Simple, pas de mapping custom.
*
* Format des champs XADD : tous scalaires (ioredis sérialise les valeurs en
* string). On JSON-encode `row` dans un champ dédié pour éviter un niveau
* d'imbrication.
*/
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import { type RealtimeEvent, RealtimeEventSchema } from '../domain/event.js';
export const STREAM_KEY = 'events:stream';
/** Taille maximale par défaut du stream (XADD MAXLEN ~). */
const DEFAULT_MAXLEN = 10_000;
/** Timeout XREAD bloquant en ms. 0 = bloquer indéfiniment, mais on préfère
* une valeur courte pour pouvoir réagir à l'abort signal. */
const XREAD_BLOCK_MS = 2_000;
export interface EventBusOptions {
redis: Redis;
logger: Logger;
/** Surcharge STREAM_MAXLEN. */
maxLen?: number;
}
export interface EventFilter {
/** Si fourni, ne livrer que les events dont tableId est dans ce set. */
tableIds?: Set<number>;
/** Si fourni, ne livrer que les events dont viewId est dans ce set. */
viewIds?: Set<number>;
}
/**
* Publish un RealtimeEvent sur le stream Redis.
* XADD avec MAXLEN ~ (approximatif, évite le scan complet à chaque write).
*/
export async function publishEvent(
redis: Redis,
event: RealtimeEvent,
maxLen = DEFAULT_MAXLEN,
): Promise<string> {
// Sérialise row en JSON string pour stocker dans le hash Redis Streams.
const fields: string[] = ['type', event.type, 'tableId', String(event.tableId)];
if (event.viewId !== undefined) {
fields.push('viewId', String(event.viewId));
}
if (event.rowId !== undefined) {
fields.push('rowId', String(event.rowId));
}
if (event.row !== undefined) {
fields.push('row', JSON.stringify(event.row));
}
// XADD MAXLEN ~ <maxLen> * <fields> — '*' auto-génère l'ID.
// ioredis type XADD comme string | null (null uniquement avec NOMKSTREAM absent).
// Dans notre cas sans NOMKSTREAM, l'ID est toujours retourné.
const id = await redis.xadd(STREAM_KEY, 'MAXLEN', '~', maxLen, '*', ...fields);
// id sera null seulement si le stream n'existe pas et NOMKSTREAM est passé — on l'ignore ici.
return id ?? 'unknown';
}
/**
* Parse un hash Redis Streams (tableau [field, value, field, value, ...])
* en RealtimeEvent. Retourne null si le hash est malformé (log warn).
*/
export function parseStreamEntry(fields: string[], logger: Logger): RealtimeEvent | null {
const map: Record<string, string> = {};
for (let i = 0; i < fields.length - 1; i += 2) {
map[fields[i]] = fields[i + 1];
}
const raw: Record<string, unknown> = {
type: map.type,
tableId: map.tableId !== undefined ? Number(map.tableId) : undefined,
viewId: map.viewId !== undefined ? Number(map.viewId) : undefined,
rowId: map.rowId !== undefined ? Number(map.rowId) : undefined,
};
if (map.row !== undefined) {
try {
raw.row = JSON.parse(map.row);
} catch {
logger.warn({ fields: map }, 'event-bus: row field not valid JSON, dropping row payload');
// row reste absent — l'event est quand même livré sans payload row.
}
}
const parsed = RealtimeEventSchema.safeParse(raw);
if (!parsed.success) {
logger.warn({ issues: parsed.error.issues, raw }, 'event-bus: malformed stream entry, skip');
return null;
}
return parsed.data;
}
/**
* Vérifie si un event passe le filtre. Un filtre absent = pas de restriction.
*/
export function passesFilter(event: RealtimeEvent, filter: EventFilter): boolean {
if (filter.tableIds && filter.tableIds.size > 0) {
if (!filter.tableIds.has(event.tableId)) return false;
}
if (filter.viewIds && filter.viewIds.size > 0) {
// Un event sans viewId ne passe pas un filtre viewIds strict.
if (event.viewId === undefined || !filter.viewIds.has(event.viewId)) return false;
}
return true;
}
/**
* Subscribe aux events Redis Streams à partir de `fromId` (inclus pour replay,
* exclusif via `>` pour nouveaux). Appelle `onEvent` pour chaque event qui
* passe le filtre. Boucle jusqu'à ce que l'abortSignal soit déclenché.
*
* Reconnexion interne : si Redis disconnect mid-stream, on tente jusqu'à
* `maxRetries` fois avec backoff exponentiel (1s2s4smaxDelay 30s).
* Si toutes les tentatives échouent, on rejette la promesse la couche SSE
* envoie un event `error` et ferme proprement.
*/
export async function subscribeToStream(opts: {
redis: Redis;
filter: EventFilter;
/**
* ID Redis Stream depuis lequel rejouer. Null = seulement les nouveaux events
* (équivalent `$` mais on utilise le dernier ID connu).
*/
fromId: string | null;
onEvent: (id: string, event: RealtimeEvent) => Promise<void>;
signal: AbortSignal;
logger: Logger;
maxRetries?: number;
/** Durée du BLOCK XREAD en ms. Surcharger à une valeur courte en test. */
blockMs?: number;
}): Promise<void> {
const { redis, filter, onEvent, signal, logger, maxRetries = 5, blockMs = XREAD_BLOCK_MS } = opts;
// Détermine le curseur de départ.
// - fromId fourni (Last-Event-ID reconnexion) : on repart depuis cet ID.
// XREAD est exclusif — Redis retourne les entries STRICTEMENT > cursor.
// Convention SSE : le client a déjà traité fromId, on lui envoie la suite.
// - null : on part du dernier ID existant pour ne livrer que les futurs events.
//
// IMPORTANT : on N'utilise PAS '$' comme cursor persistant. '$' dans XREAD est
// évalué à chaque appel comme "dernier ID au moment du XREAD" — si des events
// arrivent entre deux cycles XREAD, ils sont perdus. On résout '$' en un vrai
// ID numérique (timestamp-seq) avant la boucle pour éviter ce cas.
let cursor: string;
if (opts.fromId !== null) {
cursor = opts.fromId;
} else {
// Récupère le dernier ID réel du stream. Si le stream n'existe pas ou est
// vide, on utilise `<now>-0` comme ID fictif — tous les events futurs auront
// un ID > maintenant et seront capturés par le premier XREAD.
try {
const info = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 1);
if (info.length > 0 && Array.isArray(info[0]) && typeof info[0][0] === 'string') {
cursor = info[0][0];
} else {
// Stream vide — utilise le timestamp actuel comme borne basse.
// Tous les events publiés après cette ligne auront un ID >= now.
cursor = `${Date.now()}-0`;
}
} catch {
// Accès Redis impossible — démarre depuis maintenant de façon approximative.
cursor = `${Date.now()}-0`;
}
}
let retries = 0;
while (!signal.aborted) {
try {
// XREAD BLOCK — libère le thread Vitest/Node après XREAD_BLOCK_MS ms.
const results = await (
redis as Redis & {
xread: (...args: unknown[]) => Promise<[string, [string, string[]][]][] | null>;
}
).xread('COUNT', 100, 'BLOCK', blockMs, 'STREAMS', STREAM_KEY, cursor);
if (signal.aborted) break;
retries = 0; // Reset sur succès.
if (!results || results.length === 0) {
// Timeout XREAD sans message — normal, boucle.
continue;
}
const [, entries] = results[0];
for (const [id, fields] of entries) {
if (signal.aborted) break;
const event = parseStreamEntry(fields, logger);
if (event && passesFilter(event, filter)) {
await onEvent(id, event);
}
cursor = id; // Avance le curseur même si l'event est filtré.
}
} catch (err: unknown) {
if (signal.aborted) break;
retries++;
if (retries > maxRetries) {
logger.error({ err, retries }, 'event-bus: max retries exceeded, giving up');
throw err;
}
const delay = Math.min(1_000 * 2 ** (retries - 1), 30_000);
logger.warn({ err, retries, delayMs: delay }, 'event-bus: Redis error, retrying');
// Attente avec respect de l'abort signal.
await new Promise<void>((resolve) => {
const timer = setTimeout(resolve, delay);
signal.addEventListener(
'abort',
() => {
clearTimeout(timer);
resolve();
},
{ once: true },
);
});
}
}
}

255
bridge/src/routes/events.ts Normal file
View file

@ -0,0 +1,255 @@
/**
* Routes /api/events R3.1.b SSE realtime stream.
*
* Endpoint :
* GET /api/events/sse
*
* Auth : Bearer JWT (DocAdenice HS* ou Authentik RS*) ou service token brg_*.
* Service tokens brg_* sont acceptés pour faciliter les tests d'intégration,
* mais en production seuls les JWT user feront sens pour une UI.
* Pas de scope minimum au connect : tous les users authentifiés peuvent ouvrir
* le stream. Le filtrage par tables/views est server-side (query params).
* Un RBAC plus fin viendra en R3.1.c côté renderer (client-side par page).
*
* Query params :
* tables=<id>,<id> filtre sur ces tableIds uniquement (optionnel)
* views=<id>,<id> filtre sur ces viewIds uniquement (optionnel)
*
* Reconnexion :
* Le client SSE envoie le header `Last-Event-ID` avec le dernier ID Redis
* Streams reçu. Le bridge rejoue depuis cet ID (exclusif le client a déjà
* traité cet event).
*
* Heartbeat :
* `event: ping\ndata: {}\n\n` toutes les 25s pour garder la connexion
* vivante à travers les proxies HTTP qui coupent à ~30s.
*
* Backpressure :
* Queue bornée à MAX_QUEUE_SIZE events. Si la queue est pleine (client lent),
* on drop le plus ancien et on insère un event synthétique `backpressure`
* pour signaler la perte. Le client doit alors faire un full-reload.
* Seuil surchargeble via SSE_MAX_QUEUE_SIZE env var (non exposé dans config
* zod pour garder config.ts minimal valeur interne).
*/
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import type { SSEStreamingApi } from 'hono/streaming';
import type { RealtimeEvent } from '../domain/event.js';
import { getContainer } from '../lib/container.js';
import { subscribeToStream } from '../lib/event-bus.js';
import type { AuthVariables } from '../middleware/auth.js';
export const eventsRoutes = new Hono<{ Variables: AuthVariables }>();
const HEARTBEAT_INTERVAL_MS = 25_000;
const MAX_QUEUE_SIZE = Number(process.env.SSE_MAX_QUEUE_SIZE ?? 100);
const MAX_EVENT_BUS_RETRIES = 5;
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
* Parse une query string CSV en Set<number>. Ignore les valeurs non-entières.
*/
function parseIds(raw: string | undefined): Set<number> {
if (!raw) return new Set();
const ids = new Set<number>();
for (const part of raw.split(',')) {
const n = Number.parseInt(part.trim(), 10);
if (!Number.isNaN(n) && n > 0) ids.add(n);
}
return ids;
}
/**
* Écrit un event SSE. Retourne false si le stream est fermé (client déconnecté).
*/
async function writeEvent(
stream: SSEStreamingApi,
type: string,
data: unknown,
id?: string,
): Promise<boolean> {
if (stream.closed) return false;
try {
await stream.writeSSE({
event: type,
data: JSON.stringify(data),
id,
});
return true;
} catch {
return false;
}
}
// ---------------------------------------------------------------------------
// GET /sse
// ---------------------------------------------------------------------------
/**
* Pas de `requireScope` tous les users authentifiés ouvrent le stream.
* L'auth est garantie par `authMiddleware` sur le sous-routeur parent.
* Le filtrage tables/views est server-side sur le stream Redis.
*/
eventsRoutes.get('/sse', async (c) => {
const { redis, logger } = getContainer();
const url = new URL(c.req.url);
const tableIds = parseIds(url.searchParams.get('tables') ?? undefined);
const viewIds = parseIds(url.searchParams.get('views') ?? undefined);
const lastEventId = c.req.header('Last-Event-ID') ?? null;
const user = c.get('user');
const userId = user?.email ?? user?.sub ?? user?.tokenId ?? 'anonymous';
logger.info(
{ userId, tableIds: [...tableIds], viewIds: [...viewIds], lastEventId },
'SSE client connected',
);
return streamSSE(
c,
async (stream) => {
const abortController = new AbortController();
// Nettoyage quand le client se déconnecte (ou que Hono ferme le stream).
// streamSSE appelle stream.close() sur abort du request signal — on
// surveille stream.closed dans la boucle pour sortir proprement.
const onAbort = () => abortController.abort();
c.req.raw.signal.addEventListener('abort', onAbort, { once: true });
// Queue bornée pour backpressure : accumule les events pendant que
// writeSSE attend le flush réseau. Drop oldest + signal si pleine.
type QueueItem = { id: string; event: RealtimeEvent };
const queue: QueueItem[] = [];
// Promesse de la drainQueue en cours (null si aucune).
// Permet d'attendre la fin du drain actuel avant d'en lancer un autre,
// et de drainer le résidu après que subscribeToStream se termine.
let drainPromise: Promise<void> | null = null;
/**
* Tente de drainer la queue vers le stream SSE. Si le stream est fermé,
* abandonne silencieusement (le finally du streamSSE nettoiera).
* Sérialisé : si une drainQueue est en cours, on attend qu'elle finisse
* puis on relance si des items restent dans la queue.
*/
function drainQueue(): Promise<void> {
if (drainPromise !== null) {
// Un drain est en cours — on enchaîne pour drainer le résidu.
drainPromise = drainPromise.then(() => drainOnce());
} else {
drainPromise = drainOnce().finally(() => {
drainPromise = null;
});
}
return drainPromise;
}
async function drainOnce(): Promise<void> {
while (queue.length > 0 && !stream.closed) {
const item = queue.shift();
if (!item) break;
const ok = await writeEvent(
stream,
item.event.type,
{
tableId: item.event.tableId,
viewId: item.event.viewId,
rowId: item.event.rowId,
row: item.event.row,
},
item.id,
);
if (!ok) break;
}
}
/**
* Enqueue un event. Si la queue est pleine, on drop le plus ancien et
* insère un event synthétique `backpressure` pour prévenir le client.
*/
async function enqueue(id: string, event: RealtimeEvent): Promise<void> {
if (queue.length >= MAX_QUEUE_SIZE) {
// Drop oldest — le client sera informé de la perte.
queue.shift();
// Insère l'event backpressure à la tête pour que le client sache
// qu'il doit recharger son état.
queue.unshift({
id: 'backpressure',
event: {
type: 'table.updated',
tableId: event.tableId,
},
});
logger.warn(
{ userId, queueSize: queue.length },
'SSE backpressure: dropping oldest event',
);
// Signal explicite backpressure au client.
if (!stream.closed) {
await writeEvent(stream, 'backpressure', {
message: 'event queue full, please reload',
});
}
}
queue.push({ id, event });
// Déclenche le drain. L'appel est sérialisé via drainPromise.
drainQueue().catch((err) => {
logger.error({ err }, 'SSE drainQueue error');
});
}
// Heartbeat toutes les 25s. Utilise un intervalle natif Node.js.
const heartbeatTimer = setInterval(async () => {
if (stream.closed || abortController.signal.aborted) {
clearInterval(heartbeatTimer);
return;
}
const ok = await writeEvent(stream, 'ping', {});
if (!ok) clearInterval(heartbeatTimer);
}, HEARTBEAT_INTERVAL_MS);
try {
// Lance la souscription — bloque jusqu'à l'abort ou une erreur fatale.
await subscribeToStream({
redis: redis.getClient(),
filter: { tableIds, viewIds },
fromId: lastEventId,
onEvent: enqueue,
signal: abortController.signal,
logger,
maxRetries: MAX_EVENT_BUS_RETRIES,
});
// Drainer les events restants dans la queue avant de fermer (events reçus
// dans le dernier batch de subscribeToStream mais pas encore flushés).
// Si un drain est en cours, on attend qu'il finisse.
if (drainPromise) await drainPromise;
// Drainer le résidu si la queue n'est pas vide après le drain précédent.
if (queue.length > 0) await drainOnce();
} catch (err: unknown) {
// Erreur fatale (max retries Redis dépassé). On prévient le client.
logger.error({ err, userId }, 'SSE: fatal Redis error, closing stream');
if (!stream.closed) {
await writeEvent(stream, 'error', { message: 'server error, please reconnect' });
}
} finally {
clearInterval(heartbeatTimer);
c.req.raw.signal.removeEventListener('abort', onAbort);
logger.info({ userId }, 'SSE client disconnected');
}
},
// onError handler Hono streamSSE — capture les erreurs non prévues dans le cb.
async (err, stream) => {
logger.error({ err }, 'SSE stream uncaught error');
if (!stream.closed) {
await stream.writeSSE({
event: 'error',
data: JSON.stringify({ message: 'internal server error' }),
});
}
},
);
});

View file

@ -25,7 +25,6 @@ export const webhooksRoutes = new Hono();
webhooksRoutes.post('/baserow', async (c) => {
const { config, redis, logger } = getContainer();
const signature = c.req.header(BASEROW_SIGNATURE_HEADER);
if (!signature) {
throw errors.authRequired();
@ -58,7 +57,13 @@ webhooksRoutes.post('/baserow', async (c) => {
return c.json({ status: 'duplicate', eventId: payload.event_id }, 200);
}
const result = await handleBaserowEvent(payload, { redis, logger });
// R3.1.b : passe le client Redis brut pour la publication SSE (Redis Streams).
const result = await handleBaserowEvent(payload, {
redis,
logger,
streamRedis: redis.getClient(),
streamMaxLen: config.streamMaxLen,
});
return c.json(
{
status: result.status,

View file

@ -2,28 +2,44 @@
* Handler webhooks Baserow generique style Notion.
*
* Pipeline : payload deja valide (zod) + idempotence verifiee en amont (route).
* Ici on invalide les caches Redis associes a la table touchee. Plus de
* cascade rollup metier : si l'utilisateur a configure des formules/lookups
* cross-table cote Baserow, elles emettront leurs propres webhooks
* Ici on invalide les caches Redis associes a la table touchee, puis on publie
* un RealtimeEvent sur le stream Redis pour les clients SSE connectes (R3.1.b).
*
* Plus de cascade rollup metier : si l'utilisateur a configure des formules/
* lookups cross-table cote Baserow, elles emettront leurs propres webhooks
* naturellement (chaque table touchee declenche son propre event).
*
* Le handler ne sait pas quelle table c'est metier-parlant il invalide
* juste `bridge:tables:<tableId>:*`.
*/
import type { Redis } from 'ioredis';
import type { Logger } from 'pino';
import type { RedisCache } from '../adapters/redis-cache.js';
import type { RealtimeEvent } from '../domain/event.js';
import { publishEvent } from '../lib/event-bus.js';
import type { BaserowEventType, BaserowWebhookPayload } from './types.js';
export interface BaserowHandlerDeps {
redis: RedisCache;
logger: Logger;
/**
* Client Redis brut pour XADD (event-bus R3.1.b). Séparé de RedisCache car
* l'event-bus a besoin de commandes Streams non exposées par RedisCache.
* Optionnel pour rétrocompatibilité des tests existants si absent, la
* publication SSE est simplement skippée (mode dégradé sans clients SSE).
*/
streamRedis?: Redis;
/** Taille max du stream Redis (MAXLEN). Default 10 000. */
streamMaxLen?: number;
}
export interface BaserowHandleResult {
status: 'processed' | 'ignored';
tableId: number | null;
invalidatedKeys: number;
/** ID Redis Stream de l'event publié (null si pas de streamRedis ou event ignoré). */
publishedEventId: string | null;
}
/**
@ -73,6 +89,62 @@ function buildViewInvalidationPatterns(tableId: number, viewId: number | undefin
return patterns;
}
/**
* Convertit un event Baserow webhook en RealtimeEvent(s) normalisés.
*
* Pour rows.* avec plusieurs items, on émet un event par row (granularité fine
* pour les clients SSE qui peuvent mettre à jour une seule ligne sans recharger
* toute la vue). Pour view.* et table.updated, un seul event suffit.
*/
function buildRealtimeEvents(payload: BaserowWebhookPayload): RealtimeEvent[] {
const { event_type, table_id, view_id, items } = payload;
if (
event_type === 'view.created' ||
event_type === 'view.updated' ||
event_type === 'view.deleted'
) {
return [
{
type: event_type,
tableId: table_id,
viewId: view_id,
},
];
}
// rows.created | rows.updated | rows.deleted
const rowEventType =
event_type === 'rows.created'
? 'row.created'
: event_type === 'rows.updated'
? 'row.updated'
: 'row.deleted';
if (items.length === 0) {
// Payload sans items (ex: test ping Baserow) — émet un event table.updated
// pour signaler que la table a changé sans détails précis.
return [{ type: 'table.updated', tableId: table_id }];
}
return items.map((item) => {
const event: RealtimeEvent = {
type: rowEventType,
tableId: table_id,
rowId: item.id,
};
// Pour rows.created et rows.updated, on inclut les fields si disponibles.
// Baserow peut fournir les champs dans l'item (passthrough via zod).
if (rowEventType !== 'row.deleted') {
const { id: _id, ...fields } = item as Record<string, unknown>;
if (Object.keys(fields).length > 0) {
event.row = fields;
}
}
return event;
});
}
export async function handleBaserowEvent(
payload: BaserowWebhookPayload,
deps: BaserowHandlerDeps,
@ -82,7 +154,7 @@ export async function handleBaserowEvent(
{ tableId: payload.table_id, eventId: payload.event_id, eventType: payload.event_type },
'baserow webhook: table_id invalide, ignore',
);
return { status: 'ignored', tableId: null, invalidatedKeys: 0 };
return { status: 'ignored', tableId: null, invalidatedKeys: 0, publishedEventId: null };
}
const isViewEvent =
@ -106,6 +178,27 @@ export async function handleBaserowEvent(
total += n;
}
// R3.1.b : publie les RealtimeEvents sur le stream Redis pour les clients SSE.
// Chaque event row est publié séparément pour la granularité fine.
let publishedEventId: string | null = null;
if (deps.streamRedis) {
const events = buildRealtimeEvents(payload);
for (const event of events) {
try {
// On conserve le dernier ID publié pour le log (le premier suffit
// comme indicateur que la publication a eu lieu).
publishedEventId = await publishEvent(deps.streamRedis, event, deps.streamMaxLen);
} catch (err) {
// La publication SSE est non-critique : si Redis Streams échoue, le
// cache a déjà été invalidé et les clients SSE se reconnecteront.
deps.logger.error(
{ err, eventType: event.type, tableId: event.tableId },
'baserow webhook: failed to publish realtime event',
);
}
}
}
deps.logger.info(
{
eventId: payload.event_id,
@ -115,9 +208,15 @@ export async function handleBaserowEvent(
itemIds,
patternsApplied: patterns.length,
keysInvalidated: total,
publishedEventId,
},
'baserow webhook processed',
);
return { status: 'processed', tableId: payload.table_id, invalidatedKeys: total };
return {
status: 'processed',
tableId: payload.table_id,
invalidatedKeys: total,
publishedEventId,
};
}

View file

@ -41,12 +41,14 @@ export interface TestContainerOverrides {
/**
* Stub Redis minimal pour les tests routes : juste les methodes que les routes
* appellent (invalidatePattern + checkRateLimit). No-op qui ne refuse jamais.
* appellent (invalidatePattern + checkRateLimit + getClient). No-op qui ne refuse jamais.
* R3.1.b : getClient retourne un fake Redis Streams (xadd no-op).
*/
function buildNoopRedis(): RedisCache {
return {
invalidatePattern: async (_pattern: string) => 0,
checkRateLimit: async (_key: string, _max: number, _win: number) => true,
getClient: () => ({ xadd: async () => '0-0' }),
} as unknown as RedisCache;
}
@ -72,6 +74,7 @@ export function installTestContainer(over: TestContainerOverrides): Container {
rateLimitGlobalWindow: 60,
rateLimitMutationMax: 10000,
rateLimitMutationWindow: 60,
streamMaxLen: 10000,
},
baserow: fakeBaserow,
redis: fakeRedis,

View file

@ -0,0 +1,476 @@
/**
* Tests event-bus Redis Streams publish/subscribe.
*
* Stratégie : testcontainers Redis 7-alpine pour les tests d'intégration
* (publish, subscribe, replay Last-Event-ID). Les tests unitaires (parseStreamEntry,
* passesFilter) sont purement en mémoire sans Redis.
*/
import IORedis, { type Redis } from 'ioredis';
import pino from 'pino';
import { GenericContainer, type StartedTestContainer } from 'testcontainers';
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from 'vitest';
import type { RealtimeEvent } from '../../src/domain/event.js';
import {
STREAM_KEY,
parseStreamEntry,
passesFilter,
publishEvent,
subscribeToStream,
} from '../../src/lib/event-bus.js';
const silentLogger = pino({ level: 'silent' });
// ---------------------------------------------------------------------------
// Unitaires (pas de Redis)
// ---------------------------------------------------------------------------
describe('parseStreamEntry', () => {
it('parse un entry rows.created minimal', () => {
const fields = ['type', 'row.created', 'tableId', '42', 'rowId', '100'];
const result = parseStreamEntry(fields, silentLogger);
expect(result).toEqual({ type: 'row.created', tableId: 42, rowId: 100 });
});
it('parse un entry avec viewId', () => {
const fields = ['type', 'view.updated', 'tableId', '5', 'viewId', '200'];
const result = parseStreamEntry(fields, silentLogger);
expect(result).toEqual({ type: 'view.updated', tableId: 5, viewId: 200 });
});
it('parse un entry avec row JSON', () => {
const row = { name: 'Alice', age: 30 };
const fields = [
'type',
'row.updated',
'tableId',
'7',
'rowId',
'1',
'row',
JSON.stringify(row),
];
const result = parseStreamEntry(fields, silentLogger);
expect(result?.row).toEqual(row);
});
it('retourne null sur type inconnu', () => {
const fields = ['type', 'unknown.event', 'tableId', '1'];
const result = parseStreamEntry(fields, silentLogger);
expect(result).toBeNull();
});
it('retourne null si tableId manquant', () => {
const fields = ['type', 'row.created'];
const result = parseStreamEntry(fields, silentLogger);
expect(result).toBeNull();
});
it('ignore row JSON invalide et livre l event sans row', () => {
const fields = ['type', 'row.created', 'tableId', '1', 'rowId', '2', 'row', 'NOT_JSON'];
const result = parseStreamEntry(fields, silentLogger);
// L'event est livré sans le payload row malformé.
expect(result).not.toBeNull();
expect(result?.row).toBeUndefined();
expect(result?.type).toBe('row.created');
});
});
describe('passesFilter', () => {
const event: RealtimeEvent = { type: 'row.created', tableId: 10, viewId: 20, rowId: 5 };
it('filtre vide -> passe toujours', () => {
expect(passesFilter(event, {})).toBe(true);
expect(passesFilter(event, { tableIds: new Set(), viewIds: new Set() })).toBe(true);
});
it('filtre tableIds match -> passe', () => {
expect(passesFilter(event, { tableIds: new Set([10]) })).toBe(true);
});
it('filtre tableIds no match -> rejecte', () => {
expect(passesFilter(event, { tableIds: new Set([99]) })).toBe(false);
});
it('filtre viewIds match -> passe', () => {
expect(passesFilter(event, { viewIds: new Set([20]) })).toBe(true);
});
it('filtre viewIds no match -> rejecte', () => {
expect(passesFilter(event, { viewIds: new Set([99]) })).toBe(false);
});
it('event sans viewId rejecte sur filtre viewIds strict', () => {
const noView: RealtimeEvent = { type: 'row.created', tableId: 10 };
expect(passesFilter(noView, { viewIds: new Set([20]) })).toBe(false);
});
it('filtre tableIds + viewIds combinés -> passe ssi les deux matchent', () => {
expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([20]) })).toBe(true);
expect(passesFilter(event, { tableIds: new Set([10]), viewIds: new Set([99]) })).toBe(false);
expect(passesFilter(event, { tableIds: new Set([99]), viewIds: new Set([20]) })).toBe(false);
});
});
// ---------------------------------------------------------------------------
// Unitaires avec fake Redis (retry logic, xrevrange failure)
// ---------------------------------------------------------------------------
describe('subscribeToStream — retry logic (fake Redis)', () => {
it('retente sur erreur XREAD transitoire et livre les events ensuite', async () => {
// Premier xread lance une erreur, second retourne un event.
const fields = ['type', 'row.created', 'tableId', '7', 'rowId', '1'];
let callCount = 0;
const received: Array<{ id: string; event: RealtimeEvent }> = [];
const ac = new AbortController();
const fakeRedis = {
xrevrange: vi.fn().mockResolvedValue([['999-0', []]]),
xread: vi.fn().mockImplementation(async () => {
callCount++;
if (callCount === 1) throw new Error('ECONNRESET');
// Deuxième appel : retourne l'event (on aborts depuis onEvent après réception).
return [['events:stream', [['1000-0', fields]]]];
}),
} as unknown as Redis;
await subscribeToStream({
redis: fakeRedis,
filter: {},
fromId: null,
onEvent: async (id, ev) => {
received.push({ id, event: ev });
// Abort après avoir reçu l'event pour sortir de la boucle.
ac.abort();
},
signal: ac.signal,
logger: silentLogger,
maxRetries: 3,
blockMs: 50,
});
// callCount >= 2 : au moins un retry a eu lieu.
expect(callCount).toBeGreaterThanOrEqual(2);
expect(received).toHaveLength(1);
expect(received[0].event.type).toBe('row.created');
}, 10_000);
it('throw si max retries depasse', async () => {
const ac = new AbortController();
const fakeRedis = {
xrevrange: vi.fn().mockResolvedValue([]),
xread: vi.fn().mockRejectedValue(new Error('Redis unavailable')),
} as unknown as Redis;
await expect(
subscribeToStream({
redis: fakeRedis,
filter: {},
fromId: null,
onEvent: async () => {},
signal: ac.signal,
logger: silentLogger,
maxRetries: 1,
blockMs: 10,
}),
).rejects.toThrow('Redis unavailable');
}, 10_000);
it('sort proprement si le signal est abort avant le premier XREAD', async () => {
const ac = new AbortController();
// Abort avant que subscribeToStream ne commence.
ac.abort();
const fakeRedis = {
xrevrange: vi.fn().mockResolvedValue([]),
xread: vi.fn(),
} as unknown as Redis;
await expect(
subscribeToStream({
redis: fakeRedis,
filter: {},
fromId: null,
onEvent: async () => {},
signal: ac.signal,
logger: silentLogger,
blockMs: 10,
}),
).resolves.toBeUndefined();
// xread ne doit jamais avoir été appelé.
expect(fakeRedis.xread).not.toHaveBeenCalled();
}, 5_000);
it('sort proprement si le signal est abort pendant le retry backoff', async () => {
let callCount = 0;
const ac = new AbortController();
const fakeRedis = {
xrevrange: vi.fn().mockResolvedValue([]),
xread: vi.fn().mockImplementation(async () => {
callCount++;
// Abort le signal pendant le backoff pour couvrir la branche abort dans le catch.
setTimeout(() => ac.abort(), 10);
throw new Error('ECONNRESET during retry');
}),
} as unknown as Redis;
await expect(
subscribeToStream({
redis: fakeRedis,
filter: {},
fromId: null,
onEvent: async () => {},
signal: ac.signal,
logger: silentLogger,
maxRetries: 5,
blockMs: 10,
}),
).resolves.toBeUndefined();
// Exactement 1 XREAD avant l'abort (abort coupe le backoff et sort de la boucle).
expect(callCount).toBe(1);
}, 10_000);
it('fromId null avec xrevrange qui throw utilise Date.now() comme cursor', async () => {
const ac = new AbortController();
const beforeNow = Date.now();
const fakeRedis = {
// xrevrange lance une erreur — couvre le catch lines 182-185.
xrevrange: vi.fn().mockRejectedValue(new Error('xrevrange failed')),
xread: vi.fn().mockImplementation(async () => {
// On vérifie que subscribeToStream a bien initialisé un cursor proche de now.
ac.abort();
return null;
}),
} as unknown as Redis;
await subscribeToStream({
redis: fakeRedis,
filter: {},
fromId: null,
onEvent: async () => {},
signal: ac.signal,
logger: silentLogger,
blockMs: 10,
});
const afterNow = Date.now();
// xread a bien été appelé (curseur initialisé), avec un argument cursor ~ Date.now().
expect(fakeRedis.xread).toHaveBeenCalledOnce();
const xreadArgs = vi.mocked(fakeRedis.xread).mock.calls[0] as unknown[];
// Args : 'COUNT', 100, 'BLOCK', blockMs, 'STREAMS', STREAM_KEY, cursor
const cursor = String(xreadArgs[xreadArgs.length - 1]);
const ts = Number(cursor.split('-')[0]);
expect(ts).toBeGreaterThanOrEqual(beforeNow);
expect(ts).toBeLessThanOrEqual(afterNow + 100);
}, 5_000);
});
// ---------------------------------------------------------------------------
// Integration (testcontainers Redis)
// ---------------------------------------------------------------------------
describe('event-bus integration (Redis Streams)', () => {
let container: StartedTestContainer;
let redis: Redis;
beforeAll(async () => {
container = await new GenericContainer('redis:7-alpine').withExposedPorts(6379).start();
const host = container.getHost();
const port = container.getMappedPort(6379);
redis = new IORedis(`redis://${host}:${port}`, {
lazyConnect: false,
maxRetriesPerRequest: 3,
});
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error('redis ready timeout')), 10_000);
if (redis.status === 'ready') {
clearTimeout(timer);
resolve();
return;
}
redis.once('ready', () => {
clearTimeout(timer);
resolve();
});
});
}, 60_000);
afterAll(async () => {
await redis?.quit();
await container?.stop();
}, 30_000);
afterEach(async () => {
await redis.del(STREAM_KEY);
});
it('publishEvent ajoute une entry au stream et retourne un ID', async () => {
const event: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 42 };
const id = await publishEvent(redis, event);
expect(id).toMatch(/^\d+-\d+$/);
// Vérifie que l'entry est bien dans le stream.
const entries = await redis.xrange(STREAM_KEY, '-', '+');
expect(entries).toHaveLength(1);
expect(entries[0][0]).toBe(id);
});
it('publishEvent sérialise les champs optionnels', async () => {
const row = { field1: 'hello', num: 42 };
const event: RealtimeEvent = {
type: 'row.updated',
tableId: 5,
viewId: 10,
rowId: 99,
row,
};
const id = await publishEvent(redis, event);
const entries = await redis.xrange(STREAM_KEY, '-', '+');
expect(entries).toHaveLength(1);
// Reconstruit le hash depuis les fields.
const fields = entries[0][1];
const map: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
expect(map.type).toBe('row.updated');
expect(map.tableId).toBe('5');
expect(map.viewId).toBe('10');
expect(map.rowId).toBe('99');
expect(JSON.parse(map.row)).toEqual(row);
// Vérifie que parseStreamEntry round-trip fonctionne.
const parsed = parseStreamEntry(fields, silentLogger);
expect(parsed).toEqual({ type: 'row.updated', tableId: 5, viewId: 10, rowId: 99, row });
void id; // Utilisé pour la vérification ci-dessus.
});
it('MAXLEN borne le stream (approximatif)', async () => {
const maxLen = 100;
// Publie 500 events pour déclencher le trimming.
for (let i = 0; i < 500; i++) {
await publishEvent(redis, { type: 'row.created', tableId: i + 1 }, maxLen);
}
const count = await redis.xlen(STREAM_KEY);
// MAXLEN ~ est approximatif — Redis conserve maxLen + quelques entries (radix node boundary).
// En pratique, on vérifie que le stream est significativement plus petit que 500.
expect(count).toBeLessThan(300);
});
it('subscribeToStream livre les events publiés après la connexion', async () => {
const received: Array<{ id: string; event: RealtimeEvent }> = [];
const ac = new AbortController();
const subscribePromise = subscribeToStream({
redis,
filter: {},
fromId: null,
onEvent: async (id, event) => {
received.push({ id, event });
if (received.length >= 2) ac.abort();
},
signal: ac.signal,
logger: silentLogger,
// BLOCK court pour que les tests ne bloquent pas 2s par cycle.
blockMs: 200,
});
// Attend que le subscriber soit prêt (curseur initialisé + 1er XREAD lancé).
// 300ms pour absorber la latence réseau testcontainer.
await new Promise((r) => setTimeout(r, 300));
const e1: RealtimeEvent = { type: 'row.created', tableId: 1, rowId: 1 };
const e2: RealtimeEvent = { type: 'row.updated', tableId: 2, rowId: 2 };
await publishEvent(redis, e1);
await publishEvent(redis, e2);
await subscribePromise;
expect(received).toHaveLength(2);
expect(received[0].event.type).toBe('row.created');
expect(received[1].event.type).toBe('row.updated');
}, 15_000);
it('subscribeToStream filtre les events par tableIds', async () => {
const received: Array<{ id: string; event: RealtimeEvent }> = [];
const ac = new AbortController();
const subscribePromise = subscribeToStream({
redis,
filter: { tableIds: new Set([42]) },
fromId: null,
onEvent: async (id, event) => {
received.push({ id, event });
if (received.length >= 1) ac.abort();
},
signal: ac.signal,
logger: silentLogger,
blockMs: 200,
});
await new Promise((r) => setTimeout(r, 300));
// Publie un event hors filtre et un event dans le filtre.
await publishEvent(redis, { type: 'row.created', tableId: 99, rowId: 1 });
await publishEvent(redis, { type: 'row.created', tableId: 42, rowId: 2 });
await subscribePromise;
expect(received).toHaveLength(1);
expect(received[0].event.tableId).toBe(42);
}, 15_000);
it('subscribeToStream supporte le replay Last-Event-ID', async () => {
// Publie 3 events avant la connexion.
const id1 = await publishEvent(redis, { type: 'row.created', tableId: 1, rowId: 1 });
const id2 = await publishEvent(redis, { type: 'row.updated', tableId: 1, rowId: 2 });
await publishEvent(redis, { type: 'row.deleted', tableId: 1, rowId: 3 });
// Simule une reconnexion depuis id1 : on doit recevoir id2 et id3.
const received: Array<{ id: string; event: RealtimeEvent }> = [];
const ac = new AbortController();
await subscribeToStream({
redis,
filter: {},
fromId: id1, // Replay depuis id1 exclu (XREAD strict).
onEvent: async (id, event) => {
received.push({ id, event });
// id2 + id3 = 2 events.
if (received.length >= 2) ac.abort();
},
signal: ac.signal,
logger: silentLogger,
blockMs: 200,
});
expect(received).toHaveLength(2);
expect(received[0].id).toBe(id2);
void id2; // Référencé ci-dessus.
}, 15_000);
it('subscribeToStream s arrête proprement sur abort signal', async () => {
const ac = new AbortController();
let callCount = 0;
const subscribePromise = subscribeToStream({
redis,
filter: {},
fromId: null,
onEvent: async () => {
callCount++;
},
signal: ac.signal,
logger: silentLogger,
blockMs: 200,
});
// Abort immédiat — la promesse doit résoudre sans erreur.
ac.abort();
await expect(subscribePromise).resolves.toBeUndefined();
expect(callCount).toBe(0);
}, 10_000);
});

View file

@ -0,0 +1,357 @@
/**
* Tests de la route GET /api/events/sse R3.1.b.
*
* Stratégie : on construit une app Hono de test avec un container fake.
* Le `subscribeToStream` est mocké pour injecter des events contrôlés.
* On lit le ReadableStream SSE et on vérifie les events reçus.
*
* Vitest fake timers permettent de déclencher le heartbeat sans attendre 25s.
*/
import { Hono } from 'hono';
import { logger as honoLogger } from 'hono/logger';
import type { Redis } from 'ioredis';
import pino from 'pino';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type { RedisCache } from '../../src/adapters/redis-cache.js';
import type { RealtimeEvent } from '../../src/domain/event.js';
import type { Container } from '../../src/lib/container.js';
import { setContainer } from '../../src/lib/container.js';
import { type AuthVariables, authMiddleware } from '../../src/middleware/auth.js';
import { errorHandler } from '../../src/middleware/error-handler.js';
import { eventsRoutes } from '../../src/routes/events.js';
// ---------------------------------------------------------------------------
// Mock event-bus — contrôle complet sur subscribeToStream
// ---------------------------------------------------------------------------
vi.mock('../../src/lib/event-bus.js', async (importOriginal) => {
const original = await importOriginal<typeof import('../../src/lib/event-bus.js')>();
return {
...original,
subscribeToStream: vi.fn(),
};
});
import { subscribeToStream } from '../../src/lib/event-bus.js';
const mockSubscribeToStream = vi.mocked(subscribeToStream);
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
const silentLogger = pino({ level: 'silent' });
const ADMIN_TOKEN = 'brg_admin_events_test';
const NO_SCOPE_TOKEN = 'brg_no_scope_events_test';
function buildFakeRedis(): RedisCache {
return {
getClient: () => ({}) as Redis,
invalidatePattern: async () => 0,
checkRateLimit: async () => true,
} as unknown as RedisCache;
}
function buildTestContainer(): Container {
const tokensMap = new Map([
[ADMIN_TOKEN, { token: ADMIN_TOKEN, name: 'test-admin', scopes: ['admin:*'] }],
[NO_SCOPE_TOKEN, { token: NO_SCOPE_TOKEN, name: 'test-no-scope', scopes: ['read:tables'] }],
]);
return {
config: {
nodeEnv: 'test',
port: 0,
logLevel: 'fatal',
baserowApiUrl: 'http://localhost',
baserowApiToken: 'fake',
redisUrl: 'redis://localhost',
baserowWebhookSecret: 'fake_secret_at_least_16_chars',
rateLimitGlobalMax: 10_000,
rateLimitGlobalWindow: 60,
rateLimitMutationMax: 10_000,
rateLimitMutationWindow: 60,
streamMaxLen: 10_000,
},
baserow: {} as unknown,
redis: buildFakeRedis(),
repos: {} as unknown,
tokens: tokensMap,
oidc: null,
docmostJwt: null,
groupsScopesMap: {},
logger: silentLogger,
} as unknown as Container;
}
function buildEventsApp(): Hono<{ Variables: AuthVariables }> {
const container = buildTestContainer();
setContainer(container);
const app = new Hono<{ Variables: AuthVariables }>();
app.use('*', honoLogger());
app.onError(errorHandler);
const eventsRouter = new Hono<{ Variables: AuthVariables }>();
eventsRouter.use(
'*',
authMiddleware({
tokens: container.tokens,
oidc: null,
docmostJwt: null,
groupsScopesMap: {},
logger: silentLogger,
}),
);
eventsRouter.route('/events', eventsRoutes);
app.route('/api', eventsRouter);
return app;
}
/**
* Lit N events SSE depuis un ReadableStream. Timeout après `timeoutMs` ms.
* Retourne les events parsés (lignes `event:` + `data:` extraites).
*/
async function readSseEvents(
stream: ReadableStream<Uint8Array>,
n: number,
timeoutMs = 3_000,
): Promise<Array<{ event: string; data: string; id?: string }>> {
const decoder = new TextDecoder();
const events: Array<{ event: string; data: string; id?: string }> = [];
const reader = stream.getReader();
let buffer = '';
let currentEvent = '';
let currentData = '';
let currentId: string | undefined;
const deadline = Date.now() + timeoutMs;
try {
while (events.length < n && Date.now() < deadline) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE format : blocs séparés par \n\n.
const parts = buffer.split('\n\n');
buffer = parts.pop() ?? '';
for (const block of parts) {
currentEvent = '';
currentData = '';
currentId = undefined;
for (const line of block.split('\n')) {
if (line.startsWith('event: ')) currentEvent = line.slice(7);
else if (line.startsWith('data: ')) currentData = line.slice(6);
else if (line.startsWith('id: ')) currentId = line.slice(4);
}
if (currentEvent) {
events.push({ event: currentEvent, data: currentData, id: currentId });
}
}
}
} finally {
reader.cancel();
}
return events;
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('GET /api/events/sse', () => {
let app: Hono<{ Variables: AuthVariables }>;
beforeEach(() => {
app = buildEventsApp();
mockSubscribeToStream.mockReset();
});
afterEach(() => {
setContainer(null);
vi.restoreAllMocks();
});
it('retourne 401 sans auth', async () => {
const res = await app.request('/api/events/sse');
expect(res.status).toBe(401);
});
it('retourne les headers SSE corrects sur connexion authentifiée', async () => {
// subscribeToStream se termine immédiatement (pas d'events).
mockSubscribeToStream.mockResolvedValue(undefined);
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200);
expect(res.headers.get('Content-Type')).toContain('text/event-stream');
expect(res.headers.get('Cache-Control')).toBe('no-cache');
expect(res.headers.get('Connection')).toBe('keep-alive');
});
it('livre les events publiés via subscribeToStream', async () => {
const events: RealtimeEvent[] = [
{ type: 'row.created', tableId: 1, rowId: 10 },
{ type: 'row.updated', tableId: 1, rowId: 10, row: { name: 'Alice' } },
];
mockSubscribeToStream.mockImplementation(async (opts) => {
for (let i = 0; i < events.length; i++) {
await opts.onEvent(`event-id-${i + 1}`, events[i]);
}
});
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200);
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 2);
expect(received).toHaveLength(2);
expect(received[0].event).toBe('row.created');
expect(received[0].id).toBe('event-id-1');
const data0 = JSON.parse(received[0].data);
expect(data0.tableId).toBe(1);
expect(data0.rowId).toBe(10);
expect(received[1].event).toBe('row.updated');
const data1 = JSON.parse(received[1].data);
expect(data1.row).toEqual({ name: 'Alice' });
});
it('passe le filtre ?tables= à subscribeToStream', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?tables=10,20', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(mockSubscribeToStream).toHaveBeenCalledOnce();
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
});
it('passe le filtre ?views= à subscribeToStream', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?views=5,6', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.filter.viewIds).toEqual(new Set([5, 6]));
});
it('passe Last-Event-ID à subscribeToStream comme fromId', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse', {
headers: {
Authorization: `Bearer ${ADMIN_TOKEN}`,
'Last-Event-ID': '1234567890-0',
},
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.fromId).toBe('1234567890-0');
});
it('fromId est null quand pas de Last-Event-ID', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
expect(callOpts.fromId).toBeNull();
});
it('envoie un event error SSE si subscribeToStream throw une erreur fatale', async () => {
mockSubscribeToStream.mockRejectedValue(new Error('Redis max retries exceeded'));
const res = await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(res.status).toBe(200); // SSE stream reste 200 même en cas d'erreur.
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 1, 2_000);
const errorEvent = received.find((e) => e.event === 'error');
expect(errorEvent).toBeDefined();
const data = JSON.parse(errorEvent?.data ?? '{}');
expect(data.message).toContain('reconnect');
});
it('heartbeat : vérifie que subscribeToStream reçoit un signal valide', async () => {
// On ne peut pas tester le heartbeat sans fake timers sur setInterval.
// On vérifie que l'abort signal est proprement transmis et que la fn est appelée.
mockSubscribeToStream.mockImplementation(async (opts) => {
// Vérifie que le signal est un AbortSignal.
expect(opts.signal).toBeInstanceOf(AbortSignal);
});
await app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
expect(mockSubscribeToStream).toHaveBeenCalledOnce();
});
it('filtre ?tables= avec valeurs invalides ignorées', async () => {
mockSubscribeToStream.mockResolvedValue(undefined);
await app.request('/api/events/sse?tables=10,abc,0,-5,20', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
const callOpts = mockSubscribeToStream.mock.calls[0][0];
// Seules les valeurs entières positives sont conservées.
expect(callOpts.filter.tableIds).toEqual(new Set([10, 20]));
});
it('heartbeat envoie un event ping via setInterval', async () => {
// Utilise les fake timers pour déclencher setInterval sans attendre 25s réels.
vi.useFakeTimers();
// subscribeToStream bloque le temps que l'on avance les timers + reçoit le ping.
let resolveSubscribe!: () => void;
const subscribePromise = new Promise<void>((resolve) => {
resolveSubscribe = resolve;
});
mockSubscribeToStream.mockImplementation(async () => {
// Attend que le test avance les timers.
await subscribePromise;
});
// Collecte les events SSE reçus en parallèle.
const responsePromise = app.request('/api/events/sse', {
headers: { Authorization: `Bearer ${ADMIN_TOKEN}` },
});
// Avance les timers de 25s pour déclencher le heartbeat.
await vi.advanceTimersByTimeAsync(25_000);
// Libère subscribeToStream pour fermer le stream proprement.
resolveSubscribe();
const res = await responsePromise;
vi.useRealTimers();
expect(res.status).toBe(200);
if (!res.body) throw new Error('SSE response has no body');
const received = await readSseEvents(res.body, 1, 2_000);
const pingEvent = received.find((e) => e.event === 'ping');
expect(pingEvent).toBeDefined();
});
});

View file

@ -1,9 +1,11 @@
/**
* Tests handler webhooks Baserow generique R1 (plus de cascade rollup metier).
* R3.1.b : tests de la publication SSE via streamRedis.
*/
import type { Redis } from 'ioredis';
import pino from 'pino';
import { describe, expect, it } from 'vitest';
import { describe, expect, it, vi } from 'vitest';
import type { RedisCache } from '../../src/adapters/redis-cache.js';
import { handleBaserowEvent } from '../../src/webhooks/baserow-handler.js';
import type { BaserowWebhookPayload } from '../../src/webhooks/types.js';
@ -152,4 +154,152 @@ describe('handleBaserowEvent (R1 generique)', () => {
// R3.1.a : 5 patterns — list:*, views:* (tables keyspace), views:data:* (R3.1.a), row:1, row:2.
expect(res.invalidatedKeys).toBe(5);
});
it('sans streamRedis, publishedEventId est null', async () => {
const redis = new FakeRedis();
const res = await handleBaserowEvent(makePayload(), {
redis: redis as unknown as RedisCache,
logger: silentLogger(),
// Pas de streamRedis — mode dégradé sans clients SSE.
});
expect(res.publishedEventId).toBeNull();
});
});
describe('handleBaserowEvent R3.1.b — publication SSE', () => {
/**
* Fake Redis Streams : simule xadd et retourne un ID déterministe.
*/
class FakeStreamRedis {
public xaddCalls: Array<{ key: string; fields: string[] }> = [];
private seq = 0;
async xadd(key: string, ...args: unknown[]): Promise<string> {
// args = ['MAXLEN', '~', maxLen, '*', ...fields]
const fieldsStart = args.indexOf('*') + 1;
const fields = args.slice(fieldsStart) as string[];
this.xaddCalls.push({ key, fields });
return `${Date.now()}-${++this.seq}`;
}
}
it('rows.created -> publie un event row.created sur le stream', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
const res = await handleBaserowEvent(
makePayload({ event_type: 'rows.created', table_id: 42, items: [{ id: 100 }] }),
{
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
},
);
expect(res.publishedEventId).not.toBeNull();
expect(streamRedis.xaddCalls).toHaveLength(1);
const { fields } = streamRedis.xaddCalls[0];
const map: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
expect(map.type).toBe('row.created');
expect(map.tableId).toBe('42');
expect(map.rowId).toBe('100');
});
it('rows.updated avec 2 items -> publie 2 events row.updated', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
await handleBaserowEvent(
makePayload({ event_type: 'rows.updated', items: [{ id: 1 }, { id: 2 }] }),
{
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
},
);
expect(streamRedis.xaddCalls).toHaveLength(2);
for (const call of streamRedis.xaddCalls) {
const map: Record<string, string> = {};
for (let i = 0; i < call.fields.length; i += 2) map[call.fields[i]] = call.fields[i + 1];
expect(map.type).toBe('row.updated');
}
});
it('rows.deleted -> publie un event row.deleted sans row payload', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
await handleBaserowEvent(makePayload({ event_type: 'rows.deleted', items: [{ id: 5 }] }), {
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
});
const { fields } = streamRedis.xaddCalls[0];
const map: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
expect(map.type).toBe('row.deleted');
// row.deleted ne doit pas inclure de payload row.
expect(map.row).toBeUndefined();
});
it('view.created -> publie un event view.created avec viewId', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
await handleBaserowEvent(
makePayload({ event_type: 'view.created', table_id: 5, view_id: 200, items: [] }),
{
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
},
);
expect(streamRedis.xaddCalls).toHaveLength(1);
const { fields } = streamRedis.xaddCalls[0];
const map: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
expect(map.type).toBe('view.created');
expect(map.viewId).toBe('200');
});
it('rows.created sans items -> publie un event table.updated', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
await handleBaserowEvent(makePayload({ event_type: 'rows.created', items: [] }), {
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
});
expect(streamRedis.xaddCalls).toHaveLength(1);
const { fields } = streamRedis.xaddCalls[0];
const map: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
expect(map.type).toBe('table.updated');
});
it('table_id invalide -> ignoré, pas de publication SSE', async () => {
const redis = new FakeRedis();
const streamRedis = new FakeStreamRedis();
await handleBaserowEvent(makePayload({ table_id: 0 }), {
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: streamRedis as unknown as Redis,
});
expect(streamRedis.xaddCalls).toHaveLength(0);
});
it('erreur xadd est loggée mais ne lève pas d exception (non-critique)', async () => {
const redis = new FakeRedis();
const failingStreamRedis = {
xadd: vi.fn().mockRejectedValue(new Error('Redis connection lost')),
} as unknown as Redis;
// Doit résoudre sans throw même si xadd échoue.
const res = await handleBaserowEvent(makePayload(), {
redis: redis as unknown as RedisCache,
logger: silentLogger(),
streamRedis: failingStreamRedis,
});
// La cache invalidation a quand même eu lieu.
expect(res.invalidatedKeys).toBeGreaterThan(0);
// publishedEventId est null car xadd a échoué.
expect(res.publishedEventId).toBeNull();
});
});

View file

@ -29,6 +29,14 @@ class FakeRedis {
this.invalidated.push(pattern);
return Promise.resolve(1);
}
// R3.1.b : getClient requis par webhooks/baserow route pour le streamRedis.
// Retourne un faux client Redis Streams qui ignore les XADD.
getClient() {
return {
xadd: async () => '0-0',
};
}
}
function installContainer(redis: FakeRedis, withDocmostSecret = true) {
@ -47,6 +55,7 @@ function installContainer(redis: FakeRedis, withDocmostSecret = true) {
rateLimitGlobalWindow: 60,
rateLimitMutationMax: 10000,
rateLimitMutationWindow: 60,
streamMaxLen: 10000,
},
// biome-ignore lint/suspicious/noExplicitAny: fake injection
baserow: {} as any,

View file

@ -57,6 +57,31 @@ export default defineConfig({
branches: 85,
statements: 85,
},
// R3.1.b : domain/event.ts — 100% coverage cible (schema pur, pas de branche complexe).
'src/domain/event.ts': {
lines: 100,
functions: 100,
branches: 100,
statements: 100,
},
// R3.1.b : event-bus.ts — 100% lines/funcs/stmts, branches >= 90%.
// Branches non couverts (94%) : operateur ?? sur xadd null et checks
// signal.aborted inline que l'on couvre via tests d'abort direct.
'src/lib/event-bus.ts': {
lines: 100,
functions: 100,
branches: 90,
statements: 100,
},
// R3.1.b : events.ts (route SSE) — lignes 208-210 (heartbeat guard already-closed)
// et 246-253 (Hono onError safety net) non atteignables via les tests unitaires
// Hono sans setup serveur reel. Seuil >= 78% lines, >= 70% branches.
'src/routes/events.ts': {
lines: 78,
functions: 85,
branches: 70,
statements: 78,
},
},
},
passWithNoTests: true,