This commit is contained in:
Martin Dahl 2021-07-25 11:08:22 +02:00
parent 666c7e24e6
commit 59337d7d19
15 changed files with 277 additions and 161 deletions

View File

@ -60,18 +60,24 @@ fn main() {
.derive_copy(true)
// whitelist a few specific things that we need.
.whitelist_recursively(false)
.whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t")
.whitelist_type("rcl_action_server_t").opaque_type("rcl_action_server_t")
.whitelist_type("rcl_action_client_t")
.opaque_type("rcl_action_client_t")
.whitelist_type("rcl_action_server_t")
.opaque_type("rcl_action_server_t")
.whitelist_type("rcl_action_goal_info_t")
.whitelist_type("rcl_action_goal_handle_t").opaque_type("rcl_action_goal_handle_t")
.whitelist_type("rcl_action_goal_handle_t")
.opaque_type("rcl_action_goal_handle_t")
.whitelist_type("rcl_action_cancel_request_t")
.whitelist_type("rcl_action_cancel_response_t")
.whitelist_type("rcl_action_goal_event_t")
.whitelist_type("rcl_action_goal_state_t").opaque_type("rcl_action_goal_state_t")
.whitelist_type("rcl_action_goal_status_array_t").opaque_type("rcl_action_goal_status_array_t")
.whitelist_type("rcl_action_goal_state_t")
.opaque_type("rcl_action_goal_state_t")
.whitelist_type("rcl_action_goal_status_array_t")
.opaque_type("rcl_action_goal_status_array_t")
.whitelist_function("rcl_action_.*")
.whitelist_type("rcl_action_client_options_t")
.whitelist_type("rcl_action_server_options_t")
.whitelist_var("RCL_RET_ACTION_.*")
.generate()
.expect("Unable to generate bindings");

View File

@ -5,5 +5,5 @@
#![allow(dead_code)]
include!(concat!(env!("OUT_DIR"), "/action_bindings.rs"));
use rcl::*;
use msg_gen::*;
use rcl::*;

View File

@ -76,7 +76,11 @@ fn main() {
// also "internal" feedback message type that wraps the feedback type with a uuid
let feedback_msgname = format!("{}_FeedbackMessage", msg);
codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &feedback_msgname));
codegen.push_str(&msg_gen::generate_rust_msg(
module,
prefix,
&feedback_msgname,
));
codegen.push_str(" }\n");
}

View File

@ -2,14 +2,16 @@ use futures::executor::LocalPool;
use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use std::sync::{Arc,Mutex};
use r2r;
use r2r::example_interfaces::action::Fibonacci;
use std::sync::{Arc, Mutex};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = Arc::new(Mutex::new(node.create_action_client::<Fibonacci::Action>("/fibonacci")?));
let client = Arc::new(Mutex::new(
node.create_action_client::<Fibonacci::Action>("/fibonacci")?,
));
// signal that we are done
let done = Arc::new(Mutex::new(false));
@ -41,18 +43,27 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let nested_task_done = nested_task_done.clone();
let nested_goal = nested_goal.clone();
async move {
println!("new feedback msg {:?} -- {:?}", msg, nested_goal.get_status());
println!(
"new feedback msg {:?} -- {:?}",
msg,
nested_goal.get_status()
);
// cancel the goal before it finishes. (comment out to complete the goal)
if msg.sequence.len() == 8 {
nested_goal.cancel().unwrap().
map(|r| {
nested_goal
.cancel()
.unwrap()
.map(|r| {
println!("goal cancelled: {:?}", r);
// we are done.
*nested_task_done.lock().unwrap() = true;
}).await;
})
.await;
}
}})).unwrap();
}
}))
.unwrap();
// await result in this task
let result = goal.get_result().unwrap().await;
@ -60,7 +71,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(msg) => {
println!("got result {:?}", msg);
*task_done.lock().unwrap() = true;
},
}
Err(e) => println!("action failed: {:?}", e),
}
})?;

