Remove sendability reqs from test type
This commit is contained in:
parent
b31ad4e336
commit
3fb3484409
|
|
@ -52,56 +52,34 @@ final class EchoHandler: ChannelInboundHandler {
|
|||
}
|
||||
}
|
||||
|
||||
final class ReadExpecter: ChannelInboundHandler, Sendable {
|
||||
final class ReadExpecter: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
|
||||
struct DidNotReadError: Error {}
|
||||
|
||||
private let readPromise: NIOLockedValueBox<EventLoopPromise<Void>?>
|
||||
private let cumulationBuffer: NIOLockedValueBox<ByteBuffer?>
|
||||
private let readPromise: EventLoopPromise<Void>
|
||||
private var cumulationBuffer: ByteBuffer?
|
||||
private let expectedRead: ByteBuffer
|
||||
|
||||
var readFuture: EventLoopFuture<Void>? {
|
||||
self.readPromise.withLockedValue { $0?.futureResult }
|
||||
}
|
||||
|
||||
init(expecting: ByteBuffer) {
|
||||
self.readPromise = .init(nil)
|
||||
self.cumulationBuffer = .init(nil)
|
||||
init(expecting: ByteBuffer, readPromise: EventLoopPromise<Void>) {
|
||||
self.readPromise = readPromise
|
||||
self.cumulationBuffer = nil
|
||||
self.expectedRead = expecting
|
||||
}
|
||||
|
||||
func handlerAdded(context: ChannelHandlerContext) {
|
||||
self.readPromise.withLockedValue { $0 = context.eventLoop.makePromise() }
|
||||
}
|
||||
|
||||
func handlerRemoved(context: ChannelHandlerContext) {
|
||||
self.readPromise.withLockedValue {
|
||||
if let promise = $0 {
|
||||
promise.fail(DidNotReadError())
|
||||
}
|
||||
}
|
||||
self.readPromise.fail(DidNotReadError())
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
self.cumulationBuffer.withLockedValue {
|
||||
var bytes = self.unwrapInboundIn(data)
|
||||
$0.setOrWriteBuffer(&bytes)
|
||||
}
|
||||
|
||||
var bytes = self.unwrapInboundIn(data)
|
||||
self.cumulationBuffer.setOrWriteBuffer(&bytes)
|
||||
self.maybeFulfillPromise()
|
||||
}
|
||||
|
||||
private func maybeFulfillPromise() {
|
||||
self.readPromise.withLockedValue { readPromise in
|
||||
self.cumulationBuffer.withLockedValue { cumulationBuffer in
|
||||
guard cumulationBuffer == self.expectedRead else { return }
|
||||
}
|
||||
if let promise = readPromise {
|
||||
promise.succeed(())
|
||||
readPromise = nil
|
||||
}
|
||||
}
|
||||
guard self.cumulationBuffer == self.expectedRead else { return }
|
||||
self.readPromise.succeed(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -188,9 +166,12 @@ final class WaitForActiveHandler: ChannelInboundHandler {
|
|||
extension Channel {
|
||||
/// Expect that the given bytes will be received.
|
||||
func expectRead(_ bytes: ByteBuffer) -> EventLoopFuture<Void> {
|
||||
let expecter = ReadExpecter(expecting: bytes)
|
||||
return self.pipeline.addHandler(expecter).flatMap {
|
||||
expecter.readFuture!
|
||||
let readPromise = self.eventLoop.makePromise(of: Void.self)
|
||||
return self.eventLoop.submit {
|
||||
let expecter = ReadExpecter(expecting: bytes, readPromise: readPromise)
|
||||
try self.pipeline.syncOperations.addHandler(expecter)
|
||||
}.flatMap {
|
||||
readPromise.futureResult
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue