mirror of https://github.com/golang/go.git
net: refactoring in preparation for integrated network poller
Introduce pollDesc struct, to split netFD struct into fd-related and poller-related parts. R=golang-dev, bradfitz, iant CC=golang-dev https://golang.org/cl/7762044
This commit is contained in:
parent
07fb6fcd40
commit
b09d881799
|
|
@ -29,10 +29,25 @@ type pollServer struct {
|
|||
pr, pw *os.File
|
||||
poll *pollster // low-level OS hooks
|
||||
sync.Mutex // controls pending and deadline
|
||||
pending map[int]*netFD
|
||||
pending map[int]*pollDesc
|
||||
deadline int64 // next deadline (nsec since 1970)
|
||||
}
|
||||
|
||||
// A pollDesc contains netFD state related to pollServer.
|
||||
type pollDesc struct {
|
||||
// immutable after Init()
|
||||
pollServer *pollServer
|
||||
sysfd int
|
||||
cr, cw chan error
|
||||
|
||||
// mutable, protected by pollServer mutex
|
||||
closing bool
|
||||
ncr, ncw int
|
||||
|
||||
// mutable, safe for concurrent access
|
||||
rdeadline, wdeadline deadline
|
||||
}
|
||||
|
||||
func newPollServer() (s *pollServer, err error) {
|
||||
s = new(pollServer)
|
||||
if s.pr, s.pw, err = os.Pipe(); err != nil {
|
||||
|
|
@ -51,7 +66,7 @@ func newPollServer() (s *pollServer, err error) {
|
|||
s.poll.Close()
|
||||
goto Error
|
||||
}
|
||||
s.pending = make(map[int]*netFD)
|
||||
s.pending = make(map[int]*pollDesc)
|
||||
go s.Run()
|
||||
return s, nil
|
||||
|
||||
|
|
@ -67,10 +82,10 @@ Error:
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
||||
func (s *pollServer) AddFD(pd *pollDesc, mode int) error {
|
||||
s.Lock()
|
||||
intfd := fd.sysfd
|
||||
if intfd < 0 || fd.closing {
|
||||
intfd := pd.sysfd
|
||||
if intfd < 0 || pd.closing {
|
||||
// fd closed underfoot
|
||||
s.Unlock()
|
||||
return errClosing
|
||||
|
|
@ -79,14 +94,14 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
|||
var t int64
|
||||
key := intfd << 1
|
||||
if mode == 'r' {
|
||||
fd.ncr++
|
||||
t = fd.rdeadline.value()
|
||||
pd.ncr++
|
||||
t = pd.rdeadline.value()
|
||||
} else {
|
||||
fd.ncw++
|
||||
pd.ncw++
|
||||
key++
|
||||
t = fd.wdeadline.value()
|
||||
t = pd.wdeadline.value()
|
||||
}
|
||||
s.pending[key] = fd
|
||||
s.pending[key] = pd
|
||||
doWakeup := false
|
||||
if t > 0 && (s.deadline == 0 || t < s.deadline) {
|
||||
s.deadline = t
|
||||
|
|
@ -96,7 +111,7 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
|||
wake, err := s.poll.AddFD(intfd, mode, false)
|
||||
s.Unlock()
|
||||
if err != nil {
|
||||
return &OpError{"addfd", fd.net, fd.laddr, err}
|
||||
return err
|
||||
}
|
||||
if wake || doWakeup {
|
||||
s.Wakeup()
|
||||
|
|
@ -104,25 +119,26 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Evict evicts fd from the pending list, unblocking
|
||||
// any I/O running on fd. The caller must have locked
|
||||
// Evict evicts pd from the pending list, unblocking
|
||||
// any I/O running on pd. The caller must have locked
|
||||
// pollserver.
|
||||
// Return value is whether the pollServer should be woken up.
|
||||
func (s *pollServer) Evict(fd *netFD) bool {
|
||||
func (s *pollServer) Evict(pd *pollDesc) bool {
|
||||
pd.closing = true
|
||||
doWakeup := false
|
||||
if s.pending[fd.sysfd<<1] == fd {
|
||||
s.WakeFD(fd, 'r', errClosing)
|
||||
if s.poll.DelFD(fd.sysfd, 'r') {
|
||||
if s.pending[pd.sysfd<<1] == pd {
|
||||
s.WakeFD(pd, 'r', errClosing)
|
||||
if s.poll.DelFD(pd.sysfd, 'r') {
|
||||
doWakeup = true
|
||||
}
|
||||
delete(s.pending, fd.sysfd<<1)
|
||||
delete(s.pending, pd.sysfd<<1)
|
||||
}
|
||||
if s.pending[fd.sysfd<<1|1] == fd {
|
||||
s.WakeFD(fd, 'w', errClosing)
|
||||
if s.poll.DelFD(fd.sysfd, 'w') {
|
||||
if s.pending[pd.sysfd<<1|1] == pd {
|
||||
s.WakeFD(pd, 'w', errClosing)
|
||||
if s.poll.DelFD(pd.sysfd, 'w') {
|
||||
doWakeup = true
|
||||
}
|
||||
delete(s.pending, fd.sysfd<<1|1)
|
||||
delete(s.pending, pd.sysfd<<1|1)
|
||||
}
|
||||
return doWakeup
|
||||
}
|
||||
|
|
@ -131,7 +147,7 @@ var wakeupbuf [1]byte
|
|||
|
||||
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
|
||||
|
||||
func (s *pollServer) LookupFD(fd int, mode int) *netFD {
|
||||
func (s *pollServer) LookupFD(fd int, mode int) *pollDesc {
|
||||
key := fd << 1
|
||||
if mode == 'w' {
|
||||
key++
|
||||
|
|
@ -144,16 +160,16 @@ func (s *pollServer) LookupFD(fd int, mode int) *netFD {
|
|||
return netfd
|
||||
}
|
||||
|
||||
func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
|
||||
func (s *pollServer) WakeFD(pd *pollDesc, mode int, err error) {
|
||||
if mode == 'r' {
|
||||
for fd.ncr > 0 {
|
||||
fd.ncr--
|
||||
fd.cr <- err
|
||||
for pd.ncr > 0 {
|
||||
pd.ncr--
|
||||
pd.cr <- err
|
||||
}
|
||||
} else {
|
||||
for fd.ncw > 0 {
|
||||
fd.ncw--
|
||||
fd.cw <- err
|
||||
for pd.ncw > 0 {
|
||||
pd.ncw--
|
||||
pd.cw <- err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -164,7 +180,7 @@ func (s *pollServer) CheckDeadlines() {
|
|||
// probably with a heap indexed by wakeup time.
|
||||
|
||||
var nextDeadline int64
|
||||
for key, fd := range s.pending {
|
||||
for key, pd := range s.pending {
|
||||
var t int64
|
||||
var mode int
|
||||
if key&1 == 0 {
|
||||
|
|
@ -173,15 +189,15 @@ func (s *pollServer) CheckDeadlines() {
|
|||
mode = 'w'
|
||||
}
|
||||
if mode == 'r' {
|
||||
t = fd.rdeadline.value()
|
||||
t = pd.rdeadline.value()
|
||||
} else {
|
||||
t = fd.wdeadline.value()
|
||||
t = pd.wdeadline.value()
|
||||
}
|
||||
if t > 0 {
|
||||
if t <= now {
|
||||
delete(s.pending, key)
|
||||
s.poll.DelFD(fd.sysfd, mode)
|
||||
s.WakeFD(fd, mode, errTimeout)
|
||||
s.poll.DelFD(pd.sysfd, mode)
|
||||
s.WakeFD(pd, mode, errTimeout)
|
||||
} else if nextDeadline == 0 || t < nextDeadline {
|
||||
nextDeadline = t
|
||||
}
|
||||
|
|
@ -220,48 +236,67 @@ func (s *pollServer) Run() {
|
|||
s.pr.Read(scratch[0:])
|
||||
s.CheckDeadlines()
|
||||
} else {
|
||||
netfd := s.LookupFD(fd, mode)
|
||||
if netfd == nil {
|
||||
pd := s.LookupFD(fd, mode)
|
||||
if pd == nil {
|
||||
// This can happen because the WaitFD runs without
|
||||
// holding s's lock, so there might be a pending wakeup
|
||||
// for an fd that has been evicted. No harm done.
|
||||
continue
|
||||
}
|
||||
s.WakeFD(netfd, mode, nil)
|
||||
s.WakeFD(pd, mode, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *pollServer) PrepareRead(fd *netFD) error {
|
||||
if fd.rdeadline.expired() {
|
||||
func (pd *pollDesc) Close() {
|
||||
}
|
||||
|
||||
func (pd *pollDesc) Lock() {
|
||||
pd.pollServer.Lock()
|
||||
}
|
||||
|
||||
func (pd *pollDesc) Unlock() {
|
||||
pd.pollServer.Unlock()
|
||||
}
|
||||
|
||||
func (pd *pollDesc) Wakeup() {
|
||||
pd.pollServer.Wakeup()
|
||||
}
|
||||
|
||||
func (pd *pollDesc) PrepareRead() error {
|
||||
if pd.rdeadline.expired() {
|
||||
return errTimeout
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *pollServer) PrepareWrite(fd *netFD) error {
|
||||
if fd.wdeadline.expired() {
|
||||
func (pd *pollDesc) PrepareWrite() error {
|
||||
if pd.wdeadline.expired() {
|
||||
return errTimeout
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *pollServer) WaitRead(fd *netFD) error {
|
||||
err := s.AddFD(fd, 'r')
|
||||
func (pd *pollDesc) WaitRead() error {
|
||||
err := pd.pollServer.AddFD(pd, 'r')
|
||||
if err == nil {
|
||||
err = <-fd.cr
|
||||
err = <-pd.cr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *pollServer) WaitWrite(fd *netFD) error {
|
||||
err := s.AddFD(fd, 'w')
|
||||
func (pd *pollDesc) WaitWrite() error {
|
||||
err := pd.pollServer.AddFD(pd, 'w')
|
||||
if err == nil {
|
||||
err = <-fd.cw
|
||||
err = <-pd.cw
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (pd *pollDesc) Evict() bool {
|
||||
return pd.pollServer.Evict(pd)
|
||||
}
|
||||
|
||||
// Spread network FDs over several pollServers.
|
||||
|
||||
var pollMaxN int
|
||||
|
|
@ -292,31 +327,29 @@ func startServer(k int) {
|
|||
pollservers[k] = p
|
||||
}
|
||||
|
||||
func pollServerInit(fd *netFD) error {
|
||||
func (pd *pollDesc) Init(fd *netFD) error {
|
||||
pollN := runtime.GOMAXPROCS(0)
|
||||
if pollN > pollMaxN {
|
||||
pollN = pollMaxN
|
||||
}
|
||||
k := fd.sysfd % pollN
|
||||
startServersOnce[k]()
|
||||
fd.pollServer = pollservers[k]
|
||||
fd.cr = make(chan error, 1)
|
||||
fd.cw = make(chan error, 1)
|
||||
pd.sysfd = fd.sysfd
|
||||
pd.pollServer = pollservers[k]
|
||||
pd.cr = make(chan error, 1)
|
||||
pd.cw = make(chan error, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *pollServer) Close(fd *netFD) {
|
||||
}
|
||||
|
||||
// TODO(dfc) these unused error returns could be removed
|
||||
|
||||
func setReadDeadline(fd *netFD, t time.Time) error {
|
||||
fd.rdeadline.setTime(t)
|
||||
fd.pd.rdeadline.setTime(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
func setWriteDeadline(fd *netFD, t time.Time) error {
|
||||
fd.wdeadline.setTime(t)
|
||||
fd.pd.wdeadline.setTime(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ type netFD struct {
|
|||
sysmu sync.Mutex
|
||||
sysref int
|
||||
|
||||
// must lock both sysmu and pollserver to write
|
||||
// must lock both sysmu and pollDesc to write
|
||||
// can lock either to read
|
||||
closing bool
|
||||
|
||||
|
|
@ -30,8 +30,6 @@ type netFD struct {
|
|||
sotype int
|
||||
isConnected bool
|
||||
sysfile *os.File
|
||||
cr chan error
|
||||
cw chan error
|
||||
net string
|
||||
laddr Addr
|
||||
raddr Addr
|
||||
|
|
@ -39,14 +37,8 @@ type netFD struct {
|
|||
// serialize access to Read and Write methods
|
||||
rio, wio sync.Mutex
|
||||
|
||||
// read and write deadlines
|
||||
rdeadline, wdeadline deadline
|
||||
|
||||
// owned by fd wait server
|
||||
ncr, ncw int
|
||||
|
||||
// wait server
|
||||
pollServer *pollServer
|
||||
pd pollDesc
|
||||
}
|
||||
|
||||
func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) {
|
||||
|
|
@ -65,7 +57,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) {
|
|||
sotype: sotype,
|
||||
net: net,
|
||||
}
|
||||
if err := pollServerInit(netfd); err != nil {
|
||||
if err := netfd.pd.Init(netfd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return netfd, nil
|
||||
|
|
@ -91,12 +83,12 @@ func (fd *netFD) name() string {
|
|||
func (fd *netFD) connect(ra syscall.Sockaddr) error {
|
||||
fd.wio.Lock()
|
||||
defer fd.wio.Unlock()
|
||||
if err := fd.pollServer.PrepareWrite(fd); err != nil {
|
||||
if err := fd.pd.PrepareWrite(); err != nil {
|
||||
return err
|
||||
}
|
||||
err := syscall.Connect(fd.sysfd, ra)
|
||||
if err == syscall.EINPROGRESS {
|
||||
if err = fd.pollServer.WaitWrite(fd); err != nil {
|
||||
if err = fd.pd.WaitWrite(); err != nil {
|
||||
return err
|
||||
}
|
||||
var e int
|
||||
|
|
@ -112,7 +104,7 @@ func (fd *netFD) connect(ra syscall.Sockaddr) error {
|
|||
}
|
||||
|
||||
// Add a reference to this fd.
|
||||
// If closing==true, pollserver must be locked; mark the fd as closing.
|
||||
// If closing==true, pollDesc must be locked; mark the fd as closing.
|
||||
// Returns an error if the fd cannot be used.
|
||||
func (fd *netFD) incref(closing bool) error {
|
||||
fd.sysmu.Lock()
|
||||
|
|
@ -135,7 +127,7 @@ func (fd *netFD) decref() {
|
|||
fd.sysref--
|
||||
if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
|
||||
fd.sysfile.Close()
|
||||
fd.pollServer.Close(fd)
|
||||
fd.pd.Close()
|
||||
fd.sysfile = nil
|
||||
fd.sysfd = -1
|
||||
}
|
||||
|
|
@ -143,21 +135,21 @@ func (fd *netFD) decref() {
|
|||
}
|
||||
|
||||
func (fd *netFD) Close() error {
|
||||
fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict
|
||||
fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
|
||||
if err := fd.incref(true); err != nil {
|
||||
fd.pollServer.Unlock()
|
||||
fd.pd.Unlock()
|
||||
return err
|
||||
}
|
||||
// Unblock any I/O. Once it all unblocks and returns,
|
||||
// so that it cannot be referring to fd.sysfd anymore,
|
||||
// the final decref will close fd.sysfd. This should happen
|
||||
// fairly quickly, since all the I/O is non-blocking, and any
|
||||
// attempts to block in the pollserver will return errClosing.
|
||||
doWakeup := fd.pollServer.Evict(fd)
|
||||
fd.pollServer.Unlock()
|
||||
// attempts to block in the pollDesc will return errClosing.
|
||||
doWakeup := fd.pd.Evict()
|
||||
fd.pd.Unlock()
|
||||
fd.decref()
|
||||
if doWakeup {
|
||||
fd.pollServer.Wakeup()
|
||||
fd.pd.Wakeup()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -189,7 +181,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
|
|||
return 0, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareRead(fd); err != nil {
|
||||
if err := fd.pd.PrepareRead(); err != nil {
|
||||
return 0, &OpError{"read", fd.net, fd.raddr, err}
|
||||
}
|
||||
for {
|
||||
|
|
@ -197,7 +189,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
|
|||
if err != nil {
|
||||
n = 0
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitRead(fd); err == nil {
|
||||
if err = fd.pd.WaitRead(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -218,7 +210,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
|
|||
return 0, nil, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareRead(fd); err != nil {
|
||||
if err := fd.pd.PrepareRead(); err != nil {
|
||||
return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
|
||||
}
|
||||
for {
|
||||
|
|
@ -226,7 +218,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
|
|||
if err != nil {
|
||||
n = 0
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitRead(fd); err == nil {
|
||||
if err = fd.pd.WaitRead(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -247,7 +239,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
|
|||
return 0, 0, 0, nil, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareRead(fd); err != nil {
|
||||
if err := fd.pd.PrepareRead(); err != nil {
|
||||
return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
|
||||
}
|
||||
for {
|
||||
|
|
@ -255,7 +247,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
|
|||
if err != nil {
|
||||
// TODO(dfc) should n and oobn be set to 0
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitRead(fd); err == nil {
|
||||
if err = fd.pd.WaitRead(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -283,7 +275,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
|
|||
return 0, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareWrite(fd); err != nil {
|
||||
if err := fd.pd.PrepareWrite(); err != nil {
|
||||
return 0, &OpError{"write", fd.net, fd.raddr, err}
|
||||
}
|
||||
for {
|
||||
|
|
@ -296,7 +288,7 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
|
|||
break
|
||||
}
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
||||
if err = fd.pd.WaitWrite(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -322,13 +314,13 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
|
|||
return 0, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareWrite(fd); err != nil {
|
||||
if err := fd.pd.PrepareWrite(); err != nil {
|
||||
return 0, &OpError{"write", fd.net, fd.raddr, err}
|
||||
}
|
||||
for {
|
||||
err = syscall.Sendto(fd.sysfd, p, 0, sa)
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
||||
if err = fd.pd.WaitWrite(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -349,13 +341,13 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
|
|||
return 0, 0, err
|
||||
}
|
||||
defer fd.decref()
|
||||
if err := fd.pollServer.PrepareWrite(fd); err != nil {
|
||||
if err := fd.pd.PrepareWrite(); err != nil {
|
||||
return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
|
||||
}
|
||||
for {
|
||||
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
||||
if err = fd.pd.WaitWrite(); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -380,14 +372,14 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
|
|||
|
||||
var s int
|
||||
var rsa syscall.Sockaddr
|
||||
if err = fd.pollServer.PrepareRead(fd); err != nil {
|
||||
if err = fd.pd.PrepareRead(); err != nil {
|
||||
return nil, &OpError{"accept", fd.net, fd.laddr, err}
|
||||
}
|
||||
for {
|
||||
s, rsa, err = accept(fd.sysfd)
|
||||
if err != nil {
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitRead(fd); err == nil {
|
||||
if err = fd.pd.WaitRead(); err == nil {
|
||||
continue
|
||||
}
|
||||
} else if err == syscall.ECONNABORTED {
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
|
|||
break
|
||||
}
|
||||
if err1 == syscall.EAGAIN {
|
||||
if err1 = c.pollServer.WaitWrite(c); err1 == nil {
|
||||
if err1 = c.pd.WaitWrite(); err1 == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
|
|||
break
|
||||
}
|
||||
if err1 == syscall.EAGAIN {
|
||||
if err1 = c.pollServer.WaitWrite(c); err1 == nil {
|
||||
if err1 = c.pd.WaitWrite(); err1 == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue