Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/publish_mp4.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule Publisher do
%ExVideoData{
frame_type: :keyframe,
packet_type: :sequence_start,
fourcc: :hvc1,
codec_id: :hvc1,
data: dcr
}
end
Expand All @@ -125,7 +125,7 @@ defmodule Publisher do
%ExVideoData{
frame_type: if(keyframe?, do: :keyframe, else: :interframe),
packet_type: :coded_frames,
fourcc: :hvc1,
codec_id: :hvc1,
composition_time_offset: ct,
data: sample.payload
}
Expand Down
37 changes: 24 additions & 13 deletions lib/ex_rtmp/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,27 @@ defmodule ExRTMP.Client do
alias ExRTMP.Message.UserControl.Event

@default_buffer_size 2_000_000
@default_chunk_size 1024
@audio_msg_type 8
@video_msg_type 9

@type start_options :: [
{:uri, String.t()},
{:stream_key, String.t()},
{:name, GenServer.name()},
{:receiver, Process.dest()}
{:receiver, Process.dest()},
{:chunk_size, non_neg_integer()}
]

@doc """
Starts and links a new RTMP client.

## Options
* `:uri` - The RTMP server URI to connect to. This option is required.

* `:stream_key` - The stream key. This option is required.

* `:name` - The name to register the client process. This option is optional.
* `:receiver` - The process that will receive the media data when playing a stream. This option is optional and defaults to the calling process.
* `:chunk_size` - The RTMP chunk size to use for data sent to the server. This option is optional.
"""
@spec start_link(start_options()) :: GenServer.on_start()
def start_link(opts) do
Expand Down Expand Up @@ -134,7 +136,15 @@ defmodule ExRTMP.Client do
@impl true
def init(opts) do
opts = Config.validate!(opts)
state = %State{uri: opts[:uri], stream_key: opts[:stream_key], receiver: opts[:receiver]}

state =
%State{
uri: opts[:uri],
stream_key: opts[:stream_key],
receiver: opts[:receiver],
chunk_size: opts[:chunk_size] || @default_chunk_size
}

{:ok, state}
end

Expand Down Expand Up @@ -162,7 +172,7 @@ defmodule ExRTMP.Client do
state.stream_key
|> Play.new()
|> Message.command(state.stream_id)
|> send_message(state.socket)
|> send_message(state)

{:noreply, %{state | pending_action: :play, pending_peer: from}}
end
Expand All @@ -177,7 +187,7 @@ defmodule ExRTMP.Client do
state.stream_key
|> Publish.new("live")
|> Message.command(state.stream_id)
|> send_message(state.socket)
|> send_message(state)

{:noreply, %{state | pending_action: :publish, pending_peer: from}}
end
Expand Down Expand Up @@ -264,7 +274,8 @@ defmodule ExRTMP.Client do
}
}

send_message(Message.command(connect), state.socket)
send_message(Message.chunk_size(state.chunk_size), state)
send_message(Message.command(connect), state)
{:noreply, %{state | pending_peer: from, pending_action: :connect}}

error ->
Expand Down Expand Up @@ -292,7 +303,7 @@ defmodule ExRTMP.Client do

defp create_stream(state) do
ts_id = state.next_ts_id
%CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state.socket)
%CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state)
%{state | pending_action: :create_stream, next_ts_id: ts_id + 1}
end

Expand All @@ -303,7 +314,7 @@ defmodule ExRTMP.Client do
%Event{type: :ping_request, data: timestamp} ->
timestamp
|> Message.ping_response()
|> send_message(state.socket)
|> send_message(state)

state

Expand Down Expand Up @@ -424,7 +435,7 @@ defmodule ExRTMP.Client do

defp do_delete_stream(stream_id, state) do
if stream_id do
DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state.socket)
DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state)
end

:ok = :gen_tcp.close(state.socket)
Expand All @@ -439,15 +450,15 @@ defmodule ExRTMP.Client do
timestamp: timestamp
)

send_message(message, state.socket)
send_message(message, state)
end

