Add stream of parameter events

This commit is contained in:
Martin Dahl 2021-08-29 19:35:01 +02:00
parent b24af7f038
commit 45cfbe63b1
4 changed files with 87 additions and 45 deletions

View File

@ -1,4 +1,5 @@
use futures::executor::LocalPool; use futures::executor::LocalPool;
use futures::prelude::*;
use futures::task::LocalSpawnExt; use futures::task::LocalSpawnExt;
use r2r; use r2r;
@ -17,14 +18,23 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// set up ros node // set up ros node
let ctx = r2r::Context::create()?; let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "to_be_replaced", "to_be_replaced")?; let mut node = r2r::Node::create(ctx, "to_be_replaced", "to_be_replaced")?;
let mut timer = node.create_wall_timer(std::time::Duration::from_millis(2000))?;
// make a parameter handler (once per node). // make a parameter handler (once per node).
// the parameter handler is optional, only spawn one if you need it. // the parameter handler is optional, only spawn one if you need it.
let paramater_handler = node.make_parameter_handler()?; let (paramater_handler, parameter_events) = node.make_parameter_handler()?;
// run parameter handler on your executor. // run parameter handler on your executor.
spawner.spawn_local(paramater_handler)?; spawner.spawn_local(paramater_handler)?;
// parameter event stream. just print them
spawner.spawn_local(async move {
parameter_events
.for_each(|(param_name, param_val)| {
println!("parameter event: {} is now {:?}", param_name, param_val);
future::ready(())
})
.await
})?;
println!("node name: {}", node.name()?); println!("node name: {}", node.name()?);
println!( println!(
"node fully qualified name: {}", "node fully qualified name: {}",
@ -32,7 +42,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
println!("node namespace: {}", node.namespace()?); println!("node namespace: {}", node.namespace()?);
// print all params every 2 seconds. // print all params every 5 seconds.
let mut timer = node.create_wall_timer(std::time::Duration::from_secs(5))?;
let params = node.params.clone(); let params = node.params.clone();
spawner.spawn_local(async move { spawner.spawn_local(async move {
loop { loop {

View File

@ -50,12 +50,12 @@ use clients::*;
pub use clients::{Client, UntypedClient}; pub use clients::{Client, UntypedClient};
mod action_clients; mod action_clients;
pub use action_clients::{ActionClient, ClientGoal, GoalStatus};
use action_clients::*; use action_clients::*;
pub use action_clients::{ActionClient, ClientGoal, GoalStatus};
mod action_clients_untyped; mod action_clients_untyped;
pub use action_clients_untyped::{ActionClientUntyped, ClientGoalUntyped};
use action_clients_untyped::*; use action_clients_untyped::*;
pub use action_clients_untyped::{ActionClientUntyped, ClientGoalUntyped};
mod action_servers; mod action_servers;
use action_servers::*; use action_servers::*;

View File

@ -160,61 +160,93 @@ impl Node {
} }
} }
/// Returns a future which handles any parameter change requests. Spawn this onto the executor of choice. /// Returns a tuple (parameter_handler_future, parameter_event_stream).
pub fn make_parameter_handler(&mut self) -> Result<impl Future<Output = ()>> { /// The user should spawn the parameter_handler_future onto the executor of choice.
/// The "event stream" includes the name of the parameter which was updated as well as its value.
pub fn make_parameter_handler(
&mut self,
) -> Result<(
impl Future<Output = ()>,
impl Stream<Item = (String, ParameterValue)>,
)> {
let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()>>>> = Vec::new(); let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()>>>> = Vec::new();
let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
let node_name = self.name()?; let node_name = self.name()?;
let set_params_request_stream = self.create_service::<rcl_interfaces::srv::SetParameters::Service>( let set_params_request_stream = self
&format!("{}/set_parameters", node_name))?; .create_service::<rcl_interfaces::srv::SetParameters::Service>(&format!(
"{}/set_parameters",
node_name
))?;
let params = self.params.clone(); let params = self.params.clone();
let set_params_future = set_params_request_stream let set_params_future = set_params_request_stream.for_each(
.for_each(move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| { move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
let mut result = rcl_interfaces::srv::SetParameters::Response::default(); let mut result = rcl_interfaces::srv::SetParameters::Response::default();
for p in &req.message.parameters { for p in &req.message.parameters {
let val = ParameterValue::from_parameter_value_msg(p.value.clone()); let val = ParameterValue::from_parameter_value_msg(p.value.clone());
params.lock().unwrap().insert(p.name.clone(), val); let changed = params
.lock()
.unwrap()
.get(&p.name)
.and_then(|v| Some(v != &val))
.unwrap_or(true); // changed=true if new
params.lock().unwrap().insert(p.name.clone(), val.clone());
let r = rcl_interfaces::msg::SetParametersResult { let r = rcl_interfaces::msg::SetParametersResult {
successful: true, successful: true,
reason: "".into(), reason: "".into(),
}; };
result.results.push(r); result.results.push(r);
// if the value changed, send out new value on parameter event stream
if changed {
match event_tx.try_send((p.name.clone(), val)) {
Err(e) => {
println!("Warning: could not send parameter event ({}).", e);
}
_ => {} // ok
}
}
} }
req.respond(result).expect("could not send reply to set parameter request"); req.respond(result)
.expect("could not send reply to set parameter request");
future::ready(()) future::ready(())
}); },
);
handlers.push(Box::pin(set_params_future)); handlers.push(Box::pin(set_params_future));
// rcl_interfaces/srv/GetParameters // rcl_interfaces/srv/GetParameters
let get_params_request_stream = self.create_service::<rcl_interfaces::srv::GetParameters::Service>( let get_params_request_stream = self
&format!("{}/get_parameters", node_name))?; .create_service::<rcl_interfaces::srv::GetParameters::Service>(&format!(
"{}/get_parameters",
node_name
))?;
let params = self.params.clone(); let params = self.params.clone();
let get_params_future = get_params_request_stream let get_params_future = get_params_request_stream.for_each(
.for_each(move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| { move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
let params = params.lock().unwrap(); let params = params.lock().unwrap();
let values = req.message.names.iter() let values = req
.map(|n| { .message
match params.get(n) { .names
Some(v) => v.clone(), .iter()
None => ParameterValue::NotSet .map(|n| match params.get(n) {
} Some(v) => v.clone(),
None => ParameterValue::NotSet,
}) })
.map(|v| v.clone().to_parameter_value_msg()) .map(|v| v.clone().to_parameter_value_msg())
.collect::<Vec<rcl_interfaces::msg::ParameterValue>>(); .collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
let result = rcl_interfaces::srv::GetParameters::Response { let result = rcl_interfaces::srv::GetParameters::Response { values };
values req.respond(result)
}; .expect("could not send reply to set parameter request");
req.respond(result).expect("could not send reply to set parameter request");
future::ready(()) future::ready(())
}); },
);
handlers.push(Box::pin(get_params_future)); handlers.push(Box::pin(get_params_future));
// we don't care about the result, the futures will not complete anyway. // we don't care about the result, the futures will not complete anyway.
Ok(join_all(handlers).map(|_| ())) Ok((join_all(handlers).map(|_| ()), event_rx))
} }
pub fn subscribe<T: 'static>(&mut self, topic: &str) -> Result<impl Stream<Item = T> + Unpin> pub fn subscribe<T: 'static>(&mut self, topic: &str) -> Result<impl Stream<Item = T> + Unpin>

View File

@ -98,45 +98,44 @@ impl ParameterValue {
match self { match self {
ParameterValue::NotSet => { ParameterValue::NotSet => {
ret.type_ = 0; // uint8 PARAMETER_NOT_SET=0 ret.type_ = 0; // uint8 PARAMETER_NOT_SET=0
}, }
ParameterValue::Bool(b) => { ParameterValue::Bool(b) => {
ret.type_ = 1; // uint8 PARAMETER_BOOL=1 ret.type_ = 1; // uint8 PARAMETER_BOOL=1
ret.bool_value = b; ret.bool_value = b;
}, }
ParameterValue::Integer(i) => { ParameterValue::Integer(i) => {
ret.type_ = 2; // uint8 PARAMETER_INTEGER=2 ret.type_ = 2; // uint8 PARAMETER_INTEGER=2
ret.integer_value = i; ret.integer_value = i;
}, }
ParameterValue::Double(d) => { ParameterValue::Double(d) => {
ret.type_ = 3; // uint8 PARAMETER_DOUBLE=3 ret.type_ = 3; // uint8 PARAMETER_DOUBLE=3
ret.double_value = d; ret.double_value = d;
}, }
ParameterValue::String(s) => { ParameterValue::String(s) => {
ret.type_ = 4; // uint8 PARAMETER_STRING=4 ret.type_ = 4; // uint8 PARAMETER_STRING=4
ret.string_value = s; ret.string_value = s;
}, }
ParameterValue::ByteArray(ba) => { ParameterValue::ByteArray(ba) => {
ret.type_ = 5; // uint8 PARAMETER_BYTE_ARRAY=5 ret.type_ = 5; // uint8 PARAMETER_BYTE_ARRAY=5
ret.byte_array_value = ba; ret.byte_array_value = ba;
}, }
ParameterValue::BoolArray(ba) => { ParameterValue::BoolArray(ba) => {
ret.type_ = 6; // uint8 PARAMETER_BOOL_ARRAY=6 ret.type_ = 6; // uint8 PARAMETER_BOOL_ARRAY=6
ret.bool_array_value = ba; ret.bool_array_value = ba;
}, }
ParameterValue::IntegerArray(ia) => { ParameterValue::IntegerArray(ia) => {
ret.type_ = 7; // uint8 PARAMETER_INTEGER_ARRAY=7 ret.type_ = 7; // uint8 PARAMETER_INTEGER_ARRAY=7
ret.integer_array_value = ia; ret.integer_array_value = ia;
}, }
ParameterValue::DoubleArray(da) => { ParameterValue::DoubleArray(da) => {
ret.type_ = 8; // uint8 PARAMETER_DOUBLE_ARRAY=8 ret.type_ = 8; // uint8 PARAMETER_DOUBLE_ARRAY=8
ret.double_array_value = da; ret.double_array_value = da;
}, }
ParameterValue::StringArray(sa) => { ParameterValue::StringArray(sa) => {
ret.type_ = 9; // int PARAMETER_STRING_ARRAY=9 ret.type_ = 9; // int PARAMETER_STRING_ARRAY=9
ret.string_array_value = sa; ret.string_array_value = sa;
}, }
} }
ret ret
} }
} }