cleanups + start of async api

This commit is contained in:
Martin Dahl 2021-06-15 13:10:50 +02:00
parent c3be77224e
commit d82efff0fe
11 changed files with 314 additions and 316 deletions

View File

@ -17,9 +17,11 @@ rcl = { path = "rcl", version = "0.0.3" }
msg_gen = { path = "msg_gen", version = "0.0.3" } msg_gen = { path = "msg_gen", version = "0.0.3" }
actions = { path = "actions", version = "0.0.1" } actions = { path = "actions", version = "0.0.1" }
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
futures = "0.3.15"
[dev-dependencies] [dev-dependencies]
serde_json = "1.0.62" serde_json = "1.0.62"
futures = "0.3.15"
[build-dependencies] [build-dependencies]
common = { path = "common", version = "0.0.3" } common = { path = "common", version = "0.0.3" }

View File

@ -56,6 +56,8 @@ fn main() {
let bindings = builder let bindings = builder
.no_debug("_OSUnaligned.*") .no_debug("_OSUnaligned.*")
.derive_partialeq(true)
.derive_copy(true)
// whitelist a few specific things that we need. // whitelist a few specific things that we need.
.whitelist_recursively(false) .whitelist_recursively(false)
.whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t") .whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t")

View File

@ -1,7 +1,25 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use r2r; use r2r;
use r2r::example_interfaces::srv::AddTwoInts; use r2r::example_interfaces::srv::AddTwoInts;
async fn requester_task(c: r2r::Client<AddTwoInts::Service>) -> Result<(), Box<dyn std::error::Error>> {
let mut x: i64 = 0;
loop {
let req = AddTwoInts::Request { a: 10 * x, b: x };
let resp = c.request(&req)?.await?;
println!("{} + {} = {}", req.a, req.b, resp.sum);
x+=1;
if x == 10 {
break;
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?; let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?; let mut node = r2r::Node::create(ctx, "testnode", "")?;
@ -15,20 +33,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("service available."); println!("service available.");
let mut c = 0; let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
match requester_task(client).await {
Ok(()) => println!("exiting"),
Err(e) => println!("error: {}", e),
}})?;
loop { loop {
let req = AddTwoInts::Request { a: 10 * c, b: c }; node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
let cb_req = req.clone();
let cb = Box::new(move |r: AddTwoInts::Response| {
println!("{} + {} = {}", cb_req.a, cb_req.b, r.sum)
});
client.request(&req, cb)?;
node.spin_once(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(1000));
c += 1;
} }
} }

View File

@ -1,56 +0,0 @@
use r2r;
use r2r::builtin_interfaces::msg::Duration;
use r2r::std_msgs::msg::Int32;
use r2r::trajectory_msgs::msg::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let publisher = node.create_publisher::<JointTrajectoryPoint>("/hej")?;
let publisher2 = node.create_publisher::<Int32>("/native_count")?;
let mut c = 0;
let mut positions: Vec<f64> = Vec::new();
let cb = move |x: r2r::std_msgs::msg::String| {
println!("at count {} got: {}", c, x.data);
c = c + 1;
positions.push(c as f64);
let to_send = JointTrajectoryPoint {
positions: positions.clone(),
time_from_start: Duration { sec: c, nanosec: 0 },
..Default::default()
};
let mut native = r2r::WrappedNativeMsg::<Int32>::new();
native.data = c;
publisher.publish(&to_send).unwrap();
publisher2.publish_native(&native).unwrap();
};
let cb2 = move |x: JointTrajectoryPoint| {
let serialized = serde_json::to_string(&x).unwrap();
println!("JTP serialized as: {}", serialized);
};
let cb3 = move |raw_c: &r2r::WrappedNativeMsg<JointTrajectoryPoint>| {
println!("Raw c data: {:?}", raw_c.positions);
};
let _sub1 = node.subscribe("/hopp", Box::new(cb))?;
let _sub2 = node.subscribe("/hej", Box::new(cb2))?;
let _sub3 = node.subscribe_native("/hej", Box::new(cb3))?;
// TODO: group subscriptions in a "node" struct
//let mut subs: Vec<Box<Sub>> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
// run for 10 seconds
let mut count = 0;
while count < 100 {
node.spin_once(std::time::Duration::from_millis(100));
count += 1;
}
println!("All done!");
Ok(())
}

