Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ tests: unit-tests
unit-tests:
go test github.com/Henrod/task-queue/taskqueue -v -tags=unit

.PHONY: integration-tests
integration-tests:
go test -timeout 3s -tags unit -v github.com/Henrod/task-queue/test

.PHONY: setup
setup:
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: '3.7'

services:
redis:
image: redis:7-alpine
ports:
- 6379:6379
60 changes: 60 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/Henrod/task-queue/taskqueue"
"github.com/google/uuid"
)

func TestTaskQueueMainFlow(t *testing.T) {
options := &taskqueue.Options{
QueueKey: "integration-test",
Namespace: "simple",
StorageAddress: "localhost:6379",
WorkerID: "worker1",
MaxRetries: 0,
OperationTimeout: time.Minute,
}

ctx, cancel := context.WithCancel(context.Background())
taskQueue, err := taskqueue.NewTaskQueue(ctx, taskqueue.NewDefaultRedis(options), options)
if err != nil {
panic(err)
}

StartProducer(ctx, taskQueue)
var waitGroup sync.WaitGroup
waitGroup.Add(1)
StartConsumer(ctx, cancel, taskQueue, waitGroup)
}

type Payload struct {
Body string
}

func StartProducer(ctx context.Context, taskQueue *taskqueue.TaskQueue) error {
id := 1234
_, err := taskQueue.ProduceAt(ctx, &Payload{Body: fmt.Sprintf("%d", id)}, time.Now())
if err != nil {
return err
}
return nil
}

func StartConsumer(ctx context.Context, cancel context.CancelFunc, taskQueue *taskqueue.TaskQueue, waitGroup sync.WaitGroup) error {
taskQueue.Consume(
ctx,
func(ctx context.Context, taskID uuid.UUID, payload interface{}) error {
waitGroup.Done()
waitGroup.Wait()
cancel()
return nil
},
)
return nil
}