diff --git a/Cargo.toml b/Cargo.toml index 0044608..1befd94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "r2r" -version = "0.3.5" +version = "0.3.6" authors = ["Martin Dahl "] description = "Minimal ros2 bindings." license = "Apache-2.0/MIT" diff --git a/build.rs b/build.rs index 8316eb8..a37e412 100644 --- a/build.rs +++ b/build.rs @@ -119,11 +119,13 @@ fn main() { let untyped_helper = msg_gen::generate_untyped_helper(&msg_list); let untyped_service_helper = msg_gen::generate_untyped_service_helper(&msg_list); + let untyped_action_helper = msg_gen::generate_untyped_action_helper(&msg_list); let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); let msgs_fn = out_path.join("_r2r_generated_msgs.rs"); let untyped_fn = out_path.join("_r2r_generated_untyped_helper.rs"); let untyped_service_fn = out_path.join("_r2r_generated_service_helper.rs"); + let untyped_action_fn = out_path.join("_r2r_generated_action_helper.rs"); let mut f = File::create(msgs_fn).unwrap(); write!(f, "{}", modules).unwrap(); @@ -131,4 +133,6 @@ fn main() { write!(f, "{}", untyped_helper).unwrap(); let mut f = File::create(untyped_service_fn).unwrap(); write!(f, "{}", untyped_service_helper).unwrap(); + let mut f = File::create(untyped_action_fn).unwrap(); + write!(f, "{}", untyped_action_helper).unwrap(); } diff --git a/examples/action_client.rs b/examples/action_client.rs index aaf48df..c54e800 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -33,7 +33,7 @@ fn main() -> Result<(), Box> { .send_goal_request(goal) .expect("could not send goal request") .await - .expect("did not get goal"); + .expect("goal rejected by server"); println!("goal accepted: {}", goal.uuid); // process feedback stream in its own task diff --git a/examples/action_client_untyped.rs b/examples/action_client_untyped.rs new file mode 100644 index 0000000..ccc5fe1 --- /dev/null +++ b/examples/action_client_untyped.rs @@ -0,0 +1,93 @@ +// +// This example is the same as action_client.rs but stripped of all (explicit) type information. +// +use futures::executor::LocalPool; +use futures::future::FutureExt; +use futures::stream::StreamExt; +use futures::task::LocalSpawnExt; +use r2r; +use std::sync::{Arc, Mutex}; + +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let client = + node.create_action_client_untyped("/fibonacci", "example_interfaces/action/Fibonacci")?; + let action_server_available = node.is_available(&client)?; + + // signal that we are done + let done = Arc::new(Mutex::new(false)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + let task_spawner = spawner.clone(); + let task_done = done.clone(); + spawner.spawn_local(async move { + println!("waiting for action service..."); + action_server_available + .await + .expect("could not await action server"); + println!("action service available."); + + let goal = "{ \"order\": 5 }"; // Fibonacci::Goal { order: 5 }; + println!("sending goal: {}", goal); + let (goal, result, feedback) = client + .send_goal_request(serde_json::from_str(goal).expect("json malformed")) + .expect("could not send goal request") + .await + .expect("goal rejected by server"); + + println!("goal accepted: {}", goal.uuid); + // process feedback stream in its own task + let nested_goal = goal.clone(); + let nested_task_done = task_done.clone(); + task_spawner + .spawn_local(feedback.for_each(move |msg| { + let nested_task_done = nested_task_done.clone(); + let nested_goal = nested_goal.clone(); + async move { + if let Ok(msg) = msg { + let sequence = msg + .get("sequence") + .and_then(|s| s.as_array()) + .expect("unexpected feedback msg"); + println!("new feedback msg {} -- {:?}", msg, nested_goal.get_status()); + + // 50/50 that cancel the goal before it finishes. + if sequence.len() == 4 && rand::random::() { + nested_goal + .cancel() + .unwrap() + .map(|r| { + println!("goal cancelled: {:?}", r); + // we are done. + *nested_task_done.lock().unwrap() = true; + }) + .await; + } + } + } + })) + .unwrap(); + + // await result in this task + match result.await { + Ok((status, msg)) => { + println!("got result {} with msg {:?}", status, msg); + *task_done.lock().unwrap() = true; + } + Err(e) => println!("action failed: {:?}", e), + } + })?; + + loop { + node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); + if *done.lock().unwrap() { + break; + } + } + + Ok(()) +} diff --git a/examples/action_server.rs b/examples/action_server.rs index c6aaf24..3761402 100644 --- a/examples/action_server.rs +++ b/examples/action_server.rs @@ -49,12 +49,13 @@ async fn fibonacci_server( "Got goal request with order {}, goal id: {}", req.goal.order, req.uuid ); - // reject high orders - if req.goal.order >= 100 { - req.reject().unwrap(); + // 1/4 chance that we reject the goal for testing. + if rand::random::() && rand::random::() { + println!("rejecting goal"); + req.reject().expect("could not reject goal"); continue; } - let (mut g, mut cancel) = req.accept().unwrap(); + let (mut g, mut cancel) = req.accept().expect("could not accept goal"); let goal_fut = spawner .spawn_local_with_handle(run_goal(node.clone(), g.clone())) diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index e06d2a5..5677728 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -591,3 +591,44 @@ impl UntypedServiceSupport { format!("{}{}{}", open, lines, close) } + +pub fn generate_untyped_action_helper(msgs: &Vec) -> String { + let open = String::from( + " +impl UntypedActionSupport { + pub fn new_from(typename: &str) -> Result { +", + ); + let close = String::from( + " + else + { + return Err(Error::InvalidMessageType{ msgtype: typename.into() }) + } + } +} +", + ); + + let mut lines = String::new(); + for msg in msgs { + if msg.prefix != "action" { + continue; + } + + let typename = format!("{}/{}/{}", msg.module, msg.prefix, msg.name); + let rustname = format!("{}::{}::{}::Action", msg.module, msg.prefix, msg.name); + + lines.push_str(&format!( + " + if typename == \"{typename}\" {{ + return Ok(UntypedActionSupport::new::<{rustname}>()); + }} +", + typename = typename, + rustname = rustname + )); + } + + format!("{}{}{}", open, lines, close) +} diff --git a/src/action_clients.rs b/src/action_clients.rs index 152fee0..1b88506 100644 --- a/src/action_clients.rs +++ b/src/action_clients.rs @@ -85,9 +85,8 @@ where }; // set up channels - let (goal_req_sender, goal_req_receiver) = oneshot::channel::< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >(); + let (goal_req_sender, goal_req_receiver) = + oneshot::channel::<(bool, builtin_interfaces::msg::Time)>(); let (feedback_sender, feedback_receiver) = mpsc::channel::(10); client.feedback_senders.push((uuid, feedback_sender)); let (result_sender, result_receiver) = oneshot::channel::<(GoalStatus, T::Result)>(); @@ -102,8 +101,7 @@ where let future = goal_req_receiver .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID) .map(move |r| match r { - Ok(resp) => { - let (accepted, _stamp) = T::destructure_goal_response_msg(resp); + Ok((accepted, _stamp)) => { if accepted { // on goal accept we immediately send the result request { @@ -123,7 +121,6 @@ where feedback_receiver, )) } else { - println!("goal rejected"); Err(Error::RCL_RET_ACTION_GOAL_REJECTED) } } @@ -204,12 +201,7 @@ where T: WrappedActionTypeSupport, { pub rcl_handle: rcl_action_client_t, - pub goal_response_channels: Vec<( - i64, - oneshot::Sender< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >, - )>, + pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>, pub cancel_response_channels: Vec<(i64, oneshot::Sender)>, pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, @@ -329,7 +321,8 @@ where { let (_, sender) = self.goal_response_channels.swap_remove(idx); let response = <::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); - match sender.send(response) { + let (accept, stamp) = T::destructure_goal_response_msg(response); + match sender.send((accept, stamp)) { Ok(()) => {} Err(e) => { println!("error sending to action client: {:?}", e); diff --git a/src/action_clients_untyped.rs b/src/action_clients_untyped.rs new file mode 100644 index 0000000..753049e --- /dev/null +++ b/src/action_clients_untyped.rs @@ -0,0 +1,435 @@ +use super::*; + +// +// TODO: refactor this to separate out shared code between action client and this. +// + +unsafe impl Send for ActionClientUntyped {} + +#[derive(Clone)] +pub struct ActionClientUntyped { + client: Weak>, +} + +#[derive(Clone)] +pub struct ClientGoalUntyped { + client: Weak>, + pub uuid: uuid::Uuid, +} + +impl ClientGoalUntyped { + pub fn get_status(&self) -> Result { + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + + Ok(client.get_goal_status(&self.uuid)) + } + + pub fn cancel(&self) -> Result>> { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + client.send_cancel_request(&self.uuid) + } +} + +impl ActionClientUntyped { + pub fn send_goal_request( + &self, + goal: serde_json::Value, // T::Goal + ) -> Result< + impl Future< + Output = Result<( + ClientGoalUntyped, + impl Future)>>, // T::Result + impl Stream> + Unpin, // T::Feedback + )>, + >, + > { + // upgrade to actual ref. if still alive + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + + let uuid = uuid::Uuid::new_v4(); + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; + + let native_msg = (client.action_type_support.make_goal_request_msg)(uuid_msg, goal); + + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + // set up channels + let (goal_req_sender, goal_req_receiver) = + oneshot::channel::<(bool, builtin_interfaces::msg::Time)>(); + let (feedback_sender, feedback_receiver) = mpsc::channel::>(10); + client.feedback_senders.push((uuid, feedback_sender)); + let (result_sender, result_receiver) = + oneshot::channel::<(GoalStatus, Result)>(); + client.result_senders.push((uuid, result_sender)); + + if result == RCL_RET_OK as i32 { + client + .goal_response_channels + .push((seq_no, goal_req_sender)); + // instead of "canceled" we return invalid client. + let fut_client = Weak::clone(&self.client); + let future = goal_req_receiver + .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID) + .map(move |r| match r { + Ok((accepted, _stamp)) => { + if accepted { + // on goal accept we immediately send the result request + { + let c = fut_client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut c = c.lock().unwrap(); + c.send_result_request(uuid.clone()); + } + + Ok(( + ClientGoalUntyped { + client: fut_client, + uuid, + }, + result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID), + feedback_receiver, + )) + } else { + Err(Error::RCL_RET_ACTION_GOAL_REJECTED) + } + } + Err(e) => Err(e), + }); + Ok(future) + } else { + eprintln!("coult not send goal request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +pub fn make_action_client_untyped( + client: Weak>, +) -> ActionClientUntyped { + ActionClientUntyped { client } +} + +pub struct WrappedActionClientUntyped { + pub action_type_support: UntypedActionSupport, + pub rcl_handle: rcl_action_client_t, + pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>, + pub cancel_response_channels: + Vec<(i64, oneshot::Sender)>, + pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender>)>, + pub result_requests: Vec<(i64, uuid::Uuid)>, + pub result_senders: Vec<( + uuid::Uuid, + oneshot::Sender<(GoalStatus, Result)>, + )>, + pub goal_status: HashMap, + + pub poll_available_channels: Vec>, +} + +impl WrappedActionClientUntyped { + pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus { + *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown) + } + + pub fn send_cancel_request( + &mut self, + goal: &uuid::Uuid, + ) -> Result>> { + let msg = action_msgs::srv::CancelGoal::Request { + goal_info: action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { + uuid: goal.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }, + }; + let native_msg = WrappedNativeMsg::::from(&msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + if result == RCL_RET_OK as i32 { + let (cancel_req_sender, cancel_req_receiver) = + oneshot::channel::(); + + self.cancel_response_channels + .push((seq_no, cancel_req_sender)); + // instead of "canceled" we return invalid client. + let future = cancel_req_receiver + .map_err(|_| Error::RCL_RET_CLIENT_INVALID) + .map(|r| match r { + Ok(r) => match r.return_code { + 0 => Ok(()), + 1 => Err(Error::GoalCancelRejected), + 2 => Err(Error::GoalCancelUnknownGoalID), + 3 => Err(Error::GoalCancelAlreadyTerminated), + x => panic!("unknown error code return from action server: {}", x), + }, + Err(e) => Err(e), + }); + Ok(future) + } else { + eprintln!("coult not send goal request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +impl ActionClient_ for WrappedActionClientUntyped { + fn handle(&self) -> &rcl_action_client_t { + &self.rcl_handle + } + + fn handle_goal_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = (self.action_type_support.make_goal_response_msg)(); + + let ret = unsafe { + rcl_action_take_goal_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .goal_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.goal_response_channels.swap_remove(idx); + let (accept, stamp) = + (self.action_type_support.destructure_goal_response_msg)(response_msg); + match sender.send((accept, stamp)) { + Ok(()) => {} + Err(e) => { + println!("error sending to action client: {:?}", e); + } + } + } else { + let we_have: String = self + .goal_response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn handle_cancel_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::::new(); + + let ret = unsafe { + rcl_action_take_cancel_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .cancel_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.cancel_response_channels.swap_remove(idx); + let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg); + match sender.send(response) { + Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e), + _ => (), + } + } else { + let we_have: String = self + .goal_response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn handle_feedback_msg(&mut self) -> () { + let mut feedback_msg = (self.action_type_support.make_feedback_msg)(); + let ret = + unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) }; + if ret == RCL_RET_OK as i32 { + let (uuid, feedback) = + (self.action_type_support.destructure_feedback_msg)(feedback_msg); + let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); + if let Some((_, sender)) = self + .feedback_senders + .iter_mut() + .find(|(uuid, _)| uuid == &msg_uuid) + { + match sender.try_send(feedback) { + Err(e) => eprintln!("warning: could not send feedback msg ({})", e), + _ => (), + } + } + } + } + + fn handle_status_msg(&mut self) -> () { + let mut status_array = WrappedNativeMsg::::new(); + let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) }; + if ret == RCL_RET_OK as i32 { + let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); + for a in &arr.status_list { + let uuid = + uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); + if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { + continue; + } + let status = GoalStatus::from_rcl(a.status); + *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status; + } + } + } + + fn handle_result_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = (self.action_type_support.make_result_response_msg)(); + let ret = unsafe { + rcl_action_take_result_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .result_requests + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, uuid) = self.result_requests.swap_remove(idx); + if let Some(idx) = self + .result_senders + .iter() + .position(|(suuid, _)| suuid == &uuid) + { + let (_, sender) = self.result_senders.swap_remove(idx); + let (status, result) = + (self.action_type_support.destructure_result_response_msg)(response_msg); + let status = GoalStatus::from_rcl(status); + match sender.send((status, result)) { + Ok(()) => {} + Err(e) => { + println!("error sending result to action client: {:?}", e); + } + } + } + } else { + let we_have: String = self + .result_requests + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn send_result_request(&mut self, uuid: uuid::Uuid) -> () { + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; + let native_msg = (self.action_type_support.make_result_request_msg)(uuid_msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + if result == RCL_RET_OK as i32 { + self.result_requests.push((seq_no, uuid)); + } else { + eprintln!("coult not send request {}", result); + } + } + + fn register_poll_available(&mut self, s: oneshot::Sender<()>) { + self.poll_available_channels.push(s); + } + + fn poll_available(&mut self, node: &mut rcl_node_t) { + if self.poll_available_channels.is_empty() { + return; + } + let available = action_server_available_helper(node, self.handle()); + match available { + Ok(true) => { + // send ok and close channels + while let Some(sender) = self.poll_available_channels.pop() { + let _res = sender.send(()); // we ignore if receiver dropped. + } + } + Ok(false) => { + // not available... + } + Err(_) => { + // error, close all channels + self.poll_available_channels.clear(); + } + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_action_client_fini(&mut self.rcl_handle, node); + } + } +} + +use crate::nodes::IsAvailablePollable; + +impl IsAvailablePollable for ActionClientUntyped { + fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> { + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.register_poll_available(sender); + Ok(()) + } +} diff --git a/src/action_servers.rs b/src/action_servers.rs index 6702303..8eb5f02 100644 --- a/src/action_servers.rs +++ b/src/action_servers.rs @@ -149,7 +149,7 @@ where let server = self.server.upgrade().unwrap(); // todo fixme let mut server = server.lock().unwrap(); - let response_msg = T::make_goal_response_msg(true, time); + let response_msg = T::make_goal_response_msg(false, time); let mut response_msg = WrappedNativeMsg::< <::SendGoal as WrappedServiceTypeSupport>::Response, >::from(&response_msg); @@ -278,11 +278,8 @@ where let mut responses = vec![]; self.active_cancel_requests .retain_mut(|(request_id, msg, fut)| { - println!("checking fut?"); let boxed = fut.boxed(); if let Some(results) = boxed.now_or_never() { - println!("cancel answers: {:?}", results); - let mut response_msg = msg.clone(); let requested_cancels = response_msg.goals_canceling.len(); for r in results { @@ -329,7 +326,6 @@ where // send out responses for (mut request_id, response_msg) in responses { // send out response msg. - println!("sending out cancellation msg\n{:?}", response_msg); let mut native_msg = WrappedNativeMsg::::from(&response_msg); let ret = unsafe { diff --git a/src/lib.rs b/src/lib.rs index 6e41f86..3350d93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,10 @@ mod action_clients; pub use action_clients::ActionClient; use action_clients::*; +mod action_clients_untyped; +pub use action_clients_untyped::ActionClientUntyped; +use action_clients_untyped::*; + mod action_servers; use action_servers::*; pub use action_servers::{ActionServer, CancelRequest, GoalRequest, ServerGoal}; diff --git a/src/msg_types.rs b/src/msg_types.rs index 85e29d6..bcc0829 100644 --- a/src/msg_types.rs +++ b/src/msg_types.rs @@ -18,6 +18,7 @@ pub mod generated_msgs { env!("OUT_DIR"), "/_r2r_generated_service_helper.rs" )); + include!(concat!(env!("OUT_DIR"), "/_r2r_generated_action_helper.rs")); } use generated_msgs::{builtin_interfaces, unique_identifier_msgs}; @@ -138,6 +139,135 @@ impl UntypedServiceSupport { } } +// For now only the client side is implemented. +pub struct UntypedActionSupport { + pub(crate) ts: &'static rosidl_action_type_support_t, + + pub(crate) make_goal_request_msg: Box< + dyn Fn(unique_identifier_msgs::msg::UUID, serde_json::Value) -> WrappedNativeMsgUntyped, + >, + pub(crate) make_goal_response_msg: Box WrappedNativeMsgUntyped>, + pub(crate) destructure_goal_response_msg: + Box (bool, builtin_interfaces::msg::Time)>, + + pub(crate) make_feedback_msg: Box WrappedNativeMsgUntyped>, + pub(crate) destructure_feedback_msg: Box< + dyn Fn( + WrappedNativeMsgUntyped, + ) -> (unique_identifier_msgs::msg::UUID, Result), + >, + + pub(crate) make_result_request_msg: + Box WrappedNativeMsgUntyped>, + pub(crate) make_result_response_msg: Box WrappedNativeMsgUntyped>, + pub(crate) destructure_result_response_msg: + Box (i8, Result)>, +} + +impl UntypedActionSupport { + fn new() -> Self + where + T: WrappedActionTypeSupport, + { + // TODO: this is terrible. These closures perform json (de)serialization just to move the data. + // FIX. + + let make_goal_request_msg = Box::new(|goal_id, goal| { + let goal_msg: T::Goal = + serde_json::from_value(goal).expect("TODO: move this error handling"); + let request_msg = T::make_goal_request_msg(goal_id, goal_msg); + let json = + serde_json::to_value(request_msg.clone()).expect("TODO: move this error handling"); + let mut native_untyped = WrappedNativeMsgUntyped::new::< + <::SendGoal as WrappedServiceTypeSupport>::Request, + >(); + native_untyped + .from_json(json) + .expect("TODO: move this error handling"); + native_untyped + }); + + let make_goal_response_msg = Box::new(|| { + WrappedNativeMsgUntyped::new::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >() + }); + + let destructure_goal_response_msg = Box::new(|msg: WrappedNativeMsgUntyped| { + let msg = unsafe { + <<::SendGoal as WrappedServiceTypeSupport>::Response> + ::from_native(&*(msg.msg as *const <<::SendGoal as + WrappedServiceTypeSupport>::Response as WrappedTypesupport>::CStruct)) + }; + T::destructure_goal_response_msg(msg) + }); + + let make_feedback_msg = Box::new(|| WrappedNativeMsgUntyped::new::()); + + let destructure_feedback_msg = Box::new(|msg: WrappedNativeMsgUntyped| { + let msg = unsafe { + T::FeedbackMessage::from_native( + &*(msg.msg as *const ::CStruct), + ) + }; + let (uuid, feedback) = T::destructure_feedback_msg(msg); + let json = serde_json::to_value(feedback).map_err(|serde_err| Error::SerdeError { + err: serde_err.to_string(), + }); + (uuid, json) + }); + + let make_result_request_msg = Box::new(|uuid_msg: unique_identifier_msgs::msg::UUID| { + let request_msg = T::make_result_request_msg(uuid_msg); + let json = + serde_json::to_value(request_msg.clone()).expect("TODO: move this error handling"); + + let mut native_untyped = WrappedNativeMsgUntyped::new::< + <::GetResult as WrappedServiceTypeSupport>::Request, + >(); + + native_untyped + .from_json(json) + .expect("TODO: move this error handling"); + native_untyped + }); + + let make_result_response_msg = Box::new(|| { + WrappedNativeMsgUntyped::new::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >() + }); + + let destructure_result_response_msg = Box::new(|msg: WrappedNativeMsgUntyped| { + let msg = unsafe { + <<::GetResult as WrappedServiceTypeSupport>::Response> + ::from_native(&*(msg.msg as *const <<::GetResult as + WrappedServiceTypeSupport>::Response as WrappedTypesupport>::CStruct)) + }; + let (status, result) = T::destructure_result_response_msg(msg); + let json = serde_json::to_value(result).map_err(|serde_err| Error::SerdeError { + err: serde_err.to_string(), + }); + (status, json) + }); + + UntypedActionSupport { + ts: T::get_ts(), + make_goal_request_msg, + make_goal_response_msg, + destructure_goal_response_msg, + make_feedback_msg, + destructure_feedback_msg, + make_result_request_msg, + make_result_response_msg, + destructure_result_response_msg, + // destructure_goal_response_msg, + // make_request_msg, + // make_response_msg, + } + } +} + impl WrappedNativeMsgUntyped { fn new() -> Self where @@ -521,4 +651,30 @@ mod tests { let fb2 = Fibonacci::Feedback::from_native(&fb); assert_eq!(fb1, fb2); } + + #[cfg(r2r__example_interfaces__srv__AddTwoInts)] + #[test] + fn test_untyped_service_support() { + let ts = UntypedServiceSupport::new_from("example_interfaces/srv/AddTwoInts").unwrap(); + let msg = (ts.make_request_msg)(); + let json = msg.to_json(); + // the message should contain something (default msg) + assert!(!json.unwrap().to_string().is_empty()); + } + + #[cfg(r2r__example_interfaces__action__Fibonacci)] + #[test] + fn test_untyped_action_support() { + use example_interfaces::action::Fibonacci; + + let ts = UntypedActionSupport::new_from("example_interfaces/action/Fibonacci").unwrap(); + let uuid = unique_identifier_msgs::msg::UUID::default(); + let goal = Fibonacci::Goal { order: 5 }; + let json_goal = serde_json::to_value(&goal).unwrap(); + let json_request = (ts.make_goal_request_msg)(uuid, json_goal) + .to_json() + .unwrap(); + // the message should contain something (default msg) + assert!(!json_request.to_string().is_empty()); + } } diff --git a/src/nodes.rs b/src/nodes.rs index 409df25..06cc4a8 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -306,6 +306,36 @@ impl Node { Ok(c) } + /// Create an action client without having the concrete rust type. + pub fn create_action_client_untyped( + &mut self, + action_name: &str, + action_type: &str, + ) -> Result { + let action_type_support = UntypedActionSupport::new_from(action_type)?; + let client_handle = create_action_client_helper( + self.node_handle.as_mut(), + action_name, + action_type_support.ts, + )?; + let client = WrappedActionClientUntyped { + action_type_support, + rcl_handle: client_handle, + goal_response_channels: Vec::new(), + cancel_response_channels: Vec::new(), + feedback_senders: Vec::new(), + result_senders: Vec::new(), + result_requests: Vec::new(), + goal_status: HashMap::new(), + poll_available_channels: Vec::new(), + }; + + let client_arc = Arc::new(Mutex::new(client)); + self.action_clients.push(client_arc.clone()); + let c = make_action_client_untyped(Arc::downgrade(&client_arc)); + Ok(c) + } + pub fn create_action_server( &mut self, action_name: &str,