diff --git a/examples/tokio.rs b/examples/tokio.rs index 17f5a4b..4024fe8 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -17,15 +17,6 @@ async fn main() -> Result<(), Box> { let p = node.create_publisher::("/topic2")?; let state = Arc::new(Mutex::new(SharedState::default())); - use r2r::example_interfaces::srv::AddTwoInts; - let client = node.create_client::("/add_two_ints")?; - - std::thread::spawn(move || { - let req = AddTwoInts::Request { a: 10, b: 20 }; - print!("{} + {} = ", req.a, req.b); - let resp = client.request(&req).expect(""); - }); - // task that every other time forwards message to topic2 let state_t1 = state.clone(); task::spawn(async move { diff --git a/src/lib.rs b/src/lib.rs index 75f935c..1ef5380 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,15 +188,15 @@ pub struct Node { pub params: HashMap, node_handle: Box, // the node owns the subscribers - subs: Vec>, + subs: Vec>, // services, - services: Vec>, - // clients with hack to avoid locking just to wait..., - clients: Vec<(rcl_client_t, Arc>)>, + services: Vec>, + // service clients + clients: Vec>>, // action clients - action_clients: Vec<(rcl_action_client_t, Arc>)>, + action_clients: Vec>>, // action servers - action_servers: Vec<(rcl_action_server_t, Arc>)>, + action_servers: Vec>>, // timers, timers: Vec, // and the publishers, whom we allow to be shared.. hmm. @@ -325,7 +325,7 @@ impl Node { let mut node = Node { params: HashMap::new(), context: ctx, - node_handle: node_handle, + node_handle, subs: Vec::new(), services: Vec::new(), clients: Vec::new(), @@ -476,10 +476,9 @@ impl Node { response_channels: Vec::new(), }; - let arc = Arc::new(Mutex::new(ws)); - let client_ = Arc::downgrade(&arc); - self.clients.push((client_handle, arc)); - let c = Client { client_ }; + let client_arc = Arc::new(Mutex::new(ws)); + self.clients.push(client_arc.clone()); + let c = Client { client_: Arc::downgrade(&client_arc) }; Ok(c) } @@ -494,10 +493,9 @@ impl Node { response_channels: Vec::new(), }; - let arc = Arc::new(Mutex::new(client)); - let client = Arc::downgrade(&arc); - self.clients.push((client_handle, arc)); - let c = UntypedClient { client_: client }; + let client_arc = Arc::new(Mutex::new(client)); + self.clients.push(client_arc.clone()); + let c = UntypedClient { client_: Arc::downgrade(&client_arc) }; Ok(c) } @@ -563,7 +561,7 @@ impl Node { T: WrappedActionTypeSupport, { let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; - let wa = WrappedActionClient:: { + let client = WrappedActionClient:: { rcl_handle: client_handle, goal_response_channels: Vec::new(), cancel_response_channels: Vec::new(), @@ -573,10 +571,9 @@ impl Node { goal_status: HashMap::new(), }; - let arc = Arc::new(Mutex::new(wa)); - let client = Arc::downgrade(&arc); - self.action_clients.push((client_handle, arc)); - let c = ActionClient { client }; + let client_arc = Arc::new(Mutex::new(client)); + self.action_clients.push(client_arc.clone()); + let c = ActionClient { client: Arc::downgrade(&client_arc) }; Ok(c) } @@ -657,7 +654,7 @@ impl Node { let server_handle = self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?; - let wa = WrappedActionServer:: { + let server = WrappedActionServer:: { rcl_handle: server_handle, clock_handle, accept_goal_cb, @@ -668,10 +665,9 @@ impl Node { result_requests: HashMap::new(), }; - let arc = Arc::new(Mutex::new(wa)); - let server = Arc::downgrade(&arc); - self.action_servers.push((server_handle, arc)); - let c = ActionServer { server }; + let server_arc = Arc::new(Mutex::new(server)); + self.action_servers.push(server_arc.clone()); + let c = ActionServer { server: Arc::downgrade(&server_arc) }; Ok(c) } @@ -798,7 +794,7 @@ impl Node { // count action client wait set needs let mut total_action_subs = 0; let mut total_action_clients = 0; - for (ach, _) in &self.action_clients { + for c in &self.action_clients { let mut num_subs = 0; let mut num_gc = 0; let mut num_timers = 0; @@ -806,7 +802,7 @@ impl Node { let mut num_services = 0; Self::action_client_get_num_waits( - ach, + c.lock().unwrap().handle(), &mut num_subs, &mut num_gc, &mut num_timers, @@ -828,7 +824,7 @@ impl Node { // count action server wait set needs let mut total_action_timers = 0; let mut total_action_services = 0; - for (ash, _) in &self.action_servers { + for s in &self.action_servers { let mut num_subs = 0; let mut num_gc = 0; let mut num_timers = 0; @@ -836,7 +832,7 @@ impl Node { let mut num_services = 0; Self::action_server_get_num_waits( - ash, + s.lock().unwrap().handle(), &mut num_subs, &mut num_gc, &mut num_timers, @@ -876,25 +872,25 @@ impl Node { rcl_wait_set_clear(&mut ws); } - for s in self.subs.iter() { + for s in &self.subs { unsafe { rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); } } - for s in self.timers.iter() { + for s in &self.timers { unsafe { rcl_wait_set_add_timer(&mut ws, &s.timer_handle, std::ptr::null_mut()); } } - for (handle, _) in self.clients.iter() { + for s in &self.clients { unsafe { - rcl_wait_set_add_client(&mut ws, &*handle, std::ptr::null_mut()); + rcl_wait_set_add_client(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut()); } } - for s in self.services.iter() { + for s in &self.services { unsafe { rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut()); } @@ -905,19 +901,19 @@ impl Node { // the node before ones created automatically by actions. we // then assume that we can count on the waitables created by // the actions are added at the end of the wait set arrays - for (ac, _) in self.action_clients.iter() { + for ac in &self.action_clients { unsafe { rcl_action_wait_set_add_action_client( &mut ws, - ac, + ac.lock().unwrap().handle(), std::ptr::null_mut(), std::ptr::null_mut(), ); } } - for (ash, _) in self.action_servers.iter() { + for acs in &self.action_servers { unsafe { - rcl_action_wait_set_add_action_server(&mut ws, ash, std::ptr::null_mut()); + rcl_action_wait_set_add_action_server(&mut ws, acs.lock().unwrap().handle(), std::ptr::null_mut()); } } @@ -976,7 +972,7 @@ impl Node { .retain(|t| !timers_to_remove.contains(&t.timer_handle)); let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; - for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { + for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) { if ws_s != &std::ptr::null() { let mut s = s.lock().unwrap(); s.handle_response(); @@ -990,7 +986,7 @@ impl Node { } } - for (ac, s) in &self.action_clients { + for ac in &self.action_clients { let mut is_feedback_ready = false; let mut is_status_ready = false; let mut is_goal_response_ready = false; @@ -1000,7 +996,7 @@ impl Node { let ret = unsafe { rcl_action_client_wait_set_get_entities_ready( &ws, - ac, + ac.lock().unwrap().handle(), &mut is_feedback_ready, &mut is_status_ready, &mut is_goal_response_ready, @@ -1014,32 +1010,32 @@ impl Node { } if is_feedback_ready { - let mut acs = s.lock().unwrap(); + let mut acs = ac.lock().unwrap(); acs.handle_feedback_msg(); } if is_status_ready { - let mut acs = s.lock().unwrap(); + let mut acs = ac.lock().unwrap(); acs.handle_status_msg(); } if is_goal_response_ready { - let mut acs = s.lock().unwrap(); + let mut acs = ac.lock().unwrap(); acs.handle_goal_response(); } if is_cancel_response_ready { - let mut acs = s.lock().unwrap(); + let mut acs = ac.lock().unwrap(); acs.handle_cancel_response(); } if is_result_response_ready { - let mut acs = s.lock().unwrap(); + let mut acs = ac.lock().unwrap(); acs.handle_result_response(); } } - for (ash, s) in &self.action_servers { + for s in &self.action_servers { let mut is_goal_request_ready = false; let mut is_cancel_request_ready = false; let mut is_result_request_ready = false; @@ -1048,7 +1044,7 @@ impl Node { let ret = unsafe { rcl_action_server_wait_set_get_entities_ready( &ws, - ash, + s.lock().unwrap().handle(), &mut is_goal_request_ready, &mut is_cancel_request_ready, &mut is_result_request_ready, @@ -1237,10 +1233,10 @@ impl Drop for Node { // TODO: allow other types of clocks... let _ret = unsafe { rcl_steady_clock_fini(t.clock_handle.as_mut()) }; } - for (_, c) in &mut self.action_clients { + for c in &mut self.action_clients { c.lock().unwrap().destroy(&mut self.node_handle); } - for (_, s) in &mut self.action_servers { + for s in &mut self.action_servers { s.lock().unwrap().destroy(&mut self.node_handle); } while let Some(p) = self.pubs.pop() { diff --git a/src/services.rs b/src/services.rs index 127c4c1..bc63bb2 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,6 +1,6 @@ use super::*; -pub trait Service { +pub trait Service_ { fn handle(&self) -> &rcl_service_t; fn send_completed_responses(&mut self) -> (); fn handle_request(&mut self) -> (); @@ -16,7 +16,7 @@ where pub outstanding_requests: Vec>, } -impl Service for TypedService +impl Service_ for TypedService where T: WrappedServiceTypeSupport, { diff --git a/src/subscribers.rs b/src/subscribers.rs index d0bf66b..fa11ac0 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -1,6 +1,6 @@ use super::*; -pub trait Subscriber { +pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; fn handle_incoming(&mut self) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> (); @@ -28,7 +28,7 @@ pub struct UntypedSubscriber { pub sender: mpsc::Sender>, } -impl Subscriber for TypedSubscriber +impl Subscriber_ for TypedSubscriber where T: WrappedTypesupport, { @@ -63,7 +63,7 @@ where } } -impl Subscriber for NativeSubscriber +impl Subscriber_ for NativeSubscriber where T: WrappedTypesupport, { @@ -97,7 +97,7 @@ where } } -impl Subscriber for UntypedSubscriber { +impl Subscriber_ for UntypedSubscriber { fn handle(&self) -> &rcl_subscription_t { &self.rcl_handle }