internal/jsonrpc2: break Run up into composable handlers

Change-Id: I9aff86f7ab06e61849495b9c73553147e29343f1
Reviewed-on: https://go-review.googlesource.com/c/tools/+/226840
Run-TryBot: Ian Cottrell <iancottrell@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-03-31 21:00:00 -04:00
parent 5c4bdbc02c
commit 3eebf4bf9d
2 changed files with 106 additions and 71 deletions

View File

@ -7,6 +7,9 @@ package jsonrpc2
import (
"context"
"fmt"
"sync"
"golang.org/x/tools/internal/telemetry/event"
)
// Handler is invoked to handle incoming requests.
@ -80,3 +83,79 @@ func MustReply(handler Handler) Handler {
return err
}
}
// CancelHandler returns a handler that supports cancellation, and a canceller
// that can be used to trigger canceling in progress requests.
func CancelHandler(handler Handler) (Handler, Canceller) {
var mu sync.Mutex
handling := make(map[ID]context.CancelFunc)
wrapped := func(ctx context.Context, req *Request) error {
if req.ID != nil {
cancelCtx, cancel := context.WithCancel(ctx)
ctx = cancelCtx
mu.Lock()
handling[*req.ID] = cancel
mu.Unlock()
req.OnReply(func() {
mu.Lock()
delete(handling, *req.ID)
mu.Unlock()
})
}
return handler(ctx, req)
}
return wrapped, func(id ID) {
mu.Lock()
cancel, found := handling[id]
mu.Unlock()
if found {
cancel()
}
}
}
// AsyncHandler returns a handler that processes each request goes in its own
// goroutine.
// The handler returns immediately, without the request being processed.
// Each request then waits for the previous request to finish before it starts.
// This allows the stream to unblock at the cost of unbounded goroutines
// all stalled on the previous one.
func AsyncHandler(handler Handler) Handler {
nextRequest := make(chan struct{})
close(nextRequest)
return func(ctx context.Context, req *Request) error {
waitForPrevious := nextRequest
nextRequest = make(chan struct{})
unlockNext := nextRequest
req.OnReply(func() { close(unlockNext) })
_, queueDone := event.StartSpan(ctx, "queued")
go func() {
<-waitForPrevious
queueDone()
if err := handler(ctx, req); err != nil {
event.Error(ctx, "jsonrpc2 async message delivery failed", err)
}
}()
return nil
}
}
func legacyDeliverHandler(handler Handler) Handler {
return func(ctx context.Context, req *Request) error {
if req.conn.LegacyHooks != nil {
if req.conn.LegacyHooks.Deliver(ctx, req, false) {
return nil
}
}
return handler(ctx, req)
}
}
func legacyRequestHandler(handler Handler) Handler {
return func(ctx context.Context, req *Request) error {
if req.conn.LegacyHooks != nil {
ctx = req.conn.LegacyHooks.Request(ctx, req.conn, Receive, &req.WireRequest)
}
return handler(ctx, req)
}
}

View File