35
examples/publishers.rs Normal file
View File

@ -0,0 +1,35 @@
use r2r;
use r2r::builtin_interfaces::msg::Duration;
use r2r::std_msgs::msg::Int32;
use r2r::trajectory_msgs::msg::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let publisher = node.create_publisher::<JointTrajectoryPoint>("/jtp")?;
let publisher2 = node.create_publisher::<Int32>("/native_count")?;
// run for 10 seconds
let mut count = 0;
let mut positions: Vec<f64> = Vec::new();
while count < 100 {
positions.push(count as f64);
let to_send = JointTrajectoryPoint {
positions: positions.clone(),
time_from_start: Duration { sec: count, nanosec: 0 },
..Default::default()
};
let mut native = r2r::WrappedNativeMsg::<Int32>::new();
native.data = count;
publisher.publish(&to_send).unwrap();
publisher2.publish_native(&native).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
count += 1;
}
println!("All done!");
Ok(())
}

View File

@ -1,3 +1,7 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::stream::StreamExt;
use futures::future;
use r2r; use r2r;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
@ -10,7 +14,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
let topic = args.get(1).expect("provide a topic!"); let topic = args.get(1).expect("provide a topic!");
// run for a while to populate the topic list let mut pool = LocalPool::new();
let spawner = pool.spawner();
// run for a while to populate the topic list (note blocking...)
let mut count = 0; let mut count = 0;
let mut nt = HashMap::new(); let mut nt = HashMap::new();
while count < 50 { while count < 50 {
@ -37,20 +44,23 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let echo = &format!("{}_echo", topic); let echo = &format!("{}_echo", topic);
let echo_pub = node.create_publisher_untyped(echo, type_name)?; let echo_pub = node.create_publisher_untyped(echo, type_name)?;
let cb = move |msg: r2r::Result<serde_json::Value>| match msg { let sub = node.subscribe_untyped(topic, type_name)?;
Ok(msg) => { spawner.spawn_local(async move { sub.for_each(|msg| {
let s = serde_json::to_string_pretty(&msg).unwrap(); match msg {
println!("{}\n---\n", &s); Ok(msg) => {
echo_pub.publish(msg).unwrap(); let s = serde_json::to_string_pretty(&msg).unwrap();
println!("{}\n---\n", &s);
echo_pub.publish(msg).unwrap();
}
Err(err) => {
println!("Could not parse msg. {}", err);
}
} }
Err(err) => { future::ready(())
println!("Could not parse msg. {}", err); }).await})?;
}
};
let _subref = node.subscribe_untyped(topic, type_name, Box::new(cb))?;
loop { loop {
node.spin_once(std::time::Duration::from_millis(100)); node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
} }
} }

View File

@ -1,66 +1,43 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::stream::StreamExt;
use futures::future;
use r2r; use r2r;
use std::sync::mpsc;
use std::thread;
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?; let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let mut sub = node.subscribe::<r2r::std_msgs::msg::String>("/topic")?;
let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic2")?;
let th = { let mut pool = LocalPool::new();
let mut node = r2r::Node::create(ctx, "testnode", "")?; let spawner = pool.spawner();
let (tx, rx) = mpsc::channel::<String>(); // task that every other time forwards message to topic2
spawner.spawn_local(async move {
let p = node let mut x: i32 = 0;
.create_publisher::<r2r::std_msgs::msg::String>("/hej") loop {
.unwrap(); match sub.next().await {
Some(msg) => {
let th = thread::spawn(move || loop { if x % 2 == 0 {
println!("thread looping"); p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap();
let des = if let Ok(msg) = rx.recv() { }
let deserialized: r2r::std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); },
println!( None => break,
"received: {}, deserialized ros msg = {:#?}",
msg, deserialized
);
deserialized
} else {
break;
};
if let Err(_) = p.publish(&des) {
break;
} }
}); x+=1;
let tx1 = tx.clone();
let cb = move |x: r2r::std_msgs::msg::String| {
let serialized = serde_json::to_string(&x).unwrap();
tx1.send(serialized).unwrap(); // pass msg on to other thread for printing
};
let cb2 = move |x: &r2r::WrappedNativeMsg<r2r::std_msgs::msg::String>| {
// use native data!
let s = x.data.to_str();
println!("native ros msg: {}", s);
};
let _subref = node.subscribe("/hopp", Box::new(cb))?;
let _subref = node.subscribe_native("/hoppe", Box::new(cb2))?;
// run for 10 seconds
let mut count = 0;
while count < 100 {
node.spin_once(std::time::Duration::from_millis(100));
count += 1;
} }
th })?;
};
println!("dropped node"); // for sub2 we just print the data
let sub2 = node.subscribe::<r2r::std_msgs::msg::String>("/topic2")?;
spawner.spawn_local(async move { sub2.for_each(|msg| {
println!("topic2: new msg: {}", msg.data);
future::ready(())
}).await})?;
th.join().unwrap(); loop {
node.spin_once(std::time::Duration::from_millis(100));
println!("all done"); pool.run_until_stalled();
}
Ok(())
} }

