Cleanup subscribers and services when their streams are dropped.

This commit is contained in:
Martin Dahl 2021-09-01 16:12:18 +02:00
parent 439c89801f
commit 585e41a6a0
3 changed files with 58 additions and 12 deletions

View File

@ -721,16 +721,23 @@ impl Node {
} }
let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) };
let mut subs_to_remove = vec![];
for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
s.handle_incoming(); let dropped = s.handle_incoming();
if dropped {
s.destroy(&mut self.node_handle);
subs_to_remove.push(*s.handle());
}
} }
} }
self.subs.retain(|s| !subs_to_remove.contains(s.handle()));
let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) }; let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) };
let mut timers_to_remove = vec![]; let mut timers_to_remove = vec![];
for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) { for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
// TODO: move this to impl Timer
let mut is_ready = false; let mut is_ready = false;
let ret = unsafe { rcl_timer_is_ready(&s.timer_handle, &mut is_ready) }; let ret = unsafe { rcl_timer_is_ready(&s.timer_handle, &mut is_ready) };
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
@ -749,6 +756,10 @@ impl Node {
println!("Warning: timer tick not handled in time - no wakeup will occur"); println!("Warning: timer tick not handled in time - no wakeup will occur");
} }
if e.is_disconnected() { if e.is_disconnected() {
// TODO: cleanup
let _ret = unsafe { rcl_timer_fini(&mut s.timer_handle) };
let _ret = unsafe { rcl_steady_clock_fini(s.clock_handle.as_mut()) };
// client dropped the timer handle, let's drop our timer as well. // client dropped the timer handle, let's drop our timer as well.
timers_to_remove.push(s.timer_handle); timers_to_remove.push(s.timer_handle);
} }
@ -774,12 +785,18 @@ impl Node {
} }
let ws_services = unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) }; let ws_services = unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) };
let mut services_to_remove = vec![];
for (s, ws_s) in self.services.iter_mut().zip(ws_services) { for (s, ws_s) in self.services.iter_mut().zip(ws_services) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let mut service = s.lock().unwrap(); let mut service = s.lock().unwrap();
service.handle_request(s.clone()); let dropped = service.handle_request(s.clone());
if dropped {
service.destroy(&mut self.node_handle);
services_to_remove.push(*service.handle());
}
} }
} }
self.services.retain(|s| !services_to_remove.contains(s.lock().unwrap().handle()));
for ac in &self.action_clients { for ac in &self.action_clients {
let mut is_feedback_ready = false; let mut is_feedback_ready = false;

View File

@ -34,7 +34,8 @@ where
pub trait Service_ { pub trait Service_ {
fn handle(&self) -> &rcl_service_t; fn handle(&self) -> &rcl_service_t;
fn send_response(&mut self, request_id: rmw_request_id_t, msg: Box<dyn VoidPtr>) -> Result<()>; fn send_response(&mut self, request_id: rmw_request_id_t, msg: Box<dyn VoidPtr>) -> Result<()>;
fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> (); /// Returns true if the service stream has been dropped.
fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> bool;
fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> ();
} }
@ -69,7 +70,7 @@ where
} }
} }
fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> () { fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> bool {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit(); let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<T::Request>::new(); let mut request_msg = WrappedNativeMsg::<T::Request>::new();
@ -89,10 +90,16 @@ where
service: Arc::downgrade(&service), service: Arc::downgrade(&service),
}; };
match self.sender.try_send(request) { match self.sender.try_send(request) {
Err(e) => eprintln!("warning: could not send service request ({})", e), Err(e) => {
if e.is_disconnected() {
return true;
}
eprintln!("warning: could not send service request ({})", e)
},
_ => (), _ => (),
} }
} // TODO handle failure. } // TODO handle failure.
return false;
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {

View File

@ -2,7 +2,8 @@ use super::*;
pub trait Subscriber_ { pub trait Subscriber_ {
fn handle(&self) -> &rcl_subscription_t; fn handle(&self) -> &rcl_subscription_t;
fn handle_incoming(&mut self) -> (); /// Returns true if the subscriber stream has been dropped.
fn handle_incoming(&mut self) -> bool;
fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> ();
} }
@ -36,7 +37,7 @@ where
&self.rcl_handle &self.rcl_handle
} }
fn handle_incoming(&mut self) -> () { fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new(); let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe { let ret = unsafe {
@ -50,10 +51,17 @@ where
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let msg = T::from_native(&msg); let msg = T::from_native(&msg);
match self.sender.try_send(msg) { match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e), Err(e) => {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
println!("error {:?}", e)
}
_ => (), _ => (),
} }
} }
return false;
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -71,7 +79,7 @@ where
&self.rcl_handle &self.rcl_handle
} }
fn handle_incoming(&mut self) -> () { fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new(); let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe { let ret = unsafe {
@ -84,10 +92,17 @@ where
}; };
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
match self.sender.try_send(msg) { match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e), Err(e) => {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
println!("error {:?}", e)
}
_ => (), _ => (),
} }
} }
return false;
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -102,7 +117,7 @@ impl Subscriber_ for UntypedSubscriber {
&self.rcl_handle &self.rcl_handle
} }
fn handle_incoming(&mut self) -> () { fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
.expect(&format!("no typesupport for {}", self.topic_type)); .expect(&format!("no typesupport for {}", self.topic_type));
@ -117,10 +132,17 @@ impl Subscriber_ for UntypedSubscriber {
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let json = msg.to_json(); let json = msg.to_json();
match self.sender.try_send(json) { match self.sender.try_send(json) {
Err(e) => println!("error {:?}", e), Err(e) => {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
println!("error {:?}", e)
}
_ => (), _ => (),
} }
} }
return false;
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {