Skip to content

A lightweight task queue implementation using PostgreSQL's FOR UPDATE SKIP LOCKED feature. Supports priority scheduling, automatic retries, unique tasks, and parallel

Notifications You must be signed in to change notification settings

gentrace/simple-task-queue

Repository files navigation

Task Queue with PostgreSQL SKIP LOCKED

This repository demonstrates how to build an efficient task queue using PostgreSQL's FOR UPDATE SKIP LOCKED feature, providing row-level locking for parallel task processing without contention. This is the companion code for the blog post Simplifying Task Queues with PostgreSQL.

Features

  • Task scheduling with priorities and delays
  • Automatic retries with configurable intervals
  • Unique task constraints
  • Scheduled (recurring) tasks
  • Graceful shutdown handling
  • Parallel task processing with SKIP LOCKED

Example Tasks

  • SendEmail: Simple email task simulation
  • ProcessData: Data processing task simulation
  • DailyReport: Daily scheduled task (unique, runs every 24 hours)
  • HealthCheck: High-priority health check (unique, runs every 5 seconds, with retries)

Getting Started

  1. Install dependencies:
npm install
  1. Start PostgreSQL:
docker-compose up -d
  1. Initialize the database:
npm run prisma:generate  # Generate Prisma client
npm run prisma:migrate  # Create database tables
  1. Start the task system:
# Start both runner and scheduler
npm run start:all

# Or start them separately:
npm run start:runner    # Terminal 1
npm run start:scheduler # Terminal 2

Architecture

  • src/types.ts: Core task system types and registry
  • src/tasks.ts: Task implementations using decorators
  • src/taskQueue.ts: PostgreSQL-based queue operations
  • src/runner.ts: Task execution and retry logic
  • src/scheduler.ts: Scheduled task management

Database Schema

The system uses a simple PostgreSQL table:

CREATE TABLE "Task" (
  id         String   @id @default(uuid())
  type       String
  priority   Int      @default(100)
  payload    Json
  maturityAt DateTime @default(now())
  createdAt  DateTime @default(now())
  updatedAt  DateTime @updatedAt
)

Environment

Required environment variables (see .env.example):

DATABASE_URL="postgresql://taskuser:taskpass@localhost:6000/taskdb"

Development

  1. Create a new task:
@RegisterTask("MyTask", frequency?, { unique?: boolean })
export class MyTask extends Task {
  constructor() {
    super("MyTask", priority);
  }

  async run(): Promise<TASK_STATUS> {
    // Implementation
  }
}
  1. Add retry logic:
nextTry(): number {
  return this.tries < 3 ? 60 * 1000 : -1; // Retry 3 times with 1-minute delay
}

About

A lightweight task queue implementation using PostgreSQL's FOR UPDATE SKIP LOCKED feature. Supports priority scheduling, automatic retries, unique tasks, and parallel

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •