From b766c284d54c8e81c08f6e458b48dd018dca2994 Mon Sep 17 00:00:00 2001 From: "Bryan C. Mills" Date: Fri, 5 Jun 2020 15:19:41 -0400 Subject: [PATCH] internal/jsonrpc2: make Serve wait for all connections to close Also eliminate a goroutine leak in case of connection timeout. Change-Id: I82c1a8352658353ef5d0ba4e7ce9e617ce7b42a3 Reviewed-on: https://go-review.googlesource.com/c/tools/+/236750 Trust: Bryan C. Mills Run-TryBot: Bryan C. Mills gopls-CI: kokoro TryBot-Result: Go Bot Reviewed-by: Ian Cottrell --- internal/jsonrpc2/serve.go | 73 +++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/internal/jsonrpc2/serve.go b/internal/jsonrpc2/serve.go index b9e31a8573..d587971527 100644 --- a/internal/jsonrpc2/serve.go +++ b/internal/jsonrpc2/serve.go @@ -6,7 +6,6 @@ package jsonrpc2 import ( "context" - "fmt" "io" "net" "os" @@ -65,47 +64,69 @@ func ListenAndServe(ctx context.Context, network, addr string, server StreamServ // the provided server. If idleTimeout is non-zero, ListenAndServe exits after // there are no clients for this duration, otherwise it exits only on error. func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeout time.Duration) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - // Max duration: ~290 years; surely that's long enough. - const forever = 1<<63 - 1 - if idleTimeout <= 0 { - idleTimeout = forever - } - connTimer := time.NewTimer(idleTimeout) - newConns := make(chan net.Conn) - doneListening := make(chan error) closedConns := make(chan error) - + activeConns := 0 + var acceptErr error go func() { + defer close(newConns) for { - nc, err := ln.Accept() - if err != nil { - select { - case doneListening <- fmt.Errorf("Accept(): %w", err): - case <-ctx.Done(): - } + var nc net.Conn + nc, acceptErr = ln.Accept() + if acceptErr != nil { return } newConns <- nc } }() - activeConns := 0 + ctx, cancel := context.WithCancel(ctx) + defer func() { + // Signal the Accept goroutine to stop immediately + // and terminate all newly-accepted connections until it returns. + ln.Close() + for nc := range newConns { + nc.Close() + } + // Cancel pending ServeStream callbacks and wait for them to finish. + cancel() + for activeConns > 0 { + err := <-closedConns + if !isClosingError(err) { + event.Error(ctx, "closed a connection", err) + } + activeConns-- + } + }() + + // Max duration: ~290 years; surely that's long enough. + const forever = 1<<63 - 1 + if idleTimeout <= 0 { + idleTimeout = forever + } + connTimer := time.NewTimer(idleTimeout) + defer connTimer.Stop() + for { select { - case netConn := <-newConns: + case netConn, ok := <-newConns: + if !ok { + return acceptErr + } + if activeConns == 0 && !connTimer.Stop() { + // connTimer.C may receive a value even after Stop returns. + // (See https://golang.org/issue/37196.) + <-connTimer.C + } activeConns++ - connTimer.Stop() stream := NewHeaderStream(netConn) go func() { conn := NewConn(stream) - closedConns <- server.ServeStream(ctx, conn) + err := server.ServeStream(ctx, conn) stream.Close() + closedConns <- err }() - case err := <-doneListening: - return err + case err := <-closedConns: if !isClosingError(err) { event.Error(ctx, "closed a connection", err) @@ -114,10 +135,12 @@ func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeou if activeConns == 0 { connTimer.Reset(idleTimeout) } + case <-connTimer.C: return ErrIdleTimeout + case <-ctx.Done(): - return ctx.Err() + return nil } } }