diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index b369d8f..a0b265e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -49,4 +49,33 @@ jobs:
         find ./_examples -type f -name "go.mod" -exec dirname {} \; | while read -r dir; do
           echo "Running go vet in $dir"
           (cd "$dir" && go vet ./...)
-        done
\ No newline at end of file
+        done
+
+  integration_tests:
+    name: Integration Tests (Testcontainers)
+    runs-on: ubuntu-latest
+    timeout-minutes: 15
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version-file: 'go.mod'
+
+      - name: Check Docker availability
+        run: |
+          docker --version
+          docker info
+
+      - name: Test Kafka Adapter with Testcontainers
+        run: |
+          cd ./adapters/kafkastreamer
+          echo "🚀 Testing Kafka adapter with testcontainers..."
+          go test -v -timeout=10m ./...
+
+      - name: Test Redis Adapter with Testcontainers
+        run: |
+          cd ./adapters/wredis
+          echo "🚀 Testing Redis adapter with testcontainers..."
+          go test -v -timeout=10m ./...
\ No newline at end of file
diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go
new file mode 100644
index 0000000..1af79c2
--- /dev/null
+++ b/adapters/wredis/parsestream_test.go
@@ -0,0 +1,437 @@
+package wredis
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/luno/workflow"
+	"github.com/redis/go-redis/v9"
+	"github.com/stretchr/testify/require"
+	"github.com/testcontainers/testcontainers-go"
+	rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis"
+)
+
+func TestParseStreamID(t *testing.T) {
+	testCases := []struct {
+		name        string
+		streamID    string
+		expectedID  int64
+		expectError bool
+		errorMsg    string
+	}{
+		{
+			name:       "valid stream ID",
+			streamID:   "1640995200000-0",
+			expectedID: 1640995200000000000,
+		},
+		{
+			name:       "valid stream ID with sequence",
+			streamID:   "1640995200000-5",
+			expectedID: 1640995200000000005,
+		},
+		{
+			name:       "same timestamp different sequence",
+			streamID:   "1640995200000-123",
+			expectedID: 1640995200000000123,
+		},
+		{
+			name:       "maximum valid sequence",
+			streamID:   "1640995200000-999999",
+			expectedID: 1640995200000999999,
+		},
+		{
+			name:        "empty stream ID",
+			streamID:    "",
+			expectError: true,
+			errorMsg:    "empty stream ID",
+		},
+		{
+			name:        "invalid format - no dash",
+			streamID:    "1640995200000",
+			expectError: true,
+			errorMsg:    "invalid stream ID format",
+		},
+		{
+			name:        "invalid format - multiple dashes",
+			streamID:    "1640995200000-0-1",
+			expectError: true,
+			errorMsg:    "invalid stream ID format",
+		},
+		{
+			name:        "empty timestamp",
+			streamID:    "-0",
+			expectError: true,
+			errorMsg:    "empty timestamp",
+		},
+		{
+			name:        "empty sequence",
+			streamID:    "1640995200000-",
+			expectError: true,
+			errorMsg:    "empty sequence",
+		},
+		{
+			name:        "non-numeric timestamp",
+			streamID:    "abc-0",
+			expectError: true,
+			errorMsg:    "invalid timestamp",
+		},
+		{
+			name:        "non-numeric sequence",
+			streamID:    "1640995200000-abc",
+			expectError: true,
+			errorMsg:    "invalid sequence",
+		},
+		{
+			name:        "double dash creates invalid format",
+			streamID:    "1640995200000--1",
+			expectError: true,
+			errorMsg:    "invalid stream ID format",
+		},
+		{
+			name:        "sequence too large",
+			streamID:    "1640995200000-1000000",
+			expectError: true,
+			errorMsg:    "sequence too large",
+		},
+		{
+			name:        "timestamp too large for overflow protection",
+			streamID:    "9223372036855000-0",
+			expectError: true,
+			errorMsg:    "timestamp too large",
+		},
+		{
+			name:     "edge case - minimum timestamp",
+			streamID: "0-0",
+			expectedID: 0,
+		},
+		{
+			name:       "edge case - realistic timestamp with sequence",
+			streamID:   "1735834800000-42", // Example: 2025-01-02 16:20:00 UTC
+			expectedID: 1735834800000000042,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			id, err := parseStreamID(tc.streamID)
+
+			if tc.expectError {
+				require.Error(t, err, "Expected error for stream ID: %s", tc.streamID)
+				require.Contains(t, err.Error(), tc.errorMsg,
+					"Error message should contain '%s' for stream ID: %s", tc.errorMsg, tc.streamID)
+			} else {
+				require.NoError(t, err, "Unexpected error for stream ID: %s", tc.streamID)
+				require.Equal(t, tc.expectedID, id,
+					"Event ID should match expected value for stream ID: %s", tc.streamID)
+			}
+		})
+	}
+}
+
+func TestParseStreamIDCollisionPrevention(t *testing.T) {
+	// Test that different stream IDs produce different event IDs
+	streamIDs := []string{
+		"1640995200000-0",
+		"1640995200000-1",
+		"1640995200000-999",
+		"1640995200001-0", // Different timestamp
+	}
+
+	seenIDs := make(map[int64]string)
+
+	for _, streamID := range streamIDs {
+		id, err := parseStreamID(streamID)
+		require.NoError(t, err, "Failed to parse stream ID: %s", streamID)
+
+		if existingStreamID, exists := seenIDs[id]; exists {
+			t.Errorf("ID collision detected: stream IDs '%s' and '%s' both produce event ID %d",
+				streamID, existingStreamID, id)
+		}
+		seenIDs[id] = streamID
+	}
+
+	require.Len(t, seenIDs, len(streamIDs), "All stream IDs should produce unique event IDs")
+}
+
+func TestParseStreamIDEdgeCases(t *testing.T) {
+	// Test specific edge cases that could happen in practice
+	t.Run("Test sequence bounds validation", func(t *testing.T) {
+		// Test normal positive sequence handling at the boundaries:
+		// - sequence 0 (minimum valid sequence)
+		// - sequence 999999 (maximum allowed sequence)
+		// This ensures our parsing correctly handles the full valid range
+
+		// Test minimum valid sequence (0)
+		id, err := parseStreamID("1640995200000-0")
+		require.NoError(t, err)
+		require.Equal(t, int64(1640995200000000000), id)
+
+		// Test maximum allowed sequence (999999)
+		id, err = parseStreamID("1640995200000-999999")
+		require.NoError(t, err)
+		require.Equal(t, int64(1640995200000999999), id)
+	})
+
+	t.Run("Test overflow protection", func(t *testing.T) {
+		// Test that our overflow protection works
+		_, err := parseStreamID("9223372036855000-0")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "timestamp too large")
+
+		// Test sequence too large
+		_, err = parseStreamID("1640995200000-1000000")
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "sequence too large")
+	})
+}
+
+func TestReceiverErrorHandling(t *testing.T) {
+	// This test verifies that malformed messages are handled properly
+	// and not silently acknowledged
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+	t.Cleanup(func() { _ = client.Close() })
+
+	streamer := NewStreamer(client)
+	topic := "test-malformed"
+
+	// Create sender and receiver
+	sender, err := streamer.NewSender(ctx, topic)
+	require.NoError(t, err)
+	defer sender.Close()
+
+	receiver, err := streamer.NewReceiver(ctx, topic, "test-receiver")
+	require.NoError(t, err)
+	defer receiver.Close()
+
+	// Manually inject malformed data directly to Redis stream
+	streamKey := streamKeyPrefix + topic
+	_, err = client.XAdd(ctx, &redis.XAddArgs{
+		Stream: streamKey,
+		Values: map[string]interface{}{
+			"event": "invalid-json-data{",
+		},
+	}).Result()
+	require.NoError(t, err)
+
+	// Try to receive - should get an error, not silent acknowledgment
+	// Use a timeout context to prevent hanging
+	recvCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	_, _, err = receiver.Recv(recvCtx)
+	require.Error(t, err, "Should return error for malformed message")
+	require.Contains(t, err.Error(), "failed to parse", "Error should mention parse failure")
+
+	// Verify the malformed message is still in the stream (not acknowledged)
+	// by checking pending messages for the consumer group
+	consumerGroup := consumerGroupPrefix + "test-receiver"
+	pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
+		Stream: streamKey,
+		Group:  consumerGroup,
+		Start:  "-",
+		End:    "+",
+		Count:  10,
+	}).Result()
+	require.NoError(t, err)
+	require.Len(t, pending, 1, "Malformed message should still be pending (not acknowledged)")
+}
+
+func TestPollingFrequencyImplementation(t *testing.T) {
+	// Test that polling frequency is correctly used for Redis blocking
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+	t.Cleanup(func() { _ = client.Close() })
+
+	streamer := NewStreamer(client)
+	topic := "test-polling"
+
+	// Test with custom polling frequency
+	customPollFreq := 500 * time.Millisecond
+	receiver, err := streamer.NewReceiver(ctx, topic, "test-receiver",
+		workflow.WithReceiverPollFrequency(customPollFreq))
+	require.NoError(t, err)
+	defer receiver.Close()
+
+	// Verify the receiver uses the correct block duration
+	recv := receiver.(*Receiver)
+	blockDuration := recv.getBlockDuration()
+	require.Equal(t, customPollFreq, blockDuration,
+		"Block duration should match custom polling frequency")
+
+	// Test default polling frequency (when not specified)
+	defaultReceiver, err := streamer.NewReceiver(ctx, topic, "default-receiver")
+	require.NoError(t, err)
+	defer defaultReceiver.Close()
+
+	defaultRecv := defaultReceiver.(*Receiver)
+	defaultBlockDuration := defaultRecv.getBlockDuration()
+	require.Equal(t, 250*time.Millisecond, defaultBlockDuration,
+		"Default block duration should be 250 milliseconds")
+}
+
+func TestAckWithCancelledContext(t *testing.T) {
+	// Test that ack works even after Recv context is cancelled
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+	t.Cleanup(func() { _ = client.Close() })
+
+	streamer := NewStreamer(client)
+	topic := "test-ack-context"
+
+	// Create sender and send a message
+	sender, err := streamer.NewSender(ctx, topic)
+	require.NoError(t, err)
+	defer sender.Close()
+
+	err = sender.Send(ctx, "test-message", 1, map[workflow.Header]string{
+		workflow.HeaderTopic: topic,
+	})
+	require.NoError(t, err)
+
+	// Create receiver with a context that we'll cancel
+	recvCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	receiver, err := streamer.NewReceiver(recvCtx, topic, "test-receiver")
+	require.NoError(t, err)
+	defer receiver.Close()
+
+	// Receive the message
+	event, ack, err := receiver.Recv(recvCtx)
+	require.NoError(t, err)
+	require.NotNil(t, event)
+	require.NotNil(t, ack)
+	require.Equal(t, "test-message", event.ForeignID)
+
+	// Cancel the Recv context to simulate timeout/cancellation
+	cancel()
+
+	// Wait a bit to ensure context is cancelled
+	time.Sleep(10 * time.Millisecond)
+
+	// Ack should still work because it uses a fresh background context
+	err = ack()
+	require.NoError(t, err, "Ack should succeed even after Recv context is cancelled")
+
+	// Verify the message was actually acknowledged by checking pending count
+	streamKey := streamKeyPrefix + topic
+	consumerGroup := consumerGroupPrefix + "test-receiver"
+	pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
+		Stream: streamKey,
+		Group:  consumerGroup,
+		Start:  "-",
+		End:    "+",
+		Count:  10,
+	}).Result()
+	require.NoError(t, err)
+	require.Len(t, pending, 0, "Message should be acknowledged and not pending")
+}
+
+func TestConcurrentStreamFromLatestReceivers(t *testing.T) {
+	// Test that multiple receivers sharing a consumer group with
+	// StreamFromLatest don't fail with BUSYGROUP
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+	t.Cleanup(func() { _ = client.Close() })
+
+	streamer := NewStreamer(client)
+	topic := "test-concurrent-latest"
+
+	// Create multiple receivers concurrently with StreamFromLatest. All of
+	// them share one receiver name (and therefore one consumer group) so
+	// that their concurrent Recv calls race to create the same group.
+	const numReceivers = 5
+	receivers := make([]workflow.EventReceiver, numReceivers)
+
+	// Use a sync.WaitGroup to wait for all goroutines to finish
+	var wg sync.WaitGroup
+	errorsChan := make(chan error, numReceivers)
+
+	wg.Add(numReceivers)
+
+	for i := 0; i < numReceivers; i++ {
+		go func(i int) {
+			defer wg.Done()
+			receiver, err := streamer.NewReceiver(ctx, topic, "concurrent-receiver", workflow.StreamFromLatest())
+			if err != nil {
+				errorsChan <- fmt.Errorf("receiver %d failed: %w", i, err)
+				return
+			}
+			receivers[i] = receiver
+
+			// Consumer group creation happens inside Recv, so call it with a
+			// short timeout; on an empty stream it should time out rather
+			// than fail with BUSYGROUP.
+			recvCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
+			defer cancel()
+			_, _, err = receiver.Recv(recvCtx)
+			if err != nil && recvCtx.Err() == nil {
+				errorsChan <- fmt.Errorf("receiver %d recv failed: %w", i, err)
+			}
+		}(i)
+	}
+
+	// Wait for all receivers to be created and their first Recv to return
+	wg.Wait()
+	close(errorsChan)
+
+	// Check for any errors
+	var errs []error
+	for err := range errorsChan {
+		errs = append(errs, err)
+	}
+
+	require.Empty(t, errs, "No receivers should fail with BUSYGROUP error: %v", errs)
+
+	// Clean up receivers
+	for i, receiver := range receivers {
+		if receiver != nil {
+			err := receiver.Close()
+			require.NoError(t, err, "Failed to close receiver %d", i)
+		}
+	}
+}
diff --git a/adapters/wredis/store_test.go b/adapters/wredis/store_test.go
index df6a0a1..5ba1822 100644
--- a/adapters/wredis/store_test.go
+++ b/adapters/wredis/store_test.go
@@ -1,15 +1,16 @@
-package wredis
+package wredis_test
 
 import (
 	"testing"
 
+	"github.com/luno/workflow"
+	"github.com/luno/workflow/adapters/adaptertest"
 	"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" - "github.com/luno/workflow" - "github.com/luno/workflow/adapters/adaptertest" + "github.com/luno/workflow/adapters/wredis" ) func TestRedisRecordStore(t *testing.T) { @@ -34,7 +35,7 @@ func TestRedisRecordStore(t *testing.T) { err := client.FlushDB(ctx).Err() require.NoError(t, err) - return New(client) + return wredis.New(client) } adaptertest.RunRecordStoreTest(t, factory) diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go new file mode 100644 index 0000000..9b58070 --- /dev/null +++ b/adapters/wredis/streamer.go @@ -0,0 +1,298 @@ +package wredis + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/luno/workflow" + "github.com/redis/go-redis/v9" +) + +const ( + streamKeyPrefix = "workflow:stream:" + consumerGroupPrefix = "workflow:consumer:" +) + +type Streamer struct { + client redis.UniversalClient +} + +func NewStreamer(client redis.UniversalClient) *Streamer { + return &Streamer{ + client: client, + } +} + +var _ workflow.EventStreamer = (*Streamer)(nil) + +func (s *Streamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) { + return &Sender{ + client: s.client, + topic: topic, + }, nil +} + +func (s *Streamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) { + var options workflow.ReceiverOptions + for _, opt := range opts { + opt(&options) + } + + return &Receiver{ + client: s.client, + topic: topic, + name: name, + options: options, + }, nil +} + +type Sender struct { + client redis.UniversalClient + topic string +} + +var _ workflow.EventSender = (*Sender)(nil) + +func (s *Sender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error { + event := &workflow.Event{ + ForeignID: foreignID, + Type: statusType, + Headers: headers, + CreatedAt: time.Now(), + } + + eventData, err := json.Marshal(event) + if err != nil { + return err + } + + streamKey := streamKeyPrefix + s.topic + + // Use XADD to add event to Redis Stream + _, err = s.client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamKey, + Values: map[string]interface{}{ + "event": string(eventData), + }, + }).Result() + + return err +} + +func (s *Sender) Close() error { + return nil +} + +type Receiver struct { + client redis.UniversalClient + topic string + name string + options workflow.ReceiverOptions +} + +var _ workflow.EventReceiver = (*Receiver)(nil) + +func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) { + streamKey := streamKeyPrefix + r.topic + consumerGroup := consumerGroupPrefix + r.name + + // Handle StreamFromLatest option by checking if consumer group exists + if r.options.StreamFromLatest { + // Try to get consumer group info to see if it exists + groups, err := r.client.XInfoGroups(ctx, streamKey).Result() + if err != nil && err.Error() != "ERR no such key" { + return nil, nil, err + } + + groupExists := false + for _, group := range groups { + if group.Name == consumerGroup { + groupExists = true + break + } + } + + if !groupExists { + // Consumer group doesn't exist, create it starting from the latest message + // Use "$" to start from the end of the stream + _, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "$").Result() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name 
already exists" { + return nil, nil, err + } + } + } else { + // Create consumer group if it doesn't exist, starting from beginning + _, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Result() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + return nil, nil, err + } + } + + for ctx.Err() == nil { + // First, try to read pending messages that were delivered but not acknowledged + pendingMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroup, + Consumer: r.name, + Streams: []string{streamKey, "0"}, // "0" reads pending messages + Count: 1, + Block: 0, // Don't block for pending messages + }).Result() + + if err != nil && err != redis.Nil { + return nil, nil, err + } + + // Process pending message if found + if len(pendingMsgs) > 0 && len(pendingMsgs[0].Messages) > 0 { + msg := pendingMsgs[0].Messages[0] + event, err := r.parseEvent(msg) + if err != nil { + // Return error to bubble up rather than silent ack + return nil, nil, fmt.Errorf("failed to parse pending message %s: %w", msg.ID, err) + } + + ack := func() error { + // Use fresh background context with short timeout for acknowledgment + // This allows ack to succeed even if Recv context was cancelled + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.client.XAck(ackCtx, streamKey, consumerGroup, msg.ID).Err() + } + + return event, ack, nil + } + + // No pending messages, try to read new messages + // Use Redis native blocking based on polling frequency + blockDuration := r.getBlockDuration() + + newMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroup, + Consumer: r.name, + Streams: []string{streamKey, ">"}, // ">" reads new messages only + Count: 1, + Block: blockDuration, + }).Result() + + if err != nil && err != redis.Nil { + return nil, nil, err + } + + // Process new message if found + if len(newMsgs) > 0 && len(newMsgs[0].Messages) > 0 { + msg := newMsgs[0].Messages[0] + event, err := r.parseEvent(msg) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse new message %s: %w", msg.ID, err) + } + + ack := func() error { + // Use fresh background context with short timeout for acknowledgment + // This allows ack to succeed even if Recv context was cancelled + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.client.XAck(ackCtx, streamKey, consumerGroup, msg.ID).Err() + } + + return event, ack, nil + } + } + + return nil, nil, ctx.Err() +} + +func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) { + eventData, ok := msg.Values["event"].(string) + if !ok { + return nil, fmt.Errorf("invalid event data format") + } + + var event workflow.Event + if err := json.Unmarshal([]byte(eventData), &event); err != nil { + return nil, err + } + + // Parse Redis Stream ID to set Event ID + id, err := parseStreamID(msg.ID) + if err != nil { + return nil, err + } + event.ID = id + + return &event, nil +} + +func (r *Receiver) getBlockDuration() time.Duration { + // If PollFrequency is set, use it for blocking + if r.options.PollFrequency > 0 { + return r.options.PollFrequency + } + + // Default: use a reasonable block time for real-time streaming + return 250 * time.Millisecond +} + +func (r *Receiver) Close() error { + return nil +} + +// parseStreamID converts Redis Stream ID (timestamp-sequence) to int64 +func parseStreamID(streamID string) (int64, error) { + // Redis Stream ID format: 
"timestamp-sequence" + // Combine both parts to ensure uniqueness: id = timestamp*1_000_000 + sequence + if streamID == "" { + return 0, fmt.Errorf("empty stream ID") + } + + // Split on the dash separator + parts := strings.Split(streamID, "-") + if len(parts) != 2 { + return 0, fmt.Errorf("invalid stream ID format: %s", streamID) + } + + timestampStr := parts[0] + sequenceStr := parts[1] + + // Validate parts are not empty + if timestampStr == "" { + return 0, fmt.Errorf("empty timestamp in stream ID: %s", streamID) + } + if sequenceStr == "" { + return 0, fmt.Errorf("empty sequence in stream ID: %s", streamID) + } + + // Parse timestamp + timestamp, err := strconv.ParseInt(timestampStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid timestamp in stream ID %s: %w", streamID, err) + } + + // Parse sequence + sequence, err := strconv.ParseInt(sequenceStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid sequence in stream ID %s: %w", streamID, err) + } + + // Check for potential overflow before combining + // We need timestamp*1_000_000 + sequence to fit in int64 + // Max int64 is 9,223,372,036,854,775,807 + // So timestamp should not exceed 9,223,372,036,854 + const maxTimestamp = 9223372036854 + if timestamp > maxTimestamp { + return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID) + } + if sequence >= 1000000 { + return 0, fmt.Errorf("sequence too large in stream ID %s: must be less than 1,000,000", streamID) + } + if sequence < 0 { + return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID) + } + + // Combine timestamp and sequence to create unique ID + combinedID := timestamp*1000000 + sequence + return combinedID, nil +} diff --git a/adapters/wredis/streamer_test.go b/adapters/wredis/streamer_test.go new file mode 100644 index 0000000..4c59ba0 --- /dev/null +++ b/adapters/wredis/streamer_test.go @@ -0,0 +1,43 @@ +package wredis_test + +import ( + "testing" + + "github.com/luno/workflow" + "github.com/luno/workflow/adapters/adaptertest" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" + + "github.com/luno/workflow/adapters/wredis" +) + +func TestRedisEventStreamer(t *testing.T) { + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + + factory := func() workflow.EventStreamer { + // Clean the database before each test + err := client.FlushDB(ctx).Err() + require.NoError(t, err) + + return wredis.NewStreamer(client) + } + + adaptertest.RunEventStreamerTest(t, factory) +} +