From b70fc089b1f01ffd98b2b7f60f038affa7199f43 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 16 Jun 2021 14:39:27 +0200 Subject: [PATCH] async action client + example --- examples/action_client.rs | 58 +++++----- msg_gen/src/lib.rs | 4 + src/error.rs | 4 + src/lib.rs | 221 +++++++++++++++++++++----------------- 4 files changed, 160 insertions(+), 127 deletions(-) diff --git a/examples/action_client.rs b/examples/action_client.rs index 4356c5a..156e4a8 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -1,7 +1,9 @@ +use futures::executor::LocalPool; +use futures::future; +use futures::stream::StreamExt; +use futures::task::LocalSpawnExt; use r2r; use r2r::example_interfaces::action::Fibonacci; -use std::cell::RefCell; -use std::rc::Rc; fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; @@ -15,34 +17,34 @@ fn main() -> Result<(), Box> { println!("action service available."); let goal = Fibonacci::Goal { order: 10 }; - let goal_accepted = Rc::new(RefCell::new(None)); - let cb_ga = goal_accepted.clone(); - let cb = Box::new(move |r: Fibonacci::SendGoal::Response| { - println!("got response {:?}", r); - *cb_ga.borrow_mut() = Some(r.accepted); - }); - - let feedback_cb = Box::new(move |fb: Fibonacci::Feedback| { - println!("got feedback {:?}", fb); - }); - - let result_cb = Box::new(move |r: Fibonacci::Result| { - println!("final result {:?}", r); - }); - println!("sending goal: {:?}", goal); - client.send_goal_request(goal, cb, feedback_cb, result_cb)?; + let goal_fut = client.send_goal_request(goal)?; - let mut c = 0; - loop { - node.spin_once(std::time::Duration::from_millis(1000)); - std::thread::sleep(std::time::Duration::from_millis(1000)); - c += 1; - if c > 100 { - println!("shutdown"); - break; + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + let task_spawner = spawner.clone(); + spawner.spawn_local(async move { + let goal = goal_fut.await.unwrap(); // assume success + + // process feedback stream in its own task + task_spawner + .spawn_local(goal.feedback.for_each(|msg| { + println!("new feedback msg {:?}", msg); + future::ready(()) + })) + .unwrap(); + + // await result in this task + let result = goal.result.await; + match result { + Ok(msg) => println!("got result {:?}", msg), + Err(e) => println!("action failed: {:?}", e), } - } + })?; - Ok(()) + loop { + node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); + } } diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index 276f8f2..0caaff0 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -154,6 +154,10 @@ pub fn generate_rust_action(module_: &str, prefix_: &str, name_: &str) -> String }} }} + fn destructure_goal_response_msg(msg: SendGoal::Response) -> (bool, builtin_interfaces::msg::Time) {{ + (msg.accepted, msg.stamp) + }} + fn destructure_feedback_msg(msg: FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Feedback) {{ (msg.goal_id, msg.feedback) }} diff --git a/src/error.rs b/src/error.rs index 3082221..d1273a3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -81,6 +81,10 @@ pub enum Error { InvalidMessageType { msgtype: String }, #[error("Serde error: {}", err)] SerdeError { err: String }, + + // action errors. perhaps they exist in the rcl? + #[error("Goal rejected by server.")] + GoalRejected, } impl Error { diff --git a/src/lib.rs b/src/lib.rs index 15164ab..d84d4b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ use std::fmt::Debug; use std::future::Future; use futures::future::TryFutureExt; +use futures::future::FutureExt; use futures::channel::{mpsc, oneshot}; use futures::stream::{Stream, StreamExt}; @@ -63,6 +64,7 @@ pub trait WrappedActionTypeSupport { <::SendGoal as WrappedServiceTypeSupport>::Request; fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) -> <::GetResult as WrappedServiceTypeSupport>::Request; + fn destructure_goal_response_msg(msg: <::SendGoal as WrappedServiceTypeSupport>::Response) -> (bool, builtin_interfaces::msg::Time); fn destructure_feedback_msg(msg: Self::FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); fn destructure_result_response_msg(msg: <::GetResult as WrappedServiceTypeSupport>::Response) -> (i8, Self::Result); @@ -353,7 +355,6 @@ where } fn run_cb(&mut self) -> () { - // copy native msg to rust type and run callback let request = T::Request::from_native(&self.rcl_request_msg); let response = (self.callback)(request); let mut native_response = WrappedNativeMsg::::from(&response); @@ -412,15 +413,13 @@ where if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; if let Some(idx) = self.response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { - let (_, channel) = self.response_channels.swap_remove(idx); + let (_, sender) = self.response_channels.swap_remove(idx); let response = T::Response::from_native(&response_msg); - match channel.send(response) { + match sender.send(response) { Ok(()) => {}, - Err(e) => { println!("error sending: {:?}", e); }, + Err(e) => { println!("error sending to client: {:?}", e); }, } } else { - // I don't think this should be able to occur? Let's panic so we - // find out... let we_have: String = self .response_channels .iter() @@ -488,11 +487,11 @@ where T: WrappedActionTypeSupport, { rcl_handle: rcl_action_client_t, - goal_request_callbacks: Vec<(i64, Box::SendGoal as WrappedServiceTypeSupport>::Response)>)>, - feedback_callbacks: Vec<(uuid::Uuid, Box ()>)>, - goal_statuses: Vec<(uuid::Uuid, GoalStatus)>, - result_request_callbacks: Vec<(i64, Box::GetResult as WrappedServiceTypeSupport>::Response)>)>, - result_callbacks: Vec<(uuid::Uuid, Box ()>)>, + goal_response_channels: Vec<(i64, oneshot::Sender<<::SendGoal as WrappedServiceTypeSupport>::Response>)>, + feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, + // goal_statuses: Vec<(uuid::Uuid, GoalStatus)>, + result_requests: Vec<(i64, uuid::Uuid)>, + result_senders: Vec<(uuid::Uuid, oneshot::Sender)>, } pub trait ActionClient_ { @@ -501,9 +500,10 @@ pub trait ActionClient_ { fn handle_goal_response(&mut self) -> (); fn handle_feedback_msg(&mut self) -> (); + fn handle_status_msg(&mut self) -> (); fn handle_result_response(&mut self) -> (); - fn run_result_request(&mut self, uuid: &uuid::Uuid) -> (); + fn send_result_request(&mut self, uuid: uuid::Uuid) -> (); } @@ -513,36 +513,6 @@ fn vec_to_uuid_bytes(v: Vec) -> [T; 16] { .unwrap_or_else(|v: Vec| panic!("Expected a Vec of length {} but it was {}", 16, v.len())) } - -impl WrappedActionClient -where - T: WrappedActionTypeSupport, -{ - pub fn send_result_request(&mut self, uuid: &uuid::Uuid, cb: Box ()>) -> Result<()> { - let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; - let request_msg = T::make_result_request_msg(uuid_msg); - let native_msg = WrappedNativeMsg::<<::GetResult as - WrappedServiceTypeSupport>::Request>::from(&request_msg); - let mut seq_no = 0i64; - let result = - unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; - - if result == RCL_RET_OK as i32 { - self.result_request_callbacks.push((seq_no, Box::new(move |r: <::GetResult as - WrappedServiceTypeSupport>::Response| { - let (status, result) = T::destructure_result_response_msg(r); - let status = GoalStatus::from_rcl(status); - println!("status: {:?}, result: {:?}", status, result); - (cb)(result); - }))); - Ok(()) - } else { - eprintln!("coult not send request {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - impl ActionClient_ for WrappedActionClient where T: WrappedActionTypeSupport, @@ -560,15 +530,16 @@ where }; if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.goal_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) { - let (_, cb_to_run) = self.goal_request_callbacks.swap_remove(idx); + if let Some(idx) = self.goal_response_channels.iter().position(|(id, _)| id == &request_id.sequence_number) { + let (_, sender) = self.goal_response_channels.swap_remove(idx); let response = <::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); - (cb_to_run)(response); + match sender.send(response) { + Ok(()) => {}, + Err(e) => { println!("error sending to action client: {:?}", e); }, + } } else { - // I don't think this should be able to occur? Let's panic so we - // find out... let we_have: String = self - .goal_request_callbacks + .goal_response_channels .iter() .map(|(id, _)| id.to_string()) .collect::>() @@ -590,8 +561,31 @@ where let msg = T::FeedbackMessage::from_native(&feedback_msg); let (uuid, feedback) = T::destructure_feedback_msg(msg); let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); - if let Some((_,cb)) = self.feedback_callbacks.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) { - (cb)(feedback); + if let Some((_,sender)) = self.feedback_senders.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) { + match sender.try_send(feedback) { + Err(e) => eprintln!("warning: could not send feedback msg ({})", e), + _ => (), + } + } + } + } + + fn handle_status_msg(&mut self) -> () { + let mut status_array = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) + }; + if ret == RCL_RET_OK as i32 { + let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); + // TODO: actually use this information. + for a in &arr.status_list { + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); + if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { continue; } + let status = GoalStatus::from_rcl(a.status); + if status == GoalStatus::Succeeded { + self.send_result_request(uuid); + } + // TODO, cancel futures when goal is aborted etc. } } } @@ -606,15 +600,26 @@ where if ret == RCL_RET_OK as i32 { let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self.result_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) { - let (_, cb_to_run) = self.result_request_callbacks.swap_remove(idx); - let response = <::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); - (cb_to_run)(response); + if let Some(idx) = self.result_requests.iter().position(|(id, _)| id == &request_id.sequence_number) { + let (_, uuid) = self.result_requests.swap_remove(idx); + if let Some(idx) = self.result_senders.iter().position(|(suuid, _)| suuid == &uuid) { + let (_, sender) = self.result_senders.swap_remove(idx); + let response = <::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); + let (status, result) = T::destructure_result_response_msg(response); + let status = GoalStatus::from_rcl(status); + if status != GoalStatus::Succeeded { + println!("goal status failed: {:?}, result: {:?}", status, result); + // this will drop the sender which makes the receiver fail with "canceled" + } else { + match sender.send(result) { + Ok(()) => {}, + Err(e) => { println!("error sending result to action client: {:?}", e); }, + } + } + } } else { - // I don't think this should be able to occur? Let's panic so we - // find out... let we_have: String = self - .result_request_callbacks + .result_requests .iter() .map(|(id, _)| id.to_string()) .collect::>() @@ -627,11 +632,19 @@ where } } - fn run_result_request(&mut self, uuid: &uuid::Uuid) -> () { - if let Some(idx) = self.result_callbacks.iter().position(|(cb_uuid, _)| cb_uuid == uuid) { - let (_, result_cb) = self.result_callbacks.swap_remove(idx); - println!("asking for final result for {}", uuid); - self.send_result_request(uuid, result_cb).unwrap(); // TODO error handling. + fn send_result_request(&mut self, uuid: uuid::Uuid) -> () { + let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; + let request_msg = T::make_result_request_msg(uuid_msg); + let native_msg = WrappedNativeMsg::<<::GetResult as + WrappedServiceTypeSupport>::Request>::from(&request_msg); + let mut seq_no = 0i64; + let result = + unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + + if result == RCL_RET_OK as i32 { + self.result_requests.push((seq_no, uuid)); + } else { + eprintln!("coult not send request {}", result); } } @@ -705,6 +718,15 @@ where client_: Weak>>, } +#[derive(Debug)] +pub struct Goal +where + T: WrappedActionTypeSupport, +{ + pub feedback: mpsc::Receiver, + pub result: oneshot::Receiver, +} + #[derive(Debug, Clone)] pub struct Context { context_handle: Arc>, @@ -1251,11 +1273,11 @@ impl Node { let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; let wa = WrappedActionClient:: { rcl_handle: client_handle, - goal_request_callbacks: Vec::new(), - feedback_callbacks: Vec::new(), - goal_statuses: Vec::new(), - result_request_callbacks: Vec::new(), - result_callbacks: Vec::new(), + goal_response_channels: Vec::new(), + feedback_senders: Vec::new(), + result_senders: Vec::new(), + // goal_statuses: Vec::new(), + result_requests: Vec::new(), }; let arc = Arc::new(Mutex::new(wa)); @@ -1555,25 +1577,8 @@ impl Node { } if is_status_ready { - let mut status_array = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_action_take_status(ac, status_array.void_ptr_mut()) - }; - if ret == RCL_RET_OK as i32 { - let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); - // TODO: actually use this information. - for a in &arr.status_list { - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); - let status = GoalStatus::from_rcl(a.status); - println!("goal status for {}: {:?}", uuid, status); - - if status == GoalStatus::Succeeded { - // query for the result. - let mut acs = s.lock().unwrap(); - acs.run_result_request(&uuid); - } - } - } + let mut acs = s.lock().unwrap(); + acs.handle_status_msg(); } } @@ -1768,7 +1773,6 @@ where .handle .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - // copy rust msg to native and publish it let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); let result = unsafe { rcl_publish( @@ -1821,7 +1825,6 @@ where .upgrade() .ok_or(Error::RCL_RET_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); - // copy rust msg to native and publish it let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); let mut seq_no = 0i64; let result = @@ -1844,10 +1847,7 @@ impl ActionClient where T: WrappedActionTypeSupport, { - pub fn send_goal_request(&self, goal: T::Goal, - cb: Box::SendGoal as WrappedServiceTypeSupport>::Response) -> ()>, - feedback_cb: Box ()>, - result_cb: Box ()>) -> Result<()> + pub fn send_goal_request(&self, goal: T::Goal) -> Result>>> where T: WrappedActionTypeSupport, { @@ -1857,12 +1857,9 @@ where .upgrade() .ok_or(Error::RCL_RET_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); - // copy rust msg to native and publish it + let uuid = uuid::Uuid::new_v4(); let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() }; - println!("UUID: {:?}", uuid); - client.feedback_callbacks.push((uuid, feedback_cb)); - client.result_callbacks.push((uuid, result_cb)); let request_msg = T::make_goal_request_msg(uuid_msg, goal); let native_msg = WrappedNativeMsg::<<::SendGoal as WrappedServiceTypeSupport>::Request>::from(&request_msg); @@ -1870,11 +1867,37 @@ where let result = unsafe { rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + // set up channels + let (goal_req_sender, goal_req_receiver) = + oneshot::channel::<<::SendGoal as WrappedServiceTypeSupport>::Response>(); + let (feedback_sender, feedback_receiver) = mpsc::channel::(1); + client.feedback_senders.push((uuid, feedback_sender)); + let (result_sender, result_receiver) = oneshot::channel::(); + client.result_senders.push((uuid, result_sender)); + if result == RCL_RET_OK as i32 { - client.goal_request_callbacks.push((seq_no, cb)); - Ok(()) + client.goal_response_channels.push((seq_no, goal_req_sender)); + // instead of "canceled" we return invalid client. + let future = goal_req_receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID).map(|r| { + match r { + Ok(resp) => { + let (accepted, _stamp) = T::destructure_goal_response_msg(resp); + if accepted { + Ok(Goal { + feedback: feedback_receiver, + result: result_receiver, + }) + } else { + println!("goal rejected"); + Err(Error::GoalRejected) + } + }, + Err(e) => Err(e), + } + }); + Ok(future) } else { - eprintln!("coult not send request {}", result); + eprintln!("coult not send goal request {}", result); Err(Error::from_rcl_error(result)) } }