Compare commits
14 Commits
262b3e3885
...
8aa9e89243
| Author | SHA1 | Date |
|---|---|---|
|
|
8aa9e89243 | |
|
|
c77b001762 | |
|
|
f77574c3a9 | |
|
|
148e1de8ac | |
|
|
583c45c9ea | |
|
|
2b4f9753d4 | |
|
|
cf03fe1b48 | |
|
|
e371130cf1 | |
|
|
23672cfdb8 | |
|
|
a51bda8901 | |
|
|
5514c475bc | |
|
|
455ea630ec | |
|
|
aa180c5b15 | |
|
|
56acba5dd5 |
|
|
@ -7,4 +7,5 @@ members = [
|
|||
"r2r_msg_gen",
|
||||
"r2r_macros",
|
||||
"r2r_rcl",
|
||||
"r2r_tracing",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ These bindings are being written organically when things are needed by me and ot
|
|||
How to use
|
||||
--------------------
|
||||
1. Make sure you have libclang installed. (e.g. libclang-dev on ubuntu)
|
||||
2. Depend on this package in Cargo.toml: `r2r = "0.9.4"`
|
||||
2. Depend on this package in Cargo.toml: `r2r = "0.9.5"`
|
||||
3. You need to source your ROS2 installation before building/running.
|
||||
4. The bindings will rebuild automatically if/when you source your workspace(s).
|
||||
5. If you make changes to existing message types, run `cargo clean -p r2r_msg_gen` to force recompilation of the rust message types on the next build.
|
||||
|
|
@ -47,6 +47,10 @@ Changelog
|
|||
--------------------
|
||||
#### [Unreleased]
|
||||
|
||||
#### [0.9.5] - 2025-04-22
|
||||
- Add `/set_parameters_atomically` <https://github.com/sequenceplanner/r2r/pull/120>, <https://github.com/sequenceplanner/r2r/pull/121>. Fixes `ros2 param ...` on Jazzy.
|
||||
- Update bindgen 0.63.0 -> 0.71.1 <https://github.com/sequenceplanner/r2r/pull/116>. (Issues may persist until next bindgen version, see PR).
|
||||
|
||||
#### [0.9.4] - 2024-11-21
|
||||
- Fix cargo syntax for older rust versions < 1.77 <https://github.com/sequenceplanner/r2r/commit/74ad4410c79b1be7e42eb1822a291639e3c40ec4>
|
||||
- Fix message generation for WChar idl type <https://github.com/sequenceplanner/r2r/commit/94db799db282c8b1e0222f1699e6a6420b5fd544>
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Easy to use, runtime-agnostic, async rust bindings for ROS2."
|
||||
license = "MIT AND Apache-2.0"
|
||||
|
|
@ -18,11 +18,12 @@ serde = { version = "1.0.147", features = ["derive"] }
|
|||
serde_json = "1.0.89"
|
||||
thiserror = "1.0.37"
|
||||
lazy_static = "1.4.0"
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
|
||||
r2r_actions = { path = "../r2r_actions", version = "0.9.4" }
|
||||
r2r_macros = { path = "../r2r_macros", version = "0.9.4" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
|
||||
r2r_actions = { path = "../r2r_actions", version = "0.9.5" }
|
||||
r2r_macros = { path = "../r2r_macros", version = "0.9.5" }
|
||||
r2r_tracing = { path = "../r2r_tracing", version = "0.9.5" }
|
||||
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
||||
futures = "0.3.25"
|
||||
log = "0.4.18"
|
||||
|
|
@ -38,8 +39,8 @@ cdr = "0.2.4"
|
|||
criterion = "0.5.1"
|
||||
|
||||
[build-dependencies]
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
|
||||
rayon = "1.7.0"
|
||||
force-send-sync = "1.0.0"
|
||||
quote = "1.0.28"
|
||||
|
|
@ -50,6 +51,7 @@ prettyplease = "0.2.6"
|
|||
[features]
|
||||
save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen", "r2r_actions/save-bindgen"]
|
||||
doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", "r2r_actions/doc-only"]
|
||||
tracing = ["r2r_tracing/tracing"]
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
features = ["doc-only"]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use futures::{executor::LocalPool, task::LocalSpawnExt};
|
||||
|
||||
use r2r::{std_msgs::msg, std_srvs::srv, QosProfile};
|
||||
|
||||
/// This example demonstrates creation of a service,
|
||||
/// subscriber and timers with their callback execution traced.
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ctx = r2r::Context::create()?;
|
||||
let mut node = r2r::Node::create(ctx, "testnode", "")?;
|
||||
|
||||
let mut pool = LocalPool::new();
|
||||
let spawner = pool.spawner();
|
||||
|
||||
// The traced callback is supplied directly to `subscribe_traced`
|
||||
// and `create_service_traced`functions.
|
||||
let subscriber_future = node
|
||||
.subscribe("/print", QosProfile::default())?
|
||||
.traced_callback(|msg: msg::String| {
|
||||
println!("Received message: '{}'", msg.data);
|
||||
});
|
||||
spawner.spawn_local(subscriber_future)?;
|
||||
|
||||
let mut value = false;
|
||||
let service_future = node
|
||||
.create_service::<srv::SetBool::Service>("/set_value", QosProfile::default())?
|
||||
.traced_callback(move |req| {
|
||||
if value == req.message.data {
|
||||
req.respond(srv::SetBool::Response {
|
||||
success: false,
|
||||
message: format!("Value is already {value}."),
|
||||
})
|
||||
.expect("could not send service response");
|
||||
} else {
|
||||
value = req.message.data;
|
||||
req.respond(srv::SetBool::Response {
|
||||
success: true,
|
||||
message: format!("Value set to {value}."),
|
||||
})
|
||||
.expect("could not send service response");
|
||||
}
|
||||
});
|
||||
spawner.spawn_local(service_future)?;
|
||||
|
||||
let mut counter = 0;
|
||||
let wall_timer_future =
|
||||
node.create_wall_timer(Duration::from_millis(500))?
|
||||
.on_tick(move |_| {
|
||||
counter += 1;
|
||||
println!("Wall timer tick: {counter}");
|
||||
});
|
||||
spawner.spawn_local(wall_timer_future)?;
|
||||
|
||||
let ros_timer_future = node.create_timer(Duration::from_secs(1))?.on_tick(|diff| {
|
||||
println!("ROS timer tick. Time elapsed since last tick: {diff:?}");
|
||||
});
|
||||
spawner.spawn_local(ros_timer_future)?;
|
||||
|
||||
loop {
|
||||
node.spin_once(std::time::Duration::from_millis(5));
|
||||
pool.run_until_stalled();
|
||||
}
|
||||
}
|
||||
|
|
@ -318,11 +318,10 @@ impl Client_ for UntypedClient_ {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn create_client_helper(
|
||||
node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
|
||||
qos_profile: QosProfile,
|
||||
) -> Result<rcl_client_t> {
|
||||
let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
|
||||
pub unsafe fn create_client_helper(
|
||||
client_handle: &mut rcl_client_t, node: *mut rcl_node_t, service_name: &str,
|
||||
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
|
||||
) -> Result<()> {
|
||||
let service_name_c_string =
|
||||
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
||||
|
||||
|
|
@ -330,15 +329,16 @@ pub fn create_client_helper(
|
|||
let mut client_options = rcl_client_get_default_options();
|
||||
client_options.qos = qos_profile.into();
|
||||
rcl_client_init(
|
||||
&mut client_handle,
|
||||
client_handle,
|
||||
node,
|
||||
service_ts,
|
||||
service_name_c_string.as_ptr(),
|
||||
&client_options,
|
||||
)
|
||||
};
|
||||
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(client_handle)
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::from_rcl_error(result))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,6 +124,7 @@ pub use clocks::{Clock, ClockType};
|
|||
|
||||
mod nodes;
|
||||
pub use nodes::{Node, Timer};
|
||||
pub use r2r_tracing::StreamWithTracingData;
|
||||
|
||||
pub mod qos;
|
||||
|
||||
|
|
|
|||
588
r2r/src/nodes.rs
588
r2r/src/nodes.rs
|
|
@ -4,6 +4,7 @@ use futures::{
|
|||
stream::{Stream, StreamExt},
|
||||
};
|
||||
use indexmap::IndexMap;
|
||||
use r2r_tracing::{StreamWithTracingData, StreamWithTracingDataBuilder, TracingId};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ffi::{CStr, CString},
|
||||
|
|
@ -283,154 +284,150 @@ impl Node {
|
|||
.register_parameters("", None, &mut self.params.lock().unwrap())?;
|
||||
}
|
||||
let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
|
||||
let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
|
||||
|
||||
let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
|
||||
let mut set_atomically_event_tx = set_event_tx.clone();
|
||||
|
||||
let node_name = self.name()?;
|
||||
let set_params_request_stream = self
|
||||
|
||||
// rcl_interfaces/srv/SetParameters
|
||||
let params = self.params.clone();
|
||||
let params_struct_clone = params_struct.clone();
|
||||
let set_params_future = self
|
||||
.create_service::<rcl_interfaces::srv::SetParameters::Service>(
|
||||
&format!("{}/set_parameters", node_name),
|
||||
QosProfile::default(),
|
||||
)?;
|
||||
|
||||
let params = self.params.clone();
|
||||
let params_struct_clone = params_struct.clone();
|
||||
let set_params_future = set_params_request_stream.for_each(
|
||||
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
|
||||
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
|
||||
for p in &req.message.parameters {
|
||||
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
|
||||
let changed = params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&p.name)
|
||||
.map(|v| v.value != val)
|
||||
.unwrap_or(true); // changed=true if new
|
||||
let r = if let Some(ps) = ¶ms_struct_clone {
|
||||
// Update parameter structure
|
||||
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
|
||||
if result.is_ok() {
|
||||
// Also update Node::params
|
||||
)?
|
||||
.traced_callback(
|
||||
move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
|
||||
let mut result = rcl_interfaces::srv::SetParameters::Response::default();
|
||||
for p in &req.message.parameters {
|
||||
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
|
||||
let changed = params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&p.name)
|
||||
.map(|v| v.value != val)
|
||||
.unwrap_or(true); // changed=true if new
|
||||
let r = if let Some(ps) = ¶ms_struct_clone {
|
||||
// Update parameter structure
|
||||
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
|
||||
if result.is_ok() {
|
||||
// Also update Node::params
|
||||
params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(p.name.clone())
|
||||
.and_modify(|p| p.value = val.clone());
|
||||
}
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: result.is_ok(),
|
||||
reason: result.err().map_or("".into(), |e| e.to_string()),
|
||||
}
|
||||
} else {
|
||||
// No parameter structure - update only Node::params
|
||||
params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(p.name.clone())
|
||||
.and_modify(|p| p.value = val.clone());
|
||||
}
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: result.is_ok(),
|
||||
reason: result.err().map_or("".into(), |e| e.to_string()),
|
||||
}
|
||||
} else {
|
||||
// No parameter structure - update only Node::params
|
||||
params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(p.name.clone())
|
||||
.and_modify(|p| p.value = val.clone())
|
||||
.or_insert(Parameter::new(val.clone()));
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: true,
|
||||
reason: "".into(),
|
||||
}
|
||||
};
|
||||
// if the value changed, send out new value on parameter event stream
|
||||
if changed && r.successful {
|
||||
if let Err(e) = event_tx.try_send((p.name.clone(), val)) {
|
||||
log::debug!("Warning: could not send parameter event ({}).", e);
|
||||
.and_modify(|p| p.value = val.clone())
|
||||
.or_insert(Parameter::new(val.clone()));
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: true,
|
||||
reason: "".into(),
|
||||
}
|
||||
};
|
||||
// if the value changed, send out new value on parameter event stream
|
||||
if changed && r.successful {
|
||||
if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
|
||||
log::debug!("Warning: could not send parameter event ({}).", e);
|
||||
}
|
||||
}
|
||||
result.results.push(r);
|
||||
}
|
||||
result.results.push(r);
|
||||
}
|
||||
req.respond(result)
|
||||
.expect("could not send reply to set parameter request");
|
||||
future::ready(())
|
||||
},
|
||||
);
|
||||
req.respond(result)
|
||||
.expect("could not send reply to set parameter request");
|
||||
},
|
||||
);
|
||||
handlers.push(Box::pin(set_params_future));
|
||||
|
||||
// rcl_interfaces/srv/GetParameters
|
||||
let get_params_request_stream = self
|
||||
let params = self.params.clone();
|
||||
let params_struct_clone = params_struct.clone();
|
||||
let get_params_future = self
|
||||
.create_service::<rcl_interfaces::srv::GetParameters::Service>(
|
||||
&format!("{}/get_parameters", node_name),
|
||||
QosProfile::default(),
|
||||
)?;
|
||||
|
||||
let params = self.params.clone();
|
||||
let params_struct_clone = params_struct.clone();
|
||||
let get_params_future = get_params_request_stream.for_each(
|
||||
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
|
||||
let params = params.lock().unwrap();
|
||||
let values = req
|
||||
.message
|
||||
.names
|
||||
.iter()
|
||||
.map(|n| {
|
||||
// First try to get the parameter from the param structure
|
||||
if let Some(ps) = ¶ms_struct_clone {
|
||||
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
|
||||
return value;
|
||||
)?
|
||||
.traced_callback(
|
||||
move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
|
||||
let params = params.lock().unwrap();
|
||||
let values = req
|
||||
.message
|
||||
.names
|
||||
.iter()
|
||||
.map(|n| {
|
||||
// First try to get the parameter from the param structure
|
||||
if let Some(ps) = ¶ms_struct_clone {
|
||||
if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Otherwise get it from node HashMap
|
||||
match params.get(n) {
|
||||
Some(v) => v.value.clone(),
|
||||
None => ParameterValue::NotSet,
|
||||
}
|
||||
})
|
||||
.map(|v| v.into_parameter_value_msg())
|
||||
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
|
||||
// Otherwise get it from node HashMap
|
||||
match params.get(n) {
|
||||
Some(v) => v.value.clone(),
|
||||
None => ParameterValue::NotSet,
|
||||
}
|
||||
})
|
||||
.map(|v| v.into_parameter_value_msg())
|
||||
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
|
||||
|
||||
let result = rcl_interfaces::srv::GetParameters::Response { values };
|
||||
req.respond(result)
|
||||
.expect("could not send reply to set parameter request");
|
||||
future::ready(())
|
||||
},
|
||||
);
|
||||
let result = rcl_interfaces::srv::GetParameters::Response { values };
|
||||
req.respond(result)
|
||||
.expect("could not send reply to set parameter request");
|
||||
},
|
||||
);
|
||||
|
||||
handlers.push(Box::pin(get_params_future));
|
||||
|
||||
let params = self.params.clone();
|
||||
|
||||
// rcl_interfaces/srv/ListParameters
|
||||
use rcl_interfaces::srv::ListParameters;
|
||||
let list_params_request_stream = self.create_service::<ListParameters::Service>(
|
||||
&format!("{}/list_parameters", node_name),
|
||||
QosProfile::default(),
|
||||
)?;
|
||||
|
||||
let params = self.params.clone();
|
||||
let list_params_future = list_params_request_stream.for_each(
|
||||
move |req: ServiceRequest<ListParameters::Service>| {
|
||||
let list_params_future = self
|
||||
.create_service::<ListParameters::Service>(
|
||||
&format!("{}/list_parameters", node_name),
|
||||
QosProfile::default(),
|
||||
)?
|
||||
.traced_callback(move |req: ServiceRequest<ListParameters::Service>| {
|
||||
Self::handle_list_parameters(req, ¶ms)
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
handlers.push(Box::pin(list_params_future));
|
||||
|
||||
// rcl_interfaces/srv/DescribeParameters
|
||||
use rcl_interfaces::srv::DescribeParameters;
|
||||
let desc_params_request_stream = self.create_service::<DescribeParameters::Service>(
|
||||
&format!("{node_name}/describe_parameters"),
|
||||
QosProfile::default(),
|
||||
)?;
|
||||
|
||||
let params = self.params.clone();
|
||||
let desc_params_future = desc_params_request_stream.for_each(
|
||||
move |req: ServiceRequest<DescribeParameters::Service>| {
|
||||
let desc_params_future = self
|
||||
.create_service::<DescribeParameters::Service>(
|
||||
&format!("{node_name}/describe_parameters"),
|
||||
QosProfile::default(),
|
||||
)?
|
||||
.traced_callback(move |req: ServiceRequest<DescribeParameters::Service>| {
|
||||
Self::handle_desc_parameters(req, ¶ms)
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
handlers.push(Box::pin(desc_params_future));
|
||||
|
||||
// rcl_interfaces/srv/GetParameterTypes
|
||||
use rcl_interfaces::srv::GetParameterTypes;
|
||||
let get_param_types_request_stream = self.create_service::<GetParameterTypes::Service>(
|
||||
&format!("{node_name}/get_parameter_types"),
|
||||
QosProfile::default(),
|
||||
)?;
|
||||
|
||||
let params = self.params.clone();
|
||||
let get_param_types_future = get_param_types_request_stream.for_each(
|
||||
move |req: ServiceRequest<GetParameterTypes::Service>| {
|
||||
let get_param_types_future = self
|
||||
.create_service::<GetParameterTypes::Service>(
|
||||
&format!("{node_name}/get_parameter_types"),
|
||||
QosProfile::default(),
|
||||
)?
|
||||
.traced_callback(move |req: ServiceRequest<GetParameterTypes::Service>| {
|
||||
let params = params.lock().unwrap();
|
||||
let types = req
|
||||
.message
|
||||
|
|
@ -443,12 +440,91 @@ impl Node {
|
|||
.collect();
|
||||
req.respond(GetParameterTypes::Response { types })
|
||||
.expect("could not send reply to get parameter types request");
|
||||
future::ready(())
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
handlers.push(Box::pin(get_param_types_future));
|
||||
|
||||
// rcl_interfaces/srv/SetParametersAtomically
|
||||
let params = self.params.clone();
|
||||
let params_struct_clone = params_struct.clone();
|
||||
let set_params_atomically_future = self
|
||||
.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
|
||||
&format!("{}/set_parameters_atomically", node_name),
|
||||
QosProfile::default(),
|
||||
)?
|
||||
.traced_callback(
|
||||
move |req: ServiceRequest<
|
||||
rcl_interfaces::srv::SetParametersAtomically::Service,
|
||||
>| {
|
||||
let mut result =
|
||||
rcl_interfaces::srv::SetParametersAtomically::Response::default();
|
||||
result.result.successful = true;
|
||||
if let Some(ps) = ¶ms_struct_clone {
|
||||
for p in &req.message.parameters {
|
||||
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
|
||||
if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
|
||||
result.result.successful = false;
|
||||
result.result.reason =
|
||||
format!("Can't set parameter {}: {}", p.name, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if result.result.successful {
|
||||
// Since we checked them above now we assume these will be set ok...
|
||||
for p in &req.message.parameters {
|
||||
let val = ParameterValue::from_parameter_value_msg(p.value.clone());
|
||||
let changed = params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&p.name)
|
||||
.map(|v| v.value != val)
|
||||
.unwrap_or(true); // changed=true if new
|
||||
let r = if let Some(ps) = ¶ms_struct_clone {
|
||||
// Update parameter structure
|
||||
let result = ps.lock().unwrap().set_parameter(&p.name, &val);
|
||||
if result.is_ok() {
|
||||
// Also update Node::params
|
||||
params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(p.name.clone())
|
||||
.and_modify(|p| p.value = val.clone());
|
||||
}
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: result.is_ok(),
|
||||
reason: result.err().map_or("".into(), |e| e.to_string()),
|
||||
}
|
||||
} else {
|
||||
// No parameter structure - update only Node::params
|
||||
params
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(p.name.clone())
|
||||
.and_modify(|p| p.value = val.clone())
|
||||
.or_insert(Parameter::new(val.clone()));
|
||||
rcl_interfaces::msg::SetParametersResult {
|
||||
successful: true,
|
||||
reason: "".into(),
|
||||
}
|
||||
};
|
||||
// if the value changed, send out new value on parameter event stream
|
||||
if changed && r.successful {
|
||||
if let Err(e) =
|
||||
set_atomically_event_tx.try_send((p.name.clone(), val))
|
||||
{
|
||||
log::debug!("Warning: could not send parameter event ({}).", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
req.respond(result)
|
||||
.expect("could not send reply to set parameter request");
|
||||
},
|
||||
);
|
||||
|
||||
handlers.push(Box::pin(set_params_atomically_future));
|
||||
|
||||
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
|
||||
{
|
||||
// create TimeSource based on value of use_sim_time parameter
|
||||
|
|
@ -479,7 +555,7 @@ impl Node {
|
|||
fn handle_list_parameters(
|
||||
req: ServiceRequest<rcl_interfaces::srv::ListParameters::Service>,
|
||||
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
|
||||
) -> future::Ready<()> {
|
||||
) {
|
||||
use rcl_interfaces::srv::ListParameters;
|
||||
|
||||
let depth = req.message.depth;
|
||||
|
|
@ -516,13 +592,12 @@ impl Node {
|
|||
}
|
||||
req.respond(ListParameters::Response { result })
|
||||
.expect("could not send reply to list parameter request");
|
||||
future::ready(())
|
||||
}
|
||||
|
||||
fn handle_desc_parameters(
|
||||
req: ServiceRequest<rcl_interfaces::srv::DescribeParameters::Service>,
|
||||
params: &Arc<Mutex<IndexMap<String, Parameter>>>,
|
||||
) -> future::Ready<()> {
|
||||
) {
|
||||
use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters};
|
||||
let mut descriptors = Vec::<ParameterDescriptor>::new();
|
||||
let params = params.lock().unwrap();
|
||||
|
|
@ -538,7 +613,6 @@ impl Node {
|
|||
}
|
||||
req.respond(DescribeParameters::Response { descriptors })
|
||||
.expect("could not send reply to describe parameters request");
|
||||
future::ready(())
|
||||
}
|
||||
|
||||
/// Fetch a single ROS parameter.
|
||||
|
|
@ -569,19 +643,38 @@ impl Node {
|
|||
/// This function returns a `Stream` of ros messages.
|
||||
pub fn subscribe<T: 'static>(
|
||||
&mut self, topic: &str, qos_profile: QosProfile,
|
||||
) -> Result<impl Stream<Item = T> + Unpin>
|
||||
) -> Result<StreamWithTracingData<T>>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle =
|
||||
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
|
||||
let (sender, receiver) = mpsc::channel::<T>(10);
|
||||
|
||||
let ws = TypedSubscriber {
|
||||
rcl_handle: subscription_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut subscription = Box::new(TypedSubscriber {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
|
||||
sender,
|
||||
});
|
||||
|
||||
// SAFETY:
|
||||
// create_subscription_helper requires zero initialized subscription_handle -> done above
|
||||
// Completes initialization of subscription.
|
||||
unsafe {
|
||||
create_subscription_helper(
|
||||
&mut subscription.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
topic,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
self.subscribers.push(Box::new(ws));
|
||||
|
||||
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
|
||||
|
||||
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
|
||||
TracingId::new(&*subscription).forget_type()
|
||||
});
|
||||
|
||||
self.subscribers.push(subscription);
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
|
|
@ -590,19 +683,38 @@ impl Node {
|
|||
/// This function returns a `Stream` of ros messages without the rust convenience types.
|
||||
pub fn subscribe_native<T: 'static>(
|
||||
&mut self, topic: &str, qos_profile: QosProfile,
|
||||
) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
|
||||
) -> Result<StreamWithTracingData<WrappedNativeMsg<T>>>
|
||||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let subscription_handle =
|
||||
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
|
||||
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
|
||||
|
||||
let ws = NativeSubscriber {
|
||||
rcl_handle: subscription_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut subscription = Box::new(NativeSubscriber {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
|
||||
sender,
|
||||
});
|
||||
|
||||
// SAFETY:
|
||||
// create_subscription_helper requires zero initialized subscription_handle -> done above
|
||||
// Completes initialization of subscription.
|
||||
unsafe {
|
||||
create_subscription_helper(
|
||||
&mut subscription.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
topic,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
self.subscribers.push(Box::new(ws));
|
||||
|
||||
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
|
||||
|
||||
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
|
||||
TracingId::new(&subscription.rcl_handle).forget_type()
|
||||
});
|
||||
|
||||
self.subscribers.push(subscription);
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
|
|
@ -612,18 +724,37 @@ impl Node {
|
|||
/// Useful when you cannot know the type of the message at compile time.
|
||||
pub fn subscribe_untyped(
|
||||
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
|
||||
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
|
||||
) -> Result<StreamWithTracingData<Result<serde_json::Value>>> {
|
||||
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
|
||||
let subscription_handle =
|
||||
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
|
||||
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
|
||||
|
||||
let ws = UntypedSubscriber {
|
||||
rcl_handle: subscription_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut subscription = Box::new(UntypedSubscriber {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
|
||||
topic_type: topic_type.to_string(),
|
||||
sender,
|
||||
});
|
||||
|
||||
// SAFETY:
|
||||
// create_subscription_helper requires zero initialized subscription_handle -> done above
|
||||
// Completes initialization of subscription.
|
||||
unsafe {
|
||||
create_subscription_helper(
|
||||
&mut subscription.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
topic,
|
||||
msg.ts,
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
self.subscribers.push(Box::new(ws));
|
||||
|
||||
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
|
||||
|
||||
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
|
||||
TracingId::new(&subscription.rcl_handle).forget_type()
|
||||
});
|
||||
|
||||
self.subscribers.push(subscription);
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
|
|
@ -633,7 +764,7 @@ impl Node {
|
|||
/// Useful if you just want to pass the data along to another part of the system.
|
||||
pub fn subscribe_raw(
|
||||
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
|
||||
) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
|
||||
) -> Result<StreamWithTracingData<Vec<u8>>> {
|
||||
// TODO is it possible to handle the raw message without type support?
|
||||
//
|
||||
// Passing null ts to rcl_subscription_init throws an error ..
|
||||
|
|
@ -659,16 +790,35 @@ impl Node {
|
|||
return Err(Error::from_rcl_error(ret));
|
||||
}
|
||||
|
||||
let subscription_handle =
|
||||
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
|
||||
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
|
||||
|
||||
let ws = RawSubscriber {
|
||||
rcl_handle: subscription_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut subscription = Box::new(RawSubscriber {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_subscription() },
|
||||
msg_buf,
|
||||
sender,
|
||||
});
|
||||
|
||||
// SAFETY:
|
||||
// create_subscription_helper requires zero initialized subscription_handle -> done above
|
||||
// Completes initialization of subscription.
|
||||
unsafe {
|
||||
create_subscription_helper(
|
||||
&mut subscription.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
topic,
|
||||
msg.ts,
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
self.subscribers.push(Box::new(ws));
|
||||
|
||||
r2r_tracing::trace_subscription_init(&subscription.rcl_handle, &*subscription);
|
||||
|
||||
let receiver = StreamWithTracingDataBuilder::build_subscription(receiver, unsafe {
|
||||
TracingId::new(&subscription.rcl_handle).forget_type()
|
||||
});
|
||||
|
||||
self.subscribers.push(subscription);
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
|
|
@ -678,24 +828,41 @@ impl Node {
|
|||
/// `respond` on the Service Request to send the reply.
|
||||
pub fn create_service<T: 'static>(
|
||||
&mut self, service_name: &str, qos_profile: QosProfile,
|
||||
) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
|
||||
) -> Result<StreamWithTracingData<ServiceRequest<T>>>
|
||||
where
|
||||
T: WrappedServiceTypeSupport,
|
||||
{
|
||||
let service_handle = create_service_helper(
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
|
||||
|
||||
let ws = TypedService::<T> {
|
||||
rcl_handle: service_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut service_arc = Arc::new(Mutex::new(TypedService::<T> {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_service() },
|
||||
sender,
|
||||
}));
|
||||
let service_ref = Arc::get_mut(&mut service_arc)
|
||||
.unwrap() // No other Arc should exist. The Arc was just created.
|
||||
.get_mut()
|
||||
.unwrap(); // The mutex was just created. It should not be poisoned.
|
||||
|
||||
// SAFETY:
|
||||
// The service was zero initialized above.
|
||||
// Full initialization happens in `create_service_helper``.
|
||||
unsafe {
|
||||
create_service_helper(
|
||||
&mut service_ref.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
|
||||
self.services.push(Arc::new(Mutex::new(ws)));
|
||||
let receiver = StreamWithTracingDataBuilder::build_service(receiver, unsafe {
|
||||
TracingId::new(&service_ref.rcl_handle)
|
||||
});
|
||||
|
||||
// Only push after full initialization.
|
||||
self.services.push(service_arc);
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
|
|
@ -708,19 +875,30 @@ impl Node {
|
|||
where
|
||||
T: WrappedServiceTypeSupport,
|
||||
{
|
||||
let client_handle = create_client_helper(
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
let ws = TypedClient::<T> {
|
||||
rcl_handle: client_handle,
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut client_arc = Arc::new(Mutex::new(TypedClient::<T> {
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
|
||||
response_channels: Vec::new(),
|
||||
poll_available_channels: Vec::new(),
|
||||
}));
|
||||
let client_ref = Arc::get_mut(&mut client_arc)
|
||||
.unwrap() // No other Arc should exist. The Arc was just created.
|
||||
.get_mut()
|
||||
.unwrap(); // The mutex was just created. It should not be poisoned.
|
||||
|
||||
// SAFETY:
|
||||
// The client was zero initialized above.
|
||||
// Full initialization happens in `create_client_helper`.
|
||||
unsafe {
|
||||
create_client_helper(
|
||||
&mut client_ref.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
T::get_ts(),
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
|
||||
let client_arc = Arc::new(Mutex::new(ws));
|
||||
let c = make_client(Arc::downgrade(&client_arc));
|
||||
self.clients.push(client_arc);
|
||||
Ok(c)
|
||||
|
|
@ -736,20 +914,32 @@ impl Node {
|
|||
&mut self, service_name: &str, service_type: &str, qos_profile: QosProfile,
|
||||
) -> Result<ClientUntyped> {
|
||||
let service_type = UntypedServiceSupport::new_from(service_type)?;
|
||||
let client_handle = create_client_helper(
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
service_type.ts,
|
||||
qos_profile,
|
||||
)?;
|
||||
let client = UntypedClient_ {
|
||||
|
||||
// SAFETY: The `rcl_handle` is zero initialized (partial initialization) in this block.
|
||||
let mut client_arc = Arc::new(Mutex::new(UntypedClient_ {
|
||||
service_type,
|
||||
rcl_handle: client_handle,
|
||||
rcl_handle: unsafe { rcl_get_zero_initialized_client() },
|
||||
response_channels: Vec::new(),
|
||||
poll_available_channels: Vec::new(),
|
||||
}));
|
||||
let client_ref = Arc::get_mut(&mut client_arc)
|
||||
.unwrap() // No other Arc should exist. The Arc was just created.
|
||||
.get_mut()
|
||||
.unwrap(); // The mutex was just created. It should not be poisoned.
|
||||
|
||||
// SAFETY:
|
||||
// The client was zero initialized above.
|
||||
// Full initialization happens in `create_client_helper`.
|
||||
unsafe {
|
||||
create_client_helper(
|
||||
&mut client_ref.rcl_handle,
|
||||
self.node_handle.as_mut(),
|
||||
service_name,
|
||||
client_ref.service_type.ts,
|
||||
qos_profile,
|
||||
)?;
|
||||
};
|
||||
|
||||
let client_arc = Arc::new(Mutex::new(client));
|
||||
let c = make_untyped_client(Arc::downgrade(&client_arc));
|
||||
self.clients.push(client_arc);
|
||||
Ok(c)
|
||||
|
|
@ -882,11 +1072,10 @@ impl Node {
|
|||
where
|
||||
T: WrappedTypesupport,
|
||||
{
|
||||
let publisher_handle =
|
||||
let publisher_arc =
|
||||
create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
|
||||
let arc = Arc::new(publisher_handle);
|
||||
let p = make_publisher(Arc::downgrade(&arc));
|
||||
self.pubs.push(arc);
|
||||
let p = make_publisher(Arc::downgrade(&publisher_arc));
|
||||
self.pubs.push(publisher_arc);
|
||||
Ok(p)
|
||||
}
|
||||
|
||||
|
|
@ -897,11 +1086,10 @@ impl Node {
|
|||
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
|
||||
) -> Result<PublisherUntyped> {
|
||||
let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?;
|
||||
let publisher_handle =
|
||||
let publisher_arc =
|
||||
create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?;
|
||||
let arc = Arc::new(publisher_handle);
|
||||
let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned());
|
||||
self.pubs.push(arc);
|
||||
let p = make_publisher_untyped(Arc::downgrade(&publisher_arc), topic_type.to_owned());
|
||||
self.pubs.push(publisher_arc);
|
||||
Ok(p)
|
||||
}
|
||||
|
||||
|
|
@ -939,6 +1127,8 @@ impl Node {
|
|||
/// `timeout` is a duration specifying how long the spin should
|
||||
/// block for if there are no pending events.
|
||||
pub fn spin_once(&mut self, timeout: Duration) {
|
||||
r2r_tracing::trace_spin_start(&*self.node_handle, timeout);
|
||||
|
||||
// first handle any completed action cancellation responses
|
||||
for a in &mut self.action_servers {
|
||||
a.lock().unwrap().send_completed_cancel_requests();
|
||||
|
|
@ -1102,9 +1292,12 @@ impl Node {
|
|||
unsafe {
|
||||
rcl_wait_set_fini(&mut ws);
|
||||
}
|
||||
r2r_tracing::trace_spin_timeout(&*self.node_handle);
|
||||
return;
|
||||
}
|
||||
|
||||
r2r_tracing::trace_spin_wake(&*self.node_handle);
|
||||
|
||||
let mut subs_to_remove = vec![];
|
||||
if ws.subscriptions != std::ptr::null_mut() {
|
||||
let ws_subs =
|
||||
|
|
@ -1261,6 +1454,8 @@ impl Node {
|
|||
unsafe {
|
||||
rcl_wait_set_fini(&mut ws);
|
||||
}
|
||||
|
||||
r2r_tracing::trace_spin_end(&*self.node_handle);
|
||||
}
|
||||
|
||||
/// Returns a map of topic names and type names of the publishers
|
||||
|
|
@ -1364,9 +1559,15 @@ impl Node {
|
|||
_clock: Some(clock), // The timer owns the clock.
|
||||
sender: tx,
|
||||
};
|
||||
self.timers.push(timer);
|
||||
|
||||
let out_timer = Timer { receiver: rx };
|
||||
let out_timer = unsafe {
|
||||
Timer {
|
||||
receiver: rx,
|
||||
timer_handle: TracingId::new(timer.get_handle()),
|
||||
node_handle: TracingId::new(&*self.node_handle),
|
||||
}
|
||||
};
|
||||
self.timers.push(timer);
|
||||
|
||||
Ok(out_timer)
|
||||
}
|
||||
|
|
@ -1387,9 +1588,15 @@ impl Node {
|
|||
_clock: None, // The timer does not own the clock (the node owns it).
|
||||
sender: tx,
|
||||
};
|
||||
self.timers.push(timer);
|
||||
|
||||
let out_timer = Timer { receiver: rx };
|
||||
let out_timer = unsafe {
|
||||
Timer {
|
||||
receiver: rx,
|
||||
timer_handle: TracingId::new(timer.get_handle()),
|
||||
node_handle: TracingId::new(&*self.node_handle),
|
||||
}
|
||||
};
|
||||
self.timers.push(timer);
|
||||
|
||||
Ok(out_timer)
|
||||
}
|
||||
|
|
@ -1521,6 +1728,8 @@ impl Drop for Timer_ {
|
|||
/// A ROS timer.
|
||||
pub struct Timer {
|
||||
receiver: mpsc::Receiver<Duration>,
|
||||
timer_handle: TracingId<rcl_timer_t>,
|
||||
node_handle: TracingId<rcl_node_t>,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
|
|
@ -1535,6 +1744,23 @@ impl Timer {
|
|||
Err(Error::RCL_RET_TIMER_INVALID)
|
||||
}
|
||||
}
|
||||
|
||||
/// Transforms this timer stream to a [`Future`] calling the given `callback` on each tick.
|
||||
///
|
||||
/// The callback execution is traced by r2r_tracing.
|
||||
///
|
||||
/// This function should be called before dropping the timer's node.
|
||||
/// Otherwise, the trace data might be inconsistent.
|
||||
#[must_use = "Futures do nothing unless you `.await` or poll them"]
|
||||
pub fn on_tick<F: FnMut(Duration)>(self, callback: F) -> impl Future<Output = ()> + Unpin {
|
||||
let mut callback = r2r_tracing::Callback::new_timer(self.timer_handle, callback);
|
||||
r2r_tracing::trace_timer_link_node(self.timer_handle, self.node_handle);
|
||||
|
||||
self.receiver.for_each(move |duration| {
|
||||
callback.call(duration);
|
||||
future::ready(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Since publishers are temporarily upgraded to owners during the
|
||||
|
|
|
|||
|
|
@ -308,6 +308,7 @@ pub trait RosParams {
|
|||
) -> Result<()>;
|
||||
fn get_parameter(&mut self, param_name: &str) -> Result<ParameterValue>;
|
||||
fn set_parameter(&mut self, param_name: &str, param_val: &ParameterValue) -> Result<()>;
|
||||
fn check_parameter(&self, param_name: &str, param_val: &ParameterValue) -> Result<()>;
|
||||
}
|
||||
|
||||
// Implementation of RosParams for primitive types, i.e. leaf parameters
|
||||
|
|
@ -360,6 +361,26 @@ macro_rules! impl_ros_params {
|
|||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_parameter(
|
||||
&self, param_name: &str, param_val: &ParameterValue,
|
||||
) -> Result<()> {
|
||||
if param_name != "" {
|
||||
return Err(Error::InvalidParameterName {
|
||||
name: param_name.to_owned(),
|
||||
});
|
||||
}
|
||||
match param_val {
|
||||
$param_value_type(val) => {
|
||||
let _: Self = $from_param_conv(val)?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Err(Error::InvalidParameterType {
|
||||
name: "".to_string(), // will be completed by callers who know the name
|
||||
ty: std::stringify!($param_value_type),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ use std::{
|
|||
ffi::{c_void, CString},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::{Mutex, Once, Weak},
|
||||
sync::{Arc, Mutex, Once, Weak},
|
||||
};
|
||||
|
||||
use crate::{error::*, msg_types::*, qos::QosProfile};
|
||||
|
|
@ -137,15 +137,24 @@ pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> Publis
|
|||
pub fn create_publisher_helper(
|
||||
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
|
||||
qos_profile: QosProfile,
|
||||
) -> Result<Publisher_> {
|
||||
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
|
||||
) -> Result<Arc<Publisher_>> {
|
||||
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
||||
|
||||
// Allocate the memory now so that the location of the rcl handle
|
||||
// does not change after call to rcl_publisher_init.
|
||||
// This is important because tracing in rcl expects the handle to be at a fixed location.
|
||||
let mut publisher_arc = Arc::new(Publisher_ {
|
||||
handle: unsafe { rcl_get_zero_initialized_publisher() },
|
||||
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
|
||||
});
|
||||
let publisher_mut = Arc::get_mut(&mut publisher_arc)
|
||||
.expect("No other Arc should exist. The Arc was just created.");
|
||||
|
||||
let result = unsafe {
|
||||
let mut publisher_options = rcl_publisher_get_default_options();
|
||||
publisher_options.qos = qos_profile.into();
|
||||
rcl_publisher_init(
|
||||
&mut publisher_handle,
|
||||
&mut publisher_mut.handle,
|
||||
node,
|
||||
typesupport,
|
||||
topic_c_string.as_ptr(),
|
||||
|
|
@ -153,10 +162,7 @@ pub fn create_publisher_helper(
|
|||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(Publisher_ {
|
||||
handle: publisher_handle,
|
||||
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
|
||||
})
|
||||
Ok(publisher_arc)
|
||||
} else {
|
||||
Err(Error::from_rcl_error(result))
|
||||
}
|
||||
|
|
@ -176,6 +182,8 @@ impl PublisherUntyped {
|
|||
let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
|
||||
native_msg.from_json(msg)?;
|
||||
|
||||
r2r_tracing::trace_publish(native_msg.void_ptr());
|
||||
|
||||
let result = unsafe {
|
||||
rcl_publish(
|
||||
&publisher.handle as *const rcl_publisher_t,
|
||||
|
|
@ -214,6 +222,8 @@ impl PublisherUntyped {
|
|||
allocator: unsafe { rcutils_get_default_allocator() },
|
||||
};
|
||||
|
||||
r2r_tracing::trace_publish((&msg_buf as *const rcl_serialized_message_t).cast::<c_void>());
|
||||
|
||||
let result = unsafe {
|
||||
rcl_publish_serialized_message(
|
||||
&publisher.handle,
|
||||
|
|
@ -271,6 +281,9 @@ where
|
|||
.upgrade()
|
||||
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
||||
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
|
||||
|
||||
r2r_tracing::trace_publish(native_msg.void_ptr());
|
||||
|
||||
let result = unsafe {
|
||||
rcl_publish(
|
||||
&publisher.handle as *const rcl_publisher_t,
|
||||
|
|
@ -357,6 +370,8 @@ where
|
|||
.upgrade()
|
||||
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
|
||||
|
||||
r2r_tracing::trace_publish(msg.void_ptr());
|
||||
|
||||
let result = if msg.is_loaned {
|
||||
unsafe {
|
||||
// signal that we are relinquishing responsibility of the memory
|
||||
|
|
|
|||
|
|
@ -111,11 +111,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn create_service_helper(
|
||||
node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
|
||||
qos_profile: QosProfile,
|
||||
) -> Result<rcl_service_t> {
|
||||
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
|
||||
/// Initializes the service.
|
||||
///
|
||||
/// SAFETY: requires that the service handle is zero initialized by [`rcl_get_zero_initialized_service`].
|
||||
pub unsafe fn create_service_helper(
|
||||
service_handle: &mut rcl_service_t, node: &mut rcl_node_t, service_name: &str,
|
||||
service_ts: *const rosidl_service_type_support_t, qos_profile: QosProfile,
|
||||
) -> Result<()> {
|
||||
let service_name_c_string =
|
||||
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
||||
|
||||
|
|
@ -123,7 +125,7 @@ pub fn create_service_helper(
|
|||
let mut service_options = rcl_service_get_default_options();
|
||||
service_options.qos = qos_profile.into();
|
||||
rcl_service_init(
|
||||
&mut service_handle,
|
||||
service_handle,
|
||||
node,
|
||||
service_ts,
|
||||
service_name_c_string.as_ptr(),
|
||||
|
|
@ -131,7 +133,7 @@ pub fn create_service_helper(
|
|||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(service_handle)
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::from_rcl_error(result))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,6 +55,8 @@ where
|
|||
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
|
||||
};
|
||||
if ret == RCL_RET_OK as i32 {
|
||||
r2r_tracing::trace_take_ptr(msg.void_ptr());
|
||||
|
||||
let msg = T::from_native(&msg);
|
||||
if let Err(e) = self.sender.try_send(msg) {
|
||||
if e.is_disconnected() {
|
||||
|
|
@ -138,6 +140,9 @@ where
|
|||
new_msg
|
||||
}
|
||||
};
|
||||
|
||||
r2r_tracing::trace_take_ptr(msg.void_ptr());
|
||||
|
||||
if let Err(e) = self.sender.try_send(msg) {
|
||||
if e.is_disconnected() {
|
||||
// user dropped the handle to the stream, signal removal.
|
||||
|
|
@ -168,6 +173,8 @@ impl Subscriber_ for UntypedSubscriber {
|
|||
rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
|
||||
};
|
||||
if ret == RCL_RET_OK as i32 {
|
||||
r2r_tracing::trace_take_ptr(msg.void_ptr());
|
||||
|
||||
let json = msg.to_json();
|
||||
if let Err(e) = self.sender.try_send(json) {
|
||||
if e.is_disconnected() {
|
||||
|
|
@ -215,6 +222,8 @@ impl Subscriber_ for RawSubscriber {
|
|||
}
|
||||
};
|
||||
|
||||
r2r_tracing::trace_take(&self.msg_buf);
|
||||
|
||||
if let Err(e) = self.sender.try_send(data_bytes) {
|
||||
if e.is_disconnected() {
|
||||
// user dropped the handle to the stream, signal removal.
|
||||
|
|
@ -234,18 +243,17 @@ impl Subscriber_ for RawSubscriber {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn create_subscription_helper(
|
||||
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
|
||||
qos_profile: QosProfile,
|
||||
) -> Result<rcl_subscription_t> {
|
||||
let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
|
||||
pub unsafe fn create_subscription_helper(
|
||||
subscription_handle: &mut rcl_subscription_t, node: &mut rcl_node_t, topic: &str,
|
||||
ts: *const rosidl_message_type_support_t, qos_profile: QosProfile,
|
||||
) -> Result<()> {
|
||||
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
|
||||
|
||||
let result = unsafe {
|
||||
let mut subscription_options = rcl_subscription_get_default_options();
|
||||
subscription_options.qos = qos_profile.into();
|
||||
rcl_subscription_init(
|
||||
&mut subscription_handle,
|
||||
subscription_handle,
|
||||
node,
|
||||
ts,
|
||||
topic_c_string.as_ptr(),
|
||||
|
|
@ -253,7 +261,7 @@ pub fn create_subscription_helper(
|
|||
)
|
||||
};
|
||||
if result == RCL_RET_OK as i32 {
|
||||
Ok(subscription_handle)
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::from_rcl_error(result))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,14 +9,18 @@ use crate::{
|
|||
Clock, ClockType, Node, QosProfile, WrappedTypesupport,
|
||||
};
|
||||
use r2r_rcl::{
|
||||
rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
|
||||
rmw_message_info_t, RCL_RET_OK,
|
||||
rcl_get_zero_initialized_subscription, rcl_node_t, rcl_subscription_fini, rcl_subscription_t,
|
||||
rcl_take, rcl_time_point_value_t, rmw_message_info_t, RCL_RET_OK,
|
||||
};
|
||||
use r2r_tracing::TracingId;
|
||||
use std::{
|
||||
any::type_name,
|
||||
sync::{Arc, Mutex, Weak},
|
||||
};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
|
||||
/// Provides time from `/clock` topic to attached ROS clocks
|
||||
///
|
||||
/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled.
|
||||
/// By default, only clock used by ROS timers is attached and time from `/clock` topic is disabled.
|
||||
///
|
||||
/// The time from `/clock` topic can be activated by either of these:
|
||||
/// - calling [`TimeSource::enable_sim_time`]
|
||||
|
|
@ -102,7 +106,7 @@ impl TimeSource {
|
|||
match inner.subscriber_state {
|
||||
TimeSourceSubscriberState::None => {
|
||||
let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
|
||||
node.subscribers.push(Box::new(subscriber));
|
||||
node.subscribers.push(subscriber);
|
||||
inner.subscriber_state = TimeSourceSubscriberState::Active;
|
||||
}
|
||||
TimeSourceSubscriberState::ToBeDestroyed => {
|
||||
|
|
@ -191,20 +195,39 @@ impl TimeSource_ {
|
|||
}
|
||||
|
||||
impl TimeSourceSubscriber {
|
||||
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
|
||||
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<Box<Self>> {
|
||||
// The values are set based on default values in rclcpp
|
||||
let qos = QosProfile::default().keep_last(1).best_effort();
|
||||
|
||||
let subscriber = create_subscription_helper(
|
||||
node_handle,
|
||||
"/clock",
|
||||
crate::rosgraph_msgs::msg::Clock::get_ts(),
|
||||
qos,
|
||||
)?;
|
||||
Ok(Self {
|
||||
subscriber_handle: subscriber,
|
||||
let mut subscriber = Box::new(Self {
|
||||
subscriber_handle: unsafe { rcl_get_zero_initialized_subscription() },
|
||||
time_source,
|
||||
})
|
||||
});
|
||||
|
||||
// SAFETY:
|
||||
// create_subscription_helper requires zero initialized subscription_handle -> done above
|
||||
// Completes initialization of subscription.
|
||||
unsafe {
|
||||
create_subscription_helper(
|
||||
&mut subscriber.subscriber_handle,
|
||||
node_handle,
|
||||
"/clock",
|
||||
crate::rosgraph_msgs::msg::Clock::get_ts(),
|
||||
qos,
|
||||
)?
|
||||
};
|
||||
|
||||
// Use TimeSource inner arc address as callback_id. We hope that this id is much larger value than
|
||||
// any value generated by `r2r_tracing::Callback::new` which uses sequential values from 1.
|
||||
let callback_id = Arc::as_ptr(&subscriber.time_source.inner) as usize;
|
||||
r2r_tracing::trace_subscription_init(&subscriber.subscriber_handle, &*subscriber);
|
||||
r2r_tracing::trace_subscription_callback_added(
|
||||
unsafe { TracingId::new(&*subscriber).forget_type() },
|
||||
callback_id,
|
||||
);
|
||||
r2r_tracing::trace_callback_register(callback_id, type_name::<TimeSource>());
|
||||
|
||||
Ok(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -226,11 +249,16 @@ impl Subscriber_ for TimeSourceSubscriber {
|
|||
)
|
||||
};
|
||||
|
||||
r2r_tracing::trace_take_ptr(clock_msg.void_ptr());
|
||||
|
||||
let mut inner_time_source = self.time_source.inner.lock().unwrap();
|
||||
if ret == RCL_RET_OK as i32 {
|
||||
let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);
|
||||
|
||||
let time_source_ptr = Arc::as_ptr(&self.time_source.inner) as usize;
|
||||
r2r_tracing::trace_callback_start(time_source_ptr, false);
|
||||
inner_time_source.set_clock_time(msg.clock);
|
||||
r2r_tracing::trace_callback_end(time_source_ptr);
|
||||
}
|
||||
|
||||
match inner_time_source.subscriber_state {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r_actions"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Internal dependency to the r2r crate."
|
||||
license = "MIT"
|
||||
|
|
@ -11,12 +11,12 @@ repository = "https://github.com/sequenceplanner/r2r"
|
|||
documentation = "https://docs.rs/r2r/latest/r2r"
|
||||
|
||||
[dependencies]
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.4" }
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
|
||||
r2r_msg_gen = { path = "../r2r_msg_gen", version = "0.9.5" }
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0.63.0"
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
bindgen = "0.71.1"
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
|
||||
[features]
|
||||
save-bindgen = ["r2r_rcl/save-bindgen", "r2r_msg_gen/save-bindgen"]
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
#
|
||||
# For r2r 0.9.4
|
||||
# For r2r 0.9.5
|
||||
#
|
||||
# cmake code for simple colcon integration.
|
||||
# See https://github.com/m-dahl/r2r_minimal_node/
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r_common"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Minimal ros2 bindings."
|
||||
license = "MIT"
|
||||
|
|
@ -11,7 +11,7 @@ repository = "https://github.com/sequenceplanner/r2r"
|
|||
documentation = "https://docs.rs/r2r/latest/r2r"
|
||||
|
||||
[dependencies]
|
||||
bindgen = "0.63.0"
|
||||
bindgen = "0.71.1"
|
||||
sha2 = "0.10.6"
|
||||
os_str_bytes = "6.5.1"
|
||||
regex = "1.8.4"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r_macros"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Minimal ros2 bindings."
|
||||
license = "MIT"
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ pub fn derive_r2r_params(input: proc_macro::TokenStream) -> proc_macro::TokenStr
|
|||
let get_param_matches = param_matches_for(quote!(get_parameter(suffix)), &input.data);
|
||||
let set_param_matches =
|
||||
param_matches_for(quote!(set_parameter(suffix, param_val)), &input.data);
|
||||
let check_param_matches =
|
||||
param_matches_for(quote!(check_parameter(suffix, param_val)), &input.data);
|
||||
|
||||
let expanded = quote! {
|
||||
// The generated impl.
|
||||
|
|
@ -64,6 +66,20 @@ pub fn derive_r2r_params(input: proc_macro::TokenStream) -> proc_macro::TokenStr
|
|||
};
|
||||
result.map_err(|e| e.update_param_name(¶m_name))
|
||||
}
|
||||
fn check_parameter(&self, param_name: &str, param_val: &::r2r::ParameterValue) -> ::r2r::Result<()>
|
||||
{
|
||||
let (prefix, suffix) = match param_name.split_once('.') {
|
||||
None => (param_name, ""),
|
||||
Some((prefix, suffix)) => (prefix, suffix)
|
||||
};
|
||||
let result = match prefix {
|
||||
#check_param_matches
|
||||
_ => Err(::r2r::Error::InvalidParameterName {
|
||||
name: "".into(),
|
||||
}),
|
||||
};
|
||||
result.map_err(|e| e.update_param_name(¶m_name))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r_msg_gen"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Internal dependency to the r2r crate."
|
||||
license = "MIT"
|
||||
|
|
@ -11,8 +11,8 @@ repository = "https://github.com/sequenceplanner/r2r"
|
|||
documentation = "https://docs.rs/r2r/latest/r2r"
|
||||
|
||||
[dependencies]
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
phf = { version = "0.11.1", features = ["macros"] }
|
||||
quote = "1.0.28"
|
||||
proc-macro2 = "1.0.60"
|
||||
|
|
@ -21,9 +21,9 @@ force-send-sync = "1.0.0"
|
|||
rayon = "1.7.0"
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0.63.0"
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.4" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
bindgen = "0.71.1"
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
quote = "1.0.28"
|
||||
syn = { version = "2.0.18", features = ["full"] }
|
||||
rayon = "1.7.0"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "r2r_rcl"
|
||||
version = "0.9.4"
|
||||
version = "0.9.5"
|
||||
authors = ["Martin Dahl <martin.dahl@gmail.com>"]
|
||||
description = "Internal dependency to the r2r crate."
|
||||
license = "MIT"
|
||||
|
|
@ -15,8 +15,8 @@ paste = "1.0.9"
|
|||
widestring = "1.0.2"
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0.63.0"
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.4" }
|
||||
bindgen = "0.71.1"
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5" }
|
||||
|
||||
[features]
|
||||
save-bindgen = []
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
[package]
|
||||
name = "r2r_tracing"
|
||||
description = "Internal dependency containing tracepoint definitions or imports for r2r."
|
||||
authors = ["Martin Škoudlil <skoudmar@cvut.cz>"]
|
||||
version = "0.9.5"
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
repository = "https://github.com/sequenceplanner/r2r"
|
||||
readme = "README.md"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.25"
|
||||
r2r_rcl = { path = "../r2r_rcl", version = "0.9.5" }
|
||||
|
||||
[build-dependencies]
|
||||
lttng-ust-generate = { git = "https://github.com/skoudmar/lttng-ust-rs.git", rev = "bd437b5eeb8c4378d78ebb5aa17f0f4a95828b32", optional = true}
|
||||
r2r_common = { path = "../r2r_common", version = "0.9.5", optional = true}
|
||||
|
||||
[features]
|
||||
tracing = ["dep:lttng-ust-generate", "dep:r2r_common"]
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
# r2r_tracing
|
||||
|
||||
Internal dependency containing tracepoint definitions or imports from `tracetools` ROS package for r2r.
|
||||
|
||||
Uses LTTng tracing framework.
|
||||
|
||||
## Feature flag `tracing`
|
||||
|
||||
The crate will generate and link tracepoint libraries only when the feature flag is enabled.
|
||||
|
||||
Without specifying the feature flag `tracing` all exported functions are No-ops.
|
||||
|
||||
## Depends on
|
||||
|
||||
- `tracetools` ROS package
|
||||
- This package is a part of ROS distribution.
|
||||
- `r2r_tracing` dynamically loads `tracetools` library to obtain tracepoints used by `rclcpp`.
|
||||
- `lttng-ust` crate
|
||||
- To define additional tracepoints.
|
||||
|
||||
## Recording traces of R2R applications
|
||||
|
||||
Make sure to enable feature flag `tracing`.
|
||||
|
||||
Then start tracing session:
|
||||
|
||||
- Either by installing [`ros2trace`](https://index.ros.org/p/ros2trace/) and running:
|
||||
|
||||
```sh
|
||||
ros2 trace -u 'ros2:*' 'r2r:*'
|
||||
```
|
||||
|
||||
The traces will be available in `$HOME/.ros/tracing/session-<timestamp>`.
|
||||
|
||||
- Alternatively, you can trace your application directly with LTTng:
|
||||
|
||||
```sh
|
||||
# Session name is an optional user-chosen name for the trace
|
||||
lttng create [session-name]
|
||||
lttng enable-event -u 'ros2:*,r2r:*'
|
||||
lttng add-context -u --type=vtid --type=vpid --type=procname
|
||||
lttng start
|
||||
# Start the ROS system here.
|
||||
# Let it run for as long as you want to trace it.
|
||||
lttng destroy
|
||||
```
|
||||
|
||||
The traces will be available in `$HOME/lttng-traces/<session-name>-<timestamp>`.
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
#[cfg(all(feature = "tracing", not(target_os = "linux")))]
|
||||
compile_error!("Feature 'tracing' is only available on Linux.");
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
fn main() {}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
fn main() {
|
||||
use tracing::{generate_r2r_tracepoints, generate_rclcpp_tracepoint_bindings};
|
||||
|
||||
generate_rclcpp_tracepoint_bindings();
|
||||
generate_r2r_tracepoints();
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
mod tracing {
|
||||
use lttng_ust_generate::{CIntegerType, CTFType, Generator, Provider};
|
||||
use std::{env, path::PathBuf};
|
||||
|
||||
macro_rules! create_tracepoint {
|
||||
($provider:ident::$name:ident($($arg_name:ident: $arg_lttng_type:expr),* $(,)?)) => {
|
||||
$provider.create_class(concat!(stringify!($name), "_class"))
|
||||
$(
|
||||
.add_field(stringify!($arg_name), $arg_lttng_type)
|
||||
)*
|
||||
.instantiate(stringify!($name))
|
||||
};
|
||||
}
|
||||
|
||||
/// Generate bindings to the rclcpp tracepoints defined in the tracetools ros2 package.
|
||||
pub(crate) fn generate_rclcpp_tracepoint_bindings() {
|
||||
let bindings = r2r_common::setup_bindgen_builder()
|
||||
.header("src/tracetools_wrapper.h")
|
||||
.allowlist_function("ros_trace_rclcpp_.*")
|
||||
.allowlist_function("ros_trace_callback_.*")
|
||||
.generate_comments(true)
|
||||
.generate()
|
||||
.expect("Unable to generate bindings for tracetools");
|
||||
|
||||
println!("cargo:rustc-link-lib=tracetools");
|
||||
|
||||
// Write the bindings to the $OUT_DIR/tracetools_bindings.rs file.
|
||||
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
|
||||
bindings
|
||||
.write_to_file(out_path.join("tracetools_bindings.rs"))
|
||||
.expect("Couldn't write tracetools bindings!");
|
||||
}
|
||||
|
||||
pub(crate) fn generate_r2r_tracepoints() {
|
||||
let mut r2r = Provider::new("r2r");
|
||||
|
||||
create_tracepoint!(r2r::spin_start(
|
||||
node_handle: CTFType::IntegerHex(CIntegerType::USize),
|
||||
timeout_s: CTFType::Integer(CIntegerType::U64),
|
||||
timeout_ns: CTFType::Integer(CIntegerType::U32),
|
||||
));
|
||||
create_tracepoint!(r2r::spin_end(
|
||||
node_handle: CTFType::IntegerHex(CIntegerType::USize),
|
||||
));
|
||||
create_tracepoint!(r2r::spin_wake(
|
||||
node_handle: CTFType::IntegerHex(CIntegerType::USize),
|
||||
));
|
||||
create_tracepoint!(r2r::spin_timeout(
|
||||
node_handle: CTFType::IntegerHex(CIntegerType::USize),
|
||||
));
|
||||
|
||||
Generator::default()
|
||||
.generated_lib_name("r2r_tracepoints_r2r")
|
||||
.register_provider(r2r)
|
||||
.output_file_name(
|
||||
PathBuf::from(env::var("OUT_DIR").unwrap()).join("r2r_tracepoints.rs"),
|
||||
)
|
||||
.generate()
|
||||
.expect("Unable to generate tracepoint bindings for r2r");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
use crate::{
|
||||
trace_callback_register, trace_service_callback_added, trace_subscription_callback_added,
|
||||
trace_timer_callback_added, TracingId,
|
||||
};
|
||||
use r2r_rcl::{rcl_service_t, rcl_timer_t};
|
||||
use std::{
|
||||
any::type_name,
|
||||
ffi::c_void,
|
||||
marker::PhantomData,
|
||||
sync::atomic::{AtomicUsize, Ordering::Relaxed},
|
||||
};
|
||||
|
||||
/// Tracing wrapper for callback
|
||||
pub struct Callback<F, M>
|
||||
where
|
||||
F: FnMut(M),
|
||||
{
|
||||
func: F,
|
||||
id: usize,
|
||||
msg_type: PhantomData<M>,
|
||||
}
|
||||
|
||||
impl<F, M> Callback<F, M>
|
||||
where
|
||||
F: FnMut(M),
|
||||
{
|
||||
/// Generates unique ID for the callback
|
||||
fn gen_id() -> usize {
|
||||
static COUNTER: AtomicUsize = AtomicUsize::new(1);
|
||||
COUNTER.fetch_add(1, Relaxed)
|
||||
}
|
||||
|
||||
fn new(callback: F, id: usize) -> Self {
|
||||
trace_callback_register(id, type_name::<F>());
|
||||
|
||||
Self {
|
||||
func: callback,
|
||||
id,
|
||||
msg_type: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Emits trace event associating this `callback` with the `service`.
|
||||
///
|
||||
/// Wraps the callback to allow tracing the callback calls.
|
||||
pub fn new_service(service: TracingId<rcl_service_t>, callback: F) -> Self {
|
||||
let id = Self::gen_id();
|
||||
trace_service_callback_added(service, id);
|
||||
|
||||
Self::new(callback, id)
|
||||
}
|
||||
|
||||
/// Emits trace event associating this `callback` with the `timer`.
|
||||
///
|
||||
/// Wraps the callback to allow tracing the callback calls.
|
||||
pub fn new_timer(timer: TracingId<rcl_timer_t>, callback: F) -> Self {
|
||||
let id = Self::gen_id();
|
||||
trace_timer_callback_added(timer, id);
|
||||
|
||||
Self::new(callback, id)
|
||||
}
|
||||
|
||||
/// Emits trace event associating this `callback` with the `subscription`.
|
||||
///
|
||||
/// Wraps the callback to allow tracing the callback calls.
|
||||
pub fn new_subscription(subscriber: TracingId<c_void>, callback: F) -> Self {
|
||||
let id = Self::gen_id();
|
||||
trace_subscription_callback_added(subscriber, id);
|
||||
|
||||
Self::new(callback, id)
|
||||
}
|
||||
|
||||
/// Call the `callback`.
|
||||
/// This emits `ros2:callback_start` and `ros2:callback_end` events at
|
||||
/// the beginning and end respectively.
|
||||
pub fn call(&mut self, msg: M) {
|
||||
crate::trace_callback_start(self.id, false);
|
||||
(self.func)(msg);
|
||||
crate::trace_callback_end(self.id);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
//! Tracepoint provider for the `r2r` crate.
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
mod r2r_tracepoints_bindings;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
mod tracetools_bindings;
|
||||
|
||||
mod rclcpp_tracepoints;
|
||||
pub use rclcpp_tracepoints::*;
|
||||
|
||||
mod r2r_tracepoints;
|
||||
pub use r2r_tracepoints::*;
|
||||
|
||||
mod callback;
|
||||
pub use callback::Callback;
|
||||
|
||||
mod stream;
|
||||
pub use stream::{StreamWithTracingData, StreamWithTracingDataBuilder};
|
||||
|
||||
mod tracing_id;
|
||||
pub use tracing_id::TracingId;
|
||||
|
||||
mod macros;
|
||||
use macros::tracepoint_fn;
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/// Generates two versions of each function definition wrapped via this macro:
|
||||
/// - One with tracing enabled: Adds a feature flag and copies the function signature and body as is.
|
||||
/// - One with tracing disabled: Adds a feature flag and replaces the function body to be empty.
|
||||
macro_rules! tracepoint_fn {
|
||||
($($(#[$atts:meta])* $vi:vis fn $name:ident$(<$generic:tt>)?($($arg_name:ident : $arg_ty:ty),* $(,)?) $(-> $ret_ty:ty)? $body:block)*) => {
|
||||
$(
|
||||
$(#[$atts])*
|
||||
#[inline]
|
||||
#[cfg(feature = "tracing")]
|
||||
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? $body
|
||||
|
||||
$(#[$atts])*
|
||||
#[doc = ""] // Empty doc line to start a new paragraph.
|
||||
#[doc = "Tracing is currently disabled so this function is a no-op. Enable `tracing` feature to enable tracing."]
|
||||
#[inline(always)]
|
||||
#[allow(unused_variables)]
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
$vi fn $name$(<$generic>)?($($arg_name : $arg_ty),*) $(-> $ret_ty)? {
|
||||
// Do nothing
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
// set macro visibility to public in crate only
|
||||
pub(crate) use tracepoint_fn;
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
#[cfg(feature = "tracing")]
|
||||
use crate::r2r_tracepoints_bindings::r2r as tp;
|
||||
use crate::tracepoint_fn;
|
||||
use r2r_rcl::rcl_node_t;
|
||||
use std::time::Duration;
|
||||
|
||||
tracepoint_fn! {
|
||||
/// The `node` started spinning with given `timeout`.
|
||||
pub fn trace_spin_start(node: *const rcl_node_t, timeout: Duration) {
|
||||
let timeout_s = timeout.as_secs();
|
||||
let timeout_ns = timeout.subsec_nanos();
|
||||
|
||||
tp::spin_start(node as usize, timeout_s, timeout_ns);
|
||||
}
|
||||
|
||||
/// The `node` ended spinning function.
|
||||
///
|
||||
/// If the spinning function ended by a timeout, use `trace_spin_timeout` instead.
|
||||
pub fn trace_spin_end(node: *const rcl_node_t) {
|
||||
tp::spin_end(node as usize);
|
||||
}
|
||||
|
||||
/// The `node` woke up from waiting on a wait set without reaching timeout.
|
||||
pub fn trace_spin_wake(node: *const rcl_node_t) {
|
||||
tp::spin_wake(node as usize);
|
||||
}
|
||||
|
||||
/// The `node` timeouted while spinning
|
||||
pub fn trace_spin_timeout(node: *const rcl_node_t) {
|
||||
tp::spin_timeout(node as usize);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
include!(concat!(env!("OUT_DIR"), "/r2r_tracepoints.rs"));
|
||||
|
|
@ -0,0 +1,161 @@
|
|||
use crate::{tracepoint_fn, TracingId};
|
||||
use r2r_rcl::{rcl_node_t, rcl_service_t, rcl_subscription_t, rcl_timer_t};
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
use crate::tracetools_bindings as tp;
|
||||
#[cfg(feature = "tracing")]
|
||||
use std::{
|
||||
ffi::{c_void, CString},
|
||||
ptr::null,
|
||||
};
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
const fn ref_to_c_void<T>(t: &T) -> *const std::ffi::c_void {
|
||||
std::ptr::from_ref(t).cast()
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! c_void {
|
||||
($e:ident) => {
|
||||
($e) as *const std::ffi::c_void
|
||||
};
|
||||
}
|
||||
|
||||
// Documentation of tracepoints is based on https://github.com/ros2/ros2_tracing project.
|
||||
// From file `tracetools/include/tracetools/tracetools.h`
|
||||
|
||||
tracepoint_fn! {
|
||||
/// Message publication.
|
||||
///
|
||||
/// Records the pointer to the `message` being published at the `rclcpp`/`r2r` level.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_publish
|
||||
// Lint allow note: The message pointer is NOT dereferenced.
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub fn trace_publish(message: *const std::ffi::c_void) {
|
||||
unsafe {
|
||||
// first argument documentation:
|
||||
// publisher_handle not used, but kept for API/ABI stability
|
||||
tp::ros_trace_rclcpp_publish(null(), message);
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscription object initialization.
|
||||
///
|
||||
/// Tracepoint to allow associating the `subscription_handle` from `rcl` with the address of rust `subscription` reference.
|
||||
/// There can be more than 1 `subscription` for 1 `subscription_handle`.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_subscription_init`
|
||||
pub fn trace_subscription_init<S>(
|
||||
subscription_handle: *const rcl_subscription_t, subscription: &S,
|
||||
) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_subscription_init(
|
||||
subscription_handle.cast(),
|
||||
ref_to_c_void(subscription),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracepoint to allow associating the subscription callback identified by `callback_id` with the `subscription` object.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_subscription_callback_added`
|
||||
pub fn trace_subscription_callback_added(subscription: TracingId<std::ffi::c_void>, callback_id: usize) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_subscription_callback_added(
|
||||
subscription.c_void(),
|
||||
c_void!(callback_id),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Message taking.
|
||||
///
|
||||
/// Records the **reference** to the `message` being taken at the `rclcpp`/`r2r` level.
|
||||
///
|
||||
/// To trace messages pointed to by void pointer use [`trace_take_ptr`].
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_take`
|
||||
pub fn trace_take<M>(message: &M) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_take(ref_to_c_void(message));
|
||||
}
|
||||
}
|
||||
|
||||
/// Message taking.
|
||||
///
|
||||
/// Records the **void pointer** to the `message` being taken at the `rclcpp`/`r2r` level.
|
||||
///
|
||||
/// To trace messages by their reference use [`trace_take`].
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_take`
|
||||
// Lint allow note: The message pointer is NOT dereferenced.
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub fn trace_take_ptr(message: *const std::ffi::c_void) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_take(message);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracepoint to allow associating the service callback identified by `callback_id` with a `service`.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_service_callback_added`
|
||||
pub fn trace_service_callback_added(service: TracingId<rcl_service_t>, callback_id: usize) {
|
||||
unsafe { tp::ros_trace_rclcpp_service_callback_added(service.c_void(), c_void!(callback_id)) }
|
||||
}
|
||||
|
||||
/// Tracepoint to allow associating the timer callback identified by `callback_id` with its `rcl_timer_t` handle.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_timer_callback_added`
|
||||
pub fn trace_timer_callback_added(timer: TracingId<rcl_timer_t>, callback_id: usize) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_timer_callback_added(timer.c_void(), c_void!(callback_id));
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracepoint to allow associating the `timer` with a `node`.
|
||||
///
|
||||
/// Tracepoint: `ros2::rclcpp_timer_link_node`
|
||||
pub fn trace_timer_link_node(timer: TracingId<rcl_timer_t>, node: TracingId<rcl_node_t>) {
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_timer_link_node(timer.c_void(), node.c_void());
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracepoint to allow associating demangled `function_symbol` with a `callback_id`.
|
||||
///
|
||||
/// Allocates memory to store the function symbol as a `CString`.
|
||||
///
|
||||
/// Tracepoint: `ros2::callback_register`
|
||||
///
|
||||
/// # Panics
|
||||
/// If `function_symbol` contains a null byte.
|
||||
pub fn trace_callback_register(callback_id: usize, function_symbol: &str) {
|
||||
let function_symbol = CString::new(function_symbol)
|
||||
.expect("r2r tracing: Cannot convert function_symbol to CString. It contains null byte.");
|
||||
|
||||
unsafe {
|
||||
tp::ros_trace_rclcpp_callback_register(c_void!(callback_id), function_symbol.as_ptr());
|
||||
}
|
||||
}
|
||||
|
||||
/// Start of a callback
|
||||
///
|
||||
/// Set `is_intra_process` depending on whether this callback is done via intra-process or not
|
||||
///
|
||||
/// Tracepoint: `ros2::callback_start`
|
||||
pub fn trace_callback_start(callback_id: usize, is_intra_process: bool) {
|
||||
unsafe {
|
||||
tp::ros_trace_callback_start(c_void!(callback_id), is_intra_process);
|
||||
}
|
||||
}
|
||||
|
||||
/// End of a callback.
|
||||
///
|
||||
/// Tracepoint: `ros2::callback_end`
|
||||
pub fn trace_callback_end(callback_id: usize) {
|
||||
unsafe {
|
||||
tp::ros_trace_callback_end(c_void!(callback_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,136 @@
|
|||
use std::{
|
||||
ffi::c_void,
|
||||
future::{self, Future},
|
||||
};
|
||||
|
||||
use futures::{channel::mpsc::Receiver, stream::FusedStream, Stream, StreamExt as _};
|
||||
use r2r_rcl::rcl_service_t;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
use crate::Callback;
|
||||
use crate::TracingId;
|
||||
|
||||
/// A stream wrapper containing tracing data.
|
||||
///
|
||||
/// When the `tracing` feature is enabled, you can use the [`Self::traced_callback`] method
|
||||
/// to trace the execution of the callback. Stream polling is not traced.
|
||||
#[derive(Debug)]
|
||||
pub struct StreamWithTracingData<T> {
|
||||
stream: Receiver<T>,
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing_id: TracingIdWithType,
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum TracingIdWithType {
|
||||
// The type of subscription is `c_void` because the actual type would
|
||||
// be a generic R2R subscriber and not `rcl_subscription_t`.
|
||||
Subscription(TracingId<c_void>),
|
||||
Service(TracingId<rcl_service_t>),
|
||||
}
|
||||
|
||||
impl<T> StreamWithTracingData<T> {
|
||||
/// Converts the stream into a future that calls the provided callback for each message.
|
||||
///
|
||||
/// If `tracing` feature is enabled:
|
||||
/// - Each time the callback is called, its start and end time will be traced by the
|
||||
/// ROS 2 tracing framework.
|
||||
/// - You should not poll the stream before calling this method because
|
||||
/// otherwise it might confuse software that analyzes the trace.
|
||||
pub fn traced_callback<C>(self, callback: C) -> impl Future<Output = ()> + Unpin
|
||||
where
|
||||
C: FnMut(T),
|
||||
{
|
||||
#[cfg(feature = "tracing")]
|
||||
{
|
||||
let mut callback_wrapper = match self.tracing_id {
|
||||
TracingIdWithType::Subscription(id) => Callback::new_subscription(id, callback),
|
||||
TracingIdWithType::Service(id) => Callback::new_service(id, callback),
|
||||
};
|
||||
|
||||
self.stream.for_each(move |msg| {
|
||||
callback_wrapper.call(msg);
|
||||
future::ready(())
|
||||
})
|
||||
}
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
{
|
||||
let mut callback = callback;
|
||||
self.stream.for_each(move |msg| {
|
||||
callback(msg);
|
||||
future::ready(())
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for StreamWithTracingData<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
this.stream.poll_next_unpin(cx)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.stream.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FusedStream for StreamWithTracingData<T> {
|
||||
fn is_terminated(&self) -> bool {
|
||||
self.stream.is_terminated()
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder for `StreamWithTracingData`.
|
||||
///
|
||||
/// This struct exists to allow creation of `StreamWithTracingData` without polluting its public API.
|
||||
/// It is used internally by r2r and should not be reexported.
|
||||
pub struct StreamWithTracingDataBuilder;
|
||||
|
||||
impl StreamWithTracingDataBuilder {
|
||||
#[must_use]
|
||||
#[allow(
|
||||
clippy::not_unsafe_ptr_arg_deref,
|
||||
reason = "The pointer is not dereferenced"
|
||||
)]
|
||||
#[cfg_attr(
|
||||
not(feature = "tracing"),
|
||||
expect(
|
||||
unused_variables,
|
||||
reason = "service_id is not saved if tracing is disabled"
|
||||
)
|
||||
)]
|
||||
pub fn build_service<T>(
|
||||
stream: Receiver<T>, service_id: TracingId<rcl_service_t>,
|
||||
) -> StreamWithTracingData<T> {
|
||||
StreamWithTracingData {
|
||||
stream,
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing_id: TracingIdWithType::Service(service_id),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[cfg_attr(
|
||||
not(feature = "tracing"),
|
||||
expect(
|
||||
unused_variables,
|
||||
reason = "subscription_id is not saved if tracing is disabled"
|
||||
)
|
||||
)]
|
||||
pub fn build_subscription<T>(
|
||||
stream: Receiver<T>, subscription_id: TracingId<c_void>,
|
||||
) -> StreamWithTracingData<T> {
|
||||
StreamWithTracingData {
|
||||
stream,
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing_id: TracingIdWithType::Subscription(subscription_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
#![allow(non_upper_case_globals)]
|
||||
#![allow(non_camel_case_types)]
|
||||
#![allow(non_snake_case)]
|
||||
#![allow(unused)]
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/tracetools_bindings.rs"));
|
||||
|
|
@ -0,0 +1 @@
|
|||
#include <tracetools/tracetools.h>
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
/// Unique identifier for tracing purposes
|
||||
#[derive(Debug)]
|
||||
pub struct TracingId<T> {
|
||||
/// Pointer to the object used as a unique ID.
|
||||
/// Safety: Do NOT dereference the pointer.
|
||||
#[cfg(feature = "tracing")]
|
||||
id: *const T,
|
||||
|
||||
/// Marker for the type. Needed when `tracing` feature is disabled.
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> TracingId<T> {
|
||||
/// Creates new `TracingId` from the pointer.
|
||||
///
|
||||
/// # Safety
|
||||
/// The pointer is used as a unique ID so users must ensure that they never create `TracingId`
|
||||
/// with same address for different objects.
|
||||
///
|
||||
/// The pointer does not need to point to valid memory.
|
||||
pub const unsafe fn new(_id: *const T) -> Self {
|
||||
Self {
|
||||
#[cfg(feature = "tracing")]
|
||||
id: _id,
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Erase the generic type of the ID.
|
||||
#[must_use]
|
||||
pub fn forget_type(self) -> TracingId<std::ffi::c_void> {
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
unsafe {
|
||||
// Safety: The ID cannot be obtained back without the `tracing` feature.
|
||||
TracingId::new(std::ptr::null())
|
||||
}
|
||||
#[cfg(feature = "tracing")]
|
||||
unsafe {
|
||||
// Safety: self contains valid ID.
|
||||
TracingId::new(self.c_void())
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain the address representing the ID.
|
||||
///
|
||||
/// # Safety
|
||||
/// Do NOT dereference the pointer.
|
||||
#[cfg(feature = "tracing")]
|
||||
pub(crate) const unsafe fn c_void(self) -> *const std::ffi::c_void {
|
||||
self.id.cast::<std::ffi::c_void>()
|
||||
}
|
||||
}
|
||||
|
||||
/// Deriving Clone for `TracingId` would only derive it only conditionally based on whether the
|
||||
/// `T` is `Clone` or not. But TracingId is independent of T.
|
||||
impl<T> Clone for TracingId<T> {
|
||||
fn clone(&self) -> Self {
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
/// Deriving Copy for `TracingId` would only derive it only conditionally based on whether the
|
||||
/// `T` is `Copy` or not. But TracingId is independent of T.
|
||||
impl<T> Copy for TracingId<T> {}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// The address is never dereferenced and is used only as a unique ID so it is safe to send to another thread.
|
||||
unsafe impl<T> Send for TracingId<T> {}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// The `TracingId` does not allow interior mutability because the pointer is never dereferenced.
|
||||
/// It is safe to use across multiple threads.
|
||||
unsafe impl<T> Sync for TracingId<T> {}
|
||||
Loading…
Reference in New Issue