diff --git a/include/elio/rpc/rpc_client.hpp b/include/elio/rpc/rpc_client.hpp index 8e1c44e..a037ac4 100644 --- a/include/elio/rpc/rpc_client.hpp +++ b/include/elio/rpc/rpc_client.hpp @@ -44,10 +44,22 @@ struct pending_request { message_buffer response_data; frame_header response_header; rpc_error error = rpc_error::success; - bool completed = false; + std::atomic completed{false}; bool timed_out = false; pending_request() = default; + + /// Try to complete the request. Returns true if this call completed it. + bool try_complete() noexcept { + bool expected = false; + return completed.compare_exchange_strong(expected, true, + std::memory_order_acq_rel); + } + + /// Check if completed (read-only) + bool is_completed() const noexcept { + return completed.load(std::memory_order_acquire); + } }; // ============================================================================ @@ -123,9 +135,8 @@ class rpc_client : public std::enable_shared_from_this> { { std::lock_guard lock(pending_mutex_); for (auto& [id, req] : pending_requests_) { - if (!req->completed) { + if (req->try_complete()) { req->error = rpc_error::connection_closed; - req->completed = true; req->completion_event.set(); } } @@ -215,10 +226,8 @@ class rpc_client : public std::enable_shared_from_this> { // Register cancellation callback auto cancel_registration = token.on_cancel([this, pending, request_id]() { - std::lock_guard lock(pending_mutex_); - if (!pending->completed) { + if (pending->try_complete()) { pending->error = rpc_error::cancelled; - pending->completed = true; pending->completion_event.set(); } }); @@ -243,8 +252,7 @@ class rpc_client : public std::enable_shared_from_this> { // Wait for response with timeout // Start a timeout coroutine auto self = this->shared_from_this(); - auto timeout_task = [](ptr client, - std::chrono::milliseconds ms, + auto timeout_task = [](std::chrono::milliseconds ms, std::shared_ptr pending, coro::cancel_token tok) -> coro::task @@ -252,21 +260,17 @@ class rpc_client : public std::enable_shared_from_this> { auto result = co_await time::sleep_for(ms, tok); // Only timeout if sleep completed normally (not cancelled) - if (result == coro::cancel_result::completed && !pending->completed) { - std::lock_guard lock(client->pending_mutex_); - if (!pending->completed) { - pending->timed_out = true; - pending->error = rpc_error::timeout; - pending->completed = true; - pending->completion_event.set(); - } + if (result == coro::cancel_result::completed && pending->try_complete()) { + pending->timed_out = true; + pending->error = rpc_error::timeout; + pending->completion_event.set(); } }; // Spawn timeout watcher auto* sched = runtime::scheduler::current(); if (sched) { - auto task = timeout_task(self, + auto task = timeout_task( std::chrono::duration_cast(timeout), pending, token); sched->spawn(task.release()); } @@ -353,25 +357,20 @@ class rpc_client : public std::enable_shared_from_this> { // Setup timeout auto self = this->shared_from_this(); - auto timeout_task = [](ptr client, - std::chrono::milliseconds ms, + auto timeout_task = [](std::chrono::milliseconds ms, std::shared_ptr pending) -> coro::task { co_await time::sleep_for(ms); - if (!pending->completed) { - std::lock_guard lock(client->pending_mutex_); - if (!pending->completed) { - pending->timed_out = true; - pending->completed = true; - pending->completion_event.set(); - } + if (pending->try_complete()) { + pending->timed_out = true; + pending->completion_event.set(); } }; auto* sched = runtime::scheduler::current(); if (sched) { - auto task = timeout_task(self, timeout, pending); + auto task = timeout_task(timeout, pending); sched->spawn(task.release()); } @@ -464,10 +463,10 @@ class rpc_client : public std::enable_shared_from_this> { pending = it->second; } - if (!pending->completed) { + // Use atomic try_complete to ensure only one thread sets the response + if (pending->try_complete()) { pending->response_header = header; pending->response_data = std::move(payload); - pending->completed = true; pending->completion_event.set(); } } @@ -485,8 +484,7 @@ class rpc_client : public std::enable_shared_from_this> { pending = it->second; } - if (!pending->completed) { - pending->completed = true; + if (pending->try_complete()) { pending->completion_event.set(); } }