Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/shinkai/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ defmodule Shinkai.Application do
{Sources.PublishManager, []},
{Registry, name: Sink.Registry, keys: :duplicate},
{Registry, name: Source.Registry, keys: :unique},
{Task, fn -> Sources.start_all() end}
{Task, fn -> Sources.start_all() end},
{RTSP.Server, handler: Sources.RTSP.Handler, port: 8554}
]

children =
Expand Down
62 changes: 34 additions & 28 deletions lib/shinkai/sink/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ defmodule Shinkai.Sink.Hls do
File.rm_rf!(hls_config[:storage_dir])

:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, tracks_topic(id))
:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, packets_topic(id))
:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, state_topic(id))

{:ok, _} = RequestHolder.start_link(:"request_holder_#{id}")

Process.flag(:trap_exit, true)

{:ok,
%{
writer: Writer.new!(hls_config),
Expand All @@ -71,6 +74,8 @@ defmodule Shinkai.Sink.Hls do
)
end

Logger.info("[#{state.source_id}] [hls] start muxing")

audio_track = Enum.find(hls_tracks, fn t -> t.type == :audio end)
video_track = Enum.find(hls_tracks, fn t -> t.type == :video end)

Expand All @@ -89,7 +94,10 @@ defmodule Shinkai.Sink.Hls do
end

buffer? = length(hls_tracks) > 1 and Enum.any?(hls_tracks, &(&1.type == :video))
:ok = PubSub.subscribe(Shinkai.PubSub, packets_topic(state.source_id))

if hls_tracks == [] do
:ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id))
end

{:noreply,
%{
Expand All @@ -100,30 +108,45 @@ defmodule Shinkai.Sink.Hls do
}}
end

def handle_info({:packet, packets}, state) when is_list(packets) do
{:noreply, Enum.reduce(packets, state, &do_handle_packet/2)}
end

@impl true
def handle_info({:packet, packet}, state) do
def handle_info({:packet, %Shinkai.Packet{} = packet}, state) do
{:noreply, do_handle_packet(packet, state)}
end

def handle_info({:packet, packets}, state) do
{:noreply, Enum.reduce(packets, state, &do_handle_packet/2)}
end

@impl true
def handle_info(:disconnected, state) do
:ok = Writer.close(state.writer)
:ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id))
:ok = PubSub.local_broadcast(Shinkai.PubSub, sink_topic(state.source_id), {:hls, :done})

{:noreply,
%{state | writer: Writer.new!(state.config), last_sample: %{}, packets: [], buffer?: false}}
end

defp do_handle_packet(%{track_id: id}, state) when not is_map_key(state.tracks, id) do
state
defp do_handle_packet(packet, %{buffer?: false} = state)
when is_map_key(state.tracks, packet.track_id) do
case Map.fetch(state.last_sample, packet.track_id) do
:error ->
last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet))
%{state | last_sample: last_samples}

{:ok, last_sample} ->
variant_name = state.tracks[packet.track_id].type |> to_string()
sample = packet_to_sample(packet)
last_sample = %{last_sample | duration: sample.dts - last_sample.dts}

%{
state
| writer: Writer.write_sample(state.writer, variant_name, last_sample),
last_sample: Map.put(state.last_sample, packet.track_id, sample)
}
end
end

defp do_handle_packet(packet, %{buffer?: true} = state) do
defp do_handle_packet(packet, state) when is_map_key(state.tracks, packet.track_id) do
# buffer until we get a video packet
# and then drop all packets with dts < dts of that video packet
track = state.tracks[packet.track_id]
Expand All @@ -143,24 +166,7 @@ defmodule Shinkai.Sink.Hls do
end
end

defp do_handle_packet(packet, state) do
case Map.fetch(state.last_sample, packet.track_id) do
:error ->
last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet))
%{state | last_sample: last_samples}

{:ok, last_sample} ->
variant_name = state.tracks[packet.track_id].type |> to_string()
sample = packet_to_sample(packet)
last_sample = %{last_sample | duration: sample.dts - last_sample.dts}

