From 359b1cb4d71caa4f0890634321a3c14d3fe0fdce Mon Sep 17 00:00:00 2001 From: Borewit Date: Sat, 8 Feb 2025 16:18:39 +0100 Subject: [PATCH] =?UTF-8?q?Replace=20the=20single=20=E2=80=9CidlePromise?= =?UTF-8?q?=E2=80=9D=20and=20=E2=80=9CresolveIdle=E2=80=9D=20mechanism=20w?= =?UTF-8?q?ith=20a=20small=20notification=20system=20that=20supports=20mul?= =?UTF-8?q?tiple=20waiters.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biome.jsonc | 3 +- src/bounded-queue.ts | 134 ++++++++++++++++++++++++++----------------- 2 files changed, 84 insertions(+), 53 deletions(-) diff --git a/biome.jsonc b/biome.jsonc index fc41d72..2e182a8 100644 --- a/biome.jsonc +++ b/biome.jsonc @@ -33,7 +33,8 @@ "style":{ "useConsistentBuiltinInstantiation": "error", "useThrowNewError": "error", - "useThrowOnlyError": "error" + "useThrowOnlyError": "error", + "noNonNullAssertion": "off" } } }, diff --git a/src/bounded-queue.ts b/src/bounded-queue.ts index e4cc770..d178109 100644 --- a/src/bounded-queue.ts +++ b/src/bounded-queue.ts @@ -1,11 +1,16 @@ -export type Producer = () => Promise; +export type Producer = () => Promise; export type Consumer = (item: ItemType) => Promise; +/** + * A bounded asynchronous queue that fills items via a producer + * and consumes them via a consumer. + */ export class BoundedQueue { private queue: ItemType[] = []; - private idlePromise?: Promise | null = null; - private resolveIdle: (value: boolean) => void = () => undefined; - private endOfProduction = false; + private productionEnded = false; + // When the queue is full (for the producer) or empty (for the consumer), + // the corresponding loop waits on one or more of these resolvers. + private waiters: (() => void)[] = []; constructor( private maxQueueSize: number, @@ -14,73 +19,98 @@ export class BoundedQueue { ) { } - private async asyncFillQueue(): Promise { - do { - while (this.queue.length < this.maxQueueSize) { - const batch = await this.producer(); - if (batch === null) { - this.endOfProduction = true; - this.wakeUp(); - return; - } - this.queue.push(batch); - this.wakeUp(); - } - } while (await this.idleWait()); + /** + * Notifies all waiting producers/consumers that the queue state has changed. + */ + private notifyAll(): void { + for (const resolve of this.waiters) { + resolve(); + } + this.waiters = []; } - private async asyncEmptyQueue(): Promise { - do { - while (this.queue.length > 0) { - const batchItem = this.queue.shift() as ItemType; - this.wakeUp(); - if (batchItem === null) { - if (this.idlePromise) { - this.idlePromise = null; - this.resolveIdle(false); - } - return; - } - await this.consumer(batchItem); - } - } while (await this.idleWait()); + /** + * Returns a promise that resolves when a notification is sent. + */ + private waitForNotification(): Promise { + return new Promise(resolve => { + this.waiters.push(resolve); + }); } - private wakeUp(): void { - if (this.idlePromise) { - this.idlePromise = null; - this.resolveIdle(!this.endOfProduction); + /** + * The producer loop: repeatedly ask for new items until the producer + * returns `null`. If the queue is full, wait until consumers have removed items. + */ + private async produce(): Promise { + while (!this.productionEnded) { + // Fill the queue until full. + while (this.queue.length < this.maxQueueSize && !this.productionEnded) { + const item = await this.producer(); + if (item === null) { + this.productionEnded = true; + // Wake up any waiting consumers. + this.notifyAll(); + break; + } + this.queue.push(item); + this.notifyAll(); + } + // Wait until a consumer removes some items. + if (!this.productionEnded) { + await this.waitForNotification(); + } } } - private async idleWait(): Promise { - if (this.endOfProduction) { - return false; + /** + * The consumer loop: repeatedly removes items from the queue and + * processes them. It keeps running until production ends and the queue is empty. + */ + private async consume(): Promise { + while (!this.productionEnded || this.queue.length > 0) { + while (this.queue.length > 0) { + // Since the producer never enqueues null, we can safely assert the item exists. + const item = this.queue.shift()!; + await this.consumer(item); + this.notifyAll(); + } + // If production is complete and there are no items, exit. + if (this.productionEnded && this.queue.length === 0) { + break; + } + // Wait for new items to be enqueued. + await this.waitForNotification(); } - this.idlePromise = new Promise(resolve => { - this.resolveIdle = resolve; - }); - return this.idlePromise; } /** - * Number of items queued + * Returns the current number of items in the queue. */ - public length(): number { + public get length(): number { return this.queue.length; } + /** + * Runs the producer and consumer loops concurrently until all work is done. + */ public async run(): Promise { - await Promise.all([this.asyncFillQueue(), this.asyncEmptyQueue()]); + await Promise.all([this.produce(), this.consume()]); } } /** - * @param maxQueueSize Maximum number of items that can be in the queue. - * @param producer A function that produces items to be added to the queue. - * @param consumer A function that consumes items from the queue. - * @returns {Promise} + * Creates and runs a bounded queue that uses the given producer and consumer. + * + * @param maxQueueSize - Maximum number of items allowed in the queue. + * @param producer - A function producing items (or `null` when done). + * @param consumer - A function that consumes an item. + * @returns A promise that resolves when all production and consumption is complete. */ -export function queue(maxQueueSize: number, producer: Producer, consumer: Consumer): Promise { +export function queue( + maxQueueSize: number, + producer: Producer, + consumer: Consumer +): Promise { return new BoundedQueue(maxQueueSize, producer, consumer).run(); -} \ No newline at end of file +}