From 3eebf4bf9d0bbed7d2f657662be1ce7d9509fa4a Mon Sep 17 00:00:00 2001 From: Ian Cottrell Date: Tue, 31 Mar 2020 21:00:00 -0400 Subject: [PATCH] internal/jsonrpc2: break Run up into composable handlers Change-Id: I9aff86f7ab06e61849495b9c73553147e29343f1 Reviewed-on: https://go-review.googlesource.com/c/tools/+/226840 Run-TryBot: Ian Cottrell Reviewed-by: Robert Findley --- internal/jsonrpc2/handler.go | 79 ++++++++++++++++++++++++++++ internal/jsonrpc2/jsonrpc2.go | 98 ++++++++++------------------------- 2 files changed, 106 insertions(+), 71 deletions(-) diff --git a/internal/jsonrpc2/handler.go b/internal/jsonrpc2/handler.go index 860ea1f52a..0bcfdfb8ef 100644 --- a/internal/jsonrpc2/handler.go +++ b/internal/jsonrpc2/handler.go @@ -7,6 +7,9 @@ package jsonrpc2 import ( "context" "fmt" + "sync" + + "golang.org/x/tools/internal/telemetry/event" ) // Handler is invoked to handle incoming requests. @@ -80,3 +83,79 @@ func MustReply(handler Handler) Handler { return err } } + +// CancelHandler returns a handler that supports cancellation, and a canceller +// that can be used to trigger canceling in progress requests. +func CancelHandler(handler Handler) (Handler, Canceller) { + var mu sync.Mutex + handling := make(map[ID]context.CancelFunc) + wrapped := func(ctx context.Context, req *Request) error { + if req.ID != nil { + cancelCtx, cancel := context.WithCancel(ctx) + ctx = cancelCtx + mu.Lock() + handling[*req.ID] = cancel + mu.Unlock() + req.OnReply(func() { + mu.Lock() + delete(handling, *req.ID) + mu.Unlock() + }) + } + return handler(ctx, req) + } + return wrapped, func(id ID) { + mu.Lock() + cancel, found := handling[id] + mu.Unlock() + if found { + cancel() + } + } +} + +// AsyncHandler returns a handler that processes each request goes in its own +// goroutine. +// The handler returns immediately, without the request being processed. +// Each request then waits for the previous request to finish before it starts. +// This allows the stream to unblock at the cost of unbounded goroutines +// all stalled on the previous one. +func AsyncHandler(handler Handler) Handler { + nextRequest := make(chan struct{}) + close(nextRequest) + return func(ctx context.Context, req *Request) error { + waitForPrevious := nextRequest + nextRequest = make(chan struct{}) + unlockNext := nextRequest + req.OnReply(func() { close(unlockNext) }) + _, queueDone := event.StartSpan(ctx, "queued") + go func() { + <-waitForPrevious + queueDone() + if err := handler(ctx, req); err != nil { + event.Error(ctx, "jsonrpc2 async message delivery failed", err) + } + }() + return nil + } +} + +func legacyDeliverHandler(handler Handler) Handler { + return func(ctx context.Context, req *Request) error { + if req.conn.LegacyHooks != nil { + if req.conn.LegacyHooks.Deliver(ctx, req, false) { + return nil + } + } + return handler(ctx, req) + } +} + +func legacyRequestHandler(handler Handler) Handler { + return func(ctx context.Context, req *Request) error { + if req.conn.LegacyHooks != nil { + ctx = req.conn.LegacyHooks.Request(ctx, req.conn, Receive, &req.WireRequest) + } + return handler(ctx, req) + } +} diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index c67a72ac96..3d5ffa4780 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -34,14 +34,12 @@ type Conn struct { stream Stream pendingMu sync.Mutex // protects the pending map pending map[ID]chan *WireResponse - handlingMu sync.Mutex // protects the handling map - handling map[ID]*Request + canceller Canceller } // Request is sent to a server to represent a Call or Notify operaton. type Request struct { - conn *Conn - cancel context.CancelFunc + conn *Conn // done holds set of callbacks added by OnReply, and is set back to nil if // Reply has been called. done []func() @@ -50,6 +48,9 @@ type Request struct { WireRequest } +// Canceller is the type for a function that can cancel an in progress request. +type Canceller func(id ID) + type constError string func (e constError) Error() string { return string(e) } @@ -67,9 +68,8 @@ func NewErrorf(code int64, format string, args ...interface{}) *Error { // You must call Run for the connection to be active. func NewConn(s Stream) *Conn { conn := &Conn{ - stream: s, - pending: make(map[ID]chan *WireResponse), - handling: make(map[ID]*Request), + stream: s, + pending: make(map[ID]chan *WireResponse), } return conn } @@ -80,12 +80,7 @@ func NewConn(s Stream) *Conn { // directly wired in. This method allows a higher level protocol to choose how // to propagate the cancel. func (c *Conn) Cancel(id ID) { - c.handlingMu.Lock() - handling, found := c.handling[id] - c.handlingMu.Unlock() - if found { - handling.cancel() - } + c.canceller(id) } // Notify is called to send a notification request over the connection. @@ -259,19 +254,6 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro return nil } -func setHandling(r *Request, active bool) { - if r.ID == nil { - return - } - r.conn.handlingMu.Lock() - defer r.conn.handlingMu.Unlock() - if active { - r.conn.handling[*r.ID] = r - } else { - delete(r.conn.handling, *r.ID) - } -} - // OnReply adds a done callback to the request. // All added callbacks are invoked during the one required call to Reply, and // then dropped. @@ -300,11 +282,10 @@ type combined struct { // It returns only when the reader is closed or there is an error in the stream. func (c *Conn) Run(runCtx context.Context, handler Handler) error { handler = MustReply(handler) - // we need to make the next request "lock" in an unlocked state to allow - // the first incoming request to proceed. All later requests are unlocked - // by the preceding request going to parallel mode. - nextRequest := make(chan struct{}) - close(nextRequest) + handler = legacyDeliverHandler(handler) + handler = AsyncHandler(handler) + handler = legacyRequestHandler(handler) + handler, c.canceller = CancelHandler(handler) for { // get the data for a message data, n, err := c.stream.Read(runCtx) @@ -324,13 +305,17 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error { switch { case msg.Method != "": // If method is set it must be a request. - reqCtx, cancelReq := context.WithCancel(runCtx) - waitForPrevious := nextRequest - nextRequest = make(chan struct{}) - unlockNext := nextRequest + reqCtx, spanDone := event.StartSpan(runCtx, msg.Method, + tag.Method.Of(msg.Method), + tag.RPCDirection.Of(tag.Inbound), + tag.RPCID.Of(msg.ID.String()), + ) + event.Record(reqCtx, + tag.Started.Of(1), + tag.SentBytes.Of(n)) + req := &Request{ - conn: c, - cancel: cancelReq, + conn: c, WireRequest: WireRequest{ VersionTag: msg.VersionTag, Method: msg.Method, @@ -338,41 +323,12 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error { ID: msg.ID, }, } - req.OnReply(func() { - close(unlockNext) - }) - if c.LegacyHooks != nil { - reqCtx = c.LegacyHooks.Request(reqCtx, c, Receive, &req.WireRequest) + req.OnReply(func() { spanDone() }) + + if err := handler(reqCtx, req); err != nil { + // delivery failed, not much we can do + event.Error(reqCtx, "jsonrpc2 message delivery failed", err) } - reqCtx, done := event.StartSpan(reqCtx, req.WireRequest.Method, - tag.Method.Of(req.WireRequest.Method), - tag.RPCDirection.Of(tag.Inbound), - tag.RPCID.Of(req.WireRequest.ID.String()), - ) - event.Record(reqCtx, - tag.Started.Of(1), - tag.SentBytes.Of(n)) - setHandling(req, true) - _, queueDone := event.StartSpan(reqCtx, "queued") - go func() { - <-waitForPrevious - queueDone() - defer func() { - setHandling(req, false) - done() - cancelReq() - }() - if c.LegacyHooks != nil { - if c.LegacyHooks.Deliver(reqCtx, req, false) { - return - } - } - err := handler(reqCtx, req) - if err != nil { - // delivery failed, not much we can do - event.Error(reqCtx, "jsonrpc2 message delivery failed", err) - } - }() case msg.ID != nil: // If method is not set, this should be a response, in which case we must // have an id to send the response back to the caller.