From 18e47def0b0fe2878edcdef4d5d7508f5371cf85 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 15 Jan 2026 18:40:57 +0100 Subject: [PATCH 01/27] Initial impl --- include/boost/redis/connection.hpp | 75 ++++++++++++++-------- include/boost/redis/detail/receive_fsm.hpp | 57 ++++++++++++++++ include/boost/redis/impl/receive_fsm.ipp | 74 +++++++++++++++++++++ include/boost/redis/src.hpp | 1 + test/test_conn_push2.cpp | 6 +- 5 files changed, 185 insertions(+), 28 deletions(-) create mode 100644 include/boost/redis/detail/receive_fsm.hpp create mode 100644 include/boost/redis/impl/receive_fsm.ipp diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 1fcf2d6b..fcf4645a 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -234,35 +235,58 @@ struct connection_impl { return size; } +}; - template - auto async_receive2(CompletionToken&& token) +template +struct receive2_op { + connection_impl* conn_; + receive_fsm fsm_{}; + + void drain_receive_channel() { - // clang-format off - return - receive_channel_.async_receive( - asio::deferred( - [this](system::error_code ec, std::size_t) - { - if (!ec) { - auto f = [](system::error_code, std::size_t) { - // There is no point in checking for errors - // here since async_receive just completed - // without errors. - }; - - // We just want to drain the channel. - while (receive_channel_.try_receive(f)); - } - - return asio::deferred.values(ec); - } - ) - )(std::forward(token)); - // clang-format on + // TODO: review + auto f = [](system::error_code, std::size_t) { + // There is no point in checking for errors + // here since async_receive just completed + // without errors. + }; + + // We just want to drain the channel. + while (conn_->receive_channel_.try_receive(f)) + ; + } + + template + void operator()(Self& self, system::error_code ec = {}, std::size_t /* push_bytes */ = 0u) + { + receive_action act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled()); + + switch (act.type) { + case receive_action::action_type::setup_cancellation: + self.reset_cancellation_state(asio::enable_total_cancellation()); + (*this)(self); // this action does not require yielding + return; + case receive_action::action_type::wait: + conn_->receive_channel_.async_receive(std::move(self)); + return; + case receive_action::action_type::drain_channel: + drain_receive_channel(); + (*this)(self); // this action does not require yielding + return; + case receive_action::action_type::done: self.complete(act.ec); return; + } } }; +template +auto async_receive2(connection_impl& conn, CompletionToken&& token) +{ + return asio::async_compose( + receive2_op{&conn}, + token, + conn); +} + template struct exec_one_op { connection_impl* conn_; @@ -826,7 +850,7 @@ class basic_connection { template > auto async_receive2(CompletionToken&& token = {}) { - return impl_->async_receive2(std::forward(token)); + return detail::async_receive2(*impl_, std::forward(token)); } /** @brief (Deprecated) Receives server pushes synchronously without blocking. @@ -1219,6 +1243,7 @@ class connection { return impl_.async_receive(std::forward(token)); } + // TODO: might want to use type-erased handlers here /// @copydoc basic_connection::async_receive2 template auto async_receive2(CompletionToken&& token = {}) diff --git a/include/boost/redis/detail/receive_fsm.hpp b/include/boost/redis/detail/receive_fsm.hpp new file mode 100644 index 00000000..ca0b8bfe --- /dev/null +++ b/include/boost/redis/detail/receive_fsm.hpp @@ -0,0 +1,57 @@ +// +// Copyright (c) 2018-2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_RECEIVE_FSM_HPP +#define BOOST_REDIS_RECEIVE_FSM_HPP + +#include +#include + +// Sans-io algorithm for async_receive2, as a finite state machine + +namespace boost::redis::detail { + +struct connection_state; + +struct receive_action { + enum class action_type + { + setup_cancellation, // Set up the cancellation types supported by the composed operation + wait, // Wait for a message to appear in the receive channel + drain_channel, // Empty the receive channel + done, // Complete + }; + + action_type type; + system::error_code ec; + + receive_action(action_type type) noexcept + : type{type} + { } + + receive_action(system::error_code ec) noexcept + : type{action_type::done} + , ec{ec} + { } +}; + +class receive_fsm { + int resume_point_{0}; + +public: + receive_fsm() = default; + + receive_action resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/impl/receive_fsm.ipp b/include/boost/redis/impl/receive_fsm.ipp new file mode 100644 index 00000000..efb0d7cb --- /dev/null +++ b/include/boost/redis/impl/receive_fsm.ipp @@ -0,0 +1,74 @@ +// +// Copyright (c) 2018-2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include +#include +#include + +namespace boost::redis::detail { + +constexpr bool is_any_cancel(asio::cancellation_type_t type) +{ + return !!( + type & (asio::cancellation_type_t::terminal | asio::cancellation_type_t::partial | + asio::cancellation_type_t::total)); +} + +receive_action receive_fsm::resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // This operation supports total cancellation. Set it up + BOOST_REDIS_YIELD(resume_point_, 1, receive_action::action_type::setup_cancellation) + + while (true) { + // Wait at least once for a notification to arrive + BOOST_REDIS_YIELD(resume_point_, 2, receive_action::action_type::wait) + + // If the wait completed successfully, we have pushes. Drain the channel and exit + if (!ec) { + BOOST_REDIS_YIELD(resume_point_, 3, receive_action::action_type::drain_channel) + return system::error_code(); + } + + // Check for cancellations + if (is_any_cancel(cancel_state)) + return system::error_code(asio::error::operation_aborted); + + // If the channel was cancelled, it might be due to a reconnection. + // If the connection isn't reconnecting (run is exiting), exit, otherwise continue. + if (ec == asio::experimental::channel_errc::channel_cancelled) { + if (st.cfg.reconnect_wait_interval.count() == 0) { + // Won't reconnect + return system::error_code(asio::error::operation_aborted); + } else { + // Will reconnect, ignore the notification + continue; + } + } else { + // This is an unknown error. Propagate it, just in case + return ec; + } + } + } + + // We should never get here + BOOST_ASSERT(false); + return receive_action{system::error_code()}; +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 647e643d..196b2960 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 5c260f8c..ebe96f82 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -198,7 +198,7 @@ void test_push_adapter_error() // async_receive2 is cancelled every reconnection cycle conn.async_receive2([&](error_code ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); push_received = true; }); @@ -244,7 +244,7 @@ void test_push_adapter_error_reconnection() // async_receive2 is cancelled every reconnection cycle conn.async_receive2([&](error_code ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); push_received = true; }); @@ -287,7 +287,7 @@ void test_consecutive_receives() std::function launch_push_consumer = [&]() { conn.async_receive2([&](error_code ec) { if (ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); push_consumer_finished = true; resp.clear(); return; From f10e7b869b3854591ac43a9720ac65182f7c83a8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 15 Jan 2026 19:03:20 +0100 Subject: [PATCH 02/27] operation::receive docs --- include/boost/redis/operation.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index da087cc0..4ae9329e 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -52,7 +52,7 @@ enum class operation /// Refers to `connection::async_run` operations. run, - /// Refers to `connection::async_receive` operations. + /// Refers to @ref basic_connection::async_receive and @ref basic_connection::async_receive2 operations. receive, /** From 724f29058d10f037074644b4c8602abdd6dfcc76 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 15 Jan 2026 19:11:41 +0100 Subject: [PATCH 03/27] Update examples --- README.md | 10 ++++++---- doc/modules/ROOT/pages/index.adoc | 10 ++++++---- example/cpp20_chat_room.cpp | 11 +++++------ example/cpp20_subscriber.cpp | 12 +++++------- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 62a03896..5b71d904 100644 --- a/README.md +++ b/README.md @@ -106,15 +106,17 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored // in resp. If the connection encounters a network error and reconnects to the server, // it will automatically subscribe to 'mychannel' again. This is transparent to the user. + // You need to use specialized request::subscribe() function (instead of request::push) + // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index b7194d1b..f8c6e191 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -117,15 +117,17 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored // in resp. If the connection encounters a network error and reconnects to the server, // it will automatically subscribe to 'mychannel' again. This is transparent to the user. + // You need to use specialized request::subscribe() function (instead of request::push) + // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index 4dd8180d..2edb5cea 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -6,12 +6,12 @@ #include +#include #include #include #include #include #include -#include #include #include @@ -30,7 +30,6 @@ using boost::asio::co_spawn; using boost::asio::consign; using boost::asio::detached; using boost::asio::dynamic_buffer; -using boost::asio::redirect_error; using boost::redis::config; using boost::redis::connection; using boost::redis::generic_flat_response; @@ -61,13 +60,13 @@ auto receiver(std::shared_ptr conn) -> awaitable req.subscribe({"channel"}); co_await conn->async_exec(req); - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index b3eb9cdd..b8802b03 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -6,14 +6,12 @@ #include +#include #include #include #include #include -#include -#include #include -#include #include @@ -62,13 +60,13 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } From 52f537eb27deabbe9e516b0d1cb7611767be23bb Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Thu, 15 Jan 2026 19:37:07 +0100 Subject: [PATCH 04/27] Stronger cancellation guarantees --- include/boost/redis/connection.hpp | 16 +++++++-- .../boost/redis/detail/connection_state.hpp | 1 + include/boost/redis/detail/receive_fsm.hpp | 1 + include/boost/redis/error.hpp | 4 +++ include/boost/redis/impl/error.ipp | 3 ++ include/boost/redis/impl/receive_fsm.ipp | 35 +++++++++++-------- test/test_conn_push2.cpp | 12 ++++--- 7 files changed, 50 insertions(+), 22 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index fcf4645a..14b50971 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -155,7 +155,7 @@ struct connection_impl { { switch (op) { case operation::exec: st_.mpx.cancel_waiting(); break; - case operation::receive: receive_channel_.cancel(); break; + case operation::receive: cancel_receive(); break; case operation::reconnection: st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); break; @@ -166,7 +166,7 @@ struct connection_impl { case operation::health_check: cancel_run(); break; case operation::all: st_.mpx.cancel_waiting(); // exec - receive_channel_.cancel(); // receive + cancel_receive(); // receive st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect cancel_run(); // run break; @@ -174,6 +174,12 @@ struct connection_impl { } } + void cancel_receive() + { + st_.receive2_cancelled = true; + receive_channel_.cancel(); + } + void cancel_run() { // Individual operations should see a terminal cancellation, regardless @@ -185,7 +191,8 @@ struct connection_impl { stream_.cancel_resolve(); // Receive is technically not part of run, but we also cancel it for - // backwards compatibility. + // backwards compatibility. Note that this intentionally doesn't + // set the receive2_cancelled flag, so only v1 receive is cancelled. receive_channel_.cancel(); } @@ -273,6 +280,9 @@ struct receive2_op { drain_receive_channel(); (*this)(self); // this action does not require yielding return; + case receive_action::action_type::immediate: + asio::async_immediate(self.get_io_executor(), std::move(self)); + return; case receive_action::action_type::done: self.complete(act.ec); return; } } diff --git a/include/boost/redis/detail/connection_state.hpp b/include/boost/redis/detail/connection_state.hpp index 75ddf177..e8e09106 100644 --- a/include/boost/redis/detail/connection_state.hpp +++ b/include/boost/redis/detail/connection_state.hpp @@ -51,6 +51,7 @@ struct connection_state { request setup_req{}; request ping_req{}; subscription_tracker tracker{}; + bool receive2_running{false}, receive2_cancelled{false}; // Sentinel stuff lazy_random_engine eng{}; diff --git a/include/boost/redis/detail/receive_fsm.hpp b/include/boost/redis/detail/receive_fsm.hpp index ca0b8bfe..16989dce 100644 --- a/include/boost/redis/detail/receive_fsm.hpp +++ b/include/boost/redis/detail/receive_fsm.hpp @@ -24,6 +24,7 @@ struct receive_action { setup_cancellation, // Set up the cancellation types supported by the composed operation wait, // Wait for a message to appear in the receive channel drain_channel, // Empty the receive channel + immediate, // Call async_immediate done, // Complete }; diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 346e3ad1..55356635 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -112,6 +112,10 @@ enum class error /// Expects a RESP3 array, but got a different data type. expects_resp3_array, + + /// A @ref basic_connection::async_receive2 operation is already running. + /// Only one of such operations might be running at any point in time. + already_running, }; /** diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index f7071506..04bfbdc2 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -68,6 +68,9 @@ struct error_category_impl : system::error_category { return "Expects a RESP3 string, but got a different data type."; case error::expects_resp3_array: return "Expects a RESP3 array, but got a different data type."; + case error::already_running: + return "An async_receive2 operation is already running. Only one of such operations " + "might be running at any point in time."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/receive_fsm.ipp b/include/boost/redis/impl/receive_fsm.ipp index efb0d7cb..46428a78 100644 --- a/include/boost/redis/impl/receive_fsm.ipp +++ b/include/boost/redis/impl/receive_fsm.ipp @@ -9,11 +9,11 @@ #include #include #include +#include #include #include #include -#include namespace boost::redis::detail { @@ -32,6 +32,16 @@ receive_action receive_fsm::resume( switch (resume_point_) { BOOST_REDIS_CORO_INITIAL + // Parallel async_receive2 operations not supported + if (st.receive2_running) { + BOOST_REDIS_YIELD(resume_point_, 4, receive_action::action_type::immediate) + return system::error_code(error::already_running); + } + + // We're now running. Discard any previous cancellation state + st.receive2_running = true; + st.receive2_cancelled = false; + // This operation supports total cancellation. Set it up BOOST_REDIS_YIELD(resume_point_, 1, receive_action::action_type::setup_cancellation) @@ -42,27 +52,24 @@ receive_action receive_fsm::resume( // If the wait completed successfully, we have pushes. Drain the channel and exit if (!ec) { BOOST_REDIS_YIELD(resume_point_, 3, receive_action::action_type::drain_channel) + st.receive2_running = false; return system::error_code(); } // Check for cancellations - if (is_any_cancel(cancel_state)) + if (is_any_cancel(cancel_state) || st.receive2_cancelled) { + st.receive2_running = false; return system::error_code(asio::error::operation_aborted); + } - // If the channel was cancelled, it might be due to a reconnection. - // If the connection isn't reconnecting (run is exiting), exit, otherwise continue. - if (ec == asio::experimental::channel_errc::channel_cancelled) { - if (st.cfg.reconnect_wait_interval.count() == 0) { - // Won't reconnect - return system::error_code(asio::error::operation_aborted); - } else { - // Will reconnect, ignore the notification - continue; - } - } else { - // This is an unknown error. Propagate it, just in case + // If we get any unknown errors, propagate them (shouldn't happen, but just in case) + if (ec != asio::experimental::channel_errc::channel_cancelled) { + st.receive2_running = false; return ec; } + + // The channel was cancelled and no cancellation state is set. + // This is due to a reconnection. Ignore the notification } } diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index ebe96f82..addbd957 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -194,12 +195,12 @@ void test_push_adapter_error() req.push("SUBSCRIBE", "channel"); req.push("PING"); - bool push_received = false, exec_finished = false, run_finished = false; + bool receive_finished = false, exec_finished = false, run_finished = false; - // async_receive2 is cancelled every reconnection cycle + // We cancel receive when run exits conn.async_receive2([&](error_code ec) { BOOST_TEST_EQ(ec, net::error::operation_aborted); - push_received = true; + receive_finished = true; }); // The request is cancelled because the PING response isn't processed @@ -211,13 +212,14 @@ void test_push_adapter_error() auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; // so we can validate the generated error - conn.async_run(cfg, [&run_finished](error_code ec) { + conn.async_run(cfg, [&](error_code ec) { BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; + conn.cancel(operation::receive); }); ioc.run_for(test_timeout); - BOOST_TEST(push_received); + BOOST_TEST(receive_finished); BOOST_TEST(exec_finished); BOOST_TEST(run_finished); } From b256e0f77432933d74bd016025cb182b99a57b7f Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 11:37:12 +0100 Subject: [PATCH 05/27] rationale --- include/boost/redis/impl/receive_fsm.ipp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/boost/redis/impl/receive_fsm.ipp b/include/boost/redis/impl/receive_fsm.ipp index 46428a78..d796b3bc 100644 --- a/include/boost/redis/impl/receive_fsm.ipp +++ b/include/boost/redis/impl/receive_fsm.ipp @@ -24,6 +24,10 @@ constexpr bool is_any_cancel(asio::cancellation_type_t type) asio::cancellation_type_t::total)); } +// We use the receive2_cancelled flag rather than will_reconnect() to +// avoid entanglement between async_run and async_receive2 cancellations. +// If we had used will_reconnect(), async_receive2 would be cancelled +// when disabling reconnection and async_run exits, and in an unpredictable fashion. receive_action receive_fsm::resume( connection_state& st, system::error_code ec, From b5dade36ffcc3a549a120d4ccaf99038ac2d8e05 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:05:54 +0100 Subject: [PATCH 06/27] Reference docs --- include/boost/redis/connection.hpp | 50 +++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 14b50971..a8916bbb 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -819,27 +819,56 @@ class basic_connection { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Wait for server pushes asynchronously + /** @brief Wait for server pushes asynchronously. * - * This function suspends until a server push is received by the + * This function suspends until at least one server push is received by the * connection. On completion an unspecified number of pushes will * have been added to the response object set with @ref - * boost::redis::connection::set_receive_response. + * set_receive_response. Use the functions in the response object + * to know how many messages they were received and consume them. * * To prevent receiving an unbound number of pushes the connection * blocks further read operations on the socket when 256 pushes * accumulate internally (we don't make any commitment to this * exact number). When that happens any `async_exec`s and * health-checks won't make any progress and the connection may - * eventually timeout. To avoid that Apps should call - * `async_receive2` continuously in a loop. - * - * @Note To avoid deadlocks the task (e.g. coroutine) calling + * eventually timeout. To avoid that apps that expect server pushes + * should call this function continuously in a loop. + * + * This function should be used instead of the deprecated @ref async_receive. + * It differs from `async_receive` in the following: + * + * @li `async_receive` is designed to consume a single push message at a time. + * This can be inefficient when receiving lots of server pushes. + * `async_receive2` is batch-oriented, and will only suspend once if + * several pushes are received in a single network packet. + * @li `async_receive` is cancelled when a reconnection happens (e.g. because + * of a network error). This enabled the user to re-establish subscriptions + * using @ref async_exec before waiting for pushes again. With the introduction of + * functions like @ref request::subscribe, subscriptions are automatically + * re-established on reconnection. Thus, `async_receive2` is not cancelled + * on reconnection. + * @li `async_receive` passes the number of bytes that each received + * push message contains. This information is unreliable and not very useful. + * Equivalent information is available using functions in the response object. + * @li `async_receive` might get cancelled if `async_run` is cancelled. + * This doesn't happen with `async_receive2`. + * + * This function does *not* remove messages from the response object + * passed to @ref set_receive_response - use the functions in the response + * object to achieve this. + * + * Only a single instance of `async_receive2` may be outstanding + * for a given connection at any time. Trying to start a second one + * will fail with @ref error::already_running. + * + * @note To avoid deadlocks the task (e.g. coroutine) calling * `async_receive2` should not call `async_exec` in a way where * they could block each other. * - * For an example see cpp20_subscriber.cpp. The completion token - * must have the following signature + * For an example see cpp20_subscriber.cpp. + * + * The completion token must have the following signature: * * @code * void f(system::error_code); @@ -852,9 +881,6 @@ class basic_connection { * @li `asio::cancellation_type_t::partial`. * @li `asio::cancellation_type_t::total`. * - * Calling `basic_connection::cancel(operation::receive)` will - * also cancel any ongoing receive operations. - * * @param token Completion token. */ template > From ecc8cfe3fbbab878caea834ce4fe6fee716bb452 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:08:56 +0100 Subject: [PATCH 07/27] Deprecate operation::receive --- include/boost/redis/operation.hpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index 4ae9329e..ca6d8cc4 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -52,7 +52,13 @@ enum class operation /// Refers to `connection::async_run` operations. run, - /// Refers to @ref basic_connection::async_receive and @ref basic_connection::async_receive2 operations. + /** + * @brief (Deprecated) Refers to @ref basic_connection::async_receive operations. + * + * Affects only the deprecated `async_receive`, and not `async_receive2`. + * To cancel `async_receive2`, either use @ref basic_connection::cancel with no arguments, + * or use per-operation cancellation. + */ receive, /** From 19d0c3261a64f025e13a2fd51df6ab2f2509bfe3 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:27:43 +0100 Subject: [PATCH 08/27] cancel restructure --- include/boost/redis/connection.hpp | 15 ++++++++------- include/boost/redis/operation.hpp | 7 +++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index a8916bbb..9ca1709b 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -155,7 +155,7 @@ struct connection_impl { { switch (op) { case operation::exec: st_.mpx.cancel_waiting(); break; - case operation::receive: cancel_receive(); break; + case operation::receive: cancel_receive_v2(); break; case operation::reconnection: st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); break; @@ -166,7 +166,7 @@ struct connection_impl { case operation::health_check: cancel_run(); break; case operation::all: st_.mpx.cancel_waiting(); // exec - cancel_receive(); // receive + cancel_receive_v2(); // receive st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect cancel_run(); // run break; @@ -174,10 +174,12 @@ struct connection_impl { } } - void cancel_receive() + void cancel_receive_v1() { receive_channel_.cancel(); } + + void cancel_receive_v2() { st_.receive2_cancelled = true; - receive_channel_.cancel(); + cancel_receive_v1(); } void cancel_run() @@ -191,9 +193,8 @@ struct connection_impl { stream_.cancel_resolve(); // Receive is technically not part of run, but we also cancel it for - // backwards compatibility. Note that this intentionally doesn't - // set the receive2_cancelled flag, so only v1 receive is cancelled. - receive_channel_.cancel(); + // backwards compatibility. Note that this intentionally affects v1 receive, only. + cancel_receive_v1(); } bool is_open() const noexcept { return stream_.is_open(); } diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index ca6d8cc4..9d05d1d7 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -53,11 +53,10 @@ enum class operation run, /** - * @brief (Deprecated) Refers to @ref basic_connection::async_receive operations. + * @brief (Deprecated) Refers to `async_receive` and `async_receive2` operations. * - * Affects only the deprecated `async_receive`, and not `async_receive2`. - * To cancel `async_receive2`, either use @ref basic_connection::cancel with no arguments, - * or use per-operation cancellation. + * To cancel `async_receive2`, use either @ref basic_connection::cancel with no arguments + * or per-operation cancellation. */ receive, From 858ea429dcc894d768ce8e614950c6b7d54db009 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:32:10 +0100 Subject: [PATCH 09/27] explanation on deadlock --- include/boost/redis/connection.hpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 9ca1709b..5de8f251 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -865,7 +865,19 @@ class basic_connection { * * @note To avoid deadlocks the task (e.g. coroutine) calling * `async_receive2` should not call `async_exec` in a way where - * they could block each other. + * they could block each other. This is, avoid the following pattern: + * + * @code + * asio::awaitable receiver() + * { + * // Do NOT do this!!! The receive buffer might get full while + * // async_exec runs, which will block all read operations until async_receive2 + * // is called. The two operations end up waiting each other, making the connection unresponsive. + * // If you need to do this, use two connections, instead. + * co_await conn.async_receive2(); + * co_await conn.async_exec(req, resp); + * } + * @endcode * * For an example see cpp20_subscriber.cpp. * From 691b4371b51caa6f38eeade8916202837e7fec21 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:35:19 +0100 Subject: [PATCH 10/27] don't use deprecated operation in test --- test/test_conn_push2.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index addbd957..61857119 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -215,7 +215,7 @@ void test_push_adapter_error() conn.async_run(cfg, [&](error_code ec) { BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; - conn.cancel(operation::receive); + conn.cancel(); }); ioc.run_for(test_timeout); From 5e3f87cb0e00fdb37363956c8db18bd21a64dcd0 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:52:21 +0100 Subject: [PATCH 11/27] subsequent calls --- test/test_conn_push2.cpp | 82 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 61857119..5fdcfe1f 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #include #include @@ -127,6 +128,80 @@ void test_async_receive2_push_available() BOOST_TEST(run_finished); } +// TODO: several pushes available + +// async_receive2 can be called several times in a row +void test_async_receive2_subsequent_calls() +{ + struct impl { + net::io_context ioc{}; + connection conn{ioc}; + resp3::flat_tree resp{}; + request req{}; + bool receive_finished = false, run_finished = false; + + // Send a SUBSCRIBE, which will trigger a push + void start_subscribe1() + { + conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive1(); + }); + } + + // Receive the push + void start_receive1() + { + conn.async_receive2([this](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + resp.clear(); + start_subscribe2(); + }); + } + + // Send another SUBSCRIBE, which will trigger another push + void start_subscribe2() + { + conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive2(); + }); + } + + // End + void start_receive2() + { + conn.async_receive2([this](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + receive_finished = true; + conn.cancel(); + }); + } + + void run() + { + // Setup + conn.set_receive_response(resp); + req.push("SUBSCRIBE", "test_async_receive2_subsequent_calls"); + + start_subscribe1(); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); + } + }; + + impl{}.run(); +} + // A push may be interleaved between regular responses. // It is handed to the receive adapter (filtered out). void test_exec_push_interleaved() @@ -278,8 +353,8 @@ void test_push_adapter_error_reconnection() BOOST_TEST(run_finished); } -// After an async_receive2 operation finishes, another one can be issued -void test_consecutive_receives() +// Tests the usual push consumer pattern that we recommend in the examples +void test_push_consumer() { net::io_context ioc; connection conn{ioc}; @@ -600,10 +675,11 @@ int main() { test_async_receive2_waiting_for_push(); test_async_receive2_push_available(); + test_async_receive2_subsequent_calls(); test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); - test_consecutive_receives(); + test_push_consumer(); test_unsubscribe(); test_pubsub_state_restoration(); From 8bfc2eb2bf18bea24ba751745cba88c35e1e00fb Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 12:53:26 +0100 Subject: [PATCH 12/27] make test faster --- test/test_conn_push2.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 5fdcfe1f..2ee3f8ef 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -655,7 +655,9 @@ struct test_pubsub_state_restoration_impl { // Start running bool run_finished = false; - conn.async_run(make_test_config(), [&run_finished](error_code ec) { + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 50ms; // make the test run faster + conn.async_run(cfg, [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); From 17441eb8e6cbda1de24f79ab243b651e5f853f5b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:06:23 +0100 Subject: [PATCH 13/27] batch test --- test/test_conn_push2.cpp | 69 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 2ee3f8ef..d48fb6d6 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -128,7 +129,72 @@ void test_async_receive2_push_available() BOOST_TEST(run_finished); } -// TODO: several pushes available +// async_receive2 blocks only once if several messages are received in a batch +void test_async_receive2_batch() +{ + struct impl { + net::io_context ioc{}; + connection conn{ioc}; + resp3::flat_tree resp{}; + request req{}; + bool receive_finished = false, run_finished = false; + + // Trigger pushes + void start_exec() + { + conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive1(); + }); + } + + // Receive the two pushes + void start_receive1() + { + conn.async_receive2([this](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 2u); + start_receive2(); + }); + } + + // The previous receive has consumed the two pushes, + // so this one will block (and be cancelled). + void start_receive2() + { + conn.async_receive2(net::cancel_after(50ms, [this](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; + conn.cancel(); + })); + } + + void run() + { + // Setup + conn.set_receive_response(resp); + + // Cause two messages to be delivered. The PING ensures that + // the pushes have been read when exec completes + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("PING", "message"); + + start_exec(); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); + } + }; + + impl{}.run(); +} // async_receive2 can be called several times in a row void test_async_receive2_subsequent_calls() @@ -677,6 +743,7 @@ int main() { test_async_receive2_waiting_for_push(); test_async_receive2_push_available(); + test_async_receive2_batch(); test_async_receive2_subsequent_calls(); test_exec_push_interleaved(); test_push_adapter_error(); From 450d8d25c9bea4bd282e698293eb792c41322e1d Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:13:10 +0100 Subject: [PATCH 14/27] simplify the test --- test/test_conn_push2.cpp | 90 ++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 54 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index d48fb6d6..bfbf8d07 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -132,68 +132,50 @@ void test_async_receive2_push_available() // async_receive2 blocks only once if several messages are received in a batch void test_async_receive2_batch() { - struct impl { - net::io_context ioc{}; - connection conn{ioc}; - resp3::flat_tree resp{}; - request req{}; - bool receive_finished = false, run_finished = false; - - // Trigger pushes - void start_exec() - { - conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - start_receive1(); - }); - } + // Setup + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); - // Receive the two pushes - void start_receive1() - { - conn.async_receive2([this](error_code ec) { - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(resp.get_total_msgs(), 2u); - start_receive2(); - }); - } + // Cause two messages to be delivered. The PING ensures that + // the pushes have been read when exec completes + request req; + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("PING", "message"); - // The previous receive has consumed the two pushes, - // so this one will block (and be cancelled). - void start_receive2() - { - conn.async_receive2(net::cancel_after(50ms, [this](error_code ec) { - BOOST_TEST_EQ(ec, net::error::operation_aborted); - receive_finished = true; - conn.cancel(); - })); - } + bool receive_finished = false, run_finished = false; - void run() - { - // Setup - conn.set_receive_response(resp); + // 1. Trigger pushes + // 2. Receive both of them + // 3. Check that receive2 has consumed them by calling it again + auto on_receive2 = [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; + conn.cancel(); + }; - // Cause two messages to be delivered. The PING ensures that - // the pushes have been read when exec completes - req.push("SUBSCRIBE", "test_async_receive2_batch"); - req.push("SUBSCRIBE", "test_async_receive2_batch"); - req.push("PING", "message"); + auto on_receive1 = [&](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 2u); + conn.async_receive2(net::cancel_after(50ms, on_receive2)); + }; - start_exec(); - conn.async_run(make_test_config(), [&](error_code ec) { - run_finished = true; - BOOST_TEST_EQ(ec, net::error::operation_aborted); - }); + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_receive2(on_receive1); + }); - ioc.run_for(test_timeout); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); - BOOST_TEST(receive_finished); - BOOST_TEST(run_finished); - } - }; + ioc.run_for(test_timeout); - impl{}.run(); + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); } // async_receive2 can be called several times in a row From e22b1a11cb5a16b1aef4c5aa9a09234c448172ef Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:14:54 +0100 Subject: [PATCH 15/27] cancel_after tests --- test/test_conn_cancel_after.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test_conn_cancel_after.cpp b/test/test_conn_cancel_after.cpp index dd25a4a5..b7f35800 100644 --- a/test/test_conn_cancel_after.cpp +++ b/test/test_conn_cancel_after.cpp @@ -93,6 +93,27 @@ void test_receive() BOOST_TEST(receive_finished); } +template +void test_receive2() +{ + // Setup + asio::io_context ioc; + Connection conn{ioc}; + bool receive_finished = false; + generic_response resp; + conn.set_receive_response(resp); + + // Call the function with a very short timeout. + conn.async_receive2(asio::cancel_after(1ms, [&](error_code ec) { + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + receive_finished = true; + })); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); +} + } // namespace int main() @@ -106,5 +127,8 @@ int main() test_receive>(); test_receive(); + test_receive2>(); + test_receive2(); + return boost::report_errors(); } From 99b722f31a65eed541e94ce8a15b4b52dc488640 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:21:36 +0100 Subject: [PATCH 16/27] per-op cancel --- test/test_conn_push2.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index bfbf8d07..013e28b4 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -12,7 +12,10 @@ #include #include +#include #include +#include +#include #include #include #include @@ -21,6 +24,7 @@ #include #include +#include #include #include #include @@ -250,6 +254,32 @@ void test_async_receive2_subsequent_calls() impl{}.run(); } +// async_receive2 can be cancelled using per-operation cancellation, +// and supports all cancellation types +void test_async_receive2_per_operation_cancellation( + std::string_view name, + net::cancellation_type_t type) +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + bool receive_finished = false; + + conn.async_receive2(net::bind_cancellation_slot(sig.slot(), [&](error_code ec) { + if (!BOOST_TEST_EQ(ec, net::error::operation_aborted)) + std::cerr << "With cancellation type " << name << std::endl; + receive_finished = true; + })); + + sig.emit(type); + + ioc.run_for(test_timeout); + + if (!BOOST_TEST(receive_finished)) + std::cerr << "With cancellation type " << name << std::endl; +} + // A push may be interleaved between regular responses. // It is handed to the receive adapter (filtered out). void test_exec_push_interleaved() @@ -727,6 +757,9 @@ int main() test_async_receive2_push_available(); test_async_receive2_batch(); test_async_receive2_subsequent_calls(); + test_async_receive2_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); + test_async_receive2_per_operation_cancellation("partial", net::cancellation_type_t::partial); + test_async_receive2_per_operation_cancellation("total", net::cancellation_type_t::total); test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); From 687fb926902e50012df7215bf61cec765916119e Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:23:14 +0100 Subject: [PATCH 17/27] connection cancel --- test/test_conn_push2.cpp | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 013e28b4..197ef425 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -280,6 +280,27 @@ void test_async_receive2_per_operation_cancellation( std::cerr << "With cancellation type " << name << std::endl; } +// connection::cancel() cancels async_receive2 +void test_async_receive2_connection_cancel() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + bool receive_finished = false; + + conn.async_receive2([&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; + }); + + conn.cancel(); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); +} + // A push may be interleaved between regular responses. // It is handed to the receive adapter (filtered out). void test_exec_push_interleaved() @@ -760,6 +781,7 @@ int main() test_async_receive2_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); test_async_receive2_per_operation_cancellation("partial", net::cancellation_type_t::partial); test_async_receive2_per_operation_cancellation("total", net::cancellation_type_t::total); + test_async_receive2_connection_cancel(); test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); From dda55b65f3ba9a3fa0810a9251a11dddbdb3d4c7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:37:47 +0100 Subject: [PATCH 18/27] reconnection --- test/test_conn_push2.cpp | 64 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 197ef425..ed6e9bbf 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -301,6 +301,69 @@ void test_async_receive2_connection_cancel() BOOST_TEST(receive_finished); } +// Reconnection doesn't cancel async_receive2 +void test_async_receive2_reconnection() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); + + // Causes the reconnection + request req_quit; + req_quit.push("QUIT"); + + // When this completes, the reconnection has happened + request req_ping; + req_ping.get_config().cancel_if_unresponded = false; + req_ping.push("PING", "test_async_receive2_connection"); + + // Generates a push + request req_subscribe; + req_subscribe.push("SUBSCRIBE", "test_async_receive2_connection"); + + bool exec_finished = false, receive_finished = false, run_finished = false; + + // Launch a receive operation, and in parallel + // 1. Trigger a reconnection + // 2. Wait for the reconnection and check that receive hasn't been cancelled + // 3. Trigger a push to make receive complete + auto on_subscribe = [&](error_code ec, std::size_t) { + // Will finish before receive2 because the command doesn't have a response + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + }; + + auto on_ping = [&](error_code ec, std::size_t) { + // Reconnection has already happened here + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_NOT(receive_finished); + conn.async_exec(req_subscribe, ignore, on_subscribe); + }; + + conn.async_exec(req_quit, ignore, [&](error_code, std::size_t) { + conn.async_exec(req_ping, ignore, on_ping); + }); + + conn.async_receive2([&](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + receive_finished = true; + conn.cancel(); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); +} + // A push may be interleaved between regular responses. // It is handed to the receive adapter (filtered out). void test_exec_push_interleaved() @@ -782,6 +845,7 @@ int main() test_async_receive2_per_operation_cancellation("partial", net::cancellation_type_t::partial); test_async_receive2_per_operation_cancellation("total", net::cancellation_type_t::total); test_async_receive2_connection_cancel(); + test_async_receive2_reconnection(); test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); From eed407506790b5f4320e05087b794216e50cc7cb Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:49:50 +0100 Subject: [PATCH 19/27] Solve TODOs --- include/boost/redis/connection.hpp | 25 +++++++++++++++---------- include/boost/redis/impl/connection.ipp | 6 ++++++ 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 5de8f251..c7b22e5a 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -252,14 +252,10 @@ struct receive2_op { void drain_receive_channel() { - // TODO: review - auto f = [](system::error_code, std::size_t) { - // There is no point in checking for errors - // here since async_receive just completed - // without errors. - }; - - // We just want to drain the channel. + // We don't expect any errors here. The only errors + // that might appear in the channel are due to cancellations, + // and these don't make sense with try_receive + auto f = [](system::error_code, std::size_t) { }; while (conn_->receive_channel_.try_receive(f)) ; } @@ -1292,12 +1288,13 @@ class connection { return impl_.async_receive(std::forward(token)); } - // TODO: might want to use type-erased handlers here /// @copydoc basic_connection::async_receive2 template auto async_receive2(CompletionToken&& token = {}) { - return impl_.async_receive2(std::forward(token)); + return asio::async_initiate( + initiation{this}, + token); } /// @copydoc basic_connection::receive @@ -1410,6 +1407,12 @@ class connection { { self->async_exec_impl(*req, std::move(adapter), std::forward(handler)); } + + template + void operator()(Handler&& handler) + { + self->async_receive2_impl(std::forward(handler)); + } }; void async_run_impl( @@ -1426,6 +1429,8 @@ class connection { any_adapter&& adapter, asio::any_completion_handler token); + void async_receive2_impl(asio::any_completion_handler token); + basic_connection impl_; }; diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 088b1ac3..9db49af3 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -51,6 +51,12 @@ void connection::async_exec_impl( impl_.async_exec(req, std::move(adapter), std::move(token)); } +void connection::async_receive2_impl( + asio::any_completion_handler token) +{ + impl_.async_receive2(std::move(token)); +} + void connection::cancel(operation op) { impl_.cancel(op); } } // namespace boost::redis From 6ee1594699523d3864470153a7074ef6f6bf1304 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 13:56:18 +0100 Subject: [PATCH 20/27] make tests faster --- test/common.cpp | 3 +++ test/test_conn_check_health.cpp | 1 - test/test_conn_push.cpp | 4 +--- test/test_conn_push2.cpp | 8 ++------ test/test_conn_reconnect.cpp | 4 +--- test/test_conn_tls.cpp | 4 +--- test/test_unix_sockets.cpp | 1 - 7 files changed, 8 insertions(+), 17 deletions(-) diff --git a/test/common.cpp b/test/common.cpp index cb7e1891..54b3697f 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -7,12 +7,14 @@ #include "common.hpp" +#include #include #include #include #include namespace net = boost::asio; +using namespace std::chrono_literals; struct run_callback { std::shared_ptr conn; @@ -55,6 +57,7 @@ boost::redis::config make_test_config() { boost::redis::config cfg; cfg.addr.host = get_server_hostname(); + cfg.reconnect_wait_interval = 50ms; // make tests involving reconnection faster return cfg; } diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 78062e3b..5d53b084 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -51,7 +51,6 @@ void test_reconnection() // Make the test run faster auto cfg = make_test_config(); cfg.health_check_interval = 500ms; - cfg.reconnect_wait_interval = 100ms; bool run_finished = false, exec1_finished = false, exec2_finished = false; diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 3b264552..0ba3258f 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -264,9 +264,7 @@ struct test_async_receive_cancelled_on_reconnection_impl { start_subscribe1(); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&](error_code ec) { + conn.async_run(make_test_config(), [&](error_code ec) { run_finished = true; BOOST_TEST_EQ(ec, net::error::operation_aborted); }); diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index ed6e9bbf..ab188d0d 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -501,9 +501,7 @@ void test_push_adapter_error_reconnection() conn.async_exec(req2, resp, on_exec2); }); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&run_finished](error_code ec) { + conn.async_run(make_test_config(), [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); @@ -817,9 +815,7 @@ struct test_pubsub_state_restoration_impl { // Start running bool run_finished = false; - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&run_finished](error_code ec) { + conn.async_run(make_test_config(), [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 2a9ea337..7740c9b5 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -47,9 +47,7 @@ net::awaitable test_reconnect_impl() regular_req.get_config().cancel_if_unresponded = false; auto conn = std::make_shared(ex); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 100ms; // make the test run faster - run(conn, std::move(cfg)); + run(conn, make_test_config()); for (int i = 0; i < 3; ++i) { BOOST_TEST_CONTEXT("i=" << i) diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index 43939b40..8989e2e3 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -147,8 +147,6 @@ BOOST_AUTO_TEST_CASE(reconnection) net::io_context ioc; net::steady_timer timer{ioc}; connection conn{ioc}; - auto cfg = make_tls_config(); - cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; ping_request.push("PING", "some_value"); @@ -161,7 +159,7 @@ BOOST_AUTO_TEST_CASE(reconnection) bool exec_finished = false, run_finished = false; // Run the connection - conn.async_run(cfg, {}, [&](error_code ec) { + conn.async_run(make_test_config(), [&](error_code ec) { run_finished = true; BOOST_TEST(ec == net::error::operation_aborted); }); diff --git a/test/test_unix_sockets.cpp b/test/test_unix_sockets.cpp index be4fa139..ec53b583 100644 --- a/test/test_unix_sockets.cpp +++ b/test/test_unix_sockets.cpp @@ -78,7 +78,6 @@ void test_reconnection() connection conn{ioc}; auto cfg = make_test_config(); cfg.unix_socket = unix_socket_path; - cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; ping_request.get_config().cancel_if_not_connected = false; From f6bbae04c94e951a9b7d12600271eb6a2a7cabca Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Fri, 16 Jan 2026 20:17:05 +0100 Subject: [PATCH 21/27] simplify connection impl --- include/boost/redis/connection.hpp | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c7b22e5a..99b62ae7 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -285,15 +285,6 @@ struct receive2_op { } }; -template -auto async_receive2(connection_impl& conn, CompletionToken&& token) -{ - return asio::async_compose( - receive2_op{&conn}, - token, - conn); -} - template struct exec_one_op { connection_impl* conn_; @@ -895,7 +886,10 @@ class basic_connection { template > auto async_receive2(CompletionToken&& token = {}) { - return detail::async_receive2(*impl_, std::forward(token)); + return asio::async_compose( + detail::receive2_op{impl_.get()}, + token, + *impl_); } /** @brief (Deprecated) Receives server pushes synchronously without blocking. From d311a8f60d834f5c8ab1580a6e28d3029e2ad380 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 17 Jan 2026 18:54:09 +0100 Subject: [PATCH 22/27] Initial test --- test/CMakeLists.txt | 1 + test/Jamfile | 1 + test/test_receive_fsm.cpp | 94 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 test/test_receive_fsm.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9c6a97e0..859dfc51 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ make_test(test_writer_fsm) make_test(test_reader_fsm) make_test(test_connect_fsm) make_test(test_sentinel_resolve_fsm) +make_test(test_receive_fsm) make_test(test_run_fsm) make_test(test_compose_setup_request) make_test(test_setup_adapter) diff --git a/test/Jamfile b/test/Jamfile index ccd47060..34683729 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -62,6 +62,7 @@ local tests = test_writer_fsm test_reader_fsm test_sentinel_resolve_fsm + test_receive_fsm test_run_fsm test_connect_fsm test_compose_setup_request diff --git a/test/test_receive_fsm.cpp b/test/test_receive_fsm.cpp new file mode 100644 index 00000000..fe93b8bb --- /dev/null +++ b/test/test_receive_fsm.cpp @@ -0,0 +1,94 @@ +// +// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include +#include +#include + +#include "sansio_utils.hpp" + +namespace net = boost::asio; +using namespace boost::redis; +using net::cancellation_type_t; +using boost::system::error_code; +using net::cancellation_type_t; +using detail::receive_action; +using detail::receive_fsm; +using detail::connection_state; +using action_type = receive_action::action_type; + +// Operators +static const char* to_string(action_type type) +{ + switch (type) { + case action_type::setup_cancellation: return "setup_cancellation"; + case action_type::wait: return "wait"; + case action_type::drain_channel: return "drain_channel"; + case action_type::immediate: return "immediate"; + case action_type::done: return "done"; + default: return ""; + } +} + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, action_type type) { return os << to_string(type); } + +bool operator==(const receive_action& lhs, const receive_action& rhs) noexcept +{ + return lhs.type == rhs.type && lhs.ec == rhs.ec; +} + +std::ostream& operator<<(std::ostream& os, const receive_action& act) +{ + os << "action{ .type=" << act.type; + if (act.type == action_type::done) + os << ", ec=" << act.ec; + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +struct fixture { + connection_state st; + generic_response resp; +}; + +void test_success() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // The wait finishes successfully (we were notified). Receive exits + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::drain_channel); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code()); +} + +} // namespace + +int main() +{ + test_success(); + + return boost::report_errors(); +} From 2883c8d74f0fec79a3fa99af27a59d7ef9e15e11 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 17 Jan 2026 19:20:17 +0100 Subject: [PATCH 23/27] more tests --- test/test_receive_fsm.cpp | 86 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 3 deletions(-) diff --git a/test/test_receive_fsm.cpp b/test/test_receive_fsm.cpp index fe93b8bb..519638f5 100644 --- a/test/test_receive_fsm.cpp +++ b/test/test_receive_fsm.cpp @@ -11,12 +11,10 @@ #include #include +#include #include -#include #include -#include "sansio_utils.hpp" - namespace net = boost::asio; using namespace boost::redis; using net::cancellation_type_t; @@ -25,6 +23,7 @@ using net::cancellation_type_t; using detail::receive_action; using detail::receive_fsm; using detail::connection_state; +namespace channel_errc = net::experimental::channel_errc; using action_type = receive_action::action_type; // Operators @@ -77,11 +76,89 @@ void test_success() act = fsm.resume(st, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action_type::wait); + // At this point, the operation is now running + BOOST_TEST(st.receive2_running); + + // The wait finishes successfully (we were notified). Receive exits + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::drain_channel); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code()); + + // The operation is no longer running + BOOST_TEST_NOT(st.receive2_running); +} + +// We might see spurious cancels during reconnection (v1 compatibility). +void test_cancelled_reconnection() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // Reconnection happens + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + // The wait finishes successfully (we were notified). Receive exits act = fsm.resume(st, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action_type::drain_channel); act = fsm.resume(st, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, error_code()); + + // The operation is no longer running + BOOST_TEST_NOT(st.receive2_running); +} + +// We might get cancellations due to connection::cancel() +void test_cancelled_connection_cancel() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // Simulate a connection::cancel() + st.receive2_cancelled = true; + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); +} + +// Operations can still run after connection::cancel() +void test_after_connection_cancel() +{ + connection_state st; + receive_fsm fsm; + st.receive2_cancelled = true; + + // The operation initiates and runs normally + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // Reconnection behavior not affected + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + + // Simulate a connection::cancel() + st.receive2_cancelled = true; + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); } } // namespace @@ -89,6 +166,9 @@ void test_success() int main() { test_success(); + test_cancelled_reconnection(); + test_cancelled_connection_cancel(); + test_after_connection_cancel(); return boost::report_errors(); } From 49cf31b1179f0aeff1de788b07b40595214b67b5 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 17 Jan 2026 19:26:58 +0100 Subject: [PATCH 24/27] per-op cancellation --- test/test_receive_fsm.cpp | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/test_receive_fsm.cpp b/test/test_receive_fsm.cpp index 519638f5..58e4218b 100644 --- a/test/test_receive_fsm.cpp +++ b/test/test_receive_fsm.cpp @@ -15,6 +15,9 @@ #include #include +#include +#include + namespace net = boost::asio; using namespace boost::redis; using net::cancellation_type_t; @@ -106,6 +109,11 @@ void test_cancelled_reconnection() BOOST_TEST_EQ(act, action_type::wait); BOOST_TEST(st.receive2_running); // still running + // Another reconnection + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + // The wait finishes successfully (we were notified). Receive exits act = fsm.resume(st, error_code(), cancellation_type_t::none); BOOST_TEST_EQ(act, action_type::drain_channel); @@ -161,6 +169,27 @@ void test_after_connection_cancel() BOOST_TEST_NOT(st.receive2_running); } +// Per-operation cancellation is supported +void test_per_operation_cancellation(std::string_view name, cancellation_type_t type) +{ + std::cerr << "Running cancellation case " << name << std::endl; + + connection_state st; + receive_fsm fsm; + + // The operation initiates and runs normally + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // Cancellation is received + act = fsm.resume(st, channel_errc::channel_cancelled, type); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); +} + } // namespace int main() @@ -170,5 +199,10 @@ int main() test_cancelled_connection_cancel(); test_after_connection_cancel(); + test_per_operation_cancellation("terminal", cancellation_type_t::terminal); + test_per_operation_cancellation("partial", cancellation_type_t::partial); + test_per_operation_cancellation("total", cancellation_type_t::total); + test_per_operation_cancellation("all", cancellation_type_t::all); + return boost::report_errors(); } From 8c1000e3b241486545b8c58466cb6295255be7a4 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 17 Jan 2026 19:30:47 +0100 Subject: [PATCH 25/27] Finished tests --- test/test_receive_fsm.cpp | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/test/test_receive_fsm.cpp b/test/test_receive_fsm.cpp index 58e4218b..351af0b3 100644 --- a/test/test_receive_fsm.cpp +++ b/test/test_receive_fsm.cpp @@ -190,6 +190,41 @@ void test_per_operation_cancellation(std::string_view name, cancellation_type_t BOOST_TEST_NOT(st.receive2_running); } +// Only a single instance of async_receive2 can be running at the same time +void test_error_already_running() +{ + connection_state st; + receive_fsm fsm; + st.receive2_running = true; + + // The operation fails immediately + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::immediate); + BOOST_TEST(st.receive2_running); // not affected + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::already_running)); + BOOST_TEST(st.receive2_running); // not affected +} + +// If an unknown error was obtained during channel receive, we propagate it +void test_error_unknown() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // We have an unknown error + act = fsm.resume(st, channel_errc::channel_closed, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(channel_errc::channel_closed)); + BOOST_TEST_NOT(st.receive2_running); +} + } // namespace int main() @@ -204,5 +239,8 @@ int main() test_per_operation_cancellation("total", cancellation_type_t::total); test_per_operation_cancellation("all", cancellation_type_t::all); + test_error_already_running(); + test_error_unknown(); + return boost::report_errors(); } From 0183af2b4d6f8ffc5eae783bb5e6ffba887ce6d9 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sat, 17 Jan 2026 19:31:42 +0100 Subject: [PATCH 26/27] rewrite yield indices --- include/boost/redis/impl/receive_fsm.ipp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/boost/redis/impl/receive_fsm.ipp b/include/boost/redis/impl/receive_fsm.ipp index d796b3bc..ca98dcd8 100644 --- a/include/boost/redis/impl/receive_fsm.ipp +++ b/include/boost/redis/impl/receive_fsm.ipp @@ -38,7 +38,7 @@ receive_action receive_fsm::resume( // Parallel async_receive2 operations not supported if (st.receive2_running) { - BOOST_REDIS_YIELD(resume_point_, 4, receive_action::action_type::immediate) + BOOST_REDIS_YIELD(resume_point_, 1, receive_action::action_type::immediate) return system::error_code(error::already_running); } @@ -47,15 +47,15 @@ receive_action receive_fsm::resume( st.receive2_cancelled = false; // This operation supports total cancellation. Set it up - BOOST_REDIS_YIELD(resume_point_, 1, receive_action::action_type::setup_cancellation) + BOOST_REDIS_YIELD(resume_point_, 2, receive_action::action_type::setup_cancellation) while (true) { // Wait at least once for a notification to arrive - BOOST_REDIS_YIELD(resume_point_, 2, receive_action::action_type::wait) + BOOST_REDIS_YIELD(resume_point_, 3, receive_action::action_type::wait) // If the wait completed successfully, we have pushes. Drain the channel and exit if (!ec) { - BOOST_REDIS_YIELD(resume_point_, 3, receive_action::action_type::drain_channel) + BOOST_REDIS_YIELD(resume_point_, 4, receive_action::action_type::drain_channel) st.receive2_running = false; return system::error_code(); } From c684ceaf9d7601712555f4e9bf0603c67ec58981 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Sun, 18 Jan 2026 14:16:07 +0100 Subject: [PATCH 27/27] reword --- include/boost/redis/connection.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 99b62ae7..1801dd4d 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -820,7 +820,7 @@ class basic_connection { * accumulate internally (we don't make any commitment to this * exact number). When that happens any `async_exec`s and * health-checks won't make any progress and the connection may - * eventually timeout. To avoid that apps that expect server pushes + * eventually timeout. To avoid this, apps that expect server pushes * should call this function continuously in a loop. * * This function should be used instead of the deprecated @ref async_receive. @@ -828,8 +828,8 @@ class basic_connection { * * @li `async_receive` is designed to consume a single push message at a time. * This can be inefficient when receiving lots of server pushes. - * `async_receive2` is batch-oriented, and will only suspend once if - * several pushes are received in a single network packet. + * `async_receive2` is batch-oriented. All pushes that are available + * when `async_receive2` is called will be marked as consumed. * @li `async_receive` is cancelled when a reconnection happens (e.g. because * of a network error). This enabled the user to re-establish subscriptions * using @ref async_exec before waiting for pushes again. With the introduction of