Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/redis/pubsub_messages.hpp>

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
Expand All @@ -22,8 +23,10 @@ using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_flat_response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using boost::redis::pubsub_messages;
using boost::redis::pubsub_message;
using boost::system::error_code;
using asio::signal_set;

/* This example will subscribe and read pushes indefinitely.
Expand Down Expand Up @@ -72,10 +75,10 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>

// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value())
std::cout << elem.value << "\n";

std::cout << std::endl;
for (pubsub_message elem : pubsub_messages{resp.value()}) {
std::cout << "Received message from channel " << elem.channel << ": " << elem.payload
<< "\n";
}

resp.value().clear();
}
Expand Down
121 changes: 121 additions & 0 deletions include/boost/redis/impl/pubsub_messages.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/redis/pubsub_messages.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/type.hpp>

#include <algorithm>
#include <string_view>

namespace boost::redis {

namespace detail {

// Advances the iterator past all nodes belonging to the current root message.
// Returns a pointer to the next root node (depth == 0) or end.
inline const resp3::node_view* skip_current_message(
const resp3::node_view* current,
const resp3::node_view* end)
{
return std::find_if(current, end, [](const resp3::node_view& n) {
return n.depth == 0u;
});
}

// Attempts to parse a pubsub message starting at 'current'.
// If successful, populates 'msg' and returns true.
// If not a valid pubsub message (message/pmessage), returns false.
inline bool try_parse_pubsub_message(
const resp3::node_view*& current,
const resp3::node_view* end,
pubsub_message& msg)
{
if (current == end)
return false;

// Root must be a push type
if (current->data_type != resp3::type::push) {
current = skip_current_message(current + 1, end);
return false;
}

// Move to first child (message type)
++current;
if (current == end || current->depth != 1u || current->data_type != resp3::type::blob_string) {
current = skip_current_message(current, end);
return false;
}

// Determine the message type
bool is_pmessage;
if (current->value == "message") {
is_pmessage = false;
} else if (current->value == "pmessage") {
is_pmessage = true;
} else {
// Not interested in this message
current = skip_current_message(current, end);
return false;
}

// For pmessage, the matched pattern goes next
if (is_pmessage) {
++current;
if (
current == end || current->depth != 1u || current->data_type != resp3::type::blob_string) {
current = skip_current_message(current, end);
return false;
}
msg.pattern = current->value;
} else {
msg.pattern = {};
}

// Channel
++current;
if (current == end || current->depth != 1 || current->data_type != resp3::type::blob_string) {
current = skip_current_message(current, end);
return false;
}
msg.channel = current->value;

// Payload
++current;
if (current == end || current->depth != 1 || current->data_type != resp3::type::blob_string) {
current = skip_current_message(current, end);
return false;
}
msg.payload = current->value;

// We're done. We should be at the end of the message
++current;
if (current != end && current->depth != 0u) {
current = skip_current_message(current, end);
return false;
}

return true;
}

} // namespace detail

void pubsub_messages::iterator::advance()
{
while (current_) {
// If we found a message, we're done
if (detail::try_parse_pubsub_message(current_, end_, msg_))
return;

// If we just reached the end without a valid message, the range is exhausted
if (current_ == end_)
current_ = nullptr;
}
}

} // namespace boost::redis
128 changes: 128 additions & 0 deletions include/boost/redis/pubsub_messages.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//
// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_PUBSUB_MESSAGES_HPP
#define BOOST_REDIS_PUBSUB_MESSAGES_HPP

#include <boost/redis/resp3/node.hpp>

#include <boost/core/span.hpp>

#include <string_view>

namespace boost::redis {

/// A pubsub message parsed from a RESP3 push.
struct pubsub_message {
/// The channel where the message was published.
std::string_view channel;

/// The pattern that matched the channel (only for pmessage, empty otherwise).
std::string_view pattern;

/// The message payload.
std::string_view payload;
};

/// A range of pubsub messages parsed from RESP3 nodes.
///
/// This class provides a range-based interface for iterating over pubsub messages
/// stored in a container of RESP3 nodes (e.g., a @ref resp3::flat_tree).
/// Non-pubsub messages (like subscribe confirmations) are automatically skipped.
///
/// @par Example
/// @code
/// resp3::flat_tree tree;
/// // ... populate tree ...
/// for (const auto& msg : pubsub_messages(tree)) {
/// std::cout << "Channel: " << msg.channel << ", Payload: " << msg.payload << "\n";
/// }
/// @endcode
class pubsub_messages {
span<const resp3::node_view> nodes_;

public:
/// The iterator type.
/// Forward iterator for parsing pubsub messages from a range of RESP3 nodes.
class iterator {
const resp3::node_view* current_{};
const resp3::node_view* end_{};
pubsub_message msg_{};

friend class pubsub_messages;

void advance();

iterator(const resp3::node_view* first, const resp3::node_view* last) noexcept
: current_{first}
, end_{last}
{
advance();
}

public:
using value_type = pubsub_message;
using difference_type = std::ptrdiff_t;
using pointer = pubsub_message;
using reference = pubsub_message;
using iterator_category = std::forward_iterator_tag;

/// Constructs an end iterator.
iterator() = default;

/// Returns a reference to the current pubsub message.
reference operator*() const noexcept { return msg_; }

pointer operator->() const noexcept { return msg_; }

/// Advances to the next pubsub message.
iterator& operator++() noexcept
{
advance();
return *this;
}

/// Advances to the next pubsub message (postfix).
iterator operator++(int) noexcept
{
auto tmp = *this;
++*this;
return tmp;
}

/// Compares two iterators for equality.
friend bool operator==(const iterator& lhs, const iterator& rhs) noexcept
{
return lhs.current_ == rhs.current_;
}

/// Compares two iterators for inequality.
friend bool operator!=(const iterator& lhs, const iterator& rhs) noexcept
{
return !(lhs == rhs);
}
};

/// Constructs a range from a span of nodes.
explicit pubsub_messages(span<const resp3::node_view> nodes) noexcept
: nodes_{nodes}
{ }

/// Returns an iterator to the first pubsub message.
iterator begin() const noexcept
{
return iterator(nodes_.data(), nodes_.data() + nodes_.size());
}

/// Returns an iterator past the last pubsub message.
iterator end() const noexcept { return iterator(); }
};

} // namespace boost::redis

#endif
1 change: 1 addition & 0 deletions include/boost/redis/src.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/redis/impl/ignore.ipp>
#include <boost/redis/impl/logger.ipp>
#include <boost/redis/impl/multiplexer.ipp>
#include <boost/redis/impl/pubsub_messages.ipp>
#include <boost/redis/impl/read_buffer.ipp>
#include <boost/redis/impl/reader_fsm.ipp>
#include <boost/redis/impl/receive_fsm.ipp>
Expand Down