diff --git a/r2r/examples/publishers.rs b/r2r/examples/publishers.rs index 1af8540..89d8a34 100644 --- a/r2r/examples/publishers.rs +++ b/r2r/examples/publishers.rs @@ -26,7 +26,7 @@ fn main() -> Result<(), Box> { native.data = count; publisher.publish(&to_send).unwrap(); - publisher2.publish_native(&native).unwrap(); + publisher2.publish_native(&mut native).unwrap(); std::thread::sleep(std::time::Duration::from_millis(100)); count += 1; diff --git a/r2r/src/msg_types.rs b/r2r/src/msg_types.rs index 24e18e5..96d2141 100644 --- a/r2r/src/msg_types.rs +++ b/r2r/src/msg_types.rs @@ -4,6 +4,7 @@ use r2r_rcl::{ rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t, }; use serde::{Deserialize, Serialize}; +use std::boxed::Box; use std::convert::TryInto; use std::fmt::Debug; use std::ops::{Deref, DerefMut}; @@ -107,12 +108,13 @@ pub trait WrappedActionTypeSupport: Debug + Clone { /// This struct wraps a RCL message. /// /// It contains a pointer to a C struct. -#[derive(Debug)] pub struct WrappedNativeMsg where T: WrappedTypesupport, { pub msg: *mut T::CStruct, + pub is_loaned: bool, + deallocator: Option>, } pub trait VoidPtr { @@ -347,11 +349,13 @@ impl Drop for WrappedNativeMsgUntyped { impl WrappedNativeMsg where - T: WrappedTypesupport, + T: WrappedTypesupport + 'static, { pub fn new() -> Self { WrappedNativeMsg { msg: T::create_msg(), + deallocator: Some(Box::new(T::destroy_msg)), + is_loaned: false, } } @@ -360,11 +364,26 @@ where msg.copy_to_native(&mut native_msg); native_msg } + + pub fn from_loaned( + msg: *mut T::CStruct, + deallocator: Box::CStruct)>, + ) -> Self { + WrappedNativeMsg { + msg, + deallocator: Some(deallocator), + is_loaned: true, + } + } + + pub fn release(&mut self) { + self.deallocator.take(); + } } impl Default for WrappedNativeMsg where - T: WrappedTypesupport, + T: WrappedTypesupport + 'static, { fn default() -> Self { Self::new() @@ -389,7 +408,9 @@ where T: WrappedTypesupport, { fn drop(&mut self) { - T::destroy_msg(self.msg); + if let Some(deallocator) = self.deallocator.take() { + (deallocator)(self.msg); + } } } @@ -580,6 +601,21 @@ mod tests { assert_eq!(msg, msg2); } + #[test] + fn test_from_loaned() { + type MsgType = trajectory_msgs::msg::JointTrajectoryPoint; + type CMsgType = ::CStruct; + + let borrowed_msg = MsgType::create_msg(); + + let native = WrappedNativeMsg::::from_loaned( + borrowed_msg as *mut CMsgType, + Box::new(|_: *mut CMsgType| {}), + ); + + assert!(native.void_ptr() == borrowed_msg as *mut core::ffi::c_void); + } + #[cfg(r2r__test_msgs__msg__Defaults)] #[test] fn test_untyped_json_default() { diff --git a/r2r/src/publishers.rs b/r2r/src/publishers.rs index 979ed18..b0a399c 100644 --- a/r2r/src/publishers.rs +++ b/r2r/src/publishers.rs @@ -1,3 +1,4 @@ +use std::ffi::c_void; use std::ffi::CString; use std::fmt::Debug; use std::marker::PhantomData; @@ -163,11 +164,7 @@ where } } - /// Publish a "native" ROS message. - /// - /// This function is useful if you want to bypass the generated - /// rust types as it lets you work with the raw C struct. - pub fn publish_native(&self, msg: &WrappedNativeMsg) -> Result<()> + pub fn borrow_loaned_message(&self) -> Result> where T: WrappedTypesupport, { @@ -177,8 +174,76 @@ where .upgrade() .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; - let result = - unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }; + if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } { + let mut loaned_msg: *mut c_void = std::ptr::null_mut(); + let ret = unsafe { + rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg) + }; + if ret != RCL_RET_OK as i32 { + // TODO: Switch to logging library + eprintln!("Failed getting loaned message"); + return Err(Error::from_rcl_error(ret)) + } + + let handle_box = Box::new(*publisher.as_ref()); + let msg = WrappedNativeMsg::::from_loaned( + loaned_msg as *mut T::CStruct, + Box::new(|msg: *mut T::CStruct| { + let ret = unsafe { + let handle_ptr = Box::into_raw(handle_box); + let ret = rcl_return_loaned_message_from_publisher( + handle_ptr, + msg as *mut c_void, + ); + drop(Box::from_raw(handle_ptr)); + ret + }; + + if ret != RCL_RET_OK as i32 { + panic!("rcl_deallocate_loaned_message failed"); + } + }), + ); + Ok(msg) + } else { + // TODO: Switch to logging library + eprintln!( + "Currently used middleware can't loan messages. Local allocator will be used." + ); + Ok(WrappedNativeMsg::::new()) + } + } + + /// Publish a "native" ROS message. + /// + /// This function is useful if you want to bypass the generated + /// rust types as it lets you work with the raw C struct. + pub fn publish_native(&self, msg: &mut WrappedNativeMsg) -> Result<()> + where + T: WrappedTypesupport, + { + // upgrade to actual ref. if still alive + let publisher = self + .handle + .upgrade() + .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; + + let result = if msg.is_loaned { + unsafe { + // signal that we are relinquishing responsibility of the memory + msg.release(); + + // publish and return loaned message to middleware + rcl_publish_loaned_message( + publisher.as_ref(), + msg.void_ptr_mut(), + std::ptr::null_mut(), + ) + } + } else { + unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) } + }; + if result == RCL_RET_OK as i32 { Ok(()) } else { diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index e0fe6ec..d672063 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -5,6 +5,8 @@ use crate::error::*; use crate::msg_types::*; use crate::qos::QosProfile; use r2r_rcl::*; +use std::ffi::c_void; +use std::ffi::CStr; pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; @@ -84,23 +86,61 @@ where fn handle_incoming(&mut self) -> bool { let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take( - &self.rcl_handle, - msg.void_ptr_mut(), - &mut msg_info, - std::ptr::null_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - if let Err(e) = self.sender.try_send(msg) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; + let msg = unsafe { + if rcl_subscription_can_loan_messages(&self.rcl_handle) { + let mut loaned_msg: *mut c_void = std::ptr::null_mut(); + let ret = rcl_take_loaned_message( + &self.rcl_handle, + &mut loaned_msg, + &mut msg_info, + std::ptr::null_mut(), + ); + if ret != RCL_RET_OK as i32 { + return false; } - println!("error {:?}", e) + let handle_box = Box::new(self.rcl_handle); + let deallocator = Box::new(|msg: *mut T::CStruct| { + let handle_ptr = Box::into_raw(handle_box); + let ret = + rcl_return_loaned_message_from_subscription(handle_ptr, msg as *mut c_void); + drop(Box::from_raw(handle_ptr)); + if ret != RCL_RET_OK as i32 { + let err_str = rcutils_get_error_string(); + let err_str_ptr = &(err_str.str_) as *const std::os::raw::c_char; + let error_msg = CStr::from_ptr(err_str_ptr); + + let topic_str = rcl_subscription_get_topic_name(handle_ptr); + let topic = CStr::from_ptr(topic_str); + + panic!( + "rcl_return_loaned_message_from_subscription() \ + failed for subscription on topic {}: {}", + topic.to_str().expect("to_str() call failed"), + error_msg.to_str().expect("to_str() call failed") + ); + } + }); + WrappedNativeMsg::::from_loaned(loaned_msg as *mut T::CStruct, deallocator) + } else { + let mut new_msg = WrappedNativeMsg::::new(); + let ret = rcl_take( + &self.rcl_handle, + new_msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ); + if ret != RCL_RET_OK as i32 { + return false; + } + new_msg } + }; + if let Err(e) = self.sender.try_send(msg) { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + eprintln!("error {:?}", e) } false }