diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index cefb903c29895..bfe481c86a7c7 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -105,6 +105,7 @@ o2_add_library(Framework src/StringContext.cxx src/LogParsingHelpers.cxx src/MessageContext.cxx + src/MessageSet.cxx src/Metric2DViewIndex.cxx src/SimpleOptionsRetriever.cxx src/O2ControlHelpers.cxx diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 012b909096317..6256e38c486ca 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -104,6 +104,8 @@ class DataRelayer TimesliceIndex&, ServiceRegistryRef); + ~DataRelayer(); + /// This invokes the appropriate `InputRoute::danglingChecker` on every /// entry in the cache and if it returns true, it creates a new /// cache entry by invoking the associated `InputRoute::expirationHandler`. diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index e7ae70e0ea2e5..bf3c20d04197b 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -1,4 +1,4 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. // See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. // All rights not expressly granted are reserved. // @@ -8,19 +8,47 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_MESSAGESET_H -#define FRAMEWORK_MESSAGESET_H +#ifndef O2_FRAMEWORK_MESSAGESET_H_ +#define O2_FRAMEWORK_MESSAGESET_H_ #include "Framework/PartRef.h" +#include +#include #include #include #include +#include -namespace o2 -{ -namespace framework +namespace o2::framework { +template +concept MessageSetFiller = requires(T t, size_t n) { + { t(n) } -> std::same_as; +}; + +template +concept MessageSetCounter = requires(fair::mq::MessagePtr && (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) { + { t(std::forward(ref)) } -> std::same_as; +}; + +template +concept MessageSetDisposer = requires(void (*t)(fair::mq::MessagePtr&&), fair::mq::MessagePtr&& ref) { + { t(std::forward(ref)) } -> std::same_as; +}; + +// Sometimes we fill from a PartRef, e.g. (header, payload pair) +// So we need some special code for it. +template +concept PartRefFiller = requires(T t, size_t n) { + { t(n) } -> std::same_as; +}; + +template +concept PartRefCounter = requires(PartRef && (*t)(PartRef&&), PartRef&& ref) { + { t(std::forward(ref)) } -> std::same_as; +}; + /// A set of inflight messages. /// The messages are stored in a linear vector. Originally, an O2 message was /// comprised of a header-payload pair which makes indexing of pairs in the @@ -31,20 +59,30 @@ namespace framework /// O2 message model. For this purpose, also the pair index is filled and can /// be used to access header and payload associated with a pair struct MessageSet { + static auto passthrough(fair::mq::MessagePtr&& ref) -> fair::mq::MessagePtr&& { return std::forward(ref); } + static auto passthrough_partref(o2::framework::PartRef&& ref) -> o2::framework::PartRef&& { return std::forward(ref); } + // Use this when you want to delete the messages on clear. + static auto destroy_message(fair::mq::MessagePtr&& ref) -> void + { + fair::mq::MessagePtr toDelete(nullptr); + ref.swap(toDelete); + } + static auto noop(fair::mq::MessagePtr&& ref) -> void {} + static auto assert_empty(fair::mq::MessagePtr&& ref) -> void { assert(ref.get() == nullptr); } + static auto enforce_empty(fair::mq::MessagePtr&& ref) -> void; + struct Index { - Index(size_t p, size_t s) : position(p), size(s) {} size_t position = 0; size_t size = 0; }; // linear storage of messages - std::vector messages; + std::vector> messages; // message map describes O2 messages consisting of a header message and // payload message(s), index describes position in the linear storage std::vector messageMap; // pair map describes all messages in one sequence of header-payload pairs and // where in the message index the associated header and payload can be found struct PairMapping { - PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {} // O2 message where the pair is located in size_t partIndex = 0; // payload index within the O2 message @@ -57,17 +95,44 @@ struct MessageSet { { } - template - MessageSet(F getter, size_t size) + // Allow creating a message set via a getter. + // The counting function will be invoked only + // once per message. If you want to augment a + // MessageSet use the merge method. + MessageSet(MessageSetFiller auto getter, size_t size, MessageSetCounter auto counter) + : messages(), messageMap(), pairMap() + { + messages.reserve(size); + pairMap.reserve(size - 1); + messageMap.emplace_back(Index{.position = 0, .size = size - 1}); + for (size_t i = 0; i < size; ++i) { + if (i > 0) { + pairMap.emplace_back(0, i - 1); + } + messages.emplace_back(std::move(counter(getter(i)))); + } + } + + MessageSet(PartRefFiller auto filler, size_t nPartRef, PartRefCounter auto counter) : messages(), messageMap(), pairMap() { - add(std::forward(getter), size); + messages.reserve(2 * nPartRef); // Beacause messages contains all the messages + messageMap.reserve(nPartRef); // Because the message map tracks how many (header, payload0, payload1 ...) there are + pairMap.reserve(2 * nPartRef - 1); // Because pairMap tracks (header, payload0), (header, payload1), etc. + + for (size_t i = 0; i < nPartRef; ++i) { + pairMap.emplace_back(PairMapping{.partIndex = messageMap.size(), .payloadIndex = 0}); // Because this is the first one + messageMap.emplace_back(Index{.position = i, .size = 1}); // Because a PartRef only has 2 messages (and 1 payload) + o2::framework::PartRef ref = counter(filler(i)); + messages.emplace_back(std::move(ref.header)); + messages.emplace_back(std::move(ref.payload)); + } } MessageSet(MessageSet&& other) : messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap)) { - other.clear(); + other.clear(MessageSet::noop); } MessageSet& operator=(MessageSet&& other) @@ -78,100 +143,91 @@ struct MessageSet { messages = std::move(other.messages); messageMap = std::move(other.messageMap); pairMap = std::move(other.pairMap); - other.clear(); + other.clear(MessageSet::noop); return *this; } + ~MessageSet(); + /// get number of in-flight O2 messages - size_t size() const + [[nodiscard]] size_t size() const { return messageMap.size(); } /// get number of header-payload pairs - size_t getNumberOfPairs() const + [[nodiscard]] size_t getNumberOfPairs() const { return pairMap.size(); } /// get number of payloads for an in-flight message - size_t getNumberOfPayloads(size_t mi) const + [[nodiscard]] size_t getNumberOfPayloads(size_t mi) const { return messageMap[mi].size; } /// clear the set - void clear() + void clear(MessageSetDisposer auto dispose) { + for (auto& message : messages) { + dispose(std::move(message)); + } messages.clear(); messageMap.clear(); pairMap.clear(); } - // this is more or less legacy - // PartRef has been earlier used to store fixed header-payload pairs - // reset the set and store content of the part ref - void reset(PartRef&& ref) - { - clear(); - add(std::move(ref)); - } - - // this is more or less legacy - // PartRef has been earlier used to store fixed header-payload pairs - // add content of the part ref - void add(PartRef&& ref) - { - pairMap.emplace_back(messageMap.size(), 0); - messageMap.emplace_back(messages.size(), 1); - messages.emplace_back(std::move(ref.header)); - messages.emplace_back(std::move(ref.payload)); - } - - /// add an O2 message - template - void add(F getter, size_t size) + /// Add messages in bulk. We are guaranteed that this + /// function is executed only once for each incoming message + /// so it can be used to trigger the early forwarding. + /// + void merge(MessageSet&& other) { auto partid = messageMap.size(); - messageMap.emplace_back(messages.size(), size - 1); - for (size_t i = 0; i < size; ++i) { + messageMap.emplace_back(messages.size(), other.messages.size() - 1); + for (size_t i = 0; i < other.messages.size(); ++i) { if (i > 0) { pairMap.emplace_back(partid, i - 1); } - messages.emplace_back(std::move(getter(i))); + messages.emplace_back(std::move(other.messages[i])); } + // Every message should be removed once the MessageSet is + // merged. + other.clear(MessageSet::assert_empty); } - fair::mq::MessagePtr& header(size_t partIndex) + // This should really be used to give ownership to something else. + [[nodiscard]] std::unique_ptr extractHeader(size_t partIndex) { - return messages[messageMap[partIndex].position]; + return std::move(messages[messageMap[partIndex].position]); } - fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0) + [[nodiscard]] std::unique_ptr extractPayload(size_t partIndex, size_t payloadIndex = 0) { assert(partIndex < messageMap.size()); assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); - return messages[messageMap[partIndex].position + payloadIndex + 1]; + return std::move(messages[messageMap[partIndex].position + payloadIndex + 1]); } - fair::mq::MessagePtr const& header(size_t partIndex) const + [[nodiscard]] std::unique_ptr const& header(size_t partIndex) const { return messages[messageMap[partIndex].position]; } - fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const + [[nodiscard]] std::unique_ptr const& payload(size_t partIndex, size_t payloadIndex = 0) const { assert(partIndex < messageMap.size()); assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); return messages[messageMap[partIndex].position + payloadIndex + 1]; } - fair::mq::MessagePtr const& associatedHeader(size_t pos) const + [[nodiscard]] std::unique_ptr const& associatedHeader(size_t pos) const { return messages[messageMap[pairMap[pos].partIndex].position]; } - fair::mq::MessagePtr const& associatedPayload(size_t pos) const + [[nodiscard]] std::unique_ptr const& associatedPayload(size_t pos) const { auto partIndex = pairMap[pos].partIndex; auto payloadIndex = pairMap[pos].payloadIndex; @@ -179,6 +235,6 @@ struct MessageSet { } }; -} // namespace framework -} // namespace o2 -#endif // FRAMEWORK_MESSAGESET_H +} // namespace o2::framework + +#endif // O2_FRAMEWORK_MESSAGESET_H_ diff --git a/Framework/Core/include/Framework/PartRef.h b/Framework/Core/include/Framework/PartRef.h index 91fb4d857102e..6200de10667d6 100644 --- a/Framework/Core/include/Framework/PartRef.h +++ b/Framework/Core/include/Framework/PartRef.h @@ -8,16 +8,13 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_PARTREF_H -#define FRAMEWORK_PARTREF_H +#ifndef O2_FRAMEWORK_PARTREF_H_ +#define O2_FRAMEWORK_PARTREF_H_ #include - #include -namespace o2 -{ -namespace framework +namespace o2::framework { /// Reference to an inflight part. @@ -26,6 +23,6 @@ struct PartRef { std::unique_ptr payload; }; -} // namespace framework -} // namespace o2 +} // namespace o2::framework + #endif // FRAMEWORK_PARTREF_H diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3b430378dc0b0..edc80b21a797c 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -581,8 +581,8 @@ static auto toBeForwardedHeader = [](void* header) -> bool { static auto toBeforwardedMessageSet = [](std::vector& cachedForwardingChoices, FairMQDeviceProxy& proxy, - std::unique_ptr& header, - std::unique_ptr& payload, + std::unique_ptr const& header, + std::unique_ptr const& payload, size_t total, bool consume) { if (header.get() == nullptr) { @@ -596,7 +596,6 @@ static auto toBeforwardedMessageSet = [](std::vector& cachedForwar // If the payload is not there, it means we already // processed it with ConsumeExisiting. Therefore we // need to do something only if this is the last consume. - header.reset(nullptr); return false; } @@ -681,16 +680,20 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) { auto& messageSet = currentSetOfInputs[ii]; - auto& header = messageSet.header(pi); - auto& payload = messageSet.payload(pi); + auto const& header = messageSet.header(pi); + auto const& payload = messageSet.payload(pi); auto total = messageSet.getNumberOfPayloads(pi); + // Already forwarded. Skip the rest. if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) { + if (payload->GetData() == nullptr && consume) { + std::unique_ptr header{messageSet.extractHeader(pi)}; + } continue; } // In case of more than one forward route, we need to copy the message. - // This will eventually use the same mamory if running with the same backend. + // This will eventually use the same memory if running with the same backend. if (cachedForwardingChoices.size() > 1) { copy = true; } @@ -714,10 +717,11 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, } else { O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.", fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value); - forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.extractHeader(pi))); for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.extractPayload(pi, payloadIndex))); } + messageSet.clear(MessageSet::assert_empty); } } } @@ -1829,7 +1833,6 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& // This is the same id as the upper level function, so we get the events // associated with the same interval. We will simply use "handle_data" as // the category. - O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); // This is how we validate inputs. I.e. we try to enforce the O2 Data model // and we do a few stats. We bind parts as a lambda captured variable, rather @@ -2182,8 +2185,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v }; #if __has_include() auto refCountGetter = [¤tSetOfInputs](size_t idx) -> int { - auto& header = static_cast(*currentSetOfInputs[idx].header(0)); - return header.GetRefCount(); + return static_cast(*currentSetOfInputs[idx].header(0)).GetRefCount(); }; #else std::function refCountGetter = nullptr; @@ -2244,7 +2246,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v continue; } // This will hopefully delete the message. - currentSetOfInputs[ii].clear(); + currentSetOfInputs[ii].clear(MessageSet::destroy_message); } }; @@ -2367,6 +2369,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str()); if (action.op == CompletionPolicy::CompletionOp::Wait) { O2_SIGNPOST_END(device, aid, "device", "Waiting for more data."); + if (spec.forwards.empty() == false && context.canForwardEarly) { + auto& timesliceIndex = ref.get(); + forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, false); + } continue; } @@ -2551,6 +2557,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v DataProcessingHelpers::switchState(ref, StreamingState::Idle); } + // If messages got up to here, it means they are transient ones which need to be + // destroyed. Cleaning them up. + for (auto &set : currentSetOfInputs) { + set.clear(MessageSet::destroy_message); + } return true; } diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 06e920112649e..a62958525f184 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -9,10 +9,11 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/DeviceState.h" -#include "Framework/RootSerializationSupport.h" #include "Framework/DataRelayer.h" #include "Framework/DataProcessingStats.h" #include "Framework/DriverConfig.h" +#include "Headers/DataHeaderHelpers.h" +#include "Framework/Formatters.h" #include "Framework/CompilerBuiltins.h" #include "Framework/DataDescriptorMatcher.h" @@ -29,22 +30,18 @@ #include "Framework/RoutingIndices.h" #include "Framework/VariableContextHelpers.h" #include "Framework/FairMQDeviceProxy.h" -#include "DataProcessingStatus.h" #include "DataRelayerHelpers.h" #include "InputRouteHelpers.h" #include "Framework/LifetimeHelpers.h" -#include "Framework/CommonServices.h" #include "Framework/DataProcessingStates.h" -#include "Framework/DataTakingContext.h" #include "Framework/DefaultsHelpers.h" - -#include "Headers/DataHeaderHelpers.h" -#include "Framework/Formatters.h" +#include "Framework/Signpost.h" #include #include #include +#include #include #include #if __has_include() @@ -108,6 +105,16 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy, states.processCommandQueue(); } +DataRelayer::~DataRelayer() +{ + // Clear everything in the cache, checking that there is no + // pending messages. If there is, then there is most likely + // an issue on the onDrop. + for (auto& set : mCache) { + set.clear(MessageSet::enforce_empty); + } +} + TimesliceId DataRelayer::getTimesliceForSlot(TimesliceSlot slot) { std::scoped_lock lock(mMutex); @@ -172,7 +179,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector 0 && part.header(0) != nullptr) { headerPresent++; continue; @@ -232,15 +239,19 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector PartRef { + assert(expirator.handler); + PartRef newRef; + expirator.handler(services, newRef, variables); + return newRef; + }; + part = MessageSet(filler, 1, MessageSet::passthrough_partref); activity.expiredSlots++; mTimesliceIndex.markAsDirty(slot, true); - assert(part.header(0) != nullptr); - assert(part.payload(0) != nullptr); + assert(part.header(0).get() != nullptr); + assert(part.payload(0).get() != nullptr); } } LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}", @@ -422,7 +433,7 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop) // will be ignored. assert(numInputTypes * slot.index < cache.size()); for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) { - cache[ai].clear(); + cache[ai].clear(MessageSet::destroy_message); cachedStateMetrics[ai] = CacheEntryStatus::EMPTY; } }; @@ -490,7 +501,7 @@ DataRelayer::RelayChoice &nPayloads, &cache = mCache, &services = mContext, - numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t { + numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) mutable -> size_t { O2_SIGNPOST_ID_GENERATE(aid, data_relayer); O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s", fmt::format("{:x}", *o2::header::get(messages[0]->GetData())).c_str(), @@ -518,7 +529,11 @@ DataRelayer::RelayChoice mi += nPayloads; continue; } - target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1); + auto filler = [&messages, &mi](size_t i) mutable -> fair::mq::MessagePtr { + return std::move(messages[mi + i]); + }; + MessageSet inputs(filler, nPayloads + 1, MessageSet::passthrough); + target.merge(std::move(inputs)); mi += nPayloads; saved += nPayloads; } @@ -867,11 +882,6 @@ std::vector DataRelayer::consumeAllInputsForTimeslice auto& cache = mCache; auto& index = mTimesliceIndex; - // Nothing to see here, this is just to make the outer loop more understandable. - auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) { - return; - }; - // We move ownership so that the cache can be reused once the computation is // finished. We mark the given cache slot invalid, so that it can be reused // This means we can still handle old messages if there is still space in the @@ -895,14 +905,12 @@ std::vector DataRelayer::consumeAllInputsForTimeslice // FIXME: what happens when we have enough timeslices to hit the invalid one? auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) { for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) { - assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; })); - cache[ai].clear(); + cache[ai].clear(MessageSet::enforce_empty); } index.markAsInvalid(s); }; // Outer loop here. - jumpToCacheEntryAssociatedWith(slot); for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) { moveHeaderPayloadToOutput(slot, ai); } @@ -919,36 +927,23 @@ std::vector DataRelayer::consumeExistingInputsForTime // State of the computation std::vector messages(numInputTypes); auto& cache = mCache; - auto& index = mTimesliceIndex; - - // Nothing to see here, this is just to make the outer loop more understandable. - auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) { - return; - }; // We move ownership so that the cache can be reused once the computation is // finished. We mark the given cache slot invalid, so that it can be reused // This means we can still handle old messages if there is still space in the // cache where to put them. - auto copyHeaderPayloadToOutput = [&messages, - &cachedStateMetrics = mCachedStateMetrics, - &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) { - auto cacheId = s.index * numInputTypes + arg; - cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; - // TODO: in the original implementation of the cache, there have been only two messages per entry, - // check if the 2 above corresponds to the number of messages. - for (size_t pi = 0; pi < cache[cacheId].size(); pi++) { - auto& header = cache[cacheId].header(pi); + for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) { + auto cacheId = slot.index * numInputTypes + ai; + auto size = 2 * cache[cacheId].size(); + mCachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; + + auto extractFromSlot = [s = slot, arg = ai, &cachedStateMetrics = mCachedStateMetrics, &cache, cacheId](size_t pi) -> PartRef { + auto const& header = cache[cacheId].header(pi); auto&& newHeader = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); - messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))}); - } - }; - - // Outer loop here. - jumpToCacheEntryAssociatedWith(slot); - for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) { - copyHeaderPayloadToOutput(slot, ai); + return std::move(PartRef{std::move(newHeader), cache[cacheId].extractPayload(pi)}); + }; + messages[ai] = std::move(MessageSet(extractFromSlot, size, MessageSet::passthrough_partref)); } return std::move(messages); @@ -959,7 +954,7 @@ void DataRelayer::clear() std::scoped_lock lock(mMutex); for (auto& cache : mCache) { - cache.clear(); + cache.clear(MessageSet::destroy_message); } for (size_t s = 0; s < mTimesliceIndex.size(); ++s) { mTimesliceIndex.markAsInvalid(TimesliceSlot{s}); diff --git a/Framework/Core/src/MessageSet.cxx b/Framework/Core/src/MessageSet.cxx new file mode 100644 index 0000000000000..5d1b0b7e6791c --- /dev/null +++ b/Framework/Core/src/MessageSet.cxx @@ -0,0 +1,31 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// +#include "Framework/MessageSet.h" +#include "Framework/Logger.h" +#include "Framework/RuntimeError.h" + +namespace o2::framework +{ +MessageSet::~MessageSet() +{ + assert(messages.empty()); + if (!messages.empty()) { + LOGP(fatal, "MessageSet should be cleared before being destroyed."); + } +} +auto MessageSet::enforce_empty(fair::mq::MessagePtr&& ref) -> void +{ + if (ref.get() != nullptr) { + throw o2::framework::runtime_error("MessageSet not empty"); + } +} +} // namespace o2::framework diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 7d5a3ded88e16..fca23e51dd60b 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -24,6 +24,7 @@ #include "Framework/DataProcessingHeader.h" #include "Framework/ServiceRegistryHelpers.h" #include "Framework/WorkflowSpec.h" +#include #include #include #include @@ -116,6 +117,7 @@ TEST_CASE("DataRelayer") // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); REQUIRE(result.at(0).size() == 1); + result.at(0).clear(MessageSet::noop); } // @@ -166,6 +168,7 @@ TEST_CASE("DataRelayer") // one MessageSet with one PartRef with header and payload REQUIRE(result.size() == 1); REQUIRE(result.at(0).size() == 1); + result.at(0).clear(MessageSet::noop); } // This test a more complicated set of inputs, and verifies that data is @@ -247,6 +250,8 @@ TEST_CASE("DataRelayer") REQUIRE(result.size() == 2); REQUIRE(result.at(0).size() == 1); REQUIRE(result.at(1).size() == 1); + result.at(0).clear(MessageSet::noop); + result.at(1).clear(MessageSet::noop); } // This test a more complicated set of inputs, and verifies that data is @@ -341,7 +346,13 @@ TEST_CASE("DataRelayer") REQUIRE(ready.size() == 1); REQUIRE(ready[0].slot.index == 1); REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume); + REQUIRE(result.size() == 2); + result.at(0).clear(MessageSet::noop); + result.at(1).clear(MessageSet::noop); result = relayer.consumeAllInputsForTimeslice(ready[0].slot); + REQUIRE(result.size() == 2); + result.at(0).clear(MessageSet::noop); + result.at(1).clear(MessageSet::noop); } // This tests a simple cache pruning, where a single input is shifted out of @@ -401,6 +412,9 @@ TEST_CASE("DataRelayer") REQUIRE(ready[1].op == CompletionPolicy::CompletionOp::Consume); for (size_t i = 0; i < ready.size(); ++i) { auto result = relayer.consumeAllInputsForTimeslice(ready[i].slot); + for (auto& s : result) { + s.clear(MessageSet::noop); + } } // This fills the cache and makes 2 obsolete. @@ -416,6 +430,8 @@ TEST_CASE("DataRelayer") // One for the header, one for the payload REQUIRE(result1.size() == 1); REQUIRE(result2.size() == 1); + result1.at(0).clear(MessageSet::noop); + result2.at(0).clear(MessageSet::noop); } // This the any policy. Even when there are two inputs, given the any policy @@ -489,6 +505,7 @@ TEST_CASE("DataRelayer") REQUIRE(ready3.size() == 1); REQUIRE(ready3[0].slot.index == 1); REQUIRE(ready3[0].op == CompletionPolicy::CompletionOp::Consume); + relayer.clear(); } /// Test that the clear method actually works. @@ -610,6 +627,7 @@ TEST_CASE("DataRelayer") REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured); REQUIRE(header2.get() != nullptr); REQUIRE(payload2.get() != nullptr); + relayer.clear(); } SECTION("SplitParts") @@ -681,6 +699,7 @@ TEST_CASE("DataRelayer") CHECK(action.timeslice.value == 1); REQUIRE(header2.get() != nullptr); REQUIRE(payload2.get() != nullptr); + relayer.clear(); } SECTION("SplitPayloadPairs") @@ -735,6 +754,7 @@ TEST_CASE("DataRelayer") REQUIRE(messageSet.size() == 1); REQUIRE(messageSet[0].size() == nSplitParts); REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1); + messageSet[0].clear(MessageSet::noop); } SECTION("SplitPayloadSequence") @@ -801,11 +821,13 @@ TEST_CASE("DataRelayer") for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) { REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]); for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) { - REQUIRE(messageSet[0].payload(seqid, pi)); - auto const* data = messageSet[0].payload(seqid, pi)->GetData(); + auto const& c = messageSet[0].payload(seqid, pi); + REQUIRE(c.get()); + auto const* data = c->GetData(); REQUIRE(*(reinterpret_cast(data)) == counter); ++counter; } } + messageSet[0].clear(MessageSet::noop); } } diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index d56e32fea1adb..f7f038a634cbc 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -15,14 +15,14 @@ using namespace o2::framework; -TEST_CASE("MessageSet") { - o2::framework::MessageSet msgSet; +TEST_CASE("MessageSet") +{ std::vector ptrs; std::unique_ptr msg(nullptr); std::unique_ptr msg2(nullptr); ptrs.emplace_back(std::move(msg)); ptrs.emplace_back(std::move(msg2)); - msgSet.add([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 2); + o2::framework::MessageSet msgSet([&ptrs](size_t i) -> fair::mq::MessagePtr { return std::move(ptrs[i]); }, 2, MessageSet::passthrough); REQUIRE(msgSet.messages.size() == 2); REQUIRE(msgSet.messageMap.size() == 1); @@ -32,15 +32,17 @@ TEST_CASE("MessageSet") { REQUIRE(msgSet.pairMap[0].partIndex == 0); REQUIRE(msgSet.pairMap[0].payloadIndex == 0); + msgSet.clear(MessageSet::noop); } -TEST_CASE("MessageSetWithFunction") { +TEST_CASE("MessageSetWithFunction") +{ std::vector ptrs; std::unique_ptr msg(nullptr); std::unique_ptr msg2(nullptr); ptrs.emplace_back(std::move(msg)); ptrs.emplace_back(std::move(msg2)); - o2::framework::MessageSet msgSet([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 2); + o2::framework::MessageSet msgSet([&ptrs](size_t i) -> fair::mq::MessagePtr { return std::move(ptrs[i]); }, 2, MessageSet::passthrough); REQUIRE(msgSet.messages.size() == 2); REQUIRE(msgSet.messageMap.size() == 1); @@ -50,9 +52,11 @@ TEST_CASE("MessageSetWithFunction") { REQUIRE(msgSet.pairMap[0].partIndex == 0); REQUIRE(msgSet.pairMap[0].payloadIndex == 0); + msgSet.clear(MessageSet::noop); } -TEST_CASE("MessageSetWithMultipart") { +TEST_CASE("MessageSetWithMultipart") +{ std::vector ptrs; std::unique_ptr msg(nullptr); std::unique_ptr msg2(nullptr); @@ -60,7 +64,7 @@ TEST_CASE("MessageSetWithMultipart") { ptrs.emplace_back(std::move(msg)); ptrs.emplace_back(std::move(msg2)); ptrs.emplace_back(std::move(msg3)); - o2::framework::MessageSet msgSet([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 3); + o2::framework::MessageSet msgSet([&ptrs](size_t i) -> fair::mq::MessagePtr { return std::move(ptrs[i]); }, 3, MessageSet::passthrough); REQUIRE(msgSet.messages.size() == 3); REQUIRE(msgSet.messageMap.size() == 1); @@ -72,17 +76,19 @@ TEST_CASE("MessageSetWithMultipart") { REQUIRE(msgSet.pairMap[0].payloadIndex == 0); REQUIRE(msgSet.pairMap[1].partIndex == 0); REQUIRE(msgSet.pairMap[1].payloadIndex == 1); + msgSet.clear(MessageSet::noop); } -TEST_CASE("MessageSetAddPartRef") { +TEST_CASE("MessageSetAddPartRef") +{ std::vector ptrs; std::unique_ptr msg(nullptr); std::unique_ptr msg2(nullptr); - ptrs.emplace_back(std::move(msg)); - ptrs.emplace_back(std::move(msg2)); - PartRef ref {std::move(msg), std::move(msg2)}; + PartRef ref{std::move(msg), std::move(msg2)}; + o2::framework::MessageSet refSet([&ref](size_t i) -> PartRef { return std::move(ref); }, 1, MessageSet::passthrough_partref); + REQUIRE(refSet.messages.size() == 2); o2::framework::MessageSet msgSet; - msgSet.add(std::move(ref)); + msgSet.merge(std::move(refSet)); REQUIRE(msgSet.messages.size() == 2); REQUIRE(msgSet.messageMap.size() == 1); @@ -92,6 +98,8 @@ TEST_CASE("MessageSetAddPartRef") { REQUIRE(msgSet.pairMap[0].partIndex == 0); REQUIRE(msgSet.pairMap[0].payloadIndex == 0); + msgSet.clear(MessageSet::noop); + refSet.clear(MessageSet::noop); } TEST_CASE("MessageSetAddMultiple") @@ -103,16 +111,17 @@ TEST_CASE("MessageSetAddMultiple") ptrs.emplace_back(std::move(msg2)); PartRef ref{std::move(msg), std::move(msg2)}; o2::framework::MessageSet msgSet; - msgSet.add(std::move(ref)); + o2::framework::MessageSet refSet([&ref](size_t) { return std::move(ref); }, 1, MessageSet::passthrough_partref); + msgSet.merge(std::move(refSet)); PartRef ref2{std::move(msg), std::move(msg2)}; - msgSet.add(std::move(ref2)); + o2::framework::MessageSet refSet2([&ref](size_t) { return std::move(ref); }, 1, MessageSet::passthrough_partref); + msgSet.merge(std::move(refSet2)); std::vector msgs; msgs.push_back(std::unique_ptr(nullptr)); msgs.push_back(std::unique_ptr(nullptr)); msgs.push_back(std::unique_ptr(nullptr)); - msgSet.add([&msgs](size_t i) { - return std::move(msgs[i]); - }, 3); + o2::framework::MessageSet refSet3([&msgs](size_t i) { return std::move(msgs[i]); }, 3, MessageSet::passthrough); + msgSet.merge(std::move(refSet3)); REQUIRE(msgSet.messages.size() == 7); REQUIRE(msgSet.messageMap.size() == 3); @@ -132,4 +141,8 @@ TEST_CASE("MessageSetAddMultiple") REQUIRE(msgSet.pairMap[2].payloadIndex == 0); REQUIRE(msgSet.pairMap[3].partIndex == 2); REQUIRE(msgSet.pairMap[3].payloadIndex == 1); + msgSet.clear(MessageSet::noop); + refSet.clear(MessageSet::noop); + refSet2.clear(MessageSet::noop); + refSet3.clear(MessageSet::noop); } diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index f5d18183c3705..88709da404df6 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -26,13 +26,17 @@ o2_add_dpl_workflow(basic-arrow-workflow COMPONENT_NAME TestWorkflows) o2_add_dpl_workflow(o2rootmessage-workflow - SOURCES "src/test_o2RootMessageWorkflow.cxx" + SOURCES src/test_o2RootMessageWorkflow.cxx COMPONENT_NAME TestWorkflows) o2_add_dpl_workflow(diamond-workflow SOURCES src/o2DiamondWorkflow.cxx COMPONENT_NAME TestWorkflows) +o2_add_dpl_workflow(early-forwarding + SOURCES src/o2EarlyForwarding.cxx + COMPONENT_NAME TestWorkflows) + o2_add_dpl_workflow(dummy-populator-workflow SOURCES src/o2DummyPopulatorWorkflow.cxx COMPONENT_NAME TestWorkflows) diff --git a/Framework/TestWorkflows/src/o2EarlyForwarding.cxx b/Framework/TestWorkflows/src/o2EarlyForwarding.cxx new file mode 100644 index 0000000000000..20049429aa3ba --- /dev/null +++ b/Framework/TestWorkflows/src/o2EarlyForwarding.cxx @@ -0,0 +1,118 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/ConfigParamSpec.h" +#include "Framework/DataTakingContext.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "Framework/DeviceSpec.h" +#include "Framework/RawDeviceService.h" +#include "Framework/ControlService.h" +#include "Framework/Configurable.h" +#include "Framework/RunningWorkflowInfo.h" +#include "Framework/CallbackService.h" +#include "Framework/RateLimiter.h" +#include + +#include +#include +#include +#include + +using namespace o2::framework; + +struct WorkflowOptions { + Configurable anInt{"anInt", 1, ""}; + Configurable aFloat{"aFloat", 2.0f, {"a float option"}}; + Configurable aDouble{"aDouble", 3., {"a double option"}}; + Configurable aString{"aString", "foobar", {"a string option"}}; + Configurable aBool{"aBool", true, {"a boolean option"}}; +}; + +void customize(std::vector& policies) +{ + policies.push_back(CallbacksPolicy{ + .matcher = DeviceMatchers::matchByName("A"), + .policy = [](CallbackService& service, InitContext&) { + service.set([]() { LOG(info) << "invoked at start"; }); + }}); +} + +#include "Framework/runDataProcessing.h" + +AlgorithmSpec simplePipe(std::string const& what, int minDelay) +{ + return AlgorithmSpec{adaptStateful([what, minDelay](RunningWorkflowInfo const& runningWorkflow) { + srand(getpid()); + LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow"; + return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) { + LOGP(info, "Invoked {}", what); + device.device()->WaitFor(std::chrono::milliseconds(minDelay)); + auto& bData = outputs.make(OutputRef{what}, 1); + }); + })}; +} + +// This is how you can define your processing in a declarative way +WorkflowSpec defineDataProcessing(ConfigContext const& specs) +{ + DataProcessorSpec a{ + .name = "A", + .outputs = {OutputSpec{{"a1"}, "TST", "A1"}, + OutputSpec{{"a2"}, "TST", "A2"}}, + .algorithm = AlgorithmSpec{adaptStateless( + [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) { + // static RateLimiter limiter; + // limiter.check(pcx, std::stoi(device.device()->fConfig->GetValue("timeframes-rate-limit")), 2000); + outputs.make(OutputRef{"a1"}, 1); + outputs.make(OutputRef{"a2"}, 1); + })}, + .options = { + ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}, + }}; + DataProcessorSpec b{ + .name = "B", + .inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}}, + .outputs = {OutputSpec{{"b1"}, "TST", "B1"}}, + .algorithm = simplePipe("b1", 1000)}; + DataProcessorSpec c{.name = "C", + .inputs = {InputSpec{"x", "TST", "A2"}}, + .outputs = {OutputSpec{{"c1"}, "TST", "C1"}}, + .algorithm = simplePipe("c1", 10000)}; + DataProcessorSpec d{.name = "D", + .inputs = {InputSpec{"a", "TST", "A1"}, + InputSpec{"b", "TST", "B1"}, + InputSpec{"c", "TST", "C1"}}, + .algorithm = AlgorithmSpec{adaptStateless( + [](InputRecord& inputs) { + auto ref = inputs.get("b"); + auto header = o2::header::get(ref.header); + LOG(info) << "Start time: " << header->startTime; + })}}; + + // Depends on only a and b which are not slow, however it should + // still be after D, because of the expendable label. + // This is to mimick the case in which early forwarding happens + // immediately, rather than when the processing is ready to go. + DataProcessorSpec e{.name = "E", + .inputs = { + InputSpec{"a", "TST", "A1"}, + InputSpec{"b", "TST", "B1"}, + }, + .algorithm = AlgorithmSpec{adaptStateless([](InputRecord& inputs) { + auto ref = inputs.get("b"); + auto header = o2::header::get(ref.header); + LOG(info) << "Start time: " << header->startTime; + })}, + .labels = {{"expendable"}}}; + + return workflow::concat(WorkflowSpec{a}, + WorkflowSpec{b, c}, + WorkflowSpec{d}, WorkflowSpec{e}); +}