defp handle_play_resp_code("NetStream.Play.Start"), do: :ok
defp handle_play_resp_code("NetStream.Play.Reset"), do: :ignore
defp handle_play_resp_code("NetStream.Play.StreamNotFound"), do: {:error, "Stream not found"}
defp handle_play_resp_code("NetStream.Play.Failed"), do: {:error, "Play failed"}

defp send_message(message, socket) do
:ok = :gen_tcp.send(socket, Message.serialize(message))
defp send_message(message, state) do
:ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size))
end
end
11 changes: 9 additions & 2 deletions lib/ex_rtmp/client/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule ExRTMP.Client.State do
window_ack_size: non_neg_integer(),
stream_id: Message.stream_id() | nil,
state: state(),
media_processor: MediaProcessor.t() | nil
media_processor: MediaProcessor.t() | nil,
chunk_size: non_neg_integer() | nil
}

@enforce_keys [:uri, :stream_key]
Expand All @@ -34,6 +35,7 @@ defmodule ExRTMP.Client.State do
:receiver,
:stream_id,
:media_processor,
:chunk_size,
state: :init,
chunk_parser: ChunkParser.new(),
next_ts_id: 2,
Expand All @@ -56,6 +58,11 @@ defmodule ExRTMP.Client.State do
@doc false
@spec reset(t()) :: t()
def reset(state) do
%__MODULE__{uri: state.uri, stream_key: state.stream_key, receiver: state.receiver}
%__MODULE__{
uri: state.uri,
stream_key: state.stream_key,
receiver: state.receiver,
chunk_size: state.chunk_size
}
end
end
8 changes: 4 additions & 4 deletions lib/ex_rtmp/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule ExRTMP.Message do
alias __MODULE__.UserControl.Event
alias ExRTMP.{Chunk, Message}

@default_chunk_size 128

@type stream_id :: non_neg_integer()

@type t :: %__MODULE__{
Expand Down Expand Up @@ -139,12 +141,10 @@ defmodule ExRTMP.Message do

The following options may be provided:

* `:chunk_size` - The size of each chunk (default: 128)
* `:chunk_stream_id` - The chunk stream id to use (default: 2)
"""
@spec serialize(t(), keyword()) :: binary()
def serialize(message, opts \\ []) do
chunk_size = Keyword.get(opts, :chunk_size, 128)
@spec serialize(t(), non_neg_integer(), keyword()) :: binary()
def serialize(message, chunk_size \\ @default_chunk_size, opts \\ []) do
chunk_stream_id = Keyword.get(opts, :chunk_stream_id, 2)

payload =
Expand Down
11 changes: 8 additions & 3 deletions lib/ex_rtmp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule ExRTMP.Server do
{:port, :inet.port_number()},
{:handler, module()},
{:handler_options, any()},
{:demux, boolean()}
{:demux, boolean()},
{:chunk_size, non_neg_integer()}
]

@default_port 1935
Expand Down Expand Up @@ -59,6 +60,8 @@ defmodule ExRTMP.Server do

* `demux` - Whether the server will demux the incoming RTMP streams into
audio and video frames. Defaults to `true`. See [Handling Media](#module-handling-media) below.

* `chunk_size` - The RTMP chunk size to use for data sent to the clients.
"""
@spec start_link(start_options()) :: GenServer.on_start()
def start_link(opts) do
Expand Down Expand Up @@ -91,7 +94,8 @@ defmodule ExRTMP.Server do
pid: self(),
handler: opts[:handler] || raise("Handler module is required"),
handler_options: opts[:handler_options],
demux: Keyword.get(opts, :demux, true)
demux: Keyword.get(opts, :demux, true),
chunk_size: opts[:chunk_size]
}

Logger.info("RTMP Server listening on port #{port}")
Expand Down Expand Up @@ -139,7 +143,8 @@ defmodule ExRTMP.Server do
socket: client_socket,
handler: state.handler,
handler_options: state.handler_options,
demux: state.demux
demux: state.demux,
chunk_size: state.chunk_size
)

:ok = :gen_tcp.controlling_process(client_socket, pid)
Expand Down
35 changes: 25 additions & 10 deletions lib/ex_rtmp/server/client_session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule ExRTMP.Server.ClientSession do
alias ExRTMP.Message.Metadata

@default_acknowledgement_size 3_000_000
@default_chunk_size 128

defmodule State do
@moduledoc false
Expand All @@ -29,7 +30,8 @@ defmodule ExRTMP.Server.ClientSession do
handler_state: any(),
state: state(),
stream_id: non_neg_integer() | nil,
media_processor: MediaProcessor.t() | nil
media_processor: MediaProcessor.t() | nil,
chunk_size: non_neg_integer()
}

@enforce_keys [:socket]
Expand All @@ -38,6 +40,7 @@ defmodule ExRTMP.Server.ClientSession do
:handler_mod,
:handler_state,
:media_processor,
:chunk_size,
chunk_parser: ChunkParser.new(),
state: :init,
stream_id: nil
Expand Down Expand Up @@ -87,7 +90,8 @@ defmodule ExRTMP.Server.ClientSession do
handler_mod: handler_mod,
handler_state: handler_mod.init(options[:handler_options]),
socket: options[:socket],
media_processor: if(options[:demux], do: MediaProcessor.new())
media_processor: if(options[:demux], do: MediaProcessor.new()),
chunk_size: options[:chunk_size] || @default_chunk_size
}

:ok =
Expand All @@ -105,6 +109,7 @@ defmodule ExRTMP.Server.ClientSession do
case do_handle_handshake(state.socket) do
:ok ->
Logger.debug("RTMP Handshake successful")
set_chunk_size(state.socket, state.chunk_size)
{:ok, data} = :gen_tcp.recv(state.socket, 0)
:ok = :inet.setopts(state.socket, active: true)
{:noreply, do_handle_data(state, data)}
Expand All @@ -116,15 +121,15 @@ defmodule ExRTMP.Server.ClientSession do

@impl true
def handle_cast({:video_data, timestamp, data}, state) do
case send_media(:video, state.socket, state.stream_id, timestamp, data) do
case send_media(:video, timestamp, data, state) do
:ok -> {:noreply, state}
{:error, reason} -> {:stop, reason, state}
end
end

@impl true
def handle_cast({:audio_data, timestamp, data}, state) do
case send_media(:audio, state.socket, state.stream_id, timestamp, data) do
case send_media(:audio, timestamp, data, state) do
:ok -> {:noreply, state}
{:error, reason} -> {:stop, reason, state}
end
Expand All @@ -133,7 +138,7 @@ defmodule ExRTMP.Server.ClientSession do
@impl true
def handle_cast({:metadata, data}, state) do
message = Message.metadata(data, state.stream_id)
:ok = :gen_tcp.send(state.socket, Message.serialize(message))
:ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size))
{:noreply, state}
end

Expand Down Expand Up @@ -173,6 +178,13 @@ defmodule ExRTMP.Server.ClientSession do
end
end

defp set_chunk_size(_socket, @default_chunk_size), do: :ok

defp set_chunk_size(socket, chunk_size) do
message = Message.chunk_size(chunk_size)
:gen_tcp.send(socket, Message.serialize(message))
end

defp do_handle_data(state, data) do
{messages, parser} = ChunkParser.process(data, state.chunk_parser)
Enum.reduce(messages, %{state | chunk_parser: parser}, &handle_message/2)
Expand Down Expand Up @@ -375,21 +387,24 @@ defmodule ExRTMP.Server.ClientSession do
end
end

defp send_media(media, socket, stream_id, timestamp, data) do
defp send_media(media, timestamp, data, state) do
{type, chunk_stream_id} =
case media do
:audio -> {8, stream_id * 3}
:video -> {9, stream_id * 3 + 1}
:audio -> {8, state.stream_id * 3}
:video -> {9, state.stream_id * 3 + 1}
end

message = %Message{
type: type,
timestamp: timestamp,
stream_id: stream_id,
stream_id: state.stream_id,
payload: data
}

:gen_tcp.send(socket, Message.serialize(message, chunk_stream_id: chunk_stream_id))
:gen_tcp.send(
state.socket,
Message.serialize(message, state.chunk_size, chunk_stream_id: chunk_stream_id)
)
end

defp send_messages(state, []), do: state
Expand Down