diff --git a/lib/fabrics/include/mxl/fabrics.h b/lib/fabrics/include/mxl/fabrics.h index cfc3d8d1..f8615702 100644 --- a/lib/fabrics/include/mxl/fabrics.h +++ b/lib/fabrics/include/mxl/fabrics.h @@ -167,23 +167,21 @@ extern "C" /** * Non-blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target - * \param out_entryIndex The index of the grain ring buffer index that is ready, if any. - * \param out_sliceIndex The last valid slice index that is ready, if any. + * \param out_grainIndex The index of the grain that was written, if any. * \return The result code. MXL_ERR_NOT_READY if no grain was available at the time of the call, and the call should be retried. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex); + mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex); /** * Blocking accessor for a flow grain at a specific index. * \param in_target A valid fabrics target - * \param out_entryIndex The index of the grain ring buffer index that is ready, if any. - * \param out_sliceIndex The last valid slice index that is ready, if any. + * \param out_grainIndex The index of the grain that was written, if any. * \param in_timeoutMs How long should we wait for the grain (in milliseconds) * \return The result code. MXL_ERR_NOT_READY if no grain was available before the timeout. \see mxlStatus */ MXL_EXPORT - mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs); + mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_entryIndex); /** * Create a fabrics initiator instance. diff --git a/lib/fabrics/ofi/src/fabrics.cpp b/lib/fabrics/ofi/src/fabrics.cpp index aba15e6b..f0bb7002 100644 --- a/lib/fabrics/ofi/src/fabrics.cpp +++ b/lib/fabrics/ofi/src/fabrics.cpp @@ -15,7 +15,6 @@ #include #include "internal/Exception.hpp" #include "internal/FabricInstance.hpp" -#include "internal/ImmData.hpp" #include "internal/Initiator.hpp" #include "internal/Provider.hpp" #include "internal/Region.hpp" @@ -99,7 +98,7 @@ mxlStatus mxlFabricsRegionsForFlowWriter(mxlFlowWriter in_writer, mxlFabricsRegi auto writer = ::mxl::lib::to_FlowWriter(in_writer); // We are leaking the ownership, the user is responsible for calling mxlFabricsRegionsFree to free the memory. - auto regionPtr = std::make_unique(ofi::mxlFabricsRegionsFromFlow(writer->getFlowData())).release(); + auto regionPtr = std::make_unique(ofi::mxlFabricsRegionsFromMutableFlow(writer->getFlowData())).release(); *out_regions = regionPtr->toAPI(); @@ -225,9 +224,9 @@ mxlStatus mxlFabricsTargetSetup(mxlFabricsTarget in_target, mxlFabricsTargetConf } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex) +mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex) { - if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr)) + if ((in_target == nullptr) || (out_grainIndex == nullptr)) { return MXL_ERR_INVALID_ARG; } @@ -235,25 +234,23 @@ mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint1 return ofi::try_run( [&]() { - auto target = ofi::TargetWrapper::fromAPI(in_target); - if (auto res = target->read(); res.immData) + auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrain(); + if (!res) { - auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack(); - *out_entryIndex = entryIndex; - *out_sliceIndex = sliceIndex; - - return MXL_STATUS_OK; + MXL_INFO("not ready"); + return MXL_ERR_NOT_READY; } - return MXL_ERR_NOT_READY; + *out_grainIndex = res->grainIndex; + return MXL_STATUS_OK; }, "Failed to try for new grain"); } extern "C" MXL_EXPORT -mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs) +mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex) { - if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr)) + if ((in_target == nullptr) || (out_grainIndex == nullptr)) { return MXL_ERR_INVALID_ARG; } @@ -261,17 +258,14 @@ mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_en return ofi::try_run( [&]() { - auto target = ofi::TargetWrapper::fromAPI(in_target); - if (auto res = target->readBlocking(std::chrono::milliseconds(in_timeoutMs)); res.immData) + auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrainBlocking(std::chrono::milliseconds(in_timeoutMs)); + if (!res) { - auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack(); - *out_entryIndex = entryIndex; - *out_sliceIndex = sliceIndex; - - return MXL_STATUS_OK; + return MXL_ERR_NOT_READY; } - return MXL_ERR_NOT_READY; + *out_grainIndex = res->grainIndex; + return MXL_STATUS_OK; }, "Failed to wait for new grain"); } @@ -523,7 +517,6 @@ mxlStatus mxlFabricsTargetInfoToString(mxlFabricsTargetInfo const in_targetInfo, return ofi::try_run( [&]() { - std::stringstream ss; auto targetInfoString = ofi::TargetInfo::fromAPI(in_targetInfo)->toJSON(); if (out_string == nullptr) diff --git a/lib/fabrics/ofi/src/internal/FILogging.cpp b/lib/fabrics/ofi/src/internal/FILogging.cpp index fcb74135..4e6d7ece 100644 --- a/lib/fabrics/ofi/src/internal/FILogging.cpp +++ b/lib/fabrics/ofi/src/internal/FILogging.cpp @@ -20,138 +20,140 @@ namespace mxl::lib::fabrics::ofi { namespace { - constexpr static auto fiLogLevelStrings = std::array, 4>{ + constexpr auto fiLogLevelStrings = std::array, 4>{ {{"trace", FI_LOG_TRACE}, {"debug", FI_LOG_DEBUG}, {"info", FI_LOG_INFO}, {"warn", FI_LOG_WARN}} }; - } - int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags); - int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags, std::uint64_t* showtime); - void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn); + static int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags); + static int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags, + std::uint64_t* showtime); + void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn); - class FILogging - { - public: - void init() + class FILogging { - // It is safe to call a logging functions on the libfabric side - // while we initialize logging here. We just need to make sure - // to initialize logging only once. - if (_isInit.exchange(true, std::memory_order_relaxed)) - { - return; - } - - auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL"); - if (fiLogLevelCStr != nullptr) + public: + void init() { - auto fiLogLevelStr = std::string{fiLogLevelCStr}; - std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](char const c) -> char { return std::tolower(c); }); - auto it = std::ranges::find_if(fiLogLevelStrings, - [fiLogLevelStr](std::pair const& item) { return fiLogLevelStr == item.first; }); - if (it != fiLogLevelStrings.end()) + // It is safe to call a logging functions on the libfabric side + // while we initialize logging here. We just need to make sure + // to initialize logging only once. + if (_isInit.exchange(true, std::memory_order_relaxed)) { - _level = it->second; + return; } - else + + auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL"); + if (fiLogLevelCStr != nullptr) { - _level = FI_LOG_WARN; + auto fiLogLevelStr = std::string{fiLogLevelCStr}; + std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](unsigned char const c) { return std::tolower(c); }); + auto it = std::ranges::find_if(fiLogLevelStrings, + [fiLogLevelStr](std::pair const& item) { return fiLogLevelStr == item.first; }); + if (it != fiLogLevelStrings.end()) + { + _level = it->second; + } + else + { + _level = FI_LOG_WARN; + } } + + auto ops = ::fi_ops_log{ + .size = sizeof(::fi_ops_log), + .enabled = &fiLogEnabled, + .ready = &fiLogReady, + .log = &fiLog, + }; + + auto logging = ::fid_logging{ + .fid = ::fid{}, + .ops = &ops, + }; + + fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging); } - auto ops = ::fi_ops_log{ - .size = sizeof(::fi_ops_log), - .enabled = &fiLogEnabled, - .ready = &fiLogReady, - .log = &fiLog, - }; + private: + friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t); + + [[nodiscard]] + fi_log_level level() const noexcept + { + return _level; + } - auto logging = ::fid_logging{ - .fid = ::fid{}, - .ops = &ops, - }; + [[nodiscard]] + bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept + { + return true; + } - fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging); - } + [[nodiscard]] + bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept + { + return true; + } - private: - friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t); + std::atomic_bool _isInit; + ::fi_log_level _level; + }; - [[nodiscard]] - fi_log_level level() const noexcept - { - return _level; - } + FILogging logging; - [[nodiscard]] - bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept + char const* fiLogSubsystemName(enum fi_log_subsys subsys) { - return true; + switch (subsys) + { + case FI_LOG_CORE: return "core"; + case FI_LOG_FABRIC: return "fabric"; + case FI_LOG_DOMAIN: return "domain"; + case FI_LOG_EP_CTRL: return "ep_ctrl"; + case FI_LOG_EP_DATA: return "ep_data"; + case FI_LOG_AV: return "av"; + case FI_LOG_CQ: return "cq"; + case FI_LOG_EQ: return "eq"; + case FI_LOG_MR: return "mr"; + case FI_LOG_CNTR: return "cntr"; + default: return ""; + } } - [[nodiscard]] - bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept + spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level) { - return true; + switch (level) + { + case FI_LOG_WARN: return spdlog::level::warn; + case FI_LOG_TRACE: return spdlog::level::trace; + case FI_LOG_INFO: return spdlog::level::info; + case FI_LOG_DEBUG: return spdlog::level::debug; + default: return spdlog::level::warn; + } } - std::atomic_bool _isInit; - ::fi_log_level _level; - }; - - static FILogging logging; - - char const* fiLogSubsystemName(enum fi_log_subsys subsys) - { - switch (subsys) + int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t) { - case FI_LOG_CORE: return "core"; - case FI_LOG_FABRIC: return "fabric"; - case FI_LOG_DOMAIN: return "domain"; - case FI_LOG_EP_CTRL: return "ep_ctrl"; - case FI_LOG_EP_DATA: return "ep_data"; - case FI_LOG_AV: return "av"; - case FI_LOG_CQ: return "cq"; - case FI_LOG_EQ: return "eq"; - case FI_LOG_MR: return "mr"; - case FI_LOG_CNTR: return "cntr"; - default: return ""; + return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) && + logging.isSubsystemLoggingEnabled(subsys); } - } - spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level) - { - switch (level) + int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*) { - case FI_LOG_WARN: return spdlog::level::warn; - case FI_LOG_TRACE: return spdlog::level::trace; - case FI_LOG_INFO: return spdlog::level::info; - case FI_LOG_DEBUG: return spdlog::level::debug; - default: return spdlog::level::warn; + return 0; } - } - int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t) - { - return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) && - logging.isSubsystemLoggingEnabled(subsys); - } - - int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*) - { - return 0; - } - - void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn) - { - auto msg = std::string{msgIn}; - if (msg.ends_with('\n')) + void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn) { - msg.pop_back(); + auto msg = std::string{msgIn}; + if (msg.ends_with('\n')) + { + msg.pop_back(); + } + + spdlog::log( + {fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg); } - spdlog::log( - {fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg); } // Initialize logging diff --git a/lib/fabrics/ofi/src/internal/GrainSlices.cpp b/lib/fabrics/ofi/src/internal/GrainSlices.cpp index 853581c2..e45d8b1a 100644 --- a/lib/fabrics/ofi/src/internal/GrainSlices.cpp +++ b/lib/fabrics/ofi/src/internal/GrainSlices.cpp @@ -29,7 +29,7 @@ namespace mxl::lib::fabrics::ofi return size; } - std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept + std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept { if (_start == 0) { diff --git a/lib/fabrics/ofi/src/internal/GrainSlices.hpp b/lib/fabrics/ofi/src/internal/GrainSlices.hpp index fb214b4b..bbc91918 100644 --- a/lib/fabrics/ofi/src/internal/GrainSlices.hpp +++ b/lib/fabrics/ofi/src/internal/GrainSlices.hpp @@ -46,7 +46,7 @@ namespace mxl::lib::fabrics::ofi * \note When start is 0, the offset is 0, because we include the header in the transfer. */ [[nodiscard]] - std::uint32_t transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept; + std::uint32_t transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept; private: SliceRange(std::uint16_t start, std::uint16_t end) noexcept diff --git a/lib/fabrics/ofi/src/internal/ImmData.cpp b/lib/fabrics/ofi/src/internal/ImmData.cpp index d1f59f59..65ddd94a 100644 --- a/lib/fabrics/ofi/src/internal/ImmData.cpp +++ b/lib/fabrics/ofi/src/internal/ImmData.cpp @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi ImmDataGrain::ImmDataGrain(std::uint64_t index, std::uint16_t sliceIndex) noexcept { auto ringBufferIndex = static_cast(index); - _inner = std::bit_cast(Unpacked{.ringBufferIndex = ringBufferIndex, .sliceIndex = sliceIndex}); + _inner = std::bit_cast(Unpacked{.ringBufferSlot = ringBufferIndex, .sliceIndex = sliceIndex}); } ImmDataGrain::Unpacked ImmDataGrain::unpack() const noexcept diff --git a/lib/fabrics/ofi/src/internal/ImmData.hpp b/lib/fabrics/ofi/src/internal/ImmData.hpp index 6d72a251..21d35ee6 100644 --- a/lib/fabrics/ofi/src/internal/ImmData.hpp +++ b/lib/fabrics/ofi/src/internal/ImmData.hpp @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi public: struct Unpacked //**< Unpacked representation of immediate data. */ { - std::uint16_t ringBufferIndex; + std::uint16_t ringBufferSlot; std::uint16_t sliceIndex; }; diff --git a/lib/fabrics/ofi/src/internal/LocalRegion.cpp b/lib/fabrics/ofi/src/internal/LocalRegion.cpp index 0fe6664d..49e4f855 100644 --- a/lib/fabrics/ofi/src/internal/LocalRegion.cpp +++ b/lib/fabrics/ofi/src/internal/LocalRegion.cpp @@ -4,13 +4,17 @@ #include "LocalRegion.hpp" #include +#include "Exception.hpp" namespace mxl::lib::fabrics::ofi { - LocalRegion LocalRegion::sub(std::uint64_t offset, std::size_t length) const noexcept + LocalRegion LocalRegion::sub(std::uint64_t offset, std::size_t length) const { - assert(offset + length <= len); + if (offset + length > len) + { + throw Exception::invalidState("Tried to access out-of-bounds sub region of LocalRegion"); + } return LocalRegion{ .addr = addr + offset, @@ -21,7 +25,7 @@ namespace mxl::lib::fabrics::ofi ::iovec LocalRegion::toIovec() const noexcept { - return ::iovec{.iov_base = reinterpret_cast(addr), .iov_len = len}; + return ::iovec{.iov_base = reinterpret_cast(addr), .iov_len = len}; // NOLINT(performance-no-int-to-ptr): No way to avoid this } ::iovec const* LocalRegionGroup::asIovec() const noexcept diff --git a/lib/fabrics/ofi/src/internal/LocalRegion.hpp b/lib/fabrics/ofi/src/internal/LocalRegion.hpp index cca74438..550ac737 100644 --- a/lib/fabrics/ofi/src/internal/LocalRegion.hpp +++ b/lib/fabrics/ofi/src/internal/LocalRegion.hpp @@ -26,9 +26,8 @@ namespace mxl::lib::fabrics::ofi * \param length The length of the sub-region. * \return A new LocalRegion representing the specified sub-region. */ - [[nodiscard]] - LocalRegion sub(std::uint64_t offset, std::size_t length) const noexcept; + LocalRegion sub(std::uint64_t offset, std::size_t length) const; /** \brief Convert this LocalRegion to a struct iovec used by libfabric transfer functions. */ diff --git a/lib/fabrics/ofi/src/internal/Protocol.hpp b/lib/fabrics/ofi/src/internal/Protocol.hpp index c810b015..a48c6827 100644 --- a/lib/fabrics/ofi/src/internal/Protocol.hpp +++ b/lib/fabrics/ofi/src/internal/Protocol.hpp @@ -40,7 +40,7 @@ namespace mxl::lib::fabrics::ofi * \param endpoint The endpoint associated with the completion * \param completion The completion object to process. */ - virtual Target::ReadResult processCompletion(Endpoint& endpoint, Completion const& completion) = 0; + virtual std::optional readGrain(Endpoint& endpoint, Completion const& completion) = 0; /** \brief Destroy the protocol object. */ diff --git a/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp b/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp index 811bef6a..4b2a5bb3 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp +++ b/lib/fabrics/ofi/src/internal/ProtocolEgressRMA.cpp @@ -27,8 +27,9 @@ namespace mxl::lib::fabrics::ofi auto localRegion = _localRegions[localIndex % _localRegions.size()].sub(localOffset, localSize); auto remoteRegion = _remoteInfo.remoteRegions[remoteIndex % _remoteInfo.remoteRegions.size()].sub(remoteOffset, remoteSize); + auto remoteSlot = remoteIndex % _remoteInfo.remoteRegions.size(); - _pending += ep.write(_token, localRegion, remoteRegion, destAddr, ImmDataGrain{remoteIndex, sliceRange.end()}.data()); + _pending += ep.write(_token, localRegion, remoteRegion, destAddr, ImmDataGrain{remoteSlot, sliceRange.end()}.data()); } void RMAGrainEgressProtocol::processCompletion(Completion::Data const&) @@ -47,7 +48,7 @@ namespace mxl::lib::fabrics::ofi } RMAGrainEgressProtocolTemplate::RMAGrainEgressProtocolTemplate(DataLayout layout, std::vector regions) - : _dataLayout(std::move(layout)) + : _dataLayout(layout) , _regions(std::move(regions)) {} @@ -72,7 +73,7 @@ namespace mxl::lib::fabrics::ofi struct MakeUniqueEnabler : RMAGrainEgressProtocol { MakeUniqueEnabler(Completion::Token token, TargetInfo info, DataLayout layout, std::vector localRegion) - : RMAGrainEgressProtocol(token, std::move(info), std::move(layout), std::move(localRegion)) + : RMAGrainEgressProtocol(token, std::move(info), layout, std::move(localRegion)) {} }; diff --git a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp index d4e773d9..8de8b4b6 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp +++ b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.cpp @@ -4,6 +4,8 @@ #include "ProtocolIngressRMA.hpp" #include "Exception.hpp" +#include "ImmData.hpp" +#include "Region.hpp" namespace mxl::lib::fabrics::ofi { @@ -22,27 +24,43 @@ namespace mxl::lib::fabrics::ofi return domain->remoteRegions(); } - void RMAGrainIngressProtocol::start(Endpoint& ep) + void RMAGrainIngressProtocol::start(Endpoint& endpoint) { - if (ep.domain()->usingRecvBufForCqData()) + if (endpoint.domain()->usingRecvBufForCqData()) { - ep.recv(immDataRegion()); + endpoint.recv(immDataRegion()); } } - Target::ReadResult RMAGrainIngressProtocol::processCompletion(Endpoint& ep, Completion const& completion) + std::optional RMAGrainIngressProtocol::readGrain(Endpoint& endpoint, Completion const& completion) { - if (auto data = completion.tryData(); data) + auto completionData = completion.tryData(); + if (!completionData) { - if (_immDataBuffer) - { - ep.recv(immDataRegion()); - } + return {}; + } + + if (_immDataBuffer) + { + endpoint.recv(_immDataBuffer->toLocalRegion()); + } - return Target::ReadResult{data->data()}; + auto immData = completionData->data(); + if (!immData) + { + throw Exception::invalidState("Received a completion without immediate data."); } - return {}; + auto [slot, slice] = ImmDataGrain{static_cast(*immData)}.unpack(); + + // Set the number of valid slices in the grain header. This information is received through the immediate data and must be updated + // in the local shared memory in the case of partial writes. + setValidSlicesForGrain(_regions, slot, slice); + + // Get the actual grain index from the grain header in share memory. This was written in the first RMA write. + auto grainIndex = getGrainIndexInRingSlot(_regions, slot); + + return std::make_optional(grainIndex); } void RMAGrainIngressProtocol::reset() diff --git a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp index da0d0561..8c229f06 100644 --- a/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp +++ b/lib/fabrics/ofi/src/internal/ProtocolIngressRMA.hpp @@ -28,7 +28,7 @@ namespace mxl::lib::fabrics::ofi /** \copydoc IngressProtocol::processCompletion() */ - Target::ReadResult processCompletion(Endpoint& endpoint, Completion const& completion) override; + std::optional readGrain(Endpoint& endpoint, Completion const& completion) override; /** \copydoc IngressProtocol::destroy() */ diff --git a/lib/fabrics/ofi/src/internal/RCTarget.cpp b/lib/fabrics/ofi/src/internal/RCTarget.cpp index 77ad3589..fa5673a8 100644 --- a/lib/fabrics/ofi/src/internal/RCTarget.cpp +++ b/lib/fabrics/ofi/src/internal/RCTarget.cpp @@ -74,23 +74,23 @@ namespace mxl::lib::fabrics::ofi , _state(WaitForConnectionRequest{std::move(pep)}) {} - Target::ReadResult RCTarget::read() + std::optional RCTarget::readGrain() { - return makeProgress({}); + return readNextGrain({}); } - Target::ReadResult RCTarget::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional RCTarget::readGrainBlocking(std::chrono::steady_clock::duration timeout) { - return makeProgress(timeout); + return readNextGrain(timeout); } void RCTarget::shutdown() {} template - Target::ReadResult RCTarget::makeProgress(std::chrono::steady_clock::duration timeout) + std::optional RCTarget::readNextGrain(std::chrono::steady_clock::duration timeout) { - Target::ReadResult result; + auto result = std::optional{std::nullopt}; _state = std::visit( overloaded{[](std::monostate) -> State { throw Exception::invalidState("Target is in an invalid state an can no longer make progress"); }, @@ -142,7 +142,6 @@ namespace mxl::lib::fabrics::ofi [&](RCTarget::Connected state) -> State { auto [completion, event] = readEndpointQueues(state.ep, timeout); - if (event && event.value().isShutdown()) { MXL_INFO("Remote endpoint has shutdown the connection. Transitioning to listening to new connection."); @@ -151,7 +150,7 @@ namespace mxl::lib::fabrics::ofi if (completion) { - result = _proto->processCompletion(state.ep, *completion); + result = _proto->readGrain(state.ep, *completion); } return Connected{.ep = std::move(state.ep)}; @@ -161,7 +160,7 @@ namespace mxl::lib::fabrics::ofi return result; } - PassiveEndpoint RCTarget::makeListener(std::shared_ptr fabric) + PassiveEndpoint RCTarget::makeListener(std::shared_ptr const& fabric) { // Create a passive endpoint. A passive endpoint can be viewed like a bound TCP socket listening for // incoming connections. diff --git a/lib/fabrics/ofi/src/internal/RCTarget.hpp b/lib/fabrics/ofi/src/internal/RCTarget.hpp index 08ef42ce..dd5bd494 100644 --- a/lib/fabrics/ofi/src/internal/RCTarget.hpp +++ b/lib/fabrics/ofi/src/internal/RCTarget.hpp @@ -31,14 +31,16 @@ namespace mxl::lib::fabrics::ofi [[nodiscard]] static std::pair, std::unique_ptr> setup(mxlFabricsTargetConfig const& config); - /** \copydoc Target::read() + /** \copydoc Target::readGrain() */ - Target::ReadResult read() final; + std::optional readGrain() final; - /** \copydoc Target::readBlocking() + /** \copydoc Target::readGrainBlocking() */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout) final; + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) final; + /** \brief Shut down the target. + */ void shutdown() override; private: @@ -87,10 +89,10 @@ namespace mxl::lib::fabrics::ofi * \return The result of the read operation. */ template - Target::ReadResult makeProgress(std::chrono::steady_clock::duration timeout); + std::optional readNextGrain(std::chrono::steady_clock::duration timeout); [[nodiscard]] - static PassiveEndpoint makeListener(std::shared_ptr fabric); + static PassiveEndpoint makeListener(std::shared_ptr const& fabric); private: std::unique_ptr _proto; diff --git a/lib/fabrics/ofi/src/internal/RDMTarget.cpp b/lib/fabrics/ofi/src/internal/RDMTarget.cpp index ea28a0db..b70629c3 100644 --- a/lib/fabrics/ofi/src/internal/RDMTarget.cpp +++ b/lib/fabrics/ofi/src/internal/RDMTarget.cpp @@ -73,34 +73,34 @@ namespace mxl::lib::fabrics::ofi return {std::make_unique(std::move(endpoint), std::move(protocol)), std::move(targetInfo)}; } - RDMTarget::RDMTarget(Endpoint ep, std::unique_ptr ingress) + RDMTarget::RDMTarget(Endpoint ep, std::unique_ptr proto) : _ep(std::move(ep)) - , _protocol(std::move(ingress)) + , _protocol(std::move(proto)) {} - Target::ReadResult RDMTarget::read() + std::optional RDMTarget::readGrain() { - return makeProgress({}); + return readNextGrain({}); } - Target::ReadResult RDMTarget::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional RDMTarget::readGrainBlocking(std::chrono::steady_clock::duration timeout) { - return makeProgress(timeout); + return readNextGrain(timeout); } void RDMTarget::shutdown() {} template - Target::ReadResult RDMTarget::makeProgress(std::chrono::steady_clock::duration timeout) + std::optional RDMTarget::readNextGrain(std::chrono::steady_clock::duration timeout) { auto completion = readCompletionQueue(*_ep.completionQueue(), timeout); if (completion) { - return _protocol->processCompletion(_ep, *completion); + return _protocol->readGrain(_ep, *completion); } - return ReadResult{std::nullopt}; + return {}; } } diff --git a/lib/fabrics/ofi/src/internal/RDMTarget.hpp b/lib/fabrics/ofi/src/internal/RDMTarget.hpp index 771fc945..38308544 100644 --- a/lib/fabrics/ofi/src/internal/RDMTarget.hpp +++ b/lib/fabrics/ofi/src/internal/RDMTarget.hpp @@ -28,15 +28,15 @@ namespace mxl::lib::fabrics::ofi /** \copydoc Target::read() */ - Target::ReadResult read() override; + std::optional readGrain() final; /** \copydoc Target::readBlocking() */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout) override; + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) final; /** \copydoc Target::shutdown() */ - void shutdown() override; + void shutdown() final; private: /** \brief Construct an RDMTarget with the given endpoint and immediate data location. @@ -53,10 +53,11 @@ namespace mxl::lib::fabrics::ofi * \return The result of the read operation. */ template - Target::ReadResult makeProgress(std::chrono::steady_clock::duration timeout); + std::optional readNextGrain(std::chrono::steady_clock::duration timeout); private: Endpoint _ep; std::unique_ptr _protocol = {}; + std::vector _regions; }; } diff --git a/lib/fabrics/ofi/src/internal/Region.cpp b/lib/fabrics/ofi/src/internal/Region.cpp index 8165d499..384b795f 100644 --- a/lib/fabrics/ofi/src/internal/Region.cpp +++ b/lib/fabrics/ofi/src/internal/Region.cpp @@ -9,6 +9,7 @@ #include #include #include +#include "mxl/dataformat.h" #include "mxl/fabrics.h" #include "mxl/flow.h" #include "mxl/mxl.h" @@ -119,6 +120,29 @@ namespace mxl::lib::fabrics::ofi return _layout; } + MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow) + { + auto mxlRegions = mxlFabricsRegionsFromFlow(flow); + + if (mxlIsDiscreteDataFormat(static_cast(flow.flowInfo()->config.common.format))) + { + auto& discreteFlow = static_cast(flow); + + if (mxlRegions._regions.size() != discreteFlow.grainCount()) + { + throw Exception::invalidState("Unexpected number of grains in discrete flow"); + } + + for (std::size_t i = 0; i < discreteFlow.grainCount(); ++i) + { + mxlRegions._regions[i].grainIndexPtr = &discreteFlow.grainAt(i)->header.info.index; + mxlRegions._regions[i].validSlicesPtr = &discreteFlow.grainAt(i)->header.info.validSlices; + } + } + + return mxlRegions; + } + MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow) { static_assert(sizeof(GrainHeader) == 8192, @@ -131,7 +155,7 @@ namespace mxl::lib::fabrics::ofi "GPU memory is not currently supported in the Flow API of MXL. Edit the code below when it is supported"); } - if (mxlIsDiscreteDataFormat(flow.flowInfo()->config.common.format)) + if (mxlIsDiscreteDataFormat(static_cast(flow.flowInfo()->config.common.format))) { auto& discreteFlow = static_cast(flow); std::vector regions; @@ -150,7 +174,7 @@ namespace mxl::lib::fabrics::ofi "GPU memory is not currently supported in the Flow API of MXL. Edit the code below when it is supported"); } - regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, Region::Location::host()); + regions.emplace_back(grainInfoBaseAddr, grainInfoSize + grainPayloadSize, nullptr, nullptr, Region::Location::host()); } // TODO: Add an utility function to retrieve the number of available planes when alpha support is added. @@ -173,4 +197,24 @@ namespace mxl::lib::fabrics::ofi throw Exception::make(MXL_ERR_UNKNOWN, "Unsupported flow fromat {}", flow.flowInfo()->config.common.format); } } + + std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slot) + { + if (slot >= regions.size()) + { + throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slot, regions.size()); + } + + return *regions[slot].grainIndexPtr; + } + + void setValidSlicesForGrain(std::vector const& regions, std::uint16_t slot, std::uint16_t validSlices) + { + if (slot >= regions.size()) + { + throw Exception::invalidArgument("Invalid ring buffer slot number: {}, ring buffer len: {}", slot, regions.size()); + } + + *regions[slot].validSlicesPtr = validSlices; + } } diff --git a/lib/fabrics/ofi/src/internal/Region.hpp b/lib/fabrics/ofi/src/internal/Region.hpp index 18289fe7..5bedb3d2 100644 --- a/lib/fabrics/ofi/src/internal/Region.hpp +++ b/lib/fabrics/ofi/src/internal/Region.hpp @@ -95,9 +95,12 @@ namespace mxl::lib::fabrics::ofi * \param size The size of the memory region in bytes. * \param loc The location of the memory region \see Location. */ - explicit Region(std::uintptr_t base, std::size_t size, Location loc = Location::host()) noexcept + explicit Region(std::uintptr_t base, std::size_t size, std::uint64_t const* grainIndexPtr, std::uint16_t* validSlicesPtr, + Location loc = Location::host()) noexcept : base(base) , size(size) + , grainIndexPtr(grainIndexPtr) + , validSlicesPtr(validSlicesPtr) , loc(loc) , _iovec(iovecFromRegion(base, size)) {} @@ -116,6 +119,8 @@ namespace mxl::lib::fabrics::ofi public: std::uintptr_t base; std::size_t size; + std::uint64_t const* grainIndexPtr; + std::uint16_t* validSlicesPtr; Location loc; private: @@ -199,7 +204,7 @@ namespace mxl::lib::fabrics::ofi public: MxlRegions(std::vector regions, DataLayout dataLayout) : _regions(std::move(regions)) - , _layout(std::move(dataLayout)) + , _layout(dataLayout) {} /** \brief Convert between external and internal versions of this type @@ -219,7 +224,8 @@ namespace mxl::lib::fabrics::ofi DataLayout const& dataLayout() const noexcept; private: - friend MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow); + friend MxlRegions mxlFabricsRegionsFromFlow(FlowData& flow); + friend MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow); private: std::vector _regions; @@ -231,4 +237,18 @@ namespace mxl::lib::fabrics::ofi */ [[nodiscard]] MxlRegions mxlFabricsRegionsFromFlow(FlowData const& flow); + + /** \brief Convert a FlowData's memory regions to MxlRegions. + * FlowData are obtained from an MXL FlowWriter or FlowReader. + */ + [[nodiscard]] + MxlRegions mxlFabricsRegionsFromMutableFlow(FlowData& flow); + + /** \brief + */ + std::uint64_t getGrainIndexInRingSlot(std::vector const& regions, std::uint16_t slotIndex); + + /** \brief + */ + void setValidSlicesForGrain(std::vector const& regions, std::uint16_t slot, std::uint16_t validSlices); } diff --git a/lib/fabrics/ofi/src/internal/Target.cpp b/lib/fabrics/ofi/src/internal/Target.cpp index 00bfd9c4..21017030 100644 --- a/lib/fabrics/ofi/src/internal/Target.cpp +++ b/lib/fabrics/ofi/src/internal/Target.cpp @@ -10,6 +10,7 @@ #include #include "mxl/fabrics.h" #include "Exception.hpp" +#include "ImmData.hpp" #include "LocalRegion.hpp" #include "RCTarget.hpp" #include "RDMTarget.hpp" @@ -35,31 +36,31 @@ namespace mxl::lib::fabrics::ofi return reinterpret_cast(this); } - Target::ReadResult TargetWrapper::read() + std::optional TargetWrapper::readGrain() { if (!_inner) { - throw Exception::invalidState("Target is not set up"); + throw Exception::invalidState("Target is not set up."); } - return _inner->read(); + return _inner->readGrain(); } - Target::ReadResult TargetWrapper::readBlocking(std::chrono::steady_clock::duration timeout) + std::optional TargetWrapper::readGrainBlocking(std::chrono::steady_clock::duration timeout) { if (!_inner) { - throw Exception::invalidState("Target is not set up"); + throw Exception::invalidState("Target is not set up."); } - return _inner->readBlocking(timeout); + return _inner->readGrainBlocking(timeout); } std::unique_ptr TargetWrapper::setup(mxlFabricsTargetConfig const& config) { if (_inner) { - _inner.release(); + _inner.reset(); } switch (config.provider) @@ -84,5 +85,4 @@ namespace mxl::lib::fabrics::ofi throw Exception::invalidArgument("Invalid provider value"); } - } diff --git a/lib/fabrics/ofi/src/internal/Target.hpp b/lib/fabrics/ofi/src/internal/Target.hpp index 3b57c536..f1327c4c 100644 --- a/lib/fabrics/ofi/src/internal/Target.hpp +++ b/lib/fabrics/ofi/src/internal/Target.hpp @@ -19,9 +19,9 @@ namespace mxl::lib::fabrics::ofi public: /** \brief Result of a read operation. */ - struct ReadResult + struct GrainReadResult { - std::optional immData{std::nullopt}; /**< If a transfer was initiated with immediate data, this contains the data. */ + std::uint64_t grainIndex; }; public: @@ -32,13 +32,13 @@ namespace mxl::lib::fabrics::ofi * A non-blocking operation that also drives the connection forward. Continuous invocation of this function is necessary for connection * establishment and ongoing progress. */ - virtual ReadResult read() = 0; + virtual std::optional readGrain() = 0; /** \brief Determine if new data can be consumed. * * A blocking version of read. see read(). */ - virtual ReadResult readBlocking(std::chrono::steady_clock::duration timeout) = 0; + virtual std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout) = 0; /** \brief Shut down the target gracefully. * Initiates a graceful shutdown of the target and blocks until the shutdown is complete. @@ -87,11 +87,11 @@ namespace mxl::lib::fabrics::ofi /** \copydoc Target::read() */ - Target::ReadResult read(); + std::optional readGrain(); /** \copydoc Target::readBlocking(std::chrono::steady_clock::duration) */ - Target::ReadResult readBlocking(std::chrono::steady_clock::duration timeout); + std::optional readGrainBlocking(std::chrono::steady_clock::duration timeout); /** \brief Set up the target with the specified configuration. * diff --git a/lib/internal/include/mxl-internal/FlowWriter.hpp b/lib/internal/include/mxl-internal/FlowWriter.hpp index 37415199..754406e0 100644 --- a/lib/internal/include/mxl-internal/FlowWriter.hpp +++ b/lib/internal/include/mxl-internal/FlowWriter.hpp @@ -22,6 +22,13 @@ namespace mxl::lib [[nodiscard]] uuids::uuid const& getId() const; + /** + * Accessor for the underlying flow data. + * The flow writer must first open the flow before invoking this method. + */ + [[nodiscard]] + virtual FlowData& getFlowData() = 0; + /** * Accessor for the underlying flow data. * The flow writer must first open the flow before invoking this method. diff --git a/lib/internal/src/PosixContinuousFlowWriter.cpp b/lib/internal/src/PosixContinuousFlowWriter.cpp index 1b632172..d9206ed5 100644 --- a/lib/internal/src/PosixContinuousFlowWriter.cpp +++ b/lib/internal/src/PosixContinuousFlowWriter.cpp @@ -28,6 +28,15 @@ namespace mxl::lib } } + FlowData& PosixContinuousFlowWriter::getFlowData() + { + if (_flowData) + { + return *_flowData; + } + throw std::runtime_error("No open flow."); + } + FlowData const& PosixContinuousFlowWriter::getFlowData() const { if (_flowData) diff --git a/lib/internal/src/PosixContinuousFlowWriter.hpp b/lib/internal/src/PosixContinuousFlowWriter.hpp index 85a6125e..a0572d99 100644 --- a/lib/internal/src/PosixContinuousFlowWriter.hpp +++ b/lib/internal/src/PosixContinuousFlowWriter.hpp @@ -32,6 +32,10 @@ namespace mxl::lib PosixContinuousFlowWriter(FlowManager const& manager, uuids::uuid const& flowId, std::unique_ptr&& data); public: + /** \see FlowWriter::getFlowData */ + [[nodiscard]] + virtual FlowData& getFlowData() override; + /** \see FlowWriter::getFlowData */ [[nodiscard]] virtual FlowData const& getFlowData() const override; diff --git a/lib/internal/src/PosixDiscreteFlowWriter.cpp b/lib/internal/src/PosixDiscreteFlowWriter.cpp index e4eed228..b3f8d9ec 100644 --- a/lib/internal/src/PosixDiscreteFlowWriter.cpp +++ b/lib/internal/src/PosixDiscreteFlowWriter.cpp @@ -39,6 +39,15 @@ namespace mxl::lib } } + FlowData& PosixDiscreteFlowWriter::getFlowData() + { + if (_flowData) + { + return *_flowData; + } + throw std::runtime_error("No open flow."); + } + FlowData const& PosixDiscreteFlowWriter::getFlowData() const { if (_flowData) diff --git a/lib/internal/src/PosixDiscreteFlowWriter.hpp b/lib/internal/src/PosixDiscreteFlowWriter.hpp index cdb253fe..efaec904 100644 --- a/lib/internal/src/PosixDiscreteFlowWriter.hpp +++ b/lib/internal/src/PosixDiscreteFlowWriter.hpp @@ -37,6 +37,9 @@ namespace mxl::lib * Accessor for the underlying flow data. * The flow writer must first open the flow before invoking this method. */ + [[nodiscard]] + virtual FlowData& getFlowData() override; + [[nodiscard]] virtual FlowData const& getFlowData() const override; diff --git a/lib/internal/tests/test_domainwatcher.cpp b/lib/internal/tests/test_domainwatcher.cpp index 9c9e7242..f11967dd 100644 --- a/lib/internal/tests/test_domainwatcher.cpp +++ b/lib/internal/tests/test_domainwatcher.cpp @@ -137,6 +137,13 @@ struct MockWriter : mxl::lib::DiscreteFlowWriter } } + [[noreturn]] + virtual FlowData& getFlowData() override + { + // Implementation is not provided + std::terminate(); + } + [[noreturn]] virtual FlowData const& getFlowData() const override { diff --git a/lib/tests/fabrics/ofi/Util.hpp b/lib/tests/fabrics/ofi/Util.hpp index aaf2bbad..cc5ac470 100644 --- a/lib/tests/fabrics/ofi/Util.hpp +++ b/lib/tests/fabrics/ofi/Util.hpp @@ -90,7 +90,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size()); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr, nullptr); } auto mxlRegions = MxlRegions(regions, DataLayout::fromVideo({8, 0, 0, 0})); @@ -103,7 +103,7 @@ namespace mxl::lib::fabrics::ofi std::vector regions; for (auto const& innerRegion : innerRegions) { - regions.emplace_back(*innerRegion.data(), innerRegion.size()); + regions.emplace_back(*innerRegion.data(), innerRegion.size(), nullptr, nullptr); } return {regions, dataLayout}; } diff --git a/lib/tests/fabrics/ofi/test_Region.cpp b/lib/tests/fabrics/ofi/test_Region.cpp index 3a7aa744..495403e8 100644 --- a/lib/tests/fabrics/ofi/test_Region.cpp +++ b/lib/tests/fabrics/ofi/test_Region.cpp @@ -24,8 +24,11 @@ TEST_CASE("ofi: Region constructors", "[ofi][Constructors]") TEST_CASE("ofi: RegionGroup view and iovec conversion", "[ofi][RegionGroup]") { - auto r1 = Region{0x1000, 64, Region::Location::host()}; - auto r2 = Region{0x2000, 128, Region::Location::host()}; + std::uint64_t dummyGrainIndex = 0; + std::uint16_t dummyValidSlices = 0; + + auto r1 = Region{0x1000, 64, &dummyGrainIndex, &dummyValidSlices, Region::Location::host()}; + auto r2 = Region{0x2000, 128, &dummyGrainIndex, &dummyValidSlices, Region::Location::host()}; auto group = RegionGroup({r1, r2}); REQUIRE(group.size() == 2); diff --git a/lib/tests/fabrics/test_basics.cpp b/lib/tests/fabrics/test_basics.cpp index cc39254a..064a0076 100644 --- a/lib/tests/fabrics/test_basics.cpp +++ b/lib/tests/fabrics/test_basics.cpp @@ -98,11 +98,10 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -125,12 +124,10 @@ TEST_CASE("Fabrics connection oriented activation tests", "[fabrics][connected][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrain( - target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -208,11 +205,10 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -235,11 +231,10 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ { // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // make progress on target + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -267,8 +262,6 @@ TEST_CASE("Fabrics connectionless activation tests", "[fabrics][connectionless][ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Grain with flows", "[Fabrics][Transfer][Flows]") { - namespace ofi = mxl::lib::fabrics::ofi; - auto instance = mxlCreateInstance(domain.c_str(), ""); mxlFabricsInstance fabrics; @@ -319,11 +312,10 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -349,7 +341,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -369,7 +361,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressNonBlocking(initiator); - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -391,7 +383,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -411,7 +403,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -452,11 +444,10 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target auto status = mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) @@ -482,7 +473,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -502,7 +493,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressNonBlocking(initiator); - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -524,7 +515,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); do { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) { @@ -544,7 +535,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr do { mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); - auto status = mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(target, 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -576,8 +567,6 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Grain with flows multi target", "[Fabrics][Transfer][Flows][Multi-targets]") { - namespace ofi = mxl::lib::fabrics::ofi; - auto flowDef = mxl::tests::readFile("../data/v210_flow.json"); auto jsonValue = picojson::value{}; REQUIRE(picojson::parse(jsonValue, flowDef).empty()); @@ -643,14 +632,13 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; bool initiatorConnected = false; do { for (auto& target : targets) { - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) { FAIL("Something went wrong in the target: " + std::to_string(status)); @@ -688,7 +676,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressNonBlocking(initiator); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -712,7 +700,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressNonBlocking(initiator); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -742,7 +730,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -766,7 +754,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -820,13 +808,12 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr // try to connect them for 5 seconds auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - std::uint16_t dummyIndex; - std::uint16_t dummyValidSlices; + std::uint64_t dummyIndex; do { for (auto& target : targets) { - auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // make progress on target + auto status = mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // make progress on target if (status != MXL_STATUS_OK && status != MXL_ERR_NOT_READY) { FAIL("Something went wrong in the target: " + std::to_string(status)); @@ -859,7 +846,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex, &dummyValidSlices); // target make progress + mxlFabricsTargetReadGrainNonBlocking(target, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressNonBlocking(initiator); @@ -884,7 +871,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressNonBlocking(initiator); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex, &dummyValidSlices); + auto status = mxlFabricsTargetReadGrainNonBlocking(targets[i], &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); @@ -913,7 +900,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr { for (auto& target : targets) { - mxlFabricsTargetReadGrain(target, &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); // target make progress + mxlFabricsTargetReadGrain(target, 20, &dummyIndex); // target make progress } mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); if (mxlFabricsInitiatorTransferGrain(initiator, 0, 0, 1) == MXL_STATUS_OK) @@ -937,7 +924,7 @@ TEST_CASE_PERSISTENT_FIXTURE(mxl::tests::mxlDomainFixture, "Fabrics: Transfer Gr mxlFabricsInitiatorMakeProgressBlocking(initiator, std::chrono::milliseconds(20).count()); for (size_t i = 0; i < nbTargets; i++) { - auto status = mxlFabricsTargetReadGrain(targets[i], &dummyIndex, &dummyValidSlices, std::chrono::milliseconds(20).count()); + auto status = mxlFabricsTargetReadGrain(targets[i], 20, &dummyIndex); if (status == MXL_ERR_INTERRUPTED) { FAIL("Peer disconnected before the transfer completed"); diff --git a/tools/mxl-fabrics-demo/demo.cpp b/tools/mxl-fabrics-demo/demo.cpp index f853b5a5..93fe23b7 100644 --- a/tools/mxl-fabrics-demo/demo.cpp +++ b/tools/mxl-fabrics-demo/demo.cpp @@ -101,7 +101,7 @@ class AppInitator } } - mxlStatus setup(std::string targetInfoStr) + mxlStatus setup(std::string const& targetInfoStr) { _instance = mxlCreateInstance(_config.domain.c_str(), ""); if (_instance == nullptr) @@ -217,7 +217,7 @@ class AppInitator continue; } else if (ret == MXL_ERR_OUT_OF_RANGE_TOO_EARLY) - { + { // NOLINT(bugprone-branch-clone): Repeated for clarity. // We are too early somehow.. retry the same grain later. continue; } @@ -478,14 +478,13 @@ class AppTarget mxlStatus run() { mxlGrainInfo grainInfo; - uint16_t entryIndex = 0; - uint16_t validSlices = 0; - uint8_t* dummyPayload; + std::uint64_t grainIndex = 0; + std::uint8_t* dummyPayload; mxlStatus status; while (!g_exit_requested) { - status = targetReadGrain(&entryIndex, &validSlices, std::chrono::milliseconds(200)); + status = targetReadGrain(&grainIndex, std::chrono::milliseconds(200)); if (status == MXL_ERR_TIMEOUT) { // No completion before a timeout was triggered, most likely a problem upstream. @@ -506,15 +505,6 @@ class AppTarget return status; } - status = mxlFlowWriterGetGrainInfo(_writer, entryIndex, &grainInfo); - if (status != MXL_STATUS_OK) - { - MXL_ERROR("Failed to get grain info with status '{}'", static_cast(status)); - return status; - } - - std::uint64_t grainIndex = grainInfo.index; - // Here we open so that we can commit, we are not going to modify the grain as it was already modified by the initiator. status = mxlFlowWriterOpenGrain(_writer, grainIndex, &grainInfo, &dummyPayload); if (status != MXL_STATUS_OK) @@ -523,9 +513,6 @@ class AppTarget return status; } - // Now that the grain is open, we can se the valid slices. - grainInfo.validSlices = validSlices; - // GrainInfo and media payload was already written by the remote endpoint, we simply commit!. status = mxlFlowWriterCommitGrain(_writer, &grainInfo); if (status != MXL_STATUS_OK) @@ -537,7 +524,7 @@ class AppTarget MXL_DEBUG("Comitted grain with index={} current index={} validSlices={} flags={}", grainIndex, mxlGetCurrentIndex(&_configInfo.common.grainRate), - validSlices, + grainInfo.validSlices, grainInfo.flags); } @@ -545,16 +532,15 @@ class AppTarget } private: - mxlStatus targetReadGrain(std::uint16_t* entryIndex, std::uint16_t* validSlices, std::chrono::steady_clock::duration timeout) + mxlStatus targetReadGrain(std::uint64_t* grainIndex, std::chrono::steady_clock::duration timeout) { if (_config.provider == MXL_FABRICS_PROVIDER_EFA) { - return mxlFabricsTargetReadGrainNonBlocking(_target, entryIndex, validSlices); + return mxlFabricsTargetReadGrainNonBlocking(_target, grainIndex); } else { - return mxlFabricsTargetReadGrain( - _target, entryIndex, validSlices, std::chrono::duration_cast(timeout).count()); + return mxlFabricsTargetReadGrain(_target, std::chrono::duration_cast(timeout).count(), grainIndex); } }