Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/boost/redis/detail/connection_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include <boost/redis/detail/subscription_tracker.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/response.hpp>

#include <random>
Expand Down Expand Up @@ -56,7 +56,7 @@ struct connection_state {
// Sentinel stuff
lazy_random_engine eng{};
std::vector<address> sentinels{};
std::vector<resp3::node> sentinel_resp_nodes{}; // for parsing
resp3::flat_tree sentinel_resp_nodes{}; // for parsing
};

} // namespace boost::redis::detail
Expand Down
1 change: 0 additions & 1 deletion include/boost/redis/impl/sentinel_resolve_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ sentinel_action sentinel_resolve_fsm::resume(

update_sentinel_list(st.sentinels, idx_, resp.sentinels, st.cfg.sentinel.addresses);

st.sentinel_resp_nodes.clear(); // reduce memory consumption
return system::error_code();
}

Expand Down
233 changes: 116 additions & 117 deletions include/boost/redis/impl/sentinel_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

#include <boost/redis/config.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/type.hpp>

#include <boost/assert.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/core/span.hpp>
#include <boost/system/error_code.hpp>

Expand Down Expand Up @@ -43,71 +43,131 @@ inline void compose_sentinel_request(config& cfg)
// Note that we don't care about request flags because this is a one-time request
}

// Helper
inline system::error_code sentinel_check_errors(const resp3::node_view& nd, std::string& diag)
{
switch (nd.data_type) {
case resp3::type::simple_error: diag = nd.value; return error::resp3_simple_error;
case resp3::type::blob_error: diag = nd.value; return error::resp3_blob_error;
default: return system::error_code();
}
};

// Parses a list of replicas or sentinels
inline system::error_code parse_server_list(
const resp3::node*& first,
const resp3::node* last,
const resp3::flat_tree& tree,
std::size_t& index,
std::string& diag,
std::vector<address>& out)
{
const auto* it = first;
ignore_unused(last);
const auto& root = tree.at(index);
BOOST_ASSERT(root.depth == 0u);

// If the command failed, this will be an error
if (auto ec = sentinel_check_errors(root, diag))
return ec;

// The root node must be an array
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 0u);
if (it->data_type != resp3::type::array)
return {error::expects_resp3_array};
const std::size_t num_servers = it->aggregate_size;
++it;
if (root.data_type != resp3::type::array)
return error::expects_resp3_array;
const std::size_t num_servers = root.aggregate_size;
++index;

// Each element in the array represents a server
out.resize(num_servers);
for (std::size_t i = 0u; i < num_servers; ++i) {
// A server is a map (resp3) or array (resp2, currently unsupported)
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 1u);
if (it->data_type != resp3::type::map)
return {error::expects_resp3_map};
const std::size_t num_key_values = it->aggregate_size;
++it;
const auto& server_node = tree.at(index);
BOOST_ASSERT(server_node.depth == 1u);
if (server_node.data_type != resp3::type::map)
return error::expects_resp3_map;
const std::size_t num_key_values = server_node.aggregate_size;
++index;

// The server object is composed by a set of key/value pairs.
// Skip everything except for the ones we care for.
bool ip_seen = false, port_seen = false;
for (std::size_t j = 0; j < num_key_values; ++j) {
// Key. It should be a string
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 2u);
if (it->data_type != resp3::type::blob_string)
return {error::expects_resp3_string};
const std::string_view key = it->value;
++it;
const auto& key_node = tree.at(index);
BOOST_ASSERT(key_node.depth == 2u);
if (key_node.data_type != resp3::type::blob_string)
return error::expects_resp3_string;
const std::string_view key = key_node.value;
++index;

// Value. All values seem to be strings, too.
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 2u);
if (it->data_type != resp3::type::blob_string)
return {error::expects_resp3_string};
const auto& value_node = tree.at(index);
BOOST_ASSERT(value_node.depth == 2u);
if (value_node.data_type != resp3::type::blob_string)
return error::expects_resp3_string;

// Record it
if (key == "ip") {
ip_seen = true;
out[i].host = it->value;
out[i].host = value_node.value;
} else if (key == "port") {
port_seen = true;
out[i].port = it->value;
out[i].port = value_node.value;
}