View File

@ -1,50 +0,0 @@
use std::sync::mpsc;
use std::thread;
use r2r;
use r2r::std_msgs;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let publisher = node.create_publisher::<std_msgs::msg::String>("/hello")?;
let pubint = node.create_publisher::<std_msgs::msg::Int32>("/count")?;
let (tx, rx) = mpsc::channel::<String>();
thread::spawn(move || loop {
if let Ok(msg) = rx.recv() {
let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap();
println!(
"received: {}, deserialized ros msg = {:?}",
msg, deserialized
);
} else {
println!("stopping");
break;
}
});
let mut c = 0;
let cb = move |x: std_msgs::msg::String| {
let to_send = format!("at count {} got: {}", c, x.data);
c = c + 1;
let serialized = serde_json::to_string(&x).unwrap();
tx.send(serialized).unwrap(); // pass msg on to other thread for printing
let to_send = std_msgs::msg::String { data: to_send };
publisher.publish(&to_send).unwrap();
let to_send = std_msgs::msg::Int32 { data: c };
pubint.publish(&to_send).unwrap();
};
let _ws2 = node.subscribe("/hi", Box::new(cb))?;
// run for 10 seconds
let mut count = 0;
while count < 100 {
node.spin_once(std::time::Duration::from_millis(100));
count += 1;
}
Ok(())
}

View File

@ -1,21 +1,43 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use r2r; use r2r;
fn main() -> Result<(), Box<dyn std::error::Error>> { async fn timer_task(mut t: r2r::Timer) -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?; let mut x: i32 = 0;
let mut node = r2r::Node::create(ctx, "testnode", "")?; loop {
let elapsed = t.tick().await?;
let mut x = 0;
let cb = move |elapsed: std::time::Duration| {
println!( println!(
"timer called ({}), {}us since last call", "timer called ({}), {}us since last call",
x, x,
elapsed.as_micros() elapsed.as_micros()
); );
x += 1; x += 1;
}; if x == 10 {
node.create_wall_timer(std::time::Duration::from_millis(2000), Box::new(cb))?; break;
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let timer = node.create_wall_timer(std::time::Duration::from_millis(1000))?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
match timer_task(timer).await {
Ok(()) => println!("exiting"),
Err(e) => println!("error: {}", e),
}})?;
loop { loop {
node.spin_once(std::time::Duration::from_millis(100)); node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
} }
} }

View File

