diff --git a/lib/rage/cable/channel.rb b/lib/rage/cable/channel.rb index de461403..f5340e10 100644 --- a/lib/rage/cable/channel.rb +++ b/lib/rage/cable/channel.rb @@ -289,6 +289,32 @@ def periodically(method_name = nil, every:, &block) @__periodic_timers << [callback, every] end + # Broadcast data to all the clients subscribed to a channel-local stream. + # + # @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name + # @param data [Object] the data to send to the clients + # @raise [ArgumentError] if the streamable object does not satisfy the type requirements + # @example + # NotificationsChannel.broadcast_to(current_user, { message: "You have a new notification!" }) + def broadcast_to(streamable, data) + Rage.cable.broadcast(__stream_name_for(streamable), data) + end + + # @private + def __stream_name_for(streamables) + stream_name = Array(streamables).map do |streamable| + if streamable.respond_to?(:id) + streamable.id + elsif streamable.is_a?(String) || streamable.is_a?(Symbol) || streamable.is_a?(Numeric) + streamable + else + raise ArgumentError, "Unable to generate stream name. Expected an object that responds to `id`, got: #{streamable.class}" + end + end + + "#{name}:#{stream_name.join(":")}" + end + protected def set_up_periodic_timers @@ -407,13 +433,39 @@ def subscription_rejected? !!@__subscription_rejected end - # Subscribe to a stream. + # Subscribe to a stream global stream. Global streams are not associated with any specific channel instance and can be used to broadcast data to multiple channels at once. # # @param stream [String] the name of the stream + # @raise [ArgumentError] if the stream name is not a String + # @example Subscribe to a stream + # class NotificationsChannel < Rage::Cable::Channel + # def subscribed + # stream_from "notifications" + # end + # end + # @example Broadcast to the stream + # Rage::Cable.broadcast("notifications", { message: "A new member has joined!" }) def stream_from(stream) + raise ArgumentError, "Stream name must be a String" unless stream.is_a?(String) Rage.cable.__protocol.subscribe(@__connection, stream, @__params) end + # Subscribe to a local stream. Local streams are associated with a specific channel instance and can be used to send data to the current channel only. + # + # @param streamable [#id, String, Symbol, Numeric, Array] an object that will be used to generate the stream name + # @raise [ArgumentError] if the streamable object does not satisfy the type requirements + # @example Subscribe to a stream + # class NotificationsChannel < Rage::Cable::Channel + # def subscribed + # stream_for current_user + # end + # end + # @example Broadcast to the stream + # NotificationsChannel.broadcast_to(current_user, { message: "You have a new notification!" }) + def stream_for(streamable) + stream_from(self.class.__stream_name_for(streamable)) + end + # Broadcast data to all the clients subscribed to a stream. # # @param stream [String] the name of the stream diff --git a/spec/cable/channel/stream_for_spec.rb b/spec/cable/channel/stream_for_spec.rb new file mode 100644 index 00000000..5715d23a --- /dev/null +++ b/spec/cable/channel/stream_for_spec.rb @@ -0,0 +1,187 @@ +# frozen_string_literal: true + +module CableChannelStreamForSpec + class TestChannel < Rage::Cable::Channel + def subscribed + stream_for params[:user] + end + end + + class TestChannel2 < Rage::Cable::Channel + def subscribed + stream_for "global" + end + end + + class User + attr_reader :id + + def initialize(id) + @id = id + end + end +end + +RSpec.describe Rage::Cable::Channel do + let(:protocol) { double("protocol", supports_rpc?: true) } + let(:connection) { double("connection") } + let(:params) { {} } + let(:identified_by) { {} } + + before do + allow(Rage.cable).to receive(:__protocol).and_return(protocol) + allow(Rage.cable).to receive(:broadcast) + end + + describe "#stream_for" do + subject { klass.tap(&:__register_actions).new(connection, params, identified_by) } + + context "with an object that responds to id" do + let(:klass) { CableChannelStreamForSpec::TestChannel } + let(:user) { CableChannelStreamForSpec::User.new(123) } + let(:params) { { user: user } } + + it "subscribes to a stream with the channel name and object id" do + expect(protocol).to receive(:subscribe).with( + connection, + "CableChannelStreamForSpec::TestChannel:123", + params + ) + subject.__run_action(:subscribed) + end + end + + context "with a string streamable" do + let(:klass) { CableChannelStreamForSpec::TestChannel2 } + + it "subscribes to a stream with the channel name and string" do + expect(protocol).to receive(:subscribe).with( + connection, + "CableChannelStreamForSpec::TestChannel2:global", + params + ) + subject.__run_action(:subscribed) + end + end + + context "with a symbol streamable" do + let(:klass) { CableChannelStreamForSpec::TestChannel } + let(:params) { { user: :admin } } + + it "subscribes to a stream with the channel name and symbol" do + expect(protocol).to receive(:subscribe).with( + connection, + "CableChannelStreamForSpec::TestChannel:admin", + params + ) + subject.__run_action(:subscribed) + end + end + + context "with a numeric streamable" do + let(:klass) { CableChannelStreamForSpec::TestChannel } + let(:params) { { user: 42 } } + + it "subscribes to a stream with the channel name and number" do + expect(protocol).to receive(:subscribe).with( + connection, + "CableChannelStreamForSpec::TestChannel:42", + params + ) + subject.__run_action(:subscribed) + end + end + + context "with an array of streamables" do + let(:klass) { CableChannelStreamForSpec::TestChannel } + let(:user) { CableChannelStreamForSpec::User.new(123) } + let(:params) { { user: [user, "room", 456] } } + + it "subscribes to a stream with the channel name and joined stream parts" do + expect(protocol).to receive(:subscribe).with( + connection, + "CableChannelStreamForSpec::TestChannel:123:room:456", + params + ) + subject.__run_action(:subscribed) + end + end + + context "with an invalid streamable" do + let(:klass) { CableChannelStreamForSpec::TestChannel } + let(:params) { { user: Object.new } } + + it "raises an ArgumentError" do + allow(protocol).to receive(:subscribe) + expect { subject.__run_action(:subscribed) }.to raise_error( + ArgumentError, + /Unable to generate stream name/ + ) + end + end + end + + describe ".broadcast_to" do + context "with an object that responds to id" do + let(:user) { CableChannelStreamForSpec::User.new(123) } + + it "broadcasts to the correct stream" do + expect(Rage.cable).to receive(:broadcast).with( + "CableChannelStreamForSpec::TestChannel:123", + { message: "Hello!" } + ) + CableChannelStreamForSpec::TestChannel.broadcast_to(user, { message: "Hello!" }) + end + end + + context "with a string streamable" do + it "broadcasts to the correct stream" do + expect(Rage.cable).to receive(:broadcast).with( + "CableChannelStreamForSpec::TestChannel:notifications", + { type: "alert" } + ) + CableChannelStreamForSpec::TestChannel.broadcast_to("notifications", { type: "alert" }) + end + end + + context "with a symbol streamable" do + it "broadcasts to the correct stream" do + expect(Rage.cable).to receive(:broadcast).with( + "CableChannelStreamForSpec::TestChannel:admin", + { data: 123 } + ) + CableChannelStreamForSpec::TestChannel.broadcast_to(:admin, { data: 123 }) + end + end + + context "with a numeric streamable" do + it "broadcasts to the correct stream" do + expect(Rage.cable).to receive(:broadcast).with( + "CableChannelStreamForSpec::TestChannel:42", + { count: 1 } + ) + CableChannelStreamForSpec::TestChannel.broadcast_to(42, { count: 1 }) + end + end + + context "with an array of streamables" do + let(:user) { CableChannelStreamForSpec::User.new(123) } + + it "broadcasts to the correct stream" do + expect(Rage.cable).to receive(:broadcast).with( + "CableChannelStreamForSpec::TestChannel:123:room:456", + { message: "Hello!" } + ) + CableChannelStreamForSpec::TestChannel.broadcast_to([user, "room", 456], { message: "Hello!" }) + end + end + + context "with an invalid streamable" do + it "raises an ArgumentError" do + expect { + CableChannelStreamForSpec::TestChannel.broadcast_to(Object.new, { message: "Hello!" }) + }.to raise_error(ArgumentError, /Unable to generate stream name/) + end + end + end +end