Skip to content
7 changes: 1 addition & 6 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,5 @@
"Mshort",
"dont",
"scrf"
],
"workbench.colorCustomizations": {
"activityBar.background": "#1D322B",
"titleBar.activeBackground": "#29463D",
"titleBar.activeForeground": "#F7FBF9"
}
]
}
66 changes: 55 additions & 11 deletions lib/reencodarr/ab_av1/crf_search.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ defmodule Reencodarr.AbAv1.CrfSearch do
alias Reencodarr.AbAv1.OutputParser
alias Reencodarr.Core.Parsers
alias Reencodarr.Core.Time
alias Reencodarr.CrfSearcher.Broadway.Producer
alias Reencodarr.Dashboard.Events
alias Reencodarr.ErrorHelpers
alias Reencodarr.Formatters
alias Reencodarr.{Media, Repo}
alias Reencodarr.Media
alias Reencodarr.Media.VideoStateMachine
alias Reencodarr.Repo

require Logger

Expand Down Expand Up @@ -149,6 +150,15 @@ defmodule Reencodarr.AbAv1.CrfSearch do

@impl true
def handle_cast({:crf_search, video, vmaf_percent}, %{port: :none} = state) do
# Mark video as crf_searching NOW that we're actually starting
case VideoStateMachine.transition_to_crf_searching(video) do
{:ok, _updated_video} ->
:ok

{:error, reason} ->
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message logs a warning when the state transition fails but then continues to process the CRF search anyway. This could lead to inconsistent state. Consider either handling the error properly by returning early, or updating the log message to clarify that processing continues despite the state transition failure.

Suggested change
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}. Aborting CRF search for this video.")
# Do not proceed with CRF search if state transition fails
{:noreply, state}

Copilot uses AI. Check for mistakes.
end

args = build_crf_search_args(video, vmaf_percent)

new_state = %{
Expand Down Expand Up @@ -229,12 +239,35 @@ defmodule Reencodarr.AbAv1.CrfSearch do
state
) do
full_line = buffer <> line
process_line(full_line, video, args, target_vmaf)

# Add the line to our output buffer for failure tracking
new_output_buffer = [full_line | output_buffer]
try do
process_line(full_line, video, args, target_vmaf)

# Add the line to our output buffer for failure tracking
new_output_buffer = [full_line | output_buffer]

{:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}}
rescue
e in Exqlite.Error ->
# Database is busy — don't crash. Requeue the same port message with a short delay.
Logger.warning(
"CrfSearch: Database busy while upserting VMAF, requeuing line: #{inspect(e)}"
)

# Re-send the exact same message after a short backoff to retry
Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)

# Keep state intact — we'll retry later
{:noreply, state}

e in DBConnection.ConnectionError ->
Logger.warning(
"CrfSearch: DB connection error while upserting VMAF, requeuing line: #{inspect(e)}"
)

{:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}}
Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)
{:noreply, state}
Comment on lines +250 to +269
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry logic for Exqlite.Error and DBConnection.ConnectionError is duplicated with identical behavior (200ms delay, same message requeue). This duplication should be consolidated into a single rescue clause handling both error types, or extracted into a helper function.

Copilot uses AI. Check for mistakes.
end
end

@impl true
Expand Down Expand Up @@ -417,17 +450,28 @@ defmodule Reencodarr.AbAv1.CrfSearch do
# Reset to clean state
clean_state = %{port: :none, current_task: :none, partial_line_buffer: "", output_buffer: []}

# Notify producer that we're available again
Producer.dispatch_available()
# With new simple Broadway design, no need to notify - it polls automatically

{:reply, :ok, clean_state}
end

# Private helper functions
defp perform_crf_search_cleanup(state) do
# Notify the Broadway producer that CRF search is now available
Producer.dispatch_available()

