Add rcl_publisher_get_subscription_count related methods (#75)
* Add rcl_publisher_get_subscription_count related methods * Update implementation of publisher to handle wait_for_inter_process_subscribers
This commit is contained in:
parent
d5f2ef0eac
commit
3d6936e70a
|
|
@ -51,7 +51,7 @@ pub struct Node {
|
||||||
// timers,
|
// timers,
|
||||||
timers: Vec<Timer_>,
|
timers: Vec<Timer_>,
|
||||||
// and the publishers, whom we allow to be shared.. hmm.
|
// and the publishers, whom we allow to be shared.. hmm.
|
||||||
pubs: Vec<Arc<rcl_publisher_t>>,
|
pubs: Vec<Arc<Publisher_>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for Node {}
|
unsafe impl Send for Node {}
|
||||||
|
|
@ -828,6 +828,10 @@ impl Node {
|
||||||
c.lock().unwrap().poll_available(self.node_handle.as_mut());
|
c.lock().unwrap().poll_available(self.node_handle.as_mut());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for p in &self.pubs {
|
||||||
|
p.poll_has_inter_process_subscribers();
|
||||||
|
}
|
||||||
|
|
||||||
let timeout = timeout.as_nanos() as i64;
|
let timeout = timeout.as_nanos() as i64;
|
||||||
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
|
let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
|
||||||
|
|
||||||
|
|
@ -1305,9 +1309,9 @@ impl Drop for Node {
|
||||||
s.lock().unwrap().destroy(&mut self.node_handle);
|
s.lock().unwrap().destroy(&mut self.node_handle);
|
||||||
}
|
}
|
||||||
while let Some(p) = self.pubs.pop() {
|
while let Some(p) = self.pubs.pop() {
|
||||||
let mut p = wait_until_unwrapped(p);
|
let p = wait_until_unwrapped(p);
|
||||||
let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) };
|
|
||||||
// TODO: check ret
|
p.destroy(self.node_handle.as_mut());
|
||||||
}
|
}
|
||||||
unsafe {
|
unsafe {
|
||||||
rcl_node_fini(self.node_handle.as_mut());
|
rcl_node_fini(self.node_handle.as_mut());
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,10 @@ use std::fmt::Debug;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Once;
|
use std::sync::Once;
|
||||||
use std::sync::Weak;
|
use std::sync::Weak;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use futures::Future;
|
||||||
|
use futures::channel::oneshot;
|
||||||
|
use futures::TryFutureExt;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::msg_types::*;
|
use crate::msg_types::*;
|
||||||
|
|
@ -37,6 +41,68 @@ use r2r_rcl::*;
|
||||||
|
|
||||||
unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
|
unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
|
||||||
|
|
||||||
|
pub(crate) struct Publisher_ {
|
||||||
|
handle: rcl_publisher_t,
|
||||||
|
|
||||||
|
// TODO use a mpsc to avoid the mutex?
|
||||||
|
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Publisher_
|
||||||
|
{
|
||||||
|
fn get_inter_process_subscription_count(&self) -> Result<usize> {
|
||||||
|
// See https://github.com/ros2/rclcpp/issues/623
|
||||||
|
|
||||||
|
let mut inter_process_subscription_count = 0;
|
||||||
|
|
||||||
|
let result = unsafe {
|
||||||
|
rcl_publisher_get_subscription_count(
|
||||||
|
&self.handle as *const rcl_publisher_t,
|
||||||
|
&mut inter_process_subscription_count as *mut usize,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
if result == RCL_RET_OK as i32 {
|
||||||
|
Ok(inter_process_subscription_count)
|
||||||
|
} else {
|
||||||
|
Err(Error::from_rcl_error(result))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_has_inter_process_subscribers(&self) {
|
||||||
|
|
||||||
|
let mut poll_inter_process_subscriber_channels =
|
||||||
|
self.poll_inter_process_subscriber_channels.lock().unwrap();
|
||||||
|
|
||||||
|
if poll_inter_process_subscriber_channels.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let inter_process_subscription_count = self.get_inter_process_subscription_count();
|
||||||
|
match inter_process_subscription_count {
|
||||||
|
Ok(0) => {
|
||||||
|
// not available...
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
// send ok and close channels
|
||||||
|
while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
|
||||||
|
let _res = sender.send(()); // we ignore if receiver dropped.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// error, close all channels
|
||||||
|
poll_inter_process_subscriber_channels.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
|
||||||
|
let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };
|
||||||
|
|
||||||
|
// TODO: check ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// A ROS (typed) publisher.
|
/// A ROS (typed) publisher.
|
||||||
///
|
///
|
||||||
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
|
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
|
||||||
|
|
@ -46,7 +112,7 @@ pub struct Publisher<T>
|
||||||
where
|
where
|
||||||
T: WrappedTypesupport,
|
T: WrappedTypesupport,
|
||||||
{
|
{
|
||||||
handle: Weak<rcl_publisher_t>,
|
handle: Weak<Publisher_>,
|
||||||
type_: PhantomData<T>,
|
type_: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -58,11 +124,11 @@ unsafe impl Send for PublisherUntyped {}
|
||||||
/// move between threads.
|
/// move between threads.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PublisherUntyped {
|
pub struct PublisherUntyped {
|
||||||
handle: Weak<rcl_publisher_t>,
|
handle: Weak<Publisher_>,
|
||||||
type_: String,
|
type_: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_publisher<T>(handle: Weak<rcl_publisher_t>) -> Publisher<T>
|
pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
|
||||||
where
|
where
|
||||||
T: WrappedTypesupport,
|
T: WrappedTypesupport,
|
||||||
{
|
{
|
||||||
|
|
@ -72,14 +138,17 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_publisher_untyped(handle: Weak<rcl_publisher_t>, type_: String) -> PublisherUntyped {
|
pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
|
||||||
PublisherUntyped { handle, type_ }
|
PublisherUntyped {
|
||||||
|
handle,
|
||||||
|
type_,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_publisher_helper(
|
pub fn create_publisher_helper(
|
||||||
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
|
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
|
||||||
qos_profile: QosProfile,
|
qos_profile: QosProfile,
|
||||||
) -> Result<rcl_publisher_t> {
|
) -> Result<Publisher_> {
|
||||||
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
|
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
|
||||||
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
||||||
|
|
||||||
|
|
@ -95,7 +164,10 @@ pub fn create_publisher_helper(
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
if result == RCL_RET_OK as i32 {
|
if result == RCL_RET_OK as i32 {
|
||||||
Ok(publisher_handle)
|
Ok(Publisher_ {
|
||||||
|
handle: publisher_handle,
|
||||||
|
poll_inter_process_subscriber_channels: Mutex::new(Vec::new())
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
Err(Error::from_rcl_error(result))
|
Err(Error::from_rcl_error(result))
|
||||||
}
|
}
|
||||||
|
|
@ -116,7 +188,11 @@ impl PublisherUntyped {
|
||||||
native_msg.from_json(msg)?;
|
native_msg.from_json(msg)?;
|
||||||
|
|
||||||
let result =
|
let result =
|
||||||
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
|
unsafe { rcl_publish(
|
||||||
|
&publisher.handle as *const rcl_publisher_t,
|
||||||
|
native_msg.void_ptr(),
|
||||||
|
std::ptr::null_mut())
|
||||||
|
};
|
||||||
|
|
||||||
if result == RCL_RET_OK as i32 {
|
if result == RCL_RET_OK as i32 {
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -125,8 +201,34 @@ impl PublisherUntyped {
|
||||||
Err(Error::from_rcl_error(result))
|
Err(Error::from_rcl_error(result))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the number of external subscribers (i.e. it doesn't
|
||||||
|
/// count subscribers from the same process).
|
||||||
|
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
|
||||||
|
self.handle
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
|
||||||
|
.get_inter_process_subscription_count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Waits for at least one external subscriber to begin subscribing to the
|
||||||
|
/// topic. It doesn't count subscribers from the same process.
|
||||||
|
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
self.handle
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
|
||||||
|
.poll_inter_process_subscriber_channels
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.push(sender);
|
||||||
|
|
||||||
|
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: 'static> Publisher<T>
|
impl<T: 'static> Publisher<T>
|
||||||
where
|
where
|
||||||
T: WrappedTypesupport,
|
T: WrappedTypesupport,
|
||||||
|
|
@ -143,7 +245,11 @@ where
|
||||||
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
||||||
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
|
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
|
||||||
let result =
|
let result =
|
||||||
unsafe { rcl_publish(publisher.as_ref(), native_msg.void_ptr(), std::ptr::null_mut()) };
|
unsafe { rcl_publish(
|
||||||
|
&publisher.handle as *const rcl_publisher_t,
|
||||||
|
native_msg.void_ptr(),
|
||||||
|
std::ptr::null_mut())
|
||||||
|
};
|
||||||
|
|
||||||
if result == RCL_RET_OK as i32 {
|
if result == RCL_RET_OK as i32 {
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -163,17 +269,21 @@ where
|
||||||
.upgrade()
|
.upgrade()
|
||||||
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
||||||
|
|
||||||
if unsafe { rcl_publisher_can_loan_messages(publisher.as_ref()) } {
|
if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
|
||||||
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
|
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
|
||||||
let ret = unsafe {
|
let ret = unsafe {
|
||||||
rcl_borrow_loaned_message(publisher.as_ref(), T::get_ts(), &mut loaned_msg)
|
rcl_borrow_loaned_message(
|
||||||
|
&publisher.handle as *const rcl_publisher_t,
|
||||||
|
T::get_ts(),
|
||||||
|
&mut loaned_msg
|
||||||
|
)
|
||||||
};
|
};
|
||||||
if ret != RCL_RET_OK as i32 {
|
if ret != RCL_RET_OK as i32 {
|
||||||
log::error!("Failed getting loaned message");
|
log::error!("Failed getting loaned message");
|
||||||
return Err(Error::from_rcl_error(ret));
|
return Err(Error::from_rcl_error(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
let handle_box = Box::new(*publisher.as_ref());
|
let handle_box = Box::new(publisher.handle);
|
||||||
let msg = WrappedNativeMsg::<T>::from_loaned(
|
let msg = WrappedNativeMsg::<T>::from_loaned(
|
||||||
loaned_msg as *mut T::CStruct,
|
loaned_msg as *mut T::CStruct,
|
||||||
Box::new(|msg: *mut T::CStruct| {
|
Box::new(|msg: *mut T::CStruct| {
|
||||||
|
|
@ -226,13 +336,17 @@ where
|
||||||
|
|
||||||
// publish and return loaned message to middleware
|
// publish and return loaned message to middleware
|
||||||
rcl_publish_loaned_message(
|
rcl_publish_loaned_message(
|
||||||
publisher.as_ref(),
|
&publisher.handle as *const rcl_publisher_t,
|
||||||
msg.void_ptr_mut(),
|
msg.void_ptr_mut(),
|
||||||
std::ptr::null_mut(),
|
std::ptr::null_mut(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
unsafe { rcl_publish(publisher.as_ref(), msg.void_ptr(), std::ptr::null_mut()) }
|
unsafe { rcl_publish(
|
||||||
|
&publisher.handle as *const rcl_publisher_t,
|
||||||
|
msg.void_ptr(),
|
||||||
|
std::ptr::null_mut()
|
||||||
|
) }
|
||||||
};
|
};
|
||||||
|
|
||||||
if result == RCL_RET_OK as i32 {
|
if result == RCL_RET_OK as i32 {
|
||||||
|
|
@ -242,4 +356,30 @@ where
|
||||||
Err(Error::from_rcl_error(result))
|
Err(Error::from_rcl_error(result))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the number of external subscribers (i.e. it doesn't
|
||||||
|
/// count subscribers from the same process).
|
||||||
|
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
|
||||||
|
self.handle
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
|
||||||
|
.get_inter_process_subscription_count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Waits for at least one external subscriber to begin subscribing to the
|
||||||
|
/// topic. It doesn't count subscribers from the same process.
|
||||||
|
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
self.handle
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
|
||||||
|
.poll_inter_process_subscriber_channels
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.push(sender);
|
||||||
|
|
||||||
|
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue