Add support for UDP clients and servers.

Motivation:

This change was made because UDP support was lacking on iOS. It's needed by my DNS client implementation, which I am in turn using for an iOS app I'm working on relying on SRV typed records.

Modifications:

Adds a NIOTSDatagramListenerBootstrap for making UDP services
Adds a NIOTSDatagramListenerChannel that accepts UDP connections
Adds a NIOTSDatagramChannel for UDP client connections
Adds a NIOTSDatagramBootstrap that can create a new UDP client
This commit is contained in:
Joannis Orlandos 2023-08-10 15:02:19 +02:00 committed by GitHub
parent 5fd1458c24
commit e7403c35ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1099 additions and 4 deletions

View File

@ -0,0 +1,216 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import NIOCore
import Dispatch
import Network
/// A `NIOTSDatagramBootstrap` is an easy way to bootstrap a `NIOTSDatagramChannel` when creating network clients.
///
/// Usually you re-use a `NIOTSDatagramBootstrap` once you set it up, calling `connect` multiple times on the same bootstrap.
/// This way you ensure that the same `EventLoop`s will be shared across all your connections.
///
/// Example:
///
/// ```swift
/// let group = NIOTSEventLoopGroup()
/// defer {
/// try! group.syncShutdownGracefully()
/// }
/// let bootstrap = NIOTSDatagramBootstrap(group: group)
/// .channelInitializer { channel in
/// channel.pipeline.addHandler(MyChannelHandler())
/// }
/// try! bootstrap.connect(host: "example.org", port: 12345).wait()
/// /* the Channel is now connected */
/// ```
///
/// The connected `NIOTSDatagramChannel` will operate on `ByteBuffer` as inbound and outbound messages.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSDatagramBootstrap {
private let group: EventLoopGroup
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>)?
private var connectTimeout: TimeAmount = TimeAmount.seconds(10)
private var channelOptions = ChannelOptions.Storage()
private var qos: DispatchQoS?
private var udpOptions: NWProtocolUDP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
/// Create a `NIOTSDatagramConnectionBootstrap` on the `EventLoopGroup` `group`.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
/// existing `NIOTSEventLoop` may be used to initialize this bootstrap. Where possible the
/// initializers accepting `NIOTSEventLoopGroup` should be used instead to avoid the wrong
/// type being used.
///
/// Note that the "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use.
public init(group: EventLoopGroup) {
self.group = group
}
/// Create a `NIOTSDatagramConnectionBootstrap` on the `NIOTSEventLoopGroup` `group`.
///
/// - parameters:
/// - group: The `NIOTSEventLoopGroup` to use.
public convenience init(group: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup)
}
/// Initialize the connected `NIOTSDatagramConnectionChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The connected `Channel` will operate on `ByteBuffer` as inbound and outbound messages.
///
/// - parameters:
/// - handler: A closure that initializes the provided `Channel`.
public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.channelInitializer = handler
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSDatagramConnectionChannel`.
///
/// - parameters:
/// - option: The option to be applied.
/// - value: The value for the option.
public func channelOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> Self {
channelOptions.append(key: option, value: value)
return self
}
/// Specifies a timeout to apply to a connection attempt.
//
/// - parameters:
/// - timeout: The timeout that will apply to the connection attempt.
public func connectTimeout(_ timeout: TimeAmount) -> Self {
self.connectTimeout = timeout
return self
}
/// Specifies a QoS to use for this connection, instead of the default QoS for the
/// event loop.
///
/// This allows unusually high or low priority workloads to be appropriately scheduled.
public func withQoS(_ qos: DispatchQoS) -> Self {
self.qos = qos
return self
}
/// Specifies the UDP options to use on the `Channel`s.
///
/// To retrieve the UDP options from connected channels, use
/// `NIOTSChannelOptions.UDPConfiguration`. It is not possible to change the
/// UDP configuration after `connect` is called.
public func udpOptions(_ options: NWProtocolUDP.Options) -> Self {
self.udpOptions = options
return self
}
/// Specifies the TLS options to use on the `Channel`s.
///
/// To retrieve the TLS options from connected channels, use
/// `NIOTSChannelOptions.TLSConfiguration`. It is not possible to change the
/// TLS configuration after `connect` is called.
public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self {
self.tlsOptions = options
return self
}
/// Specify the `host` and `port` to connect to for the UDP `Channel` that will be established.
///
/// - parameters:
/// - host: The host to connect to.
/// - port: The port to connect to.
/// - returns: An `EventLoopFuture<Channel>` to deliver the `Channel` when connected.
public func connect(host: String, port: Int) -> EventLoopFuture<Channel> {
guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else {
return self.group.next().makeFailedFuture(NIOTSErrors.InvalidPort(port: port))
}
return self.connect(endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort))
}
/// Specify the `address` to connect to for the UDP `Channel` that will be established.
///
/// - parameters:
/// - address: The address to connect to.
/// - returns: An `EventLoopFuture<Channel>` to deliver the `Channel` when connected.
public func connect(to address: SocketAddress) -> EventLoopFuture<Channel> {
return self.connect0 { channel, promise in
channel.bind(to: address, promise: promise)
}
}
/// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established.
///
/// - parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to connect to.
/// - returns: An `EventLoopFuture<Channel>` to deliver the `Channel` when connected.
public func connect(unixDomainSocketPath: String) -> EventLoopFuture<Channel> {
do {
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
return connect(to: address)
} catch {
return group.next().makeFailedFuture(error)
}
}
/// Specify the `endpoint` to connect to for the UDP `Channel` that will be established.
public func connect(endpoint: NWEndpoint) -> EventLoopFuture<Channel> {
return self.connect0 { channel, promise in
channel.triggerUserOutboundEvent(
NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint),
promise: promise
)
}
}
private func connect0(_ binder: @escaping (Channel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
let conn: Channel = NIOTSDatagramChannel(eventLoop: self.group.next() as! NIOTSEventLoop,
qos: self.qos,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions)
let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) }
let channelOptions = self.channelOptions
return conn.eventLoop.submit {
return channelOptions.applyAllChannelOptions(to: conn).flatMap {
initializer(conn)
}.flatMap {
conn.eventLoop.assertInEventLoop()
return conn.register()
}.flatMap {
let connectPromise: EventLoopPromise<Void> = conn.eventLoop.makePromise()
binder(conn, connectPromise)
let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) {
connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout))
conn.close(promise: nil)
}
connectPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
cancelTask.cancel()
}
return connectPromise.futureResult
}.map { conn }.flatMapErrorThrowing {
conn.close(promise: nil)
throw $0
}
}.flatMap { $0 }
}
}
#endif

View File

@ -0,0 +1,190 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import Atomics
import Foundation
import NIOCore
import NIOConcurrencyHelpers
import NIOFoundationCompat
import NIOTLS
import Dispatch
import Network
import Security
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel {
typealias ActiveSubstate = UDPSubstate
enum UDPSubstate: NWConnectionSubstate {
case open, closed
init() {
self = .open
}
static func closeInput(state: inout ChannelState<NIOTSDatagramChannel.UDPSubstate>) throws {
throw NIOTSErrors.InvalidChannelStateTransition()
}
static func closeOutput(state: inout ChannelState<NIOTSDatagramChannel.UDPSubstate>) throws {
throw NIOTSErrors.InvalidChannelStateTransition()
}
}
/// The kinds of channel activation this channel supports
internal let supportedActivationType: ActivationType = .connect
/// The `ByteBufferAllocator` for this `Channel`.
public let allocator = ByteBufferAllocator()
/// An `EventLoopFuture` that will complete when this channel is finally closed.
public var closeFuture: EventLoopFuture<Void> {
return self.closePromise.futureResult
}
/// The parent `Channel` for this one, if any.
public let parent: Channel?
/// The `EventLoop` this `Channel` belongs to.
internal let tsEventLoop: NIOTSEventLoop
private(set) var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads.
internal let closePromise: EventLoopPromise<Void>
/// The underlying `NWConnection` that this `Channel` wraps. This is only non-nil
/// after the initial connection attempt has been made.
internal var connection: NWConnection?
/// The `DispatchQueue` that socket events for this connection will be dispatched onto.
internal let connectionQueue: DispatchQueue
/// An `EventLoopPromise` that will be succeeded or failed when a connection attempt succeeds or fails.
internal var connectPromise: EventLoopPromise<Void>?
/// The UDP options for this connection.
private let udpOptions: NWProtocolUDP.Options
internal var nwOptions: NWProtocolUDP.Options { udpOptions }
/// The TLS options for this connection, if any.
private var tlsOptions: NWProtocolTLS.Options?
/// The state of this connection channel.
internal var state: ChannelState<ActiveSubstate> = .idle
/// The active state, used for safely reporting the channel state across threads.
internal var isActive0 = ManagedAtomic(false)
/// Whether a call to NWConnection.receive has been made, but the completion
/// handler has not yet been invoked.
internal var outstandingRead: Bool = false
/// The options for this channel.
internal var options = TransportServicesChannelOptions()
/// Any pending writes that have yet to be delivered to the network stack.
internal var pendingWrites = CircularBuffer<PendingWrite>(initialCapacity: 8)
/// An object to keep track of pending writes and manage our backpressure signaling.
internal var _backpressureManager = BackpressureManager()
/// The value of SO_REUSEADDR.
internal var reuseAddress = false
/// The value of SO_REUSEPORT.
internal var reusePort = false
/// Whether to use peer-to-peer connectivity when connecting to Bonjour services.
internal var enablePeerToPeer = false
/// The cache of the local and remote socket addresses. Must be accessed using _addressCacheLock.
internal var _addressCache = AddressCache(local: nil, remote: nil)
internal var addressCache: AddressCache {
get {
return self._addressCacheLock.withLock {
return self._addressCache
}
}
set {
return self._addressCacheLock.withLock {
self._addressCache = newValue
}
}
}
/// A lock that guards the _addressCache.
internal let _addressCacheLock = NIOLock()
internal var allowLocalEndpointReuse = false
internal var multipathServiceType: NWParameters.MultipathServiceType = .disabled
var parameters: NWParameters {
NWParameters(dtls: self.tlsOptions, udp: self.udpOptions)
}
var _inboundStreamOpen: Bool {
switch self.state {
case .active(.open):
return true
case .idle, .registered, .activating, .active, .inactive:
return false
}
}
func setChannelSpecificOption0<Option>(option: Option, value: Option.Value) throws where Option : NIOCore.ChannelOption {
fatalError("option \(type(of: option)).\(option) not supported")
}
func getChannelSpecificOption0<Option>(option: Option) throws -> Option.Value where Option : ChannelOption {
fatalError("option \(type(of: option)).\(option) not supported")
}
/// Create a `NIOTSDatagramConnectionChannel` on a given `NIOTSEventLoop`.
///
/// Note that `NIOTSDatagramConnectionChannel` objects cannot be created on arbitrary loops types.
internal init(eventLoop: NIOTSEventLoop,
parent: Channel? = nil,
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?) {
self.tsEventLoop = eventLoop
self.closePromise = eventLoop.makePromise()
self.parent = parent
self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos)
self.udpOptions = udpOptions
self.tlsOptions = tlsOptions
// Must come last, as it requires self to be completely initialized.
self._pipeline = ChannelPipeline(channel: self)
}
/// Create a `NIOTSDatagramConnectionChannel` with an already-established `NWConnection`.
internal convenience init(wrapping connection: NWConnection,
on eventLoop: NIOTSEventLoop,
parent: Channel,
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?) {
self.init(eventLoop: eventLoop,
parent: parent,
qos: qos,
udpOptions: udpOptions,
tlsOptions: tlsOptions)
self.connection = connection
}
}
#endif

View File

@ -0,0 +1,372 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import NIOCore
import Dispatch
import Network
/// A ``NIOTSListenerBootstrap`` is an easy way to bootstrap a `NIOTSListenerChannel` when creating network servers.
///
/// Example:
///
/// ```swift
/// let group = NIOTSEventLoopGroup()
/// defer {
/// try! group.syncShutdownGracefully()
/// }
/// let bootstrap = NIOTSListenerBootstrap(group: group)
/// // Specify backlog and enable SO_REUSEADDR for the server itself
/// .serverChannelOption(ChannelOptions.backlog, value: 256)
/// .serverChannelOption(ChannelOptions.socketOption(.reuseaddr), value: 1)
///
/// // Set the handlers that are applied to the accepted child `Channel`s.
/// .childChannelInitializer { channel in
/// // Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
/// channel.pipeline.addHandler(BackPressureHandler()).flatMap { () in
/// // make sure to instantiate your `ChannelHandlers` inside of
/// // the closure as it will be invoked once per connection.
/// channel.pipeline.addHandler(MyChannelHandler())
/// }
/// }
/// let channel = try! bootstrap.bind(host: host, port: port).wait()
/// /* the server will now be accepting connections */
///
/// try! channel.closeFuture.wait() // wait forever as we never close the Channel
/// ```
///
/// The `EventLoopFuture` returned by `bind` will fire with a `NIOTSListenerChannel`. This is the channel that owns the
/// listening socket. Each time it accepts a new connection it will fire a `NIOTSConnectionChannel` through the
/// `ChannelPipeline` via `fireChannelRead`: as a result, the `NIOTSListenerChannel` operates on `Channel`s as inbound
/// messages. Outbound messages are not supported on a `NIOTSListenerChannel` which means that each write attempt will
/// fail.
///
/// Accepted `NIOTSConnectionChannel`s operate on `ByteBuffer` as inbound data, and `IOData` as outbound data.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public final class NIOTSDatagramListenerBootstrap {
private let group: EventLoopGroup
private let childGroup: EventLoopGroup
private var serverChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var childChannelInit: ((Channel) -> EventLoopFuture<Void>)?
private var serverChannelOptions = ChannelOptions.Storage()
private var childChannelOptions = ChannelOptions.Storage()
private var serverQoS: DispatchQoS?
private var childQoS: DispatchQoS?
private var udpOptions: NWProtocolUDP.Options = .init()
private var tlsOptions: NWProtocolTLS.Options?
private var bindTimeout: TimeAmount?
/// Create a ``NIOTSListenerBootstrap`` for the `EventLoopGroup` `group`.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
/// existing `NIOTSEventLoop` may be used to initialize this bootstrap. Where possible the
/// initializers accepting ``NIOTSEventLoopGroup`` should be used instead to avoid the wrong
/// type being used.
///
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `NIOTSListenerChannel`.
public convenience init(group: EventLoopGroup) {
self.init(group: group, childGroup: group)
}
/// Create a ``NIOTSListenerBootstrap`` for the ``NIOTSEventLoopGroup`` `group`.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `NIOTSListenerChannel`.
public convenience init(group: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup)
}
/// Create a ``NIOTSListenerBootstrap``.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
/// existing `NIOTSEventLoop` may be used to initialize this bootstrap. Where possible the
/// initializers accepting ``NIOTSEventLoopGroup`` should be used instead to avoid the wrong
/// type being used.
///
/// > Note: The "real" solution is described in https://github.com/apple/swift-nio/issues/674.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
public convenience init(group: EventLoopGroup, childGroup: EventLoopGroup) {
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
preconditionFailure("NIOTSListenerBootstrap is only compatible with NIOTSEventLoopGroup and " +
"NIOTSEventLoop. You tried constructing one with group: \(group) and " +
"childGroup: \(childGroup) at least one of which is incompatible.")
}
self.init(validatingGroup: group, childGroup: childGroup)!
}
/// Create a ``NIOTSListenerBootstrap`` on the `EventLoopGroup` `group` which accepts `Channel`s on `childGroup`,
/// validating that the `EventLoopGroup`s are compatible with ``NIOTSListenerBootstrap``.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The `EventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on.
public init?(validatingGroup group: EventLoopGroup, childGroup: EventLoopGroup? = nil) {
let childGroup = childGroup ?? group
guard NIOTSBootstraps.isCompatible(group: group) && NIOTSBootstraps.isCompatible(group: childGroup) else {
return nil
}
self.group = group
self.childGroup = childGroup
}
/// Create a ``NIOTSListenerBootstrap``.
///
/// - parameters:
/// - group: The ``NIOTSEventLoopGroup`` to use for the `bind` of the `NIOTSListenerChannel`
/// and to accept new `NIOTSConnectionChannel`s with.
/// - childGroup: The ``NIOTSEventLoopGroup`` to run the accepted `NIOTSConnectionChannel`s on.
public convenience init(group: NIOTSEventLoopGroup, childGroup: NIOTSEventLoopGroup) {
self.init(group: group as EventLoopGroup, childGroup: childGroup as EventLoopGroup)
}
/// Initialize the `NIOTSListenerChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The `NIOTSListenerChannel` uses the accepted `NIOTSConnectionChannel`s as inbound messages.
///
/// > Note: To set the initializer for the accepted `NIOTSConnectionChannel`s, look at
/// ``childChannelInitializer(_:)``.
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func serverChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.serverChannelInit = initializer
return self
}
/// Initialize the accepted `NIOTSConnectionChannel`s with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`. Note that if the `initializer` fails then the error will be
/// fired in the *parent* channel.
///
/// The accepted `Channel` will operate on `ByteBuffer` as inbound and `IOData` as outbound messages.
///
/// - parameters:
/// - initializer: A closure that initializes the provided `Channel`.
public func childChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.childChannelInit = initializer
return self
}
/// Specifies a `ChannelOption` to be applied to the `NIOTSListenerChannel`.
///
/// > Note: To specify options for the accepted `NIOTSConnectionChannel`s, look at ``childChannelOption(_:value:)``.
///
/// - parameters:
/// - option: The option to be applied.
/// - value: The value for the option.
public func serverChannelOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> Self {
self.serverChannelOptions.append(key: option, value: value)
return self
}
/// Specifies a `ChannelOption` to be applied to the accepted `NIOTSConnectionChannel`s.
///
/// - parameters:
/// - option: The option to be applied.
/// - value: The value for the option.
public func childChannelOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> Self {
self.childChannelOptions.append(key: option, value: value)
return self
}
/// Specifies a timeout to apply to a bind attempt.
///
/// - parameters:
/// - timeout: The timeout that will apply to the bind attempt.
public func bindTimeout(_ timeout: TimeAmount) -> Self {
self.bindTimeout = timeout
return self
}
/// Specifies a QoS to use for the server channel, instead of the default QoS for the
/// event loop.
///
/// This allows unusually high or low priority workloads to be appropriately scheduled.
public func serverQoS(_ qos: DispatchQoS) -> Self {
self.serverQoS = qos
return self
}
/// Specifies a QoS to use for the child connections created from the server channel,
/// instead of the default QoS for the event loop.
///
/// This allows unusually high or low priority workloads to be appropriately scheduled.
public func childQoS(_ qos: DispatchQoS) -> Self {
self.childQoS = qos
return self
}
/// Specifies the TCP options to use on the child `Channel`s.
public func udpOptions(_ options: NWProtocolUDP.Options) -> Self {
self.udpOptions = options
return self
}
/// Specifies the TLS options to use on the child `Channel`s.
public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self {
self.tlsOptions = options
return self
}
/// Bind the `NIOTSListenerChannel` to `host` and `port`.
///
/// - parameters:
/// - host: The host to bind on.
/// - port: The port to bind on.
public func bind(host: String, port: Int) -> EventLoopFuture<Channel> {
let validPortRange = Int(UInt16.min)...Int(UInt16.max)
guard validPortRange.contains(port) else {
return self.group.next().makeFailedFuture(NIOTSErrors.InvalidPort(port: port))
}
return self.bind0(shouldRegister: true) { (channel, promise) in
do {
// NWListener does not actually resolve hostname-based NWEndpoints
// for use with requiredLocalEndpoint, so we fall back to
// SocketAddress for this.
let address = try SocketAddress.makeAddressResolvingHost(host, port: port)
channel.bind(to: address, promise: promise)
} catch {
promise.fail(error)
}
}
}
/// Bind the `NIOTSListenerChannel` to `address`.
///
/// - parameters:
/// - address: The `SocketAddress` to bind on.
public func bind(to address: SocketAddress) -> EventLoopFuture<Channel> {
return self.bind0(shouldRegister: true) { (channel, promise) in
channel.bind(to: address, promise: promise)
}
}
/// Bind the `NIOTSListenerChannel` to a UNIX Domain Socket.
///
/// - parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system.
public func bind(unixDomainSocketPath: String) -> EventLoopFuture<Channel> {
return self.bind0(shouldRegister: true) { (channel, promise) in
do {
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
channel.bind(to: address, promise: promise)
} catch {
promise.fail(error)
}
}
}
/// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`.
///
/// - parameters:
/// - endpoint: The `NWEndpoint` to bind this channel to.
public func bind(endpoint: NWEndpoint) -> EventLoopFuture<Channel> {
return self.bind0(shouldRegister: true) { (channel, promise) in
channel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint), promise: promise)
}
}
/// Bind the `NIOTSListenerChannel` to an existing `NWListener`.
///
/// - parameters:
/// - listener: The NWListener to wrap.
public func withNWListener(_ listener:NWListener) -> EventLoopFuture<Channel>{
return self.bind0(existingNWListener: listener,shouldRegister: false) { channel, promise in
channel.registerAlreadyConfigured0(promise: promise)
}
}
private func bind0(existingNWListener: NWListener? = nil, shouldRegister: Bool, _ binder: @escaping (NIOTSDatagramListenerChannel, EventLoopPromise<Void>) -> Void) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next() as! NIOTSEventLoop
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) }
let childChannelInit = self.childChannelInit
let serverChannelOptions = self.serverChannelOptions
let childChannelOptions = self.childChannelOptions
let serverChannel: NIOTSDatagramListenerChannel
if let newListener = existingNWListener {
serverChannel = NIOTSDatagramListenerChannel(wrapping: newListener,
on: eventLoop,
qos: self.serverQoS,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childUDPOptions: self.udpOptions,
childTLSOptions: self.tlsOptions)
} else {
serverChannel = NIOTSDatagramListenerChannel(eventLoop: eventLoop,
qos: self.serverQoS,
udpOptions: self.udpOptions,
tlsOptions: self.tlsOptions,
childLoopGroup: self.childGroup,
childChannelQoS: self.childQoS,
childUDPOptions: self.udpOptions,
childTLSOptions: self.tlsOptions)
}
return eventLoop.submit {
return serverChannelOptions.applyAllChannelOptions(to: serverChannel).flatMap {
serverChannelInit(serverChannel)
}.flatMap {
eventLoop.assertInEventLoop()
return serverChannel.pipeline.addHandler(AcceptHandler<NIOTSDatagramChannel>(childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions))
}.flatMap {
if shouldRegister{
return serverChannel.register()
} else {
return eventLoop.makeSucceededVoidFuture()
}
}.flatMap {
let bindPromise = eventLoop.makePromise(of: Void.self)
binder(serverChannel, bindPromise)
if let bindTimeout = self.bindTimeout {
let cancelTask = eventLoop.scheduleTask(in: bindTimeout) {
bindPromise.fail(NIOTSErrors.BindTimeout(timeout: bindTimeout))
serverChannel.close(promise: nil)
}
bindPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
cancelTask.cancel()
}
}
return bindPromise.futureResult
}.map {
serverChannel as Channel
}.flatMapError { error in
serverChannel.close0(error: error, mode: .all, promise: nil)
return eventLoop.makeFailedFuture(error)
}
}.flatMap {
$0
}
}
}
#endif

