From 1d0cb1040eacfad538c4e9a681db4bb09e5e8935 Mon Sep 17 00:00:00 2001
From: Johannes Weiss
Date: Fri, 22 Mar 2019 15:40:02 +0000
Subject: [PATCH] don't retain everything until connect timeout expires (#30)

Motivation:

Previously we would use the combination of DispatchQueue.asyncAfter and a
DispatchWorkItem for the connect timeout. If the connection succeeded, we would
just cancel the DispatchWorkItem. Unfortunately, cancelling still keeps
everything captured by the DispatchWorkItem alive until the deadline has passed
(because a DispatchWorkItem is just a dumb wrapper around a closure).

Modifications:

use a timer DispatchSource instead of DispatchQueue.asyncAfter plus a
DispatchWorkItem.

Result:

- we no longer keep the ELG/EL/Channel/... alive until at least the connect
  timeout expires.
- fixes #28
---
 Package.swift                                 |  2 +-
 .../NIOTransportServices/NIOTSEventLoop.swift | 17 +++++---
 .../NIOTSEndToEndTests.swift                  | 21 ++++++++++
 .../NIOTSEventLoopTests.swift                 | 40 +++++++++++++++++++
 4 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/Package.swift b/Package.swift
index fad8f95..6ade68b 100644
--- a/Package.swift
+++ b/Package.swift
@@ -36,6 +36,6 @@ let package = Package(
         .target(name: "NIOTSHTTPServer",
                 dependencies: ["NIO", "NIOTransportServices", "NIOHTTP1"]),
         .testTarget(name: "NIOTransportServicesTests",
-                    dependencies: ["NIO", "NIOTransportServices"]),
+                    dependencies: ["NIO", "NIOTransportServices", "NIOConcurrencyHelpers"]),
     ]
 )
diff --git a/Sources/NIOTransportServices/NIOTSEventLoop.swift b/Sources/NIOTransportServices/NIOTSEventLoop.swift
index 036b0e2..e3cdee5 100644
--- a/Sources/NIOTransportServices/NIOTSEventLoop.swift
+++ b/Sources/NIOTransportServices/NIOTSEventLoop.swift
@@ -13,11 +13,13 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 //===----------------------------------------------------------------------===//
-import Foundation
-import NIO
+
 import Dispatch
+import Foundation
 import Network
+import NIO
+import NIOConcurrencyHelpers

 /// An `EventLoop` that interacts with `DispatchQoS` to help schedule upcoming work.
 ///
@@ -113,17 +115,20 @@ internal class NIOTSEventLoop: QoSEventLoop {
         // Dispatch support for cancellation exists at the work-item level, so we explicitly create one here.
         // We set the QoS on this work item and explicitly enforce it when the block runs.
-        let workItem = DispatchWorkItem(qos: qos, flags: .enforceQoS) {
+        let timerSource = DispatchSource.makeTimerSource(queue: self.taskQueue)
+        timerSource.schedule(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds))
+        timerSource.setEventHandler(qos: qos, flags: .enforceQoS) {
             do {
                 p.succeed(try task())
             } catch {
                 p.fail(error)
             }
         }
+        timerSource.resume()

-        self.taskQueue.asyncAfter(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds), execute: workItem)
-
-        return Scheduled(promise: p, cancellationTask: { workItem.cancel() })
+        return Scheduled(promise: p, cancellationTask: {
+            timerSource.cancel()
+        })
     }

     public func scheduleTask<T>(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
diff --git a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift
index cc8338d..cea4f4e 100644
--- a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift
+++ b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift
@@ -469,4 +469,25 @@ class NIOTSEndToEndTests: XCTestCase {
         connection.writeAndFlush(buffer, promise: nil)
         XCTAssertNoThrow(try completeFuture.wait())
     }
+
+    func testBasicConnectionTimeout() throws {
+        let listener = try NIOTSListenerBootstrap(group: self.group)
+            .serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEADDR), value: 0)
+            .serverChannelOption(ChannelOptions.socket(SOL_SOCKET, SO_REUSEPORT), value: 0)
+            .childChannelInitializer { channel in channel.pipeline.addHandler(CloseOnActiveHandler()) }
+            .bind(host: "localhost", port: 0).wait()
+        let address = listener.localAddress!
+
+        // let's close the server socket; we disabled SO_REUSEPORT/SO_REUSEADDR so that nobody can bind this
+        // address for a while.
+        XCTAssertNoThrow(try listener.close().wait())
+
+        // this should now definitely time out.
+        XCTAssertThrowsError(try NIOTSConnectionBootstrap(group: self.group)
+            .connectTimeout(.milliseconds(10))
+            .connect(to: address)
+            .wait()) { error in
+                print(error)
+        }
+    }
 }
diff --git a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift
index d10f66e..a127c28 100644
--- a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift
+++ b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift
@@ -15,6 +15,7 @@
 //===----------------------------------------------------------------------===//
 import XCTest
 import NIO
+import NIOConcurrencyHelpers
 import NIOTransportServices

 class NIOTSEventLoopTest: XCTestCase {
@@ -86,4 +87,43 @@ class NIOTSEventLoopTest: XCTestCase {
         }
         try EventLoopFuture.andAllComplete([firstTask.futureResult, secondTask.futureResult], on: firstLoop).wait()
     }
+
+    func testWeDontHoldELOrELGReferencesImmediatelyFollowingAConnect() {
+        weak var weakEL: EventLoop? = nil
+        weak var weakELG: EventLoopGroup? = nil
+        func make() throws {
+            let group = NIOTSEventLoopGroup(loopCount: 1)
+            defer {
+                XCTAssertNoThrow(try group.syncShutdownGracefully())
+            }
+            weakELG = group
+            weakEL = group.next()
+
+            let counter = Atomic<Int>(value: 0)
+            let acceptedChan = group.next().makePromise(of: Channel.self)
+            let server = try NIOTSListenerBootstrap(group: group)
+                .childChannelInitializer { channel in
+                    XCTAssertEqual(0, counter.add(1))
+                    acceptedChan.succeed(channel)
+                    return channel.eventLoop.makeSucceededFuture(())
+                }
+                .bind(host: "127.0.0.1", port: 0).wait()
+            // leave this "localhost" so we need to resolve it (involving happy eyeballs)
+            let client = try NIOTSConnectionBootstrap(group: group).connect(host: "localhost",
+                                                                            port: server.localAddress!.port!).wait()
+            XCTAssertNoThrow(try client.close().wait())
+            XCTAssertNoThrow(try acceptedChan.futureResult.wait().close().flatMapErrorThrowing { error in
+                if let error = error as? ChannelError, error == .alreadyClosed {
+                    // this is okay because we previously closed the other end
+                } else {
+                    throw error
+                }
+            })
+            XCTAssertNoThrow(try server.close().wait())
+        }
+        XCTAssertNoThrow(try make())
+        usleep(100_000) // to give the other thread a chance to deallocate everything
+        XCTAssertNil(weakELG)
+        XCTAssertNil(weakEL)
+    }
 }
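
The core of the change, restated outside the diff context: DispatchQueue.asyncAfter keeps a cancelled
DispatchWorkItem (and everything its closure captures) enqueued until the scheduled deadline, whereas
cancelling a timer DispatchSource lets Dispatch release the event handler right away. The following is a
minimal, self-contained sketch of that pattern under stated assumptions: `ScheduledHandle` and
`scheduleOnce` are illustrative names for this example, not NIOTransportServices API, and the QoS handling
simply mirrors what the patch does.

import Dispatch

// Illustrative sketch only: `ScheduledHandle` and `scheduleOnce` are made-up
// names for this example, not NIOTransportServices API.
struct ScheduledHandle {
    fileprivate let timerSource: DispatchSourceTimer

    // Cancelling the source lets Dispatch release the event handler (and the
    // state it captures) promptly, instead of holding it until the deadline.
    func cancel() {
        self.timerSource.cancel()
    }
}

func scheduleOnce(on queue: DispatchQueue,
                  deadline: DispatchTime,
                  qos: DispatchQoS = .unspecified,
                  _ body: @escaping () -> Void) -> ScheduledHandle {
    let timerSource = DispatchSource.makeTimerSource(queue: queue)
    timerSource.schedule(deadline: deadline)
    timerSource.setEventHandler(qos: qos, flags: .enforceQoS) {
        // One-shot behaviour: run the body once, then cancel so the source
        // and its captured state can be released without further waiting.
        body()
        timerSource.cancel()
    }
    timerSource.resume()
    return ScheduledHandle(timerSource: timerSource)
}

// Usage: cancel the handle when the guarded operation (e.g. a connect attempt)
// finishes early; whatever `body` captured becomes releasable immediately,
// rather than at the deadline as with a cancelled DispatchWorkItem.
let handle = scheduleOnce(on: DispatchQueue(label: "example.timeout"),
                          deadline: .now() + .seconds(10)) {
    print("connect attempt timed out")
}
handle.cancel()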