-
Notifications
You must be signed in to change notification settings - Fork 0
Fix Broadway pipelines stopping after each item #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fcc2bc3
511d455
54b438e
e09851c
6a0b74c
4d12028
bed2edc
e42716d
bb5b921
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,11 +14,12 @@ defmodule Reencodarr.AbAv1.CrfSearch do | |
| alias Reencodarr.AbAv1.OutputParser | ||
| alias Reencodarr.Core.Parsers | ||
| alias Reencodarr.Core.Time | ||
| alias Reencodarr.CrfSearcher.Broadway.Producer | ||
| alias Reencodarr.Dashboard.Events | ||
| alias Reencodarr.ErrorHelpers | ||
| alias Reencodarr.Formatters | ||
| alias Reencodarr.{Media, Repo} | ||
| alias Reencodarr.Media | ||
| alias Reencodarr.Media.VideoStateMachine | ||
| alias Reencodarr.Repo | ||
|
|
||
| require Logger | ||
|
|
||
|
|
@@ -149,6 +150,15 @@ defmodule Reencodarr.AbAv1.CrfSearch do | |
|
|
||
| @impl true | ||
| def handle_cast({:crf_search, video, vmaf_percent}, %{port: :none} = state) do | ||
| # Mark video as crf_searching NOW that we're actually starting | ||
| case VideoStateMachine.transition_to_crf_searching(video) do | ||
| {:ok, _updated_video} -> | ||
| :ok | ||
|
|
||
| {:error, reason} -> | ||
| Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}") | ||
| end | ||
|
|
||
| args = build_crf_search_args(video, vmaf_percent) | ||
|
|
||
| new_state = %{ | ||
|
|
@@ -229,12 +239,35 @@ defmodule Reencodarr.AbAv1.CrfSearch do | |
| state | ||
| ) do | ||
| full_line = buffer <> line | ||
| process_line(full_line, video, args, target_vmaf) | ||
|
|
||
| # Add the line to our output buffer for failure tracking | ||
| new_output_buffer = [full_line | output_buffer] | ||
| try do | ||
| process_line(full_line, video, args, target_vmaf) | ||
|
|
||
| # Add the line to our output buffer for failure tracking | ||
| new_output_buffer = [full_line | output_buffer] | ||
|
|
||
| {:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}} | ||
| rescue | ||
| e in Exqlite.Error -> | ||
| # Database is busy — don't crash. Requeue the same port message with a short delay. | ||
| Logger.warning( | ||
| "CrfSearch: Database busy while upserting VMAF, requeuing line: #{inspect(e)}" | ||
| ) | ||
|
|
||
| # Re-send the exact same message after a short backoff to retry | ||
| Process.send_after(self(), {port, {:data, {:eol, line}}}, 200) | ||
|
|
||
| # Keep state intact — we'll retry later | ||
| {:noreply, state} | ||
|
|
||
| e in DBConnection.ConnectionError -> | ||
| Logger.warning( | ||
| "CrfSearch: DB connection error while upserting VMAF, requeuing line: #{inspect(e)}" | ||
| ) | ||
|
|
||
| {:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}} | ||
| Process.send_after(self(), {port, {:data, {:eol, line}}}, 200) | ||
| {:noreply, state} | ||
|
Comment on lines
+250
to
+269
|
||
| end | ||
| end | ||
|
|
||
| @impl true | ||
|
|
@@ -417,17 +450,28 @@ defmodule Reencodarr.AbAv1.CrfSearch do | |
| # Reset to clean state | ||
| clean_state = %{port: :none, current_task: :none, partial_line_buffer: "", output_buffer: []} | ||
|
|
||
| # Notify producer that we're available again | ||
| Producer.dispatch_available() | ||
| # With new simple Broadway design, no need to notify - it polls automatically | ||
|
|
||
| {:reply, :ok, clean_state} | ||
| end | ||
|
|
||
| # Private helper functions | ||
| defp perform_crf_search_cleanup(state) do | ||
| # Notify the Broadway producer that CRF search is now available | ||
| Producer.dispatch_available() | ||
|
|
||
| # Cleanup after CRF search when a video task was active - broadcasts completion event | ||
| defp perform_crf_search_cleanup(%{current_task: %{video: video}} = state) do | ||
mjc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Broadcast completion event via unified Events system | ||
| Events.broadcast_event(:crf_search_completed, %{ | ||
| video_id: video.id, | ||
| result: :ok | ||
| }) | ||
|
|
||
| new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} | ||
| {:noreply, new_state} | ||
| end | ||
|
|
||
| # Cleanup after CRF search when no video task was active - just reset state | ||
| defp perform_crf_search_cleanup(state) do | ||
| # No current task, just reset state | ||
| new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} | ||
| {:noreply, new_state} | ||
| end | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,22 @@ defmodule Reencodarr.AbAv1.Encode do | |
| end | ||
| end | ||
|
|
||
| @spec available?() :: boolean() | ||
| def available? do | ||
| # Check if encoder is available (not currently encoding) | ||
| case GenServer.whereis(__MODULE__) do | ||
| nil -> | ||
| false | ||
|
|
||
| pid -> | ||
| try do | ||
| GenServer.call(pid, :available?, 100) | ||
| catch | ||
| :exit, _ -> false | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # GenServer Callbacks | ||
| @impl true | ||
| def init(:ok) do | ||
|
|
@@ -54,6 +70,12 @@ defmodule Reencodarr.AbAv1.Encode do | |
| {:reply, status, state} | ||
| end | ||
|
|
||
| @impl true | ||
| def handle_call(:available?, _from, %{port: port} = state) do | ||
| available = port == :none | ||
| {:reply, available, state} | ||
| end | ||
|
|
||
| @impl true | ||
| def handle_cast( | ||
| {:encode, %Vmaf{params: _params} = vmaf}, | ||
|
|
@@ -115,23 +137,91 @@ defmodule Reencodarr.AbAv1.Encode do | |
| # Notify the Broadway producer that encoding is now available | ||
| Producer.dispatch_available() | ||
|
|
||
| if result == {:ok, :success} do | ||
| notify_encoder_success(vmaf.video, output_file) | ||
| else | ||
| notify_encoder_failure(vmaf.video, exit_code) | ||
| # Attempt to notify success/failure, but don't crash if DB is busy — retry later | ||
| notify_result = | ||
| try do | ||
| if result == {:ok, :success} do | ||
| notify_encoder_success(vmaf.video, output_file) | ||
| else | ||
| notify_encoder_failure(vmaf.video, exit_code) | ||
| end | ||
|
|
||
| :ok | ||
| rescue | ||
| e in Exqlite.Error -> | ||
| Logger.warning( | ||
| "Encode: Database busy while handling exit_status, scheduling retry: #{inspect(e)}" | ||
| ) | ||
|
|
||
| # Retry after short backoff. Keep current state until retry completes. | ||
| Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 200) | ||
| :retry_scheduled | ||
|
Comment on lines
+150
to
+158
|
||
| end | ||
|
|
||
| case notify_result do | ||
| :ok -> | ||
| new_state = %{ | ||
| state | ||
| | port: :none, | ||
| video: :none, | ||
| vmaf: :none, | ||
| output_file: nil, | ||
| partial_line_buffer: "", | ||
| last_progress: nil | ||
| } | ||
|
|
||
| {:noreply, new_state} | ||
|
|
||
| :retry_scheduled -> | ||
| # Keep state intact so retry handler has data available | ||
| {:noreply, state} | ||
| end | ||
| end | ||
|
|
||
| new_state = %{ | ||
| state | ||
| | port: :none, | ||
| video: :none, | ||
| vmaf: :none, | ||
| output_file: nil, | ||
| partial_line_buffer: "", | ||
| last_progress: nil | ||
| } | ||
| @impl true | ||
| def handle_info({:encoding_exit_retry, exit_code, vmaf, output_file}, state) do | ||
| Logger.info("Encode: retrying exit_status handling for VMAF #{vmaf.id}") | ||
|
|
||
| retry_result = | ||
| try do | ||
| if exit_code == 0 do | ||
| notify_encoder_success(vmaf.video, output_file) | ||
| else | ||
| notify_encoder_failure(vmaf.video, exit_code) | ||
| end | ||
|
|
||
| :ok | ||
| rescue | ||
| e in Exqlite.Error -> | ||
| Logger.warning( | ||
| "Encode: retry failed due to DB busy: #{inspect(e)}, scheduling another retry" | ||
| ) | ||
|
|
||
| Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 500) | ||
| :retry_scheduled | ||
| end | ||
|
|
||
| {:noreply, new_state} | ||
| case retry_result do | ||
| :ok -> | ||
| # Clean up state now that notification succeeded | ||
| cleared_state = %{ | ||
| state | ||
| | port: :none, | ||
| video: :none, | ||
| vmaf: :none, | ||
| output_file: nil, | ||
| partial_line_buffer: "", | ||
| last_progress: nil | ||
| } | ||
|
|
||
| # Ensure Broadway knows encoder is available | ||
| Producer.dispatch_available() | ||
|
|
||
| {:noreply, cleared_state} | ||
|
|
||
| :retry_scheduled -> | ||
| {:noreply, state} | ||
| end | ||
| end | ||
|
|
||
| # Catch-all for any other port messages | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code logs a warning when the state transition fails, but then continues with the CRF search anyway. This could leave the video in an inconsistent state. Consider either handling the error properly by returning early, or updating the log message to make clear that processing continues despite the failed state transition.