Implement MSG_ZEROCOPY for blight messages #398

@Eeems

@Eeems Here's a prototype implementation of `send_blocking` that uses MSG_ZEROCOPY:
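#include <cerrno>
#include <cstring>
#include <mutex>
#include <unordered_map>
#include <linux/errqueue.h>
#include <netinet/in.h>
#include <sys/socket.h>

// Helper struct to track zero-copy buffers
struct zerocopy_buffer {
    blight_data_t data;
    size_t size;
    int refcount;
    // Could add a callback for when the buffer is fully released
};

// Global buffer tracking (in a real implementation, this would be more sophisticated)
static std::unordered_map<blight_data_t, zerocopy_buffer*> zerocopy_buffers;
static std::mutex zerocopy_mutex;

// Defined further down; declared here because send_blocking_zerocopy calls it on failure
void release_zerocopy_buffer(zerocopy_buffer* buffer);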

// Register a buffer for zero-copy transmission
zerocopy_buffer* register_zerocopy_buffer(blight_data_t data, size_t size) {
    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    auto buffer = new zerocopy_buffer{data, size, 1};
    zerocopy_buffers[data] = buffer;
    return buffer;
}

// Function signature changed to return the zerocopy_buffer for the caller to track
zerocopy_buffer* send_blocking_zerocopy(int fd, const blight_data_t data, ssize_t size) {
    // Register the buffer for zero-copy transmission
    zerocopy_buffer* buffer = register_zerocopy_buffer(const_cast<blight_data_t>(data), size);
    
    ssize_t sent = 0;
    ssize_t res = 0;
    
    while (sent < size) {
        if (!wait_for_send(fd)) {
            if (errno == EAGAIN || errno == EINTR) {
                short_pause();
                continue;
            }
            // Clean up on failure
            release_zerocopy_buffer(buffer);
            return nullptr;
        }
        
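        // Use MSG_ZEROCOPY flag for zero-copy transmission. The flag only takes
        // effect if SO_ZEROCOPY was enabled on fd with setsockopt(), and every
        // successful send() call produces its own completion notification.
        res = ::send(fd, &data[sent], size - sent, MSG_NOSIGNAL | MSG_ZEROCOPY);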
        
        if (res > 0) {
            sent += res;
            continue;
        }
        
        if (res == 0) {
            // Connection closed
            errno = ECONNRESET;
            release_zerocopy_buffer(buffer);
            return nullptr;
        }
        
        if (errno != EAGAIN && errno != EINTR) {
            // Unexpected error
            release_zerocopy_buffer(buffer);
            return nullptr;
        }
        
        // Temporary error, retry
        short_pause();
    }
    
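    // The kernel may still be reading from data at this point, so the caller must
    // not modify or free it until the completion notifications have arrived.
    // Setup poll to monitor for completion events if needed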
    return buffer;
}

// Process zero-copy completion notifications
bool process_zerocopy_completions(int fd) {
    struct msghdr msg = {};
    // Notifications carry no payload, so no iovec is needed. The control buffer
    // is sized generously in case the kernel appends an address to the message.
    alignas(struct cmsghdr) char control[128];
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);

    // Reading the error queue never blocks; -1 with EAGAIN means it is empty
    ssize_t res = recvmsg(fd, &msg, MSG_ERRQUEUE);
    if (res == -1)
        return false;

    struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
    if (!cmsg)
        return false;
    // Notifications arrive as IP_RECVERR on IPv4 sockets and IPV6_RECVERR on IPv6 sockets
    bool is_ipv4 = cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR;
    bool is_ipv6 = cmsg->cmsg_level == SOL_IPV6 && cmsg->cmsg_type == IPV6_RECVERR;
    if (!is_ipv4 && !is_ipv6)
        return false;

    struct sock_extended_err err;
    memcpy(&err, CMSG_DATA(cmsg), sizeof(err));
    if (err.ee_errno != 0 || err.ee_origin != SO_EE_ORIGIN_ZEROCOPY)
        return false;

    // Process completion notification
    // err.ee_info and err.ee_data hold the inclusive range [lo, hi] of completed
    // send() calls, counted per socket from 0 in the order the calls were made
    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    // Update reference counts for completed buffers
    // In a real implementation, we'd map that range to specific buffers

    return true;
}

// Release a zero-copy buffer (decrease refcount, free if refcount reaches 0)
void release_zerocopy_buffer(zerocopy_buffer* buffer) {
    if (!buffer)
        return;
        
    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    if (--buffer->refcount <= 0) {
        zerocopy_buffers.erase(buffer->data);
        // Assumes data was allocated with new[] and ownership was handed to the tracker
        delete[] buffer->data;  // Only free the data when refcount reaches 0
        delete buffer;
    }
}

This implementation:

  1. Creates a tracking system for zero-copy buffers
  2. Uses MSG_ZEROCOPY flag with send()
  3. Provides a mechanism to track when it's safe to free the buffer
  4. Includes a function to process completion notifications
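Step 2 relies on the socket having opted in first: on Linux, `MSG_ZEROCOPY` is only honoured after `SO_ZEROCOPY` has been enabled with `setsockopt()`. A minimal sketch of that step follows. The helper name `enable_zerocopy` is hypothetical and not part of libblight_protocol, and the fallback `#define` assumes the asm-generic value used on most architectures.

```cpp
#include <cerrno>
#include <sys/socket.h>

#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60 // asm-generic value, only needed with older libc headers
#endif

// Hypothetical helper: opt a socket in to MSG_ZEROCOPY.
// Returns false and leaves errno set if the kernel or socket type refuses.
bool enable_zerocopy(int fd) {
    int one = 1;
    return ::setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) == 0;
}
```

Support depends on the kernel version and the socket family, so a caller would presumably fall back to the existing copying `send_blocking` whenever this returns false.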

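Note that in a real implementation, you'd want to integrate this with your event loop. Completion notifications arrive on the socket's error queue (poll() reports POLLERR, and they are read with recvmsg() and MSG_ERRQUEUE), and a buffer can only be released once every send() call that used it has completed.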
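One way to map notifications back to buffers is sketched below. The kernel numbers successful `MSG_ZEROCOPY` sends per socket, starting at 0, and each notification reports an inclusive range of those numbers in `ee_info` and `ee_data`. Because the send loop may need several `send()` calls for one buffer, a buffer is only safe to release once all of its calls have completed. The class `zerocopy_tracker` and its method names are hypothetical, and the sketch assumes one tracker per socket with no locking.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical tracker: maps per-socket MSG_ZEROCOPY send IDs to caller buffers.
class zerocopy_tracker {
public:
    // Call after each successful send(..., MSG_ZEROCOPY) that used `buffer`.
    // Returns the ID the kernel will report for that call.
    uint32_t on_send(const void* buffer) {
        uint32_t id = next_id_++;
        pending_[id] = buffer;
        ++outstanding_[buffer];
        return id;
    }

    // Call with ee_info (lo) and ee_data (hi) from a completion notification.
    // Returns the buffers that have no sends left in flight.
    std::vector<const void*> on_completion(uint32_t lo, uint32_t hi) {
        std::vector<const void*> released;
        // Unsigned arithmetic keeps the count right if the 32-bit counter wrapped
        uint32_t count = hi - lo + 1;
        for (uint32_t i = 0; i < count; ++i) {
            auto it = pending_.find(lo + i);
            if (it == pending_.end())
                continue; // Unknown or already completed ID
            const void* buffer = it->second;
            pending_.erase(it);
            auto out = outstanding_.find(buffer);
            if (--out->second == 0) {
                outstanding_.erase(out);
                released.push_back(buffer);
            }
        }
        return released;
    }

    // Number of send() calls still waiting for a notification
    std::size_t in_flight() const { return pending_.size(); }

private:
    uint32_t next_id_ = 0;
    std::map<uint32_t, const void*> pending_;        // send ID -> buffer
    std::map<const void*, std::size_t> outstanding_; // buffer -> sends in flight
};
```

In the prototype above, `on_send` would be called after each successful `::send` in the loop, and `on_completion` from `process_zerocopy_completions` with `err.ee_info` and `err.ee_data`; each returned pointer could then be passed on to whatever releases the buffer.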


🧠 Learnings used
Learnt from: Eeems
PR: Eeems-Org/oxide#397
File: shared/libblight_protocol/socket.cpp:74-99
Timestamp: 2025-03-14T05:57:20.573Z
Learning: In shared/libblight_protocol/socket.cpp, the `recv_blocking` method is intentionally designed to block indefinitely, as it's only meant to be used in situations where data is guaranteed to be available due to a previous header being received. The non-blocking alternative with timeouts and retry limits is implemented in the `recv` method in the same file.

Originally posted by @coderabbitai[bot] in #397 (comment)
