diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go index 7e3e490af3..11a74a389f 100644 --- a/src/net/http/serve_test.go +++ b/src/net/http/serve_test.go @@ -12,6 +12,7 @@ import ( "compress/gzip" "compress/zlib" "context" + crand "crypto/rand" "crypto/tls" "crypto/x509" "encoding/json" @@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode) func BenchmarkServer(b *testing.B) { b.ReportAllocs() // Child process mode; - if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" { - n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N")) + if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" { + n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N")) if err != nil { panic(err) } @@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) { cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$") cmd.Env = append([]string{ - fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N), - fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL), + fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N), + fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL), }, os.Environ()...) out, err := cmd.CombinedOutput() if err != nil { @@ -5337,69 +5338,20 @@ func getNoBody(urlStr string) (*Response, error) { // A benchmark for profiling the client without the HTTP server code. // The server code runs in a subprocess. +// +// For use like: +// +// $ go test -c +// $ ./http.test -test.run='^$' -test.bench='^BenchmarkClient$' -test.benchtime=15s -test.cpuprofile=http.prof +// $ go tool pprof http.test http.prof +// (pprof) web func BenchmarkClient(b *testing.B) { - b.ReportAllocs() - b.StopTimer() - defer afterTest(b) - var data = []byte("Hello world.\n") - if server := os.Getenv("TEST_BENCH_SERVER"); server != "" { - // Server process mode. - port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user - if port == "" { - port = "0" - } - ln, err := net.Listen("tcp", "localhost:"+port) - if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) - } - fmt.Println(ln.Addr().String()) - HandleFunc("/", func(w ResponseWriter, r *Request) { - r.ParseForm() - if r.Form.Get("stop") != "" { - os.Exit(0) - } - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Write(data) - }) - var srv Server - log.Fatal(srv.Serve(ln)) - } - // Start server process. - ctx, cancel := context.WithCancel(context.Background()) - cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$") - cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes") - cmd.Stderr = os.Stderr - stdout, err := cmd.StdoutPipe() - if err != nil { - b.Fatal(err) - } - if err := cmd.Start(); err != nil { - b.Fatalf("subprocess failed to start: %v", err) - } - - done := make(chan error, 1) - go func() { - done <- cmd.Wait() - close(done) - }() - defer func() { - cancel() - <-done - }() - - // Wait for the server in the child process to respond and tell us - // its listening address, once it's started listening: - bs := bufio.NewScanner(stdout) - if !bs.Scan() { - b.Fatalf("failed to read listening URL from child: %v", bs.Err()) - } - url := "http://" + strings.TrimSpace(bs.Text()) + "/" - if _, err := getNoBody(url); err != nil { - b.Fatalf("initial probe of child process failed: %v", err) - } + url := startClientBenchmarkServer(b, "BenchmarkClient", func(w ResponseWriter, _ *Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Write(data) + }) // Do b.N requests to the server. b.StartTimer() @@ -5418,12 +5370,115 @@ func BenchmarkClient(b *testing.B) { } } b.StopTimer() +} + +func startClientBenchmarkServer(b *testing.B, benchmarkName string, handler func(ResponseWriter, *Request)) string { + b.ReportAllocs() + b.StopTimer() + + if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" { + // Server process mode. + port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user + if port == "" { + port = "0" + } + ln, err := net.Listen("tcp", "localhost:"+port) + if err != nil { + log.Fatal(err) + } + fmt.Println(ln.Addr().String()) + + HandleFunc("/", func(w ResponseWriter, r *Request) { + r.ParseForm() + if r.Form.Get("stop") != "" { + os.Exit(0) + } + handler(w, r) + }) + var srv Server + log.Fatal(srv.Serve(ln)) + } + + // Start server process. + ctx, cancel := context.WithCancel(context.Background()) + cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+benchmarkName+"$") + cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes") + cmd.Stderr = os.Stderr + stdout, err := cmd.StdoutPipe() + if err != nil { + b.Fatal(err) + } + if err := cmd.Start(); err != nil { + b.Fatalf("subprocess failed to start: %v", err) + } + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + close(done) + }() + + // Wait for the server in the child process to respond and tell us + // its listening address, once it's started listening: + bs := bufio.NewScanner(stdout) + if !bs.Scan() { + b.Fatalf("failed to read listening URL from child: %v", bs.Err()) + } + url := "http://" + strings.TrimSpace(bs.Text()) + "/" + if _, err := getNoBody(url); err != nil { + b.Fatalf("initial probe of child process failed: %v", err) + } // Instruct server process to stop. - getNoBody(url + "?stop=yes") - if err := <-done; err != nil { - b.Fatalf("subprocess failed: %v", err) + b.Cleanup(func() { + getNoBody(url + "?stop=yes") + if err := <-done; err != nil { + b.Fatalf("subprocess failed: %v", err) + } + + cancel() + <-done + + afterTest(b) + }) + + return url +} + +func BenchmarkClientGzip(b *testing.B) { + const responseSize = 1024 * 1024 + + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil { + b.Fatal(err) } + gz.Close() + + data := buf.Bytes() + + url := startClientBenchmarkServer(b, "BenchmarkClientGzip", func(w ResponseWriter, _ *Request) { + w.Header().Set("Content-Encoding", "gzip") + w.Write(data) + }) + + // Do b.N requests to the server. + b.StartTimer() + for i := 0; i < b.N; i++ { + res, err := Get(url) + if err != nil { + b.Fatalf("Get: %v", err) + } + n, err := io.Copy(io.Discard, res.Body) + res.Body.Close() + if err != nil { + b.Fatalf("ReadAll: %v", err) + } + if n != responseSize { + b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n) + } + } + b.StopTimer() } func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) { diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 07b3a9e1e7..ce428a30dd 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -11,6 +11,7 @@ package http import ( "bufio" + "compress/flate" "compress/gzip" "container/list" "context" @@ -2982,6 +2983,7 @@ type bodyEOFSignal struct { } var errReadOnClosedResBody = errors.New("http: read on closed response body") +var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body") func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { es.mu.Lock() @@ -3031,37 +3033,98 @@ func (es *bodyEOFSignal) condfn(err error) error { } // gzipReader wraps a response body so it can lazily -// call gzip.NewReader on the first call to Read +// get gzip.Reader from the pool on the first call to Read. +// After Close is called it puts gzip.Reader to the pool immediately +// if there is no Read in progress or later when Read completes. type gzipReader struct { _ incomparable body *bodyEOFSignal // underlying HTTP/1 response body framing - zr *gzip.Reader // lazily-initialized gzip reader - zerr error // any error from gzip.NewReader; sticky + mu sync.Mutex // guards zr and zerr + zr *gzip.Reader + zerr error +} + +type eofReader struct{} + +func (eofReader) Read([]byte) (int, error) { return 0, io.EOF } +func (eofReader) ReadByte() (byte, error) { return 0, io.EOF } + +var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }} + +// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r. +func gzipPoolGet(r io.Reader) (*gzip.Reader, error) { + zr := gzipPool.Get().(*gzip.Reader) + if err := zr.Reset(r); err != nil { + gzipPoolPut(zr) + return nil, err + } + return zr, nil +} + +// gzipPoolPut puts a gzip.Reader back into the pool. +func gzipPoolPut(zr *gzip.Reader) { + // Reset will allocate bufio.Reader if we pass it anything + // other than a flate.Reader, so ensure that it's getting one. + var r flate.Reader = eofReader{} + zr.Reset(r) + gzipPool.Put(zr) +} + +// acquire returns a gzip.Reader for reading response body. +// The reader must be released after use. +func (gz *gzipReader) acquire() (*gzip.Reader, error) { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr != nil { + return nil, gz.zerr + } + if gz.zr == nil { + gz.zr, gz.zerr = gzipPoolGet(gz.body) + if gz.zerr != nil { + return nil, gz.zerr + } + } + ret := gz.zr + gz.zr, gz.zerr = nil, errConcurrentReadOnResBody + return ret, nil +} + +// release returns the gzip.Reader to the pool if Close was called during Read. +func (gz *gzipReader) release(zr *gzip.Reader) { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr == errConcurrentReadOnResBody { + gz.zr, gz.zerr = zr, nil + } else { // errReadOnClosedResBody + gzipPoolPut(zr) + } +} + +// close returns the gzip.Reader to the pool immediately or +// signals release to do so after Read completes. +func (gz *gzipReader) close() { + gz.mu.Lock() + defer gz.mu.Unlock() + if gz.zerr == nil && gz.zr != nil { + gzipPoolPut(gz.zr) + gz.zr = nil + } + gz.zerr = errReadOnClosedResBody } func (gz *gzipReader) Read(p []byte) (n int, err error) { - if gz.zr == nil { - if gz.zerr == nil { - gz.zr, gz.zerr = gzip.NewReader(gz.body) - } - if gz.zerr != nil { - return 0, gz.zerr - } - } - - gz.body.mu.Lock() - if gz.body.closed { - err = errReadOnClosedResBody - } - gz.body.mu.Unlock() - + zr, err := gz.acquire() if err != nil { return 0, err } - return gz.zr.Read(p) + defer gz.release(zr) + + return zr.Read(p) } func (gz *gzipReader) Close() error { + gz.close() + return gz.body.Close() }