From a007f72b6c898ef50aa56029d79cc26708033662 Mon Sep 17 00:00:00 2001 From: Kristofer Bengtsson Date: Tue, 6 Jul 2021 15:33:35 +0200 Subject: [PATCH] Added testing of tokio when running cargo test --- tests/tokio_testing.rs | 62 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/tokio_testing.rs diff --git a/tests/tokio_testing.rs b/tests/tokio_testing.rs new file mode 100644 index 0000000..3260d93 --- /dev/null +++ b/tests/tokio_testing.rs @@ -0,0 +1,62 @@ +use futures::stream::StreamExt; +use tokio::task; +use std::sync::{Arc, Mutex}; +use r2r; + +#[tokio::test(flavor = "multi_thread")] +async fn tokio_testing() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut s_the_no = node.subscribe::("/the_no")?; + let mut s_new_no = node.subscribe::("/new_no")?; + let p_the_no = node.create_publisher::("/the_no")?; + let p_new_no = node.create_publisher::("/new_no")?; + let state = Arc::new(Mutex::new(0)); + + task::spawn(async move { + (0..10).for_each(|i| { + p_the_no.publish(&r2r::std_msgs::msg::Int32{data: i}).unwrap(); + }); + }); + + task::spawn(async move { + loop { + match s_the_no.next().await { + Some(msg) => { + p_new_no.publish(&r2r::std_msgs::msg::Int32{data: msg.data + 10}).unwrap(); + }, + None => break, + } + } + }); + + let s = state.clone(); + task::spawn(async move { + loop { + match s_new_no.next().await { + Some(msg) => { + let i = msg.data; + if i==19 { + *s.lock().unwrap() = 19; + } + }, + None => break, + } + } + }); + + let handle = std::thread::spawn(move || { + for _ in 1..=30 { + node.spin_once(std::time::Duration::from_millis(100)); + let x = state.lock().unwrap(); + if *x == 19 { + break; + } + }; + + state.lock().unwrap().clone() + }); + let x = handle.join().unwrap(); + assert_eq!(x, 19); + Ok(()) +} \ No newline at end of file