diff --git a/Framework/Core/include/Framework/DataProcessingHelpers.h b/Framework/Core/include/Framework/DataProcessingHelpers.h index be02aae5d2f69..6c14d1f7d0c3d 100644 --- a/Framework/Core/include/Framework/DataProcessingHelpers.h +++ b/Framework/Core/include/Framework/DataProcessingHelpers.h @@ -53,8 +53,8 @@ struct DataProcessingHelpers { /// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies); /// Helper to route messages for forwarding - static std::vector routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector& currentSetOfInputs, - TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume); + static std::vector routeForwardedMessages(FairMQDeviceProxy& proxy, std::vector& currentSetOfInputs, + bool copy, bool consume); }; } // namespace o2::framework #endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_ diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 40f1061e60332..63c333561f24e 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -588,10 +588,12 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void { static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { auto& proxy = registry.get(); - auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy, consume); O2_SIGNPOST_ID_GENERATE(sid, forwarding); - O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size()); + O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", + slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); + auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume); + for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { if (forwardedParts[fi].Size() == 0) { continue; diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 9c53bbf8b2c10..ad2263e480dd5 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -221,129 +221,99 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi } } -static auto toBeForwardedHeader = [](void* header) -> bool { - // If is now possible that the record is not complete when - // we forward it, because of a custom completion policy. - // this means that we need to skip the empty entries in the - // record for being forwarded. - if (header == nullptr) { - return false; - } - auto dh = o2::header::get(header); - if (!dh) { - return false; - } - bool retval = !o2::header::get(header) && - !o2::header::get(header) && - o2::header::get(header); - // DataHeader is there. Complain if we have unexpected headers present / missing - if (!retval) { - LOGP(error, "Dropping data because of malformed header structure"); - } - return retval; -}; - -static auto toBeforwardedMessageSet = [](std::vector& cachedForwardingChoices, - FairMQDeviceProxy& proxy, - std::unique_ptr& header, - std::unique_ptr& payload, - size_t total, - bool consume) { - if (header.get() == nullptr) { - // Missing an header is not an error anymore. - // it simply means that we did not receive the - // given input, but we were asked to - // consume existing, so we skip it. - return false; - } - if (payload.get() == nullptr && consume == true) { - // If the payload is not there, it means we already - // processed it with ConsumeExisiting. Therefore we - // need to do something only if this is the last consume. - header.reset(nullptr); - return false; - } - - auto fdph = o2::header::get(header->GetData()); - if (fdph == nullptr) { - LOG(error) << "Data is missing DataProcessingHeader"; - return false; - } - auto fdh = o2::header::get(header->GetData()); - if (fdh == nullptr) { - LOG(error) << "Data is missing DataHeader"; - return false; - } - - // We need to find the forward route only for the first - // part of a split payload. All the others will use the same. - // but always check if we have a sequence of multiple payloads - if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) { - proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime); - } - return cachedForwardingChoices.empty() == false; -}; - -std::vector DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector& currentSetOfInputs, - TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume) +auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, + std::vector& currentSetOfInputs, + const bool copyByDefault, bool consume) -> std::vector { // we collect all messages per forward in a map and send them together std::vector forwardedParts; forwardedParts.resize(proxy.getNumForwards()); - std::vector cachedForwardingChoices{}; + std::vector forwardingChoices{}; O2_SIGNPOST_ID_GENERATE(sid, forwarding); - O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", - slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { auto& messageSet = currentSetOfInputs[ii]; - // In case the messageSet is empty, there is nothing to be done. - if (messageSet.size() == 0) { - continue; - } - if (!toBeForwardedHeader(messageSet.header(0)->GetData())) { - continue; - } - cachedForwardingChoices.clear(); - for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) { - auto& messageSet = currentSetOfInputs[ii]; + for (size_t pi = 0; pi < messageSet.size(); ++pi) { auto& header = messageSet.header(pi); + + // If is now possible that the record is not complete when + // we forward it, because of a custom completion policy. + // this means that we need to skip the empty entries in the + // record for being forwarded. + if (header->GetData() == nullptr) { + continue; + } + auto dih = o2::header::get(header->GetData()); + if (dih) { + continue; + } + auto sih = o2::header::get(header->GetData()); + if (sih) { + continue; + } + + auto dph = o2::header::get(header->GetData()); + auto dh = o2::header::get(header->GetData()); + + if (dph == nullptr || dh == nullptr) { + // Complain only if this is not an out-of-band message + LOGP(error, "Data is missing {}{}{}", + dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : ""); + continue; + } + auto& payload = messageSet.payload(pi); - auto total = messageSet.getNumberOfPayloads(pi); - if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) { + if (payload.get() == nullptr && consume == true) { + // If the payload is not there, it means we already + // processed it with ConsumeExisiting. Therefore we + // need to do something only if this is the last consume. + header.reset(nullptr); continue; } - // In case of more than one forward route, we need to copy the message. - // This will eventually use the same mamory if running with the same backend. - if (cachedForwardingChoices.size() > 1) { - copy = true; + // We need to find the forward route only for the first + // part of a split payload. All the others will use the same. + // Therefore, we reset and recompute the forwarding choice: + // + // - If this is the first payload of a [header0][payload0][header0][payload1] sequence, + // which is actually always created and handled together + // - If the message is not a multipart (splitPayloadParts 0) or has only one part + // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore + // we will already use the same choice in the for loop below. + if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) { + forwardingChoices.clear(); + proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime); } - auto* dh = o2::header::get(header->GetData()); - auto* dph = o2::header::get(header->GetData()); - if (copy) { - for (auto& cachedForwardingChoice : cachedForwardingChoices) { + if (forwardingChoices.empty()) { + // Nothing to forward go to the next messageset + continue; + } + + // In case of more than one forward route, we need to copy the message. + // This will eventually use the same memory if running with the same backend. + if (copyByDefault || forwardingChoices.size() > 1) { + for (auto& choice : forwardingChoices) { auto&& newHeader = header->GetTransport()->CreateMessage(); O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.", - fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value); + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value); newHeader->Copy(*header); - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader)); + forwardedParts[choice.value].AddPart(std::move(newHeader)); for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { auto&& newPayload = header->GetTransport()->CreateMessage(); newPayload->Copy(*messageSet.payload(pi, payloadIndex)); - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload)); + forwardedParts[choice.value].AddPart(std::move(newPayload)); } } } else { O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.", - fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value); - forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value); + forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); + forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); } } } diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index b1f42fb0398ca..7ddbc831edad2 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -15,8 +15,6 @@ #include "Framework/DataProcessingHelpers.h" #include "Framework/SourceInfoHeader.h" #include "Framework/DomainInfoHeader.h" -#include "Framework/ServiceRegistry.h" -#include "Framework/ServiceRegistryRef.h" #include "Framework/Signpost.h" #include "Framework/MessageSet.h" #include "Framework/FairMQDeviceProxy.h" @@ -45,11 +43,9 @@ TEST_CASE("ForwardInputsEmpty") bool copyByDefault = true; FairMQDeviceProxy proxy; - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {1}}; std::vector currentSetOfInputs; - TimesliceSlot slot{0}; - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.empty()); } @@ -88,7 +84,6 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -100,9 +95,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 2); // Two messages for that route } @@ -141,7 +134,6 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -154,9 +146,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, true); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, true); REQUIRE(result.size() == 1); REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed. } @@ -199,7 +189,6 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -212,11 +201,10 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route - REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2 + REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen. // Correct behavior below: // REQUIRE(result[0].Size() == 2); // REQUIRE(o2::header::get(result[0].At(0)->GetData()) == nullptr); @@ -260,7 +248,6 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -273,9 +260,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong // FIXME: actually correct behavior below @@ -329,7 +314,6 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -341,9 +325,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // Two messages per route REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters @@ -395,7 +377,6 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -407,9 +388,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward REQUIRE(result[1].Size() == 2); // @@ -468,7 +447,6 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); @@ -488,9 +466,7 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") currentSetOfInputs.emplace_back(std::move(messageSet2)); REQUIRE(currentSetOfInputs.size() == 2); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // REQUIRE(result[1].Size() == 2); // @@ -542,7 +518,6 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -554,9 +529,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 0); // Two messages per route REQUIRE(result[1].Size() == 2); // Two messages per route @@ -615,7 +588,6 @@ TEST_CASE("ForwardInputsSplitPayload") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -639,13 +611,10 @@ TEST_CASE("ForwardInputsSplitPayload") REQUIRE(messageSet.size() == 2); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes CHECK(result[0].Size() == 2); // No messages on this route - CHECK(result[1].Size() == 5); // FIXME: Multipart matching has side effects also for the elements - // CHECK(result[1].Size() == 3); // FIXME: the correct forwarding is that only the multipart goes to the same route + CHECK(result[1].Size() == 3); } TEST_CASE("ForwardInputEOSSingleRoute") @@ -677,7 +646,6 @@ TEST_CASE("ForwardInputEOSSingleRoute") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -689,9 +657,7 @@ TEST_CASE("ForwardInputEOSSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded } @@ -725,7 +691,6 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") proxy.bind({}, {}, routes, findChannelByName, nullptr); - TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}}; std::vector currentSetOfInputs; MessageSet messageSet; @@ -737,9 +702,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - TimesliceSlot slot{0}; - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded }