diff --git a/README.md b/README.md index 83f9f5e..dcc0722 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ What works? Changelog -------------------- #### [Unreleased] +- Raw message subscribers. #### [0.8.2] - 2023-12-11 - Fix include path regression on linux. diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index 6abf78b..39baf8b 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -31,8 +31,9 @@ phf = "0.11.1" [dev-dependencies] serde_json = "1.0.89" futures = "0.3.25" -tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] } rand = "0.8.5" +cdr = "0.2.4" [build-dependencies] r2r_common = { path = "../r2r_common", version = "0.8.2" } diff --git a/r2r/examples/tokio_raw_subscriber.rs b/r2r/examples/tokio_raw_subscriber.rs new file mode 100644 index 0000000..9f238b7 --- /dev/null +++ b/r2r/examples/tokio_raw_subscriber.rs @@ -0,0 +1,52 @@ +use futures::future; +use futures::stream::StreamExt; +use r2r::QosProfile; +use serde::{Deserialize, Serialize}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let p = node.create_publisher::("/topic", QosProfile::default())?; + let sub = node.subscribe_raw("/topic", "std_msgs/msg/String", QosProfile::default())?; + + let pub_task = tokio::task::spawn(async move { + for x in 5..50 { + // Send a string with varying length. + let _ = p.publish(&r2r::std_msgs::msg::String { + data: format!("Hello{:>width$}", "World", width = x), + }); + let _ = tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + }); + + tokio::task::spawn_blocking(move || { + for _ in 0..500 { + node.spin_once(std::time::Duration::from_millis(10)); + } + }); + + // Demonstrate that we can deserialize the raw bytes into this + // rust struct using the cdr crate. + #[derive(Deserialize, Serialize, PartialEq, Debug)] + struct OurOwnStdString { + data: String, // the field name can be anything... + } + sub.for_each(|msg| { + println!("got raw bytes of length {}.", msg.len()); + + if let Ok(data) = cdr::deserialize::(&msg) { + println!("contents: {:?}", data); + } else { + println!("Warning: cannot deserialize data."); + } + + future::ready(()) + }) + .await; + + pub_task.await?; + + Ok(()) +} diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 5f3e8f8..d7a78b2 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -541,6 +541,51 @@ impl Node { Ok(receiver) } + /// Subscribe to a ROS topic. + /// + /// This function returns a `Stream` of ros messages as non-deserialized `Vec`:s. + /// Useful if you just want to pass the data along to another part of the system. + pub fn subscribe_raw( + &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, + ) -> Result> + Unpin> { + // TODO is it possible to handle the raw message without type support? + // + // Passing null ts to rcl_subscription_init throws an error .. + // + // It does not seem possible to not have a type support, which is a shame + // because it means we always have to build the message types even if we + // are just after the raw bytes. + let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; + + // Keep a buffer to reduce number of allocations. The rmw will + // resize it if the message size exceeds the buffer size. + let mut msg_buf: rcl_serialized_message_t = + unsafe { rcutils_get_zero_initialized_uint8_array() }; + let ret = unsafe { + rcutils_uint8_array_init( + &mut msg_buf as *mut rcl_serialized_message_t, + 0, + &rcutils_get_default_allocator(), + ) + }; + + if ret != RCL_RET_OK as i32 { + return Err(Error::from_rcl_error(ret)); + } + + let subscription_handle = + create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; + let (sender, receiver) = mpsc::channel::>(10); + + let ws = RawSubscriber { + rcl_handle: subscription_handle, + msg_buf, + sender, + }; + self.subscribers.push(Box::new(ws)); + Ok(receiver) + } + /// Create a ROS service. /// /// This function returns a `Stream` of `ServiceRequest`:s. Call diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 5367ab0..ba49571 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -37,6 +37,12 @@ pub struct UntypedSubscriber { pub sender: mpsc::Sender>, } +pub struct RawSubscriber { + pub rcl_handle: rcl_subscription_t, + pub msg_buf: rcl_serialized_message_t, + pub sender: mpsc::Sender>, +} + impl Subscriber_ for TypedSubscriber where T: WrappedTypesupport, @@ -179,6 +185,49 @@ impl Subscriber_ for UntypedSubscriber { } } +impl Subscriber_ for RawSubscriber { + fn handle(&self) -> &rcl_subscription_t { + &self.rcl_handle + } + + fn handle_incoming(&mut self) -> bool { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let ret = unsafe { + rcl_take_serialized_message( + &self.rcl_handle, + &mut self.msg_buf as *mut rcl_serialized_message_t, + &mut msg_info, + std::ptr::null_mut(), + ) + }; + if ret != RCL_RET_OK as i32 { + log::error!("failed to take serialized message"); + return false; + } + + let data_bytes = unsafe { + std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec() + }; + + if let Err(e) = self.sender.try_send(data_bytes) { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + log::debug!("error {:?}", e) + } + + false + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t); + } + } +} + pub fn create_subscription_helper( node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, qos_profile: QosProfile, diff --git a/r2r/tests/tokio_test_raw.rs b/r2r/tests/tokio_test_raw.rs new file mode 100644 index 0000000..e619449 --- /dev/null +++ b/r2r/tests/tokio_test_raw.rs @@ -0,0 +1,65 @@ +use futures::stream::StreamExt; +use r2r::QosProfile; +use tokio::task; + +#[tokio::test(flavor = "multi_thread")] +async fn tokio_subscribe_raw_testing() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode2", "")?; + + let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; + + let mut sub_array = + node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; + + let pub_int = + node.create_publisher::("/int", QosProfile::default())?; + + // Use an array as well since its a variable sized type + let pub_array = node.create_publisher::( + "/int_array", + QosProfile::default(), + )?; + + task::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + (0..10).for_each(|i| { + pub_int + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); + + pub_array + .publish(&r2r::std_msgs::msg::Int32MultiArray { + layout: r2r::std_msgs::msg::MultiArrayLayout::default(), + data: vec![i], + }) + .unwrap(); + }); + }); + + let sub_int_handle = task::spawn(async move { + while let Some(msg) = sub_int.next().await { + println!("Got int msg of len {}", msg.len()); + assert_eq!(msg.len(), 8); + } + }); + + let sub_array_handle = task::spawn(async move { + while let Some(msg) = sub_array.next().await { + println!("Got array msg of len {}", msg.len()); + assert_eq!(msg.len(), 20); + } + }); + + let handle = std::thread::spawn(move || { + for _ in 1..=30 { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + sub_int_handle.await?; + sub_array_handle.await?; + handle.join().unwrap(); + + Ok(()) +} diff --git a/r2r_msg_gen/src/lib.rs b/r2r_msg_gen/src/lib.rs index e019d1f..26c3d19 100644 --- a/r2r_msg_gen/src/lib.rs +++ b/r2r_msg_gen/src/lib.rs @@ -700,10 +700,21 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> proc_macr .into_iter() .flatten() .map(|(const_name, typ)| { - let typ: Box = syn::parse_str(typ).unwrap(); let const_name = format_ident!("{const_name}"); let value = format_ident!("{key}__{const_name}"); - quote! { pub const #const_name: #typ = #value; } + if let Ok(mut typ) = syn::parse_str::>(typ) { + // If the constant is a reference, rustc needs it to be static. + // (see https://github.com/rust-lang/rust/issues/115010) + typ.lifetime = Some(syn::Lifetime::new("'static", proc_macro2::Span::call_site())); + quote! { pub const #const_name: #typ = #value; } + } + else if let Ok(typ) = syn::parse_str::>(typ) { + // Value + quote! { pub const #const_name: #typ = #value; } + } else { + // Something else, hope for the best but will most likely fail to compile. + quote! { pub const #const_name: #typ = #value; } + } }) .collect();