async action client + example

This commit is contained in:
Martin Dahl 2021-06-16 14:39:27 +02:00
parent 49c2a0ea7b
commit b70fc089b1
4 changed files with 160 additions and 127 deletions

View File

@ -1,7 +1,9 @@
use futures::executor::LocalPool;
use futures::future;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use r2r; use r2r;
use r2r::example_interfaces::action::Fibonacci; use r2r::example_interfaces::action::Fibonacci;
use std::cell::RefCell;
use std::rc::Rc;
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?; let ctx = r2r::Context::create()?;
@ -15,34 +17,34 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("action service available."); println!("action service available.");
let goal = Fibonacci::Goal { order: 10 }; let goal = Fibonacci::Goal { order: 10 };
let goal_accepted = Rc::new(RefCell::new(None));
let cb_ga = goal_accepted.clone();
let cb = Box::new(move |r: Fibonacci::SendGoal::Response| {
println!("got response {:?}", r);
*cb_ga.borrow_mut() = Some(r.accepted);
});
let feedback_cb = Box::new(move |fb: Fibonacci::Feedback| {
println!("got feedback {:?}", fb);
});
let result_cb = Box::new(move |r: Fibonacci::Result| {
println!("final result {:?}", r);
});
println!("sending goal: {:?}", goal); println!("sending goal: {:?}", goal);
client.send_goal_request(goal, cb, feedback_cb, result_cb)?; let goal_fut = client.send_goal_request(goal)?;
let mut c = 0; let mut pool = LocalPool::new();
loop { let spawner = pool.spawner();
node.spin_once(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(1000)); let task_spawner = spawner.clone();
c += 1; spawner.spawn_local(async move {
if c > 100 { let goal = goal_fut.await.unwrap(); // assume success
println!("shutdown");
break; // process feedback stream in its own task
task_spawner
.spawn_local(goal.feedback.for_each(|msg| {
println!("new feedback msg {:?}", msg);
future::ready(())
}))
.unwrap();
// await result in this task
let result = goal.result.await;
match result {
Ok(msg) => println!("got result {:?}", msg),
Err(e) => println!("action failed: {:?}", e),
} }
} })?;
Ok(()) loop {
node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
}
} }

View File

@ -154,6 +154,10 @@ pub fn generate_rust_action(module_: &str, prefix_: &str, name_: &str) -> String
}} }}
}} }}
fn destructure_goal_response_msg(msg: SendGoal::Response) -> (bool, builtin_interfaces::msg::Time) {{
(msg.accepted, msg.stamp)
}}
fn destructure_feedback_msg(msg: FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Feedback) {{ fn destructure_feedback_msg(msg: FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Feedback) {{
(msg.goal_id, msg.feedback) (msg.goal_id, msg.feedback)
}} }}

View File

