internal/jsonrpc2: cleanup reply handling

Instead of having a Parallel method, we only have Reply, which must also be used
for Notify messages (with a nil response).
This has no real functional impact but makes it easier to refactor in the next
cl.

Change-Id: Ifd4316dd71706de7913c69e6be539b966800e9dd
Reviewed-on: https://go-review.googlesource.com/c/tools/+/226837
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-03-31 15:18:39 -04:00
parent 6dc6d5718f
commit 7e0acf58eb
1 changed files with 16 additions and 31 deletions

View File

@ -216,39 +216,26 @@ func (r *Request) IsNotify() bool {
return r.ID == nil
}
// Parallel indicates that the system is now allowed to process other requests
// in parallel with this one.
// It is safe to call any number of times, but must only be called from the
// request handling go routine.
// It is implied by both reply and by the handler returning.
func (r *Request) Parallel() {
if r.state >= requestParallel {
return
}
r.state = requestParallel
close(r.nextRequest)
}
// Reply sends a reply to the given request.
// It is an error to call this if request was not a call.
// You must call this exactly once for any given request.
// It should only be called from the handler go routine.
// If err is set then result will be ignored.
// If the request has not yet dropped into parallel mode
// it will be before this function returns.
// This will mark the request as done, triggering any done
// handlers
func (r *Request) Reply(ctx context.Context, result interface{}, err error) error {
if r.state >= requestReplied {
return fmt.Errorf("reply invoked more than once")
}
if r.IsNotify() {
return fmt.Errorf("reply not invoked with a valid call: %v, %v", r.Method, r.Params)
if r.state < requestParallel {
r.state = requestParallel
close(r.nextRequest)
}
// reply ends the handling phase of a call, so if we are not yet
// parallel we should be now. The go routine is allowed to continue
// to do work after replying, which is why it is important to unlock
// the rpc system at this point.
r.Parallel()
r.state = requestReplied
recordStatus(ctx, nil)
if r.IsNotify() {
return nil
}
var raw *json.RawMessage
if err == nil {
@ -280,7 +267,7 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro
return nil
}
func (c *Conn) setHandling(r *Request, active bool) {
func setHandling(r *Request, active bool) {
if r.ID == nil {
return
}
@ -358,19 +345,17 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
event.Record(reqCtx,
tag.Started.Of(1),
tag.SentBytes.Of(n))
c.setHandling(req, true)
setHandling(req, true)
_, queueDone := event.StartSpan(reqCtx, "queued")
go func() {
<-thisRequest
queueDone()
req.state = requestSerial
defer func() {
c.setHandling(req, false)
if !req.IsNotify() && req.state < requestReplied {
req.Reply(reqCtx, nil, NewErrorf(CodeInternalError, "method %q did not reply", req.Method))
setHandling(req, false)
if req.state < requestReplied {
req.Reply(reqCtx, nil, nil)
}
req.Parallel()
recordStatus(reqCtx, nil)
done()
cancelReq()
}()