diff --git a/src/runtime/export_test.go b/src/runtime/export_test.go
index a89220e0dd..4168705f2a 100644
--- a/src/runtime/export_test.go
+++ b/src/runtime/export_test.go
@@ -1330,7 +1330,7 @@ func (t *SemTable) Enqueue(addr *uint32) {
 //
 // Returns true if there actually was a waiter to be dequeued.
 func (t *SemTable) Dequeue(addr *uint32) bool {
-	s, _ := t.semTable.rootFor(addr).dequeue(addr)
+	s, _, _ := t.semTable.rootFor(addr).dequeue(addr)
 	if s != nil {
 		releaseSudog(s)
 		return true
diff --git a/src/runtime/mprof.go b/src/runtime/mprof.go
index 308ebaebe6..45f107722a 100644
--- a/src/runtime/mprof.go
+++ b/src/runtime/mprof.go
@@ -553,8 +553,6 @@ func mutexevent(cycles int64, skip int) {
 		cycles = 0
 	}
 	rate := int64(atomic.Load64(&mutexprofilerate))
-	// TODO(pjw): measure impact of always calling fastrand vs using something
-	// like malloc.go:nextSample()
 	if rate > 0 && int64(fastrand())%rate == 0 {
 		saveblockevent(cycles, rate, skip+1, mutexProfile)
 	}
diff --git a/src/runtime/pprof/pprof_test.go b/src/runtime/pprof/pprof_test.go
index 1ade860441..9480240615 100644
--- a/src/runtime/pprof/pprof_test.go
+++ b/src/runtime/pprof/pprof_test.go
@@ -1023,7 +1023,7 @@ func containsStack(got [][]string, want []string) bool {
 // awaitBlockedGoroutine spins on runtime.Gosched until a runtime stack dump
 // shows a goroutine in the given state with a stack frame in
 // runtime/pprof..
-func awaitBlockedGoroutine(t *testing.T, state, fName string) {
+func awaitBlockedGoroutine(t *testing.T, state, fName string, count int) {
 	re := fmt.Sprintf(`(?m)^goroutine \d+ \[%s\]:\n(?:.+\n\t.+\n)*runtime/pprof\.%s`, regexp.QuoteMeta(state), fName)
 	r := regexp.MustCompile(re)
@@ -1047,7 +1047,7 @@ func awaitBlockedGoroutine(t *testing.T, state, fName string) {
 			buf = make([]byte, 2*len(buf))
 			continue
 		}
-		if r.Match(buf[:n]) {
+		if len(r.FindAll(buf[:n], -1)) >= count {
 			return
 		}
 	}
@@ -1056,7 +1056,7 @@ func blockChanRecv(t *testing.T) {
 	c := make(chan bool)
 	go func() {
-		awaitBlockedGoroutine(t, "chan receive", "blockChanRecv")
+		awaitBlockedGoroutine(t, "chan receive", "blockChanRecv", 1)
 		c <- true
 	}()
 	<-c
@@ -1065,7 +1065,7 @@ func blockChanSend(t *testing.T) {
 	c := make(chan bool)
 	go func() {
-		awaitBlockedGoroutine(t, "chan send", "blockChanSend")
+		awaitBlockedGoroutine(t, "chan send", "blockChanSend", 1)
 		<-c
 	}()
 	c <- true
@@ -1074,7 +1074,7 @@ func blockChanClose(t *testing.T) {
 	c := make(chan bool)
 	go func() {
-		awaitBlockedGoroutine(t, "chan receive", "blockChanClose")
+		awaitBlockedGoroutine(t, "chan receive", "blockChanClose", 1)
 		close(c)
 	}()
 	<-c
@@ -1086,7 +1086,7 @@ func blockSelectRecvAsync(t *testing.T) {
 	c2 := make(chan bool, 1)
 	go func() {
 		for i := 0; i < numTries; i++ {
-			awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync")
+			awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync", 1)
 			c <- true
 		}
 	}()
@@ -1102,7 +1102,7 @@ func blockSelectSendSync(t *testing.T) {
 	c := make(chan bool)
 	c2 := make(chan bool)
 	go func() {
-		awaitBlockedGoroutine(t, "select", "blockSelectSendSync")
+		awaitBlockedGoroutine(t, "select", "blockSelectSendSync", 1)
 		<-c
 	}()
 	select {
@@ -1115,7 +1115,7 @@ func blockMutex(t *testing.T) {
 	var mu sync.Mutex
 	mu.Lock()
 	go func() {
-		awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex")
+		awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", 1)
 		mu.Unlock()
 	}()
 	// Note: Unlock releases mu before recording the mutex event,
@@ -1125,12 +1125,36 @@ func blockMutex(t *testing.T) {
 	mu.Lock()
 }
 
+func blockMutexN(t *testing.T, n int, d time.Duration) {
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	mu.Lock()
+	go func() {
+		awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", n)
+		time.Sleep(d)
+		mu.Unlock()
+	}()
+	// Note: Unlock releases mu before recording the mutex event,
+	// so it's theoretically possible for this to proceed and
+	// capture the profile before the event is recorded. As long
+	// as this is blocked before the unlock happens, it's okay.
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			mu.Lock()
+			mu.Unlock()
+		}()
+	}
+	wg.Wait()
+}
+
 func blockCond(t *testing.T) {
 	var mu sync.Mutex
 	c := sync.NewCond(&mu)
 	mu.Lock()
 	go func() {
-		awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond")
+		awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond", 1)
 		mu.Lock()
 		c.Signal()
 		mu.Unlock()
@@ -1217,7 +1241,11 @@ func TestMutexProfile(t *testing.T) {
 		t.Fatalf("need MutexProfileRate 0, got %d", old)
 	}
 
-	blockMutex(t)
+	const (
+		N = 100
+		D = 100 * time.Millisecond
+	)
+	blockMutexN(t, N, D)
 
 	t.Run("debug=1", func(t *testing.T) {
 		var w strings.Builder
@@ -1230,15 +1258,11 @@ func TestMutexProfile(t *testing.T) {
 		}
 		prof = strings.Trim(prof, "\n")
 		lines := strings.Split(prof, "\n")
-		if len(lines) != 6 {
-			t.Errorf("expected 6 lines, got %d %q\n%s", len(lines), prof, prof)
-		}
 		if len(lines) < 6 {
-			return
+			t.Fatalf("expected >=6 lines, got %d %q\n%s", len(lines), prof, prof)
 		}
 		// checking that the line is like "35258904 1 @ 0x48288d 0x47cd28 0x458931"
 		r2 := `^\d+ \d+ @(?: 0x[[:xdigit:]]+)+`
-		//r2 := "^[0-9]+ 1 @ 0x[0-9a-f x]+$"
 		if ok, err := regexp.MatchString(r2, lines[3]); err != nil || !ok {
 			t.Errorf("%q didn't match %q", lines[3], r2)
 		}
@@ -1263,12 +1287,30 @@ func TestMutexProfile(t *testing.T) {
 		stks := stacks(p)
 		for _, want := range [][]string{
-			{"sync.(*Mutex).Unlock", "runtime/pprof.blockMutex.func1"},
+			{"sync.(*Mutex).Unlock", "runtime/pprof.blockMutexN.func1"},
 		} {
 			if !containsStack(stks, want) {
 				t.Errorf("No matching stack entry for %+v", want)
 			}
 		}
+
+		i := 0
+		for ; i < len(p.SampleType); i++ {
+			if p.SampleType[i].Unit == "nanoseconds" {
+				break
+			}
+		}
+		if i >= len(p.SampleType) {
+			t.Fatalf("profile did not contain nanoseconds sample")
+		}
+		total := int64(0)
+		for _, s := range p.Sample {
+			total += s.Value[i]
+		}
+		d := time.Duration(total)
+		if d < N*D*9/10 || d > N*D*2 { // want N*D but allow [0.9,2.0]*that.
+			t.Fatalf("profile samples total %v, want %v", d, N*D)
+		}
 	})
 }
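The arithmetic behind the new assertion: each of the N=100 goroutines sits blocked for roughly the D=100ms that the lock is held, so the profile's "nanoseconds" sample type should sum to about N*D = 10s, with a [0.9, 2.0] tolerance for scheduling noise. The following standalone sketch (not part of the patch) shows the same accounting from user code via the public runtime.SetMutexProfileFraction and runtime.MutexProfile API; the time.Sleep doubles as the hold duration and as a crude stand-in for the test's awaitBlockedGoroutine synchronization:

	package main

	import (
		"fmt"
		"runtime"
		"sync"
		"time"
	)

	func main() {
		runtime.SetMutexProfileFraction(1) // sample every contention event

		const (
			n = 100                    // goroutines to block, like the test's N
			d = 100 * time.Millisecond // hold time, like the test's D
		)

		var mu sync.Mutex
		var wg sync.WaitGroup
		mu.Lock()
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				mu.Lock() // all n goroutines pile up here
				mu.Unlock()
			}()
		}
		time.Sleep(d) // crude stand-in for awaitBlockedGoroutine
		mu.Unlock()
		wg.Wait()

		// Sum the recorded contention. The values are in CPU ticks;
		// the runtime converts them to nanoseconds when it serializes
		// the pprof profile. Once all waiters drain, the total should
		// correspond to roughly n*d of goroutine delay (~10s here).
		nr, _ := runtime.MutexProfile(nil)
		recs := make([]runtime.BlockProfileRecord, nr+10)
		nr, _ = runtime.MutexProfile(recs)
		var cycles int64
		for _, r := range recs[:nr] {
			cycles += r.Cycles
		}
		fmt.Printf("total contention: %d cycles across %d records\n", cycles, nr)
	}
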
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index 5017a7a80a..2a02e1fb3b 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -342,7 +342,7 @@ type gobuf struct {
 	bp   uintptr // for framepointer-enabled architectures
 }
 
-// sudog represents a g in a wait list, such as for sending/receiving
+// sudog (pseudo-g) represents a g in a wait list, such as for sending/receiving
 // on a channel.
 //
 // sudog is necessary because the g ↔ synchronization object relation
@@ -382,6 +382,13 @@ type sudog struct {
 	// because c was closed.
 	success bool
 
+	// waiters is a count of semaRoot waiting list other than head of list,
+	// clamped to a uint16 to fit in unused space.
+	// Only meaningful at the head of the list.
+	// (If we wanted to be overly clever, we could store a high 16 bits
+	// in the second entry in the list.)
+	waiters uint16
+
 	parent   *sudog // semaRoot binary tree
 	waitlink *sudog // g.waiting list or semaRoot
 	waittail *sudog // semaRoot
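Because waiters is only a uint16, the patch maintains it with a saturating increment: queue (below, in sema.go) only increments while waiters+1 does not wrap to zero, so the field sticks at 65535 instead of overflowing back to a small number. A minimal demonstration of that guard:

	package main

	import "fmt"

	func main() {
		w := uint16(65534)
		for i := 0; i < 3; i++ {
			if w+1 != 0 { // the guard used in semaRoot.queue
				w++
			}
			fmt.Println(w) // 65535, 65535, 65535: pinned at the max, no wraparound
		}
	}

The decrement in dequeue is likewise guarded (t.waiters > 1), so the count never wraps in the other direction either.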
diff --git a/src/runtime/sema.go b/src/runtime/sema.go
index d0a81170c3..3b6874ca11 100644
--- a/src/runtime/sema.go
+++ b/src/runtime/sema.go
@@ -191,7 +191,7 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
 		unlock(&root.lock)
 		return
 	}
-	s, t0 := root.dequeue(addr)
+	s, t0, tailtime := root.dequeue(addr)
 	if s != nil {
 		root.nwait.Add(-1)
 	}
@@ -199,7 +199,28 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
 	if s != nil { // May be slow or even yield, so unlock first
 		acquiretime := s.acquiretime
 		if acquiretime != 0 {
-			mutexevent(t0-acquiretime, 3+skipframes)
+			// Charge contention that this (delayed) unlock caused.
+			// If there are N more goroutines waiting beyond the
+			// one that's waking up, charge their delay as well, so that
+			// contention holding up many goroutines shows up as
+			// more costly than contention holding up a single goroutine.
+			// It would take O(N) time to calculate how long each goroutine
+			// has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
+			// head-wait is the longest wait and tail-wait is the shortest.
+			// (When we do a lifo insertion, we preserve this property by
+			// copying the old head's acquiretime into the inserted new head.
+			// In that case the overall average may be slightly high, but that's fine:
+			// the average of the ends is only an approximation to the actual
+			// average anyway.)
+			// The root.dequeue above changed the head and tail acquiretime
+			// to the current time, so the next unlock will not re-count this contention.
+			dt0 := t0 - acquiretime
+			dt := dt0
+			if s.waiters != 0 {
+				dtail := t0 - tailtime
+				dt += (dtail + dt0) / 2 * int64(s.waiters)
+			}
+			mutexevent(dt, 3+skipframes)
 		}
 		if s.ticket != 0 {
 			throw("corrupted semaphore ticket")
 		}
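A worked example of the approximation in the comment above, with illustrative numbers that are not from the patch: if the head has waited 100ms, the tail has waited 20ms, and there are 9 waiters beyond the head, the charged total is 100ms + 9*avg(100ms, 20ms) = 640ms, an O(1) stand-in for the O(N) exact sum. The snippet mirrors the dt computation in semrelease1:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		now := time.Now().UnixNano()

		headAcquire := now - int64(100*time.Millisecond) // head: longest wait
		tailAcquire := now - int64(20*time.Millisecond)  // tail: shortest wait
		waiters := int64(9)                              // list entries beyond the head

		dt0 := now - headAcquire // delay of the goroutine being woken
		dt := dt0
		if waiters != 0 {
			dtail := now - tailAcquire
			dt += (dtail + dt0) / 2 * waiters // charge avg(head, tail) per extra waiter
		}
		fmt.Println(time.Duration(dt)) // 640ms = 100ms + 9*avg(100ms, 20ms)
	}
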
@@ -248,6 +269,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 	s.elem = unsafe.Pointer(addr)
 	s.next = nil
 	s.prev = nil
+	s.waiters = 0
 
 	var last *sudog
 	pt := &root.treap
@@ -258,7 +280,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 			// Substitute s in t's place in treap.
 			*pt = s
 			s.ticket = t.ticket
-			s.acquiretime = t.acquiretime
+			s.acquiretime = t.acquiretime // preserve head acquiretime as oldest time
 			s.parent = t.parent
 			s.prev = t.prev
 			s.next = t.next
@@ -274,6 +296,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 			if s.waittail == nil {
 				s.waittail = t
 			}
+			s.waiters = t.waiters
+			if s.waiters+1 != 0 {
+				s.waiters++
+			}
 			t.parent = nil
 			t.prev = nil
 			t.next = nil
@@ -287,6 +313,9 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 			}
 			t.waittail = s
 			s.waitlink = nil
+			if t.waiters+1 != 0 {
+				t.waiters++
+			}
 		}
 		return
 	}
@@ -330,7 +359,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 // in semaRoot blocked on addr.
 // If the sudog was being profiled, dequeue returns the time
 // at which it was woken up as now. Otherwise now is 0.
-func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
+// If there are additional entries in the wait list, dequeue
+// returns tailtime set to the last entry's acquiretime.
+// Otherwise tailtime is found.acquiretime.
+func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
 	ps := &root.treap
 	s := *ps
 	for ; s != nil; s = *ps {
@@ -343,7 +375,7 @@ func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
 			ps = &s.next
 		}
 	}
-	return nil, 0
+	return nil, 0, 0
 
 Found:
 	now = int64(0)
@@ -368,7 +400,16 @@ Found:
 		} else {
 			t.waittail = nil
 		}
+		t.waiters = s.waiters
+		if t.waiters > 1 {
+			t.waiters--
+		}
+		// Set head and tail acquire time to 'now',
+		// because the caller will take care of charging
+		// the delays before now for all entries in the list.
 		t.acquiretime = now
+		tailtime = s.waittail.acquiretime
+		s.waittail.acquiretime = now
 		s.waitlink = nil
 		s.waittail = nil
 	} else {
@@ -390,13 +431,14 @@ Found:
 		} else {
 			root.treap = nil
 		}
+		tailtime = s.acquiretime
 	}
 	s.parent = nil
 	s.elem = nil
 	s.next = nil
 	s.prev = nil
 	s.ticket = 0
-	return s, now
+	return s, now, tailtime
 }
 
 // rotateLeft rotates the tree rooted at node x.
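To see how dequeue's tailtime result and the acquiretime resets combine so that successive releases never double-charge the same interval, here is a toy model. Plain slices of timestamps stand in for the sudog wait list, and the numbers are arbitrary nanosecond values, not runtime data:

	package main

	import "fmt"

	// charge mimics, on a toy wait list of acquire times, how semrelease1
	// and dequeue cooperate: charge avg(head-wait, tail-wait) per extra
	// waiter, then reset the new head and the tail to now so the next
	// release only counts delay accrued after this one.
	func charge(acquire []int64, now int64) (dt int64, rest []int64) {
		dt0 := now - acquire[0]
		dt = dt0
		rest = acquire[1:]
		if n := int64(len(rest)); n > 0 {
			dtail := now - rest[n-1]
			dt += (dtail + dt0) / 2 * n
			rest[0] = now   // new head: fully charged up to now
			rest[n-1] = now // tail: fully charged up to now
		}
		return dt, rest
	}

	func main() {
		list := []int64{0, 10, 20, 30} // acquire times of four waiters
		dt, list := charge(list, 100)
		fmt.Println(dt) // 355, vs a true total wait of 340 (100+90+80+70)
		dt, _ = charge(list, 160)
		fmt.Println(dt) // 180 = exactly the 3*60 accrued since the last release
	}

The first release charges approximately the whole queue's accumulated delay (355 vs a true 340, slightly high as the comment in semrelease1 anticipates); the second charges only what accrued afterwards, because the first release reset the new head's and the tail's clocks to its own timestamp.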