From cc40288be83943c50bf80a9702fb35bd5e580e8d Mon Sep 17 00:00:00 2001 From: Ian Cottrell Date: Thu, 30 Apr 2020 11:05:58 -0400 Subject: [PATCH] internal/lsp: change logging stream to be a framer Change-Id: Id9e17e98ca00f31424068e875851b5f9008c6fe8 Reviewed-on: https://go-review.googlesource.com/c/tools/+/231797 Run-TryBot: Ian Cottrell TryBot-Result: Gobot Gobot Reviewed-by: Robert Findley --- internal/jsonrpc2/servertest/servertest.go | 26 +++++++++++------ .../jsonrpc2/servertest/servertest_test.go | 4 +-- internal/lsp/cmd/test/cmdtest.go | 2 +- internal/lsp/lsprpc/lsprpc_test.go | 10 +++---- internal/lsp/regtest/runner.go | 29 ++++++++++--------- 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/internal/jsonrpc2/servertest/servertest.go b/internal/jsonrpc2/servertest/servertest.go index d5dac702bb..a3c5589560 100644 --- a/internal/jsonrpc2/servertest/servertest.go +++ b/internal/jsonrpc2/servertest/servertest.go @@ -26,20 +26,24 @@ type Connector interface { type TCPServer struct { Addr string - ln net.Listener - cls *closerList + ln net.Listener + framer jsonrpc2.Framer + cls *closerList } // NewTCPServer returns a new test server listening on local tcp port and // serving incoming jsonrpc2 streams using the provided stream server. It // panics on any error. -func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer) *TCPServer { +func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer, framer jsonrpc2.Framer) *TCPServer { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { panic(fmt.Sprintf("servertest: failed to listen: %v", err)) } + if framer == nil { + framer = jsonrpc2.NewHeaderStream + } go jsonrpc2.Serve(ctx, ln, server, 0) - return &TCPServer{Addr: ln.Addr().String(), ln: ln, cls: &closerList{}} + return &TCPServer{Addr: ln.Addr().String(), ln: ln, framer: framer, cls: &closerList{}} } // Connect dials the test server and returns a jsonrpc2 Connection that is @@ -52,7 +56,7 @@ func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn { s.cls.add(func() { netConn.Close() }) - return jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn)) + return jsonrpc2.NewConn(s.framer(netConn)) } // Close closes all connected pipes. @@ -64,12 +68,16 @@ func (s *TCPServer) Close() error { // PipeServer is a test server that handles connections over io.Pipes. type PipeServer struct { server jsonrpc2.StreamServer + framer jsonrpc2.Framer cls *closerList } // NewPipeServer returns a test server that can be connected to via io.Pipes. -func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer) *PipeServer { - return &PipeServer{server: server, cls: &closerList{}} +func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer, framer jsonrpc2.Framer) *PipeServer { + if framer == nil { + framer = jsonrpc2.NewRawStream + } + return &PipeServer{server: server, framer: framer, cls: &closerList{}} } // Connect creates new io.Pipes and binds them to the underlying StreamServer. @@ -79,10 +87,10 @@ func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn { sPipe.Close() cPipe.Close() }) - serverStream := jsonrpc2.NewRawStream(sPipe) + serverStream := s.framer(sPipe) go s.server.ServeStream(ctx, serverStream) - clientStream := jsonrpc2.NewRawStream(cPipe) + clientStream := s.framer(cPipe) return jsonrpc2.NewConn(clientStream) } diff --git a/internal/jsonrpc2/servertest/servertest_test.go b/internal/jsonrpc2/servertest/servertest_test.go index f7c36ccd55..38fa21a24d 100644 --- a/internal/jsonrpc2/servertest/servertest_test.go +++ b/internal/jsonrpc2/servertest/servertest_test.go @@ -24,9 +24,9 @@ func TestTestServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() server := jsonrpc2.HandlerServer(fakeHandler) - tcpTS := NewTCPServer(ctx, server) + tcpTS := NewTCPServer(ctx, server, nil) defer tcpTS.Close() - pipeTS := NewPipeServer(ctx, server) + pipeTS := NewPipeServer(ctx, server, nil) defer pipeTS.Close() tests := []struct { diff --git a/internal/lsp/cmd/test/cmdtest.go b/internal/lsp/cmd/test/cmdtest.go index 863e7edb02..78cccae9e5 100644 --- a/internal/lsp/cmd/test/cmdtest.go +++ b/internal/lsp/cmd/test/cmdtest.go @@ -68,7 +68,7 @@ func NewTestServer(ctx context.Context, options func(*source.Options)) *serverte ctx = debug.WithInstance(ctx, "", "") cache := cache.New(ctx, options) ss := lsprpc.NewStreamServer(cache) - return servertest.NewTCPServer(ctx, ss) + return servertest.NewTCPServer(ctx, ss, nil) } func NewRunner(exporter packagestest.Exporter, data *tests.Data, ctx context.Context, remote string, options func(*source.Options)) *runner { diff --git a/internal/lsp/lsprpc/lsprpc_test.go b/internal/lsp/lsprpc/lsprpc_test.go index 7a56446fbb..26bef293c1 100644 --- a/internal/lsp/lsprpc/lsprpc_test.go +++ b/internal/lsp/lsprpc/lsprpc_test.go @@ -52,7 +52,7 @@ func TestClientLogging(t *testing.T) { ctx = debug.WithInstance(ctx, "", "") ss := NewStreamServer(cache.New(ctx, nil)) ss.serverForTest = server - ts := servertest.NewPipeServer(ctx, ss) + ts := servertest.NewPipeServer(ctx, ss, nil) defer checkClose(t, ts.Close) cc := ts.Connect(ctx) cc.Go(ctx, protocol.ClientHandler(client, jsonrpc2.MethodNotFound)) @@ -116,12 +116,12 @@ func TestRequestCancellation(t *testing.T) { serveCtx := debug.WithInstance(baseCtx, "", "") ss := NewStreamServer(cache.New(serveCtx, nil)) ss.serverForTest = server - tsDirect := servertest.NewTCPServer(serveCtx, ss) + tsDirect := servertest.NewTCPServer(serveCtx, ss, nil) defer checkClose(t, tsDirect.Close) forwarderCtx := debug.WithInstance(baseCtx, "", "") forwarder := NewForwarder("tcp", tsDirect.Addr) - tsForwarded := servertest.NewPipeServer(forwarderCtx, forwarder) + tsForwarded := servertest.NewPipeServer(forwarderCtx, forwarder, nil) defer checkClose(t, tsForwarded.Close) tests := []struct { @@ -212,10 +212,10 @@ func TestDebugInfoLifecycle(t *testing.T) { cache := cache.New(serverCtx, nil) ss := NewStreamServer(cache) - tsBackend := servertest.NewTCPServer(serverCtx, ss) + tsBackend := servertest.NewTCPServer(serverCtx, ss, nil) forwarder := NewForwarder("tcp", tsBackend.Addr) - tsForwarder := servertest.NewPipeServer(clientCtx, forwarder) + tsForwarder := servertest.NewPipeServer(clientCtx, forwarder, nil) conn1 := tsForwarder.Connect(clientCtx) ed1, err := fake.NewEditor(sb, fake.EditorConfig{}).Connect(clientCtx, conn1, fake.ClientHooks{}) diff --git a/internal/lsp/regtest/runner.go b/internal/lsp/regtest/runner.go index 98e08f3fd1..e078281103 100644 --- a/internal/lsp/regtest/runner.go +++ b/internal/lsp/regtest/runner.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "os" "os/exec" "path/filepath" @@ -185,8 +186,8 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(t *testing.T, e *E r.AddCloser(sandbox) } ss := tc.getServer(ctx, t) - ls := &loggingServer{delegate: ss} - ts := servertest.NewPipeServer(ctx, ls) + ls := &loggingFramer{} + ts := servertest.NewPipeServer(ctx, ss, ls.framer(jsonrpc2.NewRawStream)) defer func() { ts.Close() }() @@ -207,23 +208,23 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(t *testing.T, e *E } } -type loggingServer struct { - delegate jsonrpc2.StreamServer - +type loggingFramer struct { mu sync.Mutex buffers []*bytes.Buffer } -func (s *loggingServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error { - s.mu.Lock() - var buf bytes.Buffer - s.buffers = append(s.buffers, &buf) - s.mu.Unlock() - logStream := protocol.LoggingStream(stream, &buf) - return s.delegate.ServeStream(ctx, logStream) +func (s *loggingFramer) framer(f jsonrpc2.Framer) jsonrpc2.Framer { + return func(nc net.Conn) jsonrpc2.Stream { + s.mu.Lock() + var buf bytes.Buffer + s.buffers = append(s.buffers, &buf) + s.mu.Unlock() + stream := f(nc) + return protocol.LoggingStream(stream, &buf) + } } -func (s *loggingServer) printBuffers(testname string, w io.Writer) { +func (s *loggingFramer) printBuffers(testname string, w io.Writer) { s.mu.Lock() defer s.mu.Unlock() @@ -252,7 +253,7 @@ func (r *Runner) getTestServer() *servertest.TCPServer { ctx := context.Background() ctx = debug.WithInstance(ctx, "", "") ss := lsprpc.NewStreamServer(cache.New(ctx, nil)) - r.ts = servertest.NewTCPServer(context.Background(), ss) + r.ts = servertest.NewTCPServer(context.Background(), ss, nil) } return r.ts }