mirror of https://github.com/golang/go.git
sync: scalable Pool
Introduce fixed-size P-local caches. When a local cache overflows or underflows,
a batch of items is transferred to/from the global mutex-protected cache.

benchmark                          old ns/op    new ns/op    delta
BenchmarkPool                      50554        22423        -55.65%
BenchmarkPool-4                    400359       5904         -98.53%
BenchmarkPool-16                   403311       1598         -99.60%
BenchmarkPool-32                   367310       1526         -99.58%
BenchmarkPoolOverlflow             5214         3633         -30.32%
BenchmarkPoolOverlflow-4           42663        9539         -77.64%
BenchmarkPoolOverlflow-8           46919        11385        -75.73%
BenchmarkPoolOverlflow-16          39454        13048        -66.93%
BenchmarkSprintfEmpty              84           63           -25.68%
BenchmarkSprintfEmpty-2            371          32           -91.13%
BenchmarkSprintfEmpty-4            465          22           -95.25%
BenchmarkSprintfEmpty-8            565          12           -97.77%
BenchmarkSprintfEmpty-16           498          5            -98.87%
BenchmarkSprintfEmpty-32           492          4            -99.04%
BenchmarkSprintfString             259          229          -11.58%
BenchmarkSprintfString-2           574          144          -74.91%
BenchmarkSprintfString-4           651          77           -88.05%
BenchmarkSprintfString-8           868          47           -94.48%
BenchmarkSprintfString-16          825          33           -95.96%
BenchmarkSprintfString-32          825          30           -96.28%
BenchmarkSprintfInt                213          188          -11.74%
BenchmarkSprintfInt-2              448          138          -69.20%
BenchmarkSprintfInt-4              624          52           -91.63%
BenchmarkSprintfInt-8              691          31           -95.43%
BenchmarkSprintfInt-16             724          18           -97.46%
BenchmarkSprintfInt-32             718          16           -97.70%
BenchmarkSprintfIntInt             311          282          -9.32%
BenchmarkSprintfIntInt-2           333          145          -56.46%
BenchmarkSprintfIntInt-4           642          110          -82.87%
BenchmarkSprintfIntInt-8           832          42           -94.90%
BenchmarkSprintfIntInt-16          817          24           -97.00%
BenchmarkSprintfIntInt-32          805          22           -97.17%
BenchmarkSprintfPrefixedInt        309          269          -12.94%
BenchmarkSprintfPrefixedInt-2      245          168          -31.43%
BenchmarkSprintfPrefixedInt-4      598          99           -83.36%
BenchmarkSprintfPrefixedInt-8      770          67           -91.23%
BenchmarkSprintfPrefixedInt-16     829          54           -93.49%
BenchmarkSprintfPrefixedInt-32     824          50           -93.83%
BenchmarkSprintfFloat              418          398          -4.78%
BenchmarkSprintfFloat-2            295          203          -31.19%
BenchmarkSprintfFloat-4            585          128          -78.12%
BenchmarkSprintfFloat-8            873          60           -93.13%
BenchmarkSprintfFloat-16           884          33           -96.24%
BenchmarkSprintfFloat-32           881          29           -96.62%
BenchmarkManyArgs                  1097         1069         -2.55%
BenchmarkManyArgs-2                705          567          -19.57%
BenchmarkManyArgs-4                792          319          -59.72%
BenchmarkManyArgs-8                963          172          -82.14%
BenchmarkManyArgs-16               1115         103          -90.76%
BenchmarkManyArgs-32               1133         90           -92.03%

LGTM=rsc
R=golang-codereviews, bradfitz, minux.ma, gobot, rsc
CC=golang-codereviews
https://golang.org/cl/46010043
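For context, here is a minimal usage sketch of the Pool API this change speeds up (not part of the commit; the bytes.Buffer reuse and the format helper are illustrative assumptions):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable *bytes.Buffer values; New is called only when
// Get finds nothing cached.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func format(s string) string {
	b := bufPool.Get().(*bytes.Buffer)
	b.Reset()
	fmt.Fprintf(b, "[%s]", s)
	out := b.String()
	bufPool.Put(b) // hand the buffer back for reuse by any goroutine
	return out
}

func main() {
	fmt.Println(format("hello"))
}

With the old implementation every Get and Put took a single global mutex; after this change most calls stay inside a small per-P cache and only occasional batch transfers touch the mutex-protected global list, which is where the benchmark deltas above come from.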
This commit is contained in:
parent 9fa9613e0b
commit f8e0057bb7
@@ -29,7 +29,7 @@ var pkgDeps = map[string][]string{
 	"errors":      {},
 	"io":          {"errors", "sync"},
 	"runtime":     {"unsafe"},
-	"sync":        {"sync/atomic", "unsafe"},
+	"sync":        {"runtime", "sync/atomic", "unsafe"},
 	"sync/atomic": {"unsafe"},
 	"unsafe":      {},
 
@@ -68,15 +68,19 @@ clearpools(void)
 {
 	void **pool, **next;
 	P *p, **pp;
+	uintptr off;
 	int32 i;
 
 	// clear sync.Pool's
 	for(pool = pools.head; pool != nil; pool = next) {
 		next = pool[0];
 		pool[0] = nil; // next
-		pool[1] = nil; // slice
-		pool[2] = nil;
-		pool[3] = nil;
+		pool[1] = nil; // local
+		pool[2] = nil; // localSize
+		off = (uintptr)pool[3] / sizeof(void*);
+		pool[off+0] = nil; // global slice
+		pool[off+1] = nil;
+		pool[off+2] = nil;
 	}
 	pools.head = nil;
 
@@ -3046,3 +3046,23 @@ haveexperiment(int8 *name)
 	}
 	return 0;
 }
+
+// func runtime_procPin() int
+void
+sync·runtime_procPin(intgo p)
+{
+	M *mp;
+
+	mp = m;
+	// Disable preemption.
+	mp->locks++;
+	p = mp->p->id;
+	FLUSH(&p);
+}
+
+// func runtime_procUnpin()
+void
+sync·runtime_procUnpin(void)
+{
+	m->locks--;
+}
@@ -4,6 +4,18 @@
 
 package sync
 
+import (
+	"runtime"
+	"sync/atomic"
+	"unsafe"
+)
+
+const (
+	cacheLineSize = 128
+	poolLocalSize = 2 * cacheLineSize
+	poolLocalCap  = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
+)
+
 // A Pool is a set of temporary objects that may be individually saved
 // and retrieved.
 //
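A quick arithmetic check of the constants introduced above (my own illustration, assuming a 64-bit platform where an interface{} value occupies two 8-byte words):

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	const cacheLineSize = 128
	const poolLocalSize = 2 * cacheLineSize // 256 bytes, i.e. two cache lines
	const poolLocalCap = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1

	// 256/16 - 1 = 15 items per P; the remaining 16 bytes hold the tail and
	// unused fields of poolLocal, so the whole struct is exactly 256 bytes,
	// which is what the init() size check in the next hunk enforces.
	fmt.Println(poolLocalCap)
}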
@@ -26,29 +38,52 @@ package sync
 //
 // This is an experimental type and might not be released.
 type Pool struct {
-	next *Pool         // for use by runtime. must be first.
-	list []interface{} // offset known to runtime
-	mu   Mutex         // guards list
+	// The following fields are known to runtime.
+	next         *Pool      // for use by runtime
+	local        *poolLocal // local fixed-size per-P pool, actually an array
+	localSize    uintptr    // size of the local array
+	globalOffset uintptr    // offset of global
+	// The rest is not known to runtime.
 
 	// New optionally specifies a function to generate
 	// a value when Get would otherwise return nil.
 	// It may not be changed concurrently with calls to Get.
 	New func() interface{}
+
+	pad [cacheLineSize]byte
+	// Read-mostly date above this point, mutable data follows.
+	mu     Mutex
+	global []interface{} // global fallback pool
 }
 
-func runtime_registerPool(*Pool)
+// Local per-P Pool appendix.
+type poolLocal struct {
+	tail   int
+	unused int
+	buf    [poolLocalCap]interface{}
+}
+
+func init() {
+	var v poolLocal
+	if unsafe.Sizeof(v) != poolLocalSize {
+		panic("sync: incorrect pool size")
+	}
+}
 
 // Put adds x to the pool.
 func (p *Pool) Put(x interface{}) {
 	if x == nil {
 		return
 	}
-	p.mu.Lock()
-	if p.list == nil {
-		runtime_registerPool(p)
+	l := p.pin()
+	t := l.tail
+	if t < int(poolLocalCap) {
+		l.buf[t] = x
+		l.tail = t + 1
+		runtime_procUnpin()
+		return
 	}
-	p.list = append(p.list, x)
-	p.mu.Unlock()
+	p.putSlow(l, x)
 }
 
 // Get selects an arbitrary item from the Pool, removes it from the
@@ -60,16 +95,116 @@ func (p *Pool) Put(x interface{}) {
 // If Get would otherwise return nil and p.New is non-nil, Get returns
 // the result of calling p.New.
 func (p *Pool) Get() interface{} {
-	p.mu.Lock()
-	var x interface{}
-	if n := len(p.list); n > 0 {
-		x = p.list[n-1]
-		p.list[n-1] = nil // Just to be safe
-		p.list = p.list[:n-1]
+	l := p.pin()
+	t := l.tail
+	if t > 0 {
+		t -= 1
+		x := l.buf[t]
+		l.tail = t
+		runtime_procUnpin()
+		return x
 	}
+	return p.getSlow()
+}
+
+func (p *Pool) putSlow(l *poolLocal, x interface{}) {
+	// Grab half of items from local pool and put to global pool.
+	// Can not lock the mutex while pinned.
+	const N = int(poolLocalCap/2 + 1)
+	var buf [N]interface{}
+	buf[0] = x
+	for i := 1; i < N; i++ {
+		l.tail--
+		buf[i] = l.buf[l.tail]
+	}
+	runtime_procUnpin()
+
+	p.mu.Lock()
+	p.global = append(p.global, buf[:]...)
+	p.mu.Unlock()
+}
+
+func (p *Pool) getSlow() (x interface{}) {
+	// Grab a batch of items from global pool and put to local pool.
+	// Can not lock the mutex while pinned.
+	runtime_procUnpin()
+	p.mu.Lock()
+	pid := runtime_procPin()
+	s := p.localSize
+	l := p.local
+	if uintptr(pid) < s {
+		l = indexLocal(l, pid)
+		// Get the item to return.
+		last := len(p.global) - 1
+		if last >= 0 {
+			x = p.global[last]
+			p.global = p.global[:last]
+		}
+		// Try to refill local pool, we may have been rescheduled to another P.
+		if last > 0 && l.tail == 0 {
+			n := int(poolLocalCap / 2)
+			gl := len(p.global)
+			if n > gl {
+				n = gl
+			}
+			copy(l.buf[:], p.global[gl-n:])
+			p.global = p.global[:gl-n]
+			l.tail = n
+		}
+	}
+	runtime_procUnpin()
 	p.mu.Unlock()
+
 	if x == nil && p.New != nil {
 		x = p.New()
 	}
-	return x
+	return
 }
+
+// pin pins current goroutine to P, disables preemption and returns poolLocal pool for the P.
+// Caller must call runtime_procUnpin() when done with the pool.
+func (p *Pool) pin() *poolLocal {
+	pid := runtime_procPin()
+	// In pinSlow we store to localSize and then to local, here we load in opposite order.
+	// Since we've disabled preemption, GC can not happen in between.
+	// Thus here we must observe local at least as large localSize.
+	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
+	s := atomic.LoadUintptr(&p.localSize) // load-acquire
+	l := p.local                          // load-consume
+	if uintptr(pid) < s {
+		return indexLocal(l, pid)
+	}
+	return p.pinSlow()
+}
+
+func (p *Pool) pinSlow() *poolLocal {
+	// Retry under the mutex.
+	runtime_procUnpin()
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	pid := runtime_procPin()
+	s := p.localSize
+	l := p.local
+	if uintptr(pid) < s {
+		return indexLocal(l, pid)
+	}
+	if p.local == nil {
+		p.globalOffset = unsafe.Offsetof(p.global)
+		runtime_registerPool(p)
+	}
+	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
+	size := runtime.GOMAXPROCS(0)
+	local := make([]poolLocal, size)
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
+	atomic.StoreUintptr(&p.localSize, uintptr(size))                                            // store-release
+	return &local[pid]
+}
+
+func indexLocal(l *poolLocal, i int) *poolLocal {
+	return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // uh...
+}
+
+// Implemented in runtime.
+func runtime_registerPool(*Pool)
+func runtime_procPin() int
+func runtime_procUnpin()
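To make the control flow above easier to follow, here is a deliberately simplified, single-file model of the same idea (an illustrative sketch, not the code in this commit): each worker owns a small fixed-size buffer, and only overflow and underflow move items to or from a mutex-protected global slice in batches. The real implementation replaces the explicit worker index with runtime_procPin, which pins the goroutine to its P and disables preemption while the per-P buffer is touched.

package main

import (
	"fmt"
	"sync"
)

const localCap = 4 // per-worker buffer capacity (illustrative; the real cap is poolLocalCap)

type miniPool struct {
	mu     sync.Mutex
	global []interface{}   // shared fallback, guarded by mu
	local  [][]interface{} // one small buffer per worker, touched without the lock
}

func newMiniPool(workers int) *miniPool {
	return &miniPool{local: make([][]interface{}, workers)}
}

// put stores x in worker w's buffer; on overflow it moves x plus half of the
// buffer to the global slice in one mutex-protected batch.
func (p *miniPool) put(w int, x interface{}) {
	l := p.local[w]
	if len(l) < localCap {
		p.local[w] = append(l, x)
		return
	}
	batch := append([]interface{}{x}, l[localCap/2:]...)
	p.local[w] = l[:localCap/2]
	p.mu.Lock()
	p.global = append(p.global, batch...)
	p.mu.Unlock()
}

// get pops from worker w's buffer; on underflow it refills the buffer with a
// batch taken from the global slice, again under the mutex.
func (p *miniPool) get(w int) interface{} {
	if l := p.local[w]; len(l) > 0 {
		x := l[len(l)-1]
		p.local[w] = l[:len(l)-1]
		return x
	}
	p.mu.Lock()
	n := len(p.global)
	take := localCap / 2
	if take > n {
		take = n
	}
	p.local[w] = append(p.local[w], p.global[n-take:]...)
	p.global = p.global[:n-take]
	p.mu.Unlock()
	if len(p.local[w]) == 0 {
		return nil // empty everywhere; sync.Pool would call New here
	}
	return p.get(w)
}

func main() {
	p := newMiniPool(1)
	for i := 0; i < 10; i++ {
		p.put(0, i)
	}
	// Prints "9 6": the Get is served from the local buffer; two overflow
	// batches of three items each ended up in the global slice.
	fmt.Println(p.get(0), len(p.global))
}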
@@ -11,7 +11,6 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
-	"unsafe"
 )
 
 func TestPool(t *testing.T) {
@@ -125,28 +124,41 @@ func TestPoolStress(t *testing.T) {
 }
 
 func BenchmarkPool(b *testing.B) {
-	procs := runtime.GOMAXPROCS(-1)
-	var dec func() bool
-	if unsafe.Sizeof(b.N) == 8 {
-		n := int64(b.N)
-		dec = func() bool {
-			return atomic.AddInt64(&n, -1) >= 0
-		}
-	} else {
-		n := int32(b.N)
-		dec = func() bool {
-			return atomic.AddInt32(&n, -1) >= 0
-		}
-	}
 	var p Pool
 	var wg WaitGroup
-	for i := 0; i < procs; i++ {
+	n0 := uintptr(b.N)
+	n := n0
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			for dec() {
-				p.Put(1)
-				p.Get()
+			for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+				for b := 0; b < 100; b++ {
+					p.Put(1)
+					p.Get()
+				}
 			}
 		}()
 	}
+	wg.Wait()
+}
+
+func BenchmarkPoolOverlflow(b *testing.B) {
+	var p Pool
+	var wg WaitGroup
+	n0 := uintptr(b.N)
+	n := n0
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+				for b := 0; b < 100; b++ {
+					p.Put(1)
+				}
+				for b := 0; b < 100; b++ {
+					p.Get()
+				}
+			}
+		}()
+	}
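One non-obvious detail in the rewritten benchmarks (my gloss, not the author's commentary): atomic.AddUintptr(&n, ^uintptr(0)) adds the all-ones value, which for an unsigned integer is the two's-complement way of atomically subtracting one. The counter starts at n0 = uintptr(b.N); each goroutine keeps claiming iterations until the counter wraps past zero, at which point the result is no longer < n0 and the loop stops, so the goroutines collectively execute about b.N passes over the inner 100-operation loop. A tiny standalone illustration:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	n0 := uintptr(3)
	n := n0
	// Each AddUintptr(&n, ^uintptr(0)) subtracts one. After n reaches zero,
	// the next decrement wraps to the maximum uintptr and fails the < n0 test.
	for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
		fmt.Println("claimed one iteration")
	}
	// Prints the message exactly three times.
}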