more tidying

This commit is contained in:
Martin Dahl 2021-08-14 09:51:41 +02:00
parent ff0f0dd5e1
commit be567a512c
4 changed files with 294 additions and 279 deletions

View File

@ -1,5 +1,185 @@
use super::*;
unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
#[derive(Clone)]
pub struct ActionClient<T>
where
T: WrappedActionTypeSupport,
{
client: Weak<Mutex<WrappedActionClient<T>>>,
}
#[derive(Clone)]
pub struct ClientGoal<T>
where
T: WrappedActionTypeSupport,
{
client: Weak<Mutex<WrappedActionClient<T>>>,
pub uuid: uuid::Uuid,
feedback: Arc<Mutex<Option<mpsc::Receiver<T::Feedback>>>>,
result: Arc<Mutex<Option<oneshot::Receiver<T::Result>>>>,
}
impl<T: 'static> ClientGoal<T>
where
T: WrappedActionTypeSupport,
{
pub fn get_status(&self) -> Result<GoalStatus> {
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let client = client.lock().unwrap();
Ok(client.get_goal_status(&self.uuid))
}
pub fn get_result(&mut self) -> Result<impl Future<Output = Result<T::Result>>> {
if let Some(result_channel) = self.result.lock().unwrap().take() {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_result_request(self.uuid);
Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID))
} else {
// todo: error codes...
println!("already asked for the result!");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn get_feedback(&self) -> Result<impl Stream<Item = T::Feedback> + Unpin> {
if let Some(feedback_channel) = self.feedback.lock().unwrap().take() {
Ok(feedback_channel)
} else {
// todo: error codes...
println!("someone else owns the feedback consumer stream");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_cancel_request(&self.uuid)
}
}
impl<T: 'static> ActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn send_goal_request(
&self,
goal: T::Goal,
) -> Result<impl Future<Output = Result<ClientGoal<T>>>>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
let uuid = uuid::Uuid::new_v4();
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
};
let request_msg = T::make_goal_request_msg(uuid_msg, goal);
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
>::from(&request_msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
// set up channels
let (goal_req_sender, goal_req_receiver) = oneshot::channel::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(1);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) = oneshot::channel::<T::Result>();
client.result_senders.push((uuid, result_sender));
if result == RCL_RET_OK as i32 {
client
.goal_response_channels
.push((seq_no, goal_req_sender));
// instead of "canceled" we return invalid client.
let fut_client = Weak::clone(&self.client);
let future = goal_req_receiver
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
.map(move |r| match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
if accepted {
Ok(ClientGoal {
client: fut_client,
uuid,
feedback: Arc::new(Mutex::new(Some(feedback_receiver))),
result: Arc::new(Mutex::new(Some(result_receiver))),
})
} else {
println!("goal rejected");
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
}
}
Err(e) => Err(e),
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
pub fn make_action_client<T>(client: Weak<Mutex<WrappedActionClient<T>>>) -> ActionClient<T>
where
T: WrappedActionTypeSupport,
{
ActionClient {
client
}
}
pub fn action_server_available<T>(node: &rcl_node_t, client: &ActionClient<T>) -> Result<bool>
where
T: 'static + WrappedActionTypeSupport,
{
let client = client
.client
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
let mut avail = false;
let result = unsafe {
rcl_action_server_is_available(node, client.handle(), &mut avail)
};
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum GoalStatus {
Unknown,
@ -342,3 +522,29 @@ where
}
}
}
pub fn create_action_client_helper(
node: &mut rcl_node_t,
action_name: &str,
action_ts: *const rosidl_action_type_support_t,
) -> Result<rcl_action_client_t> {
let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() };
let action_name_c_string =
CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let client_options = rcl_action_client_get_default_options();
rcl_action_client_init(
&mut client_handle,
node,
action_ts,
action_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
} else {
Err(Error::from_rcl_error(result))
}
}

View File

@ -564,3 +564,32 @@ where
Ok(())
}
}
pub fn create_action_server_helper(
node: &mut rcl_node_t,
action_name: &str,
clock_handle: *mut rcl_clock_t,
action_ts: *const rosidl_action_type_support_t,
) -> Result<rcl_action_server_t> {
let mut server_handle = unsafe { rcl_action_get_zero_initialized_server() };
let action_name_c_string =
CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let server_options = rcl_action_server_get_default_options();
rcl_action_server_init(
&mut server_handle,
node,
clock_handle,
action_ts,
action_name_c_string.as_ptr(),
&server_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(server_handle)
} else {
Err(Error::from_rcl_error(result))
}
}

View File