@ -81,6 +81,10 @@ pub enum Error {
InvalidMessageType { msgtype: String }, InvalidMessageType { msgtype: String },
#[error("Serde error: {}", err)] #[error("Serde error: {}", err)]
SerdeError { err: String }, SerdeError { err: String },
// action errors. perhaps they exist in the rcl?
#[error("Goal rejected by server.")]
GoalRejected,
} }
impl Error { impl Error {

View File

@ -15,6 +15,7 @@ use std::fmt::Debug;
use std::future::Future; use std::future::Future;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use futures::future::FutureExt;
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
@ -63,6 +64,7 @@ pub trait WrappedActionTypeSupport {
<<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request; <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request;
fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) -> fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) ->
<<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request; <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request;
fn destructure_goal_response_msg(msg: <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response) -> (bool, builtin_interfaces::msg::Time);
fn destructure_feedback_msg(msg: Self::FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); fn destructure_feedback_msg(msg: Self::FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Self::Feedback);
fn destructure_result_response_msg(msg: <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response) -> fn destructure_result_response_msg(msg: <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response) ->
(i8, Self::Result); (i8, Self::Result);
@ -353,7 +355,6 @@ where
} }
fn run_cb(&mut self) -> () { fn run_cb(&mut self) -> () {
// copy native msg to rust type and run callback
let request = T::Request::from_native(&self.rcl_request_msg); let request = T::Request::from_native(&self.rcl_request_msg);
let response = (self.callback)(request); let response = (self.callback)(request);
let mut native_response = WrappedNativeMsg::<T::Response>::from(&response); let mut native_response = WrappedNativeMsg::<T::Response>::from(&response);
@ -412,15 +413,13 @@ where
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() }; let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, channel) = self.response_channels.swap_remove(idx); let (_, sender) = self.response_channels.swap_remove(idx);
let response = T::Response::from_native(&response_msg); let response = T::Response::from_native(&response_msg);
match channel.send(response) { match sender.send(response) {
Ok(()) => {}, Ok(()) => {},
Err(e) => { println!("error sending: {:?}", e); }, Err(e) => { println!("error sending to client: {:?}", e); },
} }
} else { } else {
// I don't think this should be able to occur? Let's panic so we
// find out...
let we_have: String = self let we_have: String = self
.response_channels .response_channels
.iter() .iter()
@ -488,11 +487,11 @@ where
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
{ {
rcl_handle: rcl_action_client_t, rcl_handle: rcl_action_client_t,
goal_request_callbacks: Vec<(i64, Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response)>)>, goal_response_channels: Vec<(i64, oneshot::Sender<<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response>)>,
feedback_callbacks: Vec<(uuid::Uuid, Box<dyn FnMut(T::Feedback) -> ()>)>, feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
goal_statuses: Vec<(uuid::Uuid, GoalStatus)>, // goal_statuses: Vec<(uuid::Uuid, GoalStatus)>,
result_request_callbacks: Vec<(i64, Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response)>)>, result_requests: Vec<(i64, uuid::Uuid)>,
result_callbacks: Vec<(uuid::Uuid, Box<dyn FnOnce(T::Result) -> ()>)>, result_senders: Vec<(uuid::Uuid, oneshot::Sender<T::Result>)>,
} }
pub trait ActionClient_ { pub trait ActionClient_ {
@ -501,9 +500,10 @@ pub trait ActionClient_ {
fn handle_goal_response(&mut self) -> (); fn handle_goal_response(&mut self) -> ();
fn handle_feedback_msg(&mut self) -> (); fn handle_feedback_msg(&mut self) -> ();
fn handle_status_msg(&mut self) -> ();
fn handle_result_response(&mut self) -> (); fn handle_result_response(&mut self) -> ();
fn run_result_request(&mut self, uuid: &uuid::Uuid) -> (); fn send_result_request(&mut self, uuid: uuid::Uuid) -> ();
} }
@ -513,36 +513,6 @@ fn vec_to_uuid_bytes<T>(v: Vec<T>) -> [T; 16] {
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", 16, v.len())) .unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", 16, v.len()))
} }
impl<T: 'static> WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn send_result_request(&mut self, uuid: &uuid::Uuid, cb: Box<dyn FnOnce(T::Result) -> ()>) -> Result<()> {
let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() };
let request_msg = T::make_result_request_msg(uuid_msg);
let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::GetResult as
WrappedServiceTypeSupport>::Request>::from(&request_msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
if result == RCL_RET_OK as i32 {
self.result_request_callbacks.push((seq_no, Box::new(move |r: <<T as WrappedActionTypeSupport>::GetResult as
WrappedServiceTypeSupport>::Response| {
let (status, result) = T::destructure_result_response_msg(r);
let status = GoalStatus::from_rcl(status);
println!("status: {:?}, result: {:?}", status, result);
(cb)(result);
})));
Ok(())
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl<T: 'static> ActionClient_ for WrappedActionClient<T> impl<T: 'static> ActionClient_ for WrappedActionClient<T>
where where
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
@ -560,15 +530,16 @@ where
}; };
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() }; let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.goal_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) { if let Some(idx) = self.goal_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, cb_to_run) = self.goal_request_callbacks.swap_remove(idx); let (_, sender) = self.goal_response_channels.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
(cb_to_run)(response); match sender.send(response) {
Ok(()) => {},
Err(e) => { println!("error sending to action client: {:?}", e); },
}
} else { } else {
// I don't think this should be able to occur? Let's panic so we
// find out...
let we_have: String = self let we_have: String = self
.goal_request_callbacks .goal_response_channels
.iter() .iter()
.map(|(id, _)| id.to_string()) .map(|(id, _)| id.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
@ -590,8 +561,31 @@ where
let msg = T::FeedbackMessage::from_native(&feedback_msg); let msg = T::FeedbackMessage::from_native(&feedback_msg);
let (uuid, feedback) = T::destructure_feedback_msg(msg); let (uuid, feedback) = T::destructure_feedback_msg(msg);
let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid));
if let Some((_,cb)) = self.feedback_callbacks.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) { if let Some((_,sender)) = self.feedback_senders.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) {
(cb)(feedback); match sender.try_send(feedback) {
Err(e) => eprintln!("warning: could not send feedback msg ({})", e),
_ => (),
}
}
}
}
fn handle_status_msg(&mut self) -> () {
let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
let ret = unsafe {
rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
// TODO: actually use this information.
for a in &arr.status_list {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone()));
if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { continue; }
let status = GoalStatus::from_rcl(a.status);
if status == GoalStatus::Succeeded {
self.send_result_request(uuid);
}
// TODO, cancel futures when goal is aborted etc.
} }
} }
} }
@ -606,15 +600,26 @@ where
if ret == RCL_RET_OK as i32 { if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() }; let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.result_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) { if let Some(idx) = self.result_requests.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, cb_to_run) = self.result_request_callbacks.swap_remove(idx); let (_, uuid) = self.result_requests.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); if let Some(idx) = self.result_senders.iter().position(|(suuid, _)| suuid == &uuid) {
(cb_to_run)(response); let (_, sender) = self.result_senders.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
let (status, result) = T::destructure_result_response_msg(response);
let status = GoalStatus::from_rcl(status);
if status != GoalStatus::Succeeded {
println!("goal status failed: {:?}, result: {:?}", status, result);
// this will drop the sender which makes the receiver fail with "canceled"
} else {
match sender.send(result) {
Ok(()) => {},
Err(e) => { println!("error sending result to action client: {:?}", e); },
}
}
}
} else { } else {
// I don't think this should be able to occur? Let's panic so we
// find out...
let we_have: String = self let we_have: String = self
.result_request_callbacks .result_requests
.iter() .iter()
.map(|(id, _)| id.to_string()) .map(|(id, _)| id.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
@ -627,11 +632,19 @@ where
} }
} }
fn run_result_request(&mut self, uuid: &uuid::Uuid) -> () { fn send_result_request(&mut self, uuid: uuid::Uuid) -> () {
if let Some(idx) = self.result_callbacks.iter().position(|(cb_uuid, _)| cb_uuid == uuid) { let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() };
let (_, result_cb) = self.result_callbacks.swap_remove(idx); let request_msg = T::make_result_request_msg(uuid_msg);
println!("asking for final result for {}", uuid); let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::GetResult as
self.send_result_request(uuid, result_cb).unwrap(); // TODO error handling. WrappedServiceTypeSupport>::Request>::from(&request_msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
if result == RCL_RET_OK as i32 {
self.result_requests.push((seq_no, uuid));
} else {
eprintln!("coult not send request {}", result);
} }
} }
@ -705,6 +718,15 @@ where
client_: Weak<Mutex<WrappedActionClient<T>>>, client_: Weak<Mutex<WrappedActionClient<T>>>,
} }
#[derive(Debug)]
pub struct Goal<T>
where
T: WrappedActionTypeSupport,
{
pub feedback: mpsc::Receiver<T::Feedback>,
pub result: oneshot::Receiver<T::Result>,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
context_handle: Arc<Mutex<ContextHandle>>, context_handle: Arc<Mutex<ContextHandle>>,
@ -1251,11 +1273,11 @@ impl Node {
let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; let client_handle = self.create_action_client_helper(action_name, T::get_ts())?;
let wa = WrappedActionClient::<T> { let wa = WrappedActionClient::<T> {
rcl_handle: client_handle, rcl_handle: client_handle,
goal_request_callbacks: Vec::new(), goal_response_channels: Vec::new(),
feedback_callbacks: Vec::new(), feedback_senders: Vec::new(),
goal_statuses: Vec::new(), result_senders: Vec::new(),
result_request_callbacks: Vec::new(), // goal_statuses: Vec::new(),
result_callbacks: Vec::new(), result_requests: Vec::new(),
}; };
let arc = Arc::new(Mutex::new(wa)); let arc = Arc::new(Mutex::new(wa));
@ -1555,25 +1577,8 @@ impl Node {
} }
if is_status_ready { if is_status_ready {
let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new(); let mut acs = s.lock().unwrap();
let ret = unsafe { acs.handle_status_msg();
rcl_action_take_status(ac, status_array.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
// TODO: actually use this information.
for a in &arr.status_list {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone()));
let status = GoalStatus::from_rcl(a.status);
println!("goal status for {}: {:?}", uuid, status);
if status == GoalStatus::Succeeded {
// query for the result.
let mut acs = s.lock().unwrap();
acs.run_result_request(&uuid);
}
}
}
} }
} }
@ -1768,7 +1773,6 @@ where
.handle .handle
.upgrade() .upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
// copy rust msg to native and publish it
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg); let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
let result = unsafe { let result = unsafe {
rcl_publish( rcl_publish(
@ -1821,7 +1825,6 @@ where
.upgrade() .upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?; .ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap(); let mut client = client.lock().unwrap();
// copy rust msg to native and publish it
let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg); let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg);
let mut seq_no = 0i64; let mut seq_no = 0i64;
let result = let result =
@ -1844,10 +1847,7 @@ impl<T> ActionClient<T>
where where
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
{ {
pub fn send_goal_request(&self, goal: T::Goal, pub fn send_goal_request(&self, goal: T::Goal) -> Result<impl Future<Output = Result<Goal<T>>>>
cb: Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response) -> ()>,
feedback_cb: Box<dyn FnMut(T::Feedback) -> ()>,
result_cb: Box<dyn FnOnce(T::Result) -> ()>) -> Result<()>
where where
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
{ {
@ -1857,12 +1857,9 @@ where
.upgrade() .upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?; .ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap(); let mut client = client.lock().unwrap();
// copy rust msg to native and publish it
let uuid = uuid::Uuid::new_v4(); let uuid = uuid::Uuid::new_v4();
let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() };
println!("UUID: {:?}", uuid);
client.feedback_callbacks.push((uuid, feedback_cb));
client.result_callbacks.push((uuid, result_cb));
let request_msg = T::make_goal_request_msg(uuid_msg, goal); let request_msg = T::make_goal_request_msg(uuid_msg, goal);
let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::SendGoal as let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::SendGoal as
WrappedServiceTypeSupport>::Request>::from(&request_msg); WrappedServiceTypeSupport>::Request>::from(&request_msg);
@ -1870,11 +1867,37 @@ where
let result = let result =
unsafe { rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; unsafe { rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
// set up channels
let (goal_req_sender, goal_req_receiver) =
oneshot::channel::<<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(1);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) = oneshot::channel::<T::Result>();
client.result_senders.push((uuid, result_sender));
if result == RCL_RET_OK as i32 { if result == RCL_RET_OK as i32 {
client.goal_request_callbacks.push((seq_no, cb)); client.goal_response_channels.push((seq_no, goal_req_sender));
Ok(()) // instead of "canceled" we return invalid client.
let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| {
match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
if accepted {
Ok(Goal {
feedback: feedback_receiver,
result: result_receiver,
})
} else {
println!("goal rejected");
Err(Error::GoalRejected)
}
},
Err(e) => Err(e),
}
});
Ok(future)
} else { } else {
eprintln!("coult not send request {}", result); eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result)) Err(Error::from_rcl_error(result))
} }
} }