tidying + untyped service clients

This commit is contained in:
Martin Dahl 2021-08-13 17:16:51 +02:00
parent 3effec8cc5
commit 49f83fb5d9
19 changed files with 2366 additions and 2037 deletions

View File

@ -118,13 +118,17 @@ fn main() {
}
let untyped_helper = msg_gen::generate_untyped_helper(&msg_list);
let untyped_service_helper = msg_gen::generate_untyped_service_helper(&msg_list);
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
let msgs_fn = out_path.join("_r2r_generated_msgs.rs");
let untyped_fn = out_path.join("_r2r_generated_untyped_helper.rs");
let untyped_service_fn = out_path.join("_r2r_generated_service_helper.rs");
let mut f = File::create(msgs_fn).unwrap();
write!(f, "{}", modules).unwrap();
let mut f = File::create(untyped_fn).unwrap();
write!(f, "{}", untyped_helper).unwrap();
let mut f = File::create(untyped_service_fn).unwrap();
write!(f, "{}", untyped_service_helper).unwrap();
}

View File

@ -93,14 +93,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Box::new(accept_cancel_cb),
Box::new(handle_goal_cb),
)?;
loop {
while !*done.lock().unwrap() {
node.lock()
.unwrap()
.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
if *done.lock().unwrap() {
break;
}
}
Ok(())

View File

@ -22,7 +22,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
},
..Default::default()
};
let mut native = r2r::WrappedNativeMsg::<Int32>::new();
let mut native = r2r::NativeMsg::<Int32>::new();
native.data = count;
publisher.publish(&to_send).unwrap();

View File

