From 9275c15258b660e2a9c45a1d0b3a0e14cc9980ae Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Mon, 6 Apr 2020 14:46:57 +0200 Subject: [PATCH 1/4] Add support for service messages --- build.rs | 23 ++++++++++++++++++----- common/src/lib.rs | 8 ++++++++ msg_gen/build.rs | 14 +++++++++++--- msg_gen/src/lib.rs | 14 +++++++++++++- src/lib.rs | 19 +++++++++++++++++++ 5 files changed, 69 insertions(+), 9 deletions(-) diff --git a/build.rs b/build.rs index ee2402b..152e800 100644 --- a/build.rs +++ b/build.rs @@ -27,17 +27,30 @@ fn main() { println!("cargo:rustc-link-lib=dylib={}__rosidl_generator_c", module); modules.push_str(&format!(r#"pub mod {module}{{include!(concat!(env!("OUT_DIR"), "/{module}.rs"));}}{lf}"#, module=module, lf="\n")); - //modules.push_str(&format!("pub mod {};\n", module)); let mut codegen = String::new(); for (prefix, msgs) in prefixes { codegen.push_str(&format!(" pub mod {} {{\n", prefix)); - codegen.push_str(" use super::super::*;\n"); for msg in msgs { - codegen.push_str(&generate_rust_msg(module, prefix, msg)); - - println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); + if prefix == &"srv" { + codegen.push_str("#[allow(non_snake_case)]\n"); + codegen.push_str(&format!(" pub mod {} {{\n", msg)); + for s in &["Request", "Response"] { + let msgname = format!("{}_{}", msg, s); + codegen.push_str("#[allow(unused_imports)]\n"); + codegen.push_str(" use super::super::super::*;\n"); + codegen.push_str(&generate_rust_msg(module, prefix, &msgname)); + println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); + } + codegen.push_str(" }\n"); + } else { + // the need to allow unused seems to be a compiler bug... + codegen.push_str("#[allow(unused_imports)]\n"); + codegen.push_str(" use super::super::*;\n"); + codegen.push_str(&generate_rust_msg(module, prefix, msg)); + println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); + } } codegen.push_str(" }\n"); diff --git a/common/src/lib.rs b/common/src/lib.rs index 743a217..cb0a1c0 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -50,6 +50,14 @@ pub fn get_all_ros_msgs() -> Vec { msgs.push(msg_name); } } + if l.starts_with("srv/") && (l.ends_with(".idl") || l.ends_with(".srv")) { + if let Some(file_name_str) = file_name.to_str() { + let substr = &l[4..l.len()-4]; + let srv_name = format!("{}/srv/{}", file_name_str, substr); + println!("srv_name: {}", srv_name); + msgs.push(srv_name); + } + } }); } } diff --git a/msg_gen/build.rs b/msg_gen/build.rs index 7e04b47..3cbb247 100644 --- a/msg_gen/build.rs +++ b/msg_gen/build.rs @@ -48,9 +48,17 @@ fn main() { &msg.module, &msg.prefix, &include_filename )); - let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, &msg.name); - let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name); - introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val)); + if msg.prefix == "srv" { + for s in &["Request", "Response"] { + let key = &format!("{}__{}__{}_{}", &msg.module, &msg.prefix, &msg.name, s); + let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}_{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name, s); + introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val)); + } + } else { + let key = &format!("{}__{}__{}", &msg.module, &msg.prefix, &msg.name); + let val = &format!("unsafe {{ rosidl_typesupport_introspection_c__get_message_type_support_handle__{}__{}__{}() }} as *const i32 as usize", &msg.module, &msg.prefix, &msg.name); + introspecion_map.push_str(&format!("m.insert(\"{}\", {});\n", key, val)); + } } introspecion_map.push_str("m \n }; }\n\n"); diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index c47a3d4..4c3140f 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -110,11 +110,20 @@ pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String { .expect(&format!("code generation error: {}", name_)); let ptr = *ptr as *const i32 as *const rosidl_message_type_support_t; unsafe { - let (module, prefix, name, c_struct, members) = introspection(ptr); + let (module, prefix, mut name, c_struct, members) = introspection(ptr); assert_eq!(module, module_); assert_eq!(prefix, prefix_); assert_eq!(name, name_); + if prefix == "srv" { + // for srv, the message name is both the service name and _Request or _Respone + // we only want to keep the last part. + let mut nn = name.splitn(2, "_"); + let _mod_name = nn.next().expect(&format!("malformed service name {}", name)); + let msg_name = nn.next().expect(&format!("malformed service name {}", name)); + name = msg_name.to_owned(); + } + let mut fields = String::new(); for member in members { @@ -353,6 +362,9 @@ impl WrappedNativeMsgUntyped { let mut lines = String::new(); for msg in msgs { + // for now don't generate untyped services + if msg.prefix == "srv" { continue; } + let typename = format!("{}/{}/{}", msg.module, msg.prefix, msg.name); let rustname = format!("{}::{}::{}", msg.module, msg.prefix, msg.name); diff --git a/src/lib.rs b/src/lib.rs index f8aae0e..0dee56b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -911,4 +911,23 @@ mod tests { assert_eq!(msg.wstring_value, msg2.wstring_value); } + #[cfg(r2r__example_interfaces__srv__AddTwoInts)] + #[test] + fn test_service_msgs() { + use example_interfaces::srv::AddTwoInts; + let mut req = AddTwoInts::Request::default(); + req.a = 5; + let rn = WrappedNativeMsg::<_>::from(&req); + let req2 = AddTwoInts::Request::from_native(&rn); + println!("req2 {:?}", req2); + assert_eq!(req, req2); + + let mut resp = AddTwoInts::Response::default(); + resp.sum = 5; + let rn = WrappedNativeMsg::<_>::from(&resp); + let resp2 = AddTwoInts::Response::from_native(&rn); + println!("resp {:?}", resp2); + assert_eq!(resp, resp2); + } + } From bc8fc7bd746d0000d82aaeffa561b683bef313a1 Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Tue, 7 Apr 2020 09:50:12 +0200 Subject: [PATCH 2/4] Fix unnecessary imports --- build.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/build.rs b/build.rs index 152e800..9bd9f1c 100644 --- a/build.rs +++ b/build.rs @@ -32,22 +32,21 @@ fn main() { for (prefix, msgs) in prefixes { codegen.push_str(&format!(" pub mod {} {{\n", prefix)); - for msg in msgs { - if prefix == &"srv" { + if prefix == &"srv" { + for msg in msgs { codegen.push_str("#[allow(non_snake_case)]\n"); codegen.push_str(&format!(" pub mod {} {{\n", msg)); + codegen.push_str(" use super::super::super::*;\n"); for s in &["Request", "Response"] { let msgname = format!("{}_{}", msg, s); - codegen.push_str("#[allow(unused_imports)]\n"); - codegen.push_str(" use super::super::super::*;\n"); codegen.push_str(&generate_rust_msg(module, prefix, &msgname)); println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); } codegen.push_str(" }\n"); - } else { - // the need to allow unused seems to be a compiler bug... - codegen.push_str("#[allow(unused_imports)]\n"); - codegen.push_str(" use super::super::*;\n"); + } + } else { + codegen.push_str(" use super::super::*;\n"); + for msg in msgs { codegen.push_str(&generate_rust_msg(module, prefix, msg)); println!("cargo:rustc-cfg=r2r__{}__{}__{}", module, prefix, msg); } From cd94bc5ba574589495db22b55ef7475c6ce9acfb Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Tue, 7 Apr 2020 20:25:00 +0200 Subject: [PATCH 3/4] =?UTF-8?q?Service=20servers!=20=F0=9F=96=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + build.rs | 3 + examples/service.rs | 21 +++++++ msg_gen/build.rs | 1 + msg_gen/src/lib.rs | 18 ++++++ src/lib.rs | 139 +++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 examples/service.rs diff --git a/README.md b/README.md index 29c6528..daf7b09 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ What works? - Up to date with ROS2 ~Dashing~ Eloquent - Building Rust types - Publish/subscribe +- Services (servers, not clients yet) TODO -------------------- diff --git a/build.rs b/build.rs index 9bd9f1c..0c2d33d 100644 --- a/build.rs +++ b/build.rs @@ -37,6 +37,9 @@ fn main() { codegen.push_str("#[allow(non_snake_case)]\n"); codegen.push_str(&format!(" pub mod {} {{\n", msg)); codegen.push_str(" use super::super::super::*;\n"); + + codegen.push_str(&generate_rust_service(module, prefix, msg)); + for s in &["Request", "Response"] { let msgname = format!("{}_{}", msg, s); codegen.push_str(&generate_rust_msg(module, prefix, &msgname)); diff --git a/examples/service.rs b/examples/service.rs new file mode 100644 index 0000000..8d990a4 --- /dev/null +++ b/examples/service.rs @@ -0,0 +1,21 @@ +use r2r; +use failure::Error; + +use r2r::example_interfaces::srv::AddTwoInts; + +fn handle_service(request: AddTwoInts::Request) -> AddTwoInts::Response { + println!("request: {} + {}", request.a, request.b); + AddTwoInts::Response { + sum: request.a + request.b + } +} + +fn main() -> Result<(), Error> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + node.create_service::("/add_two_ints", Box::new(handle_service))?; + + loop { + node.spin_once(std::time::Duration::from_millis(100)); + } +} diff --git a/msg_gen/build.rs b/msg_gen/build.rs index 3cbb247..84017a0 100644 --- a/msg_gen/build.rs +++ b/msg_gen/build.rs @@ -77,6 +77,7 @@ fn main() { .derive_copy(false) // blacklist types that are handled by rcl bindings .blacklist_type("rosidl_message_type_support_t") + .blacklist_type("rosidl_service_type_support_t") .blacklist_type("rosidl_generator_c__String") .blacklist_type("rosidl_generator_c__String__Sequence") .blacklist_type("rosidl_generator_c__U16String") diff --git a/msg_gen/src/lib.rs b/msg_gen/src/lib.rs index 4c3140f..488e291 100644 --- a/msg_gen/src/lib.rs +++ b/msg_gen/src/lib.rs @@ -102,6 +102,24 @@ fn field_name(field_name: &str) -> String { } } +pub fn generate_rust_service(module_: &str, prefix_: &str, name_: &str) -> String { + format!( + " + pub struct Service(); + impl WrappedServiceTypeSupport for Service {{ + type Request = Request; + type Response = Response; + fn get_ts() -> &'static rosidl_service_type_support_t {{ + unsafe {{ + &*rosidl_typesupport_c__get_service_type_support_handle__{}__{}__{}() + }} + }} + }} + + ", module_, prefix_, name_) +} + + // TODO: this is a terrible hack :) pub fn generate_rust_msg(module_: &str, prefix_: &str, name_: &str) -> String { let key = format!("{}__{}__{}", module_, prefix_, name_); diff --git a/src/lib.rs b/src/lib.rs index 0dee56b..0bd079b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ use error::*; pub type Result = std::result::Result; -pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned { +pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned + Default { type CStruct; fn get_ts() -> &'static rosidl_message_type_support_t; @@ -27,6 +27,14 @@ pub trait WrappedTypesupport: Serialize + serde::de::DeserializeOwned { fn copy_to_native(&self, msg: &mut Self::CStruct); } +pub trait WrappedServiceTypeSupport { + type Request: WrappedTypesupport; + type Response: WrappedTypesupport; + + fn get_ts() -> &'static rosidl_service_type_support_t; +} + + #[derive(Debug)] pub struct WrappedNativeMsg where @@ -255,6 +263,64 @@ impl Sub for WrappedSubUntyped } } + +// services +struct WrappedService +where + T: WrappedServiceTypeSupport, +{ + rcl_handle: rcl_service_t, + rcl_request: rmw_request_id_t, + callback: Box T::Response>, + rcl_request_msg: WrappedNativeMsg, +} + +pub trait Service { + fn handle(&self) -> &rcl_service_t; + fn run_cb(&mut self) -> (); + fn rcl_request_id(&mut self) -> *mut rmw_request_id_t; + fn rcl_request_msg(&mut self) -> *mut std::os::raw::c_void; + fn destroy(&mut self, node: &mut rcl_node_t) -> (); +} + +impl Service for WrappedService +where + T: WrappedServiceTypeSupport, +{ + fn handle(&self) -> &rcl_service_t { + &self.rcl_handle + } + + fn rcl_request_msg(&mut self) -> *mut std::os::raw::c_void { + self.rcl_request_msg.void_ptr_mut() + } + + fn rcl_request_id(&mut self) -> *mut rmw_request_id_t { + &mut self.rcl_request + } + + fn run_cb(&mut self) -> () { + // copy native msg to rust type and run callback + let request = T::Request::from_native(&self.rcl_request_msg); + let response = (self.callback)(request); + let mut native_response = WrappedNativeMsg::::from(&response); + let res = unsafe { + rcl_send_response(&self.rcl_handle, &mut self.rcl_request, native_response.void_ptr_mut()) + }; + + // TODO + if res != RCL_RET_OK as i32 { + panic!("service error {}", res); + } + } + + fn destroy(&mut self, node: &mut rcl_node_t) { + unsafe { + rcl_service_fini(&mut self.rcl_handle, node); + } + } +} + // The publish function is thread safe. ROS2 docs state: // ============= // @@ -347,6 +413,8 @@ pub struct Node { node_handle: Box, // the node owns the subscribers subs: Vec>, + // servies, + services: Vec>, // and the publishers, whom we allow to be shared.. hmm. pubs: Vec>, } @@ -379,6 +447,7 @@ impl Node { context: ctx, node_handle: node_handle, subs: Vec::new(), + services: Vec::new(), pubs: Vec::new(), }) } else { @@ -464,11 +533,54 @@ impl Node { Ok(self.subs.last().unwrap().handle()) // hmm... } + pub fn create_service_helper(&mut self, service_name: &str, + service_ts: *const rosidl_service_type_support_t) + -> Result { + let mut service_handle = unsafe { rcl_get_zero_initialized_service() }; + let service_name_c_string = CString::new(service_name) + .map_err(|_|Error::RCL_RET_INVALID_ARGUMENT)?; + + let result = unsafe { + let service_options = rcl_service_get_default_options(); + rcl_service_init(&mut service_handle, self.node_handle.as_mut(), + service_ts, service_name_c_string.as_ptr(), &service_options) + }; + if result == RCL_RET_OK as i32 { + Ok(service_handle) + } else { + Err(Error::from_rcl_error(result)) + } + } + + pub fn create_service( + &mut self, + service_name: &str, + callback: Box T::Response>, + ) -> Result<&rcl_service_t> + where + T: WrappedServiceTypeSupport, + { + let service_handle = self.create_service_helper(service_name, T::get_ts())?; + let ws = WrappedService:: { + rcl_handle: service_handle, + rcl_request: rmw_request_id_t { + writer_guid: [0; 16usize], + sequence_number: 0, + }, + rcl_request_msg: WrappedNativeMsg::::new(), + callback: callback, + }; + + self.services.push(Box::new(ws)); + Ok(self.services.last().unwrap().handle()) // hmm... + } + pub fn create_publisher_helper(&mut self, topic: &str, typesupport: *const rosidl_message_type_support_t) -> Result { let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() }; - let topic_c_string = CString::new(topic).map_err(|_|Error::RCL_RET_INVALID_ARGUMENT)?; + let topic_c_string = CString::new(topic) + .map_err(|_|Error::RCL_RET_INVALID_ARGUMENT)?; let result = unsafe { let mut publisher_options = rcl_publisher_get_default_options(); @@ -534,7 +646,7 @@ impl Node { 0, 0, 0, - 0, + self.services.len(), 0, ctx.as_mut(), rcutils_get_default_allocator(), @@ -551,6 +663,12 @@ impl Node { } } + for s in self.services.iter() { + unsafe { + rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut()); + } + } + unsafe { rcl_wait(&mut ws, timeout); } @@ -570,6 +688,21 @@ impl Node { } } + + let ws_services = + unsafe { std::slice::from_raw_parts(ws.services, ws.size_of_services) }; + assert_eq!(ws_services.len(), self.services.len()); + for (s, ws_s) in self.services.iter_mut().zip(ws_services) { + if ws_s != &std::ptr::null() { + let ret = unsafe { + rcl_take_request(s.handle(), s.rcl_request_id(), s.rcl_request_msg()) + }; + if ret == RCL_RET_OK as i32 { + s.run_cb(); + } + } + } + unsafe { rcl_wait_set_fini(&mut ws); } From 2f6803f78cd7b3bf721f9762368bd3416ccc26fb Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Sat, 11 Apr 2020 08:46:06 +0200 Subject: [PATCH 4/4] create_wall_timer with callback --- examples/wall_timer.rs | 18 +++++++ src/lib.rs | 103 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 examples/wall_timer.rs diff --git a/examples/wall_timer.rs b/examples/wall_timer.rs new file mode 100644 index 0000000..34761bf --- /dev/null +++ b/examples/wall_timer.rs @@ -0,0 +1,18 @@ +use r2r; +use failure::Error; + +fn main() -> Result<(), Error> { + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + + let mut x = 0; + let cb = move |elapsed: std::time::Duration| { + println!("timer called ({}), {}us since last call", x, elapsed.as_micros()); + x+=1; + }; + node.create_wall_timer(std::time::Duration::from_millis(2000), Box::new(cb))?; + + loop { + node.spin_once(std::time::Duration::from_millis(100)); + } +} diff --git a/src/lib.rs b/src/lib.rs index 0bd079b..8908537 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ include!(concat!(env!("OUT_DIR"), "/_r2r_generated_untyped_helper.rs")); #[macro_use] extern crate failure_derive; use serde::{Deserialize, Serialize}; use std::ffi::{CString,CStr}; +use std::mem::MaybeUninit; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::time::Duration; @@ -415,6 +416,8 @@ pub struct Node { subs: Vec>, // servies, services: Vec>, + // timers, + timers: Vec, // and the publishers, whom we allow to be shared.. hmm. pubs: Vec>, } @@ -448,6 +451,7 @@ impl Node { node_handle: node_handle, subs: Vec::new(), services: Vec::new(), + timers: Vec::new(), pubs: Vec::new(), }) } else { @@ -644,7 +648,7 @@ impl Node { &mut ws, self.subs.len(), 0, - 0, + self.timers.len(), 0, self.services.len(), 0, @@ -663,14 +667,27 @@ impl Node { } } + for s in self.timers.iter() { + unsafe { + rcl_wait_set_add_timer(&mut ws, &s.timer_handle, std::ptr::null_mut()); + } + } + for s in self.services.iter() { unsafe { rcl_wait_set_add_service(&mut ws, s.handle(), std::ptr::null_mut()); } } - unsafe { - rcl_wait(&mut ws, timeout); + let ret = unsafe { + rcl_wait(&mut ws, timeout) + }; + + if ret == RCL_RET_TIMEOUT as i32 { + unsafe { + rcl_wait_set_fini(&mut ws); + } + return; } let ws_subs = @@ -688,6 +705,32 @@ impl Node { } } + let ws_timers = + unsafe { std::slice::from_raw_parts(ws.timers, ws.size_of_timers) }; + assert_eq!(ws_timers.len(), self.timers.len()); + for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) { + if ws_s != &std::ptr::null() { + let mut is_ready = false; + let ret = unsafe { + rcl_timer_is_ready(&s.timer_handle, &mut is_ready) + }; + if ret == RCL_RET_OK as i32 { + if is_ready { + let mut nanos = 0i64; + // todo: error handling + let ret = unsafe { rcl_timer_get_time_since_last_call(&s.timer_handle, + &mut nanos) }; + if ret == RCL_RET_OK as i32 { + let ret = unsafe { rcl_timer_call(&mut s.timer_handle) }; + if ret == RCL_RET_OK as i32 { + (s.callback)(Duration::from_nanos(nanos as u64)); + } + } + } + } + } + } + let ws_services = unsafe { std::slice::from_raw_parts(ws.services, ws.size_of_services) }; @@ -735,6 +778,51 @@ impl Node { Ok(res) } + + pub fn create_wall_timer( + &mut self, + period: Duration, + callback: Box ()>) -> Result<&Timer> { + + let mut clock_handle = MaybeUninit::::uninit(); + + let ret = unsafe { + rcl_steady_clock_init(clock_handle.as_mut_ptr(), &mut rcutils_get_default_allocator()) + }; + if ret != RCL_RET_OK as i32 { + eprintln!("could not create steady clock: {}", ret); + return Err(Error::from_rcl_error(ret)); + } + + let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() }); + let mut timer_handle = unsafe { rcl_get_zero_initialized_timer() }; + + { + let mut ctx = self.context.context_handle.lock().unwrap(); + let ret = unsafe { + rcl_timer_init(&mut timer_handle, clock_handle.as_mut(), + ctx.as_mut(), period.as_nanos() as i64, + None, rcutils_get_default_allocator()) + }; + + if ret != RCL_RET_OK as i32 { + eprintln!("could not create timer: {}", ret); + return Err(Error::from_rcl_error(ret)); + } + } + + let timer = Timer { timer_handle, clock_handle, callback }; + self.timers.push(timer); + + Ok(&self.timers[self.timers.len()-1]) + } + +} + +pub struct Timer { + timer_handle: rcl_timer_t, + clock_handle: Box, + callback: Box ()>, } // Since publishers are temporarily upgraded to owners during the @@ -759,6 +847,15 @@ impl Drop for Node { for s in &mut self.subs { s.destroy(&mut self.node_handle); } + for s in &mut self.services { + s.destroy(&mut self.node_handle); + } + for t in &mut self.timers { + // TODO: check return values + let _ret = unsafe { rcl_timer_fini(&mut t.timer_handle) }; + // TODO: allow other types of clocks... + let _ret = unsafe { rcl_steady_clock_fini(t.clock_handle.as_mut()) }; + } while let Some(p) = self.pubs.pop() { let mut p = wait_until_unwrapped(p); let _ret = unsafe { rcl_publisher_fini(&mut p as *mut _, self.node_handle.as_mut()) };