@ -52,7 +52,6 @@ use action_clients::*;
mod action_servers;
use action_servers::*;
pub use action_servers::ServerGoal;
mod context;
@ -65,55 +64,6 @@ pub use parameters::ParameterValue;
mod clocks;
pub use clocks::{Clock, ClockType};
/// Encapsulates a service request. In contrast to having a simply callback from
/// Request -> Response types that is called synchronously, the service request
/// can be moved around and completed asynchronously.
pub struct ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
pub message: T::Request,
request_id: rmw_request_id_t,
response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>,
}
impl<T> ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
/// Complete the service request, consuming the request in the process.
/// The reply is sent back on the next "ros spin".
pub fn respond(self, msg: T::Response) {
match self.response_sender.send((self.request_id, msg)) {
Err(_) => {
println!("service response receiver dropped");
}
_ => {}
}
}
}
unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
#[derive(Clone)]
pub struct ActionClient<T>
where
T: WrappedActionTypeSupport,
{
client: Weak<Mutex<WrappedActionClient<T>>>,
}
#[derive(Clone)]
pub struct ClientGoal<T>
where
T: WrappedActionTypeSupport,
{
client: Weak<Mutex<WrappedActionClient<T>>>,
pub uuid: uuid::Uuid,
feedback: Arc<Mutex<Option<mpsc::Receiver<T::Feedback>>>>,
result: Arc<Mutex<Option<oneshot::Receiver<T::Result>>>>,
}
#[derive(Clone)]
pub struct ActionServer<T>
where
@ -348,32 +298,6 @@ impl Node {
Ok(receiver)
}
fn create_service_helper(
&mut self,
service_name: &str,
service_ts: *const rosidl_service_type_support_t,
) -> Result<rcl_service_t> {
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let service_options = rcl_service_get_default_options();
rcl_service_init(
&mut service_handle,
self.node_handle.as_mut(),
service_ts,
service_name_c_string.as_ptr(),
&service_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(service_handle)
} else {
Err(Error::from_rcl_error(result))
}
}
pub fn create_service<T: 'static>(
&mut self,
service_name: &str,
@ -381,7 +305,7 @@ impl Node {
where
T: WrappedServiceTypeSupport,
{
let service_handle = self.create_service_helper(service_name, T::get_ts())?;
let service_handle = create_service_helper(self.node_handle.as_mut(), service_name, T::get_ts())?;
let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
let ws = TypedService::<T> {
@ -435,37 +359,11 @@ impl Node {
service_available_untyped(self.node_handle.as_mut(), client)
}
fn create_action_client_helper(
&mut self,
action_name: &str,
action_ts: *const rosidl_action_type_support_t,
) -> Result<rcl_action_client_t> {
let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() };
let action_name_c_string =
CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let client_options = rcl_action_client_get_default_options();
rcl_action_client_init(
&mut client_handle,
self.node_handle.as_mut(),
action_ts,
action_name_c_string.as_ptr(),
&client_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
} else {
Err(Error::from_rcl_error(result))
}
}
pub fn create_action_client<T: 'static>(&mut self, action_name: &str) -> Result<ActionClient<T>>
where
T: WrappedActionTypeSupport,
{
let client_handle = self.create_action_client_helper(action_name, T::get_ts())?;
let client_handle = create_action_client_helper(self.node_handle.as_mut(), action_name, T::get_ts())?;
let client = WrappedActionClient::<T> {
rcl_handle: client_handle,
goal_response_channels: Vec::new(),
@ -478,7 +376,7 @@ impl Node {
let client_arc = Arc::new(Mutex::new(client));
self.action_clients.push(client_arc.clone());
let c = ActionClient { client: Arc::downgrade(&client_arc) };
let c = make_action_client(Arc::downgrade(&client_arc));
Ok(c)
}
@ -486,51 +384,7 @@ impl Node {
&self,
client: &ActionClient<T>,
) -> Result<bool> {
let client = client
.client
.upgrade()
.ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
let mut avail = false;
let result = unsafe {
rcl_action_server_is_available(self.node_handle.as_ref(), client.handle(), &mut avail)
};
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
fn create_action_server_helper(
&mut self,
action_name: &str,
clock_handle: *mut rcl_clock_t,
action_ts: *const rosidl_action_type_support_t,
) -> Result<rcl_action_server_t> {
let mut server_handle = unsafe { rcl_action_get_zero_initialized_server() };
let action_name_c_string =
CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let server_options = rcl_action_server_get_default_options();
rcl_action_server_init(
&mut server_handle,
self.node_handle.as_mut(),
clock_handle,
action_ts,
action_name_c_string.as_ptr(),
&server_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(server_handle)
} else {
Err(Error::from_rcl_error(result))
}
action_server_available(self.node_handle.as_ref(), client)
}
pub fn create_action_server<T: 'static>(
@ -558,7 +412,7 @@ impl Node {
let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() });
let server_handle =
self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?;
create_action_server_helper(self.node_handle.as_mut(), action_name, clock_handle.as_mut(), T::get_ts())?;
let server = WrappedActionServer::<T> {
rcl_handle: server_handle,
clock_handle,
@ -1124,134 +978,6 @@ impl Drop for Node {
}
}
impl<T: 'static> ClientGoal<T>
where
T: WrappedActionTypeSupport,
{
pub fn get_status(&self) -> Result<GoalStatus> {
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let client = client.lock().unwrap();
Ok(client.get_goal_status(&self.uuid))
}
pub fn get_result(&mut self) -> Result<impl Future<Output = Result<T::Result>>> {
if let Some(result_channel) = self.result.lock().unwrap().take() {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_result_request(self.uuid);
Ok(result_channel.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID))
} else {
// todo: error codes...
println!("already asked for the result!");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn get_feedback(&self) -> Result<impl Stream<Item = T::Feedback> + Unpin> {
if let Some(feedback_channel) = self.feedback.lock().unwrap().take() {
Ok(feedback_channel)
} else {
// todo: error codes...
println!("someone else owns the feedback consumer stream");
Err(Error::RCL_RET_ACTION_CLIENT_INVALID)
}
}
pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
client.send_cancel_request(&self.uuid)
}
}
impl<T: 'static> ActionClient<T>
where
T: WrappedActionTypeSupport,
{
pub fn send_goal_request(
&self,
goal: T::Goal,
) -> Result<impl Future<Output = Result<ClientGoal<T>>>>
where
T: WrappedActionTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self
.client
.upgrade()
.ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
let uuid = uuid::Uuid::new_v4();
let uuid_msg = unique_identifier_msgs::msg::UUID {
uuid: uuid.as_bytes().to_vec(),
};
let request_msg = T::make_goal_request_msg(uuid_msg, goal);
let native_msg = WrappedNativeMsg::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
>::from(&request_msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
};
// set up channels
let (goal_req_sender, goal_req_receiver) = oneshot::channel::<
<<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
>();
let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(1);
client.feedback_senders.push((uuid, feedback_sender));
let (result_sender, result_receiver) = oneshot::channel::<T::Result>();
client.result_senders.push((uuid, result_sender));
if result == RCL_RET_OK as i32 {
client
.goal_response_channels
.push((seq_no, goal_req_sender));
// instead of "canceled" we return invalid client.
let fut_client = Weak::clone(&self.client);
let future = goal_req_receiver
.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
.map(move |r| match r {
Ok(resp) => {
let (accepted, _stamp) = T::destructure_goal_response_msg(resp);
if accepted {
Ok(ClientGoal {
client: fut_client,
uuid,
feedback: Arc::new(Mutex::new(Some(feedback_receiver))),
result: Arc::new(Mutex::new(Some(result_receiver))),
})
} else {
println!("goal rejected");
Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
}
}
Err(e) => Err(e),
});
Ok(future)
} else {
eprintln!("coult not send goal request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,5 +1,33 @@
use super::*;
/// Encapsulates a service request. In contrast to having a simply callback from
/// Request -> Response types that is called synchronously, the service request
/// can be moved around and completed asynchronously.
pub struct ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
pub message: T::Request,
request_id: rmw_request_id_t,
response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>,
}
impl<T> ServiceRequest<T>
where
T: WrappedServiceTypeSupport,
{
/// Complete the service request, consuming the request in the process.
/// The reply is sent back on the next "ros spin".
pub fn respond(self, msg: T::Response) {
match self.response_sender.send((self.request_id, msg)) {
Err(_) => {
println!("service response receiver dropped");
}
_ => {}
}
}
}
pub trait Service_ {
fn handle(&self) -> &rcl_service_t;
fn send_completed_responses(&mut self) -> ();
@ -89,3 +117,29 @@ where
}
}
}
pub fn create_service_helper(
node: &mut rcl_node_t,
service_name: &str,
service_ts: *const rosidl_service_type_support_t,
) -> Result<rcl_service_t> {
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
let service_name_c_string =
CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let service_options = rcl_service_get_default_options();
rcl_service_init(
&mut service_handle,
node,
service_ts,
service_name_c_string.as_ptr(),
&service_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(service_handle)
} else {
Err(Error::from_rcl_error(result))
}
}