diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index 6abf78b..39baf8b 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -31,8 +31,9 @@ phf = "0.11.1" [dev-dependencies] serde_json = "1.0.89" futures = "0.3.25" -tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] } rand = "0.8.5" +cdr = "0.2.4" [build-dependencies] r2r_common = { path = "../r2r_common", version = "0.8.2" } diff --git a/r2r/examples/tokio_raw_subscriber.rs b/r2r/examples/tokio_raw_subscriber.rs new file mode 100644 index 0000000..9f238b7 --- /dev/null +++ b/r2r/examples/tokio_raw_subscriber.rs @@ -0,0 +1,52 @@ +use futures::future; +use futures::stream::StreamExt; +use r2r::QosProfile; +use serde::{Deserialize, Serialize}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let p = node.create_publisher::("/topic", QosProfile::default())?; + let sub = node.subscribe_raw("/topic", "std_msgs/msg/String", QosProfile::default())?; + + let pub_task = tokio::task::spawn(async move { + for x in 5..50 { + // Send a string with varying length. + let _ = p.publish(&r2r::std_msgs::msg::String { + data: format!("Hello{:>width$}", "World", width = x), + }); + let _ = tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + }); + + tokio::task::spawn_blocking(move || { + for _ in 0..500 { + node.spin_once(std::time::Duration::from_millis(10)); + } + }); + + // Demonstrate that we can deserialize the raw bytes into this + // rust struct using the cdr crate. + #[derive(Deserialize, Serialize, PartialEq, Debug)] + struct OurOwnStdString { + data: String, // the field name can be anything... + } + sub.for_each(|msg| { + println!("got raw bytes of length {}.", msg.len()); + + if let Ok(data) = cdr::deserialize::(&msg) { + println!("contents: {:?}", data); + } else { + println!("Warning: cannot deserialize data."); + } + + future::ready(()) + }) + .await; + + pub_task.await?; + + Ok(()) +} diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 0e417b4..d7a78b2 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -551,14 +551,35 @@ impl Node { // TODO is it possible to handle the raw message without type support? // // Passing null ts to rcl_subscription_init throws an error .. + // + // It does not seem possible to not have a type support, which is a shame + // because it means we always have to build the message types even if we + // are just after the raw bytes. let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; + // Keep a buffer to reduce number of allocations. The rmw will + // resize it if the message size exceeds the buffer size. + let mut msg_buf: rcl_serialized_message_t = + unsafe { rcutils_get_zero_initialized_uint8_array() }; + let ret = unsafe { + rcutils_uint8_array_init( + &mut msg_buf as *mut rcl_serialized_message_t, + 0, + &rcutils_get_default_allocator(), + ) + }; + + if ret != RCL_RET_OK as i32 { + return Err(Error::from_rcl_error(ret)); + } + let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); let ws = RawSubscriber { rcl_handle: subscription_handle, + msg_buf, sender, }; self.subscribers.push(Box::new(ws)); diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 16258c0..ba49571 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -39,10 +39,10 @@ pub struct UntypedSubscriber { pub struct RawSubscriber { pub rcl_handle: rcl_subscription_t, + pub msg_buf: rcl_serialized_message_t, pub sender: mpsc::Sender>, } - impl Subscriber_ for TypedSubscriber where T: WrappedTypesupport, @@ -191,81 +191,24 @@ impl Subscriber_ for RawSubscriber { } fn handle_incoming(&mut self) -> bool { - - // This code is based on: - // - // https://github.com/josephduchesne/rclpy/blob/502e2135498460dd4c74cf3a6fa543590364a1fe/rclpy/src/rclpy/_rclpy.c#L2612-L2649 - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - - let mut msg: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() }; - let allocator: rcutils_allocator_t = unsafe { rcutils_get_default_allocator() }; - let ret: rcl_ret_t = unsafe { - rcutils_uint8_array_init(&mut msg as *mut rcl_serialized_message_t, 0, &allocator) - }; - - if ret != RCL_RET_OK as i32 { - log::error!("Failed to initialize message: {:?}", unsafe { rcutils_get_error_string().str_ }); - unsafe { rcutils_reset_error() }; - - let r_fini: rmw_ret_t = unsafe { - rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) - }; - - if r_fini != RMW_RET_OK as i32 { - log::error!("Failed to deallocate message buffer: {r_fini}"); - } - return false; - } - let ret = unsafe { rcl_take_serialized_message( &self.rcl_handle, - &mut msg as *mut rcl_serialized_message_t, - &mut msg_info, - std::ptr::null_mut()) + &mut self.msg_buf as *mut rcl_serialized_message_t, + &mut msg_info, + std::ptr::null_mut(), + ) }; if ret != RCL_RET_OK as i32 { - log::error!( - "Failed to take_serialized from a subscription: {:?}", - unsafe { rcutils_get_error_string().str_ }); - - // rcl_reset_error(); - unsafe { rcutils_reset_error() }; - - let r_fini: rmw_ret_t = unsafe { - rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) - }; - - if r_fini != RMW_RET_OK as i32 { - log::error!("Failed to deallocate message buffer: {r_fini}"); - } - - return false; - } - - // TODO put rcutils_uint8_array_fini in a message drop guard? - // - // Or is is safe to deallocate with Vec::drop instead of rcutils_uint8_array_fini? - - // let data_bytes = unsafe { - // Vec::from_raw_parts(msg.buffer, msg.buffer_length, msg.buffer_capacity) - // }; - - let data_bytes = unsafe { - std::slice::from_raw_parts(msg.buffer, msg.buffer_length).to_vec() - }; - - let r_fini: rmw_ret_t = unsafe { - rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) - }; - - if r_fini != RMW_RET_OK as i32 { - log::error!("Failed to deallocate message buffer: {r_fini}"); - + log::error!("failed to take serialized message"); return false; } + let data_bytes = unsafe { + std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec() + }; + if let Err(e) = self.sender.try_send(data_bytes) { if e.is_disconnected() { // user dropped the handle to the stream, signal removal. @@ -273,13 +216,14 @@ impl Subscriber_ for RawSubscriber { } log::debug!("error {:?}", e) } - + false } fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); + rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t); } } } @@ -308,4 +252,3 @@ pub fn create_subscription_helper( Err(Error::from_rcl_error(result)) } } - diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index db99d7a..17023f8 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -62,14 +62,12 @@ async fn tokio_testing() -> Result<(), Box> { Ok(()) } - #[tokio::test(flavor = "multi_thread")] async fn tokio_subscribe_raw_testing() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; - - let mut sub_int = - node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; + + let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; let mut sub_array = node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; @@ -78,9 +76,10 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> node.create_publisher::("/int", QosProfile::default())?; // Use an array as well since its a variable sized type - let pub_array = - node.create_publisher::("/int_array", QosProfile::default())?; - + let pub_array = node.create_publisher::( + "/int_array", + QosProfile::default(), + )?; task::spawn(async move { (0..10).for_each(|i| { @@ -88,15 +87,15 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> .publish(&r2r::std_msgs::msg::Int32 { data: i }) .unwrap(); - pub_array.publish(&r2r::std_msgs::msg::Int32MultiArray { - layout: r2r::std_msgs::msg::MultiArrayLayout::default(), - data: vec![i] - }) - .unwrap(); + pub_array + .publish(&r2r::std_msgs::msg::Int32MultiArray { + layout: r2r::std_msgs::msg::MultiArrayLayout::default(), + data: vec![i], + }) + .unwrap(); }); }); - let sub_int_handle = task::spawn(async move { while let Some(msg) = sub_int.next().await { println!("Got int msg of len {}", msg.len()); @@ -104,9 +103,8 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> // assert_eq!(msg.len(), 4); // TODO is there padding or something? assert_eq!(msg.len(), 8); - + // assert_eq!(msg,) - } panic!("int msg finished"); @@ -114,10 +112,8 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> let sub_array_handle = task::spawn(async move { while let Some(msg) = sub_array.next().await { - println!("Got array msg of len {}", msg.len()); // assert_eq!(msg.data, ) - } panic!("array msg finished"); @@ -126,9 +122,7 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> let handle = std::thread::spawn(move || { for _ in 1..=30 { node.spin_once(std::time::Duration::from_millis(100)); - } - }); handle.join().unwrap();