View File

@ -0,0 +1,158 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import Foundation
import NIOCore
import NIOFoundationCompat
import NIOConcurrencyHelpers
import Dispatch
import Network
import Atomics
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal final class NIOTSDatagramListenerChannel: StateManagedListenerChannel<NIOTSDatagramChannel> {
/// The TCP options for this listener.
private var udpOptions: NWProtocolUDP.Options {
get {
guard case .udp(let options) = protocolOptions else {
fatalError("NIOTSDatagramListenerChannel did not have a UDP protocol state")
}
return options
}
set {
assert({
if case .udp = protocolOptions {
return true
} else {
return false
}
}(), "The protocol options of this channel were not configured as UDP")
protocolOptions = .udp(newValue)
}
}
/// The TCP options to use for child channels.
private var childUDPOptions: NWProtocolUDP.Options {
get {
guard case .udp(let options) = childProtocolOptions else {
fatalError("NIOTSDatagramListenerChannel did not have a UDP protocol state")
}
return options
}
set {
assert({
if case .udp = childProtocolOptions {
return true
} else {
return false
}
}(), "The protocol options of child channelss were not configured as UDP")
childProtocolOptions = .udp(newValue)
}
}
/// Create a `NIOTSDatagramListenerChannel` on a given `NIOTSEventLoop`.
///
/// Note that `NIOTSDatagramListenerChannel` objects cannot be created on arbitrary loops types.
internal convenience init(eventLoop: NIOTSEventLoop,
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childUDPOptions: NWProtocolUDP.Options,
childTLSOptions: NWProtocolTLS.Options?) {
self.init(
eventLoop: eventLoop,
protocolOptions: .udp(udpOptions),
tlsOptions: tlsOptions,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .udp(childUDPOptions),
childTLSOptions: childTLSOptions
)
}
/// Create a `NIOTSDatagramListenerChannel` with an already-established `NWListener`.
internal convenience init(wrapping listener: NWListener,
on eventLoop: NIOTSEventLoop,
qos: DispatchQoS? = nil,
udpOptions: NWProtocolUDP.Options,
tlsOptions: NWProtocolTLS.Options?,
childLoopGroup: EventLoopGroup,
childChannelQoS: DispatchQoS?,
childUDPOptions: NWProtocolUDP.Options,
childTLSOptions: NWProtocolTLS.Options?) {
self.init(
wrapping: listener,
eventLoop: eventLoop,
protocolOptions: .udp(udpOptions),
tlsOptions: tlsOptions,
childLoopGroup: childLoopGroup,
childChannelQoS: childChannelQoS,
childProtocolOptions: .udp(childUDPOptions),
childTLSOptions: childTLSOptions
)
}
/// Called by the underlying `NWListener` when a new connection has been received.
internal override func newConnectionHandler(connection: NWConnection) {
guard self.isActive else {
return
}
let newChannel = NIOTSDatagramChannel(
wrapping: connection,
on: self.childLoopGroup.next() as! NIOTSEventLoop,
parent: self,
udpOptions: self.childUDPOptions,
tlsOptions: self.childTLSOptions)
self.pipeline.fireChannelRead(NIOAny(newChannel))
self.pipeline.fireChannelReadComplete()
}
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSDatagramListenerChannel {
internal struct SynchronousOptions: NIOSynchronousChannelOptions {
private let channel: NIOTSDatagramListenerChannel
fileprivate init(channel: NIOTSDatagramListenerChannel) {
self.channel = channel
}
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
try self.channel.setOption0(option: option, value: value)
}
public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
return try self.channel.getOption0(option: option)
}
}
public var syncOptions: NIOSynchronousChannelOptions? {
return SynchronousOptions(channel: self)
}
}
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
extension NIOTSDatagramListenerChannel: @unchecked Sendable {}
#endif

