This commit is contained in:
Martin Škoudlil 2025-05-07 12:47:42 +02:00 committed by GitHub
commit b5a59349bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1261 additions and 304 deletions

View File

@ -7,4 +7,5 @@ members = [
"r2r_msg_gen",
"r2r_macros",
"r2r_rcl",
"r2r_tracing",
]

View File

@ -23,6 +23,7 @@ r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
r2r_actions = { path = "../r2r_actions", version = "0.9.5" }
r2r_macros = { path = "../r2r_macros", version = "0.9.5" }
r2r_tracing = { path = "../r2r_tracing", version = "0.9.5" }
uuid = { version = "1.2.2", features = ["serde", "v4"] }
futures = "0.3.25"
log = "0.4.18"
@ -50,6 +51,7 @@ prettyplease = "0.2.6"
[features]
save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen", "r2r_actions/save-bindgen"]
doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", "r2r_actions/doc-only"]
tracing = ["r2r_tracing/tracing"]
[package.metadata.docs.rs]
features = ["doc-only"]

View File

@ -0,0 +1,64 @@
use std::time::Duration;
use futures::{executor::LocalPool, task::LocalSpawnExt};
use r2r::{std_msgs::msg, std_srvs::srv, QosProfile};
/// This example demonstrates creation of a service,
/// subscriber and timers with their callback execution traced.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
// The traced callback is supplied directly to `subscribe_traced`
// and `create_service_traced`functions.
let subscriber_future = node
.subscribe("/print", QosProfile::default())?
.traced_callback(|msg: msg::String| {
println!("Received message: '{}'", msg.data);
});
spawner.spawn_local(subscriber_future)?;
let mut value = false;
let service_future = node
.create_service::<srv::SetBool::Service>("/set_value", QosProfile::default())?
.traced_callback(move |req| {
if value == req.message.data {
req.respond(srv::SetBool::Response {
success: false,
message: format!("Value is already {value}."),
})
.expect("could not send service response");
} else {
value = req.message.data;
req.respond(srv::SetBool::Response {
success: true,
message: format!("Value set to {value}."),
})
.expect("could not send service response");
}
});
spawner.spawn_local(service_future)?;
let mut counter = 0;
let wall_timer_future =
node.create_wall_timer(Duration::from_millis(500))?
.on_tick(move |_| {
counter += 1;
println!("Wall timer tick: {counter}");
});
spawner.spawn_local(wall_timer_future)?;
let ros_timer_future = node.create_timer(Duration::from_secs(1))?.on_tick(|diff| {
println!("ROS timer tick. Time elapsed since last tick: {diff:?}");
});
spawner.spawn_local(ros_timer_future)?;
loop {
node.spin_once(std::time::Duration::from_millis(5));
pool.run_until_stalled();
}
}

View File

@ -318,11 +318,10 @@ impl Client_ for UntypedClient_ {
}
}
pub fn create_client_helper(
node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_client_t> {
let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
pub unsafe fn create_client_helper(
client_handle: &mut rcl_client_t, node: *mut rcl_node_t, service_name: &str,
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
@ -330,15 +329,16 @@ pub fn create_client_helper(
let mut client_options = rcl_client_get_default_options();
client_options.qos = qos_profile.into();
rcl_client_init(
&mut client_handle,
client_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -124,6 +124,7 @@ pub use clocks::{Clock, ClockType};
mod nodes;
pub use nodes::{Node, Timer};
pub use r2r_tracing::StreamWithTracingData;
pub mod qos;

View File

@ -4,6 +4,7 @@ use futures::{
stream::{Stream, StreamExt},
};
use indexmap::IndexMap;
use r2r_tracing::{StreamWithTracingData, StreamWithTracingDataBuilder, TracingId};
use std::{
collections::HashMap,
ffi::{CStr, CString},
@ -290,194 +291,16 @@ impl Node {
let node_name = self.name()?;
// rcl_interfaces/srv/SetParameters
let set_params_request_stream = self
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_future = self
.create_service::<rcl_interfaces::srv::SetParameters::Service>(
&format!("{}/set_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_future = set_params_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
result.results.push(r);
}
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
handlers.push(Box::pin(set_params_future));
// rcl_interfaces/srv/GetParameters
let get_params_request_stream = self
.create_service::<rcl_interfaces::srv::GetParameters::Service>(
&format!("{}/get_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let get_params_future = get_params_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
let params = params.lock().unwrap();
let values = req
.message
.names
.iter()
.map(|n| {
// First try to get the parameter from the param structure
if let Some(ps) = &params_struct_clone {
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
return value;
}
}
// Otherwise get it from node HashMap
match params.get(n) {
Some(v) => v.value.clone(),
None => ParameterValue::NotSet,
}
})
.map(|v| v.into_parameter_value_msg())
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
let result = rcl_interfaces::srv::GetParameters::Response { values };
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
handlers.push(Box::pin(get_params_future));
// rcl_interfaces/srv/ListParameters
use rcl_interfaces::srv::ListParameters;
let list_params_request_stream = self.create_service::<ListParameters::Service>(
&format!("{}/list_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let list_params_future = list_params_request_stream.for_each(
move |req: ServiceRequest<ListParameters::Service>| {
Self::handle_list_parameters(req, &params)
},
);
handlers.push(Box::pin(list_params_future));
// rcl_interfaces/srv/DescribeParameters
use rcl_interfaces::srv::DescribeParameters;
let desc_params_request_stream = self.create_service::<DescribeParameters::Service>(
&format!("{node_name}/describe_parameters"),
QosProfile::default(),
)?;
let params = self.params.clone();
let desc_params_future = desc_params_request_stream.for_each(
move |req: ServiceRequest<DescribeParameters::Service>| {
Self::handle_desc_parameters(req, &params)
},
);
handlers.push(Box::pin(desc_params_future));
// rcl_interfaces/srv/GetParameterTypes
use rcl_interfaces::srv::GetParameterTypes;
let get_param_types_request_stream = self.create_service::<GetParameterTypes::Service>(
&format!("{node_name}/get_parameter_types"),
QosProfile::default(),
)?;
let params = self.params.clone();
let get_param_types_future = get_param_types_request_stream.for_each(
move |req: ServiceRequest<GetParameterTypes::Service>| {
let params = params.lock().unwrap();
let types = req
.message
.names
.iter()
.map(|name| match params.get(name) {
Some(param) => param.value.into_parameter_type(),
None => rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET as u8,
})
.collect();
req.respond(GetParameterTypes::Response { types })
.expect("could not send reply to get parameter types request");
future::ready(())
},
);
handlers.push(Box::pin(get_param_types_future));
// rcl_interfaces/srv/SetParametersAtomically
let set_params_atomically_request_stream =
self.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
&format!("{}/set_parameters_atomically", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_atomically_future = set_params_atomically_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::SetParametersAtomically::Service>| {
let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default();
result.result.successful = true;
if let Some(ps) = &params_struct_clone {
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
result.result.successful = false;
result.result.reason = format!("Can't set parameter {}: {}", p.name, e);
break;
}
}
}
if result.result.successful {
// Since we checked them above now we assume these will be set ok...
)?
.traced_callback(
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
@ -516,17 +339,190 @@ impl Node {
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) {
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
result.results.push(r);
}
}
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(set_params_future));
// rcl_interfaces/srv/GetParameters
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let get_params_future = self
.create_service::<rcl_interfaces::srv::GetParameters::Service>(
&format!("{}/get_parameters", node_name),
QosProfile::default(),
)?
.traced_callback(
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
let params = params.lock().unwrap();
let values = req
.message
.names
.iter()
.map(|n| {
// First try to get the parameter from the param structure
if let Some(ps) = &params_struct_clone {
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
return value;
}
}
// Otherwise get it from node HashMap
match params.get(n) {
Some(v) => v.value.clone(),
None => ParameterValue::NotSet,
}
})
.map(|v| v.into_parameter_value_msg())
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
let result = rcl_interfaces::srv::GetParameters::Response { values };
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(get_params_future));
let params = self.params.clone();
// rcl_interfaces/srv/ListParameters
use rcl_interfaces::srv::ListParameters;
let list_params_future = self
.create_service::<ListParameters::Service>(
&format!("{}/list_parameters", node_name),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<ListParameters::Service>| {
Self::handle_list_parameters(req, &params)
});
handlers.push(Box::pin(list_params_future));
// rcl_interfaces/srv/DescribeParameters
use rcl_interfaces::srv::DescribeParameters;
let params = self.params.clone();
let desc_params_future = self
.create_service::<DescribeParameters::Service>(
&format!("{node_name}/describe_parameters"),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<DescribeParameters::Service>| {
Self::handle_desc_parameters(req, &params)
});
handlers.push(Box::pin(desc_params_future));
// rcl_interfaces/srv/GetParameterTypes
use rcl_interfaces::srv::GetParameterTypes;
let params = self.params.clone();
let get_param_types_future = self
.create_service::<GetParameterTypes::Service>(
&format!("{node_name}/get_parameter_types"),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<GetParameterTypes::Service>| {
let params = params.lock().unwrap();
let types = req
.message
.names
.iter()
.map(|name| match params.get(name) {
Some(param) => param.value.into_parameter_type(),
None => rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET as u8,
})
.collect();
req.respond(GetParameterTypes::Response { types })
.expect("could not send reply to get parameter types request");
});
handlers.push(Box::pin(get_param_types_future));
// rcl_interfaces/srv/SetParametersAtomically
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_atomically_future = self
.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
&format!("{}/set_parameters_atomically", node_name),
QosProfile::default(),
)?
.traced_callback(
move |req: ServiceRequest<
rcl_interfaces::srv::SetParametersAtomically::Service,
>| {
let mut result =
rcl_interfaces::srv::SetParametersAtomically::Response::default();
result.result.successful = true;
if let Some(ps) = &params_struct_clone {
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
result.result.successful = false;
result.result.reason =
format!("Can't set parameter {}: {}", p.name, e);
break;
}
}
}
if result.result.successful {
// Since we checked them above now we assume these will be set ok...
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) =
set_atomically_event_tx.try_send((p.name.clone(), val))
{
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
}
}
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(set_params_atomically_future));
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
@ -559,7 +555,7 @@ impl Node {
fn handle_list_parameters(
req: ServiceRequest<rcl_interfaces::srv::ListParameters::Service>,
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
) -> future::Ready<()> {
) {
use rcl_interfaces::srv::ListParameters;
let depth = req.message.depth;
@ -596,13 +592,12 @@ impl Node {
}
req.respond(ListParameters::Response { result })
.expect("could not send reply to list parameter request");
future::ready(())
}
fn handle_desc_parameters(
req: ServiceRequest<rcl_interfaces::srv::DescribeParameters::Service>,
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
) -> future::Ready<()> {
) {
use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters};
let mut descriptors = Vec::<ParameterDescriptor>::new();
let params = params.lock().unwrap();
@ -618,7 +613,6 @@ impl Node {
}
req.respond(DescribeParameters::Response { descriptors })
.expect("could not send reply to describe parameters request");
future::ready(())
}
/// Fetch a single ROS parameter.
@ -649,19 +643,38 @@ impl Node {
/// This function returns a `Stream` of ros messages.
pub fn subscribe<T: 'static>(
&mut self, topic: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = T> + Unpin>
) -> Result<StreamWithTracingData<T>>
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(TypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&*subscription).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -670,19 +683,38 @@ impl Node {
/// This function returns a `Stream` of ros messages without the rust convenience types.
pub fn subscribe_native<T: 'static>(
&mut self, topic: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
) -> Result<StreamWithTracingData<WrappedNativeMsg<T>>>
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
let ws = NativeSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(NativeSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -692,18 +724,37 @@ impl Node {
/// Useful when you cannot know the type of the message at compile time.
pub fn subscribe_untyped(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
) -> Result<StreamWithTracingData<Result<serde_json::Value>>> {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
let ws = UntypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(UntypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
topic_type: topic_type.to_string(),
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -713,7 +764,7 @@ impl Node {
/// Useful if you just want to pass the data along to another part of the system.
pub fn subscribe_raw(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
) -> Result<StreamWithTracingData<Vec<u8>>> {
// TODO is it possible to handle the raw message without type support?
//
// Passing null ts to rcl_subscription_init throws an error ..
@ -739,16 +790,35 @@ impl Node {
return Err(Error::from_rcl_error(ret));
}
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
let ws = RawSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(RawSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
msg_buf,
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -758,24 +828,41 @@ impl Node {
/// `respond` on the Service Request to send the reply.
pub fn create_service<T: 'static>(
&mut self, service_name: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
) -> Result<StreamWithTracingData<ServiceRequest<T>>>
where
T: WrappedServiceTypeSupport,
{
let service_handle = create_service_helper(
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
let ws = TypedService::<T> {
rcl_handle: service_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut service_arc = Arc::new(Mutex::new(TypedService::<T> {
rcl_handle: unsafe { rcl_get_zero_initialized_service() },
sender,
}));
let service_ref = Arc::get_mut(&mut service_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The service was zero initialized above.
// Full initialization happens in `create_service_helper``.
unsafe {
create_service_helper(
&mut service_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
};
self.services.push(Arc::new(Mutex::new(ws)));
let receiver = StreamWithTracingDataBuilder::build_service(receiver, unsafe {
TracingId::new(&service_ref.rcl_handle)
});
// Only push after full initialization.
self.services.push(service_arc);
Ok(receiver)
}
@ -788,19 +875,30 @@ impl Node {
where
T: WrappedServiceTypeSupport,
{
let client_handle = create_client_helper(
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
let ws = TypedClient::<T> {
rcl_handle: client_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut client_arc = Arc::new(Mutex::new(TypedClient::<T> {
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
}));
let client_ref = Arc::get_mut(&mut client_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The client was zero initialized above.
// Full initialization happens in `create_client_helper`.
unsafe {
create_client_helper(
&mut client_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
};
let client_arc = Arc::new(Mutex::new(ws));
let c = make_client(Arc::downgrade(&client_arc));
self.clients.push(client_arc);
Ok(c)
@ -816,20 +914,32 @@ impl Node {
&mut self, service_name: &str, service_type: &str, qos_profile: QosProfile,
) -> Result<ClientUntyped> {
let service_type = UntypedServiceSupport::new_from(service_type)?;
let client_handle = create_client_helper(
self.node_handle.as_mut(),
service_name,
service_type.ts,
qos_profile,
)?;
let client = UntypedClient_ {
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut client_arc = Arc::new(Mutex::new(UntypedClient_ {
service_type,
rcl_handle: client_handle,
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
}));
let client_ref = Arc::get_mut(&mut client_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The client was zero initialized above.
// Full initialization happens in `create_client_helper`.
unsafe {
create_client_helper(
&mut client_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
client_ref.service_type.ts,
qos_profile,
)?;
};
let client_arc = Arc::new(Mutex::new(client));
let c = make_untyped_client(Arc::downgrade(&client_arc));
self.clients.push(client_arc);
Ok(c)
@ -962,11 +1072,10 @@ impl Node {
where
T: WrappedTypesupport,
{
let publisher_handle =
let publisher_arc =
create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let arc = Arc::new(publisher_handle);
let p = make_publisher(Arc::downgrade(&arc));
self.pubs.push(arc);
let p = make_publisher(Arc::downgrade(&publisher_arc));
self.pubs.push(publisher_arc);
Ok(p)
}
@ -977,11 +1086,10 @@ impl Node {
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<PublisherUntyped> {
let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?;
let publisher_handle =
let publisher_arc =
create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?;
let arc = Arc::new(publisher_handle);
let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned());
self.pubs.push(arc);
let p = make_publisher_untyped(Arc::downgrade(&publisher_arc), topic_type.to_owned());
self.pubs.push(publisher_arc);
Ok(p)
}
@ -1019,6 +1127,8 @@ impl Node {
/// `timeout` is a duration specifying how long the spin should
/// block for if there are no pending events.
pub fn spin_once(&mut self, timeout: Duration) {
r2r_tracing::trace_spin_start(&*self.node_handle, timeout);
// first handle any completed action cancellation responses
for a in &mut self.action_servers {
a.lock().unwrap().send_completed_cancel_requests();
@ -1182,9 +1292,12 @@ impl Node {
unsafe {
rcl_wait_set_fini(&mut ws);
}
r2r_tracing::trace_spin_timeout(&*self.node_handle);
return;
}
r2r_tracing::trace_spin_wake(&*self.node_handle);
let mut subs_to_remove = vec![];
if ws.subscriptions != std::ptr::null_mut() {
let ws_subs =
@ -1341,6 +1454,8 @@ impl Node {
unsafe {
rcl_wait_set_fini(&mut ws);
}
r2r_tracing::trace_spin_end(&*self.node_handle);
}
/// Returns a map of topic names and type names of the publishers
@ -1444,9 +1559,15 @@ impl Node {
_clock: Some(clock), // The timer owns the clock.
sender: tx,
};
self.timers.push(timer);
let out_timer = Timer { receiver: rx };
let out_timer = unsafe {
Timer {
receiver: rx,
timer_handle: TracingId::new(timer.get_handle()),
node_handle: TracingId::new(&*self.node_handle),
}
};
self.timers.push(timer);
Ok(out_timer)
}
@ -1467,9 +1588,15 @@ impl Node {
_clock: None, // The timer does not own the clock (the node owns it).
sender: tx,
};
self.timers.push(timer);
let out_timer = Timer { receiver: rx };
let out_timer = unsafe {
Timer {
receiver: rx,
timer_handle: TracingId::new(timer.get_handle()),
node_handle: TracingId::new(&*self.node_handle),
}
};
self.timers.push(timer);
Ok(out_timer)
}
@ -1601,6 +1728,8 @@ impl Drop for Timer_ {
/// A ROS timer.
pub struct Timer {
receiver: mpsc::Receiver<Duration>,
timer_handle: TracingId<rcl_timer_t>,
node_handle: TracingId<rcl_node_t>,
}
impl Timer {
@ -1615,6 +1744,23 @@ impl Timer {
Err(Error::RCL_RET_TIMER_INVALID)
}
}
/// Transforms this timer stream to a [`Future`] calling the given `callback` on each tick.
///
/// The callback execution is traced by r2r_tracing.
///
/// This function should be called before dropping the timer's node.
/// Otherwise, the trace data might be inconsistent.
#[must_use = "Futures do nothing unless you `.await` or poll them"]
pub fn on_tick<F: FnMut(Duration)>(self, callback: F) -> impl Future<Output = ()> + Unpin {
let mut callback = r2r_tracing::Callback::new_timer(self.timer_handle, callback);
r2r_tracing::trace_timer_link_node(self.timer_handle, self.node_handle);
self.receiver.for_each(move |duration| {
callback.call(duration);
future::ready(())
})
}
}
// Since publishers are temporarily upgraded to owners during the

View File

@ -3,7 +3,7 @@ use std::{
ffi::{c_void, CString},
fmt::Debug,
marker::PhantomData,
sync::{Mutex, Once, Weak},
sync::{Arc, Mutex, Once, Weak},
};
use crate::{error::*, msg_types::*, qos::QosProfile};
@ -137,15 +137,24 @@ pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> Publis
pub fn create_publisher_helper(
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<Publisher_> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
) -> Result<Arc<Publisher_>> {
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
// Allocate the memory now so that the location of the rcl handle
// does not change after call to rcl_publisher_init.
// This is important because tracing in rcl expects the handle to be at a fixed location.
let mut publisher_arc = Arc::new(Publisher_ {
handle: unsafe { rcl_get_zero_initialized_publisher() },
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
});
let publisher_mut = Arc::get_mut(&mut publisher_arc)
.expect("No other Arc should exist. The Arc was just created.");
let result = unsafe {
let mut publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = qos_profile.into();
rcl_publisher_init(
&mut publisher_handle,
&mut publisher_mut.handle,
node,
typesupport,
topic_c_string.as_ptr(),
@ -153,10 +162,7 @@ pub fn create_publisher_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
})
Ok(publisher_arc)
} else {
Err(Error::from_rcl_error(result))
}
@ -176,6 +182,8 @@ impl PublisherUntyped {
let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
native_msg.from_json(msg)?;
r2r_tracing::trace_publish(native_msg.void_ptr());
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
@ -214,6 +222,8 @@ impl PublisherUntyped {
allocator: unsafe { rcutils_get_default_allocator() },
};
r2r_tracing::trace_publish((&msg_buf as *const rcl_serialized_message_t).cast::<c_void>());
let result = unsafe {
rcl_publish_serialized_message(
&publisher.handle,
@ -271,6 +281,9 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
r2r_tracing::trace_publish(native_msg.void_ptr());
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
@ -357,6 +370,8 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
r2r_tracing::trace_publish(msg.void_ptr());
let result = if msg.is_loaned {
unsafe {
// signal that we are relinquishing responsibility of the memory

View File

@ -111,11 +111,13 @@ where
}
}
pub fn create_service_helper(
node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_service_t> {
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
/// Initializes the service.
///
/// SAFETY: requires that the service handle is zero initialized by [`rcl_get_zero_initialized_service`].
pub unsafe fn create_service_helper(
service_handle: &mut rcl_service_t, node: &mut rcl_node_t, service_name: &str,
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
@ -123,7 +125,7 @@ pub fn create_service_helper(
let mut service_options = rcl_service_get_default_options();
service_options.qos = qos_profile.into();
rcl_service_init(
&mut service_handle,
service_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
@ -131,7 +133,7 @@ pub fn create_service_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(service_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -55,6 +55,8 @@ where
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
r2r_tracing::trace_take_ptr(msg.void_ptr());
let msg = T::from_native(&msg);
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
@ -138,6 +140,9 @@ where
new_msg
}
};
r2r_tracing::trace_take_ptr(msg.void_ptr());
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
@ -168,6 +173,8 @@ impl Subscriber_ for UntypedSubscriber {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
r2r_tracing::trace_take_ptr(msg.void_ptr());
let json = msg.to_json();
if let Err(e) = self.sender.try_send(json) {
if e.is_disconnected() {
@ -215,6 +222,8 @@ impl Subscriber_ for RawSubscriber {
}
};
r2r_tracing::trace_take(&self.msg_buf);
if let Err(e) = self.sender.try_send(data_bytes) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
@ -234,18 +243,17 @@ impl Subscriber_ for RawSubscriber {
}
}
pub fn create_subscription_helper(
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_subscription_t> {
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
pub unsafe fn create_subscription_helper(
subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str,
ts: *const rosidl_message_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let mut subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile.into();
rcl_subscription_init(
&mut subscription_handle,
subscription_handle,
node,
ts,
topic_c_string.as_ptr(),
@ -253,7 +261,7 @@ pub fn create_subscription_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(subscription_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -9,14 +9,18 @@ use crate::{
Clock, ClockType, Node, QosProfile, WrappedTypesupport,
};
use r2r_rcl::{
rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
rmw_message_info_t, RCL_RET_OK,
rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t,
rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK,
};
use r2r_tracing::TracingId;
use std::{
any::type_name,
sync::{Arc, Mutex, Weak},
};
use std::sync::{Arc, Mutex, Weak};
/// Provides time from `/clock` topic to attached ROS clocks
///
/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled.
/// By default, only clock used by ROS timers is attached and time from `/clock` topic is disabled.
///
/// The time from `/clock` topic can be activated by either of these:
/// - calling [`TimeSource::enable_sim_time`]
@ -102,7 +106,7 @@ impl TimeSource {
match inner.subscriber_state {
TimeSourceSubscriberState::None => {
let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
node.subscribers.push(Box::new(subscriber));
node.subscribers.push(subscriber);
inner.subscriber_state = TimeSourceSubscriberState::Active;
}
TimeSourceSubscriberState::ToBeDestroyed => {
@ -191,20 +195,39 @@ impl TimeSource_ {
}
impl TimeSourceSubscriber {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<Box<Self>> {
// The values are set based on default values in rclcpp
let qos = QosProfile::default().keep_last(1).best_effort();
let subscriber = create_subscription_helper(
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?;
Ok(Self {
subscriber_handle: subscriber,
let mut subscriber = Box::new(Self {
subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() },
time_source,
})
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscriber.subscriber_handle,
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?
};
// Use TimeSource inner arc address as callback_id. We hope that this id is much larger value than
// any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1.
let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize;
r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber);
r2r_tracing::trace_subscription_callback_added(
unsafe { TracingId::new(&*subscriber).forget_type() },
callback_id,
);
r2r_tracing::trace_callback_register(callback_id, type_name::<TimeSource>());
Ok(subscriber)
}
}
@ -226,11 +249,16 @@ impl Subscriber_ for TimeSourceSubscriber {
)
};
r2r_tracing::trace_take_ptr(clock_msg.void_ptr());
let mut inner_time_source = self.time_source.inner.lock().unwrap();
if ret == RCL_RET_OK as i32 {
let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);
let time_source_ptr = Arc::as_ptr(&self.time_source.inner) as usize;
r2r_tracing::trace_callback_start(time_source_ptr, false);
inner_time_source.set_clock_time(msg.clock);
r2r_tracing::trace_callback_end(time_source_ptr);
}
match inner_time_source.subscriber_state {

20
r2r_tracing/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "r2r_tracing"
description = "Internal dependency containing tracepoint definitions or imports for r2r."
authors = ["Martin Škoudlil <skoudmar@cvut.cz>"]
version = "0.9.5"
license = "MIT"
edition = "2021"
repository = "https://github.com/sequenceplanner/r2r"
readme = "README.md"
[dependencies]
futures = "0.3.25"
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
[build-dependencies]
lttng-ust-generate = { git = "https://github.com/skoudmar/lttng-ust-rs.git", rev = "bd437b5eeb8c4378d78ebb5aa17f0f4a95828b32", optional = true}
r2r_common = { path = "../r2r_common", version = "0.9.5", optional = true}
[features]
tracing = ["dep:lttng-ust-generate", "dep:r2r_common"]

48
r2r_tracing/README.md Normal file
View File

@ -0,0 +1,48 @@
# r2r_tracing
Internal dependency containing tracepoint definitions or imports from `tracetools` ROS package for r2r.
Uses LTTng tracing framework.
## Feature flag `tracing`
The crate will generate and link tracepoint libraries only when the feature flag is enabled.
Without specifying the feature flag `tracing` all exported functions are No-ops.
## Depends on
- `tracetools` ROS package
- This package is a part of ROS distribution.
- `r2r_tracing` dynamically loads `tracetools` library to obtain tracepoints used by `rclcpp`.
- `lttng-ust` crate
- To define additional tracepoints.
## Recording traces of R2R applications
Make sure to enable feature flag `tracing`.
Then start tracing session:
- Either by installing [`ros2trace`](https://index.ros.org/p/ros2trace/) and running:
```sh
ros2 trace -u 'ros2:*' 'r2r:*'
```
The traces will be available in `$HOME/.ros/tracing/session-<timestamp>`.
- Alternatively, you can trace your application directly with LTTng:
```sh
# Session name is an optional user-chosen name for the trace
lttng create [session-name]
lttng enable-event -u 'ros2:*,r2r:*'
lttng add-context -u --type=vtid --type=vpid --type=procname
lttng start
# Start the ROS system here.
# Let it run for as long as you want to trace it.
lttng destroy
```
The traces will be available in `$HOME/lttng-traces/<session-name>-<timestamp>`.

76
r2r_tracing/build.rs Normal file
View File

@ -0,0 +1,76 @@
#[cfg(all(feature = "tracing", not(target_os = "linux")))]
compile_error!("Feature 'tracing' is only available on Linux.");
#[cfg(not(feature = "tracing"))]
fn main() {}
#[cfg(feature = "tracing")]
fn main() {
use tracing::{generate_r2r_tracepoints, generate_rclcpp_tracepoint_bindings};
generate_rclcpp_tracepoint_bindings();
generate_r2r_tracepoints();
}
#[cfg(feature = "tracing")]
mod tracing {
use lttng_ust_generate::{CIntegerType, CTFType, Generator, Provider};
use std::{env, path::PathBuf};
macro_rules! create_tracepoint {
($provider:ident::$name:ident($($arg_name:ident: $arg_lttng_type:expr),* $(,)?)) => {
$provider.create_class(concat!(stringify!($name), "_class"))
$(
.add_field(stringify!($arg_name), $arg_lttng_type)
)*
.instantiate(stringify!($name))
};
}
/// Generate bindings to the rclcpp tracepoints defined in the tracetools ros2 package.
pub(crate) fn generate_rclcpp_tracepoint_bindings() {
let bindings = r2r_common::setup_bindgen_builder()
.header("src/tracetools_wrapper.h")
.allowlist_function("ros_trace_rclcpp_.*")
.allowlist_function("ros_trace_callback_.*")
.generate_comments(true)
.generate()
.expect("Unable to generate bindings for tracetools");
println!("cargo:rustc-link-lib=tracetools");
// Write the bindings to the $OUT_DIR/tracetools_bindings.rs file.
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("tracetools_bindings.rs"))
.expect("Couldn't write tracetools bindings!");
}
pub(crate) fn generate_r2r_tracepoints() {
let mut r2r = Provider::new("r2r");
create_tracepoint!(r2r::spin_start(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
timeout_s: CTFType::Integer(CIntegerType::U64),
timeout_ns: CTFType::Integer(CIntegerType::U32),
));
create_tracepoint!(r2r::spin_end(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
create_tracepoint!(r2r::spin_wake(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
create_tracepoint!(r2r::spin_timeout(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
Generator::default()
.generated_lib_name("r2r_tracepoints_r2r")
.register_provider(r2r)
.output_file_name(
PathBuf::from(env::var("OUT_DIR").unwrap()).join("r2r_tracepoints.rs"),
)
.generate()
.expect("Unable to generate tracepoint bindings for r2r");
}
}

View File

@ -0,0 +1,81 @@
use crate::{
trace_callback_register, trace_service_callback_added, trace_subscription_callback_added,
trace_timer_callback_added, TracingId,
};
use r2r_rcl::{rcl_service_t, rcl_timer_t};
use std::{
any::type_name,
ffi::c_void,
marker::PhantomData,
sync::atomic::{AtomicUsize, Ordering::Relaxed},
};
/// Tracing wrapper for callback
pub struct Callback<F, M>
where
F: FnMut(M),
{
func: F,
id: usize,
msg_type: PhantomData<M>,
}
impl<F, M> Callback<F, M>
where
F: FnMut(M),
{
/// Generates unique ID for the callback
fn gen_id() -> usize {
static COUNTER: AtomicUsize = AtomicUsize::new(1);
COUNTER.fetch_add(1, Relaxed)
}
fn new(callback: F, id: usize) -> Self {
trace_callback_register(id, type_name::<F>());
Self {
func: callback,
id,
msg_type: PhantomData,
}
}
/// Emits trace event associating this `callback` with the `service`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_service(service: TracingId<rcl_service_t>, callback: F) -> Self {
let id = Self::gen_id();
trace_service_callback_added(service, id);
Self::new(callback, id)
}
/// Emits trace event associating this `callback` with the `timer`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_timer(timer: TracingId<rcl_timer_t>, callback: F) -> Self {
let id = Self::gen_id();
trace_timer_callback_added(timer, id);
Self::new(callback, id)
}
/// Emits trace event associating this `callback` with the `subscription`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_subscription(subscriber: TracingId<c_void>, callback: F) -> Self {
let id = Self::gen_id();
trace_subscription_callback_added(subscriber, id);
Self::new(callback, id)
}
/// Call the `callback`.
/// This emits `ros2:callback_start` and `ros2:callback_end` events at
/// the beginning and end respectively.
pub fn call(&mut self, msg: M) {
crate::trace_callback_start(self.id, false);
(self.func)(msg);
crate::trace_callback_end(self.id);
}
}

25
r2r_tracing/src/lib.rs Normal file
View File

@ -0,0 +1,25 @@
//! Tracepoint provider for the `r2r` crate.
#[cfg(feature = "tracing")]
mod r2r_tracepoints_bindings;
#[cfg(feature = "tracing")]
mod tracetools_bindings;
mod rclcpp_tracepoints;
pub use rclcpp_tracepoints::*;
mod r2r_tracepoints;
pub use r2r_tracepoints::*;
mod callback;
pub use callback::Callback;
mod stream;
pub use stream::{StreamWithTracingData, StreamWithTracingDataBuilder};
mod tracing_id;
pub use tracing_id::TracingId;
mod macros;
use macros::tracepoint_fn;

26
r2r_tracing/src/macros.rs Normal file
View File

@ -0,0 +1,26 @@
/// Generates two versions of each function definition wrapped via this macro:
/// - One with tracing enabled: Adds a feature flag and copies the function signature and body as is.
/// - One with tracing disabled: Adds a feature flag and replaces the function body to be empty.
macro_rules! tracepoint_fn {
($($(#[$atts:meta])* $vi:vis fn $name:ident$(<$generic:tt>)?($($arg_name:ident : $arg_ty:ty),* $(,)?) $(-> $ret_ty:ty)? $body:block)*) => {
$(
$(#[$atts])*
#[inline]
#[cfg(feature = "tracing")]
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? $body
$(#[$atts])*
#[doc = ""] // Empty doc line to start a new paragraph.
#[doc = "Tracing is currently disabled so this function is a no-op. Enable `tracing` feature to enable tracing."]
#[inline(always)]
#[allow(unused_variables)]
#[cfg(not(feature = "tracing"))]
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? {
// Do nothing
}
)*
};
}
// set macro visibility to public in crate only
pub(crate) use tracepoint_fn;

View File

@ -0,0 +1,32 @@
#[cfg(feature = "tracing")]
use crate::r2r_tracepoints_bindings::r2r as tp;
use crate::tracepoint_fn;
use r2r_rcl::rcl_node_t;
use std::time::Duration;
tracepoint_fn! {
/// The `node` started spinning with given `timeout`.
pub fn trace_spin_start(node: *const rcl_node_t, timeout: Duration) {
let timeout_s = timeout.as_secs();
let timeout_ns = timeout.subsec_nanos();
tp::spin_start(node as usize, timeout_s, timeout_ns);
}
/// The `node` ended spinning function.
///
/// If the spinning function ended by a timeout, use `trace_spin_timeout` instead.
pub fn trace_spin_end(node: *const rcl_node_t) {
tp::spin_end(node as usize);
}
/// The `node` woke up from waiting on a wait set without reaching timeout.
pub fn trace_spin_wake(node: *const rcl_node_t) {
tp::spin_wake(node as usize);
}
/// The `node` timeouted while spinning
pub fn trace_spin_timeout(node: *const rcl_node_t) {
tp::spin_timeout(node as usize);
}
}

View File

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), "/r2r_tracepoints.rs"));

View File

@ -0,0 +1,161 @@
use crate::{tracepoint_fn, TracingId};
use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t};
#[cfg(feature = "tracing")]
use crate::tracetools_bindings as tp;
#[cfg(feature = "tracing")]
use std::{
ffi::{c_void, CString},
ptr::null,
};
#[cfg(feature = "tracing")]
const fn ref_to_c_void<T>(t: &T) -> *const std::ffi::c_void {
std::ptr::from_ref(t).cast()
}
#[cfg(feature = "tracing")]
macro_rules! c_void {
($e:ident) => {
($e) as *const std::ffi::c_void
};
}
// Documentation of tracepoints is based on https://github.com/ros2/ros2_tracing project.
// From file `tracetools/include/tracetools/tracetools.h`
tracepoint_fn! {
/// Message publication.
///
/// Records the pointer to the `message` being published at the `rclcpp`/`r2r` level.
///
/// Tracepoint: `ros2::rclcpp_publish
// Lint allow note: The message pointer is NOT dereferenced.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn trace_publish(message: *const std::ffi::c_void) {
unsafe {
// first argument documentation:
// publisher_handle not used, but kept for API/ABI stability
tp::ros_trace_rclcpp_publish(null(), message);
}
}
/// Subscription object initialization.
///
/// Tracepoint to allow associating the `subscription_handle` from `rcl` with the address of rust `subscription` reference.
/// There can be more than 1 `subscription` for 1 `subscription_handle`.
///
/// Tracepoint: `ros2::rclcpp_subscription_init`
pub fn trace_subscription_init<S>(
subscription_handle: *const rcl_subscription_t, subscription: &S,
) {
unsafe {
tp::ros_trace_rclcpp_subscription_init(
subscription_handle.cast(),
ref_to_c_void(subscription),
);
}
}
/// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object.
///
/// Tracepoint: `ros2::rclcpp_subscription_callback_added`
pub fn trace_subscription_callback_added(subscription: TracingId<std::ffi::c_void>, callback_id: usize) {
unsafe {
tp::ros_trace_rclcpp_subscription_callback_added(
subscription.c_void(),
c_void!(callback_id),
);
}
}
/// Message taking.
///
/// Records the **reference** to the `message` being taken at the `rclcpp`/`r2r` level.
///
/// To trace messages pointed to by void pointer use [`trace_take_ptr`].
///
/// Tracepoint: `ros2::rclcpp_take`
pub fn trace_take<M>(message: &M) {
unsafe {
tp::ros_trace_rclcpp_take(ref_to_c_void(message));
}
}
/// Message taking.
///
/// Records the **void pointer** to the `message` being taken at the `rclcpp`/`r2r` level.
///
/// To trace messages by their reference use [`trace_take`].
///
/// Tracepoint: `ros2::rclcpp_take`
// Lint allow note: The message pointer is NOT dereferenced.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn trace_take_ptr(message: *const std::ffi::c_void) {
unsafe {
tp::ros_trace_rclcpp_take(message);
}
}
/// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`.
///
/// Tracepoint: `ros2::rclcpp_service_callback_added`
pub fn trace_service_callback_added(service: TracingId<rcl_service_t>, callback_id: usize) {
unsafe { tp::ros_trace_rclcpp_service_callback_added(service.c_void(), c_void!(callback_id)) }
}
/// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle.
///
/// Tracepoint: `ros2::rclcpp_timer_callback_added`
pub fn trace_timer_callback_added(timer: TracingId<rcl_timer_t>, callback_id: usize) {
unsafe {
tp::ros_trace_rclcpp_timer_callback_added(timer.c_void(), c_void!(callback_id));
}
}
/// Tracepoint to allow associating the `timer` with a `node`.
///
/// Tracepoint: `ros2::rclcpp_timer_link_node`
pub fn trace_timer_link_node(timer: TracingId<rcl_timer_t>, node: TracingId<rcl_node_t>) {
unsafe {
tp::ros_trace_rclcpp_timer_link_node(timer.c_void(), node.c_void());
}
}
/// Tracepoint to allow associating demangled `function_symbol` with a `callback_id`.
///
/// Allocates memory to store the function symbol as a `CString`.
///
/// Tracepoint: `ros2::callback_register`
///
/// # Panics
/// If `function_symbol` contains a null byte.
pub fn trace_callback_register(callback_id: usize, function_symbol: &str) {
let function_symbol = CString::new(function_symbol)
.expect("r2r tracing: Cannot convert function_symbol to CString. It contains null byte.");
unsafe {
tp::ros_trace_rclcpp_callback_register(c_void!(callback_id), function_symbol.as_ptr());
}
}
/// Start of a callback
///
/// Set `is_intra_process` depending on whether this callback is done via intra-process or not
///
/// Tracepoint: `ros2::callback_start`
pub fn trace_callback_start(callback_id: usize, is_intra_process: bool) {
unsafe {
tp::ros_trace_callback_start(c_void!(callback_id), is_intra_process);
}
}
/// End of a callback.
///
/// Tracepoint: `ros2::callback_end`
pub fn trace_callback_end(callback_id: usize) {
unsafe {
tp::ros_trace_callback_end(c_void!(callback_id));
}
}
}

136
r2r_tracing/src/stream.rs Normal file
View File

@ -0,0 +1,136 @@
use std::{
ffi::c_void,
future::{self, Future},
};
use futures::{channel::mpsc::Receiver, stream::FusedStream, Stream, StreamExt as _};
use r2r_rcl::rcl_service_t;
#[cfg(feature = "tracing")]
use crate::Callback;
use crate::TracingId;
/// A stream wrapper containing tracing data.
///
/// When the `tracing` feature is enabled, you can use the [`Self::traced_callback`] method
/// to trace the execution of the callback. Stream polling is not traced.
#[derive(Debug)]
pub struct StreamWithTracingData<T> {
stream: Receiver<T>,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType,
}
#[cfg(feature = "tracing")]
#[derive(Debug, Clone, Copy)]
enum TracingIdWithType {
// The type of subscription is `c_void` because the actual type would
// be a generic R2R subscriber and not `rcl_subscription_t`.
Subscription(TracingId<c_void>),
Service(TracingId<rcl_service_t>),
}
impl<T> StreamWithTracingData<T> {
/// Converts the stream into a future that calls the provided callback for each message.
///
/// If `tracing` feature is enabled:
/// - Each time the callback is called, its start and end time will be traced by the
/// ROS 2 tracing framework.
/// - You should not poll the stream before calling this method because
/// otherwise it might confuse software that analyzes the trace.
pub fn traced_callback<C>(self, callback: C) -> impl Future<Output = ()> + Unpin
where
C: FnMut(T),
{
#[cfg(feature = "tracing")]
{
let mut callback_wrapper = match self.tracing_id {
TracingIdWithType::Subscription(id) => Callback::new_subscription(id, callback),
TracingIdWithType::Service(id) => Callback::new_service(id, callback),
};
self.stream.for_each(move |msg| {
callback_wrapper.call(msg);
future::ready(())
})
}
#[cfg(not(feature = "tracing"))]
{
let mut callback = callback;
self.stream.for_each(move |msg| {
callback(msg);
future::ready(())
})
}
}
}
impl<T> Stream for StreamWithTracingData<T> {
type Item = T;
fn poll_next(
self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
this.stream.poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<T> FusedStream for StreamWithTracingData<T> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
/// A builder for `StreamWithTracingData`.
///
/// This struct exists to allow creation of `StreamWithTracingData` without polluting its public API.
/// It is used internally by r2r and should not be reexported.
pub struct StreamWithTracingDataBuilder;
impl StreamWithTracingDataBuilder {
#[must_use]
#[allow(
clippy::not_unsafe_ptr_arg_deref,
reason = "The pointer is not dereferenced"
)]
#[cfg_attr(
not(feature = "tracing"),
expect(
unused_variables,
reason = "service_id is not saved if tracing is disabled"
)
)]
pub fn build_service<T>(
stream: Receiver<T>, service_id: TracingId<rcl_service_t>,
) -> StreamWithTracingData<T> {
StreamWithTracingData {
stream,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType::Service(service_id),
}
}
#[must_use]
#[cfg_attr(
not(feature = "tracing"),
expect(
unused_variables,
reason = "subscription_id is not saved if tracing is disabled"
)
)]
pub fn build_subscription<T>(
stream: Receiver<T>, subscription_id: TracingId<c_void>,
) -> StreamWithTracingData<T> {
StreamWithTracingData {
stream,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType::Subscription(subscription_id),
}
}
}

View File

@ -0,0 +1,6 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(unused)]
include!(concat!(env!("OUT_DIR"), "/tracetools_bindings.rs"));

View File

@ -0,0 +1 @@
#include <tracetools/tracetools.h>

View File

@ -0,0 +1,77 @@
/// Unique identifier for tracing purposes
#[derive(Debug)]
pub struct TracingId<T> {
/// Pointer to the object used as a unique ID.
/// Safety: Do NOT dereference the pointer.
#[cfg(feature = "tracing")]
id: *const T,
/// Marker for the type. Needed when `tracing` feature is disabled.
#[cfg(not(feature = "tracing"))]
_marker: std::marker::PhantomData<T>,
}
impl<T> TracingId<T> {
/// Creates new `TracingId` from the pointer.
///
/// # Safety
/// The pointer is used as a unique ID so users must ensure that they never create `TracingId`
/// with same address for different objects.
///
/// The pointer does not need to point to valid memory.
pub const unsafe fn new(_id: *const T) -> Self {
Self {
#[cfg(feature = "tracing")]
id: _id,
#[cfg(not(feature = "tracing"))]
_marker: std::marker::PhantomData,
}
}
/// Erase the generic type of the ID.
#[must_use]
pub fn forget_type(self) -> TracingId<std::ffi::c_void> {
#[cfg(not(feature = "tracing"))]
unsafe {
// Safety: The ID cannot be obtained back without the `tracing` feature.
TracingId::new(std::ptr::null())
}
#[cfg(feature = "tracing")]
unsafe {
// Safety: self contains valid ID.
TracingId::new(self.c_void())
}
}
/// Obtain the address representing the ID.
///
/// # Safety
/// Do NOT dereference the pointer.
#[cfg(feature = "tracing")]
pub(crate) const unsafe fn c_void(self) -> *const std::ffi::c_void {
self.id.cast::<std::ffi::c_void>()
}
}
/// Deriving Clone for `TracingId` would only derive it only conditionally based on whether the
/// `T` is `Clone` or not. But TracingId is independent of T.
impl<T> Clone for TracingId<T> {
fn clone(&self) -> Self {
*self
}
}
/// Deriving Copy for `TracingId` would only derive it only conditionally based on whether the
/// `T` is `Copy` or not. But TracingId is independent of T.
impl<T> Copy for TracingId<T> {}
/// # Safety
///
/// The address is never dereferenced and is used only as a unique ID so it is safe to send to another thread.
unsafe impl<T> Send for TracingId<T> {}
/// # Safety
///
/// The `TracingId` does not allow interior mutability because the pointer is never dereferenced.
/// It is safe to use across multiple threads.
unsafe impl<T> Sync for TracingId<T> {}