mirror of https://github.com/golang/go.git
cmd/go/internal/par: add Queue as a simpler alternative to Work
par.Work performs two different tasks: deduplicating work (a task which overlaps with par.Cache), and executing limited active work in parallel. It also requires the caller to re-invoke Do whenever the workqueue transititions from empty to non-empty. The new par.Queue only performs the second of those two tasks, and presents a simpler API: it starts and stops its own goroutines as needed (indicating its idle state via a channel), rather than expecting the caller to drive the transitions explicitly. For #36460 Change-Id: I5c38657dda63ab55718497467d05d41744ff59f2 Reviewed-on: https://go-review.googlesource.com/c/go/+/247766 Run-TryBot: Bryan C. Mills <bcmills@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Jay Conrod <jayconrod@google.com>
This commit is contained in:
parent
865d72f962
commit
95df156e6a
|
|
@ -0,0 +1,88 @@
|
|||
// Copyright 2020 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package par
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Queue manages a set of work items to be executed in parallel. The number of
|
||||
// active work items is limited, and excess items are queued sequentially.
|
||||
type Queue struct {
|
||||
maxActive int
|
||||
st chan queueState
|
||||
}
|
||||
|
||||
type queueState struct {
|
||||
active int // number of goroutines processing work; always nonzero when len(backlog) > 0
|
||||
backlog []func()
|
||||
idle chan struct{} // if non-nil, closed when active becomes 0
|
||||
}
|
||||
|
||||
// NewQueue returns a Queue that executes up to maxActive items in parallel.
|
||||
//
|
||||
// maxActive must be positive.
|
||||
func NewQueue(maxActive int) *Queue {
|
||||
if maxActive < 1 {
|
||||
panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
|
||||
}
|
||||
|
||||
q := &Queue{
|
||||
maxActive: maxActive,
|
||||
st: make(chan queueState, 1),
|
||||
}
|
||||
q.st <- queueState{}
|
||||
return q
|
||||
}
|
||||
|
||||
// Add adds f as a work item in the queue.
|
||||
//
|
||||
// Add returns immediately, but the queue will be marked as non-idle until after
|
||||
// f (and any subsequently-added work) has completed.
|
||||
func (q *Queue) Add(f func()) {
|
||||
st := <-q.st
|
||||
if st.active == q.maxActive {
|
||||
st.backlog = append(st.backlog, f)
|
||||
q.st <- st
|
||||
return
|
||||
}
|
||||
if st.active == 0 {
|
||||
// Mark q as non-idle.
|
||||
st.idle = nil
|
||||
}
|
||||
st.active++
|
||||
q.st <- st
|
||||
|
||||
go func() {
|
||||
for {
|
||||
f()
|
||||
|
||||
st := <-q.st
|
||||
if len(st.backlog) == 0 {
|
||||
if st.active--; st.active == 0 && st.idle != nil {
|
||||
close(st.idle)
|
||||
}
|
||||
q.st <- st
|
||||
return
|
||||
}
|
||||
f, st.backlog = st.backlog[0], st.backlog[1:]
|
||||
q.st <- st
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Idle returns a channel that will be closed when q has no (active or enqueued)
|
||||
// work outstanding.
|
||||
func (q *Queue) Idle() <-chan struct{} {
|
||||
st := <-q.st
|
||||
defer func() { q.st <- st }()
|
||||
|
||||
if st.idle == nil {
|
||||
st.idle = make(chan struct{})
|
||||
if st.active == 0 {
|
||||
close(st.idle)
|
||||
}
|
||||
}
|
||||
|
||||
return st.idle
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
// Copyright 2020 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package par
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestQueueIdle(t *testing.T) {
|
||||
q := NewQueue(1)
|
||||
select {
|
||||
case <-q.Idle():
|
||||
default:
|
||||
t.Errorf("NewQueue(1) is not initially idle.")
|
||||
}
|
||||
|
||||
started := make(chan struct{})
|
||||
unblock := make(chan struct{})
|
||||
q.Add(func() {
|
||||
close(started)
|
||||
<-unblock
|
||||
})
|
||||
|
||||
<-started
|
||||
idle := q.Idle()
|
||||
select {
|
||||
case <-idle:
|
||||
t.Errorf("NewQueue(1) is marked idle while processing work.")
|
||||
default:
|
||||
}
|
||||
|
||||
close(unblock)
|
||||
<-idle // Should be closed as soon as the Add callback returns.
|
||||
}
|
||||
|
||||
func TestQueueBacklog(t *testing.T) {
|
||||
const (
|
||||
maxActive = 2
|
||||
totalWork = 3 * maxActive
|
||||
)
|
||||
|
||||
q := NewQueue(maxActive)
|
||||
t.Logf("q = NewQueue(%d)", maxActive)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(totalWork)
|
||||
started := make([]chan struct{}, totalWork)
|
||||
unblock := make(chan struct{})
|
||||
for i := range started {
|
||||
started[i] = make(chan struct{})
|
||||
i := i
|
||||
q.Add(func() {
|
||||
close(started[i])
|
||||
<-unblock
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
|
||||
for i, c := range started {
|
||||
if i < maxActive {
|
||||
<-c // Work item i should be started immediately.
|
||||
} else {
|
||||
select {
|
||||
case <-c:
|
||||
t.Errorf("Work item %d started before previous items finished.", i)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(unblock)
|
||||
for _, c := range started[maxActive:] {
|
||||
<-c
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
Loading…
Reference in New Issue