%{
state
| writer: Writer.write_sample(state.writer, variant_name, last_sample),
last_sample: Map.put(state.last_sample, packet.track_id, sample)
}
end
end
defp do_handle_packet(_packet, state), do: state

defp reject?(packet, state, max_dts) do
track = state.tracks[packet.track_id]
Expand Down
27 changes: 19 additions & 8 deletions lib/shinkai/sink/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Shinkai.Sink.RTMP do

if unsupported_tracks != [] do
Logger.warning(
"[#{state.source_id}] Ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}"
"[#{state.source_id}] rtmp sink: ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}"
)
end

Expand All @@ -79,14 +79,13 @@ defmodule Shinkai.Sink.RTMP do
def handle_info({:packet, packets}, state) do
Registry.dispatch(Sink.Registry, {:rtmp, state.source_id}, fn entries ->
packets = List.wrap(packets)
track = state.tracks[hd(packets).track_id]
tags = Enum.map(packets, &packet_to_tag(track, &1))

for {pid, _} <- entries, {timestamp, data} <- tags do
case track.type do
:video -> ClientSession.send_video_data(pid, timestamp, data)
:audio -> ClientSession.send_audio_data(pid, timestamp, data)
end
case state.tracks[hd(packets).track_id] do
nil ->
:ok

track ->
dispatch_packets(entries, packets, track)
end
end)

Expand All @@ -106,6 +105,18 @@ defmodule Shinkai.Sink.RTMP do
{:noreply, state}
end

defp dispatch_packets(entries, packets, track) do
tags = Enum.map(packets, &packet_to_tag(track, &1))

for {pid, _} <- entries, {timestamp, data} <- tags do
# credo:disable-for-next-line
case track.type do
:video -> ClientSession.send_video_data(pid, timestamp, data)
:audio -> ClientSession.send_audio_data(pid, timestamp, data)
end
end
end

defp packet_to_tag(track, packet) do
dts = div(packet.dts * @timescale, track.timescale)
cts = div((packet.pts - packet.dts) * @timescale, track.timescale)
Expand Down
2 changes: 1 addition & 1 deletion lib/shinkai/sources.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Shinkai.Sources do
[] ->
if source.type == :publish do
:ok = PublishManager.monitor(source, self())
:ets.insert(:sources, {source.id, source})
:ets.insert(:sources, {source.id, %{source | status: :streaming}})
end

DynamicSupervisor.start_child(
Expand Down
83 changes: 10 additions & 73 deletions lib/shinkai/sources/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ defmodule Shinkai.Sources.RTSP do

require Logger

import Shinkai.Utils

alias MediaCodecs.MPEG4
alias Shinkai.{Sources, Track}
alias Shinkai.Sources
alias Shinkai.Sources.RTSP.MediaProcessor

@timeout 6_000
@reconnect_timeout 5_000
Expand All @@ -33,7 +31,7 @@ defmodule Shinkai.Sources.RTSP do
id: source.id,
rtsp_pid: pid,
tracks: %{},
packets_topic: packets_topic(source.id)
media_processor: nil
}

{:ok, state, {:continue, :connect}}
Expand All @@ -46,21 +44,15 @@ defmodule Shinkai.Sources.RTSP do
def handle_info(:reconnect, state), do: do_connect(state)

def handle_info({:rtsp, _pid, {id, sample_or_samples}}, state) do
:ok =
Phoenix.PubSub.broadcast(
Shinkai.PubSub,
state.packets_topic,
{:packet, to_packets(sample_or_samples, state.tracks[id].id)}
)

{:noreply, state}
media_processor = MediaProcessor.handle_sample(id, sample_or_samples, state.media_processor)
{:noreply, %{state | media_processor: media_processor}}
end

@impl true
def handle_info({:rtsp, pid, :session_closed}, %{rtsp_pid: pid} = state) do
Logger.error("[#{state.id}] rtsp client disconnected")
Phoenix.PubSub.broadcast!(Shinkai.PubSub, state_topic(state.id), :disconnected)
Sources.update_source_status(state.id, :failed)
Phoenix.PubSub.broadcast!(Shinkai.PubSub, Shinkai.Utils.state_topic(state.id), :disconnected)
update_status(state, :failed)
Process.send_after(self(), :reconnect, @reconnect_timeout)
{:noreply, state}
end
Expand All @@ -73,21 +65,10 @@ defmodule Shinkai.Sources.RTSP do

