Make NIOTSEventLoop @unchecked Sendable

This commit is contained in:
Gus Cairo 2025-04-01 11:05:45 +01:00
parent 37f947f448
commit b31ad4e336
1 changed files with 60 additions and 112 deletions

View File

@ -28,13 +28,11 @@ import NIOConcurrencyHelpers
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
public protocol QoSEventLoop: EventLoop { public protocol QoSEventLoop: EventLoop {
/// Submit a given task to be executed by the `EventLoop` at a given `qos`. /// Submit a given task to be executed by the `EventLoop` at a given `qos`.
@preconcurrency func execute(qos: DispatchQoS, _ task: @escaping @Sendable () -> Void)
func execute(qos: DispatchQoS, _ task: @Sendable @escaping () -> Void)
/// Schedule a `task` that is executed by this `NIOTSEventLoop` after the given amount of time at the /// Schedule a `task` that is executed by this `NIOTSEventLoop` after the given amount of time at the
/// given `qos`. /// given `qos`.
@preconcurrency func scheduleTask<T>(
func scheduleTask<T: Sendable>(
in time: TimeAmount, in time: TimeAmount,
qos: DispatchQoS, qos: DispatchQoS,
_ task: @escaping @Sendable () throws -> T _ task: @escaping @Sendable () throws -> T
@ -50,71 +48,14 @@ public protocol QoSEventLoop: EventLoop {
/// the queue until it has drained. /// the queue until it has drained.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
private enum LifecycleState { private enum LifecycleState {
/// This state holds all the channels registered to this event loop. case active
/// case closing
/// This dictionary ensures that these channels stay alive for as long as they are registered: they cannot leak.
/// It also provides provides a notification mechanism for this event loop to deliver them specific kinds of events: in particular, to
/// request that they quiesce or shut themselves down.
case active(registeredChannels: [ObjectIdentifier: Channel])
case closing(registeredChannels: [ObjectIdentifier: Channel])
case closed case closed
enum CloseGentlyAction {
case closeChannels([Channel])
case failPromise
}
mutating func closeGently() -> CloseGentlyAction {
switch self {
case .active(let registeredChannels):
self = .closing(registeredChannels: registeredChannels)
return .closeChannels(registeredChannels.map({ _, channel in channel }))
case .closing, .closed:
return .failPromise
}
}
enum RegisterChannelResult {
case success
case failedToRegister
}
mutating func registerChannel(_ channel: Channel) -> RegisterChannelResult {
switch self {
case .active(var registeredChannels):
channel.eventLoop.assertInEventLoop()
registeredChannels[ObjectIdentifier(channel)] = channel
self = .active(registeredChannels: registeredChannels)
return .success
case .closing, .closed:
return .failedToRegister
}
}
mutating func deregisterChannel(_ channel: Channel) {
switch self {
case .active(var registeredChannels):
channel.eventLoop.assertInEventLoop()
let oldChannel = registeredChannels.removeValue(forKey: ObjectIdentifier(channel))
assert(oldChannel != nil)
self = .active(registeredChannels: registeredChannels)
case .closing(var registeredChannels):
channel.eventLoop.assertInEventLoop()
let oldChannel = registeredChannels.removeValue(forKey: ObjectIdentifier(channel))
assert(oldChannel != nil)
self = .active(registeredChannels: registeredChannels)
case .closed:
()
}
}
} }
// It's okay for NIOTSEventLoop to be unchecked Sendable, since the state is isolated to the EL.
@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) @available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
internal final class NIOTSEventLoop: QoSEventLoop { internal final class NIOTSEventLoop: QoSEventLoop, @unchecked Sendable {
private let loop: DispatchQueue private let loop: DispatchQueue
private let taskQueue: DispatchQueue private let taskQueue: DispatchQueue
private let inQueueKey: DispatchSpecificKey<UUID> private let inQueueKey: DispatchSpecificKey<UUID>
@ -122,8 +63,21 @@ internal final class NIOTSEventLoop: QoSEventLoop {
private let defaultQoS: DispatchQoS private let defaultQoS: DispatchQoS
private let canBeShutDownIndividually: Bool private let canBeShutDownIndividually: Bool
/// All the channels registered to this event loop.
///
/// This array does two jobs. Firstly, it ensures that these channels stay alive for as long as
/// they are registered: they cannot leak. Secondly, it provides a notification mechanism for
/// this event loop to deliver them specific kinds of events: in particular, to request that
/// they quiesce or shut themselves down.
private var registeredChannels: [ObjectIdentifier: Channel] = [:]
/// The state of this event loop. /// The state of this event loop.
private let state = NIOLockedValueBox(LifecycleState.active(registeredChannels: [:])) private var state = LifecycleState.active
/// Whether this event loop is accepting new channels.
private var open: Bool {
self.state == .active
}
/// Returns whether the currently executing code is on the event loop. /// Returns whether the currently executing code is on the event loop.
/// ///
@ -165,26 +119,23 @@ internal final class NIOTSEventLoop: QoSEventLoop {
loop.setSpecific(key: inQueueKey, value: self.loopID) loop.setSpecific(key: inQueueKey, value: self.loopID)
} }
public func execute(_ task: @Sendable @escaping () -> Void) { public func execute(_ task: @escaping @Sendable () -> Void) {
self.execute(qos: self.defaultQoS, task) self.execute(qos: self.defaultQoS, task)
} }
@preconcurrency public func execute(qos: DispatchQoS, _ task: @escaping @Sendable () -> Void) { public func execute(qos: DispatchQoS, _ task: @escaping @Sendable () -> Void) {
// Ideally we'd not accept new work while closed. Sadly, that's not possible with the current APIs for this. // Ideally we'd not accept new work while closed. Sadly, that's not possible with the current APIs for this.
self.taskQueue.async(qos: qos, execute: task) self.taskQueue.async(qos: qos, execute: task)
} }
@preconcurrency public func scheduleTask<T>( public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping @Sendable () throws -> T) -> Scheduled<T> {
deadline: NIODeadline,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
self.scheduleTask(deadline: deadline, qos: self.defaultQoS, task) self.scheduleTask(deadline: deadline, qos: self.defaultQoS, task)
} }
public func scheduleTask<T>( public func scheduleTask<T>(
deadline: NIODeadline, deadline: NIODeadline,
qos: DispatchQoS, qos: DispatchQoS,
_ task: @escaping @Sendable () throws -> T _ task: @escaping () throws -> T
) -> Scheduled<T> { ) -> Scheduled<T> {
let p: EventLoopPromise<T> = self.makePromise() let p: EventLoopPromise<T> = self.makePromise()
@ -193,7 +144,7 @@ internal final class NIOTSEventLoop: QoSEventLoop {
let timerSource = DispatchSource.makeTimerSource(queue: self.taskQueue) let timerSource = DispatchSource.makeTimerSource(queue: self.taskQueue)
timerSource.schedule(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds)) timerSource.schedule(deadline: DispatchTime(uptimeNanoseconds: deadline.uptimeNanoseconds))
timerSource.setEventHandler(qos: qos, flags: .enforceQoS) { timerSource.setEventHandler(qos: qos, flags: .enforceQoS) {
if case .closed = self.state.withLockedValue({ $0 }) { guard self.state != .closed else {
p.fail(EventLoopError.shutdown) p.fail(EventLoopError.shutdown)
return return
} }
@ -220,18 +171,12 @@ internal final class NIOTSEventLoop: QoSEventLoop {
) )
} }
@preconcurrency public func scheduleTask<T>( public func scheduleTask<T>(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
in time: TimeAmount,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
self.scheduleTask(in: time, qos: self.defaultQoS, task) self.scheduleTask(in: time, qos: self.defaultQoS, task)
} }
@preconcurrency public func scheduleTask<T>( public func scheduleTask<T>(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled<T>
in time: TimeAmount, {
qos: DispatchQoS,
_ task: @escaping @Sendable () throws -> T
) -> Scheduled<T> {
self.scheduleTask(deadline: NIODeadline.now() + time, qos: qos, task) self.scheduleTask(deadline: NIODeadline.now() + time, qos: qos, task)
} }
@ -281,32 +226,34 @@ extension NIOTSEventLoop {
internal func closeGently() -> EventLoopFuture<Void> { internal func closeGently() -> EventLoopFuture<Void> {
let p: EventLoopPromise<Void> = self.makePromise() let p: EventLoopPromise<Void> = self.makePromise()
self.taskQueue.async { self.taskQueue.async {
switch self.state.withLockedValue({ $0.closeGently() }) { guard self.open else {
case .closeChannels(let channels): p.fail(EventLoopError.shutdown)
// We need to tell all currently-registered channels to close. return
let futures: [EventLoopFuture<Void>] = channels.map { channel in }
channel.close(promise: nil)
return channel.closeFuture.flatMapErrorThrowing { error in // Ok, time to shut down.
if let error = error as? ChannelError, error == .alreadyClosed { self.state = .closing
return ()
} else { // We need to tell all currently-registered channels to close.
throw error let futures: [EventLoopFuture<Void>] = self.registeredChannels.map { _, channel in
} channel.close(promise: nil)
return channel.closeFuture.flatMapErrorThrowing { error in
if let error = error as? ChannelError, error == .alreadyClosed {
return ()
} else {
throw error
} }
} }
}
// The ordering here is important. // The ordering here is important.
// We must not transition into the closed state until *after* the caller has been notified that the // We must not transition into the closed state until *after* the caller has been notified that the
// event loop is closed. Otherwise, this future is in real trouble, as if it needs to dispatch onto the // event loop is closed. Otherwise, this future is in real trouble, as if it needs to dispatch onto the
// event loop it will be forbidden from doing so. // event loop it will be forbidden from doing so.
let completionFuture = EventLoopFuture<Void>.andAllComplete(futures, on: self) let completionFuture = EventLoopFuture<Void>.andAllComplete(futures, on: self)
completionFuture.cascade(to: p) completionFuture.cascade(to: p)
completionFuture.whenComplete { (_: Result<Void, Error>) in completionFuture.whenComplete { (_: Result<Void, Error>) in
self.state.withLockedValue({ $0 = .closed }) self.state = .closed
}
case .failPromise:
p.fail(EventLoopError.shutdown)
} }
} }
return p.futureResult return p.futureResult
@ -317,18 +264,19 @@ extension NIOTSEventLoop {
extension NIOTSEventLoop { extension NIOTSEventLoop {
/// Record a given channel with this event loop. /// Record a given channel with this event loop.
internal func register(_ channel: Channel) throws { internal func register(_ channel: Channel) throws {
switch self.state.withLockedValue({ $0.registerChannel(channel) }) { guard self.open else {
case .success:
()
case .failedToRegister:
throw EventLoopError.shutdown throw EventLoopError.shutdown
} }
channel.eventLoop.assertInEventLoop()
self.registeredChannels[ObjectIdentifier(channel)] = channel
} }
// We don't allow deregister to fail, as it doesn't make any sense. // We don't allow deregister to fail, as it doesn't make any sense.
internal func deregister(_ channel: Channel) { internal func deregister(_ channel: Channel) {
self.state.withLockedValue({ $0.deregisterChannel(channel) }) channel.eventLoop.assertInEventLoop()
let oldChannel = self.registeredChannels.removeValue(forKey: ObjectIdentifier(channel))
assert(oldChannel != nil)
} }
} }
#endif #endif