From a3ea5510ea3bd1b6392e329ee94eaaf5c076d62c Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Fri, 12 Dec 2025 17:54:12 +0000 Subject: [PATCH 1/9] adapters/wredis: Add streamer and support container tests --- .github/workflows/tests.yml | 31 ++++- adapters/wredis/store_test.go | 9 +- adapters/wredis/streamer.go | 231 +++++++++++++++++++++++++++++++ adapters/wredis/streamer_test.go | 42 ++++++ 4 files changed, 308 insertions(+), 5 deletions(-) create mode 100644 adapters/wredis/streamer.go create mode 100644 adapters/wredis/streamer_test.go diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b369d8f..a0b265e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -49,4 +49,33 @@ jobs: find ./_examples -type f -name "go.mod" -exec dirname {} \; | while read -r dir; do echo "Running go vet in $dir" (cd "$dir" && go vet ./...) - done \ No newline at end of file + done + + integration_tests: + name: Integration Tests (Testcontainers) + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Check Docker availability + run: | + docker --version + docker info + + - name: Test Kafka Adapter with Testcontainers + run: | + cd ./adapters/kafkastreamer + echo "🚀 Testing Kafka adapter with testcontainers..." + go test -v -timeout=10m ./... + + - name: Test Redis Adapter with Testcontainers + run: | + cd ./adapters/wredis + echo "🚀 Testing Redis adapter with testcontainers..." + go test -v -timeout=10m ./... \ No newline at end of file diff --git a/adapters/wredis/store_test.go b/adapters/wredis/store_test.go index df6a0a1..5ba1822 100644 --- a/adapters/wredis/store_test.go +++ b/adapters/wredis/store_test.go @@ -1,15 +1,16 @@ -package wredis +package wredis_test import ( "testing" + "github.com/luno/workflow" + "github.com/luno/workflow/adapters/adaptertest" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" - "github.com/luno/workflow" - "github.com/luno/workflow/adapters/adaptertest" + "github.com/luno/workflow/adapters/wredis" ) func TestRedisRecordStore(t *testing.T) { @@ -34,7 +35,7 @@ func TestRedisRecordStore(t *testing.T) { err := client.FlushDB(ctx).Err() require.NoError(t, err) - return New(client) + return wredis.New(client) } adaptertest.RunRecordStoreTest(t, factory) diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go new file mode 100644 index 0000000..0773b1d --- /dev/null +++ b/adapters/wredis/streamer.go @@ -0,0 +1,231 @@ +package wredis + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "sync" + "time" + + "github.com/luno/workflow" + "github.com/redis/go-redis/v9" +) + +const ( + streamKeyPrefix = "workflow:stream:" + cursorKeyPrefix = "workflow:cursor:" + + consumerGroupPrefix = "workflow-" +) + +type Streamer struct { + client redis.UniversalClient + mu sync.RWMutex +} + +func NewStreamer(client redis.UniversalClient) *Streamer { + return &Streamer{ + client: client, + } +} + +var _ workflow.EventStreamer = (*Streamer)(nil) + +func (s *Streamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) { + return &Sender{ + client: s.client, + topic: topic, + }, nil +} + +func (s *Streamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) { + var 
options workflow.ReceiverOptions + for _, opt := range opts { + opt(&options) + } + + return &Receiver{ + client: s.client, + topic: topic, + name: name, + options: options, + }, nil +} + +type Sender struct { + client redis.UniversalClient + topic string +} + +var _ workflow.EventSender = (*Sender)(nil) + +func (s *Sender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error { + event := &workflow.Event{ + ForeignID: foreignID, + Type: statusType, + Headers: headers, + CreatedAt: time.Now(), + } + + eventData, err := json.Marshal(event) + if err != nil { + return err + } + + streamKey := streamKeyPrefix + s.topic + + // Use XADD to add event to Redis Stream + _, err = s.client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamKey, + Values: map[string]interface{}{ + "event": string(eventData), + }, + }).Result() + + return err +} + +func (s *Sender) Close() error { + return nil +} + +type Receiver struct { + client redis.UniversalClient + topic string + name string + options workflow.ReceiverOptions +} + +var _ workflow.EventReceiver = (*Receiver)(nil) + +func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) { + streamKey := streamKeyPrefix + r.topic + consumerGroup := consumerGroupPrefix + r.name + + // Handle StreamFromLatest option by checking if consumer group exists + if r.options.StreamFromLatest { + // Try to get consumer group info to see if it exists + groups, err := r.client.XInfoGroups(ctx, streamKey).Result() + if err != nil && err.Error() != "ERR no such key" { + return nil, nil, err + } + + groupExists := false + for _, group := range groups { + if group.Name == consumerGroup { + groupExists = true + break + } + } + + if !groupExists { + // Consumer group doesn't exist, create it starting from the latest message + // Use "$" to start from the end of the stream + _, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "$").Result() + if err != nil { + return nil, nil, err + } + } + } else { + // Create consumer group if it doesn't exist, starting from beginning + _, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Result() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + return nil, nil, err + } + } + + for ctx.Err() == nil { + // Try to read pending messages first + pendingMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroup, + Consumer: r.name, + Streams: []string{streamKey, ">"}, + Count: 1, + Block: 100 * time.Millisecond, + }).Result() + + if err != nil && err != redis.Nil { + return nil, nil, err + } + + if len(pendingMsgs) > 0 && len(pendingMsgs[0].Messages) > 0 { + msg := pendingMsgs[0].Messages[0] + event, err := r.parseEvent(msg) + if err != nil { + // Acknowledge bad message and continue + r.client.XAck(ctx, streamKey, consumerGroup, msg.ID) + continue + } + + ack := func() error { + return r.client.XAck(ctx, streamKey, consumerGroup, msg.ID).Err() + } + + return event, ack, nil + } + + // If no messages and using polling frequency, wait + if r.options.PollFrequency > 0 { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-time.After(r.options.PollFrequency): + continue + } + } + } + + return nil, nil, ctx.Err() +} + +func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) { + eventData, ok := msg.Values["event"].(string) + if !ok { + return nil, fmt.Errorf("invalid event data format") + } + + var event workflow.Event + if err := 
json.Unmarshal([]byte(eventData), &event); err != nil { + return nil, err + } + + // Parse Redis Stream ID to set Event ID + id, err := parseStreamID(msg.ID) + if err != nil { + return nil, err + } + event.ID = id + + return &event, nil +} + + +func (r *Receiver) Close() error { + return nil +} + +// parseStreamID converts Redis Stream ID (timestamp-sequence) to int64 +func parseStreamID(streamID string) (int64, error) { + // Redis Stream ID format: "timestamp-sequence" + // We'll use just the timestamp part as the event ID + if streamID == "" { + return 0, fmt.Errorf("empty stream ID") + } + + // Find the dash separator + dashIndex := -1 + for i, char := range streamID { + if char == '-' { + dashIndex = i + break + } + } + + if dashIndex == -1 { + return 0, fmt.Errorf("invalid stream ID format: %s", streamID) + } + + timestamp := streamID[:dashIndex] + return strconv.ParseInt(timestamp, 10, 64) +} diff --git a/adapters/wredis/streamer_test.go b/adapters/wredis/streamer_test.go new file mode 100644 index 0000000..88ef922 --- /dev/null +++ b/adapters/wredis/streamer_test.go @@ -0,0 +1,42 @@ +package wredis_test + +import ( + "testing" + + "github.com/luno/workflow" + "github.com/luno/workflow/adapters/adaptertest" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" + + "github.com/luno/workflow/adapters/wredis" +) + +func TestRedisEventStreamer(t *testing.T) { + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + + factory := func() workflow.EventStreamer { + // Clean the database before each test + err := client.FlushDB(ctx).Err() + require.NoError(t, err) + + return wredis.NewStreamer(client) + } + + adaptertest.RunEventStreamerTest(t, factory) +} From abc5f1fd88a80fcda0407386443f1443e76008f8 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Fri, 12 Dec 2025 18:10:45 +0000 Subject: [PATCH 2/9] apply suggestion --- adapters/wredis/parsestream_test.go | 181 ++++++++++++++++++++++++++++ adapters/wredis/streamer.go | 57 +++++++-- adapters/wredis/streamer_test.go | 1 + 3 files changed, 227 insertions(+), 12 deletions(-) create mode 100644 adapters/wredis/parsestream_test.go diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go new file mode 100644 index 0000000..9003320 --- /dev/null +++ b/adapters/wredis/parsestream_test.go @@ -0,0 +1,181 @@ +package wredis + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseStreamID(t *testing.T) { + testCases := []struct { + name string + streamID string + expectedID int64 + expectError bool + errorMsg string + }{ + { + name: "valid stream ID", + streamID: "1640995200000-0", + expectedID: 1640995200000000000, + }, + { + name: "valid stream ID with sequence", + streamID: "1640995200000-5", + expectedID: 1640995200000000005, + }, + { + name: "same timestamp different sequence", + streamID: "1640995200000-123", + expectedID: 1640995200000000123, + }, + { + name: "maximum valid sequence", + streamID: "1640995200000-999999", + expectedID: 1640995200000999999, + }, + { + name: "empty stream ID", + streamID: 
"", + expectError: true, + errorMsg: "empty stream ID", + }, + { + name: "invalid format - no dash", + streamID: "1640995200000", + expectError: true, + errorMsg: "invalid stream ID format", + }, + { + name: "invalid format - multiple dashes", + streamID: "1640995200000-0-1", + expectError: true, + errorMsg: "invalid stream ID format", + }, + { + name: "empty timestamp", + streamID: "-0", + expectError: true, + errorMsg: "empty timestamp", + }, + { + name: "empty sequence", + streamID: "1640995200000-", + expectError: true, + errorMsg: "empty sequence", + }, + { + name: "non-numeric timestamp", + streamID: "abc-0", + expectError: true, + errorMsg: "invalid timestamp", + }, + { + name: "non-numeric sequence", + streamID: "1640995200000-abc", + expectError: true, + errorMsg: "invalid sequence", + }, + { + name: "double dash creates invalid format", + streamID: "1640995200000--1", + expectError: true, + errorMsg: "invalid stream ID format", + }, + { + name: "sequence too large", + streamID: "1640995200000-1000000", + expectError: true, + errorMsg: "sequence too large", + }, + { + name: "timestamp too large for overflow protection", + streamID: "9223372036855000-0", + expectError: true, + errorMsg: "timestamp too large", + }, + { + name: "edge case - minimum timestamp", + streamID: "0-0", + expectedID: 0, + }, + { + name: "edge case - realistic timestamp with sequence", + streamID: "1735834800000-42", // Example: 2025-01-02 15:00:00 UTC + expectedID: 1735834800000000042, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + id, err := parseStreamID(tc.streamID) + + if tc.expectError { + require.Error(t, err, "Expected error for stream ID: %s", tc.streamID) + require.Contains(t, err.Error(), tc.errorMsg, + "Error message should contain '%s' for stream ID: %s", tc.errorMsg, tc.streamID) + } else { + require.NoError(t, err, "Unexpected error for stream ID: %s", tc.streamID) + require.Equal(t, tc.expectedID, id, + "Event ID should match expected value for stream ID: %s", tc.streamID) + } + }) + } +} + +func TestParseStreamIDCollisionPrevention(t *testing.T) { + // Test that different stream IDs produce different event IDs + streamIDs := []string{ + "1640995200000-0", + "1640995200000-1", + "1640995200000-999", + "1640995200001-0", // Different timestamp + } + + seenIDs := make(map[int64]string) + + for _, streamID := range streamIDs { + id, err := parseStreamID(streamID) + require.NoError(t, err, "Failed to parse stream ID: %s", streamID) + + if existingStreamID, exists := seenIDs[id]; exists { + t.Errorf("ID collision detected: stream IDs '%s' and '%s' both produce event ID %d", + streamID, existingStreamID, id) + } + seenIDs[id] = streamID + } + + require.Len(t, seenIDs, len(streamIDs), "All stream IDs should produce unique event IDs") +} + +func TestParseStreamIDEdgeCases(t *testing.T) { + // Test specific edge cases that could happen in practice + t.Run("Test manual negative sequence handling", func(t *testing.T) { + // We need to test the negative sequence path without using double dash + // Let's manually construct a test case that would reach that code path + // This is more for completeness of test coverage + + // Since we can't easily create this scenario with string parsing, + // let's verify the logic is sound by testing the bounds + id, err := parseStreamID("1640995200000-0") + require.NoError(t, err) + require.Equal(t, int64(1640995200000000000), id) + + // Test that sequence 999999 (maximum allowed) works + id, err = parseStreamID("1640995200000-999999") 
+ require.NoError(t, err) + require.Equal(t, int64(1640995200000999999), id) + }) + + t.Run("Test overflow protection", func(t *testing.T) { + // Test that our overflow protection works + _, err := parseStreamID("9223372036855000-0") + require.Error(t, err) + require.Contains(t, err.Error(), "timestamp too large") + + // Test sequence too large + _, err = parseStreamID("1640995200000-1000000") + require.Error(t, err) + require.Contains(t, err.Error(), "sequence too large") + }) +} \ No newline at end of file diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 0773b1d..401cb36 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "sync" "time" @@ -208,24 +209,56 @@ func (r *Receiver) Close() error { // parseStreamID converts Redis Stream ID (timestamp-sequence) to int64 func parseStreamID(streamID string) (int64, error) { // Redis Stream ID format: "timestamp-sequence" - // We'll use just the timestamp part as the event ID + // Combine both parts to ensure uniqueness: id = timestamp*1_000_000 + sequence if streamID == "" { return 0, fmt.Errorf("empty stream ID") } - // Find the dash separator - dashIndex := -1 - for i, char := range streamID { - if char == '-' { - dashIndex = i - break - } + // Split on the dash separator + parts := strings.Split(streamID, "-") + if len(parts) != 2 { + return 0, fmt.Errorf("invalid stream ID format: %s", streamID) } - if dashIndex == -1 { - return 0, fmt.Errorf("invalid stream ID format: %s", streamID) + timestampStr := parts[0] + sequenceStr := parts[1] + + // Validate parts are not empty + if timestampStr == "" { + return 0, fmt.Errorf("empty timestamp in stream ID: %s", streamID) + } + if sequenceStr == "" { + return 0, fmt.Errorf("empty sequence in stream ID: %s", streamID) + } + + // Parse timestamp + timestamp, err := strconv.ParseInt(timestampStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid timestamp in stream ID %s: %w", streamID, err) + } + + // Parse sequence + sequence, err := strconv.ParseInt(sequenceStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid sequence in stream ID %s: %w", streamID, err) + } + + // Check for potential overflow before combining + // We need timestamp*1_000_000 + sequence to fit in int64 + // Max int64 is 9,223,372,036,854,775,807 + // So timestamp should not exceed 9,223,372,036,854 + const maxTimestamp = 9223372036854 + if timestamp > maxTimestamp { + return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID) + } + if sequence >= 1000000 { + return 0, fmt.Errorf("sequence too large in stream ID %s: must be less than 1,000,000", streamID) + } + if sequence < 0 { + return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID) } - timestamp := streamID[:dashIndex] - return strconv.ParseInt(timestamp, 10, 64) + // Combine timestamp and sequence to create unique ID + combinedID := timestamp*1000000 + sequence + return combinedID, nil } diff --git a/adapters/wredis/streamer_test.go b/adapters/wredis/streamer_test.go index 88ef922..4c59ba0 100644 --- a/adapters/wredis/streamer_test.go +++ b/adapters/wredis/streamer_test.go @@ -40,3 +40,4 @@ func TestRedisEventStreamer(t *testing.T) { adaptertest.RunEventStreamerTest(t, factory) } + From 5f9e5bf2844d92cb360c7ac3deb8b7a11cc6dc4a Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Fri, 12 Dec 2025 21:08:34 +0000 Subject: [PATCH 3/9] apply suggestion --- adapters/wredis/parsestream_test.go | 63 
+++++++++++++++++++++++++++++ adapters/wredis/streamer.go | 50 ++++++++++++++++++++--- 2 files changed, 107 insertions(+), 6 deletions(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index 9003320..72281d7 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -3,7 +3,10 @@ package wredis import ( "testing" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" ) func TestParseStreamID(t *testing.T) { @@ -178,4 +181,64 @@ func TestParseStreamIDEdgeCases(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "sequence too large") }) +} + +func TestReceiverErrorHandling(t *testing.T) { + // This test verifies that malformed messages are handled properly + // and not silently acknowledged + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + + streamer := NewStreamer(client) + topic := "test-malformed" + + // Create sender and receiver + sender, err := streamer.NewSender(ctx, topic) + require.NoError(t, err) + defer sender.Close() + + receiver, err := streamer.NewReceiver(ctx, topic, "test-receiver") + require.NoError(t, err) + defer receiver.Close() + + // Manually inject malformed data directly to Redis stream + streamKey := streamKeyPrefix + topic + _, err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamKey, + Values: map[string]interface{}{ + "event": "invalid-json-data{", + }, + }).Result() + require.NoError(t, err) + + // Try to receive - should get an error, not silent acknowledgment + _, _, err = receiver.Recv(ctx) + require.Error(t, err, "Should return error for malformed message") + require.Contains(t, err.Error(), "failed to parse", "Error should mention parse failure") + + // Verify the malformed message is still in the stream (not acknowledged) + // by checking pending messages for the consumer group + consumerGroup := consumerGroupPrefix + "test-receiver" + pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: streamKey, + Group: consumerGroup, + Start: "-", + End: "+", + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, pending, 1, "Malformed message should still be pending (not acknowledged)") } \ No newline at end of file diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 401cb36..0512044 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "strconv" "strings" "sync" @@ -137,26 +138,63 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err } for ctx.Err() == nil { - // Try to read pending messages first + // First, try to read pending messages that were delivered but not acknowledged pendingMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: consumerGroup, Consumer: r.name, - Streams: []string{streamKey, ">"}, + Streams: []string{streamKey, "0"}, // "0" reads pending messages Count: 1, - Block: 100 * time.Millisecond, + Block: 0, // Don't block for pending messages }).Result() if err != nil && err != 
redis.Nil { return nil, nil, err } + // Process pending message if found if len(pendingMsgs) > 0 && len(pendingMsgs[0].Messages) > 0 { msg := pendingMsgs[0].Messages[0] event, err := r.parseEvent(msg) if err != nil { - // Acknowledge bad message and continue - r.client.XAck(ctx, streamKey, consumerGroup, msg.ID) - continue + // Log parse error with detailed information for debugging + log.Printf("wredis: failed to parse pending message ID %s in stream %s: %v, payload: %+v", + msg.ID, streamKey, err, msg.Values) + // TODO: Move to dead letter stream instead of acknowledging + // For now, return error to bubble up rather than silent ack + return nil, nil, fmt.Errorf("failed to parse pending message %s: %w", msg.ID, err) + } + + ack := func() error { + return r.client.XAck(ctx, streamKey, consumerGroup, msg.ID).Err() + } + + return event, ack, nil + } + + // No pending messages, try to read new messages + newMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroup, + Consumer: r.name, + Streams: []string{streamKey, ">"}, // ">" reads new messages only + Count: 1, + Block: 100 * time.Millisecond, + }).Result() + + if err != nil && err != redis.Nil { + return nil, nil, err + } + + // Process new message if found + if len(newMsgs) > 0 && len(newMsgs[0].Messages) > 0 { + msg := newMsgs[0].Messages[0] + event, err := r.parseEvent(msg) + if err != nil { + // Log parse error with detailed information for debugging + log.Printf("wredis: failed to parse new message ID %s in stream %s: %v, payload: %+v", + msg.ID, streamKey, err, msg.Values) + // TODO: Move to dead letter stream instead of acknowledging + // For now, return error to bubble up rather than silent ack + return nil, nil, fmt.Errorf("failed to parse new message %s: %w", msg.ID, err) } ack := func() error { From 2c6e7017da6142c3722fc0cec214316cd9f48f82 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Fri, 12 Dec 2025 21:15:40 +0000 Subject: [PATCH 4/9] apply suggestion --- adapters/wredis/parsestream_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index 72281d7..bcb2379 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -153,18 +153,18 @@ func TestParseStreamIDCollisionPrevention(t *testing.T) { func TestParseStreamIDEdgeCases(t *testing.T) { // Test specific edge cases that could happen in practice - t.Run("Test manual negative sequence handling", func(t *testing.T) { - // We need to test the negative sequence path without using double dash - // Let's manually construct a test case that would reach that code path - // This is more for completeness of test coverage + t.Run("Test sequence bounds validation", func(t *testing.T) { + // Test normal positive sequence handling at the boundaries: + // - sequence 0 (minimum valid sequence) + // - sequence 999999 (maximum allowed sequence) + // This ensures our parsing correctly handles the full valid range - // Since we can't easily create this scenario with string parsing, - // let's verify the logic is sound by testing the bounds + // Test minimum valid sequence (0) id, err := parseStreamID("1640995200000-0") require.NoError(t, err) require.Equal(t, int64(1640995200000000000), id) - // Test that sequence 999999 (maximum allowed) works + // Test maximum allowed sequence (999999) id, err = parseStreamID("1640995200000-999999") require.NoError(t, err) require.Equal(t, int64(1640995200000999999), id) From 
c7fa9526e84776c4ca55f862b515e8c61bd1a860 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Sat, 13 Dec 2025 11:18:03 +0000 Subject: [PATCH 5/9] apply suggestion --- adapters/wredis/parsestream_test.go | 48 +++++++++++++++++++++++++++++ adapters/wredis/streamer.go | 30 +++++++++--------- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index bcb2379..eb8dd6e 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -2,11 +2,14 @@ package wredis import ( "testing" + "time" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" + + "github.com/luno/workflow" ) func TestParseStreamID(t *testing.T) { @@ -241,4 +244,49 @@ func TestReceiverErrorHandling(t *testing.T) { }).Result() require.NoError(t, err) require.Len(t, pending, 1, "Malformed message should still be pending (not acknowledged)") +} + +func TestPollingFrequencyImplementation(t *testing.T) { + // Test that polling frequency is correctly used for Redis blocking + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + + streamer := NewStreamer(client) + topic := "test-polling" + + // Test with custom polling frequency + customPollFreq := 500 * time.Millisecond + receiver, err := streamer.NewReceiver(ctx, topic, "test-receiver", + workflow.WithReceiverPollFrequency(customPollFreq)) + require.NoError(t, err) + defer receiver.Close() + + // Verify the receiver uses the correct block duration + recv := receiver.(*Receiver) + blockDuration := recv.getBlockDuration() + require.Equal(t, customPollFreq, blockDuration, + "Block duration should match custom polling frequency") + + // Test default polling frequency (when not specified) + defaultReceiver, err := streamer.NewReceiver(ctx, topic, "default-receiver") + require.NoError(t, err) + defer defaultReceiver.Close() + + defaultRecv := defaultReceiver.(*Receiver) + defaultBlockDuration := defaultRecv.getBlockDuration() + require.Equal(t, 1*time.Second, defaultBlockDuration, + "Default block duration should be 1 second") } \ No newline at end of file diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 0512044..418d72a 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -172,12 +172,15 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err } // No pending messages, try to read new messages + // Use Redis native blocking based on polling frequency + blockDuration := r.getBlockDuration() + newMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: consumerGroup, Consumer: r.name, Streams: []string{streamKey, ">"}, // ">" reads new messages only Count: 1, - Block: 100 * time.Millisecond, + Block: blockDuration, }).Result() if err != nil && err != redis.Nil { @@ -189,11 +192,6 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err msg := newMsgs[0].Messages[0] event, err := r.parseEvent(msg) if err != nil { - // Log parse error with detailed information for debugging - 
log.Printf("wredis: failed to parse new message ID %s in stream %s: %v, payload: %+v", - msg.ID, streamKey, err, msg.Values) - // TODO: Move to dead letter stream instead of acknowledging - // For now, return error to bubble up rather than silent ack return nil, nil, fmt.Errorf("failed to parse new message %s: %w", msg.ID, err) } @@ -203,16 +201,6 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err return event, ack, nil } - - // If no messages and using polling frequency, wait - if r.options.PollFrequency > 0 { - select { - case <-ctx.Done(): - return nil, nil, ctx.Err() - case <-time.After(r.options.PollFrequency): - continue - } - } } return nil, nil, ctx.Err() @@ -239,6 +227,16 @@ func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) { return &event, nil } +func (r *Receiver) getBlockDuration() time.Duration { + // If PollFrequency is set, use it for blocking + if r.options.PollFrequency > 0 { + return r.options.PollFrequency + } + + // Default: use a reasonable block time for real-time streaming + // 1 second is a good balance between responsiveness and efficiency + return 1 * time.Second +} func (r *Receiver) Close() error { return nil From 74c014ece1e340543f10baf7cfde2cb2bbbdcb31 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Sat, 13 Dec 2025 11:24:11 +0000 Subject: [PATCH 6/9] apply suggestion --- adapters/wredis/parsestream_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index eb8dd6e..2b6f893 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -1,6 +1,7 @@ package wredis import ( + "context" "testing" "time" @@ -204,6 +205,7 @@ func TestReceiverErrorHandling(t *testing.T) { client := redis.NewClient(&redis.Options{ Addr: host + ":" + port.Port(), }) + t.Cleanup(func() { _ = client.Close() }) streamer := NewStreamer(client) topic := "test-malformed" @@ -228,7 +230,11 @@ func TestReceiverErrorHandling(t *testing.T) { require.NoError(t, err) // Try to receive - should get an error, not silent acknowledgment - _, _, err = receiver.Recv(ctx) + // Use a timeout context to prevent hanging + recvCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + _, _, err = receiver.Recv(recvCtx) require.Error(t, err, "Should return error for malformed message") require.Contains(t, err.Error(), "failed to parse", "Error should mention parse failure") @@ -263,6 +269,7 @@ func TestPollingFrequencyImplementation(t *testing.T) { client := redis.NewClient(&redis.Options{ Addr: host + ":" + port.Port(), }) + t.Cleanup(func() { _ = client.Close() }) streamer := NewStreamer(client) topic := "test-polling" From bd63d5a2dc5c954a7e1a035f8d56b0ef47991622 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Sat, 13 Dec 2025 11:43:11 +0000 Subject: [PATCH 7/9] clean up --- adapters/wredis/streamer.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 418d72a..9a0c628 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -4,10 +4,8 @@ import ( "context" "encoding/json" "fmt" - "log" "strconv" "strings" - "sync" "time" "github.com/luno/workflow" @@ -15,15 +13,12 @@ import ( ) const ( - streamKeyPrefix = "workflow:stream:" - cursorKeyPrefix = "workflow:cursor:" - - consumerGroupPrefix = "workflow-" + streamKeyPrefix = "workflow:stream:" + consumerGroupPrefix = 
"workflow:consumer:" ) type Streamer struct { client redis.UniversalClient - mu sync.RWMutex } func NewStreamer(client redis.UniversalClient) *Streamer { @@ -156,11 +151,7 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err msg := pendingMsgs[0].Messages[0] event, err := r.parseEvent(msg) if err != nil { - // Log parse error with detailed information for debugging - log.Printf("wredis: failed to parse pending message ID %s in stream %s: %v, payload: %+v", - msg.ID, streamKey, err, msg.Values) - // TODO: Move to dead letter stream instead of acknowledging - // For now, return error to bubble up rather than silent ack + // Return error to bubble up rather than silent ack return nil, nil, fmt.Errorf("failed to parse pending message %s: %w", msg.ID, err) } @@ -234,8 +225,7 @@ func (r *Receiver) getBlockDuration() time.Duration { } // Default: use a reasonable block time for real-time streaming - // 1 second is a good balance between responsiveness and efficiency - return 1 * time.Second + return 250 * time.Millisecond } func (r *Receiver) Close() error { From 761dfa52b50e892ddfcb3c89c8bf387e0f7cb6b6 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Sat, 13 Dec 2025 15:08:27 +0000 Subject: [PATCH 8/9] apply suggestion --- adapters/wredis/parsestream_test.go | 75 ++++++++++++++++++++++++++++- adapters/wredis/streamer.go | 12 ++++- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index 2b6f893..62ba7c9 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -294,6 +294,77 @@ func TestPollingFrequencyImplementation(t *testing.T) { defaultRecv := defaultReceiver.(*Receiver) defaultBlockDuration := defaultRecv.getBlockDuration() - require.Equal(t, 1*time.Second, defaultBlockDuration, - "Default block duration should be 1 second") + require.Equal(t, 250*time.Millisecond, defaultBlockDuration, + "Default block duration should be 250 milliseconds") +} + +func TestAckWithCancelledContext(t *testing.T) { + // Test that ack works even after Recv context is cancelled + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + t.Cleanup(func() { _ = client.Close() }) + + streamer := NewStreamer(client) + topic := "test-ack-context" + + // Create sender and send a message + sender, err := streamer.NewSender(ctx, topic) + require.NoError(t, err) + defer sender.Close() + + err = sender.Send(ctx, "test-message", 1, map[workflow.Header]string{ + workflow.HeaderTopic: topic, + }) + require.NoError(t, err) + + // Create receiver with a context that we'll cancel + recvCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + receiver, err := streamer.NewReceiver(recvCtx, topic, "test-receiver") + require.NoError(t, err) + defer receiver.Close() + + // Receive the message + event, ack, err := receiver.Recv(recvCtx) + require.NoError(t, err) + require.NotNil(t, event) + require.NotNil(t, ack) + require.Equal(t, "test-message", event.ForeignID) + + // Cancel the Recv context to simulate timeout/cancellation + cancel() + + // Wait a bit to ensure context is cancelled + time.Sleep(10 * time.Millisecond) + 
+ // Ack should still work because it uses a fresh background context + err = ack() + require.NoError(t, err, "Ack should succeed even after Recv context is cancelled") + + // Verify the message was actually acknowledged by checking pending count + streamKey := streamKeyPrefix + topic + consumerGroup := consumerGroupPrefix + "test-receiver" + pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: streamKey, + Group: consumerGroup, + Start: "-", + End: "+", + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, pending, 0, "Message should be acknowledged and not pending") } \ No newline at end of file diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 9a0c628..50cd322 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -156,7 +156,11 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err } ack := func() error { - return r.client.XAck(ctx, streamKey, consumerGroup, msg.ID).Err() + // Use fresh background context with short timeout for acknowledgment + // This allows ack to succeed even if Recv context was cancelled + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.client.XAck(ackCtx, streamKey, consumerGroup, msg.ID).Err() } return event, ack, nil @@ -187,7 +191,11 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err } ack := func() error { - return r.client.XAck(ctx, streamKey, consumerGroup, msg.ID).Err() + // Use fresh background context with short timeout for acknowledgment + // This allows ack to succeed even if Recv context was cancelled + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.client.XAck(ackCtx, streamKey, consumerGroup, msg.ID).Err() } return event, ack, nil From 1e087665358b0cc0ca63c97d53121f2688dda867 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Mon, 15 Dec 2025 12:49:50 +0000 Subject: [PATCH 9/9] apply suggestion --- adapters/wredis/parsestream_test.go | 73 +++++++++++++++++++++++++++-- adapters/wredis/streamer.go | 2 +- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/adapters/wredis/parsestream_test.go b/adapters/wredis/parsestream_test.go index 62ba7c9..1af79c2 100644 --- a/adapters/wredis/parsestream_test.go +++ b/adapters/wredis/parsestream_test.go @@ -2,15 +2,16 @@ package wredis import ( "context" + "fmt" + "sync" "testing" "time" + "github.com/luno/workflow" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" rediscontainer "github.com/testcontainers/testcontainers-go/modules/redis" - - "github.com/luno/workflow" ) func TestParseStreamID(t *testing.T) { @@ -367,4 +368,70 @@ func TestAckWithCancelledContext(t *testing.T) { }).Result() require.NoError(t, err) require.Len(t, pending, 0, "Message should be acknowledged and not pending") -} \ No newline at end of file +} + +func TestConcurrentStreamFromLatestReceivers(t *testing.T) { + // Test that multiple receivers with StreamFromLatest don't fail with BUSYGROUP + ctx := t.Context() + + redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine") + testcontainers.CleanupContainer(t, redisInstance) + require.NoError(t, err) + + host, err := redisInstance.Host(ctx) + require.NoError(t, err) + + port, err := redisInstance.MappedPort(ctx, "6379/tcp") + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port.Port(), + }) + t.Cleanup(func() { _ = 
client.Close() }) + + streamer := NewStreamer(client) + topic := "test-concurrent-latest" + + // Create multiple receivers concurrently with StreamFromLatest + const numReceivers = 5 + receivers := make([]workflow.EventReceiver, numReceivers) + + // Use a sync.WaitGroup to start all receivers at roughly the same time + var startWg sync.WaitGroup + errorsChan := make(chan error, numReceivers) + + startWg.Add(numReceivers) + + for i := 0; i < numReceivers; i++ { + go func(i int) { + defer startWg.Done() + receiverName := fmt.Sprintf("concurrent-receiver-%d", i) + receiver, err := streamer.NewReceiver(ctx, topic, receiverName, workflow.StreamFromLatest()) + if err != nil { + errorsChan <- fmt.Errorf("receiver %d failed: %w", i, err) + return + } + receivers[i] = receiver + }(i) + } + + // Wait for all receivers to be created + startWg.Wait() + close(errorsChan) + + // Check for any errors + var errors []error + for err := range errorsChan { + errors = append(errors, err) + } + + require.Empty(t, errors, "No receivers should fail with BUSYGROUP error: %v", errors) + + // Clean up receivers + for i, receiver := range receivers { + if receiver != nil { + err := receiver.Close() + require.NoError(t, err, "Failed to close receiver %d", i) + } + } +} diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go index 50cd322..9b58070 100644 --- a/adapters/wredis/streamer.go +++ b/adapters/wredis/streamer.go @@ -120,7 +120,7 @@ func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err // Consumer group doesn't exist, create it starting from the latest message // Use "$" to start from the end of the stream _, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "$").Result() - if err != nil { + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { return nil, nil, err } }
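
---

Usage note for trying the new adapter end to end: the sketch below is a minimal send/receive round trip against the Streamer added in this series, using only the API surface the patches themselves exercise (wredis.NewStreamer, NewSender/Send, NewReceiver/Recv, workflow.WithReceiverPollFrequency, workflow.HeaderTopic). It is not part of the change; the package main wrapper, the localhost:6379 address, and the names "example-topic", "example-consumer" and "example-foreign-id" are illustrative assumptions.

    package main

    import (
    	"context"
    	"fmt"
    	"log"
    	"time"

    	"github.com/luno/workflow"
    	"github.com/luno/workflow/adapters/wredis"
    	"github.com/redis/go-redis/v9"
    )

    func main() {
    	ctx := context.Background()

    	// Assumes a local Redis is reachable, e.g. `docker run -p 6379:6379 redis:7-alpine`.
    	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    	defer client.Close()

    	streamer := wredis.NewStreamer(client)
    	const topic = "example-topic"

    	sender, err := streamer.NewSender(ctx, topic)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer sender.Close()

    	// Publish one event; the status value 1 and the foreign ID are arbitrary example values.
    	err = sender.Send(ctx, "example-foreign-id", 1, map[workflow.Header]string{
    		workflow.HeaderTopic: topic,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}

    	// Consume it back on a named consumer group; Recv blocks up to the poll frequency per read.
    	receiver, err := streamer.NewReceiver(ctx, topic, "example-consumer",
    		workflow.WithReceiverPollFrequency(250*time.Millisecond))
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer receiver.Close()

    	event, ack, err := receiver.Recv(ctx)
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Printf("received event %d for foreign ID %s\n", event.ID, event.ForeignID)

    	// Acknowledge so the entry is not redelivered later as a pending message.
    	if err := ack(); err != nil {
    		log.Fatal(err)
    	}
    }

Because Streamer satisfies workflow.EventStreamer (asserted in streamer.go), the same value can be passed anywhere a workflow event streamer is expected, which is what adaptertest.RunEventStreamerTest verifies in streamer_test.go.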