This commit is contained in:
Martin Dahl 2021-08-13 19:35:54 +02:00
parent 49f83fb5d9
commit 33596b0b69
4 changed files with 52 additions and 65 deletions

View File

@ -17,15 +17,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic2")?; let p = node.create_publisher::<r2r::std_msgs::msg::String>("/topic2")?;
let state = Arc::new(Mutex::new(SharedState::default())); let state = Arc::new(Mutex::new(SharedState::default()));
use r2r::example_interfaces::srv::AddTwoInts;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
std::thread::spawn(move || {
let req = AddTwoInts::Request { a: 10, b: 20 };
print!("{} + {} = ", req.a, req.b);
let resp = client.request(&req).expect("");
});
// task that every other time forwards message to topic2 // task that every other time forwards message to topic2
let state_t1 = state.clone(); let state_t1 = state.clone();
task::spawn(async move { task::spawn(async move {

View File

@ -188,15 +188,15 @@ pub struct Node {
pub params: HashMap<String, ParameterValue>, pub params: HashMap<String, ParameterValue>,
node_handle: Box<rcl_node_t>, node_handle: Box<rcl_node_t>,
// the node owns the subscribers // the node owns the subscribers
subs: Vec<Box<dyn Subscriber>>, subs: Vec<Box<dyn Subscriber_>>,
// services, // services,
services: Vec<Box<dyn Service>>, services: Vec<Box<dyn Service_>>,
// clients with hack to avoid locking just to wait..., // service clients
clients: Vec<(rcl_client_t, Arc<Mutex<dyn Client_>>)>, clients: Vec<Arc<Mutex<dyn Client_>>>,
// action clients // action clients
action_clients: Vec<(rcl_action_client_t, Arc<Mutex<dyn ActionClient_>>)>, action_clients: Vec<Arc<Mutex<dyn ActionClient_>>>,
// action servers // action servers
action_servers: Vec<(rcl_action_server_t, Arc<Mutex<dyn ActionServer_>>)>, action_servers: Vec<Arc<Mutex<dyn ActionServer_>>>,
// timers, // timers,
timers: Vec<Timer_>, timers: Vec<Timer_>,
// and the publishers, whom we allow to be shared.. hmm. // and the publishers, whom we allow to be shared.. hmm.
@ -325,7 +325,7 @@ impl Node {
let mut node = Node { let mut node = Node {
params: HashMap::new(), params: HashMap::new(),
context: ctx, context: ctx,
node_handle: node_handle, node_handle,
subs: Vec::new(), subs: Vec::new(),
services: Vec::new(), services: Vec::new(),
clients: Vec::new(), clients: Vec::new(),
@ -476,10 +476,9 @@ impl Node {
response_channels: Vec::new(), response_channels: Vec::new(),
}; };
let arc = Arc::new(Mutex::new(ws)); let client_arc = Arc::new(Mutex::new(ws));
let client_ = Arc::downgrade(&arc); self.clients.push(client_arc.clone());
self.clients.push((client_handle, arc)); let c = Client { client_: Arc::downgrade(&client_arc) };
let c = Client { client_ };
Ok(c) Ok(c)
} }
@ -494,10 +493,9 @@ impl Node {
response_channels: Vec::new(), response_channels: Vec::new(),
}; };
let arc = Arc::new(Mutex::new(client)); let client_arc = Arc::new(Mutex::new(client));
let client = Arc::downgrade(&arc); self.clients.push(client_arc.clone());
self.clients.push((client_handle, arc)); let c = UntypedClient { client_: Arc::downgrade(&client_arc) };
let c = UntypedClient { client_: client };
Ok(c) Ok(c)
} }
@ -563,7 +561,7 @@ impl Node {
T: WrappedActionTypeSupport, T: WrappedActionTypeSupport,
{ {
let client_handle = self.create_action_client_helper(action_name, T::get_ts())?; let client_handle = self.create_action_client_helper(action_name, T::get_ts())?;
let wa = WrappedActionClient::<T> { let client = WrappedActionClient::<T> {
rcl_handle: client_handle, rcl_handle: client_handle,
goal_response_channels: Vec::new(), goal_response_channels: Vec::new(),
cancel_response_channels: Vec::new(), cancel_response_channels: Vec::new(),
@ -573,10 +571,9 @@ impl Node {
goal_status: HashMap::new(), goal_status: HashMap::new(),
}; };
let arc = Arc::new(Mutex::new(wa)); let client_arc = Arc::new(Mutex::new(client));
let client = Arc::downgrade(&arc); self.action_clients.push(client_arc.clone());
self.action_clients.push((client_handle, arc)); let c = ActionClient { client: Arc::downgrade(&client_arc) };
let c = ActionClient { client };
Ok(c) Ok(c)
} }
@ -657,7 +654,7 @@ impl Node {
let server_handle = let server_handle =
self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?; self.create_action_server_helper(action_name, clock_handle.as_mut(), T::get_ts())?;
let wa = WrappedActionServer::<T> { let server = WrappedActionServer::<T> {
rcl_handle: server_handle, rcl_handle: server_handle,
clock_handle, clock_handle,
accept_goal_cb, accept_goal_cb,
@ -668,10 +665,9 @@ impl Node {
result_requests: HashMap::new(), result_requests: HashMap::new(),
}; };
let arc = Arc::new(Mutex::new(wa)); let server_arc = Arc::new(Mutex::new(server));
let server = Arc::downgrade(&arc); self.action_servers.push(server_arc.clone());
self.action_servers.push((server_handle, arc)); let c = ActionServer { server: Arc::downgrade(&server_arc) };
let c = ActionServer { server };
Ok(c) Ok(c)
} }
@ -798,7 +794,7 @@ impl Node {
// count action client wait set needs // count action client wait set needs
let mut total_action_subs = 0; let mut total_action_subs = 0;
let mut total_action_clients = 0; let mut total_action_clients = 0;
for (ach, _) in &self.action_clients { for c in &self.action_clients {
let mut num_subs = 0; let mut num_subs = 0;
let mut num_gc = 0; let mut num_gc = 0;
let mut num_timers = 0; let mut num_timers = 0;
@ -806,7 +802,7 @@ impl Node {
let mut num_services = 0; let mut num_services = 0;
Self::action_client_get_num_waits( Self::action_client_get_num_waits(
ach, c.lock().unwrap().handle(),
&mut num_subs, &mut num_subs,
&mut num_gc, &mut num_gc,
&mut num_timers, &mut num_timers,
@ -828,7 +824,7 @@ impl Node {
// count action server wait set needs // count action server wait set needs
let mut total_action_timers = 0; let mut total_action_timers = 0;
let mut total_action_services = 0; let mut total_action_services = 0;
for (ash, _) in &self.action_servers { for s in &self.action_servers {
let mut num_subs = 0; let mut num_subs = 0;
let mut num_gc = 0; let mut num_gc = 0;
let mut num_timers = 0; let mut num_timers = 0;
@ -836,7 +832,7 @@ impl Node {
let mut num_services = 0; let mut num_services = 0;
Self::action_server_get_num_waits( Self::action_server_get_num_waits(
ash, s.lock().unwrap().handle(),
&mut num_subs, &mut num_subs,
&mut num_gc, &mut num_gc,
&mut num_timers, &mut num_timers,
@ -876,25 +872,25 @@ impl Node {
rcl_wait_set_clear(&mut ws); rcl_wait_set_clear(&mut ws);
} }
for s in self.subs.iter() { for s in &self.subs {
unsafe { unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
} }
} }
for s in self.timers.iter() { for s in &self.timers {
unsafe { unsafe {
rcl_wait_set_add_timer(&mut ws, &s.timer_handle, std::ptr::null_mut()); rcl_wait_set_add_timer(&mut ws, &s.timer_handle, std::ptr::null_mut());
} }
} }
for (handle, _) in self.clients.iter() { for s in &self.clients {
unsafe { unsafe {
rcl_wait_set_add_client(&mut ws, &*handle, std::ptr::null_mut()); rcl_wait_set_add_client(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
} }
} }
for s in self.services.iter() { for s in &self.services {
unsafe { unsafe {
rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut()); rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut());
} }
@ -905,19 +901,19 @@ impl Node {
// the node before ones created automatically by actions. we // the node before ones created automatically by actions. we
// then assume that we can count on the waitables created by // then assume that we can count on the waitables created by
// the actions are added at the end of the wait set arrays // the actions are added at the end of the wait set arrays
for (ac, _) in self.action_clients.iter() { for ac in &self.action_clients {
unsafe { unsafe {
rcl_action_wait_set_add_action_client( rcl_action_wait_set_add_action_client(
&mut ws, &mut ws,
ac, ac.lock().unwrap().handle(),
std::ptr::null_mut(), std::ptr::null_mut(),
std::ptr::null_mut(), std::ptr::null_mut(),
); );
} }
} }
for (ash, _) in self.action_servers.iter() { for acs in &self.action_servers {
unsafe { unsafe {
rcl_action_wait_set_add_action_server(&mut ws, ash, std::ptr::null_mut()); rcl_action_wait_set_add_action_server(&mut ws, acs.lock().unwrap().handle(), std::ptr::null_mut());
} }
} }
@ -976,7 +972,7 @@ impl Node {
.retain(|t| !timers_to_remove.contains(&t.timer_handle)); .retain(|t| !timers_to_remove.contains(&t.timer_handle));
let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) };
for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) { for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) {
if ws_s != &std::ptr::null() { if ws_s != &std::ptr::null() {
let mut s = s.lock().unwrap(); let mut s = s.lock().unwrap();
s.handle_response(); s.handle_response();
@ -990,7 +986,7 @@ impl Node {
} }
} }
for (ac, s) in &self.action_clients { for ac in &self.action_clients {
let mut is_feedback_ready = false; let mut is_feedback_ready = false;
let mut is_status_ready = false; let mut is_status_ready = false;
let mut is_goal_response_ready = false; let mut is_goal_response_ready = false;
@ -1000,7 +996,7 @@ impl Node {
let ret = unsafe { let ret = unsafe {
rcl_action_client_wait_set_get_entities_ready( rcl_action_client_wait_set_get_entities_ready(
&ws, &ws,
ac, ac.lock().unwrap().handle(),
&mut is_feedback_ready, &mut is_feedback_ready,
&mut is_status_ready, &mut is_status_ready,
&mut is_goal_response_ready, &mut is_goal_response_ready,
@ -1014,32 +1010,32 @@ impl Node {
} }
if is_feedback_ready { if is_feedback_ready {
let mut acs = s.lock().unwrap(); let mut acs = ac.lock().unwrap();
acs.handle_feedback_msg(); acs.handle_feedback_msg();
} }
if is_status_ready { if is_status_ready {
let mut acs = s.lock().unwrap(); let mut acs = ac.lock().unwrap();
acs.handle_status_msg(); acs.handle_status_msg();
} }
if is_goal_response_ready { if is_goal_response_ready {
let mut acs = s.lock().unwrap(); let mut acs = ac.lock().unwrap();
acs.handle_goal_response(); acs.handle_goal_response();
} }
if is_cancel_response_ready { if is_cancel_response_ready {
let mut acs = s.lock().unwrap(); let mut acs = ac.lock().unwrap();
acs.handle_cancel_response(); acs.handle_cancel_response();
} }
if is_result_response_ready { if is_result_response_ready {
let mut acs = s.lock().unwrap(); let mut acs = ac.lock().unwrap();
acs.handle_result_response(); acs.handle_result_response();
} }
} }
for (ash, s) in &self.action_servers { for s in &self.action_servers {
let mut is_goal_request_ready = false; let mut is_goal_request_ready = false;
let mut is_cancel_request_ready = false; let mut is_cancel_request_ready = false;
let mut is_result_request_ready = false; let mut is_result_request_ready = false;
@ -1048,7 +1044,7 @@ impl Node {
let ret = unsafe { let ret = unsafe {
rcl_action_server_wait_set_get_entities_ready( rcl_action_server_wait_set_get_entities_ready(
&ws, &ws,
ash, s.lock().unwrap().handle(),
&mut is_goal_request_ready, &mut is_goal_request_ready,
&mut is_cancel_request_ready, &mut is_cancel_request_ready,
&mut is_result_request_ready, &mut is_result_request_ready,
@ -1237,10 +1233,10 @@ impl Drop for Node {
// TODO: allow other types of clocks... // TODO: allow other types of clocks...
let _ret = unsafe { rcl_steady_clock_fini(t.clock_handle.as_mut()) }; let _ret = unsafe { rcl_steady_clock_fini(t.clock_handle.as_mut()) };
} }
for (_, c) in &mut self.action_clients { for c in &mut self.action_clients {
c.lock().unwrap().destroy(&mut self.node_handle); c.lock().unwrap().destroy(&mut self.node_handle);
} }
for (_, s) in &mut self.action_servers { for s in &mut self.action_servers {
s.lock().unwrap().destroy(&mut self.node_handle); s.lock().unwrap().destroy(&mut self.node_handle);
} }
while let Some(p) = self.pubs.pop() { while let Some(p) = self.pubs.pop() {

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub trait Service { pub trait Service_ {
fn handle(&self) -> &rcl_service_t; fn handle(&self) -> &rcl_service_t;
fn send_completed_responses(&mut self) -> (); fn send_completed_responses(&mut self) -> ();
fn handle_request(&mut self) -> (); fn handle_request(&mut self) -> ();
@ -16,7 +16,7 @@ where
pub outstanding_requests: Vec<oneshot::Receiver<(rmw_request_id_t, T::Response)>>, pub outstanding_requests: Vec<oneshot::Receiver<(rmw_request_id_t, T::Response)>>,
} }
impl<T: 'static> Service for TypedService<T> impl<T: 'static> Service_ for TypedService<T>
where where
T: WrappedServiceTypeSupport, T: WrappedServiceTypeSupport,
{ {

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub trait Subscriber { pub trait Subscriber_ {
fn handle(&self) -> &rcl_subscription_t; fn handle(&self) -> &rcl_subscription_t;
fn handle_incoming(&mut self) -> (); fn handle_incoming(&mut self) -> ();
fn destroy(&mut self, node: &mut rcl_node_t) -> (); fn destroy(&mut self, node: &mut rcl_node_t) -> ();
@ -28,7 +28,7 @@ pub struct UntypedSubscriber {
pub sender: mpsc::Sender<Result<serde_json::Value>>, pub sender: mpsc::Sender<Result<serde_json::Value>>,
} }
impl<T: 'static> Subscriber for TypedSubscriber<T> impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
@ -63,7 +63,7 @@ where
} }
} }
impl<T: 'static> Subscriber for NativeSubscriber<T> impl<T: 'static> Subscriber_ for NativeSubscriber<T>
where where
T: WrappedTypesupport, T: WrappedTypesupport,
{ {
@ -97,7 +97,7 @@ where
} }
} }
impl Subscriber for UntypedSubscriber { impl Subscriber_ for UntypedSubscriber {
fn handle(&self) -> &rcl_subscription_t { fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle &self.rcl_handle
} }