View File

@ -1,13 +1,16 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use std::sync::{Arc,Mutex};
use r2r;
use r2r::ServerGoal;
use r2r::example_interfaces::action::Fibonacci;
use r2r::ServerGoal;
use std::sync::{Arc, Mutex};
// note: cannot be blocking.
fn accept_goal_cb(uuid: &uuid::Uuid, goal: &Fibonacci::Goal) -> bool {
println!("Got goal request with order {}, goal id: {}", goal.order, uuid);
println!(
"Got goal request with order {}, goal id: {}",
goal.order, uuid
);
// reject high orders
goal.order < 100
}
@ -40,8 +43,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// also we cannot block which is why we spawn the task
let node_cb = node_cb.clone();
let done_cb = done_cb.clone();
task_spawner.spawn_local(async move {
let mut timer = node_cb.lock().unwrap().create_wall_timer(std::time::Duration::from_millis(1000))
task_spawner
.spawn_local(async move {
let mut timer = node_cb
.lock()
.unwrap()
.create_wall_timer(std::time::Duration::from_millis(1000))
.expect("could not create timer");
let mut feedback_msg = Fibonacci::Feedback {
sequence: vec![0, 1],
@ -59,27 +66,37 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
*done_cb.lock().unwrap() = true;
return;
}
feedback_msg.sequence.push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]);
feedback_msg
.sequence
.push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]);
g.publish_feedback(feedback_msg.clone()).expect("fail");
println!("Sending feedback: {:?}", feedback_msg);
timer.tick().await.unwrap();
}
let result_msg = Fibonacci::Result {
sequence: feedback_msg.sequence
sequence: feedback_msg.sequence,
};
g.succeed(result_msg).expect("could not set result");
// signal stopping of the node
*done_cb.lock().unwrap() = true;
}).expect("could not spawn task");
})
.expect("could not spawn task");
};
let _server = node.lock().unwrap().create_action_server::<Fibonacci::Action>("/fibonacci",
let _server = node
.lock()
.unwrap()
.create_action_server::<Fibonacci::Action>(
"/fibonacci",
Box::new(accept_goal_cb),
Box::new(accept_cancel_cb),
Box::new(handle_goal_cb))?;
Box::new(handle_goal_cb),
)?;
loop {
node.lock().unwrap().spin_once(std::time::Duration::from_millis(100));
node.lock()
.unwrap()
.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
if *done.lock().unwrap() {
break;

View File

@ -5,7 +5,9 @@ use r2r;
use r2r::example_interfaces::srv::AddTwoInts;
async fn requester_task(c: r2r::Client<AddTwoInts::Service>) -> Result<(), Box<dyn std::error::Error>> {
async fn requester_task(
c: r2r::Client<AddTwoInts::Service>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut x: i64 = 0;
loop {
let req = AddTwoInts::Request { a: 10 * x, b: x };
@ -40,7 +42,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match requester_task(client).await {
Ok(()) => println!("exiting"),
Err(e) => println!("error: {}", e),
}})?;
}
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));

View File

@ -16,7 +16,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
positions.push(count as f64);
let to_send = JointTrajectoryPoint {
positions: positions.clone(),
time_from_start: Duration { sec: count, nanosec: 0 },
time_from_start: Duration {
sec: count,
nanosec: 0,
},
..Default::default()
};
let mut native = r2r::WrappedNativeMsg::<Int32>::new();

View File

