Add loaned message support to R2R

Add loaned message support in R2R's subscriber to
take advantage of the shared memory backend of the middleware.

- Add new constructor to `WrappedNativeMsg` which
  takes a already allocated pointer, and a deallocator.
- If the subscription can loan messages, construct
  `WrappedNativeMsg` with the loaned message pointer.
- Pass in also a closure  as the deallocator that calls
  the right APIs to return the loaned message to the middleware layer.
- The deallocator is called in the destructor of `WrappedNativeMsg`
- Add borrow_loaned_message API to publisher
- Add release() api to WrappedNativeMsg so a loaded message
  could be manually released if not published.
This commit is contained in:
Liyou Zhou 2023-03-17 12:32:17 +00:00 committed by Martin Dahl
parent 88c16ba598
commit c092123e82
4 changed files with 168 additions and 27 deletions

View File

@ -26,7 +26,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
native.data = count;
publisher.publish(&to_send).unwrap();
publisher2.publish_native(&native).unwrap();
publisher2.publish_native(&mut native).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
count += 1;

View File

@ -4,6 +4,7 @@ use r2r_rcl::{
rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t,
};
use serde::{Deserialize, Serialize};
use std::boxed::Box;
use std::convert::TryInto;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
@ -107,12 +108,13 @@ pub trait WrappedActionTypeSupport: Debug + Clone {
/// This struct wraps a RCL message.
///
/// It contains a pointer to a C struct.
#[derive(Debug)]
pub struct WrappedNativeMsg<T>
where
T: WrappedTypesupport,
{
pub msg: *mut T::CStruct,
pub is_loaned: bool,
deallocator: Option<Box<dyn FnOnce(*mut T::CStruct)>>,
}
pub trait VoidPtr {
@ -347,11 +349,13 @@ impl Drop for WrappedNativeMsgUntyped {
impl<T> WrappedNativeMsg<T>
where
T: WrappedTypesupport,
T: WrappedTypesupport + 'static,
{
pub fn new() -> Self {
WrappedNativeMsg {
msg: T::create_msg(),
deallocator: Some(Box::new(T::destroy_msg)),
is_loaned: false,
}
}
@ -360,11 +364,26 @@ where
msg.copy_to_native(&mut native_msg);
native_msg
}
pub fn from_loaned(
msg: *mut T::CStruct,
deallocator: Box<dyn FnOnce(*mut <T as WrappedTypesupport>::CStruct)>,
) -> Self {
WrappedNativeMsg {
msg,
deallocator: Some(deallocator),
is_loaned: true,
}
}
pub fn release(&mut self) {
self.deallocator.take();
}
}
impl<T> Default for WrappedNativeMsg<T>
where
T: WrappedTypesupport,
T: WrappedTypesupport + 'static,
{
fn default() -> Self {
Self::new()
@ -389,7 +408,9 @@ where
T: WrappedTypesupport,
{
fn drop(&mut self) {
T::destroy_msg(self.msg);
if let Some(deallocator) = self.deallocator.take() {
(deallocator)(self.msg);
}
}
}
@ -580,6 +601,21 @@ mod tests {
assert_eq!(msg, msg2);
}
#[test]
fn test_from_loaned() {
type MsgType = trajectory_msgs::msg::JointTrajectoryPoint;
type CMsgType = <MsgType as WrappedTypesupport>::CStruct;
let borrowed_msg = MsgType::create_msg();
let native = WrappedNativeMsg::<MsgType>::from_loaned(
borrowed_msg as *mut CMsgType,
Box::new(|_: *mut CMsgType| {}),
);
assert!(native.void_ptr() == borrowed_msg as *mut core::ffi::c_void);
}
#[cfg(r2r__test_msgs__msg__Defaults)]
#[test]
fn test_untyped_json_default() {

View File

@ -1,3 +1,4 @@
use std::ffi::c_void;
use std::ffi::CString;
use std::fmt::Debug;
use std::marker::PhantomData;
@ -163,11 +164,7 @@ where
}
}
/// Publish a "native" ROS message.
///
/// This function is useful if you want to bypass the generated
/// rust types as it lets you work with the raw C struct.
pub fn publish_native(&self, msg: &WrappedNativeMsg<T>) -> Result<()>
pub fn borrow_loaned_message(&self) -> Result<WrappedNativeMsg<T>>
where
T: WrappedTypesupport,
{
@ -177,8 +174,76 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let result =
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) };
if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } {
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
let ret = unsafe {
rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg)
};
if ret != RCL_RET_OK as i32 {
// TODO: Switch to logging library
eprintln!("Failed getting loaned message");
return Err(Error::from_rcl_error(ret))
}
let handle_box = Box::new(*publisher.as_ref());
let msg = WrappedNativeMsg::<T>::from_loaned(
loaned_msg as *mut T::CStruct,
Box::new(|msg: *mut T::CStruct| {
let ret = unsafe {
let handle_ptr = Box::into_raw(handle_box);
let ret = rcl_return_loaned_message_from_publisher(
handle_ptr,
msg as *mut c_void,
);
drop(Box::from_raw(handle_ptr));
ret
};
if ret != RCL_RET_OK as i32 {
panic!("rcl_deallocate_loaned_message failed");
}
}),
);
Ok(msg)
} else {
// TODO: Switch to logging library
eprintln!(
"Currently used middleware can't loan messages. Local allocator will be used."
);
Ok(WrappedNativeMsg::<T>::new())
}
}
/// Publish a "native" ROS message.
///
/// This function is useful if you want to bypass the generated
/// rust types as it lets you work with the raw C struct.
pub fn publish_native(&self, msg: &mut WrappedNativeMsg<T>) -> Result<()>
where
T: WrappedTypesupport,
{
// upgrade to actual ref. if still alive
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let result = if msg.is_loaned {
unsafe {
// signal that we are relinquishing responsibility of the memory
msg.release();
// publish and return loaned message to middleware
rcl_publish_loaned_message(
publisher.as_ref(),
msg.void_ptr_mut(),
std::ptr::null_mut(),
)
}
} else {
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }
};
if result == RCL_RET_OK as i32 {
Ok(())
} else {

View File

@ -5,6 +5,8 @@ use crate::error::*;
use crate::msg_types::*;
use crate::qos::QosProfile;
use r2r_rcl::*;
use std::ffi::c_void;
use std::ffi::CStr;
pub trait Subscriber_ {
fn handle(&self) -> &rcl_subscription_t;
@ -84,23 +86,61 @@ where
fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg = WrappedNativeMsg::<T>::new();
let ret = unsafe {
rcl_take(
&self.rcl_handle,
msg.void_ptr_mut(),
&mut msg_info,
std::ptr::null_mut(),
)
};
if ret == RCL_RET_OK as i32 {
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
let msg = unsafe {
if rcl_subscription_can_loan_messages(&self.rcl_handle) {
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
let ret = rcl_take_loaned_message(
&self.rcl_handle,
&mut loaned_msg,
&mut msg_info,
std::ptr::null_mut(),
);
if ret != RCL_RET_OK as i32 {
return false;
}
println!("error {:?}", e)
let handle_box = Box::new(self.rcl_handle);
let deallocator = Box::new(|msg: *mut T::CStruct| {
let handle_ptr = Box::into_raw(handle_box);
let ret =
rcl_return_loaned_message_from_subscription(handle_ptr, msg as *mut c_void);
drop(Box::from_raw(handle_ptr));
if ret != RCL_RET_OK as i32 {
let err_str = rcutils_get_error_string();
let err_str_ptr = &(err_str.str_) as *const std::os::raw::c_char;
let error_msg = CStr::from_ptr(err_str_ptr);
let topic_str = rcl_subscription_get_topic_name(handle_ptr);
let topic = CStr::from_ptr(topic_str);
panic!(
"rcl_return_loaned_message_from_subscription() \
failed for subscription on topic {}: {}",
topic.to_str().expect("to_str() call failed"),
error_msg.to_str().expect("to_str() call failed")
);
}
});
WrappedNativeMsg::<T>::from_loaned(loaned_msg as *mut T::CStruct, deallocator)
} else {
let mut new_msg = WrappedNativeMsg::<T>::new();
let ret = rcl_take(
&self.rcl_handle,
new_msg.void_ptr_mut(),
&mut msg_info,
std::ptr::null_mut(),
);
if ret != RCL_RET_OK as i32 {
return false;
}
new_msg
}
};
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
eprintln!("error {:?}", e)
}
false
}