mirror of https://github.com/golang/go.git
[release-branch.go1.23] runtime: if stop/reset races with running timer, return correct result
The timer code is careful to ensure that if stop/reset is called
while a timer is being run, we cancel the run. However, the code
failed to ensure that in that case stop/reset returned true,
meaning that the timer had been stopped. In the racing case
stop/reset could see that t.when had been set to zero,
and return false, even though the timer had not and never would fire.
Fix this by tracking whether a timer run is in progress,
and using that to reliably detect that the run was cancelled,
meaning that stop/reset should return true.
For #69312
Fixes #69333
Change-Id: I78e870063eb96650638f12c056e32c931417c84a
Reviewed-on: https://go-review.googlesource.com/c/go/+/611496
Reviewed-by: David Chase <drchase@google.com>
Reviewed-by: Cuong Manh Le <cuong.manhle.vn@gmail.com>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Ian Lance Taylor <iant@golang.org>
(cherry picked from commit 2ebaff4890)
Reviewed-on: https://go-review.googlesource.com/c/go/+/616096
Reviewed-by: Ian Lance Taylor <iant@google.com>
Commit-Queue: Ian Lance Taylor <iant@google.com>
Auto-Submit: Ian Lance Taylor <iant@google.com>
This commit is contained in:
parent
fbddfae62f
commit
3b2e846e11
|
|
@ -26,10 +26,40 @@ type timer struct {
|
|||
// mu protects reads and writes to all fields, with exceptions noted below.
|
||||
mu mutex
|
||||
|
||||
astate atomic.Uint8 // atomic copy of state bits at last unlock
|
||||
state uint8 // state bits
|
||||
isChan bool // timer has a channel; immutable; can be read without lock
|
||||
blocked uint32 // number of goroutines blocked on timer's channel
|
||||
astate atomic.Uint8 // atomic copy of state bits at last unlock
|
||||
state uint8 // state bits
|
||||
isChan bool // timer has a channel; immutable; can be read without lock
|
||||
|
||||
// isSending is used to handle races between running a
|
||||
// channel timer and stopping or resetting the timer.
|
||||
// It is used only for channel timers (t.isChan == true).
|
||||
// The lowest zero bit is set when about to send a value on the channel,
|
||||
// and cleared after sending the value.
|
||||
// The stop/reset code uses this to detect whether it
|
||||
// stopped the channel send.
|
||||
//
|
||||
// An isSending bit is set only when t.mu is held.
|
||||
// An isSending bit is cleared only when t.sendLock is held.
|
||||
// isSending is read only when both t.mu and t.sendLock are held.
|
||||
//
|
||||
// Setting and clearing Uint8 bits handles the case of
|
||||
// a timer that is reset concurrently with unlockAndRun.
|
||||
// If the reset timer runs immediately, we can wind up with
|
||||
// concurrent calls to unlockAndRun for the same timer.
|
||||
// Using matched bit set and clear in unlockAndRun
|
||||
// ensures that the value doesn't get temporarily out of sync.
|
||||
//
|
||||
// We use a uint8 to keep the timer struct small.
|
||||
// This means that we can only support up to 8 concurrent
|
||||
// runs of a timer, where a concurrent run can only occur if
|
||||
// we start a run, unlock the timer, the timer is reset to a new
|
||||
// value (or the ticker fires again), it is ready to run,
|
||||
// and it is actually run, all before the first run completes.
|
||||
// Since completing a run is fast, even 2 concurrent timer runs are
|
||||
// nearly impossible, so this should be safe in practice.
|
||||
isSending atomic.Uint8
|
||||
|
||||
blocked uint32 // number of goroutines blocked on timer's channel
|
||||
|
||||
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
|
||||
// each time calling f(arg, seq, delay) in the timer goroutine, so f must be
|
||||
|
|
@ -431,6 +461,15 @@ func (t *timer) stop() bool {
|
|||
// Stop any future sends with stale values.
|
||||
// See timer.unlockAndRun.
|
||||
t.seq++
|
||||
|
||||
// If there is currently a send in progress,
|
||||
// incrementing seq is going to prevent that
|
||||
// send from actually happening. That means
|
||||
// that we should return true: the timer was
|
||||
// stopped, even though t.when may be zero.
|
||||
if t.isSending.Load() > 0 {
|
||||
pending = true
|
||||
}
|
||||
}
|
||||
t.unlock()
|
||||
if !async && t.isChan {
|
||||
|
|
@ -525,6 +564,15 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
|
|||
// Stop any future sends with stale values.
|
||||
// See timer.unlockAndRun.
|
||||
t.seq++
|
||||
|
||||
// If there is currently a send in progress,
|
||||
// incrementing seq is going to prevent that
|
||||
// send from actually happening. That means
|
||||
// that we should return true: the timer was
|
||||
// stopped, even though t.when may be zero.
|
||||
if t.isSending.Load() > 0 {
|
||||
pending = true
|
||||
}
|
||||
}
|
||||
t.unlock()
|
||||
if !async && t.isChan {
|
||||
|
|
@ -1013,6 +1061,24 @@ func (t *timer) unlockAndRun(now int64) {
|
|||
}
|
||||
t.updateHeap()
|
||||
}
|
||||
|
||||
async := debug.asynctimerchan.Load() != 0
|
||||
var isSendingClear uint8
|
||||
if !async && t.isChan {
|
||||
// Tell Stop/Reset that we are sending a value.
|
||||
// Set the lowest zero bit.
|
||||
// We do this awkward step because atomic.Uint8
|
||||
// doesn't support Add or CompareAndSwap.
|
||||
// We only set bits with t locked.
|
||||
v := t.isSending.Load()
|
||||
i := sys.TrailingZeros8(^v)
|
||||
if i == 8 {
|
||||
throw("too many concurrent timer firings")
|
||||
}
|
||||
isSendingClear = 1 << i
|
||||
t.isSending.Or(isSendingClear)
|
||||
}
|
||||
|
||||
t.unlock()
|
||||
|
||||
if raceenabled {
|
||||
|
|
@ -1028,7 +1094,6 @@ func (t *timer) unlockAndRun(now int64) {
|
|||
ts.unlock()
|
||||
}
|
||||
|
||||
async := debug.asynctimerchan.Load() != 0
|
||||
if !async && t.isChan {
|
||||
// For a timer channel, we want to make sure that no stale sends
|
||||
// happen after a t.stop or t.modify, but we cannot hold t.mu
|
||||
|
|
@ -1044,6 +1109,10 @@ func (t *timer) unlockAndRun(now int64) {
|
|||
// and double-check that t.seq is still the seq value we saw above.
|
||||
// If not, the timer has been updated and we should skip the send.
|
||||
// We skip the send by reassigning f to a no-op function.
|
||||
//
|
||||
// The isSending field tells t.stop or t.modify that we have
|
||||
// started to send the value. That lets them correctly return
|
||||
// true meaning that no value was sent.
|
||||
lock(&t.sendLock)
|
||||
if t.seq != seq {
|
||||
f = func(any, uintptr, int64) {}
|
||||
|
|
@ -1053,6 +1122,9 @@ func (t *timer) unlockAndRun(now int64) {
|
|||
f(arg, seq, delay)
|
||||
|
||||
if !async && t.isChan {
|
||||
// We are no longer sending a value.
|
||||
t.isSending.And(^isSendingClear)
|
||||
|
||||
unlock(&t.sendLock)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -785,6 +785,68 @@ func TestAdjustTimers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStopResult(t *testing.T) {
|
||||
testStopResetResult(t, true)
|
||||
}
|
||||
|
||||
func TestResetResult(t *testing.T) {
|
||||
testStopResetResult(t, false)
|
||||
}
|
||||
|
||||
// Test that when racing between running a timer and stopping a timer Stop
|
||||
// consistently indicates whether a value can be read from the channel.
|
||||
// Issue #69312.
|
||||
func testStopResetResult(t *testing.T, testStop bool) {
|
||||
for _, name := range []string{"0", "1", "2"} {
|
||||
t.Run("asynctimerchan="+name, func(t *testing.T) {
|
||||
testStopResetResultGODEBUG(t, testStop, name)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testStopResetResultGODEBUG(t *testing.T, testStop bool, godebug string) {
|
||||
t.Setenv("GODEBUG", "asynctimerchan="+godebug)
|
||||
|
||||
stopOrReset := func(timer *Timer) bool {
|
||||
if testStop {
|
||||
return timer.Stop()
|
||||
} else {
|
||||
return timer.Reset(1 * Hour)
|
||||
}
|
||||
}
|
||||
|
||||
start := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
const N = 1000
|
||||
wg.Add(N)
|
||||
for range N {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
for j := 0; j < 100; j++ {
|
||||
timer1 := NewTimer(1 * Millisecond)
|
||||
timer2 := NewTimer(1 * Millisecond)
|
||||
select {
|
||||
case <-timer1.C:
|
||||
if !stopOrReset(timer2) {
|
||||
// The test fails if this
|
||||
// channel read times out.
|
||||
<-timer2.C
|
||||
}
|
||||
case <-timer2.C:
|
||||
if !stopOrReset(timer1) {
|
||||
// The test fails if this
|
||||
// channel read times out.
|
||||
<-timer1.C
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
close(start)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Benchmark timer latency when the thread that creates the timer is busy with
|
||||
// other work and the timers must be serviced by other threads.
|
||||
// https://golang.org/issue/38860
|
||||
|
|
|
|||
Loading…
Reference in New Issue