@ -33,10 +33,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
})?;
// for sub2 we just print the data
let sub2 = node.subscribe::<r2r::std_msgs::msg::String>("/topic2")?;
let sub2 = node.subscribe_untyped("/topic2", "std_msgs/msg/String")?;
spawner.spawn_local(async move {
sub2.for_each(|msg| {
println!("topic2: new msg: {}", msg.data);
println!("topic2: new msg: {}", msg.expect("deserialization error"));
future::ready(())
})
.await

View File

@ -17,6 +17,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic2")?;
let state = Arc::new(Mutex::new(SharedState::default()));
use r2r::example_interfaces::srv::AddTwoInts;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
std::thread::spawn(move || {
let req = AddTwoInts::Request { a: 10, b: 20 };
print!("{} + {} = ", req.a, req.b);
let resp = client.request(&req).expect("");
});
// task that every other time forwards message to topic2
let state_t1 = state.clone();
task::spawn(async move {

View File

@ -0,0 +1,48 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use r2r;
async fn requester_task(c: r2r::UntypedClient) -> Result<(), Box<dyn std::error::Error>> {
let mut x: i64 = 0;
loop {
let json = format!("{{ \"a\": {}, \"b\": {} }}", 10 * x, x);
let req = serde_json::from_str(&json).unwrap();
let resp = c.request(req)?.await?;
println!("{}", resp.expect("deserialization error"));
x += 1;
if x == 10 {
break;
}
}
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_client_untyped("/add_two_ints", "example_interfaces/srv/AddTwoInts")?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available_untyped(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let mut pool = LocalPool::new();
let spawner = pool.spawner();
spawner.spawn_local(async move {
match requester_task(client).await {
Ok(()) => println!("done."),
Err(e) => println!("error: {}", e),
}
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
}
}

View File

@ -486,7 +486,7 @@ pub fn generate_untyped_helper(msgs: &Vec<common::RosMsg>) -> String {
let open = String::from(
"
impl WrappedNativeMsgUntyped {
fn new_from(typename: &str) -> Result<Self> {
pub fn new_from(typename: &str) -> Result<Self> {
",
);
let close = String::from(
@ -523,3 +523,44 @@ impl WrappedNativeMsgUntyped {
format!("{}{}{}", open, lines, close)
}
pub fn generate_untyped_service_helper(msgs: &Vec<common::RosMsg>) -> String {
let open = String::from(
"
impl UntypedServiceSupport {
pub fn new_from(typename: &str) -> Result<Self> {
",
);
let close = String::from(
"
else
{
return Err(Error::InvalidMessageType{ msgtype: typename.into() })
}
}
}
",
);
let mut lines = String::new();
for msg in msgs {
if msg.prefix != "srv" {
continue;
}
let typename = format!("{}/{}/{}", msg.module, msg.prefix, msg.name);
let rustname = format!("{}::{}::{}::Service", msg.module, msg.prefix, msg.name);
lines.push_str(&format!(
"
if typename == \"{typename}\" {{
return Ok(UntypedServiceSupport::new::<{rustname}>());
}}
",
typename = typename,
rustname = rustname
));
}
format!("{}{}{}", open, lines, close)
}

344
src/action_clients.rs Normal file
View File

@ -0,0 +1,344 @@
use super::*;
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum GoalStatus {
Unknown,
Accepted,
Executing,
Canceling,
Succeeded,
Canceled,
Aborted,
}
impl GoalStatus {
#[allow(dead_code)]
pub fn to_rcl(&self) -> i8 {
match self {
GoalStatus::Unknown => 0,
GoalStatus::Accepted => 1,
GoalStatus::Executing => 2,
GoalStatus::Canceling => 3,
GoalStatus::Succeeded => 4,
GoalStatus::Canceled => 5,
GoalStatus::Aborted => 6,
}
}
pub fn from_rcl(s: i8) -> Self {
match s {
0 => GoalStatus::Unknown,
1 => GoalStatus::Accepted,
2 => GoalStatus::Executing,
3 => GoalStatus::Canceling,
4 => GoalStatus::Succeeded,
5 => GoalStatus::Canceled,
6 => GoalStatus::Aborted,
_ => panic!("unknown action status: {}", s),
}
}
}
pub struct WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub rcl_handle: rcl_action_client_t,
pub goal_response_channels: Vec<(
i64,
oneshot::Sender<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>,
)>,
pub cancel_response_channels: Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
pub result_requests: Vec<(i64, uuid::Uuid)>,
pub result_senders: Vec<(uuid::Uuid, oneshot::Sender<T::Result>)>,
pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
}
pub trait ActionClient_ {
fn handle(&self) -> &rcl_action_client_t;
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
fn handle_goal_response(&mut self) -> ();
fn handle_cancel_response(&mut self) -> ();
fn handle_feedback_msg(&mut self) -> ();
fn handle_status_msg(&mut self) -> ();
fn handle_result_response(&mut self) -> ();
fn send_result_request(&mut self, uuid: uuid::Uuid) -> ();
}
use std::convert::TryInto;
pub fn vec_to_uuid_bytes<T>(v: Vec<T>) -> [T; 16] {
v.try_into().unwrap_or_else(|v: Vec<T>| {
panic!("Expected a Vec of length {} but it was {}", 16, v.len())
})
}
impl<T> WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
*self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
}
pub fn send_cancel_request(&mut self, goal: &uuid::Uuid) -> Result<impl Future<Output = Result<()>>>
where
T: WrappedActionTypeSupport,
{
let msg = action_msgs::srv::CancelGoal::Request {
goal_info: action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: goal.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
},
};
let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
if result == RCL_RET_OK as i32 {
let (cancel_req_sender, cancel_req_receiver) =
oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
self.cancel_response_channels
.push((seq_no, cancel_req_sender));
// instead of "canceled" we return invalid client.
let future = cancel_req_receiver
.map_err(|_| Error::RCL_RET_CLIENT_INVALID)
.map(|r| match r {
Ok(r) => match r.return_code {
0 => Ok(()),
1 => Err(Error::GoalCancelRejected),
2 => Err(Error::GoalCancelUnknownGoalID),
3 => Err(Error::GoalCancelAlreadyTerminated),
x => panic!("unknown error code return from action server: {}", x),
},
Err(e) => Err(e),
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl<T: 'static> ActionClient_ for WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
{
fn handle(&self) -> &rcl_action_client_t {
&self.rcl_handle
}
fn handle_goal_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::new();
let ret = unsafe {
rcl_action_take_goal_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.goal_response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.goal_response_channels.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
match sender.send(response) {
Ok(()) => {}
Err(e) => {
println!("error sending to action client: {:?}", e);
}
}
} else {
let we_have: String = self
.goal_response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_cancel_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
let ret = unsafe {
rcl_action_take_cancel_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.cancel_response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.cancel_response_channels.swap_remove(idx);
let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
match sender.send(response) {
Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e),
_ => (),
}
} else {
let we_have: String = self
.goal_response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn handle_feedback_msg(&mut self) -> () {
let mut feedback_msg = WrappedNativeMsg::<T::FeedbackMessage>::new();
let ret =
unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
if ret == RCL_RET_OK as i32 {
let msg = T::FeedbackMessage::from_native(&feedback_msg);
let (uuid, feedback) = T::destructure_feedback_msg(msg);
let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid));
if let Some((_, sender)) = self
.feedback_senders
.iter_mut()
.find(|(uuid, _)| uuid == &msg_uuid)
{
match sender.try_send(feedback) {
Err(e) => eprintln!("warning: could not send feedback msg ({})", e),
_ => (),
}
}
}
}
fn handle_status_msg(&mut self) -> () {
let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
if ret == RCL_RET_OK as i32 {
let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
for a in &arr.status_list {
let uuid =
uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone()));
if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
continue;
}
let status = GoalStatus::from_rcl(a.status);
*self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
}
}
}
fn handle_result_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>::new();
let ret = unsafe {
rcl_action_take_result_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.result_requests
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, uuid) = self.result_requests.swap_remove(idx);
if let Some(idx) = self
.result_senders
.iter()
.position(|(suuid, _)| suuid == &uuid)
{
let (_, sender) = self.result_senders.swap_remove(idx);
let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
let (status, result) = T::destructure_result_response_msg(response);
let status = GoalStatus::from_rcl(status);
if status != GoalStatus::Succeeded {
println!("goal status failed: {:?}, result: {:?}", status, result);
// this will drop the sender which makes the receiver fail with "canceled"
} else {
match sender.send(result) {
Ok(()) => {}
Err(e) => {
println!("error sending result to action client: {:?}", e);
}
}
}
}
} else {
let we_have: String = self
.result_requests
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
}
}
fn send_result_request(&mut self, uuid: uuid::Uuid) -> () {
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
};
let request_msg = T::make_result_request_msg(uuid_msg);
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
>::from(&request_msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
if result == RCL_RET_OK as i32 {
self.result_requests.push((seq_no, uuid));
} else {
eprintln!("coult not send request {}", result);
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_action_client_fini(&mut self.rcl_handle, node);
}
}
}

566
src/action_servers.rs Normal file
View File

@ -0,0 +1,566 @@
use super::*;
pub trait ActionServer_ {
fn handle(&self) -> &rcl_action_server_t;
fn handle_goal_request(&mut self, server: Arc<Mutex<dyn ActionServer_>>) -> ();
fn handle_cancel_request(&mut self) -> ();
fn handle_result_request(&mut self) -> ();
fn handle_goal_expired(&mut self) -> ();
fn publish_status(&self) -> ();
fn add_result(&mut self, uuid: uuid::Uuid, msg: Box<dyn VoidPtr>) -> ();
fn destroy(&mut self, node: &mut rcl_node_t);
}
pub struct WrappedActionServer<T>
where
T: WrappedActionTypeSupport,
{
pub rcl_handle: rcl_action_server_t,
pub clock_handle: Box<rcl_clock_t>,
pub accept_goal_cb: Box<dyn FnMut(&uuid::Uuid, &T::Goal) -> bool>,
pub accept_cancel_cb: Box<dyn FnMut(&ServerGoal<T>) -> bool>,
pub goal_cb: Box<dyn FnMut(ServerGoal<T>)>,
pub goals: HashMap<uuid::Uuid, ServerGoal<T>>,
pub result_msgs: HashMap<uuid::Uuid, Box<dyn VoidPtr>>,
pub result_requests: HashMap<uuid::Uuid, Vec<rmw_request_id_t>>,
}
impl<T: 'static> ActionServer_ for WrappedActionServer<T>
where
T: WrappedActionTypeSupport,
{
fn handle(&self) -> &rcl_action_server_t {
&self.rcl_handle
}
fn handle_goal_request(&mut self, server: Arc<Mutex<dyn ActionServer_>>) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
>::new();
let ret = unsafe {
rcl_action_take_goal_request(
&self.rcl_handle,
request_id.as_mut_ptr(),
request_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
// this seems normal if client dies.
return;
}
let msg = <<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request>::from_native(&request_msg);
let (uuid_msg, goal) = T::destructure_goal_request_msg(msg);
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid_msg.uuid.clone()));
let goal_accepted = (self.accept_goal_cb)(&uuid, &goal);
let time = builtin_interfaces::msg::Time::default();
let goal_info = action_msgs::msg::GoalInfo {
goal_id: uuid_msg,
stamp: time.clone(),
};
let native_goal_info = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
let goal_handle: Option<*mut rcl_action_goal_handle_t> = if goal_accepted {
unsafe {
Some(rcl_action_accept_new_goal(
&mut self.rcl_handle,
&*native_goal_info,
))
}
} else {
None
};
// send response
let response_msg = T::make_goal_response_msg(goal_accepted, time);
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::from(&response_msg);
let ret = unsafe {
let mut request_id = request_id.assume_init();
rcl_action_send_goal_response(
&self.rcl_handle,
&mut request_id,
response_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
println!("action server: failed to send goal response");
return;
}
// if we accepted the goal, update its state machine and publish all goal statuses
if let Some(goal_handle) = goal_handle {
unsafe {
rcl_action_update_goal_state(
goal_handle,
rcl_action_goal_event_t::GOAL_EVENT_EXECUTE,
);
}
self.publish_status();
// run the user supplied cb with newly created goal handle object
let g: ServerGoal<T> = ServerGoal {
uuid,
goal,
handle: Arc::new(Mutex::new(goal_handle)),
server: Arc::downgrade(&server),
};
self.goals.insert(uuid, g.clone());
// start goal callback
(self.goal_cb)(g);
}
}
fn handle_cancel_request(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::new();
let ret = unsafe {
rcl_action_take_cancel_request(
&self.rcl_handle,
request_id.as_mut_ptr(),
request_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
// this seems normal if client dies.
return;
}
let mut cancel_response = unsafe { rcl_action_get_zero_initialized_cancel_response() };
let ret = unsafe {
rcl_action_process_cancel_request(&self.rcl_handle, &*request_msg, &mut cancel_response)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could not process cancel request. {}", ret);
return;
}
let mut response_msg =
action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg);
// let user filter cancelled goals.
let requested_cancels = response_msg.goals_canceling.len();
response_msg.goals_canceling.retain(|goal_info| {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
if let Some(goal) = self.goals.get(&uuid) {
(self.accept_cancel_cb)(goal)
} else {
true
}
});
response_msg.goals_canceling.iter().for_each(|goal_info| {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
if let Some(goal) = self.goals.get_mut(&uuid) {
goal.set_cancel();
}
});
// check if all cancels were rejected.
if requested_cancels >= 1 && response_msg.goals_canceling.is_empty() {
response_msg.return_code = 1; // TODO: auto generate these (int8 ERROR_REJECTED=1)
}
if !response_msg.goals_canceling.is_empty() {
// at least one goal state changed, publish a new status message
self.publish_status();
}
let mut native_msg =
WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::from(&response_msg);
let ret = unsafe {
let mut request_id = request_id.assume_init();
rcl_action_send_cancel_response(
&self.rcl_handle,
&mut request_id,
native_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could send cancel response. {}", ret);
return;
}
}
fn handle_goal_expired(&mut self) {
let mut goal_info = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::new();
let mut num_expired = 1;
while num_expired > 1 {
let ret = unsafe {
rcl_action_expire_goals(&self.rcl_handle, &mut *goal_info, 1, &mut num_expired)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could not expire goal.");
return;
}
let gi = action_msgs::msg::GoalInfo::from_native(&goal_info);
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone()));
println!("goal expired: {} - {}", uuid, num_expired);
self.goals.remove(&uuid);
self.result_msgs.remove(&uuid);
self.result_requests.remove(&uuid);
}
}
fn publish_status(&self) {
unsafe {
let mut status = rcl_action_get_zero_initialized_goal_status_array();
let ret = rcl_action_get_goal_status_array(&self.rcl_handle, &mut status);
if ret != RCL_RET_OK as i32 {
println!(
"action server: failed to get goal status array: {}",
Error::from_rcl_error(ret)
);
return;
}
let ret = rcl_action_publish_status(
&self.rcl_handle,
&status as *const _ as *const std::os::raw::c_void,
);
if ret != RCL_RET_OK as i32 {
println!(
"action server: failed to publish status: {}",
Error::from_rcl_error(ret)
);
return;
}
rcl_action_goal_status_array_fini(&mut status);
}
}
// bit of a hack...
fn add_result(&mut self, uuid: uuid::Uuid, mut msg: Box<dyn VoidPtr>) -> () {
// if there are already requests for this goal, send the result immediately.
if let Some(rr) = self.result_requests.remove(&uuid) {
for mut req in rr {
let ret = unsafe {
rcl_action_send_result_response(&self.rcl_handle, &mut req, msg.void_ptr_mut())
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could send result request response. {}",
Error::from_rcl_error(ret)
);
}
}
}
self.result_msgs.insert(uuid, msg);
}
fn handle_result_request(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
>::new();
let ret = unsafe {
rcl_action_take_result_request(
&self.rcl_handle,
request_id.as_mut_ptr(),
request_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
// this seems normal if client dies.
return;
}
let msg = <<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request>::from_native(&request_msg);
let goal_info = action_msgs::msg::GoalInfo {
goal_id: T::destructure_result_request_msg(msg),
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(&self.rcl_handle, &*goal_info_native) };
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
let response_msg = if !goal_exists {
// Goal does not exists
println!("goal does not exist :(");
let status = GoalStatus::Unknown;
let msg = T::make_result_response_msg(status.to_rcl(), T::Result::default());
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>::from(&msg);
Some(response_msg.void_ptr_mut())
} else {
self.result_msgs
.get_mut(&uuid)
.map(|msg| msg.void_ptr_mut())
};
let mut request_id = unsafe { request_id.assume_init() };
if let Some(response_msg) = response_msg {
let ret = unsafe {
rcl_action_send_result_response(&self.rcl_handle, &mut request_id, response_msg)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could send result request response. {}",
Error::from_rcl_error(ret)
);
return;
}
} else {
// keep request for later when result comes in
// todo: add logic that replies to the requests
self.result_requests
.entry(uuid)
.or_insert(vec![])
.push(request_id);
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_action_server_fini(&mut self.rcl_handle, node);
rcl_ros_clock_fini(self.clock_handle.as_mut());
}
}
}
#[derive(Clone)]
pub struct ServerGoal<T>
where
T: WrappedActionTypeSupport,
{
pub uuid: uuid::Uuid,
pub goal: T::Goal,
handle: Arc<Mutex<*mut rcl_action_goal_handle_t>>,
server: Weak<Mutex<dyn ActionServer_>>,
}
unsafe impl<T> Send for ServerGoal<T> where T: WrappedActionTypeSupport {}
impl<T: 'static> ServerGoal<T>
where
T: WrappedActionTypeSupport,
{
pub fn is_cancelling(&self) -> bool {
let mut state = 0u8; // TODO: int8 STATUS_UNKNOWN = 0;
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_goal_handle_get_status(*handle, &mut state)
};
if ret != RCL_RET_OK as i32 {
println!("action server: Failed to get goal handle state: {}", ret);
}
return state == 3u8; // TODO: int8 STATUS_CANCELING
}
pub fn publish_feedback(&self, msg: T::Feedback) -> Result<()>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
};
let feedback_msg = T::make_feedback_msg(uuid_msg, msg);
let mut native_msg = WrappedNativeMsg::<T::FeedbackMessage>::from(&feedback_msg);
let ret = unsafe {
rcl_action_publish_feedback(
action_server.lock().unwrap().handle(),
native_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
Ok(())
} else {
eprintln!("coult not publish {}", Error::from_rcl_error(ret));
Ok(()) // todo: error codes
}
}
fn set_cancel(&mut self) {
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
}
pub fn cancel(&mut self, msg: T::Result) -> Result<()> {
// upgrade to actual ref. if still alive
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to publish result without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(action_server.handle());
}
// send out updated statues
action_server.publish_status();
// create result message
let result_msg = T::make_result_response_msg(5, msg); // todo: int8 STATUS_CANCELED = 5
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>::from(&result_msg);
action_server.add_result(self.uuid.clone(), Box::new(native_msg));
Ok(())
}
pub fn abort(&mut self, msg: T::Result) -> Result<()> {
// todo: error handling
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_ABORT)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
// upgrade to actual ref. if still alive
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to abort without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(action_server.handle());
}
// send out updated statues
action_server.publish_status();
// create result message
let result_msg = T::make_result_response_msg(6, msg); // todo: int8 STATUS_ABORTED = 6
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>::from(&result_msg);
action_server.add_result(self.uuid.clone(), Box::new(native_msg));
Ok(())
}
pub fn succeed(&mut self, msg: T::Result) -> Result<()>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to publish result without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_SUCCEED);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(action_server.handle());
}
// send out updated statues
action_server.publish_status();
// create result message
let result_msg = T::make_result_response_msg(4, msg); // todo: int8 STATUS_SUCCEEDED = 4
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
>::from(&result_msg);
action_server.add_result(self.uuid.clone(), Box::new(native_msg));
Ok(())
}
}

