Support for querying the network and working with "untyped" msgs

It's currently a huge hack that came about because I wanted to
implement a rostopic echo function. For this we need to look up the
ros msg name during runtime. We use serde_json to store these
"untyped" messages.
This commit is contained in:
Martin Dahl 2019-08-27 13:33:30 +02:00
parent f88bd94eb5
commit 3ae38ed3a9
8 changed files with 316 additions and 49 deletions

View File

@ -9,6 +9,7 @@ edition = "2018"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
common = { path = "common", version = "0.0.1" }
rcl = { path = "rcl", version = "0.0.1" }
msg_gen = { path = "msg_gen", version = "0.0.1" }

View File

@ -1,16 +1,26 @@
use msg_gen::*;
use common::*;
use msg_gen::*;
use std::env;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
fn main() {
let msgs = read_file("./msgs.txt").expect("You need to create msgs.txt");
let msgs = parse_msgs(&msgs);
let msgs = as_map(&msgs);
let msgs_str = read_file("./msgs.txt").expect("You need to create msgs.txt");
let msgs_list = parse_msgs(&msgs_str);
let msgs = as_map(&msgs_list);
let mut codegen = String::new();
for (module, prefixes) in &msgs {
println!("cargo:rustc-link-lib=dylib={}__rosidl_typesupport_c", module);
println!("cargo:rustc-link-lib=dylib={}__rosidl_typesupport_introspection_c", module);
println!(
"cargo:rustc-link-lib=dylib={}__rosidl_typesupport_c",
module
);
println!(
"cargo:rustc-link-lib=dylib={}__rosidl_typesupport_introspection_c",
module
);
println!("cargo:rustc-link-lib=dylib={}__rosidl_generator_c", module);
codegen.push_str(&format!("pub mod {} {{\n", module));
@ -24,14 +34,19 @@ fn main() {
}
codegen.push_str(" }\n");
}
codegen.push_str("}\n");
}
use std::io::Write;
use std::fs::File;
let mut f = File::create("src/generated_msgs.rs").unwrap();
let codegen_typehacks = generate_untyped_helpers(&msgs_list);
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
let msgs_fn = out_path.join("generated_msgs.rs");
let hacks_fn = out_path.join("generated_typehacks.rs");
let mut f = File::create(msgs_fn).unwrap();
write!(f, "{}", codegen).unwrap();
let mut f = File::create(hacks_fn).unwrap();
write!(f, "{}", codegen_typehacks).unwrap();
}

View File

@ -8,6 +8,7 @@ edition = "2018"
lazy_static = "1.3.0"
libc = "0.2.0"
rcl = { path = "../rcl", version = "0.0.1" }
common = { path = "../common", version = "0.0.1" }
[build-dependencies]
bindgen = "0.50.0"

View File

