Conversation

@mjc mjc commented Oct 27, 2025

Complete Broadway Pipeline Simplification

Problem

Broadway pipelines (Analyzer, CRF Searcher, Encoder) were stopping after processing a single item instead of continuously processing queued work. Complex state machines and pause/resume functionality were causing reliability issues and making the codebase difficult to maintain.

Solution: Architectural Simplification

Completely rewrote all Broadway producers, replacing complex state machines with a simple polling-based architecture.

Before vs After

  • Analyzer Producer: 442 → 62 lines (86% reduction)
  • CRF Searcher Producer: 281 → 43 lines (85% reduction)
  • Encoder Producer: 398 → 69 lines (83% reduction)
  • Net Change: -949 lines of code across 19 files

What Was Removed

  • ✂️ PipelineStateMachine module - Complex state transitions eliminated
  • ✂️ Pause/Resume functionality - Pipelines now run continuously
  • ✂️ Demand tracking complexity - No more accumulation/dispatching logic
  • ✂️ Cross-producer communication - Simplified to polling pattern
  • ✂️ Event system fragmentation - Unified to single Events channel

New Architecture

Simple Polling Pattern (see the sketch below):

  1. Every 2 seconds, check for available work
  2. If work exists and the worker GenServer is available, push one item to wake Broadway
  3. Broadway handles batching and backpressure automatically
  4. No manual state management or pause/resume controls
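A minimal sketch of this polling loop as a GenStage producer (module names and the Worker.available?/0 / WorkQueue.next_item/0 helpers are placeholders, not the actual Reencodarr code):

defmodule MyApp.Pipeline.Producer do
  @moduledoc "Illustrative polling producer; the rewritten producers follow this shape."
  use GenStage

  @poll_interval :timer.seconds(2)

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    schedule_poll()
    {:producer, %{}}
  end

  @impl true
  def handle_demand(_demand, state) do
    # Broadway manages batching and backpressure; no demand bookkeeping here.
    {:noreply, [], state}
  end

  @impl true
  def handle_info(:poll, state) do
    schedule_poll()

    # Push a single item only when there is work and the worker can take it.
    case {Worker.available?(), WorkQueue.next_item()} do
      {true, item} when not is_nil(item) -> {:noreply, [item], state}
      _ -> {:noreply, [], state}
    end
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
end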

Additional Improvements

Database Resilience

  • DB Busy Handling: Added retry mechanisms for SQLite busy errors (see the sketch after this list)
  • CRF Search: Requeue messages with 200-500ms backoff during DB contention
  • Encoder: Schedule retry attempts instead of crashing on DB busy
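A rough sketch of the requeue-with-backoff approach (the message shape and do_upsert/2 are placeholders; the rescued exception type is the one named in this PR):

defmodule MyApp.DbBusyRetrySketch do
  @moduledoc "Illustrative only: requeue work with a short backoff when SQLite reports busy."
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_info({:upsert_vmaf, line} = message, state) do
    {:noreply, do_upsert(line, state)}
  rescue
    e in Exqlite.Error ->
      # Database busy: retry the same message after roughly 200-500ms instead of crashing.
      Logger.warning("DB busy, requeueing: #{inspect(e)}")
      Process.send_after(self(), message, 200 + :rand.uniform(300))
      {:noreply, state}
  end

  # Placeholder for the real database write.
  defp do_upsert(_line, state), do: state
end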

Code Quality Fixes

  • Warnings: All Credo warnings eliminated
  • Type Safety: Fixed Core.Time.to_seconds/2 to handle float inputs (sketched after this list)
  • Image Processing: Enhanced MKV attachment removal using track UIDs
  • Test Coverage: Replaced 242-line state management tests with 121-line producer tests
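A hypothetical sketch of the float-handling fix; the actual Core.Time.to_seconds/2 signature and unit names may differ, this only shows the trunc/1 approach:

@spec to_seconds(number(), :seconds | :minutes | :hours) :: integer()
def to_seconds(value, :seconds) when is_number(value), do: trunc(value)
def to_seconds(value, :minutes) when is_number(value), do: trunc(value * 60)
def to_seconds(value, :hours) when is_number(value), do: trunc(value * 3600)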

Runtime Stability

  • Error Recovery: GenServers no longer crash on database busy conditions
  • Orphan Cleanup: Delete files that exist on disk but not in database
  • Argument Safety: Fixed mkvpropedit argument order and nil handling

Benefits

Reliability

  • No More Stuck Pipelines: Simple polling prevents complex state machine bugs
  • Database Resilience: Graceful handling of SQLite busy conditions
  • Fault Tolerance: Simpler code = fewer failure modes

Maintainability

  • Up to 86% Less Code: Dramatic reduction in producer complexity
  • No State Machines: Eliminated hard-to-debug state transitions
  • Clear Architecture: Polling pattern is easy to understand and modify

Performance

  • Automatic Scaling: Broadway handles batching and concurrency
  • Resource Efficiency: No complex demand tracking overhead
  • Predictable Behavior: Simple polling is easier to reason about

Testing

  • 608 tests passing (0 failures)
  • Credo strict checks passing
  • Code formatting validated
  • All warnings eliminated

Migration Notes

  • Breaking Change: Pause/resume functionality removed from UI
  • Behavioral Change: Pipelines now run continuously (no user control needed)
  • Performance: Better throughput due to simplified architecture
  • Monitoring: Service status now reflects actual GenServer availability

This represents a complete architectural simplification prioritizing reliability and maintainability over complex features that were causing operational issues.

mjc added 6 commits October 23, 2025 12:23
- Fix encoder producer: decrement demand when dispatching VMAFs so Broadway calls handle_demand again
- Fix pattern matching for completion events in all three pipelines:
  - Encoder: changed from 3-tuple destructure to 2-tuple with event_data variable
  - CRF searcher: changed from 3-tuple to 2-tuple matching
  - Analyzer: changed to explicitly match 2-tuple with data variable

The issue was that producers were not correctly handling completion events because
the pattern matching didn't match the actual broadcast event structure (2-tuple
{:event_name, data_map} not 3-tuple). This meant completion handlers never fired,
so producers never transitioned state or dispatched the next item.
- Remove complex availability checking and demand tracking
- Let CrfSearch GenServer handle queueing via its mailbox
- Trust VideoStateMachine to prevent duplicate sends via state transitions
- Prevents Broadway from sending duplicate videos via handle_demand
- VideoStateMachine transitions happen immediately before async cast
- Producer's database query won't return videos in :crf_searching state
- Simplify get_optimistic_service_status to just check if processes alive
- Show :idle by default for running pipelines, :processing when work is active
- Reset status back to :idle when operations complete (encoding_completed, crf_search_completed)
- Add CrfSearcherBroadway alias for consistency
- Remove dispatch_available/0 and queue_video/1 from CrfSearcher (no longer needed)
- Remove empty pipeline_controller.ex file
- Format code to fix trailing whitespace
- Fix demand tracking in producers (don't discard demand when busy)
- Track current_video_id in CRF producer to prevent duplicate dispatches
- Unify all completion events to single Events.channel() (dashboard)
- Remove separate PubSub topics (crf_search_events, video_state_transitions, etc)
- Simplify status checks (actively_running based on actual GenServer state)
- Add Encode.available?() function
- Fix Core.Time.to_seconds to accept floats
- Fix mkvpropedit image stream removal (use track UIDs not just attachments)
- Broadcast crf_search_completed via Events instead of separate PubSub topic
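For reference, a sketch of the 2-tuple completion-event handling described in the commit notes above (the event name appears in this PR; the surrounding producer state handling is illustrative):

# Before (never matched): completion events are broadcast as 2-tuples, not 3-tuples.
# def handle_info({:crf_search_completed, _video_id, _result}, state), do: ...

# After: match the {:event_name, data_map} shape broadcast on the unified Events channel.
def handle_info({:crf_search_completed, _event_data}, state) do
  # Completion observed; the next dispatch picks up the following item.
  {:noreply, [], state}
end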
Copilot AI review requested due to automatic review settings October 27, 2025 17:15

Copilot AI left a comment

Pull Request Overview

This PR fixes Broadway pipelines stopping after processing a single item by addressing demand tracking, duplicate dispatches, and event system fragmentation. The root causes were: producers discarding demand instead of accumulating it, the CRF producer returning the same video repeatedly, and over-complicated pipeline state machines obscuring actual worker state.

Key Changes:

  • Fixed demand tracking across all Broadway producers to accumulate demand when workers are busy (see the sketch below)
  • Prevented duplicate video dispatches in CRF producer by tracking current_video_id
  • Unified event subscriptions to single Events.channel() instead of multiple fragmented PubSub topics
  • Simplified status checking to directly query GenServer state instead of complex state machines
  • Added missing Encode.available?() function and fixed Core.Time.to_seconds/2 to accept floats
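A generic sketch of the accumulate-then-dispatch fix described in the first bullet (next_work_item/0 and the state fields are placeholders; this interim demand tracking was later superseded by the polling rewrite):

def handle_demand(incoming, %{demand: pending} = state) when incoming > 0 do
  # Keep the new demand instead of discarding it while the worker is busy.
  dispatch(%{state | demand: pending + incoming})
end

defp dispatch(%{demand: demand} = state) when demand > 0 do
  case next_work_item() do
    nil -> {:noreply, [], state}
    item -> {:noreply, [item], %{state | demand: demand - 1}}
  end
end

defp dispatch(state), do: {:noreply, [], state}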

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Summary per file:

  • lib/reencodarr_web/live/dashboard_live.ex: Updated completion handlers to reset service status to idle, simplified optimistic status checking
  • lib/reencodarr/encoder/broadway/producer.ex: Fixed demand accumulation bug and simplified status broadcasting
  • lib/reencodarr/encoder.ex: Simplified actively_running? and delegated available? to Encode module
  • lib/reencodarr/crf_searcher/supervisor.ex: Added dynamic Broadway start/stop functions for pause/resume functionality
  • lib/reencodarr/crf_searcher/broadway/producer.ex: Complete rewrite: removed state machine complexity, added current_video_id tracking to prevent duplicates
  • lib/reencodarr/crf_searcher/broadway.ex: Updated pause/resume to use supervisor-based start/stop instead of state machine
  • lib/reencodarr/crf_searcher.ex: Simplified API by delegating to Broadway and CrfSearch modules
  • lib/reencodarr/core/time.ex: Fixed typespec and added trunc() calls to handle float inputs
  • lib/reencodarr/analyzer/broadway/producer.ex: Unified event subscription and simplified status broadcasting
  • lib/reencodarr/ab_av1/helper.ex: Enhanced image stream removal to use track UIDs with mkvpropedit --delete-track
  • lib/reencodarr/ab_av1/encode.ex: Added missing available?/0 function referenced by encoder module
  • lib/reencodarr/ab_av1/crf_search.ex: Restored completion event broadcasting and added state transition when starting CRF search
  • .vscode/settings.json: Removed workspace color customizations

mjc added 3 commits October 27, 2025 18:49
- Remove dead code calling removed Producer.pause() in encoder/broadway.ex
- Replace non-existent Factory import with Fixtures in tests
- Remove obsolete state_management_test.exs (tested removed State module)
- Fix broadway_producers_test.exs:
  * Use {:producer, state} pattern (GenStage) not {:ok, state}
  * Handle {:ok, video} tuple from video_fixture()
  * Simplify assertions to be timing-tolerant
- Make dispatch_available() safer by checking Process.whereis() first
- Add orphan file deletion when video not found in DB
- Fix mark_invalid_videos to return {:error, path} not {:skip, message}
- Remove unused Video alias from crf_search.ex
- Add logging for image attachment detection/removal

All tests passing (608 tests, 0 failures)
No compile warnings
… retry encode exit notifications

- Catch Exqlite.DB busy and DBConnection errors while processing CRF search output and requeue the original port message with a short backoff instead of crashing the GenServer.
- For encoder, catch DB busy when notifying success/failure after process exit and schedule retry; only clear state when notification succeeds.
- Fix potential nil dereference in Helper.ex by using get_in/2 for safe nested access
- Add clear documentation for perform_crf_search_cleanup/1 function clauses
- Clarify when each cleanup function is called (with vs without video task)
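A minimal sketch of the Process.whereis/1 guard mentioned in the commit notes (the registered name and wake-up message are placeholders):

def dispatch_available do
  # Only message the producer if it is actually registered and alive.
  case Process.whereis(__MODULE__) do
    nil -> :ok
    pid -> send(pid, :poll)
  end
end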
@mjc mjc requested a review from Copilot October 29, 2025 20:23
@mjc mjc merged commit 152d88b into main Oct 29, 2025
7 checks passed
@mjc mjc deleted the fix/broadway-pipelines-stopping branch October 29, 2025 20:27

Copilot AI left a comment

Pull Request Overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.

# Individual analysis completed - this is handled by batch completion now
{:noreply, [], state}
end
def handle_demand(_demand, state), do: {:noreply, [], state}

Copilot AI Oct 29, 2025

The second handle_demand/2 clause will never match because the first clause already handles all cases. When demand <= 0, the first clause won't match due to the guard, but GenStage will never call handle_demand with a non-positive demand value. The second clause should be removed as it's unreachable code.

Suggested change
def handle_demand(_demand, state), do: {:noreply, [], state}

Comment on lines +36 to 48
def handle_info(:poll, state) do
schedule_poll()
# Check if there's work and Encode is available, wake up Broadway if so
if Encode.available?() do
case Media.get_next_for_encoding(1) do
%Reencodarr.Media.Vmaf{} = vmaf -> {:noreply, [vmaf], state}
[%Reencodarr.Media.Vmaf{} = vmaf] -> {:noreply, [vmaf], state}
_ -> {:noreply, [], state}
end
end)
end

defp dispatch_if_ready(state) do
current_status = PipelineStateMachine.get_state(state.pipeline)

Logger.debug(
"Producer: dispatch_if_ready called - status: #{current_status}, demand: #{state.demand}"
)

if should_dispatch?(state) and state.demand > 0 do
Logger.debug("Producer: dispatch_if_ready - conditions met, dispatching VMAFs")
dispatch_vmafs(state)
else
Logger.debug(
"Producer: dispatch_if_ready - conditions NOT met, not dispatching (should_dispatch: #{should_dispatch?(state)}, demand: #{state.demand})"
)

handle_no_dispatch_encoder(state)
{:noreply, [], state}
end
end

Copilot AI Oct 29, 2025

The handle_info(:poll, state) and dispatch(state) functions contain duplicated logic with only minor differences in pattern matching. Consider extracting the common logic or having handle_info call dispatch directly to reduce duplication and improve maintainability.

Comment on lines +30 to 41
def handle_info(:poll, state) do
schedule_poll()
# Check if there's work and CrfSearch is available, wake up Broadway if so
if CrfSearch.available?() do
case Media.get_videos_for_crf_search(1) do
[] ->
# No videos to process - transition to idle
new_pipeline = PipelineStateMachine.transition_to(state.pipeline, :idle)
new_state = %{state | pipeline: new_pipeline}
{:noreply, [], new_state}

[_video | _] ->
# Videos available but no demand or CRF service unavailable
{:noreply, [], state}
[] -> {:noreply, [], state}
videos -> {:noreply, videos, state}
end
else
{:noreply, [], state}
end
end

Copilot AI Oct 29, 2025

The handle_info(:poll, state) and dispatch(state) functions contain duplicated logic. The dispatch function doesn't match empty lists explicitly like handle_info does, creating inconsistency. Consider consolidating this logic to avoid duplication and ensure consistent behavior.

defp image_video_track?(track) do
track["type"] == "video" and
(String.upcase(track["codec"] || "") in ["MJPEG", "PNG"] or
String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW", "PNG"]))

Copilot AI Oct 29, 2025

The check String.contains?(... || \"\", [\"V_MS/VFW\", \"PNG\"]) will match any track containing "PNG" in its codec_id, which could incorrectly identify legitimate PNG-encoded video streams as image attachments. The logic should be more specific - "PNG" is already checked in the codec field, so the codec_id check should only look for "V_MS/VFW" to identify attached pictures, not "PNG" as well.

Suggested change
String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW", "PNG"]))
String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW"]))

:ok

{:error, reason} ->
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")

Copilot AI Oct 29, 2025

The error message logs a warning when the state transition fails but then continues to process the CRF search anyway. This could lead to inconsistent state. Consider either handling the error properly by returning early, or updating the log message to clarify that processing continues despite the state transition failure.

Suggested change
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")
Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}. Aborting CRF search for this video.")
# Do not proceed with CRF search if state transition fails
{:noreply, state}

Comment on lines +669 to +678
Logger.warning("Video not found in database, deleting orphan file: #{path}")

case File.rm(path) do
:ok ->
Logger.info("Successfully deleted orphan file: #{path}")

{:error, reason} ->
Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}")
end


Copilot AI Oct 29, 2025

Automatically deleting files that aren't found in the database could be dangerous if there's a temporary database issue or race condition. This could lead to unintended data loss. Consider adding a safeguard such as checking if the file is in an expected directory, or requiring manual intervention for orphan file cleanup.

Suggested change
Logger.warning("Video not found in database, deleting orphan file: #{path}")
case File.rm(path) do
:ok ->
Logger.info("Successfully deleted orphan file: #{path}")
{:error, reason} ->
Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}")
end
Logger.warning("Video not found in database for failed analysis: #{path}. Orphan file NOT deleted automatically. Manual review required.")
# Optionally, emit a dashboard event or notification for manual orphan file review
Events.orphan_file_detected(path, reason)

Comment on lines +150 to +158
rescue
e in Exqlite.Error ->
Logger.warning(
"Encode: Database busy while handling exit_status, scheduling retry: #{inspect(e)}"
)

# Retry after short backoff. Keep current state until retry completes.
Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 200)
:retry_scheduled

Copilot AI Oct 29, 2025

The hardcoded retry delay of 200ms appears in multiple places (lines 157, 200). This magic number should be extracted to a module attribute like @db_retry_delay_ms for consistency and easier tuning.

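For illustration, the kind of extraction the comment suggests:

# Single tunable knob for the DB-busy backoff instead of a repeated literal.
@db_retry_delay_ms 200

defp schedule_db_retry(message) do
  Process.send_after(self(), message, @db_retry_delay_ms)
end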
Comment on lines +250 to +269
rescue
e in Exqlite.Error ->
# Database is busy — don't crash. Requeue the same port message with a short delay.
Logger.warning(
"CrfSearch: Database busy while upserting VMAF, requeuing line: #{inspect(e)}"
)

# Re-send the exact same message after a short backoff to retry
Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)

# Keep state intact — we'll retry later
{:noreply, state}

e in DBConnection.ConnectionError ->
Logger.warning(
"CrfSearch: DB connection error while upserting VMAF, requeuing line: #{inspect(e)}"
)

{:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}}
Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)
{:noreply, state}

Copilot AI Oct 29, 2025

The retry logic for Exqlite.Error and DBConnection.ConnectionError is duplicated with identical behavior (200ms delay, same message requeue). This duplication should be consolidated into a single rescue clause handling both error types, or extracted into a helper function.

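One way to consolidate the two clauses, as suggested (a sketch against the snippet above; port, line, and state come from the enclosing function):

rescue
  e in [Exqlite.Error, DBConnection.ConnectionError] ->
    # Same behavior for both error types: requeue the port message after a short backoff.
    Logger.warning("CrfSearch: DB unavailable while upserting VMAF, requeuing line: #{inspect(e)}")
    Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)
    {:noreply, state}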
Comment on lines +74 to 75
@spec clean_mkv_attachments(String.t()) :: {:ok, String.t()}
def clean_mkv_attachments(file_path) do

Copilot AI Oct 29, 2025

The function spec changed from {:ok, String.t()} | {:error, term()} to always returning {:ok, String.t()}, but error cases (like check_for_image_attachments returning {:error, reason}) now also return {:ok, file_path}, masking failures. Callers can no longer distinguish between successful cleaning and fallback to original file due to errors. Consider keeping the error tuple in the spec or documenting this behavior change.

defp mark_invalid_videos(invalid_videos) do
Enum.map(invalid_videos, fn video_info ->
{:skip, "file validation failed: #{video_info.path}"}
{:error, video_info.path}

Copilot AI Oct 29, 2025

The return value changed from {:skip, \"file validation failed: #{video_info.path}\"} to {:error, video_info.path}. This changes both the tag (:skip to :error) and the message format. Verify that all callers of this function correctly handle the :error tuple and that the path-only format provides sufficient debugging information compared to the previous descriptive message.

Suggested change
{:error, video_info.path}
{:skip, "file validation failed: #{video_info.path}"}