211
src/clients.rs Normal file
View File

@ -0,0 +1,211 @@
use super::*;
unsafe impl<T> Send for TypedClient<T> where T: WrappedServiceTypeSupport {}
impl<T: 'static> TypedClient<T>
where
T: WrappedServiceTypeSupport,
{
pub fn request(&mut self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
where
T: WrappedServiceTypeSupport,
{
let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg);
let mut seq_no = 0i64;
let result =
unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
let (sender, receiver) = oneshot::channel::<T::Response>();
if result == RCL_RET_OK as i32 {
self.response_channels.push((seq_no, sender));
// instead of "canceled" we return invalid client.
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
unsafe impl Send for UntypedClient_ {}
impl UntypedClient_ {
pub fn request(&mut self, msg: serde_json::Value) -> Result<impl Future<Output = Result<Result<serde_json::Value>>>>
{
let mut native_msg = (self.service_type.make_request_msg)();
native_msg.from_json(msg)?;
let mut seq_no = 0i64;
let result =
unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
let (sender, receiver) = oneshot::channel::<Result<serde_json::Value>>();
if result == RCL_RET_OK as i32 {
self.response_channels.push((seq_no, sender));
// instead of "canceled" we return invalid client.
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
pub trait Client_ {
fn handle(&self) -> &rcl_client_t;
fn handle_response(&mut self) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
pub struct TypedClient<T>
where
T: WrappedServiceTypeSupport,
{
pub rcl_handle: rcl_client_t,
pub response_channels: Vec<(i64, oneshot::Sender<T::Response>)>,
}
impl<T: 'static> Client_ for TypedClient<T>
where
T: WrappedServiceTypeSupport,
{
fn handle(&self) -> &rcl_client_t {
&self.rcl_handle
}
fn handle_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = WrappedNativeMsg::<T::Response>::new();
let ret = unsafe {
rcl_take_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.response_channels.swap_remove(idx);
let response = T::Response::from_native(&response_msg);
match sender.send(response) {
Ok(()) => {}
Err(e) => {
println!("error sending to client: {:?}", e);
}
}
} else {
let we_have: String = self
.response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
} // TODO handle failure.
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_client_fini(&mut self.rcl_handle, node);
}
}
}
pub struct UntypedClient_
{
pub service_type: UntypedServiceSupport,
pub rcl_handle: rcl_client_t,
pub response_channels: Vec<(i64, oneshot::Sender<Result<serde_json::Value>>)>,
}
impl Client_ for UntypedClient_
{
fn handle(&self) -> &rcl_client_t {
&self.rcl_handle
}
fn handle_response(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut response_msg = (self.service_type.make_response_msg)();
let ret = unsafe {
rcl_take_response(
&self.rcl_handle,
request_id.as_mut_ptr(),
response_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
if let Some(idx) = self
.response_channels
.iter()
.position(|(id, _)| id == &request_id.sequence_number)
{
let (_, sender) = self.response_channels.swap_remove(idx);
let response = response_msg.to_json();
match sender.send(response) {
Ok(()) => {}
Err(e) => {
println!("error sending to client: {:?}", e);
}
}
} else {
let we_have: String = self
.response_channels
.iter()
.map(|(id, _)| id.to_string())
.collect::<Vec<_>>()
.join(",");
eprintln!(
"no such req id: {}, we have [{}], ignoring",
request_id.sequence_number, we_have
);
}
} // TODO handle failure.
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_client_fini(&mut self.rcl_handle, node);
}
}
}
pub fn create_client_helper(
node: *mut rcl_node_t,
service_name: &str,
service_ts: *const rosidl_service_type_support_t,
) -> Result<rcl_client_t> {
let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let client_options = rcl_client_get_default_options();
rcl_client_init(
&mut client_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
} else {
Err(Error::from_rcl_error(result))
}
}

77
src/clocks.rs Normal file
View File

@ -0,0 +1,77 @@
use super::*;
#[derive(Debug)]
pub enum ClockType {
RosTime,
SystemTime,
SteadyTime,
}
unsafe impl Send for Clock {}
pub struct Clock {
clock_handle: Box<rcl_clock_t>,
}
pub fn clock_type_to_rcl(ct: &ClockType) -> rcl_clock_type_t {
match ct {
ClockType::RosTime => rcl_clock_type_t::RCL_ROS_TIME,
ClockType::SystemTime => rcl_clock_type_t::RCL_SYSTEM_TIME,
ClockType::SteadyTime => rcl_clock_type_t::RCL_STEADY_TIME,
}
}
impl Clock {
pub fn create(ct: ClockType) -> Result<Clock> {
let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit();
let rcl_ct = clock_type_to_rcl(&ct);
let ret = unsafe {
rcl_clock_init(
rcl_ct,
clock_handle.as_mut_ptr(),
&mut rcutils_get_default_allocator(),
)
};
if ret != RCL_RET_OK as i32 {
eprintln!("could not create {:?} clock: {}", ct, ret);
return Err(Error::from_rcl_error(ret));
}
let clock_handle = Box::new(unsafe { clock_handle.assume_init() });
Ok(Clock { clock_handle })
}
pub fn get_now(&mut self) -> Result<Duration> {
let valid = unsafe { rcl_clock_valid(&mut *self.clock_handle) };
if !valid {
return Err(Error::from_rcl_error(RCL_RET_INVALID_ARGUMENT as i32));
}
let mut tp: rcutils_time_point_value_t = 0;
let ret = unsafe { rcl_clock_get_now(&mut *self.clock_handle, &mut tp) };
if ret != RCL_RET_OK as i32 {
eprintln!("could not create steady clock: {}", ret);
return Err(Error::from_rcl_error(ret));
}
let dur = Duration::from_nanos(tp as u64);
Ok(dur)
}
pub fn to_builtin_time(d: &Duration) -> builtin_interfaces::msg::Time {
let sec = d.as_secs() as i32;
let nanosec = d.subsec_nanos();
builtin_interfaces::msg::Time { sec, nanosec }
}
}
impl Drop for Clock {
fn drop(&mut self) {
unsafe {
rcl_clock_fini(&mut *self.clock_handle);
}
}
}

87
src/context.rs Normal file
View File

@ -0,0 +1,87 @@
use super::*;
#[derive(Debug, Clone)]
pub struct Context {
pub context_handle: Arc<Mutex<ContextHandle>>,
}
unsafe impl Send for Context {}
impl Context {
pub fn create() -> Result<Context> {
let mut ctx: Box<rcl_context_t> = unsafe { Box::new(rcl_get_zero_initialized_context()) };
// argc/v
let args = std::env::args()
.map(|arg| CString::new(arg).unwrap())
.collect::<Vec<CString>>();
let mut c_args = args
.iter()
.map(|arg| arg.as_ptr())
.collect::<Vec<*const ::std::os::raw::c_char>>();
c_args.push(std::ptr::null());
let is_valid = unsafe {
let allocator = rcutils_get_default_allocator();
let mut init_options = rcl_get_zero_initialized_init_options();
rcl_init_options_init(&mut init_options, allocator);
rcl_init(
(c_args.len() - 1) as ::std::os::raw::c_int,
c_args.as_ptr(),
&init_options,
ctx.as_mut(),
);
rcl_init_options_fini(&mut init_options as *mut _);
rcl_context_is_valid(ctx.as_mut())
};
let logging_ok = unsafe {
let _guard = log_guard();
let ret = rcl_logging_configure(
&ctx.as_ref().global_arguments,
&rcutils_get_default_allocator(),
);
ret == RCL_RET_OK as i32
};
if is_valid && logging_ok {
Ok(Context {
context_handle: Arc::new(Mutex::new(ContextHandle(ctx))),
})
} else {
Err(Error::RCL_RET_ERROR) // TODO
}
}
pub fn is_valid(&self) -> bool {
let mut ctx = self.context_handle.lock().unwrap();
unsafe { rcl_context_is_valid(ctx.as_mut()) }
}
}
#[derive(Debug)]
pub struct ContextHandle(Box<rcl_context_t>);
impl Deref for ContextHandle {
type Target = Box<rcl_context_t>;
fn deref<'a>(&'a self) -> &'a Box<rcl_context_t> {
&self.0
}
}
impl DerefMut for ContextHandle {
fn deref_mut<'a>(&'a mut self) -> &'a mut Box<rcl_context_t> {
&mut self.0
}
}
impl Drop for ContextHandle {
fn drop(&mut self) {
// TODO: error handling? atleast probably need rcl_reset_error
unsafe {
rcl::rcl_shutdown(self.0.as_mut());
rcl::rcl_context_fini(self.0.as_mut());
}
}
}

View File

@ -2,7 +2,7 @@
use rcl::*;
use thiserror::Error;
// TODO
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Error, Debug)]
pub enum Error {

2144
src/lib.rs

File diff suppressed because it is too large Load Diff

518
src/msg_types.rs Normal file
View File

@ -0,0 +1,518 @@
use crate::error::*;
use msg_gen::*;
use rcl::{
rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
pub mod generated_msgs {
use super::*;
include!(concat!(env!("OUT_DIR"), "/_r2r_generated_msgs.rs"));
include!(concat!(
env!("OUT_DIR"),
"/_r2r_generated_untyped_helper.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/_r2r_generated_service_helper.rs"
));
}
use generated_msgs::{builtin_interfaces, unique_identifier_msgs};
pub trait WrappedTypesupport:
Serialize + serde::de::DeserializeOwned + Default + Debug + Clone
{
type CStruct;
fn get_ts() -> &'static rosidl_message_type_support_t;
fn create_msg() -> *mut Self::CStruct;
fn destroy_msg(msg: *mut Self::CStruct);
fn from_native(msg: &Self::CStruct) -> Self;
fn copy_to_native(&self, msg: &mut Self::CStruct);
}
pub trait WrappedServiceTypeSupport: Debug + Clone {
type Request: WrappedTypesupport;
type Response: WrappedTypesupport;
fn get_ts() -> &'static rosidl_service_type_support_t;
}
pub trait WrappedActionTypeSupport: Debug + Clone {
type Goal: WrappedTypesupport;
type Result: WrappedTypesupport;
type Feedback: WrappedTypesupport;
// internal...
type FeedbackMessage: WrappedTypesupport;
type SendGoal: WrappedServiceTypeSupport;
type GetResult: WrappedServiceTypeSupport;
fn get_ts() -> &'static rosidl_action_type_support_t;
fn make_goal_request_msg(
goal_id: unique_identifier_msgs::msg::UUID,
goal: Self::Goal,
) -> <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request;
fn make_goal_response_msg(
accepted: bool,
stamp: builtin_interfaces::msg::Time,
) -> <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response;
fn make_feedback_msg(
goal_id: unique_identifier_msgs::msg::UUID,
feedback: Self::Feedback,
) -> Self::FeedbackMessage;
fn make_result_request_msg(
goal_id: unique_identifier_msgs::msg::UUID,
) -> <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request;
fn make_result_response_msg(
status: i8,
result: Self::Result,
) -> <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response;
fn destructure_goal_request_msg(
msg: <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
) -> (unique_identifier_msgs::msg::UUID, Self::Goal);
fn destructure_goal_response_msg(
msg: <<Self as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
) -> (bool, builtin_interfaces::msg::Time);
fn destructure_feedback_msg(
msg: Self::FeedbackMessage,
) -> (unique_identifier_msgs::msg::UUID, Self::Feedback);
fn destructure_result_response_msg(
msg: <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
) -> (i8, Self::Result);
fn destructure_result_request_msg(
msg: <<Self as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
) -> unique_identifier_msgs::msg::UUID;
}
#[derive(Debug)]
pub struct WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
pub msg: *mut T::CStruct,
}
pub trait VoidPtr {
fn void_ptr(&self) -> *const std::os::raw::c_void;
fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void;
}
#[derive(Debug)]
pub struct WrappedNativeMsgUntyped {
pub ts: &'static rosidl_message_type_support_t,
msg: *mut std::os::raw::c_void,
destroy: fn(*mut std::os::raw::c_void),
msg_to_json: fn(
native: *const std::os::raw::c_void,
) -> std::result::Result<serde_json::Value, serde_json::error::Error>,
msg_from_json: fn(
native: *mut std::os::raw::c_void,
json: serde_json::Value,
) -> std::result::Result<(), serde_json::error::Error>,
}
pub struct UntypedServiceSupport {
pub ts: &'static rosidl_service_type_support_t,
pub make_request_msg: fn() -> WrappedNativeMsgUntyped,
pub make_response_msg: fn() -> WrappedNativeMsgUntyped,
}
impl UntypedServiceSupport {
fn new<T>() -> Self
where
T: WrappedServiceTypeSupport,
{
let make_request_msg = || {
WrappedNativeMsgUntyped::new::<T::Request>()
};
let make_response_msg = || {
WrappedNativeMsgUntyped::new::<T::Response>()
};
UntypedServiceSupport {
ts: T::get_ts(),
make_request_msg,
make_response_msg,
}
}
}
impl WrappedNativeMsgUntyped {
fn new<T>() -> Self
where
T: WrappedTypesupport,
{
let destroy = |native: *mut std::os::raw::c_void| {
let native_msg = native as *mut T::CStruct;
T::destroy_msg(native_msg);
};
let msg_to_json = |native: *const std::os::raw::c_void| {
let msg = unsafe { T::from_native(&*(native as *const T::CStruct)) };
serde_json::to_value(&msg)
};
let msg_from_json = |native: *mut std::os::raw::c_void, json: serde_json::Value| {
serde_json::from_value(json).map(|msg: T| unsafe {
msg.copy_to_native(&mut *(native as *mut T::CStruct));
})
};
WrappedNativeMsgUntyped {
ts: T::get_ts(),
msg: T::create_msg() as *mut std::os::raw::c_void,
destroy,
msg_to_json,
msg_from_json,
}
}
pub fn to_json(&self) -> Result<serde_json::Value> {
let json = (self.msg_to_json)(self.msg);
json.map_err(|serde_err| Error::SerdeError {
err: serde_err.to_string(),
})
}
pub fn from_json(&mut self, json: serde_json::Value) -> Result<()> {
(self.msg_from_json)(self.msg, json).map_err(|serde_err| Error::SerdeError {
err: serde_err.to_string(),
})
}
}
impl VoidPtr for WrappedNativeMsgUntyped {
fn void_ptr(&self) -> *const std::os::raw::c_void {
self.msg as *const _ as *const std::os::raw::c_void
}
fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void {
self.msg as *mut _ as *mut std::os::raw::c_void
}
}
impl Drop for WrappedNativeMsgUntyped {
fn drop(&mut self) {
(self.destroy)(self.msg);
}
}
impl<T> WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
pub fn new() -> Self {
WrappedNativeMsg {
msg: T::create_msg(),
}
}
pub fn from(msg: &T) -> Self {
let mut native_msg = Self::new();
msg.copy_to_native(&mut native_msg);
native_msg
}
}
impl<T: 'static> VoidPtr for WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
fn void_ptr(&self) -> *const std::os::raw::c_void {
self.msg as *const _ as *const std::os::raw::c_void
}
fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void {
self.msg as *mut _ as *mut std::os::raw::c_void
}
}
impl<T> Drop for WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
fn drop(&mut self) {
T::destroy_msg(self.msg);
}
}
impl<T> Deref for WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
type Target = T::CStruct;
fn deref(&self) -> &Self::Target {
unsafe { &(*self.msg) }
}
}
impl<T> DerefMut for WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut (*self.msg) }
}
}
#[cfg(test)]
mod tests {
use super::generated_msgs::*;
use super::*;
use rcl::*;
#[test]
fn test_ros_str() -> () {
let hej = "hej hopp";
let mut msg = WrappedNativeMsg::<std_msgs::msg::String>::new();
msg.data.assign(hej);
assert_eq!(msg.data.to_str(), hej);
}
#[test]
fn test_copy_fields() -> () {
let msg_orig = std_msgs::msg::String { data: "hej".into() };
let rosmsg = WrappedNativeMsg::<std_msgs::msg::String>::from(&msg_orig);
let msg2 = std_msgs::msg::String::from_native(&rosmsg);
assert_eq!(msg_orig, msg2);
}
#[test]
fn test_introspection_string() -> () {
unsafe {
use std::ffi::CStr;
let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__std_msgs__msg__String();
let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers;
println!("{:#?}", *members);
assert_eq!((*members).member_count_, 1);
let s = CStr::from_ptr((*members).message_namespace_)
.to_str()
.unwrap();
assert_eq!(s, "std_msgs__msg");
let s = CStr::from_ptr((*members).message_name_).to_str().unwrap();
assert_eq!(s, "String");
let member = (*members).members_;
println!("member: {:#?}", *member);
let field_name = CStr::from_ptr((*member).name_).to_str().unwrap();
let type_id = (*member).type_id_;
let is_array = (*member).is_array_;
assert_eq!(field_name, "data");
assert_eq!(
type_id,
rosidl_typesupport_introspection_c__ROS_TYPE_STRING as u8
);
assert_eq!(is_array, false);
}
}
#[test]
#[should_panic] // we are testing that we cannot have to many elements in a fixed sized field
fn test_fixedsizearray() -> () {
unsafe {
let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__geometry_msgs__msg__AccelWithCovariance();
let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers;
println!("{:#?}", *members);
let memberslice =
std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize);
for member in memberslice {
println!("member: {:#?}", *member);
}
let msg_native = WrappedNativeMsg::<geometry_msgs::msg::AccelWithCovariance>::new();
let mut msg = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native);
println!("{:#?}", msg);
msg.covariance[0] = 10.0;
msg.covariance[10] = 10.0;
msg.covariance[35] = 99.0;
msg.covariance.push(4444.0);
let msg_native2 =
WrappedNativeMsg::<geometry_msgs::msg::AccelWithCovariance>::from(&msg);
let msg2 = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native2);
println!("{:#?}", msg2);
}
}
#[test]
#[should_panic] // we are testing that we cannot have to many elements in a fixed sized field
fn test_capped_sequence() -> () {
// float64[<=3] dimensions in the .msg translates to a float64 sequence AND an array size field. handle it...
unsafe {
let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__shape_msgs__msg__SolidPrimitive();
let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers;
println!("{:#?}", *members);
let memberslice =
std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize);
for member in memberslice {
println!("member: {:#?}", *member);
}
let msg_native = WrappedNativeMsg::<shape_msgs::msg::SolidPrimitive>::new();
let mut msg = shape_msgs::msg::SolidPrimitive::from_native(&msg_native);
println!("{:#?}", msg);
msg.dimensions.push(1.0);
msg.dimensions.push(1.0);
msg.dimensions.push(1.0);
msg.dimensions.push(1.0); // only three elements allowed
let _msg_native2 = WrappedNativeMsg::<shape_msgs::msg::SolidPrimitive>::from(&msg);
}
}
#[test]
fn test_generation_string_use() -> () {
let msg = std_msgs::msg::String { data: "hej".into() };
let msg2 = msg.clone();
let msg_native = WrappedNativeMsg::<std_msgs::msg::String>::from(&msg2);
let msg2 = std_msgs::msg::String::from_native(&msg_native);
assert_eq!(msg, msg2)
}
#[test]
fn test_generation_bool_use() -> () {
let msg = std_msgs::msg::Bool { data: true };
let msg_native = WrappedNativeMsg::<std_msgs::msg::Bool>::from(&msg);
let msg2 = std_msgs::msg::Bool::from_native(&msg_native);
assert_eq!(msg, msg2);
}
#[test]
fn test_float_sequence() -> () {
use trajectory_msgs::msg::*;
let native = WrappedNativeMsg::<JointTrajectoryPoint>::new();
let mut msg = JointTrajectoryPoint::from_native(&native);
msg.positions.push(39.0);
msg.positions.push(34.0);
let new_native = WrappedNativeMsg::<JointTrajectoryPoint>::from(&msg);
let new_msg = JointTrajectoryPoint::from_native(&new_native);
println!("{:#?}", new_msg);
assert_eq!(msg, new_msg);
}
#[test]
fn test_deault() -> () {
use trajectory_msgs::msg::*;
let mut msg: JointTrajectoryPoint = Default::default();
msg.positions.push(39.0);
msg.positions.push(34.0);
let mut new_native = WrappedNativeMsg::<JointTrajectoryPoint>::from(&msg);
unsafe { *((*new_native).positions.data) = 88.9 };
let new_msg = JointTrajectoryPoint::from_native(&new_native);
println!("{:#?}", new_msg);
assert_ne!(msg, new_msg);
}
#[test]
fn test_untyped_json() -> () {
let mut msg = trajectory_msgs::msg::JointTrajectoryPoint::default();
msg.positions.push(39.0);
msg.positions.push(34.0);
let json = serde_json::to_value(msg.clone()).unwrap();
let mut native =
WrappedNativeMsgUntyped::new_from("trajectory_msgs/msg/JointTrajectoryPoint").unwrap();
native.from_json(json.clone()).unwrap();
let json2 = native.to_json().unwrap();
assert_eq!(json, json2);
let msg2: trajectory_msgs::msg::JointTrajectoryPoint =
serde_json::from_value(json2).unwrap();
assert_eq!(msg, msg2);
}
#[cfg(r2r__test_msgs__msg__Arrays)]
#[test]
fn test_test_msgs_array() -> () {
let mut msg = test_msgs::msg::Arrays::default();
println!("msg: {:?}", msg.string_values);
msg.string_values = vec!["hej".to_string(), "hopp".to_string(), "stropp".to_string()];
let msg_native = WrappedNativeMsg::<test_msgs::msg::Arrays>::from(&msg);
let msg2 = test_msgs::msg::Arrays::from_native(&msg_native);
assert_eq!(msg, msg2);
}
#[cfg(r2r__test_msgs__msg__Arrays)]
#[test]
#[should_panic]
fn test_test_msgs_array_too_few_elems() -> () {
let mut msg = test_msgs::msg::Arrays::default();
println!("msg: {:?}", msg.string_values);
msg.string_values = vec!["hej".to_string(), "hopp".to_string()];
let _msg_native = WrappedNativeMsg::<test_msgs::msg::Arrays>::from(&msg);
}
#[cfg(r2r__test_msgs__msg__WStrings)]
#[test]
fn test_test_msgs_wstring() -> () {
let mut msg = test_msgs::msg::WStrings::default();
let rust_str = "ハローワールド";
msg.wstring_value = rust_str.to_string();
let native = WrappedNativeMsg::<test_msgs::msg::WStrings>::from(&msg);
println!("msg: {:?}", msg);
let msg2 = test_msgs::msg::WStrings::from_native(&native);
assert_eq!(msg.wstring_value, msg2.wstring_value);
}
#[cfg(r2r__example_interfaces__srv__AddTwoInts)]
#[test]
fn test_service_msgs() {
use example_interfaces::srv::AddTwoInts;
let mut req = AddTwoInts::Request::default();
req.a = 5;
let rn = WrappedNativeMsg::<_>::from(&req);
let req2 = AddTwoInts::Request::from_native(&rn);
println!("req2 {:?}", req2);
assert_eq!(req, req2);
let mut resp = AddTwoInts::Response::default();
resp.sum = 5;
let rn = WrappedNativeMsg::<_>::from(&resp);
let resp2 = AddTwoInts::Response::from_native(&rn);
println!("resp {:?}", resp2);
assert_eq!(resp, resp2);
}
#[cfg(r2r__example_interfaces__action__Fibonacci)]
#[test]
fn test_action_msgs() {
use example_interfaces::action::Fibonacci;
let mut goal = Fibonacci::Goal::default();
goal.order = 5;
let gn = WrappedNativeMsg::<_>::from(&goal);
let goal2 = Fibonacci::Goal::from_native(&gn);
println!("goal2 {:?}", goal2);
assert_eq!(goal, goal2);
let mut res = Fibonacci::Result::default();
res.sequence = vec![1, 2, 3];
let rn = WrappedNativeMsg::<_>::from(&res);
let res2 = Fibonacci::Result::from_native(&rn);
println!("res2 {:?}", res2);
assert_eq!(res, res2);
let mut fb = Fibonacci::Feedback::default();
fb.sequence = vec![4, 3, 6];
let fbn = WrappedNativeMsg::<_>::from(&fb);
let fb2 = Fibonacci::Feedback::from_native(&fbn);
println!("feedback2 {:?}", fb2);
assert_eq!(fb, fb2);
let fb = WrappedNativeMsg::<Fibonacci::Feedback>::new();
let fb1 = Fibonacci::Feedback::default();
let fb2 = Fibonacci::Feedback::from_native(&fb);
assert_eq!(fb1, fb2);
}
}

