diff --git a/Package.swift b/Package.swift index fad8f95..6ade68b 100644 --- a/Package.swift +++ b/Package.swift @@ -36,6 +36,6 @@ let package = Package( .target(name: "NIOTSHTTPServer", dependencies: ["NIO", "NIOTransportServices", "NIOHTTP1"]), .testTarget(name: "NIOTransportServicesTests", - dependencies: ["NIO", "NIOTransportServices"]), + dependencies: ["NIO", "NIOTransportServices", "NIOConcurrencyHelpers"]), ] ) diff --git a/Sources/NIOTransportServices/NIOTSEventLoop.swift b/Sources/NIOTransportServices/NIOTSEventLoop.swift index 036b0e2..e3cdee5 100644 --- a/Sources/NIOTransportServices/NIOTSEventLoop.swift +++ b/Sources/NIOTransportServices/NIOTSEventLoop.swift @@ -13,11 +13,13 @@ // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// -import Foundation -import NIO + import Dispatch +import Foundation import Network +import NIO +import NIOConcurrencyHelpers /// An `EventLoop` that interacts with `DispatchQoS` to help schedule upcoming work. /// @@ -113,17 +115,20 @@ internal class NIOTSEventLoop: QoSEventLoop { // Dispatch support for cancellation exists at the work-item level, so we explicitly create one here. // We set the QoS on this work item and explicitly enforce it when the block runs. - let workItem = DispatchWorkItem(qos: qos, flags: .enforceQoS) { + let timerSource = DispatchSource.makeTimerSource(queue: self.taskQueue) + timerSource.schedule(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds)) + timerSource.setEventHandler(qos: qos, flags: .enforceQoS) { do { p.succeed(try task()) } catch { p.fail(error) } } + timerSource.resume() - self.taskQueue.asyncAfter(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds), execute: workItem) - - return Scheduled(promise: p, cancellationTask: { workItem.cancel() }) + return Scheduled(promise: p, cancellationTask: { + timerSource.cancel() + }) } public func scheduleTask(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { diff --git a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift index cc8338d..cea4f4e 100644 --- a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift @@ -469,4 +469,25 @@ class NIOTSEndToEndTests: XCTestCase { connection.writeAndFlush(buffer, promise: nil) XCTAssertNoThrow(try completeFuture.wait()) } + + func testBasicConnectionTimeout() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEADDR), value: 0) + .serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEPORT), value: 0) + .childChannelInitializer { channel in channel.pipeline.addHandler(CloseOnActiveHandler())} + .bind(host: "localhost", port: 0).wait() + let address = listener.localAddress! + + // let's close the server socket, we disable SO_REUSEPORT/SO_REUSEADDR so that nobody can bind this for a + // while. + XCTAssertNoThrow(try listener.close().wait()) + + // this should now definitely time out. + XCTAssertThrowsError(try NIOTSConnectionBootstrap(group: self.group) + .connectTimeout(.milliseconds(10)) + .connect(to: address) + .wait()) { error in + print(error) + } + } } diff --git a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift index d10f66e..a127c28 100644 --- a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift +++ b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift @@ -15,6 +15,7 @@ //===----------------------------------------------------------------------===// import XCTest import NIO +import NIOConcurrencyHelpers import NIOTransportServices class NIOTSEventLoopTest: XCTestCase { @@ -86,4 +87,43 @@ class NIOTSEventLoopTest: XCTestCase { } try EventLoopFuture.andAllComplete([firstTask.futureResult, secondTask.futureResult], on: firstLoop).wait() } + + func testWeDontHoldELOrELGReferencesImmeditelyFollowingAConnect() { + weak var weakEL: EventLoop? = nil + weak var weakELG: EventLoopGroup? = nil + func make() throws { + let group = NIOTSEventLoopGroup(loopCount: 1) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + weakELG = group + weakEL = group.next() + + let counter = Atomic(value: 0) + let acceptedChan = group.next().makePromise(of: Channel.self) + let server = try NIOTSListenerBootstrap(group: group) + .childChannelInitializer { channel in + XCTAssertEqual(0, counter.add(1)) + acceptedChan.succeed(channel) + return channel.eventLoop.makeSucceededFuture(()) + } + .bind(host: "127.0.0.1", port: 0).wait() + // leave this "localhost" so we need to resolve it (involving happy eyeballs) + let client = try NIOTSConnectionBootstrap(group: group).connect(host: "localhost", + port: server.localAddress!.port!).wait() + XCTAssertNoThrow(try client.close().wait()) + XCTAssertNoThrow(try acceptedChan.futureResult.wait().close().flatMapErrorThrowing { error in + if let error = error as? ChannelError, error == .alreadyClosed { + // this is okay because we previously closed the other end + } else { + throw error + } + }) + XCTAssertNoThrow(try server.close().wait()) + } + XCTAssertNoThrow(try make()) + usleep(100_000) // to give the other thread chance to deallocate everything + XCTAssertNil(weakELG) + XCTAssertNil(weakEL) + } }