From ab269e8b4dc7c691b97f5a2ad768b2d844d3a81d Mon Sep 17 00:00:00 2001 From: Desmond Germans Date: Wed, 9 Apr 2025 15:42:49 +0200 Subject: [PATCH 1/4] added debug statements --- r2r/src/nodes.rs | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index c797f1df6..a45db95bc 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -283,9 +283,15 @@ impl Node { .register_parameters("", None, &mut self.params.lock().unwrap())?; } let mut handlers: Vec + Send>>> = Vec::new(); + + log::info!("make_parameter_handler: creating channel"); + let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); let node_name = self.name()?; + + log::info!("make_parameter_handler: creating {}/set_parameters service", node_name); + let set_params_request_stream = self .create_service::( &format!("{}/set_parameters", node_name), @@ -296,6 +302,10 @@ impl Node { let params_struct_clone = params_struct.clone(); let set_params_future = set_params_request_stream.for_each( move |req: ServiceRequest| { + log::info!( + "make_parameter_handler: set_parameters service called: {:?}", + req.message + ); let mut result = rcl_interfaces::srv::SetParameters::Response::default(); for p in &req.message.parameters { let val = ParameterValue::from_parameter_value_msg(p.value.clone()); @@ -348,6 +358,8 @@ impl Node { ); handlers.push(Box::pin(set_params_future)); + log::info!("make_parameter_handler: created {}/get_parameters service", node_name); + // rcl_interfaces/srv/GetParameters let get_params_request_stream = self .create_service::( @@ -359,6 +371,11 @@ impl Node { let params_struct_clone = params_struct.clone(); let get_params_future = get_params_request_stream.for_each( move |req: ServiceRequest| { + log::info!( + "make_parameter_handler: get_parameters service called: {:?}", + req.message + ); + let params = params.lock().unwrap(); let values = req .message @@ -389,6 +406,8 @@ impl Node { handlers.push(Box::pin(get_params_future)); + log::info!("make_parameter_handler: creating {}/list_parameters service", node_name); + // rcl_interfaces/srv/ListParameters use rcl_interfaces::srv::ListParameters; let list_params_request_stream = self.create_service::( @@ -399,12 +418,18 @@ impl Node { let params = self.params.clone(); let list_params_future = list_params_request_stream.for_each( move |req: ServiceRequest| { + log::info!( + "make_parameter_handler: list_parameters service called: {:?}", + req.message + ); Self::handle_list_parameters(req, ¶ms) }, ); handlers.push(Box::pin(list_params_future)); + log::info!("make_parameter_handler: created {}/describe_parameters service", node_name); + // rcl_interfaces/srv/DescribeParameters use rcl_interfaces::srv::DescribeParameters; let desc_params_request_stream = self.create_service::( @@ -415,12 +440,18 @@ impl Node { let params = self.params.clone(); let desc_params_future = desc_params_request_stream.for_each( move |req: ServiceRequest| { + log::info!( + "make_parameter_handler: describe_parameters service called: {:?}", + req.message + ); Self::handle_desc_parameters(req, ¶ms) }, ); handlers.push(Box::pin(desc_params_future)); + log::info!("make_parameter_handler: creating {}/get_parameter_types service", node_name); + // rcl_interfaces/srv/GetParameterTypes use rcl_interfaces::srv::GetParameterTypes; let get_param_types_request_stream = self.create_service::( @@ -431,6 +462,10 @@ impl Node { let params = self.params.clone(); let get_param_types_future = get_param_types_request_stream.for_each( move |req: ServiceRequest| { + log::info!( + "make_parameter_handler: get_parameter_types service called: {:?}", + req.message + ); let params = params.lock().unwrap(); let types = req .message @@ -472,6 +507,8 @@ impl Node { } } + log::info!("make_parameter_handler should be done now"); + // we don't care about the result, the futures will not complete anyway. Ok((join_all(handlers).map(|_| ()), event_rx)) } @@ -909,7 +946,9 @@ impl Node { pub fn destroy_publisher(&mut self, p: Publisher) { if let Some(handle) = p.handle.upgrade() { // Remove handle from list of publishers. - self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle)) + self.pubs + .iter() + .position(|p| Arc::ptr_eq(p, &handle)) .map(|i| self.pubs.swap_remove(i)); let handle = wait_until_unwrapped(handle); @@ -921,7 +960,9 @@ impl Node { pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) { if let Some(handle) = p.handle.upgrade() { // Remove handle from list of publishers. - self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle)) + self.pubs + .iter() + .position(|p| Arc::ptr_eq(p, &handle)) .map(|i| self.pubs.swap_remove(i)); let handle = wait_until_unwrapped(handle); From 2e5ac337be5608b8a7287719456c2a086a567d56 Mon Sep 17 00:00:00 2001 From: Desmond Germans Date: Wed, 9 Apr 2025 15:45:41 +0200 Subject: [PATCH 2/4] using println to see if that matters --- r2r/src/nodes.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index a45db95bc..5543555b3 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -302,7 +302,7 @@ impl Node { let params_struct_clone = params_struct.clone(); let set_params_future = set_params_request_stream.for_each( move |req: ServiceRequest| { - log::info!( + println!( "make_parameter_handler: set_parameters service called: {:?}", req.message ); @@ -358,7 +358,7 @@ impl Node { ); handlers.push(Box::pin(set_params_future)); - log::info!("make_parameter_handler: created {}/get_parameters service", node_name); + println!("make_parameter_handler: created {}/get_parameters service", node_name); // rcl_interfaces/srv/GetParameters let get_params_request_stream = self @@ -371,7 +371,7 @@ impl Node { let params_struct_clone = params_struct.clone(); let get_params_future = get_params_request_stream.for_each( move |req: ServiceRequest| { - log::info!( + println!( "make_parameter_handler: get_parameters service called: {:?}", req.message ); @@ -406,7 +406,7 @@ impl Node { handlers.push(Box::pin(get_params_future)); - log::info!("make_parameter_handler: creating {}/list_parameters service", node_name); + println!("make_parameter_handler: creating {}/list_parameters service", node_name); // rcl_interfaces/srv/ListParameters use rcl_interfaces::srv::ListParameters; @@ -418,7 +418,7 @@ impl Node { let params = self.params.clone(); let list_params_future = list_params_request_stream.for_each( move |req: ServiceRequest| { - log::info!( + println!( "make_parameter_handler: list_parameters service called: {:?}", req.message ); @@ -428,7 +428,7 @@ impl Node { handlers.push(Box::pin(list_params_future)); - log::info!("make_parameter_handler: created {}/describe_parameters service", node_name); + println!("make_parameter_handler: created {}/describe_parameters service", node_name); // rcl_interfaces/srv/DescribeParameters use rcl_interfaces::srv::DescribeParameters; @@ -440,7 +440,7 @@ impl Node { let params = self.params.clone(); let desc_params_future = desc_params_request_stream.for_each( move |req: ServiceRequest| { - log::info!( + println!( "make_parameter_handler: describe_parameters service called: {:?}", req.message ); @@ -450,7 +450,7 @@ impl Node { handlers.push(Box::pin(desc_params_future)); - log::info!("make_parameter_handler: creating {}/get_parameter_types service", node_name); + println!("make_parameter_handler: creating {}/get_parameter_types service", node_name); // rcl_interfaces/srv/GetParameterTypes use rcl_interfaces::srv::GetParameterTypes; @@ -462,7 +462,7 @@ impl Node { let params = self.params.clone(); let get_param_types_future = get_param_types_request_stream.for_each( move |req: ServiceRequest| { - log::info!( + println!( "make_parameter_handler: get_parameter_types service called: {:?}", req.message ); @@ -507,7 +507,7 @@ impl Node { } } - log::info!("make_parameter_handler should be done now"); + println!("make_parameter_handler should be done now"); // we don't care about the result, the futures will not complete anyway. Ok((join_all(handlers).map(|_| ()), event_rx)) From 80e4f2628d112a21ee03ce17ce8398219870a4b7 Mon Sep 17 00:00:00 2001 From: Desmond Germans Date: Fri, 11 Apr 2025 13:12:10 +0200 Subject: [PATCH 3/4] found the problem, added `/set_parameters_atomically` service to `make_parameter_handler_internal` in `nodes.rs`. --- r2r/src/nodes.rs | 108 +++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 37 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 5543555b3..cbe956265 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -284,14 +284,12 @@ impl Node { } let mut handlers: Vec + Send>>> = Vec::new(); - log::info!("make_parameter_handler: creating channel"); - - let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); + let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); + let mut set_atomically_event_tx = set_event_tx.clone(); let node_name = self.name()?; - log::info!("make_parameter_handler: creating {}/set_parameters service", node_name); - + // rcl_interfaces/srv/SetParameters let set_params_request_stream = self .create_service::( &format!("{}/set_parameters", node_name), @@ -302,10 +300,6 @@ impl Node { let params_struct_clone = params_struct.clone(); let set_params_future = set_params_request_stream.for_each( move |req: ServiceRequest| { - println!( - "make_parameter_handler: set_parameters service called: {:?}", - req.message - ); let mut result = rcl_interfaces::srv::SetParameters::Response::default(); for p in &req.message.parameters { let val = ParameterValue::from_parameter_value_msg(p.value.clone()); @@ -345,7 +339,7 @@ impl Node { }; // if the value changed, send out new value on parameter event stream if changed && r.successful { - if let Err(e) = event_tx.try_send((p.name.clone(), val)) { + if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) { log::debug!("Warning: could not send parameter event ({}).", e); } } @@ -358,8 +352,6 @@ impl Node { ); handlers.push(Box::pin(set_params_future)); - println!("make_parameter_handler: created {}/get_parameters service", node_name); - // rcl_interfaces/srv/GetParameters let get_params_request_stream = self .create_service::( @@ -371,11 +363,6 @@ impl Node { let params_struct_clone = params_struct.clone(); let get_params_future = get_params_request_stream.for_each( move |req: ServiceRequest| { - println!( - "make_parameter_handler: get_parameters service called: {:?}", - req.message - ); - let params = params.lock().unwrap(); let values = req .message @@ -406,8 +393,6 @@ impl Node { handlers.push(Box::pin(get_params_future)); - println!("make_parameter_handler: creating {}/list_parameters service", node_name); - // rcl_interfaces/srv/ListParameters use rcl_interfaces::srv::ListParameters; let list_params_request_stream = self.create_service::( @@ -418,18 +403,12 @@ impl Node { let params = self.params.clone(); let list_params_future = list_params_request_stream.for_each( move |req: ServiceRequest| { - println!( - "make_parameter_handler: list_parameters service called: {:?}", - req.message - ); Self::handle_list_parameters(req, ¶ms) }, ); handlers.push(Box::pin(list_params_future)); - println!("make_parameter_handler: created {}/describe_parameters service", node_name); - // rcl_interfaces/srv/DescribeParameters use rcl_interfaces::srv::DescribeParameters; let desc_params_request_stream = self.create_service::( @@ -440,18 +419,12 @@ impl Node { let params = self.params.clone(); let desc_params_future = desc_params_request_stream.for_each( move |req: ServiceRequest| { - println!( - "make_parameter_handler: describe_parameters service called: {:?}", - req.message - ); Self::handle_desc_parameters(req, ¶ms) }, ); handlers.push(Box::pin(desc_params_future)); - println!("make_parameter_handler: creating {}/get_parameter_types service", node_name); - // rcl_interfaces/srv/GetParameterTypes use rcl_interfaces::srv::GetParameterTypes; let get_param_types_request_stream = self.create_service::( @@ -462,10 +435,6 @@ impl Node { let params = self.params.clone(); let get_param_types_future = get_param_types_request_stream.for_each( move |req: ServiceRequest| { - println!( - "make_parameter_handler: get_parameter_types service called: {:?}", - req.message - ); let params = params.lock().unwrap(); let types = req .message @@ -484,6 +453,73 @@ impl Node { handlers.push(Box::pin(get_param_types_future)); + let set_params_atomically_request_stream = + self.create_service::( + &format!("{}/set_parameters_atomically", node_name), + QosProfile::default(), + )?; + + let params = self.params.clone(); + let params_struct_clone = params_struct.clone(); + let set_params_atomically_future = set_params_atomically_request_stream.for_each( + move |req: ServiceRequest| { + let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default(); + result.result.successful = true; + for p in &req.message.parameters { + let val = ParameterValue::from_parameter_value_msg(p.value.clone()); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .map(|v| v.value != val) + .unwrap_or(true); // changed=true if new + let r = if let Some(ps) = ¶ms_struct_clone { + // Update parameter structure + let result = ps.lock().unwrap().set_parameter(&p.name, &val); + if result.is_ok() { + // Also update Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()); + } + rcl_interfaces::msg::SetParametersResult { + successful: result.is_ok(), + reason: result.err().map_or("".into(), |e| e.to_string()), + } + } else { + // No parameter structure - update only Node::params + params + .lock() + .unwrap() + .entry(p.name.clone()) + .and_modify(|p| p.value = val.clone()) + .or_insert(Parameter::new(val.clone())); + rcl_interfaces::msg::SetParametersResult { + successful: true, + reason: "".into(), + } + }; + // if the value changed, send out new value on parameter event stream + if changed && r.successful { + if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) { + log::debug!("Warning: could not send parameter event ({}).", e); + } + } + // if this parameter failed, set the result and break + if !r.successful { + result.result = r; + break; + } + } + req.respond(result) + .expect("could not send reply to set parameter request"); + future::ready(()) + }, + ); + handlers.push(Box::pin(set_params_atomically_future)); + #[cfg(r2r__rosgraph_msgs__msg__Clock)] { // create TimeSource based on value of use_sim_time parameter @@ -507,8 +543,6 @@ impl Node { } } - println!("make_parameter_handler should be done now"); - // we don't care about the result, the futures will not complete anyway. Ok((join_all(handlers).map(|_| ()), event_rx)) } From 5dc2e791588b64fd4cd873749445895b3ac5ee7e Mon Sep 17 00:00:00 2001 From: Desmond Germans Date: Mon, 14 Apr 2025 10:35:45 +0200 Subject: [PATCH 4/4] added warning message --- r2r/src/nodes.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index cbe956265..6d46957d6 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -453,6 +453,10 @@ impl Node { handlers.push(Box::pin(get_param_types_future)); + // rcl_interfaces/srv/SetParametersAtomically + + // NOTE: This is not a proper implementation of the specs, but rather a copy of set_parameters. + // On error, some of the parameters might already be set. let set_params_atomically_request_stream = self.create_service::( &format!("{}/set_parameters_atomically", node_name),