Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion cpp/mcap/include/mcap/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
#include <string>
#include <unordered_set>
#include <vector>
#include <liburing.h>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>

// Forward declaration
#ifndef MCAP_COMPRESSION_NO_ZSTD
Expand Down Expand Up @@ -104,6 +109,8 @@ struct MCAP_PUBLIC McapWriterOptions {
bool noStatistics = false;
bool noSummaryOffsets = false;

bool noIoUring = false;

McapWriterOptions(const std::string_view profile)
: profile(profile) {}
};
Expand Down Expand Up @@ -167,16 +174,45 @@ class MCAP_PUBLIC FileWriter final : public IWritable {
public:
~FileWriter() override;

Status open(std::string_view filename);
Status openStd(std::string_view filename);
Status openUring(std::string_view filename);

void handleWrite(const std::byte* data, uint64_t size) override;
void end() override;
void flush() override;
uint64_t size() const override;

private:
bool isUring_ = false;
std::FILE* file_ = nullptr;
uint64_t size_ = 0;

int fd_=-1;
struct io_uring ring_;
bool ringInited_ = false;
void* buf_ = nullptr;
void* bufpong_ = nullptr;
uint64_t writeOffset_ = 0;
static constexpr int QUEUE_DEPTH = 128;
static constexpr size_t BUFFER_SIZE = 140 * 1024 * 1024;
static constexpr size_t IORING_BUFF_SIZE = 100 * 1024 * 1024;
static constexpr size_t ALIGNMENT = 512;
std::mutex mtx;
std::condition_variable cv;
bool ready= false;
std::atomic<bool> running{true};
int final_footer_count= 0;
bool final_footer= false;
bool ping_buffer_used = true;
std::thread workerThread_;
uint64_t aligned_blocks= 0;
void uringClose();
void handleWriteStd(const std::byte* data, uint64_t size);
void handleWriteUring(const std::byte* data, uint64_t size);
void worker_thread();
void* getActiveBuffer();
void switchBuffer();
void* getInactiveBuffer();
};

/**
Expand Down
Loading