simple tokio examples.

This commit is contained in:
Kristofer Bengtsson 2021-09-06 09:33:14 +02:00
parent f886228cec
commit 1fb92fa5ca
5 changed files with 237 additions and 0 deletions

34
examples/tokio_client.rs Normal file
View File

@ -0,0 +1,34 @@
use r2r;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let duration = std::time::Duration::from_millis(2500);
use r2r::example_interfaces::srv::AddTwoInts;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
let mut timer = node.create_wall_timer(duration)?;
let waiting = node.is_available(&client)?;
let handle = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});
println!("waiting for service...");
waiting.await?;
println!("service available.");
for i in 1..10 {
let req = AddTwoInts::Request { a: i , b: 5 };
if let Ok(resp) = client.request(&req)?.await {
println!("{}", resp.sum);
}
timer.tick().await?;
}
handle.await?;
Ok(())
}

117
examples/tokio_examples.rs Normal file
View File

@ -0,0 +1,117 @@
use std::sync::{Arc, Mutex};
use futures::future;
use futures::stream::StreamExt;
use r2r;
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let node = r2r::Node::create(ctx, "testnode", "")?;
let arc_node = Arc::new(Mutex::new(node));
let an = arc_node.clone();
task::spawn(async move {
subscriber(an).await.unwrap()
});
let an = arc_node.clone();
task::spawn(async move {
publisher(an).await.unwrap()
});
let an = arc_node.clone();
task::spawn(async move {
client(an).await.unwrap()
});
let an = arc_node.clone();
task::spawn(async move {
service(an).await.unwrap()
});
let handle = tokio::task::spawn_blocking(move || loop {
{
arc_node.lock().unwrap().spin_once(std::time::Duration::from_millis(10));
}
std::thread::sleep(std::time::Duration::from_millis(100))
});
handle.await?;
Ok(())
}
async fn subscriber(arc_node: Arc<Mutex<r2r::Node>>) -> Result<(), r2r::Error> {
let sub = arc_node.lock().unwrap().subscribe::<r2r::std_msgs::msg::String>("/topic")?;
sub.for_each(|msg| {
println!("topic: new msg: {}", msg.data);
future::ready(())
}).await;
Ok(())
}
async fn publisher(arc_node: Arc<Mutex<r2r::Node>>) -> Result<(), r2r::Error> {
let (mut timer, publisher) = {
// Limiting the scope when locking the arc
let mut node = arc_node.lock().unwrap();
let timer = node.create_wall_timer(std::time::Duration::from_secs(2))?;
let publisher = node.create_publisher::<r2r::std_msgs::msg::String>("/topic")?;
(timer, publisher)
};
for _ in 1..10 {
timer.tick().await?;
let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() };
publisher.publish(&msg)?;
};
Ok(())
}
async fn client(arc_node: Arc<Mutex<r2r::Node>>) -> Result<(), r2r::Error> {
use r2r::example_interfaces::srv::AddTwoInts;
let (client, mut timer, service_available) = {
// Limiting the scope when locking the arc
let mut node = arc_node.lock().unwrap();
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
let timer = node.create_wall_timer(std::time::Duration::from_secs(2))?;
let service_available = node.is_available(&client)?;
(client, timer, service_available)
};
println!("waiting for service...");
service_available.await?;
println!("service available.");
for i in 1..10 {
let req = AddTwoInts::Request { a: i , b: 5 };
if let Ok(resp) = client.request(&req).unwrap().await {
println!("{}", resp.sum);
}
timer.tick().await?;
};
Ok(())
}
async fn service(arc_node: Arc<Mutex<r2r::Node>>) -> Result<(), r2r::Error> {
use r2r::example_interfaces::srv::AddTwoInts;
let mut service = {
// Limiting the scope when locking the arc
let mut node = arc_node.lock().unwrap();
node.create_service::<AddTwoInts::Service>("/add_two_ints")?
};
loop {
match service.next().await {
Some(req) => {
let resp = AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
req.respond(resp).expect("could not send service response");
}
None => break,
}
}
Ok(())
}

View File

@ -0,0 +1,27 @@
use r2r;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let duration = std::time::Duration::from_millis(2500);
let mut timer = node.create_wall_timer(duration)?;
let publisher = node.create_publisher::<r2r::std_msgs::msg::String>("/topic")?;
let handle = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});
for _ in 1..10 {
timer.tick().await?;
let msg = r2r::std_msgs::msg::String{data: "hello from r2r".to_string() };
publisher.publish(&msg)?;
}
handle.await?;
Ok(())
}

33
examples/tokio_service.rs Normal file
View File

@ -0,0 +1,33 @@
use r2r;
use futures::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
use r2r::example_interfaces::srv::AddTwoInts;
let mut service = node.create_service::<AddTwoInts::Service>("/add_two_ints")?;
let handle = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});
loop {
match service.next().await {
Some(req) => {
let resp = AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
req.respond(resp).expect("could not send service response");
}
None => break,
}
}
handle.await?;
Ok(())
}

View File

@ -0,0 +1,26 @@
use futures::future;
use futures::stream::StreamExt;
use r2r;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let sub = node.subscribe::<r2r::std_msgs::msg::String>("/topic")?;
let handle = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});
sub.for_each(|msg| {
println!("topic: new msg: {}", msg.data);
future::ready(())
}).await;
handle.await?;
Ok(())
}