Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ windows = { version = "0.61.3", features = [
"Win32_UI_Shell_PropertiesSystem",
] }

[[example]]
name = "eject_stream_pipewire"
path = "examples/eject_stream_pipewire.rs"
required-features = ["pipewire"]

[[example]]
name = "enumerate_alsa"
path = "examples/enumerate_alsa.rs"
Expand Down
77 changes: 77 additions & 0 deletions examples/eject_stream_pipewire.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! This example showcases the problem with ejecting stopped pipewire streams described in
//! https://github.com/SolarLiner/interflow/issues/105
//!
//! It would be best as an integration test, but it has nontrivial prerequisites on the environment:
//! - running PipeWire daemon
//! - at least one PipeWire audio output device
//! - the `pw-link` program installed (bundled with pipewire)

use interflow::prelude::*;
use std::ops::Deref;
use std::thread;
use util::sine::SineWave;

mod util;

#[cfg(all(os_pipewire, feature = "pipewire"))]
fn main() -> Result<(), Box<dyn std::error::Error>> {
use interflow::prelude::pipewire::driver::PipewireDriver;
use std::{process, time::Duration};

env_logger::init();

let driver = PipewireDriver::new()?;

// Select the highest-priority output device. Use this rather than `driver.default_device()`
// because we need its real name for disconnecting it below.
let devices = driver.list_devices()?;
let mut device = devices
.into_iter()
.filter(|d| d.device_type().is_output())
.max_by_key(device_session_priority)
.expect("No output PipeWire devices?");
println!("Using device {}", device.name());

let config = device.default_output_config()?;
device.with_stream_name("Interflow eject test 1");
let stream_1 = device.create_output_stream(config, SineWave::new(440.0))?;

println!("Playing sine wave for 1 second, then ejecting");
thread::sleep(Duration::from_secs(1));
let callback = stream_1.eject().unwrap();

println!("Playing sine wave for another second in a new stream but old callback");
let stream_2 = device.create_output_stream(config, callback)?;
thread::sleep(Duration::from_secs(1));

// Disconnect our node from the device node. Call external program, doing this programmatically
// using pipewire-rs would be much more involved.
let mut command = process::Command::new("pw-link");
command
.arg("--disconnect")
.arg("eject_stream_pipewire")
.arg(device.name().deref());
println!("Disconnecting playback pipewire node from its device using {command:?}");
let status = command.status()?;
assert!(status.success());

println!("Ejecting the callback from the new stream");
// The hang occurred right in this call
stream_2.eject()?;

println!("Exiting cleanly");
Ok(())
}

#[cfg(all(os_pipewire, feature = "pipewire"))]
fn device_session_priority(device: &pipewire::device::PipewireDevice) -> Option<i32> {
let properties = device
.properties()
.expect("Cannot get pipewire device properties")?;

let priority_property = properties.get("priority.session")?;
let priority = priority_property
.parse()
.expect("Cannot parse priority.session as i32");
Some(priority)
}
156 changes: 98 additions & 58 deletions src/backends/pipewire/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::audio_buffer::{AudioMut, AudioRef};
use crate::backends::pipewire::error::PipewireError;
use crate::channel_map::Bitset;
use crate::prelude::pipewire::utils::{BlackHole, CallbackHolder};
use crate::timestamp::Timestamp;
use crate::{
AudioCallbackContext, AudioInput, AudioInputCallback, AudioOutput, AudioOutputCallback,
Expand All @@ -13,65 +14,33 @@ use libspa::utils::Direction;
use libspa_sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format};
use pipewire::context::Context;
use pipewire::keys;
use pipewire::main_loop::{MainLoop, WeakMainLoop};
use pipewire::main_loop::MainLoop;
use pipewire::properties::Properties;
use pipewire::stream::{Stream, StreamFlags};
use std::cell::Cell;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::sync::{Arc, Weak};
use std::thread::JoinHandle;

enum StreamCommands<Callback> {
ReceiveCallback(Callback),
Eject(oneshot::Sender<Callback>),
}

