Async methods for `NIOTSListenerBootstrap` and `NIOTSConnectionBootstrap` (#178)
* Async methods for `NIOTSListenerBootstrap` and `NIOTSConnectionBootstrap` # Motivation We want to support async bootstrapping with all our bootstraps. # Modification This PR adds support for the `NIOTSListenerBootstrap` and `NIOTSConnectionBootstrap`. It also adds the three variants of methods to it (abstract output, `NIOAsyncChannel` based and protocol negotiation based) # Result We now support asynchronous interaction with the `NIOTSListenerBootstrap` and `NIOTSConnectionBootstrap` * Use protocolHandlers properly * Update NIO version * code review * REview * Doc indention
This commit is contained in:
parent
0194a62431
commit
ee0c7ffcd6
|
|
@ -21,7 +21,7 @@ let package = Package(
|
|||
.library(name: "NIOTransportServices", targets: ["NIOTransportServices"]),
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/apple/swift-nio.git", from: "2.51.0"),
|
||||
.package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
|
||||
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
|
||||
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
|
||||
],
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#if canImport(Network)
|
||||
import NIOCore
|
||||
@_spi(AsyncChannel) import NIOCore
|
||||
import Dispatch
|
||||
import Network
|
||||
|
||||
|
|
@ -41,7 +41,19 @@ import Network
|
|||
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
|
||||
public final class NIOTSConnectionBootstrap {
|
||||
private let group: EventLoopGroup
|
||||
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>)?
|
||||
private var _channelInitializer: ((Channel) -> EventLoopFuture<Void>)
|
||||
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>) {
|
||||
if let protocolHandlers = self.protocolHandlers {
|
||||
let channelInitializer = self._channelInitializer
|
||||
return { channel in
|
||||
channelInitializer(channel).flatMap {
|
||||
channel.pipeline.addHandlers(protocolHandlers(), position: .first)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return self._channelInitializer
|
||||
}
|
||||
}
|
||||
private var connectTimeout: TimeAmount = TimeAmount.seconds(10)
|
||||
private var channelOptions = ChannelOptions.Storage()
|
||||
private var qos: DispatchQoS?
|
||||
|
|
@ -87,6 +99,7 @@ public final class NIOTSConnectionBootstrap {
|
|||
|
||||
self.group = group
|
||||
self.channelOptions.append(key: ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
|
||||
self._channelInitializer = { channel in channel.eventLoop.makeSucceededVoidFuture() }
|
||||
}
|
||||
|
||||
/// Initialize the connected `NIOTSConnectionChannel` with `initializer`. The most common task in initializer is to add
|
||||
|
|
@ -97,7 +110,7 @@ public final class NIOTSConnectionBootstrap {
|
|||
/// - parameters:
|
||||
/// - handler: A closure that initializes the provided `Channel`.
|
||||
public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
|
||||
self.channelInitializer = handler
|
||||
self._channelInitializer = handler
|
||||
return self
|
||||
}
|
||||
|
||||
|
|
@ -216,7 +229,7 @@ public final class NIOTSConnectionBootstrap {
|
|||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions)
|
||||
}
|
||||
let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) }
|
||||
let initializer = self.channelInitializer
|
||||
let channelOptions = self.channelOptions
|
||||
|
||||
return conn.eventLoop.flatSubmit {
|
||||
|
|
@ -261,6 +274,525 @@ public final class NIOTSConnectionBootstrap {
|
|||
}
|
||||
}
|
||||
|
||||
// MARK: Async connect methods with arbitrary payload
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSConnectionBootstrap {
|
||||
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to connect to.
|
||||
/// - port: The port to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Output: Sendable>(
|
||||
host: String,
|
||||
port: Int,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> Output {
|
||||
let validPortRange = Int(UInt16.min)...Int(UInt16.max)
|
||||
guard validPortRange.contains(port), let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else {
|
||||
throw NIOTSErrors.InvalidPort(port: port)
|
||||
}
|
||||
|
||||
return try await self.connect(
|
||||
endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort),
|
||||
channelInitializer: channelInitializer
|
||||
)
|
||||
}
|
||||
|
||||
/// Specify the `address` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The address to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Output: Sendable>(
|
||||
to address: SocketAddress,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> Output {
|
||||
try await self.connect0(
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
connectionChannel.connect(to: address, promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Output: Sendable>(
|
||||
unixDomainSocketPath: String,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> Output {
|
||||
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
|
||||
return try await self.connect(
|
||||
to: address,
|
||||
channelInitializer: channelInitializer
|
||||
)
|
||||
}
|
||||
|
||||
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The endpoint to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Output: Sendable>(
|
||||
endpoint: NWEndpoint,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> Output {
|
||||
try await self.connect0(
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
connectionChannel.triggerUserOutboundEvent(
|
||||
NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint),
|
||||
promise: promise
|
||||
)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Use a pre-existing `NWConnection` to connect a `Channel`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - connection: The `NWConnection` to wrap.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withExistingNWConnection<Output: Sendable>(
|
||||
_ connection: NWConnection,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> Output {
|
||||
try await self.connect0(
|
||||
existingNWConnection: connection,
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.registerAlreadyConfigured0(promise: promise)
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
private func connect0<ChannelInitializerResult, PostRegistrationTransformationResult>(
|
||||
existingNWConnection: NWConnection? = nil,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
|
||||
registration: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void,
|
||||
postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture<PostRegistrationTransformationResult>
|
||||
) -> EventLoopFuture<PostRegistrationTransformationResult> {
|
||||
let connectionChannel: NIOTSConnectionChannel
|
||||
if let newConnection = existingNWConnection {
|
||||
connectionChannel = NIOTSConnectionChannel(
|
||||
wrapping: newConnection,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions
|
||||
)
|
||||
} else {
|
||||
connectionChannel = NIOTSConnectionChannel(
|
||||
eventLoop: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.qos,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions
|
||||
)
|
||||
}
|
||||
let channelInitializer = { (channel: Channel) -> EventLoopFuture<ChannelInitializerResult> in
|
||||
let initializer = self.channelInitializer
|
||||
return initializer(channel).flatMap { channelInitializer(channel) }
|
||||
}
|
||||
let channelOptions = self.channelOptions
|
||||
|
||||
return connectionChannel.eventLoop.flatSubmit {
|
||||
return channelOptions.applyAllChannelOptions(to: connectionChannel).flatMap {
|
||||
channelInitializer(connectionChannel)
|
||||
}.flatMap { result -> EventLoopFuture<ChannelInitializerResult> in
|
||||
let connectPromise: EventLoopPromise<Void> = connectionChannel.eventLoop.makePromise()
|
||||
registration(connectionChannel, connectPromise)
|
||||
let cancelTask = connectionChannel.eventLoop.scheduleTask(in: self.connectTimeout) {
|
||||
connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout))
|
||||
connectionChannel.close(promise: nil)
|
||||
}
|
||||
|
||||
connectPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
|
||||
cancelTask.cancel()
|
||||
}
|
||||
return connectPromise.futureResult.map { result }
|
||||
}.flatMap { (result: ChannelInitializerResult) -> EventLoopFuture<PostRegistrationTransformationResult> in
|
||||
postRegisterTransformation(result, connectionChannel.eventLoop)
|
||||
}.flatMapErrorThrowing {
|
||||
connectionChannel.close(promise: nil)
|
||||
throw $0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Async connect methods with AsyncChannel
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSConnectionBootstrap {
|
||||
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to connect to.
|
||||
/// - port: The port to connect to.
|
||||
/// - backpressureStrategy: The back pressure strategy used by the channel.
|
||||
/// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled
|
||||
/// then finishing the `NIOAsyncChannelWriter` will lead to half closure.
|
||||
/// - inboundType: The channel's inbound type.
|
||||
/// - outboundType: The channel's outbound type.
|
||||
/// - Returns: A `NIOAsyncChannel` for the established connection.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Inbound: Sendable, Outbound: Sendable>(
|
||||
host: String,
|
||||
port: Int,
|
||||
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isOutboundHalfClosureEnabled: Bool = false,
|
||||
inboundType: Inbound.Type = Inbound.self,
|
||||
outboundType: Outbound.Type = Outbound.self
|
||||
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
|
||||
try await self.connect(
|
||||
host: host,
|
||||
port: port
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
return try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: backpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
|
||||
inboundType: inboundType,
|
||||
outboundType: outboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify the `address` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The address to connect to.
|
||||
/// - backpressureStrategy: The back pressure strategy used by the channel.
|
||||
/// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled
|
||||
/// then finishing the `NIOAsyncChannelWriter` will lead to half closure.
|
||||
/// - inboundType: The channel's inbound type.
|
||||
/// - outboundType: The channel's outbound type.
|
||||
/// - Returns: A `NIOAsyncChannel` for the established connection.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Inbound: Sendable, Outbound: Sendable>(
|
||||
to address: SocketAddress,
|
||||
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isOutboundHalfClosureEnabled: Bool = false,
|
||||
inboundType: Inbound.Type = Inbound.self,
|
||||
outboundType: Outbound.Type = Outbound.self
|
||||
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
|
||||
try await self.connect(
|
||||
to: address
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
return try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: backpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
|
||||
inboundType: inboundType,
|
||||
outboundType: outboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
|
||||
/// - backpressureStrategy: The back pressure strategy used by the channel.
|
||||
/// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled
|
||||
/// then finishing the `NIOAsyncChannelWriter` will lead to half closure.
|
||||
/// - inboundType: The channel's inbound type.
|
||||
/// - outboundType: The channel's outbound type.
|
||||
/// - Returns: A `NIOAsyncChannel` for the established connection.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Inbound: Sendable, Outbound: Sendable>(
|
||||
unixDomainSocketPath: String,
|
||||
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isOutboundHalfClosureEnabled: Bool = false,
|
||||
inboundType: Inbound.Type = Inbound.self,
|
||||
outboundType: Outbound.Type = Outbound.self
|
||||
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
|
||||
try await self.connect(
|
||||
unixDomainSocketPath: unixDomainSocketPath
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
return try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: backpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
|
||||
inboundType: inboundType,
|
||||
outboundType: outboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The endpoint to connect to.
|
||||
/// - backpressureStrategy: The back pressure strategy used by the channel.
|
||||
/// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled
|
||||
/// then finishing the `NIOAsyncChannelWriter` will lead to half closure.
|
||||
/// - inboundType: The channel's inbound type.
|
||||
/// - outboundType: The channel's outbound type.
|
||||
/// - Returns: A `NIOAsyncChannel` for the established connection.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Inbound: Sendable, Outbound: Sendable>(
|
||||
endpoint: NWEndpoint,
|
||||
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isOutboundHalfClosureEnabled: Bool = false,
|
||||
inboundType: Inbound.Type = Inbound.self,
|
||||
outboundType: Outbound.Type = Outbound.self
|
||||
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
|
||||
try await self.connect(
|
||||
endpoint: endpoint
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
return try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: backpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
|
||||
inboundType: inboundType,
|
||||
outboundType: outboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Use a pre-existing `NWConnection` to connect a `Channel`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - connection: The `NWConnection` to wrap.
|
||||
/// - backpressureStrategy: The back pressure strategy used by the channel.
|
||||
/// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled
|
||||
/// then finishing the `NIOAsyncChannelWriter` will lead to half closure.
|
||||
/// - inboundType: The channel's inbound type.
|
||||
/// - outboundType: The channel's outbound type.
|
||||
/// - Returns: A `NIOAsyncChannel` for the established connection.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withExistingNWConnection<Inbound: Sendable, Outbound: Sendable>(
|
||||
_ connection: NWConnection,
|
||||
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isOutboundHalfClosureEnabled: Bool = false,
|
||||
inboundType: Inbound.Type = Inbound.self,
|
||||
outboundType: Outbound.Type = Outbound.self
|
||||
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
|
||||
try await self.withExistingNWConnection(
|
||||
connection
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
return try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: backpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
|
||||
inboundType: inboundType,
|
||||
outboundType: outboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Async connect methods with protocol negotation
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSConnectionBootstrap {
|
||||
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to connect to.
|
||||
/// - port: The port to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: The protocol negotiation result.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Handler: NIOProtocolNegotiationHandler>(
|
||||
host: String,
|
||||
port: Int,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> Handler.NegotiationResult {
|
||||
let validPortRange = Int(UInt16.min)...Int(UInt16.max)
|
||||
guard validPortRange.contains(port) else {
|
||||
throw NIOTSErrors.InvalidPort(port: port)
|
||||
}
|
||||
|
||||
guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else {
|
||||
throw NIOTSErrors.InvalidPort(port: port)
|
||||
}
|
||||
return try await self.connect(
|
||||
endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort),
|
||||
channelInitializer: channelInitializer
|
||||
)
|
||||
}
|
||||
|
||||
/// Specify the `address` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The address to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: The protocol negotiation result.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Handler: NIOProtocolNegotiationHandler>(
|
||||
to address: SocketAddress,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> Handler.NegotiationResult {
|
||||
try await self.connect0(
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
connectionChannel.connect(to: address, promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: The protocol negotiation result.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Handler: NIOProtocolNegotiationHandler>(
|
||||
unixDomainSocketPath: String,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> Handler.NegotiationResult {
|
||||
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
|
||||
return try await self.connect(
|
||||
to: address,
|
||||
channelInitializer: channelInitializer
|
||||
)
|
||||
}
|
||||
|
||||
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The endpoint to connect to.
|
||||
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: The protocol negotiation result.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func connect<Handler: NIOProtocolNegotiationHandler>(
|
||||
endpoint: NWEndpoint,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> Handler.NegotiationResult {
|
||||
try await self.connect0(
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
connectionChannel.triggerUserOutboundEvent(
|
||||
NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint),
|
||||
promise: promise
|
||||
)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Use a pre-existing `NWConnection` to connect a `Channel`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - connection: The `NWConnection` to wrap.
|
||||
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: The protocol negotiation result.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withExistingNWConnection<Handler: NIOProtocolNegotiationHandler>(
|
||||
_ connection: NWConnection,
|
||||
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> Handler.NegotiationResult {
|
||||
try await self.connect0(
|
||||
existingNWConnection: connection,
|
||||
channelInitializer: channelInitializer,
|
||||
registration: { connectionChannel, promise in
|
||||
connectionChannel.registerAlreadyConfigured0(promise: promise)
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
}
|
||||
|
||||
@available(*, unavailable)
|
||||
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
|
||||
extension NIOTSConnectionBootstrap: Sendable {}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#if canImport(Network)
|
||||
import NIOCore
|
||||
@_spi(AsyncChannel) import NIOCore
|
||||
import Dispatch
|
||||
import Network
|
||||
|
||||
|
|
@ -313,24 +313,28 @@ public final class NIOTSListenerBootstrap {
|
|||
|
||||
let serverChannel: NIOTSListenerChannel
|
||||
if let newListener = existingNWListener {
|
||||
serverChannel = NIOTSListenerChannel(wrapping: newListener,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
serverChannel = NIOTSListenerChannel(
|
||||
wrapping: newListener,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions
|
||||
)
|
||||
} else {
|
||||
serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
serverChannel = NIOTSListenerChannel(
|
||||
eventLoop: eventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions
|
||||
)
|
||||
}
|
||||
|
||||
return eventLoop.submit {
|
||||
|
|
@ -373,6 +377,533 @@ public final class NIOTSListenerBootstrap {
|
|||
}
|
||||
}
|
||||
|
||||
// MARK: Async bind methods with arbitrary payload
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSListenerBootstrap {
|
||||
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to bind on.
|
||||
/// - port: The port to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `bind`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Output: Sendable>(
|
||||
host: String,
|
||||
port: Int,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> NIOAsyncChannel<Output, Never> {
|
||||
let validPortRange = Int(UInt16.min)...Int(UInt16.max)
|
||||
guard validPortRange.contains(port) else {
|
||||
throw NIOTSErrors.InvalidPort(port: port)
|
||||
}
|
||||
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
do {
|
||||
// NWListener does not actually resolve hostname-based NWEndpoints
|
||||
// for use with requiredLocalEndpoint, so we fall back to
|
||||
// SocketAddress for this.
|
||||
let address = try SocketAddress.makeAddressResolvingHost(host, port: port)
|
||||
serverChannel.bind(to: address, promise: promise)
|
||||
} catch {
|
||||
promise.fail(error)
|
||||
}
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to `address`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The `SocketAddress` to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `bind`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Output: Sendable>(
|
||||
to address: SocketAddress,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> NIOAsyncChannel<Output, Never> {
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
serverChannel.bind(to: address, promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The `NWEndpoint` to bind this channel to.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `bind`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Output: Sendable>(
|
||||
endpoint: NWEndpoint,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> NIOAsyncChannel<Output, Never> {
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
serverChannel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - listener: The NWListener to wrap.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `bind`
|
||||
/// method.
|
||||
/// - Returns: The result of the channel initializer.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withNWListener<Output: Sendable>(
|
||||
_ listener: NWListener,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
|
||||
) async throws -> NIOAsyncChannel<Output, Never> {
|
||||
return try await self.bind0(
|
||||
existingNWListener: listener,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.registerAlreadyConfigured0(promise: promise)
|
||||
},
|
||||
postRegisterTransformation: { $1.makeSucceededFuture($0) }
|
||||
).get()
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
private func bind0<ChannelInitializerResult, PostRegistrationTransformationResult>(
|
||||
existingNWListener: NWListener? = nil,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
|
||||
registration: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void,
|
||||
postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture<PostRegistrationTransformationResult>
|
||||
) -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> {
|
||||
let eventLoop = self.group.next() as! NIOTSEventLoop
|
||||
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
|
||||
let childChannelInit = self.childChannelInit
|
||||
let serverChannelOptions = self.serverChannelOptions
|
||||
let childChannelOptions = self.childChannelOptions
|
||||
|
||||
let serverChannel: NIOTSListenerChannel
|
||||
if let newListener = existingNWListener {
|
||||
serverChannel = NIOTSListenerChannel(wrapping: newListener,
|
||||
on: self.group.next() as! NIOTSEventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
} else {
|
||||
serverChannel = NIOTSListenerChannel(eventLoop: eventLoop,
|
||||
qos: self.serverQoS,
|
||||
tcpOptions: self.tcpOptions,
|
||||
tlsOptions: self.tlsOptions,
|
||||
childLoopGroup: self.childGroup,
|
||||
childChannelQoS: self.childQoS,
|
||||
childTCPOptions: self.tcpOptions,
|
||||
childTLSOptions: self.tlsOptions)
|
||||
}
|
||||
|
||||
return eventLoop.submit {
|
||||
serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
|
||||
serverChannelInit(serverChannel)
|
||||
}.flatMap { (_) -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> in
|
||||
do {
|
||||
try serverChannel.pipeline.syncOperations.addHandler(
|
||||
AcceptHandler(childChannelInitializer: childChannelInit, childChannelOptions: childChannelOptions),
|
||||
name: "AcceptHandler"
|
||||
)
|
||||
let asyncChannel = try NIOAsyncChannel<PostRegistrationTransformationResult, Never>
|
||||
.wrapAsyncChannelWithTransformations(
|
||||
synchronouslyWrapping: serverChannel,
|
||||
backpressureStrategy: serverBackpressureStrategy,
|
||||
channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult, EventLoop)> in
|
||||
// The channelReadTransformation is run on the EL of the server channel
|
||||
// We have to make sure that we execute child channel initializer on the
|
||||
// EL of the child channel.
|
||||
channel.eventLoop.flatSubmit {
|
||||
childChannelInitializer(channel).map { ($0, channel.eventLoop) }
|
||||
}
|
||||
},
|
||||
postFireChannelReadTransformation: { result, eventLoop in
|
||||
eventLoop.flatSubmit {
|
||||
postRegisterTransformation(result, eventLoop)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
let bindPromise = eventLoop.makePromise(of: Void.self)
|
||||
registration(serverChannel, bindPromise)
|
||||
|
||||
if let bindTimeout = self.bindTimeout {
|
||||
let cancelTask = eventLoop.scheduleTask(in: bindTimeout) {
|
||||
bindPromise.fail(NIOTSErrors.BindTimeout(timeout: bindTimeout))
|
||||
serverChannel.close(promise: nil)
|
||||
}
|
||||
|
||||
bindPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
|
||||
cancelTask.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
return bindPromise.futureResult
|
||||
.map { (_) -> NIOAsyncChannel<PostRegistrationTransformationResult, Never> in asyncChannel
|
||||
}
|
||||
} catch {
|
||||
return eventLoop.makeFailedFuture(error)
|
||||
}
|
||||
}.flatMapError { error -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> in
|
||||
serverChannel.close0(error: error, mode: .all, promise: nil)
|
||||
return eventLoop.makeFailedFuture(error)
|
||||
}
|
||||
}.flatMap {
|
||||
$0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: AsyncChannel based bind
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSListenerBootstrap {
|
||||
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to bind on.
|
||||
/// - port: The port to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels.
|
||||
/// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled
|
||||
/// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure.
|
||||
/// - childChannelInboundType: The child channel's inbound type.
|
||||
/// - childChannelOutboundType: The child channel's outbound type.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
|
||||
host: String,
|
||||
port: Int,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isChildChannelOutboundHalfClosureEnabled: Bool = false,
|
||||
childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self,
|
||||
childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self
|
||||
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
|
||||
return try await self.bind(
|
||||
host: host,
|
||||
port: port,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: childChannelBackpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled,
|
||||
inboundType: childChannelInboundType,
|
||||
outboundType: childChannelOutboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to `address`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The `SocketAddress` to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels.
|
||||
/// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled
|
||||
/// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure.
|
||||
/// - childChannelInboundType: The child channel's inbound type.
|
||||
/// - childChannelOutboundType: The child channel's outbound type.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
|
||||
to address: SocketAddress,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isChildChannelOutboundHalfClosureEnabled: Bool = false,
|
||||
childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self,
|
||||
childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self
|
||||
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
|
||||
return try await self.bind(
|
||||
to: address,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: childChannelBackpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled,
|
||||
inboundType: childChannelInboundType,
|
||||
outboundType: childChannelOutboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The `NWEndpoint` to bind this channel to.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels.
|
||||
/// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled
|
||||
/// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure.
|
||||
/// - childChannelInboundType: The child channel's inbound type.
|
||||
/// - childChannelOutboundType: The child channel's outbound type.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
|
||||
endpoint: NWEndpoint,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isChildChannelOutboundHalfClosureEnabled: Bool = false,
|
||||
childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self,
|
||||
childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self
|
||||
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
|
||||
return try await self.bind(
|
||||
endpoint: endpoint,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: childChannelBackpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled,
|
||||
inboundType: childChannelInboundType,
|
||||
outboundType: childChannelOutboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - listener: The NWListener to wrap.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels.
|
||||
/// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled
|
||||
/// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure.
|
||||
/// - childChannelInboundType: The child channel's inbound type.
|
||||
/// - childChannelOutboundType: The child channel's outbound type.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withNWListener<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
|
||||
_ listener: NWListener,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
isChildChannelOutboundHalfClosureEnabled: Bool = false,
|
||||
childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self,
|
||||
childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self
|
||||
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
|
||||
return try await self.withNWListener(
|
||||
listener,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
backpressureStrategy: childChannelBackpressureStrategy,
|
||||
isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled,
|
||||
inboundType: childChannelInboundType,
|
||||
outboundType: childChannelOutboundType
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Protocol negotiation based bind
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOTSListenerBootstrap {
|
||||
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - host: The host to bind on.
|
||||
/// - port: The port to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Handler: NIOProtocolNegotiationHandler>(
|
||||
host: String,
|
||||
port: Int,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
|
||||
let address = try SocketAddress.makeAddressResolvingHost(host, port: port)
|
||||
|
||||
return try await self.bind(
|
||||
to: address,
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer
|
||||
)
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to `address`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - address: The `SocketAddress` to bind on.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Handler: NIOProtocolNegotiationHandler>(
|
||||
to address: SocketAddress,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
serverChannel.bind(to: address, promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - endpoint: The `NWEndpoint` to bind this channel to.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func bind<Handler: NIOProtocolNegotiationHandler>(
|
||||
endpoint: NWEndpoint,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.register().whenComplete { result in
|
||||
switch result {
|
||||
case .success:
|
||||
serverChannel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise)
|
||||
case .failure(let error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
|
||||
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - listener: The NWListener to wrap.
|
||||
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
|
||||
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
|
||||
/// the protocol.
|
||||
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
@_spi(AsyncChannel)
|
||||
public func withNWListener<Handler: NIOProtocolNegotiationHandler>(
|
||||
_ listener: NWListener,
|
||||
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
|
||||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
|
||||
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
|
||||
return try await self.bind0(
|
||||
serverBackpressureStrategy: serverBackpressureStrategy,
|
||||
childChannelInitializer: childChannelInitializer,
|
||||
registration: { (serverChannel, promise) in
|
||||
serverChannel.registerAlreadyConfigured0(promise: promise)
|
||||
},
|
||||
postRegisterTransformation: { handler, eventLoop in
|
||||
eventLoop.assertInEventLoop()
|
||||
return handler.protocolNegotiationResult.flatMap { result in
|
||||
result.resolve(on: eventLoop)
|
||||
}
|
||||
}
|
||||
).get()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
|
||||
private class AcceptHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = NIOTSConnectionChannel
|
||||
|
|
@ -430,4 +961,23 @@ private class AcceptHandler: ChannelInboundHandler {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension NIOProtocolNegotiationResult {
|
||||
func resolve(on eventLoop: EventLoop) -> EventLoopFuture<NegotiationResult> {
|
||||
Self.resolve(on: eventLoop, result: self)
|
||||
}
|
||||
|
||||
static func resolve(on eventLoop: EventLoop, result: Self) -> EventLoopFuture<NegotiationResult> {
|
||||
switch result {
|
||||
case .finished(let negotiationResult):
|
||||
return eventLoop.makeSucceededFuture(negotiationResult)
|
||||
|
||||
case .deferredResult(let future):
|
||||
return future.flatMap { result in
|
||||
return resolve(on: eventLoop, result: result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -0,0 +1,696 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#if canImport(Network)
|
||||
import NIOConcurrencyHelpers
|
||||
@_spi(AsyncChannel) import NIOTransportServices
|
||||
@_spi(AsyncChannel) import NIOCore
|
||||
import XCTest
|
||||
@_spi(AsyncChannel) import NIOTLS
|
||||
|
||||
private final class LineDelimiterCoder: ByteToMessageDecoder, MessageToByteEncoder {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = ByteBuffer
|
||||
|
||||
private let newLine = "\n".utf8.first!
|
||||
|
||||
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
|
||||
let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: self.newLine) }
|
||||
if let readable = readable {
|
||||
context.fireChannelRead(self.wrapInboundOut(buffer.readSlice(length: readable)!))
|
||||
buffer.moveReaderIndex(forwardBy: 1)
|
||||
return .continue
|
||||
}
|
||||
return .needMoreData
|
||||
}
|
||||
|
||||
func encode(data: ByteBuffer, out: inout ByteBuffer) throws {
|
||||
out.writeImmutableBuffer(data)
|
||||
out.writeString("\n")
|
||||
}
|
||||
}
|
||||
|
||||
private final class TLSUserEventHandler: ChannelInboundHandler, RemovableChannelHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = ByteBuffer
|
||||
enum ALPN: String {
|
||||
case string
|
||||
case byte
|
||||
case unknown
|
||||
}
|
||||
|
||||
private var proposedALPN: ALPN?
|
||||
|
||||
init(
|
||||
proposedALPN: ALPN? = nil
|
||||
) {
|
||||
self.proposedALPN = proposedALPN
|
||||
}
|
||||
|
||||
func handlerAdded(context: ChannelHandlerContext) {
|
||||
guard context.channel.isActive else {
|
||||
return
|
||||
}
|
||||
|
||||
if let proposedALPN = self.proposedALPN {
|
||||
self.proposedALPN = nil
|
||||
context.writeAndFlush(.init(ByteBuffer(string: "negotiate-alpn:\(proposedALPN.rawValue)")), promise: nil)
|
||||
}
|
||||
context.fireChannelActive()
|
||||
}
|
||||
|
||||
func channelActive(context: ChannelHandlerContext) {
|
||||
if let proposedALPN = self.proposedALPN {
|
||||
context.writeAndFlush(.init(ByteBuffer(string: "negotiate-alpn:\(proposedALPN.rawValue)")), promise: nil)
|
||||
}
|
||||
context.fireChannelActive()
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let buffer = self.unwrapInboundIn(data)
|
||||
let string = String(buffer: buffer)
|
||||
|
||||
if string.hasPrefix("negotiate-alpn:") {
|
||||
let alpn = String(string.dropFirst(15))
|
||||
context.writeAndFlush(.init(ByteBuffer(string: "alpn:\(alpn)")), promise: nil)
|
||||
context.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: alpn))
|
||||
context.pipeline.removeHandler(self, promise: nil)
|
||||
} else if string.hasPrefix("alpn:") {
|
||||
context.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: String(string.dropFirst(5))))
|
||||
context.pipeline.removeHandler(self, promise: nil)
|
||||
} else {
|
||||
context.fireChannelRead(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class ByteBufferToStringHandler: ChannelDuplexHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = String
|
||||
typealias OutboundIn = String
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let buffer = self.unwrapInboundIn(data)
|
||||
context.fireChannelRead(self.wrapInboundOut(String(buffer: buffer)))
|
||||
}
|
||||
|
||||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||
let buffer = ByteBuffer(string: self.unwrapOutboundIn(data))
|
||||
context.write(.init(buffer), promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private final class ByteBufferToByteHandler: ChannelDuplexHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = UInt8
|
||||
typealias OutboundIn = UInt8
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
var buffer = self.unwrapInboundIn(data)
|
||||
let byte = buffer.readInteger(as: UInt8.self)!
|
||||
context.fireChannelRead(self.wrapInboundOut(byte))
|
||||
}
|
||||
|
||||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||
let buffer = ByteBuffer(integer: self.unwrapOutboundIn(data))
|
||||
context.write(.init(buffer), promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
private final class AddressedEnvelopingHandler: ChannelDuplexHandler {
|
||||
typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||
typealias InboundOut = ByteBuffer
|
||||
typealias OutboundIn = ByteBuffer
|
||||
typealias OutboundOut = Any
|
||||
|
||||
var remoteAddress: SocketAddress?
|
||||
|
||||
init(remoteAddress: SocketAddress? = nil) {
|
||||
self.remoteAddress = remoteAddress
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let envelope = self.unwrapInboundIn(data)
|
||||
self.remoteAddress = envelope.remoteAddress
|
||||
|
||||
context.fireChannelRead(self.wrapInboundOut(envelope.data))
|
||||
}
|
||||
|
||||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||
let buffer = self.unwrapOutboundIn(data)
|
||||
if let remoteAddress = self.remoteAddress {
|
||||
context.write(self.wrapOutboundOut(AddressedEnvelope(remoteAddress: remoteAddress, data: buffer)), promise: promise)
|
||||
return
|
||||
}
|
||||
|
||||
context.write(self.wrapOutboundOut(buffer), promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
final class AsyncChannelBootstrapTests: XCTestCase {
|
||||
enum NegotiationResult {
|
||||
case string(NIOAsyncChannel<String, String>)
|
||||
case byte(NIOAsyncChannel<UInt8, UInt8>)
|
||||
}
|
||||
|
||||
struct ProtocolNegotiationError: Error {}
|
||||
|
||||
enum StringOrByte: Hashable {
|
||||
case string(String)
|
||||
case byte(UInt8)
|
||||
}
|
||||
|
||||
func testServerClientBootstrap_withAsyncChannel_andHostPort() async throws {
|
||||
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
|
||||
defer {
|
||||
try! eventLoopGroup.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
.childChannelInitializer { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler())
|
||||
}
|
||||
}
|
||||
.bind(
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
childChannelInboundType: String.self,
|
||||
childChannelOutboundType: String.self
|
||||
)
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var iterator = stream.makeAsyncIterator()
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { _ in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
for try await value in childChannel.inboundStream {
|
||||
continuation.yield(.string(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
try await stringChannel.outboundWriter.write("hello")
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
||||
func testAsyncChannelProtocolNegotiation() async throws {
|
||||
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
|
||||
defer {
|
||||
try! eventLoopGroup.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
.bind(
|
||||
host: "127.0.0.1",
|
||||
port: 0
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try self.configureProtocolNegotiationHandlers(channel: channel)
|
||||
}
|
||||
}
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var serverIterator = stream.makeAsyncIterator()
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
group.addTask {
|
||||
switch childChannel {
|
||||
case .string(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.string(value))
|
||||
}
|
||||
case .byte(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.byte(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let stringNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedALPN: .string
|
||||
)
|
||||
switch stringNegotiationResult {
|
||||
case .string(let stringChannel):
|
||||
// This is the actual content
|
||||
try await stringChannel.outboundWriter.write("hello")
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .string("hello"))
|
||||
case .byte:
|
||||
preconditionFailure()
|
||||
}
|
||||
|
||||
let byteNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedALPN: .byte
|
||||
)
|
||||
switch byteNegotiationResult {
|
||||
case .string:
|
||||
preconditionFailure()
|
||||
case .byte(let byteChannel):
|
||||
// This is the actual content
|
||||
try await byteChannel.outboundWriter.write(UInt8(8))
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .byte(8))
|
||||
}
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
||||
func testAsyncChannelNestedProtocolNegotiation() async throws {
|
||||
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
|
||||
defer {
|
||||
try! eventLoopGroup.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
.bind(
|
||||
host: "127.0.0.1",
|
||||
port: 0
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try self.configureNestedProtocolNegotiationHandlers(channel: channel)
|
||||
}
|
||||
}
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var serverIterator = stream.makeAsyncIterator()
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
group.addTask {
|
||||
switch childChannel {
|
||||
case .string(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.string(value))
|
||||
}
|
||||
case .byte(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.byte(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let stringStringNegotiationResult = try await self.makeClientChannelWithNestedProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedOuterALPN: .string,
|
||||
proposedInnerALPN: .string
|
||||
)
|
||||
switch stringStringNegotiationResult {
|
||||
case .string(let stringChannel):
|
||||
// This is the actual content
|
||||
try await stringChannel.outboundWriter.write("hello")
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .string("hello"))
|
||||
case .byte:
|
||||
preconditionFailure()
|
||||
}
|
||||
|
||||
let byteStringNegotiationResult = try await self.makeClientChannelWithNestedProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedOuterALPN: .byte,
|
||||
proposedInnerALPN: .string
|
||||
)
|
||||
switch byteStringNegotiationResult {
|
||||
case .string(let stringChannel):
|
||||
// This is the actual content
|
||||
try await stringChannel.outboundWriter.write("hello")
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .string("hello"))
|
||||
case .byte:
|
||||
preconditionFailure()
|
||||
}
|
||||
|
||||
let byteByteNegotiationResult = try await self.makeClientChannelWithNestedProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedOuterALPN: .byte,
|
||||
proposedInnerALPN: .byte
|
||||
)
|
||||
switch byteByteNegotiationResult {
|
||||
case .string:
|
||||
preconditionFailure()
|
||||
case .byte(let byteChannel):
|
||||
// This is the actual content
|
||||
try await byteChannel.outboundWriter.write(UInt8(8))
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .byte(8))
|
||||
}
|
||||
|
||||
let stringByteNegotiationResult = try await self.makeClientChannelWithNestedProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedOuterALPN: .string,
|
||||
proposedInnerALPN: .byte
|
||||
)
|
||||
switch stringByteNegotiationResult {
|
||||
case .string:
|
||||
preconditionFailure()
|
||||
case .byte(let byteChannel):
|
||||
// This is the actual content
|
||||
try await byteChannel.outboundWriter.write(UInt8(8))
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .byte(8))
|
||||
}
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
||||
func testAsyncChannelProtocolNegotiation_whenFails() async throws {
|
||||
final class CollectingHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = Channel
|
||||
|
||||
private let channels: NIOLockedValueBox<[Channel]>
|
||||
|
||||
init(channels: NIOLockedValueBox<[Channel]>) {
|
||||
self.channels = channels
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let channel = self.unwrapInboundIn(data)
|
||||
|
||||
self.channels.withLockedValue { $0.append(channel) }
|
||||
|
||||
context.fireChannelRead(data)
|
||||
}
|
||||
}
|
||||
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
|
||||
defer {
|
||||
try! eventLoopGroup.syncShutdownGracefully()
|
||||
}
|
||||
let channels = NIOLockedValueBox<[Channel]>([Channel]())
|
||||
|
||||
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.serverChannelInitializer { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(CollectingHandler(channels: channels))
|
||||
}
|
||||
}
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
.bind(
|
||||
host: "127.0.0.1",
|
||||
port: 0
|
||||
) { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try self.configureProtocolNegotiationHandlers(channel: channel)
|
||||
}
|
||||
}
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var serverIterator = stream.makeAsyncIterator()
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
group.addTask {
|
||||
switch childChannel {
|
||||
case .string(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.string(value))
|
||||
}
|
||||
case .byte(let channel):
|
||||
for try await value in channel.inboundStream {
|
||||
continuation.yield(.byte(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await XCTAsyncAssertThrowsError(
|
||||
try await self.makeClientChannelWithProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedALPN: .unknown
|
||||
)
|
||||
) { error in
|
||||
XCTAssertTrue(error is ProtocolNegotiationError)
|
||||
}
|
||||
|
||||
// Let's check that we can still open a new connection
|
||||
let stringNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation(
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
port: channel.channel.localAddress!.port!,
|
||||
proposedALPN: .string
|
||||
)
|
||||
switch stringNegotiationResult {
|
||||
case .string(let stringChannel):
|
||||
// This is the actual content
|
||||
try await stringChannel.outboundWriter.write("hello")
|
||||
await XCTAsyncAssertEqual(await serverIterator.next(), .string("hello"))
|
||||
case .byte:
|
||||
preconditionFailure()
|
||||
}
|
||||
|
||||
let failedInboundChannel = channels.withLockedValue { channels -> Channel in
|
||||
XCTAssertEqual(channels.count, 2)
|
||||
return channels[0]
|
||||
}
|
||||
|
||||
// We are waiting here to make sure the channel got closed
|
||||
try await failedInboundChannel.closeFuture.get()
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Test Helpers
|
||||
|
||||
private func makeClientChannel(eventLoopGroup: EventLoopGroup, port: Int) async throws -> NIOAsyncChannel<String, String> {
|
||||
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
|
||||
.channelInitializer { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler())
|
||||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler())
|
||||
}
|
||||
}
|
||||
.connect(
|
||||
to: .init(ipAddress: "127.0.0.1", port: port),
|
||||
inboundType: String.self,
|
||||
outboundType: String.self
|
||||
)
|
||||
}
|
||||
|
||||
private func makeClientChannelWithProtocolNegotiation(
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
port: Int,
|
||||
proposedALPN: TLSUserEventHandler.ALPN
|
||||
) async throws -> NegotiationResult {
|
||||
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
|
||||
.connect(
|
||||
to: .init(ipAddress: "127.0.0.1", port: port)
|
||||
) { channel in
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func makeClientChannelWithNestedProtocolNegotiation(
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
port: Int,
|
||||
proposedOuterALPN: TLSUserEventHandler.ALPN,
|
||||
proposedInnerALPN: TLSUserEventHandler.ALPN
|
||||
) async throws -> NegotiationResult {
|
||||
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
|
||||
.connect(
|
||||
to: .init(ipAddress: "127.0.0.1", port: port)
|
||||
) { channel in
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
try self.configureNestedProtocolNegotiationHandlers(
|
||||
channel: channel,
|
||||
proposedOuterALPN: proposedOuterALPN,
|
||||
proposedInnerALPN: proposedInnerALPN
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
private func configureProtocolNegotiationHandlers(
|
||||
channel: Channel,
|
||||
proposedALPN: TLSUserEventHandler.ALPN? = nil
|
||||
) throws -> NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedALPN))
|
||||
return try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
private func configureNestedProtocolNegotiationHandlers(
|
||||
channel: Channel,
|
||||
proposedOuterALPN: TLSUserEventHandler.ALPN? = nil,
|
||||
proposedInnerALPN: TLSUserEventHandler.ALPN? = nil
|
||||
) throws -> NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
|
||||
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedOuterALPN))
|
||||
let negotiationHandler = NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult>(eventLoop: channel.eventLoop) { alpnResult, channel in
|
||||
switch alpnResult {
|
||||
case .negotiated(let alpn):
|
||||
switch alpn {
|
||||
case "string":
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN))
|
||||
let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
|
||||
|
||||
return NIOProtocolNegotiationResult.deferredResult(negotiationFuture.protocolNegotiationResult)
|
||||
}
|
||||
case "byte":
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN))
|
||||
let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
|
||||
|
||||
return NIOProtocolNegotiationResult.deferredResult(negotiationHandler.protocolNegotiationResult)
|
||||
}
|
||||
default:
|
||||
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
|
||||
}
|
||||
case .fallback:
|
||||
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
|
||||
}
|
||||
}
|
||||
try channel.pipeline.syncOperations.addHandler(negotiationHandler)
|
||||
return negotiationHandler
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
private func addTypedApplicationProtocolNegotiationHandler(to channel: Channel) throws -> NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {
|
||||
let negotiationHandler = NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult>(eventLoop: channel.eventLoop) { alpnResult, channel in
|
||||
switch alpnResult {
|
||||
case .negotiated(let alpn):
|
||||
switch alpn {
|
||||
case "string":
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler())
|
||||
let asyncChannel = try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
isOutboundHalfClosureEnabled: true,
|
||||
inboundType: String.self,
|
||||
outboundType: String.self
|
||||
)
|
||||
|
||||
return NIOProtocolNegotiationResult.finished(NegotiationResult.string(asyncChannel))
|
||||
}
|
||||
case "byte":
|
||||
return channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteBufferToByteHandler())
|
||||
|
||||
let asyncChannel = try NIOAsyncChannel(
|
||||
synchronouslyWrapping: channel,
|
||||
isOutboundHalfClosureEnabled: true,
|
||||
inboundType: UInt8.self,
|
||||
outboundType: UInt8.self
|
||||
)
|
||||
|
||||
return NIOProtocolNegotiationResult.finished(NegotiationResult.byte(asyncChannel))
|
||||
}
|
||||
default:
|
||||
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
|
||||
}
|
||||
case .fallback:
|
||||
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
|
||||
}
|
||||
}
|
||||
|
||||
try channel.pipeline.syncOperations.addHandler(negotiationHandler)
|
||||
return negotiationHandler
|
||||
}
|
||||
}
|
||||
|
||||
extension AsyncStream {
|
||||
fileprivate static func makeStream(
|
||||
of elementType: Element.Type = Element.self,
|
||||
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
|
||||
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
|
||||
var continuation: AsyncStream<Element>.Continuation!
|
||||
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
|
||||
return (stream: stream, continuation: continuation!)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
||||
private func XCTAsyncAssertEqual<Element: Equatable>(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #filePath, line: UInt = #line) async rethrows {
|
||||
let lhsResult = try await lhs()
|
||||
let rhsResult = try await rhs()
|
||||
XCTAssertEqual(lhsResult, rhsResult, file: file, line: line)
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
||||
private func XCTAsyncAssertThrowsError<T>(
|
||||
_ expression: @autoclosure () async throws -> T,
|
||||
_ message: @autoclosure () -> String = "",
|
||||
file: StaticString = #filePath,
|
||||
line: UInt = #line,
|
||||
_ errorHandler: (_ error: Error) -> Void = { _ in }
|
||||
) async {
|
||||
do {
|
||||
_ = try await expression()
|
||||
XCTFail(message(), file: file, line: line)
|
||||
} catch {
|
||||
errorHandler(error)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
||||
internal func XCTAsyncAssertThrowsError<T>(
|
||||
_ expression: @autoclosure () async throws -> T,
|
||||
file: StaticString = #filePath,
|
||||
line: UInt = #line,
|
||||
verify: (Error) -> Void = { _ in }
|
||||
) async {
|
||||
do {
|
||||
_ = try await expression()
|
||||
XCTFail("Expression did not throw error", file: file, line: line)
|
||||
} catch {
|
||||
verify(error)
|
||||
}
|
||||
}
|
||||
#endif
|
||||
Loading…
Reference in New Issue