From 5514c475bca1857365be899acb586370efccba43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 15:06:50 +0100 Subject: [PATCH 01/11] Make rcl_publisher_t have stable location This change is necessary because rcl uses address of the rcl handle in tracepoints. If the location changes, further publish events will not be correctly assigned to the publisher. --- r2r/src/nodes.rs | 14 ++++++-------- r2r/src/publishers.rs | 22 ++++++++++++++-------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 435746a17..43a851cf2 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -962,11 +962,10 @@ impl Node { where T: WrappedTypesupport, { - let publisher_handle = + let publisher_arc = create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let arc = Arc::new(publisher_handle); - let p = make_publisher(Arc::downgrade(&arc)); - self.pubs.push(arc); + let p = make_publisher(Arc::downgrade(&publisher_arc)); + self.pubs.push(publisher_arc); Ok(p) } @@ -977,11 +976,10 @@ impl Node { &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, ) -> Result { let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?; - let publisher_handle = + let publisher_arc = create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?; - let arc = Arc::new(publisher_handle); - let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned()); - self.pubs.push(arc); + let p = make_publisher_untyped(Arc::downgrade(&publisher_arc), topic_type.to_owned()); + self.pubs.push(publisher_arc); Ok(p) } diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index cb3bd3bd7..658ecd1a2 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -3,7 +3,7 @@ use std::{ ffi::{c_void, CString}, fmt::Debug, marker::PhantomData, - sync::{Mutex, Once, Weak}, + sync::{Arc, Mutex, Once, Weak}, }; use crate::{error::*, msg_types::*, qos::QosProfile}; @@ -137,15 +137,24 @@ pub fn make_publisher_untyped(handle: Weak, type_: String) -> Publis pub fn create_publisher_helper( node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t, qos_profile: QosProfile, -) -> Result { - let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; +) -> Result> { let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + // Allocate the memory now so that the location of the rcl handle + // does not change after call to rcl_publisher_init. + // This is important because tracing in rcl expects the handle to be at a fixed location. + let mut publisher_arc = Arc::new(Publisher_ { + handle: unsafe { rcl_get_zero_initialized_publisher() }, + poll_inter_process_subscriber_channels: Mutex::new(Vec::new()), + }); + let publisher_mut = Arc::get_mut(&mut publisher_arc) + .expect("No other Arc should exist. The Arc was just created."); + let result = unsafe { let mut publisher_options = rcl_publisher_get_default_options(); publisher_options.qos = qos_profile.into(); rcl_publisher_init( - &mut publisher_handle, + &mut publisher_mut.handle, node, typesupport, topic_c_string.as_ptr(), @@ -153,10 +162,7 @@ pub fn create_publisher_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(Publisher_ { - handle: publisher_handle, - poll_inter_process_subscriber_channels: Mutex::new(Vec::new()), - }) + Ok(publisher_arc) } else { Err(Error::from_rcl_error(result)) } From a51bda8901b54e23f978332066c67662520cba30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 15:05:29 +0100 Subject: [PATCH 02/11] Make rcl_subscription_t have stable location This change is necessary because rcl uses address of the rcl handle in tracepoints. If the location changes further `take` events will not be correctly assigned to the subscriber. --- r2r/src/nodes.rs | 88 ++++++++++++++++++++++++++++++++---------- r2r/src/subscribers.rs | 13 +++---- r2r/src/time_source.rs | 35 ++++++++++------- 3 files changed, 96 insertions(+), 40 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 43a851cf2..99787380d 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -653,15 +653,27 @@ impl Node { where T: WrappedTypesupport, { - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::(10); - let ws = TypedSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(TypedSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + T::get_ts(), + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + self.subscribers.push(subscription); Ok(receiver) } @@ -674,15 +686,27 @@ impl Node { where T: WrappedTypesupport, { - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = NativeSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(NativeSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + T::get_ts(), + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + self.subscribers.push(subscription); Ok(receiver) } @@ -694,16 +718,28 @@ impl Node { &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, ) -> Result> + Unpin> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = UntypedSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(UntypedSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, topic_type: topic_type.to_string(), sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + msg.ts, + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + self.subscribers.push(subscription); Ok(receiver) } @@ -739,16 +775,28 @@ impl Node { return Err(Error::from_rcl_error(ret)); } - let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = RawSubscriber { - rcl_handle: subscription_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut subscription = Box::new(RawSubscriber { + rcl_handle: unsafe { rcl_get_zero_initialized_subscription() }, msg_buf, sender, + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscription.rcl_handle, + self.node_handle.as_mut(), + topic, + msg.ts, + qos_profile, + )?; }; - self.subscribers.push(Box::new(ws)); + self.subscribers.push(subscription); Ok(receiver) } diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 5ee510209..e2a0419fe 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -234,18 +234,17 @@ impl Subscriber_ for RawSubscriber { } } -pub fn create_subscription_helper( - node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; +pub unsafe fn create_subscription_helper( + subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str, + ts: *const rosidl_message_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; let result = unsafe { let mut subscription_options = rcl_subscription_get_default_options(); subscription_options.qos = qos_profile.into(); rcl_subscription_init( - &mut subscription_handle, + subscription_handle, node, ts, topic_c_string.as_ptr(), @@ -253,7 +252,7 @@ pub fn create_subscription_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(subscription_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index 91b1a09c6..bbbbb13e1 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -9,8 +9,8 @@ use crate::{ Clock, ClockType, Node, QosProfile, WrappedTypesupport, }; use r2r_rcl::{ - rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t, - rmw_message_info_t, RCL_RET_OK, + rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t, + rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK, }; use std::sync::{Arc, Mutex, Weak}; @@ -102,7 +102,7 @@ impl TimeSource { match inner.subscriber_state { TimeSourceSubscriberState::None => { let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?; - node.subscribers.push(Box::new(subscriber)); + node.subscribers.push(subscriber); inner.subscriber_state = TimeSourceSubscriberState::Active; } TimeSourceSubscriberState::ToBeDestroyed => { @@ -191,20 +191,29 @@ impl TimeSource_ { } impl TimeSourceSubscriber { - fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result { + fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result> { // The values are set based on default values in rclcpp let qos = QosProfile::default().keep_last(1).best_effort(); - let subscriber = create_subscription_helper( - node_handle, - "/clock", - crate::rosgraph_msgs::msg::Clock::get_ts(), - qos, - )?; - Ok(Self { - subscriber_handle: subscriber, + let mut subscriber = Box::new(Self { + subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() }, time_source, - }) + }); + + // SAFETY: + // create_subscription_helper requires zero initialized subscription_handle -> done above + // Completes initialization of subscription. + unsafe { + create_subscription_helper( + &mut subscriber.subscriber_handle, + node_handle, + "/clock", + crate::rosgraph_msgs::msg::Clock::get_ts(), + qos, + )? + }; + + Ok(subscriber) } } From 23672cfdb80974195384cb8b97ffa19c651c8861 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 15:07:45 +0100 Subject: [PATCH 03/11] Make rcl_service_t have stable location This prevents another service initializing at the same location. Therefore preventing conflicts in a trace. --- r2r/src/nodes.rs | 31 ++++++++++++++++++++++--------- r2r/src/services.rs | 16 +++++++++------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 99787380d..b56f564c8 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -810,20 +810,33 @@ impl Node { where T: WrappedServiceTypeSupport, { - let service_handle = create_service_helper( - self.node_handle.as_mut(), - service_name, - T::get_ts(), - qos_profile, - )?; let (sender, receiver) = mpsc::channel::>(10); - let ws = TypedService:: { - rcl_handle: service_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut service_arc = Arc::new(Mutex::new(TypedService:: { + rcl_handle: unsafe { rcl_get_zero_initialized_service() }, sender, + })); + let service_ref = Arc::get_mut(&mut service_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The service was zero initialized above. + // Full initialization happens in `create_service_helper``. + unsafe { + create_service_helper( + &mut service_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + T::get_ts(), + qos_profile, + )?; }; - self.services.push(Arc::new(Mutex::new(ws))); + // Only push after full initialization. + self.services.push(service_arc); Ok(receiver) } diff --git a/r2r/src/services.rs b/r2r/src/services.rs index e4fd8c846..c9c820b3d 100644 --- a/r2r/src/services.rs +++ b/r2r/src/services.rs @@ -111,11 +111,13 @@ where } } -pub fn create_service_helper( - node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut service_handle = unsafe { rcl_get_zero_initialized_service() }; +/// Initializes the service. +/// +/// SAFETY: requires that the service handle is zero initialized by [`rcl_get_zero_initialized_service`]. +pub unsafe fn create_service_helper( + service_handle: &mut rcl_service_t, node: &mut rcl_node_t, service_name: &str, + service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let service_name_c_string = CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -123,7 +125,7 @@ pub fn create_service_helper( let mut service_options = rcl_service_get_default_options(); service_options.qos = qos_profile.into(); rcl_service_init( - &mut service_handle, + service_handle, node, service_ts, service_name_c_string.as_ptr(), @@ -131,7 +133,7 @@ pub fn create_service_helper( ) }; if result == RCL_RET_OK as i32 { - Ok(service_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } From e371130cf1c23dfabd4a512abe2ca454caba9a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 14:57:05 +0100 Subject: [PATCH 04/11] Make rcl_client_t have stable location This prevents another client initializing at the same location. Therefore preventing conflicts in a trace. --- r2r/src/clients.rs | 14 +++++------ r2r/src/nodes.rs | 59 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/r2r/src/clients.rs b/r2r/src/clients.rs index 67d2d68ed..2b5267483 100644 --- a/r2r/src/clients.rs +++ b/r2r/src/clients.rs @@ -318,11 +318,10 @@ impl Client_ for UntypedClient_ { } } -pub fn create_client_helper( - node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t, - qos_profile: QosProfile, -) -> Result { - let mut client_handle = unsafe { rcl_get_zero_initialized_client() }; +pub unsafe fn create_client_helper( + client_handle: &mut rcl_client_t, node: *mut rcl_node_t, service_name: &str, + service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile, +) -> Result<()> { let service_name_c_string = CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; @@ -330,15 +329,16 @@ pub fn create_client_helper( let mut client_options = rcl_client_get_default_options(); client_options.qos = qos_profile.into(); rcl_client_init( - &mut client_handle, + client_handle, node, service_ts, service_name_c_string.as_ptr(), &client_options, ) }; + if result == RCL_RET_OK as i32 { - Ok(client_handle) + Ok(()) } else { Err(Error::from_rcl_error(result)) } diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index b56f564c8..ef4d9a42d 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -849,19 +849,30 @@ impl Node { where T: WrappedServiceTypeSupport, { - let client_handle = create_client_helper( - self.node_handle.as_mut(), - service_name, - T::get_ts(), - qos_profile, - )?; - let ws = TypedClient:: { - rcl_handle: client_handle, + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut client_arc = Arc::new(Mutex::new(TypedClient:: { + rcl_handle: unsafe { rcl_get_zero_initialized_client() }, response_channels: Vec::new(), poll_available_channels: Vec::new(), + })); + let client_ref = Arc::get_mut(&mut client_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The client was zero initialized above. + // Full initialization happens in `create_client_helper`. + unsafe { + create_client_helper( + &mut client_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + T::get_ts(), + qos_profile, + )?; }; - let client_arc = Arc::new(Mutex::new(ws)); let c = make_client(Arc::downgrade(&client_arc)); self.clients.push(client_arc); Ok(c) @@ -877,20 +888,32 @@ impl Node { &mut self, service_name: &str, service_type: &str, qos_profile: QosProfile, ) -> Result { let service_type = UntypedServiceSupport::new_from(service_type)?; - let client_handle = create_client_helper( - self.node_handle.as_mut(), - service_name, - service_type.ts, - qos_profile, - )?; - let client = UntypedClient_ { + + // SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block. + let mut client_arc = Arc::new(Mutex::new(UntypedClient_ { service_type, - rcl_handle: client_handle, + rcl_handle: unsafe { rcl_get_zero_initialized_client() }, response_channels: Vec::new(), poll_available_channels: Vec::new(), + })); + let client_ref = Arc::get_mut(&mut client_arc) + .unwrap() // No other Arc should exist. The Arc was just created. + .get_mut() + .unwrap(); // The mutex was just created. It should not be poisoned. + + // SAFETY: + // The client was zero initialized above. + // Full initialization happens in `create_client_helper`. + unsafe { + create_client_helper( + &mut client_ref.rcl_handle, + self.node_handle.as_mut(), + service_name, + client_ref.service_type.ts, + qos_profile, + )?; }; - let client_arc = Arc::new(Mutex::new(client)); let c = make_untyped_client(Arc::downgrade(&client_arc)); self.clients.push(client_arc); Ok(c) From cf03fe1b489a2d164e32a20985a01f8d8d769584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Wed, 7 May 2025 12:47:11 +0200 Subject: [PATCH 05/11] Add r2r_tracing crate The crate contains tracepoints; either defined directly or imported from ros2_tracing project (tracetools package). The crate define a `tracing` feature which should only be used on Linux because it uses LTTng only available on Linux. --- Cargo.toml | 1 + r2r_tracing/Cargo.toml | 19 +++ r2r_tracing/README.md | 48 ++++++ r2r_tracing/build.rs | 76 ++++++++++ r2r_tracing/src/lib.rs | 16 ++ r2r_tracing/src/macros.rs | 26 ++++ r2r_tracing/src/r2r_tracepoints.rs | 32 ++++ r2r_tracing/src/r2r_tracepoints_bindings.rs | 1 + r2r_tracing/src/rclcpp_tracepoints.rs | 158 ++++++++++++++++++++ r2r_tracing/src/tracetools_bindings.rs | 6 + r2r_tracing/src/tracetools_wrapper.h | 1 + 11 files changed, 384 insertions(+) create mode 100644 r2r_tracing/Cargo.toml create mode 100644 r2r_tracing/README.md create mode 100644 r2r_tracing/build.rs create mode 100644 r2r_tracing/src/lib.rs create mode 100644 r2r_tracing/src/macros.rs create mode 100644 r2r_tracing/src/r2r_tracepoints.rs create mode 100644 r2r_tracing/src/r2r_tracepoints_bindings.rs create mode 100644 r2r_tracing/src/rclcpp_tracepoints.rs create mode 100644 r2r_tracing/src/tracetools_bindings.rs create mode 100644 r2r_tracing/src/tracetools_wrapper.h diff --git a/Cargo.toml b/Cargo.toml index e91f1802d..24e452c42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ members = [ "r2r_msg_gen", "r2r_macros", "r2r_rcl", + "r2r_tracing", ] diff --git a/r2r_tracing/Cargo.toml b/r2r_tracing/Cargo.toml new file mode 100644 index 000000000..41313244b --- /dev/null +++ b/r2r_tracing/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "r2r_tracing" +description = "Internal dependency containing tracepoint definitions or imports for r2r." +authors = ["Martin Škoudlil "] +version = "0.9.5" +license = "MIT" +edition = "2021" +repository = "https://github.com/sequenceplanner/r2r" +readme = "README.md" + +[dependencies] +r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" } + +[build-dependencies] +lttng-ust-generate = { git = "https://github.com/skoudmar/lttng-ust-rs.git", rev = "bd437b5eeb8c4378d78ebb5aa17f0f4a95828b32", optional = true} +r2r_common = { path = "../r2r_common", version = "0.9.5", optional = true} + +[features] +tracing = ["dep:lttng-ust-generate", "dep:r2r_common"] diff --git a/r2r_tracing/README.md b/r2r_tracing/README.md new file mode 100644 index 000000000..347332fe5 --- /dev/null +++ b/r2r_tracing/README.md @@ -0,0 +1,48 @@ +# r2r_tracing + +Internal dependency containing tracepoint definitions or imports from `tracetools` ROS package for r2r. + +Uses LTTng tracing framework. + +## Feature flag `tracing` + +The crate will generate and link tracepoint libraries only when the feature flag is enabled. + +Without specifying the feature flag `tracing` all exported functions are No-ops. + +## Depends on + +- `tracetools` ROS package + - This package is a part of ROS distribution. + - `r2r_tracing` dynamically loads `tracetools` library to obtain tracepoints used by `rclcpp`. +- `lttng-ust` crate + - To define additional tracepoints. + +## Recording traces of R2R applications + +Make sure to enable feature flag `tracing`. + +Then start tracing session: + +- Either by installing [`ros2trace`](https://index.ros.org/p/ros2trace/) and running: + + ```sh + ros2 trace -u 'ros2:*' 'r2r:*' + ``` + + The traces will be available in `$HOME/.ros/tracing/session-`. + +- Alternatively, you can trace your application directly with LTTng: + + ```sh + # Session name is an optional user-chosen name for the trace + lttng create [session-name] + lttng enable-event -u 'ros2:*,r2r:*' + lttng add-context -u --type=vtid --type=vpid --type=procname + lttng start + # Start the ROS system here. + # Let it run for as long as you want to trace it. + lttng destroy + ``` + + The traces will be available in `$HOME/lttng-traces/-`. diff --git a/r2r_tracing/build.rs b/r2r_tracing/build.rs new file mode 100644 index 000000000..2adef2ae4 --- /dev/null +++ b/r2r_tracing/build.rs @@ -0,0 +1,76 @@ +#[cfg(all(feature = "tracing", not(target_os = "linux")))] +compile_error!("Feature 'tracing' is only available on Linux."); + +#[cfg(not(feature = "tracing"))] +fn main() {} + +#[cfg(feature = "tracing")] +fn main() { + use tracing::{generate_r2r_tracepoints, generate_rclcpp_tracepoint_bindings}; + + generate_rclcpp_tracepoint_bindings(); + generate_r2r_tracepoints(); +} + +#[cfg(feature = "tracing")] +mod tracing { + use lttng_ust_generate::{CIntegerType, CTFType, Generator, Provider}; + use std::{env, path::PathBuf}; + + macro_rules! create_tracepoint { + ($provider:ident::$name:ident($($arg_name:ident: $arg_lttng_type:expr),* $(,)?)) => { + $provider.create_class(concat!(stringify!($name), "_class")) + $( + .add_field(stringify!($arg_name), $arg_lttng_type) + )* + .instantiate(stringify!($name)) + }; + } + + /// Generate bindings to the rclcpp tracepoints defined in the tracetools ros2 package. + pub(crate) fn generate_rclcpp_tracepoint_bindings() { + let bindings = r2r_common::setup_bindgen_builder() + .header("src/tracetools_wrapper.h") + .allowlist_function("ros_trace_rclcpp_.*") + .allowlist_function("ros_trace_callback_.*") + .generate_comments(true) + .generate() + .expect("Unable to generate bindings for tracetools"); + + println!("cargo:rustc-link-lib=tracetools"); + + // Write the bindings to the $OUT_DIR/tracetools_bindings.rs file. + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("tracetools_bindings.rs")) + .expect("Couldn't write tracetools bindings!"); + } + + pub(crate) fn generate_r2r_tracepoints() { + let mut r2r = Provider::new("r2r"); + + create_tracepoint!(r2r::spin_start( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + timeout_s: CTFType::Integer(CIntegerType::U64), + timeout_ns: CTFType::Integer(CIntegerType::U32), + )); + create_tracepoint!(r2r::spin_end( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + create_tracepoint!(r2r::spin_wake( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + create_tracepoint!(r2r::spin_timeout( + node_handle: CTFType::IntegerHex(CIntegerType::USize), + )); + + Generator::default() + .generated_lib_name("r2r_tracepoints_r2r") + .register_provider(r2r) + .output_file_name( + PathBuf::from(env::var("OUT_DIR").unwrap()).join("r2r_tracepoints.rs"), + ) + .generate() + .expect("Unable to generate tracepoint bindings for r2r"); + } +} diff --git a/r2r_tracing/src/lib.rs b/r2r_tracing/src/lib.rs new file mode 100644 index 000000000..ced4e925f --- /dev/null +++ b/r2r_tracing/src/lib.rs @@ -0,0 +1,16 @@ +//! Tracepoint provider for the `r2r` crate. + +#[cfg(feature = "tracing")] +mod r2r_tracepoints_bindings; + +#[cfg(feature = "tracing")] +mod tracetools_bindings; + +mod rclcpp_tracepoints; +pub use rclcpp_tracepoints::*; + +mod r2r_tracepoints; +pub use r2r_tracepoints::*; + +mod macros; +use macros::tracepoint_fn; diff --git a/r2r_tracing/src/macros.rs b/r2r_tracing/src/macros.rs new file mode 100644 index 000000000..8729744f3 --- /dev/null +++ b/r2r_tracing/src/macros.rs @@ -0,0 +1,26 @@ +/// Generates two versions of each function definition wrapped via this macro: +/// - One with tracing enabled: Adds a feature flag and copies the function signature and body as is. +/// - One with tracing disabled: Adds a feature flag and replaces the function body to be empty. +macro_rules! tracepoint_fn { + ($($(#[$atts:meta])* $vi:vis fn $name:ident$(<$generic:tt>)?($($arg_name:ident : $arg_ty:ty),* $(,)?) $(-> $ret_ty:ty)? $body:block)*) => { + $( + $(#[$atts])* + #[inline] + #[cfg(feature = "tracing")] + $vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? $body + + $(#[$atts])* + #[doc = ""] // Empty doc line to start a new paragraph. + #[doc = "Tracing is currently disabled so this function is a no-op. Enable `tracing` feature to enable tracing."] + #[inline(always)] + #[allow(unused_variables)] + #[cfg(not(feature = "tracing"))] + $vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? { + // Do nothing + } + )* + }; +} + +// set macro visibility to public in crate only +pub(crate) use tracepoint_fn; diff --git a/r2r_tracing/src/r2r_tracepoints.rs b/r2r_tracing/src/r2r_tracepoints.rs new file mode 100644 index 000000000..027468d41 --- /dev/null +++ b/r2r_tracing/src/r2r_tracepoints.rs @@ -0,0 +1,32 @@ +#[cfg(feature = "tracing")] +use crate::r2r_tracepoints_bindings::r2r as tp; +use crate::tracepoint_fn; +use r2r_rcl::rcl_node_t; +use std::time::Duration; + +tracepoint_fn! { +/// The `node` started spinning with given `timeout`. +pub fn trace_spin_start(node: *const rcl_node_t, timeout: Duration) { + let timeout_s = timeout.as_secs(); + let timeout_ns = timeout.subsec_nanos(); + + tp::spin_start(node as usize, timeout_s, timeout_ns); +} + +/// The `node` ended spinning function. +/// +/// If the spinning function ended by a timeout, use `trace_spin_timeout` instead. +pub fn trace_spin_end(node: *const rcl_node_t) { + tp::spin_end(node as usize); +} + +/// The `node` woke up from waiting on a wait set without reaching timeout. +pub fn trace_spin_wake(node: *const rcl_node_t) { + tp::spin_wake(node as usize); +} + +/// The `node` timeouted while spinning +pub fn trace_spin_timeout(node: *const rcl_node_t) { + tp::spin_timeout(node as usize); +} +} diff --git a/r2r_tracing/src/r2r_tracepoints_bindings.rs b/r2r_tracing/src/r2r_tracepoints_bindings.rs new file mode 100644 index 000000000..7f23d6c95 --- /dev/null +++ b/r2r_tracing/src/r2r_tracepoints_bindings.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/r2r_tracepoints.rs")); diff --git a/r2r_tracing/src/rclcpp_tracepoints.rs b/r2r_tracing/src/rclcpp_tracepoints.rs new file mode 100644 index 000000000..55b4e5c1f --- /dev/null +++ b/r2r_tracing/src/rclcpp_tracepoints.rs @@ -0,0 +1,158 @@ +use crate::tracepoint_fn; +use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t}; + +#[cfg(feature = "tracing")] +use crate::tracetools_bindings as tp; +#[cfg(feature = "tracing")] +use std::{ffi::CString, ptr::null}; + +#[cfg(feature = "tracing")] +const fn ref_to_c_void(t: &T) -> *const std::ffi::c_void { + std::ptr::from_ref(t).cast() +} + +#[cfg(feature = "tracing")] +macro_rules! c_void { + ($e:ident) => { + ($e) as *const std::ffi::c_void + }; +} + +// Documentation of tracepoints is based on https://github.com/ros2/ros2_tracing project. +// From file `tracetools/include/tracetools/tracetools.h` + +tracepoint_fn! { +/// Message publication. +/// +/// Records the pointer to the `message` being published at the `rclcpp`/`r2r` level. +/// +/// Tracepoint: `ros2::rclcpp_publish +// Lint allow note: The message pointer is NOT dereferenced. +#[allow(clippy::not_unsafe_ptr_arg_deref)] +pub fn trace_publish(message: *const std::ffi::c_void) { + unsafe { + // first argument documentation: + // publisher_handle not used, but kept for API/ABI stability + tp::ros_trace_rclcpp_publish(null(), message); + } +} + +/// Subscription object initialization. +/// +/// Tracepoint to allow associating the `subscription_handle` from `rcl` with the address of rust `subscription` reference. +/// There can be more than 1 `subscription` for 1 `subscription_handle`. +/// +/// Tracepoint: `ros2::rclcpp_subscription_init` +pub fn trace_subscription_init( + subscription_handle: *const rcl_subscription_t, subscription: &S, +) { + unsafe { + tp::ros_trace_rclcpp_subscription_init( + subscription_handle.cast(), + ref_to_c_void(subscription), + ); + } +} + +/// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object. +/// +/// Tracepoint: `ros2::rclcpp_subscription_callback_added` +pub fn trace_subscription_callback_added(subscription: &S, callback_id: usize) { + unsafe { + tp::ros_trace_rclcpp_subscription_callback_added( + ref_to_c_void(subscription), + c_void!(callback_id), + ); + } +} + +/// Message taking. +/// +/// Records the **reference** to the `message` being taken at the `rclcpp`/`r2r` level. +/// +/// To trace messages pointed to by void pointer use [`trace_take_ptr`]. +/// +/// Tracepoint: `ros2::rclcpp_take` +pub fn trace_take(message: &M) { + unsafe { + tp::ros_trace_rclcpp_take(ref_to_c_void(message)); + } +} + +/// Message taking. +/// +/// Records the **void pointer** to the `message` being taken at the `rclcpp`/`r2r` level. +/// +/// To trace messages by their reference use [`trace_take`]. +/// +/// Tracepoint: `ros2::rclcpp_take` +// Lint allow note: The message pointer is NOT dereferenced. +#[allow(clippy::not_unsafe_ptr_arg_deref)] +pub fn trace_take_ptr(message: *const std::ffi::c_void) { + unsafe { + tp::ros_trace_rclcpp_take(message); + } +} + +/// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`. +/// +/// Tracepoint: `ros2::rclcpp_service_callback_added` +pub fn trace_service_callback_added(service: *const rcl_service_t, callback_id: usize) { + unsafe { tp::ros_trace_rclcpp_service_callback_added(service.cast(), c_void!(callback_id)) } +} + +/// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle. +/// +/// Tracepoint: `ros2::rclcpp_timer_callback_added` +pub fn trace_timer_callback_added(timer: *const rcl_timer_t, callback_id: usize) { + unsafe { + tp::ros_trace_rclcpp_timer_callback_added(timer.cast(), c_void!(callback_id)); + } +} + +/// Tracepoint to allow associating the `timer` with a `node`. +/// +/// Tracepoint: `ros2::rclcpp_timer_link_node` +pub fn trace_timer_link_node(timer: *const rcl_timer_t, node: *const rcl_node_t) { + unsafe { + tp::ros_trace_rclcpp_timer_link_node(timer.cast(), node.cast()); + } +} + +/// Tracepoint to allow associating demangled `function_symbol` with a `callback_id`. +/// +/// Allocates memory to store the function symbol as a `CString`. +/// +/// Tracepoint: `ros2::callback_register` +/// +/// # Panics +/// If `function_symbol` contains a null byte. +pub fn trace_callback_register(callback_id: usize, function_symbol: &str) { + let function_symbol = CString::new(function_symbol) + .expect("r2r tracing: Cannot convert function_symbol to CString. It contains null byte."); + + unsafe { + tp::ros_trace_rclcpp_callback_register(c_void!(callback_id), function_symbol.as_ptr()); + } +} + +/// Start of a callback +/// +/// Set `is_intra_process` depending on whether this callback is done via intra-process or not +/// +/// Tracepoint: `ros2::callback_start` +pub fn trace_callback_start(callback_id: usize, is_intra_process: bool) { + unsafe { + tp::ros_trace_callback_start(c_void!(callback_id), is_intra_process); + } +} + +/// End of a callback. +/// +/// Tracepoint: `ros2::callback_end` +pub fn trace_callback_end(callback_id: usize) { + unsafe { + tp::ros_trace_callback_end(c_void!(callback_id)); + } +} +} diff --git a/r2r_tracing/src/tracetools_bindings.rs b/r2r_tracing/src/tracetools_bindings.rs new file mode 100644 index 000000000..dc33f2641 --- /dev/null +++ b/r2r_tracing/src/tracetools_bindings.rs @@ -0,0 +1,6 @@ +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(unused)] + +include!(concat!(env!("OUT_DIR"), "/tracetools_bindings.rs")); diff --git a/r2r_tracing/src/tracetools_wrapper.h b/r2r_tracing/src/tracetools_wrapper.h new file mode 100644 index 000000000..f50256eae --- /dev/null +++ b/r2r_tracing/src/tracetools_wrapper.h @@ -0,0 +1 @@ +#include From 2b4f9753d4255a0db1e82b89aeaed9c42688e9ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Wed, 7 May 2025 11:00:50 +0200 Subject: [PATCH 06/11] Create TracingId type The TracingId is used as a wrapper for the pointer to allow implementing Send and Sync traits. The pointer is used only as an ID and is never dereferenced, therefore, it is safe. --- r2r_tracing/src/lib.rs | 3 ++ r2r_tracing/src/rclcpp_tracepoints.rs | 10 ++-- r2r_tracing/src/tracing_id.rs | 77 +++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 r2r_tracing/src/tracing_id.rs diff --git a/r2r_tracing/src/lib.rs b/r2r_tracing/src/lib.rs index ced4e925f..b9ae86a56 100644 --- a/r2r_tracing/src/lib.rs +++ b/r2r_tracing/src/lib.rs @@ -12,5 +12,8 @@ pub use rclcpp_tracepoints::*; mod r2r_tracepoints; pub use r2r_tracepoints::*; +mod tracing_id; +pub use tracing_id::TracingId; + mod macros; use macros::tracepoint_fn; diff --git a/r2r_tracing/src/rclcpp_tracepoints.rs b/r2r_tracing/src/rclcpp_tracepoints.rs index 55b4e5c1f..25b855434 100644 --- a/r2r_tracing/src/rclcpp_tracepoints.rs +++ b/r2r_tracing/src/rclcpp_tracepoints.rs @@ -1,4 +1,4 @@ -use crate::tracepoint_fn; +use crate::{tracepoint_fn, TracingId}; use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t}; #[cfg(feature = "tracing")] @@ -104,18 +104,18 @@ pub fn trace_service_callback_added(service: *const rcl_service_t, callback_id: /// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle. /// /// Tracepoint: `ros2::rclcpp_timer_callback_added` -pub fn trace_timer_callback_added(timer: *const rcl_timer_t, callback_id: usize) { +pub fn trace_timer_callback_added(timer: TracingId, callback_id: usize) { unsafe { - tp::ros_trace_rclcpp_timer_callback_added(timer.cast(), c_void!(callback_id)); + tp::ros_trace_rclcpp_timer_callback_added(timer.c_void(), c_void!(callback_id)); } } /// Tracepoint to allow associating the `timer` with a `node`. /// /// Tracepoint: `ros2::rclcpp_timer_link_node` -pub fn trace_timer_link_node(timer: *const rcl_timer_t, node: *const rcl_node_t) { +pub fn trace_timer_link_node(timer: TracingId, node: TracingId) { unsafe { - tp::ros_trace_rclcpp_timer_link_node(timer.cast(), node.cast()); + tp::ros_trace_rclcpp_timer_link_node(timer.c_void(), node.c_void()); } } diff --git a/r2r_tracing/src/tracing_id.rs b/r2r_tracing/src/tracing_id.rs new file mode 100644 index 000000000..950f4dbf9 --- /dev/null +++ b/r2r_tracing/src/tracing_id.rs @@ -0,0 +1,77 @@ +/// Unique identifier for tracing purposes +#[derive(Debug)] +pub struct TracingId { + /// Pointer to the object used as a unique ID. + /// Safety: Do NOT dereference the pointer. + #[cfg(feature = "tracing")] + id: *const T, + + /// Marker for the type. Needed when `tracing` feature is disabled. + #[cfg(not(feature = "tracing"))] + _marker: std::marker::PhantomData, +} + +impl TracingId { + /// Creates new `TracingId` from the pointer. + /// + /// # Safety + /// The pointer is used as a unique ID so users must ensure that they never create `TracingId` + /// with same address for different objects. + /// + /// The pointer does not need to point to valid memory. + pub const unsafe fn new(_id: *const T) -> Self { + Self { + #[cfg(feature = "tracing")] + id: _id, + #[cfg(not(feature = "tracing"))] + _marker: std::marker::PhantomData, + } + } + + /// Erase the generic type of the ID. + #[must_use] + pub fn forget_type(self) -> TracingId { + #[cfg(not(feature = "tracing"))] + unsafe { + // Safety: The ID cannot be obtained back without the `tracing` feature. + TracingId::new(std::ptr::null()) + } + #[cfg(feature = "tracing")] + unsafe { + // Safety: self contains valid ID. + TracingId::new(self.c_void()) + } + } + + /// Obtain the address representing the ID. + /// + /// # Safety + /// Do NOT dereference the pointer. + #[cfg(feature = "tracing")] + pub(crate) const unsafe fn c_void(self) -> *const std::ffi::c_void { + self.id.cast::() + } +} + +/// Deriving Clone for `TracingId` would only derive it only conditionally based on whether the +/// `T` is `Clone` or not. But TracingId is independent of T. +impl Clone for TracingId { + fn clone(&self) -> Self { + *self + } +} + +/// Deriving Copy for `TracingId` would only derive it only conditionally based on whether the +/// `T` is `Copy` or not. But TracingId is independent of T. +impl Copy for TracingId {} + +/// # Safety +/// +/// The address is never dereferenced and is used only as a unique ID so it is safe to send to another thread. +unsafe impl Send for TracingId {} + +/// # Safety +/// +/// The `TracingId` does not allow interior mutability because the pointer is never dereferenced. +/// It is safe to use across multiple threads. +unsafe impl Sync for TracingId {} From 583c45c9eabda1f69d95fd0d5fdb4aae3845dde1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Wed, 9 Oct 2024 13:04:14 +0200 Subject: [PATCH 07/11] Add Callback type to allow tracing callback calls --- r2r_tracing/src/callback.rs | 80 +++++++++++++++++++++++++++++++++++++ r2r_tracing/src/lib.rs | 3 ++ 2 files changed, 83 insertions(+) create mode 100644 r2r_tracing/src/callback.rs diff --git a/r2r_tracing/src/callback.rs b/r2r_tracing/src/callback.rs new file mode 100644 index 000000000..589448564 --- /dev/null +++ b/r2r_tracing/src/callback.rs @@ -0,0 +1,80 @@ +use crate::{ + trace_callback_register, trace_service_callback_added, trace_subscription_callback_added, + trace_timer_callback_added, TracingId, +}; +use r2r_rcl::{rcl_service_t, rcl_timer_t}; +use std::{ + any::type_name, + marker::PhantomData, + sync::atomic::{AtomicUsize, Ordering::Relaxed}, +}; + +/// Tracing wrapper for callback +pub struct Callback +where + F: FnMut(M), +{ + func: F, + id: usize, + msg_type: PhantomData, +} + +impl Callback +where + F: FnMut(M), +{ + /// Generates unique ID for the callback + fn gen_id() -> usize { + static COUNTER: AtomicUsize = AtomicUsize::new(1); + COUNTER.fetch_add(1, Relaxed) + } + + fn new(callback: F, id: usize) -> Self { + trace_callback_register(id, type_name::()); + + Self { + func: callback, + id, + msg_type: PhantomData, + } + } + + /// Emits trace event associating this `callback` with the `service`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_service(service: *const rcl_service_t, callback: F) -> Self { + let id = Self::gen_id(); + trace_service_callback_added(service, id); + + Self::new(callback, id) + } + + /// Emits trace event associating this `callback` with the `timer`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_timer(timer: TracingId, callback: F) -> Self { + let id = Self::gen_id(); + trace_timer_callback_added(timer, id); + + Self::new(callback, id) + } + + /// Emits trace event associating this `callback` with the `subscription`. + /// + /// Wraps the callback to allow tracing the callback calls. + pub fn new_subscription(subscriber: &S, callback: F) -> Self { + let id = Self::gen_id(); + trace_subscription_callback_added(subscriber, id); + + Self::new(callback, id) + } + + /// Call the `callback`. + /// This emits `ros2:callback_start` and `ros2:callback_end` events at + /// the beginning and end respectively. + pub fn call(&mut self, msg: M) { + crate::trace_callback_start(self.id, false); + (self.func)(msg); + crate::trace_callback_end(self.id); + } +} diff --git a/r2r_tracing/src/lib.rs b/r2r_tracing/src/lib.rs index b9ae86a56..085dd7533 100644 --- a/r2r_tracing/src/lib.rs +++ b/r2r_tracing/src/lib.rs @@ -12,6 +12,9 @@ pub use rclcpp_tracepoints::*; mod r2r_tracepoints; pub use r2r_tracepoints::*; +mod callback; +pub use callback::Callback; + mod tracing_id; pub use tracing_id::TracingId; From 148e1de8ac72313840cc8e168846de85655d7864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Wed, 5 Mar 2025 11:53:07 +0100 Subject: [PATCH 08/11] Add tracepoints to r2r Location and arguments of tracepoints is consistent with rclcpp --- r2r/Cargo.toml | 2 ++ r2r/src/nodes.rs | 42 ++++++++++++++++++++++++++++++++++++++---- r2r/src/publishers.rs | 9 +++++++++ r2r/src/subscribers.rs | 9 +++++++++ r2r/src/time_source.rs | 19 +++++++++++++++++-- 5 files changed, 75 insertions(+), 6 deletions(-) diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index d7fd214a2..d6f8f84ea 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -23,6 +23,7 @@ r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" } r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" } r2r_actions = { path = "../r2r_actions", version = "0.9.5" } r2r_macros = { path = "../r2r_macros", version = "0.9.5" } +r2r_tracing = { path = "../r2r_tracing", version = "0.9.5" } uuid = { version = "1.2.2", features = ["serde", "v4"] } futures = "0.3.25" log = "0.4.18" @@ -50,6 +51,7 @@ prettyplease = "0.2.6" [features] save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen", "r2r_actions/save-bindgen"] doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", "r2r_actions/doc-only"] +tracing = ["r2r_tracing/tracing"] [package.metadata.docs.rs] features = ["doc-only"] diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index ef4d9a42d..c5c62f9e5 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -4,6 +4,7 @@ use futures::{ stream::{Stream, StreamExt}, }; use indexmap::IndexMap; +use r2r_tracing::TracingId; use std::{ collections::HashMap, ffi::{CStr, CString}, @@ -673,6 +674,9 @@ impl Node { qos_profile, )?; }; + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + self.subscribers.push(subscription); Ok(receiver) } @@ -706,6 +710,9 @@ impl Node { qos_profile, )?; }; + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + self.subscribers.push(subscription); Ok(receiver) } @@ -739,6 +746,9 @@ impl Node { qos_profile, )?; }; + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + self.subscribers.push(subscription); Ok(receiver) } @@ -796,6 +806,9 @@ impl Node { qos_profile, )?; }; + + r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + self.subscribers.push(subscription); Ok(receiver) } @@ -1101,6 +1114,8 @@ impl Node { /// `timeout` is a duration specifying how long the spin should /// block for if there are no pending events. pub fn spin_once(&mut self, timeout: Duration) { + r2r_tracing::trace_spin_start(&*self.node_handle, timeout); + // first handle any completed action cancellation responses for a in &mut self.action_servers { a.lock().unwrap().send_completed_cancel_requests(); @@ -1264,9 +1279,12 @@ impl Node { unsafe { rcl_wait_set_fini(&mut ws); } + r2r_tracing::trace_spin_timeout(&*self.node_handle); return; } + r2r_tracing::trace_spin_wake(&*self.node_handle); + let mut subs_to_remove = vec![]; if ws.subscriptions != std::ptr::null_mut() { let ws_subs = @@ -1423,6 +1441,8 @@ impl Node { unsafe { rcl_wait_set_fini(&mut ws); } + + r2r_tracing::trace_spin_end(&*self.node_handle); } /// Returns a map of topic names and type names of the publishers @@ -1526,9 +1546,15 @@ impl Node { _clock: Some(clock), // The timer owns the clock. sender: tx, }; - self.timers.push(timer); - let out_timer = Timer { receiver: rx }; + let out_timer = unsafe { + Timer { + receiver: rx, + timer_handle: TracingId::new(timer.get_handle()), + node_handle: TracingId::new(&*self.node_handle), + } + }; + self.timers.push(timer); Ok(out_timer) } @@ -1549,9 +1575,15 @@ impl Node { _clock: None, // The timer does not own the clock (the node owns it). sender: tx, }; - self.timers.push(timer); - let out_timer = Timer { receiver: rx }; + let out_timer = unsafe { + Timer { + receiver: rx, + timer_handle: TracingId::new(timer.get_handle()), + node_handle: TracingId::new(&*self.node_handle), + } + }; + self.timers.push(timer); Ok(out_timer) } @@ -1683,6 +1715,8 @@ impl Drop for Timer_ { /// A ROS timer. pub struct Timer { receiver: mpsc::Receiver, + timer_handle: TracingId, + node_handle: TracingId, } impl Timer { diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 658ecd1a2..332d80db3 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -182,6 +182,8 @@ impl PublisherUntyped { let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?; native_msg.from_json(msg)?; + r2r_tracing::trace_publish(native_msg.void_ptr()); + let result = unsafe { rcl_publish( &publisher.handle as *const rcl_publisher_t, @@ -220,6 +222,8 @@ impl PublisherUntyped { allocator: unsafe { rcutils_get_default_allocator() }, }; + r2r_tracing::trace_publish((&msg_buf as *const rcl_serialized_message_t).cast::()); + let result = unsafe { rcl_publish_serialized_message( &publisher.handle, @@ -277,6 +281,9 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + + r2r_tracing::trace_publish(native_msg.void_ptr()); + let result = unsafe { rcl_publish( &publisher.handle as *const rcl_publisher_t, @@ -363,6 +370,8 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + r2r_tracing::trace_publish(msg.void_ptr()); + let result = if msg.is_loaned { unsafe { // signal that we are relinquishing responsibility of the memory diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index e2a0419fe..10cb133aa 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -55,6 +55,8 @@ where rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) }; if ret == RCL_RET_OK as i32 { + r2r_tracing::trace_take_ptr(msg.void_ptr()); + let msg = T::from_native(&msg); if let Err(e) = self.sender.try_send(msg) { if e.is_disconnected() { @@ -138,6 +140,9 @@ where new_msg } }; + + r2r_tracing::trace_take_ptr(msg.void_ptr()); + if let Err(e) = self.sender.try_send(msg) { if e.is_disconnected() { // user dropped the handle to the stream, signal removal. @@ -168,6 +173,8 @@ impl Subscriber_ for UntypedSubscriber { rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) }; if ret == RCL_RET_OK as i32 { + r2r_tracing::trace_take_ptr(msg.void_ptr()); + let json = msg.to_json(); if let Err(e) = self.sender.try_send(json) { if e.is_disconnected() { @@ -215,6 +222,8 @@ impl Subscriber_ for RawSubscriber { } }; + r2r_tracing::trace_take(&self.msg_buf); + if let Err(e) = self.sender.try_send(data_bytes) { if e.is_disconnected() { // user dropped the handle to the stream, signal removal. diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index bbbbb13e1..fd6d3a587 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -12,11 +12,14 @@ use r2r_rcl::{ rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK, }; -use std::sync::{Arc, Mutex, Weak}; +use std::{ + any::type_name, + sync::{Arc, Mutex, Weak}, +}; /// Provides time from `/clock` topic to attached ROS clocks /// -/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled. +/// By default, only clock used by ROS timers is attached and time from `/clock` topic is disabled. /// /// The time from `/clock` topic can be activated by either of these: /// - calling [`TimeSource::enable_sim_time`] @@ -213,6 +216,13 @@ impl TimeSourceSubscriber { )? }; + // Use TimeSource inner arc address as callback_id. We hope that this id is much larger value than + // any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1. + let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize; + r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber); + r2r_tracing::trace_subscription_callback_added(&*subscriber, callback_id); + r2r_tracing::trace_callback_register(callback_id, type_name::()); + Ok(subscriber) } } @@ -235,11 +245,16 @@ impl Subscriber_ for TimeSourceSubscriber { ) }; + r2r_tracing::trace_take_ptr(clock_msg.void_ptr()); + let mut inner_time_source = self.time_source.inner.lock().unwrap(); if ret == RCL_RET_OK as i32 { let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg); + let time_source_ptr = Arc::as_ptr(&self.time_source.inner) as usize; + r2r_tracing::trace_callback_start(time_source_ptr, false); inner_time_source.set_clock_time(msg.clock); + r2r_tracing::trace_callback_end(time_source_ptr); } match inner_time_source.subscriber_state { From f77574c3a9f0d41c2aa728e884885bd764a80b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Wed, 7 May 2025 11:43:36 +0200 Subject: [PATCH 09/11] Create to Stream wrapper with traced_callback method The StreamWithTracingData wrapper implements both Stream and FusedStream similarly to Receiver --- r2r/src/lib.rs | 1 + r2r/src/nodes.rs | 49 ++++++++-- r2r/src/time_source.rs | 6 +- r2r_tracing/Cargo.toml | 1 + r2r_tracing/src/callback.rs | 5 +- r2r_tracing/src/lib.rs | 3 + r2r_tracing/src/rclcpp_tracepoints.rs | 13 ++- r2r_tracing/src/stream.rs | 136 ++++++++++++++++++++++++++ 8 files changed, 200 insertions(+), 14 deletions(-) create mode 100644 r2r_tracing/src/stream.rs diff --git a/r2r/src/lib.rs b/r2r/src/lib.rs index 904485a7b..4b52aee1c 100644 --- a/r2r/src/lib.rs +++ b/r2r/src/lib.rs @@ -124,6 +124,7 @@ pub use clocks::{Clock, ClockType}; mod nodes; pub use nodes::{Node, Timer}; +pub use r2r_tracing::StreamWithTracingData; pub mod qos; diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index c5c62f9e5..a7a74a7cb 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -4,7 +4,7 @@ use futures::{ stream::{Stream, StreamExt}, }; use indexmap::IndexMap; -use r2r_tracing::TracingId; +use r2r_tracing::{StreamWithTracingData, StreamWithTracingDataBuilder, TracingId}; use std::{ collections::HashMap, ffi::{CStr, CString}, @@ -650,7 +650,7 @@ impl Node { /// This function returns a `Stream` of ros messages. pub fn subscribe( &mut self, topic: &str, qos_profile: QosProfile, - ) -> Result + Unpin> + ) -> Result> where T: WrappedTypesupport, { @@ -677,6 +677,10 @@ impl Node { r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&*subscription).forget_type() + }); + self.subscribers.push(subscription); Ok(receiver) } @@ -686,7 +690,7 @@ impl Node { /// This function returns a `Stream` of ros messages without the rust convenience types. pub fn subscribe_native( &mut self, topic: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> + ) -> Result>> where T: WrappedTypesupport, { @@ -713,6 +717,10 @@ impl Node { r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + self.subscribers.push(subscription); Ok(receiver) } @@ -723,7 +731,7 @@ impl Node { /// Useful when you cannot know the type of the message at compile time. pub fn subscribe_untyped( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> { + ) -> Result>> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let (sender, receiver) = mpsc::channel::>(10); @@ -749,6 +757,10 @@ impl Node { r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + self.subscribers.push(subscription); Ok(receiver) } @@ -759,7 +771,7 @@ impl Node { /// Useful if you just want to pass the data along to another part of the system. pub fn subscribe_raw( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> { + ) -> Result>> { // TODO is it possible to handle the raw message without type support? // // Passing null ts to rcl_subscription_init throws an error .. @@ -809,6 +821,10 @@ impl Node { r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription); + let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe { + TracingId::new(&subscription.rcl_handle).forget_type() + }); + self.subscribers.push(subscription); Ok(receiver) } @@ -819,7 +835,7 @@ impl Node { /// `respond` on the Service Request to send the reply. pub fn create_service( &mut self, service_name: &str, qos_profile: QosProfile, - ) -> Result> + Unpin> + ) -> Result>> where T: WrappedServiceTypeSupport, { @@ -848,6 +864,10 @@ impl Node { )?; }; + let receiver = StreamWithTracingDataBuilder::build_service(receiver, unsafe { + TracingId::new(&service_ref.rcl_handle) + }); + // Only push after full initialization. self.services.push(service_arc); Ok(receiver) @@ -1731,6 +1751,23 @@ impl Timer { Err(Error::RCL_RET_TIMER_INVALID) } } + + /// Transforms this timer stream to a [`Future`] calling the given `callback` on each tick. + /// + /// The callback execution is traced by r2r_tracing. + /// + /// This function should be called before dropping the timer's node. + /// Otherwise, the trace data might be inconsistent. + #[must_use = "Futures do nothing unless you `.await` or poll them"] + pub fn on_tick(self, callback: F) -> impl Future + Unpin { + let mut callback = r2r_tracing::Callback::new_timer(self.timer_handle, callback); + r2r_tracing::trace_timer_link_node(self.timer_handle, self.node_handle); + + self.receiver.for_each(move |duration| { + callback.call(duration); + future::ready(()) + }) + } } // Since publishers are temporarily upgraded to owners during the diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index fd6d3a587..4b2623c5a 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -12,6 +12,7 @@ use r2r_rcl::{ rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK, }; +use r2r_tracing::TracingId; use std::{ any::type_name, sync::{Arc, Mutex, Weak}, @@ -220,7 +221,10 @@ impl TimeSourceSubscriber { // any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1. let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize; r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber); - r2r_tracing::trace_subscription_callback_added(&*subscriber, callback_id); + r2r_tracing::trace_subscription_callback_added( + unsafe { TracingId::new(&*subscriber).forget_type() }, + callback_id, + ); r2r_tracing::trace_callback_register(callback_id, type_name::()); Ok(subscriber) diff --git a/r2r_tracing/Cargo.toml b/r2r_tracing/Cargo.toml index 41313244b..29d148c22 100644 --- a/r2r_tracing/Cargo.toml +++ b/r2r_tracing/Cargo.toml @@ -9,6 +9,7 @@ repository = "https://github.com/sequenceplanner/r2r" readme = "README.md" [dependencies] +futures = "0.3.25" r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" } [build-dependencies] diff --git a/r2r_tracing/src/callback.rs b/r2r_tracing/src/callback.rs index 589448564..40082ac10 100644 --- a/r2r_tracing/src/callback.rs +++ b/r2r_tracing/src/callback.rs @@ -5,6 +5,7 @@ use crate::{ use r2r_rcl::{rcl_service_t, rcl_timer_t}; use std::{ any::type_name, + ffi::c_void, marker::PhantomData, sync::atomic::{AtomicUsize, Ordering::Relaxed}, }; @@ -42,7 +43,7 @@ where /// Emits trace event associating this `callback` with the `service`. /// /// Wraps the callback to allow tracing the callback calls. - pub fn new_service(service: *const rcl_service_t, callback: F) -> Self { + pub fn new_service(service: TracingId, callback: F) -> Self { let id = Self::gen_id(); trace_service_callback_added(service, id); @@ -62,7 +63,7 @@ where /// Emits trace event associating this `callback` with the `subscription`. /// /// Wraps the callback to allow tracing the callback calls. - pub fn new_subscription(subscriber: &S, callback: F) -> Self { + pub fn new_subscription(subscriber: TracingId, callback: F) -> Self { let id = Self::gen_id(); trace_subscription_callback_added(subscriber, id); diff --git a/r2r_tracing/src/lib.rs b/r2r_tracing/src/lib.rs index 085dd7533..7d52566f3 100644 --- a/r2r_tracing/src/lib.rs +++ b/r2r_tracing/src/lib.rs @@ -15,6 +15,9 @@ pub use r2r_tracepoints::*; mod callback; pub use callback::Callback; +mod stream; +pub use stream::{StreamWithTracingData, StreamWithTracingDataBuilder}; + mod tracing_id; pub use tracing_id::TracingId; diff --git a/r2r_tracing/src/rclcpp_tracepoints.rs b/r2r_tracing/src/rclcpp_tracepoints.rs index 25b855434..655276d77 100644 --- a/r2r_tracing/src/rclcpp_tracepoints.rs +++ b/r2r_tracing/src/rclcpp_tracepoints.rs @@ -4,7 +4,10 @@ use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t}; #[cfg(feature = "tracing")] use crate::tracetools_bindings as tp; #[cfg(feature = "tracing")] -use std::{ffi::CString, ptr::null}; +use std::{ + ffi::{c_void, CString}, + ptr::null, +}; #[cfg(feature = "tracing")] const fn ref_to_c_void(t: &T) -> *const std::ffi::c_void { @@ -57,10 +60,10 @@ pub fn trace_subscription_init( /// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object. /// /// Tracepoint: `ros2::rclcpp_subscription_callback_added` -pub fn trace_subscription_callback_added(subscription: &S, callback_id: usize) { +pub fn trace_subscription_callback_added(subscription: TracingId, callback_id: usize) { unsafe { tp::ros_trace_rclcpp_subscription_callback_added( - ref_to_c_void(subscription), + subscription.c_void(), c_void!(callback_id), ); } @@ -97,8 +100,8 @@ pub fn trace_take_ptr(message: *const std::ffi::c_void) { /// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`. /// /// Tracepoint: `ros2::rclcpp_service_callback_added` -pub fn trace_service_callback_added(service: *const rcl_service_t, callback_id: usize) { - unsafe { tp::ros_trace_rclcpp_service_callback_added(service.cast(), c_void!(callback_id)) } +pub fn trace_service_callback_added(service: TracingId, callback_id: usize) { + unsafe { tp::ros_trace_rclcpp_service_callback_added(service.c_void(), c_void!(callback_id)) } } /// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle. diff --git a/r2r_tracing/src/stream.rs b/r2r_tracing/src/stream.rs new file mode 100644 index 000000000..fc2189962 --- /dev/null +++ b/r2r_tracing/src/stream.rs @@ -0,0 +1,136 @@ +use std::{ + ffi::c_void, + future::{self, Future}, +}; + +use futures::{channel::mpsc::Receiver, stream::FusedStream, Stream, StreamExt as _}; +use r2r_rcl::rcl_service_t; + +#[cfg(feature = "tracing")] +use crate::Callback; +use crate::TracingId; + +/// A stream wrapper containing tracing data. +/// +/// When the `tracing` feature is enabled, you can use the [`Self::traced_callback`] method +/// to trace the execution of the callback. Stream polling is not traced. +#[derive(Debug)] +pub struct StreamWithTracingData { + stream: Receiver, + + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType, +} + +#[cfg(feature = "tracing")] +#[derive(Debug, Clone, Copy)] +enum TracingIdWithType { + // The type of subscription is `c_void` because the actual type would + // be a generic R2R subscriber and not `rcl_subscription_t`. + Subscription(TracingId), + Service(TracingId), +} + +impl StreamWithTracingData { + /// Converts the stream into a future that calls the provided callback for each message. + /// + /// If `tracing` feature is enabled: + /// - Each time the callback is called, its start and end time will be traced by the + /// ROS 2 tracing framework. + /// - You should not poll the stream before calling this method because + /// otherwise it might confuse software that analyzes the trace. + pub fn traced_callback(self, callback: C) -> impl Future + Unpin + where + C: FnMut(T), + { + #[cfg(feature = "tracing")] + { + let mut callback_wrapper = match self.tracing_id { + TracingIdWithType::Subscription(id) => Callback::new_subscription(id, callback), + TracingIdWithType::Service(id) => Callback::new_service(id, callback), + }; + + self.stream.for_each(move |msg| { + callback_wrapper.call(msg); + future::ready(()) + }) + } + #[cfg(not(feature = "tracing"))] + { + let mut callback = callback; + self.stream.for_each(move |msg| { + callback(msg); + future::ready(()) + }) + } + } +} + +impl Stream for StreamWithTracingData { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + this.stream.poll_next_unpin(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +impl FusedStream for StreamWithTracingData { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +/// A builder for `StreamWithTracingData`. +/// +/// This struct exists to allow creation of `StreamWithTracingData` without polluting its public API. +/// It is used internally by r2r and should not be reexported. +pub struct StreamWithTracingDataBuilder; + +impl StreamWithTracingDataBuilder { + #[must_use] + #[allow( + clippy::not_unsafe_ptr_arg_deref, + reason = "The pointer is not dereferenced" + )] + #[cfg_attr( + not(feature = "tracing"), + expect( + unused_variables, + reason = "service_id is not saved if tracing is disabled" + ) + )] + pub fn build_service( + stream: Receiver, service_id: TracingId, + ) -> StreamWithTracingData { + StreamWithTracingData { + stream, + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType::Service(service_id), + } + } + + #[must_use] + #[cfg_attr( + not(feature = "tracing"), + expect( + unused_variables, + reason = "subscription_id is not saved if tracing is disabled" + ) + )] + pub fn build_subscription( + stream: Receiver, subscription_id: TracingId, + ) -> StreamWithTracingData { + StreamWithTracingData { + stream, + #[cfg(feature = "tracing")] + tracing_id: TracingIdWithType::Subscription(subscription_id), + } + } +} From c77b0017621345c6c06e7f43c396d2c6b2f3c85c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 18:06:11 +0100 Subject: [PATCH 10/11] Add tracing to parameter handling services --- r2r/src/nodes.rs | 349 +++++++++++++++++++++++------------------------ 1 file changed, 171 insertions(+), 178 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index a7a74a7cb..a9c4f5487 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -291,151 +291,143 @@ impl Node { let node_name = self.name()?; // rcl_interfaces/srv/SetParameters - let set_params_request_stream = self + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let set_params_future = self .create_service::( &format!("{}/set_parameters", node_name), QosProfile::default(), - )?; - - let params = self.params.clone(); - let params_struct_clone = params_struct.clone(); - let set_params_future = set_params_request_stream.for_each( - move |req: ServiceRequest| { - let mut result = rcl_interfaces::srv::SetParameters::Response::default(); - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - let changed = params - .lock() - .unwrap() - .get(&p.name) - .map(|v| v.value != val) - .unwrap_or(true); // changed=true if new - let r = if let Some(ps) = ¶ms_struct_clone { - // Update parameter structure - let result = ps.lock().unwrap().set_parameter(&p.name, &val); - if result.is_ok() { - // Also update Node::params + )? + .traced_callback( + move |req: ServiceRequest| { + let mut result = rcl_interfaces::srv::SetParameters::Response::default(); + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params params .lock() .unwrap() .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()); - } - rcl_interfaces::msg::SetParametersResult { - successful: result.is_ok(), - reason: result.err().map_or("".into(), |e| e.to_string()), - } - } else { - // No parameter structure - update only Node::params - params - .lock() - .unwrap() - .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()) - .or_insert(Parameter::new(val.clone())); - rcl_interfaces::msg::SetParametersResult { - successful: true, - reason: "".into(), - } - }; - // if the value changed, send out new value on parameter event stream - if changed && r.successful { - if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { - log::debug!("Warning: could not send parameter event ({}).", e); + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { + log::debug!("Warning: could not send parameter event ({}).", e); + } } + result.results.push(r); } - result.results.push(r); - } - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); handlers.push(Box::pin(set_params_future)); // rcl_interfaces/srv/GetParameters - let get_params_request_stream = self + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let get_params_future = self .create_service::( &format!("{}/get_parameters", node_name), QosProfile::default(), - )?; - - let params = self.params.clone(); - let params_struct_clone = params_struct.clone(); - let get_params_future = get_params_request_stream.for_each( - move |req: ServiceRequest| { - let params = params.lock().unwrap(); - let values = req - .message - .names - .iter() - .map(|n| { - // First try to get the parameter from the param structure - if let Some(ps) = ¶ms_struct_clone { - if let Ok(value) = ps.lock().unwrap().get_parameter(n) { - return value; + )? + .traced_callback( + move |req: ServiceRequest| { + let params = params.lock().unwrap(); + let values = req + .message + .names + .iter() + .map(|n| { + // First try to get the parameter from the param structure + if let Some(ps) = ¶ms_struct_clone { + if let Ok(value) = ps.lock().unwrap().get_parameter(n) { + return value; + } } - } - // Otherwise get it from node HashMap - match params.get(n) { - Some(v) => v.value.clone(), - None => ParameterValue::NotSet, - } - }) - .map(|v| v.into_parameter_value_msg()) - .collect::>(); + // Otherwise get it from node HashMap + match params.get(n) { + Some(v) => v.value.clone(), + None => ParameterValue::NotSet, + } + }) + .map(|v| v.into_parameter_value_msg()) + .collect::>(); - let result = rcl_interfaces::srv::GetParameters::Response { values }; - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + let result = rcl_interfaces::srv::GetParameters::Response { values }; + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); handlers.push(Box::pin(get_params_future)); + let params = self.params.clone(); + // rcl_interfaces/srv/ListParameters use rcl_interfaces::srv::ListParameters; - let list_params_request_stream = self.create_service::( - &format!("{}/list_parameters", node_name), - QosProfile::default(), - )?; - - let params = self.params.clone(); - let list_params_future = list_params_request_stream.for_each( - move |req: ServiceRequest| { + let list_params_future = self + .create_service::( + &format!("{}/list_parameters", node_name), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { Self::handle_list_parameters(req, ¶ms) - }, - ); + }); handlers.push(Box::pin(list_params_future)); // rcl_interfaces/srv/DescribeParameters use rcl_interfaces::srv::DescribeParameters; - let desc_params_request_stream = self.create_service::( - &format!("{node_name}/describe_parameters"), - QosProfile::default(), - )?; - let params = self.params.clone(); - let desc_params_future = desc_params_request_stream.for_each( - move |req: ServiceRequest| { + let desc_params_future = self + .create_service::( + &format!("{node_name}/describe_parameters"), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { Self::handle_desc_parameters(req, ¶ms) - }, - ); + }); handlers.push(Box::pin(desc_params_future)); // rcl_interfaces/srv/GetParameterTypes use rcl_interfaces::srv::GetParameterTypes; - let get_param_types_request_stream = self.create_service::( - &format!("{node_name}/get_parameter_types"), - QosProfile::default(), - )?; - let params = self.params.clone(); - let get_param_types_future = get_param_types_request_stream.for_each( - move |req: ServiceRequest| { + let get_param_types_future = self + .create_service::( + &format!("{node_name}/get_parameter_types"), + QosProfile::default(), + )? + .traced_callback(move |req: ServiceRequest| { let params = params.lock().unwrap(); let types = req .message @@ -448,86 +440,89 @@ impl Node { .collect(); req.respond(GetParameterTypes::Response { types }) .expect("could not send reply to get parameter types request"); - future::ready(()) - }, - ); + }); handlers.push(Box::pin(get_param_types_future)); // rcl_interfaces/srv/SetParametersAtomically - let set_params_atomically_request_stream = - self.create_service::( - &format!("{}/set_parameters_atomically", node_name), - QosProfile::default(), - )?; - let params = self.params.clone(); let params_struct_clone = params_struct.clone(); - let set_params_atomically_future = set_params_atomically_request_stream.for_each( - move |req: ServiceRequest| { - let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default(); - result.result.successful = true; - if let Some(ps) = ¶ms_struct_clone { - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) { - result.result.successful = false; - result.result.reason = format!("Can't set parameter {}: {}", p.name, e); - break; + let set_params_atomically_future = self + .create_service::( + &format!("{}/set_parameters_atomically", node_name), + QosProfile::default(), + )? + .traced_callback( + move |req: ServiceRequest< + rcl_interfaces::srv::SetParametersAtomically::Service, + >| { + let mut result = + rcl_interfaces::srv::SetParametersAtomically::Response::default(); + result.result.successful = true; + if let Some(ps) = ¶ms_struct_clone { + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) { + result.result.successful = false; + result.result.reason = + format!("Can't set parameter {}: {}", p.name, e); + break; + } } } - } - if result.result.successful { - // Since we checked them above now we assume these will be set ok... - for p in &req.message.parameters { - let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - let changed = params - .lock() - .unwrap() - .get(&p.name) - .map(|v| v.value != val) - .unwrap_or(true); // changed=true if new - let r = if let Some(ps) = ¶ms_struct_clone { - // Update parameter structure - let result = ps.lock().unwrap().set_parameter(&p.name, &val); - if result.is_ok() { - // Also update Node::params + if result.result.successful { + // Since we checked them above now we assume these will be set ok... + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params params .lock() .unwrap() .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()); - } - rcl_interfaces::msg::SetParametersResult { - successful: result.is_ok(), - reason: result.err().map_or("".into(), |e| e.to_string()), - } - } else { - // No parameter structure - update only Node::params - params - .lock() - .unwrap() - .entry(p.name.clone()) - .and_modify(|p| p.value = val.clone()) - .or_insert(Parameter::new(val.clone())); - rcl_interfaces::msg::SetParametersResult { - successful: true, - reason: "".into(), - } - }; - // if the value changed, send out new value on parameter event stream - if changed && r.successful { - if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) { - log::debug!("Warning: could not send parameter event ({}).", e); + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = + set_atomically_event_tx.try_send((p.name.clone(), val)) + { + log::debug!("Warning: could not send parameter event ({}).", e); + } } } } - } - req.respond(result) - .expect("could not send reply to set parameter request"); - future::ready(()) - }, - ); + req.respond(result) + .expect("could not send reply to set parameter request"); + }, + ); + handlers.push(Box::pin(set_params_atomically_future)); #[cfg(r2r__rosgraph_msgs__msg__Clock)] @@ -560,7 +555,7 @@ impl Node { fn handle_list_parameters( req: ServiceRequest, params: &Arc>>, - ) -> future::Ready<()> { + ) { use rcl_interfaces::srv::ListParameters; let depth = req.message.depth; @@ -597,13 +592,12 @@ impl Node { } req.respond(ListParameters::Response { result }) .expect("could not send reply to list parameter request"); - future::ready(()) } fn handle_desc_parameters( req: ServiceRequest, params: &Arc>>, - ) -> future::Ready<()> { + ) { use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters}; let mut descriptors = Vec::::new(); let params = params.lock().unwrap(); @@ -619,7 +613,6 @@ impl Node { } req.respond(DescribeParameters::Response { descriptors }) .expect("could not send reply to describe parameters request"); - future::ready(()) } /// Fetch a single ROS parameter. From 8aa9e89243536ee5670561b5ecaf7bb739721c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0koudlil?= Date: Tue, 4 Mar 2025 18:04:49 +0100 Subject: [PATCH 11/11] Add example showing usage of callback tracing --- r2r/examples/callback_tracing.rs | 64 ++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 r2r/examples/callback_tracing.rs diff --git a/r2r/examples/callback_tracing.rs b/r2r/examples/callback_tracing.rs new file mode 100644 index 000000000..51433b279 --- /dev/null +++ b/r2r/examples/callback_tracing.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use futures::{executor::LocalPool, task::LocalSpawnExt}; + +use r2r::{std_msgs::msg, std_srvs::srv, QosProfile}; + +/// This example demonstrates creation of a service, +/// subscriber and timers with their callback execution traced. +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + // The traced callback is supplied directly to `subscribe_traced` + // and `create_service_traced`functions. + let subscriber_future = node + .subscribe("/print", QosProfile::default())? + .traced_callback(|msg: msg::String| { + println!("Received message: '{}'", msg.data); + }); + spawner.spawn_local(subscriber_future)?; + + let mut value = false; + let service_future = node + .create_service::("/set_value", QosProfile::default())? + .traced_callback(move |req| { + if value == req.message.data { + req.respond(srv::SetBool::Response { + success: false, + message: format!("Value is already {value}."), + }) + .expect("could not send service response"); + } else { + value = req.message.data; + req.respond(srv::SetBool::Response { + success: true, + message: format!("Value set to {value}."), + }) + .expect("could not send service response"); + } + }); + spawner.spawn_local(service_future)?; + + let mut counter = 0; + let wall_timer_future = + node.create_wall_timer(Duration::from_millis(500))? + .on_tick(move |_| { + counter += 1; + println!("Wall timer tick: {counter}"); + }); + spawner.spawn_local(wall_timer_future)?; + + let ros_timer_future = node.create_timer(Duration::from_secs(1))?.on_tick(|diff| { + println!("ROS timer tick. Time elapsed since last tick: {diff:?}"); + }); + spawner.spawn_local(ros_timer_future)?; + + loop { + node.spin_once(std::time::Duration::from_millis(5)); + pool.run_until_stalled(); + } +}