From 8d3324635cbee8883015ad87acb10b5087d39f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Tue, 11 Nov 2025 12:22:02 +0100 Subject: [PATCH 1/3] Add eject_stream_pipewire.rs example to demonstrate the problem --- Cargo.toml | 5 ++ examples/eject_stream_pipewire.rs | 77 +++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 examples/eject_stream_pipewire.rs diff --git a/Cargo.toml b/Cargo.toml index 95cee5f..528fb5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,11 @@ windows = { version = "0.61.3", features = [ "Win32_UI_Shell_PropertiesSystem", ] } +[[example]] +name = "eject_stream_pipewire" +path = "examples/eject_stream_pipewire.rs" +required-features = ["pipewire"] + [[example]] name = "enumerate_alsa" path = "examples/enumerate_alsa.rs" diff --git a/examples/eject_stream_pipewire.rs b/examples/eject_stream_pipewire.rs new file mode 100644 index 0000000..3241bf2 --- /dev/null +++ b/examples/eject_stream_pipewire.rs @@ -0,0 +1,77 @@ +//! This example showcases the problem with ejecting stopped pipewire streams described in +//! https://github.com/SolarLiner/interflow/issues/105 +//! +//! It would be best as an integration test, but it has nontrivial prerequisites on the environment: +//! - running PipeWire daemon +//! - at least one PipeWire audio output device +//! - the `pw-link` program installed (bundled with pipewire) + +use interflow::prelude::*; +use std::ops::Deref; +use std::thread; +use util::sine::SineWave; + +mod util; + +#[cfg(all(os_pipewire, feature = "pipewire"))] +fn main() -> Result<(), Box> { + use interflow::prelude::pipewire::driver::PipewireDriver; + use std::{process, time::Duration}; + + env_logger::init(); + + let driver = PipewireDriver::new()?; + + // Select the highest-priority output device. Use this rather than `driver.default_device()` + // because we need its real name for disconnecting it below. + let devices = driver.list_devices()?; + let mut device = devices + .into_iter() + .filter(|d| d.device_type().is_output()) + .max_by_key(device_session_priority) + .expect("No output PipeWire devices?"); + println!("Using device {}", device.name()); + + let config = device.default_output_config()?; + device.with_stream_name("Interflow eject test 1"); + let stream_1 = device.create_output_stream(config, SineWave::new(440.0))?; + + println!("Playing sine wave for 1 second, then ejecting"); + thread::sleep(Duration::from_secs(1)); + let callback = stream_1.eject().unwrap(); + + println!("Playing sine wave for another second in a new stream but old callback"); + let stream_2 = device.create_output_stream(config, callback)?; + thread::sleep(Duration::from_secs(1)); + + // Disconnect our node from the device node. Call external program, doing this programmatically + // using pipewire-rs would be much more involved. + let mut command = process::Command::new("pw-link"); + command + .arg("--disconnect") + .arg("eject_stream_pipewire") + .arg(device.name().deref()); + println!("Disconnecting playback pipewire node from its device using {command:?}"); + let status = command.status()?; + assert!(status.success()); + + println!("Ejecting the callback from the new stream"); + // The hang occurred right in this call + stream_2.eject()?; + + println!("Exiting cleanly"); + Ok(()) +} + +#[cfg(all(os_pipewire, feature = "pipewire"))] +fn device_session_priority(device: &pipewire::device::PipewireDevice) -> Option { + let properties = device + .properties() + .expect("Cannot get pipewire device properties")?; + + let priority_property = properties.get("priority.session")?; + let priority = priority_property + .parse() + .expect("Cannot parse priority.session as i32"); + Some(priority) +} From 1d600d6dc529e12c66e5d01263aacdb79578fc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Sun, 9 Nov 2025 15:57:13 +0100 Subject: [PATCH 2/3] pipewire stream: pass callback directly ..instead of sending it through a command channel. I haven't found the reason to do this the more complicated way..? --- src/backends/pipewire/stream.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/backends/pipewire/stream.rs b/src/backends/pipewire/stream.rs index 4fac9f2..dc5acb2 100644 --- a/src/backends/pipewire/stream.rs +++ b/src/backends/pipewire/stream.rs @@ -22,14 +22,12 @@ use std::fmt::Formatter; use std::thread::JoinHandle; enum StreamCommands { - ReceiveCallback(Callback), Eject(oneshot::Sender), } impl fmt::Debug for StreamCommands { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - Self::ReceiveCallback(_) => write!(f, "ReceiveCallback"), Self::Eject(_) => write!(f, "Eject"), } } @@ -48,10 +46,6 @@ impl StreamInner { fn handle_command(&mut self, command: StreamCommands) { log::debug!("Handling command: {command:?}"); match command { - StreamCommands::ReceiveCallback(callback) => { - debug_assert!(self.callback.is_none()); - self.callback = Some(callback); - } StreamCommands::Eject(reply) => { if let Some(callback) = self.callback.take() { reply.send(callback).unwrap(); @@ -155,7 +149,7 @@ impl StreamHandle { + Send + 'static, ) -> Result { - let (mut tx, rx) = rtrb::RingBuffer::new(16); + let (tx, rx) = rtrb::RingBuffer::new(16); let handle = std::thread::spawn(move || { let main_loop = MainLoop::new(None)?; let context = Context::new(&main_loop)?; @@ -184,7 +178,7 @@ impl StreamHandle { config.samplerate = config.samplerate.round(); let _listener = stream .add_local_listener_with_user_data(StreamInner { - callback: None, + callback: Some(callback), commands: rx, scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(), loop_ref: main_loop.downgrade(), @@ -239,8 +233,6 @@ impl StreamHandle { main_loop.run(); Ok::<_, PipewireError>(()) }); - log::debug!("Sending callback to stream"); - tx.push(StreamCommands::ReceiveCallback(callback)).unwrap(); Ok(Self { commands: tx, handle, From 135935947a4bb6025b37f5d3c68161084cb99c68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Mon, 10 Nov 2025 23:10:31 +0100 Subject: [PATCH 3/3] pipewire: handle commands in the main loop using pw channel Attempt to fix https://github.com/SolarLiner/interflow/issues/105 v2: don't use a `Mutex`, do some Arc/Weak/Drop/channel/unsave tricks instead. --- src/backends/pipewire/stream.rs | 148 +++++++++++++++++++++----------- src/backends/pipewire/utils.rs | 69 +++++++++++++++ 2 files changed, 167 insertions(+), 50 deletions(-) diff --git a/src/backends/pipewire/stream.rs b/src/backends/pipewire/stream.rs index dc5acb2..16a28cc 100644 --- a/src/backends/pipewire/stream.rs +++ b/src/backends/pipewire/stream.rs @@ -1,6 +1,7 @@ use crate::audio_buffer::{AudioMut, AudioRef}; use crate::backends::pipewire::error::PipewireError; use crate::channel_map::Bitset; +use crate::prelude::pipewire::utils::{BlackHole, CallbackHolder}; use crate::timestamp::Timestamp; use crate::{ AudioCallbackContext, AudioInput, AudioInputCallback, AudioOutput, AudioOutputCallback, @@ -13,12 +14,14 @@ use libspa::utils::Direction; use libspa_sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format}; use pipewire::context::Context; use pipewire::keys; -use pipewire::main_loop::{MainLoop, WeakMainLoop}; +use pipewire::main_loop::MainLoop; use pipewire::properties::Properties; use pipewire::stream::{Stream, StreamFlags}; +use std::cell::Cell; use std::collections::HashMap; use std::fmt; use std::fmt::Formatter; +use std::sync::{Arc, Weak}; use std::thread::JoinHandle; enum StreamCommands { @@ -34,38 +37,10 @@ impl fmt::Debug for StreamCommands { } struct StreamInner { - commands: rtrb::Consumer>, scratch_buffer: Box<[f32]>, - callback: Option, + callback: Weak>, config: StreamConfig, timestamp: Timestamp, - loop_ref: WeakMainLoop, -} - -impl StreamInner { - fn handle_command(&mut self, command: StreamCommands) { - log::debug!("Handling command: {command:?}"); - match command { - StreamCommands::Eject(reply) => { - if let Some(callback) = self.callback.take() { - reply.send(callback).unwrap(); - if let Some(loop_ref) = self.loop_ref.upgrade() { - loop_ref.quit(); - } - } - } - } - } - - fn handle_commands(&mut self) { - while let Ok(command) = self.commands.pop() { - self.handle_command(command); - } - } - - fn ejected(&self) -> bool { - self.callback.is_none() - } } impl StreamInner { @@ -75,7 +50,7 @@ impl StreamInner { channels, ) .unwrap(); - if let Some(callback) = self.callback.as_mut() { + if let Some(mut callback) = self.callback.upgrade() { let context = AudioCallbackContext { stream_config: self.config, timestamp: self.timestamp, @@ -85,7 +60,12 @@ impl StreamInner { buffer, timestamp: self.timestamp, }; + + // SAFETY: there is max one other owner of the callback Arc, and it never dereferences + // it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`. + let callback = unsafe { arc_get_mut_unchecked(&mut callback) }; callback.on_output_data(context, output); + self.timestamp += num_frames as u64; num_frames } else { @@ -99,7 +79,7 @@ impl StreamInner { let buffer = AudioRef::from_interleaved(&self.scratch_buffer[..channels * frames], channels) .unwrap(); - if let Some(callback) = self.callback.as_mut() { + if let Some(mut callback) = self.callback.upgrade() { let context = AudioCallbackContext { stream_config: self.config, timestamp: self.timestamp, @@ -109,7 +89,12 @@ impl StreamInner { buffer, timestamp: self.timestamp, }; + + // SAFETY: there is max one other owner of the callback Arc, and it never dereferences + // it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`. + let callback = unsafe { arc_get_mut_unchecked(&mut callback) }; callback.on_input_data(context, input); + self.timestamp += num_frames as u64; num_frames } else { @@ -119,19 +104,19 @@ impl StreamInner { } pub struct StreamHandle { - commands: rtrb::Producer>, + commands: pipewire::channel::Sender>, handle: JoinHandle>, } impl AudioStreamHandle for StreamHandle { type Error = PipewireError; - fn eject(mut self) -> Result { + fn eject(self) -> Result { log::info!("Ejecting stream"); let (tx, rx) = oneshot::channel(); self.commands - .push(StreamCommands::Eject(tx)) - .expect("Command buffer overflow"); + .send(StreamCommands::Eject(tx)) + .expect("Should be able to send a message through PipeWire channel"); self.handle.join().unwrap()?; Ok(rx.recv().unwrap()) } @@ -149,7 +134,10 @@ impl StreamHandle { + Send + 'static, ) -> Result { - let (tx, rx) = rtrb::RingBuffer::new(16); + // Create a channel for sending command into PipeWire main loop. + let (pipewire_sender, pipewire_receiver) = + pipewire::channel::channel::>(); + let handle = std::thread::spawn(move || { let main_loop = MainLoop::new(None)?; let context = Context::new(&main_loop)?; @@ -174,23 +162,27 @@ impl StreamHandle { properties.insert(*keys::TARGET_OBJECT, device_object_serial); } + let (callback_holder, callback_rx) = CallbackHolder::new(callback); + let callback_holder = Arc::new(callback_holder); + + let stream_inner = StreamInner { + callback: Arc::downgrade(&callback_holder), + scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(), + config, + timestamp: Timestamp::new(config.samplerate), + }; + + // SAFETY of StreamInner::process_input(), StreamInner::process_output() depends on us + // never _dereferencing_ `callback_holder` outside of `StreamInner`. Achieve that at + // type level by wrapping it in a black hole. + let callback_holder = BlackHole::new(callback_holder); + let stream = Stream::new(&core, &name, properties)?; config.samplerate = config.samplerate.round(); let _listener = stream - .add_local_listener_with_user_data(StreamInner { - callback: Some(callback), - commands: rx, - scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(), - loop_ref: main_loop.downgrade(), - config, - timestamp: Timestamp::new(config.samplerate), - }) + .add_local_listener_with_user_data(stream_inner) .process(move |stream, inner| { log::debug!("Processing stream"); - inner.handle_commands(); - if inner.ejected() { - return; - } if let Some(mut buffer) = stream.dequeue_buffer() { let datas = buffer.datas_mut(); log::debug!("Datas: len={}", datas.len()); @@ -229,12 +221,48 @@ impl StreamHandle { StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, &mut params, )?; + + // Handle commands (stream ejection). Runs in the PipeWire main loop. + let loop_ref = main_loop.downgrade(); + // pipewire::channel::receiver::attach() only accepts `Fn()` (instead of expected + // `FnMut()`), so we need interior mutability. Cell is sufficient. + let callback_holder = Cell::new(Some(callback_holder)); + let _attached_receiver = pipewire_receiver.attach(main_loop.loop_(), move |command| { + log::debug!("Handling command: {command:?}"); + match command { + StreamCommands::Eject(reply) => { + // Take the callback holder our of its `Cell`, leaving `None` in place. + let callback_holder = callback_holder.take(); + + if callback_holder.is_none() { + // We've already ejected the callback, nothing to do. + return; + } + + // Drop our reference to the Arc, which is its only persistent strong + // reference. The `CallbackHolder` will go out of scope (usually right away, + // but if the callback is running right now in the rt thread, then after it + // releases it), and its Drop impl will send it through `callback_tx`. + drop(callback_holder); + + let callback = callback_rx.recv().expect( + "channel from StreamInner to receiver in pipewire main thread should \ + not be closed", + ); + reply.send(callback).unwrap(); + if let Some(loop_ref) = loop_ref.upgrade() { + loop_ref.quit(); + } + } + } + }); + log::debug!("Starting Pipewire main loop"); main_loop.run(); Ok::<_, PipewireError>(()) }); Ok(Self { - commands: tx, + commands: pipewire_sender, handle, }) } @@ -339,3 +367,23 @@ const DEFAULT_EXPECTED_FRAMES: usize = 512; fn stream_buffer_size(range: (Option, Option)) -> usize { range.0.or(range.1).unwrap_or(DEFAULT_EXPECTED_FRAMES) } + +/// Returns a mutable reference into the given `Arc`, without any check. +/// +/// This does the same thing as unstable [`Arc::get_mut_unchecked()`], but on stable Rust. +/// The documentation including Safety prerequisites are copied from Rust stdlib. +/// This helper can be removed once `get_mut_unchecked()` is stabilized and hits our MSRV. +/// +/// Unsafe variant of [`Arc::get_mut()`], which is safe and does appropriate checks. +/// +/// # Safety +/// +/// If any other `Arc` or [`Weak`] pointers to the same allocation exist, then +/// they must not be dereferenced or have active borrows for the duration +/// of the returned borrow, and their inner type must be exactly the same as the +/// inner type of this Rc (including lifetimes). This is trivially the case if no +/// such pointers exist, for example immediately after `Arc::new`. +unsafe fn arc_get_mut_unchecked(arc: &mut Arc) -> &mut T { + let raw_pointer = Arc::as_ptr(arc) as *mut T; + unsafe { &mut *raw_pointer } +} diff --git a/src/backends/pipewire/utils.rs b/src/backends/pipewire/utils.rs index bb01eb8..9f0fb4b 100644 --- a/src/backends/pipewire/utils.rs +++ b/src/backends/pipewire/utils.rs @@ -5,7 +5,9 @@ use pipewire::context::Context; use pipewire::main_loop::MainLoop; use pipewire::registry::GlobalObject; use std::cell::{Cell, RefCell}; +use std::ops::{Deref, DerefMut}; use std::rc::Rc; +use std::sync::mpsc; fn get_device_type(object: &GlobalObject<&DictRef>) -> Option { fn is_input(media_class: &str) -> bool { @@ -89,3 +91,70 @@ pub fn get_devices() -> Result, PipewireError> { drop(_listener_reg); Ok(Rc::into_inner(data).unwrap().into_inner()) } + +/// A little helper that holds user's callback and sends it out using a channel when it goes out of +/// scope. Dereferences to `Callback`, including mutably. +pub(super) struct CallbackHolder { + /// Invariant: `callback` is always `Some`, except in the second half of the [`Drop`] impl. + callback: Option, + tx: mpsc::SyncSender, +} + +impl CallbackHolder { + /// Returns a pair (self, rx), where `rx` should be used to fetch the callback when the holder + /// goes out of scope. + pub(super) fn new(callback: Callback) -> (Self, mpsc::Receiver) { + // Our first choice would be and `rtrb` channel, but that doesn't allow receiver to wait + // for a message, which we need. It doesn't matter, we use a channel of capacity 1 and + // we only use it exactly once, it never blocks in this case. + let (tx, rx) = mpsc::sync_channel(1); + let myself = Self { + callback: Some(callback), + tx, + }; + (myself, rx) + } +} + +impl Deref for CallbackHolder { + type Target = Callback; + + fn deref(&self) -> &Callback { + self.callback + .as_ref() + .expect("never None outside destructor") + } +} + +impl DerefMut for CallbackHolder { + fn deref_mut(&mut self) -> &mut Callback { + self.callback + .as_mut() + .expect("never None outside destructor") + } +} + +impl Drop for CallbackHolder { + fn drop(&mut self) { + let callback = self.callback.take().expect("never None outside destructor"); + match self.tx.try_send(callback) { + Ok(()) => (), + Err(mpsc::TrySendError::Full(_)) => { + panic!("The channel in CallbackHolder should be never full") + } + Err(mpsc::TrySendError::Disconnected(_)) => log::warn!( + "Channel in CallbackHolder is disconnected, did PipeWire main loop already exit?" + ), + } + } +} + +/// Allows you to send to value to a black hole. It keeps at alive as long as it is in scope, but +/// you cannot get the value back in any way. +pub struct BlackHole(T); + +impl BlackHole { + pub fn new(wrapped: T) -> Self { + Self(wrapped) + } +}