mirror of https://github.com/golang/go.git
internal/jsonrpc2_v2: rework Connection concurrency
This change fixes the semantics of Close to actually wait for in-flight
requests before closing the ReadWriteCloser. (Previously, the Close method
closed the ReadWriteCloser immediately, which I suspect is what led to many
of the failures observed in golang/go#49387 and golang/go#46520.)

It achieves this by explicitly tracking the number of in-flight requests,
including requests with pending async responses, and explicitly rejecting
new Call requests (while keeping the read loop open!) once Close has begun.

To make it easier for me to reason about the request lifetimes, I reduced
the number of long-lived goroutines from three to just one (the Read loop),
with an additional Handler goroutine that runs only while the Handler queue
is non-empty. Now, it is clearer (I hope!) that the number of in-flight
async requests strictly decreases after Close has begun, even though the
Read goroutine continues to read requests (and, importantly, responses) and
to forward Notifications to the preempter.

For golang/go#49387
For golang/go#46520

Change-Id: Idf5960f848108a7ced78c5382099c8692e9b181e
Reviewed-on: https://go-review.googlesource.com/c/tools/+/388134
gopls-CI: kokoro <noreply+kokoro@google.com>
Run-TryBot: Bryan Mills <bcmills@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Alan Donovan <adonovan@google.com>
This commit is contained in:
parent
e172e97c52
commit
739f55d751
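The new Close semantics are easiest to see from the caller's side. Below is a minimal sketch, assuming the jsonrpc2_v2 API exercised by TestCloseCallRace at the end of this change (NetPipeListener, Serve, BinderFunc, HandlerFunc, Dial, Call/Await, Close, Wait); note that the package is internal to x/tools, so this builds only inside that module:

package main

import (
	"context"
	"log"

	jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
)

func main() {
	ctx := context.Background()

	listener, err := jsonrpc2.NetPipeListener(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer listener.Close()

	// A trivial server whose handler answers every request.
	binder := jsonrpc2.BinderFunc(func(context.Context, *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
		h := jsonrpc2.HandlerFunc(func(context.Context, *jsonrpc2.Request) (interface{}, error) {
			return "pong", nil
		})
		return jsonrpc2.ConnectionOptions{Handler: h}, nil
	})
	server, err := jsonrpc2.Serve(ctx, listener, binder)
	if err != nil {
		log.Fatal(err)
	}

	conn, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{})
	if err != nil {
		log.Fatal(err)
	}

	// Call returns an *AsyncCall immediately; Await blocks for the response.
	var result string
	if err := conn.Call(ctx, "ping", nil).Await(ctx, &result); err != nil {
		log.Fatal(err)
	}

	// After this change, Close stops accepting new requests, waits for
	// in-flight requests (including pending async responses) to finish,
	// and only then closes the underlying ReadWriteCloser.
	if err := conn.Close(); err != nil {
		log.Fatal(err)
	}

	listener.Close()
	server.Wait()
}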
@@ -7,12 +7,15 @@ package jsonrpc2
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"sync"
 	"sync/atomic"
+	"time"

 	"golang.org/x/tools/internal/event"
+	"golang.org/x/tools/internal/event/keys"
 	"golang.org/x/tools/internal/event/label"
 	"golang.org/x/tools/internal/event/tag"
 )
@@ -48,6 +51,10 @@ type ConnectionOptions struct {
 	// Handler is used as the queued message handler for inbound messages.
 	// If nil, all responses will be ErrNotHandled.
 	Handler Handler
+	// OnInternalError, if non-nil, is called with any internal errors that occur
+	// while serving the connection, such as protocol errors or invariant
+	// violations. (If nil, internal errors result in panics.)
+	OnInternalError func(error)
 }

 // Connection manages the jsonrpc2 protocol, connecting responses back to their
@@ -57,34 +64,65 @@ type ConnectionOptions struct {
 type Connection struct {
 	seq int64 // must only be accessed using atomic operations

-	closeOnce sync.Once
-	closer    io.Closer
+	stateMu sync.Mutex
+	state   inFlightState // accessed only in updateInFlight

-	writer   chan Writer
-	outgoing chan map[ID]chan<- *Response
-	incoming chan map[ID]*incoming
-	async    *async
+	closer   io.Closer   // shuts down connection when Close has been called or the reader fails
+	closeErr chan error  // 1-buffered; stores the error from closer.Close
+	writer   chan Writer // 1-buffered; stores the writer when not in use
+
+	handler Handler
+
+	onInternalError func(error)
 }

-type AsyncCall struct {
-	id       ID
-	response chan *Response // the channel a response will be delivered on
-	result   chan asyncResult
-	endSpan  func() // close the tracing span when all processing for the message is complete
+// inFlightState records the state of the incoming and outgoing calls on a
+// Connection.
+type inFlightState struct {
+	closing bool // disallow enqueuing further requests, and close the Closer when transitioning to idle
+	readErr error
+
+	outgoing map[ID]*AsyncCall // calls only
+
+	// incoming stores the total number of incoming calls and notifications
+	// that have not yet written or processed a result.
+	incoming int
+
+	incomingByID map[ID]*incomingRequest // calls only
+
+	// handlerQueue stores the backlog of calls and notifications that were not
+	// already handled by a preempter.
+	// The queue does not include the request currently being handled (if any).
+	handlerQueue   []*incomingRequest
+	handlerRunning bool
+
+	closed bool // true after the closer has been invoked
 }

-type asyncResult struct {
-	result []byte
-	err    error
+// updateInFlight locks the state of the connection's in-flight requests, allows
+// f to mutate that state, and closes the connection if it is idle and either
+// is closing or has a read error.
+func (c *Connection) updateInFlight(f func(*inFlightState)) {
+	c.stateMu.Lock()
+	defer c.stateMu.Unlock()
+
+	s := &c.state
+
+	f(s)
+
+	idle := s.incoming == 0 && len(s.outgoing) == 0 && !s.handlerRunning
+	if idle && (s.closing || s.readErr != nil) && !s.closed {
+		c.closeErr <- c.closer.Close()
+		s.closed = true
+	}
 }

-// incoming is used to track an incoming request as it is being handled
-type incoming struct {
-	request   *Request        // the request being processed
-	baseCtx   context.Context // a base context for the message processing
-	done      func()          // a function called when all processing for the message is complete
-	handleCtx context.Context // the context for handling the message, child of baseCtx
-	cancel    func()          // a function that cancels the handling context
+// incomingRequest is used to track an incoming request as it is being handled
+type incomingRequest struct {
+	*Request // the request being processed
+	ctx      context.Context
+	cancel   context.CancelFunc
+	endSpan  func() // called (and set to nil) when the response is sent
 }

 // Bind returns the options unmodified.
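Every mutation of the in-flight state funnels through updateInFlight, so the transition to "idle while closing or broken" is detected exactly once, under the lock. Here is a stripped-down, self-contained sketch of that pattern (the tracker type and its fmt output are illustrative stand-ins, not code from this change):

package main

import (
	"fmt"
	"sync"
)

// tracker is a miniature analogue of Connection.updateInFlight: all state
// changes go through update, which releases the resource on the first
// transition to "idle while closing".
type tracker struct {
	mu       sync.Mutex
	inFlight int
	closing  bool
	closed   bool
}

func (t *tracker) update(f func(*tracker)) {
	t.mu.Lock()
	defer t.mu.Unlock()
	f(t)
	if t.inFlight == 0 && t.closing && !t.closed {
		t.closed = true
		fmt.Println("idle and closing: releasing resource exactly once")
	}
}

func main() {
	t := new(tracker)
	t.update(func(t *tracker) { t.inFlight++ })    // a request arrives
	t.update(func(t *tracker) { t.closing = true }) // Close begins; still busy, nothing released
	t.update(func(t *tracker) { t.inFlight-- })    // last request retires: now it closes
}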
@@ -94,41 +132,35 @@ func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {

 // newConnection creates a new connection and runs it.
 // This is used by the Dial and Serve functions to build the actual connection.
-func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
+func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
+	// TODO: Should we create a new event span here?
+	// This will propagate cancellation from ctx; should it?
+	ctx := notDone{bindCtx}
+
 	c := &Connection{
 		closer:   rwc,
+		closeErr: make(chan error, 1),
 		writer:   make(chan Writer, 1),
-		outgoing: make(chan map[ID]chan<- *Response, 1),
-		incoming: make(chan map[ID]*incoming, 1),
-		async:    newAsync(),
 	}

-	options, err := binder.Bind(ctx, c)
+	options, err := binder.Bind(bindCtx, c)
 	if err != nil {
 		return nil, err
 	}
-	if options.Framer == nil {
-		options.Framer = HeaderFramer()
+	framer := options.Framer
+	if framer == nil {
+		framer = HeaderFramer()
 	}
-	if options.Preempter == nil {
-		options.Preempter = defaultHandler{}
+	c.handler = options.Handler
+	if c.handler == nil {
+		c.handler = defaultHandler{}
 	}
-	if options.Handler == nil {
-		options.Handler = defaultHandler{}
-	}
-	c.outgoing <- make(map[ID]chan<- *Response)
-	c.incoming <- make(map[ID]*incoming)
-	// the goroutines started here will continue until the underlying stream is closed
-	reader := options.Framer.Reader(rwc)
-	readToQueue := make(chan *incoming)
-	queueToDeliver := make(chan *incoming)
-	go c.readIncoming(ctx, reader, readToQueue)
-	go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
-	go c.deliverMessages(ctx, options.Handler, queueToDeliver)
+	c.onInternalError = options.OnInternalError

 	// releasing the writer must be the last thing we do in case any requests
 	// are blocked waiting for the connection to be ready
-	c.writer <- options.Framer.Writer(rwc)
+	c.writer <- framer.Writer(rwc)
+	reader := framer.Reader(rwc)
+	// The goroutines started here will continue until the underlying stream is closed.
+	go c.readIncoming(ctx, reader, options.Preempter)
 	return c, nil
 }
@@ -146,12 +178,7 @@ func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) {
 	)
 	event.Metric(ctx, tag.Started.Of(1))
 	err = c.write(ctx, notify)
-	switch {
-	case err != nil:
-		event.Label(ctx, tag.StatusCode.Of("ERROR"))
-	default:
-		event.Label(ctx, tag.StatusCode.Of("OK"))
-	}
+	labelStatus(ctx, err)
 	done()
 	return err
 }
@@ -162,375 +189,439 @@ func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) {
 // You do not have to wait for the response, it can just be ignored if not needed.
 // If sending the call failed, the response will be ready and have the error in it.
 func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
-	result := &AsyncCall{
-		id:     Int64ID(atomic.AddInt64(&c.seq, 1)),
-		result: make(chan asyncResult, 1),
-	}
-	// generate a new request identifier
-	call, err := NewCall(result.id, method, params)
-	if err != nil {
-		//set the result to failed
-		result.result <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)}
-		return result
-	}
+	// Generate a new request identifier.
+	id := Int64ID(atomic.AddInt64(&c.seq, 1))
+
 	ctx, endSpan := event.Start(ctx, method,
 		tag.Method.Of(method),
 		tag.RPCDirection.Of(tag.Outbound),
-		tag.RPCID.Of(fmt.Sprintf("%q", result.id)),
+		tag.RPCID.Of(fmt.Sprintf("%q", id)),
 	)
-	result.endSpan = endSpan
-	event.Metric(ctx, tag.Started.Of(1))
-	// We have to add ourselves to the pending map before we send, otherwise we
-	// are racing the response.
-	// rchan is buffered in case the response arrives without a listener.
-	result.response = make(chan *Response, 1)
-	outgoing, ok := <-c.outgoing
-	if !ok {
-		// If the call failed due to (say) an I/O error or broken pipe, attribute it
-		// as such. (If the error is nil, then the connection must have been shut
-		// down cleanly.)
-		err := c.async.wait()
-		if err == nil {
-			err = ErrClientClosing
-		}
-
-		resp, respErr := NewResponse(result.id, nil, err)
-		if respErr != nil {
-			panic(fmt.Errorf("unexpected error from NewResponse: %w", respErr))
-		}
-		result.response <- resp
-		return result
+
+	ac := &AsyncCall{
+		id:      id,
+		ready:   make(chan struct{}),
+		ctx:     ctx,
+		endSpan: endSpan,
 	}
-	outgoing[result.id] = result.response
-	c.outgoing <- outgoing
-	// now we are ready to send
+	// When this method returns, either ac is retired, or the request has been
+	// written successfully and the call is awaiting a response (to be provided by
+	// the readIncoming goroutine).
+
+	call, err := NewCall(ac.id, method, params)
+	if err != nil {
+		ac.retire(&Response{ID: id, Error: fmt.Errorf("marshaling call parameters: %w", err)})
+		return ac
+	}
+
+	c.updateInFlight(func(s *inFlightState) {
+		if s.closing {
+			err = ErrClientClosing
+			return
+		}
+		if s.readErr != nil {
+			// We must not start a new Call request if the read end of the connection
+			// has already failed: a Call request requires a response, but with the
+			// read side broken we have no way to receive that response.
+			err = fmt.Errorf("%w: %v", ErrClientClosing, s.readErr)
+			return
+		}
+		if s.outgoing == nil {
+			s.outgoing = make(map[ID]*AsyncCall)
+		}
+		s.outgoing[ac.id] = ac
+	})
+	if err != nil {
+		ac.retire(&Response{ID: id, Error: err})
+		return ac
+	}
+
+	event.Metric(ctx, tag.Started.Of(1))
 	if err := c.write(ctx, call); err != nil {
-		// sending failed, we will never get a response, so deliver a fake one
-		r, _ := NewResponse(result.id, nil, err)
-		c.incomingResponse(r)
+		// Sending failed. We will never get a response, so deliver a fake one if it
+		// wasn't already retired by the connection breaking.
+		c.updateInFlight(func(s *inFlightState) {
+			if s.outgoing[ac.id] == ac {
+				delete(s.outgoing, ac.id)
+				ac.retire(&Response{ID: id, Error: err})
+			} else {
+				// ac was already retired by the readIncoming goroutine:
+				// perhaps our write raced with the Read side of the connection breaking.
+			}
+		})
 	}
-	return result
+	return ac
 }

+type AsyncCall struct {
+	id       ID
+	ready    chan struct{} // closed after response has been set and span has been ended
+	response *Response
+	ctx      context.Context // for event logging only
+	endSpan  func() // close the tracing span when all processing for the message is complete
+}
+
 // ID used for this call.
 // This can be used to cancel the call if needed.
-func (a *AsyncCall) ID() ID { return a.id }
+func (ac *AsyncCall) ID() ID { return ac.id }

 // IsReady can be used to check if the result is already prepared.
 // This is guaranteed to return true on a result for which Await has already
 // returned, or a call that failed to send in the first place.
-func (a *AsyncCall) IsReady() bool {
+func (ac *AsyncCall) IsReady() bool {
 	select {
-	case r := <-a.result:
-		a.result <- r
+	case <-ac.ready:
 		return true
 	default:
 		return false
 	}
 }

-// Await the results of a Call.
-// The response will be unmarshaled from JSON into the result.
-func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
-	defer a.endSpan()
-	var r asyncResult
+// retire processes the response to the call.
+func (ac *AsyncCall) retire(response *Response) {
+	select {
+	case <-ac.ready:
+		panic(fmt.Sprintf("jsonrpc2: retire called twice for ID %v", ac.id))
+	default:
+	}
+
+	ac.response = response
+	labelStatus(ac.ctx, response.Error)
+	ac.endSpan()
+	// Allow the trace context, which may retain a lot of reachable values,
+	// to be garbage-collected.
+	ac.ctx, ac.endSpan = nil, nil
+
+	close(ac.ready)
+}
+
+// Await waits for (and decodes) the results of a Call.
+// The response will be unmarshaled from JSON into the result.
+func (ac *AsyncCall) Await(ctx context.Context, result interface{}) error {
 	select {
-	case response := <-a.response:
-		// response just arrived, prepare the result
-		switch {
-		case response.Error != nil:
-			r.err = response.Error
-			event.Label(ctx, tag.StatusCode.Of("ERROR"))
-		default:
-			r.result = response.Result
-			event.Label(ctx, tag.StatusCode.Of("OK"))
-		}
-	case r = <-a.result:
-		// result already available
 	case <-ctx.Done():
 		event.Label(ctx, tag.StatusCode.Of("CANCELLED"))
 		return ctx.Err()
+	case <-ac.ready:
 	}
-	// refill the box for the next caller
-	a.result <- r
-	// and unpack the result
-	if r.err != nil {
-		return r.err
+	if ac.response.Error != nil {
+		return ac.response.Error
 	}
-	if result == nil || len(r.result) == 0 {
+	if result == nil {
 		return nil
 	}
-	return json.Unmarshal(r.result, result)
+	return json.Unmarshal(ac.response.Result, result)
 }

 // Respond delivers a response to an incoming Call.
-func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
-	pending := <-c.incoming
-	defer func() { c.incoming <- pending }()
-	entry, found := pending[id]
-	if !found {
-		return nil
-	}
-	delete(pending, id)
-	return c.respond(entry, result, rerr)
+//
+// Respond must be called exactly once for any message for which a handler
+// returns ErrAsyncResponse. It must not be called for any other message.
+func (c *Connection) Respond(id ID, result interface{}, err error) error {
+	var req *incomingRequest
+	c.updateInFlight(func(s *inFlightState) {
+		req = s.incomingByID[id]
+	})
+	if req == nil {
+		return c.internalErrorf("Request not found for ID %v", id)
+	}
+
+	if err == ErrAsyncResponse {
+		// Respond is supposed to supply the asynchronous response, so it would be
+		// confusing to call Respond with an error that promises to call Respond
+		// again.
+		err = c.internalErrorf("Respond called with ErrAsyncResponse for %q", req.Method)
+	}
+	return c.processResult("Respond", req, result, err)
 }

-// Cancel is used to cancel an inbound message by ID, it does not cancel
-// outgoing messages.
-// This is only used inside a message handler that is layering a
-// cancellation protocol on top of JSON RPC 2.
-// It will not complain if the ID is not a currently active message, and it will
-// not cause any messages that have not arrived yet with that ID to be
+// Cancel cancels the Context passed to the Handle call for the inbound message
+// with the given ID.
+//
+// Cancel will not complain if the ID is not a currently active message, and it
+// will not cause any messages that have not arrived yet with that ID to be
 // cancelled.
 func (c *Connection) Cancel(id ID) {
-	pending := <-c.incoming
-	defer func() { c.incoming <- pending }()
-	if entry, found := pending[id]; found && entry.cancel != nil {
-		entry.cancel()
-		entry.cancel = nil
+	var req *incomingRequest
+	c.updateInFlight(func(s *inFlightState) {
+		req = s.incomingByID[id]
+	})
+	if req != nil {
+		req.cancel()
 	}
 }

 // Wait blocks until the connection is fully closed, but does not close it.
 func (c *Connection) Wait() error {
-	return c.async.wait()
+	err := <-c.closeErr
+	c.closeErr <- err
+	return err
 }

-// Close can be used to close the underlying stream, and then wait for the connection to
-// fully shut down.
-// This does not cancel in flight requests, but waits for them to gracefully complete.
+// Close stops accepting new requests, waits for in-flight requests and enqueued
+// Handle calls to complete, and then closes the underlying stream.
+//
+// After the start of a Close, notification requests (that lack IDs and do not
+// receive responses) will continue to be passed to the Preempter, but calls
+// with IDs will receive immediate responses with ErrServerClosing, and no new
+// requests (not even notifications!) will be enqueued to the Handler.
 func (c *Connection) Close() error {
-	// close the underlying stream
-	c.closeOnce.Do(func() {
-		if err := c.closer.Close(); err != nil {
-			c.async.setError(err)
-		}
-	})
-	// and then wait for it to cause the connection to close
+	// Stop handling new requests, and interrupt the reader (by closing the
+	// connection) as soon as the active requests finish.
+	c.updateInFlight(func(s *inFlightState) { s.closing = true })
+
 	return c.Wait()
 }

 // readIncoming collects inbound messages from the reader and delivers them, either responding
 // to outgoing calls or feeding requests to the queue.
-func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) (err error) {
-	defer func() {
-		// Retire any outgoing requests that were still in flight.
-		// With the Reader no longer being processed, they necessarily cannot receive a response.
-		outgoing := <-c.outgoing
-		close(c.outgoing) // Prevent new outgoing requests, which would deadlock.
-		for id, response := range outgoing {
-			response <- &Response{ID: id, Error: err}
-		}
-
-		close(toQueue)
-	}()
-
+func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter Preempter) {
+	var err error
 	for {
-		// get the next message
-		// no lock is needed, this is the only reader
-		msg, n, err := reader.Read(ctx)
+		var (
+			msg Message
+			n   int64
+		)
+		msg, n, err = reader.Read(ctx)
 		if err != nil {
-			// The stream failed, we cannot continue
-			if !isClosingError(err) {
-				c.async.setError(err)
-			}
-			return err
+			break
 		}

 		switch msg := msg.(type) {
 		case *Request:
-			entry := &incoming{
-				request: msg,
-			}
-			// add a span to the context for this request
-			labels := append(make([]label.Label, 0, 3), // make space for the id if present
-				tag.Method.Of(msg.Method),
-				tag.RPCDirection.Of(tag.Inbound),
-			)
-			if msg.IsCall() {
-				labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
-			}
-			entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...)
-			event.Metric(entry.baseCtx,
-				tag.Started.Of(1),
-				tag.ReceivedBytes.Of(n))
-			// in theory notifications cannot be cancelled, but we build them a cancel context anyway
-			entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
-			// if the request is a call, add it to the incoming map so it can be
-			// cancelled by id
-			if msg.IsCall() {
-				pending := <-c.incoming
-				pending[msg.ID] = entry
-				c.incoming <- pending
-			}
-			// send the message to the incoming queue
-			toQueue <- entry
+			c.acceptRequest(ctx, msg, n, preempter)

 		case *Response:
-			// If method is not set, this should be a response, in which case we must
-			// have an id to send the response back to the caller.
-			c.incomingResponse(msg)
+			c.updateInFlight(func(s *inFlightState) {
+				if ac, ok := s.outgoing[msg.ID]; ok {
+					delete(s.outgoing, msg.ID)
+					ac.retire(msg)
+				} else {
+					// TODO: How should we report unexpected responses?
+				}
+			})
+
+		default:
+			c.internalErrorf("Read returned an unexpected message of type %T", msg)
 		}
 	}
+
+	c.updateInFlight(func(s *inFlightState) {
+		s.readErr = err
+
+		// Retire any outgoing requests that were still in flight: with the Reader no
+		// longer being processed, they necessarily cannot receive a response.
+		for id, ac := range s.outgoing {
+			ac.retire(&Response{ID: id, Error: err})
+		}
+		s.outgoing = nil
+	})
 }

-func (c *Connection) incomingResponse(msg *Response) {
-	var response chan<- *Response
-	if outgoing, ok := <-c.outgoing; ok {
-		response = outgoing[msg.ID]
-		delete(outgoing, msg.ID)
-		c.outgoing <- outgoing
-	}
-	if response != nil {
-		response <- msg
-	}
-}
-
-// manageQueue reads incoming requests, attempts to process them with the preempter, or queue them
-// up for normal handling.
-func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
-	defer close(toDeliver)
-	q := []*incoming{}
-	ok := true
-	for {
-		var nextReq *incoming
-		if len(q) == 0 {
-			// no messages in the queue
-			// if we were closing, then we are done
-			if !ok {
-				return
-			}
-			// not closing, but nothing in the queue, so just block waiting for a read
-			nextReq, ok = <-fromRead
-		} else {
-			// we have a non empty queue, so pick whichever of reading or delivering
-			// that we can make progress on
-			select {
-			case nextReq, ok = <-fromRead:
-			case toDeliver <- q[0]:
-				//TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
-				q = q[1:]
-			}
-		}
-		if nextReq != nil {
-			// TODO: should we allow to limit the queue size?
-			var result interface{}
-			rerr := nextReq.handleCtx.Err()
-			if rerr == nil {
-				// only preempt if not already cancelled
-				result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
-			}
-			switch {
-			case rerr == ErrNotHandled:
-				// message not handled, add it to the queue for the main handler
-				q = append(q, nextReq)
-			case rerr == ErrAsyncResponse:
-				// message handled but the response will come later
-			default:
-				// anything else means the message is fully handled
-				c.reply(nextReq, result, rerr)
-			}
-		}
-	}
-}
+// acceptRequest either handles msg synchronously or enqueues it to be handled
+// asynchronously.
+func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes int64, preempter Preempter) {
+	// Add a span to the context for this request.
+	labels := append(make([]label.Label, 0, 3), // Make space for the ID if present.
+		tag.Method.Of(msg.Method),
+		tag.RPCDirection.Of(tag.Inbound),
+	)
+	if msg.IsCall() {
+		labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
+	}
+	ctx, endSpan := event.Start(ctx, msg.Method, labels...)
+	event.Metric(ctx,
+		tag.Started.Of(1),
+		tag.ReceivedBytes.Of(msgBytes))
+
+	// In theory notifications cannot be cancelled, but we build them a cancel
+	// context anyway.
+	ctx, cancel := context.WithCancel(ctx)
+	req := &incomingRequest{
+		Request: msg,
+		ctx:     ctx,
+		cancel:  cancel,
+		endSpan: endSpan,
+	}
+
+	// If the request is a call, add it to the incoming map so it can be
+	// cancelled (or responded) by ID.
+	var err error
+	c.updateInFlight(func(s *inFlightState) {
+		s.incoming++
+
+		if req.IsCall() {
+			if s.incomingByID[req.ID] != nil {
+				err = fmt.Errorf("%w: request ID %v already in use", ErrInvalidRequest, req.ID)
+				req.ID = ID{} // Don't misattribute this error to the existing request.
+				return
+			}
+
+			if s.incomingByID == nil {
+				s.incomingByID = make(map[ID]*incomingRequest)
+			}
+			s.incomingByID[req.ID] = req
+
+			if s.closing {
+				// When closing, reject all new Call requests, even if they could
+				// theoretically be handled by the preempter. The preempter could return
+				// ErrAsyncResponse, which would increase the amount of work in flight
+				// when we're trying to ensure that it strictly decreases.
+				err = ErrServerClosing
+				return
+			}
+		}
+	})
+	if err != nil {
+		c.processResult("acceptRequest", req, nil, err)
+		return
+	}
+
+	if preempter != nil {
+		result, err := preempter.Preempt(req.ctx, req.Request)
+
+		if req.IsCall() && errors.Is(err, ErrAsyncResponse) {
+			// This request will remain in flight until Respond is called for it.
+			return
+		}
+
+		if !errors.Is(err, ErrNotHandled) {
+			c.processResult("Preempt", req, result, err)
+			return
+		}
+	}
+
+	c.updateInFlight(func(s *inFlightState) {
+		if s.closing {
+			// If the connection is closing, don't enqueue anything to the handler, not
+			// even notifications. That ensures that if the handler continues to make
+			// progress, it will eventually become idle and close the connection.
+			err = ErrServerClosing
+			return
+		}
+
+		// We enqueue requests that have not been preempted to an unbounded slice.
+		// Unfortunately, we cannot in general limit the size of the handler
+		// queue: we have to read every response that comes in on the wire
+		// (because it may be responding to a request issued by, say, an
+		// asynchronous handler), and in order to get to that response we have
+		// to read all of the requests that came in ahead of it.
+		s.handlerQueue = append(s.handlerQueue, req)
+		if !s.handlerRunning {
+			// We start the handleAsync goroutine when it has work to do, and let it
+			// exit when the queue empties.
+			//
+			// Otherwise, in order to synchronize the handler we would need some other
+			// goroutine (probably readIncoming?) to explicitly wait for handleAsync
+			// to finish, and that would complicate error reporting: either the error
+			// report from the goroutine would be blocked on the handler emptying its
+			// queue (which was tried, and introduced a deadlock detected by
+			// TestCloseCallRace), or the error would need to be reported separately
+			// from synchronizing completion. Allowing the handler goroutine to exit
+			// when idle seems simpler than trying to implement either of those
+			// alternatives correctly.
+			s.handlerRunning = true
+			go c.handleAsync()
+		}
+	})
+	if err != nil {
+		c.processResult("acceptRequest", req, nil, err)
+	}
+}

-func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
-	defer func() {
-		// Close the underlying ReadWriteCloser if not already closed. We're about
-		// to mark the Connection as done, so we'd better actually be done! 😅
-		//
-		// TODO(bcmills): This is actually a bit premature, since we may have
-		// asynchronous handlers still in flight at this point, but it's at least no
-		// more premature than calling c.async.done at this point (which we were
-		// already doing). This will get a proper fix in https://go.dev/cl/388134.
-		c.closeOnce.Do(func() {
-			if err := c.closer.Close(); err != nil {
-				c.async.setError(err)
-			}
-		})
-
-		c.async.done()
-	}()
-
-	for entry := range fromQueue {
-		// cancel any messages in the queue that we have a pending cancel for
-		var result interface{}
-		rerr := entry.handleCtx.Err()
-		if rerr == nil {
-			// only deliver if not already cancelled
-			result, rerr = handler.Handle(entry.handleCtx, entry.request)
-		}
-		switch {
-		case rerr == ErrNotHandled:
-			// message not handled, report it back to the caller as an error
-			c.reply(entry, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
-		case rerr == ErrAsyncResponse:
-			// message handled but the response will come later
-		default:
-			c.reply(entry, result, rerr)
-		}
-	}
-}
+// handleAsync invokes the handler on the requests in the handler queue
+// sequentially until the queue is empty.
+func (c *Connection) handleAsync() {
+	for {
+		var req *incomingRequest
+		c.updateInFlight(func(s *inFlightState) {
+			if len(s.handlerQueue) > 0 {
+				req, s.handlerQueue = s.handlerQueue[0], s.handlerQueue[1:]
+			} else {
+				s.handlerRunning = false
+			}
+		})
+		if req == nil {
+			return
+		}
+
+		var result interface{}
+		err := req.ctx.Err()
+		if err == nil {
+			// Only deliver to the Handler if not already cancelled.
+			result, err = c.handler.Handle(req.ctx, req.Request)
+		}
+		c.processResult(c.handler, req, result, err)
+	}
+}

-// reply is used to reply to an incoming request that has just been handled
-func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
-	if entry.request.IsCall() {
-		// we have a call finishing, remove it from the incoming map
-		pending := <-c.incoming
-		defer func() { c.incoming <- pending }()
-		delete(pending, entry.request.ID)
-	}
-	if err := c.respond(entry, result, rerr); err != nil {
-		// no way to propagate this error
-		//TODO: should we do more than just log it?
-		event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err)
-	}
-}
-
-// respond sends a response.
-// This is the code shared between reply and SendResponse.
-func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
-	var err error
-	if entry.request.IsCall() {
-		// send the response
-		if result == nil && rerr == nil {
-			// call with no response, send an error anyway
-			rerr = fmt.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
-		}
-		var response *Response
-		response, err = NewResponse(entry.request.ID, result, rerr)
-		if err == nil {
-			// we write the response with the base context, in case the message was cancelled
-			err = c.write(entry.baseCtx, response)
-		}
-	} else {
-		switch {
-		case rerr != nil:
-			// notification failed
-			err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
-			rerr = nil
-		case result != nil:
-			//notification produced a response, which is an error
-			err = fmt.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
-		default:
-			// normal notification finish
-		}
-	}
-	switch {
-	case rerr != nil || err != nil:
-		event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR"))
-	default:
-		event.Label(entry.baseCtx, tag.StatusCode.Of("OK"))
-	}
-	// and just to be clean, invoke and clear the cancel if needed
-	if entry.cancel != nil {
-		entry.cancel()
-		entry.cancel = nil
-	}
-	// mark the entire request processing as done
-	entry.done()
-	return err
-}
+// processResult processes the result of a request and, if appropriate, sends a response.
+func (c *Connection) processResult(from interface{}, req *incomingRequest, result interface{}, err error) error {
+	switch err {
+	case ErrAsyncResponse:
+		if !req.IsCall() {
+			return c.internalErrorf("%#v returned ErrAsyncResponse for a %q Request without an ID", from, req.Method)
+		}
+		return nil // This request is still in flight, so don't record the result yet.
+	case ErrNotHandled, ErrMethodNotFound:
+		// Add detail describing the unhandled method.
+		err = fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method)
+	}
+
+	if req.endSpan == nil {
+		return c.internalErrorf("%#v produced a duplicate %q Response", from, req.Method)
+	}
+
+	if result != nil && err != nil {
+		c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v", from, req.Method, err, result)
+		result = nil // Discard the spurious result and respond with err.
+	}
+
+	if req.IsCall() {
+		if result == nil && err == nil {
+			err = c.internalErrorf("%#v returned a nil result and nil error for a %q Request that requires a Response", from, req.Method)
+		}
+
+		response, respErr := NewResponse(req.ID, result, err)
+
+		// The caller could theoretically reuse the request's ID as soon as we've
+		// sent the response, so ensure that it is removed from the incoming map
+		// before sending.
+		c.updateInFlight(func(s *inFlightState) {
+			delete(s.incomingByID, req.ID)
+		})
+		if respErr == nil {
+			writeErr := c.write(notDone{req.ctx}, response)
+			if err == nil {
+				err = writeErr
+			}
+		} else {
+			err = c.internalErrorf("%#v returned a malformed result for %q: %w", from, req.Method, respErr)
+		}
+	} else { // req is a notification
+		if result != nil {
+			err = c.internalErrorf("%#v returned a non-nil result for a %q Request without an ID", from, req.Method)
+		} else if err != nil {
+			err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, req.Method, err)
+		}
+		if err != nil {
+			// TODO: can/should we do anything with this error beyond writing it to the event log?
+			// (Is this the right label to attach to the log?)
+			event.Label(req.ctx, keys.Err.Of(err))
+		}
+	}
+
+	labelStatus(req.ctx, err)
+
+	// Cancel the request and finalize the event span to free any associated resources.
+	req.cancel()
+	req.endSpan()
+	req.endSpan = nil
+	c.updateInFlight(func(s *inFlightState) {
+		if s.incoming == 0 {
+			panic("jsonrpc2_v2: processResult called when incoming count is already zero")
+		}
+		s.incoming--
+	})
+	return nil
+}

 // write is used by all things that write outgoing messages, including replies.
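A consequence of the design above is that a Connection has only one long-lived goroutine (readIncoming); the handler goroutine is started on demand and exits when its queue drains. A self-contained sketch of that start-on-demand worker pattern (pool, job, and the fmt output are illustrative stand-ins, not code from this change):

package main

import (
	"fmt"
	"sync"
)

type job int

// pool is a miniature analogue of the Connection's handler queue: an
// unbounded slice plus a "running" flag, both guarded by a mutex.
type pool struct {
	mu      sync.Mutex
	queue   []job
	running bool
	wg      sync.WaitGroup
}

// enqueue mirrors acceptRequest: append to the queue, and start the worker
// goroutine only if one is not already running.
func (p *pool) enqueue(j job) {
	p.wg.Add(1)
	p.mu.Lock()
	defer p.mu.Unlock()
	p.queue = append(p.queue, j)
	if !p.running {
		p.running = true
		go p.work()
	}
}

// work mirrors handleAsync: pop jobs one at a time, and exit (clearing the
// running flag under the lock) once the queue is observed to be empty.
func (p *pool) work() {
	for {
		var j job
		ok := false
		p.mu.Lock()
		if len(p.queue) > 0 {
			j, p.queue, ok = p.queue[0], p.queue[1:], true
		} else {
			p.running = false
		}
		p.mu.Unlock()
		if !ok {
			return
		}
		fmt.Println("handling", j) // stand-in for handler.Handle
		p.wg.Done()
	}
}

func main() {
	p := new(pool)
	for i := 0; i < 3; i++ {
		p.enqueue(job(i))
	}
	p.wg.Wait() // all enqueued jobs have been handled; the worker will exit
}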
@@ -540,5 +631,46 @@ func (c *Connection) write(ctx context.Context, msg Message) error {
 	defer func() { c.writer <- writer }()
 	n, err := writer.Write(ctx, msg)
 	event.Metric(ctx, tag.SentBytes.Of(n))
+
+	// TODO: if err != nil, that suggests that future writes will not succeed,
+	// so we cannot possibly write the results of incoming Call requests.
+	// If the read side of the connection is also broken, we also might not have
+	// a way to receive cancellation notifications.
+	//
+	// Should we cancel the pending calls implicitly?
+
 	return err
 }
+
+// internalErrorf reports an internal error. By default it panics, but if
+// c.onInternalError is non-nil it instead calls that and returns an error
+// wrapping ErrInternal.
+func (c *Connection) internalErrorf(format string, args ...interface{}) error {
+	err := fmt.Errorf(format, args...)
+	if c.onInternalError == nil {
+		panic("jsonrpc2: " + err.Error())
+	}
+	c.onInternalError(err)
+
+	return fmt.Errorf("%w: %v", ErrInternal, err)
+}
+
+// labelStatus labels the status of the event in ctx based on whether err is nil.
+func labelStatus(ctx context.Context, err error) {
+	if err == nil {
+		event.Label(ctx, tag.StatusCode.Of("OK"))
+	} else {
+		event.Label(ctx, tag.StatusCode.Of("ERROR"))
+	}
+}
+
+// notDone is a context.Context wrapper that returns a nil Done channel.
+type notDone struct{ ctx context.Context }
+
+func (ic notDone) Value(key interface{}) interface{} {
+	return ic.ctx.Value(key)
+}
+
+func (notDone) Done() <-chan struct{}       { return nil }
+func (notDone) Err() error                  { return nil }
+func (notDone) Deadline() (time.Time, bool) { return time.Time{}, false }
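The notDone wrapper is what allows processResult to call c.write(notDone{req.ctx}, response) for a request whose context has already been cancelled: the context's values (used for event labels) are preserved while its cancellation is discarded. A minimal demonstration of the idea follows; the rpcID key is hypothetical, and note that Go 1.21 later added context.WithoutCancel for exactly this purpose:

package main

import (
	"context"
	"fmt"
	"time"
)

// notDone mirrors the wrapper added in this change: it forwards Value lookups
// but reports no deadline and no cancellation.
type notDone struct{ ctx context.Context }

func (ic notDone) Value(key interface{}) interface{} { return ic.ctx.Value(key) }
func (notDone) Done() <-chan struct{}                { return nil }
func (notDone) Err() error                           { return nil }
func (notDone) Deadline() (time.Time, bool)          { return time.Time{}, false }

type key string // illustrative value key

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ctx = context.WithValue(ctx, key("rpcID"), 42)
	cancel() // the request was cancelled...

	detached := notDone{ctx}
	// ...but the detached context is still usable for the final write:
	fmt.Println(detached.Err())               // <nil>
	fmt.Println(detached.Value(key("rpcID"))) // 42
}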
@@ -9,7 +9,6 @@ import (
 	"io"
 	"net"
 	"os"
-	"time"
 )

 // This file contains implementations of the transport primitives that use the standard network
@@ -36,7 +35,7 @@ type netListener struct {
 }

 // Accept blocks waiting for an incoming connection to the listener.
-func (l *netListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+func (l *netListener) Accept(context.Context) (io.ReadWriteCloser, error) {
 	return l.net.Accept()
 }
@@ -56,9 +55,7 @@ func (l *netListener) Close() error {

 // Dialer returns a dialer that can be used to connect to the listener.
 func (l *netListener) Dialer() Dialer {
-	return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{
-		Timeout: 5 * time.Second,
-	})
+	return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{})
 }

 // NetDialer returns a Dialer using the supplied standard network dialer.
@@ -98,15 +95,19 @@ type netPiper struct {
 }

 // Accept blocks waiting for an incoming connection to the listener.
-func (l *netPiper) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
-	// block until we have a listener, or are closed or cancelled
+func (l *netPiper) Accept(context.Context) (io.ReadWriteCloser, error) {
+	// Block until the pipe is dialed or the listener is closed,
+	// preferring the latter if already closed at the start of Accept.
+	select {
+	case <-l.done:
+		return nil, errClosed
+	default:
+	}
 	select {
 	case rwc := <-l.dialed:
 		return rwc, nil
 	case <-l.done:
-		return nil, io.EOF
-	case <-ctx.Done():
-		return nil, ctx.Err()
+		return nil, errClosed
 	}
 }
@@ -124,6 +125,14 @@ func (l *netPiper) Dialer() Dialer {

 func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
 	client, server := net.Pipe()
-	l.dialed <- server
-	return client, nil
+
+	select {
+	case l.dialed <- server:
+		return client, nil
+
+	case <-l.done:
+		client.Close()
+		server.Close()
+		return nil, errClosed
+	}
 }
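The Dial change above applies the guarded-send pattern: rather than an unconditional channel send that could block forever once the listener stops accepting, the send now races against the listener's done channel. A standalone sketch of the pattern (dial, dialed, and errClosed here are illustrative stand-ins):

package main

import (
	"errors"
	"fmt"
)

var errClosed = errors.New("listener closed") // illustrative sentinel

// dial sends conn to the acceptor unless the listener has shut down;
// without the done case, a Dial racing a Close would block forever.
func dial(dialed chan<- int, done <-chan struct{}, conn int) error {
	select {
	case dialed <- conn:
		return nil
	case <-done:
		return errClosed
	}
}

func main() {
	dialed := make(chan int) // unbuffered, like netPiper.dialed
	done := make(chan struct{})
	close(done) // the listener has already been closed

	if err := dial(dialed, done, 1); err != nil {
		fmt.Println(err) // listener closed
	}
}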
@@ -105,6 +105,8 @@ func (s *Server) run(ctx context.Context) {
 		if err != nil {
 			if !isClosingError(err) {
 				s.async.setError(err)
+				s.listener.Close()
+				break
 			}
 			continue
 		}
@@ -120,10 +122,12 @@ func (s *Server) run(ctx context.Context) {
 func onlyActive(conns []*Connection) []*Connection {
 	i := 0
 	for _, c := range conns {
-		if !c.async.isDone() {
-			conns[i] = c
-			i++
-		}
+		c.updateInFlight(func(s *inFlightState) {
+			if !s.closed {
+				conns[i] = c
+				i++
+			}
+		})
 	}
 	// trim the slice down
 	return conns[:i]
@@ -151,10 +155,7 @@ func isClosingError(err error) bool {
 		return true
 	}

-	// Per https://github.com/golang/go/issues/4373, this error string should not
-	// change. This is not ideal, but since the worst that could happen here is
-	// some superfluous logging, it is acceptable.
-	if err.Error() == "use of closed network connection" {
+	if isErrClosed(err) {
 		return true
 	}
@@ -0,0 +1,19 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.16
+// +build go1.16
+
+package jsonrpc2
+
+import (
+	"errors"
+	"net"
+)
+
+var errClosed = net.ErrClosed
+
+func isErrClosed(err error) bool {
+	return errors.Is(err, errClosed)
+}
@@ -0,0 +1,30 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build !go1.16
+// +build !go1.16
+
+package jsonrpc2
+
+import (
+	"errors"
+	"strings"
+)
+
+// errClosed is an error with the same string as net.ErrClosed,
+// which was added in Go 1.16.
+var errClosed = errors.New("use of closed network connection")
+
+// isErrClosed reports whether err ends in the same string as errClosed.
+func isErrClosed(err error) bool {
+	// As of Go 1.16, this could be 'errors.Is(err, net.ErrClosed)', but
+	// unfortunately gopls still requires compatibility with
+	// (otherwise-unsupported) older Go versions.
+	//
+	// In the meantime, this error string has not changed on any supported Go
+	// version, and is not expected to change in the future.
+	// This is not ideal, but since the worst that could happen here is some
+	// superfluous logging, it is acceptable.
+	return strings.HasSuffix(err.Error(), "use of closed network connection")
+}
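The build-tagged pair of files above exists only because gopls still had to build with pre-1.16 toolchains; on Go 1.16 and later the sentinel comparison is the idiomatic form. A small illustration of the difference (the wrapped error is hypothetical):

package main

import (
	"errors"
	"fmt"
	"net"
)

func main() {
	// A hypothetical error wrapping the sentinel, as the net package returns
	// after a connection is closed.
	err := fmt.Errorf("accept: %w", net.ErrClosed)

	// On Go 1.16+, errors.Is unwraps the chain and matches the sentinel...
	fmt.Println(errors.Is(err, net.ErrClosed)) // true

	// ...whereas the pre-1.16 fallback can only compare the message text.
	fmt.Println(err.Error()) // "accept: use of closed network connection"
}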
@@ -230,7 +230,7 @@ func TestIdleListenerAcceptCloseRace(t *testing.T) {
 	watchdog := time.Duration(n) * 1000 * time.Millisecond
 	timer := time.AfterFunc(watchdog, func() {
 		debug.SetTraceback("all")
-		panic(fmt.Sprintf("TestAcceptCloseRace deadlocked after %v", watchdog))
+		panic(fmt.Sprintf("%s deadlocked after %v", t.Name(), watchdog))
 	})
 	defer timer.Stop()
@@ -261,3 +261,84 @@ func TestIdleListenerAcceptCloseRace(t *testing.T) {
 		<-done
 	}
 }
+
+// TestCloseCallRace checks for a race resulting in a deadlock when a Call on
+// one side of the connection races with a Close (or otherwise broken
+// connection) initiated from the other side.
+//
+// (The Call method was waiting for a result from the Read goroutine to
+// determine which error value to return, but the Read goroutine was waiting for
+// in-flight calls to complete before reporting that result.)
+func TestCloseCallRace(t *testing.T) {
+	ctx := context.Background()
+	n := 10
+
+	watchdog := time.Duration(n) * 1000 * time.Millisecond
+	timer := time.AfterFunc(watchdog, func() {
+		debug.SetTraceback("all")
+		panic(fmt.Sprintf("%s deadlocked after %v", t.Name(), watchdog))
+	})
+	defer timer.Stop()
+
+	for ; n > 0; n-- {
+		listener, err := jsonrpc2.NetPipeListener(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		pokec := make(chan *jsonrpc2.AsyncCall, 1)
+
+		s, err := jsonrpc2.Serve(ctx, listener, jsonrpc2.BinderFunc(func(_ context.Context, srvConn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
+			h := jsonrpc2.HandlerFunc(func(ctx context.Context, _ *jsonrpc2.Request) (interface{}, error) {
+				// Start a concurrent call from the server to the client.
+				// The point of this test is to ensure this doesn't deadlock
+				// if the client shuts down the connection concurrently.
+				//
+				// The racing Call may or may not receive a response: it should get a
+				// response if it is sent before the client closes the connection, and
+				// it should fail with some kind of "connection closed" error otherwise.
+				go func() {
+					pokec <- srvConn.Call(ctx, "poke", nil)
+				}()
+
+				return &msg{"pong"}, nil
+			})
+			return jsonrpc2.ConnectionOptions{Handler: h}, nil
+		}))
+		if err != nil {
+			listener.Close()
+			t.Fatal(err)
+		}
+
+		dialConn, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{})
+		if err != nil {
+			listener.Close()
+			s.Wait()
+			t.Fatal(err)
+		}
+
+		// Calling any method on the server should provoke it to asynchronously call
+		// us back. While it is starting that call, we will close the connection.
+		if err := dialConn.Call(ctx, "ping", nil).Await(ctx, nil); err != nil {
+			t.Error(err)
+		}
+		if err := dialConn.Close(); err != nil {
+			t.Error(err)
+		}
+
+		// Ensure that the Call on the server side did not block forever when the
+		// connection closed.
+		pokeCall := <-pokec
+		if err := pokeCall.Await(ctx, nil); err == nil {
+			t.Errorf("unexpected nil error from server-initiated call")
+		} else if errors.Is(err, jsonrpc2.ErrMethodNotFound) {
+			// The call completed before the Close reached the handler.
+		} else {
+			// The error was something else.
+			t.Logf("server-initiated call completed with expected error: %v", err)
+		}
+
+		listener.Close()
+		s.Wait()
+	}
+}