internal/lsp: change logging stream to be a framer

Change-Id: Id9e17e98ca00f31424068e875851b5f9008c6fe8
Reviewed-on: https://go-review.googlesource.com/c/tools/+/231797
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-04-30 11:05:58 -04:00
parent 9089ff1aee
commit cc40288be8
5 changed files with 40 additions and 31 deletions

View File

@ -26,20 +26,24 @@ type Connector interface {
type TCPServer struct {
Addr string
ln net.Listener
cls *closerList
ln net.Listener
framer jsonrpc2.Framer
cls *closerList
}
// NewTCPServer returns a new test server listening on local tcp port and
// serving incoming jsonrpc2 streams using the provided stream server. It
// panics on any error.
func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer) *TCPServer {
func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer, framer jsonrpc2.Framer) *TCPServer {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(fmt.Sprintf("servertest: failed to listen: %v", err))
}
if framer == nil {
framer = jsonrpc2.NewHeaderStream
}
go jsonrpc2.Serve(ctx, ln, server, 0)
return &TCPServer{Addr: ln.Addr().String(), ln: ln, cls: &closerList{}}
return &TCPServer{Addr: ln.Addr().String(), ln: ln, framer: framer, cls: &closerList{}}
}
// Connect dials the test server and returns a jsonrpc2 Connection that is
@ -52,7 +56,7 @@ func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn {
s.cls.add(func() {
netConn.Close()
})
return jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn))
return jsonrpc2.NewConn(s.framer(netConn))
}
// Close closes all connected pipes.
@ -64,12 +68,16 @@ func (s *TCPServer) Close() error {
// PipeServer is a test server that handles connections over io.Pipes.
type PipeServer struct {
server jsonrpc2.StreamServer
framer jsonrpc2.Framer
cls *closerList
}
// NewPipeServer returns a test server that can be connected to via io.Pipes.
func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer) *PipeServer {
return &PipeServer{server: server, cls: &closerList{}}
func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer, framer jsonrpc2.Framer) *PipeServer {
if framer == nil {
framer = jsonrpc2.NewRawStream
}
return &PipeServer{server: server, framer: framer, cls: &closerList{}}
}
// Connect creates new io.Pipes and binds them to the underlying StreamServer.
@ -79,10 +87,10 @@ func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn {
sPipe.Close()
cPipe.Close()
})
serverStream := jsonrpc2.NewRawStream(sPipe)
serverStream := s.framer(sPipe)
go s.server.ServeStream(ctx, serverStream)
clientStream := jsonrpc2.NewRawStream(cPipe)
clientStream := s.framer(cPipe)
return jsonrpc2.NewConn(clientStream)
}

View File

