diff --git a/actor/actor.go b/actor/actor.go index 373898b8795..f75b4bb2520 100644 --- a/actor/actor.go +++ b/actor/actor.go @@ -45,7 +45,7 @@ type Actor[M Message, R any] struct { behavior ActorBehavior[M, R] // mailbox is the incoming message queue for the actor. - mailbox chan envelope[M, R] + mailbox Mailbox[M, R] // ctx is the context governing the actor's lifecycle. ctx context.Context @@ -93,10 +93,13 @@ func NewActor[M Message, R any](cfg ActorConfig[M, R]) (*Actor[M, R], mailboxCapacity = 1 } + // Create mailbox - could be injected via config in the future. + mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity) + actor := &Actor[M, R]{ id: cfg.ID, behavior: cfg.Behavior, - mailbox: make(chan envelope[M, R], mailboxCapacity), + mailbox: mailbox, ctx: ctx, cancel: cancel, dlo: cfg.DLO, @@ -123,76 +126,32 @@ func (a *Actor[M, R]) Start() { // process is the main event loop for the actor. It continuously monitors its // mailbox for incoming messages and its context for cancellation signals. func (a *Actor[M, R]) process() { - for { - select { - case env := <-a.mailbox: - result := a.behavior.Receive(a.ctx, env.message) - - // If a promise was provided (i.e., it was an "ask" - // operation), complete the promise with the result from - // the behavior. - if env.promise != nil { - env.promise.Complete(result) - } - - // The actor's context has been cancelled, signaling a stop - // request. Exit the processing loop to terminate the actor's - // goroutine. Before exiting, drain any remaining messages from - // the mailbox. - // - // NOTE: We intentionally do NOT close the mailbox channel here. - // Closing it would create a TOCTOU race with Tell/Ask, which - // check ctx.Err() before sending to the mailbox. Between that - // check and the actual send, the channel could be closed, - // causing a panic. Instead, we drain using a non-blocking - // select loop. Messages sent after this point will remain in - // the buffered channel and be garbage collected. - case <-a.ctx.Done(): - log.Debugf("Actor %s: context cancelled, draining "+ - "mailbox", a.id) - - // Drain any remaining messages from the mailbox - // without closing the channel. - drained := 0 - for { - select { - case env := <-a.mailbox: - drained++ - - // If a DLO is configured, send the - // original message there for auditing - // or potential manual reprocessing. - if a.dlo != nil { - a.dlo.Tell( - context.Background(), - env.message, - ) - } - - // If it was an Ask, complete the - // promise with an error indicating the - // actor terminated. - if env.promise != nil { - env.promise.Complete( - fn.Err[R]( - ErrActorTerminated, - ), - ) - } - - default: - if drained > 0 { - log.Debugf("Actor %s: "+ - "drained %d "+ - "message(s) during "+ - "shutdown", - a.id, drained) - } - - // No more messages in the mailbox. - return - } - } + // Use the new iterator pattern for receiving messages. + for env := range a.mailbox.Receive(a.ctx) { + result := a.behavior.Receive(a.ctx, env.message) + + // If a promise was provided (i.e., it was an "ask" + // operation), complete the promise with the result from + // the behavior. + if env.promise != nil { + env.promise.Complete(result) + } + } + + // Context was cancelled or mailbox closed, drain remaining messages. + a.mailbox.Close() + + for env := range a.mailbox.Drain() { + // If a DLO is configured, send the original message there + // for auditing or potential manual reprocessing. + if a.dlo != nil { + a.dlo.Tell(context.Background(), env.message) + } + + // If it was an Ask, complete the promise with an error + // indicating the actor terminated. + if env.promise != nil { + env.promise.Complete(fn.Err[R](ErrActorTerminated)) } } } @@ -227,22 +186,15 @@ func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) { return } - select { - // Message successfully enqueued in the actor's mailbox. - case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: nil}: - - // The context for the Tell operation was cancelled before the message - // could be enqueued. The message is dropped. - case <-ctx.Done(): - log.Warnf("Tell to actor %s: message %s dropped "+ - "(caller context cancelled)", ref.actor.id, - msg.MessageType()) - - // The actor itself has been stopped/terminated. - case <-ref.actor.ctx.Done(): - // If the actor is terminated and has a DLO, send the message - // there. Otherwise, it's dropped. - ref.trySendToDLO(msg) + env := envelope[M, R]{message: msg, promise: nil} + + // Use mailbox Send method which internally checks both contexts. + if !ref.actor.mailbox.Send(ctx, env) { + // Failed to send - check if actor terminated. + if ref.actor.ctx.Err() != nil { + ref.trySendToDLO(msg) + } + // Otherwise it was the caller's context that cancelled. } } @@ -271,21 +223,16 @@ func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] { return promise.Future() } - select { - // Attempt to send the message along with its promise to the actor's - // mailbox. - case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: promise}: + env := envelope[M, R]{message: msg, promise: promise} - // The context for the Ask operation was cancelled before the message - // could be enqueued. Complete the promise with the context's error to - // unblock the caller. - case <-ctx.Done(): - promise.Complete(fn.Err[R](ctx.Err())) - - // The actor's context was cancelled (e.g., actor stopped) while this - // Ask operation was attempting to send (e.g., mailbox was full). - case <-ref.actor.ctx.Done(): - promise.Complete(fn.Err[R](ErrActorTerminated)) + // Use mailbox Send method which internally checks both contexts. + if !ref.actor.mailbox.Send(ctx, env) { + // Determine the error based on what failed. + if ref.actor.ctx.Err() != nil { + promise.Complete(fn.Err[R](ErrActorTerminated)) + } else { + promise.Complete(fn.Err[R](ctx.Err())) + } } // Return the future associated with the promise, allowing the caller to diff --git a/actor/mailbox.go b/actor/mailbox.go new file mode 100644 index 00000000000..4d15f434c15 --- /dev/null +++ b/actor/mailbox.go @@ -0,0 +1,176 @@ +package actor + +import ( + "context" + "iter" + "sync" + "sync/atomic" +) + +// Mailbox represents the message queue for an actor. It provides methods for +// sending messages and receiving them via an iterator pattern. +type Mailbox[M Message, R any] interface { + // Send attempts to send an envelope to the mailbox with context-based + // cancellation. Returns true if sent successfully, false if the + // context was cancelled or the mailbox is closed. + Send(ctx context.Context, env envelope[M, R]) bool + + // TrySend attempts to send without blocking. Returns true if the + // envelope was sent, false if the mailbox is full or closed. + TrySend(env envelope[M, R]) bool + + // Receive returns an iterator for consuming messages from the mailbox. + // The iterator will yield messages until the mailbox is closed or the + // context is cancelled. + Receive(ctx context.Context) iter.Seq[envelope[M, R]] + + // Close closes the mailbox, preventing new messages from being sent. + // Any remaining messages can still be consumed via Receive. + Close() + + // IsClosed returns true if the mailbox has been closed. + IsClosed() bool + + // Drain returns an iterator that yields all remaining messages in the + // mailbox after it has been closed. This is useful for cleanup. + Drain() iter.Seq[envelope[M, R]] +} + +// ChannelMailbox is a channel-based implementation of the Mailbox interface. +type ChannelMailbox[M Message, R any] struct { + ch chan envelope[M, R] + closed atomic.Bool + + // mu protects Send/TrySend operations to prevent send-on-closed-channel + // panics. Close() acquires write lock, Send/TrySend acquire read lock. + mu sync.RWMutex + + // closeOnce ensures Close() executes exactly once. + closeOnce sync.Once + + // actorCtx is the actor's context for lifecycle management. + actorCtx context.Context +} + +// NewChannelMailbox creates a new channel-based mailbox with the specified +// buffer capacity and actor context. +func NewChannelMailbox[M Message, R any](actorCtx context.Context, + capacity int) *ChannelMailbox[M, R] { + + if capacity <= 0 { + capacity = 1 + } + return &ChannelMailbox[M, R]{ + ch: make(chan envelope[M, R], capacity), + actorCtx: actorCtx, + } +} + +// Send implements Mailbox.Send with context-aware blocking send. +func (m *ChannelMailbox[M, R]) Send(ctx context.Context, + env envelope[M, R]) bool { + + m.mu.RLock() + defer m.mu.RUnlock() + + if m.IsClosed() { + return false + } + + select { + case m.ch <- env: + return true + case <-ctx.Done(): + return false + case <-m.actorCtx.Done(): + // Actor is shutting down. + return false + } +} + +// TrySend implements Mailbox.TrySend with non-blocking send. +func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.IsClosed() { + return false + } + + select { + case m.ch <- env: + return true + default: + return false + } +} + +// Receive implements Mailbox.Receive using iter.Seq pattern. +func (m *ChannelMailbox[M, R]) Receive( + ctx context.Context) iter.Seq[envelope[M, R]] { + return func(yield func(envelope[M, R]) bool) { + for { + select { + case env, ok := <-m.ch: + if !ok { + return + } + + if !yield(env) { + return + } + + case <-ctx.Done(): + return + + case <-m.actorCtx.Done(): + return + } + } + } +} + +// Close implements Mailbox.Close. +func (m *ChannelMailbox[M, R]) Close() { + m.closeOnce.Do(func() { + m.mu.Lock() + defer m.mu.Unlock() + + m.closed.Store(true) + + close(m.ch) + }) +} + +// IsClosed implements Mailbox.IsClosed. +func (m *ChannelMailbox[M, R]) IsClosed() bool { + return m.closed.Load() +} + +// Drain implements Mailbox.Drain for cleanup after close. +func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] { + return func(yield func(envelope[M, R]) bool) { + // Only drain if closed. + if !m.IsClosed() { + return + } + + // Drain all remaining messages from the channel. + for { + select { + case env, ok := <-m.ch: + // Channel closed, nothing left to drain. + if !ok { + return + } + + if !yield(env) { + return + } + default: + // Channel empty, done draining. + return + } + } + } +} diff --git a/actor/mailbox_test.go b/actor/mailbox_test.go new file mode 100644 index 00000000000..b96d5e4de53 --- /dev/null +++ b/actor/mailbox_test.go @@ -0,0 +1,593 @@ +package actor + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestMessage is a test message type that embeds BaseMessage. +type TestMessage struct { + BaseMessage + Value int +} + +// MessageType returns the type name of the message for routing/filtering. +func (tm TestMessage) MessageType() string { + return "TestMessage" +} + +// TestChannelMailboxSend tests the Send method of ChannelMailbox. +func TestChannelMailboxSend(t *testing.T) { + t.Run("successful send", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + ctx := context.Background() + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + sent := mailbox.Send(ctx, env) + require.True(t, sent, "Send should succeed") + }) + + t.Run("send with cancelled context", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1) + // Fill the mailbox first. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + mailbox.TrySend(env) + + ctx, cancel := context.WithCancel(context.Background()) + // Cancel immediately. + cancel() + + env2 := envelope[TestMessage, int]{ + message: TestMessage{Value: 43}, + promise: nil, + } + + sent := mailbox.Send(ctx, env2) + require.False(t, sent, "Send should fail with cancelled context") + }) + + t.Run("send to closed mailbox", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + mailbox.Close() + + ctx := context.Background() + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + sent := mailbox.Send(ctx, env) + require.False(t, sent, "Send should fail on closed mailbox") + }) +} + +// TestChannelMailboxTrySend tests the TrySend method of ChannelMailbox. +func TestChannelMailboxTrySend(t *testing.T) { + t.Run("successful try send", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + sent := mailbox.TrySend(env) + require.True(t, sent, "TrySend should succeed") + }) + + t.Run("try send to full mailbox", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1) + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + // Fill the mailbox. + sent := mailbox.TrySend(env) + require.True(t, sent, "First TrySend should succeed") + + // Try to send again - should fail. + sent = mailbox.TrySend(env) + require.False(t, sent, "TrySend should fail on full mailbox") + }) + + t.Run("try send to closed mailbox", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + mailbox.Close() + + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + sent := mailbox.TrySend(env) + require.False(t, sent, "TrySend should fail on closed mailbox") + }) +} + +// TestChannelMailboxReceive tests the Receive method of ChannelMailbox. +func TestChannelMailboxReceive(t *testing.T) { + t.Run("receive messages", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + ctx := context.Background() + + // Send some messages. + for i := 0; i < 3; i++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + promise: nil, + } + mailbox.Send(ctx, env) + } + + // Start receiving in a goroutine. + var received []int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for env := range mailbox.Receive(ctx) { + received = append(received, env.message.Value) + } + }() + + // Close the mailbox after sending all messages. + mailbox.Close() + wg.Wait() + + require.Len(t, received, 3, "Should receive 3 messages") + require.Equal(t, []int{0, 1, 2}, received, "Should receive messages in order") + }) + + t.Run("receive with cancelled context", func(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + ctx, cancel := context.WithCancel(context.Background()) + + // Send a message. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + mailbox.Send(context.Background(), env) + + // Start receiving. + var received int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for env := range mailbox.Receive(ctx) { + received++ + _ = env + } + }() + + // Cancel the context. + cancel() + wg.Wait() + + // Might receive 0 or 1 message depending on timing. + require.LessOrEqual(t, received, 1, + "Should stop receiving after context cancel") + }) +} + +// TestChannelMailboxClose tests the Close and IsClosed methods. +func TestChannelMailboxClose(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + + require.False(t, mailbox.IsClosed(), "Mailbox should not be closed initially") + + mailbox.Close() + require.True(t, mailbox.IsClosed(), "Mailbox should be closed after Close()") + + // Closing again should be safe. + mailbox.Close() + require.True(t, mailbox.IsClosed(), "Mailbox should remain closed") +} + +// TestChannelMailboxDrain tests the Drain method of ChannelMailbox. +func TestChannelMailboxDrain(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + ctx := context.Background() + + // Send some messages. + for i := 0; i < 3; i++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: i}, + promise: nil, + } + mailbox.Send(ctx, env) + } + + // Close the mailbox. + mailbox.Close() + + // Drain messages. + var drained []int + for env := range mailbox.Drain() { + drained = append(drained, env.message.Value) + } + + require.Len(t, drained, 3, "Should drain 3 messages") + require.Equal(t, []int{0, 1, 2}, drained, "Should drain messages in order") +} + +// TestChannelMailboxConcurrent tests concurrent operations on ChannelMailbox. +func TestChannelMailboxConcurrent(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) + ctx := context.Background() + + const numSenders = 10 + const messagesPerSender = 100 + + var wg sync.WaitGroup + + // Start multiple senders. + for i := 0; i < numSenders; i++ { + wg.Add(1) + go func(senderID int) { + defer wg.Done() + for j := 0; j < messagesPerSender; j++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: senderID*1000 + j}, + promise: nil, + } + mailbox.Send(ctx, env) + } + }(i) + } + + // Start receiver. + received := make([]int, 0, numSenders*messagesPerSender) + var receiverWg sync.WaitGroup + receiverWg.Add(1) + go func() { + defer receiverWg.Done() + for env := range mailbox.Receive(ctx) { + received = append(received, env.message.Value) + } + }() + + // Wait for all senders to complete. + wg.Wait() + + // Close the mailbox now that all sends are complete. + mailbox.Close() + receiverWg.Wait() + + require.Len(t, received, numSenders*messagesPerSender, + "Should receive all messages") +} + +// TestChannelMailboxZeroCapacity tests that zero capacity defaults to 1. +func TestChannelMailboxZeroCapacity(t *testing.T) { + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 0) + + // Should default to capacity of 1. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + + sent := mailbox.TrySend(env) + require.True(t, sent, "Should be able to send one message") + + // Second send should fail (mailbox full). + sent = mailbox.TrySend(env) + require.False(t, sent, "Second send should fail on full mailbox") +} + +// TestChannelMailboxActorContext tests that the mailbox respects the actor's +// context for cancellation. +func TestChannelMailboxActorContext(t *testing.T) { + t.Run("send respects actor context", func(t *testing.T) { + actorCtx, actorCancel := context.WithCancel(context.Background()) + mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 1) + + // Fill the mailbox. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + mailbox.TrySend(env) + + // Cancel the actor context. + actorCancel() + + // Try to send with a fresh caller context - should fail due to + // actor context cancellation. + callerCtx := context.Background() + env2 := envelope[TestMessage, int]{ + message: TestMessage{Value: 43}, + promise: nil, + } + + sent := mailbox.Send(callerCtx, env2) + require.False(t, sent, "Send should fail when actor context is cancelled") + }) + + t.Run("receive respects actor context", func(t *testing.T) { + actorCtx, actorCancel := context.WithCancel(context.Background()) + mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 10) + + // Send a message. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 42}, + promise: nil, + } + mailbox.Send(context.Background(), env) + + // Start receiving with a fresh context. + callerCtx := context.Background() + var received int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for env := range mailbox.Receive(callerCtx) { + received++ + _ = env + } + }() + + // Cancel the actor context. + actorCancel() + wg.Wait() + + // Should have stopped receiving due to actor context cancellation. + require.LessOrEqual(t, received, 1, + "Should stop receiving when actor context is cancelled") + }) +} + +// TestMailboxConcurrentSendAndClose tests concurrent Send and Close operations +// to ensure no race conditions or panics occur. +func TestMailboxConcurrentSendAndClose(t *testing.T) { + const numSenders = 20 + const sendsPerSender = 100 + + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) + ctx := context.Background() + + var wg sync.WaitGroup + + // Start receiver to drain messages. + var recvWg sync.WaitGroup + recvWg.Add(1) + go func() { + defer recvWg.Done() + for range mailbox.Receive(ctx) { + // Just drain. + } + }() + + // Start multiple senders. + for i := 0; i < numSenders; i++ { + wg.Add(1) + go func(senderID int) { + defer wg.Done() + for j := 0; j < sendsPerSender; j++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: senderID*1000 + j}, + promise: nil, + } + // Send may fail if mailbox closes, that's ok. + mailbox.Send(ctx, env) + } + }(i) + } + + // Concurrently close the mailbox multiple times from different + // goroutines. + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + mailbox.Close() + }() + } + + wg.Wait() + recvWg.Wait() + + // Mailbox should be closed. + require.True(t, mailbox.IsClosed(), "Mailbox should be closed") + + // Further sends should fail without panic. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 999}, + promise: nil, + } + sent := mailbox.Send(ctx, env) + require.False(t, sent, "Send should fail on closed mailbox") +} + +// TestMailboxConcurrentTrySendAndClose tests concurrent TrySend and Close +// operations to ensure no race conditions or panics occur. +func TestMailboxConcurrentTrySendAndClose(t *testing.T) { + const numSenders = 20 + const sendsPerSender = 100 + + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + + var wg sync.WaitGroup + + // Start multiple senders using TrySend. + for i := 0; i < numSenders; i++ { + wg.Add(1) + go func(senderID int) { + defer wg.Done() + for j := 0; j < sendsPerSender; j++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: senderID*1000 + j}, + promise: nil, + } + // TrySend may fail if mailbox is full or closed. + mailbox.TrySend(env) + } + }(i) + } + + // Concurrently close the mailbox. + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + mailbox.Close() + }() + } + + wg.Wait() + + // Mailbox should be closed. + require.True(t, mailbox.IsClosed(), "Mailbox should be closed") + + // Further sends should fail without panic. + env := envelope[TestMessage, int]{ + message: TestMessage{Value: 999}, + promise: nil, + } + sent := mailbox.TrySend(env) + require.False(t, sent, "TrySend should fail on closed mailbox") +} + +// TestMailboxMultipleCloseCallers tests that multiple goroutines calling +// Close() simultaneously don't cause panics or issues. +func TestMailboxMultipleCloseCallers(t *testing.T) { + const numClosers = 100 + + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) + + var wg sync.WaitGroup + + // Start many goroutines all trying to close the mailbox. + for i := 0; i < numClosers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + mailbox.Close() + }() + } + + wg.Wait() + + // Mailbox should be closed exactly once. + require.True(t, mailbox.IsClosed(), "Mailbox should be closed") + + // Calling Close again should be safe. + mailbox.Close() + require.True(t, mailbox.IsClosed(), "Mailbox should remain closed") +} + +// TestMailboxCloseWhileSending tests closing the mailbox while multiple +// senders are actively sending messages. +func TestMailboxCloseWhileSending(t *testing.T) { + const numSenders = 10 + const sendsPerSender = 1000 + + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) + ctx := context.Background() + + var sendWg sync.WaitGroup + + // Start multiple senders. + for i := 0; i < numSenders; i++ { + sendWg.Add(1) + go func(senderID int) { + defer sendWg.Done() + for j := 0; j < sendsPerSender; j++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: senderID*1000 + j}, + promise: nil, + } + // Send may fail after close, that's expected. + mailbox.Send(ctx, env) + } + }(i) + } + + // Start receiver to drain messages. + var recvWg sync.WaitGroup + recvWg.Add(1) + receivedCount := 0 + go func() { + defer recvWg.Done() + for range mailbox.Receive(ctx) { + receivedCount++ + } + }() + + // Close mailbox while sends are happening. + mailbox.Close() + + sendWg.Wait() + recvWg.Wait() + + // Should have received at least some messages (exact count depends on + // timing). + t.Logf("Received %d messages before close", receivedCount) + + // Mailbox should be closed. + require.True(t, mailbox.IsClosed(), "Mailbox should be closed") +} + +// TestMailboxStressTest performs a high-concurrency stress test with multiple +// senders, receivers, and close operations. +func TestMailboxStressTest(t *testing.T) { + const numSenders = 50 + const numReceivers = 5 + const sendsPerSender = 200 + + mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 200) + ctx := context.Background() + + var sendWg sync.WaitGroup + + // Start multiple senders. + for i := 0; i < numSenders; i++ { + sendWg.Add(1) + go func(senderID int) { + defer sendWg.Done() + for j := 0; j < sendsPerSender; j++ { + env := envelope[TestMessage, int]{ + message: TestMessage{Value: senderID*1000 + j}, + promise: nil, + } + mailbox.Send(ctx, env) + } + }(i) + } + + // Start multiple receivers. + var recvWg sync.WaitGroup + for i := 0; i < numReceivers; i++ { + recvWg.Add(1) + go func() { + defer recvWg.Done() + for range mailbox.Receive(ctx) { + // Just drain messages. + } + }() + } + + // Wait for all sends to complete. + sendWg.Wait() + + // Close mailbox. + mailbox.Close() + + // Wait for all receivers to finish. + recvWg.Wait() + + // Mailbox should be closed. + require.True(t, mailbox.IsClosed(), "Mailbox should be closed") +}