Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 44 additions & 76 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,122 +13,88 @@ jobs:
build:
name: Build and test
runs-on: ubuntu-latest
container:
image: alpine:3.21

env:
MIX_ENV: test

steps:
- uses: actions/checkout@v4

- name: Set up Elixir
uses: erlef/setup-beam@v1
with:
elixir-version: '1.19.2'
otp-version: '28.0'
version-type: 'strict'
id: beam

- name: Restore PLT cache
uses: actions/cache@v4
id: plt-cache
with:
path: priv/plts
key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plt-${{ hashFiles('**/mix.lock') }}
restore-keys: |
${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plt-

- name: Install system dependencies and FFmpeg
run: |
# Add community repository for 3.21 packages
echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/community" >> /etc/apk/repositories
apk update

# Install basic system dependencies including GNU tar for GitHub Actions caching
apk add --no-cache \
git \
bash \
curl \
build-base \
openssl-dev \
ncurses-dev \
zlib-dev \
sudo apt-get update
sudo apt-get install -y \
mediainfo \
cmake \
make \
gzip \
tar

# Verify tar installation for GitHub Actions caching compatibility
# GitHub Actions cache requires POSIX-compliant tar
which tar && tar --version | head -1

# Install FFmpeg with essential codec libraries (minimal set to avoid conflicts)
apk add --no-cache \
ffmpeg \
x264-libs \
x265-libs \
svt-av1

- name: Install Erlang and Elixir
run: |
# Install Erlang 27 and Elixir 1.19 from Alpine 3.21 (should be available in community repo)
apk add --no-cache \
erlang \
elixir

- uses: actions/checkout@v4
build-essential

- name: Cache rustup and cargo installation
- name: Cache Rust toolchain
uses: actions/cache@v4
id: rust-toolchain-cache
with:
path: |
~/.rustup
~/.cargo/registry/index
~/.cargo/registry/cache
~/.cargo/git
key: ${{ runner.os }}-alpine-rust-${{ hashFiles('**/Cargo.lock') }}-v1
~/.cargo/git/db
key: ${{ runner.os }}-rust-toolchain-v3
restore-keys: |
${{ runner.os }}-alpine-rust-
${{ runner.os }}-rust-toolchain-

- name: Cache ab-av1 binary
uses: actions/cache@v4
id: ab-av1-cache
with:
path: ~/.cargo/bin/ab-av1
key: ${{ runner.os }}-alpine-ab-av1-${{ hashFiles('~/.cargo/bin/ab-av1') }}-v1
key: ${{ runner.os }}-ab-av1-binary-v3
restore-keys: |
${{ runner.os }}-alpine-ab-av1-
${{ runner.os }}-ab-av1-binary-

- name: Install Rust toolchain
if: steps.rust-toolchain-cache.outputs.cache-hit != 'true'
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable

- name: Install ab-av1
if: steps.ab-av1-cache.outputs.cache-hit != 'true'
run: |
# Install rustup to get latest stable Rust (Alpine's is too old)
if [ ! -f ~/.cargo/env ]; then
apk add --no-cache curl
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
fi
source ~/.cargo/env
# Add cargo to PATH for this and future steps
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
# Install latest stable ab-av1 with newer Rust toolchain
cargo install ab-av1
cargo install ab-av1 --locked

- name: Setup cargo PATH
run: |
# Ensure cargo is available in PATH for subsequent steps
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
source ~/.cargo/env || true

- name: Cache Mix dependencies
- name: Cache Mix dependencies and build
uses: actions/cache@v4
id: mix-cache
with:
path: |
deps
_build/test/lib
key: ${{ runner.os }}-alpine-mix-${{ hashFiles('mix.lock') }}
restore-keys: |
${{ runner.os }}-alpine-mix-

- name: Cache compiled dependencies
uses: actions/cache@v4
with:
path: _build
key: ${{ runner.os }}-alpine-build-${{ hashFiles('mix.lock') }}-${{ hashFiles('lib/**/*.ex', 'test/**/*.exs', 'config/**/*.exs') }}
restore-keys: |
${{ runner.os }}-alpine-build-${{ hashFiles('mix.lock') }}-
${{ runner.os }}-alpine-build-

- name: Cache native dependencies
uses: actions/cache@v4
with:
path: |
deps/*/priv
deps/*/_build
key: ${{ runner.os }}-alpine-native-${{ hashFiles('mix.lock') }}
_build
key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('**/mix.lock') }}-${{ hashFiles('lib/**/*.ex', 'config/**/*.exs') }}
restore-keys: |
${{ runner.os }}-alpine-native-
${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('**/mix.lock') }}-
${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-mix-

- name: Verify environment
run: |
Expand All @@ -139,9 +105,11 @@ jobs:
echo 'ffmpeg version:' && ffmpeg -version | head -1

- name: Install dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: mix deps.get

- name: Compile dependencies
if: steps.mix-cache.outputs.cache-hit != 'true'
run: mix deps.compile

- name: Compile project
Expand Down
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@
system: let
pkgs = import nixpkgs {inherit system;};
lib = pkgs.lib;
# Use latest OTP 28.1 with Elixir 1.19.0-rc.0 for cutting-edge features
# Use latest stable OTP 28 with Elixir 1.19
erlang = pkgs.erlang_28;
beamPackages = pkgs.beam.packagesWith erlang;
elixir = beamPackages.elixir.override {
erlang = erlang;
version = "1.19.0-rc.0";
src = pkgs.fetchurl {
url = "https://github.com/elixir-lang/elixir/archive/refs/tags/v${elixir.version}.tar.gz";
sha256 = "sha256-YvkDCI578h4SmtEA5XP2XQNjixDGIHdwIEuOa50Uh5E=";
};
};
elixir = beamPackages.elixir_1_19;
in {
# Docker image for the application
packages.dockerImage = pkgs.dockerTools.buildLayeredImage {
Expand Down Expand Up @@ -77,7 +70,6 @@
pkgs.curl
pkgs.docker-compose
pkgs.gnupg
pkgs.pinentry
pkgs.pinentry-curses
# Video processing tools for CI/dev
pkgs.ab-av1
Expand Down
37 changes: 21 additions & 16 deletions lib/reencodarr/analyzer/broadway/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do
def init(_opts) do
# Poll every 2 seconds to check for new work
schedule_poll()
{:producer, %{}}
{:producer, %{pending_demand: 0}}
end

@impl GenStage
def handle_demand(demand, state) when demand > 0 do
# Fetch up to 5 videos for batch processing
videos = Media.get_videos_needing_analysis(min(demand, 5))
Logger.debug("Analyzer: handle_demand(#{demand}) -> #{length(videos)} videos")
{:noreply, videos, state}
# Accumulate demand and fetch videos
new_demand = state.pending_demand + demand
dispatch(new_demand, state)
end

@impl GenStage
Expand All @@ -40,22 +39,28 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do
@impl GenStage
def handle_info(:poll, state) do
schedule_poll()
# Check if there's work and manually ask Broadway to pull
case Media.count_videos_needing_analysis() do
0 ->
{:noreply, [], state}

_count ->
# There's work available - return one video to wake up Broadway
videos = Media.get_videos_needing_analysis(1)
Logger.debug("Analyzer: poll wakeup -> #{length(videos)} videos")
{:noreply, videos, state}
end
# If there's pending demand, try to fulfill it
# This wakes up Broadway when new work appears
dispatch(state.pending_demand, state)
end

@impl GenStage
def handle_info(_msg, state), do: {:noreply, [], state}

defp dispatch(demand, state) when demand > 0 do
# Fetch up to 5 videos for batch processing
videos = Media.get_videos_needing_analysis(min(demand, 5))
remaining_demand = demand - length(videos)

Logger.debug(
"Analyzer: dispatch(#{demand}) -> #{length(videos)} videos, #{remaining_demand} remaining"
)

{:noreply, videos, %{state | pending_demand: remaining_demand}}
end

defp dispatch(_demand, state), do: {:noreply, [], state}

defp schedule_poll do
Process.send_after(self(), :poll, 2000)
end
Expand Down
37 changes: 18 additions & 19 deletions lib/reencodarr/crf_searcher/broadway/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,39 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do
def init(_opts) do
# Poll every 2 seconds to check for new work
schedule_poll()
{:producer, %{}}
{:producer, %{pending_demand: 0}}
end

@impl GenStage
def handle_demand(_demand, state) do
dispatch(state)
def handle_demand(demand, state) do
new_demand = state.pending_demand + demand
dispatch(new_demand, state)
end

@impl GenStage
def handle_info(:poll, state) do
schedule_poll()
# Check if there's work and CrfSearch is available, wake up Broadway if so
if CrfSearch.available?() do
case Media.get_videos_for_crf_search(1) do
[] -> {:noreply, [], state}
videos -> {:noreply, videos, state}
end
else
{:noreply, [], state}
end
# If there's pending demand, try to fulfill it
dispatch(state.pending_demand, state)
end

@impl GenStage
def handle_info(_msg, state), do: {:noreply, [], state}

defp dispatch(state) do
if CrfSearch.available?() do
videos = Media.get_videos_for_crf_search(1)
{:noreply, videos, state}
else
{:noreply, [], state}
end
defp dispatch(demand, state) when demand > 0 do
videos =
if CrfSearch.available?() do
Media.get_videos_for_crf_search(1)
else
[]
end

remaining_demand = demand - length(videos)
{:noreply, videos, %{state | pending_demand: remaining_demand}}
end

defp dispatch(_demand, state), do: {:noreply, [], state}

defp schedule_poll do
Process.send_after(self(), :poll, 2000)
end
Expand Down
21 changes: 21 additions & 0 deletions lib/reencodarr/encoder/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ defmodule Reencodarr.Encoder.Broadway do
alias Reencodarr.AbAv1.ProgressParser
alias Reencodarr.Dashboard.Events
alias Reencodarr.Encoder.Broadway.Producer
alias Reencodarr.Media.VideoStateMachine
alias Reencodarr.PostProcessor
alias Reencodarr.Repo

@typedoc "VMAF struct for encoding processing"
@type vmaf :: %{id: integer(), video: map()}
Expand Down Expand Up @@ -169,6 +171,25 @@ defmodule Reencodarr.Encoder.Broadway do
defp process_vmaf_encoding(vmaf, context) do
Logger.info("Broadway: Starting encoding for VMAF #{vmaf.id}: #{vmaf.video.path}")

# Mark video as encoding NOW that we're actually starting
case VideoStateMachine.transition_to_encoding(vmaf.video) do
{:ok, changeset} ->
case Repo.update(changeset) do
{:ok, _updated_video} ->
:ok

{:error, reason} ->
Logger.warning(
"Failed to mark video #{vmaf.video.id} as encoding: #{inspect(reason)}"
)
end

{:error, reason} ->
Logger.warning(
"Failed to transition video #{vmaf.video.id} to encoding: #{inspect(reason)}"
)
end

# Broadcast initial encoding progress at 0%
Events.broadcast_event(:encoding_started, %{
video_id: vmaf.video.id,
Expand Down
Loading