From 99efe8e2944de210c9408e162bfbdbc54afd8c59 Mon Sep 17 00:00:00 2001 From: Adonis Carvajal Date: Mon, 5 May 2025 21:33:18 -0400 Subject: [PATCH] Decoupled video buffer from video encoder for more extensibility --- Cargo.lock | 4 +- src/encoders/buffer.rs | 4 +- src/encoders/buffer_tests.rs | 4 +- src/encoders/nvenc_encoder.rs | 75 +++++++++++++++++++------------- src/encoders/vaapi_encoder.rs | 82 ++++++++++++++++++++++------------- src/encoders/video_encoder.rs | 14 +++--- src/main.rs | 4 +- src/modes/shadow_cap.rs | 70 ++++++++++++++++++++++++------ src/waycap.rs | 19 ++++---- 9 files changed, 175 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5fbb101..d2cb9e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1004,9 +1004,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libdbus-sys" diff --git a/src/encoders/buffer.rs b/src/encoders/buffer.rs index 132744c..10a0680 100644 --- a/src/encoders/buffer.rs +++ b/src/encoders/buffer.rs @@ -35,7 +35,7 @@ impl VideoFrameData { /// The buffer is ordered by decoding timestamp (DTS) and maintains complete GOPs (groups of pictures), /// ensuring that no partial GOPs are kept when trimming for ease of muxing and playback. #[derive(Clone)] -pub struct VideoBuffer { +pub struct ShadowCaptureVideoBuffer { frames: BTreeMap, /// Maximum duration (in seconds) that the buffer should retain. @@ -47,7 +47,7 @@ pub struct VideoBuffer { key_frame_keys: Vec, } -impl VideoBuffer { +impl ShadowCaptureVideoBuffer { /// Creates a new `FrameBuffer` with a specified maximum duration. 
/// /// # Arguments diff --git a/src/encoders/buffer_tests.rs b/src/encoders/buffer_tests.rs index 4aaa82a..6262a3b 100644 --- a/src/encoders/buffer_tests.rs +++ b/src/encoders/buffer_tests.rs @@ -17,7 +17,7 @@ fn test_video_buffer_no_trim() { VideoFrameData::new(vec![3], true, 6), ]; - let mut buffer = VideoBuffer::new(10); + let mut buffer = ShadowCaptureVideoBuffer::new(10); buffer.insert(1, dummy_frames[0].clone()); buffer.insert(2, dummy_frames[1].clone()); @@ -42,7 +42,7 @@ fn test_video_buffer_no_trim() { #[test] fn test_video_buffer_trimming() { - let mut buffer = VideoBuffer::new(10); + let mut buffer = ShadowCaptureVideoBuffer::new(10); let dummy_frames = [ VideoFrameData::new(vec![1], true, 0), diff --git a/src/encoders/nvenc_encoder.rs b/src/encoders/nvenc_encoder.rs index f10cf7a..dd2d064 100644 --- a/src/encoders/nvenc_encoder.rs +++ b/src/encoders/nvenc_encoder.rs @@ -1,42 +1,44 @@ use ffmpeg_next::{self as ffmpeg, Rational}; +use ringbuf::{ + traits::{Producer, Split}, + HeapCons, HeapProd, HeapRb, +}; use crate::{application_config::QualityPreset, RawVideoFrame}; use super::{ - buffer::{VideoBuffer, VideoFrameData}, - video_encoder::{VideoEncoder, GOP_SIZE, ONE_MICROS}, + buffer::VideoFrameData, + video_encoder::{VideoEncoder, GOP_SIZE}, }; pub struct NvencEncoder { encoder: Option, - video_buffer: VideoBuffer, width: u32, height: u32, encoder_name: String, quality: QualityPreset, + encoded_frame_recv: Option>, + encoded_frame_sender: Option>, } impl VideoEncoder for NvencEncoder { - fn new( - width: u32, - height: u32, - max_buffer_seconds: u32, - quality: QualityPreset, - ) -> anyhow::Result + fn new(width: u32, height: u32, quality: QualityPreset) -> anyhow::Result where Self: Sized, { let encoder_name = "h264_nvenc"; let encoder = Some(Self::create_encoder(width, height, encoder_name, &quality)?); - let max_time = max_buffer_seconds as usize * ONE_MICROS; + let video_ring_buffer = HeapRb::<(i64, VideoFrameData)>::new(120); + let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split(); Ok(Self { encoder, - video_buffer: VideoBuffer::new(max_time), width, height, encoder_name: encoder_name.to_string(), quality, + encoded_frame_recv: Some(video_ring_receiver), + encoded_frame_sender: Some(video_ring_sender), }) } @@ -56,14 +58,21 @@ impl VideoEncoder for NvencEncoder { let mut packet = ffmpeg::codec::packet::Packet::empty(); if encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { - let frame_data = VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ); - - self.video_buffer - .insert(packet.dts().unwrap_or(0), frame_data); + if let Some(ref mut sender) = self.encoded_frame_sender { + if sender + .try_push(( + packet.dts().unwrap_or(0), + VideoFrameData::new( + data.to_vec(), + packet.is_key(), + packet.pts().unwrap_or(0), + ), + )) + .is_err() + { + log::error!("Could not send encoded packet to the ringbuf"); + } + } }; } } @@ -77,14 +86,21 @@ impl VideoEncoder for NvencEncoder { let mut packet = ffmpeg::codec::packet::Packet::empty(); while encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { - let frame_data = VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ); - - self.video_buffer - .insert(packet.dts().unwrap_or(0), frame_data); + if let Some(ref mut sender) = self.encoded_frame_sender { + if sender + .try_push(( + packet.dts().unwrap_or(0), + VideoFrameData::new( + data.to_vec(), + packet.is_key(), + 
packet.pts().unwrap_or(0), + ), + )) + .is_err() + { + log::error!("Could not send encoded packet to the ringbuf"); + } + } }; packet = ffmpeg::codec::packet::Packet::empty(); } @@ -93,7 +109,6 @@ impl VideoEncoder for NvencEncoder { } fn drop_encoder(&mut self) { - self.video_buffer.reset(); self.encoder.take(); } @@ -112,8 +127,8 @@ impl VideoEncoder for NvencEncoder { &self.encoder } - fn get_buffer(&self) -> &VideoBuffer { - &self.video_buffer + fn take_encoded_recv(&mut self) -> Option> { + self.encoded_frame_recv.take() } } diff --git a/src/encoders/vaapi_encoder.rs b/src/encoders/vaapi_encoder.rs index 4bff52e..4451b27 100644 --- a/src/encoders/vaapi_encoder.rs +++ b/src/encoders/vaapi_encoder.rs @@ -13,10 +13,14 @@ use ffmpeg_next::{ Rational, }; use log::error; +use ringbuf::{ + traits::{Producer, Split}, + HeapCons, HeapProd, HeapRb, +}; use super::{ - buffer::{VideoBuffer, VideoFrameData}, - video_encoder::{VideoEncoder, GOP_SIZE, ONE_MICROS}, + buffer::VideoFrameData, + video_encoder::{VideoEncoder, GOP_SIZE}, }; thread_local! { @@ -25,32 +29,35 @@ thread_local! { pub struct VaapiEncoder { encoder: Option, - video_buffer: VideoBuffer, width: u32, height: u32, encoder_name: String, quality: QualityPreset, + encoded_frame_recv: Option>, + encoded_frame_sender: Option>, } impl VideoEncoder for VaapiEncoder { - fn new( - width: u32, - height: u32, - max_buffer_seconds: u32, - quality: QualityPreset, - ) -> anyhow::Result + fn new(width: u32, height: u32, quality: QualityPreset) -> anyhow::Result where Self: Sized, { + // TODO: Should create a child thread that polls the encoder and does the buffer logic + // doing it all at once takes too long + let encoder_name = "h264_vaapi"; let encoder = Self::create_encoder(width, height, encoder_name, &quality)?; + let video_ring_buffer = HeapRb::<(i64, VideoFrameData)>::new(120); + let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split(); + Ok(Self { encoder: Some(encoder), - video_buffer: VideoBuffer::new(max_buffer_seconds as usize * ONE_MICROS), width, height, encoder_name: encoder_name.to_string(), quality, + encoded_frame_recv: Some(video_ring_receiver), + encoded_frame_sender: Some(video_ring_sender), }) } @@ -126,14 +133,21 @@ impl VideoEncoder for VaapiEncoder { let mut packet = ffmpeg::codec::packet::Packet::empty(); if encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { - let frame_data = VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ); - - self.video_buffer - .insert(packet.dts().unwrap_or(0), frame_data); + if let Some(ref mut sender) = self.encoded_frame_sender { + if sender + .try_push(( + packet.dts().unwrap_or(0), + VideoFrameData::new( + data.to_vec(), + packet.is_key(), + packet.pts().unwrap_or(0), + ), + )) + .is_err() + { + log::error!("Could not send encoded packet to the ringbuf"); + } + } }; } }); @@ -148,14 +162,21 @@ impl VideoEncoder for VaapiEncoder { let mut packet = ffmpeg::codec::packet::Packet::empty(); while encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { - let frame_data = VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ); - - self.video_buffer - .insert(packet.dts().unwrap_or(0), frame_data); + if let Some(ref mut sender) = self.encoded_frame_sender { + if sender + .try_push(( + packet.dts().unwrap_or(0), + VideoFrameData::new( + data.to_vec(), + packet.is_key(), + packet.pts().unwrap_or(0), + ), + )) + .is_err() + { + log::error!("Could not send 
encoded packet to the ringbuf"); + } + } }; packet = ffmpeg::codec::packet::Packet::empty(); } @@ -178,14 +199,13 @@ impl VideoEncoder for VaapiEncoder { &self.encoder } - fn get_buffer(&self) -> &VideoBuffer { - &self.video_buffer - } - fn drop_encoder(&mut self) { - self.video_buffer.reset(); self.encoder.take(); } + + fn take_encoded_recv(&mut self) -> Option> { + self.encoded_frame_recv.take() + } } impl VaapiEncoder { diff --git a/src/encoders/video_encoder.rs b/src/encoders/video_encoder.rs index 3c8c3a4..c3eaaed 100644 --- a/src/encoders/video_encoder.rs +++ b/src/encoders/video_encoder.rs @@ -1,26 +1,22 @@ use anyhow::Result; use ffmpeg_next::{self as ffmpeg}; +use ringbuf::HeapCons; use crate::{application_config::QualityPreset, RawVideoFrame}; -use super::buffer::VideoBuffer; +use super::buffer::VideoFrameData; pub const ONE_MICROS: usize = 1_000_000; pub const GOP_SIZE: u32 = 30; -pub trait VideoEncoder { - fn new( - width: u32, - height: u32, - max_buffer_seconds: u32, - quality: QualityPreset, - ) -> Result +pub trait VideoEncoder: Send { + fn new(width: u32, height: u32, quality: QualityPreset) -> Result where Self: Sized; fn process(&mut self, frame: &RawVideoFrame) -> Result<(), ffmpeg::Error>; fn drain(&mut self) -> Result<(), ffmpeg::Error>; fn reset(&mut self) -> Result<()>; - fn get_buffer(&self) -> &VideoBuffer; fn drop_encoder(&mut self); fn get_encoder(&self) -> &Option; + fn take_encoded_recv(&mut self) -> Option>; } diff --git a/src/main.rs b/src/main.rs index b53f92a..5558a49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use anyhow::{Context, Error, Result}; use application_config::load_or_create_config; use encoders::{ audio_encoder::{AudioEncoderImpl, FfmpegAudioEncoder}, - buffer::{AudioBuffer, VideoBuffer}, + buffer::{AudioBuffer, ShadowCaptureVideoBuffer}, video_encoder::ONE_MICROS, }; use ffmpeg_next::{self as ffmpeg}; @@ -82,7 +82,7 @@ async fn main() -> Result<(), Error> { fn save_buffer( filename: &str, - video_buffer: &VideoBuffer, + video_buffer: &ShadowCaptureVideoBuffer, video_encoder: &ffmpeg::codec::encoder::Video, audio_buffer: &AudioBuffer, audio_encoder: &FfmpegAudioEncoder, diff --git a/src/modes/shadow_cap.rs b/src/modes/shadow_cap.rs index 61c1889..a5367be 100644 --- a/src/modes/shadow_cap.rs +++ b/src/modes/shadow_cap.rs @@ -1,6 +1,6 @@ use std::{ sync::{atomic::AtomicBool, Arc}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use anyhow::Context; @@ -12,6 +12,7 @@ use crate::{ application_config, encoders::{ audio_encoder::{AudioEncoder, FfmpegAudioEncoder}, + buffer::{ShadowCaptureVideoBuffer, VideoFrameData}, nvenc_encoder::NvencEncoder, vaapi_encoder::VaapiEncoder, video_encoder::{VideoEncoder, ONE_MICROS}, @@ -22,9 +23,9 @@ use crate::{ use super::AppMode; pub struct ShadowCapMode { - max_seconds: u32, audio_encoder: Option>>>, video_encoder: Option>>, + video_buffer: Arc>, } impl AppMode for ShadowCapMode { @@ -34,10 +35,10 @@ impl AppMode for ShadowCapMode { let video_encoder: Arc> = match ctx.config.encoder { application_config::EncoderToUse::H264Nvenc => Arc::new(Mutex::new( - NvencEncoder::new(ctx.width, ctx.height, self.max_seconds, ctx.config.quality)?, + NvencEncoder::new(ctx.width, ctx.height, ctx.config.quality)?, )), application_config::EncoderToUse::H264Vaapi => Arc::new(Mutex::new( - VaapiEncoder::new(ctx.width, ctx.height, self.max_seconds, ctx.config.quality)?, + VaapiEncoder::new(ctx.width, ctx.height, ctx.config.quality)?, )), }; @@ -52,7 +53,7 @@ impl AppMode for ShadowCapMode 
{ Arc::clone(&video_encoder), ); ctx.join_handles.push(video_worker); - self.video_encoder = Some(video_encoder); + self.video_encoder = Some(video_encoder.clone()); // Audio let audio_encoder = Arc::new(Mutex::new(AudioEncoder::new_with_encoder( @@ -73,6 +74,13 @@ impl AppMode for ShadowCapMode { ctx.join_handles.push(audio_worker); self.audio_encoder = Some(audio_encoder); + let recv = { video_encoder.lock().await.take_encoded_recv() } + .context("Could not take encoded frame recv")?; + let shadow_worker = + Self::create_shadow_worker(recv, Arc::clone(&self.video_buffer), Arc::clone(&ctx.stop)); + + ctx.join_handles.push(shadow_worker); + log::debug!("Successfully initialized Shadow Capture Mode"); Ok(()) } @@ -81,15 +89,17 @@ impl AppMode for ShadowCapMode { ctx.saving.store(true, std::sync::atomic::Ordering::Release); if let Some(video_encoder) = &self.video_encoder { if let Some(audio_encoder) = &self.audio_encoder { - let (mut video_lock, mut audio_lock) = - tokio::join!(video_encoder.lock(), audio_encoder.lock()); + let (mut video_lock, mut audio_lock, mut video_buffer) = tokio::join!( + video_encoder.lock(), + audio_encoder.lock(), + self.video_buffer.lock() + ); // Drain both encoders of any remaining frames being processed video_lock.drain()?; audio_lock.drain()?; let filename = format!("clip_{}.mp4", chrono::Local::now().timestamp()); - let video_buffer = video_lock.get_buffer(); let video_encoder = video_lock .get_encoder() .as_ref() @@ -103,13 +113,14 @@ impl AppMode for ShadowCapMode { save_buffer( &filename, - video_buffer, + &video_buffer, video_encoder, audio_buffer, audio_encoder, )?; video_lock.reset()?; + video_buffer.reset(); audio_lock.reset_encoder(FfmpegAudioEncoder::new_opus)?; } } @@ -139,10 +150,14 @@ impl ShadowCapMode { max_seconds <= 86400, "Max seconds is above 24 hours. This is too much time for shadow capture" ); + + let actual_max = max_seconds * ONE_MICROS as u32; Ok(Self { - max_seconds: max_seconds * ONE_MICROS as u32, audio_encoder: None, video_encoder: None, + video_buffer: Arc::new(Mutex::new(ShadowCaptureVideoBuffer::new( + actual_max as usize, + ))), }) } @@ -198,13 +213,15 @@ impl ShadowCapMode { ) -> std::thread::JoinHandle<()> { std::thread::spawn(move || { let mut last_timestamp: u64 = 0; + let mut total_time: u128 = 0; + let mut frame_count: u64 = 0; loop { if stop_video.load(std::sync::atomic::Ordering::Acquire) { break; } while let Some(raw_frame) = video_receiver.try_pop() { - let now = SystemTime::now(); + let now = Instant::now(); let current_time = *raw_frame.get_timestamp() as u64; // Throttle FPS @@ -221,14 +238,39 @@ impl ShadowCapMode { ); } + let elapsed = now.elapsed().as_nanos(); + total_time += elapsed; + frame_count += 1; + + let average_time = total_time / frame_count as u128; + log::trace!( - "Took {:?} to process this video frame at {:?}", - now.elapsed(), - raw_frame.timestamp + "Took {:?} to process this video frame. 
Average time: {:.3}ms, Frame Count: {:?}", + elapsed, + average_time / 1_000_000, + frame_count, ); } std::thread::sleep(Duration::from_nanos(100)); } }) } + + fn create_shadow_worker( + mut recv: HeapCons<(i64, VideoFrameData)>, + buffer: Arc>, + stop: Arc, + ) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || loop { + if stop.load(std::sync::atomic::Ordering::Acquire) { + break; + } + + while let Some((dts, encoded_frame)) = recv.try_pop() { + buffer.blocking_lock().insert(dts, encoded_frame); + } + + std::thread::sleep(Duration::from_nanos(100)); + }) + } } diff --git a/src/waycap.rs b/src/waycap.rs index 13a65cb..094ff31 100644 --- a/src/waycap.rs +++ b/src/waycap.rs @@ -29,7 +29,7 @@ pub struct WayCap { impl WayCap { pub async fn new(mut mode: M, config: AppConfig) -> Result { - simple_logging::log_to_file("logs.txt", log::LevelFilter::Debug)?; + simple_logging::log_to_file("logs.txt", log::LevelFilter::Trace)?; let current_time = SystemTime::now(); let saving = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false)); @@ -61,15 +61,16 @@ impl WayCap { let audio_ready_pw = Arc::clone(&audio_ready); let saving_video = Arc::clone(&saving); + let mut screen_cast = ScreenCast::new()?; + screen_cast.set_source_types(SourceType::all()); + screen_cast.set_cursor_mode(CursorMode::EMBEDDED); + let active_cast = screen_cast.start(None)?; + + let fd = active_cast.pipewire_fd(); + let stream = active_cast.streams().next().unwrap(); + let stream_node = stream.pipewire_node(); + let pw_video_capture = std::thread::spawn(move || { - let mut screen_cast = ScreenCast::new().unwrap(); - screen_cast.set_source_types(SourceType::all()); - screen_cast.set_cursor_mode(CursorMode::EMBEDDED); - let active_cast = screen_cast.start(None).unwrap(); - - let fd = active_cast.pipewire_fd(); - let stream = active_cast.streams().next().unwrap(); - let stream_node = stream.pipewire_node(); let video_cap = VideoCapture::new(video_ready_pw, audio_ready_pw); video_cap .run(
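
Note (not part of the patch): the extensibility this change buys is that `VideoEncoder::take_encoded_recv()` hands the encoded-packet stream to whichever mode wants it, so consumers other than the shadow-capture rolling buffer become possible. Below is a minimal, hypothetical sketch of such a consumer; it assumes only the `HeapCons<(i64, VideoFrameData)>` receiver and the `stop` flag that already appear in this patch, and `handle_frame` is a placeholder callback standing in for whatever a mode would do with each encoded packet (mux to disk, stream it out, etc.).

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::JoinHandle,
    time::Duration,
};

use ringbuf::{traits::Consumer, HeapCons};

use crate::encoders::buffer::VideoFrameData;

/// Hypothetical generic consumer: drains the (dts, frame) pairs pushed by the
/// encoder and forwards them to a caller-supplied callback, mirroring
/// `create_shadow_worker` but without involving `ShadowCaptureVideoBuffer`.
fn spawn_encoded_consumer<F>(
    mut recv: HeapCons<(i64, VideoFrameData)>,
    stop: Arc<AtomicBool>,
    mut handle_frame: F,
) -> JoinHandle<()>
where
    F: FnMut(i64, VideoFrameData) + Send + 'static,
{
    std::thread::spawn(move || loop {
        if stop.load(Ordering::Acquire) {
            break;
        }
        // Drain whatever the encoder thread has pushed since the last pass.
        while let Some((dts, frame)) = recv.try_pop() {
            handle_frame(dts, frame);
        }
        std::thread::sleep(Duration::from_nanos(100));
    })
}

`create_shadow_worker` in this patch is effectively this pattern specialized to `buffer.blocking_lock().insert(dts, encoded_frame)`.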