Correctly hop across event loops during child channel setup. (#17)
Motivation: We were inadvertently modifying the registration information of NIOTSEventLoop objects from multiple threads, which is very not good. Modifications: Add an event loop execute to hop loops. Result: Thread safe!
This commit is contained in:
parent
1761d4eafa
commit
fb5d37a1e1
|
|
@ -196,13 +196,13 @@ extension NIOTSEventLoop {
|
||||||
throw EventLoopError.shutdown
|
throw EventLoopError.shutdown
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(channel.eventLoop === self)
|
channel.eventLoop.assertInEventLoop()
|
||||||
self.registeredChannels[ObjectIdentifier(channel)] = channel
|
self.registeredChannels[ObjectIdentifier(channel)] = channel
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't allow deregister to fail, as it doesn't make any sense.
|
// We don't allow deregister to fail, as it doesn't make any sense.
|
||||||
internal func deregister(_ channel: Channel) {
|
internal func deregister(_ channel: Channel) {
|
||||||
assert(channel.eventLoop === self)
|
channel.eventLoop.assertInEventLoop()
|
||||||
let oldChannel = self.registeredChannels.removeValue(forKey: ObjectIdentifier(channel))
|
let oldChannel = self.registeredChannels.removeValue(forKey: ObjectIdentifier(channel))
|
||||||
assert(oldChannel != nil)
|
assert(oldChannel != nil)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -339,11 +339,15 @@ extension NIOTSListenerChannel: StateManagedChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
public func channelRead0(_ data: NIOAny) {
|
public func channelRead0(_ data: NIOAny) {
|
||||||
|
self.eventLoop.assertInEventLoop()
|
||||||
|
|
||||||
let channel = self.unwrapData(data, as: NIOTSConnectionChannel.self)
|
let channel = self.unwrapData(data, as: NIOTSConnectionChannel.self)
|
||||||
let p: EventLoopPromise<Void> = self.eventLoop.newPromise()
|
let p: EventLoopPromise<Void> = channel.eventLoop.newPromise()
|
||||||
channel.registerAlreadyConfigured0(promise: p)
|
channel.eventLoop.execute {
|
||||||
p.futureResult.whenFailure { (_: Error) in
|
channel.registerAlreadyConfigured0(promise: p)
|
||||||
channel.close(promise: nil)
|
p.futureResult.whenFailure { (_: Error) in
|
||||||
|
channel.close(promise: nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -156,4 +156,32 @@ class NIOTSListenerChannelTests: XCTestCase {
|
||||||
XCTFail("Unexpected error")
|
XCTFail("Unexpected error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCanSafelyInvokeChannelsAcrossThreads() throws {
|
||||||
|
// This is a test that aims to trigger TSAN violations.
|
||||||
|
let childGroup = NIOTSEventLoopGroup(loopCount: 2)
|
||||||
|
let childChannelPromise: EventLoopPromise<Channel> = childGroup.next().newPromise()
|
||||||
|
let activePromise: EventLoopPromise<Void> = childGroup.next().newPromise()
|
||||||
|
|
||||||
|
let listener = try NIOTSListenerBootstrap(group: self.group, childGroup: childGroup)
|
||||||
|
.childChannelInitializer { channel in
|
||||||
|
childChannelPromise.succeed(result: channel)
|
||||||
|
return channel.pipeline.add(handler: PromiseOnActiveHandler(activePromise))
|
||||||
|
}.bind(host: "localhost", port: 0).wait()
|
||||||
|
defer {
|
||||||
|
XCTAssertNoThrow(try listener.close().wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to the listener.
|
||||||
|
let channel = try NIOTSConnectionBootstrap(group: self.group)
|
||||||
|
.connect(to: listener.localAddress!).wait()
|
||||||
|
|
||||||
|
// Wait for the child channel to become active.
|
||||||
|
let childChannel = try childChannelPromise.futureResult.wait()
|
||||||
|
XCTAssertNoThrow(try activePromise.futureResult.wait())
|
||||||
|
|
||||||
|
// Now close the child channel.
|
||||||
|
XCTAssertNoThrow(try childChannel.close().wait())
|
||||||
|
XCTAssertNoThrow(try channel.closeFuture.wait())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue