Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/plug/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ if Code.ensure_loaded?(Plug) do
end
end

forward "/webrtc", to: Plug.Shinkai.Router.WebRTC

match _ do
send_resp(conn, 404, "Not Found")
end
Expand Down
62 changes: 62 additions & 0 deletions lib/plug/router/webrtc.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
if Code.ensure_loaded?(Plug) do
defmodule Plug.Shinkai.Router.WebRTC do
@moduledoc false
require Logger

require EEx

use Plug.Router
use Plug.ErrorHandler

plug :match
plug :dispatch

EEx.function_from_file(:defp, :webrtc_index, "lib/plug/templates/webrtc.html.eex", [:assigns])

get "/:source_id" do
conn
|> put_resp_content_type("text/html")
|> send_resp(200, webrtc_index(source_id: source_id))
end

post "/:source_id/whep" do
case Shinkai.Sources.add_webrtc_peer(source_id) do
{:ok, sdp_offer, session_id} ->
conn
|> put_resp_content_type("application/sdp")
|> put_resp_header("location", "/webrtc/#{source_id}/whep/#{session_id}")
|> send_resp(416, sdp_offer)

{:error, reason} ->
Logger.error("Failed to create WebRTC peer: #{inspect(reason)}")
send_resp(conn, 400, "Bad Request")
end
end

patch "/:source_id/whep/:session_id" do
case get_req_header(conn, "content-type") do
["application/sdp"] ->
{:ok, body, conn} = Plug.Conn.read_body(conn)

case Shinkai.Sources.handle_webrtc_peer_answer(source_id, session_id, body) do
:ok ->
send_resp(conn, 204, "")

{:error, reason} ->
Logger.error("Failed to handle WebRTC peer answer: #{inspect(reason)}")
send_resp(conn, 400, "Bad Request")
end

send_resp(conn, 204, "")

_ ->
send_resp(conn, 415, "Unsupported Media Type")
end
end

delete "/:source_id/whep/:session_id" do
Shinkai.Sources.remove_webrtc_peer(source_id, session_id)
send_resp(conn, 204, "")
end
end
end
60 changes: 60 additions & 0 deletions lib/plug/templates/webrtc.html.eex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Shinkai WebRTC</title>
<style>
html,
body {
margin: 0;
padding: 0;
width: 100%;
height: 100%;
overflow: hidden;
background: black;
}

video {
width: 100vw;
height: 100vh;
}
</style>
</head>
<body>
<video id="video" controls autoplay muted></video>
<script>
(async () => {
const avStream = new MediaStream();
const pc = new RTCPeerConnection();
let response = await fetch("/webrtc/<%= @source_id %>/whep", {
method: "POST",
headers: { "Content-Type": "application/sdp" },
body: "",
});

pc.addEventListener("track", (e) => {
const video = document.getElementById("video");
video.srcObject = e.streams[0];
});

if (response.status == 416) {
const session_url = response.headers.get("Location");
const offer = await response.text();
await pc.setRemoteDescription(
new RTCSessionDescription({
type: "offer",
sdp: offer,
}),
);
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
await fetch(session_url, {
method: "PATCH",
headers: { "Content-Type": "application/sdp" },
body: pc.localDescription.sdp,
});
}
})();
</script>
</body>
</html>
20 changes: 18 additions & 2 deletions lib/shinkai/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@ defmodule Shinkai.Pipeline do

@spec add_rtmp_client(String.t()) :: :ok
def add_rtmp_client(source_id) do
Sink.RTMP.add_client({:via, Registry, {Source.Registry, {:rtmp_sink, source_id}}}, self())
Sink.RTMP.add_client(rtmp_name(source_id), self())
end

def add_webrtc_peer(source_id) do
Sink.WebRTC.add_new_peer(webrtc_name(source_id))
end

def handle_webrtc_peer_answer(source_id, session_id, sdp_answer) do
Sink.WebRTC.handle_peer_answer(webrtc_name(source_id), session_id, sdp_answer)
end

