Skip to content
Merged
9 changes: 2 additions & 7 deletions .iex.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,13 @@ defmodule IExHelpers do

@doc "Start all pipelines"
def start_all do
Analyzer.start()
CrfSearcher.start()
Encoder.start()
pipelines_status()
end

@doc "Pause all pipelines"
def pause_all do
Analyzer.pause()
CrfSearcher.pause()
Encoder.pause()
pipelines_status()
end

Expand All @@ -71,7 +67,7 @@ defmodule IExHelpers do

@doc "Get service configs"
def configs do
Services.list_services()
Services.list_configs()
end

@doc "Quick video state summary"
Expand All @@ -91,8 +87,7 @@ defmodule IExHelpers do
%{
video: video,
vmaf_count: length(video.vmafs),
chosen_vmaf: Enum.find(video.vmafs, & &1.chosen),
service: Services.get_service_by_id(video.service_id)
chosen_vmaf: Enum.find(video.vmafs, & &1.chosen)
}
end
end
Expand Down
29 changes: 0 additions & 29 deletions lib/reencodarr/ab_av1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,6 @@ defmodule Reencodarr.AbAv1 do
"""
@spec encode(Media.Vmaf.t()) :: :ok | {:error, atom()}
def encode(vmaf) do
# Skip MP4 files - compatibility issues to be resolved later
video_id = Map.get(vmaf, :video_id) || Map.get(vmaf, "video_id")

if is_integer(video_id) do
try do
video = Media.get_video!(video_id)

if is_binary(video.path) and String.ends_with?(video.path, ".mp4") do
# Skip MP4 files - compatibility issues
Logger.info("Skipping encode for MP4 file (compatibility issues): #{video.path}")
# Mark as failed to skip future encoding attempts
case Media.mark_as_failed(video) do
{:ok, _updated} -> :ok
error -> error
end
else
do_queue_encode(vmaf)
end
rescue
Ecto.NoResultsError ->
# Video doesn't exist - fall back to normal validation/queuing
do_queue_encode(vmaf)
end
else
do_queue_encode(vmaf)
end
end

defp do_queue_encode(vmaf) do
case QueueManager.validate_encode_request(vmaf) do
{:ok, validated_vmaf} ->
message = QueueManager.build_encode_message(validated_vmaf)
Expand Down
17 changes: 15 additions & 2 deletions lib/reencodarr/ab_av1/encode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,23 @@ defmodule Reencodarr.AbAv1.Encode do
end

# Private Helper Functions

# Determine output extension based on input file
# MP4 files output as MP4, everything else outputs as MKV
defp output_extension(video_path) do
case Path.extname(video_path) |> String.downcase() do
".mp4" -> ".mp4"
_ -> ".mkv"
end
end

defp prepare_encode_state(vmaf, state) do
# Mark video as encoding BEFORE starting the port to prevent duplicate dispatches
case Media.mark_as_encoding(vmaf.video) do
{:ok, _updated_video} ->
args = build_encode_args(vmaf)
output_file = Path.join(Helper.temp_dir(), "#{vmaf.video.id}.mkv")
ext = output_extension(vmaf.video.path)
output_file = Path.join(Helper.temp_dir(), "#{vmaf.video.id}#{ext}")

port = Helper.open_port(args)

Expand Down Expand Up @@ -309,12 +320,14 @@ defmodule Reencodarr.AbAv1.Encode do
end

defp build_encode_args(vmaf) do
ext = output_extension(vmaf.video.path)

base_args = [
"encode",
"--crf",
to_string(vmaf.crf),
"--output",
Path.join(Helper.temp_dir(), "#{vmaf.video.id}.mkv"),
Path.join(Helper.temp_dir(), "#{vmaf.video.id}#{ext}"),
"--input",
vmaf.video.path
]
Expand Down
2 changes: 1 addition & 1 deletion lib/reencodarr/ab_av1/output_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Reencodarr.AbAv1.OutputParser do
~r/\[(?<timestamp>[^\]]+)\].*?(?<progress>\d+(?:\.\d+)?)%,\s(?<fps>\d+(?:\.\d+)?)\sfps?,\seta\s(?<eta>\d+)\s(?<time_unit>second|minute|hour|day|week|month|year)s?/,
success: ~r/(?:\[.*\]\s)?crf\s(?<crf>\d+(?:\.\d+)?)\ssuccessful/,
warning: ~r/^Warning:\s(?<message>.*)/,
encoding_start: ~r/\[.*\] encoding (?<filename>\d+\.mkv)/,
encoding_start: ~r/\[.*\] encoding (?<filename>\d+\.(?:mkv|mp4))/,
encoding_progress:
~r/\[.*\]\s*(?<percent>\d+)%,\s*(?<fps>[\d\.]+)\s*fps,\s*eta\s*(?<eta>\d+)\s*(?<unit>minutes|seconds|hours|days|weeks|months|years)/,
encoding_progress_alt:
Expand Down
6 changes: 3 additions & 3 deletions lib/reencodarr/ab_av1/progress_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Reencodarr.AbAv1.ProgressParser do
case Process.get(:progress_parser_patterns) do
nil ->
patterns = %{
encoding_start: ~r/\[.*\] encoding (?<filename>\d+\.mkv)/,
encoding_start: ~r/\[.*\] encoding (?<filename>\d+\.(?:mkv|mp4))/,
# Main progress pattern with brackets: [timestamp] percent%, fps fps, eta time unit
progress:
~r/\[(?<timestamp>[^\]]+)\].*?(?<percent>\d+(?:\.\d+)?)%,\s(?<fps>\d+(?:\.\d+)?)\sfps?,?\s?eta\s(?<eta>\d+)\s(?<time_unit>(?:second|minute|hour|day|week|month|year)s?)/,
Expand Down Expand Up @@ -99,10 +99,10 @@ defmodule Reencodarr.AbAv1.ProgressParser do
end

defp handle_encoding_start(%{"filename" => filename_with_ext}, state) do
# Extract video ID from filename (e.g., "123.mkv" -> get original filename)
# Extract video ID from filename (e.g., "123.mkv" or "123.mp4" -> get original filename)
video_id =
filename_with_ext
|> Path.basename(".mkv")
|> Path.basename(Path.extname(filename_with_ext))
|> Parsers.parse_int(0)

filename = get_filename_for_encoding_start(state, video_id, filename_with_ext)
Expand Down
85 changes: 55 additions & 30 deletions lib/reencodarr/analyzer/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Reencodarr.Analyzer.Broadway do
Processing.Pipeline
}

alias Reencodarr.Core.Retry
alias Reencodarr.Dashboard.Events
alias Reencodarr.Media
alias Reencodarr.Media.{Codecs, Video}
Expand Down Expand Up @@ -389,7 +390,11 @@ defmodule Reencodarr.Analyzer.Broadway do
)

# Only proceed with upsert if we have successful videos
if length(successful_videos) > 0 do
if Enum.empty?(successful_videos) do
Logger.debug("Broadway: No successful videos to upsert")
# Still handle any failed paths
log_processing_summary([], failed_paths ++ additional_failed_paths)
else
# Extract video attributes from successful videos
video_attrs_list = Enum.map(successful_videos, fn {_video_info, attrs} -> attrs end)

Expand All @@ -405,10 +410,6 @@ defmodule Reencodarr.Analyzer.Broadway do
Logger.error("Broadway: perform_batch_upsert failed: #{inspect(reason)}")
{:error, reason}
end
else
Logger.debug("Broadway: No successful videos to upsert")
# Still handle any failed paths
log_processing_summary([], failed_paths ++ additional_failed_paths)
end

Logger.debug("Broadway: batch_upsert_and_transition_videos completed")
Expand Down Expand Up @@ -595,38 +596,72 @@ defmodule Reencodarr.Analyzer.Broadway do

# Applies the processing decision by updating video state in the database
defp apply_processing_decision(video, {:skip_encoding, reason}) do
Logger.debug("Video #{video.path} #{reason}, marking as encoded (skipping all processing)")
# Check if video is already encoded - if so, no need to transition
if video.state == :encoded do
Logger.debug(
"Video #{video.path} already in encoded state, skipping transition (reason: #{reason})"
)

case Media.mark_as_encoded(video) do
{:ok, updated_video} ->
Logger.debug(
"Successfully marked as encoded: #{video.path}, video_id: #{updated_video.id}, state: #{updated_video.state}"
{:ok, video}
else
Logger.debug("Video #{video.path} #{reason}, marking as encoded (skipping all processing)")

result =
Retry.retry_on_db_busy(
fn -> Media.mark_as_encoded(video) end,
max_attempts: 10,
base_backoff_ms: 50
)

{:ok, updated_video}
case result do
{:ok, updated_video} ->
Logger.debug(
"Successfully marked as encoded: #{video.path}, video_id: #{updated_video.id}, state: #{updated_video.state}"
)

{:ok, updated_video}

{:error, changeset_error} ->
Logger.error("Failed to mark as encoded for #{video.path}: #{inspect(changeset_error)}")
{:error, error} ->
Logger.error("Failed to mark as encoded for #{video.path}: #{inspect(error)}")

{:ok, video}
{:ok, video}
end
end
end

defp apply_processing_decision(video, {:needs_encoding, _reason}) do
# Skip if video is already past analysis stage
if video.state in [:analyzed, :crf_searching, :crf_searched, :encoding, :encoded] do
Logger.debug(
"Video #{video.path} already in state #{video.state}, skipping analysis transition"
)

{:ok, video}
else
mark_video_as_analyzed(video)
end
end

defp mark_video_as_analyzed(video) do
# Validate video has required fields before marking as analyzed
if has_required_mediainfo_fields?(video) do
case Media.mark_as_analyzed(video) do
result =
Retry.retry_on_db_busy(
fn -> Media.mark_as_analyzed(video) end,
max_attempts: 10,
base_backoff_ms: 50
)

case result do
{:ok, updated_video} ->
Logger.debug(
"Successfully marked as analyzed: #{video.path}, video_id: #{updated_video.id}, state: #{updated_video.state}"
)

{:ok, updated_video}

{:error, changeset_error} ->
Logger.error(
"Failed to mark as analyzed for #{video.path}: #{inspect(changeset_error)}"
)
{:error, error} ->
Logger.error("Failed to mark as analyzed for #{video.path}: #{inspect(error)}")

{:ok, video}
end
Expand Down Expand Up @@ -673,21 +708,11 @@ defmodule Reencodarr.Analyzer.Broadway do
{:ok, video} ->
# Record detailed failure information based on reason
record_analysis_failure(video, reason)

Logger.debug("Successfully recorded analysis failure for video #{video.id}")
:ok

{:error, :not_found} ->
Logger.warning("Video not found in database, deleting orphan file: #{path}")

case File.rm(path) do
:ok ->
Logger.info("Successfully deleted orphan file: #{path}")

{:error, reason} ->
Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}")
end

Logger.warning("Video not found in database for path: #{path}")
:ok
end
end
Expand Down
44 changes: 28 additions & 16 deletions lib/reencodarr/analyzer/broadway/performance_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,32 @@ defmodule Reencodarr.Analyzer.Broadway.PerformanceMonitor do
time_since_last = current_time - state.last_adjustment

# Only process if we have enough data and auto-tuning is enabled
if length(state.throughput_history) >= 3 and time_since_last >= @adjustment_interval do
avg_throughput = calculate_average_throughput(state.throughput_history)
cond do
length(state.throughput_history) < 3 or time_since_last < @adjustment_interval ->
state

# Only log and adjust if there's been recent activity (throughput > 0)
if avg_throughput > 0 do
adjust_based_on_throughput(state, avg_throughput, current_time)
else
# No recent activity, just reset the timer
%{state | last_adjustment: current_time}
end
not has_recent_activity?(state.throughput_history, current_time) ->
# No recent activity, clear stale history and reset timer silently
%{state | last_adjustment: current_time, throughput_history: []}

true ->
maybe_adjust_with_throughput(state, current_time)
end
end

defp has_recent_activity?(throughput_history, current_time) do
recent_cutoff = current_time - @adjustment_interval
Enum.any?(throughput_history, fn {timestamp, _} -> timestamp > recent_cutoff end)
end

defp maybe_adjust_with_throughput(state, current_time) do
avg_throughput = calculate_average_throughput(state.throughput_history)

if avg_throughput > 0 do
adjust_based_on_throughput(state, avg_throughput, current_time)
else
state
# No meaningful throughput, just reset the timer
%{state | last_adjustment: current_time}
end
end

Expand Down Expand Up @@ -355,13 +369,11 @@ defmodule Reencodarr.Analyzer.Broadway.PerformanceMonitor do
Logger.warning("Failed to send context update to producer: #{inspect(error)}")
end

defp calculate_average_throughput([]), do: 0

defp calculate_average_throughput(history) do
if length(history) > 0 do
total = Enum.reduce(history, 0, fn {_time, throughput}, acc -> acc + throughput end)
total / length(history)
else
0
end
total = Enum.reduce(history, 0, fn {_time, throughput}, acc -> acc + throughput end)
total / length(history)
end

defp calculate_throughput(batch_size, duration_ms) when duration_ms > 0,
Expand Down
13 changes: 4 additions & 9 deletions lib/reencodarr/analyzer/core/file_operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,10 @@ defmodule Reencodarr.Analyzer.Core.FileOperations do
end

defp validate_file_accessibility(path, %{exists: true}) do
# Skip MP4 files - compatibility issues to be resolved later
if String.ends_with?(path, ".mp4") do
{:error, "MP4 files skipped (compatibility issues)"}
else
# Additional checks can be added here (permissions, file type, etc.)
case File.stat(path, [:read]) do
{:ok, _file_stat} -> :ok
{:error, reason} -> {:error, "file not accessible: #{path} (#{reason})"}
end
# Additional checks can be added here (permissions, file type, etc.)
case File.stat(path, [:read]) do
{:ok, _file_stat} -> :ok
{:error, reason} -> {:error, "file not accessible: #{path} (#{reason})"}
end
end

Expand Down
5 changes: 3 additions & 2 deletions lib/reencodarr/analyzer/core/file_stat_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ defmodule Reencodarr.Analyzer.Core.FileStatCache do
end)

new_timers = Map.drop(state.cache_timers, expired_keys)
expired_count = length(expired_keys)

if length(expired_keys) > 0 do
Logger.debug("FileStatCache: Cleaned up #{length(expired_keys)} expired entries")
if expired_count > 0 do
Logger.debug("FileStatCache: Cleaned up #{expired_count} expired entries")
end

{:noreply, %{state | cache: active_cache, cache_timers: new_timers}}
Expand Down
2 changes: 1 addition & 1 deletion lib/reencodarr/analyzer/media_info/command_executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Reencodarr.Analyzer.MediaInfo.CommandExecutor do
Automatically optimizes batch size and concurrency based on system capabilities.
"""
@spec execute_batch_mediainfo([String.t()]) :: {:ok, map()} | {:error, term()}
def execute_batch_mediainfo(paths) when is_list(paths) and length(paths) > 0 do
def execute_batch_mediainfo([_ | _] = paths) do
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The guard pattern has changed from when is_list(paths) and length(paths) > 0 to when [_ | _] = paths. However, this creates a subtle bug: the new pattern [_ | _] = paths is a match operation that will always succeed and bind the result to paths, rather than a proper guard. The correct non-empty list pattern in a guard should be [_ | _] without the assignment. The function will now match any value (not just non-empty lists) because the pattern match in the guard always succeeds.

Copilot uses AI. Check for mistakes.
Logger.debug("Executing MediaInfo for #{length(paths)} files")

# Pre-filter existing files to avoid command errors
Expand Down
Loading