From 7a18352b5edfe543ebd0e6fec9c0a1b2d9a878b6 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Sat, 8 Jul 2023 13:41:46 +0200 Subject: [PATCH] Extract the NWConnection code into StateManagedNWConnectionChannel (#174) * Extract the NWConnection code into StateManagedNWConnectionChannel * Process cory's feedback * Remove dead code whose functionality is already handled by the helper protocol * Make the address cache non-locked, because its lock has been moved to the accessors * Move getting the Multipath option into the TCP channel, re-introduce the underscores to the addressCache properties * Drop the umbrella import --------- Co-authored-by: Cory Benfield --- .../NIOTSConnectionBootstrap.swift | 2 +- .../NIOTSConnectionChannel.swift | 550 ++------------- .../NIOTSListenerChannel.swift | 15 +- .../StateManagedChannel.swift | 4 +- .../StateManagedNWConnectionChannel.swift | 625 ++++++++++++++++++ .../TCPOptions+SocketChannelOption.swift | 2 +- 6 files changed, 691 insertions(+), 507 deletions(-) create mode 100644 Sources/NIOTransportServices/StateManagedNWConnectionChannel.swift diff --git a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift index c653635..c2a159c 100644 --- a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift +++ b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift @@ -199,7 +199,7 @@ public final class NIOTSConnectionBootstrap { /// Specify the `endpoint` to connect to for the TCP `Channel` that will be established. public func connect(endpoint: NWEndpoint) -> EventLoopFuture { - return self.connect(shouldRegister: true) {channel, promise in + return self.connect(shouldRegister: true) { channel, promise in channel.triggerUserOutboundEvent(NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint), promise: promise) } diff --git a/Sources/NIOTransportServices/NIOTSConnectionChannel.swift b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift index 9e0a988..e3f0b55 100644 --- a/Sources/NIOTransportServices/NIOTSConnectionChannel.swift +++ b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift @@ -25,7 +25,7 @@ import Atomics /// Channel options for the connection channel. @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) -private struct ConnectionChannelOptions { +struct TransportServicesChannelOptions { /// Whether autoRead is enabled for this channel. internal var autoRead: Bool = true @@ -37,10 +37,6 @@ private struct ConnectionChannelOptions { internal var waitForActivity: Bool = true } - -private typealias PendingWrite = (data: ByteBuffer, promise: EventLoopPromise?) - - internal struct AddressCache { // deliberately lets because they must always be updated together (so forcing `init` is useful). let local: Optional @@ -55,7 +51,7 @@ internal struct AddressCache { /// A structure that manages backpressure signaling on this channel. @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) -private struct BackpressureManager { +internal struct BackpressureManager { /// Whether the channel is writable, given the current watermark state. /// /// This is an atomic only because the channel writability flag needs to be safe to access from multiple @@ -131,7 +127,7 @@ private struct BackpressureManager { @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) -internal final class NIOTSConnectionChannel { +internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel { /// The `ByteBufferAllocator` for this `Channel`. public let allocator = ByteBufferAllocator() @@ -146,23 +142,29 @@ internal final class NIOTSConnectionChannel { /// The `EventLoop` this `Channel` belongs to. internal let tsEventLoop: NIOTSEventLoop - private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads. + private(set) var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads. internal let closePromise: EventLoopPromise /// The underlying `NWConnection` that this `Channel` wraps. This is only non-nil /// after the initial connection attempt has been made. - private var nwConnection: NWConnection? + internal var connection: NWConnection? /// The `DispatchQueue` that socket events for this connection will be dispatched onto. - private let connectionQueue: DispatchQueue + internal let connectionQueue: DispatchQueue /// An `EventLoopPromise` that will be succeeded or failed when a connection attempt succeeds or fails. - private var connectPromise: EventLoopPromise? + internal var connectPromise: EventLoopPromise? + + internal var parameters: NWParameters { + NWParameters(tls: self.tlsOptions, tcp: self.tcpOptions) + } /// The TCP options for this connection. private var tcpOptions: NWProtocolTCP.Options + internal var nwOptions: NWProtocolTCP.Options { self.tcpOptions } + /// The TLS options for this connection, if any. private var tlsOptions: NWProtocolTLS.Options? @@ -177,37 +179,37 @@ internal final class NIOTSConnectionChannel { /// Whether a call to NWConnection.receive has been made, but the completion /// handler has not yet been invoked. - private var outstandingRead: Bool = false + internal var outstandingRead: Bool = false /// The options for this channel. - private var options: ConnectionChannelOptions = ConnectionChannelOptions() + internal var options = TransportServicesChannelOptions() /// Any pending writes that have yet to be delivered to the network stack. - private var pendingWrites = CircularBuffer(initialCapacity: 8) + internal var pendingWrites = CircularBuffer(initialCapacity: 8) /// An object to keep track of pending writes and manage our backpressure signaling. - private var backpressureManager = BackpressureManager() + internal var _backpressureManager = BackpressureManager() /// The value of SO_REUSEADDR. - private var reuseAddress = false + internal var reuseAddress = false /// The value of SO_REUSEPORT. - private var reusePort = false + internal var reusePort = false /// The value of the allowLocalEndpointReuse option. - private var allowLocalEndpointReuse = false + internal var allowLocalEndpointReuse = false /// Whether to use peer-to-peer connectivity when connecting to Bonjour services. - private var enablePeerToPeer = false + internal var enablePeerToPeer = false /// The default multipath service type. - private var multipathServiceType = NWParameters.MultipathServiceType.disabled + internal var multipathServiceType = NWParameters.MultipathServiceType.disabled /// The cache of the local and remote socket addresses. Must be accessed using _addressCacheLock. - private var _addressCache = AddressCache(local: nil, remote: nil) + internal var _addressCache = AddressCache(local: nil, remote: nil) /// A lock that guards the _addressCache. - private let _addressCacheLock = NIOLock() + internal let _addressCacheLock = NIOLock() /// Create a `NIOTSConnectionChannel` on a given `NIOTSEventLoop`. /// @@ -240,7 +242,7 @@ internal final class NIOTSConnectionChannel { qos: qos, tcpOptions: tcpOptions, tlsOptions: tlsOptions) - self.nwConnection = connection + self.connection = connection } } @@ -248,99 +250,16 @@ internal final class NIOTSConnectionChannel { // MARK:- NIOTSConnectionChannel implementation of Channel @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) extension NIOTSConnectionChannel: Channel { - /// The `ChannelPipeline` for this `Channel`. - public var pipeline: ChannelPipeline { - return self._pipeline - } - - /// The local address for this channel. - public var localAddress: SocketAddress? { - return self._addressCacheLock.withLock { - return self._addressCache.local - } - } - - /// The remote address for this channel. - public var remoteAddress: SocketAddress? { - return self._addressCacheLock.withLock { - return self._addressCache.remote - } - } - - /// Whether this channel is currently writable. - public var isWritable: Bool { - return self.backpressureManager.writable.load(ordering: .relaxed) - } - - public var _channelCore: ChannelCore { - return self - } - - public func setOption(_ option: Option, value: Option.Value) -> EventLoopFuture { - if self.eventLoop.inEventLoop { - return self.eventLoop.makeCompletedFuture(Result { try setOption0(option: option, value: value) }) - } else { - return self.eventLoop.submit { try self.setOption0(option: option, value: value) } - } - } - - private func setOption0(option: Option, value: Option.Value) throws { - self.eventLoop.preconditionInEventLoop() - - guard !self.closed else { - throw ChannelError.ioOnClosedChannel - } - + func getChannelSpecificOption0