Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 49 additions & 102 deletions actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Actor[M Message, R any] struct {
behavior ActorBehavior[M, R]

// mailbox is the incoming message queue for the actor.
mailbox chan envelope[M, R]
mailbox Mailbox[M, R]

// ctx is the context governing the actor's lifecycle.
ctx context.Context
Expand Down Expand Up @@ -93,10 +93,13 @@ func NewActor[M Message, R any](cfg ActorConfig[M, R]) (*Actor[M, R],
mailboxCapacity = 1
}

// Create mailbox - could be injected via config in the future.
mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity)

actor := &Actor[M, R]{
id: cfg.ID,
behavior: cfg.Behavior,
mailbox: make(chan envelope[M, R], mailboxCapacity),
mailbox: mailbox,
ctx: ctx,
cancel: cancel,
dlo: cfg.DLO,
Expand All @@ -123,76 +126,32 @@ func (a *Actor[M, R]) Start() {
// process is the main event loop for the actor. It continuously monitors its
// mailbox for incoming messages and its context for cancellation signals.
func (a *Actor[M, R]) process() {
for {
select {
case env := <-a.mailbox:
result := a.behavior.Receive(a.ctx, env.message)

// If a promise was provided (i.e., it was an "ask"
// operation), complete the promise with the result from
// the behavior.
if env.promise != nil {
env.promise.Complete(result)
}

// The actor's context has been cancelled, signaling a stop
// request. Exit the processing loop to terminate the actor's
// goroutine. Before exiting, drain any remaining messages from
// the mailbox.
//
// NOTE: We intentionally do NOT close the mailbox channel here.
// Closing it would create a TOCTOU race with Tell/Ask, which
// check ctx.Err() before sending to the mailbox. Between that
// check and the actual send, the channel could be closed,
// causing a panic. Instead, we drain using a non-blocking
// select loop. Messages sent after this point will remain in
// the buffered channel and be garbage collected.
case <-a.ctx.Done():
log.Debugf("Actor %s: context cancelled, draining "+
"mailbox", a.id)

// Drain any remaining messages from the mailbox
// without closing the channel.
drained := 0
for {
select {
case env := <-a.mailbox:
drained++

// If a DLO is configured, send the
// original message there for auditing
// or potential manual reprocessing.
if a.dlo != nil {
a.dlo.Tell(
context.Background(),
env.message,
)
}

// If it was an Ask, complete the
// promise with an error indicating the
// actor terminated.
if env.promise != nil {
env.promise.Complete(
fn.Err[R](
ErrActorTerminated,
),
)
}

default:
if drained > 0 {
log.Debugf("Actor %s: "+
"drained %d "+
"message(s) during "+
"shutdown",
a.id, drained)
}

// No more messages in the mailbox.
return
}
}
// Use the new iterator pattern for receiving messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactor 👍

for env := range a.mailbox.Receive(a.ctx) {
result := a.behavior.Receive(a.ctx, env.message)

// If a promise was provided (i.e., it was an "ask"
// operation), complete the promise with the result from
// the behavior.
if env.promise != nil {
env.promise.Complete(result)
}
}

// Context was cancelled or mailbox closed, drain remaining messages.
a.mailbox.Close()

for env := range a.mailbox.Drain() {
// If a DLO is configured, send the original message there
// for auditing or potential manual reprocessing.
if a.dlo != nil {
a.dlo.Tell(context.Background(), env.message)
}

// If it was an Ask, complete the promise with an error
// indicating the actor terminated.
if env.promise != nil {
env.promise.Complete(fn.Err[R](ErrActorTerminated))
}
}
}
Expand Down Expand Up @@ -227,22 +186,15 @@ func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) {
return
}

select {
// Message successfully enqueued in the actor's mailbox.
case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: nil}:

// The context for the Tell operation was cancelled before the message
// could be enqueued. The message is dropped.
case <-ctx.Done():
log.Warnf("Tell to actor %s: message %s dropped "+
"(caller context cancelled)", ref.actor.id,
msg.MessageType())

// The actor itself has been stopped/terminated.
case <-ref.actor.ctx.Done():
// If the actor is terminated and has a DLO, send the message
// there. Otherwise, it's dropped.
ref.trySendToDLO(msg)
env := envelope[M, R]{message: msg, promise: nil}

// Use mailbox Send method which internally checks both contexts.
if !ref.actor.mailbox.Send(ctx, env) {
// Failed to send - check if actor terminated.
if ref.actor.ctx.Err() != nil {
ref.trySendToDLO(msg)
}
// Otherwise it was the caller's context that cancelled.
}
}

Expand Down Expand Up @@ -271,21 +223,16 @@ func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] {
return promise.Future()
}

select {
// Attempt to send the message along with its promise to the actor's
// mailbox.
case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: promise}:
env := envelope[M, R]{message: msg, promise: promise}

// The context for the Ask operation was cancelled before the message
// could be enqueued. Complete the promise with the context's error to
// unblock the caller.
case <-ctx.Done():
promise.Complete(fn.Err[R](ctx.Err()))

// The actor's context was cancelled (e.g., actor stopped) while this
// Ask operation was attempting to send (e.g., mailbox was full).
case <-ref.actor.ctx.Done():
promise.Complete(fn.Err[R](ErrActorTerminated))
// Use mailbox Send method which internally checks both contexts.
if !ref.actor.mailbox.Send(ctx, env) {
// Determine the error based on what failed.
if ref.actor.ctx.Err() != nil {
promise.Complete(fn.Err[R](ErrActorTerminated))
} else {
promise.Complete(fn.Err[R](ctx.Err()))
}
}

// Return the future associated with the promise, allowing the caller to
Expand Down
176 changes: 176 additions & 0 deletions actor/mailbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package actor

import (
"context"
"iter"
"sync"
"sync/atomic"
)

// Mailbox represents the message queue for an actor. It provides methods for
// sending messages and receiving them via an iterator pattern.
type Mailbox[M Message, R any] interface {
// Send attempts to send an envelope to the mailbox with context-based
// cancellation. Returns true if sent successfully, false if the
// context was cancelled or the mailbox is closed.
Send(ctx context.Context, env envelope[M, R]) bool

// TrySend attempts to send without blocking. Returns true if the
// envelope was sent, false if the mailbox is full or closed.
TrySend(env envelope[M, R]) bool

// Receive returns an iterator for consuming messages from the mailbox.
// The iterator will yield messages until the mailbox is closed or the
// context is cancelled.
Receive(ctx context.Context) iter.Seq[envelope[M, R]]

// Close closes the mailbox, preventing new messages from being sent.
// Any remaining messages can still be consumed via Receive.
Close()

// IsClosed returns true if the mailbox has been closed.
IsClosed() bool

// Drain returns an iterator that yields all remaining messages in the
// mailbox after it has been closed. This is useful for cleanup.
Drain() iter.Seq[envelope[M, R]]
}

// ChannelMailbox is a channel-based implementation of the Mailbox interface.
type ChannelMailbox[M Message, R any] struct {
ch chan envelope[M, R]
closed atomic.Bool

// mu protects Send/TrySend operations to prevent send-on-closed-channel
// panics. Close() acquires write lock, Send/TrySend acquire read lock.
mu sync.RWMutex

// closeOnce ensures Close() executes exactly once.
closeOnce sync.Once

// actorCtx is the actor's context for lifecycle management.
actorCtx context.Context
}

// NewChannelMailbox creates a new channel-based mailbox with the specified
// buffer capacity and actor context.
func NewChannelMailbox[M Message, R any](actorCtx context.Context,
capacity int) *ChannelMailbox[M, R] {

if capacity <= 0 {
capacity = 1
}
return &ChannelMailbox[M, R]{
ch: make(chan envelope[M, R], capacity),
actorCtx: actorCtx,
}
}

// Send implements Mailbox.Send with context-aware blocking send.
func (m *ChannelMailbox[M, R]) Send(ctx context.Context,
env envelope[M, R]) bool {

m.mu.RLock()
defer m.mu.RUnlock()

if m.IsClosed() {
return false
}

select {
case m.ch <- env:
return true
case <-ctx.Done():
return false
case <-m.actorCtx.Done():
// Actor is shutting down.
return false
}
}

// TrySend implements Mailbox.TrySend with non-blocking send.
func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool {
m.mu.RLock()
defer m.mu.RUnlock()

if m.IsClosed() {
return false
}

select {
case m.ch <- env:
return true
default:
return false
}
}

// Receive implements Mailbox.Receive using iter.Seq pattern.
func (m *ChannelMailbox[M, R]) Receive(
ctx context.Context) iter.Seq[envelope[M, R]] {
return func(yield func(envelope[M, R]) bool) {
for {
select {
case env, ok := <-m.ch:
if !ok {
return
}

if !yield(env) {
return
}

case <-ctx.Done():
return

case <-m.actorCtx.Done():
return
}
}
}
}

// Close implements Mailbox.Close.
func (m *ChannelMailbox[M, R]) Close() {
m.closeOnce.Do(func() {
m.mu.Lock()
defer m.mu.Unlock()

m.closed.Store(true)

close(m.ch)
})
}

// IsClosed implements Mailbox.IsClosed.
func (m *ChannelMailbox[M, R]) IsClosed() bool {
return m.closed.Load()
}

// Drain implements Mailbox.Drain for cleanup after close.
func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] {
return func(yield func(envelope[M, R]) bool) {
// Only drain if closed.
if !m.IsClosed() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can do without this restriction, which would make this usable for "grab everything currently" on live mailboxes, which seems useful for mailbox strategies like batch processing or backpressure relief?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great observation. So my idea here is to use the BackpressureQueue I added in this PR to implement the new mailbox interface. So then it'll automatically drop items as needed before they enter the queue.

return
}

// Drain all remaining messages from the channel.
for {
select {
case env, ok := <-m.ch:
// Channel closed, nothing left to drain.
if !ok {
return
}

if !yield(env) {
return
}
default:
// Channel empty, done draining.
return
}
}
}
}
Loading
Loading