From dbb10525961a7de85f01a612e388ea680fe424ad Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Wed, 1 Oct 2025 02:17:07 -0300 Subject: [PATCH] feat: add lock method to LockRepository for managing room locks with disposal functionality --- .../src/listeners/staging-area.listener.ts | 4 +-- .../src/queues/staging-area.queue.ts | 20 +++++++++-- .../src/repositories/lock.repository.ts | 26 +++++++++++++++ .../src/services/event.service.ts | 25 -------------- .../src/services/staging-area.service.ts | 33 ++++++------------- 5 files changed, 56 insertions(+), 52 deletions(-) diff --git a/packages/federation-sdk/src/listeners/staging-area.listener.ts b/packages/federation-sdk/src/listeners/staging-area.listener.ts index 49bf09bee..977627ebd 100644 --- a/packages/federation-sdk/src/listeners/staging-area.listener.ts +++ b/packages/federation-sdk/src/listeners/staging-area.listener.ts @@ -14,8 +14,8 @@ export class StagingAreaListener { this.stagingAreaQueue.registerHandler(this.handleQueueItem.bind(this)); } - async handleQueueItem(data: string) { + async *handleQueueItem(data: string) { this.logger.debug(`Processing event ${data}`); - await this.stagingAreaService.processEventForRoom(data); + yield* this.stagingAreaService.processEventForRoom(data); } } diff --git a/packages/federation-sdk/src/queues/staging-area.queue.ts b/packages/federation-sdk/src/queues/staging-area.queue.ts index 98815b91d..fcc92a467 100644 --- a/packages/federation-sdk/src/queues/staging-area.queue.ts +++ b/packages/federation-sdk/src/queues/staging-area.queue.ts @@ -1,7 +1,9 @@ import 'reflect-metadata'; import { singleton } from 'tsyringe'; +import { LockRepository } from '../repositories/lock.repository'; +import { ConfigService } from '../services/config.service'; -type QueueHandler = (roomId: string) => Promise; +type QueueHandler = (roomId: string) => AsyncGenerator; @singleton() export class StagingAreaQueue { @@ -9,6 +11,11 @@ export class StagingAreaQueue { private handlers: QueueHandler[] = []; private processing = false; + constructor( + private readonly lockRepository: LockRepository, + private readonly configService: ConfigService, + ) {} + enqueue(roomId: string): void { this.queue.push(roomId); this.processQueue(); @@ -31,7 +38,16 @@ export class StagingAreaQueue { if (!roomId) continue; for (const handler of this.handlers) { - await handler(roomId); + await using lock = await this.lockRepository.lock( + roomId, + this.configService.instanceId, + ); + if (!lock.success) { + continue; + } + for await (const _ of handler(roomId)) { + await lock.update(); + } } } } finally { diff --git a/packages/federation-sdk/src/repositories/lock.repository.ts b/packages/federation-sdk/src/repositories/lock.repository.ts index 4fe294377..fa1c66eac 100644 --- a/packages/federation-sdk/src/repositories/lock.repository.ts +++ b/packages/federation-sdk/src/repositories/lock.repository.ts @@ -16,6 +16,32 @@ export class LockRepository { this.collection.createIndex({ roomId: 1 }, { unique: true }); } + async lock( + roomId: string, + instanceId: string, + ): Promise<{ + success: boolean; + update: () => Promise; + [Symbol.asyncDispose]: () => Promise; + }> { + const lock = await this.getLock(roomId, instanceId); + return { + success: lock, + update: async () => { + if (!lock) { + return; + } + return this.updateLockTimestamp(roomId, instanceId); + }, + [Symbol.asyncDispose]: async () => { + if (!lock) { + return; + } + return this.releaseLock(roomId, instanceId); + }, + }; + } + async getLock(roomId: string, instanceId: string): Promise { const timedout = new Date(); timedout.setTime(timedout.getTime() - 2 * 60 * 1000); // 2 minutes ago diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 263339d09..16229aa8e 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -199,19 +199,6 @@ export class EventService { // save the event as staged to be processed await this.eventStagingRepository.create(eventId, origin, event); - // acquire a lock for processing the event - const lock = await this.lockRepository.getLock( - roomId, - this.configService.instanceId, - ); - if (!lock) { - this.logger.debug(`Couldn't acquire a lock for room ${roomId}`); - continue; - } - - // if we have a lock, we can process the event - // void this.stagingAreaService.processEventForRoom(roomId); - // TODO change this to call stagingAreaService directly (line above) this.stagingAreaQueue.enqueue(roomId); } @@ -664,18 +651,6 @@ export class EventService { // not we try to process one room at a time for await (const roomId of rooms) { - const lock = await this.lockRepository.getLock( - roomId, - this.configService.instanceId, - ); - if (!lock) { - this.logger.debug(`Couldn't acquire a lock for room ${roomId}`); - continue; - } - - // if we have a lock, we can process the event - // void this.stagingAreaService.processEventForRoom(roomId); - // TODO change this to call stagingAreaService directly (line above) this.stagingAreaQueue.enqueue(roomId); diff --git a/packages/federation-sdk/src/services/staging-area.service.ts b/packages/federation-sdk/src/services/staging-area.service.ts index 25cddf4ab..d083facd4 100644 --- a/packages/federation-sdk/src/services/staging-area.service.ts +++ b/packages/federation-sdk/src/services/staging-area.service.ts @@ -2,16 +2,15 @@ import type { EventBase, EventStagingStore, Membership, + MessageType, } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; +import { createLogger } from '@rocket.chat/federation-core'; import { - MessageType, - createLogger, - isRedactedEvent, -} from '@rocket.chat/federation-core'; -import { PduPowerLevelsEventContent } from '@rocket.chat/federation-room'; -import type { EventID } from '@rocket.chat/federation-room'; + EventID, + PduPowerLevelsEventContent, +} from '@rocket.chat/federation-room'; import { EventAuthorizationService } from './event-authorization.service'; import { EventEmitterService } from './event-emitter.service'; import { EventService } from './event.service'; @@ -48,15 +47,10 @@ export class StagingAreaService { return [authEvents, prevEvents]; } - async processEventForRoom(roomId: string) { + async *processEventForRoom(roomId: string) { let event = await this.eventService.getLeastDepthEventForRoom(roomId); if (!event) { this.logger.debug({ msg: 'No staged event found for room', roomId }); - await this.lockRepository.releaseLock( - roomId, - this.configService.instanceId, - ); - return; } while (event) { @@ -92,19 +86,12 @@ export class StagingAreaService { // if we got an event, we need to update the lock's timestamp to avoid it being timed out // and acquired by another instance while we're processing a batch of events for this room + if (event) { - await this.lockRepository.updateLockTimestamp( - roomId, - this.configService.instanceId, - ); + yield event; } } - - // release the lock after processing - await this.lockRepository.releaseLock( - roomId, - this.configService.instanceId, - ); + this.logger.debug({ msg: 'No more events to process for room', roomId }); } private async processDependencyStage(event: EventStagingStore) { @@ -223,7 +210,7 @@ export class StagingAreaService { }); break; } - case isRedactedEvent(event.event): { + case event.event.type === 'm.room.redaction': { this.eventEmitterService.emit('homeserver.matrix.redaction', { event_id: eventId, room_id: roomId,