From d3396bb197417d37a2def9208c3c0bf371d603f6 Mon Sep 17 00:00:00 2001 From: Ian Cottrell Date: Wed, 29 Apr 2020 13:17:22 -0400 Subject: [PATCH] internal/jsonrpc2: change jsonrpc2.Conn to be an interface This will allow varying implementations and wrappers, and more closely matches the concepts used in the net library. Change-Id: I4be4c6efb3def0eda2693f482cbb0c6f776e5642 Reviewed-on: https://go-review.googlesource.com/c/tools/+/232877 Run-TryBot: Ian Cottrell TryBot-Result: Gobot Gobot Reviewed-by: Robert Findley --- internal/jsonrpc2/conn.go | 262 +++++++++++++++++++++ internal/jsonrpc2/jsonrpc2.go | 248 ------------------- internal/jsonrpc2/jsonrpc2_test.go | 4 +- internal/jsonrpc2/messages.go | 8 + internal/jsonrpc2/servertest/servertest.go | 6 +- internal/lsp/fake/editor.go | 2 +- internal/lsp/protocol/protocol.go | 8 +- internal/lsp/protocol/tsclient.go | 2 +- internal/lsp/protocol/tsserver.go | 2 +- internal/lsp/protocol/typescript/code.ts | 2 +- internal/lsp/regtest/env.go | 2 +- 11 files changed, 284 insertions(+), 262 deletions(-) create mode 100644 internal/jsonrpc2/conn.go diff --git a/internal/jsonrpc2/conn.go b/internal/jsonrpc2/conn.go new file mode 100644 index 0000000000..ca7752d664 --- /dev/null +++ b/internal/jsonrpc2/conn.go @@ -0,0 +1,262 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + + "golang.org/x/tools/internal/event" + "golang.org/x/tools/internal/event/label" + "golang.org/x/tools/internal/lsp/debug/tag" +) + +// Conn is the common interface to jsonrpc clients and servers. +// Conn is bidirectional; it does not have a designated server or client end. +// It manages the jsonrpc2 protocol, connecting responses back to their calls. +type Conn interface { + // Call invokes the target method and waits for a response. + // The params will be marshaled to JSON before sending over the wire, and will + // be handed to the method invoked. + // The response will be unmarshaled from JSON into the result. + // The id returned will be unique from this connection, and can be used for + // logging or tracking. + Call(ctx context.Context, method string, params, result interface{}) (ID, error) + + // Notify invokes the target method but does not wait for a response. + // The params will be marshaled to JSON before sending over the wire, and will + // be handed to the method invoked. + Notify(ctx context.Context, method string, params interface{}) error + + // Go starts a goroutine to handle the connection. + // It must be called exactly once for each Conn. + // It returns immediately. + // You must block on Done() to wait for the connection to shut down. + // This is a temporary measure, this should be started automatically in the + // future. + Go(ctx context.Context, handler Handler) + + // Close closes the connection and it's underlying stream. + // It does not wait for the close to complete, use the Done() channel for + // that. + Close() error + + // Done returns a channel that will be closed when the processing goroutine + // has terminated, which will happen if Close() is called or an underlying + // stream is closed. + Done() <-chan struct{} + + // Err returns an error if there was one from within the processing goroutine. + // If err returns non nil, the connection will be already closed or closing. + Err() error +} + +type conn struct { + seq int64 // must only be accessed using atomic operations + writeMu sync.Mutex // protects writes to the stream + stream Stream + pendingMu sync.Mutex // protects the pending map + pending map[ID]chan *Response + + done chan struct{} + err atomic.Value +} + +// NewConn creates a new connection object around the supplied stream. +func NewConn(s Stream) Conn { + conn := &conn{ + stream: s, + pending: make(map[ID]chan *Response), + done: make(chan struct{}), + } + return conn +} + +func (c *conn) Notify(ctx context.Context, method string, params interface{}) (err error) { + notify, err := NewNotification(method, params) + if err != nil { + return fmt.Errorf("marshaling notify parameters: %v", err) + } + ctx, done := event.Start(ctx, method, + tag.Method.Of(method), + tag.RPCDirection.Of(tag.Outbound), + ) + defer func() { + recordStatus(ctx, err) + done() + }() + + event.Metric(ctx, tag.Started.Of(1)) + n, err := c.write(ctx, notify) + event.Metric(ctx, tag.SentBytes.Of(n)) + return err +} + +func (c *conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) { + // generate a new request identifier + id := ID{number: atomic.AddInt64(&c.seq, 1)} + call, err := NewCall(id, method, params) + if err != nil { + return id, fmt.Errorf("marshaling call parameters: %v", err) + } + ctx, done := event.Start(ctx, method, + tag.Method.Of(method), + tag.RPCDirection.Of(tag.Outbound), + tag.RPCID.Of(fmt.Sprintf("%q", id)), + ) + defer func() { + recordStatus(ctx, err) + done() + }() + event.Metric(ctx, tag.Started.Of(1)) + // We have to add ourselves to the pending map before we send, otherwise we + // are racing the response. Also add a buffer to rchan, so that if we get a + // wire response between the time this call is cancelled and id is deleted + // from c.pending, the send to rchan will not block. + rchan := make(chan *Response, 1) + c.pendingMu.Lock() + c.pending[id] = rchan + c.pendingMu.Unlock() + defer func() { + c.pendingMu.Lock() + delete(c.pending, id) + c.pendingMu.Unlock() + }() + // now we are ready to send + n, err := c.write(ctx, call) + event.Metric(ctx, tag.SentBytes.Of(n)) + if err != nil { + // sending failed, we will never get a response, so don't leave it pending + return id, err + } + // now wait for the response + select { + case response := <-rchan: + // is it an error response? + if response.err != nil { + return id, response.err + } + if result == nil || len(response.result) == 0 { + return id, nil + } + if err := json.Unmarshal(response.result, result); err != nil { + return id, fmt.Errorf("unmarshaling result: %v", err) + } + return id, nil + case <-ctx.Done(): + return id, ctx.Err() + } +} + +func (c *conn) replier(req Request, spanDone func()) Replier { + return func(ctx context.Context, result interface{}, err error) error { + defer func() { + recordStatus(ctx, err) + spanDone() + }() + call, ok := req.(*Call) + if !ok { + // request was a notify, no need to respond + return nil + } + response, err := NewResponse(call.id, result, err) + if err != nil { + return err + } + n, err := c.write(ctx, response) + event.Metric(ctx, tag.SentBytes.Of(n)) + if err != nil { + // TODO(iancottrell): if a stream write fails, we really need to shut down + // the whole stream + return err + } + return nil + } +} + +func (c *conn) write(ctx context.Context, msg Message) (int64, error) { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.stream.Write(ctx, msg) +} + +func (c *conn) Go(ctx context.Context, handler Handler) { + go c.run(ctx, handler) +} + +func (c *conn) run(ctx context.Context, handler Handler) { + defer close(c.done) + for { + // get the next message + msg, n, err := c.stream.Read(ctx) + if err != nil { + // The stream failed, we cannot continue. + c.fail(err) + return + } + switch msg := msg.(type) { + case Request: + labels := []label.Label{ + tag.Method.Of(msg.Method()), + tag.RPCDirection.Of(tag.Inbound), + {}, // reserved for ID if present + } + if call, ok := msg.(*Call); ok { + labels[len(labels)-1] = tag.RPCID.Of(fmt.Sprintf("%q", call.ID())) + } else { + labels = labels[:len(labels)-1] + } + reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...) + event.Metric(reqCtx, + tag.Started.Of(1), + tag.ReceivedBytes.Of(n)) + if err := handler(reqCtx, c.replier(msg, spanDone), msg); err != nil { + // delivery failed, not much we can do + event.Error(reqCtx, "jsonrpc2 message delivery failed", err) + } + case *Response: + // If method is not set, this should be a response, in which case we must + // have an id to send the response back to the caller. + c.pendingMu.Lock() + rchan, ok := c.pending[msg.id] + c.pendingMu.Unlock() + if ok { + rchan <- msg + } + } + } +} + +func (c *conn) Close() error { + return c.stream.Close() +} + +func (c *conn) Done() <-chan struct{} { + return c.done +} + +func (c *conn) Err() error { + if err := c.err.Load(); err != nil { + return err.(error) + } + return nil +} + +// fail sets a failure condition on the stream and closes it. +func (c *conn) fail(err error) { + c.err.Store(err) + c.stream.Close() +} + +func recordStatus(ctx context.Context, err error) { + if err != nil { + event.Label(ctx, tag.StatusCode.Of("ERROR")) + } else { + event.Label(ctx, tag.StatusCode.Of("OK")) + } +} diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index 840b6ca240..5a52995014 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -7,259 +7,11 @@ // It is intended to be compatible with other implementations at the wire level. package jsonrpc2 -import ( - "context" - "encoding/json" - "fmt" - "sync" - "sync/atomic" - - "golang.org/x/tools/internal/event" - "golang.org/x/tools/internal/event/label" - "golang.org/x/tools/internal/lsp/debug/tag" -) - const ( // ErrIdleTimeout is returned when serving timed out waiting for new connections. ErrIdleTimeout = constError("timed out waiting for new connections") ) -// Conn is a JSON RPC 2 client server connection. -// Conn is bidirectional; it does not have a designated server or client end. -type Conn struct { - seq int64 // must only be accessed using atomic operations - writeMu sync.Mutex // protects writes to the stream - stream Stream - pendingMu sync.Mutex // protects the pending map - pending map[ID]chan *Response - - done chan struct{} - err atomic.Value -} - type constError string func (e constError) Error() string { return string(e) } - -// NewConn creates a new connection object around the supplied stream. -// You must call Run for the connection to be active. -func NewConn(s Stream) *Conn { - conn := &Conn{ - stream: s, - pending: make(map[ID]chan *Response), - done: make(chan struct{}), - } - return conn -} - -// Notify is called to send a notification request over the connection. -// It will return as soon as the notification has been sent, as no response is -// possible. -func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) { - notify, err := NewNotification(method, params) - if err != nil { - return fmt.Errorf("marshaling notify parameters: %v", err) - } - ctx, done := event.Start(ctx, method, - tag.Method.Of(method), - tag.RPCDirection.Of(tag.Outbound), - ) - defer func() { - recordStatus(ctx, err) - done() - }() - - event.Metric(ctx, tag.Started.Of(1)) - n, err := c.write(ctx, notify) - event.Metric(ctx, tag.SentBytes.Of(n)) - return err -} - -// Call sends a request over the connection and then waits for a response. -// If the response is not an error, it will be decoded into result. -// result must be of a type you an pass to json.Unmarshal. -func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) { - // generate a new request identifier - id := ID{number: atomic.AddInt64(&c.seq, 1)} - call, err := NewCall(id, method, params) - if err != nil { - return id, fmt.Errorf("marshaling call parameters: %v", err) - } - ctx, done := event.Start(ctx, method, - tag.Method.Of(method), - tag.RPCDirection.Of(tag.Outbound), - tag.RPCID.Of(fmt.Sprintf("%q", id)), - ) - defer func() { - recordStatus(ctx, err) - done() - }() - event.Metric(ctx, tag.Started.Of(1)) - // We have to add ourselves to the pending map before we send, otherwise we - // are racing the response. Also add a buffer to rchan, so that if we get a - // wire response between the time this call is cancelled and id is deleted - // from c.pending, the send to rchan will not block. - rchan := make(chan *Response, 1) - c.pendingMu.Lock() - c.pending[id] = rchan - c.pendingMu.Unlock() - defer func() { - c.pendingMu.Lock() - delete(c.pending, id) - c.pendingMu.Unlock() - }() - // now we are ready to send - n, err := c.write(ctx, call) - event.Metric(ctx, tag.SentBytes.Of(n)) - if err != nil { - // sending failed, we will never get a response, so don't leave it pending - return id, err - } - // now wait for the response - select { - case response := <-rchan: - // is it an error response? - if response.err != nil { - return id, response.err - } - if result == nil || len(response.result) == 0 { - return id, nil - } - if err := json.Unmarshal(response.result, result); err != nil { - return id, fmt.Errorf("unmarshaling result: %v", err) - } - return id, nil - case <-ctx.Done(): - return id, ctx.Err() - } -} - -func replier(conn *Conn, req Request, spanDone func()) Replier { - return func(ctx context.Context, result interface{}, err error) error { - defer func() { - recordStatus(ctx, err) - spanDone() - }() - call, ok := req.(*Call) - if !ok { - // request was a notify, no need to respond - return nil - } - response, err := NewResponse(call.id, result, err) - if err != nil { - return err - } - n, err := conn.write(ctx, response) - event.Metric(ctx, tag.SentBytes.Of(n)) - if err != nil { - // TODO(iancottrell): if a stream write fails, we really need to shut down - // the whole stream - return err - } - return nil - } -} - -func (c *Conn) write(ctx context.Context, msg Message) (int64, error) { - c.writeMu.Lock() - defer c.writeMu.Unlock() - return c.stream.Write(ctx, msg) -} - -// Go starts a goroutine to handle the connection. -// It must be called exactly once for each Conn. -// It returns immediately. -// You must block on Done() to wait for the connection to shut down. -// This is a temporary measure, this should be started automatically in the -// future. -func (c *Conn) Go(ctx context.Context, handler Handler) { - go c.run(ctx, handler) -} - -func (c *Conn) run(ctx context.Context, handler Handler) { - defer close(c.done) - for { - // get the next message - msg, n, err := c.stream.Read(ctx) - if err != nil { - // The stream failed, we cannot continue. - c.fail(err) - return - } - switch msg := msg.(type) { - case Request: - labels := []label.Label{ - tag.Method.Of(msg.Method()), - tag.RPCDirection.Of(tag.Inbound), - {}, // reserved for ID if present - } - if call, ok := msg.(*Call); ok { - labels[len(labels)-1] = tag.RPCID.Of(fmt.Sprintf("%q", call.ID())) - } else { - labels = labels[:len(labels)-1] - } - reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...) - event.Metric(reqCtx, - tag.Started.Of(1), - tag.ReceivedBytes.Of(n)) - if err := handler(reqCtx, replier(c, msg, spanDone), msg); err != nil { - // delivery failed, not much we can do - event.Error(reqCtx, "jsonrpc2 message delivery failed", err) - } - case *Response: - // If method is not set, this should be a response, in which case we must - // have an id to send the response back to the caller. - c.pendingMu.Lock() - rchan, ok := c.pending[msg.id] - c.pendingMu.Unlock() - if ok { - rchan <- msg - } - } - } -} - -// Close closes the underlying stream. -// This does not wait for the underlying handler to finish, block on the done -// channel with <-Done() for that purpose. -func (c *Conn) Close() error { - return c.stream.Close() -} - -// Done returns a channel that will be closed when the processing goroutine has -// terminated, which will happen if Close() is called or the underlying -// stream is closed. -func (c *Conn) Done() <-chan struct{} { - return c.done -} - -// Err returns an error if there was one from within the processing goroutine. -// If err returns non nil, the connection will be already closed or closing. -func (c *Conn) Err() error { - if err := c.err.Load(); err != nil { - return err.(error) - } - return nil -} - -// fail sets a failure condition on the stream and closes it. -func (c *Conn) fail(err error) { - c.err.Store(err) - c.stream.Close() -} - -func marshalToRaw(obj interface{}) (json.RawMessage, error) { - data, err := json.Marshal(obj) - if err != nil { - return json.RawMessage{}, err - } - return json.RawMessage(data), nil -} - -func recordStatus(ctx context.Context, err error) { - if err != nil { - event.Label(ctx, tag.StatusCode.Of("ERROR")) - } else { - event.Label(ctx, tag.StatusCode.Of("OK")) - } -} diff --git a/internal/jsonrpc2/jsonrpc2_test.go b/internal/jsonrpc2/jsonrpc2_test.go index 8443a9e8d5..f62977edfc 100644 --- a/internal/jsonrpc2/jsonrpc2_test.go +++ b/internal/jsonrpc2/jsonrpc2_test.go @@ -90,7 +90,7 @@ func TestCall(t *testing.T) { } } -func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Conn, *jsonrpc2.Conn, func()) { +func prepare(ctx context.Context, t *testing.T, withHeaders bool) (jsonrpc2.Conn, jsonrpc2.Conn, func()) { // make a wait group that can be used to wait for the system to shut down aPipe, bPipe := net.Pipe() a := run(ctx, withHeaders, aPipe) @@ -103,7 +103,7 @@ func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Con } } -func run(ctx context.Context, withHeaders bool, nc net.Conn) *jsonrpc2.Conn { +func run(ctx context.Context, withHeaders bool, nc net.Conn) jsonrpc2.Conn { var stream jsonrpc2.Stream if withHeaders { stream = jsonrpc2.NewHeaderStream(nc) diff --git a/internal/jsonrpc2/messages.go b/internal/jsonrpc2/messages.go index 45efc53294..ff2bd47a32 100644 --- a/internal/jsonrpc2/messages.go +++ b/internal/jsonrpc2/messages.go @@ -225,3 +225,11 @@ func DecodeMessage(data []byte) (Message, error) { } return call, nil } + +func marshalToRaw(obj interface{}) (json.RawMessage, error) { + data, err := json.Marshal(obj) + if err != nil { + return json.RawMessage{}, err + } + return json.RawMessage(data), nil +} diff --git a/internal/jsonrpc2/servertest/servertest.go b/internal/jsonrpc2/servertest/servertest.go index a3c5589560..9c93f6ff21 100644 --- a/internal/jsonrpc2/servertest/servertest.go +++ b/internal/jsonrpc2/servertest/servertest.go @@ -17,7 +17,7 @@ import ( // Connector is the interface used to connect to a server. type Connector interface { - Connect(context.Context) *jsonrpc2.Conn + Connect(context.Context) jsonrpc2.Conn } // TCPServer is a helper for executing tests against a remote jsonrpc2 @@ -48,7 +48,7 @@ func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer, framer json // Connect dials the test server and returns a jsonrpc2 Connection that is // ready for use. -func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn { +func (s *TCPServer) Connect(ctx context.Context) jsonrpc2.Conn { netConn, err := net.Dial("tcp", s.Addr) if err != nil { panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err)) @@ -81,7 +81,7 @@ func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer, framer jso } // Connect creates new io.Pipes and binds them to the underlying StreamServer. -func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn { +func (s *PipeServer) Connect(ctx context.Context) jsonrpc2.Conn { sPipe, cPipe := net.Pipe() s.cls.add(func() { sPipe.Close() diff --git a/internal/lsp/fake/editor.go b/internal/lsp/fake/editor.go index 3b051c2786..feb47e463c 100644 --- a/internal/lsp/fake/editor.go +++ b/internal/lsp/fake/editor.go @@ -80,7 +80,7 @@ func NewEditor(ws *Sandbox, config EditorConfig) *Editor { // // It returns the editor, so that it may be called as follows: // editor, err := NewEditor(s).Connect(ctx, conn) -func (e *Editor) Connect(ctx context.Context, conn *jsonrpc2.Conn, hooks ClientHooks) (*Editor, error) { +func (e *Editor) Connect(ctx context.Context, conn jsonrpc2.Conn, hooks ClientHooks) (*Editor, error) { e.Server = protocol.ServerDispatcher(conn) e.client = &Client{editor: e, hooks: hooks} conn.Go(ctx, diff --git a/internal/lsp/protocol/protocol.go b/internal/lsp/protocol/protocol.go index 969ab7e50a..b9cddbf235 100644 --- a/internal/lsp/protocol/protocol.go +++ b/internal/lsp/protocol/protocol.go @@ -21,13 +21,13 @@ var ( // ClientDispatcher returns a Client that dispatches LSP requests across the // given jsonrpc2 connection. -func ClientDispatcher(conn *jsonrpc2.Conn) Client { +func ClientDispatcher(conn jsonrpc2.Conn) Client { return &clientDispatcher{Conn: conn} } // ServerDispatcher returns a Server that dispatches LSP requests across the // given jsonrpc2 connection. -func ServerDispatcher(conn *jsonrpc2.Conn) Server { +func ServerDispatcher(conn jsonrpc2.Conn) Server { return &serverDispatcher{Conn: conn} } @@ -72,7 +72,7 @@ func CancelHandler(handler jsonrpc2.Handler) jsonrpc2.Handler { } } -func Call(ctx context.Context, conn *jsonrpc2.Conn, method string, params interface{}, result interface{}) error { +func Call(ctx context.Context, conn jsonrpc2.Conn, method string, params interface{}, result interface{}) error { id, err := conn.Call(ctx, method, params, result) if ctx.Err() != nil { cancelCall(ctx, conn, id) @@ -80,7 +80,7 @@ func Call(ctx context.Context, conn *jsonrpc2.Conn, method string, params interf return err } -func cancelCall(ctx context.Context, conn *jsonrpc2.Conn, id jsonrpc2.ID) { +func cancelCall(ctx context.Context, conn jsonrpc2.Conn, id jsonrpc2.ID) { ctx = xcontext.Detach(ctx) ctx, done := event.Start(ctx, "protocol.canceller") defer done() diff --git a/internal/lsp/protocol/tsclient.go b/internal/lsp/protocol/tsclient.go index fbb7cfc39b..0876c9e171 100644 --- a/internal/lsp/protocol/tsclient.go +++ b/internal/lsp/protocol/tsclient.go @@ -129,7 +129,7 @@ func ClientHandler(client Client, handler jsonrpc2.Handler) jsonrpc2.Handler { } type clientDispatcher struct { - *jsonrpc2.Conn + jsonrpc2.Conn } func (s *clientDispatcher) ShowMessage(ctx context.Context, params *ShowMessageParams) error { diff --git a/internal/lsp/protocol/tsserver.go b/internal/lsp/protocol/tsserver.go index abbd813710..c0e1cded2e 100644 --- a/internal/lsp/protocol/tsserver.go +++ b/internal/lsp/protocol/tsserver.go @@ -427,7 +427,7 @@ func ServerHandler(server Server, handler jsonrpc2.Handler) jsonrpc2.Handler { } type serverDispatcher struct { - *jsonrpc2.Conn + jsonrpc2.Conn } func (s *serverDispatcher) DidChangeWorkspaceFolders(ctx context.Context, params *DidChangeWorkspaceFoldersParams) error { diff --git a/internal/lsp/protocol/typescript/code.ts b/internal/lsp/protocol/typescript/code.ts index 97dd7f028e..db7dec4fac 100644 --- a/internal/lsp/protocol/typescript/code.ts +++ b/internal/lsp/protocol/typescript/code.ts @@ -1096,7 +1096,7 @@ function output(side: side) { }`); f(` type ${side.name}Dispatcher struct { - *jsonrpc2.Conn + jsonrpc2.Conn } `); side.calls.forEach((v) => {f(v)}); diff --git a/internal/lsp/regtest/env.go b/internal/lsp/regtest/env.go index ee9e6cab51..6bdb3f38e8 100644 --- a/internal/lsp/regtest/env.go +++ b/internal/lsp/regtest/env.go @@ -31,7 +31,7 @@ type Env struct { Sandbox *fake.Sandbox Editor *fake.Editor Server servertest.Connector - Conn *jsonrpc2.Conn + Conn jsonrpc2.Conn // mu guards the fields below, for the purpose of checking conditions on // every change to diagnostics.