diff --git a/.gitignore b/.gitignore index 485dee6..d09f079 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +dump.rdb \ No newline at end of file diff --git a/Makefile b/Makefile index c82fcca..a3d7c97 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,8 @@ +.PHONY: setup +setup: ## Download project dependencies + @go mod download + @go mod tidy + .PHONY: tests tests: unit-tests @@ -5,8 +10,8 @@ tests: unit-tests unit-tests: go test github.com/Henrod/task-queue/taskqueue -v -tags=unit -.PHONY: setup -setup: +.PHONY: setup/dev +setup/dev: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest .PHONY: lint diff --git a/README.md b/README.md index 008b201..4faad54 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,20 @@ -[Sidekiq](http://sidekiq.org/) compatible -background workers in [golang](http://golang.org/). +# Task Queue + +[Sidekiq](http://sidekiq.org/) compatible background workers for [Golang](http://golang.org/). * handles retries * responds to Unix signals to safely wait for jobs to finish before exiting * well tested -The application using this library will usually be divided into two independent parts: one set of producers (usually an API where the operations will be requested) and one set of consumers (usually background workers). +The application using this library will usually be divided into two independent parts: one set of producers (usually an API where the operations will be requested) and one set of consumers (usually background workers that will handle the heavy load). + +## Consumer + +The consumer app will be responsible for connecting to Redis and waiting for tasks to be queued by the producer. + +Once the consumer detects a task, it will execute it using the provided callback function. -The consumer app will be responsible for connecting to Redis and waiting for operations to be queued by the producer. This can be done through the following source code example: +This can be done through the following source code example: ```go package main @@ -25,6 +32,7 @@ import ( "github.com/sirupsen/logrus" ) +// Job function that will be binded to the first queue. func myJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error { // err := doSomethingWithYourMessage // if err != nil { @@ -33,6 +41,7 @@ func myJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error return nil // if no error happened during the job execution } +// Job function that will be binded to the second queue. func myOtherJobFunc(ctx context.Context, taskID uuid.UUID, payload interface{}) error { // err := doSomethingWithYourMessage // if err != nil { @@ -45,16 +54,18 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) go handleStop(cancel) + // Configure the first queue. firstQueueKey := "dummy-queue" firstWorkerID := "my-worker-ID" firstQueueOptions := &taskqueue.Options{ QueueKey: firstQueueKey, Namespace: "my-namespace", - StorageAddress: "localhost:6379", + StorageAddress: "localhost:6379", // address of the redis instance WorkerID: firstWorkerID, MaxRetries: 3, // message will be discarded after 3 retries OperationTimeout: 2 * time.Minute, } + // Instantiate a Task Queue object for the first queue. firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions) if err != nil { panic(err) @@ -65,6 +76,7 @@ func main() { "workerID": firstWorkerID, }) firstLogger.Info("consuming tasks from first queue") + // Bind the first queue to the myJobFunc callback function. firstTaskQueue.Consume( ctx, func(ctx context.Context, taskID uuid.UUID, payload interface{}) error { @@ -73,16 +85,18 @@ func main() { }, ) + // Configure the second queue. secondQueueKey := "other-dummy-queue" secondWorkerID := "my-other-worker-ID" secondQueueOptions := &taskqueue.Options{ QueueKey: secondQueueKey, Namespace: "my-namespace", - StorageAddress: "localhost:6379", + StorageAddress: "localhost:6379", // address of the redis instance WorkerID: secondWorkerID, MaxRetries: -1, // unlimited max retries OperationTimeout: 1 * time.Minute, } + // Instantiate a Task Queue object for the second queue. secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions) if err != nil { panic(err) @@ -93,6 +107,7 @@ func main() { "worker": secondWorkerID, }) secondLogger.Info("consuming tasks from second queue") + // Bind the second queue to the myOtherJobFunc callback function. secondTaskQueue.Consume( ctx, func(ctx context.Context, taskID uuid.UUID, payload interface{}) error { @@ -113,6 +128,8 @@ func handleStop(cancel context.CancelFunc) { } ``` +## Producer + The producer app will be responsible for connecting to Redis and enqueueing operations to be processed by the consumer. This can be done through the following source code example: ```go @@ -139,12 +156,14 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) go handleStop(cancel) + // Configure the first queue. firstQueueKey := "dummy-queue" firstQueueOptions := &taskqueue.Options{ QueueKey: firstQueueKey, Namespace: "my-namespace", StorageAddress: "localhost:6379", } + // Instantiate a Task Queue object for the first queue. firstTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(firstQueueOptions), firstQueueOptions) if err != nil { panic(err) @@ -154,6 +173,7 @@ func main() { "queueKey": firstQueueKey, }) firstLogger.Info("producing task in first queue") + // Produce a task to be consumed by the fist queue's consumer. firstQueueTaskID, err := firstTaskQueue.ProduceAt(ctx, &Payload{ SomeKey: "some-value", }, time.Now()) @@ -162,12 +182,14 @@ func main() { } firstLogger.Infof("enqueued task %s in first queue", firstQueueTaskID) + // Configure the second queue. secondQueueKey := "other-dummy-queue" secondQueueOptions := &taskqueue.Options{ QueueKey: secondQueueKey, Namespace: "my-namespace", StorageAddress: "localhost:6379", } + // Instantiate a Task Queue object for the second queue. secondTaskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(secondQueueOptions), secondQueueOptions) if err != nil { panic(err) @@ -177,6 +199,7 @@ func main() { "queueKey": secondQueueKey, }) secondLogger.Info("producing task in second queue") + // Produce a task to be consumed by the second queue's consumer. secondQueueTaskID, err := secondTaskQueue.ProduceAt(ctx, &Payload{ SomeKey: "some-value", }, time.Now()) @@ -197,11 +220,79 @@ func handleStop(cancel context.CancelFunc) { } ``` -To be implemented: -* supports custom middleware -* provides stats on what jobs are currently running +## Testing Locally + +You can quickly run the project locally using the example file stored in [examples/simple/main.go](./examples/simple/main.go). + +In order to do so, all you need to do is: + +- Run a local Redis instance: + +```shell +$ redis-server +``` + +- Run the consumer by executing the example code passing 'consumer' as argument: + +```shell +$ go run examples/simple/main.go consumer +``` + +- If no producer is running yet, you should just see a loop of "consuming task" messages: + +```shell +INFO[0000] consuming task operation=consumer +INFO[0001] consuming task operation=consumer package=taskqueue +INFO[0002] consuming task operation=consumer package=taskqueue +INFO[0003] consuming task operation=consumer package=taskqueue +INFO[0004] consuming task operation=consumer package=taskqueue +INFO[0005] consuming task operation=consumer package=taskqueue +``` + +- Run the producer by executing the example code passing 'producer' as argument: + +```shell +$ go run examples/simple/main.go producer +``` + +After running these 3 commands, you should start seeing messages like the ones below: + +In the producer: + +```shell +INFO[0001] producing task operation=producer +INFO[0001] enqueued task 67ff2e08-ccb7-4b94-ac9e-5be97268255c operation=producer +INFO[0002] producing task operation=producer +INFO[0002] enqueued task 53679dc0-669f-4f97-89fc-431aa066c2df operation=producer +INFO[0003] producing task operation=producer +INFO[0003] enqueued task 3d6b358e-3b43-4efe-8eba-b53cf1d2855a operation=producer +INFO[0004] producing task operation=producer +INFO[0004] enqueued task d881ebc4-d8a8-4821-b89f-751d4f26c3ef operation=producer +INFO[0005] producing task operation=producer +INFO[0005] enqueued task 502ea8be-90f2-41fa-92bb-8670ed652ea1 operation=producer +``` + +In the consumer: + +```shell +INFO[0034] consuming task operation=consumer package=taskqueue +INFO[0034] consumed task 67ff2e08-ccb7-4b94-ac9e-5be97268255c: map[Body:0] operation=consumer +INFO[0035] consuming task operation=consumer package=taskqueue +INFO[0035] consumed task 53679dc0-669f-4f97-89fc-431aa066c2df: map[Body:1] operation=consumer +INFO[0036] consuming task operation=consumer package=taskqueue +INFO[0036] consumed task 3d6b358e-3b43-4efe-8eba-b53cf1d2855a: map[Body:2] operation=consumer +INFO[0037] consuming task operation=consumer package=taskqueue +INFO[0037] consumed task d881ebc4-d8a8-4821-b89f-751d4f26c3ef: map[Body:3] operation=consumer +INFO[0038] consuming task operation=consumer package=taskqueue +INFO[0038] consumed task 502ea8be-90f2-41fa-92bb-8670ed652ea1: map[Body:4] operation=consumer +``` + +## To be implemented +* support custom middleware +* provide stats on what jobs are currently running -Future implementation possibilities: +## Future implementation possibilities * customize concurrency per queue -Initial development sponsored by [Customer.io](http://customer.io) +Initial development sponsored by [Customer.io](http://customer.io). +Implementation based on [jrallison/go-workers](https://github.com/jrallison/go-workers) and [topfreegames/go-workers](https://github.com/topfreegames/go-workers). diff --git a/deprecated/config.go b/deprecated/config.go new file mode 100644 index 0000000..9e29d09 --- /dev/null +++ b/deprecated/config.go @@ -0,0 +1,30 @@ +package deprecated + +type EnqueueOptions struct { + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` + ConnectionOptions map[string]string `json:"connection_options,omitempty"` +} + +type RetryOptions struct { + Exp int `json:"exp"` + MinDelay int `json:"min_delay"` + MaxDelay int `json:"max_delay"` + MaxRand int `json:"max_rand"` +} + +type Conn struct { + Err func() error + Close func() +} + +type Pool struct { + Get func() Conn +} + +type config struct { + Pool *Pool +} diff --git a/deprecated/msg.go b/deprecated/msg.go new file mode 100644 index 0000000..487e635 --- /dev/null +++ b/deprecated/msg.go @@ -0,0 +1,45 @@ +package deprecated + +import ( + "encoding/json" + "fmt" + "github.com/bitly/go-simplejson" +) + +type data struct { + *simplejson.Json +} + +type Msg struct { + *data + original string +} + +func NewMsg(content string) (*Msg, error) { + if d, err := newData(content); err != nil { + return nil, err + } else { + return &Msg{d, content}, nil + } +} + +func newData(content string) (*data, error) { + contentJson, err := simplejson.NewJson([]byte(content)) + if err != nil { + return nil, fmt.Errorf("failed to convert content to simpleJson: %w", err) + } + + contentMap := map[string]interface{}{} + contentMap["args"] = contentJson + contentMapBytes, err := json.Marshal(contentMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal contentMap: %w", err) + } + + parentJson, err := simplejson.NewJson(contentMapBytes) + if err != nil { + return nil, fmt.Errorf("failed to convert contentMap to simpleJson: %w", err) + } + + return &data{parentJson}, nil +} diff --git a/deprecated/workers.go b/deprecated/workers.go new file mode 100644 index 0000000..cd6a735 --- /dev/null +++ b/deprecated/workers.go @@ -0,0 +1,202 @@ +package deprecated + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/google/uuid" + "log" + "time" + + "github.com/Henrod/task-queue/taskqueue" +) + +var ( + baseOptions *taskqueue.Options // nolint:gochecknoglobals + taskQueueMapping map[string]*taskqueue.TaskQueue // nolint:gochecknoglobals + jobFuncMapping map[string]jobFunc // nolint:gochecknoglobals + redisClientMapping map[string]*redis.Client // nolint:gochecknoglobals +) + +type jobFunc func(message *Msg) +var Config *config + +// Configure creates a base Options object based on the information provided in the options map. +func Configure(optionsMap map[string]string) { + baseOptions = &taskqueue.Options{ + QueueKey: "", + Namespace: optionsMap["namespace"], + StorageAddress: optionsMap["server"], + WorkerID: optionsMap["process"], + MaxRetries: -1, + OperationTimeout: time.Minute, + } + + Config = &config{ + Pool: &Pool{ + Get: func() Conn { + return Conn{ + Err: func() error { + return nil + }, + Close: func() {}, + } + }, + }, + } +} + +// Process binds a job function to a specific queue. +func Process(queueKey string, job jobFunc, concurrency int) { + var err error + + // Check if TaskQueue mapping already exists and create one if it does not + if taskQueueMapping == nil { + taskQueueMapping = map[string]*taskqueue.TaskQueue{} + } + + // Check if TaskQueue object already exists and create one if it does not + taskQueue, ok := taskQueueMapping[queueKey] + if !ok { + // Configure basic options + ctx := context.Background() + options := baseOptions.Copy() + options.QueueKey = queueKey + + // Configure Redis client + if redisClientMapping == nil { + redisClientMapping = map[string]*redis.Client{} + } + redisClient, ok := redisClientMapping[queueKey] + if !ok { + redisClient = taskqueue.NewDefaultRedis(options) + redisClientMapping[queueKey] = redisClient + } + + // Create task queue + taskQueue, err = taskqueue.NewTaskQueue(ctx, redisClient, options) + if err != nil { + log.Printf("failed to start taskQueue: %s", err) + return + } + taskQueueMapping[queueKey] = taskQueue + } + + // Check if JobFunc mapping already exists and create one if it does not + if jobFuncMapping == nil { + jobFuncMapping = map[string]jobFunc{} + } + + // Bind job to queue through the mapping object + jobFuncMapping[queueKey] = job +} + +// Enqueue is not backward compatible yet since it retries by default. +func Enqueue(queueKey, class string, args interface{}) (string, error) { + var err error + ctx := context.Background() + + // Check if TaskQueue mapping already exists and create one if it does not + if taskQueueMapping == nil { + taskQueueMapping = map[string]*taskqueue.TaskQueue{} + } + + // Check if TaskQueue object already exists and create one if it does not + taskQueue, ok := taskQueueMapping[queueKey] + if !ok { + // Configure basic options + options := baseOptions.Copy() + options.QueueKey = queueKey + + // Configure Redis client + if redisClientMapping == nil { + redisClientMapping = map[string]*redis.Client{} + } + redisClient, ok := redisClientMapping[queueKey] + if !ok { + redisClient = taskqueue.NewDefaultRedis(options) + redisClientMapping[queueKey] = redisClient + } + + // Create task queue + taskQueue, err = taskqueue.NewTaskQueue( + ctx, + redisClient, + options, + ) + if err != nil { + log.Printf("failed to start taskQueue: %s", err) + return "", fmt.Errorf("failed to start taskQueue: %w", err) + } + taskQueueMapping[queueKey] = taskQueue + } + + // Produce message into specified queue + taskID, err := taskQueue.ProduceAt( + ctx, + args, + time.Now(), + ) + if err != nil { + return "", fmt.Errorf("failed to enqueue: %w", err) + } + + return taskID.String(), nil +} + +// EnqueueWithOptions is not backward compatible yet since it doesn't accept another Redis connection. +func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) { + return Enqueue(queue, class, args) +} + +// Run loops over all the queues configured and starts consuming each one of them using the respective jobs. +func Run() { + ctx := context.Background() + + for queueKey, taskQueue := range taskQueueMapping { + + if taskQueue == nil { + log.Printf("nil task queue attached to queue %s", queueKey) + } + + go taskQueue.Consume(ctx, func(ctx context.Context, taskID uuid.UUID, payload interface{}) (err error) { + payloadBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + msg, err := NewMsg(string(payloadBytes)) + if err != nil { + return fmt.Errorf("failed to build *workers.Msg: %w", err) + } + + defer func() { + if r := recover(); r != nil { + if panicErr, ok := r.(error); ok { + err = panicErr + } + } + }() + + job, ok := jobFuncMapping[queueKey] + if !ok { + return fmt.Errorf("no job func mapped to queue %s", queueKey) + } + if job == nil { + return fmt.Errorf("nil job func mapped to queue %s", queueKey) + } + job(msg) + + return nil + }) + + } +} + +// Quit loops over all the queues configured and removes their references so the GC can free the respective memory. +func Quit() { + for queueName, _ := range taskQueueMapping { + taskQueueMapping[queueName] = nil + } +} diff --git a/docs/retrocompatibility-layer.md b/docs/retrocompatibility-layer.md index ce737c1..03a2af1 100644 --- a/docs/retrocompatibility-layer.md +++ b/docs/retrocompatibility-layer.md @@ -14,7 +14,7 @@ Example usage: ```go package main import ( - "https://github.com/Henrod/task-queue" + workers "github.com/Henrod/task-queue/deprecated" ) func myJob(message *workers.Msg) { // do something with your message diff --git a/go.mod b/go.mod index a418e1e..d5acb79 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,14 @@ module github.com/Henrod/task-queue go 1.16 require ( + github.com/bitly/go-simplejson v0.5.0 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 // indirect + github.com/garyburd/redigo v1.6.3 github.com/go-redis/redis/v8 v8.11.4 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 + github.com/kr/pretty v0.3.0 // indirect github.com/sirupsen/logrus v1.8.1 + github.com/topfreegames/go-workers v1.0.1 ) diff --git a/go.sum b/go.sum index f5621a8..fb08cd6 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,12 @@ +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -8,6 +15,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/garyburd/redigo v1.6.3 h1:HCeeRluvAgMusMomi1+6Y5dmFOdYV/JzoRrrbFlkGIc= +github.com/garyburd/redigo v1.6.3/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw= github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -30,6 +39,13 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -43,12 +59,16 @@ github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/topfreegames/go-workers v1.0.1 h1:lkCLn1ebANFrPfJyRhGsFHctljtF0imRYtMyHXtlA5c= +github.com/topfreegames/go-workers v1.0.1/go.mod h1:FZ45GKJrHNIChLw008zoR8lvoMTxK9RprdE2c3uLrCQ= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -105,6 +125,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/main b/main new file mode 100755 index 0000000..b49c65b Binary files /dev/null and b/main differ diff --git a/taskqueue/options.go b/taskqueue/options.go index adb0701..ad74ce4 100644 --- a/taskqueue/options.go +++ b/taskqueue/options.go @@ -11,6 +11,17 @@ type Options struct { OperationTimeout time.Duration } +func (o *Options) Copy() *Options { + return &Options{ + QueueKey: o.QueueKey, + Namespace: o.Namespace, + StorageAddress: o.StorageAddress, + WorkerID: o.WorkerID, + MaxRetries: o.MaxRetries, + OperationTimeout: o.OperationTimeout, + } +} + func (o *Options) setDefaults() { if o.Namespace == "" { o.Namespace = "default" diff --git a/taskqueue/task_queue.go b/taskqueue/task_queue.go index 7460717..1354eb8 100644 --- a/taskqueue/task_queue.go +++ b/taskqueue/task_queue.go @@ -141,12 +141,19 @@ func (t *TaskQueue) consume( } else if err != nil { return fmt.Errorf("failed to get task: %w", err) } + if task == nil { + return errors.New("task is nil") + } + logger.Info("task ID: ", task.ID.String()) logger = withTaskLabels(logger, task) defer t.removeInProgressTask(ctx, task) logger.Debug("consuming task") + if consume == nil { + return errors.New("consume func is nil") + } err = consume(ctx, task.ID, task.Payload) if err != nil { logger.WithError(err).Debug("failed to consume, retrying after backoff")