Change services api to be a stream of requests. Updated examples.

The requests can be processed asynchronously which is demonstrated in
the service example.
This commit is contained in:
Martin Dahl 2021-08-03 11:34:33 +02:00
parent 98ecc51d1b
commit e4dcb4d6ca
4 changed files with 192 additions and 58 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "r2r"
version = "0.1.0"
version = "0.2.0"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Minimal ros2 bindings."
license = "Apache-2.0/MIT"
@ -17,12 +17,13 @@ rcl = { path = "rcl", version = "0.1.0" }
msg_gen = { path = "msg_gen", version = "0.1.0" }
actions = { path = "actions", version = "0.1.0" }
uuid = { version = "0.8", features = ["serde", "v4"] }
retain_mut = "0.1.3"
futures = "0.3.15"
[dev-dependencies]
serde_json = "1.0.62"
futures = "0.3.15"
tokio = { version = "1", features = ["full"] } # for example
tokio = { version = "1", features = ["full"] }
[build-dependencies]
common = { path = "common", version = "0.1.0" }

View File

@ -1,7 +1,7 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use r2r;
use std::io::Write;
use r2r::example_interfaces::srv::AddTwoInts;
@ -11,8 +11,10 @@ async fn requester_task(
let mut x: i64 = 0;
loop {
let req = AddTwoInts::Request { a: 10 * x, b: x };
print!("{} + {} = ", req.a, req.b);
std::io::stdout().flush()?;
let resp = c.request(&req)?.await?;
println!("{} + {} = {}", req.a, req.b, resp.sum);
println!("{}", resp.sum);
x += 1;
if x == 10 {
@ -40,7 +42,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
spawner.spawn_local(async move {
match requester_task(client).await {
Ok(()) => println!("exiting"),
Ok(()) => println!("done."),
Err(e) => println!("error: {}", e),
}
})?;

View File

@ -1,19 +1,94 @@
use futures::executor::LocalPool;
use futures::select;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use futures::FutureExt;
use r2r;
use r2r::example_interfaces::srv::AddTwoInts;
fn handle_service(request: AddTwoInts::Request) -> AddTwoInts::Response {
println!("request: {} + {}", request.a, request.b);
AddTwoInts::Response {
sum: request.a + request.b,
}
}
///
/// This example demonstrates how we can chain async service calls.
///
/// Run toghtether with the client example.
/// e.g. cargo run --example service
/// and in another terminal cargo run --example client
///
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
node.create_service::<AddTwoInts::Service>("/add_two_ints", Box::new(handle_service))?;
let mut service = node.create_service::<AddTwoInts::Service>("/add_two_ints")?;
let service_delayed = node.create_service::<AddTwoInts::Service>("/add_two_ints_delayed")?;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints_delayed")?;
let mut timer = node.create_wall_timer(std::time::Duration::from_millis(250))?;
let mut timer2 = node.create_wall_timer(std::time::Duration::from_millis(2000))?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
loop {
match service.next().await {
Some(req) => {
println!("passing request on to delayed service");
let resp = client
.request(&req.message)
.expect("could not send client response")
.await
.expect("expected client response");
println!(
"responding with: {} + {} = {}",
req.message.a, req.message.b, resp.sum
);
req.respond(resp);
}
None => break,
}
}
})?;
spawner.spawn_local(async move {
// we need to fuse the streams for select
let mut service_delayed = service_delayed.fuse();
loop {
select! {
req = service_delayed.next() => {
if let Some(req) = req {
let resp = AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
// wait a bit before answering...
let _ret = timer2.tick().await;
req.respond(resp);
}
},
elapsed = timer2.tick().fuse() => {
if let Ok(elapsed) = elapsed {
println!("no request made in {}ms", elapsed.as_millis());
}
}
};
}
})?;
spawner.spawn_local(async move {
loop {
let elapsed = timer.tick().await.expect("could not tick");
println!(
"doing other async work, {}ms since last call",
elapsed.as_millis()
);
}
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));
node.spin_once(std::time::Duration::from_millis(5));
pool.run_until_stalled();
}
}

View File

