diff --git a/lib/commanded/aggregates/multi.ex b/lib/commanded/aggregates/multi.ex index bc316c9e..fe188dc4 100644 --- a/lib/commanded/aggregates/multi.ex +++ b/lib/commanded/aggregates/multi.ex @@ -49,7 +49,7 @@ defmodule Commanded.Aggregate.Multi do @type t :: %__MODULE__{ aggregate: struct(), - executions: list(function()) + executions: list({step_name :: atom(), function()}) } defstruct [:aggregate, executions: []] @@ -62,12 +62,32 @@ defmodule Commanded.Aggregate.Multi do @doc """ Adds a command execute function to the multi. + + If `step_name` is provided, the aggregate state after that step is + stored under that name. That can be useful in a long multi step multi + in which one needs to know what was the agg state while procesisng + the multi. It's possible, then, to pattern match the step name in the + second parameter of the anonymous function to be executed. + + ## Example + + alias Commanded.Aggregate.Multi + + aggregate + |> Multi.new() + |> Multi.execute(:interesting_event, fn aggregate -> + %Event{data: 1} + end) + |> Multi.execute(fn aggregate, %{interesting_event: _aggregate_after_interesting_event} -> + %Event{data: 2} + end) """ - @spec execute(Multi.t(), function()) :: Multi.t() - def execute(%Multi{} = multi, execute_fun) when is_function(execute_fun, 1) do + @spec execute(Multi.t(), atom(), function()) :: Multi.t() + def execute(%Multi{} = multi, step_name \\ false, execute_fun) + when is_function(execute_fun, 1) or is_function(execute_fun, 2) do %Multi{executions: executions} = multi - %Multi{multi | executions: [execute_fun | executions]} + %Multi{multi | executions: [{step_name, execute_fun} | executions]} end @doc """ @@ -78,6 +98,12 @@ defmodule Commanded.Aggregate.Multi do state based upon events produced by previous items in the enumerable, such as running totals. + If `step_name` is provided, the aggregate state after that step will be + stored under that name. That can be useful in a long multi step multi + in which one needs to know what was the agg state while procesisng + the multi. It's possible, then, to pattern match the step name in the + third parameter of the anonymous function to be executed. + ## Example alias Commanded.Aggregate.Multi @@ -88,57 +114,124 @@ defmodule Commanded.Aggregate.Multi do %AnEvent{item: item, total: aggregate.total + item} end) + ## Example with named steps + + alias Commanded.Aggregate.Multi + + aggregate + |> Multi.new() + |> Multi.execute(:start, fn aggregate -> + %AnEvent{item: nil, total: 0} + end) + |> Multi.reduce(:interesting_event, [1, 2, 3], fn aggregate, item -> + %AnEvent{item: item, total: aggregate.total + item} + end) + |> Multi.reduce([4, 5, 6], fn aggregate, + item, + %{ + start: aggregate_state_after_start, + interesting_event: aggregate_state_after_interesting_event + } -> + %AnEvent{item: item, total: aggregate.total + item} + end) + """ - @spec reduce(Multi.t(), Enum.t(), function()) :: Multi.t() - def reduce(%Multi{} = multi, enumerable, execute_fun) when is_function(execute_fun, 2) do + @spec reduce(Multi.t(), atom(), Enum.t(), function()) :: Multi.t() + def reduce(multi, step_name \\ false, enumerable, execute_fun) + + def reduce(%Multi{} = multi, step_name, enumerable, execute_fun) + when is_function(execute_fun, 2) do Enum.reduce(enumerable, multi, fn item, %Multi{} = multi -> - execute(multi, &execute_fun.(&1, item)) + execute(multi, step_name, &execute_fun.(&1, item)) + end) + end + + def reduce(%Multi{} = multi, step_name, enumerable, execute_fun) + when is_function(execute_fun, 3) do + Enum.reduce(enumerable, multi, fn item, %Multi{} = multi -> + execute(multi, step_name, &execute_fun.(&1, item, &2)) end) end @doc """ Run the execute functions contained within the multi, returning the updated - aggregate state and all created events. + aggregate state, the aggregate state for each named step and all created events. """ @spec run(Multi.t()) :: {aggregate :: struct(), list(event :: struct())} | {:error, reason :: any()} def run(%Multi{aggregate: aggregate, executions: executions}) do try do - executions - |> Enum.reverse() - |> Enum.reduce({aggregate, []}, fn execute_fun, {aggregate, events} -> - case execute_fun.(aggregate) do - {:error, _reason} = error -> - throw(error) - - %Multi{} = multi -> - case Multi.run(multi) do + {evolved_aggregate, _steps, pending_events} = + executions + |> Enum.reverse() + |> Enum.reduce({aggregate, %{}, []}, fn + {step_name, execute_fun}, {aggregate, steps, events} + when is_function(execute_fun, 1) or is_function(execute_fun, 2) -> + case execute_function(execute_fun, aggregate, steps) do {:error, _reason} = error -> throw(error) - {evolved_aggregate, pending_events} -> - {evolved_aggregate, events ++ pending_events} - end + %Multi{} = multi -> + case Multi.run(multi) do + {:error, _reason} = error -> + throw(error) - none when none in [:ok, nil, []] -> - {aggregate, events} + # do not leak nested multi steps to outer multis + {evolved_aggregate, pending_events} -> + updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate) - {:ok, pending_events} -> - pending_events = List.wrap(pending_events) + {evolved_aggregate, updated_steps, events ++ pending_events} + end - {apply_events(aggregate, pending_events), events ++ pending_events} + none when none in [:ok, nil, []] -> + updated_steps = maybe_update_steps(step_name, steps, aggregate) - pending_events -> - pending_events = List.wrap(pending_events) + {aggregate, updated_steps, events} - {apply_events(aggregate, pending_events), events ++ pending_events} - end - end) + {:ok, pending_events} -> + pending_events = List.wrap(pending_events) + + evolved_aggregate = apply_events(aggregate, pending_events) + + updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate) + + {evolved_aggregate, updated_steps, events ++ pending_events} + + pending_events -> + pending_events = List.wrap(pending_events) + + evolved_aggregate = apply_events(aggregate, pending_events) + + updated_steps = maybe_update_steps(step_name, steps, evolved_aggregate) + + {evolved_aggregate, updated_steps, events ++ pending_events} + end + end) + + {evolved_aggregate, pending_events} catch {:error, _error} = error -> error end end + defp maybe_update_steps(step_name, actual_steps, aggregate_state_after_step) + + defp maybe_update_steps(false, actual_steps, _aggregate_state_after_step), do: actual_steps + + defp maybe_update_steps(step_name, actual_steps, aggregate_state_after_step) do + Map.put(actual_steps, step_name, aggregate_state_after_step) + end + + defp execute_function(execute_fun, aggregate, _steps) + when is_function(execute_fun, 1) do + execute_fun.(aggregate) + end + + defp execute_function(execute_fun, aggregate, steps) + when is_function(execute_fun, 2) do + execute_fun.(aggregate, steps) + end + defp apply_events(aggregate, events) do Enum.reduce(events, aggregate, &aggregate.__struct__.apply(&2, &1)) end diff --git a/test/aggregates/multi_test.exs b/test/aggregates/multi_test.exs index bec76f6e..ffb6ce0c 100644 --- a/test/aggregates/multi_test.exs +++ b/test/aggregates/multi_test.exs @@ -138,9 +138,84 @@ defmodule Commanded.Aggregate.MultiTest do %Event{data: 3} end) - |> Multi.execute(fn %ExampleAggregate{events: events} -> + |> Multi.execute(fn %ExampleAggregate{events: events}, steps -> + assert length(events) == 3 + assert steps == %{} + + [] + end) + |> Multi.run() + + assert events == [%Event{data: 1}, %Event{data: 2}, %Event{data: 3}] + end + + test "should store aggregate state under step_name if step name is passed on multi step nested multis" do + {%ExampleAggregate{}, events} = + %ExampleAggregate{} + |> Multi.new() + |> Multi.execute(:event_1, fn %ExampleAggregate{events: events} -> + assert events == [] + + %Event{data: 1} + end) + |> Multi.execute(:event_2, fn %ExampleAggregate{} = aggregate, steps -> + assert steps == %{event_1: %ExampleAggregate{events: [%Event{data: 1}]}} + + aggregate + |> Multi.new() + # ensure :event_2_2 won't leak into outer multi + |> Multi.execute(:event_2_2, fn %ExampleAggregate{events: events} -> + assert length(events) == 1 + + %Event{data: 2} + end) + end) + |> Multi.execute(:event_3, fn %ExampleAggregate{events: events}, steps -> + assert length(events) == 2 + + assert steps == %{ + event_1: %ExampleAggregate{events: [%Event{data: 1}]}, + event_2: %ExampleAggregate{events: [%Event{data: 1}, %Event{data: 2}]} + } + + %Event{data: 3} + end) + |> Multi.execute(:event_4, fn %ExampleAggregate{events: events} -> + # if steps won't be used, we can pass function with arity 1 + assert length(events) == 3 + + [] + end) + |> Multi.execute(fn %ExampleAggregate{events: events}, steps -> + # you can also not name the step even if you names previous ones assert length(events) == 3 + assert steps == %{ + event_1: %Commanded.Aggregate.MultiTest.ExampleAggregate{ + events: [%Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1}] + }, + event_2: %Commanded.Aggregate.MultiTest.ExampleAggregate{ + events: [ + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1}, + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2} + ] + }, + event_3: %Commanded.Aggregate.MultiTest.ExampleAggregate{ + events: [ + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1}, + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2}, + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 3} + ] + }, + event_4: %Commanded.Aggregate.MultiTest.ExampleAggregate{ + events: [ + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 1}, + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 2}, + %Commanded.Aggregate.MultiTest.ExampleAggregate.Event{data: 3} + ] + } + } + [] end) |> Multi.run() @@ -161,5 +236,65 @@ defmodule Commanded.Aggregate.MultiTest do assert events == [%Event{data: 1}, %Event{data: 2}, %Event{data: 3}] end + + test "should store aggregate state under step_name on reduce" do + {%ExampleAggregate{}, events} = + %ExampleAggregate{} + |> Multi.new() + |> Multi.reduce(:reduce_step, [1, 2, 3], fn %ExampleAggregate{events: events}, index -> + assert length(events) == index - 1 + + %Event{data: index} + end) + |> Multi.reduce(:reduce_step_2, [4, 5, 6], fn %ExampleAggregate{events: events}, + index, + %{reduce_step: reduce_step} -> + assert reduce_step == + %ExampleAggregate{ + events: [ + %ExampleAggregate.Event{data: 1}, + %ExampleAggregate.Event{data: 2}, + %ExampleAggregate.Event{data: 3} + ] + } + + assert length(events) == index - 1 + + %Event{data: index} + end) + |> Multi.execute(fn %ExampleAggregate{events: _events}, steps -> + assert steps == %{ + reduce_step: %ExampleAggregate{ + events: [ + %ExampleAggregate.Event{data: 1}, + %ExampleAggregate.Event{data: 2}, + %ExampleAggregate.Event{data: 3} + ] + }, + reduce_step_2: %ExampleAggregate{ + events: [ + %ExampleAggregate.Event{data: 1}, + %ExampleAggregate.Event{data: 2}, + %ExampleAggregate.Event{data: 3}, + %ExampleAggregate.Event{data: 4}, + %ExampleAggregate.Event{data: 5}, + %ExampleAggregate.Event{data: 6} + ] + } + } + + [] + end) + |> Multi.run() + + assert events == [ + %Event{data: 1}, + %Event{data: 2}, + %Event{data: 3}, + %Event{data: 4}, + %Event{data: 5}, + %Event{data: 6} + ] + end end end