++it;
++index;
}

// Check that the response actually contained the fields we wanted
if (!ip_seen || !port_seen)
return {error::empty_field};
return error::empty_field;
}

// Done
first = it;
return system::error_code();
}

// Parses the output of SENTINEL GET-MASTER-ADDR-BY-NAME
inline system::error_code parse_get_master_addr_by_name(
const resp3::flat_tree& tree,
std::size_t& index,
std::string& diag,
address& out)
{
const auto& root_node = tree.at(index);
BOOST_ASSERT(root_node.depth == 0u);

// Check for errors
if (auto ec = sentinel_check_errors(root_node, diag))
return ec;

// If the root node is NULL, Sentinel doesn't know about this master.
// We use resp3_null to signal this fact. This doesn't reach the end user.
// If this is the case, SENTINEL REPLICAS and SENTINEL SENTINELS will fail.
// We exit here so the diagnostic is clean.
if (root_node.data_type == resp3::type::null) {
return error::resp3_null;
}

// If the root node is an array, an IP and port follow
if (root_node.data_type != resp3::type::array)
return error::expects_resp3_array;
if (root_node.aggregate_size != 2u)
return error::incompatible_size;
++index;

// IP
const auto& ip_node = tree.at(index);
BOOST_ASSERT(ip_node.depth == 1u);
if (ip_node.data_type != resp3::type::blob_string)
return error::expects_resp3_string;
out.host = ip_node.value;
++index;

// Port
const auto& port_node = tree.at(index);
BOOST_ASSERT(port_node.depth == 1u);
if (port_node.data_type != resp3::type::blob_string)
return error::expects_resp3_string;
out.port = port_node.value;
++index;

return system::error_code();
}