@ -1,7 +1,7 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::stream::StreamExt;
use futures::future;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use r2r;
use std::collections::HashMap;
use std::env;
@ -45,7 +45,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let echo_pub = node.create_publisher_untyped(echo, type_name)?;
let sub = node.subscribe_untyped(topic, type_name)?;
spawner.spawn_local(async move { sub.for_each(|msg| {
spawner.spawn_local(async move {
sub.for_each(|msg| {
match msg {
Ok(msg) => {
let s = serde_json::to_string_pretty(&msg).unwrap();
@ -57,7 +58,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
future::ready(())
}).await})?;
})
.await
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));

View File

@ -1,7 +1,7 @@
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::stream::StreamExt;
use futures::future;
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;
use r2r;
fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -20,9 +20,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match sub.next().await {
Some(msg) => {
if x % 2 == 0 {
p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap();
p.publish(&r2r::std_msgs::msg::String {
data: format!("({}): new msg: {}", x, msg.data),
})
.unwrap();
}
}
},
None => break,
}
x += 1;
@ -31,10 +34,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// for sub2 we just print the data
let sub2 = node.subscribe::<r2r::std_msgs::msg::String>("/topic2")?;
spawner.spawn_local(async move { sub2.for_each(|msg| {
spawner.spawn_local(async move {
sub2.for_each(|msg| {
println!("topic2: new msg: {}", msg.data);
future::ready(())
}).await})?;
})
.await
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));

View File

@ -1,12 +1,12 @@
use futures::stream::StreamExt;
use futures::future;
use tokio::task;
use std::sync::{Arc, Mutex};
use futures::stream::StreamExt;
use r2r;
use std::sync::{Arc, Mutex};
use tokio::task;
#[derive(Debug, Default)]
struct SharedState {
pub state: i32
pub state: i32,
}
#[tokio::main]
@ -25,12 +25,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match sub.next().await {
Some(msg) => {
if x % 2 == 0 {
p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap();
p.publish(&r2r::std_msgs::msg::String {
data: format!("({}): new msg: {}", x, msg.data),
})
.unwrap();
} else {
// update shared state
state_t1.lock().unwrap().state = x;
}
},
}
None => break,
}
x += 1;
@ -39,26 +42,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// for sub2 we just print the data
let sub2 = node.subscribe::<r2r::std_msgs::msg::String>("/topic2")?;
task::spawn(async move { sub2.for_each(|msg| {
task::spawn(async move {
sub2.for_each(|msg| {
println!("topic2: new msg: {}", msg.data);
future::ready(())
}).await});
})
.await
});
let mut timer = node.create_wall_timer(std::time::Duration::from_millis(2500)).unwrap();
let mut timer = node
.create_wall_timer(std::time::Duration::from_millis(2500))
.unwrap();
let state_t2 = state.clone();
task::spawn(async move {
loop {
let time_passed = timer.tick().await.unwrap();
let x = state_t2.lock().unwrap().state;
println!("timer event. time passed: {}. shared state is {}", time_passed.as_micros(), x);
println!(
"timer event. time passed: {}. shared state is {}",
time_passed.as_micros(),
x
);
}
});
// here we spin the node in its own thread (but we could just busy wait in this thread)
let handle = std::thread::spawn(move || {
loop {
let handle = std::thread::spawn(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
}
});
handle.join().unwrap();

View File

@ -33,7 +33,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match timer_task(timer).await {
Ok(()) => println!("exiting"),
Err(e) => println!("error: {}", e),
}})?;
}
})?;
loop {
node.spin_once(std::time::Duration::from_millis(100));

View File

@ -52,8 +52,7 @@ fn field_type(t: u8) -> String {
} else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_LONG_DOUBLE as u8) {
// f128 does not exist in rust
"u128".to_owned()
}
else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) {
} else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) {
"message".to_owned()
} else {
panic!("ros native type not implemented: {}", t);
@ -242,14 +241,14 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String {
// hack here to rustify nested action type names
if prefix == "action" {
if let Some((n1, n2)) = name.rsplit_once("_") {
format!("{module}::{prefix}::{srvname}::{msgname}",
format!(
"{module}::{prefix}::{srvname}::{msgname}",
module = module,
prefix = prefix,
srvname = n1,
msgname = n2
)
}
else {
} else {
format!(
"{module}::{prefix}::{msgname}",
module = module,

View File

@ -82,9 +82,25 @@ pub enum Error {
#[error("Serde error: {}", err)]
SerdeError { err: String },
// action errors. perhaps they exist in the rcl?
#[error("Goal rejected by server.")]
GoalRejected,
// action errors.
#[error("RCL_RET_ACTION_NAME_INVALID")]
RCL_RET_ACTION_NAME_INVALID,
#[error("RCL_RET_ACTION_GOAL_ACCEPTED")]
RCL_RET_ACTION_GOAL_ACCEPTED,
#[error("RCL_RET_ACTION_GOAL_REJECTED")]
RCL_RET_ACTION_GOAL_REJECTED,
#[error("RCL_RET_ACTION_CLIENT_INVALID")]
RCL_RET_ACTION_CLIENT_INVALID,
#[error("RCL_RET_ACTION_CLIENT_TAKE_FAILED")]
RCL_RET_ACTION_CLIENT_TAKE_FAILED,
#[error("RCL_RET_ACTION_SERVER_INVALID")]
RCL_RET_ACTION_SERVER_INVALID,
#[error("RCL_RET_ACTION_SERVER_TAKE_FAILED")]
RCL_RET_ACTION_SERVER_TAKE_FAILED,
#[error("RCL_RET_ACTION_GOAL_HANDLE_INVALID")]
RCL_RET_ACTION_GOAL_HANDLE_INVALID,
#[error("RCL_RET_ACTION_GOAL_EVENT_INVALID")]
RCL_RET_ACTION_GOAL_EVENT_INVALID,
#[error("Goal cancel request rejected by server.")]
GoalCancelRejected,

View File

@ -1059,7 +1059,6 @@ where
let gi = action_msgs::msg::GoalInfo::from_native(&goal_info);
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone()));
println!("goal expired: {} - {}", uuid, num_expired);
// todo: delete info about goal.
self.goals.remove(&uuid);
self.result_msgs.remove(&uuid);
self.result_requests.remove(&uuid);
@ -1071,13 +1070,23 @@ where
let mut status = rcl_action_get_zero_initialized_goal_status_array();
let ret = rcl_action_get_goal_status_array(&self.rcl_handle, &mut status);
if ret != RCL_RET_OK as i32 {
println!("action server: failed to get goal status array: {}", ret);
println!(
"action server: failed to get goal status array: {}",
Error::from_rcl_error(ret)
);
return;
}
// todo: error handling...
rcl_action_publish_status(
let ret = rcl_action_publish_status(
&self.rcl_handle,
&status as *const _ as *const std::os::raw::c_void,
);
if ret != RCL_RET_OK as i32 {
println!(
"action server: failed to publish status: {}",
Error::from_rcl_error(ret)
);
return;
}
rcl_action_goal_status_array_fini(&mut status);
}
}
@ -1091,7 +1100,10 @@ where
rcl_action_send_result_response(&self.rcl_handle, &mut req, msg.void_ptr_mut())
};
if ret != RCL_RET_OK as i32 {
println!("action server: could send result request response. {}", ret);
println!(
"action server: could send result request response. {}",
Error::from_rcl_error(ret)
);
}
}
}
@ -1146,13 +1158,15 @@ where
let mut request_id = unsafe { request_id.assume_init() };
if let Some(response_msg) = response_msg {
println!("sending result msg");
let ret = unsafe {
rcl_action_send_result_response(&self.rcl_handle, &mut request_id, response_msg)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could send result request response. {}", ret);
println!(
"action server: could send result request response. {}",
Error::from_rcl_error(ret)
);
return;
}
} else {
@ -1285,37 +1299,39 @@ where
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
};
let feedback_msg = T::make_feedback_msg(uuid_msg, msg);
let mut native_msg = WrappedNativeMsg::<T::FeedbackMessage>::from(&feedback_msg);
let result = unsafe {
let ret = unsafe {
rcl_action_publish_feedback(
action_server.lock().unwrap().handle(),
native_msg.void_ptr_mut(),
)
};
if result == RCL_RET_OK as i32 {
if ret == RCL_RET_OK as i32 {
Ok(())
} else {
eprintln!("coult not publish {}", result);
eprintln!("coult not publish {}", Error::from_rcl_error(ret));
Ok(()) // todo: error codes
}
}
fn set_cancel(&mut self) {
// todo: error handling
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could not cancel goal: {}", ret);
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
}
@ -1324,7 +1340,7 @@ where
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
@ -1342,7 +1358,7 @@ where
if !goal_exists {
println!("tried to publish result without a goal");
return Ok(());
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
@ -1371,14 +1387,17 @@ where
};
if ret != RCL_RET_OK as i32 {
println!("action server: could not cancel goal: {}", ret);
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
// upgrade to actual ref. if still alive
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
@ -1395,8 +1414,8 @@ where
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to publish result without a goal");
return Ok(());
println!("tried to abort without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
@ -1425,7 +1444,7 @@ where
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
@ -1443,7 +1462,7 @@ where
if !goal_exists {
println!("tried to publish result without a goal");
return Ok(());
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
@ -1488,7 +1507,7 @@ where
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let action_server = action_server.lock().unwrap();
Ok(unsafe { rcl_action_server_is_valid(&action_server.rcl_handle) })
@ -2818,7 +2837,10 @@ where
T: WrappedActionTypeSupport,
{
pub fn get_status(&self) -> Result<GoalStatus> {
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let client = client.lock().unwrap();
Ok(client.get_goal_status(&self.uuid))
@ -2827,16 +2849,19 @@ where
pub fn get_result(&mut self) -> Result<impl Future<Output = Result<T::Result>>> {
if let Some(result_channel) = self.result.lock().unwrap().take() {
// upgrade to actual ref. if still alive
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_result_request(self.uuid);
Ok(result_channel.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID))
} else {
// todo: error codes...
println!("already asked for the result!");
Err(Error::RCL_RET_CLIENT_INVALID)
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
@ -2846,13 +2871,16 @@ where
} else {
// todo: error codes...
println!("someone else owns the feedback consumer stream");
Err(Error::RCL_RET_CLIENT_INVALID)
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
// upgrade to actual ref. if still alive
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_cancel_request(&self.uuid)
@ -2871,7 +2899,10 @@ where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
let uuid = uuid::Uuid::new_v4();
@ -2903,7 +2934,7 @@ where
// instead of "canceled" we return invalid client.
let fut_client = Weak::clone(&self.client);
let future = goal_req_receiver
.map_err(|_| Error::RCL_RET_CLIENT_INVALID)
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
.map(move |r| match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
@ -2916,7 +2947,7 @@ where
})
} else {
println!("goal rejected");
Err(Error::GoalRejected)
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
}
}
Err(e) => Err(e),

View File

@ -1,7 +1,7 @@
use futures::stream::StreamExt;
use tokio::task;
use std::sync::{Arc, Mutex};
use r2r;
use std::sync::{Arc, Mutex};
use tokio::task;
#[tokio::test(flavor = "multi_thread")]
async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
@ -15,7 +15,9 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
task::spawn(async move {
(0..10).for_each(|i| {
p_the_no.publish(&r2r::std_msgs::msg::Int32{data: i}).unwrap();
p_the_no
.publish(&r2r::std_msgs::msg::Int32 { data: i })
.unwrap();
});
});
@ -23,8 +25,12 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
loop {
match s_the_no.next().await {
Some(msg) => {
p_new_no.publish(&r2r::std_msgs::msg::Int32{data: msg.data + 10}).unwrap();
},
p_new_no
.publish(&r2r::std_msgs::msg::Int32 {
data: msg.data + 10,
})
.unwrap();
}
None => break,
}
}
@ -39,7 +45,7 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
if i == 19 {
*s.lock().unwrap() = 19;
}
},
}
None => break,
}
}
@ -52,7 +58,7 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
if *x == 19 {
break;
}
};
}
state.lock().unwrap().clone()
});