Launching Services with existing NWConnection or NWListener objects (#156)
NIO Transport Services is not capable of launching services with existing NWConnection or NWListener objects. Being able to get an existing NWConnection through a connection bootstrap and into a channel is a useful capability for advanced use cases. Modifications: * Added an option to bootstrap with existing NWListener and NWConnection * Completed promise connection earlier within NIOTSChannels when AlreadyConfigured is called * Added test with new NWConnection and NWListener to register Channels Result: Able to create and register a channel using an existing NWListener and NWConnection
This commit is contained in:
parent
c2e373fec5
commit
5cd6fd45f7
|
|
@ -165,7 +165,7 @@ public final class NIOTSConnectionBootstrap {
|
|||
/// - address: The address to connect to.
|
||||
/// - returns: An `EventLoopFuture<Channel>` to deliver the `Channel` when connected.
|
||||
public func connect(to address: SocketAddress) -> EventLoopFuture<Channel> {
|
||||
return self.connect { channel, promise in
|
||||
return self.connect(shouldRegister: true) { channel, promise in
|
||||
channel.connect(to: address, promise: promise)
|
||||
}
|
||||
}
|
||||
|
|
@ -186,17 +186,36 @@ public final class NIOTSConnectionBootstrap {
|
|||
|
||||
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
|
||||
public func connect(endpoint: NWEndpoint) -> EventLoopFuture<Channel> {
|
||||
return self.connect { channel, promise in
|
||||
return self.connect(shouldRegister: true) {channel, promise in
|
||||
channel.triggerUserOutboundEvent(NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint),
|
||||
promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private func connect(_ connectAction: @escaping (Channel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
|
||||
let conn: Channel = NIOTSConnectionChannel(eventLoop: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.qos,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions)
|
||||
/// Use a pre-existing `NWConnection` to connect a `Channel`.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - connection: The NWConnection to wrap.
|
||||
/// - returns: An `EventLoopFuture<Channel>` to deliver the `Channel` when connected.
|
||||
public func withExistingNWConnection(_ connection: NWConnection) -> EventLoopFuture<Channel> {
|
||||
return self.connect(existingNWConnection: connection, shouldRegister: false) { channel, promise in
|
||||
channel.registerAlreadyConfigured0(promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private func connect(existingNWConnection: NWConnection? = nil, shouldRegister: Bool, _ connectAction: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
|
||||
let conn: NIOTSConnectionChannel
|
||||
if let newConnection = existingNWConnection {
|
||||
conn = NIOTSConnectionChannel(wrapping: newConnection,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions)
|
||||
} else {
|
||||
conn = NIOTSConnectionChannel(eventLoop: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.qos,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions)
|
||||
}
|
||||
let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) }
|
||||
let channelOptions = self.channelOptions
|
||||
|
||||
|
|
@ -205,7 +224,11 @@ public final class NIOTSConnectionBootstrap {
|
|||
initializer(conn)
|
||||
}.flatMap {
|
||||
conn.eventLoop.assertInEventLoop()
|
||||
return conn.register()
|
||||
if shouldRegister {
|
||||
return conn.register()
|
||||
} else {
|
||||
return conn.eventLoop.makeSucceededVoidFuture()
|
||||
}
|
||||
}.flatMap {
|
||||
let connectPromise: EventLoopPromise<Void> = conn.eventLoop.makePromise()
|
||||
connectAction(conn, connectPromise)
|
||||
|
|
|
|||
|
|
@ -228,7 +228,7 @@ internal final class NIOTSConnectionChannel {
|
|||
/// Create a `NIOTSConnectionChannel` with an already-established `NWConnection`.
|
||||
internal convenience init(wrapping connection: NWConnection,
|
||||
on eventLoop: NIOTSEventLoop,
|
||||
parent: Channel,
|
||||
parent: Channel? = nil,
|
||||
qos: DispatchQoS? = nil,
|
||||
tcpOptions: NWProtocolTCP.Options,
|
||||
tlsOptions: NWProtocolTLS.Options?) {
|
||||
|
|
@ -463,7 +463,7 @@ extension NIOTSConnectionChannel: StateManagedChannel {
|
|||
promise?.fail(NIOTSErrors.NotPreConfigured())
|
||||
return
|
||||
}
|
||||
|
||||
self.connectPromise = promise
|
||||
connection.stateUpdateHandler = self.stateUpdateHandler(newState:)
|
||||
connection.betterPathUpdateHandler = self.betterPathHandler
|
||||
connection.pathUpdateHandler = self.pathChangedHandler(newPath:)
|
||||
|
|
|
|||
|
|
@ -246,7 +246,7 @@ public final class NIOTSListenerBootstrap {
|
|||
return self.group.next().makeFailedFuture(NIOTSErrors.InvalidPort(port: port))
|
||||
}
|
||||
|
||||
return self.bind0 { (channel, promise) in
|
||||
return self.bind0(shouldRegister: true) { (channel, promise) in
|
||||
do {
|
||||
// NWListener does not actually resolve hostname-based NWEndpoints
|
||||
// for use with requiredLocalEndpoint, so we fall back to
|
||||
|
|
@ -264,7 +264,7 @@ public final class NIOTSListenerBootstrap {
|
|||
/// - parameters:
|
||||
/// - address: The `SocketAddress` to bind on.
|
||||
public func bind(to address: SocketAddress) -> EventLoopFuture<Channel> {
|
||||
return self.bind0 { (channel, promise) in
|
||||
return self.bind0(shouldRegister: true) { (channel, promise) in
|
||||
channel.bind(to: address, promise: promise)
|
||||
}
|
||||
}
|
||||
|
|
@ -274,7 +274,7 @@ public final class NIOTSListenerBootstrap {
|
|||
/// - parameters:
|
||||
/// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system.
|
||||
public func bind(unixDomainSocketPath: String) -> EventLoopFuture<Channel> {
|
||||
return self.bind0 { (channel, promise) in
|
||||
return self.bind0(shouldRegister: true) { (channel, promise) in
|
||||
do {
|
||||
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
|
||||
channel.bind(to: address, promise: promise)
|
||||
|
|
@ -289,26 +289,49 @@ public final class NIOTSListenerBootstrap {
|
|||
/// - parameters:
|
||||
/// - endpoint: The `NWEndpoint` to bind this channel to.
|
||||
public func bind(endpoint: NWEndpoint) -> EventLoopFuture<Channel> {
|
||||
return self.bind0 { (channel, promise) in
|
||||
return self.bind0(shouldRegister: true) { (channel, promise) in
|
||||
channel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private func bind0(_ binder: @escaping (Channel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
|
||||
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - listener: The NWListener to wrap.
|
||||
public func withNWListener(_ listener:NWListener) -> EventLoopFuture<Channel>{
|
||||
return self.bind0(existingNWListener: listener,shouldRegister: false) { channel, promise in
|
||||
channel.registerAlreadyConfigured0(promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private func bind0(existingNWListener: NWListener? = nil, shouldRegister: Bool, _ binder: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
|
||||
let eventLoop = self.group.next() as! NIOTSEventLoop
|
||||
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
|
||||
let childChannelInit = self.childChannelInit
|
||||
let serverChannelOptions = self.serverChannelOptions
|
||||
let childChannelOptions = self.childChannelOptions
|
||||
|
||||
let serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
let serverChannel: NIOTSListenerChannel
|
||||
if let newListener = existingNWListener {
|
||||
serverChannel = NIOTSListenerChannel(wrapping: newListener,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
} else {
|
||||
serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
}
|
||||
|
||||
return eventLoop.submit {
|
||||
return serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
|
||||
|
|
@ -318,7 +341,11 @@ public final class NIOTSListenerBootstrap {
|
|||
return serverChannel.pipeline.addHandler(AcceptHandler(childChannelInitializer: childChannelInit,
|
||||
childChannelOptions: childChannelOptions))
|
||||
}.flatMap {
|
||||
serverChannel.register()
|
||||
if shouldRegister{
|
||||
return serverChannel.register()
|
||||
} else {
|
||||
return eventLoop.makeSucceededVoidFuture()
|
||||
}
|
||||
}.flatMap {
|
||||
let bindPromise = eventLoop.makePromise(of: Void.self)
|
||||
binder(serverChannel, bindPromise)
|
||||
|
|
|
|||
|
|
@ -128,6 +128,28 @@ internal final class NIOTSListenerChannel {
|
|||
// Must come last, as it requires self to be completely initialized.
|
||||
self._pipeline = ChannelPipeline(channel: self)
|
||||
}
|
||||
|
||||
/// Create a `NIOTSListenerChannel` with an already-established `NWListener`.
|
||||
internal convenience init(wrapping listener: NWListener,
|
||||
on eventLoop: NIOTSEventLoop,
|
||||
qos: DispatchQoS? = nil,
|
||||
tcpOptions: NWProtocolTCP.Options,
|
||||
tlsOptions: NWProtocolTLS.Options?,
|
||||
childLoopGroup: EventLoopGroup,
|
||||
childChannelQoS: DispatchQoS?,
|
||||
childTCPOptions: NWProtocolTCP.Options,
|
||||
childTLSOptions: NWProtocolTLS.Options?) {
|
||||
self.init(eventLoop: eventLoop,
|
||||
qos: qos,
|
||||
tcpOptions: tcpOptions,
|
||||
tlsOptions: tlsOptions,
|
||||
childLoopGroup: childLoopGroup,
|
||||
childChannelQoS: childChannelQoS,
|
||||
childTCPOptions: childTCPOptions,
|
||||
childTLSOptions: childTLSOptions
|
||||
)
|
||||
self.nwListener = listener
|
||||
}
|
||||
}
|
||||
|
||||
// MARK:- NIOTSListenerChannel implementation of Channel
|
||||
|
|
@ -256,9 +278,20 @@ extension NIOTSListenerChannel: StateManagedChannel {
|
|||
self = .active
|
||||
}
|
||||
}
|
||||
internal func alreadyConfigured0(promise: EventLoopPromise<Void>?) {
|
||||
guard let listener = nwListener else {
|
||||
promise?.fail(NIOTSErrors.NotPreConfigured())
|
||||
return
|
||||
}
|
||||
|
||||
func alreadyConfigured0(promise: EventLoopPromise<Void>?) {
|
||||
fatalError("Not implemented")
|
||||
guard case .setup = listener.state else {
|
||||
promise?.fail(NIOTSErrors.NotPreConfigured())
|
||||
return
|
||||
}
|
||||
self.bindPromise = promise
|
||||
listener.stateUpdateHandler = self.stateUpdateHandler(newState:)
|
||||
listener.newConnectionHandler = self.newConnectionHandler(connection:)
|
||||
listener.start(queue: self.connectionQueue)
|
||||
}
|
||||
|
||||
public func localAddress0() throws -> SocketAddress {
|
||||
|
|
@ -289,7 +322,6 @@ extension NIOTSListenerChannel: StateManagedChannel {
|
|||
}
|
||||
|
||||
internal func beginActivating0(to target: NWEndpoint, promise: EventLoopPromise<Void>?) {
|
||||
assert(self.nwListener == nil)
|
||||
assert(self.bindPromise == nil)
|
||||
self.bindPromise = promise
|
||||
|
||||
|
|
|
|||
|
|
@ -163,7 +163,6 @@ extension StateManagedChannel {
|
|||
try self.state.register(eventLoop: self.tsEventLoop, channel: self)
|
||||
self.pipeline.fireChannelRegistered()
|
||||
try self.state.beginActivating()
|
||||
promise?.succeed(())
|
||||
} catch {
|
||||
promise?.fail(error)
|
||||
self.close0(error: error, mode: .all, promise: nil)
|
||||
|
|
|
|||
|
|
@ -229,6 +229,35 @@ class NIOTSEndToEndTests: XCTestCase {
|
|||
XCTAssertNoThrow(try completeFuture.wait())
|
||||
}
|
||||
|
||||
func testNWExistingListener() throws {
|
||||
let nwListenerTest = try NWListener(
|
||||
using: NWParameters(tls: nil),
|
||||
on: NWEndpoint.Port(rawValue: 0)!)
|
||||
let listener = try NIOTSListenerBootstrap(group: self.group)
|
||||
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler())}
|
||||
.withNWListener(nwListenerTest).wait()
|
||||
defer {
|
||||
XCTAssertNoThrow(try listener.close().wait())
|
||||
}
|
||||
|
||||
let nwConnectionTest = NWConnection(
|
||||
host: NWEndpoint.Host("localhost"),
|
||||
port: nwListenerTest.port!,
|
||||
using: NWParameters(tls: nil))
|
||||
let connection = try NIOTSConnectionBootstrap(group: self.group)
|
||||
|
||||
.withExistingNWConnection(nwConnectionTest).wait()
|
||||
defer {
|
||||
XCTAssertNoThrow(try connection.close().wait())
|
||||
}
|
||||
|
||||
let buffer = connection.allocator.bufferFor(string: "hello, world!")
|
||||
let completeFuture = connection.expectRead(buffer)
|
||||
connection.writeAndFlush(buffer, promise: nil)
|
||||
// this is the assert that matters to make sure it works & writes data
|
||||
XCTAssertNoThrow(try completeFuture.wait())
|
||||
}
|
||||
|
||||
func testMultipleConnectionsOneListener() throws {
|
||||
let listener = try NIOTSListenerBootstrap(group: self.group)
|
||||
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler())}
|
||||
|
|
|
|||
Loading…
Reference in New Issue