From 1747f01716976b3b03e2410ca29618972e4e7e78 Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Tue, 27 Jan 2026 19:52:31 +0100 Subject: [PATCH] Initial impl --- example/cpp20_subscriber.cpp | 13 +- include/boost/redis/impl/pubsub_messages.ipp | 121 ++++++++++++++++++ include/boost/redis/pubsub_messages.hpp | 128 +++++++++++++++++++ include/boost/redis/src.hpp | 1 + 4 files changed, 258 insertions(+), 5 deletions(-) create mode 100644 include/boost/redis/impl/pubsub_messages.ipp create mode 100644 include/boost/redis/pubsub_messages.hpp diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index 513f90ac..94d43290 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -5,6 +5,7 @@ */ #include +#include #include #include @@ -22,8 +23,10 @@ using namespace std::chrono_literals; using boost::redis::request; using boost::redis::generic_flat_response; using boost::redis::config; -using boost::system::error_code; using boost::redis::connection; +using boost::redis::pubsub_messages; +using boost::redis::pubsub_message; +using boost::system::error_code; using asio::signal_set; /* This example will subscribe and read pushes indefinitely. @@ -72,10 +75,10 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // The response must be consumed without suspending the // coroutine i.e. without the use of async operations. - for (auto const& elem : resp.value()) - std::cout << elem.value << "\n"; - - std::cout << std::endl; + for (pubsub_message elem : pubsub_messages{resp.value()}) { + std::cout << "Received message from channel " << elem.channel << ": " << elem.payload + << "\n"; + } resp.value().clear(); } diff --git a/include/boost/redis/impl/pubsub_messages.ipp b/include/boost/redis/impl/pubsub_messages.ipp new file mode 100644 index 00000000..111be866 --- /dev/null +++ b/include/boost/redis/impl/pubsub_messages.ipp @@ -0,0 +1,121 @@ +// +// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include + +namespace boost::redis { + +namespace detail { + +// Advances the iterator past all nodes belonging to the current root message. +// Returns a pointer to the next root node (depth == 0) or end. +inline const resp3::node_view* skip_current_message( + const resp3::node_view* current, + const resp3::node_view* end) +{ + return std::find_if(current, end, [](const resp3::node_view& n) { + return n.depth == 0u; + }); +} + +// Attempts to parse a pubsub message starting at 'current'. +// If successful, populates 'msg' and returns true. +// If not a valid pubsub message (message/pmessage), returns false. +inline bool try_parse_pubsub_message( + const resp3::node_view*& current, + const resp3::node_view* end, + pubsub_message& msg) +{ + if (current == end) + return false; + + // Root must be a push type + if (current->data_type != resp3::type::push) { + current = skip_current_message(current + 1, end); + return false; + } + + // Move to first child (message type) + ++current; + if (current == end || current->depth != 1u || current->data_type != resp3::type::blob_string) { + current = skip_current_message(current, end); + return false; + } + + // Determine the message type + bool is_pmessage; + if (current->value == "message") { + is_pmessage = false; + } else if (current->value == "pmessage") { + is_pmessage = true; + } else { + // Not interested in this message + current = skip_current_message(current, end); + return false; + } + + // For pmessage, the matched pattern goes next + if (is_pmessage) { + ++current; + if ( + current == end || current->depth != 1u || current->data_type != resp3::type::blob_string) { + current = skip_current_message(current, end); + return false; + } + msg.pattern = current->value; + } else { + msg.pattern = {}; + } + + // Channel + ++current; + if (current == end || current->depth != 1 || current->data_type != resp3::type::blob_string) { + current = skip_current_message(current, end); + return false; + } + msg.channel = current->value; + + // Payload + ++current; + if (current == end || current->depth != 1 || current->data_type != resp3::type::blob_string) { + current = skip_current_message(current, end); + return false; + } + msg.payload = current->value; + + // We're done. We should be at the end of the message + ++current; + if (current != end && current->depth != 0u) { + current = skip_current_message(current, end); + return false; + } + + return true; +} + +} // namespace detail + +void pubsub_messages::iterator::advance() +{ + while (current_) { + // If we found a message, we're done + if (detail::try_parse_pubsub_message(current_, end_, msg_)) + return; + + // If we just reached the end without a valid message, the range is exhausted + if (current_ == end_) + current_ = nullptr; + } +} + +} // namespace boost::redis diff --git a/include/boost/redis/pubsub_messages.hpp b/include/boost/redis/pubsub_messages.hpp new file mode 100644 index 00000000..4f0d2196 --- /dev/null +++ b/include/boost/redis/pubsub_messages.hpp @@ -0,0 +1,128 @@ +// +// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_PUBSUB_MESSAGES_HPP +#define BOOST_REDIS_PUBSUB_MESSAGES_HPP + +#include + +#include + +#include + +namespace boost::redis { + +/// A pubsub message parsed from a RESP3 push. +struct pubsub_message { + /// The channel where the message was published. + std::string_view channel; + + /// The pattern that matched the channel (only for pmessage, empty otherwise). + std::string_view pattern; + + /// The message payload. + std::string_view payload; +}; + +/// A range of pubsub messages parsed from RESP3 nodes. +/// +/// This class provides a range-based interface for iterating over pubsub messages +/// stored in a container of RESP3 nodes (e.g., a @ref resp3::flat_tree). +/// Non-pubsub messages (like subscribe confirmations) are automatically skipped. +/// +/// @par Example +/// @code +/// resp3::flat_tree tree; +/// // ... populate tree ... +/// for (const auto& msg : pubsub_messages(tree)) { +/// std::cout << "Channel: " << msg.channel << ", Payload: " << msg.payload << "\n"; +/// } +/// @endcode +class pubsub_messages { + span nodes_; + +public: + /// The iterator type. + /// Forward iterator for parsing pubsub messages from a range of RESP3 nodes. + class iterator { + const resp3::node_view* current_{}; + const resp3::node_view* end_{}; + pubsub_message msg_{}; + + friend class pubsub_messages; + + void advance(); + + iterator(const resp3::node_view* first, const resp3::node_view* last) noexcept + : current_{first} + , end_{last} + { + advance(); + } + + public: + using value_type = pubsub_message; + using difference_type = std::ptrdiff_t; + using pointer = pubsub_message; + using reference = pubsub_message; + using iterator_category = std::forward_iterator_tag; + + /// Constructs an end iterator. + iterator() = default; + + /// Returns a reference to the current pubsub message. + reference operator*() const noexcept { return msg_; } + + pointer operator->() const noexcept { return msg_; } + + /// Advances to the next pubsub message. + iterator& operator++() noexcept + { + advance(); + return *this; + } + + /// Advances to the next pubsub message (postfix). + iterator operator++(int) noexcept + { + auto tmp = *this; + ++*this; + return tmp; + } + + /// Compares two iterators for equality. + friend bool operator==(const iterator& lhs, const iterator& rhs) noexcept + { + return lhs.current_ == rhs.current_; + } + + /// Compares two iterators for inequality. + friend bool operator!=(const iterator& lhs, const iterator& rhs) noexcept + { + return !(lhs == rhs); + } + }; + + /// Constructs a range from a span of nodes. + explicit pubsub_messages(span nodes) noexcept + : nodes_{nodes} + { } + + /// Returns an iterator to the first pubsub message. + iterator begin() const noexcept + { + return iterator(nodes_.data(), nodes_.data() + nodes_.size()); + } + + /// Returns an iterator past the last pubsub message. + iterator end() const noexcept { return iterator(); } +}; + +} // namespace boost::redis + +#endif diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 196b2960..474fca04 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include