Demand reset bug can cause dropped events #20

@alexpearce

I'm aware that this library is likely EOL (#18) but wanted to share this in case others are seeing the same issue.


The dispatch_events function contains a bug when the demand reaches zero.

defp dispatch_events(state, 0, events) do
  {:noreply, Enum.reverse(events), state}
end
defp dispatch_events(state, demand, events) do
  case :queue.out(state.queue) do
    {{:value, event}, queue} ->
      dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
    {:empty, _} ->
      {:noreply, Enum.reverse(events), %{state | demand: demand}}
  end
end

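The dispatch_events(state, demand, events) body does not reset state.demand: the recursive call decrements only the local demand argument, and the dispatch_events(state, 0, events) clause returns state unchanged, so state.demand is only ever written in the {:empty, _} branch. This can cause the internal demand counter to remain at 1 even though the demand has been met. This, in turn, can cause single events to be dropped as they are received from RabbitMQ, which prints a warning: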

GenStage producer :my_rabbitmq_producer has discarded 1 events from buffer
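The stale counter can be reproduced outside of GenStage. The following is a minimal sketch, not the library's code: DemandSketch is a hypothetical module that copies the two clauses quoted earlier (made public so they can be called directly), and the starting state, a plain map with one unit of recorded demand and two queued events, is an assumption chosen for illustration.

```elixir
# Hypothetical stand-alone copy of the unpatched clauses; not part of Wabbit.
defmodule DemandSketch do
  # Demand fully met: state is returned untouched, so state.demand is not reset.
  def dispatch_events(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_events(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        # Only the local demand argument is decremented here.
        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])

      {:empty, _} ->
        # The only place where state.demand is written.
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
end

# Assumed starting point: demand of 1 was recorded earlier, two events are queued.
state = %{queue: :queue.from_list([:a, :b]), demand: 1}

{:noreply, events, new_state} = DemandSketch.dispatch_events(state, 1, [])

# One event is dispatched, yet the stored counter still claims 1 outstanding.
IO.inspect(events, label: "dispatched")
IO.inspect(new_state.demand, label: "state.demand after dispatch")
```

In this sketch the single unit of demand is satisfied by :a, but new_state.demand is still 1, which matches the symptom described above.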

This patch fixes the bug:

diff --git a/lib/wabbit/gen_stage.ex b/lib/wabbit/gen_stage.ex
index e230132..402e0a7 100644
--- a/lib/wabbit/gen_stage.ex
+++ b/lib/wabbit/gen_stage.ex
@@ -401,7 +401,8 @@ defmodule Wabbit.GenStage do
   defp dispatch_events(state, demand, events) do
     case :queue.out(state.queue) do
       {{:value, event}, queue} ->
-        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
+        new_demand = demand - 1
+        dispatch_events(%{state | queue: queue, demand: new_demand}, new_demand, [event | events])
       {:empty, _} ->
         {:noreply, Enum.reverse(events), %{state | demand: demand}}
     end
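To check the effect of the patch in isolation, here is a small sketch under stated assumptions: PatchedDemandSketch is a hypothetical module containing the patched clauses (made public so they can be called directly), and the plain-map states are made up for illustration rather than taken from Wabbit.

```elixir
# Hypothetical stand-alone copy of the patched clauses; not part of Wabbit.
defmodule PatchedDemandSketch do
  def dispatch_events(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_events(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        # Keep state.demand in step with the local counter on every event.
        new_demand = demand - 1
        dispatch_events(%{state | queue: queue, demand: new_demand}, new_demand, [event | events])

      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
end

# Case 1 (assumed): demand of 1 with two queued events; the demand is fully met.
met = %{queue: :queue.from_list([:a, :b]), demand: 1}
{:noreply, met_events, met_state} = PatchedDemandSketch.dispatch_events(met, 1, [])

# Case 2 (assumed): demand of 3 with one queued event; the queue runs dry first.
short = %{queue: :queue.from_list([:a]), demand: 0}
{:noreply, short_events, short_state} = PatchedDemandSketch.dispatch_events(short, 3, [])

IO.inspect({met_events, met_state.demand}, label: "demand met")
IO.inspect({short_events, short_state.demand}, label: "queue empty")
```

With the patched clauses, the stored counter is 0 once the demand has been met, and the remaining demand of 2 is still recorded when the queue empties first.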