@ -66,6 +66,8 @@ fn main() {
let bindings = builder let bindings = builder
.no_debug("_OSUnaligned.*") .no_debug("_OSUnaligned.*")
.derive_partialeq(true)
.derive_copy(true)
.generate() .generate()
.expect("Unable to generate bindings"); .expect("Unable to generate bindings");

View File

@ -13,6 +13,11 @@ use std::ops::{Deref, DerefMut};
use std::time::Duration; use std::time::Duration;
use std::fmt::Debug; use std::fmt::Debug;
use std::future::Future;
use futures::future::TryFutureExt;
use futures::channel::{mpsc, oneshot};
use futures::stream::{Stream, StreamExt};
use msg_gen::*; use msg_gen::*;
use rcl::*; use rcl::*;
use actions::*; use actions::*;
@ -109,9 +114,9 @@ impl WrappedNativeMsgUntyped {
WrappedNativeMsgUntyped { WrappedNativeMsgUntyped {
ts: T::get_ts(), ts: T::get_ts(),
msg: T::create_msg() as *mut std::os::raw::c_void, msg: T::create_msg() as *mut std::os::raw::c_void,
destroy: destroy, destroy,
msg_to_json: msg_to_json, msg_to_json,
msg_from_json: msg_from_json, msg_from_json,
} }
} }
@ -199,8 +204,7 @@ where
pub trait Sub { pub trait Sub {
fn handle(&self) -> &rcl_subscription_t; fn handle(&self) -> &rcl_subscription_t;
fn run_cb(&mut self) -> (); fn handle_incoming(&mut self) -> ();
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void;
fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> ();
} }
@ -209,8 +213,7 @@ where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
rcl_handle: rcl_subscription_t, rcl_handle: rcl_subscription_t,
callback: Box<dyn FnMut(T) -> ()>, sender: mpsc::Sender<T>,
rcl_msg: WrappedNativeMsg<T>,
} }
struct WrappedSubNative<T> struct WrappedSubNative<T>
@ -218,14 +221,13 @@ where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
rcl_handle: rcl_subscription_t, rcl_handle: rcl_subscription_t,
callback: Box<dyn FnMut(&WrappedNativeMsg<T>) -> ()>, sender: mpsc::Sender<WrappedNativeMsg<T>>,
rcl_msg: WrappedNativeMsg<T>,
} }
struct WrappedSubUntyped { struct WrappedSubUntyped {
rcl_handle: rcl_subscription_t, rcl_handle: rcl_subscription_t,
rcl_msg: WrappedNativeMsgUntyped, topic_type: String,
callback: Box<dyn FnMut(Result<serde_json::Value>) -> ()>, sender: mpsc::Sender<Result<serde_json::Value>>,
} }
impl<T> Sub for WrappedSub<T> impl<T> Sub for WrappedSub<T>
@ -236,14 +238,19 @@ where
&self.rcl_handle &self.rcl_handle
} }
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { fn handle_incoming(&mut self) -> () {
self.rcl_msg.void_ptr_mut() let mut msg_info = rmw_message_info_t::default(); // we dont care for now
} let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
fn run_cb(&mut self) -> () { rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
// copy native msg to rust type and run callback };
let msg = T::from_native(&self.rcl_msg); if ret == RCL_RET_OK as i32 {
(self.callback)(msg); let msg = T::from_native(&msg);
match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -261,14 +268,18 @@ where
&self.rcl_handle &self.rcl_handle
} }
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { fn handle_incoming(&mut self) -> () {
self.rcl_msg.void_ptr_mut() let mut msg_info = rmw_message_info_t::default(); // we dont care for now
} let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
fn run_cb(&mut self) -> () { rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
// *dont't* copy native msg to rust type. };
// e.g. if you for instance have large image data... if ret == RCL_RET_OK as i32 {
(self.callback)(&self.rcl_msg); match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -283,13 +294,20 @@ impl Sub for WrappedSubUntyped {
&self.rcl_handle &self.rcl_handle
} }
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void { fn handle_incoming(&mut self) -> () {
self.rcl_msg.void_ptr_mut() let mut msg_info = rmw_message_info_t::default(); // we dont care for now
} let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
.expect(&format!("no typesupport for {}", self.topic_type));
fn run_cb(&mut self) -> () { let ret = unsafe {
let json = self.rcl_msg.to_json(); rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
(self.callback)(json); };
if ret == RCL_RET_OK as i32 {
let json = msg.to_json();
match self.sender.try_send(json) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -366,17 +384,13 @@ where
T: WrappedServiceTypeSupport, T: WrappedServiceTypeSupport,
{ {
rcl_handle: rcl_client_t, rcl_handle: rcl_client_t,
rcl_request: rmw_request_id_t,
// store callbacks with request sequence id and callback function // store callbacks with request sequence id and callback function
callbacks: Vec<(i64, Box<dyn FnOnce(T::Response)>)>, response_channels: Vec<(i64, oneshot::Sender<T::Response>)>,
rcl_response_msg: WrappedNativeMsg<T::Response>,
} }
pub trait Client_ { pub trait Client_ {
fn handle(&self) -> &rcl_client_t; fn handle(&self) -> &rcl_client_t;
fn run_cb(&mut self) -> (); fn handle_response(&mut self) -> ();
fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void;
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t;
fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> ();
} }
@ -388,35 +402,37 @@ where
&self.rcl_handle &self.rcl_handle
} }
fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void { fn handle_response(&mut self) -> () {
self.rcl_response_msg.void_ptr_mut() let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
} let mut response_msg = WrappedNativeMsg::<T::Response>::new();
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t { let ret = unsafe {
&mut self.rcl_request rcl_take_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut())
} };
if ret == RCL_RET_OK as i32 {
fn run_cb(&mut self) -> () { let request_id = unsafe { request_id.assume_init() };
// copy native msg to rust type and run callback if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) {
let req_id = self.rcl_request.sequence_number; let (_, channel) = self.response_channels.swap_remove(idx);
if let Some(idx) = self.callbacks.iter().position(|(id, _)| id == &req_id) { let response = T::Response::from_native(&response_msg);
let (_, cb_to_run) = self.callbacks.swap_remove(idx); match channel.send(response) {
let response = T::Response::from_native(&self.rcl_response_msg); Ok(()) => {},
(cb_to_run)(response); Err(e) => { println!("error sending: {:?}", e); },
} else { }
// I don't think this should be able to occur? Let's panic so we } else {
// find out... // I don't think this should be able to occur? Let's panic so we
let we_have: String = self // find out...
.callbacks let we_have: String = self
.iter() .response_channels
.map(|(id, _)| id.to_string()) .iter()
.collect::<Vec<_>>() .map(|(id, _)| id.to_string())
.join(","); .collect::<Vec<_>>()
eprintln!( .join(",");
"no such req id: {}, we have [{}], ignoring", eprintln!(
req_id, we_have "no such req id: {}, we have [{}], ignoring",
); request_id.sequence_number, we_have
} );
}
} // TODO handle failure.
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
@ -864,7 +880,7 @@ pub struct Node {
// action clients // action clients
action_clients: Vec<(rcl_action_client_t, Arc<Mutex<dyn ActionClient_>>)>, action_clients: Vec<(rcl_action_client_t, Arc<Mutex<dyn ActionClient_>>)>,
// timers, // timers,
timers: Vec<Timer>, timers: Vec<Timer_>,
// and the publishers, whom we allow to be shared.. hmm. // and the publishers, whom we allow to be shared.. hmm.
pubs: Vec<Arc<rcl_publisher_t>>, pubs: Vec<Arc<rcl_publisher_t>>,
} }
@ -1034,37 +1050,37 @@ impl Node {
pub fn subscribe<T: 'static>( pub fn subscribe<T: 'static>(
&mut self, &mut self,
topic: &str, topic: &str,
callback: Box<dyn FnMut(T) -> ()>, ) -> Result<impl Stream<Item = T> + Unpin>
) -> Result<&rcl_subscription_t>
where where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?;
let (sender, receiver) = mpsc::channel::<T>(10);
let ws = WrappedSub { let ws = WrappedSub {
rcl_handle: subscription_handle, rcl_handle: subscription_handle,
rcl_msg: WrappedNativeMsg::<T>::new(), sender,
callback: callback,
}; };
self.subs.push(Box::new(ws)); self.subs.push(Box::new(ws));
Ok(self.subs.last().unwrap().handle()) // hmm... Ok(receiver)
} }
pub fn subscribe_native<T: 'static>( pub fn subscribe_native<T: 'static>(
&mut self, &mut self,
topic: &str, topic: &str,
callback: Box<dyn FnMut(&WrappedNativeMsg<T>) -> ()>, ) -> Result<impl Stream<Item = WrappedNativeMsg<T>>>
) -> Result<&rcl_subscription_t>
where where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?;
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
let ws = WrappedSubNative { let ws = WrappedSubNative {
rcl_handle: subscription_handle, rcl_handle: subscription_handle,
rcl_msg: WrappedNativeMsg::<T>::new(), sender,
callback: callback,
}; };
self.subs.push(Box::new(ws)); self.subs.push(Box::new(ws));
Ok(self.subs.last().unwrap().handle()) // hmm... Ok(receiver)
} }
// Its not really untyped since we know the underlying type... But we throw this info away :) // Its not really untyped since we know the underlying type... But we throw this info away :)
@ -1072,18 +1088,18 @@ impl Node {
&mut self, &mut self,
topic: &str, topic: &str,
topic_type: &str, topic_type: &str,
callback: Box<dyn FnMut(Result<serde_json::Value>) -> ()>, ) -> Result<impl Stream<Item = Result<serde_json::Value>>> {
) -> Result<&rcl_subscription_t> {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle = self.create_subscription_helper(topic, msg.ts)?; let subscription_handle = self.create_subscription_helper(topic, msg.ts)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
let ws = WrappedSubUntyped { let ws = WrappedSubUntyped {
rcl_handle: subscription_handle, rcl_handle: subscription_handle,
rcl_msg: msg, topic_type: topic_type.to_string(),
callback: callback, sender,
}; };
self.subs.push(Box::new(ws)); self.subs.push(Box::new(ws));
Ok(self.subs.last().unwrap().handle()) // hmm... Ok(receiver)
} }
pub fn create_service_helper( pub fn create_service_helper(
@ -1128,7 +1144,7 @@ impl Node {
sequence_number: 0, sequence_number: 0,
}, },
rcl_request_msg: WrappedNativeMsg::<T::Request>::new(), rcl_request_msg: WrappedNativeMsg::<T::Request>::new(),
callback: callback, callback,
}; };
self.services.push(Box::new(ws)); self.services.push(Box::new(ws));
@ -1166,17 +1182,9 @@ impl Node {
T: WrappedServiceTypeSupport, T: WrappedServiceTypeSupport,
{ {
let client_handle = self.create_client_helper(service_name, T::get_ts())?; let client_handle = self.create_client_helper(service_name, T::get_ts())?;
let cloned_ch = rcl_client_t {
impl_: client_handle.impl_,
};
let ws = WrappedClient::<T> { let ws = WrappedClient::<T> {
rcl_handle: cloned_ch, rcl_handle: client_handle,
rcl_request: rmw_request_id_t { response_channels: Vec::new(),
writer_guid: [0; 16usize],
sequence_number: 0,
},
rcl_response_msg: WrappedNativeMsg::<T::Response>::new(),
callbacks: Vec::new(),
}; };
let arc = Arc::new(Mutex::new(ws)); let arc = Arc::new(Mutex::new(ws));
@ -1239,9 +1247,8 @@ impl Node {
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
{ {
let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; let client_handle = self.create_action_client_helper(action_name, T::get_ts())?;
let cloned_handle = unsafe { core::ptr::read(&client_handle) };
let wa = WrappedActionClient::<T> { let wa = WrappedActionClient::<T> {
rcl_handle: cloned_handle, rcl_handle: client_handle,
goal_request_callbacks: Vec::new(), goal_request_callbacks: Vec::new(),
feedback_callbacks: Vec::new(), feedback_callbacks: Vec::new(),
goal_statuses: Vec::new(), goal_statuses: Vec::new(),
@ -1445,20 +1452,15 @@ impl Node {
let ws_subs = let ws_subs =
unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) }; unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) };
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let ret = unsafe { s.handle_incoming();
rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
s.run_cb();
}
} }
} }
let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, ws.size_of_timers) }; let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, ws.size_of_timers) };
assert_eq!(ws_timers.len(), self.timers.len()); assert_eq!(ws_timers.len(), self.timers.len());
let mut timers_to_remove = vec![];
for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) { for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let mut is_ready = false; let mut is_ready = false;
@ -1473,24 +1475,32 @@ impl Node {
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let ret = unsafe { rcl_timer_call(&mut s.timer_handle) }; let ret = unsafe { rcl_timer_call(&mut s.timer_handle) };
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
(s.callback)(Duration::from_nanos(nanos as u64)); match s.sender.try_send(Duration::from_nanos(nanos as u64)) {
Err(e) => {
if e.is_full() {
println!("Warning: timer tick not handled in time - no wakeup will occur");
}
if e.is_disconnected() {
// client dropped the timer handle, let's drop our timer as well.
timers_to_remove.push(s.timer_handle);
}
},
_ => {} // ok
}
} }
} }
} }
} }
} }
} }
// drop timers scheduled for deletion
self.timers.retain(|t| !timers_to_remove.contains(&t.timer_handle));
let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) };
for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let mut s = s.lock().unwrap(); let mut s = s.lock().unwrap();
let ret = unsafe { s.handle_response();
rcl_take_response(s.handle(), s.rcl_request_id(), s.rcl_response_msg())
};
if ret == RCL_RET_OK as i32 {
s.run_cb();
}
} }
} }
@ -1609,8 +1619,7 @@ impl Node {
pub fn create_wall_timer( pub fn create_wall_timer(
&mut self, &mut self,
period: Duration, period: Duration,
callback: Box<dyn FnMut(Duration) -> ()>, ) -> Result<Timer> {
) -> Result<&Timer> {
let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit(); let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit();
let ret = unsafe { let ret = unsafe {
@ -1646,14 +1655,20 @@ impl Node {
} }
} }
let timer = Timer { let (tx, rx) = mpsc::channel::<Duration>(1);
let timer = Timer_ {
timer_handle, timer_handle,
clock_handle, clock_handle,
callback, sender: tx,
}; };
self.timers.push(timer); self.timers.push(timer);
Ok(&self.timers[self.timers.len() - 1]) let out_timer = Timer {
receiver: rx,
};
Ok(out_timer)
} }
pub fn logger<'a>(&'a self) -> &'a str { pub fn logger<'a>(&'a self) -> &'a str {
@ -1666,10 +1681,31 @@ impl Node {
} }
} }
pub struct Timer { pub struct Timer_ {
timer_handle: rcl_timer_t, timer_handle: rcl_timer_t,
clock_handle: Box<rcl_clock_t>, clock_handle: Box<rcl_clock_t>,
callback: Box<dyn FnMut(Duration) -> ()>, sender: mpsc::Sender<Duration>,
}
impl Drop for Timer_ {
fn drop(&mut self) {
unsafe { rcl_timer_fini(&mut self.timer_handle); }
}
}
pub struct Timer {
receiver: mpsc::Receiver<Duration>,
}
impl Timer {
pub async fn tick(&mut self) -> Result<Duration> {
let next = self.receiver.next().await;
if let Some(elapsed) = next {
Ok(elapsed)
} else {
Err(Error::RCL_RET_TIMER_INVALID)
}
}
} }
// Since publishers are temporarily upgraded to owners during the // Since publishers are temporarily upgraded to owners during the
@ -1773,7 +1809,7 @@ impl<T> Client<T>
where where
T: WrappedServiceTypeSupport, T: WrappedServiceTypeSupport,
{ {
pub fn request(&self, msg: &T::Request, cb: Box<dyn FnOnce(T::Response) -> ()>) -> Result<()> pub fn request(&self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
where where
T: WrappedServiceTypeSupport, T: WrappedServiceTypeSupport,
{ {
@ -1789,9 +1825,12 @@ where
let result = let result =
unsafe { rcl_send_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; unsafe { rcl_send_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
let (sender, receiver) = oneshot::channel::<T::Response>();
if result == RCL_RET_OK as i32 { if result == RCL_RET_OK as i32 {
client.callbacks.push((seq_no, cb)); client.response_channels.push((seq_no, sender));
Ok(()) // instead of "canceled" we return invalid client.
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
} else { } else {
eprintln!("coult not send request {}", result); eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result)) Err(Error::from_rcl_error(result))