FilterEmptyWritesHandler (#72)
Add filter removing empty writes to remedy a bug in Network Framework. This is a work in progress, after some initial discussions and suggestions by @weissi. Motivation: There is a known bug in Network Framework affecting iOS devices, which will stall a TCP connection after an empty, zero length write. This bug is not found in Network Framework on MacOS. While the bug fix will be rolled out in future versions of iOS it may take some time. Also, to better support the current and older versions of iOS where the bug remains, @weissi suggested to add a ChannelOutboundHandler, that filters out empty writes. Modifications: Add a FilterEmptyWritesHandler, which on affected iOS versions can be added by default to all Channels. Unit tests for all additions and modifications. Result: With this workaround NIOTransportServices can support all iOS versions with Network Framework.
This commit is contained in:
parent
cefc7014fc
commit
7f98392c5d
|
|
@ -0,0 +1,163 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIO
|
||||
|
||||
|
||||
/// A `ChannelHandler` that checks for outbound writes of zero length, which are then dropped. This is
|
||||
/// due to a bug in `Network Framework`, where zero byte TCP writes lead to stalled connections.
|
||||
/// Write promises are confirmed in the correct order.
|
||||
public final class NIOFilterEmptyWritesHandler: ChannelDuplexHandler {
|
||||
public typealias InboundIn = ByteBuffer
|
||||
public typealias InboundOut = ByteBuffer
|
||||
public typealias OutboundIn = ByteBuffer
|
||||
public typealias OutboundOut = ByteBuffer
|
||||
|
||||
fileprivate enum ChannelState: Equatable {
|
||||
case notActiveYet
|
||||
case open
|
||||
case closedFromLocal
|
||||
case closedFromRemote
|
||||
case error
|
||||
}
|
||||
|
||||
private var state: ChannelState = .notActiveYet
|
||||
private var prefixEmptyWritePromise: Optional<EventLoopPromise<Void>>
|
||||
private var lastWritePromise: Optional<EventLoopPromise<Void>>
|
||||
|
||||
public init() {
|
||||
self.prefixEmptyWritePromise = nil
|
||||
self.lastWritePromise = nil
|
||||
}
|
||||
|
||||
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||
switch self.state {
|
||||
case .open:
|
||||
let buffer = self.unwrapOutboundIn(data)
|
||||
if buffer.readableBytes > 0 {
|
||||
self.lastWritePromise = promise ?? context.eventLoop.makePromise()
|
||||
context.write(data, promise: self.lastWritePromise)
|
||||
} else {
|
||||
/*
|
||||
Empty writes need to be handled individually depending on:
|
||||
A) Empty write occurring before any non-empty write needs a
|
||||
separate promise to cascade from (prefix)
|
||||
B) Non-empty writes carry a promise, that subsequent empty
|
||||
writes can cascade from
|
||||
*/
|
||||
switch (self.prefixEmptyWritePromise, self.lastWritePromise, promise) {
|
||||
case (_, _, .none): ()
|
||||
case (.none, .none, .some(let promise)):
|
||||
self.prefixEmptyWritePromise = promise
|
||||
case (_, .some(let lastWritePromise), .some(let promise)):
|
||||
lastWritePromise.futureResult.cascade(to: promise)
|
||||
case (.some(let prefixEmptyWritePromise), .none, .some(let promise)):
|
||||
prefixEmptyWritePromise.futureResult.cascade(to: promise)
|
||||
}
|
||||
}
|
||||
case .closedFromLocal, .closedFromRemote, .error:
|
||||
// Since channel is closed, Network Framework bug is not triggered for empty writes
|
||||
context.write(data, promise: promise)
|
||||
case .notActiveYet:
|
||||
preconditionFailure()
|
||||
}
|
||||
}
|
||||
|
||||
public func flush(context: ChannelHandlerContext) {
|
||||
self.lastWritePromise = nil
|
||||
if let prefixEmptyWritePromise = self.prefixEmptyWritePromise {
|
||||
self.prefixEmptyWritePromise = nil
|
||||
prefixEmptyWritePromise.succeed(())
|
||||
}
|
||||
|
||||
context.flush()
|
||||
}
|
||||
}
|
||||
|
||||
// Connection state management
|
||||
extension NIOFilterEmptyWritesHandler {
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
assert(self.state == .notActiveYet)
|
||||
self.state = .open
|
||||
context.fireChannelActive()
|
||||
}
|
||||
|
||||
public func channelInactive(context: ChannelHandlerContext) {
|
||||
let save = self.prefixEmptyWritePromise
|
||||
self.prefixEmptyWritePromise = nil
|
||||
self.lastWritePromise = nil
|
||||
|
||||
switch self.state {
|
||||
case .open:
|
||||
self.state = .closedFromRemote
|
||||
save?.fail(ChannelError.eof)
|
||||
case .closedFromLocal, .closedFromRemote, .error:
|
||||
assert(save == nil)
|
||||
case .notActiveYet:
|
||||
preconditionFailure()
|
||||
}
|
||||
context.fireChannelInactive()
|
||||
}
|
||||
|
||||
public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
|
||||
let save = self.prefixEmptyWritePromise
|
||||
self.prefixEmptyWritePromise = nil
|
||||
self.lastWritePromise = nil
|
||||
|
||||
switch (mode, self.state) {
|
||||
case (.all, .open),
|
||||
(.output, .open):
|
||||
self.state = .closedFromLocal
|
||||
save?.fail(ChannelError.outputClosed)
|
||||
case (.all, .closedFromLocal),
|
||||
(.output, .closedFromLocal),
|
||||
(.all, .closedFromRemote),
|
||||
(.output, .closedFromRemote),
|
||||
(.all, .notActiveYet),
|
||||
(.output, .notActiveYet),
|
||||
(.all, .error),
|
||||
(.output, .error):
|
||||
assert(save == nil)
|
||||
case (.input, _):
|
||||
save?.fail(ChannelError.operationUnsupported)
|
||||
}
|
||||
|
||||
context.close(mode: mode, promise: promise)
|
||||
}
|
||||
|
||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
||||
let save = self.prefixEmptyWritePromise
|
||||
self.prefixEmptyWritePromise = nil
|
||||
self.lastWritePromise = nil
|
||||
|
||||
switch self.state {
|
||||
case .open:
|
||||
self.state = .error
|
||||
save?.fail(error)
|
||||
case .closedFromLocal, .closedFromRemote, .error:
|
||||
assert(save == nil)
|
||||
case .notActiveYet:
|
||||
preconditionFailure()
|
||||
}
|
||||
|
||||
context.fireErrorCaught(error)
|
||||
}
|
||||
|
||||
public func handlerAdded(context: ChannelHandlerContext) {
|
||||
assert(self.state == .notActiveYet)
|
||||
if context.channel.isActive {
|
||||
self.state = .open
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,262 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import XCTest
|
||||
import NIO
|
||||
import NIOTransportServices
|
||||
|
||||
|
||||
class NIOFilterEmptyWritesHandlerTests: XCTestCase {
|
||||
var allocator: ByteBufferAllocator!
|
||||
var channel: EmbeddedChannel!
|
||||
var eventLoop: EmbeddedEventLoop!
|
||||
|
||||
override func setUp() {
|
||||
self.channel = EmbeddedChannel(handler: NIOFilterEmptyWritesHandler())
|
||||
XCTAssertNoThrow(try self.channel.connect(to: .init(ipAddress: "1.1.1.1", port: 1)).wait())
|
||||
self.allocator = self.channel.allocator
|
||||
let eventLoop = self.channel.eventLoop as! EmbeddedEventLoop
|
||||
self.eventLoop = eventLoop
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertTrue(try self.channel.finish().isClean)
|
||||
)
|
||||
self.channel = nil
|
||||
self.eventLoop = nil
|
||||
}
|
||||
|
||||
func testEmptyWritePromise() {
|
||||
let emptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let emptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(
|
||||
try emptyWritePromise.futureResult.wait()
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
}
|
||||
|
||||
func testEmptyWritesNoWriteThrough() {
|
||||
class OutboundTestHandler: ChannelOutboundHandler {
|
||||
typealias OutboundIn = ByteBuffer
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
func write(context: ChannelHandlerContext,
|
||||
data: NIOAny,
|
||||
promise: EventLoopPromise<Void>?) {
|
||||
XCTFail()
|
||||
context.write(data, promise: promise)
|
||||
}
|
||||
}
|
||||
XCTAssertNoThrow(
|
||||
try self.channel.pipeline.addHandler(OutboundTestHandler(),
|
||||
position: .first).wait()
|
||||
)
|
||||
let emptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenEmptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
self.channel.write(NIOAny(emptyWrite), promise: nil)
|
||||
self.channel.write(NIOAny(thenEmptyWrite),
|
||||
promise: thenEmptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(try thenEmptyWritePromise.futureResult.wait())
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
}
|
||||
|
||||
func testSomeWriteThenEmptyWritePromiseCascade() {
|
||||
let someWrite = self.allocator.bufferFor(string: "non empty")
|
||||
let someWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenEmptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
enum CheckOrder {
|
||||
case noWrite
|
||||
case someWrite
|
||||
case thenEmptyWrite
|
||||
}
|
||||
var checkOrder = CheckOrder.noWrite
|
||||
someWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .noWrite)
|
||||
checkOrder = .someWrite
|
||||
}
|
||||
thenEmptyWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .someWrite)
|
||||
checkOrder = .thenEmptyWrite
|
||||
}
|
||||
self.channel.write(NIOAny(someWrite),
|
||||
promise: someWritePromise)
|
||||
self.channel.write(NIOAny(thenEmptyWrite),
|
||||
promise: thenEmptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(try thenEmptyWritePromise.futureResult.wait())
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNotNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
XCTAssertEqual(checkOrder, .thenEmptyWrite)
|
||||
}
|
||||
|
||||
func testEmptyWriteTwicePromiseCascade() {
|
||||
let emptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let emptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenEmptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
enum CheckOrder {
|
||||
case noWrite
|
||||
case emptyWrite
|
||||
case thenEmptyWrite
|
||||
}
|
||||
var checkOrder = CheckOrder.noWrite
|
||||
emptyWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .noWrite)
|
||||
checkOrder = .emptyWrite
|
||||
}
|
||||
thenEmptyWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .emptyWrite)
|
||||
checkOrder = .thenEmptyWrite
|
||||
}
|
||||
self.channel.write(NIOAny(emptyWrite),
|
||||
promise: emptyWritePromise)
|
||||
self.channel.write(NIOAny(thenEmptyWrite),
|
||||
promise: thenEmptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(try thenEmptyWritePromise.futureResult.wait())
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
XCTAssertEqual(checkOrder, .thenEmptyWrite)
|
||||
}
|
||||
|
||||
func testEmptyWriteThenSomeWriteThenEmptyWritePromiseCascade() {
|
||||
let emptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let emptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
let thenSomeWrite = self.allocator.bufferFor(string: "non-empty")
|
||||
let thenSomeWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenEmptyWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
enum CheckOrder {
|
||||
case noWrite
|
||||
case emptyWrite
|
||||
case thenSomeWrite
|
||||
case thenEmptyWrite
|
||||
}
|
||||
var checkOrder = CheckOrder.noWrite
|
||||
emptyWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .noWrite)
|
||||
checkOrder = .emptyWrite
|
||||
}
|
||||
thenSomeWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .emptyWrite)
|
||||
checkOrder = .thenSomeWrite
|
||||
}
|
||||
thenEmptyWritePromise.futureResult.whenSuccess {
|
||||
XCTAssertEqual(checkOrder, .thenSomeWrite)
|
||||
checkOrder = .thenEmptyWrite
|
||||
}
|
||||
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
|
||||
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
|
||||
self.channel.write(NIOAny(thenEmptyWrite), promise: thenEmptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(try thenEmptyWritePromise.futureResult.wait())
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNotNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
XCTAssertEqual(checkOrder, .thenEmptyWrite)
|
||||
}
|
||||
|
||||
func testSomeWriteWithNilPromiseThenEmptyWriteWithNilPromiseThenSomeWrite() {
|
||||
let someWrite = self.allocator.bufferFor(string: "non empty")
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
let thenSomeWrite = self.allocator.bufferFor(string: "then some")
|
||||
let thenSomeWritePromise = self.eventLoop.makePromise(of: Void.self)
|
||||
self.channel.write(NIOAny(someWrite), promise: nil)
|
||||
self.channel.write(NIOAny(thenEmptyWrite), promise: nil)
|
||||
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(try thenSomeWritePromise.futureResult.wait())
|
||||
var someWriteOutput: ByteBuffer?
|
||||
XCTAssertNoThrow(
|
||||
someWriteOutput = try self.channel.readOutbound()
|
||||
)
|
||||
XCTAssertEqual(someWriteOutput, someWrite)
|
||||
var thenSomeWriteOutput: ByteBuffer?
|
||||
XCTAssertNoThrow(
|
||||
thenSomeWriteOutput = try self.channel.readOutbound()
|
||||
)
|
||||
XCTAssertEqual(thenSomeWriteOutput, thenSomeWrite)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
}
|
||||
|
||||
func testSomeWriteAndFlushThenSomeWriteAndFlush() {
|
||||
let someWrite = self.allocator.bufferFor(string: "non empty")
|
||||
var someWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
|
||||
self.channel.write(NIOAny(someWrite), promise: someWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(
|
||||
try someWritePromise.futureResult.wait()
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNotNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
someWritePromise = nil
|
||||
let thenSomeWrite = self.allocator.bufferFor(string: "then some")
|
||||
var thenSomeWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
|
||||
self.channel.write(NIOAny(thenSomeWrite), promise: thenSomeWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(
|
||||
try thenSomeWritePromise.futureResult.wait()
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNotNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
thenSomeWritePromise = nil
|
||||
}
|
||||
|
||||
func testEmptyWriteAndFlushThenEmptyWriteAndFlush() {
|
||||
let emptyWrite = self.allocator.buffer(capacity: 0)
|
||||
var emptyWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
|
||||
self.channel.write(NIOAny(emptyWrite), promise: emptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(
|
||||
try emptyWritePromise.futureResult.wait()
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
emptyWritePromise = nil
|
||||
let thenEmptyWrite = self.allocator.buffer(capacity: 0)
|
||||
var thenEmptyWritePromise: EventLoopPromise<Void>! = self.eventLoop.makePromise()
|
||||
self.channel.write(NIOAny(thenEmptyWrite), promise: thenEmptyWritePromise)
|
||||
self.channel.flush()
|
||||
XCTAssertNoThrow(
|
||||
try thenEmptyWritePromise.futureResult.wait()
|
||||
)
|
||||
XCTAssertNoThrow(
|
||||
XCTAssertNil(try self.channel.readOutbound(as: ByteBuffer.self))
|
||||
)
|
||||
thenEmptyWritePromise = nil
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue