diff --git a/examples/publish_complex_msgs.rs b/examples/publish_complex_msgs.rs index 6f2572e..d8e3041 100644 --- a/examples/publish_complex_msgs.rs +++ b/examples/publish_complex_msgs.rs @@ -3,28 +3,28 @@ use builtin_interfaces::msg::Duration; use trajectory_msgs::msg::*; use std_msgs::msg::Int32; - fn main() -> Result<(), ()> { - let mut ctx = rcl_create_context()?; - let mut node = rcl_create_node(&mut ctx, "testnode", "")?; - let publisher = rcl_create_publisher::(&mut node, "/hej")?; - let publisher2 = rcl_create_publisher::(&mut node, "/native_count")?; + let ctx = Context::create()?; + let mut node = Node::create(ctx, "testnode", "")?; + let publisher = node.create_publisher::("/hej")?; + let publisher2 = node.create_publisher::("/native_count")?; let mut c = 0; + let mut positions: Vec = Vec::new(); let cb = move |x:std_msgs::msg::String| { println!("at count {} got: {}", c, x.data); c = c + 1; - let positions: Vec = vec!(94.2 * c as f64); + positions.push(c as f64); let to_send = JointTrajectoryPoint { - positions, + positions: positions.clone(), time_from_start : Duration { sec: c, nanosec: 0 }, ..Default::default() }; let mut native = WrappedNativeMsg::::new(); native.data = c; - publish(&publisher, &to_send).unwrap(); - publish_native(&publisher2, &native).unwrap(); + publisher.publish(&to_send).unwrap(); + publisher2.publish_native(&native).unwrap(); }; let cb2 = move |x:JointTrajectoryPoint| { @@ -36,27 +36,22 @@ fn main() -> Result<(), ()> { println!("Raw c data: {:?}", raw_c.positions); }; - let sub1 = rcl_create_subscription(&mut node, "/hopp", Box::new(cb))?; - let sub2 = rcl_create_subscription(&mut node, "/hej", Box::new(cb2))?; - let sub3 = rcl_create_subscription_native(&mut node, "/hej", Box::new(cb3))?; + let _sub1 = node.subscribe("/hopp", Box::new(cb))?; + let _sub2 = node.subscribe("/hej", Box::new(cb2))?; + let _sub3 = node.subscribe_native("/hej", Box::new(cb3))?; // TODO: group subscriptions in a "node" struct - let mut subs: Vec> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)]; + //let mut subs: Vec> = vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)]; // run for 10 seconds let mut count = 0; while count < 100 { - let timeout = 100 * 1000 * 1000; // 0.1 sec - take_subs(&mut ctx, &mut subs, timeout)?; - count = count + 1; + node.spin_once(std::time::Duration::from_millis(100)); + count += 1; } - - // TODO: crashes here. maybe because pub and sub are not cleaned up - rcl_destroy_node(&mut node); - rcl_destroy_ctx(&mut ctx); + println!("All done!"); Ok(()) - } diff --git a/examples/subscriber.rs b/examples/subscriber.rs new file mode 100644 index 0000000..857fd5c --- /dev/null +++ b/examples/subscriber.rs @@ -0,0 +1,66 @@ +use r2r::*; +use std::sync::mpsc; +use std::thread; + +fn main() -> Result<(), ()> { + let ctx = Context::create()?; + + let th = { + let mut node = Node::create(ctx, "testnode", "")?; + + let (tx, rx) = mpsc::channel::(); + + let p = node + .create_publisher::("/hej") + .unwrap(); + + let th = thread::spawn(move || loop { + println!("thread looping"); + let des = if let Ok(msg) = rx.recv() { + let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); + println!( + "received: {}, deserialized ros msg = {:#?}", + msg, deserialized + ); + deserialized + } else { + break; + }; + + if let Err(_) = p.publish(&des) { + break; + } + }); + + let tx1 = tx.clone(); + let cb = move |x: std_msgs::msg::String| { + let serialized = serde_json::to_string(&x).unwrap(); + tx1.send(serialized).unwrap(); // pass msg on to other thread for printing + }; + + let cb2 = move |x: &WrappedNativeMsg| { + // use native data! + let s = x.data.to_str(); + println!("native ros msg: {}", s); + }; + + let _subref = node.subscribe("/hopp", Box::new(cb))?; + let _subref = node.subscribe_native("/hoppe", Box::new(cb2))?; + + // run for 10 seconds + let mut count = 0; + while count < 100 { + node.spin_once(std::time::Duration::from_millis(100)); + count += 1; + } + th + }; + + println!("dropped node"); + + th.join().unwrap(); + + println!("all done"); + + Ok(()) +} diff --git a/examples/subscriber_with_thread.rs b/examples/subscriber_with_thread.rs index 08a0c5e..fa1f4f8 100644 --- a/examples/subscriber_with_thread.rs +++ b/examples/subscriber_with_thread.rs @@ -1,55 +1,45 @@ -use std::thread; use std::sync::mpsc; +use std::thread; use r2r::*; fn main() -> Result<(), ()> { - let mut ctx = rcl_create_context()?; - let mut node = rcl_create_node(&mut ctx, "qqq", "")?; + let ctx = Context::create()?; + let mut node = Node::create(ctx, "testnode", "")?; - let publisher = rcl_create_publisher::(&mut node, "/hej")?; - let pubint = rcl_create_publisher::(&mut node, "/count")?; + let publisher = node.create_publisher::("/hej")?; + let pubint = node.create_publisher::("/count")?; let (tx, rx) = mpsc::channel::(); - thread::spawn(move|| { - loop { - let msg = rx.recv().unwrap(); - let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); - println!("received: {}, deserialized ros msg = {:?}", msg, deserialized); - } + thread::spawn(move || loop { + let msg = rx.recv().unwrap(); + let deserialized: std_msgs::msg::String = serde_json::from_str(&msg).unwrap(); + println!( + "received: {}, deserialized ros msg = {:?}", + msg, deserialized + ); }); let mut c = 0; - let cb = move |x:std_msgs::msg::String| { + let cb = move |x: std_msgs::msg::String| { let to_send = format!("at count {} got: {}", c, x.data); c = c + 1; let serialized = serde_json::to_string(&x).unwrap(); tx.send(serialized).unwrap(); // pass msg on to other thread for printing let to_send = std_msgs::msg::String { data: to_send }; - publish(&publisher, &to_send).unwrap(); + publisher.publish(&to_send).unwrap(); let to_send = std_msgs::msg::Int32 { data: c }; - publish(&pubint, &to_send).unwrap(); + pubint.publish(&to_send).unwrap(); }; - let ws2 = rcl_create_subscription(&mut node, "/hopp", Box::new(cb))?; - - // TODO: group subscriptions in a "node" struct - let mut subs: Vec> = vec![Box::new(ws2)]; + let _ws2 = node.subscribe("/hopp", Box::new(cb))?; // run for 10 seconds let mut count = 0; while count < 100 { - let timeout = 100 * 1000 * 1000; // 0.1 sec - take_subs(&mut ctx, &mut subs, timeout)?; - count = count + 1; + node.spin_once(std::time::Duration::from_millis(100)); + count += 1; } - - // TODO: crashes here. maybe because pub and sub are not cleaned up - rcl_destroy_node(&mut node); - rcl_destroy_ctx(&mut ctx); - Ok(()) - } - diff --git a/src/lib.rs b/src/lib.rs index 5cd1421..e5d6dd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,9 @@ use rcl::*; use serde::{Deserialize, Serialize}; use std::ffi::CString; +use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use std::time::Duration; pub trait WrappedTypesupport { type CStruct; @@ -83,9 +85,10 @@ pub trait Sub { fn handle(&self) -> &rcl_subscription_t; fn run_cb(&mut self) -> (); fn rcl_msg(&mut self) -> *mut std::os::raw::c_void; + fn destroy(&mut self, node: &mut rcl_node_t) -> (); } -pub struct WrappedSub +struct WrappedSub where T: WrappedTypesupport, { @@ -94,7 +97,7 @@ where rcl_msg: WrappedNativeMsg, } -pub struct WrappedSubNative +struct WrappedSubNative where T: WrappedTypesupport, { @@ -120,6 +123,12 @@ where let msg = T::from_native(&self.rcl_msg); (self.callback)(msg); } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } } impl Sub for WrappedSubNative @@ -139,232 +148,370 @@ where // e.g. if you for instance have large image data... (self.callback)(&self.rcl_msg); } -} -pub fn rcl_create_context() -> Result { - let mut ctx = unsafe { rcl_get_zero_initialized_context() }; - let isok = unsafe { - let allocator = rcutils_get_default_allocator(); - let mut init_options = rcl_get_zero_initialized_init_options(); - rcl_init_options_init(&mut init_options, allocator); - rcl_init(0, std::ptr::null(), &init_options, &mut ctx); - let isok = rcl_context_is_valid(&mut ctx); - rcl_init_options_fini(&mut init_options as *mut _); - isok - }; - - if isok { - Ok(ctx) - } else { - Err(()) - } -} - -pub fn rcl_create_node( - ctx: &mut rcl_context_t, - name: &str, - namespace: &str, -) -> Result { - let c_node_name = CString::new(name).unwrap(); - let c_node_ns = CString::new(namespace).unwrap(); - let mut node_handle = unsafe { rcl_get_zero_initialized_node() }; - let nr = unsafe { - let node_options = rcl_node_get_default_options(); - rcl_node_init( - &mut node_handle as *mut _, - c_node_name.as_ptr(), - c_node_ns.as_ptr(), - ctx, - &node_options as *const _, - ) - }; - if nr == RCL_RET_OK as i32 { - Ok(node_handle) - } else { - eprintln!("{}", nr); - Err(()) - } -} - -pub fn rcl_destroy_node(node: &mut rcl_node_t) { - unsafe { - rcl_node_fini(node); - } -} - -pub fn rcl_destroy_ctx(ctx: &mut rcl_context_t) { - unsafe { - rcl_shutdown(ctx); - } -} - -pub fn rcl_create_publisher(node: &mut rcl_node_t, topic: &str) -> Result -where - T: WrappedTypesupport, -{ - let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; - let topic_c_string = CString::new(topic).unwrap(); - - let result = unsafe { - let mut publisher_options = rcl_publisher_get_default_options(); - publisher_options.qos = rmw_qos_profile_t::default(); - rcl_publisher_init( - &mut publisher_handle, - node, - T::get_ts(), - topic_c_string.as_ptr(), - &publisher_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(publisher_handle) - } else { - eprintln!("{}", result); - Err(()) - } -} - -pub fn publish(publisher: &rcl_publisher_t, msg: &T) -> Result<(), ()> -where - T: WrappedTypesupport, -{ - // copy rust msg to native and publish it - let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); - let result = unsafe { rcl_publish(publisher, native_msg.void_ptr(), std::ptr::null_mut()) }; - - if result == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("{}", result); - Err(()) - } -} - -pub fn publish_native(publisher: &rcl_publisher_t, msg: &WrappedNativeMsg) -> Result<(), ()> -where - T: WrappedTypesupport, -{ - let result = unsafe { rcl_publish(publisher, msg.void_ptr(), std::ptr::null_mut()) }; - if result == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("{}", result); - Err(()) - } -} - -fn create_subscription_helper( - node: &mut rcl_node_t, - topic: &str, -) -> Result -where - T: WrappedTypesupport, -{ - let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; - let topic_c_string = CString::new(topic).map_err(|_| ())?; - - let result = unsafe { - let mut subscription_options = rcl_subscription_get_default_options(); - subscription_options.qos = rmw_qos_profile_t::default(); - rcl_subscription_init( - &mut subscription_handle, - node, - T::get_ts(), - topic_c_string.as_ptr(), - &subscription_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(subscription_handle) - } else { - Err(()) - } -} - -pub fn rcl_create_subscription( - node: &mut rcl_node_t, - topic: &str, - callback: Box ()>, -) -> Result, ()> -where - T: WrappedTypesupport, -{ - let subscription_handle = create_subscription_helper::(node, topic)?; - let wrapped_sub = WrappedSub { - rcl_handle: subscription_handle, - rcl_msg: WrappedNativeMsg::::new(), - callback: callback, - }; - - Ok(wrapped_sub) -} - -pub fn rcl_create_subscription_native( - node: &mut rcl_node_t, - topic: &str, - callback: Box) -> ()>, -) -> Result, ()> -where - T: WrappedTypesupport, -{ - let subscription_handle = create_subscription_helper::(node, topic)?; - let wrapped_sub = WrappedSubNative { - rcl_handle: subscription_handle, - rcl_msg: WrappedNativeMsg::::new(), - callback: callback, - }; - - Ok(wrapped_sub) -} - -pub fn take_subs( - ctx: &mut rcl_context_t, - subs: &mut Vec>, - timeout: i64, -) -> Result<(), ()> { - let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; - - unsafe { - rcl_wait_set_init( - &mut ws, - subs.len(), - 0, - 0, - 0, - 0, - 0, - ctx, - rcutils_get_default_allocator(), - ); - } - unsafe { - rcl_wait_set_clear(&mut ws); - } - - for s in subs.iter() { + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { - rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } +} + +// The publish function is thread safe. ROS2 docs state: +// ============= +// +// This function is thread safe so long as access to both the +// publisher and the" `ros_message` is synchronized." That means that +// calling rcl_publish() from multiple threads is allowed, but" +// calling rcl_publish() at the same time as non-thread safe +// publisher" functions is not, e.g. calling rcl_publish() and +// rcl_publisher_fini()" concurrently is not allowed." Before calling +// rcl_publish() the message can change and after calling" +// rcl_publish() the message can change, but it cannot be changed +// during the" publish call." The same `ros_message`, however, can be +// passed to multiple calls of" rcl_publish() simultaneously, even if +// the publishers differ." The `ros_message` is unmodified by +// rcl_publish()." +// +// TODO: I guess there is a potential error source in destructuring +// while calling publish. I don't think its worth to protect with a +// mutex/rwlock for this though... +// +// Methods that mutate need to called from the thread owning the Node. +// I don't think we can count on Node being generally thread-safe. +// So keep pub/sub management and polling contained to one thread +// and send out publishers. + +unsafe impl Send for Publisher where T: WrappedTypesupport {} + +use std::sync::{Arc, Mutex, Weak}; +pub struct Publisher +where + T: WrappedTypesupport, +{ + handle: Weak, + type_: PhantomData, +} + +#[derive(Debug, Clone)] +pub struct Context { + context_handle: Arc>>, +} + +// Not 100% about this one. From our end the context is rarely used and can be locked by a mutex for that. But I haven't investigated if its use is thread-safe between nodes. May remove send here later. +unsafe impl Send for Context {} + +impl Context { + pub fn create() -> Result { + let mut ctx: Box = unsafe { Box::new(rcl_get_zero_initialized_context()) }; + let is_valid = unsafe { + let allocator = rcutils_get_default_allocator(); + let mut init_options = rcl_get_zero_initialized_init_options(); + rcl_init_options_init(&mut init_options, allocator); + rcl_init(0, std::ptr::null(), &init_options, ctx.as_mut()); + rcl_init_options_fini(&mut init_options as *mut _); + rcl_context_is_valid(ctx.as_mut()) + }; + + if is_valid { + Ok(Context { + context_handle: Arc::new(Mutex::new(ctx)), + }) + } else { + Err(()) + } + } +} + +#[derive(Debug, Clone)] +pub struct ContextHandle(Arc>>); + +impl Drop for ContextHandle { + fn drop(&mut self) { + println!("DROPPING CONTEXT HANDLE!"); + let mut ctx_handle = self.0.lock().unwrap(); + // TODO: error handling? atleast probably need rcl_reset_error + unsafe { + rcl::rcl_shutdown(ctx_handle.as_mut()); + rcl::rcl_context_fini(ctx_handle.as_mut()); + } + } +} + +pub struct Node { + context: Context, + node_handle: Box, + // the node owns the subscribers + subs: Vec>, + // and the publishers, whom we allow to be shared.. hmm. + pubs: Vec>, +} + +impl Node { + pub fn create(ctx: Context, name: &str, namespace: &str) -> Result { + let (res, node_handle) = { + let mut ctx_handle = ctx.context_handle.lock().unwrap(); + + let c_node_name = CString::new(name).unwrap(); + let c_node_ns = CString::new(namespace).unwrap(); + let mut node_handle: Box = + unsafe { Box::new(rcl_get_zero_initialized_node()) }; + let res = unsafe { + let node_options = rcl_node_get_default_options(); + rcl_node_init( + node_handle.as_mut(), + c_node_name.as_ptr(), + c_node_ns.as_ptr(), + ctx_handle.as_mut(), + &node_options as *const _, + ) + }; + (res, node_handle) + }; + + if res == RCL_RET_OK as i32 { + Ok(Node { + context: ctx, + node_handle: node_handle, + subs: Vec::new(), + pubs: Vec::new(), + }) + } else { + eprintln!("{}", res); + Err(()) } } - unsafe { - rcl_wait(&mut ws, timeout); - } + fn create_subscription_helper(&mut self, topic: &str) -> Result + where + T: WrappedTypesupport, + { + let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; + let topic_c_string = CString::new(topic).map_err(|_| ())?; - for s in subs { - let mut msg_info = rmw_message_info_t::default(); - let ret = unsafe { rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut()) }; - // fresh message, run cb - if ret == RCL_RET_OK as i32 { - s.run_cb(); + let result = unsafe { + let mut subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos = rmw_qos_profile_t::default(); + rcl_subscription_init( + &mut subscription_handle, + self.node_handle.as_mut(), + T::get_ts(), + topic_c_string.as_ptr(), + &subscription_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(subscription_handle) + } else { + Err(()) } } - unsafe { - rcl_wait_set_fini(&mut ws); + pub fn subscribe( + &mut self, + topic: &str, + callback: Box ()>, + ) -> Result<&rcl_subscription_t, ()> + where + T: WrappedTypesupport, + { + let subscription_handle = self.create_subscription_helper::(topic)?; + let ws = WrappedSub { + rcl_handle: subscription_handle, + rcl_msg: WrappedNativeMsg::::new(), + callback: callback, + }; + self.subs.push(Box::new(ws)); + Ok(self.subs.last().unwrap().handle()) // hmm... } - Ok(()) + pub fn subscribe_native( + &mut self, + topic: &str, + callback: Box) -> ()>, + ) -> Result<&rcl_subscription_t, ()> + where + T: WrappedTypesupport, + { + let subscription_handle = self.create_subscription_helper::(topic)?; + let ws = WrappedSubNative { + rcl_handle: subscription_handle, + rcl_msg: WrappedNativeMsg::::new(), + callback: callback, + }; + self.subs.push(Box::new(ws)); + Ok(self.subs.last().unwrap().handle()) // hmm... + } + + pub fn create_publisher(&mut self, topic: &str) -> Result, ()> + where + T: WrappedTypesupport, + { + let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; + let topic_c_string = CString::new(topic).unwrap(); + + let result = unsafe { + let mut publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos = rmw_qos_profile_t::default(); + rcl_publisher_init( + &mut publisher_handle, + self.node_handle.as_mut(), + T::get_ts(), + topic_c_string.as_ptr(), + &publisher_options, + ) + }; + if result == RCL_RET_OK as i32 { + self.pubs.push(Arc::new(publisher_handle)); + let p = Publisher { + handle: Arc::downgrade(self.pubs.last().unwrap()), + type_: PhantomData, + }; + Ok(p) + } else { + eprintln!("{}", result); + Err(()) + } + } + + pub fn spin_once(&mut self, timeout: Duration) { + let timeout = timeout.as_nanos() as i64; + let mut ws = unsafe { rcl_get_zero_initialized_wait_set() }; + + // #[doc = "* This function is thread-safe for unique wait sets with unique contents."] + // #[doc = "* This function cannot operate on the same wait set in multiple threads, and"] + // #[doc = "* the wait sets may not share content."] + // #[doc = "* For example, calling rcl_wait() in two threads on two different wait sets"] + // #[doc = "* that both contain a single, shared guard condition is undefined behavior."] + + { + let mut ctx = self.context.context_handle.lock().unwrap(); + + unsafe { + rcl_wait_set_init( + &mut ws, + self.subs.len(), + 0, + 0, + 0, + 0, + 0, + ctx.as_mut(), + rcutils_get_default_allocator(), + ); + } + } + unsafe { + rcl_wait_set_clear(&mut ws); + } + + for s in self.subs.iter() { + unsafe { + rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + } + } + + unsafe { + rcl_wait(&mut ws, timeout); + } + + let ws_subs = + unsafe { std::slice::from_raw_parts(ws.subscriptions, ws.size_of_subscriptions) }; + assert_eq!(ws_subs.len(), self.subs.len()); + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + for (s, ws_s) in self.subs.iter_mut().zip(ws_subs) { + if ws_s != &std::ptr::null() { + let ret = unsafe { + rcl_take(s.handle(), s.rcl_msg(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + s.run_cb(); + } + } + } + + unsafe { + rcl_wait_set_fini(&mut ws); + } + } +} + +// Since publishers are temporarily upgraded to owners during the +// actual publish but are not the ones that handle cleanup, we simply +// wait until there are no other owners in the cleanup procedure. The +// next time a publisher wants to publish they will fail because the +// value in the Arc has been dropped. Hacky but works. +fn wait_until_unwrapped(mut a: Arc) -> T +where + T: std::fmt::Debug, +{ + loop { + match Arc::try_unwrap(a) { + Ok(b) => return b, + Err(t) => a = t, + } + } +} + +impl Drop for Node { + fn drop(&mut self) { + for s in &mut self.subs { + s.destroy(&mut self.node_handle); + } + while let Some(p) = self.pubs.pop() { + let mut p = wait_until_unwrapped(p); + let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) }; + // TODO: check ret + } + unsafe { + rcl_node_fini(self.node_handle.as_mut()); + } + } +} + +impl Publisher +where + T: WrappedTypesupport, +{ + pub fn publish(&self, msg: &T) -> Result<(), ()> + where + T: WrappedTypesupport, + { + // upgrade to actual ref. if still alive + let publisher = self.handle.upgrade().ok_or(())?; + // copy rust msg to native and publish it + let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + let result = unsafe { + rcl_publish( + publisher.as_ref(), + native_msg.void_ptr(), + std::ptr::null_mut(), + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("{}", result); + Err(()) + } + } + + pub fn publish_native(&self, msg: &WrappedNativeMsg) -> Result<(), ()> + where + T: WrappedTypesupport, + { + // upgrade to actual ref. if still alive + let publisher = self.handle.upgrade().ok_or(())?; + + let result = + unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }; + if result == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("{}", result); + Err(()) + } + } } #[cfg(test)]