From b320d3a0f5a29dea7f95f1ca5c4cbef0ac24a304 Mon Sep 17 00:00:00 2001 From: Rob Findley Date: Sun, 9 Feb 2020 13:38:49 -0500 Subject: [PATCH] internal/jsonrpc2/servertest: support both TCP and pipe connection Update the servertest package to support connecting to a jsonrpc2 server using either TCP or io.Pipes. The latter is provided so that regtests can more accurately mimic the current gopls execution mode, where gopls is run as a sidecar and communicated with via a pipe. Updates golang/go#36879 Change-Id: I0e14ed0e628333ba2cc7b088009f1887fcaa82a5 Reviewed-on: https://go-review.googlesource.com/c/tools/+/218777 Run-TryBot: Robert Findley TryBot-Result: Gobot Gobot Reviewed-by: Heschi Kreinick --- gopls/test/gopls_test.go | 2 +- internal/jsonrpc2/servertest/servertest.go | 98 ++++++++++++++++--- .../jsonrpc2/servertest/servertest_test.go | 33 +++++-- internal/lsp/cmd/cmd_test.go | 4 +- internal/lsp/lsprpc/lsprpc_test.go | 8 +- internal/lsp/regtest/env.go | 26 ++--- 6 files changed, 130 insertions(+), 41 deletions(-) diff --git a/gopls/test/gopls_test.go b/gopls/test/gopls_test.go index 1adcf483b4..343ae2174b 100644 --- a/gopls/test/gopls_test.go +++ b/gopls/test/gopls_test.go @@ -43,7 +43,7 @@ func testCommandLine(t *testing.T, exporter packagestest.Exporter) { ctx := tests.Context(t) cache := cache.New(commandLineOptions) ss := lsprpc.NewStreamServer(cache, false) - ts := servertest.NewServer(ctx, ss) + ts := servertest.NewTCPServer(ctx, ss) for _, data := range data { defer data.Exported.Cleanup() t.Run(data.Folder, func(t *testing.T) { diff --git a/internal/jsonrpc2/servertest/servertest.go b/internal/jsonrpc2/servertest/servertest.go index b6ef7dfb9b..818662d500 100644 --- a/internal/jsonrpc2/servertest/servertest.go +++ b/internal/jsonrpc2/servertest/servertest.go @@ -9,46 +9,118 @@ package servertest import ( "context" "fmt" + "io" "net" + "sync" "golang.org/x/tools/internal/jsonrpc2" ) -// Server is a helper for executing tests against a remote jsonrpc2 connection. -// Once initialized, its Addr field may be used to connect a jsonrpc2 client. -type Server struct { +// Connector is the interface used to connect to a server. +type Connector interface { + Connect(context.Context) *jsonrpc2.Conn +} + +// TCPServer is a helper for executing tests against a remote jsonrpc2 +// connection. Once initialized, its Addr field may be used to connect a +// jsonrpc2 client. +type TCPServer struct { Addr string ln net.Listener + cls *closerList } -// NewServer returns a new test server listening on local tcp port and serving -// incoming jsonrpc2 streams using the provided stream server. It panics on any -// error. -func NewServer(ctx context.Context, server jsonrpc2.StreamServer) *Server { +// NewTCPServer returns a new test server listening on local tcp port and +// serving incoming jsonrpc2 streams using the provided stream server. It +// panics on any error. +func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer) *TCPServer { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { panic(fmt.Sprintf("servertest: failed to listen: %v", err)) } go jsonrpc2.Serve(ctx, ln, server) - return &Server{Addr: ln.Addr().String(), ln: ln} + return &TCPServer{Addr: ln.Addr().String(), ln: ln, cls: &closerList{}} } // Connect dials the test server and returns a jsonrpc2 Connection that is // ready for use. -func (s *Server) Connect(ctx context.Context) *jsonrpc2.Conn { +func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn { netConn, err := net.Dial("tcp", s.Addr) if err != nil { panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err)) } + s.cls.add(func() { + netConn.Close() + }) conn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn)) go conn.Run(ctx) return conn } -// Close is a placeholder for proper test server shutdown. -// TODO: implement proper shutdown, which gracefully closes existing -// connections to the test server. -func (s *Server) Close() error { +// Close closes all connected pipes. +func (s *TCPServer) Close() error { + s.cls.closeAll() return nil } + +// PipeServer is a test server that handles connections over io.Pipes. +type PipeServer struct { + server jsonrpc2.StreamServer + cls *closerList +} + +// NewPipeServer returns a test server that can be connected to via io.Pipes. +func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer) *PipeServer { + return &PipeServer{server: server, cls: &closerList{}} +} + +// Connect creates new io.Pipes and binds them to the underlying StreamServer. +func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn { + // Pipes connect like this: + // Client🡒(sWriter)🡒(sReader)🡒Server + // 🡔(cReader)🡐(cWriter)🡗 + sReader, sWriter := io.Pipe() + cReader, cWriter := io.Pipe() + s.cls.add(func() { + sReader.Close() + sWriter.Close() + cReader.Close() + cWriter.Close() + }) + serverStream := jsonrpc2.NewStream(sReader, cWriter) + go s.server.ServeStream(ctx, serverStream) + + clientStream := jsonrpc2.NewStream(cReader, sWriter) + clientConn := jsonrpc2.NewConn(clientStream) + go clientConn.Run(ctx) + return clientConn +} + +// Close closes all connected pipes. +func (s *PipeServer) Close() error { + s.cls.closeAll() + return nil +} + +// closerList tracks closers to run when a testserver is closed. This is a +// convenience, so that callers don't have to worry about closing each +// connection. +type closerList struct { + mu sync.Mutex + closers []func() +} + +func (l *closerList) add(closer func()) { + l.mu.Lock() + defer l.mu.Unlock() + l.closers = append(l.closers, closer) +} + +func (l *closerList) closeAll() { + l.mu.Lock() + defer l.mu.Unlock() + for _, closer := range l.closers { + closer() + } +} diff --git a/internal/jsonrpc2/servertest/servertest_test.go b/internal/jsonrpc2/servertest/servertest_test.go index 578a56ec41..851c0f137f 100644 --- a/internal/jsonrpc2/servertest/servertest_test.go +++ b/internal/jsonrpc2/servertest/servertest_test.go @@ -30,14 +30,31 @@ func (fakeHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered b func TestTestServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - ts := NewServer(ctx, jsonrpc2.HandlerServer(fakeHandler{})) - defer ts.Close() - conn := ts.Connect(ctx) - var got msg - if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil { - t.Fatal(err) + server := jsonrpc2.HandlerServer(fakeHandler{}) + tcpTS := NewTCPServer(ctx, server) + defer tcpTS.Close() + pipeTS := NewPipeServer(ctx, server) + defer pipeTS.Close() + + + tests := []struct { + name string + connector Connector + } { + {"tcp", tcpTS}, + {"pipe", pipeTS}, } - if want := "pong"; got.Msg != want { - t.Errorf("conn.Call(...): returned %q, want %q", got, want) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + conn := test.connector.Connect(ctx) + var got msg + if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil { + t.Fatal(err) + } + if want := "pong"; got.Msg != want { + t.Errorf("conn.Call(...): returned %q, want %q", got, want) + } + }) } } diff --git a/internal/lsp/cmd/cmd_test.go b/internal/lsp/cmd/cmd_test.go index 0e9c97f650..56ebacebc8 100644 --- a/internal/lsp/cmd/cmd_test.go +++ b/internal/lsp/cmd/cmd_test.go @@ -45,10 +45,10 @@ func testCommandLine(t *testing.T, exporter packagestest.Exporter) { } } -func testServer(ctx context.Context) *servertest.Server { +func testServer(ctx context.Context) *servertest.TCPServer { cache := cache.New(nil) ss := lsprpc.NewStreamServer(cache, false) - return servertest.NewServer(ctx, ss) + return servertest.NewTCPServer(ctx, ss) } func TestDefinitionHelpExample(t *testing.T) { diff --git a/internal/lsp/lsprpc/lsprpc_test.go b/internal/lsp/lsprpc/lsprpc_test.go index b7c20fe476..4ecf974805 100644 --- a/internal/lsp/lsprpc/lsprpc_test.go +++ b/internal/lsp/lsprpc/lsprpc_test.go @@ -46,7 +46,7 @@ func TestClientLogging(t *testing.T) { return server }, } - ts := servertest.NewServer(ctx, ss) + ts := servertest.NewPipeServer(ctx, ss) cc := ts.Connect(ctx) cc.AddHandler(protocol.ClientHandler(client)) @@ -100,14 +100,14 @@ func TestRequestCancellation(t *testing.T) { }, } ctx := context.Background() - tsDirect := servertest.NewServer(ctx, ss) + tsDirect := servertest.NewTCPServer(ctx, ss) forwarder := NewForwarder(tsDirect.Addr, false) - tsForwarded := servertest.NewServer(ctx, forwarder) + tsForwarded := servertest.NewPipeServer(ctx, forwarder) tests := []struct { serverType string - ts *servertest.Server + ts servertest.Connector }{ {"direct", tsDirect}, {"forwarder", tsForwarded}, diff --git a/internal/lsp/regtest/env.go b/internal/lsp/regtest/env.go index 8bc100f3b1..61eca0b45c 100644 --- a/internal/lsp/regtest/env.go +++ b/internal/lsp/regtest/env.go @@ -40,7 +40,7 @@ const ( // remote), any tests that execute on the same Runner will share the same // state. type Runner struct { - ts *servertest.Server + ts *servertest.TCPServer modes EnvMode timeout time.Duration } @@ -49,7 +49,7 @@ type Runner struct { // run tests. func NewTestRunner(modes EnvMode, testTimeout time.Duration) *Runner { ss := lsprpc.NewStreamServer(cache.New(nil), false) - ts := servertest.NewServer(context.Background(), ss) + ts := servertest.NewTCPServer(context.Background(), ss) return &Runner{ ts: ts, modes: modes, @@ -69,9 +69,9 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, * t.Helper() tests := []struct { - name string - mode EnvMode - makeServer func(context.Context, *testing.T) (*servertest.Server, func()) + name string + mode EnvMode + getConnector func(context.Context, *testing.T) (servertest.Connector, func()) }{ {"singleton", Singleton, r.singletonEnv}, {"shared", Shared, r.sharedEnv}, @@ -92,7 +92,7 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, * t.Fatal(err) } defer ws.Close() - ts, cleanup := tc.makeServer(ctx, t) + ts, cleanup := tc.getConnector(ctx, t) defer cleanup() env := NewEnv(ctx, t, ws, ts) test(ctx, t, env) @@ -100,22 +100,22 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, * } } -func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) { +func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) { ss := lsprpc.NewStreamServer(cache.New(nil), false) - ts := servertest.NewServer(ctx, ss) + ts := servertest.NewPipeServer(ctx, ss) cleanup := func() { ts.Close() } return ts, cleanup } -func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) { +func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) { return r.ts, func() {} } -func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) { +func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) { forwarder := lsprpc.NewForwarder(r.ts.Addr, false) - ts2 := servertest.NewServer(ctx, forwarder) + ts2 := servertest.NewTCPServer(ctx, forwarder) cleanup := func() { ts2.Close() } @@ -134,7 +134,7 @@ type Env struct { // but they are available if needed. W *fake.Workspace E *fake.Editor - Server *servertest.Server + Server servertest.Connector // mu guards the fields below, for the purpose of checking conditions on // every change to diagnostics. @@ -154,7 +154,7 @@ type diagnosticCondition struct { // NewEnv creates a new test environment using the given workspace and gopls // server. -func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts *servertest.Server) *Env { +func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts servertest.Connector) *Env { t.Helper() conn := ts.Connect(ctx) editor, err := fake.NewConnectedEditor(ctx, ws, conn)