diff --git a/cpp/mcap/include/mcap/writer.hpp b/cpp/mcap/include/mcap/writer.hpp index 2349b6a32f..883e88beae 100644 --- a/cpp/mcap/include/mcap/writer.hpp +++ b/cpp/mcap/include/mcap/writer.hpp @@ -7,6 +7,11 @@ #include #include #include +#include +#include +#include +#include +#include // Forward declaration #ifndef MCAP_COMPRESSION_NO_ZSTD @@ -104,6 +109,8 @@ struct MCAP_PUBLIC McapWriterOptions { bool noStatistics = false; bool noSummaryOffsets = false; + bool noIoUring = false; + McapWriterOptions(const std::string_view profile) : profile(profile) {} }; @@ -167,7 +174,8 @@ class MCAP_PUBLIC FileWriter final : public IWritable { public: ~FileWriter() override; - Status open(std::string_view filename); + Status openStd(std::string_view filename); + Status openUring(std::string_view filename); void handleWrite(const std::byte* data, uint64_t size) override; void end() override; @@ -175,8 +183,36 @@ class MCAP_PUBLIC FileWriter final : public IWritable { uint64_t size() const override; private: + bool isUring_ = false; std::FILE* file_ = nullptr; uint64_t size_ = 0; + + int fd_=-1; + struct io_uring ring_; + bool ringInited_ = false; + void* buf_ = nullptr; + void* bufpong_ = nullptr; + uint64_t writeOffset_ = 0; + static constexpr int QUEUE_DEPTH = 128; + static constexpr size_t BUFFER_SIZE = 140 * 1024 * 1024; + static constexpr size_t IORING_BUFF_SIZE = 100 * 1024 * 1024; + static constexpr size_t ALIGNMENT = 512; + std::mutex mtx; + std::condition_variable cv; + bool ready= false; + std::atomic running{true}; + int final_footer_count= 0; + bool final_footer= false; + bool ping_buffer_used = true; + std::thread workerThread_; + uint64_t aligned_blocks= 0; + void uringClose(); + void handleWriteStd(const std::byte* data, uint64_t size); + void handleWriteUring(const std::byte* data, uint64_t size); + void worker_thread(); + void* getActiveBuffer(); + void switchBuffer(); + void* getInactiveBuffer(); }; /** diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index f925596c24..9aa5be0b96 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -11,8 +11,57 @@ # include #endif + +#if defined(__aarch64__) + #define MCAP_ARM64 +#endif + +#include +#include +#include // setpriority, PRIO_PROCESS + +#include +#include +#include + +#ifdef MCAP_ARM64 +#include +#endif + +#include +#include +#include +#include + namespace mcap { + +#ifdef MCAP_ARM64 + void neon_memcpy(uint8_t* dst, const uint8_t* src, size_t size) { + size_t i = 0; + // Copy 16 bytes (128 bits) at a time + for (; i + 15 < size; i += 16) { + uint8x16_t data = vld1q_u8(src + i); + vst1q_u8(dst + i, data); + } + // Copy remaining bytes + for (; i < size; ++i) { + dst[i] = src[i]; + } + } + // void neon_memset(uint8_t* dst, uint8_t value, size_t size) { + // size_t i = 0; + // uint8x16_t val = vdupq_n_u8(value); + // for (; i + 15 < size; i += 16) { + // vst1q_u8(dst + i, val); + // } + // for (; i < size; ++i) { + // dst[i] = value; + // } + // } +#endif + + // IWritable /////////////////////////////////////////////////////////////////// IWritable::IWritable() noexcept @@ -38,12 +87,290 @@ void IWritable::resetCrc() { } // FileWriter ////////////////////////////////////////////////////////////////// + + uint64_t mcap_buffsize=0; + bool io_uring_=false; + Status FileWriter::openUring(std::string_view filename) { + end(); + fd_ = ::open(filename.data(), O_CREAT |O_RDWR | O_TRUNC | O_DIRECT, 0644); + if (fd_ < 0) { + perror("open"); + } + + if (posix_memalign(&buf_, ALIGNMENT, BUFFER_SIZE) != 0) { + perror("posix_memalign"); + close(fd_); + } + + if (posix_memalign(&bufpong_, ALIGNMENT,BUFFER_SIZE) != 0) { + perror("posix_memalign mcapbuf_"); + free(buf_); // Clean up buf_ + close(fd_); + } + int ret = io_uring_queue_init(QUEUE_DEPTH, &ring_, 0); + if (ret < 0) { + std::cerr << "io_uring_queue_init failed: " << strerror(-ret) << "\n"; + close(fd_); + } + ringInited_ = true; + + // Register buffer + struct iovec iovecs[2]; + iovecs[0].iov_base = buf_; + iovecs[0].iov_len = BUFFER_SIZE; + + iovecs[1].iov_base = bufpong_; + iovecs[1].iov_len = BUFFER_SIZE; + ret = io_uring_register_buffers(&ring_, iovecs, 2); + if (ret < 0) { + std::cerr << "io_uring_register_buffers failed: " << strerror(-ret) << "\n"; + close(fd_); + } + + // Register file descriptor + ret = io_uring_register_files(&ring_, &fd_, 1); + if (ret < 0) { + std::cerr << "io_uring_register_files failed: " << strerror(-ret) << "\n"; + close(fd_); + } + + size_ = 0; + writeOffset_ = 0; + isUring_ = true; + running=true; + ready=false; + mcap_buffsize=0; + ping_buffer_used=true; + std::thread t(&FileWriter::worker_thread,this); + workerThread_ = std::move(t); // Store in a class member + return StatusCode::Success; + } + + + void FileWriter::worker_thread() { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + pid_t tid = gettid(); // get thread ID (Linux-specific) + if (setpriority(PRIO_PROCESS, tid, -18) != 0) { + perror("setpriority failed"); + } else { + std::cout << " Nice value set to " << std::endl; + } + + std::unique_lock lock(mtx); // ✅ This must be defined + while(running) + { + cv.wait(lock, [this] { return ready; }); // Wait until ready is + io_uring_cqe* cqe; + if (io_uring_wait_cqe(&ring_, &cqe) < 0) { + std::cerr << "io_uring_wait_cqe failed\n"; + continue; + } + if (cqe->res < 0) { + std::cerr << "Async write error: " << strerror(-cqe->res) << "\n"; + } else { + //std::cout << " io_ring Write completed: " << cqe->res << " bytes\n"; + } + io_uring_cqe_seen(&ring_, cqe); + ready=false; + } + std::cout << "Detached thread received signal. Proceeding..." << std::endl; + } + inline void* FileWriter::getActiveBuffer() { + return ping_buffer_used ? buf_ : bufpong_; + } + + inline void FileWriter::switchBuffer() { + ping_buffer_used = !ping_buffer_used; + } + + inline void* FileWriter::getInactiveBuffer() { + return ping_buffer_used ? bufpong_ : buf_; + } + + // Your requested handleWriteUring equivalent + void FileWriter::handleWriteUring(const std::byte* data, uint64_t size) { + if (!ringInited_ || !data || size == 0) + return; + + // Append incoming data to internal MCAP buffer if size is reasonable + if (mcap_buffsize + size > BUFFER_SIZE) { + std::cerr << "Error: Buffer overflow risk. Data size exceeds allocated buffer.\n"; + return; + } + void* activeBuf = getActiveBuffer(); + +#if defined(__aarch64__) + neon_memcpy(static_cast(activeBuf) + mcap_buffsize, + reinterpret_cast(data), + size); + +#else + + memcpy(static_cast(activeBuf) + mcap_buffsize, + reinterpret_cast(data), + size); +#endif + mcap_buffsize += size; + + // Check for MCAP magic footer (assuming Magic is a byte array and sizeof(Magic) is valid) + if (size == sizeof(Magic) && memcmp(data, Magic, sizeof(Magic)) == 0) { + final_footer_count++; + if (final_footer_count == 2) { + final_footer = true; + final_footer_count=0; + } + } + + // Lambda to submit a fixed buffer write using io_uring + auto submit_fixed_write = [&](void* buf, size_t bufSize, off_t offset, int bufIndex) -> bool { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + std::cerr << "io_uring_get_sqe failed\n"; + return false; + } + io_uring_prep_write(sqe, 0, buf,IORING_BUFF_SIZE, offset); + sqe->flags |= IOSQE_FIXED_FILE; + return true; + }; + + if (mcap_buffsize >= IORING_BUFF_SIZE) { + // Align mcap_buffsize down to multiple of ALIGNMENT + size_t remain_mcapbytes = mcap_buffsize -IORING_BUFF_SIZE; + + void* activeBuf = getActiveBuffer(); + if(ping_buffer_used) + { + if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 0)) + return; + } + else + { + if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 1)) + return; + } + + int ret = io_uring_submit(&ring_); + if (ret < 0) { + std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n"; + return; + } + { + std::lock_guard lock(mtx); + ready = true; + } + cv.notify_one(); // response thread + + if (remain_mcapbytes > 0) { + void* InactiveBuf = getInactiveBuffer(); + void* activeBuf = getActiveBuffer(); + std::memcpy(reinterpret_cast(InactiveBuf), + reinterpret_cast(activeBuf) +IORING_BUFF_SIZE, + remain_mcapbytes); + switchBuffer(); + } + mcap_buffsize = remain_mcapbytes; + writeOffset_ +=IORING_BUFF_SIZE; + size_ += IORING_BUFF_SIZE; + } + + if (final_footer) { + // Align mcap_buffsize up to next multiple of ALIGNMENT + size_t aligned_size = (mcap_buffsize + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1); + size_t appended_bytes = aligned_size - mcap_buffsize; + // Submit final footer write + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + std::cerr << "io_uring_get_sqe failed\n"; + return; + } + void* activeBuf = getActiveBuffer(); + io_uring_prep_write(sqe, 0, activeBuf,aligned_size, writeOffset_); + sqe->flags |= IOSQE_FIXED_FILE; + + int ret = io_uring_submit(&ring_); + if (ret < 0) { + std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n"; + return; + } else if (ret == 0) { + std::cerr << "io_uring_submit submitted 0 entries, nothing queued\n"; + return; + } else { + std::cout << "Submitted " << ret << " sqe(s) successfully\n"; + } + + // Wait for completion + struct io_uring_cqe* cqe; + if (io_uring_wait_cqe(&ring_, &cqe) < 0) { + std::cerr << "io_uring_wait_cqe failed\n"; + return; + } + if (cqe->res < 0) { + std::cerr << "Async write error 0001: " << strerror(-cqe->res) << "\n"; + } else { + std::cout << "Write completed: " << cqe->res << " bytes\n"; + } + io_uring_cqe_seen(&ring_, cqe); + + writeOffset_ += aligned_size; + + // Truncate file to remove padded bytes + struct stat st; + if (fstat(fd_, &st) < 0) { + perror("fstat"); + return; + } + off_t current_size = st.st_size; + off_t new_size = current_size - appended_bytes; + + if (new_size < 0) { + std::cerr << "Cannot remove more bytes than file size\n"; + return; + } + if (ftruncate(fd_, new_size) < 0) { + perror("ftruncate"); + return; + } else { + std::cout << "File truncated to " << new_size << " bytes\n"; + } + std::cout << " after current_size " << current_size << " bytes\n"; + final_footer = false; + } + + } + + +void FileWriter::uringClose() { + if (ringInited_) { + io_uring_unregister_buffers(&ring_); + io_uring_unregister_files(&ring_); + io_uring_queue_exit(&ring_); + ringInited_ = false; + } + if (fd_ >= 0) { + ::close(fd_); + fd_ = -1; + } + if (buf_) { + free(buf_); + buf_ = nullptr; + } + writeOffset_ = 0; + running = false; + final_footer_count=0; + final_footer=false; + cv.notify_all(); + if (workerThread_.joinable()) workerThread_.join(); +} + FileWriter::~FileWriter() { end(); } -Status FileWriter::open(std::string_view filename) { +Status FileWriter::openStd(std::string_view filename) { end(); file_ = std::fopen(filename.data(), "wb"); if (!file_) { @@ -54,6 +381,13 @@ Status FileWriter::open(std::string_view filename) { } void FileWriter::handleWrite(const std::byte* data, uint64_t size) { + if (isUring_) + handleWriteUring(data, size); + else + handleWriteStd(data, size); +} + +void FileWriter::handleWriteStd(const std::byte* data, uint64_t size) { assert(file_); const size_t written = std::fwrite(data, 1, size, file_); (void)written; @@ -72,7 +406,14 @@ void FileWriter::end() { std::fclose(file_); file_ = nullptr; } + + if (isUring_) { + uringClose(); + close(fd_); + } + size_ = 0; + isUring_ = false; } uint64_t FileWriter::size() const { @@ -343,7 +684,8 @@ Status McapWriter::open(const std::string_view filename, const McapWriterOptions // If the writer was opened, close it first close(); fileOutput_ = std::make_unique(); - const auto status = fileOutput_->open(filename); + io_uring_=!(options.noIoUring); + const auto status = options.noIoUring ? fileOutput_->openStd(filename) : fileOutput_->openUring(filename); if (!status.ok()) { fileOutput_.reset(); return status; @@ -387,20 +729,17 @@ void McapWriter::close() { ByteOffset summaryStart = 0; ByteOffset summaryOffsetStart = 0; - if (!options_.noSummary) { // Get the offset of the End Of File section - summaryStart = fileOutput.size(); - - ByteOffset schemaStart = fileOutput.size(); + summaryStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); + ByteOffset schemaStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noRepeatedSchemas) { // Write all schema records for (const auto& schemaId : writtenSchemas_) { write(fileOutput, schemas_[schemaId - 1]); } } - - ByteOffset channelStart = fileOutput.size(); + ByteOffset channelStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noRepeatedChannels) { // Write all channel records, but only if they appeared in this file auto& channelMessageCounts = statistics_.channelMessageCounts; @@ -410,40 +749,36 @@ void McapWriter::close() { } } } - - ByteOffset statisticsStart = fileOutput.size(); + ByteOffset statisticsStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noStatistics) { // Write the statistics record write(fileOutput, statistics_); } - - ByteOffset chunkIndexStart = fileOutput.size(); + ByteOffset chunkIndexStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noChunkIndex) { // Write chunk index records for (const auto& chunkIndexRecord : chunkIndex_) { write(fileOutput, chunkIndexRecord); } } - - ByteOffset attachmentIndexStart = fileOutput.size(); + ByteOffset attachmentIndexStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noAttachmentIndex) { // Write attachment index records for (const auto& attachmentIndexRecord : attachmentIndex_) { write(fileOutput, attachmentIndexRecord); } } - - ByteOffset metadataIndexStart = fileOutput.size(); + ByteOffset metadataIndexStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noMetadataIndex) { // Write metadata index records for (const auto& metadataIndexRecord : metadataIndex_) { write(fileOutput, metadataIndexRecord); } } - + uint64_t actual_filesize=(io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noSummaryOffsets) { // Write summary offset records - summaryOffsetStart = fileOutput.size(); + summaryOffsetStart = (io_uring_ > 0) ? fileOutput.size() + mcap_buffsize : fileOutput.size(); if (!options_.noRepeatedSchemas && !writtenSchemas_.empty()) { write(fileOutput, SummaryOffset{OpCode::Schema, schemaStart, channelStart - schemaStart}); } @@ -467,8 +802,9 @@ void McapWriter::close() { write(fileOutput, SummaryOffset{OpCode::MetadataIndex, metadataIndexStart, summaryOffsetStart - metadataIndexStart}); } - } else if (summaryStart == fileOutput.size()) { + } else if (summaryStart == actual_filesize) { // No summary records were written + std::cout<<"no summary records were wruttern "<