minimal action clients. no handling or failed goals or canceling.

This commit is contained in:
Martin Dahl 2021-06-09 14:23:26 +02:00
parent afb62321e5
commit c3be77224e
10 changed files with 717 additions and 27 deletions

View File

@ -15,6 +15,8 @@ lazy_static = "1.4.0"
common = { path = "common", version = "0.0.3" } common = { path = "common", version = "0.0.3" }
rcl = { path = "rcl", version = "0.0.3" } rcl = { path = "rcl", version = "0.0.3" }
msg_gen = { path = "msg_gen", version = "0.0.3" } msg_gen = { path = "msg_gen", version = "0.0.3" }
actions = { path = "actions", version = "0.0.1" }
uuid = { version = "0.8", features = ["serde", "v4"] }
[dev-dependencies] [dev-dependencies]
serde_json = "1.0.62" serde_json = "1.0.62"

16
actions/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "actions"
version = "0.0.1"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rcl = { path = "../rcl", version = "0.0.3" }
msg_gen = { path = "../msg_gen", version = "0.0.3" }
[build-dependencies]
bindgen = "0.57.0"
itertools = "0.10.0"
common = { path = "../common", version = "0.0.3" }

82
actions/build.rs Normal file
View File

@ -0,0 +1,82 @@
use bindgen;
use itertools::Itertools;
use std::env;
use std::path::{Path, PathBuf};
fn main() {
let mut builder = bindgen::Builder::default()
.header("src/action_wrapper.h")
.derive_copy(false)
.size_t_is_usize(true)
.default_enum_style(bindgen::EnumVariation::Rust {
non_exhaustive: false,
});
if let Some(cmake_includes) = env::var("CMAKE_INCLUDE_DIRS").ok() {
// we are running from cmake, do special thing.
let mut includes = cmake_includes.split(":").collect::<Vec<_>>();
includes.sort();
includes.dedup();
for x in &includes {
let clang_arg = format!("-I{}", x);
println!("adding clang arg: {}", clang_arg);
builder = builder.clang_arg(clang_arg);
}
env::var("CMAKE_LIBRARIES")
.unwrap_or(String::new())
.split(":")
.into_iter()
.filter(|s| s.contains(".so") || s.contains(".dylib"))
.flat_map(|l| Path::new(l).parent().and_then(|p| p.to_str()))
.unique()
.for_each(|pp| {
println!("cargo:rustc-link-search=native={}", pp)
// we could potentially do the below instead of hardcoding which libs we rely on.
// let filename = path.file_stem().and_then(|f| f.to_str()).unwrap();
// let without_lib = filename.strip_prefix("lib").unwrap();
// println!("cargo:rustc-link-lib=dylib={}", without_lib);
});
} else {
let ament_prefix_var_name = "AMENT_PREFIX_PATH";
let ament_prefix_var = env::var(ament_prefix_var_name).expect("Source your ROS!");
for ament_prefix_path in ament_prefix_var.split(":") {
builder = builder.clang_arg(format!("-I{}/include", ament_prefix_path));
println!(
"added include search dir: {}",
format!("-I{}/include", ament_prefix_path)
);
println!("cargo:rustc-link-search=native={}/lib", ament_prefix_path);
}
}
println!("cargo:rustc-link-lib=dylib=rcl_action");
let bindings = builder
.no_debug("_OSUnaligned.*")
// whitelist a few specific things that we need.
.whitelist_recursively(false)
.whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t")
.whitelist_type("rcl_action_server_t").opaque_type("rcl_action_server_t")
.whitelist_type("rcl_action_goal_info_t").opaque_type("rcl_action_goal_info_t")
.whitelist_type("rcl_action_goal_handle_t").opaque_type("rcl_action_goal_handle_t")
.whitelist_type("rcl_action_cancel_request_t").opaque_type("rcl_action_cancel_request_t")
.whitelist_type("rcl_action_cancel_response_t").opaque_type("rcl_action_cancel_response_t")
.whitelist_type("rcl_action_goal_info_t").opaque_type("rcl_action_goal_info_t")
.whitelist_type("rcl_action_goal_event_t").opaque_type("rcl_action_goal_event_t")
.whitelist_type("rcl_action_goal_state_t").opaque_type("rcl_action_goal_state_t")
.whitelist_type("rcl_action_goal_status_array_t").opaque_type("rcl_action_goal_status_array_t")
.whitelist_function("rcl_action_.*")
// .whitelist_type("rosidl_action_type_support_t").opaque_type("rosidl_action_type_support_t")
.whitelist_type("rcl_action_client_options_t")
.whitelist_type("rcl_action_server_options_t")
.generate()
.expect("Unable to generate bindings");
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("action_bindings.rs"))
.expect("Couldn't write bindings!");
}