72
src/parameters.rs Normal file
View File

@ -0,0 +1,72 @@
use super::*;
#[derive(Debug, PartialEq, Clone)]
pub enum ParameterValue {
NotSet,
Bool(bool),
Integer(i64),
Double(f64),
String(String),
BoolArray(Vec<bool>),
ByteArray(Vec<u8>),
IntegerArray(Vec<i64>),
DoubleArray(Vec<f64>),
StringArray(Vec<String>),
}
pub fn parameter_value_from_rcl(v: &rcl_variant_t) -> ParameterValue {
if v.bool_value != std::ptr::null_mut() {
ParameterValue::Bool(unsafe { *v.bool_value })
} else if v.integer_value != std::ptr::null_mut() {
ParameterValue::Integer(unsafe { *v.integer_value })
} else if v.double_value != std::ptr::null_mut() {
ParameterValue::Double(unsafe { *v.double_value })
} else if v.string_value != std::ptr::null_mut() {
let s = unsafe { CStr::from_ptr(v.string_value) };
let string = s.to_str().unwrap_or("").to_owned();
ParameterValue::String(string)
} else if v.byte_array_value != std::ptr::null_mut() {
let vals = unsafe {
std::slice::from_raw_parts((*v.byte_array_value).values, (*v.byte_array_value).size)
};
ParameterValue::ByteArray(vals.iter().cloned().collect())
} else if v.bool_array_value != std::ptr::null_mut() {
let vals = unsafe {
std::slice::from_raw_parts((*v.bool_array_value).values, (*v.bool_array_value).size)
};
ParameterValue::BoolArray(vals.iter().cloned().collect())
} else if v.integer_array_value != std::ptr::null_mut() {
let vals = unsafe {
std::slice::from_raw_parts(
(*v.integer_array_value).values,
(*v.integer_array_value).size,
)
};
ParameterValue::IntegerArray(vals.iter().cloned().collect())
} else if v.double_array_value != std::ptr::null_mut() {
let vals = unsafe {
std::slice::from_raw_parts(
(*v.double_array_value).values,
(*v.double_array_value).size,
)
};
ParameterValue::DoubleArray(vals.iter().cloned().collect())
} else if v.string_array_value != std::ptr::null_mut() {
let vals = unsafe {
std::slice::from_raw_parts(
(*v.string_array_value).data,
(*v.string_array_value).size,
)
};
let s = vals
.iter()
.map(|cs| {
let s = unsafe { CStr::from_ptr(*cs) };
s.to_str().unwrap_or("").to_owned()
})
.collect();
ParameterValue::StringArray(s)
} else {
ParameterValue::NotSet
}
}

