Harden stream listener fanout under churn#912
bitcoin-coder-bob wants to merge 10 commits into master.
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. Use the following commands to manage reviews:
Walkthrough

This PR enhances the gRPC listener lifecycle in the broker by introducing per-listener timeouts (5 seconds), adding a done channel for lifecycle signaling, and using snapshot copies of the listener list during event forwarding to reduce contention and enable non-blocking sends with graceful disconnection on timeout.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant arkservice as arkservice<br/>(GetEventStream)
    participant broker as Broker<br/>(listenToEvents)
    participant listener as Listener<br/>(done, ch)
    Note over arkservice,listener: New Flow with Timeouts & Done Channel
    Client->>arkservice: Subscribe to stream
    arkservice->>broker: Register listener
    broker->>listener: Create listener with done channel
    loop Per Event Batch
        arkservice->>broker: listenToEvents()
        broker->>broker: getListenersCopy()
        par Per-Listener Fanout (Snapshot)
            broker->>listener: Send event to ch (5s timeout)
            alt Timeout or Channel Full
                broker->>listener: closeDone()
                broker->>broker: Remove listener
                listener-->>arkservice: Exit via listener.done case
            else Success
                listener->>Client: Receive event
            end
        end
    end
    alt Listener Canceled
        Client->>arkservice: Close stream
        arkservice->>listener: listener.done closes
        arkservice->>arkservice: Exit select
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks: 2 ✅ / 1 ❌
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/interface/grpc/handlers/arkservice.go`:
- Around line 612-623: The current goroutine sending events via
eventsListenerHandler.getListenersCopy() silently drops messages due to the
select default on l.ch; change this to block with a bounded wait and disconnect
slow listeners instead of dropping: in the goroutine inside the for _, l :=
range listeners loop (the anonymous func taking l
*listener[*arkv1.GetEventStreamResponse]) replace the select { case <-l.done:
return; case l.ch <- ev.event: default: } with a select that waits on l.ch and
l.done plus a timeout (or use a non-zero context/deadline), and if the send
times out, close/mark the listener (e.g. signal l.done or call the existing
disconnect path) so the stream loop will exit; apply the same change to the
analogous tx stream code paths (the other select around l.ch) and ensure
listener.includesAny(ev.topics) behavior remains unchanged.
In `@internal/test/e2e/e2e_test.go`:
- Around line 4472-4490: The variable closeSentinelStream is written in one
goroutine (where transport.GetTransactionsStream is called) and invoked from
another, causing a data race; protect it by replacing the plain function
variable with a concurrency-safe holder (e.g., atomic.Value or a mutex-guarded
field) used in the goroutines that touch it: store the close function from
inside the goroutine that calls GetTransactionsStream and load it atomically (or
under the same mutex) before calling it from the other goroutine(s), and always
nil-check the loaded value before invoking; update both places that assign/read
closeSentinelStream (the loop around GetTransactionsStream and the callers at
the other noted locations) to use the new safe accessor.
```go
closeSentinelStream := func() {}
go func() {
	defer close(sentinelDone)
	for {
		if stressCtx.Err() != nil {
			return
		}

		stream, closeStream, err := transport.GetTransactionsStream(stressCtx)
		if err != nil {
			if stressCtx.Err() != nil {
				return
			}
			sentinelErrors.Add(1)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		closeSentinelStream = closeStream
```
Protect closeSentinelStream from data races across goroutines.
Line 4472-4490 (and 4755-4773) assigns to closeSentinelStream while Line 4651/4988 calls it from another goroutine without synchronization. This is a race under -race and can cause flaky tests.
🔧 One safe pattern using atomic.Value (apply to both churn tests)
```diff
- closeSentinelStream := func() {}
+ var closeSentinelStream atomic.Value
+ closeSentinelStream.Store(func() {})

- closeSentinelStream = closeStream
+ closeSentinelStream.Store(closeStream)

- closeSentinelStream()
+ closeSentinelStream.Load().(func())()
```

Also applies to: 4651-4654, 4755-4773, 4988-4991
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/test/e2e/e2e_test.go` around lines 4472 - 4490, The variable
closeSentinelStream is written in one goroutine (where
transport.GetTransactionsStream is called) and invoked from another, causing a
data race; protect it by replacing the plain function variable with a
concurrency-safe holder (e.g., atomic.Value or a mutex-guarded field) used in
the goroutines that touch it: store the close function from inside the goroutine
that calls GetTransactionsStream and load it atomically (or under the same
mutex) before calling it from the other goroutine(s), and always nil-check the
loaded value before invoking; update both places that assign/read
closeSentinelStream (the loop around GetTransactionsStream and the callers at
the other noted locations) to use the new safe accessor.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/test/e2e/e2e_test.go`:
- Around line 4771-4779: TestEventListenerChurn currently drops non-retryable
errors (e.g., failures from NewClient and GetEventStream) instead of failing the
test; add an error channel (errCh) following the TestTxListenerChurn pattern and
a helper reportErr to send errors into it, then replace the bare
returns/continues inside the churn worker (the NewClient failure path and the
non-retryable GetEventStream path) with calls to reportErr(err) followed by the
same control flow (return/continue), and finally drain/assert errCh at the end
of TestEventListenerChurn (also apply the same changes to the similar block
around lines 4814-4827) so permanent server-side stream rejections surface as
test failures.
- Around line 4898-4902: The for-range over notifyErrors is a no-op; either mark
the round failed on any notify error or remove the loop. Replace the loop so
that when iterating notifyErrors you set roundOK = false (and optionally break)
if notifyErr != nil to mirror the settlement loop's behavior, referencing the
notifyErrors slice and roundOK flag; or delete the entire loop if notify
failures are intentionally ignored. Ensure the chosen change is applied where
notifyErrors is processed so behavior is consistent with the settlement logic.
---
Duplicate comments:
In `@internal/test/e2e/e2e_test.go`:
- Around line 4427-4444: The variable closeSentinelStream is written inside the
sentinel goroutine (closeSentinelStream = closeStream) and invoked from the main
goroutine, causing a data race; replace the plain func variable with a
concurrency-safe holder (e.g., store the close function in an atomic.Value or
protect access with a sync.Mutex) so writes inside the goroutine use
holder.Store(closeStream) and callers use holder.Load() (falling back to a no-op
if nil) before calling; update the sentinel that calls
transport.GetTransactionsStream and any other occurrences (the blocks around
closeSentinelStream at the other locations referenced) to use the same atomic
holder pattern to eliminate the race, and ensure sentinelDone, stressCtx, and
sentinelErrors usage remains unchanged.
```go
for _, notifyErr := range notifyErrors {
	if notifyErr != nil {
		continue
	}
}
```
Dead loop body: notifyErrors iteration is a no-op.
continue on notifyErr != nil advances the range iteration, which is identical to falling through — neither branch does anything. Either the loop should contribute to roundOK or it should be removed.
🐛 Proposed fix — choose one of the two options

Option A — treat notify failures as round failures (matches settlement loop logic):

```diff
 for _, notifyErr := range notifyErrors {
-	if notifyErr != nil {
-		continue
-	}
+	if notifyErr != nil {
+		roundOK = false
+		break
+	}
 }
```

Option B — remove the loop entirely (notify errors intentionally tolerated):

```diff
-for _, notifyErr := range notifyErrors {
-	if notifyErr != nil {
-		continue
-	}
-}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
for _, notifyErr := range notifyErrors {
	if notifyErr != nil {
		roundOK = false
		break
	}
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/test/e2e/e2e_test.go` around lines 4898 - 4902, The for-range over
notifyErrors is a no-op; either mark the round failed on any notify error or
remove the loop. Replace the loop so that when iterating notifyErrors you set
roundOK = false (and optionally break) if notifyErr != nil to mirror the
settlement loop's behavior, referencing the notifyErrors slice and roundOK flag;
or delete the entire loop if notify failures are intentionally ignored. Ensure
the chosen change is applied where notifyErrors is processed so behavior is
consistent with the settlement logic.
0dd8a90 to 245a0a9 (force-push)
🧹 Nitpick comments (1)
internal/interface/grpc/handlers/arkservice.go (1)
610-610: Stale comment: "in the same routine" no longer holds.

With per-listener goroutines, events for the same listener from different batches can interleave — goroutines launched for batch N+1 can send to `l.ch` while goroutines for batch N are still running. Consider updating the comment to clarify that ordering is preserved only within a single batch (one goroutine iterates `evs` sequentially), but not across batches.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/arkservice.go` at line 610, Update the stale comment that currently reads "forward all events in the same routine in order to preserve the ordering" to clarify ordering semantics: state that ordering is preserved only within a single batch because one goroutine iterates over evs sequentially, but with per-listener goroutines launched for each batch, events from different batches can interleave when they send to l.ch; reference l.ch and evs (and the per-listener goroutine behavior) so readers know why ordering is not guaranteed across batches.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@internal/interface/grpc/handlers/arkservice.go`:
- Around line 617-622: The event- and tx-stream handlers currently use a select
with a default case that silently drops events (select { case <-l.done: return
case l.ch <- ev.event: default }), violating delivery guarantees; replace the
non-blocking send with a blocking send that either waits or actively disconnects
slow consumers: remove the default arm so the select only has case <-l.done:
return and case l.ch <- ev.event: (making the send block), and/or implement
explicit full-channel handling by checking channel capacity and removing/closing
the listener (or increasing buffer) when l.ch is full before attempting to send;
apply the same change to the tx-stream code paths that reference l.done and l.ch
to ensure no silent drops.
In `@internal/test/e2e/e2e_test.go`:
- Around line 4898-4902: The loop over notifyErrors is a no-op because it
continues on non-nil errors instead of marking the round as failed; update the
logic in the loop that iterates notifyErrors so that if any notifyErr != nil you
set roundOK = false and break (or alternatively remove the loop entirely if
notify errors should be ignored), referencing the notifyErrors variable and the
roundOK flag in your change.
- Around line 4710-4755: The variable closeSentinelStream is written by the
sentinel goroutine (assignment closeSentinelStream = closeStream in the
anonymous goroutine around eventTransport.GetEventStream) and later invoked from
the main goroutine, causing a data race; fix it the same way as in
TestTxListenerChurn by synchronizing access to the close function: wrap
reads/writes to closeSentinelStream with a small sync.Mutex (or store the
function in an atomic.Value) so the sentinel goroutine sets the close function
under lock and the main goroutine reads/invokes it under the same lock (or use
the same atomic.Value load/store pattern), ensuring no unsynchronized read/write
of closeSentinelStream across goroutines.
- Around line 4771-4779: The churn worker currently swallows non-retryable
errors by returning/continuing instead of reporting them; update the
error-handling in TestTxListenerChurn (the block checking stressCtx.Err(),
isRetryableChurnError(err), and calling retryableSubscribeErrors.Add/
churnWorkerBackoff) and the analogous block in TestEventListenerChurn so that
when an error is non-retryable you call reportErr(err) (or the existing
error-reporting helper used by TestTxListenerChurn) before returning/continuing;
keep the initial stressCtx.Err() short-circuit check and preserve retry behavior
for isRetryableChurnError(err) while ensuring all non-retryable failures are
surfaced via reportErr.
- Around line 4427-4474: The variable closeSentinelStream is being assigned in
the sentinel goroutine and invoked from the main goroutine without
synchronization, causing a data race; protect access by introducing a
synchronization primitive (e.g., a sync.Mutex or sync/atomic.Value) and use it
when assigning closeSentinelStream inside the goroutine (the assignment after
GetTransactionsStream) and when calling closeSentinelStream() from the main
goroutine (the caller around where it currently invokes closeSentinelStream()).
Also ensure a nil-check while holding the lock/reading the atomic value before
calling to avoid nil deref; update any related variables (e.g., sentinelDone,
stressCtx) only with the same protection if they are read/written across
goroutines.
---
Nitpick comments:
In `@internal/interface/grpc/handlers/arkservice.go`:
- Line 610: Update the stale comment that currently reads "forward all events in
the same routine in order to preserve the ordering" to clarify ordering
semantics: state that ordering is preserved only within a single batch because
one goroutine iterates over evs sequentially, but with per-listener goroutines
launched for each batch, events from different batches can interleave when they
send to l.ch; reference l.ch and evs (and the per-listener goroutine behavior)
so readers know why ordering is not guaranteed across batches.
5d9aaf7 to 81288f1 (force-push)
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal/interface/grpc/handlers/arkservice.go (1)
625-632: Timeout-based disconnect is a good improvement over the previous silent drop, but `time.After` per send leaks a timer until expiry.

Each `time.After(5 * time.Second)` allocates a `time.Timer` that isn't collected until it fires, even when the send on `l.ch` succeeds immediately. Under bursts of events with many listeners, this can create non-trivial GC pressure. Consider using a reusable `time.Timer` per goroutine:

♻️ Suggested refactor

```diff
 go func(l *listener[*arkv1.GetEventStreamResponse]) {
+	timeout := time.NewTimer(5 * time.Second)
+	defer timeout.Stop()
 	for _, ev := range evs {
 		if l.includesAny(ev.topics) {
+			if !timeout.Stop() {
+				select {
+				case <-timeout.C:
+				default:
+				}
+			}
+			timeout.Reset(5 * time.Second)
 			select {
 			case <-l.done:
 				return
 			case l.ch <- ev.event:
-			case <-time.After(5 * time.Second):
+			case <-timeout.C:
 				l.closeDone()
 				return
 			}
 		}
 	}
 }(l)
```

The same pattern applies to `listenToTxEvents` at Line 671.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/arkservice.go` around lines 625 - 632, The select uses time.After(5 * time.Second) which allocates a timer per loop and leaks until it fires; replace this with a reusable time.Timer created once in the goroutine (e.g., t := time.NewTimer(5 * time.Second)) and use t.C in the select, calling t.Stop() and draining t.C when a send on l.ch or l.done wins, and calling t.Reset(5 * time.Second) appropriately when looping; apply the same change to the analogous logic in listenToTxEvents so both handlers reuse a single timer per goroutine and avoid per-iteration allocations.

internal/interface/grpc/handlers/broker_test.go (1)
139-154: `require.NotPanics` cannot catch panics in spawned goroutines.

The goroutine launched on Line 140 runs independently; if it panics, the runtime will crash the process, not the `NotPanics` wrapper. The test still works because the `done` channel + 1s timeout validates non-blocking completion, and the current `select` guards prevent panics by design. Just be aware that `NotPanics` here is cosmetic — the real assertion is the timeout-based `select` on Lines 149-153.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/broker_test.go` around lines 139 - 154, The use of require.NotPanics around a spawned goroutine is ineffective; instead remove require.NotPanics and either (a) add a defer/recover inside the spawned goroutine to capture any panic and call t.Fatalf/t.Errorf, or (b) run the code synchronously so NotPanics can wrap it. Concretely, edit the test block that launches the goroutine using done/ref.done/ref.ch/ref.closeDone: delete the require.NotPanics wrapper and add at the top of the anonymous goroutine a defer func(){ if r:=recover(); r!=nil { t.Fatalf("goroutine panic: %v", r) } }() so any panic is reported to the test rather than crashing the process.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/test/e2e/e2e_test.go`:
- Around line 4769-4779: In TestEventListenerChurn, non-retryable failures from
grpcclient.NewClient and from the subsequent GetEventStream call are silently
exiting the churn worker (bare return/continue); update the churn worker loop to
mirror TestTxListenerChurn by calling reportErr(...) when a non-retryable error
is encountered instead of returning/continuing. Specifically, inside the
NewClient error branch (where streamClient, err :=
grpcclient.NewClient(serverUrl) is checked) and inside the GetEventStream error
branch, replace the bare return/continue with a reportErr call passing the
workerID and err (and keep the existing retryable handling and stressCtx.Err()
checks, backoff via churnWorkerBackoff(workerID), and retryableSubscribeErrors
behavior intact).
---
Duplicate comments:
In `@internal/test/e2e/e2e_test.go`:
- Around line 4898-4902: The loop over notifyErrors is a no-op; update it so
errors actually affect the test outcome by inspecting notifyErr and setting
roundOK=false (and optionally breaking early) when any notifyErr != nil; locate
the loop that iterates notifyErrors and change its body from a bare continue to
assign roundOK = false (and break) so failures are propagated.
- Around line 4427-4444: There is a data race on the closeSentinelStream
variable because it’s written inside the sentinel goroutine and read from the
main goroutine without synchronization; replace the plain variable with a
thread-safe holder (e.g., sync/atomic.Value) or a buffered chan to transfer the
close function from the sentinel goroutine to the main goroutine. Concretely:
declare a holder like var closeSentinelStream atomic.Value (or closeSentinelCh
:= make(chan func(),1)), in the sentinel goroutine store the obtained
closeStream via closeSentinelStream.Store(closeStream) (or send it on
closeSentinelCh) right after receiving it from
transport.GetTransactionsStream(stressCtx), and in the main goroutine read the
stored function via v := closeSentinelStream.Load().(func()) (or receive from
the channel) before invoking it; ensure imports (sync/atomic) and proper
nil-checks to avoid panics.
---
Nitpick comments:
In `@internal/interface/grpc/handlers/arkservice.go`:
- Around line 625-632: The select uses time.After(5 * time.Second) which
allocates a timer per loop and leaks until it fires; replace this with a
reusable time.Timer created once in the goroutine (e.g., t := time.NewTimer(5 *
time.Second)) and use t.C in the select, calling t.Stop() and draining t.C when
a send on l.ch or l.done wins, and calling t.Reset(5 * time.Second)
appropriately when looping; apply the same change to the analogous logic in
listenToTxEvents so both handlers reuse a single timer per goroutine and avoid
per-iteration allocations.
In `@internal/interface/grpc/handlers/broker_test.go`:
- Around line 139-154: The use of require.NotPanics around a spawned goroutine
is ineffective; instead remove require.NotPanics and either (a) add a
defer/recover inside the spawned goroutine to capture any panic and call
t.Fatalf/t.Errorf, or (b) run the code synchronously so NotPanics can wrap it.
Concretely, edit the test block that launches the goroutine using
done/ref.done/ref.ch/ref.closeDone: delete the require.NotPanics wrapper and add
at the top of the anonymous goroutine a defer func(){ if r:=recover(); r!=nil {
t.Fatalf("goroutine panic: %v", r) } }() so any panic is reported to the test
rather than crashing the process.
…stead of being silently swallowed

Fixes a DoS vector on both TX and batch event stream subscriptions where an attacker can crash the server by churning (rapidly opening and closing) gRPC streams.

The crash is caused by two concurrency bugs in the stream listener fanout:

1. The fanout iterates the shared listener map while the handler mutates it, triggering a "concurrent map iteration and map write" fatal error.
2. A fanout goroutine keeps a reference to the listener, the handler closes the channel, and the goroutine sends on the closed channel — panicking the server.

Both 1 and 2 are trivially triggerable by any unauthenticated client that opens and closes streams in a tight loop.

Fixes: the fanout now iterates over a snapshot copy of the listener list, signals lifecycle through a per-listener done channel, and disconnects listeners on timeout so stale listeners are removed.
Resource exhaustion

The fanout select includes a default case in both listenToEvents and listenToTxEvents, so events are dropped rather than blocking when a listener's channel is full. This prevents goroutine pile-up from clients that open streams without reading from them.