diff --git a/examples/tokio_client.rs b/examples/tokio_client.rs new file mode 100644 index 0000000..2174c02 --- /dev/null +++ b/examples/tokio_client.rs @@ -0,0 +1,34 @@ +use r2r; + + + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let duration = std::time::Duration::from_millis(2500); + + use r2r::example_interfaces::srv::AddTwoInts; + let client = node.create_client::("/add_two_ints")?; + let mut timer = node.create_wall_timer(duration)?; + let waiting = node.is_available(&client)?; + + let handle = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + println!("waiting for service..."); + waiting.await?; + println!("service available."); + for i in 1..10 { + let req = AddTwoInts::Request { a: i , b: 5 }; + if let Ok(resp) = client.request(&req)?.await { + println!("{}", resp.sum); + } + timer.tick().await?; + } + + handle.await?; + + Ok(()) +} diff --git a/examples/tokio_examples.rs b/examples/tokio_examples.rs new file mode 100644 index 0000000..99cfba8 --- /dev/null +++ b/examples/tokio_examples.rs @@ -0,0 +1,117 @@ +use std::sync::{Arc, Mutex}; + +use futures::future; +use futures::stream::StreamExt; +use r2r; +use tokio::task; + + + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let node = r2r::Node::create(ctx, "testnode", "")?; + let arc_node = Arc::new(Mutex::new(node)); + + let an = arc_node.clone(); + task::spawn(async move { + subscriber(an).await.unwrap() + }); + + let an = arc_node.clone(); + task::spawn(async move { + publisher(an).await.unwrap() + }); + + let an = arc_node.clone(); + task::spawn(async move { + client(an).await.unwrap() + }); + + let an = arc_node.clone(); + task::spawn(async move { + service(an).await.unwrap() + }); + + let handle = tokio::task::spawn_blocking(move || loop { + { + arc_node.lock().unwrap().spin_once(std::time::Duration::from_millis(10)); + } + std::thread::sleep(std::time::Duration::from_millis(100)) + }); + + handle.await?; + + Ok(()) +} + + +async fn subscriber(arc_node: Arc>) -> Result<(), r2r::Error> { + let sub = arc_node.lock().unwrap().subscribe::("/topic")?; + sub.for_each(|msg| { + println!("topic: new msg: {}", msg.data); + future::ready(()) + }).await; + Ok(()) +} + +async fn publisher(arc_node: Arc>) -> Result<(), r2r::Error> { + let (mut timer, publisher) = { + // Limiting the scope when locking the arc + let mut node = arc_node.lock().unwrap(); + let timer = node.create_wall_timer(std::time::Duration::from_secs(2))?; + let publisher = node.create_publisher::("/topic")?; + (timer, publisher) + }; + for _ in 1..10 { + timer.tick().await?; + let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() }; + publisher.publish(&msg)?; + }; + Ok(()) +} + +async fn client(arc_node: Arc>) -> Result<(), r2r::Error> { + use r2r::example_interfaces::srv::AddTwoInts; + let (client, mut timer, service_available) = { + // Limiting the scope when locking the arc + let mut node = arc_node.lock().unwrap(); + let client = node.create_client::("/add_two_ints")?; + let timer = node.create_wall_timer(std::time::Duration::from_secs(2))?; + let service_available = node.is_available(&client)?; + (client, timer, service_available) + }; + println!("waiting for service..."); + service_available.await?; + println!("service available."); + for i in 1..10 { + let req = AddTwoInts::Request { a: i , b: 5 }; + if let Ok(resp) = client.request(&req).unwrap().await { + println!("{}", resp.sum); + } + timer.tick().await?; + }; + Ok(()) +} + +async fn service(arc_node: Arc>) -> Result<(), r2r::Error> { + use r2r::example_interfaces::srv::AddTwoInts; + let mut service = { + // Limiting the scope when locking the arc + let mut node = arc_node.lock().unwrap(); + node.create_service::("/add_two_ints")? + }; + loop { + match service.next().await { + Some(req) => { + let resp = AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).expect("could not send service response"); + } + None => break, + } + } + Ok(()) +} + diff --git a/examples/tokio_publisher.rs b/examples/tokio_publisher.rs new file mode 100644 index 0000000..d0fc94f --- /dev/null +++ b/examples/tokio_publisher.rs @@ -0,0 +1,27 @@ +use r2r; + + + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let duration = std::time::Duration::from_millis(2500); + + let mut timer = node.create_wall_timer(duration)?; + let publisher = node.create_publisher::("/topic")?; + + let handle = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + for _ in 1..10 { + timer.tick().await?; + let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() }; + publisher.publish(&msg)?; + } + + + handle.await?; + Ok(()) +} diff --git a/examples/tokio_service.rs b/examples/tokio_service.rs new file mode 100644 index 0000000..a8b9163 --- /dev/null +++ b/examples/tokio_service.rs @@ -0,0 +1,33 @@ +use r2r; +use futures::stream::StreamExt; + + + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + use r2r::example_interfaces::srv::AddTwoInts; + let mut service = node.create_service::("/add_two_ints")?; + + let handle = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + loop { + match service.next().await { + Some(req) => { + let resp = AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).expect("could not send service response"); + } + None => break, + } + } + + handle.await?; + + Ok(()) +} diff --git a/examples/tokio_subscriber.rs b/examples/tokio_subscriber.rs new file mode 100644 index 0000000..3b25da0 --- /dev/null +++ b/examples/tokio_subscriber.rs @@ -0,0 +1,26 @@ +use futures::future; +use futures::stream::StreamExt; +use r2r; + + + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let sub = node.subscribe::("/topic")?; + + let handle = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + sub.for_each(|msg| { + println!("topic: new msg: {}", msg.data); + future::ready(()) + }).await; + + handle.await?; + + Ok(()) +}