defp do_connect(state) do
with {:ok, tracks} <- RTSP.connect(state.rtsp_pid, @timeout),
tracks <- build_tracks(tracks),
:ok <- RTSP.play(state.rtsp_pid) do
codecs = tracks |> Map.values() |> Enum.map_join(", ", & &1.codec)
Logger.info("[#{state.id}] start reading from #{map_size(tracks)} tracks (#{codecs})")

update_status(state, :streaming)

:ok =
Phoenix.PubSub.broadcast(
Shinkai.PubSub,
tracks_topic(state.id),
{:tracks, Map.values(tracks)}
)

{:noreply, %{state | tracks: tracks}}
media_processor = MediaProcessor.new(state.id, tracks)
{:noreply, %{state | media_processor: media_processor}}
else
{:error, reason} ->
Logger.error("[#{state.id}] rtsp connection failed: #{inspect(reason)}")
Expand All @@ -97,49 +78,5 @@ defmodule Shinkai.Sources.RTSP do
end
end

defp build_tracks(tracks) do
tracks
|> Enum.with_index(1)
|> Map.new(fn {track, id} ->
codec = codec(String.downcase(track.rtpmap.encoding))

{track.control_path,
Track.new(
id: id,
type: track.type,
codec: codec,
timescale: track.rtpmap.clock_rate,
priv_data: priv_data(codec, track.fmtp)
)}
end)
end

defp codec("mpeg4-generic"), do: :aac
defp codec(other), do: String.to_atom(other)

defp priv_data(:aac, fmtp), do: MPEG4.AudioSpecificConfig.parse(fmtp.config)
defp priv_data(:h264, %{sprop_parameter_sets: nil}), do: nil
defp priv_data(:h264, %{sprop_parameter_sets: pps}), do: {pps.sps, [pps.pps]}
defp priv_data(:h265, %{sprop_vps: nil}), do: nil
defp priv_data(:h265, fmtp), do: {hd(fmtp.sprop_vps), hd(fmtp.sprop_sps), fmtp.sprop_pps}
defp priv_data(_codec, _fmtp), do: nil

defp to_packets(samples, track_id) when is_list(samples) do
Enum.map(samples, &packet_from_sample(track_id, &1))
end

defp to_packets(sample, track_id), do: packet_from_sample(track_id, sample)

defp packet_from_sample(track_id, {payload, pts, sync?, _timestamp}) do
Shinkai.Packet.new(payload,
track_id: track_id,
dts: pts,
pts: pts,
sync?: sync?
)
end

defp update_status(state, status) do
Sources.update_source_status(state.id, status)
end
defp update_status(state, status), do: Sources.update_source_status(state.id, status)
end
38 changes: 38 additions & 0 deletions lib/shinkai/sources/rtsp/handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Shinkai.Sources.RTSP.Handler do
@moduledoc false

use RTSP.Server.ClientHandler

require Logger

alias Shinkai.Sources
alias Shinkai.Sources.RTSP.MediaProcessor

@impl true
def init(_options) do
nil
end

@impl true
def handle_record(path, tracks, _state) do
with {:ok, source_id} <- source_id(path),
source <- %Sources.Source{id: source_id, type: :publish},
{:ok, _pid} <- Sources.start(source) do
Logger.info("[RTSP] is publishing to: #{path}")
{:ok, MediaProcessor.new(source_id, tracks)}
end
end

@impl true
def handle_media(control_path, sample, state) do
MediaProcessor.handle_sample(control_path, sample, state)
end

@impl true
def handle_closed_connection(state) do
MediaProcessor.close(state)
end

defp source_id("/"), do: {:error, :missing_path}
defp source_id(<<"/", path::binary>>), do: {:ok, String.replace(path, "/", "-")}
end
Loading
Loading