internal/jsonrpc2/servertest: support both TCP and pipe connection

Update the servertest package to support connecting to a jsonrpc2 server
using either TCP or io.Pipes. The latter is provided so that regtests
can more accurately mimic the current gopls execution mode, where gopls
is run as a sidecar and communicated with via a pipe.

Updates golang/go#36879

Change-Id: I0e14ed0e628333ba2cc7b088009f1887fcaa82a5
Reviewed-on: https://go-review.googlesource.com/c/tools/+/218777
Run-TryBot: Robert Findley <rfindley@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
This commit is contained in:
Rob Findley 2020-02-09 13:38:49 -05:00 committed by Robert Findley
parent 5916a50871
commit b320d3a0f5
6 changed files with 130 additions and 41 deletions

View File

@ -43,7 +43,7 @@ func testCommandLine(t *testing.T, exporter packagestest.Exporter) {
ctx := tests.Context(t)
cache := cache.New(commandLineOptions)
ss := lsprpc.NewStreamServer(cache, false)
ts := servertest.NewServer(ctx, ss)
ts := servertest.NewTCPServer(ctx, ss)
for _, data := range data {
defer data.Exported.Cleanup()
t.Run(data.Folder, func(t *testing.T) {

View File

@ -9,46 +9,118 @@ package servertest
import (
"context"
"fmt"
"io"
"net"
"sync"
"golang.org/x/tools/internal/jsonrpc2"
)
// Server is a helper for executing tests against a remote jsonrpc2 connection.
// Once initialized, its Addr field may be used to connect a jsonrpc2 client.
type Server struct {
// Connector is the interface used to connect to a server.
type Connector interface {
Connect(context.Context) *jsonrpc2.Conn
}
// TCPServer is a helper for executing tests against a remote jsonrpc2
// connection. Once initialized, its Addr field may be used to connect a
// jsonrpc2 client.
type TCPServer struct {
Addr string
ln net.Listener
cls *closerList
}
// NewServer returns a new test server listening on local tcp port and serving
// incoming jsonrpc2 streams using the provided stream server. It panics on any
// error.
func NewServer(ctx context.Context, server jsonrpc2.StreamServer) *Server {
// NewTCPServer returns a new test server listening on local tcp port and
// serving incoming jsonrpc2 streams using the provided stream server. It
// panics on any error.
func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer) *TCPServer {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(fmt.Sprintf("servertest: failed to listen: %v", err))
}
go jsonrpc2.Serve(ctx, ln, server)
return &Server{Addr: ln.Addr().String(), ln: ln}
return &TCPServer{Addr: ln.Addr().String(), ln: ln, cls: &closerList{}}
}
// Connect dials the test server and returns a jsonrpc2 Connection that is
// ready for use.
func (s *Server) Connect(ctx context.Context) *jsonrpc2.Conn {
func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn {
netConn, err := net.Dial("tcp", s.Addr)
if err != nil {
panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err))
}
s.cls.add(func() {
netConn.Close()
})
conn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn))
go conn.Run(ctx)
return conn
}
// Close is a placeholder for proper test server shutdown.
// TODO: implement proper shutdown, which gracefully closes existing
// connections to the test server.
func (s *Server) Close() error {
// Close closes all connected pipes.
func (s *TCPServer) Close() error {
s.cls.closeAll()
return nil
}
// PipeServer is a test server that handles connections over io.Pipes.
type PipeServer struct {
server jsonrpc2.StreamServer
cls *closerList
}
// NewPipeServer returns a test server that can be connected to via io.Pipes.
func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer) *PipeServer {
return &PipeServer{server: server, cls: &closerList{}}
}
// Connect creates new io.Pipes and binds them to the underlying StreamServer.
func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn {
// Pipes connect like this:
// Client🡒(sWriter)🡒(sReader)🡒Server
// 🡔(cReader)🡐(cWriter)🡗
sReader, sWriter := io.Pipe()
cReader, cWriter := io.Pipe()
s.cls.add(func() {
sReader.Close()
sWriter.Close()
cReader.Close()
cWriter.Close()
})
serverStream := jsonrpc2.NewStream(sReader, cWriter)
go s.server.ServeStream(ctx, serverStream)
clientStream := jsonrpc2.NewStream(cReader, sWriter)
clientConn := jsonrpc2.NewConn(clientStream)
go clientConn.Run(ctx)
return clientConn
}
// Close closes all connected pipes.
func (s *PipeServer) Close() error {
s.cls.closeAll()
return nil
}
// closerList tracks closers to run when a testserver is closed. This is a
// convenience, so that callers don't have to worry about closing each
// connection.
type closerList struct {
mu sync.Mutex
closers []func()
}
func (l *closerList) add(closer func()) {
l.mu.Lock()
defer l.mu.Unlock()
l.closers = append(l.closers, closer)
}
func (l *closerList) closeAll() {
l.mu.Lock()
defer l.mu.Unlock()
for _, closer := range l.closers {
closer()
}
}

View File

@ -30,14 +30,31 @@ func (fakeHandler) Deliver(ctx context.Context, r *jsonrpc2.Request, delivered b
func TestTestServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ts := NewServer(ctx, jsonrpc2.HandlerServer(fakeHandler{}))
defer ts.Close()
conn := ts.Connect(ctx)
var got msg
if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil {
t.Fatal(err)
server := jsonrpc2.HandlerServer(fakeHandler{})
tcpTS := NewTCPServer(ctx, server)
defer tcpTS.Close()
pipeTS := NewPipeServer(ctx, server)
defer pipeTS.Close()
tests := []struct {
name string
connector Connector
} {
{"tcp", tcpTS},
{"pipe", pipeTS},
}
if want := "pong"; got.Msg != want {
t.Errorf("conn.Call(...): returned %q, want %q", got, want)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
conn := test.connector.Connect(ctx)
var got msg
if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil {
t.Fatal(err)
}
if want := "pong"; got.Msg != want {
t.Errorf("conn.Call(...): returned %q, want %q", got, want)
}
})
}
}

