diff --git a/Cargo.lock b/Cargo.lock index 6a13ee7..53c1d9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2956,9 +2956,9 @@ dependencies = [ [[package]] name = "waycap-rs" -version = "1.0.2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2683ad730f668866f3c672b4652e1d6ee300e9f26cf6da42912cbf0d6907b60e" +checksum = "db71789090139f6e724524954eb7cd37ab7aacdd020858b3a58f510f3ea0559d" dependencies = [ "bytemuck", "crossbeam", diff --git a/Cargo.toml b/Cargo.toml index c2d9909..84fad4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ simple-logging = "2.0.2" tokio = { version = "1.43.0", features = ["full", "rt-multi-thread"] } toml = "0.8.20" zbus = { version = "5.3.1", features = ["tokio"] } -waycap-rs = "1.0.2" +waycap-rs = "2.0.0" crossbeam = "0.8.4" [profile.dev] diff --git a/src/app_context.rs b/src/app_context.rs index 6c053c2..b5d1568 100644 --- a/src/app_context.rs +++ b/src/app_context.rs @@ -1,9 +1,12 @@ use std::sync::{atomic::AtomicBool, Arc}; use waycap_rs::Capture; +use crate::application_config::AppConfig; + pub struct AppContext { pub saving: Arc, pub stop: Arc, pub join_handles: Vec>, pub capture: Capture, + pub config: AppConfig, } diff --git a/src/application_config.rs b/src/application_config.rs index bca6d87..3510ac5 100644 --- a/src/application_config.rs +++ b/src/application_config.rs @@ -90,6 +90,11 @@ impl TryFrom for AppConfig { } } +#[derive(Type, Serialize, Deserialize, PartialEq)] +pub enum AppModeDbus { + Shadow, +} + pub fn load_or_create_config() -> AppConfig { let mut settings = Config::builder(); diff --git a/src/dbus.rs b/src/dbus.rs index ae91bde..8a0ae40 100644 --- a/src/dbus.rs +++ b/src/dbus.rs @@ -1,21 +1,31 @@ use tokio::sync::mpsc; use zbus::interface; -use crate::application_config::{AppConfig, AppConfigDbus}; +use crate::application_config::{AppConfig, AppConfigDbus, AppModeDbus}; pub trait GameClip { async fn save_clip(&self); async fn update_config(&self, new_config: AppConfigDbus) -> zbus::fdo::Result<()>; + async fn change_mode(&self, new_mode: AppModeDbus) -> zbus::fdo::Result<()>; } pub struct ClipService { save_tx: mpsc::Sender<()>, config_tx: mpsc::Sender, + change_mode_tx: mpsc::Sender, } impl ClipService { - pub fn new(save_tx: mpsc::Sender<()>, config_tx: mpsc::Sender) -> Self { - Self { save_tx, config_tx } + pub fn new( + save_tx: mpsc::Sender<()>, + config_tx: mpsc::Sender, + change_mode_tx: mpsc::Sender, + ) -> Self { + Self { + save_tx, + config_tx, + change_mode_tx, + } } } @@ -31,4 +41,9 @@ impl GameClip for ClipService { let _ = self.config_tx.send(config).await; Ok(()) } + + async fn change_mode(&self, new_mode: AppModeDbus) -> zbus::fdo::Result<()> { + let _ = self.change_mode_tx.send(new_mode).await; + Ok(()) + } } diff --git a/src/encoders/buffer_tests.rs b/src/encoders/buffer_tests.rs index 2dd3993..40db682 100644 --- a/src/encoders/buffer_tests.rs +++ b/src/encoders/buffer_tests.rs @@ -95,12 +95,7 @@ fn test_video_buffer_stress_realistic() { buffer.insert( dts, - new_video_frame( - vec![1, 2, 3], - current_pts, - is_keyframe, - current_pts, - ), + new_video_frame(vec![1, 2, 3], current_pts, is_keyframe, current_pts), ); dts += frame_duration_us; @@ -118,7 +113,7 @@ fn test_video_buffer_stress_realistic() { "Buffer duration: {:.2} seconds", duration_us as f64 / 1_000_000.0 ); - assert!(duration_us >= 4_400_000 && duration_us <= 4_600_000); // Should be close to max (~4.5 seconds) + assert!((4_400_000..4_600_000).contains(&duration_us)); // Should be close to max (~4.5 seconds) } assert!(buffer.get_last_gop_start().is_some()); diff --git a/src/main.rs b/src/main.rs index f406042..7325c29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use anyhow::{Context, Error, Result}; use application_config::load_or_create_config; use encoders::buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer}; use ffmpeg_next::{self as ffmpeg}; -use modes::shadow_cap::ShadowCapMode; +use modes::{app_mode_variant::AppModeVariant, shadow_cap::ShadowCapMode}; use pipewire::{self as pw}; use waycap::WayCap; use waycap_rs::Capture; @@ -33,7 +33,7 @@ async fn main() -> Result<(), Error> { ffmpeg::init()?; let config = load_or_create_config(); log::debug!("Config: {config:?}"); - let mode = ShadowCapMode::new(config.max_seconds).await?; + let mode = AppModeVariant::Shadow(ShadowCapMode::new(config.max_seconds).await?); let mut app = WayCap::new(mode, config).await?; diff --git a/src/modes/app_mode_variant.rs b/src/modes/app_mode_variant.rs new file mode 100644 index 0000000..3e3c3ae --- /dev/null +++ b/src/modes/app_mode_variant.rs @@ -0,0 +1,52 @@ +use crate::application_config::AppModeDbus; + +use super::{shadow_cap::ShadowCapMode, AppMode}; + +pub enum AppModeVariant { + Shadow(ShadowCapMode), +} + +impl AppMode for AppModeVariant { + async fn init(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> { + match self { + AppModeVariant::Shadow(mode) => mode.init(ctx).await, + } + } + + async fn on_save(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> { + match self { + AppModeVariant::Shadow(mode) => mode.on_save(ctx).await, + } + } + + async fn on_exit(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> { + match self { + AppModeVariant::Shadow(mode) => mode.on_exit(ctx).await, + } + } + + async fn on_shutdown( + &mut self, + ctx: &mut crate::app_context::AppContext, + ) -> anyhow::Result<()> { + match self { + AppModeVariant::Shadow(mode) => mode.on_shutdown(ctx).await, + } + } +} + +impl std::fmt::Debug for AppModeVariant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AppModeVariant::Shadow(_) => write!(f, "Shadow Capture Mode"), + } + } +} + +impl AppModeVariant { + pub fn to_dbus(&self) -> AppModeDbus { + match self { + AppModeVariant::Shadow(_) => AppModeDbus::Shadow, + } + } +} diff --git a/src/modes/mod.rs b/src/modes/mod.rs index 3ad136e..e21ea38 100644 --- a/src/modes/mod.rs +++ b/src/modes/mod.rs @@ -1,3 +1,4 @@ +pub mod app_mode_variant; pub mod shadow_cap; use crate::app_context::AppContext; use anyhow::Result; @@ -6,4 +7,5 @@ pub trait AppMode: Send + 'static { async fn init(&mut self, ctx: &mut AppContext) -> Result<()>; async fn on_save(&mut self, ctx: &mut AppContext) -> Result<()>; async fn on_shutdown(&mut self, ctx: &mut AppContext) -> Result<()>; + async fn on_exit(&mut self, ctx: &mut AppContext) -> Result<()>; } diff --git a/src/modes/shadow_cap.rs b/src/modes/shadow_cap.rs index 0145750..b03d3e8 100644 --- a/src/modes/shadow_cap.rs +++ b/src/modes/shadow_cap.rs @@ -1,5 +1,6 @@ use std::{ sync::{atomic::AtomicBool, Arc}, + thread::JoinHandle, time::Duration, }; @@ -18,30 +19,31 @@ use super::AppMode; pub struct ShadowCapMode { video_buffer: Arc>, audio_buffer: Arc>, + shadow_workers: Vec>, } impl AppMode for ShadowCapMode { async fn init(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> { log::debug!("Initializing context for Shadow Capture Mode"); - - let video_owned_recv = ctx.capture.take_video_receiver(); + let video_owned_recv = ctx.capture.get_video_receiver(); let shadow_worker = Self::create_shadow_video_worker( video_owned_recv, Arc::clone(&self.video_buffer), Arc::clone(&ctx.stop), ); - ctx.join_handles.push(shadow_worker); + self.shadow_workers.push(shadow_worker); - let audio_owned_recv = ctx.capture.take_audio_receiver()?; + let audio_owned_recv = ctx.capture.get_audio_receiver()?; let audio_shadow_worker = Self::create_shadow_audio_worker( audio_owned_recv, Arc::clone(&self.audio_buffer), Arc::clone(&ctx.stop), ); - ctx.join_handles.push(audio_shadow_worker); + self.shadow_workers.push(audio_shadow_worker); + ctx.capture.start()?; log::debug!("Successfully initialized Shadow Capture Mode"); Ok(()) } @@ -71,10 +73,24 @@ impl AppMode for ShadowCapMode { async fn on_shutdown(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> { log::info!("Shutting down"); // Stop processing new frames and exit worker threads - ctx.saving.store(true, std::sync::atomic::Ordering::Release); ctx.stop.store(true, std::sync::atomic::Ordering::Release); Ok(()) } + + async fn on_exit(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> { + // Stop worker threads and wait for them to exit + ctx.stop.store(true, std::sync::atomic::Ordering::Release); + ctx.capture.pause()?; + for worker in self.shadow_workers.drain(..) { + match worker.join() { + Ok(_) => {} + Err(e) => { + log::error!("Error in shadow worker thread: {e:?}"); + } + } + } + Ok(()) + } } impl ShadowCapMode { @@ -92,6 +108,7 @@ impl ShadowCapMode { audio_buffer: Arc::new(Mutex::new(ShadowCaptureAudioBuffer::new( actual_max as usize, ))), + shadow_workers: Vec::new(), }) } diff --git a/src/waycap.rs b/src/waycap.rs index f2e5af0..daf2a8e 100644 --- a/src/waycap.rs +++ b/src/waycap.rs @@ -1,8 +1,8 @@ use crate::{ app_context::AppContext, - application_config::{update_config, AppConfig}, + application_config::{update_config, AppConfig, AppModeDbus}, dbus, - modes::AppMode, + modes::{app_mode_variant::AppModeVariant, shadow_cap::ShadowCapMode, AppMode}, }; use anyhow::Result; use std::sync::{atomic::AtomicBool, Arc}; @@ -10,16 +10,17 @@ use tokio::sync::mpsc; use waycap_rs::pipeline::builder::CaptureBuilder; use zbus::{connection, Connection}; -pub struct WayCap { +pub struct WayCap { context: AppContext, dbus_conn: Option, dbus_save_rx: mpsc::Receiver<()>, dbus_config_rx: mpsc::Receiver, - mode: M, + dbus_change_mode_rx: mpsc::Receiver, + mode: AppModeVariant, } -impl WayCap { - pub async fn new(mut mode: M, _config: AppConfig) -> Result { +impl WayCap { + pub async fn new(mut mode: AppModeVariant, config: AppConfig) -> Result { simple_logging::log_to_file("logs.txt", log::LevelFilter::Info)?; let saving = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false)); @@ -28,7 +29,13 @@ impl WayCap { let (dbus_save_tx, dbus_save_rx) = mpsc::channel(1); let (dbus_config_tx, dbus_config_rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel(1); - let clip_service = dbus::ClipService::new(dbus_save_tx, dbus_config_tx); + let (dbus_change_mode_tx, dbus_change_mode_rx): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(1); + + let clip_service = + dbus::ClipService::new(dbus_save_tx, dbus_config_tx, dbus_change_mode_tx); log::debug!("Creating dbus connection"); let connection = connection::Builder::session()? @@ -50,6 +57,7 @@ impl WayCap { stop, join_handles, capture, + config, }; mode.init(&mut ctx).await?; @@ -58,6 +66,7 @@ impl WayCap { context: ctx, dbus_save_rx, dbus_config_rx, + dbus_change_mode_rx, mode, dbus_conn: Some(connection), }) @@ -73,6 +82,9 @@ impl WayCap { Some(cfg) = self.dbus_config_rx.recv() => { update_config(cfg); }, + Some(new_mode) = self.dbus_change_mode_rx.recv() => { + self.try_switch_mode(new_mode).await?; + }, _ = tokio::signal::ctrl_c() => { log::debug!("Shutting down"); self.mode.on_shutdown(&mut self.context).await?; @@ -95,4 +107,35 @@ impl WayCap { Ok(()) } + + async fn try_switch_mode(&mut self, new_mode: AppModeDbus) -> anyhow::Result<()> { + let current_mode = self.mode.to_dbus(); + if new_mode == current_mode { + log::info!("Already in {:?}. Not switching", self.mode); + return Ok(()); + } + + log::info!("Exiting {:?}", self.mode); + self.mode.on_exit(&mut self.context).await?; + + let mode = match new_mode { + AppModeDbus::Shadow => { + AppModeVariant::Shadow(ShadowCapMode::new(self.context.config.max_seconds).await?) + } + }; + + log::info!("Initializing {mode:?}"); + self.mode = mode; + + // Reset internal states + self.context + .stop + .store(false, std::sync::atomic::Ordering::Release); + self.context + .saving + .store(false, std::sync::atomic::Ordering::Release); + + self.mode.init(&mut self.context).await?; + Ok(()) + } }