Spec:
@spec collapse(Enumerable.t, (Enumerable.t -> [Enumerable.t]), (Enumerable.t -> any)) :: Enumerable.t
def collapse(stream, grouper, reducer)

Example:

uniq_values = Streamz.collapse(values, &Stream.chunk(&1, 100), &Stream.uniq/1) |> Enum.uniq

Explanation:
By breaking values into chunks of 100, we can run uniq on each chunk in parallel, then call uniq once more on the combined result. This is a building-block function. It could be used to easily do parallel reduce:
def parallel_reduce(stream, reducer) do
  Streamz.collapse(stream, &Stream.chunk(&1, 100), reducer) |> reducer.()
end
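For example, assuming collapse behaves as specified above, summing a large stream with parallel_reduce could look like this (hypothetical usage):

sum = parallel_reduce(1..1_000_000, &Enum.sum/1)

Each chunk of 100 is summed independently, and the final reducer.() call sums the partial sums. This only matches a plain Enum.sum of the whole stream because summation composes; the same associativity caveat applies to any reducer passed to parallel_reduce.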
Implementation:

This can be done right now serially via chunk, map and reduce. To do it in parallel, we can use the same building blocks as farm: group_by will break the stream out into a stream of streams; that stream of streams can be Stream.farm'd out, and in the map we do Enum.reduce with the passed-in reducer. Both shapes are sketched below.
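As a point of reference, here is a minimal serial sketch, assuming collapse simply maps the reducer over the groups the grouper produces:

def collapse(stream, grouper, reducer) do
  stream
  |> grouper.()           # e.g. &Stream.chunk(&1, 100) yields a stream of groups
  |> Stream.map(reducer)  # lazily apply the reducer to each group
end

The exact API of farm isn't shown in this issue, so purely to illustrate the parallel shape, the sketch below uses Task.async_stream from the standard library (Elixir 1.4+) as a stand-in; collapse_parallel is a hypothetical name, not the proposed implementation:

def collapse_parallel(stream, grouper, reducer) do
  stream
  |> grouper.()
  # Run the reducer on each group in its own process; results come back
  # as {:ok, result} tuples in the original order.
  |> Task.async_stream(reducer)
  |> Stream.map(fn {:ok, result} -> result end)
end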
Why build this one? I see parallel reduce as having two components: the lazy/parallel component, which must be stream-based, and the final Enum.reduce component, which collapses a vector down to a scalar.
They could all be built together, but I think collapse may have other use cases where maintaining the stream is a good idea.
One issue here is that the resulting stream will not emit any values until the source stream has terminated... so maybe it doesn't belong in Stream. We'll see what happens here.