//===----------------------------------------------------------------------===// // // This source file is part of the SwiftNIO open source project // // Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information // See CONTRIBUTORS.txt for the list of SwiftNIO project authors // // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// #if canImport(Network) import Foundation import NIOCore import NIOConcurrencyHelpers import NIOFoundationCompat import NIOTLS import Dispatch import Network import Security @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) protocol NWConnectionSubstate: ActiveChannelSubstate { static func closeInput(state: inout ChannelState) throws static func closeOutput(state: inout ChannelState) throws } internal typealias PendingWrite = (data: ByteBuffer, promise: EventLoopPromise?) protocol NWOptionsProtocol { /// Apply a given channel `SocketOption` to this protocol options state. func applyChannelOption(option: ChannelOptions.Types.SocketOption, value: SocketOptionValue) throws /// Obtain the given `SocketOption` value for this protocol options state. func valueFor(socketOption option: ChannelOptions.Types.SocketOption) throws -> SocketOptionValue } @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) internal protocol StateManagedNWConnectionChannel: StateManagedChannel where ActiveSubstate: NWConnectionSubstate { associatedtype NWOptions: NWOptionsProtocol var parameters: NWParameters { get } var nwOptions: NWOptions { get } var connection: NWConnection? { get set } var connectionQueue: DispatchQueue { get } var connectPromise: EventLoopPromise? { get set } var outstandingRead: Bool { get set } var options: TransportServicesChannelOptions { get set } var pendingWrites: CircularBuffer { get set } var _backpressureManager: BackpressureManager { get set } var reuseAddress: Bool { get set } var reusePort: Bool { get set } var enablePeerToPeer: Bool { get set } var _inboundStreamOpen: Bool { get } var _pipeline: ChannelPipeline! { get } var _addressCache: AddressCache { get set } var _addressCacheLock: NIOLock { get } var allowLocalEndpointReuse: Bool { get set } var multipathServiceType: NWParameters.MultipathServiceType { get } func setChannelSpecificOption0(option: Option, value: Option.Value) throws func getChannelSpecificOption0(option: Option) throws -> Option.Value } @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) extension StateManagedNWConnectionChannel { public var pipeline: ChannelPipeline { self._pipeline } public var _channelCore: ChannelCore { return self } /// The local address for this channel. public var localAddress: SocketAddress? { return self._addressCacheLock.withLock { return self._addressCache.local } } /// The remote address for this channel. public var remoteAddress: SocketAddress? { return self._addressCacheLock.withLock { return self._addressCache.remote } } /// Whether this channel is currently writable. public var isWritable: Bool { return self._backpressureManager.writable.load(ordering: .relaxed) } internal func beginActivating0(to target: NWEndpoint, promise: EventLoopPromise?) { assert(self.connection == nil) assert(self.connectPromise == nil) // Before we start, we validate that the target won't cause a crash: see // https://github.com/apple/swift-nio/issues/1617. if case .hostPort(host: let host, port: _) = target, host == "" { // We don't pass the promise in here because we'll actually not complete it. We complete it manually ourselves. self.close0(error: NIOTSErrors.InvalidHostname(), mode: .all, promise: nil) promise?.fail(NIOTSErrors.InvalidHostname()) return } self.connectPromise = promise let parameters = parameters // Network.framework munges REUSEADDR and REUSEPORT together, so we turn this on if we need // either or it's been explicitly set. parameters.allowLocalEndpointReuse = self.reuseAddress || self.reusePort || self.allowLocalEndpointReuse parameters.includePeerToPeer = self.enablePeerToPeer parameters.multipathServiceType = self.multipathServiceType let connection = NWConnection(to: target, using: parameters) connection.stateUpdateHandler = self.stateUpdateHandler(newState:) connection.betterPathUpdateHandler = self.betterPathHandler connection.pathUpdateHandler = self.pathChangedHandler(newPath:) // Ok, state is ready. Let's go! self.connection = connection connection.start(queue: self.connectionQueue) } public func write0(_ data: NIOAny, promise: EventLoopPromise?) { guard self.isActive else { promise?.fail(ChannelError.ioOnClosedChannel) return } // TODO: We would ideally support all of IOData here, gotta work out how to do that without HOL blocking // all writes terribly. // My best guess at this time is that Data(contentsOf:) may mmap the file in question, which would let us // at least only block the network stack itself rather than our thread. I'm not certain though, especially // on Linux. Should investigate. let data = self.unwrapData(data, as: ByteBuffer.self) self.pendingWrites.append((data, promise)) /// This may cause our writability state to change. if self._backpressureManager.writabilityChanges(whenQueueingBytes: data.readableBytes) { self.pipeline.fireChannelWritabilityChanged() } } public func flush0() { guard self.isActive else { return } guard let conn = self.connection else { preconditionFailure("nwconnection cannot be nil while channel is active") } func completionCallback(promise: EventLoopPromise?, sentBytes: Int) -> ((NWError?) -> Void) { return { error in if let error = error { promise?.fail(error) } else { promise?.succeed(()) } if self._backpressureManager.writabilityChanges(whenBytesSent: sentBytes) { self.pipeline.fireChannelWritabilityChanged() } } } conn.batch { while self.pendingWrites.count > 0 { let write = self.pendingWrites.removeFirst() let buffer = write.data let content = buffer.getData(at: buffer.readerIndex, length: buffer.readableBytes) conn.send(content: content, completion: .contentProcessed(completionCallback(promise: write.promise, sentBytes: buffer.readableBytes))) } } } public func localAddress0() throws -> SocketAddress { guard let localEndpoint = self.connection?.currentPath?.localEndpoint else { throw NIOTSErrors.NoCurrentPath() } // TODO: Support wider range of address types. return try SocketAddress(fromNWEndpoint: localEndpoint) } public func remoteAddress0() throws -> SocketAddress { guard let remoteEndpoint = self.connection?.currentPath?.remoteEndpoint else { throw NIOTSErrors.NoCurrentPath() } // TODO: Support wider range of address types. return try SocketAddress(fromNWEndpoint: remoteEndpoint) } internal func alreadyConfigured0(promise: EventLoopPromise?) { guard let connection = connection else { promise?.fail(NIOTSErrors.NotPreConfigured()) return } guard case .setup = connection.state else { promise?.fail(NIOTSErrors.NotPreConfigured()) return } self.connectPromise = promise connection.stateUpdateHandler = self.stateUpdateHandler(newState:) connection.betterPathUpdateHandler = self.betterPathHandler connection.pathUpdateHandler = self.pathChangedHandler(newPath:) connection.start(queue: self.connectionQueue) } /// Perform a read from the network. /// /// This method has a slightly strange semantic, because we do not allow multiple reads at once. As a result, this /// is a *request* to read, and if there is a read already being processed then this method will do nothing. public func read0() { guard self._inboundStreamOpen && !self.outstandingRead else { return } guard let conn = self.connection else { preconditionFailure("Connection should not be nil") } // TODO: Can we do something sensible with these numbers? self.outstandingRead = true conn.receive(minimumIncompleteLength: 1, maximumLength: 8192, completion: self.dataReceivedHandler(content:context:isComplete:error:)) } public func doClose0(error: Error) { guard let conn = self.connection else { // We don't have a connection to close here, so we're actually done. Our old state // was idle. assert(self.pendingWrites.count == 0) return } // Step 1 is to tell the network stack we're done. // TODO: Does this drop the connection fully, or can we keep receiving data? Must investigate. conn.cancel() // Step 2 is to fail all outstanding writes. self.dropOutstandingWrites(error: error) // Step 3 is to cancel a pending connect promise, if any. if let pendingConnect = self.connectPromise { self.connectPromise = nil pendingConnect.fail(error) } } public func doHalfClose0(error: Error, promise: EventLoopPromise?) { guard let conn = self.connection else { // We don't have a connection to half close, so fail the promise. promise?.fail(ChannelError.ioOnClosedChannel) return } do { try ActiveSubstate.closeOutput(state: &self.state) } catch ChannelError.outputClosed { // Here we *only* fail the promise, no need to blow up the connection. promise?.fail(ChannelError.outputClosed) return } catch { // For any other error, this is fatal. self.close0(error: error, mode: .all, promise: promise) return } func completionCallback(for promise: EventLoopPromise?) -> ((NWError?) -> Void) { return { error in if let error = error { promise?.fail(error) } else { promise?.succeed(()) } } } // It should not be possible to have a pending connect promise while we're doing half-closure. assert(self.connectPromise == nil) // Step 1 is to tell the network stack we're done. conn.send(content: nil, contentContext: .finalMessage, completion: .contentProcessed(completionCallback(for: promise))) // Step 2 is to fail all outstanding writes. self.dropOutstandingWrites(error: error) } public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { switch event { case let x as NIOTSNetworkEvents.ConnectToNWEndpoint: self.connect0(to: x.endpoint, promise: promise) default: promise?.fail(ChannelError.operationUnsupported) } } public func channelRead0(_ data: NIOAny) { // drop the data, do nothing return } public func errorCaught0(error: Error) { // Currently we don't do anything with errors that pass through the pipeline return } /// A function that will trigger a socket read if necessary. internal func readIfNeeded0() { if self.options.autoRead { self.pipeline.read() } } /// Called by the underlying `NWConnection` when its internal state has changed. private func stateUpdateHandler(newState: NWConnection.State) { switch newState { case .setup: preconditionFailure("Should not be told about this state.") case .waiting(let err): if case .activating = self.state, self.options.waitForActivity { // This means the connection cannot currently be completed. We should notify the pipeline // here, or support this with a channel option or something, but for now for the sake of // demos we will just allow ourselves into this stage.tage. self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.WaitingForConnectivity(transientError: err)) break } // In this state we've transitioned into waiting, presumably from active or closing. In this // version of NIO this is an error, but we should aim to support this at some stage. self.close0(error: err, mode: .all, promise: nil) case .preparing: // This just means connections are being actively established. We have no specific action // here. break case .ready: // Transitioning to ready means the connection was succeeded. Hooray! self.connectionComplete0() case .cancelled: // This is the network telling us we're closed. We don't need to actually do anything here // other than check our state is ok. assert(self.closed) self.connection = nil case .failed(let err): // The connection has failed for some reason. self.close0(error: err, mode: .all, promise: nil) default: // This clause is here to help the compiler out: it's otherwise not able to // actually validate that the switch is exhaustive. Trust me, it is. fatalError("Unreachable") } } /// Called by the underlying `NWConnection` when a network receive has completed. /// /// The state matrix here is large. If `content` is non-nil, some data was received: we need to send it down the pipeline /// and call channelReadComplete. This may be nil, in which case we expect either `isComplete` to be `true` or `error` /// to be non-nil. `isComplete` indicates half-closure on the read side of a connection. `error` is set if the receive /// did not complete due to an error, though there may still be some data. private func dataReceivedHandler(content: Data?, context: NWConnection.ContentContext?, isComplete: Bool, error: NWError?) { precondition(self.outstandingRead) self.outstandingRead = false guard self.isActive else { // If we're already not active, we aren't going to process any of this: it's likely the result of an extra // read somewhere along the line. return } // First things first, if there's data we need to deliver it. if let content = content { // It would be nice if we didn't have to do this copy, but I'm not sure how to avoid it with the current Data // APIs. var buffer = self.allocator.buffer(capacity: content.count) buffer.writeBytes(content) self.pipeline.fireChannelRead(NIOAny(buffer)) self.pipeline.fireChannelReadComplete() } // Next, we want to check if there's an error. If there is, we're going to deliver it, and then close the connection with // it. Otherwise, we're going to check if we read EOF, and if we did we'll close with that instead. if let error = error { self.pipeline.fireErrorCaught(error) self.close0(error: error, mode: .all, promise: nil) } else if isComplete { self.didReadEOF() } // Last, issue a new read automatically if we need to. self.readIfNeeded0() } /// Called by the underlying `NWConnection` when a better path for this connection is available. /// /// Notifies the channel pipeline of the new option. private func betterPathHandler(available: Bool) { if available { self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.BetterPathAvailable()) } else { self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.BetterPathUnavailable()) } } /// Called by the underlying `NWConnection` when this connection changes its network path. /// /// Notifies the channel pipeline of the new path. private func pathChangedHandler(newPath path: NWPath) { self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.PathChanged(newPath: path)) } /// Handle a read EOF. /// /// If the user has indicated they support half-closure, we will emit the standard half-closure /// event. If they have not, we upgrade this to regular closure. private func didReadEOF() { if self.options.supportRemoteHalfClosure { // This is a half-closure, but the connection is still valid. do { try ActiveSubstate.closeInput(state: &self.state) } catch { return self.close0(error: error, mode: .all, promise: nil) } self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed) } else { self.close0(error: ChannelError.eof, mode: .all, promise: nil) } } /// Make the channel active. private func connectionComplete0() { let promise = self.connectPromise self.connectPromise = nil // Before becoming active, update the cached addresses. let localAddress = try? self.localAddress0() let remoteAddress = try? self.remoteAddress0() self._addressCache = AddressCache(local: localAddress, remote: remoteAddress) self.becomeActive0(promise: promise) if let metadata = self.connection?.metadata(definition: NWProtocolTLS.definition) as? NWProtocolTLS.Metadata { // This is a TLS connection, we may need to fire some other events. let securityMetadata = metadata.securityProtocolMetadata // The pointer returned by `sec_protocol_metadata_get_negotiated_protocol` is presumably owned by it, so we need // to confirm it's still alive while we copy the data out. let negotiatedProtocol = withExtendedLifetime(securityMetadata) { sec_protocol_metadata_get_negotiated_protocol(metadata.securityProtocolMetadata).map { String(cString: $0) } } self.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: negotiatedProtocol)) } } /// Drop all outstanding writes. Must only be called in the inactive /// state. private func dropOutstandingWrites(error: Error) { while self.pendingWrites.count > 0 { self.pendingWrites.removeFirst().promise?.fail(error) } } public func setOption(_ option: Option, value: Option.Value) -> EventLoopFuture { if self.eventLoop.inEventLoop { return self.eventLoop.makeCompletedFuture(Result { try setOption0(option: option, value: value) }) } else { return self.eventLoop.submit { try self.setOption0(option: option, value: value) } } } func setOption0(option: Option, value: Option.Value) throws { self.eventLoop.assertInEventLoop() guard !self.closed else { throw ChannelError.ioOnClosedChannel } switch option { case _ as ChannelOptions.Types.AutoReadOption: self.options.autoRead = value as! Bool self.readIfNeeded0() case _ as ChannelOptions.Types.SocketOption: let optionValue = option as! ChannelOptions.Types.SocketOption // SO_REUSEADDR and SO_REUSEPORT are handled here. switch (optionValue.level, optionValue.name) { case (SOL_SOCKET, SO_REUSEADDR): self.reuseAddress = (value as! SocketOptionValue) != Int32(0) case (SOL_SOCKET, SO_REUSEPORT): self.reusePort = (value as! SocketOptionValue) != Int32(0) default: try self.nwOptions.applyChannelOption(option: optionValue, value: value as! SocketOptionValue) } case _ as ChannelOptions.Types.WriteBufferWaterMarkOption: if self._backpressureManager.writabilityChanges(whenUpdatingWaterMarks: value as! ChannelOptions.Types.WriteBufferWaterMark) { self.pipeline.fireChannelWritabilityChanged() } case is NIOTSChannelOptions.Types.NIOTSEnablePeerToPeerOption: self.enablePeerToPeer = value as! NIOTSChannelOptions.Types.NIOTSEnablePeerToPeerOption.Value case _ as NIOTSChannelOptions.Types.NIOTSWaitForActivityOption: let newValue = value as! Bool self.options.waitForActivity = newValue if let state = self.connection?.state, case .waiting(let err) = state, !newValue { // We're in waiting now, so we should drop the connection. self.close0(error: err, mode: .all, promise: nil) } case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption: self.options.supportRemoteHalfClosure = value as! Bool case is NIOTSChannelOptions.Types.NIOTSAllowLocalEndpointReuse: self.allowLocalEndpointReuse = value as! NIOTSChannelOptions.Types.NIOTSEnablePeerToPeerOption.Value default: try self.setChannelSpecificOption0(option: option, value: value) } } public func getOption(_ option: Option) -> EventLoopFuture { if self.eventLoop.inEventLoop { return self.eventLoop.makeCompletedFuture(Result { try getOption0(option: option) }) } else { return eventLoop.submit { try self.getOption0(option: option) } } } func getOption0(option: Option) throws -> Option.Value { self.eventLoop.assertInEventLoop() guard !self.closed else { throw ChannelError.ioOnClosedChannel } switch option { case _ as ChannelOptions.Types.AutoReadOption: return self.options.autoRead as! Option.Value case _ as ChannelOptions.Types.SocketOption: let optionValue = option as! ChannelOptions.Types.SocketOption // SO_REUSEADDR and SO_REUSEPORT are handled here. switch (optionValue.level, optionValue.name) { case (SOL_SOCKET, SO_REUSEADDR): return Int32(self.reuseAddress ? 1 : 0) as! Option.Value case (SOL_SOCKET, SO_REUSEPORT): return Int32(self.reusePort ? 1 : 0) as! Option.Value default: return try self.nwOptions.valueFor(socketOption: optionValue) as! Option.Value } case _ as ChannelOptions.Types.WriteBufferWaterMarkOption: return self._backpressureManager.waterMarks as! Option.Value case is NIOTSChannelOptions.Types.NIOTSEnablePeerToPeerOption: return self.enablePeerToPeer as! Option.Value case is NIOTSChannelOptions.Types.NIOTSAllowLocalEndpointReuse: return self.allowLocalEndpointReuse as! Option.Value case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption: return self.options.supportRemoteHalfClosure as! Option.Value case _ as NIOTSChannelOptions.Types.NIOTSWaitForActivityOption: return self.options.waitForActivity as! Option.Value case is NIOTSChannelOptions.Types.NIOTSCurrentPathOption: guard let currentPath = self.connection?.currentPath else { throw NIOTSErrors.NoCurrentPath() } return currentPath as! Option.Value case is NIOTSChannelOptions.Types.NIOTSMetadataOption: let optionValue = option as! NIOTSChannelOptions.Types.NIOTSMetadataOption guard let connection = self.connection else { throw NIOTSErrors.NoCurrentConnection() } return connection.metadata(definition: optionValue.definition) as! Option.Value default: // watchOS 6.0 availability is covered by the @available on this extension. if #available(OSX 10.15, iOS 13.0, tvOS 13.0, *) { switch option { case is NIOTSChannelOptions.Types.NIOTSEstablishmentReportOption: guard let connection = self.connection else { throw NIOTSErrors.NoCurrentConnection() } let promise: EventLoopPromise = eventLoop.makePromise() connection.requestEstablishmentReport(queue: connectionQueue) { report in promise.succeed(report) } return promise.futureResult as! Option.Value case is NIOTSChannelOptions.Types.NIOTSDataTransferReportOption: guard let connection = self.connection else { throw NIOTSErrors.NoCurrentConnection() } return connection.startDataTransferReport() as! Option.Value default: break } } return try getChannelSpecificOption0(option: option) } } } #endif