From 731d7f5e93c74486eb6a74cf8fbbf5419bb7d42e Mon Sep 17 00:00:00 2001 From: Corentin Date: Mon, 18 May 2026 14:31:50 +0000 Subject: [PATCH] feat(sync-blocks): SSE endpoint for realtime updates Client use-sync-block-realtime opened GET /v1/sync-blocks/:id/events but no such route existed -> SPA fallback served HTML 200 and the EventSource failed. Add @Sse(':id/events') wired to the existing EventEmitter2 broadcast (SYNC_BLOCK_UPDATED_EVENT), filtered per masterId, with a 25s heartbeat. Auth via the controller JwtAuthGuard (JWT strategy reads the authToken cookie EventSource sends). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../controllers/sync-blocks.controller.ts | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/apps/server/src/core/acadenice/sync-blocks/controllers/sync-blocks.controller.ts b/apps/server/src/core/acadenice/sync-blocks/controllers/sync-blocks.controller.ts index a48b1867..c4561487 100644 --- a/apps/server/src/core/acadenice/sync-blocks/controllers/sync-blocks.controller.ts +++ b/apps/server/src/core/acadenice/sync-blocks/controllers/sync-blocks.controller.ts @@ -5,12 +5,17 @@ import { Get, HttpCode, HttpStatus, + type MessageEvent, Param, ParseUUIDPipe, Patch, Post, + Sse, UseGuards, } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { fromEvent, interval, merge, type Observable } from 'rxjs'; +import { filter, map } from 'rxjs/operators'; import { ApiBearerAuth, ApiBody, @@ -24,6 +29,10 @@ import { AuthUser } from '../../../../common/decorators/auth-user.decorator'; import { AuthWorkspace } from '../../../../common/decorators/auth-workspace.decorator'; import { User, Workspace } from '@docmost/db/types/entity.types'; import { SyncBlocksService } from '../services/sync-blocks.service'; +import { + SYNC_BLOCK_UPDATED_EVENT, + type SyncBlockUpdatedPayload, +} from '../services/sync-block-broadcast.service'; import { CreateSyncBlockDto, UpdateSyncBlockDto, @@ -50,7 +59,51 @@ import { @UseGuards(JwtAuthGuard) @Controller('v1/sync-blocks') export class SyncBlocksController { - constructor(private readonly syncBlocksService: SyncBlocksService) {} + constructor( + private readonly syncBlocksService: SyncBlocksService, + private readonly eventEmitter: EventEmitter2, + ) {} + + /** + * SSE stream of updates for a single sync block. + * + * The client (use-sync-block-realtime) opens an EventSource here and + * invalidates its cache on `sync-block.updated`. Auth is the controller's + * JwtAuthGuard — EventSource cannot set headers but the JWT strategy reads + * the `authToken` cookie, sent automatically same-origin. + * + * A 25s heartbeat keeps the connection alive through proxies that drop idle + * streams (~30s). The client ignores the unknown `ping` event. + */ + @ApiOperation({ + summary: 'Sync block realtime stream (SSE)', + description: + 'Server-sent events: emits `sync-block.updated` when this block changes.', + }) + @ApiParam({ name: 'id', description: 'Sync block UUID', type: 'string' }) + @Sse(':id/events') + events( + @Param('id', ParseUUIDPipe) id: string, + ): Observable { + const updates = fromEvent( + this.eventEmitter, + SYNC_BLOCK_UPDATED_EVENT, + ).pipe( + filter((payload) => payload?.masterId === id), + map( + (payload): MessageEvent => ({ + type: 'sync-block.updated', + data: { masterId: payload.masterId }, + }), + ), + ); + + const heartbeat = interval(25_000).pipe( + map((): MessageEvent => ({ type: 'ping', data: {} })), + ); + + return merge(updates, heartbeat); + } @ApiOperation({ summary: 'Create sync block', description: 'Creates a master sync block that can be embedded by reference in multiple pages.' }) @ApiBody({ schema: { type: 'object', properties: { content: { type: 'object', description: 'ProseMirror JSON content' } } }, description: 'Initial content (optional)' })