200 lines
7.8 KiB
Swift
200 lines
7.8 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the SwiftNIO open source project
|
|
//
|
|
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
import Foundation
|
|
import NIO
|
|
import Dispatch
|
|
import Network
|
|
|
|
|
|
/// An `EventLoop` that cooperates with `DispatchQoS` when scheduling upcoming work.
///
/// Conforming event loops accept a `DispatchQoS` alongside each task, which lets a caller
/// submit an individual task at a different priority than the majority of the work running
/// on that loop.
public protocol QoSEventLoop: EventLoop {
    /// Submit `task` for execution by this `EventLoop` at the given `qos`.
    ///
    /// - parameters:
    ///     - qos: The quality-of-service class the task should be run with.
    ///     - task: The work to execute.
    func execute(qos: DispatchQoS, _ task: @escaping () -> Void)

    /// Schedule `task` for execution by this `QoSEventLoop` once `time` has elapsed, at the given `qos`.
    ///
    /// - parameters:
    ///     - time: How long to wait before the task is run.
    ///     - qos: The quality-of-service class the task should be run with.
    ///     - task: The work to execute; its result (or thrown error) completes the returned `Scheduled`.
    /// - returns: A `Scheduled` handle for the pending task.
    func scheduleTask<T>(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled<T>
}
|
|
|
|
|
|
/// The lifecycle state of a given event loop.
///
/// An event loop can be shut down, and once shut down it is never restarted. The states are:
///
/// - `active`: new channel registrations and new scheduled work items are both accepted.
/// - `closing`: new channel registrations are refused, but scheduled work items are still accepted.
/// - `closed`: neither registrations nor scheduled work items are accepted, although the loop
///   keeps processing its queue until it has drained.
fileprivate enum LifecycleState {
    case active, closing, closed
}
|
|
|
|
|
|
/// A `QoSEventLoop` implemented on top of `DispatchQueue`s.
///
/// All work submitted through `execute` and `scheduleTask` is enqueued on `taskQueue`, which
/// targets `loop`, so everything ultimately runs on `loop`.
internal class NIOTSEventLoop: QoSEventLoop {
    /// The underlying queue for this event loop; all other queues created by the loop target it.
    private let loop: DispatchQueue

    /// The queue that user-submitted tasks are enqueued on. It targets `loop`.
    private let taskQueue: DispatchQueue

    /// The queue-specific key under which `loopID` is stored on `loop`.
    private let inQueueKey: DispatchSpecificKey<UUID>

    /// A unique identifier for this loop, used by `inEventLoop` to recognise its own queue.
    private let loopID: UUID

    /// The QoS used for tasks that do not request a specific one.
    private let defaultQoS: DispatchQoS

    /// All the channels registered to this event loop.
    ///
    /// This dictionary does two jobs. Firstly, it ensures that these channels stay alive for as long as
    /// they are registered: they cannot leak. Secondly, it provides a notification mechanism for
    /// this event loop to deliver them specific kinds of events: in particular, to request that
    /// they quiesce or shut themselves down.
    private var registeredChannels: [ObjectIdentifier: Channel] = [:]

    /// The state of this event loop.
    private var state = LifecycleState.active

    /// Whether this event loop is accepting new channels.
    private var open: Bool {
        return self.state == .active
    }

    /// Whether the calling code is currently running on this event loop.
    ///
    /// This compares the queue-specific value stored under `inQueueKey` for the current queue
    /// against this loop's `loopID`.
    public var inEventLoop: Bool {
        return DispatchQueue.getSpecific(key: self.inQueueKey) == self.loopID
    }

    /// Create an event loop whose queues run at the given default `qos`.
    ///
    /// - parameters:
    ///     - qos: The QoS of the underlying queue, and the default QoS for submitted tasks.
    public init(qos: DispatchQoS) {
        self.loop = DispatchQueue(label: "nio.transportservices.eventloop.loop", qos: qos, autoreleaseFrequency: .workItem)
        self.taskQueue = DispatchQueue(label: "nio.transportservices.eventloop.taskqueue", target: self.loop)
        self.loopID = UUID()
        self.inQueueKey = DispatchSpecificKey()
        self.defaultQoS = qos

        // Tag the underlying queue so that `inEventLoop` can identify it later.
        loop.setSpecific(key: inQueueKey, value: self.loopID)
    }

    /// Submit `task` to be executed by this event loop at the loop's default QoS.
    public func execute(_ task: @escaping () -> Void) {
        self.execute(qos: self.defaultQoS, task)
    }

    /// Submit `task` to be executed by this event loop at the given `qos`.
    public func execute(qos: DispatchQoS, _ task: @escaping () -> Void) {
        // Ideally we'd not accept new work while closed. Sadly, that's not possible with the current APIs for this.
        self.taskQueue.async(qos: qos, execute: task)
    }

    /// Schedule `task` to run after `time` has elapsed, at the loop's default QoS.
    public func scheduleTask<T>(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
        return self.scheduleTask(in: time, qos: self.defaultQoS, task)
    }

