actor: add new abstraction over mailbox#10142
actor: add new abstraction over mailbox#10142Roasbeef merged 3 commits intolightningnetwork:masterfrom
Conversation
There was a problem hiding this comment.
Summary of Changes
Hello @Roasbeef, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
I've implemented a significant architectural improvement to the actor system by introducing an abstraction layer over the actor's mailbox. This change decouples the actor's core logic from its message queuing mechanism, making the system more extensible and adaptable. The immediate benefit is a cleaner, more robust message processing flow within the actor, with future possibilities for advanced mailbox behaviors like persistence or flow control.
Highlights
- Mailbox Abstraction: I've introduced a new
Mailboxinterface that abstracts the underlying message queue mechanism for actors. This allows for greater flexibility, enabling different mailbox implementations such as durable on-disk queues or backpressure-aware mailboxes in the future. - ChannelMailbox Implementation with iter.Seq: The default mailbox implementation,
ChannelMailbox, now leverages Go channels and integrates with Go'siter.Seqfor an elegant and context-aware message reception API within the actor's processing loop. - Actor Core Logic Refactoring: The core
Actorlogic and itsTellandAskmethods have been updated to interact with the newMailboxinterface, simplifying the code and centralizing message handling concerns within the mailbox implementation itself.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Code Review
This pull request introduces a valuable abstraction for the actor mailbox, which is a great step towards making the system more flexible and modular. The use of Go 1.22 iterators for message consumption is a nice, modern touch. However, the new ChannelMailbox implementation has a critical race condition between send and close operations that could lead to panics in a concurrent environment. Additionally, some of the new tests rely on time.Sleep for synchronization, which can make them flaky. I've provided suggestions to fix the race condition and improve the robustness of the tests.
| func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | ||
| env envelope[M, R]) bool { | ||
| if m.IsClosed() { | ||
| return false | ||
| } | ||
|
|
||
| select { | ||
| case m.ch <- env: | ||
| return true | ||
| case <-ctx.Done(): | ||
| return false | ||
| case <-m.actorCtx.Done(): | ||
| // Actor is shutting down. | ||
| return false | ||
| } | ||
| } |
There was a problem hiding this comment.
There is a critical race condition here. A concurrent call to Close() could close the channel m.ch after m.IsClosed() returns false but before the m.ch <- env send operation occurs. This would cause the send on the closed channel to panic.
To fix this, you should use the RWMutex (m.mu) to protect the send operation. By acquiring a read lock before checking IsClosed and sending, you can prevent Close() (which acquires a write lock) from executing between the check and the send.
| func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | |
| env envelope[M, R]) bool { | |
| if m.IsClosed() { | |
| return false | |
| } | |
| select { | |
| case m.ch <- env: | |
| return true | |
| case <-ctx.Done(): | |
| return false | |
| case <-m.actorCtx.Done(): | |
| // Actor is shutting down. | |
| return false | |
| } | |
| } | |
| func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | |
| env envelope[M, R]) bool { | |
| m.mu.RLock() | |
| defer m.mu.RUnlock() | |
| if m.IsClosed() { | |
| return false | |
| } | |
| select { | |
| case m.ch <- env: | |
| return true | |
| case <-ctx.Done(): | |
| return false | |
| case <-m.actorCtx.Done(): | |
| // Actor is shutting down. | |
| return false | |
| } | |
| } |
e465137 to
99f3f9e
Compare
99f3f9e to
03e7648
Compare
3134546 to
8781daa
Compare
This commit introduces a new Mailbox interface that abstracts the message queue implementation for actors. Previously, actors used a direct channel for their mailbox, which limited flexibility and made it difficult to implement alternative mailbox strategies. The new Mailbox interface provides methods for sending, receiving, and draining messages, with full context support for cancellation. The Receive method leverages Go 1.23's iter.Seq pattern, providing a clean iterator-based API that allows natural for-range loops over messages. The ChannelMailbox implementation maintains the existing channel-based behavior while conforming to the new interface. It stores the actor's context internally, ensuring both caller and actor contexts are properly respected during send and receive operations. This simplifies context handling compared to complex context merging approaches. This abstraction enables future implementations such as priority mailboxes, persistent mailboxes, or bounded mailboxes with overflow strategies, without requiring changes to the actor implementation.
This commit adds thorough test coverage for the new Mailbox interface and ChannelMailbox implementation. The tests verify correct behavior across various scenarios including successful sends, context cancellation, mailbox closure, and concurrent operations. The test suite specifically validates that the mailbox respects both the caller's context and the actor's context during send and receive operations. This ensures that actors properly shut down when their context is cancelled, and that callers can cancel operations without affecting the actor's lifecycle. Additional tests cover edge cases such as zero-capacity mailboxes (which default to a capacity of 1), draining messages after closure, and concurrent sends from multiple goroutines. The concurrent test uses 10 senders each sending 100 messages to verify thread-safety and proper message ordering. All tests pass with the race detector enabled, confirming the implementation is free from data races.
03e7648 to
331b259
Compare
🟡 PR Severity: MEDIUM
🟡 Medium (2 files)
🟢 Test files (1 file, excluded from severity calculation)
AnalysisThis PR introduces a new mailbox abstraction in the Severity rationale:
Recommendation: Standard code review with focus on the actor pattern implementation and ensuring the mailbox abstraction maintains proper concurrency guarantees. To override, add a |
This commit refactors the Actor implementation to use the new Mailbox interface instead of directly managing a channel. This change significantly simplifies the actor's message processing loop and improves separation of concerns. The main changes include replacing the direct channel field with a Mailbox interface, updating NewActor to create a ChannelMailbox instance, and refactoring the process method to use the iterator pattern provided by mailbox.Receive. The new implementation uses a clean for-range loop over the mailbox's message iterator, eliminating the complex select statement that previously handled both message reception and context cancellation. The Tell and Ask methods in actorRefImpl have been simplified to use the mailbox's Send method, which internally handles both the caller's context and the actor's context. This eliminates the need for complex select statements in these methods and ensures consistent context handling throughout the actor system. Message draining during shutdown is now handled through the mailbox's Drain method, providing a cleaner separation between normal message processing and cleanup operations. The actor still properly sends unprocessed messages to the Dead Letter Office and completes pending promises with appropriate errors during shutdown.
331b259 to
20c5c9d
Compare
|
@Roasbeef, remember to re-request review from reviewers when ready |
Abdulkbk
left a comment
There was a problem hiding this comment.
LGTM
The linter seems to pass, though some lines exceed the 80-character limit
| return | ||
| } | ||
| } | ||
| // Use the new iterator pattern for receiving messages. |
| func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] { | ||
| return func(yield func(envelope[M, R]) bool) { | ||
| // Only drain if closed. | ||
| if !m.IsClosed() { |
There was a problem hiding this comment.
I wonder if we can do without this restriction, which would make this usable for "grab everything currently" on live mailboxes, which seems useful for mailbox strategies like batch processing or backpressure relief?
There was a problem hiding this comment.
Great observation. So my idea here is to use the BackpressureQueue I added in this PR to implement the new mailbox interface. So then it'll automatically drop items as needed before they enter the queue.
That's because this is a sub-module, and right now the linter doesn't actually cover sub modules. There's a PR to fix this however. With that said, thanks for the review! |
In this PR, we add an abstraction over the mailbox w/ the default actor impl. This is useful, as this allows us to slot in a different mailbox. Examples include a durable on disk queue, a mailbox with backpressure awareness, etc.
We then make a default implementation that uses Go channels, then use that in place.
The new interface also uses an
iter.Seqto allow for an easy iteration API that still is aware of context cancellations.