Refactor timers

This commit is contained in:
Martin Dahl 2021-09-01 19:08:54 +02:00
parent e0e5ed7dc4
commit 807d9fc59b
2 changed files with 72 additions and 80 deletions

View File

@ -10,7 +10,7 @@ pub enum ClockType {
unsafe impl Send for Clock {}
pub struct Clock {
clock_handle: Box<rcl_clock_t>,
pub(crate) clock_handle: Box<rcl_clock_t>,
}
pub fn clock_type_to_rcl(ct: &ClockType) -> rcl_clock_type_t {

View File

@ -6,7 +6,7 @@ pub struct Node {
pub params: Arc<Mutex<HashMap<String, ParameterValue>>>,
node_handle: Box<rcl_node_t>,
// the node owns the subscribers
subs: Vec<Box<dyn Subscriber_>>,
subscribers: Vec<Box<dyn Subscriber_>>,
// services,
services: Vec<Arc<Mutex<dyn Service_>>>,
// service clients
@ -148,7 +148,7 @@ impl Node {
params: Arc::new(Mutex::new(HashMap::new())),
context: ctx,
node_handle,
subs: Vec::new(),
subscribers: Vec::new(),
services: Vec::new(),
clients: Vec::new(),
action_clients: Vec::new(),
@ -273,7 +273,7 @@ impl Node {
rcl_handle: subscription_handle,
sender,
};
self.subs.push(Box::new(ws));
self.subscribers.push(Box::new(ws));
Ok(receiver)
}
@ -295,7 +295,7 @@ impl Node {
rcl_handle: subscription_handle,
sender,
};
self.subs.push(Box::new(ws));
self.subscribers.push(Box::new(ws));
Ok(receiver)
}
@ -318,7 +318,7 @@ impl Node {
topic_type: topic_type.to_string(),
sender,
};
self.subs.push(Box::new(ws));
self.subscribers.push(Box::new(ws));
Ok(receiver)
}
@ -647,7 +647,7 @@ impl Node {
unsafe {
rcl_wait_set_init(
&mut ws,
self.subs.len() + total_action_subs,
self.subscribers.len() + total_action_subs,
0,
self.timers.len() + total_action_timers,
self.clients.len() + total_action_clients,
@ -662,7 +662,7 @@ impl Node {
rcl_wait_set_clear(&mut ws);
}
for s in &self.subs {
for s in &self.subscribers {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
}
@ -720,9 +720,9 @@ impl Node {
return;
}
let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) };
let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subscribers.len()) };
let mut subs_to_remove = vec![];
for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) {
for (s, ws_s) in self.subscribers.iter_mut().zip(ws_subs) {
if ws_s != &std::ptr::null() {
let dropped = s.handle_incoming();
if dropped {
@ -731,44 +731,16 @@ impl Node {
}
}
}
self.subs.retain(|s| !subs_to_remove.contains(s.handle()));
self.subscribers.retain(|s| !subs_to_remove.contains(s.handle()));
let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) };
let mut timers_to_remove = vec![];
for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) {
if ws_s != &std::ptr::null() {
// TODO: move this to impl Timer
let mut is_ready = false;
let ret = unsafe { rcl_timer_is_ready(&s.timer_handle, &mut is_ready) };
if ret == RCL_RET_OK as i32 {
if is_ready {
let mut nanos = 0i64;
// todo: error handling
let ret = unsafe {
rcl_timer_get_time_since_last_call(&s.timer_handle, &mut nanos)
};
if ret == RCL_RET_OK as i32 {
let ret = unsafe { rcl_timer_call(&mut s.timer_handle) };
if ret == RCL_RET_OK as i32 {
match s.sender.try_send(Duration::from_nanos(nanos as u64)) {
Err(e) => {
if e.is_full() {
println!("Warning: timer tick not handled in time - no wakeup will occur");
}
if e.is_disconnected() {
// TODO: cleanup
let _ret = unsafe { rcl_timer_fini(&mut s.timer_handle) };
let _ret = unsafe { rcl_steady_clock_fini(s.clock_handle.as_mut()) };
// client dropped the timer handle, let's drop our timer as well.
timers_to_remove.push(s.timer_handle);
}
}
_ => {} // ok
}
}
}
}
let dropped = s.handle_incoming();
if dropped {
timers_to_remove.push(s.timer_handle);
}
}
}
@ -936,46 +908,32 @@ impl Node {
///
/// Create a ROS timer that is woken up by spin every `period`.
pub fn create_wall_timer(&mut self, period: Duration) -> Result<Timer> {
let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit();
let mut clock = Clock::create(ClockType::SteadyTime)?;
let ret = unsafe {
rcl_steady_clock_init(
clock_handle.as_mut_ptr(),
&mut rcutils_get_default_allocator(),
)
};
if ret != RCL_RET_OK as i32 {
eprintln!("could not create steady clock: {}", ret);
return Err(Error::from_rcl_error(ret));
}
let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() });
let mut timer_handle = unsafe { rcl_get_zero_initialized_timer() };
{
let mut ctx = self.context.context_handle.lock().unwrap();
let ret = unsafe {
rcl_timer_init(
&mut timer_handle,
clock_handle.as_mut(),
ctx.as_mut(),
period.as_nanos() as i64,
None,
rcutils_get_default_allocator(),
)
};
let mut ctx = self.context.context_handle.lock().unwrap();
let ret = unsafe {
rcl_timer_init(
&mut timer_handle,
clock.clock_handle.as_mut(),
ctx.as_mut(),
period.as_nanos() as i64,
None,
rcutils_get_default_allocator(),
)
};
if ret != RCL_RET_OK as i32 {
eprintln!("could not create timer: {}", ret);
return Err(Error::from_rcl_error(ret));
}
if ret != RCL_RET_OK as i32 {
eprintln!("could not create timer: {}", ret);
return Err(Error::from_rcl_error(ret));
}
let (tx, rx) = mpsc::channel::<Duration>(1);
let timer = Timer_ {
timer_handle,
clock_handle,
_clock: clock,
sender: tx,
};
self.timers.push(timer);
@ -998,10 +956,50 @@ impl Node {
struct Timer_ {
timer_handle: rcl_timer_t,
clock_handle: Box<rcl_clock_t>,
_clock: Clock, // just here to be dropped properly later.
sender: mpsc::Sender<Duration>,
}
impl Timer_ {
fn handle_incoming(&mut self) -> bool {
let mut is_ready = false;
let ret = unsafe { rcl_timer_is_ready(&self.timer_handle, &mut is_ready) };
if ret == RCL_RET_OK as i32 {
if is_ready {
let mut nanos = 0i64;
// todo: error handling
let ret = unsafe {
rcl_timer_get_time_since_last_call(&self.timer_handle, &mut nanos)
};
if ret == RCL_RET_OK as i32 {
let ret = unsafe { rcl_timer_call(&mut self.timer_handle) };
if ret == RCL_RET_OK as i32 {
match self.sender.try_send(Duration::from_nanos(nanos as u64)) {
Err(e) => {
if e.is_disconnected() {
// client dropped the timer handle, let's drop our timer as well.
return true;
}
if e.is_full() {
println!("Warning: timer tick not handled in time - no wakeup will occur");
}
}
_ => {} // ok
}
}
}
}
}
return false;
}
}
impl Drop for Timer_ {
fn drop(&mut self) {
let _ret = unsafe { rcl_timer_fini(&mut self.timer_handle) };
}
}
/// A ROS timer.
pub struct Timer {
receiver: mpsc::Receiver<Duration>,
@ -1040,18 +1038,12 @@ impl Drop for Node {
// fini functions are not thread safe so lock the context.
let _ctx_handle = self.context.context_handle.lock().unwrap();
for s in &mut self.subs {
for s in &mut self.subscribers {
s.destroy(&mut self.node_handle);
}
for s in &mut self.services {
s.lock().unwrap().destroy(&mut self.node_handle);
}
for t in &mut self.timers {
// TODO: check return values
let _ret = unsafe { rcl_timer_fini(&mut t.timer_handle) };
// TODO: allow other types of clocks...
let _ret = unsafe { rcl_steady_clock_fini(t.clock_handle.as_mut()) };
}
for c in &mut self.action_clients {
c.lock().unwrap().destroy(&mut self.node_handle);
}