Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ config :shinkai, :server, enabled: false

config :shinkai, :rtmp, port: 0

config :shinkai, :rtsp, enabled: false

config :shinkai, :hls, storage_dir: "tmp"
34 changes: 22 additions & 12 deletions lib/shinkai.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ defmodule Shinkai do

To configure the http server responsible for serving HLS streams.

* `enabled` - Enable or disable the HTTP server.
* `port` - Port number for the HTTP server.
* `certfile` - Path to the SSL certificate file (optional).
* `keyfile` - Path to the SSL key file (optional).
#{NimbleOptions.docs(Shinkai.Config.server_schema())}

```elixir
config :shinkai, :server,
Expand All @@ -40,12 +37,7 @@ defmodule Shinkai do

To configure HLS streaming options.

* `storage_dir` - Directory to store HLS segments.
* `max_segments` - Maximum number of segments to keep.
* `segment_duration` - Segment duration in milliseconds.
* `part_duration` - Part duration in milliseconds.
* `segment_type` - Type of segments to generate, either `fmp4`,
`mpeg_ts`, or `low_latency`.
#{NimbleOptions.docs(Shinkai.Config.hls_schema())}

```elixir
config :shinkai, :hls,
Expand All @@ -66,8 +58,8 @@ defmodule Shinkai do
### RTMP

To configure the RTMP server.
* `enabled` - Enable or disable the RTMP server.
* `port` - Port number for the RTMP server.

#{NimbleOptions.docs(Shinkai.Config.rtmp_schema())}

```elixir
config :shinkai, :rtmp,
Expand All @@ -81,6 +73,24 @@ defmodule Shinkai do
port: 1935 # Port number for the RTMP server (default: 1935)
```

### RTSP

To configure the RTSP server.

#{NimbleOptions.docs(Shinkai.Config.rtsp_schema())}

```elixir
config :shinkai, :rtsp,
enabled: true,
port: 8554
```

```yaml
rtsp:
enabled: true # Enable or disable the RTSP server (default: true)
port: 8554 # Port number for the RTSP server (default: 8554)
```

### Paths

To configure media source paths. Each source should have a unique alphanumeric ID.
Expand Down
10 changes: 8 additions & 2 deletions lib/shinkai/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ defmodule Shinkai.Application do
{Sources.PublishManager, []},
{Registry, name: Sink.Registry, keys: :duplicate},
{Registry, name: Source.Registry, keys: :unique},
{Task, fn -> Sources.start_all() end},
{RTSP.Server, handler: Sources.RTSP.Handler, port: 8554}
{Task, fn -> Sources.start_all() end}
]

children =
Expand All @@ -27,6 +26,13 @@ defmodule Shinkai.Application do
children
end

children =
if config[:rtsp][:enabled] do
children ++ [{RTSP.Server, handler: Sources.RTSP.Handler, port: config[:rtsp][:port]}]
else
children
end

children =
if Code.ensure_loaded?(Bandit) and config[:server][:enabled] do
children ++ [{Bandit, configure_bandit(config[:server])}]
Expand Down
264 changes: 130 additions & 134 deletions lib/shinkai/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,107 @@ defmodule Shinkai.Config do

use GenServer

@top_level_keys [:rtmp, :server, :hls]
@top_level_keys [:rtmp, :server, :hls, :rtsp]

