diff --git a/README.md b/README.md index 62a03896..5b71d904 100644 --- a/README.md +++ b/README.md @@ -106,15 +106,17 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored // in resp. If the connection encounters a network error and reconnects to the server, // it will automatically subscribe to 'mychannel' again. This is transparent to the user. + // You need to use specialized request::subscribe() function (instead of request::push) + // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index b7194d1b..f8c6e191 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -117,15 +117,17 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored // in resp. If the connection encounters a network error and reconnects to the server, // it will automatically subscribe to 'mychannel' again. This is transparent to the user. + // You need to use specialized request::subscribe() function (instead of request::push) + // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index 4dd8180d..2edb5cea 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -6,12 +6,12 @@ #include +#include #include #include #include #include #include -#include #include #include @@ -30,7 +30,6 @@ using boost::asio::co_spawn; using boost::asio::consign; using boost::asio::detached; using boost::asio::dynamic_buffer; -using boost::asio::redirect_error; using boost::redis::config; using boost::redis::connection; using boost::redis::generic_flat_response; @@ -61,13 +60,13 @@ auto receiver(std::shared_ptr conn) -> awaitable req.subscribe({"channel"}); co_await conn->async_exec(req); - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index b3eb9cdd..b8802b03 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -6,14 +6,12 @@ #include +#include #include #include #include #include -#include -#include #include -#include #include @@ -62,13 +60,13 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // to enable this behavior. // Loop to read Redis push messages. - for (error_code ec;;) { + while (conn->will_reconnect()) { // Wait for pushes - co_await conn->async_receive2(asio::redirect_error(ec)); + auto [ec] = co_await conn->async_receive2(asio::as_tuple); // Check for errors and cancellations - if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) { - std::cerr << "Error during receive2: " << ec << std::endl; + if (ec) { + std::cerr << "Error during receive: " << ec << std::endl; break; } diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 1fcf2d6b..1801dd4d 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -154,7 +155,7 @@ struct connection_impl { { switch (op) { case operation::exec: st_.mpx.cancel_waiting(); break; - case operation::receive: receive_channel_.cancel(); break; + case operation::receive: cancel_receive_v2(); break; case operation::reconnection: st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); break; @@ -165,7 +166,7 @@ struct connection_impl { case operation::health_check: cancel_run(); break; case operation::all: st_.mpx.cancel_waiting(); // exec - receive_channel_.cancel(); // receive + cancel_receive_v2(); // receive st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect cancel_run(); // run break; @@ -173,6 +174,14 @@ struct connection_impl { } } + void cancel_receive_v1() { receive_channel_.cancel(); } + + void cancel_receive_v2() + { + st_.receive2_cancelled = true; + cancel_receive_v1(); + } + void cancel_run() { // Individual operations should see a terminal cancellation, regardless @@ -184,8 +193,8 @@ struct connection_impl { stream_.cancel_resolve(); // Receive is technically not part of run, but we also cancel it for - // backwards compatibility. - receive_channel_.cancel(); + // backwards compatibility. Note that this intentionally affects v1 receive, only. + cancel_receive_v1(); } bool is_open() const noexcept { return stream_.is_open(); } @@ -234,32 +243,45 @@ struct connection_impl { return size; } +}; - template - auto async_receive2(CompletionToken&& token) +template +struct receive2_op { + connection_impl* conn_; + receive_fsm fsm_{}; + + void drain_receive_channel() { - // clang-format off - return - receive_channel_.async_receive( - asio::deferred( - [this](system::error_code ec, std::size_t) - { - if (!ec) { - auto f = [](system::error_code, std::size_t) { - // There is no point in checking for errors - // here since async_receive just completed - // without errors. - }; - - // We just want to drain the channel. - while (receive_channel_.try_receive(f)); - } - - return asio::deferred.values(ec); - } - ) - )(std::forward(token)); - // clang-format on + // We don't expect any errors here. The only errors + // that might appear in the channel are due to cancellations, + // and these don't make sense with try_receive + auto f = [](system::error_code, std::size_t) { }; + while (conn_->receive_channel_.try_receive(f)) + ; + } + + template + void operator()(Self& self, system::error_code ec = {}, std::size_t /* push_bytes */ = 0u) + { + receive_action act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled()); + + switch (act.type) { + case receive_action::action_type::setup_cancellation: + self.reset_cancellation_state(asio::enable_total_cancellation()); + (*this)(self); // this action does not require yielding + return; + case receive_action::action_type::wait: + conn_->receive_channel_.async_receive(std::move(self)); + return; + case receive_action::action_type::drain_channel: + drain_receive_channel(); + (*this)(self); // this action does not require yielding + return; + case receive_action::action_type::immediate: + asio::async_immediate(self.get_io_executor(), std::move(self)); + return; + case receive_action::action_type::done: self.complete(act.ec); return; + } } }; @@ -785,27 +807,68 @@ class basic_connection { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Wait for server pushes asynchronously + /** @brief Wait for server pushes asynchronously. * - * This function suspends until a server push is received by the + * This function suspends until at least one server push is received by the * connection. On completion an unspecified number of pushes will * have been added to the response object set with @ref - * boost::redis::connection::set_receive_response. + * set_receive_response. Use the functions in the response object + * to know how many messages they were received and consume them. * * To prevent receiving an unbound number of pushes the connection * blocks further read operations on the socket when 256 pushes * accumulate internally (we don't make any commitment to this * exact number). When that happens any `async_exec`s and * health-checks won't make any progress and the connection may - * eventually timeout. To avoid that Apps should call - * `async_receive2` continuously in a loop. - * - * @Note To avoid deadlocks the task (e.g. coroutine) calling + * eventually timeout. To avoid this, apps that expect server pushes + * should call this function continuously in a loop. + * + * This function should be used instead of the deprecated @ref async_receive. + * It differs from `async_receive` in the following: + * + * @li `async_receive` is designed to consume a single push message at a time. + * This can be inefficient when receiving lots of server pushes. + * `async_receive2` is batch-oriented. All pushes that are available + * when `async_receive2` is called will be marked as consumed. + * @li `async_receive` is cancelled when a reconnection happens (e.g. because + * of a network error). This enabled the user to re-establish subscriptions + * using @ref async_exec before waiting for pushes again. With the introduction of + * functions like @ref request::subscribe, subscriptions are automatically + * re-established on reconnection. Thus, `async_receive2` is not cancelled + * on reconnection. + * @li `async_receive` passes the number of bytes that each received + * push message contains. This information is unreliable and not very useful. + * Equivalent information is available using functions in the response object. + * @li `async_receive` might get cancelled if `async_run` is cancelled. + * This doesn't happen with `async_receive2`. + * + * This function does *not* remove messages from the response object + * passed to @ref set_receive_response - use the functions in the response + * object to achieve this. + * + * Only a single instance of `async_receive2` may be outstanding + * for a given connection at any time. Trying to start a second one + * will fail with @ref error::already_running. + * + * @note To avoid deadlocks the task (e.g. coroutine) calling * `async_receive2` should not call `async_exec` in a way where - * they could block each other. + * they could block each other. This is, avoid the following pattern: + * + * @code + * asio::awaitable receiver() + * { + * // Do NOT do this!!! The receive buffer might get full while + * // async_exec runs, which will block all read operations until async_receive2 + * // is called. The two operations end up waiting each other, making the connection unresponsive. + * // If you need to do this, use two connections, instead. + * co_await conn.async_receive2(); + * co_await conn.async_exec(req, resp); + * } + * @endcode + * + * For an example see cpp20_subscriber.cpp. * - * For an example see cpp20_subscriber.cpp. The completion token - * must have the following signature + * The completion token must have the following signature: * * @code * void f(system::error_code); @@ -818,15 +881,15 @@ class basic_connection { * @li `asio::cancellation_type_t::partial`. * @li `asio::cancellation_type_t::total`. * - * Calling `basic_connection::cancel(operation::receive)` will - * also cancel any ongoing receive operations. - * * @param token Completion token. */ template > auto async_receive2(CompletionToken&& token = {}) { - return impl_->async_receive2(std::forward(token)); + return asio::async_compose( + detail::receive2_op{impl_.get()}, + token, + *impl_); } /** @brief (Deprecated) Receives server pushes synchronously without blocking. @@ -1223,7 +1286,9 @@ class connection { template auto async_receive2(CompletionToken&& token = {}) { - return impl_.async_receive2(std::forward(token)); + return asio::async_initiate( + initiation{this}, + token); } /// @copydoc basic_connection::receive @@ -1336,6 +1401,12 @@ class connection { { self->async_exec_impl(*req, std::move(adapter), std::forward(handler)); } + + template + void operator()(Handler&& handler) + { + self->async_receive2_impl(std::forward(handler)); + } }; void async_run_impl( @@ -1352,6 +1423,8 @@ class connection { any_adapter&& adapter, asio::any_completion_handler token); + void async_receive2_impl(asio::any_completion_handler token); + basic_connection impl_; }; diff --git a/include/boost/redis/detail/connection_state.hpp b/include/boost/redis/detail/connection_state.hpp index 75ddf177..e8e09106 100644 --- a/include/boost/redis/detail/connection_state.hpp +++ b/include/boost/redis/detail/connection_state.hpp @@ -51,6 +51,7 @@ struct connection_state { request setup_req{}; request ping_req{}; subscription_tracker tracker{}; + bool receive2_running{false}, receive2_cancelled{false}; // Sentinel stuff lazy_random_engine eng{}; diff --git a/include/boost/redis/detail/receive_fsm.hpp b/include/boost/redis/detail/receive_fsm.hpp new file mode 100644 index 00000000..16989dce --- /dev/null +++ b/include/boost/redis/detail/receive_fsm.hpp @@ -0,0 +1,58 @@ +// +// Copyright (c) 2018-2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_RECEIVE_FSM_HPP +#define BOOST_REDIS_RECEIVE_FSM_HPP + +#include +#include + +// Sans-io algorithm for async_receive2, as a finite state machine + +namespace boost::redis::detail { + +struct connection_state; + +struct receive_action { + enum class action_type + { + setup_cancellation, // Set up the cancellation types supported by the composed operation + wait, // Wait for a message to appear in the receive channel + drain_channel, // Empty the receive channel + immediate, // Call async_immediate + done, // Complete + }; + + action_type type; + system::error_code ec; + + receive_action(action_type type) noexcept + : type{type} + { } + + receive_action(system::error_code ec) noexcept + : type{action_type::done} + , ec{ec} + { } +}; + +class receive_fsm { + int resume_point_{0}; + +public: + receive_fsm() = default; + + receive_action resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 346e3ad1..55356635 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -112,6 +112,10 @@ enum class error /// Expects a RESP3 array, but got a different data type. expects_resp3_array, + + /// A @ref basic_connection::async_receive2 operation is already running. + /// Only one of such operations might be running at any point in time. + already_running, }; /** diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 088b1ac3..9db49af3 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -51,6 +51,12 @@ void connection::async_exec_impl( impl_.async_exec(req, std::move(adapter), std::move(token)); } +void connection::async_receive2_impl( + asio::any_completion_handler token) +{ + impl_.async_receive2(std::move(token)); +} + void connection::cancel(operation op) { impl_.cancel(op); } } // namespace boost::redis diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index f7071506..04bfbdc2 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -68,6 +68,9 @@ struct error_category_impl : system::error_category { return "Expects a RESP3 string, but got a different data type."; case error::expects_resp3_array: return "Expects a RESP3 array, but got a different data type."; + case error::already_running: + return "An async_receive2 operation is already running. Only one of such operations " + "might be running at any point in time."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/receive_fsm.ipp b/include/boost/redis/impl/receive_fsm.ipp new file mode 100644 index 00000000..ca98dcd8 --- /dev/null +++ b/include/boost/redis/impl/receive_fsm.ipp @@ -0,0 +1,85 @@ +// +// Copyright (c) 2018-2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include + +#include +#include +#include + +namespace boost::redis::detail { + +constexpr bool is_any_cancel(asio::cancellation_type_t type) +{ + return !!( + type & (asio::cancellation_type_t::terminal | asio::cancellation_type_t::partial | + asio::cancellation_type_t::total)); +} + +// We use the receive2_cancelled flag rather than will_reconnect() to +// avoid entanglement between async_run and async_receive2 cancellations. +// If we had used will_reconnect(), async_receive2 would be cancelled +// when disabling reconnection and async_run exits, and in an unpredictable fashion. +receive_action receive_fsm::resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Parallel async_receive2 operations not supported + if (st.receive2_running) { + BOOST_REDIS_YIELD(resume_point_, 1, receive_action::action_type::immediate) + return system::error_code(error::already_running); + } + + // We're now running. Discard any previous cancellation state + st.receive2_running = true; + st.receive2_cancelled = false; + + // This operation supports total cancellation. Set it up + BOOST_REDIS_YIELD(resume_point_, 2, receive_action::action_type::setup_cancellation) + + while (true) { + // Wait at least once for a notification to arrive + BOOST_REDIS_YIELD(resume_point_, 3, receive_action::action_type::wait) + + // If the wait completed successfully, we have pushes. Drain the channel and exit + if (!ec) { + BOOST_REDIS_YIELD(resume_point_, 4, receive_action::action_type::drain_channel) + st.receive2_running = false; + return system::error_code(); + } + + // Check for cancellations + if (is_any_cancel(cancel_state) || st.receive2_cancelled) { + st.receive2_running = false; + return system::error_code(asio::error::operation_aborted); + } + + // If we get any unknown errors, propagate them (shouldn't happen, but just in case) + if (ec != asio::experimental::channel_errc::channel_cancelled) { + st.receive2_running = false; + return ec; + } + + // The channel was cancelled and no cancellation state is set. + // This is due to a reconnection. Ignore the notification + } + } + + // We should never get here + BOOST_ASSERT(false); + return receive_action{system::error_code()}; +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index da087cc0..9d05d1d7 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -52,7 +52,12 @@ enum class operation /// Refers to `connection::async_run` operations. run, - /// Refers to `connection::async_receive` operations. + /** + * @brief (Deprecated) Refers to `async_receive` and `async_receive2` operations. + * + * To cancel `async_receive2`, use either @ref basic_connection::cancel with no arguments + * or per-operation cancellation. + */ receive, /** diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 647e643d..196b2960 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9c6a97e0..859dfc51 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ make_test(test_writer_fsm) make_test(test_reader_fsm) make_test(test_connect_fsm) make_test(test_sentinel_resolve_fsm) +make_test(test_receive_fsm) make_test(test_run_fsm) make_test(test_compose_setup_request) make_test(test_setup_adapter) diff --git a/test/Jamfile b/test/Jamfile index ccd47060..34683729 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -62,6 +62,7 @@ local tests = test_writer_fsm test_reader_fsm test_sentinel_resolve_fsm + test_receive_fsm test_run_fsm test_connect_fsm test_compose_setup_request diff --git a/test/common.cpp b/test/common.cpp index cb7e1891..54b3697f 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -7,12 +7,14 @@ #include "common.hpp" +#include #include #include #include #include namespace net = boost::asio; +using namespace std::chrono_literals; struct run_callback { std::shared_ptr conn; @@ -55,6 +57,7 @@ boost::redis::config make_test_config() { boost::redis::config cfg; cfg.addr.host = get_server_hostname(); + cfg.reconnect_wait_interval = 50ms; // make tests involving reconnection faster return cfg; } diff --git a/test/test_conn_cancel_after.cpp b/test/test_conn_cancel_after.cpp index dd25a4a5..b7f35800 100644 --- a/test/test_conn_cancel_after.cpp +++ b/test/test_conn_cancel_after.cpp @@ -93,6 +93,27 @@ void test_receive() BOOST_TEST(receive_finished); } +template +void test_receive2() +{ + // Setup + asio::io_context ioc; + Connection conn{ioc}; + bool receive_finished = false; + generic_response resp; + conn.set_receive_response(resp); + + // Call the function with a very short timeout. + conn.async_receive2(asio::cancel_after(1ms, [&](error_code ec) { + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + receive_finished = true; + })); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); +} + } // namespace int main() @@ -106,5 +127,8 @@ int main() test_receive>(); test_receive(); + test_receive2>(); + test_receive2(); + return boost::report_errors(); } diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index 78062e3b..5d53b084 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -51,7 +51,6 @@ void test_reconnection() // Make the test run faster auto cfg = make_test_config(); cfg.health_check_interval = 500ms; - cfg.reconnect_wait_interval = 100ms; bool run_finished = false, exec1_finished = false, exec2_finished = false; diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 3b264552..0ba3258f 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -264,9 +264,7 @@ struct test_async_receive_cancelled_on_reconnection_impl { start_subscribe1(); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&](error_code ec) { + conn.async_run(make_test_config(), [&](error_code ec) { run_finished = true; BOOST_TEST_EQ(ec, net::error::operation_aborted); }); diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 5c260f8c..ab188d0d 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -5,11 +5,17 @@ */ #include +#include #include +#include #include #include #include +#include +#include +#include +#include #include #include #include @@ -18,6 +24,7 @@ #include #include +#include #include #include #include @@ -126,6 +133,237 @@ void test_async_receive2_push_available() BOOST_TEST(run_finished); } +// async_receive2 blocks only once if several messages are received in a batch +void test_async_receive2_batch() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); + + // Cause two messages to be delivered. The PING ensures that + // the pushes have been read when exec completes + request req; + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("SUBSCRIBE", "test_async_receive2_batch"); + req.push("PING", "message"); + + bool receive_finished = false, run_finished = false; + + // 1. Trigger pushes + // 2. Receive both of them + // 3. Check that receive2 has consumed them by calling it again + auto on_receive2 = [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; + conn.cancel(); + }; + + auto on_receive1 = [&](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 2u); + conn.async_receive2(net::cancel_after(50ms, on_receive2)); + }; + + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_receive2(on_receive1); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); +} + +// async_receive2 can be called several times in a row +void test_async_receive2_subsequent_calls() +{ + struct impl { + net::io_context ioc{}; + connection conn{ioc}; + resp3::flat_tree resp{}; + request req{}; + bool receive_finished = false, run_finished = false; + + // Send a SUBSCRIBE, which will trigger a push + void start_subscribe1() + { + conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive1(); + }); + } + + // Receive the push + void start_receive1() + { + conn.async_receive2([this](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + resp.clear(); + start_subscribe2(); + }); + } + + // Send another SUBSCRIBE, which will trigger another push + void start_subscribe2() + { + conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive2(); + }); + } + + // End + void start_receive2() + { + conn.async_receive2([this](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + receive_finished = true; + conn.cancel(); + }); + } + + void run() + { + // Setup + conn.set_receive_response(resp); + req.push("SUBSCRIBE", "test_async_receive2_subsequent_calls"); + + start_subscribe1(); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); + } + }; + + impl{}.run(); +} + +// async_receive2 can be cancelled using per-operation cancellation, +// and supports all cancellation types +void test_async_receive2_per_operation_cancellation( + std::string_view name, + net::cancellation_type_t type) +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + bool receive_finished = false; + + conn.async_receive2(net::bind_cancellation_slot(sig.slot(), [&](error_code ec) { + if (!BOOST_TEST_EQ(ec, net::error::operation_aborted)) + std::cerr << "With cancellation type " << name << std::endl; + receive_finished = true; + })); + + sig.emit(type); + + ioc.run_for(test_timeout); + + if (!BOOST_TEST(receive_finished)) + std::cerr << "With cancellation type " << name << std::endl; +} + +// connection::cancel() cancels async_receive2 +void test_async_receive2_connection_cancel() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + net::cancellation_signal sig; + bool receive_finished = false; + + conn.async_receive2([&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; + }); + + conn.cancel(); + + ioc.run_for(test_timeout); + + BOOST_TEST(receive_finished); +} + +// Reconnection doesn't cancel async_receive2 +void test_async_receive2_reconnection() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); + + // Causes the reconnection + request req_quit; + req_quit.push("QUIT"); + + // When this completes, the reconnection has happened + request req_ping; + req_ping.get_config().cancel_if_unresponded = false; + req_ping.push("PING", "test_async_receive2_connection"); + + // Generates a push + request req_subscribe; + req_subscribe.push("SUBSCRIBE", "test_async_receive2_connection"); + + bool exec_finished = false, receive_finished = false, run_finished = false; + + // Launch a receive operation, and in parallel + // 1. Trigger a reconnection + // 2. Wait for the reconnection and check that receive hasn't been cancelled + // 3. Trigger a push to make receive complete + auto on_subscribe = [&](error_code ec, std::size_t) { + // Will finish before receive2 because the command doesn't have a response + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + }; + + auto on_ping = [&](error_code ec, std::size_t) { + // Reconnection has already happened here + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_NOT(receive_finished); + conn.async_exec(req_subscribe, ignore, on_subscribe); + }; + + conn.async_exec(req_quit, ignore, [&](error_code, std::size_t) { + conn.async_exec(req_ping, ignore, on_ping); + }); + + conn.async_receive2([&](error_code ec) { + BOOST_TEST_EQ(ec, error_code()); + receive_finished = true; + conn.cancel(); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(receive_finished); + BOOST_TEST(run_finished); +} + // A push may be interleaved between regular responses. // It is handed to the receive adapter (filtered out). void test_exec_push_interleaved() @@ -194,12 +432,12 @@ void test_push_adapter_error() req.push("SUBSCRIBE", "channel"); req.push("PING"); - bool push_received = false, exec_finished = false, run_finished = false; + bool receive_finished = false, exec_finished = false, run_finished = false; - // async_receive2 is cancelled every reconnection cycle + // We cancel receive when run exits conn.async_receive2([&](error_code ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - push_received = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + receive_finished = true; }); // The request is cancelled because the PING response isn't processed @@ -211,13 +449,14 @@ void test_push_adapter_error() auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; // so we can validate the generated error - conn.async_run(cfg, [&run_finished](error_code ec) { + conn.async_run(cfg, [&](error_code ec) { BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; + conn.cancel(); }); ioc.run_for(test_timeout); - BOOST_TEST(push_received); + BOOST_TEST(receive_finished); BOOST_TEST(exec_finished); BOOST_TEST(run_finished); } @@ -244,7 +483,7 @@ void test_push_adapter_error_reconnection() // async_receive2 is cancelled every reconnection cycle conn.async_receive2([&](error_code ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); push_received = true; }); @@ -262,9 +501,7 @@ void test_push_adapter_error_reconnection() conn.async_exec(req2, resp, on_exec2); }); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&run_finished](error_code ec) { + conn.async_run(make_test_config(), [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); @@ -276,8 +513,8 @@ void test_push_adapter_error_reconnection() BOOST_TEST(run_finished); } -// After an async_receive2 operation finishes, another one can be issued -void test_consecutive_receives() +// Tests the usual push consumer pattern that we recommend in the examples +void test_push_consumer() { net::io_context ioc; connection conn{ioc}; @@ -287,7 +524,7 @@ void test_consecutive_receives() std::function launch_push_consumer = [&]() { conn.async_receive2([&](error_code ec) { if (ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); push_consumer_finished = true; resp.clear(); return; @@ -598,10 +835,17 @@ int main() { test_async_receive2_waiting_for_push(); test_async_receive2_push_available(); + test_async_receive2_batch(); + test_async_receive2_subsequent_calls(); + test_async_receive2_per_operation_cancellation("terminal", net::cancellation_type_t::terminal); + test_async_receive2_per_operation_cancellation("partial", net::cancellation_type_t::partial); + test_async_receive2_per_operation_cancellation("total", net::cancellation_type_t::total); + test_async_receive2_connection_cancel(); + test_async_receive2_reconnection(); test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); - test_consecutive_receives(); + test_push_consumer(); test_unsubscribe(); test_pubsub_state_restoration(); diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 2a9ea337..7740c9b5 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -47,9 +47,7 @@ net::awaitable test_reconnect_impl() regular_req.get_config().cancel_if_unresponded = false; auto conn = std::make_shared(ex); - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 100ms; // make the test run faster - run(conn, std::move(cfg)); + run(conn, make_test_config()); for (int i = 0; i < 3; ++i) { BOOST_TEST_CONTEXT("i=" << i) diff --git a/test/test_conn_tls.cpp b/test/test_conn_tls.cpp index 43939b40..8989e2e3 100644 --- a/test/test_conn_tls.cpp +++ b/test/test_conn_tls.cpp @@ -147,8 +147,6 @@ BOOST_AUTO_TEST_CASE(reconnection) net::io_context ioc; net::steady_timer timer{ioc}; connection conn{ioc}; - auto cfg = make_tls_config(); - cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; ping_request.push("PING", "some_value"); @@ -161,7 +159,7 @@ BOOST_AUTO_TEST_CASE(reconnection) bool exec_finished = false, run_finished = false; // Run the connection - conn.async_run(cfg, {}, [&](error_code ec) { + conn.async_run(make_test_config(), [&](error_code ec) { run_finished = true; BOOST_TEST(ec == net::error::operation_aborted); }); diff --git a/test/test_receive_fsm.cpp b/test/test_receive_fsm.cpp new file mode 100644 index 00000000..351af0b3 --- /dev/null +++ b/test/test_receive_fsm.cpp @@ -0,0 +1,246 @@ +// +// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace net = boost::asio; +using namespace boost::redis; +using net::cancellation_type_t; +using boost::system::error_code; +using net::cancellation_type_t; +using detail::receive_action; +using detail::receive_fsm; +using detail::connection_state; +namespace channel_errc = net::experimental::channel_errc; +using action_type = receive_action::action_type; + +// Operators +static const char* to_string(action_type type) +{ + switch (type) { + case action_type::setup_cancellation: return "setup_cancellation"; + case action_type::wait: return "wait"; + case action_type::drain_channel: return "drain_channel"; + case action_type::immediate: return "immediate"; + case action_type::done: return "done"; + default: return ""; + } +} + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, action_type type) { return os << to_string(type); } + +bool operator==(const receive_action& lhs, const receive_action& rhs) noexcept +{ + return lhs.type == rhs.type && lhs.ec == rhs.ec; +} + +std::ostream& operator<<(std::ostream& os, const receive_action& act) +{ + os << "action{ .type=" << act.type; + if (act.type == action_type::done) + os << ", ec=" << act.ec; + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +struct fixture { + connection_state st; + generic_response resp; +}; + +void test_success() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // At this point, the operation is now running + BOOST_TEST(st.receive2_running); + + // The wait finishes successfully (we were notified). Receive exits + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::drain_channel); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code()); + + // The operation is no longer running + BOOST_TEST_NOT(st.receive2_running); +} + +// We might see spurious cancels during reconnection (v1 compatibility). +void test_cancelled_reconnection() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // Reconnection happens + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + + // Another reconnection + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + + // The wait finishes successfully (we were notified). Receive exits + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::drain_channel); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code()); + + // The operation is no longer running + BOOST_TEST_NOT(st.receive2_running); +} + +// We might get cancellations due to connection::cancel() +void test_cancelled_connection_cancel() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + + // Simulate a connection::cancel() + st.receive2_cancelled = true; + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); +} + +// Operations can still run after connection::cancel() +void test_after_connection_cancel() +{ + connection_state st; + receive_fsm fsm; + st.receive2_cancelled = true; + + // The operation initiates and runs normally + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // Reconnection behavior not affected + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); // still running + + // Simulate a connection::cancel() + st.receive2_cancelled = true; + act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); +} + +// Per-operation cancellation is supported +void test_per_operation_cancellation(std::string_view name, cancellation_type_t type) +{ + std::cerr << "Running cancellation case " << name << std::endl; + + connection_state st; + receive_fsm fsm; + + // The operation initiates and runs normally + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // Cancellation is received + act = fsm.resume(st, channel_errc::channel_cancelled, type); + BOOST_TEST_EQ(act, error_code(net::error::operation_aborted)); + BOOST_TEST_NOT(st.receive2_running); +} + +// Only a single instance of async_receive2 can be running at the same time +void test_error_already_running() +{ + connection_state st; + receive_fsm fsm; + st.receive2_running = true; + + // The operation fails immediately + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::immediate); + BOOST_TEST(st.receive2_running); // not affected + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::already_running)); + BOOST_TEST(st.receive2_running); // not affected +} + +// If an unknown error was obtained during channel receive, we propagate it +void test_error_unknown() +{ + connection_state st; + receive_fsm fsm; + + // Initiate + auto act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::setup_cancellation); + act = fsm.resume(st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, action_type::wait); + BOOST_TEST(st.receive2_running); + + // We have an unknown error + act = fsm.resume(st, channel_errc::channel_closed, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(channel_errc::channel_closed)); + BOOST_TEST_NOT(st.receive2_running); +} + +} // namespace + +int main() +{ + test_success(); + test_cancelled_reconnection(); + test_cancelled_connection_cancel(); + test_after_connection_cancel(); + + test_per_operation_cancellation("terminal", cancellation_type_t::terminal); + test_per_operation_cancellation("partial", cancellation_type_t::partial); + test_per_operation_cancellation("total", cancellation_type_t::total); + test_per_operation_cancellation("all", cancellation_type_t::all); + + test_error_already_running(); + test_error_unknown(); + + return boost::report_errors(); +} diff --git a/test/test_unix_sockets.cpp b/test/test_unix_sockets.cpp index be4fa139..ec53b583 100644 --- a/test/test_unix_sockets.cpp +++ b/test/test_unix_sockets.cpp @@ -78,7 +78,6 @@ void test_reconnection() connection conn{ioc}; auto cfg = make_test_config(); cfg.unix_socket = unix_socket_path; - cfg.reconnect_wait_interval = 10ms; // make the test run faster request ping_request; ping_request.get_config().cancel_if_not_connected = false;