View File

@ -0,0 +1 @@
#include <rcl_action/rcl_action.h>

9
actions/src/lib.rs Normal file
View File

@ -0,0 +1,9 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(improper_ctypes)]
#![allow(dead_code)]
include!(concat!(env!("OUT_DIR"), "/action_bindings.rs"));
use rcl::*;
use msg_gen::*;

View File

@ -57,6 +57,27 @@ fn main() {
codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &msgname)); codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &msgname));
println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg);
} }
// "internal" services that implements the action type
for srv in &["SendGoal", "GetResult"] {
codegen.push_str("#[allow(non_snake_case)]\n");
codegen.push_str(&format!(" pub mod {} {{\n", srv));
codegen.push_str(" use super::super::super::super::*;\n");
let srvname = format!("{}_{}", msg, srv);
codegen.push_str(&msg_gen::generate_rust_service(module, prefix, &srvname));
for s in &["Request", "Response"] {
let msgname = format!("{}_{}_{}", msg, srv, s);
codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &msgname));
}
codegen.push_str(" }\n");
}
// also "internal" feedback message type that wraps the feedback type with a uuid
let feedback_msgname = format!("{}_FeedbackMessage", msg);
codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &feedback_msgname));
codegen.push_str(" }\n"); codegen.push_str(" }\n");
} }
} else if prefix == &"srv" { } else if prefix == &"srv" {

48
examples/action_client.rs Normal file
View File

@ -0,0 +1,48 @@
use r2r;
use r2r::example_interfaces::action::Fibonacci;
use std::cell::RefCell;
use std::rc::Rc;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_action_client::<Fibonacci::Action>("/fibonacci")?;
println!("waiting for action service...");
while !node.action_server_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(500));
}
println!("action service available.");
let goal = Fibonacci::Goal { order: 10 };
let goal_accepted = Rc::new(RefCell::new(None));
let cb_ga = goal_accepted.clone();
let cb = Box::new(move |r: Fibonacci::SendGoal::Response| {
println!("got response {:?}", r);
*cb_ga.borrow_mut() = Some(r.accepted);
});
let feedback_cb = Box::new(move |fb: Fibonacci::Feedback| {
println!("got feedback {:?}", fb);
});
let result_cb = Box::new(move |r: Fibonacci::Result| {
println!("final result {:?}", r);
});
println!("sending goal: {:?}", goal);
client.send_goal_request(goal, cb, feedback_cb, result_cb)?;
let mut c = 0;
loop {
node.spin_once(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(1000));
c += 1;
if c > 100 {
println!("shutdown");
break;
}
}
Ok(())
}

View File

