runtime/trace: add the flight recorder

This change adds the flight recorder to the runtime/trace package.
Flight recording is a technique in which trace data is kept in a
circular buffer and can be snapshotted to an io.Writer upon request,
so that a window of the most recent trace data is always available.

The flight recorder has already been implemented inside of the
golang.org/x/exp/trace package. This copies the current implementation
and modifies it to work within the runtime/trace package.

The changes include:

This adds the ability for multiple consumers (both the execution
tracer and the flight recorder) to subscribe to tracing events. The
subscription is multiplexed in the runtime/trace package, so new
consumers can be added without major modifications to the runtime.
Future optimizations are planned for this functionality.
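
For example, both consumers can now be active at once. A minimal
sketch, assuming imports of bytes and runtime/trace; captureBoth and
its workload argument are illustrative, not part of this CL:

	func captureBoth(workload func()) error {
		var full, window bytes.Buffer
		if err := trace.Start(&full); err != nil { // execution tracer subscribes
			return err
		}
		defer trace.Stop()

		fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
		if err := fr.Start(); err != nil { // flight recorder subscribes to the same events
			return err
		}
		defer fr.Stop()

		workload()

		_, err := fr.WriteTo(&window) // snapshot only the recent window
		return err
	}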

This removes the use of byte readers from the code that parses and
processes the trace batches.

This modifies the flight recorder so that it no longer parses the
trace clock frequency out of the trace data, since that would require
encoding knowledge of the trace format in yet another place. The trace
clock frequency is considered stable for the lifetime of the program,
so it is instead grabbed directly from the runtime.

This change adds an in-band end-of-generation signal to the internal
implementation of runtime.ReadTrace. The internal implementation is
exported via linkname to runtime/trace, so the flight recorder can
identify exactly when a generation has ended. This signal is also useful
for ensuring that subscribers to runtime trace data always see complete
generations, by starting or stopping data streaming only at generation
boundaries.
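
Roughly, a consumer loop distinguishes the signal like this (a sketch
mirroring the new subscription loop in runtime/trace added in this CL;
runtime_readTrace and the tracev2 names are the linknamed/internal
identifiers used below):

	for {
		data := runtime_readTrace()
		if data == nil {
			break // tracing has stopped
		}
		if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
			// Generation boundary: flush the flight recorder's window and
			// pick up any subscriber changes before the next generation.
			continue
		}
		// Otherwise data is a complete batch; forward it to current subscribers.
	}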

For #63185

Change-Id: I5c15345981a6bbe9764a3d623448237e983c64ec
Reviewed-on: https://go-review.googlesource.com/c/go/+/673116
Auto-Submit: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Carlos Amedee 2025-05-14 16:13:24 -04:00 committed by Gopher Robot
parent 7589e96042
commit 83df0afc4e
14 changed files with 1062 additions and 78 deletions

api/next/63185.txt Normal file
View File

@ -0,0 +1,9 @@
pkg runtime/trace, func NewFlightRecorder(FlightRecorderConfig) *FlightRecorder #63185
pkg runtime/trace, method (*FlightRecorder) Enabled() bool #63185
pkg runtime/trace, method (*FlightRecorder) Start() error #63185
pkg runtime/trace, method (*FlightRecorder) Stop() #63185
pkg runtime/trace, method (*FlightRecorder) WriteTo(io.Writer) (int64, error) #63185
pkg runtime/trace, type FlightRecorder struct #63185
pkg runtime/trace, type FlightRecorderConfig struct #63185
pkg runtime/trace, type FlightRecorderConfig struct, MaxBytes uint64 #63185
pkg runtime/trace, type FlightRecorderConfig struct, MinAge time.Duration #63185
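
A minimal usage sketch of this API (hypothetical output file name and
thresholds; error handling abbreviated):

	package main

	import (
		"log"
		"os"
		"runtime/trace"
		"time"
	)

	func main() {
		fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{
			MinAge:   10 * time.Second, // keep roughly the last 10s of events
			MaxBytes: 4 << 20,          // cap the window at ~4 MiB (a hint)
		})
		if err := fr.Start(); err != nil {
			log.Fatal(err)
		}
		defer fr.Stop()

		// ... run the program; when something interesting happens, snapshot:
		f, err := os.Create("flight.trace") // hypothetical destination
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()
		if _, err := fr.WriteTo(f); err != nil {
			log.Print(err)
		}
	}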

View File

@ -0,0 +1,2 @@
<!-- go.dev/issue/63185 -->
TODO The flight recorder has been added to the runtime/trace package.

View File

@ -98,25 +98,26 @@ func (v *Validator) Event(ev trace.Event) error {
 		if v.GoVersion >= version.Go125 && !(s.N > 1 && s.ClockSnapshot == nil) {
 			if s.ClockSnapshot == nil {
 				e.Errorf("sync %d has no clock snapshot", s.N)
-			}
-			if s.ClockSnapshot.Wall.IsZero() {
-				e.Errorf("sync %d has zero wall time", s.N)
-			}
-			if s.ClockSnapshot.Mono == 0 {
-				e.Errorf("sync %d has zero mono time", s.N)
-			}
-			if s.ClockSnapshot.Trace == 0 {
-				e.Errorf("sync %d has zero trace time", s.N)
-			}
-			if !v.skipClockSnapshotChecks {
-				if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) {
-					e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall)
-				}
-				if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) {
-					e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono)
-				}
-				if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) {
-					e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace)
+			} else {
+				if s.ClockSnapshot.Wall.IsZero() {
+					e.Errorf("sync %d has zero wall time", s.N)
+				}
+				if s.ClockSnapshot.Mono == 0 {
+					e.Errorf("sync %d has zero mono time", s.N)
+				}
+				if s.ClockSnapshot.Trace == 0 {
+					e.Errorf("sync %d has zero trace time", s.N)
+				}
+				if !v.skipClockSnapshotChecks {
+					if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) {
+						e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall)
+					}
+					if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) {
+						e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono)
+					}
+					if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) {
+						e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace)
+					}
 				}
 			}
 		}

View File

@ -87,6 +87,10 @@ const (
 	EvSync          // start of a sync batch [...EvFrequency|EvClockSnapshot]
 	EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec]
 
+	// Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25.
+	// This could be used as an explicit in-band end-of-generation signal in the future.
+	EvEndOfGeneration
+
 	NumEvents
 )
@ -189,6 +193,9 @@ var specs = [...]EventSpec{
 	EvSync: {
 		Name: "Sync",
 	},
+	EvEndOfGeneration: {
+		Name: "EndOfGeneration",
+	},
 
 	// "Timed" Events.
 	EvProcsChange: {

View File

@ -21,6 +21,7 @@ package runtime
 
 import (
 	"internal/runtime/atomic"
+	"internal/trace/tracev2"
 	"unsafe"
 )
@ -51,9 +52,10 @@ var trace struct {
 	// State for the trace reader goroutine.
 	//
 	// Protected by trace.lock.
-	readerGen     atomic.Uintptr // the generation the reader is currently reading for
-	flushedGen    atomic.Uintptr // the last completed generation
-	headerWritten bool           // whether ReadTrace has emitted trace header
+	readerGen              atomic.Uintptr // the generation the reader is currently reading for
+	flushedGen             atomic.Uintptr // the last completed generation
+	headerWritten          bool           // whether ReadTrace has emitted trace header
+	endOfGenerationWritten bool           // whether readTrace has emitted the end of the generation signal
 
 	// doneSema is used to synchronize the reader and traceAdvance. Specifically,
 	// it notifies traceAdvance that the reader is done with a generation.
@ -753,8 +755,24 @@ func traceRegisterLabelsAndReasons(gen uintptr) {
 // returned data before calling ReadTrace again.
 // ReadTrace must be called from one goroutine at a time.
 func ReadTrace() []byte {
+	for {
+		buf := readTrace()
+		// Skip over the end-of-generation signal which must not appear
+		// in the final trace.
+		if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration {
+			continue
+		}
+		return buf
+	}
+}
+
+// readTrace is the implementation of ReadTrace, except with an additional
+// in-band signal as to when the buffer is for a new generation.
+//
+//go:linkname readTrace runtime/trace.runtime_readTrace
+func readTrace() (buf []byte) {
 top:
-	var buf []byte
 	var park bool
 	systemstack(func() {
 		buf, park = readTrace0()
@ -782,7 +800,6 @@ top:
 		}, nil, waitReasonTraceReaderBlocked, traceBlockSystemGoroutine, 2)
 		goto top
 	}
-
 	return buf
 }
@ -849,6 +866,17 @@ func readTrace0() (buf []byte, park bool) {
 	// is waiting on the reader to finish flushing the last generation so that it
 	// can continue to advance.
 	if trace.flushedGen.Load() == gen {
+		// Write out the internal in-band end-of-generation signal.
+		if !trace.endOfGenerationWritten {
+			trace.endOfGenerationWritten = true
+			unlock(&trace.lock)
+			return []byte{byte(tracev2.EvEndOfGeneration)}, false
+		}
+
+		// Reset the flag.
+		trace.endOfGenerationWritten = false
+
+		// Handle shutdown.
 		if trace.shutdown.Load() {
 			unlock(&trace.lock)
@ -868,6 +896,8 @@ func readTrace0() (buf []byte, park bool) {
 			// read. We're done.
 			return nil, false
 		}
+
+		// Handle advancing to the next generation.
 		// The previous gen has had all of its buffers flushed, and
 		// there's nothing else for us to read. Advance the generation
 		// we're reading from and try again.

View File

@ -0,0 +1,83 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"fmt"
"internal/trace/tracev2"
)
// timestamp is an unprocessed timestamp.
type timestamp uint64
// batch represents a batch of trace events.
// It is unparsed except for its header.
type batch struct {
m threadID
time timestamp
data []byte
}
// threadID is the runtime-internal M structure's ID. This is unique
// for each OS thread.
type threadID int64
// readBatch copies b and parses the trace batch header inside.
// Returns the batch, the generation, bytes read, and an error.
func readBatch(b []byte) (batch, uint64, uint64, error) {
if len(b) == 0 {
return batch{}, 0, 0, fmt.Errorf("batch is empty")
}
data := make([]byte, len(b))
if nw := copy(data, b); nw != len(b) {
return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
}
// Read batch header byte.
if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
}
// Read the batch header: gen (generation), thread (M) ID, base timestamp
// for the batch.
total := 1
b = b[1:]
gen, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
}
total += n
b = b[n:]
m, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
}
total += n
b = b[n:]
ts, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
}
total += n
b = b[n:]
// Read in the size of the batch to follow.
size, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
}
if size > tracev2.MaxBatchSize {
return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
}
total += n
total += int(size)
data = data[:total]
// Return the batch.
return batch{
m: threadID(m),
time: timestamp(ts),
data: data,
}, gen, uint64(total), nil
}

View File

@ -0,0 +1,50 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"errors"
)
// maxVarintLenN is the maximum length of a varint-encoded N-bit integer.
const maxVarintLen64 = 10
var (
errOverflow = errors.New("binary: varint overflows a 64-bit integer")
errEOB = errors.New("binary: end of buffer")
)
// TODO deduplicate this function.
func readUvarint(b []byte) (uint64, int, error) {
var x uint64
var s uint
var byt byte
for i := 0; i < maxVarintLen64 && i < len(b); i++ {
byt = b[i]
if byt < 0x80 {
if i == maxVarintLen64-1 && byt > 1 {
return x, i, errOverflow
}
return x | uint64(byt)<<s, i + 1, nil
}
x |= uint64(byt&0x7f) << s
s += 7
}
return x, len(b), errOverflow
}
// putUvarint encodes a uint64 into buf and returns the number of bytes written.
// If the buffer is too small, putUvarint will panic.
// TODO deduplicate this function.
func putUvarint(buf []byte, x uint64) int {
i := 0
for x >= 0x80 {
buf[i] = byte(x) | 0x80
x >>= 7
i++
}
buf[i] = byte(x)
return i + 1
}
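
As an in-package illustration (not part of the change), these helpers
pair with readBatch above to round-trip a batch header; the
buildAndParseBatch helper and the values in it are hypothetical:

	// buildAndParseBatch shows the wire layout readBatch expects: the event
	// type byte, then gen, M ID, timestamp, and size as uvarints, followed
	// by `size` bytes of event data.
	func buildAndParseBatch() error {
		buf := []byte{byte(tracev2.EvEventBatch)}
		var tmp [maxVarintLen64]byte
		for _, v := range []uint64{3, 8, 123456, 2} { // gen, M ID, timestamp, size
			buf = append(buf, tmp[:putUvarint(tmp[:], v)]...)
		}
		buf = append(buf, 0x01, 0x02) // two bytes of (fake) event data

		b, gen, n, err := readBatch(buf)
		if err != nil {
			return err
		}
		// gen == 3, b.m == 8, b.time == 123456, and n covers the whole batch,
		// header included, so int(n) == len(b.data) == len(buf).
		_, _, _ = b, gen, n
		return nil
	}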

View File

@ -0,0 +1,182 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"fmt"
"io"
"sync"
"time"
_ "unsafe" // added for go linkname usage
)
// FlightRecorder represents a single consumer of a Go execution
// trace.
// It tracks a moving window over the execution trace produced by
// the runtime, always containing the most recent trace data.
//
// At most one flight recorder may be active at any given time,
// though flight recording is allowed to be concurrently active
// with a trace consumer using trace.Start.
// This restriction of only a single flight recorder may be removed
// in the future.
type FlightRecorder struct {
err error
// State specific to the recorder.
header [16]byte
active rawGeneration
ringMu sync.Mutex
ring []rawGeneration
freq frequency // timestamp conversion factor, from the runtime
// Externally-set options.
targetSize uint64
targetPeriod time.Duration
enabled bool // whether the flight recorder is enabled.
writing sync.Mutex // protects concurrent calls to WriteTo
// The values of targetSize and targetPeriod we've committed to since the last Start.
wantSize uint64
wantDur time.Duration
}
// NewFlightRecorder creates a new flight recorder from the provided configuration.
func NewFlightRecorder(cfg FlightRecorderConfig) *FlightRecorder {
fr := new(FlightRecorder)
if cfg.MaxBytes != 0 {
fr.targetSize = cfg.MaxBytes
} else {
fr.targetSize = 10 << 20 // 10 MiB.
}
if cfg.MinAge != 0 {
fr.targetPeriod = cfg.MinAge
} else {
fr.targetPeriod = 10 * time.Second
}
return fr
}
// Start activates the flight recorder and begins recording trace data.
// Only one call to trace.Start may be active at any given time.
// In addition, currently only one flight recorder may be active in the program.
// Returns an error if the flight recorder cannot be started or is already started.
func (fr *FlightRecorder) Start() error {
if fr.enabled {
return fmt.Errorf("cannot enable a enabled flight recorder")
}
fr.wantSize = fr.targetSize
fr.wantDur = fr.targetPeriod
fr.err = nil
fr.freq = frequency(1.0 / (float64(runtime_traceClockUnitsPerSecond()) / 1e9))
// Start tracing, data is sent to a recorder which forwards it to our own
// storage.
if err := tracing.subscribeFlightRecorder(&recorder{r: fr}); err != nil {
return err
}
fr.enabled = true
return nil
}
// Stop ends recording of trace data. It blocks until any concurrent WriteTo calls complete.
func (fr *FlightRecorder) Stop() {
if !fr.enabled {
return
}
fr.enabled = false
tracing.unsubscribeFlightRecorder()
// Reset all state. No need to lock because the reader has already exited.
fr.active = rawGeneration{}
fr.ring = nil
}
// Enabled returns true if the flight recorder is active.
// Specifically, it will return true if Start did not return an error, and Stop has not yet been called.
// It is safe to call from multiple goroutines simultaneously.
func (fr *FlightRecorder) Enabled() bool { return fr.enabled }
// WriteTo snapshots the moving window tracked by the flight recorder.
// The snapshot is expected to contain data that is up-to-date as of when WriteTo is called,
// though this is not a hard guarantee.
// Only one goroutine may execute WriteTo at a time.
// An error is returned upon failure to write to w, if another WriteTo call is already in-progress,
// or if the flight recorder is inactive.
func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) {
if !fr.enabled {
return 0, fmt.Errorf("cannot snapshot a disabled flight recorder")
}
if !fr.writing.TryLock() {
// Indicates that a call to WriteTo was made while one was already in progress.
// If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo.
return 0, fmt.Errorf("call to WriteTo for trace.FlightRecorder already in progress")
}
defer fr.writing.Unlock()
// Force a global buffer flush.
runtime_traceAdvance(false)
// Now that everything has been flushed and written, grab whatever we have.
//
// N.B. traceAdvance blocks until the tracer goroutine has actually written everything
// out, which means the generation we just flushed must have already been observed
// by the recorder goroutine. That generation is therefore guaranteed to have been
// both completed *and* processed by the recorder goroutine.
fr.ringMu.Lock()
gens := fr.ring
fr.ringMu.Unlock()
// Write the header.
nw, err := w.Write(fr.header[:])
if err != nil {
return int64(nw), err
}
n += int64(nw)
// Write all the data.
for _, gen := range gens {
for _, batch := range gen.batches {
// Write batch data.
nw, err = w.Write(batch.data)
n += int64(nw)
if err != nil {
return n, err
}
}
}
return n, nil
}
type FlightRecorderConfig struct {
// MinAge is a lower bound on the age of an event in the flight recorder's window.
//
// The flight recorder will strive to promptly discard events older than the minimum age,
// but older events may appear in the window snapshot. The age setting will always be
// overridden by MaxSize.
//
// If this is 0, the minimum age is implementation defined, but can be assumed to be on the order
// of seconds.
MinAge time.Duration
// MaxBytes is an upper bound on the size of the window in bytes.
//
// This setting takes precedence over MinAge.
// However, it does not make any guarantees on the size of the data WriteTo will write,
// nor does it guarantee memory overheads will always stay below MaxBytes. Treat it
// as a hint.
//
// If this is 0, the maximum size is implementation defined.
MaxBytes uint64
}
//go:linkname runtime_traceClockUnitsPerSecond
func runtime_traceClockUnitsPerSecond() uint64
//go:linkname runtime_traceAdvance runtime.traceAdvance
func runtime_traceAdvance(stopTrace bool)
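
One pattern this API enables (a sketch only; the helper, the 500ms
threshold, and imports of os, time, and runtime/trace are assumed):
snapshot the window when an operation runs unexpectedly long.

	func snapshotIfSlow(fr *trace.FlightRecorder, op func()) {
		start := time.Now()
		op()
		if time.Since(start) > 500*time.Millisecond && fr.Enabled() {
			// The window still holds the slow operation; write it out for later analysis.
			f, err := os.CreateTemp("", "flight-*.trace") // hypothetical destination
			if err != nil {
				return
			}
			defer f.Close()
			fr.WriteTo(f) // ignoring the "already in progress" error in this sketch
		}
	}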

View File

@ -0,0 +1,308 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace_test
import (
"bytes"
"context"
inttrace "internal/trace"
"internal/trace/testtrace"
"io"
"runtime/trace"
"slices"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestFlightRecorderDoubleStart(t *testing.T) {
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
if err := fr.Start(); err != nil {
t.Fatalf("unexpected error on Start: %v", err)
}
if err := fr.Start(); err == nil {
t.Fatalf("expected error from double Start: %v", err)
}
fr.Stop()
}
func TestFlightRecorderEnabled(t *testing.T) {
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
if fr.Enabled() {
t.Fatal("flight recorder is enabled, but never started")
}
if err := fr.Start(); err != nil {
t.Fatalf("unexpected error on Start: %v", err)
}
if !fr.Enabled() {
t.Fatal("flight recorder is not enabled, but started")
}
fr.Stop()
if fr.Enabled() {
t.Fatal("flight recorder is enabled, but stopped")
}
}
func TestFlightRecorderWriteToDisabled(t *testing.T) {
var buf bytes.Buffer
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
if n, err := fr.WriteTo(&buf); err == nil {
t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
}
if err := fr.Start(); err != nil {
t.Fatalf("unexpected error on Start: %v", err)
}
fr.Stop()
if n, err := fr.WriteTo(&buf); err == nil {
t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
}
}
func TestFlightRecorderConcurrentWriteTo(t *testing.T) {
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
if err := fr.Start(); err != nil {
t.Fatalf("unexpected error on Start: %v", err)
}
// Start two goroutines to write snapshots.
//
// Most of the time one will fail and one will succeed, but we don't require this.
// Due to a variety of factors, it's definitely possible for them both to succeed.
// However, at least one should succeed.
var bufs [2]bytes.Buffer
var wg sync.WaitGroup
var successes atomic.Uint32
for i := range bufs {
wg.Add(1)
go func() {
defer wg.Done()
n, err := fr.WriteTo(&bufs[i])
// TODO(go.dev/issue/63185): this was an exported error; consider refactoring.
if err != nil && err.Error() == "call to WriteTo for trace.FlightRecorder already in progress" {
if n != 0 {
t.Errorf("(goroutine %d) WriteTo bytes written is non-zero for early bail out: %d", i, n)
}
return
}
if err != nil {
t.Errorf("(goroutine %d) failed to write snapshot for unexpected reason: %v", i, err)
}
successes.Add(1)
if n == 0 {
t.Errorf("(goroutine %d) wrote invalid trace of zero bytes in size", i)
}
if n != int64(bufs[i].Len()) {
t.Errorf("(goroutine %d) trace length doesn't match WriteTo result: got %d, want %d", i, n, int64(bufs[i].Len()))
}
}()
}
wg.Wait()
// Stop tracing.
fr.Stop()
// Make sure at least one succeeded to write.
if successes.Load() == 0 {
t.Fatal("expected at least one success to write a snapshot, got zero")
}
// Validate the traces that came out.
for i := range bufs {
buf := &bufs[i]
if buf.Len() == 0 {
continue
}
testReader(t, buf, testtrace.ExpectSuccess())
}
}
func TestFlightRecorder(t *testing.T) {
testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) {
snapshot()
})
}
func TestFlightRecorderStartStop(t *testing.T) {
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
for i := 0; i < 5; i++ {
testFlightRecorder(t, fr, func(snapshot func()) {
snapshot()
})
}
}
func TestFlightRecorderLog(t *testing.T) {
tr := testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) {
trace.Log(context.Background(), "message", "hello")
snapshot()
})
// Prepare to read the trace snapshot.
r, err := inttrace.NewReader(bytes.NewReader(tr))
if err != nil {
t.Fatalf("unexpected error creating trace reader: %v", err)
}
// Find the log message in the trace.
found := false
for {
ev, err := r.ReadEvent()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("unexpected error reading trace: %v", err)
}
if !found && ev.Kind() == inttrace.EventLog {
log := ev.Log()
found = log.Category == "message" && log.Message == "hello"
}
}
if !found {
t.Errorf("failed to find expected log message (%q, %q) in snapshot", "message", "hello")
}
}
func TestFlightRecorderOneGeneration(t *testing.T) {
test := func(t *testing.T, fr *trace.FlightRecorder) {
tr := testFlightRecorder(t, fr, func(snapshot func()) {
// Sleep to let a few generations pass.
time.Sleep(3 * time.Second)
snapshot()
})
// Prepare to read the trace snapshot.
r, err := inttrace.NewReader(bytes.NewReader(tr))
if err != nil {
t.Fatalf("unexpected error creating trace reader: %v", err)
}
// Make sure there are exactly two Sync events: at the start and end.
var syncs []int
evs := 0
for {
ev, err := r.ReadEvent()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("unexpected error reading trace: %v", err)
}
if ev.Kind() == inttrace.EventSync {
syncs = append(syncs, evs)
}
evs++
}
if ends := []int{0, evs - 1}; !slices.Equal(syncs, ends) {
t.Errorf("expected two sync events (one at each end of the trace), found %d at %d instead of %d",
len(syncs), syncs[:min(len(syncs), 5)], ends)
}
}
t.Run("SetMinAge", func(t *testing.T) {
t.Skip("issue 63185: flaky test")
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MinAge: time.Millisecond})
test(t, fr)
})
t.Run("MaxBytes", func(t *testing.T) {
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16})
test(t, fr)
})
}
type flightRecorderTestFunc func(snapshot func())
func testFlightRecorder(t *testing.T, fr *trace.FlightRecorder, f flightRecorderTestFunc) []byte {
if trace.IsEnabled() {
t.Skip("cannot run flight recorder tests when tracing is enabled")
}
// Start the flight recorder.
if err := fr.Start(); err != nil {
t.Fatalf("unexpected error on Start: %v", err)
}
// Set up snapshot callback.
var buf bytes.Buffer
callback := func() {
n, err := fr.WriteTo(&buf)
if err != nil {
t.Errorf("unexpected failure during flight recording: %v", err)
return
}
if n < 16 {
t.Errorf("expected a trace size of at least 16 bytes, got %d", n)
}
if n != int64(buf.Len()) {
t.Errorf("WriteTo result doesn't match trace size: got %d, want %d", n, int64(buf.Len()))
}
}
// Call the test function.
f(callback)
// Stop the flight recorder.
fr.Stop()
// Get the trace bytes; we don't want to use the Buffer as a Reader directly
// since we may want to consume this data more than once.
traceBytes := buf.Bytes()
// Parse the trace to make sure it's not broken.
testReader(t, bytes.NewReader(traceBytes), testtrace.ExpectSuccess())
return traceBytes
}
func testReader(t *testing.T, tr io.Reader, exp *testtrace.Expectation) {
r, err := inttrace.NewReader(tr)
if err != nil {
if err := exp.Check(err); err != nil {
t.Error(err)
}
return
}
v := testtrace.NewValidator()
v.SkipClockSnapshotChecks()
for {
ev, err := r.ReadEvent()
if err == io.EOF {
break
}
if err != nil {
if err := exp.Check(err); err != nil {
t.Error(err)
}
return
}
if err := v.Event(ev); err != nil {
t.Error(err)
}
}
if err := exp.Check(nil); err != nil {
t.Error(err)
}
}
func TestTraceAndFlightRecorder(t *testing.T) {
var tBuf, frBuf bytes.Buffer
if err := trace.Start(&tBuf); err != nil {
t.Errorf("unable to start execution tracer: %s", err)
}
fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16})
fr.Start()
fr.WriteTo(&frBuf)
fr.Stop()
trace.Stop()
if tBuf.Len() == 0 || frBuf.Len() == 0 {
t.Errorf("None of these should be equal to zero: %d %d", tBuf.Len(), frBuf.Len())
}
if tBuf.Len() <= frBuf.Len() {
t.Errorf("trace should be longer than the flight recorder: trace=%d flight record=%d", tBuf.Len(), frBuf.Len())
}
}

View File

@ -0,0 +1,144 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"fmt"
"slices"
"time"
_ "unsafe" // added for go linkname usage
)
// A recorder receives bytes from the runtime tracer and processes them.
type recorder struct {
r *FlightRecorder
headerReceived bool
}
func (w *recorder) Write(b []byte) (n int, err error) {
r := w.r
defer func() {
if err != nil {
// Propagate errors to the flightrecorder.
if r.err == nil {
r.err = err
}
}
}()
if !w.headerReceived {
if len(b) < len(r.header) {
return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header))
}
r.header = ([16]byte)(b[:16])
n += 16
w.headerReceived = true
}
if len(b) == n {
return 0, nil
}
ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
if err != nil {
return len(b) - int(nb) - n, err
}
n += int(nb)
// Append the batch to the current generation.
if r.active.gen == 0 {
r.active.gen = gen
}
if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) {
r.active.minTime = r.freq.mul(ba.time)
}
r.active.size += len(ba.data)
r.active.batches = append(r.active.batches, ba)
return len(b), nil
}
func (w *recorder) endGeneration() {
r := w.r
// Check if we're entering a new generation.
r.ringMu.Lock()
// Get the current trace clock time.
now := traceTimeNow(r.freq)
// Add the current generation to the ring. Make sure we always have at least one
// complete generation by putting the active generation onto the new list, regardless
// of whatever our settings are.
//
// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
// and not worry about aliasing. This creates allocations, but at a very low rate.
newRing := []rawGeneration{r.active}
size := r.active.size
for i := len(r.ring) - 1; i >= 0; i-- {
// Stop adding older generations if the new ring already exceeds the thresholds.
// This ensures we keep generations that cross a threshold, but not any that lie
// entirely outside it.
if uint64(size) > r.wantSize || now.Sub(newRing[len(newRing)-1].minTime) > r.wantDur {
break
}
size += r.ring[i].size
newRing = append(newRing, r.ring[i])
}
slices.Reverse(newRing)
r.ring = newRing
r.ringMu.Unlock()
// Start a new active generation.
r.active = rawGeneration{}
}
type rawGeneration struct {
gen uint64
size int
minTime eventTime
batches []batch
}
func traceTimeNow(freq frequency) eventTime {
return freq.mul(timestamp(runtime_traceClockNow()))
}
//go:linkname runtime_traceClockNow runtime.traceClockNow
func runtime_traceClockNow() int64
// frequency is nanoseconds per timestamp unit.
type frequency float64
// mul multiplies an unprocessed timestamp to produce a time in nanoseconds.
func (f frequency) mul(t timestamp) eventTime {
return eventTime(float64(t) * float64(f))
}
// eventTime is a timestamp in nanoseconds.
//
// It corresponds to the monotonic clock on the platform that the
// trace was taken, and so is possible to correlate with timestamps
// for other traces taken on the same machine using the same clock
// (i.e. no reboots in between).
//
// The actual absolute value of the timestamp is only meaningful in
// relation to other timestamps from the same clock.
//
// BUG: Timestamps coming from traces on Windows platforms are
// only comparable with timestamps from the same trace. Timestamps
// across traces cannot be compared, because the system clock is
// not used as of Go 1.22.
//
// BUG: Traces produced by Go versions 1.21 and earlier cannot be
// compared with timestamps from other traces taken on the same
// machine. This is because the system clock was not used at all
// to collect those timestamps.
type eventTime int64
// Sub subtracts t0 from t, returning the duration in nanoseconds.
func (t eventTime) Sub(t0 eventTime) time.Duration {
return time.Duration(int64(t) - int64(t0))
}
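
For example (assumed clock rate; exampleConvert is an illustrative
in-package helper, not part of the change), raw timestamps convert to
durations like so:

	// On a platform whose trace clock is nanotime, units-per-second is about
	// 1e9, so freq is about 1.0 and mul is nearly the identity.
	func exampleConvert() time.Duration {
		freq := frequency(1.0 / (float64(runtime_traceClockUnitsPerSecond()) / 1e9)) // ns per clock unit
		t0 := freq.mul(timestamp(1_000_000))
		t1 := freq.mul(timestamp(1_500_000))
		return t1.Sub(t0) // ~500µs when the clock runs at 1e9 units/sec
	}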

View File

@ -0,0 +1,188 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"fmt"
"internal/trace/tracev2"
"io"
"runtime"
"sync"
"sync/atomic"
_ "unsafe"
)
var tracing traceMultiplexer
type traceMultiplexer struct {
sync.Mutex
enabled atomic.Bool
subscribers int
subscribersMu sync.Mutex
traceStartWriter io.Writer
flightRecorder *recorder
}
func (t *traceMultiplexer) subscribeFlightRecorder(r *recorder) error {
t.Lock()
defer t.Unlock()
t.subscribersMu.Lock()
if t.flightRecorder != nil {
t.subscribersMu.Unlock()
return fmt.Errorf("flight recorder already enabled")
}
t.flightRecorder = r
t.subscribersMu.Unlock()
if err := t.addedSubscriber(); err != nil {
t.subscribersMu.Lock()
t.flightRecorder = nil
t.subscribersMu.Unlock()
return err
}
return nil
}
func (t *traceMultiplexer) unsubscribeFlightRecorder() error {
t.Lock()
defer t.Unlock()
t.removingSubscriber()
t.subscribersMu.Lock()
if t.flightRecorder == nil {
t.subscribersMu.Unlock()
return fmt.Errorf("attempt to unsubscribe missing flight recorder")
}
t.flightRecorder = nil
t.subscribersMu.Unlock()
t.removedSubscriber()
return nil
}
func (t *traceMultiplexer) subscribeTraceStartWriter(w io.Writer) error {
t.Lock()
defer t.Unlock()
t.subscribersMu.Lock()
if t.traceStartWriter != nil {
t.subscribersMu.Unlock()
return fmt.Errorf("execution tracer already enabled")
}
t.traceStartWriter = w
t.subscribersMu.Unlock()
if err := t.addedSubscriber(); err != nil {
t.subscribersMu.Lock()
t.traceStartWriter = nil
t.subscribersMu.Unlock()
return err
}
return nil
}
func (t *traceMultiplexer) unsubscribeTraceStartWriter() {
t.Lock()
defer t.Unlock()
t.removingSubscriber()
t.subscribersMu.Lock()
if t.traceStartWriter == nil {
t.subscribersMu.Unlock()
return
}
t.traceStartWriter = nil
t.subscribersMu.Unlock()
t.removedSubscriber()
return
}
func (t *traceMultiplexer) addedSubscriber() error {
if t.enabled.Load() {
// This is necessary for the trace reader goroutine to pick up on the new subscriber.
runtime_traceAdvance(false)
} else {
if err := t.startLocked(); err != nil {
return err
}
}
t.subscribers++
return nil
}
func (t *traceMultiplexer) removingSubscriber() {
if t.subscribers == 0 {
return
}
t.subscribers--
if t.subscribers == 0 {
runtime.StopTrace()
t.enabled.Store(false)
} else {
// This is necessary to avoid missing trace data when the system is under high load.
runtime_traceAdvance(false)
}
}
func (t *traceMultiplexer) removedSubscriber() {
if t.subscribers > 0 {
// This is necessary for the trace reader goroutine to pick up on the new subscriber.
runtime_traceAdvance(false)
}
}
func (t *traceMultiplexer) startLocked() error {
if err := runtime.StartTrace(); err != nil {
return err
}
// Grab the trace reader goroutine's subscribers.
//
// We only update our subscribers if we see an end-of-generation
// signal from the runtime after this, so any new subscriptions
// or unsubscriptions must call traceAdvance to ensure the reader
// goroutine sees an end-of-generation signal.
t.subscribersMu.Lock()
flightRecorder := t.flightRecorder
traceStartWriter := t.traceStartWriter
t.subscribersMu.Unlock()
go func() {
for {
data := runtime_readTrace()
if data == nil {
break
}
if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
if flightRecorder != nil {
flightRecorder.endGeneration()
}
// Pick up any changes.
t.subscribersMu.Lock()
flightRecorder = t.flightRecorder
traceStartWriter = t.traceStartWriter
t.subscribersMu.Unlock()
} else {
if traceStartWriter != nil {
traceStartWriter.Write(data)
}
if flightRecorder != nil {
flightRecorder.Write(data)
}
}
}
}()
t.enabled.Store(true)
return nil
}
//go:linkname runtime_readTrace
func runtime_readTrace() (buf []byte)

View File

@ -110,45 +110,17 @@ package trace
 
 import (
 	"io"
-	"runtime"
-	"sync"
-	"sync/atomic"
 )
 
 // Start enables tracing for the current program.
 // While tracing, the trace will be buffered and written to w.
 // Start returns an error if tracing is already enabled.
 func Start(w io.Writer) error {
-	tracing.Lock()
-	defer tracing.Unlock()
-
-	if err := runtime.StartTrace(); err != nil {
-		return err
-	}
-	go func() {
-		for {
-			data := runtime.ReadTrace()
-			if data == nil {
-				break
-			}
-			w.Write(data)
-		}
-	}()
-	tracing.enabled.Store(true)
-	return nil
+	return tracing.subscribeTraceStartWriter(w)
 }
 
 // Stop stops the current tracing, if any.
 // Stop only returns after all the writes for the trace have completed.
 func Stop() {
-	tracing.Lock()
-	defer tracing.Unlock()
-	tracing.enabled.Store(false)
-
-	runtime.StopTrace()
-}
-
-var tracing struct {
-	sync.Mutex // gate mutators (Start, Stop)
-	enabled    atomic.Bool
+	tracing.unsubscribeTraceStartWriter()
 }

View File

@ -59,37 +59,43 @@ func (a *traceRegionAlloc) alloc(n uintptr) *notInHeap {
 	}
 
 	// Try to install a new block.
-	lock(&a.lock)
+	var x *notInHeap
+	systemstack(func() {
+		// Acquire a.lock on the systemstack to avoid stack growth
+		// and accidentally entering the tracer again.
+		lock(&a.lock)
 
-	// Check block again under the lock. Someone may
-	// have gotten here first.
-	block = (*traceRegionAllocBlock)(a.current.Load())
-	if block != nil {
-		r := block.off.Add(n)
-		if r <= uintptr(len(block.data)) {
-			unlock(&a.lock)
-			return (*notInHeap)(unsafe.Pointer(&block.data[r-n]))
-		}
+		// Check block again under the lock. Someone may
+		// have gotten here first.
+		block = (*traceRegionAllocBlock)(a.current.Load())
+		if block != nil {
+			r := block.off.Add(n)
+			if r <= uintptr(len(block.data)) {
+				unlock(&a.lock)
+				x = (*notInHeap)(unsafe.Pointer(&block.data[r-n]))
+				return
+			}
 
-		// Add the existing block to the full list.
-		block.next = a.full
-		a.full = block
-	}
+			// Add the existing block to the full list.
+			block.next = a.full
+			a.full = block
+		}
 
-	// Allocate a new block.
-	block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc"))
-	if block == nil {
-		throw("traceRegion: out of memory")
-	}
+		// Allocate a new block.
+		block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc"))
+		if block == nil {
+			throw("traceRegion: out of memory")
+		}
 
-	// Allocate space for our current request, so we always make
-	// progress.
-	block.off.Store(n)
-	x := (*notInHeap)(unsafe.Pointer(&block.data[0]))
+		// Allocate space for our current request, so we always make
+		// progress.
+		block.off.Store(n)
+		x = (*notInHeap)(unsafe.Pointer(&block.data[0]))
 
-	// Publish the new block.
-	a.current.Store(unsafe.Pointer(block))
-	unlock(&a.lock)
+		// Publish the new block.
+		a.current.Store(unsafe.Pointer(block))
+		unlock(&a.lock)
+	})
 	return x
 }

View File

@ -64,6 +64,8 @@ func traceClockNow() traceTime {
 
 // traceClockUnitsPerSecond estimates the number of trace clock units per
 // second that elapse.
+//
+//go:linkname traceClockUnitsPerSecond runtime/trace.runtime_traceClockUnitsPerSecond
 func traceClockUnitsPerSecond() uint64 {
 	if osHasLowResClock {
 		// We're using cputicks as our clock, so we need a real estimate.