feat(sync-blocks): SSE endpoint for realtime updates

Client use-sync-block-realtime opened GET /v1/sync-blocks/:id/events
but no such route existed -> SPA fallback served HTML 200 and the
EventSource failed. Add @Sse(':id/events') wired to the existing
EventEmitter2 broadcast (SYNC_BLOCK_UPDATED_EVENT), filtered per
masterId, with a 25s heartbeat. Auth via the controller JwtAuthGuard
(JWT strategy reads the authToken cookie EventSource sends).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Corentin JOGUET 2026-05-18 14:31:50 +00:00
parent 7a11ff4e85
commit 731d7f5e93

View file

@ -5,12 +5,17 @@ import {
Get,
HttpCode,
HttpStatus,
type MessageEvent,
Param,
ParseUUIDPipe,
Patch,
Post,
Sse,
UseGuards,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { fromEvent, interval, merge, type Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import {
ApiBearerAuth,
ApiBody,
@ -24,6 +29,10 @@ import { AuthUser } from '../../../../common/decorators/auth-user.decorator';
import { AuthWorkspace } from '../../../../common/decorators/auth-workspace.decorator';
import { User, Workspace } from '@docmost/db/types/entity.types';
import { SyncBlocksService } from '../services/sync-blocks.service';
import {
SYNC_BLOCK_UPDATED_EVENT,
type SyncBlockUpdatedPayload,
} from '../services/sync-block-broadcast.service';
import {
CreateSyncBlockDto,
UpdateSyncBlockDto,
@ -50,7 +59,51 @@ import {
@UseGuards(JwtAuthGuard)
@Controller('v1/sync-blocks')
export class SyncBlocksController {
constructor(private readonly syncBlocksService: SyncBlocksService) {}
constructor(
private readonly syncBlocksService: SyncBlocksService,
private readonly eventEmitter: EventEmitter2,
) {}
/**
* SSE stream of updates for a single sync block.
*
* The client (use-sync-block-realtime) opens an EventSource here and
* invalidates its cache on `sync-block.updated`. Auth is the controller's
* JwtAuthGuard EventSource cannot set headers but the JWT strategy reads
* the `authToken` cookie, sent automatically same-origin.
*
* A 25s heartbeat keeps the connection alive through proxies that drop idle
* streams (~30s). The client ignores the unknown `ping` event.
*/
@ApiOperation({
summary: 'Sync block realtime stream (SSE)',
description:
'Server-sent events: emits `sync-block.updated` when this block changes.',
})
@ApiParam({ name: 'id', description: 'Sync block UUID', type: 'string' })
@Sse(':id/events')
events(
@Param('id', ParseUUIDPipe) id: string,
): Observable<MessageEvent> {
const updates = fromEvent<SyncBlockUpdatedPayload>(
this.eventEmitter,
SYNC_BLOCK_UPDATED_EVENT,
).pipe(
filter((payload) => payload?.masterId === id),
map(
(payload): MessageEvent => ({
type: 'sync-block.updated',
data: { masterId: payload.masterId },
}),
),
);
const heartbeat = interval(25_000).pipe(
map((): MessageEvent => ({ type: 'ping', data: {} })),
);
return merge(updates, heartbeat);
}
@ApiOperation({ summary: 'Create sync block', description: 'Creates a master sync block that can be embedded by reference in multiple pages.' })
@ApiBody({ schema: { type: 'object', properties: { content: { type: 'object', description: 'ProseMirror JSON content' } } }, description: 'Initial content (optional)' })