From fbc49d892b0a62666475c5f137f6d64994368368 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Wed, 26 Jul 2023 15:48:49 +0100 Subject: [PATCH] Adopt latest SPI(AsyncChannel) changes (#181) # Motivation We introduced some breaking SPI(AsyncChannel) changes in NIO that we have to adopt here. # Modification This PR adopts the latest `NIOProtocolNegotiationResult` APIs. Additionally, it also drops all bind/connect methods on the bootstraps that are specific to protocol negotiation or `NIOAsyncChannel`. # Result Green CI on `main` and alignment between `NIOPosix` and `NIOTS. --- Package.swift | 2 +- .../NIOTSConnectionBootstrap.swift | 299 +---------------- .../NIOTSListenerBootstrap.swift | 306 +----------------- .../NIOTSAsyncBootstrapTests.swift | 119 +++---- 4 files changed, 81 insertions(+), 645 deletions(-) diff --git a/Package.swift b/Package.swift index f8d079b..8444173 100644 --- a/Package.swift +++ b/Package.swift @@ -21,7 +21,7 @@ let package = Package( .library(name: "NIOTransportServices", targets: ["NIOTransportServices"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.57.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), ], diff --git a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift index 3acd8a2..ab97123 100644 --- a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift +++ b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift @@ -328,8 +328,7 @@ extension NIOTSConnectionBootstrap { promise.fail(error) } } - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @@ -380,8 +379,7 @@ extension NIOTSConnectionBootstrap { promise.fail(error) } } - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @@ -403,18 +401,16 @@ extension NIOTSConnectionBootstrap { channelInitializer: channelInitializer, registration: { connectionChannel, promise in connectionChannel.registerAlreadyConfigured0(promise: promise) - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - private func connect0( + private func connect0( existingNWConnection: NWConnection? = nil, channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture, - registration: @escaping (NIOTSConnectionChannel, EventLoopPromise) -> Void, - postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { + registration: @escaping (NIOTSConnectionChannel, EventLoopPromise) -> Void + ) -> EventLoopFuture { let connectionChannel: NIOTSConnectionChannel if let newConnection = existingNWConnection { connectionChannel = NIOTSConnectionChannel( @@ -452,8 +448,6 @@ extension NIOTSConnectionBootstrap { cancelTask.cancel() } return connectPromise.futureResult.map { result } - }.flatMap { (result: ChannelInitializerResult) -> EventLoopFuture in - postRegisterTransformation(result, connectionChannel.eventLoop) }.flatMapErrorThrowing { connectionChannel.close(promise: nil) throw $0 @@ -462,287 +456,6 @@ extension NIOTSConnectionBootstrap { } } -// MARK: Async connect methods with AsyncChannel - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOTSConnectionBootstrap { - /// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - host: The host to connect to. - /// - port: The port to connect to. - /// - channelConfiguration: The channel's async channel configuration. - /// - Returns: A `NIOAsyncChannel` for the established connection. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - host: String, - port: Int, - channelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel { - try await self.connect( - host: host, - port: port - ) { channel in - channel.eventLoop.makeCompletedFuture { - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: channelConfiguration - ) - } - } - } - - /// Specify the `address` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - address: The address to connect to. - /// - channelConfiguration: The channel's async channel configuration. - /// - Returns: A `NIOAsyncChannel` for the established connection. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - to address: SocketAddress, - channelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel { - try await self.connect( - to: address - ) { channel in - channel.eventLoop.makeCompletedFuture { - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: channelConfiguration - ) - } - } - } - - /// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established. - /// - /// - Parameters: - /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. - /// - channelConfiguration: The channel's async channel configuration. - /// - Returns: A `NIOAsyncChannel` for the established connection. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - unixDomainSocketPath: String, - channelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel { - try await self.connect( - unixDomainSocketPath: unixDomainSocketPath - ) { channel in - channel.eventLoop.makeCompletedFuture { - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: channelConfiguration - ) - } - } - } - - /// Specify the `endpoint` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - endpoint: The endpoint to connect to. - /// - channelConfiguration: The channel's async channel configuration. - /// - Returns: A `NIOAsyncChannel` for the established connection. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - endpoint: NWEndpoint, - channelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel { - try await self.connect( - endpoint: endpoint - ) { channel in - channel.eventLoop.makeCompletedFuture { - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: channelConfiguration - ) - } - } - } - - /// Use a pre-existing `NWConnection` to connect a `Channel`. - /// - /// - Parameters: - /// - connection: The `NWConnection` to wrap. - /// - channelConfiguration: The channel's async channel configuration. - /// - Returns: A `NIOAsyncChannel` for the established connection. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func withExistingNWConnection( - _ connection: NWConnection, - channelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel { - try await self.withExistingNWConnection( - connection - ) { channel in - channel.eventLoop.makeCompletedFuture { - return try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: channelConfiguration - ) - } - } - } -} - -// MARK: Async connect methods with protocol negotation - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOTSConnectionBootstrap { - /// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - host: The host to connect to. - /// - port: The port to connect to. - /// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: The protocol negotiation result. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - host: String, - port: Int, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> Handler.NegotiationResult { - let validPortRange = Int(UInt16.min)...Int(UInt16.max) - guard validPortRange.contains(port) else { - throw NIOTSErrors.InvalidPort(port: port) - } - - guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else { - throw NIOTSErrors.InvalidPort(port: port) - } - return try await self.connect( - endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort), - channelInitializer: channelInitializer - ) - } - - /// Specify the `address` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - address: The address to connect to. - /// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: The protocol negotiation result. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - to address: SocketAddress, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> Handler.NegotiationResult { - try await self.connect0( - channelInitializer: channelInitializer, - registration: { connectionChannel, promise in - connectionChannel.register().whenComplete { result in - switch result { - case .success: - connectionChannel.connect(to: address, promise: promise) - case .failure(let error): - promise.fail(error) - } - } - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } - - /// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established. - /// - /// - Parameters: - /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. - /// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: The protocol negotiation result. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - unixDomainSocketPath: String, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> Handler.NegotiationResult { - let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) - return try await self.connect( - to: address, - channelInitializer: channelInitializer - ) - } - - /// Specify the `endpoint` to connect to for the TCP `Channel` that will be established. - /// - /// - Parameters: - /// - endpoint: The endpoint to connect to. - /// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: The protocol negotiation result. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func connect( - endpoint: NWEndpoint, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> Handler.NegotiationResult { - try await self.connect0( - channelInitializer: channelInitializer, - registration: { connectionChannel, promise in - connectionChannel.register().whenComplete { result in - switch result { - case .success: - connectionChannel.triggerUserOutboundEvent( - NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint), - promise: promise - ) - case .failure(let error): - promise.fail(error) - } - } - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } - - /// Use a pre-existing `NWConnection` to connect a `Channel`. - /// - /// - Parameters: - /// - connection: The `NWConnection` to wrap. - /// - channelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: The protocol negotiation result. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func withExistingNWConnection( - _ connection: NWConnection, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> Handler.NegotiationResult { - try await self.connect0( - existingNWConnection: connection, - channelInitializer: channelInitializer, - registration: { connectionChannel, promise in - connectionChannel.registerAlreadyConfigured0(promise: promise) - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } -} - @available(*, unavailable) @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) extension NIOTSConnectionBootstrap: Sendable {} diff --git a/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift index 00599a2..0ea3dbf 100644 --- a/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift +++ b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift @@ -423,8 +423,7 @@ extension NIOTSListenerBootstrap { promise.fail(error) } } - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @@ -455,8 +454,7 @@ extension NIOTSListenerBootstrap { promise.fail(error) } } - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @@ -487,8 +485,7 @@ extension NIOTSListenerBootstrap { promise.fail(error) } } - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @@ -513,19 +510,17 @@ extension NIOTSListenerBootstrap { childChannelInitializer: childChannelInitializer, registration: { (serverChannel, promise) in serverChannel.registerAlreadyConfigured0(promise: promise) - }, - postRegisterTransformation: { $1.makeSucceededFuture($0) } + } ).get() } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - private func bind0( + private func bind0( existingNWListener: NWListener? = nil, serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture, - registration: @escaping (NIOTSListenerChannel, EventLoopPromise) -> Void, - postRegisterTransformation: @escaping @Sendable (ChannelInitializerResult, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture> { + registration: @escaping (NIOTSListenerChannel, EventLoopPromise) -> Void + ) -> EventLoopFuture> { let eventLoop = self.group.next() as! NIOTSEventLoop let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) } let childChannelInit = self.childChannelInit @@ -557,27 +552,22 @@ extension NIOTSListenerBootstrap { return eventLoop.submit { serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap { serverChannelInit(serverChannel) - }.flatMap { (_) -> EventLoopFuture> in + }.flatMap { (_) -> EventLoopFuture> in do { try serverChannel.pipeline.syncOperations.addHandler( AcceptHandler(childChannelInitializer: childChannelInit, childChannelOptions: childChannelOptions), name: "AcceptHandler" ) - let asyncChannel = try NIOAsyncChannel + let asyncChannel = try NIOAsyncChannel .wrapAsyncChannelWithTransformations( synchronouslyWrapping: serverChannel, backpressureStrategy: serverBackpressureStrategy, - channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult, EventLoop)> in + channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult)> in // The channelReadTransformation is run on the EL of the server channel // We have to make sure that we execute child channel initializer on the // EL of the child channel. channel.eventLoop.flatSubmit { - childChannelInitializer(channel).map { ($0, channel.eventLoop) } - } - }, - postFireChannelReadTransformation: { result, eventLoop in - eventLoop.flatSubmit { - postRegisterTransformation(result, eventLoop) + childChannelInitializer(channel) } } ) @@ -597,12 +587,12 @@ extension NIOTSListenerBootstrap { } return bindPromise.futureResult - .map { (_) -> NIOAsyncChannel in asyncChannel + .map { (_) -> NIOAsyncChannel in asyncChannel } } catch { return eventLoop.makeFailedFuture(error) } - }.flatMapError { error -> EventLoopFuture> in + }.flatMapError { error -> EventLoopFuture> in serverChannel.close0(error: error, mode: .all, promise: nil) return eventLoop.makeFailedFuture(error) } @@ -612,274 +602,4 @@ extension NIOTSListenerBootstrap { } } -// MARK: AsyncChannel based bind - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOTSListenerBootstrap { - /// Bind the `NIOTSListenerChannel` to `host` and `port`. - /// - /// - Parameters: - /// - host: The host to bind on. - /// - port: The port to bind on. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelConfiguration: The child channel's async channel configuration. - /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - host: String, - port: Int, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel, Never> { - return try await self.bind( - host: host, - port: port, - serverBackpressureStrategy: serverBackpressureStrategy - ) { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: childChannelConfiguration - ) - } - } - } - - /// Bind the `NIOTSListenerChannel` to `address`. - /// - /// - Parameters: - /// - address: The `SocketAddress` to bind on. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelConfiguration: The child channel's async channel configuration. - /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - to address: SocketAddress, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel, Never> { - return try await self.bind( - to: address, - serverBackpressureStrategy: serverBackpressureStrategy - ) { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: childChannelConfiguration - ) - } - } - } - - /// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`. - /// - /// - Parameters: - /// - endpoint: The `NWEndpoint` to bind this channel to. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelConfiguration: The child channel's async channel configuration. - /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - endpoint: NWEndpoint, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel, Never> { - return try await self.bind( - endpoint: endpoint, - serverBackpressureStrategy: serverBackpressureStrategy - ) { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: childChannelConfiguration - ) - } - } - } - - /// Bind the `NIOTSListenerChannel` to an existing `NWListener`. - /// - /// - Parameters: - /// - listener: The NWListener to wrap. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelConfiguration: The child channel's async channel configuration. - /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func withNWListener( - _ listener: NWListener, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelConfiguration: NIOAsyncChannel.Configuration = .init() - ) async throws -> NIOAsyncChannel, Never> { - return try await self.withNWListener( - listener, - serverBackpressureStrategy: serverBackpressureStrategy - ) { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: childChannelConfiguration - ) - } - } - } -} - -// MARK: Protocol negotiation based bind - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOTSListenerBootstrap { - /// Bind the `NIOTSListenerChannel` to `host` and `port`. - /// - /// - Parameters: - /// - host: The host to bind on. - /// - port: The port to bind on. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - host: String, - port: Int, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> NIOAsyncChannel { - let address = try SocketAddress.makeAddressResolvingHost(host, port: port) - - return try await self.bind( - to: address, - serverBackpressureStrategy: serverBackpressureStrategy, - childChannelInitializer: childChannelInitializer - ) - } - - /// Bind the `NIOTSListenerChannel` to `address`. - /// - /// - Parameters: - /// - address: The `SocketAddress` to bind on. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - to address: SocketAddress, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> NIOAsyncChannel { - return try await self.bind0( - serverBackpressureStrategy: serverBackpressureStrategy, - childChannelInitializer: childChannelInitializer, - registration: { (serverChannel, promise) in - serverChannel.register().whenComplete { result in - switch result { - case .success: - serverChannel.bind(to: address, promise: promise) - case .failure(let error): - promise.fail(error) - } - } - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } - - /// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`. - /// - /// - Parameters: - /// - endpoint: The `NWEndpoint` to bind this channel to. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func bind( - endpoint: NWEndpoint, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> NIOAsyncChannel { - return try await self.bind0( - serverBackpressureStrategy: serverBackpressureStrategy, - childChannelInitializer: childChannelInitializer, - registration: { (serverChannel, promise) in - serverChannel.register().whenComplete { result in - switch result { - case .success: - serverChannel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise) - case .failure(let error): - promise.fail(error) - } - } - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } - - /// Bind the `NIOTSListenerChannel` to an existing `NWListener`. - /// - /// - Parameters: - /// - listener: The NWListener to wrap. - /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelInitializer: A closure to initialize the channel which must return the handler that is used for negotiating - /// the protocol. - /// - Returns: A ``NIOAsyncChannel`` of the protocol negotiation results. - @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @_spi(AsyncChannel) - public func withNWListener( - _ listener: NWListener, - serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture - ) async throws -> NIOAsyncChannel { - return try await self.bind0( - serverBackpressureStrategy: serverBackpressureStrategy, - childChannelInitializer: childChannelInitializer, - registration: { (serverChannel, promise) in - serverChannel.registerAlreadyConfigured0(promise: promise) - }, - postRegisterTransformation: { handler, eventLoop in - eventLoop.assertInEventLoop() - return handler.protocolNegotiationResult.flatMap { result in - result.resolve(on: eventLoop) - } - } - ).get() - } -} - - -extension NIOProtocolNegotiationResult { - func resolve(on eventLoop: EventLoop) -> EventLoopFuture { - Self.resolve(on: eventLoop, result: self) - } - - static func resolve(on eventLoop: EventLoop, result: Self) -> EventLoopFuture { - switch result { - case .finished(let negotiationResult): - return eventLoop.makeSucceededFuture(negotiationResult) - - case .deferredResult(let future): - return future.flatMap { result in - return resolve(on: eventLoop, result: result) - } - } - } -} - #endif diff --git a/Tests/NIOTransportServicesTests/NIOTSAsyncBootstrapTests.swift b/Tests/NIOTransportServicesTests/NIOTSAsyncBootstrapTests.swift index 4a8a10a..39eeb40 100644 --- a/Tests/NIOTransportServicesTests/NIOTSAsyncBootstrapTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSAsyncBootstrapTests.swift @@ -182,21 +182,23 @@ final class AsyncChannelBootstrapTests: XCTestCase { let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.autoRead, value: true) - .childChannelInitializer { channel in + .bind( + host: "127.0.0.1", + port: 0 + ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder())) try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder())) try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler()) + return try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + inboundType: String.self, + outboundType: String.self + ) + ) } } - .bind( - host: "127.0.0.1", - port: 0, - childChannelConfiguration: .init( - inboundType: String.self, - outboundType: String.self - ) - ) try await withThrowingTaskGroup(of: Void.self) { group in let (stream, continuation) = AsyncStream.makeStream() @@ -227,7 +229,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try! eventLoopGroup.syncShutdownGracefully() } - let channel: NIOAsyncChannel = try await NIOTSListenerBootstrap(group: eventLoopGroup) + let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.autoRead, value: true) .bind( @@ -235,7 +237,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult } } @@ -245,9 +247,9 @@ final class AsyncChannelBootstrapTests: XCTestCase { group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in - for try await childChannel in channel.inboundStream { + for try await negotiationResult in channel.inboundStream { group.addTask { - switch childChannel { + switch try await negotiationResult.get().waitForFinalResult() { case .string(let channel): for try await value in channel.inboundStream { continuation.yield(.string(value)) @@ -267,7 +269,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: channel.channel.localAddress!.port!, proposedALPN: .string ) - switch stringNegotiationResult { + switch try await stringNegotiationResult.get().waitForFinalResult() { case .string(let stringChannel): // This is the actual content try await stringChannel.outboundWriter.write("hello") @@ -281,7 +283,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: channel.channel.localAddress!.port!, proposedALPN: .byte ) - switch byteNegotiationResult { + switch try await byteNegotiationResult.get().waitForFinalResult() { case .string: preconditionFailure() case .byte(let byteChannel): @@ -300,7 +302,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try! eventLoopGroup.syncShutdownGracefully() } - let channel: NIOAsyncChannel = try await NIOTSListenerBootstrap(group: eventLoopGroup) + let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.autoRead, value: true) .bind( @@ -308,7 +310,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureNestedProtocolNegotiationHandlers(channel: channel) + try self.configureNestedProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult } } @@ -318,9 +320,9 @@ final class AsyncChannelBootstrapTests: XCTestCase { group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in - for try await childChannel in channel.inboundStream { + for try await negotiationResult in channel.inboundStream { group.addTask { - switch childChannel { + switch try await negotiationResult.get().waitForFinalResult() { case .string(let channel): for try await value in channel.inboundStream { continuation.yield(.string(value)) @@ -341,7 +343,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { proposedOuterALPN: .string, proposedInnerALPN: .string ) - switch stringStringNegotiationResult { + switch try await stringStringNegotiationResult.get().waitForFinalResult() { case .string(let stringChannel): // This is the actual content try await stringChannel.outboundWriter.write("hello") @@ -356,7 +358,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { proposedOuterALPN: .byte, proposedInnerALPN: .string ) - switch byteStringNegotiationResult { + switch try await byteStringNegotiationResult.get().waitForFinalResult() { case .string(let stringChannel): // This is the actual content try await stringChannel.outboundWriter.write("hello") @@ -371,7 +373,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { proposedOuterALPN: .byte, proposedInnerALPN: .byte ) - switch byteByteNegotiationResult { + switch try await byteByteNegotiationResult.get().waitForFinalResult() { case .string: preconditionFailure() case .byte(let byteChannel): @@ -386,7 +388,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { proposedOuterALPN: .string, proposedInnerALPN: .byte ) - switch stringByteNegotiationResult { + switch try await stringByteNegotiationResult.get().waitForFinalResult() { case .string: preconditionFailure() case .byte(let byteChannel): @@ -423,7 +425,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } let channels = NIOLockedValueBox<[Channel]>([Channel]()) - let channel: NIOAsyncChannel = try await NIOTSListenerBootstrap(group: eventLoopGroup) + let channel: NIOAsyncChannel>, Never> = try await NIOTSListenerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .serverChannelInitializer { channel in channel.eventLoop.makeCompletedFuture { @@ -436,7 +438,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult } } @@ -446,9 +448,9 @@ final class AsyncChannelBootstrapTests: XCTestCase { group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in - for try await childChannel in channel.inboundStream { + for try await negotiationResult in channel.inboundStream { group.addTask { - switch childChannel { + switch try await negotiationResult.get().waitForFinalResult() { case .string(let channel): for try await value in channel.inboundStream { continuation.yield(.string(value)) @@ -463,15 +465,14 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - await XCTAsyncAssertThrowsError( - try await self.makeClientChannelWithProtocolNegotiation( - eventLoopGroup: eventLoopGroup, - port: channel.channel.localAddress!.port!, - proposedALPN: .unknown - ) - ) { error in - XCTAssertTrue(error is ProtocolNegotiationError) - } + let failedProtocolNegotiation = try await self.makeClientChannelWithProtocolNegotiation( + eventLoopGroup: eventLoopGroup, + port: channel.channel.localAddress!.port!, + proposedALPN: .unknown + ) + await XCTAssertThrowsError( + try await failedProtocolNegotiation.get().waitForFinalResult() + ) // Let's check that we can still open a new connection let stringNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation( @@ -479,7 +480,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: channel.channel.localAddress!.port!, proposedALPN: .string ) - switch stringNegotiationResult { + switch try await stringNegotiationResult.get().waitForFinalResult() { case .string(let stringChannel): // This is the actual content try await stringChannel.outboundWriter.write("hello") @@ -504,34 +505,36 @@ final class AsyncChannelBootstrapTests: XCTestCase { private func makeClientChannel(eventLoopGroup: EventLoopGroup, port: Int) async throws -> NIOAsyncChannel { return try await NIOTSConnectionBootstrap(group: eventLoopGroup) - .channelInitializer { channel in + .connect( + to: .init(ipAddress: "127.0.0.1", port: port) + ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler()) try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder())) try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder())) try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler()) + return try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + inboundType: String.self, + outboundType: String.self + ) + ) } } - .connect( - to: .init(ipAddress: "127.0.0.1", port: port), - channelConfiguration: .init( - inboundType: String.self, - outboundType: String.self - ) - ) } private func makeClientChannelWithProtocolNegotiation( eventLoopGroup: EventLoopGroup, port: Int, proposedALPN: TLSUserEventHandler.ALPN - ) async throws -> NegotiationResult { + ) async throws -> EventLoopFuture> { return try await NIOTSConnectionBootstrap(group: eventLoopGroup) .connect( to: .init(ipAddress: "127.0.0.1", port: port) ) { channel in return channel.eventLoop.makeCompletedFuture { - return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN).protocolNegotiationResult } } } @@ -541,7 +544,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: Int, proposedOuterALPN: TLSUserEventHandler.ALPN, proposedInnerALPN: TLSUserEventHandler.ALPN - ) async throws -> NegotiationResult { + ) async throws -> EventLoopFuture> { return try await NIOTSConnectionBootstrap(group: eventLoopGroup) .connect( to: .init(ipAddress: "127.0.0.1", port: port) @@ -551,7 +554,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { channel: channel, proposedOuterALPN: proposedOuterALPN, proposedInnerALPN: proposedInnerALPN - ) + ).protocolNegotiationResult } } } @@ -585,20 +588,20 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN)) let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) - return NIOProtocolNegotiationResult.deferredResult(negotiationFuture.protocolNegotiationResult) + return NIOProtocolNegotiationResult(deferredResult: negotiationFuture.protocolNegotiationResult) } case "byte": return channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedInnerALPN)) let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) - return NIOProtocolNegotiationResult.deferredResult(negotiationHandler.protocolNegotiationResult) + return NIOProtocolNegotiationResult(deferredResult: negotiationHandler.protocolNegotiationResult) } default: - return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError()) + return channel.close().flatMapThrowing { throw ProtocolNegotiationError() } } case .fallback: - return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError()) + return channel.close().flatMapThrowing { throw ProtocolNegotiationError() } } } try channel.pipeline.syncOperations.addHandler(negotiationHandler) @@ -618,7 +621,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { synchronouslyWrapping: channel ) - return NIOProtocolNegotiationResult.finished(NegotiationResult.string(asyncChannel)) + return NIOProtocolNegotiationResult(result: NegotiationResult.string(asyncChannel)) } case "byte": return channel.eventLoop.makeCompletedFuture { @@ -628,13 +631,13 @@ final class AsyncChannelBootstrapTests: XCTestCase { synchronouslyWrapping: channel ) - return NIOProtocolNegotiationResult.finished(NegotiationResult.byte(asyncChannel)) + return NIOProtocolNegotiationResult(result: NegotiationResult.byte(asyncChannel)) } default: - return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError()) + return channel.close().flatMapThrowing { throw ProtocolNegotiationError() } } case .fallback: - return channel.eventLoop.makeFailedFuture(ProtocolNegotiationError()) + return channel.close().flatMapThrowing { throw ProtocolNegotiationError() } } } @@ -678,7 +681,7 @@ private func XCTAsyncAssertThrowsError( } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -internal func XCTAsyncAssertThrowsError( +internal func XCTAssertThrowsError( _ expression: @autoclosure () async throws -> T, file: StaticString = #filePath, line: UInt = #line,