Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
dump.rdb
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
.PHONY: setup
setup: ## Download project dependencies
@go mod download
@go mod tidy

.PHONY: tests
tests: unit-tests

.PHONY: unit-tests
unit-tests:
go test github.com/Henrod/task-queue/taskqueue -v -tags=unit

.PHONY: setup
setup:
.PHONY: setup/dev
setup/dev:
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest

.PHONY: lint
Expand Down
113 changes: 102 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
[Sidekiq](http://sidekiq.org/) compatible
background workers in [golang](http://golang.org/).
# Task Queue

[Sidekiq](http://sidekiq.org/) compatible background workers for [Golang](http://golang.org/).

* handles retries
* responds to Unix signals to safely wait for jobs to finish before exiting
* well tested

The application using this library will usually be divided into two independent parts: one set of producers (usually an API where the operations will be requested) and one set of consumers (usually background workers).
The application using this library will usually be divided into two independent parts: one set of producers (usually an API where the operations will be requested) and one set of consumers (usually background workers that will handle the heavy load).

## Consumer

The consumer app will be responsible for connecting to Redis and waiting for tasks to be queued by the producer.

Once the consumer detects a task, it will execute it using the provided callback function.

The consumer app will be responsible for connecting to Redis and waiting for operations to be queued by the producer. This can be done through the following source code example:
This can be done through the following source code example:

```go
package main
Expand All @@ -25,6 +32,7 @@ import (
"github.com/sirupsen/logrus"
)

// Job function that will be binded to the first queue.
func myJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
// err := doSomethingWithYourMessage
// if err != nil {
Expand All @@ -33,6 +41,7 @@ func myJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error
return nil // if no error happened during the job execution
}

// Job function that will be binded to the second queue.
func myOtherJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
// err := doSomethingWithYourMessage
// if err != nil {
Expand All @@ -45,16 +54,18 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
go handleStop(cancel)

// Configure the first queue.
firstQueueKey := "dummy-queue"
firstWorkerID := "my-worker-ID"
firstQueueOptions := &taskqueue.Options{
QueueKey: firstQueueKey,
Namespace: "my-namespace",
StorageAddress: "localhost:6379",
StorageAddress: "localhost:6379", // address of the redis instance
WorkerID: firstWorkerID,
MaxRetries: 3, // message will be discarded after 3 retries
OperationTimeout: 2 * time.Minute,
}
// Instantiate a Task Queue object for the first queue.
firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions)
if err != nil {
panic(err)
Expand All @@ -65,6 +76,7 @@ func main() {
"workerID": firstWorkerID,
})
firstLogger.Info("consuming tasks from first queue")
// Bind the first queue to the myJobFunc callback function.
firstTaskQueue.Consume(
ctx,
func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
Expand All @@ -73,16 +85,18 @@ func main() {
},
)

// Configure the second queue.
secondQueueKey := "other-dummy-queue"
secondWorkerID := "my-other-worker-ID"
secondQueueOptions := &taskqueue.Options{
QueueKey: secondQueueKey,
Namespace: "my-namespace",
StorageAddress: "localhost:6379",
StorageAddress: "localhost:6379", // address of the redis instance
WorkerID: secondWorkerID,
MaxRetries: -1, // unlimited max retries
OperationTimeout: 1 * time.Minute,
}
// Instantiate a Task Queue object for the second queue.
secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions)
if err != nil {
panic(err)
Expand All @@ -93,6 +107,7 @@ func main() {
"worker": secondWorkerID,
})
secondLogger.Info("consuming tasks from second queue")
// Bind the second queue to the myOtherJobFunc callback function.
secondTaskQueue.Consume(
ctx,
func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
Expand All @@ -113,6 +128,8 @@ func handleStop(cancel context.CancelFunc) {
}
```

## Producer

The producer app will be responsible for connecting to Redis and enqueueing operations to be processed by the consumer. This can be done through the following source code example:

```go
Expand All @@ -139,12 +156,14 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
go handleStop(cancel)

// Configure the first queue.
firstQueueKey := "dummy-queue"
firstQueueOptions := &taskqueue.Options{
QueueKey: firstQueueKey,
Namespace: "my-namespace",
StorageAddress: "localhost:6379",
}
// Instantiate a Task Queue object for the first queue.
firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions)
if err != nil {
panic(err)
Expand All @@ -154,6 +173,7 @@ func main() {
"queueKey": firstQueueKey,
})
firstLogger.Info("producing task in first queue")
// Produce a task to be consumed by the fist queue's consumer.
firstQueueTaskID, err := firstTaskQueue.ProduceAt(ctx, &Payload{
SomeKey: "some-value",
}, time.Now())
Expand All @@ -162,12 +182,14 @@ func main() {
}
firstLogger.Infof("enqueued task %s in first queue", firstQueueTaskID)

// Configure the second queue.
secondQueueKey := "other-dummy-queue"
secondQueueOptions := &taskqueue.Options{
QueueKey: secondQueueKey,
Namespace: "my-namespace",
StorageAddress: "localhost:6379",
}
// Instantiate a Task Queue object for the second queue.
secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions)
if err != nil {
panic(err)
Expand All @@ -177,6 +199,7 @@ func main() {
"queueKey": secondQueueKey,
})
secondLogger.Info("producing task in second queue")
// Produce a task to be consumed by the second queue's consumer.
secondQueueTaskID, err := secondTaskQueue.ProduceAt(ctx, &Payload{
SomeKey: "some-value",
}, time.Now())
Expand All @@ -197,11 +220,79 @@ func handleStop(cancel context.CancelFunc) {
}
```

To be implemented:
* supports custom middleware
* provides stats on what jobs are currently running
## Testing Locally

You can quickly run the project locally using the example file stored in [examples/simple/main.go](./examples/simple/main.go).

In order to do so, all you need to do is:

- Run a local Redis instance:

```shell
$ redis-server
```

- Run the consumer by executing the example code passing 'consumer' as argument:

```shell
$ go run examples/simple/main.go consumer
```

- If no producer is running yet, you should just see a loop of "consuming task" messages:

```shell
INFO[0000] consuming task operation=consumer
INFO[0001] consuming task operation=consumer package=taskqueue
INFO[0002] consuming task operation=consumer package=taskqueue
INFO[0003] consuming task operation=consumer package=taskqueue
INFO[0004] consuming task operation=consumer package=taskqueue
INFO[0005] consuming task operation=consumer package=taskqueue
```

- Run the producer by executing the example code passing 'producer' as argument:

```shell
$ go run examples/simple/main.go producer
```

After running these 3 commands, you should start seeing messages like the ones below:

In the producer:

```shell
INFO[0001] producing task operation=producer
INFO[0001] enqueued task 67ff2e08-ccb7-4b94-ac9e-5be97268255c operation=producer
INFO[0002] producing task operation=producer
INFO[0002] enqueued task 53679dc0-669f-4f97-89fc-431aa066c2df operation=producer
INFO[0003] producing task operation=producer
INFO[0003] enqueued task 3d6b358e-3b43-4efe-8eba-b53cf1d2855a operation=producer
INFO[0004] producing task operation=producer
INFO[0004] enqueued task d881ebc4-d8a8-4821-b89f-751d4f26c3ef operation=producer
INFO[0005] producing task operation=producer
INFO[0005] enqueued task 502ea8be-90f2-41fa-92bb-8670ed652ea1 operation=producer
```

In the consumer:

```shell
INFO[0034] consuming task operation=consumer package=taskqueue
INFO[0034] consumed task 67ff2e08-ccb7-4b94-ac9e-5be97268255c: map[Body:0] operation=consumer
INFO[0035] consuming task operation=consumer package=taskqueue
INFO[0035] consumed task 53679dc0-669f-4f97-89fc-431aa066c2df: map[Body:1] operation=consumer
INFO[0036] consuming task operation=consumer package=taskqueue
INFO[0036] consumed task 3d6b358e-3b43-4efe-8eba-b53cf1d2855a: map[Body:2] operation=consumer
INFO[0037] consuming task operation=consumer package=taskqueue
INFO[0037] consumed task d881ebc4-d8a8-4821-b89f-751d4f26c3ef: map[Body:3] operation=consumer
INFO[0038] consuming task operation=consumer package=taskqueue
INFO[0038] consumed task 502ea8be-90f2-41fa-92bb-8670ed652ea1: map[Body:4] operation=consumer
```

## To be implemented
* support custom middleware
* provide stats on what jobs are currently running

Future implementation possibilities:
## Future implementation possibilities
* customize concurrency per queue

Initial development sponsored by [Customer.io](http://customer.io)
Initial development sponsored by [Customer.io](http://customer.io).
Implementation based on [jrallison/go-workers](https://github.com/jrallison/go-workers) and [topfreegames/go-workers](https://github.com/topfreegames/go-workers).
30 changes: 30 additions & 0 deletions deprecated/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package deprecated

type EnqueueOptions struct {
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
RetryMax int `json:"retry_max,omitempty"`
At float64 `json:"at,omitempty"`
RetryOptions RetryOptions `json:"retry_options,omitempty"`
ConnectionOptions map[string]string `json:"connection_options,omitempty"`
}

type RetryOptions struct {
Exp int `json:"exp"`
MinDelay int `json:"min_delay"`
MaxDelay int `json:"max_delay"`
MaxRand int `json:"max_rand"`
}

type Conn struct {
Err func() error
Close func()
}

type Pool struct {
Get func() Conn
}

type config struct {
Pool *Pool
}
45 changes: 45 additions & 0 deletions deprecated/msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package deprecated

import (
"encoding/json"
"fmt"
"github.com/bitly/go-simplejson"
)

type data struct {
*simplejson.Json
}

type Msg struct {
*data
original string
}

func NewMsg(content string) (*Msg, error) {
if d, err := newData(content); err != nil {
return nil, err
} else {
return &Msg{d, content}, nil
}
}

func newData(content string) (*data, error) {
contentJson, err := simplejson.NewJson([]byte(content))
if err != nil {
return nil, fmt.Errorf("failed to convert content to simpleJson: %w", err)
}

contentMap := map[string]interface{}{}
contentMap["args"] = contentJson
contentMapBytes, err := json.Marshal(contentMap)
if err != nil {
return nil, fmt.Errorf("failed to marshal contentMap: %w", err)
}

parentJson, err := simplejson.NewJson(contentMapBytes)
if err != nil {
return nil, fmt.Errorf("failed to convert contentMap to simpleJson: %w", err)
}

return &data{parentJson}, nil
}
Loading