def remove_webrtc_peer(source_id, session_id) do
Sink.WebRTC.remove_peer(webrtc_name(source_id), session_id)
end

def stop(source_id) do
Expand All @@ -29,7 +41,8 @@ defmodule Shinkai.Pipeline do

children =
[
{Sink.Hls, [id: id] ++ hls_config}
{Sink.Hls, [id: id] ++ hls_config},
{Sink.WebRTC, id: id}
] ++ rtmp_sink(rtmp_config[:enabled], id) ++ source(source)

Supervisor.init(children, strategy: :one_for_all)
Expand All @@ -41,4 +54,7 @@ defmodule Shinkai.Pipeline do

defp rtmp_sink(false, _id), do: []
defp rtmp_sink(true, id), do: [{Sink.RTMP, [id: id]}]

defp webrtc_name(id), do: {:via, Registry, {Source.Registry, {:webrtc_sink, id}}}
defp rtmp_name(id), do: {:via, Registry, {Source.Registry, {:rtmp_sink, id}}}
end
218 changes: 218 additions & 0 deletions lib/shinkai/sink/webrtc.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
defmodule Shinkai.Sink.WebRTC do
@moduledoc false

use GenServer

require Logger

import Shinkai.Utils

alias __MODULE__.PeerManager
alias ExWebRTC.RTPCodecParameters
alias Phoenix.PubSub
alias RTSP.RTP.Encoder, as: RTPEncoder

@supported_codecs [:h264, :h265, :av1, :pcma]
@video_clock_rate 90_000

def start_link(opts) do
id = {:via, Registry, {Source.Registry, {:webrtc_sink, opts[:id]}}}
GenServer.start_link(__MODULE__, opts, name: id)
end

@spec add_new_peer(GenServer.server()) :: {:ok, String.t(), String.t()} | {:error, any()}
def add_new_peer(server) do
GenServer.call(server, :add_new_peer)
end

@spec handle_peer_answer(
GenServer.server(),
session_id :: String.t(),
sdp :: String.t()
) :: :ok | {:error, any()}
def handle_peer_answer(server, session_id, sdp) do
GenServer.call(server, {:handle_peer_answer, session_id, sdp})
end

@spec remove_peer(GenServer.server(), session_id :: String.t()) :: :ok
def remove_peer(server, session_id) do
GenServer.cast(server, {:remove_peer, session_id})
end

@impl true
def init(opts) do
source_id = opts[:id]
{:ok, peer_manager} = PeerManager.start_link(source_id: source_id)

PubSub.subscribe(Shinkai.PubSub, tracks_topic(source_id))

{:ok,
%{
peer_manager: peer_manager,
source_id: source_id,
packets_topic: packets_topic(source_id),
tracks: %{}
}}
end

@impl true
def handle_call(:add_new_peer, _from, %{video_tracks: [], audio_tracks: []} = state) do
{:reply, {:error, :no_tracks}, state}
end

def handle_call(:add_new_peer, from, state) do
:ok = PeerManager.add_peer(state.peer_manager, from)
{:noreply, state}
end

def handle_call({:handle_peer_answer, session_id, sdp}, from, state) do
:ok = PeerManager.handle_peer_answer(state.peer_manager, from, session_id, sdp)
{:noreply, state}
end

@impl true
def handle_cast({:remove_peer, session_id}, state) do
:ok = PeerManager.remove_peer(state.peer_manager, session_id)
{:noreply, state}
end

@impl true
def handle_info({:tracks, tracks}, state) do
{tracks, unsupported_tracks} = Enum.split_with(tracks, &(&1.codec in @supported_codecs))

if unsupported_tracks != [] do
Logger.warning(
"Unsupported codecs received in WebRTC sink: #{join_codecs(unsupported_tracks)}"
)
end

video_track = Enum.find(tracks, fn t -> t.type == :video end)
audio_track = Enum.find(tracks, fn t -> t.type == :audio end)

