Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions include/elio/rpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,22 @@ struct pending_request {
message_buffer response_data;
frame_header response_header;
rpc_error error = rpc_error::success;
bool completed = false;
std::atomic<bool> completed{false};
bool timed_out = false;

pending_request() = default;

/// Try to complete the request. Returns true if this call completed it.
bool try_complete() noexcept {
bool expected = false;
return completed.compare_exchange_strong(expected, true,
std::memory_order_acq_rel);
}

/// Check if completed (read-only)
bool is_completed() const noexcept {
return completed.load(std::memory_order_acquire);
}
};

// ============================================================================
Expand Down Expand Up @@ -123,9 +135,8 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {
{
std::lock_guard<std::mutex> lock(pending_mutex_);
for (auto& [id, req] : pending_requests_) {
if (!req->completed) {
if (req->try_complete()) {
req->error = rpc_error::connection_closed;
req->completed = true;
req->completion_event.set();
}
}
Expand Down Expand Up @@ -215,10 +226,8 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {

// Register cancellation callback
auto cancel_registration = token.on_cancel([this, pending, request_id]() {
std::lock_guard<std::mutex> lock(pending_mutex_);
if (!pending->completed) {
if (pending->try_complete()) {
pending->error = rpc_error::cancelled;
pending->completed = true;
pending->completion_event.set();
}
});
Expand All @@ -243,30 +252,25 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {
// Wait for response with timeout
// Start a timeout coroutine
auto self = this->shared_from_this();
auto timeout_task = [](ptr client,
std::chrono::milliseconds ms,
auto timeout_task = [](std::chrono::milliseconds ms,
std::shared_ptr<pending_request> pending,
coro::cancel_token tok)
-> coro::task<void>
{
auto result = co_await time::sleep_for(ms, tok);

// Only timeout if sleep completed normally (not cancelled)
if (result == coro::cancel_result::completed && !pending->completed) {
std::lock_guard<std::mutex> lock(client->pending_mutex_);
if (!pending->completed) {
pending->timed_out = true;
pending->error = rpc_error::timeout;
pending->completed = true;
pending->completion_event.set();
}
if (result == coro::cancel_result::completed && pending->try_complete()) {
pending->timed_out = true;
pending->error = rpc_error::timeout;
pending->completion_event.set();
}
};

// Spawn timeout watcher
auto* sched = runtime::scheduler::current();
if (sched) {
auto task = timeout_task(self,
auto task = timeout_task(
std::chrono::duration_cast<std::chrono::milliseconds>(timeout), pending, token);
sched->spawn(task.release());
}
Expand Down Expand Up @@ -353,25 +357,20 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {

// Setup timeout
auto self = this->shared_from_this();
auto timeout_task = [](ptr client,
std::chrono::milliseconds ms,
auto timeout_task = [](std::chrono::milliseconds ms,
std::shared_ptr<pending_request> pending)
-> coro::task<void>
{
co_await time::sleep_for(ms);
if (!pending->completed) {
std::lock_guard<std::mutex> lock(client->pending_mutex_);
if (!pending->completed) {
pending->timed_out = true;
pending->completed = true;
pending->completion_event.set();
}
if (pending->try_complete()) {
pending->timed_out = true;
pending->completion_event.set();
}
};

auto* sched = runtime::scheduler::current();
if (sched) {
auto task = timeout_task(self, timeout, pending);
auto task = timeout_task(timeout, pending);
sched->spawn(task.release());
}

Expand Down Expand Up @@ -464,10 +463,10 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {
pending = it->second;
}

if (!pending->completed) {
// Use atomic try_complete to ensure only one thread sets the response
if (pending->try_complete()) {
pending->response_header = header;
pending->response_data = std::move(payload);
pending->completed = true;
pending->completion_event.set();
}
}
Expand All @@ -485,8 +484,7 @@ class rpc_client : public std::enable_shared_from_this<rpc_client<Stream>> {
pending = it->second;
}

if (!pending->completed) {
pending->completed = true;
if (pending->try_complete()) {
pending->completion_event.set();
}
}
Expand Down