From fcc2bc39832a17efdf844ca424b719a946c9800b Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Thu, 23 Oct 2025 12:23:58 -0600 Subject: [PATCH 1/9] Fix Broadway pipelines stopping after each item - Fix encoder producer: decrement demand when dispatching VMAFs so Broadway calls handle_demand again - Fix pattern matching for completion events in all three pipelines: - Encoder: changed from 3-tuple destructure to 2-tuple with event_data variable - CRF searcher: changed from 3-tuple to 2-tuple matching - Analyzer: changed to explicitly match 2-tuple with data variable The issue was that producers were not correctly handling completion events because the pattern matching didn't match the actual broadcast event structure (2-tuple {:event_name, data_map} not 3-tuple). This meant completion handlers never fired, so producers never transitioned state or dispatched the next item. --- lib/reencodarr/analyzer/broadway/producer.ex | 2 +- lib/reencodarr/crf_searcher/broadway/producer.ex | 2 +- lib/reencodarr/encoder/broadway/producer.ex | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/reencodarr/analyzer/broadway/producer.ex b/lib/reencodarr/analyzer/broadway/producer.ex index d9014989..fa7e59ce 100644 --- a/lib/reencodarr/analyzer/broadway/producer.ex +++ b/lib/reencodarr/analyzer/broadway/producer.ex @@ -199,7 +199,7 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do end @impl GenStage - def handle_info({:batch_analysis_completed, _batch_size}, state) do + def handle_info({:batch_analysis_completed, _data}, state) do # Batch analysis completed - use state machine to determine next state Logger.debug("Producer: Received batch analysis completion notification") diff --git a/lib/reencodarr/crf_searcher/broadway/producer.ex b/lib/reencodarr/crf_searcher/broadway/producer.ex index ec841a71..52898b10 100644 --- a/lib/reencodarr/crf_searcher/broadway/producer.ex +++ b/lib/reencodarr/crf_searcher/broadway/producer.ex @@ -164,7 +164,7 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do end @impl GenStage - def handle_info({:crf_search_completed, _video_id, _result}, state) do + def handle_info({:crf_search_completed, _data}, state) do # CRF search completed - transition state appropriately current_state = PipelineStateMachine.get_state(state.pipeline) diff --git a/lib/reencodarr/encoder/broadway/producer.ex b/lib/reencodarr/encoder/broadway/producer.ex index e146c0cb..3ccf33f3 100644 --- a/lib/reencodarr/encoder/broadway/producer.ex +++ b/lib/reencodarr/encoder/broadway/producer.ex @@ -187,10 +187,10 @@ defmodule Reencodarr.Encoder.Broadway.Producer do end @impl GenStage - def handle_info({:encoding_completed, %{vmaf_id: vmaf_id, result: result} = event_data}, state) do + def handle_info({:encoding_completed, event_data}, state) do # Encoding completed (success or failure), transition back to running Logger.info( - "[Encoder Producer] *** RECEIVED ENCODING COMPLETION *** - VMAF: #{vmaf_id}, result: #{inspect(result)}, event: #{inspect(event_data)}" + "[Encoder Producer] *** RECEIVED ENCODING COMPLETION *** - VMAF: #{event_data.vmaf_id}, result: #{inspect(event_data.result)}, event: #{inspect(event_data)}" ) current_status = PipelineStateMachine.get_state(state.pipeline) @@ -350,19 +350,19 @@ defmodule Reencodarr.Encoder.Broadway.Producer do # Handle case where a single VMAF is returned %Reencodarr.Media.Vmaf{} = vmaf -> Logger.debug( - "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, keeping demand: #{state.demand}" + "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, decrementing demand from #{state.demand} to #{state.demand - 1}" ) - final_state = %{updated_state | demand: state.demand} + final_state = %{updated_state | demand: state.demand - 1} {:noreply, [vmaf], final_state} # Handle case where a list is returned [vmaf | _] -> Logger.debug( - "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, keeping demand: #{state.demand}" + "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, decrementing demand from #{state.demand} to #{state.demand - 1}" ) - final_state = %{updated_state | demand: state.demand} + final_state = %{updated_state | demand: state.demand - 1} {:noreply, [vmaf], final_state} # Handle case where empty list or nil is returned From 511d4550ea9ce64344d1cd35464604d962344e00 Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Fri, 24 Oct 2025 15:29:13 -0600 Subject: [PATCH 2/9] Simplify CRF searcher producer to just return videos one at a time - Remove complex availability checking and demand tracking - Let CrfSearch GenServer handle queueing via its mailbox - Trust VideoStateMachine to prevent duplicate sends via state transitions --- lib/reencodarr/ab_av1/crf_search.ex | 7 +- lib/reencodarr/crf_searcher.ex | 22 +- lib/reencodarr/crf_searcher/broadway.ex | 74 ++--- .../crf_searcher/broadway/producer.ex | 281 +----------------- lib/reencodarr/crf_searcher/supervisor.ex | 33 +- lib/reencodarr/pipeline_controller.ex | 0 lib/reencodarr_web/live/dashboard_live.ex | 4 +- 7 files changed, 78 insertions(+), 343 deletions(-) create mode 100644 lib/reencodarr/pipeline_controller.ex diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index 8ad006e1..63be8df3 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -14,7 +14,6 @@ defmodule Reencodarr.AbAv1.CrfSearch do alias Reencodarr.AbAv1.OutputParser alias Reencodarr.Core.Parsers alias Reencodarr.Core.Time - alias Reencodarr.CrfSearcher.Broadway.Producer alias Reencodarr.Dashboard.Events alias Reencodarr.ErrorHelpers alias Reencodarr.Formatters @@ -417,16 +416,14 @@ defmodule Reencodarr.AbAv1.CrfSearch do # Reset to clean state clean_state = %{port: :none, current_task: :none, partial_line_buffer: "", output_buffer: []} - # Notify producer that we're available again - Producer.dispatch_available() + # With new simple Broadway design, no need to notify - it polls automatically {:reply, :ok, clean_state} end # Private helper functions defp perform_crf_search_cleanup(state) do - # Notify the Broadway producer that CRF search is now available - Producer.dispatch_available() + # With new simple Broadway design, no need to notify - it polls automatically new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} {:noreply, new_state} diff --git a/lib/reencodarr/crf_searcher.ex b/lib/reencodarr/crf_searcher.ex index dc06fd80..001fe9a7 100644 --- a/lib/reencodarr/crf_searcher.ex +++ b/lib/reencodarr/crf_searcher.ex @@ -6,33 +6,33 @@ defmodule Reencodarr.CrfSearcher do Broadway pipeline that performs VMAF quality targeting searches on analyzed videos. """ - alias Reencodarr.CrfSearcher.Broadway.Producer + alias Reencodarr.CrfSearcher.Broadway alias Reencodarr.Media # Control functions @doc "Start/resume the CRF searcher pipeline" - def start, do: Producer.resume() + def start, do: Broadway.resume() @doc "Pause the CRF searcher pipeline" - def pause, do: Producer.pause() + def pause, do: Broadway.pause() @doc "Resume the CRF searcher pipeline (alias for start)" - def resume, do: Producer.resume() + def resume, do: Broadway.resume() - @doc "Force dispatch of available work" - def dispatch_available, do: Producer.dispatch_available() + @doc "Force dispatch of available work (no-op with new simple design)" + def dispatch_available, do: :ok - @doc "Queue a video for CRF search (typically called after analysis completes)" - def queue_video(video), do: Producer.add_video(video) + @doc "Queue a video for CRF search (no-op - videos pulled from DB automatically)" + def queue_video(_video), do: :ok # Status functions - @doc "Check if the CRF searcher is running (user intent)" - def running?, do: Producer.running?() + @doc "Check if the CRF searcher is running" + def running?, do: Broadway.running?() @doc "Check if the CRF searcher is actively processing work" - def actively_running?, do: Producer.actively_running?() + def actively_running?, do: Broadway.running?() @doc "Check if the CRF search GenServer is available" def available? do diff --git a/lib/reencodarr/crf_searcher/broadway.ex b/lib/reencodarr/crf_searcher/broadway.ex index acc52a85..44ce77dd 100644 --- a/lib/reencodarr/crf_searcher/broadway.ex +++ b/lib/reencodarr/crf_searcher/broadway.ex @@ -88,75 +88,43 @@ defmodule Reencodarr.CrfSearcher.Broadway do end @doc """ - Add a video to the pipeline for CRF search processing. - - ## Parameters - * `video` - Video struct containing id and path - - ## Examples - iex> video = %{id: 1, path: "/path/to/video.mp4"} - iex> Reencodarr.CrfSearcher.Broadway.process_video(video) - :ok - """ - @spec process_video(video()) :: :ok | {:error, term()} - def process_video(video) do - case Producer.add_video(video) do - :ok -> :ok - {:error, reason} -> {:error, reason} - end - end - - @doc """ - Check if the CRF searcher pipeline is running (not paused). - - ## Examples - iex> Reencodarr.CrfSearcher.Broadway.running?() - true + Check if the CRF searcher pipeline is running. """ @spec running?() :: boolean() def running? do - with pid when is_pid(pid) <- Process.whereis(__MODULE__), - true <- Process.alive?(pid) do - Producer.running?() - else - _ -> false + case Process.whereis(__MODULE__) do + nil -> false + pid -> Process.alive?(pid) end end @doc """ - Pause the CRF searcher pipeline. - - Pauses processing by updating the producer's state machine. - - ## Examples - iex> Reencodarr.CrfSearcher.Broadway.pause() - :ok + Pause by stopping the Broadway pipeline. """ - @spec pause() :: :ok | {:error, term()} + @spec pause() :: :ok def pause do - Producer.pause() + Logger.info("[CRF Searcher] Stopping Broadway pipeline") + Reencodarr.CrfSearcher.Supervisor.stop_broadway() + :ok end @doc """ - Resume the CRF searcher pipeline. - - Resumes processing by updating the producer's state machine. - - ## Examples - iex> Reencodarr.CrfSearcher.Broadway.resume() - :ok + Resume by starting the Broadway pipeline. """ - @spec resume() :: :ok | {:error, term()} + @spec resume() :: :ok def resume do - Producer.resume() - end + Logger.info("[CRF Searcher] Starting Broadway pipeline") - @doc """ - Start the CRF searcher pipeline. + case Reencodarr.CrfSearcher.Supervisor.start_broadway() do + {:ok, _pid} -> :ok + {:error, :already_started} -> :ok + {:error, {:already_started, _pid}} -> :ok + error -> error + end + end - Alias for `resume/0` to maintain API compatibility. - """ - @spec start() :: :ok | {:error, term()} + @doc "Alias for resume" + @spec start() :: :ok def start, do: resume() # Broadway callbacks diff --git a/lib/reencodarr/crf_searcher/broadway/producer.ex b/lib/reencodarr/crf_searcher/broadway/producer.ex index 52898b10..2bdbc48e 100644 --- a/lib/reencodarr/crf_searcher/broadway/producer.ex +++ b/lib/reencodarr/crf_searcher/broadway/producer.ex @@ -1,295 +1,36 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @moduledoc """ - Broadway producer for CRF search operations. - - This producer dispatches videos for CRF search only when the CRF search - GenServer is available, preventing duplicate work and resource waste. + Broadway producer for CRF search - just returns videos one at a time. + CrfSearch GenServer handles queueing via its mailbox. """ use GenStage require Logger - alias Reencodarr.AbAv1.CrfSearch - alias Reencodarr.Dashboard.Events + alias Reencodarr.Media - alias Reencodarr.PipelineStateMachine def start_link(opts) do GenStage.start_link(__MODULE__, opts, name: __MODULE__) end - # Public API for external control - def pause, do: send_to_producer(:pause) - def resume, do: send_to_producer(:resume) - def dispatch_available, do: send_to_producer(:dispatch_available) - def add_video(video), do: send_to_producer({:add_video, video}) - - # Alias for API compatibility - def start, do: resume() - - # Simplified - no cross-producer communication needed - # If the process exists, it's running - def running?, do: true - - # Check if actively processing by seeing if CRF searcher is busy - def actively_running? do - case CrfSearch.available?() do - # Available means not actively running - true -> false - # Not available means actively processing - false -> true - end - end - @impl GenStage def init(_opts) do - # Subscribe to video state transitions for videos that finished analysis - Phoenix.PubSub.subscribe(Reencodarr.PubSub, "video_state_transitions") - - # Initialize pipeline state machine - pipeline = PipelineStateMachine.new(:crf_searcher) - - {:producer, - %{ - demand: 0, - pipeline: pipeline - }} - end - - @impl GenStage - def handle_demand(demand, state) when demand > 0 do - current_status = PipelineStateMachine.get_state(state.pipeline) - - # Only accumulate demand if not currently processing - # CRF search is single-concurrency, so we shouldn't accept more demand while busy - # But we DO accept demand when paused (to allow resuming) - if current_status == :processing do - Logger.debug("[CRF Searcher Producer] Already processing, ignoring demand: #{demand}") - {:noreply, [], state} - else - new_state = %{state | demand: state.demand + demand} - dispatch_if_ready(new_state) - end + {:producer, %{}} end @impl GenStage - def handle_call(:running?, _from, state) do - # Button should reflect user intent - not running if paused or pausing - running = PipelineStateMachine.running?(state.pipeline) - {:reply, running, [], state} + def handle_demand(_demand, state) do + # Just return 1 video per demand - CrfSearch will handle queueing + videos = Media.get_videos_for_crf_search(1) + {:noreply, videos, state} end @impl GenStage - def handle_call(:actively_running?, _from, state) do - # Check if actively processing (for telemetry/progress updates) - actively_running = PipelineStateMachine.actively_working?(state.pipeline) - {:reply, actively_running, [], state} - end + def handle_info(_msg, state), do: {:noreply, [], state} @impl GenStage - def handle_cast({:status_request, requester_pid}, state) do - current_state = PipelineStateMachine.get_state(state.pipeline) - send(requester_pid, {:status_response, :crf_searcher, current_state}) - {:noreply, [], state} - end + def handle_cast(_msg, state), do: {:noreply, [], state} @impl GenStage - def handle_cast(:broadcast_status, state) do - # Broadcast actual current status to dashboard - current_state = PipelineStateMachine.get_state(state.pipeline) - - # Map pipeline state to dashboard status - status = - case current_state do - :processing -> :processing - :paused -> :paused - :running -> :running - _ -> :stopped - end - - # Broadcast as service_status event with the actual state - Events.broadcast_event(:service_status, %{service: :crf_searcher, status: status}) - - {:noreply, [], state} - end - - @impl GenStage - def handle_cast(:pause, state) do - new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) - {:noreply, [], new_state} - end - - @impl GenStage - def handle_cast(:resume, state) do - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.resume/1)) - end - - @impl GenStage - def handle_cast({:add_video, video}, state) do - Logger.info("Adding video to CRF searcher: #{video.path}") - # No manual queue management - just trigger dispatch to check database - # The video should already be in the database with :analyzed state - dispatch_if_ready(state) - end - - @impl GenStage - def handle_cast(:dispatch_available, state) do - case PipelineStateMachine.get_state(state.pipeline) do - :pausing -> - {:noreply, [], - Map.update!(state, :pipeline, &PipelineStateMachine.transition_to(&1, :paused))} - - _ -> - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.work_available/1)) - end - end - - @impl GenStage - def handle_info({:video_state_changed, video, :analyzed}, state) do - # Video finished analysis - if CRF searcher is running, force dispatch even without demand - Logger.debug("[CRF Searcher Producer] Received analyzed video: #{video.path}") - - current_state = PipelineStateMachine.get_state(state.pipeline) - - Logger.debug( - "[CRF Searcher Producer] State: #{inspect(%{status: current_state, demand: state.demand})}" - ) - - force_dispatch_if_running(state) - end - - @impl GenStage - def handle_info({:video_state_changed, _video, _other_state}, state) do - # Ignore other state transitions - CRF searcher only cares about :analyzed - {:noreply, [], state} - end - - @impl GenStage - def handle_info({:crf_search_completed, _data}, state) do - # CRF search completed - transition state appropriately - current_state = PipelineStateMachine.get_state(state.pipeline) - - case current_state do - :processing -> - # Work completed while running - check for more work - new_pipeline = - PipelineStateMachine.work_completed(state.pipeline, crf_search_available?()) - - updated_state = %{state | pipeline: new_pipeline} - dispatch_if_ready(updated_state) - {:noreply, [], updated_state} - - :pausing -> - # Work completed while pausing - transition to paused and stop - new_pipeline = PipelineStateMachine.work_completed(state.pipeline, false) - updated_state = %{state | pipeline: new_pipeline} - {:noreply, [], updated_state} - - _ -> - # Already paused or other state - just acknowledge completion - {:noreply, [], state} - end - end - - @impl GenStage - def handle_info(_msg, state) do - {:noreply, [], state} - end - - # Private functions - - defp send_to_producer(message) do - # Broadway manages producer names internally, so we need to get the actual name - case Broadway.producer_names(Reencodarr.CrfSearcher.Broadway) do - [producer_name | _] -> GenStage.cast(producer_name, message) - [] -> {:error, :not_found} - end - end - - defp dispatch_if_ready(state) do - if should_dispatch?(state) and state.demand > 0 do - dispatch_videos(state) - else - handle_no_dispatch_crf_searcher(state) - end - end - - defp handle_no_dispatch_crf_searcher(state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - - if PipelineStateMachine.available_for_work?(current_status) do - case Media.get_videos_for_crf_search(1) do - [] -> - # No videos to process - transition to idle - new_pipeline = PipelineStateMachine.transition_to(state.pipeline, :idle) - new_state = %{state | pipeline: new_pipeline} - {:noreply, [], new_state} - - [_video | _] -> - # Videos available but no demand or CRF service unavailable - {:noreply, [], state} - end - else - {:noreply, [], state} - end - end - - defp should_dispatch?(state) do - PipelineStateMachine.running?(state.pipeline) and crf_search_available?() - end - - defp crf_search_available? do - # Check if the CRF searcher is available (not busy with another video) - CrfSearch.available?() - end - - defp dispatch_videos(state) do - if state.demand > 0 and should_dispatch?(state) do - # Get videos directly from database - case Media.get_videos_for_crf_search(1) do - [] -> - {:noreply, [], state} - - [video | _] -> - Logger.info( - "🚀 CRF Producer: Dispatching video #{video.id} (#{video.title}) for CRF search" - ) - - # Mark as processing and decrement demand - new_pipeline = PipelineStateMachine.start_processing(state.pipeline) - updated_state = %{state | demand: state.demand - 1, pipeline: new_pipeline} - - {:noreply, [video], updated_state} - end - else - {:noreply, [], state} - end - end - - # Helper function to force dispatch when CRF searcher is running - defp force_dispatch_if_running(state) do - current_state = PipelineStateMachine.get_state(state.pipeline) - - cond do - not PipelineStateMachine.available_for_work?(current_state) -> - Logger.debug( - "[CRF Searcher Producer] Force dispatch - status: #{current_state}, not available for work, skipping dispatch" - ) - - {:noreply, [], state} - - not crf_search_available?() -> - Logger.debug("[CRF Searcher Producer] GenServer not available, skipping dispatch") - {:noreply, [], state} - - Media.get_videos_for_crf_search(1) == [] -> - {:noreply, [], state} - - true -> - Logger.debug( - "[CRF Searcher Producer] Force dispatching video to wake up idle Broadway pipeline" - ) - - dispatch_if_ready(state) - end - end + def handle_call(_msg, _from, state), do: {:reply, :ok, [], state} end diff --git a/lib/reencodarr/crf_searcher/supervisor.ex b/lib/reencodarr/crf_searcher/supervisor.ex index c0b5daac..a21389c0 100644 --- a/lib/reencodarr/crf_searcher/supervisor.ex +++ b/lib/reencodarr/crf_searcher/supervisor.ex @@ -9,11 +9,40 @@ defmodule Reencodarr.CrfSearcher.Supervisor do @impl true def init(:ok) do + # Only start the CrfSearch GenServer automatically + # The Broadway pipeline will be started/stopped dynamically via pause/resume children = [ - {Reencodarr.AbAv1.CrfSearch, []}, - {Reencodarr.CrfSearcher.Broadway, []} + {Reencodarr.AbAv1.CrfSearch, []} ] Supervisor.init(children, strategy: :one_for_one) end + + @doc """ + Start the Broadway pipeline under this supervisor if not already running. + """ + def start_broadway do + case Process.whereis(Reencodarr.CrfSearcher.Broadway) do + nil -> + spec = {Reencodarr.CrfSearcher.Broadway, []} + Supervisor.start_child(__MODULE__, spec) + + _pid -> + {:error, :already_started} + end + end + + @doc """ + Stop the Broadway pipeline if running. + """ + def stop_broadway do + case Process.whereis(Reencodarr.CrfSearcher.Broadway) do + nil -> + :ok + + _pid -> + Supervisor.terminate_child(__MODULE__, Reencodarr.CrfSearcher.Broadway) + Supervisor.delete_child(__MODULE__, Reencodarr.CrfSearcher.Broadway) + end + end end diff --git a/lib/reencodarr/pipeline_controller.ex b/lib/reencodarr/pipeline_controller.ex new file mode 100644 index 00000000..e69de29b diff --git a/lib/reencodarr_web/live/dashboard_live.ex b/lib/reencodarr_web/live/dashboard_live.ex index b5c1ca02..ef19608e 100644 --- a/lib/reencodarr_web/live/dashboard_live.ex +++ b/lib/reencodarr_web/live/dashboard_live.ex @@ -331,13 +331,13 @@ defmodule ReencodarrWeb.DashboardLive do @impl true def handle_event("start_crf_searcher", _params, socket) do - Reencodarr.CrfSearcher.Broadway.Producer.start() + Reencodarr.CrfSearcher.Broadway.resume() {:noreply, put_flash(socket, :info, "CRF Search started")} end @impl true def handle_event("pause_crf_searcher", _params, socket) do - Reencodarr.CrfSearcher.Broadway.Producer.pause() + Reencodarr.CrfSearcher.Broadway.pause() {:noreply, put_flash(socket, :info, "CRF Search paused")} end From 54b438e8d319a963a12fd87a325d63c0c168cb34 Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Fri, 24 Oct 2025 15:29:43 -0600 Subject: [PATCH 3/9] Mark videos as :crf_searching before sending to GenServer - Prevents Broadway from sending duplicate videos via handle_demand - VideoStateMachine transitions happen immediately before async cast - Producer's database query won't return videos in :crf_searching state --- lib/reencodarr/crf_searcher/broadway.ex | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/reencodarr/crf_searcher/broadway.ex b/lib/reencodarr/crf_searcher/broadway.ex index 44ce77dd..14714921 100644 --- a/lib/reencodarr/crf_searcher/broadway.ex +++ b/lib/reencodarr/crf_searcher/broadway.ex @@ -17,6 +17,7 @@ defmodule Reencodarr.CrfSearcher.Broadway do alias Broadway.Message alias Reencodarr.AbAv1 alias Reencodarr.CrfSearcher.Broadway.Producer + alias Reencodarr.Media.VideoStateMachine @typedoc "Video struct for CRF search processing" @type video :: %{id: integer(), path: binary()} @@ -171,13 +172,21 @@ defmodule Reencodarr.CrfSearcher.Broadway do defp process_video_crf_search(video, crf_quality) do Logger.debug("Starting CRF search for video #{video.id}: #{video.path}") - # AbAv1.crf_search/2 always returns :ok since it's a GenServer.cast - # The actual success/failure is handled by the GenServer - :ok = AbAv1.crf_search(video, crf_quality) + # Mark as crf_searching BEFORE sending to CrfSearch + # This prevents the producer from returning the same video again + case VideoStateMachine.transition_to_crf_searching(video) do + {:ok, _updated_video} -> + # Now send to CrfSearch - it's fire-and-forget via cast + :ok = AbAv1.crf_search(video, crf_quality) - Logger.debug("CRF search queued successfully for video #{video.id}") + Logger.debug("CRF search queued successfully for video #{video.id}") - :ok + :ok + + {:error, reason} -> + Logger.error("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}") + {:error, reason} + end rescue exception -> error_message = From e09851cbd18f7bc859240f58a2e7d7c3b055e99c Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Fri, 24 Oct 2025 15:32:28 -0600 Subject: [PATCH 4/9] Fix pipeline status display to show idle/processing/stopped correctly - Simplify get_optimistic_service_status to just check if processes alive - Show :idle by default for running pipelines, :processing when work is active - Reset status back to :idle when operations complete (encoding_completed, crf_search_completed) - Add CrfSearcherBroadway alias for consistency --- lib/reencodarr_web/live/dashboard_live.ex | 34 +++++++++++++++++------ 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/lib/reencodarr_web/live/dashboard_live.ex b/lib/reencodarr_web/live/dashboard_live.ex index ef19608e..4251e087 100644 --- a/lib/reencodarr_web/live/dashboard_live.ex +++ b/lib/reencodarr_web/live/dashboard_live.ex @@ -8,6 +8,7 @@ defmodule ReencodarrWeb.DashboardLive do """ use ReencodarrWeb, :live_view + alias Reencodarr.CrfSearcher.Broadway, as: CrfSearcherBroadway alias Reencodarr.Dashboard.Events alias Reencodarr.Formatters alias Reencodarr.Media.VideoQueries @@ -193,13 +194,26 @@ defmodule ReencodarrWeb.DashboardLive do # Completion and reset handlers @impl true def handle_info({:encoding_completed, _data}, socket) do - # Reset progress - keep current status (paused/running) to preserve user's last action - {:noreply, assign(socket, :encoding_progress, :none)} + # Reset status back to idle when encoding completes + current_status = socket.assigns.service_status + updated_status = Map.put(current_status, :encoder, :idle) + + socket + |> assign(:encoding_progress, :none) + |> assign(:service_status, updated_status) + |> then(&{:noreply, &1}) end @impl true def handle_info({event, _data}, socket) when event in [:crf_search_completed] do - {:noreply, assign(socket, :crf_progress, :none)} + # Reset status back to idle when CRF search completes + current_status = socket.assigns.service_status + updated_status = Map.put(current_status, :crf_searcher, :idle) + + socket + |> assign(:crf_progress, :none) + |> assign(:service_status, updated_status) + |> then(&{:noreply, &1}) end # Special CRF search event handlers @@ -331,13 +345,13 @@ defmodule ReencodarrWeb.DashboardLive do @impl true def handle_event("start_crf_searcher", _params, socket) do - Reencodarr.CrfSearcher.Broadway.resume() + CrfSearcherBroadway.resume() {:noreply, put_flash(socket, :info, "CRF Search started")} end @impl true def handle_event("pause_crf_searcher", _params, socket) do - Reencodarr.CrfSearcher.Broadway.pause() + CrfSearcherBroadway.pause() {:noreply, put_flash(socket, :info, "CRF Search paused")} end @@ -655,11 +669,13 @@ defmodule ReencodarrWeb.DashboardLive do } end - # Optimistic service status - assume running if alive, let events correct it + # Simple service status - just check if processes are alive defp get_optimistic_service_status do - Map.new(@producer_modules, fn {service, module} -> - {service, if(Process.whereis(module), do: :running, else: :stopped)} - end) + %{ + analyzer: if(Process.whereis(@producer_modules.analyzer), do: :idle, else: :stopped), + crf_searcher: if(CrfSearcherBroadway.running?(), do: :idle, else: :stopped), + encoder: if(Process.whereis(@producer_modules.encoder), do: :idle, else: :stopped) + } end defp request_current_status do From 6a0b74c3fd9161b32ed12b189371d5bf93cb07cc Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Fri, 24 Oct 2025 15:38:30 -0600 Subject: [PATCH 5/9] Remove unused functions and empty file - Remove dispatch_available/0 and queue_video/1 from CrfSearcher (no longer needed) - Remove empty pipeline_controller.ex file - Format code to fix trailing whitespace --- lib/reencodarr/crf_searcher.ex | 6 ------ lib/reencodarr/crf_searcher/broadway.ex | 1 - lib/reencodarr/pipeline_controller.ex | 0 3 files changed, 7 deletions(-) delete mode 100644 lib/reencodarr/pipeline_controller.ex diff --git a/lib/reencodarr/crf_searcher.ex b/lib/reencodarr/crf_searcher.ex index 001fe9a7..a29cafb6 100644 --- a/lib/reencodarr/crf_searcher.ex +++ b/lib/reencodarr/crf_searcher.ex @@ -20,12 +20,6 @@ defmodule Reencodarr.CrfSearcher do @doc "Resume the CRF searcher pipeline (alias for start)" def resume, do: Broadway.resume() - @doc "Force dispatch of available work (no-op with new simple design)" - def dispatch_available, do: :ok - - @doc "Queue a video for CRF search (no-op - videos pulled from DB automatically)" - def queue_video(_video), do: :ok - # Status functions @doc "Check if the CRF searcher is running" diff --git a/lib/reencodarr/crf_searcher/broadway.ex b/lib/reencodarr/crf_searcher/broadway.ex index 14714921..b24b73a2 100644 --- a/lib/reencodarr/crf_searcher/broadway.ex +++ b/lib/reencodarr/crf_searcher/broadway.ex @@ -120,7 +120,6 @@ defmodule Reencodarr.CrfSearcher.Broadway do {:ok, _pid} -> :ok {:error, :already_started} -> :ok {:error, {:already_started, _pid}} -> :ok - error -> error end end diff --git a/lib/reencodarr/pipeline_controller.ex b/lib/reencodarr/pipeline_controller.ex deleted file mode 100644 index e69de29b..00000000 From 4d120284ccf97b61869bd48c76ded96f88b96f9f Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Mon, 27 Oct 2025 11:14:29 -0600 Subject: [PATCH 6/9] Fix Broadway pipelines stopping and simplify event system - Fix demand tracking in producers (don't discard demand when busy) - Track current_video_id in CRF producer to prevent duplicate dispatches - Unify all completion events to single Events.channel() (dashboard) - Remove separate PubSub topics (crf_search_events, video_state_transitions, etc) - Simplify status checks (actively_running based on actual GenServer state) - Add Encode.available?() function - Fix Core.Time.to_seconds to accept floats - Fix mkvpropedit image stream removal (use track UIDs not just attachments) - Broadcast crf_search_completed via Events instead of separate PubSub topic --- .vscode/settings.json | 7 +- lib/reencodarr/ab_av1/crf_search.ex | 28 ++- lib/reencodarr/ab_av1/encode.ex | 22 ++ lib/reencodarr/ab_av1/helper.ex | 188 ++++++++++++------ lib/reencodarr/analyzer/broadway/producer.ex | 26 +-- lib/reencodarr/core/time.ex | 18 +- lib/reencodarr/crf_searcher.ex | 13 +- lib/reencodarr/crf_searcher/broadway.ex | 35 ++-- .../crf_searcher/broadway/producer.ex | 55 ++++- lib/reencodarr/encoder.ex | 13 +- lib/reencodarr/encoder/broadway/producer.ex | 32 ++- 11 files changed, 286 insertions(+), 151 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 539e3aa4..89b7bfb4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -56,10 +56,5 @@ "Mshort", "dont", "scrf" - ], - "workbench.colorCustomizations": { - "activityBar.background": "#1D322B", - "titleBar.activeBackground": "#29463D", - "titleBar.activeForeground": "#F7FBF9" - } + ] } \ No newline at end of file diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index 63be8df3..4e5aa9c3 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -17,7 +17,10 @@ defmodule Reencodarr.AbAv1.CrfSearch do alias Reencodarr.Dashboard.Events alias Reencodarr.ErrorHelpers alias Reencodarr.Formatters - alias Reencodarr.{Media, Repo} + alias Reencodarr.Media + alias Reencodarr.Media.Video + alias Reencodarr.Media.VideoStateMachine + alias Reencodarr.Repo require Logger @@ -148,6 +151,15 @@ defmodule Reencodarr.AbAv1.CrfSearch do @impl true def handle_cast({:crf_search, video, vmaf_percent}, %{port: :none} = state) do + # Mark video as crf_searching NOW that we're actually starting + case VideoStateMachine.transition_to_crf_searching(video) do + {:ok, _updated_video} -> + :ok + + {:error, reason} -> + Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}") + end + args = build_crf_search_args(video, vmaf_percent) new_state = %{ @@ -422,13 +434,23 @@ defmodule Reencodarr.AbAv1.CrfSearch do end # Private helper functions - defp perform_crf_search_cleanup(state) do - # With new simple Broadway design, no need to notify - it polls automatically + defp perform_crf_search_cleanup(%{current_task: %{video: video}} = state) do + # Broadcast completion event via unified Events system + Events.broadcast_event(:crf_search_completed, %{ + video_id: video.id, + result: :ok + }) new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} {:noreply, new_state} end + defp perform_crf_search_cleanup(state) do + # No current task, just reset state + new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} + {:noreply, new_state} + end + def process_line(line, video, args, target_vmaf \\ 95) do handlers = [ &handle_encoding_sample_line/2, diff --git a/lib/reencodarr/ab_av1/encode.ex b/lib/reencodarr/ab_av1/encode.ex index 45be3496..78b69604 100644 --- a/lib/reencodarr/ab_av1/encode.ex +++ b/lib/reencodarr/ab_av1/encode.ex @@ -34,6 +34,22 @@ defmodule Reencodarr.AbAv1.Encode do end end + @spec available?() :: boolean() + def available? do + # Check if encoder is available (not currently encoding) + case GenServer.whereis(__MODULE__) do + nil -> + false + + pid -> + try do + GenServer.call(pid, :available?, 100) + catch + :exit, _ -> false + end + end + end + # GenServer Callbacks @impl true def init(:ok) do @@ -54,6 +70,12 @@ defmodule Reencodarr.AbAv1.Encode do {:reply, status, state} end + @impl true + def handle_call(:available?, _from, %{port: port} = state) do + available = port == :none + {:reply, available, state} + end + @impl true def handle_cast( {:encode, %Vmaf{params: _params} = vmaf}, diff --git a/lib/reencodarr/ab_av1/helper.ex b/lib/reencodarr/ab_av1/helper.ex index b996689d..de870770 100644 --- a/lib/reencodarr/ab_av1/helper.ex +++ b/lib/reencodarr/ab_av1/helper.ex @@ -67,11 +67,11 @@ defmodule Reencodarr.AbAv1.Helper do Remove image attachments from MKV files to prevent FFmpeg encoding failures. Some MKV files contain attached JPEG/PNG images which cause FFmpeg to fail - with exit code 218. This function uses mkvpropedit to remove these attachments. + with exit code 218. This function uses mkvmerge to remove these attachments and streams. - Returns the cleaned file path (either original if no cleaning needed, or temp copy). + Always returns the path to use - either cleaned temp file or original if cleaning not needed/failed. """ - @spec clean_mkv_attachments(String.t()) :: {:ok, String.t()} | {:error, term()} + @spec clean_mkv_attachments(String.t()) :: {:ok, String.t()} def clean_mkv_attachments(file_path) do # Only process MKV files if String.ends_with?(file_path, [".mkv", ".MKV"]) do @@ -99,21 +99,29 @@ defmodule Reencodarr.AbAv1.Helper do defp check_for_image_attachments(file_path) do case System.cmd("mkvmerge", ["-i", file_path], stderr_to_stdout: true) do {output, 0} -> - # Check if output contains attachment info with image types - has_images = + # Check for: + # 1. Attachment info with image types + # 2. Video streams that are attached pictures (mjpeg, png with "attached pic" flag) + has_image_attachments = String.contains?(output, "attachment") and (String.contains?(output, "image/jpeg") or String.contains?(output, "image/jpg") or String.contains?(output, "image/png")) - {:ok, has_images} + # Check for image streams (these appear as tracks in mkvmerge output) + # Look for tracks that are likely cover art (small video tracks, mjpeg/png) + has_image_streams = + Regex.match?(~r/Track ID \d+: video \(MJPEG\)/i, output) or + Regex.match?(~r/Track ID \d+: video \(PNG\)/i, output) + + {:ok, has_image_attachments or has_image_streams} {_output, _exit_code} -> {:error, :mkvmerge_failed} end end - @spec remove_image_attachments(String.t()) :: {:ok, String.t()} | {:error, term()} + @spec remove_image_attachments(String.t()) :: {:ok, String.t()} defp remove_image_attachments(file_path) do # Create a cleaned copy in temp directory filename = Path.basename(file_path) @@ -121,67 +129,138 @@ defmodule Reencodarr.AbAv1.Helper do ext = Path.extname(filename) cleaned_path = Path.join(temp_dir(), "#{name_without_ext}_cleaned#{ext}") - Logger.info("Removing image attachments from #{file_path}") + Logger.info("Removing image attachments and streams from #{file_path}") - # Copy original file to temp location + # Copy file to temp location first case File.cp(file_path, cleaned_path) do :ok -> - process_attachment_removal(file_path, cleaned_path) + # Use mkvpropedit to delete tracks and attachments in-place + case remove_image_tracks_and_attachments(cleaned_path) do + :ok -> + Logger.info("Successfully cleaned image streams from #{file_path}") + {:ok, cleaned_path} + + {:error, reason} -> + Logger.warning( + "Failed to clean #{file_path}: #{inspect(reason)}, falling back to original" + ) + + File.rm(cleaned_path) + {:ok, file_path} + end {:error, reason} -> - Logger.error("Failed to copy file for cleaning: #{inspect(reason)}") - {:error, reason} + Logger.warning("Failed to copy file for cleaning: #{inspect(reason)}") + {:ok, file_path} end end - defp process_attachment_removal(original_path, cleaned_path) do - attachment_types = ["image/jpg", "image/jpeg", "image/png"] + defp remove_image_tracks_and_attachments(cleaned_path) do + # Get track info to find image streams + case get_image_track_uids(cleaned_path) do + {:ok, track_uids} -> + delete_tracks_and_attachments(cleaned_path, track_uids) - success = Enum.all?(attachment_types, &remove_attachment_type(&1, cleaned_path)) + {:error, reason} -> + # If we can't get track info, at least try to delete attachments + Logger.warning("Failed to get track info: #{inspect(reason)}, trying attachments only") + delete_image_attachments(cleaned_path) + end + end - if success do - Logger.info("Successfully cleaned image attachments from #{original_path}") - {:ok, cleaned_path} - else - # If cleaning failed, fall back to original file - File.rm(cleaned_path) - Logger.warning("Failed to clean attachments, using original file: #{original_path}") - {:ok, original_path} + defp delete_tracks_and_attachments(cleaned_path, track_uids) do + # Delete each image track by UID using mkvpropedit + result = + Enum.reduce_while(track_uids, :ok, fn uid, _acc -> + case System.cmd("mkvpropedit", [cleaned_path, "--delete-track", uid], + stderr_to_stdout: true + ) do + {_output, 0} -> + {:cont, :ok} + + {output, exit_code} -> + Logger.warning( + "mkvpropedit failed to delete track #{uid}: exit #{exit_code}, output: #{output}" + ) + + {:halt, {:error, :track_deletion_failed}} + end + end) + + # Also delete image attachments + case result do + :ok -> delete_image_attachments(cleaned_path) + error -> error end end - defp remove_attachment_type(mime_type, cleaned_path) do - case System.cmd( - "mkvpropedit", - ["--delete-attachment", "mime-type:#{mime_type}", cleaned_path], - stderr_to_stdout: true - ) do - {_output, 0} -> - true + defp get_image_track_uids(file_path) do + case System.cmd("mkvmerge", ["-J", file_path], stderr_to_stdout: true) do + {output, 0} -> + parse_image_tracks_from_json(output) + + {_output, _exit_code} -> + {:error, :mkvmerge_failed} + end + end - # Exit code 2 means "no attachments of this type found" - that's OK - {_output, 2} -> - true + defp parse_image_tracks_from_json(output) do + case Jason.decode(output) do + {:ok, %{"tracks" => tracks}} -> + image_uids = + tracks + |> Enum.filter(&image_video_track?/1) + |> Enum.map(& &1["properties"]["uid"]) + |> Enum.reject(&is_nil/1) + |> Enum.map(&to_string/1) - {output, exit_code} -> - Logger.warning( - "mkvpropedit failed for #{mime_type} on #{cleaned_path}: exit #{exit_code}, output: #{output}" - ) + {:ok, image_uids} - # Continue with other types even if one fails - true + _ -> + {:ok, []} end end + defp image_video_track?(track) do + track["type"] == "video" and + (track["codec"] in ["MJPEG", "PNG"] or + String.contains?(track["properties"]["codec_id"] || "", ["V_MS/VFW", "PNG"])) + end + + defp delete_image_attachments(cleaned_path) do + attachment_types = ["image/jpg", "image/jpeg", "image/png"] + + result = + Enum.reduce_while(attachment_types, :ok, fn mime_type, _acc -> + case System.cmd( + "mkvpropedit", + ["--delete-attachment", "mime-type:#{mime_type}", cleaned_path], + stderr_to_stdout: true + ) do + {_output, 0} -> + {:cont, :ok} + + # Exit code 2 means "no attachments of this type found" - that's OK + {_output, 2} -> + {:cont, :ok} + + {output, exit_code} -> + Logger.warning( + "mkvpropedit failed for #{mime_type}: exit #{exit_code}, output: #{output}" + ) + + # Continue even if one type fails + {:cont, :ok} + end + end) + + result + end + @spec open_port([binary()]) :: port() | :error def open_port(args) do - # Check if this is an encoding operation that needs input file cleaning - cleaned_args = - case preprocess_input_file(args) do - {:ok, new_args} -> new_args - # Fall back to original args if preprocessing fails - {:error, _reason} -> args - end + # Preprocess input file to remove image streams/attachments + {:ok, cleaned_args} = preprocess_input_file(args) case System.find_executable("ab-av1") do nil -> @@ -200,21 +279,16 @@ defmodule Reencodarr.AbAv1.Helper do end end - @spec preprocess_input_file([String.t()]) :: {:ok, [String.t()]} | {:error, term()} + @spec preprocess_input_file([String.t()]) :: {:ok, [String.t()]} defp preprocess_input_file(args) do # Find the input file path in the args case find_input_file_path(args) do {:ok, input_path, input_index} -> # Clean the input file of image attachments - case clean_mkv_attachments(input_path) do - {:ok, cleaned_path} -> - # Replace the input path in args with cleaned path - new_args = List.replace_at(args, input_index, cleaned_path) - {:ok, new_args} - - {:error, reason} -> - {:error, reason} - end + {:ok, cleaned_path} = clean_mkv_attachments(input_path) + # Replace the input path in args with cleaned path + new_args = List.replace_at(args, input_index, cleaned_path) + {:ok, new_args} :not_found -> # No input file found in args, return as-is diff --git a/lib/reencodarr/analyzer/broadway/producer.ex b/lib/reencodarr/analyzer/broadway/producer.ex index fa7e59ce..0b85c7ee 100644 --- a/lib/reencodarr/analyzer/broadway/producer.ex +++ b/lib/reencodarr/analyzer/broadway/producer.ex @@ -71,12 +71,8 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do @impl GenStage def init(_opts) do - # Subscribe to media events for new videos - Phoenix.PubSub.subscribe(Reencodarr.PubSub, "media_events") - # Subscribe to video state transitions for new videos needing analysis - Phoenix.PubSub.subscribe(Reencodarr.PubSub, "video_state_transitions") - # Subscribe to analyzer events to know when processing completes - Phoenix.PubSub.subscribe(Reencodarr.PubSub, "analyzer_events") + # Subscribe ONLY to unified Events channel for all completion events + Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) # Send a delayed message to trigger initial dispatch Process.send_after(self(), :initial_dispatch, 1000) @@ -97,8 +93,8 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do @impl GenStage def handle_call(:actively_running?, _from, state) do - # For telemetry/progress - actively running if processing or pausing - actively_running = PipelineStateMachine.actively_working?(state.pipeline) + # Simple: if we're processing videos, we're actively running + actively_running = state.processing > 0 {:reply, actively_running, [], state} end @@ -116,19 +112,9 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do @impl GenStage def handle_cast(:broadcast_status, state) do - # Broadcast actual current status to dashboard - current_state = PipelineStateMachine.get_state(state.pipeline) - - # Map pipeline state to dashboard status - status = - case current_state do - :processing -> :processing - :paused -> :paused - :running -> :running - _ -> :stopped - end + # Simple: if processing videos, show processing, otherwise idle + status = if state.processing > 0, do: :processing, else: :idle - # Broadcast as service_status event with the actual state Events.broadcast_event(:service_status, %{service: :analyzer, status: status}) {:noreply, [], state} diff --git a/lib/reencodarr/core/time.ex b/lib/reencodarr/core/time.ex index 6e0eb58f..c463b8fc 100644 --- a/lib/reencodarr/core/time.ex +++ b/lib/reencodarr/core/time.ex @@ -17,17 +17,17 @@ defmodule Reencodarr.Core.Time do iex> Reencodarr.Core.Time.to_seconds(2, "hours") 7200 """ - @spec to_seconds(integer(), String.t()) :: integer() + @spec to_seconds(integer() | float(), String.t()) :: integer() def to_seconds(time, unit) do case String.downcase(unit) do - unit when unit in ["second", "seconds"] -> time - unit when unit in ["minute", "minutes"] -> time * 60 - unit when unit in ["hour", "hours"] -> time * 3600 - unit when unit in ["day", "days"] -> time * 86_400 - unit when unit in ["week", "weeks"] -> time * 604_800 - unit when unit in ["month", "months"] -> time * 2_629_746 - unit when unit in ["year", "years"] -> time * 31_556_952 - _ -> time + unit when unit in ["second", "seconds"] -> trunc(time) + unit when unit in ["minute", "minutes"] -> trunc(time * 60) + unit when unit in ["hour", "hours"] -> trunc(time * 3600) + unit when unit in ["day", "days"] -> trunc(time * 86_400) + unit when unit in ["week", "weeks"] -> trunc(time * 604_800) + unit when unit in ["month", "months"] -> trunc(time * 2_629_746) + unit when unit in ["year", "years"] -> trunc(time * 31_556_952) + _ -> trunc(time) end end diff --git a/lib/reencodarr/crf_searcher.ex b/lib/reencodarr/crf_searcher.ex index a29cafb6..b0915df5 100644 --- a/lib/reencodarr/crf_searcher.ex +++ b/lib/reencodarr/crf_searcher.ex @@ -6,6 +6,7 @@ defmodule Reencodarr.CrfSearcher do Broadway pipeline that performs VMAF quality targeting searches on analyzed videos. """ + alias Reencodarr.AbAv1.CrfSearch alias Reencodarr.CrfSearcher.Broadway alias Reencodarr.Media @@ -26,15 +27,13 @@ defmodule Reencodarr.CrfSearcher do def running?, do: Broadway.running?() @doc "Check if the CRF searcher is actively processing work" - def actively_running?, do: Broadway.running?() + def actively_running? do + # Simple: if CrfSearch GenServer is busy, we're actively running + not available?() + end @doc "Check if the CRF search GenServer is available" - def available? do - case GenServer.whereis(Reencodarr.AbAv1.CrfSearch) do - nil -> false - pid when is_pid(pid) -> Process.alive?(pid) - end - end + def available?, do: CrfSearch.available?() @doc "Get the current state of the CRF searcher pipeline" def status do diff --git a/lib/reencodarr/crf_searcher/broadway.ex b/lib/reencodarr/crf_searcher/broadway.ex index b24b73a2..20ce2ddb 100644 --- a/lib/reencodarr/crf_searcher/broadway.ex +++ b/lib/reencodarr/crf_searcher/broadway.ex @@ -17,7 +17,6 @@ defmodule Reencodarr.CrfSearcher.Broadway do alias Broadway.Message alias Reencodarr.AbAv1 alias Reencodarr.CrfSearcher.Broadway.Producer - alias Reencodarr.Media.VideoStateMachine @typedoc "Video struct for CRF search processing" @type video :: %{id: integer(), path: binary()} @@ -136,12 +135,18 @@ defmodule Reencodarr.CrfSearcher.Broadway do end @impl Broadway - def handle_batch(:default, messages, _batch_info, context) do - crf_quality = Map.get(context, :crf_quality, 95) + def handle_batch(_batcher, messages, _batch_info, _context) do + crf_quality = Application.get_env(:reencodarr, :crf_search_quality, 95) + Logger.info("[CRF Broadway] Processing batch of #{length(messages)} videos") Enum.map(messages, fn message -> + Logger.info( + "[CRF Broadway] Processing video #{message.data.id}: #{Path.basename(message.data.path)}" + ) + case process_video_crf_search(message.data, crf_quality) do :ok -> + Logger.info("[CRF Broadway] Successfully queued video #{message.data.id}") message {:error, reason} -> @@ -169,23 +174,19 @@ defmodule Reencodarr.CrfSearcher.Broadway do @spec process_video_crf_search(video(), pos_integer()) :: :ok | {:error, term()} defp process_video_crf_search(video, crf_quality) do - Logger.debug("Starting CRF search for video #{video.id}: #{video.path}") - - # Mark as crf_searching BEFORE sending to CrfSearch - # This prevents the producer from returning the same video again - case VideoStateMachine.transition_to_crf_searching(video) do - {:ok, _updated_video} -> - # Now send to CrfSearch - it's fire-and-forget via cast - :ok = AbAv1.crf_search(video, crf_quality) + Logger.info( + "[CRF Broadway] Starting CRF search for video #{video.id}: #{Path.basename(video.path)}" + ) - Logger.debug("CRF search queued successfully for video #{video.id}") + # Just send to CrfSearch - it's fire-and-forget via cast + # CrfSearch will handle state transitions when it actually starts processing + result = AbAv1.crf_search(video, crf_quality) - :ok + Logger.info( + "[CRF Broadway] CRF search queued for video #{video.id}, result: #{inspect(result)}" + ) - {:error, reason} -> - Logger.error("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}") - {:error, reason} - end + :ok rescue exception -> error_message = diff --git a/lib/reencodarr/crf_searcher/broadway/producer.ex b/lib/reencodarr/crf_searcher/broadway/producer.ex index 2bdbc48e..992e5a96 100644 --- a/lib/reencodarr/crf_searcher/broadway/producer.ex +++ b/lib/reencodarr/crf_searcher/broadway/producer.ex @@ -7,6 +7,8 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do use GenStage require Logger + alias Reencodarr.AbAv1.CrfSearch + alias Reencodarr.Dashboard.Events alias Reencodarr.Media def start_link(opts) do @@ -15,14 +17,26 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @impl GenStage def init(_opts) do - {:producer, %{}} + Logger.info("CRF Producer: Initializing producer") + # Subscribe to unified Events channel for completion notifications + Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) + Logger.info("CRF Producer: Subscribed to Events channel") + {:producer, %{demand: 0, current_video_id: nil}} end @impl GenStage - def handle_demand(_demand, state) do - # Just return 1 video per demand - CrfSearch will handle queueing - videos = Media.get_videos_for_crf_search(1) - {:noreply, videos, state} + def handle_demand(demand, state) do + Logger.debug("CRF Producer: handle_demand(#{demand}) called") + new_demand = state.demand + demand + dispatch_videos(%{state | demand: new_demand}) + end + + @impl GenStage + def handle_info({:crf_search_completed, %{video_id: video_id}}, state) do + Logger.info("CRF Producer: CRF search completed for video #{video_id}") + # Clear current video and dispatch pending demand if available + new_state = %{state | current_video_id: nil} + dispatch_videos(new_state) end @impl GenStage @@ -33,4 +47,35 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @impl GenStage def handle_call(_msg, _from, state), do: {:reply, :ok, [], state} + + # Dispatch videos based on demand and CrfSearch availability + defp dispatch_videos(%{demand: demand, current_video_id: current_id} = state) when demand > 0 do + available? = CrfSearch.available?() + + Logger.debug( + "CRF Producer: demand=#{demand}, available?=#{available?}, current_video_id=#{inspect(current_id)}" + ) + + # Don't fetch new videos if we already have one in flight + if available? and is_nil(current_id) do + videos = Media.get_videos_for_crf_search(1) + consumed = length(videos) + Logger.debug("CRF Producer: fetched #{consumed} videos") + + # Track the video ID we're sending + new_current_id = if consumed > 0, do: hd(videos).id, else: nil + + {:noreply, videos, %{state | demand: demand - consumed, current_video_id: new_current_id}} + else + # CrfSearch busy or video already in flight - keep demand for later + reason = if is_nil(current_id), do: "CrfSearch busy", else: "video #{current_id} in flight" + Logger.debug("CRF Producer: #{reason}, keeping demand") + {:noreply, [], state} + end + end + + defp dispatch_videos(state) do + Logger.debug("CRF Producer: no demand (demand=#{state.demand})") + {:noreply, [], state} + end end diff --git a/lib/reencodarr/encoder.ex b/lib/reencodarr/encoder.ex index d11db47d..7879e4cc 100644 --- a/lib/reencodarr/encoder.ex +++ b/lib/reencodarr/encoder.ex @@ -6,6 +6,7 @@ defmodule Reencodarr.Encoder do Broadway pipeline that performs the final video encoding after CRF searches. """ + alias Reencodarr.AbAv1.Encode alias Reencodarr.Encoder.Broadway.Producer alias Reencodarr.Media @@ -29,15 +30,13 @@ defmodule Reencodarr.Encoder do def running?, do: Producer.running?() @doc "Check if the encoder is actively processing work" - def actively_running?, do: Producer.actively_running?() + def actively_running? do + # Simple: encoder is NOT available means it's busy encoding + not available?() + end @doc "Check if the encode GenServer is available" - def available? do - case GenServer.whereis(Reencodarr.AbAv1.Encode) do - nil -> false - pid when is_pid(pid) -> Process.alive?(pid) - end - end + def available?, do: Encode.available?() @doc "Get the current state of the encoder pipeline" def status do diff --git a/lib/reencodarr/encoder/broadway/producer.ex b/lib/reencodarr/encoder/broadway/producer.ex index 3ccf33f3..c4671678 100644 --- a/lib/reencodarr/encoder/broadway/producer.ex +++ b/lib/reencodarr/encoder/broadway/producer.ex @@ -50,9 +50,7 @@ defmodule Reencodarr.Encoder.Broadway.Producer do @impl GenStage def init(_opts) do - # Subscribe to video state transitions for videos that finished CRF search - Phoenix.PubSub.subscribe(Reencodarr.PubSub, "video_state_transitions") - # Subscribe to dashboard events to know when encoding completes + # Subscribe ONLY to unified Events channel for all completion events Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) {:producer, @@ -68,16 +66,15 @@ defmodule Reencodarr.Encoder.Broadway.Producer do "Producer: handle_demand called - new demand: #{demand}, current demand: #{state.demand}, total: #{state.demand + demand}" ) - # Only accumulate demand if not currently processing - # Encoder is single-concurrency, so we shouldn't accept more demand while busy - current_status = PipelineStateMachine.get_state(state.pipeline) + # Always accumulate demand - we'll handle it when encoder becomes available + new_state = %{state | demand: state.demand + demand} + current_status = PipelineStateMachine.get_state(new_state.pipeline) if current_status == :processing do - # If we're already processing, ignore the demand - Logger.debug("Producer: handle_demand - currently processing, ignoring demand") - {:noreply, [], state} + # Already processing - keep demand for later + Logger.debug("Producer: handle_demand - currently processing, storing demand for later") + {:noreply, [], new_state} else - new_state = %{state | demand: state.demand + demand} Logger.debug("Producer: handle_demand - not processing, calling dispatch_if_ready") dispatch_if_ready(new_state) end @@ -112,19 +109,14 @@ defmodule Reencodarr.Encoder.Broadway.Producer do @impl GenStage def handle_cast(:broadcast_status, state) do - # Broadcast actual current status to dashboard - current_state = PipelineStateMachine.get_state(state.pipeline) - - # Map pipeline state to dashboard status + # Simple: check if encoder is actually processing status = - case current_state do - :processing -> :processing - :paused -> :paused - :running -> :running - _ -> :stopped + if Reencodarr.Encoder.available?() do + :idle + else + :processing end - # Broadcast as service_status event with the actual state Events.broadcast_event(:service_status, %{service: :encoder, status: status}) {:noreply, [], state} From bed2edc9270c447ce78ffca2e155cbb9c669e93a Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Mon, 27 Oct 2025 18:49:54 -0600 Subject: [PATCH 7/9] Fix warnings and broken tests after Broadway simplification - Remove dead code calling removed Producer.pause() in encoder/broadway.ex - Replace non-existent Factory import with Fixtures in tests - Remove obsolete state_management_test.exs (tested removed State module) - Fix broadway_producers_test.exs: * Use {:producer, state} pattern (GenStage) not {:ok, state} * Handle {:ok, video} tuple from video_fixture() * Simplify assertions to be timing-tolerant - Make dispatch_available() safer by checking Process.whereis() first - Add orphan file deletion when video not found in DB - Fix mark_invalid_videos to return {:error, path} not {:skip, message} - Remove unused Video alias from crf_search.ex - Add logging for image attachment detection/removal All tests passing (608 tests, 0 failures) No compile warnings --- lib/reencodarr/ab_av1/crf_search.ex | 1 - lib/reencodarr/ab_av1/helper.ex | 44 +- lib/reencodarr/analyzer.ex | 29 +- lib/reencodarr/analyzer/broadway.ex | 44 +- lib/reencodarr/analyzer/broadway/producer.ex | 436 ++---------------- .../analyzer/processing/pipeline.ex | 2 +- .../crf_searcher/broadway/producer.ex | 69 +-- lib/reencodarr/crf_searcher/supervisor.ex | 5 +- lib/reencodarr/encoder.ex | 26 +- lib/reencodarr/encoder/broadway.ex | 85 +--- lib/reencodarr/encoder/broadway/producer.ex | 390 ++-------------- lib/reencodarr_web/live/dashboard_live.ex | 20 +- .../broadway/state_management_test.exs | 242 ---------- test/reencodarr/broadway_producers_test.exs | 121 +++++ 14 files changed, 293 insertions(+), 1221 deletions(-) delete mode 100644 test/reencodarr/broadway/state_management_test.exs create mode 100644 test/reencodarr/broadway_producers_test.exs diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index 4e5aa9c3..13896ad5 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -18,7 +18,6 @@ defmodule Reencodarr.AbAv1.CrfSearch do alias Reencodarr.ErrorHelpers alias Reencodarr.Formatters alias Reencodarr.Media - alias Reencodarr.Media.Video alias Reencodarr.Media.VideoStateMachine alias Reencodarr.Repo diff --git a/lib/reencodarr/ab_av1/helper.ex b/lib/reencodarr/ab_av1/helper.ex index de870770..d1ff2ecd 100644 --- a/lib/reencodarr/ab_av1/helper.ex +++ b/lib/reencodarr/ab_av1/helper.ex @@ -78,10 +78,12 @@ defmodule Reencodarr.AbAv1.Helper do case check_for_image_attachments(file_path) do {:ok, false} -> # No image attachments found, use original file + Logger.debug("No image attachments found in #{file_path}") {:ok, file_path} {:ok, true} -> # Image attachments found, clean them + Logger.info("Image attachments found in #{file_path}, cleaning...") remove_image_attachments(file_path) {:error, reason} -> @@ -207,13 +209,7 @@ defmodule Reencodarr.AbAv1.Helper do defp parse_image_tracks_from_json(output) do case Jason.decode(output) do {:ok, %{"tracks" => tracks}} -> - image_uids = - tracks - |> Enum.filter(&image_video_track?/1) - |> Enum.map(& &1["properties"]["uid"]) - |> Enum.reject(&is_nil/1) - |> Enum.map(&to_string/1) - + image_uids = extract_image_track_uids(tracks) {:ok, image_uids} _ -> @@ -221,27 +217,53 @@ defmodule Reencodarr.AbAv1.Helper do end end + defp extract_image_track_uids(tracks) do + tracks + |> Enum.filter(&filter_and_log_video_track/1) + |> Enum.map(& &1["properties"]["uid"]) + |> Enum.reject(&is_nil/1) + |> Enum.map(&to_string/1) + end + + defp filter_and_log_video_track(track) do + is_image = image_video_track?(track) + + if track["type"] == "video" do + Logger.debug( + "Video track: codec=#{inspect(track["codec"])}, codec_id=#{inspect(track["properties"]["codec_id"])}, uid=#{inspect(track["properties"]["uid"])}, is_image=#{is_image}" + ) + end + + is_image + end + defp image_video_track?(track) do track["type"] == "video" and - (track["codec"] in ["MJPEG", "PNG"] or + (String.upcase(track["codec"] || "") in ["MJPEG", "PNG"] or String.contains?(track["properties"]["codec_id"] || "", ["V_MS/VFW", "PNG"])) end defp delete_image_attachments(cleaned_path) do attachment_types = ["image/jpg", "image/jpeg", "image/png"] + Logger.info("Attempting to delete image attachments from #{cleaned_path}") + result = Enum.reduce_while(attachment_types, :ok, fn mime_type, _acc -> + Logger.debug("Deleting attachments with mime-type: #{mime_type}") + case System.cmd( "mkvpropedit", - ["--delete-attachment", "mime-type:#{mime_type}", cleaned_path], + [cleaned_path, "--delete-attachment", "mime-type:#{mime_type}"], stderr_to_stdout: true ) do - {_output, 0} -> + {output, 0} -> + Logger.info("Successfully deleted #{mime_type} attachments: #{output}") {:cont, :ok} # Exit code 2 means "no attachments of this type found" - that's OK - {_output, 2} -> + {output, 2} -> + Logger.debug("No #{mime_type} attachments found: #{output}") {:cont, :ok} {output, exit_code} -> diff --git a/lib/reencodarr/analyzer.ex b/lib/reencodarr/analyzer.ex index 8311d233..7d8e94bc 100644 --- a/lib/reencodarr/analyzer.ex +++ b/lib/reencodarr/analyzer.ex @@ -1,44 +1,19 @@ defmodule Reencodarr.Analyzer do @moduledoc """ Public API for the Analyzer pipeline. - - Provides convenient functions for controlling and monitoring the analyzer - Broadway pipeline that processes video files for MediaInfo analysis. """ alias Reencodarr.Analyzer.Broadway.Producer alias Reencodarr.Media - # Control functions - - @doc "Start/resume the analyzer pipeline" - def start, do: Producer.resume() - - @doc "Pause the analyzer pipeline" - def pause, do: Producer.pause() - - @doc "Resume the analyzer pipeline (alias for start)" - def resume, do: Producer.resume() - @doc "Force dispatch of available work" def dispatch_available, do: Producer.dispatch_available() - @doc "Queue a video for analysis (typically called by sync process)" - def queue_video(video_info), do: Producer.add_video(video_info) - - # Status functions - - @doc "Check if the analyzer is running (user intent)" - def running?, do: Producer.running?() - - @doc "Check if the analyzer is actively processing work" - def actively_running?, do: Producer.actively_running?() - @doc "Get the current state of the analyzer pipeline" def status do %{ - running: running?(), - actively_running: actively_running?(), + running: true, + actively_running: false, queue_count: Media.count_videos_needing_analysis() } end diff --git a/lib/reencodarr/analyzer/broadway.ex b/lib/reencodarr/analyzer/broadway.ex index 90e30290..0e8b2d4d 100644 --- a/lib/reencodarr/analyzer/broadway.ex +++ b/lib/reencodarr/analyzer/broadway.ex @@ -91,39 +91,24 @@ defmodule Reencodarr.Analyzer.Broadway do end @doc """ - Add a video to the pipeline for processing. + Add a video to the pipeline for processing - no-op, Broadway polls automatically. """ - def process_path(video_info) do - Producer.add_video(video_info) - end + def process_path(_video_info), do: :ok @doc """ - Check if the analyzer is running (not paused). + Check if the analyzer is running (always true now). """ - def running? do - case Process.whereis(__MODULE__) do - nil -> false - _pid -> Producer.running?() - end - end + def running?, do: true @doc """ - Pause the analyzer. - - Pauses processing by updating the producer's state machine. + Pause the analyzer - no-op, always runs now. """ - def pause do - Producer.pause() - end + def pause, do: :ok @doc """ - Resume the analyzer. - - Resumes processing by updating the producer's state machine. + Resume the analyzer - no-op, always runs now. """ - def resume do - Producer.resume() - end + def resume, do: :ok @doc """ Trigger dispatch of available videos for analysis. @@ -133,7 +118,7 @@ defmodule Reencodarr.Analyzer.Broadway do end # Alias for API compatibility - def start, do: resume() + def start, do: :ok @impl Broadway def handle_message(_processor_name, message, _context) do @@ -681,7 +666,16 @@ defmodule Reencodarr.Analyzer.Broadway do :ok {:error, :not_found} -> - Logger.warning("Video not found in database, cannot mark as failed: #{path}") + Logger.warning("Video not found in database, deleting orphan file: #{path}") + + case File.rm(path) do + :ok -> + Logger.info("Successfully deleted orphan file: #{path}") + + {:error, reason} -> + Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}") + end + :ok end end diff --git a/lib/reencodarr/analyzer/broadway/producer.ex b/lib/reencodarr/analyzer/broadway/producer.ex index 0b85c7ee..b75ed985 100644 --- a/lib/reencodarr/analyzer/broadway/producer.ex +++ b/lib/reencodarr/analyzer/broadway/producer.ex @@ -1,442 +1,62 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do @moduledoc """ - Broadway producer for analyzer operations. - - This producer dispatches videos for analysis in batches of up to 5, - managing demand and batch processing for optimal mediainfo usage. + Simplest possible Broadway producer for analyzer. + Just fetch videos and return them - no pause/resume, no state machine. """ use GenStage require Logger - alias Reencodarr.Dashboard.Events alias Reencodarr.Media - alias Reencodarr.PipelineStateMachine - - @broadway_name Reencodarr.Analyzer.Broadway - - defmodule State do - @moduledoc false - defstruct [ - :demand, - :pipeline, - :paused, - :processing, - :pending_videos - ] - - def update(state, updates) when is_struct(state, __MODULE__) do - struct(state, updates) - end - - def update(state, updates) when is_map(state) do - # Handle case where state is a plain map (e.g., after crash/restart) - # Convert it to a proper State struct first - state_struct = struct(__MODULE__, state) - struct(state_struct, updates) - end - end def start_link(opts) do GenStage.start_link(__MODULE__, opts, name: __MODULE__) end - # Public API for external control - def pause, do: send_to_producer(:pause) - def resume, do: send_to_producer(:resume) - def dispatch_available, do: send_to_producer(:dispatch_available) - def add_video(video_info), do: send_to_producer({:add_video, video_info}) - - # Alias for API compatibility - def start, do: resume() - - def running? do - case find_producer_process() do - nil -> - false - - producer_pid -> - GenStage.call(producer_pid, :running?, 1000) - end - end - - def actively_running? do - case find_producer_process() do - nil -> - false - - producer_pid -> - GenStage.call(producer_pid, :actively_running?, 1000) + def dispatch_available do + case Process.whereis(__MODULE__) do + nil -> {:error, :producer_not_found} + pid -> send(pid, :poll) end end @impl GenStage def init(_opts) do - # Subscribe ONLY to unified Events channel for all completion events - Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) - - # Send a delayed message to trigger initial dispatch - Process.send_after(self(), :initial_dispatch, 1000) - - {:producer, - %State{ - demand: 0, - pipeline: PipelineStateMachine.new(:analyzer) - }} - end - - @impl GenStage - def handle_call(:running?, _from, state) do - # Button should reflect user intent - use centralized state machine - running = PipelineStateMachine.running?(state.pipeline) - {:reply, running, [], state} - end - - @impl GenStage - def handle_call(:actively_running?, _from, state) do - # Simple: if we're processing videos, we're actively running - actively_running = state.processing > 0 - {:reply, actively_running, [], state} - end - - @impl GenStage - def handle_call(:get_state, _from, state) do - {:reply, state, [], state} - end - - @impl GenStage - def handle_cast({:status_request, requester_pid}, state) do - current_state = PipelineStateMachine.get_state(state.pipeline) - send(requester_pid, {:status_response, :analyzer, current_state}) - {:noreply, [], state} - end - - @impl GenStage - def handle_cast(:broadcast_status, state) do - # Simple: if processing videos, show processing, otherwise idle - status = if state.processing > 0, do: :processing, else: :idle - - Events.broadcast_event(:service_status, %{service: :analyzer, status: status}) - - {:noreply, [], state} - end - - @impl GenStage - def handle_cast(:pause, state) do - new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) - {:noreply, [], new_state} - end - - @impl GenStage - def handle_cast(:resume, state) do - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.resume/1)) - end - - @impl GenStage - def handle_cast({:add_video, video_info}, state) do - Logger.info("Adding video to Broadway queue: #{video_info.path}") - Logger.debug("video info details", video_info: video_info) - - current_state = PipelineStateMachine.get_state(state.pipeline) - - Logger.debug("Current state - demand: #{state.demand}, status: #{current_state}") - - # No manual queue management - just trigger dispatch to check database - # The video should already be in the database with :needs_analysis state - dispatch_if_ready(state) - end - - @impl GenStage - def handle_cast(:dispatch_available, state) do - # Handle work availability and determine next steps - case PipelineStateMachine.get_state(state.pipeline) do - :pausing -> - {:noreply, [], - Map.update!(state, :pipeline, &PipelineStateMachine.transition_to(&1, :paused))} - - _ -> - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.work_available/1)) - end + # Poll every 2 seconds to check for new work + schedule_poll() + {:producer, %{}} end @impl GenStage def handle_demand(demand, state) when demand > 0 do - Logger.debug("Broadway producer received demand for #{demand} items") - new_state = State.update(state, demand: state.demand + demand) - dispatch_if_ready(new_state) + # Fetch up to 5 videos for batch processing + videos = Media.get_videos_needing_analysis(min(demand, 5)) + Logger.debug("Analyzer: handle_demand(#{demand}) -> #{length(videos)} videos") + {:noreply, videos, state} end @impl GenStage - def handle_info({:video_upserted, _video}, state) do - # New video was upserted - force dispatch to wake up idle Broadway - force_dispatch_if_running(state) - end - - @impl GenStage - def handle_info({:video_state_changed, video, :needs_analysis}, state) do - # Video needs analysis - if analyzer is running, force dispatch even without demand - Logger.debug("[Analyzer Producer] Received video needing analysis: #{video.path}") - force_dispatch_if_running(state) - end + def handle_demand(_demand, state), do: {:noreply, [], state} @impl GenStage - def handle_info({:analysis_completed, _path, _result}, state) do - # Individual analysis completed - this is handled by batch completion now - {:noreply, [], state} - end - - @impl GenStage - def handle_info({:batch_analysis_completed, _data}, state) do - # Batch analysis completed - use state machine to determine next state - Logger.debug("Producer: Received batch analysis completion notification") - - has_more_work = Media.count_videos_needing_analysis() > 0 - - new_state = - Map.update!(state, :pipeline, &PipelineStateMachine.work_completed(&1, has_more_work)) + def handle_info(:poll, state) do + schedule_poll() + # Check if there's work and manually ask Broadway to pull + case Media.count_videos_needing_analysis() do + 0 -> + {:noreply, [], state} - if has_more_work and PipelineStateMachine.available_for_work?(new_state.pipeline) do - dispatch_if_ready(new_state) - else - {:noreply, [], new_state} + _count -> + # There's work available - return one video to wake up Broadway + videos = Media.get_videos_needing_analysis(1) + Logger.debug("Analyzer: poll wakeup -> #{length(videos)} videos") + {:noreply, videos, state} end end @impl GenStage - def handle_info(:initial_dispatch, state) do - # Trigger initial dispatch after startup to check for videos needing analysis - Logger.debug("Producer: Initial dispatch triggered") - dispatch_if_ready(state) - end - - @impl GenStage - def handle_info(_msg, state) do - {:noreply, [], state} - end - - # Private functions - - defp send_to_producer(message) do - case find_producer_process() do - nil -> {:error, :producer_not_found} - producer_pid -> GenStage.cast(producer_pid, message) - end - end - - defp find_producer_process do - producer_supervisor_name = :"#{@broadway_name}.Broadway.ProducerSupervisor" - - with pid when is_pid(pid) <- Process.whereis(producer_supervisor_name), - children <- Supervisor.which_children(pid), - producer_pid when is_pid(producer_pid) <- find_actual_producer(children) do - producer_pid - else - _ -> nil - end - end - - defp find_actual_producer(children) do - Enum.find_value(children, &find_running_producer/1) - end - - defp find_running_producer({_id, pid, _type, _modules}) when is_pid(pid) do - if Process.alive?(pid) do - GenStage.call(pid, :running?, 1000) - pid - else - nil - end - end - - defp find_running_producer(_), do: nil - - defp dispatch_if_ready(state) do - current_state = PipelineStateMachine.get_state(state.pipeline) - - Logger.debug("dispatch_if_ready called - demand: #{state.demand}, status: #{current_state}") - - case can_dispatch?(state) do - {:auto_start, state} -> handle_auto_start(state) - {:resume_idle, state} -> handle_resume_from_idle(state) - {:dispatch, state} -> dispatch_videos(state) - {:no_dispatch, state} -> handle_no_dispatch_conditions(state) - end - end - - defp can_dispatch?(state) do - cond do - ready_for_auto_start?(state) -> {:auto_start, state} - ready_for_resume_from_idle?(state) -> {:resume_idle, state} - ready_for_dispatch?(state) -> {:dispatch, state} - true -> {:no_dispatch, state} - end - end - - defp ready_for_auto_start?(state) do - PipelineStateMachine.get_state(state.pipeline) == :paused and state.demand > 0 and - Media.count_videos_needing_analysis() > 0 - end - - defp ready_for_resume_from_idle?(state) do - PipelineStateMachine.get_state(state.pipeline) == :idle and state.demand > 0 and - Media.count_videos_needing_analysis() > 0 - end - - defp ready_for_dispatch?(state) do - PipelineStateMachine.get_state(state.pipeline) == :running and state.demand > 0 - end - - defp handle_auto_start(state) do - Logger.info("Auto-starting analyzer - videos available for processing") - - # Send to Dashboard using Events system - Events.broadcast_event(:analyzer_started, %{}) - # Start with minimal progress to indicate activity - Events.broadcast_event(:analyzer_progress, %{ - count: 0, - total: 1, - percent: 0 - }) - - # Only transition if not already running - new_pipeline = - if PipelineStateMachine.get_state(state.pipeline) != :running do - PipelineStateMachine.transition_to(state.pipeline, :running) - else - state.pipeline - end - - new_state = %{state | pipeline: new_pipeline} - dispatch_videos(new_state) - end - - defp handle_resume_from_idle(state) do - Logger.info("Analyzer resuming from idle - videos available for processing") - - # Send to Dashboard V2 using Events system - # Start with minimal progress to indicate activity - Events.broadcast_event(:analyzer_progress, %{ - count: 0, - total: 1, - percent: 0 - }) - - # Only transition if not already running - new_pipeline = - if PipelineStateMachine.get_state(state.pipeline) != :running do - PipelineStateMachine.transition_to(state.pipeline, :running) - else - state.pipeline - end - - new_state = %{state | pipeline: new_pipeline} - dispatch_videos(new_state) - end - - defp handle_no_dispatch_conditions(state) do - Logger.debug("Conditions not met for dispatch - demand: #{state.demand}") - - # If analyzer is running but has no work to do, set to idle instead of paused - if PipelineStateMachine.get_state(state.pipeline) == :running and state.demand == 0 and - Media.count_videos_needing_analysis() == 0 do - handle_idle_transition(state) - else - {:noreply, [], state} - end - end - - defp handle_idle_transition(state) do - # Check if there are any videos needing analysis in the database - database_queue_count = Reencodarr.Media.count_videos_needing_analysis() - - if database_queue_count == 0 do - Logger.debug("Analyzer has no work - setting to idle") - # Set to idle - ready to work but no current tasks - new_state = %{state | pipeline: PipelineStateMachine.transition_to(state.pipeline, :idle)} - {:noreply, [], new_state} - else - Logger.debug( - "Analyzer has #{database_queue_count} videos to analyze but no demand - staying running" - ) - - {:noreply, [], state} - end - end - - defp dispatch_videos(state) do - # Get videos from the database up to demand - videos = Media.get_videos_needing_analysis(state.demand) - - Logger.debug("Dispatching videos - demand: #{state.demand}, found: #{length(videos)}") - - if length(videos) > 0 do - Logger.debug("Videos being dispatched: #{inspect(Enum.map(videos, & &1.path))}") - - debug_video_states(videos) - end - - case videos do - [] -> - # No videos available - go to idle if currently running - if PipelineStateMachine.get_state(state.pipeline) == :running do - Logger.info("Analyzer going idle - no videos to process") - - new_state = %{ - state - | pipeline: PipelineStateMachine.transition_to(state.pipeline, :idle) - } - - {:noreply, [], new_state} - else - Logger.debug("No videos available for dispatch, keeping demand: #{state.demand}") - {:noreply, [], state} - end - - videos -> - Logger.debug("Broadway producer dispatching #{length(videos)} videos for analysis") - Logger.debug("All videos being dispatched: #{inspect(Enum.map(videos, & &1.path))}") - new_demand = state.demand - length(videos) - new_state = State.update(state, demand: new_demand) - - {:noreply, videos, new_state} - end - end - - # Debug helper function to check video states - defp debug_video_states(videos) do - Enum.each(videos, fn video_info -> - case Media.get_video_by_path(video_info.path) do - {:error, :not_found} -> - Logger.debug("video not found in database", path: video_info.path) - - {:ok, video} -> - Logger.debug("video state check", path: video_info.path, state: video.state) - end - end) - end - - # Helper function to force dispatch when analyzer is running - defp force_dispatch_if_running(%State{pipeline: pipeline, demand: 0} = state) do - if PipelineStateMachine.get_state(pipeline) == :running do - videos = Media.get_videos_needing_analysis(1) - - if length(videos) > 0 do - Logger.debug( - "[Analyzer Producer] Force dispatching video to wake up idle Broadway pipeline" - ) - - # Temporarily add demand to force dispatch, then call dispatch_if_ready - temp_state = State.update(state, demand: 1) - dispatch_if_ready(temp_state) - else - {:noreply, [], state} - end - else - {:noreply, [], state} - end - end + def handle_info(_msg, state), do: {:noreply, [], state} - defp force_dispatch_if_running(state) do - # Already has demand or not running, use normal dispatch - dispatch_if_ready(state) + defp schedule_poll do + Process.send_after(self(), :poll, 2000) end end diff --git a/lib/reencodarr/analyzer/processing/pipeline.ex b/lib/reencodarr/analyzer/processing/pipeline.ex index 7759614f..a20398ba 100644 --- a/lib/reencodarr/analyzer/processing/pipeline.ex +++ b/lib/reencodarr/analyzer/processing/pipeline.ex @@ -203,7 +203,7 @@ defmodule Reencodarr.Analyzer.Processing.Pipeline do defp mark_invalid_videos(invalid_videos) do Enum.map(invalid_videos, fn video_info -> - {:skip, "file validation failed: #{video_info.path}"} + {:error, video_info.path} end) end diff --git a/lib/reencodarr/crf_searcher/broadway/producer.ex b/lib/reencodarr/crf_searcher/broadway/producer.ex index 992e5a96..844e381f 100644 --- a/lib/reencodarr/crf_searcher/broadway/producer.ex +++ b/lib/reencodarr/crf_searcher/broadway/producer.ex @@ -1,14 +1,13 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @moduledoc """ - Broadway producer for CRF search - just returns videos one at a time. - CrfSearch GenServer handles queueing via its mailbox. + Simplest possible Broadway producer for CRF search. + When demand arrives, check if CrfSearch is available and return 1 video if so. """ use GenStage require Logger alias Reencodarr.AbAv1.CrfSearch - alias Reencodarr.Dashboard.Events alias Reencodarr.Media def start_link(opts) do @@ -17,65 +16,43 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @impl GenStage def init(_opts) do - Logger.info("CRF Producer: Initializing producer") - # Subscribe to unified Events channel for completion notifications - Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) - Logger.info("CRF Producer: Subscribed to Events channel") - {:producer, %{demand: 0, current_video_id: nil}} + # Poll every 2 seconds to check for new work + schedule_poll() + {:producer, %{}} end @impl GenStage - def handle_demand(demand, state) do - Logger.debug("CRF Producer: handle_demand(#{demand}) called") - new_demand = state.demand + demand - dispatch_videos(%{state | demand: new_demand}) + def handle_demand(_demand, state) do + dispatch(state) end @impl GenStage - def handle_info({:crf_search_completed, %{video_id: video_id}}, state) do - Logger.info("CRF Producer: CRF search completed for video #{video_id}") - # Clear current video and dispatch pending demand if available - new_state = %{state | current_video_id: nil} - dispatch_videos(new_state) + def handle_info(:poll, state) do + schedule_poll() + # Check if there's work and CrfSearch is available, wake up Broadway if so + if CrfSearch.available?() do + case Media.get_videos_for_crf_search(1) do + [] -> {:noreply, [], state} + videos -> {:noreply, videos, state} + end + else + {:noreply, [], state} + end end @impl GenStage def handle_info(_msg, state), do: {:noreply, [], state} - @impl GenStage - def handle_cast(_msg, state), do: {:noreply, [], state} - - @impl GenStage - def handle_call(_msg, _from, state), do: {:reply, :ok, [], state} - - # Dispatch videos based on demand and CrfSearch availability - defp dispatch_videos(%{demand: demand, current_video_id: current_id} = state) when demand > 0 do - available? = CrfSearch.available?() - - Logger.debug( - "CRF Producer: demand=#{demand}, available?=#{available?}, current_video_id=#{inspect(current_id)}" - ) - - # Don't fetch new videos if we already have one in flight - if available? and is_nil(current_id) do + defp dispatch(state) do + if CrfSearch.available?() do videos = Media.get_videos_for_crf_search(1) - consumed = length(videos) - Logger.debug("CRF Producer: fetched #{consumed} videos") - - # Track the video ID we're sending - new_current_id = if consumed > 0, do: hd(videos).id, else: nil - - {:noreply, videos, %{state | demand: demand - consumed, current_video_id: new_current_id}} + {:noreply, videos, state} else - # CrfSearch busy or video already in flight - keep demand for later - reason = if is_nil(current_id), do: "CrfSearch busy", else: "video #{current_id} in flight" - Logger.debug("CRF Producer: #{reason}, keeping demand") {:noreply, [], state} end end - defp dispatch_videos(state) do - Logger.debug("CRF Producer: no demand (demand=#{state.demand})") - {:noreply, [], state} + defp schedule_poll do + Process.send_after(self(), :poll, 2000) end end diff --git a/lib/reencodarr/crf_searcher/supervisor.ex b/lib/reencodarr/crf_searcher/supervisor.ex index a21389c0..495571eb 100644 --- a/lib/reencodarr/crf_searcher/supervisor.ex +++ b/lib/reencodarr/crf_searcher/supervisor.ex @@ -9,10 +9,9 @@ defmodule Reencodarr.CrfSearcher.Supervisor do @impl true def init(:ok) do - # Only start the CrfSearch GenServer automatically - # The Broadway pipeline will be started/stopped dynamically via pause/resume children = [ - {Reencodarr.AbAv1.CrfSearch, []} + {Reencodarr.AbAv1.CrfSearch, []}, + {Reencodarr.CrfSearcher.Broadway, []} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/lib/reencodarr/encoder.ex b/lib/reencodarr/encoder.ex index 7879e4cc..988627fa 100644 --- a/lib/reencodarr/encoder.ex +++ b/lib/reencodarr/encoder.ex @@ -1,39 +1,17 @@ defmodule Reencodarr.Encoder do @moduledoc """ Public API for the Encoder pipeline. - - Provides convenient functions for controlling and monitoring the Encoder - Broadway pipeline that performs the final video encoding after CRF searches. """ alias Reencodarr.AbAv1.Encode alias Reencodarr.Encoder.Broadway.Producer alias Reencodarr.Media - # Control functions - - @doc "Start/resume the encoder pipeline" - def start, do: Producer.resume() - - @doc "Pause the encoder pipeline" - def pause, do: Producer.pause() - - @doc "Resume the encoder pipeline (alias for start)" - def resume, do: Producer.resume() - @doc "Force dispatch of available work" def dispatch_available, do: Producer.dispatch_available() - # Status functions - - @doc "Check if the encoder is running (user intent)" - def running?, do: Producer.running?() - @doc "Check if the encoder is actively processing work" - def actively_running? do - # Simple: encoder is NOT available means it's busy encoding - not available?() - end + def actively_running?, do: not available?() @doc "Check if the encode GenServer is available" def available?, do: Encode.available?() @@ -41,7 +19,7 @@ defmodule Reencodarr.Encoder do @doc "Get the current state of the encoder pipeline" def status do %{ - running: running?(), + running: true, actively_running: actively_running?(), available: available?(), queue_count: Media.encoding_queue_count() diff --git a/lib/reencodarr/encoder/broadway.ex b/lib/reencodarr/encoder/broadway.ex index 2c6c48c1..cf59dd39 100644 --- a/lib/reencodarr/encoder/broadway.ex +++ b/lib/reencodarr/encoder/broadway.ex @@ -92,71 +92,36 @@ defmodule Reencodarr.Encoder.Broadway do ## Parameters * `vmaf` - VMAF struct containing id and video data - ## Examples - iex> vmaf = %{id: 1, video: %{path: "/path/to/video.mp4"}} - iex> Reencodarr.Encoder.Broadway.process_vmaf(vmaf) - :ok + @doc \""" + Process a VMAF - now handled automatically by Broadway polling. + This function is a no-op for backwards compatibility. """ - @spec process_vmaf(vmaf()) :: :ok | {:error, term()} - def process_vmaf(vmaf) do - case Producer.add_vmaf(vmaf) do - :ok -> :ok - {:error, reason} -> {:error, reason} - end - end + @spec process_vmaf(vmaf()) :: :ok + def process_vmaf(_vmaf), do: :ok @doc """ - Check if the encoder pipeline is running (not paused). - - ## Examples - iex> Reencodarr.Encoder.Broadway.running?() - true + Check if the encoder pipeline is running (always true now). """ @spec running?() :: boolean() - def running? do - with pid when is_pid(pid) <- Process.whereis(__MODULE__), - true <- Process.alive?(pid) do - Producer.running?() - else - _ -> false - end - end + def running?, do: true @doc """ - Pause the encoder pipeline. - - Pauses processing by updating the producer's state machine. - - ## Examples - iex> Reencodarr.Encoder.Broadway.pause() - :ok + Pause the encoder pipeline - no-op, pipelines always run now. """ - @spec pause() :: :ok | {:error, term()} - def pause do - Producer.pause() - end + @spec pause() :: :ok + def pause, do: :ok @doc """ - Resume the encoder pipeline. - - Resumes processing by updating the producer's state machine. - - ## Examples - iex> Reencodarr.Encoder.Broadway.resume() - :ok + Resume the encoder pipeline - no-op, pipelines always run now. """ - @spec resume() :: :ok | {:error, term()} - def resume do - Producer.resume() - end + @spec resume() :: :ok + def resume, do: :ok @doc """ - Start the encoder pipeline. - - Alias for `resume/0` to maintain API compatibility. + Start the encoder pipeline - alias for resume, no-op now. """ - @spec start() :: :ok | {:error, term()} - def start, do: resume() + @spec start() :: :ok + def start, do: :ok # Broadway callbacks @@ -227,16 +192,13 @@ defmodule Reencodarr.Encoder.Broadway do case classify_failure(:port_error) do {:pause, reason} -> Logger.error("Broadway: Critical failure for VMAF #{vmaf.id}: #{reason}") - Logger.error("Broadway: Pausing pipeline due to critical system issue") + Logger.error("Broadway: Critical system issue, but continuing (pipelines always run)") Logger.error("Broadway: Video path: #{vmaf.video.path}") # Notify about the failure notify_encoding_failure(vmaf.video, :port_error) - # Pause the pipeline - Producer.pause() - - # Return :ok to Broadway since we're handling the pause manually + # Return :ok to Broadway - we continue processing :ok end end @@ -303,7 +265,10 @@ defmodule Reencodarr.Encoder.Broadway do "Broadway: Critical failure for VMAF #{vmaf.id}: #{reason} (exit code: #{exit_code})" ) - Logger.error("Broadway: Pausing pipeline due to critical system issue - #{reason}") + Logger.error( + "Broadway: Critical system issue - #{reason}, but continuing (pipelines always run)" + ) + Logger.error("Broadway: Video path: #{vmaf.video.path}") # Build enhanced context for failure tracking @@ -317,11 +282,7 @@ defmodule Reencodarr.Encoder.Broadway do # Notify about the failure with enhanced context notify_encoding_failure(vmaf.video, exit_code, enhanced_context) - # Pause the pipeline - Logger.error("Broadway: PAUSING ENCODER - Critical failure with exit code #{exit_code}") - Producer.pause() - - # Still return :ok to Broadway since we're handling the pause manually + # Return :ok to Broadway - we continue processing :ok end diff --git a/lib/reencodarr/encoder/broadway/producer.ex b/lib/reencodarr/encoder/broadway/producer.ex index c4671678..40e4ddad 100644 --- a/lib/reencodarr/encoder/broadway/producer.ex +++ b/lib/reencodarr/encoder/broadway/producer.ex @@ -1,395 +1,69 @@ defmodule Reencodarr.Encoder.Broadway.Producer do @moduledoc """ - Broadway producer for encoding operations. - - This producer dispatches VMAFs for encoding only when the encoding - GenServer is available, preventing duplicate work and resource waste. + Simplest possible Broadway producer for encoding. + When demand arrives, check if Encode is available and return 1 VMAF if so. """ use GenStage require Logger - alias Reencodarr.Dashboard.Events + alias Reencodarr.AbAv1.Encode alias Reencodarr.Media - alias Reencodarr.PipelineStateMachine - - @broadway_name Reencodarr.Encoder.Broadway def start_link(opts) do GenStage.start_link(__MODULE__, opts, name: __MODULE__) end - # Public API for external control - def pause, do: send_to_producer(:pause) - def resume, do: send_to_producer(:resume) - def dispatch_available, do: send_to_producer(:dispatch_available) - def add_vmaf(vmaf), do: send_to_producer({:add_vmaf, vmaf}) - - # Alias for API compatibility - def start, do: resume() - - def running? do - case find_producer_process() do - nil -> - false - - producer_pid -> - GenStage.call(producer_pid, :running?, 1000) - end - end - - # Check if actively processing (for telemetry/progress updates) - def actively_running? do - case find_producer_process() do - nil -> - false - - producer_pid -> - GenStage.call(producer_pid, :actively_running?, 1000) + def dispatch_available do + case Process.whereis(__MODULE__) do + nil -> {:error, :producer_not_found} + pid -> send(pid, :poll) end end @impl GenStage def init(_opts) do - # Subscribe ONLY to unified Events channel for all completion events - Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) - - {:producer, - %{ - demand: 0, - pipeline: PipelineStateMachine.new(:encoder) - }} - end - - @impl GenStage - def handle_demand(demand, state) when demand > 0 do - Logger.debug( - "Producer: handle_demand called - new demand: #{demand}, current demand: #{state.demand}, total: #{state.demand + demand}" - ) - - # Always accumulate demand - we'll handle it when encoder becomes available - new_state = %{state | demand: state.demand + demand} - current_status = PipelineStateMachine.get_state(new_state.pipeline) - - if current_status == :processing do - # Already processing - keep demand for later - Logger.debug("Producer: handle_demand - currently processing, storing demand for later") - {:noreply, [], new_state} - else - Logger.debug("Producer: handle_demand - not processing, calling dispatch_if_ready") - dispatch_if_ready(new_state) - end - end - - @impl GenStage - def handle_call(:running?, _from, state) do - # Button should reflect user intent - not running if paused or pausing - current_status = PipelineStateMachine.get_state(state.pipeline) - running = PipelineStateMachine.running?(current_status) - {:reply, running, [], state} - end - - @impl GenStage - def handle_call(:actively_running?, _from, state) do - # For telemetry/progress - actively running if processing or pausing - # This allows progress to continue during pausing state - current_status = PipelineStateMachine.get_state(state.pipeline) - - actively_running = - PipelineStateMachine.actively_working?(current_status) or current_status == :pausing - - {:reply, actively_running, [], state} - end - - @impl GenStage - def handle_cast({:status_request, requester_pid}, state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - send(requester_pid, {:status_response, :encoder, current_status}) - {:noreply, [], state} - end - - @impl GenStage - def handle_cast(:broadcast_status, state) do - # Simple: check if encoder is actually processing - status = - if Reencodarr.Encoder.available?() do - :idle - else - :processing - end - - Events.broadcast_event(:service_status, %{service: :encoder, status: status}) - - {:noreply, [], state} - end - - @impl GenStage - def handle_cast(:pause, state) do - new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) - {:noreply, [], new_state} - end - - @impl GenStage - def handle_cast(:resume, state) do - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.resume/1)) - end - - @impl GenStage - def handle_cast({:add_vmaf, vmaf}, state) do - Logger.info("Adding VMAF to encoder: #{vmaf.id}") - # No manual queue management - just trigger dispatch to check database - # The VMAF should already be in the database with chosen=true state - dispatch_if_ready(state) - end - - @impl GenStage - def handle_cast(:dispatch_available, state) do - case PipelineStateMachine.get_state(state.pipeline) do - :pausing -> - {:noreply, [], - Map.update!(state, :pipeline, &PipelineStateMachine.transition_to(&1, :paused))} - - _ -> - dispatch_if_ready(Map.update!(state, :pipeline, &PipelineStateMachine.work_available/1)) - end - end - - @impl GenStage - def handle_info({:video_state_changed, video, :crf_searched}, state) do - # Video finished CRF search - if encoder is running, force dispatch even without demand - Logger.debug("[Encoder Producer] Received CRF searched video: #{video.path}") - force_dispatch_if_running(state) - end - - @impl GenStage - def handle_info({:video_state_changed, _video, _other_state}, state) do - # Ignore other state transitions - encoder only cares about :crf_searched - {:noreply, [], state} - end - - @impl GenStage - def handle_info({:vmaf_upserted, _vmaf}, state) do - # VMAF was created/updated - check if it's chosen and ready for encoding - dispatch_if_ready(state) - end - - @impl GenStage - def handle_info({:crf_search_vmaf_result, _data}, state) do - # CRF search results don't affect encoder - ignore - {:noreply, [], state} + # Poll every 2 seconds to check for new work + schedule_poll() + {:producer, %{}} end @impl GenStage - def handle_info({:encoding_completed, event_data}, state) do - # Encoding completed (success or failure), transition back to running - Logger.info( - "[Encoder Producer] *** RECEIVED ENCODING COMPLETION *** - VMAF: #{event_data.vmaf_id}, result: #{inspect(event_data.result)}, event: #{inspect(event_data)}" - ) - - current_status = PipelineStateMachine.get_state(state.pipeline) - - Logger.debug( - "[Encoder Producer] Current state before transition - status: #{current_status}, demand: #{state.demand}" - ) - - case current_status do - :processing -> - # Work completed while running - check for more work - updated_pipeline = - PipelineStateMachine.work_completed(state.pipeline, Media.encoding_queue_count() > 0) - - new_state = %{state | pipeline: updated_pipeline} - new_status = PipelineStateMachine.get_state(updated_pipeline) - - Logger.debug( - "[Encoder Producer] State after transition - status: #{new_status}, demand: #{new_state.demand}" - ) - - dispatch_if_ready(new_state) - - :pausing -> - # Work completed while pausing - transition to paused and stop - updated_pipeline = PipelineStateMachine.work_completed(state.pipeline, false) - new_state = %{state | pipeline: updated_pipeline} - - Logger.debug("[Encoder Producer] Pausing complete, now paused") - {:noreply, [], new_state} - - _ -> - # Already paused or other state - just acknowledge completion - Logger.debug("[Encoder Producer] Encoding completed in state #{current_status}, ignoring") - {:noreply, [], state} - end + def handle_demand(_demand, state) do + dispatch(state) end @impl GenStage - def handle_info(_msg, state) do - {:noreply, [], state} - end - - # Private functions - - defp send_to_producer(message) do - case find_producer_process() do - nil -> {:error, :producer_not_found} - producer_pid -> GenStage.cast(producer_pid, message) - end - end - - defp find_producer_process do - producer_supervisor_name = :"#{@broadway_name}.Broadway.ProducerSupervisor" - - with pid when is_pid(pid) <- Process.whereis(producer_supervisor_name), - children <- Supervisor.which_children(pid), - producer_pid when is_pid(producer_pid) <- find_actual_producer(children) do - producer_pid - else - _ -> nil - end - end - - defp find_actual_producer(children) do - Enum.find_value(children, fn {_id, pid, _type, _modules} -> - if is_pid(pid) and Process.alive?(pid) do - GenStage.call(pid, :running?, 1000) - pid + def handle_info(:poll, state) do + schedule_poll() + # Check if there's work and Encode is available, wake up Broadway if so + if Encode.available?() do + case Media.get_next_for_encoding(1) do + %Reencodarr.Media.Vmaf{} = vmaf -> {:noreply, [vmaf], state} + [%Reencodarr.Media.Vmaf{} = vmaf] -> {:noreply, [vmaf], state} + _ -> {:noreply, [], state} end - end) - end - - defp dispatch_if_ready(state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - - Logger.debug( - "Producer: dispatch_if_ready called - status: #{current_status}, demand: #{state.demand}" - ) - - if should_dispatch?(state) and state.demand > 0 do - Logger.debug("Producer: dispatch_if_ready - conditions met, dispatching VMAFs") - dispatch_vmafs(state) else - Logger.debug( - "Producer: dispatch_if_ready - conditions NOT met, not dispatching (should_dispatch: #{should_dispatch?(state)}, demand: #{state.demand})" - ) - - handle_no_dispatch_encoder(state) + {:noreply, [], state} end end - defp handle_no_dispatch_encoder(state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - - if PipelineStateMachine.available_for_work?(current_status) do - case get_next_vmaf_preview() do - nil -> - # No videos to process - transition to idle - updated_pipeline = PipelineStateMachine.transition_to(state.pipeline, :idle) - new_state = %{state | pipeline: updated_pipeline} - {:noreply, [], new_state} + @impl GenStage + def handle_info(_msg, state), do: {:noreply, [], state} - _vmaf -> - # VMAFs available but no demand or encoder service unavailable - {:noreply, [], state} + defp dispatch(state) do + if Encode.available?() do + case Media.get_next_for_encoding(1) do + %Reencodarr.Media.Vmaf{} = vmaf -> {:noreply, [vmaf], state} + [%Reencodarr.Media.Vmaf{} = vmaf] -> {:noreply, [vmaf], state} + [] -> {:noreply, [], state} + nil -> {:noreply, [], state} end else {:noreply, [], state} end end - defp should_dispatch?(state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - status_check = PipelineStateMachine.available_for_work?(current_status) - availability_check = encoding_available?() - result = status_check and availability_check - - Logger.debug( - "[Encoder Producer] should_dispatch? - status: #{current_status}, status_check: #{status_check}, availability_check: #{availability_check}, result: #{result}" - ) - - result - end - - defp encoding_available? do - case GenServer.whereis(Reencodarr.AbAv1.Encode) do - nil -> - Logger.debug("Producer: encoding_available? - Encode GenServer not found") - false - - pid -> - # Simplified - just check if process is alive, let work fail gracefully if busy - alive = Process.alive?(pid) - Logger.debug("Producer: encoding_available? - Encode GenServer alive: #{alive}") - alive - end - end - - defp dispatch_vmafs(state) do - # Only transition to processing if not already processing - current_status = PipelineStateMachine.get_state(state.pipeline) - - updated_pipeline = - if current_status != :processing do - Logger.info("[Encoder Producer] Transitioning to processing state from #{current_status}") - PipelineStateMachine.start_processing(state.pipeline) - else - Logger.info("[Encoder Producer] Already in processing state, skipping state transition") - state.pipeline - end - - updated_state = %{state | pipeline: updated_pipeline} - - # Get VMAF directly from database - case Media.get_next_for_encoding(1) do - # Handle case where a single VMAF is returned - %Reencodarr.Media.Vmaf{} = vmaf -> - Logger.debug( - "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, decrementing demand from #{state.demand} to #{state.demand - 1}" - ) - - final_state = %{updated_state | demand: state.demand - 1} - {:noreply, [vmaf], final_state} - - # Handle case where a list is returned - [vmaf | _] -> - Logger.debug( - "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, decrementing demand from #{state.demand} to #{state.demand - 1}" - ) - - final_state = %{updated_state | demand: state.demand - 1} - {:noreply, [vmaf], final_state} - - # Handle case where empty list or nil is returned - _ -> - # No VMAF available, transition to appropriate state - final_pipeline = PipelineStateMachine.transition_to(updated_state.pipeline, :idle) - final_state = %{updated_state | pipeline: final_pipeline} - {:noreply, [], final_state} - end - end - - # Helper to check if VMAFs are available without modifying state - defp get_next_vmaf_preview do - case Media.get_next_for_encoding(1) do - # Handle case where a single VMAF is returned - %Reencodarr.Media.Vmaf{} = vmaf -> vmaf - # Handle case where a list is returned - [vmaf | _] -> vmaf - # Handle case where an empty list is returned - [] -> nil - # Handle case where nil is returned - nil -> nil - end - end - - # Helper function to force dispatch when encoder is running - defp force_dispatch_if_running(state) do - current_status = PipelineStateMachine.get_state(state.pipeline) - - if PipelineStateMachine.available_for_work?(current_status) and get_next_vmaf_preview() != nil do - Logger.debug("[Encoder Producer] Force dispatching VMAF to wake up idle Broadway pipeline") - - dispatch_if_ready(state) - else - {:noreply, [], state} - end + defp schedule_poll do + Process.send_after(self(), :poll, 2000) end end diff --git a/lib/reencodarr_web/live/dashboard_live.ex b/lib/reencodarr_web/live/dashboard_live.ex index 4251e087..06d6dd8f 100644 --- a/lib/reencodarr_web/live/dashboard_live.ex +++ b/lib/reencodarr_web/live/dashboard_live.ex @@ -330,41 +330,35 @@ defmodule ReencodarrWeb.DashboardLive do {:noreply, socket} end - # Service control event handlers with direct inline logic + # Service control event handlers - pipelines always run, no start/pause @impl true def handle_event("start_analyzer", _params, socket) do - Reencodarr.Analyzer.Broadway.Producer.start() - {:noreply, put_flash(socket, :info, "Analysis started")} + {:noreply, put_flash(socket, :info, "Analyzer runs automatically")} end @impl true def handle_event("pause_analyzer", _params, socket) do - Reencodarr.Analyzer.Broadway.Producer.pause() - {:noreply, put_flash(socket, :info, "Analysis paused")} + {:noreply, put_flash(socket, :info, "Analyzer runs automatically")} end @impl true def handle_event("start_crf_searcher", _params, socket) do - CrfSearcherBroadway.resume() - {:noreply, put_flash(socket, :info, "CRF Search started")} + {:noreply, put_flash(socket, :info, "CRF Search runs automatically")} end @impl true def handle_event("pause_crf_searcher", _params, socket) do - CrfSearcherBroadway.pause() - {:noreply, put_flash(socket, :info, "CRF Search paused")} + {:noreply, put_flash(socket, :info, "CRF Search runs automatically")} end @impl true def handle_event("start_encoder", _params, socket) do - Reencodarr.Encoder.Broadway.Producer.start() - {:noreply, put_flash(socket, :info, "Encoding started")} + {:noreply, put_flash(socket, :info, "Encoder runs automatically")} end @impl true def handle_event("pause_encoder", _params, socket) do - Reencodarr.Encoder.Broadway.Producer.pause() - {:noreply, put_flash(socket, :info, "Encoding paused")} + {:noreply, put_flash(socket, :info, "Encoder runs automatically")} end @impl true diff --git a/test/reencodarr/broadway/state_management_test.exs b/test/reencodarr/broadway/state_management_test.exs deleted file mode 100644 index 4e40e2a8..00000000 --- a/test/reencodarr/broadway/state_management_test.exs +++ /dev/null @@ -1,242 +0,0 @@ -defmodule Reencodarr.Broadway.StateManagementTest do - use Reencodarr.UnitCase, async: true - - describe "Analyzer Broadway Producer state management" do - alias Reencodarr.Analyzer.Broadway.Producer.State - - test "initializes with correct default state" do - state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - assert state.demand == 0 - assert state.paused == false - assert state.pending_videos == [] - end - - test "updates demand correctly" do - initial_state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - updated_state = State.update(initial_state, demand: 5) - - assert updated_state.demand == 5 - # Other fields unchanged - assert updated_state.paused == false - end - - test "updates paused status correctly" do - initial_state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - paused_state = State.update(initial_state, paused: true) - - assert paused_state.paused == true - # Other fields unchanged - assert paused_state.demand == 0 - end - - test "updates pending videos correctly" do - initial_state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - video_info = %{path: "/test/video.mkv", service_id: "1", service_type: :sonarr} - updated_state = State.update(initial_state, pending_videos: [video_info]) - - assert length(updated_state.pending_videos) == 1 - assert hd(updated_state.pending_videos) == video_info - end - - test "updates multiple fields simultaneously" do - initial_state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - video_info = %{path: "/test/video.mkv", service_id: "1", service_type: :sonarr} - - updated_state = - State.update(initial_state, - demand: 3, - paused: true, - pending_videos: [video_info] - ) - - assert updated_state.demand == 3 - assert updated_state.paused == true - assert length(updated_state.pending_videos) == 1 - end - - test "preserves existing fields when updating others" do - initial_state = %State{ - demand: 0, - paused: false, - processing: false, - pending_videos: [] - } - - video1 = %{path: "/test/video1.mkv", service_id: "1", service_type: :sonarr} - video2 = %{path: "/test/video2.mkv", service_id: "2", service_type: :radarr} - - # Set initial state with some values - state_with_queue = - State.update(initial_state, - demand: 2, - pending_videos: [video1] - ) - - # Update only demand, queue should remain - state_updated_demand = State.update(state_with_queue, demand: 5) - - assert state_updated_demand.demand == 5 - assert length(state_updated_demand.pending_videos) == 1 - assert hd(state_updated_demand.pending_videos) == video1 - - # Update only queue, demand should remain - state_updated_queue = State.update(state_updated_demand, pending_videos: [video2]) - - assert state_updated_queue.demand == 5 - assert length(state_updated_queue.pending_videos) == 1 - assert hd(state_updated_queue.pending_videos) == video2 - end - end - - describe "Encoder Broadway Producer state management" do - test "state contains demand and pipeline fields" do - # Test basic state structure (Encoder uses plain map, not State struct) - state = %{ - demand: 1, - pipeline: %{} - } - - assert state.demand == 1 - assert is_map(state.pipeline) - end - - test "demand tracking works correctly" do - # Test demand management in state - state = %{ - demand: 0, - pipeline: %{} - } - - # Increment demand - updated_state = %{state | demand: state.demand + 5} - assert updated_state.demand == 5 - - # Decrement demand - final_state = %{updated_state | demand: updated_state.demand - 1} - assert final_state.demand == 4 - end - end - - describe "CRF Searcher Broadway Producer state management" do - test "state contains demand and pipeline fields" do - # Test basic state structure (CRF Searcher uses plain map, not State struct) - state = %{ - demand: 1, - pipeline: %{} - } - - assert state.demand == 1 - assert is_map(state.pipeline) - end - - test "single operation constraint" do - # CRF searcher should only allow one operation at a time - # This is enforced by the pipeline state machine and GenServer availability checks - state = %{ - demand: 5, - pipeline: %{} - } - - # Even with high demand, CRF search should only dispatch one at a time - # This is tested more thoroughly in integration tests - assert state.demand == 5 - end - end - - describe "Broadway pipeline error recovery" do - test "handles state recovery after restart" do - # Test basic state structure after pipeline restart - recovered_state = %{ - demand: 0, - pipeline: %{} - } - - assert recovered_state.demand == 0 - assert is_map(recovered_state.pipeline) - - # Should be able to handle demand when it arrives - state_with_demand = %{recovered_state | demand: 1} - assert state_with_demand.demand == 1 - end - - test "handles demand fluctuations correctly" do - state = %{ - demand: 0, - pipeline: %{} - } - - # Demand increases - state_with_demand = %{state | demand: 3} - assert state_with_demand.demand == 3 - - # Demand decreases but still positive - state_lower_demand = %{state_with_demand | demand: 1} - assert state_lower_demand.demand == 1 - - # Demand drops to zero - state_no_demand = %{state_lower_demand | demand: 0} - assert state_no_demand.demand == 0 - end - end - - describe "telemetry and monitoring integration" do - test "state changes should emit appropriate telemetry events" do - # This test would verify that state changes emit telemetry for monitoring - # In a real implementation, you'd test actual telemetry emission - - state_changes = [ - {:pause, "Pipeline paused for maintenance"}, - {:resume, "Pipeline resumed"}, - {:error, "Processing error occurred"}, - {:queue_full, "Queue at maximum capacity"} - ] - - Enum.each(state_changes, fn {event_type, message} -> - # Simulate telemetry emission - telemetry_event = %{ - event: event_type, - message: message, - timestamp: System.system_time(:microsecond) - } - - # Verify event structure - assert Map.has_key?(telemetry_event, :event) - assert Map.has_key?(telemetry_event, :message) - assert Map.has_key?(telemetry_event, :timestamp) - assert is_atom(telemetry_event.event) - assert is_binary(telemetry_event.message) - assert is_integer(telemetry_event.timestamp) - end) - end - end -end diff --git a/test/reencodarr/broadway_producers_test.exs b/test/reencodarr/broadway_producers_test.exs new file mode 100644 index 00000000..65a54601 --- /dev/null +++ b/test/reencodarr/broadway_producers_test.exs @@ -0,0 +1,121 @@ +defmodule Reencodarr.BroadwayProducersTest do + use Reencodarr.DataCase + import Reencodarr.Fixtures + + alias Reencodarr.Analyzer.Broadway.Producer, as: AnalyzerProducer + alias Reencodarr.CrfSearcher.Broadway.Producer, as: CrfProducer + alias Reencodarr.Encoder.Broadway.Producer, as: EncoderProducer + + describe "Analyzer Producer" do + test "handle_demand returns videos when available" do + {:ok, _video} = video_fixture(%{state: :needs_analysis}) + + {:producer, state} = AnalyzerProducer.init([]) + {:noreply, videos, _new_state} = AnalyzerProducer.handle_demand(1, state) + + # May return 0 or 1 videos depending on timing + assert is_list(videos) + assert length(videos) <= 1 + end + + test "handle_demand returns empty when no videos" do + {:producer, state} = AnalyzerProducer.init([]) + {:noreply, videos, _new_state} = AnalyzerProducer.handle_demand(1, state) + + assert videos == [] + end + + test "handle_demand respects max batch size" do + # Create 10 videos + for _ <- 1..10 do + video_fixture(%{state: :needs_analysis}) + end + + {:producer, state} = AnalyzerProducer.init([]) + {:noreply, videos, _new_state} = AnalyzerProducer.handle_demand(100, state) + + # Should return at most 5 videos (batch size limit) + assert length(videos) <= 5 + end + + test "poll wakes up Broadway when work available" do + {:ok, _video} = video_fixture(%{state: :needs_analysis}) + + {:producer, state} = AnalyzerProducer.init([]) + {:noreply, videos, _new_state} = AnalyzerProducer.handle_info(:poll, state) + + # Poll pushes at most 1 video to wake Broadway + assert is_list(videos) + assert length(videos) <= 1 + end + + test "poll returns empty when no work" do + {:producer, state} = AnalyzerProducer.init([]) + {:noreply, videos, _new_state} = AnalyzerProducer.handle_info(:poll, state) + + assert videos == [] + end + end + + describe "CRF Producer" do + test "handle_demand returns video when work exists" do + {:ok, _video} = video_fixture(%{state: :analyzed}) + + {:producer, state} = CrfProducer.init([]) + {:noreply, videos, _new_state} = CrfProducer.handle_demand(1, state) + + # May or may not return video depending on CrfSearch availability + assert is_list(videos) + end + + test "poll returns list when called" do + {:ok, _video} = video_fixture(%{state: :analyzed}) + + {:producer, state} = CrfProducer.init([]) + {:noreply, videos, _new_state} = CrfProducer.handle_info(:poll, state) + + assert is_list(videos) + end + end + + describe "Encoder Producer" do + test "handle_demand returns list" do + {:ok, video} = video_fixture(%{state: :crf_searched}) + _vmaf = vmaf_fixture(%{video_id: video.id, chosen: true}) + + {:producer, state} = EncoderProducer.init([]) + {:noreply, vmafs, _new_state} = EncoderProducer.handle_demand(1, state) + + assert is_list(vmafs) + end + + test "poll returns list when called" do + {:ok, video} = video_fixture(%{state: :crf_searched}) + _vmaf = vmaf_fixture(%{video_id: video.id, chosen: true}) + + {:producer, state} = EncoderProducer.init([]) + {:noreply, vmafs, _new_state} = EncoderProducer.handle_info(:poll, state) + + assert is_list(vmafs) + end + end + + describe "All Producers" do + test "all producers initialize with polling scheduled" do + assert {:producer, %{}} = AnalyzerProducer.init([]) + assert {:producer, %{}} = CrfProducer.init([]) + assert {:producer, %{}} = EncoderProducer.init([]) + end + + test "all producers handle unknown messages gracefully" do + {:producer, state} = AnalyzerProducer.init([]) + assert {:noreply, [], ^state} = AnalyzerProducer.handle_info(:unknown, state) + + {:producer, state} = CrfProducer.init([]) + assert {:noreply, [], ^state} = CrfProducer.handle_info(:unknown, state) + + {:producer, state} = EncoderProducer.init([]) + assert {:noreply, [], ^state} = EncoderProducer.handle_info(:unknown, state) + end + end +end From e42716d46cd9c8a47dc1db455421f1d2a42a4328 Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Wed, 29 Oct 2025 12:59:00 -0600 Subject: [PATCH 8/9] Handle DB busy without crashing: requeue CRF search port messages and retry encode exit notifications - Catch Exqlite.DB busy and DBConnection errors while processing CRF search output and requeue the original port message with a short backoff instead of crashing the GenServer. - For encoder, catch DB busy when notifying success/failure after process exit and schedule retry; only clear state when notification succeeds. --- lib/reencodarr/ab_av1/crf_search.ex | 31 ++++++++-- lib/reencodarr/ab_av1/encode.ex | 96 ++++++++++++++++++++++++----- 2 files changed, 109 insertions(+), 18 deletions(-) diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index 13896ad5..ce2d2d13 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -239,12 +239,35 @@ defmodule Reencodarr.AbAv1.CrfSearch do state ) do full_line = buffer <> line - process_line(full_line, video, args, target_vmaf) - # Add the line to our output buffer for failure tracking - new_output_buffer = [full_line | output_buffer] + try do + process_line(full_line, video, args, target_vmaf) - {:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}} + # Add the line to our output buffer for failure tracking + new_output_buffer = [full_line | output_buffer] + + {:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}} + rescue + e in Exqlite.Error -> + # Database is busy — don't crash. Requeue the same port message with a short delay. + Logger.warning( + "CrfSearch: Database busy while upserting VMAF, requeuing line: #{inspect(e)}" + ) + + # Re-send the exact same message after a short backoff to retry + Process.send_after(self(), {port, {:data, {:eol, line}}}, 200) + + # Keep state intact — we'll retry later + {:noreply, state} + + e in DBConnection.ConnectionError -> + Logger.warning( + "CrfSearch: DB connection error while upserting VMAF, requeuing line: #{inspect(e)}" + ) + + Process.send_after(self(), {port, {:data, {:eol, line}}}, 200) + {:noreply, state} + end end @impl true diff --git a/lib/reencodarr/ab_av1/encode.ex b/lib/reencodarr/ab_av1/encode.ex index 78b69604..39ed2bbc 100644 --- a/lib/reencodarr/ab_av1/encode.ex +++ b/lib/reencodarr/ab_av1/encode.ex @@ -137,23 +137,91 @@ defmodule Reencodarr.AbAv1.Encode do # Notify the Broadway producer that encoding is now available Producer.dispatch_available() - if result == {:ok, :success} do - notify_encoder_success(vmaf.video, output_file) - else - notify_encoder_failure(vmaf.video, exit_code) + # Attempt to notify success/failure, but don't crash if DB is busy — retry later + notify_result = + try do + if result == {:ok, :success} do + notify_encoder_success(vmaf.video, output_file) + else + notify_encoder_failure(vmaf.video, exit_code) + end + + :ok + rescue + e in Exqlite.Error -> + Logger.warning( + "Encode: Database busy while handling exit_status, scheduling retry: #{inspect(e)}" + ) + + # Retry after short backoff. Keep current state until retry completes. + Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 200) + :retry_scheduled + end + + case notify_result do + :ok -> + new_state = %{ + state + | port: :none, + video: :none, + vmaf: :none, + output_file: nil, + partial_line_buffer: "", + last_progress: nil + } + + {:noreply, new_state} + + :retry_scheduled -> + # Keep state intact so retry handler has data available + {:noreply, state} end + end - new_state = %{ - state - | port: :none, - video: :none, - vmaf: :none, - output_file: nil, - partial_line_buffer: "", - last_progress: nil - } + @impl true + def handle_info({:encoding_exit_retry, exit_code, vmaf, output_file}, state) do + Logger.info("Encode: retrying exit_status handling for VMAF #{vmaf.id}") + + retry_result = + try do + if exit_code == 0 do + notify_encoder_success(vmaf.video, output_file) + else + notify_encoder_failure(vmaf.video, exit_code) + end - {:noreply, new_state} + :ok + rescue + e in Exqlite.Error -> + Logger.warning( + "Encode: retry failed due to DB busy: #{inspect(e)}, scheduling another retry" + ) + + Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 500) + :retry_scheduled + end + + case retry_result do + :ok -> + # Clean up state now that notification succeeded + cleared_state = %{ + state + | port: :none, + video: :none, + vmaf: :none, + output_file: nil, + partial_line_buffer: "", + last_progress: nil + } + + # Ensure Broadway knows encoder is available + Producer.dispatch_available() + + {:noreply, cleared_state} + + :retry_scheduled -> + {:noreply, state} + end end # Catch-all for any other port messages From bb5b921fac45383080989d5fc2a7d8b18774e338 Mon Sep 17 00:00:00 2001 From: Mika Cohen Date: Wed, 29 Oct 2025 14:23:15 -0600 Subject: [PATCH 9/9] Address PR comments: fix nil dereference and add function documentation - Fix potential nil dereference in Helper.ex by using get_in/2 for safe nested access - Add clear documentation for perform_crf_search_cleanup/1 function clauses - Clarify when each cleanup function is called (with vs without video task) --- lib/reencodarr/ab_av1/crf_search.ex | 3 +++ lib/reencodarr/ab_av1/helper.ex | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index ce2d2d13..20766b2d 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -456,6 +456,8 @@ defmodule Reencodarr.AbAv1.CrfSearch do end # Private helper functions + + # Cleanup after CRF search when a video task was active - broadcasts completion event defp perform_crf_search_cleanup(%{current_task: %{video: video}} = state) do # Broadcast completion event via unified Events system Events.broadcast_event(:crf_search_completed, %{ @@ -467,6 +469,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do {:noreply, new_state} end + # Cleanup after CRF search when no video task was active - just reset state defp perform_crf_search_cleanup(state) do # No current task, just reset state new_state = %{state | port: :none, current_task: :none, partial_line_buffer: ""} diff --git a/lib/reencodarr/ab_av1/helper.ex b/lib/reencodarr/ab_av1/helper.ex index d1ff2ecd..5b848755 100644 --- a/lib/reencodarr/ab_av1/helper.ex +++ b/lib/reencodarr/ab_av1/helper.ex @@ -230,7 +230,7 @@ defmodule Reencodarr.AbAv1.Helper do if track["type"] == "video" do Logger.debug( - "Video track: codec=#{inspect(track["codec"])}, codec_id=#{inspect(track["properties"]["codec_id"])}, uid=#{inspect(track["properties"]["uid"])}, is_image=#{is_image}" + "Video track: codec=#{inspect(track["codec"])}, codec_id=#{inspect(get_in(track, ["properties", "codec_id"]))}, uid=#{inspect(get_in(track, ["properties", "uid"]))}, is_image=#{is_image}" ) end @@ -240,7 +240,7 @@ defmodule Reencodarr.AbAv1.Helper do defp image_video_track?(track) do track["type"] == "video" and (String.upcase(track["codec"] || "") in ["MJPEG", "PNG"] or - String.contains?(track["properties"]["codec_id"] || "", ["V_MS/VFW", "PNG"])) + String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW", "PNG"])) end defp delete_image_attachments(cleaned_path) do