net/http: pool transport gzip readers

goos: linux
goarch: amd64
pkg: net/http
             │   HEAD~1    │              HEAD              │
             │   sec/op    │    sec/op     vs base          │
ClientGzip-8   621.0µ ± 2%   616.3µ ± 10%  ~ (p=0.971 n=10)

             │    HEAD~1     │                 HEAD                 │
             │     B/op      │     B/op      vs base                │
ClientGzip-8   49.765Ki ± 0%   9.514Ki ± 2%  -80.88% (p=0.000 n=10)

             │   HEAD~1   │               HEAD                │
             │ allocs/op  │ allocs/op   vs base               │
ClientGzip-8   57.00 ± 0%   52.00 ± 0%  -8.77% (p=0.000 n=10)

Allocation saving comes from absent compress/flate.(*dictDecoder).init

Updates #61353
This commit is contained in:
Alexander Yastrebov 2023-11-10 18:22:14 +01:00
parent 1baf68b227
commit e26eb44fcc
1 changed files with 82 additions and 19 deletions

View File

@ -11,6 +11,7 @@ package http
import (
"bufio"
"compress/flate"
"compress/gzip"
"container/list"
"context"
@ -2976,6 +2977,7 @@ type bodyEOFSignal struct {
}
var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
@ -3025,37 +3027,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
}
// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
_ incomparable
body *bodyEOFSignal // underlying HTTP/1 response body framing
zr *gzip.Reader // lazily-initialized gzip reader
zerr error // any error from gzip.NewReader; sticky
mu sync.Mutex // guards zr and zerr
zr *gzip.Reader
zerr error
}
type eofReader struct{}
func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
zr := gzipPool.Get().(*gzip.Reader)
if err := zr.Reset(r); err != nil {
gzipPoolPut(zr)
return nil, err
}
return zr, nil
}
// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
// Reset will allocate bufio.Reader if we pass it anything
// other than a flate.Reader, so ensure that it's getting one.
var r flate.Reader = eofReader{}
zr.Reset(r)
gzipPool.Put(zr)
}
// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr != nil {
return nil, gz.zerr
}
if gz.zr == nil {
gz.zr, gz.zerr = gzipPoolGet(gz.body)
if gz.zerr != nil {
return nil, gz.zerr
}
}
ret := gz.zr
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
return ret, nil
}
// release returns the gzip.Reader to the pool if Close was called during Read.
func (gz *gzipReader) release(zr *gzip.Reader) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == errConcurrentReadOnResBody {
gz.zr, gz.zerr = zr, nil
} else { // errReadOnClosedResBody
gzipPoolPut(zr)
}
}
// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
func (gz *gzipReader) close() {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == nil && gz.zr != nil {
gzipPoolPut(gz.zr)
gz.zr = nil
}
gz.zerr = errReadOnClosedResBody
}
func (gz *gzipReader) Read(p []byte) (n int, err error) {
if gz.zr == nil {
if gz.zerr == nil {
gz.zr, gz.zerr = gzip.NewReader(gz.body)
}
if gz.zerr != nil {
return 0, gz.zerr
}
}
gz.body.mu.Lock()
if gz.body.closed {
err = errReadOnClosedResBody
}
gz.body.mu.Unlock()
zr, err := gz.acquire()
if err != nil {
return 0, err
}
return gz.zr.Read(p)
defer gz.release(zr)
return zr.Read(p)
}
func (gz *gzipReader) Close() error {
gz.close()
return gz.body.Close()
}