View File

@ -45,10 +45,10 @@ func testCommandLine(t *testing.T, exporter packagestest.Exporter) {
}
}
func testServer(ctx context.Context) *servertest.Server {
func testServer(ctx context.Context) *servertest.TCPServer {
cache := cache.New(nil)
ss := lsprpc.NewStreamServer(cache, false)
return servertest.NewServer(ctx, ss)
return servertest.NewTCPServer(ctx, ss)
}
func TestDefinitionHelpExample(t *testing.T) {

View File

@ -46,7 +46,7 @@ func TestClientLogging(t *testing.T) {
return server
},
}
ts := servertest.NewServer(ctx, ss)
ts := servertest.NewPipeServer(ctx, ss)
cc := ts.Connect(ctx)
cc.AddHandler(protocol.ClientHandler(client))
@ -100,14 +100,14 @@ func TestRequestCancellation(t *testing.T) {
},
}
ctx := context.Background()
tsDirect := servertest.NewServer(ctx, ss)
tsDirect := servertest.NewTCPServer(ctx, ss)
forwarder := NewForwarder(tsDirect.Addr, false)
tsForwarded := servertest.NewServer(ctx, forwarder)
tsForwarded := servertest.NewPipeServer(ctx, forwarder)
tests := []struct {
serverType string
ts *servertest.Server
ts servertest.Connector
}{
{"direct", tsDirect},
{"forwarder", tsForwarded},

View File

@ -40,7 +40,7 @@ const (
// remote), any tests that execute on the same Runner will share the same
// state.
type Runner struct {
ts *servertest.Server
ts *servertest.TCPServer
modes EnvMode
timeout time.Duration
}
@ -49,7 +49,7 @@ type Runner struct {
// run tests.
func NewTestRunner(modes EnvMode, testTimeout time.Duration) *Runner {
ss := lsprpc.NewStreamServer(cache.New(nil), false)
ts := servertest.NewServer(context.Background(), ss)
ts := servertest.NewTCPServer(context.Background(), ss)
return &Runner{
ts: ts,
modes: modes,
@ -69,9 +69,9 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, *
t.Helper()
tests := []struct {
name string
mode EnvMode
makeServer func(context.Context, *testing.T) (*servertest.Server, func())
name string
mode EnvMode
getConnector func(context.Context, *testing.T) (servertest.Connector, func())
}{
{"singleton", Singleton, r.singletonEnv},
{"shared", Shared, r.sharedEnv},
@ -92,7 +92,7 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, *
t.Fatal(err)
}
defer ws.Close()
ts, cleanup := tc.makeServer(ctx, t)
ts, cleanup := tc.getConnector(ctx, t)
defer cleanup()
env := NewEnv(ctx, t, ws, ts)
test(ctx, t, env)
@ -100,22 +100,22 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(context.Context, *
}
}
func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
ss := lsprpc.NewStreamServer(cache.New(nil), false)
ts := servertest.NewServer(ctx, ss)
ts := servertest.NewPipeServer(ctx, ss)
cleanup := func() {
ts.Close()
}
return ts, cleanup
}
func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
return r.ts, func() {}
}
func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
forwarder := lsprpc.NewForwarder(r.ts.Addr, false)
ts2 := servertest.NewServer(ctx, forwarder)
ts2 := servertest.NewTCPServer(ctx, forwarder)
cleanup := func() {
ts2.Close()
}
@ -134,7 +134,7 @@ type Env struct {
// but they are available if needed.
W *fake.Workspace
E *fake.Editor
Server *servertest.Server
Server servertest.Connector
// mu guards the fields below, for the purpose of checking conditions on
// every change to diagnostics.
@ -154,7 +154,7 @@ type diagnosticCondition struct {
// NewEnv creates a new test environment using the given workspace and gopls
// server.
func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts *servertest.Server) *Env {
func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts servertest.Connector) *Env {
t.Helper()
conn := ts.Connect(ctx)
editor, err := fake.NewConnectedEditor(ctx, ws, conn)