@ -82,11 +82,21 @@ fn main() {
introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val)); introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val));
} }
} else if msg.prefix == "action" { } else if msg.prefix == "action" {
for s in &["Goal", "Result", "Feedback"] { for s in &["Goal", "Result", "Feedback", "FeedbackMessage"] {
let key = &format!("{}__{}__{}_{}", &msg.module, &msg.prefix, &msg.name, s); let key = &format!("{}__{}__{}_{}", &msg.module, &msg.prefix, &msg.name, s);
let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}_{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name, s); let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}_{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name, s);
introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val)); introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val));
} }
// "internal" services
for srv in &["SendGoal", "GetResult"] {
// TODO: refactor this is copy paste from services...
for s in &["Request", "Response"] {
let msgname = format!("{}_{}_{}", msg.name, srv, s);
let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, msgname);
let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, msgname);
introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val));
}
}
} else { } else {
let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, &msg.name); let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, &msg.name);
let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name); let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name);

View File

@ -50,9 +50,9 @@ fn field_type(t: u8) -> String {
} else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_DOUBLE as u8) { } else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_DOUBLE as u8) {
"f64".to_owned() "f64".to_owned()
} else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_LONG_DOUBLE as u8) { } else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_LONG_DOUBLE as u8) {
// f128 does not exist in rust
"u128".to_owned() "u128".to_owned()
} }
// f128 does not exist in rust
else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) { else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) {
"message".to_owned() "message".to_owned()
} else { } else {
@ -129,15 +129,42 @@ pub fn generate_rust_action(module_: &str, prefix_: &str, name_: &str) -> String
type Goal = Goal; type Goal = Goal;
type Result = Result; type Result = Result;
type Feedback = Feedback; type Feedback = Feedback;
// internal structs
type FeedbackMessage = FeedbackMessage;
type SendGoal = SendGoal::Service;
type GetResult = GetResult::Service;
fn get_ts() -> &'static rosidl_action_type_support_t {{ fn get_ts() -> &'static rosidl_action_type_support_t {{
unsafe {{ unsafe {{
&*rosidl_typesupport_c__get_action_type_support_handle__{}__{}__{}() &*rosidl_typesupport_c__get_action_type_support_handle__{module}__{prefix}__{name}()
}} }}
}} }}
fn make_goal_request_msg(goal_id: unique_identifier_msgs::msg::UUID, goal: Goal) -> SendGoal::Request {{
SendGoal::Request {{
goal_id,
goal
}}
}}
fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) -> GetResult::Request {{
GetResult::Request {{
goal_id,
}}
}}
fn destructure_feedback_msg(msg: FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Feedback) {{
(msg.goal_id, msg.feedback)
}}
fn destructure_result_response_msg(msg: GetResult::Response) -> (i8, Result) {{
(msg.status, msg.result)
}}
}} }}
", ",
module_, prefix_, name_ module = module_, prefix = prefix_, name = name_
) )
} }
@ -146,7 +173,7 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
let key = format!("{}__{}__{}", module_, prefix_, name_); let key = format!("{}__{}__{}", module_, prefix_, name_);
let ptr = INTROSPECTION_FNS let ptr = INTROSPECTION_FNS
.get(key.as_str()) .get(key.as_str())
.expect(&format!("code generation error: {}", name_)); .expect(&format!("code generation error: {}", key));
let ptr = *ptr as *const i32 as *const rosidl_message_type_support_t; let ptr = *ptr as *const i32 as *const rosidl_message_type_support_t;
unsafe { unsafe {
let (module, prefix, mut name, c_struct, members) = introspection(ptr); let (module, prefix, mut name, c_struct, members) = introspection(ptr);
@ -159,14 +186,15 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
// we only want to keep the last part. // we only want to keep the last part.
// same for actions with _Goal, _Result, _Feedback // same for actions with _Goal, _Result, _Feedback
// TODO: refactor... // TODO: refactor...
let mut nn = name.splitn(2, "_"); // handle special case of ActionName_ServiceName_Response
let _mod_name = nn let nn = name.splitn(3, "_").collect::<Vec<&str>>();
.next() if let [ _mod_name, _srv_name, msg_name ] = &nn[..] {
.expect(&format!("malformed service name {}", name)); name = msg_name.to_string();
let msg_name = nn } else if let [ _mod_name, msg_name ] = &nn[..] {
.next() name = msg_name.to_string();
.expect(&format!("malformed service name {}", name)); } else {
name = msg_name.to_owned(); panic!("malformed service name {}", name);
}
} }
let mut fields = String::new(); let mut fields = String::new();
@ -176,12 +204,32 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
let rust_field_type = field_type(member.type_id_); let rust_field_type = field_type(member.type_id_);
let rust_field_type = if rust_field_type == "message" { let rust_field_type = if rust_field_type == "message" {
let (module, prefix, name, _, _) = introspection(member.members_); let (module, prefix, name, _, _) = introspection(member.members_);
format!( // hack here to rustify nested action type names
"{module}::{prefix}::{msgname}", if prefix == "action" {
module = module, if let Some((n1,n2)) = name.rsplit_once("_") {
prefix = prefix, format!("{module}::{prefix}::{srvname}::{msgname}",
msgname = name module = module,
) prefix = prefix,
srvname = n1,
msgname = n2
)
}
else {
format!(
"{module}::{prefix}::{msgname}",
module = module,
prefix = prefix,
msgname = name
)
}
} else {
format!(
"{module}::{prefix}::{msgname}",
module = module,
prefix = prefix,
msgname = name
)
}
} else { } else {
rust_field_type rust_field_type
}; };
@ -266,7 +314,16 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
)); ));
} else if rust_field_type == "message" { } else if rust_field_type == "message" {
let (module, prefix, name, _, _) = introspection(member.members_); let (module, prefix, name, _, _) = introspection(member.members_);
from_native.push_str(&format!("{field_name}: {module}::{prefix}::{msgname}::from_native(&msg.{field_name}),\n", field_name = field_name, module = module, prefix=prefix, msgname = name)); // same hack as above to rustify message type names
if prefix == "action" {
if let Some((n1,n2)) = name.rsplit_once("_") {
from_native.push_str(&format!("{field_name}: {module}::{prefix}::{srvname}::{msgname}::from_native(&msg.{field_name}),\n", field_name = field_name, module = module, prefix=prefix, srvname = n1, msgname = n2));
} else {
panic!("ooops at from_native");
}
} else {
from_native.push_str(&format!("{field_name}: {module}::{prefix}::{msgname}::from_native(&msg.{field_name}),\n", field_name = field_name, module = module, prefix=prefix, msgname = name));
}
} else { } else {
from_native.push_str(&format!( from_native.push_str(&format!(
"{field_name}: msg.{field_name},\n", "{field_name}: msg.{field_name},\n",

View File

@ -11,9 +11,11 @@ use std::marker::PhantomData;
use std::mem::MaybeUninit; use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::time::Duration; use std::time::Duration;
use std::fmt::Debug;
use msg_gen::*; use msg_gen::*;
use rcl::*; use rcl::*;
use actions::*;
mod error; mod error;
use error::*; use error::*;
@ -23,7 +25,7 @@ pub use utils::*;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned + Default { pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned + Default + Debug {
type CStruct; type CStruct;
fn get_ts() -> &'static rosidl_message_type_support_t; fn get_ts() -> &'static rosidl_message_type_support_t;
@ -45,7 +47,20 @@ pub trait WrappedActionTypeSupport {
type Result: WrappedTypesupport; type Result: WrappedTypesupport;
type Feedback: WrappedTypesupport; type Feedback: WrappedTypesupport;
// internal...
type FeedbackMessage: WrappedTypesupport;
type SendGoal: WrappedServiceTypeSupport;
type GetResult: WrappedServiceTypeSupport;
fn get_ts() -> &'static rosidl_action_type_support_t; fn get_ts() -> &'static rosidl_action_type_support_t;
fn make_goal_request_msg(goal_id: unique_identifier_msgs::msg::UUID, goal: Self::Goal) ->
<<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request;
fn make_result_request_msg(goal_id: unique_identifier_msgs::msg::UUID) ->
<<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request;
fn destructure_feedback_msg(msg: Self::FeedbackMessage) -> (unique_identifier_msgs::msg::UUID, Self::Feedback);
fn destructure_result_response_msg(msg: <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response) ->
(i8, Self::Result);
} }
#[derive(Debug)] #[derive(Debug)]
@ -411,6 +426,206 @@ where
} }
} }
// action clients
#[derive(Debug, Copy, Clone, PartialEq)]
enum GoalStatus {
Unknown,
Accepted,
Executing,
Canceling,
Succeeded,
Canceled,
Aborted,
}
impl GoalStatus {
#[allow(dead_code)]
pub fn to_rcl(&self) -> i8 {
match self {
GoalStatus::Unknown => 0,
GoalStatus::Accepted => 1,
GoalStatus::Executing => 2,
GoalStatus::Canceling => 3,
GoalStatus::Succeeded => 4,
GoalStatus::Canceled => 5,
GoalStatus::Aborted => 6,
}
}
pub fn from_rcl(s: i8) -> Self {
match s {
0 => GoalStatus::Unknown,
1 => GoalStatus::Accepted,
2 => GoalStatus::Executing,
3 => GoalStatus::Canceling,
4 => GoalStatus::Succeeded,
5 => GoalStatus::Canceled,
6 => GoalStatus::Aborted,
_ => panic!("unknown action status: {}", s),
}
}
}
struct WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
rcl_handle: rcl_action_client_t,
goal_request_callbacks: Vec<(i64, Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response)>)>,
feedback_callbacks: Vec<(uuid::Uuid, Box<dyn FnMut(T::Feedback) -> ()>)>,
goal_statuses: Vec<(uuid::Uuid, GoalStatus)>,
result_request_callbacks: Vec<(i64, Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response)>)>,
result_callbacks: Vec<(uuid::Uuid, Box<dyn FnOnce(T::Result) -> ()>)>,
}
pub trait ActionClient_ {
fn handle(&self) -> &rcl_action_client_t;
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
fn handle_goal_response(&mut self) -> ();
fn handle_feedback_msg(&mut self) -> ();
fn handle_result_response(&mut self) -> ();
fn run_result_request(&mut self, uuid: &uuid::Uuid) -> ();
}
use std::convert::TryInto;
fn vec_to_uuid_bytes<T>(v: Vec<T>) -> [T; 16] {
v.try_into()
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", 16, v.len()))
}
impl<T: 'static> WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn send_result_request(&mut self, uuid: &uuid::Uuid, cb: Box<dyn FnOnce(T::Result) -> ()>) -> Result<()> {
let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() };
let request_msg = T::make_result_request_msg(uuid_msg);
let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::GetResult as
WrappedServiceTypeSupport>::Request>::from(&request_msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
if result == RCL_RET_OK as i32 {
self.result_request_callbacks.push((seq_no, Box::new(move |r: <<T as WrappedActionTypeSupport>::GetResult as
WrappedServiceTypeSupport>::Response| {
let (status, result) = T::destructure_result_response_msg(r);
let status = GoalStatus::from_rcl(status);
println!("status: {:?}, result: {:?}", status, result);
(cb)(result);
})));
Ok(())
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl<T: 'static> ActionClient_ for WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
fn handle(&self) -> &rcl_action_client_t {
&self.rcl_handle
}
fn handle_goal_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response>::new();
let ret = unsafe {
rcl_action_take_goal_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.goal_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, cb_to_run) = self.goal_request_callbacks.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
(cb_to_run)(response);
} else {
// I don't think this should be able to occur? Let's panic so we
// find out...
let we_have: String = self
.goal_request_callbacks
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_feedback_msg(&mut self) -> () {
let mut feedback_msg = WrappedNativeMsg::<T::FeedbackMessage>::new();
let ret = unsafe {
rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let msg = T::FeedbackMessage::from_native(&feedback_msg);
let (uuid, feedback) = T::destructure_feedback_msg(msg);
let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid));
if let Some((_,cb)) = self.feedback_callbacks.iter_mut().find(|(uuid, _)| uuid == &msg_uuid) {
(cb)(feedback);
}
}
}
fn handle_result_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response>::new();
let ret = unsafe {
rcl_action_take_result_response(&self.rcl_handle, request_id.as_mut_ptr(), response_msg.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self.result_request_callbacks.iter().position(|(id, _)| id == &request_id.sequence_number) {
let (_, cb_to_run) = self.result_request_callbacks.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
(cb_to_run)(response);
} else {
// I don't think this should be able to occur? Let's panic so we
// find out...
let we_have: String = self
.result_request_callbacks
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn run_result_request(&mut self, uuid: &uuid::Uuid) -> () {
if let Some(idx) = self.result_callbacks.iter().position(|(cb_uuid, _)| cb_uuid == uuid) {
let (_, result_cb) = self.result_callbacks.swap_remove(idx);
println!("asking for final result for {}", uuid);
self.send_result_request(uuid, result_cb).unwrap(); // TODO error handling.
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_action_client_fini(&mut self.rcl_handle, node);
}
}
}
// The publish function is thread safe. ROS2 docs state: // The publish function is thread safe. ROS2 docs state:
// ============= // =============
// //
@ -465,6 +680,15 @@ where
client_: Weak<Mutex<WrappedClient<T>>>, client_: Weak<Mutex<WrappedClient<T>>>,
} }
unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
pub struct ActionClient<T>
where
T: WrappedActionTypeSupport,
{
client_: Weak<Mutex<WrappedActionClient<T>>>,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
context_handle: Arc<Mutex<ContextHandle>>, context_handle: Arc<Mutex<ContextHandle>>,
@ -637,6 +861,8 @@ pub struct Node {
services: Vec<Box<dyn Service>>, services: Vec<Box<dyn Service>>,
// clients with hack to avoid locking just to wait..., // clients with hack to avoid locking just to wait...,
clients: Vec<(rcl_client_t, Arc<Mutex<dyn Client_>>)>, clients: Vec<(rcl_client_t, Arc<Mutex<dyn Client_>>)>,
// action clients
action_clients: Vec<(rcl_action_client_t, Arc<Mutex<dyn ActionClient_>>)>,
// timers, // timers,
timers: Vec<Timer>, timers: Vec<Timer>,
// and the publishers, whom we allow to be shared.. hmm. // and the publishers, whom we allow to be shared.. hmm.
@ -767,6 +993,7 @@ impl Node {
subs: Vec::new(), subs: Vec::new(),
services: Vec::new(), services: Vec::new(),
clients: Vec::new(), clients: Vec::new(),
action_clients: Vec::new(),
timers: Vec::new(), timers: Vec::new(),
pubs: Vec::new(), pubs: Vec::new(),
}; };
@ -981,6 +1208,76 @@ impl Node {
} }
} }
pub fn create_action_client_helper(
&mut self,
action_name: &str,
action_ts: *const rosidl_action_type_support_t,
) -> Result<rcl_action_client_t> {
let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() };
let action_name_c_string =
CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let client_options = rcl_action_client_get_default_options();
rcl_action_client_init(
&mut client_handle,
self.node_handle.as_mut(),
action_ts,
action_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
} else {
Err(Error::from_rcl_error(result))
}
}
pub fn create_action_client<T: 'static>(&mut self, action_name: &str) -> Result<ActionClient<T>>
where
T: WrappedActionTypeSupport,
{
let client_handle = self.create_action_client_helper(action_name, T::get_ts())?;
let cloned_handle = unsafe { core::ptr::read(&client_handle) };
let wa = WrappedActionClient::<T> {
rcl_handle: cloned_handle,
goal_request_callbacks: Vec::new(),
feedback_callbacks: Vec::new(),
goal_statuses: Vec::new(),
result_request_callbacks: Vec::new(),
result_callbacks: Vec::new(),
};
let arc = Arc::new(Mutex::new(wa));
let client_ = Arc::downgrade(&arc);
self.action_clients.push((client_handle, arc));
let c = ActionClient { client_ };
Ok(c)
}
pub fn action_server_available<T: 'static + WrappedActionTypeSupport>(
&self,
client: &ActionClient<T>,
) -> Result<bool> {
let client = client
.client_
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
let mut avail = false;
let result = unsafe {
rcl_action_server_is_available(self.node_handle.as_ref(), client.handle(), &mut avail)
};
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn create_publisher_helper( pub fn create_publisher_helper(
&mut self, &mut self,
topic: &str, topic: &str,
@ -1036,6 +1333,19 @@ impl Node {
Ok(p) Ok(p)
} }
fn get_num_waits(rcl_handle: &rcl_action_client_t, num_subs: &mut usize, num_gc: &mut usize , num_timers: &mut usize,
num_clients: &mut usize , num_services: &mut usize) -> Result<()> {
unsafe {
let result = rcl_action_client_wait_set_get_num_entities(rcl_handle, num_subs, num_gc,
num_timers, num_clients, num_services);
if result == RCL_RET_OK as i32 {
Ok(())
} else {
Err(Error::from_rcl_error(result))
}
}
}
pub fn spin_once(&mut self, timeout: Duration) { pub fn spin_once(&mut self, timeout: Duration) {
let timeout = timeout.as_nanos() as i64; let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
@ -1046,16 +1356,39 @@ impl Node {
// #[doc = "* For example, calling rcl_wait() in two threads on two different wait sets"] // #[doc = "* For example, calling rcl_wait() in two threads on two different wait sets"]
// #[doc = "* that both contain a single, shared guard condition is undefined behavior."] // #[doc = "* that both contain a single, shared guard condition is undefined behavior."]
// count action client wait set needs
let mut total_action_subs = 0;
let mut total_action_clients = 0;
for (ach, _) in &self.action_clients {
let mut num_subs = 0;
let mut num_gc = 0;
let mut num_timers = 0;
let mut num_clients = 0;
let mut num_services = 0;
Self::get_num_waits(ach, &mut num_subs, &mut num_gc, &mut num_timers, &mut num_clients, &mut num_services)
.expect("could not get action client wait sets");
// sanity check
assert_eq!(num_subs, 2);
assert_eq!(num_clients, 3);
assert_eq!(num_gc, 0);
assert_eq!(num_timers, 0);
assert_eq!(num_services, 0);
total_action_subs += num_subs;
total_action_clients += num_clients;
}
{ {
let mut ctx = self.context.context_handle.lock().unwrap(); let mut ctx = self.context.context_handle.lock().unwrap();
unsafe { unsafe {
rcl_wait_set_init( rcl_wait_set_init(
&mut ws, &mut ws,
self.subs.len(), self.subs.len() + total_action_subs,
0, 0,
self.timers.len(), self.timers.len(),
self.clients.len(), self.clients.len() + total_action_clients,
self.services.len(), self.services.len(),
0, 0,
ctx.as_mut(), ctx.as_mut(),
@ -1091,6 +1424,16 @@ impl Node {
} }
} }
// code below assumes that actions are added last... perhaps a bad assumption.
for (ac, _) in self.action_clients.iter() {
unsafe {
rcl_action_wait_set_add_action_client(&mut ws,
ac,
std::ptr::null_mut(),
std::ptr::null_mut());
}
}
let ret = unsafe { rcl_wait(&mut ws, timeout) }; let ret = unsafe { rcl_wait(&mut ws, timeout) };
if ret == RCL_RET_TIMEOUT as i32 { if ret == RCL_RET_TIMEOUT as i32 {
@ -1101,8 +1444,7 @@ impl Node {
} }
let ws_subs = let ws_subs =
unsafe { std::slice::from_raw_parts(ws.subscriptions, ws.size_of_subscriptions) }; unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subs.len()) };
assert_eq!(ws_subs.len(), self.subs.len());
let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg_info = rmw_message_info_t::default(); // we dont care for now
for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
@ -1139,8 +1481,7 @@ impl Node {
} }
} }
let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, ws.size_of_clients) }; let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) };
assert_eq!(ws_clients.len(), self.clients.len());
for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let mut s = s.lock().unwrap(); let mut s = s.lock().unwrap();
@ -1166,6 +1507,64 @@ impl Node {
} }
} }
for (ac, s) in &self.action_clients {
let mut is_feedback_ready = false;
let mut is_status_ready = false;
let mut is_goal_response_ready = false;
let mut is_cancel_response_ready = false;
let mut is_result_response_ready = false;
let ret = unsafe {
rcl_action_client_wait_set_get_entities_ready(&ws, ac,
&mut is_feedback_ready,
&mut is_status_ready,
&mut is_goal_response_ready,
&mut is_cancel_response_ready,
&mut is_result_response_ready)
};
if ret != RCL_RET_OK as i32 {
continue;
}
if is_goal_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_goal_response();
}
if is_result_response_ready {
let mut acs = s.lock().unwrap();
acs.handle_result_response();
}
if is_feedback_ready {
let mut acs = s.lock().unwrap();
acs.handle_feedback_msg();
}
if is_status_ready {
let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
let ret = unsafe {
rcl_action_take_status(ac, status_array.void_ptr_mut())
};
if ret == RCL_RET_OK as i32 {
let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
// TODO: actually use this information.
for a in &arr.status_list {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone()));
let status = GoalStatus::from_rcl(a.status);
println!("goal status for {}: {:?}", uuid, status);
if status == GoalStatus::Succeeded {
// query for the result.
let mut acs = s.lock().unwrap();
acs.run_result_request(&uuid);
}
}
}
}
}
unsafe { unsafe {
rcl_wait_set_fini(&mut ws); rcl_wait_set_fini(&mut ws);
} }
@ -1400,6 +1799,46 @@ where
} }
} }
impl<T> ActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn send_goal_request(&self, goal: T::Goal,
cb: Box<dyn FnOnce(<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response) -> ()>,
feedback_cb: Box<dyn FnMut(T::Feedback) -> ()>,
result_cb: Box<dyn FnOnce(T::Result) -> ()>) -> Result<()>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self
.client_
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
// copy rust msg to native and publish it
let uuid = uuid::Uuid::new_v4();
let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: uuid.as_bytes().to_vec() };
println!("UUID: {:?}", uuid);
client.feedback_callbacks.push((uuid, feedback_cb));
client.result_callbacks.push((uuid, result_cb));
let request_msg = T::make_goal_request_msg(uuid_msg, goal);
let native_msg = WrappedNativeMsg::<<<T as WrappedActionTypeSupport>::SendGoal as
WrappedServiceTypeSupport>::Request>::from(&request_msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
if result == RCL_RET_OK as i32 {
client.goal_request_callbacks.push((seq_no, cb));
Ok(())
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl PublisherUntyped { impl PublisherUntyped {
pub fn publish(&self, msg: serde_json::Value) -> Result<()> { pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
// upgrade to actual ref. if still alive // upgrade to actual ref. if still alive
@ -1761,5 +2200,10 @@ mod tests {
let fb2 = Fibonacci::Feedback::from_native(&fbn); let fb2 = Fibonacci::Feedback::from_native(&fbn);
println!("feedback2 {:?}", fb2); println!("feedback2 {:?}", fb2);
assert_eq!(fb, fb2); assert_eq!(fb, fb2);
let fb = WrappedNativeMsg::<Fibonacci::Feedback>::new();
let fb1 = Fibonacci::Feedback::default();
let fb2 = Fibonacci::Feedback::from_native(&fb);
assert_eq!(fb1, fb2);
} }
} }