Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ simple-logging = "2.0.2"
tokio = { version = "1.43.0", features = ["full", "rt-multi-thread"] }
toml = "0.8.20"
zbus = { version = "5.3.1", features = ["tokio"] }
waycap-rs = "1.0.2"
waycap-rs = "2.0.0"
crossbeam = "0.8.4"

[profile.dev]
Expand Down
3 changes: 3 additions & 0 deletions src/app_context.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::sync::{atomic::AtomicBool, Arc};
use waycap_rs::Capture;

use crate::application_config::AppConfig;

pub struct AppContext {
pub saving: Arc<AtomicBool>,
pub stop: Arc<AtomicBool>,
pub join_handles: Vec<std::thread::JoinHandle<()>>,
pub capture: Capture,
pub config: AppConfig,
}
5 changes: 5 additions & 0 deletions src/application_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ impl TryFrom<AppConfigDbus> for AppConfig {
}
}

#[derive(Type, Serialize, Deserialize, PartialEq)]
pub enum AppModeDbus {
Shadow,
}

pub fn load_or_create_config() -> AppConfig {
let mut settings = Config::builder();

Expand Down
21 changes: 18 additions & 3 deletions src/dbus.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use tokio::sync::mpsc;
use zbus::interface;

use crate::application_config::{AppConfig, AppConfigDbus};
use crate::application_config::{AppConfig, AppConfigDbus, AppModeDbus};

pub trait GameClip {
async fn save_clip(&self);
async fn update_config(&self, new_config: AppConfigDbus) -> zbus::fdo::Result<()>;
async fn change_mode(&self, new_mode: AppModeDbus) -> zbus::fdo::Result<()>;
}

pub struct ClipService {
save_tx: mpsc::Sender<()>,
config_tx: mpsc::Sender<AppConfig>,
change_mode_tx: mpsc::Sender<AppModeDbus>,
}

impl ClipService {
pub fn new(save_tx: mpsc::Sender<()>, config_tx: mpsc::Sender<AppConfig>) -> Self {
Self { save_tx, config_tx }
pub fn new(
save_tx: mpsc::Sender<()>,
config_tx: mpsc::Sender<AppConfig>,
change_mode_tx: mpsc::Sender<AppModeDbus>,
) -> Self {
Self {
save_tx,
config_tx,
change_mode_tx,
}
}
}

Expand All @@ -31,4 +41,9 @@ impl GameClip for ClipService {
let _ = self.config_tx.send(config).await;
Ok(())
}

async fn change_mode(&self, new_mode: AppModeDbus) -> zbus::fdo::Result<()> {
let _ = self.change_mode_tx.send(new_mode).await;
Ok(())
}
}
9 changes: 2 additions & 7 deletions src/encoders/buffer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,7 @@ fn test_video_buffer_stress_realistic() {

buffer.insert(
dts,
new_video_frame(
vec![1, 2, 3],
current_pts,
is_keyframe,
current_pts,
),
new_video_frame(vec![1, 2, 3], current_pts, is_keyframe, current_pts),
);

dts += frame_duration_us;
Expand All @@ -118,7 +113,7 @@ fn test_video_buffer_stress_realistic() {
"Buffer duration: {:.2} seconds",
duration_us as f64 / 1_000_000.0
);
assert!(duration_us >= 4_400_000 && duration_us <= 4_600_000); // Should be close to max (~4.5 seconds)
assert!((4_400_000..4_600_000).contains(&duration_us)); // Should be close to max (~4.5 seconds)
}

assert!(buffer.get_last_gop_start().is_some());
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::{Context, Error, Result};
use application_config::load_or_create_config;
use encoders::buffer::{ShadowCaptureAudioBuffer, ShadowCaptureVideoBuffer};
use ffmpeg_next::{self as ffmpeg};
use modes::shadow_cap::ShadowCapMode;
use modes::{app_mode_variant::AppModeVariant, shadow_cap::ShadowCapMode};
use pipewire::{self as pw};
use waycap::WayCap;
use waycap_rs::Capture;
Expand All @@ -33,7 +33,7 @@ async fn main() -> Result<(), Error> {
ffmpeg::init()?;
let config = load_or_create_config();
log::debug!("Config: {config:?}");
let mode = ShadowCapMode::new(config.max_seconds).await?;
let mode = AppModeVariant::Shadow(ShadowCapMode::new(config.max_seconds).await?);

let mut app = WayCap::new(mode, config).await?;

Expand Down
52 changes: 52 additions & 0 deletions src/modes/app_mode_variant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::application_config::AppModeDbus;

use super::{shadow_cap::ShadowCapMode, AppMode};

pub enum AppModeVariant {
Shadow(ShadowCapMode),
}

impl AppMode for AppModeVariant {
async fn init(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> {
match self {
AppModeVariant::Shadow(mode) => mode.init(ctx).await,
}
}

async fn on_save(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> {
match self {
AppModeVariant::Shadow(mode) => mode.on_save(ctx).await,
}
}

async fn on_exit(&mut self, ctx: &mut crate::app_context::AppContext) -> anyhow::Result<()> {
match self {
AppModeVariant::Shadow(mode) => mode.on_exit(ctx).await,
}
}

async fn on_shutdown(
&mut self,
ctx: &mut crate::app_context::AppContext,
) -> anyhow::Result<()> {
match self {
AppModeVariant::Shadow(mode) => mode.on_shutdown(ctx).await,
}
}
}

impl std::fmt::Debug for AppModeVariant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AppModeVariant::Shadow(_) => write!(f, "Shadow Capture Mode"),
}
}
}

impl AppModeVariant {
pub fn to_dbus(&self) -> AppModeDbus {
match self {
AppModeVariant::Shadow(_) => AppModeDbus::Shadow,
}
}
}
2 changes: 2 additions & 0 deletions src/modes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod app_mode_variant;
pub mod shadow_cap;
use crate::app_context::AppContext;
use anyhow::Result;
Expand All @@ -6,4 +7,5 @@ pub trait AppMode: Send + 'static {
async fn init(&mut self, ctx: &mut AppContext) -> Result<()>;
async fn on_save(&mut self, ctx: &mut AppContext) -> Result<()>;
async fn on_shutdown(&mut self, ctx: &mut AppContext) -> Result<()>;
async fn on_exit(&mut self, ctx: &mut AppContext) -> Result<()>;
}
29 changes: 23 additions & 6 deletions src/modes/shadow_cap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
sync::{atomic::AtomicBool, Arc},
thread::JoinHandle,
time::Duration,
};

