Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion lib/rage/cable/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,32 @@ def periodically(method_name = nil, every:, &block)
@__periodic_timers << [callback, every]
end

# Broadcast data to all the clients subscribed to a channel-local stream.
#
# @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name
# @param data [Object] the data to send to the clients
# @raise [ArgumentError] if the streamable object does not satisfy the type requirements
# @example
# NotificationsChannel.broadcast_to(current_user, { message: "You have a new notification!" })
def broadcast_to(streamable, data)
Rage.cable.broadcast(__stream_name_for(streamable), data)
end

# @private
def __stream_name_for(streamables)
stream_name = Array(streamables).map do |streamable|
if streamable.respond_to?(:id)
streamable.id
elsif streamable.is_a?(String) || streamable.is_a?(Symbol) || streamable.is_a?(Numeric)
streamable
else
raise ArgumentError, "Unable to generate stream name. Expected an object that responds to `id`, got: #{streamable.class}"
end
end

"#{name}:#{stream_name.join(":")}"
end

protected

def set_up_periodic_timers
Expand Down Expand Up @@ -407,13 +433,39 @@ def subscription_rejected?
!!@__subscription_rejected
end

# Subscribe to a stream.
# Subscribe to a stream global stream. Global streams are not associated with any specific channel instance and can be used to broadcast data to multiple channels at once.
#
# @param stream [String] the name of the stream
# @raise [ArgumentError] if the stream name is not a String
# @example Subscribe to a stream
# class NotificationsChannel < Rage::Cable::Channel
# def subscribed
# stream_from "notifications"
# end
# end
# @example Broadcast to the stream
# Rage::Cable.broadcast("notifications", { message: "A new member has joined!" })
def stream_from(stream)
raise ArgumentError, "Stream name must be a String" unless stream.is_a?(String)
Rage.cable.__protocol.subscribe(@__connection, stream, @__params)
end

# Subscribe to a local stream. Local streams are associated with a specific channel instance and can be used to send data to the current channel only.
#
# @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name
# @raise [ArgumentError] if the streamable object does not satisfy the type requirements
# @example Subscribe to a stream
# class NotificationsChannel < Rage::Cable::Channel
# def subscribed
# stream_for current_user
# end
# end
# @example Broadcast to the stream
# NotificationsChannel.broadcast_to(current_user, { message: "You have a new notification!" })
def stream_for(streamable)
stream_from(self.class.__stream_name_for(streamable))
end

# Broadcast data to all the clients subscribed to a stream.
#
# @param stream [String] the name of the stream
Expand Down
187 changes: 187 additions & 0 deletions spec/cable/channel/stream_for_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# frozen_string_literal: true

module CableChannelStreamForSpec
class TestChannel < Rage::Cable::Channel
def subscribed
stream_for params[:user]
end
end

class TestChannel2 < Rage::Cable::Channel
def subscribed
stream_for "global"
end
end

class User
attr_reader :id

def initialize(id)
@id = id
end
end
end

RSpec.describe Rage::Cable::Channel do
let(:protocol) { double("protocol", supports_rpc?: true) }
let(:connection) { double("connection") }
let(:params) { {} }
let(:identified_by) { {} }

before do
allow(Rage.cable).to receive(:__protocol).and_return(protocol)
allow(Rage.cable).to receive(:broadcast)
end

describe "#stream_for" do
subject { klass.tap(&:__register_actions).new(connection, params, identified_by) }

context "with an object that responds to id" do
let(:klass) { CableChannelStreamForSpec::TestChannel }
let(:user) { CableChannelStreamForSpec::User.new(123) }
let(:params) { { user: user } }

it "subscribes to a stream with the channel name and object id" do
expect(protocol).to receive(:subscribe).with(
connection,
"CableChannelStreamForSpec::TestChannel:123",
params
)
subject.__run_action(:subscribed)
end
end

context "with a string streamable" do
let(:klass) { CableChannelStreamForSpec::TestChannel2 }

