diff --git a/internal/jsonrpc2_v2/conn.go b/internal/jsonrpc2_v2/conn.go index 0215c67af7..f010d9a055 100644 --- a/internal/jsonrpc2_v2/conn.go +++ b/internal/jsonrpc2_v2/conn.go @@ -51,17 +51,17 @@ type Connection struct { closeOnce sync.Once closer io.Closer - writerBox chan Writer - outgoingBox chan map[ID]chan<- *Response - incomingBox chan map[ID]*incoming - async *async + writer chan Writer + outgoing chan map[ID]chan<- *Response + incoming chan map[ID]*incoming + async *async } type AsyncCall struct { - id ID - response chan *Response // the channel a response will be delivered on - resultBox chan asyncResult - endSpan func() // close the tracing span when all processing for the message is complete + id ID + response chan *Response // the channel a response will be delivered on + result chan asyncResult + endSpan func() // close the tracing span when all processing for the message is complete } type asyncResult struct { @@ -87,11 +87,11 @@ func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions // This is used by the Dial and Serve functions to build the actual connection. func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { c := &Connection{ - closer: rwc, - writerBox: make(chan Writer, 1), - outgoingBox: make(chan map[ID]chan<- *Response, 1), - incomingBox: make(chan map[ID]*incoming, 1), - async: newAsync(), + closer: rwc, + writer: make(chan Writer, 1), + outgoing: make(chan map[ID]chan<- *Response, 1), + incoming: make(chan map[ID]*incoming, 1), + async: newAsync(), } options, err := binder.Bind(ctx, c) @@ -107,8 +107,8 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) ( if options.Handler == nil { options.Handler = defaultHandler{} } - c.outgoingBox <- make(map[ID]chan<- *Response) - c.incomingBox <- make(map[ID]*incoming) + c.outgoing <- make(map[ID]chan<- *Response) + c.incoming <- make(map[ID]*incoming) // the goroutines started here will continue until the underlying stream is closed reader := options.Framer.Reader(rwc) readToQueue := make(chan *incoming) @@ -119,7 +119,7 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) ( // releaseing the writer must be the last thing we do in case any requests // are blocked waiting for the connection to be ready - c.writerBox <- options.Framer.Writer(rwc) + c.writer <- options.Framer.Writer(rwc) return c, nil } @@ -154,14 +154,14 @@ func (c *Connection) Notify(ctx context.Context, method string, params interface // If sending the call failed, the response will be ready and have the error in it. func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { result := &AsyncCall{ - id: Int64ID(atomic.AddInt64(&c.seq, 1)), - resultBox: make(chan asyncResult, 1), + id: Int64ID(atomic.AddInt64(&c.seq, 1)), + result: make(chan asyncResult, 1), } // generate a new request identifier call, err := NewCall(result.id, method, params) if err != nil { //set the result to failed - result.resultBox <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)} + result.result <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)} return result } ctx, endSpan := event.Start(ctx, method, @@ -175,7 +175,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{} // are racing the response. // rchan is buffered in case the response arrives without a listener. result.response = make(chan *Response, 1) - outgoing, ok := <-c.outgoingBox + outgoing, ok := <-c.outgoing if !ok { // If the call failed due to (say) an I/O error or broken pipe, attribute it // as such. (If the error is nil, then the connection must have been shut @@ -193,7 +193,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{} return result } outgoing[result.id] = result.response - c.outgoingBox <- outgoing + c.outgoing <- outgoing // now we are ready to send if err := c.write(ctx, call); err != nil { // sending failed, we will never get a response, so deliver a fake one @@ -212,8 +212,8 @@ func (a *AsyncCall) ID() ID { return a.id } // returned, or a call that failed to send in the first place. func (a *AsyncCall) IsReady() bool { select { - case r := <-a.resultBox: - a.resultBox <- r + case r := <-a.result: + a.result <- r return true default: return false @@ -236,14 +236,14 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { r.result = response.Result event.Label(ctx, tag.StatusCode.Of("OK")) } - case r = <-a.resultBox: + case r = <-a.result: // result already available case <-ctx.Done(): event.Label(ctx, tag.StatusCode.Of("CANCELLED")) return ctx.Err() } // refill the box for the next caller - a.resultBox <- r + a.result <- r // and unpack the result if r.err != nil { return r.err @@ -259,8 +259,8 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { // Respond must be called exactly once for any message for which a handler // returns ErrAsyncResponse. It must not be called for any other message. func (c *Connection) Respond(id ID, result interface{}, rerr error) error { - pending := <-c.incomingBox - defer func() { c.incomingBox <- pending }() + pending := <-c.incoming + defer func() { c.incoming <- pending }() entry, found := pending[id] if !found { return nil @@ -277,8 +277,8 @@ func (c *Connection) Respond(id ID, result interface{}, rerr error) error { // not cause any messages that have not arrived yet with that ID to be // cancelled. func (c *Connection) Cancel(id ID) { - pending := <-c.incomingBox - defer func() { c.incomingBox <- pending }() + pending := <-c.incoming + defer func() { c.incoming <- pending }() if entry, found := pending[id]; found && entry.cancel != nil { entry.cancel() entry.cancel = nil @@ -310,10 +310,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch defer func() { // Retire any outgoing requests that were still in flight. // With the Reader no longer being processed, they necessarily cannot receive a response. - outgoing := <-c.outgoingBox - close(c.outgoingBox) // Prevent new outgoing requests, which would deadlock. - for id, responseBox := range outgoing { - responseBox <- &Response{ID: id, Error: err} + outgoing := <-c.outgoing + close(c.outgoing) // Prevent new outgoing requests, which would deadlock. + for id, response := range outgoing { + response <- &Response{ID: id, Error: err} } close(toQueue) @@ -352,9 +352,9 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch // if the request is a call, add it to the incoming map so it can be // cancelled by id if msg.IsCall() { - pending := <-c.incomingBox + pending := <-c.incoming pending[msg.ID] = entry - c.incomingBox <- pending + c.incoming <- pending } // send the message to the incoming queue toQueue <- entry @@ -368,10 +368,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch func (c *Connection) incomingResponse(msg *Response) { var response chan<- *Response - if outgoing, ok := <-c.outgoingBox; ok { + if outgoing, ok := <-c.outgoing; ok { response = outgoing[msg.ID] delete(outgoing, msg.ID) - c.outgoingBox <- outgoing + c.outgoing <- outgoing } if response != nil { response <- msg @@ -468,8 +468,8 @@ func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQ func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { if entry.request.IsCall() { // we have a call finishing, remove it from the incoming map - pending := <-c.incomingBox - defer func() { c.incomingBox <- pending }() + pending := <-c.incoming + defer func() { c.incoming <- pending }() delete(pending, entry.request.ID) } if err := c.respond(entry, result, rerr); err != nil { @@ -527,8 +527,8 @@ func (c *Connection) respond(entry *incoming, result interface{}, rerr error) er // write is used by all things that write outgoing messages, including replies. // it makes sure that writes are atomic func (c *Connection) write(ctx context.Context, msg Message) error { - writer := <-c.writerBox - defer func() { c.writerBox <- writer }() + writer := <-c.writer + defer func() { c.writer <- writer }() n, err := writer.Write(ctx, msg) event.Metric(ctx, tag.SentBytes.Of(n)) return err diff --git a/internal/jsonrpc2_v2/jsonrpc2_test.go b/internal/jsonrpc2_v2/jsonrpc2_test.go index 8e90c235f9..c63a6a982b 100644 --- a/internal/jsonrpc2_v2/jsonrpc2_test.go +++ b/internal/jsonrpc2_v2/jsonrpc2_test.go @@ -77,7 +77,7 @@ type binder struct { type handler struct { conn *jsonrpc2.Connection accumulator int - waitersBox chan map[string]chan struct{} + waiters chan map[string]chan struct{} calls map[string]*jsonrpc2.AsyncCall } @@ -256,11 +256,11 @@ func verifyResults(t *testing.T, method string, results interface{}, expect inte func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) { h := &handler{ - conn: conn, - waitersBox: make(chan map[string]chan struct{}, 1), - calls: make(map[string]*jsonrpc2.AsyncCall), + conn: conn, + waiters: make(chan map[string]chan struct{}, 1), + calls: make(map[string]*jsonrpc2.AsyncCall), } - h.waitersBox <- make(map[string]chan struct{}) + h.waiters <- make(map[string]chan struct{}) if b.runTest != nil { go b.runTest(h) } @@ -272,8 +272,8 @@ func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.C } func (h *handler) waiter(name string) chan struct{} { - waiters := <-h.waitersBox - defer func() { h.waitersBox <- waiters }() + waiters := <-h.waiters + defer func() { h.waiters <- waiters }() waiter, found := waiters[name] if !found { waiter = make(chan struct{})