diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index 49faee2209..245ce030c1 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -216,39 +216,26 @@ func (r *Request) IsNotify() bool { return r.ID == nil } -// Parallel indicates that the system is now allowed to process other requests -// in parallel with this one. -// It is safe to call any number of times, but must only be called from the -// request handling go routine. -// It is implied by both reply and by the handler returning. -func (r *Request) Parallel() { - if r.state >= requestParallel { - return - } - r.state = requestParallel - close(r.nextRequest) -} - // Reply sends a reply to the given request. -// It is an error to call this if request was not a call. // You must call this exactly once for any given request. -// It should only be called from the handler go routine. // If err is set then result will be ignored. -// If the request has not yet dropped into parallel mode -// it will be before this function returns. +// This will mark the request as done, triggering any done +// handlers func (r *Request) Reply(ctx context.Context, result interface{}, err error) error { if r.state >= requestReplied { return fmt.Errorf("reply invoked more than once") } - if r.IsNotify() { - return fmt.Errorf("reply not invoked with a valid call: %v, %v", r.Method, r.Params) + + if r.state < requestParallel { + r.state = requestParallel + close(r.nextRequest) } - // reply ends the handling phase of a call, so if we are not yet - // parallel we should be now. The go routine is allowed to continue - // to do work after replying, which is why it is important to unlock - // the rpc system at this point. - r.Parallel() r.state = requestReplied + recordStatus(ctx, nil) + + if r.IsNotify() { + return nil + } var raw *json.RawMessage if err == nil { @@ -280,7 +267,7 @@ func (r *Request) Reply(ctx context.Context, result interface{}, err error) erro return nil } -func (c *Conn) setHandling(r *Request, active bool) { +func setHandling(r *Request, active bool) { if r.ID == nil { return } @@ -358,19 +345,17 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error { event.Record(reqCtx, tag.Started.Of(1), tag.SentBytes.Of(n)) - c.setHandling(req, true) + setHandling(req, true) _, queueDone := event.StartSpan(reqCtx, "queued") go func() { <-thisRequest queueDone() req.state = requestSerial defer func() { - c.setHandling(req, false) - if !req.IsNotify() && req.state < requestReplied { - req.Reply(reqCtx, nil, NewErrorf(CodeInternalError, "method %q did not reply", req.Method)) + setHandling(req, false) + if req.state < requestReplied { + req.Reply(reqCtx, nil, nil) } - req.Parallel() - recordStatus(reqCtx, nil) done() cancelReq() }()