# Cleanup after CRF search when a video task was active - broadcasts completion event
defp perform_crf_search_cleanup(%{current_task: %{video: video}} = state) do
# Broadcast completion event via unified Events system
Events.broadcast_event(:crf_search_completed, %{
video_id: video.id,
result: :ok
})

new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""}
{:noreply, new_state}
end

# Cleanup after CRF search when no video task was active - just reset state
defp perform_crf_search_cleanup(state) do
# No current task, just reset state
new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""}
{:noreply, new_state}
end
Expand Down
118 changes: 104 additions & 14 deletions lib/reencodarr/ab_av1/encode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ defmodule Reencodarr.AbAv1.Encode do
end
end

@spec available?() :: boolean()
def available? do
# Check if encoder is available (not currently encoding)
case GenServer.whereis(__MODULE__) do
nil ->
false

pid ->
try do
GenServer.call(pid, :available?, 100)
catch
:exit, _ -> false
end
end
end

# GenServer Callbacks
@impl true
def init(:ok) do
Expand All @@ -54,6 +70,12 @@ defmodule Reencodarr.AbAv1.Encode do
{:reply, status, state}
end

@impl true
def handle_call(:available?, _from, %{port: port} = state) do
available = port == :none
{:reply, available, state}
end

@impl true
def handle_cast(
{:encode, %Vmaf{params: _params} = vmaf},
Expand Down Expand Up @@ -115,23 +137,91 @@ defmodule Reencodarr.AbAv1.Encode do
# Notify the Broadway producer that encoding is now available
Producer.dispatch_available()

if result == {:ok, :success} do
notify_encoder_success(vmaf.video, output_file)
else
notify_encoder_failure(vmaf.video, exit_code)
# Attempt to notify success/failure, but don't crash if DB is busy — retry later
notify_result =
try do
if result == {:ok, :success} do
notify_encoder_success(vmaf.video, output_file)
else
notify_encoder_failure(vmaf.video, exit_code)
end

:ok
rescue
e in Exqlite.Error ->
Logger.warning(
"Encode: Database busy while handling exit_status, scheduling retry: #{inspect(e)}"
)

# Retry after short backoff. Keep current state until retry completes.
Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 200)
:retry_scheduled
Comment on lines +150 to +158
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded retry delay of 200ms appears in multiple places (lines 157, 200). This magic number should be extracted to a module attribute like @db_retry_delay_ms for consistency and easier tuning.

Copilot uses AI. Check for mistakes.
end

case notify_result do
:ok ->
new_state = %{
state
| port: :none,
video: :none,
vmaf: :none,
output_file: nil,
partial_line_buffer: "",
last_progress: nil
}

{:noreply, new_state}

:retry_scheduled ->
# Keep state intact so retry handler has data available
{:noreply, state}
end
end

new_state = %{
state
| port: :none,
video: :none,
vmaf: :none,
output_file: nil,
partial_line_buffer: "",
last_progress: nil
}
@impl true
def handle_info({:encoding_exit_retry, exit_code, vmaf, output_file}, state) do
Logger.info("Encode: retrying exit_status handling for VMAF #{vmaf.id}")

retry_result =
try do
if exit_code == 0 do
notify_encoder_success(vmaf.video, output_file)
else
notify_encoder_failure(vmaf.video, exit_code)
end

:ok
rescue
e in Exqlite.Error ->
Logger.warning(
"Encode: retry failed due to DB busy: #{inspect(e)}, scheduling another retry"
)

Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 500)
:retry_scheduled
end

{:noreply, new_state}
case retry_result do
:ok ->
# Clean up state now that notification succeeded
cleared_state = %{
state
| port: :none,
video: :none,
vmaf: :none,
output_file: nil,
partial_line_buffer: "",
last_progress: nil
}

# Ensure Broadway knows encoder is available
Producer.dispatch_available()

{:noreply, cleared_state}

:retry_scheduled ->
{:noreply, state}
end
end

# Catch-all for any other port messages
Expand Down
Loading