Open
Description
I'm aware that this library is likely EOL (#18) but wanted to share this in case others are seeing the same issue.
The dispatch_events function contains a bug when the demand reaches zero.
wabbit/lib/wabbit/gen_stage.ex
Lines 398 to 408 in 26cf5d2
  defp dispatch_events(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end
  defp dispatch_events(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
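The recursive dispatch_events(state, demand, events) clause decrements its demand argument but never writes the new value back to state.demand, so when the demand reaches zero the dispatch_events(state, 0, events) clause returns the state with a stale count. This can cause the internal demand counter to remain at 1 even though the demand has been met. This, in turn, can cause single events to be dropped as they are received from RabbitMQ, and GenStage logs a warning: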
GenStage producer :my_rabbitmq_producer has discarded 1 events from buffer
This patch fixes the bug:
diff --git a/lib/wabbit/gen_stage.ex b/lib/wabbit/gen_stage.ex
index e230132..402e0a7 100644
--- a/lib/wabbit/gen_stage.ex
+++ b/lib/wabbit/gen_stage.ex
@@ -401,7 +401,8 @@ defmodule Wabbit.GenStage do
   defp dispatch_events(state, demand, events) do
     case :queue.out(state.queue) do
       {{:value, event}, queue} ->
-        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
+        new_demand = demand - 1
+        dispatch_events(%{state | queue: queue, demand: new_demand}, new_demand, [event | events])
       {:empty, _} ->
         {:noreply, Enum.reverse(events), %{state | demand: demand}}
     end
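For anyone who wants to see the stale counter without a RabbitMQ connection, here is a minimal standalone sketch of the two versions. It is not the library's code: the module name `DemandSketch`, the helper `new_state/2`, and the two-field state map are made up for illustration, and it assumes the function is called with the current `state.demand` as its second argument.

```elixir
defmodule DemandSketch do
  # Hypothetical stand-in for the producer state; only the two fields
  # discussed in this issue are modeled.
  def new_state(events, demand) do
    %{queue: :queue.from_list(events), demand: demand}
  end

  # Logic as quoted from gen_stage.ex: the zero-demand clause returns
  # the state untouched, so state.demand keeps its old value.
  def dispatch_original(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_original(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        dispatch_original(%{state | queue: queue}, demand - 1, [event | events])

      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end

  # Logic with the patch applied: state.demand is updated on every step.
  def dispatch_patched(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_patched(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        new_demand = demand - 1

        dispatch_patched(
          %{state | queue: queue, demand: new_demand},
          new_demand,
          [event | events]
        )

      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
end

# Two queued events, demand for one: exactly one event should be emitted
# and the remaining demand should be zero.
state = DemandSketch.new_state([:a, :b], 1)

{:noreply, [:a], original} = DemandSketch.dispatch_original(state, state.demand, [])
{:noreply, [:a], patched} = DemandSketch.dispatch_patched(state, state.demand, [])

IO.inspect(original.demand, label: "original demand") # prints "original demand: 1"
IO.inspect(patched.demand, label: "patched demand")   # prints "patched demand: 0"
```

Both versions emit the same event; the only difference is the demand value left in the returned state, which is what the patch targets.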