Expand All @@ -130,114 +190,53 @@ struct sentinel_response {
// This means that we can't use generic_response, since its adapter errors on error nodes.
// SENTINEL GET-MASTER-ADDR-BY-NAME is sent even when connecting to replicas
// for better diagnostics when the master name is unknown.
// Preconditions:
// * There are at least 2 (master)/3 (replica) root nodes.
// * The node array originates from parsing a valid RESP3 message.
// E.g. we won't check that the first node has depth 0.
// Note that the tree should originate from a valid RESP3 message
// (i.e. we won't check that the first node has depth 0.)
inline system::error_code parse_sentinel_response(
span<const resp3::node> nodes,
const resp3::flat_tree& tree,
role server_role,
sentinel_response& out)
{
auto check_errors = [&out](const resp3::node& nd) {
switch (nd.data_type) {
case resp3::type::simple_error:
out.diagnostic = nd.value;
return system::error_code(error::resp3_simple_error);
case resp3::type::blob_error:
out.diagnostic = nd.value;
return system::error_code(error::resp3_blob_error);
default: return system::error_code();
}
};

// Clear the output
out.diagnostic.clear();
out.sentinels.clear();
out.replicas.clear();

// Find the first root node of interest. It's the 2nd or 3rd, starting with the end
auto find_first = [nodes, server_role] {
const std::size_t expected_roots = server_role == role::master ? 2u : 3u;
std::size_t roots_seen = 0u;
for (auto it = nodes.rbegin();; ++it) {
BOOST_ASSERT(it != nodes.rend());
if (it->depth == 0u && ++roots_seen == expected_roots)
return &*it;
}
};
const resp3::node* lib_first = find_first();
// Index-based access
std::size_t index = 0;

// Iterators
const resp3::node* it = nodes.begin();
const resp3::node* last = nodes.end();
ignore_unused(last);
// User-supplied commands are before the ones added by us.
// Find out how many responses should we skip
const std::size_t num_lib_msgs = server_role == role::master ? 2u : 3u;
if (tree.get_total_msgs() < num_lib_msgs)
return error::incompatible_size;
const std::size_t num_user_msgs = tree.get_total_msgs() - num_lib_msgs;

// Go through all the responses to user-supplied requests checking for errors
for (; it != lib_first; ++it) {
if (auto ec = check_errors(*it))
BOOST_ASSERT(tree.at(index).depth == 0u);
for (std::size_t remaining_roots = num_user_msgs + 1u;; ++index) {
// Exit at node N+1
const auto& node = tree.at(index);
if (node.depth == 0u && --remaining_roots == 0u)
break;

// This is a user-supplied message. Check for errors
if (auto ec = sentinel_check_errors(node, out.diagnostic))
return ec;
}

// SENTINEL GET-MASTER-ADDR-BY-NAME

// Check for errors
if (auto ec = check_errors(*it))
if (auto ec = parse_get_master_addr_by_name(tree, index, out.diagnostic, out.master_addr))
return ec;

// If the root node is NULL, Sentinel doesn't know about this master.
// We use resp3_null to signal this fact. This doesn't reach the end user.
if (it->data_type == resp3::type::null) {
return {error::resp3_null};
}

// If the root node is an array, an IP and port follow
if (it->data_type != resp3::type::array)
return {error::expects_resp3_array};
if (it->aggregate_size != 2u)
return {error::incompatible_size};
++it;

// IP
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 1u);
if (it->data_type != resp3::type::blob_string)
return {error::expects_resp3_string};
out.master_addr.host = it->value;
++it;

// Port
BOOST_ASSERT(it != last);
BOOST_ASSERT(it->depth == 1u);
if (it->data_type != resp3::type::blob_string)
return {error::expects_resp3_string};
out.master_addr.port = it->value;
++it;

// SENTINEL REPLICAS
if (server_role == role::replica) {
// SENTINEL REPLICAS

// This request fails if Sentinel doesn't know about this master.
// However, that's not the case if we got here.
// Check for other errors.
if (auto ec = check_errors(*it))
return ec;

// Actual parsing
if (auto ec = parse_server_list(it, last, out.replicas))
if (auto ec = parse_server_list(tree, index, out.diagnostic, out.replicas))
return ec;
}

// SENTINEL SENTINELS

// This request fails if Sentinel doesn't know about this master.
// However, that's not the case if we got here.
// Check for other errors.
if (auto ec = check_errors(*it))
return ec;

// Actual parsing
if (auto ec = parse_server_list(it, last, out.sentinels))
if (auto ec = parse_server_list(tree, index, out.diagnostic, out.sentinels))
return ec;

// Done
Expand Down
11 changes: 5 additions & 6 deletions test/sansio_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/resp3/flat_tree.hpp>

#include <boost/assert/source_location.hpp>
#include <boost/core/ignore_unused.hpp>
Expand Down Expand Up @@ -74,12 +75,10 @@ logger log_fixture::make_logger()
});
}

std::vector<resp3::node> nodes_from_resp3(
const std::vector<std::string_view>& msgs,
source_location loc)
resp3::flat_tree tree_from_resp3(const std::vector<std::string_view>& msgs, source_location loc)
{
std::vector<resp3::node> nodes;
any_adapter adapter{nodes};
resp3::flat_tree res;
any_adapter adapter{res};

for (std::string_view resp : msgs) {
resp3::parser p;
Expand All @@ -91,7 +90,7 @@ std::vector<resp3::node> nodes_from_resp3(
std::cerr << "Called from " << loc << std::endl;
}

return nodes;
return res;
}

} // namespace boost::redis::detail
5 changes: 3 additions & 2 deletions test/sansio_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define BOOST_REDIS_TEST_SANSIO_UTILS_HPP

#include <boost/redis/logger.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/resp3/node.hpp>

#include <boost/assert/source_location.hpp>
Expand Down Expand Up @@ -52,10 +53,10 @@ constexpr auto to_milliseconds(std::chrono::steady_clock::duration d)
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}

// Creates a vector of nodes from a set of RESP3 messages.
// Creates a collection of nodes from a set of RESP3 messages.
// Using the raw RESP values ensures that the correct
// node tree is built, which is not always obvious
std::vector<resp3::node> nodes_from_resp3(
resp3::flat_tree tree_from_resp3(
const std::vector<std::string_view>& msgs,
source_location loc = BOOST_CURRENT_LOCATION);

Expand Down
Loading