From 585e41a6a098c46b71b8f42657ae7abdd3adbab9 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 1 Sep 2021 16:12:18 +0200 Subject: [PATCH] Cleanup subscribers and services when their streams are dropped. --- src/nodes.rs | 21 +++++++++++++++++++-- src/services.rs | 13 ++++++++++--- src/subscribers.rs | 36 +++++++++++++++++++++++++++++------- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/src/nodes.rs b/src/nodes.rs index 5cefbe0..e6962bb 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -721,16 +721,23 @@ impl Node { } let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; + let mut subs_to_remove = vec![]; for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { if ws_s != &std::ptr::null() { - s.handle_incoming(); + let dropped = s.handle_incoming(); + if dropped { + s.destroy(&mut self.node_handle); + subs_to_remove.push(*s.handle()); + } } } + self.subs.retain(|s| !subs_to_remove.contains(s.handle())); let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) }; let mut timers_to_remove = vec![]; for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) { if ws_s != &std::ptr::null() { + // TODO: move this to impl Timer let mut is_ready = false; let ret = unsafe { rcl_timer_is_ready(&s.timer_handle, &mut is_ready) }; if ret == RCL_RET_OK as i32 { @@ -749,6 +756,10 @@ impl Node { println!("Warning: timer tick not handled in time - no wakeup will occur"); } if e.is_disconnected() { + // TODO: cleanup + let _ret = unsafe { rcl_timer_fini(&mut s.timer_handle) }; + let _ret = unsafe { rcl_steady_clock_fini(s.clock_handle.as_mut()) }; + // client dropped the timer handle, let's drop our timer as well. timers_to_remove.push(s.timer_handle); } @@ -774,12 +785,18 @@ impl Node { } let ws_services = unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) }; + let mut services_to_remove = vec![]; for (s, ws_s) in self.services.iter_mut().zip(ws_services) { if ws_s != &std::ptr::null() { let mut service = s.lock().unwrap(); - service.handle_request(s.clone()); + let dropped = service.handle_request(s.clone()); + if dropped { + service.destroy(&mut self.node_handle); + services_to_remove.push(*service.handle()); + } } } + self.services.retain(|s| !services_to_remove.contains(s.lock().unwrap().handle())); for ac in &self.action_clients { let mut is_feedback_ready = false; diff --git a/src/services.rs b/src/services.rs index b27aa51..8232b67 100644 --- a/src/services.rs +++ b/src/services.rs @@ -34,7 +34,8 @@ where pub trait Service_ { fn handle(&self) -> &rcl_service_t; fn send_response(&mut self, request_id: rmw_request_id_t, msg: Box) -> Result<()>; - fn handle_request(&mut self, service: Arc>) -> (); + /// Returns true if the service stream has been dropped. + fn handle_request(&mut self, service: Arc>) -> bool; fn destroy(&mut self, node: &mut rcl_node_t) -> (); } @@ -69,7 +70,7 @@ where } } - fn handle_request(&mut self, service: Arc>) -> () { + fn handle_request(&mut self, service: Arc>) -> bool { let mut request_id = MaybeUninit::::uninit(); let mut request_msg = WrappedNativeMsg::::new(); @@ -89,10 +90,16 @@ where service: Arc::downgrade(&service), }; match self.sender.try_send(request) { - Err(e) => eprintln!("warning: could not send service request ({})", e), + Err(e) => { + if e.is_disconnected() { + return true; + } + eprintln!("warning: could not send service request ({})", e) + }, _ => (), } } // TODO handle failure. + return false; } fn destroy(&mut self, node: &mut rcl_node_t) { diff --git a/src/subscribers.rs b/src/subscribers.rs index e4aaf34..4aa09c3 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -2,7 +2,8 @@ use super::*; pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; - fn handle_incoming(&mut self) -> (); + /// Returns true if the subscriber stream has been dropped. + fn handle_incoming(&mut self) -> bool; fn destroy(&mut self, node: &mut rcl_node_t) -> (); } @@ -36,7 +37,7 @@ where &self.rcl_handle } - fn handle_incoming(&mut self) -> () { + fn handle_incoming(&mut self) -> bool { let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsg::::new(); let ret = unsafe { @@ -50,10 +51,17 @@ where if ret == RCL_RET_OK as i32 { let msg = T::from_native(&msg); match self.sender.try_send(msg) { - Err(e) => println!("error {:?}", e), + Err(e) => { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + println!("error {:?}", e) + } _ => (), } } + return false; } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -71,7 +79,7 @@ where &self.rcl_handle } - fn handle_incoming(&mut self) -> () { + fn handle_incoming(&mut self) -> bool { let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsg::::new(); let ret = unsafe { @@ -84,10 +92,17 @@ where }; if ret == RCL_RET_OK as i32 { match self.sender.try_send(msg) { - Err(e) => println!("error {:?}", e), + Err(e) => { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + println!("error {:?}", e) + } _ => (), } } + return false; } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -102,7 +117,7 @@ impl Subscriber_ for UntypedSubscriber { &self.rcl_handle } - fn handle_incoming(&mut self) -> () { + fn handle_incoming(&mut self) -> bool { let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) .expect(&format!("no typesupport for {}", self.topic_type)); @@ -117,10 +132,17 @@ impl Subscriber_ for UntypedSubscriber { if ret == RCL_RET_OK as i32 { let json = msg.to_json(); match self.sender.try_send(json) { - Err(e) => println!("error {:?}", e), + Err(e) => { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + println!("error {:?}", e) + } _ => (), } } + return false; } fn destroy(&mut self, node: &mut rcl_node_t) {