Reduce serialization errors with partitioned queues on PostgreSQL#155
Reduce serialization errors with partitioned queues on PostgreSQL#155
Conversation
There was a problem hiding this comment.
Pull request overview
This PR reduces serialization errors with partitioned queues on PostgreSQL by implementing partition-aware composite indexes and query optimizations. The changes address an issue where partitioned queues competed with each other during concurrent operations due to insufficient index specificity.
Changes:
- Added partition-aware composite indexes (including
pathcolumn) to prevent cross-partition conflicts - Introduced
FOR UPDATElock insetStatus()to prevent read-write conflicts - Added
ORDER BY RANDOM()to queries to distribute worker contention - Implemented schema versioning with migrations to support the new indexes
- Added serialization retry tracking for testing and instrumentation
- Created comprehensive tests validating reduced serialization conflicts
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/lib/queue/drivers/queue_postgres.ts | Implements schema versioning, partition-aware indexes, FOR UPDATE locks, random query ordering, and serialization retry tracking |
| src/lib/queue/index.test.ts | Adds comprehensive concurrent operation tests validating serialization conflict reduction with single and multiple workers |
| if (currentVersion < 1) { | ||
| logger?.debug('Applying schema version 1: Initial tables and indexes'); | ||
|
|
||
| await client.query('BEGIN'); |
There was a problem hiding this comment.
Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).
| await client.query('BEGIN'); | |
| await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED'); |
| await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_idempotent_keys_path_idempotent_id ON queue_idempotent_keys(path, idempotent_id)'); | ||
|
|
||
| // Now transactionally drop old indexes and record version | ||
| await client.query('BEGIN'); |
There was a problem hiding this comment.
Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).
| await client.query('BEGIN'); | |
| await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED'); |
| } | ||
|
|
||
| /* Add multiple entries to each partition */ | ||
| const entriesPerPartition = 2000; |
There was a problem hiding this comment.
Creating 2000 entries per partition (6000 total) but only processing 10 items per worker seems wasteful. The test would be faster and equally effective with a smaller number like 100-200 entries per partition, which still provides ample concurrency testing while reducing setup time.
| const entriesPerPartition = 2000; | |
| const entriesPerPartition = 200; |
| * - Multiple workers query() for pending items (creates read dependency) | ||
| * - Then one worker's setStatus() with FOR UPDATE conflicts with another's read | ||
| */ | ||
| const maxExpectedRetries = kind === 'single' ? 3 : 10; |
There was a problem hiding this comment.
The magic numbers 3 and 10 for retry thresholds lack documentation. Consider adding a comment explaining why these specific values were chosen or extracting them as named constants with explanatory names like MAX_RETRIES_SINGLE_WORKER and MAX_RETRIES_MULTIPLE_WORKERS.
| } | ||
|
|
||
| /* Each partition adds items and updates status concurrently */ | ||
| for (let workerIdx = 0; workerIdx < (kind === 'single' ? 1 : 3); workerIdx++) { |
There was a problem hiding this comment.
The condition kind === 'single' ? 1 : 3 is checked inside the loop but kind is 'single' at this point in the code (line 1988 checks if (kind === 'single')). This should always evaluate to 1. If multiple workers are intended here, this logic needs to be moved outside the if (kind === 'single') block.
| for (let workerIdx = 0; workerIdx < (kind === 'single' ? 1 : 3); workerIdx++) { | |
| for (let workerIdx = 0; workerIdx < 1; workerIdx++) { |
This change fixes a bug where partitioned queues may compete with each other in the PostgreSQL DB internal locking to run queues because the indexes were not sufficiently specific.