From e81f706abe73bed4f7f4636a706b8c25ffe8fac1 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 13 Dec 2023 12:44:17 +0100 Subject: [PATCH] Break out raw subscription test to own file and remove panics. --- r2r/tests/tokio_test_raw.rs | 65 +++++++++++++++++++++++++++++++++ r2r/tests/tokio_testing.rs | 72 ------------------------------------- 2 files changed, 65 insertions(+), 72 deletions(-) create mode 100644 r2r/tests/tokio_test_raw.rs diff --git a/r2r/tests/tokio_test_raw.rs b/r2r/tests/tokio_test_raw.rs new file mode 100644 index 0000000..e619449 --- /dev/null +++ b/r2r/tests/tokio_test_raw.rs @@ -0,0 +1,65 @@ +use futures::stream::StreamExt; +use r2r::QosProfile; +use tokio::task; + +#[tokio::test(flavor = "multi_thread")] +async fn tokio_subscribe_raw_testing() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode2", "")?; + + let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; + + let mut sub_array = + node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; + + let pub_int = + node.create_publisher::("/int", QosProfile::default())?; + + // Use an array as well since its a variable sized type + let pub_array = node.create_publisher::( + "/int_array", + QosProfile::default(), + )?; + + task::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + (0..10).for_each(|i| { + pub_int + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); + + pub_array + .publish(&r2r::std_msgs::msg::Int32MultiArray { + layout: r2r::std_msgs::msg::MultiArrayLayout::default(), + data: vec![i], + }) + .unwrap(); + }); + }); + + let sub_int_handle = task::spawn(async move { + while let Some(msg) = sub_int.next().await { + println!("Got int msg of len {}", msg.len()); + assert_eq!(msg.len(), 8); + } + }); + + let sub_array_handle = task::spawn(async move { + while let Some(msg) = sub_array.next().await { + println!("Got array msg of len {}", msg.len()); + assert_eq!(msg.len(), 20); + } + }); + + let handle = std::thread::spawn(move || { + for _ in 1..=30 { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + sub_int_handle.await?; + sub_array_handle.await?; + handle.join().unwrap(); + + Ok(()) +} diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index 17023f8..0736cb5 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -61,75 +61,3 @@ async fn tokio_testing() -> Result<(), Box> { assert_eq!(x, 19); Ok(()) } - -#[tokio::test(flavor = "multi_thread")] -async fn tokio_subscribe_raw_testing() -> Result<(), Box> { - let ctx = r2r::Context::create()?; - let mut node = r2r::Node::create(ctx, "testnode", "")?; - - let mut sub_int = node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?; - - let mut sub_array = - node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?; - - let pub_int = - node.create_publisher::("/int", QosProfile::default())?; - - // Use an array as well since its a variable sized type - let pub_array = node.create_publisher::( - "/int_array", - QosProfile::default(), - )?; - - task::spawn(async move { - (0..10).for_each(|i| { - pub_int - .publish(&r2r::std_msgs::msg::Int32 { data: i }) - .unwrap(); - - pub_array - .publish(&r2r::std_msgs::msg::Int32MultiArray { - layout: r2r::std_msgs::msg::MultiArrayLayout::default(), - data: vec![i], - }) - .unwrap(); - }); - }); - - let sub_int_handle = task::spawn(async move { - while let Some(msg) = sub_int.next().await { - println!("Got int msg of len {}", msg.len()); - - // assert_eq!(msg.len(), 4); - // TODO is there padding or something? - assert_eq!(msg.len(), 8); - - // assert_eq!(msg,) - } - - panic!("int msg finished"); - }); - - let sub_array_handle = task::spawn(async move { - while let Some(msg) = sub_array.next().await { - println!("Got array msg of len {}", msg.len()); - // assert_eq!(msg.data, ) - } - - panic!("array msg finished"); - }); - - let handle = std::thread::spawn(move || { - for _ in 1..=30 { - node.spin_once(std::time::Duration::from_millis(100)); - } - }); - - handle.join().unwrap(); - - // This means something panicked .. - assert!(!sub_int_handle.is_finished()); - assert!(!sub_array_handle.is_finished()); - - Ok(()) -}