mirror of https://github.com/golang/go.git
net: fix data races on deadline vars
Fixes #4434. R=mikioh.mikioh, bradfitz, dvyukov, alex.brainman CC=golang-dev https://golang.org/cl/6855110
This commit is contained in:
parent
0ce96f9ef4
commit
be0d84e335
|
|
@ -0,0 +1,57 @@
|
|||
// Copyright 2012 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// +build darwin freebsd linux netbsd openbsd windows
|
||||
|
||||
package net
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var deadlineSetTimeTests = []struct {
|
||||
input time.Time
|
||||
expected int64
|
||||
}{
|
||||
{time.Time{}, 0},
|
||||
{time.Date(2009, 11, 10, 23, 00, 00, 00, time.UTC), 1257894000000000000}, // 2009-11-10 23:00:00 +0000 UTC
|
||||
}
|
||||
|
||||
func TestDeadlineSetTime(t *testing.T) {
|
||||
for _, tt := range deadlineSetTimeTests {
|
||||
var d deadline
|
||||
d.setTime(tt.input)
|
||||
actual := d.value()
|
||||
expected := int64(0)
|
||||
if !tt.input.IsZero() {
|
||||
expected = tt.input.UnixNano()
|
||||
}
|
||||
if actual != expected {
|
||||
t.Errorf("set/value failed: expected %v, actual %v", expected, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var deadlineExpiredTests = []struct {
|
||||
deadline time.Time
|
||||
expired bool
|
||||
}{
|
||||
// note, times are relative to the start of the test run, not
|
||||
// the start of TestDeadlineExpired
|
||||
{time.Now().Add(5 * time.Minute), false},
|
||||
{time.Now().Add(-5 * time.Minute), true},
|
||||
{time.Time{}, false}, // no deadline set
|
||||
}
|
||||
|
||||
func TestDeadlineExpired(t *testing.T) {
|
||||
for _, tt := range deadlineExpiredTests {
|
||||
var d deadline
|
||||
d.set(tt.deadline.UnixNano())
|
||||
expired := d.expired()
|
||||
if expired != tt.expired {
|
||||
t.Errorf("expire failed: expected %v, actual %v", tt.expired, expired)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"os"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
|
@ -37,11 +38,11 @@ type netFD struct {
|
|||
laddr Addr
|
||||
raddr Addr
|
||||
|
||||
// owned by client
|
||||
rdeadline int64
|
||||
rio sync.Mutex
|
||||
wdeadline int64
|
||||
wio sync.Mutex
|
||||
// serialize access to Read and Write methods
|
||||
rio, wio sync.Mutex
|
||||
|
||||
// read and write deadlines
|
||||
rdeadline, wdeadline deadline
|
||||
|
||||
// owned by fd wait server
|
||||
ncr, ncw int
|
||||
|
|
@ -50,6 +51,31 @@ type netFD struct {
|
|||
pollServer *pollServer
|
||||
}
|
||||
|
||||
// deadline is an atomically-accessed number of nanoseconds since 1970
|
||||
// or 0, if no deadline is set.
|
||||
type deadline int64
|
||||
|
||||
func (d *deadline) expired() bool {
|
||||
t := d.value()
|
||||
return t > 0 && time.Now().UnixNano() >= t
|
||||
}
|
||||
|
||||
func (d *deadline) value() int64 {
|
||||
return atomic.LoadInt64((*int64)(d))
|
||||
}
|
||||
|
||||
func (d *deadline) set(v int64) {
|
||||
atomic.StoreInt64((*int64)(d), v)
|
||||
}
|
||||
|
||||
func (d *deadline) setTime(t time.Time) {
|
||||
if t.IsZero() {
|
||||
d.set(0)
|
||||
} else {
|
||||
d.set(t.UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
// A pollServer helps FDs determine when to retry a non-blocking
|
||||
// read or write after they get EAGAIN. When an FD needs to wait,
|
||||
// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server.
|
||||
|
|
@ -82,11 +108,11 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
|||
key := intfd << 1
|
||||
if mode == 'r' {
|
||||
fd.ncr++
|
||||
t = fd.rdeadline
|
||||
t = fd.rdeadline.value()
|
||||
} else {
|
||||
fd.ncw++
|
||||
key++
|
||||
t = fd.wdeadline
|
||||
t = fd.wdeadline.value()
|
||||
}
|
||||
s.pending[key] = fd
|
||||
doWakeup := false
|
||||
|
|
@ -153,12 +179,8 @@ func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *pollServer) Now() int64 {
|
||||
return time.Now().UnixNano()
|
||||
}
|
||||
|
||||
func (s *pollServer) CheckDeadlines() {
|
||||
now := s.Now()
|
||||
now := time.Now().UnixNano()
|
||||
// TODO(rsc): This will need to be handled more efficiently,
|
||||
// probably with a heap indexed by wakeup time.
|
||||
|
||||
|
|
@ -172,9 +194,9 @@ func (s *pollServer) CheckDeadlines() {
|
|||
mode = 'w'
|
||||
}
|
||||
if mode == 'r' {
|
||||
t = fd.rdeadline
|
||||
t = fd.rdeadline.value()
|
||||
} else {
|
||||
t = fd.wdeadline
|
||||
t = fd.wdeadline.value()
|
||||
}
|
||||
if t > 0 {
|
||||
if t <= now {
|
||||
|
|
@ -198,15 +220,15 @@ func (s *pollServer) Run() {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for {
|
||||
var t = s.deadline
|
||||
if t > 0 {
|
||||
t = t - s.Now()
|
||||
if t <= 0 {
|
||||
var timeout int64 // nsec to wait for or 0 for none
|
||||
if s.deadline > 0 {
|
||||
timeout = s.deadline - time.Now().UnixNano()
|
||||
if timeout <= 0 {
|
||||
s.CheckDeadlines()
|
||||
continue
|
||||
}
|
||||
}
|
||||
fd, mode, err := s.poll.WaitFD(s, t)
|
||||
fd, mode, err := s.poll.WaitFD(s, timeout)
|
||||
if err != nil {
|
||||
print("pollServer WaitFD: ", err.Error(), "\n")
|
||||
return
|
||||
|
|
@ -417,11 +439,9 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
defer fd.decref()
|
||||
for {
|
||||
if fd.rdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.rdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.rdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
n, err = syscall.Read(int(fd.sysfd), p)
|
||||
if err != nil {
|
||||
|
|
@ -449,11 +469,9 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
|
|||
}
|
||||
defer fd.decref()
|
||||
for {
|
||||
if fd.rdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.rdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.rdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
|
||||
if err != nil {
|
||||
|
|
@ -481,15 +499,13 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
|
|||
}
|
||||
defer fd.decref()
|
||||
for {
|
||||
if fd.rdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.rdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.rdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
|
||||
if err != nil {
|
||||
// TODO(dfc) should n and oobn be set to nil
|
||||
// TODO(dfc) should n and oobn be set to 0
|
||||
if err == syscall.EAGAIN {
|
||||
if err = fd.pollServer.WaitRead(fd); err == nil {
|
||||
continue
|
||||
|
|
@ -512,21 +528,17 @@ func chkReadErr(n int, err error, fd *netFD) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (fd *netFD) Write(p []byte) (int, error) {
|
||||
func (fd *netFD) Write(p []byte) (nn int, err error) {
|
||||
fd.wio.Lock()
|
||||
defer fd.wio.Unlock()
|
||||
if err := fd.incref(false); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer fd.decref()
|
||||
var err error
|
||||
nn := 0
|
||||
for {
|
||||
if fd.wdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.wdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.wdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
var n int
|
||||
n, err = syscall.Write(int(fd.sysfd), p[nn:])
|
||||
|
|
@ -564,11 +576,9 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
|
|||
}
|
||||
defer fd.decref()
|
||||
for {
|
||||
if fd.wdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.wdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.wdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
err = syscall.Sendto(fd.sysfd, p, 0, sa)
|
||||
if err == syscall.EAGAIN {
|
||||
|
|
@ -594,11 +604,9 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
|
|||
}
|
||||
defer fd.decref()
|
||||
for {
|
||||
if fd.wdeadline > 0 {
|
||||
if time.Now().UnixNano() >= fd.wdeadline {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
if fd.wdeadline.expired() {
|
||||
err = errTimeout
|
||||
break
|
||||
}
|
||||
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
|
||||
if err == syscall.EAGAIN {
|
||||
|
|
|
|||
|
|
@ -285,11 +285,39 @@ type netFD struct {
|
|||
errnoc [2]chan error // read/write submit or cancel operation errors
|
||||
closec chan bool // used by Close to cancel pending IO
|
||||
|
||||
// owned by client
|
||||
rdeadline int64
|
||||
rio sync.Mutex
|
||||
wdeadline int64
|
||||
wio sync.Mutex
|
||||
// serialize access to Read and Write methods
|
||||
rio, wio sync.Mutex
|
||||
|
||||
// read and write deadlines
|
||||
rdeadline, wdeadline deadline
|
||||
}
|
||||
|
||||
// deadline is a number of nanoseconds since 1970 or 0, if no deadline is set.
|
||||
// For compatability, deadline has the same method set as fd_unix.go, but
|
||||
// does not use atomic operations as it is not known if data races exist on
|
||||
// these values.
|
||||
// TODO(dfc,brainman) when we get a windows race builder, revisit this.
|
||||
type deadline int64
|
||||
|
||||
func (d *deadline) expired() bool {
|
||||
t := d.value()
|
||||
return t > 0 && time.Now().UnixNano() >= t
|
||||
}
|
||||
|
||||
func (d *deadline) value() int64 {
|
||||
return int64(*d)
|
||||
}
|
||||
|
||||
func (d *deadline) set(v int64) {
|
||||
*d = deadline(v)
|
||||
}
|
||||
|
||||
func (d *deadline) setTime(t time.Time) {
|
||||
if t.IsZero() {
|
||||
d.set(0)
|
||||
} else {
|
||||
d.set(t.UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD {
|
||||
|
|
@ -422,7 +450,7 @@ func (fd *netFD) Read(buf []byte) (int, error) {
|
|||
defer fd.rio.Unlock()
|
||||
var o readOp
|
||||
o.Init(fd, buf, 'r')
|
||||
n, err := iosrv.ExecIO(&o, fd.rdeadline)
|
||||
n, err := iosrv.ExecIO(&o, fd.rdeadline.value())
|
||||
if err == nil && n == 0 {
|
||||
err = io.EOF
|
||||
}
|
||||
|
|
@ -459,7 +487,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
|
|||
var o readFromOp
|
||||
o.Init(fd, buf, 'r')
|
||||
o.rsan = int32(unsafe.Sizeof(o.rsa))
|
||||
n, err = iosrv.ExecIO(&o, fd.rdeadline)
|
||||
n, err = iosrv.ExecIO(&o, fd.rdeadline.value())
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
|
@ -491,7 +519,7 @@ func (fd *netFD) Write(buf []byte) (int, error) {
|
|||
defer fd.wio.Unlock()
|
||||
var o writeOp
|
||||
o.Init(fd, buf, 'w')
|
||||
return iosrv.ExecIO(&o, fd.wdeadline)
|
||||
return iosrv.ExecIO(&o, fd.wdeadline.value())
|
||||
}
|
||||
|
||||
// WriteTo to network.
|
||||
|
|
@ -523,7 +551,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
|
|||
var o writeToOp
|
||||
o.Init(fd, buf, 'w')
|
||||
o.sa = sa
|
||||
return iosrv.ExecIO(&o, fd.wdeadline)
|
||||
return iosrv.ExecIO(&o, fd.wdeadline.value())
|
||||
}
|
||||
|
||||
// Accept new network connections.
|
||||
|
|
@ -572,7 +600,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
|
|||
var o acceptOp
|
||||
o.Init(fd, 'r')
|
||||
o.newsock = s
|
||||
_, err = iosrv.ExecIO(&o, fd.rdeadline)
|
||||
_, err = iosrv.ExecIO(&o, fd.rdeadline.value())
|
||||
if err != nil {
|
||||
closesocket(s)
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
|
|||
if n == 0 && err1 == nil {
|
||||
break
|
||||
}
|
||||
if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
|
||||
if err1 == syscall.EAGAIN {
|
||||
if err1 = c.pollServer.WaitWrite(c); err1 == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
|
|||
if n == 0 && err1 == nil {
|
||||
break
|
||||
}
|
||||
if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
|
||||
if err1 == syscall.EAGAIN {
|
||||
if err1 = c.pollServer.WaitWrite(c); err1 == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,16 +57,14 @@ func socket(net string, f, t, p int, ipv6only bool, ulsa, ursa syscall.Sockaddr,
|
|||
}
|
||||
|
||||
if ursa != nil {
|
||||
if !deadline.IsZero() {
|
||||
fd.wdeadline = deadline.UnixNano()
|
||||
}
|
||||
fd.wdeadline.setTime(deadline)
|
||||
if err = fd.connect(ursa); err != nil {
|
||||
closesocket(s)
|
||||
fd.Close()
|
||||
return nil, err
|
||||
}
|
||||
fd.isConnected = true
|
||||
fd.wdeadline = 0
|
||||
fd.wdeadline.set(0)
|
||||
}
|
||||
|
||||
lsa, _ := syscall.Getsockname(s)
|
||||
|
|
|
|||
|
|
@ -119,29 +119,22 @@ func setWriteBuffer(fd *netFD, bytes int) error {
|
|||
return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes))
|
||||
}
|
||||
|
||||
// TODO(dfc) these unused error returns could be removed
|
||||
|
||||
func setReadDeadline(fd *netFD, t time.Time) error {
|
||||
if t.IsZero() {
|
||||
fd.rdeadline = 0
|
||||
} else {
|
||||
fd.rdeadline = t.UnixNano()
|
||||
}
|
||||
fd.rdeadline.setTime(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
func setWriteDeadline(fd *netFD, t time.Time) error {
|
||||
if t.IsZero() {
|
||||
fd.wdeadline = 0
|
||||
} else {
|
||||
fd.wdeadline = t.UnixNano()
|
||||
}
|
||||
fd.wdeadline.setTime(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
func setDeadline(fd *netFD, t time.Time) error {
|
||||
if err := setReadDeadline(fd, t); err != nil {
|
||||
return err
|
||||
}
|
||||
return setWriteDeadline(fd, t)
|
||||
setReadDeadline(fd, t)
|
||||
setWriteDeadline(fd, t)
|
||||
return nil
|
||||
}
|
||||
|
||||
func setKeepAlive(fd *netFD, keepalive bool) error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue