From ff0f0dd5e1364d5db4fa771812837031e694dd78 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Sat, 14 Aug 2021 09:27:59 +0200 Subject: [PATCH] more tidying --- src/clients.rs | 92 +++++++++++++++++ src/lib.rs | 247 ++++----------------------------------------- src/publishers.rs | 129 ++++++++++++++++++++++- src/subscribers.rs | 27 +++++ 4 files changed, 265 insertions(+), 230 deletions(-) diff --git a/src/clients.rs b/src/clients.rs index 215d03d..0907820 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -1,5 +1,66 @@ use super::*; +// Public facing apis + +pub struct Client +where + T: WrappedServiceTypeSupport, +{ + client: Weak>>, +} + +impl Client +where + T: WrappedServiceTypeSupport, +{ + pub fn request(&self, msg: &T::Request) -> Result>> + where + T: WrappedServiceTypeSupport, + { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.request(msg) + } +} + +pub struct UntypedClient +{ + client: Weak>, +} + +impl UntypedClient +{ + pub fn request(&self, msg: serde_json::Value) -> Result>>> + { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.request(msg) + } +} + +pub fn make_client(client: Weak>>) -> Client +where + T: WrappedServiceTypeSupport, +{ + Client { + client + } +} + +pub fn make_untyped_client(client: Weak>) -> UntypedClient { + UntypedClient { + client + } +} + unsafe impl Send for TypedClient where T: WrappedServiceTypeSupport {} impl TypedClient @@ -209,3 +270,34 @@ pub fn create_client_helper( Err(Error::from_rcl_error(result)) } } + +pub fn service_available_helper(node: &mut rcl_node_t, client: &rcl_client_t) -> Result { + let mut avail = false; + let result = unsafe { + rcl_service_server_is_available(node, client, &mut avail) + }; + + if result == RCL_RET_OK as i32 { + Ok(avail) + } else { + Err(Error::from_rcl_error(result)) + } +} + +pub fn service_available(node: &mut rcl_node_t, client: &Client) -> Result { + let client = client + .client + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + service_available_helper(node, client.handle()) +} + +pub fn service_available_untyped(node: &mut rcl_node_t, client: &UntypedClient) -> Result { + let client = client + .client + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + service_available_helper(node, client.handle()) +} diff --git a/src/lib.rs b/src/lib.rs index 1ef5380..ee83d93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,7 @@ use services::*; mod clients; use clients::*; +pub use clients::{Client, UntypedClient}; mod action_clients; use action_clients::*; @@ -64,52 +65,6 @@ pub use parameters::ParameterValue; mod clocks; pub use clocks::{Clock, ClockType}; -// Public facing client API. -pub struct Client -where - T: WrappedServiceTypeSupport, -{ - client_: Weak>>, -} - -impl Client -where - T: WrappedServiceTypeSupport, -{ - pub fn request(&self, msg: &T::Request) -> Result>> - where - T: WrappedServiceTypeSupport, - { - // upgrade to actual ref. if still alive - let client = self - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - client.request(msg) - } -} - -// Public facing client API. -pub struct UntypedClient -{ - client_: Weak>, -} - -impl UntypedClient -{ - pub fn request(&self, msg: serde_json::Value) -> Result>>> - { - // upgrade to actual ref. if still alive - let client = self - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - client.request(msg) - } -} - /// Encapsulates a service request. In contrast to having a simply callback from /// Request -> Response types that is called synchronously, the service request /// can be moved around and completed asynchronously. @@ -300,7 +255,6 @@ impl Node { } pub fn create(ctx: Context, name: &str, namespace: &str) -> Result { - // cleanup default options. let (res, node_handle) = { let mut ctx_handle = ctx.context_handle.lock().unwrap(); @@ -342,37 +296,11 @@ impl Node { } } - fn create_subscription_helper( - &mut self, - topic: &str, - ts: *const rosidl_message_type_support_t, - ) -> Result { - let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; - let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let mut subscription_options = rcl_subscription_get_default_options(); - subscription_options.qos = rmw_qos_profile_t::default(); - rcl_subscription_init( - &mut subscription_handle, - self.node_handle.as_mut(), - ts, - topic_c_string.as_ptr(), - &subscription_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(subscription_handle) - } else { - Err(Error::from_rcl_error(result)) - } - } - pub fn subscribe(&mut self, topic: &str) -> Result + Unpin> where T: WrappedTypesupport, { - let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; + let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts())?; let (sender, receiver) = mpsc::channel::(10); let ws = TypedSubscriber { @@ -390,7 +318,7 @@ impl Node { where T: WrappedTypesupport, { - let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; + let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts())?; let (sender, receiver) = mpsc::channel::>(10); let ws = NativeSubscriber { @@ -408,7 +336,7 @@ impl Node { topic_type: &str, ) -> Result> + Unpin> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; - let subscription_handle = self.create_subscription_helper(topic, msg.ts)?; + let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts)?; let (sender, receiver) = mpsc::channel::>(10); let ws = UntypedSubscriber { @@ -477,8 +405,8 @@ impl Node { }; let client_arc = Arc::new(Mutex::new(ws)); - self.clients.push(client_arc.clone()); - let c = Client { client_: Arc::downgrade(&client_arc) }; + let c = make_client(Arc::downgrade(&client_arc)); + self.clients.push(client_arc); Ok(c) } @@ -494,40 +422,17 @@ impl Node { }; let client_arc = Arc::new(Mutex::new(client)); - self.clients.push(client_arc.clone()); - let c = UntypedClient { client_: Arc::downgrade(&client_arc) }; + let c = make_untyped_client(Arc::downgrade(&client_arc)); + self.clients.push(client_arc); Ok(c) } - fn service_available_helper(&self, client: &rcl_client_t) -> Result { - let mut avail = false; - let result = unsafe { - rcl_service_server_is_available(self.node_handle.as_ref(), client, &mut avail) - }; - - if result == RCL_RET_OK as i32 { - Ok(avail) - } else { - Err(Error::from_rcl_error(result)) - } + pub fn service_available(&mut self, client: &Client) -> Result { + service_available(self.node_handle.as_mut(), client) } - pub fn service_available(&self,client: &Client) -> Result { - let client = client - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - self.service_available_helper(client.handle()) - } - - pub fn service_available_untyped(&self, client: &UntypedClient) -> Result { - let client = client - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - self.service_available_helper(client.handle()) + pub fn service_available_untyped(&mut self, client: &UntypedClient) -> Result { + service_available_untyped(self.node_handle.as_mut(), client) } fn create_action_client_helper( @@ -671,42 +576,14 @@ impl Node { Ok(c) } - fn create_publisher_helper( - &mut self, - topic: &str, - typesupport: *const rosidl_message_type_support_t, - ) -> Result { - let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; - let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let mut publisher_options = rcl_publisher_get_default_options(); - publisher_options.qos = rmw_qos_profile_t::default(); - rcl_publisher_init( - &mut publisher_handle, - self.node_handle.as_mut(), - typesupport, - topic_c_string.as_ptr(), - &publisher_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(publisher_handle) - } else { - Err(Error::from_rcl_error(result)) - } - } - pub fn create_publisher(&mut self, topic: &str) -> Result> where T: WrappedTypesupport, { - let publisher_handle = self.create_publisher_helper(topic, T::get_ts())?; - self.pubs.push(Arc::new(publisher_handle)); - let p = Publisher { - handle: Arc::downgrade(self.pubs.last().unwrap()), - type_: PhantomData, - }; + let publisher_handle = create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts())?; + let arc = Arc::new(publisher_handle); + let p = make_publisher(Arc::downgrade(&arc)); + self.pubs.push(arc); Ok(p) } @@ -715,14 +592,11 @@ impl Node { topic: &str, topic_type: &str, ) -> Result { - let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?; // TODO, get ts without allocating msg - let publisher_handle = self.create_publisher_helper(topic, dummy.ts)?; - - self.pubs.push(Arc::new(publisher_handle)); - let p = PublisherUntyped { - handle: Arc::downgrade(self.pubs.last().unwrap()), - type_: topic_type.to_owned(), - }; + let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?; + let publisher_handle = create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts)?; + let arc = Arc::new(publisher_handle); + let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned()); + self.pubs.push(arc); Ok(p) } @@ -1250,57 +1124,6 @@ impl Drop for Node { } } -impl Publisher -where - T: WrappedTypesupport, -{ - pub fn publish(&self, msg: &T) -> Result<()> - where - T: WrappedTypesupport, - { - // upgrade to actual ref. if still alive - let publisher = self - .handle - .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); - let result = unsafe { - rcl_publish( - publisher.as_ref(), - native_msg.void_ptr(), - std::ptr::null_mut(), - ) - }; - - if result == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("coult not publish {}", result); - Err(Error::from_rcl_error(result)) - } - } - - pub fn publish_native(&self, msg: &WrappedNativeMsg) -> Result<()> - where - T: WrappedTypesupport, - { - // upgrade to actual ref. if still alive - let publisher = self - .handle - .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - - let result = - unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }; - if result == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("could not publish native {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - impl ClientGoal where T: WrappedActionTypeSupport, @@ -1429,34 +1252,6 @@ where } } -impl PublisherUntyped { - pub fn publish(&self, msg: serde_json::Value) -> Result<()> { - // upgrade to actual ref. if still alive - let publisher = self - .handle - .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - - let mut native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?; - native_msg.from_json(msg)?; - - let result = unsafe { - rcl_publish( - publisher.as_ref(), - native_msg.void_ptr(), - std::ptr::null_mut(), - ) - }; - - if result == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("coult not publish {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/publishers.rs b/src/publishers.rs index 96a7a01..119c52b 100644 --- a/src/publishers.rs +++ b/src/publishers.rs @@ -32,13 +32,134 @@ pub struct Publisher where T: WrappedTypesupport, { - pub handle: Weak, - pub type_: PhantomData, + handle: Weak, + type_: PhantomData, } unsafe impl Send for PublisherUntyped {} #[derive(Debug, Clone)] pub struct PublisherUntyped { - pub handle: Weak, - pub type_: String, + handle: Weak, + type_: String, +} + +pub fn make_publisher(handle: Weak) -> Publisher +where T: WrappedTypesupport { + Publisher { + handle, + type_: PhantomData, + } +} + +pub fn make_publisher_untyped(handle: Weak, type_: String) -> PublisherUntyped { + PublisherUntyped { + handle, + type_, + } +} + +pub fn create_publisher_helper( + node: &mut rcl_node_t, + topic: &str, + typesupport: *const rosidl_message_type_support_t, +) -> Result { + let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; + let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let mut publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos = rmw_qos_profile_t::default(); + rcl_publisher_init( + &mut publisher_handle, + node, + typesupport, + topic_c_string.as_ptr(), + &publisher_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(publisher_handle) + } else { + Err(Error::from_rcl_error(result)) + } +} + + +impl PublisherUntyped { + pub fn publish(&self, msg: serde_json::Value) -> Result<()> { + // upgrade to actual ref. if still alive + let publisher = self + .handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + + let mut native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?; + native_msg.from_json(msg)?; + + let result = unsafe { + rcl_publish( + publisher.as_ref(), + native_msg.void_ptr(), + std::ptr::null_mut(), + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("coult not publish {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +impl Publisher +where + T: WrappedTypesupport, +{ + pub fn publish(&self, msg: &T) -> Result<()> + where + T: WrappedTypesupport, + { + // upgrade to actual ref. if still alive + let publisher = self + .handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + let result = unsafe { + rcl_publish( + publisher.as_ref(), + native_msg.void_ptr(), + std::ptr::null_mut(), + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("coult not publish {}", result); + Err(Error::from_rcl_error(result)) + } + } + + pub fn publish_native(&self, msg: &WrappedNativeMsg) -> Result<()> + where + T: WrappedTypesupport, + { + // upgrade to actual ref. if still alive + let publisher = self + .handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + + let result = + unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }; + if result == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("could not publish native {}", result); + Err(Error::from_rcl_error(result)) + } + } } diff --git a/src/subscribers.rs b/src/subscribers.rs index fa11ac0..c82836e 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -129,3 +129,30 @@ impl Subscriber_ for UntypedSubscriber { } } } + + +pub fn create_subscription_helper( + node: &mut rcl_node_t, + topic: &str, + ts: *const rosidl_message_type_support_t, +) -> Result { + let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; + let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let mut subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos = rmw_qos_profile_t::default(); + rcl_subscription_init( + &mut subscription_handle, + node, + ts, + topic_c_string.as_ptr(), + &subscription_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(subscription_handle) + } else { + Err(Error::from_rcl_error(result)) + } +}