Correctly autoRead through the pipeline (#52)

Motivation:

When autoRead is on, the pipeline must observe the read calls in order
to be able to exert backpressure. Otherwise, autoRead is a
zero-backpressure mode, which isn't great.

Correctly call pipeline.read instead of self.read0 to avoid this.

Modifications:

- Updated NIOTSConnectionChannel to call pipeline.read().

Result:

Backpressure can be exerted.
This commit is contained in:
Cory Benfield 2019-07-23 14:37:50 +01:00 committed by Johannes Weiss
parent eec3aed641
commit 6cba688855
3 changed files with 86 additions and 1 deletions

View File

@ -625,7 +625,7 @@ extension NIOTSConnectionChannel: StateManagedChannel {
/// A function that will trigger a socket read if necessary.
internal func readIfNeeded0() {
if self.options.autoRead {
self.read0()
self.pipeline.read()
}
}
}

View File

@ -642,5 +642,76 @@ class NIOTSConnectionChannelTests: XCTestCase {
XCTFail("Unexpected error: \(error)")
}
}
func testAutoReadTraversesThePipeline() throws {
// This test is driven entirely by a channel handler inserted into the client channel.
final class TestHandler: ChannelDuplexHandler {
typealias InboundIn = ByteBuffer
typealias OutboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer
var readCount = 0
private let testCompletePromise: EventLoopPromise<Void>
init(testCompletePromise: EventLoopPromise<Void>) {
self.testCompletePromise = testCompletePromise
}
func read(context: ChannelHandlerContext) {
self.readCount += 1
context.read()
}
func channelActive(context: ChannelHandlerContext) {
var buffer = context.channel.allocator.buffer(capacity: 12)
buffer.writeString("Hello, world!")
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buffer = self.unwrapInboundIn(data)
if buffer.readString(length: buffer.readableBytes) == "Hello, world!" {
self.testCompletePromise.succeed(())
}
}
}
let testCompletePromise = self.group.next().makePromise(of: Void.self)
let testHandler = TestHandler(testCompletePromise: testCompletePromise)
let listener = try assertNoThrowWithValue(NIOTSListenerBootstrap(group: self.group)
.childChannelInitializer { channel in channel.pipeline.addHandler(EchoHandler()) }
.bind(host: "localhost", port: 0).wait())
defer {
XCTAssertNoThrow(try listener.close().wait())
}
let connectBootstrap = NIOTSConnectionBootstrap(group: self.group)
.channelInitializer { channel in channel.pipeline.addHandler(testHandler) }
let connection = try assertNoThrowWithValue(connectBootstrap.connect(to: listener.localAddress!).wait())
defer {
XCTAssertNoThrow(try connection.close().wait())
}
// Let the test run.
XCTAssertNoThrow(try testCompletePromise.futureResult.wait())
// When the test is completed, we expect the following:
//
// 1. channelActive, which leads to a write and flush.
// 2. read, triggered by autoRead.
// 3. channelRead, enabled by the read above, which completes our promise.
// 4. IN THE SAME EVENT LOOP TICK, read(), triggered by autoRead.
//
// Thus, once the test has completed we can enter the event loop and check the read count.
// We expect 2.
XCTAssertNoThrow(try connection.eventLoop.submit {
XCTAssertEqual(testHandler.readCount, 2)
}.wait())
}
}
#endif

View File

@ -22,6 +22,20 @@ import Foundation
import Network
func assertNoThrowWithValue<T>(_ body: @autoclosure () throws -> T, defaultValue: T? = nil, message: String? = nil, file: StaticString = #file, line: UInt = #line) throws -> T {
do {
return try body()
} catch {
XCTFail("\(message.map { $0 + ": " } ?? "")unexpected error \(error) thrown", file: file, line: line)
if let defaultValue = defaultValue {
return defaultValue
} else {
throw error
}
}
}
final class EchoHandler: ChannelInboundHandler {
typealias InboundIn = Any
typealias OutboundOut = Any