Rudimentary support for service clients 💻💻

This commit is contained in:
Martin Dahl 2020-09-13 14:27:02 +02:00
parent 5260d3ba8e
commit a3e93c814d
2 changed files with 230 additions and 3 deletions

35
examples/client.rs Normal file
View File

@ -0,0 +1,35 @@
use r2r;
use failure::Error;
use r2r::example_interfaces::srv::AddTwoInts;
fn main() -> Result<(), Error> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;
let client = node.create_client::<AddTwoInts::Service>("/add_two_ints")?;
// wait for service to be available
println!("waiting for service...");
while !node.service_available(&client)? {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
println!("service available.");
let mut c = 0;
loop {
let req = AddTwoInts::Request { a: 10*c, b: c };
let cb_req = req.clone();
let cb = Box::new(move |r: AddTwoInts::Response|
println!("{} + {} = {}", cb_req.a, cb_req.b, r.sum));
client.request(&req, cb)?;
node.spin_once(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(1000));
c+=1;
}
}

View File

@ -325,6 +325,59 @@ where
}
}
// client
struct WrappedClient<T>
where
T: WrappedServiceTypeSupport,
{
rcl_handle: rcl_client_t,
rcl_request: rmw_request_id_t,
// store callbacks with request sequence id and callback function
callbacks: Vec<(i64, Box<dyn FnOnce(T::Response)>)>,
rcl_response_msg: WrappedNativeMsg<T::Response>,
}
pub trait Client_ {
fn handle(&self) -> &rcl_client_t;
fn run_cb(&mut self) -> ();
fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void;
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t;
fn destroy(&mut self, node: &mut rcl_node_t) -> ();
}
impl<T> Client_ for WrappedClient<T>
where
T: WrappedServiceTypeSupport,
{
fn handle(&self) -> &rcl_client_t {
&self.rcl_handle
}
fn rcl_response_msg(&mut self) -> *mut std::os::raw::c_void {
self.rcl_response_msg.void_ptr_mut()
}
fn rcl_request_id(&mut self) -> *mut rmw_request_id_t {
&mut self.rcl_request
}
fn run_cb(&mut self) -> () {
// copy native msg to rust type and run callback
let req_id = self.rcl_request.sequence_number;
let idx = self.callbacks.iter().position(|(id, _)| id == &req_id)
.expect("no such sequence number registered");
let (_, cb_to_run) = self.callbacks.swap_remove(idx);
let response = T::Response::from_native(&self.rcl_response_msg);
(cb_to_run)(response);
}
fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_client_fini(&mut self.rcl_handle, node);
}
}
}
// The publish function is thread safe. ROS2 docs state:
// =============
//
@ -369,12 +422,26 @@ pub struct PublisherUntyped {
type_: String,
}
// Same reasoning for clients.
unsafe impl<T> Send for Client<T> where T: WrappedServiceTypeSupport {}
pub struct Client<T>
where
T: WrappedServiceTypeSupport,
{
client_: Weak<Mutex<WrappedClient<T>>>,
}
#[derive(Debug, Clone)]
pub struct Context {
context_handle: Arc<Mutex<Box<rcl_context_t>>>,
}
// Not 100% about this one. From our end the context is rarely used and can be locked by a mutex for that. But I haven't investigated if its use is thread-safe between nodes. May remove send here later.
// Not 100% about this one. From our end the context is rarely used
// and can be locked by a mutex for that. But I haven't investigated
// if its use is thread-safe between nodes. May remove send here
// later.
unsafe impl Send for Context {}
impl Context {
@ -509,8 +576,10 @@ pub struct Node {
node_handle: Box<rcl_node_t>,
// the node owns the subscribers
subs: Vec<Box<dyn Sub>>,
// servies,
// services,
services: Vec<Box<dyn Service>>,
// clients with hack to avoid locking just to wait...,
clients: Vec<(rcl_client_t, Arc<Mutex<dyn Client_>>)>,
// timers,
timers: Vec<Timer>,
// and the publishers, whom we allow to be shared.. hmm.
@ -626,6 +695,7 @@ impl Node {
node_handle: node_handle,
subs: Vec::new(),
services: Vec::new(),
clients: Vec::new(),
timers: Vec::new(),
pubs: Vec::new(),
};
@ -757,6 +827,74 @@ impl Node {
}
pub fn create_client_helper(&mut self, service_name: &str,
service_ts: *const rosidl_service_type_support_t)
-> Result<rcl_client_t> {
let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
let service_name_c_string = CString::new(service_name)
.map_err(|_|Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let client_options = rcl_client_get_default_options();
rcl_client_init(&mut client_handle, self.node_handle.as_mut(),
service_ts, service_name_c_string.as_ptr(), &client_options)
};
if result == RCL_RET_OK as i32 {
Ok(client_handle)
} else {
Err(Error::from_rcl_error(result))
}
}
pub fn create_client<T: 'static>(
&mut self,
service_name: &str,
) -> Result<Client<T>>
where
T: WrappedServiceTypeSupport,
{
let client_handle = self.create_client_helper(service_name, T::get_ts())?;
let cloned_ch = rcl_client_t {
impl_: client_handle.impl_
};
let ws = WrappedClient::<T> {
rcl_handle: cloned_ch,
rcl_request: rmw_request_id_t {
writer_guid: [0; 16usize],
sequence_number: 0,
},
rcl_response_msg: WrappedNativeMsg::<T::Response>::new(),
callbacks: Vec::new(),
};
let arc = Arc::new(Mutex::new(ws));
let client_ = Arc::downgrade(&arc);
self.clients.push((client_handle, arc));
let c = Client {
client_
};
Ok(c)
}
pub fn service_available<T: WrappedServiceTypeSupport>(&self, client: &Client<T>) -> Result<bool> {
let client = client.client_.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let client = client.lock().unwrap();
let mut avail = false;
let result = unsafe {
rcl_service_server_is_available(
self.node_handle.as_ref(),
client.handle(),
&mut avail)
};
if result == RCL_RET_OK as i32 {
Ok(avail)
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn create_publisher_helper(&mut self, topic: &str,
typesupport: *const rosidl_message_type_support_t) -> Result<rcl_publisher_t> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
@ -826,7 +964,7 @@ impl Node {
self.subs.len(),
0,
self.timers.len(),
0,
self.clients.len(),
self.services.len(),
0,
ctx.as_mut(),
@ -850,6 +988,12 @@ impl Node {
}
}
for (handle, _) in self.clients.iter() {
unsafe {
rcl_wait_set_add_client(&mut ws, &*handle, std::ptr::null_mut());
}
}
for s in self.services.iter() {
unsafe {
rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut());
@ -908,6 +1052,20 @@ impl Node {
}
}
let ws_clients =
unsafe { std::slice::from_raw_parts(ws.clients, ws.size_of_clients) };
assert_eq!(ws_clients.len(), self.clients.len());
for ((_, s), ws_s) in self.clients.iter_mut().zip(ws_clients) {
if ws_s != &std::ptr::null() {
let mut s = s.lock().unwrap();
let ret = unsafe {
rcl_take_response(s.handle(), s.rcl_request_id(), s.rcl_response_msg())
};
if ret == RCL_RET_OK as i32 {
s.run_cb();
}
}
}
let ws_services =
unsafe { std::slice::from_raw_parts(ws.services, ws.size_of_services) };
@ -1099,6 +1257,40 @@ where
}
}
impl<T> Client<T>
where
T: WrappedServiceTypeSupport,
{
pub fn request(&self, msg: &T::Request, cb: Box<dyn FnOnce(T::Response) -> ()>) -> Result<()>
where
T: WrappedServiceTypeSupport,
{
// upgrade to actual ref. if still alive
let client = self.client_.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
let mut client = client.lock().unwrap();
// copy rust msg to native and publish it
let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg);
let mut seq_no = 0i64;
let result = unsafe {
rcl_send_request(
&client.rcl_handle,
native_msg.void_ptr(),
&mut seq_no,
)
};
if result == RCL_RET_OK as i32 {
client.callbacks.push((seq_no, cb));
Ok(())
} else {
eprintln!("coult not send request {}", result);
Err(Error::from_rcl_error(result))
}
}
}
impl PublisherUntyped {
pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
// upgrade to actual ref. if still alive