cargo fmt

This commit is contained in:
Martin Dahl 2024-01-11 21:03:41 +01:00
parent b3e4a58eca
commit fe258169dd
6 changed files with 295 additions and 266 deletions

View File

@ -45,8 +45,10 @@ static CONTEXT: OnceLock<Result<Context>> = OnceLock::new();
impl Context {
/// Create a ROS context.
pub fn create() -> Result<Context> {
CONTEXT.get_or_init(|| {
let mut ctx: Box<rcl_context_t> = unsafe { Box::new(rcl_get_zero_initialized_context()) };
CONTEXT
.get_or_init(|| {
let mut ctx: Box<rcl_context_t> =
unsafe { Box::new(rcl_get_zero_initialized_context()) };
// argc/v
let args = std::env::args()
.map(|arg| CString::new(arg).unwrap())
@ -87,7 +89,8 @@ impl Context {
} else {
Err(Error::RCL_RET_ERROR) // TODO
}
}).clone()
})
.clone()
}
/// Check if the ROS context is valid.

View File

@ -79,9 +79,9 @@ mod msg_types;
pub use msg_types::generated_msgs::*;
pub use msg_types::WrappedActionTypeSupport;
pub use msg_types::WrappedNativeMsg as NativeMsg;
pub use msg_types::WrappedNativeMsgUntyped;
pub use msg_types::WrappedServiceTypeSupport;
pub use msg_types::WrappedTypesupport;
pub use msg_types::WrappedNativeMsgUntyped;
mod utils;
pub use utils::*;

View File

@ -423,11 +423,7 @@ impl WrappedNativeMsgUntyped {
// any part of msg_buf. However it shouldn't matter since from_native
// clones everything again anyway ..
let result = unsafe {
rmw_deserialize(
&msg_buf as *const rcl_serialized_message_t,
self.ts,
self.msg,
)
rmw_deserialize(&msg_buf as *const rcl_serialized_message_t, self.ts, self.msg)
};
if result == RCL_RET_OK as i32 {

View File

@ -1,13 +1,13 @@
use futures::channel::oneshot;
use futures::Future;
use futures::TryFutureExt;
use std::ffi::c_void;
use std::ffi::CString;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::sync::Once;
use std::sync::Weak;
use std::sync::Mutex;
use futures::Future;
use futures::channel::oneshot;
use futures::TryFutureExt;
use crate::error::*;
use crate::msg_types::*;
@ -45,11 +45,10 @@ pub(crate) struct Publisher_ {
handle: rcl_publisher_t,
// TODO use a mpsc to avoid the mutex?
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>,
}
impl Publisher_
{
impl Publisher_ {
fn get_inter_process_subscription_count(&self) -> Result<usize> {
// See https://github.com/ros2/rclcpp/issues/623
@ -70,7 +69,6 @@ impl Publisher_
}
pub(crate) fn poll_has_inter_process_subscribers(&self) {
let mut poll_inter_process_subscriber_channels =
self.poll_inter_process_subscriber_channels.lock().unwrap();
@ -102,7 +100,6 @@ impl Publisher_
}
}
/// A ROS (typed) publisher.
///
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
@ -139,10 +136,7 @@ where
}
pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
PublisherUntyped {
handle,
type_,
}
PublisherUntyped { handle, type_ }
}
pub fn create_publisher_helper(
@ -166,7 +160,7 @@ pub fn create_publisher_helper(
if result == RCL_RET_OK as i32 {
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new())
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
})
} else {
Err(Error::from_rcl_error(result))
@ -187,11 +181,12 @@ impl PublisherUntyped {
let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
native_msg.from_json(msg)?;
let result =
unsafe { rcl_publish(
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut())
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
@ -221,15 +216,16 @@ impl PublisherUntyped {
buffer_capacity: data.len(),
// Since its read only, this should never be used ..
allocator: unsafe { rcutils_get_default_allocator() }
allocator: unsafe { rcutils_get_default_allocator() },
};
let result =
unsafe { rcl_publish_serialized_message(
let result = unsafe {
rcl_publish_serialized_message(
&publisher.handle,
&msg_buf as *const rcl_serialized_message_t,
std::ptr::null_mut()
) };
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
Ok(())
@ -265,7 +261,6 @@ impl PublisherUntyped {
}
}
impl<T: 'static> Publisher<T>
where
T: WrappedTypesupport,
@ -281,11 +276,12 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
let result =
unsafe { rcl_publish(
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut())
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
@ -312,7 +308,7 @@ where
rcl_borrow_loaned_message(
&publisher.handle as *const rcl_publisher_t,
T::get_ts(),
&mut loaned_msg
&mut loaned_msg,
)
};
if ret != RCL_RET_OK as i32 {
@ -379,11 +375,13 @@ where
)
}
} else {
unsafe { rcl_publish(
unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
msg.void_ptr(),
std::ptr::null_mut()
) }
std::ptr::null_mut(),
)
}
};
if result == RCL_RET_OK as i32 {
@ -418,5 +416,4 @@ where
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
}

View File

@ -10,9 +10,8 @@ const N_TEARDOWN_CYCLES: usize = 2;
#[test]
// Let's create and drop a lot of node and publishers for a while to see that we can cope.
fn doesnt_crash() -> Result<(), Box<dyn std::error::Error>> {
let threads = (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| std::thread::spawn(move || {
let threads = (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| {
std::thread::spawn(move || {
for _i_cycle in 0..N_TEARDOWN_CYCLES {
// a global shared context.
let ctx = r2r::Context::create().unwrap();
@ -26,7 +25,12 @@ fn doesnt_crash() -> Result<(), Box<dyn std::error::Error>> {
// create concurrent nodes that max out the cpu
let ctx = ctx.clone();
ths.push(thread::spawn(move || {
let mut node = r2r::Node::create(ctx, &format!("testnode_{}_{}", i_context, i_node), "").unwrap();
let mut node = r2r::Node::create(
ctx,
&format!("testnode_{}_{}", i_context, i_node),
"",
)
.unwrap();
// each with 10 publishers
for _j in 0..10 {
@ -67,11 +71,10 @@ fn doesnt_crash() -> Result<(), Box<dyn std::error::Error>> {
t.join().unwrap();
}
// println!("all threads done {}", c);
}
}
}));
})
});
for thread in threads.into_iter() {
thread.join().unwrap();

View File

@ -1,8 +1,7 @@
use futures::stream::StreamExt;
use r2r::QosProfile;
use tokio::task;
use r2r::WrappedTypesupport;
use tokio::task;
const N_CONCURRENT_ROS_CONTEXT: usize = 3;
const N_TEARDOWN_CYCLES: usize = 2;
@ -10,27 +9,42 @@ const N_TEARDOWN_CYCLES: usize = 2;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
let mut threads = futures::stream::FuturesUnordered::from_iter(
(0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| tokio::spawn(async move {
(0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| {
tokio::spawn(async move {
// Iterate to check for memory corruption on node setup/teardown
for i_cycle in 0..N_TEARDOWN_CYCLES {
println!("tokio_subscribe_raw_testing iteration {i_cycle}");
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, &format!("testnode2_{i_context}"), "").unwrap();
let mut node =
r2r::Node::create(ctx, &format!("testnode2_{i_context}"), "").unwrap();
let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default()).unwrap();
let mut sub_int = node
.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())
.unwrap();
let mut sub_array =
node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default()).unwrap();
let mut sub_array = node
.subscribe_raw(
"/int_array",
"std_msgs/msg/Int32MultiArray",
QosProfile::default(),
)
.unwrap();
let pub_int =
node.create_publisher::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default()).unwrap();
let pub_int = node
.create_publisher::<r2r::std_msgs::msg::Int32>(
"/int",
QosProfile::default(),
)
.unwrap();
// Use an array as well since its a variable sized type
let pub_array = node.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>(
let pub_array = node
.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>(
"/int_array",
QosProfile::default(),
).unwrap();
)
.unwrap();
task::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
@ -74,8 +88,9 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
println!("Going to drop tokio_subscribe_raw_testing iteration {i_cycle}");
}
})));
})
}),
);
while let Some(thread) = threads.next().await {
thread.unwrap();
@ -84,44 +99,57 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>>
Ok(())
}
// Limit the number of threads to force threads to be reused
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tokio_publish_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
let mut threads = futures::stream::FuturesUnordered::from_iter(
(0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| tokio::spawn(async move {
(0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| {
tokio::spawn(async move {
// Iterate to check for memory corruption on node setup/teardown
for i_cycle in 0..N_TEARDOWN_CYCLES {
println!("tokio_publish_raw_testing iteration {i_cycle}");
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, &format!("testnode3_{i_context}"), "").unwrap();
let mut node =
r2r::Node::create(ctx, &format!("testnode3_{i_context}"), "").unwrap();
let mut sub_int = node.subscribe::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default()).unwrap();
let mut sub_int = node
.subscribe::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default())
.unwrap();
let mut sub_array =
node.subscribe::<r2r::std_msgs::msg::Int32MultiArray>("/int_array", QosProfile::default()).unwrap();
let mut sub_array = node
.subscribe::<r2r::std_msgs::msg::Int32MultiArray>(
"/int_array",
QosProfile::default(),
)
.unwrap();
let pub_int = node.create_publisher_untyped(
let pub_int = node
.create_publisher_untyped(
"/int",
"std_msgs/msg/Int32",
QosProfile::default()
).unwrap();
QosProfile::default(),
)
.unwrap();
// Use an array as well since its a variable sized type
let pub_array = node.create_publisher_untyped(
let pub_array = node
.create_publisher_untyped(
"/int_array",
"std_msgs/msg/Int32MultiArray",
QosProfile::default(),
).unwrap();
)
.unwrap();
task::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
(0..10).for_each(|i| {
pub_int
.publish_raw(&r2r::std_msgs::msg::Int32 { data: i }.to_serialized_bytes().unwrap())
.publish_raw(
&r2r::std_msgs::msg::Int32 { data: i }
.to_serialized_bytes()
.unwrap(),
)
.unwrap();
pub_array
@ -129,7 +157,9 @@ async fn tokio_publish_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
&r2r::std_msgs::msg::Int32MultiArray {
layout: r2r::std_msgs::msg::MultiArrayLayout::default(),
data: vec![i],
}.to_serialized_bytes().unwrap()
}
.to_serialized_bytes()
.unwrap(),
)
.unwrap();
});
@ -143,7 +173,6 @@ async fn tokio_publish_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
println!("Got int msg with value {}", msg.data);
assert!(msg.data >= 0);
assert!(msg.data < 10);
}
});
@ -170,9 +199,10 @@ async fn tokio_publish_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
handle.join().unwrap();
println!("Going to drop tokio_publish_raw_testing iteration {i_cycle}");
}
})));
})
}),
);
while let Some(thread) = threads.next().await {
thread.unwrap();