runtime: clean up timer state

The timers had evolved to the point where the state was stored as follows:

	if timer in heap:
	    state has timerHeaped set
	    if heap timer is stale:
	        heap deadline in t.when
	        real deadline in t.nextWhen
	        state has timerNextWhen set
	    else:
	        real deadline in t.when
	        t.nextWhen unset
	else:
	    real deadline in t.when
	    t.nextWhen unset

That made it hard to find the real deadline and generally hard to
reason about the code.
The new state is:

	real deadline in t.when (always)
	if timer in heap:
	    state has timerHeaped set
	    heap deadline in t.whenHeap
	    if heap timer is stale:
	        state has timerModified set
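
For illustration, here is how finding the real deadline changes.
This is only a sketch with stand-in declarations, not code from
this CL; the field and bit names follow the descriptions above:

	type timer struct {
		state    uint8
		when     int64
		nextWhen int64 // old layout only
	}

	const (
		timerHeaped   uint8 = 1 << iota // timer is in some P's heap
		timerNextWhen                   // old layout: pending when update in t.nextWhen
	)

	// Old layout: the real deadline depends on the state bits.
	func realDeadlineOld(t *timer) int64 {
		if t.state&timerHeaped != 0 && t.state&timerNextWhen != 0 {
			return t.nextWhen // t.when holds the stale heap deadline
		}
		return t.when
	}

	// New layout: t.when always holds the real deadline.
	func realDeadlineNew(t *timer) int64 {
		return t.when
	}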

Separately, the 'state' word itself was serving as both a lock
and a set of state bits, because the code started with CAS loops
that were abstracted into the lock/unlock methods step by step.
At this point, we can switch to a real lock, taking care to
publish the one boolean needed by the timer fast paths
at each unlock.
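
Concretely, the new unlock (shown in full in the diff below)
publishes a snapshot of the state bits before releasing the mutex:

	// unlock updates t.astate and unlocks the timer.
	func (t *timer) unlock() {
		// Let heap fast paths know whether t.whenHeap is accurate.
		t.astate.Store(t.state)
		unlock(&t.mu)
	}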

All of this simplifies the timer logic considerably.

Change-Id: I35766204f7a26d999206bd56cc0db60ad1b17cbe
Reviewed-on: https://go-review.googlesource.com/c/go/+/570335
Auto-Submit: Russ Cox <rsc@golang.org>
Reviewed-by: Austin Clements <austin@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>

@@ -361,10 +361,10 @@ func (s *scavengerState) init() {
s.g = getg()
s.timer = new(timer)
s.timer.arg = s
s.timer.f = func(s any, _ uintptr) {
f := func(s any, _ uintptr) {
s.(*scavengerState).wake()
}
s.timer.init(f, s)
// input: fraction of CPU time actually used.
// setpoint: ideal CPU fraction.
@@ -497,7 +497,7 @@ func (s *scavengerState) sleep(worked float64) {
// because we can't close over any variables without
// failing escape analysis.
start := nanotime()
s.timer.reset(start + sleepTime)
s.timer.reset(start+sleepTime, 0)
// Mark ourselves as asleep and go to sleep.
s.parked = true

@@ -104,10 +104,12 @@ type pollDesc struct {
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer (set if rt.f != nil)
rt timer // read deadline timer
rrun bool // whether rt is running
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wrun bool // whether wt is running
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
@@ -392,15 +394,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
if combo {
rtf = netpollDeadline
}
if pd.rt.f == nil {
if !pd.rrun {
if pd.rd > 0 {
pd.rt.f = rtf
// Copy current seq into the timer arg.
// Timer func will check the seq against current descriptor seq,
// if they differ the descriptor was reused or timers were reset.
pd.rt.arg = pd.makeArg()
pd.rt.seq = pd.rseq
pd.rt.reset(pd.rd)
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
pd.rrun = true
}
} else if pd.rd != rd0 || combo != combo0 {
pd.rseq++ // invalidate current timers
@@ -408,15 +408,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
} else {
pd.rt.stop()
pd.rt.f = nil
pd.rrun = false
}
}
if pd.wt.f == nil {
if !pd.wrun {
if pd.wd > 0 && !combo {
pd.wt.f = netpollWriteDeadline
pd.wt.arg = pd.makeArg()
pd.wt.seq = pd.wseq
pd.wt.reset(pd.wd)
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
pd.wrun = true
}
} else if pd.wd != wd0 || combo != combo0 {
pd.wseq++ // invalidate current timers
@@ -424,7 +422,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
} else {
pd.wt.stop()
pd.wt.f = nil
pd.wrun = false
}
}
// If we set the new deadline in the past, unblock currently pending IO if any.
@@ -461,13 +459,13 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
delta := int32(0)
rg = netpollunblock(pd, 'r', false, &delta)
wg = netpollunblock(pd, 'w', false, &delta)
if pd.rt.f != nil {
if pd.rrun {
pd.rt.stop()
pd.rt.f = nil
pd.rrun = false
}
if pd.wt.f != nil {
if pd.wrun {
pd.wt.stop()
pd.wt.f = nil
pd.wrun = false
}
unlock(&pd.lock)
if rg != nil {
@@ -634,7 +632,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
delta := int32(0)
var rg *g
if read {
if pd.rd <= 0 || pd.rt.f == nil {
if pd.rd <= 0 || !pd.rrun {
throw("runtime: inconsistent read deadline")
}
pd.rd = -1
@@ -643,7 +641,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
}
var wg *g
if write {
if pd.wd <= 0 || pd.wt.f == nil && !read {
if pd.wd <= 0 || !pd.wrun && !read {
throw("runtime: inconsistent write deadline")
}
pd.wd = -1
@@ -697,13 +695,15 @@ func (c *pollCache) alloc() *pollDesc {
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
lockInit(&pd.lock, lockRankPollDesc)
pd.rt.init(nil, nil)
pd.wt.init(nil, nil)
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}

@@ -506,6 +506,7 @@ type g struct {
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
timer *timer // cached timer for time.Sleep
sleepWhen int64 // when to sleep until
selectDone atomic.Uint32 // are we participating in a select and did someone win the race?
coroarg *coro // argument during coroutine transfers

@@ -17,9 +17,11 @@ import (
func TestSizeof(t *testing.T) {
const _64bit = unsafe.Sizeof(uintptr(0)) == 8
g32bit := uintptr(256)
g32bit := uintptr(264)
if goexperiment.ExecTracer2 {
g32bit = uintptr(260)
// gTraceState changed from 2 uint64, 1 pointer, 1 bool to 2 uint64, 3 uint32.
// On 32-bit, that's one extra word.
g32bit += 4
}
var tests = []struct {
@@ -27,7 +29,7 @@ func TestSizeof(t *testing.T) {
_32bit uintptr // size on 32bit platforms
_64bit uintptr // size on 64bit platforms
}{
{runtime.G{}, g32bit, 424}, // g, but exported for testing
{runtime.G{}, g32bit, 432}, // g, but exported for testing
{runtime.Sudog{}, 56, 88}, // sudog, but exported for testing
}

@@ -19,34 +19,49 @@ import (
//
// A timer is expected to be used by only one client goroutine at a time,
// but there will be concurrent access by the P managing that timer.
// The fundamental state about the timer is managed in the atomic state field,
// including a lock bit to manage access to the other fields.
// The lock bit supports a manual cas-based spin lock that handles
// contention by yielding the OS thread. The expectation is that critical
// sections are very short and contention on the lock bit is low.
// Timer accesses are protected by the lock t.mu, with a snapshot of
// t's state bits published in t.astate to enable certain fast paths to make
// decisions about a timer without acquiring the lock.
type timer struct {
ts *timers
// mu protects reads and writes to all fields, with exceptions noted below.
mu mutex
astate atomic.Uint8 // atomic copy of state bits at last unlock; can be read without lock
state uint8 // state bits
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
// a well-behaved function and not block.
//
// when must be positive on an active timer.
// Timers in heaps are ordered by when.
when int64
period int64
f func(any, uintptr)
arg any
seq uintptr
// nextWhen is the next value for when,
// set if state&timerNextWhen is true.
// In that case, the actual update of when = nextWhen
// must be delayed until the heap can be fixed at the same time.
nextWhen int64
// If non-nil, the timers containing t.
ts *timers
// The state field holds state bits, defined below.
state atomic.Uint32
// whenHeap is a (perhaps outdated) copy of t.when for use
// ordering t within t.ts.heap.
// When t is in a heap but t.whenHeap is outdated,
// the timerModified state bit is set.
// The actual update t.whenHeap = t.when must be
// delayed until the heap can be reordered at the same time
// (meaning t's lock must be held for whenHeap,
// and t.ts's lock must be held for the heap reordering).
// Since writes to whenHeap are protected by two locks (t.mu and t.ts.mu),
// it is permitted to read whenHeap when holding either one.
whenHeap int64
}
// init initializes a newly allocated timer t.
// Any code that allocates a timer must call t.init before using it.
// The arg and f can be set during init, or they can be nil in init
// and set by a future call to t.modify.
func (t *timer) init(f func(any, uintptr), arg any) {
lockInit(&t.mu, lockRankTimer)
t.f = f
t.arg = arg
}
// A timers is a per-P set of timers.
@@ -55,7 +70,7 @@ type timers struct {
// access the timers of another P, so we have to lock.
mu mutex
// heap is the set of timers, ordered by t.when.
// heap is the set of timers, ordered by t.whenHeap.
// Must hold lock to access.
heap []*timer
@@ -69,16 +84,16 @@
// raceCtx is the race context used while executing timer functions.
raceCtx uintptr
// minWhen is the minimum heap[i].when value (= heap[0].when).
// The wakeTime method uses minWhen and minNextWhen to determine
// the next wake time.
// If minWhen = 0, it means there are no timers in the heap.
minWhen atomic.Int64
// minWhenHeap is the minimum heap[i].whenHeap value (= heap[0].whenHeap).
// The wakeTime method uses minWhenHeap and minWhenModified
// to determine the next wake time.
// If minWhenHeap = 0, it means there are no timers in the heap.
minWhenHeap atomic.Int64
// minNextWhen is a lower bound on the minimum
// heap[i].nextWhen over timers with the timerNextWhen bit set.
// If minNextWhen = 0, it means there are no timerNextWhen timers in the heap.
minNextWhen atomic.Int64
// minWhenModified is a lower bound on the minimum
// heap[i].when over timers with the timerModified bit set.
// If minWhenModified = 0, it means there are no timerModified timers in the heap.
minWhenModified atomic.Int64
}
func (ts *timers) lock() {
@@ -99,114 +114,79 @@ func (ts *timers) unlock() {
}
// Timer state field.
// Timers start zeroed, so the zero state should be "unlocked, not in heap".
const (
// timerLocked is set when the timer is locked,
// meaning other goroutines cannot read or write mutable fields.
// Goroutines can still read the state word atomically to see
// what the state was before it was locked.
// The lock is implemented as a cas on the state field with osyield on contention;
// the expectation is very short critical sections with little to no contention.
timerLocked = 1 << iota
// timerHeaped is set when the timer is stored in some P's heap.
timerHeaped
timerHeaped uint8 = 1 << iota
// timerNextWhen is set when a pending change to the timer's when
// field has been stored in t.nextwhen. The change to t.when waits
// until the heap in which the timer appears can also be updated.
// Only set when timerHeaped is also set.
timerNextWhen
// timerModified is set when t.when has been modified but
// t.whenHeap still needs to be updated as well.
// The change to t.whenHeap waits until the heap in which
// the timer appears can be locked and rearranged.
// timerModified is only set when timerHeaped is also set.
timerModified
// timerZombie is set when the timer has been stopped
// but is still present in some P's heap.
// Only set when timerHeaped is also set.
// It is possible for timerNextWhen and timerZombie to both
// It is possible for timerModified and timerZombie to both
// be set, meaning that the timer was modified and then stopped.
timerZombie
)
// lock locks the timer, allowing reading or writing any of the timer fields.
// It returns the current m and the status prior to the lock.
// The caller must call unlock with the same m and an updated status.
func (t *timer) lock() (state uint32, mp *m) {
acquireLockRank(lockRankTimer)
for {
state := t.state.Load()
if state&timerLocked != 0 {
osyield()
continue
}
// Prevent preemption while the timer is locked.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if t.state.CompareAndSwap(state, state|timerLocked) {
return state, mp
}
releasem(mp)
}
func (t *timer) lock() {
lock(&t.mu)
}
// unlock unlocks the timer.
// If mp == nil, the caller is responsible for calling
// releasem(mp) with the mp returned by t.lock.
func (t *timer) unlock(state uint32, mp *m) {
releaseLockRank(lockRankTimer)
if t.state.Load()&timerLocked == 0 {
badTimer()
}
if state&timerLocked != 0 {
badTimer()
}
t.state.Store(state)
if mp != nil {
releasem(mp)
}
// unlock updates t.astate and unlocks the timer.
func (t *timer) unlock() {
// Let heap fast paths know whether t.whenHeap is accurate.
t.astate.Store(t.state)
unlock(&t.mu)
}
// updateHeap updates t.when as directed by state, returning the new state
// and a bool indicating whether the state (and t.when) changed.
// updateHeap updates t.whenHeap as directed by t.state, updating t.state
// and returning a bool indicating whether the state (and t.whenHeap) changed.
// The caller must hold t's lock, or the world can be stopped instead.
// If ts != nil, then ts must be locked, t must be ts.heap[0], and updateHeap
// takes care of moving t within the timers heap to preserve the heap invariants.
// If ts == nil, then t must not be in a heap (or is in a heap that is
// temporarily not maintaining its invariant, such as during timers.adjust).
func (t *timer) updateHeap(state uint32, ts *timers) (newState uint32, updated bool) {
func (t *timer) updateHeap(ts *timers) (updated bool) {
assertWorldStoppedOrLockHeld(&t.mu)
if ts != nil {
if t.ts != ts || t != ts.heap[0] {
badTimer()
}
assertLockHeld(&ts.mu)
}
if state&timerZombie != 0 {
// Take timer out of heap, applying final t.when update first.
state &^= timerHeaped | timerZombie
if state&timerNextWhen != 0 {
state &^= timerNextWhen
t.when = t.nextWhen
if t.state&timerZombie != 0 {
// Take timer out of heap, applying final t.whenHeap update first.
t.state &^= timerHeaped | timerZombie
if t.state&timerModified != 0 {
t.state &^= timerModified
t.whenHeap = t.when
}
if ts != nil {
if t != ts.heap[0] {
badTimer()
}
ts.zombies.Add(-1)
ts.deleteMin()
}
return state, true
return true
}
if state&timerNextWhen != 0 {
// Apply t.when update and move within heap.
state &^= timerNextWhen
t.when = t.nextWhen
if t.state&timerModified != 0 {
// Apply t.whenHeap update and move within heap.
t.state &^= timerModified
t.whenHeap = t.when
// Move t to the right position.
if ts != nil {
if t != ts.heap[0] {
badTimer()
}
ts.siftDown(0)
ts.updateMinWhen()
ts.updateMinWhenHeap()
}
return state, true
return true
}
return state, false
return false
}
// maxWhen is the maximum value for timer's when field.
@@ -233,24 +213,23 @@ func timeSleep(ns int64) {
t := gp.timer
if t == nil {
t = new(timer)
t.init(goroutineReady, gp)
gp.timer = t
}
t.f = goroutineReady
t.arg = gp
t.nextWhen = nanotime() + ns
if t.nextWhen < 0 { // check for overflow.
t.nextWhen = maxWhen
when := nanotime() + ns
if when < 0 { // check for overflow.
when = maxWhen
}
gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceBlockSleep, 1)
gp.sleepWhen = when
gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)
}
// resetForSleep is called after the goroutine is parked for timeSleep.
// We can't call resettimer in timeSleep itself because if this is a short
// We can't call timer.reset in timeSleep itself because if this is a short
// sleep and there are many goroutines then the P can wind up running the
// timer function, goroutineReady, before the goroutine has been parked.
func resetForSleep(gp *g, ut unsafe.Pointer) bool {
t := (*timer)(ut)
t.reset(t.nextWhen)
func resetForSleep(gp *g, _ unsafe.Pointer) bool {
gp.timer.reset(gp.sleepWhen, 0)
return true
}
@@ -269,14 +248,11 @@ type timeTimer struct {
//go:linkname newTimer time.newTimer
func newTimer(when, period int64, f func(any, uintptr), arg any) *timeTimer {
t := new(timeTimer)
t.when = when
t.period = period
t.f = f
t.arg = arg
t.timer.init(nil, nil)
if raceenabled {
racerelease(unsafe.Pointer(&t.timer))
}
t.reset(t.when)
t.modify(when, period, f, arg, 0)
t.init = true
return t
}
@@ -289,26 +265,16 @@ func stopTimer(t *timeTimer) bool {
return t.stop()
}
// resetTimer resets an inactive timer, adding it to the heap.
// resetTimer resets an inactive timer, adding it to the timer heap.
//
// Reports whether the timer was modified before it was run.
//
//go:linkname resetTimer time.resetTimer
func resetTimer(t *timeTimer, when int64) bool {
func resetTimer(t *timeTimer, when, period int64) bool {
if raceenabled {
racerelease(unsafe.Pointer(&t.timer))
}
return t.reset(when)
}
// modTimer modifies an existing timer.
//
//go:linkname modTimer time.modTimer
func modTimer(t *timeTimer, when, period int64) {
if raceenabled {
racerelease(unsafe.Pointer(&t.timer))
}
t.modify(when, period, t.f, t.arg, t.seq)
return t.reset(when, period)
}
// Go runtime.
@@ -335,37 +301,30 @@ func (ts *timers) addHeap(t *timer) {
throw("ts set in timer")
}
t.ts = ts
t.whenHeap = t.when
ts.heap = append(ts.heap, t)
ts.siftUp(len(ts.heap) - 1)
if t == ts.heap[0] {
ts.updateMinWhen()
ts.updateMinWhenHeap()
}
}
// stop deletes the timer t. It may be on some other P, so we can't
// stop stops the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as stopped.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was stopped before it was run.
func (t *timer) stop() bool {
state, mp := t.lock()
pending := false
if state&timerHeaped != 0 {
// Timer is in some heap, but is possibly already stopped
// (indicated by a nextWhen update to 0).
if state&timerNextWhen == 0 || t.nextWhen > 0 {
// Timer pending: stop it.
t.nextWhen = 0
state |= timerNextWhen
pending = true
}
// Mark timer for removal unless already marked.
if state&timerZombie == 0 {
state |= timerZombie
t.lock()
if t.state&timerHeaped != 0 {
t.state |= timerModified
if t.state&timerZombie == 0 {
t.state |= timerZombie
t.ts.zombies.Add(1)
}
}
t.unlock(state, mp)
pending := t.when > 0
t.when = 0
t.unlock()
return pending
}
@@ -387,16 +346,17 @@ func (ts *timers) deleteMin() {
if last > 0 {
ts.siftDown(0)
}
ts.updateMinWhen()
ts.updateMinWhenHeap()
if last == 0 {
// If there are no timers, then clearly there are no timerNextWhen timers.
ts.minNextWhen.Store(0)
// If there are no timers, then clearly there are no timerModified timers.
ts.minWhenModified.Store(0)
}
}
// modify modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
// If f == nil, then t.f, t.arg, and t.seq are not modified.
func (t *timer) modify(when, period int64, f func(any, uintptr), arg any, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
@@ -405,49 +365,39 @@ func (t *timer) modify(when, period int64, f func(any, uintptr), arg any, seq ui
throw("timer period must be non-negative")
}
state, mp := t.lock()
t.lock()
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if f != nil {
t.f = f
t.arg = arg
t.seq = seq
}
if state&timerHeaped == 0 {
// Set up t for insertion but unlock first,
// to avoid lock inversion with timers lock.
// Since t is not in a heap yet, nothing will
// find and modify it until after the ts.add.
t.when = when
t.unlock(state, mp)
wake := false
pending := t.when > 0
t.when = when
if t.state&timerHeaped != 0 {
t.state |= timerModified
if t.state&timerZombie != 0 {
// In the heap but marked for removal (by a Stop).
// Unmark it, since it has been Reset and will be running again.
t.ts.zombies.Add(-1)
t.state &^= timerZombie
}
// Cannot modify t.whenHeap until t.ts is locked.
// See comment in type timer above and in timers.adjust below.
if when < t.whenHeap {
wake = true
t.ts.updateMinWhenModified(when)
}
}
add := t.needsAdd()
t.unlock()
if add {
t.maybeAdd()
return false
}
pending := true // in the heap
if state&timerZombie != 0 {
// In the heap but marked for removal (by a Stop); therefore not pending.
// Unmark it, since it has been Reset and will be running again.
pending = false
t.ts.zombies.Add(-1)
state &^= timerZombie
}
// The timer is in some P's heap (perhaps another P),
// so we can't change the when field.
// If we did, the other P's heap would be out of order.
// So we put the new when value in the nextWhen field
// and set timerNextWhen, leaving the other P set the when
// field when it is prepared to maintain the heap invariant.
t.nextWhen = when
state |= timerNextWhen
earlier := when < t.when
if earlier {
t.ts.updateMinNextWhen(when)
}
t.unlock(state, mp)
// If the new status is earlier, wake up the poller.
if earlier {
if wake {
wakeNetPoller(when)
}
@@ -455,9 +405,10 @@ func (t *timer) modify(when, period int64, f func(any, uintptr), arg any, seq ui
}
// needsAdd reports whether t needs to be added to a timers heap.
func (t *timer) needsAdd(state uint32) bool {
return state&timerHeaped == 0 &&
t.when > 0
// t must be locked.
func (t *timer) needsAdd() bool {
assertLockHeld(&t.mu)
return t.state&timerHeaped == 0 && t.when > 0
}
// maybeAdd adds t to the local timers heap if it needs to be in a heap.
@@ -475,22 +426,21 @@ func (t *timer) needsAdd(state uint32) bool {
// too clever and respect the static ordering.
// (If we don't, we have to change the static lock checking of t and ts.)
//
// Because we are not holding t while acquiring ts,
// concurrent calls to enqueueTimerChan may result in
// concurrent calls to t.maybeAdd, so we cannot assume that
// t is not in a heap on entry to t.maybeAdd.
// Concurrent calls to time.Timer.Reset
// may result in concurrent calls to t.maybeAdd,
// so we cannot assume that t is not in a heap on entry to t.maybeAdd.
func (t *timer) maybeAdd() {
ts := &getg().m.p.ptr().timers
ts.lock()
ts.cleanHead()
state, mp := t.lock()
t.lock()
when := int64(0)
if t.needsAdd(state) {
state |= timerHeaped
ts.addHeap(t)
if t.needsAdd() {
t.state |= timerHeaped
when = t.when
ts.addHeap(t)
}
t.unlock(state, mp)
t.unlock()
ts.unlock()
if when > 0 {
wakeNetPoller(when)
@@ -500,8 +450,8 @@ func (t *timer) maybeAdd() {
// reset resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// Reports whether the timer was active and was stopped.
func (t *timer) reset(when int64) bool {
return t.modify(when, t.period, t.f, t.arg, t.seq)
func (t *timer) reset(when, period int64) bool {
return t.modify(when, period, nil, nil, 0)
}
// cleanHead cleans up the head of the timer queue. This speeds up
@@ -529,14 +479,14 @@ func (ts *timers) cleanHead() {
throw("bad ts")
}
if t.state.Load()&(timerNextWhen|timerZombie) == 0 {
if t.astate.Load()&(timerModified|timerZombie) == 0 {
// Fast path: head of timers does not need adjustment.
return
}
state, mp := t.lock()
state, updated := t.updateHeap(state, ts)
t.unlock(state, mp)
t.lock()
updated := t.updateHeap(ts)
t.unlock()
if !updated {
// Head of timers does not need adjustment.
return
@@ -558,7 +508,8 @@ func (ts *timers) take(src *timers) {
ts.move(src.heap)
src.heap = nil
src.zombies.Store(0)
src.minWhen.Store(0)
src.minWhenHeap.Store(0)
src.minWhenModified.Store(0)
src.len.Store(0)
ts.len.Store(uint32(len(ts.heap)))
}
@@ -570,13 +521,11 @@ func (ts *timers) take(src *timers) {
func (ts *timers) move(timers []*timer) {
assertWorldStopped()
for _, t := range timers {
state := t.state.Load()
t.ts = nil
state, _ = t.updateHeap(state, nil)
if state&timerHeaped != 0 {
t.updateHeap(nil)
if t.state&timerHeaped != 0 {
ts.addHeap(t)
}
t.state.Store(state)
}
}
@@ -593,7 +542,7 @@ func (ts *timers) adjust(now int64, force bool) {
// We'll postpone looking through all the adjusted timers until
// one would actually expire.
if !force {
first := ts.minNextWhen.Load()
first := ts.minWhenModified.Load()
if first == 0 || first > now {
if verifyTimers {
ts.verify()
@@ -602,50 +551,49 @@ func (ts *timers) adjust(now int64, force bool) {
}
}
// minNextWhen is a lower bound on the earliest t.nextWhen
// among the timerNextWhen timers. We want to make it more precise:
// we are going to scan the heap and clean out all the timerNextWhen bits,
// at which point minNextWhen can be set to 0 (indicating none at all).
// minWhenModified is a lower bound on the earliest t.when
// among the timerModified timers. We want to make it more precise:
// we are going to scan the heap and clean out all the timerModified bits,
// at which point minWhenModified can be set to 0 (indicating none at all).
//
// Other P's can be calling ts.wakeTime concurrently, and we'd like to
// keep ts.wakeTime returning an accurate value throughout this entire process.
//
// Setting minNextWhen = 0 *before* the scan could make wakeTime
// return an incorrect value: if minNextWhen < nextWhen, then clearing
// it to 0 will make wakeTime return nextWhen (too late) until the scan finishes.
// To avoid that, we want to set minNextWhen to 0 *after* the scan.
// Setting minWhenModified = 0 *before* the scan could make wakeTime
// return an incorrect value: if minWhenModified < minWhenHeap, then clearing
// it to 0 will make wakeTime return minWhenHeap (too late) until the scan finishes.
// To avoid that, we want to set minWhenModified to 0 *after* the scan.
//
// Setting minNextWhen = 0 *after* the scan could result in missing
// Setting minWhenModified = 0 *after* the scan could result in missing
// concurrent timer modifications in other goroutines; those will lock
// the specific timer, set the timerNextWhen bit, and set t.nextWhen.
// To avoid that, we want to set minNextWhen to 0 *before* the scan.
// the specific timer, set the timerModified bit, and set t.when.
// To avoid that, we want to set minWhenModified to 0 *before* the scan.
//
// The way out of this dilemma is to preserve wakeTime a different way.
// wakeTime is min(minWhen, minNextWhen), and minWhen is protected by
// ts.lock, which we hold, so we can modify it however we like in service
// of keeping wakeTime accurate.
// wakeTime is min(minWhenHeap, minWhenModified), and minWhenHeap
// is protected by ts.lock, which we hold, so we can modify it however we like
// in service of keeping wakeTime accurate.
//
// So we can:
//
// 1. Set minWhen = min(minWhen, minNextWhen)
// 2. Set minNextWhen = 0
// (Other goroutines may modify timers and update minNextWhen now.)
// 1. Set minWhenHeap = min(minWhenHeap, minWhenModified)
// 2. Set minWhenModified = 0
// (Other goroutines may modify timers and update minWhenModified now.)
// 3. Scan timers
// 4. Set minWhen = heap[0].when
// 4. Set minWhenHeap = heap[0].whenHeap
//
// That order preserves a correct value of wakeTime throughout the entire
// operation:
// Step 1 “locks in” an accurate wakeTime even with minNextWhen cleared.
// Step 2 makes sure concurrent nextWhen updates are not lost during the scan.
// Step 3 clears all the old nextWhen values, justifying minNextWhen = 0.
// Step 4 corrects minWhen to a precise value.
// Step 1 “locks in” an accurate wakeTime even with minWhenModified cleared.
// Step 2 makes sure concurrent t.when updates are not lost during the scan.
// Step 3 processes all modified timer values, justifying minWhenModified = 0.
// Step 4 corrects minWhenHeap to a precise value.
//
// The wakeTime method implementation reads minNextWhen *before* minWhen,
// so that if the minNextWhen observes a 0, that means the minWhen
// will include the information about what was zeroed.
ts.minWhen.Store(ts.wakeTime())
ts.minNextWhen.Store(0)
// The wakeTime method implementation reads minWhenModified *before* minWhenHeap,
// so that if the minWhenModified is observed to be 0, that means the minWhenHeap that
// follows will include the information that was zeroed out of it.
ts.minWhenHeap.Store(ts.wakeTime())
ts.minWhenModified.Store(0)
changed := false
for i := 0; i < len(ts.heap); i++ {
@@ -654,17 +602,16 @@ func (ts *timers) adjust(now int64, force bool) {
throw("bad ts")
}
state, mp := t.lock()
if state&timerHeaped == 0 {
t.lock()
if t.state&timerHeaped == 0 {
badTimer()
}
if state&timerZombie != 0 {
if t.state&timerZombie != 0 {
ts.zombies.Add(-1) // updateHeap will return updated=true and we will delete t
}
state, updated := t.updateHeap(state, nil)
if updated {
if t.updateHeap(nil) {
changed = true
if state&timerHeaped == 0 {
if t.state&timerHeaped == 0 {
n := len(ts.heap)
ts.heap[i] = ts.heap[n-1]
ts.heap[n-1] = nil
@@ -673,13 +620,13 @@ func (ts *timers) adjust(now int64, force bool) {
i--
}
}
t.unlock(state, mp)
t.unlock()
}
if changed {
ts.initHeap()
}
ts.updateMinWhen()
ts.updateMinWhenHeap()
if verifyTimers {
ts.verify()
@@ -698,8 +645,8 @@ func (ts *timers) wakeTime() int64 {
// We read minWhen after reading minNextWhen so that
// if we see a cleared minNextWhen, we are guaranteed to see
// the updated minWhen.
nextWhen := ts.minNextWhen.Load()
when := ts.minWhen.Load()
nextWhen := ts.minWhenModified.Load()
when := ts.minWhenHeap.Load()
if when == 0 || (nextWhen != 0 && nextWhen < when) {
when = nextWhen
}
@@ -781,43 +728,49 @@ Redo:
throw("bad ts")
}
if t.state.Load()&(timerNextWhen|timerZombie) == 0 && t.when > now {
if t.astate.Load()&(timerModified|timerZombie) == 0 && t.whenHeap > now {
// Fast path: not ready to run.
// The access of t.when is protected by the caller holding
// The access of t.whenHeap is protected by the caller holding
// ts.lock, even though t itself is unlocked.
return t.when
return t.whenHeap
}
state, mp := t.lock()
state, updated := t.updateHeap(state, ts)
if updated {
t.unlock(state, mp)
t.lock()
if t.updateHeap(ts) {
t.unlock()
goto Redo
}
if state&timerHeaped == 0 {
if t.state&timerHeaped == 0 || t.state&timerModified != 0 {
badTimer()
}
if t.when > now {
// Not ready to run.
t.unlock(state, mp)
t.unlock()
return t.when
}
ts.unlockAndRun(t, now, state, mp)
t.unlockAndRun(now)
assertLockHeld(&ts.mu) // t is unlocked now, but not ts
return 0
}
// unlockAndRun unlocks and runs the timer t.
// If t is in a timer set (t.ts != nil), the caller must have locked the timer set,
// unlockAndRun unlocks and runs the timer t (which must be locked).
// If t is in a timer set (t.ts != nil), the caller must also have locked the timer set,
// and this call will temporarily unlock the timer set while running the timer function.
// unlockAndRun returns with t unlocked and t.ts (re-)locked.
//
//go:systemstack
func (ts *timers) unlockAndRun(t *timer, now int64, state uint32, mp *m) {
assertLockHeld(&ts.mu)
func (t *timer) unlockAndRun(now int64) {
assertLockHeld(&t.mu)
if t.ts != nil {
assertLockHeld(&t.ts.mu)
}
if raceenabled {
// Note that we are running on a system stack,
// so there is no chance of getg().m being reassigned
// out from under us while this function executes.
tsLocal := &getg().m.p.ptr().timers
if tsLocal.raceCtx == 0 {
tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum)
@@ -825,7 +778,7 @@ func (ts *timers) unlockAndRun(t *timer, now int64, state uint32, mp *m) {
raceacquirectx(tsLocal.raceCtx, unsafe.Pointer(t))
}
if state&(timerNextWhen|timerZombie) != 0 {
if t.state&(timerModified|timerZombie) != 0 {
badTimer()
}
@@ -843,18 +796,19 @@ func (ts *timers) unlockAndRun(t *timer, now int64, state uint32, mp *m) {
} else {
next = 0
}
if state&timerHeaped != 0 {
t.nextWhen = next
state |= timerNextWhen
if t.state&timerHeaped != 0 {
t.when = next
t.state |= timerModified
if next == 0 {
state |= timerZombie
t.state |= timerZombie
t.ts.zombies.Add(1)
}
} else {
t.when = next
}
state, _ = t.updateHeap(state, ts)
t.unlock(state, mp)
ts := t.ts
t.updateHeap(ts)
t.unlock()
if raceenabled {
// Temporarily use the current P's racectx for g0.
@@ -865,9 +819,13 @@ func (ts *timers) unlockAndRun(t *timer, now int64, state uint32, mp *m) {
gp.racectx = gp.m.p.ptr().timers.raceCtx
}
ts.unlock()
if ts != nil {
ts.unlock()
}
f(arg, seq)
ts.lock()
if ts != nil {
ts.lock()
}
if raceenabled {
gp := getg()
@@ -888,8 +846,8 @@ func (ts *timers) verify() {
// The heap is 4-ary. See siftupTimer and siftdownTimer.
p := (i - 1) / 4
if t.when < ts.heap[p].when {
print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].when, ", ", i, ": ", t.when, "\n")
if t.whenHeap < ts.heap[p].whenHeap {
print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].whenHeap, ", ", i, ": ", t.whenHeap, "\n")
throw("bad timer heap")
}
}
@@ -899,26 +857,26 @@
}
}
// updateMinWhen sets ts.minWhen to ts.heap[0].when.
// updateMinWhenHeap sets ts.minWhenHeap to ts.heap[0].whenHeap.
// The caller must have locked ts or the world must be stopped.
func (ts *timers) updateMinWhen() {
func (ts *timers) updateMinWhenHeap() {
assertWorldStoppedOrLockHeld(&ts.mu)
if len(ts.heap) == 0 {
ts.minWhen.Store(0)
ts.minWhenHeap.Store(0)
} else {
ts.minWhen.Store(ts.heap[0].when)
ts.minWhenHeap.Store(ts.heap[0].whenHeap)
}
}
// updateMinNextWhen updates ts.minNextWhen to be <= when.
// updateMinWhenModified updates ts.minWhenModified to be <= when.
// ts need not be (and usually is not) locked.
func (ts *timers) updateMinNextWhen(when int64) {
func (ts *timers) updateMinWhenModified(when int64) {
for {
old := ts.minNextWhen.Load()
old := ts.minWhenModified.Load()
if old != 0 && old < when {
return
}
if ts.minNextWhen.CompareAndSwap(old, when) {
if ts.minWhenModified.CompareAndSwap(old, when) {
return
}
}
@@ -963,14 +921,14 @@ func (ts *timers) siftUp(i int) {
if i >= len(t) {
badTimer()
}
when := t[i].when
when := t[i].whenHeap
if when <= 0 {
badTimer()
}
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
if when >= t[p].whenHeap {
break
}
t[i] = t[p]
@@ -989,7 +947,7 @@ func (ts *timers) siftDown(i int) {
if i >= n {
badTimer()
}
when := t[i].when
when := t[i].whenHeap
if when <= 0 {
badTimer()
}
@@ -1000,15 +958,15 @@ func (ts *timers) siftDown(i int) {
if c >= n {
break
}
w := t[c].when
if c+1 < n && t[c+1].when < w {
w = t[c+1].when
w := t[c].whenHeap
if c+1 < n && t[c+1].whenHeap < w {
w = t[c+1].whenHeap
c++
}
if c3 < n {
w3 := t[c3].when
if c3+1 < n && t[c3+1].when < w3 {
w3 = t[c3+1].when
w3 := t[c3].whenHeap
if c3+1 < n && t[c3+1].whenHeap < w3 {
w3 = t[c3+1].whenHeap
c3++
}
if w3 < w {

@@ -955,10 +955,10 @@ func newWakeableSleep() *wakeableSleep {
lockInit(&s.lock, lockRankWakeableSleep)
s.wakeup = make(chan struct{}, 1)
s.timer = new(timer)
s.timer.arg = s
s.timer.f = func(s any, _ uintptr) {
f := func(s any, _ uintptr) {
s.(*wakeableSleep).wake()
}
s.timer.init(f, s)
return s
}
@@ -968,7 +968,7 @@ func newWakeableSleep() *wakeableSleep {
// Must not be called by more than one goroutine at a time and
// must not be called concurrently with close.
func (s *wakeableSleep) sleep(ns int64) {
s.timer.reset(nanotime() + ns)
s.timer.reset(nanotime()+ns, 0)
lock(&s.lock)
if raceenabled {
raceacquire(unsafe.Pointer(&s.lock))

@@ -36,10 +36,7 @@ func newTimer(when, period int64, f func(any, uintptr), arg any) *Timer
func stopTimer(*Timer) bool
//go:linkname resetTimer
func resetTimer(*Timer, int64) bool
//go:linkname modTimer
func modTimer(t *Timer, when, period int64)
func resetTimer(t *Timer, when, period int64) bool
// Note: The runtime knows the layout of struct Timer, since newTimer allocates it.
// The runtime also knows that Ticker and Timer have the same layout.
@@ -132,7 +129,7 @@ func (t *Timer) Reset(d Duration) bool {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(t, w)
return resetTimer(t, w, 0)
}
// sendTime does a non-blocking send of the current time on c.

@@ -60,7 +60,7 @@ func (t *Ticker) Reset(d Duration) {
if !t.initTicker {
panic("time: Reset called on uninitialized Ticker")
}
modTimer((*Timer)(unsafe.Pointer(t)), when(d), int64(d))
resetTimer((*Timer)(unsafe.Pointer(t)), when(d), int64(d))
}
// Tick is a convenience wrapper for NewTicker providing access to the ticking