diff --git a/include/elio/io/io_awaitables.hpp b/include/elio/io/io_awaitables.hpp index bd17d64..ae14475 100644 --- a/include/elio/io/io_awaitables.hpp +++ b/include/elio/io/io_awaitables.hpp @@ -306,6 +306,43 @@ class async_close_awaitable : public io_awaitable_base { int fd_; }; +/// Awaitable for async writev operations (scatter-gather write) +class async_writev_awaitable : public io_awaitable_base { +public: + async_writev_awaitable(io_context& ctx, int fd, struct iovec* iovecs, + size_t iovec_count) noexcept + : io_awaitable_base(ctx) + , fd_(fd) + , iovecs_(iovecs) + , iovec_count_(iovec_count) {} + + void await_suspend(std::coroutine_handle<> awaiter) { + io_request req{}; + req.op = io_op::writev; + req.fd = fd_; + req.iovecs = iovecs_; + req.iovec_count = iovec_count_; + req.awaiter = awaiter; + + if (!ctx_.prepare(req)) { + result_ = io_result{-EAGAIN, 0}; + awaiter.resume(); + return; + } + ctx_.submit(); + } + + io_result await_resume() noexcept { + result_ = io_context::get_last_result(); + return result_; + } + +private: + int fd_; + struct iovec* iovecs_; + size_t iovec_count_; +}; + /// Awaitable for poll (wait for socket readable/writable) class async_poll_awaitable : public io_awaitable_base { public: @@ -385,6 +422,12 @@ inline auto async_send(io_context& ctx, int fd, const void* buffer, return async_send_awaitable(ctx, fd, buffer, length, flags); } +/// Create an async writev awaitable (scatter-gather write) +inline auto async_writev(io_context& ctx, int fd, struct iovec* iovecs, + size_t iovec_count) { + return async_writev_awaitable(ctx, fd, iovecs, iovec_count); +} + /// Create an async accept awaitable inline auto async_accept(io_context& ctx, int listen_fd, struct sockaddr* addr = nullptr, @@ -433,6 +476,10 @@ inline auto async_send(int fd, const void* buffer, size_t length, int flags = 0) return async_send(default_io_context(), fd, buffer, length, flags); } +inline auto async_writev(int fd, struct iovec* iovecs, size_t iovec_count) { + return async_writev(default_io_context(), fd, iovecs, iovec_count); +} + inline auto async_accept(int listen_fd, struct sockaddr* addr = nullptr, socklen_t* addrlen = nullptr, int flags = 0) { return async_accept(default_io_context(), listen_fd, addr, addrlen, flags); diff --git a/include/elio/net/tcp.hpp b/include/elio/net/tcp.hpp index 019d95e..d127844 100644 --- a/include/elio/net/tcp.hpp +++ b/include/elio/net/tcp.hpp @@ -374,6 +374,11 @@ class tcp_stream { return io::async_send(*ctx_, fd_, str.data(), str.size()); } + /// Async writev (scatter-gather write) + auto writev(struct iovec* iovecs, size_t count) { + return io::async_writev(*ctx_, fd_, iovecs, count); + } + /// Wait for socket to be readable auto poll_read() { return io::async_poll_read(*ctx_, fd_); diff --git a/include/elio/net/uds.hpp b/include/elio/net/uds.hpp index 213614a..ed8ab47 100644 --- a/include/elio/net/uds.hpp +++ b/include/elio/net/uds.hpp @@ -202,6 +202,11 @@ class uds_stream { return io::async_send(*ctx_, fd_, str.data(), str.size()); } + /// Async writev (scatter-gather write) + auto writev(struct iovec* iovecs, size_t count) { + return io::async_writev(*ctx_, fd_, iovecs, count); + } + /// Wait for socket to be readable auto poll_read() { return io::async_poll_read(*ctx_, fd_); diff --git a/include/elio/rpc/rpc_protocol.hpp b/include/elio/rpc/rpc_protocol.hpp index 1cb3f48..5c25ab1 100644 --- a/include/elio/rpc/rpc_protocol.hpp +++ b/include/elio/rpc/rpc_protocol.hpp @@ -194,9 +194,11 @@ class request_id_generator { /// Stream concept for TCP or UDS streams template -concept rpc_stream = requires(T& stream, void* buf, const void* cbuf, size_t len) { +concept rpc_stream = requires(T& stream, void* buf, const void* cbuf, size_t len, + struct iovec* iovecs, size_t iov_count) { { stream.read(buf, len) }; { stream.write(cbuf, len) }; + { stream.writev(iovecs, iov_count) }; { stream.is_valid() } -> std::same_as; }; @@ -236,6 +238,45 @@ coro::task write_exact(Stream& stream, const void* buffer, size_t co_return io::io_result{static_cast(length), 0}; } +/// Write all data from iovec array to stream (scatter-gather write) +/// Handles partial writes by adjusting iovec entries +template +coro::task writev_exact(Stream& stream, struct iovec* iovecs, size_t iov_count) { + size_t total_length = 0; + for (size_t i = 0; i < iov_count; ++i) { + total_length += iovecs[i].iov_len; + } + + size_t current_iov = 0; + size_t bytes_written = 0; + + while (current_iov < iov_count) { + auto result = co_await stream.writev(&iovecs[current_iov], iov_count - current_iov); + if (result.result <= 0) { + co_return result; + } + + bytes_written += result.result; + size_t written = static_cast(result.result); + + // Advance through iovecs based on how much was written + while (written > 0 && current_iov < iov_count) { + if (written >= iovecs[current_iov].iov_len) { + written -= iovecs[current_iov].iov_len; + ++current_iov; + } else { + // Partial write within this iovec entry + iovecs[current_iov].iov_base = + static_cast(iovecs[current_iov].iov_base) + written; + iovecs[current_iov].iov_len -= written; + written = 0; + } + } + } + + co_return io::io_result{static_cast(total_length), 0}; +} + /// Read a complete frame from stream /// If the frame has the has_checksum flag set, verifies the CRC32 checksum template @@ -291,80 +332,96 @@ read_frame(Stream& stream) { co_return std::make_pair(header, std::move(payload)); } -/// Write a frame to stream +/// Write a frame to stream using scatter-gather I/O for atomicity /// If the header has the has_checksum flag set, appends CRC32 checksum template coro::task write_frame(Stream& stream, const frame_header& header, const buffer_writer& payload) { - // Write header + // Prepare header bytes auto header_bytes = header.to_bytes(); - auto result = co_await write_exact(stream, header_bytes.data(), frame_header_size); - if (result.result <= 0) { - co_return false; - } - // Write payload - if (payload.size() > 0) { - result = co_await write_exact(stream, payload.data(), payload.size()); - if (result.result <= 0) { - co_return false; - } - } - - // Write checksum if requested + // Compute checksum if needed (must be done before building iovecs) + uint32_t checksum = 0; if (has_flag(header.flags, message_flags::has_checksum)) { - // Compute checksum over header + payload uint32_t crc = hash::crc32_update(header_bytes.data(), frame_header_size, 0xFFFFFFFF); if (payload.size() > 0) { crc = hash::crc32_update(payload.data(), payload.size(), crc); } - uint32_t checksum = hash::crc32_finalize(crc); - - result = co_await write_exact(stream, &checksum, checksum_size); - if (result.result <= 0) { - co_return false; - } + checksum = hash::crc32_finalize(crc); + } + + // Build iovec array for atomic write + struct iovec iovecs[3]; + size_t iov_count = 0; + + // Header + iovecs[iov_count].iov_base = header_bytes.data(); + iovecs[iov_count].iov_len = frame_header_size; + ++iov_count; + + // Payload + if (payload.size() > 0) { + iovecs[iov_count].iov_base = const_cast(payload.data()); + iovecs[iov_count].iov_len = payload.size(); + ++iov_count; + } + + // Checksum + if (has_flag(header.flags, message_flags::has_checksum)) { + iovecs[iov_count].iov_base = &checksum; + iovecs[iov_count].iov_len = checksum_size; + ++iov_count; } - co_return true; + // Write all data atomically using scatter-gather I/O + auto result = co_await writev_exact(stream, iovecs, iov_count); + co_return result.result > 0; } -/// Write a frame with raw payload +/// Write a frame with raw payload using scatter-gather I/O for atomicity /// If the header has the has_checksum flag set, appends CRC32 checksum template coro::task write_frame(Stream& stream, const frame_header& header, const void* payload_data, size_t payload_size) { - // Write header + // Prepare header bytes auto header_bytes = header.to_bytes(); - auto result = co_await write_exact(stream, header_bytes.data(), frame_header_size); - if (result.result <= 0) { - co_return false; - } - - // Write payload - if (payload_size > 0) { - result = co_await write_exact(stream, payload_data, payload_size); - if (result.result <= 0) { - co_return false; - } - } - // Write checksum if requested + // Compute checksum if needed + uint32_t checksum = 0; if (has_flag(header.flags, message_flags::has_checksum)) { - // Compute checksum over header + payload uint32_t crc = hash::crc32_update(header_bytes.data(), frame_header_size, 0xFFFFFFFF); if (payload_size > 0) { crc = hash::crc32_update(payload_data, payload_size, crc); } - uint32_t checksum = hash::crc32_finalize(crc); - - result = co_await write_exact(stream, &checksum, checksum_size); - if (result.result <= 0) { - co_return false; - } + checksum = hash::crc32_finalize(crc); + } + + // Build iovec array for atomic write + struct iovec iovecs[3]; + size_t iov_count = 0; + + // Header + iovecs[iov_count].iov_base = header_bytes.data(); + iovecs[iov_count].iov_len = frame_header_size; + ++iov_count; + + // Payload + if (payload_size > 0) { + iovecs[iov_count].iov_base = const_cast(payload_data); + iovecs[iov_count].iov_len = payload_size; + ++iov_count; + } + + // Checksum + if (has_flag(header.flags, message_flags::has_checksum)) { + iovecs[iov_count].iov_base = &checksum; + iovecs[iov_count].iov_len = checksum_size; + ++iov_count; } - co_return true; + // Write all data atomically using scatter-gather I/O + auto result = co_await writev_exact(stream, iovecs, iov_count); + co_return result.result > 0; } // ============================================================================ diff --git a/wiki/API-Reference.md b/wiki/API-Reference.md index e71dd2f..ce58440 100644 --- a/wiki/API-Reference.md +++ b/wiki/API-Reference.md @@ -731,6 +731,9 @@ public: // Write data (awaitable) /* awaitable */ write(const void* data, size_t size); + // Scatter-gather write (awaitable) - writes multiple buffers atomically + /* awaitable */ writev(struct iovec* iovecs, size_t count); + // Poll for readability (awaitable) /* awaitable */ poll_read(); diff --git a/wiki/Networking.md b/wiki/Networking.md index 1f34c29..bf707f3 100644 --- a/wiki/Networking.md +++ b/wiki/Networking.md @@ -85,6 +85,36 @@ if (peer) { } ``` +### Scatter-Gather I/O (writev) + +For efficient writing of multiple buffers without copying, use `writev()`: + +```cpp +coro::task send_message(tcp_stream& stream) { + // Prepare header and payload separately + std::array header = {0x01, 0x02, ...}; + std::string payload = "Hello, World!"; + + // Write both in a single syscall using scatter-gather I/O + struct iovec iovecs[2]; + iovecs[0].iov_base = header.data(); + iovecs[0].iov_len = header.size(); + iovecs[1].iov_base = const_cast(payload.data()); + iovecs[1].iov_len = payload.size(); + + auto result = co_await stream.writev(iovecs, 2); + if (result.result > 0) { + ELIO_LOG_INFO("Sent {} bytes", result.result); + } +} +``` + +**Benefits of writev:** +- Reduces syscall overhead by combining multiple writes into one +- Avoids buffer copying when you have data in separate locations +- More atomic writes - better behavior under high concurrency +- Used internally by the RPC framework for efficient frame writing + ## Unix Domain Sockets (UDS) Unix Domain Sockets provide high-performance local inter-process communication. Elio supports both filesystem sockets and abstract sockets (Linux-specific). diff --git a/wiki/RPC-Framework.md b/wiki/RPC-Framework.md index e6df132..49c66e9 100644 --- a/wiki/RPC-Framework.md +++ b/wiki/RPC-Framework.md @@ -479,6 +479,16 @@ auto listener = net::uds_listener::bind(addr, ctx); ## Performance Considerations +### Atomic Frame Writing + +The RPC framework uses scatter-gather I/O (`writev`) to write frames atomically. This means the header, payload, and optional checksum are written in a single syscall, which: + +- Reduces the number of syscalls (typically 1 instead of 2-3) +- Minimizes context switching under high concurrency +- Provides better behavior when multiple coroutines make parallel RPC calls with large payloads + +This is handled automatically by `write_frame()` - no special configuration needed. + ### Zero-Copy Deserialization For read-only access, use `buffer_view` directly: