Pass Channels down pipeline not NWConnection (#45)
Motivation: The expectation is that server channels use Channel as their data type, but the initial data type in NIOTSListenerChannel was actually NWConnection. This is unnecessary and it makes it hard to interop between NIOTS and NIO. Modifications: - Initialize the NIOTSConnectionChannel in the NIOTSListenerChannel instead of in AcceptHandler. - Added some missing @available annotations in the tests. Result: More consistency
This commit is contained in:
parent
3d0713dc43
commit
d59370d89a
|
|
@ -22,7 +22,7 @@ import Network
|
||||||
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
public final class NIOTSListenerBootstrap {
|
public final class NIOTSListenerBootstrap {
|
||||||
private let group: EventLoopGroup
|
private let group: EventLoopGroup
|
||||||
private let childGroup: EventLoopGroup
|
private let childGroup: NIOTSEventLoopGroup
|
||||||
private var serverChannelInit: ((Channel) -> EventLoopFuture<Void>)?
|
private var serverChannelInit: ((Channel) -> EventLoopFuture<Void>)?
|
||||||
private var childChannelInit: ((Channel) -> EventLoopFuture<Void>)?
|
private var childChannelInit: ((Channel) -> EventLoopFuture<Void>)?
|
||||||
private var serverChannelOptions = ChannelOptionsStorage()
|
private var serverChannelOptions = ChannelOptionsStorage()
|
||||||
|
|
@ -200,7 +200,6 @@ public final class NIOTSListenerBootstrap {
|
||||||
|
|
||||||
private func bind0(_ binder: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
|
private func bind0(_ binder: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
|
||||||
let eventLoop = self.group.next() as! NIOTSEventLoop
|
let eventLoop = self.group.next() as! NIOTSEventLoop
|
||||||
let childEventLoopGroup = self.childGroup as! NIOTSEventLoopGroup
|
|
||||||
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
|
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
|
||||||
let childChannelInit = self.childChannelInit
|
let childChannelInit = self.childChannelInit
|
||||||
let serverChannelOptions = self.serverChannelOptions
|
let serverChannelOptions = self.serverChannelOptions
|
||||||
|
|
@ -209,18 +208,18 @@ public final class NIOTSListenerBootstrap {
|
||||||
let serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
let serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
||||||
qos: self.serverQoS,
|
qos: self.serverQoS,
|
||||||
tcpOptions: self.tcpOptions,
|
tcpOptions: self.tcpOptions,
|
||||||
tlsOptions: self.tlsOptions)
|
tlsOptions: self.tlsOptions,
|
||||||
|
childLoopGroup: self.childGroup,
|
||||||
|
childChannelQoS: self.childQoS,
|
||||||
|
childTCPOptions: self.tcpOptions,
|
||||||
|
childTLSOptions: self.tlsOptions)
|
||||||
|
|
||||||
return eventLoop.submit {
|
return eventLoop.submit {
|
||||||
return serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
|
return serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
|
||||||
serverChannelInit(serverChannel)
|
serverChannelInit(serverChannel)
|
||||||
}.flatMap {
|
}.flatMap {
|
||||||
serverChannel.pipeline.addHandler(AcceptHandler(childChannelInitializer: childChannelInit,
|
serverChannel.pipeline.addHandler(AcceptHandler(childChannelInitializer: childChannelInit,
|
||||||
childGroup: childEventLoopGroup,
|
childChannelOptions: childChannelOptions))
|
||||||
childChannelOptions: childChannelOptions,
|
|
||||||
childChannelQoS: self.childQoS,
|
|
||||||
tcpOptions: self.tcpOptions,
|
|
||||||
tlsOptions: self.tlsOptions))
|
|
||||||
}.flatMap {
|
}.flatMap {
|
||||||
serverChannel.register()
|
serverChannel.register()
|
||||||
}.flatMap {
|
}.flatMap {
|
||||||
|
|
@ -240,41 +239,24 @@ public final class NIOTSListenerBootstrap {
|
||||||
|
|
||||||
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
private class AcceptHandler: ChannelInboundHandler {
|
private class AcceptHandler: ChannelInboundHandler {
|
||||||
typealias InboundIn = NWConnection
|
typealias InboundIn = NIOTSConnectionChannel
|
||||||
typealias InboundOut = NIOTSConnectionChannel
|
typealias InboundOut = NIOTSConnectionChannel
|
||||||
|
|
||||||
private let childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
|
private let childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
|
||||||
private let childGroup: NIOTSEventLoopGroup
|
|
||||||
private let childChannelOptions: ChannelOptionsStorage
|
private let childChannelOptions: ChannelOptionsStorage
|
||||||
private let childChannelQoS: DispatchQoS?
|
|
||||||
private let originalTCPOptions: NWProtocolTCP.Options
|
|
||||||
private let originalTLSOptions: NWProtocolTLS.Options?
|
|
||||||
|
|
||||||
init(childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?,
|
init(childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?,
|
||||||
childGroup: NIOTSEventLoopGroup,
|
childChannelOptions: ChannelOptionsStorage) {
|
||||||
childChannelOptions: ChannelOptionsStorage,
|
|
||||||
childChannelQoS: DispatchQoS?,
|
|
||||||
tcpOptions: NWProtocolTCP.Options,
|
|
||||||
tlsOptions: NWProtocolTLS.Options?) {
|
|
||||||
self.childChannelInitializer = childChannelInitializer
|
self.childChannelInitializer = childChannelInitializer
|
||||||
self.childGroup = childGroup
|
|
||||||
self.childChannelOptions = childChannelOptions
|
self.childChannelOptions = childChannelOptions
|
||||||
self.childChannelQoS = childChannelQoS
|
|
||||||
self.originalTCPOptions = tcpOptions
|
|
||||||
self.originalTLSOptions = tlsOptions
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||||
let conn = self.unwrapInboundIn(data)
|
let newChannel = self.unwrapInboundIn(data)
|
||||||
let childLoop = self.childGroup.next() as! NIOTSEventLoop
|
let childLoop = newChannel.eventLoop
|
||||||
let ctxEventLoop = context.eventLoop
|
let ctxEventLoop = context.eventLoop
|
||||||
let childInitializer = self.childChannelInitializer ?? { _ in childLoop.makeSucceededFuture(()) }
|
let childInitializer = self.childChannelInitializer ?? { _ in childLoop.makeSucceededFuture(()) }
|
||||||
let newChannel = NIOTSConnectionChannel(wrapping: conn,
|
|
||||||
on: childLoop,
|
|
||||||
parent: context.channel,
|
|
||||||
qos: self.childChannelQoS,
|
|
||||||
tcpOptions: self.originalTCPOptions,
|
|
||||||
tlsOptions: self.originalTLSOptions)
|
|
||||||
|
|
||||||
@inline(__always)
|
@inline(__always)
|
||||||
func setupChildChannel() -> EventLoopFuture<Void> {
|
func setupChildChannel() -> EventLoopFuture<Void> {
|
||||||
|
|
|
||||||
|
|
@ -83,18 +83,39 @@ internal final class NIOTSListenerChannel {
|
||||||
/// Whether to enable peer-to-peer connectivity when using Bonjour services.
|
/// Whether to enable peer-to-peer connectivity when using Bonjour services.
|
||||||
private var enablePeerToPeer = false
|
private var enablePeerToPeer = false
|
||||||
|
|
||||||
|
/// The event loop group to use for child channels.
|
||||||
|
private let childLoopGroup: NIOTSEventLoopGroup
|
||||||
|
|
||||||
|
/// The QoS to use for child channels.
|
||||||
|
private let childChannelQoS: DispatchQoS?
|
||||||
|
|
||||||
|
/// The TCP options to use for child channels.
|
||||||
|
private let childTCPOptions: NWProtocolTCP.Options
|
||||||
|
|
||||||
|
/// The TLS options to use for child channels.
|
||||||
|
private let childTLSOptions: NWProtocolTLS.Options?
|
||||||
|
|
||||||
|
|
||||||
/// Create a `NIOTSListenerChannel` on a given `NIOTSEventLoop`.
|
/// Create a `NIOTSListenerChannel` on a given `NIOTSEventLoop`.
|
||||||
///
|
///
|
||||||
/// Note that `NIOTSListenerChannel` objects cannot be created on arbitrary loops types.
|
/// Note that `NIOTSListenerChannel` objects cannot be created on arbitrary loops types.
|
||||||
internal init(eventLoop: NIOTSEventLoop,
|
internal init(eventLoop: NIOTSEventLoop,
|
||||||
qos: DispatchQoS? = nil,
|
qos: DispatchQoS? = nil,
|
||||||
tcpOptions: NWProtocolTCP.Options,
|
tcpOptions: NWProtocolTCP.Options,
|
||||||
tlsOptions: NWProtocolTLS.Options?) {
|
tlsOptions: NWProtocolTLS.Options?,
|
||||||
|
childLoopGroup: NIOTSEventLoopGroup,
|
||||||
|
childChannelQoS: DispatchQoS?,
|
||||||
|
childTCPOptions: NWProtocolTCP.Options,
|
||||||
|
childTLSOptions: NWProtocolTLS.Options?) {
|
||||||
self.tsEventLoop = eventLoop
|
self.tsEventLoop = eventLoop
|
||||||
self.closePromise = eventLoop.makePromise()
|
self.closePromise = eventLoop.makePromise()
|
||||||
self.connectionQueue = eventLoop.channelQueue(label: "nio.transportservices.listenerchannel", qos: qos)
|
self.connectionQueue = eventLoop.channelQueue(label: "nio.transportservices.listenerchannel", qos: qos)
|
||||||
self.tcpOptions = tcpOptions
|
self.tcpOptions = tcpOptions
|
||||||
self.tlsOptions = tlsOptions
|
self.tlsOptions = tlsOptions
|
||||||
|
self.childLoopGroup = childLoopGroup
|
||||||
|
self.childChannelQoS = childChannelQoS
|
||||||
|
self.childTCPOptions = childTCPOptions
|
||||||
|
self.childTLSOptions = childTLSOptions
|
||||||
|
|
||||||
// Must come last, as it requires self to be completely initialized.
|
// Must come last, as it requires self to be completely initialized.
|
||||||
self._pipeline = ChannelPipeline(channel: self)
|
self._pipeline = ChannelPipeline(channel: self)
|
||||||
|
|
@ -412,7 +433,14 @@ extension NIOTSListenerChannel {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
self.pipeline.fireChannelRead(NIOAny(connection))
|
let newChannel = NIOTSConnectionChannel(wrapping: connection,
|
||||||
|
on: self.childLoopGroup.next() as! NIOTSEventLoop,
|
||||||
|
parent: self,
|
||||||
|
qos: self.childChannelQoS,
|
||||||
|
tcpOptions: self.childTCPOptions,
|
||||||
|
tlsOptions: self.childTLSOptions)
|
||||||
|
|
||||||
|
self.pipeline.fireChannelRead(NIOAny(newChannel))
|
||||||
self.pipeline.fireChannelReadComplete()
|
self.pipeline.fireChannelReadComplete()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import NIOTransportServices
|
||||||
import Foundation
|
import Foundation
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
final class ConnectRecordingHandler: ChannelOutboundHandler {
|
final class ConnectRecordingHandler: ChannelOutboundHandler {
|
||||||
typealias OutboundIn = Any
|
typealias OutboundIn = Any
|
||||||
typealias OutboundOut = Any
|
typealias OutboundOut = Any
|
||||||
|
|
@ -71,6 +72,7 @@ final class WritabilityChangedHandler: ChannelInboundHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
final class DisableWaitingAfterConnect: ChannelOutboundHandler {
|
final class DisableWaitingAfterConnect: ChannelOutboundHandler {
|
||||||
typealias OutboundIn = Any
|
typealias OutboundIn = Any
|
||||||
typealias OutboundOut = Any
|
typealias OutboundOut = Any
|
||||||
|
|
@ -103,6 +105,7 @@ final class PromiseOnActiveHandler: ChannelInboundHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSConnectionChannelTests: XCTestCase {
|
class NIOTSConnectionChannelTests: XCTestCase {
|
||||||
private var group: NIOTSEventLoopGroup!
|
private var group: NIOTSEventLoopGroup!
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -164,7 +164,7 @@ extension ByteBufferAllocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSEndToEndTests: XCTestCase {
|
class NIOTSEndToEndTests: XCTestCase {
|
||||||
private var group: NIOTSEventLoopGroup!
|
private var group: NIOTSEventLoopGroup!
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,8 @@ import NIO
|
||||||
import NIOConcurrencyHelpers
|
import NIOConcurrencyHelpers
|
||||||
import NIOTransportServices
|
import NIOTransportServices
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSEventLoopTest: XCTestCase {
|
class NIOTSEventLoopTest: XCTestCase {
|
||||||
func testIsInEventLoopWorks() throws {
|
func testIsInEventLoopWorks() throws {
|
||||||
let group = NIOTSEventLoopGroup()
|
let group = NIOTSEventLoopGroup()
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import NIO
|
||||||
import NIOTransportServices
|
import NIOTransportServices
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
final class BindRecordingHandler: ChannelOutboundHandler {
|
final class BindRecordingHandler: ChannelOutboundHandler {
|
||||||
typealias OutboundIn = Any
|
typealias OutboundIn = Any
|
||||||
typealias OutboundOut = Any
|
typealias OutboundOut = Any
|
||||||
|
|
@ -45,6 +46,7 @@ final class BindRecordingHandler: ChannelOutboundHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSListenerChannelTests: XCTestCase {
|
class NIOTSListenerChannelTests: XCTestCase {
|
||||||
private var group: NIOTSEventLoopGroup!
|
private var group: NIOTSEventLoopGroup!
|
||||||
|
|
||||||
|
|
@ -220,5 +222,42 @@ class NIOTSListenerChannelTests: XCTestCase {
|
||||||
XCTAssertNoThrow(try listener.close().wait())
|
XCTAssertNoThrow(try listener.close().wait())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testChannelEmitsChannels() throws {
|
||||||
|
class ChannelReceiver: ChannelInboundHandler {
|
||||||
|
typealias InboundIn = Channel
|
||||||
|
typealias InboundOut = Channel
|
||||||
|
|
||||||
|
private let promise: EventLoopPromise<Channel>
|
||||||
|
|
||||||
|
init(_ promise: EventLoopPromise<Channel>) {
|
||||||
|
self.promise = promise
|
||||||
|
}
|
||||||
|
|
||||||
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||||
|
let channel = self.unwrapInboundIn(data)
|
||||||
|
self.promise.succeed(channel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let channelPromise = self.group.next().makePromise(of: Channel.self)
|
||||||
|
let listener = try NIOTSListenerBootstrap(group: self.group)
|
||||||
|
.serverChannelInitializer { channel in
|
||||||
|
channel.pipeline.addHandler(ChannelReceiver(channelPromise))
|
||||||
|
}
|
||||||
|
.bind(host: "localhost", port: 0).wait()
|
||||||
|
defer {
|
||||||
|
XCTAssertNoThrow(try listener.close().wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
let connection = try NIOTSConnectionBootstrap(group: self.group).connect(to: listener.localAddress!).wait()
|
||||||
|
defer {
|
||||||
|
XCTAssertNoThrow(try connection.close().wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
let promisedChannel = try channelPromise.futureResult.wait()
|
||||||
|
XCTAssertEqual(promisedChannel.remoteAddress, connection.localAddress)
|
||||||
|
XCTAssertEqual(promisedChannel.localAddress, connection.remoteAddress)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import Network
|
||||||
@testable import NIOTransportServices
|
@testable import NIOTransportServices
|
||||||
|
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSSocketOptionTests: XCTestCase {
|
class NIOTSSocketOptionTests: XCTestCase {
|
||||||
private var options: NWProtocolTCP.Options!
|
private var options: NWProtocolTCP.Options!
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ private extension Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
|
||||||
class NIOTSSocketOptionsOnChannelTests: XCTestCase {
|
class NIOTSSocketOptionsOnChannelTests: XCTestCase {
|
||||||
private var group: NIOTSEventLoopGroup!
|
private var group: NIOTSEventLoopGroup!
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue