Make rcl_subscription_t have stable location

This change is necessary because rcl uses address of the rcl handle in
tracepoints. If the location changes further `take` events will not
be correctly assigned to the subscriber.
This commit is contained in:
Martin Škoudlil 2025-03-04 15:05:29 +01:00
parent 5514c475bc
commit a51bda8901
3 changed files with 96 additions and 40 deletions

View File

@ -653,15 +653,27 @@ impl Node {
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(TypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
self.subscribers.push(subscription);
Ok(receiver)
}
@ -674,15 +686,27 @@ impl Node {
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
let ws = NativeSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(NativeSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
self.subscribers.push(subscription);
Ok(receiver)
}
@ -694,16 +718,28 @@ impl Node {
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
let ws = UntypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(UntypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
topic_type: topic_type.to_string(),
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
self.subscribers.push(subscription);
Ok(receiver)
}
@ -739,16 +775,28 @@ impl Node {
return Err(Error::from_rcl_error(ret));
}
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
let ws = RawSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(RawSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
msg_buf,
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
self.subscribers.push(subscription);
Ok(receiver)
}

View File

@ -234,18 +234,17 @@ impl Subscriber_ for RawSubscriber {
}
}
pub fn create_subscription_helper(
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_subscription_t> {
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
pub unsafe fn create_subscription_helper(
subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str,
ts: *const rosidl_message_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let mut subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile.into();
rcl_subscription_init(
&mut subscription_handle,
subscription_handle,
node,
ts,
topic_c_string.as_ptr(),
@ -253,7 +252,7 @@ pub fn create_subscription_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(subscription_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -9,8 +9,8 @@ use crate::{
Clock, ClockType, Node, QosProfile, WrappedTypesupport,
};
use r2r_rcl::{
rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
rmw_message_info_t, RCL_RET_OK,
rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t,
rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK,
};
use std::sync::{Arc, Mutex, Weak};
@ -102,7 +102,7 @@ impl TimeSource {
match inner.subscriber_state {
TimeSourceSubscriberState::None => {
let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
node.subscribers.push(Box::new(subscriber));
node.subscribers.push(subscriber);
inner.subscriber_state = TimeSourceSubscriberState::Active;
}
TimeSourceSubscriberState::ToBeDestroyed => {
@ -191,20 +191,29 @@ impl TimeSource_ {
}
impl TimeSourceSubscriber {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<Box<Self>> {
// The values are set based on default values in rclcpp
let qos = QosProfile::default().keep_last(1).best_effort();
let subscriber = create_subscription_helper(
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?;
Ok(Self {
subscriber_handle: subscriber,
let mut subscriber = Box::new(Self {
subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() },
time_source,
})
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscriber.subscriber_handle,
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?
};
Ok(subscriber)
}
}