Reuse the msg buffer to reduce allocations. Added example. cargo fmt

This commit is contained in:
Martin Dahl 2023-12-13 11:27:52 +01:00
parent 21a6551d81
commit 4c38bce214
5 changed files with 100 additions and 89 deletions

View File

@ -31,8 +31,9 @@ phf = "0.11.1"
[dev-dependencies] [dev-dependencies]
serde_json = "1.0.89" serde_json = "1.0.89"
futures = "0.3.25" futures = "0.3.25"
tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros"] } tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] }
rand = "0.8.5" rand = "0.8.5"
cdr = "0.2.4"
[build-dependencies] [build-dependencies]
r2r_common = { path = "../r2r_common", version = "0.8.2" } r2r_common = { path = "../r2r_common", version = "0.8.2" }

View File

@ -0,0 +1,52 @@
use futures::future;
use futures::stream::StreamExt;
use r2r::QosProfile;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic", QosProfile::default())?;
let sub = node.subscribe_raw("/topic", "std_msgs/msg/String", QosProfile::default())?;
let pub_task = tokio::task::spawn(async move {
for x in 5..50 {
// Send a string with varying length.
let _ = p.publish(&r2r::std_msgs::msg::String {
data: format!("Hello{:>width$}", "World", width = x),
});
let _ = tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
});
tokio::task::spawn_blocking(move || {
for _ in 0..500 {
node.spin_once(std::time::Duration::from_millis(10));
}
});
// Demonstrate that we can deserialize the raw bytes into this
// rust struct using the cdr crate.
#[derive(Deserialize, Serialize, PartialEq, Debug)]
struct OurOwnStdString {
data: String, // the field name can be anything...
}
sub.for_each(|msg| {
println!("got raw bytes of length {}.", msg.len());
if let Ok(data) = cdr::deserialize::<OurOwnStdString>(&msg) {
println!("contents: {:?}", data);
} else {
println!("Warning: cannot deserialize data.");
}
future::ready(())
})
.await;
pub_task.await?;
Ok(())
}

View File

@ -551,14 +551,35 @@ impl Node {
// TODO is it possible to handle the raw message without type support? // TODO is it possible to handle the raw message without type support?
// //
// Passing null ts to rcl_subscription_init throws an error .. // Passing null ts to rcl_subscription_init throws an error ..
//
// It does not seem possible to not have a type support, which is a shame
// because it means we always have to build the message types even if we
// are just after the raw bytes.
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
// Keep a buffer to reduce number of allocations. The rmw will
// resize it if the message size exceeds the buffer size.
let mut msg_buf: rcl_serialized_message_t =
unsafe { rcutils_get_zero_initialized_uint8_array() };
let ret = unsafe {
rcutils_uint8_array_init(
&mut msg_buf as *mut rcl_serialized_message_t,
0,
&rcutils_get_default_allocator(),
)
};
if ret != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(ret));
}
let subscription_handle = let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10); let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
let ws = RawSubscriber { let ws = RawSubscriber {
rcl_handle: subscription_handle, rcl_handle: subscription_handle,
msg_buf,
sender, sender,
}; };
self.subscribers.push(Box::new(ws)); self.subscribers.push(Box::new(ws));

View File

@ -39,10 +39,10 @@ pub struct UntypedSubscriber {
pub struct RawSubscriber { pub struct RawSubscriber {
pub rcl_handle: rcl_subscription_t, pub rcl_handle: rcl_subscription_t,
pub msg_buf: rcl_serialized_message_t,
pub sender: mpsc::Sender<Vec<u8>>, pub sender: mpsc::Sender<Vec<u8>>,
} }
impl<T: 'static> Subscriber_ for TypedSubscriber<T> impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where where
T: WrappedTypesupport, T: WrappedTypesupport,
@ -191,81 +191,24 @@ impl Subscriber_ for RawSubscriber {
} }
fn handle_incoming(&mut self) -> bool { fn handle_incoming(&mut self) -> bool {
// This code is based on:
//
// https://github.com/josephduchesne/rclpy/blob/502e2135498460dd4c74cf3a6fa543590364a1fe/rclpy/src/rclpy/_rclpy.c#L2612-L2649
let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let mut msg: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() };
let allocator: rcutils_allocator_t = unsafe { rcutils_get_default_allocator() };
let ret: rcl_ret_t = unsafe {
rcutils_uint8_array_init(&mut msg as *mut rcl_serialized_message_t, 0, &allocator)
};
if ret != RCL_RET_OK as i32 {
log::error!("Failed to initialize message: {:?}", unsafe { rcutils_get_error_string().str_ });
unsafe { rcutils_reset_error() };
let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};
if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");
}
return false;
}
let ret = unsafe { let ret = unsafe {
rcl_take_serialized_message( rcl_take_serialized_message(
&self.rcl_handle, &self.rcl_handle,
&mut msg as *mut rcl_serialized_message_t, &mut self.msg_buf as *mut rcl_serialized_message_t,
&mut msg_info, &mut msg_info,
std::ptr::null_mut()) std::ptr::null_mut(),
)
}; };
if ret != RCL_RET_OK as i32 { if ret != RCL_RET_OK as i32 {
log::error!( log::error!("failed to take serialized message");
"Failed to take_serialized from a subscription: {:?}",
unsafe { rcutils_get_error_string().str_ });
// rcl_reset_error();
unsafe { rcutils_reset_error() };
let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};
if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");
}
return false;
}
// TODO put rcutils_uint8_array_fini in a message drop guard?
//
// Or is is safe to deallocate with Vec::drop instead of rcutils_uint8_array_fini?
// let data_bytes = unsafe {
// Vec::from_raw_parts(msg.buffer, msg.buffer_length, msg.buffer_capacity)
// };
let data_bytes = unsafe {
std::slice::from_raw_parts(msg.buffer, msg.buffer_length).to_vec()
};
let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};
if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");
return false; return false;
} }
let data_bytes = unsafe {
std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec()
};
if let Err(e) = self.sender.try_send(data_bytes) { if let Err(e) = self.sender.try_send(data_bytes) {
if e.is_disconnected() { if e.is_disconnected() {
// user dropped the handle to the stream, signal removal. // user dropped the handle to the stream, signal removal.
@ -273,13 +216,14 @@ impl Subscriber_ for RawSubscriber {
} }
log::debug!("error {:?}", e) log::debug!("error {:?}", e)
} }
false false
} }
fn destroy(&mut self, node: &mut rcl_node_t) { fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe { unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node); rcl_subscription_fini(&mut self.rcl_handle, node);
rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
} }
} }
} }
@ -308,4 +252,3 @@ pub fn create_subscription_helper(
Err(Error::from_rcl_error(result)) Err(Error::from_rcl_error(result))
} }
} }

View File

@ -62,14 +62,12 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>> { async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?; let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?; let mut node = r2r::Node::create(ctx, "testnode", "")?;
let mut sub_int = let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?;
node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?;
let mut sub_array = let mut sub_array =
node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?;
@ -78,9 +76,10 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
node.create_publisher::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default())?; node.create_publisher::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default())?;
// Use an array as well since its a variable sized type // Use an array as well since its a variable sized type
let pub_array = let pub_array = node.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>(
node.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>("/int_array", QosProfile::default())?; "/int_array",
QosProfile::default(),
)?;
task::spawn(async move { task::spawn(async move {
(0..10).for_each(|i| { (0..10).for_each(|i| {
@ -88,15 +87,15 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
.publish(&r2r::std_msgs::msg::Int32 { data: i }) .publish(&r2r::std_msgs::msg::Int32 { data: i })
.unwrap(); .unwrap();
pub_array.publish(&r2r::std_msgs::msg::Int32MultiArray { pub_array
layout: r2r::std_msgs::msg::MultiArrayLayout::default(), .publish(&r2r::std_msgs::msg::Int32MultiArray {
data: vec![i] layout: r2r::std_msgs::msg::MultiArrayLayout::default(),
}) data: vec![i],
.unwrap(); })
.unwrap();
}); });
}); });
let sub_int_handle = task::spawn(async move { let sub_int_handle = task::spawn(async move {
while let Some(msg) = sub_int.next().await { while let Some(msg) = sub_int.next().await {
println!("Got int msg of len {}", msg.len()); println!("Got int msg of len {}", msg.len());
@ -104,9 +103,8 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
// assert_eq!(msg.len(), 4); // assert_eq!(msg.len(), 4);
// TODO is there padding or something? // TODO is there padding or something?
assert_eq!(msg.len(), 8); assert_eq!(msg.len(), 8);
// assert_eq!(msg,) // assert_eq!(msg,)
} }
panic!("int msg finished"); panic!("int msg finished");
@ -114,10 +112,8 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
let sub_array_handle = task::spawn(async move { let sub_array_handle = task::spawn(async move {
while let Some(msg) = sub_array.next().await { while let Some(msg) = sub_array.next().await {
println!("Got array msg of len {}", msg.len()); println!("Got array msg of len {}", msg.len());
// assert_eq!(msg.data, ) // assert_eq!(msg.data, )
} }
panic!("array msg finished"); panic!("array msg finished");
@ -126,9 +122,7 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
let handle = std::thread::spawn(move || { let handle = std::thread::spawn(move || {
for _ in 1..=30 { for _ in 1..=30 {
node.spin_once(std::time::Duration::from_millis(100)); node.spin_once(std::time::Duration::from_millis(100));
} }
}); });
handle.join().unwrap(); handle.join().unwrap();