Adopt latest SPI(AsyncChannel) changes (#181)

# Motivation
We introduced some breaking SPI(AsyncChannel) changes in NIO that we have to adopt here.

# Modification
This PR adopts the latest `NIOProtocolNegotiationResult` APIs. Additionally, it also drops all bind/connect methods on the bootstraps that are specific to protocol negotiation or `NIOAsyncChannel`.

# Result
Green CI on `main` and alignment between `NIOPosix` and `NIOTS.
This commit is contained in:
Franz Busch 2023-07-26 15:48:49 +01:00 committed by GitHub
parent 01fc0ae7e5
commit fbc49d892b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 645 deletions

View File

@ -21,7 +21,7 @@ let package = Package(
.library(name: "NIOTransportServices", targets: ["NIOTransportServices"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.57.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
],

View File

@ -328,8 +328,7 @@ extension NIOTSConnectionBootstrap {
promise.fail(error)
}
}
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@ -380,8 +379,7 @@ extension NIOTSConnectionBootstrap {
promise.fail(error)
}
}
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@ -403,18 +401,16 @@ extension NIOTSConnectionBootstrap {
channelInitializer: channelInitializer,
registration: { connectionChannel, promise in
connectionChannel.registerAlreadyConfigured0(promise: promise)
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private func connect0<ChannelInitializerResult, PostRegistrationTransformationResult>(
private func connect0<ChannelInitializerResult>(
existingNWConnection: NWConnection? = nil,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
registration: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void,
postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture<PostRegistrationTransformationResult>
) -> EventLoopFuture<PostRegistrationTransformationResult> {
registration: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<ChannelInitializerResult> {
let connectionChannel: NIOTSConnectionChannel
if let newConnection = existingNWConnection {
connectionChannel = NIOTSConnectionChannel(
@ -452,8 +448,6 @@ extension NIOTSConnectionBootstrap {
cancelTask.cancel()
}
return connectPromise.futureResult.map { result }
}.flatMap { (result: ChannelInitializerResult) -> EventLoopFuture<PostRegistrationTransformationResult> in
postRegisterTransformation(result, connectionChannel.eventLoop)
}.flatMapErrorThrowing {
connectionChannel.close(promise: nil)
throw $0
@ -462,287 +456,6 @@ extension NIOTSConnectionBootstrap {
}
}
// MARK: Async connect methods with AsyncChannel
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOTSConnectionBootstrap {
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - host: The host to connect to.
/// - port: The port to connect to.
/// - channelConfiguration: The channel's async channel configuration.
/// - Returns: A `NIOAsyncChannel` for the established connection.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Inbound: Sendable, Outbound: Sendable>(
host: String,
port: Int,
channelConfiguration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
try await self.connect(
host: host,
port: port
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: channelConfiguration
)
}
}
}
/// Specify the `address` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - address: The address to connect to.
/// - channelConfiguration: The channel's async channel configuration.
/// - Returns: A `NIOAsyncChannel` for the established connection.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Inbound: Sendable, Outbound: Sendable>(
to address: SocketAddress,
channelConfiguration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
try await self.connect(
to: address
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: channelConfiguration
)
}
}
}
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
///
/// - Parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
/// - channelConfiguration: The channel's async channel configuration.
/// - Returns: A `NIOAsyncChannel` for the established connection.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Inbound: Sendable, Outbound: Sendable>(
unixDomainSocketPath: String,
channelConfiguration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
try await self.connect(
unixDomainSocketPath: unixDomainSocketPath
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: channelConfiguration
)
}
}
}
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - endpoint: The endpoint to connect to.
/// - channelConfiguration: The channel's async channel configuration.
/// - Returns: A `NIOAsyncChannel` for the established connection.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Inbound: Sendable, Outbound: Sendable>(
endpoint: NWEndpoint,
channelConfiguration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
try await self.connect(
endpoint: endpoint
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: channelConfiguration
)
}
}
}
/// Use a pre-existing `NWConnection` to connect a `Channel`.
///
/// - Parameters:
/// - connection: The `NWConnection` to wrap.
/// - channelConfiguration: The channel's async channel configuration.
/// - Returns: A `NIOAsyncChannel` for the established connection.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func withExistingNWConnection<Inbound: Sendable, Outbound: Sendable>(
_ connection: NWConnection,
channelConfiguration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
try await self.withExistingNWConnection(
connection
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: channelConfiguration
)
}
}
}
}
// MARK: Async connect methods with protocol negotation
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOTSConnectionBootstrap {
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - host: The host to connect to.
/// - port: The port to connect to.
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: The protocol negotiation result.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Handler: NIOProtocolNegotiationHandler>(
host: String,
port: Int,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> Handler.NegotiationResult {
let validPortRange = Int(UInt16.min)...Int(UInt16.max)
guard validPortRange.contains(port) else {
throw NIOTSErrors.InvalidPort(port: port)
}
guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else {
throw NIOTSErrors.InvalidPort(port: port)
}
return try await self.connect(
endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort),
channelInitializer: channelInitializer
)
}
/// Specify the `address` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - address: The address to connect to.
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: The protocol negotiation result.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Handler: NIOProtocolNegotiationHandler>(
to address: SocketAddress,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> Handler.NegotiationResult {
try await self.connect0(
channelInitializer: channelInitializer,
registration: { connectionChannel, promise in
connectionChannel.register().whenComplete { result in
switch result {
case .success:
connectionChannel.connect(to: address, promise: promise)
case .failure(let error):
promise.fail(error)
}
}
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
///
/// - Parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: The protocol negotiation result.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Handler: NIOProtocolNegotiationHandler>(
unixDomainSocketPath: String,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> Handler.NegotiationResult {
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
return try await self.connect(
to: address,
channelInitializer: channelInitializer
)
}
/// Specify the `endpoint` to connect to for the TCP `Channel` that will be established.
///
/// - Parameters:
/// - endpoint: The endpoint to connect to.
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: The protocol negotiation result.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func connect<Handler: NIOProtocolNegotiationHandler>(
endpoint: NWEndpoint,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> Handler.NegotiationResult {
try await self.connect0(
channelInitializer: channelInitializer,
registration: { connectionChannel, promise in
connectionChannel.register().whenComplete { result in
switch result {
case .success:
connectionChannel.triggerUserOutboundEvent(
NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint),
promise: promise
)
case .failure(let error):
promise.fail(error)
}
}
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
/// Use a pre-existing `NWConnection` to connect a `Channel`.
///
/// - Parameters:
/// - connection: The `NWConnection` to wrap.
/// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: The protocol negotiation result.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func withExistingNWConnection<Handler: NIOProtocolNegotiationHandler>(
_ connection: NWConnection,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> Handler.NegotiationResult {
try await self.connect0(
existingNWConnection: connection,
channelInitializer: channelInitializer,
registration: { connectionChannel, promise in
connectionChannel.registerAlreadyConfigured0(promise: promise)
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
}
@available(*, unavailable)
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSConnectionBootstrap: Sendable {}

View File

@ -423,8 +423,7 @@ extension NIOTSListenerBootstrap {
promise.fail(error)
}
}
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@ -455,8 +454,7 @@ extension NIOTSListenerBootstrap {
promise.fail(error)
}
}
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@ -487,8 +485,7 @@ extension NIOTSListenerBootstrap {
promise.fail(error)
}
}
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@ -513,19 +510,17 @@ extension NIOTSListenerBootstrap {
childChannelInitializer: childChannelInitializer,
registration: { (serverChannel, promise) in
serverChannel.registerAlreadyConfigured0(promise: promise)
},
postRegisterTransformation: { $1.makeSucceededFuture($0) }
}
).get()
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private func bind0<ChannelInitializerResult, PostRegistrationTransformationResult>(
private func bind0<ChannelInitializerResult>(
existingNWListener: NWListener? = nil,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
registration: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void,
postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture<PostRegistrationTransformationResult>
) -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> {
registration: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<NIOAsyncChannel<ChannelInitializerResult, Never>> {
let eventLoop = self.group.next() as! NIOTSEventLoop
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
let childChannelInit = self.childChannelInit
@ -557,27 +552,22 @@ extension NIOTSListenerBootstrap {
return eventLoop.submit {
serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
serverChannelInit(serverChannel)
}.flatMap { (_) -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> in
}.flatMap { (_) -> EventLoopFuture<NIOAsyncChannel<ChannelInitializerResult, Never>> in
do {
try serverChannel.pipeline.syncOperations.addHandler(
AcceptHandler<NIOTSConnectionChannel>(childChannelInitializer: childChannelInit, childChannelOptions: childChannelOptions),
name: "AcceptHandler"
)
let asyncChannel = try NIOAsyncChannel<PostRegistrationTransformationResult, Never>
let asyncChannel = try NIOAsyncChannel<ChannelInitializerResult, Never>
.wrapAsyncChannelWithTransformations(
synchronouslyWrapping: serverChannel,
backpressureStrategy: serverBackpressureStrategy,
channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult, EventLoop)> in
channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult)> in
// The channelReadTransformation is run on the EL of the server channel
// We have to make sure that we execute child channel initializer on the
// EL of the child channel.
channel.eventLoop.flatSubmit {
childChannelInitializer(channel).map { ($0, channel.eventLoop) }
}
},
postFireChannelReadTransformation: { result, eventLoop in
eventLoop.flatSubmit {
postRegisterTransformation(result, eventLoop)
childChannelInitializer(channel)
}
}
)
@ -597,12 +587,12 @@ extension NIOTSListenerBootstrap {
}
return bindPromise.futureResult
.map { (_) -> NIOAsyncChannel<PostRegistrationTransformationResult, Never> in asyncChannel
.map { (_) -> NIOAsyncChannel<ChannelInitializerResult, Never> in asyncChannel
}
} catch {
return eventLoop.makeFailedFuture(error)
}
}.flatMapError { error -> EventLoopFuture<NIOAsyncChannel<PostRegistrationTransformationResult, Never>> in
}.flatMapError { error -> EventLoopFuture<NIOAsyncChannel<ChannelInitializerResult, Never>> in
serverChannel.close0(error: error, mode: .all, promise: nil)
return eventLoop.makeFailedFuture(error)
}
@ -612,274 +602,4 @@ extension NIOTSListenerBootstrap {
}
}
// MARK: AsyncChannel based bind
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOTSListenerBootstrap {
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
///
/// - Parameters:
/// - host: The host to bind on.
/// - port: The port to bind on.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelConfiguration: The child channel's async channel configuration.
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
host: String,
port: Int,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelConfiguration: NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
return try await self.bind(
host: host,
port: port,
serverBackpressureStrategy: serverBackpressureStrategy
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: childChannelConfiguration
)
}
}
}
/// Bind the `NIOTSListenerChannel` to `address`.
///
/// - Parameters:
/// - address: The `SocketAddress` to bind on.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelConfiguration: The child channel's async channel configuration.
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
to address: SocketAddress,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelConfiguration: NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
return try await self.bind(
to: address,
serverBackpressureStrategy: serverBackpressureStrategy
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: childChannelConfiguration
)
}
}
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
///
/// - Parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelConfiguration: The child channel's async channel configuration.
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
endpoint: NWEndpoint,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelConfiguration: NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
return try await self.bind(
endpoint: endpoint,
serverBackpressureStrategy: serverBackpressureStrategy
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: childChannelConfiguration
)
}
}
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
///
/// - Parameters:
/// - listener: The NWListener to wrap.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelConfiguration: The child channel's async channel configuration.
/// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func withNWListener<ChildChannelInbound: Sendable, ChildChannelOutbound: Sendable>(
_ listener: NWListener,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelConfiguration: NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>.Configuration = .init()
) async throws -> NIOAsyncChannel<NIOAsyncChannel<ChildChannelInbound, ChildChannelOutbound>, Never> {
return try await self.withNWListener(
listener,
serverBackpressureStrategy: serverBackpressureStrategy
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: childChannelConfiguration
)
}
}
}
}
// MARK: Protocol negotiation based bind
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOTSListenerBootstrap {
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
///
/// - Parameters:
/// - host: The host to bind on.
/// - port: The port to bind on.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<Handler: NIOProtocolNegotiationHandler>(
host: String,
port: Int,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
let address = try SocketAddress.makeAddressResolvingHost(host, port: port)
return try await self.bind(
to: address,
serverBackpressureStrategy: serverBackpressureStrategy,
childChannelInitializer: childChannelInitializer
)
}
/// Bind the `NIOTSListenerChannel` to `address`.
///
/// - Parameters:
/// - address: The `SocketAddress` to bind on.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<Handler: NIOProtocolNegotiationHandler>(
to address: SocketAddress,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
return try await self.bind0(
serverBackpressureStrategy: serverBackpressureStrategy,
childChannelInitializer: childChannelInitializer,
registration: { (serverChannel, promise) in
serverChannel.register().whenComplete { result in
switch result {
case .success:
serverChannel.bind(to: address, promise: promise)
case .failure(let error):
promise.fail(error)
}
}
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
///
/// - Parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func bind<Handler: NIOProtocolNegotiationHandler>(
endpoint: NWEndpoint,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
return try await self.bind0(
serverBackpressureStrategy: serverBackpressureStrategy,
childChannelInitializer: childChannelInitializer,
registration: { (serverChannel, promise) in
serverChannel.register().whenComplete { result in
switch result {
case .success:
serverChannel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise)
case .failure(let error):
promise.fail(error)
}
}
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
///
/// - Parameters:
/// - listener: The NWListener to wrap.
/// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel.
/// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating
/// the protocol.
/// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public func withNWListener<Handler: NIOProtocolNegotiationHandler>(
_ listener: NWListener,
serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Handler>
) async throws -> NIOAsyncChannel<Handler.NegotiationResult, Never> {
return try await self.bind0(
serverBackpressureStrategy: serverBackpressureStrategy,
childChannelInitializer: childChannelInitializer,
registration: { (serverChannel, promise) in
serverChannel.registerAlreadyConfigured0(promise: promise)
},
postRegisterTransformation: { handler, eventLoop in
eventLoop.assertInEventLoop()
return handler.protocolNegotiationResult.flatMap { result in
result.resolve(on: eventLoop)
}
}
).get()
}
}
extension NIOProtocolNegotiationResult {
func resolve(on eventLoop: EventLoop) -> EventLoopFuture<NegotiationResult> {
Self.resolve(on: eventLoop, result: self)
}
static func resolve(on eventLoop: EventLoop, result: Self) -> EventLoopFuture<NegotiationResult> {
switch result {
case .finished(let negotiationResult):
return eventLoop.makeSucceededFuture(negotiationResult)
case .deferredResult(let future):
return future.flatMap { result in
return resolve(on: eventLoop, result: result)
}
}
}
}
#endif

View File

@ -182,21 +182,23 @@ final class AsyncChannelBootstrapTests: XCTestCase {
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelOption(ChannelOptions.autoRead, value: true)
.childChannelInitializer { channel in
.bind(
host: "127.0.0.1",
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler())
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: .init(
inboundType: String.self,
outboundType: String.self
)
)
}
}
.bind(
host: "127.0.0.1",
port: 0,
childChannelConfiguration: .init(
inboundType: String.self,
outboundType: String.self
)
)
try await withThrowingTaskGroup(of: Void.self) { group in
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
@ -227,7 +229,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
try! eventLoopGroup.syncShutdownGracefully()
}
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelOption(ChannelOptions.autoRead, value: true)
.bind(
@ -235,7 +237,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureProtocolNegotiationHandlers(channel: channel)
try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -245,9 +247,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
group.addTask {
try await withThrowingTaskGroup(of: Void.self) { group in
for try await childChannel in channel.inboundStream {
for try await negotiationResult in channel.inboundStream {
group.addTask {
switch childChannel {
switch try await negotiationResult.get().waitForFinalResult() {
case .string(let channel):
for try await value in channel.inboundStream {
continuation.yield(.string(value))
@ -267,7 +269,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: channel.channel.localAddress!.port!,
proposedALPN: .string
)
switch stringNegotiationResult {
switch try await stringNegotiationResult.get().waitForFinalResult() {
case .string(let stringChannel):
// This is the actual content
try await stringChannel.outboundWriter.write("hello")
@ -281,7 +283,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: channel.channel.localAddress!.port!,
proposedALPN: .byte
)
switch byteNegotiationResult {
switch try await byteNegotiationResult.get().waitForFinalResult() {
case .string:
preconditionFailure()
case .byte(let byteChannel):
@ -300,7 +302,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
try! eventLoopGroup.syncShutdownGracefully()
}
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelOption(ChannelOptions.autoRead, value: true)
.bind(
@ -308,7 +310,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureNestedProtocolNegotiationHandlers(channel: channel)
try self.configureNestedProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -318,9 +320,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
group.addTask {
try await withThrowingTaskGroup(of: Void.self) { group in
for try await childChannel in channel.inboundStream {
for try await negotiationResult in channel.inboundStream {
group.addTask {
switch childChannel {
switch try await negotiationResult.get().waitForFinalResult() {
case .string(let channel):
for try await value in channel.inboundStream {
continuation.yield(.string(value))
@ -341,7 +343,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
proposedOuterALPN: .string,
proposedInnerALPN: .string
)
switch stringStringNegotiationResult {
switch try await stringStringNegotiationResult.get().waitForFinalResult() {
case .string(let stringChannel):
// This is the actual content
try await stringChannel.outboundWriter.write("hello")
@ -356,7 +358,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
proposedOuterALPN: .byte,
proposedInnerALPN: .string
)
switch byteStringNegotiationResult {
switch try await byteStringNegotiationResult.get().waitForFinalResult() {
case .string(let stringChannel):
// This is the actual content
try await stringChannel.outboundWriter.write("hello")
@ -371,7 +373,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
proposedOuterALPN: .byte,
proposedInnerALPN: .byte
)
switch byteByteNegotiationResult {
switch try await byteByteNegotiationResult.get().waitForFinalResult() {
case .string:
preconditionFailure()
case .byte(let byteChannel):
@ -386,7 +388,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
proposedOuterALPN: .string,
proposedInnerALPN: .byte
)
switch stringByteNegotiationResult {
switch try await stringByteNegotiationResult.get().waitForFinalResult() {
case .string:
preconditionFailure()
case .byte(let byteChannel):
@ -423,7 +425,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
let channels = NIOLockedValueBox<[Channel]>([Channel]())
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
let channel: NIOAsyncChannel<EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>>, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
@ -436,7 +438,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureProtocolNegotiationHandlers(channel: channel)
try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -446,9 +448,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
group.addTask {
try await withThrowingTaskGroup(of: Void.self) { group in
for try await childChannel in channel.inboundStream {
for try await negotiationResult in channel.inboundStream {
group.addTask {
switch childChannel {
switch try await negotiationResult.get().waitForFinalResult() {
case .string(let channel):
for try await value in channel.inboundStream {
continuation.yield(.string(value))
@ -463,15 +465,14 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
}
await XCTAsyncAssertThrowsError(
try await self.makeClientChannelWithProtocolNegotiation(
eventLoopGroup: eventLoopGroup,
port: channel.channel.localAddress!.port!,
proposedALPN: .unknown
)
) { error in
XCTAssertTrue(error is ProtocolNegotiationError)
}
let failedProtocolNegotiation = try await self.makeClientChannelWithProtocolNegotiation(
eventLoopGroup: eventLoopGroup,
port: channel.channel.localAddress!.port!,
proposedALPN: .unknown
)
await XCTAssertThrowsError(
try await failedProtocolNegotiation.get().waitForFinalResult()
)
// Let's check that we can still open a new connection
let stringNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation(
@ -479,7 +480,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: channel.channel.localAddress!.port!,
proposedALPN: .string
)
switch stringNegotiationResult {
switch try await stringNegotiationResult.get().waitForFinalResult() {
case .string(let stringChannel):
// This is the actual content
try await stringChannel.outboundWriter.write("hello")
@ -504,34 +505,36 @@ final class AsyncChannelBootstrapTests: XCTestCase {
private func makeClientChannel(eventLoopGroup: EventLoopGroup, port: Int) async throws -> NIOAsyncChannel<String, String> {
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
.connect(
to: .init(ipAddress: "127.0.0.1", port: port)
) { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler())
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler())
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: .init(
inboundType: String.self,
outboundType: String.self
)
)
}
}
.connect(
to: .init(ipAddress: "127.0.0.1", port: port),
channelConfiguration: .init(
inboundType: String.self,
outboundType: String.self
)
)
}
private func makeClientChannelWithProtocolNegotiation(
eventLoopGroup: EventLoopGroup,
port: Int,
proposedALPN: TLSUserEventHandler.ALPN
) async throws -> NegotiationResult {
) async throws -> EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>> {
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
.connect(
to: .init(ipAddress: "127.0.0.1", port: port)
) { channel in
return channel.eventLoop.makeCompletedFuture {
return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN)
return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN).protocolNegotiationResult
}
}
}
@ -541,7 +544,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: Int,
proposedOuterALPN: TLSUserEventHandler.ALPN,
proposedInnerALPN: TLSUserEventHandler.ALPN
) async throws -> NegotiationResult {
) async throws -> EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>> {
return try await NIOTSConnectionBootstrap(group: eventLoopGroup)
.connect(
to: .init(ipAddress: "127.0.0.1", port: port)
@ -551,7 +554,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
channel: channel,
proposedOuterALPN: proposedOuterALPN,
proposedInnerALPN: proposedInnerALPN
)
).protocolNegotiationResult
}
}
}
@ -585,20 +588,20 @@ final class AsyncChannelBootstrapTests: XCTestCase {
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN))
let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
return NIOProtocolNegotiationResult.deferredResult(negotiationFuture.protocolNegotiationResult)
return NIOProtocolNegotiationResult(deferredResult: negotiationFuture.protocolNegotiationResult)
}
case "byte":
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN))
let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
return NIOProtocolNegotiationResult.deferredResult(negotiationHandler.protocolNegotiationResult)
return NIOProtocolNegotiationResult(deferredResult: negotiationHandler.protocolNegotiationResult)
}
default:
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
return channel.close().flatMapThrowing { throw ProtocolNegotiationError() }
}
case .fallback:
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
return channel.close().flatMapThrowing { throw ProtocolNegotiationError() }
}
}
try channel.pipeline.syncOperations.addHandler(negotiationHandler)
@ -618,7 +621,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
synchronouslyWrapping: channel
)
return NIOProtocolNegotiationResult.finished(NegotiationResult.string(asyncChannel))
return NIOProtocolNegotiationResult(result: NegotiationResult.string(asyncChannel))
}
case "byte":
return channel.eventLoop.makeCompletedFuture {
@ -628,13 +631,13 @@ final class AsyncChannelBootstrapTests: XCTestCase {
synchronouslyWrapping: channel
)
return NIOProtocolNegotiationResult.finished(NegotiationResult.byte(asyncChannel))
return NIOProtocolNegotiationResult(result: NegotiationResult.byte(asyncChannel))
}
default:
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
return channel.close().flatMapThrowing { throw ProtocolNegotiationError() }
}
case .fallback:
return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError())
return channel.close().flatMapThrowing { throw ProtocolNegotiationError() }
}
}
@ -678,7 +681,7 @@ private func XCTAsyncAssertThrowsError<T>(
}
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAsyncAssertThrowsError<T>(
internal func XCTAssertThrowsError<T>(
_ expression: @autoclosure () async throws -> T,
file: StaticString = #filePath,
line: UInt = #line,