@ -34,14 +34,12 @@ type Conn struct {
stream Stream
pendingMu sync.Mutex // protects the pending map
pending map[ID]chan *WireResponse
handlingMu sync.Mutex // protects the handling map
handling map[ID]*Request
canceller Canceller
}
// Request is sent to a server to represent a Call or Notify operaton.
type Request struct {
conn *Conn
cancel context.CancelFunc
conn *Conn
// done holds set of callbacks added by OnReply, and is set back to nil if
// Reply has been called.
done []func()
@ -50,6 +48,9 @@ type Request struct {
WireRequest
}
// Canceller is the type for a function that can cancel an in progress request.
type Canceller func(id ID)
type constError string
func (e constError) Error() string { return string(e) }
@ -67,9 +68,8 @@ func NewErrorf(code int64, format string, args ...interface{}) *Error {
// You must call Run for the connection to be active.
func NewConn(s Stream) *Conn {
conn := &Conn{
stream: s,
pending: make(map[ID]chan *WireResponse),
handling: make(map[ID]*Request),
stream: s,
pending: make(map[ID]chan *WireResponse),
}
return conn
}
@ -80,12 +80,7 @@ func NewConn(s Stream) *Conn {
// directly wired in. This method allows a higher level protocol to choose how
// to propagate the cancel.
func (c *Conn) Cancel(id ID) {
c.handlingMu.Lock()
handling, found := c.handling[id]
c.handlingMu.Unlock()
if found {
handling.cancel()
}
c.canceller(id)
}
// Notify is called to send a notification request over the connection.
@ -259,19 +254,6 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro
return nil
}
func setHandling(r *Request, active bool) {
if r.ID == nil {
return
}
r.conn.handlingMu.Lock()
defer r.conn.handlingMu.Unlock()
if active {
r.conn.handling[*r.ID] = r
} else {
delete(r.conn.handling, *r.ID)
}
}
// OnReply adds a done callback to the request.
// All added callbacks are invoked during the one required call to Reply, and
// then dropped.
@ -300,11 +282,10 @@ type combined struct {
// It returns only when the reader is closed or there is an error in the stream.
func (c *Conn) Run(runCtx context.Context, handler Handler) error {
handler = MustReply(handler)
// we need to make the next request "lock" in an unlocked state to allow
// the first incoming request to proceed. All later requests are unlocked
// by the preceding request going to parallel mode.
nextRequest := make(chan struct{})
close(nextRequest)
handler = legacyDeliverHandler(handler)
handler = AsyncHandler(handler)
handler = legacyRequestHandler(handler)
handler, c.canceller = CancelHandler(handler)
for {
// get the data for a message
data, n, err := c.stream.Read(runCtx)
@ -324,13 +305,17 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
switch {
case msg.Method != "":
// If method is set it must be a request.
reqCtx, cancelReq := context.WithCancel(runCtx)
waitForPrevious := nextRequest
nextRequest = make(chan struct{})
unlockNext := nextRequest
reqCtx, spanDone := event.StartSpan(runCtx, msg.Method,
tag.Method.Of(msg.Method),
tag.RPCDirection.Of(tag.Inbound),
tag.RPCID.Of(msg.ID.String()),
)
event.Record(reqCtx,
tag.Started.Of(1),
tag.SentBytes.Of(n))
req := &Request{
conn: c,
cancel: cancelReq,
conn: c,
WireRequest: WireRequest{
VersionTag: msg.VersionTag,
Method: msg.Method,
@ -338,41 +323,12 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
ID: msg.ID,
},
}
req.OnReply(func() {
close(unlockNext)
})
if c.LegacyHooks != nil {
reqCtx = c.LegacyHooks.Request(reqCtx, c, Receive, &req.WireRequest)
req.OnReply(func() { spanDone() })
if err := handler(reqCtx, req); err != nil {
// delivery failed, not much we can do
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
}
reqCtx, done := event.StartSpan(reqCtx, req.WireRequest.Method,
tag.Method.Of(req.WireRequest.Method),
tag.RPCDirection.Of(tag.Inbound),
tag.RPCID.Of(req.WireRequest.ID.String()),
)
event.Record(reqCtx,
tag.Started.Of(1),
tag.SentBytes.Of(n))
setHandling(req, true)
_, queueDone := event.StartSpan(reqCtx, "queued")
go func() {
<-waitForPrevious
queueDone()
defer func() {
setHandling(req, false)
done()
cancelReq()
}()
if c.LegacyHooks != nil {
if c.LegacyHooks.Deliver(reqCtx, req, false) {
return
}
}
err := handler(reqCtx, req)
if err != nil {
// delivery failed, not much we can do
event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
}
}()
case msg.ID != nil:
// If method is not set, this should be a response, in which case we must
// have an id to send the response back to the caller.