From 59337d7d197c8fe54fd5eda0fd18710821c7fe7e Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Sun, 25 Jul 2021 11:08:22 +0200 Subject: [PATCH] tidying --- actions/build.rs | 16 ++++--- actions/src/lib.rs | 2 +- build.rs | 6 ++- examples/action_client.rs | 27 ++++++++---- examples/action_server.rs | 93 +++++++++++++++++++++++---------------- examples/client.rs | 9 ++-- examples/publishers.rs | 5 ++- examples/rostopic_echo.rs | 31 +++++++------ examples/subscriber.rs | 24 ++++++---- examples/tokio.rs | 44 +++++++++++------- examples/wall_timer.rs | 3 +- msg_gen/src/lib.rs | 37 ++++++++-------- src/error.rs | 22 +++++++-- src/lib.rs | 93 ++++++++++++++++++++++++++------------- tests/tokio_testing.rs | 26 ++++++----- 15 files changed, 277 insertions(+), 161 deletions(-) diff --git a/actions/build.rs b/actions/build.rs index dd45097..ae2cf24 100644 --- a/actions/build.rs +++ b/actions/build.rs @@ -60,18 +60,24 @@ fn main() { .derive_copy(true) // whitelist a few specific things that we need. .whitelist_recursively(false) - .whitelist_type("rcl_action_client_t").opaque_type("rcl_action_client_t") - .whitelist_type("rcl_action_server_t").opaque_type("rcl_action_server_t") + .whitelist_type("rcl_action_client_t") + .opaque_type("rcl_action_client_t") + .whitelist_type("rcl_action_server_t") + .opaque_type("rcl_action_server_t") .whitelist_type("rcl_action_goal_info_t") - .whitelist_type("rcl_action_goal_handle_t").opaque_type("rcl_action_goal_handle_t") + .whitelist_type("rcl_action_goal_handle_t") + .opaque_type("rcl_action_goal_handle_t") .whitelist_type("rcl_action_cancel_request_t") .whitelist_type("rcl_action_cancel_response_t") .whitelist_type("rcl_action_goal_event_t") - .whitelist_type("rcl_action_goal_state_t").opaque_type("rcl_action_goal_state_t") - .whitelist_type("rcl_action_goal_status_array_t").opaque_type("rcl_action_goal_status_array_t") + .whitelist_type("rcl_action_goal_state_t") + .opaque_type("rcl_action_goal_state_t") + .whitelist_type("rcl_action_goal_status_array_t") + .opaque_type("rcl_action_goal_status_array_t") .whitelist_function("rcl_action_.*") .whitelist_type("rcl_action_client_options_t") .whitelist_type("rcl_action_server_options_t") + .whitelist_var("RCL_RET_ACTION_.*") .generate() .expect("Unable to generate bindings"); diff --git a/actions/src/lib.rs b/actions/src/lib.rs index 08a012c..f5fe8e5 100644 --- a/actions/src/lib.rs +++ b/actions/src/lib.rs @@ -5,5 +5,5 @@ #![allow(dead_code)] include!(concat!(env!("OUT_DIR"), "/action_bindings.rs")); -use rcl::*; use msg_gen::*; +use rcl::*; diff --git a/build.rs b/build.rs index 43901f7..1eac9b1 100644 --- a/build.rs +++ b/build.rs @@ -76,7 +76,11 @@ fn main() { // also "internal" feedback message type that wraps the feedback type with a uuid let feedback_msgname = format!("{}_FeedbackMessage", msg); - codegen.push_str(&msg_gen::generate_rust_msg(module, prefix, &feedback_msgname)); + codegen.push_str(&msg_gen::generate_rust_msg( + module, + prefix, + &feedback_msgname, + )); codegen.push_str(" }\n"); } diff --git a/examples/action_client.rs b/examples/action_client.rs index 48bd641..4095850 100644 --- a/examples/action_client.rs +++ b/examples/action_client.rs @@ -2,14 +2,16 @@ use futures::executor::LocalPool; use futures::future::FutureExt; use futures::stream::StreamExt; use futures::task::LocalSpawnExt; -use std::sync::{Arc,Mutex}; use r2r; use r2r::example_interfaces::action::Fibonacci; +use std::sync::{Arc, Mutex}; fn main() -> Result<(), Box> { let ctx = r2r::Context::create()?; let mut node = r2r::Node::create(ctx, "testnode", "")?; - let client = Arc::new(Mutex::new(node.create_action_client::("/fibonacci")?)); + let client = Arc::new(Mutex::new( + node.create_action_client::("/fibonacci")?, + )); // signal that we are done let done = Arc::new(Mutex::new(false)); @@ -41,18 +43,27 @@ fn main() -> Result<(), Box> { let nested_task_done = nested_task_done.clone(); let nested_goal = nested_goal.clone(); async move { - println!("new feedback msg {:?} -- {:?}", msg, nested_goal.get_status()); + println!( + "new feedback msg {:?} -- {:?}", + msg, + nested_goal.get_status() + ); // cancel the goal before it finishes. (comment out to complete the goal) if msg.sequence.len() == 8 { - nested_goal.cancel().unwrap(). - map(|r| { + nested_goal + .cancel() + .unwrap() + .map(|r| { println!("goal cancelled: {:?}", r); // we are done. *nested_task_done.lock().unwrap() = true; - }).await; + }) + .await; } - }})).unwrap(); + } + })) + .unwrap(); // await result in this task let result = goal.get_result().unwrap().await; @@ -60,7 +71,7 @@ fn main() -> Result<(), Box> { Ok(msg) => { println!("got result {:?}", msg); *task_done.lock().unwrap() = true; - }, + } Err(e) => println!("action failed: {:?}", e), } })?; diff --git a/examples/action_server.rs b/examples/action_server.rs index af11a7e..cdcbd49 100644 --- a/examples/action_server.rs +++ b/examples/action_server.rs @@ -1,13 +1,16 @@ use futures::executor::LocalPool; use futures::task::LocalSpawnExt; -use std::sync::{Arc,Mutex}; use r2r; -use r2r::ServerGoal; use r2r::example_interfaces::action::Fibonacci; +use r2r::ServerGoal; +use std::sync::{Arc, Mutex}; // note: cannot be blocking. fn accept_goal_cb(uuid: &uuid::Uuid, goal: &Fibonacci::Goal) -> bool { - println!("Got goal request with order {}, goal id: {}", goal.order, uuid); + println!( + "Got goal request with order {}, goal id: {}", + goal.order, uuid + ); // reject high orders goal.order < 100 } @@ -40,46 +43,60 @@ fn main() -> Result<(), Box> { // also we cannot block which is why we spawn the task let node_cb = node_cb.clone(); let done_cb = done_cb.clone(); - task_spawner.spawn_local(async move { - let mut timer = node_cb.lock().unwrap().create_wall_timer(std::time::Duration::from_millis(1000)) - .expect("could not create timer"); - let mut feedback_msg = Fibonacci::Feedback { - sequence: vec![0,1], - }; - g.publish_feedback(feedback_msg.clone()).expect("fail"); - let order = g.goal.order as usize; - for i in 1..order { - if g.is_cancelling() { - println!("Goal cancelled. quitting"); - let result_msg = Fibonacci::Result { - sequence: feedback_msg.sequence, - }; - g.cancel(result_msg).expect("could not send cancel request"); - // signal stopping of the node - *done_cb.lock().unwrap() = true; - return; - } - feedback_msg.sequence.push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]); + task_spawner + .spawn_local(async move { + let mut timer = node_cb + .lock() + .unwrap() + .create_wall_timer(std::time::Duration::from_millis(1000)) + .expect("could not create timer"); + let mut feedback_msg = Fibonacci::Feedback { + sequence: vec![0, 1], + }; g.publish_feedback(feedback_msg.clone()).expect("fail"); - println!("Sending feedback: {:?}", feedback_msg); + let order = g.goal.order as usize; + for i in 1..order { + if g.is_cancelling() { + println!("Goal cancelled. quitting"); + let result_msg = Fibonacci::Result { + sequence: feedback_msg.sequence, + }; + g.cancel(result_msg).expect("could not send cancel request"); + // signal stopping of the node + *done_cb.lock().unwrap() = true; + return; + } + feedback_msg + .sequence + .push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]); + g.publish_feedback(feedback_msg.clone()).expect("fail"); + println!("Sending feedback: {:?}", feedback_msg); - timer.tick().await.unwrap(); - } - let result_msg = Fibonacci::Result { - sequence: feedback_msg.sequence - }; - g.succeed(result_msg).expect("could not set result"); - // signal stopping of the node - *done_cb.lock().unwrap() = true; - }).expect("could not spawn task"); + timer.tick().await.unwrap(); + } + let result_msg = Fibonacci::Result { + sequence: feedback_msg.sequence, + }; + g.succeed(result_msg).expect("could not set result"); + // signal stopping of the node + *done_cb.lock().unwrap() = true; + }) + .expect("could not spawn task"); }; - let _server = node.lock().unwrap().create_action_server::("/fibonacci", - Box::new(accept_goal_cb), - Box::new(accept_cancel_cb), - Box::new(handle_goal_cb))?; + let _server = node + .lock() + .unwrap() + .create_action_server::( + "/fibonacci", + Box::new(accept_goal_cb), + Box::new(accept_cancel_cb), + Box::new(handle_goal_cb), + )?; loop { - node.lock().unwrap().spin_once(std::time::Duration::from_millis(100)); + node.lock() + .unwrap() + .spin_once(std::time::Duration::from_millis(100)); pool.run_until_stalled(); if *done.lock().unwrap() { break; diff --git a/examples/client.rs b/examples/client.rs index 70089eb..a0daaa4 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -5,14 +5,16 @@ use r2r; use r2r::example_interfaces::srv::AddTwoInts; -async fn requester_task(c: r2r::Client) -> Result<(), Box> { +async fn requester_task( + c: r2r::Client, +) -> Result<(), Box> { let mut x: i64 = 0; loop { let req = AddTwoInts::Request { a: 10 * x, b: x }; let resp = c.request(&req)?.await?; println!("{} + {} = {}", req.a, req.b, resp.sum); - x+=1; + x += 1; if x == 10 { break; } @@ -40,7 +42,8 @@ fn main() -> Result<(), Box> { match requester_task(client).await { Ok(()) => println!("exiting"), Err(e) => println!("error: {}", e), - }})?; + } + })?; loop { node.spin_once(std::time::Duration::from_millis(100)); diff --git a/examples/publishers.rs b/examples/publishers.rs index 37d4669..0782130 100644 --- a/examples/publishers.rs +++ b/examples/publishers.rs @@ -16,7 +16,10 @@ fn main() -> Result<(), Box> { positions.push(count as f64); let to_send = JointTrajectoryPoint { positions: positions.clone(), - time_from_start: Duration { sec: count, nanosec: 0 }, + time_from_start: Duration { + sec: count, + nanosec: 0, + }, ..Default::default() }; let mut native = r2r::WrappedNativeMsg::::new(); diff --git a/examples/rostopic_echo.rs b/examples/rostopic_echo.rs index 9be5259..199fa42 100644 --- a/examples/rostopic_echo.rs +++ b/examples/rostopic_echo.rs @@ -1,7 +1,7 @@ use futures::executor::LocalPool; -use futures::task::LocalSpawnExt; -use futures::stream::StreamExt; use futures::future; +use futures::stream::StreamExt; +use futures::task::LocalSpawnExt; use r2r; use std::collections::HashMap; use std::env; @@ -45,19 +45,22 @@ fn main() -> Result<(), Box> { let echo_pub = node.create_publisher_untyped(echo, type_name)?; let sub = node.subscribe_untyped(topic, type_name)?; - spawner.spawn_local(async move { sub.for_each(|msg| { - match msg { - Ok(msg) => { - let s = serde_json::to_string_pretty(&msg).unwrap(); - println!("{}\n---\n", &s); - echo_pub.publish(msg).unwrap(); + spawner.spawn_local(async move { + sub.for_each(|msg| { + match msg { + Ok(msg) => { + let s = serde_json::to_string_pretty(&msg).unwrap(); + println!("{}\n---\n", &s); + echo_pub.publish(msg).unwrap(); + } + Err(err) => { + println!("Could not parse msg. {}", err); + } } - Err(err) => { - println!("Could not parse msg. {}", err); - } - } - future::ready(()) - }).await})?; + future::ready(()) + }) + .await + })?; loop { node.spin_once(std::time::Duration::from_millis(100)); diff --git a/examples/subscriber.rs b/examples/subscriber.rs index 413d256..c096295 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -1,7 +1,7 @@ use futures::executor::LocalPool; -use futures::task::LocalSpawnExt; -use futures::stream::StreamExt; use futures::future; +use futures::stream::StreamExt; +use futures::task::LocalSpawnExt; use r2r; fn main() -> Result<(), Box> { @@ -20,21 +20,27 @@ fn main() -> Result<(), Box> { match sub.next().await { Some(msg) => { if x % 2 == 0 { - p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap(); + p.publish(&r2r::std_msgs::msg::String { + data: format!("({}): new msg: {}", x, msg.data), + }) + .unwrap(); } - }, + } None => break, } - x+=1; + x += 1; } })?; // for sub2 we just print the data let sub2 = node.subscribe::("/topic2")?; - spawner.spawn_local(async move { sub2.for_each(|msg| { - println!("topic2: new msg: {}", msg.data); - future::ready(()) - }).await})?; + spawner.spawn_local(async move { + sub2.for_each(|msg| { + println!("topic2: new msg: {}", msg.data); + future::ready(()) + }) + .await + })?; loop { node.spin_once(std::time::Duration::from_millis(100)); diff --git a/examples/tokio.rs b/examples/tokio.rs index 4d993ad..4024fe8 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -1,12 +1,12 @@ -use futures::stream::StreamExt; use futures::future; -use tokio::task; -use std::sync::{Arc, Mutex}; +use futures::stream::StreamExt; use r2r; +use std::sync::{Arc, Mutex}; +use tokio::task; #[derive(Debug, Default)] struct SharedState { - pub state: i32 + pub state: i32, } #[tokio::main] @@ -25,40 +25,50 @@ async fn main() -> Result<(), Box> { match sub.next().await { Some(msg) => { if x % 2 == 0 { - p.publish(&r2r::std_msgs::msg::String { data: format!("({}): new msg: {}", x, msg.data) }).unwrap(); + p.publish(&r2r::std_msgs::msg::String { + data: format!("({}): new msg: {}", x, msg.data), + }) + .unwrap(); } else { // update shared state state_t1.lock().unwrap().state = x; } - }, + } None => break, } - x+=1; + x += 1; } }); // for sub2 we just print the data let sub2 = node.subscribe::("/topic2")?; - task::spawn(async move { sub2.for_each(|msg| { - println!("topic2: new msg: {}", msg.data); - future::ready(()) - }).await}); + task::spawn(async move { + sub2.for_each(|msg| { + println!("topic2: new msg: {}", msg.data); + future::ready(()) + }) + .await + }); - let mut timer = node.create_wall_timer(std::time::Duration::from_millis(2500)).unwrap(); + let mut timer = node + .create_wall_timer(std::time::Duration::from_millis(2500)) + .unwrap(); let state_t2 = state.clone(); task::spawn(async move { loop { let time_passed = timer.tick().await.unwrap(); let x = state_t2.lock().unwrap().state; - println!("timer event. time passed: {}. shared state is {}", time_passed.as_micros(), x); + println!( + "timer event. time passed: {}. shared state is {}", + time_passed.as_micros(), + x + ); } }); // here we spin the node in its own thread (but we could just busy wait in this thread) - let handle = std::thread::spawn(move || { - loop { - node.spin_once(std::time::Duration::from_millis(100)); - } + let handle = std::thread::spawn(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); }); handle.join().unwrap(); diff --git a/examples/wall_timer.rs b/examples/wall_timer.rs index 411f2c4..7808771 100644 --- a/examples/wall_timer.rs +++ b/examples/wall_timer.rs @@ -33,7 +33,8 @@ fn main() -> Result<(), Box> { match timer_task(timer).await { Ok(()) => println!("exiting"), Err(e) => println!("error: {}", e), - }})?; + } + })?; loop { node.spin_once(std::time::Duration::from_millis(100)); diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index e3934cf..99b05a2 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -52,8 +52,7 @@ fn field_type(t: u8) -> String { } else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_LONG_DOUBLE as u8) { // f128 does not exist in rust "u128".to_owned() - } - else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) { + } else if t == (rosidl_typesupport_introspection_c__ROS_TYPE_MESSAGE as u8) { "message".to_owned() } else { panic!("ros native type not implemented: {}", t); @@ -223,9 +222,9 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String { // TODO: refactor... // handle special case of ActionName_ServiceName_Response let nn = name.splitn(3, "_").collect::>(); - if let [ _mod_name, _srv_name, msg_name ] = &nn[..] { + if let [_mod_name, _srv_name, msg_name] = &nn[..] { name = msg_name.to_string(); - } else if let [ _mod_name, msg_name ] = &nn[..] { + } else if let [_mod_name, msg_name] = &nn[..] { name = msg_name.to_string(); } else { panic!("malformed service name {}", name); @@ -241,21 +240,21 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String { let (module, prefix, name, _, _) = introspection(member.members_); // hack here to rustify nested action type names if prefix == "action" { - if let Some((n1,n2)) = name.rsplit_once("_") { - format!("{module}::{prefix}::{srvname}::{msgname}", - module = module, - prefix = prefix, - srvname = n1, - msgname = n2 - ) - } - else { + if let Some((n1, n2)) = name.rsplit_once("_") { format!( - "{module}::{prefix}::{msgname}", - module = module, - prefix = prefix, - msgname = name - ) + "{module}::{prefix}::{srvname}::{msgname}", + module = module, + prefix = prefix, + srvname = n1, + msgname = n2 + ) + } else { + format!( + "{module}::{prefix}::{msgname}", + module = module, + prefix = prefix, + msgname = name + ) } } else { format!( @@ -351,7 +350,7 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String { let (module, prefix, name, _, _) = introspection(member.members_); // same hack as above to rustify message type names if prefix == "action" { - if let Some((n1,n2)) = name.rsplit_once("_") { + if let Some((n1, n2)) = name.rsplit_once("_") { from_native.push_str(&format!("{field_name}: {module}::{prefix}::{srvname}::{msgname}::from_native(&msg.{field_name}),\n", field_name = field_name, module = module, prefix=prefix, srvname = n1, msgname = n2)); } else { panic!("ooops at from_native"); diff --git a/src/error.rs b/src/error.rs index 318d5e1..d3d2aac 100644 --- a/src/error.rs +++ b/src/error.rs @@ -82,9 +82,25 @@ pub enum Error { #[error("Serde error: {}", err)] SerdeError { err: String }, - // action errors. perhaps they exist in the rcl? - #[error("Goal rejected by server.")] - GoalRejected, + // action errors. + #[error("RCL_RET_ACTION_NAME_INVALID")] + RCL_RET_ACTION_NAME_INVALID, + #[error("RCL_RET_ACTION_GOAL_ACCEPTED")] + RCL_RET_ACTION_GOAL_ACCEPTED, + #[error("RCL_RET_ACTION_GOAL_REJECTED")] + RCL_RET_ACTION_GOAL_REJECTED, + #[error("RCL_RET_ACTION_CLIENT_INVALID")] + RCL_RET_ACTION_CLIENT_INVALID, + #[error("RCL_RET_ACTION_CLIENT_TAKE_FAILED")] + RCL_RET_ACTION_CLIENT_TAKE_FAILED, + #[error("RCL_RET_ACTION_SERVER_INVALID")] + RCL_RET_ACTION_SERVER_INVALID, + #[error("RCL_RET_ACTION_SERVER_TAKE_FAILED")] + RCL_RET_ACTION_SERVER_TAKE_FAILED, + #[error("RCL_RET_ACTION_GOAL_HANDLE_INVALID")] + RCL_RET_ACTION_GOAL_HANDLE_INVALID, + #[error("RCL_RET_ACTION_GOAL_EVENT_INVALID")] + RCL_RET_ACTION_GOAL_EVENT_INVALID, #[error("Goal cancel request rejected by server.")] GoalCancelRejected, diff --git a/src/lib.rs b/src/lib.rs index cddc0f2..ddbb124 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1059,7 +1059,6 @@ where let gi = action_msgs::msg::GoalInfo::from_native(&goal_info); let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone())); println!("goal expired: {} - {}", uuid, num_expired); - // todo: delete info about goal. self.goals.remove(&uuid); self.result_msgs.remove(&uuid); self.result_requests.remove(&uuid); @@ -1071,13 +1070,23 @@ where let mut status = rcl_action_get_zero_initialized_goal_status_array(); let ret = rcl_action_get_goal_status_array(&self.rcl_handle, &mut status); if ret != RCL_RET_OK as i32 { - println!("action server: failed to get goal status array: {}", ret); + println!( + "action server: failed to get goal status array: {}", + Error::from_rcl_error(ret) + ); + return; } - // todo: error handling... - rcl_action_publish_status( + let ret = rcl_action_publish_status( &self.rcl_handle, &status as *const _ as *const std::os::raw::c_void, ); + if ret != RCL_RET_OK as i32 { + println!( + "action server: failed to publish status: {}", + Error::from_rcl_error(ret) + ); + return; + } rcl_action_goal_status_array_fini(&mut status); } } @@ -1091,7 +1100,10 @@ where rcl_action_send_result_response(&self.rcl_handle, &mut req, msg.void_ptr_mut()) }; if ret != RCL_RET_OK as i32 { - println!("action server: could send result request response. {}", ret); + println!( + "action server: could send result request response. {}", + Error::from_rcl_error(ret) + ); } } } @@ -1146,13 +1158,15 @@ where let mut request_id = unsafe { request_id.assume_init() }; if let Some(response_msg) = response_msg { - println!("sending result msg"); let ret = unsafe { rcl_action_send_result_response(&self.rcl_handle, &mut request_id, response_msg) }; if ret != RCL_RET_OK as i32 { - println!("action server: could send result request response. {}", ret); + println!( + "action server: could send result request response. {}", + Error::from_rcl_error(ret) + ); return; } } else { @@ -1285,37 +1299,39 @@ where let action_server = self .server .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; let uuid_msg = unique_identifier_msgs::msg::UUID { uuid: self.uuid.as_bytes().to_vec(), }; let feedback_msg = T::make_feedback_msg(uuid_msg, msg); let mut native_msg = WrappedNativeMsg::::from(&feedback_msg); - let result = unsafe { + let ret = unsafe { rcl_action_publish_feedback( action_server.lock().unwrap().handle(), native_msg.void_ptr_mut(), ) }; - if result == RCL_RET_OK as i32 { + if ret == RCL_RET_OK as i32 { Ok(()) } else { - eprintln!("coult not publish {}", result); + eprintln!("coult not publish {}", Error::from_rcl_error(ret)); Ok(()) // todo: error codes } } fn set_cancel(&mut self) { - // todo: error handling let ret = unsafe { let handle = self.handle.lock().unwrap(); rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL) }; if ret != RCL_RET_OK as i32 { - println!("action server: could not cancel goal: {}", ret); + println!( + "action server: could not cancel goal: {}", + Error::from_rcl_error(ret) + ); } } @@ -1324,7 +1340,7 @@ where let action_server = self .server .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; let mut action_server = action_server.lock().unwrap(); // todo: check that the goal exists @@ -1342,7 +1358,7 @@ where if !goal_exists { println!("tried to publish result without a goal"); - return Ok(()); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); } // todo: error handling @@ -1371,14 +1387,17 @@ where }; if ret != RCL_RET_OK as i32 { - println!("action server: could not cancel goal: {}", ret); + println!( + "action server: could not cancel goal: {}", + Error::from_rcl_error(ret) + ); } // upgrade to actual ref. if still alive let action_server = self .server .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; let mut action_server = action_server.lock().unwrap(); // todo: check that the goal exists @@ -1395,8 +1414,8 @@ where unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; if !goal_exists { - println!("tried to publish result without a goal"); - return Ok(()); + println!("tried to abort without a goal"); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); } // todo: error handling @@ -1425,7 +1444,7 @@ where let action_server = self .server .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; let mut action_server = action_server.lock().unwrap(); // todo: check that the goal exists @@ -1443,7 +1462,7 @@ where if !goal_exists { println!("tried to publish result without a goal"); - return Ok(()); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); } // todo: error handling @@ -1488,7 +1507,7 @@ where let action_server = self .server .upgrade() - .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?; // todo: error codes + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; let action_server = action_server.lock().unwrap(); Ok(unsafe { rcl_action_server_is_valid(&action_server.rcl_handle) }) @@ -2818,7 +2837,10 @@ where T: WrappedActionTypeSupport, { pub fn get_status(&self) -> Result { - let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; let client = client.lock().unwrap(); Ok(client.get_goal_status(&self.uuid)) @@ -2827,16 +2849,19 @@ where pub fn get_result(&mut self) -> Result>> { if let Some(result_channel) = self.result.lock().unwrap().take() { // upgrade to actual ref. if still alive - let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); client.send_result_request(self.uuid); - Ok(result_channel.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)) } else { // todo: error codes... println!("already asked for the result!"); - Err(Error::RCL_RET_CLIENT_INVALID) + Err(Error::RCL_RET_ACTION_CLIENT_INVALID) } } @@ -2846,13 +2871,16 @@ where } else { // todo: error codes... println!("someone else owns the feedback consumer stream"); - Err(Error::RCL_RET_CLIENT_INVALID) + Err(Error::RCL_RET_ACTION_CLIENT_INVALID) } } pub fn cancel(&self) -> Result>> { // upgrade to actual ref. if still alive - let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); client.send_cancel_request(&self.uuid) @@ -2871,7 +2899,10 @@ where T: WrappedActionTypeSupport, { // upgrade to actual ref. if still alive - let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = self + .client + .upgrade() + .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?; let mut client = client.lock().unwrap(); let uuid = uuid::Uuid::new_v4(); @@ -2903,7 +2934,7 @@ where // instead of "canceled" we return invalid client. let fut_client = Weak::clone(&self.client); let future = goal_req_receiver - .map_err(|_| Error::RCL_RET_CLIENT_INVALID) + .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID) .map(move |r| match r { Ok(resp) => { let (accepted, _stamp) = T::destructure_goal_response_msg(resp); @@ -2916,7 +2947,7 @@ where }) } else { println!("goal rejected"); - Err(Error::GoalRejected) + Err(Error::RCL_RET_ACTION_GOAL_REJECTED) } } Err(e) => Err(e), diff --git a/tests/tokio_testing.rs b/tests/tokio_testing.rs index 3260d93..0678181 100644 --- a/tests/tokio_testing.rs +++ b/tests/tokio_testing.rs @@ -1,7 +1,7 @@ use futures::stream::StreamExt; -use tokio::task; -use std::sync::{Arc, Mutex}; use r2r; +use std::sync::{Arc, Mutex}; +use tokio::task; #[tokio::test(flavor = "multi_thread")] async fn tokio_testing() -> Result<(), Box> { @@ -15,7 +15,9 @@ async fn tokio_testing() -> Result<(), Box> { task::spawn(async move { (0..10).for_each(|i| { - p_the_no.publish(&r2r::std_msgs::msg::Int32{data: i}).unwrap(); + p_the_no + .publish(&r2r::std_msgs::msg::Int32 { data: i }) + .unwrap(); }); }); @@ -23,8 +25,12 @@ async fn tokio_testing() -> Result<(), Box> { loop { match s_the_no.next().await { Some(msg) => { - p_new_no.publish(&r2r::std_msgs::msg::Int32{data: msg.data + 10}).unwrap(); - }, + p_new_no + .publish(&r2r::std_msgs::msg::Int32 { + data: msg.data + 10, + }) + .unwrap(); + } None => break, } } @@ -36,10 +42,10 @@ async fn tokio_testing() -> Result<(), Box> { match s_new_no.next().await { Some(msg) => { let i = msg.data; - if i==19 { + if i == 19 { *s.lock().unwrap() = 19; } - }, + } None => break, } } @@ -52,11 +58,11 @@ async fn tokio_testing() -> Result<(), Box> { if *x == 19 { break; } - }; + } state.lock().unwrap().clone() }); - let x = handle.join().unwrap(); + let x = handle.join().unwrap(); assert_eq!(x, 19); Ok(()) -} \ No newline at end of file +}