action servers are now streams

Martin Dahl 2021-08-16 10:35:51 +02:00
parent ea06f203ca
commit ac51828cad
9 changed files with 537 additions and 415 deletions
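
In short: instead of registering accept/cancel/goal callbacks, an action server is now a stream of goal requests that user code accepts or rejects, the client's send_goal_request resolves to a goal handle plus a result future and a feedback stream, and service responses gained explicit error handling. Condensed from the updated examples and signatures below (a sketch, not a complete program):

// client: goal handle, result future and feedback stream come back together
let (goal, result, feedback) = client.send_goal_request(Fibonacci::Goal { order: 5 })?.await?;
// consume `feedback` (impl Stream<Item = Fibonacci::Feedback>) in its own task, then:
let (status, msg) = result.await?; // the result now carries the GoalStatus too

// server: create_action_server now returns impl Stream<Item = GoalRequest<_>>
let mut requests = node.create_action_server::<Fibonacci::Action>("/fibonacci")?;
while let Some(req) = requests.next().await {
    let (goal, mut cancel) = req.accept()?; // or req.reject()?
    // drive the goal, racing it against the `cancel` stream (see the server example below)
}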

View File

@@ -24,6 +24,7 @@ futures = "0.3.15"
serde_json = "1.0.62"
futures = "0.3.15"
tokio = { version = "1", features = ["full"] }
rand = "0.8.4"
[build-dependencies]
common = { path = "common", version = "0.1.0" }

View File

@@ -20,7 +20,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
println!("action service available.");
let goal = Fibonacci::Goal { order: 10 };
let goal = Fibonacci::Goal { order: 5 };
println!("sending goal: {:?}", goal);
let goal_fut = client.send_goal_request(goal)?;
@@ -30,14 +30,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let task_spawner = spawner.clone();
let task_done = done.clone();
spawner.spawn_local(async move {
let mut goal = goal_fut.await.unwrap(); // assume success
let (goal, result, feedback) = goal_fut.await.unwrap(); // assume success
println!("goal accepted: {}", goal.uuid);
// process feedback stream in its own task
let nested_goal = goal.clone();
let nested_task_done = task_done.clone();
task_spawner
.spawn_local(goal.get_feedback().unwrap().for_each(move |msg| {
.spawn_local(feedback.for_each(move |msg| {
let nested_task_done = nested_task_done.clone();
let nested_goal = nested_goal.clone();
async move {
@@ -47,8 +47,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
nested_goal.get_status()
);
// cancel the goal before it finishes. (comment out to complete the goal)
if msg.sequence.len() == 8 {
// 50/50 chance that we cancel the goal before it finishes.
if msg.sequence.len() == 4 && rand::random::<bool>() {
nested_goal
.cancel()
.unwrap()
@@ -64,10 +64,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap();
// await result in this task
let result = goal.get_result().unwrap().await;
match result {
Ok(msg) => {
println!("got result {:?}", msg);
match result.await {
Ok((status, msg)) => {
println!("got result {} with msg {:?}", status, msg);
*task_done.lock().unwrap() = true;
}
Err(e) => println!("action failed: {:?}", e),

View File

@@ -1,105 +1,109 @@
use futures::executor::LocalPool;
use futures::executor::{LocalPool, LocalSpawner};
use futures::future::{self, Either};
use futures::stream::{Stream, StreamExt};
use futures::task::LocalSpawnExt;
use r2r;
use r2r::example_interfaces::action::Fibonacci;
use r2r::ServerGoal;
use std::sync::{Arc, Mutex};
// note: cannot be blocking.
fn accept_goal_cb(uuid: &uuid::Uuid, goal: &Fibonacci::Goal) -> bool {
println!(
"Got goal request with order {}, goal id: {}",
goal.order, uuid
);
// reject high orders
goal.order < 100
// main goal handling routine.
async fn run_goal(
node: Arc<Mutex<r2r::Node>>,
g: r2r::ServerGoal<Fibonacci::Action>,
) -> Fibonacci::Result {
let mut timer = node // local timer, will be dropped after this request is processed.
.lock()
.unwrap()
.create_wall_timer(std::time::Duration::from_millis(1000))
.expect("could not create timer");
let mut feedback_msg = Fibonacci::Feedback {
sequence: vec![0, 1],
};
g.publish_feedback(feedback_msg.clone()).expect("fail");
let order = g.goal.order as usize;
for i in 1..order {
feedback_msg
.sequence
.push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]);
g.publish_feedback(feedback_msg.clone()).expect("fail");
println!("Sending feedback: {:?}", feedback_msg);
timer.tick().await.unwrap();
}
Fibonacci::Result {
sequence: feedback_msg.sequence,
}
}
// note: cannot be blocking.
fn accept_cancel_cb(goal: &ServerGoal<Fibonacci::Action>) -> bool {
println!("Got request to cancel {}", goal.uuid);
// always accept cancel requests
true
async fn fibonacci_server(
spawner: LocalSpawner,
node: Arc<Mutex<r2r::Node>>,
mut requests: impl Stream<Item = r2r::GoalRequest<Fibonacci::Action>> + Unpin,
) {
loop {
match requests.next().await {
Some(req) => {
println!(
"Got goal request with order {}, goal id: {}",
req.goal.order, req.uuid
);
// reject high orders
if req.goal.order >= 100 {
req.reject().unwrap();
continue;
}
let (mut g, mut cancel) = req.accept().unwrap();
let goal_fut = spawner
.spawn_local_with_handle(run_goal(node.clone(), g.clone()))
.unwrap();
match future::select(goal_fut, cancel.next()).await {
Either::Left((result, _)) => {
// 50/50 chance that we succeed or abort
if rand::random::<bool>() {
println!("goal completed!");
g.succeed(result).expect("could not send result");
} else {
println!("goal aborted!");
g.abort(result).expect("could not send result");
}
}
Either::Right((request, _)) => {
if let Some(request) = request {
println!("got cancel request: {}", request.uuid);
request.accept();
}
}
};
}
None => break,
}
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let task_spawner = spawner.clone();
let ctx = r2r::Context::create()?;
let node = Arc::new(Mutex::new(r2r::Node::create(ctx, "testnode", "")?));
// signal that we are done
let done = Arc::new(Mutex::new(false));
let node_cb = node.clone();
let done_cb = done.clone();
let handle_goal_cb = move |mut g: ServerGoal<Fibonacci::Action>| {
// note that we cannot create the timer here, since we are
// called during spin_once which means whoever is spinning holds the mutex.
// instead we just set up and immediately start a task.
// also we cannot block which is why we spawn the task
let node_cb = node_cb.clone();
let done_cb = done_cb.clone();
task_spawner
.spawn_local(async move {
let mut timer = node_cb
.lock()
.unwrap()
.create_wall_timer(std::time::Duration::from_millis(1000))
.expect("could not create timer");
let mut feedback_msg = Fibonacci::Feedback {
sequence: vec![0, 1],
};
g.publish_feedback(feedback_msg.clone()).expect("fail");
let order = g.goal.order as usize;
for i in 1..order {
if g.is_cancelling() {
println!("Goal cancelled. quitting");
let result_msg = Fibonacci::Result {
sequence: feedback_msg.sequence,
};
g.cancel(result_msg).expect("could not send cancel request");
// signal stopping of the node
*done_cb.lock().unwrap() = true;
return;
}
feedback_msg
.sequence
.push(feedback_msg.sequence[i] + feedback_msg.sequence[i - 1]);
g.publish_feedback(feedback_msg.clone()).expect("fail");
println!("Sending feedback: {:?}", feedback_msg);
timer.tick().await.unwrap();
}
let result_msg = Fibonacci::Result {
sequence: feedback_msg.sequence,
};
g.succeed(result_msg).expect("could not set result");
// signal stopping of the node
*done_cb.lock().unwrap() = true;
})
.expect("could not spawn task");
};
let _server = node
let server_requests = node
.lock()
.unwrap()
.create_action_server::<Fibonacci::Action>(
"/fibonacci",
Box::new(accept_goal_cb),
Box::new(accept_cancel_cb),
Box::new(handle_goal_cb),
)?;
.create_action_server::<Fibonacci::Action>("/fibonacci")?;
while !*done.lock().unwrap() {
let node_cb = node.clone();
spawner
.spawn_local(fibonacci_server(spawner.clone(), node_cb, server_requests))
.unwrap();
loop {
node.lock()
.unwrap()
.spin_once(std::time::Duration::from_millis(100));
pool.run_until_stalled();
}
Ok(())
}
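
The heart of the new server example is racing the goal-handling future (run_goal) against the next message on the cancel stream. That pattern is plain futures machinery; a minimal standalone sketch using only the futures crate (the channel and messages here are illustrative, not part of r2r):

use futures::channel::mpsc;
use futures::executor::LocalPool;
use futures::future::{self, Either};
use futures::stream::StreamExt;
use futures::task::LocalSpawnExt;

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let (mut cancel_tx, mut cancel_rx) = mpsc::channel::<&str>(1);
    spawner
        .spawn_local(async move {
            // stands in for run_goal(); pending() models a goal that never finishes on its own
            let work = future::pending::<&str>();
            match future::select(work, cancel_rx.next()).await {
                Either::Left((result, _)) => println!("goal finished: {}", result),
                Either::Right((cancel, _)) => println!("goal canceled: {:?}", cancel),
            }
        })
        .unwrap();
    cancel_tx.try_send("user asked to cancel").unwrap();
    pool.run(); // prints: goal canceled: Some("user asked to cancel")
}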

View File

@@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"responding with: {} + {} = {}",
req.message.a, req.message.b, resp.sum
);
req.respond(resp);
req.respond(resp).expect("could not send service response");
}
None => break,
}
@@ -65,7 +65,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};
// wait a bit before answering...
let _ret = timer2.tick().await;
req.respond(resp);
req.respond(resp).expect("could not send service response");
}
},
elapsed = timer2.tick().fuse() => {

View File

@@ -17,8 +17,6 @@ where
{
client: Weak<Mutex<WrappedActionClient<T>>>,
pub uuid: uuid::Uuid,
feedback: Arc<Mutex<Option<mpsc::Receiver<T::Feedback>>>>,
result: Arc<Mutex<Option<oneshot::Receiver<T::Result>>>>,
}
impl<T: 'static> ClientGoal<T>
@@ -35,35 +33,6 @@ where
Ok(client.get_goal_status(&self.uuid))
}
pub fn get_result(&mut self) -> Result<impl Future<Output = Result<T::Result>>> {
if let Some(result_channel) = self.result.lock().unwrap().take() {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_result_request(self.uuid);
Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID))
} else {
// todo: error codes...
println!("already asked for the result!");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn get_feedback(&self) -> Result<impl Stream<Item = T::Feedback> + Unpin> {
if let Some(feedback_channel) = self.feedback.lock().unwrap().take() {
Ok(feedback_channel)
} else {
// todo: error codes...
println!("someone else owns the feedback consumer stream");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
// upgrade to actual ref. if still alive
let client = self
@@ -83,7 +52,13 @@ where
pub fn send_goal_request(
&self,
goal: T::Goal,
) -> Result<impl Future<Output = Result<ClientGoal<T>>>>
) -> Result<
impl Future<Output = Result<
(
ClientGoal<T>,
impl Future<Output = Result<(GoalStatus, T::Result)>>,
impl Stream<Item = T::Feedback> + Unpin,
)>>>
where
T: WrappedActionTypeSupport,
{
@@ -111,9 +86,9 @@ where
let (goal_req_sender, goal_req_receiver) = oneshot::channel::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(1);
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(10);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) = oneshot::channel::<T::Result>();
let (result_sender, result_receiver) = oneshot::channel::<(GoalStatus, T::Result)>();
client.result_senders.push((uuid, result_sender));
if result == RCL_RET_OK as i32 {
@@ -128,12 +103,21 @@ where
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
if accepted {
Ok(ClientGoal {
// on goal accept we immediately send the result request
{
let c = fut_client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut c = c.lock().unwrap();
c.send_result_request(uuid.clone());
}
Ok((ClientGoal {
client: fut_client,
uuid,
feedback: Arc::new(Mutex::new(Some(feedback_receiver))),
result: Arc::new(Mutex::new(Some(result_receiver))),
})
}, result_receiver
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
feedback_receiver))
} else {
println!("goal rejected");
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
@@ -219,6 +203,22 @@ impl GoalStatus {
}
}
impl std::fmt::Display for GoalStatus {
fn fmt(&self, fmtr: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
GoalStatus::Unknown => "unknown",
GoalStatus::Accepted => "accepted",
GoalStatus::Executing => "executing",
GoalStatus::Canceling => "canceling",
GoalStatus::Succeeded => "succeeded",
GoalStatus::Canceled => "canceled",
GoalStatus::Aborted => "aborted",
};
write!(fmtr, "{}", s)
}
}
pub struct WrappedActionClient<T>
where
T: WrappedActionTypeSupport,
@@ -233,7 +233,7 @@ where
pub cancel_response_channels: Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
pub result_requests: Vec<(i64, uuid::Uuid)>,
pub result_senders: Vec<(uuid::Uuid, oneshot::Sender<T::Result>)>,
pub result_senders: Vec<(uuid::Uuid, oneshot::Sender<(GoalStatus, T::Result)>)>,
pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
}
@@ -469,15 +469,10 @@ where
let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
let (status, result) = T::destructure_result_response_msg(response);
let status = GoalStatus::from_rcl(status);
if status != GoalStatus::Succeeded {
println!("goal status failed: {:?}, result: {:?}", status, result);
// this will drop the sender which makes the receiver fail with "canceled"
} else {
match sender.send(result) {
Ok(()) => {}
Err(e) => {
println!("error sending result to action client: {:?}", e);
}
match sender.send((status, result)) {
Ok(()) => {}
Err(e) => {
println!("error sending result to action client: {:?}", e);
}
}
}
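
Note the behavioural change here: a non-succeeded goal used to be signalled by dropping the result sender, which the receiver only sees as a canceled channel, whereas now the GoalStatus travels with the result. The channel mechanics in isolation (futures crate only; illustrative, not the r2r API):

use futures::channel::oneshot;
use futures::executor::block_on;

fn main() {
    // old approach: dropping the sender makes the receiver resolve to Err(Canceled)
    let (sender, receiver) = oneshot::channel::<i32>();
    drop(sender);
    assert!(block_on(receiver).is_err());

    // new approach: always send, with the goal status as part of the payload
    let (sender, receiver) = oneshot::channel::<(&str, i32)>();
    sender.send(("aborted", 42)).unwrap();
    assert_eq!(block_on(receiver).unwrap(), ("aborted", 42));
}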

View File

@@ -1,4 +1,7 @@
use super::*;
// use core::pin::Pin;
// use futures::prelude::*;
use futures::future::{join_all, JoinAll};
#[derive(Clone)]
pub struct ActionServer<T>
@@ -24,34 +27,162 @@ where
}
}
pub fn make_action_server<T>(s: Weak<Mutex<WrappedActionServer<T>>>) -> ActionServer<T>
where
T: WrappedActionTypeSupport,
{
ActionServer { server: s }
}
pub trait ActionServer_ {
fn handle(&self) -> &rcl_action_server_t;
fn handle_mut(&mut self) -> &mut rcl_action_server_t;
fn handle_goal_request(&mut self, server: Arc<Mutex<dyn ActionServer_>>) -> ();
fn send_completed_cancel_requests(&mut self) -> ();
fn handle_cancel_request(&mut self) -> ();
fn handle_result_request(&mut self) -> ();
fn handle_goal_expired(&mut self) -> ();
fn publish_status(&self) -> ();
fn set_goal_state(
&mut self,
uuid: &uuid::Uuid,
new_state: rcl_action_goal_event_t,
) -> Result<()>;
fn add_result(&mut self, uuid: uuid::Uuid, msg: Box<dyn VoidPtr>) -> ();
fn cancel_goal(&mut self, uuid: &uuid::Uuid);
fn is_cancelling(&self, uuid: &uuid::Uuid) -> Result<bool>;
fn add_goal_handle(
&mut self,
uuid: uuid::Uuid,
goal_handle: *mut rcl_action_goal_handle_t,
) -> ();
fn destroy(&mut self, node: &mut rcl_node_t);
}
pub struct CancelRequest {
pub uuid: uuid::Uuid,
response_sender: oneshot::Sender<(uuid::Uuid, bool)>,
}
impl CancelRequest {
/// Accepts the cancel request. The action server should now cancel the corresponding goal.
pub fn accept(self) {
match self.response_sender.send((self.uuid, true)) {
Err(_) => eprintln!("warning: could not send goal canellation accept msg"),
_ => (),
}
}
/// Rejects the cancel request.
pub fn reject(self) {
match self.response_sender.send((self.uuid, false)) {
Err(_) => eprintln!("warning: could not send goal cancellation rejection"),
_ => (),
}
}
}
pub struct GoalRequest<T>
where
T: WrappedActionTypeSupport,
{
pub uuid: uuid::Uuid,
pub goal: T::Goal,
cancel_requests: mpsc::Receiver<CancelRequest>,
server: Weak<Mutex<dyn ActionServer_>>,
request_id: rmw_request_id_t,
}
impl<T: 'static> GoalRequest<T>
where
T: WrappedActionTypeSupport,
{
/// Accept the goal request and become a ServerGoal.
/// Returns a handle to the goal and a stream on which cancel requests can be received.
pub fn accept(mut self) -> Result<(ServerGoal<T>, impl Stream<Item = CancelRequest> + Unpin)> {
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
};
let time = builtin_interfaces::msg::Time::default();
let goal_info = action_msgs::msg::GoalInfo {
goal_id: uuid_msg,
stamp: time.clone(),
};
let native_goal_info = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
let server = self.server.upgrade().unwrap(); // todo fixme
let mut server = server.lock().unwrap();
let goal_handle: *mut rcl_action_goal_handle_t =
unsafe { rcl_action_accept_new_goal(server.handle_mut(), &*native_goal_info) };
// send response
let response_msg = T::make_goal_response_msg(true, time);
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::from(&response_msg);
let ret = unsafe {
rcl_action_send_goal_response(
server.handle_mut(),
&mut self.request_id,
response_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(ret));
}
unsafe {
rcl_action_update_goal_state(goal_handle, rcl_action_goal_event_t::GOAL_EVENT_EXECUTE);
}
server.publish_status();
let g = ServerGoal {
uuid: self.uuid.clone(),
goal: self.goal,
server: self.server,
};
// server.goals.insert(g.uuid.clone(), goal_handle);
server.add_goal_handle(g.uuid.clone(), goal_handle);
return Ok((g, self.cancel_requests));
}
/// Reject the goal request, consuming it in the process.
pub fn reject(mut self) -> Result<()> {
let time = builtin_interfaces::msg::Time::default();
let server = self.server.upgrade().unwrap(); // todo fixme
let mut server = server.lock().unwrap();
let response_msg = T::make_goal_response_msg(false, time); // rejected, so accepted = false
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::from(&response_msg);
let ret = unsafe {
rcl_action_send_goal_response(
server.handle_mut(),
&mut self.request_id,
response_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(ret));
}
Ok(())
}
}
pub struct WrappedActionServer<T>
where
T: WrappedActionTypeSupport,
{
pub rcl_handle: rcl_action_server_t,
pub clock_handle: Box<rcl_clock_t>,
pub accept_goal_cb: Box<dyn FnMut(&uuid::Uuid, &T::Goal) -> bool>,
pub accept_cancel_cb: Box<dyn FnMut(&ServerGoal<T>) -> bool>,
pub goal_cb: Box<dyn FnMut(ServerGoal<T>)>,
pub goals: HashMap<uuid::Uuid, ServerGoal<T>>,
pub goal_request_sender: mpsc::Sender<GoalRequest<T>>,
pub cancel_senders: HashMap<uuid::Uuid, mpsc::Sender<CancelRequest>>,
pub active_cancel_requests: Vec<(
rmw_request_id_t,
action_msgs::srv::CancelGoal::Response,
JoinAll<oneshot::Receiver<(uuid::Uuid, bool)>>,
)>,
pub goals: HashMap<uuid::Uuid, *mut rcl_action_goal_handle_t>,
pub result_msgs: HashMap<uuid::Uuid, Box<dyn VoidPtr>>,
pub result_requests: HashMap<uuid::Uuid, Vec<rmw_request_id_t>>,
}
@@ -64,6 +195,157 @@ where
&self.rcl_handle
}
fn handle_mut(&mut self) -> &mut rcl_action_server_t {
&mut self.rcl_handle
}
fn is_cancelling(&self, uuid: &uuid::Uuid) -> Result<bool> {
if let Some(handle) = self.goals.get(uuid) {
let mut state = 0u8; // TODO: int8 STATUS_UNKNOWN = 0;
let ret = unsafe { rcl_action_goal_handle_get_status(*handle, &mut state) };
if ret != RCL_RET_OK as i32 {
println!("action server: Failed to get goal handle state: {}", ret);
return Err(Error::from_rcl_error(ret));
}
return Ok(state == 3u8); // TODO: int8 STATUS_CANCELING
}
Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID)
}
fn cancel_goal(&mut self, uuid: &uuid::Uuid) {
if let Some(handle) = self.goals.remove(uuid) {
let ret = unsafe {
rcl_action_update_goal_state(
handle,
rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL,
)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
}
}
fn set_goal_state(
&mut self,
uuid: &uuid::Uuid,
new_state: rcl_action_goal_event_t,
) -> Result<()> {
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(self.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to publish result without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
if let Some(handle) = self.goals.get(uuid) {
// todo: error handling
unsafe {
rcl_action_update_goal_state(*handle, new_state);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(self.handle());
}
// send out updated statuses
self.publish_status();
Ok(())
} else {
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
}
fn send_completed_cancel_requests(&mut self) {
let mut canceled = vec![];
let mut responses = vec![];
self.active_cancel_requests
.retain_mut(|(request_id, msg, fut)| {
println!("checking fut?");
let boxed = fut.boxed();
if let Some(results) = boxed.now_or_never() {
println!("cancel answers: {:?}", results);
let mut response_msg = msg.clone();
let requested_cancels = response_msg.goals_canceling.len();
for r in results {
match r {
Ok((uuid, do_cancel)) => {
// cancel goal and filter response msg.
if do_cancel {
canceled.push(uuid.clone());
}
response_msg.goals_canceling.retain(|goal_info| {
let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(
goal_info.goal_id.uuid.clone(),
));
do_cancel && msg_uuid == uuid
});
}
Err(oneshot::Canceled) => {
eprintln!("Warning, cancel request not handled!");
return false; // skip this request.
}
}
}
// check if all cancels were rejected.
if requested_cancels >= 1 && response_msg.goals_canceling.is_empty() {
response_msg.return_code = 1; // TODO: auto generate these (int8 ERROR_REJECTED=1)
}
responses.push((*request_id, response_msg));
false
} else {
true
}
});
canceled.iter().for_each(|uuid| self.cancel_goal(&uuid));
if !canceled.is_empty() {
// at least one goal state changed, publish a new status message
self.publish_status();
}
// send out responses
for (mut request_id, response_msg) in responses {
// send out response msg.
println!("sending out cancellation msg\n{:?}", response_msg);
let mut native_msg =
WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::from(&response_msg);
let ret = unsafe {
rcl_action_send_cancel_response(
&self.rcl_handle,
&mut request_id,
native_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could send cancel response. {}", ret);
}
}
}
fn handle_goal_request(&mut self, server: Arc<Mutex<dyn ActionServer_>>) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<
@@ -84,69 +366,22 @@ where
let msg = <<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request>::from_native(&request_msg);
let (uuid_msg, goal) = T::destructure_goal_request_msg(msg);
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid_msg.uuid.clone()));
let goal_accepted = (self.accept_goal_cb)(&uuid, &goal);
let time = builtin_interfaces::msg::Time::default();
let goal_info = action_msgs::msg::GoalInfo {
goal_id: uuid_msg,
stamp: time.clone(),
let (cancel_sender, cancel_receiver) = mpsc::channel::<CancelRequest>(10);
self.cancel_senders.insert(uuid.clone(), cancel_sender);
let gr: GoalRequest<T> = GoalRequest {
uuid,
goal,
cancel_requests: cancel_receiver,
server: Arc::downgrade(&server),
request_id: unsafe { request_id.assume_init() },
};
let native_goal_info = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
let goal_handle: Option<*mut rcl_action_goal_handle_t> = if goal_accepted {
unsafe {
Some(rcl_action_accept_new_goal(
&mut self.rcl_handle,
&*native_goal_info,
))
}
} else {
None
};
// send response
let response_msg = T::make_goal_response_msg(goal_accepted, time);
let mut response_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>::from(&response_msg);
let ret = unsafe {
let mut request_id = request_id.assume_init();
rcl_action_send_goal_response(
&self.rcl_handle,
&mut request_id,
response_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
println!("action server: failed to send goal response");
return;
}
// if we accepted the goal, update its state machine and publish all goal statuses
if let Some(goal_handle) = goal_handle {
unsafe {
rcl_action_update_goal_state(
goal_handle,
rcl_action_goal_event_t::GOAL_EVENT_EXECUTE,
);
}
self.publish_status();
// run the user supplied cb with newly created goal handle object
let g: ServerGoal<T> = ServerGoal {
uuid,
goal,
handle: Arc::new(Mutex::new(goal_handle)),
server: Arc::downgrade(&server),
};
self.goals.insert(uuid, g.clone());
// start goal callback
(self.goal_cb)(g);
// send out request.
match self.goal_request_sender.try_send(gr) {
Err(e) => eprintln!("warning: could not send service request ({})", e),
_ => (),
}
}
@@ -161,6 +396,8 @@ where
)
};
let request_id = unsafe { request_id.assume_init() };
if ret != RCL_RET_OK as i32 {
// this seems normal if client dies.
return;
@@ -176,52 +413,38 @@ where
return;
}
let mut response_msg =
let response_msg =
action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg);
// let user filter cancelled goals.
let requested_cancels = response_msg.goals_canceling.len();
response_msg.goals_canceling.retain(|goal_info| {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
if let Some(goal) = self.goals.get(&uuid) {
(self.accept_cancel_cb)(goal)
} else {
true
}
});
let return_channels = response_msg
.goals_canceling
.iter()
.flat_map(|goal_info| {
let uuid =
uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
self.cancel_senders
.get_mut(&uuid)
.and_then(|cancel_sender| {
let (s, r) = oneshot::channel::<(uuid::Uuid, bool)>();
let cr = CancelRequest {
uuid: uuid.clone(),
response_sender: s,
};
match cancel_sender.try_send(cr) {
Err(_) => {
eprintln!("warning: could not send goal cancellation request");
None
}
_ => Some(r),
}
})
})
.collect::<Vec<_>>();
response_msg.goals_canceling.iter().for_each(|goal_info| {
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone()));
if let Some(goal) = self.goals.get_mut(&uuid) {
goal.set_cancel();
}
});
// check if all cancels were rejected.
if requested_cancels >= 1 && response_msg.goals_canceling.is_empty() {
response_msg.return_code = 1; // TODO: auto generate these (int8 ERROR_REJECTED=1)
}
if !response_msg.goals_canceling.is_empty() {
// at least one goal state changed, publish a new status message
self.publish_status();
}
let mut native_msg =
WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::from(&response_msg);
let ret = unsafe {
let mut request_id = request_id.assume_init();
rcl_action_send_cancel_response(
&self.rcl_handle,
&mut request_id,
native_msg.void_ptr_mut(),
)
};
if ret != RCL_RET_OK as i32 {
println!("action server: could send cancel response. {}", ret);
return;
}
// because we want to reply to the caller when all goals have been either accepted or rejected,
// join the channels into one future that we can poll during spin.
self.active_cancel_requests
.push((request_id, response_msg, join_all(return_channels)));
}
fn handle_goal_expired(&mut self) {
@@ -239,7 +462,8 @@ where
let gi = action_msgs::msg::GoalInfo::from_native(&goal_info);
let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone()));
println!("goal expired: {} - {}", uuid, num_expired);
self.goals.remove(&uuid);
// todo
// self.goals.remove(&uuid);
self.result_msgs.remove(&uuid);
self.result_requests.remove(&uuid);
}
@@ -271,6 +495,14 @@ where
}
}
fn add_goal_handle(
&mut self,
uuid: uuid::Uuid,
goal_handle: *mut rcl_action_goal_handle_t,
) -> () {
self.goals.insert(uuid, goal_handle);
}
// bit of a hack...
fn add_result(&mut self, uuid: uuid::Uuid, mut msg: Box<dyn VoidPtr>) -> () {
// if there are already requests for this goal, send the result immediately.
@@ -374,7 +606,6 @@ where
{
pub uuid: uuid::Uuid,
pub goal: T::Goal,
handle: Arc<Mutex<*mut rcl_action_goal_handle_t>>,
server: Weak<Mutex<dyn ActionServer_>>,
}
@@ -384,17 +615,14 @@ impl<T: 'static> ServerGoal<T>
where
T: WrappedActionTypeSupport,
{
pub fn is_cancelling(&self) -> bool {
let mut state = 0u8; // TODO: int8 STATUS_UNKNOWN = 0;
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_goal_handle_get_status(*handle, &mut state)
};
pub fn is_cancelling(&self) -> Result<bool> {
let action_server = self
.server
.upgrade()
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
if ret != RCL_RET_OK as i32 {
println!("action server: Failed to get goal handle state: {}", ret);
}
return state == 3u8; // TODO: int8 STATUS_CANCELING
let action_server = action_server.lock().unwrap();
action_server.is_cancelling(&self.uuid)
}
pub fn publish_feedback(&self, msg: T::Feedback) -> Result<()>
@@ -419,26 +647,10 @@ where
)
};
if ret == RCL_RET_OK as i32 {
Ok(())
} else {
eprintln!("coult not publish {}", Error::from_rcl_error(ret));
Ok(()) // todo: error codes
}
}
fn set_cancel(&mut self) {
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
eprintln!("coult not publish {}", Error::from_rcl_error(ret));
}
Ok(()) // todo: error codes
}
pub fn cancel(&mut self, msg: T::Result) -> Result<()> {
@@ -486,19 +698,6 @@ where
}
pub fn abort(&mut self, msg: T::Result) -> Result<()> {
// todo: error handling
let ret = unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_ABORT)
};
if ret != RCL_RET_OK as i32 {
println!(
"action server: could not cancel goal: {}",
Error::from_rcl_error(ret)
);
}
// upgrade to actual ref. if still alive
let action_server = self
.server
@@ -506,31 +705,7 @@ where
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to abort without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(action_server.handle());
}
// send out updated statuses
action_server.publish_status();
action_server.set_goal_state(&self.uuid, rcl_action_goal_event_t::GOAL_EVENT_ABORT)?;
// create result message
let result_msg = T::make_result_response_msg(6, msg); // todo: int8 STATUS_ABORTED = 6
@@ -553,37 +728,7 @@ where
.ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut action_server = action_server.lock().unwrap();
// todo: check that the goal exists
let goal_info = action_msgs::msg::GoalInfo {
goal_id: unique_identifier_msgs::msg::UUID {
uuid: self.uuid.as_bytes().to_vec(),
},
..action_msgs::msg::GoalInfo::default()
};
let goal_info_native = WrappedNativeMsg::<action_msgs::msg::GoalInfo>::from(&goal_info);
// does this goal exist?
let goal_exists =
unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) };
if !goal_exists {
println!("tried to publish result without a goal");
return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID);
}
// todo: error handling
unsafe {
let handle = self.handle.lock().unwrap();
rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_SUCCEED);
}
// todo: error handling
unsafe {
rcl_action_notify_goal_done(action_server.handle());
}
// send out updated statuses
action_server.publish_status();
action_server.set_goal_state(&self.uuid, rcl_action_goal_event_t::GOAL_EVENT_SUCCEED)?;
// create result message
let result_msg = T::make_result_response_msg(4, msg); // todo: int8 STATUS_SUCCEEDED = 4
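
A note on the cancel bookkeeping introduced above: all outstanding per-goal accept/reject answers for one cancel request are joined into a single JoinAll, which spin polls non-blockingly with now_or_never until every answer has arrived. The mechanism in isolation (futures crate only; illustrative):

use futures::channel::oneshot;
use futures::future::{join_all, FutureExt};

fn main() {
    let (tx1, rx1) = oneshot::channel::<bool>();
    let (tx2, rx2) = oneshot::channel::<bool>();
    let mut all = join_all(vec![rx1, rx2]);

    // no answers yet: polling returns None and we keep the future for the next spin
    assert!((&mut all).now_or_never().is_none());

    tx1.send(true).unwrap(); // goal 1 accepts the cancel
    tx2.send(false).unwrap(); // goal 2 rejects it
    // every answer is in: one Vec with all the decisions
    let answers: Vec<bool> = (&mut all)
        .now_or_never()
        .expect("all channels answered")
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    assert_eq!(answers, vec![true, false]);
}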

View File

@@ -52,7 +52,7 @@ use action_clients::*;
mod action_servers;
use action_servers::*;
pub use action_servers::ServerGoal;
pub use action_servers::{ActionServer, GoalRequest, ServerGoal, CancelRequest};
mod context;
pub use context::Context;

View File

@@ -7,7 +7,7 @@ pub struct Node {
// the node owns the subscribers
subs: Vec<Box<dyn Subscriber_>>,
// services,
services: Vec<Box<dyn Service_>>,
services: Vec<Arc<Mutex<dyn Service_>>>,
// service clients
clients: Vec<Arc<Mutex<dyn Client_>>>,
// action clients
@@ -226,7 +226,7 @@ impl Node {
sender,
};
self.services.push(Box::new(ws));
self.services.push(Arc::new(Mutex::new(ws)));
Ok(receiver)
}
@@ -302,10 +302,7 @@ impl Node {
pub fn create_action_server<T: 'static>(
&mut self,
action_name: &str,
accept_goal_cb: Box<dyn FnMut(&uuid::Uuid, &T::Goal) -> bool>,
accept_cancel_cb: Box<dyn FnMut(&ServerGoal<T>) -> bool>,
goal_cb: Box<dyn FnMut(ServerGoal<T>)>,
) -> Result<ActionServer<T>>
) -> Result<impl Stream<Item = GoalRequest<T>> + Unpin>
where
T: WrappedActionTypeSupport,
{
@@ -323,23 +320,24 @@ impl Node {
}
let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() });
let (goal_request_sender, goal_request_receiver) = mpsc::channel::<GoalRequest<T>>(10);
let server_handle =
create_action_server_helper(self.node_handle.as_mut(), action_name, clock_handle.as_mut(), T::get_ts())?;
let server = WrappedActionServer::<T> {
rcl_handle: server_handle,
clock_handle,
accept_goal_cb,
accept_cancel_cb,
goal_cb,
goal_request_sender,
active_cancel_requests: Vec::new(),
cancel_senders: HashMap::new(),
goals: HashMap::new(),
result_msgs: HashMap::new(),
result_requests: HashMap::new(),
};
let server_arc = Arc::new(Mutex::new(server));
let s = make_action_server(Arc::downgrade(&server_arc));
self.action_servers.push(server_arc);
Ok(s)
Ok(goal_request_receiver)
}
pub fn create_publisher<T>(&mut self, topic: &str) -> Result<Publisher<T>>
@@ -367,9 +365,9 @@ impl Node {
}
pub fn spin_once(&mut self, timeout: Duration) {
// first handle any completed service responses
for s in &mut self.services {
s.send_completed_responses();
// first handle any completed action cancellation responses
for a in &mut self.action_servers {
a.lock().unwrap().send_completed_cancel_requests();
}
let timeout = timeout.as_nanos() as i64;
@@ -482,7 +480,7 @@ impl Node {
for s in &self.services {
unsafe {
rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut());
rcl_wait_set_add_service(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
}
}
@@ -572,7 +570,8 @@ impl Node {
let ws_services = unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) };
for (s, ws_s) in self.services.iter_mut().zip(ws_services) {
if ws_s != &std::ptr::null() {
s.handle_request();
let mut service = s.lock().unwrap();
service.handle_request(s.clone());
}
}
@@ -815,7 +814,7 @@ impl Drop for Node {
s.destroy(&mut self.node_handle);
}
for s in &mut self.services {
s.destroy(&mut self.node_handle);
s.lock().unwrap().destroy(&mut self.node_handle);
}
for t in &mut self.timers {
// TODO: check return values

View File

@@ -9,29 +9,26 @@ where
{
pub message: T::Request,
request_id: rmw_request_id_t,
response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>,
service: Weak<Mutex<dyn Service_>>,
}
impl<T> ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
T: 'static + WrappedServiceTypeSupport,
{
/// Complete the service request, consuming the request in the process.
/// The reply is sent directly via the underlying service handle.
pub fn respond(self, msg: T::Response) {
match self.response_sender.send((self.request_id, msg)) {
Err(_) => {
println!("service response receiver dropped");
}
_ => {}
}
pub fn respond(self, msg: T::Response) -> Result<()> {
let service = self.service.upgrade().ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
let mut service = service.lock().unwrap();
let native_msg = WrappedNativeMsg::<T::Response>::from(&msg);
service.send_response(self.request_id, Box::new(native_msg))
}
}
pub trait Service_ {
fn handle(&self) -> &rcl_service_t;
fn send_completed_responses(&mut self) -> ();
fn handle_request(&mut self) -> ();
fn send_response(&mut self, request_id: rmw_request_id_t, msg: Box<dyn VoidPtr>) -> Result<()>;
fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
@@ -52,37 +49,22 @@ where
&self.rcl_handle
}
fn send_completed_responses(&mut self) -> () {
let mut to_send = vec![];
self.outstanding_requests.retain_mut(|r| {
match r.try_recv() {
Ok(Some(resp)) => {
to_send.push(resp);
false // done with this.
}
Ok(None) => true, // keep message, waiting for service
Err(_) => false, // channel canceled
}
});
for (mut req_id, msg) in to_send {
let mut native_response = WrappedNativeMsg::<T::Response>::from(&msg);
let res = unsafe {
rcl_send_response(
&self.rcl_handle,
&mut req_id,
native_response.void_ptr_mut(),
)
};
// TODO
if res != RCL_RET_OK as i32 {
eprintln!("could not send service response {}", res);
}
fn send_response(&mut self, mut request_id: rmw_request_id_t, mut msg: Box<dyn VoidPtr>) -> Result<()> {
let res = unsafe {
rcl_send_response(
&self.rcl_handle,
&mut request_id,
msg.void_ptr_mut(),
)
};
if res == RCL_RET_OK as i32 {
Ok(())
} else {
Err(Error::from_rcl_error(res))
}
}
fn handle_request(&mut self) -> () {
fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> () {
let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
let mut request_msg = WrappedNativeMsg::<T::Request>::new();
@@ -96,13 +78,10 @@ where
if ret == RCL_RET_OK as i32 {
let request_id = unsafe { request_id.assume_init() };
let request_msg = T::Request::from_native(&request_msg);
let (response_sender, response_receiver) =
oneshot::channel::<(rmw_request_id_t, T::Response)>();
self.outstanding_requests.push(response_receiver);
let request = ServiceRequest::<T> {
message: request_msg,
request_id,
response_sender,
service: Arc::downgrade(&service)
};
match self.sender.try_send(request) {
Err(e) => eprintln!("warning: could not send service request ({})", e),