Untyped action clients

This commit is contained in:
Martin Dahl 2021-08-18 14:58:49 +02:00
parent 7853cb7161
commit c9585f7a28
12 changed files with 777 additions and 24 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "r2r"
version = "0.3.5"
version = "0.3.6"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Minimal ros2 bindings."
license = "Apache-2.0/MIT"

View File

@ -119,11 +119,13 @@ fn main() {
let untyped_helper = msg_gen::generate_untyped_helper(&msg_list);
let untyped_service_helper = msg_gen::generate_untyped_service_helper(&msg_list);
let untyped_action_helper = msg_gen::generate_untyped_action_helper(&msg_list);
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
let msgs_fn = out_path.join("_r2r_generated_msgs.rs");
let untyped_fn = out_path.join("_r2r_generated_untyped_helper.rs");
let untyped_service_fn = out_path.join("_r2r_generated_service_helper.rs");
let untyped_action_fn = out_path.join("_r2r_generated_action_helper.rs");
let mut f = File::create(msgs_fn).unwrap();
write!(f, "{}", modules).unwrap();
@ -131,4 +133,6 @@ fn main() {
write!(f, "{}", untyped_helper).unwrap();
let mut f = File::create(untyped_service_fn).unwrap();
write!(f, "{}", untyped_service_helper).unwrap();
let mut f = File::create(untyped_action_fn).unwrap();
write!(f, "{}", untyped_action_helper).unwrap();
}

View File

@ -33,7 +33,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.send_goal_request(goal)
.expect("could not send goal request")
.await
.expect("did not get goal");
.expect("goal rejected by server");
println!("goal accepted: {}", goal.uuid);
// process feedback stream in its own task

View File

@ -0,0 +1,93 @@
//
// This example is the same as action_client.rs but stripped of all (explicit) type information.
//
use futures::executor::LocalPool;
use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use r2r;
use std::sync::{Arc, Mutex};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client =
node.create_action_client_untyped("/fibonacci", "example_interfaces/action/Fibonacci")?;
let action_server_available = node.is_available(&client)?;
// signal that we are done
let done = Arc::new(Mutex::new(false));
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let task_spawner = spawner.clone();
let task_done = done.clone();
spawner.spawn_local(async move {
println!("waiting for action service...");
action_server_available
.await
.expect("could not await action server");
println!("action service available.");
let goal = "{ \"order\": 5 }"; // Fibonacci::Goal { order: 5 };
println!("sending goal: {}", goal);
let (goal, result, feedback) = client
.send_goal_request(serde_json::from_str(goal).expect("json malformed"))
.expect("could not send goal request")
.await
.expect("goal rejected by server");
println!("goal accepted: {}", goal.uuid);
// process feedback stream in its own task
let nested_goal = goal.clone();
let nested_task_done = task_done.clone();
task_spawner
.spawn_local(feedback.for_each(move |msg| {
let nested_task_done = nested_task_done.clone();
let nested_goal = nested_goal.clone();
async move {
if let Ok(msg) = msg {
let sequence = msg
.get("sequence")
.and_then(|s| s.as_array())
.expect("unexpected feedback msg");
println!("new feedback msg {} -- {:?}", msg, nested_goal.get_status());
// 50/50 that cancel the goal before it finishes.
if sequence.len() == 4 && rand::random::<bool>() {
nested_goal
.cancel()
.unwrap()
.map(|r| {
println!("goal cancelled: {:?}", r);
// we are done.
*nested_task_done.lock().unwrap() = true;
})
.await;
}
}
}
}))
.unwrap();
// await result in this task
match result.await {
Ok((status, msg)) => {
println!("got result {} with msg {:?}", status, msg);
*task_done.lock().unwrap() = true;
}
Err(e) => println!("action failed: {:?}", e),
}
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
if *done.lock().unwrap() {
break;
}
}
Ok(())
}

View File

@ -49,12 +49,13 @@ async fn fibonacci_server(
"Got goal request with order {}, goal id: {}",
req.goal.order, req.uuid
);
// reject high orders
if req.goal.order >= 100 {
req.reject().unwrap();
// 1/4 chance that we reject the goal for testing.
if rand::random::<bool>() && rand::random::<bool>() {
println!("rejecting goal");
req.reject().expect("could not reject goal");
continue;
}
let (mut g, mut cancel) = req.accept().unwrap();
let (mut g, mut cancel) = req.accept().expect("could not accept goal");
let goal_fut = spawner
.spawn_local_with_handle(run_goal(node.clone(), g.clone()))

View File

@ -591,3 +591,44 @@ impl UntypedServiceSupport {
format!("{}{}{}", open, lines, close)
}
pub fn generate_untyped_action_helper(msgs: &Vec<common::RosMsg>) -> String {
let open = String::from(
"
impl UntypedActionSupport {
pub fn new_from(typename: &str) -> Result<Self> {
",
);
let close = String::from(
"
else
{
return Err(Error::InvalidMessageType{ msgtype: typename.into() })
}
}
}
",
);
let mut lines = String::new();
for msg in msgs {
if msg.prefix != "action" {
continue;
}
let typename = format!("{}/{}/{}", msg.module, msg.prefix, msg.name);
let rustname = format!("{}::{}::{}::Action", msg.module, msg.prefix, msg.name);
lines.push_str(&format!(
"
if typename == \"{typename}\" {{
return Ok(UntypedActionSupport::new::<{rustname}>());
}}
",
typename = typename,
rustname = rustname
));
}
format!("{}{}{}", open, lines, close)
}