@ -1,6 +1,8 @@
extern crate bindgen;
use std::env;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use common::*;
@ -13,43 +15,57 @@ fn main() {
let msg_map = as_map(&msg_list);
for module in msg_map.keys() {
println!("cargo:rustc-link-lib=dylib={}__rosidl_typesupport_c", module);
println!("cargo:rustc-link-lib=dylib={}__rosidl_typesupport_introspection_c", module);
println!(
"cargo:rustc-link-lib=dylib={}__rosidl_typesupport_c",
module
);
println!(
"cargo:rustc-link-lib=dylib={}__rosidl_typesupport_introspection_c",
module
);
println!("cargo:rustc-link-lib=dylib={}__rosidl_generator_c", module);
}
let mut includes = String::new();
let mut introspecion_map = String::from("\
let mut introspecion_map = String::from(
"\
lazy_static! { \n
static ref INTROSPECTION_FNS: HashMap<&'static str, usize> = {\n
let mut m = HashMap::new();\n");
let mut m = HashMap::new();\n",
);
for msg in msg_list {
// filename is certainly CamelCase -> snake_case. convert
use heck::SnakeCase;
let include_filename = msg.name.to_snake_case();
includes.push_str(&format!("#include <{}/{}/{}.h>\n",
&msg.module, &msg.prefix, &include_filename));
includes.push_str(&format!("#include <{}/{}/{}__rosidl_typesupport_introspection_c.h>\n",
&msg.module, &msg.prefix, &include_filename));
includes.push_str(&format!(
"#include <{}/{}/{}.h>\n",
&msg.module, &msg.prefix, &include_filename
));
includes.push_str(&format!(
"#include <{}/{}/{}__rosidl_typesupport_introspection_c.h>\n",
&msg.module, &msg.prefix, &include_filename
));
let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, &msg.name);
let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name);
introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n",key,val));
introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val));
}
introspecion_map.push_str("m \n }; }\n\n");
use std::io::Write;
use std::fs::File;
let mut f = File::create("src/msg_includes.h").unwrap();
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
let msg_includes_fn = out_path.join("msg_includes.h");
let introspection_fn = out_path.join("introspection_functions.rs");
let mut f = File::create(msg_includes_fn.clone()).unwrap();
write!(f, "{}", includes).unwrap();
let mut f = File::create("src/introspection_functions.rs").unwrap();
let mut f = File::create(introspection_fn).unwrap();
write!(f, "{}", introspecion_map).unwrap();
let mut builder = bindgen::Builder::default()
.header("src/msg_includes.h")
.header(msg_includes_fn.to_str().unwrap())
.derive_copy(false)
// blacklist types that are handled by rcl bindings
.blacklist_type("rosidl_message_type_support_t")
@ -72,7 +88,9 @@ fn main() {
.blacklist_type("rosidl_generator_c__int32__Sequence")
.blacklist_type("rosidl_generator_c__uint64__Sequence")
.blacklist_type("rosidl_generator_c__int64__Sequence")
.default_enum_style(bindgen::EnumVariation::Rust { non_exhaustive: false } );
.default_enum_style(bindgen::EnumVariation::Rust {
non_exhaustive: false,
});
let ament_prefix_var_name = "AMENT_PREFIX_PATH";
let ament_prefix_var = env::var(ament_prefix_var_name).expect("Source your ROS!");
@ -84,10 +102,7 @@ fn main() {
let bindings = builder.generate().expect("Unable to generate bindings");
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("msg_bindings.rs"))
.expect("Couldn't write bindings!");
// assert!(false);
}

View File