@ -19,6 +19,8 @@ use futures::future::TryFutureExt;
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use retain_mut::RetainMut;
// otherwise crates using r2r needs to specify the same version of uuid as
// this crate depend on, which seem like bad user experience.
pub extern crate uuid;
@ -378,22 +380,19 @@ impl Sub for WrappedSubUntyped {
}
}
// services
struct WrappedService<T>
where
T: WrappedServiceTypeSupport,
{
rcl_handle: rcl_service_t,
rcl_request: rmw_request_id_t,
callback: Box<dyn FnMut(T::Request) -> T::Response>,
rcl_request_msg: WrappedNativeMsg<T::Request>,
sender: mpsc::Sender<ServiceRequest<T>>,
outstanding_requests: Vec<oneshot::Receiver<(rmw_request_id_t, T::Response)>>,
}
trait Service {
fn handle(&self) -> &rcl_service_t;
fn run_cb(&mut self) -> ();
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t;
fn rcl_request_msg(&mut self) -> *mut std::os::raw::c_void;
fn send_completed_responses(&mut self) -> ();
fn handle_request(&mut self) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
@ -405,30 +404,63 @@ where
&self.rcl_handle
}
fn rcl_request_msg(&mut self) -> *mut std::os::raw::c_void {
self.rcl_request_msg.void_ptr_mut()
fn send_completed_responses(&mut self) -> () {
let mut to_send = vec![];
self.outstanding_requests.retain_mut(|r| {
match r.try_recv() {
Ok(Some(resp)) => {
to_send.push(resp);
false // done with this.
}
Ok(None) => true, // keep message, waiting for service
Err(_) => false, // channel canceled
}
});
for (mut req_id, msg) in to_send {
let mut native_response = WrappedNativeMsg::<T::Response>::from(&msg);
let res = unsafe {
rcl_send_response(
&self.rcl_handle,
&mut req_id,
native_response.void_ptr_mut(),
)
};
// TODO
if res != RCL_RET_OK as i32 {
eprintln!("could not send service response {}", res);
}
}
}
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t {
&mut self.rcl_request
}
fn handle_request(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<T::Request>::new();
fn run_cb(&mut self) -> () {
let request = T::Request::from_native(&self.rcl_request_msg);
let response = (self.callback)(request);
let mut native_response = WrappedNativeMsg::<T::Response>::from(&response);
let res = unsafe {
rcl_send_response(
let ret = unsafe {
rcl_take_request(
&self.rcl_handle,
&mut self.rcl_request,
native_response.void_ptr_mut(),
request_id.as_mut_ptr(),
request_msg.void_ptr_mut(),
)
};
// TODO
if res != RCL_RET_OK as i32 {
eprintln!("service error {}", res);
}
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
let request_msg = T::Request::from_native(&request_msg);
let (response_sender, response_receiver) =
oneshot::channel::<(rmw_request_id_t, T::Response)>();
self.outstanding_requests.push(response_receiver);
let request = ServiceRequest::<T> {
message: request_msg,
request_id,
response_sender,
};
match self.sender.try_send(request) {
Err(e) => eprintln!("warning: could not send service request ({})", e),
_ => (),
}
} // TODO handle failure.
}
fn destroy(&mut self, node: &mut rcl_node_t) {
@ -854,7 +886,6 @@ where
}
}
// action servers
struct WrappedActionServer<T>
where
T: WrappedActionTypeSupport,
@ -1245,6 +1276,34 @@ where
client_: Weak<Mutex<WrappedClient<T>>>,
}
pub struct ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
pub message: T::Request,
request_id: rmw_request_id_t,
response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>,
}
/// Encapsulates a service request. In contrast to having a simply callback from
/// Request -> Response types that is called synchronously, the service request
/// can be moved around and completed asynchronously.
impl<T> ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
/// Complete the service request, consuming the request in the process.
/// The reply is sent back on the next "ros spin".
pub fn respond(self, msg: T::Response) {
match self.response_sender.send((self.request_id, msg)) {
Err(_) => {
println!("service response receiver dropped");
}
_ => {}
}
}
}
unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
#[derive(Clone)]
@ -1884,7 +1943,7 @@ impl Node {
pub fn subscribe_native<T: 'static>(
&mut self,
topic: &str,
) -> Result<impl Stream<Item = WrappedNativeMsg<T>>>
) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
where
T: WrappedTypesupport,
{
@ -1904,7 +1963,7 @@ impl Node {
&mut self,
topic: &str,
topic_type: &str,
) -> Result<impl Stream<Item = Result<serde_json::Value>>> {
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle = self.create_subscription_helper(topic, msg.ts)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
@ -1947,24 +2006,21 @@ impl Node {
pub fn create_service<T: 'static>(
&mut self,
service_name: &str,
callback: Box<dyn FnMut(T::Request) -> T::Response>,
) -> Result<&rcl_service_t>
) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
where
T: WrappedServiceTypeSupport,
{
let service_handle = self.create_service_helper(service_name, T::get_ts())?;
let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
let ws = WrappedService::<T> {
rcl_handle: service_handle,
rcl_request: rmw_request_id_t {
writer_guid: [0; 16usize],
sequence_number: 0,
},
rcl_request_msg: WrappedNativeMsg::<T::Request>::new(),
callback,
outstanding_requests: vec![],
sender,
};
self.services.push(Box::new(ws));
Ok(self.services.last().unwrap().handle()) // hmm...
Ok(receiver)
}
fn create_client_helper(
@ -2281,6 +2337,11 @@ impl Node {
}
pub fn spin_once(&mut self, timeout: Duration) {
// first handle any completed service responses
for s in &mut self.services {
s.send_completed_responses();
}
let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
@ -2395,7 +2456,7 @@ impl Node {
}
}
// code below assumes that actions are added last... perhaps a
// code (further) below assumes that actions are added last... perhaps a
// bad assumption. e.g. we add subscriptions and timers of
// the node before ones created automatically by actions. we
// then assume that we can count on the waitables created by
@ -2481,12 +2542,7 @@ impl Node {
let ws_services = unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) };
for (s, ws_s) in self.services.iter_mut().zip(ws_services) {
if ws_s != &std::ptr::null() {
let ret = unsafe {
rcl_take_request(s.handle(), s.rcl_request_id(), s.rcl_request_msg())
};
if ret == RCL_RET_OK as i32 {
s.run_cb();
}
s.handle_request();
}
}