From 7853cb7161d2d6445a3b5fa6db2dbd7ff6de5180 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 18 Aug 2021 10:02:59 +0200 Subject: [PATCH] Async api for waiting on services and action servers. Implemented via polling in spin_once. Can be improved. --- Cargo.toml | 2 +- examples/action_client.rs | 25 ++++++----- examples/client.rs | 15 +++---- examples/service.rs | 11 +++-- examples/untyped_client.rs | 19 ++++---- src/action_clients.rs | 83 +++++++++++++++++++++++++--------- src/clients.rs | 92 +++++++++++++++++++++++++++++++------- src/nodes.rs | 42 +++++++++-------- 8 files changed, 198 insertions(+), 91 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0545460..0044608 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "r2r" -version = "0.3.0" +version = "0.3.5" authors = ["Martin Dahl "] description = "Minimal ros2 bindings." license = "Apache-2.0/MIT" diff --git a/examples/action_client.rs b/examples/action_client.rs index c886733..aaf48df 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -10,27 +10,30 @@ fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; let client = node.create_action_client::("/fibonacci")?; + let action_server_available = node.is_available(&client)?; // signal that we are done let done = Arc::new(Mutex::new(false)); - println!("waiting for action service..."); - while !node.action_server_available(&client)? { - std::thread::sleep(std::time::Duration::from_millis(500)); - } - println!("action service available."); - - let goal = Fibonacci::Goal { order: 5 }; - println!("sending goal: {:?}", goal); - let goal_fut = client.send_goal_request(goal)?; - let mut pool = LocalPool::new(); let spawner = pool.spawner(); let task_spawner = spawner.clone(); let task_done = done.clone(); spawner.spawn_local(async move { - let (goal, result, feedback) = goal_fut.await.unwrap(); // assume success + println!("waiting for action service..."); + action_server_available + .await + .expect("could not await action server"); + println!("action service available."); + + let goal = Fibonacci::Goal { order: 5 }; + println!("sending goal: {:?}", goal); + let (goal, result, feedback) = client + .send_goal_request(goal) + .expect("could not send goal request") + .await + .expect("did not get goal"); println!("goal accepted: {}", goal.uuid); // process feedback stream in its own task diff --git a/examples/client.rs b/examples/client.rs index b7764b9..f4d9019 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,14 +1,19 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; +use futures::Future; use r2r; use std::io::Write; use r2r::example_interfaces::srv::AddTwoInts; async fn requester_task( + node_available: impl Future>, c: r2r::Client, ) -> Result<(), Box> { let mut x: i64 = 0; + println!("waiting for service..."); + node_available.await?; + println!("service available."); loop { let req = AddTwoInts::Request { a: 10 * x, b: x }; print!("{} + {} = ", req.a, req.b); @@ -29,19 +34,13 @@ fn main() -> Result<(), Box> { let mut node = r2r::Node::create(ctx, "testnode", "")?; let client = node.create_client::("/add_two_ints")?; - // wait for service to be available - println!("waiting for service..."); - while !node.service_available(&client)? { - std::thread::sleep(std::time::Duration::from_millis(1000)); - } - - println!("service available."); + let service_available = node.is_available(&client)?; let mut pool = LocalPool::new(); let spawner = pool.spawner(); spawner.spawn_local(async move { - match requester_task(client).await { + match requester_task(service_available, client).await { Ok(()) => println!("done."), Err(e) => println!("error: {}", e), } diff --git a/examples/service.rs b/examples/service.rs index 9e3dc11..444dce5 100644 --- a/examples/service.rs +++ b/examples/service.rs @@ -23,16 +23,15 @@ fn main() -> Result<(), Box> { let mut timer = node.create_wall_timer(std::time::Duration::from_millis(250))?; let mut timer2 = node.create_wall_timer(std::time::Duration::from_millis(2000))?; // wait for service to be available - println!("waiting for service..."); - while !node.service_available(&client)? { - std::thread::sleep(std::time::Duration::from_millis(1000)); - } - println!("service available."); - + let service_available = node.is_available(&client)?; let mut pool = LocalPool::new(); let spawner = pool.spawner(); spawner.spawn_local(async move { + println!("waiting for delayed service..."); + service_available.await.expect("could not await service"); + println!("delayed service available."); + loop { match service.next().await { Some(req) => { diff --git a/examples/untyped_client.rs b/examples/untyped_client.rs index 650e578..83342a9 100644 --- a/examples/untyped_client.rs +++ b/examples/untyped_client.rs @@ -1,9 +1,16 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; +use futures::Future; use r2r; -async fn requester_task(c: r2r::UntypedClient) -> Result<(), Box> { +async fn requester_task( + node_available: impl Future>, + c: r2r::UntypedClient, +) -> Result<(), Box> { let mut x: i64 = 0; + println!("waiting for service..."); + node_available.await?; + println!("service available."); loop { let json = format!("{{ \"a\": {}, \"b\": {} }}", 10 * x, x); let req = serde_json::from_str(&json).unwrap(); @@ -24,19 +31,13 @@ fn main() -> Result<(), Box> { let client = node.create_client_untyped("/add_two_ints", "example_interfaces/srv/AddTwoInts")?; - // wait for service to be available - println!("waiting for service..."); - while !node.service_available_untyped(&client)? { - std::thread::sleep(std::time::Duration::from_millis(1000)); - } - - println!("service available."); + let service_available = node.is_available(&client)?; let mut pool = LocalPool::new(); let spawner = pool.spawner(); spawner.spawn_local(async move { - match requester_task(client).await { + match requester_task(service_available, client).await { Ok(()) => println!("done."), Err(e) => println!("error: {}", e), } diff --git a/src/action_clients.rs b/src/action_clients.rs index bc3e2cd..152fee0 100644 --- a/src/action_clients.rs +++ b/src/action_clients.rs @@ -144,26 +144,6 @@ where ActionClient { client } } -pub fn action_server_available(node: &rcl_node_t, client: &ActionClient) -> Result -where - T: 'static + WrappedActionTypeSupport, -{ - let client = client - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - let mut avail = false; - let result = unsafe { rcl_action_server_is_available(node, client.handle(), &mut avail) }; - - if result == RCL_RET_OK as i32 { - Ok(avail) - } else { - eprintln!("coult not send request {}", result); - Err(Error::from_rcl_error(result)) - } -} - #[derive(Debug, Copy, Clone, PartialEq)] pub enum GoalStatus { Unknown, @@ -236,6 +216,8 @@ where pub result_requests: Vec<(i64, uuid::Uuid)>, pub result_senders: Vec<(uuid::Uuid, oneshot::Sender<(GoalStatus, T::Result)>)>, pub goal_status: HashMap, + + pub poll_available_channels: Vec>, } pub trait ActionClient_ { @@ -249,6 +231,9 @@ pub trait ActionClient_ { fn handle_result_response(&mut self) -> (); fn send_result_request(&mut self, uuid: uuid::Uuid) -> (); + + fn register_poll_available(&mut self, s: oneshot::Sender<()>) -> (); + fn poll_available(&mut self, node: &mut rcl_node_t) -> (); } use std::convert::TryInto; @@ -515,6 +500,32 @@ where } } + fn register_poll_available(&mut self, s: oneshot::Sender<()>) { + self.poll_available_channels.push(s); + } + + fn poll_available(&mut self, node: &mut rcl_node_t) { + if self.poll_available_channels.is_empty() { + return; + } + let available = action_server_available_helper(node, self.handle()); + match available { + Ok(true) => { + // send ok and close channels + while let Some(sender) = self.poll_available_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Ok(false) => { + // not available... + } + Err(_) => { + // error, close all channels + self.poll_available_channels.clear(); + } + } + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_action_client_fini(&mut self.rcl_handle, node); @@ -572,3 +583,35 @@ pub fn action_client_get_num_waits( } } } + +use crate::nodes::IsAvailablePollable; + +impl IsAvailablePollable for ActionClient +where + T: WrappedActionTypeSupport, +{ + fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> { + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.register_poll_available(sender); + Ok(()) + } +} + +pub fn action_server_available_helper( + node: &rcl_node_t, + client: &rcl_action_client_t, +) -> Result { + let mut avail = false; + let result = unsafe { rcl_action_server_is_available(node, client, &mut avail) }; + + if result == RCL_RET_OK as i32 { + Ok(avail) + } else { + eprintln!("coult not check if action server is available {}", result); + Err(Error::from_rcl_error(result)) + } +} diff --git a/src/clients.rs b/src/clients.rs index 31c8f39..b296061 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -109,6 +109,8 @@ impl UntypedClient_ { pub trait Client_ { fn handle(&self) -> &rcl_client_t; fn handle_response(&mut self) -> (); + fn register_poll_available(&mut self, s: oneshot::Sender<()>) -> (); + fn poll_available(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> (); } @@ -118,6 +120,7 @@ where { pub rcl_handle: rcl_client_t, pub response_channels: Vec<(i64, oneshot::Sender)>, + pub poll_available_channels: Vec>, } impl Client_ for TypedClient @@ -169,6 +172,32 @@ where } // TODO handle failure. } + fn register_poll_available(&mut self, s: oneshot::Sender<()>) { + self.poll_available_channels.push(s); + } + + fn poll_available(&mut self, node: &mut rcl_node_t) { + if self.poll_available_channels.is_empty() { + return; + } + let available = service_available_helper(node, self.handle()); + match available { + Ok(true) => { + // send ok and close channels + while let Some(sender) = self.poll_available_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Ok(false) => { + // not available... + } + Err(_) => { + // error, close all channels + self.poll_available_channels.clear(); + } + } + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_client_fini(&mut self.rcl_handle, node); @@ -180,6 +209,7 @@ pub struct UntypedClient_ { pub service_type: UntypedServiceSupport, pub rcl_handle: rcl_client_t, pub response_channels: Vec<(i64, oneshot::Sender>)>, + pub poll_available_channels: Vec>, } impl Client_ for UntypedClient_ { @@ -228,6 +258,32 @@ impl Client_ for UntypedClient_ { } // TODO handle failure. } + fn register_poll_available(&mut self, s: oneshot::Sender<()>) { + self.poll_available_channels.push(s); + } + + fn poll_available(&mut self, node: &mut rcl_node_t) { + if self.poll_available_channels.is_empty() { + return; + } + let available = service_available_helper(node, self.handle()); + match available { + Ok(true) => { + // send ok and close channels + while let Some(sender) = self.poll_available_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Ok(false) => { + // not available... + } + Err(_) => { + // error, close all channels + self.poll_available_channels.clear(); + } + } + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_client_fini(&mut self.rcl_handle, node); @@ -272,23 +328,25 @@ pub fn service_available_helper(node: &mut rcl_node_t, client: &rcl_client_t) -> } } -pub fn service_available( - node: &mut rcl_node_t, - client: &Client, -) -> Result { - let client = client - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - service_available_helper(node, client.handle()) +use crate::nodes::IsAvailablePollable; + +impl IsAvailablePollable for Client +where + T: WrappedServiceTypeSupport, +{ + fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> { + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.register_poll_available(sender); + Ok(()) + } } -pub fn service_available_untyped(node: &mut rcl_node_t, client: &UntypedClient) -> Result { - let client = client - .client - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); - service_available_helper(node, client.handle()) +impl IsAvailablePollable for UntypedClient { + fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> { + let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.register_poll_available(sender); + Ok(()) + } } diff --git a/src/nodes.rs b/src/nodes.rs index c4e4ac8..409df25 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -243,6 +243,7 @@ impl Node { let ws = TypedClient:: { rcl_handle: client_handle, response_channels: Vec::new(), + poll_available_channels: Vec::new(), }; let client_arc = Arc::new(Mutex::new(ws)); @@ -264,6 +265,7 @@ impl Node { service_type, rcl_handle: client_handle, response_channels: Vec::new(), + poll_available_channels: Vec::new(), }; let client_arc = Arc::new(Mutex::new(client)); @@ -272,15 +274,13 @@ impl Node { Ok(c) } - pub fn service_available( + pub fn is_available( &mut self, - client: &Client, - ) -> Result { - service_available(self.node_handle.as_mut(), client) - } - - pub fn service_available_untyped(&mut self, client: &UntypedClient) -> Result { - service_available_untyped(self.node_handle.as_mut(), client) + client: &dyn IsAvailablePollable, + ) -> Result>> { + let (sender, receiver) = oneshot::channel(); + client.register_poll_available(sender)?; + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) } pub fn create_action_client(&mut self, action_name: &str) -> Result> @@ -297,6 +297,7 @@ impl Node { result_senders: Vec::new(), result_requests: Vec::new(), goal_status: HashMap::new(), + poll_available_channels: Vec::new(), }; let client_arc = Arc::new(Mutex::new(client)); @@ -305,13 +306,6 @@ impl Node { Ok(c) } - pub fn action_server_available( - &self, - client: &ActionClient, - ) -> Result { - action_server_available(self.node_handle.as_ref(), client) - } - pub fn create_action_server( &mut self, action_name: &str, @@ -388,6 +382,15 @@ impl Node { a.lock().unwrap().send_completed_cancel_requests(); } + // as well as polling any services/action servers for availability + for c in &mut self.clients { + c.lock().unwrap().poll_available(self.node_handle.as_mut()); + } + + for c in &mut self.action_clients { + c.lock().unwrap().poll_available(self.node_handle.as_mut()); + } + let timeout = timeout.as_nanos() as i64; let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; @@ -815,10 +818,7 @@ impl Timer { // wait until there are no other owners in the cleanup procedure. The // next time a publisher wants to publish they will fail because the // value in the Arc has been dropped. Hacky but works. -fn wait_until_unwrapped(mut a: Arc) -> T -where - T: std::fmt::Debug, -{ +fn wait_until_unwrapped(mut a: Arc) -> T { loop { match Arc::try_unwrap(a) { Ok(b) => return b, @@ -860,3 +860,7 @@ impl Drop for Node { } } } + +pub trait IsAvailablePollable { + fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>; +}