@ -4,7 +4,7 @@
#![allow(improper_ctypes)]
#![allow(dead_code)]
include!(concat!(env!("OUT_DIR"), "/msg_bindings.rs"));
include!("./introspection_functions.rs");
include!(concat!(env!("OUT_DIR"), "/introspection_functions.rs"));
#[macro_use]
extern crate lazy_static;
@ -63,7 +63,7 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
let (module, prefix, name, c_struct, members) = introspection(ptr);
assert_eq!(module, module_);
assert_eq!(prefix, prefix_);
assert_eq!(name, name_);
assert_eq!(name, name_);
let mut fields = String::new();
@ -189,13 +189,95 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fn_ptrs() -> () {
assert_eq!(generate_rust_msg("std_msgs", "msg", "string"), "hej");
// this is even worse, it was added as an afterthought when I wanted to implement rostopic echo
pub fn generate_untyped_helpers(msgs: &Vec<common::RosMsg>) -> String {
let mut ts_helper = format!("fn untyped_ts_helper(typename: &str) -> Result<&'static rosidl_message_type_support_t, ()> {{");
for msg in msgs {
ts_helper.push_str(&generate_untyped_ts_helper(&msg.module, &msg.prefix, &msg.name));
}
ts_helper.push_str(&format!("return Err(())\n}}"));
let mut ds_helper = format!("fn untyped_deserialize_helper(typename: &str) -> Result<fn(native: *const std::os::raw::c_void) -> serde_json::Value, ()> {{");
for msg in msgs {
ds_helper.push_str(&generate_untyped_deserialize_helper(&msg.module, &msg.prefix, &msg.name));
}
ds_helper.push_str(&format!("return Err(())\n}}"));
let mut se_helper = format!("
fn untyped_serialize_helper(typename: &str) -> Result<fn(json: serde_json::Value) -> Result<*mut std::os::raw::c_void, ()>, ()> {{");
for msg in msgs {
se_helper.push_str(&generate_untyped_serialize_helper(&msg.module, &msg.prefix, &msg.name));
}
se_helper.push_str(&format!("return Err(())\n}}"));
let mut dealloc_helper = format!("fn untyped_dealloc_helper(typename: &str) -> Result<fn(*mut std::os::raw::c_void), ()> {{");
for msg in msgs {
dealloc_helper.push_str(&generate_untyped_dealloc_helper(&msg.module, &msg.prefix, &msg.name));
}
dealloc_helper.push_str(&format!("return Err(())\n}}"));
format!("{} \n\n {} \n\n {} \n\n {} \n\n", ts_helper, ds_helper, se_helper, dealloc_helper)
}
pub fn generate_untyped_ts_helper(module_: &str, prefix_: &str, name_: &str) -> String {
let typename = format!("{}/{}/{}", module_, prefix_, name_);
let rustname = format!("{}::{}::{}", module_, prefix_, name_);
format!("
if typename == \"{typename}\" {{
return Ok({rustname}::get_ts());
}}
", typename = typename, rustname = rustname)
}
pub fn generate_untyped_deserialize_helper(module_: &str, prefix_: &str, name_: &str) -> String {
let typename = format!("{}/{}/{}", module_, prefix_, name_);
let rustname = format!("{}::{}::{}", module_, prefix_, name_);
format!("
if typename == \"{typename}\" {{
let x = | native: *const std::os::raw::c_void | {{
let ptr = native as *const <{rustname} as WrappedTypesupport>::CStruct;
let msg = unsafe {{ {rustname}::from_native(&*ptr) }};
serde_json::to_value(&msg).unwrap() // should never crash, we serialize from a known struct
}};
return Ok(x);
}}", typename = typename, rustname = rustname)
}
pub fn generate_untyped_serialize_helper(module_: &str, prefix_: &str, name_: &str) -> String {
let typename = format!("{}/{}/{}", module_, prefix_, name_);
let rustname = format!("{}::{}::{}", module_, prefix_, name_);
format!("
if typename == \"{typename}\" {{
let x = | json: serde_json::Value | {{
let msg: Result<{rustname}, _> = serde_json::from_value(json);
match msg {{
Ok(msg) => {{
let native = {rustname}::create_msg();
unsafe {{ msg.copy_to_native(&mut *native); }}
Ok(native as *mut std::os::raw::c_void)
}},
Err(_) => Err(())
}}
}};
return Ok(x);
}}", typename = typename, rustname = rustname)
}
pub fn generate_untyped_dealloc_helper(module_: &str, prefix_: &str, name_: &str) -> String {
let typename = format!("{}/{}/{}", module_, prefix_, name_);
let rustname = format!("{}::{}::{}", module_, prefix_, name_);
format!("
if typename == \"{typename}\" {{
let y = | native: *mut std::os::raw::c_void | {{
let native_msg = native as *mut <{rustname} as WrappedTypesupport>::CStruct;
{rustname}::destroy_msg(native_msg);
}};
return Ok(y);
}}", typename = typename, rustname = rustname)
}

View File