stream_id = ExWebRTC.MediaStreamTrack.generate_stream_id()

state =
[video_track, audio_track]
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state, fn track, state ->
media_stream = ExWebRTC.MediaStreamTrack.new(track.type, [stream_id])
webrtc_track = webrtc_track(track)

payloader_mod = payloader_mod(track.codec)

track_ctx = %{
id: media_stream.id,
timescale: track.timescale,
target_timescale: webrtc_track.clock_rate,
payloader_mod: payloader_mod,
payloader_state: payloader_mod.init([])
}

if track.type == :video,
do: PeerManager.add_video_track(state.peer_manager, {media_stream, webrtc_track}),
else: PeerManager.add_audio_track(state.peer_manager, {media_stream, webrtc_track})

%{state | tracks: Map.put(state.tracks, track.id, track_ctx)}
end)

:ok = PubSub.subscribe(Shinkai.PubSub, state.packets_topic)

{:noreply, state}
end

@impl true
def handle_info({:packet, packets}, state) when is_list(packets) do
track_id = hd(packets).track_id

case(Map.fetch(state, track_id)) do
:error ->
{:noreply, state}

{:ok, track_ctx} ->
track_ctx =
Enum.reduce(packets, track_ctx, fn packet, track_ctx ->
do_handle_packet(packet, state.source_id, track_ctx)
end)

{:noreply, %{state | tracks: Map.put(state.tracks, track_id, track_ctx)}}
end
end

def handle_info({:packet, packet}, state) do
case Map.fetch(state.tracks, packet.track_id) do
:error ->
{:noreply, state}

{:ok, track_ctx} ->
track_ctx = do_handle_packet(packet, state.source_id, track_ctx)
{:noreply, %{state | tracks: Map.put(state.tracks, packet.track_id, track_ctx)}}
end
end

def handle_info(_msg, state) do
{:noreply, state}
end

defp do_handle_packet(packet, source_id, track_ctx) do
rtp_timestamp =
ExMP4.Helper.timescalify(packet.pts, track_ctx.timescale, track_ctx.target_timescale)

{packets, payloader_state} =
track_ctx.payloader_mod.handle_sample(
packet.data,
rtp_timestamp,
track_ctx.payloader_state
)

track_id = track_ctx.id

Registry.dispatch(Sink.Registry, {:webrtc, source_id}, fn peers ->
for {_pid, {pc, _session_id}} <- peers do
Enum.each(packets, fn rtp_packet ->
:ok = ExWebRTC.PeerConnection.send_rtp(pc, track_id, rtp_packet)
end)
end
end)

%{track_ctx | payloader_state: payloader_state}
end

defp webrtc_track(track) do
pt = payload_type(track.codec)

%RTPCodecParameters{
payload_type: pt,
mime_type: mime_type(track.codec),
clock_rate: clock_rate(track),
channels: if(track.type == :audio, do: 1, else: nil),
sdp_fmtp_line: sdp_fmtp_line(track.codec, pt)
}
end

defp clock_rate(%{type: :video}), do: @video_clock_rate
defp clock_rate(%{timescale: timescale}), do: timescale

defp payload_type(:pcma), do: 8
defp payload_type(_codec), do: 96

defp mime_type(:h264), do: "video/H264"
defp mime_type(:h265), do: "video/H265"
defp mime_type(:av1), do: "video/AV1"
defp mime_type(:pcma), do: "audio/PCMA"

defp sdp_fmtp_line(:h264, pt) do
%ExSDP.Attribute.FMTP{
pt: pt,
level_asymmetry_allowed: true,
packetization_mode: 1,
profile_level_id: 0x42E01F
}
end

defp sdp_fmtp_line(_codec, _pt), do: nil

defp payloader_mod(:h264), do: RTPEncoder.H264
defp payloader_mod(:h265), do: RTPEncoder.H265
defp payloader_mod(:av1), do: RTPEncoder.AV1
defp payloader_mod(:pcma), do: RTPEncoder.G711
end
Loading
Loading