diff --git a/.githooks/pre-commit b/.githooks/pre-commit index 30d72862..3222a516 100755 --- a/.githooks/pre-commit +++ b/.githooks/pre-commit @@ -1,15 +1,15 @@ #!/bin/sh # Save current staged changes -git stash push --keep-index --include-untracked -m "pre-commit-stash" +#git stash push --keep-index --include-untracked -m "pre-commit-stash" # Function to cleanup on exit cleanup() { EXIT_CODE=$? # Only pop stash if we created one - if [ -n "$(git stash list | grep "pre-commit-stash")" ]; then - git stash pop - fi + #if [ -n "$(git stash list | grep "pre-commit-stash")" ]; then + # git stash pop + #fi exit $EXIT_CODE } diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 971d7455..8848f6bd 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -83,7 +83,7 @@ Pre-commit hooks automatically enforce code quality: When adding new pipelines: 1. Create producer that checks GenServer availability (`crf_search_available?()` pattern) 2. Implement single concurrency to respect external tool limitations -3. Add telemetry events for dashboard updates +3. Use `Reencodarr.Dashboard.Events` to broadcast state changes to Dashboard V2 4. Update `application.ex` with test environment considerations ### Database Query Patterns diff --git a/.iex.exs b/.iex.exs index 18001d92..4b246567 100644 --- a/.iex.exs +++ b/.iex.exs @@ -6,7 +6,7 @@ alias Reencodarr.Media.{Video, Library, Vmaf} # Broadway pipelines and workers alias Reencodarr.{Analyzer, CrfSearcher, Encoder} -alias Reencodarr.Analyzer.{Broadway, QueueManager} +alias Reencodarr.Analyzer.Broadway alias Reencodarr.AbAv1.{CrfSearch, Encode} # State management diff --git a/assets/css/app.css b/assets/css/app.css index 39dfc94f..097d7ef2 100644 --- a/assets/css/app.css +++ b/assets/css/app.css @@ -2,9 +2,6 @@ @import "tailwindcss/components"; @import "tailwindcss/utilities"; -/* LCARS Interface Styling */ -@import "./lcars.css"; - /* This file is for your main application CSS */ /* Text shadow utility for better button text readability */ diff --git a/assets/css/lcars.css b/assets/css/lcars.css deleted file mode 100644 index 8083c7e8..00000000 --- a/assets/css/lcars.css +++ /dev/null @@ -1,218 +0,0 @@ -/* LCARS Interface Styling */ - -/* Custom LCARS fonts and animations */ -@import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700;900&display=swap'); - -.lcars-font { - font-family: 'Orbitron', 'Courier New', monospace; -} - -/* LCARS Button Animations */ -.lcars-button { - position: relative; - overflow: hidden; -} - -.lcars-button::before { - content: ''; - position: absolute; - top: 0; - left: -100%; - width: 100%; - height: 100%; - background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.2), transparent); - transition: left 0.5s; -} - -.lcars-button:hover::before { - left: 100%; -} - -/* LCARS Panel Borders */ -.lcars-panel { - border-style: solid; - border-width: 2px; - border-image: linear-gradient(45deg, #ff6600, #ffcc00, #ff6600) 1; -} - -/* LCARS Status Indicators */ -.lcars-status-online { - animation: lcars-pulse 2s infinite; -} - -@keyframes lcars-pulse { - 0%, 50%, 100% { - opacity: 1; - } - 25%, 75% { - opacity: 0.5; - } -} - -/* LCARS Progress Bars */ -.lcars-progress { - background: linear-gradient(90deg, #ff6600 0%, #ffcc00 50%, #ff0000 100%); - animation: lcars-progress-glow 2s ease-in-out infinite alternate; -} - -@keyframes lcars-progress-glow { - from { - box-shadow: 0 0 5px rgba(255, 102, 0, 0.5); - } - to { - box-shadow: 0 0 20px rgba(255, 102, 
0, 0.8); - } -} - -/* LCARS Screen Flicker Effect */ -.lcars-screen { - animation: lcars-flicker 0.1s infinite linear alternate; -} - -@keyframes lcars-flicker { - 0% { - opacity: 1; - } - 98% { - opacity: 1; - } - 99% { - opacity: 0.98; - } - 100% { - opacity: 1; - } -} - -/* LCARS Scanning Lines */ -.lcars-scan-lines { - background-image: - repeating-linear-gradient( - 0deg, - transparent, - transparent 2px, - rgba(255, 102, 0, 0.03) 2px, - rgba(255, 102, 0, 0.03) 4px - ); -} - -/* LCARS Text Styles */ -.lcars-text-primary { - color: #ff6600; - text-shadow: 0 0 10px rgba(255, 102, 0, 0.5); -} - -.lcars-text-secondary { - color: #ffcc99; - text-shadow: 0 0 5px rgba(255, 204, 153, 0.3); -} - -.lcars-text-alert { - color: #ff0000; - text-shadow: 0 0 10px rgba(255, 0, 0, 0.5); - animation: lcars-blink 1s infinite; -} - -@keyframes lcars-blink { - 0%, 50% { - opacity: 1; - } - 51%, 100% { - opacity: 0.3; - } -} - -/* LCARS Rounded Corners */ -.lcars-corner-tl { - border-top-left-radius: 50px; -} - -.lcars-corner-tr { - border-top-right-radius: 50px; -} - -.lcars-corner-bl { - border-bottom-left-radius: 50px; -} - -.lcars-corner-br { - border-bottom-right-radius: 50px; -} - -/* LCARS Gradient Borders */ -.lcars-border-gradient { - position: relative; -} - -.lcars-border-gradient::before { - content: ''; - position: absolute; - top: -2px; - left: -2px; - right: -2px; - bottom: -2px; - background: linear-gradient(45deg, #ff6600, #ffcc00, #ff0000, #ff6600); - border-radius: inherit; - z-index: -1; -} - -/* LCARS Data Stream Effect */ -.lcars-data-stream { - background: linear-gradient(90deg, - transparent 0%, - rgba(255, 102, 0, 0.1) 25%, - rgba(255, 204, 0, 0.2) 50%, - rgba(255, 102, 0, 0.1) 75%, - transparent 100% - ); - background-size: 200% 100%; - animation: lcars-stream 3s linear infinite; -} - -@keyframes lcars-stream { - 0% { - background-position: -200% 0; - } - 100% { - background-position: 200% 0; - } -} - -/* LCARS Typography */ -.lcars-title { - font-family: 'Orbitron', monospace; - font-weight: 900; - letter-spacing: 0.2em; - text-transform: uppercase; -} - -.lcars-label { - font-family: 'Orbitron', monospace; - font-weight: 700; - letter-spacing: 0.15em; - text-transform: uppercase; -} - -.lcars-data { - font-family: 'Courier New', monospace; - font-weight: 400; - letter-spacing: 0.05em; -} - -/* Responsive LCARS adjustments */ -@media (max-width: 768px) { - .lcars-corner-tl, - .lcars-corner-tr, - .lcars-corner-bl, - .lcars-corner-br { - border-radius: 25px; - } - - .lcars-title { - font-size: 1.5rem; - } - - .lcars-label { - font-size: 0.875rem; - } -} diff --git a/docker-compose.yml b/docker-compose.yml index 2d9622f3..015c0ed6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,28 +6,15 @@ services: ports: - "4000:4000" environment: - - DATABASE_URL=ecto://postgres:postgres@db:5432/reencodarr_prod + # SQLite database - file stored in mounted volume + - DATABASE_PATH=/app/data/reencodarr_prod.db - SECRET_KEY_BASE=your-secret-key-base-here-change-this-in-production - PHX_SERVER=true - PORT=4000 - depends_on: - - db volumes: - ./media:/app/media # Mount for media files processing - restart: unless-stopped - - db: - image: postgres:15 - environment: - - POSTGRES_DB=reencodarr_prod - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgres - volumes: - - postgres_data:/var/lib/postgresql/data - - ./priv/repo/structure.sql:/docker-entrypoint-initdb.d/structure.sql - ports: - - "5432:5432" + - ./data:/app/data # SQLite database storage restart: unless-stopped volumes: - 
postgres_data: + # No database service needed - SQLite is embedded diff --git a/docs/distributed_architecture.md b/docs/distributed_architecture.md index 062ae614..b9522b79 100644 --- a/docs/distributed_architecture.md +++ b/docs/distributed_architecture.md @@ -1,4 +1,14 @@ -# Reencodarr Distributed Architecture +# Reencodarr Distributed Architecture (Future Design) + +> **⚠️ NOTE**: This document describes a **future architectural vision** for Reencodarr, not the current implementation. +> +> **Current Architecture (as of October 2025)**: +> - Single monolithic application +> - SQLite database with WAL mode for concurrency +> - Broadway pipelines for processing coordination +> - All services run on a single node +> +> **This Document**: Outlines a proposed distributed client-server architecture for future scalability. ## Overview @@ -13,7 +23,7 @@ Currently, Reencodarr runs as a single application with three main pipelines: 3. **Encoder** - Re-encodes videos with chosen parameters All components share: -- Database (PostgreSQL with video metadata, VMAF results) +- Database (SQLite with video metadata, VMAF results) - File system (original videos, temporary files) - Web interface (Phoenix LiveView dashboard) - Service integrations (Sonarr/Radarr APIs) diff --git a/lib/mix/tasks/capture_encoding_output.ex b/lib/mix/tasks/capture_encoding_output.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/mix/tasks/restore.ex b/lib/mix/tasks/restore.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/ab_av1/crf_search.ex b/lib/reencodarr/ab_av1/crf_search.ex index 2009ed29..8ad006e1 100644 --- a/lib/reencodarr/ab_av1/crf_search.ex +++ b/lib/reencodarr/ab_av1/crf_search.ex @@ -117,50 +117,29 @@ defmodule Reencodarr.AbAv1.CrfSearch do end end - # Test helpers - only available in test environment - if Mix.env() == :test do - def has_preset_6_params?(params), do: has_preset_6_params_private(params) - def should_retry_with_preset_6(video_id), do: should_retry_with_preset_6_private(video_id) - - def build_crf_search_args_with_preset_6(video, vmaf_percent), - do: build_crf_search_args_with_preset_6_private(video, vmaf_percent) - - def build_crf_search_args(video, vmaf_percent), - do: build_crf_search_args_private(video, vmaf_percent) - - # Legacy test function names for backward compatibility - def should_retry_with_preset_6_for_test(video_id), do: should_retry_with_preset_6(video_id) - - def build_crf_search_args_with_preset_6_for_test(video, vmaf_percent), - do: build_crf_search_args_with_preset_6(video, vmaf_percent) - - def build_crf_search_args_for_test(video, vmaf_percent), - do: build_crf_search_args(video, vmaf_percent) - - # Private helper functions for tests - defp has_preset_6_params_private(params) when is_list(params) do - # Check for adjacent --preset and 6 in a flat list - case check_for_preset_6_in_flat_list(params) do - true -> - true - - false -> - # Also check for tuple format - Enum.any?(params, fn - {flag, value} -> flag == "--preset" and value == "6" - _ -> false - end) - end + # Public test helpers - no wrappers, just make functions public + def has_preset_6_params?(params) when is_list(params) do + # Check for adjacent --preset and 6 in a flat list + case check_for_preset_6_in_flat_list(params) do + true -> + true + + false -> + # Also check for tuple format + Enum.any?(params, fn + {flag, value} -> flag == "--preset" and value == "6" + _ -> false + end) end + end - defp has_preset_6_params_private(_), do: false + def 
has_preset_6_params?(_), do: false - # Helper to check for --preset 6 in flat list format - defp check_for_preset_6_in_flat_list([]), do: false - defp check_for_preset_6_in_flat_list([_]), do: false - defp check_for_preset_6_in_flat_list(["--preset", "6" | _]), do: true - defp check_for_preset_6_in_flat_list([_ | rest]), do: check_for_preset_6_in_flat_list(rest) - end + # Helper to check for --preset 6 in flat list format + defp check_for_preset_6_in_flat_list([]), do: false + defp check_for_preset_6_in_flat_list([_]), do: false + defp check_for_preset_6_in_flat_list(["--preset", "6" | _]), do: true + defp check_for_preset_6_in_flat_list([_ | rest]), do: check_for_preset_6_in_flat_list(rest) # GenServer callbacks @impl true @@ -170,7 +149,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do @impl true def handle_cast({:crf_search, video, vmaf_percent}, %{port: :none} = state) do - args = build_crf_search_args_private(video, vmaf_percent) + args = build_crf_search_args(video, vmaf_percent) new_state = %{ state @@ -191,7 +170,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do def handle_cast({:crf_search_with_preset_6, video, vmaf_percent}, %{port: :none} = state) do Logger.info("CrfSearch: Starting retry with --preset 6 for video #{video.id}") - args = build_crf_search_args_with_preset_6_private(video, vmaf_percent) + args = build_crf_search_args_with_preset_6(video, vmaf_percent) new_state = %{ state @@ -355,7 +334,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do defp handle_crf_search_failure(video, target_vmaf, exit_code, command_line, full_output, state) do # Preset 6 retry is disabled - go straight to marking as failed - case should_retry_with_preset_6_private(video.id) do + case should_retry_with_preset_6(video.id) do :mark_failed -> handle_mark_failed(video, target_vmaf, exit_code, command_line, full_output, state) end @@ -453,8 +432,6 @@ defmodule Reencodarr.AbAv1.CrfSearch do {:noreply, new_state} end - # Removed append_decimal_before_float - no longer needed since we parse types early - def process_line(line, video, args, target_vmaf \\ 95) do handlers = [ &handle_encoding_sample_line/2, @@ -647,7 +624,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do # Check if we should retry with --preset 6 (disabled - always mark as failed) Logger.debug("CrfSearch: About to check retry logic for video #{video.id}") - retry_result = should_retry_with_preset_6_private(video.id) + retry_result = should_retry_with_preset_6(video.id) Logger.info("CrfSearch: Retry result: #{inspect(retry_result)}") case retry_result do @@ -718,7 +695,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do end end - defp build_crf_search_args_private(video, vmaf_percent) do + def build_crf_search_args(video, vmaf_percent) do base_args = [ "crf-search", "--input", @@ -738,7 +715,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do end # Build CRF search args with --preset 6 added - defp build_crf_search_args_with_preset_6_private(video, vmaf_percent) do + def build_crf_search_args_with_preset_6(video, vmaf_percent) do base_args = [ "crf-search", "--input", @@ -861,9 +838,6 @@ defmodule Reencodarr.AbAv1.CrfSearch do filename: progress[:filename] && Path.basename(progress[:filename]) }) - # Also broadcast that CRF searcher is running when progress is sent - Events.broadcast_event(:crf_searcher_started, %{}) - # Update cache update_last_progress(filename, progress) end @@ -1048,7 +1022,7 @@ defmodule Reencodarr.AbAv1.CrfSearch do do: calculate_savings(percent, video_size) # Determine if we should retry with preset 6 based on video ID - defp 
should_retry_with_preset_6_private(video_id) do + def should_retry_with_preset_6(video_id) do import Ecto.Query # Get existing VMAF records for this video diff --git a/lib/reencodarr/ab_av1/crf_search/command_builder.ex b/lib/reencodarr/ab_av1/crf_search/command_builder.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/ab_av1/crf_search/line_handler.ex b/lib/reencodarr/ab_av1/crf_search/line_handler.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/ab_av1/crf_search/retry_logic.ex b/lib/reencodarr/ab_av1/crf_search/retry_logic.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/ab_av1/crf_search/vmaf_manager.ex b/lib/reencodarr/ab_av1/crf_search/vmaf_manager.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/ab_av1/progress_parser.ex b/lib/reencodarr/ab_av1/progress_parser.ex index 14762207..521a9e19 100644 --- a/lib/reencodarr/ab_av1/progress_parser.ex +++ b/lib/reencodarr/ab_av1/progress_parser.ex @@ -37,9 +37,6 @@ defmodule Reencodarr.AbAv1.ProgressParser do filename: progress.filename }) - # Also broadcast that encoder is running when progress is sent - Events.broadcast_event(:encoder_started, %{}) - :ok {:unmatched, line} -> diff --git a/lib/reencodarr/analyzer.ex b/lib/reencodarr/analyzer.ex index 13d1378b..8311d233 100644 --- a/lib/reencodarr/analyzer.ex +++ b/lib/reencodarr/analyzer.ex @@ -50,15 +50,4 @@ defmodule Reencodarr.Analyzer do @doc "Get next videos in the analysis queue" def next_videos(limit \\ 10), do: Media.get_videos_needing_analysis(limit) - - # Debug functions - - @doc "Get detailed analyzer debug information" - def debug_info do - %{ - status: status(), - next_videos: next_videos(5), - pipeline_state: Producer.debug_status() - } - end end diff --git a/lib/reencodarr/analyzer/broadway.ex b/lib/reencodarr/analyzer/broadway.ex index 74228900..90e30290 100644 --- a/lib/reencodarr/analyzer/broadway.ex +++ b/lib/reencodarr/analyzer/broadway.ex @@ -28,10 +28,12 @@ defmodule Reencodarr.Analyzer.Broadway do @default_batch_timeout 25 @default_mediainfo_batch_size 5 @default_processing_timeout :timer.minutes(5) - # Conservative start - @initial_rate_limit_messages 500 @rate_limit_interval 1000 - @max_db_retry_attempts 3 + # Retry many times for database busy - SQLite WAL mode handles concurrency well + @max_db_retry_attempts 50 + + # Rate limiting values + @running_rate_limit 500 @doc """ Start the Broadway pipeline. @@ -43,7 +45,8 @@ defmodule Reencodarr.Analyzer.Broadway do module: {Producer, []}, transformer: {__MODULE__, :transform, []}, rate_limiting: [ - allowed_messages: @initial_rate_limit_messages, + # Use normal rate limiting - pause/resume controlled by producer state + allowed_messages: @running_rate_limit, interval: @rate_limit_interval ] ], @@ -106,6 +109,8 @@ defmodule Reencodarr.Analyzer.Broadway do @doc """ Pause the analyzer. + + Pauses processing by updating the producer's state machine. """ def pause do Producer.pause() @@ -113,6 +118,8 @@ defmodule Reencodarr.Analyzer.Broadway do @doc """ Resume the analyzer. + + Resumes processing by updating the producer's state machine. 
""" def resume do Producer.resume() @@ -207,9 +214,6 @@ defmodule Reencodarr.Analyzer.Broadway do total: current_queue_length + 1, percent: percent }) - - # Also broadcast that analyzer is running when progress is sent - Events.broadcast_event(:analyzer_started, %{}) end # Note: Don't send progress events if queue is empty or no throughput @@ -465,16 +469,13 @@ defmodule Reencodarr.Analyzer.Broadway do case {upsert_results, successful_data} do {[], [_ | _]} -> - Logger.error("Broadway: Batch upsert failed after retries, marking all videos as failed") - - Enum.each(successful_data, fn {video_info, _attrs} -> - mark_video_as_failed( - video_info.path, - "database busy - batch upsert failed after retries" - ) - end) + Logger.warning( + "Broadway: Batch upsert failed after #{@max_db_retry_attempts} retries due to database busy. " <> + "Broadway will retry the batch automatically." + ) - {:error, "batch upsert failed after retries"} + # Return error to trigger Broadway retry - don't mark as failed for DB busy + {:error, :database_busy_retry_later} _ -> {:ok, upsert_results} @@ -716,7 +717,9 @@ defmodule Reencodarr.Analyzer.Broadway do error in [Exqlite.Error] -> case error.message do "Database busy" -> - wait_time = (:math.pow(2, attempt) * 100) |> round() + # Exponential backoff with max cap at 10 seconds + base_wait = (:math.pow(2, attempt) * 100) |> round() + wait_time = min(base_wait, 10_000) Logger.warning( "Database busy on attempt #{attempt + 1}/#{max_retries}, retrying in #{wait_time}ms" diff --git a/lib/reencodarr/analyzer/broadway/producer.ex b/lib/reencodarr/analyzer/broadway/producer.ex index f2d9ed89..d9014989 100644 --- a/lib/reencodarr/analyzer/broadway/producer.ex +++ b/lib/reencodarr/analyzer/broadway/producer.ex @@ -116,14 +116,28 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do @impl GenStage def handle_cast(:broadcast_status, state) do - p = state.pipeline - Events.pipeline_state_changed(p.service, p.current_state, p.current_state) + # Broadcast actual current status to dashboard + current_state = PipelineStateMachine.get_state(state.pipeline) + + # Map pipeline state to dashboard status + status = + case current_state do + :processing -> :processing + :paused -> :paused + :running -> :running + _ -> :stopped + end + + # Broadcast as service_status event with the actual state + Events.broadcast_event(:service_status, %{service: :analyzer, status: status}) + {:noreply, [], state} end @impl GenStage def handle_cast(:pause, state) do - {:noreply, [], Map.update!(state, :pipeline, &PipelineStateMachine.pause/1)} + new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) + {:noreply, [], new_state} end @impl GenStage @@ -205,8 +219,6 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do def handle_info(:initial_dispatch, state) do # Trigger initial dispatch after startup to check for videos needing analysis Logger.debug("Producer: Initial dispatch triggered") - # Broadcast initial queue state so UI shows correct count on startup - broadcast_queue_state() dispatch_if_ready(state) end @@ -289,7 +301,6 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do defp handle_auto_start(state) do Logger.info("Auto-starting analyzer - videos available for processing") - :telemetry.execute([:reencodarr, :analyzer, :started], %{}, %{}) # Send to Dashboard using Events system Events.broadcast_event(:analyzer_started, %{}) @@ -315,9 +326,7 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do defp handle_resume_from_idle(state) do Logger.info("Analyzer resuming 
from idle - videos available for processing") - # Send to Dashboard V2 - alias Reencodarr.Dashboard.Events - Events.broadcast_event(:analyzer_started, %{}) + # Send to Dashboard V2 using Events system # Start with minimal progress to indicate activity Events.broadcast_event(:analyzer_progress, %{ count: 0, @@ -367,31 +376,6 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do end end - defp broadcast_queue_state do - # Get next videos for UI display from database - next_videos = Media.get_videos_needing_analysis(10) - - # Format for UI display - formatted_videos = - Enum.map(next_videos, fn video -> - %{ - path: video.path, - service_id: video.service_id || "unknown" - } - end) - - # Emit telemetry event that the UI expects - measurements = %{ - queue_size: Media.count_videos_needing_analysis() - } - - metadata = %{ - next_videos: formatted_videos - } - - :telemetry.execute([:reencodarr, :analyzer, :queue_changed], measurements, metadata) - end - defp dispatch_videos(state) do # Get videos from the database up to demand videos = Media.get_videos_needing_analysis(state.demand) @@ -415,8 +399,6 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do | pipeline: PipelineStateMachine.transition_to(state.pipeline, :idle) } - # Broadcast queue state when going idle - broadcast_queue_state() {:noreply, [], new_state} else Logger.debug("No videos available for dispatch, keeping demand: #{state.demand}") @@ -429,139 +411,10 @@ defmodule Reencodarr.Analyzer.Broadway.Producer do new_demand = state.demand - length(videos) new_state = State.update(state, demand: new_demand) - # Always broadcast queue state when dispatching videos - broadcast_queue_state() - {:noreply, videos, new_state} end end - @doc """ - Debug function to check Broadway pipeline and producer status - """ - def debug_status do - broadway_name = Reencodarr.Analyzer.Broadway - - case Process.whereis(broadway_name) do - nil -> - IO.puts("❌ Broadway pipeline not found") - {:error, :broadway_not_found} - - broadway_pid -> - IO.puts("✅ Broadway pipeline found: #{inspect(broadway_pid)}") - debug_producer_supervisor(broadway_name) - end - end - - defp debug_producer_supervisor(broadway_name) do - producer_supervisor_name = :"#{broadway_name}.Broadway.ProducerSupervisor" - - case Process.whereis(producer_supervisor_name) do - nil -> - IO.puts("❌ Producer supervisor not found") - {:error, :producer_supervisor_not_found} - - producer_supervisor_pid -> - IO.puts("✅ Producer supervisor found: #{inspect(producer_supervisor_pid)}") - debug_producer_children(producer_supervisor_pid) - end - end - - defp debug_producer_children(producer_supervisor_pid) do - children = Supervisor.which_children(producer_supervisor_pid) - IO.puts("Producer supervisor children: #{inspect(children)}") - - case find_actual_producer(children) do - nil -> - IO.puts("❌ Producer process not found in supervision tree") - {:error, :producer_process_not_found} - - producer_pid -> - IO.puts("✅ Producer process found: #{inspect(producer_pid)}") - get_producer_state(producer_pid) - end - end - - # Helper to get and display producer state - defp get_producer_state(producer_pid) do - state = GenStage.call(producer_pid, :get_state, 1000) - - IO.puts( - "State: demand=#{state.demand}, status=#{PipelineStateMachine.get_state(state.pipeline)}, queue_size=#{length(state.manual_queue)}" - ) - - case state.manual_queue do - [] -> - :ok - - videos -> - IO.puts("Manual queue contents:") - Enum.each(videos, fn video -> IO.puts(" - #{video.path}") end) - end - - # Get up to 5 videos from queue 
or database for batching - case get_next_videos(state, min(state.demand, 5)) do - {[], new_state} -> - Logger.debug("No videos available, resetting processing flag") - # No videos available, reset processing flag - {:noreply, [], %{new_state | processing: false}} - - {videos, new_state} -> - video_count = length(videos) - Logger.debug("Dispatching #{video_count} videos for analysis") - # Decrement demand and mark as processing - final_state = %{ - new_state - | demand: state.demand - video_count, - pipeline: PipelineStateMachine.transition_to(new_state.pipeline, :processing) - } - - # Broadcast status change to dashboard - Events.broadcast_event(:analyzer_started, %{}) - - Logger.debug( - "Final state: status: #{PipelineStateMachine.get_state(final_state.pipeline)}, demand: #{final_state.demand}" - ) - - {:noreply, videos, final_state} - end - end - - defp get_next_videos(state, max_count) do - # First, get videos from the manual queue - {queue_videos, remaining_queue} = take_from_queue(state.queue, max_count) - new_state = %{state | queue: remaining_queue} - - remaining_needed = max_count - length(queue_videos) - - if remaining_needed > 0 do - # Get additional videos from database - db_videos = Media.get_videos_needing_analysis(remaining_needed) - all_videos = queue_videos ++ db_videos - {all_videos, new_state} - else - {queue_videos, new_state} - end - end - - defp take_from_queue(queue, max_count) do - take_from_queue(queue, max_count, []) - end - - defp take_from_queue(queue, 0, acc) do - {Enum.reverse(acc), queue} - end - - defp take_from_queue(queue, count, acc) when count > 0 do - case :queue.out(queue) do - {{:value, video}, remaining_queue} -> - take_from_queue(remaining_queue, count - 1, [video | acc]) - - {:empty, _queue} -> - {Enum.reverse(acc), queue} - end - end - # Debug helper function to check video states defp debug_video_states(videos) do Enum.each(videos, fn video_info -> diff --git a/lib/reencodarr/analyzer/core/concurrency_manager.ex b/lib/reencodarr/analyzer/core/concurrency_manager.ex index 9baf5fa8..81c9bbad 100644 --- a/lib/reencodarr/analyzer/core/concurrency_manager.ex +++ b/lib/reencodarr/analyzer/core/concurrency_manager.ex @@ -145,7 +145,10 @@ defmodule Reencodarr.Analyzer.Core.ConcurrencyManager do end defp get_storage_performance_tier do - PerformanceMonitor.get_storage_performance_tier() + case Process.whereis(PerformanceMonitor) do + nil -> :unknown + _pid -> PerformanceMonitor.get_storage_performance_tier() + end end defp get_min_concurrency do diff --git a/lib/reencodarr/analyzer/queue_manager.ex b/lib/reencodarr/analyzer/queue_manager.ex deleted file mode 100644 index 57376747..00000000 --- a/lib/reencodarr/analyzer/queue_manager.ex +++ /dev/null @@ -1,100 +0,0 @@ -defmodule Reencodarr.Analyzer.QueueManager do - @moduledoc """ - Manages analyzer queue state and broadcasts updates. - - This GenServer subscribes to analyzer queue events and maintains - the current queue state in memory, providing fast access for - dashboard and statistics without polling Broadway directly. - - Follows idiomatic OTP patterns with proper state management - and PubSub for decoupled communication. - """ - - use GenServer - require Logger - - alias Reencodarr.Dashboard.Events - - @queue_topic "analyzer_queue" - - defstruct queue: [], count: 0 - - ## Client API - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Get the current analyzer queue for dashboard display. 
- """ - def get_queue do - case GenServer.whereis(__MODULE__) do - nil -> - {:error, :not_started} - - pid when is_pid(pid) -> - if Process.alive?(pid) do - {:ok, GenServer.call(__MODULE__, :get_queue)} - else - {:error, :not_alive} - end - - _ -> - {:error, :not_available} - end - end - - @doc """ - Get the current queue count. - """ - def get_count do - GenServer.call(__MODULE__, :get_count) - end - - @doc """ - Broadcast a queue update (called by Broadway producer). - """ - def broadcast_queue_update(queue_items) do - Events.broadcast_event( - :analyzer_queue_updated, - %{queue_items: queue_items} - ) - end - - ## Server Implementation - - @impl GenServer - def init(_opts) do - # Subscribe to queue updates - Phoenix.PubSub.subscribe(Reencodarr.PubSub, @queue_topic) - - Logger.info("Analyzer QueueManager started") - {:ok, %__MODULE__{}} - end - - @impl GenServer - def handle_call(:get_queue, _from, state) do - {:reply, state.queue, state} - end - - @impl GenServer - def handle_call(:get_count, _from, state) do - {:reply, state.count, state} - end - - @impl GenServer - def handle_info({:analyzer_queue_updated, queue_items}, _state) do - new_state = %{ - queue: queue_items, - count: length(queue_items) - } - - {:noreply, new_state} - end - - @impl GenServer - def handle_info(_msg, state) do - {:noreply, state} - end -end diff --git a/lib/reencodarr/analyzer/supervisor.ex b/lib/reencodarr/analyzer/supervisor.ex index 6d642831..bc4e1a51 100644 --- a/lib/reencodarr/analyzer/supervisor.ex +++ b/lib/reencodarr/analyzer/supervisor.ex @@ -10,9 +10,6 @@ defmodule Reencodarr.Analyzer.Supervisor do @impl true def init(:ok) do children = [ - # Start the QueueManager to track analyzer queue state - {Reencodarr.Analyzer.QueueManager, []}, - # Start the Broadway pipeline {Reencodarr.Analyzer.Broadway, []} ] diff --git a/lib/reencodarr/broadway_config.ex b/lib/reencodarr/broadway_config.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/crf_searcher.ex b/lib/reencodarr/crf_searcher.ex index d16131f4..dc06fd80 100644 --- a/lib/reencodarr/crf_searcher.ex +++ b/lib/reencodarr/crf_searcher.ex @@ -59,15 +59,4 @@ defmodule Reencodarr.CrfSearcher do @doc "Get next videos in the CRF search queue" def next_videos(limit \\ 10), do: Media.get_videos_for_crf_search(limit) - - # Debug functions - - @doc "Get detailed CRF searcher debug information" - def debug_info do - %{ - status: status(), - next_videos: next_videos(5), - genserver_available: available?() - } - end end diff --git a/lib/reencodarr/crf_searcher/broadway.ex b/lib/reencodarr/crf_searcher/broadway.ex index bf72b676..acc52a85 100644 --- a/lib/reencodarr/crf_searcher/broadway.ex +++ b/lib/reencodarr/crf_searcher/broadway.ex @@ -61,6 +61,7 @@ defmodule Reencodarr.CrfSearcher.Broadway do module: {Producer, []}, transformer: {__MODULE__, :transform, []}, rate_limiting: [ + # Use normal rate limiting - pause/resume controlled by producer state allowed_messages: config[:rate_limit_messages], interval: config[:rate_limit_interval] ] @@ -78,7 +79,11 @@ defmodule Reencodarr.CrfSearcher.Broadway do concurrency: 1 ] ], - context: %{crf_quality: config[:crf_quality]} + context: %{ + crf_quality: config[:crf_quality], + rate_limit_messages: config[:rate_limit_messages], + rate_limit_interval: config[:rate_limit_interval] + } ) end @@ -121,6 +126,8 @@ defmodule Reencodarr.CrfSearcher.Broadway do @doc """ Pause the CRF searcher pipeline. + Pauses processing by updating the producer's state machine. 
+ ## Examples iex> Reencodarr.CrfSearcher.Broadway.pause() :ok @@ -133,6 +140,8 @@ defmodule Reencodarr.CrfSearcher.Broadway do @doc """ Resume the CRF searcher pipeline. + Resumes processing by updating the producer's state machine. + ## Examples iex> Reencodarr.CrfSearcher.Broadway.resume() :ok @@ -194,25 +203,12 @@ defmodule Reencodarr.CrfSearcher.Broadway do defp process_video_crf_search(video, crf_quality) do Logger.debug("Starting CRF search for video #{video.id}: #{video.path}") - # Emit telemetry event for monitoring - :telemetry.execute( - [:reencodarr, :crf_search, :start], - %{}, - %{video_id: video.id, video_path: video.path} - ) - # AbAv1.crf_search/2 always returns :ok since it's a GenServer.cast # The actual success/failure is handled by the GenServer :ok = AbAv1.crf_search(video, crf_quality) Logger.debug("CRF search queued successfully for video #{video.id}") - :telemetry.execute( - [:reencodarr, :crf_search, :success], - %{}, - %{video_id: video.id} - ) - :ok rescue exception -> @@ -221,12 +217,6 @@ defmodule Reencodarr.CrfSearcher.Broadway do Logger.error(error_message) - :telemetry.execute( - [:reencodarr, :crf_search, :exception], - %{}, - %{video_id: video.id, exception: exception} - ) - {:error, error_message} end end diff --git a/lib/reencodarr/crf_searcher/broadway/producer.ex b/lib/reencodarr/crf_searcher/broadway/producer.ex index f0dd7969..ec841a71 100644 --- a/lib/reencodarr/crf_searcher/broadway/producer.ex +++ b/lib/reencodarr/crf_searcher/broadway/producer.ex @@ -40,14 +40,6 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do end end - def get_producer_state do - # Get the current producer state for debugging - case Broadway.producer_names(Reencodarr.CrfSearcher.Broadway) do - [producer_name | _] -> GenServer.call(producer_name, :get_debug_state, 5000) - [] -> {:error, :not_running} - end - end - @impl GenStage def init(_opts) do # Subscribe to video state transitions for videos that finished analysis @@ -56,9 +48,6 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do # Initialize pipeline state machine pipeline = PipelineStateMachine.new(:crf_searcher) - # Send a delayed message to trigger initial telemetry emission - Process.send_after(self(), :initial_telemetry, 1000) - {:producer, %{ demand: 0, @@ -68,8 +57,18 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @impl GenStage def handle_demand(demand, state) when demand > 0 do - new_state = %{state | demand: state.demand + demand} - dispatch_if_ready(new_state) + current_status = PipelineStateMachine.get_state(state.pipeline) + + # Only accumulate demand if not currently processing + # CRF search is single-concurrency, so we shouldn't accept more demand while busy + # But we DO accept demand when paused (to allow resuming) + if current_status == :processing do + Logger.debug("[CRF Searcher Producer] Already processing, ignoring demand: #{demand}") + {:noreply, [], state} + else + new_state = %{state | demand: state.demand + demand} + dispatch_if_ready(new_state) + end end @impl GenStage @@ -86,20 +85,6 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do {:reply, actively_running, [], state} end - @impl GenStage - def handle_call(:get_debug_state, _from, state) do - debug_info = %{ - demand: state.demand, - pipeline_state: PipelineStateMachine.get_state(state.pipeline), - pipeline_running: PipelineStateMachine.running?(state.pipeline), - crf_search_available: crf_search_available?(), - should_dispatch: should_dispatch?(state), - queue_count: length(Media.get_videos_for_crf_search(10)) - 
} - - {:reply, debug_info, [], state} - end - @impl GenStage def handle_cast({:status_request, requester_pid}, state) do current_state = PipelineStateMachine.get_state(state.pipeline) @@ -109,14 +94,28 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do @impl GenStage def handle_cast(:broadcast_status, state) do - p = state.pipeline - Events.pipeline_state_changed(p.service, p.current_state, p.current_state) + # Broadcast actual current status to dashboard + current_state = PipelineStateMachine.get_state(state.pipeline) + + # Map pipeline state to dashboard status + status = + case current_state do + :processing -> :processing + :paused -> :paused + :running -> :running + _ -> :stopped + end + + # Broadcast as service_status event with the actual state + Events.broadcast_event(:service_status, %{service: :crf_searcher, status: status}) + {:noreply, [], state} end @impl GenStage def handle_cast(:pause, state) do - {:noreply, [], Map.update!(state, :pipeline, &PipelineStateMachine.pause/1)} + new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) + {:noreply, [], new_state} end @impl GenStage @@ -169,50 +168,26 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do # CRF search completed - transition state appropriately current_state = PipelineStateMachine.get_state(state.pipeline) - updated_state = - case current_state do - :processing -> - new_pipeline = - PipelineStateMachine.work_completed(state.pipeline, crf_search_available?()) - - %{state | pipeline: new_pipeline} - - :pausing -> - new_pipeline = PipelineStateMachine.work_completed(state.pipeline, false) - %{state | pipeline: new_pipeline} - - _ -> - state - end - - # Refresh queue telemetry and check for more work - emit_initial_telemetry(updated_state) - dispatch_if_ready(updated_state) - {:noreply, [], updated_state} - end - - @impl GenStage - def handle_info(:initial_telemetry, state) do - # Emit initial telemetry on startup to populate dashboard queue - Logger.debug("🔍 CRF Searcher: Emitting initial telemetry") - emit_initial_telemetry(state) - - # Schedule periodic telemetry updates like the analyzer does - Process.send_after(self(), :periodic_telemetry, 5000) + case current_state do + :processing -> + # Work completed while running - check for more work + new_pipeline = + PipelineStateMachine.work_completed(state.pipeline, crf_search_available?()) - {:noreply, [], state} - end + updated_state = %{state | pipeline: new_pipeline} + dispatch_if_ready(updated_state) + {:noreply, [], updated_state} - @impl GenStage - def handle_info(:periodic_telemetry, state) do - # Emit periodic telemetry to keep dashboard updated - Logger.debug("🔍 CRF Searcher: Emitting periodic telemetry") - emit_initial_telemetry(state) - - # Schedule next update - Process.send_after(self(), :periodic_telemetry, 5000) + :pausing -> + # Work completed while pausing - transition to paused and stop + new_pipeline = PipelineStateMachine.work_completed(state.pipeline, false) + updated_state = %{state | pipeline: new_pipeline} + {:noreply, [], updated_state} - {:noreply, [], state} + _ -> + # Already paused or other state - just acknowledge completion + {:noreply, [], state} + end end @impl GenStage @@ -283,24 +258,6 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do new_pipeline = PipelineStateMachine.start_processing(state.pipeline) updated_state = %{state | demand: state.demand - 1, pipeline: new_pipeline} - # Get remaining videos for queue state update - remaining_videos = Media.get_videos_for_crf_search(10) - 
total_count = Media.count_videos_for_crf_search() - - # Emit telemetry event for queue state change - :telemetry.execute( - [:reencodarr, :crf_searcher, :queue_changed], - %{ - dispatched_count: 1, - remaining_demand: updated_state.demand, - queue_size: total_count - }, - %{ - next_videos: remaining_videos, - database_queue_available: total_count > 0 - } - ) - {:noreply, [video], updated_state} end else @@ -308,70 +265,31 @@ defmodule Reencodarr.CrfSearcher.Broadway.Producer do end end - # Emit initial telemetry on startup to populate dashboard queues - defp emit_initial_telemetry(_state) do - # Get 10 for dashboard display - next_videos = get_next_videos_for_telemetry(10) - # Get total count for accurate queue size - total_count = Media.count_videos_for_crf_search() - - Logger.debug( - "🔍 CRF Searcher: Emitting telemetry - #{total_count} videos, #{length(next_videos)} in next batch" - ) - - measurements = %{ - queue_size: total_count - } - - metadata = %{ - producer_type: :crf_searcher, - # For backward compatibility - next_video: List.first(next_videos), - # Full list for dashboard - next_videos: next_videos - } - - :telemetry.execute([:reencodarr, :crf_searcher, :queue_changed], measurements, metadata) - end - - # Get multiple next videos for dashboard display - defp get_next_videos_for_telemetry(limit) do - # Get videos from database for dashboard display - Media.get_videos_for_crf_search(limit) - end - # Helper function to force dispatch when CRF searcher is running defp force_dispatch_if_running(state) do current_state = PipelineStateMachine.get_state(state.pipeline) - case {PipelineStateMachine.available_for_work?(current_state), crf_search_available?()} do - {false, _} -> + cond do + not PipelineStateMachine.available_for_work?(current_state) -> Logger.debug( "[CRF Searcher Producer] Force dispatch - status: #{current_state}, not available for work, skipping dispatch" ) {:noreply, [], state} - {true, false} -> - Logger.debug("[CRF Searcher Producer] Force dispatch - status: #{current_state}") + not crf_search_available?() -> Logger.debug("[CRF Searcher Producer] GenServer not available, skipping dispatch") {:noreply, [], state} - {true, true} -> - Logger.debug("[CRF Searcher Producer] Force dispatch - status: #{current_state}") - Logger.debug("[CRF Searcher Producer] GenServer available, getting videos...") - - case Media.get_videos_for_crf_search(1) do - [] -> - {:noreply, [], state} + Media.get_videos_for_crf_search(1) == [] -> + {:noreply, [], state} - videos -> - Logger.debug( - "[CRF Searcher Producer] Force dispatching video to wake up idle Broadway pipeline" - ) + true -> + Logger.debug( + "[CRF Searcher Producer] Force dispatching video to wake up idle Broadway pipeline" + ) - {:noreply, videos, state} - end + dispatch_if_ready(state) end end end diff --git a/lib/reencodarr/dashboard/events.ex b/lib/reencodarr/dashboard/events.ex index 15b0a969..74a2fa30 100644 --- a/lib/reencodarr/dashboard/events.ex +++ b/lib/reencodarr/dashboard/events.ex @@ -22,18 +22,14 @@ defmodule Reencodarr.Dashboard.Events do end @doc """ - Broadcast a pipeline state change to all interested parties. - - Notifies the dashboard UI and other services about the state change. + Pipeline state change tracking with internal service broadcasts only. + Dashboard UI broadcasts removed - LiveView handles state via direct progress events. 
""" @spec pipeline_state_changed(service(), pipeline_state(), pipeline_state()) :: {:ok, pipeline_state()} def pipeline_state_changed(service, _from_state, to_state) when service in [:analyzer, :crf_searcher, :encoder] do - # Dashboard UI events - let the dashboard handle the mapping - broadcast_event({service, to_state}, %{}) - - # Internal service PubSub (for service-to-service communication) + # Only internal service PubSub (for service-to-service communication and tests) Phoenix.PubSub.broadcast(Reencodarr.PubSub, Atom.to_string(service), {service, to_state}) {:ok, to_state} diff --git a/lib/reencodarr/encoder.ex b/lib/reencodarr/encoder.ex index c1a8a512..d11db47d 100644 --- a/lib/reencodarr/encoder.ex +++ b/lib/reencodarr/encoder.ex @@ -56,15 +56,4 @@ defmodule Reencodarr.Encoder do @doc "Get next videos in the encoding queue" def next_videos(limit \\ 10), do: Media.get_next_for_encoding(limit) - - # Debug functions - - @doc "Get detailed encoder debug information" - def debug_info do - %{ - status: status(), - next_videos: next_videos(5), - genserver_available: available?() - } - end end diff --git a/lib/reencodarr/encoder/broadway.ex b/lib/reencodarr/encoder/broadway.ex index aab5c52c..2c6c48c1 100644 --- a/lib/reencodarr/encoder/broadway.ex +++ b/lib/reencodarr/encoder/broadway.ex @@ -67,6 +67,7 @@ defmodule Reencodarr.Encoder.Broadway do module: {Producer, []}, transformer: {__MODULE__, :transform, []}, rate_limiting: [ + # Use normal rate limiting - pause/resume controlled by producer state allowed_messages: config[:rate_limit_messages], interval: config[:rate_limit_interval] ] @@ -78,7 +79,9 @@ defmodule Reencodarr.Encoder.Broadway do ] ], context: %{ - encoding_timeout: config[:encoding_timeout] + encoding_timeout: config[:encoding_timeout], + rate_limit_messages: config[:rate_limit_messages], + rate_limit_interval: config[:rate_limit_interval] } ) end @@ -122,6 +125,8 @@ defmodule Reencodarr.Encoder.Broadway do @doc """ Pause the encoder pipeline. + Pauses processing by updating the producer's state machine. + ## Examples iex> Reencodarr.Encoder.Broadway.pause() :ok @@ -134,6 +139,8 @@ defmodule Reencodarr.Encoder.Broadway do @doc """ Resume the encoder pipeline. + Resumes processing by updating the producer's state machine. 
+ ## Examples iex> Reencodarr.Encoder.Broadway.resume() :ok diff --git a/lib/reencodarr/encoder/broadway/producer.ex b/lib/reencodarr/encoder/broadway/producer.ex index a088d62b..e146c0cb 100644 --- a/lib/reencodarr/encoder/broadway/producer.ex +++ b/lib/reencodarr/encoder/broadway/producer.ex @@ -55,9 +55,6 @@ defmodule Reencodarr.Encoder.Broadway.Producer do # Subscribe to dashboard events to know when encoding completes Phoenix.PubSub.subscribe(Reencodarr.PubSub, Events.channel()) - # Send a delayed message to broadcast initial queue state - Process.send_after(self(), :initial_queue_broadcast, 1000) - {:producer, %{ demand: 0, @@ -71,15 +68,16 @@ defmodule Reencodarr.Encoder.Broadway.Producer do "Producer: handle_demand called - new demand: #{demand}, current demand: #{state.demand}, total: #{state.demand + demand}" ) - new_state = %{state | demand: state.demand + demand} - # Only dispatch if we're not already processing something + # Only accumulate demand if not currently processing + # Encoder is single-concurrency, so we shouldn't accept more demand while busy current_status = PipelineStateMachine.get_state(state.pipeline) if current_status == :processing do - # If we're already processing, just store the demand for later - Logger.debug("Producer: handle_demand - currently processing, storing demand for later") - {:noreply, [], new_state} + # If we're already processing, ignore the demand + Logger.debug("Producer: handle_demand - currently processing, ignoring demand") + {:noreply, [], state} else + new_state = %{state | demand: state.demand + demand} Logger.debug("Producer: handle_demand - not processing, calling dispatch_if_ready") dispatch_if_ready(new_state) end @@ -114,14 +112,28 @@ defmodule Reencodarr.Encoder.Broadway.Producer do @impl GenStage def handle_cast(:broadcast_status, state) do - p = state.pipeline - Events.pipeline_state_changed(p.service, p.current_state, p.current_state) + # Broadcast actual current status to dashboard + current_state = PipelineStateMachine.get_state(state.pipeline) + + # Map pipeline state to dashboard status + status = + case current_state do + :processing -> :processing + :paused -> :paused + :running -> :running + _ -> :stopped + end + + # Broadcast as service_status event with the actual state + Events.broadcast_event(:service_status, %{service: :encoder, status: status}) + {:noreply, [], state} end @impl GenStage def handle_cast(:pause, state) do - {:noreply, [], Map.update!(state, :pipeline, &PipelineStateMachine.pause/1)} + new_state = Map.update!(state, :pipeline, &PipelineStateMachine.handle_pause_request/1) + {:noreply, [], new_state} end @impl GenStage @@ -168,6 +180,12 @@ defmodule Reencodarr.Encoder.Broadway.Producer do dispatch_if_ready(state) end + @impl GenStage + def handle_info({:crf_search_vmaf_result, _data}, state) do + # CRF search results don't affect encoder - ignore + {:noreply, [], state} + end + @impl GenStage def handle_info({:encoding_completed, %{vmaf_id: vmaf_id, result: result} = event_data}, state) do # Encoding completed (success or failure), transition back to running @@ -181,27 +199,34 @@ defmodule Reencodarr.Encoder.Broadway.Producer do "[Encoder Producer] Current state before transition - status: #{current_status}, demand: #{state.demand}" ) - # Use struct API to handle work completion - it will transition to appropriate state - updated_pipeline = - PipelineStateMachine.work_completed(state.pipeline, Media.encoding_queue_count() > 0) + case current_status do + :processing -> + # Work completed while running 
- check for more work + updated_pipeline = + PipelineStateMachine.work_completed(state.pipeline, Media.encoding_queue_count() > 0) - new_state = %{state | pipeline: updated_pipeline} + new_state = %{state | pipeline: updated_pipeline} + new_status = PipelineStateMachine.get_state(updated_pipeline) - new_status = PipelineStateMachine.get_state(updated_pipeline) + Logger.debug( + "[Encoder Producer] State after transition - status: #{new_status}, demand: #{new_state.demand}" + ) - Logger.debug( - "[Encoder Producer] State after transition - status: #{new_status}, demand: #{new_state.demand}" - ) + dispatch_if_ready(new_state) - # Always dispatch when encoding completes - this ensures we check for next work - dispatch_if_ready(new_state) - end + :pausing -> + # Work completed while pausing - transition to paused and stop + updated_pipeline = PipelineStateMachine.work_completed(state.pipeline, false) + new_state = %{state | pipeline: updated_pipeline} - @impl GenStage - def handle_info(:initial_queue_broadcast, state) do - # Broadcast initial queue state so UI shows correct count on startup - broadcast_queue_state() - {:noreply, [], state} + Logger.debug("[Encoder Producer] Pausing complete, now paused") + {:noreply, [], new_state} + + _ -> + # Already paused or other state - just acknowledge completion + Logger.debug("[Encoder Producer] Encoding completed in state #{current_status}, ignoring") + {:noreply, [], state} + end end @impl GenStage @@ -324,9 +349,6 @@ defmodule Reencodarr.Encoder.Broadway.Producer do case Media.get_next_for_encoding(1) do # Handle case where a single VMAF is returned %Reencodarr.Media.Vmaf{} = vmaf -> - # Emit queue state update when dispatching - broadcast_queue_state() - Logger.debug( "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, keeping demand: #{state.demand}" ) @@ -336,9 +358,6 @@ defmodule Reencodarr.Encoder.Broadway.Producer do # Handle case where a list is returned [vmaf | _] -> - # Emit queue state update when dispatching - broadcast_queue_state() - Logger.debug( "Producer: dispatch_vmafs - dispatching VMAF #{vmaf.id}, keeping demand: #{state.demand}" ) @@ -348,8 +367,7 @@ defmodule Reencodarr.Encoder.Broadway.Producer do # Handle case where empty list or nil is returned _ -> - # No VMAF available, emit queue state and transition to appropriate state - broadcast_queue_state() + # No VMAF available, transition to appropriate state final_pipeline = PipelineStateMachine.transition_to(updated_state.pipeline, :idle) final_state = %{updated_state | pipeline: final_pipeline} {:noreply, [], final_state} @@ -374,53 +392,12 @@ defmodule Reencodarr.Encoder.Broadway.Producer do defp force_dispatch_if_running(state) do current_status = PipelineStateMachine.get_state(state.pipeline) - if PipelineStateMachine.available_for_work?(current_status) do - videos = Media.get_next_for_encoding(1) - - if length(videos) > 0 do - Logger.debug( - "[Encoder Producer] Force dispatching video to wake up idle Broadway pipeline" - ) + if PipelineStateMachine.available_for_work?(current_status) and get_next_vmaf_preview() != nil do + Logger.debug("[Encoder Producer] Force dispatching VMAF to wake up idle Broadway pipeline") - {:noreply, videos, state} - else - {:noreply, [], state} - end + dispatch_if_ready(state) else - Logger.debug( - "[Encoder Producer] Force dispatch - status: #{current_status}, not available for work, skipping dispatch" - ) - {:noreply, [], state} end end - - # Broadcast current queue state for UI updates - defp broadcast_queue_state do - # Get next VMAFs 
for UI display - next_vmafs = Media.list_videos_by_estimated_percent(10) - - # Format for UI display - formatted_vmafs = - Enum.map(next_vmafs, fn vmaf -> - %{ - path: vmaf.video.path, - crf: vmaf.crf, - vmaf: vmaf.score, - savings: vmaf.savings, - size: vmaf.size - } - end) - - # Emit telemetry event that the UI expects - measurements = %{ - queue_size: length(next_vmafs) - } - - metadata = %{ - next_vmafs: formatted_vmafs - } - - :telemetry.execute([:reencodarr, :encoder, :queue_changed], measurements, metadata) - end end diff --git a/lib/reencodarr/error_helpers.ex b/lib/reencodarr/error_helpers.ex index acaf92d3..03252c06 100644 --- a/lib/reencodarr/error_helpers.ex +++ b/lib/reencodarr/error_helpers.ex @@ -23,110 +23,6 @@ defmodule Reencodarr.ErrorHelpers do {:error, reason} end - @doc """ - Handles result tuples with automatic logging on errors. - - ## Examples - - iex> handle_result_with_logging({:ok, data}, &process_data/1, "Data processing") - processed_data - - iex> handle_result_with_logging({:error, :timeout}, &process_data/1, "Data processing") - {:error, :timeout} - - """ - def handle_result_with_logging(result, success_fn, context \\ "") do - case result do - {:ok, value} -> - success_fn.(value) - - {:error, reason} -> - log_and_return_error(reason, context) - - other -> - log_and_return_error({:unexpected_result, other}, context) - end - end - - @doc """ - Handles results with custom error processing. - - ## Examples - - iex> handle_result({:ok, data}, &process_data/1, &handle_error/1) - processed_data - - """ - def handle_result(result, success_fn, error_fn) do - case result do - {:ok, value} -> success_fn.(value) - {:error, reason} -> error_fn.(reason) - other -> error_fn.({:unexpected_result, other}) - end - end - - @doc """ - Wraps a function call with error logging. - - ## Examples - - iex> with_error_logging(fn -> risky_operation() end, "Risky operation") - result_or_logged_error - - """ - def with_error_logging(func, context \\ "") do - func.() - rescue - e -> log_and_return_error({:exception, Exception.message(e)}, context) - catch - :exit, reason -> log_and_return_error({:exit, reason}, context) - :throw, value -> log_and_return_error({:throw, value}, context) - end - - @doc """ - Logs debug information for successful operations. - - ## Examples - - iex> log_success("User created", %{id: 1}) - :ok - - """ - def log_success(message, data \\ nil) do - log_message = if data, do: "#{message}: #{inspect(data)}", else: message - Logger.debug(log_message) - :ok - end - - @doc """ - Standard pattern for handling service API responses. - - ## Examples - - iex> handle_api_response({:ok, %{status: 200, body: data}}, "User fetch") - {:ok, data} - - iex> handle_api_response({:error, reason}, "User fetch") - {:error, reason} - - """ - def handle_api_response(response, context \\ "API call") do - case response do - {:ok, %{status: status, body: body}} when status in 200..299 -> - log_success("#{context} succeeded") - {:ok, body} - - {:ok, %{status: status, body: body}} -> - log_and_return_error({:http_error, status, body}, context) - - {:error, reason} -> - log_and_return_error(reason, context) - - other -> - log_and_return_error({:unexpected_response, other}, context) - end - end - @doc """ Creates a standardized error for missing configuration. 
""" diff --git a/lib/reencodarr/media.ex b/lib/reencodarr/media.ex index 597c2abd..0339c61d 100644 --- a/lib/reencodarr/media.ex +++ b/lib/reencodarr/media.ex @@ -6,7 +6,6 @@ defmodule Reencodarr.Media do warn: false alias Reencodarr.Analyzer.Broadway, as: AnalyzerBroadway - alias Reencodarr.Analyzer.QueueManager alias Reencodarr.Core.Parsers alias Reencodarr.Media.{ @@ -453,10 +452,6 @@ defmodule Reencodarr.Media do end end - def most_recent_video_update, do: Repo.one(from v in Video, select: max(v.updated_at)) - def get_most_recent_inserted_at, do: Repo.one(from v in Video, select: max(v.inserted_at)) - def video_has_vmafs?(%Video{id: id}), do: Repo.exists?(from v in Vmaf, where: v.video_id == ^id) - # Consolidated shared logic for video deletion defp delete_videos_by_ids(video_ids) do Repo.transaction(fn -> @@ -715,21 +710,6 @@ defmodule Reencodarr.Media do # --- Queue helpers --- - # Manual analyzer queue items from QueueManager - defp get_manual_analyzer_items do - case QueueManager.get_queue() do - {:ok, queue} -> queue - {:error, _} -> [] - end - end - - defp count_manual_analyzer_items do - case QueueManager.get_queue() do - {:ok, queue} -> length(queue) - {:error, _} -> 0 - end - end - def get_next_for_encoding_by_time do result = Repo.one( @@ -891,20 +871,6 @@ defmodule Reencodarr.Media do |> Repo.update_all([]) end - # --- Debug helpers --- - - @doc """ - Debug function to check the analyzer state and queue status. - """ - def debug_analyzer_status do - %{ - analyzer_running: AnalyzerBroadway.running?(), - videos_needing_analysis: get_videos_needing_analysis(5), - manual_queue: get_manual_analyzer_items(), - total_analyzer_queue_count: count_videos_needing_analysis() + count_manual_analyzer_items() - } - end - @doc """ Force trigger analysis of a specific video for debugging. """ @@ -963,282 +929,6 @@ defmodule Reencodarr.Media do end) end - @doc """ - Explains where a specific video path is located in the system and which queues it belongs to. - - Returns a detailed map with information about: - - Database state (analyzed, has VMAF, ready for encoding, etc.) 
- - Current queue memberships (analyzer, CRF searcher, encoder) - - Processing status and next steps - - Error states if any - - ## Examples - - iex> Reencodarr.Media.explain_path_location("/path/to/video.mkv") - %{ - path: "/path/to/video.mkv", - exists_in_db: true, - database_state: %{ - analyzed: true, - has_vmaf: true, - ready_for_encoding: true, - state: :crf_searched - }, - queue_memberships: %{ - analyzer_broadway: false, - analyzer_manual: false, - crf_searcher_broadway: false, - crf_searcher_genserver: false, - encoder_broadway: true, - encoder_genserver: false - }, - next_steps: ["ready for encoding"], - details: %{ - video_id: 123, - library_name: "Movies", - bitrate: 5000, - vmaf_count: 3, - chosen_vmaf: %{crf: 23, percent: 95.2} - } - } - """ - @spec explain_path_location(String.t()) :: %{ - path: String.t(), - exists_in_db: boolean(), - database_state: %{ - analyzed: boolean(), - has_vmaf: boolean(), - ready_for_encoding: boolean(), - encoded: boolean(), - failed: boolean(), - state: atom() - }, - queue_memberships: %{ - analyzer_broadway: boolean(), - analyzer_manual: boolean(), - crf_searcher_broadway: boolean(), - crf_searcher_genserver: boolean(), - encoder_broadway: boolean(), - encoder_genserver: boolean() - }, - next_steps: [String.t()], - details: map() | nil - } - def explain_path_location(path) when is_binary(path) do - case get_video_by_path(path) do - {:error, :not_found} -> - build_not_found_response(path) - - {:ok, video} -> - build_video_response(path, video) - end - end - - # Helper function to build response for paths not in database - defp build_not_found_response(path) do - %{ - path: path, - exists_in_db: false, - database_state: %{ - analyzed: false, - has_vmaf: false, - ready_for_encoding: false, - state: :needs_analysis - }, - queue_memberships: %{ - analyzer_broadway: false, - analyzer_manual: false, - crf_searcher_broadway: false, - crf_searcher_genserver: false, - encoder_broadway: false, - encoder_genserver: false - }, - next_steps: ["not in database - needs to be added"], - details: nil - } - end - - # Helper function to build response for existing videos - defp build_video_response(path, video) do - {has_vmaf, chosen_vmaf} = get_vmaf_info(video) - analyzed = is_integer(video.bitrate) and video.bitrate > 0 - - ready_for_encoding = - match?(%Vmaf{chosen: true}, chosen_vmaf) && video.state not in [:encoded, :failed] - - queue_memberships = build_queue_memberships(path) - next_steps = determine_next_steps(video, analyzed, has_vmaf, ready_for_encoding, chosen_vmaf) - details = build_video_details(video, chosen_vmaf) - - %{ - path: path, - exists_in_db: true, - database_state: %{ - analyzed: analyzed, - has_vmaf: has_vmaf, - ready_for_encoding: ready_for_encoding, - encoded: video.state == :encoded, - failed: video.state == :failed, - state: video.state - }, - queue_memberships: queue_memberships, - next_steps: next_steps, - details: details - } - end - - # Helper function to get VMAF information - defp get_vmaf_info(video) do - has_vmaf = Repo.exists?(from v in Vmaf, where: v.video_id == ^video.id) - - chosen_vmaf = - if has_vmaf do - Repo.one( - from v in Vmaf, - where: v.video_id == ^video.id and v.chosen == true, - preload: [:video] - ) - else - nil - end - - {has_vmaf, chosen_vmaf} - end - - # Helper function to build queue memberships - defp build_queue_memberships(path) do - %{ - analyzer_broadway: path_in_analyzer_broadway?(path), - analyzer_manual: path_in_analyzer_manual?(path), - crf_searcher_broadway: path_in_crf_searcher_broadway?(path), - 
crf_searcher_genserver: path_in_crf_searcher_genserver?(path), - encoder_broadway: path_in_encoder_broadway?(path), - encoder_genserver: path_in_encoder_genserver?(path) - } - end - - # Helper function to build video details - defp build_video_details(video, chosen_vmaf) do - library = video.library_id && Repo.get(Library, video.library_id) - - %{ - video_id: video.id, - library_name: library && library.name, - bitrate: video.bitrate, - vmaf_count: Repo.aggregate(from(v in Vmaf, where: v.video_id == ^video.id), :count, :id), - chosen_vmaf: chosen_vmaf && %{crf: chosen_vmaf.crf, percent: chosen_vmaf.percent}, - video_codecs: video.video_codecs, - audio_codecs: video.audio_codecs, - size: video.size, - inserted_at: video.inserted_at, - updated_at: video.updated_at - } - end - - # Helper functions to check queue memberships - defp path_in_analyzer_broadway?(_path) do - # The analyzer Broadway producer manages its own queue internally - # We can't easily check this without accessing its internal state - # For now, return false as this would require more complex introspection - false - end - - defp path_in_analyzer_manual?(path) do - # Check the QueueManager's manual queue - case QueueManager.get_queue() do - {:ok, manual_queue} -> queue_contains_path?(manual_queue, path) - {:error, _} -> false - end - end - - defp queue_contains_path?(manual_queue, path) do - Enum.any?(manual_queue, fn item -> - case item do - %{path: item_path} -> String.downcase(item_path) == String.downcase(path) - _ -> false - end - end) - end - - defp path_in_crf_searcher_broadway?(_path) do - # Similar to analyzer Broadway, internal state not easily accessible - false - end - - defp path_in_crf_searcher_genserver?(_path) do - # Would need to inspect GenServer message queue, which is not practical - false - end - - defp path_in_encoder_broadway?(_path) do - # Similar to other Broadway producers - false - end - - defp path_in_encoder_genserver?(_path) do - # Would need to inspect GenServer message queue, which is not practical - false - end - - defp determine_next_steps(video, analyzed, has_vmaf, ready_for_encoding, chosen_vmaf) do - determine_video_status(video, analyzed, has_vmaf, ready_for_encoding, chosen_vmaf) - end - - defp determine_video_status(video, _analyzed, _has_vmaf, _ready_for_encoding, _chosen_vmaf) - when video.state == :failed do - ["marked as failed - manual intervention needed"] - end - - defp determine_video_status(video, _analyzed, _has_vmaf, _ready_for_encoding, _chosen_vmaf) - when video.state == :encoded do - ["already encoded - processing complete"] - end - - defp determine_video_status(_video, _analyzed, _has_vmaf, true, chosen_vmaf) do - ["ready for encoding with CRF #{chosen_vmaf.crf}"] - end - - defp determine_video_status(_video, _analyzed, true, _ready_for_encoding, nil) do - ["has VMAF results but none chosen - needs manual selection"] - end - - defp determine_video_status(video, true, false, _ready_for_encoding, _chosen_vmaf) do - determine_analyzed_video_steps(video) - end - - defp determine_video_status(_video, false, _has_vmaf, _ready_for_encoding, _chosen_vmaf) do - ["needs analysis - should be in analyzer queue"] - end - - defp determine_video_status(_video, _analyzed, _has_vmaf, _ready_for_encoding, _chosen_vmaf) do - ["unknown state - check manually"] - end - - defp determine_analyzed_video_steps(video) do - cond do - has_av1_codec?(video) -> - ["already AV1 encoded - no CRF search needed"] - - has_opus_codec?(video) -> - ["has Opus audio - skipped from CRF search queue"] - - true 
-> - ["analyzed but needs CRF search"] - end - end - - defp has_av1_codec?(video) do - Enum.any?(video.video_codecs || [], fn codec -> - String.downcase(codec) |> String.contains?("av1") - end) - end - - defp has_opus_codec?(video) do - Enum.any?(video.audio_codecs || [], fn codec -> - String.downcase(codec) |> String.contains?("opus") - end) - end - @doc """ Diagnostic function to test inserting a video path and report exactly what happened. @@ -1440,85 +1130,4 @@ defmodule Reencodarr.Media do defp log_test_result_details(%{success: false, errors: errors}) do Logger.warning(" Errors: #{Enum.join(errors, ", ")}") end - - # === Missing function implementations for backward compatibility === - - @doc """ - Check if parameters contain preset 6 settings. - """ - def has_preset_6_params?(params) do - case params do - params_list when is_list(params_list) -> - # Check for adjacent --preset and 6 in the list - check_for_preset_6_in_list(params_list) - - _ -> - false - end - end - - # Helper function to check for --preset 6 in parameter list - defp check_for_preset_6_in_list([]), do: false - defp check_for_preset_6_in_list([_]), do: false - defp check_for_preset_6_in_list(["--preset", "6" | _]), do: true - defp check_for_preset_6_in_list([_ | rest]), do: check_for_preset_6_in_list(rest) - - @doc """ - Upserts a VMAF record for CRF search operations. - Delegates to standard upsert_vmaf with additional context. - """ - def upsert_crf_search_vmaf(params, video, args) do - # Add context information for CRF search - enhanced_params = - Map.merge(params, %{ - "video_id" => video.id, - "params" => args - }) - - upsert_vmaf(enhanced_params) - end - - @doc """ - Get VMAF record by video ID and CRF value. - """ - def get_vmaf_by_crf(video_id, crf_str) do - case Parsers.parse_float_exact(to_string(crf_str)) do - {:ok, crf_float} -> - Repo.one(from v in Vmaf, where: v.video_id == ^video_id and v.crf == ^crf_float, limit: 1) - - {:error, _} -> - nil - end - end - - @doc """ - Clear/delete VMAF records for a video. - """ - def clear_vmaf_records(video_id, vmaf_records) when is_list(vmaf_records) do - vmaf_ids = Enum.map(vmaf_records, & &1.id) - - from(v in Vmaf, where: v.video_id == ^video_id and v.id in ^vmaf_ids) - |> Repo.delete_all() - end - - def clear_vmaf_records(video_id, _) do - # If not a list, clear all VMAFs for the video - delete_vmafs_for_video(video_id) - end - - @doc """ - Get VMAF scores for a video as a list of score values. - """ - def get_vmaf_scores_for_video(video_id) do - Repo.all(from v in Vmaf, where: v.video_id == ^video_id, select: v.score) - end - - @doc """ - Check if a VMAF record has preset 6 parameters. - """ - def vmaf_has_preset_6?(%Vmaf{params: params}) do - has_preset_6_params?(params) - end - - def vmaf_has_preset_6?(_), do: false end diff --git a/lib/reencodarr/media/bulk_operations.ex b/lib/reencodarr/media/bulk_operations.ex deleted file mode 100644 index 44df88b7..00000000 --- a/lib/reencodarr/media/bulk_operations.ex +++ /dev/null @@ -1,402 +0,0 @@ -defmodule Reencodarr.Media.BulkOperations do - @moduledoc """ - Handles bulk operations for the Media context. - - Extracted from the main Media module to provide specialized functionality - for batch processing, cleanup operations, and mass data manipulation. 
- """ - - import Ecto.Query - import Reencodarr.Media.SharedQueries, only: [videos_with_no_chosen_vmafs_query: 0] - alias Reencodarr.Media.{SharedQueries, Video, VideoFailure, Vmaf} - alias Reencodarr.Repo - require Logger - - @doc """ - Counts videos that would generate invalid audio encoding arguments (b:a=0k, ac=0). - - Tests each video by calling Rules.build_args/2 and checking if it produces invalid - audio encoding arguments like "--enc b:a=0k" or "--enc ac=0". Useful for monitoring - and deciding whether to run reset_videos_with_invalid_audio_args/0. - - ## Examples - iex> BulkOperations.count_videos_with_invalid_audio_args() - %{videos_tested: 1250, videos_with_invalid_args: 42} - """ - @spec count_videos_with_invalid_audio_args() :: %{ - videos_tested: integer(), - videos_with_invalid_args: integer() - } - def count_videos_with_invalid_audio_args do - # Get all videos that haven't been processed yet - videos_to_test = - from(v in Video, - where: v.state not in [:encoded, :failed], - select: v - ) - |> Repo.all() - - videos_tested_count = length(videos_to_test) - - # Test each video to see if it produces invalid audio args - videos_with_invalid_args_count = - videos_to_test - |> Enum.count(&produces_invalid_audio_args?/1) - - %{ - videos_tested: videos_tested_count, - videos_with_invalid_args: videos_with_invalid_args_count - } - end - - @doc """ - One-liner to reset videos that would generate invalid audio encoding arguments (b:a=0k, ac=0). - - Tests each video by calling Rules.build_args/2 and checking if it produces invalid - audio encoding arguments like "--enc b:a=0k" or "--enc ac=0". Resets analysis - fields and deletes VMAFs for videos that would generate these invalid arguments. - - ## Examples - iex> BulkOperations.reset_videos_with_invalid_audio_args() - %{videos_tested: 1250, videos_reset: 42, vmafs_deleted: 156} - """ - @spec reset_videos_with_invalid_audio_args() :: %{ - videos_tested: integer(), - videos_reset: integer(), - vmafs_deleted: integer() - } - def reset_videos_with_invalid_audio_args do - # Get all videos that haven't been processed yet - videos_to_test = - from(v in Video, - where: v.state not in [:encoded, :failed], - select: v - ) - |> Repo.all() - - videos_tested_count = length(videos_to_test) - - # Test each video to see if it produces invalid audio args - problematic_video_ids = - videos_to_test - |> Enum.filter(&produces_invalid_audio_args?/1) - |> Enum.map(& &1.id) - - videos_reset_count = length(problematic_video_ids) - - if videos_reset_count > 0 do - Repo.transaction(fn -> - # Delete VMAFs for these videos (they were generated with bad audio data) - {vmafs_deleted_count, _} = - from(v in Vmaf, where: v.video_id in ^problematic_video_ids) - |> Repo.delete_all() - - # Reset analysis fields to force re-analysis - from(v in Video, where: v.id in ^problematic_video_ids) - |> Repo.update_all( - set: [ - bitrate: nil, - video_codecs: nil, - audio_codecs: nil, - max_audio_channels: nil, - atmos: nil, - hdr: nil, - width: nil, - height: nil, - frame_rate: nil, - duration: nil, - updated_at: DateTime.utc_now() - ] - ) - - %{ - videos_tested: videos_tested_count, - videos_reset: videos_reset_count, - vmafs_deleted: vmafs_deleted_count - } - end) - |> case do - {:ok, result} -> - result - - {:error, _reason} -> - %{videos_tested: videos_tested_count, videos_reset: 0, vmafs_deleted: 0} - end - else - %{ - videos_tested: videos_tested_count, - videos_reset: 0, - vmafs_deleted: 0 - } - end - end - - @doc """ - One-liner to reset videos with invalid audio 
metadata that would cause 0 bitrate/channels. - - Finds videos where max_audio_channels is nil/0 OR audio_codecs is nil/empty, - resets their analysis fields, and deletes their VMAFs since they're based on bad data. - - ## Examples - iex> BulkOperations.reset_videos_with_invalid_audio_metadata() - %{videos_reset: 42, vmafs_deleted: 156} - """ - @spec reset_videos_with_invalid_audio_metadata() :: %{ - videos_reset: integer(), - vmafs_deleted: integer() - } - def reset_videos_with_invalid_audio_metadata do - Repo.transaction(fn -> - # Find videos with problematic audio metadata that would cause Rules.audio/1 to return [] - # This happens when max_audio_channels is nil/0 OR audio_codecs is nil/empty - problematic_video_ids = - from(v in Video, - where: - v.state not in [:encoded, :failed] and - v.atmos != true and - (is_nil(v.max_audio_channels) or v.max_audio_channels == 0 or - is_nil(v.audio_codecs) or fragment("array_length(?, 1) IS NULL", v.audio_codecs)), - select: v.id - ) - |> Repo.all() - - videos_reset_count = length(problematic_video_ids) - - # Delete VMAFs for these videos (they were generated with bad audio data) - {vmafs_deleted_count, _} = - from(v in Vmaf, where: v.video_id in ^problematic_video_ids) - |> Repo.delete_all() - - # Reset analysis fields to force re-analysis - from(v in Video, where: v.id in ^problematic_video_ids) - |> Repo.update_all( - set: [ - bitrate: nil, - video_codecs: nil, - audio_codecs: nil, - max_audio_channels: nil, - atmos: nil, - hdr: nil, - width: nil, - height: nil, - frame_rate: nil, - duration: nil, - updated_at: DateTime.utc_now() - ] - ) - - %{ - videos_reset: videos_reset_count, - vmafs_deleted: vmafs_deleted_count - } - end) - |> case do - {:ok, result} -> result - {:error, _reason} -> %{videos_reset: 0, vmafs_deleted: 0} - end - end - - @doc """ - Convenience function to reset all failed videos and clear their failure entries. - - This is useful for mass retry scenarios after fixing configuration issues - or updating encoding logic. Clears the `failed` flag on videos, removes all - associated VideoFailure records, and deletes VMAFs for failed videos since - they were likely generated with incorrect data. - - Returns a summary of the operation. 
- """ - @spec reset_all_failures() :: %{ - videos_reset: integer(), - failures_deleted: integer(), - vmafs_deleted: integer() - } - def reset_all_failures do - Repo.transaction(fn -> - # First, get IDs and counts of videos that will be reset - failed_video_ids = - from(v in Video, where: v.state == :failed, select: v.id) - |> Repo.all() - - videos_to_reset_count = length(failed_video_ids) - - # Get count of failures that will be deleted - failures_to_delete_count = - from(f in VideoFailure, where: is_nil(f.resolved_at), select: count(f.id)) - |> Repo.one() - - # Delete VMAFs for failed videos (they were likely generated with bad data) - {vmafs_deleted_count, _} = - from(v in Vmaf, where: v.video_id in ^failed_video_ids) - |> Repo.delete_all() - - # Reset all failed videos back to needs_analysis - from(v in Video, where: v.state == :failed) - |> Repo.update_all(set: [state: :needs_analysis, updated_at: DateTime.utc_now()]) - - # Delete all unresolved failures - from(f in VideoFailure, where: is_nil(f.resolved_at)) - |> Repo.delete_all() - - %{ - videos_reset: videos_to_reset_count, - failures_deleted: failures_to_delete_count, - vmafs_deleted: vmafs_deleted_count - } - end) - |> case do - {:ok, result} -> result - {:error, _reason} -> %{videos_reset: 0, failures_deleted: 0, vmafs_deleted: 0} - end - end - - @doc """ - Reset all videos for reanalysis by clearing their bitrate. - This is much more efficient than calling Analyzer.reanalyze_video/1 for each video. - Videos will be automatically picked up by the analyzer when there's demand. - VMAFs will be deleted automatically when videos are re-analyzed and their properties change. - """ - @spec reset_all_videos_for_reanalysis() :: {integer(), nil} - def reset_all_videos_for_reanalysis do - from(v in Video, - where: v.state not in [:encoded, :failed], - update: [set: [bitrate: nil]] - ) - |> Repo.update_all([]) - end - - @doc """ - Reset videos for reanalysis in batches to avoid overwhelming the Broadway queue. - VMAFs will be deleted automatically when videos are re-analyzed and their properties change. - """ - @spec reset_videos_for_reanalysis_batched(integer()) :: :ok - def reset_videos_for_reanalysis_batched(batch_size \\ 1000) do - videos_to_reset = - from(v in Video, - where: v.state not in [:encoded, :failed], - select: %{id: v.id} - ) - |> Repo.all() - - total_videos = length(videos_to_reset) - Logger.info("Resetting #{total_videos} videos for reanalysis in batches of #{batch_size}") - - videos_to_reset - |> Enum.chunk_every(batch_size) - |> Enum.with_index() - |> Enum.each(fn {batch, index} -> - Logger.info("Processing batch #{index + 1}/#{div(total_videos, batch_size) + 1}") - - # Reset bitrate for this batch (set to NULL so analyzer picks them up) - video_ids = Enum.map(batch, & &1.id) - - from(v in Video, where: v.id in ^video_ids, update: [set: [bitrate: nil]]) - |> Repo.update_all([]) - - # Small delay to prevent overwhelming the system - Process.sleep(100) - end) - - Logger.info("Completed resetting videos for reanalysis") - end - - @doc """ - Reset all failed videos to not failed in a single bulk operation. - """ - @spec reset_failed_videos() :: {integer(), nil} - def reset_failed_videos do - from(v in Video, - where: v.state == :failed, - update: [set: [state: :needs_analysis]] - ) - |> Repo.update_all([]) - end - - @doc """ - Deletes all unchosen VMAFs to clean up the database. 
- """ - @spec delete_unchosen_vmafs() :: {integer(), nil} - def delete_unchosen_vmafs do - Repo.transaction(fn -> - # Get video_ids that have vmafs but none are chosen - video_ids_with_no_chosen_vmafs = - videos_with_no_chosen_vmafs_query() - |> Repo.all() - - # Delete all vmafs for those video_ids - from(v in Vmaf, where: v.video_id in ^video_ids_with_no_chosen_vmafs) - |> Repo.delete_all() - end) - end - - @doc """ - Deletes videos with paths matching the given pattern. - """ - @spec delete_videos_with_path(String.t()) :: {integer(), nil} - def delete_videos_with_path(path) do - case_insensitive_like_condition = SharedQueries.case_insensitive_like(:path, path) - - video_ids = - from(v in Video, where: ^case_insensitive_like_condition, select: v.id) |> Repo.all() - - delete_videos_by_ids(video_ids) - end - - @doc """ - Deletes videos that reference non-existent file paths. - """ - @spec delete_videos_with_nonexistent_paths() :: {integer(), nil} - def delete_videos_with_nonexistent_paths do - video_ids = get_video_ids_with_missing_files() - delete_videos_by_ids(video_ids) - end - - # === Private Helper Functions === - - # Helper function to test if a video would produce invalid audio encoding arguments - defp produces_invalid_audio_args?(video) do - # Generate encoding arguments using the Rules module - args = Reencodarr.Rules.build_args(video, :encode) - - # Look for invalid audio encoding arguments - opus_args = - args - |> Enum.chunk_every(2, 1, :discard) - |> Enum.filter(fn - [flag, value] when flag == "--enc" -> - String.contains?(value, "b:a=") or String.contains?(value, "ac=") - - _ -> - false - end) - - # Check if any of the audio args are invalid (0 bitrate or 0 channels) - Enum.any?(opus_args, fn - ["--enc", value] -> - String.contains?(value, "b:a=0k") or String.contains?(value, "ac=0") - - _ -> - false - end) - rescue - # If there's any error generating args, consider it problematic - _ -> true - end - - # Consolidated shared logic for video deletion - defp delete_videos_by_ids(video_ids) do - Repo.transaction(fn -> - from(v in Vmaf, where: v.video_id in ^video_ids) |> Repo.delete_all() - from(v in Video, where: v.id in ^video_ids) |> Repo.delete_all() - end) - end - - defp get_video_ids_with_missing_files do - from(v in Video, select: %{id: v.id, path: v.path}) - |> Repo.all() - |> Enum.filter(&file_missing?/1) - |> Enum.map(& &1.id) - end - - defp file_missing?(%{path: path}), do: not File.exists?(path) -end diff --git a/lib/reencodarr/media/clean.ex b/lib/reencodarr/media/clean.ex deleted file mode 100644 index 67111889..00000000 --- a/lib/reencodarr/media/clean.ex +++ /dev/null @@ -1,493 +0,0 @@ -defmodule Reencodarr.Media.Clean do - @moduledoc """ - Clean, focused Media context for Reencodarr. - - This module provides core CRUD operations for the Media domain without - the clutter of statistics, debugging, or bulk operations. It maintains - clear separation of concerns and provides a stable API for media operations. 
- - For specialized operations, see: - - `Reencodarr.Media.Statistics` - Analytics and reporting - - `Reencodarr.Media.BulkOperations` - Mass data operations - - `Reencodarr.Media.VideoQueries` - Complex query logic - """ - - import Ecto.Query, warn: false - - alias Reencodarr.Core.Parsers - - alias Reencodarr.Analyzer.Broadway, as: AnalyzerBroadway - - alias Reencodarr.Media.{ - Library, - Video, - VideoFailure, - VideoQueries, - VideoUpsert, - Vmaf - } - - alias Reencodarr.Repo - require Logger - - # === Video CRUD Operations === - - @doc """ - Returns the list of videos ordered by most recently updated. - """ - @spec list_videos() :: [Video.t()] - def list_videos, do: Repo.all(from v in Video, order_by: [desc: v.updated_at]) - - @doc """ - Gets a single video by ID, raising if not found. - """ - @spec get_video!(integer()) :: Video.t() - def get_video!(id), do: Repo.get!(Video, id) - - @doc """ - Gets a single video by ID, returning nil if not found. - """ - @spec get_video(integer()) :: Video.t() | nil - def get_video(id), do: Repo.get(Video, id) - - @doc """ - Gets a video by its file path. - """ - @spec get_video_by_path(String.t()) :: Video.t() | nil - def get_video_by_path(path), do: Repo.one(from v in Video, where: v.path == ^path) - - @doc """ - Checks if a video exists at the given path. - """ - @spec video_exists?(String.t()) :: boolean() - def video_exists?(path), do: Repo.exists?(from v in Video, where: v.path == ^path) - - @doc """ - Finds videos matching a path wildcard pattern. - """ - @spec find_videos_by_path_wildcard(String.t()) :: [Video.t()] - def find_videos_by_path_wildcard(pattern), - do: Repo.all(from v in Video, where: like(v.path, ^pattern)) - - @doc """ - Creates a video with the given attributes. - """ - @spec create_video(map()) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()} - def create_video(attrs \\ %{}) do - %Video{} |> Video.changeset(attrs) |> Repo.insert() - end - - @doc """ - Creates or updates a video using upsert logic. - - Delegates to VideoUpsert for complex upsert handling. - """ - @spec upsert_video(map()) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()} - def upsert_video(attrs), do: VideoUpsert.upsert(attrs) - - @doc """ - Updates a video with the given attributes. - """ - @spec update_video(Video.t(), map()) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()} - def update_video(%Video{} = video, attrs) do - video |> Video.changeset(attrs) |> Repo.update() - end - - @doc """ - Deletes the given video. - """ - @spec delete_video(Video.t()) :: {:ok, Video.t()} | {:error, Ecto.Changeset.t()} - def delete_video(%Video{} = video), do: Repo.delete(video) - - @doc """ - Returns an `%Ecto.Changeset{}` for tracking video changes. - """ - @spec change_video(Video.t(), map()) :: Ecto.Changeset.t() - def change_video(%Video{} = video, attrs \\ %{}) do - Video.changeset(video, attrs) - end - - # === Video Query Delegations === - - @doc """ - Counts videos ready for CRF search. - """ - @spec count_videos_for_crf_search() :: integer() - def count_videos_for_crf_search do - VideoQueries.count_videos_for_crf_search() - end - - @doc """ - Counts videos needing analysis. - """ - @spec count_videos_needing_analysis() :: integer() - def count_videos_needing_analysis do - VideoQueries.count_videos_needing_analysis() - end - - @doc """ - Gets the next video(s) for encoding. 
- """ - @spec get_next_for_encoding(integer()) :: Vmaf.t() | [Vmaf.t()] | nil - def get_next_for_encoding(limit \\ 1) do - case limit do - 1 -> VideoQueries.videos_ready_for_encoding(1) |> List.first() - _ -> VideoQueries.videos_ready_for_encoding(limit) - end - end - - @doc """ - Counts videos in the encoding queue. - """ - @spec encoding_queue_count() :: integer() - def encoding_queue_count do - VideoQueries.encoding_queue_count() - end - - @doc """ - Lists videos awaiting CRF search (analyzed but no VMAFs). - """ - @spec list_videos_awaiting_crf_search() :: [Video.t()] - def list_videos_awaiting_crf_search do - from(v in Video, - left_join: vmafs in assoc(v, :vmafs), - where: is_nil(vmafs.id) and v.state == :analyzed, - select: v - ) - |> Repo.all() - end - - @doc """ - Checks if a video has any VMAF records. - """ - @spec video_has_vmafs?(Video.t()) :: boolean() - def video_has_vmafs?(%Video{id: id}), do: Repo.exists?(from v in Vmaf, where: v.video_id == ^id) - - # === Video Failure Operations === - - @doc """ - Gets unresolved failures for a video. - """ - @spec get_video_failures(integer()) :: [VideoFailure.t()] - def get_video_failures(video_id), do: VideoFailure.get_unresolved_failures_for_video(video_id) - - @doc """ - Resolves all failures for a video (typically when re-processing succeeds). - """ - @spec resolve_video_failures(integer()) :: :ok - def resolve_video_failures(video_id) do - video_id - |> VideoFailure.get_unresolved_failures_for_video() - |> Enum.each(&VideoFailure.resolve_failure/1) - end - - @doc """ - Gets failure statistics for monitoring and investigation. - """ - @spec get_failure_statistics(keyword()) :: map() - def get_failure_statistics(opts \\ []), do: VideoFailure.get_failure_statistics(opts) - - @doc """ - Gets common failure patterns for investigation. - """ - @spec get_common_failure_patterns(integer()) :: [map()] - def get_common_failure_patterns(limit \\ 10), - do: VideoFailure.get_common_failure_patterns(limit) - - @doc """ - Forces complete re-analysis of a video by resetting all analysis data and manually queuing it. - - This function: - 1. Deletes all VMAFs for the video - 2. Resets video analysis fields (bitrate, etc.) - 3. Manually adds the video to the analyzer queue - 4. Returns the video path for verification - - ## Parameters - - `video_id`: integer video ID - - ## Returns - - `{:ok, video_path}` on success - - `{:error, reason}` if video not found - - ## Examples - iex> force_reanalyze_video(9008028) - {:ok, "/path/to/video.mkv"} - """ - @spec force_reanalyze_video(integer()) :: {:ok, String.t()} | {:error, String.t()} - def force_reanalyze_video(video_id) when is_integer(video_id) do - case get_video(video_id) do - nil -> - {:error, "Video #{video_id} not found"} - - video -> - Repo.transaction(fn -> - # 1. Delete all VMAFs - delete_vmafs_for_video(video_id) - - # 2. Reset analysis fields to force re-analysis - update_video(video, %{ - bitrate: nil, - video_codecs: nil, - audio_codecs: nil, - max_audio_channels: nil, - atmos: nil, - hdr: nil, - width: nil, - height: nil, - frame_rate: nil, - duration: nil - }) - - # 3. Manually trigger analysis using Broadway dispatch - AnalyzerBroadway.dispatch_available() - - video.path - end) - |> case do - {:ok, path} -> {:ok, path} - {:error, reason} -> {:error, reason} - end - end - end - - # === Library CRUD Operations === - - @doc """ - Returns the list of libraries. 
- """ - @spec list_libraries() :: [Library.t()] - def list_libraries, do: Repo.all(from(l in Library)) - - @doc """ - Gets a single library by ID, raising if not found. - """ - @spec get_library!(integer()) :: Library.t() - def get_library!(id), do: Repo.get!(Library, id) - - @doc """ - Creates a library with the given attributes. - """ - @spec create_library(map()) :: {:ok, Library.t()} | {:error, Ecto.Changeset.t()} - def create_library(attrs \\ %{}) do - %Library{} |> Library.changeset(attrs) |> Repo.insert() - end - - @doc """ - Updates a library with the given attributes. - """ - @spec update_library(Library.t(), map()) :: {:ok, Library.t()} | {:error, Ecto.Changeset.t()} - def update_library(%Library{} = l, attrs) do - l |> Library.changeset(attrs) |> Repo.update() - end - - @doc """ - Deletes the given library. - """ - @spec delete_library(Library.t()) :: {:ok, Library.t()} | {:error, Ecto.Changeset.t()} - def delete_library(%Library{} = l), do: Repo.delete(l) - - @doc """ - Returns an `%Ecto.Changeset{}` for tracking library changes. - """ - @spec change_library(Library.t(), map()) :: Ecto.Changeset.t() - def change_library(%Library{} = l, attrs \\ %{}) do - Library.changeset(l, attrs) - end - - # === VMAF CRUD Operations === - - @doc """ - Returns the list of VMAFs. - """ - @spec list_vmafs() :: [Vmaf.t()] - def list_vmafs, do: Repo.all(Vmaf) - - @doc """ - Gets a single VMAF by ID with preloaded video. - """ - @spec get_vmaf!(integer()) :: Vmaf.t() - def get_vmaf!(id), do: Repo.get!(Vmaf, id) |> Repo.preload(:video) - - @doc """ - Creates a VMAF with the given attributes. - """ - @spec create_vmaf(map()) :: {:ok, Vmaf.t()} | {:error, Ecto.Changeset.t()} - def create_vmaf(attrs \\ %{}) do - %Vmaf{} |> Vmaf.changeset(attrs) |> Repo.insert() - end - - @doc """ - Creates or updates a VMAF using upsert logic. - """ - @spec upsert_vmaf(map()) :: {:ok, Vmaf.t()} | {:error, Ecto.Changeset.t()} - def upsert_vmaf(attrs) do - # Calculate savings if not provided but percent and video are available - attrs_with_savings = maybe_calculate_savings(attrs) - - result = - %Vmaf{} - |> Vmaf.changeset(attrs_with_savings) - |> Repo.insert( - on_conflict: {:replace_all_except, [:id, :video_id, :inserted_at]}, - conflict_target: [:crf, :video_id] - ) - - case result do - {:ok, vmaf} -> - # If this VMAF is chosen, update video state to crf_searched - if vmaf.chosen do - video = get_video!(vmaf.video_id) - Reencodarr.Media.mark_as_crf_searched(video) - end - - {:error, _error} -> - :ok - end - - result - end - - @doc """ - Updates a VMAF with the given attributes. - """ - @spec update_vmaf(Vmaf.t(), map()) :: {:ok, Vmaf.t()} | {:error, Ecto.Changeset.t()} - def update_vmaf(%Vmaf{} = vmaf, attrs) do - vmaf |> Vmaf.changeset(attrs) |> Repo.update() - end - - @doc """ - Deletes the given VMAF. - """ - @spec delete_vmaf(Vmaf.t()) :: {:ok, Vmaf.t()} | {:error, Ecto.Changeset.t()} - def delete_vmaf(%Vmaf{} = vmaf), do: Repo.delete(vmaf) - - @doc """ - Deletes all VMAFs for a given video ID. - - ## Parameters - - `video_id`: integer video ID - - ## Returns - - `{count, nil}` where count is the number of deleted VMAFs - - ## Examples - iex> delete_vmafs_for_video(123) - {3, nil} - """ - @spec delete_vmafs_for_video(integer()) :: {integer(), nil} - def delete_vmafs_for_video(video_id) when is_integer(video_id) do - from(v in Vmaf, where: v.video_id == ^video_id) - |> Repo.delete_all() - end - - @doc """ - Returns an `%Ecto.Changeset{}` for tracking VMAF changes. 
- """ - @spec change_vmaf(Vmaf.t(), map()) :: Ecto.Changeset.t() - def change_vmaf(%Vmaf{} = vmaf, attrs \\ %{}) do - Vmaf.changeset(vmaf, attrs) - end - - @doc """ - Checks if a chosen VMAF exists for the given video. - """ - @spec chosen_vmaf_exists?(Video.t()) :: boolean() - def chosen_vmaf_exists?(%{id: id}), - do: Repo.exists?(from v in Vmaf, where: v.video_id == ^id and v.chosen == true) - - @doc """ - Lists all chosen VMAFs. - """ - @spec list_chosen_vmafs() :: [Vmaf.t()] - def list_chosen_vmafs do - Repo.all(query_chosen_vmafs()) - end - - @doc """ - Gets the chosen VMAF for a specific video. - """ - @spec get_chosen_vmaf_for_video(Video.t()) :: Vmaf.t() | nil - def get_chosen_vmaf_for_video(%Video{id: video_id}) do - Repo.one( - from v in Vmaf, - join: vid in assoc(v, :video), - where: v.chosen == true and v.video_id == ^video_id and vid.state == :crf_searched, - preload: [:video], - order_by: [asc: v.percent, asc: v.time] - ) - end - - @doc """ - Marks a specific VMAF as chosen for a video and unmarks all others. - """ - @spec mark_vmaf_as_chosen(integer(), String.t() | float()) :: - {:ok, {integer(), nil}} | {:error, term()} - def mark_vmaf_as_chosen(video_id, crf) do - crf_float = parse_crf(crf) - - Repo.transaction(fn -> - from(v in Vmaf, where: v.video_id == ^video_id, update: [set: [chosen: false]]) - |> Repo.update_all([]) - - from(v in Vmaf, - where: v.video_id == ^video_id and v.crf == ^crf_float, - update: [set: [chosen: true]] - ) - |> Repo.update_all([]) - end) - end - - # === Private Helper Functions === - - # Calculate savings if not already provided and we have the necessary data - defp maybe_calculate_savings(attrs) do - case {Map.get(attrs, "savings"), Map.get(attrs, "percent"), Map.get(attrs, "video_id")} do - {nil, percent, video_id} - when (is_number(percent) or is_binary(percent)) and - (is_integer(video_id) or is_binary(video_id)) -> - case get_video(video_id) do - %Video{size: size} when is_integer(size) and size > 0 -> - savings = calculate_vmaf_savings(percent, size) - Map.put(attrs, "savings", savings) - - _ -> - attrs - end - - _ -> - attrs - end - end - - # Calculate estimated space savings in bytes based on percent and video size - defp calculate_vmaf_savings(percent, video_size) when is_binary(percent) do - case Parsers.parse_float_exact(percent) do - {:ok, percent_float} -> calculate_vmaf_savings(percent_float, video_size) - {:error, _} -> nil - end - end - - defp calculate_vmaf_savings(percent, video_size) - when is_number(percent) and is_number(video_size) and - percent > 0 and percent <= 100 do - # Savings = (100 - percent) / 100 * original_size - round((100 - percent) / 100 * video_size) - end - - defp calculate_vmaf_savings(_, _), do: nil - - # Consolidated shared logic for chosen VMAF queries - defp query_chosen_vmafs do - from v in Vmaf, - join: vid in assoc(v, :video), - where: v.chosen == true and vid.state == :crf_searched, - preload: [:video], - order_by: [asc: v.percent, asc: v.time] - end - - defp parse_crf(crf) do - {:ok, value} = Parsers.parse_float_exact(crf) - value - end -end diff --git a/lib/reencodarr/media/media_info_parser.ex b/lib/reencodarr/media/media_info_parser.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/media/shared_queries.ex b/lib/reencodarr/media/shared_queries.ex index 282b72ab..c0f3b776 100644 --- a/lib/reencodarr/media/shared_queries.ex +++ b/lib/reencodarr/media/shared_queries.ex @@ -11,8 +11,6 @@ defmodule Reencodarr.Media.SharedQueries do # Configuration constants 
@large_list_threshold 50 - @max_retry_count 3 - @default_min_file_size_mb 100 @doc """ Database-agnostic case-insensitive LIKE operation. @@ -64,163 +62,6 @@ defmodule Reencodarr.Media.SharedQueries do filter_videos_by_patterns(video_list, patterns) end - @doc """ - Get all unique video states from the database. - - Used by various dashboard and filtering components to present - consistent state options to users. - """ - def get_all_video_states do - from(v in Video, - select: v.state, - distinct: true, - order_by: v.state - ) - |> Reencodarr.Repo.all() - end - - @doc """ - Find videos that are candidates for retry operations. - - Returns videos in failed state that haven't exceeded the retry limit - and aren't in an indefinite failure state. - """ - def retry_candidate_videos(limit \\ 100) do - from(v in Video, - where: v.state == :failed, - # Configurable retry limit - where: v.retry_count < @max_retry_count, - order_by: [desc: v.updated_at], - limit: ^limit - ) - end - - @doc """ - Complex query to find videos with optimal encoding characteristics. - - Returns videos that are good candidates for immediate encoding based on: - - File size vs quality metrics - - Available CRF search results - - System capacity - """ - def optimal_encoding_candidates(opts \\ []) do - limit = Keyword.get(opts, :limit, 50) - min_file_size = Keyword.get(opts, :min_file_size_mb, @default_min_file_size_mb) - - # Convert MB to bytes for comparison - min_size_bytes = min_file_size * 1024 * 1024 - - from(v in Video, - left_join: vmaf in Vmaf, - on: v.id == vmaf.video_id, - where: v.state == :crf_searched, - where: v.size > ^min_size_bytes, - where: not is_nil(vmaf.crf), - select: %{ - video: v, - vmaf_score: vmaf.score, - crf: vmaf.crf, - compression_ratio: - fragment( - "CAST(? AS FLOAT) / CAST(? AS FLOAT)", - v.size, - v.size - ), - priority_score: - fragment( - """ - (CAST(? AS FLOAT) / 1000000.0) * -- File size in MB - (CASE - WHEN ? > 95 THEN 1.5 -- High VMAF bonus - WHEN ? > 90 THEN 1.2 -- Medium VMAF bonus - ELSE 1.0 - END) * - (CASE - WHEN CAST(? AS FLOAT) / CAST(? AS FLOAT) > 2.0 THEN 2.0 -- High compression potential - WHEN CAST(? AS FLOAT) / CAST(? AS FLOAT) > 1.5 THEN 1.5 -- Medium compression - ELSE 1.0 - END) - """, - v.size, - vmaf.score, - vmaf.score, - v.size, - v.size, - v.size, - v.size - ) - }, - order_by: [desc: fragment("priority_score")], - limit: ^limit - ) - end - - @doc """ - Get storage statistics across video states for dashboard display. - - Returns aggregated storage usage information grouped by video processing state. - """ - def storage_stats_by_state do - from(v in Video, - group_by: v.state, - select: %{ - state: v.state, - count: count(v.id), - total_size_gb: fragment("ROUND(CAST(SUM(?) AS FLOAT) / 1073741824.0, 2)", v.size), - avg_size_mb: fragment("ROUND(CAST(AVG(?) AS FLOAT) / 1048576.0, 2)", v.size), - largest_file_gb: fragment("ROUND(CAST(MAX(?) AS FLOAT) / 1073741824.0, 2)", v.size) - }, - order_by: [desc: fragment("total_size_gb")] - ) - end - - @doc """ - Find duplicate videos based on file size and duration. - - Helps identify potential duplicate content that may have been - imported from multiple sources or with slight variations. 
- """ - def potential_duplicate_videos(tolerance_percent \\ 5) do - # Calculate size tolerance (e.g., 5% difference) - size_tolerance_query = """ - WITH size_groups AS ( - SELECT - size, - duration, - ARRAY_AGG(id) as video_ids, - COUNT(*) as group_size - FROM videos - WHERE size IS NOT NULL - AND duration IS NOT NULL - GROUP BY size, duration - HAVING COUNT(*) > 1 - ), - tolerance_groups AS ( - SELECT DISTINCT - v1.id as video1_id, - v2.id as video2_id, - v1.path as path1, - v2.path as path2, - v1.size as size1, - v2.size as size2, - ABS(v1.size - v2.size) as size_diff, - ABS(v1.duration - v2.duration) as duration_diff - FROM videos v1 - JOIN videos v2 ON v1.id < v2.id - WHERE v1.size IS NOT NULL - AND v2.size IS NOT NULL - AND v1.duration IS NOT NULL - AND v2.duration IS NOT NULL - AND ABS(v1.size - v2.size) <= (GREATEST(v1.size, v2.size) * #{tolerance_percent} / 100) - AND ABS(v1.duration - v2.duration) <= 60 -- Within 1 minute - ) - SELECT * FROM tolerance_groups - ORDER BY size_diff ASC - """ - - Reencodarr.Repo.query(size_tolerance_query) - end - @doc """ Database-agnostic query to find video IDs with no chosen VMAFs. Used by delete_unchosen_vmafs functions. diff --git a/lib/reencodarr/pipeline_state_machine.ex b/lib/reencodarr/pipeline_state_machine.ex index 295b320c..20ad65c4 100644 --- a/lib/reencodarr/pipeline_state_machine.ex +++ b/lib/reencodarr/pipeline_state_machine.ex @@ -78,6 +78,14 @@ defmodule Reencodarr.PipelineStateMachine do def pause(%{current_state: :processing} = m), do: transition_to(m, :pausing) def pause(m), do: transition_to(m, :paused) + @doc """ + Handle pause request with proper state checking to avoid duplicate transitions. + Returns the updated pipeline state machine. + """ + def handle_pause_request(%{current_state: :processing} = m), do: pause(m) + def handle_pause_request(%{current_state: :pausing} = m), do: m + def handle_pause_request(m), do: transition_to(m, :paused) + def resume(%{current_state: c} = m) when c in [:paused, :stopped], do: transition_to(m, :running) diff --git a/lib/reencodarr/query_patterns.ex b/lib/reencodarr/query_patterns.ex deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/reencodarr/sync.ex b/lib/reencodarr/sync.ex index 317c4ed0..2677d69e 100644 --- a/lib/reencodarr/sync.ex +++ b/lib/reencodarr/sync.ex @@ -348,9 +348,10 @@ defmodule Reencodarr.Sync do def refresh_operations(file_id, :sonarr) do with {:ok, %Req.Response{body: episode_file}} <- Services.Sonarr.get_episode_file(file_id), - {:ok, _} <- Services.Sonarr.refresh_series(episode_file["seriesId"]), + {:ok, series_id} <- validate_series_id(episode_file["seriesId"]), + {:ok, _} <- Services.Sonarr.refresh_series(series_id), {:ok, _} <- - Services.Sonarr.rename_files(episode_file["seriesId"], [file_id]) do + Services.Sonarr.rename_files(series_id, [file_id]) do {:ok, "Refresh and rename triggered"} else {:error, reason} -> {:error, reason} @@ -359,8 +360,9 @@ defmodule Reencodarr.Sync do def refresh_operations(file_id, :radarr) do with {:ok, %Req.Response{body: movie_file}} <- Services.Radarr.get_movie_file(file_id), - {:ok, _} <- Services.Radarr.refresh_movie(movie_file["movieId"]), - {:ok, _} <- Services.Radarr.rename_movie_files(movie_file["movieId"]) do + {:ok, movie_id} <- validate_movie_id(movie_file["movieId"]), + {:ok, _} <- Services.Radarr.refresh_movie(movie_id), + {:ok, _} <- Services.Radarr.rename_movie_files(movie_id) do {:ok, "Refresh triggered for Radarr"} else {:error, reason} -> {:error, reason} @@ -375,6 +377,36 @@ defmodule Reencodarr.Sync 
do def rescan_and_rename_series(id), do: refresh_operations(id, :sonarr) + # Helper function to validate series ID from episode file response + defp validate_series_id(nil) do + Logger.error("Series ID is null - episode file may be orphaned or invalid") + {:error, "Series ID is null"} + end + + defp validate_series_id(series_id) when is_integer(series_id) and series_id > 0 do + {:ok, series_id} + end + + defp validate_series_id(series_id) do + Logger.error("Invalid series ID: #{inspect(series_id)} - expected positive integer") + {:error, "Invalid series ID"} + end + + # Helper function to validate movie ID from movie file response + defp validate_movie_id(nil) do + Logger.error("Movie ID is null - movie file may be orphaned or invalid") + {:error, "Movie ID is null"} + end + + defp validate_movie_id(movie_id) when is_integer(movie_id) and movie_id > 0 do + {:ok, movie_id} + end + + defp validate_movie_id(movie_id) do + Logger.error("Invalid movie ID: #{inspect(movie_id)} - expected positive integer") + {:error, "Invalid movie ID"} + end + def delete_video_and_vmafs(path) do case Media.delete_videos_with_path(path) do {:ok, _} -> :ok diff --git a/lib/reencodarr/utils.ex b/lib/reencodarr/utils.ex deleted file mode 100644 index c3c116b6..00000000 --- a/lib/reencodarr/utils.ex +++ /dev/null @@ -1,104 +0,0 @@ -defmodule Reencodarr.Utils do - @moduledoc """ - Core utility functions for the Reencodarr application. - - This module serves as the central hub for all utility functions, providing: - - Data validation and transformation - - Text parsing and regex operations - - Error handling patterns - - Guard macros for common patterns - - Replaces the scattered helper modules with a single, comprehensive utility module. - """ - - # === VALIDATION UTILITIES === - - @doc """ - Validates that a value is present and meaningful. - - Returns true if the value is not nil, empty string, empty list, or empty map. - """ - def present?(nil), do: false - def present?(""), do: false - def present?([]), do: false - def present?(map) when map == %{}, do: false - def present?(_), do: true - - @doc """ - Validates that a numeric value is positive. - """ - def positive?(value) when is_number(value), do: value > 0 - def positive?(_), do: false - - @doc """ - Validates that a numeric value is within a reasonable range. - """ - def in_range?(value, min, max) when is_number(value) do - value >= min and value <= max - end - - def in_range?(_, _, _), do: false - - @doc """ - Validates that a string is a valid file path. - """ - def valid_path?(path) when is_binary(path) and path != "", do: true - def valid_path?(_), do: false - - # === PARSING UTILITIES === - - @doc """ - Parses a line using regex pattern and field mappings. - - Returns {:error, :no_match} if no match, or {:ok, map} with extracted and transformed fields. - """ - @spec parse_with_regex(String.t(), Regex.t(), map()) :: {:ok, map()} | {:error, :no_match} - def parse_with_regex(line, pattern, field_mapping) when is_binary(line) do - case Regex.named_captures(pattern, line) do - nil -> {:error, :no_match} - captures -> {:ok, extract_fields(captures, field_mapping)} - end - end - - defp extract_fields(captures, field_mapping) do - Enum.reduce(field_mapping, %{}, fn {key, {capture_key, transformer}}, acc -> - case Map.get(captures, capture_key) do - nil -> Map.put(acc, key, nil) - raw_value -> Map.put(acc, key, transformer.(raw_value)) - end - end) - end - - # === GUARD MACROS === - - @doc """ - Guard for non-empty binary values. 
- """ - defguard is_non_empty_binary(value) when is_binary(value) and value != "" - - @doc """ - Guard for positive numbers. - """ - defguard is_positive_number(value) when is_number(value) and value > 0 - - @doc """ - Guard for non-negative numbers. - """ - defguard is_non_negative_number(value) when is_number(value) and value >= 0 - - @doc """ - Guard for valid percentage values (0-100). - """ - defguard is_valid_percentage(value) when is_number(value) and value >= 0 and value <= 100 - - @doc """ - Guard for non-empty lists. - """ - defguard is_non_empty_list(value) when is_list(value) and value != [] - - @doc """ - Guard for reasonable integer ranges. - """ - defguard is_reasonable_int(value, min, max) - when is_integer(value) and value >= min and value <= max -end diff --git a/lib/reencodarr_web/components/lcars_components.ex b/lib/reencodarr_web/components/lcars_components.ex deleted file mode 100644 index a2fe5b2c..00000000 --- a/lib/reencodarr_web/components/lcars_components.ex +++ /dev/null @@ -1,284 +0,0 @@ -defmodule ReencodarrWeb.LcarsComponents do - @moduledoc """ - Modern LCARS (Library Computer Access/Retrieval System) UI components. - - Provides reusable Star Trek-themed interface components with: - - Consistent styling and behavior - - Proper accessibility attributes - - Modern Phoenix 1.8+ patterns - - Comprehensive documentation - """ - - use Phoenix.Component - - import ReencodarrWeb.UIHelpers - - @doc """ - Renders the main LCARS page frame with header, navigation, and footer. - - Creates the overall page structure with LCARS styling and provides - slots for page content. - - ## Attributes - - * `title` (required) - Page title displayed in header - * `current_page` (required) - Current navigation page for highlighting - * `current_stardate` (required) - Stardate for footer display - - ## Slots - - * `inner_block` (required) - Main page content - """ - attr :title, :string, required: true, doc: "Page title for header" - attr :current_page, :atom, required: true, doc: "Current page for navigation highlighting" - attr :current_stardate, :float, required: true, doc: "Current stardate for footer" - - slot :inner_block, required: true, doc: "Main page content" - - def lcars_page_frame(assigns) do - ~H""" -
| File Name | Bitrate | Size |
|---|---|---|
| {format_name(file)} | {Reencodarr.Formatters.bitrate_mbps(file.bitrate)} Mbit/s | {Reencodarr.Formatters.file_size_gib(file.size)} GiB |

| File Name | Size | Savings | Percent |
|---|---|---|---|
| {format_name(file.video)} | {Reencodarr.Formatters.file_size_gib(file.video.size)} GiB | {Reencodarr.Formatters.potential_savings_gib(file.video.size, file.predicted_filesize)} GiB | {Reencodarr.Formatters.savings_percentage(file.video.size, file.predicted_filesize)}% |
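The savings columns above come from comparing the original size with the predicted encoded size. A rough sketch of that arithmetic, with helper names mirroring the `Reencodarr.Formatters` calls in the template (the bodies below are illustrative assumptions, not the project's actual implementations):

```elixir
defmodule SavingsSketch do
  @gib 1024 * 1024 * 1024

  # Assumed behaviour of potential_savings_gib/2: bytes saved, expressed in GiB.
  def potential_savings_gib(size, predicted) when is_integer(size) and is_integer(predicted) do
    Float.round((size - predicted) / @gib, 2)
  end

  # Assumed behaviour of savings_percentage/2: percent of the original size saved.
  def savings_percentage(size, predicted) when is_integer(size) and size > 0 do
    Float.round((size - predicted) / size * 100, 1)
  end
end

# A 12 GiB source predicted to shrink to 4.5 GiB:
SavingsSketch.potential_savings_gib(12_884_901_888, 4_831_838_208)  #=> 7.5
SavingsSketch.savings_percentage(12_884_901_888, 4_831_838_208)     #=> 62.5
```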
Overview of current statistics
-Real-time status and controls for video transcoding pipeline
+Real-time status and controls for video transcoding pipeline
+Monitor and manage failed video processing operations
+<%= if @search_term != "" do %> No failed videos match your search criteria <% else %> All videos are processing successfully <% end %> -
{Path.dirname(video.path)} -
| ID | FILE | SIZE | CODECS | LATEST FAILURE | ACTIONS |
|---|---|---|---|---|---|
| {video.id} |
-
- {Path.basename(video.path)}
-
-
- .../{video.path |> String.split("/") |> Enum.take(-2) |> Enum.join("/")}
+
+
+ <%= case Map.get(@video_failures, video.id) do %>
+ <% failures when is_list(failures) and length(failures) > 0 -> %>
+ <% latest_failure = List.first(failures) %>
+
+
+
-
+ {latest_failure.failure_stage} / {latest_failure.failure_category}
+ <%= if latest_failure.failure_code do %>
+ ({latest_failure.failure_code})
+ <% end %>
+
+ <%= if has_command_details?(latest_failure.system_context) do %>
+
+ 💻
+
+ <% end %>
|
-
- <%= if video.size do %>
- {Reencodarr.Formatters.file_size(video.size)}
- <% else %>
- Unknown
+ {latest_failure.failure_message} + <%= if length(failures) > 1 do %> ++ +{length(failures) - 1} additional {if length(failures) == 2, + do: "failure", + else: "failures"} + <% end %> - |
-
-
+
+ <% _ -> %>
+
+
+ <% end %>
+
+
+ <%= if video.id in @expanded_details do %>
+ No specific failure information recorded +
+
+
+ Technical Details+
- V: {format_codecs(video.video_codecs)}
+ Bitrate:
+
+ {Reencodarr.Formatters.bitrate(video.bitrate)}
+
- A: {format_codecs(video.audio_codecs)}
+ Duration:
+
+ {Reencodarr.Formatters.duration_minutes(video.duration)}
+
+
+
+ Resolution:
+
+ {Reencodarr.Formatters.resolution(video.width, video.height)}
+
+
+
+ Service:
+ {video.service_type || "Unknown"}
|
-
- <%= case Map.get(@video_failures, video.id) do %>
- <% failures when is_list(failures) and length(failures) > 0 -> %>
- <% latest_failure = List.first(failures) %>
- " text-xs"}>
-
- {latest_failure.failure_stage}/{latest_failure.failure_category}
-
-
- {latest_failure.failure_message}
-
- <%= if has_command_details?(latest_failure.system_context) do %>
-
- 💻 Command details available
+
+
+
+ <%= case Map.get(@video_failures, video.id) do %>
+ <% failures when is_list(failures) and length(failures) > 0 -> %>
+
+ + All Failures ({length(failures)}) ++
+ <%= for failure <- failures do %>
+
- <%= if length(failures) > 1 do %>
-
+
<% end %>
+
+
+ {failure.failure_stage} / {failure.failure_category}
+ <%= if failure.failure_code do %>
+ ({failure.failure_code})
+ <% end %>
+
+
+ {failure.failure_message} + + <%= if Map.get(failure.system_context || %{}, "command") do %> +
+
+ <% end %>
+
+ <%= if has_command_details?(failure.system_context) do %>
+
+ Command:
+
+
+ {Map.get(failure.system_context, "command")}
+
+
+
+ <% end %>
+ Output:
+
+
+
+ {format_command_output(
+ Map.get(failure.system_context, "full_output")
+ )}
+
- +{length(failures) - 1} more
-
- <% end %>
- <% _ -> %>
- No specific failures recorded
- <% end %>
- |
-
-
-
- |
-
|
-
-
-
-
- <%= case Map.get(@video_failures, video.id) do %>
- <% failures when is_list(failures) and length(failures) > 0 -> %>
- - VIDEO DETAILS --
-
-
- Bitrate: {Reencodarr.Formatters.bitrate(
- video.bitrate
- )}
-
-
- Duration: {Reencodarr.Formatters.duration_minutes(
- video.duration
- )}
-
-
- Resolution: {Reencodarr.Formatters.resolution(
- video.width,
- video.height
- )}
-
- Service: {video.service_type}
-
-
- <% _ -> %>
- - FAILURE DETAILS --
- <%= for failure <- failures do %>
-
-
-
- <% end %>
-
- {failure.failure_stage} / {failure.failure_category}
- {if failure.failure_code, do: " (#{failure.failure_code})"}
-
- {failure.failure_message}
-
-
- <%= if Map.get(failure.system_context || %{}, "command") do %>
-
-
- <% end %>
-
- <%= if has_command_details?(failure.system_context) do %>
-
- EXECUTED COMMAND:
-
-
- {Map.get(failure.system_context, "command")}
-
-
-
- <% end %>
-
-
- FULL COMMAND OUTPUT:
-
-
- {format_command_output(
- Map.get(failure.system_context, "full_output")
- )}
-
-
- {Calendar.strftime(
- failure.inserted_at,
- "%Y-%m-%d %H:%M:%S UTC"
- )}
-
-
- No detailed failure information available
-
- <% end %>
|
- |||||
{pattern.sample_message}
++ Learn how Reencodarr automatically optimizes video encoding +
+Reencodarr uses a sophisticated rule system to determine optimal encoding parameters for each video file. The rules analyze media properties and apply appropriate settings @@ -177,9 +209,9 @@ defmodule ReencodarrWeb.RulesLive do
Rules are applied in a specific order with different rules for different contexts:
-hdr/1
- - HDR and SDR tuning parameters
+ 1. HDR Rule:
+ hdr/1 - HDR and SDR tuning parameters
resolution/1 - 4K+ downscaling to 1080p
+ 2. Resolution Rule:
+ resolution/1 - 4K+ downscaling to 1080p
video/1
- - Pixel format standardization
+ 3. Video Rule:
+ video/1 - Pixel format standardization
audio/1
+ 0. Audio Rule:
+ audio/1
- Opus transcoding (encoding only)
cuda/1 - Hardware acceleration (manual application)
grain/2 - Film grain synthesis for SDR content
cuda/1
+ - Hardware acceleration (manual application)
+ grain/2
+ - Film grain synthesis for SDR content
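Taken together, the ordering above amounts to running a fixed list of rule functions against a video and concatenating the argument lists they return; `Reencodarr.Rules.build_args/2`, used elsewhere in this diff, is the real entry point. A hedged sketch of that composition (the per-context ordering shown here is an assumption for illustration):

```elixir
# Illustrative only - not the actual Reencodarr.Rules implementation.
defmodule RuleCompositionSketch do
  alias Reencodarr.Rules

  # Each rule takes a video and returns a list of CLI arguments for ab-av1.
  defp rules_for(:crf_search), do: [&Rules.hdr/1, &Rules.resolution/1, &Rules.video/1]
  defp rules_for(:encode), do: [&Rules.audio/1, &Rules.hdr/1, &Rules.resolution/1, &Rules.video/1]

  def args(video, context), do: Enum.flat_map(rules_for(context), & &1.(video))
end
```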
+ Reencodarr enforces consistent video quality standards across all your media by automatically applying the best pixel format for modern AV1 encoding.
-Every video gets converted to 10-bit YUV 4:2:0 format, regardless of its original format.
10-bit provides 1,024 shades per color channel instead of 256, resulting in smoother transitions and more accurate colors, especially noticeable in dark scenes and gradients.
4:2:0 chroma subsampling is the standard way video is stored - full resolution for brightness (luma) but reduced resolution for color information (chroma). Your eyes are more sensitive to brightness than color, so this saves space without visible quality loss.
This rule applies to ALL videos, whether they're 720p, 1080p, 4K, HDR, or SDR. It ensures your entire library has consistent, high-quality encoding.
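In rule terms, the standardization described here reduces to a constant argument pair. A minimal sketch, assuming ab-av1's `--pix-format` flag is the mechanism (the function name is illustrative):

```elixir
# Every video, SDR or HDR, 720p or 4K, gets the same target pixel format.
def pixel_format_args(_video), do: ["--pix-format", "yuv420p10le"]
```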
@@ -319,11 +364,11 @@ defmodule ReencodarrWeb.RulesLive do+
Even old content gets the modern pixel format treatment for better compression and future compatibility.
+
Already-optimal content stays optimal, ensuring no degradation during re-encoding.
+
Streaming content gets brought up to the same broadcast/disc pixel-format standard, keeping your personal library consistent.
Reencodarr intelligently decides when and how to convert your audio to the modern Opus codec, which provides excellent quality at smaller file sizes.
-+
Everything else gets transcoded to Opus with channel-specific bitrates for optimal quality and file size.
| Audio Layout | Channels | Bitrate | Notes |
|---|---|---|---|
| Mono | 1 | 64 kbps | Perfect for speech |
| Stereo | 2 | 96 kbps | Excellent for music |
| | | 128 kbps | ⭐ Upmixed to 5.1! |
| 5.1 Surround | 6 | 384 kbps | Theater experience |
| 7.1 Surround | 8 | 510 kbps | Premium surround |
| High Channel Count | 9+ | 510 kbps | Capped maximum |
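A hedged sketch of the channel-count to bitrate mapping implied by this table; the function name and clause boundaries are illustrative, and the tier whose channel range is not shown above is left as a comment rather than guessed:

```elixir
defmodule OpusBitrateSketch do
  # Mirrors the table above; not the project's actual rule code.
  def bitrate(1), do: "64k"   # mono, speech-friendly
  def bitrate(2), do: "96k"   # stereo, music-friendly
  # (one tier is upmixed to 5.1 and encoded at 128k; its channel range is not listed above)
  def bitrate(6), do: "384k"  # 5.1 surround
  def bitrate(8), do: "510k"  # 7.1 surround
  def bitrate(n) when is_integer(n) and n >= 9, do: "510k"  # capped maximum
end
```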