Compare commits

...

14 Commits

Author SHA1 Message Date
Martin Škoudlil 8aa9e89243 Add example showing usage of callback tracing 2025-05-07 12:47:12 +02:00
Martin Škoudlil c77b001762 Add tracing to parameter handling services 2025-05-07 12:47:12 +02:00
Martin Škoudlil f77574c3a9 Create to Stream wrapper with traced_callback method
The StreamWithTracingData wrapper implements both Stream and FusedStream similarly to Receiver
2025-05-07 12:47:12 +02:00
Martin Škoudlil 148e1de8ac Add tracepoints to r2r
Location and arguments of tracepoints is consistent with rclcpp
2025-05-07 12:47:12 +02:00
Martin Škoudlil 583c45c9ea Add Callback type to allow tracing callback calls 2025-05-07 12:47:12 +02:00
Martin Škoudlil 2b4f9753d4 Create TracingId type
The TracingId is used as a wrapper for the pointer to allow implementing
Send and Sync traits.

The pointer is used only as an ID and is never dereferenced, therefore, it is safe.
2025-05-07 12:47:12 +02:00
Martin Škoudlil cf03fe1b48 Add r2r_tracing crate
The crate contains tracepoints; either defined directly or imported
from ros2_tracing project (tracetools package).

The crate define a `tracing` feature which should only be used on Linux
because it uses LTTng only available on Linux.
2025-05-07 12:47:11 +02:00
Martin Škoudlil e371130cf1 Make rcl_client_t have stable location
This prevents another client initializing at the same location.
Therefore preventing conflicts in a trace.
2025-04-23 17:12:37 +02:00
Martin Škoudlil 23672cfdb8 Make rcl_service_t have stable location
This prevents another service initializing at the same location.
Therefore preventing conflicts in a trace.
2025-04-23 17:12:37 +02:00
Martin Škoudlil a51bda8901 Make rcl_subscription_t have stable location
This change is necessary because rcl uses address of the rcl handle in
tracepoints. If the location changes further `take` events will not
be correctly assigned to the subscriber.
2025-04-23 17:12:37 +02:00
Martin Škoudlil 5514c475bc Make rcl_publisher_t have stable location
This change is necessary because rcl uses address of the rcl handle in
tracepoints. If the location changes, further publish events will not
be correctly assigned to the publisher.
2025-04-23 17:12:37 +02:00
Martin Dahl 455ea630ec Release 0.9.5 2025-04-22 10:27:18 +02:00
Martin Dahl aa180c5b15
Add `/set_parameters_atomically` (#120, #121)
* Fixed failing `ros2 param ...` on r2r nodes for Jazzy (#120)

Add `/set_parameters_atomically` service to `make_parameter_handler_internal` in `nodes.rs` to fix failing `ros2 param ...` on r2r nodes for Jazzy.

* Atomic behavior for `set_parameters_atomically` (#121).

---------

Co-authored-by: Desmond Germans <desmond@germansmedia.nl>
2025-04-22 10:20:32 +02:00
Benjamin Bjerken 56acba5dd5 Update bindgen 0.63.0 -> 0.71.1 (#116)
This fixes #116 by now generating the missing primitive sequence functions.
Tested to be working on Ubuntu 22.04 with ROS2 Humble and Iron.
2025-04-17 09:54:48 +02:00
32 changed files with 1330 additions and 252 deletions

View File

@ -7,4 +7,5 @@ members = [
"r2r_msg_gen",
"r2r_macros",
"r2r_rcl",
"r2r_tracing",
]

View File

@ -14,7 +14,7 @@ These bindings are being written organically when things are needed by me and ot
How to use
--------------------
1. Make sure you have libclang installed. (e.g. libclang-dev on ubuntu)
2. Depend on this package in Cargo.toml: `r2r = "0.9.4"`
2. Depend on this package in Cargo.toml: `r2r = "0.9.5"`
3. You need to source your ROS2 installation before building/running.
4. The bindings will rebuild automatically if/when you source your workspace(s).
5. If you make changes to existing message types, run `cargo clean -p r2r_msg_gen` to force recompilation of the rust message types on the next build.
@ -47,6 +47,10 @@ Changelog
--------------------
#### [Unreleased]
#### [0.9.5] - 2025-04-22
- Add `/set_parameters_atomically` <https://github.com/sequenceplanner/r2r/pull/120>, <https://github.com/sequenceplanner/r2r/pull/121>. Fixes `ros2 param ...` on Jazzy.
- Update bindgen 0.63.0 -> 0.71.1 <https://github.com/sequenceplanner/r2r/pull/116>. (Issues may persist until next bindgen version, see PR).
#### [0.9.4] - 2024-11-21
- Fix cargo syntax for older rust versions < 1.77 <https://github.com/sequenceplanner/r2r/commit/74ad4410c79b1be7e42eb1822a291639e3c40ec4>
- Fix message generation for WChar idl type <https://github.com/sequenceplanner/r2r/commit/94db799db282c8b1e0222f1699e6a6420b5fd544>

View File

@ -1,6 +1,6 @@
[package]
name = "r2r"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Easy to use, runtime-agnostic, async rust bindings for ROS2."
license = "MIT AND Apache-2.0"
@ -18,11 +18,12 @@ serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.89"
thiserror = "1.0.37"
lazy_static = "1.4.0"
r2r_common = { path = "../r2r_common", version = "0.9.4" }
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
r2r_actions = { path = "../r2r_actions", version = "0.9.4" }
r2r_macros = { path = "../r2r_macros", version = "0.9.4" }
r2r_common = { path = "../r2r_common", version = "0.9.5" }
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
r2r_actions = { path = "../r2r_actions", version = "0.9.5" }
r2r_macros = { path = "../r2r_macros", version = "0.9.5" }
r2r_tracing = { path = "../r2r_tracing", version = "0.9.5" }
uuid = { version = "1.2.2", features = ["serde", "v4"] }
futures = "0.3.25"
log = "0.4.18"
@ -38,8 +39,8 @@ cdr = "0.2.4"
criterion = "0.5.1"
[build-dependencies]
r2r_common = { path = "../r2r_common", version = "0.9.4" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
r2r_common = { path = "../r2r_common", version = "0.9.5" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
rayon = "1.7.0"
force-send-sync = "1.0.0"
quote = "1.0.28"
@ -50,6 +51,7 @@ prettyplease = "0.2.6"
[features]
save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen", "r2r_actions/save-bindgen"]
doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", "r2r_actions/doc-only"]
tracing = ["r2r_tracing/tracing"]
[package.metadata.docs.rs]
features = ["doc-only"]

View File

@ -0,0 +1,64 @@
use std::time::Duration;
use futures::{executor::LocalPool, task::LocalSpawnExt};
use r2r::{std_msgs::msg, std_srvs::srv, QosProfile};
/// This example demonstrates creation of a service,
/// subscriber and timers with their callback execution traced.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
// The traced callback is supplied directly to `subscribe_traced`
// and `create_service_traced`functions.
let subscriber_future = node
.subscribe("/print", QosProfile::default())?
.traced_callback(|msg: msg::String| {
println!("Received message: '{}'", msg.data);
});
spawner.spawn_local(subscriber_future)?;
let mut value = false;
let service_future = node
.create_service::<srv::SetBool::Service>("/set_value", QosProfile::default())?
.traced_callback(move |req| {
if value == req.message.data {
req.respond(srv::SetBool::Response {
success: false,
message: format!("Value is already {value}."),
})
.expect("could not send service response");
} else {
value = req.message.data;
req.respond(srv::SetBool::Response {
success: true,
message: format!("Value set to {value}."),
})
.expect("could not send service response");
}
});
spawner.spawn_local(service_future)?;
let mut counter = 0;
let wall_timer_future =
node.create_wall_timer(Duration::from_millis(500))?
.on_tick(move |_| {
counter += 1;
println!("Wall timer tick: {counter}");
});
spawner.spawn_local(wall_timer_future)?;
let ros_timer_future = node.create_timer(Duration::from_secs(1))?.on_tick(|diff| {
println!("ROS timer tick. Time elapsed since last tick: {diff:?}");
});
spawner.spawn_local(ros_timer_future)?;
loop {
node.spin_once(std::time::Duration::from_millis(5));
pool.run_until_stalled();
}
}

View File

@ -318,11 +318,10 @@ impl Client_ for UntypedClient_ {
}
}
pub fn create_client_helper(
node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_client_t> {
let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
pub unsafe fn create_client_helper(
client_handle: &mut rcl_client_t, node: *mut rcl_node_t, service_name: &str,
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
@ -330,15 +329,16 @@ pub fn create_client_helper(
let mut client_options = rcl_client_get_default_options();
client_options.qos = qos_profile.into();
rcl_client_init(
&mut client_handle,
client_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -124,6 +124,7 @@ pub use clocks::{Clock, ClockType};
mod nodes;
pub use nodes::{Node, Timer};
pub use r2r_tracing::StreamWithTracingData;
pub mod qos;

View File

@ -4,6 +4,7 @@ use futures::{
stream::{Stream, StreamExt},
};
use indexmap::IndexMap;
use r2r_tracing::{StreamWithTracingData, StreamWithTracingDataBuilder, TracingId};
use std::{
collections::HashMap,
ffi::{CStr, CString},
@ -283,154 +284,150 @@ impl Node {
.register_parameters("", None, &mut self.params.lock().unwrap())?;
}
let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
let mut set_atomically_event_tx = set_event_tx.clone();
let node_name = self.name()?;
let set_params_request_stream = self
// rcl_interfaces/srv/SetParameters
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_future = self
.create_service::<rcl_interfaces::srv::SetParameters::Service>(
&format!("{}/set_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_future = set_params_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
)?
.traced_callback(
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
result.results.push(r);
}
result.results.push(r);
}
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(set_params_future));
// rcl_interfaces/srv/GetParameters
let get_params_request_stream = self
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let get_params_future = self
.create_service::<rcl_interfaces::srv::GetParameters::Service>(
&format!("{}/get_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let get_params_future = get_params_request_stream.for_each(
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
let params = params.lock().unwrap();
let values = req
.message
.names
.iter()
.map(|n| {
// First try to get the parameter from the param structure
if let Some(ps) = &params_struct_clone {
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
return value;
)?
.traced_callback(
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
let params = params.lock().unwrap();
let values = req
.message
.names
.iter()
.map(|n| {
// First try to get the parameter from the param structure
if let Some(ps) = &params_struct_clone {
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
return value;
}
}
}
// Otherwise get it from node HashMap
match params.get(n) {
Some(v) => v.value.clone(),
None => ParameterValue::NotSet,
}
})
.map(|v| v.into_parameter_value_msg())
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
// Otherwise get it from node HashMap
match params.get(n) {
Some(v) => v.value.clone(),
None => ParameterValue::NotSet,
}
})
.map(|v| v.into_parameter_value_msg())
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
let result = rcl_interfaces::srv::GetParameters::Response { values };
req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(())
},
);
let result = rcl_interfaces::srv::GetParameters::Response { values };
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(get_params_future));
let params = self.params.clone();
// rcl_interfaces/srv/ListParameters
use rcl_interfaces::srv::ListParameters;
let list_params_request_stream = self.create_service::<ListParameters::Service>(
&format!("{}/list_parameters", node_name),
QosProfile::default(),
)?;
let params = self.params.clone();
let list_params_future = list_params_request_stream.for_each(
move |req: ServiceRequest<ListParameters::Service>| {
let list_params_future = self
.create_service::<ListParameters::Service>(
&format!("{}/list_parameters", node_name),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<ListParameters::Service>| {
Self::handle_list_parameters(req, &params)
},
);
});
handlers.push(Box::pin(list_params_future));
// rcl_interfaces/srv/DescribeParameters
use rcl_interfaces::srv::DescribeParameters;
let desc_params_request_stream = self.create_service::<DescribeParameters::Service>(
&format!("{node_name}/describe_parameters"),
QosProfile::default(),
)?;
let params = self.params.clone();
let desc_params_future = desc_params_request_stream.for_each(
move |req: ServiceRequest<DescribeParameters::Service>| {
let desc_params_future = self
.create_service::<DescribeParameters::Service>(
&format!("{node_name}/describe_parameters"),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<DescribeParameters::Service>| {
Self::handle_desc_parameters(req, &params)
},
);
});
handlers.push(Box::pin(desc_params_future));
// rcl_interfaces/srv/GetParameterTypes
use rcl_interfaces::srv::GetParameterTypes;
let get_param_types_request_stream = self.create_service::<GetParameterTypes::Service>(
&format!("{node_name}/get_parameter_types"),
QosProfile::default(),
)?;
let params = self.params.clone();
let get_param_types_future = get_param_types_request_stream.for_each(
move |req: ServiceRequest<GetParameterTypes::Service>| {
let get_param_types_future = self
.create_service::<GetParameterTypes::Service>(
&format!("{node_name}/get_parameter_types"),
QosProfile::default(),
)?
.traced_callback(move |req: ServiceRequest<GetParameterTypes::Service>| {
let params = params.lock().unwrap();
let types = req
.message
@ -443,12 +440,91 @@ impl Node {
.collect();
req.respond(GetParameterTypes::Response { types })
.expect("could not send reply to get parameter types request");
future::ready(())
},
);
});
handlers.push(Box::pin(get_param_types_future));
// rcl_interfaces/srv/SetParametersAtomically
let params = self.params.clone();
let params_struct_clone = params_struct.clone();
let set_params_atomically_future = self
.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
&format!("{}/set_parameters_atomically", node_name),
QosProfile::default(),
)?
.traced_callback(
move |req: ServiceRequest<
rcl_interfaces::srv::SetParametersAtomically::Service,
>| {
let mut result =
rcl_interfaces::srv::SetParametersAtomically::Response::default();
result.result.successful = true;
if let Some(ps) = &params_struct_clone {
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
result.result.successful = false;
result.result.reason =
format!("Can't set parameter {}: {}", p.name, e);
break;
}
}
}
if result.result.successful {
// Since we checked them above now we assume these will be set ok...
for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
let changed = params
.lock()
.unwrap()
.get(&p.name)
.map(|v| v.value != val)
.unwrap_or(true); // changed=true if new
let r = if let Some(ps) = &params_struct_clone {
// Update parameter structure
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
if result.is_ok() {
// Also update Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone());
}
rcl_interfaces::msg::SetParametersResult {
successful: result.is_ok(),
reason: result.err().map_or("".into(), |e| e.to_string()),
}
} else {
// No parameter structure - update only Node::params
params
.lock()
.unwrap()
.entry(p.name.clone())
.and_modify(|p| p.value = val.clone())
.or_insert(Parameter::new(val.clone()));
rcl_interfaces::msg::SetParametersResult {
successful: true,
reason: "".into(),
}
};
// if the value changed, send out new value on parameter event stream
if changed && r.successful {
if let Err(e) =
set_atomically_event_tx.try_send((p.name.clone(), val))
{
log::debug!("Warning: could not send parameter event ({}).", e);
}
}
}
}
req.respond(result)
.expect("could not send reply to set parameter request");
},
);
handlers.push(Box::pin(set_params_atomically_future));
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
{
// create TimeSource based on value of use_sim_time parameter
@ -479,7 +555,7 @@ impl Node {
fn handle_list_parameters(
req: ServiceRequest<rcl_interfaces::srv::ListParameters::Service>,
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
) -> future::Ready<()> {
) {
use rcl_interfaces::srv::ListParameters;
let depth = req.message.depth;
@ -516,13 +592,12 @@ impl Node {
}
req.respond(ListParameters::Response { result })
.expect("could not send reply to list parameter request");
future::ready(())
}
fn handle_desc_parameters(
req: ServiceRequest<rcl_interfaces::srv::DescribeParameters::Service>,
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
) -> future::Ready<()> {
) {
use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters};
let mut descriptors = Vec::<ParameterDescriptor>::new();
let params = params.lock().unwrap();
@ -538,7 +613,6 @@ impl Node {
}
req.respond(DescribeParameters::Response { descriptors })
.expect("could not send reply to describe parameters request");
future::ready(())
}
/// Fetch a single ROS parameter.
@ -569,19 +643,38 @@ impl Node {
/// This function returns a `Stream` of ros messages.
pub fn subscribe<T: 'static>(
&mut self, topic: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = T> + Unpin>
) -> Result<StreamWithTracingData<T>>
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(TypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&*subscription).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -590,19 +683,38 @@ impl Node {
/// This function returns a `Stream` of ros messages without the rust convenience types.
pub fn subscribe_native<T: 'static>(
&mut self, topic: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
) -> Result<StreamWithTracingData<WrappedNativeMsg<T>>>
where
T: WrappedTypesupport,
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
let ws = NativeSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(NativeSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
T::get_ts(),
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -612,18 +724,37 @@ impl Node {
/// Useful when you cannot know the type of the message at compile time.
pub fn subscribe_untyped(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
) -> Result<StreamWithTracingData<Result<serde_json::Value>>> {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
let ws = UntypedSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(UntypedSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
topic_type: topic_type.to_string(),
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -633,7 +764,7 @@ impl Node {
/// Useful if you just want to pass the data along to another part of the system.
pub fn subscribe_raw(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
) -> Result<StreamWithTracingData<Vec<u8>>> {
// TODO is it possible to handle the raw message without type support?
//
// Passing null ts to rcl_subscription_init throws an error ..
@ -659,16 +790,35 @@ impl Node {
return Err(Error::from_rcl_error(ret));
}
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
let ws = RawSubscriber {
rcl_handle: subscription_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut subscription = Box::new(RawSubscriber {
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
msg_buf,
sender,
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscription.rcl_handle,
self.node_handle.as_mut(),
topic,
msg.ts,
qos_profile,
)?;
};
self.subscribers.push(Box::new(ws));
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
TracingId::new(&subscription.rcl_handle).forget_type()
});
self.subscribers.push(subscription);
Ok(receiver)
}
@ -678,24 +828,41 @@ impl Node {
/// `respond` on the Service Request to send the reply.
pub fn create_service<T: 'static>(
&mut self, service_name: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
) -> Result<StreamWithTracingData<ServiceRequest<T>>>
where
T: WrappedServiceTypeSupport,
{
let service_handle = create_service_helper(
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
let ws = TypedService::<T> {
rcl_handle: service_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut service_arc = Arc::new(Mutex::new(TypedService::<T> {
rcl_handle: unsafe { rcl_get_zero_initialized_service() },
sender,
}));
let service_ref = Arc::get_mut(&mut service_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The service was zero initialized above.
// Full initialization happens in `create_service_helper``.
unsafe {
create_service_helper(
&mut service_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
};
self.services.push(Arc::new(Mutex::new(ws)));
let receiver = StreamWithTracingDataBuilder::build_service(receiver, unsafe {
TracingId::new(&service_ref.rcl_handle)
});
// Only push after full initialization.
self.services.push(service_arc);
Ok(receiver)
}
@ -708,19 +875,30 @@ impl Node {
where
T: WrappedServiceTypeSupport,
{
let client_handle = create_client_helper(
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
let ws = TypedClient::<T> {
rcl_handle: client_handle,
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut client_arc = Arc::new(Mutex::new(TypedClient::<T> {
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
}));
let client_ref = Arc::get_mut(&mut client_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The client was zero initialized above.
// Full initialization happens in `create_client_helper`.
unsafe {
create_client_helper(
&mut client_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
T::get_ts(),
qos_profile,
)?;
};
let client_arc = Arc::new(Mutex::new(ws));
let c = make_client(Arc::downgrade(&client_arc));
self.clients.push(client_arc);
Ok(c)
@ -736,20 +914,32 @@ impl Node {
&mut self, service_name: &str, service_type: &str, qos_profile: QosProfile,
) -> Result<ClientUntyped> {
let service_type = UntypedServiceSupport::new_from(service_type)?;
let client_handle = create_client_helper(
self.node_handle.as_mut(),
service_name,
service_type.ts,
qos_profile,
)?;
let client = UntypedClient_ {
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
let mut client_arc = Arc::new(Mutex::new(UntypedClient_ {
service_type,
rcl_handle: client_handle,
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
response_channels: Vec::new(),
poll_available_channels: Vec::new(),
}));
let client_ref = Arc::get_mut(&mut client_arc)
.unwrap() // No other Arc should exist. The Arc was just created.
.get_mut()
.unwrap(); // The mutex was just created. It should not be poisoned.
// SAFETY:
// The client was zero initialized above.
// Full initialization happens in `create_client_helper`.
unsafe {
create_client_helper(
&mut client_ref.rcl_handle,
self.node_handle.as_mut(),
service_name,
client_ref.service_type.ts,
qos_profile,
)?;
};
let client_arc = Arc::new(Mutex::new(client));
let c = make_untyped_client(Arc::downgrade(&client_arc));
self.clients.push(client_arc);
Ok(c)
@ -882,11 +1072,10 @@ impl Node {
where
T: WrappedTypesupport,
{
let publisher_handle =
let publisher_arc =
create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let arc = Arc::new(publisher_handle);
let p = make_publisher(Arc::downgrade(&arc));
self.pubs.push(arc);
let p = make_publisher(Arc::downgrade(&publisher_arc));
self.pubs.push(publisher_arc);
Ok(p)
}
@ -897,11 +1086,10 @@ impl Node {
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<PublisherUntyped> {
let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?;
let publisher_handle =
let publisher_arc =
create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?;
let arc = Arc::new(publisher_handle);
let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned());
self.pubs.push(arc);
let p = make_publisher_untyped(Arc::downgrade(&publisher_arc), topic_type.to_owned());
self.pubs.push(publisher_arc);
Ok(p)
}
@ -939,6 +1127,8 @@ impl Node {
/// `timeout` is a duration specifying how long the spin should
/// block for if there are no pending events.
pub fn spin_once(&mut self, timeout: Duration) {
r2r_tracing::trace_spin_start(&*self.node_handle, timeout);
// first handle any completed action cancellation responses
for a in &mut self.action_servers {
a.lock().unwrap().send_completed_cancel_requests();
@ -1102,9 +1292,12 @@ impl Node {
unsafe {
rcl_wait_set_fini(&mut ws);
}
r2r_tracing::trace_spin_timeout(&*self.node_handle);
return;
}
r2r_tracing::trace_spin_wake(&*self.node_handle);
let mut subs_to_remove = vec![];
if ws.subscriptions != std::ptr::null_mut() {
let ws_subs =
@ -1261,6 +1454,8 @@ impl Node {
unsafe {
rcl_wait_set_fini(&mut ws);
}
r2r_tracing::trace_spin_end(&*self.node_handle);
}
/// Returns a map of topic names and type names of the publishers
@ -1364,9 +1559,15 @@ impl Node {
_clock: Some(clock), // The timer owns the clock.
sender: tx,
};
self.timers.push(timer);
let out_timer = Timer { receiver: rx };
let out_timer = unsafe {
Timer {
receiver: rx,
timer_handle: TracingId::new(timer.get_handle()),
node_handle: TracingId::new(&*self.node_handle),
}
};
self.timers.push(timer);
Ok(out_timer)
}
@ -1387,9 +1588,15 @@ impl Node {
_clock: None, // The timer does not own the clock (the node owns it).
sender: tx,
};
self.timers.push(timer);
let out_timer = Timer { receiver: rx };
let out_timer = unsafe {
Timer {
receiver: rx,
timer_handle: TracingId::new(timer.get_handle()),
node_handle: TracingId::new(&*self.node_handle),
}
};
self.timers.push(timer);
Ok(out_timer)
}
@ -1521,6 +1728,8 @@ impl Drop for Timer_ {
/// A ROS timer.
pub struct Timer {
receiver: mpsc::Receiver<Duration>,
timer_handle: TracingId<rcl_timer_t>,
node_handle: TracingId<rcl_node_t>,
}
impl Timer {
@ -1535,6 +1744,23 @@ impl Timer {
Err(Error::RCL_RET_TIMER_INVALID)
}
}
/// Transforms this timer stream to a [`Future`] calling the given `callback` on each tick.
///
/// The callback execution is traced by r2r_tracing.
///
/// This function should be called before dropping the timer's node.
/// Otherwise, the trace data might be inconsistent.
#[must_use = "Futures do nothing unless you `.await` or poll them"]
pub fn on_tick<F: FnMut(Duration)>(self, callback: F) -> impl Future<Output = ()> + Unpin {
let mut callback = r2r_tracing::Callback::new_timer(self.timer_handle, callback);
r2r_tracing::trace_timer_link_node(self.timer_handle, self.node_handle);
self.receiver.for_each(move |duration| {
callback.call(duration);
future::ready(())
})
}
}
// Since publishers are temporarily upgraded to owners during the

View File

@ -308,6 +308,7 @@ pub trait RosParams {
) -> Result<()>;
fn get_parameter(&mut self, param_name: &str) -> Result<ParameterValue>;
fn set_parameter(&mut self, param_name: &str, param_val: &ParameterValue) -> Result<()>;
fn check_parameter(&self, param_name: &str, param_val: &ParameterValue) -> Result<()>;
}
// Implementation of RosParams for primitive types, i.e. leaf parameters
@ -360,6 +361,26 @@ macro_rules! impl_ros_params {
}),
}
}
fn check_parameter(
&self, param_name: &str, param_val: &ParameterValue,
) -> Result<()> {
if param_name != "" {
return Err(Error::InvalidParameterName {
name: param_name.to_owned(),
});
}
match param_val {
$param_value_type(val) => {
let _: Self = $from_param_conv(val)?;
Ok(())
}
_ => Err(Error::InvalidParameterType {
name: "".to_string(), // will be completed by callers who know the name
ty: std::stringify!($param_value_type),
}),
}
}
}
};
}

View File

@ -3,7 +3,7 @@ use std::{
ffi::{c_void, CString},
fmt::Debug,
marker::PhantomData,
sync::{Mutex, Once, Weak},
sync::{Arc, Mutex, Once, Weak},
};
use crate::{error::*, msg_types::*, qos::QosProfile};
@ -137,15 +137,24 @@ pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> Publis
pub fn create_publisher_helper(
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<Publisher_> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
) -> Result<Arc<Publisher_>> {
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
// Allocate the memory now so that the location of the rcl handle
// does not change after call to rcl_publisher_init.
// This is important because tracing in rcl expects the handle to be at a fixed location.
let mut publisher_arc = Arc::new(Publisher_ {
handle: unsafe { rcl_get_zero_initialized_publisher() },
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
});
let publisher_mut = Arc::get_mut(&mut publisher_arc)
.expect("No other Arc should exist. The Arc was just created.");
let result = unsafe {
let mut publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = qos_profile.into();
rcl_publisher_init(
&mut publisher_handle,
&mut publisher_mut.handle,
node,
typesupport,
topic_c_string.as_ptr(),
@ -153,10 +162,7 @@ pub fn create_publisher_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
})
Ok(publisher_arc)
} else {
Err(Error::from_rcl_error(result))
}
@ -176,6 +182,8 @@ impl PublisherUntyped {
let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
native_msg.from_json(msg)?;
r2r_tracing::trace_publish(native_msg.void_ptr());
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
@ -214,6 +222,8 @@ impl PublisherUntyped {
allocator: unsafe { rcutils_get_default_allocator() },
};
r2r_tracing::trace_publish((&msg_buf as *const rcl_serialized_message_t).cast::<c_void>());
let result = unsafe {
rcl_publish_serialized_message(
&publisher.handle,
@ -271,6 +281,9 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
r2r_tracing::trace_publish(native_msg.void_ptr());
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
@ -357,6 +370,8 @@ where
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
r2r_tracing::trace_publish(msg.void_ptr());
let result = if msg.is_loaned {
unsafe {
// signal that we are relinquishing responsibility of the memory

View File

@ -111,11 +111,13 @@ where
}
}
pub fn create_service_helper(
node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_service_t> {
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
/// Initializes the service.
///
/// SAFETY: requires that the service handle is zero initialized by [`rcl_get_zero_initialized_service`].
pub unsafe fn create_service_helper(
service_handle: &mut rcl_service_t, node: &mut rcl_node_t, service_name: &str,
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
@ -123,7 +125,7 @@ pub fn create_service_helper(
let mut service_options = rcl_service_get_default_options();
service_options.qos = qos_profile.into();
rcl_service_init(
&mut service_handle,
service_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
@ -131,7 +133,7 @@ pub fn create_service_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(service_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -55,6 +55,8 @@ where
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
r2r_tracing::trace_take_ptr(msg.void_ptr());
let msg = T::from_native(&msg);
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
@ -138,6 +140,9 @@ where
new_msg
}
};
r2r_tracing::trace_take_ptr(msg.void_ptr());
if let Err(e) = self.sender.try_send(msg) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
@ -168,6 +173,8 @@ impl Subscriber_ for UntypedSubscriber {
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
};
if ret == RCL_RET_OK as i32 {
r2r_tracing::trace_take_ptr(msg.void_ptr());
let json = msg.to_json();
if let Err(e) = self.sender.try_send(json) {
if e.is_disconnected() {
@ -215,6 +222,8 @@ impl Subscriber_ for RawSubscriber {
}
};
r2r_tracing::trace_take(&self.msg_buf);
if let Err(e) = self.sender.try_send(data_bytes) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
@ -234,18 +243,17 @@ impl Subscriber_ for RawSubscriber {
}
}
pub fn create_subscription_helper(
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<rcl_subscription_t> {
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
pub unsafe fn create_subscription_helper(
subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str,
ts: *const rosidl_message_type_support_t, qos_profile: QosProfile,
) -> Result<()> {
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let mut subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile.into();
rcl_subscription_init(
&mut subscription_handle,
subscription_handle,
node,
ts,
topic_c_string.as_ptr(),
@ -253,7 +261,7 @@ pub fn create_subscription_helper(
)
};
if result == RCL_RET_OK as i32 {
Ok(subscription_handle)
Ok(())
} else {
Err(Error::from_rcl_error(result))
}

View File

@ -9,14 +9,18 @@ use crate::{
Clock, ClockType, Node, QosProfile, WrappedTypesupport,
};
use r2r_rcl::{
rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
rmw_message_info_t, RCL_RET_OK,
rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t,
rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK,
};
use r2r_tracing::TracingId;
use std::{
any::type_name,
sync::{Arc, Mutex, Weak},
};
use std::sync::{Arc, Mutex, Weak};
/// Provides time from `/clock` topic to attached ROS clocks
///
/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled.
/// By default, only clock used by ROS timers is attached and time from `/clock` topic is disabled.
///
/// The time from `/clock` topic can be activated by either of these:
/// - calling [`TimeSource::enable_sim_time`]
@ -102,7 +106,7 @@ impl TimeSource {
match inner.subscriber_state {
TimeSourceSubscriberState::None => {
let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
node.subscribers.push(Box::new(subscriber));
node.subscribers.push(subscriber);
inner.subscriber_state = TimeSourceSubscriberState::Active;
}
TimeSourceSubscriberState::ToBeDestroyed => {
@ -191,20 +195,39 @@ impl TimeSource_ {
}
impl TimeSourceSubscriber {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<Box<Self>> {
// The values are set based on default values in rclcpp
let qos = QosProfile::default().keep_last(1).best_effort();
let subscriber = create_subscription_helper(
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?;
Ok(Self {
subscriber_handle: subscriber,
let mut subscriber = Box::new(Self {
subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() },
time_source,
})
});
// SAFETY:
// create_subscription_helper requires zero initialized subscription_handle -> done above
// Completes initialization of subscription.
unsafe {
create_subscription_helper(
&mut subscriber.subscriber_handle,
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?
};
// Use TimeSource inner arc address as callback_id. We hope that this id is much larger value than
// any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1.
let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize;
r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber);
r2r_tracing::trace_subscription_callback_added(
unsafe { TracingId::new(&*subscriber).forget_type() },
callback_id,
);
r2r_tracing::trace_callback_register(callback_id, type_name::<TimeSource>());
Ok(subscriber)
}
}
@ -226,11 +249,16 @@ impl Subscriber_ for TimeSourceSubscriber {
)
};
r2r_tracing::trace_take_ptr(clock_msg.void_ptr());
let mut inner_time_source = self.time_source.inner.lock().unwrap();
if ret == RCL_RET_OK as i32 {
let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);
let time_source_ptr = Arc::as_ptr(&self.time_source.inner) as usize;
r2r_tracing::trace_callback_start(time_source_ptr, false);
inner_time_source.set_clock_time(msg.clock);
r2r_tracing::trace_callback_end(time_source_ptr);
}
match inner_time_source.subscriber_state {

View File

@ -1,6 +1,6 @@
[package]
name = "r2r_actions"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Internal dependency to the r2r crate."
license = "MIT"
@ -11,12 +11,12 @@ repository = "https://github.com/sequenceplanner/r2r"
documentation = "https://docs.rs/r2r/latest/r2r"
[dependencies]
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
[build-dependencies]
bindgen = "0.63.0"
r2r_common = { path = "../r2r_common", version = "0.9.4" }
bindgen = "0.71.1"
r2r_common = { path = "../r2r_common", version = "0.9.5" }
[features]
save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen"]

View File

@ -1,5 +1,5 @@
#
# For r2r 0.9.4
# For r2r 0.9.5
#
# cmake code for simple colcon integration.
# See https://github.com/m-dahl/r2r_minimal_node/

View File

@ -1,6 +1,6 @@
[package]
name = "r2r_common"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Minimal ros2 bindings."
license = "MIT"
@ -11,7 +11,7 @@ repository = "https://github.com/sequenceplanner/r2r"
documentation = "https://docs.rs/r2r/latest/r2r"
[dependencies]
bindgen = "0.63.0"
bindgen = "0.71.1"
sha2 = "0.10.6"
os_str_bytes = "6.5.1"
regex = "1.8.4"

View File

@ -1,6 +1,6 @@
[package]
name = "r2r_macros"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Minimal ros2 bindings."
license = "MIT"

View File

@ -18,6 +18,8 @@ pub fn derive_r2r_params(input: proc_macro::TokenStream) -> proc_macro::TokenStr
let get_param_matches = param_matches_for(quote!(get_parameter(suffix)), &input.data);
let set_param_matches =
param_matches_for(quote!(set_parameter(suffix, param_val)), &input.data);
let check_param_matches =
param_matches_for(quote!(check_parameter(suffix, param_val)), &input.data);
let expanded = quote! {
// The generated impl.
@ -64,6 +66,20 @@ pub fn derive_r2r_params(input: proc_macro::TokenStream) -> proc_macro::TokenStr
};
result.map_err(|e| e.update_param_name(&param_name))
}
fn check_parameter(&self, param_name: &str, param_val: &::r2r::ParameterValue) -> ::r2r::Result<()>
{
let (prefix, suffix) = match param_name.split_once('.') {
None => (param_name, ""),
Some((prefix, suffix)) => (prefix, suffix)
};
let result = match prefix {
#check_param_matches
_ => Err(::r2r::Error::InvalidParameterName {
name: "".into(),
}),
};
result.map_err(|e| e.update_param_name(&param_name))
}
}
};

View File

@ -1,6 +1,6 @@
[package]
name = "r2r_msg_gen"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Internal dependency to the r2r crate."
license = "MIT"
@ -11,8 +11,8 @@ repository = "https://github.com/sequenceplanner/r2r"
documentation = "https://docs.rs/r2r/latest/r2r"
[dependencies]
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
r2r_common = { path = "../r2r_common", version = "0.9.4" }
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
r2r_common = { path = "../r2r_common", version = "0.9.5" }
phf = { version = "0.11.1", features = ["macros"] }
quote = "1.0.28"
proc-macro2 = "1.0.60"
@ -21,9 +21,9 @@ force-send-sync = "1.0.0"
rayon = "1.7.0"
[build-dependencies]
bindgen = "0.63.0"
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
r2r_common = { path = "../r2r_common", version = "0.9.4" }
bindgen = "0.71.1"
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
r2r_common = { path = "../r2r_common", version = "0.9.5" }
quote = "1.0.28"
syn = { version = "2.0.18", features = ["full"] }
rayon = "1.7.0"

View File

@ -1,6 +1,6 @@
[package]
name = "r2r_rcl"
version = "0.9.4"
version = "0.9.5"
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
description = "Internal dependency to the r2r crate."
license = "MIT"
@ -15,8 +15,8 @@ paste = "1.0.9"
widestring = "1.0.2"
[build-dependencies]
bindgen = "0.63.0"
r2r_common = { path = "../r2r_common", version = "0.9.4" }
bindgen = "0.71.1"
r2r_common = { path = "../r2r_common", version = "0.9.5" }
[features]
save-bindgen = []

20
r2r_tracing/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "r2r_tracing"
description = "Internal dependency containing tracepoint definitions or imports for r2r."
authors = ["Martin Škoudlil <skoudmar@cvut.cz>"]
version = "0.9.5"
license = "MIT"
edition = "2021"
repository = "https://github.com/sequenceplanner/r2r"
readme = "README.md"
[dependencies]
futures = "0.3.25"
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
[build-dependencies]
lttng-ust-generate = { git = "https://github.com/skoudmar/lttng-ust-rs.git", rev = "bd437b5eeb8c4378d78ebb5aa17f0f4a95828b32", optional = true}
r2r_common = { path = "../r2r_common", version = "0.9.5", optional = true}
[features]
tracing = ["dep:lttng-ust-generate", "dep:r2r_common"]

48
r2r_tracing/README.md Normal file
View File

@ -0,0 +1,48 @@
# r2r_tracing
Internal dependency containing tracepoint definitions or imports from `tracetools` ROS package for r2r.
Uses LTTng tracing framework.
## Feature flag `tracing`
The crate will generate and link tracepoint libraries only when the feature flag is enabled.
Without specifying the feature flag `tracing` all exported functions are No-ops.
## Depends on
- `tracetools` ROS package
- This package is a part of ROS distribution.
- `r2r_tracing` dynamically loads `tracetools` library to obtain tracepoints used by `rclcpp`.
- `lttng-ust` crate
- To define additional tracepoints.
## Recording traces of R2R applications
Make sure to enable feature flag `tracing`.
Then start tracing session:
- Either by installing [`ros2trace`](https://index.ros.org/p/ros2trace/) and running:
```sh
ros2 trace -u 'ros2:*' 'r2r:*'
```
The traces will be available in `$HOME/.ros/tracing/session-<timestamp>`.
- Alternatively, you can trace your application directly with LTTng:
```sh
# Session name is an optional user-chosen name for the trace
lttng create [session-name]
lttng enable-event -u 'ros2:*,r2r:*'
lttng add-context -u --type=vtid --type=vpid --type=procname
lttng start
# Start the ROS system here.
# Let it run for as long as you want to trace it.
lttng destroy
```
The traces will be available in `$HOME/lttng-traces/<session-name>-<timestamp>`.

76
r2r_tracing/build.rs Normal file
View File

@ -0,0 +1,76 @@
#[cfg(all(feature = "tracing", not(target_os = "linux")))]
compile_error!("Feature 'tracing' is only available on Linux.");
#[cfg(not(feature = "tracing"))]
fn main() {}
#[cfg(feature = "tracing")]
fn main() {
use tracing::{generate_r2r_tracepoints, generate_rclcpp_tracepoint_bindings};
generate_rclcpp_tracepoint_bindings();
generate_r2r_tracepoints();
}
#[cfg(feature = "tracing")]
mod tracing {
use lttng_ust_generate::{CIntegerType, CTFType, Generator, Provider};
use std::{env, path::PathBuf};
macro_rules! create_tracepoint {
($provider:ident::$name:ident($($arg_name:ident: $arg_lttng_type:expr),* $(,)?)) => {
$provider.create_class(concat!(stringify!($name), "_class"))
$(
.add_field(stringify!($arg_name), $arg_lttng_type)
)*
.instantiate(stringify!($name))
};
}
/// Generate bindings to the rclcpp tracepoints defined in the tracetools ros2 package.
pub(crate) fn generate_rclcpp_tracepoint_bindings() {
let bindings = r2r_common::setup_bindgen_builder()
.header("src/tracetools_wrapper.h")
.allowlist_function("ros_trace_rclcpp_.*")
.allowlist_function("ros_trace_callback_.*")
.generate_comments(true)
.generate()
.expect("Unable to generate bindings for tracetools");
println!("cargo:rustc-link-lib=tracetools");
// Write the bindings to the $OUT_DIR/tracetools_bindings.rs file.
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("tracetools_bindings.rs"))
.expect("Couldn't write tracetools bindings!");
}
pub(crate) fn generate_r2r_tracepoints() {
let mut r2r = Provider::new("r2r");
create_tracepoint!(r2r::spin_start(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
timeout_s: CTFType::Integer(CIntegerType::U64),
timeout_ns: CTFType::Integer(CIntegerType::U32),
));
create_tracepoint!(r2r::spin_end(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
create_tracepoint!(r2r::spin_wake(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
create_tracepoint!(r2r::spin_timeout(
node_handle: CTFType::IntegerHex(CIntegerType::USize),
));
Generator::default()
.generated_lib_name("r2r_tracepoints_r2r")
.register_provider(r2r)
.output_file_name(
PathBuf::from(env::var("OUT_DIR").unwrap()).join("r2r_tracepoints.rs"),
)
.generate()
.expect("Unable to generate tracepoint bindings for r2r");
}
}

View File

@ -0,0 +1,81 @@
use crate::{
trace_callback_register, trace_service_callback_added, trace_subscription_callback_added,
trace_timer_callback_added, TracingId,
};
use r2r_rcl::{rcl_service_t, rcl_timer_t};
use std::{
any::type_name,
ffi::c_void,
marker::PhantomData,
sync::atomic::{AtomicUsize, Ordering::Relaxed},
};
/// Tracing wrapper for callback
pub struct Callback<F, M>
where
F: FnMut(M),
{
func: F,
id: usize,
msg_type: PhantomData<M>,
}
impl<F, M> Callback<F, M>
where
F: FnMut(M),
{
/// Generates unique ID for the callback
fn gen_id() -> usize {
static COUNTER: AtomicUsize = AtomicUsize::new(1);
COUNTER.fetch_add(1, Relaxed)
}
fn new(callback: F, id: usize) -> Self {
trace_callback_register(id, type_name::<F>());
Self {
func: callback,
id,
msg_type: PhantomData,
}
}
/// Emits trace event associating this `callback` with the `service`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_service(service: TracingId<rcl_service_t>, callback: F) -> Self {
let id = Self::gen_id();
trace_service_callback_added(service, id);
Self::new(callback, id)
}
/// Emits trace event associating this `callback` with the `timer`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_timer(timer: TracingId<rcl_timer_t>, callback: F) -> Self {
let id = Self::gen_id();
trace_timer_callback_added(timer, id);
Self::new(callback, id)
}
/// Emits trace event associating this `callback` with the `subscription`.
///
/// Wraps the callback to allow tracing the callback calls.
pub fn new_subscription(subscriber: TracingId<c_void>, callback: F) -> Self {
let id = Self::gen_id();
trace_subscription_callback_added(subscriber, id);
Self::new(callback, id)
}
/// Call the `callback`.
/// This emits `ros2:callback_start` and `ros2:callback_end` events at
/// the beginning and end respectively.
pub fn call(&mut self, msg: M) {
crate::trace_callback_start(self.id, false);
(self.func)(msg);
crate::trace_callback_end(self.id);
}
}

25
r2r_tracing/src/lib.rs Normal file
View File

@ -0,0 +1,25 @@
//! Tracepoint provider for the `r2r` crate.
#[cfg(feature = "tracing")]
mod r2r_tracepoints_bindings;
#[cfg(feature = "tracing")]
mod tracetools_bindings;
mod rclcpp_tracepoints;
pub use rclcpp_tracepoints::*;
mod r2r_tracepoints;
pub use r2r_tracepoints::*;
mod callback;
pub use callback::Callback;
mod stream;
pub use stream::{StreamWithTracingData, StreamWithTracingDataBuilder};
mod tracing_id;
pub use tracing_id::TracingId;
mod macros;
use macros::tracepoint_fn;

26
r2r_tracing/src/macros.rs Normal file
View File

@ -0,0 +1,26 @@
/// Generates two versions of each function definition wrapped via this macro:
/// - One with tracing enabled: Adds a feature flag and copies the function signature and body as is.
/// - One with tracing disabled: Adds a feature flag and replaces the function body to be empty.
macro_rules! tracepoint_fn {
($($(#[$atts:meta])* $vi:vis fn $name:ident$(<$generic:tt>)?($($arg_name:ident : $arg_ty:ty),* $(,)?) $(-> $ret_ty:ty)? $body:block)*) => {
$(
$(#[$atts])*
#[inline]
#[cfg(feature = "tracing")]
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? $body
$(#[$atts])*
#[doc = ""] // Empty doc line to start a new paragraph.
#[doc = "Tracing is currently disabled so this function is a no-op. Enable `tracing` feature to enable tracing."]
#[inline(always)]
#[allow(unused_variables)]
#[cfg(not(feature = "tracing"))]
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? {
// Do nothing
}
)*
};
}
// set macro visibility to public in crate only
pub(crate) use tracepoint_fn;

View File

@ -0,0 +1,32 @@
#[cfg(feature = "tracing")]
use crate::r2r_tracepoints_bindings::r2r as tp;
use crate::tracepoint_fn;
use r2r_rcl::rcl_node_t;
use std::time::Duration;
tracepoint_fn! {
/// The `node` started spinning with given `timeout`.
pub fn trace_spin_start(node: *const rcl_node_t, timeout: Duration) {
let timeout_s = timeout.as_secs();
let timeout_ns = timeout.subsec_nanos();
tp::spin_start(node as usize, timeout_s, timeout_ns);
}
/// The `node` ended spinning function.
///
/// If the spinning function ended by a timeout, use `trace_spin_timeout` instead.
pub fn trace_spin_end(node: *const rcl_node_t) {
tp::spin_end(node as usize);
}
/// The `node` woke up from waiting on a wait set without reaching timeout.
pub fn trace_spin_wake(node: *const rcl_node_t) {
tp::spin_wake(node as usize);
}
/// The `node` timeouted while spinning
pub fn trace_spin_timeout(node: *const rcl_node_t) {
tp::spin_timeout(node as usize);
}
}

View File

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), "/r2r_tracepoints.rs"));

View File

@ -0,0 +1,161 @@
use crate::{tracepoint_fn, TracingId};
use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t};
#[cfg(feature = "tracing")]
use crate::tracetools_bindings as tp;
#[cfg(feature = "tracing")]
use std::{
ffi::{c_void, CString},
ptr::null,
};
#[cfg(feature = "tracing")]
const fn ref_to_c_void<T>(t: &T) -> *const std::ffi::c_void {
std::ptr::from_ref(t).cast()
}
#[cfg(feature = "tracing")]
macro_rules! c_void {
($e:ident) => {
($e) as *const std::ffi::c_void
};
}
// Documentation of tracepoints is based on https://github.com/ros2/ros2_tracing project.
// From file `tracetools/include/tracetools/tracetools.h`
tracepoint_fn! {
/// Message publication.
///
/// Records the pointer to the `message` being published at the `rclcpp`/`r2r` level.
///
/// Tracepoint: `ros2::rclcpp_publish
// Lint allow note: The message pointer is NOT dereferenced.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn trace_publish(message: *const std::ffi::c_void) {
unsafe {
// first argument documentation:
// publisher_handle not used, but kept for API/ABI stability
tp::ros_trace_rclcpp_publish(null(), message);
}
}
/// Subscription object initialization.
///
/// Tracepoint to allow associating the `subscription_handle` from `rcl` with the address of rust `subscription` reference.
/// There can be more than 1 `subscription` for 1 `subscription_handle`.
///
/// Tracepoint: `ros2::rclcpp_subscription_init`
pub fn trace_subscription_init<S>(
subscription_handle: *const rcl_subscription_t, subscription: &S,
) {
unsafe {
tp::ros_trace_rclcpp_subscription_init(
subscription_handle.cast(),
ref_to_c_void(subscription),
);
}
}
/// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object.
///
/// Tracepoint: `ros2::rclcpp_subscription_callback_added`
pub fn trace_subscription_callback_added(subscription: TracingId<std::ffi::c_void>, callback_id: usize) {
unsafe {
tp::ros_trace_rclcpp_subscription_callback_added(
subscription.c_void(),
c_void!(callback_id),
);
}
}
/// Message taking.
///
/// Records the **reference** to the `message` being taken at the `rclcpp`/`r2r` level.
///
/// To trace messages pointed to by void pointer use [`trace_take_ptr`].
///
/// Tracepoint: `ros2::rclcpp_take`
pub fn trace_take<M>(message: &M) {
unsafe {
tp::ros_trace_rclcpp_take(ref_to_c_void(message));
}
}
/// Message taking.
///
/// Records the **void pointer** to the `message` being taken at the `rclcpp`/`r2r` level.
///
/// To trace messages by their reference use [`trace_take`].
///
/// Tracepoint: `ros2::rclcpp_take`
// Lint allow note: The message pointer is NOT dereferenced.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn trace_take_ptr(message: *const std::ffi::c_void) {
unsafe {
tp::ros_trace_rclcpp_take(message);
}
}
/// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`.
///
/// Tracepoint: `ros2::rclcpp_service_callback_added`
pub fn trace_service_callback_added(service: TracingId<rcl_service_t>, callback_id: usize) {
unsafe { tp::ros_trace_rclcpp_service_callback_added(service.c_void(), c_void!(callback_id)) }
}
/// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle.
///
/// Tracepoint: `ros2::rclcpp_timer_callback_added`
pub fn trace_timer_callback_added(timer: TracingId<rcl_timer_t>, callback_id: usize) {
unsafe {
tp::ros_trace_rclcpp_timer_callback_added(timer.c_void(), c_void!(callback_id));
}
}
/// Tracepoint to allow associating the `timer` with a `node`.
///
/// Tracepoint: `ros2::rclcpp_timer_link_node`
pub fn trace_timer_link_node(timer: TracingId<rcl_timer_t>, node: TracingId<rcl_node_t>) {
unsafe {
tp::ros_trace_rclcpp_timer_link_node(timer.c_void(), node.c_void());
}
}
/// Tracepoint to allow associating demangled `function_symbol` with a `callback_id`.
///
/// Allocates memory to store the function symbol as a `CString`.
///
/// Tracepoint: `ros2::callback_register`
///
/// # Panics
/// If `function_symbol` contains a null byte.
pub fn trace_callback_register(callback_id: usize, function_symbol: &str) {
let function_symbol = CString::new(function_symbol)
.expect("r2r tracing: Cannot convert function_symbol to CString. It contains null byte.");
unsafe {
tp::ros_trace_rclcpp_callback_register(c_void!(callback_id), function_symbol.as_ptr());
}
}
/// Start of a callback
///
/// Set `is_intra_process` depending on whether this callback is done via intra-process or not
///
/// Tracepoint: `ros2::callback_start`
pub fn trace_callback_start(callback_id: usize, is_intra_process: bool) {
unsafe {
tp::ros_trace_callback_start(c_void!(callback_id), is_intra_process);
}
}
/// End of a callback.
///
/// Tracepoint: `ros2::callback_end`
pub fn trace_callback_end(callback_id: usize) {
unsafe {
tp::ros_trace_callback_end(c_void!(callback_id));
}
}
}

136
r2r_tracing/src/stream.rs Normal file
View File

@ -0,0 +1,136 @@
use std::{
ffi::c_void,
future::{self, Future},
};
use futures::{channel::mpsc::Receiver, stream::FusedStream, Stream, StreamExt as _};
use r2r_rcl::rcl_service_t;
#[cfg(feature = "tracing")]
use crate::Callback;
use crate::TracingId;
/// A stream wrapper containing tracing data.
///
/// When the `tracing` feature is enabled, you can use the [`Self::traced_callback`] method
/// to trace the execution of the callback. Stream polling is not traced.
#[derive(Debug)]
pub struct StreamWithTracingData<T> {
stream: Receiver<T>,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType,
}
#[cfg(feature = "tracing")]
#[derive(Debug, Clone, Copy)]
enum TracingIdWithType {
// The type of subscription is `c_void` because the actual type would
// be a generic R2R subscriber and not `rcl_subscription_t`.
Subscription(TracingId<c_void>),
Service(TracingId<rcl_service_t>),
}
impl<T> StreamWithTracingData<T> {
/// Converts the stream into a future that calls the provided callback for each message.
///
/// If `tracing` feature is enabled:
/// - Each time the callback is called, its start and end time will be traced by the
/// ROS 2 tracing framework.
/// - You should not poll the stream before calling this method because
/// otherwise it might confuse software that analyzes the trace.
pub fn traced_callback<C>(self, callback: C) -> impl Future<Output = ()> + Unpin
where
C: FnMut(T),
{
#[cfg(feature = "tracing")]
{
let mut callback_wrapper = match self.tracing_id {
TracingIdWithType::Subscription(id) => Callback::new_subscription(id, callback),
TracingIdWithType::Service(id) => Callback::new_service(id, callback),
};
self.stream.for_each(move |msg| {
callback_wrapper.call(msg);
future::ready(())
})
}
#[cfg(not(feature = "tracing"))]
{
let mut callback = callback;
self.stream.for_each(move |msg| {
callback(msg);
future::ready(())
})
}
}
}
impl<T> Stream for StreamWithTracingData<T> {
type Item = T;
fn poll_next(
self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
this.stream.poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<T> FusedStream for StreamWithTracingData<T> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
/// A builder for `StreamWithTracingData`.
///
/// This struct exists to allow creation of `StreamWithTracingData` without polluting its public API.
/// It is used internally by r2r and should not be reexported.
pub struct StreamWithTracingDataBuilder;
impl StreamWithTracingDataBuilder {
#[must_use]
#[allow(
clippy::not_unsafe_ptr_arg_deref,
reason = "The pointer is not dereferenced"
)]
#[cfg_attr(
not(feature = "tracing"),
expect(
unused_variables,
reason = "service_id is not saved if tracing is disabled"
)
)]
pub fn build_service<T>(
stream: Receiver<T>, service_id: TracingId<rcl_service_t>,
) -> StreamWithTracingData<T> {
StreamWithTracingData {
stream,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType::Service(service_id),
}
}
#[must_use]
#[cfg_attr(
not(feature = "tracing"),
expect(
unused_variables,
reason = "subscription_id is not saved if tracing is disabled"
)
)]
pub fn build_subscription<T>(
stream: Receiver<T>, subscription_id: TracingId<c_void>,
) -> StreamWithTracingData<T> {
StreamWithTracingData {
stream,
#[cfg(feature = "tracing")]
tracing_id: TracingIdWithType::Subscription(subscription_id),
}
}
}

View File

@ -0,0 +1,6 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(unused)]
include!(concat!(env!("OUT_DIR"), "/tracetools_bindings.rs"));

View File

@ -0,0 +1 @@
#include <tracetools/tracetools.h>

View File

@ -0,0 +1,77 @@
/// Unique identifier for tracing purposes
#[derive(Debug)]
pub struct TracingId<T> {
/// Pointer to the object used as a unique ID.
/// Safety: Do NOT dereference the pointer.
#[cfg(feature = "tracing")]
id: *const T,
/// Marker for the type. Needed when `tracing` feature is disabled.
#[cfg(not(feature = "tracing"))]
_marker: std::marker::PhantomData<T>,
}
impl<T> TracingId<T> {
/// Creates new `TracingId` from the pointer.
///
/// # Safety
/// The pointer is used as a unique ID so users must ensure that they never create `TracingId`
/// with same address for different objects.
///
/// The pointer does not need to point to valid memory.
pub const unsafe fn new(_id: *const T) -> Self {
Self {
#[cfg(feature = "tracing")]
id: _id,
#[cfg(not(feature = "tracing"))]
_marker: std::marker::PhantomData,
}
}
/// Erase the generic type of the ID.
#[must_use]
pub fn forget_type(self) -> TracingId<std::ffi::c_void> {
#[cfg(not(feature = "tracing"))]
unsafe {
// Safety: The ID cannot be obtained back without the `tracing` feature.
TracingId::new(std::ptr::null())
}
#[cfg(feature = "tracing")]
unsafe {
// Safety: self contains valid ID.
TracingId::new(self.c_void())
}
}
/// Obtain the address representing the ID.
///
/// # Safety
/// Do NOT dereference the pointer.
#[cfg(feature = "tracing")]
pub(crate) const unsafe fn c_void(self) -> *const std::ffi::c_void {
self.id.cast::<std::ffi::c_void>()
}
}
/// Deriving Clone for `TracingId` would only derive it only conditionally based on whether the
/// `T` is `Clone` or not. But TracingId is independent of T.
impl<T> Clone for TracingId<T> {
fn clone(&self) -> Self {
*self
}
}
/// Deriving Copy for `TracingId` would only derive it only conditionally based on whether the
/// `T` is `Copy` or not. But TracingId is independent of T.
impl<T> Copy for TracingId<T> {}
/// # Safety
///
/// The address is never dereferenced and is used only as a unique ID so it is safe to send to another thread.
unsafe impl<T> Send for TracingId<T> {}
/// # Safety
///
/// The `TracingId` does not allow interior mutability because the pointer is never dereferenced.
/// It is safe to use across multiple threads.
unsafe impl<T> Sync for TracingId<T> {}