From 70709c2fa3a507a427d9b05c86b22fe40537eacb Mon Sep 17 00:00:00 2001 From: "Jonas Ohland (Riedel)" Date: Wed, 11 Feb 2026 12:25:19 +0100 Subject: [PATCH 1/4] Some very minor improvements Co-authored-by: Jonas Ohland Signed-off-by: Jonas Ohland (Riedel) --- lib/fabrics/ofi/src/internal/FILogging.cpp | 204 ++++++++++--------- lib/fabrics/ofi/src/internal/LocalRegion.cpp | 10 +- lib/fabrics/ofi/src/internal/LocalRegion.hpp | 3 +- 3 files changed, 111 insertions(+), 106 deletions(-) diff --git a/lib/fabrics/ofi/src/internal/FILogging.cpp b/lib/fabrics/ofi/src/internal/FILogging.cpp index fcb74135..cd8dca2a 100644 --- a/lib/fabrics/ofi/src/internal/FILogging.cpp +++ b/lib/fabrics/ofi/src/internal/FILogging.cpp @@ -20,142 +20,144 @@ namespace mxl::lib::fabrics::ofi { namespace { - constexpr static auto fiLogLevelStrings = std::array, 4>{ + constexpr auto fiLogLevelStrings = std::array, 4>{ {{"trace", FI_LOG_TRACE}, {"debug", FI_LOG_DEBUG}, {"info", FI_LOG_INFO}, {"warn", FI_LOG_WARN}} }; - } - int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags); - int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags, std::uint64_t* showtime); - void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn); + static int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags); + static int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags, + std::uint64_t* showtime); + void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn); - class FILogging - { - public: - void init() + class FILogging { - // It is safe to call a logging functions on the libfabric side - // while we initialize logging here. We just need to make sure - // to initialize logging only once. - if (_isInit.exchange(true, std::memory_order_relaxed)) - { - return; - } - - auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL"); - if (fiLogLevelCStr != nullptr) + public: + void init() { - auto fiLogLevelStr = std::string{fiLogLevelCStr}; - std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](char const c) -> char { return std::tolower(c); }); - auto it = std::ranges::find_if(fiLogLevelStrings, - [fiLogLevelStr](std::pair const& item) { return fiLogLevelStr == item.first; }); - if (it != fiLogLevelStrings.end()) + // It is safe to call a logging functions on the libfabric side + // while we initialize logging here. We just need to make sure + // to initialize logging only once. + if (_isInit.exchange(true, std::memory_order_relaxed)) { - _level = it->second; + return; } - else + + auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL"); + if (fiLogLevelCStr != nullptr) { - _level = FI_LOG_WARN; + auto fiLogLevelStr = std::string{fiLogLevelCStr}; + std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](unsigned char const c) { return std::tolower(c); }); + auto it = std::ranges::find_if(fiLogLevelStrings, + [fiLogLevelStr](std::pair const& item) { return fiLogLevelStr == item.first; }); + if (it != fiLogLevelStrings.end()) + { + _level = it->second; + } + else + { + _level = FI_LOG_WARN; + } } + + auto ops = ::fi_ops_log{ + .size = sizeof(::fi_ops_log), + .enabled = &fiLogEnabled, + .ready = &fiLogReady, + .log = &fiLog, + }; + + auto logging = ::fid_logging{ + .fid = ::fid{}, + .ops = &ops, + }; + + fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging); } - auto ops = ::fi_ops_log{ - .size = sizeof(::fi_ops_log), - .enabled = &fiLogEnabled, - .ready = &fiLogReady, - .log = &fiLog, - }; + private: + friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t); + + [[nodiscard]] + fi_log_level level() const noexcept + { + return _level; + } - auto logging = ::fid_logging{ - .fid = ::fid{}, - .ops = &ops, - }; + [[nodiscard]] + bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept + { + return true; + } - fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging); - } + [[nodiscard]] + bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept + { + return true; + } - private: - friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t); + std::atomic_bool _isInit; + ::fi_log_level _level; + }; - [[nodiscard]] - fi_log_level level() const noexcept - { - return _level; - } + FILogging logging; - [[nodiscard]] - bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept + char const* fiLogSubsystemName(enum fi_log_subsys subsys) { - return true; + switch (subsys) + { + case FI_LOG_CORE: return "core"; + case FI_LOG_FABRIC: return "fabric"; + case FI_LOG_DOMAIN: return "domain"; + case FI_LOG_EP_CTRL: return "ep_ctrl"; + case FI_LOG_EP_DATA: return "ep_data"; + case FI_LOG_AV: return "av"; + case FI_LOG_CQ: return "cq"; + case FI_LOG_EQ: return "eq"; + case FI_LOG_MR: return "mr"; + case FI_LOG_CNTR: return "cntr"; + default: return ""; + } } - [[nodiscard]] - bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept + spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level) { - return true; + switch (level) + { + case FI_LOG_WARN: return spdlog::level::warn; + case FI_LOG_TRACE: return spdlog::level::trace; + case FI_LOG_INFO: return spdlog::level::info; + case FI_LOG_DEBUG: return spdlog::level::debug; + default: return spdlog::level::warn; + } } - std::atomic_bool _isInit; - ::fi_log_level _level; - }; - - static FILogging logging; - - char const* fiLogSubsystemName(enum fi_log_subsys subsys) - { - switch (subsys) + int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t) { - case FI_LOG_CORE: return "core"; - case FI_LOG_FABRIC: return "fabric"; - case FI_LOG_DOMAIN: return "domain"; - case FI_LOG_EP_CTRL: return "ep_ctrl"; - case FI_LOG_EP_DATA: return "ep_data"; - case FI_LOG_AV: return "av"; - case FI_LOG_CQ: return "cq"; - case FI_LOG_EQ: return "eq"; - case FI_LOG_MR: return "mr"; - case FI_LOG_CNTR: return "cntr"; - default: return ""; + return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) && + logging.isSubsystemLoggingEnabled(subsys); } - } - spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level) - { - switch (level) + int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*) { - case FI_LOG_WARN: return spdlog::level::warn; - case FI_LOG_TRACE: return spdlog::level::trace; - case FI_LOG_INFO: return spdlog::level::info; - case FI_LOG_DEBUG: return spdlog::level::debug; - default: return spdlog::level::warn; + return 0; } - } - int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t) - { - return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) && - logging.isSubsystemLoggingEnabled(subsys); - } - - int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*) - { - return 0; - } - - void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn) - { - auto msg = std::string{msgIn}; - if (msg.ends_with('\n')) + void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn) { - msg.pop_back(); + auto msg = std::string{msgIn}; + if (msg.ends_with('\n')) + { + msg.pop_back(); + } + + spdlog::log( + {fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg); } - spdlog::log( - {fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg); } // Initialize logging - void fiInitLogging() + static void fiInitLogging() { logging.init(); } diff --git a/lib/fabrics/ofi/src/internal/LocalRegion.cpp b/lib/fabrics/ofi/src/internal/LocalRegion.cpp index 0fe6664d..49e4f855 100644 --- a/lib/fabrics/ofi/src/internal/LocalRegion.cpp +++ b/lib/fabrics/ofi/src/internal/LocalRegion.cpp @@ -4,13 +4,17 @@ #include "LocalRegion.hpp" #include +#include "Exception.hpp" namespace mxl::lib::fabrics::ofi { - LocalRegion LocalRegion::sub(std::uint64_t offset, std::size_t length) const noexcept + LocalRegion LocalRegion::sub(std::uint64_t offset, std::size_t length) const { - assert(offset + length <= len); + if (offset + length > len) + { + throw Exception::invalidState("Tried to access out-of-bounds sub region of LocalRegion"); + } return LocalRegion{ .addr = addr + offset, @@ -21,7 +25,7 @@ namespace mxl::lib::fabrics::ofi ::iovec LocalRegion::toIovec() const noexcept { - return ::iovec{.iov_base = reinterpret_cast(addr), .iov_len = len}; + return ::iovec{.iov_base = reinterpret_cast(addr), .iov_len = len}; // NOLINT(performance-no-int-to-ptr): No way to avoid this } ::iovec const* LocalRegionGroup::asIovec() const noexcept diff --git a/lib/fabrics/ofi/src/internal/LocalRegion.hpp b/lib/fabrics/ofi/src/internal/LocalRegion.hpp index cca74438..550ac737 100644 --- a/lib/fabrics/ofi/src/internal/LocalRegion.hpp +++ b/lib/fabrics/ofi/src/internal/LocalRegion.hpp @@ -26,9 +26,8 @@ namespace mxl::lib::fabrics::ofi * \param length The length of the sub-region. * \return A new LocalRegion representing the specified sub-region. */ - [[nodiscard]] - LocalRegion sub(std::uint64_t offset, std::size_t length) const noexcept; + LocalRegion sub(std::uint64_t offset, std::size_t length) const; /** \brief Convert this LocalRegion to a struct iovec used by libfabric transfer functions. */ From aaaa29f94577017739e53231604ac1fa51080f6f Mon Sep 17 00:00:00 2001 From: "Jonas Ohland (Riedel)" Date: Wed, 11 Feb 2026 17:02:05 +0100 Subject: [PATCH 2/4] Return grain index from read functions instead of buffer index Co-authored-by: Jonas Ohland Signed-off-by: Jonas Ohland (Riedel) --- lib/fabrics/include/mxl/fabrics.h | 8 ++-- lib/fabrics/ofi/src/fabrics.cpp | 42 +++++++++++-------- lib/fabrics/ofi/src/internal/FILogging.cpp | 2 +- lib/fabrics/ofi/src/internal/Protocol.hpp | 2 +- .../ofi/src/internal/ProtocolIngressRMA.cpp | 33 ++++++++++----- .../ofi/src/internal/ProtocolIngressRMA.hpp | 2 +- lib/fabrics/ofi/src/internal/RCTarget.cpp | 17 ++++---- lib/fabrics/ofi/src/internal/RCTarget.hpp | 14 ++++--- lib/fabrics/ofi/src/internal/RDMTarget.cpp | 18 ++++---- lib/fabrics/ofi/src/internal/RDMTarget.hpp | 9 ++-- lib/fabrics/ofi/src/internal/Region.cpp | 13 +++++- lib/fabrics/ofi/src/internal/Region.hpp | 8 +++- lib/fabrics/ofi/src/internal/Target.cpp | 16 +++---- lib/fabrics/ofi/src/internal/Target.hpp | 13 +++--- lib/tests/fabrics/ofi/Util.hpp | 4 +- lib/tests/fabrics/ofi/test_Region.cpp | 6 ++- lib/tests/fabrics/test_basics.cpp | 41 ++++++++---------- tools/mxl-fabrics-demo/demo.cpp | 27 ++++-------- 18 files changed, 149 insertions(+), 126 deletions(-) diff --git a/lib/fabrics/include/mxl/fabrics.h b/lib/fabrics/include/mxl/fabrics.h index cfc3d8d1..a8da8d7c 100644 --- a/lib/fabrics/include/mxl/fabrics.h +++ b/lib/fabrics/include/mxl/fabrics.h @@ -167,23 +167,23 @@ extern "C" /** * Non-blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target - * \param out_entryIndex The index of the grain ring buffer index that is ready, if any. + * \param out_grainIndex The index of the grain that was written, if any.. * \param out_sliceIndex The last valid slice index that is ready, if any. * \return The result code. MXL_ERR_NOT_READY if no grain was available at the time of the call, and the call should be retried. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex); + mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex, uint16_t* out_sliceIndex); /** * Blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target - * \param out_entryIndex The index of the grain ring buffer index that is ready, if any. + * \param out_grainIndex The index of the grain that was written, if any. * \param out_sliceIndex The last valid slice index that is ready, if any. * \param in_timeoutMs How long should we wait for the grain (in milliseconds) * \return The result code. MXL_ERR_NOT_READY if no grain was available before the timeout. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs); + mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_entryIndex, uint16_t* out_sliceIndex); /** * Create a fabrics initiator instance. diff --git a/lib/fabrics/ofi/src/fabrics.cpp b/lib/fabrics/ofi/src/fabrics.cpp index aba15e6b..3c2c920b 100644 --- a/lib/fabrics/ofi/src/fabrics.cpp +++ b/lib/fabrics/ofi/src/fabrics.cpp @@ -225,9 +225,9 @@ mxlStatus mxlFabricsTargetSetup(mxlFabricsTarget in_target, mxlFabricsTargetConf } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex) +mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex, uint16_t* out_sliceIndex) { - if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr)) + if ((in_target == nullptr) || (out_grainIndex == nullptr)) { return MXL_ERR_INVALID_ARG; } @@ -235,25 +235,28 @@ mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint1 return ofi::try_run( [&]() { - auto target = ofi::TargetWrapper::fromAPI(in_target); - if (auto res = target->read(); res.immData) + auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrain(); + if (!res) { - auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack(); - *out_entryIndex = entryIndex; - *out_sliceIndex = sliceIndex; + return MXL_ERR_NOT_READY; + } - return MXL_STATUS_OK; + auto [entryIndex, sliceIndex] = *res; + *out_grainIndex = entryIndex; + if (out_sliceIndex) + { + *out_sliceIndex = sliceIndex; } - return MXL_ERR_NOT_READY; + return MXL_STATUS_OK; }, "Failed to try for new grain"); } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs) +mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex, uint16_t* out_sliceIndex) { - if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr)) + if ((in_target == nullptr) || (out_grainIndex == nullptr)) { return MXL_ERR_INVALID_ARG; } @@ -261,17 +264,20 @@ mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_en return ofi::try_run( [&]() { - auto target = ofi::TargetWrapper::fromAPI(in_target); - if (auto res = target->readBlocking(std::chrono::milliseconds(in_timeoutMs)); res.immData) + auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrainBlocking(std::chrono::milliseconds(in_timeoutMs)); + if (!res) { - auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack(); - *out_entryIndex = entryIndex; - *out_sliceIndex = sliceIndex; + return MXL_ERR_TIMEOUT; + } - return MXL_STATUS_OK; + auto [grainIndex, sliceIndex] = *res; + *out_grainIndex = grainIndex; + if (out_sliceIndex) + { + *out_sliceIndex = sliceIndex; } - return MXL_ERR_NOT_READY; + return MXL_STATUS_OK; }, "Failed to wait for new grain"); } diff --git a/lib/fabrics/ofi/src/internal/FILogging.cpp b/lib/fabrics/ofi/src/internal/FILogging.cpp index cd8dca2a..4e6d7ece 100644 --- a/lib/fabrics/ofi/src/internal/FILogging.cpp +++ b/lib/fabrics/ofi/src/internal/FILogging.cpp @@ -157,7 +157,7 @@ namespace mxl::lib::fabrics::ofi } // Initialize logging - static void fiInitLogging() + void fiInitLogging() { logging.init(); } diff --git a/lib/fabrics/ofi/src/internal/Protocol.hpp b/lib/fabrics/ofi/src/internal/Protocol.hpp index c810b015..a48c6827 100644 --- a/lib/fabrics/ofi/src/internal/Protocol.hpp +++ b/lib/fabrics/ofi/src/internal/Protocol.hpp @@ -40,7 +40,7 @@ namespace mxl::lib::fabrics::ofi * \param endpoint The endpoint associated with the completion * \param completion The completion object to process. */ - virtual Target::ReadResult processCompletion(Endpoint& endpoint, Completion const& completion) = 0; + virtual std::optional readGrain(Endpoint& endpoint, Completion const& completion) = 0; /** \brief Destroy the protocol object. */ diff --git a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp index d4e773d9..24539b42 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp +++ b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp @@ -4,6 +4,8 @@ #include "ProtocolIngressRMA.hpp" #include "Exception.hpp" +#include "ImmData.hpp" +#include "Region.hpp" namespace mxl::lib::fabrics::ofi { @@ -22,27 +24,36 @@ namespace mxl::lib::fabrics::ofi return domain->remoteRegions(); } - void RMAGrainIngressProtocol::start(Endpoint& ep) + void RMAGrainIngressProtocol::start(Endpoint& endpoint) { - if (ep.domain()->usingRecvBufForCqData()) + if (endpoint.domain()->usingRecvBufForCqData()) { - ep.recv(immDataRegion()); + endpoint.recv(immDataRegion()); } } - Target::ReadResult RMAGrainIngressProtocol::processCompletion(Endpoint& ep, Completion const& completion) + std::optional RMAGrainIngressProtocol::readGrain(Endpoint& endpoint, Completion const& completion) { - if (auto data = completion.tryData(); data) + auto completionData = completion.tryData(); + if (!completionData) { - if (_immDataBuffer) - { - ep.recv(immDataRegion()); - } + return {}; + } - return Target::ReadResult{data->data()}; + if (_immDataBuffer) + { + endpoint.recv(_immDataBuffer->toLocalRegion()); + } + + auto immData = completionData->data(); + if (!immData) + { + throw Exception::invalidState("Received a completion without immediate data."); } - return {}; + auto [slot, slice] = ImmDataGrain{static_cast(*immData)}.unpack(); + auto grainIndex = getGrainIndexInRingSlot(_regions, slot); + return std::make_optional(grainIndex, slice); } void RMAGrainIngressProtocol::reset() diff --git a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp index da0d0561..8c229f06 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp +++ b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp @@ -28,7 +28,7 @@ namespace mxl::lib::fabrics::ofi /** \copydoc IngressProtocol::processCompletion() */ - Target::ReadResult processCompletion(Endpoint& endpoint, Completion const& completion) override; + std::optional readGrain(Endpoint& endpoint, Completion const& completion) override; /** \copydoc IngressProtocol::destroy() */ diff --git a/lib/fabrics/ofi/src/internal/RCTarget.cpp b/lib/fabrics/ofi/src/internal/RCTarget.cpp index 77ad3589..fa5673a8 100644 --- a/lib/fabrics/ofi/src/internal/RCTarget.cpp +++ b/lib/fabrics/ofi/src/internal/RCTarget.cpp @@ -74,23 +74,23 @@ namespace mxl::lib::fabrics::ofi , _state(WaitForConnectionRequest{std::move(pep)}) {} - Target::ReadResult RCTarget::read() + std::optional RCTarget::readGrain() { - return makeProgress({}); + return readNextGrain({}); } - Target::ReadResult RCTarget::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional RCTarget::readGrainBlocking(std::chrono::steady_clock::duration timeout) { - return makeProgress(timeout); + return readNextGrain(timeout); } void RCTarget::shutdown() {} template - Target::ReadResult RCTarget::makeProgress(std::chrono::steady_clock::duration timeout) + std::optional RCTarget::readNextGrain(std::chrono::steady_clock::duration timeout) { - Target::ReadResult result; + auto result = std::optional{std::nullopt}; _state = std::visit( overloaded{[](std::monostate) -> State { throw Exception::invalidState("Target is in an invalid state an can no longer make progress"); }, @@ -142,7 +142,6 @@ namespace mxl::lib::fabrics::ofi [&](RCTarget::Connected state) -> State { auto [completion, event] = readEndpointQueues(state.ep, timeout); - if (event && event.value().isShutdown()) { MXL_INFO("Remote endpoint has shutdown the connection. Transitioning to listening to new connection."); @@ -151,7 +150,7 @@ namespace mxl::lib::fabrics::ofi if (completion) { - result = _proto->processCompletion(state.ep, *completion); + result = _proto->readGrain(state.ep, *completion); } return Connected{.ep = std::move(state.ep)}; @@ -161,7 +160,7 @@ namespace mxl::lib::fabrics::ofi return result; } - PassiveEndpoint RCTarget::makeListener(std::shared_ptr fabric) + PassiveEndpoint RCTarget::makeListener(std::shared_ptr const& fabric) { // Create a passive endpoint. A passive endpoint can be viewed like a bound TCP socket listening for // incoming connections. diff --git a/lib/fabrics/ofi/src/internal/RCTarget.hpp b/lib/fabrics/ofi/src/internal/RCTarget.hpp index 08ef42ce..dd5bd494 100644 --- a/lib/fabrics/ofi/src/internal/RCTarget.hpp +++ b/lib/fabrics/ofi/src/internal/RCTarget.hpp @@ -31,14 +31,16 @@ namespace mxl::lib::fabrics::ofi [[nodiscard]] static std::pair, std::unique_ptr> setup(mxlFabricsTargetConfig const& config); - /** \copydoc Target::read() + /** \copydoc Target::readGrain() */ - Target::ReadResult read() final; + std::optional readGrain() final; - /** \copydoc Target::readBlocking() + /** \copydoc Target::readGrainBlocking() */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout) final; + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) final; + /** \brief Shut down the target. + */ void shutdown() override; private: @@ -87,10 +89,10 @@ namespace mxl::lib::fabrics::ofi * \return The result of the read operation. */ template - Target::ReadResult makeProgress(std::chrono::steady_clock::duration timeout); + std::optional readNextGrain(std::chrono::steady_clock::duration timeout); [[nodiscard]] - static PassiveEndpoint makeListener(std::shared_ptr fabric); + static PassiveEndpoint makeListener(std::shared_ptr const& fabric); private: std::unique_ptr _proto; diff --git a/lib/fabrics/ofi/src/internal/RDMTarget.cpp b/lib/fabrics/ofi/src/internal/RDMTarget.cpp index ea28a0db..b70629c3 100644 --- a/lib/fabrics/ofi/src/internal/RDMTarget.cpp +++ b/lib/fabrics/ofi/src/internal/RDMTarget.cpp @@ -73,34 +73,34 @@ namespace mxl::lib::fabrics::ofi return {std::make_unique(std::move(endpoint), std::move(protocol)), std::move(targetInfo)}; } - RDMTarget::RDMTarget(Endpoint ep, std::unique_ptr ingress) + RDMTarget::RDMTarget(Endpoint ep, std::unique_ptr proto) : _ep(std::move(ep)) - , _protocol(std::move(ingress)) + , _protocol(std::move(proto)) {} - Target::ReadResult RDMTarget::read() + std::optional RDMTarget::readGrain() { - return makeProgress({}); + return readNextGrain({}); } - Target::ReadResult RDMTarget::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional RDMTarget::readGrainBlocking(std::chrono::steady_clock::duration timeout) { - return makeProgress(timeout); + return readNextGrain(timeout); } void RDMTarget::shutdown() {} template - Target::ReadResult RDMTarget::makeProgress(std::chrono::steady_clock::duration timeout) + std::optional RDMTarget::readNextGrain(std::chrono::steady_clock::duration timeout) { auto completion = readCompletionQueue(*_ep.completionQueue(), timeout); if (completion) { - return _protocol->processCompletion(_ep, *completion); + return _protocol->readGrain(_ep, *completion); } - return ReadResult{std::nullopt}; + return {}; } } diff --git a/lib/fabrics/ofi/src/internal/RDMTarget.hpp b/lib/fabrics/ofi/src/internal/RDMTarget.hpp index 771fc945..38308544 100644 --- a/lib/fabrics/ofi/src/internal/RDMTarget.hpp +++ b/lib/fabrics/ofi/src/internal/RDMTarget.hpp @@ -28,15 +28,15 @@ namespace mxl::lib::fabrics::ofi /** \copydoc Target::read() */ - Target::ReadResult read() override; + std::optional readGrain() final; /** \copydoc Target::readBlocking() */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout) override; + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) final; /** \copydoc Target::shutdown() */ - void shutdown() override; + void shutdown() final; private: /** \brief Construct an RDMTarget with the given endpoint and immediate data location. @@ -53,10 +53,11 @@ namespace mxl::lib::fabrics::ofi * \return The result of the read operation. */ template - Target::ReadResult makeProgress(std::chrono::steady_clock::duration timeout); + std::optional readNextGrain(std::chrono::steady_clock::duration timeout); private: Endpoint _ep; std::unique_ptr _protocol = {}; + std::vector _regions; }; } diff --git a/lib/fabrics/ofi/src/internal/Region.cpp b/lib/fabrics/ofi/src/internal/Region.cpp index 8165d499..ba107009 100644 --- a/lib/fabrics/ofi/src/internal/Region.cpp +++ b/lib/fabrics/ofi/src/internal/Region.cpp @@ -143,6 +143,7 @@ namespace mxl::lib::fabrics::ofi auto grainInfoBaseAddr = reinterpret_cast(discreteFlow.grainAt(i)); auto grainInfoSize = sizeof(GrainHeader); auto grainPayloadSize = grain->header.info.grainSize; + auto grainIndexPtr = &grain->header.info.index; if (flow.flowInfo()->config.common.payloadLocation != MXL_PAYLOAD_LOCATION_HOST_MEMORY) { @@ -150,7 +151,7 @@ namespace mxl::lib::fabrics::ofi "GPU memory is not currently supported in the Flow API of MXL. Edit the code below when it is supported"); } - regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, Region::Location::host()); + regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, grainIndexPtr, Region::Location::host()); } // TODO: Add an utility function to retrieve the number of available planes when alpha support is added. @@ -173,4 +174,14 @@ namespace mxl::lib::fabrics::ofi throw Exception::make(MXL_ERR_UNKNOWN, "Unsupported flow fromat {}", flow.flowInfo()->config.common.format); } } + + std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slotIndex) + { + if (slotIndex >= regions.size()) + { + throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slotIndex, regions.size()); + } + + return *regions[slotIndex].grainIndexPtr; + } } diff --git a/lib/fabrics/ofi/src/internal/Region.hpp b/lib/fabrics/ofi/src/internal/Region.hpp index 18289fe7..5a29f370 100644 --- a/lib/fabrics/ofi/src/internal/Region.hpp +++ b/lib/fabrics/ofi/src/internal/Region.hpp @@ -95,9 +95,10 @@ namespace mxl::lib::fabrics::ofi * \param size The size of the memory region in bytes. * \param loc The location of the memory region \see Location. */ - explicit Region(std::uintptr_t base, std::size_t size, Location loc = Location::host()) noexcept + explicit Region(std::uintptr_t base, std::size_t size, std::uint64_t const* grainIndexPtr, Location loc = Location::host()) noexcept : base(base) , size(size) + , grainIndexPtr(grainIndexPtr) , loc(loc) , _iovec(iovecFromRegion(base, size)) {} @@ -116,6 +117,7 @@ namespace mxl::lib::fabrics::ofi public: std::uintptr_t base; std::size_t size; + std::uint64_t const* grainIndexPtr; Location loc; private: @@ -199,7 +201,7 @@ namespace mxl::lib::fabrics::ofi public: MxlRegions(std::vector regions, DataLayout dataLayout) : _regions(std::move(regions)) - , _layout(std::move(dataLayout)) + , _layout(dataLayout) {} /** \brief Convert between external and internal versions of this type @@ -231,4 +233,6 @@ namespace mxl::lib::fabrics::ofi */ [[nodiscard]] MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow); + + std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slotIndex); } diff --git a/lib/fabrics/ofi/src/internal/Target.cpp b/lib/fabrics/ofi/src/internal/Target.cpp index 00bfd9c4..21017030 100644 --- a/lib/fabrics/ofi/src/internal/Target.cpp +++ b/lib/fabrics/ofi/src/internal/Target.cpp @@ -10,6 +10,7 @@ #include #include "mxl/fabrics.h" #include "Exception.hpp" +#include "ImmData.hpp" #include "LocalRegion.hpp" #include "RCTarget.hpp" #include "RDMTarget.hpp" @@ -35,31 +36,31 @@ namespace mxl::lib::fabrics::ofi return reinterpret_cast(this); } - Target::ReadResult TargetWrapper::read() + std::optional TargetWrapper::readGrain() { if (!_inner) { - throw Exception::invalidState("Target is not set up"); + throw Exception::invalidState("Target is not set up."); } - return _inner->read(); + return _inner->readGrain(); } - Target::ReadResult TargetWrapper::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional TargetWrapper::readGrainBlocking(std::chrono::steady_clock::duration timeout) { if (!_inner) { - throw Exception::invalidState("Target is not set up"); + throw Exception::invalidState("Target is not set up."); } - return _inner->readBlocking(timeout); + return _inner->readGrainBlocking(timeout); } std::unique_ptr TargetWrapper::setup(mxlFabricsTargetConfig const& config) { if (_inner) { - _inner.release(); + _inner.reset(); } switch (config.provider) @@ -84,5 +85,4 @@ namespace mxl::lib::fabrics::ofi throw Exception::invalidArgument("Invalid provider value"); } - } diff --git a/lib/fabrics/ofi/src/internal/Target.hpp b/lib/fabrics/ofi/src/internal/Target.hpp index 3b57c536..b3d20e56 100644 --- a/lib/fabrics/ofi/src/internal/Target.hpp +++ b/lib/fabrics/ofi/src/internal/Target.hpp @@ -19,9 +19,10 @@ namespace mxl::lib::fabrics::ofi public: /** \brief Result of a read operation. */ - struct ReadResult + struct GrainReadResult { - std::optional immData{std::nullopt}; /**< If a transfer was initiated with immediate data, this contains the data. */ + std::uint64_t grainIndex; + std::uint16_t sliceIndex; }; public: @@ -32,13 +33,13 @@ namespace mxl::lib::fabrics::ofi * A non-blocking operation that also drives the connection forward. Continuous invocation of this function is necessary for connection * establishment and ongoing progress. */ - virtual ReadResult read() = 0; + virtual std::optional readGrain() = 0; /** \brief Determine if new data can be consumed. * * A blocking version of read. see read(). */ - virtual ReadResult readBlocking(std::chrono::steady_clock::duration timeout) = 0; + virtual std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) = 0; /** \brief Shut down the target gracefully. * Initiates a graceful shutdown of the target and blocks until the shutdown is complete. @@ -87,11 +88,11 @@ namespace mxl::lib::fabrics::ofi /** \copydoc Target::read() */ - Target::ReadResult read(); + std::optional readGrain(); /** \copydoc Target::readBlocking(std::chrono::steady_clock::duration) */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout); + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout); /** \brief Set up the target with the specified configuration. * diff --git a/lib/tests/fabrics/ofi/Util.hpp b/lib/tests/fabrics/ofi/Util.hpp index aaf2bbad..143eeb9f 100644 --- a/lib/tests/fabrics/ofi/Util.hpp +++ b/lib/tests/fabrics/ofi/Util.hpp @@ -90,7 +90,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size()); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr); } auto mxlRegions = MxlRegions(regions, DataLayout::fromVideo({8, 0, 0, 0})); @@ -103,7 +103,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size()); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr); } return {regions, dataLayout}; } diff --git a/lib/tests/fabrics/ofi/test_Region.cpp b/lib/tests/fabrics/ofi/test_Region.cpp index 3a7aa744..b7df7bd9 100644 --- a/lib/tests/fabrics/ofi/test_Region.cpp +++ b/lib/tests/fabrics/ofi/test_Region.cpp @@ -24,8 +24,10 @@ TEST_CASE("ofi: Region constructors", "[ofi][Constructors]") TEST_CASE("ofi: RegionGroup view and iovec conversion", "[ofi][RegionGroup]") { - auto r1 = Region{0x1000, 64, Region::Location::host()}; - auto r2 = Region{0x2000, 128, Region::Location::host()}; + std::uint64_t dummyGrainIndex = 0; + + auto r1 = Region{0x1000, 64, &dummyGrainIndex, Region::Location::host()}; + auto r2 = Region{0x2000, 128, &dummyGrainIndex, Region::Location::host()}; auto group = RegionGroup({r1, r2}); REQUIRE(group.size() == 2); diff --git a/lib/tests/fabrics/test_basics.cpp b/lib/tests/fabrics/test_basics.cpp index cc39254a..428d0a82 100644 --- a/lib/tests/fabrics/test_basics.cpp +++ b/lib/tests/fabrics/test_basics.cpp @@ -98,7 +98,7 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { @@ -125,12 +125,11 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrain( - target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -208,7 +207,7 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { @@ -235,11 +234,11 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -267,8 +266,6 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Grain with flows", "[Fabrics][Transfer][Flows]") { - namespace ofi = mxl::lib::fabrics::ofi; - auto instance = mxlCreateInstance(domain.c_str(), ""); mxlFabricsInstance fabrics; @@ -319,7 +316,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { @@ -391,7 +388,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -411,7 +408,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -452,7 +449,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { @@ -524,7 +521,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -544,7 +541,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -576,8 +573,6 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Grain with flows multi target", "[Fabrics][Transfer][Flows][Multi-targets]") { - namespace ofi = mxl::lib::fabrics::ofi; - auto flowDef = mxl::tests::readFile("../data/v210_flow.json"); auto jsonValue = picojson::value{}; REQUIRE(picojson::parse(jsonValue, flowDef).empty()); @@ -643,7 +638,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; bool initiatorConnected = false; do @@ -742,7 +737,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -766,7 +761,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex, &dummyValidSlices); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -820,7 +815,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; + std::uint64_t dummyIndex; std::uint16_t dummyValidSlices; do { @@ -913,7 +908,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -937,7 +932,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex, &dummyValidSlices); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); diff --git a/tools/mxl-fabrics-demo/demo.cpp b/tools/mxl-fabrics-demo/demo.cpp index f853b5a5..d225258b 100644 --- a/tools/mxl-fabrics-demo/demo.cpp +++ b/tools/mxl-fabrics-demo/demo.cpp @@ -101,7 +101,7 @@ class AppInitator } } - mxlStatus setup(std::string targetInfoStr) + mxlStatus setup(std::string const& targetInfoStr) { _instance = mxlCreateInstance(_config.domain.c_str(), ""); if (_instance == nullptr) @@ -217,7 +217,7 @@ class AppInitator continue; } else if (ret == MXL_ERR_OUT_OF_RANGE_TOO_EARLY) - { + { // NOLINT(bugprone-branch-clone): Repeated for clarity. // We are too early somehow.. retry the same grain later. continue; } @@ -478,14 +478,14 @@ class AppTarget mxlStatus run() { mxlGrainInfo grainInfo; - uint16_t entryIndex = 0; - uint16_t validSlices = 0; - uint8_t* dummyPayload; + std::uint64_t grainIndex = 0; + std::uint16_t validSlices = 0; + std::uint8_t* dummyPayload; mxlStatus status; while (!g_exit_requested) { - status = targetReadGrain(&entryIndex, &validSlices, std::chrono::milliseconds(200)); + status = targetReadGrain(&grainIndex, &validSlices, std::chrono::milliseconds(200)); if (status == MXL_ERR_TIMEOUT) { // No completion before a timeout was triggered, most likely a problem upstream. @@ -506,15 +506,6 @@ class AppTarget return status; } - status = mxlFlowWriterGetGrainInfo(_writer, entryIndex, &grainInfo); - if (status != MXL_STATUS_OK) - { - MXL_ERROR("Failed to get grain info with status '{}'", static_cast(status)); - return status; - } - - std::uint64_t grainIndex = grainInfo.index; - // Here we open so that we can commit, we are not going to modify the grain as it was already modified by the initiator. status = mxlFlowWriterOpenGrain(_writer, grainIndex, &grainInfo, &dummyPayload); if (status != MXL_STATUS_OK) @@ -545,16 +536,16 @@ class AppTarget } private: - mxlStatus targetReadGrain(std::uint16_t* entryIndex, std::uint16_t* validSlices, std::chrono::steady_clock::duration timeout) + mxlStatus targetReadGrain(std::uint64_t* grainIndex, std::uint16_t* validSlices, std::chrono::steady_clock::duration timeout) { if (_config.provider == MXL_FABRICS_PROVIDER_EFA) { - return mxlFabricsTargetReadGrainNonBlocking(_target, entryIndex, validSlices); + return mxlFabricsTargetReadGrainNonBlocking(_target, grainIndex, validSlices); } else { return mxlFabricsTargetReadGrain( - _target, entryIndex, validSlices, std::chrono::duration_cast(timeout).count()); + _target, std::chrono::duration_cast(timeout).count(), grainIndex, validSlices); } } From 736f7e1232626719b0c1ea02d2de843165f0629e Mon Sep 17 00:00:00 2001 From: "Jonas Ohland (Riedel)" Date: Wed, 11 Feb 2026 18:22:57 +0100 Subject: [PATCH 3/4] Handle 'validSlices' update internally Co-authored-by: Jonas Ohland Signed-off-by: Jonas Ohland (Riedel) --- lib/fabrics/include/mxl/fabrics.h | 8 +-- lib/fabrics/ofi/src/fabrics.cpp | 23 ++------ .../ofi/src/internal/ProtocolIngressRMA.cpp | 9 ++- lib/fabrics/ofi/src/internal/Region.cpp | 47 +++++++++++++--- lib/fabrics/ofi/src/internal/Region.hpp | 20 ++++++- lib/fabrics/ofi/src/internal/Target.hpp | 1 - .../include/mxl-internal/FlowWriter.hpp | 7 +++ .../src/PosixContinuousFlowWriter.cpp | 9 +++ .../src/PosixContinuousFlowWriter.hpp | 4 ++ lib/internal/src/PosixDiscreteFlowWriter.cpp | 9 +++ lib/internal/src/PosixDiscreteFlowWriter.hpp | 3 + lib/internal/tests/test_domainwatcher.cpp | 7 +++ lib/tests/fabrics/ofi/Util.hpp | 4 +- lib/tests/fabrics/ofi/test_Region.cpp | 5 +- lib/tests/fabrics/test_basics.cpp | 56 ++++++++----------- tools/mxl-fabrics-demo/demo.cpp | 15 ++--- 16 files changed, 147 insertions(+), 80 deletions(-) diff --git a/lib/fabrics/include/mxl/fabrics.h b/lib/fabrics/include/mxl/fabrics.h index a8da8d7c..f8615702 100644 --- a/lib/fabrics/include/mxl/fabrics.h +++ b/lib/fabrics/include/mxl/fabrics.h @@ -167,23 +167,21 @@ extern "C" /** * Non-blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target - * \param out_grainIndex The index of the grain that was written, if any.. - * \param out_sliceIndex The last valid slice index that is ready, if any. + * \param out_grainIndex The index of the grain that was written, if any. * \return The result code. MXL_ERR_NOT_READY if no grain was available at the time of the call, and the call should be retried. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex, uint16_t* out_sliceIndex); + mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex); /** * Blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target * \param out_grainIndex The index of the grain that was written, if any. - * \param out_sliceIndex The last valid slice index that is ready, if any. * \param in_timeoutMs How long should we wait for the grain (in milliseconds) * \return The result code. MXL_ERR_NOT_READY if no grain was available before the timeout. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_entryIndex, uint16_t* out_sliceIndex); + mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_entryIndex); /** * Create a fabrics initiator instance. diff --git a/lib/fabrics/ofi/src/fabrics.cpp b/lib/fabrics/ofi/src/fabrics.cpp index 3c2c920b..623d3a50 100644 --- a/lib/fabrics/ofi/src/fabrics.cpp +++ b/lib/fabrics/ofi/src/fabrics.cpp @@ -99,7 +99,7 @@ mxlStatus mxlFabricsRegionsForFlowWriter(mxlFlowWriter in_writer, mxlFabricsRegi auto writer = ::mxl::lib::to_FlowWriter(in_writer); // We are leaking the ownership, the user is responsible for calling mxlFabricsRegionsFree to free the memory. - auto regionPtr = std::make_unique(ofi::mxlFabricsRegionsFromFlow(writer->getFlowData())).release(); + auto regionPtr = std::make_unique(ofi::mxlFabricsRegionsFromMutableFlow(writer->getFlowData())).release(); *out_regions = regionPtr->toAPI(); @@ -225,7 +225,7 @@ mxlStatus mxlFabricsTargetSetup(mxlFabricsTarget in_target, mxlFabricsTargetConf } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex, uint16_t* out_sliceIndex) +mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex) { if ((in_target == nullptr) || (out_grainIndex == nullptr)) { @@ -241,20 +241,14 @@ mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint6 return MXL_ERR_NOT_READY; } - auto [entryIndex, sliceIndex] = *res; - *out_grainIndex = entryIndex; - if (out_sliceIndex) - { - *out_sliceIndex = sliceIndex; - } - + *out_grainIndex = res->grainIndex; return MXL_STATUS_OK; }, "Failed to try for new grain"); } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex, uint16_t* out_sliceIndex) +mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex) { if ((in_target == nullptr) || (out_grainIndex == nullptr)) { @@ -270,13 +264,7 @@ mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_time return MXL_ERR_TIMEOUT; } - auto [grainIndex, sliceIndex] = *res; - *out_grainIndex = grainIndex; - if (out_sliceIndex) - { - *out_sliceIndex = sliceIndex; - } - + *out_grainIndex = res->grainIndex; return MXL_STATUS_OK; }, "Failed to wait for new grain"); @@ -529,7 +517,6 @@ mxlStatus mxlFabricsTargetInfoToString(mxlFabricsTargetInfo const in_targetInfo, return ofi::try_run( [&]() { - std::stringstream ss; auto targetInfoString = ofi::TargetInfo::fromAPI(in_targetInfo)->toJSON(); if (out_string == nullptr) diff --git a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp index 24539b42..8de8b4b6 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp +++ b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp @@ -52,8 +52,15 @@ namespace mxl::lib::fabrics::ofi } auto [slot, slice] = ImmDataGrain{static_cast(*immData)}.unpack(); + + // Set the number of valid slices in the grain header. This information is received through the immediate data and must be updated + // in the local shared memory in the case of partial writes. + setValidSlicesForGrain(_regions, slot, slice); + + // Get the actual grain index from the grain header in share memory. This was written in the first RMA write. auto grainIndex = getGrainIndexInRingSlot(_regions, slot); - return std::make_optional(grainIndex, slice); + + return std::make_optional(grainIndex); } void RMAGrainIngressProtocol::reset() diff --git a/lib/fabrics/ofi/src/internal/Region.cpp b/lib/fabrics/ofi/src/internal/Region.cpp index ba107009..384b795f 100644 --- a/lib/fabrics/ofi/src/internal/Region.cpp +++ b/lib/fabrics/ofi/src/internal/Region.cpp @@ -9,6 +9,7 @@ #include #include #include +#include "mxl/dataformat.h" #include "mxl/fabrics.h" #include "mxl/flow.h" #include "mxl/mxl.h" @@ -119,6 +120,29 @@ namespace mxl::lib::fabrics::ofi return _layout; } + MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow) + { + auto mxlRegions = mxlFabricsRegionsFromFlow(flow); + + if (mxlIsDiscreteDataFormat(static_cast(flow.flowInfo()->config.common.format))) + { + auto& discreteFlow = static_cast(flow); + + if (mxlRegions._regions.size() != discreteFlow.grainCount()) + { + throw Exception::invalidState("Unexpected number of grains in discrete flow"); + } + + for (std::size_t i = 0; i < discreteFlow.grainCount(); ++i) + { + mxlRegions._regions[i].grainIndexPtr = &discreteFlow.grainAt(i)->header.info.index; + mxlRegions._regions[i].validSlicesPtr = &discreteFlow.grainAt(i)->header.info.validSlices; + } + } + + return mxlRegions; + } + MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow) { static_assert(sizeof(GrainHeader) == 8192, @@ -131,7 +155,7 @@ namespace mxl::lib::fabrics::ofi "GPU memory is not currently supported in the Flow API of MXL. Edit the code below when it is supported"); } - if (mxlIsDiscreteDataFormat(flow.flowInfo()->config.common.format)) + if (mxlIsDiscreteDataFormat(static_cast(flow.flowInfo()->config.common.format))) { auto& discreteFlow = static_cast(flow); std::vector regions; @@ -143,7 +167,6 @@ namespace mxl::lib::fabrics::ofi auto grainInfoBaseAddr = reinterpret_cast(discreteFlow.grainAt(i)); auto grainInfoSize = sizeof(GrainHeader); auto grainPayloadSize = grain->header.info.grainSize; - auto grainIndexPtr = &grain->header.info.index; if (flow.flowInfo()->config.common.payloadLocation != MXL_PAYLOAD_LOCATION_HOST_MEMORY) { @@ -151,7 +174,7 @@ namespace mxl::lib::fabrics::ofi "GPU memory is not currently supported in the Flow API of MXL. Edit the code below when it is supported"); } - regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, grainIndexPtr, Region::Location::host()); + regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, nullptr, nullptr, Region::Location::host()); } // TODO: Add an utility function to retrieve the number of available planes when alpha support is added. @@ -175,13 +198,23 @@ namespace mxl::lib::fabrics::ofi } } - std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slotIndex) + std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slot) + { + if (slot >= regions.size()) + { + throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slot, regions.size()); + } + + return *regions[slot].grainIndexPtr; + } + + void setValidSlicesForGrain(std::vector const& regions, std::uint16_t slot, std::uint16_t validSlices) { - if (slotIndex >= regions.size()) + if (slot >= regions.size()) { - throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slotIndex, regions.size()); + throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slot, regions.size()); } - return *regions[slotIndex].grainIndexPtr; + *regions[slot].validSlicesPtr = validSlices; } } diff --git a/lib/fabrics/ofi/src/internal/Region.hpp b/lib/fabrics/ofi/src/internal/Region.hpp index 5a29f370..5bedb3d2 100644 --- a/lib/fabrics/ofi/src/internal/Region.hpp +++ b/lib/fabrics/ofi/src/internal/Region.hpp @@ -95,10 +95,12 @@ namespace mxl::lib::fabrics::ofi * \param size The size of the memory region in bytes. * \param loc The location of the memory region \see Location. */ - explicit Region(std::uintptr_t base, std::size_t size, std::uint64_t const* grainIndexPtr, Location loc = Location::host()) noexcept + explicit Region(std::uintptr_t base, std::size_t size, std::uint64_t const* grainIndexPtr, std::uint16_t* validSlicesPtr, + Location loc = Location::host()) noexcept : base(base) , size(size) , grainIndexPtr(grainIndexPtr) + , validSlicesPtr(validSlicesPtr) , loc(loc) , _iovec(iovecFromRegion(base, size)) {} @@ -118,6 +120,7 @@ namespace mxl::lib::fabrics::ofi std::uintptr_t base; std::size_t size; std::uint64_t const* grainIndexPtr; + std::uint16_t* validSlicesPtr; Location loc; private: @@ -221,7 +224,8 @@ namespace mxl::lib::fabrics::ofi DataLayout const& dataLayout() const noexcept; private: - friend MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow); + friend MxlRegions mxlFabricsRegionsFromFlow(FlowData& flow); + friend MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow); private: std::vector _regions; @@ -234,5 +238,17 @@ namespace mxl::lib::fabrics::ofi [[nodiscard]] MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow); + /** \brief Convert a FlowData's memory regions to MxlRegions. + * FlowData are obtained from an MXL FlowWriter or FlowReader. + */ + [[nodiscard]] + MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow); + + /** \brief + */ std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slotIndex); + + /** \brief + */ + void setValidSlicesForGrain(std::vector const& regions, std::uint16_t slot, std::uint16_t validSlices); } diff --git a/lib/fabrics/ofi/src/internal/Target.hpp b/lib/fabrics/ofi/src/internal/Target.hpp index b3d20e56..f1327c4c 100644 --- a/lib/fabrics/ofi/src/internal/Target.hpp +++ b/lib/fabrics/ofi/src/internal/Target.hpp @@ -22,7 +22,6 @@ namespace mxl::lib::fabrics::ofi struct GrainReadResult { std::uint64_t grainIndex; - std::uint16_t sliceIndex; }; public: diff --git a/lib/internal/include/mxl-internal/FlowWriter.hpp b/lib/internal/include/mxl-internal/FlowWriter.hpp index 37415199..754406e0 100644 --- a/lib/internal/include/mxl-internal/FlowWriter.hpp +++ b/lib/internal/include/mxl-internal/FlowWriter.hpp @@ -22,6 +22,13 @@ namespace mxl::lib [[nodiscard]] uuids::uuid const& getId() const; + /** + * Accessor for the underlying flow data. + * The flow writer must first open the flow before invoking this method. + */ + [[nodiscard]] + virtual FlowData& getFlowData() = 0; + /** * Accessor for the underlying flow data. * The flow writer must first open the flow before invoking this method. diff --git a/lib/internal/src/PosixContinuousFlowWriter.cpp b/lib/internal/src/PosixContinuousFlowWriter.cpp index 1b632172..d9206ed5 100644 --- a/lib/internal/src/PosixContinuousFlowWriter.cpp +++ b/lib/internal/src/PosixContinuousFlowWriter.cpp @@ -28,6 +28,15 @@ namespace mxl::lib } } + FlowData& PosixContinuousFlowWriter::getFlowData() + { + if (_flowData) + { + return *_flowData; + } + throw std::runtime_error("No open flow."); + } + FlowData const& PosixContinuousFlowWriter::getFlowData() const { if (_flowData) diff --git a/lib/internal/src/PosixContinuousFlowWriter.hpp b/lib/internal/src/PosixContinuousFlowWriter.hpp index 85a6125e..a0572d99 100644 --- a/lib/internal/src/PosixContinuousFlowWriter.hpp +++ b/lib/internal/src/PosixContinuousFlowWriter.hpp @@ -32,6 +32,10 @@ namespace mxl::lib PosixContinuousFlowWriter(FlowManager const& manager, uuids::uuid const& flowId, std::unique_ptr&& data); public: + /** \see FlowWriter::getFlowData */ + [[nodiscard]] + virtual FlowData& getFlowData() override; + /** \see FlowWriter::getFlowData */ [[nodiscard]] virtual FlowData const& getFlowData() const override; diff --git a/lib/internal/src/PosixDiscreteFlowWriter.cpp b/lib/internal/src/PosixDiscreteFlowWriter.cpp index e4eed228..b3f8d9ec 100644 --- a/lib/internal/src/PosixDiscreteFlowWriter.cpp +++ b/lib/internal/src/PosixDiscreteFlowWriter.cpp @@ -39,6 +39,15 @@ namespace mxl::lib } } + FlowData& PosixDiscreteFlowWriter::getFlowData() + { + if (_flowData) + { + return *_flowData; + } + throw std::runtime_error("No open flow."); + } + FlowData const& PosixDiscreteFlowWriter::getFlowData() const { if (_flowData) diff --git a/lib/internal/src/PosixDiscreteFlowWriter.hpp b/lib/internal/src/PosixDiscreteFlowWriter.hpp index cdb253fe..efaec904 100644 --- a/lib/internal/src/PosixDiscreteFlowWriter.hpp +++ b/lib/internal/src/PosixDiscreteFlowWriter.hpp @@ -37,6 +37,9 @@ namespace mxl::lib * Accessor for the underlying flow data. * The flow writer must first open the flow before invoking this method. */ + [[nodiscard]] + virtual FlowData& getFlowData() override; + [[nodiscard]] virtual FlowData const& getFlowData() const override; diff --git a/lib/internal/tests/test_domainwatcher.cpp b/lib/internal/tests/test_domainwatcher.cpp index 9c9e7242..f11967dd 100644 --- a/lib/internal/tests/test_domainwatcher.cpp +++ b/lib/internal/tests/test_domainwatcher.cpp @@ -137,6 +137,13 @@ struct MockWriter : mxl::lib::DiscreteFlowWriter } } + [[noreturn]] + virtual FlowData& getFlowData() override + { + // Implementation is not provided + std::terminate(); + } + [[noreturn]] virtual FlowData const& getFlowData() const override { diff --git a/lib/tests/fabrics/ofi/Util.hpp b/lib/tests/fabrics/ofi/Util.hpp index 143eeb9f..cc5ac470 100644 --- a/lib/tests/fabrics/ofi/Util.hpp +++ b/lib/tests/fabrics/ofi/Util.hpp @@ -90,7 +90,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr, nullptr); } auto mxlRegions = MxlRegions(regions, DataLayout::fromVideo({8, 0, 0, 0})); @@ -103,7 +103,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr, nullptr); } return {regions, dataLayout}; } diff --git a/lib/tests/fabrics/ofi/test_Region.cpp b/lib/tests/fabrics/ofi/test_Region.cpp index b7df7bd9..495403e8 100644 --- a/lib/tests/fabrics/ofi/test_Region.cpp +++ b/lib/tests/fabrics/ofi/test_Region.cpp @@ -25,9 +25,10 @@ TEST_CASE("ofi: Region constructors", "[ofi][Constructors]") TEST_CASE("ofi: RegionGroup view and iovec conversion", "[ofi][RegionGroup]") { std::uint64_t dummyGrainIndex = 0; + std::uint16_t dummyValidSlices = 0; - auto r1 = Region{0x1000, 64, &dummyGrainIndex, Region::Location::host()}; - auto r2 = Region{0x2000, 128, &dummyGrainIndex, Region::Location::host()}; + auto r1 = Region{0x1000, 64, &dummyGrainIndex, &dummyValidSlices, Region::Location::host()}; + auto r2 = Region{0x2000, 128, &dummyGrainIndex, &dummyValidSlices, Region::Location::host()}; auto group = RegionGroup({r1, r2}); REQUIRE(group.size() == 2); diff --git a/lib/tests/fabrics/test_basics.cpp b/lib/tests/fabrics/test_basics.cpp index 428d0a82..064a0076 100644 --- a/lib/tests/fabrics/test_basics.cpp +++ b/lib/tests/fabrics/test_basics.cpp @@ -99,10 +99,9 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -126,10 +125,9 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -208,10 +206,9 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -235,10 +232,9 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -317,10 +313,9 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -346,7 +341,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -366,7 +361,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressNonBlocking(initiator); - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -388,7 +383,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -408,7 +403,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -450,10 +445,9 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -479,7 +473,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -499,7 +493,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressNonBlocking(initiator); - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -521,7 +515,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -541,7 +535,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -639,13 +633,12 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; bool initiatorConnected = false; do { for (auto& target : targets) { - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) { FAIL("Something went wrong in the target: " + std::to_string(status)); @@ -683,7 +676,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -707,7 +700,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressNonBlocking(initiator); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -737,7 +730,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -761,7 +754,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -816,12 +809,11 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); std::uint64_t dummyIndex; - std::uint16_t dummyValidSlices; do { for (auto& target : targets) { - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) { FAIL("Something went wrong in the target: " + std::to_string(status)); @@ -854,7 +846,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressNonBlocking(initiator); @@ -879,7 +871,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressNonBlocking(initiator); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -908,7 +900,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, 20, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -932,7 +924,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); diff --git a/tools/mxl-fabrics-demo/demo.cpp b/tools/mxl-fabrics-demo/demo.cpp index d225258b..93fe23b7 100644 --- a/tools/mxl-fabrics-demo/demo.cpp +++ b/tools/mxl-fabrics-demo/demo.cpp @@ -479,13 +479,12 @@ class AppTarget { mxlGrainInfo grainInfo; std::uint64_t grainIndex = 0; - std::uint16_t validSlices = 0; std::uint8_t* dummyPayload; mxlStatus status; while (!g_exit_requested) { - status = targetReadGrain(&grainIndex, &validSlices, std::chrono::milliseconds(200)); + status = targetReadGrain(&grainIndex, std::chrono::milliseconds(200)); if (status == MXL_ERR_TIMEOUT) { // No completion before a timeout was triggered, most likely a problem upstream. @@ -514,9 +513,6 @@ class AppTarget return status; } - // Now that the grain is open, we can se the valid slices. - grainInfo.validSlices = validSlices; - // GrainInfo and media payload was already written by the remote endpoint, we simply commit!. status = mxlFlowWriterCommitGrain(_writer, &grainInfo); if (status != MXL_STATUS_OK) @@ -528,7 +524,7 @@ class AppTarget MXL_DEBUG("Comitted grain with index={} current index={} validSlices={} flags={}", grainIndex, mxlGetCurrentIndex(&_configInfo.common.grainRate), - validSlices, + grainInfo.validSlices, grainInfo.flags); } @@ -536,16 +532,15 @@ class AppTarget } private: - mxlStatus targetReadGrain(std::uint64_t* grainIndex, std::uint16_t* validSlices, std::chrono::steady_clock::duration timeout) + mxlStatus targetReadGrain(std::uint64_t* grainIndex, std::chrono::steady_clock::duration timeout) { if (_config.provider == MXL_FABRICS_PROVIDER_EFA) { - return mxlFabricsTargetReadGrainNonBlocking(_target, grainIndex, validSlices); + return mxlFabricsTargetReadGrainNonBlocking(_target, grainIndex); } else { - return mxlFabricsTargetReadGrain( - _target, std::chrono::duration_cast(timeout).count(), grainIndex, validSlices); + return mxlFabricsTargetReadGrain(_target, std::chrono::duration_cast(timeout).count(), grainIndex); } } From 42b4a4a074326e9daec44c5123310da6558ba1c2 Mon Sep 17 00:00:00 2001 From: "Jonas Ohland (Riedel)" Date: Wed, 11 Feb 2026 21:56:33 +0100 Subject: [PATCH 4/4] Fix remote index packing Co-authored-by: Jonas Ohland Signed-off-by: Jonas Ohland (Riedel) --- lib/fabrics/ofi/src/fabrics.cpp | 4 ++-- lib/fabrics/ofi/src/internal/GrainSlices.cpp | 2 +- lib/fabrics/ofi/src/internal/GrainSlices.hpp | 2 +- lib/fabrics/ofi/src/internal/ImmData.cpp | 2 +- lib/fabrics/ofi/src/internal/ImmData.hpp | 2 +- lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp | 7 ++++--- 6 files changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/fabrics/ofi/src/fabrics.cpp b/lib/fabrics/ofi/src/fabrics.cpp index 623d3a50..f0bb7002 100644 --- a/lib/fabrics/ofi/src/fabrics.cpp +++ b/lib/fabrics/ofi/src/fabrics.cpp @@ -15,7 +15,6 @@ #include #include "internal/Exception.hpp" #include "internal/FabricInstance.hpp" -#include "internal/ImmData.hpp" #include "internal/Initiator.hpp" #include "internal/Provider.hpp" #include "internal/Region.hpp" @@ -238,6 +237,7 @@ mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint6 auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrain(); if (!res) { + MXL_INFO("not ready"); return MXL_ERR_NOT_READY; } @@ -261,7 +261,7 @@ mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_time auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrainBlocking(std::chrono::milliseconds(in_timeoutMs)); if (!res) { - return MXL_ERR_TIMEOUT; + return MXL_ERR_NOT_READY; } *out_grainIndex = res->grainIndex; diff --git a/lib/fabrics/ofi/src/internal/GrainSlices.cpp b/lib/fabrics/ofi/src/internal/GrainSlices.cpp index 853581c2..e45d8b1a 100644 --- a/lib/fabrics/ofi/src/internal/GrainSlices.cpp +++ b/lib/fabrics/ofi/src/internal/GrainSlices.cpp @@ -29,7 +29,7 @@ namespace mxl::lib::fabrics::ofi return size; } - std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept + std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept { if (_start == 0) { diff --git a/lib/fabrics/ofi/src/internal/GrainSlices.hpp b/lib/fabrics/ofi/src/internal/GrainSlices.hpp index fb214b4b..bbc91918 100644 --- a/lib/fabrics/ofi/src/internal/GrainSlices.hpp +++ b/lib/fabrics/ofi/src/internal/GrainSlices.hpp @@ -46,7 +46,7 @@ namespace mxl::lib::fabrics::ofi * \note When start is 0, the offset is 0, because we include the header in the transfer. */ [[nodiscard]] - std::uint32_t transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept; + std::uint32_t transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept; private: SliceRange(std::uint16_t start, std::uint16_t end) noexcept diff --git a/lib/fabrics/ofi/src/internal/ImmData.cpp b/lib/fabrics/ofi/src/internal/ImmData.cpp index d1f59f59..65ddd94a 100644 --- a/lib/fabrics/ofi/src/internal/ImmData.cpp +++ b/lib/fabrics/ofi/src/internal/ImmData.cpp @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi ImmDataGrain::ImmDataGrain(std::uint64_t index, std::uint16_t sliceIndex) noexcept { auto ringBufferIndex = static_cast(index); - _inner = std::bit_cast(Unpacked{.ringBufferIndex = ringBufferIndex, .sliceIndex = sliceIndex}); + _inner = std::bit_cast(Unpacked{.ringBufferSlot = ringBufferIndex, .sliceIndex = sliceIndex}); } ImmDataGrain::Unpacked ImmDataGrain::unpack() const noexcept diff --git a/lib/fabrics/ofi/src/internal/ImmData.hpp b/lib/fabrics/ofi/src/internal/ImmData.hpp index 6d72a251..21d35ee6 100644 --- a/lib/fabrics/ofi/src/internal/ImmData.hpp +++ b/lib/fabrics/ofi/src/internal/ImmData.hpp @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi public: struct Unpacked //**< Unpacked representation of immediate data. */ { - std::uint16_t ringBufferIndex; + std::uint16_t ringBufferSlot; std::uint16_t sliceIndex; }; diff --git a/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp b/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp index 811bef6a..4b2a5bb3 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp +++ b/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp @@ -27,8 +27,9 @@ namespace mxl::lib::fabrics::ofi auto localRegion = _localRegions[localIndex % _localRegions.size()].sub(localOffset, localSize); auto remoteRegion = _remoteInfo.remoteRegions[remoteIndex % _remoteInfo.remoteRegions.size()].sub(remoteOffset, remoteSize); + auto remoteSlot = remoteIndex % _remoteInfo.remoteRegions.size(); - _pending += ep.write(_token, localRegion, remoteRegion, destAddr, ImmDataGrain{remoteIndex, sliceRange.end()}.data()); + _pending += ep.write(_token, localRegion, remoteRegion, destAddr, ImmDataGrain{remoteSlot, sliceRange.end()}.data()); } void RMAGrainEgressProtocol::processCompletion(Completion::Data const&) @@ -47,7 +48,7 @@ namespace mxl::lib::fabrics::ofi } RMAGrainEgressProtocolTemplate::RMAGrainEgressProtocolTemplate(DataLayout layout, std::vector regions) - : _dataLayout(std::move(layout)) + : _dataLayout(layout) , _regions(std::move(regions)) {} @@ -72,7 +73,7 @@ namespace mxl::lib::fabrics::ofi struct MakeUniqueEnabler : RMAGrainEgressProtocol { MakeUniqueEnabler(Completion::Token token, TargetInfo info, DataLayout layout, std::vector localRegion) - : RMAGrainEgressProtocol(token, std::move(info), std::move(layout), std::move(localRegion)) + : RMAGrainEgressProtocol(token, std::move(info), layout, std::move(localRegion)) {} };