
Stream.group_by/2 #3

@hamiltop

Description

Suggestion from elixir-lang-core mailing list.

Stream.group_by/2 would end up looking something like:

# A stream that represents %{0 => stream_zero, 1 => stream_one, 2 => stream_two}
grouped_stream = Streamz.group_by(stream, fn x -> rem(x, 3) end)

grouped_stream
|> Enum.each(fn {key, stream} ->
  spawn_link(fn ->
    Enum.each(stream, fn value ->
      IO.puts("#{value} is #{key} mod 3")
    end)
  end)
end)

One interesting use case (though there is probably a better way to do this) would be:

  computationally_intensive_stream
  |> Streamz.group_by(fn x -> rem(x, 3) end)
  |> Enum.map(fn {_key, stream} -> stream end)
  |> Streamz.merge()

Perhaps not intended, but that code could be used to distribute the stream and execute it in parallel. A dedicated Streamz.pmap/2 might be a better fit for that use case.
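If Streamz.pmap/2 were pursued, a minimal sketch might look like the following. This assumes modern Elixir, where Task.async_stream/3 already provides ordered results with bounded concurrency; the module and function names are hypothetical, not part of Streamz:

```elixir
defmodule StreamzSketch do
  # Hypothetical pmap/2: map over an enumerable with one task per
  # element. Task.async_stream/3 bounds concurrency (defaulting to the
  # number of schedulers) and preserves input order.
  def pmap(enumerable, fun, opts \\ []) do
    enumerable
    |> Task.async_stream(fun, opts)
    |> Stream.map(fn {:ok, result} -> result end)
  end
end

StreamzSketch.pmap(1..5, fn x -> x * x end) |> Enum.to_list()
# => [1, 4, 9, 16, 25]
```

Note that this bounds concurrency up front, which sidesteps the back-pressure question entirely, at the cost of never exposing per-key substreams.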

Implementation would be something along the lines of one process per key in the group_by map. A router would either push new elements to the existing process or spawn a new process if a new key is created. One issue would be back pressure. A good test of whether back pressure is working properly is:

Stream.cycle([1, 2, 3])
  |> Streamz.group_by(fn x -> rem(x, 3) end)
  |> Enum.map(&IO.inspect/1)

And do nothing else with it. If back pressure is not working, then BEAM will start consuming GBs of memory, largely due to massive inboxes for the resulting processes.
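To make the failure mode concrete, here is a minimal sketch (hypothetical module and function names) of the one-process-per-key router with no back pressure at all. The router pushes every element as fast as the source produces them, so slow or unconsumed workers accumulate unbounded mailboxes:

```elixir
defmodule RouterSketch do
  # Sketch of the routing idea: consume the source and push each
  # element to a per-key worker, spawning the worker on first sight of
  # a new key. No back pressure is applied, so a fast source plus a
  # slow (or absent) consumer grows the worker's mailbox without bound.
  def route(source, key_fun, worker_fun) do
    Enum.reduce(source, %{}, fn element, workers ->
      key = key_fun.(element)

      {pid, workers} =
        case workers do
          %{^key => pid} ->
            {pid, workers}

          _ ->
            pid = spawn_link(fn -> worker_loop(key, worker_fun) end)
            {pid, Map.put(workers, key, pid)}
        end

      send(pid, {:data, element})
      workers
    end)
  end

  defp worker_loop(key, worker_fun) do
    receive do
      {:data, element} ->
        worker_fun.(key, element)
        worker_loop(key, worker_fun)
    end
  end
end
```

Running this with Stream.cycle([1, 2, 3]) as the source reproduces exactly the memory blow-up described above.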

The normal solution of requiring an ack before moving on to the next data point in the stream will not work if at least one of the resulting streams is not being consumed.

Potential solutions:

  1. Do not consume from the source stream (Stream.cycle/1 in this case) until at least one downstream source is being consumed. For the streams not being consumed, either:
    a. queue the messages up.
    b. drop the messages.
  2. Do not consume from the source stream until all downstream sources are being consumed.
  3. Don't apply backpressure.

Problems:

1a. Only partial backpressure. If only 1% of the source's messages route to the downstream stream that is actually being consumed, the queues for the other keys will grow without bound.
1b. We lose messages. For some streams that is probably ok. For others, not so much.
2. We lose the ability to do things like filter on the result of group_by, which is a common pattern. We could solve this by requiring filter to happen before group_by, but that limits flexibility.
3. It is up to the user to not do stupid things.

I think 1a would be our best bet, though it would be pretty complicated. We could also allow for a flag to toggle the behavior between 1a and 1b.

The implementation I see is for the substream to send a :get message to the router process. The router then consumes data until it gets an element that routes to the requesting process. This results in an odd hybrid push/pull model: a substream may already have data in its inbox, but if it doesn't, it must request more.
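A rough sketch of that hybrid protocol (hypothetical names; elements for other keys are queued as in option 1a, and the source is re-suspended with Stream.drop/2, which is inefficient but keeps the sketch short):

```elixir
defmodule PullRouterSketch do
  # A substream sends {:get, key, from}; the router answers from that
  # key's queue if possible, otherwise pulls from the source until it
  # produces an element routed to `key`, queueing elements for other
  # keys along the way (option 1a above).
  def router_loop(source, key_fun, queues) do
    receive do
      {:get, key, from} ->
        case Map.get(queues, key, []) do
          [element | rest] ->
            send(from, {:data, element})
            router_loop(source, key_fun, Map.put(queues, key, rest))

          [] ->
            {source, queues} = pull_until(source, key_fun, key, from, queues)
            router_loop(source, key_fun, queues)
        end
    end
  end

  defp pull_until(source, key_fun, key, from, queues) do
    case Enum.take(source, 1) do
      [] ->
        send(from, :halt)
        {source, queues}

      [element] ->
        rest = Stream.drop(source, 1)

        if key_fun.(element) == key do
          send(from, {:data, element})
          {rest, queues}
        else
          other = key_fun.(element)
          queues = Map.update(queues, other, [element], &(&1 ++ [element]))
          pull_until(rest, key_fun, key, from, queues)
        end
    end
  end
end
```

A substream could then wrap send(router, {:get, key, self()}) plus a receive in Stream.resource/3 to present itself as an ordinary stream.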

Thoughts and comments are welcome!
