diff --git a/src/lib.rs b/src/lib.rs index 8471a13..cddc0f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,21 +7,21 @@ include!(concat!( use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ffi::{CStr, CString}; +use std::fmt::Debug; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use std::time::Duration; -use std::fmt::Debug; -use std::future::Future; -use futures::future::TryFutureExt; -use futures::future::FutureExt; use futures::channel::{mpsc, oneshot}; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use futures::stream::{Stream, StreamExt}; +use std::future::Future; +use actions::*; use msg_gen::*; use rcl::*; -use actions::*; mod error; use error::*; @@ -31,7 +31,9 @@ pub use utils::*; pub type Result = std::result::Result; -pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned + Default + Debug + Clone { +pub trait WrappedTypesupport: + Serialize + serde::de::DeserializeOwned + Default + Debug + Clone +{ type CStruct; fn get_ts() -> &'static rosidl_message_type_support_t; @@ -48,7 +50,7 @@ pub trait WrappedServiceTypeSupport: Debug + Clone { fn get_ts() -> &'static rosidl_service_type_support_t; } -pub trait WrappedActionTypeSupport: Debug + Clone{ +pub trait WrappedActionTypeSupport: Debug + Clone { type Goal: WrappedTypesupport; type Result: WrappedTypesupport; type Feedback: WrappedTypesupport; @@ -60,21 +62,40 @@ pub trait WrappedActionTypeSupport: Debug + Clone{ fn get_ts() -> &'static rosidl_action_type_support_t; - fn make_goal_request_msg(goal_id: unique_identifier_msgs::msg::UUID, goal: Self::Goal) -> - <::SendGoal as WrappedServiceTypeSupport>::Request; - fn make_goal_response_msg(accepted: bool, stamp: builtin_interfaces::msg::Time) -> - <::SendGoal as WrappedServiceTypeSupport>::Response; - fn make_feedback_msg(goal_id: unique_identifier_msgs::msg::UUID, feedback: Self::Feedback) -> Self::FeedbackMessage; - fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) -> - <::GetResult as WrappedServiceTypeSupport>::Request; - fn make_result_response_msg(status: i8, result: Self::Result) -> - <::GetResult as WrappedServiceTypeSupport>::Response; - fn destructure_goal_request_msg(msg: <::SendGoal as WrappedServiceTypeSupport>::Request) -> (unique_identifier_msgs::msg::UUID, Self::Goal); - fn destructure_goal_response_msg(msg: <::SendGoal as WrappedServiceTypeSupport>::Response) -> (bool, builtin_interfaces::msg::Time); - fn destructure_feedback_msg(msg: Self::FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); - fn destructure_result_response_msg(msg: <::GetResult as WrappedServiceTypeSupport>::Response) -> - (i8, Self::Result); - fn destructure_result_request_msg(msg: <::GetResult as WrappedServiceTypeSupport>::Request) -> unique_identifier_msgs::msg::UUID; + fn make_goal_request_msg( + goal_id: unique_identifier_msgs::msg::UUID, + goal: Self::Goal, + ) -> <::SendGoal as WrappedServiceTypeSupport>::Request; + fn make_goal_response_msg( + accepted: bool, + stamp: builtin_interfaces::msg::Time, + ) -> <::SendGoal as WrappedServiceTypeSupport>::Response; + fn make_feedback_msg( + goal_id: unique_identifier_msgs::msg::UUID, + feedback: Self::Feedback, + ) -> Self::FeedbackMessage; + fn make_result_request_msg( + goal_id: unique_identifier_msgs::msg::UUID, + ) -> <::GetResult as WrappedServiceTypeSupport>::Request; + fn make_result_response_msg( + status: i8, + result: Self::Result, + ) -> <::GetResult as WrappedServiceTypeSupport>::Response; + fn destructure_goal_request_msg( + msg: <::SendGoal as WrappedServiceTypeSupport>::Request, + ) -> (unique_identifier_msgs::msg::UUID, Self::Goal); + fn destructure_goal_response_msg( + msg: <::SendGoal as WrappedServiceTypeSupport>::Response, + ) -> (bool, builtin_interfaces::msg::Time); + fn destructure_feedback_msg( + msg: Self::FeedbackMessage, + ) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); + fn destructure_result_response_msg( + msg: <::GetResult as WrappedServiceTypeSupport>::Response, + ) -> (i8, Self::Result); + fn destructure_result_request_msg( + msg: <::GetResult as WrappedServiceTypeSupport>::Request, + ) -> unique_identifier_msgs::msg::UUID; } #[derive(Debug)] @@ -263,7 +284,12 @@ where let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsg::::new(); let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) }; if ret == RCL_RET_OK as i32 { let msg = T::from_native(&msg); @@ -293,7 +319,12 @@ where let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsg::::new(); let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) }; if ret == RCL_RET_OK as i32 { match self.sender.try_send(msg) { @@ -320,7 +351,12 @@ impl Sub for WrappedSubUntyped { let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) .expect(&format!("no typesupport for {}", self.topic_type)); let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) }; if ret == RCL_RET_OK as i32 { let json = msg.to_json(); @@ -427,16 +463,26 @@ where let mut response_msg = WrappedNativeMsg::::new(); let ret = unsafe { - rcl_take_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + rcl_take_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) }; if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + if let Some(idx) = self + .response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { let (_, sender) = self.response_channels.swap_remove(idx); let response = T::Response::from_native(&response_msg); match sender.send(response) { - Ok(()) => {}, - Err(e) => { println!("error sending to client: {:?}", e); }, + Ok(()) => {} + Err(e) => { + println!("error sending to client: {:?}", e); + } } } else { let we_have: String = self @@ -460,7 +506,6 @@ where } } - // action clients #[derive(Debug, Copy, Clone, PartialEq)] pub enum GoalStatus { @@ -506,7 +551,12 @@ where T: WrappedActionTypeSupport, { rcl_handle: rcl_action_client_t, - goal_response_channels: Vec<(i64, oneshot::Sender<<::SendGoal as WrappedServiceTypeSupport>::Response>)>, + goal_response_channels: Vec<( + i64, + oneshot::Sender< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >, + )>, cancel_response_channels: Vec<(i64, oneshot::Sender)>, feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, result_requests: Vec<(i64, uuid::Uuid)>, @@ -527,17 +577,17 @@ pub trait ActionClient_ { fn send_result_request(&mut self, uuid: uuid::Uuid) -> (); } - use std::convert::TryInto; fn vec_to_uuid_bytes(v: Vec) -> [T; 16] { - v.try_into() - .unwrap_or_else(|v: Vec| panic!("Expected a Vec of length {} but it was {}", 16, v.len())) + v.try_into().unwrap_or_else(|v: Vec| { + panic!("Expected a Vec of length {} but it was {}", 16, v.len()) + }) } impl WrappedActionClient where T: WrappedActionTypeSupport, - { +{ fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus { *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown) } @@ -548,23 +598,28 @@ where { let msg = action_msgs::srv::CancelGoal::Request { goal_info: action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { uuid: goal.as_bytes().to_vec() }, - .. action_msgs::msg::GoalInfo::default() - } + goal_id: unique_identifier_msgs::msg::UUID { + uuid: goal.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }, }; let native_msg = WrappedNativeMsg::::from(&msg); let mut seq_no = 0i64; - let result = - unsafe { rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + let result = unsafe { + rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; if result == RCL_RET_OK as i32 { let (cancel_req_sender, cancel_req_receiver) = oneshot::channel::(); - self.cancel_response_channels.push((seq_no, cancel_req_sender)); + self.cancel_response_channels + .push((seq_no, cancel_req_sender)); // instead of "canceled" we return invalid client. - let future = cancel_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| { - match r { + let future = cancel_req_receiver + .map_err(|_| Error::RCL_RET_CLIENT_INVALID) + .map(|r| match r { Ok(r) => match r.return_code { 0 => Ok(()), 1 => Err(Error::GoalCancelRejected), @@ -572,9 +627,8 @@ where 3 => Err(Error::GoalCancelAlreadyTerminated), x => panic!("unknown error code return from action server: {}", x), }, - Err(e) => Err(e) - } - }); + Err(e) => Err(e), + }); Ok(future) } else { eprintln!("coult not send goal request {}", result); @@ -593,19 +647,31 @@ where fn handle_goal_response(&mut self) -> () { let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::<<::SendGoal as WrappedServiceTypeSupport>::Response>::new(); + let mut response_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >::new(); let ret = unsafe { - rcl_action_take_goal_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + rcl_action_take_goal_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) }; if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.goal_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + if let Some(idx) = self + .goal_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { let (_, sender) = self.goal_response_channels.swap_remove(idx); let response = <::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); match sender.send(response) { - Ok(()) => {}, - Err(e) => { println!("error sending to action client: {:?}", e); }, + Ok(()) => {} + Err(e) => { + println!("error sending to action client: {:?}", e); + } } } else { let we_have: String = self @@ -627,11 +693,19 @@ where let mut response_msg = WrappedNativeMsg::::new(); let ret = unsafe { - rcl_action_take_cancel_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + rcl_action_take_cancel_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) }; if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.cancel_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + if let Some(idx) = self + .cancel_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { let (_, sender) = self.cancel_response_channels.swap_remove(idx); let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg); match sender.send(response) { @@ -655,14 +729,17 @@ where fn handle_feedback_msg(&mut self) -> () { let mut feedback_msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) - }; + let ret = + unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) }; if ret == RCL_RET_OK as i32 { let msg = T::FeedbackMessage::from_native(&feedback_msg); let (uuid, feedback) = T::destructure_feedback_msg(msg); let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); - if let Some((_,sender)) = self.feedback_senders.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) { + if let Some((_, sender)) = self + .feedback_senders + .iter_mut() + .find(|(uuid, _)| uuid == &msg_uuid) + { match sender.try_send(feedback) { Err(e) => eprintln!("warning: could not send feedback msg ({})", e), _ => (), @@ -673,14 +750,15 @@ where fn handle_status_msg(&mut self) -> () { let mut status_array = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) - }; + let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) }; if ret == RCL_RET_OK as i32 { let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); for a in &arr.status_list { - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); - if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { continue; } + let uuid = + uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); + if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { + continue; + } let status = GoalStatus::from_rcl(a.status); *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status; } @@ -689,17 +767,31 @@ where fn handle_result_response(&mut self) -> () { let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::<<::GetResult as WrappedServiceTypeSupport>::Response>::new(); + let mut response_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::new(); let ret = unsafe { - rcl_action_take_result_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + rcl_action_take_result_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) }; if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.result_requests.iter().position(|(id, _)| id == &request_id.sequence_number) { + if let Some(idx) = self + .result_requests + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { let (_, uuid) = self.result_requests.swap_remove(idx); - if let Some(idx) = self.result_senders.iter().position(|(suuid, _)| suuid == &uuid) { + if let Some(idx) = self + .result_senders + .iter() + .position(|(suuid, _)| suuid == &uuid) + { let (_, sender) = self.result_senders.swap_remove(idx); let response = <::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); let (status, result) = T::destructure_result_response_msg(response); @@ -709,8 +801,10 @@ where // this will drop the sender which makes the receiver fail with "canceled" } else { match sender.send(result) { - Ok(()) => {}, - Err(e) => { println!("error sending result to action client: {:?}", e); }, + Ok(()) => {} + Err(e) => { + println!("error sending result to action client: {:?}", e); + } } } } @@ -730,13 +824,17 @@ where } fn send_result_request(&mut self, uuid: uuid::Uuid) -> () { - let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; let request_msg = T::make_result_request_msg(uuid_msg); - let native_msg = WrappedNativeMsg::<<::GetResult as - WrappedServiceTypeSupport>::Request>::from(&request_msg); + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Request, + >::from(&request_msg); let mut seq_no = 0i64; - let result = - unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + let result = unsafe { + rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; if result == RCL_RET_OK as i32 { self.result_requests.push((seq_no, uuid)); @@ -752,7 +850,6 @@ where } } - // action servers struct WrappedActionServer where @@ -789,9 +886,15 @@ where fn handle_goal_request(&mut self, server: Arc>) -> () { let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::<<::SendGoal as WrappedServiceTypeSupport>::Request>::new(); + let mut request_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Request, + >::new(); let ret = unsafe { - rcl_action_take_goal_request(&self.rcl_handle, request_id.as_mut_ptr(), request_msg.void_ptr_mut()) + rcl_action_take_goal_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) }; if ret != RCL_RET_OK as i32 { @@ -806,14 +909,17 @@ where let goal_info = action_msgs::msg::GoalInfo { goal_id: uuid_msg, - stamp: time.clone() + stamp: time.clone(), }; let native_goal_info = WrappedNativeMsg::::from(&goal_info); let goal_handle: Option<*mut rcl_action_goal_handle_t> = if goal_accepted { unsafe { - Some(rcl_action_accept_new_goal(&mut self.rcl_handle, &*native_goal_info)) + Some(rcl_action_accept_new_goal( + &mut self.rcl_handle, + &*native_goal_info, + )) } } else { None @@ -821,15 +927,17 @@ where // send response let response_msg = T::make_goal_response_msg(goal_accepted, time); - let mut response_msg = WrappedNativeMsg::<<::SendGoal as - WrappedServiceTypeSupport>::Response>::from(&response_msg); + let mut response_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >::from(&response_msg); let ret = unsafe { let mut request_id = request_id.assume_init(); rcl_action_send_goal_response( &self.rcl_handle, &mut request_id, - response_msg.void_ptr_mut()) + response_msg.void_ptr_mut(), + ) }; if ret != RCL_RET_OK as i32 { println!("action server: failed to send goal response"); @@ -839,7 +947,10 @@ where // if we accepted the goal, update its state machine and publish all goal statuses if let Some(goal_handle) = goal_handle { unsafe { - rcl_action_update_goal_state(goal_handle, rcl_action_goal_event_t::GOAL_EVENT_EXECUTE); + rcl_action_update_goal_state( + goal_handle, + rcl_action_goal_event_t::GOAL_EVENT_EXECUTE, + ); } self.publish_status(); @@ -863,7 +974,11 @@ where let mut request_id = MaybeUninit::::uninit(); let mut request_msg = WrappedNativeMsg::::new(); let ret = unsafe { - rcl_action_take_cancel_request(&self.rcl_handle, request_id.as_mut_ptr(), request_msg.void_ptr_mut()) + rcl_action_take_cancel_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) }; if ret != RCL_RET_OK as i32 { @@ -881,7 +996,8 @@ where return; } - let mut response_msg = action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg); + let mut response_msg = + action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg); // let user filter cancelled goals. let requested_cancels = response_msg.goals_canceling.len(); @@ -911,10 +1027,15 @@ where self.publish_status(); } - let mut native_msg = WrappedNativeMsg::::from(&response_msg); + let mut native_msg = + WrappedNativeMsg::::from(&response_msg); let ret = unsafe { let mut request_id = request_id.assume_init(); - rcl_action_send_cancel_response(&self.rcl_handle, &mut request_id, native_msg.void_ptr_mut()) + rcl_action_send_cancel_response( + &self.rcl_handle, + &mut request_id, + native_msg.void_ptr_mut(), + ) }; if ret != RCL_RET_OK as i32 { @@ -953,7 +1074,10 @@ where println!("action server: failed to get goal status array: {}", ret); } // todo: error handling... - rcl_action_publish_status(&self.rcl_handle, &status as *const _ as *const std::os::raw::c_void); + rcl_action_publish_status( + &self.rcl_handle, + &status as *const _ as *const std::os::raw::c_void, + ); rcl_action_goal_status_array_fini(&mut status); } } @@ -976,9 +1100,15 @@ where fn handle_result_request(&mut self) -> () { let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::<<::GetResult as WrappedServiceTypeSupport>::Request>::new(); + let mut request_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Request, + >::new(); let ret = unsafe { - rcl_action_take_result_request(&self.rcl_handle, request_id.as_mut_ptr(), request_msg.void_ptr_mut()) + rcl_action_take_result_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) }; if ret != RCL_RET_OK as i32 { @@ -989,14 +1119,13 @@ where let msg = <<::GetResult as WrappedServiceTypeSupport>::Request>::from_native(&request_msg); let goal_info = action_msgs::msg::GoalInfo { goal_id: T::destructure_result_request_msg(msg), - .. action_msgs::msg::GoalInfo::default() + ..action_msgs::msg::GoalInfo::default() }; let goal_info_native = WrappedNativeMsg::::from(&goal_info); // does this goal exist? - let goal_exists = unsafe { - rcl_action_server_goal_exists(&self.rcl_handle, &*goal_info_native) - }; + let goal_exists = + unsafe { rcl_action_server_goal_exists(&self.rcl_handle, &*goal_info_native) }; let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); @@ -1005,11 +1134,14 @@ where println!("goal does not exist :("); let status = GoalStatus::Unknown; let msg = T::make_result_response_msg(status.to_rcl(), T::Result::default()); - let mut response_msg = WrappedNativeMsg::<<::GetResult as - WrappedServiceTypeSupport>::Response>::from(&msg); + let mut response_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&msg); Some(response_msg.void_ptr_mut()) } else { - self.result_msgs.get_mut(&uuid).map(|msg| msg.void_ptr_mut()) + self.result_msgs + .get_mut(&uuid) + .map(|msg| msg.void_ptr_mut()) }; let mut request_id = unsafe { request_id.assume_init() }; @@ -1026,7 +1158,10 @@ where } else { // keep request for later when result comes in // todo: add logic that replies to the requests - self.result_requests.entry(uuid).or_insert(vec![]).push(request_id); + self.result_requests + .entry(uuid) + .or_insert(vec![]) + .push(request_id); } } @@ -1152,13 +1287,16 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes - let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: self.uuid.as_bytes().to_vec() }; + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }; let feedback_msg = T::make_feedback_msg(uuid_msg, msg); let mut native_msg = WrappedNativeMsg::::from(&feedback_msg); let result = unsafe { rcl_action_publish_feedback( action_server.lock().unwrap().handle(), - native_msg.void_ptr_mut()) + native_msg.void_ptr_mut(), + ) }; if result == RCL_RET_OK as i32 { @@ -1191,15 +1329,16 @@ where // todo: check that the goal exists let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { uuid: self.uuid.as_bytes().to_vec() }, - .. action_msgs::msg::GoalInfo::default() + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() }; let goal_info_native = WrappedNativeMsg::::from(&goal_info); // does this goal exist? - let goal_exists = unsafe { - rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) - }; + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; if !goal_exists { println!("tried to publish result without a goal"); @@ -1216,8 +1355,9 @@ where // create result message let result_msg = T::make_result_response_msg(5, msg); // todo: int8 STATUS_CANCELED = 5 - let native_msg = WrappedNativeMsg::<<::GetResult - as WrappedServiceTypeSupport>::Response>::from(&result_msg); + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); action_server.add_result(self.uuid.clone(), Box::new(native_msg)); Ok(()) @@ -1243,15 +1383,16 @@ where // todo: check that the goal exists let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { uuid: self.uuid.as_bytes().to_vec() }, - .. action_msgs::msg::GoalInfo::default() + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() }; let goal_info_native = WrappedNativeMsg::::from(&goal_info); // does this goal exist? - let goal_exists = unsafe { - rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) - }; + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; if !goal_exists { println!("tried to publish result without a goal"); @@ -1268,8 +1409,9 @@ where // create result message let result_msg = T::make_result_response_msg(6, msg); // todo: int8 STATUS_ABORTED = 6 - let native_msg = WrappedNativeMsg::<<::GetResult - as WrappedServiceTypeSupport>::Response>::from(&result_msg); + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); action_server.add_result(self.uuid.clone(), Box::new(native_msg)); Ok(()) @@ -1288,15 +1430,16 @@ where // todo: check that the goal exists let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { uuid: self.uuid.as_bytes().to_vec() }, - .. action_msgs::msg::GoalInfo::default() + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() }; let goal_info_native = WrappedNativeMsg::::from(&goal_info); // does this goal exist? - let goal_exists = unsafe { - rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) - }; + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; if !goal_exists { println!("tried to publish result without a goal"); @@ -1319,8 +1462,9 @@ where // create result message let result_msg = T::make_result_response_msg(4, msg); // todo: int8 STATUS_SUCCEEDED = 4 - let native_msg = WrappedNativeMsg::<<::GetResult - as WrappedServiceTypeSupport>::Response>::from(&result_msg); + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); action_server.add_result(self.uuid.clone(), Box::new(native_msg)); Ok(()) @@ -1337,7 +1481,7 @@ where impl ActionServer where - T: WrappedActionTypeSupport + T: WrappedActionTypeSupport, { pub fn is_valid(&self) -> Result { // upgrade to actual ref. if still alive @@ -1347,9 +1491,7 @@ where .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes let action_server = action_server.lock().unwrap(); - Ok(unsafe { - rcl_action_server_is_valid(&action_server.rcl_handle) - }) + Ok(unsafe { rcl_action_server_is_valid(&action_server.rcl_handle) }) } } @@ -1700,10 +1842,7 @@ impl Node { } } - pub fn subscribe( - &mut self, - topic: &str, - ) -> Result + Unpin> + pub fn subscribe(&mut self, topic: &str) -> Result + Unpin> where T: WrappedTypesupport, { @@ -1968,11 +2107,12 @@ impl Node { } } - pub fn create_action_server(&mut self, - action_name: &str, - accept_goal_cb: Box bool>, - accept_cancel_cb: Box) -> bool>, - goal_cb: Box)>, + pub fn create_action_server( + &mut self, + action_name: &str, + accept_goal_cb: Box bool>, + accept_cancel_cb: Box) -> bool>, + goal_cb: Box)>, ) -> Result> where T: WrappedActionTypeSupport, @@ -1991,7 +2131,8 @@ impl Node { } let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() }); - let server_handle = self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?; + let server_handle = + self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?; let wa = WrappedActionServer:: { rcl_handle: server_handle, clock_handle, @@ -2065,11 +2206,23 @@ impl Node { Ok(p) } - fn action_client_get_num_waits(rcl_handle: &rcl_action_client_t, num_subs: &mut usize, num_gc: &mut usize , num_timers: &mut usize, - num_clients: &mut usize , num_services: &mut usize) -> Result<()> { + fn action_client_get_num_waits( + rcl_handle: &rcl_action_client_t, + num_subs: &mut usize, + num_gc: &mut usize, + num_timers: &mut usize, + num_clients: &mut usize, + num_services: &mut usize, + ) -> Result<()> { unsafe { - let result = rcl_action_client_wait_set_get_num_entities(rcl_handle, num_subs, num_gc, - num_timers, num_clients, num_services); + let result = rcl_action_client_wait_set_get_num_entities( + rcl_handle, + num_subs, + num_gc, + num_timers, + num_clients, + num_services, + ); if result == RCL_RET_OK as i32 { Ok(()) } else { @@ -2078,11 +2231,23 @@ impl Node { } } - fn action_server_get_num_waits(rcl_handle: &rcl_action_server_t, num_subs: &mut usize, num_gc: &mut usize , num_timers: &mut usize, - num_clients: &mut usize , num_services: &mut usize) -> Result<()> { + fn action_server_get_num_waits( + rcl_handle: &rcl_action_server_t, + num_subs: &mut usize, + num_gc: &mut usize, + num_timers: &mut usize, + num_clients: &mut usize, + num_services: &mut usize, + ) -> Result<()> { unsafe { - let result = rcl_action_server_wait_set_get_num_entities(rcl_handle, num_subs, num_gc, - num_timers, num_clients, num_services); + let result = rcl_action_server_wait_set_get_num_entities( + rcl_handle, + num_subs, + num_gc, + num_timers, + num_clients, + num_services, + ); if result == RCL_RET_OK as i32 { Ok(()) } else { @@ -2111,8 +2276,15 @@ impl Node { let mut num_clients = 0; let mut num_services = 0; - Self::action_client_get_num_waits(ach, &mut num_subs, &mut num_gc, &mut num_timers, &mut num_clients, &mut num_services) - .expect("could not get action client wait sets"); + Self::action_client_get_num_waits( + ach, + &mut num_subs, + &mut num_gc, + &mut num_timers, + &mut num_clients, + &mut num_services, + ) + .expect("could not get action client wait sets"); // sanity check assert_eq!(num_subs, 2); assert_eq!(num_clients, 3); @@ -2134,8 +2306,15 @@ impl Node { let mut num_clients = 0; let mut num_services = 0; - Self::action_server_get_num_waits(ash, &mut num_subs, &mut num_gc, &mut num_timers, &mut num_clients, &mut num_services) - .expect("could not get action client wait sets"); + Self::action_server_get_num_waits( + ash, + &mut num_subs, + &mut num_gc, + &mut num_timers, + &mut num_clients, + &mut num_services, + ) + .expect("could not get action client wait sets"); // sanity check assert_eq!(num_subs, 0); assert_eq!(num_clients, 0); @@ -2199,17 +2378,17 @@ impl Node { // the actions are added at the end of the wait set arrays for (ac, _) in self.action_clients.iter() { unsafe { - rcl_action_wait_set_add_action_client(&mut ws, - ac, - std::ptr::null_mut(), - std::ptr::null_mut()); + rcl_action_wait_set_add_action_client( + &mut ws, + ac, + std::ptr::null_mut(), + std::ptr::null_mut(), + ); } } for (ash, _) in self.action_servers.iter() { unsafe { - rcl_action_wait_set_add_action_server(&mut ws, - ash, - std::ptr::null_mut()); + rcl_action_wait_set_add_action_server(&mut ws, ash, std::ptr::null_mut()); } } @@ -2222,8 +2401,7 @@ impl Node { return; } - let ws_subs = - unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; + let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { if ws_s != &std::ptr::null() { s.handle_incoming(); @@ -2255,7 +2433,7 @@ impl Node { // client dropped the timer handle, let's drop our timer as well. timers_to_remove.push(s.timer_handle); } - }, + } _ => {} // ok } } @@ -2265,7 +2443,8 @@ impl Node { } } // drop timers scheduled for deletion - self.timers.retain(|t| !timers_to_remove.contains(&t.timer_handle)); + self.timers + .retain(|t| !timers_to_remove.contains(&t.timer_handle)); let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { @@ -2295,12 +2474,15 @@ impl Node { let mut is_result_response_ready = false; let ret = unsafe { - rcl_action_client_wait_set_get_entities_ready(&ws, ac, + rcl_action_client_wait_set_get_entities_ready( + &ws, + ac, &mut is_feedback_ready, &mut is_status_ready, &mut is_goal_response_ready, &mut is_cancel_response_ready, - &mut is_result_response_ready) + &mut is_result_response_ready, + ) }; if ret != RCL_RET_OK as i32 { @@ -2340,11 +2522,14 @@ impl Node { let mut is_goal_expired = false; let ret = unsafe { - rcl_action_server_wait_set_get_entities_ready(&ws, ash, + rcl_action_server_wait_set_get_entities_ready( + &ws, + ash, &mut is_goal_request_ready, &mut is_cancel_request_ready, &mut is_result_request_ready, - &mut is_goal_expired) + &mut is_goal_expired, + ) }; if ret != RCL_RET_OK as i32 { @@ -2372,7 +2557,6 @@ impl Node { } } - unsafe { rcl_wait_set_fini(&mut ws); } @@ -2414,10 +2598,7 @@ impl Node { Ok(res) } - pub fn create_wall_timer( - &mut self, - period: Duration, - ) -> Result { + pub fn create_wall_timer(&mut self, period: Duration) -> Result { let mut clock_handle = MaybeUninit::::uninit(); let ret = unsafe { @@ -2462,9 +2643,7 @@ impl Node { }; self.timers.push(timer); - let out_timer = Timer { - receiver: rx, - }; + let out_timer = Timer { receiver: rx }; Ok(out_timer) } @@ -2639,10 +2818,7 @@ where T: WrappedActionTypeSupport, { pub fn get_status(&self) -> Result { - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; let client = client.lock().unwrap(); Ok(client.get_goal_status(&self.uuid)) @@ -2651,17 +2827,13 @@ where pub fn get_result(&mut self) -> Result>> { if let Some(result_channel) = self.result.lock().unwrap().take() { // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); client.send_result_request(self.uuid); Ok(result_channel.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) - } - else { + } else { // todo: error codes... println!("already asked for the result!"); Err(Error::RCL_RET_CLIENT_INVALID) @@ -2680,55 +2852,59 @@ where pub fn cancel(&self) -> Result>> { // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); client.send_cancel_request(&self.uuid) } - } impl ActionClient where T: WrappedActionTypeSupport, { - pub fn send_goal_request(&self, goal: T::Goal) -> Result>>> + pub fn send_goal_request( + &self, + goal: T::Goal, + ) -> Result>>> where T: WrappedActionTypeSupport, { // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); let uuid = uuid::Uuid::new_v4(); - let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; let request_msg = T::make_goal_request_msg(uuid_msg, goal); - let native_msg = WrappedNativeMsg::<<::SendGoal as - WrappedServiceTypeSupport>::Request>::from(&request_msg); + let native_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Request, + >::from(&request_msg); let mut seq_no = 0i64; - let result = - unsafe { rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + let result = unsafe { + rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; // set up channels - let (goal_req_sender, goal_req_receiver) = - oneshot::channel::<<::SendGoal as WrappedServiceTypeSupport>::Response>(); + let (goal_req_sender, goal_req_receiver) = oneshot::channel::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >(); let (feedback_sender, feedback_receiver) = mpsc::channel::(1); client.feedback_senders.push((uuid, feedback_sender)); let (result_sender, result_receiver) = oneshot::channel::(); client.result_senders.push((uuid, result_sender)); if result == RCL_RET_OK as i32 { - client.goal_response_channels.push((seq_no, goal_req_sender)); + client + .goal_response_channels + .push((seq_no, goal_req_sender)); // instead of "canceled" we return invalid client. let fut_client = Weak::clone(&self.client); - let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(move |r| { - match r { + let future = goal_req_receiver + .map_err(|_| Error::RCL_RET_CLIENT_INVALID) + .map(move |r| match r { Ok(resp) => { let (accepted, _stamp) = T::destructure_goal_response_msg(resp); if accepted { @@ -2742,10 +2918,9 @@ where println!("goal rejected"); Err(Error::GoalRejected) } - }, + } Err(e) => Err(e), - } - }); + }); Ok(future) } else { eprintln!("coult not send goal request {}", result);