Compare commits

..

9 Commits

Author SHA1 Message Date
Johannes Weiss 7c21367b1a
Merge 862378ee24 into 7114435533 2025-05-05 23:27:13 +00:00
Rick Newton-Rogers 7114435533
Drop Swift 5.9 (#235)
Motivation:

Swift 5.9 is no longer supported, we should bump the tools version and
remove it from our CI.

Modifications:

* Bump the Swift tools version to Swift 5.10
* Remove Swift 5.9 jobs where appropriate in main.yml, pull_request.yml

Result:

Code reflects our support window.
2025-05-05 12:13:58 +01:00
Gus Cairo d8443227d1
Change naming of datagram-related types to make them more consistent + fix docs (#233)
### Motivation:

The datagram-related types logically mirror the TCP-based ones
(`NIOTSDatagramListenerBootstrap` vs `NIOTSListenerBootstrap`;
`NIOTSListenerChannel` vs `NIOTSDatagramListenerChannel`;
`NIOTSDatagramChannel` vs `NIOTSDatagramConnectionChannel`;
`NIOTSDatagramBootstrap` vs `NIOTSConnectionBootstrap`).
However, some of the type names could more closely resemble their TCP
counterparts to make it easier to navigate the code/understand what
their purpose is.
Additionally, the docs for some of these types are wrong, as they've
been copy-pasted into the datagram versions without changes.

### Modifications:

There are separate commits for each of the following changes:
- Fix docs for `NIOTSDatagramListenerBootstrap` and rename the file to
match the type name.
- Rename `NIOTSDatagramConnectionChannelTests` to
`NIOTSDatagramBootstrapTests` (to match `NIOTSBootstrapTests`).
- Rename `NIOTSDatagramChannel` to `NIOTSDatagramConnectionChannel` (to
match `NIOTSConnectionChannel`)
- Fix docs for `NIOTSConnectionBootstrap`.
- Rename `NIOTSDatagramBootstrap` to `NIOTSDatagramConnectionBootstrap`
(to match `NIOTSConnectionBootstrap`). This one required a deprecate and
replace since it's a public type.

### Result:

Better docs and more consistency in our type names.
2025-04-29 13:56:21 +01:00
Gus Cairo cd1e89816d
Add `NWParameters` configurator to bootstraps (#230)
Allow users to provide a closure taking an `NWParameters` to customise
them before they're used to create `NWConnection`s.

### Motivation:

`NWParameters` are currently created using the provided TLS and UDP
options, and then passed over to new `NWConnection`s. However, there are
more ways in which `NWParameters` can be customised, so this new API
provides a way for users to do this.

### Modifications:

Introduce new `configureNWParameters` methods to the existing bootstraps
to allow configuring a closure for customising `NWParameters`.

### Result:

Users can now customise the `NWParameters` used to create new
`NWConnection`s.
2025-04-16 14:33:56 +01:00
Rick Newton-Rogers 3d21b85af4
Enable Swift 6.1 jobs in CI (#232)
Motivation:

Swift 6.1 has been released, we should add it to our CI coverage.

Modifications:

Add additional Swift 6.1 jobs where appropriate in main.yml,
pull_request.yml

Result:

Improved test coverage.
2025-04-14 10:52:43 +01:00
Gus Cairo 9eb2ebde13
Fix missing strict concurrency error (#231)
Current strict concurrency checks don't work consistently in Xcode. This
PR adds an additional flag (`-Xfrontend`) to our set of strict
concurrency flags to make sure nothing has been missed when building. It
also fixes an additional error uncovered after adding it.
2025-04-09 15:08:12 +01:00
Gus Cairo 92bb536b7e
Strict concurrency for NIOTransportServices and tests (#228) 2025-04-02 10:54:00 +01:00
Rick Newton-Rogers a9b23220e4
Enable macOS CI on pull requests (#229)
Motivation:

* Improve test coverage

Modifications:

Enable macOS CI to be run on pull request commits and make the use of
the nightly runner pool for main.yml jobs explicit.

Result:

Improved test coverage.
2025-04-01 17:40:20 +01:00
Rick Newton-Rogers a9da7c9aef
Enable macOS CI on merge to main and daily timer (#227)
Enable macOS CI on merge to main and daily timer

### Motivation:

* Improve test coverage
* Check test pass/fail status
* Monitor CI throughput

### Modifications:

Enable macOS CI to be run on all merges to main and on a daily timer.

### Result:

Improved test coverage run out-of-band at the moment so we can get a
feeling for if any changes need to be made in the repo or in the CI
pipelines to ensure timely and stable checks.
2025-03-26 17:01:32 +01:00
29 changed files with 1009 additions and 491 deletions

View File

@ -11,8 +11,15 @@ jobs:
name: Unit tests
uses: apple/swift-nio/.github/workflows/unit_tests.yml@main
with:
linux_5_9_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_0_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
macos-tests:
name: macOS tests
uses: apple/swift-nio/.github/workflows/macos_tests.yml@main
with:
runner_pool: nightly
build_scheme: swift-nio-transport-services-Package

View File

@ -14,12 +14,19 @@ jobs:
name: Unit tests
uses: apple/swift-nio/.github/workflows/unit_tests.yml@main
with:
linux_5_9_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_0_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
cxx-interop:
name: Cxx interop
uses: apple/swift-nio/.github/workflows/cxx_interop.yml@main
macos-tests:
name: macOS tests
uses: apple/swift-nio/.github/workflows/macos_tests.yml@main
with:
runner_pool: general
build_scheme: swift-nio-transport-services-Package

View File

@ -1,4 +1,4 @@
// swift-tools-version:5.8
// swift-tools-version:5.10
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
@ -15,13 +15,31 @@
import PackageDescription
let strictConcurrencyDevelopment = false
let strictConcurrencySettings: [SwiftSetting] = {
var initialSettings: [SwiftSetting] = []
initialSettings.append(contentsOf: [
.enableUpcomingFeature("StrictConcurrency"),
.enableUpcomingFeature("InferSendableFromCaptures"),
])
if strictConcurrencyDevelopment {
// -warnings-as-errors here is a workaround so that IDE-based development can
// get tripped up on -require-explicit-sendable.
initialSettings.append(.unsafeFlags(["-Xfrontend", "-require-explicit-sendable", "-warnings-as-errors"]))
}
return initialSettings
}()
let package = Package(
name: "swift-nio-transport-services",
products: [
.library(name: "NIOTransportServices", targets: ["NIOTransportServices"])
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.62.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
],
targets: [
@ -33,7 +51,8 @@ let package = Package(
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "NIOTLS", package: "swift-nio"),
.product(name: "Atomics", package: "swift-atomics"),
]
],
swiftSettings: strictConcurrencySettings
),
.executableTarget(
name: "NIOTSHTTPClient",
@ -58,7 +77,8 @@ let package = Package(
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "Atomics", package: "swift-atomics"),
]
],
swiftSettings: strictConcurrencySettings
),
]
)

View File

@ -20,11 +20,11 @@ internal class AcceptHandler<ChildChannel: Channel>: ChannelInboundHandler {
typealias InboundIn = ChildChannel
typealias InboundOut = ChildChannel
private let childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?
private let childChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private let childChannelOptions: ChannelOptions.Storage
init(
childChannelInitializer: ((Channel) -> EventLoopFuture<Void>)?,
childChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?,
childChannelOptions: ChannelOptions.Storage
) {
self.childChannelInitializer = childChannelInitializer
@ -35,11 +35,12 @@ internal class AcceptHandler<ChildChannel: Channel>: ChannelInboundHandler {
let newChannel = self.unwrapInboundIn(data)
let childLoop = newChannel.eventLoop
let ctxEventLoop = context.eventLoop
let childInitializer = self.childChannelInitializer ?? { _ in childLoop.makeSucceededFuture(()) }
let childInitializer = self.childChannelInitializer ?? { @Sendable _ in childLoop.makeSucceededFuture(()) }
let childChannelOptions = self.childChannelOptions
@inline(__always)
@Sendable @inline(__always)
func setupChildChannel() -> EventLoopFuture<Void> {
self.childChannelOptions.applyAllChannelOptions(to: newChannel).flatMap { () -> EventLoopFuture<Void> in
childChannelOptions.applyAllChannelOptions(to: newChannel).flatMap { () -> EventLoopFuture<Void> in
childLoop.assertInEventLoop()
return childInitializer(newChannel)
}
@ -48,8 +49,8 @@ internal class AcceptHandler<ChildChannel: Channel>: ChannelInboundHandler {
@inline(__always)
func fireThroughPipeline(_ future: EventLoopFuture<Void>) {
ctxEventLoop.assertInEventLoop()
future.flatMap { (_) -> EventLoopFuture<Void> in
ctxEventLoop.assertInEventLoop()
assert(ctxEventLoop === context.eventLoop)
future.assumeIsolated().flatMap { (_) -> EventLoopFuture<Void> in
guard context.channel.isActive else {
return newChannel.close().flatMapThrowing {
throw ChannelError.ioOnClosedChannel
@ -75,4 +76,7 @@ internal class AcceptHandler<ChildChannel: Channel>: ChannelInboundHandler {
}
}
}
@available(*, unavailable)
extension AcceptHandler: Sendable {}
#endif

View File

@ -17,9 +17,13 @@ import NIOCore
import Dispatch
import Network
/// A `NIOTSDatagramBootstrap` is an easy way to bootstrap a `NIOTSDatagramChannel` when creating network clients.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public typealias NIOTSDatagramBootstrap = NIOTSDatagramConnectionBootstrap
/// A ``NIOTSDatagramConnectionBootstrap`` is an easy way to bootstrap a UDP channel when creating network clients.
///
/// Usually you re-use a `NIOTSDatagramBootstrap` once you set it up, calling `connect` multiple times on the same bootstrap.
/// Usually you re-use a ``NIOTSDatagramConnectionBootstrap`` once you set it up, calling `connect` multiple times on the
/// same bootstrap.
/// This way you ensure that the same `EventLoop`s will be shared across all your connections.
///
/// Example:
@ -29,7 +33,7 @@ import Network
/// defer {
/// try! group.syncShutdownGracefully()
/// }
/// let bootstrap = NIOTSDatagramBootstrap(group: group)
/// let bootstrap = NIOTSDatagramConnectionBootstrap(group: group)
/// .channelInitializer { channel in
/// channel.pipeline.addHandler(MyChannelHandler())
/// }
@ -37,16 +41,17 @@ import Network
/// /* the Channel is now connected */
/// ```
///
/// The connected `NIOTSDatagramChannel` will operate on `ByteBuffer` as inbound and outbound messages.
/// The connected channel will operate on `ByteBuffer` as inbound and outbound messages.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSDatagramBootstrap {
public final class NIOTSDatagramConnectionBootstrap {
private let group: EventLoopGroup
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>)?
private var channelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private var connectTimeout: TimeAmount = TimeAmount.seconds(10)
private var channelOptions = ChannelOptions.Storage()
private var qos: DispatchQoS?
private var udpOptions: NWProtocolUDP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
private var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// Create a `NIOTSDatagramConnectionBootstrap` on the `EventLoopGroup` `group`.
///
@ -72,19 +77,20 @@ public final class NIOTSDatagramBootstrap {
self.init(group: group as EventLoopGroup)
}
/// Initialize the connected `NIOTSDatagramConnectionChannel` with `initializer`. The most common task in initializer is to add
/// Initialize the connected channel with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The connected `Channel` will operate on `ByteBuffer` as inbound and outbound messages.
///
/// - parameters:
/// - handler: A closure that initializes the provided `Channel`.
public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func channelInitializer(_ handler: @Sendable @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.channelInitializer = handler
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSDatagramConnectionChannel`.
/// Specifies a `ChannelOption` to be applied to the channel.
///
/// - parameters:
/// - option: The option to be applied.
@ -132,6 +138,14 @@ public final class NIOTSDatagramBootstrap {
return self
}
/// Customise the `NWParameters` to be used when creating the connection.
public func configureNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.nwParametersConfigurator = configurator
return self
}
/// Specify the `host` and `port` to connect to for the UDP `Channel` that will be established.
///
/// - parameters:
@ -180,17 +194,19 @@ public final class NIOTSDatagramBootstrap {
}
}
private func connect0(_ binder: @escaping (Channel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
let conn: Channel = NIOTSDatagramChannel(
private func connect0(
_ binder: @Sendable @escaping (Channel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<Channel> {
let conn: Channel = NIOTSDatagramConnectionChannel(
eventLoop: self.group.next() as! NIOTSEventLoop,
qos: self.qos,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator
)
let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) }
let channelOptions = self.channelOptions
let initializer = self.channelInitializer ?? { @Sendable _ in conn.eventLoop.makeSucceededFuture(()) }
return conn.eventLoop.submit {
return conn.eventLoop.submit { [channelOptions, connectTimeout] in
channelOptions.applyAllChannelOptions(to: conn).flatMap {
initializer(conn)
}.flatMap {
@ -199,8 +215,8 @@ public final class NIOTSDatagramBootstrap {
}.flatMap {
let connectPromise: EventLoopPromise<Void> = conn.eventLoop.makePromise()
binder(conn, connectPromise)
let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout))
let cancelTask = conn.eventLoop.scheduleTask(in: connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(connectTimeout))
conn.close(promise: nil)
}
@ -215,4 +231,7 @@ public final class NIOTSDatagramBootstrap {
}.flatMap { $0 }
}
}
@available(*, unavailable)
extension NIOTSDatagramConnectionBootstrap: Sendable {}
#endif

View File

@ -24,7 +24,7 @@ import Network
import Security
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
internal final class NIOTSDatagramConnectionChannel: StateManagedNWConnectionChannel {
typealias ActiveSubstate = UDPSubstate
enum UDPSubstate: NWConnectionSubstate {
@ -34,11 +34,11 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
self = .open
}
static func closeInput(state: inout ChannelState<NIOTSDatagramChannel.UDPSubstate>) throws {
static func closeInput(state: inout ChannelState<NIOTSDatagramConnectionChannel.UDPSubstate>) throws {
throw NIOTSErrors.InvalidChannelStateTransition()
}
static func closeOutput(state: inout ChannelState<NIOTSDatagramChannel.UDPSubstate>) throws {
static func closeOutput(state: inout ChannelState<NIOTSDatagramConnectionChannel.UDPSubstate>) throws {
throw NIOTSErrors.InvalidChannelStateTransition()
}
}
@ -140,8 +140,12 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
internal var allowLocalEndpointReuse = false
internal var multipathServiceType: NWParameters.MultipathServiceType = .disabled
internal let nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
var parameters: NWParameters {
NWParameters(dtls: self.tlsOptions, udp: self.udpOptions)
let parameters = NWParameters(dtls: self.tlsOptions, udp: self.udpOptions)
self.nwParametersConfigurator?(parameters)
return parameters
}
var _inboundStreamOpen: Bool {
@ -182,7 +186,8 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: Int = 1,
maximumReceiveLength: Int = 8192,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.tsEventLoop = eventLoop
self.closePromise = eventLoop.makePromise()
@ -192,6 +197,7 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos)
self.udpOptions = udpOptions
self.tlsOptions = tlsOptions
self.nwParametersConfigurator = nwParametersConfigurator
// Must come last, as it requires self to be completely initialized.
self._pipeline = ChannelPipeline(channel: self)
@ -206,7 +212,8 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: Int = 1,
maximumReceiveLength: Int = 8192,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
eventLoop: eventLoop,
@ -215,18 +222,19 @@ internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: minimumIncompleteReceiveLength,
maximumReceiveLength: maximumReceiveLength,
udpOptions: udpOptions,
tlsOptions: tlsOptions
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator
)
self.connection = connection
}
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSDatagramChannel {
extension NIOTSDatagramConnectionChannel {
internal struct SynchronousOptions: NIOSynchronousChannelOptions {
private let channel: NIOTSDatagramChannel
private let channel: NIOTSDatagramConnectionChannel
fileprivate init(channel: NIOTSDatagramChannel) {
fileprivate init(channel: NIOTSDatagramConnectionChannel) {
self.channel = channel
}
@ -243,4 +251,7 @@ extension NIOTSDatagramChannel {
SynchronousOptions(channel: self)
}
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSDatagramConnectionChannel: @unchecked Sendable {}
#endif

View File

@ -17,7 +17,7 @@ import NIOCore
import Dispatch
import Network
/// A ``NIOTSListenerBootstrap`` is an easy way to bootstrap a `NIOTSListenerChannel` when creating network servers.
/// A ``NIOTSDatagramListenerBootstrap`` is an easy way to bootstrap a listener channel when creating network servers.
///
/// Example:
///
@ -26,7 +26,7 @@ import Network
/// defer {
/// try! group.syncShutdownGracefully()
/// }
/// let bootstrap = NIOTSListenerBootstrap(group: group)
/// let bootstrap = NIOTSDatagramListenerBootstrap(group: group)
/// // Specify backlog and enable SO_REUSEADDR for the server itself
/// .serverChannelOption(ChannelOptions.backlog, value: 256)
/// .serverChannelOption(ChannelOptions.socketOption(.reuseaddr), value: 1)
@ -46,28 +46,31 @@ import Network
/// try! channel.closeFuture.wait() // wait forever as we never close the Channel
/// ```
///
/// The `EventLoopFuture` returned by `bind` will fire with a `NIOTSListenerChannel`. This is the channel that owns the
/// listening socket. Each time it accepts a new connection it will fire a `NIOTSConnectionChannel` through the
/// `ChannelPipeline` via `fireChannelRead`: as a result, the `NIOTSListenerChannel` operates on `Channel`s as inbound
/// messages. Outbound messages are not supported on a `NIOTSListenerChannel` which means that each write attempt will
/// fail.
/// The `EventLoopFuture` returned by `bind` will fire with a channel. This is the channel that owns the listening socket. Each
/// time it accepts a new connection it will fire a new child channel for the new connection through the `ChannelPipeline` via
/// `fireChannelRead`: as a result, the listening channel operates on `Channel`s as inbound messages. Outbound messages are
/// not supported on these listening channels, which means that each write attempt will fail.
///
/// Accepted `NIOTSConnectionChannel`s operate on `ByteBuffer` as inbound data, and `IOData` as outbound data.
/// Accepted channels operate on `ByteBuffer` as inbound data, and `IOData` as outbound data.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSDatagramListenerBootstrap {
private let group: EventLoopGroup
private let childGroup: EventLoopGroup
private var serverChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var childChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var serverChannelInit: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private var childChannelInit: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private var serverChannelOptions = ChannelOptions.Storage()
private var childChannelOptions = ChannelOptions.Storage()
private var serverQoS: DispatchQoS?
private var childQoS: DispatchQoS?
private var udpOptions: NWProtocolUDP.Options = .init()
private var childUDPOptions: NWProtocolUDP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
private var childTLSOptions: NWProtocolTLS.Options?
private var bindTimeout: TimeAmount?
private var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
private var childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// Create a ``NIOTSListenerBootstrap`` for the `EventLoopGroup` `group`.
/// Create a ``NIOTSDatagramListenerBootstrap`` for the `EventLoopGroup` `group`.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
@ -78,20 +81,20 @@ public final class NIOTSDatagramListenerBootstrap {
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `NIOTSListenerChannel`.
/// - group: The `EventLoopGroup` to use for the listening channel.
public convenience init(group: EventLoopGroup) {
self.init(group: group, childGroup: group)
}
/// Create a ``NIOTSListenerBootstrap`` for the ``NIOTSEventLoopGroup`` `group`.
/// Create a ``NIOTSDatagramListenerBootstrap`` for the ``NIOTSEventLoopGroup`` `group`.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `NIOTSListenerChannel`.
/// - group: The ``NIOTSEventLoopGroup`` to use for the listening channel.
public convenience init(group: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup)
}
/// Create a ``NIOTSListenerBootstrap``.
/// Create a ``NIOTSDatagramListenerBootstrap``.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
@ -102,9 +105,8 @@ public final class NIOTSDatagramListenerBootstrap {
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The `EventLoopGroup` to use for the `bind` of the listening channel and to accept new child channels with.
/// - childGroup: The `EventLoopGroup` to run the accepted child channels on.
public convenience init(group: EventLoopGroup, childGroup: EventLoopGroup) {
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
preconditionFailure(
@ -117,13 +119,12 @@ public final class NIOTSDatagramListenerBootstrap {
self.init(validatingGroup: group, childGroup: childGroup)!
}
/// Create a ``NIOTSListenerBootstrap`` on the `EventLoopGroup` `group` which accepts `Channel`s on `childGroup`,
/// validating that the `EventLoopGroup`s are compatible with ``NIOTSListenerBootstrap``.
/// Create a ``NIOTSDatagramListenerBootstrap`` on the `EventLoopGroup` `group` which accepts `Channel`s
/// on `childGroup`, validating that the `EventLoopGroup`s are compatible with ``NIOTSDatagramListenerBootstrap``.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The `EventLoopGroup` to use for the `bind` of the listening channel and to accept new child channels with.
/// - childGroup: The `EventLoopGroup` to run the accepted child channels on.
public init?(validatingGroup group: EventLoopGroup, childGroup: EventLoopGroup? = nil) {
let childGroup = childGroup ?? group
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
@ -134,32 +135,33 @@ public final class NIOTSDatagramListenerBootstrap {
self.childGroup = childGroup
}
/// Create a ``NIOTSListenerBootstrap``.
/// Create a ``NIOTSDatagramListenerBootstrap``.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The ``NIOTSEventLoopGroup`` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The ``NIOTSEventLoopGroup`` to use for the `bind` of the listening channel and to accept new child
/// channels with.
/// - childGroup: The ``NIOTSEventLoopGroup`` to run the accepted child channels on.
public convenience init(group: NIOTSEventLoopGroup, childGroup: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup, childGroup: childGroup as EventLoopGroup)
}
/// Initialize the `NIOTSListenerChannel` with `initializer`. The most common task in initializer is to add
/// Initialize the listening channel with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The `NIOTSListenerChannel` uses the accepted `NIOTSConnectionChannel`s as inbound messages.
/// The listening channel uses the accepted child channels as inbound messages.
///
/// > Note: To set the initializer for the accepted `NIOTSConnectionChannel`s, look at
/// ``childChannelInitializer(_:)``.
/// > Note: To set the initializer for the accepted child channels, look at ``childChannelInitializer(_:)``.
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func serverChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func serverChannelInitializer(_ initializer: @Sendable @escaping (Channel) -> EventLoopFuture<Void>) -> Self
{
self.serverChannelInit = initializer
return self
}
/// Initialize the accepted `NIOTSConnectionChannel`s with `initializer`. The most common task in initializer is to add
/// Initialize the accepted child channels with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`. Note that if the `initializer` fails then the error will be
/// fired in the *parent* channel.
///
@ -167,14 +169,15 @@ public final class NIOTSDatagramListenerBootstrap {
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func childChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func childChannelInitializer(_ initializer: @Sendable @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.childChannelInit = initializer
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSListenerChannel`.
/// Specifies a `ChannelOption` to be applied to the listening channel.
///
/// > Note: To specify options for the accepted `NIOTSConnectionChannel`s, look at ``childChannelOption(_:value:)``.
/// > Note: To specify options for the accepted child channels, look at ``childChannelOption(_:value:)``.
///
/// - parameters:
/// - option: The option to be applied.
@ -184,7 +187,7 @@ public final class NIOTSDatagramListenerBootstrap {
return self
}
/// Specifies a `ChannelOption` to be applied to the accepted `NIOTSConnectionChannel`s.
/// Specifies a `ChannelOption` to be applied to the accepted child channels.
///
/// - parameters:
/// - option: The option to be applied.
@ -221,19 +224,47 @@ public final class NIOTSDatagramListenerBootstrap {
return self
}
/// Specifies the TCP options to use on the child `Channel`s.
/// Specifies the TCP options to use on the listener.
public func udpOptions(_ options: NWProtocolUDP.Options) -> Self {
self.udpOptions = options
return self
}
/// Specifies the TLS options to use on the child `Channel`s.
/// Specifies the TCP options to use on the child `Channel`s.
public func childUDPOptions(_ options: NWProtocolUDP.Options) -> Self {
self.childUDPOptions = options
return self
}
/// Specifies the TLS options to use on the listener.
public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self {
self.tlsOptions = options
return self
}
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
/// Specifies the TLS options to use on the child `Channel`s.
public func childTLSOptions(_ options: NWProtocolTLS.Options) -> Self {
self.childTLSOptions = options
return self
}
/// Customise the `NWParameters` to be used when creating the `NWConnection` for the listener.
public func configureNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.nwParametersConfigurator = configurator
return self
}
/// Customise the `NWParameters` to be used when creating the `NWConnection`s for the child `Channel`s.
public func configureChildNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.childNWParametersConfigurator = configurator
return self
}
/// Bind the listening channel to `host` and `port`.
///
/// - parameters:
/// - host: The host to bind on.
@ -257,7 +288,7 @@ public final class NIOTSDatagramListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to `address`.
/// Bind the listening channel to `address`.
///
/// - parameters:
/// - address: The `SocketAddress` to bind on.
@ -267,7 +298,7 @@ public final class NIOTSDatagramListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to a UNIX Domain Socket.
/// Bind the listening channel to a UNIX Domain Socket.
///
/// - parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system.
@ -282,7 +313,7 @@ public final class NIOTSDatagramListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
/// Bind the listening channel to a given `NWEndpoint`.
///
/// - parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
@ -292,7 +323,7 @@ public final class NIOTSDatagramListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
/// Bind the listening channel to an existing `NWListener`.
///
/// - parameters:
/// - listener: The NWListener to wrap.
@ -305,10 +336,13 @@ public final class NIOTSDatagramListenerBootstrap {
private func bind0(
existingNWListener: NWListener? = nil,
shouldRegister: Bool,
_ binder: @escaping (NIOTSDatagramListenerChannel, EventLoopPromise<Void>) -> Void
_ binder: @Sendable @escaping (NIOTSDatagramListenerChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next() as! NIOTSEventLoop
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
let serverChannelInit =
self.serverChannelInit ?? {
@Sendable _ in eventLoop.makeSucceededFuture(())
}
let childChannelInit = self.childChannelInit
let serverChannelOptions = self.serverChannelOptions
let childChannelOptions = self.childChannelOptions
@ -321,10 +355,12 @@ public final class NIOTSDatagramListenerBootstrap {
qos: self.serverQoS,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childUDPOptions: self.udpOptions,
childTLSOptions: self.tlsOptions
childUDPOptions: self.childUDPOptions,
childTLSOptions: self.childTLSOptions,
childNWParametersConfigurator: self.childNWParametersConfigurator
)
} else {
serverChannel = NIOTSDatagramListenerChannel(
@ -332,24 +368,28 @@ public final class NIOTSDatagramListenerBootstrap {
qos: self.serverQoS,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childUDPOptions: self.udpOptions,
childTLSOptions: self.tlsOptions
childUDPOptions: self.childUDPOptions,
childTLSOptions: self.childTLSOptions,
childNWParametersConfigurator: self.childNWParametersConfigurator
)
}
return eventLoop.submit {
return eventLoop.submit { [bindTimeout] in
serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
serverChannelInit(serverChannel)
}.flatMap {
eventLoop.assertInEventLoop()
return serverChannel.pipeline.addHandler(
AcceptHandler<NIOTSDatagramChannel>(
childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions
return eventLoop.makeCompletedFuture {
try serverChannel.pipeline.syncOperations.addHandler(
AcceptHandler<NIOTSDatagramConnectionChannel>(
childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions
)
)
)
}
}.flatMap {
if shouldRegister {
return serverChannel.register()
@ -360,7 +400,7 @@ public final class NIOTSDatagramListenerBootstrap {
let bindPromise = eventLoop.makePromise(of: Void.self)
binder(serverChannel, bindPromise)
if let bindTimeout = self.bindTimeout {
if let bindTimeout = bindTimeout {
let cancelTask = eventLoop.scheduleTask(in: bindTimeout) {
bindPromise.fail(NIOTSErrors.BindTimeout(timeout: bindTimeout))
serverChannel.close(promise: nil)
@ -382,4 +422,7 @@ public final class NIOTSDatagramListenerBootstrap {
}
}
}
@available(*, unavailable)
extension NIOTSDatagramListenerBootstrap: Sendable {}
#endif

View File

@ -22,7 +22,7 @@ import Network
import Atomics
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<NIOTSDatagramChannel> {
internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<NIOTSDatagramConnectionChannel> {
/// The TCP options for this listener.
private var udpOptions: NWProtocolUDP.Options {
get {
@ -81,19 +81,23 @@ internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<N
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childUDPOptions: NWProtocolUDP.Options,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
eventLoop: eventLoop,
protocolOptions: .udp(udpOptions),
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .udp(childUDPOptions),
childTLSOptions: childTLSOptions
childTLSOptions: childTLSOptions,
childNWParametersConfigurator: childNWParametersConfigurator
)
}
@ -104,20 +108,24 @@ internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<N
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childUDPOptions: NWProtocolUDP.Options,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
wrapping: listener,
eventLoop: eventLoop,
protocolOptions: .udp(udpOptions),
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .udp(childUDPOptions),
childTLSOptions: childTLSOptions
childTLSOptions: childTLSOptions,
childNWParametersConfigurator: childNWParametersConfigurator
)
}
@ -127,15 +135,16 @@ internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<N
return
}
let newChannel = NIOTSDatagramChannel(
let newChannel = NIOTSDatagramConnectionChannel(
wrapping: connection,
on: self.childLoopGroup.next() as! NIOTSEventLoop,
parent: self,
udpOptions: self.childUDPOptions,
tlsOptions: self.childTLSOptions
tlsOptions: self.childTLSOptions,
nwParametersConfigurator: self.childNWParametersConfigurator
)
self.pipeline.fireChannelRead(NIOAny(newChannel))
self.pipeline.fireChannelRead(newChannel)
self.pipeline.fireChannelReadComplete()
}

View File

@ -13,11 +13,11 @@
//===----------------------------------------------------------------------===//
#if canImport(Network)
import NIOCore
@preconcurrency import Network
import Network
/// Options that can be set explicitly and only on bootstraps provided by `NIOTransportServices`.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public struct NIOTSChannelOptions {
public struct NIOTSChannelOptions: Sendable {
/// See: ``Types/NIOTSWaitForActivityOption``.
public static let waitForActivity = NIOTSChannelOptions.Types.NIOTSWaitForActivityOption()
@ -32,7 +32,7 @@ public struct NIOTSChannelOptions {
/// See: ``Types/NIOTSMetadataOption``
public static let metadata = {
(definition: NWProtocolDefinition) -> NIOTSChannelOptions.Types.NIOTSMetadataOption in
@Sendable (definition: NWProtocolDefinition) -> NIOTSChannelOptions.Types.NIOTSMetadataOption in
.init(definition: definition)
}
@ -66,7 +66,7 @@ public struct NIOTSChannelOptions {
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSChannelOptions {
/// A namespace for ``NIOTSChannelOptions`` datastructures.
public enum Types {
public enum Types: Sendable {
/// ``NIOTSWaitForActivityOption`` controls whether the `Channel` should wait for connection changes
/// during the connection process if the connection attempt fails. If Network.framework believes that
/// a connection may succeed in future, it may transition into the `.waiting` state. By default, this option

View File

@ -17,9 +17,9 @@ import NIOCore
import Dispatch
import Network
/// A `NIOTSConnectionBootstrap` is an easy way to bootstrap a `NIOTSConnectionChannel` when creating network clients.
/// A ``NIOTSConnectionBootstrap`` is an easy way to bootstrap a channel when creating network clients.
///
/// Usually you re-use a `NIOTSConnectionBootstrap` once you set it up, calling `connect` multiple times on the same bootstrap.
/// Usually you re-use a ``NIOTSConnectionBootstrap`` once you set it up, calling `connect` multiple times on the same bootstrap.
/// This way you ensure that the same `EventLoop`s will be shared across all your connections.
///
/// Example:
@ -37,17 +37,19 @@ import Network
/// /* the Channel is now connected */
/// ```
///
/// The connected `NIOTSConnectionChannel` will operate on `ByteBuffer` as inbound and on `IOData` as outbound messages.
/// The connected channel will operate on `ByteBuffer` as inbound and on `IOData` as outbound messages.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSConnectionBootstrap {
private let group: EventLoopGroup
private var _channelInitializer: ((Channel) -> EventLoopFuture<Void>)
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>) {
private var _channelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)
private var channelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>) {
if let protocolHandlers = self.protocolHandlers {
let channelInitializer = self._channelInitializer
return { channel in
channelInitializer(channel).flatMap {
channel.pipeline.addHandlers(protocolHandlers(), position: .first)
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandlers(protocolHandlers(), position: .first)
}
}
}
} else {
@ -59,11 +61,12 @@ public final class NIOTSConnectionBootstrap {
private var qos: DispatchQoS?
private var tcpOptions: NWProtocolTCP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
private var protocolHandlers: (() -> [ChannelHandler])? = nil
private var protocolHandlers: (@Sendable () -> [ChannelHandler])? = nil
private var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// Create a `NIOTSConnectionBootstrap` on the `EventLoopGroup` `group`.
/// Create a ``NIOTSConnectionBootstrap`` on the `EventLoopGroup` `group`.
///
/// The `EventLoopGroup` `group` must be compatible, otherwise the program will crash. `NIOTSConnectionBootstrap` is
/// The `EventLoopGroup` `group` must be compatible, otherwise the program will crash. ``NIOTSConnectionBootstrap`` is
/// compatible only with ``NIOTSEventLoopGroup`` as well as the `EventLoop`s returned by
/// ``NIOTSEventLoopGroup/next()``. See ``init(validatingGroup:)`` for a fallible initializer for
/// situations where it's impossible to tell ahead of time if the `EventLoopGroup` is compatible or not.
@ -81,7 +84,7 @@ public final class NIOTSConnectionBootstrap {
self.init(validatingGroup: group)!
}
/// Create a `NIOTSConnectionBootstrap` on the ``NIOTSEventLoopGroup`` `group`.
/// Create a ``NIOTSConnectionBootstrap`` on the ``NIOTSEventLoopGroup`` `group`.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use.
@ -89,7 +92,7 @@ public final class NIOTSConnectionBootstrap {
self.init(group: group as EventLoopGroup)
}
/// Create a `NIOTSConnectionBootstrap` on the ``NIOTSEventLoopGroup`` `group`, validating
/// Create a ``NIOTSConnectionBootstrap`` on the ``NIOTSEventLoopGroup`` `group`, validating
/// that the `EventLoopGroup` is compatible with ``NIOTSConnectionBootstrap``.
///
/// - parameters:
@ -104,19 +107,20 @@ public final class NIOTSConnectionBootstrap {
self._channelInitializer = { channel in channel.eventLoop.makeSucceededVoidFuture() }
}
/// Initialize the connected `NIOTSConnectionChannel` with `initializer`. The most common task in initializer is to add
/// Initialize the connected channel with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The connected `Channel` will operate on `ByteBuffer` as inbound and `IOData` as outbound messages.
///
/// - parameters:
/// - handler: A closure that initializes the provided `Channel`.
public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func channelInitializer(_ handler: @escaping @Sendable (Channel) -> EventLoopFuture<Void>) -> Self {
self._channelInitializer = handler
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSConnectionChannel`.
/// Specifies a `ChannelOption` to be applied to the channel.
///
/// - parameters:
/// - option: The option to be applied.
@ -162,6 +166,14 @@ public final class NIOTSConnectionBootstrap {
self.channelOption(NIOTSChannelOptions.multipathServiceType, value: type)
}
/// Customise the `NWParameters` to be used when creating the connection.
public func configureNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.nwParametersConfigurator = configurator
return self
}
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
///
/// - parameters:
@ -229,7 +241,10 @@ public final class NIOTSConnectionBootstrap {
private func connect(
existingNWConnection: NWConnection? = nil,
shouldRegister: Bool,
_ connectAction: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void
_ connectAction: @Sendable @escaping (
NIOTSConnectionChannel,
EventLoopPromise<Void>
) -> Void
) -> EventLoopFuture<Channel> {
let conn: NIOTSConnectionChannel
if let newConnection = existingNWConnection {
@ -237,20 +252,22 @@ public final class NIOTSConnectionBootstrap {
wrapping: newConnection,
on: self.group.next() as! NIOTSEventLoop,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator
)
} else {
conn = NIOTSConnectionChannel(
eventLoop: self.group.next() as! NIOTSEventLoop,
qos: self.qos,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator
)
}
let initializer = self.channelInitializer
let channelOptions = self.channelOptions
return conn.eventLoop.flatSubmit {
return conn.eventLoop.flatSubmit { [connectTimeout] in
channelOptions.applyAllChannelOptions(to: conn).flatMap {
initializer(conn)
}.flatMap {
@ -263,8 +280,8 @@ public final class NIOTSConnectionBootstrap {
}.flatMap {
let connectPromise: EventLoopPromise<Void> = conn.eventLoop.makePromise()
connectAction(conn, connectPromise)
let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout))
let cancelTask = conn.eventLoop.scheduleTask(in: connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(connectTimeout))
conn.close(promise: nil)
}
@ -285,7 +302,8 @@ public final class NIOTSConnectionBootstrap {
/// Per bootstrap, you can only set the `protocolHandlers` once. Typically, `protocolHandlers` are used for the TLS
/// implementation. Most notably, `NIOClientTCPBootstrap`, NIO's "universal bootstrap" abstraction, uses
/// `protocolHandlers` to add the required `ChannelHandler`s for many TLS implementations.
public func protocolHandlers(_ handlers: @escaping () -> [ChannelHandler]) -> Self {
@preconcurrency
public func protocolHandlers(_ handlers: @Sendable @escaping () -> [ChannelHandler]) -> Self {
precondition(self.protocolHandlers == nil, "protocol handlers can only be set once")
self.protocolHandlers = handlers
return self
@ -419,10 +437,10 @@ extension NIOTSConnectionBootstrap {
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private func connect0<ChannelInitializerResult>(
private func connect0<ChannelInitializerResult: Sendable>(
existingNWConnection: NWConnection? = nil,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
registration: @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void
registration: @Sendable @escaping (NIOTSConnectionChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<ChannelInitializerResult> {
let connectionChannel: NIOTSConnectionChannel
if let newConnection = existingNWConnection {
@ -430,30 +448,32 @@ extension NIOTSConnectionBootstrap {
wrapping: newConnection,
on: self.group.next() as! NIOTSEventLoop,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator
)
} else {
connectionChannel = NIOTSConnectionChannel(
eventLoop: self.group.next() as! NIOTSEventLoop,
qos: self.qos,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator
)
}
let channelInitializer = { (channel: Channel) -> EventLoopFuture<ChannelInitializerResult> in
let initializer = self.channelInitializer
return initializer(channel).flatMap { channelInitializer(channel) }
let initializer = self.channelInitializer
let channelInitializer = { @Sendable (channel: Channel) -> EventLoopFuture<ChannelInitializerResult> in
initializer(channel).flatMap { channelInitializer(channel) }
}
let channelOptions = self.channelOptions
return connectionChannel.eventLoop.flatSubmit {
return connectionChannel.eventLoop.flatSubmit { [connectTimeout] in
channelOptions.applyAllChannelOptions(to: connectionChannel).flatMap {
channelInitializer(connectionChannel)
}.flatMap { result -> EventLoopFuture<ChannelInitializerResult> in
let connectPromise: EventLoopPromise<Void> = connectionChannel.eventLoop.makePromise()
registration(connectionChannel, connectPromise)
let cancelTask = connectionChannel.eventLoop.scheduleTask(in: self.connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout))
let cancelTask = connectionChannel.eventLoop.scheduleTask(in: connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(connectTimeout))
connectionChannel.close(promise: nil)
}

View File

@ -164,8 +164,12 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
/// An `EventLoopPromise` that will be succeeded or failed when a connection attempt succeeds or fails.
internal var connectPromise: EventLoopPromise<Void>?
internal let nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
internal var parameters: NWParameters {
NWParameters(tls: self.tlsOptions, tcp: self.tcpOptions)
let parameters = NWParameters(tls: self.tlsOptions, tcp: self.tcpOptions)
self.nwParametersConfigurator?(parameters)
return parameters
}
/// The TCP options for this connection.
@ -242,7 +246,8 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: Int = 1,
maximumReceiveLength: Int = 8192,
tcpOptions: NWProtocolTCP.Options,
tlsOptions: NWProtocolTLS.Options?
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.tsEventLoop = eventLoop
self.closePromise = eventLoop.makePromise()
@ -252,6 +257,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos)
self.tcpOptions = tcpOptions
self.tlsOptions = tlsOptions
self.nwParametersConfigurator = nwParametersConfigurator
// Must come last, as it requires self to be completely initialized.
self._pipeline = ChannelPipeline(channel: self)
@ -266,7 +272,8 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: Int = 1,
maximumReceiveLength: Int = 8192,
tcpOptions: NWProtocolTCP.Options,
tlsOptions: NWProtocolTLS.Options?
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
eventLoop: eventLoop,
@ -275,7 +282,8 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
minimumIncompleteReceiveLength: minimumIncompleteReceiveLength,
maximumReceiveLength: maximumReceiveLength,
tcpOptions: tcpOptions,
tlsOptions: tlsOptions
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator
)
self.connection = connection
}
@ -406,7 +414,7 @@ extension NIOTSConnectionChannel {
// APIs.
var buffer = self.allocator.buffer(capacity: content.count)
buffer.writeBytes(content)
self.pipeline.fireChannelRead(NIOAny(buffer))
self.pipeline.fireChannelRead(buffer)
self.pipeline.fireChannelReadComplete()
}
@ -568,4 +576,7 @@ extension Channel {
}
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSConnectionChannel: @unchecked Sendable {}
#endif

View File

@ -21,7 +21,7 @@ import NIOCore
public protocol NIOTSError: Error, Equatable {}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public enum NIOTSErrors {
public enum NIOTSErrors: Sendable {
/// ``InvalidChannelStateTransition`` is thrown when a channel has been asked to do something
/// that is incompatible with its current channel state: e.g. attempting to register an
/// already registered channel.

View File

@ -13,7 +13,11 @@
//===----------------------------------------------------------------------===//
#if canImport(Network)
import Dispatch
#if swift(<6.1)
@preconcurrency import class Dispatch.DispatchSource
#else
import class Dispatch.DispatchSource
#endif
import Foundation
import Network
@ -28,11 +32,17 @@ import NIOConcurrencyHelpers
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public protocol QoSEventLoop: EventLoop {
/// Submit a given task to be executed by the `EventLoop` at a given `qos`.
func execute(qos: DispatchQoS, _ task: @escaping () -> Void)
@preconcurrency
func execute(qos: DispatchQoS, _ task: @escaping @Sendable () -> Void)
/// Schedule a `task` that is executed by this `NIOTSEventLoop` after the given amount of time at the
/// given `qos`.
func scheduleTask<T>(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled<T>
@preconcurrency
func scheduleTask<T>(
in time: TimeAmount,
qos: DispatchQoS,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T>
}
/// The lifecycle state of a given event loop.
@ -49,8 +59,9 @@ private enum LifecycleState {
case closed
}
// It's okay for NIOTSEventLoop to be unchecked Sendable, since the state is isolated to the EL.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal class NIOTSEventLoop: QoSEventLoop {
internal final class NIOTSEventLoop: QoSEventLoop, @unchecked Sendable {
private let loop: DispatchQueue
private let taskQueue: DispatchQueue
private let inQueueKey: DispatchSpecificKey<UUID>
@ -114,23 +125,27 @@ internal class NIOTSEventLoop: QoSEventLoop {
loop.setSpecific(key: inQueueKey, value: self.loopID)
}
public func execute(_ task: @escaping () -> Void) {
@preconcurrency
public func execute(_ task: @escaping @Sendable () -> Void) {
self.execute(qos: self.defaultQoS, task)
}
public func execute(qos: DispatchQoS, _ task: @escaping () -> Void) {
@preconcurrency
public func execute(qos: DispatchQoS, _ task: @escaping @Sendable () -> Void) {
// Ideally we'd not accept new work while closed. Sadly, that's not possible with the current APIs for this.
self.taskQueue.async(qos: qos, execute: task)
}
public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
@preconcurrency
public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping @Sendable () throws -> T) -> Scheduled<T> {
self.scheduleTask(deadline: deadline, qos: self.defaultQoS, task)
}
@preconcurrency
public func scheduleTask<T>(
deadline: NIODeadline,
qos: DispatchQoS,
_ task: @escaping () throws -> T
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
let p: EventLoopPromise<T> = self.makePromise()
@ -143,11 +158,12 @@ internal class NIOTSEventLoop: QoSEventLoop {
p.fail(EventLoopError.shutdown)
return
}
do {
p.succeed(try task())
} catch {
p.fail(error)
}
p.assumeIsolated().completeWith(
Result {
try task()
}
)
}
timerSource.resume()
@ -165,16 +181,25 @@ internal class NIOTSEventLoop: QoSEventLoop {
)
}
public func scheduleTask<T>(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
@preconcurrency
public func scheduleTask<T>(
in time: TimeAmount,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
self.scheduleTask(in: time, qos: self.defaultQoS, task)
}
public func scheduleTask<T>(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled<T>
{
@preconcurrency
public func scheduleTask<T>(
in time: TimeAmount,
qos: DispatchQoS,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
self.scheduleTask(deadline: NIODeadline.now() + time, qos: qos, task)
}
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
@preconcurrency
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) {
guard self.canBeShutDownIndividually else {
// The loops cannot be shut down by individually. They need to be shut down as a group and
// `NIOTSEventLoopGroup` calls `closeGently` not this method.

View File

@ -82,7 +82,8 @@ public final class NIOTSEventLoopGroup: EventLoopGroup {
}
/// Shuts down all of the event loops, rendering them unable to perform further work.
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
@preconcurrency
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) {
guard self.canBeShutDown else {
queue.async {
callback(EventLoopError.unsupportedOperation)
@ -91,19 +92,19 @@ public final class NIOTSEventLoopGroup: EventLoopGroup {
}
let g = DispatchGroup()
let q = DispatchQueue(label: "nio.transportservices.shutdowngracefullyqueue", target: queue)
var error: Error? = nil
let error: NIOLockedValueBox<Error?> = .init(nil)
for loop in self.eventLoops {
g.enter()
loop.closeGently().recover { err in
q.sync { error = err }
q.sync { error.withLockedValue({ $0 = err }) }
}.whenComplete { (_: Result<Void, Error>) in
g.leave()
}
}
g.notify(queue: q) {
callback(error)
callback(error.withLockedValue({ $0 }))
}
}
@ -145,4 +146,7 @@ public struct NIOTSClientTLSProvider: NIOClientTLSProvider {
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSEventLoopGroup: @unchecked Sendable {}
@available(*, unavailable)
extension NIOTSClientTLSProvider: Sendable {}
#endif

View File

@ -17,7 +17,7 @@ import NIOCore
import Dispatch
import Network
/// A ``NIOTSListenerBootstrap`` is an easy way to bootstrap a `NIOTSListenerChannel` when creating network servers.
/// A ``NIOTSListenerBootstrap`` is an easy way to bootstrap a listener channel when creating network servers.
///
/// Example:
///
@ -46,26 +46,29 @@ import Network
/// try! channel.closeFuture.wait() // wait forever as we never close the Channel
/// ```
///
/// The `EventLoopFuture` returned by `bind` will fire with a `NIOTSListenerChannel`. This is the channel that owns the
/// listening socket. Each time it accepts a new connection it will fire a `NIOTSConnectionChannel` through the
/// `ChannelPipeline` via `fireChannelRead`: as a result, the `NIOTSListenerChannel` operates on `Channel`s as inbound
/// messages. Outbound messages are not supported on a `NIOTSListenerChannel` which means that each write attempt will
/// fail.
/// The `EventLoopFuture` returned by `bind` will fire with a channel. This is the channel that owns the listening socket. Each
/// time it accepts a new connection it will fire a new child channel for the new connection through the `ChannelPipeline` via
/// `fireChannelRead`: as a result, the listening channel operates on `Channel`s as inbound messages. Outbound messages are
/// not supported on these listening channels, which means that each write attempt will fail.
///
/// Accepted `NIOTSConnectionChannel`s operate on `ByteBuffer` as inbound data, and `IOData` as outbound data.
/// Accepted channels operate on `ByteBuffer` as inbound data, and `IOData` as outbound data.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSListenerBootstrap {
private let group: EventLoopGroup
private let childGroup: EventLoopGroup
private var serverChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var childChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var serverChannelInit: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private var childChannelInit: (@Sendable (Channel) -> EventLoopFuture<Void>)?
private var serverChannelOptions = ChannelOptions.Storage()
private var childChannelOptions = ChannelOptions.Storage()
private var serverQoS: DispatchQoS?
private var childQoS: DispatchQoS?
private var tcpOptions: NWProtocolTCP.Options = .init()
private var childTCPOptions: NWProtocolTCP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
private var childTLSOptions: NWProtocolTLS.Options?
private var bindTimeout: TimeAmount?
private var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
private var childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// Create a ``NIOTSListenerBootstrap`` for the `EventLoopGroup` `group`.
///
@ -78,7 +81,7 @@ public final class NIOTSListenerBootstrap {
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `NIOTSListenerChannel`.
/// - group: The `EventLoopGroup` to use for the listening channel.
public convenience init(group: EventLoopGroup) {
self.init(group: group, childGroup: group)
}
@ -86,7 +89,7 @@ public final class NIOTSListenerBootstrap {
/// Create a ``NIOTSListenerBootstrap`` for the ``NIOTSEventLoopGroup`` `group`.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `NIOTSListenerChannel`.
/// - group: The ``NIOTSEventLoopGroup`` to use for the listening channel.
public convenience init(group: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup)
}
@ -102,9 +105,9 @@ public final class NIOTSListenerBootstrap {
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The `EventLoopGroup` to use for the `bind` of the listening channel
/// and to accept new child channels with.
/// - childGroup: The `EventLoopGroup` to run the accepted child channels on.
public convenience init(group: EventLoopGroup, childGroup: EventLoopGroup) {
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
preconditionFailure(
@ -121,9 +124,9 @@ public final class NIOTSListenerBootstrap {
/// validating that the `EventLoopGroup`s are compatible with ``NIOTSListenerBootstrap``.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The `EventLoopGroup` to use for the `bind` of the listening channel
/// and to accept new child channels with.
/// - childGroup: The `EventLoopGroup` to run the accepted child channels on.
public init?(validatingGroup group: EventLoopGroup, childGroup: EventLoopGroup? = nil) {
let childGroup = childGroup ?? group
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
@ -140,29 +143,31 @@ public final class NIOTSListenerBootstrap {
/// Create a ``NIOTSListenerBootstrap``.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The ``NIOTSEventLoopGroup`` to run the accepted `NIOTSConnectionChannel`s on.
/// - group: The ``NIOTSEventLoopGroup`` to use for the `bind` of the listening channel
/// and to accept new child channels with.
/// - childGroup: The ``NIOTSEventLoopGroup`` to run the accepted child channels on.
public convenience init(group: NIOTSEventLoopGroup, childGroup: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup, childGroup: childGroup as EventLoopGroup)
}
/// Initialize the `NIOTSListenerChannel` with `initializer`. The most common task in initializer is to add
/// Initialize the listening channel with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The `NIOTSListenerChannel` uses the accepted `NIOTSConnectionChannel`s as inbound messages.
/// The listening channel uses the accepted child channels as inbound messages.
///
/// > Note: To set the initializer for the accepted `NIOTSConnectionChannel`s, look at
/// > Note: To set the initializer for the accepted child channels, look at
/// ``childChannelInitializer(_:)``.
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func serverChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func serverChannelInitializer(_ initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>) -> Self
{
self.serverChannelInit = initializer
return self
}
/// Initialize the accepted `NIOTSConnectionChannel`s with `initializer`. The most common task in initializer is to add
/// Initialize the accepted child channels with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`. Note that if the `initializer` fails then the error will be
/// fired in the *parent* channel.
///
@ -170,14 +175,15 @@ public final class NIOTSListenerBootstrap {
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func childChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
@preconcurrency
public func childChannelInitializer(_ initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>) -> Self {
self.childChannelInit = initializer
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSListenerChannel`.
/// Specifies a `ChannelOption` to be applied to the listening channel.
///
/// > Note: To specify options for the accepted `NIOTSConnectionChannel`s, look at ``childChannelOption(_:value:)``.
/// > Note: To specify options for the accepted child channels, look at ``childChannelOption(_:value:)``.
///
/// - parameters:
/// - option: The option to be applied.
@ -187,7 +193,7 @@ public final class NIOTSListenerBootstrap {
return self
}
/// Specifies a `ChannelOption` to be applied to the accepted `NIOTSConnectionChannel`s.
/// Specifies a `ChannelOption` to be applied to the accepted child channels.
///
/// - parameters:
/// - option: The option to be applied.
@ -224,18 +230,46 @@ public final class NIOTSListenerBootstrap {
return self
}
/// Specifies the TCP options to use on the child `Channel`s.
/// Specifies the TCP options to use on the listener.
public func tcpOptions(_ options: NWProtocolTCP.Options) -> Self {
self.tcpOptions = options
return self
}
/// Specifies the TLS options to use on the child `Channel`s.
/// Specifies the TCP options to use on the child `Channel`s.
public func childTCPOptions(_ options: NWProtocolTCP.Options) -> Self {
self.childTCPOptions = options
return self
}
/// Specifies the TLS options to use on the listener.
public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self {
self.tlsOptions = options
return self
}
/// Specifies the TLS options to use on the child `Channel`s.
public func childTLSOptions(_ options: NWProtocolTLS.Options) -> Self {
self.childTLSOptions = options
return self
}
/// Customise the `NWParameters` to be used when creating the `NWConnection` for the listener.
public func configureNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.nwParametersConfigurator = configurator
return self
}
/// Customise the `NWParameters` to be used when creating the `NWConnection`s for the child `Channel`s.
public func configureChildNWParameters(
_ configurator: @Sendable @escaping (NWParameters) -> Void
) -> Self {
self.childNWParametersConfigurator = configurator
return self
}
/// Specifies a type of Multipath service to use for this listener, instead of the default
/// service type for the event loop.
///
@ -246,7 +280,7 @@ public final class NIOTSListenerBootstrap {
self.serverChannelOption(NIOTSChannelOptions.multipathServiceType, value: type)
}
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
/// Bind the listening channel to `host` and `port`.
///
/// - parameters:
/// - host: The host to bind on.
@ -270,7 +304,7 @@ public final class NIOTSListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to `address`.
/// Bind the listening channel to `address`.
///
/// - parameters:
/// - address: The `SocketAddress` to bind on.
@ -280,7 +314,7 @@ public final class NIOTSListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to a UNIX Domain Socket.
/// Bind the listening channel to a UNIX Domain Socket.
///
/// - parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system.
@ -295,7 +329,7 @@ public final class NIOTSListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
/// Bind the listening channel to a given `NWEndpoint`.
///
/// - parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
@ -305,7 +339,7 @@ public final class NIOTSListenerBootstrap {
}
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
/// Bind the listening channel to an existing `NWListener`.
///
/// - parameters:
/// - listener: The NWListener to wrap.
@ -318,10 +352,10 @@ public final class NIOTSListenerBootstrap {
private func bind0(
existingNWListener: NWListener? = nil,
shouldRegister: Bool,
_ binder: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void
_ binder: @escaping @Sendable (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next() as! NIOTSEventLoop
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
let serverChannelInit = self.serverChannelInit ?? { @Sendable _ in eventLoop.makeSucceededFuture(()) }
let childChannelInit = self.childChannelInit
let serverChannelOptions = self.serverChannelOptions
let childChannelOptions = self.childChannelOptions
@ -334,10 +368,12 @@ public final class NIOTSListenerBootstrap {
qos: self.serverQoS,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childTCPOptions: self.tcpOptions,
childTLSOptions: self.tlsOptions
childTCPOptions: self.childTCPOptions,
childTLSOptions: self.childTLSOptions,
childNWParametersConfigurator: self.childNWParametersConfigurator
)
} else {
serverChannel = NIOTSListenerChannel(
@ -345,24 +381,28 @@ public final class NIOTSListenerBootstrap {
qos: self.serverQoS,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childTCPOptions: self.tcpOptions,
childTLSOptions: self.tlsOptions
childTCPOptions: self.childTCPOptions,
childTLSOptions: self.childTLSOptions,
childNWParametersConfigurator: self.childNWParametersConfigurator
)
}
return eventLoop.submit {
return eventLoop.submit { [bindTimeout] in
serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
serverChannelInit(serverChannel)
}.flatMap {
eventLoop.assertInEventLoop()
return serverChannel.pipeline.addHandler(
AcceptHandler<NIOTSConnectionChannel>(
childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions
return eventLoop.makeCompletedFuture {
try serverChannel.pipeline.syncOperations.addHandler(
AcceptHandler<NIOTSConnectionChannel>(
childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions
)
)
)
}
}.flatMap {
if shouldRegister {
return serverChannel.register()
@ -373,7 +413,7 @@ public final class NIOTSListenerBootstrap {
let bindPromise = eventLoop.makePromise(of: Void.self)
binder(serverChannel, bindPromise)
if let bindTimeout = self.bindTimeout {
if let bindTimeout = bindTimeout {
let cancelTask = eventLoop.scheduleTask(in: bindTimeout) {
bindPromise.fail(NIOTSErrors.BindTimeout(timeout: bindTimeout))
serverChannel.close(promise: nil)
@ -400,7 +440,7 @@ public final class NIOTSListenerBootstrap {
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOTSListenerBootstrap {
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
/// Bind the listening channel to `host` and `port`.
///
/// - Parameters:
/// - host: The host to bind on.
@ -445,7 +485,7 @@ extension NIOTSListenerBootstrap {
).get()
}
/// Bind the `NIOTSListenerChannel` to `address`.
/// Bind the listening channel to `address`.
///
/// - Parameters:
/// - address: The `SocketAddress` to bind on.
@ -475,7 +515,7 @@ extension NIOTSListenerBootstrap {
).get()
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
/// Bind the listening channel to a given `NWEndpoint`.
///
/// - Parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
@ -508,7 +548,7 @@ extension NIOTSListenerBootstrap {
).get()
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
/// Bind the listening channel to an existing `NWListener`.
///
/// - Parameters:
/// - listener: The NWListener to wrap.
@ -537,7 +577,7 @@ extension NIOTSListenerBootstrap {
existingNWListener: NWListener? = nil,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
registration: @escaping (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void
registration: @escaping @Sendable (NIOTSListenerChannel, EventLoopPromise<Void>) -> Void
) -> EventLoopFuture<NIOAsyncChannel<ChannelInitializerResult, Never>> {
let eventLoop = self.group.next() as! NIOTSEventLoop
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
@ -553,10 +593,12 @@ extension NIOTSListenerBootstrap {
qos: self.serverQoS,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childTCPOptions: self.tcpOptions,
childTLSOptions: self.tlsOptions
childTLSOptions: self.tlsOptions,
childNWParametersConfigurator: self.nwParametersConfigurator
)
} else {
serverChannel = NIOTSListenerChannel(
@ -564,14 +606,16 @@ extension NIOTSListenerBootstrap {
qos: self.serverQoS,
tcpOptions: self.tcpOptions,
tlsOptions: self.tlsOptions,
nwParametersConfigurator: self.nwParametersConfigurator,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childTCPOptions: self.tcpOptions,
childTLSOptions: self.tlsOptions
childTLSOptions: self.tlsOptions,
childNWParametersConfigurator: self.nwParametersConfigurator
)
}
return eventLoop.submit {
return eventLoop.submit { [bindTimeout] in
serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
serverChannelInit(serverChannel)
}.flatMap { (_) -> EventLoopFuture<NIOAsyncChannel<ChannelInitializerResult, Never>> in
@ -585,7 +629,7 @@ extension NIOTSListenerBootstrap {
)
let asyncChannel = try NIOAsyncChannel<ChannelInitializerResult, Never>
._wrapAsyncChannelWithTransformations(
synchronouslyWrapping: serverChannel,
wrappingChannelSynchronously: serverChannel,
backPressureStrategy: serverBackPressureStrategy,
channelReadTransformation: { channel -> EventLoopFuture<(ChannelInitializerResult)> in
// The channelReadTransformation is run on the EL of the server channel
@ -600,7 +644,7 @@ extension NIOTSListenerBootstrap {
let bindPromise = eventLoop.makePromise(of: Void.self)
registration(serverChannel, bindPromise)
if let bindTimeout = self.bindTimeout {
if let bindTimeout = bindTimeout {
let cancelTask = eventLoop.scheduleTask(in: bindTimeout) {
bindPromise.fail(NIOTSErrors.BindTimeout(timeout: bindTimeout))
serverChannel.close(promise: nil)
@ -627,4 +671,6 @@ extension NIOTSListenerBootstrap {
}
}
@available(*, unavailable)
extension NIOTSListenerBootstrap: Sendable {}
#endif

View File

@ -81,19 +81,23 @@ internal final class NIOTSListenerChannel: StateManagedListenerChannel<NIOTSConn
qos: DispatchQoS? = nil,
tcpOptions: NWProtocolTCP.Options,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childTCPOptions: NWProtocolTCP.Options,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
eventLoop: eventLoop,
protocolOptions: .tcp(tcpOptions),
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .tcp(childTCPOptions),
childTLSOptions: childTLSOptions
childTLSOptions: childTLSOptions,
childNWParametersConfigurator: childNWParametersConfigurator
)
}
@ -104,10 +108,12 @@ internal final class NIOTSListenerChannel: StateManagedListenerChannel<NIOTSConn
qos: DispatchQoS? = nil,
tcpOptions: NWProtocolTCP.Options,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childTCPOptions: NWProtocolTCP.Options,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
wrapping: listener,
@ -115,10 +121,12 @@ internal final class NIOTSListenerChannel: StateManagedListenerChannel<NIOTSConn
qos: qos,
protocolOptions: .tcp(tcpOptions),
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .tcp(childTCPOptions),
childTLSOptions: childTLSOptions
childTLSOptions: childTLSOptions,
childNWParametersConfigurator: childNWParametersConfigurator
)
}
@ -134,10 +142,11 @@ internal final class NIOTSListenerChannel: StateManagedListenerChannel<NIOTSConn
parent: self,
qos: self.childChannelQoS,
tcpOptions: self.childTCPOptions,
tlsOptions: self.childTLSOptions
tlsOptions: self.childTLSOptions,
nwParametersConfigurator: self.childNWParametersConfigurator
)
self.pipeline.fireChannelRead(NIOAny(newChannel))
self.pipeline.fireChannelRead(newChannel)
self.pipeline.fireChannelReadComplete()
}

View File

@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//
#if canImport(Network)
@preconcurrency import Network
import Network
import NIOCore
/// A tag protocol that can be used to cover all network events emitted by `NIOTransportServices`.
@ -23,7 +23,7 @@ import NIOCore
public protocol NIOTSNetworkEvent: Equatable, _NIOPreconcurrencySendable {}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public enum NIOTSNetworkEvents {
public enum NIOTSNetworkEvents: Sendable {
/// ``BetterPathAvailable`` is fired whenever the OS has informed NIO that there is a better
/// path available to the endpoint that this `Channel` is currently connected to,
/// e.g. the current connection is using an expensive cellular connection and

View File

@ -67,6 +67,9 @@ internal class StateManagedListenerChannel<ChildChannel: StateManagedChannel>: S
/// The TLS options for this listener.
internal let tlsOptions: NWProtocolTLS.Options?
/// A customization point for this listener's `NWParameters`.
internal let nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// The `DispatchQueue` that socket events for this connection will be dispatched onto.
internal let connectionQueue: DispatchQueue
@ -113,6 +116,9 @@ internal class StateManagedListenerChannel<ChildChannel: StateManagedChannel>: S
/// The TLS options to use for child channels.
internal let childTLSOptions: NWProtocolTLS.Options?
/// A customization point for each child's `NWParameters`.
internal let childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
/// The cache of the local and remote socket addresses. Must be accessed using _addressCacheLock.
internal var addressCache = AddressCache(local: nil, remote: nil)
@ -130,20 +136,24 @@ internal class StateManagedListenerChannel<ChildChannel: StateManagedChannel>: S
qos: DispatchQoS? = nil,
protocolOptions: ProtocolOptions,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childProtocolOptions: ProtocolOptions,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.tsEventLoop = eventLoop
self.closePromise = eventLoop.makePromise()
self.connectionQueue = eventLoop.channelQueue(label: "nio.transportservices.listenerchannel", qos: qos)
self.protocolOptions = protocolOptions
self.tlsOptions = tlsOptions
self.nwParametersConfigurator = nwParametersConfigurator
self.childLoopGroup = childLoopGroup
self.childChannelQoS = childChannelQoS
self.childProtocolOptions = childProtocolOptions
self.childTLSOptions = childTLSOptions
self.childNWParametersConfigurator = childNWParametersConfigurator
// Must come last, as it requires self to be completely initialized.
self._pipeline = ChannelPipeline(channel: self)
@ -155,20 +165,24 @@ internal class StateManagedListenerChannel<ChildChannel: StateManagedChannel>: S
qos: DispatchQoS? = nil,
protocolOptions: ProtocolOptions,
tlsOptions: NWProtocolTLS.Options?,
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childProtocolOptions: ProtocolOptions,
childTLSOptions: NWProtocolTLS.Options?
childTLSOptions: NWProtocolTLS.Options?,
childNWParametersConfigurator: (@Sendable (NWParameters) -> Void)?
) {
self.init(
eventLoop: eventLoop,
qos: qos,
protocolOptions: protocolOptions,
tlsOptions: tlsOptions,
nwParametersConfigurator: nwParametersConfigurator,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: childProtocolOptions,
childTLSOptions: childTLSOptions
childTLSOptions: childTLSOptions,
childNWParametersConfigurator: childNWParametersConfigurator
)
self.nwListener = listener
}
@ -398,6 +412,8 @@ extension StateManagedListenerChannel {
parameters.multipathServiceType = self.multipathServiceType
self.nwParametersConfigurator?(parameters)
let listener: NWListener
do {
listener = try NWListener(using: parameters)
@ -533,4 +549,9 @@ extension StateManagedListenerChannel {
}
}
// We inherit from StateManagedListenerChannel in NIOTSDatagramListenerChannel, so we can't mark
// it as Sendable safely.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension StateManagedListenerChannel: @unchecked Sendable {}
#endif

View File

@ -83,6 +83,8 @@ internal protocol StateManagedNWConnectionChannel: StateManagedChannel where Act
var multipathServiceType: NWParameters.MultipathServiceType { get }
var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? { get }
func setChannelSpecificOption0<Option: ChannelOption>(option: Option, value: Option.Value) throws
func getChannelSpecificOption0<Option: ChannelOption>(option: Option) throws -> Option.Value
@ -182,7 +184,7 @@ extension StateManagedNWConnectionChannel {
preconditionFailure("nwconnection cannot be nil while channel is active")
}
func completionCallback(promise: EventLoopPromise<Void>?, sentBytes: Int) -> ((NWError?) -> Void) {
func completionCallback(promise: EventLoopPromise<Void>?, sentBytes: Int) -> (@Sendable (NWError?) -> Void) {
{ error in
if let error = error {
promise?.fail(error)
@ -242,6 +244,7 @@ extension StateManagedNWConnectionChannel {
connection.betterPathUpdateHandler = self.betterPathHandler
connection.viabilityUpdateHandler = self.viabilityUpdateHandler
connection.pathUpdateHandler = self.pathChangedHandler(newPath:)
self.nwParametersConfigurator?(connection.parameters)
connection.start(queue: self.connectionQueue)
}
@ -314,7 +317,7 @@ extension StateManagedNWConnectionChannel {
return
}
func completionCallback(for promise: EventLoopPromise<Void>?) -> ((NWError?) -> Void) {
func completionCallback(for promise: EventLoopPromise<Void>?) -> (@Sendable (NWError?) -> Void) {
{ error in
if let error = error {
promise?.fail(error)
@ -432,7 +435,7 @@ extension StateManagedNWConnectionChannel {
// APIs.
var buffer = self.allocator.buffer(capacity: content.count)
buffer.writeBytes(content)
self.pipeline.fireChannelRead(NIOAny(buffer))
self.pipeline.fireChannelRead(buffer)
self.pipeline.fireChannelReadComplete()
}

View File

@ -42,7 +42,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
func testEmptyWritePromise() {
let emptyWrite = self.allocator.buffer(capacity: 0)
let emptyWritePromise = self.eventLoop.makePromise(of: Void.self)
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
self.channel.write(emptyWrite, promise: emptyWritePromise)
self.channel.flush()
XCTAssertNoThrow(
try emptyWritePromise.futureResult.wait()
@ -53,7 +53,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
}
func testEmptyWritesNoWriteThrough() {
class OutboundTestHandler: ChannelOutboundHandler {
final class OutboundTestHandler: ChannelOutboundHandler, Sendable {
typealias OutboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer
@ -75,9 +75,9 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
let emptyWrite = self.allocator.buffer(capacity: 0)
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
let thenEmptyWritePromise = self.eventLoop.makePromise(of: Void.self)
self.channel.write(NIOAny(emptyWrite), promise: nil)
self.channel.write(emptyWrite, promise: nil)
self.channel.write(
NIOAny(thenEmptyWrite),
thenEmptyWrite,
promise: thenEmptyWritePromise
)
self.channel.flush()
@ -98,20 +98,20 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
case thenEmptyWrite
}
var checkOrder = CheckOrder.noWrite
someWritePromise.futureResult.whenSuccess {
someWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .noWrite)
checkOrder = .someWrite
}
thenEmptyWritePromise.futureResult.whenSuccess {
thenEmptyWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .someWrite)
checkOrder = .thenEmptyWrite
}
self.channel.write(
NIOAny(someWrite),
someWrite,
promise: someWritePromise
)
self.channel.write(
NIOAny(thenEmptyWrite),
thenEmptyWrite,
promise: thenEmptyWritePromise
)
self.channel.flush()
@ -136,20 +136,20 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
case thenEmptyWrite
}
var checkOrder = CheckOrder.noWrite
emptyWritePromise.futureResult.whenSuccess {
emptyWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .noWrite)
checkOrder = .emptyWrite
}
thenEmptyWritePromise.futureResult.whenSuccess {
thenEmptyWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .emptyWrite)
checkOrder = .thenEmptyWrite
}
self.channel.write(
NIOAny(emptyWrite),
emptyWrite,
promise: emptyWritePromise
)
self.channel.write(
NIOAny(thenEmptyWrite),
thenEmptyWrite,
promise: thenEmptyWritePromise
)
self.channel.flush()
@ -174,21 +174,21 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
case thenEmptyWrite
}
var checkOrder = CheckOrder.noWrite
emptyWritePromise.futureResult.whenSuccess {
emptyWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .noWrite)
checkOrder = .emptyWrite
}
thenSomeWritePromise.futureResult.whenSuccess {
thenSomeWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .emptyWrite)
checkOrder = .thenSomeWrite
}
thenEmptyWritePromise.futureResult.whenSuccess {
thenEmptyWritePromise.futureResult.assumeIsolated().whenSuccess {
XCTAssertEqual(checkOrder, .thenSomeWrite)
checkOrder = .thenEmptyWrite
}
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
self.channel.write(NIOAny(thenEmptyWrite), promise: thenEmptyWritePromise)
self.channel.write(emptyWrite, promise: emptyWritePromise)
self.channel.write(thenSomeWrite, promise: thenSomeWritePromise)
self.channel.write(thenEmptyWrite, promise: thenEmptyWritePromise)
self.channel.flush()
XCTAssertNoThrow(try thenEmptyWritePromise.futureResult.wait())
XCTAssertNoThrow(
@ -205,9 +205,9 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
let thenSomeWrite = self.allocator.bufferFor(string: "then some")
let thenSomeWritePromise = self.eventLoop.makePromise(of: Void.self)
self.channel.write(NIOAny(someWrite), promise: nil)
self.channel.write(NIOAny(thenEmptyWrite), promise: nil)
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
self.channel.write(someWrite, promise: nil)
self.channel.write(thenEmptyWrite, promise: nil)
self.channel.write(thenSomeWrite, promise: thenSomeWritePromise)
self.channel.flush()
XCTAssertNoThrow(try thenSomeWritePromise.futureResult.wait())
var someWriteOutput: ByteBuffer?
@ -228,7 +228,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
func testSomeWriteAndFlushThenSomeWriteAndFlush() {
let someWrite = self.allocator.bufferFor(string: "non empty")
var someWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
self.channel.write(NIOAny(someWrite), promise: someWritePromise)
self.channel.write(someWrite, promise: someWritePromise)
self.channel.flush()
XCTAssertNoThrow(
try someWritePromise.futureResult.wait()
@ -239,7 +239,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
someWritePromise = nil
let thenSomeWrite = self.allocator.bufferFor(string: "then some")
var thenSomeWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
self.channel.write(thenSomeWrite, promise: thenSomeWritePromise)
self.channel.flush()
XCTAssertNoThrow(
try thenSomeWritePromise.futureResult.wait()
@ -253,7 +253,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
func testEmptyWriteAndFlushThenEmptyWriteAndFlush() {
let emptyWrite = self.allocator.buffer(capacity: 0)
var emptyWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
self.channel.write(emptyWrite, promise: emptyWritePromise)
self.channel.flush()
XCTAssertNoThrow(
try emptyWritePromise.futureResult.wait()
@ -264,7 +264,7 @@ class NIOFilterEmptyWritesHandlerTests: XCTestCase {
emptyWritePromise = nil
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
var thenEmptyWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
self.channel.write(NIOAny(thenEmptyWrite), promise: thenEmptyWritePromise)
self.channel.write(thenEmptyWrite, promise: thenEmptyWritePromise)
self.channel.flush()
XCTAssertNoThrow(
try thenEmptyWritePromise.futureResult.wait()

View File

@ -85,12 +85,12 @@ private final class TLSUserEventHandler: ChannelInboundHandler, RemovableChannel
let alpn = String(string.dropFirst(15))
context.writeAndFlush(.init(ByteBuffer(string: "alpn:\(alpn)")), promise: nil)
context.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: alpn))
context.pipeline.removeHandler(self, promise: nil)
context.pipeline.syncOperations.removeHandler(self, promise: nil)
} else if string.hasPrefix("alpn:") {
context.fireUserInboundEventTriggered(
TLSUserEvent.handshakeCompleted(negotiatedProtocol: String(string.dropFirst(5)))
)
context.pipeline.removeHandler(self, promise: nil)
context.pipeline.syncOperations.removeHandler(self, promise: nil)
} else {
context.fireChannelRead(data)
}
@ -182,7 +182,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
func testServerClientBootstrap_withAsyncChannel_andHostPort() async throws {
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
defer {
try! eventLoopGroup.syncShutdownGracefully()
Task {
try! await eventLoopGroup.shutdownGracefully()
}
}
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
@ -240,7 +242,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
func testAsyncChannelProtocolNegotiation() async throws {
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
defer {
try! eventLoopGroup.syncShutdownGracefully()
Task {
try! await eventLoopGroup.shutdownGracefully()
}
}
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
@ -251,7 +255,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
try Self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -323,7 +327,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
func testAsyncChannelNestedProtocolNegotiation() async throws {
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
defer {
try! eventLoopGroup.syncShutdownGracefully()
Task {
try! await eventLoopGroup.shutdownGracefully()
}
}
let channel = try await NIOTSListenerBootstrap(group: eventLoopGroup)
@ -334,7 +340,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureNestedProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
try Self.configureNestedProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -459,7 +465,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
let eventLoopGroup = NIOTSEventLoopGroup(loopCount: 3)
defer {
try! eventLoopGroup.syncShutdownGracefully()
Task {
try! await eventLoopGroup.shutdownGracefully()
}
}
let channels = NIOLockedValueBox<[Channel]>([Channel]())
@ -478,7 +486,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: 0
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
try Self.configureProtocolNegotiationHandlers(channel: channel).protocolNegotiationResult
}
}
@ -585,7 +593,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
to: .init(ipAddress: "127.0.0.1", port: port)
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN)
try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN)
.protocolNegotiationResult
}
}
@ -602,7 +610,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
to: .init(ipAddress: "127.0.0.1", port: port)
) { channel in
channel.eventLoop.makeCompletedFuture {
try self.configureNestedProtocolNegotiationHandlers(
try Self.configureNestedProtocolNegotiationHandlers(
channel: channel,
proposedOuterALPN: proposedOuterALPN,
proposedInnerALPN: proposedInnerALPN
@ -612,18 +620,18 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
@discardableResult
private func configureProtocolNegotiationHandlers(
private static func configureProtocolNegotiationHandlers(
channel: Channel,
proposedALPN: TLSUserEventHandler.ALPN? = nil
) throws -> NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(LineDelimiterCoder()))
try channel.pipeline.syncOperations.addHandler(TLSUserEventHandler(proposedALPN: proposedALPN))
return try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
return try Self.addTypedApplicationProtocolNegotiationHandler(to: channel)
}
@discardableResult
private func configureNestedProtocolNegotiationHandlers(
private static func configureNestedProtocolNegotiationHandlers(
channel: Channel,
proposedOuterALPN: TLSUserEventHandler.ALPN? = nil,
proposedInnerALPN: TLSUserEventHandler.ALPN? = nil
@ -642,7 +650,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
try channel.pipeline.syncOperations.addHandler(
TLSUserEventHandler(proposedALPN: proposedInnerALPN)
)
let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
let negotiationFuture = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel)
return negotiationFuture.protocolNegotiationResult
}
@ -651,7 +659,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
try channel.pipeline.syncOperations.addHandler(
TLSUserEventHandler(proposedALPN: proposedInnerALPN)
)
let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel)
let negotiationHandler = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel)
return negotiationHandler.protocolNegotiationResult
}
@ -667,7 +675,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
@discardableResult
private func addTypedApplicationProtocolNegotiationHandler(
private static func addTypedApplicationProtocolNegotiationHandler(
to channel: Channel
) throws -> NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {
let negotiationHandler = NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult> {

View File

@ -24,58 +24,41 @@ import Foundation
@available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6, *)
final class NIOTSBootstrapTests: XCTestCase {
var groupBag: [NIOTSEventLoopGroup]? = nil // protected by `self.lock`
let lock = NIOLock()
override func setUp() {
self.lock.withLock {
XCTAssertNil(self.groupBag)
self.groupBag = []
}
}
override func tearDown() {
XCTAssertNoThrow(
try self.lock.withLock {
guard let groupBag = self.groupBag else {
XCTFail()
return
}
for group in groupBag {
func testBootstrapsTolerateFuturesFromDifferentEventLoopsReturnedInInitializers() throws {
let groupBag: NIOLockedValueBox<[NIOTSEventLoopGroup]> = .init([])
defer {
try! groupBag.withLockedValue {
for group in $0 {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
self.groupBag = nil
}
)
}
func freshEventLoop() -> EventLoop {
let group: NIOTSEventLoopGroup = .init(loopCount: 1, defaultQoS: .default)
self.lock.withLock {
self.groupBag!.append(group)
}
return group.next()
}
func testBootstrapsTolerateFuturesFromDifferentEventLoopsReturnedInInitializers() throws {
let childChannelDone = self.freshEventLoop().makePromise(of: Void.self)
let serverChannelDone = self.freshEventLoop().makePromise(of: Void.self)
@Sendable func freshEventLoop() -> EventLoop {
let group: NIOTSEventLoopGroup = .init(loopCount: 1, defaultQoS: .default)
groupBag.withLockedValue {
$0.append(group)
}
return group.next()
}
let childChannelDone = freshEventLoop().makePromise(of: Void.self)
let serverChannelDone = freshEventLoop().makePromise(of: Void.self)
let serverChannel = try assertNoThrowWithValue(
NIOTSListenerBootstrap(group: self.freshEventLoop())
NIOTSListenerBootstrap(group: freshEventLoop())
.childChannelInitializer { channel in
channel.eventLoop.preconditionInEventLoop()
defer {
childChannelDone.succeed(())
}
return self.freshEventLoop().makeSucceededFuture(())
return freshEventLoop().makeSucceededFuture(())
}
.serverChannelInitializer { channel in
channel.eventLoop.preconditionInEventLoop()
defer {
serverChannelDone.succeed(())
}
return self.freshEventLoop().makeSucceededFuture(())
return freshEventLoop().makeSucceededFuture(())
}
.bind(host: "127.0.0.1", port: 0)
.wait()
@ -85,10 +68,10 @@ final class NIOTSBootstrapTests: XCTestCase {
}
let client = try assertNoThrowWithValue(
NIOTSConnectionBootstrap(group: self.freshEventLoop())
NIOTSConnectionBootstrap(group: freshEventLoop())
.channelInitializer { channel in
channel.eventLoop.preconditionInEventLoop()
return self.freshEventLoop().makeSucceededFuture(())
return freshEventLoop().makeSucceededFuture(())
}
.connect(to: serverChannel.localAddress!)
.wait()
@ -140,7 +123,9 @@ final class NIOTSBootstrapTests: XCTestCase {
return try NIOTSListenerBootstrap(group: group)
.childChannelInitializer { channel in
XCTAssertEqual(0, numberOfConnections.loadThenWrappingIncrement(ordering: .relaxed))
return channel.pipeline.addHandler(TellMeIfConnectionIsTLSHandler(isTLS: isTLS))
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(TellMeIfConnectionIsTLSHandler(isTLS: isTLS))
}
}
.bind(host: "127.0.0.1", port: 0)
.wait()
@ -175,8 +160,7 @@ final class NIOTSBootstrapTests: XCTestCase {
)
.enableTLS()
var buffer = server1.allocator.buffer(capacity: 2)
buffer.writeString("NO")
let buffer = server1.allocator.buffer(string: "NO")
var maybeClient1: Channel? = nil
XCTAssertNoThrow(maybeClient1 = try bootstrap.connect(to: server1.localAddress!).wait())
@ -387,6 +371,49 @@ final class NIOTSBootstrapTests: XCTestCase {
XCTAssertEqual(try listenerChannel.getOption(NIOTSChannelOptions.multipathServiceType).wait(), .handover)
XCTAssertEqual(try connectionChannel.getOption(NIOTSChannelOptions.multipathServiceType).wait(), .handover)
}
func testNWParametersConfigurator() async throws {
try await withEventLoopGroup { group in
let configuratorServerListenerCounter = NIOLockedValueBox(0)
let configuratorServerConnectionCounter = NIOLockedValueBox(0)
let configuratorClientConnectionCounter = NIOLockedValueBox(0)
let waitForConnectionHandler = WaitForConnectionHandler(
connectionPromise: group.next().makePromise()
)
let listenerChannel = try await NIOTSListenerBootstrap(group: group)
.childChannelInitializer { connectionChannel in
connectionChannel.eventLoop.makeCompletedFuture {
try connectionChannel.pipeline.syncOperations.addHandler(waitForConnectionHandler)
}
}
.configureNWParameters { _ in
configuratorServerListenerCounter.withLockedValue { $0 += 1 }
}
.configureChildNWParameters { _ in
configuratorServerConnectionCounter.withLockedValue { $0 += 1 }
}
.bind(host: "localhost", port: 0)
.get()
let connectionChannel: Channel = try await NIOTSConnectionBootstrap(group: group)
.configureNWParameters { _ in
configuratorClientConnectionCounter.withLockedValue { $0 += 1 }
}
.connect(to: listenerChannel.localAddress!)
.get()
// Wait for the server to activate the connection channel to the client.
try await waitForConnectionHandler.connectionPromise.futureResult.get()
try await listenerChannel.close().get()
try await connectionChannel.close().get()
XCTAssertEqual(1, configuratorServerListenerCounter.withLockedValue { $0 })
XCTAssertEqual(1, configuratorServerConnectionCounter.withLockedValue { $0 })
XCTAssertEqual(1, configuratorClientConnectionCounter.withLockedValue { $0 })
}
}
}
extension Channel {

View File

@ -37,13 +37,14 @@ final class NIOTSChannelMetadataTests: XCTestCase {
}.wait()
}
func testThowsIfCalledOnANonInitializedChannel() {
func testThrowsIfCalledOnANonInitializedChannel() {
let eventLoopGroup = NIOTSEventLoopGroup()
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let channel = NIOTSConnectionChannel(
eventLoop: eventLoopGroup.next() as! NIOTSEventLoop,
tcpOptions: .init(),
tlsOptions: .init()
tlsOptions: .init(),
nwParametersConfigurator: nil
)
XCTAssertThrowsError(try channel.getMetadata(definition: NWProtocolTLS.definition).wait()) { error in
XCTAssertTrue(error is NIOTSConnectionNotInitialized, "unexpected error \(error)")

View File

@ -19,6 +19,7 @@ import NIOCore
import NIOFoundationCompat
import NIOTransportServices
import Foundation
import NIOConcurrencyHelpers
@available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6, *)
final class ConnectRecordingHandler: ChannelOutboundHandler {
@ -73,13 +74,14 @@ final class DisableWaitingAfterConnect: ChannelOutboundHandler {
typealias OutboundOut = Any
func connect(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
do {
try context.channel.syncOptions?.setOption(NIOTSChannelOptions.waitForActivity, value: false)
} catch {
promise?.fail(error)
return
}
let f = context.channel.setOption(NIOTSChannelOptions.waitForActivity, value: false).flatMap {
context.connect(to: address)
}
if let promise = promise {
f.cascade(to: promise)
}
context.connect(to: address).cascade(to: promise)
}
}
@ -112,7 +114,7 @@ final class PromiseOnActiveHandler: ChannelInboundHandler {
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
final class EventWaiter<Event>: ChannelInboundHandler {
final class EventWaiter<Event: Sendable>: ChannelInboundHandler {
typealias InboundIn = Any
typealias InboundOut = Any
@ -145,7 +147,6 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
func testConnectingToSocketAddressTraversesPipeline() throws {
let connectRecordingHandler = ConnectRecordingHandler()
let listener = try NIOTSListenerBootstrap(group: self.group)
.bind(host: "localhost", port: 0).wait()
defer {
@ -153,9 +154,14 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(connectRecordingHandler) }
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let handler = ConnectRecordingHandler()
try channel.pipeline.syncOperations.addHandler(handler)
XCTAssertEqual(handler.connectTargets, [])
XCTAssertEqual(handler.endpointTargets, [])
}
}
let connection = try connectBootstrap.connect(to: listener.localAddress!).wait()
defer {
@ -163,13 +169,13 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
try connection.eventLoop.submit {
XCTAssertEqual(connectRecordingHandler.connectTargets, [listener.localAddress!])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
let handler = try connection.pipeline.syncOperations.handler(type: ConnectRecordingHandler.self)
XCTAssertEqual(handler.connectTargets, [listener.localAddress!])
XCTAssertEqual(handler.endpointTargets, [])
}.wait()
}
func testConnectingToHostPortSkipsPipeline() throws {
let connectRecordingHandler = ConnectRecordingHandler()
let listener = try NIOTSListenerBootstrap(group: self.group)
.bind(host: "localhost", port: 0).wait()
defer {
@ -177,9 +183,14 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(connectRecordingHandler) }
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let connectRecordingHandler = ConnectRecordingHandler()
try channel.pipeline.syncOperations.addHandler(connectRecordingHandler)
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
}
}
let connection = try connectBootstrap.connect(host: "localhost", port: Int(listener.localAddress!.port!)).wait()
defer {
@ -187,6 +198,9 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
try connection.eventLoop.submit {
let connectRecordingHandler = try connection.pipeline.syncOperations.handler(
type: ConnectRecordingHandler.self
)
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(
connectRecordingHandler.endpointTargets,
@ -201,7 +215,6 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
func testConnectingToEndpointSkipsPipeline() throws {
let connectRecordingHandler = ConnectRecordingHandler()
let listener = try NIOTSListenerBootstrap(group: self.group)
.bind(host: "localhost", port: 0).wait()
defer {
@ -209,9 +222,14 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(connectRecordingHandler) }
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let connectRecordingHandler = ConnectRecordingHandler()
try channel.pipeline.syncOperations.addHandler(connectRecordingHandler)
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [])
}
}
let target = NWEndpoint.hostPort(
host: "localhost",
@ -224,6 +242,9 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
try connection.eventLoop.submit {
let connectRecordingHandler = try connection.pipeline.syncOperations.handler(
type: ConnectRecordingHandler.self
)
XCTAssertEqual(connectRecordingHandler.connectTargets, [])
XCTAssertEqual(connectRecordingHandler.endpointTargets, [target])
}.wait()
@ -231,7 +252,11 @@ class NIOTSConnectionChannelTests: XCTestCase {
func testZeroLengthWritesHaveSatisfiedPromises() throws {
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(FailOnReadHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(FailOnReadHandler())
}
}
.bind(host: "localhost", port: 0).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -247,11 +272,15 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
func testSettingTCPOptionsWholesale() throws {
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.disableAckStretching = true
let listenerTCPOptions = NWProtocolTCP.Options()
listenerTCPOptions.disableAckStretching = true
let connectionTCPOptions = NWProtocolTCP.Options()
connectionTCPOptions.disableAckStretching = true
let listener = try NIOTSListenerBootstrap(group: self.group)
.tcpOptions(tcpOptions)
.tcpOptions(listenerTCPOptions)
.childTCPOptions(connectionTCPOptions)
.serverChannelInitializer { channel in
channel.getOption(ChannelOptions.socket(IPPROTO_TCP, TCP_SENDMOREACKS)).map { value in
XCTAssertEqual(value, 1)
@ -268,7 +297,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let connection = try NIOTSConnectionBootstrap(group: self.group)
.tcpOptions(tcpOptions)
.tcpOptions(connectionTCPOptions)
.channelInitializer { channel in
channel.getOption(ChannelOptions.socket(IPPROTO_TCP, TCP_SENDMOREACKS)).map { value in
XCTAssertEqual(value, 1)
@ -320,13 +349,16 @@ class NIOTSConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try listener.close().wait())
}
var writabilities = [Bool]()
let handler = WritabilityChangedHandler { newValue in
writabilities.append(newValue)
}
let writabilities: NIOLockedValueBox<[Bool]> = .init([])
let connection = try NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(handler) }
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let handler = WritabilityChangedHandler { newValue in
writabilities.withLockedValue { $0.append(newValue) }
}
try channel.pipeline.syncOperations.addHandler(handler)
}
}
.connect(to: listener.localAddress!)
.wait()
@ -337,8 +369,6 @@ class NIOTSConnectionChannelTests: XCTestCase {
value: ChannelOptions.Types.WriteBufferWaterMark(low: 2, high: 2048)
).wait()
)
var buffer = connection.allocator.buffer(capacity: 2048)
buffer.writeBytes(repeatElement(UInt8(4), count: 2048))
// We're going to issue the following pattern of writes:
// a: 1 byte
@ -355,58 +385,61 @@ class NIOTSConnectionChannelTests: XCTestCase {
// until after the promise for d has fired: by the time the promise for e has fired it will be writable
// again.
try connection.eventLoop.submit {
var buffer = connection.allocator.buffer(capacity: 2048)
buffer.writeBytes(repeatElement(UInt8(4), count: 2048))
// Pre writing.
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write a. After this write, we are still writable. When this write
// succeeds, we'll still be not writable.
connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { (_: Result<Void, Error>) in
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
}
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write b. After this write we are still writable. When this write
// succeeds we'll still be not writable.
connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { (_: Result<Void, Error>) in
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
}
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write c. After this write we are still writable (2047 bytes written).
// When this write succeeds we'll still be not writable (2 bytes outstanding).
connection.write(buffer.getSlice(at: 0, length: 2045)).whenComplete { (_: Result<Void, Error>) in
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
}
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write d. After this write we are still writable (2048 bytes written).
// When this write succeeds we'll become writable, but critically the promise fires before
// the state change, so we'll *appear* to be unwritable.
connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { (_: Result<Void, Error>) in
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
}
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write e. After this write we are now not writable (2049 bytes written).
// When this write succeeds we'll have already been writable, thanks to the previous
// write.
connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { (_: Result<Void, Error>) in
XCTAssertEqual(writabilities, [false, true])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false, true])
XCTAssertTrue(connection.isWritable)
// We close after this succeeds.
connection.close(promise: nil)
}
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
}.wait()
@ -415,7 +448,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try connection.closeFuture.wait())
// Ok, check that the writability changes worked.
XCTAssertEqual(writabilities, [false, true])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false, true])
}
func testWritabilityChangesAfterChangingWatermarks() throws {
@ -425,23 +458,22 @@ class NIOTSConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try listener.close().wait())
}
var writabilities = [Bool]()
let handler = WritabilityChangedHandler { newValue in
writabilities.append(newValue)
}
let writabilities: NIOLockedValueBox<[Bool]> = .init([])
let connection = try NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(handler) }
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let handler = WritabilityChangedHandler { newValue in
writabilities.withLockedValue({ $0.append(newValue) })
}
try channel.pipeline.syncOperations.addHandler(handler)
}
}
.connect(to: listener.localAddress!)
.wait()
defer {
XCTAssertNoThrow(try connection.close().wait())
}
// We're going to allocate a buffer.
var buffer = connection.allocator.buffer(capacity: 256)
buffer.writeBytes(repeatElement(UInt8(4), count: 256))
// We're going to issue a 256-byte write. This write will not cause any change in channel writability
// state.
//
@ -462,13 +494,17 @@ class NIOTSConnectionChannelTests: XCTestCase {
//
// Then we're going to set the high watermark to 1024, and the low to 256. This will change nothing.
try connection.eventLoop.submit {
// We're going to allocate a buffer.
var buffer = connection.allocator.buffer(capacity: 256)
buffer.writeBytes(repeatElement(UInt8(4), count: 256))
// Pre changes.
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
// Write. No writability change.
connection.write(buffer, promise: nil)
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
}.wait()
@ -477,7 +513,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
value: ChannelOptions.Types.WriteBufferWaterMark(low: 128, high: 256)
).flatMap {
// High to 256, low to 128. No writability change.
XCTAssertEqual(writabilities, [])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [])
XCTAssertTrue(connection.isWritable)
return connection.setOption(
@ -486,7 +522,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
)
}.flatMap {
// High to 255, low to 127. Channel becomes not writable.
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
return connection.setOption(
@ -495,7 +531,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
)
}.flatMap {
// High back to 256, low to 128. No writability change.
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
return connection.setOption(
@ -504,7 +540,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
)
}.flatMap {
// High to 1024, low to 128. No writability change.
XCTAssertEqual(writabilities, [false])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false])
XCTAssertFalse(connection.isWritable)
return connection.setOption(
@ -513,7 +549,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
)
}.flatMap {
// Low to 257, channel becomes writable again.
XCTAssertEqual(writabilities, [false, true])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false, true])
XCTAssertTrue(connection.isWritable)
return connection.setOption(
@ -522,7 +558,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
)
}.map {
// Low back to 256, no writability change.
XCTAssertEqual(writabilities, [false, true])
XCTAssertEqual(writabilities.withLockedValue({ $0 }), [false, true])
XCTAssertTrue(connection.isWritable)
}.wait()
}
@ -608,7 +644,9 @@ class NIOTSConnectionChannelTests: XCTestCase {
func testEarlyExitCanBeSetInWaitingState() throws {
let connectFuture = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in
channel.pipeline.addHandler(DisableWaitingAfterConnect())
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(DisableWaitingAfterConnect())
}
}.connect(to: try SocketAddress(unixDomainSocketPath: "/this/path/definitely/doesnt/exist"))
do {
@ -712,7 +750,11 @@ class NIOTSConnectionChannelTests: XCTestCase {
let channel = try NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in
channel.pipeline.addHandler(PromiseOnActiveHandler(activePromise))
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
PromiseOnActiveHandler(activePromise)
)
}
}.connect(to: listener.localAddress!).wait()
XCTAssertNoThrow(try activePromise.futureResult.wait())
@ -772,10 +814,13 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let testCompletePromise = self.group.next().makePromise(of: Void.self)
let testHandler = TestHandler(testCompletePromise: testCompletePromise)
let listener = try assertNoThrowWithValue(
NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.bind(host: "localhost", port: 0).wait()
)
defer {
@ -783,7 +828,13 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(testHandler) }
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
TestHandler(testCompletePromise: testCompletePromise)
)
}
}
let connection = try assertNoThrowWithValue(connectBootstrap.connect(to: listener.localAddress!).wait())
defer {
@ -804,7 +855,8 @@ class NIOTSConnectionChannelTests: XCTestCase {
// We expect 2.
XCTAssertNoThrow(
try connection.eventLoop.submit {
XCTAssertEqual(testHandler.readCount, 2)
let handler = try connection.pipeline.syncOperations.handler(type: TestHandler.self)
XCTAssertEqual(handler.readCount, 2)
}.wait()
)
}
@ -839,11 +891,15 @@ class NIOTSConnectionChannelTests: XCTestCase {
func testConnectingInvolvesWaiting() throws {
let loop = self.group.next()
let eventPromise = loop.makePromise(of: NIOTSNetworkEvents.WaitingForConnectivity.self)
let eventRecordingHandler = EventWaiter<NIOTSNetworkEvents.WaitingForConnectivity>(eventPromise)
// 5s is the worst-case test time: normally it'll be faster as we don't wait for this.
let connectBootstrap = NIOTSConnectionBootstrap(group: loop)
.channelInitializer { channel in channel.pipeline.addHandler(eventRecordingHandler) }
.channelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let eventRecordingHandler = EventWaiter<NIOTSNetworkEvents.WaitingForConnectivity>(eventPromise)
try channel.pipeline.syncOperations.addHandler(eventRecordingHandler)
}
}
.connectTimeout(.seconds(5))
// We choose 443 here to avoid triggering Private Relay, which can do all kinds of weird stuff to this test.
@ -868,7 +924,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
}
func testSyncOptionsAreSupported() throws {
func testSyncOptions(_ channel: Channel) {
@Sendable func testSyncOptions(_ channel: Channel) {
if let sync = channel.syncOptions {
do {
let autoRead = try sync.getOption(ChannelOptions.autoRead)
@ -919,6 +975,7 @@ class NIOTSConnectionChannelTests: XCTestCase {
func channelActive(context: ChannelHandlerContext) {
listenerChannel
.close()
.assumeIsolated()
.whenSuccess { _ in
_ = context.channel.write(ByteBuffer(data: Data()))
}
@ -943,12 +1000,14 @@ class NIOTSConnectionChannelTests: XCTestCase {
let testCompletePromise = self.group.next().makePromise(of: Error.self)
let connection = try NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in
channel.pipeline.addHandler(
ForwardErrorHandler(
testCompletePromise: testCompletePromise,
listenerChannel: listener
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
ForwardErrorHandler(
testCompletePromise: testCompletePromise,
listenerChannel: listener
)
)
)
}
}
.connect(to: listener.localAddress!)
.wait()

View File

@ -18,9 +18,10 @@ import Network
import NIOCore
import NIOTransportServices
import Foundation
import NIOConcurrencyHelpers
extension Channel {
func wait<T>(for type: T.Type, count: Int) throws -> [T] {
func wait<T: Sendable>(for type: T.Type, count: Int) throws -> [T] {
try self.pipeline.context(name: "ByteReadRecorder").flatMap { context in
if let future = (context.handler as? ReadRecorder<T>)?.notifyForDatagrams(count) {
return future
@ -53,7 +54,7 @@ extension Channel {
}
}
final class ReadRecorder<DataType>: ChannelInboundHandler {
final class ReadRecorder<DataType: Sendable>: ChannelInboundHandler {
typealias InboundIn = DataType
typealias InboundOut = DataType
@ -113,29 +114,42 @@ final class ReadRecorder<DataType>: ChannelInboundHandler {
// Mimicks the DatagramChannelTest from apple/swift-nio
@available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6, *)
final class NIOTSDatagramConnectionChannelTests: XCTestCase {
final class NIOTSDatagramBootstrapTests: XCTestCase {
private var group: NIOTSEventLoopGroup!
private func buildServerChannel(
group: NIOTSEventLoopGroup,
host: String = "127.0.0.1",
port: Int = 0,
onConnect: @escaping (Channel) -> Void
onConnect: @escaping @Sendable (Channel) -> Void
) throws -> Channel {
try NIOTSDatagramListenerBootstrap(group: group)
.childChannelInitializer { childChannel in
onConnect(childChannel)
return childChannel.pipeline.addHandler(ReadRecorder<ByteBuffer>(), name: "ByteReadRecorder")
return childChannel.eventLoop.makeCompletedFuture {
try childChannel.pipeline.syncOperations.addHandler(
ReadRecorder<ByteBuffer>(),
name: "ByteReadRecorder"
)
}
}
.bind(host: host, port: port)
.wait()
}
private func buildClientChannel(group: NIOTSEventLoopGroup, host: String = "127.0.0.1", port: Int) throws -> Channel
{
try NIOTSDatagramBootstrap(group: group)
private func buildClientChannel(
group: NIOTSEventLoopGroup,
host: String = "127.0.0.1",
port: Int
) throws -> Channel {
try NIOTSDatagramConnectionBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandler(ReadRecorder<ByteBuffer>(), name: "ByteReadRecorder")
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
ReadRecorder<ByteBuffer>(),
name: "ByteReadRecorder"
)
}
}
.connect(host: host, port: port)
.wait()
@ -169,7 +183,7 @@ final class NIOTSDatagramConnectionChannelTests: XCTestCase {
}
func testSyncOptionsAreSupported() throws {
func testSyncOptions(_ channel: Channel) {
@Sendable func testSyncOptions(_ channel: Channel) {
if let sync = channel.syncOptions {
do {
let endpointReuse = try sync.getOption(NIOTSChannelOptions.allowLocalEndpointReuse)
@ -192,7 +206,12 @@ final class NIOTSDatagramConnectionChannelTests: XCTestCase {
.childChannelInitializer { channel in
testSyncOptions(channel)
promise.succeed(channel)
return channel.pipeline.addHandler(ReadRecorder<ByteBuffer>(), name: "ByteReadRecorder")
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
ReadRecorder<ByteBuffer>(),
name: "ByteReadRecorder"
)
}
}
.bind(host: "localhost", port: 0)
.wait()
@ -200,7 +219,7 @@ final class NIOTSDatagramConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try listener.close().wait())
}
let connection = try! NIOTSDatagramBootstrap(group: self.group)
let connection = try! NIOTSDatagramConnectionBootstrap(group: self.group)
.channelInitializer { channel in
testSyncOptions(channel)
return channel.eventLoop.makeSucceededVoidFuture()
@ -214,6 +233,54 @@ final class NIOTSDatagramConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try connection.close().wait())
}
func testNWParametersConfigurator() async throws {
try await withEventLoopGroup { group in
let configuratorServerListenerCounter = NIOLockedValueBox(0)
let configuratorServerConnectionCounter = NIOLockedValueBox(0)
let configuratorClientConnectionCounter = NIOLockedValueBox(0)
let waitForConnectionHandler = WaitForConnectionHandler(
connectionPromise: group.next().makePromise()
)
let listenerChannel = try await NIOTSDatagramListenerBootstrap(group: group)
.childChannelInitializer { connectionChannel in
connectionChannel.eventLoop.makeCompletedFuture {
try connectionChannel.pipeline.syncOperations.addHandler(waitForConnectionHandler)
}
}
.configureNWParameters { _ in
configuratorServerListenerCounter.withLockedValue { $0 += 1 }
}
.configureChildNWParameters { _ in
configuratorServerConnectionCounter.withLockedValue { $0 += 1 }
}
.bind(host: "localhost", port: 0)
.get()
let connectionChannel: Channel = try await NIOTSDatagramConnectionBootstrap(group: group)
.configureNWParameters { _ in
configuratorClientConnectionCounter.withLockedValue { $0 += 1 }
}
.connect(to: listenerChannel.localAddress!)
.get()
// Need to write something so the server can activate the connection channel: this is UDP,
// so there is no handshaking that happens and thus the server cannot know that the
// connection has been established and the channel can be activated until we receive something.
try await connectionChannel.writeAndFlush(ByteBuffer(bytes: [42]))
// Wait for the server to activate the connection channel to the client.
try await waitForConnectionHandler.connectionPromise.futureResult.get()
try await listenerChannel.close().get()
try await connectionChannel.close().get()
XCTAssertEqual(1, configuratorServerListenerCounter.withLockedValue { $0 })
XCTAssertEqual(1, configuratorServerConnectionCounter.withLockedValue { $0 })
XCTAssertEqual(1, configuratorClientConnectionCounter.withLockedValue { $0 })
}
}
func testCanExtractTheConnection() throws {
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
throw XCTSkip("Option not available")
@ -225,7 +292,7 @@ final class NIOTSDatagramConnectionChannelTests: XCTestCase {
XCTAssertNoThrow(try listener.close().wait())
}
_ = try NIOTSDatagramBootstrap(group: self.group)
_ = try NIOTSDatagramConnectionBootstrap(group: self.group)
.channelInitializer { channel in
let conn = try! channel.syncOptions!.getOption(NIOTSChannelOptions.connection)
XCTAssertNil(conn)

View File

@ -18,6 +18,7 @@ import NIOCore
import NIOTransportServices
import Foundation
import Network
import NIOConcurrencyHelpers
func assertNoThrowWithValue<T>(
_ body: @autoclosure () throws -> T,
@ -56,44 +57,29 @@ final class ReadExpecter: ChannelInboundHandler {
struct DidNotReadError: Error {}
private var readPromise: EventLoopPromise<Void>?
private let readPromise: EventLoopPromise<Void>
private var cumulationBuffer: ByteBuffer?
private let expectedRead: ByteBuffer
var readFuture: EventLoopFuture<Void>? {
self.readPromise?.futureResult
}
init(expecting: ByteBuffer) {
init(expecting: ByteBuffer, readPromise: EventLoopPromise<Void>) {
self.readPromise = readPromise
self.cumulationBuffer = nil
self.expectedRead = expecting
}
func handlerAdded(context: ChannelHandlerContext) {
self.readPromise = context.eventLoop.makePromise()
}
func handlerRemoved(context: ChannelHandlerContext) {
if let promise = self.readPromise {
promise.fail(DidNotReadError())
}
self.readPromise.fail(DidNotReadError())
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var bytes = self.unwrapInboundIn(data)
if self.cumulationBuffer == nil {
self.cumulationBuffer = bytes
} else {
self.cumulationBuffer!.writeBuffer(&bytes)
}
self.cumulationBuffer.setOrWriteBuffer(&bytes)
self.maybeFulfillPromise()
}
private func maybeFulfillPromise() {
if let promise = self.readPromise, self.cumulationBuffer! == self.expectedRead {
promise.succeed(())
self.readPromise = nil
}
guard self.cumulationBuffer == self.expectedRead else { return }
self.readPromise.succeed(())
}
}
@ -180,9 +166,12 @@ final class WaitForActiveHandler: ChannelInboundHandler {
extension Channel {
/// Expect that the given bytes will be received.
func expectRead(_ bytes: ByteBuffer) -> EventLoopFuture<Void> {
let expecter = ReadExpecter(expecting: bytes)
return self.pipeline.addHandler(expecter).flatMap {
expecter.readFuture!
let readPromise = self.eventLoop.makePromise(of: Void.self)
return self.eventLoop.submit {
let expecter = ReadExpecter(expecting: bytes, readPromise: readPromise)
try self.pipeline.syncOperations.addHandler(expecter)
}.flatMap {
readPromise.futureResult
}
}
}
@ -209,7 +198,11 @@ class NIOTSEndToEndTests: XCTestCase {
func testSimpleListener() throws {
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.bind(host: "localhost", port: 0).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -232,7 +225,11 @@ class NIOTSEndToEndTests: XCTestCase {
on: NWEndpoint.Port(rawValue: 0)!
)
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.withNWListener(nwListenerTest).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -259,7 +256,11 @@ class NIOTSEndToEndTests: XCTestCase {
func testMultipleConnectionsOneListener() throws {
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.bind(host: "localhost", port: 0).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -282,7 +283,11 @@ class NIOTSEndToEndTests: XCTestCase {
func testBasicConnectionTeardown() throws {
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(CloseOnActiveHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(CloseOnActiveHandler())
}
}
.bind(host: "localhost", port: 0).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -304,12 +309,12 @@ class NIOTSEndToEndTests: XCTestCase {
// This test is a little bit dicey, but we need 20 futures in this list.
let closeFutureSyncQueue = DispatchQueue(label: "closeFutureSyncQueue")
let closeFutureGroup = DispatchGroup()
var closeFutures: [EventLoopFuture<Void>] = []
let closeFutures: NIOLockedValueBox<[EventLoopFuture<Void>]> = .init([])
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in
closeFutureSyncQueue.sync {
closeFutures.append(channel.closeFuture)
closeFutures.withLockedValue { $0.append(channel.closeFuture) }
}
closeFutureGroup.leave()
return channel.eventLoop.makeSucceededFuture(())
@ -320,7 +325,9 @@ class NIOTSEndToEndTests: XCTestCase {
}
let bootstrap = NIOTSConnectionBootstrap(group: self.group).channelInitializer { channel in
channel.pipeline.addHandler(CloseOnActiveHandler())
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(CloseOnActiveHandler())
}
}
for _ in (0..<10) {
@ -330,7 +337,7 @@ class NIOTSEndToEndTests: XCTestCase {
closeFutureGroup.enter()
bootstrap.connect(to: listener.localAddress!).whenSuccess { channel in
closeFutureSyncQueue.sync {
closeFutures.append(channel.closeFuture)
closeFutures.withLockedValue { $0.append(channel.closeFuture) }
}
closeFutureGroup.leave()
}
@ -338,7 +345,9 @@ class NIOTSEndToEndTests: XCTestCase {
closeFutureGroup.wait()
let allClosed = closeFutureSyncQueue.sync {
EventLoopFuture<Void>.andAllComplete(closeFutures, on: self.group.next())
closeFutures.withLockedValue {
EventLoopFuture<Void>.andAllComplete($0, on: self.group.next())
}
}
XCTAssertNoThrow(try allClosed.wait())
}
@ -347,10 +356,12 @@ class NIOTSEndToEndTests: XCTestCase {
let serverSideConnectionPromise: EventLoopPromise<Channel> = self.group.next().makePromise()
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in
channel.pipeline.addHandlers([
WaitForActiveHandler(serverSideConnectionPromise),
EchoHandler(),
])
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandlers([
WaitForActiveHandler(serverSideConnectionPromise),
EchoHandler(),
])
}
}
.bind(host: "localhost", port: 0).wait()
defer {
@ -373,8 +384,9 @@ class NIOTSEndToEndTests: XCTestCase {
let halfClosedPromise: EventLoopPromise<Void> = self.group.next().makePromise()
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in
channel.pipeline.addHandler(EchoHandler()).flatMap { _ in
channel.pipeline.addHandler(HalfCloseHandler(halfClosedPromise))
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
try channel.pipeline.syncOperations.addHandler(HalfCloseHandler(halfClosedPromise))
}
}
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
@ -403,8 +415,9 @@ class NIOTSEndToEndTests: XCTestCase {
func testDisabledHalfClosureCausesFullClosure() throws {
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in
channel.pipeline.addHandler(EchoHandler()).flatMap { _ in
channel.pipeline.addHandler(FailOnHalfCloseHandler())
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
try channel.pipeline.syncOperations.addHandler(FailOnHalfCloseHandler())
}
}
.bind(host: "localhost", port: 0).wait()
@ -483,7 +496,11 @@ class NIOTSEndToEndTests: XCTestCase {
let udsPath = "/tmp/\(UUID().uuidString)_testBasicUnixSockets.sock"
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.bind(unixDomainSocketPath: udsPath).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -513,7 +530,11 @@ class NIOTSEndToEndTests: XCTestCase {
let serviceEndpoint = NWEndpoint.service(name: name, type: "_niots._tcp", domain: "local", interface: nil)
let listener = try NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(EchoHandler())
}
}
.bind(endpoint: serviceEndpoint).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -541,7 +562,11 @@ class NIOTSEndToEndTests: XCTestCase {
let listener = try NIOTSListenerBootstrap(group: self.group)
.serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEADDR), value: 0)
.serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEPORT), value: 0)
.childChannelInitializer { channel in channel.pipeline.addHandler(CloseOnActiveHandler()) }
.childChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(CloseOnActiveHandler())
}
}
.bind(host: "localhost", port: 0).wait()
let address = listener.localAddress!
@ -587,11 +612,11 @@ class NIOTSEndToEndTests: XCTestCase {
let testCompletePromise = self.group.next().makePromise(of: Bool.self)
let connection = try NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in
channel.pipeline.addHandler(
ViabilityHandler(
testCompletePromise: testCompletePromise
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
ViabilityHandler(testCompletePromise: testCompletePromise)
)
)
}
}
.connect(to: listener.localAddress!)
.wait()

View File

@ -147,7 +147,9 @@ class NIOTSEventLoopTest: XCTestCase {
func testIndividualLoopsCannotBeShutDownWhenPartOfGroup() async throws {
let group = NIOTSEventLoopGroup(loopCount: 3)
defer {
try! group.syncShutdownGracefully()
Task {
try! await group.shutdownGracefully()
}
}
for loop in group.makeIterator() {

View File

@ -55,13 +55,18 @@ class NIOTSListenerChannelTests: XCTestCase {
}
func testBindingToSocketAddressTraversesPipeline() throws {
let bindRecordingHandler = BindRecordingHandler()
let target = try SocketAddress.makeAddressResolvingHost("localhost", port: 0)
let bindBootstrap = NIOTSListenerBootstrap(group: self.group)
.serverChannelInitializer { channel in channel.pipeline.addHandler(bindRecordingHandler) }
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let bindRecordingHandler = BindRecordingHandler()
try channel.pipeline.syncOperations.addHandler(
bindRecordingHandler
)
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
}
}
let listener = try bindBootstrap.bind(to: target).wait()
defer {
@ -69,18 +74,22 @@ class NIOTSListenerChannelTests: XCTestCase {
}
try self.group.next().submit {
XCTAssertEqual(bindRecordingHandler.bindTargets, [target])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
let handler = try listener.pipeline.syncOperations.handler(type: BindRecordingHandler.self)
XCTAssertEqual(handler.bindTargets, [target])
XCTAssertEqual(handler.endpointTargets, [])
}.wait()
}
func testConnectingToHostPortTraversesPipeline() throws {
let bindRecordingHandler = BindRecordingHandler()
let bindBootstrap = NIOTSListenerBootstrap(group: self.group)
.serverChannelInitializer { channel in channel.pipeline.addHandler(bindRecordingHandler) }
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let bindRecordingHandler = BindRecordingHandler()
try channel.pipeline.syncOperations.addHandler(bindRecordingHandler)
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
}
}
let listener = try bindBootstrap.bind(host: "localhost", port: 0).wait()
defer {
@ -88,22 +97,28 @@ class NIOTSListenerChannelTests: XCTestCase {
}
try self.group.next().submit {
let handler = try listener.pipeline.syncOperations.handler(
type: BindRecordingHandler.self
)
XCTAssertEqual(
bindRecordingHandler.bindTargets,
handler.bindTargets,
[try SocketAddress.makeAddressResolvingHost("localhost", port: 0)]
)
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
XCTAssertEqual(handler.endpointTargets, [])
}.wait()
}
func testConnectingToEndpointSkipsPipeline() throws {
let endpoint = NWEndpoint.hostPort(host: .ipv4(.loopback), port: .any)
let bindRecordingHandler = BindRecordingHandler()
let bindBootstrap = NIOTSListenerBootstrap(group: self.group)
.serverChannelInitializer { channel in channel.pipeline.addHandler(bindRecordingHandler) }
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
let bindRecordingHandler = BindRecordingHandler()
try channel.pipeline.syncOperations.addHandler(bindRecordingHandler)
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [])
}
}
let listener = try bindBootstrap.bind(endpoint: endpoint).wait()
defer {
@ -111,8 +126,11 @@ class NIOTSListenerChannelTests: XCTestCase {
}
try self.group.next().submit {
XCTAssertEqual(bindRecordingHandler.bindTargets, [])
XCTAssertEqual(bindRecordingHandler.endpointTargets, [endpoint])
let handler = try listener.pipeline.syncOperations.handler(
type: BindRecordingHandler.self
)
XCTAssertEqual(handler.bindTargets, [])
XCTAssertEqual(handler.endpointTargets, [endpoint])
}.wait()
}
@ -169,7 +187,11 @@ class NIOTSListenerChannelTests: XCTestCase {
let listener = try NIOTSListenerBootstrap(group: self.group, childGroup: childGroup)
.childChannelInitializer { channel in
childChannelPromise.succeed(channel)
return channel.pipeline.addHandler(PromiseOnActiveHandler(activePromise))
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
PromiseOnActiveHandler(activePromise)
)
}
}.bind(host: "localhost", port: 0).wait()
defer {
XCTAssertNoThrow(try listener.close().wait())
@ -243,7 +265,9 @@ class NIOTSListenerChannelTests: XCTestCase {
let channelPromise = self.group.next().makePromise(of: Channel.self)
let listener = try NIOTSListenerBootstrap(group: self.group)
.serverChannelInitializer { channel in
channel.pipeline.addHandler(ChannelReceiver(channelPromise))
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(ChannelReceiver(channelPromise))
}
}
.bind(host: "localhost", port: 0).wait()
defer {
@ -258,7 +282,9 @@ class NIOTSListenerChannelTests: XCTestCase {
// We must wait for channel active here, or the socket addresses won't be set.
let promisedChannel = try channelPromise.futureResult.flatMap { (channel) -> EventLoopFuture<Channel> in
let promiseChannelActive = channel.eventLoop.makePromise(of: Channel.self)
_ = channel.pipeline.addHandler(WaitForActiveHandler(promiseChannelActive))
try? channel.pipeline.syncOperations.addHandler(
WaitForActiveHandler(promiseChannelActive)
)
return promiseChannelActive.futureResult
}.wait()
@ -314,7 +340,7 @@ class NIOTSListenerChannelTests: XCTestCase {
}
func testSyncOptionsAreSupported() throws {
func testSyncOptions(_ channel: Channel) {
@Sendable func testSyncOptions(_ channel: Channel) {
if let sync = channel.syncOptions {
do {
let endpointReuse = try sync.getOption(NIOTSChannelOptions.allowLocalEndpointReuse)

View File

@ -0,0 +1,44 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import NIOCore
import NIOTransportServices
func withEventLoopGroup(_ test: (EventLoopGroup) async throws -> Void) async rethrows {
let group = NIOTSEventLoopGroup()
do {
try await test(group)
try? await group.shutdownGracefully()
} catch {
try? await group.shutdownGracefully()
throw error
}
}
final class WaitForConnectionHandler: ChannelInboundHandler, Sendable {
typealias InboundIn = Never
let connectionPromise: EventLoopPromise<Void>
init(connectionPromise: EventLoopPromise<Void>) {
self.connectionPromise = connectionPromise
}
func channelActive(context: ChannelHandlerContext) {
self.connectionPromise.succeed()
context.fireChannelActive()
}
}
#endif