Merge branch 'master' of github.com:mchhoy/r2r into message_serialization_helpers

This commit is contained in:
Michael Hoy 2023-12-13 22:42:31 +08:00
commit d9cab1058e
7 changed files with 227 additions and 3 deletions

View File

@ -45,6 +45,7 @@ What works?
Changelog
--------------------
#### [Unreleased]
- Raw message subscribers. <https://github.com/sequenceplanner/r2r/pull/73>
#### [0.8.2] - 2023-12-11
- Fix include path regression on linux. <https://github.com/sequenceplanner/r2r/pull/71>

View File

@ -31,8 +31,9 @@ phf = "0.11.1"
[dev-dependencies]
serde_json = "1.0.89"
futures = "0.3.25"
tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] }
rand = "0.8.5"
cdr = "0.2.4"
[build-dependencies]
r2r_common = { path = "../r2r_common", version = "0.8.2" }

View File

@ -0,0 +1,52 @@
use futures::future;
use futures::stream::StreamExt;
use r2r::QosProfile;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic", QosProfile::default())?;
let sub = node.subscribe_raw("/topic", "std_msgs/msg/String", QosProfile::default())?;
let pub_task = tokio::task::spawn(async move {
for x in 5..50 {
// Send a string with varying length.
let _ = p.publish(&r2r::std_msgs::msg::String {
data: format!("Hello{:>width$}", "World", width = x),
});
let _ = tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
});
tokio::task::spawn_blocking(move || {
for _ in 0..500 {
node.spin_once(std::time::Duration::from_millis(10));
}
});
// Demonstrate that we can deserialize the raw bytes into this
// rust struct using the cdr crate.
#[derive(Deserialize, Serialize, PartialEq, Debug)]
struct OurOwnStdString {
data: String, // the field name can be anything...
}
sub.for_each(|msg| {
println!("got raw bytes of length {}.", msg.len());
if let Ok(data) = cdr::deserialize::<OurOwnStdString>(&msg) {
println!("contents: {:?}", data);
} else {
println!("Warning: cannot deserialize data.");
}
future::ready(())
})
.await;
pub_task.await?;
Ok(())
}

View File

@ -541,6 +541,51 @@ impl Node {
Ok(receiver)
}
/// Subscribe to a ROS topic.
///
/// This function returns a `Stream` of ros messages as non-deserialized `Vec<u8>`:s.
/// Useful if you just want to pass the data along to another part of the system.
pub fn subscribe_raw(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
// TODO is it possible to handle the raw message without type support?
//
// Passing null ts to rcl_subscription_init throws an error ..
//
// It does not seem possible to not have a type support, which is a shame
// because it means we always have to build the message types even if we
// are just after the raw bytes.
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
// Keep a buffer to reduce number of allocations. The rmw will
// resize it if the message size exceeds the buffer size.
let mut msg_buf: rcl_serialized_message_t =
unsafe { rcutils_get_zero_initialized_uint8_array() };
let ret = unsafe {
rcutils_uint8_array_init(
&mut msg_buf as *mut rcl_serialized_message_t,
0,
&rcutils_get_default_allocator(),
)
};
if ret != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(ret));
}
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
let ws = RawSubscriber {
rcl_handle: subscription_handle,
msg_buf,
sender,
};
self.subscribers.push(Box::new(ws));
Ok(receiver)
}
/// Create a ROS service.
///
/// This function returns a `Stream` of `ServiceRequest`:s. Call

View File

@ -37,6 +37,12 @@ pub struct UntypedSubscriber {
pub sender: mpsc::Sender<Result<serde_json::Value>>,
}
pub struct RawSubscriber {
pub rcl_handle: rcl_subscription_t,
pub msg_buf: rcl_serialized_message_t,
pub sender: mpsc::Sender<Vec<u8>>,
}
impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where
T: WrappedTypesupport,
@ -179,6 +185,49 @@ impl Subscriber_ for UntypedSubscriber {
}
}
impl Subscriber_ for RawSubscriber {
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}
fn handle_incoming(&mut self) -> bool {
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
let ret = unsafe {
rcl_take_serialized_message(
&self.rcl_handle,
&mut self.msg_buf as *mut rcl_serialized_message_t,
&mut msg_info,
std::ptr::null_mut(),
)
};
if ret != RCL_RET_OK as i32 {
log::error!("failed to take serialized message");
return false;
}
let data_bytes = unsafe {
std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec()
};
if let Err(e) = self.sender.try_send(data_bytes) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
log::debug!("error {:?}", e)
}
false
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
}
}
}
pub fn create_subscription_helper(
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
qos_profile: QosProfile,

View File

@ -0,0 +1,65 @@
use futures::stream::StreamExt;
use r2r::QosProfile;
use tokio::task;
#[tokio::test(flavor = "multi_thread")]
async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode2", "")?;
let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?;
let mut sub_array =
node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?;
let pub_int =
node.create_publisher::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default())?;
// Use an array as well since its a variable sized type
let pub_array = node.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>(
"/int_array",
QosProfile::default(),
)?;
task::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
(0..10).for_each(|i| {
pub_int
.publish(&r2r::std_msgs::msg::Int32 { data: i })
.unwrap();
pub_array
.publish(&r2r::std_msgs::msg::Int32MultiArray {
layout: r2r::std_msgs::msg::MultiArrayLayout::default(),
data: vec![i],
})
.unwrap();
});
});
let sub_int_handle = task::spawn(async move {
while let Some(msg) = sub_int.next().await {
println!("Got int msg of len {}", msg.len());
assert_eq!(msg.len(), 8);
}
});
let sub_array_handle = task::spawn(async move {
while let Some(msg) = sub_array.next().await {
println!("Got array msg of len {}", msg.len());
assert_eq!(msg.len(), 20);
}
});
let handle = std::thread::spawn(move || {
for _ in 1..=30 {
node.spin_once(std::time::Duration::from_millis(100));
}
});
sub_int_handle.await?;
sub_array_handle.await?;
handle.join().unwrap();
Ok(())
}

View File

@ -700,10 +700,21 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> proc_macr
.into_iter()
.flatten()
.map(|(const_name, typ)| {
let typ: Box<syn::Type> = syn::parse_str(typ).unwrap();
let const_name = format_ident!("{const_name}");
let value = format_ident!("{key}__{const_name}");
quote! { pub const #const_name: #typ = #value; }
if let Ok(mut typ) = syn::parse_str::<Box<syn::TypeReference>>(typ) {
// If the constant is a reference, rustc needs it to be static.
// (see https://github.com/rust-lang/rust/issues/115010)
typ.lifetime = Some(syn::Lifetime::new("'static", proc_macro2::Span::call_site()));
quote! { pub const #const_name: #typ = #value; }
}
else if let Ok(typ) = syn::parse_str::<Box<syn::Type>>(typ) {
// Value
quote! { pub const #const_name: #typ = #value; }
} else {
// Something else, hope for the best but will most likely fail to compile.
quote! { pub const #const_name: #typ = #value; }
}
})
.collect();