diff --git a/src/internal/synctest/synctest_test.go b/src/internal/synctest/synctest_test.go index 53c7c89716..c2f84be736 100644 --- a/src/internal/synctest/synctest_test.go +++ b/src/internal/synctest/synctest_test.go @@ -226,8 +226,8 @@ func TestTimerNondeterminism(t *testing.T) { const iterations = 1000 var seen1, seen2 bool for range iterations { - tm1 := time.NewTimer(0) - tm2 := time.NewTimer(0) + tm1 := time.NewTimer(1) + tm2 := time.NewTimer(1) select { case <-tm1.C: seen1 = true @@ -278,6 +278,57 @@ func TestSleepNondeterminism(t *testing.T) { }) } +// TestTimerRunsImmediately verifies that a 0-duration timer sends on its channel +// without waiting for the bubble to block. +func TestTimerRunsImmediately(t *testing.T) { + synctest.Run(func() { + start := time.Now() + tm := time.NewTimer(0) + select { + case got := <-tm.C: + if !got.Equal(start) { + t.Errorf("<-tm.C = %v, want %v", got, start) + } + default: + t.Errorf("0-duration timer channel is not readable; want it to be") + } + }) +} + +// TestTimerRunsLater verifies that reading from a timer's channel receives the +// timer fired, even when that time is in reading from a timer's channel receives the +// time the timer fired, even when that time is in the past. +func TestTimerRanInPast(t *testing.T) { + synctest.Run(func() { + delay := 1 * time.Second + want := time.Now().Add(delay) + tm := time.NewTimer(delay) + time.Sleep(2 * delay) + select { + case got := <-tm.C: + if !got.Equal(want) { + t.Errorf("<-tm.C = %v, want %v", got, want) + } + default: + t.Errorf("0-duration timer channel is not readable; want it to be") + } + }) +} + +// TestAfterFuncRunsImmediately verifies that a 0-duration AfterFunc is scheduled +// without waiting for the bubble to block. +func TestAfterFuncRunsImmediately(t *testing.T) { + synctest.Run(func() { + var b atomic.Bool + time.AfterFunc(0, func() { + b.Store(true) + }) + for !b.Load() { + runtime.Gosched() + } + }) +} + func TestChannelFromOutsideBubble(t *testing.T) { choutside := make(chan struct{}) for _, test := range []struct { diff --git a/src/runtime/chan.go b/src/runtime/chan.go index df48267e97..bb554ebfdb 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -497,7 +497,7 @@ func empty(c *hchan) bool { // c.timer is also immutable (it is set after make(chan) but before any channel operations). // All timer channels have dataqsiz > 0. if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } return atomic.Loaduint(&c.qcount) == 0 } @@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) } if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } // Fast path: check for failed non-blocking operation without acquiring the lock. @@ -821,7 +821,7 @@ func chanlen(c *hchan) int { } async := debug.asynctimerchan.Load() != 0 if c.timer != nil && async { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } if c.timer != nil && !async { // timer channels have a buffered implementation diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 5b8db2bee4..37a7b7f684 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -3341,7 +3341,7 @@ top: // which may steal timers. It's important that between now // and then, nothing blocks, so these numbers remain mostly // relevant. - now, pollUntil, _ := pp.timers.check(0) + now, pollUntil, _ := pp.timers.check(0, nil) // Try to schedule the trace reader. if traceEnabled() || traceShuttingDown() { @@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo // timerpMask tells us whether the P may have timers at all. If it // can't, no need to check at all. if stealTimersOrRunNextG && timerpMask.read(enum.position()) { - tnow, w, ran := p2.timers.check(now) + tnow, w, ran := p2.timers.check(now, nil) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w diff --git a/src/runtime/select.go b/src/runtime/select.go index 19256df6a6..ae7754b173 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo } if cas.c.timer != nil { - cas.c.timer.maybeRunChan() + cas.c.timer.maybeRunChan(cas.c) } j := cheaprandn(uint32(norder + 1)) diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go index f676afa20d..c837c792a5 100644 --- a/src/runtime/synctest.go +++ b/src/runtime/synctest.go @@ -185,7 +185,6 @@ func synctestRun(f func()) { } const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01 bubble.now = synctestBaseTime - bubble.timers.bubble = bubble lockInit(&bubble.mu, lockRankSynctest) lockInit(&bubble.timers.mu, lockRankTimers) @@ -213,7 +212,7 @@ func synctestRun(f func()) { // so timer goroutines inherit their child race context from g0. curg := gp.m.curg gp.m.curg = nil - gp.bubble.timers.check(gp.bubble.now) + gp.bubble.timers.check(bubble.now, bubble) gp.m.curg = curg }) gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0) diff --git a/src/runtime/time.go b/src/runtime/time.go index a1f8351a1e..4880dce8cd 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -157,8 +157,6 @@ type timers struct { // heap[i].when over timers with the timerModified bit set. // If minWhenModified = 0, it means there are no timerModified timers in the heap. minWhenModified atomic.Int64 - - bubble *synctestBubble } type timerWhen struct { @@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg throw("invalid timer channel: no capacity") } } - if gr := getg().bubble; gr != nil { + if bubble := getg().bubble; bubble != nil { t.isFake = true } t.modify(when, period, f, arg, 0) @@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() { // timer ourselves now is fine.) if now := nanotime(); t.when <= now { systemstack(func() { - t.unlockAndRun(now) // resets t.when + t.unlockAndRun(now, nil) // resets t.when }) t.lock() } @@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in add := t.needsAdd() + if add && t.isFake { + // If this is a bubbled timer scheduled to fire immediately, + // run it now rather than waiting for the bubble's timer scheduler. + // This avoids deferring timer execution until after the bubble + // becomes durably blocked. + // + // Don't do this for non-bubbled timers: It isn't necessary, + // and there may be cases where the runtime executes timers with + // the expectation the timer func will not run in the current goroutine. + // Bubbled timers are always created by the time package, and are + // safe to run in the current goroutine. + bubble := getg().bubble + if bubble == nil { + throw("fake timer executing with no bubble") + } + if t.state&timerHeaped == 0 && when <= bubble.now { + systemstack(func() { + t.unlockAndRun(bubble.now, bubble) + }) + return pending + } + } + if !async && t.isChan { // Stop any future sends with stale values. // See timer.unlockAndRun. @@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in // t must be locked. func (t *timer) needsAdd() bool { assertLockHeld(&t.mu) - need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0) + need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) if need { t.trace("needsAdd+") } else { @@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 { // We pass now in and out to avoid extra calls of nanotime. // //go:yeswritebarrierrec -func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { +func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) { ts.trace("check") // If it's not yet time for the first timer, or the first adjusted // timer, then there is nothing to do. @@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { ts.adjust(now, false) for len(ts.heap) > 0 { // Note that runtimer may temporarily unlock ts. - if tw := ts.run(now); tw != 0 { + if tw := ts.run(now, bubble); tw != 0 { if tw > 0 { pollUntil = tw } @@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { // If a timer is run, this will temporarily unlock ts. // //go:systemstack -func (ts *timers) run(now int64) int64 { +func (ts *timers) run(now int64, bubble *synctestBubble) int64 { ts.trace("run") assertLockHeld(&ts.mu) Redo: @@ -1081,7 +1102,7 @@ Redo: return t.when } - t.unlockAndRun(now) + t.unlockAndRun(now, bubble) assertLockHeld(&ts.mu) // t is unlocked now, but not ts return 0 } @@ -1092,7 +1113,7 @@ Redo: // unlockAndRun returns with t unlocked and t.ts (re-)locked. // //go:systemstack -func (t *timer) unlockAndRun(now int64) { +func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) { t.trace("unlockAndRun") assertLockHeld(&t.mu) if t.ts != nil { @@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) { // out from under us while this function executes. gp := getg() var tsLocal *timers - if t.ts == nil || t.ts.bubble == nil { + if bubble == nil { tsLocal = &gp.m.p.ptr().timers } else { - tsLocal = &t.ts.bubble.timers + tsLocal = &bubble.timers } if tsLocal.raceCtx == 0 { tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum) @@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) { if gp.racectx != 0 { throw("unexpected racectx") } - if ts == nil || ts.bubble == nil { + if bubble == nil { gp.racectx = gp.m.p.ptr().timers.raceCtx } else { - gp.racectx = ts.bubble.timers.raceCtx + gp.racectx = bubble.timers.raceCtx } } @@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) { ts.unlock() } - if ts != nil && ts.bubble != nil { + if bubble != nil { // Temporarily use the timer's synctest group for the G running this timer. gp := getg() if gp.bubble != nil { throw("unexpected syncgroup set") } - gp.bubble = ts.bubble - ts.bubble.changegstatus(gp, _Gdead, _Grunning) + gp.bubble = bubble + bubble.changegstatus(gp, _Gdead, _Grunning) } if !async && t.isChan { @@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) { unlock(&t.sendLock) } - if ts != nil && ts.bubble != nil { + if bubble != nil { gp := getg() - ts.bubble.changegstatus(gp, _Grunning, _Gdead) + bubble.changegstatus(gp, _Grunning, _Gdead) if raceenabled { // Establish a happens-before between this timer event and // the next synctest.Wait call. - racereleasemergeg(gp, ts.bubble.raceaddr()) + racereleasemergeg(gp, bubble.raceaddr()) } gp.bubble = nil } @@ -1415,24 +1436,10 @@ func badTimer() { // maybeRunChan checks whether the timer needs to run // to send a value to its associated channel. If so, it does. // The timer must not be locked. -func (t *timer) maybeRunChan() { - if t.isFake { - t.lock() - var timerBubble *synctestBubble - if t.ts != nil { - timerBubble = t.ts.bubble - } - t.unlock() - bubble := getg().bubble - if bubble == nil { - panic(plainError("synctest timer accessed from outside bubble")) - } - if timerBubble != nil && bubble != timerBubble { - panic(plainError("timer moved between synctest bubbles")) - } - // No need to do anything here. - // synctest.Run will run the timer when it advances its fake clock. - return +func (t *timer) maybeRunChan(c *hchan) { + if t.isFake && getg().bubble != c.bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } if t.astate.Load()&timerHeaped != 0 { // If the timer is in the heap, the ordinary timer code @@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() { t.lock() now := nanotime() + if t.isFake { + now = getg().bubble.now + } if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { t.trace("maybeRunChan-") // Timer in the heap, or not running at all, or not triggered. @@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() { } t.trace("maybeRunChan+") systemstack(func() { - t.unlockAndRun(now) + t.unlockAndRun(now, c.bubble) }) } @@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() { // adding it if needed. func blockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return + if t.isFake && c.bubble != getg().bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } + t.lock() t.trace("blockTimerChan") if !t.isChan { @@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) { // blocked on it anymore. func unblockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return - } t.lock() t.trace("unblockTimerChan") if !t.isChan || t.blocked == 0 {