Queueing package based on postgres written in Go.
This queuer is meant to be as easy as possible to use. No specific function signature (except for an error as the last output parameter, if you want to give back an error), easy setup and still fast.
The job table contains only queued, scheduled and running tasks. The ended jobs (succeeded, cancelled, failed) are moved to a timescaleDB table.
To integrate the queuer package into your Go project, use the standard go get command:
go get github.com/siherrmann/queuer
To use the package you also need a running postgres database with the timescaleDB extension. You can use the docker-compose.yml file in the example folder or start a Docker container with the timescale/timescaledb:latest-pg17 image.
The full initialisation is (in the easiest case):
// Create a new queuer instance
q := queuer.NewQueuer("exampleWorker", 3)
// Add a task to the queuer
q.AddTask(ExampleTask)
// Start the queuer
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q.Start(ctx, cancel)That's easy, right? Adding a job is just as easy:
// Add a job to the queue
_, err := q.AddJob(ExampleTask, 5, "12")
if err != nil {
log.Fatalf("Error adding job: %v", err)
}In the initialisation of the queuer the existance of the necessary database tables is checked and if they don't exist they get created. The database is configured with these environment variables:
QUEUER_DB_HOST=localhost
QUEUER_DB_PORT=5432
QUEUER_DB_DATABASE=postgres
QUEUER_DB_USERNAME=username
QUEUER_DB_PASSWORD=password1234
QUEUER_DB_SCHEMA=publicYou can find a full example (the same as above plus a more detailed example) in the example folder. In there you'll also find a docker-compose file with the timescaleDB/postgres service that is needed for the running the queuer (it's just postgres with an extension).
NewQueuer is a convenience constructor that creates a new Queuer instance using default database configuration derived from environment variables. It acts as a wrapper around NewQueuerWithDB. The encryption key for the database is taken from the QUEUER_ENCRYPTION_KEY environment variable; if not provided, it defaults to unencrypted results.
NewQueuerWithDB is the primary constructor for creating a new Queuer instance. It allows for explicit database configuration and encryption key specification, and initializes all necessary components, including database handlers, internal event listeners, and the worker.
func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer
func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuername: Astringidentifier for this queuer instance.maxConcurrency: Anintspecifying the maximum number of jobs this queuer can process concurrently.encryptionKey: Astringused for encrypting sensitive job data in the database. If empty, results will be stored unencrypted.dbConfig: An optional*helper.DatabaseConfiguration. If nil, the configuration will be loaded from environment variables.options: OptionalOnErrorconfigurations to apply to the worker.
This function performs the following setup:
- Initializes a logger.
- Sets up the database connection using the provided
dbConfigor environment variables. - Creates
JobDBHandler,WorkerDBHandler, andMasterDBHandlerinstances for database interactions. - Initializes internal
core.Listenerinstances forjobInsert,jobUpdate, andjobDeleteevents. - Creates and inserts a new
model.Workerinto the database based on the providedname,maxConcurrency, andoptions. - If any critical error occurs during this initialization (e.g., database connection failure, worker creation error), the function will log a panic error and exit the program. It returns a pointer to the newly configured
Queuerinstance.
The Start method initiates the operational lifecycle of the Queuer. It sets up the main context, initializes database listeners, and begins the job processing and polling loops in a dedicated goroutine.
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings)ctx: The parent context.Context for the queuer's operations. This context will control the overall lifetime of the queuer.cancel: The context.CancelFunc associated with the provided ctx. This function should be called to gracefully stop the queuer.masterSettings: The central settings set will be set if the current worker becomes the master.
Upon calling Start:
- It performs a basic check to ensure internal listeners are initialized.
- Db listeners and broadcasters are created to listen to job events (inserts, updates, deletes).
- It starts a poller to periodically poll the database for new jobs to process (5 minute interval).
- It signals its readiness via an internal channel, ensuring the
Startmethod returns only when the core loops are active. - If
MasterSettingsare given it sets the current worker as master if none is active. If the current worker is the master it starts a ticker that updates the master entry, else it starts a ticker that checks for a missing master. If noMasterSettingsare given, no ticker gets started.
The method includes a timeout mechanism (5 seconds) to detect if the queuer fails to start its internal processes promptly, panicking if the timeout is exceeded.
If the queuer is not not properly initialized (created by calling NewQueuer), or if there's an error creating the database listeners, the function will panic.
The StartWithoutWorker method provides a way to start the Queuer instance without an active worker. This is particularly useful for scenarios where you need to interact with the job queue (e.g., add jobs, check job status) but don't intend for this specific instance to actively process them. This is also nice to only have one service that can become the master so updating the MasterSettings only requires this serivce to be restarted. This has also the (very small) benefit that all other services don't run a ticker for updating or becoming the master.
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings)ctx: The parent context.Context for the queuer's operations.cancel: The context.CancelFunc associated with the provided ctx.withoutListeners: Aboolflag. If true, the database.NewQueuerDBListener instances for job and job_archive tables will not be created.masterSettings: The central settings set will be set if the current worker becomes the master.
The Stop method gracefully shuts down the Queuer instance, releasing resources and ensuring ongoing operations are properly concluded.
func (q *Queuer) Stop() errorThe Stop method cancels all jobs, closes db listeners and returns an error if any step of the stopping process encounters an issue
The AddTask method registers a new job task with the queuer. A task is the actual function that will be executed when a job associated with it is processed.
func (q *Queuer) AddTask(task interface{}) *model.Task
func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Tasktask: Aninterface{}representing the function that will serve as the job's executable logic. The queuer will automatically derive a name for this task based on its function signature (e.g.,main.MyTaskFunction). The derived name must be unique if nonameis given.name: Astringspecifying the custom name for this task. This name must be unique within the queuer's tasks.
This method handles the registration of a task, making the worker able to pick up and execute a job of this task type. It also updates the worker's available tasks in the database. The task should be added before starting the queuer. If there's an issue during task creation or database update, the program will panic.
The AddNextIntervalFunc method registers a custom function that determines the next execution time for scheduled jobs. This is useful for implementing complex scheduling logic beyond simple fixed intervals.
func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker
func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Workernif: An instance ofmodel.NextIntervalFunc, which is a function type defining custom logic for calculating the next interval. The queuer will automatically derive a name for this function. The derived name must be unique if nonameis given.name: A string specifying the custom name for thisNextIntervalFunc. This name must be unique within the queuer's NextIntervalFuncs.
This method adds the provided NextIntervalFunc to the queuer's available functions, making it usable for jobs with custom scheduling requirements. It updates the worker's configuration in the database. If nif is nil, if the function name cannot be derived, or if a function with the same name already exists, the program will panic.
The OnError struct defines how a worker should handle errors when processing a job. This allows for configurable retry behavior.
type OnError struct {
Timeout float64 `json:"timeout"`
MaxRetries int `json:"max_retries"`
RetryDelay float64 `json:"retry_delay"`
RetryBackoff string `json:"retry_backoff"`
}Timeout: The maximum time (in seconds) allowed for a single attempt of a job. If the job exceeds this duration, it's considered to have timed out.MaxRetries: The maximum number of times a job will be retried after a failure.RetryDelay: The initial delay (in seconds) before the first retry attempt. This delay can be modified by theRetryBackoffstrategy.RetryBackoff: Specifies the strategy used to increase the delay between subsequent retries.
The RetryBackoff constant defines the available strategies for increasing retry delays:
const (
RETRY_BACKOFF_NONE = "none"
RETRY_BACKOFF_LINEAR = "linear"
RETRY_BACKOFF_EXPONENTIAL = "exponential"
)RETRY_BACKOFF_NONE: No backoff. The RetryDelay remains constant for all retries.RETRY_BACKOFF_LINEAR: The retry delay increases linearly with each attempt (e.g., delay, 2delay, 3delay).RETRY_BACKOFF_EXPONENTIAL: The retry delay increases exponentially with each attempt (e.g., delay, delay2, delay2*2).
Job Options The Options struct allows you to define specific behaviors for individual jobs, overriding default worker settings where applicable.
type Options struct {
OnError *OnError
Schedule *Schedule
}OnError: An optionalOnErrorconfiguration that will override the worker's default error handling for this specific job. This allows you to define unique retry logic per job.Schedule: An optionalScheduleconfiguration for jobs that need to be executed at recurring intervals.
OnError for Jobs The OnError struct for jobs is identical to the one used for worker options, allowing granular control over error handling for individual jobs.
type OnError struct {
Timeout float64 `json:"timeout"`
MaxRetries int `json:"max_retries"`
RetryDelay float64 `json:"retry_delay"`
RetryBackoff string `json:"retry_backoff"`
}The Schedule struct is used to define recurring jobs.
type Schedule struct {
Start time.Time `json:"start"`
MaxCount int `json:"max_count"`
Interval time.Duration `json:"interval"`
NextInterval string `json:"next_interval"`
}Start: The initial time at which the scheduled job should first run.MaxCount: The maximum number of times the job should be executed. A value 0 indicates an indefinite number of repetitions (run forever).Interval: The duration between consecutive executions of the scheduled job.NextInterval: Function name of theNextIntervalFuncreturning the time of the next execution of the scheduled job. EitherIntervalorNextIntervalhave to be set if theMaxCountis 0 or greater 1.
The queuer package includes a small command-line interface (CLI) tool for monitoring your job queues. The CLI provides easy access to view jobs, workers, connections, and archived data.
The CLI tool can be built from the cli directory:
cd cli
go build -o queuer .The CLI tool supports the following main commands:
-
list- List queuer resources with pagination supportlist job- List active jobs (queued, scheduled, running)list worker- List registered workers and their statuslist connection- List active database connectionslist jobArchive- List completed/archived jobs
-
get- Get detailed information about a specific resource by RIDget job --rid <RID>- Get details of a specific jobget worker --rid <RID>- Get worker information and statusget jobArchive --rid <RID>- Get archived job details
-
cancel- Cancel operations on specific resources by RIDcancel job --rid <RID>- Cancel a running or queued jobcancel worker --rid <RID>- Cancel/shutdown a worker
version- Display version information of the Queuer CLIcompletion- Generate autocompletion scripts for various shellshelp- Display help information for any command
-v, --verbose- Enable verbose output for detailed information-h, --help- Show help for any command
List commands support pagination through:
--lastId <int>- Last ID from previous call for pagination--limit <int>- Maximum number of entries to return (default: 10)
# List workers with pagination
queuer list worker --limit 5
# Get specific job details
queuer get job --rid "550e8400-e29b-41d4-a716-446655440000"
# Cancel a running job
queuer cancel job --rid "550e8400-e29b-41d4-a716-446655440000"
# List jobs with pagination
queuer list job --lastId 100 --limit 20For detailed information about any command, use the built-in help:
queuer --help # General help
queuer list --help # Help for list commands
queuer get job --help # Help for specific subcommandsThe CLI tool uses the same database configuration environment variables as the main queuer package.
- Insert job batches using the
COPY FROMpostgres feature. - Insert a job in a transaction to rollback if eg. the step after job insertion fails.
- Panic recovery for all running jobs.
- Error handling by checking last output parameter for error.
- Multiple queuers can be started in different microservices while maintaining job start order and isolation.
- Scheduled and periodic jobs.
- Easy functions to get jobs and workers.
- Listener functions for job updates and deletion (ended jobs).
- Helper function to listen for a specific finished job.
- Retry mechanism for ended jobs which creates a new job with the same parameters.
- Custom NextInterval functions to address custom needs for scheduling (eg. scheduling with timezone offset)
- Automatic master worker setting retention and other central settings. Automatic switch to new master if old worker stops.
- Heartbeat for all workers and automatic stale worker detection and cancelation by the master.
- Encryption support for sensitive job data stored in the database.
- Command-line interface (CLI) tool for monitoring job queues.