impl<Callback> fmt::Debug for StreamCommands<Callback> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::ReceiveCallback(_) => write!(f, "ReceiveCallback"),
Self::Eject(_) => write!(f, "Eject"),
}
}
}

struct StreamInner<Callback> {
commands: rtrb::Consumer<StreamCommands<Callback>>,
scratch_buffer: Box<[f32]>,
callback: Option<Callback>,
callback: Weak<CallbackHolder<Callback>>,
config: StreamConfig,
timestamp: Timestamp,
loop_ref: WeakMainLoop,
}

impl<Callback> StreamInner<Callback> {
fn handle_command(&mut self, command: StreamCommands<Callback>) {
log::debug!("Handling command: {command:?}");
match command {
StreamCommands::ReceiveCallback(callback) => {
debug_assert!(self.callback.is_none());
self.callback = Some(callback);
}
StreamCommands::Eject(reply) => {
if let Some(callback) = self.callback.take() {
reply.send(callback).unwrap();
if let Some(loop_ref) = self.loop_ref.upgrade() {
loop_ref.quit();
}
}
}
}
}

fn handle_commands(&mut self) {
while let Ok(command) = self.commands.pop() {
self.handle_command(command);
}
}

fn ejected(&self) -> bool {
self.callback.is_none()
}
}

