diff --git a/build.rs b/build.rs index 1eac9b1..8316eb8 100644 --- a/build.rs +++ b/build.rs @@ -118,13 +118,17 @@ fn main() { } let untyped_helper = msg_gen::generate_untyped_helper(&msg_list); + let untyped_service_helper = msg_gen::generate_untyped_service_helper(&msg_list); let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); let msgs_fn = out_path.join("_r2r_generated_msgs.rs"); let untyped_fn = out_path.join("_r2r_generated_untyped_helper.rs"); + let untyped_service_fn = out_path.join("_r2r_generated_service_helper.rs"); let mut f = File::create(msgs_fn).unwrap(); write!(f, "{}", modules).unwrap(); let mut f = File::create(untyped_fn).unwrap(); write!(f, "{}", untyped_helper).unwrap(); + let mut f = File::create(untyped_service_fn).unwrap(); + write!(f, "{}", untyped_service_helper).unwrap(); } diff --git a/examples/action_server.rs b/examples/action_server.rs index cdcbd49..7ae5555 100644 --- a/examples/action_server.rs +++ b/examples/action_server.rs @@ -93,14 +93,12 @@ fn main() -> Result<(), Box> { Box::new(accept_cancel_cb), Box::new(handle_goal_cb), )?; - loop { + + while !*done.lock().unwrap() { node.lock() .unwrap() .spin_once(std::time::Duration::from_millis(100)); pool.run_until_stalled(); - if *done.lock().unwrap() { - break; - } } Ok(()) diff --git a/examples/publishers.rs b/examples/publishers.rs index 0782130..338600f 100644 --- a/examples/publishers.rs +++ b/examples/publishers.rs @@ -22,7 +22,7 @@ fn main() -> Result<(), Box> { }, ..Default::default() }; - let mut native = r2r::WrappedNativeMsg::::new(); + let mut native = r2r::NativeMsg::::new(); native.data = count; publisher.publish(&to_send).unwrap(); diff --git a/examples/subscriber.rs b/examples/subscriber.rs index c096295..fbd9a87 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -33,10 +33,10 @@ fn main() -> Result<(), Box> { })?; // for sub2 we just print the data - let sub2 = node.subscribe::("/topic2")?; + let sub2 = node.subscribe_untyped("/topic2", "std_msgs/msg/String")?; spawner.spawn_local(async move { sub2.for_each(|msg| { - println!("topic2: new msg: {}", msg.data); + println!("topic2: new msg: {}", msg.expect("deserialization error")); future::ready(()) }) .await diff --git a/examples/tokio.rs b/examples/tokio.rs index 4024fe8..17f5a4b 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -17,6 +17,15 @@ async fn main() -> Result<(), Box> { let p = node.create_publisher::("/topic2")?; let state = Arc::new(Mutex::new(SharedState::default())); + use r2r::example_interfaces::srv::AddTwoInts; + let client = node.create_client::("/add_two_ints")?; + + std::thread::spawn(move || { + let req = AddTwoInts::Request { a: 10, b: 20 }; + print!("{} + {} = ", req.a, req.b); + let resp = client.request(&req).expect(""); + }); + // task that every other time forwards message to topic2 let state_t1 = state.clone(); task::spawn(async move { diff --git a/examples/untyped_client.rs b/examples/untyped_client.rs new file mode 100644 index 0000000..99c4433 --- /dev/null +++ b/examples/untyped_client.rs @@ -0,0 +1,48 @@ +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; +use r2r; + +async fn requester_task(c: r2r::UntypedClient) -> Result<(), Box> { + let mut x: i64 = 0; + loop { + let json = format!("{{ \"a\": {}, \"b\": {} }}", 10 * x, x); + let req = serde_json::from_str(&json).unwrap(); + let resp = c.request(req)?.await?; + println!("{}", resp.expect("deserialization error")); + + x += 1; + if x == 10 { + break; + } + } + Ok(()) +} + +fn main() -> Result<(), Box> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let client = node.create_client_untyped("/add_two_ints", "example_interfaces/srv/AddTwoInts")?; + + // wait for service to be available + println!("waiting for service..."); + while !node.service_available_untyped(&client)? { + std::thread::sleep(std::time::Duration::from_millis(1000)); + } + + println!("service available."); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + + spawner.spawn_local(async move { + match requester_task(client).await { + Ok(()) => println!("done."), + Err(e) => println!("error: {}", e), + } + })?; + + loop { + node.spin_once(std::time::Duration::from_millis(100)); + pool.run_until_stalled(); + } +} diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index 99b05a2..961c9b6 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -486,7 +486,7 @@ pub fn generate_untyped_helper(msgs: &Vec) -> String { let open = String::from( " impl WrappedNativeMsgUntyped { - fn new_from(typename: &str) -> Result { + pub fn new_from(typename: &str) -> Result { ", ); let close = String::from( @@ -523,3 +523,44 @@ impl WrappedNativeMsgUntyped { format!("{}{}{}", open, lines, close) } + +pub fn generate_untyped_service_helper(msgs: &Vec) -> String { + let open = String::from( + " +impl UntypedServiceSupport { + pub fn new_from(typename: &str) -> Result { +", + ); + let close = String::from( + " + else + { + return Err(Error::InvalidMessageType{ msgtype: typename.into() }) + } + } +} +", + ); + + let mut lines = String::new(); + for msg in msgs { + if msg.prefix != "srv" { + continue; + } + + let typename = format!("{}/{}/{}", msg.module, msg.prefix, msg.name); + let rustname = format!("{}::{}::{}::Service", msg.module, msg.prefix, msg.name); + + lines.push_str(&format!( + " + if typename == \"{typename}\" {{ + return Ok(UntypedServiceSupport::new::<{rustname}>()); + }} +", + typename = typename, + rustname = rustname + )); + } + + format!("{}{}{}", open, lines, close) +} diff --git a/src/action_clients.rs b/src/action_clients.rs new file mode 100644 index 0000000..1253429 --- /dev/null +++ b/src/action_clients.rs @@ -0,0 +1,344 @@ +use super::*; + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum GoalStatus { + Unknown, + Accepted, + Executing, + Canceling, + Succeeded, + Canceled, + Aborted, +} + +impl GoalStatus { + #[allow(dead_code)] + pub fn to_rcl(&self) -> i8 { + match self { + GoalStatus::Unknown => 0, + GoalStatus::Accepted => 1, + GoalStatus::Executing => 2, + GoalStatus::Canceling => 3, + GoalStatus::Succeeded => 4, + GoalStatus::Canceled => 5, + GoalStatus::Aborted => 6, + } + } + + pub fn from_rcl(s: i8) -> Self { + match s { + 0 => GoalStatus::Unknown, + 1 => GoalStatus::Accepted, + 2 => GoalStatus::Executing, + 3 => GoalStatus::Canceling, + 4 => GoalStatus::Succeeded, + 5 => GoalStatus::Canceled, + 6 => GoalStatus::Aborted, + _ => panic!("unknown action status: {}", s), + } + } +} + +pub struct WrappedActionClient +where + T: WrappedActionTypeSupport, +{ + pub rcl_handle: rcl_action_client_t, + pub goal_response_channels: Vec<( + i64, + oneshot::Sender< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >, + )>, + pub cancel_response_channels: Vec<(i64, oneshot::Sender)>, + pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, + pub result_requests: Vec<(i64, uuid::Uuid)>, + pub result_senders: Vec<(uuid::Uuid, oneshot::Sender)>, + pub goal_status: HashMap, +} + +pub trait ActionClient_ { + fn handle(&self) -> &rcl_action_client_t; + fn destroy(&mut self, node: &mut rcl_node_t) -> (); + + fn handle_goal_response(&mut self) -> (); + fn handle_cancel_response(&mut self) -> (); + fn handle_feedback_msg(&mut self) -> (); + fn handle_status_msg(&mut self) -> (); + fn handle_result_response(&mut self) -> (); + + fn send_result_request(&mut self, uuid: uuid::Uuid) -> (); +} + +use std::convert::TryInto; +pub fn vec_to_uuid_bytes(v: Vec) -> [T; 16] { + v.try_into().unwrap_or_else(|v: Vec| { + panic!("Expected a Vec of length {} but it was {}", 16, v.len()) + }) +} + +impl WrappedActionClient +where + T: WrappedActionTypeSupport, +{ + pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus { + *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown) + } + + pub fn send_cancel_request(&mut self, goal: &uuid::Uuid) -> Result>> + where + T: WrappedActionTypeSupport, + { + let msg = action_msgs::srv::CancelGoal::Request { + goal_info: action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { + uuid: goal.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }, + }; + let native_msg = WrappedNativeMsg::::from(&msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + if result == RCL_RET_OK as i32 { + let (cancel_req_sender, cancel_req_receiver) = + oneshot::channel::(); + + self.cancel_response_channels + .push((seq_no, cancel_req_sender)); + // instead of "canceled" we return invalid client. + let future = cancel_req_receiver + .map_err(|_| Error::RCL_RET_CLIENT_INVALID) + .map(|r| match r { + Ok(r) => match r.return_code { + 0 => Ok(()), + 1 => Err(Error::GoalCancelRejected), + 2 => Err(Error::GoalCancelUnknownGoalID), + 3 => Err(Error::GoalCancelAlreadyTerminated), + x => panic!("unknown error code return from action server: {}", x), + }, + Err(e) => Err(e), + }); + Ok(future) + } else { + eprintln!("coult not send goal request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +impl ActionClient_ for WrappedActionClient +where + T: WrappedActionTypeSupport, +{ + fn handle(&self) -> &rcl_action_client_t { + &self.rcl_handle + } + + fn handle_goal_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >::new(); + + let ret = unsafe { + rcl_action_take_goal_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .goal_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.goal_response_channels.swap_remove(idx); + let response = <::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); + match sender.send(response) { + Ok(()) => {} + Err(e) => { + println!("error sending to action client: {:?}", e); + } + } + } else { + let we_have: String = self + .goal_response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn handle_cancel_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::::new(); + + let ret = unsafe { + rcl_action_take_cancel_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .cancel_response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.cancel_response_channels.swap_remove(idx); + let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg); + match sender.send(response) { + Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e), + _ => (), + } + } else { + let we_have: String = self + .goal_response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn handle_feedback_msg(&mut self) -> () { + let mut feedback_msg = WrappedNativeMsg::::new(); + let ret = + unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) }; + if ret == RCL_RET_OK as i32 { + let msg = T::FeedbackMessage::from_native(&feedback_msg); + let (uuid, feedback) = T::destructure_feedback_msg(msg); + let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); + if let Some((_, sender)) = self + .feedback_senders + .iter_mut() + .find(|(uuid, _)| uuid == &msg_uuid) + { + match sender.try_send(feedback) { + Err(e) => eprintln!("warning: could not send feedback msg ({})", e), + _ => (), + } + } + } + } + + fn handle_status_msg(&mut self) -> () { + let mut status_array = WrappedNativeMsg::::new(); + let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) }; + if ret == RCL_RET_OK as i32 { + let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); + for a in &arr.status_list { + let uuid = + uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); + if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { + continue; + } + let status = GoalStatus::from_rcl(a.status); + *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status; + } + } + } + + fn handle_result_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::new(); + + let ret = unsafe { + rcl_action_take_result_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .result_requests + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, uuid) = self.result_requests.swap_remove(idx); + if let Some(idx) = self + .result_senders + .iter() + .position(|(suuid, _)| suuid == &uuid) + { + let (_, sender) = self.result_senders.swap_remove(idx); + let response = <::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); + let (status, result) = T::destructure_result_response_msg(response); + let status = GoalStatus::from_rcl(status); + if status != GoalStatus::Succeeded { + println!("goal status failed: {:?}, result: {:?}", status, result); + // this will drop the sender which makes the receiver fail with "canceled" + } else { + match sender.send(result) { + Ok(()) => {} + Err(e) => { + println!("error sending result to action client: {:?}", e); + } + } + } + } + } else { + let we_have: String = self + .result_requests + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } + } + + fn send_result_request(&mut self, uuid: uuid::Uuid) -> () { + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: uuid.as_bytes().to_vec(), + }; + let request_msg = T::make_result_request_msg(uuid_msg); + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Request, + >::from(&request_msg); + let mut seq_no = 0i64; + let result = unsafe { + rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) + }; + + if result == RCL_RET_OK as i32 { + self.result_requests.push((seq_no, uuid)); + } else { + eprintln!("coult not send request {}", result); + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_action_client_fini(&mut self.rcl_handle, node); + } + } +} diff --git a/src/action_servers.rs b/src/action_servers.rs new file mode 100644 index 0000000..77db735 --- /dev/null +++ b/src/action_servers.rs @@ -0,0 +1,566 @@ +use super::*; + +pub trait ActionServer_ { + fn handle(&self) -> &rcl_action_server_t; + fn handle_goal_request(&mut self, server: Arc>) -> (); + fn handle_cancel_request(&mut self) -> (); + fn handle_result_request(&mut self) -> (); + fn handle_goal_expired(&mut self) -> (); + fn publish_status(&self) -> (); + fn add_result(&mut self, uuid: uuid::Uuid, msg: Box) -> (); + fn destroy(&mut self, node: &mut rcl_node_t); +} + +pub struct WrappedActionServer +where + T: WrappedActionTypeSupport, +{ + pub rcl_handle: rcl_action_server_t, + pub clock_handle: Box, + pub accept_goal_cb: Box bool>, + pub accept_cancel_cb: Box) -> bool>, + pub goal_cb: Box)>, + pub goals: HashMap>, + pub result_msgs: HashMap>, + pub result_requests: HashMap>, +} + +impl ActionServer_ for WrappedActionServer +where + T: WrappedActionTypeSupport, +{ + fn handle(&self) -> &rcl_action_server_t { + &self.rcl_handle + } + + fn handle_goal_request(&mut self, server: Arc>) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut request_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Request, + >::new(); + let ret = unsafe { + rcl_action_take_goal_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) + }; + + if ret != RCL_RET_OK as i32 { + // this seems normal if client dies. + return; + } + let msg = <<::SendGoal as WrappedServiceTypeSupport>::Request>::from_native(&request_msg); + let (uuid_msg, goal) = T::destructure_goal_request_msg(msg); + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid_msg.uuid.clone())); + let goal_accepted = (self.accept_goal_cb)(&uuid, &goal); + let time = builtin_interfaces::msg::Time::default(); + + let goal_info = action_msgs::msg::GoalInfo { + goal_id: uuid_msg, + stamp: time.clone(), + }; + + let native_goal_info = WrappedNativeMsg::::from(&goal_info); + + let goal_handle: Option<*mut rcl_action_goal_handle_t> = if goal_accepted { + unsafe { + Some(rcl_action_accept_new_goal( + &mut self.rcl_handle, + &*native_goal_info, + )) + } + } else { + None + }; + + // send response + let response_msg = T::make_goal_response_msg(goal_accepted, time); + let mut response_msg = WrappedNativeMsg::< + <::SendGoal as WrappedServiceTypeSupport>::Response, + >::from(&response_msg); + + let ret = unsafe { + let mut request_id = request_id.assume_init(); + rcl_action_send_goal_response( + &self.rcl_handle, + &mut request_id, + response_msg.void_ptr_mut(), + ) + }; + if ret != RCL_RET_OK as i32 { + println!("action server: failed to send goal response"); + return; + } + + // if we accepted the goal, update its state machine and publish all goal statuses + if let Some(goal_handle) = goal_handle { + unsafe { + rcl_action_update_goal_state( + goal_handle, + rcl_action_goal_event_t::GOAL_EVENT_EXECUTE, + ); + } + + self.publish_status(); + + // run the user supplied cb with newly created goal handle object + let g: ServerGoal = ServerGoal { + uuid, + goal, + handle: Arc::new(Mutex::new(goal_handle)), + server: Arc::downgrade(&server), + }; + + self.goals.insert(uuid, g.clone()); + + // start goal callback + (self.goal_cb)(g); + } + } + + fn handle_cancel_request(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut request_msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_action_take_cancel_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) + }; + + if ret != RCL_RET_OK as i32 { + // this seems normal if client dies. + return; + } + + let mut cancel_response = unsafe { rcl_action_get_zero_initialized_cancel_response() }; + let ret = unsafe { + rcl_action_process_cancel_request(&self.rcl_handle, &*request_msg, &mut cancel_response) + }; + + if ret != RCL_RET_OK as i32 { + println!("action server: could not process cancel request. {}", ret); + return; + } + + let mut response_msg = + action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg); + + // let user filter cancelled goals. + let requested_cancels = response_msg.goals_canceling.len(); + response_msg.goals_canceling.retain(|goal_info| { + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); + if let Some(goal) = self.goals.get(&uuid) { + (self.accept_cancel_cb)(goal) + } else { + true + } + }); + + response_msg.goals_canceling.iter().for_each(|goal_info| { + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); + if let Some(goal) = self.goals.get_mut(&uuid) { + goal.set_cancel(); + } + }); + + // check if all cancels were rejected. + if requested_cancels >= 1 && response_msg.goals_canceling.is_empty() { + response_msg.return_code = 1; // TODO: auto generate these (int8 ERROR_REJECTED=1) + } + + if !response_msg.goals_canceling.is_empty() { + // at least one goal state changed, publish a new status message + self.publish_status(); + } + + let mut native_msg = + WrappedNativeMsg::::from(&response_msg); + let ret = unsafe { + let mut request_id = request_id.assume_init(); + rcl_action_send_cancel_response( + &self.rcl_handle, + &mut request_id, + native_msg.void_ptr_mut(), + ) + }; + + if ret != RCL_RET_OK as i32 { + println!("action server: could send cancel response. {}", ret); + return; + } + } + + fn handle_goal_expired(&mut self) { + let mut goal_info = WrappedNativeMsg::::new(); + let mut num_expired = 1; + + while num_expired > 1 { + let ret = unsafe { + rcl_action_expire_goals(&self.rcl_handle, &mut *goal_info, 1, &mut num_expired) + }; + if ret != RCL_RET_OK as i32 { + println!("action server: could not expire goal."); + return; + } + let gi = action_msgs::msg::GoalInfo::from_native(&goal_info); + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone())); + println!("goal expired: {} - {}", uuid, num_expired); + self.goals.remove(&uuid); + self.result_msgs.remove(&uuid); + self.result_requests.remove(&uuid); + } + } + + fn publish_status(&self) { + unsafe { + let mut status = rcl_action_get_zero_initialized_goal_status_array(); + let ret = rcl_action_get_goal_status_array(&self.rcl_handle, &mut status); + if ret != RCL_RET_OK as i32 { + println!( + "action server: failed to get goal status array: {}", + Error::from_rcl_error(ret) + ); + return; + } + let ret = rcl_action_publish_status( + &self.rcl_handle, + &status as *const _ as *const std::os::raw::c_void, + ); + if ret != RCL_RET_OK as i32 { + println!( + "action server: failed to publish status: {}", + Error::from_rcl_error(ret) + ); + return; + } + rcl_action_goal_status_array_fini(&mut status); + } + } + + // bit of a hack... + fn add_result(&mut self, uuid: uuid::Uuid, mut msg: Box) -> () { + // if there are already requests for this goal, send the result immediately. + if let Some(rr) = self.result_requests.remove(&uuid) { + for mut req in rr { + let ret = unsafe { + rcl_action_send_result_response(&self.rcl_handle, &mut req, msg.void_ptr_mut()) + }; + if ret != RCL_RET_OK as i32 { + println!( + "action server: could send result request response. {}", + Error::from_rcl_error(ret) + ); + } + } + } + self.result_msgs.insert(uuid, msg); + } + + fn handle_result_request(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut request_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Request, + >::new(); + let ret = unsafe { + rcl_action_take_result_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) + }; + + if ret != RCL_RET_OK as i32 { + // this seems normal if client dies. + return; + } + + let msg = <<::GetResult as WrappedServiceTypeSupport>::Request>::from_native(&request_msg); + let goal_info = action_msgs::msg::GoalInfo { + goal_id: T::destructure_result_request_msg(msg), + ..action_msgs::msg::GoalInfo::default() + }; + let goal_info_native = WrappedNativeMsg::::from(&goal_info); + + // does this goal exist? + let goal_exists = + unsafe { rcl_action_server_goal_exists(&self.rcl_handle, &*goal_info_native) }; + + let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); + + let response_msg = if !goal_exists { + // Goal does not exists + println!("goal does not exist :("); + let status = GoalStatus::Unknown; + let msg = T::make_result_response_msg(status.to_rcl(), T::Result::default()); + let mut response_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&msg); + Some(response_msg.void_ptr_mut()) + } else { + self.result_msgs + .get_mut(&uuid) + .map(|msg| msg.void_ptr_mut()) + }; + + let mut request_id = unsafe { request_id.assume_init() }; + if let Some(response_msg) = response_msg { + let ret = unsafe { + rcl_action_send_result_response(&self.rcl_handle, &mut request_id, response_msg) + }; + + if ret != RCL_RET_OK as i32 { + println!( + "action server: could send result request response. {}", + Error::from_rcl_error(ret) + ); + return; + } + } else { + // keep request for later when result comes in + // todo: add logic that replies to the requests + self.result_requests + .entry(uuid) + .or_insert(vec![]) + .push(request_id); + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_action_server_fini(&mut self.rcl_handle, node); + rcl_ros_clock_fini(self.clock_handle.as_mut()); + } + } +} + +#[derive(Clone)] +pub struct ServerGoal +where + T: WrappedActionTypeSupport, +{ + pub uuid: uuid::Uuid, + pub goal: T::Goal, + handle: Arc>, + server: Weak>, +} + +unsafe impl Send for ServerGoal where T: WrappedActionTypeSupport {} + +impl ServerGoal +where + T: WrappedActionTypeSupport, +{ + pub fn is_cancelling(&self) -> bool { + let mut state = 0u8; // TODO: int8 STATUS_UNKNOWN = 0; + let ret = unsafe { + let handle = self.handle.lock().unwrap(); + rcl_action_goal_handle_get_status(*handle, &mut state) + }; + + if ret != RCL_RET_OK as i32 { + println!("action server: Failed to get goal handle state: {}", ret); + } + return state == 3u8; // TODO: int8 STATUS_CANCELING + } + + pub fn publish_feedback(&self, msg: T::Feedback) -> Result<()> + where + T: WrappedActionTypeSupport, + { + // upgrade to actual ref. if still alive + let action_server = self + .server + .upgrade() + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; + + let uuid_msg = unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }; + let feedback_msg = T::make_feedback_msg(uuid_msg, msg); + let mut native_msg = WrappedNativeMsg::::from(&feedback_msg); + let ret = unsafe { + rcl_action_publish_feedback( + action_server.lock().unwrap().handle(), + native_msg.void_ptr_mut(), + ) + }; + + if ret == RCL_RET_OK as i32 { + Ok(()) + } else { + eprintln!("coult not publish {}", Error::from_rcl_error(ret)); + Ok(()) // todo: error codes + } + } + + fn set_cancel(&mut self) { + let ret = unsafe { + let handle = self.handle.lock().unwrap(); + rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL) + }; + + if ret != RCL_RET_OK as i32 { + println!( + "action server: could not cancel goal: {}", + Error::from_rcl_error(ret) + ); + } + } + + pub fn cancel(&mut self, msg: T::Result) -> Result<()> { + // upgrade to actual ref. if still alive + let action_server = self + .server + .upgrade() + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; + let mut action_server = action_server.lock().unwrap(); + + // todo: check that the goal exists + let goal_info = action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }; + let goal_info_native = WrappedNativeMsg::::from(&goal_info); + + // does this goal exist? + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; + + if !goal_exists { + println!("tried to publish result without a goal"); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); + } + + // todo: error handling + unsafe { + rcl_action_notify_goal_done(action_server.handle()); + } + + // send out updated statues + action_server.publish_status(); + + // create result message + let result_msg = T::make_result_response_msg(5, msg); // todo: int8 STATUS_CANCELED = 5 + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); + action_server.add_result(self.uuid.clone(), Box::new(native_msg)); + + Ok(()) + } + + pub fn abort(&mut self, msg: T::Result) -> Result<()> { + // todo: error handling + let ret = unsafe { + let handle = self.handle.lock().unwrap(); + rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_ABORT) + }; + + if ret != RCL_RET_OK as i32 { + println!( + "action server: could not cancel goal: {}", + Error::from_rcl_error(ret) + ); + } + + // upgrade to actual ref. if still alive + let action_server = self + .server + .upgrade() + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; + let mut action_server = action_server.lock().unwrap(); + + // todo: check that the goal exists + let goal_info = action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }; + let goal_info_native = WrappedNativeMsg::::from(&goal_info); + + // does this goal exist? + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; + + if !goal_exists { + println!("tried to abort without a goal"); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); + } + + // todo: error handling + unsafe { + rcl_action_notify_goal_done(action_server.handle()); + } + + // send out updated statues + action_server.publish_status(); + + // create result message + let result_msg = T::make_result_response_msg(6, msg); // todo: int8 STATUS_ABORTED = 6 + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); + action_server.add_result(self.uuid.clone(), Box::new(native_msg)); + + Ok(()) + } + + pub fn succeed(&mut self, msg: T::Result) -> Result<()> + where + T: WrappedActionTypeSupport, + { + // upgrade to actual ref. if still alive + let action_server = self + .server + .upgrade() + .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; + let mut action_server = action_server.lock().unwrap(); + + // todo: check that the goal exists + let goal_info = action_msgs::msg::GoalInfo { + goal_id: unique_identifier_msgs::msg::UUID { + uuid: self.uuid.as_bytes().to_vec(), + }, + ..action_msgs::msg::GoalInfo::default() + }; + let goal_info_native = WrappedNativeMsg::::from(&goal_info); + + // does this goal exist? + let goal_exists = + unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; + + if !goal_exists { + println!("tried to publish result without a goal"); + return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); + } + + // todo: error handling + unsafe { + let handle = self.handle.lock().unwrap(); + rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_SUCCEED); + } + + // todo: error handling + unsafe { + rcl_action_notify_goal_done(action_server.handle()); + } + + // send out updated statues + action_server.publish_status(); + + // create result message + let result_msg = T::make_result_response_msg(4, msg); // todo: int8 STATUS_SUCCEEDED = 4 + let native_msg = WrappedNativeMsg::< + <::GetResult as WrappedServiceTypeSupport>::Response, + >::from(&result_msg); + action_server.add_result(self.uuid.clone(), Box::new(native_msg)); + + Ok(()) + } +} diff --git a/src/clients.rs b/src/clients.rs new file mode 100644 index 0000000..215d03d --- /dev/null +++ b/src/clients.rs @@ -0,0 +1,211 @@ +use super::*; + +unsafe impl Send for TypedClient where T: WrappedServiceTypeSupport {} + +impl TypedClient +where + T: WrappedServiceTypeSupport, +{ + pub fn request(&mut self, msg: &T::Request) -> Result>> + where + T: WrappedServiceTypeSupport, + { + let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); + let mut seq_no = 0i64; + let result = + unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + + let (sender, receiver) = oneshot::channel::(); + + if result == RCL_RET_OK as i32 { + self.response_channels.push((seq_no, sender)); + // instead of "canceled" we return invalid client. + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + } else { + eprintln!("coult not send request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +unsafe impl Send for UntypedClient_ {} + +impl UntypedClient_ { + pub fn request(&mut self, msg: serde_json::Value) -> Result>>> + { + let mut native_msg = (self.service_type.make_request_msg)(); + native_msg.from_json(msg)?; + + let mut seq_no = 0i64; + let result = + unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; + + let (sender, receiver) = oneshot::channel::>(); + + if result == RCL_RET_OK as i32 { + self.response_channels.push((seq_no, sender)); + // instead of "canceled" we return invalid client. + Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) + } else { + eprintln!("coult not send request {}", result); + Err(Error::from_rcl_error(result)) + } + } +} + +pub trait Client_ { + fn handle(&self) -> &rcl_client_t; + fn handle_response(&mut self) -> (); + fn destroy(&mut self, node: &mut rcl_node_t) -> (); +} + +pub struct TypedClient +where + T: WrappedServiceTypeSupport, +{ + pub rcl_handle: rcl_client_t, + pub response_channels: Vec<(i64, oneshot::Sender)>, +} + +impl Client_ for TypedClient +where + T: WrappedServiceTypeSupport, +{ + fn handle(&self) -> &rcl_client_t { + &self.rcl_handle + } + + fn handle_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = WrappedNativeMsg::::new(); + + let ret = unsafe { + rcl_take_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.response_channels.swap_remove(idx); + let response = T::Response::from_native(&response_msg); + match sender.send(response) { + Ok(()) => {} + Err(e) => { + println!("error sending to client: {:?}", e); + } + } + } else { + let we_have: String = self + .response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } // TODO handle failure. + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_client_fini(&mut self.rcl_handle, node); + } + } +} + +pub struct UntypedClient_ +{ + pub service_type: UntypedServiceSupport, + pub rcl_handle: rcl_client_t, + pub response_channels: Vec<(i64, oneshot::Sender>)>, +} + +impl Client_ for UntypedClient_ +{ + fn handle(&self) -> &rcl_client_t { + &self.rcl_handle + } + + fn handle_response(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut response_msg = (self.service_type.make_response_msg)(); + + let ret = unsafe { + rcl_take_response( + &self.rcl_handle, + request_id.as_mut_ptr(), + response_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + if let Some(idx) = self + .response_channels + .iter() + .position(|(id, _)| id == &request_id.sequence_number) + { + let (_, sender) = self.response_channels.swap_remove(idx); + let response = response_msg.to_json(); + match sender.send(response) { + Ok(()) => {} + Err(e) => { + println!("error sending to client: {:?}", e); + } + } + } else { + let we_have: String = self + .response_channels + .iter() + .map(|(id, _)| id.to_string()) + .collect::>() + .join(","); + eprintln!( + "no such req id: {}, we have [{}], ignoring", + request_id.sequence_number, we_have + ); + } + } // TODO handle failure. + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_client_fini(&mut self.rcl_handle, node); + } + } +} + +pub fn create_client_helper( + node: *mut rcl_node_t, + service_name: &str, + service_ts: *const rosidl_service_type_support_t, +) -> Result { + let mut client_handle = unsafe { rcl_get_zero_initialized_client() }; + let service_name_c_string = + CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let client_options = rcl_client_get_default_options(); + rcl_client_init( + &mut client_handle, + node, + service_ts, + service_name_c_string.as_ptr(), + &client_options, + ) + }; + if result == RCL_RET_OK as i32 { + Ok(client_handle) + } else { + Err(Error::from_rcl_error(result)) + } +} diff --git a/src/clocks.rs b/src/clocks.rs new file mode 100644 index 0000000..d4a88c1 --- /dev/null +++ b/src/clocks.rs @@ -0,0 +1,77 @@ +use super::*; + +#[derive(Debug)] +pub enum ClockType { + RosTime, + SystemTime, + SteadyTime, +} + +unsafe impl Send for Clock {} + +pub struct Clock { + clock_handle: Box, +} + +pub fn clock_type_to_rcl(ct: &ClockType) -> rcl_clock_type_t { + match ct { + ClockType::RosTime => rcl_clock_type_t::RCL_ROS_TIME, + ClockType::SystemTime => rcl_clock_type_t::RCL_SYSTEM_TIME, + ClockType::SteadyTime => rcl_clock_type_t::RCL_STEADY_TIME, + } +} + + +impl Clock { + pub fn create(ct: ClockType) -> Result { + let mut clock_handle = MaybeUninit::::uninit(); + + let rcl_ct = clock_type_to_rcl(&ct); + let ret = unsafe { + rcl_clock_init( + rcl_ct, + clock_handle.as_mut_ptr(), + &mut rcutils_get_default_allocator(), + ) + }; + if ret != RCL_RET_OK as i32 { + eprintln!("could not create {:?} clock: {}", ct, ret); + return Err(Error::from_rcl_error(ret)); + } + + let clock_handle = Box::new(unsafe { clock_handle.assume_init() }); + Ok(Clock { clock_handle }) + } + + pub fn get_now(&mut self) -> Result { + let valid = unsafe { rcl_clock_valid(&mut *self.clock_handle) }; + if !valid { + return Err(Error::from_rcl_error(RCL_RET_INVALID_ARGUMENT as i32)); + } + let mut tp: rcutils_time_point_value_t = 0; + let ret = unsafe { rcl_clock_get_now(&mut *self.clock_handle, &mut tp) }; + + if ret != RCL_RET_OK as i32 { + eprintln!("could not create steady clock: {}", ret); + return Err(Error::from_rcl_error(ret)); + } + + let dur = Duration::from_nanos(tp as u64); + + Ok(dur) + } + + pub fn to_builtin_time(d: &Duration) -> builtin_interfaces::msg::Time { + let sec = d.as_secs() as i32; + let nanosec = d.subsec_nanos(); + builtin_interfaces::msg::Time { sec, nanosec } + } +} + +impl Drop for Clock { + fn drop(&mut self) { + unsafe { + rcl_clock_fini(&mut *self.clock_handle); + } + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..7f26c58 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,87 @@ +use super::*; + + +#[derive(Debug, Clone)] +pub struct Context { + pub context_handle: Arc>, +} + +unsafe impl Send for Context {} + +impl Context { + pub fn create() -> Result { + let mut ctx: Box = unsafe { Box::new(rcl_get_zero_initialized_context()) }; + // argc/v + let args = std::env::args() + .map(|arg| CString::new(arg).unwrap()) + .collect::>(); + let mut c_args = args + .iter() + .map(|arg| arg.as_ptr()) + .collect::>(); + c_args.push(std::ptr::null()); + + let is_valid = unsafe { + let allocator = rcutils_get_default_allocator(); + let mut init_options = rcl_get_zero_initialized_init_options(); + rcl_init_options_init(&mut init_options, allocator); + rcl_init( + (c_args.len() - 1) as ::std::os::raw::c_int, + c_args.as_ptr(), + &init_options, + ctx.as_mut(), + ); + rcl_init_options_fini(&mut init_options as *mut _); + rcl_context_is_valid(ctx.as_mut()) + }; + + let logging_ok = unsafe { + let _guard = log_guard(); + let ret = rcl_logging_configure( + &ctx.as_ref().global_arguments, + &rcutils_get_default_allocator(), + ); + ret == RCL_RET_OK as i32 + }; + + if is_valid && logging_ok { + Ok(Context { + context_handle: Arc::new(Mutex::new(ContextHandle(ctx))), + }) + } else { + Err(Error::RCL_RET_ERROR) // TODO + } + } + + pub fn is_valid(&self) -> bool { + let mut ctx = self.context_handle.lock().unwrap(); + unsafe { rcl_context_is_valid(ctx.as_mut()) } + } +} + +#[derive(Debug)] +pub struct ContextHandle(Box); + +impl Deref for ContextHandle { + type Target = Box; + + fn deref<'a>(&'a self) -> &'a Box { + &self.0 + } +} + +impl DerefMut for ContextHandle { + fn deref_mut<'a>(&'a mut self) -> &'a mut Box { + &mut self.0 + } +} + +impl Drop for ContextHandle { + fn drop(&mut self) { + // TODO: error handling? atleast probably need rcl_reset_error + unsafe { + rcl::rcl_shutdown(self.0.as_mut()); + rcl::rcl_context_fini(self.0.as_mut()); + } + } +} diff --git a/src/error.rs b/src/error.rs index d3d2aac..261867b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,7 +2,7 @@ use rcl::*; use thiserror::Error; -// TODO +pub type Result = std::result::Result; #[derive(Error, Debug)] pub enum Error { diff --git a/src/lib.rs b/src/lib.rs index 28141bc..75f935c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,10 @@ -include!(concat!(env!("OUT_DIR"), "/_r2r_generated_msgs.rs")); -include!(concat!( - env!("OUT_DIR"), - "/_r2r_generated_untyped_helper.rs" -)); - -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::fmt::Debug; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use futures::channel::{mpsc, oneshot}; @@ -26,1256 +20,99 @@ use retain_mut::RetainMut; pub extern crate uuid; use actions::*; -use msg_gen::*; use rcl::*; mod error; use error::*; +mod msg_types; +use msg_types::*; + +pub use msg_types::generated_msgs::*; +pub use msg_types::WrappedNativeMsg as NativeMsg; + mod utils; pub use utils::*; -pub type Result = std::result::Result; +mod subscribers; +use subscribers::*; -pub trait WrappedTypesupport: - Serialize + serde::de::DeserializeOwned + Default + Debug + Clone -{ - type CStruct; +mod publishers; +use publishers::*; - fn get_ts() -> &'static rosidl_message_type_support_t; - fn create_msg() -> *mut Self::CStruct; - fn destroy_msg(msg: *mut Self::CStruct); - fn from_native(msg: &Self::CStruct) -> Self; - fn copy_to_native(&self, msg: &mut Self::CStruct); -} +mod services; +use services::*; -pub trait WrappedServiceTypeSupport: Debug + Clone { - type Request: WrappedTypesupport; - type Response: WrappedTypesupport; +mod clients; +use clients::*; - fn get_ts() -> &'static rosidl_service_type_support_t; -} +mod action_clients; +use action_clients::*; -pub trait WrappedActionTypeSupport: Debug + Clone { - type Goal: WrappedTypesupport; - type Result: WrappedTypesupport; - type Feedback: WrappedTypesupport; +mod action_servers; +use action_servers::*; - // internal... - type FeedbackMessage: WrappedTypesupport; - type SendGoal: WrappedServiceTypeSupport; - type GetResult: WrappedServiceTypeSupport; +pub use action_servers::ServerGoal; - fn get_ts() -> &'static rosidl_action_type_support_t; +mod context; +pub use context::Context; - fn make_goal_request_msg( - goal_id: unique_identifier_msgs::msg::UUID, - goal: Self::Goal, - ) -> <::SendGoal as WrappedServiceTypeSupport>::Request; - fn make_goal_response_msg( - accepted: bool, - stamp: builtin_interfaces::msg::Time, - ) -> <::SendGoal as WrappedServiceTypeSupport>::Response; - fn make_feedback_msg( - goal_id: unique_identifier_msgs::msg::UUID, - feedback: Self::Feedback, - ) -> Self::FeedbackMessage; - fn make_result_request_msg( - goal_id: unique_identifier_msgs::msg::UUID, - ) -> <::GetResult as WrappedServiceTypeSupport>::Request; - fn make_result_response_msg( - status: i8, - result: Self::Result, - ) -> <::GetResult as WrappedServiceTypeSupport>::Response; - fn destructure_goal_request_msg( - msg: <::SendGoal as WrappedServiceTypeSupport>::Request, - ) -> (unique_identifier_msgs::msg::UUID, Self::Goal); - fn destructure_goal_response_msg( - msg: <::SendGoal as WrappedServiceTypeSupport>::Response, - ) -> (bool, builtin_interfaces::msg::Time); - fn destructure_feedback_msg( - msg: Self::FeedbackMessage, - ) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); - fn destructure_result_response_msg( - msg: <::GetResult as WrappedServiceTypeSupport>::Response, - ) -> (i8, Self::Result); - fn destructure_result_request_msg( - msg: <::GetResult as WrappedServiceTypeSupport>::Request, - ) -> unique_identifier_msgs::msg::UUID; -} +mod parameters; +use parameters::*; +pub use parameters::ParameterValue; -#[derive(Debug)] -pub struct WrappedNativeMsg -where - T: WrappedTypesupport, -{ - pub msg: *mut T::CStruct, -} - -trait VoidPtr { - fn void_ptr(&self) -> *const std::os::raw::c_void; - fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void; -} - -#[derive(Debug)] -pub struct WrappedNativeMsgUntyped { - ts: &'static rosidl_message_type_support_t, - msg: *mut std::os::raw::c_void, - destroy: fn(*mut std::os::raw::c_void), - msg_to_json: fn( - native: *const std::os::raw::c_void, - ) -> std::result::Result, - msg_from_json: fn( - native: *mut std::os::raw::c_void, - json: serde_json::Value, - ) -> std::result::Result<(), serde_json::error::Error>, -} - -impl WrappedNativeMsgUntyped { - fn new() -> Self - where - T: WrappedTypesupport, - { - let destroy = |native: *mut std::os::raw::c_void| { - let native_msg = native as *mut T::CStruct; - T::destroy_msg(native_msg); - }; - - let msg_to_json = |native: *const std::os::raw::c_void| { - let msg = unsafe { T::from_native(&*(native as *const T::CStruct)) }; - serde_json::to_value(&msg) - }; - - let msg_from_json = |native: *mut std::os::raw::c_void, json: serde_json::Value| { - serde_json::from_value(json).map(|msg: T| unsafe { - msg.copy_to_native(&mut *(native as *mut T::CStruct)); - }) - }; - - WrappedNativeMsgUntyped { - ts: T::get_ts(), - msg: T::create_msg() as *mut std::os::raw::c_void, - destroy, - msg_to_json, - msg_from_json, - } - } - - fn to_json(&self) -> Result { - let json = (self.msg_to_json)(self.msg); - json.map_err(|serde_err| Error::SerdeError { - err: serde_err.to_string(), - }) - } - - fn from_json(&mut self, json: serde_json::Value) -> Result<()> { - (self.msg_from_json)(self.msg, json).map_err(|serde_err| Error::SerdeError { - err: serde_err.to_string(), - }) - } -} - -impl VoidPtr for WrappedNativeMsgUntyped { - fn void_ptr(&self) -> *const std::os::raw::c_void { - self.msg as *const _ as *const std::os::raw::c_void - } - - fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void { - self.msg as *mut _ as *mut std::os::raw::c_void - } -} - -impl Drop for WrappedNativeMsgUntyped { - fn drop(&mut self) { - (self.destroy)(self.msg); - } -} - -impl WrappedNativeMsg -where - T: WrappedTypesupport, -{ - pub fn new() -> Self { - WrappedNativeMsg { - msg: T::create_msg(), - } - } - - pub fn from(msg: &T) -> Self { - let mut native_msg = Self::new(); - msg.copy_to_native(&mut native_msg); - native_msg - } -} - -impl VoidPtr for WrappedNativeMsg -where - T: WrappedTypesupport, -{ - fn void_ptr(&self) -> *const std::os::raw::c_void { - self.msg as *const _ as *const std::os::raw::c_void - } - - fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void { - self.msg as *mut _ as *mut std::os::raw::c_void - } -} - -impl Drop for WrappedNativeMsg -where - T: WrappedTypesupport, -{ - fn drop(&mut self) { - T::destroy_msg(self.msg); - } -} - -impl Deref for WrappedNativeMsg -where - T: WrappedTypesupport, -{ - type Target = T::CStruct; - - fn deref(&self) -> &Self::Target { - unsafe { &(*self.msg) } - } -} - -impl DerefMut for WrappedNativeMsg -where - T: WrappedTypesupport, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { &mut (*self.msg) } - } -} - -trait Sub { - fn handle(&self) -> &rcl_subscription_t; - fn handle_incoming(&mut self) -> (); - fn destroy(&mut self, node: &mut rcl_node_t) -> (); -} - -struct WrappedSub -where - T: WrappedTypesupport, -{ - rcl_handle: rcl_subscription_t, - sender: mpsc::Sender, -} - -struct WrappedSubNative -where - T: WrappedTypesupport, -{ - rcl_handle: rcl_subscription_t, - sender: mpsc::Sender>, -} - -struct WrappedSubUntyped { - rcl_handle: rcl_subscription_t, - topic_type: String, - sender: mpsc::Sender>, -} - -impl Sub for WrappedSub -where - T: WrappedTypesupport, -{ - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle - } - - fn handle_incoming(&mut self) -> () { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take( - &self.rcl_handle, - msg.void_ptr_mut(), - &mut msg_info, - std::ptr::null_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let msg = T::from_native(&msg); - match self.sender.try_send(msg) { - Err(e) => println!("error {:?}", e), - _ => (), - } - } - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); - } - } -} - -impl Sub for WrappedSubNative -where - T: WrappedTypesupport, -{ - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle - } - - fn handle_incoming(&mut self) -> () { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take( - &self.rcl_handle, - msg.void_ptr_mut(), - &mut msg_info, - std::ptr::null_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - match self.sender.try_send(msg) { - Err(e) => println!("error {:?}", e), - _ => (), - } - } - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); - } - } -} - -impl Sub for WrappedSubUntyped { - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle - } - - fn handle_incoming(&mut self) -> () { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) - .expect(&format!("no typesupport for {}", self.topic_type)); - let ret = unsafe { - rcl_take( - &self.rcl_handle, - msg.void_ptr_mut(), - &mut msg_info, - std::ptr::null_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let json = msg.to_json(); - match self.sender.try_send(json) { - Err(e) => println!("error {:?}", e), - _ => (), - } - } - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); - } - } -} - -struct WrappedService -where - T: WrappedServiceTypeSupport, -{ - rcl_handle: rcl_service_t, - sender: mpsc::Sender>, - outstanding_requests: Vec>, -} - -trait Service { - fn handle(&self) -> &rcl_service_t; - fn send_completed_responses(&mut self) -> (); - fn handle_request(&mut self) -> (); - fn destroy(&mut self, node: &mut rcl_node_t) -> (); -} - -impl Service for WrappedService -where - T: WrappedServiceTypeSupport, -{ - fn handle(&self) -> &rcl_service_t { - &self.rcl_handle - } - - fn send_completed_responses(&mut self) -> () { - let mut to_send = vec![]; - self.outstanding_requests.retain_mut(|r| { - match r.try_recv() { - Ok(Some(resp)) => { - to_send.push(resp); - false // done with this. - } - Ok(None) => true, // keep message, waiting for service - Err(_) => false, // channel canceled - } - }); - - for (mut req_id, msg) in to_send { - let mut native_response = WrappedNativeMsg::::from(&msg); - let res = unsafe { - rcl_send_response( - &self.rcl_handle, - &mut req_id, - native_response.void_ptr_mut(), - ) - }; - - // TODO - if res != RCL_RET_OK as i32 { - eprintln!("could not send service response {}", res); - } - } - } - - fn handle_request(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::::new(); - - let ret = unsafe { - rcl_take_request( - &self.rcl_handle, - request_id.as_mut_ptr(), - request_msg.void_ptr_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let request_id = unsafe { request_id.assume_init() }; - let request_msg = T::Request::from_native(&request_msg); - let (response_sender, response_receiver) = - oneshot::channel::<(rmw_request_id_t, T::Response)>(); - self.outstanding_requests.push(response_receiver); - let request = ServiceRequest:: { - message: request_msg, - request_id, - response_sender, - }; - match self.sender.try_send(request) { - Err(e) => eprintln!("warning: could not send service request ({})", e), - _ => (), - } - } // TODO handle failure. - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_service_fini(&mut self.rcl_handle, node); - } - } -} - -// client -struct WrappedClient -where - T: WrappedServiceTypeSupport, -{ - rcl_handle: rcl_client_t, - // store callbacks with request sequence id and callback function - response_channels: Vec<(i64, oneshot::Sender)>, -} - -trait Client_ { - fn handle(&self) -> &rcl_client_t; - fn handle_response(&mut self) -> (); - fn destroy(&mut self, node: &mut rcl_node_t) -> (); -} - -impl Client_ for WrappedClient -where - T: WrappedServiceTypeSupport, -{ - fn handle(&self) -> &rcl_client_t { - &self.rcl_handle - } - - fn handle_response(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::::new(); - - let ret = unsafe { - rcl_take_response( - &self.rcl_handle, - request_id.as_mut_ptr(), - response_msg.void_ptr_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self - .response_channels - .iter() - .position(|(id, _)| id == &request_id.sequence_number) - { - let (_, sender) = self.response_channels.swap_remove(idx); - let response = T::Response::from_native(&response_msg); - match sender.send(response) { - Ok(()) => {} - Err(e) => { - println!("error sending to client: {:?}", e); - } - } - } else { - let we_have: String = self - .response_channels - .iter() - .map(|(id, _)| id.to_string()) - .collect::>() - .join(","); - eprintln!( - "no such req id: {}, we have [{}], ignoring", - request_id.sequence_number, we_have - ); - } - } // TODO handle failure. - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_client_fini(&mut self.rcl_handle, node); - } - } -} - -// action clients -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum GoalStatus { - Unknown, - Accepted, - Executing, - Canceling, - Succeeded, - Canceled, - Aborted, -} - -impl GoalStatus { - #[allow(dead_code)] - pub fn to_rcl(&self) -> i8 { - match self { - GoalStatus::Unknown => 0, - GoalStatus::Accepted => 1, - GoalStatus::Executing => 2, - GoalStatus::Canceling => 3, - GoalStatus::Succeeded => 4, - GoalStatus::Canceled => 5, - GoalStatus::Aborted => 6, - } - } - - pub fn from_rcl(s: i8) -> Self { - match s { - 0 => GoalStatus::Unknown, - 1 => GoalStatus::Accepted, - 2 => GoalStatus::Executing, - 3 => GoalStatus::Canceling, - 4 => GoalStatus::Succeeded, - 5 => GoalStatus::Canceled, - 6 => GoalStatus::Aborted, - _ => panic!("unknown action status: {}", s), - } - } -} - -struct WrappedActionClient -where - T: WrappedActionTypeSupport, -{ - rcl_handle: rcl_action_client_t, - goal_response_channels: Vec<( - i64, - oneshot::Sender< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >, - )>, - cancel_response_channels: Vec<(i64, oneshot::Sender)>, - feedback_senders: Vec<(uuid::Uuid, mpsc::Sender)>, - result_requests: Vec<(i64, uuid::Uuid)>, - result_senders: Vec<(uuid::Uuid, oneshot::Sender)>, - goal_status: HashMap, -} - -trait ActionClient_ { - fn handle(&self) -> &rcl_action_client_t; - fn destroy(&mut self, node: &mut rcl_node_t) -> (); - - fn handle_goal_response(&mut self) -> (); - fn handle_cancel_response(&mut self) -> (); - fn handle_feedback_msg(&mut self) -> (); - fn handle_status_msg(&mut self) -> (); - fn handle_result_response(&mut self) -> (); - - fn send_result_request(&mut self, uuid: uuid::Uuid) -> (); -} - -use std::convert::TryInto; -fn vec_to_uuid_bytes(v: Vec) -> [T; 16] { - v.try_into().unwrap_or_else(|v: Vec| { - panic!("Expected a Vec of length {} but it was {}", 16, v.len()) - }) -} - -impl WrappedActionClient -where - T: WrappedActionTypeSupport, -{ - fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus { - *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown) - } - - fn send_cancel_request(&mut self, goal: &uuid::Uuid) -> Result>> - where - T: WrappedActionTypeSupport, - { - let msg = action_msgs::srv::CancelGoal::Request { - goal_info: action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { - uuid: goal.as_bytes().to_vec(), - }, - ..action_msgs::msg::GoalInfo::default() - }, - }; - let native_msg = WrappedNativeMsg::::from(&msg); - let mut seq_no = 0i64; - let result = unsafe { - rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) - }; - - if result == RCL_RET_OK as i32 { - let (cancel_req_sender, cancel_req_receiver) = - oneshot::channel::(); - - self.cancel_response_channels - .push((seq_no, cancel_req_sender)); - // instead of "canceled" we return invalid client. - let future = cancel_req_receiver - .map_err(|_| Error::RCL_RET_CLIENT_INVALID) - .map(|r| match r { - Ok(r) => match r.return_code { - 0 => Ok(()), - 1 => Err(Error::GoalCancelRejected), - 2 => Err(Error::GoalCancelUnknownGoalID), - 3 => Err(Error::GoalCancelAlreadyTerminated), - x => panic!("unknown error code return from action server: {}", x), - }, - Err(e) => Err(e), - }); - Ok(future) - } else { - eprintln!("coult not send goal request {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - -impl ActionClient_ for WrappedActionClient -where - T: WrappedActionTypeSupport, -{ - fn handle(&self) -> &rcl_action_client_t { - &self.rcl_handle - } - - fn handle_goal_response(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >::new(); - - let ret = unsafe { - rcl_action_take_goal_response( - &self.rcl_handle, - request_id.as_mut_ptr(), - response_msg.void_ptr_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self - .goal_response_channels - .iter() - .position(|(id, _)| id == &request_id.sequence_number) - { - let (_, sender) = self.goal_response_channels.swap_remove(idx); - let response = <::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg); - match sender.send(response) { - Ok(()) => {} - Err(e) => { - println!("error sending to action client: {:?}", e); - } - } - } else { - let we_have: String = self - .goal_response_channels - .iter() - .map(|(id, _)| id.to_string()) - .collect::>() - .join(","); - eprintln!( - "no such req id: {}, we have [{}], ignoring", - request_id.sequence_number, we_have - ); - } - } - } - - fn handle_cancel_response(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::::new(); - - let ret = unsafe { - rcl_action_take_cancel_response( - &self.rcl_handle, - request_id.as_mut_ptr(), - response_msg.void_ptr_mut(), - ) - }; - if ret == RCL_RET_OK as i32 { - let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self - .cancel_response_channels - .iter() - .position(|(id, _)| id == &request_id.sequence_number) - { - let (_, sender) = self.cancel_response_channels.swap_remove(idx); - let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg); - match sender.send(response) { - Err(e) => eprintln!("warning: could not send cancel response msg ({:?})", e), - _ => (), - } - } else { - let we_have: String = self - .goal_response_channels - .iter() - .map(|(id, _)| id.to_string()) - .collect::>() - .join(","); - eprintln!( - "no such req id: {}, we have [{}], ignoring", - request_id.sequence_number, we_have - ); - } - } - } - - fn handle_feedback_msg(&mut self) -> () { - let mut feedback_msg = WrappedNativeMsg::::new(); - let ret = - unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) }; - if ret == RCL_RET_OK as i32 { - let msg = T::FeedbackMessage::from_native(&feedback_msg); - let (uuid, feedback) = T::destructure_feedback_msg(msg); - let msg_uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid.uuid)); - if let Some((_, sender)) = self - .feedback_senders - .iter_mut() - .find(|(uuid, _)| uuid == &msg_uuid) - { - match sender.try_send(feedback) { - Err(e) => eprintln!("warning: could not send feedback msg ({})", e), - _ => (), - } - } - } - } - - fn handle_status_msg(&mut self) -> () { - let mut status_array = WrappedNativeMsg::::new(); - let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) }; - if ret == RCL_RET_OK as i32 { - let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array); - for a in &arr.status_list { - let uuid = - uuid::Uuid::from_bytes(vec_to_uuid_bytes(a.goal_info.goal_id.uuid.clone())); - if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) { - continue; - } - let status = GoalStatus::from_rcl(a.status); - *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status; - } - } - } - - fn handle_result_response(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut response_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Response, - >::new(); - - let ret = unsafe { - rcl_action_take_result_response( - &self.rcl_handle, - request_id.as_mut_ptr(), - response_msg.void_ptr_mut(), - ) - }; - - if ret == RCL_RET_OK as i32 { - let request_id = unsafe { request_id.assume_init() }; - if let Some(idx) = self - .result_requests - .iter() - .position(|(id, _)| id == &request_id.sequence_number) - { - let (_, uuid) = self.result_requests.swap_remove(idx); - if let Some(idx) = self - .result_senders - .iter() - .position(|(suuid, _)| suuid == &uuid) - { - let (_, sender) = self.result_senders.swap_remove(idx); - let response = <::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg); - let (status, result) = T::destructure_result_response_msg(response); - let status = GoalStatus::from_rcl(status); - if status != GoalStatus::Succeeded { - println!("goal status failed: {:?}, result: {:?}", status, result); - // this will drop the sender which makes the receiver fail with "canceled" - } else { - match sender.send(result) { - Ok(()) => {} - Err(e) => { - println!("error sending result to action client: {:?}", e); - } - } - } - } - } else { - let we_have: String = self - .result_requests - .iter() - .map(|(id, _)| id.to_string()) - .collect::>() - .join(","); - eprintln!( - "no such req id: {}, we have [{}], ignoring", - request_id.sequence_number, we_have - ); - } - } - } - - fn send_result_request(&mut self, uuid: uuid::Uuid) -> () { - let uuid_msg = unique_identifier_msgs::msg::UUID { - uuid: uuid.as_bytes().to_vec(), - }; - let request_msg = T::make_result_request_msg(uuid_msg); - let native_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Request, - >::from(&request_msg); - let mut seq_no = 0i64; - let result = unsafe { - rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) - }; - - if result == RCL_RET_OK as i32 { - self.result_requests.push((seq_no, uuid)); - } else { - eprintln!("coult not send request {}", result); - } - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_action_client_fini(&mut self.rcl_handle, node); - } - } -} - -struct WrappedActionServer -where - T: WrappedActionTypeSupport, -{ - rcl_handle: rcl_action_server_t, - clock_handle: Box, - accept_goal_cb: Box bool>, - accept_cancel_cb: Box) -> bool>, - goal_cb: Box)>, - goals: HashMap>, - result_msgs: HashMap>, - result_requests: HashMap>, -} - -trait ActionServer_ { - fn handle(&self) -> &rcl_action_server_t; - fn handle_goal_request(&mut self, server: Arc>) -> (); - fn handle_cancel_request(&mut self) -> (); - fn handle_result_request(&mut self) -> (); - fn handle_goal_expired(&mut self) -> (); - fn publish_status(&self) -> (); - fn add_result(&mut self, uuid: uuid::Uuid, msg: Box) -> (); - fn destroy(&mut self, node: &mut rcl_node_t); -} - -impl ActionServer_ for WrappedActionServer -where - T: WrappedActionTypeSupport, -{ - fn handle(&self) -> &rcl_action_server_t { - &self.rcl_handle - } - - fn handle_goal_request(&mut self, server: Arc>) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::< - <::SendGoal as WrappedServiceTypeSupport>::Request, - >::new(); - let ret = unsafe { - rcl_action_take_goal_request( - &self.rcl_handle, - request_id.as_mut_ptr(), - request_msg.void_ptr_mut(), - ) - }; - - if ret != RCL_RET_OK as i32 { - // this seems normal if client dies. - return; - } - let msg = <<::SendGoal as WrappedServiceTypeSupport>::Request>::from_native(&request_msg); - let (uuid_msg, goal) = T::destructure_goal_request_msg(msg); - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(uuid_msg.uuid.clone())); - let goal_accepted = (self.accept_goal_cb)(&uuid, &goal); - let time = builtin_interfaces::msg::Time::default(); - - let goal_info = action_msgs::msg::GoalInfo { - goal_id: uuid_msg, - stamp: time.clone(), - }; - - let native_goal_info = WrappedNativeMsg::::from(&goal_info); - - let goal_handle: Option<*mut rcl_action_goal_handle_t> = if goal_accepted { - unsafe { - Some(rcl_action_accept_new_goal( - &mut self.rcl_handle, - &*native_goal_info, - )) - } - } else { - None - }; - - // send response - let response_msg = T::make_goal_response_msg(goal_accepted, time); - let mut response_msg = WrappedNativeMsg::< - <::SendGoal as WrappedServiceTypeSupport>::Response, - >::from(&response_msg); - - let ret = unsafe { - let mut request_id = request_id.assume_init(); - rcl_action_send_goal_response( - &self.rcl_handle, - &mut request_id, - response_msg.void_ptr_mut(), - ) - }; - if ret != RCL_RET_OK as i32 { - println!("action server: failed to send goal response"); - return; - } - - // if we accepted the goal, update its state machine and publish all goal statuses - if let Some(goal_handle) = goal_handle { - unsafe { - rcl_action_update_goal_state( - goal_handle, - rcl_action_goal_event_t::GOAL_EVENT_EXECUTE, - ); - } - - self.publish_status(); - - // run the user supplied cb with newly created goal handle object - let g: ServerGoal = ServerGoal { - uuid, - goal, - handle: Arc::new(Mutex::new(goal_handle)), - server: Arc::downgrade(&server), - }; - - self.goals.insert(uuid, g.clone()); - - // start goal callback - (self.goal_cb)(g); - } - } - - fn handle_cancel_request(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_action_take_cancel_request( - &self.rcl_handle, - request_id.as_mut_ptr(), - request_msg.void_ptr_mut(), - ) - }; - - if ret != RCL_RET_OK as i32 { - // this seems normal if client dies. - return; - } - - let mut cancel_response = unsafe { rcl_action_get_zero_initialized_cancel_response() }; - let ret = unsafe { - rcl_action_process_cancel_request(&self.rcl_handle, &*request_msg, &mut cancel_response) - }; - - if ret != RCL_RET_OK as i32 { - println!("action server: could not process cancel request. {}", ret); - return; - } - - let mut response_msg = - action_msgs::srv::CancelGoal::Response::from_native(&cancel_response.msg); - - // let user filter cancelled goals. - let requested_cancels = response_msg.goals_canceling.len(); - response_msg.goals_canceling.retain(|goal_info| { - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); - if let Some(goal) = self.goals.get(&uuid) { - (self.accept_cancel_cb)(goal) - } else { - true - } - }); - - response_msg.goals_canceling.iter().for_each(|goal_info| { - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); - if let Some(goal) = self.goals.get_mut(&uuid) { - goal.set_cancel(); - } - }); - - // check if all cancels were rejected. - if requested_cancels >= 1 && response_msg.goals_canceling.is_empty() { - response_msg.return_code = 1; // TODO: auto generate these (int8 ERROR_REJECTED=1) - } - - if !response_msg.goals_canceling.is_empty() { - // at least one goal state changed, publish a new status message - self.publish_status(); - } - - let mut native_msg = - WrappedNativeMsg::::from(&response_msg); - let ret = unsafe { - let mut request_id = request_id.assume_init(); - rcl_action_send_cancel_response( - &self.rcl_handle, - &mut request_id, - native_msg.void_ptr_mut(), - ) - }; - - if ret != RCL_RET_OK as i32 { - println!("action server: could send cancel response. {}", ret); - return; - } - } - - fn handle_goal_expired(&mut self) { - let mut goal_info = WrappedNativeMsg::::new(); - let mut num_expired = 1; - - while num_expired > 1 { - let ret = unsafe { - rcl_action_expire_goals(&self.rcl_handle, &mut *goal_info, 1, &mut num_expired) - }; - if ret != RCL_RET_OK as i32 { - println!("action server: could not expire goal."); - return; - } - let gi = action_msgs::msg::GoalInfo::from_native(&goal_info); - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(gi.goal_id.uuid.clone())); - println!("goal expired: {} - {}", uuid, num_expired); - self.goals.remove(&uuid); - self.result_msgs.remove(&uuid); - self.result_requests.remove(&uuid); - } - } - - fn publish_status(&self) { - unsafe { - let mut status = rcl_action_get_zero_initialized_goal_status_array(); - let ret = rcl_action_get_goal_status_array(&self.rcl_handle, &mut status); - if ret != RCL_RET_OK as i32 { - println!( - "action server: failed to get goal status array: {}", - Error::from_rcl_error(ret) - ); - return; - } - let ret = rcl_action_publish_status( - &self.rcl_handle, - &status as *const _ as *const std::os::raw::c_void, - ); - if ret != RCL_RET_OK as i32 { - println!( - "action server: failed to publish status: {}", - Error::from_rcl_error(ret) - ); - return; - } - rcl_action_goal_status_array_fini(&mut status); - } - } - - // bit of a hack... - fn add_result(&mut self, uuid: uuid::Uuid, mut msg: Box) -> () { - // if there are already requests for this goal, send the result immediately. - if let Some(rr) = self.result_requests.remove(&uuid) { - for mut req in rr { - let ret = unsafe { - rcl_action_send_result_response(&self.rcl_handle, &mut req, msg.void_ptr_mut()) - }; - if ret != RCL_RET_OK as i32 { - println!( - "action server: could send result request response. {}", - Error::from_rcl_error(ret) - ); - } - } - } - self.result_msgs.insert(uuid, msg); - } - - fn handle_result_request(&mut self) -> () { - let mut request_id = MaybeUninit::::uninit(); - let mut request_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Request, - >::new(); - let ret = unsafe { - rcl_action_take_result_request( - &self.rcl_handle, - request_id.as_mut_ptr(), - request_msg.void_ptr_mut(), - ) - }; - - if ret != RCL_RET_OK as i32 { - // this seems normal if client dies. - return; - } - - let msg = <<::GetResult as WrappedServiceTypeSupport>::Request>::from_native(&request_msg); - let goal_info = action_msgs::msg::GoalInfo { - goal_id: T::destructure_result_request_msg(msg), - ..action_msgs::msg::GoalInfo::default() - }; - let goal_info_native = WrappedNativeMsg::::from(&goal_info); - - // does this goal exist? - let goal_exists = - unsafe { rcl_action_server_goal_exists(&self.rcl_handle, &*goal_info_native) }; - - let uuid = uuid::Uuid::from_bytes(vec_to_uuid_bytes(goal_info.goal_id.uuid.clone())); - - let response_msg = if !goal_exists { - // Goal does not exists - println!("goal does not exist :("); - let status = GoalStatus::Unknown; - let msg = T::make_result_response_msg(status.to_rcl(), T::Result::default()); - let mut response_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Response, - >::from(&msg); - Some(response_msg.void_ptr_mut()) - } else { - self.result_msgs - .get_mut(&uuid) - .map(|msg| msg.void_ptr_mut()) - }; - - let mut request_id = unsafe { request_id.assume_init() }; - if let Some(response_msg) = response_msg { - let ret = unsafe { - rcl_action_send_result_response(&self.rcl_handle, &mut request_id, response_msg) - }; - - if ret != RCL_RET_OK as i32 { - println!( - "action server: could send result request response. {}", - Error::from_rcl_error(ret) - ); - return; - } - } else { - // keep request for later when result comes in - // todo: add logic that replies to the requests - self.result_requests - .entry(uuid) - .or_insert(vec![]) - .push(request_id); - } - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_action_server_fini(&mut self.rcl_handle, node); - rcl_ros_clock_fini(self.clock_handle.as_mut()); - } - } -} - -// The publish function is thread safe. ROS2 docs state: -// ============= -// -// This function is thread safe so long as access to both the -// publisher and the" `ros_message` is synchronized." That means that -// calling rcl_publish() from multiple threads is allowed, but" -// calling rcl_publish() at the same time as non-thread safe -// publisher" functions is not, e.g. calling rcl_publish() and -// rcl_publisher_fini()" concurrently is not allowed." Before calling -// rcl_publish() the message can change and after calling" -// rcl_publish() the message can change, but it cannot be changed -// during the" publish call." The same `ros_message`, however, can be -// passed to multiple calls of" rcl_publish() simultaneously, even if -// the publishers differ." The `ros_message` is unmodified by -// rcl_publish()." -// -// TODO: I guess there is a potential error source in destructuring -// while calling publish. I don't think its worth to protect with a -// mutex/rwlock for this though... -// -// Methods that mutate need to called from the thread owning the Node. -// I don't think we can count on Node being generally thread-safe. -// So keep pub/sub management and polling contained to one thread -// and send out publishers. - -unsafe impl Send for Publisher where T: WrappedTypesupport {} - -use std::sync::{Arc, Mutex, Weak}; -#[derive(Debug, Clone)] -pub struct Publisher -where - T: WrappedTypesupport, -{ - handle: Weak, - type_: PhantomData, -} - -unsafe impl Send for PublisherUntyped {} -#[derive(Debug, Clone)] -pub struct PublisherUntyped { - handle: Weak, - type_: String, -} - -// Same reasoning for clients. -unsafe impl Send for Client where T: WrappedServiceTypeSupport {} +mod clocks; +pub use clocks::{Clock, ClockType}; +// Public facing client API. pub struct Client where T: WrappedServiceTypeSupport, { - client_: Weak>>, + client_: Weak>>, } +impl Client +where + T: WrappedServiceTypeSupport, +{ + pub fn request(&self, msg: &T::Request) -> Result>> + where + T: WrappedServiceTypeSupport, + { + // upgrade to actual ref. if still alive + let client = self + .client_ + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.request(msg) + } +} + +// Public facing client API. +pub struct UntypedClient +{ + client_: Weak>, +} + +impl UntypedClient +{ + pub fn request(&self, msg: serde_json::Value) -> Result>>> + { + // upgrade to actual ref. if still alive + let client = self + .client_ + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let mut client = client.lock().unwrap(); + client.request(msg) + } +} + +/// Encapsulates a service request. In contrast to having a simply callback from +/// Request -> Response types that is called synchronously, the service request +/// can be moved around and completed asynchronously. pub struct ServiceRequest where T: WrappedServiceTypeSupport, @@ -1285,9 +122,6 @@ where response_sender: oneshot::Sender<(rmw_request_id_t, T::Response)>, } -/// Encapsulates a service request. In contrast to having a simply callback from -/// Request -> Response types that is called synchronously, the service request -/// can be moved around and completed asynchronously. impl ServiceRequest where T: WrappedServiceTypeSupport, @@ -1325,235 +159,6 @@ where result: Arc>>>, } -#[derive(Clone)] -pub struct ServerGoal -where - T: WrappedActionTypeSupport, -{ - pub uuid: uuid::Uuid, - pub goal: T::Goal, - handle: Arc>, - server: Weak>, -} - -unsafe impl Send for ServerGoal where T: WrappedActionTypeSupport {} - -impl ServerGoal -where - T: WrappedActionTypeSupport, -{ - pub fn is_cancelling(&self) -> bool { - let mut state = 0u8; // TODO: int8 STATUS_UNKNOWN = 0; - let ret = unsafe { - let handle = self.handle.lock().unwrap(); - rcl_action_goal_handle_get_status(*handle, &mut state) - }; - - if ret != RCL_RET_OK as i32 { - println!("action server: Failed to get goal handle state: {}", ret); - } - return state == 3u8; // TODO: int8 STATUS_CANCELING - } - - pub fn publish_feedback(&self, msg: T::Feedback) -> Result<()> - where - T: WrappedActionTypeSupport, - { - // upgrade to actual ref. if still alive - let action_server = self - .server - .upgrade() - .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; - - let uuid_msg = unique_identifier_msgs::msg::UUID { - uuid: self.uuid.as_bytes().to_vec(), - }; - let feedback_msg = T::make_feedback_msg(uuid_msg, msg); - let mut native_msg = WrappedNativeMsg::::from(&feedback_msg); - let ret = unsafe { - rcl_action_publish_feedback( - action_server.lock().unwrap().handle(), - native_msg.void_ptr_mut(), - ) - }; - - if ret == RCL_RET_OK as i32 { - Ok(()) - } else { - eprintln!("coult not publish {}", Error::from_rcl_error(ret)); - Ok(()) // todo: error codes - } - } - - fn set_cancel(&mut self) { - let ret = unsafe { - let handle = self.handle.lock().unwrap(); - rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_CANCEL_GOAL) - }; - - if ret != RCL_RET_OK as i32 { - println!( - "action server: could not cancel goal: {}", - Error::from_rcl_error(ret) - ); - } - } - - pub fn cancel(&mut self, msg: T::Result) -> Result<()> { - // upgrade to actual ref. if still alive - let action_server = self - .server - .upgrade() - .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; - let mut action_server = action_server.lock().unwrap(); - - // todo: check that the goal exists - let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { - uuid: self.uuid.as_bytes().to_vec(), - }, - ..action_msgs::msg::GoalInfo::default() - }; - let goal_info_native = WrappedNativeMsg::::from(&goal_info); - - // does this goal exist? - let goal_exists = - unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; - - if !goal_exists { - println!("tried to publish result without a goal"); - return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); - } - - // todo: error handling - unsafe { - rcl_action_notify_goal_done(action_server.handle()); - } - - // send out updated statues - action_server.publish_status(); - - // create result message - let result_msg = T::make_result_response_msg(5, msg); // todo: int8 STATUS_CANCELED = 5 - let native_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Response, - >::from(&result_msg); - action_server.add_result(self.uuid.clone(), Box::new(native_msg)); - - Ok(()) - } - - pub fn abort(&mut self, msg: T::Result) -> Result<()> { - // todo: error handling - let ret = unsafe { - let handle = self.handle.lock().unwrap(); - rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_ABORT) - }; - - if ret != RCL_RET_OK as i32 { - println!( - "action server: could not cancel goal: {}", - Error::from_rcl_error(ret) - ); - } - - // upgrade to actual ref. if still alive - let action_server = self - .server - .upgrade() - .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; - let mut action_server = action_server.lock().unwrap(); - - // todo: check that the goal exists - let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { - uuid: self.uuid.as_bytes().to_vec(), - }, - ..action_msgs::msg::GoalInfo::default() - }; - let goal_info_native = WrappedNativeMsg::::from(&goal_info); - - // does this goal exist? - let goal_exists = - unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; - - if !goal_exists { - println!("tried to abort without a goal"); - return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); - } - - // todo: error handling - unsafe { - rcl_action_notify_goal_done(action_server.handle()); - } - - // send out updated statues - action_server.publish_status(); - - // create result message - let result_msg = T::make_result_response_msg(6, msg); // todo: int8 STATUS_ABORTED = 6 - let native_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Response, - >::from(&result_msg); - action_server.add_result(self.uuid.clone(), Box::new(native_msg)); - - Ok(()) - } - - pub fn succeed(&mut self, msg: T::Result) -> Result<()> - where - T: WrappedActionTypeSupport, - { - // upgrade to actual ref. if still alive - let action_server = self - .server - .upgrade() - .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?; - let mut action_server = action_server.lock().unwrap(); - - // todo: check that the goal exists - let goal_info = action_msgs::msg::GoalInfo { - goal_id: unique_identifier_msgs::msg::UUID { - uuid: self.uuid.as_bytes().to_vec(), - }, - ..action_msgs::msg::GoalInfo::default() - }; - let goal_info_native = WrappedNativeMsg::::from(&goal_info); - - // does this goal exist? - let goal_exists = - unsafe { rcl_action_server_goal_exists(action_server.handle(), &*goal_info_native) }; - - if !goal_exists { - println!("tried to publish result without a goal"); - return Err(Error::RCL_RET_ACTION_GOAL_HANDLE_INVALID); - } - - // todo: error handling - unsafe { - let handle = self.handle.lock().unwrap(); - rcl_action_update_goal_state(*handle, rcl_action_goal_event_t::GOAL_EVENT_SUCCEED); - } - - // todo: error handling - unsafe { - rcl_action_notify_goal_done(action_server.handle()); - } - - // send out updated statues - action_server.publish_status(); - - // create result message - let result_msg = T::make_result_response_msg(4, msg); // todo: int8 STATUS_SUCCEEDED = 4 - let native_msg = WrappedNativeMsg::< - <::GetResult as WrappedServiceTypeSupport>::Response, - >::from(&result_msg); - action_server.add_result(self.uuid.clone(), Box::new(native_msg)); - - Ok(()) - } -} - #[derive(Clone)] pub struct ActionServer where @@ -1578,174 +183,12 @@ where } } -#[derive(Debug, Clone)] -pub struct Context { - context_handle: Arc>, -} - -// Not 100% about this one. From our end the context is rarely used -// and can be locked by a mutex for that. But I haven't investigated -// if its use is thread-safe between nodes. May remove send here -// later. -unsafe impl Send for Context {} - -impl Context { - pub fn create() -> Result { - let mut ctx: Box = unsafe { Box::new(rcl_get_zero_initialized_context()) }; - // argc/v - let args = std::env::args() - .map(|arg| CString::new(arg).unwrap()) - .collect::>(); - let mut c_args = args - .iter() - .map(|arg| arg.as_ptr()) - .collect::>(); - c_args.push(std::ptr::null()); - - let is_valid = unsafe { - let allocator = rcutils_get_default_allocator(); - let mut init_options = rcl_get_zero_initialized_init_options(); - rcl_init_options_init(&mut init_options, allocator); - rcl_init( - (c_args.len() - 1) as ::std::os::raw::c_int, - c_args.as_ptr(), - &init_options, - ctx.as_mut(), - ); - rcl_init_options_fini(&mut init_options as *mut _); - rcl_context_is_valid(ctx.as_mut()) - }; - - let logging_ok = unsafe { - let _guard = log_guard(); - let ret = rcl_logging_configure( - &ctx.as_ref().global_arguments, - &rcutils_get_default_allocator(), - ); - ret == RCL_RET_OK as i32 - }; - - if is_valid && logging_ok { - Ok(Context { - context_handle: Arc::new(Mutex::new(ContextHandle(ctx))), - }) - } else { - Err(Error::RCL_RET_ERROR) // TODO - } - } - - pub fn is_valid(&self) -> bool { - let mut ctx = self.context_handle.lock().unwrap(); - unsafe { rcl_context_is_valid(ctx.as_mut()) } - } -} - -#[derive(Debug)] -pub struct ContextHandle(Box); - -impl Deref for ContextHandle { - type Target = Box; - - fn deref<'a>(&'a self) -> &'a Box { - &self.0 - } -} - -impl DerefMut for ContextHandle { - fn deref_mut<'a>(&'a mut self) -> &'a mut Box { - &mut self.0 - } -} - -impl Drop for ContextHandle { - fn drop(&mut self) { - // TODO: error handling? atleast probably need rcl_reset_error - unsafe { - rcl::rcl_shutdown(self.0.as_mut()); - rcl::rcl_context_fini(self.0.as_mut()); - } - } -} - -#[derive(Debug)] -pub enum ParameterValue { - NotSet, - Bool(bool), - Integer(i64), - Double(f64), - String(String), - BoolArray(Vec), - ByteArray(Vec), - IntegerArray(Vec), - DoubleArray(Vec), - StringArray(Vec), -} - -impl ParameterValue { - fn from_rcl(v: &rcl_variant_t) -> Self { - if v.bool_value != std::ptr::null_mut() { - ParameterValue::Bool(unsafe { *v.bool_value }) - } else if v.integer_value != std::ptr::null_mut() { - ParameterValue::Integer(unsafe { *v.integer_value }) - } else if v.double_value != std::ptr::null_mut() { - ParameterValue::Double(unsafe { *v.double_value }) - } else if v.string_value != std::ptr::null_mut() { - let s = unsafe { CStr::from_ptr(v.string_value) }; - let string = s.to_str().unwrap_or("").to_owned(); - ParameterValue::String(string) - } else if v.byte_array_value != std::ptr::null_mut() { - let vals = unsafe { - std::slice::from_raw_parts((*v.byte_array_value).values, (*v.byte_array_value).size) - }; - ParameterValue::ByteArray(vals.iter().cloned().collect()) - } else if v.bool_array_value != std::ptr::null_mut() { - let vals = unsafe { - std::slice::from_raw_parts((*v.bool_array_value).values, (*v.bool_array_value).size) - }; - ParameterValue::BoolArray(vals.iter().cloned().collect()) - } else if v.integer_array_value != std::ptr::null_mut() { - let vals = unsafe { - std::slice::from_raw_parts( - (*v.integer_array_value).values, - (*v.integer_array_value).size, - ) - }; - ParameterValue::IntegerArray(vals.iter().cloned().collect()) - } else if v.double_array_value != std::ptr::null_mut() { - let vals = unsafe { - std::slice::from_raw_parts( - (*v.double_array_value).values, - (*v.double_array_value).size, - ) - }; - ParameterValue::DoubleArray(vals.iter().cloned().collect()) - } else if v.string_array_value != std::ptr::null_mut() { - let vals = unsafe { - std::slice::from_raw_parts( - (*v.string_array_value).data, - (*v.string_array_value).size, - ) - }; - let s = vals - .iter() - .map(|cs| { - let s = unsafe { CStr::from_ptr(*cs) }; - s.to_str().unwrap_or("").to_owned() - }) - .collect(); - ParameterValue::StringArray(s) - } else { - ParameterValue::NotSet - } - } -} - pub struct Node { context: Context, pub params: HashMap, node_handle: Box, // the node owns the subscribers - subs: Vec>, + subs: Vec>, // services, services: Vec>, // clients with hack to avoid locking just to wait..., @@ -1847,7 +290,7 @@ impl Node { for (s, v) in param_names.iter().zip(param_values) { let s = unsafe { CStr::from_ptr(*s) }; let key = s.to_str().unwrap_or(""); - let val = ParameterValue::from_rcl(&*v); + let val = parameter_value_from_rcl(&*v); self.params.insert(key.to_owned(), val); } } @@ -1932,7 +375,7 @@ impl Node { let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; let (sender, receiver) = mpsc::channel::(10); - let ws = WrappedSub { + let ws = TypedSubscriber { rcl_handle: subscription_handle, sender, }; @@ -1950,7 +393,7 @@ impl Node { let subscription_handle = self.create_subscription_helper(topic, T::get_ts())?; let (sender, receiver) = mpsc::channel::>(10); - let ws = WrappedSubNative { + let ws = NativeSubscriber { rcl_handle: subscription_handle, sender, }; @@ -1968,7 +411,7 @@ impl Node { let subscription_handle = self.create_subscription_helper(topic, msg.ts)?; let (sender, receiver) = mpsc::channel::>(10); - let ws = WrappedSubUntyped { + let ws = UntypedSubscriber { rcl_handle: subscription_handle, topic_type: topic_type.to_string(), sender, @@ -2013,7 +456,7 @@ impl Node { let service_handle = self.create_service_helper(service_name, T::get_ts())?; let (sender, receiver) = mpsc::channel::>(10); - let ws = WrappedService:: { + let ws = TypedService:: { rcl_handle: service_handle, outstanding_requests: vec![], sender, @@ -2023,38 +466,12 @@ impl Node { Ok(receiver) } - fn create_client_helper( - &mut self, - service_name: &str, - service_ts: *const rosidl_service_type_support_t, - ) -> Result { - let mut client_handle = unsafe { rcl_get_zero_initialized_client() }; - let service_name_c_string = - CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?; - - let result = unsafe { - let client_options = rcl_client_get_default_options(); - rcl_client_init( - &mut client_handle, - self.node_handle.as_mut(), - service_ts, - service_name_c_string.as_ptr(), - &client_options, - ) - }; - if result == RCL_RET_OK as i32 { - Ok(client_handle) - } else { - Err(Error::from_rcl_error(result)) - } - } - pub fn create_client(&mut self, service_name: &str) -> Result> where T: WrappedServiceTypeSupport, { - let client_handle = self.create_client_helper(service_name, T::get_ts())?; - let ws = WrappedClient:: { + let client_handle = create_client_helper(self.node_handle.as_mut(), service_name, T::get_ts())?; + let ws = TypedClient:: { rcl_handle: client_handle, response_channels: Vec::new(), }; @@ -2066,28 +483,55 @@ impl Node { Ok(c) } - pub fn service_available( - &self, - client: &Client, - ) -> Result { - let client = client - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let client = client.lock().unwrap(); + /// Create a service client without having the concrete rust type. + pub fn create_client_untyped(&mut self, service_name: &str, service_type: &str) -> Result + { + let service_type = UntypedServiceSupport::new_from(service_type)?; + let client_handle = create_client_helper(self.node_handle.as_mut(), service_name, service_type.ts)?; + let client = UntypedClient_ { + service_type, + rcl_handle: client_handle, + response_channels: Vec::new(), + }; + + let arc = Arc::new(Mutex::new(client)); + let client = Arc::downgrade(&arc); + self.clients.push((client_handle, arc)); + let c = UntypedClient { client_: client }; + Ok(c) + } + + fn service_available_helper(&self, client: &rcl_client_t) -> Result { let mut avail = false; let result = unsafe { - rcl_service_server_is_available(self.node_handle.as_ref(), client.handle(), &mut avail) + rcl_service_server_is_available(self.node_handle.as_ref(), client, &mut avail) }; if result == RCL_RET_OK as i32 { Ok(avail) } else { - eprintln!("coult not send request {}", result); Err(Error::from_rcl_error(result)) } } + pub fn service_available(&self,client: &Client) -> Result { + let client = client + .client_ + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + self.service_available_helper(client.handle()) + } + + pub fn service_available_untyped(&self, client: &UntypedClient) -> Result { + let client = client + .client_ + .upgrade() + .ok_or(Error::RCL_RET_CLIENT_INVALID)?; + let client = client.lock().unwrap(); + self.service_available_helper(client.handle()) + } + fn create_action_client_helper( &mut self, action_name: &str, @@ -2861,38 +1305,6 @@ where } } -impl Client -where - T: WrappedServiceTypeSupport, -{ - pub fn request(&self, msg: &T::Request) -> Result>> - where - T: WrappedServiceTypeSupport, - { - // upgrade to actual ref. if still alive - let client = self - .client_ - .upgrade() - .ok_or(Error::RCL_RET_CLIENT_INVALID)?; - let mut client = client.lock().unwrap(); - let native_msg: WrappedNativeMsg = WrappedNativeMsg::::from(msg); - let mut seq_no = 0i64; - let result = - unsafe { rcl_send_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no) }; - - let (sender, receiver) = oneshot::channel::(); - - if result == RCL_RET_OK as i32 { - client.response_channels.push((seq_no, sender)); - // instead of "canceled" we return invalid client. - Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID)) - } else { - eprintln!("coult not send request {}", result); - Err(Error::from_rcl_error(result)) - } - } -} - impl ClientGoal where T: WrappedActionTypeSupport, @@ -3049,81 +1461,6 @@ impl PublisherUntyped { } } -#[derive(Debug)] -pub enum ClockType { - RosTime, - SystemTime, - SteadyTime, -} - -unsafe impl Send for Clock {} - -pub struct Clock { - clock_handle: Box, -} - -impl Clock { - fn clock_type_to_rcl(ct: &ClockType) -> rcl_clock_type_t { - match ct { - ClockType::RosTime => rcl_clock_type_t::RCL_ROS_TIME, - ClockType::SystemTime => rcl_clock_type_t::RCL_SYSTEM_TIME, - ClockType::SteadyTime => rcl_clock_type_t::RCL_STEADY_TIME, - } - } - - pub fn create(ct: ClockType) -> Result { - let mut clock_handle = MaybeUninit::::uninit(); - - let rcl_ct = Clock::clock_type_to_rcl(&ct); - let ret = unsafe { - rcl_clock_init( - rcl_ct, - clock_handle.as_mut_ptr(), - &mut rcutils_get_default_allocator(), - ) - }; - if ret != RCL_RET_OK as i32 { - eprintln!("could not create {:?} clock: {}", ct, ret); - return Err(Error::from_rcl_error(ret)); - } - - let clock_handle = Box::new(unsafe { clock_handle.assume_init() }); - Ok(Clock { clock_handle }) - } - - pub fn get_now(&mut self) -> Result { - let valid = unsafe { rcl_clock_valid(&mut *self.clock_handle) }; - if !valid { - return Err(Error::from_rcl_error(RCL_RET_INVALID_ARGUMENT as i32)); - } - let mut tp: rcutils_time_point_value_t = 0; - let ret = unsafe { rcl_clock_get_now(&mut *self.clock_handle, &mut tp) }; - - if ret != RCL_RET_OK as i32 { - eprintln!("could not create steady clock: {}", ret); - return Err(Error::from_rcl_error(ret)); - } - - let dur = Duration::from_nanos(tp as u64); - - Ok(dur) - } - - pub fn to_builtin_time(d: &Duration) -> builtin_interfaces::msg::Time { - let sec = d.as_secs() as i32; - let nanosec = d.subsec_nanos(); - builtin_interfaces::msg::Time { sec, nanosec } - } -} - -impl Drop for Clock { - fn drop(&mut self) { - unsafe { - rcl_clock_fini(&mut *self.clock_handle); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -3139,253 +1476,4 @@ mod tests { assert!(ctx.is_valid()); } } - - #[test] - fn test_ros_str() -> () { - let hej = "hej hopp"; - let mut msg = WrappedNativeMsg::::new(); - msg.data.assign(hej); - assert_eq!(msg.data.to_str(), hej); - } - - #[test] - fn test_copy_fields() -> () { - let msg_orig = std_msgs::msg::String { data: "hej".into() }; - let rosmsg = WrappedNativeMsg::::from(&msg_orig); - let msg2 = std_msgs::msg::String::from_native(&rosmsg); - assert_eq!(msg_orig, msg2); - } - - #[test] - fn test_introspection_string() -> () { - unsafe { - use std::ffi::CStr; - - let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__std_msgs__msg__String(); - let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; - println!("{:#?}", *members); - - assert_eq!((*members).member_count_, 1); - - let s = CStr::from_ptr((*members).message_namespace_) - .to_str() - .unwrap(); - assert_eq!(s, "std_msgs__msg"); - - let s = CStr::from_ptr((*members).message_name_).to_str().unwrap(); - assert_eq!(s, "String"); - - let member = (*members).members_; - println!("member: {:#?}", *member); - let field_name = CStr::from_ptr((*member).name_).to_str().unwrap(); - let type_id = (*member).type_id_; - let is_array = (*member).is_array_; - assert_eq!(field_name, "data"); - assert_eq!( - type_id, - rosidl_typesupport_introspection_c__ROS_TYPE_STRING as u8 - ); - assert_eq!(is_array, false); - } - } - - #[test] - #[should_panic] // we are testing that we cannot have to many elements in a fixed sized field - fn test_fixedsizearray() -> () { - unsafe { - let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__geometry_msgs__msg__AccelWithCovariance(); - let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; - println!("{:#?}", *members); - - let memberslice = - std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize); - for member in memberslice { - println!("member: {:#?}", *member); - } - - let msg_native = WrappedNativeMsg::::new(); - let mut msg = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native); - println!("{:#?}", msg); - msg.covariance[0] = 10.0; - msg.covariance[10] = 10.0; - msg.covariance[35] = 99.0; - msg.covariance.push(4444.0); - let msg_native2 = - WrappedNativeMsg::::from(&msg); - let msg2 = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native2); - println!("{:#?}", msg2); - } - } - - #[test] - #[should_panic] // we are testing that we cannot have to many elements in a fixed sized field - fn test_capped_sequence() -> () { - // float64[<=3] dimensions in the .msg translates to a float64 sequence AND an array size field. handle it... - unsafe { - let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__shape_msgs__msg__SolidPrimitive(); - let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; - println!("{:#?}", *members); - - let memberslice = - std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize); - for member in memberslice { - println!("member: {:#?}", *member); - } - - let msg_native = WrappedNativeMsg::::new(); - let mut msg = shape_msgs::msg::SolidPrimitive::from_native(&msg_native); - println!("{:#?}", msg); - msg.dimensions.push(1.0); - msg.dimensions.push(1.0); - msg.dimensions.push(1.0); - msg.dimensions.push(1.0); // only three elements allowed - let _msg_native2 = WrappedNativeMsg::::from(&msg); - } - } - - #[test] - fn test_generation_string_use() -> () { - let msg = std_msgs::msg::String { data: "hej".into() }; - let msg2 = msg.clone(); - let msg_native = WrappedNativeMsg::::from(&msg2); - let msg2 = std_msgs::msg::String::from_native(&msg_native); - assert_eq!(msg, msg2) - } - - #[test] - fn test_generation_bool_use() -> () { - let msg = std_msgs::msg::Bool { data: true }; - let msg_native = WrappedNativeMsg::::from(&msg); - let msg2 = std_msgs::msg::Bool::from_native(&msg_native); - assert_eq!(msg, msg2); - } - - #[test] - fn test_float_sequence() -> () { - use trajectory_msgs::msg::*; - let native = WrappedNativeMsg::::new(); - let mut msg = JointTrajectoryPoint::from_native(&native); - msg.positions.push(39.0); - msg.positions.push(34.0); - let new_native = WrappedNativeMsg::::from(&msg); - let new_msg = JointTrajectoryPoint::from_native(&new_native); - println!("{:#?}", new_msg); - assert_eq!(msg, new_msg); - } - - #[test] - fn test_deault() -> () { - use trajectory_msgs::msg::*; - let mut msg: JointTrajectoryPoint = Default::default(); - msg.positions.push(39.0); - msg.positions.push(34.0); - let mut new_native = WrappedNativeMsg::::from(&msg); - unsafe { *((*new_native).positions.data) = 88.9 }; - let new_msg = JointTrajectoryPoint::from_native(&new_native); - println!("{:#?}", new_msg); - assert_ne!(msg, new_msg); - } - - #[test] - fn test_untyped_json() -> () { - let mut msg = trajectory_msgs::msg::JointTrajectoryPoint::default(); - msg.positions.push(39.0); - msg.positions.push(34.0); - let json = serde_json::to_value(msg.clone()).unwrap(); - - let mut native = - WrappedNativeMsgUntyped::new_from("trajectory_msgs/msg/JointTrajectoryPoint").unwrap(); - native.from_json(json.clone()).unwrap(); - let json2 = native.to_json().unwrap(); - assert_eq!(json, json2); - - let msg2: trajectory_msgs::msg::JointTrajectoryPoint = - serde_json::from_value(json2).unwrap(); - assert_eq!(msg, msg2); - } - - #[cfg(r2r__test_msgs__msg__Arrays)] - #[test] - fn test_test_msgs_array() -> () { - let mut msg = test_msgs::msg::Arrays::default(); - println!("msg: {:?}", msg.string_values); - msg.string_values = vec!["hej".to_string(), "hopp".to_string(), "stropp".to_string()]; - - let msg_native = WrappedNativeMsg::::from(&msg); - let msg2 = test_msgs::msg::Arrays::from_native(&msg_native); - - assert_eq!(msg, msg2); - } - - #[cfg(r2r__test_msgs__msg__Arrays)] - #[test] - #[should_panic] - fn test_test_msgs_array_too_few_elems() -> () { - let mut msg = test_msgs::msg::Arrays::default(); - println!("msg: {:?}", msg.string_values); - msg.string_values = vec!["hej".to_string(), "hopp".to_string()]; - let _msg_native = WrappedNativeMsg::::from(&msg); - } - - #[cfg(r2r__test_msgs__msg__WStrings)] - #[test] - fn test_test_msgs_wstring() -> () { - let mut msg = test_msgs::msg::WStrings::default(); - let rust_str = "ハローワールド"; - msg.wstring_value = rust_str.to_string(); - let native = WrappedNativeMsg::::from(&msg); - println!("msg: {:?}", msg); - let msg2 = test_msgs::msg::WStrings::from_native(&native); - assert_eq!(msg.wstring_value, msg2.wstring_value); - } - - #[cfg(r2r__example_interfaces__srv__AddTwoInts)] - #[test] - fn test_service_msgs() { - use example_interfaces::srv::AddTwoInts; - let mut req = AddTwoInts::Request::default(); - req.a = 5; - let rn = WrappedNativeMsg::<_>::from(&req); - let req2 = AddTwoInts::Request::from_native(&rn); - println!("req2 {:?}", req2); - assert_eq!(req, req2); - - let mut resp = AddTwoInts::Response::default(); - resp.sum = 5; - let rn = WrappedNativeMsg::<_>::from(&resp); - let resp2 = AddTwoInts::Response::from_native(&rn); - println!("resp {:?}", resp2); - assert_eq!(resp, resp2); - } - - #[cfg(r2r__example_interfaces__action__Fibonacci)] - #[test] - fn test_action_msgs() { - use example_interfaces::action::Fibonacci; - let mut goal = Fibonacci::Goal::default(); - goal.order = 5; - let gn = WrappedNativeMsg::<_>::from(&goal); - let goal2 = Fibonacci::Goal::from_native(&gn); - println!("goal2 {:?}", goal2); - assert_eq!(goal, goal2); - - let mut res = Fibonacci::Result::default(); - res.sequence = vec![1, 2, 3]; - let rn = WrappedNativeMsg::<_>::from(&res); - let res2 = Fibonacci::Result::from_native(&rn); - println!("res2 {:?}", res2); - assert_eq!(res, res2); - - let mut fb = Fibonacci::Feedback::default(); - fb.sequence = vec![4, 3, 6]; - let fbn = WrappedNativeMsg::<_>::from(&fb); - let fb2 = Fibonacci::Feedback::from_native(&fbn); - println!("feedback2 {:?}", fb2); - assert_eq!(fb, fb2); - - let fb = WrappedNativeMsg::::new(); - let fb1 = Fibonacci::Feedback::default(); - let fb2 = Fibonacci::Feedback::from_native(&fb); - assert_eq!(fb1, fb2); - } } diff --git a/src/msg_types.rs b/src/msg_types.rs new file mode 100644 index 0000000..e93b18a --- /dev/null +++ b/src/msg_types.rs @@ -0,0 +1,518 @@ +use crate::error::*; +use msg_gen::*; +use rcl::{ + rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t, +}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::ops::{Deref, DerefMut}; + +pub mod generated_msgs { + use super::*; + include!(concat!(env!("OUT_DIR"), "/_r2r_generated_msgs.rs")); + include!(concat!( + env!("OUT_DIR"), + "/_r2r_generated_untyped_helper.rs" + )); + include!(concat!( + env!("OUT_DIR"), + "/_r2r_generated_service_helper.rs" + )); +} + +use generated_msgs::{builtin_interfaces, unique_identifier_msgs}; + +pub trait WrappedTypesupport: + Serialize + serde::de::DeserializeOwned + Default + Debug + Clone +{ + type CStruct; + + fn get_ts() -> &'static rosidl_message_type_support_t; + fn create_msg() -> *mut Self::CStruct; + fn destroy_msg(msg: *mut Self::CStruct); + fn from_native(msg: &Self::CStruct) -> Self; + fn copy_to_native(&self, msg: &mut Self::CStruct); +} + +pub trait WrappedServiceTypeSupport: Debug + Clone { + type Request: WrappedTypesupport; + type Response: WrappedTypesupport; + + fn get_ts() -> &'static rosidl_service_type_support_t; +} + +pub trait WrappedActionTypeSupport: Debug + Clone { + type Goal: WrappedTypesupport; + type Result: WrappedTypesupport; + type Feedback: WrappedTypesupport; + + // internal... + type FeedbackMessage: WrappedTypesupport; + type SendGoal: WrappedServiceTypeSupport; + type GetResult: WrappedServiceTypeSupport; + + fn get_ts() -> &'static rosidl_action_type_support_t; + + fn make_goal_request_msg( + goal_id: unique_identifier_msgs::msg::UUID, + goal: Self::Goal, + ) -> <::SendGoal as WrappedServiceTypeSupport>::Request; + fn make_goal_response_msg( + accepted: bool, + stamp: builtin_interfaces::msg::Time, + ) -> <::SendGoal as WrappedServiceTypeSupport>::Response; + fn make_feedback_msg( + goal_id: unique_identifier_msgs::msg::UUID, + feedback: Self::Feedback, + ) -> Self::FeedbackMessage; + fn make_result_request_msg( + goal_id: unique_identifier_msgs::msg::UUID, + ) -> <::GetResult as WrappedServiceTypeSupport>::Request; + fn make_result_response_msg( + status: i8, + result: Self::Result, + ) -> <::GetResult as WrappedServiceTypeSupport>::Response; + fn destructure_goal_request_msg( + msg: <::SendGoal as WrappedServiceTypeSupport>::Request, + ) -> (unique_identifier_msgs::msg::UUID, Self::Goal); + fn destructure_goal_response_msg( + msg: <::SendGoal as WrappedServiceTypeSupport>::Response, + ) -> (bool, builtin_interfaces::msg::Time); + fn destructure_feedback_msg( + msg: Self::FeedbackMessage, + ) -> (unique_identifier_msgs::msg::UUID, Self::Feedback); + fn destructure_result_response_msg( + msg: <::GetResult as WrappedServiceTypeSupport>::Response, + ) -> (i8, Self::Result); + fn destructure_result_request_msg( + msg: <::GetResult as WrappedServiceTypeSupport>::Request, + ) -> unique_identifier_msgs::msg::UUID; +} + +#[derive(Debug)] +pub struct WrappedNativeMsg +where + T: WrappedTypesupport, +{ + pub msg: *mut T::CStruct, +} + +pub trait VoidPtr { + fn void_ptr(&self) -> *const std::os::raw::c_void; + fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void; +} + +#[derive(Debug)] +pub struct WrappedNativeMsgUntyped { + pub ts: &'static rosidl_message_type_support_t, + msg: *mut std::os::raw::c_void, + destroy: fn(*mut std::os::raw::c_void), + msg_to_json: fn( + native: *const std::os::raw::c_void, + ) -> std::result::Result, + msg_from_json: fn( + native: *mut std::os::raw::c_void, + json: serde_json::Value, + ) -> std::result::Result<(), serde_json::error::Error>, +} + +pub struct UntypedServiceSupport { + pub ts: &'static rosidl_service_type_support_t, + pub make_request_msg: fn() -> WrappedNativeMsgUntyped, + pub make_response_msg: fn() -> WrappedNativeMsgUntyped, +} + +impl UntypedServiceSupport { + fn new() -> Self + where + T: WrappedServiceTypeSupport, + { + let make_request_msg = || { + WrappedNativeMsgUntyped::new::() + }; + let make_response_msg = || { + WrappedNativeMsgUntyped::new::() + }; + + UntypedServiceSupport { + ts: T::get_ts(), + make_request_msg, + make_response_msg, + } + } +} + +impl WrappedNativeMsgUntyped { + fn new() -> Self + where + T: WrappedTypesupport, + { + let destroy = |native: *mut std::os::raw::c_void| { + let native_msg = native as *mut T::CStruct; + T::destroy_msg(native_msg); + }; + + let msg_to_json = |native: *const std::os::raw::c_void| { + let msg = unsafe { T::from_native(&*(native as *const T::CStruct)) }; + serde_json::to_value(&msg) + }; + + let msg_from_json = |native: *mut std::os::raw::c_void, json: serde_json::Value| { + serde_json::from_value(json).map(|msg: T| unsafe { + msg.copy_to_native(&mut *(native as *mut T::CStruct)); + }) + }; + + WrappedNativeMsgUntyped { + ts: T::get_ts(), + msg: T::create_msg() as *mut std::os::raw::c_void, + destroy, + msg_to_json, + msg_from_json, + } + } + + pub fn to_json(&self) -> Result { + let json = (self.msg_to_json)(self.msg); + json.map_err(|serde_err| Error::SerdeError { + err: serde_err.to_string(), + }) + } + + pub fn from_json(&mut self, json: serde_json::Value) -> Result<()> { + (self.msg_from_json)(self.msg, json).map_err(|serde_err| Error::SerdeError { + err: serde_err.to_string(), + }) + } +} + +impl VoidPtr for WrappedNativeMsgUntyped { + fn void_ptr(&self) -> *const std::os::raw::c_void { + self.msg as *const _ as *const std::os::raw::c_void + } + + fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void { + self.msg as *mut _ as *mut std::os::raw::c_void + } +} + +impl Drop for WrappedNativeMsgUntyped { + fn drop(&mut self) { + (self.destroy)(self.msg); + } +} + +impl WrappedNativeMsg +where + T: WrappedTypesupport, +{ + pub fn new() -> Self { + WrappedNativeMsg { + msg: T::create_msg(), + } + } + + pub fn from(msg: &T) -> Self { + let mut native_msg = Self::new(); + msg.copy_to_native(&mut native_msg); + native_msg + } +} + +impl VoidPtr for WrappedNativeMsg +where + T: WrappedTypesupport, +{ + fn void_ptr(&self) -> *const std::os::raw::c_void { + self.msg as *const _ as *const std::os::raw::c_void + } + + fn void_ptr_mut(&mut self) -> *mut std::os::raw::c_void { + self.msg as *mut _ as *mut std::os::raw::c_void + } +} + +impl Drop for WrappedNativeMsg +where + T: WrappedTypesupport, +{ + fn drop(&mut self) { + T::destroy_msg(self.msg); + } +} + +impl Deref for WrappedNativeMsg +where + T: WrappedTypesupport, +{ + type Target = T::CStruct; + + fn deref(&self) -> &Self::Target { + unsafe { &(*self.msg) } + } +} + +impl DerefMut for WrappedNativeMsg +where + T: WrappedTypesupport, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut (*self.msg) } + } +} + +#[cfg(test)] +mod tests { + use super::generated_msgs::*; + use super::*; + use rcl::*; + + #[test] + fn test_ros_str() -> () { + let hej = "hej hopp"; + let mut msg = WrappedNativeMsg::::new(); + msg.data.assign(hej); + assert_eq!(msg.data.to_str(), hej); + } + + #[test] + fn test_copy_fields() -> () { + let msg_orig = std_msgs::msg::String { data: "hej".into() }; + let rosmsg = WrappedNativeMsg::::from(&msg_orig); + let msg2 = std_msgs::msg::String::from_native(&rosmsg); + assert_eq!(msg_orig, msg2); + } + + #[test] + fn test_introspection_string() -> () { + unsafe { + use std::ffi::CStr; + + let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__std_msgs__msg__String(); + let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; + println!("{:#?}", *members); + + assert_eq!((*members).member_count_, 1); + + let s = CStr::from_ptr((*members).message_namespace_) + .to_str() + .unwrap(); + assert_eq!(s, "std_msgs__msg"); + + let s = CStr::from_ptr((*members).message_name_).to_str().unwrap(); + assert_eq!(s, "String"); + + let member = (*members).members_; + println!("member: {:#?}", *member); + let field_name = CStr::from_ptr((*member).name_).to_str().unwrap(); + let type_id = (*member).type_id_; + let is_array = (*member).is_array_; + assert_eq!(field_name, "data"); + assert_eq!( + type_id, + rosidl_typesupport_introspection_c__ROS_TYPE_STRING as u8 + ); + assert_eq!(is_array, false); + } + } + + #[test] + #[should_panic] // we are testing that we cannot have to many elements in a fixed sized field + fn test_fixedsizearray() -> () { + unsafe { + let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__geometry_msgs__msg__AccelWithCovariance(); + let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; + println!("{:#?}", *members); + + let memberslice = + std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize); + for member in memberslice { + println!("member: {:#?}", *member); + } + + let msg_native = WrappedNativeMsg::::new(); + let mut msg = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native); + println!("{:#?}", msg); + msg.covariance[0] = 10.0; + msg.covariance[10] = 10.0; + msg.covariance[35] = 99.0; + msg.covariance.push(4444.0); + let msg_native2 = + WrappedNativeMsg::::from(&msg); + let msg2 = geometry_msgs::msg::AccelWithCovariance::from_native(&msg_native2); + println!("{:#?}", msg2); + } + } + + #[test] + #[should_panic] // we are testing that we cannot have to many elements in a fixed sized field + fn test_capped_sequence() -> () { + // float64[<=3] dimensions in the .msg translates to a float64 sequence AND an array size field. handle it... + unsafe { + let x = rosidl_typesupport_introspection_c__get_message_type_support_handle__shape_msgs__msg__SolidPrimitive(); + let members = (*x).data as *const rosidl_typesupport_introspection_c__MessageMembers; + println!("{:#?}", *members); + + let memberslice = + std::slice::from_raw_parts((*members).members_, (*members).member_count_ as usize); + for member in memberslice { + println!("member: {:#?}", *member); + } + + let msg_native = WrappedNativeMsg::::new(); + let mut msg = shape_msgs::msg::SolidPrimitive::from_native(&msg_native); + println!("{:#?}", msg); + msg.dimensions.push(1.0); + msg.dimensions.push(1.0); + msg.dimensions.push(1.0); + msg.dimensions.push(1.0); // only three elements allowed + let _msg_native2 = WrappedNativeMsg::::from(&msg); + } + } + + #[test] + fn test_generation_string_use() -> () { + let msg = std_msgs::msg::String { data: "hej".into() }; + let msg2 = msg.clone(); + let msg_native = WrappedNativeMsg::::from(&msg2); + let msg2 = std_msgs::msg::String::from_native(&msg_native); + assert_eq!(msg, msg2) + } + + #[test] + fn test_generation_bool_use() -> () { + let msg = std_msgs::msg::Bool { data: true }; + let msg_native = WrappedNativeMsg::::from(&msg); + let msg2 = std_msgs::msg::Bool::from_native(&msg_native); + assert_eq!(msg, msg2); + } + + #[test] + fn test_float_sequence() -> () { + use trajectory_msgs::msg::*; + let native = WrappedNativeMsg::::new(); + let mut msg = JointTrajectoryPoint::from_native(&native); + msg.positions.push(39.0); + msg.positions.push(34.0); + let new_native = WrappedNativeMsg::::from(&msg); + let new_msg = JointTrajectoryPoint::from_native(&new_native); + println!("{:#?}", new_msg); + assert_eq!(msg, new_msg); + } + + #[test] + fn test_deault() -> () { + use trajectory_msgs::msg::*; + let mut msg: JointTrajectoryPoint = Default::default(); + msg.positions.push(39.0); + msg.positions.push(34.0); + let mut new_native = WrappedNativeMsg::::from(&msg); + unsafe { *((*new_native).positions.data) = 88.9 }; + let new_msg = JointTrajectoryPoint::from_native(&new_native); + println!("{:#?}", new_msg); + assert_ne!(msg, new_msg); + } + + #[test] + fn test_untyped_json() -> () { + let mut msg = trajectory_msgs::msg::JointTrajectoryPoint::default(); + msg.positions.push(39.0); + msg.positions.push(34.0); + let json = serde_json::to_value(msg.clone()).unwrap(); + + let mut native = + WrappedNativeMsgUntyped::new_from("trajectory_msgs/msg/JointTrajectoryPoint").unwrap(); + native.from_json(json.clone()).unwrap(); + let json2 = native.to_json().unwrap(); + assert_eq!(json, json2); + + let msg2: trajectory_msgs::msg::JointTrajectoryPoint = + serde_json::from_value(json2).unwrap(); + assert_eq!(msg, msg2); + } + + #[cfg(r2r__test_msgs__msg__Arrays)] + #[test] + fn test_test_msgs_array() -> () { + let mut msg = test_msgs::msg::Arrays::default(); + println!("msg: {:?}", msg.string_values); + msg.string_values = vec!["hej".to_string(), "hopp".to_string(), "stropp".to_string()]; + + let msg_native = WrappedNativeMsg::::from(&msg); + let msg2 = test_msgs::msg::Arrays::from_native(&msg_native); + + assert_eq!(msg, msg2); + } + + #[cfg(r2r__test_msgs__msg__Arrays)] + #[test] + #[should_panic] + fn test_test_msgs_array_too_few_elems() -> () { + let mut msg = test_msgs::msg::Arrays::default(); + println!("msg: {:?}", msg.string_values); + msg.string_values = vec!["hej".to_string(), "hopp".to_string()]; + let _msg_native = WrappedNativeMsg::::from(&msg); + } + + #[cfg(r2r__test_msgs__msg__WStrings)] + #[test] + fn test_test_msgs_wstring() -> () { + let mut msg = test_msgs::msg::WStrings::default(); + let rust_str = "ハローワールド"; + msg.wstring_value = rust_str.to_string(); + let native = WrappedNativeMsg::::from(&msg); + println!("msg: {:?}", msg); + let msg2 = test_msgs::msg::WStrings::from_native(&native); + assert_eq!(msg.wstring_value, msg2.wstring_value); + } + + #[cfg(r2r__example_interfaces__srv__AddTwoInts)] + #[test] + fn test_service_msgs() { + use example_interfaces::srv::AddTwoInts; + let mut req = AddTwoInts::Request::default(); + req.a = 5; + let rn = WrappedNativeMsg::<_>::from(&req); + let req2 = AddTwoInts::Request::from_native(&rn); + println!("req2 {:?}", req2); + assert_eq!(req, req2); + + let mut resp = AddTwoInts::Response::default(); + resp.sum = 5; + let rn = WrappedNativeMsg::<_>::from(&resp); + let resp2 = AddTwoInts::Response::from_native(&rn); + println!("resp {:?}", resp2); + assert_eq!(resp, resp2); + } + + #[cfg(r2r__example_interfaces__action__Fibonacci)] + #[test] + fn test_action_msgs() { + use example_interfaces::action::Fibonacci; + let mut goal = Fibonacci::Goal::default(); + goal.order = 5; + let gn = WrappedNativeMsg::<_>::from(&goal); + let goal2 = Fibonacci::Goal::from_native(&gn); + println!("goal2 {:?}", goal2); + assert_eq!(goal, goal2); + + let mut res = Fibonacci::Result::default(); + res.sequence = vec![1, 2, 3]; + let rn = WrappedNativeMsg::<_>::from(&res); + let res2 = Fibonacci::Result::from_native(&rn); + println!("res2 {:?}", res2); + assert_eq!(res, res2); + + let mut fb = Fibonacci::Feedback::default(); + fb.sequence = vec![4, 3, 6]; + let fbn = WrappedNativeMsg::<_>::from(&fb); + let fb2 = Fibonacci::Feedback::from_native(&fbn); + println!("feedback2 {:?}", fb2); + assert_eq!(fb, fb2); + + let fb = WrappedNativeMsg::::new(); + let fb1 = Fibonacci::Feedback::default(); + let fb2 = Fibonacci::Feedback::from_native(&fb); + assert_eq!(fb1, fb2); + } +} diff --git a/src/parameters.rs b/src/parameters.rs new file mode 100644 index 0000000..81d3c77 --- /dev/null +++ b/src/parameters.rs @@ -0,0 +1,72 @@ +use super::*; + +#[derive(Debug, PartialEq, Clone)] +pub enum ParameterValue { + NotSet, + Bool(bool), + Integer(i64), + Double(f64), + String(String), + BoolArray(Vec), + ByteArray(Vec), + IntegerArray(Vec), + DoubleArray(Vec), + StringArray(Vec), +} + +pub fn parameter_value_from_rcl(v: &rcl_variant_t) -> ParameterValue { + if v.bool_value != std::ptr::null_mut() { + ParameterValue::Bool(unsafe { *v.bool_value }) + } else if v.integer_value != std::ptr::null_mut() { + ParameterValue::Integer(unsafe { *v.integer_value }) + } else if v.double_value != std::ptr::null_mut() { + ParameterValue::Double(unsafe { *v.double_value }) + } else if v.string_value != std::ptr::null_mut() { + let s = unsafe { CStr::from_ptr(v.string_value) }; + let string = s.to_str().unwrap_or("").to_owned(); + ParameterValue::String(string) + } else if v.byte_array_value != std::ptr::null_mut() { + let vals = unsafe { + std::slice::from_raw_parts((*v.byte_array_value).values, (*v.byte_array_value).size) + }; + ParameterValue::ByteArray(vals.iter().cloned().collect()) + } else if v.bool_array_value != std::ptr::null_mut() { + let vals = unsafe { + std::slice::from_raw_parts((*v.bool_array_value).values, (*v.bool_array_value).size) + }; + ParameterValue::BoolArray(vals.iter().cloned().collect()) + } else if v.integer_array_value != std::ptr::null_mut() { + let vals = unsafe { + std::slice::from_raw_parts( + (*v.integer_array_value).values, + (*v.integer_array_value).size, + ) + }; + ParameterValue::IntegerArray(vals.iter().cloned().collect()) + } else if v.double_array_value != std::ptr::null_mut() { + let vals = unsafe { + std::slice::from_raw_parts( + (*v.double_array_value).values, + (*v.double_array_value).size, + ) + }; + ParameterValue::DoubleArray(vals.iter().cloned().collect()) + } else if v.string_array_value != std::ptr::null_mut() { + let vals = unsafe { + std::slice::from_raw_parts( + (*v.string_array_value).data, + (*v.string_array_value).size, + ) + }; + let s = vals + .iter() + .map(|cs| { + let s = unsafe { CStr::from_ptr(*cs) }; + s.to_str().unwrap_or("").to_owned() + }) + .collect(); + ParameterValue::StringArray(s) + } else { + ParameterValue::NotSet + } +} diff --git a/src/publishers.rs b/src/publishers.rs new file mode 100644 index 0000000..96a7a01 --- /dev/null +++ b/src/publishers.rs @@ -0,0 +1,44 @@ +use super::*; + +// The publish function is thread safe. ROS2 docs state: +// ============= +// +// This function is thread safe so long as access to both the +// publisher and the" `ros_message` is synchronized." That means that +// calling rcl_publish() from multiple threads is allowed, but" +// calling rcl_publish() at the same time as non-thread safe +// publisher" functions is not, e.g. calling rcl_publish() and +// rcl_publisher_fini()" concurrently is not allowed." Before calling +// rcl_publish() the message can change and after calling" +// rcl_publish() the message can change, but it cannot be changed +// during the" publish call." The same `ros_message`, however, can be +// passed to multiple calls of" rcl_publish() simultaneously, even if +// the publishers differ." The `ros_message` is unmodified by +// rcl_publish()." +// +// TODO: I guess there is a potential error source in destructuring +// while calling publish. I don't think its worth to protect with a +// mutex/rwlock for this though... +// +// Methods that mutate need to called from the thread owning the Node. +// I don't think we can count on Node being generally thread-safe. +// So keep pub/sub management and polling contained to one thread +// and send out publishers. + +unsafe impl Send for Publisher where T: WrappedTypesupport {} + +#[derive(Debug, Clone)] +pub struct Publisher +where + T: WrappedTypesupport, +{ + pub handle: Weak, + pub type_: PhantomData, +} + +unsafe impl Send for PublisherUntyped {} +#[derive(Debug, Clone)] +pub struct PublisherUntyped { + pub handle: Weak, + pub type_: String, +} diff --git a/src/services.rs b/src/services.rs new file mode 100644 index 0000000..127c4c1 --- /dev/null +++ b/src/services.rs @@ -0,0 +1,91 @@ +use super::*; + +pub trait Service { + fn handle(&self) -> &rcl_service_t; + fn send_completed_responses(&mut self) -> (); + fn handle_request(&mut self) -> (); + fn destroy(&mut self, node: &mut rcl_node_t) -> (); +} + +pub struct TypedService +where + T: WrappedServiceTypeSupport, +{ + pub rcl_handle: rcl_service_t, + pub sender: mpsc::Sender>, + pub outstanding_requests: Vec>, +} + +impl Service for TypedService +where + T: WrappedServiceTypeSupport, +{ + fn handle(&self) -> &rcl_service_t { + &self.rcl_handle + } + + fn send_completed_responses(&mut self) -> () { + let mut to_send = vec![]; + self.outstanding_requests.retain_mut(|r| { + match r.try_recv() { + Ok(Some(resp)) => { + to_send.push(resp); + false // done with this. + } + Ok(None) => true, // keep message, waiting for service + Err(_) => false, // channel canceled + } + }); + + for (mut req_id, msg) in to_send { + let mut native_response = WrappedNativeMsg::::from(&msg); + let res = unsafe { + rcl_send_response( + &self.rcl_handle, + &mut req_id, + native_response.void_ptr_mut(), + ) + }; + + // TODO + if res != RCL_RET_OK as i32 { + eprintln!("could not send service response {}", res); + } + } + } + + fn handle_request(&mut self) -> () { + let mut request_id = MaybeUninit::::uninit(); + let mut request_msg = WrappedNativeMsg::::new(); + + let ret = unsafe { + rcl_take_request( + &self.rcl_handle, + request_id.as_mut_ptr(), + request_msg.void_ptr_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let request_id = unsafe { request_id.assume_init() }; + let request_msg = T::Request::from_native(&request_msg); + let (response_sender, response_receiver) = + oneshot::channel::<(rmw_request_id_t, T::Response)>(); + self.outstanding_requests.push(response_receiver); + let request = ServiceRequest:: { + message: request_msg, + request_id, + response_sender, + }; + match self.sender.try_send(request) { + Err(e) => eprintln!("warning: could not send service request ({})", e), + _ => (), + } + } // TODO handle failure. + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_service_fini(&mut self.rcl_handle, node); + } + } +} diff --git a/src/subscribers.rs b/src/subscribers.rs new file mode 100644 index 0000000..d0bf66b --- /dev/null +++ b/src/subscribers.rs @@ -0,0 +1,131 @@ +use super::*; + +pub trait Subscriber { + fn handle(&self) -> &rcl_subscription_t; + fn handle_incoming(&mut self) -> (); + fn destroy(&mut self, node: &mut rcl_node_t) -> (); +} + +pub struct TypedSubscriber +where + T: WrappedTypesupport, +{ + pub rcl_handle: rcl_subscription_t, + pub sender: mpsc::Sender, +} + +pub struct NativeSubscriber +where + T: WrappedTypesupport, +{ + pub rcl_handle: rcl_subscription_t, + pub sender: mpsc::Sender>, +} + +pub struct UntypedSubscriber { + pub rcl_handle: rcl_subscription_t, + pub topic_type: String, + pub sender: mpsc::Sender>, +} + +impl Subscriber for TypedSubscriber +where + T: WrappedTypesupport, +{ + fn handle(&self) -> &rcl_subscription_t { + &self.rcl_handle + } + + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let msg = T::from_native(&msg); + match self.sender.try_send(msg) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } +} + +impl Subscriber for NativeSubscriber +where + T: WrappedTypesupport, +{ + fn handle(&self) -> &rcl_subscription_t { + &self.rcl_handle + } + + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + match self.sender.try_send(msg) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } +} + +impl Subscriber for UntypedSubscriber { + fn handle(&self) -> &rcl_subscription_t { + &self.rcl_handle + } + + fn handle_incoming(&mut self) -> () { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) + .expect(&format!("no typesupport for {}", self.topic_type)); + let ret = unsafe { + rcl_take( + &self.rcl_handle, + msg.void_ptr_mut(), + &mut msg_info, + std::ptr::null_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + let json = msg.to_json(); + match self.sender.try_send(json) { + Err(e) => println!("error {:?}", e), + _ => (), + } + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_subscription_fini(&mut self.rcl_handle, node); + } + } +}