From c35d5e4bb338b4c92d29ffc8423dc7f281be13b6 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Tue, 27 Jan 2026 11:51:18 +0800 Subject: [PATCH] Cancel token --- include/elio/coro/cancel_token.hpp | 230 +++++++++++++++++++++++++ include/elio/elio.hpp | 1 + include/elio/http/http_client.hpp | 111 ++++++++++-- include/elio/http/sse_client.hpp | 132 ++++++++++---- include/elio/http/websocket_client.hpp | 136 ++++++++++----- include/elio/io/epoll_backend.hpp | 123 +++++++++++-- include/elio/rpc/rpc_client.hpp | 71 +++++++- include/elio/time/timer.hpp | 138 +++++++++++++++ tests/unit/test_timer.cpp | 161 +++++++++++++++++ wiki/API-Reference.md | 126 +++++++++++++- wiki/Core-Concepts.md | 83 +++++++++ 11 files changed, 1205 insertions(+), 107 deletions(-) create mode 100644 include/elio/coro/cancel_token.hpp diff --git a/include/elio/coro/cancel_token.hpp b/include/elio/coro/cancel_token.hpp new file mode 100644 index 0000000..308fb93 --- /dev/null +++ b/include/elio/coro/cancel_token.hpp @@ -0,0 +1,230 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace elio::coro { + +/// Result of a cancellable operation +enum class cancel_result { + completed, ///< Operation completed normally + cancelled ///< Operation was cancelled +}; + +namespace detail { + +/// Shared cancellation state (implementation detail) +struct cancel_state { + std::atomic cancelled{false}; + std::mutex mutex; + std::vector>> callbacks; + uint64_t next_id = 1; + + uint64_t add_callback(std::function cb) { + std::lock_guard lock(mutex); + if (cancelled.load(std::memory_order_relaxed)) { + // Already cancelled, invoke immediately outside lock + // Need to release lock first + mutex.unlock(); + cb(); + mutex.lock(); + return 0; + } + uint64_t id = next_id++; + callbacks.emplace_back(id, std::move(cb)); + return id; + } + + void remove_callback(uint64_t id) { + std::lock_guard lock(mutex); + callbacks.erase( + std::remove_if(callbacks.begin(), callbacks.end(), + [id](const auto& p) { return p.first == id; }), + callbacks.end() + ); + } + + void trigger() { + std::vector> to_invoke; + { + std::lock_guard lock(mutex); + if (cancelled.exchange(true, std::memory_order_release)) { + return; // Already cancelled + } + for (auto& [id, cb] : callbacks) { + to_invoke.push_back(std::move(cb)); + } + callbacks.clear(); + } + // Invoke callbacks outside the lock + for (auto& cb : to_invoke) { + cb(); + } + } +}; + +} // namespace detail + +/// Forward declaration +class cancel_source; + +/// Registration handle for cancel callbacks +class cancel_registration { +public: + cancel_registration() = default; + cancel_registration(cancel_registration&& other) noexcept + : state_(std::move(other.state_)), id_(other.id_) { + other.id_ = 0; + } + cancel_registration& operator=(cancel_registration&& other) noexcept { + if (this != &other) { + unregister(); + state_ = std::move(other.state_); + id_ = other.id_; + other.id_ = 0; + } + return *this; + } + ~cancel_registration() { unregister(); } + + // Non-copyable + cancel_registration(const cancel_registration&) = delete; + cancel_registration& operator=(const cancel_registration&) = delete; + + /// Manually unregister the callback + void unregister() { + if (state_ && id_ != 0) { + state_->remove_callback(id_); + id_ = 0; + } + } + +private: + template friend class basic_cancel_token; + friend class cancel_token; + + cancel_registration(std::shared_ptr state, uint64_t id) + : state_(std::move(state)), id_(id) {} + + std::shared_ptr state_; + uint64_t id_ = 0; +}; + +/// A token that can be used to check for and respond to cancellation requests. +/// +/// cancel_token is a lightweight handle that can be copied and passed to +/// functions that should be cancellable. Multiple tokens can share the same +/// cancellation state via a cancel_source. +/// +/// Example: +/// ```cpp +/// task cancellable_work(cancel_token token) { +/// while (!token.is_cancelled()) { +/// // do work +/// auto result = co_await time::sleep_for(100ms, token); +/// if (result == cancel_result::cancelled) break; +/// } +/// } +/// ``` +class cancel_token { +public: + using registration = cancel_registration; + + /// Default constructor creates an empty (never-cancelled) token + cancel_token() = default; + + /// Check if cancellation has been requested + bool is_cancelled() const noexcept { + return state_ && state_->cancelled.load(std::memory_order_acquire); + } + + /// Implicit conversion to bool - returns true if NOT cancelled + /// Allows: if (token) { /* not cancelled */ } + explicit operator bool() const noexcept { + return !is_cancelled(); + } + + /// Register a callback to be invoked when cancellation is requested. + /// The callback will be invoked immediately if already cancelled. + /// @param callback Function to call on cancellation + /// @return Registration handle (callback unregisters when handle is destroyed) + template + [[nodiscard]] registration on_cancel(F&& callback) const { + if (!state_) { + return registration{}; + } + return registration{state_, state_->add_callback(std::forward(callback))}; + } + + /// Register a coroutine handle to be resumed when cancelled. + /// @param handle Coroutine to resume on cancellation + /// @return Registration handle + [[nodiscard]] registration on_cancel_resume(std::coroutine_handle<> handle) const { + return on_cancel([handle]() { + if (handle && !handle.done()) { + handle.resume(); + } + }); + } + +private: + friend class cancel_source; + + explicit cancel_token(std::shared_ptr state) + : state_(std::move(state)) {} + + std::shared_ptr state_; +}; + +/// A source of cancellation that can create tokens and trigger cancellation. +/// +/// cancel_source owns the cancellation state and can create multiple tokens +/// that share the same state. When cancel() is called, all associated tokens +/// become cancelled and their registered callbacks are invoked. +/// +/// Example: +/// ```cpp +/// cancel_source source; +/// auto token = source.get_token(); +/// +/// // Pass token to cancellable operations +/// auto task = do_work(token); +/// +/// // Later, cancel the operation +/// source.cancel(); +/// ``` +class cancel_source { +public: + /// Create a new cancel source + cancel_source() + : state_(std::make_shared()) {} + + /// Get a token associated with this source + cancel_token get_token() const noexcept { + return cancel_token{state_}; + } + + /// Request cancellation + /// All registered callbacks will be invoked and all tokens will report + /// is_cancelled() == true + void cancel() { + if (state_) { + state_->trigger(); + } + } + + /// Check if cancellation has been requested + bool is_cancelled() const noexcept { + return state_ && state_->cancelled.load(std::memory_order_acquire); + } + +private: + std::shared_ptr state_; +}; + +} // namespace elio::coro diff --git a/include/elio/elio.hpp b/include/elio/elio.hpp index 4d1bebe..7d44543 100644 --- a/include/elio/elio.hpp +++ b/include/elio/elio.hpp @@ -17,6 +17,7 @@ #include "coro/task.hpp" #include "coro/awaitable_base.hpp" #include "coro/frame.hpp" +#include "coro/cancel_token.hpp" // Runtime scheduler #include "runtime/scheduler.hpp" diff --git a/include/elio/http/http_client.hpp b/include/elio/http/http_client.hpp index afeb998..4e48da7 100644 --- a/include/elio/http/http_client.hpp +++ b/include/elio/http/http_client.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -216,7 +217,12 @@ class client { /// Perform HTTP GET request /// @return Response on success, std::nullopt on error (check errno) coro::task> get(std::string_view url_str) { - return request_url(method::GET, url_str, ""); + return request_url(method::GET, url_str, "", "", coro::cancel_token{}); + } + + /// Perform HTTP GET request with cancellation support + coro::task> get(std::string_view url_str, coro::cancel_token token) { + return request_url(method::GET, url_str, "", "", std::move(token)); } /// Perform HTTP POST request @@ -224,7 +230,15 @@ class client { coro::task> post(std::string_view url_str, std::string_view body, std::string_view content_type = mime::application_form_urlencoded) { - return request_url(method::POST, url_str, body, content_type); + return request_url(method::POST, url_str, body, content_type, coro::cancel_token{}); + } + + /// Perform HTTP POST request with cancellation support + coro::task> post(std::string_view url_str, + std::string_view body, + coro::cancel_token token, + std::string_view content_type = mime::application_form_urlencoded) { + return request_url(method::POST, url_str, body, content_type, std::move(token)); } /// Perform HTTP PUT request @@ -232,13 +246,26 @@ class client { coro::task> put(std::string_view url_str, std::string_view body, std::string_view content_type = mime::application_json) { - return request_url(method::PUT, url_str, body, content_type); + return request_url(method::PUT, url_str, body, content_type, coro::cancel_token{}); + } + + /// Perform HTTP PUT request with cancellation support + coro::task> put(std::string_view url_str, + std::string_view body, + coro::cancel_token token, + std::string_view content_type = mime::application_json) { + return request_url(method::PUT, url_str, body, content_type, std::move(token)); } /// Perform HTTP DELETE request /// @return Response on success, std::nullopt on error (check errno) coro::task> del(std::string_view url_str) { - return request_url(method::DELETE_, url_str, ""); + return request_url(method::DELETE_, url_str, "", "", coro::cancel_token{}); + } + + /// Perform HTTP DELETE request with cancellation support + coro::task> del(std::string_view url_str, coro::cancel_token token) { + return request_url(method::DELETE_, url_str, "", "", std::move(token)); } /// Perform HTTP PATCH request @@ -246,19 +273,37 @@ class client { coro::task> patch(std::string_view url_str, std::string_view body, std::string_view content_type = mime::application_json) { - return request_url(method::PATCH, url_str, body, content_type); + return request_url(method::PATCH, url_str, body, content_type, coro::cancel_token{}); + } + + /// Perform HTTP PATCH request with cancellation support + coro::task> patch(std::string_view url_str, + std::string_view body, + coro::cancel_token token, + std::string_view content_type = mime::application_json) { + return request_url(method::PATCH, url_str, body, content_type, std::move(token)); } /// Perform HTTP HEAD request /// @return Response on success, std::nullopt on error (check errno) coro::task> head(std::string_view url_str) { - return request_url(method::HEAD, url_str, ""); + return request_url(method::HEAD, url_str, "", "", coro::cancel_token{}); + } + + /// Perform HTTP HEAD request with cancellation support + coro::task> head(std::string_view url_str, coro::cancel_token token) { + return request_url(method::HEAD, url_str, "", "", std::move(token)); } /// Send a custom request /// @return Response on success, std::nullopt on error (check errno) coro::task> send(request& req, const url& target) { - return send_request(req, target, 0); + return send_request(req, target, 0, coro::cancel_token{}); + } + + /// Send a custom request with cancellation support + coro::task> send(request& req, const url& target, coro::cancel_token token) { + return send_request(req, target, 0, std::move(token)); } /// Get TLS context for configuration @@ -273,7 +318,14 @@ class client { coro::task> request_url(method m, std::string_view url_str, std::string_view body, - std::string_view content_type = "") { + std::string_view content_type, + coro::cancel_token token) { + // Check if already cancelled + if (token.is_cancelled()) { + errno = ECANCELED; + co_return std::nullopt; + } + auto parsed = url::parse(url_str); if (!parsed) { ELIO_LOG_ERROR("Invalid URL: {}", url_str); @@ -295,11 +347,19 @@ class client { req.set_header("User-Agent", config_.user_agent); } - co_return co_await send_request(req, *parsed, 0); + co_return co_await send_request(req, *parsed, 0, std::move(token)); } /// Send request with redirect handling - coro::task> send_request(request& req, const url& target, size_t redirect_count) { + coro::task> send_request(request& req, const url& target, + size_t redirect_count, + coro::cancel_token token) { + // Check if cancelled + if (token.is_cancelled()) { + errno = ECANCELED; + co_return std::nullopt; + } + // Get connection from pool auto conn_opt = co_await pool_.acquire(*io_ctx_, target.host, target.effective_port(), target.is_secure(), &tls_ctx_); @@ -325,6 +385,12 @@ class client { ELIO_LOG_DEBUG("Sending request to {}:{}\n{}", target.host, target.effective_port(), request_data); + // Check cancellation before write + if (token.is_cancelled()) { + errno = ECANCELED; + co_return std::nullopt; + } + auto write_result = co_await conn.write(request_data); if (write_result.result <= 0) { ELIO_LOG_ERROR("Failed to send request: {}", strerror(-write_result.result)); @@ -337,6 +403,12 @@ class client { response_parser parser; while (!parser.is_complete() && !parser.has_error()) { + // Check cancellation before read + if (token.is_cancelled()) { + errno = ECANCELED; + co_return std::nullopt; + } + auto read_result = co_await conn.read(buffer.data(), buffer.size()); if (read_result.result <= 0) { @@ -428,7 +500,7 @@ class client { } } - co_return co_await send_request(redirect_req, *redirect_url, redirect_count + 1); + co_return co_await send_request(redirect_req, *redirect_url, redirect_count + 1, token); } } } @@ -451,6 +523,13 @@ inline coro::task> get(io::io_context& io_ctx, std::stri co_return co_await c.get(url); } +/// Perform HTTP GET request with cancellation support +inline coro::task> get(io::io_context& io_ctx, std::string_view url, + coro::cancel_token token) { + client c(io_ctx); + co_return co_await c.get(url, std::move(token)); +} + /// Perform HTTP POST request /// @return Response on success, std::nullopt on error (check errno) inline coro::task> post(io::io_context& io_ctx, @@ -461,4 +540,14 @@ inline coro::task> post(io::io_context& io_ctx, co_return co_await c.post(url, body, content_type); } +/// Perform HTTP POST request with cancellation support +inline coro::task> post(io::io_context& io_ctx, + std::string_view url, + std::string_view body, + coro::cancel_token token, + std::string_view content_type = mime::application_form_urlencoded) { + client c(io_ctx); + co_return co_await c.post(url, body, std::move(token), content_type); +} + } // namespace elio::http diff --git a/include/elio/http/sse_client.hpp b/include/elio/http/sse_client.hpp index 150c78c..c6b5590 100644 --- a/include/elio/http/sse_client.hpp +++ b/include/elio/http/sse_client.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -229,17 +230,15 @@ class sse_client { /// @param url HTTP(S) URL /// @return true on success coro::task connect(std::string_view url_str) { - // Parse URL - auto parsed = url::parse(url_str); - if (!parsed) { - ELIO_LOG_ERROR("Invalid SSE URL: {}", url_str); - co_return false; - } - - url_ = *parsed; - state_ = client_state::connecting; - - co_return co_await do_connect(); + return connect_impl(url_str, coro::cancel_token{}); + } + + /// Connect to an SSE endpoint with cancellation support + /// @param url HTTP(S) URL + /// @param token Cancellation token + /// @return true on success + coro::task connect(std::string_view url_str, coro::cancel_token token) { + return connect_impl(url_str, std::move(token)); } /// Get connection state @@ -253,7 +252,66 @@ class sse_client { /// Receive next event (blocks until event available or connection closed) coro::task> receive() { + return receive_impl(coro::cancel_token{}); + } + + /// Receive next event with cancellation support + /// @param token Cancellation token + /// @return Event on success, std::nullopt on close/error/cancel + coro::task> receive(coro::cancel_token token) { + return receive_impl(std::move(token)); + } + + /// Close the connection + coro::task close() { + state_ = client_state::closed; + + if (std::holds_alternative(stream_)) { + co_await std::get(stream_).shutdown(); + } + stream_ = std::monostate{}; + } + + /// Get TLS context for configuration + tls::tls_context& tls_context() noexcept { return tls_ctx_; } + + /// Get configuration + client_config& config() noexcept { return config_; } + const client_config& config() const noexcept { return config_; } + +private: + /// Internal connect implementation + coro::task connect_impl(std::string_view url_str, coro::cancel_token token) { + // Check if already cancelled + if (token.is_cancelled()) { + co_return false; + } + + // Parse URL + auto parsed = url::parse(url_str); + if (!parsed) { + ELIO_LOG_ERROR("Invalid SSE URL: {}", url_str); + co_return false; + } + + url_ = *parsed; + token_ = std::move(token); + state_ = client_state::connecting; + + co_return co_await do_connect(); + } + + /// Internal receive implementation + coro::task> receive_impl(coro::cancel_token token) { + // Use passed token or stored token + auto& active_token = token.is_cancelled() ? token_ : token; + while (state_ == client_state::connected) { + // Check for cancellation + if (active_token.is_cancelled()) { + co_return std::nullopt; + } + // Check for already-parsed events if (parser_.has_event()) { auto evt = parser_.get_event(); @@ -273,6 +331,12 @@ class sse_client { ELIO_LOG_ERROR("SSE read error: {}", strerror(-result.result)); } + // Check cancellation before reconnect + if (active_token.is_cancelled()) { + state_ = client_state::disconnected; + co_return std::nullopt; + } + // Handle reconnection if (config_.auto_reconnect && state_ != client_state::closed) { state_ = client_state::reconnecting; @@ -292,25 +356,7 @@ class sse_client { co_return std::nullopt; } - - /// Close the connection - coro::task close() { - state_ = client_state::closed; - - if (std::holds_alternative(stream_)) { - co_await std::get(stream_).shutdown(); - } - stream_ = std::monostate{}; - } - - /// Get TLS context for configuration - tls::tls_context& tls_context() noexcept { return tls_ctx_; } - - /// Get configuration - client_config& config() noexcept { return config_; } - const client_config& config() const noexcept { return config_; } - -private: + coro::task do_connect() { ELIO_LOG_DEBUG("Connecting to SSE endpoint {}:{}{}", url_.host, url_.effective_port(), url_.path); @@ -448,6 +494,11 @@ class sse_client { size_t attempts = 0; while (state_ == client_state::reconnecting) { + // Check for cancellation + if (token_.is_cancelled()) { + co_return false; + } + ++attempts; if (config_.max_reconnect_attempts > 0 && @@ -458,8 +509,12 @@ class sse_client { ELIO_LOG_DEBUG("SSE reconnecting (attempt {}) in {}ms...", attempts, retry_ms); - // Wait before reconnecting - co_await elio::time::sleep_for(std::chrono::milliseconds(retry_ms)); + // Wait before reconnecting (cancellable) + auto result = co_await elio::time::sleep_for( + std::chrono::milliseconds(retry_ms), token_); + if (result == coro::cancel_result::cancelled) { + co_return false; + } if (state_ != client_state::reconnecting) { co_return false; @@ -502,6 +557,7 @@ class sse_client { client_config config_; tls::tls_context tls_ctx_; stream_type stream_; + coro::cancel_token token_; ///< Cancellation token for connection url url_; std::string last_event_id_; @@ -521,4 +577,16 @@ sse_connect(io::io_context& io_ctx, std::string_view url, client_config config = co_return client; } +/// Convenience function for one-off SSE connection with cancellation support +inline coro::task> +sse_connect(io::io_context& io_ctx, std::string_view url, coro::cancel_token token, + client_config config = {}) { + auto client = std::make_optional(io_ctx, config); + bool success = co_await client->connect(url, std::move(token)); + if (!success) { + co_return std::nullopt; + } + co_return client; +} + } // namespace elio::http::sse diff --git a/include/elio/http/websocket_client.hpp b/include/elio/http/websocket_client.hpp index 36b7a5a..f7196b9 100644 --- a/include/elio/http/websocket_client.hpp +++ b/include/elio/http/websocket_client.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -77,49 +78,15 @@ class ws_client { /// @param url WebSocket URL (ws:// or wss://) /// @return true on success, false on failure coro::task connect(std::string_view url_str) { - // Parse URL - auto parsed = parse_ws_url(url_str); - if (!parsed) { - ELIO_LOG_ERROR("Invalid WebSocket URL: {}", url_str); - co_return false; - } - - host_ = parsed->host; - path_ = parsed->path.empty() ? "/" : parsed->path; - secure_ = parsed->secure; - - uint16_t port = parsed->port; - if (port == 0) { - port = secure_ ? 443 : 80; - } - - ELIO_LOG_DEBUG("Connecting to WebSocket server {}:{}{}", host_, port, path_); - - // Establish TCP connection - if (secure_) { - auto result = co_await tls::tls_connect(tls_ctx_, *io_ctx_, host_, port); - if (!result) { - ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host_, port, strerror(errno)); - co_return false; - } - stream_ = std::move(*result); - } else { - auto result = co_await net::tcp_connect(*io_ctx_, host_, port); - if (!result) { - ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host_, port, strerror(errno)); - co_return false; - } - stream_ = std::move(*result); - } - - // Perform WebSocket handshake - bool success = co_await perform_handshake(); - if (success) { - state_ = connection_state::open; - ELIO_LOG_DEBUG("WebSocket connected to {}{}", host_, path_); - } - - co_return success; + return connect_impl(url_str, coro::cancel_token{}); + } + + /// Connect to a WebSocket server with cancellation support + /// @param url WebSocket URL (ws:// or wss://) + /// @param token Cancellation token + /// @return true on success, false on failure + coro::task connect(std::string_view url_str, coro::cancel_token token) { + return connect_impl(url_str, std::move(token)); } /// Get connection state @@ -193,7 +160,25 @@ class ws_client { /// Receive next message (blocks until message available or connection closed) coro::task> receive() { + return receive_impl(coro::cancel_token{}); + } + + /// Receive next message with cancellation support + /// @param token Cancellation token + /// @return Message on success, std::nullopt on close/error/cancel + coro::task> receive(coro::cancel_token token) { + return receive_impl(std::move(token)); + } + +private: + /// Internal receive implementation + coro::task> receive_impl(coro::cancel_token token) { while (state_ == connection_state::open || state_ == connection_state::closing) { + // Check for cancellation + if (token.is_cancelled()) { + co_return std::nullopt; + } + // Check for already-parsed messages if (parser_.has_message()) { co_return parser_.get_message(); @@ -240,6 +225,71 @@ class ws_client { co_return std::nullopt; } + /// Internal connect implementation + coro::task connect_impl(std::string_view url_str, coro::cancel_token token) { + // Check if already cancelled + if (token.is_cancelled()) { + co_return false; + } + + // Parse URL + auto parsed = parse_ws_url(url_str); + if (!parsed) { + ELIO_LOG_ERROR("Invalid WebSocket URL: {}", url_str); + co_return false; + } + + host_ = parsed->host; + path_ = parsed->path.empty() ? "/" : parsed->path; + secure_ = parsed->secure; + + uint16_t port = parsed->port; + if (port == 0) { + port = secure_ ? 443 : 80; + } + + ELIO_LOG_DEBUG("Connecting to WebSocket server {}:{}{}", host_, port, path_); + + // Check cancellation before connection + if (token.is_cancelled()) { + co_return false; + } + + // Establish TCP connection + if (secure_) { + auto result = co_await tls::tls_connect(tls_ctx_, *io_ctx_, host_, port); + if (!result) { + ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host_, port, strerror(errno)); + co_return false; + } + stream_ = std::move(*result); + } else { + auto result = co_await net::tcp_connect(*io_ctx_, host_, port); + if (!result) { + ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host_, port, strerror(errno)); + co_return false; + } + stream_ = std::move(*result); + } + + // Check cancellation before handshake + if (token.is_cancelled()) { + stream_ = std::monostate{}; + co_return false; + } + + // Perform WebSocket handshake + bool success = co_await perform_handshake(); + if (success) { + state_ = connection_state::open; + ELIO_LOG_DEBUG("WebSocket connected to {}{}", host_, path_); + } + + co_return success; + } + +public: + /// Get TLS context for configuration tls::tls_context& tls_context() noexcept { return tls_ctx_; } diff --git a/include/elio/io/epoll_backend.hpp b/include/elio/io/epoll_backend.hpp index bf7d754..4992de2 100644 --- a/include/elio/io/epoll_backend.hpp +++ b/include/elio/io/epoll_backend.hpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace elio::io { @@ -66,6 +67,16 @@ class epoll_backend : public io_backend { } } fd_states_.clear(); + + // Cancel all pending timers + while (!timer_queue_.empty()) { + auto& entry = timer_queue_.top(); + if (entry.awaiter && !entry.awaiter.done()) { + last_result_ = io_result{-ECANCELED, 0}; + deferred_resumes.push_back(entry.awaiter); + } + timer_queue_.pop(); + } } // mutex released here // Resume outside lock to prevent deadlock @@ -117,11 +128,18 @@ class epoll_backend : public io_backend { op.synchronous = true; break; - case io_op::timeout: - // Timeouts are handled separately - op.is_timeout = true; - op.timeout_ns = static_cast(req.length); - break; + case io_op::timeout: { + // Use timer queue for timeout operations + int64_t timeout_ns = static_cast(req.length); + auto deadline = std::chrono::steady_clock::now() + + std::chrono::nanoseconds(timeout_ns); + + timer_queue_.push(timer_entry{deadline, req.awaiter}); + pending_count_++; + + ELIO_LOG_DEBUG("Prepared timeout: {}ns", timeout_ns); + return true; + } case io_op::cancel: case io_op::none: @@ -131,10 +149,9 @@ class epoll_backend : public io_backend { // Get or create fd state auto& state = fd_states_[req.fd]; bool is_sync = op.synchronous; - bool is_timo = op.is_timeout; state.pending_ops.push_back(std::move(op)); - if (!is_sync && !is_timo) { + if (!is_sync) { // Register with epoll state.events |= events; @@ -197,6 +214,24 @@ class epoll_backend : public io_backend { timeout_ms = -1; // Block indefinitely } + // Adjust timeout based on earliest timer deadline + { + std::lock_guard lock(mutex_); + if (!timer_queue_.empty()) { + auto now = std::chrono::steady_clock::now(); + auto earliest = timer_queue_.top().deadline; + if (earliest <= now) { + timeout_ms = 0; // Timer already expired + } else { + auto timer_timeout = std::chrono::duration_cast( + earliest - now).count(); + if (timeout_ms < 0 || timer_timeout < timeout_ms) { + timeout_ms = static_cast(timer_timeout); + } + } + } + } + int nfds = epoll_wait(epoll_fd_, events_.data(), static_cast(events_.size()), timeout_ms); @@ -217,6 +252,25 @@ class epoll_backend : public io_backend { { std::lock_guard lock(mutex_); + // Process expired timers + auto now = std::chrono::steady_clock::now(); + while (!timer_queue_.empty() && timer_queue_.top().deadline <= now) { + auto entry = timer_queue_.top(); + timer_queue_.pop(); + + last_result_ = io_result{0, 0}; // Timeout completed successfully + + if (entry.awaiter && !entry.awaiter.done()) { + deferred_resumes.push_back(entry.awaiter); + } + + pending_count_--; + completions++; + + ELIO_LOG_DEBUG("Timer expired"); + } + + // Process epoll events for (int i = 0; i < nfds; ++i) { int fd = events_[i].data.fd; uint32_t revents = events_[i].events; @@ -300,6 +354,7 @@ class epoll_backend : public io_backend { { std::lock_guard lock(mutex_); + // Search in fd_states first for (auto& [fd, state] : fd_states_) { auto it = std::find_if(state.pending_ops.begin(), state.pending_ops.end(), @@ -315,9 +370,40 @@ class epoll_backend : public io_backend { } state.pending_ops.erase(it); pending_count_--; - break; + + // Cleanup if no more pending ops + if (state.pending_ops.empty() && state.registered) { + epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); + state.registered = false; + state.events = 0; + } + goto found; + } + } + + // Search in timer queue - need to rebuild queue without the cancelled entry + if (!timer_queue_.empty()) { + std::vector remaining; + while (!timer_queue_.empty()) { + auto entry = timer_queue_.top(); + timer_queue_.pop(); + if (entry.awaiter.address() == user_data) { + last_result_ = io_result{-ECANCELED, 0}; + if (entry.awaiter && !entry.awaiter.done()) { + to_resume = entry.awaiter; + } + pending_count_--; + // Don't add back to remaining + } else { + remaining.push_back(entry); + } + } + // Rebuild queue + for (auto& e : remaining) { + timer_queue_.push(std::move(e)); } } + found:; } // mutex released here // Resume outside lock to prevent deadlock @@ -344,8 +430,6 @@ class epoll_backend : public io_backend { io_request req; std::coroutine_handle<> awaiter; bool synchronous = false; - bool is_timeout = false; - int64_t timeout_ns = 0; }; struct fd_state { @@ -354,6 +438,22 @@ class epoll_backend : public io_backend { bool registered = false; }; + /// Timer entry for the timer queue + struct timer_entry { + std::chrono::steady_clock::time_point deadline; + std::coroutine_handle<> awaiter; + + /// Comparison for min-heap (earliest deadline first) + bool operator>(const timer_entry& other) const { + return deadline > other.deadline; + } + }; + + /// Min-heap priority queue for timers + using timer_queue_t = std::priority_queue, + std::greater>; + void execute_sync_op(pending_operation& op) { int result = 0; @@ -506,8 +606,9 @@ class epoll_backend : public io_backend { int epoll_fd_ = -1; ///< epoll file descriptor std::vector events_; ///< Event buffer for epoll_wait std::unordered_map fd_states_; ///< Per-fd state + timer_queue_t timer_queue_; ///< Timer queue for timeouts size_t pending_count_ = 0; ///< Number of pending operations - mutable std::mutex mutex_; ///< Protects fd_states_ + mutable std::mutex mutex_; ///< Protects fd_states_ and timer_queue_ static inline thread_local io_result last_result_{}; }; diff --git a/include/elio/rpc/rpc_client.hpp b/include/elio/rpc/rpc_client.hpp index b472c76..8e1c44e 100644 --- a/include/elio/rpc/rpc_client.hpp +++ b/include/elio/rpc/rpc_client.hpp @@ -21,6 +21,7 @@ #include "rpc_protocol.hpp" #include +#include #include #include #include @@ -152,9 +153,53 @@ class rpc_client : public std::enable_shared_from_this> { coro::task> call( const typename Method::request_type& request, std::chrono::duration timeout) + { + return call_impl(request, timeout, coro::cancel_token{}); + } + + /// Make an RPC call with cancellation support + /// @tparam Method The method descriptor type + /// @param request The request object + /// @param token Cancellation token + /// @return Result containing response or error (rpc_error::cancelled if cancelled) + template + coro::task> call( + const typename Method::request_type& request, + coro::cancel_token token) + { + return call_impl(request, std::chrono::milliseconds(default_timeout_ms), std::move(token)); + } + + /// Make an RPC call with timeout and cancellation support + /// @tparam Method The method descriptor type + /// @param request The request object + /// @param timeout Per-call timeout duration + /// @param token Cancellation token + /// @return Result containing response or error + template + coro::task> call( + const typename Method::request_type& request, + std::chrono::duration timeout, + coro::cancel_token token) + { + return call_impl(request, timeout, std::move(token)); + } + +private: + /// Internal implementation of call with cancellation support + template + coro::task> call_impl( + const typename Method::request_type& request, + std::chrono::duration timeout, + coro::cancel_token token) { using Response = typename Method::response_type; + // Check if already cancelled + if (token.is_cancelled()) { + co_return rpc_result(rpc_error::cancelled); + } + if (!is_connected()) { co_return rpc_result(rpc_error::connection_closed); } @@ -168,6 +213,16 @@ class rpc_client : public std::enable_shared_from_this> { pending_requests_[request_id] = pending; } + // Register cancellation callback + auto cancel_registration = token.on_cancel([this, pending, request_id]() { + std::lock_guard lock(pending_mutex_); + if (!pending->completed) { + pending->error = rpc_error::cancelled; + pending->completed = true; + pending->completion_event.set(); + } + }); + // Build and send request auto timeout_ms = static_cast( std::chrono::duration_cast(timeout).count()); @@ -190,12 +245,14 @@ class rpc_client : public std::enable_shared_from_this> { auto self = this->shared_from_this(); auto timeout_task = [](ptr client, std::chrono::milliseconds ms, - std::shared_ptr pending) + std::shared_ptr pending, + coro::cancel_token tok) -> coro::task { - co_await time::sleep_for(ms); + auto result = co_await time::sleep_for(ms, tok); - if (!pending->completed) { + // Only timeout if sleep completed normally (not cancelled) + if (result == coro::cancel_result::completed && !pending->completed) { std::lock_guard lock(client->pending_mutex_); if (!pending->completed) { pending->timed_out = true; @@ -210,13 +267,16 @@ class rpc_client : public std::enable_shared_from_this> { auto* sched = runtime::scheduler::current(); if (sched) { auto task = timeout_task(self, - std::chrono::duration_cast(timeout), pending); + std::chrono::duration_cast(timeout), pending, token); sched->spawn(task.release()); } - // Wait for completion (either response or timeout) + // Wait for completion (either response, timeout, or cancellation) co_await pending->completion_event.wait(); + // Unregister cancellation callback + cancel_registration.unregister(); + // Remove from pending { std::lock_guard lock(pending_mutex_); @@ -245,6 +305,7 @@ class rpc_client : public std::enable_shared_from_this> { } } +public: /// Send a one-way message (no response expected) template coro::task send_oneway(const typename Method::request_type& request) { diff --git a/include/elio/time/timer.hpp b/include/elio/time/timer.hpp index 90d1fc5..1967bab 100644 --- a/include/elio/time/timer.hpp +++ b/include/elio/time/timer.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -73,6 +74,127 @@ class sleep_awaitable { int64_t duration_ns_; }; +/// Awaitable for cancellable sleep operations +/// Returns cancel_result indicating if sleep completed or was cancelled +class cancellable_sleep_awaitable { +public: + using cancel_result = coro::cancel_result; + + /// Construct with duration and cancel token + template + cancellable_sleep_awaitable(std::chrono::duration duration, + coro::cancel_token token) + : duration_ns_(std::chrono::duration_cast(duration).count()) + , token_(std::move(token)) {} + + /// Construct with io_context, duration, and cancel token + template + cancellable_sleep_awaitable(io::io_context& ctx, + std::chrono::duration duration, + coro::cancel_token token) + : ctx_(&ctx) + , duration_ns_(std::chrono::duration_cast(duration).count()) + , token_(std::move(token)) {} + + bool await_ready() const noexcept { + // Complete immediately if already cancelled or duration <= 0 + return token_.is_cancelled() || duration_ns_ <= 0; + } + + void await_suspend(std::coroutine_handle<> awaiter) { + awaiter_ = awaiter; + + // Check if already cancelled before setting up + if (token_.is_cancelled()) { + cancelled_ = true; + awaiter.resume(); + return; + } + + // Get io_context from scheduler or use provided one + io::io_context* ctx = ctx_; + if (!ctx) { + auto* sched = runtime::scheduler::current(); + if (sched) { + ctx = sched->get_io_context(); + } + } + + if (!ctx) { + // No io_context available - check cancellation in a loop + ELIO_LOG_DEBUG("cancellable_sleep: no io_context, using polling sleep"); + auto end_time = std::chrono::steady_clock::now() + + std::chrono::nanoseconds(duration_ns_); + while (std::chrono::steady_clock::now() < end_time) { + if (token_.is_cancelled()) { + cancelled_ = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + awaiter.resume(); + return; + } + + // Register cancellation callback before setting up the timer + // The callback will cancel the pending timeout operation + cancel_registration_ = token_.on_cancel([this, ctx]() { + cancelled_ = true; + // Cancel the pending timeout operation + ctx->cancel(awaiter_.address()); + }); + + // Check again after registration (in case cancelled between check and register) + if (token_.is_cancelled()) { + cancel_registration_.unregister(); + cancelled_ = true; + awaiter.resume(); + return; + } + + // Use io_context timeout mechanism + io::io_request req{}; + req.op = io::io_op::timeout; + req.length = static_cast(duration_ns_); + req.awaiter = awaiter; + + if (!ctx->prepare(req)) { + cancel_registration_.unregister(); + // Failed to prepare, fall back to polling sleep + ELIO_LOG_WARNING("cancellable_sleep: failed to prepare timeout, using polling sleep"); + auto end_time = std::chrono::steady_clock::now() + + std::chrono::nanoseconds(duration_ns_); + while (std::chrono::steady_clock::now() < end_time) { + if (token_.is_cancelled()) { + cancelled_ = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + awaiter.resume(); + return; + } + + ctx->submit(); + } + + cancel_result await_resume() noexcept { + // Unregister callback to prevent use-after-free + cancel_registration_.unregister(); + // Check both the flag and the token state (for await_ready() early return case) + return (cancelled_ || token_.is_cancelled()) ? cancel_result::cancelled + : cancel_result::completed; + } + +private: + io::io_context* ctx_ = nullptr; + int64_t duration_ns_; + coro::cancel_token token_; + coro::cancel_token::registration cancel_registration_; + std::coroutine_handle<> awaiter_; + bool cancelled_ = false; +}; + /// Sleep for a duration /// @param duration Duration to sleep /// @return Awaitable that completes after the duration @@ -87,6 +209,22 @@ inline auto sleep_for(io::io_context& ctx, std::chrono::duration du return sleep_awaitable(ctx, duration); } +/// Sleep for a duration with cancellation support +/// @param duration Duration to sleep +/// @param token Cancellation token - sleep returns early if cancelled +/// @return Awaitable that returns cancel_result::completed or cancel_result::cancelled +template +inline auto sleep_for(std::chrono::duration duration, coro::cancel_token token) { + return cancellable_sleep_awaitable(duration, std::move(token)); +} + +/// Sleep for a duration with cancellation support using a specific io_context +template +inline auto sleep_for(io::io_context& ctx, std::chrono::duration duration, + coro::cancel_token token) { + return cancellable_sleep_awaitable(ctx, duration, std::move(token)); +} + /// Sleep until a time point /// @param time_point Time point to sleep until /// @return Awaitable that completes at the time point diff --git a/tests/unit/test_timer.cpp b/tests/unit/test_timer.cpp index ff170d6..dee502f 100644 --- a/tests/unit/test_timer.cpp +++ b/tests/unit/test_timer.cpp @@ -171,3 +171,164 @@ TEST_CASE("sleep_until past time", "[time][sleep]") { REQUIRE(completed); } + +TEST_CASE("cancellable sleep - normal completion", "[time][sleep][cancel]") { + std::atomic completed{false}; + std::atomic result_value{-1}; + + cancel_source source; + + auto sleep_task = [&]() -> task { + auto result = co_await sleep_for(50ms, source.get_token()); + result_value = (result == cancel_result::completed) ? 1 : 0; + completed = true; + }; + + scheduler sched(1); + sched.start(); + + { + auto t = sleep_task(); + sched.spawn(t.release()); + } + + // Wait for completion without cancelling + for (int i = 0; i < 100 && !completed; ++i) { + std::this_thread::sleep_for(10ms); + } + + sched.shutdown(); + + REQUIRE(completed); + REQUIRE(result_value == 1); // Should be completed, not cancelled +} + +TEST_CASE("cancellable sleep - cancelled early", "[time][sleep][cancel]") { + std::atomic completed{false}; + std::atomic result_value{-1}; + + cancel_source source; + + auto sleep_task = [&]() -> task { + auto start = std::chrono::steady_clock::now(); + auto result = co_await sleep_for(500ms, source.get_token()); + auto elapsed = std::chrono::steady_clock::now() - start; + + result_value = (result == cancel_result::cancelled) ? 1 : 0; + completed = true; + + // Should have been cancelled early, not waited full 500ms + REQUIRE(elapsed < 400ms); + }; + + scheduler sched(1); + sched.start(); + + { + auto t = sleep_task(); + sched.spawn(t.release()); + } + + // Wait a bit then cancel + std::this_thread::sleep_for(50ms); + source.cancel(); + + // Wait for completion + for (int i = 0; i < 100 && !completed; ++i) { + std::this_thread::sleep_for(10ms); + } + + sched.shutdown(); + + REQUIRE(completed); + REQUIRE(result_value == 1); // Should be cancelled +} + +TEST_CASE("cancellable sleep - already cancelled token", "[time][sleep][cancel]") { + std::atomic completed{false}; + std::atomic result_value{-1}; + + cancel_source source; + source.cancel(); // Cancel before sleep starts + + auto sleep_task = [&]() -> task { + auto start = std::chrono::steady_clock::now(); + auto result = co_await sleep_for(500ms, source.get_token()); + auto elapsed = std::chrono::steady_clock::now() - start; + + result_value = (result == cancel_result::cancelled) ? 1 : 0; + completed = true; + + // Should complete immediately since already cancelled + REQUIRE(elapsed < 50ms); + }; + + scheduler sched(1); + sched.start(); + + { + auto t = sleep_task(); + sched.spawn(t.release()); + } + + // Wait for completion + for (int i = 0; i < 50 && !completed; ++i) { + std::this_thread::sleep_for(10ms); + } + + sched.shutdown(); + + REQUIRE(completed); + REQUIRE(result_value == 1); // Should be cancelled +} + +TEST_CASE("cancel_token basic operations", "[time][cancel]") { + cancel_source source; + auto token = source.get_token(); + + REQUIRE_FALSE(token.is_cancelled()); + REQUIRE_FALSE(source.is_cancelled()); + REQUIRE(static_cast(token)); // token is truthy when not cancelled + + source.cancel(); + + REQUIRE(token.is_cancelled()); + REQUIRE(source.is_cancelled()); + REQUIRE_FALSE(static_cast(token)); // token is falsy when cancelled +} + +TEST_CASE("cancel_token callback invocation", "[time][cancel]") { + cancel_source source; + auto token = source.get_token(); + + std::atomic callback_count{0}; + + { + auto reg1 = token.on_cancel([&]() { callback_count++; }); + auto reg2 = token.on_cancel([&]() { callback_count++; }); + + REQUIRE(callback_count == 0); + + source.cancel(); + + REQUIRE(callback_count == 2); + } + + // Registering after cancellation should invoke immediately + auto reg3 = token.on_cancel([&]() { callback_count++; }); + REQUIRE(callback_count == 3); +} + +TEST_CASE("cancel_token registration unregister", "[time][cancel]") { + cancel_source source; + auto token = source.get_token(); + + std::atomic callback_count{0}; + + auto reg = token.on_cancel([&]() { callback_count++; }); + reg.unregister(); // Unregister before cancel + + source.cancel(); + + REQUIRE(callback_count == 0); // Callback should not have been invoked +} diff --git a/wiki/API-Reference.md b/wiki/API-Reference.md index 4713914..826a0f1 100644 --- a/wiki/API-Reference.md +++ b/wiki/API-Reference.md @@ -116,6 +116,96 @@ coro::task main_task() { } ``` +### `cancel_token` and `cancel_source` + +Cooperative cancellation mechanism for async operations. + +```cpp +namespace elio::coro { + +/// Result of a cancellable operation +enum class cancel_result { + completed, ///< Operation completed normally + cancelled ///< Operation was cancelled +}; + +/// A token that can be checked for cancellation +class cancel_token { +public: + using registration = cancel_registration; + + cancel_token() = default; // Empty token (never cancelled) + + // Check if cancellation has been requested + bool is_cancelled() const noexcept; + + // Implicit bool conversion (true if NOT cancelled) + explicit operator bool() const noexcept; + + // Register a callback for cancellation + template + [[nodiscard]] registration on_cancel(F&& callback) const; + + // Register a coroutine to resume on cancellation + [[nodiscard]] registration on_cancel_resume(std::coroutine_handle<> h) const; +}; + +/// Source for creating cancel tokens and triggering cancellation +class cancel_source { +public: + cancel_source(); // Create new cancellation state + + // Get a token to pass to cancellable operations + cancel_token get_token() const noexcept; + + // Request cancellation (invokes all callbacks) + void cancel(); + + // Check if cancelled + bool is_cancelled() const noexcept; +}; + +} // namespace elio::coro +``` + +**Basic Example:** +```cpp +task cancellable_work(cancel_token token) { + while (!token.is_cancelled()) { + // Do some work... + + // Cancellable sleep + auto result = co_await time::sleep_for(100ms, token); + if (result == cancel_result::cancelled) { + break; // Exit early + } + } +} + +task controller() { + cancel_source source; + + // Start work with token + cancellable_work(source.get_token()).go(); + + // Later, cancel + co_await time::sleep_for(5s); + source.cancel(); +} +``` + +**Supported Cancellable Operations:** + +| Operation | Usage | +|-----------|-------| +| `time::sleep_for()` | `co_await sleep_for(duration, token)` | +| `rpc_client::call()` | `co_await client->call(req, timeout, token)` | +| `http::client::get()` | `co_await client.get(url, token)` | +| `websocket::ws_client::connect()` | `co_await client.connect(url, token)` | +| `websocket::ws_client::receive()` | `co_await client.receive(token)` | +| `sse::sse_client::connect()` | `co_await client.connect(url, token)` | +| `sse::sse_client::receive()` | `co_await client.receive(token)` | + --- ## Runtime (`elio::runtime`) @@ -1022,13 +1112,39 @@ public: ## Timers (`elio::time`) ```cpp -// Sleep for duration (awaitable) -/* awaitable */ sleep(io_context& ctx, std::chrono::nanoseconds duration); +// Sleep for duration +template +/* awaitable */ sleep_for(std::chrono::duration duration); + +// Sleep for duration with cancellation support +// Returns cancel_result::completed or cancel_result::cancelled +template +/* awaitable */ sleep_for(std::chrono::duration duration, + coro::cancel_token token); -// Sleep until time point (awaitable) +// Sleep until time point template -/* awaitable */ sleep_until(io_context& ctx, - std::chrono::time_point tp); +/* awaitable */ sleep_until(std::chrono::time_point tp); + +// Yield execution to other coroutines +/* awaitable */ yield(); +``` + +**Example:** +```cpp +task example(coro::cancel_token token) { + // Simple sleep + co_await time::sleep_for(100ms); + + // Cancellable sleep + auto result = co_await time::sleep_for(5s, token); + if (result == coro::cancel_result::cancelled) { + // Cancelled early + } + + // Yield to other coroutines + co_await time::yield(); +} ``` --- diff --git a/wiki/Core-Concepts.md b/wiki/Core-Concepts.md index 1343cc1..c4d8052 100644 --- a/wiki/Core-Concepts.md +++ b/wiki/Core-Concepts.md @@ -334,6 +334,89 @@ coro::task limited_work() { } ``` +## Cancellation + +Elio provides a cooperative cancellation mechanism for async operations using `cancel_source` and `cancel_token`. + +### Basic Usage + +```cpp +#include + +coro::task cancellable_work(coro::cancel_token token) { + while (!token.is_cancelled()) { + // Do work... + + // Cancellable sleep - returns early if cancelled + auto result = co_await time::sleep_for(100ms, token); + if (result == coro::cancel_result::cancelled) { + break; + } + } + co_return; +} + +coro::task controller() { + coro::cancel_source source; + + // Start work with a token + cancellable_work(source.get_token()).go(); + + // Wait some time + co_await time::sleep_for(5s); + + // Cancel the operation + source.cancel(); + co_return; +} +``` + +### How It Works + +1. **`cancel_source`** - Creates and controls cancellation state +2. **`cancel_token`** - Lightweight handle passed to operations +3. Operations periodically check `token.is_cancelled()` +4. Calling `source.cancel()` triggers all registered callbacks + +### Cancellable Operations + +Many Elio operations support cancellation: + +```cpp +// Sleep with cancellation +auto result = co_await time::sleep_for(1s, token); + +// HTTP request with cancellation +auto response = co_await client.get(url, token); + +// RPC call with cancellation +auto result = co_await rpc_client->call(req, timeout, token); + +// WebSocket receive with cancellation +auto msg = co_await ws_client.receive(token); + +// SSE event receive with cancellation +auto event = co_await sse_client.receive(token); +``` + +### Implementing Cancellable Operations + +Register callbacks to respond to cancellation: + +```cpp +coro::task custom_operation(coro::cancel_token token) { + // Register a callback + auto reg = token.on_cancel([&]() { + // Cleanup or signal early exit + }); + + // Do work... + + // Registration automatically unregisters on destruction + co_return; +} +``` + ## Error Handling Elio uses `std::optional` for error handling in I/O operations. On failure, functions return `std::nullopt` and set `errno`: