Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions include/elio/io/io_awaitables.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,43 @@ class async_close_awaitable : public io_awaitable_base {
int fd_;
};

/// Awaitable for async writev operations (scatter-gather write)
class async_writev_awaitable : public io_awaitable_base {
public:
async_writev_awaitable(io_context& ctx, int fd, struct iovec* iovecs,
size_t iovec_count) noexcept
: io_awaitable_base(ctx)
, fd_(fd)
, iovecs_(iovecs)
, iovec_count_(iovec_count) {}

void await_suspend(std::coroutine_handle<> awaiter) {
io_request req{};
req.op = io_op::writev;
req.fd = fd_;
req.iovecs = iovecs_;
req.iovec_count = iovec_count_;
req.awaiter = awaiter;

if (!ctx_.prepare(req)) {
result_ = io_result{-EAGAIN, 0};
awaiter.resume();
return;
}
ctx_.submit();
}

io_result await_resume() noexcept {
result_ = io_context::get_last_result();
return result_;
}

private:
int fd_;
struct iovec* iovecs_;
size_t iovec_count_;
};

/// Awaitable for poll (wait for socket readable/writable)
class async_poll_awaitable : public io_awaitable_base {
public:
Expand Down Expand Up @@ -385,6 +422,12 @@ inline auto async_send(io_context& ctx, int fd, const void* buffer,
return async_send_awaitable(ctx, fd, buffer, length, flags);
}

/// Create an async writev awaitable (scatter-gather write)
inline auto async_writev(io_context& ctx, int fd, struct iovec* iovecs,
size_t iovec_count) {
return async_writev_awaitable(ctx, fd, iovecs, iovec_count);
}

/// Create an async accept awaitable
inline auto async_accept(io_context& ctx, int listen_fd,
struct sockaddr* addr = nullptr,
Expand Down Expand Up @@ -433,6 +476,10 @@ inline auto async_send(int fd, const void* buffer, size_t length, int flags = 0)
return async_send(default_io_context(), fd, buffer, length, flags);
}

inline auto async_writev(int fd, struct iovec* iovecs, size_t iovec_count) {
return async_writev(default_io_context(), fd, iovecs, iovec_count);
}

inline auto async_accept(int listen_fd, struct sockaddr* addr = nullptr,
socklen_t* addrlen = nullptr, int flags = 0) {
return async_accept(default_io_context(), listen_fd, addr, addrlen, flags);
Expand Down
5 changes: 5 additions & 0 deletions include/elio/net/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ class tcp_stream {
return io::async_send(*ctx_, fd_, str.data(), str.size());
}

/// Async writev (scatter-gather write)
auto writev(struct iovec* iovecs, size_t count) {
return io::async_writev(*ctx_, fd_, iovecs, count);
}

/// Wait for socket to be readable
auto poll_read() {
return io::async_poll_read(*ctx_, fd_);
Expand Down
5 changes: 5 additions & 0 deletions include/elio/net/uds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ class uds_stream {
return io::async_send(*ctx_, fd_, str.data(), str.size());
}

/// Async writev (scatter-gather write)
auto writev(struct iovec* iovecs, size_t count) {
return io::async_writev(*ctx_, fd_, iovecs, count);
}

/// Wait for socket to be readable
auto poll_read() {
return io::async_poll_read(*ctx_, fd_);
Expand Down
151 changes: 104 additions & 47 deletions include/elio/rpc/rpc_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ class request_id_generator {

/// Stream concept for TCP or UDS streams
template<typename T>
concept rpc_stream = requires(T& stream, void* buf, const void* cbuf, size_t len) {
concept rpc_stream = requires(T& stream, void* buf, const void* cbuf, size_t len,
struct iovec* iovecs, size_t iov_count) {
{ stream.read(buf, len) };
{ stream.write(cbuf, len) };
{ stream.writev(iovecs, iov_count) };
{ stream.is_valid() } -> std::same_as<bool>;
};

Expand Down Expand Up @@ -236,6 +238,45 @@ coro::task<io::io_result> write_exact(Stream& stream, const void* buffer, size_t
co_return io::io_result{static_cast<int32_t>(length), 0};
}

/// Write all data from iovec array to stream (scatter-gather write)
/// Handles partial writes by adjusting iovec entries
template<rpc_stream Stream>
coro::task<io::io_result> writev_exact(Stream& stream, struct iovec* iovecs, size_t iov_count) {
size_t total_length = 0;
for (size_t i = 0; i < iov_count; ++i) {
total_length += iovecs[i].iov_len;
}

size_t current_iov = 0;
size_t bytes_written = 0;

while (current_iov < iov_count) {
auto result = co_await stream.writev(&iovecs[current_iov], iov_count - current_iov);
if (result.result <= 0) {
co_return result;
}

bytes_written += result.result;
size_t written = static_cast<size_t>(result.result);

// Advance through iovecs based on how much was written
while (written > 0 && current_iov < iov_count) {
if (written >= iovecs[current_iov].iov_len) {
written -= iovecs[current_iov].iov_len;
++current_iov;
} else {
// Partial write within this iovec entry
iovecs[current_iov].iov_base =
static_cast<uint8_t*>(iovecs[current_iov].iov_base) + written;
iovecs[current_iov].iov_len -= written;
written = 0;
}
}
}

co_return io::io_result{static_cast<int32_t>(total_length), 0};
}

/// Read a complete frame from stream
/// If the frame has the has_checksum flag set, verifies the CRC32 checksum
template<rpc_stream Stream>
Expand Down Expand Up @@ -291,80 +332,96 @@ read_frame(Stream& stream) {
co_return std::make_pair(header, std::move(payload));
}

/// Write a frame to stream
/// Write a frame to stream using scatter-gather I/O for atomicity
/// If the header has the has_checksum flag set, appends CRC32 checksum
template<rpc_stream Stream>
coro::task<bool> write_frame(Stream& stream, const frame_header& header,
const buffer_writer& payload) {
// Write header
// Prepare header bytes
auto header_bytes = header.to_bytes();
auto result = co_await write_exact(stream, header_bytes.data(), frame_header_size);
if (result.result <= 0) {
co_return false;
}

// Write payload
if (payload.size() > 0) {
result = co_await write_exact(stream, payload.data(), payload.size());
if (result.result <= 0) {
co_return false;
}
}

// Write checksum if requested
// Compute checksum if needed (must be done before building iovecs)
uint32_t checksum = 0;
if (has_flag(header.flags, message_flags::has_checksum)) {
// Compute checksum over header + payload
uint32_t crc = hash::crc32_update(header_bytes.data(), frame_header_size, 0xFFFFFFFF);
if (payload.size() > 0) {
crc = hash::crc32_update(payload.data(), payload.size(), crc);
}
uint32_t checksum = hash::crc32_finalize(crc);

result = co_await write_exact(stream, &checksum, checksum_size);
if (result.result <= 0) {
co_return false;
}
checksum = hash::crc32_finalize(crc);
}

// Build iovec array for atomic write
struct iovec iovecs[3];
size_t iov_count = 0;

// Header
iovecs[iov_count].iov_base = header_bytes.data();
iovecs[iov_count].iov_len = frame_header_size;
++iov_count;

// Payload
if (payload.size() > 0) {
iovecs[iov_count].iov_base = const_cast<uint8_t*>(payload.data());
iovecs[iov_count].iov_len = payload.size();
++iov_count;
}

// Checksum
if (has_flag(header.flags, message_flags::has_checksum)) {
iovecs[iov_count].iov_base = &checksum;
iovecs[iov_count].iov_len = checksum_size;
++iov_count;
}

co_return true;
// Write all data atomically using scatter-gather I/O
auto result = co_await writev_exact(stream, iovecs, iov_count);
co_return result.result > 0;
}

/// Write a frame with raw payload
/// Write a frame with raw payload using scatter-gather I/O for atomicity
/// If the header has the has_checksum flag set, appends CRC32 checksum
template<rpc_stream Stream>
coro::task<bool> write_frame(Stream& stream, const frame_header& header,
const void* payload_data, size_t payload_size) {
// Write header
// Prepare header bytes
auto header_bytes = header.to_bytes();
auto result = co_await write_exact(stream, header_bytes.data(), frame_header_size);
if (result.result <= 0) {
co_return false;
}

// Write payload
if (payload_size > 0) {
result = co_await write_exact(stream, payload_data, payload_size);
if (result.result <= 0) {
co_return false;
}
}

// Write checksum if requested
// Compute checksum if needed
uint32_t checksum = 0;
if (has_flag(header.flags, message_flags::has_checksum)) {
// Compute checksum over header + payload
uint32_t crc = hash::crc32_update(header_bytes.data(), frame_header_size, 0xFFFFFFFF);
if (payload_size > 0) {
crc = hash::crc32_update(payload_data, payload_size, crc);
}
uint32_t checksum = hash::crc32_finalize(crc);

result = co_await write_exact(stream, &checksum, checksum_size);
if (result.result <= 0) {
co_return false;
}
checksum = hash::crc32_finalize(crc);
}

// Build iovec array for atomic write
struct iovec iovecs[3];
size_t iov_count = 0;

// Header
iovecs[iov_count].iov_base = header_bytes.data();
iovecs[iov_count].iov_len = frame_header_size;
++iov_count;

// Payload
if (payload_size > 0) {
iovecs[iov_count].iov_base = const_cast<void*>(payload_data);
iovecs[iov_count].iov_len = payload_size;
++iov_count;
}

// Checksum
if (has_flag(header.flags, message_flags::has_checksum)) {
iovecs[iov_count].iov_base = &checksum;
iovecs[iov_count].iov_len = checksum_size;
++iov_count;
}

co_return true;
// Write all data atomically using scatter-gather I/O
auto result = co_await writev_exact(stream, iovecs, iov_count);
co_return result.result > 0;
}

// ============================================================================
Expand Down
3 changes: 3 additions & 0 deletions wiki/API-Reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,9 @@ public:
// Write data (awaitable)
/* awaitable */ write(const void* data, size_t size);

// Scatter-gather write (awaitable) - writes multiple buffers atomically
/* awaitable */ writev(struct iovec* iovecs, size_t count);

// Poll for readability (awaitable)
/* awaitable */ poll_read();

Expand Down
30 changes: 30 additions & 0 deletions wiki/Networking.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,36 @@ if (peer) {
}
```

### Scatter-Gather I/O (writev)

For efficient writing of multiple buffers without copying, use `writev()`:

```cpp
coro::task<void> send_message(tcp_stream& stream) {
// Prepare header and payload separately
std::array<uint8_t, 8> header = {0x01, 0x02, ...};
std::string payload = "Hello, World!";

// Write both in a single syscall using scatter-gather I/O
struct iovec iovecs[2];
iovecs[0].iov_base = header.data();
iovecs[0].iov_len = header.size();
iovecs[1].iov_base = const_cast<char*>(payload.data());
iovecs[1].iov_len = payload.size();

auto result = co_await stream.writev(iovecs, 2);
if (result.result > 0) {
ELIO_LOG_INFO("Sent {} bytes", result.result);
}
}
```

**Benefits of writev:**
- Reduces syscall overhead by combining multiple writes into one
- Avoids buffer copying when you have data in separate locations
- More atomic writes - better behavior under high concurrency
- Used internally by the RPC framework for efficient frame writing

## Unix Domain Sockets (UDS)

Unix Domain Sockets provide high-performance local inter-process communication. Elio supports both filesystem sockets and abstract sockets (Linux-specific).
Expand Down
10 changes: 10 additions & 0 deletions wiki/RPC-Framework.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ auto listener = net::uds_listener::bind(addr, ctx);

## Performance Considerations

### Atomic Frame Writing

The RPC framework uses scatter-gather I/O (`writev`) to write frames atomically. This means the header, payload, and optional checksum are written in a single syscall, which:

- Reduces the number of syscalls (typically 1 instead of 2-3)
- Minimizes context switching under high concurrency
- Provides better behavior when multiple coroutines make parallel RPC calls with large payloads

This is handled automatically by `write_frame()` - no special configuration needed.

### Zero-Copy Deserialization

For read-only access, use `buffer_view` directly:
Expand Down