internal/telemetry: change concurrency model

This changes to use a mutex and directly execute the less performance
sensitive telemetry calls (tracing and logging) and then uses a submission
queue only for stats adjustments as those are much more sensitive (but it
should also be easier to keep up with them in bursts)

Fixes golang/go#33692

Change-Id: Ia59a8975f21dfbfcf115be1f1d11b097be8dd9c8
Reviewed-on: https://go-review.googlesource.com/c/tools/+/190737
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
This commit is contained in:
Ian Cottrell 2019-08-17 23:26:28 -04:00 committed by Ian Cottrell
parent 15fda70baf
commit d9ab56aa29
7 changed files with 177 additions and 180 deletions

View File

@ -248,18 +248,15 @@ func Serve(ctx context.Context, addr string) error {
func Render(tmpl *template.Template, fun func(*http.Request) interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
export.Do(func() {
defer close(done)
var data interface{}
if fun != nil {
data = fun(r)
}
if err := tmpl.Execute(w, data); err != nil {
log.Error(context.Background(), "", err)
}
})
<-done
mu.Lock()
defer mu.Unlock()
var data interface{}
if fun != nil {
data = fun(r)
}
if err := tmpl.Execute(w, data); err != nil {
log.Error(context.Background(), "", err)
}
}
}

View File

