diff --git a/build.rs b/build.rs index 9dd324a..d84902b 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,3 @@ -use r2r_common; -use r2r_msg_gen; use std::env; use std::fs::File; use std::io::Write; @@ -8,13 +6,13 @@ use std::path::{Path, PathBuf}; fn main() { r2r_common::print_cargo_watches(); - let msg_list = if let Some(cmake_includes) = env::var("CMAKE_INCLUDE_DIRS").ok() { + let msg_list = if let Ok(cmake_includes) = env::var("CMAKE_INCLUDE_DIRS") { let packages = cmake_includes - .split(":") + .split(':') .flat_map(|i| Path::new(i).parent()) .collect::>(); - let deps = env::var("CMAKE_IDL_PACKAGES").unwrap_or(String::default()); - let deps = deps.split(":").collect::>(); + let deps = env::var("CMAKE_IDL_PACKAGES").unwrap_or_default(); + let deps = deps.split(':').collect::>(); let msgs = r2r_common::get_ros_msgs(&packages); r2r_common::parse_msgs(&msgs) .into_iter() @@ -23,8 +21,8 @@ fn main() { } else { let ament_prefix_var = env::var("AMENT_PREFIX_PATH").expect("Source your ROS!"); let paths = ament_prefix_var - .split(":") - .map(|i| Path::new(i)) + .split(':') + .map(Path::new) .collect::>(); let msgs = r2r_common::get_ros_msgs(&paths); r2r_common::parse_msgs(&msgs) @@ -65,11 +63,15 @@ fn main() { codegen.push_str(" use super::super::super::super::*;\n"); let srvname = format!("{}_{}", msg, srv); - codegen.push_str(&r2r_msg_gen::generate_rust_service(module, prefix, &srvname)); + codegen.push_str(&r2r_msg_gen::generate_rust_service( + module, prefix, &srvname, + )); for s in &["Request", "Response"] { let msgname = format!("{}_{}_{}", msg, srv, s); - codegen.push_str(&r2r_msg_gen::generate_rust_msg(module, prefix, &msgname)); + codegen.push_str(&r2r_msg_gen::generate_rust_msg( + module, prefix, &msgname, + )); } codegen.push_str(" }\n"); } diff --git a/examples/action_client.rs b/examples/action_client.rs index c54e800..b9d06cc 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -2,7 +2,7 @@ use futures::executor::LocalPool; use futures::future::FutureExt; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; -use r2r; + use r2r::example_interfaces::action::Fibonacci; use std::sync::{Arc, Mutex}; diff --git a/examples/action_client_untyped.rs b/examples/action_client_untyped.rs index ccc5fe1..198a342 100644 --- a/examples/action_client_untyped.rs +++ b/examples/action_client_untyped.rs @@ -5,7 +5,7 @@ use futures::executor::LocalPool; use futures::future::FutureExt; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; -use r2r; + use std::sync::{Arc, Mutex}; fn main() -> Result<(), Box> { diff --git a/examples/action_server.rs b/examples/action_server.rs index ea4ba4d..358dc56 100644 --- a/examples/action_server.rs +++ b/examples/action_server.rs @@ -2,7 +2,7 @@ use futures::executor::{LocalPool, LocalSpawner}; use futures::future::{self, Either}; use futures::stream::{Stream, StreamExt}; use futures::task::LocalSpawnExt; -use r2r; + use r2r::example_interfaces::action::Fibonacci; use std::sync::{Arc, Mutex}; diff --git a/examples/client.rs b/examples/client.rs index f4d9019..d3259d2 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,7 +1,7 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; use futures::Future; -use r2r; + use std::io::Write; use r2r::example_interfaces::srv::AddTwoInts; diff --git a/examples/clock.rs b/examples/clock.rs index 3652132..a77435d 100644 --- a/examples/clock.rs +++ b/examples/clock.rs @@ -1,5 +1,3 @@ -use r2r; - fn main() -> Result<(), Box> { { let mut clock = r2r::Clock::create(r2r::ClockType::RosTime)?; diff --git a/examples/logging.rs b/examples/logging.rs index 64f15fe..be73db1 100644 --- a/examples/logging.rs +++ b/examples/logging.rs @@ -1,5 +1,3 @@ -use r2r; - /// try to run like this /// cargo run --example logging -- --ros-args --log-level DEBUG /// The logs produced with the node logger should show up in /rosout diff --git a/examples/parameters.rs b/examples/parameters.rs index f7bccbc..1fa5e20 100644 --- a/examples/parameters.rs +++ b/examples/parameters.rs @@ -1,7 +1,6 @@ use futures::executor::LocalPool; use futures::prelude::*; use futures::task::LocalSpawnExt; -use r2r; // try to run like this // cargo run --example parameters -- --ros-args -p key1:=[hello,world] -p key2:=5.5 -r __ns:=/demo -r __node:=my_node diff --git a/examples/publishers.rs b/examples/publishers.rs index 338600f..0803a47 100644 --- a/examples/publishers.rs +++ b/examples/publishers.rs @@ -1,4 +1,3 @@ -use r2r; use r2r::builtin_interfaces::msg::Duration; use r2r::std_msgs::msg::Int32; use r2r::trajectory_msgs::msg::*; diff --git a/examples/rostopic_echo.rs b/examples/rostopic_echo.rs index 199fa42..ae73d89 100644 --- a/examples/rostopic_echo.rs +++ b/examples/rostopic_echo.rs @@ -2,7 +2,7 @@ use futures::executor::LocalPool; use futures::future; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; -use r2r; + use std::collections::HashMap; use std::env; use std::thread; diff --git a/examples/rostopic_list.rs b/examples/rostopic_list.rs index afa3e01..6b2940e 100644 --- a/examples/rostopic_list.rs +++ b/examples/rostopic_list.rs @@ -1,4 +1,3 @@ -use r2r; use std::thread; use std::time::Duration; diff --git a/examples/service.rs b/examples/service.rs index 444dce5..7f72ad7 100644 --- a/examples/service.rs +++ b/examples/service.rs @@ -4,7 +4,6 @@ use futures::stream::StreamExt; use futures::task::LocalSpawnExt; use futures::FutureExt; -use r2r; use r2r::example_interfaces::srv::AddTwoInts; /// diff --git a/examples/subscriber.rs b/examples/subscriber.rs index fbd9a87..a186996 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -2,7 +2,6 @@ use futures::executor::LocalPool; use futures::future; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; -use r2r; fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; diff --git a/examples/tokio.rs b/examples/tokio.rs index 4024fe8..b9cf5e7 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -1,6 +1,6 @@ use futures::future; use futures::stream::StreamExt; -use r2r; + use std::sync::{Arc, Mutex}; use tokio::task; @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { let mut timer = node .create_wall_timer(std::time::Duration::from_millis(2500)) .unwrap(); - let state_t2 = state.clone(); + let state_t2 = state; task::spawn(async move { loop { let time_passed = timer.tick().await.unwrap(); diff --git a/examples/tokio_client.rs b/examples/tokio_client.rs index 2174c02..f24c48f 100644 --- a/examples/tokio_client.rs +++ b/examples/tokio_client.rs @@ -1,7 +1,3 @@ -use r2r; - - - #[tokio::main] async fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; @@ -21,7 +17,7 @@ async fn main() -> Result<(), Box> { waiting.await?; println!("service available."); for i in 1..10 { - let req = AddTwoInts::Request { a: i , b: 5 }; + let req = AddTwoInts::Request { a: i, b: 5 }; if let Ok(resp) = client.request(&req)?.await { println!("{}", resp.sum); } diff --git a/examples/tokio_examples.rs b/examples/tokio_examples.rs index 99cfba8..9b1b8a8 100644 --- a/examples/tokio_examples.rs +++ b/examples/tokio_examples.rs @@ -2,11 +2,9 @@ use std::sync::{Arc, Mutex}; use futures::future; use futures::stream::StreamExt; -use r2r; + use tokio::task; - - #[tokio::main] async fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; @@ -14,28 +12,23 @@ async fn main() -> Result<(), Box> { let arc_node = Arc::new(Mutex::new(node)); let an = arc_node.clone(); - task::spawn(async move { - subscriber(an).await.unwrap() - }); - - let an = arc_node.clone(); - task::spawn(async move { - publisher(an).await.unwrap() - }); + task::spawn(async move { subscriber(an).await.unwrap() }); let an = arc_node.clone(); - task::spawn(async move { - client(an).await.unwrap() - }); + task::spawn(async move { publisher(an).await.unwrap() }); let an = arc_node.clone(); - task::spawn(async move { - service(an).await.unwrap() - }); + task::spawn(async move { client(an).await.unwrap() }); + + let an = arc_node.clone(); + task::spawn(async move { service(an).await.unwrap() }); let handle = tokio::task::spawn_blocking(move || loop { { - arc_node.lock().unwrap().spin_once(std::time::Duration::from_millis(10)); + arc_node + .lock() + .unwrap() + .spin_once(std::time::Duration::from_millis(10)); } std::thread::sleep(std::time::Duration::from_millis(100)) }); @@ -45,13 +38,16 @@ async fn main() -> Result<(), Box> { Ok(()) } - async fn subscriber(arc_node: Arc>) -> Result<(), r2r::Error> { - let sub = arc_node.lock().unwrap().subscribe::("/topic")?; + let sub = arc_node + .lock() + .unwrap() + .subscribe::("/topic")?; sub.for_each(|msg| { println!("topic: new msg: {}", msg.data); future::ready(()) - }).await; + }) + .await; Ok(()) } @@ -65,9 +61,11 @@ async fn publisher(arc_node: Arc>) -> Result<(), r2r::Error> { }; for _ in 1..10 { timer.tick().await?; - let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() }; + let msg = r2r::std_msgs::msg::String { + data: "hello from r2r".to_string(), + }; publisher.publish(&msg)?; - }; + } Ok(()) } @@ -85,12 +83,12 @@ async fn client(arc_node: Arc>) -> Result<(), r2r::Error> { service_available.await?; println!("service available."); for i in 1..10 { - let req = AddTwoInts::Request { a: i , b: 5 }; + let req = AddTwoInts::Request { a: i, b: 5 }; if let Ok(resp) = client.request(&req).unwrap().await { println!("{}", resp.sum); } timer.tick().await?; - }; + } Ok(()) } @@ -111,7 +109,6 @@ async fn service(arc_node: Arc>) -> Result<(), r2r::Error> { } None => break, } - } + } Ok(()) } - diff --git a/examples/tokio_publisher.rs b/examples/tokio_publisher.rs index d0fc94f..ea4171c 100644 --- a/examples/tokio_publisher.rs +++ b/examples/tokio_publisher.rs @@ -1,27 +1,24 @@ -use r2r; - - - #[tokio::main] async fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; let duration = std::time::Duration::from_millis(2500); - + let mut timer = node.create_wall_timer(duration)?; let publisher = node.create_publisher::("/topic")?; let handle = tokio::task::spawn_blocking(move || loop { node.spin_once(std::time::Duration::from_millis(100)); }); - + for _ in 1..10 { timer.tick().await?; - let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() }; + let msg = r2r::std_msgs::msg::String { + data: "hello from r2r".to_string(), + }; publisher.publish(&msg)?; } - handle.await?; Ok(()) } diff --git a/examples/tokio_service.rs b/examples/tokio_service.rs index a8b9163..6421170 100644 --- a/examples/tokio_service.rs +++ b/examples/tokio_service.rs @@ -1,8 +1,5 @@ -use r2r; use futures::stream::StreamExt; - - #[tokio::main] async fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; @@ -26,7 +23,7 @@ async fn main() -> Result<(), Box> { None => break, } } - + handle.await?; Ok(()) diff --git a/examples/tokio_subscriber.rs b/examples/tokio_subscriber.rs index 3b25da0..6e30e81 100644 --- a/examples/tokio_subscriber.rs +++ b/examples/tokio_subscriber.rs @@ -1,15 +1,13 @@ use futures::future; use futures::stream::StreamExt; -use r2r; - - +use r2r::QosProfile; #[tokio::main] async fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; - - let sub = node.subscribe::("/topic")?; + + let sub = node.subscribe::("/topic", QosProfile::default())?; let handle = tokio::task::spawn_blocking(move || loop { node.spin_once(std::time::Duration::from_millis(100)); @@ -18,9 +16,10 @@ async fn main() -> Result<(), Box> { sub.for_each(|msg| { println!("topic: new msg: {}", msg.data); future::ready(()) - }).await; + }) + .await; handle.await?; - + Ok(()) } diff --git a/examples/untyped_client.rs b/examples/untyped_client.rs index 0c821ad..01ce621 100644 --- a/examples/untyped_client.rs +++ b/examples/untyped_client.rs @@ -1,7 +1,6 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; use futures::Future; -use r2r; async fn requester_task( node_available: impl Future>, diff --git a/examples/wall_timer.rs b/examples/wall_timer.rs index b6a0023..1358f34 100644 --- a/examples/wall_timer.rs +++ b/examples/wall_timer.rs @@ -1,6 +1,6 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; -use r2r; + use std::cell::RefCell; use std::rc::Rc; diff --git a/r2r_rcl/build.rs b/r2r_rcl/build.rs index db115e0..6b3bd88 100644 --- a/r2r_rcl/build.rs +++ b/r2r_rcl/build.rs @@ -1,7 +1,7 @@ extern crate bindgen; -use r2r_common; use itertools::Itertools; +use r2r_common; use std::env; use std::path::{Path, PathBuf}; diff --git a/r2r_rcl/src/lib.rs b/r2r_rcl/src/lib.rs index a653c84..72d9c9d 100644 --- a/r2r_rcl/src/lib.rs +++ b/r2r_rcl/src/lib.rs @@ -14,23 +14,6 @@ impl Default for rmw_message_info_t { } } -impl Default for rmw_qos_profile_t { - fn default() -> Self { - let mut profile: rmw_qos_profile_t = unsafe { std::mem::zeroed() }; - profile.history = rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT; - profile.depth = 10; - profile.reliability = - rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT; - profile.durability = rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT; - profile.avoid_ros_namespace_conventions = false; - profile.deadline = rmw_time_t { sec: 0, nsec: 0 }; - profile.lifespan = rmw_time_t { sec: 0, nsec: 0 }; - profile.liveliness = rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT; - profile.liveliness_lease_duration = rmw_time_t { sec: 0, nsec: 0 }; - profile - } -} - // special treatment to convert to/from rust strings. // ros strings are owned by ros, assignment is a copy impl rosidl_runtime_c__String { diff --git a/src/action_clients.rs b/src/action_clients.rs index 66c43dc..693f8c2 100644 --- a/src/action_clients.rs +++ b/src/action_clients.rs @@ -2,21 +2,17 @@ use futures::channel::{mpsc, oneshot}; use futures::future::{FutureExt, TryFutureExt}; use futures::stream::Stream; use std::collections::HashMap; -use std::future::Future; -use std::sync::{Mutex, Weak}; -use std::mem::MaybeUninit; use std::ffi::CString; +use std::future::Future; +use std::mem::MaybeUninit; +use std::sync::{Mutex, Weak}; -use crate::error::*; use crate::action_common::*; +use crate::error::*; +use crate::msg_types::generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs}; use crate::msg_types::*; -use crate::msg_types::generated_msgs::{ - unique_identifier_msgs, - action_msgs, - builtin_interfaces, -}; -use r2r_rcl::*; use r2r_actions::*; +use r2r_rcl::*; unsafe impl Send for ActionClient where T: WrappedActionTypeSupport {} diff --git a/src/action_clients_untyped.rs b/src/action_clients_untyped.rs index b455d08..9c7da32 100644 --- a/src/action_clients_untyped.rs +++ b/src/action_clients_untyped.rs @@ -3,20 +3,16 @@ use futures::future::{FutureExt, TryFutureExt}; use futures::stream::Stream; use std::collections::HashMap; use std::future::Future; -use std::sync::{Mutex, Weak}; use std::mem::MaybeUninit; +use std::sync::{Mutex, Weak}; -use crate::error::*; -use crate::action_common::*; -use crate::msg_types::*; use crate::action_clients::*; -use crate::msg_types::generated_msgs::{ - unique_identifier_msgs, - action_msgs, - builtin_interfaces, -}; -use r2r_rcl::*; +use crate::action_common::*; +use crate::error::*; +use crate::msg_types::generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs}; +use crate::msg_types::*; use r2r_actions::*; +use r2r_rcl::*; // // TODO: refactor this to separate out shared code between typed action client and this. // diff --git a/src/action_servers.rs b/src/action_servers.rs index 154e330..935f4f6 100644 --- a/src/action_servers.rs +++ b/src/action_servers.rs @@ -8,16 +8,12 @@ use std::ffi::CString; use std::mem::MaybeUninit; use std::sync::{Arc, Mutex, Weak}; -use crate::error::*; use crate::action_common::*; +use crate::error::*; +use crate::msg_types::generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs}; use crate::msg_types::*; -use crate::msg_types::generated_msgs::{ - unique_identifier_msgs, - action_msgs, - builtin_interfaces, -}; -use r2r_rcl::*; use r2r_actions::*; +use r2r_rcl::*; pub trait ActionServer_ { fn handle(&self) -> &rcl_action_server_t; diff --git a/src/clients.rs b/src/clients.rs index f2f27ef..106a315 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -1,12 +1,12 @@ use futures::channel::oneshot; use futures::TryFutureExt; -use std::future::Future; use std::ffi::CString; +use std::future::Future; use std::mem::MaybeUninit; use std::sync::{Mutex, Weak}; -use crate::msg_types::*; use crate::error::*; +use crate::msg_types::*; use r2r_rcl::*; /// ROS service client. diff --git a/src/lib.rs b/src/lib.rs index 8d5ec32..a567f2d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,13 +28,13 @@ //!use futures::future; //!use futures::stream::StreamExt; //!use futures::task::LocalSpawnExt; -//!use r2r; +//!use r2r::QosProfile; //! //!fn main() -> Result<(), Box> { //! let ctx = r2r::Context::create()?; //! let mut node = r2r::Node::create(ctx, "node", "namespace")?; -//! let subscriber = node.subscribe::("/topic")?; -//! let publisher = node.create_publisher::("/topic")?; +//! let subscriber = node.subscribe::("/topic", QosProfile::default())?; +//! let publisher = node.create_publisher::("/topic", QosProfile::default())?; //! let mut timer = node.create_wall_timer(std::time::Duration::from_millis(1000))?; //! //! // Set up a simple task executor. @@ -68,7 +68,6 @@ //!} //! ``` - // otherwise crates using r2r needs to specify the same version of uuid as // this crate depend on, which seem like bad user experience. pub extern crate uuid; @@ -117,3 +116,6 @@ pub use clocks::{Clock, ClockType}; mod nodes; pub use nodes::{Node, Timer}; + +pub mod qos; +pub use qos::QosProfile; diff --git a/src/msg_types.rs b/src/msg_types.rs index 01a1238..75de7d0 100644 --- a/src/msg_types.rs +++ b/src/msg_types.rs @@ -4,9 +4,9 @@ use r2r_rcl::{ rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t, }; use serde::{Deserialize, Serialize}; +use std::convert::TryInto; use std::fmt::Debug; use std::ops::{Deref, DerefMut}; -use std::convert::TryInto; pub mod generated_msgs { use super::*; diff --git a/src/nodes.rs b/src/nodes.rs index a994974..cbd46a7 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -3,29 +3,30 @@ use futures::future::FutureExt; use futures::future::TryFutureExt; use futures::future::{self, join_all}; use futures::stream::{Stream, StreamExt}; -use std::future::Future; use std::collections::HashMap; use std::ffi::{CStr, CString}; +use std::future::Future; use std::mem::MaybeUninit; use std::sync::{Arc, Mutex}; use std::time::Duration; -use r2r_rcl::*; use r2r_actions::*; +use r2r_rcl::*; -use crate::error::*; -use crate::msg_types::*; -use crate::msg_types::generated_msgs::rcl_interfaces; -use crate::subscribers::*; -use crate::publishers::*; -use crate::services::*; -use crate::clients::*; use crate::action_clients::*; use crate::action_clients_untyped::*; use crate::action_servers::*; -use crate::context::*; -use crate::parameters::*; +use crate::clients::*; use crate::clocks::*; +use crate::context::*; +use crate::error::*; +use crate::msg_types::generated_msgs::rcl_interfaces; +use crate::msg_types::*; +use crate::parameters::*; +use crate::publishers::*; +use crate::qos::QosProfile; +use crate::services::*; +use crate::subscribers::*; /// A ROS Node. /// @@ -293,12 +294,16 @@ impl Node { /// Subscribe to a ROS topic. /// /// This function returns a `Stream` of ros messages. - pub fn subscribe(&mut self, topic: &str) -> Result + Unpin> + pub fn subscribe( + &mut self, + topic: &str, + qos_profile: QosProfile, + ) -> Result + Unpin> where T: WrappedTypesupport, { let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts())?; + create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::(10); let ws = TypedSubscriber { @@ -315,12 +320,13 @@ impl Node { pub fn subscribe_native( &mut self, topic: &str, + qos_profile: QosProfile, ) -> Result> + Unpin> where T: WrappedTypesupport, { let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts())?; + create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); let ws = NativeSubscriber { @@ -339,10 +345,11 @@ impl Node { &mut self, topic: &str, topic_type: &str, + qos_profile: QosProfile, ) -> Result> + Unpin> { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let subscription_handle = - create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts)?; + create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; let (sender, receiver) = mpsc::channel::>(10); let ws = UntypedSubscriber { @@ -555,12 +562,16 @@ impl Node { } /// Create a ROS publisher. - pub fn create_publisher(&mut self, topic: &str) -> Result> + pub fn create_publisher( + &mut self, + topic: &str, + qos_profile: QosProfile, + ) -> Result> where T: WrappedTypesupport, { let publisher_handle = - create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts())?; + create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; let arc = Arc::new(publisher_handle); let p = make_publisher(Arc::downgrade(&arc)); self.pubs.push(arc); @@ -572,9 +583,11 @@ impl Node { &mut self, topic: &str, topic_type: &str, + qos_profile: QosProfile, ) -> Result { let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?; - let publisher_handle = create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts)?; + let publisher_handle = + create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?; let arc = Arc::new(publisher_handle); let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned()); self.pubs.push(arc); diff --git a/src/publishers.rs b/src/publishers.rs index 0dc3e4f..2012a16 100644 --- a/src/publishers.rs +++ b/src/publishers.rs @@ -1,10 +1,11 @@ use std::ffi::CString; use std::fmt::Debug; -use std::sync::Weak; use std::marker::PhantomData; +use std::sync::Weak; -use crate::msg_types::*; use crate::error::*; +use crate::msg_types::*; +use crate::qos::QosProfile; use r2r_rcl::*; // The publish function is thread safe. ROS2 docs state: @@ -77,13 +78,14 @@ pub fn create_publisher_helper( node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t, + qos_profile: QosProfile, ) -> Result { let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; let result = unsafe { let mut publisher_options = rcl_publisher_get_default_options(); - publisher_options.qos = rmw_qos_profile_t::default(); + publisher_options.qos = qos_profile.into(); rcl_publisher_init( &mut publisher_handle, node, diff --git a/src/qos.rs b/src/qos.rs new file mode 100644 index 0000000..f9ee69c --- /dev/null +++ b/src/qos.rs @@ -0,0 +1,530 @@ +//! QoS (Quality of Service) +//! full credit goes to https://github.com/rclrust/rclrust/blob/main/rclrust/src/qos.rs + +use std::time::Duration; + +use r2r_rcl::{ + rmw_qos_durability_policy_t, rmw_qos_history_policy_t, rmw_qos_liveliness_policy_t, + rmw_qos_reliability_policy_t, rmw_time_t, +}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum HistoryPolicy { + KeepAll, + KeepLast, + SystemDefault, + Unknown, +} + +impl From for rmw_qos_history_policy_t { + fn from(history_policy: HistoryPolicy) -> Self { + match history_policy { + HistoryPolicy::KeepAll => rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_ALL, + HistoryPolicy::KeepLast => rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_LAST, + HistoryPolicy::SystemDefault => { + rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT + } + HistoryPolicy::Unknown => rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_UNKNOWN, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ReliabilityPolicy { + BestEffort, + Reliable, + SystemDefault, + Unknown, +} +impl From for rmw_qos_reliability_policy_t { + fn from(reliability_policy: ReliabilityPolicy) -> Self { + match reliability_policy { + ReliabilityPolicy::BestEffort => { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT + } + ReliabilityPolicy::Reliable => { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_RELIABLE + } + ReliabilityPolicy::SystemDefault => { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT + } + ReliabilityPolicy::Unknown => { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_UNKNOWN + } + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DurabilityPolicy { + TransientLocal, + Volatile, + SystemDefault, + Unknown, +} + +impl From for rmw_qos_durability_policy_t { + fn from(durability_policy: DurabilityPolicy) -> Self { + match durability_policy { + DurabilityPolicy::TransientLocal => { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL + } + DurabilityPolicy::Volatile => { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_VOLATILE + } + DurabilityPolicy::SystemDefault => { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT + } + DurabilityPolicy::Unknown => { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_UNKNOWN + } + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LivelinessPolicy { + Automatic, + ManualByNode, + ManualByTopic, + SystemDefault, + Unknown, +} + +impl From for rmw_qos_liveliness_policy_t { + fn from(liveliness_policy: LivelinessPolicy) -> Self { + match liveliness_policy { + LivelinessPolicy::Automatic => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_AUTOMATIC + } + LivelinessPolicy::ManualByNode => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE + } + LivelinessPolicy::ManualByTopic => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC + } + LivelinessPolicy::SystemDefault => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT + } + LivelinessPolicy::Unknown => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_UNKNOWN + } + } + } +} + +/// QoS profile +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QosProfile { + /// History QoS policy setting. + pub history: HistoryPolicy, + /// Size of the message queue. + pub depth: usize, + /// Reliabiilty QoS policy setting. + pub reliability: ReliabilityPolicy, + /// Durability QoS policy setting. + pub durability: DurabilityPolicy, + /// The period at which messages are expected to be sent/received. + pub deadline: Duration, + /// The age at which messages are considered expired and no longer valid. + pub lifespan: Duration, + /// Liveliness QoS policy setting. + pub liveliness: LivelinessPolicy, + /// The time within which the RMW node or publisher must show that it is alive. + pub liveliness_lease_duration: Duration, + /// If true, any ROS specific namespacing conventions will be circumvented. + /// + /// In the case of DDS and topics, for example, this means the typical ROS specific prefix of rt + /// would not be applied as described here: + /// + /// + /// + /// This might be useful when trying to directly connect a native DDS topic with a ROS 2 topic. + pub avoid_ros_namespace_conventions: bool, +} + +impl QosProfile { + /// Sensor Data QoS class + /// - History: Keep last, + /// - Depth: 5, + /// - Reliability: Best effort, + /// - Durability: Volatile, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - avoid ros namespace conventions: false + pub const fn sensor_data() -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth: 5, + reliability: ReliabilityPolicy::BestEffort, + durability: DurabilityPolicy::Volatile, + ..Self::common() + } + } + + /// Parameters QoS class + /// - History: Keep last, + /// - Depth: 1000, + /// - Reliability: Reliable, + /// - Durability: Volatile, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn parameters() -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth: 1000, + reliability: ReliabilityPolicy::Reliable, + durability: DurabilityPolicy::Volatile, + ..Self::common() + } + } + + /// Default QoS class + /// - History: Keep last, + /// - Depth: 10, + /// - Reliability: Reliable, + /// - Durability: Volatile, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn default() -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth: 10, + reliability: ReliabilityPolicy::Reliable, + durability: DurabilityPolicy::Volatile, + ..Self::common() + } + } + + /// Services QoS class + /// - History: Keep last, + /// - Depth: 10, + /// - Reliability: Reliable, + /// - Durability: Volatile, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn services_default() -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth: 10, + reliability: ReliabilityPolicy::Reliable, + durability: DurabilityPolicy::Volatile, + ..Self::common() + } + } + + /// Parameter events QoS class + /// - History: Keep last, + /// - Depth: 1000, + /// - Reliability: Reliable, + /// - Durability: Volatile, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn parameter_events() -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth: 1000, + reliability: ReliabilityPolicy::Reliable, + durability: DurabilityPolicy::Volatile, + ..Self::common() + } + } + + /// System defaults QoS class + /// - History: System default, + /// - Depth: System default, + /// - Reliability: System default, + /// - Durability: System default, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: System default, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn system_default() -> Self { + Self::common() + } + + /// Unknow QoS class + /// - History: Unknown, + /// - Depth: System default, + /// - Reliability: Unknown, + /// - Durability: Unknown, + /// - Deadline: Default, + /// - Lifespan: Default, + /// - Liveliness: Unknown, + /// - Liveliness lease duration: Default, + /// - Avoid ros namespace conventions: false + pub const fn unknown() -> Self { + Self { + history: HistoryPolicy::Unknown, + reliability: ReliabilityPolicy::Unknown, + durability: DurabilityPolicy::Unknown, + liveliness: LivelinessPolicy::Unknown, + ..Self::common() + } + } + + const fn common() -> Self { + Self { + history: HistoryPolicy::SystemDefault, + depth: r2r_rcl::RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT as usize, + reliability: ReliabilityPolicy::SystemDefault, + durability: DurabilityPolicy::SystemDefault, + deadline: Duration::ZERO, + lifespan: Duration::ZERO, + liveliness: LivelinessPolicy::SystemDefault, + liveliness_lease_duration: Duration::ZERO, + avoid_ros_namespace_conventions: false, + } + } + + /// Set the history policy. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::{HistoryPolicy, QosProfile}; + /// # + /// let qos = QosProfile::default().history(HistoryPolicy::KeepAll); + /// ``` + pub const fn history(self, history: HistoryPolicy) -> Self { + Self { history, ..self } + } + + /// Set the history to keep last. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().keep_last(10); + /// ``` + pub const fn keep_last(self, depth: usize) -> Self { + Self { + history: HistoryPolicy::KeepLast, + depth, + ..self + } + } + + /// Set the history to keep all. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().keep_all(); + /// ``` + pub const fn keep_all(self) -> Self { + Self { + history: HistoryPolicy::KeepAll, + depth: 0, + ..self + } + } + + /// Set the reliability setting. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::{QosProfile, ReliabilityPolicy}; + /// # + /// let qos = QosProfile::default().reliability(ReliabilityPolicy::Reliable); + /// ``` + pub const fn reliability(self, reliability: ReliabilityPolicy) -> Self { + Self { + reliability, + ..self + } + } + + /// Set the reliability setting to reliable. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().reliable(); + /// ``` + pub const fn reliable(self) -> Self { + self.reliability(ReliabilityPolicy::Reliable) + } + + /// Set the reliability setting to best effort. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().best_effort(); + /// ``` + pub const fn best_effort(self) -> Self { + self.reliability(ReliabilityPolicy::BestEffort) + } + + /// Set the durability setting. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::{DurabilityPolicy, QosProfile}; + /// # + /// let qos = QosProfile::default().durability(DurabilityPolicy::Volatile); + /// ``` + pub const fn durability(self, durability: DurabilityPolicy) -> Self { + Self { durability, ..self } + } + + /// Set the durability setting to volatile. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().volatile(); + /// ``` + pub const fn volatile(self) -> Self { + self.durability(DurabilityPolicy::Volatile) + } + + /// Set the durability setting to transient local. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().transient_local(); + /// ``` + pub const fn transient_local(self) -> Self { + self.durability(DurabilityPolicy::TransientLocal) + } + + /// Set the deadline setting. + /// + /// # Examples + /// + /// ``` + /// # use std::time::Duration; + /// # + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().deadline(Duration::from_secs(5)); + /// ``` + pub const fn deadline(self, deadline: Duration) -> Self { + Self { deadline, ..self } + } + + /// Set the lifespan setting. + /// + /// # Examples + /// + /// ``` + /// # use std::time::Duration; + /// # + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().lifespan(Duration::from_secs(5)); + /// ``` + pub const fn lifespan(self, lifespan: Duration) -> Self { + Self { lifespan, ..self } + } + + /// Set the liveliness setting. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::{LivelinessPolicy, QosProfile}; + /// # + /// let qos = QosProfile::default().liveliness(LivelinessPolicy::Automatic); + /// ``` + pub const fn liveliness(self, liveliness: LivelinessPolicy) -> Self { + Self { liveliness, ..self } + } + + /// Set the liveliness_lease_duration setting. + /// + /// # Examples + /// + /// ``` + /// # use std::time::Duration; + /// # + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().liveliness_lease_duration(Duration::from_secs(5)); + /// ``` + pub const fn liveliness_lease_duration(self, liveliness_lease_duration: Duration) -> Self { + Self { + liveliness_lease_duration, + ..self + } + } + + /// Set the avoid_ros_namespace_conventions setting. + /// + /// # Examples + /// + /// ``` + /// # use r2r::qos::QosProfile; + /// # + /// let qos = QosProfile::default().avoid_ros_namespace_conventions(true); + /// ``` + pub const fn avoid_ros_namespace_conventions( + self, + avoid_ros_namespace_conventions: bool, + ) -> Self { + Self { + avoid_ros_namespace_conventions, + ..self + } + } +} + +impl From for r2r_rcl::rmw_qos_profile_t { + fn from(qos: QosProfile) -> Self { + Self { + history: qos.history.into(), + depth: qos.depth, + reliability: qos.reliability.into(), + durability: qos.durability.into(), + deadline: qos.deadline.to_rmw_time_t(), + lifespan: qos.lifespan.to_rmw_time_t(), + liveliness: qos.liveliness.into(), + liveliness_lease_duration: qos.liveliness_lease_duration.to_rmw_time_t(), + avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions, + } + } +} +pub(crate) trait RclDurationT { + fn to_rmw_time_t(&self) -> rmw_time_t; +} + +impl RclDurationT for Duration { + fn to_rmw_time_t(&self) -> rmw_time_t { + rmw_time_t { + sec: self.as_secs(), + nsec: self.subsec_nanos().into(), + } + } +} diff --git a/src/services.rs b/src/services.rs index f232d6d..fcbe75b 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,10 +1,10 @@ use futures::channel::{mpsc, oneshot}; use std::ffi::CString; -use std::sync::{Arc, Mutex, Weak}; use std::mem::MaybeUninit; +use std::sync::{Arc, Mutex, Weak}; -use crate::msg_types::*; use crate::error::*; +use crate::msg_types::*; use r2r_rcl::*; /// Encapsulates a service request. diff --git a/src/subscribers.rs b/src/subscribers.rs index 0da8d6a..c9e16fd 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -1,8 +1,9 @@ use futures::channel::mpsc; use std::ffi::CString; -use crate::msg_types::*; use crate::error::*; +use crate::msg_types::*; +use crate::qos::QosProfile; use r2r_rcl::*; pub trait Subscriber_ { @@ -161,13 +162,14 @@ pub fn create_subscription_helper( node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, + qos_profile: QosProfile, ) -> Result { let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() }; let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; let result = unsafe { let mut subscription_options = rcl_subscription_get_default_options(); - subscription_options.qos = rmw_qos_profile_t::default(); + subscription_options.qos = qos_profile.into(); rcl_subscription_init( &mut subscription_handle, node, diff --git a/tests/threads.rs b/tests/threads.rs index ad47e6b..0bf21fa 100644 --- a/tests/threads.rs +++ b/tests/threads.rs @@ -1,8 +1,6 @@ use std::thread; use std::time::Duration; -use r2r; - #[test] // Let's create and drop a lot of node and publishers for a while to see that we can cope. fn doesnt_crash() -> Result<(), Box> { diff --git a/tests/tokio_testing.rs b/tests/tokio_testing.rs index 0678181..66d349d 100644 --- a/tests/tokio_testing.rs +++ b/tests/tokio_testing.rs @@ -1,5 +1,5 @@ use futures::stream::StreamExt; -use r2r; + use std::sync::{Arc, Mutex}; use tokio::task; @@ -60,7 +60,7 @@ async fn tokio_testing() -> Result<(), Box> { } } - state.lock().unwrap().clone() + *state.lock().unwrap() }); let x = handle.join().unwrap(); assert_eq!(x, 19);