Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions lib/fabrics/include/mxl/fabrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,23 +167,21 @@ extern "C"
/**
* Non-blocking accessor for a flow grain at a specific index.
* \param in_target A valid fabrics target
* \param out_entryIndex The grain ring buffer index that is ready, if any.
* \param out_sliceIndex The last valid slice index that is ready, if any.
* \param out_grainIndex The index of the grain that was written, if any.
* \return The result code. MXL_ERR_NOT_READY if no grain was available at the time of the call, and the call should be retried. \see mxlStatus
*/
MXL_EXPORT
mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex);
mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex);

/**
* Blocking accessor for a flow grain at a specific index.
* \param in_target A valid fabrics target
* \param out_entryIndex The grain ring buffer index that is ready, if any.
* \param out_sliceIndex The last valid slice index that is ready, if any.
* \param out_grainIndex The index of the grain that was written, if any.
* \param in_timeoutMs How long to wait for the grain, in milliseconds.
* \return The result code. MXL_ERR_NOT_READY if no grain was available before the timeout. \see mxlStatus
*/
MXL_EXPORT
mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs);
mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex);

/**
* Create a fabrics initiator instance.
Expand Down
39 changes: 16 additions & 23 deletions lib/fabrics/ofi/src/fabrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <mxl/mxl.h>
#include "internal/Exception.hpp"
#include "internal/FabricInstance.hpp"
#include "internal/ImmData.hpp"
#include "internal/Initiator.hpp"
#include "internal/Provider.hpp"
#include "internal/Region.hpp"
Expand Down Expand Up @@ -99,7 +98,7 @@ mxlStatus mxlFabricsRegionsForFlowWriter(mxlFlowWriter in_writer, mxlFabricsRegi
auto writer = ::mxl::lib::to_FlowWriter(in_writer);

// Ownership is intentionally released here; the user is responsible for calling mxlFabricsRegionsFree to free the memory.
auto regionPtr = std::make_unique<ofi::MxlRegions>(ofi::mxlFabricsRegionsFromFlow(writer->getFlowData())).release();
auto regionPtr = std::make_unique<ofi::MxlRegions>(ofi::mxlFabricsRegionsFromMutableFlow(writer->getFlowData())).release();

*out_regions = regionPtr->toAPI();

Expand Down Expand Up @@ -225,53 +224,48 @@ mxlStatus mxlFabricsTargetSetup(mxlFabricsTarget in_target, mxlFabricsTargetConf
}

extern "C" MXL_EXPORT
mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex)
mxlStatus mxlFabricsTargetReadGrainNonBlocking(mxlFabricsTarget in_target, uint64_t* out_grainIndex)
{
if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr))
if ((in_target == nullptr) || (out_grainIndex == nullptr))
{
return MXL_ERR_INVALID_ARG;
}

return ofi::try_run(
[&]()
{
auto target = ofi::TargetWrapper::fromAPI(in_target);
if (auto res = target->read(); res.immData)
auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrain();
if (!res)
{
auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack();
*out_entryIndex = entryIndex;
*out_sliceIndex = sliceIndex;

return MXL_STATUS_OK;
MXL_INFO("not ready");
return MXL_ERR_NOT_READY;
}

return MXL_ERR_NOT_READY;
*out_grainIndex = res->grainIndex;
return MXL_STATUS_OK;
},
"Failed to try for new grain");
}

extern "C" MXL_EXPORT
mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t* out_entryIndex, uint16_t* out_sliceIndex, uint16_t in_timeoutMs)
mxlStatus mxlFabricsTargetReadGrain(mxlFabricsTarget in_target, uint16_t in_timeoutMs, uint64_t* out_grainIndex)
{
if ((in_target == nullptr) || (out_entryIndex == nullptr) || (out_sliceIndex == nullptr))
if ((in_target == nullptr) || (out_grainIndex == nullptr))
{
return MXL_ERR_INVALID_ARG;
}

return ofi::try_run(
[&]()
{
auto target = ofi::TargetWrapper::fromAPI(in_target);
if (auto res = target->readBlocking(std::chrono::milliseconds(in_timeoutMs)); res.immData)
auto res = ofi::TargetWrapper::fromAPI(in_target)->readGrainBlocking(std::chrono::milliseconds(in_timeoutMs));
if (!res)
{
auto [entryIndex, sliceIndex] = ofi::ImmDataGrain{*res.immData}.unpack();
*out_entryIndex = entryIndex;
*out_sliceIndex = sliceIndex;

return MXL_STATUS_OK;
return MXL_ERR_NOT_READY;
}

return MXL_ERR_NOT_READY;
*out_grainIndex = res->grainIndex;
return MXL_STATUS_OK;
},
"Failed to wait for new grain");
}
Expand Down Expand Up @@ -523,7 +517,6 @@ mxlStatus mxlFabricsTargetInfoToString(mxlFabricsTargetInfo const in_targetInfo,
return ofi::try_run(
[&]()
{
std::stringstream ss;
auto targetInfoString = ofi::TargetInfo::fromAPI(in_targetInfo)->toJSON();

if (out_string == nullptr)
Expand Down
202 changes: 102 additions & 100 deletions lib/fabrics/ofi/src/internal/FILogging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,138 +20,140 @@ namespace mxl::lib::fabrics::ofi
{
namespace
{
constexpr static auto fiLogLevelStrings = std::array<std::pair<std::string_view, ::fi_log_level>, 4>{
constexpr auto fiLogLevelStrings = std::array<std::pair<std::string_view, ::fi_log_level>, 4>{
{{"trace", FI_LOG_TRACE}, {"debug", FI_LOG_DEBUG}, {"info", FI_LOG_INFO}, {"warn", FI_LOG_WARN}}
};
}

int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags);
int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags, std::uint64_t* showtime);
void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn);
static int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags);
static int fiLogReady(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t flags,
std::uint64_t* showtime);
void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn);

class FILogging
{
public:
void init()
class FILogging
{
// It is safe to call logging functions on the libfabric side
// while we initialize logging here. We just need to make sure
// to initialize logging only once.
if (_isInit.exchange(true, std::memory_order_relaxed))
{
return;
}

auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL");
if (fiLogLevelCStr != nullptr)
public:
void init()
{
auto fiLogLevelStr = std::string{fiLogLevelCStr};
std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](char const c) -> char { return std::tolower(c); });
auto it = std::ranges::find_if(fiLogLevelStrings,
[fiLogLevelStr](std::pair<std::string_view, ::fi_log_level> const& item) { return fiLogLevelStr == item.first; });
if (it != fiLogLevelStrings.end())
// It is safe to call logging functions on the libfabric side
// while we initialize logging here. We just need to make sure
// to initialize logging only once.
if (_isInit.exchange(true, std::memory_order_relaxed))
{
_level = it->second;
return;
}
else

auto fiLogLevelCStr = ::getenv("FI_LOG_LEVEL");
if (fiLogLevelCStr != nullptr)
{
_level = FI_LOG_WARN;
auto fiLogLevelStr = std::string{fiLogLevelCStr};
std::ranges::transform(fiLogLevelStr, fiLogLevelStr.begin(), [](unsigned char const c) { return std::tolower(c); });
auto it = std::ranges::find_if(fiLogLevelStrings,
[fiLogLevelStr](std::pair<std::string_view, ::fi_log_level> const& item) { return fiLogLevelStr == item.first; });
if (it != fiLogLevelStrings.end())
{
_level = it->second;
}
else
{
_level = FI_LOG_WARN;
}
}

auto ops = ::fi_ops_log{
.size = sizeof(::fi_ops_log),
.enabled = &fiLogEnabled,
.ready = &fiLogReady,
.log = &fiLog,
};

auto logging = ::fid_logging{
.fid = ::fid{},
.ops = &ops,
};

fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging);
}

auto ops = ::fi_ops_log{
.size = sizeof(::fi_ops_log),
.enabled = &fiLogEnabled,
.ready = &fiLogReady,
.log = &fiLog,
};
private:
friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t);

[[nodiscard]]
fi_log_level level() const noexcept
{
return _level;
}

auto logging = ::fid_logging{
.fid = ::fid{},
.ops = &ops,
};
[[nodiscard]]
bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept
{
return true;
}

fiCall(::fi_import_log, "Failed to initialize logging", fiVersion(), 0, &logging);
}
[[nodiscard]]
bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept
{
return true;
}

private:
friend int fiLogEnabled(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t);
std::atomic_bool _isInit;
::fi_log_level _level;
};

[[nodiscard]]
fi_log_level level() const noexcept
{
return _level;
}
FILogging logging;

[[nodiscard]]
bool isProviderLoggingEnabled(const struct fi_provider*) const noexcept
char const* fiLogSubsystemName(enum fi_log_subsys subsys)
{
return true;
switch (subsys)
{
case FI_LOG_CORE: return "core";
case FI_LOG_FABRIC: return "fabric";
case FI_LOG_DOMAIN: return "domain";
case FI_LOG_EP_CTRL: return "ep_ctrl";
case FI_LOG_EP_DATA: return "ep_data";
case FI_LOG_AV: return "av";
case FI_LOG_CQ: return "cq";
case FI_LOG_EQ: return "eq";
case FI_LOG_MR: return "mr";
case FI_LOG_CNTR: return "cntr";
default: return "";
}
}

[[nodiscard]]
bool isSubsystemLoggingEnabled(enum fi_log_subsys) const noexcept
spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level)
{
return true;
switch (level)
{
case FI_LOG_WARN: return spdlog::level::warn;
case FI_LOG_TRACE: return spdlog::level::trace;
case FI_LOG_INFO: return spdlog::level::info;
case FI_LOG_DEBUG: return spdlog::level::debug;
default: return spdlog::level::warn;
}
}

std::atomic_bool _isInit;
::fi_log_level _level;
};

static FILogging logging;

char const* fiLogSubsystemName(enum fi_log_subsys subsys)
{
switch (subsys)
int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t)
{
case FI_LOG_CORE: return "core";
case FI_LOG_FABRIC: return "fabric";
case FI_LOG_DOMAIN: return "domain";
case FI_LOG_EP_CTRL: return "ep_ctrl";
case FI_LOG_EP_DATA: return "ep_data";
case FI_LOG_AV: return "av";
case FI_LOG_CQ: return "cq";
case FI_LOG_EQ: return "eq";
case FI_LOG_MR: return "mr";
case FI_LOG_CNTR: return "cntr";
default: return "";
return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) &&
logging.isSubsystemLoggingEnabled(subsys);
}
}

spdlog::level::level_enum fiLevelToSpdlogLevel(enum fi_log_level level)
{
switch (level)
int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*)
{
case FI_LOG_WARN: return spdlog::level::warn;
case FI_LOG_TRACE: return spdlog::level::trace;
case FI_LOG_INFO: return spdlog::level::info;
case FI_LOG_DEBUG: return spdlog::level::debug;
default: return spdlog::level::warn;
return 0;
}
}

int fiLogEnabled(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, std::uint64_t)
{
return (level <= logging.level() || logging.level() == FI_LOG_TRACE) && logging.isProviderLoggingEnabled(prov) &&
logging.isSubsystemLoggingEnabled(subsys);
}

int fiLogReady(const struct fi_provider*, enum fi_log_level, enum fi_log_subsys, std::uint64_t, std::uint64_t*)
{
return 0;
}

void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn)
{
auto msg = std::string{msgIn};
if (msg.ends_with('\n'))
void fiLog(const struct fi_provider* prov, enum fi_log_level level, enum fi_log_subsys subsys, char const*, int line, char const* msgIn)
{
msg.pop_back();
auto msg = std::string{msgIn};
if (msg.ends_with('\n'))
{
msg.pop_back();
}

spdlog::log(
{fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg);
}

spdlog::log(
{fmt::format("libfabric:{}:{}", fiLogSubsystemName(subsys), prov->name).c_str(), line, ""}, fiLevelToSpdlogLevel(level), "{}", msg);
}

// Initialize logging
Expand Down
2 changes: 1 addition & 1 deletion lib/fabrics/ofi/src/internal/GrainSlices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace mxl::lib::fabrics::ofi
return size;
}

std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept
std::uint32_t SliceRange::transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept
{
if (_start == 0)
{
Expand Down
2 changes: 1 addition & 1 deletion lib/fabrics/ofi/src/internal/GrainSlices.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace mxl::lib::fabrics::ofi
* \note When start is 0, the offset is 0, because we include the header in the transfer.
*/
[[nodiscard]]
std::uint32_t transferOffset(std::uint32_t payloadOffset, std::int32_t sliceSize) const noexcept;
std::uint32_t transferOffset(std::uint32_t payloadOffset, std::uint32_t sliceSize) const noexcept;

private:
SliceRange(std::uint16_t start, std::uint16_t end) noexcept
Expand Down
2 changes: 1 addition & 1 deletion lib/fabrics/ofi/src/internal/ImmData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi
ImmDataGrain::ImmDataGrain(std::uint64_t index, std::uint16_t sliceIndex) noexcept
{
auto ringBufferIndex = static_cast<std::uint16_t>(index);
_inner = std::bit_cast<std::uint32_t>(Unpacked{.ringBufferIndex = ringBufferIndex, .sliceIndex = sliceIndex});
_inner = std::bit_cast<std::uint32_t>(Unpacked{.ringBufferSlot = ringBufferIndex, .sliceIndex = sliceIndex});
}

ImmDataGrain::Unpacked ImmDataGrain::unpack() const noexcept
Expand Down
2 changes: 1 addition & 1 deletion lib/fabrics/ofi/src/internal/ImmData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace mxl::lib::fabrics::ofi
public:
struct Unpacked //**< Unpacked representation of immediate data. */
{
std::uint16_t ringBufferIndex;
std::uint16_t ringBufferSlot;
std::uint16_t sliceIndex;
};

Expand Down
Loading
Loading