Added buffer pool to NIOTSConnectionChannel (#236)
Added buffer pool to NIOTSConnectionChannel for reading messages. ### Motivation: In order to avoid creating a new `ByteBuffer` for every single message read from the channel, we decided to reuse the `NIOPooledRecvBufferAllocator` type from NIOCore to leverage a pool of buffers. ### Modifications: Used the buffer allocation mechanism provided by the `NIOPooledRecvBufferAllocator` to reuse and adapt previously created receive buffers. ### Result: Channel reads can now reuse previously created buffers, avoiding unnecessary overhead by creating new buffers every single time.
This commit is contained in:
parent
7114435533
commit
9224dc5159
|
|
@ -39,7 +39,7 @@ let package = Package(
|
||||||
.library(name: "NIOTransportServices", targets: ["NIOTransportServices"])
|
.library(name: "NIOTransportServices", targets: ["NIOTransportServices"])
|
||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
|
.package(url: "https://github.com/apple/swift-nio.git", from: "2.83.0"),
|
||||||
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
|
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
|
||||||
],
|
],
|
||||||
targets: [
|
targets: [
|
||||||
|
|
|
||||||
|
|
@ -236,6 +236,15 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
|
||||||
/// A lock that guards the _addressCache.
|
/// A lock that guards the _addressCache.
|
||||||
internal let _addressCacheLock = NIOLock()
|
internal let _addressCacheLock = NIOLock()
|
||||||
|
|
||||||
|
/// The `NIOPooledRecvBufferAllocator` used to allocate buffers for incoming data
|
||||||
|
private var recvBufferPool: NIOPooledRecvBufferAllocator
|
||||||
|
|
||||||
|
/// A constant to hold the maximum amount of buffers that should be created by the `NIOPooledRecvBufferAllocator`
|
||||||
|
///
|
||||||
|
/// Once we allow multiple messages per read on the channel, this should become a `maxMessagesPerRead` property
|
||||||
|
/// and a corresponding channel option that users can configure.
|
||||||
|
private let recvBufferPoolCapacity = 4
|
||||||
|
|
||||||
/// Create a `NIOTSConnectionChannel` on a given `NIOTSEventLoop`.
|
/// Create a `NIOTSConnectionChannel` on a given `NIOTSEventLoop`.
|
||||||
///
|
///
|
||||||
/// Note that `NIOTSConnectionChannel` objects cannot be created on arbitrary loops types.
|
/// Note that `NIOTSConnectionChannel` objects cannot be created on arbitrary loops types.
|
||||||
|
|
@ -247,6 +256,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
|
||||||
maximumReceiveLength: Int = 8192,
|
maximumReceiveLength: Int = 8192,
|
||||||
tcpOptions: NWProtocolTCP.Options,
|
tcpOptions: NWProtocolTCP.Options,
|
||||||
tlsOptions: NWProtocolTLS.Options?,
|
tlsOptions: NWProtocolTLS.Options?,
|
||||||
|
recvAllocator: RecvByteBufferAllocator = AdaptiveRecvByteBufferAllocator(),
|
||||||
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
|
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
|
||||||
) {
|
) {
|
||||||
self.tsEventLoop = eventLoop
|
self.tsEventLoop = eventLoop
|
||||||
|
|
@ -257,6 +267,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
|
||||||
self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos)
|
self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos)
|
||||||
self.tcpOptions = tcpOptions
|
self.tcpOptions = tcpOptions
|
||||||
self.tlsOptions = tlsOptions
|
self.tlsOptions = tlsOptions
|
||||||
|
self.recvBufferPool = .init(capacity: Int(self.recvBufferPoolCapacity), recvAllocator: recvAllocator)
|
||||||
self.nwParametersConfigurator = nwParametersConfigurator
|
self.nwParametersConfigurator = nwParametersConfigurator
|
||||||
|
|
||||||
// Must come last, as it requires self to be completely initialized.
|
// Must come last, as it requires self to be completely initialized.
|
||||||
|
|
@ -273,6 +284,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
|
||||||
maximumReceiveLength: Int = 8192,
|
maximumReceiveLength: Int = 8192,
|
||||||
tcpOptions: NWProtocolTCP.Options,
|
tcpOptions: NWProtocolTCP.Options,
|
||||||
tlsOptions: NWProtocolTLS.Options?,
|
tlsOptions: NWProtocolTLS.Options?,
|
||||||
|
recvAllocator: RecvByteBufferAllocator = AdaptiveRecvByteBufferAllocator(),
|
||||||
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
|
nwParametersConfigurator: (@Sendable (NWParameters) -> Void)?
|
||||||
) {
|
) {
|
||||||
self.init(
|
self.init(
|
||||||
|
|
@ -283,6 +295,7 @@ internal final class NIOTSConnectionChannel: StateManagedNWConnectionChannel {
|
||||||
maximumReceiveLength: maximumReceiveLength,
|
maximumReceiveLength: maximumReceiveLength,
|
||||||
tcpOptions: tcpOptions,
|
tcpOptions: tcpOptions,
|
||||||
tlsOptions: tlsOptions,
|
tlsOptions: tlsOptions,
|
||||||
|
recvAllocator: recvAllocator,
|
||||||
nwParametersConfigurator: nwParametersConfigurator
|
nwParametersConfigurator: nwParametersConfigurator
|
||||||
)
|
)
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
|
|
@ -412,10 +425,12 @@ extension NIOTSConnectionChannel {
|
||||||
if let content = content {
|
if let content = content {
|
||||||
// It would be nice if we didn't have to do this copy, but I'm not sure how to avoid it with the current Data
|
// It would be nice if we didn't have to do this copy, but I'm not sure how to avoid it with the current Data
|
||||||
// APIs.
|
// APIs.
|
||||||
var buffer = self.allocator.buffer(capacity: content.count)
|
let (buffer, bytesReceived) = self.recvBufferPool.buffer(allocator: allocator) { $0.writeBytes(content) }
|
||||||
buffer.writeBytes(content)
|
|
||||||
|
self.recvBufferPool.record(actualReadBytes: bytesReceived)
|
||||||
self.pipeline.fireChannelRead(buffer)
|
self.pipeline.fireChannelRead(buffer)
|
||||||
self.pipeline.fireChannelReadComplete()
|
self.pipeline.fireChannelReadComplete()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next, we want to check if there's an error. If there is, we're going to deliver it, and then close the connection with
|
// Next, we want to check if there's an error. If there is, we're going to deliver it, and then close the connection with
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue