From d82efff0fe4049c8b0ef74ed227b5ee49570e9c9 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Tue, 15 Jun 2021 13:10:50 +0200 Subject: [PATCH] cleanups + start of async api --- Cargo.toml | 2 + actions/build.rs | 2 + examples/client.rs | 43 +++-- examples/publish_complex_msgs.rs | 56 ------ examples/publishers.rs | 35 ++++ examples/rostopic_echo.rs | 34 ++-- examples/subscriber.rs | 87 ++++----- examples/subscriber_with_thread.rs | 50 ----- examples/wall_timer.rs | 38 +++- rcl/build.rs | 2 + src/lib.rs | 281 ++++++++++++++++------------- 11 files changed, 314 insertions(+), 316 deletions(-) delete mode 100644 examples/publish_complex_msgs.rs create mode 100644 examples/publishers.rs delete mode 100644 examples/subscriber_with_thread.rs diff --git a/Cargo.toml b/Cargo.toml index 14d6258..dabc59f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,11 @@ rcl = { path = "rcl", version = "0.0.3" } msg_gen = { path = "msg_gen", version = "0.0.3" } actions = { path = "actions", version = "0.0.1" } uuid = { version = "0.8", features = ["serde", "v4"] } +futures = "0.3.15" [dev-dependencies] serde_json = "1.0.62" +futures = "0.3.15" [build-dependencies] common = { path = "common", version = "0.0.3" } diff --git a/actions/build.rs b/actions/build.rs index 43c7bca..fa849f5 100644 --- a/actions/build.rs +++ b/actions/build.rs @@ -56,6 +56,8 @@ fn main() { let bindings = builder .no_debug("_OSUnaligned.*") + .derive_partialeq(true) + .derive_copy(true) // whitelist a few specific things that we need. .whitelist_recursively(false) .whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t") diff --git a/examples/client.rs b/examples/client.rs index 98300da..70089eb 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,7 +1,25 @@ +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; + use r2r; use r2r::example_interfaces::srv::AddTwoInts; +async fn requester_task(c: r2r::Client) -> Result<(), Box> { + let mut x: i64 = 0; + loop { + let req = AddTwoInts::Request { a: 10 * x, b: x }; + let resp = c.request(&req)?.await?; + println!("{} + {} = {}", req.a, req.b, resp.sum); + + x+=1; + if x == 10 { + break; + } + } + Ok(()) +} + fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; @@ -15,20 +33,17 @@ fn main() -> Result<(), Box> { println!("service available."); - let mut c = 0; + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + spawner.spawn_local(async move { + match requester_task(client).await { + Ok(()) => println!("exiting"), + Err(e) => println!("error: {}", e), + }})?; + loop { - let req = AddTwoInts::Request { a: 10 * c, b: c }; - - let cb_req = req.clone(); - let cb = Box::new(move |r: AddTwoInts::Response| { - println!("{} + {} = {}", cb_req.a, cb_req.b, r.sum) - }); - - client.request(&req, cb)?; - - node.spin_once(std::time::Duration::from_millis(1000)); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - c += 1; + node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); } } diff --git a/examples/publish_complex_msgs.rs b/examples/publish_complex_msgs.rs deleted file mode 100644 index e08da94..0000000 --- a/examples/publish_complex_msgs.rs +++ /dev/null @@ -1,56 +0,0 @@ -use r2r; -use r2r::builtin_interfaces::msg::Duration; -use r2r::std_msgs::msg::Int32; -use r2r::trajectory_msgs::msg::*; - -fn main() -> Result<(), Box> { - let ctx = r2r::Context::create()?; - let mut node = r2r::Node::create(ctx, "testnode", "")?; - let publisher = node.create_publisher::("/hej")?; - let publisher2 = node.create_publisher::("/native_count")?; - - let mut c = 0; - let mut positions: Vec = Vec::new(); - let cb = move |x: r2r::std_msgs::msg::String| { - println!("at count {} got: {}", c, x.data); - c = c + 1; - positions.push(c as f64); - let to_send = JointTrajectoryPoint { - positions: positions.clone(), - time_from_start: Duration { sec: c, nanosec: 0 }, - ..Default::default() - }; - let mut native = r2r::WrappedNativeMsg::::new(); - native.data = c; - - publisher.publish(&to_send).unwrap(); - publisher2.publish_native(&native).unwrap(); - }; - - let cb2 = move |x: JointTrajectoryPoint| { - let serialized = serde_json::to_string(&x).unwrap(); - println!("JTP serialized as: {}", serialized); - }; - - let cb3 = move |raw_c: &r2r::WrappedNativeMsg| { - println!("Raw c data: {:?}", raw_c.positions); - }; - - let _sub1 = node.subscribe("/hopp", Box::new(cb))?; - let _sub2 = node.subscribe("/hej", Box::new(cb2))?; - let _sub3 = node.subscribe_native("/hej", Box::new(cb3))?; - - // TODO: group subscriptions in a "node" struct - //let mut subs: Vec> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)]; - - // run for 10 seconds - let mut count = 0; - while count < 100 { - node.spin_once(std::time::Duration::from_millis(100)); - count += 1; - } - - println!("All done!"); - - Ok(()) -} diff --git a/examples/publishers.rs b/examples/publishers.rs new file mode 100644 index 0000000..37d4669 --- /dev/null +++ b/examples/publishers.rs @@ -0,0 +1,35 @@ +use r2r; +use r2r::builtin_interfaces::msg::Duration; +use r2r::std_msgs::msg::Int32; +use r2r::trajectory_msgs::msg::*; + +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let publisher = node.create_publisher::("/jtp")?; + let publisher2 = node.create_publisher::("/native_count")?; + + // run for 10 seconds + let mut count = 0; + let mut positions: Vec = Vec::new(); + while count < 100 { + positions.push(count as f64); + let to_send = JointTrajectoryPoint { + positions: positions.clone(), + time_from_start: Duration { sec: count, nanosec: 0 }, + ..Default::default() + }; + let mut native = r2r::WrappedNativeMsg::::new(); + native.data = count; + + publisher.publish(&to_send).unwrap(); + publisher2.publish_native(&native).unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(100)); + count += 1; + } + + println!("All done!"); + + Ok(()) +} diff --git a/examples/rostopic_echo.rs b/examples/rostopic_echo.rs index 3dfd5db..9be5259 100644 --- a/examples/rostopic_echo.rs +++ b/examples/rostopic_echo.rs @@ -1,3 +1,7 @@ +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; +use futures::stream::StreamExt; +use futures::future; use r2r; use std::collections::HashMap; use std::env; @@ -10,7 +14,10 @@ fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); let topic = args.get(1).expect("provide a topic!"); - // run for a while to populate the topic list + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + // run for a while to populate the topic list (note blocking...) let mut count = 0; let mut nt = HashMap::new(); while count < 50 { @@ -37,20 +44,23 @@ fn main() -> Result<(), Box> { let echo = &format!("{}_echo", topic); let echo_pub = node.create_publisher_untyped(echo, type_name)?; - let cb = move |msg: r2r::Result| match msg { - Ok(msg) => { - let s = serde_json::to_string_pretty(&msg).unwrap(); - println!("{}\n---\n", &s); - echo_pub.publish(msg).unwrap(); + let sub = node.subscribe_untyped(topic, type_name)?; + spawner.spawn_local(async move { sub.for_each(|msg| { + match msg { + Ok(msg) => { + let s = serde_json::to_string_pretty(&msg).unwrap(); + println!("{}\n---\n", &s); + echo_pub.publish(msg).unwrap(); + } + Err(err) => { + println!("Could not parse msg. {}", err); + } } - Err(err) => { - println!("Could not parse msg. {}", err); - } - }; - - let _subref = node.subscribe_untyped(topic, type_name, Box::new(cb))?; + future::ready(()) + }).await})?; loop { node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); } } diff --git a/examples/subscriber.rs b/examples/subscriber.rs index e6ee96c..413d256 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -1,66 +1,43 @@ +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; +use futures::stream::StreamExt; +use futures::future; use r2r; -use std::sync::mpsc; -use std::thread; fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut sub = node.subscribe::("/topic")?; + let p = node.create_publisher::("/topic2")?; - let th = { - let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); - let (tx, rx) = mpsc::channel::(); - - let p = node - .create_publisher::("/hej") - .unwrap(); - - let th = thread::spawn(move || loop { - println!("thread looping"); - let des = if let Ok(msg) = rx.recv() { - let deserialized: r2r::std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); - println!( - "received: {}, deserialized ros msg = {:#?}", - msg, deserialized - ); - deserialized - } else { - break; - }; - - if let Err(_) = p.publish(&des) { - break; + // task that every other time forwards message to topic2 + spawner.spawn_local(async move { + let mut x: i32 = 0; + loop { + match sub.next().await { + Some(msg) => { + if x % 2 == 0 { + p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap(); + } + }, + None => break, } - }); - - let tx1 = tx.clone(); - let cb = move |x: r2r::std_msgs::msg::String| { - let serialized = serde_json::to_string(&x).unwrap(); - tx1.send(serialized).unwrap(); // pass msg on to other thread for printing - }; - - let cb2 = move |x: &r2r::WrappedNativeMsg| { - // use native data! - let s = x.data.to_str(); - println!("native ros msg: {}", s); - }; - - let _subref = node.subscribe("/hopp", Box::new(cb))?; - let _subref = node.subscribe_native("/hoppe", Box::new(cb2))?; - - // run for 10 seconds - let mut count = 0; - while count < 100 { - node.spin_once(std::time::Duration::from_millis(100)); - count += 1; + x+=1; } - th - }; + })?; - println!("dropped node"); + // for sub2 we just print the data + let sub2 = node.subscribe::("/topic2")?; + spawner.spawn_local(async move { sub2.for_each(|msg| { + println!("topic2: new msg: {}", msg.data); + future::ready(()) + }).await})?; - th.join().unwrap(); - - println!("all done"); - - Ok(()) + loop { + node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); + } } diff --git a/examples/subscriber_with_thread.rs b/examples/subscriber_with_thread.rs deleted file mode 100644 index 7375d11..0000000 --- a/examples/subscriber_with_thread.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::sync::mpsc; -use std::thread; - -use r2r; -use r2r::std_msgs; - -fn main() -> Result<(), Box> { - let ctx = r2r::Context::create()?; - let mut node = r2r::Node::create(ctx, "testnode", "")?; - - let publisher = node.create_publisher::("/hello")?; - let pubint = node.create_publisher::("/count")?; - - let (tx, rx) = mpsc::channel::(); - thread::spawn(move || loop { - if let Ok(msg) = rx.recv() { - let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); - println!( - "received: {}, deserialized ros msg = {:?}", - msg, deserialized - ); - } else { - println!("stopping"); - break; - } - }); - - let mut c = 0; - let cb = move |x: std_msgs::msg::String| { - let to_send = format!("at count {} got: {}", c, x.data); - c = c + 1; - let serialized = serde_json::to_string(&x).unwrap(); - tx.send(serialized).unwrap(); // pass msg on to other thread for printing - let to_send = std_msgs::msg::String { data: to_send }; - publisher.publish(&to_send).unwrap(); - let to_send = std_msgs::msg::Int32 { data: c }; - pubint.publish(&to_send).unwrap(); - }; - - let _ws2 = node.subscribe("/hi", Box::new(cb))?; - - // run for 10 seconds - let mut count = 0; - while count < 100 { - node.spin_once(std::time::Duration::from_millis(100)); - count += 1; - } - - Ok(()) -} diff --git a/examples/wall_timer.rs b/examples/wall_timer.rs index cbd4da9..411f2c4 100644 --- a/examples/wall_timer.rs +++ b/examples/wall_timer.rs @@ -1,21 +1,43 @@ +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; use r2r; -fn main() -> Result<(), Box> { - let ctx = r2r::Context::create()?; - let mut node = r2r::Node::create(ctx, "testnode", "")?; - - let mut x = 0; - let cb = move |elapsed: std::time::Duration| { +async fn timer_task(mut t: r2r::Timer) -> Result<(), Box> { + let mut x: i32 = 0; + loop { + let elapsed = t.tick().await?; println!( "timer called ({}), {}us since last call", x, elapsed.as_micros() ); + x += 1; - }; - node.create_wall_timer(std::time::Duration::from_millis(2000), Box::new(cb))?; + if x == 10 { + break; + } + } + Ok(()) +} + +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let timer = node.create_wall_timer(std::time::Duration::from_millis(1000))?; + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + spawner.spawn_local(async move { + match timer_task(timer).await { + Ok(()) => println!("exiting"), + Err(e) => println!("error: {}", e), + }})?; loop { node.spin_once(std::time::Duration::from_millis(100)); + + pool.run_until_stalled(); } } diff --git a/rcl/build.rs b/rcl/build.rs index debb137..c07f6bc 100644 --- a/rcl/build.rs +++ b/rcl/build.rs @@ -66,6 +66,8 @@ fn main() { let bindings = builder .no_debug("_OSUnaligned.*") + .derive_partialeq(true) + .derive_copy(true) .generate() .expect("Unable to generate bindings"); diff --git a/src/lib.rs b/src/lib.rs index 7988873..2e245bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,11 @@ use std::ops::{Deref, DerefMut}; use std::time::Duration; use std::fmt::Debug; +use std::future::Future; +use futures::future::TryFutureExt; +use futures::channel::{mpsc, oneshot}; +use futures::stream::{Stream, StreamExt}; + use msg_gen::*; use rcl::*; use actions::*; @@ -109,9 +114,9 @@ impl WrappedNativeMsgUntyped { WrappedNativeMsgUntyped { ts: T::get_ts(), msg: T::create_msg() as *mut std::os::raw::c_void, - destroy: destroy, - msg_to_json: msg_to_json, - msg_from_json: msg_from_json, + destroy, + msg_to_json, + msg_from_json, } } @@ -199,8 +204,7 @@ where pub trait Sub { fn handle(&self) -> &rcl_subscription_t; - fn run_cb(&mut self) -> (); - fn rcl_msg(&mut self) -> *mut std::os::raw::c_void; + fn handle_incoming(&mut self) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> (); } @@ -209,8 +213,7 @@ where T: WrappedTypesupport, { rcl_handle: rcl_subscription_t, - callback: Box ()>, - rcl_msg: WrappedNativeMsg, + sender: mpsc::Sender, } struct WrappedSubNative @@ -218,14 +221,13 @@ where T: WrappedTypesupport, { rcl_handle: rcl_subscription_t, - callback: Box) -> ()>, - rcl_msg: WrappedNativeMsg, + sender: mpsc::Sender>, } struct WrappedSubUntyped { rcl_handle: rcl_subscription_t, - rcl_msg: WrappedNativeMsgUntyped, - callback: Box) -> ()>, + topic_type: String, + sender: mpsc::Sender>, } impl Sub for WrappedSub @@ -236,14 +238,19 @@ where &self.rcl_handle } - fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { - self.rcl_msg.void_ptr_mut() - } - - fn run_cb(&mut self) -> () { - // copy native msg to rust type and run callback - let msg = T::from_native(&self.rcl_msg); - (self.callback)(msg); + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + let msg = T::from_native(&msg); + match self.sender.try_send(msg) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -261,14 +268,18 @@ where &self.rcl_handle } - fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { - self.rcl_msg.void_ptr_mut() - } - - fn run_cb(&mut self) -> () { - // *dont't* copy native msg to rust type. - // e.g. if you for instance have large image data... - (self.callback)(&self.rcl_msg); + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + match self.sender.try_send(msg) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -283,13 +294,20 @@ impl Sub for WrappedSubUntyped { &self.rcl_handle } - fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { - self.rcl_msg.void_ptr_mut() - } - - fn run_cb(&mut self) -> () { - let json = self.rcl_msg.to_json(); - (self.callback)(json); + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) + .expect(&format!("no typesupport for {}", self.topic_type)); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + let json = msg.to_json(); + match self.sender.try_send(json) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -366,17 +384,13 @@ where T: WrappedServiceTypeSupport, { rcl_handle: rcl_client_t, - rcl_request: rmw_request_id_t, // store callbacks with request sequence id and callback function - callbacks: Vec<(i64, Box)>, - rcl_response_msg: WrappedNativeMsg, + response_channels: Vec<(i64, oneshot::Sender)>, } pub trait Client_ { fn handle(&self) -> &rcl_client_t; - fn run_cb(&mut self) -> (); - fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void; - fn rcl_request_id(&mut self) -> *mut rmw_request_id_t; + fn handle_response(&mut self) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> (); } @@ -388,35 +402,37 @@ where &self.rcl_handle } - fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void { - self.rcl_response_msg.void_ptr_mut() - } + fn handle_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::::new(); - fn rcl_request_id(&mut self) -> *mut rmw_request_id_t { - &mut self.rcl_request - } - - fn run_cb(&mut self) -> () { - // copy native msg to rust type and run callback - let req_id = self.rcl_request.sequence_number; - if let Some(idx) = self.callbacks.iter().position(|(id, _)| id == &req_id) { - let (_, cb_to_run) = self.callbacks.swap_remove(idx); - let response = T::Response::from_native(&self.rcl_response_msg); - (cb_to_run)(response); - } else { - // I don't think this should be able to occur? Let's panic so we - // find out... - let we_have: String = self - .callbacks - .iter() - .map(|(id, _)| id.to_string()) - .collect::>() - .join(","); - eprintln!( - "no such req id: {}, we have [{}], ignoring", - req_id, we_have - ); - } + let ret = unsafe { + rcl_take_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + let (_, channel) = self.response_channels.swap_remove(idx); + let response = T::Response::from_native(&response_msg); + match channel.send(response) { + Ok(()) => {}, + Err(e) => { println!("error sending: {:?}", e); }, + } + } else { + // I don't think this should be able to occur? Let's panic so we + // find out... + let we_have: String = self + .response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } // TODO handle failure. } fn destroy(&mut self, node: &mut rcl_node_t) { @@ -864,7 +880,7 @@ pub struct Node { // action clients action_clients: Vec<(rcl_action_client_t, Arc>)>, // timers, - timers: Vec, + timers: Vec, // and the publishers, whom we allow to be shared.. hmm. pubs: Vec>, } @@ -1034,37 +1050,37 @@ impl Node { pub fn subscribe( &mut self, topic: &str, - callback: Box ()>, - ) -> Result<&rcl_subscription_t> + ) -> Result + Unpin> where T: WrappedTypesupport, { let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; + let (sender, receiver) = mpsc::channel::(10); + let ws = WrappedSub { rcl_handle: subscription_handle, - rcl_msg: WrappedNativeMsg::::new(), - callback: callback, + sender, }; self.subs.push(Box::new(ws)); - Ok(self.subs.last().unwrap().handle()) // hmm... + Ok(receiver) } pub fn subscribe_native( &mut self, topic: &str, - callback: Box) -> ()>, - ) -> Result<&rcl_subscription_t> + ) -> Result>> where T: WrappedTypesupport, { let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; + let (sender, receiver) = mpsc::channel::>(10); + let ws = WrappedSubNative { rcl_handle: subscription_handle, - rcl_msg: WrappedNativeMsg::::new(), - callback: callback, + sender, }; self.subs.push(Box::new(ws)); - Ok(self.subs.last().unwrap().handle()) // hmm... + Ok(receiver) } // Its not really untyped since we know the underlying type... But we throw this info away :) @@ -1072,18 +1088,18 @@ impl Node { &mut self, topic: &str, topic_type: &str, - callback: Box) -> ()>, - ) -> Result<&rcl_subscription_t> { + ) -> Result>> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let subscription_handle = self.create_subscription_helper(topic, msg.ts)?; + let (sender, receiver) = mpsc::channel::>(10); let ws = WrappedSubUntyped { rcl_handle: subscription_handle, - rcl_msg: msg, - callback: callback, + topic_type: topic_type.to_string(), + sender, }; self.subs.push(Box::new(ws)); - Ok(self.subs.last().unwrap().handle()) // hmm... + Ok(receiver) } pub fn create_service_helper( @@ -1128,7 +1144,7 @@ impl Node { sequence_number: 0, }, rcl_request_msg: WrappedNativeMsg::::new(), - callback: callback, + callback, }; self.services.push(Box::new(ws)); @@ -1166,17 +1182,9 @@ impl Node { T: WrappedServiceTypeSupport, { let client_handle = self.create_client_helper(service_name, T::get_ts())?; - let cloned_ch = rcl_client_t { - impl_: client_handle.impl_, - }; let ws = WrappedClient:: { - rcl_handle: cloned_ch, - rcl_request: rmw_request_id_t { - writer_guid: [0; 16usize], - sequence_number: 0, - }, - rcl_response_msg: WrappedNativeMsg::::new(), - callbacks: Vec::new(), + rcl_handle: client_handle, + response_channels: Vec::new(), }; let arc = Arc::new(Mutex::new(ws)); @@ -1239,9 +1247,8 @@ impl Node { T: WrappedActionTypeSupport, { let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; - let cloned_handle = unsafe { core::ptr::read(&client_handle) }; let wa = WrappedActionClient:: { - rcl_handle: cloned_handle, + rcl_handle: client_handle, goal_request_callbacks: Vec::new(), feedback_callbacks: Vec::new(), goal_statuses: Vec::new(), @@ -1445,20 +1452,15 @@ impl Node { let ws_subs = unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; - let mut msg_info = rmw_message_info_t::default(); // we dont care for now for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { if ws_s != &std::ptr::null() { - let ret = unsafe { - rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut()) - }; - if ret == RCL_RET_OK as i32 { - s.run_cb(); - } + s.handle_incoming(); } } let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, ws.size_of_timers) }; assert_eq!(ws_timers.len(), self.timers.len()); + let mut timers_to_remove = vec![]; for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) { if ws_s != &std::ptr::null() { let mut is_ready = false; @@ -1473,24 +1475,32 @@ impl Node { if ret == RCL_RET_OK as i32 { let ret = unsafe { rcl_timer_call(&mut s.timer_handle) }; if ret == RCL_RET_OK as i32 { - (s.callback)(Duration::from_nanos(nanos as u64)); + match s.sender.try_send(Duration::from_nanos(nanos as u64)) { + Err(e) => { + if e.is_full() { + println!("Warning: timer tick not handled in time - no wakeup will occur"); + } + if e.is_disconnected() { + // client dropped the timer handle, let's drop our timer as well. + timers_to_remove.push(s.timer_handle); + } + }, + _ => {} // ok + } } } } } } } + // drop timers scheduled for deletion + self.timers.retain(|t| !timers_to_remove.contains(&t.timer_handle)); let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { if ws_s != &std::ptr::null() { let mut s = s.lock().unwrap(); - let ret = unsafe { - rcl_take_response(s.handle(), s.rcl_request_id(), s.rcl_response_msg()) - }; - if ret == RCL_RET_OK as i32 { - s.run_cb(); - } + s.handle_response(); } } @@ -1609,8 +1619,7 @@ impl Node { pub fn create_wall_timer( &mut self, period: Duration, - callback: Box ()>, - ) -> Result<&Timer> { + ) -> Result { let mut clock_handle = MaybeUninit::::uninit(); let ret = unsafe { @@ -1646,14 +1655,20 @@ impl Node { } } - let timer = Timer { + let (tx, rx) = mpsc::channel::(1); + + let timer = Timer_ { timer_handle, clock_handle, - callback, + sender: tx, }; self.timers.push(timer); - Ok(&self.timers[self.timers.len() - 1]) + let out_timer = Timer { + receiver: rx, + }; + + Ok(out_timer) } pub fn logger<'a>(&'a self) -> &'a str { @@ -1666,10 +1681,31 @@ impl Node { } } -pub struct Timer { +pub struct Timer_ { timer_handle: rcl_timer_t, clock_handle: Box, - callback: Box ()>, + sender: mpsc::Sender, +} + +impl Drop for Timer_ { + fn drop(&mut self) { + unsafe { rcl_timer_fini(&mut self.timer_handle); } + } +} + +pub struct Timer { + receiver: mpsc::Receiver, +} + +impl Timer { + pub async fn tick(&mut self) -> Result { + let next = self.receiver.next().await; + if let Some(elapsed) = next { + Ok(elapsed) + } else { + Err(Error::RCL_RET_TIMER_INVALID) + } + } } // Since publishers are temporarily upgraded to owners during the @@ -1773,7 +1809,7 @@ impl Client where T: WrappedServiceTypeSupport, { - pub fn request(&self, msg: &T::Request, cb: Box ()>) -> Result<()> + pub fn request(&self, msg: &T::Request) -> Result>> where T: WrappedServiceTypeSupport, { @@ -1789,9 +1825,12 @@ where let result = unsafe { rcl_send_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + let (sender, receiver) = oneshot::channel::(); + if result == RCL_RET_OK as i32 { - client.callbacks.push((seq_no, cb)); - Ok(()) + client.response_channels.push((seq_no, sender)); + // instead of "canceled" we return invalid client. + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) } else { eprintln!("coult not send request {}", result); Err(Error::from_rcl_error(result))