@ -7,7 +7,9 @@ fn main() {
let mut builder = bindgen::Builder::default()
.header("src/rcl_wrapper.h")
.derive_copy(false)
.default_enum_style(bindgen::EnumVariation::Rust { non_exhaustive: false } );
.default_enum_style(bindgen::EnumVariation::Rust {
non_exhaustive: false,
});
let ament_prefix_var_name = "AMENT_PREFIX_PATH";
let ament_prefix_var = env::var(ament_prefix_var_name).expect("Source your ROS!");
@ -31,5 +33,4 @@ fn main() {
bindings
.write_to_file(out_path.join("rcl_bindings.rs"))
.expect("Couldn't write bindings!");
}

View File

@ -1,5 +1,10 @@
// ros client library
#include <rcl/rcl.h>
// query the network
#include <rcl/graph.h>
// errors
#include <rcutils/error_handling.h>
// low level msg type handling
@ -12,4 +17,3 @@
#include <rosidl_generator_c/message_type_support_struct.h>
#include <rosidl_typesupport_introspection_c/message_introspection.h>
#include <rosidl_typesupport_introspection_c/field_types.h>

View File

@ -1,13 +1,15 @@
include!("./generated_msgs.rs");
include!(concat!(env!("OUT_DIR"), "/generated_msgs.rs"));
include!(concat!(env!("OUT_DIR"), "/generated_typehacks.rs"));
use msg_gen::*;
use rcl::*;
use serde::{Deserialize, Serialize};
use std::ffi::CString;
use std::ffi::{CString,CStr};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::time::Duration;
use std::collections::HashMap;
pub trait WrappedTypesupport {
type CStruct;
@ -106,6 +108,13 @@ where
rcl_msg: WrappedNativeMsg<T>,
}
struct WrappedSubUntyped {
rcl_handle: rcl_subscription_t,
callback: Box<dyn FnMut(serde_json::Value) -> ()>,
serialize: Box<dyn FnMut(*const std::os::raw::c_void) -> serde_json::Value>,
rcl_msg: *mut std::os::raw::c_void,
}
impl<T> Sub for WrappedSub<T>
where
T: WrappedTypesupport,
@ -156,6 +165,28 @@ where
}
}
impl Sub for WrappedSubUntyped
{
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void {
self.rcl_msg
}
fn run_cb(&mut self) -> () {
let string = (self.serialize)(self.rcl_msg);
(self.callback)(string);
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
}
}
}
// The publish function is thread safe. ROS2 docs state:
// =============
//
@ -192,6 +223,12 @@ where
type_: PhantomData<T>,
}
unsafe impl Send for PublisherUntyped {}
pub struct PublisherUntyped {
handle: Weak<rcl_publisher_t>,
type_: String,
}
#[derive(Debug, Clone)]
pub struct Context {
context_handle: Arc<Mutex<Box<rcl_context_t>>>,
@ -281,10 +318,7 @@ impl Node {
}
}
fn create_subscription_helper<T>(&mut self, topic: &str) -> Result<rcl_subscription_t, ()>
where
T: WrappedTypesupport,
{
fn create_subscription_helper(&mut self, topic: &str, ts: *const rosidl_message_type_support_t) -> Result<rcl_subscription_t, ()> {
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
let topic_c_string = CString::new(topic).map_err(|_| ())?;
@ -294,7 +328,7 @@ impl Node {
rcl_subscription_init(
&mut subscription_handle,
self.node_handle.as_mut(),
T::get_ts(),
ts,
topic_c_string.as_ptr(),
&subscription_options,
)
@ -314,7 +348,7 @@ impl Node {
where
T: WrappedTypesupport,
{
let subscription_handle = self.create_subscription_helper::<T>(topic)?;
let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?;
let ws = WrappedSub {
rcl_handle: subscription_handle,
rcl_msg: WrappedNativeMsg::<T>::new(),
@ -332,7 +366,7 @@ impl Node {
where
T: WrappedTypesupport,
{
let subscription_handle = self.create_subscription_helper::<T>(topic)?;
let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?;
let ws = WrappedSubNative {
rcl_handle: subscription_handle,
rcl_msg: WrappedNativeMsg::<T>::new(),
@ -342,6 +376,28 @@ impl Node {
Ok(self.subs.last().unwrap().handle()) // hmm...
}
// Its not really untyped since we know the underlying type... But we throw this info away :)
pub fn subscribe_untyped(
&mut self,
topic: &str,
topic_type: &str,
callback: Box<dyn FnMut(serde_json::Value) -> ()>,
) -> Result<&rcl_subscription_t, ()> {
let ts = untyped_ts_helper(topic_type)?;
let de = untyped_deserialize_helper(topic_type)?;
let subscription_handle = self.create_subscription_helper(topic, ts)?;
let ws = WrappedSubUntyped {
rcl_handle: subscription_handle,
rcl_msg: unsafe { std_msgs__msg__String__create() as *mut _ as *mut std::os::raw::c_void },
callback: callback,
serialize: Box::new(de),
};
self.subs.push(Box::new(ws));
Ok(self.subs.last().unwrap().handle()) // hmm...
}
pub fn create_publisher<T>(&mut self, topic: &str) -> Result<Publisher<T>, ()>
where
T: WrappedTypesupport,
@ -373,6 +429,40 @@ impl Node {
}
}
pub fn create_publisher_untyped(&mut self, topic: &str, topic_type: &str) -> Result<PublisherUntyped, ()> {
let ts = if topic_type == "std_msgs/msg/String" {
std_msgs::msg::String::get_ts()
} else {
return Err(());
};
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
let topic_c_string = CString::new(topic).unwrap();
let result = unsafe {
let mut publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = rmw_qos_profile_t::default();
rcl_publisher_init(
&mut publisher_handle,
self.node_handle.as_mut(),
ts,
topic_c_string.as_ptr(),
&publisher_options,
)
};
if result == RCL_RET_OK as i32 {
self.pubs.push(Arc::new(publisher_handle));
let p = PublisherUntyped {
handle: Arc::downgrade(self.pubs.last().unwrap()),
type_: topic_type.to_owned(),
};
Ok(p)
} else {
eprintln!("could not create publisher {}", result);
Err(())
}
}
pub fn spin_once(&mut self, timeout: Duration) {
let timeout = timeout.as_nanos() as i64;
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
@ -433,6 +523,34 @@ impl Node {
rcl_wait_set_fini(&mut ws);
}
}
pub fn get_topic_names_and_types(&self) -> Result<HashMap<String, Vec<String>>, ()> {
let mut tnat = unsafe { rmw_get_zero_initialized_names_and_types() };
let ret = unsafe {
rcl_get_topic_names_and_types(self.node_handle.as_ref(), &mut rcutils_get_default_allocator(), false, &mut tnat)
};
if ret != RCL_RET_OK as i32 {
eprintln!("could not get topic names and types {}", ret);
return Err(());
}
let names = unsafe { std::slice::from_raw_parts(tnat.names.data, tnat.names.size) };
let types = unsafe { std::slice::from_raw_parts(tnat.types, tnat.names.size) };
let mut res = HashMap::new();
for (n,t) in names.iter().zip(types) {
let topic_name = unsafe {CStr::from_ptr(*n).to_str().unwrap().to_owned() };
let topic_types = unsafe { std::slice::from_raw_parts(t, t.size) };
let topic_types: Vec<String> = unsafe {
topic_types.iter().map(|t| CStr::from_ptr(*((*t).data)).to_str().unwrap().to_owned()).collect()
};
res.insert(topic_name, topic_types);
}
unsafe { rmw_names_and_types_fini(&mut tnat); } // TODO: check return value
Ok(res)
}
}
// Since publishers are temporarily upgraded to owners during the
@ -514,6 +632,36 @@ where
}
}
impl PublisherUntyped {
pub fn publish(&self, msg: serde_json::Value) -> Result<(), ()> {
// upgrade to actual ref. if still alive
let publisher = self.handle.upgrade().ok_or(())?;
// figure out which serializer to use, publish, then destroy
let se = untyped_serialize_helper(&self.type_)?;
let dealloc = untyped_dealloc_helper(&self.type_)?;
// copy rust msg to native and publish it
let native_ptr = se(msg)?;
let result = unsafe {
rcl_publish(
publisher.as_ref(),
native_ptr,
std::ptr::null_mut(),
)
};
dealloc(native_ptr);
if result == RCL_RET_OK as i32 {
Ok(())
} else {
eprintln!("coult not publish {}", result);
Err(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;