From fe23d94bcdc43b19ce05bd668d42393b75684191 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Fri, 18 Jun 2021 13:47:21 +0200 Subject: [PATCH] cancel actions --- examples/action_client.rs | 46 ++++++++++++---- src/error.rs | 9 ++++ src/lib.rs | 110 ++++++++++++++++++++++++++++++++++---- 3 files changed, 144 insertions(+), 21 deletions(-) diff --git a/examples/action_client.rs b/examples/action_client.rs index 156e4a8..3ae5156 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -1,44 +1,65 @@ use futures::executor::LocalPool; -use futures::future; +use futures::future::FutureExt; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; +use std::sync::{Arc,Mutex}; use r2r; use r2r::example_interfaces::action::Fibonacci; fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; - let client = node.create_action_client::("/fibonacci")?; + let client = Arc::new(Mutex::new(node.create_action_client::("/fibonacci")?)); + + // signal that we are done + let done = Arc::new(Mutex::new(false)); println!("waiting for action service..."); - while !node.action_server_available(&client)? { + while !node.action_server_available(&client.lock().unwrap())? { std::thread::sleep(std::time::Duration::from_millis(500)); } println!("action service available."); let goal = Fibonacci::Goal { order: 10 }; println!("sending goal: {:?}", goal); - let goal_fut = client.send_goal_request(goal)?; + let goal_fut = client.lock().unwrap().send_goal_request(goal)?; let mut pool = LocalPool::new(); let spawner = pool.spawner(); let task_spawner = spawner.clone(); + let task_done = done.clone(); spawner.spawn_local(async move { let goal = goal_fut.await.unwrap(); // assume success // process feedback stream in its own task + let goal_id = goal.uuid.clone(); + let nested_task_done = task_done.clone(); task_spawner - .spawn_local(goal.feedback.for_each(|msg| { - println!("new feedback msg {:?}", msg); - future::ready(()) - })) - .unwrap(); + .spawn_local(goal.feedback.for_each(move |msg| { + let task_client = client.clone(); + let task_done = nested_task_done.clone(); + async move { + println!("new feedback msg {:?}", msg); + + // cancel the goal before it finishes. (comment out to complete the goal) + if msg.sequence.len() == 8 { + task_client.lock().unwrap().send_cancel_request(&goal_id).unwrap(). + map(|r| { + println!("goal cancelled: {:?}", r); + // we are done. + *task_done.lock().unwrap() = true; + }).await; + } + }})).unwrap(); // await result in this task let result = goal.result.await; match result { - Ok(msg) => println!("got result {:?}", msg), + Ok(msg) => { + println!("got result {:?}", msg); + *task_done.lock().unwrap() = true; + }, Err(e) => println!("action failed: {:?}", e), } })?; @@ -46,5 +67,10 @@ fn main() -> Result<(), Box> { loop { node.spin_once(std::time::Duration::from_millis(100)); pool.run_until_stalled(); + if *done.lock().unwrap() { + break; + } } + + Ok(()) } diff --git a/src/error.rs b/src/error.rs index d1273a3..318d5e1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -85,6 +85,15 @@ pub enum Error { // action errors. perhaps they exist in the rcl? #[error("Goal rejected by server.")] GoalRejected, + + #[error("Goal cancel request rejected by server.")] + GoalCancelRejected, + + #[error("Unknown goal id.")] + GoalCancelUnknownGoalID, + + #[error("Goal already in a terminal state.")] + GoalCancelAlreadyTerminated, } impl Error { diff --git a/src/lib.rs b/src/lib.rs index d84d4b5..1ca46e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -488,6 +488,7 @@ where { rcl_handle: rcl_action_client_t, goal_response_channels: Vec<(i64, oneshot::Sender<<::SendGoal as WrappedServiceTypeSupport>::Response>)>, + cancel_response_channels: Vec<(i64, oneshot::Sender)>, feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, // goal_statuses: Vec<(uuid::Uuid, GoalStatus)>, result_requests: Vec<(i64, uuid::Uuid)>, @@ -499,6 +500,7 @@ pub trait ActionClient_ { fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn handle_goal_response(&mut self) -> (); + fn handle_cancel_response(&mut self) -> (); fn handle_feedback_msg(&mut self) -> (); fn handle_status_msg(&mut self) -> (); fn handle_result_response(&mut self) -> (); @@ -552,6 +554,37 @@ where } } + fn handle_cancel_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::::new(); + + let ret = unsafe { + rcl_action_take_cancel_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut()) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self.cancel_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + let (_, sender) = self.cancel_response_channels.swap_remove(idx); + let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg); + match sender.send(response) { + Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e), + _ => (), + } + } else { + let we_have: String = self + .goal_response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + fn handle_feedback_msg(&mut self) -> () { let mut feedback_msg = WrappedNativeMsg::::new(); let ret = unsafe { @@ -723,6 +756,7 @@ pub struct Goal where T: WrappedActionTypeSupport, { + pub uuid: uuid::Uuid, pub feedback: mpsc::Receiver, pub result: oneshot::Receiver, } @@ -1274,6 +1308,7 @@ impl Node { let wa = WrappedActionClient:: { rcl_handle: client_handle, goal_response_channels: Vec::new(), + cancel_response_channels: Vec::new(), feedback_senders: Vec::new(), result_senders: Vec::new(), // goal_statuses: Vec::new(), @@ -1561,16 +1596,6 @@ impl Node { continue; } - if is_goal_response_ready { - let mut acs = s.lock().unwrap(); - acs.handle_goal_response(); - } - - if is_result_response_ready { - let mut acs = s.lock().unwrap(); - acs.handle_result_response(); - } - if is_feedback_ready { let mut acs = s.lock().unwrap(); acs.handle_feedback_msg(); @@ -1580,6 +1605,21 @@ impl Node { let mut acs = s.lock().unwrap(); acs.handle_status_msg(); } + + if is_goal_response_ready { + let mut acs = s.lock().unwrap(); + acs.handle_goal_response(); + } + + if is_cancel_response_ready { + let mut acs = s.lock().unwrap(); + acs.handle_cancel_response(); + } + + if is_result_response_ready { + let mut acs = s.lock().unwrap(); + acs.handle_result_response(); + } } unsafe { @@ -1878,12 +1918,13 @@ where if result == RCL_RET_OK as i32 { client.goal_response_channels.push((seq_no, goal_req_sender)); // instead of "canceled" we return invalid client. - let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| { + let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(move |r| { match r { Ok(resp) => { let (accepted, _stamp) = T::destructure_goal_response_msg(resp); if accepted { Ok(Goal { + uuid, feedback: feedback_receiver, result: result_receiver, }) @@ -1901,6 +1942,53 @@ where Err(Error::from_rcl_error(result)) } } + + pub fn send_cancel_request(&self, goal: &uuid::Uuid) -> Result>> + where + T: WrappedActionTypeSupport, + { + // upgrade to actual ref. if still alive + let client = self + .client_ + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + let msg = action_msgs::srv::CancelGoal::Request { + goal_info: action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { uuid: goal.as_bytes().to_vec() }, + .. action_msgs::msg::GoalInfo::default() + } + }; + let native_msg = WrappedNativeMsg::::from(&msg); + let mut seq_no = 0i64; + let result = + unsafe { rcl_action_send_cancel_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + + if result == RCL_RET_OK as i32 { + let (cancel_req_sender, cancel_req_receiver) = + oneshot::channel::(); + + client.cancel_response_channels.push((seq_no, cancel_req_sender)); + // instead of "canceled" we return invalid client. + let future = cancel_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| { + match r { + Ok(r) => match r.return_code { + 0 => Ok(()), + 1 => Err(Error::GoalCancelRejected), + 2 => Err(Error::GoalCancelUnknownGoalID), + 3 => Err(Error::GoalCancelAlreadyTerminated), + x => panic!("unknown error code return from action server: {}", x), + }, + Err(e) => Err(e) + } + }); + Ok(future) + } else { + eprintln!("coult not send goal request {}", result); + Err(Error::from_rcl_error(result)) + } + } } impl PublisherUntyped {