44
src/publishers.rs Normal file
View File

@ -0,0 +1,44 @@
use super::*;
// The publish function is thread safe. ROS2 docs state:
// =============
//
// This function is thread safe so long as access to both the
// publisher and the" `ros_message` is synchronized." That means that
// calling rcl_publish() from multiple threads is allowed, but"
// calling rcl_publish() at the same time as non-thread safe
// publisher" functions is not, e.g. calling rcl_publish() and
// rcl_publisher_fini()" concurrently is not allowed." Before calling
// rcl_publish() the message can change and after calling"
// rcl_publish() the message can change, but it cannot be changed
// during the" publish call." The same `ros_message`, however, can be
// passed to multiple calls of" rcl_publish() simultaneously, even if
// the publishers differ." The `ros_message` is unmodified by
// rcl_publish()."
//
// TODO: I guess there is a potential error source in destructuring
// while calling publish. I don't think its worth to protect with a
// mutex/rwlock for this though...
//
// Methods that mutate need to called from the thread owning the Node.
// I don't think we can count on Node being generally thread-safe.
// So keep pub/sub management and polling contained to one thread
// and send out publishers.
unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
#[derive(Debug, Clone)]
pub struct Publisher<T>
where
T: WrappedTypesupport,
{
pub handle: Weak<rcl_publisher_t>,
pub type_: PhantomData<T>,
}
unsafe impl Send for PublisherUntyped {}
#[derive(Debug, Clone)]
pub struct PublisherUntyped {
pub handle: Weak<rcl_publisher_t>,
pub type_: String,
}

