Async api for waiting on services and action servers.

Implemented via polling in spin_once. Can be improved.
This commit is contained in:
Martin Dahl 2021-08-18 10:02:59 +02:00
parent 2babcaa5c6
commit 7853cb7161
8 changed files with 198 additions and 91 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "r2r"
version = "0.3.0"
version = "0.3.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Minimal ros2 bindings."
license = "Apache-2.0/MIT"

View File

@ -10,27 +10,30 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_action_client::<Fibonacci::Action>("/fibonacci")?;
let action_server_available = node.is_available(&client)?;
// signal that we are done
let done = Arc::new(Mutex::new(false));
println!("waiting for action service...");
while !node.action_server_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(500));
}
println!("action service available.");
let goal = Fibonacci::Goal { order: 5 };
println!("sending goal: {:?}", goal);
let goal_fut = client.send_goal_request(goal)?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let task_spawner = spawner.clone();
let task_done = done.clone();
spawner.spawn_local(async move {
let (goal, result, feedback) = goal_fut.await.unwrap(); // assume success
println!("waiting for action service...");
action_server_available
.await
.expect("could not await action server");
println!("action service available.");
let goal = Fibonacci::Goal { order: 5 };
println!("sending goal: {:?}", goal);
let (goal, result, feedback) = client
.send_goal_request(goal)
.expect("could not send goal request")
.await
.expect("did not get goal");
println!("goal accepted: {}", goal.uuid);
// process feedback stream in its own task

View File

@ -1,14 +1,19 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::Future;
use r2r;
use std::io::Write;
use r2r::example_interfaces::srv::AddTwoInts;
async fn requester_task(
node_available: impl Future<Output = r2r::Result<()>>,
c: r2r::Client<AddTwoInts::Service>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut x: i64 = 0;
println!("waiting for service...");
node_available.await?;
println!("service available.");
loop {
let req = AddTwoInts::Request { a: 10 * x, b: x };
print!("{} + {} = ", req.a, req.b);
@ -29,19 +34,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let service_available = node.is_available(&client)?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
match requester_task(client).await {
match requester_task(service_available, client).await {
Ok(()) => println!("done."),
Err(e) => println!("error: {}", e),
}

View File

@ -23,16 +23,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut timer = node.create_wall_timer(std::time::Duration::from_millis(250))?;
let mut timer2 = node.create_wall_timer(std::time::Duration::from_millis(2000))?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let service_available = node.is_available(&client)?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
println!("waiting for delayed service...");
service_available.await.expect("could not await service");
println!("delayed service available.");
loop {
match service.next().await {
Some(req) => {

View File

@ -1,9 +1,16 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::Future;
use r2r;
async fn requester_task(c: r2r::UntypedClient) -> Result<(), Box<dyn std::error::Error>> {
async fn requester_task(
node_available: impl Future<Output = r2r::Result<()>>,
c: r2r::UntypedClient,
) -> Result<(), Box<dyn std::error::Error>> {
let mut x: i64 = 0;
println!("waiting for service...");
node_available.await?;
println!("service available.");
loop {
let json = format!("{{ \"a\": {}, \"b\": {} }}", 10 * x, x);
let req = serde_json::from_str(&json).unwrap();
@ -24,19 +31,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let client =
node.create_client_untyped("/add_two_ints", "example_interfaces/srv/AddTwoInts")?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available_untyped(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let service_available = node.is_available(&client)?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
match requester_task(client).await {
match requester_task(service_available, client).await {
Ok(()) => println!("done."),
Err(e) => println!("error: {}", e),
}

View File

@ -144,26 +144,6 @@ where
ActionClient { client }
}
pub fn action_server_available<T>(node: &rcl_node_t, client: &ActionClient<T>) -> Result<bool>
where
T: 'static + WrappedActionTypeSupport,
{
let client = client
.client
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
let mut avail = false;
let result = unsafe { rcl_action_server_is_available(node, client.handle(), &mut avail) };
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum GoalStatus {
Unknown,
@ -236,6 +216,8 @@ where
pub result_requests: Vec<(i64, uuid::Uuid)>,
pub result_senders: Vec<(uuid::Uuid, oneshot::Sender<(GoalStatus, T::Result)>)>,
pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
pub poll_available_channels: Vec<oneshot::Sender<()>>,
}
pub trait ActionClient_ {
@ -249,6 +231,9 @@ pub trait ActionClient_ {
fn handle_result_response(&mut self) -> ();
fn send_result_request(&mut self, uuid: uuid::Uuid) -> ();
fn register_poll_available(&mut self, s: oneshot::Sender<()>) -> ();
fn poll_available(&mut self, node: &mut rcl_node_t) -> ();
}
use std::convert::TryInto;
@ -515,6 +500,32 @@ where
}
}
fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
self.poll_available_channels.push(s);
}
fn poll_available(&mut self, node: &mut rcl_node_t) {
if self.poll_available_channels.is_empty() {
return;
}
let available = action_server_available_helper(node, self.handle());
match available {
Ok(true) => {
// send ok and close channels
while let Some(sender) = self.poll_available_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Ok(false) => {
// not available...
}
Err(_) => {
// error, close all channels
self.poll_available_channels.clear();
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_action_client_fini(&mut self.rcl_handle, node);
@ -572,3 +583,35 @@ pub fn action_client_get_num_waits(
}
}
}
use crate::nodes::IsAvailablePollable;
impl<T: 'static> IsAvailablePollable for ActionClient<T>
where
T: WrappedActionTypeSupport,
{
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.register_poll_available(sender);
Ok(())
}
}
pub fn action_server_available_helper(
node: &rcl_node_t,
client: &rcl_action_client_t,
) -> Result<bool> {
let mut avail = false;
let result = unsafe { rcl_action_server_is_available(node, client, &mut avail) };
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not check if action server is available {}", result);
Err(Error::from_rcl_error(result))
}
}

View File

@ -109,6 +109,8 @@ impl UntypedClient_ {
pub trait Client_ {
fn handle(&self) -> &rcl_client_t;
fn handle_response(&mut self) -> ();
fn register_poll_available(&mut self, s: oneshot::Sender<()>) -> ();
fn poll_available(&mut self, node: &mut rcl_node_t) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
@ -118,6 +120,7 @@ where
{
pub rcl_handle: rcl_client_t,
pub response_channels: Vec<(i64, oneshot::Sender<T::Response>)>,
pub poll_available_channels: Vec<oneshot::Sender<()>>,
}
impl<T: 'static> Client_ for TypedClient<T>
@ -169,6 +172,32 @@ where
} // TODO handle failure.
}
fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
self.poll_available_channels.push(s);
}
fn poll_available(&mut self, node: &mut rcl_node_t) {
if self.poll_available_channels.is_empty() {
return;
}
let available = service_available_helper(node, self.handle());
match available {
Ok(true) => {
// send ok and close channels
while let Some(sender) = self.poll_available_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Ok(false) => {
// not available...
}
Err(_) => {
// error, close all channels
self.poll_available_channels.clear();
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_client_fini(&mut self.rcl_handle, node);
@ -180,6 +209,7 @@ pub struct UntypedClient_ {
pub service_type: UntypedServiceSupport,
pub rcl_handle: rcl_client_t,
pub response_channels: Vec<(i64, oneshot::Sender<Result<serde_json::Value>>)>,
pub poll_available_channels: Vec<oneshot::Sender<()>>,
}
impl Client_ for UntypedClient_ {
@ -228,6 +258,32 @@ impl Client_ for UntypedClient_ {
} // TODO handle failure.
}
fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
self.poll_available_channels.push(s);
}
fn poll_available(&mut self, node: &mut rcl_node_t) {
if self.poll_available_channels.is_empty() {
return;
}
let available = service_available_helper(node, self.handle());
match available {
Ok(true) => {
// send ok and close channels
while let Some(sender) = self.poll_available_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Ok(false) => {
// not available...
}
Err(_) => {
// error, close all channels
self.poll_available_channels.clear();
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_client_fini(&mut self.rcl_handle, node);
@ -272,23 +328,25 @@ pub fn service_available_helper(node: &mut rcl_node_t, client: &rcl_client_t) ->
}
}
pub fn service_available<T: 'static + WrappedServiceTypeSupport>(
node: &mut rcl_node_t,
client: &Client<T>,
) -> Result<bool> {
let client = client
.client
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
service_available_helper(node, client.handle())
use crate::nodes::IsAvailablePollable;
impl<T: 'static> IsAvailablePollable for Client<T>
where
T: WrappedServiceTypeSupport,
{
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.register_poll_available(sender);
Ok(())
}
}
pub fn service_available_untyped(node: &mut rcl_node_t, client: &UntypedClient) -> Result<bool> {
let client = client
.client
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
service_available_helper(node, client.handle())
impl IsAvailablePollable for UntypedClient {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.register_poll_available(sender);
Ok(())
}
}

View File

@ -243,6 +243,7 @@ impl Node {
let ws = TypedClient::<T> {
rcl_handle: client_handle,
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
};
let client_arc = Arc::new(Mutex::new(ws));
@ -264,6 +265,7 @@ impl Node {
service_type,
rcl_handle: client_handle,
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
};
let client_arc = Arc::new(Mutex::new(client));
@ -272,15 +274,13 @@ impl Node {
Ok(c)
}
pub fn service_available<T: 'static + WrappedServiceTypeSupport>(
pub fn is_available(
&mut self,
client: &Client<T>,
) -> Result<bool> {
service_available(self.node_handle.as_mut(), client)
}
pub fn service_available_untyped(&mut self, client: &UntypedClient) -> Result<bool> {
service_available_untyped(self.node_handle.as_mut(), client)
client: &dyn IsAvailablePollable,
) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();
client.register_poll_available(sender)?;
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
pub fn create_action_client<T: 'static>(&mut self, action_name: &str) -> Result<ActionClient<T>>
@ -297,6 +297,7 @@ impl Node {
result_senders: Vec::new(),
result_requests: Vec::new(),
goal_status: HashMap::new(),
poll_available_channels: Vec::new(),
};
let client_arc = Arc::new(Mutex::new(client));
@ -305,13 +306,6 @@ impl Node {
Ok(c)
}
pub fn action_server_available<T: 'static + WrappedActionTypeSupport>(
&self,
client: &ActionClient<T>,
) -> Result<bool> {
action_server_available(self.node_handle.as_ref(), client)
}
pub fn create_action_server<T: 'static>(
&mut self,
action_name: &str,
@ -388,6 +382,15 @@ impl Node {
a.lock().unwrap().send_completed_cancel_requests();
}
// as well as polling any services/action servers for availability
for c in &mut self.clients {
c.lock().unwrap().poll_available(self.node_handle.as_mut());
}
for c in &mut self.action_clients {
c.lock().unwrap().poll_available(self.node_handle.as_mut());
}
let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
@ -815,10 +818,7 @@ impl Timer {
// wait until there are no other owners in the cleanup procedure. The
// next time a publisher wants to publish they will fail because the
// value in the Arc has been dropped. Hacky but works.
fn wait_until_unwrapped<T>(mut a: Arc<T>) -> T
where
T: std::fmt::Debug,
{
fn wait_until_unwrapped<T>(mut a: Arc<T>) -> T {
loop {
match Arc::try_unwrap(a) {
Ok(b) => return b,
@ -860,3 +860,7 @@ impl Drop for Node {
}
}
}
pub trait IsAvailablePollable {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
}