    /// Schedule `task` to run after `time` has elapsed, at the given `qos`.
    ///
    /// If the loop is already closed, the returned `Scheduled` is failed immediately with
    /// `EventLoopError.shutdown` and the task is never enqueued.
    ///
    /// - parameters:
    ///     - time: The delay before the task runs. A negative value yields a deadline in the past.
    ///     - qos: The QoS to run the task at.
    ///     - task: The work to perform; its result or thrown error completes the returned `Scheduled`.
    /// - returns: A `Scheduled` whose cancellation task marks the work item as cancelled.
    public func scheduleTask<T>(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled<T> {
        let p: EventLoopPromise<T> = self.newPromise()

        guard self.state != .closed else {
            p.fail(error: EventLoopError.shutdown)
            return Scheduled(promise: p, cancellationTask: { } )
        }

        // Dispatch has no support for cancellation, so instead we synchronize over this nice variable.
        // It is only read and written from blocks running on `taskQueue`.
        var cancelled = false

        // Let Dispatch do the deadline arithmetic. Building the deadline by hand with
        // `UInt64(time.nanoseconds)` traps for a negative `TimeAmount`, and the unsigned addition
        // to the current uptime traps on overflow for very large delays.
        let deadline: DispatchTime = DispatchTime.now() + DispatchTimeInterval.nanoseconds(Int(time.nanoseconds))

        self.taskQueue.asyncAfter(deadline: deadline, qos: qos) {
            guard !cancelled else { return }
            do {
                p.succeed(result: try task())
            } catch {
                p.fail(error: error)
            }
        }

        return Scheduled(promise: p, cancellationTask: { self.taskQueue.async { cancelled = true } })
    }

    /// Shut this event loop down, then invoke `callback` on `queue` with the outcome.
    ///
    /// - parameters:
    ///     - queue: The queue on which `callback` is invoked.
    ///     - callback: Called with `nil` when `closeGently()` succeeds, or with the error when it fails.
    public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
        self.closeGently().map {
            queue.async { callback(nil) }
        }.whenFailure { error in
            queue.async { callback(error) }
        }
    }
}
|
|
|
|
extension NIOTSEventLoop {
    /// Build a `DispatchQueue` for delivering events to a given `Channel`.
    ///
    /// The returned queue targets this event loop's underlying queue, so its work items
    /// execute on this `EventLoop`.
    ///
    /// - parameters:
    ///     - label: The label to give the new queue.
    ///     - qos: An optional QoS override; when `nil`, the loop's default QoS is used.
    /// - returns: A new `DispatchQueue` targeting this loop.
    internal func channelQueue(label: String, qos: DispatchQoS? = nil) -> DispatchQueue {
        let effectiveQoS: DispatchQoS
        if let requestedQoS = qos {
            effectiveQoS = requestedQoS
        } else {
            // No override was requested: fall back to the loop-wide default.
            effectiveQoS = self.defaultQoS
        }
        return DispatchQueue(label: label, qos: effectiveQoS, target: self.loop)
    }
}
|
|
|
|
extension NIOTSEventLoop {
    /// Begin shutting this event loop down by closing every registered channel.
    ///
    /// The shutdown work is enqueued on the loop's task queue. If the loop is no longer open
    /// when that work runs, the returned future fails with `EventLoopError.shutdown`.
    ///
    /// - returns: A future completed once all registered channels have reported closure. A channel
    ///     whose close future fails with `ChannelError.alreadyClosed` is treated as closed successfully.
    internal func closeGently() -> EventLoopFuture<Void> {
        let shutdownPromise: EventLoopPromise<Void> = self.newPromise()

        self.taskQueue.async {
            if !self.open {
                shutdownPromise.fail(error: EventLoopError.shutdown)
                return
            }

            // From here on no new registrations are accepted.
            self.state = .closing

            // Ask every currently-registered channel to close, and collect their close futures.
            let closeFutures: [EventLoopFuture<Void>] = self.registeredChannels.map { entry in
                let channel = entry.value
                channel.close(promise: nil)
                return channel.closeFuture.thenIfErrorThrowing { error in
                    // A channel that was already closed counts as a success; anything else propagates.
                    guard let channelError = error as? ChannelError, channelError == .alreadyClosed else {
                        throw error
                    }
                    return ()
                }
            }

            // The ordering here is important.
            // The caller's promise must be completed *before* the loop moves to the closed state:
            // once closed, the loop refuses new scheduled work, so a caller that needs to dispatch
            // back onto the loop from its callback would be forbidden from doing so.
            let allClosed = EventLoopFuture<Void>.andAll(closeFutures, eventLoop: self)
            allClosed.cascade(promise: shutdownPromise)
            allClosed.whenComplete {
                self.state = .closed
            }
        }

        return shutdownPromise.futureResult
    }
}
|
|
|
|
extension NIOTSEventLoop {
    /// Start tracking `channel` as registered with this event loop.
    ///
    /// - parameters:
    ///     - channel: The channel to record; it must belong to this event loop.
    /// - throws: `EventLoopError.shutdown` if the loop is no longer accepting new channels.
    internal func register(_ channel: Channel) throws {
        if !self.open {
            throw EventLoopError.shutdown
        }

        assert(channel.eventLoop === self)
        let channelID = ObjectIdentifier(channel)
        self.registeredChannels[channelID] = channel
    }

    /// Stop tracking `channel`.
    ///
    /// Deregistration is not allowed to fail, as a failure would not make any sense.
    ///
    /// - parameters:
    ///     - channel: The channel to forget; it must belong to this event loop and be registered.
    internal func deregister(_ channel: Channel) {
        assert(channel.eventLoop === self)
        let channelID = ObjectIdentifier(channel)

        // The removal is performed outside of `assert` so that it still happens in builds
        // where assertions are not evaluated.
        let previouslyRegistered = self.registeredChannels.removeValue(forKey: channelID)
        assert(previouslyRegistered != nil)
    }
}
|