Refactoring
This commit is contained in:
parent
7d620ad9ae
commit
de4781cd46
|
|
@ -3,28 +3,28 @@ use builtin_interfaces::msg::Duration;
|
|||
use trajectory_msgs::msg::*;
|
||||
use std_msgs::msg::Int32;
|
||||
|
||||
|
||||
fn main() -> Result<(), ()> {
|
||||
let mut ctx = rcl_create_context()?;
|
||||
let mut node = rcl_create_node(&mut ctx, "testnode", "")?;
|
||||
let publisher = rcl_create_publisher::<JointTrajectoryPoint>(&mut node, "/hej")?;
|
||||
let publisher2 = rcl_create_publisher::<Int32>(&mut node, "/native_count")?;
|
||||
let ctx = Context::create()?;
|
||||
let mut node = Node::create(ctx, "testnode", "")?;
|
||||
let publisher = node.create_publisher::<JointTrajectoryPoint>("/hej")?;
|
||||
let publisher2 = node.create_publisher::<Int32>("/native_count")?;
|
||||
|
||||
let mut c = 0;
|
||||
let mut positions: Vec<f64> = Vec::new();
|
||||
let cb = move |x:std_msgs::msg::String| {
|
||||
println!("at count {} got: {}", c, x.data);
|
||||
c = c + 1;
|
||||
let positions: Vec<f64> = vec!(94.2 * c as f64);
|
||||
positions.push(c as f64);
|
||||
let to_send = JointTrajectoryPoint {
|
||||
positions,
|
||||
positions: positions.clone(),
|
||||
time_from_start : Duration { sec: c, nanosec: 0 },
|
||||
..Default::default()
|
||||
};
|
||||
let mut native = WrappedNativeMsg::<Int32>::new();
|
||||
native.data = c;
|
||||
|
||||
publish(&publisher, &to_send).unwrap();
|
||||
publish_native(&publisher2, &native).unwrap();
|
||||
publisher.publish(&to_send).unwrap();
|
||||
publisher2.publish_native(&native).unwrap();
|
||||
};
|
||||
|
||||
let cb2 = move |x:JointTrajectoryPoint| {
|
||||
|
|
@ -36,27 +36,22 @@ fn main() -> Result<(), ()> {
|
|||
println!("Raw c data: {:?}", raw_c.positions);
|
||||
};
|
||||
|
||||
let sub1 = rcl_create_subscription(&mut node, "/hopp", Box::new(cb))?;
|
||||
let sub2 = rcl_create_subscription(&mut node, "/hej", Box::new(cb2))?;
|
||||
let sub3 = rcl_create_subscription_native(&mut node, "/hej", Box::new(cb3))?;
|
||||
let _sub1 = node.subscribe("/hopp", Box::new(cb))?;
|
||||
let _sub2 = node.subscribe("/hej", Box::new(cb2))?;
|
||||
let _sub3 = node.subscribe_native("/hej", Box::new(cb3))?;
|
||||
|
||||
// TODO: group subscriptions in a "node" struct
|
||||
let mut subs: Vec<Box<Sub>> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
|
||||
//let mut subs: Vec<Box<Sub>> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
|
||||
|
||||
// run for 10 seconds
|
||||
let mut count = 0;
|
||||
while count < 100 {
|
||||
let timeout = 100 * 1000 * 1000; // 0.1 sec
|
||||
take_subs(&mut ctx, &mut subs, timeout)?;
|
||||
count = count + 1;
|
||||
node.spin_once(std::time::Duration::from_millis(100));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
|
||||
// TODO: crashes here. maybe because pub and sub are not cleaned up
|
||||
rcl_destroy_node(&mut node);
|
||||
rcl_destroy_ctx(&mut ctx);
|
||||
println!("All done!");
|
||||
|
||||
Ok(())
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
use r2r::*;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
fn main() -> Result<(), ()> {
|
||||
let ctx = Context::create()?;
|
||||
|
||||
let th = {
|
||||
let mut node = Node::create(ctx, "testnode", "")?;
|
||||
|
||||
let (tx, rx) = mpsc::channel::<String>();
|
||||
|
||||
let p = node
|
||||
.create_publisher::<std_msgs::msg::String>("/hej")
|
||||
.unwrap();
|
||||
|
||||
let th = thread::spawn(move || loop {
|
||||
println!("thread looping");
|
||||
let des = if let Ok(msg) = rx.recv() {
|
||||
let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap();
|
||||
println!(
|
||||
"received: {}, deserialized ros msg = {:#?}",
|
||||
msg, deserialized
|
||||
);
|
||||
deserialized
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
|
||||
if let Err(_) = p.publish(&des) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
let tx1 = tx.clone();
|
||||
let cb = move |x: std_msgs::msg::String| {
|
||||
let serialized = serde_json::to_string(&x).unwrap();
|
||||
tx1.send(serialized).unwrap(); // pass msg on to other thread for printing
|
||||
};
|
||||
|
||||
let cb2 = move |x: &WrappedNativeMsg<std_msgs::msg::String>| {
|
||||
// use native data!
|
||||
let s = x.data.to_str();
|
||||
println!("native ros msg: {}", s);
|
||||
};
|
||||
|
||||
let _subref = node.subscribe("/hopp", Box::new(cb))?;
|
||||
let _subref = node.subscribe_native("/hoppe", Box::new(cb2))?;
|
||||
|
||||
// run for 10 seconds
|
||||
let mut count = 0;
|
||||
while count < 100 {
|
||||
node.spin_once(std::time::Duration::from_millis(100));
|
||||
count += 1;
|
||||
}
|
||||
th
|
||||
};
|
||||
|
||||
println!("dropped node");
|
||||
|
||||
th.join().unwrap();
|
||||
|
||||
println!("all done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1,55 +1,45 @@
|
|||
use std::thread;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
use r2r::*;
|
||||
|
||||
fn main() -> Result<(), ()> {
|
||||
let mut ctx = rcl_create_context()?;
|
||||
let mut node = rcl_create_node(&mut ctx, "qqq", "")?;
|
||||
let ctx = Context::create()?;
|
||||
let mut node = Node::create(ctx, "testnode", "")?;
|
||||
|
||||
let publisher = rcl_create_publisher::<std_msgs::msg::String>(&mut node, "/hej")?;
|
||||
let pubint = rcl_create_publisher::<std_msgs::msg::Int32>(&mut node, "/count")?;
|
||||
let publisher = node.create_publisher::<std_msgs::msg::String>("/hej")?;
|
||||
let pubint = node.create_publisher::<std_msgs::msg::Int32>("/count")?;
|
||||
|
||||
let (tx, rx) = mpsc::channel::<String>();
|
||||
thread::spawn(move|| {
|
||||
loop {
|
||||
let msg = rx.recv().unwrap();
|
||||
let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap();
|
||||
println!("received: {}, deserialized ros msg = {:?}", msg, deserialized);
|
||||
}
|
||||
thread::spawn(move || loop {
|
||||
let msg = rx.recv().unwrap();
|
||||
let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap();
|
||||
println!(
|
||||
"received: {}, deserialized ros msg = {:?}",
|
||||
msg, deserialized
|
||||
);
|
||||
});
|
||||
|
||||
let mut c = 0;
|
||||
let cb = move |x:std_msgs::msg::String| {
|
||||
let cb = move |x: std_msgs::msg::String| {
|
||||
let to_send = format!("at count {} got: {}", c, x.data);
|
||||
c = c + 1;
|
||||
let serialized = serde_json::to_string(&x).unwrap();
|
||||
tx.send(serialized).unwrap(); // pass msg on to other thread for printing
|
||||
let to_send = std_msgs::msg::String { data: to_send };
|
||||
publish(&publisher, &to_send).unwrap();
|
||||
publisher.publish(&to_send).unwrap();
|
||||
let to_send = std_msgs::msg::Int32 { data: c };
|
||||
publish(&pubint, &to_send).unwrap();
|
||||
pubint.publish(&to_send).unwrap();
|
||||
};
|
||||
|
||||
let ws2 = rcl_create_subscription(&mut node, "/hopp", Box::new(cb))?;
|
||||
|
||||
// TODO: group subscriptions in a "node" struct
|
||||
let mut subs: Vec<Box<Sub>> = vec![Box::new(ws2)];
|
||||
let _ws2 = node.subscribe("/hopp", Box::new(cb))?;
|
||||
|
||||
// run for 10 seconds
|
||||
let mut count = 0;
|
||||
while count < 100 {
|
||||
let timeout = 100 * 1000 * 1000; // 0.1 sec
|
||||
take_subs(&mut ctx, &mut subs, timeout)?;
|
||||
count = count + 1;
|
||||
node.spin_once(std::time::Duration::from_millis(100));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
|
||||
// TODO: crashes here. maybe because pub and sub are not cleaned up
|
||||
rcl_destroy_node(&mut node);
|
||||
rcl_destroy_ctx(&mut ctx);
|
||||
|
||||
Ok(())
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
581
src/lib.rs
581
src/lib.rs
|
|
@ -5,7 +5,9 @@ use rcl::*;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ffi::CString;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::time::Duration;
|
||||
|
||||
pub trait WrappedTypesupport {
|
||||
type CStruct;
|
||||
|
|
@ -83,9 +85,10 @@ pub trait Sub {
|
|||
fn handle(&self) -> &rcl_subscription_t;
|
||||
fn run_cb(&mut self) -> ();
|
||||
fn rcl_msg(&mut self) -> *mut std::os::raw::c_void;
|
||||
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
|
||||
}
|
||||
|
||||
pub struct WrappedSub<T>
|
||||
struct WrappedSub<T>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
|
|
@ -94,7 +97,7 @@ where
|
|||
rcl_msg: WrappedNativeMsg<T>,
|
||||
}
|
||||
|
||||
pub struct WrappedSubNative<T>
|
||||
struct WrappedSubNative<T>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
|
|
@ -120,6 +123,12 @@ where
|
|||
let msg = T::from_native(&self.rcl_msg);
|
||||
(self.callback)(msg);
|
||||
}
|
||||
|
||||
fn destroy(&mut self, node: &mut rcl_node_t) {
|
||||
unsafe {
|
||||
rcl_subscription_fini(&mut self.rcl_handle, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sub for WrappedSubNative<T>
|
||||
|
|
@ -139,232 +148,370 @@ where
|
|||
// e.g. if you for instance have large image data...
|
||||
(self.callback)(&self.rcl_msg);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_create_context() -> Result<rcl_context_t, ()> {
|
||||
let mut ctx = unsafe { rcl_get_zero_initialized_context() };
|
||||
let isok = unsafe {
|
||||
let allocator = rcutils_get_default_allocator();
|
||||
let mut init_options = rcl_get_zero_initialized_init_options();
|
||||
rcl_init_options_init(&mut init_options, allocator);
|
||||
rcl_init(0, std::ptr::null(), &init_options, &mut ctx);
|
||||
let isok = rcl_context_is_valid(&mut ctx);
|
||||
rcl_init_options_fini(&mut init_options as *mut _);
|
||||
isok
|
||||
};
|
||||
|
||||
if isok {
|
||||
Ok(ctx)
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_create_node(
|
||||
ctx: &mut rcl_context_t,
|
||||
name: &str,
|
||||
namespace: &str,
|
||||
) -> Result<rcl_node_t, ()> {
|
||||
let c_node_name = CString::new(name).unwrap();
|
||||
let c_node_ns = CString::new(namespace).unwrap();
|
||||
let mut node_handle = unsafe { rcl_get_zero_initialized_node() };
|
||||
let nr = unsafe {
|
||||
let node_options = rcl_node_get_default_options();
|
||||
rcl_node_init(
|
||||
&mut node_handle as *mut _,
|
||||
c_node_name.as_ptr(),
|
||||
c_node_ns.as_ptr(),
|
||||
ctx,
|
||||
&node_options as *const _,
|
||||
)
|
||||
};
|
||||
if nr == RCL_RET_OK as i32 {
|
||||
Ok(node_handle)
|
||||
} else {
|
||||
eprintln!("{}", nr);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_destroy_node(node: &mut rcl_node_t) {
|
||||
unsafe {
|
||||
rcl_node_fini(node);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_destroy_ctx(ctx: &mut rcl_context_t) {
|
||||
unsafe {
|
||||
rcl_shutdown(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_create_publisher<T>(node: &mut rcl_node_t, topic: &str) -> Result<rcl_publisher_t, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
|
||||
let topic_c_string = CString::new(topic).unwrap();
|
||||
|
||||
let result = unsafe {
|
||||
let mut publisher_options = rcl_publisher_get_default_options();
|
||||
publisher_options.qos = rmw_qos_profile_t::default();
|
||||
rcl_publisher_init(
|
||||
&mut publisher_handle,
|
||||
node,
|
||||
T::get_ts(),
|
||||
topic_c_string.as_ptr(),
|
||||
&publisher_options,
|
||||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(publisher_handle)
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn publish<T>(publisher: &rcl_publisher_t, msg: &T) -> Result<(), ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
// copy rust msg to native and publish it
|
||||
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
|
||||
let result = unsafe { rcl_publish(publisher, native_msg.void_ptr(), std::ptr::null_mut()) };
|
||||
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(())
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn publish_native<T>(publisher: &rcl_publisher_t, msg: &WrappedNativeMsg<T>) -> Result<(), ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let result = unsafe { rcl_publish(publisher, msg.void_ptr(), std::ptr::null_mut()) };
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(())
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
fn create_subscription_helper<T>(
|
||||
node: &mut rcl_node_t,
|
||||
topic: &str,
|
||||
) -> Result<rcl_subscription_t, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
|
||||
let topic_c_string = CString::new(topic).map_err(|_| ())?;
|
||||
|
||||
let result = unsafe {
|
||||
let mut subscription_options = rcl_subscription_get_default_options();
|
||||
subscription_options.qos = rmw_qos_profile_t::default();
|
||||
rcl_subscription_init(
|
||||
&mut subscription_handle,
|
||||
node,
|
||||
T::get_ts(),
|
||||
topic_c_string.as_ptr(),
|
||||
&subscription_options,
|
||||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(subscription_handle)
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rcl_create_subscription<T>(
|
||||
node: &mut rcl_node_t,
|
||||
topic: &str,
|
||||
callback: Box<dyn FnMut(T) -> ()>,
|
||||
) -> Result<WrappedSub<T>, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle = create_subscription_helper::<T>(node, topic)?;
|
||||
let wrapped_sub = WrappedSub {
|
||||
rcl_handle: subscription_handle,
|
||||
rcl_msg: WrappedNativeMsg::<T>::new(),
|
||||
callback: callback,
|
||||
};
|
||||
|
||||
Ok(wrapped_sub)
|
||||
}
|
||||
|
||||
pub fn rcl_create_subscription_native<T>(
|
||||
node: &mut rcl_node_t,
|
||||
topic: &str,
|
||||
callback: Box<dyn FnMut(&WrappedNativeMsg<T>) -> ()>,
|
||||
) -> Result<WrappedSubNative<T>, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle = create_subscription_helper::<T>(node, topic)?;
|
||||
let wrapped_sub = WrappedSubNative {
|
||||
rcl_handle: subscription_handle,
|
||||
rcl_msg: WrappedNativeMsg::<T>::new(),
|
||||
callback: callback,
|
||||
};
|
||||
|
||||
Ok(wrapped_sub)
|
||||
}
|
||||
|
||||
pub fn take_subs(
|
||||
ctx: &mut rcl_context_t,
|
||||
subs: &mut Vec<Box<dyn Sub>>,
|
||||
timeout: i64,
|
||||
) -> Result<(), ()> {
|
||||
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
|
||||
|
||||
unsafe {
|
||||
rcl_wait_set_init(
|
||||
&mut ws,
|
||||
subs.len(),
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
ctx,
|
||||
rcutils_get_default_allocator(),
|
||||
);
|
||||
}
|
||||
unsafe {
|
||||
rcl_wait_set_clear(&mut ws);
|
||||
}
|
||||
|
||||
for s in subs.iter() {
|
||||
fn destroy(&mut self, node: &mut rcl_node_t) {
|
||||
unsafe {
|
||||
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
|
||||
rcl_subscription_fini(&mut self.rcl_handle, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The publish function is thread safe. ROS2 docs state:
|
||||
// =============
|
||||
//
|
||||
// This function is thread safe so long as access to both the
|
||||
// publisher and the" `ros_message` is synchronized." That means that
|
||||
// calling rcl_publish() from multiple threads is allowed, but"
|
||||
// calling rcl_publish() at the same time as non-thread safe
|
||||
// publisher" functions is not, e.g. calling rcl_publish() and
|
||||
// rcl_publisher_fini()" concurrently is not allowed." Before calling
|
||||
// rcl_publish() the message can change and after calling"
|
||||
// rcl_publish() the message can change, but it cannot be changed
|
||||
// during the" publish call." The same `ros_message`, however, can be
|
||||
// passed to multiple calls of" rcl_publish() simultaneously, even if
|
||||
// the publishers differ." The `ros_message` is unmodified by
|
||||
// rcl_publish()."
|
||||
//
|
||||
// TODO: I guess there is a potential error source in destructuring
|
||||
// while calling publish. I don't think its worth to protect with a
|
||||
// mutex/rwlock for this though...
|
||||
//
|
||||
// Methods that mutate need to called from the thread owning the Node.
|
||||
// I don't think we can count on Node being generally thread-safe.
|
||||
// So keep pub/sub management and polling contained to one thread
|
||||
// and send out publishers.
|
||||
|
||||
unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
|
||||
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
pub struct Publisher<T>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
handle: Weak<rcl_publisher_t>,
|
||||
type_: PhantomData<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Context {
|
||||
context_handle: Arc<Mutex<Box<rcl_context_t>>>,
|
||||
}
|
||||
|
||||
// Not 100% about this one. From our end the context is rarely used and can be locked by a mutex for that. But I haven't investigated if its use is thread-safe between nodes. May remove send here later.
|
||||
unsafe impl Send for Context {}
|
||||
|
||||
impl Context {
|
||||
pub fn create() -> Result<Context, ()> {
|
||||
let mut ctx: Box<rcl_context_t> = unsafe { Box::new(rcl_get_zero_initialized_context()) };
|
||||
let is_valid = unsafe {
|
||||
let allocator = rcutils_get_default_allocator();
|
||||
let mut init_options = rcl_get_zero_initialized_init_options();
|
||||
rcl_init_options_init(&mut init_options, allocator);
|
||||
rcl_init(0, std::ptr::null(), &init_options, ctx.as_mut());
|
||||
rcl_init_options_fini(&mut init_options as *mut _);
|
||||
rcl_context_is_valid(ctx.as_mut())
|
||||
};
|
||||
|
||||
if is_valid {
|
||||
Ok(Context {
|
||||
context_handle: Arc::new(Mutex::new(ctx)),
|
||||
})
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ContextHandle(Arc<Mutex<Box<rcl_context_t>>>);
|
||||
|
||||
impl Drop for ContextHandle {
|
||||
fn drop(&mut self) {
|
||||
println!("DROPPING CONTEXT HANDLE!");
|
||||
let mut ctx_handle = self.0.lock().unwrap();
|
||||
// TODO: error handling? atleast probably need rcl_reset_error
|
||||
unsafe {
|
||||
rcl::rcl_shutdown(ctx_handle.as_mut());
|
||||
rcl::rcl_context_fini(ctx_handle.as_mut());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Node {
|
||||
context: Context,
|
||||
node_handle: Box<rcl_node_t>,
|
||||
// the node owns the subscribers
|
||||
subs: Vec<Box<dyn Sub>>,
|
||||
// and the publishers, whom we allow to be shared.. hmm.
|
||||
pubs: Vec<Arc<rcl_publisher_t>>,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node, ()> {
|
||||
let (res, node_handle) = {
|
||||
let mut ctx_handle = ctx.context_handle.lock().unwrap();
|
||||
|
||||
let c_node_name = CString::new(name).unwrap();
|
||||
let c_node_ns = CString::new(namespace).unwrap();
|
||||
let mut node_handle: Box<rcl_node_t> =
|
||||
unsafe { Box::new(rcl_get_zero_initialized_node()) };
|
||||
let res = unsafe {
|
||||
let node_options = rcl_node_get_default_options();
|
||||
rcl_node_init(
|
||||
node_handle.as_mut(),
|
||||
c_node_name.as_ptr(),
|
||||
c_node_ns.as_ptr(),
|
||||
ctx_handle.as_mut(),
|
||||
&node_options as *const _,
|
||||
)
|
||||
};
|
||||
(res, node_handle)
|
||||
};
|
||||
|
||||
if res == RCL_RET_OK as i32 {
|
||||
Ok(Node {
|
||||
context: ctx,
|
||||
node_handle: node_handle,
|
||||
subs: Vec::new(),
|
||||
pubs: Vec::new(),
|
||||
})
|
||||
} else {
|
||||
eprintln!("{}", res);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
unsafe {
|
||||
rcl_wait(&mut ws, timeout);
|
||||
}
|
||||
fn create_subscription_helper<T>(&mut self, topic: &str) -> Result<rcl_subscription_t, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
|
||||
let topic_c_string = CString::new(topic).map_err(|_| ())?;
|
||||
|
||||
for s in subs {
|
||||
let mut msg_info = rmw_message_info_t::default();
|
||||
let ret = unsafe { rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut()) };
|
||||
// fresh message, run cb
|
||||
if ret == RCL_RET_OK as i32 {
|
||||
s.run_cb();
|
||||
let result = unsafe {
|
||||
let mut subscription_options = rcl_subscription_get_default_options();
|
||||
subscription_options.qos = rmw_qos_profile_t::default();
|
||||
rcl_subscription_init(
|
||||
&mut subscription_handle,
|
||||
self.node_handle.as_mut(),
|
||||
T::get_ts(),
|
||||
topic_c_string.as_ptr(),
|
||||
&subscription_options,
|
||||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(subscription_handle)
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
unsafe {
|
||||
rcl_wait_set_fini(&mut ws);
|
||||
pub fn subscribe<T: 'static>(
|
||||
&mut self,
|
||||
topic: &str,
|
||||
callback: Box<dyn FnMut(T) -> ()>,
|
||||
) -> Result<&rcl_subscription_t, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle = self.create_subscription_helper::<T>(topic)?;
|
||||
let ws = WrappedSub {
|
||||
rcl_handle: subscription_handle,
|
||||
rcl_msg: WrappedNativeMsg::<T>::new(),
|
||||
callback: callback,
|
||||
};
|
||||
self.subs.push(Box::new(ws));
|
||||
Ok(self.subs.last().unwrap().handle()) // hmm...
|
||||
}
|
||||
|
||||
Ok(())
|
||||
pub fn subscribe_native<T: 'static>(
|
||||
&mut self,
|
||||
topic: &str,
|
||||
callback: Box<dyn FnMut(&WrappedNativeMsg<T>) -> ()>,
|
||||
) -> Result<&rcl_subscription_t, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle = self.create_subscription_helper::<T>(topic)?;
|
||||
let ws = WrappedSubNative {
|
||||
rcl_handle: subscription_handle,
|
||||
rcl_msg: WrappedNativeMsg::<T>::new(),
|
||||
callback: callback,
|
||||
};
|
||||
self.subs.push(Box::new(ws));
|
||||
Ok(self.subs.last().unwrap().handle()) // hmm...
|
||||
}
|
||||
|
||||
pub fn create_publisher<T>(&mut self, topic: &str) -> Result<Publisher<T>, ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
|
||||
let topic_c_string = CString::new(topic).unwrap();
|
||||
|
||||
let result = unsafe {
|
||||
let mut publisher_options = rcl_publisher_get_default_options();
|
||||
publisher_options.qos = rmw_qos_profile_t::default();
|
||||
rcl_publisher_init(
|
||||
&mut publisher_handle,
|
||||
self.node_handle.as_mut(),
|
||||
T::get_ts(),
|
||||
topic_c_string.as_ptr(),
|
||||
&publisher_options,
|
||||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
self.pubs.push(Arc::new(publisher_handle));
|
||||
let p = Publisher {
|
||||
handle: Arc::downgrade(self.pubs.last().unwrap()),
|
||||
type_: PhantomData,
|
||||
};
|
||||
Ok(p)
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spin_once(&mut self, timeout: Duration) {
|
||||
let timeout = timeout.as_nanos() as i64;
|
||||
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
|
||||
|
||||
// #[doc = "* This function is thread-safe for unique wait sets with unique contents."]
|
||||
// #[doc = "* This function cannot operate on the same wait set in multiple threads, and"]
|
||||
// #[doc = "* the wait sets may not share content."]
|
||||
// #[doc = "* For example, calling rcl_wait() in two threads on two different wait sets"]
|
||||
// #[doc = "* that both contain a single, shared guard condition is undefined behavior."]
|
||||
|
||||
{
|
||||
let mut ctx = self.context.context_handle.lock().unwrap();
|
||||
|
||||
unsafe {
|
||||
rcl_wait_set_init(
|
||||
&mut ws,
|
||||
self.subs.len(),
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
ctx.as_mut(),
|
||||
rcutils_get_default_allocator(),
|
||||
);
|
||||
}
|
||||
}
|
||||
unsafe {
|
||||
rcl_wait_set_clear(&mut ws);
|
||||
}
|
||||
|
||||
for s in self.subs.iter() {
|
||||
unsafe {
|
||||
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
|
||||
}
|
||||
}
|
||||
|
||||
unsafe {
|
||||
rcl_wait(&mut ws, timeout);
|
||||
}
|
||||
|
||||
let ws_subs =
|
||||
unsafe { std::slice::from_raw_parts(ws.subscriptions, ws.size_of_subscriptions) };
|
||||
assert_eq!(ws_subs.len(), self.subs.len());
|
||||
let mut msg_info = rmw_message_info_t::default(); // we dont care for now
|
||||
for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) {
|
||||
if ws_s != &std::ptr::null() {
|
||||
let ret = unsafe {
|
||||
rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut())
|
||||
};
|
||||
if ret == RCL_RET_OK as i32 {
|
||||
s.run_cb();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe {
|
||||
rcl_wait_set_fini(&mut ws);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Since publishers are temporarily upgraded to owners during the
|
||||
// actual publish but are not the ones that handle cleanup, we simply
|
||||
// wait until there are no other owners in the cleanup procedure. The
|
||||
// next time a publisher wants to publish they will fail because the
|
||||
// value in the Arc has been dropped. Hacky but works.
|
||||
fn wait_until_unwrapped<T>(mut a: Arc<T>) -> T
|
||||
where
|
||||
T: std::fmt::Debug,
|
||||
{
|
||||
loop {
|
||||
match Arc::try_unwrap(a) {
|
||||
Ok(b) => return b,
|
||||
Err(t) => a = t,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Node {
|
||||
fn drop(&mut self) {
|
||||
for s in &mut self.subs {
|
||||
s.destroy(&mut self.node_handle);
|
||||
}
|
||||
while let Some(p) = self.pubs.pop() {
|
||||
let mut p = wait_until_unwrapped(p);
|
||||
let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) };
|
||||
// TODO: check ret
|
||||
}
|
||||
unsafe {
|
||||
rcl_node_fini(self.node_handle.as_mut());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Publisher<T>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
pub fn publish(&self, msg: &T) -> Result<(), ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
// upgrade to actual ref. if still alive
|
||||
let publisher = self.handle.upgrade().ok_or(())?;
|
||||
// copy rust msg to native and publish it
|
||||
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
|
||||
let result = unsafe {
|
||||
rcl_publish(
|
||||
publisher.as_ref(),
|
||||
native_msg.void_ptr(),
|
||||
std::ptr::null_mut(),
|
||||
)
|
||||
};
|
||||
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(())
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn publish_native(&self, msg: &WrappedNativeMsg<T>) -> Result<(), ()>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
// upgrade to actual ref. if still alive
|
||||
let publisher = self.handle.upgrade().ok_or(())?;
|
||||
|
||||
let result =
|
||||
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) };
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(())
|
||||
} else {
|
||||
eprintln!("{}", result);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
Loading…
Reference in New Issue