diff --git a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift index a1f0b21..fbc1caa 100644 --- a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift +++ b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift @@ -103,7 +103,7 @@ public final class NIOTSConnectionBootstrap { /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. public func connect(host: String, port: Int) -> EventLoopFuture { guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else { - return self.group.next().makeFailedFuture(error: NIOTSErrors.InvalidPort(port: port)) + return self.group.next().makeFailedFuture(NIOTSErrors.InvalidPort(port: port)) } return self.connect(endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort)) } @@ -129,7 +129,7 @@ public final class NIOTSConnectionBootstrap { let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) return connect(to: address) } catch { - return group.next().makeFailedFuture(error: error) + return group.next().makeFailedFuture(error) } } @@ -146,7 +146,7 @@ public final class NIOTSConnectionBootstrap { qos: self.qos, tcpOptions: self.tcpOptions, tlsOptions: self.tlsOptions) - let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(result: ()) } + let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) } let channelOptions = self.channelOptions return conn.eventLoop.submit { @@ -158,7 +158,7 @@ public final class NIOTSConnectionBootstrap { let connectPromise: EventLoopPromise = conn.eventLoop.makePromise() connectAction(conn, connectPromise) let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) { - connectPromise.fail(error: ChannelError.connectTimeout(self.connectTimeout)) + connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout)) conn.close(promise: nil) } @@ -206,13 +206,13 @@ internal struct ChannelOptionStorage { func applyNext() { guard let (key, (value, applier)) = it.next() else { // If we reached the end, everything is applied. - applyPromise.succeed(result: ()) + applyPromise.succeed(()) return } applier(channel)(key, value).map { applyNext() - }.cascadeFailure(promise: applyPromise) + }.cascadeFailure(to: applyPromise) } applyNext() diff --git a/Sources/NIOTransportServices/NIOTSConnectionChannel.swift b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift index d99bf95..31d4a2d 100644 --- a/Sources/NIOTransportServices/NIOTSConnectionChannel.swift +++ b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift @@ -26,9 +26,9 @@ import Security func executeAndComplete(_ promise: EventLoopPromise?, _ body: () throws -> T) { do { let result = try body() - promise?.succeed(result: result) + promise?.succeed(result) } catch let e { - promise?.fail(error: e) + promise?.fail(e) } } @@ -36,7 +36,7 @@ func executeAndComplete(_ promise: EventLoopPromise?, _ body: () throws -> private func mergePromises(_ first: EventLoopPromise?, _ second: EventLoopPromise?) -> EventLoopPromise? { if let first = first { if let second = second { - first.futureResult.cascade(promise: second) + first.futureResult.cascade(to: second) } return first } else { @@ -190,7 +190,7 @@ internal final class NIOTSConnectionChannel { private var options: ConnectionChannelOptions = ConnectionChannelOptions() /// Any pending writes that have yet to be delivered to the network stack. - private var pendingWrites = CircularBuffer(initialRingCapacity: 8) + private var pendingWrites = CircularBuffer(initialCapacity: 8) /// An object to keep track of pending writes and manage our backpressure signaling. private var backpressureManager = BackpressureManager() @@ -422,12 +422,12 @@ extension NIOTSConnectionChannel: StateManagedChannel { internal func alreadyConfigured0(promise: EventLoopPromise?) { guard let connection = nwConnection else { - promise?.fail(error: NIOTSErrors.NotPreConfigured()) + promise?.fail(NIOTSErrors.NotPreConfigured()) return } guard case .setup = connection.state else { - promise?.fail(error: NIOTSErrors.NotPreConfigured()) + promise?.fail(NIOTSErrors.NotPreConfigured()) return } @@ -462,7 +462,7 @@ extension NIOTSConnectionChannel: StateManagedChannel { public func write0(_ data: NIOAny, promise: EventLoopPromise?) { guard self.isActive else { - promise?.fail(error: ChannelError.ioOnClosedChannel) + promise?.fail(ChannelError.ioOnClosedChannel) return } @@ -493,9 +493,9 @@ extension NIOTSConnectionChannel: StateManagedChannel { func completionCallback(promise: EventLoopPromise?, sentBytes: Int) -> ((NWError?) -> Void) { return { error in if let error = error { - promise?.fail(error: error) + promise?.fail(error) } else { - promise?.succeed(result: ()) + promise?.succeed(()) } if self.backpressureManager.writabilityChanges(whenBytesSent: sentBytes) { @@ -550,14 +550,14 @@ extension NIOTSConnectionChannel: StateManagedChannel { // Step 3 is to cancel a pending connect promise, if any. if let pendingConnect = self.connectPromise { self.connectPromise = nil - pendingConnect.fail(error: error) + pendingConnect.fail(error) } } public func doHalfClose0(error: Error, promise: EventLoopPromise?) { guard let conn = self.nwConnection else { // We don't have a connection to half close, so fail the promise. - promise?.fail(error: ChannelError.ioOnClosedChannel) + promise?.fail(ChannelError.ioOnClosedChannel) return } @@ -566,7 +566,7 @@ extension NIOTSConnectionChannel: StateManagedChannel { try self.state.closeOutput() } catch ChannelError.outputClosed { // Here we *only* fail the promise, no need to blow up the connection. - promise?.fail(error: ChannelError.outputClosed) + promise?.fail(ChannelError.outputClosed) return } catch { // For any other error, this is fatal. @@ -577,9 +577,9 @@ extension NIOTSConnectionChannel: StateManagedChannel { func completionCallback(for promise: EventLoopPromise?) -> ((NWError?) -> Void) { return { error in if let error = error { - promise?.fail(error: error) + promise?.fail(error) } else { - promise?.succeed(result: ()) + promise?.succeed(()) } } } @@ -599,7 +599,7 @@ extension NIOTSConnectionChannel: StateManagedChannel { case let x as NIOTSNetworkEvents.ConnectToNWEndpoint: self.connect0(to: x.endpoint, promise: promise) default: - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) } } @@ -752,7 +752,7 @@ extension NIOTSConnectionChannel { /// state. private func dropOutstandingWrites(error: Error) { while self.pendingWrites.count > 0 { - self.pendingWrites.removeFirst().promise?.fail(error: error) + self.pendingWrites.removeFirst().promise?.fail(error) } } diff --git a/Sources/NIOTransportServices/NIOTSEventLoop.swift b/Sources/NIOTransportServices/NIOTSEventLoop.swift index f68a4db..036b0e2 100644 --- a/Sources/NIOTransportServices/NIOTSEventLoop.swift +++ b/Sources/NIOTransportServices/NIOTSEventLoop.swift @@ -99,15 +99,15 @@ internal class NIOTSEventLoop: QoSEventLoop { self.taskQueue.async(qos: qos, execute: task) } - public func scheduleTask(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { - return self.scheduleTask(in: time, qos: self.defaultQoS, task) + public func scheduleTask(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled { + return self.scheduleTask(deadline: deadline, qos: self.defaultQoS, task) } - public func scheduleTask(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled { + public func scheduleTask(deadline: NIODeadline, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled { let p: EventLoopPromise = self.makePromise() guard self.state != .closed else { - p.fail(error: EventLoopError.shutdown) + p.fail(EventLoopError.shutdown) return Scheduled(promise: p, cancellationTask: { } ) } @@ -115,17 +115,25 @@ internal class NIOTSEventLoop: QoSEventLoop { // We set the QoS on this work item and explicitly enforce it when the block runs. let workItem = DispatchWorkItem(qos: qos, flags: .enforceQoS) { do { - p.succeed(result: try task()) + p.succeed(try task()) } catch { - p.fail(error: error) + p.fail(error) } } - - self.taskQueue.asyncAfter(deadline: DispatchTime(uptimeNanoseconds: DispatchTime.now().uptimeNanoseconds + UInt64(time.nanoseconds)), execute: workItem) + + self.taskQueue.asyncAfter(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds), execute: workItem) return Scheduled(promise: p, cancellationTask: { workItem.cancel() }) } + public func scheduleTask(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { + return self.scheduleTask(in: time, qos: self.defaultQoS, task) + } + + public func scheduleTask(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled { + return self.scheduleTask(deadline: NIODeadline.now() + time, qos: qos, task) + } + public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { self.closeGently().map { queue.async { callback(nil) } @@ -156,7 +164,7 @@ extension NIOTSEventLoop { let p: EventLoopPromise = self.makePromise() self.taskQueue.async { guard self.open else { - p.fail(error: EventLoopError.shutdown) + p.fail(EventLoopError.shutdown) return } @@ -179,8 +187,8 @@ extension NIOTSEventLoop { // We must not transition into the closed state until *after* the caller has been notified that the // event loop is closed. Otherwise, this future is in real trouble, as if it needs to dispatch onto the // event loop it will be forbidden from doing so. - let completionFuture = EventLoopFuture.andAll(futures, eventLoop: self) - completionFuture.cascade(promise: p) + let completionFuture = EventLoopFuture.andAllComplete(futures, on: self) + completionFuture.cascade(to: p) completionFuture.whenComplete { (_: Result) in self.state = .closed } diff --git a/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift b/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift index 0d0f0bc..db63e79 100644 --- a/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift +++ b/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift @@ -66,7 +66,7 @@ public final class NIOTSEventLoopGroup: EventLoopGroup { for loop in self.eventLoops { g.enter() - loop.closeGently().mapIfError { err in + loop.closeGently().recover { err in q.sync { error = err } }.whenComplete { (_: Result) in g.leave() diff --git a/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift index bbe3c8e..1e0ae4e 100644 --- a/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift +++ b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift @@ -153,7 +153,7 @@ public final class NIOTSListenerBootstrap { let address = try SocketAddress.makeAddressResolvingHost(host, port: port) channel.bind(to: address, promise: p) } catch { - p.fail(error: error) + p.fail(error) } return p.futureResult } @@ -180,7 +180,7 @@ public final class NIOTSListenerBootstrap { let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) channel.bind(to: address, promise: p) } catch { - p.fail(error: error) + p.fail(error) } return p.futureResult } @@ -199,7 +199,7 @@ public final class NIOTSListenerBootstrap { private func bind0(_ binder: @escaping (Channel) -> EventLoopFuture) -> EventLoopFuture { let eventLoop = self.group.next() as! NIOTSEventLoop let childEventLoopGroup = self.childGroup as! NIOTSEventLoopGroup - let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(result: ()) } + let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.makeSucceededFuture(()) } let childChannelInit = self.childChannelInit let serverChannelOptions = self.serverChannelOptions let childChannelOptions = self.childChannelOptions @@ -227,7 +227,7 @@ public final class NIOTSListenerBootstrap { serverChannel as Channel }.flatMapError { error in serverChannel.close0(error: error, mode: .all, promise: nil) - return eventLoop.makeFailedFuture(error: error) + return eventLoop.makeFailedFuture(error) } }.flatMap { $0 @@ -265,7 +265,7 @@ private class AcceptHandler: ChannelInboundHandler { let conn = self.unwrapInboundIn(data) let childLoop = self.childGroup.next() as! NIOTSEventLoop let ctxEventLoop = ctx.eventLoop - let childInitializer = self.childChannelInitializer ?? { _ in childLoop.makeSucceededFuture(result: ()) } + let childInitializer = self.childChannelInitializer ?? { _ in childLoop.makeSucceededFuture(()) } let newChannel = NIOTSConnectionChannel(wrapping: conn, on: childLoop, parent: ctx.channel, @@ -292,7 +292,7 @@ private class AcceptHandler: ChannelInboundHandler { } } ctx.fireChannelRead(self.wrapInboundOut(newChannel)) - return ctx.eventLoop.makeSucceededFuture(result: ()) + return ctx.eventLoop.makeSucceededFuture(()) }.whenFailure { error in ctx.eventLoop.assertInEventLoop() _ = newChannel.close() diff --git a/Sources/NIOTransportServices/NIOTSListenerChannel.swift b/Sources/NIOTransportServices/NIOTSListenerChannel.swift index 4d9958f..15e3cda 100644 --- a/Sources/NIOTransportServices/NIOTSListenerChannel.swift +++ b/Sources/NIOTransportServices/NIOTSListenerChannel.swift @@ -308,7 +308,7 @@ extension NIOTSListenerChannel: StateManagedChannel { } public func write0(_ data: NIOAny, promise: EventLoopPromise?) { - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) } public func flush0() { @@ -332,12 +332,12 @@ extension NIOTSListenerChannel: StateManagedChannel { // Step 2: fail any pending bind promise. if let pendingBind = self.bindPromise { self.bindPromise = nil - pendingBind.fail(error: error) + pendingBind.fail(error) } } public func doHalfClose0(error: Error, promise: EventLoopPromise?) { - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) } public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { @@ -345,7 +345,7 @@ extension NIOTSListenerChannel: StateManagedChannel { case let x as NIOTSNetworkEvents.BindToNWEndpoint: self.bind0(to: x.endpoint, promise: promise) default: - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) } } diff --git a/Sources/NIOTransportServices/StateManagedChannel.swift b/Sources/NIOTransportServices/StateManagedChannel.swift index 72c7b0e..c2cee9c 100644 --- a/Sources/NIOTransportServices/StateManagedChannel.swift +++ b/Sources/NIOTransportServices/StateManagedChannel.swift @@ -145,9 +145,9 @@ extension StateManagedChannel { do { try self.state.register(eventLoop: self.tsEventLoop, channel: self) self.pipeline.fireChannelRegistered() - promise?.succeed(result: ()) + promise?.succeed(()) } catch { - promise?.fail(error: error) + promise?.fail(error) self.close0(error: error, mode: .all, promise: nil) } } @@ -157,9 +157,9 @@ extension StateManagedChannel { try self.state.register(eventLoop: self.tsEventLoop, channel: self) self.pipeline.fireChannelRegistered() try self.state.beginActivating() - promise?.succeed(result: ()) + promise?.succeed(()) } catch { - promise?.fail(error: error) + promise?.fail(error) self.close0(error: error, mode: .all, promise: nil) return } @@ -192,7 +192,7 @@ extension StateManagedChannel { do { oldState = try self.state.becomeInactive() } catch let thrownError { - promise?.fail(error: thrownError) + promise?.fail(thrownError) return } @@ -215,17 +215,17 @@ extension StateManagedChannel { } // Next we fire the promise passed to this method. - promise?.succeed(result: ()) + promise?.succeed(()) // Now we schedule our final cleanup. We need to keep the channel pipeline alive for at least one more event // loop tick, as more work might be using it. self.eventLoop.execute { self.removeHandlers(channel: self) - self.closePromise.succeed(result: ()) + self.closePromise.succeed(()) } case .input: - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) case .output: self.doHalfClose0(error: error, promise: promise) @@ -243,7 +243,7 @@ extension StateManagedChannel { } self.isActive0.store(true) - promise?.succeed(result: ()) + promise?.succeed(()) self.pipeline.fireChannelActive() self.readIfNeeded0() } @@ -252,14 +252,14 @@ extension StateManagedChannel { /// not supported by a single channel type. private func activateWithType(type: ActivationType, to endpoint: NWEndpoint, promise: EventLoopPromise?) { guard type == self.supportedActivationType else { - promise?.fail(error: ChannelError.operationUnsupported) + promise?.fail(ChannelError.operationUnsupported) return } do { try self.state.beginActivating() } catch { - promise?.fail(error: error) + promise?.fail(error) return } diff --git a/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift b/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift index 0ff502c..4b00b08 100644 --- a/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift @@ -80,7 +80,7 @@ final class DisableWaitingAfterConnect: ChannelOutboundHandler { ctx.connect(to: address) } if let promise = promise { - f.cascade(promise: promise) + f.cascade(to: promise) } } } @@ -97,7 +97,7 @@ final class PromiseOnActiveHandler: ChannelInboundHandler { } func channelActive(ctx: ChannelHandlerContext) { - self.promise.succeed(result: ()) + self.promise.succeed(()) } } @@ -508,7 +508,7 @@ class NIOTSConnectionChannelTests: XCTestCase { } let connectFuture = NIOTSConnectionBootstrap(group: self.group) - .channelInitializer { channel in channel.eventLoop.makeFailedFuture(error: MyError()) } + .channelInitializer { channel in channel.eventLoop.makeFailedFuture(MyError()) } .connect(to: listener.localAddress!) do { diff --git a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift index eb0b5cb..69cf0dd 100644 --- a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift @@ -57,7 +57,7 @@ final class ReadExpecter: ChannelInboundHandler { func handlerRemoved(ctx: ChannelHandlerContext) { if let promise = self.readPromise { - promise.fail(error: DidNotReadError()) + promise.fail(DidNotReadError()) } } @@ -74,7 +74,7 @@ final class ReadExpecter: ChannelInboundHandler { private func maybeFulfillPromise() { if let promise = self.readPromise, self.cumulationBuffer! == self.expectedRead { - promise.succeed(result: ()) + promise.succeed(()) self.readPromise = nil } } @@ -109,7 +109,7 @@ final class HalfCloseHandler: ChannelInboundHandler { XCTAssertFalse(self.alreadyHalfClosed) XCTAssertFalse(self.closed) self.alreadyHalfClosed = true - self.halfClosedPromise.succeed(result: ()) + self.halfClosedPromise.succeed(()) ctx.close(mode: .output, promise: nil) default: @@ -212,7 +212,7 @@ class NIOTSEndToEndTests: XCTestCase { } } - let allDoneFuture = EventLoopFuture.andAll(completeFutures, eventLoop: self.group.next()) + let allDoneFuture = EventLoopFuture.andAllComplete(completeFutures, on: self.group.next()) XCTAssertNoThrow(try allDoneFuture.wait()) } @@ -232,7 +232,7 @@ class NIOTSEndToEndTests: XCTestCase { } } - let allClosed = EventLoopFuture.andAll(closeFutures, eventLoop: self.group.next()) + let allClosed = EventLoopFuture.andAllComplete(closeFutures, on: self.group.next()) XCTAssertNoThrow(try allClosed.wait()) } @@ -248,7 +248,7 @@ class NIOTSEndToEndTests: XCTestCase { closeFutures.append(channel.closeFuture) } closeFutureGroup.leave() - return channel.eventLoop.makeSucceededFuture(result: ()) + return channel.eventLoop.makeSucceededFuture(()) } .bind(host: "localhost", port: 0).wait() defer { @@ -273,7 +273,7 @@ class NIOTSEndToEndTests: XCTestCase { closeFutureGroup.wait() let allClosed = closeFutureSyncQueue.sync { - return EventLoopFuture.andAll(closeFutures, eventLoop: self.group.next()) + return EventLoopFuture.andAllComplete(closeFutures, on: self.group.next()) } XCTAssertNoThrow(try allClosed.wait()) } @@ -282,7 +282,7 @@ class NIOTSEndToEndTests: XCTestCase { let serverSideConnectionPromise: EventLoopPromise = self.group.next().makePromise() let listener = try NIOTSListenerBootstrap(group: self.group) .childChannelInitializer { channel in - serverSideConnectionPromise.succeed(result: channel) + serverSideConnectionPromise.succeed(channel) return channel.pipeline.add(handler: EchoHandler()) } .bind(host: "localhost", port: 0).wait() diff --git a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift index 95452e2..d10f66e 100644 --- a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift @@ -84,6 +84,6 @@ class NIOTSEventLoopTest: XCTestCase { XCTAssertFalse(firstLoop.inEventLoop) XCTAssertTrue(secondLoop.inEventLoop) } - try EventLoopFuture.andAll([firstTask.futureResult, secondTask.futureResult], eventLoop: firstLoop).wait() + try EventLoopFuture.andAllComplete([firstTask.futureResult, secondTask.futureResult], on: firstLoop).wait() } } diff --git a/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift b/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift index a06bd4f..5704260 100644 --- a/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift @@ -143,7 +143,7 @@ class NIOTSListenerChannelTests: XCTestCase { struct MyError: Error { } let listenerFuture = NIOTSListenerBootstrap(group: self.group) - .serverChannelInitializer { channel in channel.eventLoop.makeFailedFuture(error: MyError()) } + .serverChannelInitializer { channel in channel.eventLoop.makeFailedFuture(MyError()) } .bind(host: "localhost", port: 0) do { @@ -165,7 +165,7 @@ class NIOTSListenerChannelTests: XCTestCase { let listener = try NIOTSListenerBootstrap(group: self.group, childGroup: childGroup) .childChannelInitializer { channel in - childChannelPromise.succeed(result: channel) + childChannelPromise.succeed(channel) return channel.pipeline.add(handler: PromiseOnActiveHandler(activePromise)) }.bind(host: "localhost", port: 0).wait() defer {