From d434844fe08441818901d97403fe7d740373fb9c Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 11:43:10 +0100 Subject: [PATCH 01/23] Migrate test_conn_push to lightweight_test --- test/test_conn_push.cpp | 98 ++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index c3f1c339f..4fd078af5 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -10,17 +10,15 @@ #include #include +#include +#include #include -#include - -#define BOOST_TEST_MODULE conn_push -#include - #include "common.hpp" #include #include +#include namespace net = boost::asio; namespace redis = boost::redis; @@ -38,7 +36,7 @@ using namespace std::chrono_literals; namespace { -BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) +void receives_push_waiting_resps() { request req1; req1.push("HELLO", 3); @@ -64,13 +62,13 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) auto c2 = [&, conn](error_code ec, std::size_t) { c2_called = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req3, ignore, c3); }; auto c1 = [&, conn](error_code ec, std::size_t) { c1_called = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c2); }; @@ -80,7 +78,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) conn->async_receive([&, conn](error_code ec, std::size_t) { std::cout << "async_receive" << std::endl; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); push_received = true; conn->cancel(); }); @@ -93,7 +91,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) BOOST_TEST(c3_called); } -BOOST_AUTO_TEST_CASE(push_received1) +void push_received1() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -111,28 +109,26 @@ BOOST_AUTO_TEST_CASE(push_received1) conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { exec_finished = true; std::cout << "async_exec" << std::endl; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); }); conn->async_receive([&, conn](error_code ec, std::size_t) { push_received = true; std::cout << "(1) async_receive" << std::endl; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); // Receives the second push synchronously. error_code ec2; std::size_t res = 0; res = conn->receive(ec2); - BOOST_TEST(!ec2); - BOOST_TEST(res != std::size_t(0)); + BOOST_TEST_EQ(ec2, error_code()); + BOOST_TEST_NE(res, 0u); // Tries to receive a third push synchronously. ec2 = {}; res = conn->receive(ec2); - BOOST_CHECK_EQUAL( - ec2, - boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed)); + BOOST_TEST_EQ(ec2, error_code(boost::redis::error::sync_receive_push_failed)); conn->cancel(); }); @@ -144,7 +140,7 @@ BOOST_AUTO_TEST_CASE(push_received1) BOOST_TEST(push_received); } -BOOST_AUTO_TEST_CASE(push_filtered_out) +void push_filtered_out() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -161,12 +157,12 @@ BOOST_AUTO_TEST_CASE(push_filtered_out) conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { exec_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); }); conn->async_receive([&, conn](error_code ec, std::size_t) { push_received = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->cancel(operation::reconnection); }); @@ -176,8 +172,8 @@ BOOST_AUTO_TEST_CASE(push_filtered_out) BOOST_TEST(exec_finished); BOOST_TEST(push_received); - BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG"); - BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK"); + BOOST_TEST_EQ(std::get<1>(resp).value(), "PONG"); + BOOST_TEST_EQ(std::get<2>(resp).value(), "OK"); } struct response_error_tag { }; @@ -197,7 +193,7 @@ struct response_error_adapter { auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } -BOOST_AUTO_TEST_CASE(test_push_adapter) +void test_push_adapter() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -213,19 +209,19 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) bool push_received = false, exec_finished = false, run_finished = false; conn->async_receive([&, conn](error_code ec, std::size_t) { - BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, boost::asio::experimental::error::channel_cancelled); push_received = true; }); conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_TEST_EQ(ec, boost::system::errc::errc_t::operation_canceled); exec_finished = true; }); auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; conn->async_run(cfg, [&run_finished](error_code ec) { - BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); + BOOST_TEST_EQ(ec, redis::error::incompatible_size); run_finished = true; }); @@ -242,14 +238,14 @@ void launch_push_consumer(std::shared_ptr conn) { conn->async_receive([conn](error_code ec, std::size_t) { if (ec) { - BOOST_TEST(ec == net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); return; } launch_push_consumer(conn); }); } -BOOST_AUTO_TEST_CASE(many_subscribers) +void many_subscribers() { request req0; req0.get_config().cancel_on_connection_lost = false; @@ -273,52 +269,52 @@ BOOST_AUTO_TEST_CASE(many_subscribers) bool finished = false; auto c11 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->cancel(operation::reconnection); finished = true; }; auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req3, ignore, c11); }; auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c10); }; auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c9); }; auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c8); }; auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c7); }; auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c6); }; auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c5); }; auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c4); }; auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c3); }; auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c2); }; auto c0 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c1); }; @@ -331,7 +327,7 @@ BOOST_AUTO_TEST_CASE(many_subscribers) BOOST_TEST(finished); } -BOOST_AUTO_TEST_CASE(test_unsubscribe) +void test_unsubscribe() { net::io_context ioc; connection conn{ioc}; @@ -358,7 +354,7 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) run_finished = false; auto on_ping = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); ping_finished = true; BOOST_TEST(std::get<0>(resp_ping).has_value()); BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe"); @@ -367,7 +363,7 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) auto on_unsubscribe = [&](error_code ec, std::size_t) { unsubscribe_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(std::get<0>(resp_unsubscribe).has_value()); BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2"); BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1"); @@ -376,7 +372,7 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) auto on_subscribe = [&](error_code ec, std::size_t) { subscribe_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(std::get<0>(resp_subscribe).has_value()); BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3"); BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2"); @@ -386,7 +382,7 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) conn.async_exec(req_subscribe, resp_subscribe, on_subscribe); conn.async_run(make_test_config(), [&run_finished](error_code ec) { - BOOST_TEST(ec == net::error::operation_aborted); + BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); @@ -399,3 +395,15 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) } } // namespace + +int main() +{ + receives_push_waiting_resps(); + push_received1(); + push_filtered_out(); + test_push_adapter(); + many_subscribers(); + test_unsubscribe(); + + return boost::report_errors(); +} From 0ac723c035dcdc317a4e2c5ce1fd88f5ec5e81d6 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 11:45:54 +0100 Subject: [PATCH 02/23] using cleanup --- test/test_conn_push.cpp | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 4fd078af5..d9a1709e4 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -7,12 +7,11 @@ #include #include #include +#include #include #include #include -#include -#include #include "common.hpp" @@ -21,18 +20,9 @@ #include namespace net = boost::asio; -namespace redis = boost::redis; - -using boost::redis::operation; -using boost::redis::connection; -using boost::system::error_code; -using boost::redis::request; -using boost::redis::response; -using boost::redis::ignore; -using boost::redis::ignore_t; -using boost::system::error_code; -using boost::redis::logger; +using namespace boost::redis; using namespace std::chrono_literals; +using boost::system::error_code; namespace { @@ -128,7 +118,7 @@ void push_received1() // Tries to receive a third push synchronously. ec2 = {}; res = conn->receive(ec2); - BOOST_TEST_EQ(ec2, error_code(boost::redis::error::sync_receive_push_failed)); + BOOST_TEST_EQ(ec2, error::sync_receive_push_failed); conn->cancel(); }); @@ -182,13 +172,7 @@ response_error_tag error_tag_obj; struct response_error_adapter { void on_init() { } void on_done() { } - - void on_node( - boost::redis::resp3::basic_node const&, - boost::system::error_code& ec) - { - ec = boost::redis::error::incompatible_size; - } + void on_node(resp3::node_view const&, error_code& ec) { ec = error::incompatible_size; } }; auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } @@ -221,7 +205,7 @@ void test_push_adapter() auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; conn->async_run(cfg, [&run_finished](error_code ec) { - BOOST_TEST_EQ(ec, redis::error::incompatible_size); + BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; }); From 299ca9c9d02c6ff2232ae1638e575bd3def8c916 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 12:23:50 +0100 Subject: [PATCH 03/23] cleanup receive waiting for push --- test/test_conn_push.cpp | 65 +++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 38 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index d9a1709e4..da8c95ba5 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -26,59 +27,47 @@ using boost::system::error_code; namespace { -void receives_push_waiting_resps() +// async_receive is outstanding when a push is received +void receive_waiting_for_push() { + net::io_context ioc; + connection conn{ioc}; + request req1; - req1.push("HELLO", 3); req1.push("PING", "Message1"); + req1.push("SUBSCRIBE", "channel"); request req2; - req2.push("SUBSCRIBE", "channel"); - - request req3; - req3.push("PING", "Message2"); - req3.push("QUIT"); - - net::io_context ioc; - - auto conn = std::make_shared(ioc); - - bool push_received = false, c1_called = false, c2_called = false, c3_called = false; - - auto c3 = [&](error_code ec, std::size_t) { - c3_called = true; - std::cout << "c3: " << ec.message() << std::endl; - }; + req2.push("PING", "Message2"); - auto c2 = [&, conn](error_code ec, std::size_t) { - c2_called = true; - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req3, ignore, c3); - }; + bool run_finished = false, push_received = false, exec1_finished = false, exec2_finished = false; - auto c1 = [&, conn](error_code ec, std::size_t) { - c1_called = true; + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c2); - }; - - conn->async_exec(req1, ignore, c1); - - run(conn, make_test_config(), {}); + exec1_finished = true; + }); - conn->async_receive([&, conn](error_code ec, std::size_t) { - std::cout << "async_receive" << std::endl; + conn.async_receive([&](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, error_code()); push_received = true; - conn->cancel(); + conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) { + BOOST_TEST_EQ(ec2, error_code()); + exec2_finished = true; + conn.cancel(); + }); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; }); ioc.run_for(test_timeout); BOOST_TEST(push_received); - BOOST_TEST(c1_called); - BOOST_TEST(c2_called); - BOOST_TEST(c3_called); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); + BOOST_TEST(run_finished); } void push_received1() @@ -382,7 +371,7 @@ void test_unsubscribe() int main() { - receives_push_waiting_resps(); + receive_waiting_for_push(); push_received1(); push_filtered_out(); test_push_adapter(); From 835eb2390320716e5ee1c60e28a578899f34768e Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 12:27:46 +0100 Subject: [PATCH 04/23] validate push response --- test/test_conn_push.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index da8c95ba5..e36064866 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -30,8 +31,10 @@ namespace { // async_receive is outstanding when a push is received void receive_waiting_for_push() { + resp3::flat_tree resp; net::io_context ioc; connection conn{ioc}; + conn.set_receive_response(resp); request req1; req1.push("PING", "Message1"); @@ -42,19 +45,23 @@ void receive_waiting_for_push() bool run_finished = false, push_received = false, exec1_finished = false, exec2_finished = false; + auto on_exec2 = [&](error_code ec2, std::size_t) { + BOOST_TEST_EQ(ec2, error_code()); + exec2_finished = true; + conn.cancel(); + }; + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, error_code()); exec1_finished = true; }); - conn.async_receive([&](error_code ec, std::size_t) { + conn.async_receive([&](error_code ec, std::size_t num_pushes) { BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_GT(num_pushes, 0u); + BOOST_TEST_GT(resp.get_view().size(), 0u); push_received = true; - conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) { - BOOST_TEST_EQ(ec2, error_code()); - exec2_finished = true; - conn.cancel(); - }); + conn.async_exec(req2, ignore, on_exec2); }); conn.async_run(make_test_config(), [&](error_code ec) { From 76bbd293e08ecb4115c21e4874826e890f78d7d7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 12:33:14 +0100 Subject: [PATCH 05/23] refactor push available test 1 --- test/test_conn_push.cpp | 65 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index e36064866..db558a4d0 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -29,7 +29,7 @@ using boost::system::error_code; namespace { // async_receive is outstanding when a push is received -void receive_waiting_for_push() +void test_async_receive_waiting_for_push() { resp3::flat_tree resp; net::io_context ioc; @@ -77,15 +77,63 @@ void receive_waiting_for_push() BOOST_TEST(run_finished); } -void push_received1() +// A push is already available when async_receive is called +void test_async_receive_push_available() { net::io_context ioc; + connection conn{ioc}; + + request req; + req.push("SUBSCRIBE", "channel1"); + req.push("SUBSCRIBE", "channel2"); + + bool push_received = false, exec_finished = false, run_finished = false; + + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + std::cout << "async_exec" << std::endl; + BOOST_TEST_EQ(ec, error_code()); + }); + + conn.async_receive([&](error_code ec, std::size_t) { + push_received = true; + std::cout << "(1) async_receive" << std::endl; + + BOOST_TEST_EQ(ec, error_code()); + + // Receives the second push synchronously. + error_code ec2; + std::size_t res = 0; + res = conn.receive(ec2); + BOOST_TEST_EQ(ec2, error_code()); + BOOST_TEST_NE(res, 0u); + + // Tries to receive a third push synchronously. + ec2 = {}; + res = conn.receive(ec2); + BOOST_TEST_EQ(ec2, error::sync_receive_push_failed); + + conn.cancel(); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(push_received); + BOOST_TEST(run_finished); +} + +void async_receive_push_available2() +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); - // Trick: Uses SUBSCRIBE because this command has no response or - // better said, its response is a server push, which is what we - // want to test. We send two because we want to test both - // async_receive and receive. request req; req.push("SUBSCRIBE", "channel1"); req.push("SUBSCRIBE", "channel2"); @@ -378,8 +426,9 @@ void test_unsubscribe() int main() { - receive_waiting_for_push(); - push_received1(); + test_async_receive_waiting_for_push(); + test_async_receive_push_available(); + async_receive_push_available2(); push_filtered_out(); test_push_adapter(); many_subscribers(); From beac60e62895f249f94d359b110dd89d7ad1b3f9 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 12:41:41 +0100 Subject: [PATCH 06/23] make test stronger --- test/test_conn_push.cpp | 46 ++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index db558a4d0..6ba169cda 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -38,7 +38,7 @@ void test_async_receive_waiting_for_push() request req1; req1.push("PING", "Message1"); - req1.push("SUBSCRIBE", "channel"); + req1.push("SUBSCRIBE", "test_async_receive_waiting_for_push"); request req2; req2.push("PING", "Message2"); @@ -56,10 +56,9 @@ void test_async_receive_waiting_for_push() exec1_finished = true; }); - conn.async_receive([&](error_code ec, std::size_t num_pushes) { + conn.async_receive([&](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_GT(num_pushes, 0u); - BOOST_TEST_GT(resp.get_view().size(), 0u); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); push_received = true; conn.async_exec(req2, ignore, on_exec2); }); @@ -82,38 +81,29 @@ void test_async_receive_push_available() { net::io_context ioc; connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); + // SUBSCRIBE doesn't have a response, but causes a push to be delivered. + // Add a PING so the overall request has a response. + // This ensures that when async_exec completes, the push has been delivered request req; - req.push("SUBSCRIBE", "channel1"); - req.push("SUBSCRIBE", "channel2"); + req.push("SUBSCRIBE", "test_async_receive_push_available"); + req.push("PING", "message"); bool push_received = false, exec_finished = false, run_finished = false; - conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { - exec_finished = true; - std::cout << "async_exec" << std::endl; - BOOST_TEST_EQ(ec, error_code()); - }); - - conn.async_receive([&](error_code ec, std::size_t) { + auto on_receive = [&](error_code ec, std::size_t) { push_received = true; - std::cout << "(1) async_receive" << std::endl; - BOOST_TEST_EQ(ec, error_code()); - - // Receives the second push synchronously. - error_code ec2; - std::size_t res = 0; - res = conn.receive(ec2); - BOOST_TEST_EQ(ec2, error_code()); - BOOST_TEST_NE(res, 0u); - - // Tries to receive a third push synchronously. - ec2 = {}; - res = conn.receive(ec2); - BOOST_TEST_EQ(ec2, error::sync_receive_push_failed); - + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); conn.cancel(); + }; + + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST_EQ(ec, error_code()); + conn.async_receive(on_receive); }); conn.async_run(make_test_config(), [&](error_code ec) { From 83ae312d27f9c7c8b7b66511f1c33f19d2a377b8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 12:53:26 +0100 Subject: [PATCH 07/23] sync receive --- test/test_conn_push.cpp | 66 ++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 6ba169cda..25e0cc69f 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -18,7 +18,6 @@ #include "common.hpp" #include -#include #include namespace net = boost::asio; @@ -118,50 +117,63 @@ void test_async_receive_push_available() BOOST_TEST(run_finished); } -void async_receive_push_available2() +// Synchronous receive can be used to try to read a message +void test_sync_receive() { net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); - auto conn = std::make_shared(ioc); - + // Subscribing to 2 channels causes 2 pushes to be delivered. + // Adding a PING guarantees that after exec finishes, the push has been read request req; - req.push("SUBSCRIBE", "channel1"); - req.push("SUBSCRIBE", "channel2"); + req.push("SUBSCRIBE", "test_sync_receive_channel1"); + req.push("SUBSCRIBE", "test_sync_receive_channel2"); + req.push("PING", "message"); - bool push_received = false, exec_finished = false; + bool exec_finished = false, run_finished = false; - conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { exec_finished = true; - std::cout << "async_exec" << std::endl; BOOST_TEST_EQ(ec, error_code()); - }); - conn->async_receive([&, conn](error_code ec, std::size_t) { - push_received = true; - std::cout << "(1) async_receive" << std::endl; + // At this point, the receive response contains all the pushes + BOOST_TEST_EQ(resp.get_total_msgs(), 2u); + // Receive the 1st push synchronously + std::size_t push_bytes = conn.receive(ec); BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_GT(push_bytes, 0u); - // Receives the second push synchronously. - error_code ec2; - std::size_t res = 0; - res = conn->receive(ec2); - BOOST_TEST_EQ(ec2, error_code()); - BOOST_TEST_NE(res, 0u); + // Receive the 2nd push synchronously + push_bytes = conn.receive(ec); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_GT(push_bytes, 0u); - // Tries to receive a third push synchronously. - ec2 = {}; - res = conn->receive(ec2); - BOOST_TEST_EQ(ec2, error::sync_receive_push_failed); + // There are no more pushes. Trying to receive one more fails + push_bytes = conn.receive(ec); + BOOST_TEST_EQ(ec, error::sync_receive_push_failed); + BOOST_TEST_EQ(push_bytes, 0u); - conn->cancel(); + conn.cancel(); }); - run(conn); + conn.async_run(make_test_config(), [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + // Trying to receive a push before one is received fails + error_code ec; + std::size_t push_bytes = conn.receive(ec); + BOOST_TEST_EQ(ec, error::sync_receive_push_failed); + BOOST_TEST_EQ(push_bytes, 0u); + ioc.run_for(test_timeout); BOOST_TEST(exec_finished); - BOOST_TEST(push_received); + BOOST_TEST(run_finished); } void push_filtered_out() @@ -418,7 +430,7 @@ int main() { test_async_receive_waiting_for_push(); test_async_receive_push_available(); - async_receive_push_available2(); + test_sync_receive(); push_filtered_out(); test_push_adapter(); many_subscribers(); From bcc2c9176fdd18d1d184456a0d217ff83e50039d Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 13:08:52 +0100 Subject: [PATCH 08/23] push adapter error --- test/test_conn_push.cpp | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 25e0cc69f..9b336eee2 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -212,6 +212,7 @@ void push_filtered_out() BOOST_TEST_EQ(std::get<2>(resp).value(), "OK"); } +// An adapter that always errors struct response_error_tag { }; response_error_tag error_tag_obj; @@ -223,34 +224,38 @@ struct response_error_adapter { auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } -void test_push_adapter() +// If the push adapter returns an error, the connection is torn down +// (a reconnection would be triggered) +// TODO: this test should be in push2 +void test_push_adapter_error() { net::io_context ioc; - auto conn = std::make_shared(ioc); + connection conn{ioc}; + conn.set_receive_response(error_tag_obj); request req; - req.push("HELLO", 3); req.push("PING"); req.push("SUBSCRIBE", "channel"); req.push("PING"); - conn->set_receive_response(error_tag_obj); - bool push_received = false, exec_finished = false, run_finished = false; - conn->async_receive([&, conn](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, boost::asio::experimental::error::channel_cancelled); + // async_receive is cancelled every reconnection cycle + conn.async_receive([&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); push_received = true; }); - conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, boost::system::errc::errc_t::operation_canceled); + // The request is cancelled because the PING response isn't processed + // by the time the error is generated + conn.async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); exec_finished = true; }); auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 0s; - conn->async_run(cfg, [&run_finished](error_code ec) { + cfg.reconnect_wait_interval = 0s; // so we can validate the generated error + conn.async_run(cfg, [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; }); @@ -259,11 +264,11 @@ void test_push_adapter() BOOST_TEST(push_received); BOOST_TEST(exec_finished); BOOST_TEST(run_finished); - - // TODO: Reset the ioc reconnect and send a quit to ensure - // reconnection is possible after an error. } +// TODO: push adapter errors trigger a reconnection +// TODO: async_receive is cancelled when a reconnection happens + void launch_push_consumer(std::shared_ptr conn) { conn->async_receive([conn](error_code ec, std::size_t) { @@ -431,8 +436,8 @@ int main() test_async_receive_waiting_for_push(); test_async_receive_push_available(); test_sync_receive(); + test_push_adapter_error(); push_filtered_out(); - test_push_adapter(); many_subscribers(); test_unsubscribe(); From 4d06064bf23e2a16dba5610c6f0c90cfa1e988a8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 13:15:33 +0100 Subject: [PATCH 09/23] test_exec_push_interleaved --- test/test_conn_push.cpp | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 9b336eee2..648a44601 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -176,40 +176,48 @@ void test_sync_receive() BOOST_TEST(run_finished); } -void push_filtered_out() +// A push may be interleaved between regular responses. +// It is handed to the receive adapter (filtered out). +void test_exec_push_interleaved() { net::io_context ioc; - auto conn = std::make_shared(ioc); + connection conn{ioc}; + resp3::flat_tree receive_resp; + conn.set_receive_response(receive_resp); request req; - req.push("HELLO", 3); - req.push("PING"); - req.push("SUBSCRIBE", "channel"); - req.push("QUIT"); + req.push("PING", "msg1"); + req.push("SUBSCRIBE", "test_exec_push_interleaved"); + req.push("PING", "msg2"); - response resp; + response resp; - bool exec_finished = false, push_received = false; + bool exec_finished = false, push_received = false, run_finished = false; - conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { + conn.async_exec(req, resp, [&](error_code ec, std::size_t) { exec_finished = true; BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(std::get<0>(resp).value(), "msg1"); + BOOST_TEST_EQ(std::get<1>(resp).value(), "msg2"); + conn.cancel(); }); - conn->async_receive([&, conn](error_code ec, std::size_t) { + conn.async_receive([&](error_code ec, std::size_t) { push_received = true; BOOST_TEST_EQ(ec, error_code()); - conn->cancel(operation::reconnection); + BOOST_TEST_EQ(receive_resp.get_total_msgs(), 1u); }); - run(conn); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); ioc.run_for(test_timeout); + BOOST_TEST(exec_finished); BOOST_TEST(push_received); - - BOOST_TEST_EQ(std::get<1>(resp).value(), "PONG"); - BOOST_TEST_EQ(std::get<2>(resp).value(), "OK"); + BOOST_TEST(run_finished); } // An adapter that always errors @@ -436,8 +444,8 @@ int main() test_async_receive_waiting_for_push(); test_async_receive_push_available(); test_sync_receive(); + test_exec_push_interleaved(); test_push_adapter_error(); - push_filtered_out(); many_subscribers(); test_unsubscribe(); From 878d1a3afd70426c18c1a1d1bc8e61adeaddc7e8 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 13:29:01 +0100 Subject: [PATCH 10/23] consecutive receives --- test/test_conn_push.cpp | 184 ++++++++++++++++++++-------------------- 1 file changed, 92 insertions(+), 92 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 648a44601..27358202b 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include "common.hpp" @@ -277,98 +278,97 @@ void test_push_adapter_error() // TODO: push adapter errors trigger a reconnection // TODO: async_receive is cancelled when a reconnection happens -void launch_push_consumer(std::shared_ptr conn) -{ - conn->async_receive([conn](error_code ec, std::size_t) { - if (ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - return; - } - launch_push_consumer(conn); - }); -} - -void many_subscribers() -{ - request req0; - req0.get_config().cancel_on_connection_lost = false; - req0.push("HELLO", 3); - - request req1; - req1.get_config().cancel_on_connection_lost = false; - req1.push("PING", "Message1"); - - request req2; - req2.get_config().cancel_on_connection_lost = false; - req2.push("SUBSCRIBE", "channel"); - - request req3; - req3.get_config().cancel_on_connection_lost = false; - req3.push("QUIT"); - +// After an async_receive operation finishes, another one can be issued +struct test_consecutive_receives { net::io_context ioc; - auto conn = std::make_shared(ioc); - - bool finished = false; - - auto c11 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->cancel(operation::reconnection); - finished = true; - }; - auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req3, ignore, c11); - }; - auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c10); - }; - auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c9); - }; - auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c8); - }; - auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c7); - }; - auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c6); - }; - auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c5); - }; - auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c4); - }; - auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c3); - }; - auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c2); - }; - auto c0 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c1); - }; - - conn->async_exec(req0, ignore, c0); - launch_push_consumer(conn); - - run(conn, make_test_config(), {}); - - ioc.run_for(test_timeout); - BOOST_TEST(finished); -} + connection conn{ioc}; + resp3::flat_tree resp; + bool push_consumer_finished{false}; + + void launch_push_consumer() + { + conn.async_receive([this](error_code ec, std::size_t) { + if (ec) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + push_consumer_finished = true; + resp.clear(); + return; + } + launch_push_consumer(); + }); + } + + void run() + { + conn.set_receive_response(resp); + + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); + + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); + + bool exec_finished = false, run_finished = false; + + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + conn.cancel(); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c2); + }; + + conn.async_exec(req1, ignore, c1); + launch_push_consumer(); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + BOOST_TEST(push_consumer_finished); + } +}; void test_unsubscribe() { @@ -446,7 +446,7 @@ int main() test_sync_receive(); test_exec_push_interleaved(); test_push_adapter_error(); - many_subscribers(); + test_consecutive_receives{}.run(); test_unsubscribe(); return boost::report_errors(); From 0db98bc51d0d7d561010eebc26abc07d9d620248 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 13:30:11 +0100 Subject: [PATCH 11/23] remove duplicate test_unsubscribe --- test/test_conn_push.cpp | 68 ----------------------------------------- 1 file changed, 68 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 27358202b..6e7ca84a2 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -370,73 +370,6 @@ struct test_consecutive_receives { } }; -void test_unsubscribe() -{ - net::io_context ioc; - connection conn{ioc}; - - // Subscribe to 3 channels and 2 patterns. Use CLIENT INFO to verify this took effect - request req_subscribe; - req_subscribe.push("SUBSCRIBE", "ch1", "ch2", "ch3"); - req_subscribe.push("PSUBSCRIBE", "ch1*", "ch2*"); - req_subscribe.push("CLIENT", "INFO"); - - // Then, unsubscribe from some of them, and verify again - request req_unsubscribe; - req_unsubscribe.push("UNSUBSCRIBE", "ch1"); - req_unsubscribe.push("PUNSUBSCRIBE", "ch2*"); - req_unsubscribe.push("CLIENT", "INFO"); - - // Finally, ping to verify that the connection is still usable - request req_ping; - req_ping.push("PING", "test_unsubscribe"); - - response resp_subscribe, resp_unsubscribe, resp_ping; - - bool subscribe_finished = false, unsubscribe_finished = false, ping_finished = false, - run_finished = false; - - auto on_ping = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - ping_finished = true; - BOOST_TEST(std::get<0>(resp_ping).has_value()); - BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe"); - conn.cancel(); - }; - - auto on_unsubscribe = [&](error_code ec, std::size_t) { - unsubscribe_finished = true; - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST(std::get<0>(resp_unsubscribe).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2"); - BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1"); - conn.async_exec(req_ping, resp_ping, on_ping); - }; - - auto on_subscribe = [&](error_code ec, std::size_t) { - subscribe_finished = true; - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST(std::get<0>(resp_subscribe).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3"); - BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2"); - conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe); - }; - - conn.async_exec(req_subscribe, resp_subscribe, on_subscribe); - - conn.async_run(make_test_config(), [&run_finished](error_code ec) { - BOOST_TEST_EQ(ec, net::error::operation_aborted); - run_finished = true; - }); - - ioc.run_for(test_timeout); - - BOOST_TEST(subscribe_finished); - BOOST_TEST(unsubscribe_finished); - BOOST_TEST(ping_finished); - BOOST_TEST(run_finished); -} - } // namespace int main() @@ -447,7 +380,6 @@ int main() test_exec_push_interleaved(); test_push_adapter_error(); test_consecutive_receives{}.run(); - test_unsubscribe(); return boost::report_errors(); } From 8dc64ccbeca72b2fb8f12c8e101e5eeb95714f97 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 13:36:40 +0100 Subject: [PATCH 12/23] push adapter error reconnection --- test/test_conn_push.cpp | 59 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 6e7ca84a2..039baae97 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #include #include @@ -234,7 +235,6 @@ struct response_error_adapter { auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } // If the push adapter returns an error, the connection is torn down -// (a reconnection would be triggered) // TODO: this test should be in push2 void test_push_adapter_error() { @@ -275,7 +275,61 @@ void test_push_adapter_error() BOOST_TEST(run_finished); } -// TODO: push adapter errors trigger a reconnection +// A push response error triggers a reconnection +// TODO: this test should be in push2 +void test_push_adapter_error_reconnection() +{ + net::io_context ioc; + connection conn{ioc}; + conn.set_receive_response(error_tag_obj); + + request req; + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("PING"); + + request req2; + req2.push("PING", "msg2"); + req2.get_config().cancel_if_unresponded = false; + + response resp; + + bool push_received = false, exec_finished = false, run_finished = false; + + // async_receive is cancelled every reconnection cycle + conn.async_receive([&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + push_received = true; + }); + + auto on_exec2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(std::get<0>(resp).value(), "msg2"); + exec_finished = true; + conn.cancel(); + }; + + // The request is cancelled because the PING response isn't processed + // by the time the error is generated + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + conn.async_exec(req2, resp, on_exec2); + }); + + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 50ms; // make the test run faster + conn.async_run(cfg, [&run_finished](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(push_received); + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); +} + // TODO: async_receive is cancelled when a reconnection happens // After an async_receive operation finishes, another one can be issued @@ -379,6 +433,7 @@ int main() test_sync_receive(); test_exec_push_interleaved(); test_push_adapter_error(); + test_push_adapter_error_reconnection(); test_consecutive_receives{}.run(); return boost::report_errors(); From c98ff42ef91aeae944112321f3913510d2ba7273 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:05:22 +0100 Subject: [PATCH 13/23] async_receive cancel on reconnection --- test/test_conn_push.cpp | 101 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 2 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 039baae97..12ba77ca5 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -222,6 +222,104 @@ void test_exec_push_interleaved() BOOST_TEST(run_finished); } +// async_receive is cancelled every time a reconnection happens, +// so we can re-establish subscriptions +struct test_async_receive_cancelled_on_reconnection { + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp{}; + request req_subscribe{}, req_quit{}; + bool receive_finished = false, quit_finished = false; + + // Subscribe to a channel. This will cause a push to be received + void start_subscribe1() + { + conn.async_exec(req_subscribe, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive1(); + }); + } + + // Receive the push triggered by the subscribe + void start_receive1() + { + conn.async_receive([this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + resp.clear(); + + // In parallel, trigger a reconnection and start a receive operation + start_receive_reconnection(); + start_quit(); + }); + } + + // The next receive operation will be cancelled by the reconnection + void start_receive_reconnection() + { + conn.async_receive([this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::experimental::channel_errc::channel_cancelled); + BOOST_TEST_EQ(resp.get_total_msgs(), 0u); + start_subscribe2(); + }); + } + + // Trigger a reconnection. This is a "leaf" operation + void start_quit() + { + conn.async_exec(req_quit, ignore, [this](error_code, std::size_t) { + quit_finished = true; + }); + } + + // Resubscribe after the reconnection + void start_subscribe2() + { + conn.async_exec(req_subscribe, ignore, [this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + start_receive2(); + }); + } + + // Receive the push triggered by the 2nd subscribe + void start_receive2() + { + conn.async_receive([this](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + receive_finished = true; + conn.cancel(); + }); + } + + void run() + { + req_subscribe.push("SUBSCRIBE", "test_async_receive_cancelled_on_reconnection"); + req_subscribe.push("PING"); + + req_quit.push("QUIT"); + + conn.set_receive_response(resp); + + bool run_finished = false; + + start_subscribe1(); + + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 50ms; // make the test run faster + conn.async_run(cfg, [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(run_finished); + BOOST_TEST(receive_finished); + BOOST_TEST(quit_finished); + } +}; + // An adapter that always errors struct response_error_tag { }; response_error_tag error_tag_obj; @@ -330,8 +428,6 @@ void test_push_adapter_error_reconnection() BOOST_TEST(run_finished); } -// TODO: async_receive is cancelled when a reconnection happens - // After an async_receive operation finishes, another one can be issued struct test_consecutive_receives { net::io_context ioc; @@ -432,6 +528,7 @@ int main() test_async_receive_push_available(); test_sync_receive(); test_exec_push_interleaved(); + test_async_receive_cancelled_on_reconnection{}.run(); test_push_adapter_error(); test_push_adapter_error_reconnection(); test_consecutive_receives{}.run(); From f37dac4ab38ab5b2ba9909b8c50428cbaaf015ab Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:11:21 +0100 Subject: [PATCH 14/23] migrate push2 to lightweight test --- test/test_conn_push2.cpp | 150 ++++++++++++++++++++++----------------- 1 file changed, 83 insertions(+), 67 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 9cf638e92..e5688fdc1 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -11,19 +11,17 @@ #include #include +#include #include -#include -#include -#include - -#define BOOST_TEST_MODULE conn_push -#include - #include "common.hpp" #include #include +#include +#include +#include +#include namespace net = boost::asio; namespace redis = boost::redis; @@ -44,7 +42,7 @@ using namespace std::chrono_literals; namespace { -BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) +void test_receives_push_waiting_resps() { request req1; req1.push("HELLO", 3); @@ -70,13 +68,13 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) auto c2 = [&, conn](error_code ec, std::size_t) { c2_called = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req3, ignore, c3); }; auto c1 = [&, conn](error_code ec, std::size_t) { c1_called = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c2); }; @@ -86,7 +84,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) conn->async_receive2([&, conn](error_code ec) { std::cout << "async_receive2" << std::endl; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); push_received = true; conn->cancel(); }); @@ -99,7 +97,7 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) BOOST_TEST(c3_called); } -BOOST_AUTO_TEST_CASE(push_received1) +void test_push_received1() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -119,16 +117,15 @@ BOOST_AUTO_TEST_CASE(push_received1) conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { exec_finished = true; std::cout << "async_exec" << std::endl; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); }); conn->async_receive2([&, conn](error_code ec) { push_received = true; std::cout << "async_receive2" << std::endl; - BOOST_TEST(ec == error_code()); - - BOOST_CHECK_EQUAL(resp.get_total_msgs(), 2u); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 2u); conn->cancel(); }); @@ -140,7 +137,7 @@ BOOST_AUTO_TEST_CASE(push_received1) BOOST_TEST(push_received); } -BOOST_AUTO_TEST_CASE(push_filtered_out) +void test_push_filtered_out() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -157,12 +154,12 @@ BOOST_AUTO_TEST_CASE(push_filtered_out) conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { exec_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); }); conn->async_receive2([&, conn](error_code ec) { push_received = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->cancel(operation::reconnection); }); @@ -172,8 +169,8 @@ BOOST_AUTO_TEST_CASE(push_filtered_out) BOOST_TEST(exec_finished); BOOST_TEST(push_received); - BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG"); - BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK"); + BOOST_TEST_EQ(std::get<1>(resp).value(), "PONG"); + BOOST_TEST_EQ(std::get<2>(resp).value(), "OK"); } struct response_error_tag { }; @@ -193,7 +190,7 @@ struct response_error_adapter { auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } -BOOST_AUTO_TEST_CASE(test_push_adapter) +void test_test_push_adapter() { net::io_context ioc; auto conn = std::make_shared(ioc); @@ -209,19 +206,19 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) bool push_received = false, exec_finished = false, run_finished = false; conn->async_receive2([&, conn](error_code ec) { - BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, boost::asio::experimental::error::channel_cancelled); push_received = true; }); conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_TEST_EQ(ec, boost::system::errc::errc_t::operation_canceled); exec_finished = true; }); auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; conn->async_run(cfg, [&run_finished](error_code ec) { - BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); + BOOST_TEST_EQ(ec, redis::error::incompatible_size); run_finished = true; }); @@ -238,14 +235,14 @@ void launch_push_consumer(std::shared_ptr conn) { conn->async_receive2([conn](error_code ec) { if (ec) { - BOOST_TEST(ec == net::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); return; } launch_push_consumer(conn); }); } -BOOST_AUTO_TEST_CASE(many_subscribers) +void test_many_subscribers() { request req0; req0.get_config().cancel_on_connection_lost = false; @@ -269,52 +266,52 @@ BOOST_AUTO_TEST_CASE(many_subscribers) bool finished = false; auto c11 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->cancel(operation::reconnection); finished = true; }; auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req3, ignore, c11); }; auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c10); }; auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c9); }; auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c8); }; auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c7); }; auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c6); }; auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c5); }; auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c4); }; auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c3); }; auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req2, ignore, c2); }; auto c0 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); conn->async_exec(req1, ignore, c1); }; @@ -327,7 +324,7 @@ BOOST_AUTO_TEST_CASE(many_subscribers) BOOST_TEST(finished); } -BOOST_AUTO_TEST_CASE(test_unsubscribe) +void test_test_unsubscribe() { net::io_context ioc; connection conn{ioc}; @@ -354,35 +351,35 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) run_finished = false; auto on_ping = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); ping_finished = true; BOOST_TEST(std::get<0>(resp_ping).has_value()); - BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe"); + BOOST_TEST_EQ(std::get<0>(resp_ping).value(), "test_unsubscribe"); conn.cancel(); }; auto on_unsubscribe = [&](error_code ec, std::size_t) { unsubscribe_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(std::get<0>(resp_unsubscribe).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2"); - BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub"), "2"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub"), "1"); conn.async_exec(req_ping, resp_ping, on_ping); }; auto on_subscribe = [&](error_code ec, std::size_t) { subscribe_finished = true; - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(std::get<0>(resp_subscribe).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3"); - BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_subscribe).value(), "sub"), "3"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_subscribe).value(), "psub"), "2"); conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe); }; conn.async_exec(req_subscribe, resp_subscribe, on_subscribe); conn.async_run(make_test_config(), [&run_finished](error_code ec) { - BOOST_TEST(ec == net::error::operation_aborted); + BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); @@ -394,7 +391,7 @@ BOOST_AUTO_TEST_CASE(test_unsubscribe) BOOST_TEST(run_finished); } -class test_pubsub_state_restoration_ { +class test_pubsub_state_restoration { net::io_context ioc; connection conn{ioc}; request req; @@ -408,13 +405,13 @@ class test_pubsub_state_restoration_ { std::set seen_channels, seen_patterns; for (auto it = resp_push.get_view().begin(); it != resp_push.get_view().end();) { // The root element should be a push - BOOST_TEST_REQUIRE(it->data_type == type::push); - BOOST_TEST_REQUIRE(it->aggregate_size >= 2u); - BOOST_TEST_REQUIRE((++it != resp_push.get_view().end())); + BOOST_TEST_EQ(it->data_type, type::push); + BOOST_TEST_GE(it->aggregate_size, 2u); + BOOST_TEST(++it != resp_push.get_view().end()); // The next element should be the message type std::string_view msg_type = it->value; - BOOST_TEST_REQUIRE((++it != resp_push.get_view().end())); + BOOST_TEST(++it != resp_push.get_view().end()); // The next element is the channel or pattern if (msg_type == "subscribe") @@ -430,8 +427,16 @@ class test_pubsub_state_restoration_ { const std::string_view expected_channels[] = {"ch1", "ch3", "ch5"}; const std::string_view expected_patterns[] = {"ch1*", "ch3*", "ch4*", "ch8*"}; - BOOST_TEST(seen_channels == expected_channels, boost::test_tools::per_element()); - BOOST_TEST(seen_patterns == expected_patterns, boost::test_tools::per_element()); + BOOST_TEST_ALL_EQ( + seen_channels.begin(), + seen_channels.end(), + std::begin(expected_channels), + std::end(expected_channels)); + BOOST_TEST_ALL_EQ( + seen_patterns.begin(), + seen_patterns.end(), + std::begin(expected_patterns), + std::end(expected_patterns)); } void sub1() @@ -441,7 +446,7 @@ class test_pubsub_state_restoration_ { req.subscribe({"ch1", "ch2", "ch3"}); // active: 1, 2, 3 req.psubscribe({"ch1*", "ch2*", "ch3*", "ch4*"}); // active: 1, 2, 3, 4 conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); unsub(); }); } @@ -454,7 +459,7 @@ class test_pubsub_state_restoration_ { req.unsubscribe({"ch2", "ch1", "ch5"}); // active: 3 req.punsubscribe({"ch2*", "ch4*", "ch9*"}); // active: 1, 3 conn.async_exec(req, ignore, [this](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); sub2(); }); } @@ -476,12 +481,12 @@ class test_pubsub_state_restoration_ { req.push("CLIENT", "INFO"); conn.async_exec(req, resp_str, [this](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); // We are subscribed to 4 channels and 5 patterns BOOST_TEST(std::get<0>(resp_str).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_str).value(), "sub") == "4"); - BOOST_TEST(find_client_info(std::get<0>(resp_str).value(), "psub") == "5"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_str).value(), "sub"), "4"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_str).value(), "psub"), "5"); resp_push.clear(); @@ -507,12 +512,12 @@ class test_pubsub_state_restoration_ { req.get_config().cancel_if_unresponded = false; conn.async_exec(req, resp_str, [this](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); + BOOST_TEST_EQ(ec, error_code()); // We are subscribed to 3 channels and 4 patterns (1 of each didn't survive reconnection) BOOST_TEST(std::get<0>(resp_str).has_value()); - BOOST_TEST(find_client_info(std::get<0>(resp_str).value(), "sub") == "3"); - BOOST_TEST(find_client_info(std::get<0>(resp_str).value(), "psub") == "4"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_str).value(), "sub"), "3"); + BOOST_TEST_EQ(find_client_info(std::get<0>(resp_str).value(), "psub"), "4"); // We have received pushes confirming it check_subscriptions(); @@ -533,7 +538,7 @@ class test_pubsub_state_restoration_ { // Start running bool run_finished = false; conn.async_run(make_test_config(), [&run_finished](error_code ec) { - BOOST_TEST(ec == net::error::operation_aborted); + BOOST_TEST_EQ(ec, net::error::operation_aborted); run_finished = true; }); @@ -545,6 +550,17 @@ class test_pubsub_state_restoration_ { } }; -BOOST_AUTO_TEST_CASE(test_pubsub_state_restoration) { test_pubsub_state_restoration_().run(); } - } // namespace + +int main() +{ + test_receives_push_waiting_resps(); + test_push_received1(); + test_push_filtered_out(); + test_test_push_adapter(); + test_many_subscribers(); + test_test_unsubscribe(); + test_pubsub_state_restoration{}.run(); + + return boost::report_errors(); +} From 39437c26585e8d5544482bffb03dafefac50a80e Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:12:58 +0100 Subject: [PATCH 15/23] purpose comment --- test/test_conn_push.cpp | 3 +++ test/test_conn_push2.cpp | 3 +++ 2 files changed, 6 insertions(+) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 12ba77ca5..190e74ace 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -27,6 +27,9 @@ using namespace boost::redis; using namespace std::chrono_literals; using boost::system::error_code; +// Focuses on the deprecated async_receive and receive +// functions. test_conn_push2 covers the newer receive functionality. + namespace { // async_receive is outstanding when a push is received diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index e5688fdc1..1a9ff53b1 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -40,6 +40,9 @@ using boost::redis::resp3::node_view; using boost::redis::resp3::type; using namespace std::chrono_literals; +// Covers all receive functionality except for the deprecated +// async_receive and receive functions. + namespace { void test_receives_push_waiting_resps() From 4f8bf6621713343a2c8f1123c2aff1bfd186ffc0 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:14:43 +0100 Subject: [PATCH 16/23] using cleanup --- test/test_conn_push2.cpp | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 1a9ff53b1..681aae4bc 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -10,9 +10,9 @@ #include #include +#include #include #include -#include #include "common.hpp" @@ -24,21 +24,12 @@ #include namespace net = boost::asio; -namespace redis = boost::redis; - -using boost::redis::operation; -using boost::redis::connection; -using boost::system::error_code; -using boost::redis::request; -using boost::redis::response; -using boost::redis::resp3::flat_tree; -using boost::redis::ignore; -using boost::redis::ignore_t; -using boost::system::error_code; -using boost::redis::logger; -using boost::redis::resp3::node_view; -using boost::redis::resp3::type; +using namespace boost::redis; using namespace std::chrono_literals; +using boost::system::error_code; +using resp3::flat_tree; +using resp3::node_view; +using resp3::type; // Covers all receive functionality except for the deprecated // async_receive and receive functions. @@ -209,19 +200,19 @@ void test_test_push_adapter() bool push_received = false, exec_finished = false, run_finished = false; conn->async_receive2([&, conn](error_code ec) { - BOOST_TEST_EQ(ec, boost::asio::experimental::error::channel_cancelled); + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); push_received = true; }); conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_TEST_EQ(ec, net::error::operation_aborted); exec_finished = true; }); auto cfg = make_test_config(); cfg.reconnect_wait_interval = 0s; conn->async_run(cfg, [&run_finished](error_code ec) { - BOOST_TEST_EQ(ec, redis::error::incompatible_size); + BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; }); From 63c40f61b42779dfc54856e0fe8ce01037c856be Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:16:36 +0100 Subject: [PATCH 17/23] async_receive2 running when push received --- test/test_conn_push2.cpp | 65 ++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 681aae4bc..3180ce6dd 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -36,59 +36,52 @@ using resp3::type; namespace { -void test_receives_push_waiting_resps() +// async_receive2 is outstanding when a push is received +void test_async_receive2_waiting_for_push() { + resp3::flat_tree resp; + net::io_context ioc; + connection conn{ioc}; + conn.set_receive_response(resp); + request req1; - req1.push("HELLO", 3); req1.push("PING", "Message1"); + req1.push("SUBSCRIBE", "test_async_receive_waiting_for_push"); request req2; - req2.push("SUBSCRIBE", "channel"); + req2.push("PING", "Message2"); - request req3; - req3.push("PING", "Message2"); - req3.push("QUIT"); - - net::io_context ioc; + bool run_finished = false, push_received = false, exec1_finished = false, exec2_finished = false; - auto conn = std::make_shared(ioc); - - bool push_received = false, c1_called = false, c2_called = false, c3_called = false; - - auto c3 = [&](error_code ec, std::size_t) { - c3_called = true; - std::cout << "c3: " << ec.message() << std::endl; - }; - - auto c2 = [&, conn](error_code ec, std::size_t) { - c2_called = true; - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req3, ignore, c3); + auto on_exec2 = [&](error_code ec2, std::size_t) { + BOOST_TEST_EQ(ec2, error_code()); + exec2_finished = true; + conn.cancel(); }; - auto c1 = [&, conn](error_code ec, std::size_t) { - c1_called = true; + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c2); - }; - - conn->async_exec(req1, ignore, c1); - - run(conn, make_test_config(), {}); + exec1_finished = true; + }); - conn->async_receive2([&, conn](error_code ec) { - std::cout << "async_receive2" << std::endl; + conn.async_receive2([&](error_code ec) { BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); push_received = true; - conn->cancel(); + conn.async_exec(req2, ignore, on_exec2); + }); + + conn.async_run(make_test_config(), [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; }); ioc.run_for(test_timeout); BOOST_TEST(push_received); - BOOST_TEST(c1_called); - BOOST_TEST(c2_called); - BOOST_TEST(c3_called); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); + BOOST_TEST(run_finished); } void test_push_received1() @@ -548,7 +541,7 @@ class test_pubsub_state_restoration { int main() { - test_receives_push_waiting_resps(); + test_async_receive2_waiting_for_push(); test_push_received1(); test_push_filtered_out(); test_test_push_adapter(); From 3abf62e0ebb55ab20295da658175fb9fee4134e7 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:18:39 +0100 Subject: [PATCH 18/23] test_async_receive2_push_available --- test/test_conn_push2.cpp | 48 +++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 3180ce6dd..82fded232 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -84,44 +84,46 @@ void test_async_receive2_waiting_for_push() BOOST_TEST(run_finished); } -void test_push_received1() +// A push is already available when async_receive2 is called +void test_async_receive2_push_available() { net::io_context ioc; - auto conn = std::make_shared(ioc); - - flat_tree resp; - conn->set_receive_response(resp); + connection conn{ioc}; + resp3::flat_tree resp; + conn.set_receive_response(resp); - // Trick: Uses SUBSCRIBE because this command has no response or - // better said, its response is a server push, which is what we - // want to test. + // SUBSCRIBE doesn't have a response, but causes a push to be delivered. + // Add a PING so the overall request has a response. + // This ensures that when async_exec completes, the push has been delivered request req; - req.push("SUBSCRIBE", "channel1"); - req.push("SUBSCRIBE", "channel2"); - - bool push_received = false, exec_finished = false; + req.push("SUBSCRIBE", "test_async_receive_push_available"); + req.push("PING", "message"); - conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { - exec_finished = true; - std::cout << "async_exec" << std::endl; - BOOST_TEST_EQ(ec, error_code()); - }); + bool push_received = false, exec_finished = false, run_finished = false; - conn->async_receive2([&, conn](error_code ec) { + auto on_receive = [&](error_code ec, std::size_t) { push_received = true; - std::cout << "async_receive2" << std::endl; + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(resp.get_total_msgs(), 1u); + conn.cancel(); + }; + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + exec_finished = true; BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(resp.get_total_msgs(), 2u); + conn.async_receive(on_receive); + }); - conn->cancel(); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); }); - run(conn); ioc.run_for(test_timeout); BOOST_TEST(exec_finished); BOOST_TEST(push_received); + BOOST_TEST(run_finished); } void test_push_filtered_out() @@ -542,7 +544,7 @@ class test_pubsub_state_restoration { int main() { test_async_receive2_waiting_for_push(); - test_push_received1(); + test_async_receive2_push_available(); test_push_filtered_out(); test_test_push_adapter(); test_many_subscribers(); From f02fe7ec2c52b08b1aaeb3668dbd50d2e0e531bb Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:20:27 +0100 Subject: [PATCH 19/23] push filtered out --- test/test_conn_push.cpp | 45 ---------------------------------------- test/test_conn_push2.cpp | 40 +++++++++++++++++++++-------------- 2 files changed, 24 insertions(+), 61 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 190e74ace..6cba97da9 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -181,50 +181,6 @@ void test_sync_receive() BOOST_TEST(run_finished); } -// A push may be interleaved between regular responses. -// It is handed to the receive adapter (filtered out). -void test_exec_push_interleaved() -{ - net::io_context ioc; - connection conn{ioc}; - resp3::flat_tree receive_resp; - conn.set_receive_response(receive_resp); - - request req; - req.push("PING", "msg1"); - req.push("SUBSCRIBE", "test_exec_push_interleaved"); - req.push("PING", "msg2"); - - response resp; - - bool exec_finished = false, push_received = false, run_finished = false; - - conn.async_exec(req, resp, [&](error_code ec, std::size_t) { - exec_finished = true; - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(std::get<0>(resp).value(), "msg1"); - BOOST_TEST_EQ(std::get<1>(resp).value(), "msg2"); - conn.cancel(); - }); - - conn.async_receive([&](error_code ec, std::size_t) { - push_received = true; - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(receive_resp.get_total_msgs(), 1u); - }); - - conn.async_run(make_test_config(), [&](error_code ec) { - run_finished = true; - BOOST_TEST_EQ(ec, net::error::operation_aborted); - }); - - ioc.run_for(test_timeout); - - BOOST_TEST(exec_finished); - BOOST_TEST(push_received); - BOOST_TEST(run_finished); -} - // async_receive is cancelled every time a reconnection happens, // so we can re-establish subscriptions struct test_async_receive_cancelled_on_reconnection { @@ -530,7 +486,6 @@ int main() test_async_receive_waiting_for_push(); test_async_receive_push_available(); test_sync_receive(); - test_exec_push_interleaved(); test_async_receive_cancelled_on_reconnection{}.run(); test_push_adapter_error(); test_push_adapter_error_reconnection(); diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 82fded232..5fddb8c4e 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -126,40 +126,48 @@ void test_async_receive2_push_available() BOOST_TEST(run_finished); } -void test_push_filtered_out() +// A push may be interleaved between regular responses. +// It is handed to the receive adapter (filtered out). +void test_exec_push_interleaved() { net::io_context ioc; - auto conn = std::make_shared(ioc); + connection conn{ioc}; + resp3::flat_tree receive_resp; + conn.set_receive_response(receive_resp); request req; - req.push("HELLO", 3); - req.push("PING"); - req.push("SUBSCRIBE", "channel"); - req.push("QUIT"); + req.push("PING", "msg1"); + req.push("SUBSCRIBE", "test_exec_push_interleaved"); + req.push("PING", "msg2"); - response resp; + response resp; - bool exec_finished = false, push_received = false; + bool exec_finished = false, push_received = false, run_finished = false; - conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { + conn.async_exec(req, resp, [&](error_code ec, std::size_t) { exec_finished = true; BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(std::get<0>(resp).value(), "msg1"); + BOOST_TEST_EQ(std::get<1>(resp).value(), "msg2"); + conn.cancel(); }); - conn->async_receive2([&, conn](error_code ec) { + conn.async_receive2([&](error_code ec) { push_received = true; BOOST_TEST_EQ(ec, error_code()); - conn->cancel(operation::reconnection); + BOOST_TEST_EQ(receive_resp.get_total_msgs(), 1u); }); - run(conn); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); ioc.run_for(test_timeout); + BOOST_TEST(exec_finished); BOOST_TEST(push_received); - - BOOST_TEST_EQ(std::get<1>(resp).value(), "PONG"); - BOOST_TEST_EQ(std::get<2>(resp).value(), "OK"); + BOOST_TEST(run_finished); } struct response_error_tag { }; @@ -545,7 +553,7 @@ int main() { test_async_receive2_waiting_for_push(); test_async_receive2_push_available(); - test_push_filtered_out(); + test_exec_push_interleaved(); test_test_push_adapter(); test_many_subscribers(); test_test_unsubscribe(); From fd433d962f370196beb50dd13d34ced2d437e3ef Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:24:55 +0100 Subject: [PATCH 20/23] adapter errors --- test/test_conn_push.cpp | 110 --------------------------------------- test/test_conn_push2.cpp | 87 ++++++++++++++++++++++++------- 2 files changed, 68 insertions(+), 129 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index 6cba97da9..bfd055003 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -279,114 +279,6 @@ struct test_async_receive_cancelled_on_reconnection { } }; -// An adapter that always errors -struct response_error_tag { }; -response_error_tag error_tag_obj; - -struct response_error_adapter { - void on_init() { } - void on_done() { } - void on_node(resp3::node_view const&, error_code& ec) { ec = error::incompatible_size; } -}; - -auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } - -// If the push adapter returns an error, the connection is torn down -// TODO: this test should be in push2 -void test_push_adapter_error() -{ - net::io_context ioc; - connection conn{ioc}; - conn.set_receive_response(error_tag_obj); - - request req; - req.push("PING"); - req.push("SUBSCRIBE", "channel"); - req.push("PING"); - - bool push_received = false, exec_finished = false, run_finished = false; - - // async_receive is cancelled every reconnection cycle - conn.async_receive([&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - push_received = true; - }); - - // The request is cancelled because the PING response isn't processed - // by the time the error is generated - conn.async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, net::error::operation_aborted); - exec_finished = true; - }); - - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 0s; // so we can validate the generated error - conn.async_run(cfg, [&run_finished](error_code ec) { - BOOST_TEST_EQ(ec, error::incompatible_size); - run_finished = true; - }); - - ioc.run_for(test_timeout); - BOOST_TEST(push_received); - BOOST_TEST(exec_finished); - BOOST_TEST(run_finished); -} - -// A push response error triggers a reconnection -// TODO: this test should be in push2 -void test_push_adapter_error_reconnection() -{ - net::io_context ioc; - connection conn{ioc}; - conn.set_receive_response(error_tag_obj); - - request req; - req.push("PING"); - req.push("SUBSCRIBE", "channel"); - req.push("PING"); - - request req2; - req2.push("PING", "msg2"); - req2.get_config().cancel_if_unresponded = false; - - response resp; - - bool push_received = false, exec_finished = false, run_finished = false; - - // async_receive is cancelled every reconnection cycle - conn.async_receive([&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - push_received = true; - }); - - auto on_exec2 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(std::get<0>(resp).value(), "msg2"); - exec_finished = true; - conn.cancel(); - }; - - // The request is cancelled because the PING response isn't processed - // by the time the error is generated - conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, net::error::operation_aborted); - conn.async_exec(req2, resp, on_exec2); - }); - - auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 50ms; // make the test run faster - conn.async_run(cfg, [&run_finished](error_code ec) { - BOOST_TEST_EQ(ec, net::error::operation_aborted); - run_finished = true; - }); - - ioc.run_for(test_timeout); - - BOOST_TEST(push_received); - BOOST_TEST(exec_finished); - BOOST_TEST(run_finished); -} - // After an async_receive operation finishes, another one can be issued struct test_consecutive_receives { net::io_context ioc; @@ -487,8 +379,6 @@ int main() test_async_receive_push_available(); test_sync_receive(); test_async_receive_cancelled_on_reconnection{}.run(); - test_push_adapter_error(); - test_push_adapter_error_reconnection(); test_consecutive_receives{}.run(); return boost::report_errors(); diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 5fddb8c4e..40f46b244 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -170,51 +170,48 @@ void test_exec_push_interleaved() BOOST_TEST(run_finished); } +// An adapter that always errors struct response_error_tag { }; response_error_tag error_tag_obj; struct response_error_adapter { void on_init() { } void on_done() { } - - void on_node( - boost::redis::resp3::basic_node const&, - boost::system::error_code& ec) - { - ec = boost::redis::error::incompatible_size; - } + void on_node(node_view const&, error_code& ec) { ec = error::incompatible_size; } }; auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } -void test_test_push_adapter() +// If the push adapter returns an error, the connection is torn down +void test_push_adapter_error() { net::io_context ioc; - auto conn = std::make_shared(ioc); + connection conn{ioc}; + conn.set_receive_response(error_tag_obj); request req; - req.push("HELLO", 3); req.push("PING"); req.push("SUBSCRIBE", "channel"); req.push("PING"); - conn->set_receive_response(error_tag_obj); - bool push_received = false, exec_finished = false, run_finished = false; - conn->async_receive2([&, conn](error_code ec) { + // async_receive2 is cancelled every reconnection cycle + conn.async_receive2([&](error_code ec) { BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); push_received = true; }); - conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { + // The request is cancelled because the PING response isn't processed + // by the time the error is generated + conn.async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { BOOST_TEST_EQ(ec, net::error::operation_aborted); exec_finished = true; }); auto cfg = make_test_config(); - cfg.reconnect_wait_interval = 0s; - conn->async_run(cfg, [&run_finished](error_code ec) { + cfg.reconnect_wait_interval = 0s; // so we can validate the generated error + conn.async_run(cfg, [&run_finished](error_code ec) { BOOST_TEST_EQ(ec, error::incompatible_size); run_finished = true; }); @@ -223,9 +220,60 @@ void test_test_push_adapter() BOOST_TEST(push_received); BOOST_TEST(exec_finished); BOOST_TEST(run_finished); +} + +// A push response error triggers a reconnection +void test_push_adapter_error_reconnection() +{ + net::io_context ioc; + connection conn{ioc}; + conn.set_receive_response(error_tag_obj); + + request req; + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("PING"); + + request req2; + req2.push("PING", "msg2"); + req2.get_config().cancel_if_unresponded = false; + + response resp; + + bool push_received = false, exec_finished = false, run_finished = false; + + // async_receive2 is cancelled every reconnection cycle + conn.async_receive2([&](error_code ec) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + push_received = true; + }); + + auto on_exec2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(std::get<0>(resp).value(), "msg2"); + exec_finished = true; + conn.cancel(); + }; - // TODO: Reset the ioc reconnect and send a quit to ensure - // reconnection is possible after an error. + // The request is cancelled because the PING response isn't processed + // by the time the error is generated + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + conn.async_exec(req2, resp, on_exec2); + }); + + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 50ms; // make the test run faster + conn.async_run(cfg, [&run_finished](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(push_received); + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); } void launch_push_consumer(std::shared_ptr conn) @@ -554,7 +602,8 @@ int main() test_async_receive2_waiting_for_push(); test_async_receive2_push_available(); test_exec_push_interleaved(); - test_test_push_adapter(); + test_push_adapter_error(); + test_push_adapter_error_reconnection(); test_many_subscribers(); test_test_unsubscribe(); test_pubsub_state_restoration{}.run(); From 704472c77d793e6176f2d09e34a61c799903e236 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:36:11 +0100 Subject: [PATCH 21/23] test_consecutive_receives --- test/test_conn_push2.cpp | 178 ++++++++++++++++++++------------------- 1 file changed, 91 insertions(+), 87 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 40f46b244..7b1e678fb 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -276,97 +276,101 @@ void test_push_adapter_error_reconnection() BOOST_TEST(run_finished); } -void launch_push_consumer(std::shared_ptr conn) +// After an async_receive2 operation finishes, another one can be issued +void test_consecutive_receives() { - conn->async_receive2([conn](error_code ec) { - if (ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - return; + struct impl { + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + bool push_consumer_finished{false}; + + void launch_push_consumer() + { + conn.async_receive2([this](error_code ec) { + if (ec) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + push_consumer_finished = true; + resp.clear(); + return; + } + launch_push_consumer(); + }); } - launch_push_consumer(conn); - }); -} - -void test_many_subscribers() -{ - request req0; - req0.get_config().cancel_on_connection_lost = false; - req0.push("HELLO", 3); - - request req1; - req1.get_config().cancel_on_connection_lost = false; - req1.push("PING", "Message1"); - - request req2; - req2.get_config().cancel_on_connection_lost = false; - req2.push("SUBSCRIBE", "channel"); - - request req3; - req3.get_config().cancel_on_connection_lost = false; - req3.push("QUIT"); - net::io_context ioc; - auto conn = std::make_shared(ioc); - - bool finished = false; - - auto c11 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->cancel(operation::reconnection); - finished = true; - }; - auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req3, ignore, c11); - }; - auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c10); - }; - auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c9); - }; - auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c8); - }; - auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c7); - }; - auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c6); - }; - auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c5); - }; - auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c4); - }; - auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c3); - }; - auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req2, ignore, c2); - }; - auto c0 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn->async_exec(req1, ignore, c1); + void run() + { + conn.set_receive_response(resp); + + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); + + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); + + bool exec_finished = false, run_finished = false; + + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + conn.cancel(); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c2); + }; + + conn.async_exec(req1, ignore, c1); + launch_push_consumer(); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + BOOST_TEST(push_consumer_finished); + } }; - conn->async_exec(req0, ignore, c0); - launch_push_consumer(conn); - - run(conn, make_test_config(), {}); - - ioc.run_for(test_timeout); - BOOST_TEST(finished); + impl{}.run(); } void test_test_unsubscribe() @@ -604,7 +608,7 @@ int main() test_exec_push_interleaved(); test_push_adapter_error(); test_push_adapter_error_reconnection(); - test_many_subscribers(); + test_consecutive_receives(); test_test_unsubscribe(); test_pubsub_state_restoration{}.run(); From 3b6f8053bd8ed921d17915058a467b468625b67b Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:42:40 +0100 Subject: [PATCH 22/23] refactor --- test/test_conn_push2.cpp | 183 +++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 95 deletions(-) diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp index 7b1e678fb..5c260f8c3 100644 --- a/test/test_conn_push2.cpp +++ b/test/test_conn_push2.cpp @@ -17,7 +17,7 @@ #include "common.hpp" #include -#include +#include #include #include #include @@ -279,101 +279,94 @@ void test_push_adapter_error_reconnection() // After an async_receive2 operation finishes, another one can be issued void test_consecutive_receives() { - struct impl { - net::io_context ioc; - connection conn{ioc}; - resp3::flat_tree resp; - bool push_consumer_finished{false}; - - void launch_push_consumer() - { - conn.async_receive2([this](error_code ec) { - if (ec) { - BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); - push_consumer_finished = true; - resp.clear(); - return; - } - launch_push_consumer(); - }); - } - - void run() - { - conn.set_receive_response(resp); - - request req1; - req1.get_config().cancel_on_connection_lost = false; - req1.push("PING", "Message1"); - - request req2; - req2.get_config().cancel_on_connection_lost = false; - req2.push("SUBSCRIBE", "channel"); - - bool exec_finished = false, run_finished = false; - - auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - exec_finished = true; - conn.cancel(); - }; - auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c10); - }; - auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c9); - }; - auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c8); - }; - auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c7); - }; - auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c6); - }; - auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c5); - }; - auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c4); - }; - auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c3); - }; - auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c2); - }; - - conn.async_exec(req1, ignore, c1); + net::io_context ioc; + connection conn{ioc}; + resp3::flat_tree resp; + bool push_consumer_finished{false}; + + std::function launch_push_consumer = [&]() { + conn.async_receive2([&](error_code ec) { + if (ec) { + BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); + push_consumer_finished = true; + resp.clear(); + return; + } launch_push_consumer(); + }); + }; - conn.async_run(make_test_config(), [&](error_code ec) { - run_finished = true; - BOOST_TEST_EQ(ec, net::error::operation_aborted); - }); + conn.set_receive_response(resp); - ioc.run_for(test_timeout); + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); - BOOST_TEST(exec_finished); - BOOST_TEST(run_finished); - BOOST_TEST(push_consumer_finished); - } + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); + + bool exec_finished = false, run_finished = false; + + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + conn.cancel(); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c2); }; - impl{}.run(); + conn.async_exec(req1, ignore, c1); + launch_push_consumer(); + + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + BOOST_TEST(push_consumer_finished); } -void test_test_unsubscribe() +// UNSUBSCRIBE and PUNSUBSCRIBE work +void test_unsubscribe() { net::io_context ioc; connection conn{ioc}; @@ -440,12 +433,12 @@ void test_test_unsubscribe() BOOST_TEST(run_finished); } -class test_pubsub_state_restoration { +struct test_pubsub_state_restoration_impl { net::io_context ioc; connection conn{ioc}; - request req; - response resp_str; - flat_tree resp_push; + request req{}; + response resp_str{}; + flat_tree resp_push{}; bool exec_finished = false; void check_subscriptions() @@ -576,7 +569,6 @@ class test_pubsub_state_restoration { }); } -public: void run() { conn.set_receive_response(resp_push); @@ -598,6 +590,7 @@ class test_pubsub_state_restoration { BOOST_TEST(run_finished); } }; +void test_pubsub_state_restoration() { test_pubsub_state_restoration_impl{}.run(); } } // namespace @@ -609,8 +602,8 @@ int main() test_push_adapter_error(); test_push_adapter_error_reconnection(); test_consecutive_receives(); - test_test_unsubscribe(); - test_pubsub_state_restoration{}.run(); + test_unsubscribe(); + test_pubsub_state_restoration(); return boost::report_errors(); } From f3286df6d1d27ea9e7ec919dae5a9ff71cfce2ac Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Wed, 14 Jan 2026 14:45:30 +0100 Subject: [PATCH 23/23] cleanup --- test/test_conn_push.cpp | 144 ++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 71 deletions(-) diff --git a/test/test_conn_push.cpp b/test/test_conn_push.cpp index bfd055003..3b2645522 100644 --- a/test/test_conn_push.cpp +++ b/test/test_conn_push.cpp @@ -20,7 +20,7 @@ #include "common.hpp" #include -#include +#include namespace net = boost::asio; using namespace boost::redis; @@ -183,7 +183,7 @@ void test_sync_receive() // async_receive is cancelled every time a reconnection happens, // so we can re-establish subscriptions -struct test_async_receive_cancelled_on_reconnection { +struct test_async_receive_cancelled_on_reconnection_impl { net::io_context ioc; connection conn{ioc}; resp3::flat_tree resp{}; @@ -279,16 +279,21 @@ struct test_async_receive_cancelled_on_reconnection { } }; +void test_async_receive_cancelled_on_reconnection() +{ + test_async_receive_cancelled_on_reconnection_impl{}.run(); +} + // After an async_receive operation finishes, another one can be issued -struct test_consecutive_receives { +void test_consecutive_receives() +{ net::io_context ioc; connection conn{ioc}; resp3::flat_tree resp; bool push_consumer_finished{false}; - void launch_push_consumer() - { - conn.async_receive([this](error_code ec, std::size_t) { + std::function launch_push_consumer = [&]() { + conn.async_receive([&](error_code ec, std::size_t) { if (ec) { BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled); push_consumer_finished = true; @@ -297,78 +302,75 @@ struct test_consecutive_receives { } launch_push_consumer(); }); - } + }; - void run() - { - conn.set_receive_response(resp); + conn.set_receive_response(resp); - request req1; - req1.get_config().cancel_on_connection_lost = false; - req1.push("PING", "Message1"); + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); - request req2; - req2.get_config().cancel_on_connection_lost = false; - req2.push("SUBSCRIBE", "channel"); + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); - bool exec_finished = false, run_finished = false; + bool exec_finished = false, run_finished = false; - auto c10 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - exec_finished = true; - conn.cancel(); - }; - auto c9 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c10); - }; - auto c8 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c9); - }; - auto c7 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c8); - }; - auto c6 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c7); - }; - auto c5 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c6); - }; - auto c4 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c5); - }; - auto c3 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req1, ignore, c4); - }; - auto c2 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c3); - }; - auto c1 = [&](error_code ec, std::size_t) { - BOOST_TEST_EQ(ec, error_code()); - conn.async_exec(req2, ignore, c2); - }; + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + exec_finished = true; + conn.cancel(); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + conn.async_exec(req2, ignore, c2); + }; - conn.async_exec(req1, ignore, c1); - launch_push_consumer(); + conn.async_exec(req1, ignore, c1); + launch_push_consumer(); - conn.async_run(make_test_config(), [&](error_code ec) { - run_finished = true; - BOOST_TEST_EQ(ec, net::error::operation_aborted); - }); + conn.async_run(make_test_config(), [&](error_code ec) { + run_finished = true; + BOOST_TEST_EQ(ec, net::error::operation_aborted); + }); - ioc.run_for(test_timeout); + ioc.run_for(test_timeout); - BOOST_TEST(exec_finished); - BOOST_TEST(run_finished); - BOOST_TEST(push_consumer_finished); - } + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + BOOST_TEST(push_consumer_finished); }; } // namespace @@ -378,8 +380,8 @@ int main() test_async_receive_waiting_for_push(); test_async_receive_push_available(); test_sync_receive(); - test_async_receive_cancelled_on_reconnection{}.run(); - test_consecutive_receives{}.run(); + test_async_receive_cancelled_on_reconnection(); + test_consecutive_receives(); return boost::report_errors(); }