add get_publishers_info_by_topic (#80)

* add get_publishers_info_by_topic
Author: NerdToMars, 2024-01-12 03:55:33 +08:00 (committed by GitHub)
parent cb87b9c01c
commit b3e4a58eca
4 changed files with 338 additions and 66 deletions

.dockerignore (new file, 3 additions)

@@ -0,0 +1,3 @@
.git
.github
target


@@ -792,7 +792,7 @@ impl Node {
}
/// Create a ROS publisher with a type given at runtime, where the data may either be
/// supplied as JSON (using the `publish` method) or a pre-serialized ROS message
/// (i.e. &[u8], using the `publish_raw` method).
pub fn create_publisher_untyped(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
@@ -1169,6 +1169,46 @@ impl Node {
Ok(res)
}
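/// Get information about all publishers on the given topic.
///
/// This wraps `rcl_get_publishers_info_by_topic`. If `no_mangle` is true, the
/// topic name is passed to the middleware as-is instead of being expanded as a
/// ROS topic name.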
pub fn get_publishers_info_by_topic(
&self, topic_name: &str, no_mangle: bool,
) -> Result<Vec<TopicEndpointInfo>> {
let node = self.node_handle.as_ref();
let topic_c_string =
CString::new(topic_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let mut allocator = unsafe { rcutils_get_default_allocator() };
let mut info_array: rcl_topic_endpoint_info_array_t =
unsafe { rmw_get_zero_initialized_topic_endpoint_info_array() };
let result = unsafe {
rcl_get_publishers_info_by_topic(
node,
&mut allocator,
topic_c_string.as_ptr(),
no_mangle,
&mut info_array,
)
};
if result != RCL_RET_OK as i32 {
unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
return Err(Error::from_rcl_error(result));
}
// Convert info_array to Vec<TopicEndpointInfo>
let topic_info_list = convert_info_array_to_vec(&info_array);
let result = unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
if result != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(result));
}
Ok(topic_info_list)
}
/// Create a ROS wall timer.
///
/// Create a ROS timer that is woken up by spin every `period`.
@@ -1324,3 +1364,56 @@ impl Drop for Node {
pub trait IsAvailablePollable {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
}
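/// Information about a topic endpoint, as returned by e.g.
/// `Node::get_publishers_info_by_topic`.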
pub struct TopicEndpointInfo {
pub node_name: String,
pub node_namespace: String,
pub topic_type: String,
pub endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize],
pub qos_profile: QosProfile,
}
impl From<rmw_topic_endpoint_info_t> for TopicEndpointInfo {
fn from(info: rmw_topic_endpoint_info_t) -> Self {
// Convert C strings to Rust String
let node_name = unsafe { CStr::from_ptr(info.node_name) }
.to_string_lossy()
.into_owned();
let node_namespace = unsafe { CStr::from_ptr(info.node_namespace) }
.to_string_lossy()
.into_owned();
let topic_type = unsafe { CStr::from_ptr(info.topic_type) }
.to_string_lossy()
.into_owned();
// Copy the endpoint_gid array
let endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize] = info.endpoint_gid;
// Convert the rmw QoS profile into r2r's QosProfile
let qos_profile = QosProfile::from(info.qos_profile);
TopicEndpointInfo {
node_name,
node_namespace,
topic_type,
endpoint_gid,
qos_profile,
}
}
}
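// Copy the rcl-owned endpoint info array into an owned Vec. The caller remains
// responsible for finalizing `info_array` with `rmw_topic_endpoint_info_array_fini`.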
fn convert_info_array_to_vec(
info_array: &rcl_topic_endpoint_info_array_t,
) -> Vec<TopicEndpointInfo> {
let mut topic_info_list = Vec::with_capacity(info_array.size);
unsafe {
let infos = std::slice::from_raw_parts(info_array.info_array, info_array.size);
for &info in infos {
let endpoint_info = TopicEndpointInfo::from(info);
topic_info_list.push(endpoint_info);
}
}
topic_info_list
}
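
For context, a minimal usage sketch of the new API (hypothetical topic name "/chatter"; assumes an already-created r2r::Node bound to `node`):

    let infos = node.get_publishers_info_by_topic("/chatter", false).unwrap();
    for info in &infos {
        println!(
            "{} ({}) publishes {} with reliability {:?}",
            info.node_name, info.node_namespace, info.topic_type, info.qos_profile.reliability
        );
    }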


@@ -233,6 +233,19 @@ impl From<HistoryPolicy> for rmw_qos_history_policy_t {
}
}
impl From<rmw_qos_history_policy_t> for HistoryPolicy {
fn from(rmw_history_policy: rmw_qos_history_policy_t) -> Self {
match rmw_history_policy {
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_ALL => HistoryPolicy::KeepAll,
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_LAST => HistoryPolicy::KeepLast,
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT => {
HistoryPolicy::SystemDefault
}
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_UNKNOWN => HistoryPolicy::Unknown,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReliabilityPolicy {
BestEffort,
@@ -259,6 +272,25 @@ impl From<ReliabilityPolicy> for rmw_qos_reliability_policy_t {
}
}
impl From<rmw_qos_reliability_policy_t> for ReliabilityPolicy {
fn from(rmw_reliability_policy: rmw_qos_reliability_policy_t) -> Self {
match rmw_reliability_policy {
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT => {
ReliabilityPolicy::BestEffort
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_RELIABLE => {
ReliabilityPolicy::Reliable
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT => {
ReliabilityPolicy::SystemDefault
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_UNKNOWN => {
ReliabilityPolicy::Unknown
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DurabilityPolicy {
TransientLocal,
@@ -286,6 +318,25 @@ impl From<DurabilityPolicy> for rmw_qos_durability_policy_t {
}
}
impl From<rmw_qos_durability_policy_t> for DurabilityPolicy {
fn from(rmw_durability_policy: rmw_qos_durability_policy_t) -> Self {
match rmw_durability_policy {
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL => {
DurabilityPolicy::TransientLocal
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_VOLATILE => {
DurabilityPolicy::Volatile
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT => {
DurabilityPolicy::SystemDefault
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_UNKNOWN => {
DurabilityPolicy::Unknown
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LivelinessPolicy {
Automatic,
@@ -317,6 +368,28 @@ impl From<LivelinessPolicy> for rmw_qos_liveliness_policy_t {
}
}
impl From<rmw_qos_liveliness_policy_t> for LivelinessPolicy {
fn from(rmw_liveliness_policy: rmw_qos_liveliness_policy_t) -> Self {
match rmw_liveliness_policy {
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_AUTOMATIC => {
LivelinessPolicy::Automatic
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE => {
LivelinessPolicy::ManualByNode
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC => {
LivelinessPolicy::ManualByTopic
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT => {
LivelinessPolicy::SystemDefault
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_UNKNOWN => {
LivelinessPolicy::Unknown
}
}
}
}
/// QoS profile
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QosProfile {
@@ -719,8 +792,28 @@ impl From<QosProfile> for r2r_rcl::rmw_qos_profile_t {
}
}
}
impl From<r2r_rcl::rmw_qos_profile_t> for QosProfile {
fn from(rmw_qos: r2r_rcl::rmw_qos_profile_t) -> Self {
QosProfile {
history: rmw_qos.history.into(),
depth: rmw_qos.depth,
reliability: rmw_qos.reliability.into(),
durability: rmw_qos.durability.into(),
deadline: Duration::from_rmw_time_t(&rmw_qos.deadline),
lifespan: Duration::from_rmw_time_t(&rmw_qos.lifespan),
liveliness: rmw_qos.liveliness.into(),
liveliness_lease_duration: Duration::from_rmw_time_t(
&rmw_qos.liveliness_lease_duration,
),
avoid_ros_namespace_conventions: rmw_qos.avoid_ros_namespace_conventions,
}
}
}
pub(crate) trait RclDurationT {
fn to_rmw_time_t(&self) -> rmw_time_t;
fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self;
}
impl RclDurationT for Duration {
@@ -730,4 +823,28 @@ impl RclDurationT for Duration {
nsec: self.subsec_nanos().into(),
}
}
fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self {
#[cfg(not(r2r__ros__distro__foxy))]
{
assert!(
rmw_time.nsec < 1_000_000_000,
"nsec part of rmw_time_t should be less than 1 billion"
);
}
#[cfg(r2r__ros__distro__foxy)]
{
// FIXME: In foxy, duration data obtained from publisher with default qos profile is
// sec: 7FFFFFFF (2147483647), nsec: FFFFFFFF (4294967295)
if rmw_time.nsec == 4294967295 {
// 0s indicates deadline policies are not tracked or enforced in foxy
return Duration::new(0, 0);
} else if rmw_time.nsec > 1_000_000_000 {
panic!("nsec part of rmw_time_t should be less than 1 billion");
}
}
Duration::new(rmw_time.sec, rmw_time.nsec as u32)
}
}


@@ -9,85 +9,144 @@ const N_TEARDOWN_CYCLES: usize = 2;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
let mut threads = futures::stream::FuturesUnordered::from_iter(
        (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| {
            tokio::spawn(async move {
                // Iterate to check for memory corruption on node setup/teardown
                for i_cycle in 0..N_TEARDOWN_CYCLES {
                    println!("tokio_testing iteration {i_cycle}");

                    let ctx = r2r::Context::create().unwrap();
                    // let ctx = std::thread::spawn(|| r2r::Context::create().unwrap()).join().unwrap();
                    let mut node =
                        r2r::Node::create(ctx, &format!("testnode_{i_context}"), "").unwrap();
                    let mut s_the_no = node
                        .subscribe::<r2r::std_msgs::msg::Int32>(
                            &format!("/the_no_{i_context}"),
                            QosProfile::default(),
                        )
                        .unwrap();
                    let mut s_new_no = node
                        .subscribe::<r2r::std_msgs::msg::Int32>(
                            &format!("/new_no_{i_context}"),
                            QosProfile::default(),
                        )
                        .unwrap();
                    let p_the_no = node
                        .create_publisher::<r2r::std_msgs::msg::Int32>(
                            &format!("/the_no_{i_context}"),
                            QosProfile::default(),
                        )
                        .unwrap();
                    let p_new_no = node
                        .create_publisher::<r2r::std_msgs::msg::Int32>(
                            &format!("/new_no_{i_context}"),
                            QosProfile::default(),
                        )
                        .unwrap();
                    let p_float_no = node
                        .create_publisher::<r2r::std_msgs::msg::Float32>(
                            &format!("/float_no_{i_context}"),
                            QosProfile::default().best_effort(),
                        )
                        .unwrap();

                    let pub_info = node
                        .get_publishers_info_by_topic(&format!("/float_no_{i_context}"), false)
                        .unwrap();
                    assert_eq!(pub_info.len(), 1);
                    assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Float32".to_owned());
                    assert_eq!(
                        pub_info[0].qos_profile.reliability,
                        QosProfile::default().best_effort().reliability
                    );
                    assert_eq!(
                        pub_info[0].qos_profile.durability,
                        QosProfile::default().durability
                    );

                    let pub_info = node
                        .get_publishers_info_by_topic(&format!("/new_no_{i_context}"), false)
                        .unwrap();
                    assert_eq!(pub_info.len(), 1);
                    assert_eq!(pub_info[0].topic_type, "std_msgs/msg/Int32".to_owned());
                    assert_eq!(
                        pub_info[0].qos_profile.reliability,
                        QosProfile::default().reliability
                    );
                    assert_eq!(
                        pub_info[0].qos_profile.durability,
                        QosProfile::default().durability
                    );

                    let state = Arc::new(Mutex::new(0));

                    task::spawn(async move {
                        (0..10).for_each(|i| {
                            p_the_no
                                .publish(&r2r::std_msgs::msg::Int32 { data: i })
                                .unwrap();
                            println!("send {i}");
                        });
                    });

                    task::spawn(async move {
                        while let Some(msg) = s_the_no.next().await {
                            p_new_no
                                .publish(&r2r::std_msgs::msg::Int32 {
                                    data: msg.data + 10,
                                })
                                .unwrap();
                            println!("got {}, send {}", msg.data, msg.data + 10);
                        }
                    });

                    let s = state.clone();
                    task::spawn(async move {
                        while let Some(msg) = s_new_no.next().await {
                            println!("got {}", msg.data);
                            let i = msg.data;
                            *s.lock().unwrap() = i;
                        }
                    });

                    task::spawn(async move {
                        (0..10).for_each(|i| {
                            p_float_no
                                .publish(&r2r::std_msgs::msg::Float32 { data: i as f32 })
                                .unwrap();
                        });
                    });

                    // std::thread::spawn doesn't work here anymore?
                    let handle = task::spawn_blocking(move || {
                        for _ in 1..30 {
                            node.spin_once(std::time::Duration::from_millis(100));
                            let x = state.lock().unwrap();
                            println!("rec {}", x);
                            if *x == 19 {
                                break;
                            }
                        }
                        *state.lock().unwrap()
                    });
                    let x = handle.await.unwrap();
                    assert_eq!(x, 19);

                    println!("tokio_testing finish iteration {i_cycle}");
                }
            })
        }),
);
while let Some(thread) = threads.next().await {
thread.unwrap();