Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions include/boost/redis/impl/reader_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ reader_fsm::action reader_fsm::resume(
// Prepare the buffer for the read operation
ec = st.mpx.prepare_read();
if (ec) {
log_debug(st.logger, "Reader task: error in prepare_read: ", ec);
log_err(st.logger, "Error preparing the read buffer: ", ec);
return {ec};
}

Expand All @@ -53,10 +53,9 @@ reader_fsm::action reader_fsm::resume(
}

// Log what we read
log_debug(st.logger, "Reader task: ", bytes_read, " bytes read");
if (ec) {
log_debug(st.logger, "Reader task: ", bytes_read, " bytes read, error: ", ec);
} else {
log_debug(st.logger, "Reader task: ", bytes_read, " bytes read");
log_err(st.logger, "Error reading data from the server: ", ec);
}

// Process the bytes read, even if there was an error
Expand All @@ -77,7 +76,12 @@ reader_fsm::action reader_fsm::resume(
if (ec) {
// TODO: Perhaps log what has not been consumed to aid
// debugging.
log_debug(st.logger, "Reader task: error processing message: ", ec);
if (ec == error::resp3_hello) {
// This is already logged in the setup adapter
log_debug(st.logger, "Error processing message: setup request error");
} else {
log_err(st.logger, "Error processing message: ", ec);
}
return ec;
}

Expand All @@ -94,9 +98,10 @@ reader_fsm::action reader_fsm::resume(
return system::error_code(asio::error::operation_aborted);
}

// Check for other errors
// Check for other errors.
// We should't get any in the real world, but just in case.
if (ec) {
log_debug(st.logger, "Reader task: error notifying push receiver: ", ec);
log_err(st.logger, "Error notifying push receiver: ", ec);
return ec;
}
} else {
Expand Down
10 changes: 6 additions & 4 deletions include/boost/redis/impl/run_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ inline void on_setup_done(const multiplexer::elem& elm, connection_state& st)
{
const auto ec = elm.get_error();
if (ec) {
if (st.diagnostic.empty()) {
log_info(st.logger, "Setup request execution: ", ec);
if (ec == error::resp3_hello) {
// This is the most common case, and the only one that generates a string diagnostic
log_err(st.logger, "Setup request execution failed: ", st.diagnostic);
} else {
log_info(st.logger, "Setup request execution: ", ec, " (", st.diagnostic, ")");
// Something else went wrong (e.g. network error while running the request).
log_err(st.logger, "Setup request execution failed: ", ec);
}
} else {
log_info(st.logger, "Setup request execution: success");
Expand Down Expand Up @@ -144,7 +146,7 @@ run_action run_fsm::resume(

if (ec) {
// There was an error. Skip to the reconnection loop
log_info(
log_err(
st.logger,
"Failed to connect to Redis server at ",
get_server_address(st),
Expand Down
5 changes: 3 additions & 2 deletions include/boost/redis/impl/writer_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/writer_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/impl/is_terminal_cancel.hpp>
#include <boost/redis/impl/log_utils.hpp>
#include <boost/redis/logger.hpp>
Expand All @@ -39,7 +40,7 @@ inline void process_ping_node(
}

if (ec) {
log_info(lgr, "Health checker: server answered ping with an error: ", nd.value);
log_err(lgr, "Health checker: server answered ping with an error: ", nd.value);
}
}

Expand Down Expand Up @@ -89,7 +90,7 @@ writer_action writer_fsm::resume(
if (ec == asio::error::operation_aborted) {
log_debug(st.logger, "Writer task: cancelled (1).");
} else {
log_debug(st.logger, "Writer task error: ", ec);
log_err(st.logger, "Error writing data to the server: ", ec);
}
return ec;
}
Expand Down
97 changes: 71 additions & 26 deletions test/test_reader_fsm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
Expand All @@ -19,20 +22,17 @@
#include "sansio_utils.hpp"

#include <chrono>
#include <memory>
#include <string_view>

using namespace boost::redis;
namespace net = boost::asio;
namespace redis = boost::redis;
using boost::system::error_code;
using net::cancellation_type_t;
using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::generic_response;
using redis::any_adapter;
using redis::config;
using redis::detail::connection_state;
using action = redis::detail::reader_fsm::action;
using redis::logger;
using detail::reader_fsm;
using detail::multiplexer;
using detail::connection_state;
using action = detail::reader_fsm::action;
using namespace std::chrono_literals;

// Operators
Expand Down Expand Up @@ -95,7 +95,7 @@ void copy_to(multiplexer& mpx, std::string_view data)
std::copy(data.cbegin(), data.cend(), buffer.begin());
}

struct fixture : redis::detail::log_fixture {
struct fixture : detail::log_fixture {
connection_state st{{make_logger()}};
generic_response resp;

Expand Down Expand Up @@ -246,14 +246,15 @@ void test_read_error()
copy_to(fix.st.mpx, payload);

// Deliver the data
act = fsm.resume(fix.st, payload.size(), {redis::error::empty_field}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::empty_field});
act = fsm.resume(fix.st, payload.size(), {error::empty_field}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{error::empty_field});

// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read, error: Expected field value is empty. [boost.redis:5]"},
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::err, "Error reading data from the server: Expected field value is empty. [boost.redis:5]"},
// clang-format on
});
}
Expand All @@ -270,13 +271,14 @@ void test_read_timeout()

// Timeout
act = fsm.resume(fix.st, 0, {net::error::operation_aborted}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::pong_timeout});
BOOST_TEST_EQ(act, error_code{error::pong_timeout});

// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 0 bytes read, error: Pong timeout. [boost.redis:19]"},
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 0 bytes read" },
{logger::level::err, "Error reading data from the server: Pong timeout. [boost.redis:19]"},
// clang-format on
});
}
Expand All @@ -296,18 +298,60 @@ void test_parse_error()

// Deliver the data
act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::not_a_number});
BOOST_TEST_EQ(act, error_code{error::not_a_number});

// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read"},
{logger::level::debug, "Reader task: 4 bytes read"},
{logger::level::debug,
"Reader task: error processing message: Can't convert string to number (maybe forgot to "
{logger::level::err,
"Error processing message: Can't convert string to number (maybe forgot to "
"upgrade to RESP3?). [boost.redis:2]" },
});
}

// A setup request error is similar to a parse error.
// The adapter would return error::resp3_hello.
// We log this somewhere else, so it shouldn't be logged here by default.
void test_setup_request_error()
{
// Setup
fixture fix;
reader_fsm fsm;
request req;
req.push("PING"); // should have 1 command
auto elem = std::make_shared<multiplexer::elem>(
req,
any_adapter{[](any_adapter::parse_event, const resp3::node_view&, error_code& ec) {
ec = error::resp3_hello;
}});
elem->set_done_callback([] { });

// Add the request to the multiplexer and simulate a successful write
fix.st.mpx.add(elem);
BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 1u);
BOOST_TEST(fix.st.mpx.commit_write(fix.st.mpx.get_write_buffer().size()));

// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::read_some(6s));

// The fsm is asking for data.
std::string const payload = "-ERR: bad\r\n";
copy_to(fix.st.mpx, payload);

// Deliver the data
act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{error::resp3_hello});

// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Error processing message: setup request error"},
});
}

void test_push_deliver_error()
{
fixture fix;
Expand All @@ -326,15 +370,15 @@ void test_push_deliver_error()
BOOST_TEST_EQ(act, action::notify_push_receiver(11u));

// Resumes from notifying a push with an error.
act = fsm.resume(fix.st, 0, redis::error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::empty_field});
act = fsm.resume(fix.st, 0, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{error::empty_field});

// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Reader task: error notifying push receiver: Expected field value is empty. [boost.redis:5]"},
{logger::level::err, "Error notifying push receiver: Expected field value is empty. [boost.redis:5]"},
// clang-format on
});
}
Expand All @@ -355,15 +399,15 @@ void test_max_read_buffer_size()
std::string const part1 = ">3\r\n";
copy_to(fix.st.mpx, part1);
act = fsm.resume(fix.st, part1.size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size));
BOOST_TEST_EQ(act, error_code(error::exceeds_maximum_read_buffer_size));

// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 4 bytes read" },
{logger::level::debug, "Reader task: incomplete message received"},
{logger::level::debug,
"Reader task: error in prepare_read: Reading data from the socket would exceed the maximum "
{logger::level::err,
"Error preparing the read buffer: Reading data from the socket would exceed the maximum "
"size allowed of the read buffer. [boost.redis:26]" },
});
}
Expand Down Expand Up @@ -494,6 +538,7 @@ int main()
test_read_error();
test_read_timeout();
test_parse_error();
test_setup_request_error();
test_push_deliver_error();
test_max_read_buffer_size();

Expand Down
Loading