cancel actions

This commit is contained in:
Martin Dahl 2021-06-18 13:47:21 +02:00
parent b70fc089b1
commit fe23d94bcd
3 changed files with 144 additions and 21 deletions

View File

@ -1,44 +1,65 @@
use futures::executor::LocalPool;
use futures::future;
use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use std::sync::{Arc,Mutex};
use r2r;
use r2r::example_interfaces::action::Fibonacci;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_action_client::<Fibonacci::Action>("/fibonacci")?;
let client = Arc::new(Mutex::new(node.create_action_client::<Fibonacci::Action>("/fibonacci")?));
// signal that we are done
let done = Arc::new(Mutex::new(false));
println!("waiting for action service...");
while !node.action_server_available(&client)? {
while !node.action_server_available(&client.lock().unwrap())? {
std::thread::sleep(std::time::Duration::from_millis(500));
}
println!("action service available.");
let goal = Fibonacci::Goal { order: 10 };
println!("sending goal: {:?}", goal);
let goal_fut = client.send_goal_request(goal)?;
let goal_fut = client.lock().unwrap().send_goal_request(goal)?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let task_spawner = spawner.clone();
let task_done = done.clone();
spawner.spawn_local(async move {
let goal = goal_fut.await.unwrap(); // assume success
// process feedback stream in its own task
let goal_id = goal.uuid.clone();
let nested_task_done = task_done.clone();
task_spawner
.spawn_local(goal.feedback.for_each(|msg| {
.spawn_local(goal.feedback.for_each(move |msg| {
let task_client = client.clone();
let task_done = nested_task_done.clone();
async move {
println!("new feedback msg {:?}", msg);
future::ready(())
}))
.unwrap();
// cancel the goal before it finishes. (comment out to complete the goal)
if msg.sequence.len() == 8 {
task_client.lock().unwrap().send_cancel_request(&goal_id).unwrap().
map(|r| {
println!("goal cancelled: {:?}", r);
// we are done.
*task_done.lock().unwrap() = true;
}).await;
}
}})).unwrap();
// await result in this task
let result = goal.result.await;
match result {
Ok(msg) => println!("got result {:?}", msg),
Ok(msg) => {
println!("got result {:?}", msg);
*task_done.lock().unwrap() = true;
},
Err(e) => println!("action failed: {:?}", e),
}
})?;
@ -46,5 +67,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
if *done.lock().unwrap() {
break;
}
}
Ok(())
}

View File

@ -85,6 +85,15 @@ pub enum Error {
// action errors. perhaps they exist in the rcl?
#[error("Goal rejected by server.")]
GoalRejected,
#[error("Goal cancel request rejected by server.")]
GoalCancelRejected,
#[error("Unknown goal id.")]
GoalCancelUnknownGoalID,
#[error("Goal already in a terminal state.")]
GoalCancelAlreadyTerminated,
}
impl Error {

View File

@ -488,6 +488,7 @@ where
{
rcl_handle: rcl_action_client_t,
goal_response_channels: Vec<(i64, oneshot::Sender<<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response>)>,
cancel_response_channels: Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
// goal_statuses: Vec<(uuid::Uuid, GoalStatus)>,
result_requests: Vec<(i64, uuid::Uuid)>,
@ -499,6 +500,7 @@ pub trait ActionClient_ {
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
fn handle_goal_response(&mut self) -> ();
fn handle_cancel_response(&mut self) -> ();
fn handle_feedback_msg(&mut self) -> ();
fn handle_status_msg(&mut self) -> ();
fn handle_result_response(&mut self) -> ();
@ -552,6 +554,37 @@ where
}
}
fn handle_cancel_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
let ret = unsafe {
rcl_action_take_cancel_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.cancel_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, sender) = self.cancel_response_channels.swap_remove(idx);
let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
match sender.send(response) {
Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e),
_ => (),
}
} else {
let we_have: String = self
.goal_response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_feedback_msg(&mut self) -> () {
let mut feedback_msg = WrappedNativeMsg::<T::FeedbackMessage>::new();
let ret = unsafe {
@ -723,6 +756,7 @@ pub struct Goal<T>
where
T: WrappedActionTypeSupport,
{
pub uuid: uuid::Uuid,
pub feedback: mpsc::Receiver<T::Feedback>,
pub result: oneshot::Receiver<T::Result>,
}
@ -1274,6 +1308,7 @@ impl Node {
let wa = WrappedActionClient::<T> {
rcl_handle: client_handle,
goal_response_channels: Vec::new(),
cancel_response_channels: Vec::new(),
feedback_senders: Vec::new(),
result_senders: Vec::new(),
// goal_statuses: Vec::new(),
@ -1561,16 +1596,6 @@ impl Node {
continue;
}
if is_goal_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_goal_response();
}
if is_result_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_result_response();
}
if is_feedback_ready {
let mut acs = s.lock().unwrap();
acs.handle_feedback_msg();
@ -1580,6 +1605,21 @@ impl Node {
let mut acs = s.lock().unwrap();
acs.handle_status_msg();
}
if is_goal_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_goal_response();
}
if is_cancel_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_cancel_response();
}
if is_result_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_result_response();
}
}
unsafe {
@ -1878,12 +1918,13 @@ where
if result == RCL_RET_OK as i32 {
client.goal_response_channels.push((seq_no, goal_req_sender));
// instead of "canceled" we return invalid client.
let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| {
let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(move |r| {
match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
if accepted {
Ok(Goal {
uuid,
feedback: feedback_receiver,
result: result_receiver,
})
@ -1901,6 +1942,53 @@ where
Err(Error::from_rcl_error(result))
}
}
pub fn send_cancel_request(&self, goal: &uuid::Uuid) -> Result<impl Future<Output = Result<()>>>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self
.client_
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
let msg = action_msgs::srv::CancelGoal::Request {
goal_info: action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID { uuid: goal.as_bytes().to_vec() },
.. action_msgs::msg::GoalInfo::default()
}
};
let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_action_send_cancel_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
if result == RCL_RET_OK as i32 {
let (cancel_req_sender, cancel_req_receiver) =
oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
client.cancel_response_channels.push((seq_no, cancel_req_sender));
// instead of "canceled" we return invalid client.
let future = cancel_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| {
match r {
Ok(r) => match r.return_code {
0 => Ok(()),
1 => Err(Error::GoalCancelRejected),
2 => Err(Error::GoalCancelUnknownGoalID),
3 => Err(Error::GoalCancelAlreadyTerminated),
x => panic!("unknown error code return from action server: {}", x),
},
Err(e) => Err(e)
}
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl PublisherUntyped {