This repository demonstrates how to build an efficient task queue using PostgreSQL's FOR UPDATE SKIP LOCKED clause, which lets parallel workers lock and claim different rows without waiting on one another. It is the companion code for the blog post "Simplifying Task Queues with PostgreSQL".
- Task scheduling with priorities and delays
- Automatic retries with configurable intervals
- Unique task constraints
- Scheduled (recurring) tasks
- Graceful shutdown handling
- Parallel task processing with SKIP LOCKED
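To illustrate how graceful shutdown and parallel processing from the list above can fit together, here is a minimal sketch of a polling worker loop. It is not the repository's runner: `runWorker`, `WorkerHandle`, `claimNext`, and `idleMs` are hypothetical names, and `claimNext` stands in for whatever claims and runs one task (for example a `FOR UPDATE SKIP LOCKED` query).

```typescript
// Hypothetical worker loop; names and behavior are assumptions, not the repo's code.
// claimNext resolves to true when a task was claimed and run, false when none was due.
type ClaimNext = () => Promise<boolean>;

interface WorkerHandle {
  stop(): void;          // request shutdown; the in-flight task is allowed to finish
  done: Promise<number>; // resolves with the number of tasks processed
}

function runWorker(claimNext: ClaimNext, idleMs = 1000): WorkerHandle {
  let stopping = false;
  let wake: (() => void) | null = null;

  // Sleep that stop() can cut short so shutdown does not wait a full idle period.
  const sleep = (ms: number) =>
    new Promise<void>((resolve) => {
      const timer = setTimeout(() => {
        wake = null;
        resolve();
      }, ms);
      wake = () => {
        clearTimeout(timer);
        wake = null;
        resolve();
      };
    });

  const loop = async (): Promise<number> => {
    let processed = 0;
    while (!stopping) {
      const ran = await claimNext();
      if (ran) {
        processed++;
      } else if (!stopping) {
        await sleep(idleMs); // queue was empty: back off before polling again
      }
    }
    return processed;
  };

  return {
    stop() {
      stopping = true;
      if (wake) wake();
    },
    done: loop(),
  };
}
```

Calling `runWorker` several times, or in several processes, gives parallel workers. Wiring `stop()` to a signal handler such as `process.on("SIGTERM", ...)` and then awaiting `done` lets the task in flight finish before the process exits.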
Example tasks:
- SendEmail: Simple email task simulation
- ProcessData: Data processing task simulation
- DailyReport: Daily scheduled task (unique, runs every 24 hours)
- HealthCheck: High-priority health check (unique, runs every 5 seconds, with retries)
- Install dependencies:
npm install
- Start PostgreSQL:
docker-compose up -d
- Initialize the database:
npm run prisma:generate # Generate Prisma client
npm run prisma:migrate # Create database tables
- Start the task system:
# Start both runner and scheduler
npm run start:all
# Or start them separately:
npm run start:runner # Terminal 1
npm run start:scheduler # Terminal 2
Source files:
- src/types.ts: Core task system types and registry
- src/tasks.ts: Task implementations using decorators
- src/taskQueue.ts: PostgreSQL-based queue operations
- src/runner.ts: Task execution and retry logic
- src/scheduler.ts: Scheduled task management
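As a rough illustration of how a decorator-backed registry like the one described for src/types.ts and src/tasks.ts could work, the sketch below keeps a map from task type to its class and metadata. It is a guess at the shape, not the repository's code: `BaseTask`, `TaskMeta`, `registry`, `createTask`, the string return value of `run()`, and the millisecond unit for the frequency are all assumptions. The decorator is applied as a plain function call so the snippet runs without decorator compiler flags; writing `@RegisterTask(...)` above the class would be the decorator form of the same call.

```typescript
// Hypothetical registry sketch; names and shapes are assumptions.
abstract class BaseTask {
  readonly type: string;
  readonly priority: number;
  constructor(type: string, priority = 100) {
    this.type = type;
    this.priority = priority;
  }
  // The real code returns a TASK_STATUS; a plain string stands in for it here.
  abstract run(): Promise<string>;
}

type TaskCtor = new () => BaseTask;

interface TaskMeta {
  ctor: TaskCtor;
  frequencyMs: number | undefined; // set for recurring tasks (assumed milliseconds)
  unique: boolean;
}

const registry = new Map<string, TaskMeta>();

// Decorator factory: records the class under its type name.
function RegisterTask(
  type: string,
  frequencyMs?: number,
  options: { unique?: boolean } = {},
) {
  return (ctor: TaskCtor): void => {
    if (registry.has(type)) {
      throw new Error(`Task type already registered: ${type}`);
    }
    registry.set(type, { ctor, frequencyMs, unique: options.unique ?? false });
  };
}

class HealthCheckSketch extends BaseTask {
  constructor() {
    super("HealthCheck", 10); // priority value is illustrative only
  }
  async run(): Promise<string> {
    return "done";
  }
}
// Equivalent to placing @RegisterTask("HealthCheck", 5000, { unique: true }) on the class.
RegisterTask("HealthCheck", 5_000, { unique: true })(HealthCheckSketch);

// Look up a class by the type string stored in a queue row and instantiate it.
function createTask(type: string): BaseTask {
  const meta = registry.get(type);
  if (!meta) throw new Error(`Unknown task type: ${type}`);
  return new meta.ctor();
}
```

With a registry of this kind, a runner only needs the `type` column of a queued row to rebuild the matching task object, and a scheduler can iterate over entries that have a frequency set.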
The system uses a single PostgreSQL table, defined as a Prisma model:
model Task {
  id         String   @id @default(uuid())
  type       String
  priority   Int      @default(100)
  payload    Json
  maturityAt DateTime @default(now())
  createdAt  DateTime @default(now())
  updatedAt  DateTime @updatedAt
}
Required environment variables (see .env.example):
DATABASE_URL="postgresql://taskuser:taskpass@localhost:6000/taskdb"
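Against the "Task" table shown above, claiming one due task with FOR UPDATE SKIP LOCKED might look like the sketch below. Several points are assumptions rather than the repository's behavior: the `SqlClient` interface is a hypothetical stand-in modeled on node-postgres's `query(text, values)` (the repository itself uses Prisma), a lower `priority` number is treated as more urgent, and a finished task is simply deleted. `claimAndRun` and `ClaimedTask` are illustrative names.

```typescript
// Hypothetical minimal client interface; all calls must go through ONE connection
// so that BEGIN/COMMIT and the row lock belong to the same transaction.
interface SqlClient {
  query(sql: string, params?: unknown[]): Promise<{ rows: Record<string, unknown>[] }>;
}

interface ClaimedTask {
  id: string;
  type: string;
  payload: unknown;
}

// Pick one due task and lock its row. SKIP LOCKED makes concurrent workers pass
// over rows already locked by another transaction instead of waiting for them.
// Assumption: a lower priority number means more urgent.
const CLAIM_SQL = `
  SELECT id, type, payload
  FROM "Task"
  WHERE "maturityAt" <= now()
  ORDER BY priority ASC, "maturityAt" ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED`;

// Returns true if a task was claimed and handled, false if nothing was due.
async function claimAndRun(
  db: SqlClient,
  handler: (task: ClaimedTask) => Promise<void>,
): Promise<boolean> {
  await db.query("BEGIN");
  try {
    const { rows } = await db.query(CLAIM_SQL);
    const row = rows[0];
    if (row === undefined) {
      await db.query("COMMIT");
      return false;
    }
    const task: ClaimedTask = {
      id: String(row["id"]),
      type: String(row["type"]),
      payload: row["payload"],
    };
    await handler(task);
    // Assumption: a successfully handled task is removed from the queue.
    await db.query('DELETE FROM "Task" WHERE id = $1', [task.id]);
    await db.query("COMMIT");
    return true;
  } catch (err) {
    // Rolling back releases the row lock, so the task becomes claimable again.
    await db.query("ROLLBACK");
    throw err;
  }
}
```

Holding the transaction open while the handler runs keeps the sketch short, since a crashed worker releases its lock automatically. For long-running tasks a design that commits the claim first and records status separately may be preferable.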
- Create a new task:
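// Template: `frequency` and the options object are optional decorator arguments.
@RegisterTask("MyTask", frequency?, { unique?: boolean })
export class MyTask extends Task {
  constructor() {
    super("MyTask", priority); // priority: numeric task priority (the table default is 100)
  }
  async run(): Promise<TASK_STATUS> {
    // Implementation: do the work and return a TASK_STATUS value
  }
}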
}
- Add retry logic:
nextTry(): number {
return this.tries < 3 ? 60 * 1000 : -1; // Retry 3 times with 1-minute delay
}
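One way a runner might act on `nextTry()` is to treat a non-negative return value as a delay in milliseconds before the next attempt and a negative value as "stop retrying". That reading is inferred from the snippet above rather than confirmed by it, and `nextMaturity` and `retryDelay` are hypothetical helpers, not functions from this repository.

```typescript
// Hypothetical helper: turn a nextTry() result into the next due time.
// Returns null when the task should not be retried (negative delay).
function nextMaturity(delayMs: number, now: Date = new Date()): Date | null {
  if (delayMs < 0) return null;
  return new Date(now.getTime() + delayMs);
}

// Mirrors the nextTry() policy above as a standalone function:
// a 1-minute delay while tries < 3, otherwise -1.
function retryDelay(tries: number): number {
  return tries < 3 ? 60 * 1000 : -1;
}

const base = new Date("2024-01-01T00:00:00Z");
console.log(nextMaturity(retryDelay(1), base)?.toISOString()); // 2024-01-01T00:01:00.000Z
console.log(nextMaturity(retryDelay(3), base)); // null
```

The resulting date could then be written back to the row's maturityAt column so the task becomes eligible again only after the delay has passed.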