diff --git a/src/action_clients.rs b/src/action_clients.rs index 1253429..1f109ec 100644 --- a/src/action_clients.rs +++ b/src/action_clients.rs @@ -1,5 +1,185 @@ use super::*; +unsafe impl Send for ActionClient where T: WrappedActionTypeSupport {} + +#[derive(Clone)] +pub struct ActionClient +where + T: WrappedActionTypeSupport, +{ + client: Weak>>, +} + +#[derive(Clone)] +pub struct ClientGoal +where + T: WrappedActionTypeSupport, +{ + client: Weak>>, + pub uuid: uuid::Uuid, + feedback: Arc>>>, + result: Arc>>>, +} + +impl ClientGoal +where + T: WrappedActionTypeSupport, +{ + pub fn get_status(&self) -> Result { + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + + Ok(client.get_goal_status(&self.uuid)) + } + + pub fn get_result(&mut self) -> Result>> { + if let Some(result_channel) = self.result.lock().unwrap().take() { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + client.send_result_request(self.uuid); + + Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)) + } else { + // todo: error codes... + println!("already asked for the result!"); + Err(Error::RCL_RET_ACTION_CLIENT_INVALID) + } + } + + pub fn get_feedback(&self) -> Result + Unpin> { + if let Some(feedback_channel) = self.feedback.lock().unwrap().take() { + Ok(feedback_channel) + } else { + // todo: error codes... + println!("someone else owns the feedback consumer stream"); + Err(Error::RCL_RET_ACTION_CLIENT_INVALID) + } + } + + pub fn cancel(&self) -> Result>> { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + client.send_cancel_request(&self.uuid) + } +} + +impl ActionClient +where + T: WrappedActionTypeSupport, +{ + pub fn send_goal_request( + &self, + goal: T::Goal, + ) -> Result>>> + where + T: WrappedActionTypeSupport, + { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + let uuid = uuid::Uuid::new_v4(); + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; + let request_msg = T::make_goal_request_msg(uuid_msg, goal); + let native_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Request, + >::from(&request_msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + // set up channels + let (goal_req_sender, goal_req_receiver) = oneshot::channel::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >(); + let (feedback_sender, feedback_receiver) = mpsc::channel::(1); + client.feedback_senders.push((uuid, feedback_sender)); + let (result_sender, result_receiver) = oneshot::channel::(); + client.result_senders.push((uuid, result_sender)); + + if result == RCL_RET_OK as i32 { + client + .goal_response_channels + .push((seq_no, goal_req_sender)); + // instead of "canceled" we return invalid client. + let fut_client = Weak::clone(&self.client); + let future = goal_req_receiver + .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID) + .map(move |r| match r { + Ok(resp) => { + let (accepted, _stamp) = T::destructure_goal_response_msg(resp); + if accepted { + Ok(ClientGoal { + client: fut_client, + uuid, + feedback: Arc::new(Mutex::new(Some(feedback_receiver))), + result: Arc::new(Mutex::new(Some(result_receiver))), + }) + } else { + println!("goal rejected"); + Err(Error::RCL_RET_ACTION_GOAL_REJECTED) + } + } + Err(e) => Err(e), + }); + Ok(future) + } else { + eprintln!("coult not send goal request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +pub fn make_action_client(client: Weak>>) -> ActionClient +where + T: WrappedActionTypeSupport, +{ + ActionClient { + client + } +} + +pub fn action_server_available(node: &rcl_node_t, client: &ActionClient) -> Result +where + T: 'static + WrappedActionTypeSupport, +{ + let client = client + .client + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + let mut avail = false; + let result = unsafe { + rcl_action_server_is_available(node, client.handle(), &mut avail) + }; + + if result == RCL_RET_OK as i32 { + Ok(avail) + } else { + eprintln!("coult not send request {}", result); + Err(Error::from_rcl_error(result)) + } +} + #[derive(Debug, Copy, Clone, PartialEq)] pub enum GoalStatus { Unknown, @@ -342,3 +522,29 @@ where } } } + +pub fn create_action_client_helper( + node: &mut rcl_node_t, + action_name: &str, + action_ts: *const rosidl_action_type_support_t, +) -> Result { + let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() }; + let action_name_c_string = + CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let client_options = rcl_action_client_get_default_options(); + rcl_action_client_init( + &mut client_handle, + node, + action_ts, + action_name_c_string.as_ptr(), + &client_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(client_handle) + } else { + Err(Error::from_rcl_error(result)) + } +} diff --git a/src/action_servers.rs b/src/action_servers.rs index 77db735..b70330b 100644 --- a/src/action_servers.rs +++ b/src/action_servers.rs @@ -564,3 +564,32 @@ where Ok(()) } } + +pub fn create_action_server_helper( + node: &mut rcl_node_t, + action_name: &str, + clock_handle: *mut rcl_clock_t, + action_ts: *const rosidl_action_type_support_t, +) -> Result { + let mut server_handle = unsafe { rcl_action_get_zero_initialized_server() }; + let action_name_c_string = + CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let server_options = rcl_action_server_get_default_options(); + + rcl_action_server_init( + &mut server_handle, + node, + clock_handle, + action_ts, + action_name_c_string.as_ptr(), + &server_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(server_handle) + } else { + Err(Error::from_rcl_error(result)) + } +} diff --git a/src/lib.rs b/src/lib.rs index ee83d93..a64e996 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,6 @@ use action_clients::*; mod action_servers; use action_servers::*; - pub use action_servers::ServerGoal; mod context; @@ -65,55 +64,6 @@ pub use parameters::ParameterValue; mod clocks; pub use clocks::{Clock, ClockType}; -/// Encapsulates a service request. In contrast to having a simply callback from -/// Request -> Response types that is called synchronously, the service request -/// can be moved around and completed asynchronously. -pub struct ServiceRequest -where - T: WrappedServiceTypeSupport, -{ - pub message: T::Request, - request_id: rmw_request_id_t, - response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>, -} - -impl ServiceRequest -where - T: WrappedServiceTypeSupport, -{ - /// Complete the service request, consuming the request in the process. - /// The reply is sent back on the next "ros spin". - pub fn respond(self, msg: T::Response) { - match self.response_sender.send((self.request_id, msg)) { - Err(_) => { - println!("service response receiver dropped"); - } - _ => {} - } - } -} - -unsafe impl Send for ActionClient where T: WrappedActionTypeSupport {} - -#[derive(Clone)] -pub struct ActionClient -where - T: WrappedActionTypeSupport, -{ - client: Weak>>, -} - -#[derive(Clone)] -pub struct ClientGoal -where - T: WrappedActionTypeSupport, -{ - client: Weak>>, - pub uuid: uuid::Uuid, - feedback: Arc>>>, - result: Arc>>>, -} - #[derive(Clone)] pub struct ActionServer where @@ -348,32 +298,6 @@ impl Node { Ok(receiver) } - fn create_service_helper( - &mut self, - service_name: &str, - service_ts: *const rosidl_service_type_support_t, - ) -> Result { - let mut service_handle = unsafe { rcl_get_zero_initialized_service() }; - let service_name_c_string = - CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let service_options = rcl_service_get_default_options(); - rcl_service_init( - &mut service_handle, - self.node_handle.as_mut(), - service_ts, - service_name_c_string.as_ptr(), - &service_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(service_handle) - } else { - Err(Error::from_rcl_error(result)) - } - } - pub fn create_service( &mut self, service_name: &str, @@ -381,7 +305,7 @@ impl Node { where T: WrappedServiceTypeSupport, { - let service_handle = self.create_service_helper(service_name, T::get_ts())?; + let service_handle = create_service_helper(self.node_handle.as_mut(), service_name, T::get_ts())?; let (sender, receiver) = mpsc::channel::>(10); let ws = TypedService:: { @@ -435,37 +359,11 @@ impl Node { service_available_untyped(self.node_handle.as_mut(), client) } - fn create_action_client_helper( - &mut self, - action_name: &str, - action_ts: *const rosidl_action_type_support_t, - ) -> Result { - let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() }; - let action_name_c_string = - CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let client_options = rcl_action_client_get_default_options(); - rcl_action_client_init( - &mut client_handle, - self.node_handle.as_mut(), - action_ts, - action_name_c_string.as_ptr(), - &client_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(client_handle) - } else { - Err(Error::from_rcl_error(result)) - } - } - pub fn create_action_client(&mut self, action_name: &str) -> Result> where T: WrappedActionTypeSupport, { - let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; + let client_handle = create_action_client_helper(self.node_handle.as_mut(), action_name, T::get_ts())?; let client = WrappedActionClient:: { rcl_handle: client_handle, goal_response_channels: Vec::new(), @@ -478,7 +376,7 @@ impl Node { let client_arc = Arc::new(Mutex::new(client)); self.action_clients.push(client_arc.clone()); - let c = ActionClient { client: Arc::downgrade(&client_arc) }; + let c = make_action_client(Arc::downgrade(&client_arc)); Ok(c) } @@ -486,51 +384,7 @@ impl Node { &self, client: &ActionClient, ) -> Result { - let client = client - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - let mut avail = false; - let result = unsafe { - rcl_action_server_is_available(self.node_handle.as_ref(), client.handle(), &mut avail) - }; - - if result == RCL_RET_OK as i32 { - Ok(avail) - } else { - eprintln!("coult not send request {}", result); - Err(Error::from_rcl_error(result)) - } - } - - fn create_action_server_helper( - &mut self, - action_name: &str, - clock_handle: *mut rcl_clock_t, - action_ts: *const rosidl_action_type_support_t, - ) -> Result { - let mut server_handle = unsafe { rcl_action_get_zero_initialized_server() }; - let action_name_c_string = - CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let server_options = rcl_action_server_get_default_options(); - - rcl_action_server_init( - &mut server_handle, - self.node_handle.as_mut(), - clock_handle, - action_ts, - action_name_c_string.as_ptr(), - &server_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(server_handle) - } else { - Err(Error::from_rcl_error(result)) - } + action_server_available(self.node_handle.as_ref(), client) } pub fn create_action_server( @@ -558,7 +412,7 @@ impl Node { let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() }); let server_handle = - self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?; + create_action_server_helper(self.node_handle.as_mut(), action_name, clock_handle.as_mut(), T::get_ts())?; let server = WrappedActionServer:: { rcl_handle: server_handle, clock_handle, @@ -1124,134 +978,6 @@ impl Drop for Node { } } -impl ClientGoal -where - T: WrappedActionTypeSupport, -{ - pub fn get_status(&self) -> Result { - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - - Ok(client.get_goal_status(&self.uuid)) - } - - pub fn get_result(&mut self) -> Result>> { - if let Some(result_channel) = self.result.lock().unwrap().take() { - // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - - client.send_result_request(self.uuid); - - Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)) - } else { - // todo: error codes... - println!("already asked for the result!"); - Err(Error::RCL_RET_ACTION_CLIENT_INVALID) - } - } - - pub fn get_feedback(&self) -> Result + Unpin> { - if let Some(feedback_channel) = self.feedback.lock().unwrap().take() { - Ok(feedback_channel) - } else { - // todo: error codes... - println!("someone else owns the feedback consumer stream"); - Err(Error::RCL_RET_ACTION_CLIENT_INVALID) - } - } - - pub fn cancel(&self) -> Result>> { - // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - - client.send_cancel_request(&self.uuid) - } -} - -impl ActionClient -where - T: WrappedActionTypeSupport, -{ - pub fn send_goal_request( - &self, - goal: T::Goal, - ) -> Result>>> - where - T: WrappedActionTypeSupport, - { - // upgrade to actual ref. if still alive - let client = self - .client - .upgrade() - .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - - let uuid = uuid::Uuid::new_v4(); - let uuid_msg = unique_identifier_msgs::msg::UUID { - uuid: uuid.as_bytes().to_vec(), - }; - let request_msg = T::make_goal_request_msg(uuid_msg, goal); - let native_msg = WrappedNativeMsg::< - <::SendGoal as WrappedServiceTypeSupport>::Request, - >::from(&request_msg); - let mut seq_no = 0i64; - let result = unsafe { - rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) - }; - - // set up channels - let (goal_req_sender, goal_req_receiver) = oneshot::channel::< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >(); - let (feedback_sender, feedback_receiver) = mpsc::channel::(1); - client.feedback_senders.push((uuid, feedback_sender)); - let (result_sender, result_receiver) = oneshot::channel::(); - client.result_senders.push((uuid, result_sender)); - - if result == RCL_RET_OK as i32 { - client - .goal_response_channels - .push((seq_no, goal_req_sender)); - // instead of "canceled" we return invalid client. - let fut_client = Weak::clone(&self.client); - let future = goal_req_receiver - .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID) - .map(move |r| match r { - Ok(resp) => { - let (accepted, _stamp) = T::destructure_goal_response_msg(resp); - if accepted { - Ok(ClientGoal { - client: fut_client, - uuid, - feedback: Arc::new(Mutex::new(Some(feedback_receiver))), - result: Arc::new(Mutex::new(Some(result_receiver))), - }) - } else { - println!("goal rejected"); - Err(Error::RCL_RET_ACTION_GOAL_REJECTED) - } - } - Err(e) => Err(e), - }); - Ok(future) - } else { - eprintln!("coult not send goal request {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/services.rs b/src/services.rs index bc63bb2..cc3bb59 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,5 +1,33 @@ use super::*; +/// Encapsulates a service request. In contrast to having a simply callback from +/// Request -> Response types that is called synchronously, the service request +/// can be moved around and completed asynchronously. +pub struct ServiceRequest +where + T: WrappedServiceTypeSupport, +{ + pub message: T::Request, + request_id: rmw_request_id_t, + response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>, +} + +impl ServiceRequest +where + T: WrappedServiceTypeSupport, +{ + /// Complete the service request, consuming the request in the process. + /// The reply is sent back on the next "ros spin". + pub fn respond(self, msg: T::Response) { + match self.response_sender.send((self.request_id, msg)) { + Err(_) => { + println!("service response receiver dropped"); + } + _ => {} + } + } +} + pub trait Service_ { fn handle(&self) -> &rcl_service_t; fn send_completed_responses(&mut self) -> (); @@ -89,3 +117,29 @@ where } } } + +pub fn create_service_helper( + node: &mut rcl_node_t, + service_name: &str, + service_ts: *const rosidl_service_type_support_t, +) -> Result { + let mut service_handle = unsafe { rcl_get_zero_initialized_service() }; + let service_name_c_string = + CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let service_options = rcl_service_get_default_options(); + rcl_service_init( + &mut service_handle, + node, + service_ts, + service_name_c_string.as_ptr(), + &service_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(service_handle) + } else { + Err(Error::from_rcl_error(result)) + } +}