internal/poll: add mutex to prevent SetDeadline race in Plan 9

There are data races on fd.[rw]aio and fd.[rw]timedout when Read/Write
is called on a polled fd concurrently with SetDeadline (see #38769).
Adding a mutex around accesses to each pair (read and write) prevents
the race, which was causing deadlocks in net/http tests on the builders.

Updates #38769.

Change-Id: I31719b3c9a664e81a775cda583cff31c0da946c4
Reviewed-on: https://go-review.googlesource.com/c/go/+/235820
Run-TryBot: David du Colombier <0intro@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David du Colombier <0intro@gmail.com>
This commit is contained in:
Richard Miller 2020-06-02 10:34:09 +01:00 committed by David du Colombier
parent 0eb9767caa
commit d17aebf46d
1 changed files with 23 additions and 6 deletions

View File

@ -7,6 +7,7 @@ package poll
import (
"errors"
"io"
"sync"
"sync/atomic"
"time"
)
@ -24,6 +25,8 @@ type FD struct {
Destroy func()
// deadlines
rmu sync.Mutex
wmu sync.Mutex
raio *asyncIO
waio *asyncIO
rtimer *time.Timer
@ -59,9 +62,6 @@ func (fd *FD) Close() error {
// Read implements io.Reader.
func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
if fd.rtimedout.isSet() {
return 0, ErrDeadlineExceeded
}
if err := fd.readLock(); err != nil {
return 0, err
}
@ -69,7 +69,13 @@ func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
if len(b) == 0 {
return 0, nil
}
fd.rmu.Lock()
if fd.rtimedout.isSet() {
fd.rmu.Unlock()
return 0, ErrDeadlineExceeded
}
fd.raio = newAsyncIO(fn, b)
fd.rmu.Unlock()
n, err := fd.raio.Wait()
fd.raio = nil
if isHangup(err) {
@ -83,14 +89,17 @@ func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
// Write implements io.Writer.
func (fd *FD) Write(fn func([]byte) (int, error), b []byte) (int, error) {
if fd.wtimedout.isSet() {
return 0, ErrDeadlineExceeded
}
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
fd.wmu.Lock()
if fd.wtimedout.isSet() {
fd.wmu.Unlock()
return 0, ErrDeadlineExceeded
}
fd.waio = newAsyncIO(fn, b)
fd.wmu.Unlock()
n, err := fd.waio.Wait()
fd.waio = nil
if isInterrupted(err) {
@ -117,9 +126,13 @@ func (fd *FD) SetWriteDeadline(t time.Time) error {
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
d := t.Sub(time.Now())
if mode == 'r' || mode == 'r'+'w' {
fd.rmu.Lock()
defer fd.rmu.Unlock()
fd.rtimedout.setFalse()
}
if mode == 'w' || mode == 'r'+'w' {
fd.wmu.Lock()
defer fd.wmu.Unlock()
fd.wtimedout.setFalse()
}
if t.IsZero() || d < 0 {
@ -140,18 +153,22 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
// Interrupt I/O operation once timer has expired
if mode == 'r' || mode == 'r'+'w' {
fd.rtimer = time.AfterFunc(d, func() {
fd.rmu.Lock()
fd.rtimedout.setTrue()
if fd.raio != nil {
fd.raio.Cancel()
}
fd.rmu.Unlock()
})
}
if mode == 'w' || mode == 'r'+'w' {
fd.wtimer = time.AfterFunc(d, func() {
fd.wmu.Lock()
fd.wtimedout.setTrue()
if fd.waio != nil {
fd.waio.Cancel()
}
fd.wmu.Unlock()
})
}
}