internal/jsonrpc2: make Serve wait for all connections to close

Also eliminate a goroutine leak in case of connection timeout.

Change-Id: I82c1a8352658353ef5d0ba4e7ce9e617ce7b42a3
Reviewed-on: https://go-review.googlesource.com/c/tools/+/236750
Trust: Bryan C. Mills <bcmills@google.com>
Run-TryBot: Bryan C. Mills <bcmills@google.com>
gopls-CI: kokoro <noreply+kokoro@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Ian Cottrell <iancottrell@google.com>
This commit is contained in:
Bryan C. Mills 2020-06-05 15:19:41 -04:00
parent 2cdcc60405
commit b766c284d5
1 changed files with 48 additions and 25 deletions

View File

@ -6,7 +6,6 @@ package jsonrpc2
import (
"context"
"fmt"
"io"
"net"
"os"
@ -65,47 +64,69 @@ func ListenAndServe(ctx context.Context, network, addr string, server StreamServ
// the provided server. If idleTimeout is non-zero, ListenAndServe exits after
// there are no clients for this duration, otherwise it exits only on error.
func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeout time.Duration) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Max duration: ~290 years; surely that's long enough.
const forever = 1<<63 - 1
if idleTimeout <= 0 {
idleTimeout = forever
}
connTimer := time.NewTimer(idleTimeout)
newConns := make(chan net.Conn)
doneListening := make(chan error)
closedConns := make(chan error)
activeConns := 0
var acceptErr error
go func() {
defer close(newConns)
for {
nc, err := ln.Accept()
if err != nil {
select {
case doneListening <- fmt.Errorf("Accept(): %w", err):
case <-ctx.Done():
}
var nc net.Conn
nc, acceptErr = ln.Accept()
if acceptErr != nil {
return
}
newConns <- nc
}
}()
activeConns := 0
ctx, cancel := context.WithCancel(ctx)
defer func() {
// Signal the Accept goroutine to stop immediately
// and terminate all newly-accepted connections until it returns.
ln.Close()
for nc := range newConns {
nc.Close()
}
// Cancel pending ServeStream callbacks and wait for them to finish.
cancel()
for activeConns > 0 {
err := <-closedConns
if !isClosingError(err) {
event.Error(ctx, "closed a connection", err)
}
activeConns--
}
}()
// Max duration: ~290 years; surely that's long enough.
const forever = 1<<63 - 1
if idleTimeout <= 0 {
idleTimeout = forever
}
connTimer := time.NewTimer(idleTimeout)
defer connTimer.Stop()
for {
select {
case netConn := <-newConns:
case netConn, ok := <-newConns:
if !ok {
return acceptErr
}
if activeConns == 0 && !connTimer.Stop() {
// connTimer.C may receive a value even after Stop returns.
// (See https://golang.org/issue/37196.)
<-connTimer.C
}
activeConns++
connTimer.Stop()
stream := NewHeaderStream(netConn)
go func() {
conn := NewConn(stream)
closedConns <- server.ServeStream(ctx, conn)
err := server.ServeStream(ctx, conn)
stream.Close()
closedConns <- err
}()
case err := <-doneListening:
return err
case err := <-closedConns:
if !isClosingError(err) {
event.Error(ctx, "closed a connection", err)
@ -114,10 +135,12 @@ func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeou
if activeConns == 0 {
connTimer.Reset(idleTimeout)
}
case <-connTimer.C:
return ErrIdleTimeout
case <-ctx.Done():
return ctx.Err()
return nil
}
}
}