diff --git a/examples/parameters.rs b/examples/parameters.rs index 1ce489e..f7bccbc 100644 --- a/examples/parameters.rs +++ b/examples/parameters.rs @@ -1,4 +1,5 @@ use futures::executor::LocalPool; +use futures::prelude::*; use futures::task::LocalSpawnExt; use r2r; @@ -17,14 +18,23 @@ fn main() -> Result<(), Box> { // set up ros node let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "to_be_replaced", "to_be_replaced")?; - let mut timer = node.create_wall_timer(std::time::Duration::from_millis(2000))?; // make a parameter handler (once per node). // the parameter handler is optional, only spawn one if you need it. - let paramater_handler = node.make_parameter_handler()?; + let (paramater_handler, parameter_events) = node.make_parameter_handler()?; // run parameter handler on your executor. spawner.spawn_local(paramater_handler)?; + // parameter event stream. just print them + spawner.spawn_local(async move { + parameter_events + .for_each(|(param_name, param_val)| { + println!("parameter event: {} is now {:?}", param_name, param_val); + future::ready(()) + }) + .await + })?; + println!("node name: {}", node.name()?); println!( "node fully qualified name: {}", @@ -32,7 +42,8 @@ fn main() -> Result<(), Box> { ); println!("node namespace: {}", node.namespace()?); - // print all params every 2 seconds. + // print all params every 5 seconds. + let mut timer = node.create_wall_timer(std::time::Duration::from_secs(5))?; let params = node.params.clone(); spawner.spawn_local(async move { loop { diff --git a/src/lib.rs b/src/lib.rs index 126471a..d78dcc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,12 +50,12 @@ use clients::*; pub use clients::{Client, UntypedClient}; mod action_clients; -pub use action_clients::{ActionClient, ClientGoal, GoalStatus}; use action_clients::*; +pub use action_clients::{ActionClient, ClientGoal, GoalStatus}; mod action_clients_untyped; -pub use action_clients_untyped::{ActionClientUntyped, ClientGoalUntyped}; use action_clients_untyped::*; +pub use action_clients_untyped::{ActionClientUntyped, ClientGoalUntyped}; mod action_servers; use action_servers::*; diff --git a/src/nodes.rs b/src/nodes.rs index 697b8b0..0795f6f 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -160,61 +160,93 @@ impl Node { } } - /// Returns a future which handles any parameter change requests. Spawn this onto the executor of choice. - pub fn make_parameter_handler(&mut self) -> Result> { + /// Returns a tuple (parameter_handler_future, parameter_event_stream). + /// The user should spawn the parameter_handler_future onto the executor of choice. + /// The "event stream" includes the name of the parameter which was updated as well as its value. + pub fn make_parameter_handler( + &mut self, + ) -> Result<( + impl Future, + impl Stream, + )> { let mut handlers: Vec>>> = Vec::new(); + let (mut event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10); let node_name = self.name()?; - let set_params_request_stream = self.create_service::( - &format!("{}/set_parameters", node_name))?; + let set_params_request_stream = self + .create_service::(&format!( + "{}/set_parameters", + node_name + ))?; let params = self.params.clone(); - let set_params_future = set_params_request_stream - .for_each(move |req: ServiceRequest| { + let set_params_future = set_params_request_stream.for_each( + move |req: ServiceRequest| { let mut result = rcl_interfaces::srv::SetParameters::Response::default(); for p in &req.message.parameters { let val = ParameterValue::from_parameter_value_msg(p.value.clone()); - params.lock().unwrap().insert(p.name.clone(), val); + let changed = params + .lock() + .unwrap() + .get(&p.name) + .and_then(|v| Some(v != &val)) + .unwrap_or(true); // changed=true if new + params.lock().unwrap().insert(p.name.clone(), val.clone()); let r = rcl_interfaces::msg::SetParametersResult { successful: true, reason: "".into(), }; result.results.push(r); + // if the value changed, send out new value on parameter event stream + if changed { + match event_tx.try_send((p.name.clone(), val)) { + Err(e) => { + println!("Warning: could not send parameter event ({}).", e); + } + _ => {} // ok + } + } } - req.respond(result).expect("could not send reply to set parameter request"); + req.respond(result) + .expect("could not send reply to set parameter request"); future::ready(()) - }); + }, + ); handlers.push(Box::pin(set_params_future)); // rcl_interfaces/srv/GetParameters - let get_params_request_stream = self.create_service::( - &format!("{}/get_parameters", node_name))?; + let get_params_request_stream = self + .create_service::(&format!( + "{}/get_parameters", + node_name + ))?; let params = self.params.clone(); - let get_params_future = get_params_request_stream - .for_each(move |req: ServiceRequest| { + let get_params_future = get_params_request_stream.for_each( + move |req: ServiceRequest| { let params = params.lock().unwrap(); - let values = req.message.names.iter() - .map(|n| { - match params.get(n) { - Some(v) => v.clone(), - None => ParameterValue::NotSet - } + let values = req + .message + .names + .iter() + .map(|n| match params.get(n) { + Some(v) => v.clone(), + None => ParameterValue::NotSet, }) .map(|v| v.clone().to_parameter_value_msg()) .collect::>(); - let result = rcl_interfaces::srv::GetParameters::Response { - values - }; - req.respond(result).expect("could not send reply to set parameter request"); + let result = rcl_interfaces::srv::GetParameters::Response { values }; + req.respond(result) + .expect("could not send reply to set parameter request"); future::ready(()) - }); + }, + ); handlers.push(Box::pin(get_params_future)); // we don't care about the result, the futures will not complete anyway. - Ok(join_all(handlers).map(|_| ())) + Ok((join_all(handlers).map(|_| ()), event_rx)) } pub fn subscribe(&mut self, topic: &str) -> Result + Unpin> diff --git a/src/parameters.rs b/src/parameters.rs index 5c6e8dc..0cf9b81 100644 --- a/src/parameters.rs +++ b/src/parameters.rs @@ -98,45 +98,44 @@ impl ParameterValue { match self { ParameterValue::NotSet => { ret.type_ = 0; // uint8 PARAMETER_NOT_SET=0 - }, + } ParameterValue::Bool(b) => { ret.type_ = 1; // uint8 PARAMETER_BOOL=1 ret.bool_value = b; - }, + } ParameterValue::Integer(i) => { ret.type_ = 2; // uint8 PARAMETER_INTEGER=2 ret.integer_value = i; - }, + } ParameterValue::Double(d) => { ret.type_ = 3; // uint8 PARAMETER_DOUBLE=3 ret.double_value = d; - }, + } ParameterValue::String(s) => { ret.type_ = 4; // uint8 PARAMETER_STRING=4 ret.string_value = s; - }, + } ParameterValue::ByteArray(ba) => { ret.type_ = 5; // uint8 PARAMETER_BYTE_ARRAY=5 ret.byte_array_value = ba; - }, + } ParameterValue::BoolArray(ba) => { - ret.type_ = 6; // uint8 PARAMETER_BOOL_ARRAY=6 + ret.type_ = 6; // uint8 PARAMETER_BOOL_ARRAY=6 ret.bool_array_value = ba; - }, + } ParameterValue::IntegerArray(ia) => { - ret.type_ = 7; // uint8 PARAMETER_INTEGER_ARRAY=7 + ret.type_ = 7; // uint8 PARAMETER_INTEGER_ARRAY=7 ret.integer_array_value = ia; - }, + } ParameterValue::DoubleArray(da) => { - ret.type_ = 8; // uint8 PARAMETER_DOUBLE_ARRAY=8 + ret.type_ = 8; // uint8 PARAMETER_DOUBLE_ARRAY=8 ret.double_array_value = da; - }, + } ParameterValue::StringArray(sa) => { - ret.type_ = 9; // int PARAMETER_STRING_ARRAY=9 + ret.type_ = 9; // int PARAMETER_STRING_ARRAY=9 ret.string_array_value = sa; - }, + } } ret } - }