From 718912b5abea966236d5f627d28a4f473100d412 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 24 Sep 2025 08:09:39 -0500 Subject: [PATCH 1/6] Change concurrency default to unlimited --- phlex/core/framework_graph.hpp | 12 ++++--- phlex/core/glue.hpp | 8 ++--- phlex/core/graph_proxy.hpp | 21 +++++------ test/benchmarks/accept_even_ids.cpp | 6 ++-- test/benchmarks/accept_even_numbers.cpp | 5 +-- test/benchmarks/accept_fibonacci_numbers.cpp | 3 +- test/benchmarks/last_index.cpp | 2 +- test/benchmarks/plus_101.cpp | 2 +- test/benchmarks/plus_one.cpp | 2 +- test/benchmarks/read_id.cpp | 5 +-- test/benchmarks/read_index.cpp | 3 +- test/benchmarks/verify_difference.cpp | 3 +- .../verify_even_fibonacci_numbers.cpp | 3 +- test/cached_execution.cpp | 18 ++++------ test/class_registration.cpp | 12 +++---- test/demo-giantdata/unfold_transform_fold.cpp | 2 +- test/different_hierarchies.cpp | 13 ++++--- test/filter.cpp | 36 +++++++++---------- test/fold.cpp | 7 ++-- test/hierarchical_nodes.cpp | 19 ++++------ test/memory-checks/many_events.cpp | 4 +-- test/multiple_function_registration.cpp | 12 +++---- test/plugins/module.cpp | 10 ++---- test/plugins/output.cpp | 3 +- test/unfold.cpp | 4 +-- 25 files changed, 93 insertions(+), 122 deletions(-) diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 18aa00cc..d1c04450 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -71,7 +71,7 @@ namespace phlex::experimental { template auto fold(std::string name, is_fold_like auto f, - concurrency c = concurrency::serial, + concurrency c, std::string partition = "job", InitArgs&&... init_args) { @@ -92,17 +92,21 @@ namespace phlex::experimental { std::move(pred), std::move(unf), c, std::move(destination_data_layer)); } - auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::serial) + auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::unlimited) { return make_glue().observe(std::move(name), std::move(f), c); } - auto predicate(std::string name, is_predicate_like auto f, concurrency c = concurrency::serial) + auto predicate(std::string name, + is_predicate_like auto f, + concurrency c = concurrency::unlimited) { return make_glue().predicate(std::move(name), std::move(f), c); } - auto transform(std::string name, is_transform_like auto f, concurrency c = concurrency::serial) + auto transform(std::string name, + is_transform_like auto f, + concurrency c = concurrency::unlimited) { return make_glue().transform(std::move(name), std::move(f), c); } diff --git a/phlex/core/glue.hpp b/phlex/core/glue.hpp index 7f0cfdbb..bd5afac3 100644 --- a/phlex/core/glue.hpp +++ b/phlex/core/glue.hpp @@ -57,7 +57,7 @@ namespace phlex::experimental { } template - auto observe(std::string name, FT f, concurrency c) + auto observe(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -70,7 +70,7 @@ namespace phlex::experimental { } template - auto transform(std::string name, FT f, concurrency c) + auto transform(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -83,7 +83,7 @@ namespace phlex::experimental { } template - auto predicate(std::string name, FT f, concurrency c) + auto predicate(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -124,7 +124,7 
@@ namespace phlex::experimental { std::move(destination_data_layer)); } - auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial) + auto output(std::string name, is_output_like auto f, concurrency c = concurrency::unlimited) { return output_api{nodes_.registrar_for(errors_), config_, diff --git a/phlex/core/graph_proxy.hpp b/phlex/core/graph_proxy.hpp index 8e5239d8..dc5f39ff 100644 --- a/phlex/core/graph_proxy.hpp +++ b/phlex/core/graph_proxy.hpp @@ -46,7 +46,7 @@ namespace phlex::experimental { template auto fold(std::string name, is_fold_like auto f, - concurrency c = concurrency::serial, + concurrency c, std::string partition = "job", InitArgs&&... init_args) { @@ -57,37 +57,38 @@ namespace phlex::experimental { std::forward(init_args)...); } - auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::serial) + auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::unlimited) { return create_glue().observe(std::move(name), std::move(f), c); } - auto predicate(std::string name, is_predicate_like auto f, concurrency c = concurrency::serial) + auto predicate(std::string name, + is_predicate_like auto f, + concurrency c = concurrency::unlimited) { return create_glue().predicate(std::move(name), std::move(f), c); } - auto transform(std::string name, is_transform_like auto f, concurrency c = concurrency::serial) + auto transform(std::string name, + is_transform_like auto f, + concurrency c = concurrency::unlimited) { return create_glue().transform(std::move(name), std::move(f), c); } template - auto unfold(std::string name, - is_predicate_like auto pred, - auto unf, - concurrency c = concurrency::serial) + auto unfold(std::string name, is_predicate_like auto pred, auto unf, concurrency c) { return create_glue(false).unfold(std::move(name), std::move(pred), std::move(unf), c); } template - auto unfold(is_predicate_like auto pred, auto unf, concurrency c = concurrency::serial) + auto unfold(is_predicate_like auto pred, auto unf, concurrency c) { return create_glue(false).unfold(std::move(pred), std::move(unf), c); } - auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial) + auto output(std::string name, is_output_like auto f, concurrency c = concurrency::unlimited) { return create_glue().output(std::move(name), std::move(f), c); } diff --git a/test/benchmarks/accept_even_ids.cpp b/test/benchmarks/accept_even_ids.cpp index e490c46b..f15df997 100644 --- a/test/benchmarks/accept_even_ids.cpp +++ b/test/benchmarks/accept_even_ids.cpp @@ -5,9 +5,7 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.predicate( - "accept_even_ids", - [](phlex::experimental::level_id const& id) { return id.number() % 2 == 0; }, - phlex::experimental::concurrency::unlimited) + m.predicate("accept_even_ids", + [](phlex::experimental::level_id const& id) { return id.number() % 2 == 0; }) .input_family(config.get("product_name")); } diff --git a/test/benchmarks/accept_even_numbers.cpp b/test/benchmarks/accept_even_numbers.cpp index 33982374..3aa7c22d 100644 --- a/test/benchmarks/accept_even_numbers.cpp +++ b/test/benchmarks/accept_even_numbers.cpp @@ -4,9 +4,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.predicate( - "accept_even_numbers", - [](int i) { return i % 2 == 0; }, - phlex::experimental::concurrency::unlimited) + m.predicate("accept_even_numbers", [](int i) { return i % 2 == 0; }) .input_family(config.get("consumes")); } diff --git 
a/test/benchmarks/accept_fibonacci_numbers.cpp b/test/benchmarks/accept_fibonacci_numbers.cpp index e1438f20..1feba486 100644 --- a/test/benchmarks/accept_fibonacci_numbers.cpp +++ b/test/benchmarks/accept_fibonacci_numbers.cpp @@ -6,7 +6,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.make(config.get("max_number")) - .predicate( - "accept", &test::fibonacci_numbers::accept, phlex::experimental::concurrency::unlimited) + .predicate("accept", &test::fibonacci_numbers::accept) .input_family(config.get("consumes")); } diff --git a/test/benchmarks/last_index.cpp b/test/benchmarks/last_index.cpp index bf113dfc..b4a9b9d3 100644 --- a/test/benchmarks/last_index.cpp +++ b/test/benchmarks/last_index.cpp @@ -9,7 +9,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.transform("last_index", last_index, concurrency::unlimited) + m.transform("last_index", last_index) .input_family("id") .output_products(config.get("produces", "a")); } diff --git a/test/benchmarks/plus_101.cpp b/test/benchmarks/plus_101.cpp index 0d7b0ed5..39d20c52 100644 --- a/test/benchmarks/plus_101.cpp +++ b/test/benchmarks/plus_101.cpp @@ -8,5 +8,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_101", plus_101, concurrency::unlimited).input_family("a").output_products("c"); + m.transform("plus_101", plus_101).input_family("a").output_products("c"); } diff --git a/test/benchmarks/plus_one.cpp b/test/benchmarks/plus_one.cpp index fef141cc..c996fa0f 100644 --- a/test/benchmarks/plus_one.cpp +++ b/test/benchmarks/plus_one.cpp @@ -8,5 +8,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_one", plus_one, concurrency::unlimited).input_family("a").output_products("b"); + m.transform("plus_one", plus_one).input_family("a").output_products("b"); } diff --git a/test/benchmarks/read_id.cpp b/test/benchmarks/read_id.cpp index cc8c1667..b72fe016 100644 --- a/test/benchmarks/read_id.cpp +++ b/test/benchmarks/read_id.cpp @@ -5,7 +5,4 @@ namespace { void read_id(phlex::experimental::level_id const&) {} } -PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) -{ - m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited).input_family("id"); -} +PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { m.observe("read_id", read_id).input_family("id"); } diff --git a/test/benchmarks/read_index.cpp b/test/benchmarks/read_index.cpp index f2de1b18..2430b0db 100644 --- a/test/benchmarks/read_index.cpp +++ b/test/benchmarks/read_index.cpp @@ -7,6 +7,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.observe("read_index", read_index, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + m.observe("read_index", read_index).input_family(config.get("consumes")); } diff --git a/test/benchmarks/verify_difference.cpp b/test/benchmarks/verify_difference.cpp index 9294650a..33ee4f65 100644 --- a/test/benchmarks/verify_difference.cpp +++ b/test/benchmarks/verify_difference.cpp @@ -8,7 +8,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.observe( "verify_difference", - [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }, - concurrency::unlimited) + [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }) .input_family(config.get("i", "b"), config.get("j", "c")); } diff --git a/test/benchmarks/verify_even_fibonacci_numbers.cpp b/test/benchmarks/verify_even_fibonacci_numbers.cpp index 3da779f2..68d9a9da 100644 --- 
a/test/benchmarks/verify_even_fibonacci_numbers.cpp +++ b/test/benchmarks/verify_even_fibonacci_numbers.cpp @@ -18,7 +18,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { using namespace test; m.make(config.get("max_number")) - .observe( - "only_even", &even_fibonacci_numbers::only_even, phlex::experimental::concurrency::unlimited) + .observe("only_even", &even_fibonacci_numbers::only_even) .input_family(config.get("consumes")); } diff --git a/test/cached_execution.cpp b/test/cached_execution.cpp index 87b12d9f..a6a5eea0 100644 --- a/test/cached_execution.cpp +++ b/test/cached_execution.cpp @@ -46,24 +46,18 @@ TEST_CASE("Cached function calls", "[data model]") { framework_graph g{detail::create_next()}; - g.transform("A1", call_one, concurrency::unlimited) - .input_family("number"_in("run")) - .output_products("one"); - g.transform("A2", call_one, concurrency::unlimited) - .input_family("one"_in("run")) - .output_products("used_one"); - g.transform("A3", call_one, concurrency::unlimited) - .input_family("used_one"_in("run")) - .output_products("done_one"); + g.transform("A1", call_one).input_family("number"_in("run")).output_products("one"); + g.transform("A2", call_one).input_family("one"_in("run")).output_products("used_one"); + g.transform("A3", call_one).input_family("used_one"_in("run")).output_products("done_one"); - g.transform("B1", call_two, concurrency::unlimited) + g.transform("B1", call_two) .input_family("one"_in("run"), "another"_in("subrun")) .output_products("two"); - g.transform("B2", call_two, concurrency::unlimited) + g.transform("B2", call_two) .input_family("used_one"_in("run"), "two"_in("subrun")) .output_products("used_two"); - g.transform("C", call_two, concurrency::unlimited) + g.transform("C", call_two) .input_family("used_two"_in("subrun"), "still"_in("event")) .output_products("three"); diff --git a/test/class_registration.cpp b/test/class_registration.cpp index bb537b28..9758cb1b 100644 --- a/test/class_registration.cpp +++ b/test/class_registration.cpp @@ -62,37 +62,37 @@ TEST_CASE("Call non-framework functions", "[programming model]") auto glueball = g.make(); SECTION("No framework") { - glueball.transform("no_framework", &A::no_framework, concurrency::unlimited) + glueball.transform("no_framework", &A::no_framework) .input_family(product_names) .output_products(oproduct_names); } SECTION("No framework, all references") { - glueball.transform("no_framework_all_refs", &A::no_framework_all_refs, concurrency::unlimited) + glueball.transform("no_framework_all_refs", &A::no_framework_all_refs) .input_family(product_names) .output_products(oproduct_names); } SECTION("No framework, all pointers") { - glueball.transform("no_framework_all_ptrs", &A::no_framework_all_ptrs, concurrency::unlimited) + glueball.transform("no_framework_all_ptrs", &A::no_framework_all_ptrs) .input_family(product_names) .output_products(oproduct_names); } SECTION("One framework argument") { - glueball.transform("one_framework_arg", &A::one_framework_arg, concurrency::unlimited) + glueball.transform("one_framework_arg", &A::one_framework_arg) .input_family(product_names) .output_products(oproduct_names); } SECTION("All framework arguments") { - glueball.transform("all_framework_args", &A::all_framework_args, concurrency::unlimited) + glueball.transform("all_framework_args", &A::all_framework_args) .input_family(product_names) .output_products(oproduct_names); } // The following is invoked for *each* section above - g.observe("verify_results", verify_results, 
concurrency::unlimited).input_family(product_names); + g.observe("verify_results", verify_results).input_family(product_names); g.execute(); } diff --git a/test/demo-giantdata/unfold_transform_fold.cpp b/test/demo-giantdata/unfold_transform_fold.cpp index 8c48565b..35242fed 100644 --- a/test/demo-giantdata/unfold_transform_fold.cpp +++ b/test/demo-giantdata/unfold_transform_fold.cpp @@ -130,7 +130,7 @@ int main(int argc, char* argv[]) return demo::clampWaveforms(*hwf, run_id, subrun_id, spill_id, apa_id); }; - g.transform("clamp_node", wrapped_user_function, concurrency::unlimited) + g.transform("clamp_node", wrapped_user_function) .input_family("waves_in_apa"_in("APA")) .output_products("clamped_waves"); diff --git a/test/different_hierarchies.cpp b/test/different_hierarchies.cpp index 9f82fdec..c3c062f8 100644 --- a/test/different_hierarchies.cpp +++ b/test/different_hierarchies.cpp @@ -84,12 +84,15 @@ TEST_CASE("Different hierarchies used with fold", "[graph]") .output_products("run_sum"); g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); - g.observe("verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }) + g.observe( + "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::serial) .input_family("run_sum"); - g.observe("verify_job_sum", - [](unsigned int actual) { - CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives - }) + g.observe( + "verify_job_sum", + [](unsigned int actual) { + CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives + }, + concurrency::serial) .input_family("job_sum"); g.execute(); diff --git a/test/filter.cpp b/test/filter.cpp index 5e12f236..ba5593b3 100644 --- a/test/filter.cpp +++ b/test/filter.cpp @@ -85,14 +85,14 @@ namespace { TEST_CASE("Two predicates", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("evens_only", evens_only).input_family("num"_in("event")); + g.predicate("odds_only", odds_only).input_family("num"_in("event")); g.make(20u) - .observe("add_evens", &sum_numbers::add, concurrency::unlimited) + .observe("add_evens", &sum_numbers::add, concurrency::serial) .input_family("num"_in("event")) .when("evens_only"); g.make(25u) - .observe("add_odds", &sum_numbers::add, concurrency::unlimited) + .observe("add_odds", &sum_numbers::add, concurrency::serial) .input_family("num"_in("event")) .when("odds_only"); @@ -105,12 +105,10 @@ TEST_CASE("Two predicates", "[filtering]") TEST_CASE("Two predicates in series", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited) - .input_family("num") - .when("evens_only"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num").when("evens_only"); g.make(0u) - .observe("add", &sum_numbers::add, concurrency::unlimited) + .observe("add", &sum_numbers::add, concurrency::serial) .input_family("num") .when("odds_only"); @@ -122,10 +120,10 @@ TEST_CASE("Two predicates in series", "[filtering]") TEST_CASE("Two predicates in parallel", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", 
odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num"); g.make(0u) - .observe("add", &sum_numbers::add, concurrency::unlimited) + .observe("add", &sum_numbers::add, concurrency::serial) .input_family("num") .when("odds_only", "evens_only"); @@ -147,16 +145,14 @@ TEST_CASE("Three predicates in parallel", "[filtering]") framework_graph g{source{10u}}; for (auto const& [name, b, e] : configs) { - g.make(b, e) - .predicate(name, ¬_in_range::eval, concurrency::unlimited) - .input_family("num"); + g.make(b, e).predicate(name, ¬_in_range::eval).input_family("num"); } std::vector const predicate_names{ "exclude_0_to_4", "exclude_6_to_7", "exclude_gt_8"}; auto const expected_numbers = {4u, 5u, 7u}; g.make(expected_numbers) - .observe("collect", &collect_numbers::collect, concurrency::unlimited) + .observe("collect", &collect_numbers::collect, concurrency::serial) .input_family("num") .when(predicate_names); @@ -168,15 +164,15 @@ TEST_CASE("Three predicates in parallel", "[filtering]") TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num"); g.make(5 * 100) - .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::unlimited) + .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::serial) .input_family("num", "other_num") // <= Note input order .when("evens_only"); g.make(-5 * 100) - .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::unlimited) + .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::serial) .input_family("other_num", "num") // <= Note input order .when("odds_only"); diff --git a/test/fold.cpp b/test/fold.cpp index 6654b9f9..b9c7b42d 100644 --- a/test/fold.cpp +++ b/test/fold.cpp @@ -61,7 +61,6 @@ TEST_CASE("Different levels of fold", "[graph]") } }; - // framework_graph g{levels_to_process}; framework_graph g{levels_to_process}; g.fold("run_add", add, concurrency::unlimited, "run") @@ -74,15 +73,15 @@ TEST_CASE("Different levels of fold", "[graph]") .output_products("two_layer_job_sum"); g.observe( - "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::unlimited) + "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::serial) .input_family("run_sum"); g.observe( "verify_two_layer_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, - concurrency::unlimited) + concurrency::serial) .input_family("two_layer_job_sum"); g.observe( - "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) + "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::serial) .input_family("job_sum"); g.execute(); diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index c72d81e1..5f2343a3 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -102,25 +102,20 @@ TEST_CASE("Hierarchical nodes", "[graph]") { framework_graph g{levels_to_process}; - g.transform("get_the_time", strtime, concurrency::unlimited) - .input_family("time") - .when() - .output_products("strtime"); - 
g.transform("square", square, concurrency::unlimited) - .input_family("number") - .output_products("squared_number"); + g.transform("get_the_time", strtime).input_family("time").when().output_products("strtime"); + g.transform("square", square).input_family("number").output_products("squared_number"); g.fold("add", add, concurrency::unlimited, "run", 15u) .input_family("squared_number") .when() .output_products("added_data"); - g.transform("scale", scale, concurrency::unlimited) - .input_family("added_data") - .output_products("result"); - g.observe("print_result", print_result, concurrency::unlimited).input_family("result", "strtime"); + g.transform("scale", scale).input_family("added_data").output_products("result"); + g.observe("print_result", print_result).input_family("result", "strtime"); - g.make().output("save", &test::products_for_output::save).when(); + g.make() + .output("save", &test::products_for_output::save, concurrency::serial) + .when(); g.execute(); diff --git a/test/memory-checks/many_events.cpp b/test/memory-checks/many_events.cpp index f6175d50..4d54f7e9 100644 --- a/test/memory-checks/many_events.cpp +++ b/test/memory-checks/many_events.cpp @@ -25,8 +25,6 @@ int main() }; framework_graph g{levels_to_process}; - g.transform("pass_on", pass_on, concurrency::unlimited) - .input_family("number") - .output_products("different"); + g.transform("pass_on", pass_on).input_family("number").output_products("different"); g.execute(); } diff --git a/test/multiple_function_registration.cpp b/test/multiple_function_registration.cpp index 71f56403..1ba12a30 100644 --- a/test/multiple_function_registration.cpp +++ b/test/multiple_function_registration.cpp @@ -48,29 +48,29 @@ TEST_CASE("Call multiple functions", "[programming model]") SECTION("All free functions") { - g.transform("square_numbers", square_numbers, concurrency::unlimited) + g.transform("square_numbers", square_numbers) .input_family("numbers") .output_products("squared_numbers"); - g.transform("sum_numbers", sum_numbers, concurrency::unlimited) + g.transform("sum_numbers", sum_numbers) .input_family("squared_numbers") .output_products("summed_numbers"); - g.transform("sqrt_sum_numbers", sqrt_sum_numbers, concurrency::unlimited) + g.transform("sqrt_sum_numbers", sqrt_sum_numbers) .input_family("summed_numbers", "offset") .output_products("result"); } SECTION("Transforms, one from a class") { - g.transform("square_numbers", square_numbers, concurrency::unlimited) + g.transform("square_numbers", square_numbers) .input_family("numbers") .output_products("squared_numbers"); - g.transform("sum_numbers", sum_numbers, concurrency::unlimited) + g.transform("sum_numbers", sum_numbers) .input_family("squared_numbers") .output_products("summed_numbers"); g.make() - .transform("sqrt_sum", &A::sqrt_sum, concurrency::unlimited) + .transform("sqrt_sum", &A::sqrt_sum) .input_family("summed_numbers", "offset") .output_products("result"); } diff --git a/test/plugins/module.cpp b/test/plugins/module.cpp index de874118..beaaaf0d 100644 --- a/test/plugins/module.cpp +++ b/test/plugins/module.cpp @@ -5,14 +5,8 @@ using namespace phlex::experimental; -// TODO: Option to select which algorithm to run via configuration? 
- PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("add", test::add, concurrency::unlimited) - .input_family("i", "j") - .output_products("sum"); - m.observe( - "verify", [](int actual) { assert(actual == 0); }, concurrency::unlimited) - .input_family("sum"); + m.transform("add", test::add).input_family("i", "j").output_products("sum"); + m.observe("verify", [](int actual) { assert(actual == 0); }).input_family("sum"); } diff --git a/test/plugins/output.cpp b/test/plugins/output.cpp index b0d24fd7..d7e376b2 100644 --- a/test/plugins/output.cpp +++ b/test/plugins/output.cpp @@ -5,6 +5,5 @@ using namespace phlex::experimental::test; PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.make().output( - "save", &products_for_output::save, phlex::experimental::concurrency::unlimited); + m.make().output("save", &products_for_output::save); } diff --git a/test/unfold.cpp b/test/unfold.cpp index 2690b1bc..ab5d3550 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -103,7 +103,7 @@ TEST_CASE("Splitting the processing", "[graph]") g.fold("add", add, concurrency::unlimited, "event") .input_family("new_number") .output_products("sum1"); - g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"); + g.observe("check_sum", check_sum, concurrency::serial).input_family("sum1"); g.unfold( &iterate_through::predicate, &iterate_through::unfold, concurrency::unlimited, "lower2") @@ -112,7 +112,7 @@ TEST_CASE("Splitting the processing", "[graph]") g.fold("add_numbers", add_numbers, concurrency::unlimited, "event") .input_family("each_number") .output_products("sum2"); - g.observe("check_sum_same", check_sum_same, concurrency::unlimited).input_family("sum2"); + g.observe("check_sum_same", check_sum_same, concurrency::serial).input_family("sum2"); g.make().output( "save", &test::products_for_output::save, concurrency::serial); From 7e23995950008d8ba82e3262d42d3bcb4a2b1ad1 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Fri, 17 Oct 2025 09:58:08 -0500 Subject: [PATCH 2/6] [TEMPORARY] Reduce execution times for mock workflow --- test/mock-workflow/G4Stage1.libsonnet | 2 +- test/mock-workflow/G4Stage2.libsonnet | 4 ++-- test/mock-workflow/SinglesGen.libsonnet | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/mock-workflow/G4Stage1.libsonnet b/test/mock-workflow/G4Stage1.libsonnet index 34559327..6dddee17 100644 --- a/test/mock-workflow/G4Stage1.libsonnet +++ b/test/mock-workflow/G4Stage1.libsonnet @@ -3,7 +3,7 @@ local generators = import 'SinglesGen.libsonnet'; { largeant: { plugin: 'largeant', - duration_usec: 15662051, + duration_usec: 156, # Typical: 15662051 inputs: [f + "/MCTruths" for f in std.objectFields(generators)], outputs: ["ParticleAncestryMap", "Assns", "SimEnergyDeposits", "AuxDetHits", "MCParticles"], } diff --git a/test/mock-workflow/G4Stage2.libsonnet b/test/mock-workflow/G4Stage2.libsonnet index 89482c45..40d7720f 100644 --- a/test/mock-workflow/G4Stage2.libsonnet +++ b/test/mock-workflow/G4Stage2.libsonnet @@ -3,13 +3,13 @@ local g4stage1 = import 'G4Stage1.libsonnet'; { IonAndScint: { plugin: 'ion_and_scint', - duration_usec: 5457973, + duration_usec: 546, # Typical: 5457973 inputs: [f + "/SimEnergyDeposits" for f in std.objectFields(g4stage1)], outputs: ["SimEnergyDeposits", "SimEnergyDeposits_priorSCE"], }, PDFastSim: { plugin: 'pd_fast_sim', - duration_usec: 69, #69681950, + duration_usec: 69, # Typical: 69681950 inputs: ['SimEnergyDeposits_priorSCE'], outputs: ['SimPhotonLites', 'OpDetBacktrackerRecords'], } diff --git 
a/test/mock-workflow/SinglesGen.libsonnet b/test/mock-workflow/SinglesGen.libsonnet index e8d10977..b84df7b5 100644 --- a/test/mock-workflow/SinglesGen.libsonnet +++ b/test/mock-workflow/SinglesGen.libsonnet @@ -13,7 +13,7 @@ }, cosmicgenerator: { plugin: "MC_truth_algorithm", - duration_usec: 4926215, + duration_usec: 492, # Typical: 4926215 inputs: ["id"], outputs: ["MCTruths"] }, From 11bea81e0bcf499ed69b93b40314741623ee258c Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Fri, 17 Oct 2025 08:50:15 -0500 Subject: [PATCH 3/6] Require strict equality in tests --- test/cached_execution.cpp | 11 +++++------ test/hierarchical_nodes.cpp | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/test/cached_execution.cpp b/test/cached_execution.cpp index a6a5eea0..903abf9f 100644 --- a/test/cached_execution.cpp +++ b/test/cached_execution.cpp @@ -63,13 +63,12 @@ TEST_CASE("Cached function calls", "[data model]") g.execute(); - // FIXME: Need to improve the synchronization to supply strict equality - CHECK(g.execution_counts("A1") >= n_runs); - CHECK(g.execution_counts("A2") >= n_runs); - CHECK(g.execution_counts("A3") >= n_runs); + CHECK(g.execution_counts("A1") == n_runs); + CHECK(g.execution_counts("A2") == n_runs); + CHECK(g.execution_counts("A3") == n_runs); - CHECK(g.execution_counts("B1") >= n_runs * n_subruns); - CHECK(g.execution_counts("B2") >= n_runs * n_subruns); + CHECK(g.execution_counts("B1") == n_runs * n_subruns); + CHECK(g.execution_counts("B2") == n_runs * n_subruns); CHECK(g.execution_counts("C") == n_runs * n_subruns * n_events); } diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index 5f2343a3..9e6688e3 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -121,7 +121,7 @@ TEST_CASE("Hierarchical nodes", "[graph]") CHECK(g.execution_counts("square") == index_limit * number_limit); CHECK(g.execution_counts("add") == index_limit * number_limit); - CHECK(g.execution_counts("get_the_time") >= index_limit); + CHECK(g.execution_counts("get_the_time") == index_limit); CHECK(g.execution_counts("scale") == index_limit); CHECK(g.execution_counts("print_result") == index_limit); } From 23afaf0ca044f339435bbc329c8dcfe9753a19f1 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 11 Nov 2025 17:08:02 -0600 Subject: [PATCH 4/6] First test of repeaters --- test/CMakeLists.txt | 1 + test/repeater.cpp | 209 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 test/repeater.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7ef6f56b..3f5947f1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,6 +49,7 @@ add_catch_test(product_matcher LIBRARIES phlex::model) add_catch_test(product_store LIBRARIES phlex::core) add_catch_test(fold LIBRARIES phlex::core) add_catch_test(replicated LIBRARIES TBB::tbb phlex::utilities spdlog::spdlog) +add_catch_test(repeater LIBRARIES TBB::tbb spdlog::spdlog) add_catch_test(serializer LIBRARIES phlex::core TBB::tbb) add_catch_test(specified_label LIBRARIES phlex::core) add_catch_test(unfold LIBRARIES Boost::json phlex::core TBB::tbb) diff --git a/test/repeater.cpp b/test/repeater.cpp new file mode 100644 index 00000000..fddf7902 --- /dev/null +++ b/test/repeater.cpp @@ -0,0 +1,209 @@ +// ============================= // +// This test is used to determine whether "repeaters" can work. 
// +// // +// (1) input // +// | // +// (2) router // +// / \ // +// | \ // +// | \ // +// (3) I(run) I(spill) (4) // +// | /| // +// | / | // +// (5) expo / | // +// | / | // +// \ / | // +// (7) repeater number (6) // +// \ / // +// \ / // +// (8) consume // +// ============================== // + +#include "catch2/catch_test_macros.hpp" +#include "fmt/ranges.h" +#include "fmt/std.h" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include +#include + +using namespace oneapi::tbb; +using namespace spdlog; + +namespace { + + constexpr int num_runs = 5; + constexpr int messages_per_run = 10; + constexpr int num_messages = num_runs * messages_per_run; + + constexpr int spills_per_run = messages_per_run - 1; + constexpr int num_spills = num_runs * spills_per_run; + + using id_t = std::vector; + using product_t = double; + using product_ptr_t = std::shared_ptr; + + struct data_cell_id { + int msg_id; + id_t cell_id; + }; + + struct message { + data_cell_id id; + product_ptr_t product; + }; + + using repeater_node_input = std::tuple; + class repeater_node : public flow::composite_node> { + using base_t = flow::composite_node>; + using tagged_repeater_msg = flow::tagged_msg; + + struct cached_product { + product_ptr_t product{}; + concurrent_queue msg_ids{}; + }; + + using cached_products_t = concurrent_hash_map; + using accessor = cached_products_t::accessor; + using const_accessor = cached_products_t::const_accessor; + + public: + repeater_node(flow::graph& g) : + base_t{g}, + indexer_{g}, + repeater_{g, + flow::unlimited, + [this](tagged_repeater_msg const& tagged, auto& outputs) -> flow::continue_msg { + int key = -1; + if (tagged.tag() == 0ull) { + auto const& msg = tagged.cast_to(); + key = msg.id.cell_id[0]; + accessor a; + std::ignore = cached_products_.insert(a, key); + a->second.product = msg.product; + } else { + auto const [msg_id, cell_id] = tagged.cast_to(); + key = cell_id[0]; + accessor a; + std::ignore = cached_products_.insert(a, key); + a->second.msg_ids.push(msg_id); + } + + accessor ca; + bool const result [[maybe_unused]] = cached_products_.find(ca, key); + assert(result); + int new_msg_id{}; + auto& output = std::get<0>(outputs); + while (ca->second.product and ca->second.msg_ids.try_pop(new_msg_id)) { + output.try_put({.id = data_cell_id{.msg_id = new_msg_id, .cell_id = {key}}, + .product = ca->second.product}); + } + return {}; + }} + { + base_t::set_external_ports( + base_t::input_ports_type{input_port<0>(indexer_), input_port<1>(indexer_)}, + base_t::output_ports_type{output_port<0>(repeater_)}); + make_edge(indexer_, repeater_); + } + + private: + flow::indexer_node indexer_; + flow::multifunction_node> repeater_; + concurrent_hash_map cached_products_; // FIXME: int should be the ID itself + }; + +} + +TEST_CASE("Serialize functions based on resource", "[multithreading]") +{ + spdlog::flush_on(spdlog::level::trace); + + flow::graph g; + + // 1. input + int i{}; + flow::input_node src{g, [&i](flow_control& fc) -> data_cell_id { + if (i == num_messages) { + fc.stop(); + return {}; + } + + auto const [remainder, quotient] = std::div(i++, 10); + if (quotient == 0) { + return {.msg_id = i, .cell_id = {remainder}}; + } + return {.msg_id = i, .cell_id = {remainder, quotient}}; + }}; + + flow::broadcast_node run_index_set{g}; // 3. I(run) + flow::broadcast_node spill_index_set{g}; // 4. I(spill) + + // 2. 
router + flow::function_node router{ + g, + flow::unlimited, + [&run_index_set, &spill_index_set](data_cell_id const& id) -> flow::continue_msg { + auto const& [_, cell_id] = id; + if (cell_id.size() == 1ull) { + run_index_set.try_put(id); + } else { + spill_index_set.try_put(id); + } + return {}; + }}; + make_edge(src, router); + + // 5. exponent provider + std::atomic run_counter; + flow::function_node exponent_provider{ + g, flow::unlimited, [&run_counter](data_cell_id const& id) -> message { + ++run_counter; + return {.id = id, .product = std::make_shared(id.cell_id[0])}; + }}; + make_edge(run_index_set, exponent_provider); + + // 6. number provider + flow::function_node number_provider{ + g, flow::unlimited, [](data_cell_id const& id) -> message { + return {.id = id, .product = std::make_shared(id.cell_id[1])}; + }}; + make_edge(spill_index_set, number_provider); + + // 7. repeater + repeater_node repeater{g}; + make_edge(exponent_provider, input_port<0>(repeater)); + make_edge(spill_index_set, input_port<1>(repeater)); + + auto use_message_id = [](message const& msg) { return msg.id.msg_id; }; + flow::join_node, flow::tag_matching> join_layers{ + g, use_message_id, use_message_id}; + make_edge(repeater, input_port<0>(join_layers)); + make_edge(number_provider, input_port<1>(join_layers)); + + // 8. consume + std::atomic spill_counter; + flow::function_node> consume{ + g, + flow::unlimited, + [&spill_counter](std::tuple const& joined_data) -> flow::continue_msg { + auto const& [a, b] = joined_data; + assert(a.id.cell_id[0] == b.id.cell_id[0]); + ++spill_counter; + return {}; + }}; + make_edge(join_layers, consume); + + src.activate(); + g.wait_for_all(); + + CHECK(run_counter == num_runs); + CHECK(spill_counter == num_spills); +} From 5761524655d7d0188074921af5d123d635bb3cfb Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 13 Nov 2025 11:39:50 -0600 Subject: [PATCH 5/6] Repeaters supporting end tokens --- test/repeater.cpp | 130 ++++++++++++++++++++++++++++++---------------- 1 file changed, 86 insertions(+), 44 deletions(-) diff --git a/test/repeater.cpp b/test/repeater.cpp index fddf7902..75081d0e 100644 --- a/test/repeater.cpp +++ b/test/repeater.cpp @@ -1,23 +1,23 @@ -// ============================= // +// =========================================================== // // This test is used to determine whether "repeaters" can work. // -// // -// (1) input // -// | // -// (2) router // -// / \ // -// | \ // -// | \ // -// (3) I(run) I(spill) (4) // -// | /| // -// | / | // -// (5) expo / | // -// | / | // -// \ / | // -// (7) repeater number (6) // -// \ / // -// \ / // -// (8) consume // -// ============================== // +// // +// * source // +// *-. 
router // +// |\ \ // +// * | | I(run) // +// | * | E(run) // +// | | * I(spill) // +// | | |\ // +// | | |/ // +// | |/| // +// * | | exponent // +// |/ / // +// | * number // +// * | repeater // +// |/ // +// * consume // +// // +// ============================================================ // #include "catch2/catch_test_macros.hpp" #include "fmt/ranges.h" @@ -27,6 +27,7 @@ #include "oneapi/tbb/flow_graph.h" #include "spdlog/spdlog.h" +#include #include #include #include @@ -50,6 +51,11 @@ namespace { using product_t = double; using product_ptr_t = std::shared_ptr; + struct end_token { + int count; + id_t cell_id; + }; + struct data_cell_id { int msg_id; id_t cell_id; @@ -60,14 +66,16 @@ namespace { product_ptr_t product; }; - using repeater_node_input = std::tuple; + using repeater_node_input = std::tuple; class repeater_node : public flow::composite_node> { using base_t = flow::composite_node>; - using tagged_repeater_msg = flow::tagged_msg; + using tagged_repeater_msg = flow::tagged_msg; struct cached_product { product_ptr_t product{}; concurrent_queue msg_ids{}; + std::atomic counter; + std::atomic_flag flush_received{false}; }; using cached_products_t = concurrent_hash_map; @@ -81,41 +89,56 @@ namespace { repeater_{g, flow::unlimited, [this](tagged_repeater_msg const& tagged, auto& outputs) -> flow::continue_msg { + cached_product* entry{nullptr}; int key = -1; if (tagged.tag() == 0ull) { auto const& msg = tagged.cast_to(); key = msg.id.cell_id[0]; accessor a; std::ignore = cached_products_.insert(a, key); - a->second.product = msg.product; + entry = &a->second; + entry->product = msg.product; + } else if (tagged.tag() == 1ull) { + auto const [count, cell_id] = tagged.cast_to(); + key = cell_id[0]; + accessor a; + std::ignore = cached_products_.insert(a, key); + entry = &a->second; + entry->counter -= count; + std::ignore = entry->flush_received.test_and_set(); } else { auto const [msg_id, cell_id] = tagged.cast_to(); key = cell_id[0]; accessor a; std::ignore = cached_products_.insert(a, key); - a->second.msg_ids.push(msg_id); + entry = &a->second; + entry->msg_ids.push(msg_id); } - accessor ca; - bool const result [[maybe_unused]] = cached_products_.find(ca, key); - assert(result); int new_msg_id{}; auto& output = std::get<0>(outputs); - while (ca->second.product and ca->second.msg_ids.try_pop(new_msg_id)) { + while (entry->product and entry->msg_ids.try_pop(new_msg_id)) { output.try_put({.id = data_cell_id{.msg_id = new_msg_id, .cell_id = {key}}, - .product = ca->second.product}); + .product = entry->product}); + ++entry->counter; + } + + // Cleanup + if (entry->flush_received.test() and entry->counter == 0) { + cached_products_.erase(key); } return {}; }} { - base_t::set_external_ports( - base_t::input_ports_type{input_port<0>(indexer_), input_port<1>(indexer_)}, - base_t::output_ports_type{output_port<0>(repeater_)}); + base_t::set_external_ports(base_t::input_ports_type{input_port<0>(indexer_), + input_port<1>(indexer_), + input_port<2>(indexer_)}, + base_t::output_ports_type{output_port<0>(repeater_)}); make_edge(indexer_, repeater_); } private: - flow::indexer_node indexer_; + flow::indexer_node indexer_; flow::multifunction_node> repeater_; concurrent_hash_map cached_products_; // FIXME: int should be the ID itself }; @@ -130,32 +153,50 @@ TEST_CASE("Serialize functions based on resource", "[multithreading]") // 1. 
input int i{}; - flow::input_node src{g, [&i](flow_control& fc) -> data_cell_id { - if (i == num_messages) { + bool flush_run{false}; + flow::input_node src{g, + [&i, &flush_run](flow_control& fc) -> std::variant { + if (i == num_messages and not flush_run) { fc.stop(); return {}; } - auto const [remainder, quotient] = std::div(i++, 10); - if (quotient == 0) { - return {.msg_id = i, .cell_id = {remainder}}; + auto const [quotient, remainder] = std::div(i, 10); + if (remainder == 0 and flush_run) { + flush_run = false; + return end_token{.count = spills_per_run, .cell_id = {quotient}}; + } + + ++i; + if (remainder == 0) { + return data_cell_id{.msg_id = i, .cell_id = {quotient}}; } - return {.msg_id = i, .cell_id = {remainder, quotient}}; + flush_run = true; + return data_cell_id{.msg_id = i, .cell_id = {quotient, remainder}}; }}; flow::broadcast_node run_index_set{g}; // 3. I(run) flow::broadcast_node spill_index_set{g}; // 4. I(spill) + flow::broadcast_node end_run{g}; // 2. router - flow::function_node router{ + flow::function_node> router{ g, flow::unlimited, - [&run_index_set, &spill_index_set](data_cell_id const& id) -> flow::continue_msg { - auto const& [_, cell_id] = id; + [&run_index_set, &end_run, &spill_index_set]( + std::variant const& src_token) -> flow::continue_msg { + auto const* id = std::get_if(&src_token); + if (!id) { + auto const& end_run_token = std::get(src_token); + end_run.try_put(end_run_token); + return {}; + } + + auto const& [_, cell_id] = *id; if (cell_id.size() == 1ull) { - run_index_set.try_put(id); + run_index_set.try_put(*id); } else { - spill_index_set.try_put(id); + spill_index_set.try_put(*id); } return {}; }}; @@ -180,7 +221,8 @@ TEST_CASE("Serialize functions based on resource", "[multithreading]") // 7. repeater repeater_node repeater{g}; make_edge(exponent_provider, input_port<0>(repeater)); - make_edge(spill_index_set, input_port<1>(repeater)); + make_edge(end_run, input_port<1>(repeater)); + make_edge(spill_index_set, input_port<2>(repeater)); auto use_message_id = [](message const& msg) { return msg.id.msg_id; }; flow::join_node, flow::tag_matching> join_layers{ From 570295728454c3e2fbe7fea694c3e180acc6f68d Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Mon, 17 Nov 2025 16:44:09 -0600 Subject: [PATCH 6/6] Only drain the queue once the product is received --- test/repeater.cpp | 64 +++++++++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/test/repeater.cpp b/test/repeater.cpp index 75081d0e..0d453daf 100644 --- a/test/repeater.cpp +++ b/test/repeater.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -37,11 +38,12 @@ using namespace oneapi::tbb; using namespace spdlog; +using namespace std::chrono; namespace { constexpr int num_runs = 5; - constexpr int messages_per_run = 10; + constexpr int messages_per_run = 1000; constexpr int num_messages = num_runs * messages_per_run; constexpr int spills_per_run = messages_per_run - 1; @@ -91,36 +93,44 @@ namespace { [this](tagged_repeater_msg const& tagged, auto& outputs) -> flow::continue_msg { cached_product* entry{nullptr}; int key = -1; - if (tagged.tag() == 0ull) { + auto& output = std::get<0>(outputs); + + auto drain = [&output](int const key, cached_product* entry) -> int { + assert(entry->product); + int counter{}; + int new_msg_id{}; + while (entry->msg_ids.try_pop(new_msg_id)) { + output.try_put({.id = data_cell_id{.msg_id = new_msg_id, .cell_id = {key}}, + .product = entry->product}); + ++counter; + } + return 
counter; + }; + + if (tagged.is_a()) { auto const& msg = tagged.cast_to(); key = msg.id.cell_id[0]; - accessor a; - std::ignore = cached_products_.insert(a, key); - entry = &a->second; + entry = entry_for(key); entry->product = msg.product; - } else if (tagged.tag() == 1ull) { + entry->counter += drain(key, entry); + + } else if (tagged.is_a()) { auto const [count, cell_id] = tagged.cast_to(); key = cell_id[0]; - accessor a; - std::ignore = cached_products_.insert(a, key); - entry = &a->second; + entry = entry_for(key); entry->counter -= count; std::ignore = entry->flush_received.test_and_set(); } else { auto const [msg_id, cell_id] = tagged.cast_to(); key = cell_id[0]; - accessor a; - std::ignore = cached_products_.insert(a, key); - entry = &a->second; - entry->msg_ids.push(msg_id); - } - - int new_msg_id{}; - auto& output = std::get<0>(outputs); - while (entry->product and entry->msg_ids.try_pop(new_msg_id)) { - output.try_put({.id = data_cell_id{.msg_id = new_msg_id, .cell_id = {key}}, - .product = entry->product}); - ++entry->counter; + entry = entry_for(key); + if (entry->product) { + output.try_put({.id = data_cell_id{.msg_id = msg_id, .cell_id = {key}}, + .product = entry->product}); + ++entry->counter; + } else { + entry->msg_ids.push(msg_id); + } } // Cleanup @@ -138,6 +148,13 @@ namespace { } private: + cached_product* entry_for(int key) + { + accessor a; + std::ignore = cached_products_.insert(a, key); + return &a->second; + } + flow::indexer_node indexer_; flow::multifunction_node> repeater_; concurrent_hash_map cached_products_; // FIXME: int should be the ID itself @@ -161,7 +178,7 @@ TEST_CASE("Serialize functions based on resource", "[multithreading]") return {}; } - auto const [quotient, remainder] = std::div(i, 10); + auto const [quotient, remainder] = std::div(i, messages_per_run); if (remainder == 0 and flush_run) { flush_run = false; return end_token{.count = spills_per_run, .cell_id = {quotient}}; @@ -243,8 +260,11 @@ TEST_CASE("Serialize functions based on resource", "[multithreading]") }}; make_edge(join_layers, consume); + auto start_time = steady_clock::now(); src.activate(); g.wait_for_all(); + spdlog::info("Total execution time: {} microseconds", + duration_cast(steady_clock::now() - start_time).count()); CHECK(run_counter == num_runs); CHECK(spill_counter == num_spills);
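
A minimal, single-threaded sketch of the cache-and-drain behaviour that the repeater converges on in PATCH 6: spill messages seen before the run-level product are queued, the queue is drained exactly once when the product arrives, and any later message is forwarded immediately without touching the queue. The names below are illustrative only; plain standard containers stand in for the tbb::concurrent_hash_map and tbb::concurrent_queue used in the test, and no Phlex or TBB headers are required.

#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <queue>
#include <vector>

// One cache slot per run: the (eventual) run-level product plus the spill
// message ids that arrived before it.
struct pending_entry {
  std::shared_ptr<double> product;
  std::queue<int> waiting_msg_ids;
};

class repeater_sketch {
public:
  // Run-level product arrives: cache it and drain everything queued so far.
  std::vector<int> on_product(int run_key, std::shared_ptr<double> product)
  {
    auto& entry = cache_[run_key];
    entry.product = std::move(product);
    std::vector<int> forwarded;
    while (!entry.waiting_msg_ids.empty()) {
      forwarded.push_back(entry.waiting_msg_ids.front());
      entry.waiting_msg_ids.pop();
    }
    return forwarded; // each id would be re-emitted downstream with entry.product
  }

  // Spill-level message arrives: forward at once if the product is cached,
  // otherwise queue the message id until on_product() drains it.
  std::optional<int> on_message(int run_key, int msg_id)
  {
    auto& entry = cache_[run_key];
    if (entry.product) {
      return msg_id;
    }
    entry.waiting_msg_ids.push(msg_id);
    return std::nullopt;
  }

private:
  std::map<int, pending_entry> cache_;
};

int main()
{
  repeater_sketch r;
  assert(!r.on_message(0, 1));                                        // queued: no product yet
  assert(r.on_product(0, std::make_shared<double>(2.0)).size() == 1); // drained exactly once
  assert(r.on_message(0, 2) == 2);                                    // forwarded immediately
  return 0;
}

Forwarding as soon as the product is present, instead of always pushing onto the queue and then draining, is exactly what the final commit changes: the queue is only touched while the product is still missing, so it is drained at most once per run key.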