From de772ec502dc9de7880deef4030efe9e3dcde292 Mon Sep 17 00:00:00 2001 From: Adonis Carvajal Date: Fri, 9 May 2025 19:34:03 -0400 Subject: [PATCH 1/2] Added DMA BUF no copy encoding --- Cargo.lock | 15 ++ Cargo.toml | 3 + src/encoders/audio_encoder.rs | 68 +++++-- src/encoders/audio_encoder_tests.rs | 254 ----------------------- src/encoders/buffer.rs | 6 +- src/encoders/buffer_tests.rs | 8 +- src/encoders/mod.rs | 2 - src/encoders/vaapi_encoder.rs | 306 ++++++++++++++++++++-------- src/main.rs | 12 +- src/modes/shadow_cap.rs | 66 ++++-- src/pw_capture/video_stream.rs | 57 ++++-- src/waycap.rs | 2 +- 12 files changed, 403 insertions(+), 396 deletions(-) delete mode 100644 src/encoders/audio_encoder_tests.rs diff --git a/Cargo.lock b/Cargo.lock index d2cb9e3..0ee9ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -359,6 +359,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +[[package]] +name = "c_str_macro" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6d44951c469019e225e7667d799052f67fb8ea358d086878f3582b39f0de5e5" + [[package]] name = "cc" version = "1.2.10" @@ -604,6 +610,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "drm-fourcc" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aafbcdb8afc29c1a7ee5fbe53b5d62f4565b35a042a662ca9fecd0b54dae6f4" + [[package]] name = "either" version = "1.13.0" @@ -2072,10 +2084,13 @@ version = "0.1.0" dependencies = [ "anyhow", "bytemuck", + "c_str_macro", "chrono", "config", "directories", + "drm-fourcc", "ffmpeg-next", + "libc", "log", "pipewire", "portal-screencast", diff --git a/Cargo.toml b/Cargo.toml index 056b6ca..b047b46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,13 @@ edition = "2021" [dependencies] anyhow = "1.0.95" bytemuck = "1.21.0" +c_str_macro = "1.0.3" chrono = "0.4.39" config = "0.15.11" directories = "6.0.0" +drm-fourcc = "2.2.0" ffmpeg-next = { version = "7.1.0", features = ["codec", "format"] } +libc = "0.2.172" log = "0.4.25" pipewire = "0.8.0" portal-screencast = { path = "portal-screencast" } diff --git a/src/encoders/audio_encoder.rs b/src/encoders/audio_encoder.rs index 5937269..961c20c 100644 --- a/src/encoders/audio_encoder.rs +++ b/src/encoders/audio_encoder.rs @@ -2,11 +2,13 @@ use std::collections::VecDeque; use anyhow::Result; use ffmpeg_next::{self as ffmpeg, Rational}; +use ringbuf::{ + traits::{Producer, Split}, + HeapCons, HeapProd, HeapRb, +}; use crate::RawAudioFrame; -use super::{buffer::AudioBuffer, video_encoder::ONE_MICROS}; - const MIN_RMS: f32 = 0.01; pub struct FfmpegAudioEncoder(ffmpeg::codec::encoder::Audio); @@ -114,14 +116,27 @@ impl AsRef for FfmpegAudioEncoder { } } +/// Represents an encoded audio frame encoded by the Opus encoder. 
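+/// Encoded frames are handed to the shadow-capture worker through a lock-free
+/// ring buffer (`HeapRb`) instead of an encoder-owned `AudioBuffer`.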
+pub struct EncodedAudioFrameData { + /// The raw bytes of the encoded frame + pub data: Vec, + /// The presentation timestamp in the encoder's time base + /// (960 samples per channel = ~20 ms) + pub pts: i64, + /// The timestamp for when the frame was originally captured in micro seconds + pub timestamp: i64, +} + pub struct AudioEncoder where E: AudioEncoderImpl, { encoder: Option, - audio_buffer: AudioBuffer, next_pts: i64, leftover_data: VecDeque, + encoded_samples_recv: Option>, + encoded_samples_sender: Option>, + capture_timestamps: VecDeque, } impl AudioEncoder @@ -130,16 +145,19 @@ where { pub fn new_with_encoder( factory: impl Fn() -> Result, - max_seconds: u32, ) -> Result { let encoder = factory()?; - let max_time = max_seconds as usize * ONE_MICROS; + + let audio_ring_buffer = HeapRb::::new(5); + let (sender, receiver) = audio_ring_buffer.split(); Ok(Self { encoder: Some(encoder), - audio_buffer: AudioBuffer::new(max_time), next_pts: 0, leftover_data: VecDeque::new(), + encoded_samples_recv: Some(receiver), + encoded_samples_sender: Some(sender), + capture_timestamps: VecDeque::with_capacity(4), }) } @@ -173,7 +191,7 @@ where frame.set_pts(Some(self.next_pts)); frame.set_rate(encoder.rate()); - self.audio_buffer.insert_capture_time(raw_frame.timestamp); + self.capture_timestamps.push_back(raw_frame.timestamp); encoder.send_frame(&frame)?; // Try and get a frame back from encoder @@ -181,7 +199,18 @@ where if encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { let pts = packet.pts().unwrap_or(0); - self.audio_buffer.insert_frame(pts, data.to_vec()); + if let Some(ref mut sender) = self.encoded_samples_sender { + if sender + .try_push(EncodedAudioFrameData { + data: data.to_vec(), + pts, + timestamp: self.capture_timestamps.pop_front().unwrap_or(0), + }) + .is_err() + { + log::error!("Could not send encoded packet to audio ringbuf"); + } + } } } @@ -196,10 +225,6 @@ where &self.encoder } - pub fn get_buffer(&self) -> &AudioBuffer { - &self.audio_buffer - } - // Drain remaining frames being processed in the encoder pub fn drain(&mut self) -> Result<(), ffmpeg::Error> { if let Some(ref mut encoder) = self.encoder { @@ -208,16 +233,27 @@ where while encoder.receive_packet(&mut packet).is_ok() { if let Some(data) = packet.data() { let pts = packet.pts().unwrap_or(0); - self.audio_buffer.insert_frame(pts, data.to_vec()); + if let Some(ref mut sender) = self.encoded_samples_sender { + if sender + .try_push(EncodedAudioFrameData { + data: data.to_vec(), + pts, + timestamp: self.capture_timestamps.pop_front().unwrap_or(0), + }) + .is_err() + { + log::error!("Could not send encoded packet to audio ringbuf"); + } + } } } } + Ok(()) } pub fn drop_encoder(&mut self) { self.encoder.take(); - self.audio_buffer.reset(); } pub fn reset_encoder( @@ -229,6 +265,10 @@ where Ok(()) } + + pub fn take_encoded_recv(&mut self) -> Option> { + self.encoded_samples_recv.take() + } } impl AudioEncoder diff --git a/src/encoders/audio_encoder_tests.rs b/src/encoders/audio_encoder_tests.rs deleted file mode 100644 index 51092bd..0000000 --- a/src/encoders/audio_encoder_tests.rs +++ /dev/null @@ -1,254 +0,0 @@ -use crate::{encoders::video_encoder::ONE_MICROS, RawAudioFrame}; - -use super::audio_encoder::*; - -use ffmpeg_next::{ - self as ffmpeg, - codec::packet::Packet, - format, - frame::{self}, - Rational, -}; -use std::collections::VecDeque; - -/// A fake audio encoder for testing. 
-#[derive(Clone)] -pub struct FakeAudioEncoder { - pub channels: u16, - pub frame_size: i32, - pub rate: u32, - pub sent_frames: Vec>, - pub queued_packets: VecDeque>, -} - -impl FakeAudioEncoder { - /// Create a new fake. By default no packets are queued. - pub fn new(channels: u16, frame_size: i32, rate: u32) -> Self { - Self { - channels, - frame_size, - rate, - sent_frames: Vec::new(), - queued_packets: VecDeque::new(), - } - } - - fn push_packet(&mut self, data: Vec) { - self.queued_packets.push_back(data); - } -} - -impl AudioEncoderImpl for FakeAudioEncoder { - type Error = ffmpeg::Error; - - fn codec(&self) -> Option { - todo!() - } - - fn time_base(&self) -> Rational { - todo!() - } - - fn channels(&self) -> u16 { - self.channels - } - - fn frame_size(&self) -> i32 { - self.frame_size - } - - fn format(&self) -> format::Sample { - format::Sample::F32(ffmpeg_next::format::sample::Type::Packed) - } - - fn channel_layout(&self) -> ffmpeg_next::channel_layout::ChannelLayout { - ffmpeg_next::channel_layout::ChannelLayout::STEREO - } - - fn rate(&self) -> u32 { - self.rate - } - - fn send_frame(&mut self, frame: &frame::Audio) -> Result<(), Self::Error> { - let buf = frame.plane(0).to_vec(); - self.push_packet(buf.to_vec()); - self.sent_frames.push(buf); - Ok(()) - } - - fn receive_packet(&mut self, pkt: &mut Packet) -> Result<(), Self::Error> { - if let Some(data) = self.queued_packets.pop_front() { - let as_u8: &[u8] = bytemuck::cast_slice(&data); - *pkt = Packet::copy(as_u8); - let pts = (self.sent_frames.len() - 1) as i64 * self.frame_size as i64; - pkt.set_pts(Some(pts)); - pkt.set_dts(Some(pts)); - Ok(()) - } else { - Err(ffmpeg::Error::Exit) - } - } - - fn send_eof(&mut self) -> Result<(), Self::Error> { - Ok(()) - } -} - -fn create_default_frame_encoder() -> FakeAudioEncoder { - // These numbers match the default opus encoder values - FakeAudioEncoder::new( - /*channesl=*/ 2, /*frame_size=*/ 960, /*rate=*/ 48000, - ) -} - -#[test] -fn test_process_drain() { - let fake = create_default_frame_encoder(); - let mut audio_encoder = AudioEncoder::new_with_encoder(|| Ok(fake.clone()), 10).unwrap(); - - let mut raw = [ - RawAudioFrame { - timestamp: 1, - samples: vec![0.0; 1024], - }, - RawAudioFrame { - timestamp: 2, - samples: vec![0.0; 1024], - }, - ]; - - for frame in raw.iter_mut() { - audio_encoder.process(frame).unwrap(); - } - - audio_encoder.drain().unwrap(); - audio_encoder.drop_encoder(); - - let buffer = audio_encoder.get_buffer(); - - assert!(buffer.get_frames().is_empty()); - assert!(buffer.get_capture_times().is_empty()); -} - -#[test] -fn test_process_no_trim() { - let fake = create_default_frame_encoder(); - let mut audio_encoder = AudioEncoder::new_with_encoder(|| Ok(fake.clone()), 10).unwrap(); - - let mut raw = [ - RawAudioFrame { - timestamp: 1, - samples: vec![0.0; 1024], - }, - RawAudioFrame { - timestamp: 2, - samples: vec![0.0; 1024], - }, - ]; - - for frame in raw.iter_mut() { - audio_encoder.process(frame).unwrap(); - } - - let encoder = audio_encoder.get_encoder().as_ref().unwrap(); - let buffer = audio_encoder.get_buffer(); - - assert_eq!(encoder.sent_frames.len(), 2); - assert_eq!(buffer.get_frames().len(), 2); - assert_eq!(*buffer.get_capture_times(), vec![1, 2]); - assert_eq!( - buffer.get_frames().keys().copied().collect::>(), - vec![0, 960] - ); -} - -#[test] -fn test_process_duplicate_timestamps_when_leftover_is_multiple_of_sample() { - let fake = create_default_frame_encoder(); - let mut audio_encoder = AudioEncoder::new_with_encoder(|| 
Ok(fake.clone()), 60).unwrap(); - - let mut raw_frames = vec![]; - let mut actual_capture_times = vec![]; - - for i in 1..=15 { - raw_frames.push(RawAudioFrame { - timestamp: i, - samples: vec![0.0; 1024], - }); - - actual_capture_times.push(i); - } - - // This frame should be duplicated - actual_capture_times.push(15); - - for frame in raw_frames.iter_mut() { - audio_encoder.process(frame).unwrap(); - } - - let encoder = audio_encoder.get_encoder().as_ref().unwrap(); - let buffer = audio_encoder.get_buffer(); - - assert_eq!(encoder.sent_frames.len(), actual_capture_times.len()); - assert_eq!(buffer.get_frames().len(), actual_capture_times.len()); - - assert_eq!(*buffer.get_capture_times(), actual_capture_times); - assert_eq!( - *buffer - .get_frames() - .keys() - .copied() - .collect::>() - .last() - .unwrap(), - actual_capture_times.len() as i64 * fake.frame_size() as i64 - fake.frame_size() as i64, - ); -} - -#[test] -fn test_process_trimming() { - let fake = create_default_frame_encoder(); - let max_frames = 15; - let mut audio_encoder = AudioEncoder::new_with_encoder(|| Ok(fake.clone()), 5).unwrap(); - - let mut raw_frames = vec![]; - // 5 second window - let actual_capture_times = vec![ - 11 * ONE_MICROS as i64, - 12 * ONE_MICROS as i64, - 13 * ONE_MICROS as i64, - 14 * ONE_MICROS as i64, - 15 * ONE_MICROS as i64, - 15 * ONE_MICROS as i64, - ]; - - for i in 1..=max_frames { - raw_frames.push(RawAudioFrame { - timestamp: i * ONE_MICROS as i64, - samples: vec![0.0; 1024], - }); - } - - for frame in raw_frames.iter_mut() { - audio_encoder.process(frame).unwrap(); - } - - let encoder = audio_encoder.get_encoder().as_ref().unwrap(); - let buffer = audio_encoder.get_buffer(); - - println!("{:?}", buffer.get_capture_times()); - assert_eq!(encoder.sent_frames.len() as i64, max_frames + 1); // Duplicate last frame - assert_eq!(buffer.get_frames().len(), actual_capture_times.len()); - - assert_eq!(*buffer.get_capture_times(), actual_capture_times); - assert_eq!( - *buffer - .get_frames() - .keys() - .copied() - .collect::>() - .last() - .unwrap(), - (max_frames + 1) * fake.frame_size() as i64 - fake.frame_size() as i64, - ); -} diff --git a/src/encoders/buffer.rs b/src/encoders/buffer.rs index 10a0680..272aea1 100644 --- a/src/encoders/buffer.rs +++ b/src/encoders/buffer.rs @@ -147,7 +147,7 @@ impl ShadowCaptureVideoBuffer { } #[derive(Clone)] -pub struct AudioBuffer { +pub struct ShadowCaptureAudioBuffer { frames: BTreeMap>, /// Maximum duration (in seconds) that the buffer should retain. @@ -157,7 +157,7 @@ pub struct AudioBuffer { capture_times: Vec, } -impl AudioBuffer { +impl ShadowCaptureAudioBuffer { pub fn new(max_time: usize) -> Self { Self { frames: BTreeMap::new(), @@ -175,7 +175,7 @@ impl AudioBuffer { /// * `timestamp` - The presentation timestamp (PTS) of the frame according to the audio /// encoder. /// * `frame` - A [`AudioFrameData`] representing an encoded frame. 
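+    ///
+    /// Insertion also trims the oldest entries so that the buffer never
+    /// retains more than `max_time` worth of audio.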
- pub fn insert_frame(&mut self, timestamp: i64, frame: Vec) { + pub fn insert(&mut self, timestamp: i64, frame: Vec) { self.frames.insert(timestamp, frame); while let (Some(oldest), Some(newest)) = diff --git a/src/encoders/buffer_tests.rs b/src/encoders/buffer_tests.rs index 6262a3b..7b92320 100644 --- a/src/encoders/buffer_tests.rs +++ b/src/encoders/buffer_tests.rs @@ -81,7 +81,7 @@ fn test_video_buffer_trimming() { #[test] fn test_audio_buffer_no_trim() { - let mut audio_buffer = AudioBuffer::new(10); + let mut audio_buffer = ShadowCaptureAudioBuffer::new(10); let dummy_data = [ (1, vec![1]), (2, vec![1]), @@ -91,7 +91,7 @@ fn test_audio_buffer_no_trim() { ]; for (pts, data) in dummy_data { - audio_buffer.insert_frame(pts, data); + audio_buffer.insert(pts, data); audio_buffer.insert_capture_time(pts); } @@ -106,7 +106,7 @@ fn test_audio_buffer_no_trim() { #[test] fn test_audio_buffer_trimming() { - let mut audio_buffer = AudioBuffer::new(10); + let mut audio_buffer = ShadowCaptureAudioBuffer::new(10); let dummy_data = [ (1, vec![1]), (3, vec![1]), @@ -121,7 +121,7 @@ fn test_audio_buffer_trimming() { ]; for (pts, data) in dummy_data { - audio_buffer.insert_frame(pts, data); + audio_buffer.insert(pts, data); audio_buffer.insert_capture_time(pts); } diff --git a/src/encoders/mod.rs b/src/encoders/mod.rs index 63d555d..11ff5f3 100644 --- a/src/encoders/mod.rs +++ b/src/encoders/mod.rs @@ -1,6 +1,4 @@ pub mod audio_encoder; -#[cfg(test)] -mod audio_encoder_tests; pub mod buffer; #[cfg(test)] mod buffer_tests; diff --git a/src/encoders/vaapi_encoder.rs b/src/encoders/vaapi_encoder.rs index 0d56133..f20c5e7 100644 --- a/src/encoders/vaapi_encoder.rs +++ b/src/encoders/vaapi_encoder.rs @@ -2,12 +2,14 @@ use std::{cell::RefCell, ffi::CString, ptr::null_mut}; use crate::application_config::QualityPreset; use anyhow::anyhow; +use drm_fourcc::DrmFourcc; use ffmpeg_next::{ self as ffmpeg, ffi::{ - av_buffer_ref, av_buffer_unref, av_hwdevice_ctx_create, av_hwframe_ctx_alloc, - av_hwframe_ctx_init, av_hwframe_get_buffer, av_hwframe_transfer_data, AVBufferRef, - AVHWDeviceContext, AVHWFramesContext, AVPixelFormat, + av_buffer_create, av_buffer_default_free, av_buffer_ref, av_buffer_unref, + av_hwdevice_ctx_create, av_hwframe_ctx_alloc, av_hwframe_ctx_init, av_hwframe_get_buffer, + av_hwframe_transfer_data, AVBufferRef, AVDRMFrameDescriptor, AVHWDeviceContext, + AVHWFramesContext, AVPixelFormat, }, software::scaling::{Context as Scaler, Flags}, Rational, @@ -35,6 +37,7 @@ pub struct VaapiEncoder { quality: QualityPreset, encoded_frame_recv: Option>, encoded_frame_sender: Option>, + filter_graph: Option, } impl VideoEncoder for VaapiEncoder { @@ -46,6 +49,7 @@ impl VideoEncoder for VaapiEncoder { let encoder = Self::create_encoder(width, height, encoder_name, &quality)?; let video_ring_buffer = HeapRb::<(i64, VideoFrameData)>::new(120); let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split(); + let filter_graph = Some(Self::create_filter_graph(&encoder, width, height)?); Ok(Self { encoder: Some(encoder), @@ -55,110 +59,198 @@ impl VideoEncoder for VaapiEncoder { quality, encoded_frame_recv: Some(video_ring_receiver), encoded_frame_sender: Some(video_ring_sender), + filter_graph, }) } fn process(&mut self, frame: &crate::RawVideoFrame) -> Result<(), ffmpeg::Error> { if let Some(ref mut encoder) = self.encoder { - // Convert BGRA to NV12 then transfer it to a hw frame and send it to the - // encoder - // - // TODO: Figure out how to use DMA BUF so we don't need to do this to optimize - 
// performance. This function takes 15ms on average to run which is very close to being - // over the 1/60 target fps - SCALER.with(|scaler_cell| { - let mut scaler = scaler_cell.borrow_mut(); - if scaler.is_none() { - *scaler = Some( - Scaler::get( - ffmpeg::format::Pixel::BGRA, - encoder.width(), - encoder.height(), - ffmpeg::format::Pixel::NV12, - encoder.width(), - encoder.height(), - Flags::BILINEAR, - ) - .unwrap(), - ); - } - - let mut src_frame = ffmpeg::util::frame::video::Video::new( - ffmpeg_next::format::Pixel::BGRA, - encoder.width(), - encoder.height(), + if let Some(fd) = frame.dmabuf_fd { + log::debug!( + "DMA Frame with fd: {}, size: {}, offset: {}, stride: {}", + fd, + frame.size, + frame.offset, + frame.stride ); - src_frame.data_mut(0).copy_from_slice(frame.get_bytes()); - let mut dst_frame = ffmpeg::util::frame::Video::new( - ffmpeg::format::Pixel::NV12, + let mut drm_frame = ffmpeg::util::frame::Video::new( + ffmpeg_next::format::Pixel::DRM_PRIME, encoder.width(), encoder.height(), ); + unsafe { + // Create DRM descriptor that points to the DMA buffer + let drm_desc = + Box::into_raw(Box::new(std::mem::zeroed::())); + + (*drm_desc).nb_objects = 1; + (*drm_desc).objects[0].fd = fd; + (*drm_desc).objects[0].size = 0; + (*drm_desc).objects[0].format_modifier = 0; + + (*drm_desc).nb_layers = 1; + (*drm_desc).layers[0].format = DrmFourcc::Argb8888 as u32; + (*drm_desc).layers[0].nb_planes = 1; + (*drm_desc).layers[0].planes[0].object_index = 0; + (*drm_desc).layers[0].planes[0].offset = frame.offset as isize; + (*drm_desc).layers[0].planes[0].pitch = frame.stride as isize; + + // Attach descriptor to frame + (*drm_frame.as_mut_ptr()).data[0] = drm_desc as *mut u8; + (*drm_frame.as_mut_ptr()).buf[0] = av_buffer_create( + drm_desc as *mut u8, + std::mem::size_of::(), + Some(av_buffer_default_free), + null_mut(), + 0, + ); - scaler + (*drm_frame.as_mut_ptr()).hw_frames_ctx = + av_buffer_ref((*encoder.as_ptr()).hw_frames_ctx); + } + + drm_frame.set_pts(Some(*frame.get_timestamp())); + self.filter_graph .as_mut() .unwrap() - .run(&src_frame, &mut dst_frame) + .get("in") + .unwrap() + .source() + .add(&drm_frame) .unwrap(); - let mut vaapi_frame = ffmpeg::util::frame::video::Video::new( - encoder.format(), - encoder.width(), - encoder.height(), - ); + let mut filtered = ffmpeg::util::frame::Video::empty(); + if self + .filter_graph + .as_mut() + .unwrap() + .get("out") + .unwrap() + .sink() + .frame(&mut filtered) + .is_ok() + { + encoder.send_frame(&filtered)?; + } + } else { + // Convert BGRA to NV12 then transfer it to a hw frame and send it to the + // encoder + // + // TODO: deprecate this path? 
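+                // This fallback runs when PipeWire delivers the frame through
+                // a memory-mapped buffer rather than a DMA-BUF fd. The CPU
+                // copy plus swscale conversion averages ~15 ms per frame, so
+                // the zero-copy DMA-BUF branch above is strongly preferred.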
+ SCALER.with(|scaler_cell| { + let mut scaler = scaler_cell.borrow_mut(); + if scaler.is_none() { + *scaler = Some( + Scaler::get( + ffmpeg::format::Pixel::BGRA, + encoder.width(), + encoder.height(), + ffmpeg::format::Pixel::NV12, + encoder.width(), + encoder.height(), + Flags::BILINEAR, + ) + .unwrap(), + ); + } - unsafe { - let err = av_hwframe_get_buffer( - (*encoder.as_ptr()).hw_frames_ctx, - vaapi_frame.as_mut_ptr(), - 0, + let mut src_frame = ffmpeg::util::frame::video::Video::new( + ffmpeg_next::format::Pixel::BGRA, + encoder.width(), + encoder.height(), ); + src_frame.data_mut(0).copy_from_slice(frame.get_bytes()); - if err < 0 { - error!("Error getting the hw frame buffer: {:?}", err); - } + let mut dst_frame = ffmpeg::util::frame::Video::new( + ffmpeg::format::Pixel::NV12, + encoder.width(), + encoder.height(), + ); + + scaler + .as_mut() + .unwrap() + .run(&src_frame, &mut dst_frame) + .unwrap(); + + let mut vaapi_frame = ffmpeg::util::frame::video::Video::new( + encoder.format(), + encoder.width(), + encoder.height(), + ); - let err = - av_hwframe_transfer_data(vaapi_frame.as_mut_ptr(), dst_frame.as_ptr(), 0); + unsafe { + let err = av_hwframe_get_buffer( + (*encoder.as_ptr()).hw_frames_ctx, + vaapi_frame.as_mut_ptr(), + 0, + ); - if err < 0 { - error!("Error transferring the frame data to hw frame: {:?}", err); + if err < 0 { + error!("Error getting the hw frame buffer: {:?}", err); + } + + let err = av_hwframe_transfer_data( + vaapi_frame.as_mut_ptr(), + dst_frame.as_ptr(), + 0, + ); + + if err < 0 { + error!("Error transferring the frame data to hw frame: {:?}", err); + } } - } - vaapi_frame.set_pts(Some(*frame.get_timestamp())); - - encoder.send_frame(&vaapi_frame).unwrap(); - - let mut packet = ffmpeg::codec::packet::Packet::empty(); - if encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - if let Some(ref mut sender) = self.encoded_frame_sender { - if sender - .try_push(( - packet.dts().unwrap_or(0), - VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ), - )) - .is_err() - { - log::error!("Could not send encoded packet to the ringbuf"); - } + vaapi_frame.set_pts(Some(*frame.get_timestamp())); + + encoder.send_frame(&vaapi_frame).unwrap(); + }); + } + + let mut packet = ffmpeg::codec::packet::Packet::empty(); + if encoder.receive_packet(&mut packet).is_ok() { + if let Some(data) = packet.data() { + if let Some(ref mut sender) = self.encoded_frame_sender { + if sender + .try_push(( + packet.dts().unwrap_or(0), + VideoFrameData::new( + data.to_vec(), + packet.is_key(), + packet.pts().unwrap_or(0), + ), + )) + .is_err() + { + log::error!("Could not send encoded packet to the ringbuf"); } - }; - } - }); + } + }; + } } Ok(()) } - /// Drain the encoder of any remaining frames it is processing + /// Drain the filter graph and encoder of any remaining frames it is processing fn drain(&mut self) -> Result<(), ffmpeg::Error> { if let Some(ref mut encoder) = self.encoder { + // Drain the filter graph + let mut filtered = ffmpeg::util::frame::Video::empty(); + while self + .filter_graph + .as_mut() + .unwrap() + .get("out") + .unwrap() + .sink() + .frame(&mut filtered) + .is_ok() + { + encoder.send_frame(&filtered)?; + } + + // Drain encoder encoder.send_eof()?; let mut packet = ffmpeg::codec::packet::Packet::empty(); while encoder.receive_packet(&mut packet).is_ok() { @@ -187,12 +279,13 @@ impl VideoEncoder for VaapiEncoder { fn reset(&mut self) -> anyhow::Result<()> { self.drop_encoder(); - self.encoder = 
Some(Self::create_encoder( - self.width, - self.height, - &self.encoder_name, - &self.quality, - )?); + let new_encoder = + Self::create_encoder(self.width, self.height, &self.encoder_name, &self.quality)?; + + let new_filter_graph = Self::create_filter_graph(&new_encoder, self.width, self.height)?; + + self.encoder = Some(new_encoder); + self.filter_graph = Some(new_filter_graph); Ok(()) } @@ -202,6 +295,7 @@ impl VideoEncoder for VaapiEncoder { fn drop_encoder(&mut self) { self.encoder.take(); + self.filter_graph.take(); } fn take_encoded_recv(&mut self) -> Option> { @@ -326,6 +420,50 @@ impl VaapiEncoder { } opts } + + fn create_filter_graph( + encoder: &ffmpeg::codec::encoder::Video, + width: u32, + height: u32, + ) -> anyhow::Result { + let mut graph = ffmpeg::filter::Graph::new(); + + let args = format!( + "video_size={}x{}:pix_fmt=bgra:time_base=1/1000000", + width, height + ); + + let mut input = graph.add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args)?; + + let mut hwmap = graph.add( + &ffmpeg::filter::find("hwmap").unwrap(), + "hwmap", + "mode=read+write:derive_device=vaapi", + )?; + + let scale_args = format!("w={}:h={}:format=nv12:out_range=tv", width, height); + let mut scale = graph.add( + &ffmpeg::filter::find("scale_vaapi").unwrap(), + "scale", + &scale_args, + )?; + + let mut out = graph.add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "")?; + unsafe { + let dev = (*encoder.as_ptr()).hw_device_ctx; + + (*hwmap.as_mut_ptr()).hw_device_ctx = av_buffer_ref(dev); + } + + input.link(0, &mut hwmap, 0); + hwmap.link(0, &mut scale, 0); + scale.link(0, &mut out, 0); + + graph.validate()?; + log::trace!("Graph\n{}", graph.dump()); + + Ok(graph) + } } impl Drop for VaapiEncoder { diff --git a/src/main.rs b/src/main.rs index 5558a49..7a7a26f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,11 +14,13 @@ mod modes; mod pw_capture; mod waycap; +use std::os::fd::RawFd; + use anyhow::{Context, Error, Result}; use application_config::load_or_create_config; use encoders::{ audio_encoder::{AudioEncoderImpl, FfmpegAudioEncoder}, - buffer::{AudioBuffer, ShadowCaptureVideoBuffer}, + buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer}, video_encoder::ONE_MICROS, }; use ffmpeg_next::{self as ffmpeg}; @@ -51,6 +53,10 @@ impl RawAudioFrame { pub struct RawVideoFrame { bytes: Vec, timestamp: i64, + dmabuf_fd: Option, + stride: i32, + offset: u32, + size: u32, } impl RawVideoFrame { @@ -65,7 +71,7 @@ impl RawVideoFrame { pub struct Terminate; -#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +#[tokio::main] async fn main() -> Result<(), Error> { pw::init(); ffmpeg::init()?; @@ -84,7 +90,7 @@ fn save_buffer( filename: &str, video_buffer: &ShadowCaptureVideoBuffer, video_encoder: &ffmpeg::codec::encoder::Video, - audio_buffer: &AudioBuffer, + audio_buffer: &ShadowCaptureAudioBuffer, audio_encoder: &FfmpegAudioEncoder, ) -> Result<()> { let mut output = ffmpeg::format::output(&filename)?; diff --git a/src/modes/shadow_cap.rs b/src/modes/shadow_cap.rs index a5367be..9702f24 100644 --- a/src/modes/shadow_cap.rs +++ b/src/modes/shadow_cap.rs @@ -11,8 +11,8 @@ use crate::{ app_context::AppContext, application_config, encoders::{ - audio_encoder::{AudioEncoder, FfmpegAudioEncoder}, - buffer::{ShadowCaptureVideoBuffer, VideoFrameData}, + audio_encoder::{AudioEncoder, EncodedAudioFrameData, FfmpegAudioEncoder}, + buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer, VideoFrameData}, nvenc_encoder::NvencEncoder, vaapi_encoder::VaapiEncoder, 
video_encoder::{VideoEncoder, ONE_MICROS}, @@ -26,6 +26,7 @@ pub struct ShadowCapMode { audio_encoder: Option>>>, video_encoder: Option>>, video_buffer: Arc>, + audio_buffer: Arc>, } impl AppMode for ShadowCapMode { @@ -53,12 +54,20 @@ impl AppMode for ShadowCapMode { Arc::clone(&video_encoder), ); ctx.join_handles.push(video_worker); - self.video_encoder = Some(video_encoder.clone()); + + let recv = { video_encoder.lock().await.take_encoded_recv() } + .context("Could not take encoded frame recv")?; + let shadow_worker = Self::create_shadow_video_worker( + recv, + Arc::clone(&self.video_buffer), + Arc::clone(&ctx.stop), + ); + ctx.join_handles.push(shadow_worker); + self.video_encoder = Some(video_encoder); // Audio let audio_encoder = Arc::new(Mutex::new(AudioEncoder::new_with_encoder( FfmpegAudioEncoder::new_opus, - ctx.config.max_seconds, )?)); let audio_owned_recv = ctx @@ -72,14 +81,17 @@ impl AppMode for ShadowCapMode { Arc::clone(&audio_encoder), ); ctx.join_handles.push(audio_worker); - self.audio_encoder = Some(audio_encoder); - let recv = { video_encoder.lock().await.take_encoded_recv() } - .context("Could not take encoded frame recv")?; - let shadow_worker = - Self::create_shadow_worker(recv, Arc::clone(&self.video_buffer), Arc::clone(&ctx.stop)); + let aud_recv = { audio_encoder.lock().await.take_encoded_recv() } + .context("Could not take ownership of encoded audio frame recv")?; - ctx.join_handles.push(shadow_worker); + let audio_shadow_worker = Self::create_shadow_audio_worker( + aud_recv, + Arc::clone(&self.audio_buffer), + Arc::clone(&ctx.stop), + ); + ctx.join_handles.push(audio_shadow_worker); + self.audio_encoder = Some(audio_encoder); log::debug!("Successfully initialized Shadow Capture Mode"); Ok(()) @@ -89,10 +101,11 @@ impl AppMode for ShadowCapMode { ctx.saving.store(true, std::sync::atomic::Ordering::Release); if let Some(video_encoder) = &self.video_encoder { if let Some(audio_encoder) = &self.audio_encoder { - let (mut video_lock, mut audio_lock, mut video_buffer) = tokio::join!( + let (mut video_lock, mut audio_lock, mut video_buffer, mut audio_buffer) = tokio::join!( video_encoder.lock(), audio_encoder.lock(), - self.video_buffer.lock() + self.video_buffer.lock(), + self.audio_buffer.lock(), ); // Drain both encoders of any remaining frames being processed @@ -105,7 +118,6 @@ impl AppMode for ShadowCapMode { .as_ref() .context("Could not get video encoder")?; - let audio_buffer = audio_lock.get_buffer(); let audio_encoder = audio_lock .get_encoder() .as_ref() @@ -115,12 +127,13 @@ impl AppMode for ShadowCapMode { &filename, &video_buffer, video_encoder, - audio_buffer, + &audio_buffer, audio_encoder, )?; video_lock.reset()?; video_buffer.reset(); + audio_buffer.reset(); audio_lock.reset_encoder(FfmpegAudioEncoder::new_opus)?; } } @@ -158,6 +171,9 @@ impl ShadowCapMode { video_buffer: Arc::new(Mutex::new(ShadowCaptureVideoBuffer::new( actual_max as usize, ))), + audio_buffer: Arc::new(Mutex::new(ShadowCaptureAudioBuffer::new( + actual_max as usize, + ))), }) } @@ -256,7 +272,7 @@ impl ShadowCapMode { }) } - fn create_shadow_worker( + fn create_shadow_video_worker( mut recv: HeapCons<(i64, VideoFrameData)>, buffer: Arc>, stop: Arc, @@ -273,4 +289,24 @@ impl ShadowCapMode { std::thread::sleep(Duration::from_nanos(100)); }) } + + fn create_shadow_audio_worker( + mut recv: HeapCons, + audio_buffer: Arc>, + stop: Arc, + ) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || loop { + if stop.load(std::sync::atomic::Ordering::Acquire) { + break; + } + + while 
let Some(encoded_frame) = recv.try_pop() { + let mut audio_buf = audio_buffer.blocking_lock(); + audio_buf.insert_capture_time(encoded_frame.timestamp); + audio_buf.insert(encoded_frame.pts, encoded_frame.data); + } + + std::thread::sleep(Duration::from_nanos(100)); + }) + } } diff --git a/src/pw_capture/video_stream.rs b/src/pw_capture/video_stream.rs index 1abe3d5..664a3f4 100644 --- a/src/pw_capture/video_stream.rs +++ b/src/pw_capture/video_stream.rs @@ -8,7 +8,10 @@ use pipewire::{ self as pw, context::Context, main_loop::MainLoop, - spa::utils::Direction, + spa::{ + buffer::{Data, DataType}, + utils::Direction, + }, stream::{Stream, StreamFlags, StreamState}, }; use pw::{properties::properties, spa}; @@ -163,19 +166,26 @@ impl VideoCapture { // send frame data to encoder let data = &mut datas[0]; - if let Some(frame) = data.data() { - if ringbuf_producer + + let fd = Self::get_dmabuf_fd(data); + + if fd.is_some() + && ringbuf_producer .try_push(RawVideoFrame { - bytes: frame.to_vec(), + // No need to copy the frame data + bytes: Vec::new(), timestamp: time_us, + dmabuf_fd: fd, + stride: data.chunk().stride(), + offset: data.chunk().offset(), + size: data.chunk().size(), }) .is_err() - { - log::error!( - "Error sending video frame at: {:?}. Ring buf full?", - time_us - ); - } + { + log::error!( + "Error sending video frame at: {:?}. Ring buf full?", + time_us + ); } } } @@ -200,13 +210,14 @@ impl VideoCapture { Choice, Enum, Id, - pw::spa::param::video::VideoFormat::xRGB, - pw::spa::param::video::VideoFormat::RGB, - pw::spa::param::video::VideoFormat::RGB, - pw::spa::param::video::VideoFormat::RGBA, - pw::spa::param::video::VideoFormat::RGBx, - pw::spa::param::video::VideoFormat::BGRx, + pw::spa::param::video::VideoFormat::NV12, pw::spa::param::video::VideoFormat::I420, + pw::spa::param::video::VideoFormat::BGRA, + ), + pw::spa::pod::property!( + pw::spa::param::format::FormatProperties::VideoModifier, + Long, + 0 ), pw::spa::pod::property!( pw::spa::param::format::FormatProperties::VideoSize, @@ -259,4 +270,18 @@ impl VideoCapture { pw_loop.run(); Ok(()) } + + fn get_dmabuf_fd(data: &Data) -> Option { + let raw_data = data.as_raw(); + + if data.type_() == DataType::DmaBuf { + let fd = raw_data.fd; + + if fd > 0 { + return Some(fd as i32); + } + } + + None + } } diff --git a/src/waycap.rs b/src/waycap.rs index 094ff31..245e532 100644 --- a/src/waycap.rs +++ b/src/waycap.rs @@ -87,7 +87,7 @@ impl WayCap { let _ = active_cast.close(); }); - // Window mode return (0, 0) for dimensions to we have to get it from pipewire + // Window mode return (0, 0) for dimensions so we have to get it from pipewire if (width, height) == (0, 0) { // Wait to get back a negotiated resolution from pipewire let timeout = Duration::from_secs(5); From 36bc41dfaecf8dfe8be8dcae9d097a4d6b512c12 Mon Sep 17 00:00:00 2001 From: Adonis Carvajal Date: Tue, 27 May 2025 23:29:23 -0400 Subject: [PATCH 2/2] Refactored to use waycap lib --- Cargo.lock | 56 +- Cargo.toml | 2 +- portal-screencast/.gitignore | 1 - portal-screencast/Cargo.lock | 285 ---------- portal-screencast/Cargo.toml | 19 - portal-screencast/README.md | 29 - portal-screencast/build.rs | 36 -- .../org.freedesktop.portal.Request.xml | 85 --- .../org.freedesktop.portal.ScreenCast.xml | 233 -------- .../org.freedesktop.portal.Session.xml | 75 --- portal-screencast/src/generated.rs | 16 - portal-screencast/src/lib.rs | 518 ------------------ src/app_context.rs | 9 +- src/encoders/audio_encoder.rs | 307 ----------- src/encoders/buffer.rs | 39 +- 
src/encoders/buffer_tests.rs | 73 +-- src/encoders/mod.rs | 4 - src/encoders/nvenc_encoder.rs | 209 ------- src/encoders/vaapi_encoder.rs | 476 ---------------- src/encoders/video_encoder.rs | 22 - src/main.rs | 95 +--- src/modes/shadow_cap.rs | 226 +------- src/pw_capture/audio_stream.rs | 225 -------- src/pw_capture/mod.rs | 2 - src/pw_capture/video_stream.rs | 287 ---------- src/waycap.rs | 117 +--- 26 files changed, 161 insertions(+), 3285 deletions(-) delete mode 100644 portal-screencast/.gitignore delete mode 100644 portal-screencast/Cargo.lock delete mode 100644 portal-screencast/Cargo.toml delete mode 100644 portal-screencast/README.md delete mode 100644 portal-screencast/build.rs delete mode 100644 portal-screencast/dbus_introspections/org.freedesktop.portal.Request.xml delete mode 100644 portal-screencast/dbus_introspections/org.freedesktop.portal.ScreenCast.xml delete mode 100644 portal-screencast/dbus_introspections/org.freedesktop.portal.Session.xml delete mode 100644 portal-screencast/src/generated.rs delete mode 100644 portal-screencast/src/lib.rs delete mode 100644 src/encoders/audio_encoder.rs delete mode 100644 src/encoders/nvenc_encoder.rs delete mode 100644 src/encoders/vaapi_encoder.rs delete mode 100644 src/encoders/video_encoder.rs delete mode 100644 src/pw_capture/audio_stream.rs delete mode 100644 src/pw_capture/mod.rs delete mode 100644 src/pw_capture/video_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 0ee9ea2..2e1bd43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,9 +343,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.21.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" +checksum = "9134a6ef01ce4b366b50689c94f82c14bc72bc5d0386829828a2e2752ef7958c" [[package]] name = "byteorder" @@ -548,6 +548,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctrlc" +version = "3.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73" +dependencies = [ + "nix 0.30.1", + "windows-sys 0.59.0", +] + [[package]] name = "dbus" version = "0.9.7" @@ -1095,9 +1105,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.25" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "memchr" @@ -1164,6 +1174,18 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.8.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1402,8 +1424,10 @@ dependencies = [ ] [[package]] -name = "portal-screencast" -version = "0.1.1" +name = "portal-screencast-waycap" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc755be8a0854a68a4b0c45585d6e90d692e9504360dc8a138e63e0d1577eba6" dependencies = [ "bitflags 1.3.2", "dbus", @@ -2093,16 +2117,34 @@ dependencies = [ "libc", "log", "pipewire", - "portal-screencast", "ringbuf", "serde", "serde_derive", "simple-logging", "tokio", "toml", + "waycap-rs", "zbus", ] 
+[[package]] +name = "waycap-rs" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63c4e4df0dd35671b2d68d2e27c425bcf9ef412062ff165020682599ed90c554" +dependencies = [ + "bytemuck", + "ctrlc", + "drm-fourcc", + "ffmpeg-next", + "libc", + "log", + "pipewire", + "portal-screencast-waycap", + "ringbuf", + "simple-logging", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index b047b46..3e85c75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ ffmpeg-next = { version = "7.1.0", features = ["codec", "format"] } libc = "0.2.172" log = "0.4.25" pipewire = "0.8.0" -portal-screencast = { path = "portal-screencast" } ringbuf = "0.4.8" serde = { version = "1.0.219", features = ["derive"] } serde_derive = "1.0.219" @@ -23,6 +22,7 @@ simple-logging = "2.0.2" tokio = {version = "1.43.0", features = ["full", "rt-multi-thread"] } toml = "0.8.20" zbus = {version = "5.3.1", features = ["tokio"] } +waycap-rs = "0.1.3" [profile.dev] debug = true diff --git a/portal-screencast/.gitignore b/portal-screencast/.gitignore deleted file mode 100644 index eb5a316..0000000 --- a/portal-screencast/.gitignore +++ /dev/null @@ -1 +0,0 @@ -target diff --git a/portal-screencast/Cargo.lock b/portal-screencast/Cargo.lock deleted file mode 100644 index 19e43d6..0000000 --- a/portal-screencast/Cargo.lock +++ /dev/null @@ -1,285 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 4 - -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "clap" -version = "2.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" -dependencies = [ - "ansi_term", - "atty", - "bitflags", - "strsim", - "textwrap", - "unicode-width", - "vec_map", -] - -[[package]] -name = "dbus" -version = "0.9.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bb21987b9fb1613058ba3843121dd18b163b254d8a6e797e144cbac14d96d1b" -dependencies = [ - "libc", - "libdbus-sys", - "winapi", -] - -[[package]] -name = "dbus-codegen" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a49da9fdfbe872d4841d56605dc42efa5e6ca3291299b87f44e1cde91a28617c" -dependencies = [ - "clap", - "dbus", - "xml-rs", -] - -[[package]] -name = "getrandom" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - -[[package]] -name = "libc" -version = "0.2.170" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" - -[[package]] -name = "libdbus-sys" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06085512b750d640299b79be4bad3d2fa90a9c00b1fd9e1b46364f66f0485c72" -dependencies = [ - "pkg-config", -] - -[[package]] -name = "pkg-config" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" - -[[package]] -name = "portal-screencast" -version = "0.1.1" -dependencies = [ - "bitflags", - "dbus", - "dbus-codegen", - "rand", -] - -[[package]] -name = "ppv-lite86" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" -dependencies = [ - "zerocopy", -] - -[[package]] -name = "proc-macro2" -version = "1.0.93" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.38" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - -[[package]] -name = "syn" -version = "2.0.98" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - -[[package]] -name = "unicode-ident" -version = "1.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00e2473a93778eb0bad35909dff6a10d28e63f792f16ed15e404fca9d5eeedbe" - -[[package]] -name = "unicode-width" -version = "0.1.14" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" - -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "xml-rs" -version = "0.8.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5b940ebc25896e71dd073bad2dbaa2abfe97b0a391415e22ad1326d9c54e3c4" - -[[package]] -name = "zerocopy" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" -dependencies = [ - "byteorder", - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] diff --git a/portal-screencast/Cargo.toml b/portal-screencast/Cargo.toml deleted file mode 100644 index d8a04ef..0000000 --- a/portal-screencast/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "portal-screencast" -version = "0.1.1" -description = "Rustic interface to the ScreenCast Desktop Portal" -documentation = "https://docs.rs/portal-screencast" -repository = "https://github.com/iwillspeak/obs-screencap" -license = "MIT" -keywords = ["ffi","screen","capture","dbus","linux"] -authors = ["Will Speak "] -edition = "2018" -readme = "README.md" - -[dependencies] -dbus = "0.9" -rand = "0.8" -bitflags = "1.2" - -[build-dependencies] -dbus-codegen = "0.9" diff --git a/portal-screencast/README.md b/portal-screencast/README.md deleted file mode 100644 index 5cd84b3..0000000 --- a/portal-screencast/README.md +++ /dev/null @@ -1,29 +0,0 @@ -# Rust API to the ScreenCast Desktop Portal - -Access to system resources on Linux can be negotitated in some environments -through calls to D-Bus 'portal' APIs. One such portal is the [`ScreenCast`][sc] -portal. This portal allows a user to choose which windows or screens to share -and provides access to raw video data through PipeWire. - -## Simple Use - -In the simples case this crate can be used to open a new screen cast with the -default settings: - -```rust -let screen_cast = ScreenCast::new()?.Start()?; -``` - -## Structure - -There are three main objects to interact with: `ScreenCast`, `ActiveScreenCast`, -and `ScreenCastStream`. 
The `ScreenCast` type is used to configure what type -of screen cast to prompt the user for. It is tramsformed into an -`ActiveScreenCast` by calling `start()`. Once active interaction with the cast -takes place over a Pipewire session using the `pipewire_fd()` and `streams()`. - -Under the hood this is be backed by some private structs: `ConnectionState` to -manage our D-Bus connection; `Request`, and `Session` to handle interacting with -request and session proxies. - - [sc]: https://flatpak.github.io/xdg-desktop-portal/portal-docs.html#gdbus-org.freedesktop.portal.ScreenCast \ No newline at end of file diff --git a/portal-screencast/build.rs b/portal-screencast/build.rs deleted file mode 100644 index b285864..0000000 --- a/portal-screencast/build.rs +++ /dev/null @@ -1,36 +0,0 @@ -use dbus_codegen::{ConnectionType, GenOpts}; -use std::{env, error::Error, path::Path}; - -fn introspect_one(out_dir: &Path, xml_name: &str) -> Result<(), Box> { - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - let manifest_dir = Path::new(&manifest_dir); - let gen_opts = GenOpts { - connectiontype: ConnectionType::Blocking, - methodtype: None, - dbuscrate: "::dbus".into(), - ..Default::default() - }; - let introspect_path = manifest_dir.join(format!( - "dbus_introspections/org.freedesktop.portal.{0}.xml", - xml_name - )); - println!("cargo:rerun-if-changed={0}", introspect_path.display()); - let src = dbus_codegen::generate(&std::fs::read_to_string(introspect_path)?, &gen_opts)?; - std::fs::write( - out_dir.join(format!("{0}.rs", xml_name.to_lowercase())), - src, - )?; - Ok(()) -} - -fn main() -> Result<(), Box> { - println!("cargo:rerun-if-changed=build.rs"); - - let out_dir = env::var("OUT_DIR")?; - let out_dir = Path::new(&out_dir); - introspect_one(out_dir, "Request")?; - introspect_one(out_dir, "Session")?; - introspect_one(out_dir, "ScreenCast")?; - - Ok(()) -} diff --git a/portal-screencast/dbus_introspections/org.freedesktop.portal.Request.xml b/portal-screencast/dbus_introspections/org.freedesktop.portal.Request.xml deleted file mode 100644 index 6d285ea..0000000 --- a/portal-screencast/dbus_introspections/org.freedesktop.portal.Request.xml +++ /dev/null @@ -1,85 +0,0 @@ - - - - - - - - - - - - - - - - - - diff --git a/portal-screencast/dbus_introspections/org.freedesktop.portal.ScreenCast.xml b/portal-screencast/dbus_introspections/org.freedesktop.portal.ScreenCast.xml deleted file mode 100644 index d80489a..0000000 --- a/portal-screencast/dbus_introspections/org.freedesktop.portal.ScreenCast.xml +++ /dev/null @@ -1,233 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/portal-screencast/dbus_introspections/org.freedesktop.portal.Session.xml b/portal-screencast/dbus_introspections/org.freedesktop.portal.Session.xml deleted file mode 100644 index d44abdf..0000000 --- a/portal-screencast/dbus_introspections/org.freedesktop.portal.Session.xml +++ /dev/null @@ -1,75 +0,0 @@ - - - - - - - - - - - - - - - - - - diff --git a/portal-screencast/src/generated.rs b/portal-screencast/src/generated.rs deleted file mode 100644 index 19b43a8..0000000 --- a/portal-screencast/src/generated.rs +++ /dev/null @@ -1,16 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_imports)] - -mod request { - include!(concat!(env!("OUT_DIR"), "/request.rs")); -} -mod session { - include!(concat!(env!("OUT_DIR"), "/session.rs")); -} -mod screencast { - include!(concat!(env!("OUT_DIR"), "/screencast.rs")); -} - -pub use request::*; -pub use screencast::*; -pub 
use session::*; diff --git a/portal-screencast/src/lib.rs b/portal-screencast/src/lib.rs deleted file mode 100644 index 17c57f2..0000000 --- a/portal-screencast/src/lib.rs +++ /dev/null @@ -1,518 +0,0 @@ -//! # XDG ScreenCast Portal utilities -//! -//! This module defines an interface for interacting with the ScreenCast portal. -//! -//! The general interaction pattern with the `ScreenCast` portal is to open a -//! session, set which source types are of interest, and call `start()`. -//! -//! ```no_run -//! # use portal_screencast::{ScreenCast, PortalError}; -//! # fn test() -> Result<(), PortalError> { -//! let screen_cast = ScreenCast::new()?.start(None)?; -//! # Ok(()) -//! # } -//! ``` -//! -//! In more complex cases you can modify the `ScreenCast` before starting it: -//! -//! ```no_run -//! # use portal_screencast::{ScreenCast, PortalError, SourceType}; -//! # fn test() -> Result<(), PortalError> { -//! let mut screen_cast = ScreenCast::new()?; -//! // Set which source types to allow, and enable multiple items to be shared. -//! screen_cast.set_source_types(SourceType::MONITOR); -//! screen_cast.enable_multiple(); -//! screen_cast.set_cursor_mode(CursorMode::HIDDEN); -//! // If you have a window handle you can tie the dialog to it -//! let screen_cast = screen_cast.start(Some("wayland:"))?; -//! # Ok(()) -//! # } -//! ``` - -use bitflags::bitflags; -use dbus::{ - arg::{OwnedFd, RefArg, Variant}, - blocking::{Connection, Proxy}, - channel::Token, - Message, Path, -}; -use generated::{ - OrgFreedesktopPortalRequestResponse, OrgFreedesktopPortalScreenCast, - OrgFreedesktopPortalSession, -}; -use std::{ - collections::HashMap, - convert::TryInto, - os::unix::prelude::RawFd, - sync::mpsc::{self, Receiver}, - time::Duration, -}; - -mod generated; - -// - - - - - - - - - - - - - - - Public Interface - - - - - - - - - - - - - - - -/// Desktop portal error. This could be an error from the underlying `dbus` -/// library, a generic error string, or some structured error. -#[derive(Debug)] -pub enum PortalError { - /// A generic error string describing the problem. - Generic(String), - /// A raw error from the `dbus` library. - DBus(dbus::Error), - /// A problem with deserialising the response to a portal request. - Parse, - /// Cancelled by the user. - Cancelled, -} - -impl std::convert::From for PortalError { - fn from(error_string: String) -> Self { - PortalError::Generic(error_string) - } -} - -impl std::convert::From for PortalError { - fn from(err: dbus::Error) -> Self { - PortalError::DBus(err) - } -} - -impl std::fmt::Display for PortalError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "D-Bus Portal error: {0:?}", self) - } -} - -impl std::error::Error for PortalError {} - -/// An un-opened screencast session. This can be queried for the supported -/// capture source types, and used to configure which source types to prompt -/// for. Each `ScreenCast` can be mde active once by calling `start()`. -pub struct ScreenCast { - state: ConnectionState, - session: String, - multiple: bool, - source_types: Option, - cursor_mode: Option, -} - -impl ScreenCast { - /// Create a new ScreenCast Session - /// - /// Connects to D-Bus and initaialises a ScreenCast object. - pub fn new() -> Result { - let state = ConnectionState::open_new()?; - - let session = { - let request = Request::with_handler(&state, |a| { - a.results - .get("session_handle") - .unwrap() - .as_str() - .unwrap() - .to_owned() - })?; - // Make the initail call to open the session. 
- let mut session_args = HashMap::>>::new(); - session_args.insert( - "handle_token".into(), - Variant(Box::new(String::from(&request.handle))), - ); - session_args.insert( - "session_handle_token".into(), - Variant(Box::new(String::from(&request.handle))), - ); - state.desktop_proxy().create_session(session_args)?; - request.wait_response()? - }; - - Ok(ScreenCast { - state, - session, - multiple: false, - source_types: None, - cursor_mode: None, - }) - } - - /// Get the supported source types for this connection - pub fn source_types(&self) -> Result { - let types = self.state.desktop_proxy().available_source_types()?; - Ok(SourceType::from_bits_truncate(types)) - } - - /// Set the source types to capture. This should be a subset of - /// those from `source_types()`. - pub fn set_source_types(&mut self, types: SourceType) { - self.source_types = Some(types); - } - - // Set cursor visibilty/mode (HIDDEN by default) - pub fn set_cursor_mode(&mut self, mode: CursorMode) { - self.cursor_mode = Some(mode); - } - - /// Enable multi-stream selection. This allows the user to choose more than - /// one thing to share. Each will be a separate item in the - /// `ActiveScreenCast::streams()` iterator. - pub fn enable_multiple(&mut self) { - self.multiple = true; - } - - /// Try to start the screen cast. This will prompt the user to select a - /// source to share. - pub fn start(self, parent_window: Option<&str>) -> Result { - let desktop_proxy = self.state.desktop_proxy(); - - { - let request = Request::new(&self.state)?; - let session = dbus::Path::from(&self.session); - let mut select_args = HashMap::>>::new(); - select_args.insert( - "handle_token".into(), - Variant(Box::new(String::from(&request.handle))), - ); - select_args.insert( - "types".into(), - Variant(Box::new(match self.source_types { - Some(types) => types.bits(), - None => desktop_proxy.available_source_types()?, - })), - ); - select_args.insert("multiple".into(), Variant(Box::new(self.multiple))); - select_args.insert( - "cursor_mode".into(), - Variant(Box::new(match self.cursor_mode { - Some(mode) => mode.bits(), - None => CursorMode::HIDDEN.bits(), - })), - ); - - desktop_proxy.select_sources(session, select_args)?; - request.wait_response()?; - } - - let streams = { - let request = Request::with_handler(&self.state, |response| { - if response.response != 0 { - return Err(PortalError::Cancelled); - } - match response.results.get("streams") { - Some(streams) => match streams.as_iter() { - Some(streams) => streams - .flat_map(|s| { - s.as_iter() - .into_iter() - .flat_map(|t| t.map(|u| u.try_into())) - }) - .collect(), - None => Err(PortalError::Parse), - }, - None => Err(PortalError::Parse), - } - })?; - let session = dbus::Path::from(&self.session); - let mut select_args = HashMap::>>::new(); - select_args.insert( - "handle_token".into(), - Variant(Box::new(String::from(&request.handle))), - ); - desktop_proxy.start(session, parent_window.unwrap_or(""), select_args)?; - request.wait_response()? - }?; - - let pipewire_fd = - desktop_proxy.open_pipe_wire_remote(dbus::Path::from(&self.session), HashMap::new())?; - - Ok(ActiveScreenCast { - state: self.state, - session_path: self.session, - pipewire_fd, - streams, - }) - } -} - -/// An active ScreenCast session. This holds a file descriptor for connecting -/// to PipeWire along with metadata for the active streams. 
-pub struct ActiveScreenCast { - state: ConnectionState, - session_path: String, - pipewire_fd: OwnedFd, - streams: Vec, -} - -impl ActiveScreenCast { - /// Get the fille descriptor for the PipeWire session. - pub fn pipewire_fd(&self) -> RawFd { - self.pipewire_fd.clone().into_fd() - } - - /// Get the streams active in this ScreenCast. - pub fn streams(&self) -> impl Iterator { - self.streams.iter() - } - - /// Close the ScreenCast session. This ends the cast. - pub fn close(&self) -> Result<(), PortalError> { - // Open a handle to the active session, and close it. - let session = Session::open(&self.state, &self.session_path)?; - session.close()?; - Ok(()) - } -} - -impl std::ops::Drop for ActiveScreenCast { - fn drop(&mut self) { - let _ = self.close(); - } -} - -/// A single active stream -/// -/// Each item being captured in the `ScreenCast` appears as a stream. This holds -/// metadata about how to access the stream from the PipeWire session. -#[derive(Debug)] -pub struct ScreenCastStream { - pipewire_node: u32, - width: u32, - height: u32, -} - -impl ScreenCastStream { - /// Get the PipeWire node ID for this stream. - pub fn pipewire_node(&self) -> u32 { - self.pipewire_node - } - - pub fn width(&self) -> u32 { - self.width - } - - pub fn height(&self) -> u32 { - self.height - } - - pub fn size(&self) -> (u32, u32) { - (self.width(), self.height()) - } -} - -impl std::convert::TryFrom<&dyn RefArg> for ScreenCastStream { - type Error = PortalError; - - fn try_from(value: &dyn RefArg) -> Result { - let mut parts_iter = value.as_iter().ok_or(PortalError::Parse)?; - - // Get node id - let node_id = parts_iter - .next() - .and_then(|r| r.as_u64()) - .map(|r| r as u32) - .ok_or(PortalError::Parse)?; - - let metadata = parts_iter.next().ok_or(PortalError::Parse)?; - - let mut width = 0; - let mut height = 0; - - if let Some(mut dict_iter) = metadata.as_iter() { - while let Some(key) = dict_iter.next() { - if key.as_str() == Some("size") { - if let Some(values) = dict_iter.next().ok_or(PortalError::Parse)?.as_iter() { - for v in values { - let mut v_iter = v.as_iter().ok_or(PortalError::Parse)?; - width = v_iter - .next() - .and_then(|w| w.as_i64()) - .map(|w| w as u32) - .ok_or(PortalError::Parse)?; - - height = v_iter - .next() - .and_then(|h| h.as_i64()) - .map(|h| h as u32) - .ok_or(PortalError::Parse)?; - } - } else { - return Err(PortalError::Parse); - } - } - } - } - - Ok(ScreenCastStream { - pipewire_node: node_id, - width, - height, - }) - } -} - -bitflags! { - /// Source Type Bitflags - /// - /// Use `MONITOR` to capture froma screen, `WINDOW` to capture a single - /// window, or `all()` to capture either. - pub struct SourceType : u32 { - const MONITOR = 0b00001; - const WINDOW = 0b00010; - } - - /// Cursor Mode Bitflags - /// - /// Refer to the freedesktop [docs](https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.impl.portal.ScreenCast.html#org-freedesktop-impl-portal-screencast-availablecursormodes) - /// to see more details about what these each mean - /// - /// Default: HIDDEN - pub struct CursorMode : u32 { - const HIDDEN = 0b00001; - const EMBEDDED = 0b00010; - const METADATA = 0b00100; - } -} - -// - - - - - - - - - - - - - - Private Implementation - - - - - - - - - - - - - -/// D-Bus connection state. Used to access the Desktop portal -/// and open our screencast. 
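
These bitflag sets map directly onto the portal's `AvailableSourceTypes` and `AvailableCursorModes` bitmasks. A minimal standalone example of how they combine; the expected bit values are pinned by the unit test at the end of this file:

```rust
use bitflags::bitflags;

bitflags! {
    // Same bit layout as the SourceType flags defined above.
    struct SourceType: u32 {
        const MONITOR = 0b00001;
        const WINDOW  = 0b00010;
    }
}

fn main() {
    // Selecting both source kinds ORs the bits together.
    let both = SourceType::MONITOR | SourceType::WINDOW;
    assert_eq!(both.bits(), 3);
    // Unknown bits reported by the portal are dropped, not treated as errors.
    assert_eq!(SourceType::from_bits_truncate(0b111), both);
}
```
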
-struct ConnectionState { - connection: Connection, - sender_token: String, -} - -impl ConnectionState { - /// Open a new D-Bus connection to use for all our requests - pub fn open_new() -> Result { - // Create a new session and work out our session's sender token. Portal - // requests will send responses to paths based on this token. - let connection = Connection::new_session()?; - let sender_token = String::from(&connection.unique_name().replace(".", "_")[1..]); - Ok(ConnectionState { - connection, - sender_token, - }) - } - - /// Create a proxy to the main desktop portal object - pub fn desktop_proxy(&self) -> Proxy<&Connection> { - self.connection.with_proxy( - "org.freedesktop.portal.Desktop", - "/org/freedesktop/portal/desktop", - Duration::from_secs(20), - ) - } -} - -/// A request object. Portal requests are used to wait for responses to ongoing -/// portal operations. -struct Request<'a, Response> { - /// A proxy connected to this reuqest object on the bus. - proxy: Proxy<'a, &'a Connection>, - /// The handle for this request. - handle: String, - /// The channel reciever that we can read responses from. - response: Receiver, - /// The match token to remove our D-Bus matcher. - match_token: Token, -} - -impl<'a> Request<'a, ()> { - /// Create a new request object with the given connection. This generates - /// a random token for the handle. - pub fn new(state: &'a ConnectionState) -> Result { - Self::with_handler(state, |_| {}) - } -} - -impl<'a, Response> Request<'a, Response> { - /// Create a new request object with the given connection and handler. This - /// generates a random token for the handle. The results of the handler can - /// be retrieved by calling `wait_result()`. - pub fn with_handler( - state: &'a ConnectionState, - mut on_response: ResponseHandler, - ) -> Result - where - ResponseHandler: FnMut(OrgFreedesktopPortalRequestResponse) -> Response + Send + 'static, - Response: Send + 'static, - { - let handle = format!("screencap{0}", rand::random::()); - let resp_path = Path::new(format!( - "/org/freedesktop/portal/desktop/request/{0}/{1}", - state.sender_token, handle - ))?; - let proxy = state.connection.with_proxy( - "org.freedesktop.portal.Desktop", - resp_path, - Duration::from_secs(20), - ); - let (sender, response) = mpsc::channel(); - let match_token = proxy.match_signal( - move |a: OrgFreedesktopPortalRequestResponse, _: &Connection, _: &Message| { - // FIXME: handle error responses here somehow? Currently it is - // just up to the `on_response` to deal with it. - let res = on_response(a); - sender.send(res).is_ok() - }, - )?; - Ok(Request { - proxy, - handle, - response, - match_token, - }) - } - - pub fn wait_response(&self) -> Result { - // Pump the event loop until we receive our expected result - loop { - if let Ok(data) = self.response.try_recv() { - return Ok(data); - } else { - self.proxy.connection.process(Duration::from_millis(100))?; - } - } - } -} - -impl<'a, T> std::ops::Drop for Request<'a, T> { - fn drop(&mut self) { - let _ = self.proxy.match_stop(self.match_token, true); - } -} - -/// A session handle. 
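
`Request::wait_response` blocks by alternating between checking the mpsc channel and pumping the D-Bus connection so the signal handler that feeds the channel can run. The same pattern, stripped to its shape (here `process` is a stand-in for `dbus::blocking::Connection::process`):

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

// Poll the reply channel; while it is empty, drive the event loop in
// 100 ms slices so the match-rule callback gets a chance to send.
fn wait_for<T, E>(
    rx: &Receiver<T>,
    mut process: impl FnMut(Duration) -> Result<bool, E>,
) -> Result<T, E> {
    loop {
        if let Ok(value) = rx.try_recv() {
            return Ok(value);
        }
        process(Duration::from_millis(100))?;
    }
}
```
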
-struct Session<'a> { - proxy: Proxy<'a, &'a Connection>, -} - -impl<'a> Session<'a> { - pub fn open(state: &'a ConnectionState, path: &str) -> Result { - let path = dbus::Path::new(path)?; - let proxy = state.connection.with_proxy( - "org.freedesktop.portal.Desktop", - path, - Duration::from_secs(20), - ); - Ok(Session { proxy }) - } - - pub fn close(&self) -> Result<(), PortalError> { - self.proxy.close()?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::SourceType; - - #[test] - pub fn check_source_types() { - assert_eq!(1, SourceType::MONITOR.bits()); - assert_eq!(2, SourceType::WINDOW.bits()); - assert_eq!(3, (SourceType::WINDOW | SourceType::MONITOR).bits()); - } -} diff --git a/src/app_context.rs b/src/app_context.rs index f4a09fe..6c053c2 100644 --- a/src/app_context.rs +++ b/src/app_context.rs @@ -1,14 +1,9 @@ -use crate::{application_config::AppConfig, RawAudioFrame, RawVideoFrame}; -use ringbuf::HeapCons; use std::sync::{atomic::AtomicBool, Arc}; +use waycap_rs::Capture; pub struct AppContext { pub saving: Arc, pub stop: Arc, pub join_handles: Vec>, - pub config: AppConfig, - pub width: u32, - pub height: u32, - pub video_ring_receiver: Option>, - pub audio_ring_receiver: Option>, + pub capture: Capture, } diff --git a/src/encoders/audio_encoder.rs b/src/encoders/audio_encoder.rs deleted file mode 100644 index 961c20c..0000000 --- a/src/encoders/audio_encoder.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::collections::VecDeque; - -use anyhow::Result; -use ffmpeg_next::{self as ffmpeg, Rational}; -use ringbuf::{ - traits::{Producer, Split}, - HeapCons, HeapProd, HeapRb, -}; - -use crate::RawAudioFrame; - -const MIN_RMS: f32 = 0.01; - -pub struct FfmpegAudioEncoder(ffmpeg::codec::encoder::Audio); - -pub trait AudioEncoderImpl { - type Error; - - fn codec(&self) -> Option; - fn time_base(&self) -> Rational; - - fn channels(&self) -> u16; - fn frame_size(&self) -> i32; - fn format(&self) -> ffmpeg::format::Sample; - fn channel_layout(&self) -> ffmpeg::channel_layout::ChannelLayout; - fn rate(&self) -> u32; - - fn send_frame(&mut self, frame: &ffmpeg::frame::Audio) -> Result<(), Self::Error>; - fn receive_packet( - &mut self, - pkt: &mut ffmpeg::codec::packet::Packet, - ) -> Result<(), Self::Error>; - fn send_eof(&mut self) -> Result<(), Self::Error>; -} - -impl AudioEncoderImpl for FfmpegAudioEncoder { - type Error = ffmpeg::Error; - - fn codec(&self) -> Option { - self.0.codec() - } - - fn time_base(&self) -> Rational { - self.0.time_base() - } - - fn channels(&self) -> u16 { - self.0.channels() - } - - fn frame_size(&self) -> i32 { - unsafe { (*self.0.as_ptr()).frame_size } - } - - fn format(&self) -> ffmpeg_next::format::Sample { - self.0.format() - } - - fn channel_layout(&self) -> ffmpeg_next::channel_layout::ChannelLayout { - self.0.channel_layout() - } - - fn rate(&self) -> u32 { - self.0.rate() - } - - fn send_frame(&mut self, frame: &ffmpeg_next::frame::Audio) -> Result<(), Self::Error> { - self.0.send_frame(frame) - } - - fn receive_packet( - &mut self, - pkt: &mut ffmpeg_next::codec::packet::Packet, - ) -> Result<(), Self::Error> { - self.0.receive_packet(pkt) - } - - fn send_eof(&mut self) -> Result<(), Self::Error> { - self.0.send_eof() - } -} - -impl FfmpegAudioEncoder { - pub fn new_opus() -> Result { - let encoder_codec = ffmpeg::codec::encoder::find(ffmpeg_next::codec::Id::OPUS) - .ok_or(ffmpeg::Error::EncoderNotFound)?; - - let mut encoder_ctx = ffmpeg::codec::context::Context::new_with_codec(encoder_codec) - .encoder() - .audio()?; - - 
encoder_ctx.set_rate(48000); - encoder_ctx.set_bit_rate(70_000); - encoder_ctx.set_format(ffmpeg::format::Sample::F32( - ffmpeg_next::format::sample::Type::Packed, - )); - encoder_ctx.set_time_base(Rational::new(1, 48000)); - encoder_ctx.set_frame_rate(Some(Rational::new(1, 48000))); - encoder_ctx.set_channel_layout(ffmpeg::channel_layout::ChannelLayout::STEREO); - - let mut encoder = encoder_ctx.open()?; - - // Opus frame size is based on n channels so need to update it - unsafe { - (*encoder.as_mut_ptr()).frame_size = - (encoder.frame_size() as i32 * encoder.channels() as i32) as i32; - } - - Ok(FfmpegAudioEncoder(encoder)) - } -} - -impl AsRef for FfmpegAudioEncoder { - fn as_ref(&self) -> &ffmpeg::codec::encoder::Audio { - &self.0 - } -} - -/// Represents an encoded audio frame encoded by the Opus encoder. -pub struct EncodedAudioFrameData { - /// The raw bytes of the encoded frame - pub data: Vec, - /// The presentation timestamp in the encoder's time base - /// (960 samples per channel = ~20 ms) - pub pts: i64, - /// The timestamp for when the frame was originally captured in micro seconds - pub timestamp: i64, -} - -pub struct AudioEncoder -where - E: AudioEncoderImpl, -{ - encoder: Option, - next_pts: i64, - leftover_data: VecDeque, - encoded_samples_recv: Option>, - encoded_samples_sender: Option>, - capture_timestamps: VecDeque, -} - -impl AudioEncoder -where - E: AudioEncoderImpl, -{ - pub fn new_with_encoder( - factory: impl Fn() -> Result, - ) -> Result { - let encoder = factory()?; - - let audio_ring_buffer = HeapRb::::new(5); - let (sender, receiver) = audio_ring_buffer.split(); - - Ok(Self { - encoder: Some(encoder), - next_pts: 0, - leftover_data: VecDeque::new(), - encoded_samples_recv: Some(receiver), - encoded_samples_sender: Some(sender), - capture_timestamps: VecDeque::with_capacity(4), - }) - } - - pub fn process(&mut self, raw_frame: &mut RawAudioFrame) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - let n_channels = encoder.channels() as usize; - let total_samples = raw_frame.samples.len(); - - if total_samples % n_channels != 0 { - return Err(ffmpeg::Error::InvalidData); - } - - let frame_size = encoder.frame_size() as usize; - - // Boost the audio so that even if system audio level is low - // it's still audible in playback - Self::boost_with_rms(raw_frame.get_samples_mut())?; - self.leftover_data.extend(raw_frame.get_samples()); - - // Send chunked frames to encoder - while self.leftover_data.len() >= frame_size { - let frame_samples: Vec = self.leftover_data.drain(..frame_size).collect(); - let mut frame = ffmpeg::frame::Audio::new( - encoder.format(), - frame_size, - encoder.channel_layout(), - ); - - // Capture time in vec - frame.plane_mut(0).copy_from_slice(&frame_samples); - frame.set_pts(Some(self.next_pts)); - frame.set_rate(encoder.rate()); - - self.capture_timestamps.push_back(raw_frame.timestamp); - encoder.send_frame(&frame)?; - - // Try and get a frame back from encoder - let mut packet = ffmpeg::codec::packet::Packet::empty(); - if encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - let pts = packet.pts().unwrap_or(0); - if let Some(ref mut sender) = self.encoded_samples_sender { - if sender - .try_push(EncodedAudioFrameData { - data: data.to_vec(), - pts, - timestamp: self.capture_timestamps.pop_front().unwrap_or(0), - }) - .is_err() - { - log::error!("Could not send encoded packet to audio ringbuf"); - } - } - } - } - - self.next_pts += frame_size as i64; - } - } - - Ok(()) - } - 
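
The constants in `new_opus` above fit together as follows: at 48 kHz one Opus frame is 960 samples per channel, which is the "~20 ms" mentioned in the `EncodedAudioFrameData` docs, and because the stream is packed stereo the code patches `frame_size` to span both channels. The arithmetic as a standalone check:

```rust
fn main() {
    let rate = 48_000u32;     // set_rate above, per channel
    let per_channel = 960u32; // one Opus frame, per channel
    let channels = 2u32;      // ChannelLayout::STEREO

    // 960 / 48_000 s = 20 ms of audio per frame.
    let frame_ms = per_channel as f64 * 1000.0 / rate as f64;
    assert_eq!(frame_ms, 20.0);

    // Packed stereo interleaves both channels into one plane, so the
    // patched frame_size comes out to 1920 f32 samples per frame.
    assert_eq!(per_channel * channels, 1920);
}
```
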
- pub fn get_encoder(&self) -> &Option { - &self.encoder - } - - // Drain remaining frames being processed in the encoder - pub fn drain(&mut self) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - encoder.send_eof()?; - let mut packet = ffmpeg::codec::packet::Packet::empty(); - while encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - let pts = packet.pts().unwrap_or(0); - if let Some(ref mut sender) = self.encoded_samples_sender { - if sender - .try_push(EncodedAudioFrameData { - data: data.to_vec(), - pts, - timestamp: self.capture_timestamps.pop_front().unwrap_or(0), - }) - .is_err() - { - log::error!("Could not send encoded packet to audio ringbuf"); - } - } - } - } - } - - Ok(()) - } - - pub fn drop_encoder(&mut self) { - self.encoder.take(); - } - - pub fn reset_encoder( - &mut self, - factory: impl Fn() -> Result, - ) -> Result<(), ffmpeg::Error> { - self.drop_encoder(); - self.encoder = Some(factory()?); - - Ok(()) - } - - pub fn take_encoded_recv(&mut self) -> Option> { - self.encoded_samples_recv.take() - } -} - -impl AudioEncoder -where - E: AudioEncoderImpl, -{ - fn boost_with_rms(samples: &mut [f32]) -> Result<(), ffmpeg::Error> { - let sum_sqrs = samples.iter().map(|&s| s * s).sum::(); - let rms = (sum_sqrs / samples.len() as f32).sqrt(); - - let gain = if rms > 0.0 && rms < MIN_RMS { - MIN_RMS / rms - } else { - 1.0 - }; - - let gain = gain.min(5.0); - for sample in samples.iter_mut() { - *sample *= gain; - } - Ok(()) - } -} - -impl Drop for AudioEncoder -where - E: AudioEncoderImpl, -{ - fn drop(&mut self) { - if let Err(e) = self.drain() { - log::error!("Error draining the audio encoder when dropping: {:?}", e); - } - - self.drop_encoder(); - } -} diff --git a/src/encoders/buffer.rs b/src/encoders/buffer.rs index 272aea1..dea6989 100644 --- a/src/encoders/buffer.rs +++ b/src/encoders/buffer.rs @@ -1,42 +1,13 @@ use std::collections::BTreeMap; -/// Represents a single encoded video frame -#[derive(Clone, Debug)] -pub struct VideoFrameData { - frame_bytes: Vec, - pts: i64, - is_key: bool, -} - -impl VideoFrameData { - pub fn new(frame_bytes: Vec, is_key: bool, pts: i64) -> Self { - Self { - frame_bytes, - is_key, - pts, - } - } - - pub fn get_raw_bytes(&self) -> &Vec { - &self.frame_bytes - } - - pub fn get_pts(&self) -> &i64 { - &self.pts - } - - pub fn is_key(&self) -> &bool { - &self.is_key - } -} +use waycap_rs::types::video_frame::EncodedVideoFrame; /// Rolling buffer which holds up to the last `max_time` seconds of video frames. /// /// The buffer is ordered by decoding timestamp (DTS) and maintains complete GOPs (groups of pictures), /// ensuring that no partial GOPs are kept when trimming for ease of muxing and playback. -#[derive(Clone)] pub struct ShadowCaptureVideoBuffer { - frames: BTreeMap, + frames: BTreeMap, /// Maximum duration (in seconds) that the buffer should retain. /// Once the difference between the newest and oldest frame exceeds this, older GOPs are trimmed. @@ -70,8 +41,8 @@ impl ShadowCaptureVideoBuffer { /// /// * `timestamp` - The decoding timestamp (DTS) of the frame. /// * `frame` - A [`VideoFrameData`] representing an encoded frame. 
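
The rolling-buffer doc comment above promises whole-GOP trimming: frames are dropped from the front, keyframe boundary by keyframe boundary, until the span between the newest and oldest entries fits inside `max_time`. An illustrative sketch of that invariant, not the literal `buffer.rs` code:

```rust
use std::collections::BTreeMap;

// `frames` is keyed by DTS; `keyframes` holds keyframe DTS values in
// insertion order; `max_time` is in the same units as the timestamps.
fn trim_front(frames: &mut BTreeMap<i64, Vec<u8>>, keyframes: &mut Vec<i64>, max_time: i64) {
    loop {
        let (oldest, newest) = match (
            frames.keys().next().copied(),
            frames.keys().next_back().copied(),
        ) {
            (Some(o), Some(n)) => (o, n),
            _ => return,
        };
        // Stop once the window fits, or when only one GOP remains.
        if newest - oldest <= max_time || keyframes.len() < 2 {
            return;
        }
        // Drop the oldest GOP: everything before the second keyframe.
        let cut = keyframes[1];
        *frames = frames.split_off(&cut);
        keyframes.remove(0);
    }
}
```

With `max_time = 10` and keyframes at 0, 7, 13, and 19 this keeps exactly the frames from 13 onward, which is the behaviour the trimming test below asserts.
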
- pub fn insert(&mut self, timestamp: i64, frame: VideoFrameData) { - if frame.is_key { + pub fn insert(&mut self, timestamp: i64, frame: EncodedVideoFrame) { + if frame.is_keyframe { self.key_frame_keys.push(timestamp); } @@ -136,7 +107,7 @@ impl ShadowCaptureVideoBuffer { self.key_frame_keys.remove(0); } - pub fn get_frames(&self) -> &BTreeMap { + pub fn get_frames(&self) -> &BTreeMap { &self.frames } diff --git a/src/encoders/buffer_tests.rs b/src/encoders/buffer_tests.rs index 7b92320..bd6c9a6 100644 --- a/src/encoders/buffer_tests.rs +++ b/src/encoders/buffer_tests.rs @@ -1,27 +1,37 @@ +use waycap_rs::types::video_frame::EncodedVideoFrame; + use super::buffer::*; +fn new_video_frame(data: Vec, pts: i64, keyframe: bool, dts: i64) -> EncodedVideoFrame { + EncodedVideoFrame { + data, + is_keyframe: keyframe, + pts, + dts, + } +} + #[test] fn test_video_frame_data_getters() { - let video_frame_data = VideoFrameData::new(vec![1], true, 1); - - assert_eq!(*video_frame_data.get_pts(), 1); - assert_eq!(*video_frame_data.get_raw_bytes(), vec![1]); - assert!(*video_frame_data.is_key()); + let video_frame_data = EncodedVideoFrame { + data: vec![1], + pts: 1, + is_keyframe: true, + dts: 1, + }; + + assert_eq!(video_frame_data.pts, 1); + assert_eq!(video_frame_data.data, vec![1]); + assert!(video_frame_data.is_keyframe); } #[test] fn test_video_buffer_no_trim() { - let dummy_frames = [ - VideoFrameData::new(vec![1], true, 1), - VideoFrameData::new(vec![2], false, 3), - VideoFrameData::new(vec![3], true, 6), - ]; - let mut buffer = ShadowCaptureVideoBuffer::new(10); - buffer.insert(1, dummy_frames[0].clone()); - buffer.insert(2, dummy_frames[1].clone()); - buffer.insert(3, dummy_frames[2].clone()); + buffer.insert(1, new_video_frame(vec![1], 1, true, 1)); + buffer.insert(2, new_video_frame(vec![2], 3, false, 3)); + buffer.insert(3, new_video_frame(vec![3], 6, true, 6)); assert_eq!(*buffer.get_last_gop_start().unwrap(), 3); @@ -44,36 +54,27 @@ fn test_video_buffer_no_trim() { fn test_video_buffer_trimming() { let mut buffer = ShadowCaptureVideoBuffer::new(10); - let dummy_frames = [ - VideoFrameData::new(vec![1], true, 0), - VideoFrameData::new(vec![1], false, 3), - VideoFrameData::new(vec![1], false, 5), - VideoFrameData::new(vec![1], true, 7), - VideoFrameData::new(vec![1], false, 9), - VideoFrameData::new(vec![1], false, 11), - // This keyframe (PTS 13) should become the oldest after trimming, - // as it's the first keyframe after the PTS 9 cut-off. - VideoFrameData::new(vec![1], true, 13), - VideoFrameData::new(vec![1], false, 15), - VideoFrameData::new(vec![1], false, 17), - VideoFrameData::new(vec![1], true, 19), - ]; - - for (iter, frame) in dummy_frames.iter().enumerate() { - buffer.insert(iter as i64, frame.clone()); - } + // Insert frames directly without storing in array first + buffer.insert(0, new_video_frame(vec![1], 0, true, 0)); + buffer.insert(1, new_video_frame(vec![1], 3, false, 3)); + buffer.insert(2, new_video_frame(vec![1], 5, false, 5)); + buffer.insert(3, new_video_frame(vec![1], 7, true, 7)); + buffer.insert(4, new_video_frame(vec![1], 9, false, 9)); + buffer.insert(5, new_video_frame(vec![1], 11, false, 11)); + // This keyframe (PTS 13) should become the oldest after trimming, + // as it's the first keyframe after the PTS 9 cut-off. 
+ buffer.insert(6, new_video_frame(vec![1], 13, true, 13)); + buffer.insert(7, new_video_frame(vec![1], 15, false, 15)); + buffer.insert(8, new_video_frame(vec![1], 17, false, 17)); + buffer.insert(9, new_video_frame(vec![1], 19, true, 19)); let oldest = buffer.oldest_pts().unwrap(); assert_eq!(oldest, 13); - let newest = buffer.newest_pts().unwrap(); assert_eq!(newest, 19); - assert_eq!(buffer.get_frames().len(), 4); assert_eq!(*buffer.get_last_gop_start().unwrap(), 9); - buffer.reset(); - assert!(buffer.get_frames().is_empty()); assert!(buffer.newest_pts().is_none()); assert!(buffer.oldest_pts().is_none()); diff --git a/src/encoders/mod.rs b/src/encoders/mod.rs index 11ff5f3..3b6e3c9 100644 --- a/src/encoders/mod.rs +++ b/src/encoders/mod.rs @@ -1,7 +1,3 @@ -pub mod audio_encoder; pub mod buffer; #[cfg(test)] mod buffer_tests; -pub mod nvenc_encoder; -pub mod vaapi_encoder; -pub mod video_encoder; diff --git a/src/encoders/nvenc_encoder.rs b/src/encoders/nvenc_encoder.rs deleted file mode 100644 index dd2d064..0000000 --- a/src/encoders/nvenc_encoder.rs +++ /dev/null @@ -1,209 +0,0 @@ -use ffmpeg_next::{self as ffmpeg, Rational}; -use ringbuf::{ - traits::{Producer, Split}, - HeapCons, HeapProd, HeapRb, -}; - -use crate::{application_config::QualityPreset, RawVideoFrame}; - -use super::{ - buffer::VideoFrameData, - video_encoder::{VideoEncoder, GOP_SIZE}, -}; - -pub struct NvencEncoder { - encoder: Option, - width: u32, - height: u32, - encoder_name: String, - quality: QualityPreset, - encoded_frame_recv: Option>, - encoded_frame_sender: Option>, -} - -impl VideoEncoder for NvencEncoder { - fn new(width: u32, height: u32, quality: QualityPreset) -> anyhow::Result - where - Self: Sized, - { - let encoder_name = "h264_nvenc"; - let encoder = Some(Self::create_encoder(width, height, encoder_name, &quality)?); - let video_ring_buffer = HeapRb::<(i64, VideoFrameData)>::new(120); - let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split(); - - Ok(Self { - encoder, - width, - height, - encoder_name: encoder_name.to_string(), - quality, - encoded_frame_recv: Some(video_ring_receiver), - encoded_frame_sender: Some(video_ring_sender), - }) - } - - fn process(&mut self, frame: &RawVideoFrame) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - let mut src_frame = ffmpeg::util::frame::video::Video::new( - ffmpeg_next::format::Pixel::BGRA, - encoder.width(), - encoder.height(), - ); - - src_frame.set_pts(Some(*frame.get_timestamp())); - src_frame.data_mut(0).copy_from_slice(frame.get_bytes()); - - encoder.send_frame(&src_frame).unwrap(); - - let mut packet = ffmpeg::codec::packet::Packet::empty(); - if encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - if let Some(ref mut sender) = self.encoded_frame_sender { - if sender - .try_push(( - packet.dts().unwrap_or(0), - VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ), - )) - .is_err() - { - log::error!("Could not send encoded packet to the ringbuf"); - } - } - }; - } - } - Ok(()) - } - - /// Drain the encoder of any remaining frames it is processing - fn drain(&mut self) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - encoder.send_eof()?; - let mut packet = ffmpeg::codec::packet::Packet::empty(); - while encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - if let Some(ref mut sender) = self.encoded_frame_sender { - if sender - .try_push(( - packet.dts().unwrap_or(0), - 
VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ), - )) - .is_err() - { - log::error!("Could not send encoded packet to the ringbuf"); - } - } - }; - packet = ffmpeg::codec::packet::Packet::empty(); - } - } - Ok(()) - } - - fn drop_encoder(&mut self) { - self.encoder.take(); - } - - fn reset(&mut self) -> anyhow::Result<()> { - self.drop_encoder(); - self.encoder = Some(Self::create_encoder( - self.width, - self.height, - &self.encoder_name, - &self.quality, - )?); - Ok(()) - } - - fn get_encoder(&self) -> &Option { - &self.encoder - } - - fn take_encoded_recv(&mut self) -> Option> { - self.encoded_frame_recv.take() - } -} - -impl NvencEncoder { - fn create_encoder( - width: u32, - height: u32, - encoder: &str, - quality: &QualityPreset, - ) -> Result { - let encoder_codec = - ffmpeg::codec::encoder::find_by_name(encoder).ok_or(ffmpeg::Error::EncoderNotFound)?; - - let mut encoder_ctx = ffmpeg::codec::context::Context::new_with_codec(encoder_codec) - .encoder() - .video()?; - - encoder_ctx.set_width(width); - encoder_ctx.set_height(height); - encoder_ctx.set_format(ffmpeg::format::Pixel::BGRA); - encoder_ctx.set_bit_rate(16_000_000); - - // These should be part of a config file - encoder_ctx.set_time_base(Rational::new(1, 1_000_000)); - - // Needed to insert I-Frames more frequently so we don't lose full seconds - // when popping frames from the front - encoder_ctx.set_gop(GOP_SIZE); - - let encoder_params = ffmpeg::codec::Parameters::new(); - - let opts = Self::get_encoder_params(quality); - - encoder_ctx.set_parameters(encoder_params)?; - let encoder = encoder_ctx.open_with(opts)?; - - Ok(encoder) - } - - fn get_encoder_params(quality: &QualityPreset) -> ffmpeg::Dictionary { - let mut opts = ffmpeg::Dictionary::new(); - opts.set("vsync", "vfr"); - opts.set("rc", "vbr"); - opts.set("tune", "hq"); - match quality { - QualityPreset::Low => { - opts.set("preset", "p2"); - opts.set("cq", "30"); - opts.set("b:v", "20M"); - } - QualityPreset::Medium => { - opts.set("preset", "p4"); - opts.set("cq", "25"); - opts.set("b:v", "40M"); - } - QualityPreset::High => { - opts.set("preset", "p7"); - opts.set("cq", "20"); - opts.set("b:v", "80M"); - } - QualityPreset::Ultra => { - opts.set("preset", "p7"); - opts.set("cq", "15"); - opts.set("b:v", "120M"); - } - } - opts - } -} - -impl Drop for NvencEncoder { - fn drop(&mut self) { - if let Err(e) = self.drain() { - log::error!("Error while draining nvenc encoder during drop: {:?}", e); - } - self.drop_encoder(); - } -} diff --git a/src/encoders/vaapi_encoder.rs b/src/encoders/vaapi_encoder.rs deleted file mode 100644 index f20c5e7..0000000 --- a/src/encoders/vaapi_encoder.rs +++ /dev/null @@ -1,476 +0,0 @@ -use std::{cell::RefCell, ffi::CString, ptr::null_mut}; - -use crate::application_config::QualityPreset; -use anyhow::anyhow; -use drm_fourcc::DrmFourcc; -use ffmpeg_next::{ - self as ffmpeg, - ffi::{ - av_buffer_create, av_buffer_default_free, av_buffer_ref, av_buffer_unref, - av_hwdevice_ctx_create, av_hwframe_ctx_alloc, av_hwframe_ctx_init, av_hwframe_get_buffer, - av_hwframe_transfer_data, AVBufferRef, AVDRMFrameDescriptor, AVHWDeviceContext, - AVHWFramesContext, AVPixelFormat, - }, - software::scaling::{Context as Scaler, Flags}, - Rational, -}; -use log::error; -use ringbuf::{ - traits::{Producer, Split}, - HeapCons, HeapProd, HeapRb, -}; - -use super::{ - buffer::VideoFrameData, - video_encoder::{VideoEncoder, GOP_SIZE}, -}; - -thread_local! 
{ - static SCALER: RefCell> = const { RefCell::new(None) }; -} - -pub struct VaapiEncoder { - encoder: Option, - width: u32, - height: u32, - encoder_name: String, - quality: QualityPreset, - encoded_frame_recv: Option>, - encoded_frame_sender: Option>, - filter_graph: Option, -} - -impl VideoEncoder for VaapiEncoder { - fn new(width: u32, height: u32, quality: QualityPreset) -> anyhow::Result - where - Self: Sized, - { - let encoder_name = "h264_vaapi"; - let encoder = Self::create_encoder(width, height, encoder_name, &quality)?; - let video_ring_buffer = HeapRb::<(i64, VideoFrameData)>::new(120); - let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split(); - let filter_graph = Some(Self::create_filter_graph(&encoder, width, height)?); - - Ok(Self { - encoder: Some(encoder), - width, - height, - encoder_name: encoder_name.to_string(), - quality, - encoded_frame_recv: Some(video_ring_receiver), - encoded_frame_sender: Some(video_ring_sender), - filter_graph, - }) - } - - fn process(&mut self, frame: &crate::RawVideoFrame) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - if let Some(fd) = frame.dmabuf_fd { - log::debug!( - "DMA Frame with fd: {}, size: {}, offset: {}, stride: {}", - fd, - frame.size, - frame.offset, - frame.stride - ); - - let mut drm_frame = ffmpeg::util::frame::Video::new( - ffmpeg_next::format::Pixel::DRM_PRIME, - encoder.width(), - encoder.height(), - ); - unsafe { - // Create DRM descriptor that points to the DMA buffer - let drm_desc = - Box::into_raw(Box::new(std::mem::zeroed::())); - - (*drm_desc).nb_objects = 1; - (*drm_desc).objects[0].fd = fd; - (*drm_desc).objects[0].size = 0; - (*drm_desc).objects[0].format_modifier = 0; - - (*drm_desc).nb_layers = 1; - (*drm_desc).layers[0].format = DrmFourcc::Argb8888 as u32; - (*drm_desc).layers[0].nb_planes = 1; - (*drm_desc).layers[0].planes[0].object_index = 0; - (*drm_desc).layers[0].planes[0].offset = frame.offset as isize; - (*drm_desc).layers[0].planes[0].pitch = frame.stride as isize; - - // Attach descriptor to frame - (*drm_frame.as_mut_ptr()).data[0] = drm_desc as *mut u8; - (*drm_frame.as_mut_ptr()).buf[0] = av_buffer_create( - drm_desc as *mut u8, - std::mem::size_of::(), - Some(av_buffer_default_free), - null_mut(), - 0, - ); - - (*drm_frame.as_mut_ptr()).hw_frames_ctx = - av_buffer_ref((*encoder.as_ptr()).hw_frames_ctx); - } - - drm_frame.set_pts(Some(*frame.get_timestamp())); - self.filter_graph - .as_mut() - .unwrap() - .get("in") - .unwrap() - .source() - .add(&drm_frame) - .unwrap(); - - let mut filtered = ffmpeg::util::frame::Video::empty(); - if self - .filter_graph - .as_mut() - .unwrap() - .get("out") - .unwrap() - .sink() - .frame(&mut filtered) - .is_ok() - { - encoder.send_frame(&filtered)?; - } - } else { - // Convert BGRA to NV12 then transfer it to a hw frame and send it to the - // encoder - // - // TODO: deprecate this path? 
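
The descriptor built in the DMA path above advertises a single ARGB8888 plane, so the only geometry that has to line up with what PipeWire negotiated is the stride and offset of the DMA-BUF. A small sanity-check sketch of that layout math (the helper name is illustrative; ARGB8888 is 4 bytes per pixel):

```rust
// A single-plane ARGB8888 layout is self-consistent when each row fits
// in the stride and the last row ends inside the buffer. These fields
// correspond to the fd/stride/offset/size values logged above.
fn plausible_argb8888(width: u32, height: u32, stride: i32, offset: u32, size: u32) -> bool {
    let min_stride = width as i64 * 4; // 4 bytes per ARGB8888 pixel
    let end = offset as i64 + stride as i64 * height as i64;
    stride as i64 >= min_stride && end <= size as i64
}

fn main() {
    // A tightly packed 2560x1440 surface.
    assert!(plausible_argb8888(2560, 1440, 2560 * 4, 0, 2560 * 4 * 1440));
    // A stride smaller than one row of pixels cannot be valid.
    assert!(!plausible_argb8888(2560, 1440, 4096, 0, 4096 * 1440));
}
```
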
- SCALER.with(|scaler_cell| { - let mut scaler = scaler_cell.borrow_mut(); - if scaler.is_none() { - *scaler = Some( - Scaler::get( - ffmpeg::format::Pixel::BGRA, - encoder.width(), - encoder.height(), - ffmpeg::format::Pixel::NV12, - encoder.width(), - encoder.height(), - Flags::BILINEAR, - ) - .unwrap(), - ); - } - - let mut src_frame = ffmpeg::util::frame::video::Video::new( - ffmpeg_next::format::Pixel::BGRA, - encoder.width(), - encoder.height(), - ); - src_frame.data_mut(0).copy_from_slice(frame.get_bytes()); - - let mut dst_frame = ffmpeg::util::frame::Video::new( - ffmpeg::format::Pixel::NV12, - encoder.width(), - encoder.height(), - ); - - scaler - .as_mut() - .unwrap() - .run(&src_frame, &mut dst_frame) - .unwrap(); - - let mut vaapi_frame = ffmpeg::util::frame::video::Video::new( - encoder.format(), - encoder.width(), - encoder.height(), - ); - - unsafe { - let err = av_hwframe_get_buffer( - (*encoder.as_ptr()).hw_frames_ctx, - vaapi_frame.as_mut_ptr(), - 0, - ); - - if err < 0 { - error!("Error getting the hw frame buffer: {:?}", err); - } - - let err = av_hwframe_transfer_data( - vaapi_frame.as_mut_ptr(), - dst_frame.as_ptr(), - 0, - ); - - if err < 0 { - error!("Error transferring the frame data to hw frame: {:?}", err); - } - } - - vaapi_frame.set_pts(Some(*frame.get_timestamp())); - - encoder.send_frame(&vaapi_frame).unwrap(); - }); - } - - let mut packet = ffmpeg::codec::packet::Packet::empty(); - if encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - if let Some(ref mut sender) = self.encoded_frame_sender { - if sender - .try_push(( - packet.dts().unwrap_or(0), - VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ), - )) - .is_err() - { - log::error!("Could not send encoded packet to the ringbuf"); - } - } - }; - } - } - Ok(()) - } - - /// Drain the filter graph and encoder of any remaining frames it is processing - fn drain(&mut self) -> Result<(), ffmpeg::Error> { - if let Some(ref mut encoder) = self.encoder { - // Drain the filter graph - let mut filtered = ffmpeg::util::frame::Video::empty(); - while self - .filter_graph - .as_mut() - .unwrap() - .get("out") - .unwrap() - .sink() - .frame(&mut filtered) - .is_ok() - { - encoder.send_frame(&filtered)?; - } - - // Drain encoder - encoder.send_eof()?; - let mut packet = ffmpeg::codec::packet::Packet::empty(); - while encoder.receive_packet(&mut packet).is_ok() { - if let Some(data) = packet.data() { - if let Some(ref mut sender) = self.encoded_frame_sender { - if sender - .try_push(( - packet.dts().unwrap_or(0), - VideoFrameData::new( - data.to_vec(), - packet.is_key(), - packet.pts().unwrap_or(0), - ), - )) - .is_err() - { - log::error!("Could not send encoded packet to the ringbuf"); - } - } - }; - packet = ffmpeg::codec::packet::Packet::empty(); - } - } - Ok(()) - } - - fn reset(&mut self) -> anyhow::Result<()> { - self.drop_encoder(); - let new_encoder = - Self::create_encoder(self.width, self.height, &self.encoder_name, &self.quality)?; - - let new_filter_graph = Self::create_filter_graph(&new_encoder, self.width, self.height)?; - - self.encoder = Some(new_encoder); - self.filter_graph = Some(new_filter_graph); - Ok(()) - } - - fn get_encoder(&self) -> &Option { - &self.encoder - } - - fn drop_encoder(&mut self) { - self.encoder.take(); - self.filter_graph.take(); - } - - fn take_encoded_recv(&mut self) -> Option> { - self.encoded_frame_recv.take() - } -} - -impl VaapiEncoder { - fn create_encoder( - width: u32, - height: u32, - encoder: &str, - 
quality: &QualityPreset, - ) -> anyhow::Result { - let encoder_codec = - ffmpeg::codec::encoder::find_by_name(encoder).ok_or(ffmpeg::Error::EncoderNotFound)?; - - let mut encoder_ctx = ffmpeg::codec::context::Context::new_with_codec(encoder_codec) - .encoder() - .video()?; - - encoder_ctx.set_width(width); - encoder_ctx.set_height(height); - encoder_ctx.set_format(ffmpeg::format::Pixel::VAAPI); - // Configuration inspiration from - // https://git.dec05eba.com/gpu-screen-recorder/tree/src/capture/xcomposite_drm.c?id=8cbdb596ebf79587a432ed40583630b6cd39ed88 - let mut vaapi_device = Self::create_vaapi_device()?; - let mut frame_ctx = Self::create_vaapi_frame_ctx(vaapi_device)?; - - unsafe { - let hw_frame_context = &mut *((*frame_ctx).data as *mut AVHWFramesContext); - hw_frame_context.width = width as i32; - hw_frame_context.height = height as i32; - hw_frame_context.sw_format = AVPixelFormat::AV_PIX_FMT_NV12; - hw_frame_context.format = encoder_ctx.format().into(); - hw_frame_context.device_ref = av_buffer_ref(vaapi_device); - hw_frame_context.device_ctx = (*vaapi_device).data as *mut AVHWDeviceContext; - // Decides buffer size if we do not pop frame from the encoder we cannot - // enqueue more than these many -- maybe adjust but for now setting it to - // doble target fps - hw_frame_context.initial_pool_size = 120; - - let err = av_hwframe_ctx_init(frame_ctx); - if err < 0 { - return Err(anyhow!( - "Error trying to initialize hw frame context: {:?}", - err - )); - } - - (*encoder_ctx.as_mut_ptr()).hw_device_ctx = av_buffer_ref(vaapi_device); - (*encoder_ctx.as_mut_ptr()).hw_frames_ctx = av_buffer_ref(frame_ctx); - - av_buffer_unref(&mut vaapi_device); - av_buffer_unref(&mut frame_ctx); - } - - // These should be part of a config file - encoder_ctx.set_time_base(Rational::new(1, 1_000_000)); - - // Needed to insert I-Frames more frequently so we don't lose full seconds - // when popping frames from the front - encoder_ctx.set_gop(GOP_SIZE); - - let encoder_params = ffmpeg::codec::Parameters::new(); - - let opts = Self::get_encoder_params(quality); - - encoder_ctx.set_parameters(encoder_params)?; - let encoder = encoder_ctx.open_with(opts)?; - Ok(encoder) - } - - fn create_vaapi_frame_ctx(device: *mut AVBufferRef) -> anyhow::Result<*mut AVBufferRef> { - unsafe { - let frame = av_hwframe_ctx_alloc(device); - - if frame.is_null() { - return Err(anyhow!("Could not create vaapi frame context")); - } - - Ok(frame) - } - } - - fn create_vaapi_device() -> anyhow::Result<*mut AVBufferRef> { - unsafe { - let mut device: *mut AVBufferRef = null_mut(); - let device_path = CString::new("/dev/dri/renderD128").unwrap(); - let ret = av_hwdevice_ctx_create( - &mut device, - ffmpeg_next::ffi::AVHWDeviceType::AV_HWDEVICE_TYPE_VAAPI, - device_path.as_ptr(), - null_mut(), - 0, - ); - if ret < 0 { - return Err(anyhow!("Failed to create VAAPI device: Error code {}", ret)); - } - - Ok(device) - } - } - - fn get_encoder_params(quality: &QualityPreset) -> ffmpeg::Dictionary { - let mut opts = ffmpeg::Dictionary::new(); - opts.set("vsync", "vfr"); - opts.set("rc", "VBR"); - match quality { - QualityPreset::Low => { - opts.set("qp", "30"); - } - QualityPreset::Medium => { - opts.set("qp", "25"); - } - QualityPreset::High => { - opts.set("qp", "20"); - } - QualityPreset::Ultra => { - opts.set("qp", "15"); - } - } - opts - } - - fn create_filter_graph( - encoder: &ffmpeg::codec::encoder::Video, - width: u32, - height: u32, - ) -> anyhow::Result { - let mut graph = ffmpeg::filter::Graph::new(); - - let args = format!( - 
"video_size={}x{}:pix_fmt=bgra:time_base=1/1000000", - width, height - ); - - let mut input = graph.add(&ffmpeg::filter::find("buffer").unwrap(), "in", &args)?; - - let mut hwmap = graph.add( - &ffmpeg::filter::find("hwmap").unwrap(), - "hwmap", - "mode=read+write:derive_device=vaapi", - )?; - - let scale_args = format!("w={}:h={}:format=nv12:out_range=tv", width, height); - let mut scale = graph.add( - &ffmpeg::filter::find("scale_vaapi").unwrap(), - "scale", - &scale_args, - )?; - - let mut out = graph.add(&ffmpeg::filter::find("buffersink").unwrap(), "out", "")?; - unsafe { - let dev = (*encoder.as_ptr()).hw_device_ctx; - - (*hwmap.as_mut_ptr()).hw_device_ctx = av_buffer_ref(dev); - } - - input.link(0, &mut hwmap, 0); - hwmap.link(0, &mut scale, 0); - scale.link(0, &mut out, 0); - - graph.validate()?; - log::trace!("Graph\n{}", graph.dump()); - - Ok(graph) - } -} - -impl Drop for VaapiEncoder { - fn drop(&mut self) { - if let Err(e) = self.drain() { - log::error!("Error while draining vaapi encoder during drop: {:?}", e); - } - self.drop_encoder(); - } -} diff --git a/src/encoders/video_encoder.rs b/src/encoders/video_encoder.rs deleted file mode 100644 index c3eaaed..0000000 --- a/src/encoders/video_encoder.rs +++ /dev/null @@ -1,22 +0,0 @@ -use anyhow::Result; -use ffmpeg_next::{self as ffmpeg}; -use ringbuf::HeapCons; - -use crate::{application_config::QualityPreset, RawVideoFrame}; - -use super::buffer::VideoFrameData; - -pub const ONE_MICROS: usize = 1_000_000; -pub const GOP_SIZE: u32 = 30; - -pub trait VideoEncoder: Send { - fn new(width: u32, height: u32, quality: QualityPreset) -> Result - where - Self: Sized; - fn process(&mut self, frame: &RawVideoFrame) -> Result<(), ffmpeg::Error>; - fn drain(&mut self) -> Result<(), ffmpeg::Error>; - fn reset(&mut self) -> Result<()>; - fn drop_encoder(&mut self); - fn get_encoder(&self) -> &Option; - fn take_encoded_recv(&mut self) -> Option>; -} diff --git a/src/main.rs b/src/main.rs index 7a7a26f..8053d0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,63 +11,19 @@ mod application_config; mod dbus; mod encoders; mod modes; -mod pw_capture; mod waycap; -use std::os::fd::RawFd; - use anyhow::{Context, Error, Result}; use application_config::load_or_create_config; -use encoders::{ - audio_encoder::{AudioEncoderImpl, FfmpegAudioEncoder}, - buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer}, - video_encoder::ONE_MICROS, -}; +use encoders::buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer}; use ffmpeg_next::{self as ffmpeg}; use modes::shadow_cap::ShadowCapMode; use pipewire::{self as pw}; use waycap::WayCap; +use waycap_rs::Capture; const VIDEO_STREAM: usize = 0; const AUDIO_STREAM: usize = 1; -const TARGET_FPS: usize = 60; -const FRAME_INTERVAL: u64 = (ONE_MICROS / TARGET_FPS) as u64; - -#[derive(Debug)] -pub struct RawAudioFrame { - samples: Vec, - timestamp: i64, -} - -impl RawAudioFrame { - pub fn get_samples_mut(&mut self) -> &mut Vec { - &mut self.samples - } - - pub fn get_samples(&mut self) -> &Vec { - &self.samples - } -} - -#[derive(Debug)] -pub struct RawVideoFrame { - bytes: Vec, - timestamp: i64, - dmabuf_fd: Option, - stride: i32, - offset: u32, - size: u32, -} - -impl RawVideoFrame { - pub fn get_bytes(&self) -> &Vec { - &self.bytes - } - - pub fn get_timestamp(&self) -> &i64 { - &self.timestamp - } -} pub struct Terminate; @@ -89,27 +45,28 @@ async fn main() -> Result<(), Error> { fn save_buffer( filename: &str, video_buffer: &ShadowCaptureVideoBuffer, - video_encoder: &ffmpeg::codec::encoder::Video, 
audio_buffer: &ShadowCaptureAudioBuffer, - audio_encoder: &FfmpegAudioEncoder, + capture: &Capture, ) -> Result<()> { let mut output = ffmpeg::format::output(&filename)?; - let video_codec = video_encoder - .codec() - .context("Could not find expected video codec")?; - - let mut video_stream = output.add_stream(video_codec)?; - video_stream.set_time_base(video_encoder.time_base()); - video_stream.set_parameters(video_encoder); - - let audio_codec = audio_encoder - .codec() - .context("Could not find expected audio codec")?; - - let mut audio_stream = output.add_stream(audio_codec)?; - audio_stream.set_time_base(audio_encoder.time_base()); - audio_stream.set_parameters(audio_encoder.as_ref()); + capture.with_video_encoder(|enc| { + if let Some(encoder) = enc { + let video_codec = encoder.codec().unwrap(); + let mut video_stream = output.add_stream(video_codec).unwrap(); + video_stream.set_time_base(encoder.time_base()); + video_stream.set_parameters(encoder); + } + }); + + capture.with_audio_encoder(|enc| { + if let Some(encoder) = enc { + let audio_codec = encoder.codec().unwrap(); + let mut audio_stream = output.add_stream(audio_codec).unwrap(); + audio_stream.set_time_base(encoder.time_base()); + audio_stream.set_parameters(encoder); + } + }); output.write_header()?; @@ -127,24 +84,24 @@ fn save_buffer( for (dts, frame_data) in video_buffer.get_frames().range(..=last_keyframe) { // If video starts before audio try and catch up as much as possible // (At worst a 20ms gap) - if &audio_capture_timestamps[0] > frame_data.get_pts() && !*frame_data.is_key() { + if audio_capture_timestamps[0] > frame_data.pts && !frame_data.is_keyframe { log::debug!( "Skipping Video Frame Captured at: {:?}, DTS: {:?}", - frame_data.get_pts(), + frame_data.pts, dts, ); continue; } if !first_offset { - first_pts_offset = *frame_data.get_pts(); + first_pts_offset = frame_data.pts; first_offset = true; } - let pts_offset = frame_data.get_pts() - first_pts_offset; + let pts_offset = frame_data.pts - first_pts_offset; let dts_offset = dts - first_pts_offset; - let mut packet = ffmpeg::codec::packet::Packet::copy(frame_data.get_raw_bytes()); + let mut packet = ffmpeg::codec::packet::Packet::copy(&frame_data.data); packet.set_pts(Some(pts_offset)); packet.set_dts(Some(dts_offset)); @@ -153,7 +110,7 @@ fn save_buffer( packet .write_interleaved(&mut output) .expect("Could not write video interleaved"); - newest_video_pts = *frame_data.get_pts(); + newest_video_pts = frame_data.pts; } log::debug!("VIDEO SAVE END"); diff --git a/src/modes/shadow_cap.rs b/src/modes/shadow_cap.rs index 9702f24..daa0cb1 100644 --- a/src/modes/shadow_cap.rs +++ b/src/modes/shadow_cap.rs @@ -1,30 +1,21 @@ use std::{ sync::{atomic::AtomicBool, Arc}, - time::{Duration, Instant, SystemTime}, + time::Duration, }; -use anyhow::Context; use ringbuf::{consumer::Consumer, HeapCons}; use tokio::sync::Mutex; +use waycap_rs::types::{audio_frame::EncodedAudioFrame, video_frame::EncodedVideoFrame}; use crate::{ app_context::AppContext, - application_config, - encoders::{ - audio_encoder::{AudioEncoder, EncodedAudioFrameData, FfmpegAudioEncoder}, - buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer, VideoFrameData}, - nvenc_encoder::NvencEncoder, - vaapi_encoder::VaapiEncoder, - video_encoder::{VideoEncoder, ONE_MICROS}, - }, - save_buffer, RawAudioFrame, RawVideoFrame, FRAME_INTERVAL, + encoders::buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer}, + save_buffer, }; use super::AppMode; pub struct ShadowCapMode { - audio_encoder: Option>>>, 
- video_encoder: Option>>, video_buffer: Arc>, audio_buffer: Arc>, } @@ -32,66 +23,24 @@ pub struct ShadowCapMode { impl AppMode for ShadowCapMode { async fn init(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> { log::debug!("Initializing context for Shadow Capture Mode"); - // Video - let video_encoder: Arc> = - match ctx.config.encoder { - application_config::EncoderToUse::H264Nvenc => Arc::new(Mutex::new( - NvencEncoder::new(ctx.width, ctx.height, ctx.config.quality)?, - )), - application_config::EncoderToUse::H264Vaapi => Arc::new(Mutex::new( - VaapiEncoder::new(ctx.width, ctx.height, ctx.config.quality)?, - )), - }; - let video_owned_recv = ctx - .video_ring_receiver - .take() - .context("Could not take ownership of the video ring buffer")?; + let video_owned_recv = ctx.capture.take_video_receiver(); - let video_worker = Self::create_video_worker( - Arc::clone(&ctx.stop), - video_owned_recv, - Arc::clone(&video_encoder), - ); - ctx.join_handles.push(video_worker); - - let recv = { video_encoder.lock().await.take_encoded_recv() } - .context("Could not take encoded frame recv")?; let shadow_worker = Self::create_shadow_video_worker( - recv, + video_owned_recv, Arc::clone(&self.video_buffer), Arc::clone(&ctx.stop), ); ctx.join_handles.push(shadow_worker); - self.video_encoder = Some(video_encoder); - - // Audio - let audio_encoder = Arc::new(Mutex::new(AudioEncoder::new_with_encoder( - FfmpegAudioEncoder::new_opus, - )?)); - - let audio_owned_recv = ctx - .audio_ring_receiver - .take() - .context("Could not take ownership of the audio ring buffer")?; - - let audio_worker = Self::create_audio_worker( - Arc::clone(&ctx.stop), - audio_owned_recv, - Arc::clone(&audio_encoder), - ); - ctx.join_handles.push(audio_worker); - let aud_recv = { audio_encoder.lock().await.take_encoded_recv() } - .context("Could not take ownership of encoded audio frame recv")?; + let audio_owned_recv = ctx.capture.take_audio_receiver()?; let audio_shadow_worker = Self::create_shadow_audio_worker( - aud_recv, + audio_owned_recv, Arc::clone(&self.audio_buffer), Arc::clone(&ctx.stop), ); ctx.join_handles.push(audio_shadow_worker); - self.audio_encoder = Some(audio_encoder); log::debug!("Successfully initialized Shadow Capture Mode"); Ok(()) @@ -99,47 +48,19 @@ impl AppMode for ShadowCapMode { async fn on_save(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> { ctx.saving.store(true, std::sync::atomic::Ordering::Release); - if let Some(video_encoder) = &self.video_encoder { - if let Some(audio_encoder) = &self.audio_encoder { - let (mut video_lock, mut audio_lock, mut video_buffer, mut audio_buffer) = tokio::join!( - video_encoder.lock(), - audio_encoder.lock(), - self.video_buffer.lock(), - self.audio_buffer.lock(), - ); - - // Drain both encoders of any remaining frames being processed - video_lock.drain()?; - audio_lock.drain()?; - - let filename = format!("clip_{}.mp4", chrono::Local::now().timestamp()); - let video_encoder = video_lock - .get_encoder() - .as_ref() - .context("Could not get video encoder")?; - - let audio_encoder = audio_lock - .get_encoder() - .as_ref() - .context("Could not get audio encoder")?; - - save_buffer( - &filename, - &video_buffer, - video_encoder, - &audio_buffer, - audio_encoder, - )?; + ctx.capture.finish()?; + let (mut video_buffer, mut audio_buffer) = + tokio::join!(self.video_buffer.lock(), self.audio_buffer.lock()); + let filename = format!("clip_{}.mp4", chrono::Local::now().timestamp()); - video_lock.reset()?; - video_buffer.reset(); - audio_buffer.reset(); - 
audio_lock.reset_encoder(FfmpegAudioEncoder::new_opus)?; - } - } + save_buffer(&filename, &video_buffer, &audio_buffer, &ctx.capture)?; + video_buffer.reset(); + audio_buffer.reset(); + ctx.capture.reset()?; ctx.saving .store(false, std::sync::atomic::Ordering::Release); + ctx.capture.start()?; log::debug!("Done saving!"); Ok(()) } @@ -149,10 +70,6 @@ impl AppMode for ShadowCapMode { // Stop processing new frames and exit worker threads ctx.saving.store(true, std::sync::atomic::Ordering::Release); ctx.stop.store(true, std::sync::atomic::Ordering::Release); - - // Drop encoders -- drop impl should clean up any remaining frames - self.audio_encoder.take(); - self.video_encoder.take(); Ok(()) } } @@ -164,10 +81,8 @@ impl ShadowCapMode { "Max seconds is above 24 hours. This is too much time for shadow capture" ); - let actual_max = max_seconds * ONE_MICROS as u32; + let actual_max = max_seconds * 1_000_000_u32; Ok(Self { - audio_encoder: None, - video_encoder: None, video_buffer: Arc::new(Mutex::new(ShadowCaptureVideoBuffer::new( actual_max as usize, ))), @@ -177,103 +92,8 @@ impl ShadowCapMode { }) } - // These look to be generic between modes and the only real thing that changes is what we do - // once we get an encoded frame back for modes like recording or screen sharing over network so - // these can probably go in a more common place one i start implementing those - - /// Creates a worker thread that polls a `HeapCons>` and sends anything on it to - /// the its encoder for processing - /// # Arguments - /// * `stop_audio` - Atomic bool for telling the thread to exit - /// * `mut audio_receiver` - The ring buf to poll - /// * `audio_encoder` - The audio encoder which will process the frames - fn create_audio_worker( - stop_audio: Arc, - mut audio_receiver: HeapCons, - audio_encoder: Arc>>, - ) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || loop { - if stop_audio.load(std::sync::atomic::Ordering::Acquire) { - break; - } - - while let Some(mut raw_frame) = audio_receiver.try_pop() { - let now = SystemTime::now(); - if let Err(e) = audio_encoder.blocking_lock().process(&mut raw_frame) { - log::error!( - "Error processing audio frame at {:?}: {:?}", - raw_frame.timestamp, - e - ); - } - log::trace!( - "Took {:?} to process this audio frame at {:?}", - now.elapsed(), - raw_frame.timestamp - ); - } - std::thread::sleep(Duration::from_nanos(100)); - }) - } - - /// Creates a worker thread that polls a `HeapCons>` and sends anything on it to - /// the its encoder for processing - /// # Arguments - /// * `stop_video` - Atomic bool for telling the thread to exit - /// * `mut video_receiver` - The ring buf to poll - /// * `video_encoder` - The video encoder which will process the frames - fn create_video_worker( - stop_video: Arc, - mut video_receiver: HeapCons, - video_encoder: Arc>, - ) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || { - let mut last_timestamp: u64 = 0; - let mut total_time: u128 = 0; - let mut frame_count: u64 = 0; - loop { - if stop_video.load(std::sync::atomic::Ordering::Acquire) { - break; - } - - while let Some(raw_frame) = video_receiver.try_pop() { - let now = Instant::now(); - let current_time = *raw_frame.get_timestamp() as u64; - - // Throttle FPS - if current_time < last_timestamp + FRAME_INTERVAL { - continue; - } - - last_timestamp = current_time; - if let Err(e) = video_encoder.blocking_lock().process(&raw_frame) { - log::error!( - "Error processing video frame at {:?}: {:?}", - raw_frame.timestamp, - e - ); - } - - let elapsed = 
now.elapsed().as_nanos(); - total_time += elapsed; - frame_count += 1; - - let average_time = total_time / frame_count as u128; - - log::trace!( - "Took {:?} to process this video frame. Average time: {:.3}ms, Frame Count: {:?}", - elapsed, - average_time / 1_000_000, - frame_count, - ); - } - std::thread::sleep(Duration::from_nanos(100)); - } - }) - } - fn create_shadow_video_worker( - mut recv: HeapCons<(i64, VideoFrameData)>, + mut recv: HeapCons, buffer: Arc>, stop: Arc, ) -> std::thread::JoinHandle<()> { @@ -282,8 +102,10 @@ impl ShadowCapMode { break; } - while let Some((dts, encoded_frame)) = recv.try_pop() { - buffer.blocking_lock().insert(dts, encoded_frame); + while let Some(encoded_frame) = recv.try_pop() { + buffer + .blocking_lock() + .insert(encoded_frame.dts, encoded_frame); } std::thread::sleep(Duration::from_nanos(100)); @@ -291,7 +113,7 @@ impl ShadowCapMode { } fn create_shadow_audio_worker( - mut recv: HeapCons, + mut recv: HeapCons, audio_buffer: Arc>, stop: Arc, ) -> std::thread::JoinHandle<()> { diff --git a/src/pw_capture/audio_stream.rs b/src/pw_capture/audio_stream.rs deleted file mode 100644 index b27e572..0000000 --- a/src/pw_capture/audio_stream.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::{ - process::Command, - sync::{atomic::AtomicBool, Arc}, - time::SystemTime, -}; - -use anyhow::Result; -use pipewire::{ - self as pw, - context::Context, - main_loop::MainLoop, - properties::properties, - spa::{ - self, - param::format::{MediaSubtype, MediaType}, - pod::Pod, - utils::Direction, - }, - stream::{StreamFlags, StreamState}, -}; -use ringbuf::{traits::Producer, HeapProd}; - -use crate::{RawAudioFrame, Terminate}; - -#[derive(Clone, Copy, Default)] -struct UserData { - audio_format: spa::param::audio::AudioInfoRaw, -} - -pub struct AudioCapture { - video_ready: Arc, - audio_ready: Arc, -} - -impl AudioCapture { - pub fn new(video_ready: Arc, audio_ready: Arc) -> Self { - Self { - video_ready, - audio_ready, - } - } - - pub fn run( - &self, - mut ringbuf_producer: HeapProd, - start_time: SystemTime, - termination_recv: pw::channel::Receiver, - saving: Arc, - ) -> Result<(), pw::Error> { - let pw_loop = MainLoop::new(None)?; - let terminate_loop = pw_loop.clone(); - - let _recv = termination_recv.attach(pw_loop.loop_(), move |_| { - log::debug!("Terminating audio capture loop"); - terminate_loop.quit(); - }); - - let pw_context = Context::new(&pw_loop)?; - let audio_core = pw_context.connect(None)?; - - let _audio_core_listener = audio_core - .add_listener_local() - .info(|i| log::info!("AUDIO CORE:\n{0:#?}", i)) - .error(|e, f, g, h| log::error!("{0},{1},{2},{3}", e, f, g, h)) - .done(|d, _| log::info!("DONE: {0}", d)) - .register(); - - let data = UserData::default(); - - // Audio Stream - let audio_stream = pw::stream::Stream::new( - &audio_core, - "waycap-audio", - properties! 
{ - *pw::keys::MEDIA_TYPE => "Audio", - *pw::keys::MEDIA_CATEGORY => "Capture", - *pw::keys::MEDIA_ROLE => "Music", - *pw::keys::NODE_LATENCY => "1024/48000", - }, - )?; - - let video_ready_clone = Arc::clone(&self.video_ready); - let audio_ready_clone = Arc::clone(&self.audio_ready); - let _audio_stream_shared_data_listener = audio_stream - .add_local_listener_with_user_data(data) - .state_changed(move |_, _, old, new| { - log::debug!("Audio Stream State Changed: {0:?} -> {1:?}", old, new); - audio_ready_clone.store( - new == StreamState::Streaming, - std::sync::atomic::Ordering::Release, - ); - }) - .param_changed(|_, udata, id, param| { - let Some(param) = param else { - return; - }; - if id != pw::spa::param::ParamType::Format.as_raw() { - return; - } - - let (media_type, media_subtype) = - match pw::spa::param::format_utils::parse_format(param) { - Ok(v) => v, - Err(_) => return, - }; - - // only accept raw audio - if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { - return; - } - - udata - .audio_format - .parse(param) - .expect("Failed to parse audio params"); - - log::debug!( - "Capturing Rate:{} channels:{}, format: {}", - udata.audio_format.rate(), - udata.audio_format.channels(), - udata.audio_format.format().as_raw() - ); - }) - .process(move |stream, _| match stream.dequeue_buffer() { - None => log::debug!("Out of audio buffers"), - Some(mut buffer) => { - // Wait until video is streaming before we try to process - if !video_ready_clone.load(std::sync::atomic::Ordering::Acquire) - || saving.load(std::sync::atomic::Ordering::Acquire) - { - return; - } - - let datas = buffer.datas_mut(); - if datas.is_empty() { - return; - } - - let time_us = if let Ok(elapsed) = start_time.elapsed() { - elapsed.as_micros() as i64 - } else { - 0 - }; - - let data = &mut datas[0]; - let n_samples = data.chunk().size() / (std::mem::size_of::()) as u32; - - if let Some(samples) = data.data() { - let samples_f32: &[f32] = bytemuck::cast_slice(samples); - let audio_samples = &samples_f32[..n_samples as usize]; - if let Err(frame) = ringbuf_producer.try_push(RawAudioFrame { - samples: audio_samples.to_vec(), - timestamp: time_us, - }) { - log::error!( - "Could not add audio frame: {:?}. Is the buffer full?", - frame - ); - } - } - } - }) - .register()?; - - let audio_spa_obj = pw::spa::pod::object! 
{ - pw::spa::utils::SpaTypes::ObjectParamFormat, - pw::spa::param::ParamType::EnumFormat, - pw::spa::pod::property!( - pw::spa::param::format::FormatProperties::MediaType, - Id, - pw::spa::param::format::MediaType::Audio - ), - pw::spa::pod::property!( - pw::spa::param::format::FormatProperties::MediaSubtype, - Id, - pw::spa::param::format::MediaSubtype::Raw - ), - pw::spa::pod::property!( - pw::spa::param::format::FormatProperties::AudioFormat, - Id, - pw::spa::param::audio::AudioFormat::F32LE - ) - }; - - let audio_spa_values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( - std::io::Cursor::new(Vec::new()), - &pw::spa::pod::Value::Object(audio_spa_obj), - ) - .unwrap() - .0 - .into_inner(); - - let mut audio_params = [Pod::from_bytes(&audio_spa_values).unwrap()]; - - let sink_id_to_use = get_default_sink_node_id(); - - log::debug!("Default sink id: {:?}", sink_id_to_use); - audio_stream.connect( - Direction::Input, - sink_id_to_use, - StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, - &mut audio_params, - )?; - - log::debug!("Audio Stream: {:?}", audio_stream); - - pw_loop.run(); - Ok(()) - } -} - -// Theres gotta be a less goofy way to do this -fn get_default_sink_node_id() -> Option { - let output = Command::new("sh") - .arg("-c") - .arg(r#"pactl list sinks | awk -v sink="$(pactl info | grep 'Default Sink' | cut -d' ' -f3)" '$0 ~ "Name: " sink { found=1 } found && /object.id/ { print $NF; exit }'"#) - .output() - .expect("Failed to execute command"); - - let stdout = String::from_utf8_lossy(&output.stdout); - - let cleaned = stdout.replace('"', ""); - - cleaned.trim().parse::().ok() -} diff --git a/src/pw_capture/mod.rs b/src/pw_capture/mod.rs deleted file mode 100644 index eb2fcb8..0000000 --- a/src/pw_capture/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod audio_stream; -pub mod video_stream; diff --git a/src/pw_capture/video_stream.rs b/src/pw_capture/video_stream.rs deleted file mode 100644 index 664a3f4..0000000 --- a/src/pw_capture/video_stream.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::{ - os::fd::{FromRawFd, OwnedFd, RawFd}, - sync::{atomic::AtomicBool, Arc}, - time::SystemTime, -}; - -use pipewire::{ - self as pw, - context::Context, - main_loop::MainLoop, - spa::{ - buffer::{Data, DataType}, - utils::Direction, - }, - stream::{Stream, StreamFlags, StreamState}, -}; -use pw::{properties::properties, spa}; - -use ringbuf::{traits::Producer, HeapProd}; -use spa::pod::Pod; -use tokio::sync::mpsc; - -use crate::{RawVideoFrame, Terminate}; - -pub struct VideoCapture { - video_ready: Arc, - audio_ready: Arc, -} - -#[derive(Clone, Copy, Default)] -struct UserData { - video_format: spa::param::video::VideoInfoRaw, -} - -impl VideoCapture { - pub fn new(video_ready: Arc, audio_ready: Arc) -> Self { - Self { - video_ready, - audio_ready, - } - } - - #[allow(clippy::too_many_arguments)] - pub fn run( - &self, - pipewire_fd: RawFd, - stream_node: u32, - mut ringbuf_producer: HeapProd, - termination_recv: pw::channel::Receiver, - saving: Arc, - start_time: SystemTime, - resolution_negotiation_channel: mpsc::Sender<(u32, u32)>, - ) -> Result<(), pipewire::Error> { - let pw_loop = MainLoop::new(None)?; - let terminate_loop = pw_loop.clone(); - - let _recv = termination_recv.attach(pw_loop.loop_(), move |_| { - log::debug!("Terminating video capture loop"); - terminate_loop.quit(); - }); - - let pw_context = Context::new(&pw_loop)?; - let core = pw_context.connect_fd(unsafe { OwnedFd::from_raw_fd(pipewire_fd) }, None)?; - - let data = 
UserData::default(); - - let _listener = core - .add_listener_local() - .info(|i| log::info!("VIDEO CORE:\n{0:#?}", i)) - .error(|e, f, g, h| log::error!("{0},{1},{2},{3}", e, f, g, h)) - .done(|d, _| log::info!("DONE: {0}", d)) - .register(); - - // Set up video stream - let video_stream = Stream::new( - &core, - "waycap-video", - properties! { - *pw::keys::MEDIA_TYPE => "Video", - *pw::keys::MEDIA_CATEGORY => "Capture", - *pw::keys::MEDIA_ROLE => "Screen", - }, - )?; - - let ready_clone = Arc::clone(&self.video_ready); - let audio_ready_clone = Arc::clone(&self.audio_ready); - let _video_stream = video_stream - .add_local_listener_with_user_data(data) - .state_changed(move |_, _, old, new| { - log::debug!("Video Stream State Changed: {0:?} -> {1:?}", old, new); - ready_clone.store( - new == StreamState::Streaming, - std::sync::atomic::Ordering::Release, - ); - }) - .param_changed(move |_, user_data, id, param| { - let Some(param) = param else { - return; - }; - if id != pw::spa::param::ParamType::Format.as_raw() { - return; - } - - let (media_type, media_subtype) = - match pw::spa::param::format_utils::parse_format(param) { - Ok(v) => v, - Err(_) => return, - }; - - if media_type != pw::spa::param::format::MediaType::Video - || media_subtype != pw::spa::param::format::MediaSubtype::Raw - { - return; - } - - user_data - .video_format - .parse(param) - .expect("Failed to parse param"); - - log::debug!( - " format: {} ({:?})", - user_data.video_format.format().as_raw(), - user_data.video_format.format() - ); - - resolution_negotiation_channel - .blocking_send(( - user_data.video_format.size().width, - user_data.video_format.size().height, - )) - .unwrap(); - - log::debug!( - " size: {}x{}", - user_data.video_format.size().width, - user_data.video_format.size().height - ); - log::debug!( - " framerate: {}/{}", - user_data.video_format.framerate().num, - user_data.video_format.framerate().denom - ); - }) - .process(move |stream, _| { - match stream.dequeue_buffer() { - None => log::debug!("out of buffers"), - Some(mut buffer) => { - // Wait until audio is streaming before we try to process - if !audio_ready_clone.load(std::sync::atomic::Ordering::Acquire) - || saving.load(std::sync::atomic::Ordering::Acquire) - { - return; - } - - let datas = buffer.datas_mut(); - if datas.is_empty() { - return; - } - - let time_us = if let Ok(elapsed) = start_time.elapsed() { - elapsed.as_micros() as i64 - } else { - 0 - }; - - // send frame data to encoder - let data = &mut datas[0]; - - let fd = Self::get_dmabuf_fd(data); - - if fd.is_some() - && ringbuf_producer - .try_push(RawVideoFrame { - // No need to copy the frame data - bytes: Vec::new(), - timestamp: time_us, - dmabuf_fd: fd, - stride: data.chunk().stride(), - offset: data.chunk().offset(), - size: data.chunk().size(), - }) - .is_err() - { - log::error!( - "Error sending video frame at: {:?}. 
Ring buf full?",
-                                time_us
-                            );
-                        }
-                    }
-                }
-            })
-            .register()?;
-
-        let video_spa_obj = pw::spa::pod::object!(
-            pw::spa::utils::SpaTypes::ObjectParamFormat,
-            pw::spa::param::ParamType::EnumFormat,
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::MediaType,
-                Id,
-                pw::spa::param::format::MediaType::Video
-            ),
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::MediaSubtype,
-                Id,
-                pw::spa::param::format::MediaSubtype::Raw
-            ),
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::VideoFormat,
-                Choice,
-                Enum,
-                Id,
-                pw::spa::param::video::VideoFormat::NV12,
-                pw::spa::param::video::VideoFormat::I420,
-                pw::spa::param::video::VideoFormat::BGRA,
-            ),
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::VideoModifier,
-                Long,
-                0
-            ),
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::VideoSize,
-                Choice,
-                Range,
-                Rectangle,
-                pw::spa::utils::Rectangle {
-                    width: 2560,
-                    height: 1440
-                }, // Default
-                pw::spa::utils::Rectangle {
-                    width: 1,
-                    height: 1
-                }, // Min
-                pw::spa::utils::Rectangle {
-                    width: 4096,
-                    height: 4096
-                } // Max
-            ),
-            pw::spa::pod::property!(
-                pw::spa::param::format::FormatProperties::VideoFramerate,
-                Choice,
-                Range,
-                Fraction,
-                pw::spa::utils::Fraction { num: 240, denom: 1 }, // Default
-                pw::spa::utils::Fraction { num: 0, denom: 1 },   // Min
-                pw::spa::utils::Fraction { num: 244, denom: 1 }  // Max
-            ),
-        );
-
-        let video_spa_values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
-            std::io::Cursor::new(Vec::new()),
-            &pw::spa::pod::Value::Object(video_spa_obj),
-        )
-        .unwrap()
-        .0
-        .into_inner();
-
-        let mut video_params = [Pod::from_bytes(&video_spa_values).unwrap()];
-
-        video_stream.connect(
-            Direction::Input,
-            Some(stream_node),
-            StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
-            &mut video_params,
-        )?;
-
-        log::debug!("Video Stream: {0:?}", video_stream);
-
-        pw_loop.run();
-        Ok(())
-    }
-
-    fn get_dmabuf_fd(data: &Data) -> Option<i32> {
-        let raw_data = data.as_raw();
-
-        if data.type_() == DataType::DmaBuf {
-            let fd = raw_data.fd;
-
-            if fd > 0 {
-                return Some(fd as i32);
-            }
-        }
-
-        None
-    }
-}
diff --git a/src/waycap.rs b/src/waycap.rs
index 245e532..ca18c4f 100644
--- a/src/waycap.rs
+++ b/src/waycap.rs
@@ -3,18 +3,11 @@ use crate::{
     application_config::{update_config, AppConfig},
     dbus,
     modes::AppMode,
-    pw_capture::{audio_stream::AudioCapture, video_stream::VideoCapture},
-    RawAudioFrame, RawVideoFrame, Terminate,
 };
 use anyhow::Result;
-use pipewire::{self as pw};
-use portal_screencast::{CursorMode, ScreenCast, SourceType};
-use ringbuf::{traits::Split, HeapRb};
-use std::{
-    sync::{atomic::AtomicBool, Arc},
-    time::{Duration, Instant, SystemTime},
-};
+use std::sync::{atomic::AtomicBool, Arc};
 use tokio::sync::mpsc;
+use waycap_rs::pipeline::builder::CaptureBuilder;
 use zbus::{connection, Connection};
 
 pub struct WayCap<M: AppMode> {
@@ -22,18 +15,15 @@
     dbus_conn: Option<Connection>,
     dbus_save_rx: mpsc::Receiver<()>,
     dbus_config_rx: mpsc::Receiver<AppConfig>,
-    pw_video_terminate_tx: pw::channel::Sender<Terminate>,
-    pw_audio_terminate_tx: pw::channel::Sender<Terminate>,
     mode: M,
 }
 
 impl<M: AppMode> WayCap<M> {
     pub async fn new(mut mode: M, config: AppConfig) -> Result<Self> {
         simple_logging::log_to_file("logs.txt", log::LevelFilter::Trace)?;
-        let current_time = SystemTime::now();
         let saving = Arc::new(AtomicBool::new(false));
         let stop = Arc::new(AtomicBool::new(false));
-        let mut join_handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
+        let join_handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
         let (dbus_save_tx, dbus_save_rx) = mpsc::channel(1);
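         // The save channel has depth 1: at most one pending save request
         // from D-Bus is buffered at a time.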
         let (dbus_config_tx, dbus_config_rx): (mpsc::Sender<AppConfig>, mpsc::Receiver<AppConfig>) =
@@ -47,90 +37,27 @@
             .build()
             .await?;
 
-        let audio_ready = Arc::new(AtomicBool::new(false));
-        let video_ready = Arc::new(AtomicBool::new(false));
-
-        let (mut width, mut height) = (0, 0);
-
-        let video_ring_buffer = HeapRb::<RawVideoFrame>::new(250);
-        let (video_ring_sender, video_ring_receiver) = video_ring_buffer.split();
-
-        let (pw_video_sender, pw_video_receiver) = pw::channel::channel();
-        let (resolution_sender, mut resolution_receiver) = mpsc::channel::<(u32, u32)>(2);
-        let video_ready_pw = Arc::clone(&video_ready);
-        let audio_ready_pw = Arc::clone(&audio_ready);
-        let saving_video = Arc::clone(&saving);
-
-        let mut screen_cast = ScreenCast::new()?;
-        screen_cast.set_source_types(SourceType::all());
-        screen_cast.set_cursor_mode(CursorMode::EMBEDDED);
-        let active_cast = screen_cast.start(None)?;
-
-        let fd = active_cast.pipewire_fd();
-        let stream = active_cast.streams().next().unwrap();
-        let stream_node = stream.pipewire_node();
-
-        let pw_video_capture = std::thread::spawn(move || {
-            let video_cap = VideoCapture::new(video_ready_pw, audio_ready_pw);
-            video_cap
-                .run(
-                    fd,
-                    stream_node,
-                    video_ring_sender,
-                    pw_video_receiver,
-                    saving_video,
-                    current_time,
-                    resolution_sender,
-                )
-                .unwrap();
-
-            let _ = active_cast.close();
-        });
-
-        // Window mode returns (0, 0) for dimensions so we have to get it from pipewire
-        if (width, height) == (0, 0) {
-            // Wait to get back a negotiated resolution from pipewire
-            let timeout = Duration::from_secs(5);
-            let start = Instant::now();
-            loop {
-                if let Ok((recv_width, recv_height)) = resolution_receiver.try_recv() {
-                    (width, height) = (recv_width, recv_height);
-                    break;
+        let mut capture = CaptureBuilder::new()
+            .with_audio()
+            .with_quality_preset(waycap_rs::types::config::QualityPreset::Medium)
+            .with_cursor_shown()
+            .with_video_encoder(match config.encoder {
+                crate::application_config::EncoderToUse::H264Nvenc => {
+                    waycap_rs::types::config::VideoEncoder::H264Nvenc
                 }
-
-                if start.elapsed() > timeout {
-                    log::error!("Timeout waiting for PipeWire negotiated resolution.");
-                    std::process::exit(1);
+                crate::application_config::EncoderToUse::H264Vaapi => {
+                    waycap_rs::types::config::VideoEncoder::H264Vaapi
                 }
+            })
+            .with_audio_encoder(waycap_rs::types::config::AudioEncoder::Opus)
+            .build()?;
-
-                tokio::time::sleep(Duration::from_millis(10)).await;
-            }
-        }
-        join_handles.push(pw_video_capture);
-
-        let audio_ring_buffer = HeapRb::<RawAudioFrame>::new(10);
-        let (audio_ring_sender, audio_ring_receiver) = audio_ring_buffer.split();
-        let (pw_audio_sender, pw_audio_recv) = pw::channel::channel();
-        let saving_audio = Arc::clone(&saving);
-        let pw_audio_worker = std::thread::spawn(move || {
-            log::debug!("Starting audio stream");
-            let audio_cap = AudioCapture::new(video_ready, audio_ready);
-            audio_cap
-                .run(audio_ring_sender, current_time, pw_audio_recv, saving_audio)
-                .unwrap();
-        });
-
-        join_handles.push(pw_audio_worker);
-
+        capture.start()?;
         let mut ctx = AppContext {
             saving,
             stop,
             join_handles,
-            width,
-            height,
-            video_ring_receiver: Some(video_ring_receiver),
-            audio_ring_receiver: Some(audio_ring_receiver),
-            config,
+            capture,
         };
 
         mode.init(&mut ctx).await?;
@@ -139,8 +66,6 @@
             context: ctx,
             dbus_save_rx,
             dbus_config_rx,
-            pw_video_terminate_tx: pw_video_sender,
-            pw_audio_terminate_tx: pw_audio_sender,
             mode,
             dbus_conn: Some(connection),
         })
@@ -164,14 +89,6 @@
             }
         }
 
-        // Shutdown capture threads
-        if 
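-        // Sending `Terminate` wakes the receiver attached to each capture
-        // thread's PipeWire main loop; its callback calls `quit()` so the
-        // thread can unwind.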
self.pw_video_terminate_tx.send(Terminate).is_err() { - log::error!("Error sending terminate signal to pipewire video capture."); - } - if self.pw_audio_terminate_tx.send(Terminate).is_err() { - log::error!("Error sending terminate signal to pipewire audio capture."); - } - if let Some(conn) = self.dbus_conn.take() { if let Err(e) = conn.close().await { log::error!("Error closing dbus connection: {:?}", e);
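                 // A failed close is only logged; shutdown continues either
                 // way, and `take()` above ensures close is attempted once.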