# Fix Broadway pipelines stopping after each item #9
## Conversation
- Fix encoder producer: decrement demand when dispatching VMAFs so Broadway calls handle_demand again
- Fix pattern matching for completion events in all three pipelines:
- Encoder: changed from 3-tuple destructure to 2-tuple with event_data variable
- CRF searcher: changed from 3-tuple to 2-tuple matching
- Analyzer: changed to explicitly match 2-tuple with data variable
The issue was that producers were not handling completion events: the pattern matches didn't fit the actual broadcast event structure (a 2-tuple `{:event_name, data_map}`, not a 3-tuple). As a result, completion handlers never fired, so producers never transitioned state or dispatched the next item.
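As a sketch (the clause shape, event name, `:status` field, and `dispatch_if_ready/1` are illustrative assumptions, not the exact source), the corrected clause matches the 2-tuple:

```elixir
# Hypothetical producer clause: completion events arrive as {:event_name, data_map}.
def handle_info({:crf_search_completed, event_data}, state) do
  # The old 3-tuple match, e.g. {:crf_search_completed, video_id, result},
  # never matched these broadcasts, so this handler never ran.
  Logger.debug("Completion event received: #{inspect(event_data)}")
  # Mark the producer idle again and try to dispatch the next queued item.
  dispatch_if_ready(%{state | status: :idle})
end
```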
- Remove complex availability checking and demand tracking
- Let the CrfSearch GenServer handle queueing via its mailbox
- Trust VideoStateMachine to prevent duplicate sends via state transitions

- Prevent Broadway from sending duplicate videos via handle_demand
- VideoStateMachine transitions happen immediately before the async cast
- The producer's database query won't return videos in :crf_searching state

- Simplify get_optimistic_service_status to just check whether processes are alive
- Show :idle by default for running pipelines, :processing when work is active
- Reset status back to :idle when operations complete (encoding_completed, crf_search_completed)
- Add CrfSearcherBroadway alias for consistency

- Remove dispatch_available/0 and queue_video/1 from CrfSearcher (no longer needed)
- Remove the empty pipeline_controller.ex file
- Format code to fix trailing whitespace

- Fix demand tracking in producers (don't discard demand when busy)
- Track current_video_id in the CRF producer to prevent duplicate dispatches
- Unify all completion events onto the single Events.channel() (dashboard)
- Remove separate PubSub topics (crf_search_events, video_state_transitions, etc.)
- Simplify status checks (actively_running based on actual GenServer state)
- Add Encode.available?() function
- Fix Core.Time.to_seconds to accept floats
- Fix mkvpropedit image stream removal (use track UIDs, not just attachments)
- Broadcast crf_search_completed via Events instead of a separate PubSub topic
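A minimal sketch of the demand-accumulation and duplicate-guard ideas from this commit (the state shape and clause layout are assumptions; `Media.get_videos_for_crf_search/1` appears in the diffs below):

```elixir
# Hypothetical producer state: %{demand: non_neg_integer(), current_video_id: integer() | nil}
def handle_demand(demand, state) when demand > 0 do
  # Accumulate demand rather than discarding it while a video is in flight.
  dispatch_if_ready(%{state | demand: state.demand + demand})
end

defp dispatch_if_ready(%{current_video_id: nil, demand: demand} = state) when demand > 0 do
  case Media.get_videos_for_crf_search(1) do
    [video] ->
      # Remember the in-flight video so later polls can't dispatch it twice.
      {:noreply, [video], %{state | demand: demand - 1, current_video_id: video.id}}

    [] ->
      {:noreply, [], state}
  end
end

# A video is already in flight (or there is no demand): hold on to the demand.
defp dispatch_if_ready(state), do: {:noreply, [], state}
```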
### Pull Request Overview
This PR fixes Broadway pipelines stopping after processing single items by addressing demand tracking, duplicate dispatches, and event system fragmentation. The root causes were: producers discarding demand instead of accumulating it, CRF producer returning the same video repeatedly, and over-complicated pipeline state machines obscuring actual worker state.
Key Changes:

- Fixed demand tracking across all Broadway producers to accumulate demand when workers are busy
- Prevented duplicate video dispatches in the CRF producer by tracking `current_video_id`
- Unified event subscriptions to a single `Events.channel()` instead of multiple fragmented PubSub topics
- Simplified status checking to directly query GenServer state instead of complex state machines
- Added the missing `Encode.available?()` function and fixed `Core.Time.to_seconds/2` to accept floats
### Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| lib/reencodarr_web/live/dashboard_live.ex | Updated completion handlers to reset service status to idle; simplified optimistic status checking |
| lib/reencodarr/encoder/broadway/producer.ex | Fixed demand accumulation bug and simplified status broadcasting |
| lib/reencodarr/encoder.ex | Simplified actively_running? and delegated available? to the Encode module |
| lib/reencodarr/crf_searcher/supervisor.ex | Added dynamic Broadway start/stop functions for pause/resume functionality |
| lib/reencodarr/crf_searcher/broadway/producer.ex | Complete rewrite: removed state machine complexity, added current_video_id tracking to prevent duplicates |
| lib/reencodarr/crf_searcher/broadway.ex | Updated pause/resume to use supervisor-based start/stop instead of the state machine |
| lib/reencodarr/crf_searcher.ex | Simplified API by delegating to the Broadway and CrfSearch modules |
| lib/reencodarr/core/time.ex | Fixed typespec and added trunc() calls to handle float inputs |
| lib/reencodarr/analyzer/broadway/producer.ex | Unified event subscription and simplified status broadcasting |
| lib/reencodarr/ab_av1/helper.ex | Enhanced image stream removal to use track UIDs with mkvpropedit --delete-track |
| lib/reencodarr/ab_av1/encode.ex | Added missing available?/0 function referenced by the encoder module |
| lib/reencodarr/ab_av1/crf_search.ex | Restored completion event broadcasting and added state transition when starting CRF search |
| .vscode/settings.json | Removed workspace color customizations |
- Remove dead code calling removed Producer.pause() in encoder/broadway.ex
- Replace non-existent Factory import with Fixtures in tests
- Remove obsolete state_management_test.exs (tested removed State module)
- Fix broadway_producers_test.exs:
* Use {:producer, state} pattern (GenStage) not {:ok, state}
* Handle {:ok, video} tuple from video_fixture()
* Simplify assertions to be timing-tolerant
- Make dispatch_available() safer by checking Process.whereis() first
- Add orphan file deletion when video not found in DB
- Fix mark_invalid_videos to return {:error, path} not {:skip, message}
- Remove unused Video alias from crf_search.ex
- Add logging for image attachment detection/removal
All tests passing (608 tests, 0 failures)
No compile warnings
… retry encode exit notifications

- Catch Exqlite DB-busy and DBConnection errors while processing CRF search output, and requeue the original port message with a short backoff instead of crashing the GenServer.
- For the encoder, catch DB-busy errors when notifying success/failure after process exit and schedule a retry; only clear state when the notification succeeds.
- Fix potential nil dereference in Helper.ex by using get_in/2 for safe nested access
- Add clear documentation for perform_crf_search_cleanup/1 function clauses
- Clarify when each cleanup function is called (with vs. without a video task)
### Pull Request Overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.
```elixir
    # Individual analysis completed - this is handled by batch completion now
    {:noreply, [], state}
  end

  def handle_demand(_demand, state), do: {:noreply, [], state}
```
Copilot AI · Oct 29, 2025
The second handle_demand/2 clause will never match because the first clause already handles all cases. When demand <= 0, the first clause won't match due to the guard, but GenStage will never call handle_demand with a non-positive demand value. The second clause should be removed as it's unreachable code.
Suggested change:

```diff
-  def handle_demand(_demand, state), do: {:noreply, [], state}
```
```diff
 def handle_info(:poll, state) do
   schedule_poll()
   # Check if there's work and Encode is available, wake up Broadway if so
   if Encode.available?() do
     case Media.get_next_for_encoding(1) do
-      %Reencodarr.Media.Vmaf{} = vmaf -> {:noreply, [vmaf], state}
+      [%Reencodarr.Media.Vmaf{} = vmaf] -> {:noreply, [vmaf], state}
       _ -> {:noreply, [], state}
     end
-  end)
+  end
 end
```
```elixir
defp dispatch_if_ready(state) do
  current_status = PipelineStateMachine.get_state(state.pipeline)

  Logger.debug(
    "Producer: dispatch_if_ready called - status: #{current_status}, demand: #{state.demand}"
  )

  if should_dispatch?(state) and state.demand > 0 do
    Logger.debug("Producer: dispatch_if_ready - conditions met, dispatching VMAFs")
    dispatch_vmafs(state)
  else
    Logger.debug(
      "Producer: dispatch_if_ready - conditions NOT met, not dispatching (should_dispatch: #{should_dispatch?(state)}, demand: #{state.demand})"
    )

    handle_no_dispatch_encoder(state)
    {:noreply, [], state}
  end
end
```
Copilot AI · Oct 29, 2025
The handle_info(:poll, state) and dispatch(state) functions contain duplicated logic with only minor differences in pattern matching. Consider extracting the common logic or having handle_info call dispatch directly to reduce duplication and improve maintainability.
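One possible consolidation along the lines of this comment (a sketch; it assumes `dispatch/1` exists and returns a GenStage reply tuple):

```elixir
# Hypothetical: the poll handler re-arms the timer and defers to the shared dispatch path.
def handle_info(:poll, state) do
  schedule_poll()
  dispatch(state)
end
```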
```diff
 def handle_info(:poll, state) do
   schedule_poll()
   # Check if there's work and CrfSearch is available, wake up Broadway if so
   if CrfSearch.available?() do
     case Media.get_videos_for_crf_search(1) do
-      [] ->
-        # No videos to process - transition to idle
-        new_pipeline = PipelineStateMachine.transition_to(state.pipeline, :idle)
-        new_state = %{state | pipeline: new_pipeline}
-        {:noreply, [], new_state}
-
-      [_video | _] ->
-        # Videos available but no demand or CRF service unavailable
-        {:noreply, [], state}
+      [] -> {:noreply, [], state}
+      videos -> {:noreply, videos, state}
     end
   else
     {:noreply, [], state}
   end
 end
```
Copilot AI · Oct 29, 2025
The handle_info(:poll, state) and dispatch(state) functions contain duplicated logic. The dispatch function doesn't match empty lists explicitly like handle_info does, creating inconsistency. Consider consolidating this logic to avoid duplication and ensure consistent behavior.
```elixir
defp image_video_track?(track) do
  track["type"] == "video" and
    (String.upcase(track["codec"] || "") in ["MJPEG", "PNG"] or
       String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW", "PNG"]))
```
Copilot AI · Oct 29, 2025
The check `String.contains?(... || "", ["V_MS/VFW", "PNG"])` will match any track containing "PNG" in its codec_id, which could incorrectly identify legitimate PNG-encoded video streams as image attachments. The logic should be more specific - "PNG" is already checked in the codec field, so the codec_id check should only look for "V_MS/VFW" to identify attached pictures, not "PNG" as well.
Suggested change:

```diff
-       String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW", "PNG"]))
+       String.contains?(get_in(track, ["properties", "codec_id"]) || "", ["V_MS/VFW"]))
```
```elixir
        :ok

      {:error, reason} ->
        Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")
```
Copilot AI · Oct 29, 2025
The error message logs a warning when the state transition fails but then continues to process the CRF search anyway. This could lead to inconsistent state. Consider either handling the error properly by returning early, or updating the log message to clarify that processing continues despite the state transition failure.
Suggested change:

```diff
-        Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}")
+        Logger.warning("Failed to mark video #{video.id} as crf_searching: #{inspect(reason)}. Aborting CRF search for this video.")
+        # Do not proceed with CRF search if state transition fails
+        {:noreply, state}
```
```elixir
    Logger.warning("Video not found in database, deleting orphan file: #{path}")

    case File.rm(path) do
      :ok ->
        Logger.info("Successfully deleted orphan file: #{path}")

      {:error, reason} ->
        Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}")
    end
```
Copilot AI · Oct 29, 2025
Automatically deleting files that aren't found in the database could be dangerous if there's a temporary database issue or race condition. This could lead to unintended data loss. Consider adding a safeguard such as checking if the file is in an expected directory, or requiring manual intervention for orphan file cleanup.
Suggested change:

```diff
-    Logger.warning("Video not found in database, deleting orphan file: #{path}")
-
-    case File.rm(path) do
-      :ok ->
-        Logger.info("Successfully deleted orphan file: #{path}")
-
-      {:error, reason} ->
-        Logger.error("Failed to delete orphan file #{path}: #{inspect(reason)}")
-    end
+    Logger.warning("Video not found in database for failed analysis: #{path}. Orphan file NOT deleted automatically. Manual review required.")
+    # Optionally, emit a dashboard event or notification for manual orphan file review
+    Events.orphan_file_detected(path, reason)
```
```elixir
  rescue
    e in Exqlite.Error ->
      Logger.warning(
        "Encode: Database busy while handling exit_status, scheduling retry: #{inspect(e)}"
      )

      # Retry after short backoff. Keep current state until retry completes.
      Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, 200)
      :retry_scheduled
```
Copilot AI · Oct 29, 2025
The hardcoded retry delay of 200ms appears in multiple places (lines 157, 200). This magic number should be extracted to a module attribute like @db_retry_delay_ms for consistency and easier tuning.
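A sketch of the extraction this comment suggests (the attribute name `@db_retry_delay_ms` comes from the comment itself; placement is assumed):

```elixir
# Hypothetical module attribute replacing the hardcoded 200ms delays.
@db_retry_delay_ms 200

# ...then at each retry site:
Process.send_after(self(), {:encoding_exit_retry, exit_code, vmaf, output_file}, @db_retry_delay_ms)
```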
```diff
 rescue
   e in Exqlite.Error ->
     # Database is busy — don't crash. Requeue the same port message with a short delay.
     Logger.warning(
       "CrfSearch: Database busy while upserting VMAF, requeuing line: #{inspect(e)}"
     )

     # Re-send the exact same message after a short backoff to retry
     Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)

     # Keep state intact — we'll retry later
     {:noreply, state}

   e in DBConnection.ConnectionError ->
     Logger.warning(
       "CrfSearch: DB connection error while upserting VMAF, requeuing line: #{inspect(e)}"
     )

-    {:noreply, %{state | partial_line_buffer: "", output_buffer: new_output_buffer}}
+    Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)
+    {:noreply, state}
```
Copilot AI · Oct 29, 2025
The retry logic for Exqlite.Error and DBConnection.ConnectionError is duplicated with identical behavior (200ms delay, same message requeue). This duplication should be consolidated into a single rescue clause handling both error types, or extracted into a helper function.
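A minimal sketch of that consolidation; Elixir's `rescue` accepts a list of exception modules, so both transient DB errors can share one clause (the log wording is illustrative):

```elixir
rescue
  # Both transient DB errors get identical treatment: requeue the line and retry.
  e in [Exqlite.Error, DBConnection.ConnectionError] ->
    Logger.warning("CrfSearch: transient DB error while upserting VMAF, requeuing line: #{inspect(e)}")
    Process.send_after(self(), {port, {:data, {:eol, line}}}, 200)
    {:noreply, state}
```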
```elixir
  @spec clean_mkv_attachments(String.t()) :: {:ok, String.t()}
  def clean_mkv_attachments(file_path) do
```
Copilot AI · Oct 29, 2025
The function spec changed from {:ok, String.t()} | {:error, term()} to always returning {:ok, String.t()}, but error cases (like check_for_image_attachments returning {:error, reason}) now also return {:ok, file_path}, masking failures. Callers can no longer distinguish between successful cleaning and fallback to original file due to errors. Consider keeping the error tuple in the spec or documenting this behavior change.
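If the error case were kept, the spec would read (a sketch of the comment's first option):

```elixir
@spec clean_mkv_attachments(String.t()) :: {:ok, String.t()} | {:error, term()}
```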
```diff
   defp mark_invalid_videos(invalid_videos) do
     Enum.map(invalid_videos, fn video_info ->
-      {:skip, "file validation failed: #{video_info.path}"}
+      {:error, video_info.path}
```
Copilot AI · Oct 29, 2025
The return value changed from `{:skip, "file validation failed: #{video_info.path}"}` to `{:error, video_info.path}`. This changes both the tag (:skip to :error) and the message format. Verify that all callers of this function correctly handle the :error tuple and that the path-only format provides sufficient debugging information compared to the previous descriptive message.
Suggested change:

```diff
-      {:error, video_info.path}
+      {:skip, "file validation failed: #{video_info.path}"}
```
## Complete Broadway Pipeline Simplification

### Problem

Broadway pipelines (Analyzer, CRF Searcher, Encoder) were stopping after processing single items instead of continuously processing queued work. Complex state machines and pause/resume functionality were causing reliability issues and making the codebase difficult to maintain.

### Solution: Architectural Simplification

Completely rewrote all Broadway producers from complex state machines to a simple polling-based architecture.

#### Before vs After

#### What Was Removed

#### New Architecture

Simple Polling Pattern:
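A minimal, hedged sketch of that shape (`PollingProducerSketch` and `fetch_work/1` are placeholder names; the real producers query `Media` as in the diffs above):

```elixir
defmodule PollingProducerSketch do
  use GenStage

  @poll_interval_ms 5_000

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    schedule_poll()
    {:producer, %{demand: 0}}
  end

  @impl true
  def handle_demand(demand, state) when demand > 0 do
    # Accumulate demand instead of discarding it when no work is ready.
    {:noreply, [], %{state | demand: state.demand + demand}}
  end

  @impl true
  def handle_info(:poll, state) do
    schedule_poll()

    if state.demand > 0 do
      # fetch_work/1 stands in for the real queue query.
      events = fetch_work(state.demand)
      {:noreply, events, %{state | demand: state.demand - length(events)}}
    else
      {:noreply, [], state}
    end
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval_ms)

  # Placeholder: a real producer would query the database here.
  defp fetch_work(_limit), do: []
end
```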
### Additional Improvements

#### Database Resilience

#### Code Quality Fixes

- Fixed `Core.Time.to_seconds/2` to handle float inputs

#### Runtime Stability

### Benefits

#### Reliability

#### Maintainability

#### Performance

### Testing

### Migration Notes

This represents a complete architectural simplification prioritizing reliability and maintainability over complex features that were causing operational issues.