From f2b6c64440b61fd780e00e3286516b1a3ed81d0e Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 16 Jun 2021 09:34:11 +0200 Subject: [PATCH] example using tokio as executor --- examples/tokio.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 examples/tokio.rs diff --git a/examples/tokio.rs b/examples/tokio.rs new file mode 100644 index 0000000..4d993ad --- /dev/null +++ b/examples/tokio.rs @@ -0,0 +1,66 @@ +use futures::stream::StreamExt; +use futures::future; +use tokio::task; +use std::sync::{Arc, Mutex}; +use r2r; + +#[derive(Debug, Default)] +struct SharedState { + pub state: i32 +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut sub = node.subscribe::("/topic")?; + let p = node.create_publisher::("/topic2")?; + let state = Arc::new(Mutex::new(SharedState::default())); + + // task that every other time forwards message to topic2 + let state_t1 = state.clone(); + task::spawn(async move { + let mut x: i32 = 0; + loop { + match sub.next().await { + Some(msg) => { + if x % 2 == 0 { + p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap(); + } else { + // update shared state + state_t1.lock().unwrap().state = x; + } + }, + None => break, + } + x+=1; + } + }); + + // for sub2 we just print the data + let sub2 = node.subscribe::("/topic2")?; + task::spawn(async move { sub2.for_each(|msg| { + println!("topic2: new msg: {}", msg.data); + future::ready(()) + }).await}); + + let mut timer = node.create_wall_timer(std::time::Duration::from_millis(2500)).unwrap(); + let state_t2 = state.clone(); + task::spawn(async move { + loop { + let time_passed = timer.tick().await.unwrap(); + let x = state_t2.lock().unwrap().state; + println!("timer event. time passed: {}. shared state is {}", time_passed.as_micros(), x); + } + }); + + // here we spin the node in its own thread (but we could just busy wait in this thread) + let handle = std::thread::spawn(move || { + loop { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + handle.join().unwrap(); + + Ok(()) +}