@ -24,9 +24,9 @@ func TestTestServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server := jsonrpc2.HandlerServer(fakeHandler)
tcpTS := NewTCPServer(ctx, server)
tcpTS := NewTCPServer(ctx, server, nil)
defer tcpTS.Close()
pipeTS := NewPipeServer(ctx, server)
pipeTS := NewPipeServer(ctx, server, nil)
defer pipeTS.Close()
tests := []struct {

View File

@ -68,7 +68,7 @@ func NewTestServer(ctx context.Context, options func(*source.Options)) *serverte
ctx = debug.WithInstance(ctx, "", "")
cache := cache.New(ctx, options)
ss := lsprpc.NewStreamServer(cache)
return servertest.NewTCPServer(ctx, ss)
return servertest.NewTCPServer(ctx, ss, nil)
}
func NewRunner(exporter packagestest.Exporter, data *tests.Data, ctx context.Context, remote string, options func(*source.Options)) *runner {

View File

@ -52,7 +52,7 @@ func TestClientLogging(t *testing.T) {
ctx = debug.WithInstance(ctx, "", "")
ss := NewStreamServer(cache.New(ctx, nil))
ss.serverForTest = server
ts := servertest.NewPipeServer(ctx, ss)
ts := servertest.NewPipeServer(ctx, ss, nil)
defer checkClose(t, ts.Close)
cc := ts.Connect(ctx)
cc.Go(ctx, protocol.ClientHandler(client, jsonrpc2.MethodNotFound))
@ -116,12 +116,12 @@ func TestRequestCancellation(t *testing.T) {
serveCtx := debug.WithInstance(baseCtx, "", "")
ss := NewStreamServer(cache.New(serveCtx, nil))
ss.serverForTest = server
tsDirect := servertest.NewTCPServer(serveCtx, ss)
tsDirect := servertest.NewTCPServer(serveCtx, ss, nil)
defer checkClose(t, tsDirect.Close)
forwarderCtx := debug.WithInstance(baseCtx, "", "")
forwarder := NewForwarder("tcp", tsDirect.Addr)
tsForwarded := servertest.NewPipeServer(forwarderCtx, forwarder)
tsForwarded := servertest.NewPipeServer(forwarderCtx, forwarder, nil)
defer checkClose(t, tsForwarded.Close)
tests := []struct {
@ -212,10 +212,10 @@ func TestDebugInfoLifecycle(t *testing.T) {
cache := cache.New(serverCtx, nil)
ss := NewStreamServer(cache)
tsBackend := servertest.NewTCPServer(serverCtx, ss)
tsBackend := servertest.NewTCPServer(serverCtx, ss, nil)
forwarder := NewForwarder("tcp", tsBackend.Addr)
tsForwarder := servertest.NewPipeServer(clientCtx, forwarder)
tsForwarder := servertest.NewPipeServer(clientCtx, forwarder, nil)
conn1 := tsForwarder.Connect(clientCtx)
ed1, err := fake.NewEditor(sb, fake.EditorConfig{}).Connect(clientCtx, conn1, fake.ClientHooks{})

View File

@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"path/filepath"
@ -185,8 +186,8 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(t *testing.T, e *E
r.AddCloser(sandbox)
}
ss := tc.getServer(ctx, t)
ls := &loggingServer{delegate: ss}
ts := servertest.NewPipeServer(ctx, ls)
ls := &loggingFramer{}
ts := servertest.NewPipeServer(ctx, ss, ls.framer(jsonrpc2.NewRawStream))
defer func() {
ts.Close()
}()
@ -207,23 +208,23 @@ func (r *Runner) Run(t *testing.T, filedata string, test func(t *testing.T, e *E
}
}
type loggingServer struct {
delegate jsonrpc2.StreamServer
type loggingFramer struct {
mu sync.Mutex
buffers []*bytes.Buffer
}
func (s *loggingServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
s.mu.Lock()
var buf bytes.Buffer
s.buffers = append(s.buffers, &buf)
s.mu.Unlock()
logStream := protocol.LoggingStream(stream, &buf)
return s.delegate.ServeStream(ctx, logStream)
func (s *loggingFramer) framer(f jsonrpc2.Framer) jsonrpc2.Framer {
return func(nc net.Conn) jsonrpc2.Stream {
s.mu.Lock()
var buf bytes.Buffer
s.buffers = append(s.buffers, &buf)
s.mu.Unlock()
stream := f(nc)
return protocol.LoggingStream(stream, &buf)
}
}
func (s *loggingServer) printBuffers(testname string, w io.Writer) {
func (s *loggingFramer) printBuffers(testname string, w io.Writer) {
s.mu.Lock()
defer s.mu.Unlock()
@ -252,7 +253,7 @@ func (r *Runner) getTestServer() *servertest.TCPServer {
ctx := context.Background()
ctx = debug.WithInstance(ctx, "", "")
ss := lsprpc.NewStreamServer(cache.New(ctx, nil))
r.ts = servertest.NewTCPServer(context.Background(), ss)
r.ts = servertest.NewTCPServer(context.Background(), ss, nil)
}
return r.ts
}