Expand All @@ -18,30 +19,31 @@ use super::AppMode;
pub struct ShadowCapMode {
video_buffer: Arc<Mutex<ShadowCaptureVideoBuffer>>,
audio_buffer: Arc<Mutex<ShadowCaptureAudioBuffer>>,
shadow_workers: Vec<JoinHandle<()>>,
}

impl AppMode for ShadowCapMode {
async fn init(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> {
log::debug!("Initializing context for Shadow Capture Mode");

let video_owned_recv = ctx.capture.take_video_receiver();
let video_owned_recv = ctx.capture.get_video_receiver();

let shadow_worker = Self::create_shadow_video_worker(
video_owned_recv,
Arc::clone(&self.video_buffer),
Arc::clone(&ctx.stop),
);
ctx.join_handles.push(shadow_worker);
self.shadow_workers.push(shadow_worker);

let audio_owned_recv = ctx.capture.take_audio_receiver()?;
let audio_owned_recv = ctx.capture.get_audio_receiver()?;

let audio_shadow_worker = Self::create_shadow_audio_worker(
audio_owned_recv,
Arc::clone(&self.audio_buffer),
Arc::clone(&ctx.stop),
);
ctx.join_handles.push(audio_shadow_worker);
self.shadow_workers.push(audio_shadow_worker);

ctx.capture.start()?;
log::debug!("Successfully initialized Shadow Capture Mode");
Ok(())
}
Expand Down Expand Up @@ -71,10 +73,24 @@ impl AppMode for ShadowCapMode {
async fn on_shutdown(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> {
log::info!("Shutting down");
// Stop processing new frames and exit worker threads
ctx.saving.store(true, std::sync::atomic::Ordering::Release);
ctx.stop.store(true, std::sync::atomic::Ordering::Release);
Ok(())
}

async fn on_exit(&mut self, ctx: &mut AppContext) -> anyhow::Result<()> {
// Stop worker threads and wait for them to exit
ctx.stop.store(true, std::sync::atomic::Ordering::Release);
ctx.capture.pause()?;
for worker in self.shadow_workers.drain(..) {
match worker.join() {
Ok(_) => {}
Err(e) => {
log::error!("Error in shadow worker thread: {e:?}");
}
}
}
Ok(())
}
}

impl ShadowCapMode {
Expand All @@ -92,6 +108,7 @@ impl ShadowCapMode {
audio_buffer: Arc::new(Mutex::new(ShadowCaptureAudioBuffer::new(
actual_max as usize,
))),
shadow_workers: Vec::new(),
})
}

Expand Down
57 changes: 50 additions & 7 deletions src/waycap.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use crate::{
app_context::AppContext,
application_config::{update_config, AppConfig},
application_config::{update_config, AppConfig, AppModeDbus},
dbus,
modes::AppMode,
modes::{app_mode_variant::AppModeVariant, shadow_cap::ShadowCapMode, AppMode},
};
use anyhow::Result;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;
use waycap_rs::pipeline::builder::CaptureBuilder;
use zbus::{connection, Connection};

pub struct WayCap<M: AppMode> {
pub struct WayCap {
context: AppContext,
dbus_conn: Option<Connection>,
dbus_save_rx: mpsc::Receiver<()>,
dbus_config_rx: mpsc::Receiver<AppConfig>,
mode: M,
dbus_change_mode_rx: mpsc::Receiver<AppModeDbus>,
mode: AppModeVariant,
}

impl<M: AppMode> WayCap<M> {
pub async fn new(mut mode: M, _config: AppConfig) -> Result<Self> {
impl WayCap {
pub async fn new(mut mode: AppModeVariant, config: AppConfig) -> Result<Self> {
simple_logging::log_to_file("logs.txt", log::LevelFilter::Info)?;
let saving = Arc::new(AtomicBool::new(false));
let stop = Arc::new(AtomicBool::new(false));
Expand All @@ -28,7 +29,13 @@ impl<M: AppMode> WayCap<M> {
let (dbus_save_tx, dbus_save_rx) = mpsc::channel(1);
let (dbus_config_tx, dbus_config_rx): (mpsc::Sender<AppConfig>, mpsc::Receiver<AppConfig>) =
mpsc::channel(1);
let clip_service = dbus::ClipService::new(dbus_save_tx, dbus_config_tx);
let (dbus_change_mode_tx, dbus_change_mode_rx): (
mpsc::Sender<AppModeDbus>,
mpsc::Receiver<AppModeDbus>,
) = mpsc::channel(1);

let clip_service =
dbus::ClipService::new(dbus_save_tx, dbus_config_tx, dbus_change_mode_tx);

log::debug!("Creating dbus connection");
let connection = connection::Builder::session()?
Expand All @@ -50,6 +57,7 @@ impl<M: AppMode> WayCap<M> {
stop,
join_handles,
capture,
config,
};

mode.init(&mut ctx).await?;
Expand All @@ -58,6 +66,7 @@ impl<M: AppMode> WayCap<M> {
context: ctx,
dbus_save_rx,
dbus_config_rx,
dbus_change_mode_rx,
mode,
dbus_conn: Some(connection),
})
Expand All @@ -73,6 +82,9 @@ impl<M: AppMode> WayCap<M> {
Some(cfg) = self.dbus_config_rx.recv() => {
update_config(cfg);
},
Some(new_mode) = self.dbus_change_mode_rx.recv() => {
self.try_switch_mode(new_mode).await?;
},
_ = tokio::signal::ctrl_c() => {
log::debug!("Shutting down");
self.mode.on_shutdown(&mut self.context).await?;
Expand All @@ -95,4 +107,35 @@ impl<M: AppMode> WayCap<M> {

Ok(())
}

async fn try_switch_mode(&mut self, new_mode: AppModeDbus) -> anyhow::Result<()> {
let current_mode = self.mode.to_dbus();
if new_mode == current_mode {
log::info!("Already in {:?}. Not switching", self.mode);
return Ok(());
}

log::info!("Exiting {:?}", self.mode);
self.mode.on_exit(&mut self.context).await?;

let mode = match new_mode {
AppModeDbus::Shadow => {
AppModeVariant::Shadow(ShadowCapMode::new(self.context.config.max_seconds).await?)
}
};

log::info!("Initializing {mode:?}");
self.mode = mode;

// Reset internal states
self.context
.stop
.store(false, std::sync::atomic::Ordering::Release);
self.context
.saving
.store(false, std::sync::atomic::Ordering::Release);

self.mode.init(&mut self.context).await?;
Ok(())
}
}