@ -10,6 +10,7 @@ package export
import (
"context"
"os"
"sync"
"time"
"golang.org/x/tools/internal/telemetry"
@ -27,68 +28,65 @@ type Exporter interface {
Metric(context.Context, telemetry.MetricData)
}
var exporter = LogWriter(os.Stderr, true)
func SetExporter(setter func(Exporter) Exporter) {
Do(func() {
exporter = setter(exporter)
})
}
var (
exporterMu sync.Mutex
exporter = LogWriter(os.Stderr, true)
)
func AddExporters(e ...Exporter) {
Do(func() {
exporter = Multi(append([]Exporter{exporter}, e...)...)
})
exporterMu.Lock()
defer exporterMu.Unlock()
exporter = Multi(append([]Exporter{exporter}, e...)...)
}
func StartSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
Do(func() {
span.Start = at
exporter.StartSpan(ctx, span)
})
exporterMu.Lock()
defer exporterMu.Unlock()
span.Start = at
exporter.StartSpan(ctx, span)
}
func FinishSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
Do(func() {
span.Finish = at
exporter.FinishSpan(ctx, span)
})
exporterMu.Lock()
defer exporterMu.Unlock()
span.Finish = at
exporter.FinishSpan(ctx, span)
}
func Tag(ctx context.Context, at time.Time, tags telemetry.TagList) {
Do(func() {
// If context has a span we need to add the tags to it
span := telemetry.GetSpan(ctx)
if span == nil {
return
}
if span.Start.IsZero() {
// span still being created, tag it directly
span.Tags = append(span.Tags, tags...)
return
}
// span in progress, add an event to the span
span.Events = append(span.Events, telemetry.Event{
At: at,
Tags: tags,
})
exporterMu.Lock()
defer exporterMu.Unlock()
// If context has a span we need to add the tags to it
span := telemetry.GetSpan(ctx)
if span == nil {
return
}
if span.Start.IsZero() {
// span still being created, tag it directly
span.Tags = append(span.Tags, tags...)
return
}
// span in progress, add an event to the span
span.Events = append(span.Events, telemetry.Event{
At: at,
Tags: tags,
})
}
func Log(ctx context.Context, event telemetry.Event) {
Do(func() {
// If context has a span we need to add the event to it
span := telemetry.GetSpan(ctx)
if span != nil {
span.Events = append(span.Events, event)
}
// and now also hand the event of to the current observer
exporter.Log(ctx, event)
})
exporterMu.Lock()
defer exporterMu.Unlock()
// If context has a span we need to add the event to it
span := telemetry.GetSpan(ctx)
if span != nil {
span.Events = append(span.Events, event)
}
// and now also hand the event of to the current observer
exporter.Log(ctx, event)
}
func Metric(ctx context.Context, data telemetry.MetricData) {
Do(func() {
exporter.Metric(ctx, data)
})
exporterMu.Lock()
defer exporterMu.Unlock()
exporter.Metric(ctx, data)
}

View File

@ -14,6 +14,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"time"
"golang.org/x/tools/internal/telemetry"
@ -26,6 +27,7 @@ const DefaultAddress = "http://localhost:55678"
const exportRate = 2 * time.Second
type exporter struct {
mu sync.Mutex
address string
node *wire.Node
spans []*wire.Span
@ -63,9 +65,7 @@ func Connect(service, address string) export.Exporter {
}
go func() {
for _ = range time.Tick(exportRate) {
export.Do(func() {
exporter.flush()
})
exporter.flush()
}
}()
return exporter
@ -74,16 +74,22 @@ func Connect(service, address string) export.Exporter {
func (e *exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (e *exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {
e.mu.Lock()
defer e.mu.Unlock()
e.spans = append(e.spans, convertSpan(span))
}
func (e *exporter) Log(context.Context, telemetry.Event) {}
func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
e.mu.Lock()
defer e.mu.Unlock()
e.metrics = append(e.metrics, convertMetric(data))
}
func (e *exporter) flush() {
e.mu.Lock()
defer e.mu.Unlock()
spans := e.spans
e.spans = nil
metrics := e.metrics

View File

@ -10,9 +10,9 @@ import (
"fmt"
"net/http"
"sort"
"sync"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/metric"
)
@ -21,6 +21,7 @@ func New() *Exporter {
}
type Exporter struct {
mu sync.Mutex
metrics []telemetry.MetricData
}
@ -28,6 +29,8 @@ func (e *Exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (e *Exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {}
func (e *Exporter) Log(ctx context.Context, event telemetry.Event) {}
func (e *Exporter) Metric(ctx context.Context, data telemetry.MetricData) {
e.mu.Lock()
defer e.mu.Unlock()
name := data.Handle()
// We keep the metrics in name sorted order so the page is stable and easy
// to read. We do this with an insertion sort rather than sorting the list
@ -76,48 +79,45 @@ func (e *Exporter) row(w http.ResponseWriter, name string, group telemetry.TagLi
}
func (e *Exporter) Serve(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
export.Do(func() {
defer close(done)
for _, data := range e.metrics {
switch data := data.(type) {
case *metric.Int64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
e.mu.Lock()
defer e.mu.Unlock()
for _, data := range e.metrics {
switch data := data.(type) {
case *metric.Int64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.Float64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.Float64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.HistogramInt64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
case *metric.HistogramInt64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
case *metric.HistogramFloat64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
case *metric.HistogramFloat64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
}
})
<-done
}
}

View File

@ -215,21 +215,19 @@ func (data *Int64Data) Handle() string { return data.Info.Name }
func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
data.Rows = make([]int64, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]int64, len(old))
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export.Metric(ctx, &frozen)
})
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
data.Rows = make([]int64, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]int64, len(old))
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *Int64Data) countInt64(ctx context.Context, measure *stats.Int64Measure, value int64) {
@ -252,21 +250,19 @@ func (data *Float64Data) Handle() string { return data.Info.Name }
func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
data.Rows = make([]float64, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]float64, len(old))
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export.Metric(ctx, &frozen)
})
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
data.Rows = make([]float64, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]float64, len(old))
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *Float64Data) sum(ctx context.Context, measure *stats.Float64Measure, value float64) {
@ -281,27 +277,25 @@ func (data *HistogramInt64Data) Handle() string { return data.Info.
func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramInt64Row
if insert {
data.Rows = make([]*HistogramInt64Row, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]*HistogramInt64Row, len(old))
copy(data.Rows, old)
v = *data.Rows[index]
}
oldValues := v.Values
v.Values = make([]int64, len(data.Info.Buckets))
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
frozen := *data
export.Metric(ctx, &frozen)
})
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramInt64Row
if insert {
data.Rows = make([]*HistogramInt64Row, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]*HistogramInt64Row, len(old))
copy(data.Rows, old)
v = *data.Rows[index]
}
oldValues := v.Values
v.Values = make([]int64, len(data.Info.Buckets))
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64Measure, value int64) {
@ -326,27 +320,25 @@ func (data *HistogramFloat64Data) Handle() string { return data.Inf
func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramFloat64Row
if insert {
data.Rows = make([]*HistogramFloat64Row, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]*HistogramFloat64Row, len(old))
copy(data.Rows, old)
v = *data.Rows[index]
}
oldValues := v.Values
v.Values = make([]int64, len(data.Info.Buckets))
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
frozen := *data
export.Metric(ctx, &frozen)
})
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramFloat64Row
if insert {
data.Rows = make([]*HistogramFloat64Row, len(old)+1)
copy(data.Rows, old[:index])
copy(data.Rows[index+1:], old[index:])
} else {
data.Rows = make([]*HistogramFloat64Row, len(old))
copy(data.Rows, old)
v = *data.Rows[index]
}
oldValues := v.Values
v.Values = make([]int64, len(data.Info.Buckets))
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *HistogramFloat64Data) record(ctx context.Context, measure *stats.Float64Measure, value float64) {

View File

@ -69,9 +69,11 @@ func (m *Int64Measure) Subscribe(s Int64Subscriber) { m.subscribers = append(m.s
// Record delivers a new value to the subscribers of this measure.
func (m *Int64Measure) Record(ctx context.Context, value int64) {
for _, s := range m.subscribers {
s(ctx, m, value)
}
do(func() {
for _, s := range m.subscribers {
s(ctx, m, value)
}
})
}
// Name returns the name this measure was given on construction.
@ -88,7 +90,9 @@ func (m *Float64Measure) Subscribe(s Float64Subscriber) { m.subscribers = append
// Record delivers a new value to the subscribers of this measure.
func (m *Float64Measure) Record(ctx context.Context, value float64) {
for _, s := range m.subscribers {
s(ctx, m, value)
}
do(func() {
for _, s := range m.subscribers {
s(ctx, m, value)
}
})
}

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package export
package stats
import (
"fmt"
@ -23,14 +23,14 @@ func init() {
}()
}
// Do adds a task to the list of things to work on in the background.
// do adds a task to the list of things to work on in the background.
// All tasks will be handled in submission order, and no two tasks will happen
// concurrently so they do not need to do any kind of locking.
// It is safe however to call Do concurrently.
// No promises are made about when the tasks will be performed.
// This function may block, but in general it will return very quickly and
// before the task has been run.
func Do(task func()) {
func do(task func()) {
select {
case workQueue <- task:
default: