Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 123 additions & 30 deletions lib/commanded/aggregates/multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Commanded.Aggregate.Multi do

@type t :: %__MODULE__{
aggregate: struct(),
executions: list(function())
executions: list({step_name :: atom(), function()})
}

defstruct [:aggregate, executions: []]
Expand All @@ -62,12 +62,32 @@ defmodule Commanded.Aggregate.Multi do

@doc """
Adds a command execute function to the multi.

If `step_name` is provided, the aggregate state after that step is
stored under that name. That can be useful in a long multi step multi
in which one needs to know what was the agg state while procesisng
the multi. It's possible, then, to pattern match the step name in the
second parameter of the anonymous function to be executed.

## Example

alias Commanded.Aggregate.Multi

aggregate
|> Multi.new()
|> Multi.execute(:interesting_event, fn aggregate ->
%Event{data: 1}
end)
|> Multi.execute(fn aggregate, %{interesting_event: _aggregate_after_interesting_event} ->
%Event{data: 2}
end)
"""
@spec execute(Multi.t(), function()) :: Multi.t()
def execute(%Multi{} = multi, execute_fun) when is_function(execute_fun, 1) do
@spec execute(Multi.t(), atom(), function()) :: Multi.t()
def execute(%Multi{} = multi, step_name \\ false, execute_fun)
when is_function(execute_fun, 1) or is_function(execute_fun, 2) do
%Multi{executions: executions} = multi

%Multi{multi | executions: [execute_fun | executions]}
%Multi{multi | executions: [{step_name, execute_fun} | executions]}
end

@doc """
Expand All @@ -78,6 +98,12 @@ defmodule Commanded.Aggregate.Multi do
state based upon events produced by previous items in the enumerable, such as
running totals.

If `step_name` is provided, the aggregate state after that step will be
stored under that name. That can be useful in a long multi step multi
in which one needs to know what was the agg state while procesisng
the multi. It's possible, then, to pattern match the step name in the
third parameter of the anonymous function to be executed.

## Example

alias Commanded.Aggregate.Multi
Expand All @@ -88,57 +114,124 @@ defmodule Commanded.Aggregate.Multi do
%AnEvent{item: item, total: aggregate.total + item}
end)

## Example with named steps

alias Commanded.Aggregate.Multi

aggregate
|> Multi.new()
|> Multi.execute(:start, fn aggregate ->
%AnEvent{item: nil, total: 0}
end)
|> Multi.reduce(:interesting_event, [1, 2, 3], fn aggregate, item ->
%AnEvent{item: item, total: aggregate.total + item}
end)
|> Multi.reduce([4, 5, 6], fn aggregate,
item,
%{
start: aggregate_state_after_start,
interesting_event: aggregate_state_after_interesting_event
} ->
%AnEvent{item: item, total: aggregate.total + item}
end)

"""
@spec reduce(Multi.t(), Enum.t(), function()) :: Multi.t()
def reduce(%Multi{} = multi, enumerable, execute_fun) when is_function(execute_fun, 2) do
@spec reduce(Multi.t(), atom(), Enum.t(), function()) :: Multi.t()
def reduce(multi, step_name \\ false, enumerable, execute_fun)

def reduce(%Multi{} = multi, step_name, enumerable, execute_fun)
when is_function(execute_fun, 2) do
Enum.reduce(enumerable, multi, fn item, %Multi{} = multi ->
execute(multi, &execute_fun.(&1, item))
execute(multi, step_name, &execute_fun.(&1, item))
end)
end

def reduce(%Multi{} = multi, step_name, enumerable, execute_fun)
when is_function(execute_fun, 3) do
Enum.reduce(enumerable, multi, fn item, %Multi{} = multi ->
execute(multi, step_name, &execute_fun.(&1, item, &2))
end)
end

@doc """
Run the execute functions contained within the multi, returning the updated
aggregate state and all created events.
aggregate state, the aggregate state for each named step and all created events.
"""
@spec run(Multi.t()) ::
{aggregate :: struct(), list(event :: struct())} | {:error, reason :: any()}
def run(%Multi{aggregate: aggregate, executions: executions}) do
try do
executions
|> Enum.reverse()
|> Enum.reduce({aggregate, []}, fn execute_fun, {aggregate, events} ->
case execute_fun.(aggregate) do
{:error, _reason} = error ->
throw(error)

%Multi{} = multi ->
case Multi.run(multi) do
{evolved_aggregate, _steps, pending_events} =
executions
|> Enum.reverse()
|> Enum.reduce({aggregate, %{}, []}, fn
{step_name, execute_fun}, {aggregate, steps, events}
when is_function(execute_fun, 1) or is_function(execute_fun, 2) ->
case execute_function(execute_fun, aggregate, steps) do
{:error, _reason} = error ->
throw(error)

{evolved_aggregate, pending_events} ->
{evolved_aggregate, events ++ pending_events}
end
%Multi{} = multi ->
case Multi.run(multi) do
{:error, _reason} = error ->
throw(error)

none when none in [:ok, nil, []] ->
{aggregate, events}
# do not leak nested multi steps to outer multis
{evolved_aggregate, pending_events} ->
updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate)

{:ok, pending_events} ->
pending_events = List.wrap(pending_events)
{evolved_aggregate, updated_steps, events ++ pending_events}
end

{apply_events(aggregate, pending_events), events ++ pending_events}
none when none in [:ok, nil, []] ->
updated_steps = maybe_update_steps(step_name, steps, aggregate)

pending_events ->
pending_events = List.wrap(pending_events)
{aggregate, updated_steps, events}

{apply_events(aggregate, pending_events), events ++ pending_events}
end
end)
{:ok, pending_events} ->
pending_events = List.wrap(pending_events)

evolved_aggregate = apply_events(aggregate, pending_events)

updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate)

{evolved_aggregate, updated_steps, events ++ pending_events}

pending_events ->
pending_events = List.wrap(pending_events)

evolved_aggregate = apply_events(aggregate, pending_events)

updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate)

{evolved_aggregate, updated_steps, events ++ pending_events}
end
end)

{evolved_aggregate, pending_events}
catch
{:error, _error} = error -> error
end
end

defp maybe_update_steps(step_name, actual_steps, aggregate_state_after_step)

defp maybe_update_steps(false, actual_steps, _aggregate_state_after_step), do: actual_steps

defp maybe_update_steps(step_name, actual_steps, aggregate_state_after_step) do
Map.put(actual_steps, step_name, aggregate_state_after_step)
end

defp execute_function(execute_fun, aggregate, _steps)
when is_function(execute_fun, 1) do
execute_fun.(aggregate)
end

defp execute_function(execute_fun, aggregate, steps)
when is_function(execute_fun, 2) do
execute_fun.(aggregate, steps)
end

defp apply_events(aggregate, events) do
Enum.reduce(events, aggregate, &aggregate.__struct__.apply(&2, &1))
end
Expand Down
137 changes: 136 additions & 1 deletion test/aggregates/multi_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,84 @@ defmodule Commanded.Aggregate.MultiTest do

%Event{data: 3}
end)
|> Multi.execute(fn %ExampleAggregate{events: events} ->
|> Multi.execute(fn %ExampleAggregate{events: events}, steps ->
assert length(events) == 3
assert steps == %{}

[]
end)
|> Multi.run()

assert events == [%Event{data: 1}, %Event{data: 2}, %Event{data: 3}]
end

test "should store aggregate state under step_name if step name is passed on multi step nested multis" do
{%ExampleAggregate{}, events} =
%ExampleAggregate{}
|> Multi.new()
|> Multi.execute(:event_1, fn %ExampleAggregate{events: events} ->
assert events == []

%Event{data: 1}
end)
|> Multi.execute(:event_2, fn %ExampleAggregate{} = aggregate, steps ->
assert steps == %{event_1: %ExampleAggregate{events: [%Event{data: 1}]}}

aggregate
|> Multi.new()
# ensure :event_2_2 won't leak into outer multi
|> Multi.execute(:event_2_2, fn %ExampleAggregate{events: events} ->
assert length(events) == 1

%Event{data: 2}
end)
end)
|> Multi.execute(:event_3, fn %ExampleAggregate{events: events}, steps ->
assert length(events) == 2

assert steps == %{
event_1: %ExampleAggregate{events: [%Event{data: 1}]},
event_2: %ExampleAggregate{events: [%Event{data: 1}, %Event{data: 2}]}
}

%Event{data: 3}
end)
|> Multi.execute(:event_4, fn %ExampleAggregate{events: events} ->
# if steps won't be used, we can pass function with arity 1
assert length(events) == 3

[]
end)
|> Multi.execute(fn %ExampleAggregate{events: events}, steps ->
# you can also not name the step even if you names previous ones
assert length(events) == 3

assert steps == %{
event_1: %Commanded.Aggregate.MultiTest.ExampleAggregate{
events: [%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1}]
},
event_2: %Commanded.Aggregate.MultiTest.ExampleAggregate{
events: [
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1},
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2}
]
},
event_3: %Commanded.Aggregate.MultiTest.ExampleAggregate{
events: [
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1},
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2},
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 3}
]
},
event_4: %Commanded.Aggregate.MultiTest.ExampleAggregate{
events: [
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1},
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2},
%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 3}
]
}
}

[]
end)
|> Multi.run()
Expand All @@ -161,5 +236,65 @@ defmodule Commanded.Aggregate.MultiTest do

assert events == [%Event{data: 1}, %Event{data: 2}, %Event{data: 3}]
end

test "should store aggregate state under step_name on reduce" do
{%ExampleAggregate{}, events} =
%ExampleAggregate{}
|> Multi.new()
|> Multi.reduce(:reduce_step, [1, 2, 3], fn %ExampleAggregate{events: events}, index ->
assert length(events) == index - 1

%Event{data: index}
end)
|> Multi.reduce(:reduce_step_2, [4, 5, 6], fn %ExampleAggregate{events: events},
index,
%{reduce_step: reduce_step} ->
assert reduce_step ==
%ExampleAggregate{
events: [
%ExampleAggregate.Event{data: 1},
%ExampleAggregate.Event{data: 2},
%ExampleAggregate.Event{data: 3}
]
}

assert length(events) == index - 1

%Event{data: index}
end)
|> Multi.execute(fn %ExampleAggregate{events: _events}, steps ->
assert steps == %{
reduce_step: %ExampleAggregate{
events: [
%ExampleAggregate.Event{data: 1},
%ExampleAggregate.Event{data: 2},
%ExampleAggregate.Event{data: 3}
]
},
reduce_step_2: %ExampleAggregate{
events: [
%ExampleAggregate.Event{data: 1},
%ExampleAggregate.Event{data: 2},
%ExampleAggregate.Event{data: 3},
%ExampleAggregate.Event{data: 4},
%ExampleAggregate.Event{data: 5},
%ExampleAggregate.Event{data: 6}
]
}
}

[]
end)
|> Multi.run()

assert events == [
%Event{data: 1},
%Event{data: 2},
%Event{data: 3},
%Event{data: 4},
%Event{data: 5},
%Event{data: 6}
]
end
end
end