View File

@ -85,9 +85,8 @@ where
};
// set up channels
let (goal_req_sender, goal_req_receiver) = oneshot::channel::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>();
let (goal_req_sender, goal_req_receiver) =
oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(10);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) = oneshot::channel::<(GoalStatus, T::Result)>();
@ -102,8 +101,7 @@ where
let future = goal_req_receiver
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
.map(move |r| match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
Ok((accepted, _stamp)) => {
if accepted {
// on goal accept we immediately send the result request
{
@ -123,7 +121,6 @@ where
feedback_receiver,
))
} else {
println!("goal rejected");
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
}
}
@ -204,12 +201,7 @@ where
T: WrappedActionTypeSupport,
{
pub rcl_handle: rcl_action_client_t,
pub goal_response_channels: Vec<(
i64,
oneshot::Sender<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>,
)>,
pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
pub cancel_response_channels:
Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
@ -329,7 +321,8 @@ where
{
let (_, sender) = self.goal_response_channels.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
match sender.send(response) {
let (accept, stamp) = T::destructure_goal_response_msg(response);
match sender.send((accept, stamp)) {
Ok(()) => {}
Err(e) => {
println!("error sending to action client: {:?}", e);

View File

@ -0,0 +1,435 @@
use super::*;
//
// TODO: refactor this to separate out shared code between action client and this.
//
unsafe impl Send for ActionClientUntyped {}
#[derive(Clone)]
pub struct ActionClientUntyped {
client: Weak<Mutex<WrappedActionClientUntyped>>,
}
#[derive(Clone)]
pub struct ClientGoalUntyped {
client: Weak<Mutex<WrappedActionClientUntyped>>,
pub uuid: uuid::Uuid,
}
impl ClientGoalUntyped {
pub fn get_status(&self) -> Result<GoalStatus> {
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let client = client.lock().unwrap();
Ok(client.get_goal_status(&self.uuid))
}
pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_cancel_request(&self.uuid)
}
}
impl ActionClientUntyped {
pub fn send_goal_request(
&self,
goal: serde_json::Value, // T::Goal
) -> Result<
impl Future<
Output = Result<(
ClientGoalUntyped,
impl Future<Output = Result<(GoalStatus, Result<serde_json::Value>)>>, // T::Result
impl Stream<Item = Result<serde_json::Value>> + Unpin, // T::Feedback
)>,
>,
> {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
let uuid = uuid::Uuid::new_v4();
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
};
let native_msg = (client.action_type_support.make_goal_request_msg)(uuid_msg, goal);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
// set up channels
let (goal_req_sender, goal_req_receiver) =
oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) =
oneshot::channel::<(GoalStatus, Result<serde_json::Value>)>();
client.result_senders.push((uuid, result_sender));
if result == RCL_RET_OK as i32 {
client
.goal_response_channels
.push((seq_no, goal_req_sender));
// instead of "canceled" we return invalid client.
let fut_client = Weak::clone(&self.client);
let future = goal_req_receiver
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
.map(move |r| match r {
Ok((accepted, _stamp)) => {
if accepted {
// on goal accept we immediately send the result request
{
let c = fut_client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut c = c.lock().unwrap();
c.send_result_request(uuid.clone());
}
Ok((
ClientGoalUntyped {
client: fut_client,
uuid,
},
result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
feedback_receiver,
))
} else {
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
}
}
Err(e) => Err(e),
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
pub fn make_action_client_untyped(
client: Weak<Mutex<WrappedActionClientUntyped>>,
) -> ActionClientUntyped {
ActionClientUntyped { client }
}
pub struct WrappedActionClientUntyped {
pub action_type_support: UntypedActionSupport,
pub rcl_handle: rcl_action_client_t,
pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
pub cancel_response_channels:
Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<Result<serde_json::Value>>)>,
pub result_requests: Vec<(i64, uuid::Uuid)>,
pub result_senders: Vec<(
uuid::Uuid,
oneshot::Sender<(GoalStatus, Result<serde_json::Value>)>,
)>,
pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
pub poll_available_channels: Vec<oneshot::Sender<()>>,
}
impl WrappedActionClientUntyped {
pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
*self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
}
pub fn send_cancel_request(
&mut self,
goal: &uuid::Uuid,
) -> Result<impl Future<Output = Result<()>>> {
let msg = action_msgs::srv::CancelGoal::Request {
goal_info: action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: goal.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
},
};
let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
if result == RCL_RET_OK as i32 {
let (cancel_req_sender, cancel_req_receiver) =
oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
self.cancel_response_channels
.push((seq_no, cancel_req_sender));
// instead of "canceled" we return invalid client.
let future = cancel_req_receiver
.map_err(|_| Error::RCL_RET_CLIENT_INVALID)
.map(|r| match r {
Ok(r) => match r.return_code {
0 => Ok(()),
1 => Err(Error::GoalCancelRejected),
2 => Err(Error::GoalCancelUnknownGoalID),
3 => Err(Error::GoalCancelAlreadyTerminated),
x => panic!("unknown error code return from action server: {}", x),
},
Err(e) => Err(e),
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl ActionClient_ for WrappedActionClientUntyped {
fn handle(&self) -> &rcl_action_client_t {
&self.rcl_handle
}
fn handle_goal_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = (self.action_type_support.make_goal_response_msg)();
let ret = unsafe {
rcl_action_take_goal_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.goal_response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.goal_response_channels.swap_remove(idx);
let (accept, stamp) =
(self.action_type_support.destructure_goal_response_msg)(response_msg);
match sender.send((accept, stamp)) {
Ok(()) => {}
Err(e) => {
println!("error sending to action client: {:?}", e);
}
}
} else {
let we_have: String = self
.goal_response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_cancel_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
let ret = unsafe {
rcl_action_take_cancel_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.cancel_response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.cancel_response_channels.swap_remove(idx);
let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
match sender.send(response) {
Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e),
_ => (),
}
} else {
let we_have: String = self
.goal_response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_feedback_msg(&mut self) -> () {
let mut feedback_msg = (self.action_type_support.make_feedback_msg)();
let ret =
unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
if ret == RCL_RET_OK as i32 {
let (uuid, feedback) =
(self.action_type_support.destructure_feedback_msg)(feedback_msg);
let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid));
if let Some((_, sender)) = self
.feedback_senders
.iter_mut()
.find(|(uuid, _)| uuid == &msg_uuid)
{
match sender.try_send(feedback) {
Err(e) => eprintln!("warning: could not send feedback msg ({})", e),
_ => (),
}
}
}
}
fn handle_status_msg(&mut self) -> () {
let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
if ret == RCL_RET_OK as i32 {
let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
for a in &arr.status_list {
let uuid =
uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone()));
if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
continue;
}
let status = GoalStatus::from_rcl(a.status);
*self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
}
}
}
fn handle_result_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = (self.action_type_support.make_result_response_msg)();
let ret = unsafe {
rcl_action_take_result_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.result_requests
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, uuid) = self.result_requests.swap_remove(idx);
if let Some(idx) = self
.result_senders
.iter()
.position(|(suuid, _)| suuid == &uuid)
{
let (_, sender) = self.result_senders.swap_remove(idx);
let (status, result) =
(self.action_type_support.destructure_result_response_msg)(response_msg);
let status = GoalStatus::from_rcl(status);
match sender.send((status, result)) {
Ok(()) => {}
Err(e) => {
println!("error sending result to action client: {:?}", e);
}
}
}
} else {
let we_have: String = self
.result_requests
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn send_result_request(&mut self, uuid: uuid::Uuid) -> () {
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
};
let native_msg = (self.action_type_support.make_result_request_msg)(uuid_msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
if result == RCL_RET_OK as i32 {
self.result_requests.push((seq_no, uuid));
} else {
eprintln!("coult not send request {}", result);
}
}
fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
self.poll_available_channels.push(s);
}
fn poll_available(&mut self, node: &mut rcl_node_t) {
if self.poll_available_channels.is_empty() {
return;
}
let available = action_server_available_helper(node, self.handle());
match available {
Ok(true) => {
// send ok and close channels
while let Some(sender) = self.poll_available_channels.pop() {
let _res = sender.send(()); // we ignore if receiver dropped.
}
}
Ok(false) => {
// not available...
}
Err(_) => {
// error, close all channels
self.poll_available_channels.clear();
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_action_client_fini(&mut self.rcl_handle, node);
}
}
}
use crate::nodes::IsAvailablePollable;
impl IsAvailablePollable for ActionClientUntyped {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.register_poll_available(sender);
Ok(())
}
}

View File

@ -149,7 +149,7 @@ where
let server = self.server.upgrade().unwrap(); // todo fixme
let mut server = server.lock().unwrap();
let response_msg = T::make_goal_response_msg(true, time);
let response_msg = T::make_goal_response_msg(false, time);
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::from(&response_msg);
@ -278,11 +278,8 @@ where
let mut responses = vec![];
self.active_cancel_requests
.retain_mut(|(request_id, msg, fut)| {
println!("checking fut?");
let boxed = fut.boxed();
if let Some(results) = boxed.now_or_never() {
println!("cancel answers: {:?}", results);
let mut response_msg = msg.clone();
let requested_cancels = response_msg.goals_canceling.len();
for r in results {
@ -329,7 +326,6 @@ where
// send out responses
for (mut request_id, response_msg) in responses {
// send out response msg.
println!("sending out cancellation msg\n{:?}", response_msg);
let mut native_msg =
WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::from(&response_msg);
let ret = unsafe {

View File

@ -53,6 +53,10 @@ mod action_clients;
pub use action_clients::ActionClient;
use action_clients::*;
mod action_clients_untyped;
pub use action_clients_untyped::ActionClientUntyped;
use action_clients_untyped::*;
mod action_servers;
use action_servers::*;
pub use action_servers::{ActionServer, CancelRequest, GoalRequest, ServerGoal};

View File

@ -18,6 +18,7 @@ pub mod generated_msgs {
env!("OUT_DIR"),
"/_r2r_generated_service_helper.rs"
));
include!(concat!(env!("OUT_DIR"), "/_r2r_generated_action_helper.rs"));
}
use generated_msgs::{builtin_interfaces, unique_identifier_msgs};
@ -138,6 +139,135 @@ impl UntypedServiceSupport {
}
}
// For now only the client side is implemented.
pub struct UntypedActionSupport {
pub(crate) ts: &'static rosidl_action_type_support_t,
pub(crate) make_goal_request_msg: Box<
dyn Fn(unique_identifier_msgs::msg::UUID, serde_json::Value) -> WrappedNativeMsgUntyped,
>,
pub(crate) make_goal_response_msg: Box<dyn Fn() -> WrappedNativeMsgUntyped>,
pub(crate) destructure_goal_response_msg:
Box<dyn Fn(WrappedNativeMsgUntyped) -> (bool, builtin_interfaces::msg::Time)>,
pub(crate) make_feedback_msg: Box<dyn Fn() -> WrappedNativeMsgUntyped>,
pub(crate) destructure_feedback_msg: Box<
dyn Fn(
WrappedNativeMsgUntyped,
) -> (unique_identifier_msgs::msg::UUID, Result<serde_json::Value>),
>,
pub(crate) make_result_request_msg:
Box<dyn Fn(unique_identifier_msgs::msg::UUID) -> WrappedNativeMsgUntyped>,
pub(crate) make_result_response_msg: Box<dyn Fn() -> WrappedNativeMsgUntyped>,
pub(crate) destructure_result_response_msg:
Box<dyn Fn(WrappedNativeMsgUntyped) -> (i8, Result<serde_json::Value>)>,
}
impl UntypedActionSupport {
fn new<T>() -> Self
where
T: WrappedActionTypeSupport,
{
// TODO: this is terrible. These closures perform json (de)serialization just to move the data.
// FIX.
let make_goal_request_msg = Box::new(|goal_id, goal| {
let goal_msg: T::Goal =
serde_json::from_value(goal).expect("TODO: move this error handling");
let request_msg = T::make_goal_request_msg(goal_id, goal_msg);
let json =
serde_json::to_value(request_msg.clone()).expect("TODO: move this error handling");
let mut native_untyped = WrappedNativeMsgUntyped::new::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
>();
native_untyped
.from_json(json)
.expect("TODO: move this error handling");
native_untyped
});
let make_goal_response_msg = Box::new(|| {
WrappedNativeMsgUntyped::new::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>()
});
let destructure_goal_response_msg = Box::new(|msg: WrappedNativeMsgUntyped| {
let msg = unsafe {
<<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response>
::from_native(&*(msg.msg as *const <<<T as WrappedActionTypeSupport>::SendGoal as
WrappedServiceTypeSupport>::Response as WrappedTypesupport>::CStruct))
};
T::destructure_goal_response_msg(msg)
});
let make_feedback_msg = Box::new(|| WrappedNativeMsgUntyped::new::<T::FeedbackMessage>());
let destructure_feedback_msg = Box::new(|msg: WrappedNativeMsgUntyped| {
let msg = unsafe {
T::FeedbackMessage::from_native(
&*(msg.msg as *const <T::FeedbackMessage as WrappedTypesupport>::CStruct),
)
};
let (uuid, feedback) = T::destructure_feedback_msg(msg);
let json = serde_json::to_value(feedback).map_err(|serde_err| Error::SerdeError {
err: serde_err.to_string(),
});
(uuid, json)
});
let make_result_request_msg = Box::new(|uuid_msg: unique_identifier_msgs::msg::UUID| {
let request_msg = T::make_result_request_msg(uuid_msg);
let json =
serde_json::to_value(request_msg.clone()).expect("TODO: move this error handling");
let mut native_untyped = WrappedNativeMsgUntyped::new::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
>();
native_untyped
.from_json(json)
.expect("TODO: move this error handling");
native_untyped
});
let make_result_response_msg = Box::new(|| {
WrappedNativeMsgUntyped::new::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>()
});
let destructure_result_response_msg = Box::new(|msg: WrappedNativeMsgUntyped| {
let msg = unsafe {
<<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response>
::from_native(&*(msg.msg as *const <<<T as WrappedActionTypeSupport>::GetResult as
WrappedServiceTypeSupport>::Response as WrappedTypesupport>::CStruct))
};
let (status, result) = T::destructure_result_response_msg(msg);
let json = serde_json::to_value(result).map_err(|serde_err| Error::SerdeError {
err: serde_err.to_string(),
});
(status, json)
});
UntypedActionSupport {
ts: T::get_ts(),
make_goal_request_msg,
make_goal_response_msg,
destructure_goal_response_msg,
make_feedback_msg,
destructure_feedback_msg,
make_result_request_msg,
make_result_response_msg,
destructure_result_response_msg,
// destructure_goal_response_msg,
// make_request_msg,
// make_response_msg,
}
}
}
impl WrappedNativeMsgUntyped {
fn new<T>() -> Self
where
@ -521,4 +651,30 @@ mod tests {
let fb2 = Fibonacci::Feedback::from_native(&fb);
assert_eq!(fb1, fb2);
}
#[cfg(r2r__example_interfaces__srv__AddTwoInts)]
#[test]
fn test_untyped_service_support() {
let ts = UntypedServiceSupport::new_from("example_interfaces/srv/AddTwoInts").unwrap();
let msg = (ts.make_request_msg)();
let json = msg.to_json();
// the message should contain something (default msg)
assert!(!json.unwrap().to_string().is_empty());
}
#[cfg(r2r__example_interfaces__action__Fibonacci)]
#[test]
fn test_untyped_action_support() {
use example_interfaces::action::Fibonacci;
let ts = UntypedActionSupport::new_from("example_interfaces/action/Fibonacci").unwrap();
let uuid = unique_identifier_msgs::msg::UUID::default();
let goal = Fibonacci::Goal { order: 5 };
let json_goal = serde_json::to_value(&goal).unwrap();
let json_request = (ts.make_goal_request_msg)(uuid, json_goal)
.to_json()
.unwrap();
// the message should contain something (default msg)
assert!(!json_request.to_string().is_empty());
}
}

View File

@ -306,6 +306,36 @@ impl Node {
Ok(c)
}
/// Create an action client without having the concrete rust type.
pub fn create_action_client_untyped(
&mut self,
action_name: &str,
action_type: &str,
) -> Result<ActionClientUntyped> {
let action_type_support = UntypedActionSupport::new_from(action_type)?;
let client_handle = create_action_client_helper(
self.node_handle.as_mut(),
action_name,
action_type_support.ts,
)?;
let client = WrappedActionClientUntyped {
action_type_support,
rcl_handle: client_handle,
goal_response_channels: Vec::new(),
cancel_response_channels: Vec::new(),
feedback_senders: Vec::new(),
result_senders: Vec::new(),
result_requests: Vec::new(),
goal_status: HashMap::new(),
poll_available_channels: Vec::new(),
};
let client_arc = Arc::new(Mutex::new(client));
self.action_clients.push(client_arc.clone());
let c = make_action_client_untyped(Arc::downgrade(&client_arc));
Ok(c)
}
pub fn create_action_server<T: 'static>(
&mut self,
action_name: &str,