impl<Callback: AudioOutputCallback> StreamInner<Callback> {
Expand All @@ -81,7 +50,7 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
channels,
)
.unwrap();
if let Some(callback) = self.callback.as_mut() {
if let Some(mut callback) = self.callback.upgrade() {
let context = AudioCallbackContext {
stream_config: self.config,
timestamp: self.timestamp,
Expand All @@ -91,7 +60,12 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
buffer,
timestamp: self.timestamp,
};

// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
callback.on_output_data(context, output);

self.timestamp += num_frames as u64;
num_frames
} else {
Expand All @@ -105,7 +79,7 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
let buffer =
AudioRef::from_interleaved(&self.scratch_buffer[..channels * frames], channels)
.unwrap();
if let Some(callback) = self.callback.as_mut() {
if let Some(mut callback) = self.callback.upgrade() {
let context = AudioCallbackContext {
stream_config: self.config,
timestamp: self.timestamp,
Expand All @@ -115,7 +89,12 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
buffer,
timestamp: self.timestamp,
};

// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
callback.on_input_data(context, input);

self.timestamp += num_frames as u64;
num_frames
} else {
Expand All @@ -125,19 +104,19 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
}

pub struct StreamHandle<Callback> {
commands: rtrb::Producer<StreamCommands<Callback>>,
commands: pipewire::channel::Sender<StreamCommands<Callback>>,
handle: JoinHandle<Result<(), PipewireError>>,
}

impl<Callback> AudioStreamHandle<Callback> for StreamHandle<Callback> {
type Error = PipewireError;

fn eject(mut self) -> Result<Callback, Self::Error> {
fn eject(self) -> Result<Callback, Self::Error> {
log::info!("Ejecting stream");
let (tx, rx) = oneshot::channel();
self.commands
.push(StreamCommands::Eject(tx))
.expect("Command buffer overflow");
.send(StreamCommands::Eject(tx))
.expect("Should be able to send a message through PipeWire channel");
self.handle.join().unwrap()?;
Ok(rx.recv().unwrap())
}
Expand All @@ -155,7 +134,10 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
+ Send
+ 'static,
) -> Result<Self, PipewireError> {
let (mut tx, rx) = rtrb::RingBuffer::new(16);
// Create a channel for sending command into PipeWire main loop.
let (pipewire_sender, pipewire_receiver) =
pipewire::channel::channel::<StreamCommands<Callback>>();

let handle = std::thread::spawn(move || {
let main_loop = MainLoop::new(None)?;
let context = Context::new(&main_loop)?;
Expand All @@ -180,23 +162,27 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
properties.insert(*keys::TARGET_OBJECT, device_object_serial);
}

let (callback_holder, callback_rx) = CallbackHolder::new(callback);
let callback_holder = Arc::new(callback_holder);

let stream_inner = StreamInner {
callback: Arc::downgrade(&callback_holder),
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
config,
timestamp: Timestamp::new(config.samplerate),
};

// SAFETY of StreamInner::process_input(), StreamInner::process_output() depends on us
// never _dereferencing_ `callback_holder` outside of `StreamInner`. Achieve that at
// type level by wrapping it in a black hole.
let callback_holder = BlackHole::new(callback_holder);

let stream = Stream::new(&core, &name, properties)?;
config.samplerate = config.samplerate.round();
let _listener = stream
.add_local_listener_with_user_data(StreamInner {
callback: None,
commands: rx,
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
loop_ref: main_loop.downgrade(),
config,
timestamp: Timestamp::new(config.samplerate),
})
.add_local_listener_with_user_data(stream_inner)
.process(move |stream, inner| {
log::debug!("Processing stream");
inner.handle_commands();
if inner.ejected() {
return;
}
if let Some(mut buffer) = stream.dequeue_buffer() {
let datas = buffer.datas_mut();
log::debug!("Datas: len={}", datas.len());
Expand Down Expand Up @@ -235,14 +221,48 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut params,
)?;

// Handle commands (stream ejection). Runs in the PipeWire main loop.
let loop_ref = main_loop.downgrade();
// pipewire::channel::receiver::attach() only accepts `Fn()` (instead of expected
// `FnMut()`), so we need interior mutability. Cell is sufficient.
let callback_holder = Cell::new(Some(callback_holder));
let _attached_receiver = pipewire_receiver.attach(main_loop.loop_(), move |command| {
log::debug!("Handling command: {command:?}");
match command {
StreamCommands::Eject(reply) => {
// Take the callback holder our of its `Cell`, leaving `None` in place.
let callback_holder = callback_holder.take();

if callback_holder.is_none() {
// We've already ejected the callback, nothing to do.
return;
}

// Drop our reference to the Arc, which is its only persistent strong
// reference. The `CallbackHolder` will go out of scope (usually right away,
// but if the callback is running right now in the rt thread, then after it
// releases it), and its Drop impl will send it through `callback_tx`.
drop(callback_holder);

let callback = callback_rx.recv().expect(
"channel from StreamInner to receiver in pipewire main thread should \
not be closed",
);
reply.send(callback).unwrap();
if let Some(loop_ref) = loop_ref.upgrade() {
loop_ref.quit();
}
}
}
});

log::debug!("Starting Pipewire main loop");
main_loop.run();
Ok::<_, PipewireError>(())
});
log::debug!("Sending callback to stream");
tx.push(StreamCommands::ReceiveCallback(callback)).unwrap();
Ok(Self {
commands: tx,
commands: pipewire_sender,
handle,
})
}
Expand Down Expand Up @@ -347,3 +367,23 @@ const DEFAULT_EXPECTED_FRAMES: usize = 512;
fn stream_buffer_size(range: (Option<usize>, Option<usize>)) -> usize {
range.0.or(range.1).unwrap_or(DEFAULT_EXPECTED_FRAMES)
}

/// Returns a mutable reference into the given `Arc`, without any check.
///
/// This does the same thing as unstable [`Arc::get_mut_unchecked()`], but on stable Rust.
/// The documentation including Safety prerequisites are copied from Rust stdlib.
/// This helper can be removed once `get_mut_unchecked()` is stabilized and hits our MSRV.
///
/// Unsafe variant of [`Arc::get_mut()`], which is safe and does appropriate checks.
///
/// # Safety
///
/// If any other `Arc` or [`Weak`] pointers to the same allocation exist, then
/// they must not be dereferenced or have active borrows for the duration
/// of the returned borrow, and their inner type must be exactly the same as the
/// inner type of this Rc (including lifetimes). This is trivially the case if no
/// such pointers exist, for example immediately after `Arc::new`.
unsafe fn arc_get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
let raw_pointer = Arc::as_ptr(arc) as *mut T;
unsafe { &mut *raw_pointer }
}
Loading
Loading