Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,17 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
// in resp. If the connection encounters a network error and reconnects to the server,
// it will automatically subscribe to 'mychannel' again. This is transparent to the user.
// You need to use specialized request::subscribe() function (instead of request::push)
// to enable this behavior.

// Loop to read Redis push messages.
for (error_code ec;;) {
while (conn->will_reconnect()) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
auto [ec] = co_await conn->async_receive2(asio::as_tuple);

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
if (ec) {
std::cerr << "Error during receive: " << ec << std::endl;
break;
}

Expand Down
10 changes: 6 additions & 4 deletions doc/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,17 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
// in resp. If the connection encounters a network error and reconnects to the server,
// it will automatically subscribe to 'mychannel' again. This is transparent to the user.
// You need to use specialized request::subscribe() function (instead of request::push)
// to enable this behavior.

// Loop to read Redis push messages.
for (error_code ec;;) {
while (conn->will_reconnect()) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
auto [ec] = co_await conn->async_receive2(asio::as_tuple);

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
if (ec) {
std::cerr << "Error during receive: " << ec << std::endl;
break;
}

Expand Down
11 changes: 5 additions & 6 deletions example/cpp20_chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

#include <boost/redis/connection.hpp>

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>

#include <exception>
Expand All @@ -30,7 +30,6 @@ using boost::asio::co_spawn;
using boost::asio::consign;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_flat_response;
Expand Down Expand Up @@ -61,13 +60,13 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
req.subscribe({"channel"});
co_await conn->async_exec(req);

for (error_code ec;;) {
while (conn->will_reconnect()) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
auto [ec] = co_await conn->async_receive2(asio::as_tuple);

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
if (ec) {
std::cerr << "Error during receive: " << ec << std::endl;
break;
}

Expand Down
12 changes: 5 additions & 7 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@

#include <boost/redis/connection.hpp>

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>

#include <iostream>

Expand Down Expand Up @@ -62,13 +60,13 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// to enable this behavior.

// Loop to read Redis push messages.
for (error_code ec;;) {
while (conn->will_reconnect()) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
auto [ec] = co_await conn->async_receive2(asio::as_tuple);

// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
if (ec) {
std::cerr << "Error during receive: " << ec << std::endl;
break;
}

Expand Down
159 changes: 116 additions & 43 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <boost/redis/detail/exec_one_fsm.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/detail/receive_fsm.hpp>
#include <boost/redis/detail/redis_stream.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/detail/sentinel_resolve_fsm.hpp>
Expand Down Expand Up @@ -154,7 +155,7 @@ struct connection_impl {
{
switch (op) {
case operation::exec: st_.mpx.cancel_waiting(); break;
case operation::receive: receive_channel_.cancel(); break;
case operation::receive: cancel_receive_v2(); break;
case operation::reconnection:
st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero();
break;
Expand All @@ -165,14 +166,22 @@ struct connection_impl {
case operation::health_check: cancel_run(); break;
case operation::all:
st_.mpx.cancel_waiting(); // exec
receive_channel_.cancel(); // receive
cancel_receive_v2(); // receive
st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect
cancel_run(); // run
break;
default: /* ignore */;
}
}

void cancel_receive_v1() { receive_channel_.cancel(); }

void cancel_receive_v2()
{
st_.receive2_cancelled = true;
cancel_receive_v1();
}

void cancel_run()
{
// Individual operations should see a terminal cancellation, regardless
Expand All @@ -184,8 +193,8 @@ struct connection_impl {
stream_.cancel_resolve();

// Receive is technically not part of run, but we also cancel it for
// backwards compatibility.
receive_channel_.cancel();
// backwards compatibility. Note that this intentionally affects v1 receive, only.
cancel_receive_v1();
}

bool is_open() const noexcept { return stream_.is_open(); }
Expand Down Expand Up @@ -234,32 +243,45 @@ struct connection_impl {

return size;
}
};

template <class CompletionToken>
auto async_receive2(CompletionToken&& token)
template <class Executor>
struct receive2_op {
connection_impl<Executor>* conn_;
receive_fsm fsm_{};

void drain_receive_channel()
{
// clang-format off
return
receive_channel_.async_receive(
asio::deferred(
[this](system::error_code ec, std::size_t)
{
if (!ec) {
auto f = [](system::error_code, std::size_t) {
// There is no point in checking for errors
// here since async_receive just completed
// without errors.
};

// We just want to drain the channel.
while (receive_channel_.try_receive(f));
}

return asio::deferred.values(ec);
}
)
)(std::forward<CompletionToken>(token));
// clang-format on
// We don't expect any errors here. The only errors
// that might appear in the channel are due to cancellations,
// and these don't make sense with try_receive
auto f = [](system::error_code, std::size_t) { };
while (conn_->receive_channel_.try_receive(f))
;
}

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t /* push_bytes */ = 0u)
{
receive_action act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled());

switch (act.type) {
case receive_action::action_type::setup_cancellation:
self.reset_cancellation_state(asio::enable_total_cancellation());
(*this)(self); // this action does not require yielding
return;
case receive_action::action_type::wait:
conn_->receive_channel_.async_receive(std::move(self));
return;
case receive_action::action_type::drain_channel:
drain_receive_channel();
(*this)(self); // this action does not require yielding
return;
case receive_action::action_type::immediate:
asio::async_immediate(self.get_io_executor(), std::move(self));
return;
case receive_action::action_type::done: self.complete(act.ec); return;
}
}
};

Expand Down Expand Up @@ -785,27 +807,68 @@ class basic_connection {
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
}

/** @brief Wait for server pushes asynchronously
/** @brief Wait for server pushes asynchronously.
*
* This function suspends until a server push is received by the
* This function suspends until at least one server push is received by the
* connection. On completion an unspecified number of pushes will
* have been added to the response object set with @ref
* boost::redis::connection::set_receive_response.
* set_receive_response. Use the functions in the response object
* to know how many messages they were received and consume them.
*
* To prevent receiving an unbound number of pushes the connection
* blocks further read operations on the socket when 256 pushes
* accumulate internally (we don't make any commitment to this
* exact number). When that happens any `async_exec`s and
* health-checks won't make any progress and the connection may
* eventually timeout. To avoid that Apps should call
* `async_receive2` continuously in a loop.
*
* @Note To avoid deadlocks the task (e.g. coroutine) calling
* eventually timeout. To avoid this, apps that expect server pushes
* should call this function continuously in a loop.
*
* This function should be used instead of the deprecated @ref async_receive.
* It differs from `async_receive` in the following:
*
* @li `async_receive` is designed to consume a single push message at a time.
* This can be inefficient when receiving lots of server pushes.
* `async_receive2` is batch-oriented. All pushes that are available
* when `async_receive2` is called will be marked as consumed.
* @li `async_receive` is cancelled when a reconnection happens (e.g. because
* of a network error). This enabled the user to re-establish subscriptions
* using @ref async_exec before waiting for pushes again. With the introduction of
* functions like @ref request::subscribe, subscriptions are automatically
* re-established on reconnection. Thus, `async_receive2` is not cancelled
* on reconnection.
* @li `async_receive` passes the number of bytes that each received
* push message contains. This information is unreliable and not very useful.
* Equivalent information is available using functions in the response object.
* @li `async_receive` might get cancelled if `async_run` is cancelled.
* This doesn't happen with `async_receive2`.
*
* This function does *not* remove messages from the response object
* passed to @ref set_receive_response - use the functions in the response
* object to achieve this.
*
* Only a single instance of `async_receive2` may be outstanding
* for a given connection at any time. Trying to start a second one
* will fail with @ref error::already_running.
*
* @note To avoid deadlocks the task (e.g. coroutine) calling
* `async_receive2` should not call `async_exec` in a way where
* they could block each other.
* they could block each other. This is, avoid the following pattern:
*
* @code
* asio::awaitable<void> receiver()
* {
* // Do NOT do this!!! The receive buffer might get full while
* // async_exec runs, which will block all read operations until async_receive2
* // is called. The two operations end up waiting each other, making the connection unresponsive.
* // If you need to do this, use two connections, instead.
* co_await conn.async_receive2();
* co_await conn.async_exec(req, resp);
* }
* @endcode
*
* For an example see cpp20_subscriber.cpp.
*
* For an example see cpp20_subscriber.cpp. The completion token
* must have the following signature
* The completion token must have the following signature:
*
* @code
* void f(system::error_code);
Expand All @@ -818,15 +881,15 @@ class basic_connection {
* @li `asio::cancellation_type_t::partial`.
* @li `asio::cancellation_type_t::total`.
*
* Calling `basic_connection::cancel(operation::receive)` will
* also cancel any ongoing receive operations.
*
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_receive2(CompletionToken&& token = {})
{
return impl_->async_receive2(std::forward<CompletionToken>(token));
return asio::async_compose<CompletionToken, void(system::error_code)>(
detail::receive2_op<Executor>{impl_.get()},
token,
*impl_);
}

/** @brief (Deprecated) Receives server pushes synchronously without blocking.
Expand Down Expand Up @@ -1223,7 +1286,9 @@ class connection {
template <class CompletionToken = asio::deferred_t>
auto async_receive2(CompletionToken&& token = {})
{
return impl_.async_receive2(std::forward<CompletionToken>(token));
return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
initiation{this},
token);
}

/// @copydoc basic_connection::receive
Expand Down Expand Up @@ -1336,6 +1401,12 @@ class connection {
{
self->async_exec_impl(*req, std::move(adapter), std::forward<Handler>(handler));
}

template <class Handler>
void operator()(Handler&& handler)
{
self->async_receive2_impl(std::forward<Handler>(handler));
}
};

void async_run_impl(
Expand All @@ -1352,6 +1423,8 @@ class connection {
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);

void async_receive2_impl(asio::any_completion_handler<void(boost::system::error_code)> token);

basic_connection<executor_type> impl_;
};

Expand Down
1 change: 1 addition & 0 deletions include/boost/redis/detail/connection_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct connection_state {
request setup_req{};
request ping_req{};
subscription_tracker tracker{};
bool receive2_running{false}, receive2_cancelled{false};

// Sentinel stuff
lazy_random_engine eng{};
Expand Down
Loading