diff --git a/examples/publish_mp4.exs b/examples/publish_mp4.exs index 64c0180..6aec52d 100644 --- a/examples/publish_mp4.exs +++ b/examples/publish_mp4.exs @@ -100,7 +100,7 @@ defmodule Publisher do %ExVideoData{ frame_type: :keyframe, packet_type: :sequence_start, - fourcc: :hvc1, + codec_id: :hvc1, data: dcr } end @@ -125,7 +125,7 @@ defmodule Publisher do %ExVideoData{ frame_type: if(keyframe?, do: :keyframe, else: :interframe), packet_type: :coded_frames, - fourcc: :hvc1, + codec_id: :hvc1, composition_time_offset: ct, data: sample.payload } diff --git a/lib/ex_rtmp/client.ex b/lib/ex_rtmp/client.ex index 41b138c..f4dd974 100644 --- a/lib/ex_rtmp/client.ex +++ b/lib/ex_rtmp/client.ex @@ -42,6 +42,7 @@ defmodule ExRTMP.Client do alias ExRTMP.Message.UserControl.Event @default_buffer_size 2_000_000 + @default_chunk_size 1024 @audio_msg_type 8 @video_msg_type 9 @@ -49,7 +50,8 @@ defmodule ExRTMP.Client do {:uri, String.t()}, {:stream_key, String.t()}, {:name, GenServer.name()}, - {:receiver, Process.dest()} + {:receiver, Process.dest()}, + {:chunk_size, non_neg_integer()} ] @doc """ @@ -57,10 +59,10 @@ defmodule ExRTMP.Client do ## Options * `:uri` - The RTMP server URI to connect to. This option is required. - * `:stream_key` - The stream key. This option is required. - * `:name` - The name to register the client process. This option is optional. + * `:receiver` - The process that will receive the media data when playing a stream. This option is optional and defaults to the calling process. + * `:chunk_size` - The RTMP chunk size to use for data sent to the server. This option is optional. """ @spec start_link(start_options()) :: GenServer.on_start() def start_link(opts) do @@ -134,7 +136,15 @@ defmodule ExRTMP.Client do @impl true def init(opts) do opts = Config.validate!(opts) - state = %State{uri: opts[:uri], stream_key: opts[:stream_key], receiver: opts[:receiver]} + + state = + %State{ + uri: opts[:uri], + stream_key: opts[:stream_key], + receiver: opts[:receiver], + chunk_size: opts[:chunk_size] || @default_chunk_size + } + {:ok, state} end @@ -162,7 +172,7 @@ defmodule ExRTMP.Client do state.stream_key |> Play.new() |> Message.command(state.stream_id) - |> send_message(state.socket) + |> send_message(state) {:noreply, %{state | pending_action: :play, pending_peer: from}} end @@ -177,7 +187,7 @@ defmodule ExRTMP.Client do state.stream_key |> Publish.new("live") |> Message.command(state.stream_id) - |> send_message(state.socket) + |> send_message(state) {:noreply, %{state | pending_action: :publish, pending_peer: from}} end @@ -264,7 +274,8 @@ defmodule ExRTMP.Client do } } - send_message(Message.command(connect), state.socket) + send_message(Message.chunk_size(state.chunk_size), state) + send_message(Message.command(connect), state) {:noreply, %{state | pending_peer: from, pending_action: :connect}} error -> @@ -292,7 +303,7 @@ defmodule ExRTMP.Client do defp create_stream(state) do ts_id = state.next_ts_id - %CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state.socket) + %CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state) %{state | pending_action: :create_stream, next_ts_id: ts_id + 1} end @@ -303,7 +314,7 @@ defmodule ExRTMP.Client do %Event{type: :ping_request, data: timestamp} -> timestamp |> Message.ping_response() - |> send_message(state.socket) + |> send_message(state) state @@ -424,7 +435,7 @@ defmodule ExRTMP.Client do defp do_delete_stream(stream_id, state) do if stream_id do - DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state.socket) + DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state) end :ok = :gen_tcp.close(state.socket) @@ -439,7 +450,7 @@ defmodule ExRTMP.Client do timestamp: timestamp ) - send_message(message, state.socket) + send_message(message, state) end defp handle_play_resp_code("NetStream.Play.Start"), do: :ok @@ -447,7 +458,7 @@ defmodule ExRTMP.Client do defp handle_play_resp_code("NetStream.Play.StreamNotFound"), do: {:error, "Stream not found"} defp handle_play_resp_code("NetStream.Play.Failed"), do: {:error, "Play failed"} - defp send_message(message, socket) do - :ok = :gen_tcp.send(socket, Message.serialize(message)) + defp send_message(message, state) do + :ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size)) end end diff --git a/lib/ex_rtmp/client/state.ex b/lib/ex_rtmp/client/state.ex index cdf43e1..29d9cb7 100644 --- a/lib/ex_rtmp/client/state.ex +++ b/lib/ex_rtmp/client/state.ex @@ -22,7 +22,8 @@ defmodule ExRTMP.Client.State do window_ack_size: non_neg_integer(), stream_id: Message.stream_id() | nil, state: state(), - media_processor: MediaProcessor.t() | nil + media_processor: MediaProcessor.t() | nil, + chunk_size: non_neg_integer() | nil } @enforce_keys [:uri, :stream_key] @@ -34,6 +35,7 @@ defmodule ExRTMP.Client.State do :receiver, :stream_id, :media_processor, + :chunk_size, state: :init, chunk_parser: ChunkParser.new(), next_ts_id: 2, @@ -56,6 +58,11 @@ defmodule ExRTMP.Client.State do @doc false @spec reset(t()) :: t() def reset(state) do - %__MODULE__{uri: state.uri, stream_key: state.stream_key, receiver: state.receiver} + %__MODULE__{ + uri: state.uri, + stream_key: state.stream_key, + receiver: state.receiver, + chunk_size: state.chunk_size + } end end diff --git a/lib/ex_rtmp/message.ex b/lib/ex_rtmp/message.ex index 75b7dfc..0179f65 100644 --- a/lib/ex_rtmp/message.ex +++ b/lib/ex_rtmp/message.ex @@ -11,6 +11,8 @@ defmodule ExRTMP.Message do alias __MODULE__.UserControl.Event alias ExRTMP.{Chunk, Message} + @default_chunk_size 128 + @type stream_id :: non_neg_integer() @type t :: %__MODULE__{ @@ -139,12 +141,10 @@ defmodule ExRTMP.Message do The following options may be provided: - * `:chunk_size` - The size of each chunk (default: 128) * `:chunk_stream_id` - The chunk stream id to use (default: 2) """ - @spec serialize(t(), keyword()) :: binary() - def serialize(message, opts \\ []) do - chunk_size = Keyword.get(opts, :chunk_size, 128) + @spec serialize(t(), non_neg_integer(), keyword()) :: binary() + def serialize(message, chunk_size \\ @default_chunk_size, opts \\ []) do chunk_stream_id = Keyword.get(opts, :chunk_stream_id, 2) payload = diff --git a/lib/ex_rtmp/server.ex b/lib/ex_rtmp/server.ex index eea752a..96a60fa 100644 --- a/lib/ex_rtmp/server.ex +++ b/lib/ex_rtmp/server.ex @@ -29,7 +29,8 @@ defmodule ExRTMP.Server do {:port, :inet.port_number()}, {:handler, module()}, {:handler_options, any()}, - {:demux, boolean()} + {:demux, boolean()}, + {:chunk_size, non_neg_integer()} ] @default_port 1935 @@ -59,6 +60,8 @@ defmodule ExRTMP.Server do * `demux` - Whether the server will demux the incoming RTMP streams into audio and video frames. Defaults to `true`. See [Handling Media](#module-handling-media) below. + + * `chunk_size` - The RTMP chunk size to use for data sent to the clients. """ @spec start_link(start_options()) :: GenServer.on_start() def start_link(opts) do @@ -91,7 +94,8 @@ defmodule ExRTMP.Server do pid: self(), handler: opts[:handler] || raise("Handler module is required"), handler_options: opts[:handler_options], - demux: Keyword.get(opts, :demux, true) + demux: Keyword.get(opts, :demux, true), + chunk_size: opts[:chunk_size] } Logger.info("RTMP Server listening on port #{port}") @@ -139,7 +143,8 @@ defmodule ExRTMP.Server do socket: client_socket, handler: state.handler, handler_options: state.handler_options, - demux: state.demux + demux: state.demux, + chunk_size: state.chunk_size ) :ok = :gen_tcp.controlling_process(client_socket, pid) diff --git a/lib/ex_rtmp/server/client_session.ex b/lib/ex_rtmp/server/client_session.ex index 72bb2c2..bff8857 100644 --- a/lib/ex_rtmp/server/client_session.ex +++ b/lib/ex_rtmp/server/client_session.ex @@ -16,6 +16,7 @@ defmodule ExRTMP.Server.ClientSession do alias ExRTMP.Message.Metadata @default_acknowledgement_size 3_000_000 + @default_chunk_size 128 defmodule State do @moduledoc false @@ -29,7 +30,8 @@ defmodule ExRTMP.Server.ClientSession do handler_state: any(), state: state(), stream_id: non_neg_integer() | nil, - media_processor: MediaProcessor.t() | nil + media_processor: MediaProcessor.t() | nil, + chunk_size: non_neg_integer() } @enforce_keys [:socket] @@ -38,6 +40,7 @@ defmodule ExRTMP.Server.ClientSession do :handler_mod, :handler_state, :media_processor, + :chunk_size, chunk_parser: ChunkParser.new(), state: :init, stream_id: nil @@ -87,7 +90,8 @@ defmodule ExRTMP.Server.ClientSession do handler_mod: handler_mod, handler_state: handler_mod.init(options[:handler_options]), socket: options[:socket], - media_processor: if(options[:demux], do: MediaProcessor.new()) + media_processor: if(options[:demux], do: MediaProcessor.new()), + chunk_size: options[:chunk_size] || @default_chunk_size } :ok = @@ -105,6 +109,7 @@ defmodule ExRTMP.Server.ClientSession do case do_handle_handshake(state.socket) do :ok -> Logger.debug("RTMP Handshake successful") + set_chunk_size(state.socket, state.chunk_size) {:ok, data} = :gen_tcp.recv(state.socket, 0) :ok = :inet.setopts(state.socket, active: true) {:noreply, do_handle_data(state, data)} @@ -116,7 +121,7 @@ defmodule ExRTMP.Server.ClientSession do @impl true def handle_cast({:video_data, timestamp, data}, state) do - case send_media(:video, state.socket, state.stream_id, timestamp, data) do + case send_media(:video, timestamp, data, state) do :ok -> {:noreply, state} {:error, reason} -> {:stop, reason, state} end @@ -124,7 +129,7 @@ defmodule ExRTMP.Server.ClientSession do @impl true def handle_cast({:audio_data, timestamp, data}, state) do - case send_media(:audio, state.socket, state.stream_id, timestamp, data) do + case send_media(:audio, timestamp, data, state) do :ok -> {:noreply, state} {:error, reason} -> {:stop, reason, state} end @@ -133,7 +138,7 @@ defmodule ExRTMP.Server.ClientSession do @impl true def handle_cast({:metadata, data}, state) do message = Message.metadata(data, state.stream_id) - :ok = :gen_tcp.send(state.socket, Message.serialize(message)) + :ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size)) {:noreply, state} end @@ -173,6 +178,13 @@ defmodule ExRTMP.Server.ClientSession do end end + defp set_chunk_size(_socket, @default_chunk_size), do: :ok + + defp set_chunk_size(socket, chunk_size) do + message = Message.chunk_size(chunk_size) + :gen_tcp.send(socket, Message.serialize(message)) + end + defp do_handle_data(state, data) do {messages, parser} = ChunkParser.process(data, state.chunk_parser) Enum.reduce(messages, %{state | chunk_parser: parser}, &handle_message/2) @@ -375,21 +387,24 @@ defmodule ExRTMP.Server.ClientSession do end end - defp send_media(media, socket, stream_id, timestamp, data) do + defp send_media(media, timestamp, data, state) do {type, chunk_stream_id} = case media do - :audio -> {8, stream_id * 3} - :video -> {9, stream_id * 3 + 1} + :audio -> {8, state.stream_id * 3} + :video -> {9, state.stream_id * 3 + 1} end message = %Message{ type: type, timestamp: timestamp, - stream_id: stream_id, + stream_id: state.stream_id, payload: data } - :gen_tcp.send(socket, Message.serialize(message, chunk_stream_id: chunk_stream_id)) + :gen_tcp.send( + state.socket, + Message.serialize(message, state.chunk_size, chunk_stream_id: chunk_stream_id) + ) end defp send_messages(state, []), do: state