View File

@ -172,7 +172,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
internal var state: ChannelState<ActiveSubstate> = .idle
/// The active state, used for safely reporting the channel state across threads.
internal var isActive0 = ManagedAtomic(false)
internal let isActive0 = ManagedAtomic(false)
/// The kinds of channel activation this channel supports
internal let supportedActivationType: ActivationType = .connect

View File

@ -70,7 +70,7 @@ public final class NIOTSListenerBootstrap {
/// Create a ``NIOTSListenerBootstrap`` for the `EventLoopGroup` `group`.
///
/// This initializer only exists to be more in-line with the NIO core bootstraps, in that they
/// may be constructed with an `EventLoopGroup` and by extenstion an `EventLoop`. As such an
/// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an
/// existing `NIOTSEventLoop` may be used to initialize this bootstrap. Where possible the
/// initializers accepting ``NIOTSEventLoopGroup`` should be used instead to avoid the wrong
/// type being used.

View File

@ -56,7 +56,7 @@ internal final class NIOTSListenerChannel: StateManagedListenerChannel<NIOTSConn
}
set {
assert({
if case .tcp = protocolOptions {
if case .tcp = childProtocolOptions {
return true
} else {
return false

View File

@ -103,7 +103,7 @@ internal protocol StateManagedChannel: Channel, ChannelCore {
var state: ChannelState<ActiveSubstate> { get set }
var isActive0: ManagedAtomic<Bool> { get set }
var isActive0: ManagedAtomic<Bool> { get }
var tsEventLoop: NIOTSEventLoop { get }

View File

@ -0,0 +1,159 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Network)
import XCTest
import Network
import NIOCore
import NIOTransportServices
import Foundation
extension Channel {
func wait<T>(for type: T.Type, count: Int) throws -> [T] {
return try self.pipeline.context(name: "ByteReadRecorder").flatMap { context in
if let future = (context.handler as? ReadRecorder<T>)?.notifyForDatagrams(count) {
return future
}
XCTFail("Could not wait for reads")
return self.eventLoop.makeSucceededFuture([] as [T])
}.wait()
}
func waitForDatagrams(count: Int) throws -> [ByteBuffer] {
try wait(for: ByteBuffer.self, count: count)
}
func readCompleteCount() throws -> Int {
return try self.pipeline.context(name: "ByteReadRecorder").map { context in
return (context.handler as! ReadRecorder<ByteBuffer>).readCompleteCount
}.wait()
}
func configureForRecvMmsg(messageCount: Int) throws {
let totalBufferSize = messageCount * 2048
try self.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: totalBufferSize)).flatMap {
self.setOption(ChannelOptions.datagramVectorReadMessageCount, value: messageCount)
}.wait()
}
}
final class ReadRecorder<DataType>: ChannelInboundHandler {
typealias InboundIn = DataType
typealias InboundOut = DataType
enum State {
case fresh
case registered
case active
}
var reads: [DataType] = []
var loop: EventLoop? = nil
var state: State = .fresh
var readWaiters: [Int: EventLoopPromise<[DataType]>] = [:]
var readCompleteCount = 0
func channelRegistered(context: ChannelHandlerContext) {
XCTAssertEqual(.fresh, self.state)
self.state = .registered
self.loop = context.eventLoop
}
func channelActive(context: ChannelHandlerContext) {
XCTAssertEqual(.registered, self.state)
self.state = .active
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
XCTAssertEqual(.active, self.state)
let data = self.unwrapInboundIn(data)
reads.append(data)
if let promise = readWaiters.removeValue(forKey: reads.count) {
promise.succeed(reads)
}
context.fireChannelRead(self.wrapInboundOut(data))
}
func channelReadComplete(context: ChannelHandlerContext) {
self.readCompleteCount += 1
context.fireChannelReadComplete()
}
func notifyForDatagrams(_ count: Int) -> EventLoopFuture<[DataType]> {
guard reads.count < count else {
return loop!.makeSucceededFuture(.init(reads.prefix(count)))
}
readWaiters[count] = loop!.makePromise()
return readWaiters[count]!.futureResult
}
}
// Mimicks the DatagramChannelTest from apple/swift-nio
@available(OSX 10.14, iOS 12.0, tvOS 12.0, *)
final class NIOTSDatagramConnectionChannelTests: XCTestCase {
private var group: NIOTSEventLoopGroup!
private func buildServerChannel(group: NIOTSEventLoopGroup, host: String = "127.0.0.1", port: Int = 0, onConnect: @escaping (Channel) -> ()) throws -> Channel {
return try NIOTSDatagramListenerBootstrap(group: group)
.childChannelInitializer { childChannel in
onConnect(childChannel)
return childChannel.pipeline.addHandler(ReadRecorder<ByteBuffer>(), name: "ByteReadRecorder")
}
.bind(host: host, port: port)
.wait()
}
private func buildClientChannel(group: NIOTSEventLoopGroup, host: String = "127.0.0.1", port: Int) throws -> Channel {
return try NIOTSDatagramBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandler(ReadRecorder<ByteBuffer>(), name: "ByteReadRecorder")
}
.connect(host: host, port: port)
.wait()
}
override func setUp() {
super.setUp()
self.group = NIOTSEventLoopGroup()
self.continueAfterFailure = false
}
override func tearDown() {
XCTAssertNoThrow(try self.group.syncShutdownGracefully())
}
func testBasicChannelCommunication() throws {
let serverHandlePromise = group.next().makePromise(of: Channel.self)
let server = try buildServerChannel(group: group, onConnect: serverHandlePromise.succeed)
let client = try buildClientChannel(group: group, port: server.localAddress!.port!)
var buffer = client.allocator.buffer(capacity: 256)
buffer.writeStaticString("hello, world!")
XCTAssertNoThrow(try client.writeAndFlush(buffer).wait())
let serverHandle = try serverHandlePromise.futureResult.wait()
let reads = try serverHandle.waitForDatagrams(count: 1)
XCTAssertEqual(reads.count, 1)
XCTAssertEqual(reads[0], buffer)
}
}
#endif