diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 0000000..29aff6b --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,35 @@ +use r2r; +use failure::Error; + +use r2r::example_interfaces::srv::AddTwoInts; + +fn main() -> Result<(), Error> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let client = node.create_client::("/add_two_ints")?; + + // wait for service to be available + println!("waiting for service..."); + while !node.service_available(&client)? { + std::thread::sleep(std::time::Duration::from_millis(1000)); + } + + println!("service available."); + + let mut c = 0; + loop { + let req = AddTwoInts::Request { a: 10*c, b: c }; + + let cb_req = req.clone(); + let cb = Box::new(move |r: AddTwoInts::Response| + println!("{} + {} = {}", cb_req.a, cb_req.b, r.sum)); + + client.request(&req, cb)?; + + node.spin_once(std::time::Duration::from_millis(1000)); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + c+=1; + } + +} diff --git a/src/lib.rs b/src/lib.rs index 25549b2..f7dcb07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -325,6 +325,59 @@ where } } +// client +struct WrappedClient +where + T: WrappedServiceTypeSupport, +{ + rcl_handle: rcl_client_t, + rcl_request: rmw_request_id_t, + // store callbacks with request sequence id and callback function + callbacks: Vec<(i64, Box)>, + rcl_response_msg: WrappedNativeMsg, +} + +pub trait Client_ { + fn handle(&self) -> &rcl_client_t; + fn run_cb(&mut self) -> (); + fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void; + fn rcl_request_id(&mut self) -> *mut rmw_request_id_t; + fn destroy(&mut self, node: &mut rcl_node_t) -> (); +} + +impl Client_ for WrappedClient +where + T: WrappedServiceTypeSupport, +{ + fn handle(&self) -> &rcl_client_t { + &self.rcl_handle + } + + fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void { + self.rcl_response_msg.void_ptr_mut() + } + + fn rcl_request_id(&mut self) -> *mut rmw_request_id_t { + &mut self.rcl_request + } + + fn run_cb(&mut self) -> () { + // copy native msg to rust type and run callback + let req_id = self.rcl_request.sequence_number; + let idx = self.callbacks.iter().position(|(id, _)| id == &req_id) + .expect("no such sequence number registered"); + let (_, cb_to_run) = self.callbacks.swap_remove(idx); + let response = T::Response::from_native(&self.rcl_response_msg); + (cb_to_run)(response); + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_client_fini(&mut self.rcl_handle, node); + } + } +} + // The publish function is thread safe. ROS2 docs state: // ============= // @@ -369,12 +422,26 @@ pub struct PublisherUntyped { type_: String, } +// Same reasoning for clients. +unsafe impl Send for Client where T: WrappedServiceTypeSupport {} + +pub struct Client +where + T: WrappedServiceTypeSupport, +{ + client_: Weak>>, +} + + #[derive(Debug, Clone)] pub struct Context { context_handle: Arc>>, } -// Not 100% about this one. From our end the context is rarely used and can be locked by a mutex for that. But I haven't investigated if its use is thread-safe between nodes. May remove send here later. +// Not 100% about this one. From our end the context is rarely used +// and can be locked by a mutex for that. But I haven't investigated +// if its use is thread-safe between nodes. May remove send here +// later. unsafe impl Send for Context {} impl Context { @@ -509,8 +576,10 @@ pub struct Node { node_handle: Box, // the node owns the subscribers subs: Vec>, - // servies, + // services, services: Vec>, + // clients with hack to avoid locking just to wait..., + clients: Vec<(rcl_client_t, Arc>)>, // timers, timers: Vec, // and the publishers, whom we allow to be shared.. hmm. @@ -626,6 +695,7 @@ impl Node { node_handle: node_handle, subs: Vec::new(), services: Vec::new(), + clients: Vec::new(), timers: Vec::new(), pubs: Vec::new(), }; @@ -757,6 +827,74 @@ impl Node { } + pub fn create_client_helper(&mut self, service_name: &str, + service_ts: *const rosidl_service_type_support_t) + -> Result { + let mut client_handle = unsafe { rcl_get_zero_initialized_client() }; + let service_name_c_string = CString::new(service_name) + .map_err(|_|Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let client_options = rcl_client_get_default_options(); + rcl_client_init(&mut client_handle, self.node_handle.as_mut(), + service_ts, service_name_c_string.as_ptr(), &client_options) + }; + if result == RCL_RET_OK as i32 { + Ok(client_handle) + } else { + Err(Error::from_rcl_error(result)) + } + } + + pub fn create_client( + &mut self, + service_name: &str, + ) -> Result> + where + T: WrappedServiceTypeSupport, + { + let client_handle = self.create_client_helper(service_name, T::get_ts())?; + let cloned_ch = rcl_client_t { + impl_: client_handle.impl_ + }; + let ws = WrappedClient:: { + rcl_handle: cloned_ch, + rcl_request: rmw_request_id_t { + writer_guid: [0; 16usize], + sequence_number: 0, + }, + rcl_response_msg: WrappedNativeMsg::::new(), + callbacks: Vec::new(), + }; + + let arc = Arc::new(Mutex::new(ws)); + let client_ = Arc::downgrade(&arc); + self.clients.push((client_handle, arc)); + let c = Client { + client_ + }; + Ok(c) + } + + pub fn service_available(&self, client: &Client) -> Result { + let client = client.client_.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + let mut avail = false; + let result = unsafe { + rcl_service_server_is_available( + self.node_handle.as_ref(), + client.handle(), + &mut avail) + }; + + if result == RCL_RET_OK as i32 { + Ok(avail) + } else { + eprintln!("coult not send request {}", result); + Err(Error::from_rcl_error(result)) + } + } + pub fn create_publisher_helper(&mut self, topic: &str, typesupport: *const rosidl_message_type_support_t) -> Result { let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; @@ -826,7 +964,7 @@ impl Node { self.subs.len(), 0, self.timers.len(), - 0, + self.clients.len(), self.services.len(), 0, ctx.as_mut(), @@ -850,6 +988,12 @@ impl Node { } } + for (handle, _) in self.clients.iter() { + unsafe { + rcl_wait_set_add_client(&mut ws, &*handle, std::ptr::null_mut()); + } + } + for s in self.services.iter() { unsafe { rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut()); @@ -908,6 +1052,20 @@ impl Node { } } + let ws_clients = + unsafe { std::slice::from_raw_parts(ws.clients, ws.size_of_clients) }; + assert_eq!(ws_clients.len(), self.clients.len()); + for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { + if ws_s != &std::ptr::null() { + let mut s = s.lock().unwrap(); + let ret = unsafe { + rcl_take_response(s.handle(), s.rcl_request_id(), s.rcl_response_msg()) + }; + if ret == RCL_RET_OK as i32 { + s.run_cb(); + } + } + } let ws_services = unsafe { std::slice::from_raw_parts(ws.services, ws.size_of_services) }; @@ -1099,6 +1257,40 @@ where } } + +impl Client +where + T: WrappedServiceTypeSupport, +{ + pub fn request(&self, msg: &T::Request, cb: Box ()>) -> Result<()> + where + T: WrappedServiceTypeSupport, + { + // upgrade to actual ref. if still alive + let client = self.client_.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + // copy rust msg to native and publish it + let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_send_request( + &client.rcl_handle, + native_msg.void_ptr(), + &mut seq_no, + ) + }; + + if result == RCL_RET_OK as i32 { + client.callbacks.push((seq_no, cb)); + Ok(()) + } else { + eprintln!("coult not send request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + + impl PublisherUntyped { pub fn publish(&self, msg: serde_json::Value) -> Result<()> { // upgrade to actual ref. if still alive