diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 5f3e8f8..0e417b4 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -541,6 +541,30 @@ impl Node { Ok(receiver) } + /// Subscribe to a ROS topic. + /// + /// This function returns a `Stream` of ros messages as non-deserialized `Vec`:s. + /// Useful if you just want to pass the data along to another part of the system. + pub fn subscribe_raw( + &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, + ) -> Result> + Unpin> { + // TODO is it possible to handle the raw message without type support? + // + // Passing null ts to rcl_subscription_init throws an error .. + let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; + + let subscription_handle = + create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; + let (sender, receiver) = mpsc::channel::>(10); + + let ws = RawSubscriber { + rcl_handle: subscription_handle, + sender, + }; + self.subscribers.push(Box::new(ws)); + Ok(receiver) + } + /// Create a ROS service. /// /// This function returns a `Stream` of `ServiceRequest`:s. Call diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 5367ab0..16258c0 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -37,6 +37,12 @@ pub struct UntypedSubscriber { pub sender: mpsc::Sender>, } +pub struct RawSubscriber { + pub rcl_handle: rcl_subscription_t, + pub sender: mpsc::Sender>, +} + + impl Subscriber_ for TypedSubscriber where T: WrappedTypesupport, @@ -179,6 +185,105 @@ impl Subscriber_ for UntypedSubscriber { } } +impl Subscriber_ for RawSubscriber { + fn handle(&self) -> &rcl_subscription_t { + &self.rcl_handle + } + + fn handle_incoming(&mut self) -> bool { + + // This code is based on: + // + // https://github.com/josephduchesne/rclpy/blob/502e2135498460dd4c74cf3a6fa543590364a1fe/rclpy/src/rclpy/_rclpy.c#L2612-L2649 + + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + + let mut msg: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() }; + let allocator: rcutils_allocator_t = unsafe { rcutils_get_default_allocator() }; + let ret: rcl_ret_t = unsafe { + rcutils_uint8_array_init(&mut msg as *mut rcl_serialized_message_t, 0, &allocator) + }; + + if ret != RCL_RET_OK as i32 { + log::error!("Failed to initialize message: {:?}", unsafe { rcutils_get_error_string().str_ }); + unsafe { rcutils_reset_error() }; + + let r_fini: rmw_ret_t = unsafe { + rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) + }; + + if r_fini != RMW_RET_OK as i32 { + log::error!("Failed to deallocate message buffer: {r_fini}"); + } + return false; + } + + let ret = unsafe { + rcl_take_serialized_message( + &self.rcl_handle, + &mut msg as *mut rcl_serialized_message_t, + &mut msg_info, + std::ptr::null_mut()) + }; + if ret != RCL_RET_OK as i32 { + log::error!( + "Failed to take_serialized from a subscription: {:?}", + unsafe { rcutils_get_error_string().str_ }); + + // rcl_reset_error(); + unsafe { rcutils_reset_error() }; + + let r_fini: rmw_ret_t = unsafe { + rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) + }; + + if r_fini != RMW_RET_OK as i32 { + log::error!("Failed to deallocate message buffer: {r_fini}"); + } + + return false; + } + + // TODO put rcutils_uint8_array_fini in a message drop guard? + // + // Or is is safe to deallocate with Vec::drop instead of rcutils_uint8_array_fini? + + // let data_bytes = unsafe { + // Vec::from_raw_parts(msg.buffer, msg.buffer_length, msg.buffer_capacity) + // }; + + let data_bytes = unsafe { + std::slice::from_raw_parts(msg.buffer, msg.buffer_length).to_vec() + }; + + let r_fini: rmw_ret_t = unsafe { + rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t) + }; + + if r_fini != RMW_RET_OK as i32 { + log::error!("Failed to deallocate message buffer: {r_fini}"); + + return false; + } + + if let Err(e) = self.sender.try_send(data_bytes) { + if e.is_disconnected() { + // user dropped the handle to the stream, signal removal. + return true; + } + log::debug!("error {:?}", e) + } + + false + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } +} + pub fn create_subscription_helper( node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, qos_profile: QosProfile, @@ -203,3 +308,4 @@ pub fn create_subscription_helper( Err(Error::from_rcl_error(result)) } } + diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index 0736cb5..db99d7a 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -61,3 +61,81 @@ async fn tokio_testing() -> Result<(), Box> { assert_eq!(x, 19); Ok(()) } + + +#[tokio::test(flavor = "multi_thread")] +async fn tokio_subscribe_raw_testing() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let mut sub_int = + node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; + + let mut sub_array = + node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; + + let pub_int = + node.create_publisher::("/int", QosProfile::default())?; + + // Use an array as well since its a variable sized type + let pub_array = + node.create_publisher::("/int_array", QosProfile::default())?; + + + task::spawn(async move { + (0..10).for_each(|i| { + pub_int + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); + + pub_array.publish(&r2r::std_msgs::msg::Int32MultiArray { + layout: r2r::std_msgs::msg::MultiArrayLayout::default(), + data: vec![i] + }) + .unwrap(); + }); + }); + + + let sub_int_handle = task::spawn(async move { + while let Some(msg) = sub_int.next().await { + println!("Got int msg of len {}", msg.len()); + + // assert_eq!(msg.len(), 4); + // TODO is there padding or something? + assert_eq!(msg.len(), 8); + + // assert_eq!(msg,) + + } + + panic!("int msg finished"); + }); + + let sub_array_handle = task::spawn(async move { + while let Some(msg) = sub_array.next().await { + + println!("Got array msg of len {}", msg.len()); + // assert_eq!(msg.data, ) + + } + + panic!("array msg finished"); + }); + + let handle = std::thread::spawn(move || { + for _ in 1..=30 { + node.spin_once(std::time::Duration::from_millis(100)); + + } + + }); + + handle.join().unwrap(); + + // This means something panicked .. + assert!(!sub_int_handle.is_finished()); + assert!(!sub_array_handle.is_finished()); + + Ok(()) +}