@default_config [
rtmp: [
enabled: true,
port: 1935
@rtmp_schema [
enabled: [
type: :boolean,
default: true,
doc: "Enable or disable rtmp"
],
server: [
enabled: true,
port: 8888,
certfile: nil,
keyfile: nil
port: [
type: {:in, 0..(2 ** 16 - 1)},
default: 1935,
doc: "RTMP listening port",
type_doc: "`t::socket.port_number/0`",
type_spec: quote(do: :socket.port_number())
]
]

@rtsp_schema [
enabled: [
type: :boolean,
default: true,
doc: "Enable or disable rtsp"
],
port: [
type: {:in, 0..(2 ** 16 - 1)},
default: 8554,
doc: "RTSP listening port",
type_doc: "`t::socket.port_number/0`",
type_spec: quote(do: :socket.port_number())
]
]

@server_schema [
enabled: [
type: :boolean,
default: true,
doc: "Enable or disable http(s) server"
],
port: [
type: {:in, 0..(2 ** 16 - 1)},
default: 8888,
doc: "http port",
type_doc: "`t::socket.port_number/0`",
type_spec: quote(do: :socket.port_number())
],
certfile: [
type: {:or, [:string, nil]},
default: nil,
doc: "https certificate"
],
keyfile: [
type: {:or, [:string, nil]},
default: nil,
doc: "https private key certificate"
]
]

@hls_schema [
storage_dir: [
type: :string,
default: "/tmp/shinkai/hls",
doc: "Directory to store HLS segments"
],
max_segments: [
type: :non_neg_integer,
default: 7,
doc: "Max segments to keep in live playlists"
],
segment_duration: [
type: :non_neg_integer,
default: 2000,
doc: "Segment duration in milliseconds"
],
hls: [
storage_dir: "/tmp/shinkai/hls",
max_segments: 7,
segment_duration: 2_000,
part_duration: 500,
segment_type: :fmp4
part_duration: [
type: :non_neg_integer,
default: 300,
doc: "Part duration in milliseconds for low-latency HLS"
],
segment_type: [
type: {:custom, __MODULE__, :validate_hls_segment_type, []},
default: :fmp4,
doc: "Type of segments to generate, either `:fmp4`, `:mpeg_ts` or `:low_latency`"
]
]

@doc false
@spec server_schema() :: keyword()
def server_schema, do: @server_schema

@doc false
@spec rtmp_schema() :: keyword()
def rtmp_schema, do: @rtmp_schema

@doc false
@spec hls_schema() :: keyword()
def hls_schema, do: @hls_schema

@doc false
@spec rtsp_schema() :: keyword()
def rtsp_schema, do: @rtsp_schema

def start_link(config) do
GenServer.start_link(__MODULE__, config, name: __MODULE__)
end
Expand All @@ -48,9 +127,14 @@ defmodule Shinkai.Config do

app_configs = Enum.map(@top_level_keys, &{&1, Application.get_env(:shinkai, &1, [])})

Enum.map(@default_config, fn {key, config} ->
app_configs =
@top_level_keys
|> Enum.map(&{&1, []})
|> Keyword.merge(app_configs)
|> parse_and_validate()

Enum.map(app_configs, fn {key, config} ->
config
|> Keyword.merge(app_configs[key])
|> Keyword.merge(user_config[key] || [])
|> then(&{key, &1})
end)
Expand Down Expand Up @@ -95,129 +179,41 @@ defmodule Shinkai.Config do

defp parse_and_validate([], acc), do: acc

defp parse_and_validate([{:hls, hls_config} | rest], acc) do
hls_config = parse_and_validate_hls(hls_config)
parse_and_validate(rest, [{:hls, hls_config} | acc])
end

defp parse_and_validate([{:server, server_config} | rest], acc) do
server_config = parse_and_validate_server(server_config)
parse_and_validate(rest, [{:server, server_config} | acc])
end

defp parse_and_validate([{:rtmp, rtmp_config} | rest], acc) do
rtmp_config = parse_and_validate_rtmp(rtmp_config)
parse_and_validate(rest, [{:rtmp, rtmp_config} | acc])
end

defp parse_and_validate_hls(config, acc \\ [])

defp parse_and_validate_hls(nil, _acc), do: []
defp parse_and_validate_hls([], acc), do: acc

defp parse_and_validate_hls(config, acc) when is_map(config) do
parse_and_validate_hls(Map.to_list(config), acc)
end

defp parse_and_validate_hls([{:segment_type, value} | rest], acc)
when value in [:fmp4, :mpeg_ts, :low_latency] do
parse_and_validate_hls(rest, [{:segment_type, value} | acc])
end

defp parse_and_validate_hls([{"segment_type", value} | rest], acc)
when value in ["fmp4", "mpeg_ts", "low_latency"] do
parse_and_validate_hls(rest, [{:segment_type, String.to_atom(value)} | acc])
end

defp parse_and_validate_hls([{key, value} | rest], acc)
when key in ["segment_duration", :segment_duration] and is_integer(value) and value >= 1000 do
parse_and_validate_hls(rest, [{:segment_duration, value} | acc])
end

defp parse_and_validate_hls([{key, value} | rest], acc)
when key in ["max_segments", :max_segments] and is_integer(value) and value > 3 do
parse_and_validate_hls(rest, [{:max_segments, value} | acc])
end

defp parse_and_validate_hls([{key, value} | rest], acc)
when key in ["part_duration", :part_duration] and is_integer(value) and value >= 100 and
value < 1000 do
parse_and_validate_hls(rest, [{:part_duration, value} | acc])
end

defp parse_and_validate_hls([{key, value} | rest], acc)
when key in ["storage_dir", :storage_dir] do
parse_and_validate_hls(rest, [{:storage_dir, value} | acc])
end

defp parse_and_validate_hls([{key, value} | _rest], _acc) do
raise ArgumentError, """
Invalid HLS configuration key or value detected.
Key: #{inspect(key)}, Value: #{inspect(value)}.
"""
end
defp parse_and_validate([{key, config} | rest], acc) do
config =
case key do
:hls -> do_parse_and_validate(config, @hls_schema)
:server -> do_parse_and_validate(config, @server_schema)
:rtmp -> do_parse_and_validate(config, @rtmp_schema)
:rtsp -> do_parse_and_validate(config, @rtsp_schema)
end

defp parse_and_validate_hls(config, _acc) do
raise ArgumentError, """
Invalid HLS configuration format detected.
Config: #{inspect(config)}.
"""
parse_and_validate(rest, [{key, config} | acc])
end

# HTTP server
defp parse_and_validate_server(config, acc \\ [])
defp parse_and_validate_server(nil, _acc), do: []
defp parse_and_validate_server([], acc), do: acc
defp do_parse_and_validate(config, schema) do
config = config || []

defp parse_and_validate_server(config, acc) when is_map(config) do
parse_and_validate_server(Map.to_list(config), acc)
end

defp parse_and_validate_server([{key, value} | rest], acc)
when key in ["enabled", :enabled] and is_boolean(value) do
parse_and_validate_server(rest, [{:enabled, value} | acc])
end

defp parse_and_validate_server([{key, value} | rest], acc)
when key in [:port, "port"] and is_integer(value) and value > 0 and value < 65_536 do
parse_and_validate_server(rest, [{:port, value} | acc])
end

defp parse_and_validate_server([{key, value} | rest], acc)
when key in ["certfile", "keyfile", :certfile, :keyfile] do
parse_and_validate_server(rest, [{String.to_atom(key), value} | acc])
end
cond do
Keyword.keyword?(config) ->
NimbleOptions.validate!(config, schema)

defp parse_and_validate_server([{key, value} | _rest], _acc) do
raise ArgumentError, """
Invalid Server configuration key or value detected.
Key: #{inspect(key)}, Value: #{inspect(value)}.
"""
end
is_map(config) ->
config
|> Keyword.new(fn {k, v} -> {String.to_existing_atom(k), v} end)
|> NimbleOptions.validate!(schema)

# RTMP
defp parse_and_validate_rtmp(config, acc \\ [])
defp parse_and_validate_rtmp(nil, _acc), do: []
defp parse_and_validate_rtmp([], acc), do: acc

defp parse_and_validate_rtmp(config, acc) when is_map(config) do
parse_and_validate_rtmp(Map.to_list(config), acc)
end

defp parse_and_validate_rtmp([{key, value} | rest], acc)
when key in ["enabled", :enabled] and is_boolean(value) do
parse_and_validate_rtmp(rest, [{:enabled, value} | acc])
end

defp parse_and_validate_rtmp([{key, value} | rest], acc)
when key in [:port, "port"] and is_integer(value) and value > 0 and value < 65_536 do
parse_and_validate_rtmp(rest, [{:port, value} | acc])
true ->
raise ArgumentError, "Expected a map or keyword list received: #{inspect(config)}"
end
end

defp parse_and_validate_rtmp([{key, value} | _rest], _acc) do
raise ArgumentError, """
Invalid RTMP configuration key or value detected.
Key: #{inspect(key)}, Value: #{inspect(value)}.
"""
@doc false
def validate_hls_segment_type(value) do
cond do
value in [:mpeg_ts, :fmp4, :low_latency] -> {:ok, value}
value in ["mpeg_ts", "fmp4", "low_latency"] -> {:ok, String.to_atom(value)}
true -> {:error, value}
end
end
end
Loading
Loading