it "subscribes to a stream with the channel name and string" do
expect(protocol).to receive(:subscribe).with(
connection,
"CableChannelStreamForSpec::TestChannel2:global",
params
)
subject.__run_action(:subscribed)
end
end

context "with a symbol streamable" do
let(:klass) { CableChannelStreamForSpec::TestChannel }
let(:params) { { user: :admin } }

it "subscribes to a stream with the channel name and symbol" do
expect(protocol).to receive(:subscribe).with(
connection,
"CableChannelStreamForSpec::TestChannel:admin",
params
)
subject.__run_action(:subscribed)
end
end

context "with a numeric streamable" do
let(:klass) { CableChannelStreamForSpec::TestChannel }
let(:params) { { user: 42 } }

it "subscribes to a stream with the channel name and number" do
expect(protocol).to receive(:subscribe).with(
connection,
"CableChannelStreamForSpec::TestChannel:42",
params
)
subject.__run_action(:subscribed)
end
end

context "with an array of streamables" do
let(:klass) { CableChannelStreamForSpec::TestChannel }
let(:user) { CableChannelStreamForSpec::User.new(123) }
let(:params) { { user: [user, "room", 456] } }

it "subscribes to a stream with the channel name and joined stream parts" do
expect(protocol).to receive(:subscribe).with(
connection,
"CableChannelStreamForSpec::TestChannel:123:room:456",
params
)
subject.__run_action(:subscribed)
end
end

context "with an invalid streamable" do
let(:klass) { CableChannelStreamForSpec::TestChannel }
let(:params) { { user: Object.new } }

it "raises an ArgumentError" do
allow(protocol).to receive(:subscribe)
expect { subject.__run_action(:subscribed) }.to raise_error(
ArgumentError,
/Unable to generate stream name/
)
end
end
end

describe ".broadcast_to" do
context "with an object that responds to id" do
let(:user) { CableChannelStreamForSpec::User.new(123) }

it "broadcasts to the correct stream" do
expect(Rage.cable).to receive(:broadcast).with(
"CableChannelStreamForSpec::TestChannel:123",
{ message: "Hello!" }
)
CableChannelStreamForSpec::TestChannel.broadcast_to(user, { message: "Hello!" })
end
end

context "with a string streamable" do
it "broadcasts to the correct stream" do
expect(Rage.cable).to receive(:broadcast).with(
"CableChannelStreamForSpec::TestChannel:notifications",
{ type: "alert" }
)
CableChannelStreamForSpec::TestChannel.broadcast_to("notifications", { type: "alert" })
end
end

context "with a symbol streamable" do
it "broadcasts to the correct stream" do
expect(Rage.cable).to receive(:broadcast).with(
"CableChannelStreamForSpec::TestChannel:admin",
{ data: 123 }
)
CableChannelStreamForSpec::TestChannel.broadcast_to(:admin, { data: 123 })
end
end

context "with a numeric streamable" do
it "broadcasts to the correct stream" do
expect(Rage.cable).to receive(:broadcast).with(
"CableChannelStreamForSpec::TestChannel:42",
{ count: 1 }
)
CableChannelStreamForSpec::TestChannel.broadcast_to(42, { count: 1 })
end
end

context "with an array of streamables" do
let(:user) { CableChannelStreamForSpec::User.new(123) }

it "broadcasts to the correct stream" do
expect(Rage.cable).to receive(:broadcast).with(
"CableChannelStreamForSpec::TestChannel:123:room:456",
{ message: "Hello!" }
)
CableChannelStreamForSpec::TestChannel.broadcast_to([user, "room", 456], { message: "Hello!" })
end
end

context "with an invalid streamable" do
it "raises an ArgumentError" do
expect {
CableChannelStreamForSpec::TestChannel.broadcast_to(Object.new, { message: "Hello!" })
}.to raise_error(ArgumentError, /Unable to generate stream name/)
end
end
end
end