91
src/services.rs Normal file
View File

@ -0,0 +1,91 @@
use super::*;
pub trait Service {
fn handle(&self) -> &rcl_service_t;
fn send_completed_responses(&mut self) -> ();
fn handle_request(&mut self) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
pub struct TypedService<T>
where
T: WrappedServiceTypeSupport,
{
pub rcl_handle: rcl_service_t,
pub sender: mpsc::Sender<ServiceRequest<T>>,
pub outstanding_requests: Vec<oneshot::Receiver<(rmw_request_id_t, T::Response)>>,
}
impl<T: 'static> Service for TypedService<T>
where
T: WrappedServiceTypeSupport,
{
fn handle(&self) -> &rcl_service_t {
&self.rcl_handle
}
fn send_completed_responses(&mut self) -> () {
let mut to_send = vec![];
self.outstanding_requests.retain_mut(|r| {
match r.try_recv() {
Ok(Some(resp)) => {
to_send.push(resp);
false // done with this.
}
Ok(None) => true, // keep message, waiting for service
Err(_) => false, // channel canceled
}
});
for (mut req_id, msg) in to_send {
let mut native_response = WrappedNativeMsg::<T::Response>::from(&msg);
let res = unsafe {
rcl_send_response(
&self.rcl_handle,
&mut req_id,
native_response.void_ptr_mut(),
)
};
// TODO
if res != RCL_RET_OK as i32 {
eprintln!("could not send service response {}", res);
}
}
}
fn handle_request(&mut self) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<T::Request>::new();
let ret = unsafe {
rcl_take_request(
&self.rcl_handle,
request_id.as_mut_ptr(),
request_msg.void_ptr_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
let request_msg = T::Request::from_native(&request_msg);
let (response_sender, response_receiver) =
oneshot::channel::<(rmw_request_id_t, T::Response)>();
self.outstanding_requests.push(response_receiver);
let request = ServiceRequest::<T> {
message: request_msg,
request_id,
response_sender,
};
match self.sender.try_send(request) {
Err(e) => eprintln!("warning: could not send service request ({})", e),
_ => (),
}
} // TODO handle failure.
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_service_fini(&mut self.rcl_handle, node);
}
}
}

