mirror of https://github.com/golang/go.git
329 lines
9.7 KiB
Go
329 lines
9.7 KiB
Go
// Copyright 2020 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package jsonrpc2
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Listener is implemented by protocols to accept new inbound connections.
|
|
type Listener interface {
|
|
// Accept accepts an inbound connection to a server.
|
|
// It blocks until either an inbound connection is made, or the listener is closed.
|
|
Accept(context.Context) (io.ReadWriteCloser, error)
|
|
|
|
// Close closes the listener.
|
|
// Any blocked Accept or Dial operations will unblock and return errors.
|
|
Close() error
|
|
|
|
// Dialer returns a dialer that can be used to connect to this listener
|
|
// locally.
|
|
// If a listener does not implement this it will return nil.
|
|
Dialer() Dialer
|
|
}
|
|
|
|
// Dialer is used by clients to dial a server.
|
|
type Dialer interface {
|
|
// Dial returns a new communication byte stream to a listening server.
|
|
Dial(ctx context.Context) (io.ReadWriteCloser, error)
|
|
}
|
|
|
|
// Server is a running server that is accepting incoming connections.
|
|
type Server struct {
|
|
listener Listener
|
|
binder Binder
|
|
async *async
|
|
|
|
shutdownOnce sync.Once
|
|
closing int32 // atomic: set to nonzero when Shutdown is called
|
|
}
|
|
|
|
// Dial uses the dialer to make a new connection, wraps the returned
|
|
// reader and writer using the framer to make a stream, and then builds
|
|
// a connection on top of that stream using the binder.
|
|
//
|
|
// The returned Connection will operate independently using the Preempter and/or
|
|
// Handler provided by the Binder, and will release its own resources when the
|
|
// connection is broken, but the caller may Close it earlier to stop accepting
|
|
// (or sending) new requests.
|
|
func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
|
|
// dial a server
|
|
rwc, err := dialer.Dial(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newConnection(ctx, rwc, binder, nil), nil
|
|
}
|
|
|
|
// NewServer starts a new server listening for incoming connections and returns
|
|
// it.
|
|
// This returns a fully running and connected server, it does not block on
|
|
// the listener.
|
|
// You can call Wait to block on the server, or Shutdown to get the sever to
|
|
// terminate gracefully.
|
|
// To notice incoming connections, use an intercepting Binder.
|
|
func NewServer(ctx context.Context, listener Listener, binder Binder) *Server {
|
|
server := &Server{
|
|
listener: listener,
|
|
binder: binder,
|
|
async: newAsync(),
|
|
}
|
|
go server.run(ctx)
|
|
return server
|
|
}
|
|
|
|
// Wait returns only when the server has shut down.
|
|
func (s *Server) Wait() error {
|
|
return s.async.wait()
|
|
}
|
|
|
|
// Shutdown informs the server to stop accepting new connections.
|
|
func (s *Server) Shutdown() {
|
|
s.shutdownOnce.Do(func() {
|
|
atomic.StoreInt32(&s.closing, 1)
|
|
s.listener.Close()
|
|
})
|
|
}
|
|
|
|
// run accepts incoming connections from the listener,
|
|
// If IdleTimeout is non-zero, run exits after there are no clients for this
|
|
// duration, otherwise it exits only on error.
|
|
func (s *Server) run(ctx context.Context) {
|
|
defer s.async.done()
|
|
|
|
var activeConns sync.WaitGroup
|
|
for {
|
|
rwc, err := s.listener.Accept(ctx)
|
|
if err != nil {
|
|
// Only Shutdown closes the listener. If we get an error after Shutdown is
|
|
// called, assume that that was the cause and don't report the error;
|
|
// otherwise, report the error in case it is unexpected.
|
|
if atomic.LoadInt32(&s.closing) == 0 {
|
|
s.async.setError(err)
|
|
}
|
|
// We are done generating new connections for good.
|
|
break
|
|
}
|
|
|
|
// A new inbound connection.
|
|
activeConns.Add(1)
|
|
_ = newConnection(ctx, rwc, s.binder, activeConns.Done) // unregisters itself when done
|
|
}
|
|
activeConns.Wait()
|
|
}
|
|
|
|
// NewIdleListener wraps a listener with an idle timeout.
|
|
//
|
|
// When there are no active connections for at least the timeout duration,
|
|
// calls to Accept will fail with ErrIdleTimeout.
|
|
//
|
|
// A connection is considered inactive as soon as its Close method is called.
|
|
func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
|
|
l := &idleListener{
|
|
wrapped: wrap,
|
|
timeout: timeout,
|
|
active: make(chan int, 1),
|
|
timedOut: make(chan struct{}),
|
|
idleTimer: make(chan *time.Timer, 1),
|
|
}
|
|
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
|
|
return l
|
|
}
|
|
|
|
type idleListener struct {
|
|
wrapped Listener
|
|
timeout time.Duration
|
|
|
|
// Only one of these channels is receivable at any given time.
|
|
active chan int // count of active connections; closed when Close is called if not timed out
|
|
timedOut chan struct{} // closed when the idle timer expires
|
|
idleTimer chan *time.Timer // holds the timer only when idle
|
|
}
|
|
|
|
// Accept accepts an incoming connection.
|
|
//
|
|
// If an incoming connection is accepted concurrent to the listener being closed
|
|
// due to idleness, the new connection is immediately closed.
|
|
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
|
|
rwc, err := l.wrapped.Accept(ctx)
|
|
|
|
select {
|
|
case n, ok := <-l.active:
|
|
if err != nil {
|
|
if ok {
|
|
l.active <- n
|
|
}
|
|
return nil, err
|
|
}
|
|
if ok {
|
|
l.active <- n + 1
|
|
} else {
|
|
// l.wrapped.Close Close has been called, but Accept returned a
|
|
// connection. This race can occur with concurrent Accept and Close calls
|
|
// with any net.Listener, and it is benign: since the listener was closed
|
|
// explicitly, it can't have also timed out.
|
|
}
|
|
return l.newConn(rwc), nil
|
|
|
|
case <-l.timedOut:
|
|
if err == nil {
|
|
// Keeping the connection open would leave the listener simultaneously
|
|
// active and closed due to idleness, which would be contradictory and
|
|
// confusing. Close the connection and pretend that it never happened.
|
|
rwc.Close()
|
|
} else {
|
|
// In theory the timeout could have raced with an unrelated error return
|
|
// from Accept. However, ErrIdleTimeout is arguably still valid (since we
|
|
// would have closed due to the timeout independent of the error), and the
|
|
// harm from returning a spurious ErrIdleTimeout is negliglible anyway.
|
|
}
|
|
return nil, ErrIdleTimeout
|
|
|
|
case timer := <-l.idleTimer:
|
|
if err != nil {
|
|
// The idle timer doesn't run until it receives itself from the idleTimer
|
|
// channel, so it can't have called l.wrapped.Close yet and thus err can't
|
|
// be ErrIdleTimeout. Leave the idle timer as it was and return whatever
|
|
// error we got.
|
|
l.idleTimer <- timer
|
|
return nil, err
|
|
}
|
|
|
|
if !timer.Stop() {
|
|
// Failed to stop the timer — the timer goroutine is in the process of
|
|
// firing. Send the timer back to the timer goroutine so that it can
|
|
// safely close the timedOut channel, and then wait for the listener to
|
|
// actually be closed before we return ErrIdleTimeout.
|
|
l.idleTimer <- timer
|
|
rwc.Close()
|
|
<-l.timedOut
|
|
return nil, ErrIdleTimeout
|
|
}
|
|
|
|
l.active <- 1
|
|
return l.newConn(rwc), nil
|
|
}
|
|
}
|
|
|
|
func (l *idleListener) Close() error {
|
|
select {
|
|
case _, ok := <-l.active:
|
|
if ok {
|
|
close(l.active)
|
|
}
|
|
|
|
case <-l.timedOut:
|
|
// Already closed by the timer; take care not to double-close if the caller
|
|
// only explicitly invokes this Close method once, since the io.Closer
|
|
// interface explicitly leaves doubled Close calls undefined.
|
|
return ErrIdleTimeout
|
|
|
|
case timer := <-l.idleTimer:
|
|
if !timer.Stop() {
|
|
// Couldn't stop the timer. It shouldn't take long to run, so just wait
|
|
// (so that the Listener is guaranteed to be closed before we return)
|
|
// and pretend that this call happened afterward.
|
|
// That way we won't leak any timers or goroutines when Close returns.
|
|
l.idleTimer <- timer
|
|
<-l.timedOut
|
|
return ErrIdleTimeout
|
|
}
|
|
close(l.active)
|
|
}
|
|
|
|
return l.wrapped.Close()
|
|
}
|
|
|
|
func (l *idleListener) Dialer() Dialer {
|
|
return l.wrapped.Dialer()
|
|
}
|
|
|
|
func (l *idleListener) timerExpired() {
|
|
select {
|
|
case n, ok := <-l.active:
|
|
if ok {
|
|
panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n))
|
|
} else {
|
|
panic("jsonrpc2: Close finished with idle timer still running")
|
|
}
|
|
|
|
case <-l.timedOut:
|
|
panic("jsonrpc2: idleListener idle timer fired more than once")
|
|
|
|
case <-l.idleTimer:
|
|
// The timer for this very call!
|
|
}
|
|
|
|
// Close the Listener with all channels still blocked to ensure that this call
|
|
// to l.wrapped.Close doesn't race with the one in l.Close.
|
|
defer close(l.timedOut)
|
|
l.wrapped.Close()
|
|
}
|
|
|
|
func (l *idleListener) connClosed() {
|
|
select {
|
|
case n, ok := <-l.active:
|
|
if !ok {
|
|
// l is already closed, so it can't close due to idleness,
|
|
// and we don't need to track the number of active connections any more.
|
|
return
|
|
}
|
|
n--
|
|
if n == 0 {
|
|
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
|
|
} else {
|
|
l.active <- n
|
|
}
|
|
|
|
case <-l.timedOut:
|
|
panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")
|
|
|
|
case <-l.idleTimer:
|
|
panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
|
|
}
|
|
}
|
|
|
|
type idleListenerConn struct {
|
|
wrapped io.ReadWriteCloser
|
|
l *idleListener
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn {
|
|
c := &idleListenerConn{
|
|
wrapped: rwc,
|
|
l: l,
|
|
}
|
|
|
|
// A caller that forgets to call Close may disrupt the idleListener's
|
|
// accounting, even though the file descriptor for the underlying connection
|
|
// may eventually be garbage-collected anyway.
|
|
//
|
|
// Set a (best-effort) finalizer to verify that a Close call always occurs.
|
|
// (We will clear the finalizer explicitly in Close.)
|
|
runtime.SetFinalizer(c, func(c *idleListenerConn) {
|
|
panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
|
|
})
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *idleListenerConn) Read(p []byte) (int, error) { return c.wrapped.Read(p) }
|
|
func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) }
|
|
|
|
func (c *idleListenerConn) Close() error {
|
|
defer c.closeOnce.Do(func() {
|
|
c.l.connClosed()
|
|
runtime.SetFinalizer(c, nil)
|
|
})
|
|
return c.wrapped.Close()
|
|
}
|