131
src/subscribers.rs Normal file
View File

@ -0,0 +1,131 @@
use super::*;
pub trait Subscriber {
fn handle(&self) -> &rcl_subscription_t;
fn handle_incoming(&mut self) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
pub struct TypedSubscriber<T>
where
T: WrappedTypesupport,
{
pub rcl_handle: rcl_subscription_t,
pub sender: mpsc::Sender<T>,
}
pub struct NativeSubscriber<T>
where
T: WrappedTypesupport,
{
pub rcl_handle: rcl_subscription_t,
pub sender: mpsc::Sender<WrappedNativeMsg<T>>,
}
pub struct UntypedSubscriber {
pub rcl_handle: rcl_subscription_t,
pub topic_type: String,
pub sender: mpsc::Sender<Result<serde_json::Value>>,
}
impl<T: 'static> Subscriber for TypedSubscriber<T>
where
T: WrappedTypesupport,
{
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}
fn handle_incoming(&mut self) -> () {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(
&self.rcl_handle,
msg.void_ptr_mut(),
&mut msg_info,
std::ptr::null_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let msg = T::from_native(&msg);
match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
}
}
}
impl<T: 'static> Subscriber for NativeSubscriber<T>
where
T: WrappedTypesupport,
{
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}
fn handle_incoming(&mut self) -> () {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(
&self.rcl_handle,
msg.void_ptr_mut(),
&mut msg_info,
std::ptr::null_mut(),
)
};
if ret == RCL_RET_OK as i32 {
match self.sender.try_send(msg) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
}
}
}
impl Subscriber for UntypedSubscriber {
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}
fn handle_incoming(&mut self) -> () {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
.expect(&format!("no typesupport for {}", self.topic_type));
let ret = unsafe {
rcl_take(
&self.rcl_handle,
msg.void_ptr_mut(),
&mut msg_info,
std::ptr::null_mut(),
)
};
if ret == RCL_RET_OK as i32 {
let json = msg.to_json();
match self.sender.try_send(json) {
Err(e) => println!("error {:?}", e),
_ => (),
}
}
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
}
}
}