diff --git a/api/next/63185.txt b/api/next/63185.txt new file mode 100644 index 0000000000..b5127ff8b2 --- /dev/null +++ b/api/next/63185.txt @@ -0,0 +1,9 @@ +pkg runtime/trace, func NewFlightRecorder(FlightRecorderConfig) *FlightRecorder #63185 +pkg runtime/trace, method (*FlightRecorder) Enabled() bool #63185 +pkg runtime/trace, method (*FlightRecorder) Start() error #63185 +pkg runtime/trace, method (*FlightRecorder) Stop() #63185 +pkg runtime/trace, method (*FlightRecorder) WriteTo(io.Writer) (int64, error) #63185 +pkg runtime/trace, type FlightRecorder struct #63185 +pkg runtime/trace, type FlightRecorderConfig struct #63185 +pkg runtime/trace, type FlightRecorderConfig struct, MaxBytes uint64 #63185 +pkg runtime/trace, type FlightRecorderConfig struct, MinAge time.Duration #63185 diff --git a/doc/next/6-stdlib/99-minor/runtime/trace/63185.md b/doc/next/6-stdlib/99-minor/runtime/trace/63185.md new file mode 100644 index 0000000000..80ba088b75 --- /dev/null +++ b/doc/next/6-stdlib/99-minor/runtime/trace/63185.md @@ -0,0 +1,2 @@ + +TODO The flight recorder has been added to the runtime/trace package. diff --git a/src/internal/trace/testtrace/validation.go b/src/internal/trace/testtrace/validation.go index 2060d0d44c..3de1e1d4bd 100644 --- a/src/internal/trace/testtrace/validation.go +++ b/src/internal/trace/testtrace/validation.go @@ -98,25 +98,26 @@ func (v *Validator) Event(ev trace.Event) error { if v.GoVersion >= version.Go125 && !(s.N > 1 && s.ClockSnapshot == nil) { if s.ClockSnapshot == nil { e.Errorf("sync %d has no clock snapshot", s.N) - } - if s.ClockSnapshot.Wall.IsZero() { - e.Errorf("sync %d has zero wall time", s.N) - } - if s.ClockSnapshot.Mono == 0 { - e.Errorf("sync %d has zero mono time", s.N) - } - if s.ClockSnapshot.Trace == 0 { - e.Errorf("sync %d has zero trace time", s.N) - } - if !v.skipClockSnapshotChecks { - if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) { - e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall) + } else { + if s.ClockSnapshot.Wall.IsZero() { + e.Errorf("sync %d has zero wall time", s.N) } - if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) { - e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono) + if s.ClockSnapshot.Mono == 0 { + e.Errorf("sync %d has zero mono time", s.N) } - if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) { - e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace) + if s.ClockSnapshot.Trace == 0 { + e.Errorf("sync %d has zero trace time", s.N) + } + if !v.skipClockSnapshotChecks { + if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) { + e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall) + } + if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) { + e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono) + } + if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) { + e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace) + } } } } diff --git a/src/internal/trace/tracev2/events.go b/src/internal/trace/tracev2/events.go index fc7b27720a..bfbbdec00f 100644 --- a/src/internal/trace/tracev2/events.go +++ b/src/internal/trace/tracev2/events.go @@ -87,6 +87,10 @@ const ( EvSync // start of a sync batch [...EvFrequency|EvClockSnapshot] EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec] + // Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25. + // This could be used as an explicit in-band end-of-generation signal in the future. + EvEndOfGeneration + NumEvents ) @@ -189,6 +193,9 @@ var specs = [...]EventSpec{ EvSync: { Name: "Sync", }, + EvEndOfGeneration: { + Name: "EndOfGeneration", + }, // "Timed" Events. EvProcsChange: { diff --git a/src/runtime/trace.go b/src/runtime/trace.go index c4d68cb714..139cbba6a9 100644 --- a/src/runtime/trace.go +++ b/src/runtime/trace.go @@ -21,6 +21,7 @@ package runtime import ( "internal/runtime/atomic" + "internal/trace/tracev2" "unsafe" ) @@ -51,9 +52,10 @@ var trace struct { // State for the trace reader goroutine. // // Protected by trace.lock. - readerGen atomic.Uintptr // the generation the reader is currently reading for - flushedGen atomic.Uintptr // the last completed generation - headerWritten bool // whether ReadTrace has emitted trace header + readerGen atomic.Uintptr // the generation the reader is currently reading for + flushedGen atomic.Uintptr // the last completed generation + headerWritten bool // whether ReadTrace has emitted trace header + endOfGenerationWritten bool // whether readTrace has emitted the end of the generation signal // doneSema is used to synchronize the reader and traceAdvance. Specifically, // it notifies traceAdvance that the reader is done with a generation. @@ -753,8 +755,24 @@ func traceRegisterLabelsAndReasons(gen uintptr) { // returned data before calling ReadTrace again. // ReadTrace must be called from one goroutine at a time. func ReadTrace() []byte { + for { + buf := readTrace() + + // Skip over the end-of-generation signal which must not appear + // in the final trace. + if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration { + continue + } + return buf + } +} + +// readTrace is the implementation of ReadTrace, except with an additional +// in-band signal as to when the buffer is for a new generation. +// +//go:linkname readTrace runtime/trace.runtime_readTrace +func readTrace() (buf []byte) { top: - var buf []byte var park bool systemstack(func() { buf, park = readTrace0() @@ -782,7 +800,6 @@ top: }, nil, waitReasonTraceReaderBlocked, traceBlockSystemGoroutine, 2) goto top } - return buf } @@ -849,6 +866,17 @@ func readTrace0() (buf []byte, park bool) { // is waiting on the reader to finish flushing the last generation so that it // can continue to advance. if trace.flushedGen.Load() == gen { + // Write out the internal in-band end-of-generation signal. + if !trace.endOfGenerationWritten { + trace.endOfGenerationWritten = true + unlock(&trace.lock) + return []byte{byte(tracev2.EvEndOfGeneration)}, false + } + + // Reset the flag. + trace.endOfGenerationWritten = false + + // Handle shutdown. if trace.shutdown.Load() { unlock(&trace.lock) @@ -868,6 +896,8 @@ func readTrace0() (buf []byte, park bool) { // read. We're done. return nil, false } + // Handle advancing to the next generation. + // The previous gen has had all of its buffers flushed, and // there's nothing else for us to read. Advance the generation // we're reading from and try again. diff --git a/src/runtime/trace/batch.go b/src/runtime/trace/batch.go new file mode 100644 index 0000000000..d726a3d375 --- /dev/null +++ b/src/runtime/trace/batch.go @@ -0,0 +1,83 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "fmt" + "internal/trace/tracev2" +) + +// timestamp is an unprocessed timestamp. +type timestamp uint64 + +// batch represents a batch of trace events. +// It is unparsed except for its header. +type batch struct { + m threadID + time timestamp + data []byte +} + +// threadID is the runtime-internal M structure's ID. This is unique +// for each OS thread. +type threadID int64 + +// readBatch copies b and parses the trace batch header inside. +// Returns the batch, the generation, bytes read, and an error. +func readBatch(b []byte) (batch, uint64, uint64, error) { + if len(b) == 0 { + return batch{}, 0, 0, fmt.Errorf("batch is empty") + } + data := make([]byte, len(b)) + if nw := copy(data, b); nw != len(b) { + return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch") + } + // Read batch header byte. + if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch { + return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ) + } + + // Read the batch header: gen (generation), thread (M) ID, base timestamp + // for the batch. + total := 1 + b = b[1:] + gen, n, err := readUvarint(b) + if err != nil { + return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err) + } + total += n + b = b[n:] + m, n, err := readUvarint(b) + if err != nil { + return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err) + } + total += n + b = b[n:] + ts, n, err := readUvarint(b) + if err != nil { + return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err) + } + total += n + b = b[n:] + + // Read in the size of the batch to follow. + size, n, err := readUvarint(b) + if err != nil { + return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err) + } + if size > tracev2.MaxBatchSize { + return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize) + } + total += n + total += int(size) + data = data[:total] + + // Return the batch. + return batch{ + m: threadID(m), + time: timestamp(ts), + data: data, + }, gen, uint64(total), nil +} diff --git a/src/runtime/trace/encoding.go b/src/runtime/trace/encoding.go new file mode 100644 index 0000000000..46990c658c --- /dev/null +++ b/src/runtime/trace/encoding.go @@ -0,0 +1,50 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "errors" +) + +// maxVarintLenN is the maximum length of a varint-encoded N-bit integer. +const maxVarintLen64 = 10 + +var ( + errOverflow = errors.New("binary: varint overflows a 64-bit integer") + errEOB = errors.New("binary: end of buffer") +) + +// TODO deduplicate this function. +func readUvarint(b []byte) (uint64, int, error) { + var x uint64 + var s uint + var byt byte + for i := 0; i < maxVarintLen64 && i < len(b); i++ { + byt = b[i] + if byt < 0x80 { + if i == maxVarintLen64-1 && byt > 1 { + return x, i, errOverflow + } + return x | uint64(byt)<= 0x80 { + buf[i] = byte(x) | 0x80 + x >>= 7 + i++ + } + buf[i] = byte(x) + return i + 1 +} diff --git a/src/runtime/trace/flightrecorder.go b/src/runtime/trace/flightrecorder.go new file mode 100644 index 0000000000..24163f32b2 --- /dev/null +++ b/src/runtime/trace/flightrecorder.go @@ -0,0 +1,182 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "fmt" + "io" + "sync" + "time" + _ "unsafe" // added for go linkname usage +) + +// FlightRecorder represents a single consumer of a Go execution +// trace. +// It tracks a moving window over the execution trace produced by +// the runtime, always containing the most recent trace data. +// +// At most one flight recorder may be active at any given time, +// though flight recording is allowed to be concurrently active +// with a trace consumer using trace.Start. +// This restriction of only a single flight recorder may be removed +// in the future. +type FlightRecorder struct { + err error + + // State specific to the recorder. + header [16]byte + active rawGeneration + ringMu sync.Mutex + ring []rawGeneration + freq frequency // timestamp conversion factor, from the runtime + + // Externally-set options. + targetSize uint64 + targetPeriod time.Duration + + enabled bool // whether the flight recorder is enabled. + writing sync.Mutex // protects concurrent calls to WriteTo + + // The values of targetSize and targetPeriod we've committed to since the last Start. + wantSize uint64 + wantDur time.Duration +} + +// NewFlightRecorder creates a new flight recorder from the provided configuration. +func NewFlightRecorder(cfg FlightRecorderConfig) *FlightRecorder { + fr := new(FlightRecorder) + if cfg.MaxBytes != 0 { + fr.targetSize = cfg.MaxBytes + } else { + fr.targetSize = 10 << 20 // 10 MiB. + } + + if cfg.MinAge != 0 { + fr.targetPeriod = cfg.MinAge + } else { + fr.targetPeriod = 10 * time.Second + } + return fr +} + +// Start activates the flight recorder and begins recording trace data. +// Only one call to trace.Start may be active at any given time. +// In addition, currently only one flight recorder may be active in the program. +// Returns an error if the flight recorder cannot be started or is already started. +func (fr *FlightRecorder) Start() error { + if fr.enabled { + return fmt.Errorf("cannot enable a enabled flight recorder") + } + fr.wantSize = fr.targetSize + fr.wantDur = fr.targetPeriod + fr.err = nil + fr.freq = frequency(1.0 / (float64(runtime_traceClockUnitsPerSecond()) / 1e9)) + + // Start tracing, data is sent to a recorder which forwards it to our own + // storage. + if err := tracing.subscribeFlightRecorder(&recorder{r: fr}); err != nil { + return err + } + + fr.enabled = true + return nil +} + +// Stop ends recording of trace data. It blocks until any concurrent WriteTo calls complete. +func (fr *FlightRecorder) Stop() { + if !fr.enabled { + return + } + fr.enabled = false + tracing.unsubscribeFlightRecorder() + + // Reset all state. No need to lock because the reader has already exited. + fr.active = rawGeneration{} + fr.ring = nil +} + +// Enabled returns true if the flight recorder is active. +// Specifically, it will return true if Start did not return an error, and Stop has not yet been called. +// It is safe to call from multiple goroutines simultaneously. +func (fr *FlightRecorder) Enabled() bool { return fr.enabled } + +// WriteTo snapshots the moving window tracked by the flight recorder. +// The snapshot is expected to contain data that is up-to-date as of when WriteTo is called, +// though this is not a hard guarantee. +// Only one goroutine may execute WriteTo at a time. +// An error is returned upon failure to write to w, if another WriteTo call is already in-progress, +// or if the flight recorder is inactive. +func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) { + if !fr.enabled { + return 0, fmt.Errorf("cannot snapshot a disabled flight recorder") + } + if !fr.writing.TryLock() { + // Indicates that a call to WriteTo was made while one was already in progress. + // If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo. + return 0, fmt.Errorf("call to WriteTo for trace.FlightRecorder already in progress") + } + defer fr.writing.Unlock() + + // Force a global buffer flush. + runtime_traceAdvance(false) + + // Now that everything has been flushed and written, grab whatever we have. + // + // N.B. traceAdvance blocks until the tracer goroutine has actually written everything + // out, which means the generation we just flushed must have been already been observed + // by the recorder goroutine. Because we flushed twice, the first flush is guaranteed to + // have been both completed *and* processed by the recorder goroutine. + fr.ringMu.Lock() + gens := fr.ring + fr.ringMu.Unlock() + + // Write the header. + nw, err := w.Write(fr.header[:]) + if err != nil { + return int64(nw), err + } + n += int64(nw) + + // Write all the data. + for _, gen := range gens { + for _, batch := range gen.batches { + // Write batch data. + nw, err = w.Write(batch.data) + n += int64(nw) + if err != nil { + return n, err + } + } + } + return n, nil +} + +type FlightRecorderConfig struct { + // MinAge is a lower bound on the age of an event in the flight recorder's window. + // + // The flight recorder will strive to promptly discard events older than the minimum age, + // but older events may appear in the window snapshot. The age setting will always be + // overridden by MaxSize. + // + // If this is 0, the minimum age is implementation defined, but can be assumed to be on the order + // of seconds. + MinAge time.Duration + + // MaxBytes is an upper bound on the size of the window in bytes. + // + // This setting takes precedence over MinAge. + // However, it does not make any guarantees on the size of the data WriteTo will write, + // nor does it guarantee memory overheads will always stay below MaxBytes. Treat it + // as a hint. + // + // If this is 0, the maximum size is implementation defined. + MaxBytes uint64 +} + +//go:linkname runtime_traceClockUnitsPerSecond +func runtime_traceClockUnitsPerSecond() uint64 + +//go:linkname runtime_traceAdvance runtime.traceAdvance +func runtime_traceAdvance(stopTrace bool) diff --git a/src/runtime/trace/flightrecorder_test.go b/src/runtime/trace/flightrecorder_test.go new file mode 100644 index 0000000000..075215db00 --- /dev/null +++ b/src/runtime/trace/flightrecorder_test.go @@ -0,0 +1,308 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace_test + +import ( + "bytes" + "context" + inttrace "internal/trace" + "internal/trace/testtrace" + "io" + "runtime/trace" + "slices" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestFlightRecorderDoubleStart(t *testing.T) { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error on Start: %v", err) + } + if err := fr.Start(); err == nil { + t.Fatalf("expected error from double Start: %v", err) + } + fr.Stop() +} + +func TestFlightRecorderEnabled(t *testing.T) { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + + if fr.Enabled() { + t.Fatal("flight recorder is enabled, but never started") + } + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error on Start: %v", err) + } + if !fr.Enabled() { + t.Fatal("flight recorder is not enabled, but started") + } + fr.Stop() + if fr.Enabled() { + t.Fatal("flight recorder is enabled, but stopped") + } +} + +func TestFlightRecorderWriteToDisabled(t *testing.T) { + var buf bytes.Buffer + + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + if n, err := fr.WriteTo(&buf); err == nil { + t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n) + } + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error on Start: %v", err) + } + fr.Stop() + if n, err := fr.WriteTo(&buf); err == nil { + t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n) + } +} + +func TestFlightRecorderConcurrentWriteTo(t *testing.T) { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error on Start: %v", err) + } + + // Start two goroutines to write snapshots. + // + // Most of the time one will fail and one will succeed, but we don't require this. + // Due to a variety of factors, it's definitely possible for them both to succeed. + // However, at least one should succeed. + var bufs [2]bytes.Buffer + var wg sync.WaitGroup + var successes atomic.Uint32 + for i := range bufs { + wg.Add(1) + go func() { + defer wg.Done() + + n, err := fr.WriteTo(&bufs[i]) + // TODO(go.dev/issue/63185) was an exported error. Consider refactoring. + if err != nil && err.Error() == "call to WriteTo for trace.FlightRecorder already in progress" { + if n != 0 { + t.Errorf("(goroutine %d) WriteTo bytes written is non-zero for early bail out: %d", i, n) + } + return + } + if err != nil { + t.Errorf("(goroutine %d) failed to write snapshot for unexpected reason: %v", i, err) + } + successes.Add(1) + + if n == 0 { + t.Errorf("(goroutine %d) wrote invalid trace of zero bytes in size", i) + } + if n != int64(bufs[i].Len()) { + t.Errorf("(goroutine %d) trace length doesn't match WriteTo result: got %d, want %d", i, n, int64(bufs[i].Len())) + } + }() + } + wg.Wait() + + // Stop tracing. + fr.Stop() + + // Make sure at least one succeeded to write. + if successes.Load() == 0 { + t.Fatal("expected at least one success to write a snapshot, got zero") + } + + // Validate the traces that came out. + for i := range bufs { + buf := &bufs[i] + if buf.Len() == 0 { + continue + } + testReader(t, buf, testtrace.ExpectSuccess()) + } +} + +func TestFlightRecorder(t *testing.T) { + testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) { + snapshot() + }) +} + +func TestFlightRecorderStartStop(t *testing.T) { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + for i := 0; i < 5; i++ { + testFlightRecorder(t, fr, func(snapshot func()) { + snapshot() + }) + } +} + +func TestFlightRecorderLog(t *testing.T) { + tr := testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) { + trace.Log(context.Background(), "message", "hello") + snapshot() + }) + + // Prepare to read the trace snapshot. + r, err := inttrace.NewReader(bytes.NewReader(tr)) + if err != nil { + t.Fatalf("unexpected error creating trace reader: %v", err) + } + + // Find the log message in the trace. + found := false + for { + ev, err := r.ReadEvent() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("unexpected error reading trace: %v", err) + } + if !found && ev.Kind() == inttrace.EventLog { + log := ev.Log() + found = log.Category == "message" && log.Message == "hello" + } + } + if !found { + t.Errorf("failed to find expected log message (%q, %q) in snapshot", "message", "hello") + } +} + +func TestFlightRecorderOneGeneration(t *testing.T) { + test := func(t *testing.T, fr *trace.FlightRecorder) { + tr := testFlightRecorder(t, fr, func(snapshot func()) { + // Sleep to let a few generations pass. + time.Sleep(3 * time.Second) + snapshot() + }) + + // Prepare to read the trace snapshot. + r, err := inttrace.NewReader(bytes.NewReader(tr)) + if err != nil { + t.Fatalf("unexpected error creating trace reader: %v", err) + } + + // Make sure there are exactly two Sync events: at the start and end. + var syncs []int + evs := 0 + for { + ev, err := r.ReadEvent() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("unexpected error reading trace: %v", err) + } + if ev.Kind() == inttrace.EventSync { + syncs = append(syncs, evs) + } + evs++ + } + if ends := []int{0, evs - 1}; !slices.Equal(syncs, ends) { + t.Errorf("expected two sync events (one at each end of the trace), found %d at %d instead of %d", + len(syncs), syncs[:min(len(syncs), 5)], ends) + } + } + t.Run("SetMinAge", func(t *testing.T) { + t.Skip("issue 63185: flaky test") + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MinAge: time.Millisecond}) + test(t, fr) + }) + t.Run("MaxBytes", func(t *testing.T) { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16}) + test(t, fr) + }) +} + +type flightRecorderTestFunc func(snapshot func()) + +func testFlightRecorder(t *testing.T, fr *trace.FlightRecorder, f flightRecorderTestFunc) []byte { + if trace.IsEnabled() { + t.Skip("cannot run flight recorder tests when tracing is enabled") + } + + // Start the flight recorder. + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error on Start: %v", err) + } + + // Set up snapshot callback. + var buf bytes.Buffer + callback := func() { + n, err := fr.WriteTo(&buf) + if err != nil { + t.Errorf("unexpected failure during flight recording: %v", err) + return + } + if n < 16 { + t.Errorf("expected a trace size of at least 16 bytes, got %d", n) + } + if n != int64(buf.Len()) { + t.Errorf("WriteTo result doesn't match trace size: got %d, want %d", n, int64(buf.Len())) + } + } + + // Call the test function. + f(callback) + + // Stop the flight recorder. + fr.Stop() + + // Get the trace bytes; we don't want to use the Buffer as a Reader directly + // since we may want to consume this data more than once. + traceBytes := buf.Bytes() + + // Parse the trace to make sure it's not broken. + testReader(t, bytes.NewReader(traceBytes), testtrace.ExpectSuccess()) + return traceBytes +} + +func testReader(t *testing.T, tr io.Reader, exp *testtrace.Expectation) { + r, err := inttrace.NewReader(tr) + if err != nil { + if err := exp.Check(err); err != nil { + t.Error(err) + } + return + } + v := testtrace.NewValidator() + v.SkipClockSnapshotChecks() + for { + ev, err := r.ReadEvent() + if err == io.EOF { + break + } + if err != nil { + if err := exp.Check(err); err != nil { + t.Error(err) + } + return + } + if err := v.Event(ev); err != nil { + t.Error(err) + } + } + if err := exp.Check(nil); err != nil { + t.Error(err) + } +} + +func TestTraceAndFlightRecorder(t *testing.T) { + var tBuf, frBuf bytes.Buffer + if err := trace.Start(&tBuf); err != nil { + t.Errorf("unable to start execution tracer: %s", err) + } + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16}) + fr.Start() + fr.WriteTo(&frBuf) + fr.Stop() + trace.Stop() + if tBuf.Len() == 0 || frBuf.Len() == 0 { + t.Errorf("None of these should be equal to zero: %d %d", tBuf.Len(), frBuf.Len()) + } + if tBuf.Len() <= frBuf.Len() { + t.Errorf("trace should be longer than the flight recorder: trace=%d flight record=%d", tBuf.Len(), frBuf.Len()) + } +} diff --git a/src/runtime/trace/recorder.go b/src/runtime/trace/recorder.go new file mode 100644 index 0000000000..78e003e2a5 --- /dev/null +++ b/src/runtime/trace/recorder.go @@ -0,0 +1,144 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "fmt" + "slices" + "time" + _ "unsafe" // added for go linkname usage +) + +// A recorder receives bytes from the runtime tracer, processes it. +type recorder struct { + r *FlightRecorder + + headerReceived bool +} + +func (w *recorder) Write(b []byte) (n int, err error) { + r := w.r + + defer func() { + if err != nil { + // Propagate errors to the flightrecorder. + if r.err == nil { + r.err = err + } + } + }() + + if !w.headerReceived { + if len(b) < len(r.header) { + return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header)) + } + r.header = ([16]byte)(b[:16]) + n += 16 + w.headerReceived = true + } + if len(b) == n { + return 0, nil + } + ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch. + if err != nil { + return len(b) - int(nb) - n, err + } + n += int(nb) + + // Append the batch to the current generation. + if r.active.gen == 0 { + r.active.gen = gen + } + if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) { + r.active.minTime = r.freq.mul(ba.time) + } + r.active.size += len(ba.data) + r.active.batches = append(r.active.batches, ba) + + return len(b), nil +} + +func (w *recorder) endGeneration() { + r := w.r + + // Check if we're entering a new generation. + r.ringMu.Lock() + + // Get the current trace clock time. + now := traceTimeNow(r.freq) + + // Add the current generation to the ring. Make sure we always have at least one + // complete generation by putting the active generation onto the new list, regardless + // of whatever our settings are. + // + // N.B. Let's completely replace the ring here, so that WriteTo can just make a copy + // and not worry about aliasing. This creates allocations, but at a very low rate. + newRing := []rawGeneration{r.active} + size := r.active.size + for i := len(r.ring) - 1; i >= 0; i-- { + // Stop adding older generations if the new ring already exceeds the thresholds. + // This ensures we keep generations that cross a threshold, but not any that lie + // entirely outside it. + if uint64(size) > r.wantSize || now.Sub(newRing[len(newRing)-1].minTime) > r.wantDur { + break + } + size += r.ring[i].size + newRing = append(newRing, r.ring[i]) + } + slices.Reverse(newRing) + r.ring = newRing + r.ringMu.Unlock() + + // Start a new active generation. + r.active = rawGeneration{} +} + +type rawGeneration struct { + gen uint64 + size int + minTime eventTime + batches []batch +} + +func traceTimeNow(freq frequency) eventTime { + return freq.mul(timestamp(runtime_traceClockNow())) +} + +//go:linkname runtime_traceClockNow runtime.traceClockNow +func runtime_traceClockNow() int64 + +// frequency is nanoseconds per timestamp unit. +type frequency float64 + +// mul multiplies an unprocessed to produce a time in nanoseconds. +func (f frequency) mul(t timestamp) eventTime { + return eventTime(float64(t) * float64(f)) +} + +// eventTime is a timestamp in nanoseconds. +// +// It corresponds to the monotonic clock on the platform that the +// trace was taken, and so is possible to correlate with timestamps +// for other traces taken on the same machine using the same clock +// (i.e. no reboots in between). +// +// The actual absolute value of the timestamp is only meaningful in +// relation to other timestamps from the same clock. +// +// BUG: Timestamps coming from traces on Windows platforms are +// only comparable with timestamps from the same trace. Timestamps +// across traces cannot be compared, because the system clock is +// not used as of Go 1.22. +// +// BUG: Traces produced by Go versions 1.21 and earlier cannot be +// compared with timestamps from other traces taken on the same +// machine. This is because the system clock was not used at all +// to collect those timestamps. +type eventTime int64 + +// Sub subtracts t0 from t, returning the duration in nanoseconds. +func (t eventTime) Sub(t0 eventTime) time.Duration { + return time.Duration(int64(t) - int64(t0)) +} diff --git a/src/runtime/trace/subscribe.go b/src/runtime/trace/subscribe.go new file mode 100644 index 0000000000..45320cee36 --- /dev/null +++ b/src/runtime/trace/subscribe.go @@ -0,0 +1,188 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "fmt" + "internal/trace/tracev2" + "io" + "runtime" + "sync" + "sync/atomic" + _ "unsafe" +) + +var tracing traceMultiplexer + +type traceMultiplexer struct { + sync.Mutex + enabled atomic.Bool + subscribers int + + subscribersMu sync.Mutex + traceStartWriter io.Writer + flightRecorder *recorder +} + +func (t *traceMultiplexer) subscribeFlightRecorder(r *recorder) error { + t.Lock() + defer t.Unlock() + + t.subscribersMu.Lock() + if t.flightRecorder != nil { + t.subscribersMu.Unlock() + return fmt.Errorf("flight recorder already enabled") + } + t.flightRecorder = r + t.subscribersMu.Unlock() + + if err := t.addedSubscriber(); err != nil { + t.subscribersMu.Lock() + t.flightRecorder = nil + t.subscribersMu.Unlock() + return err + } + return nil +} + +func (t *traceMultiplexer) unsubscribeFlightRecorder() error { + t.Lock() + defer t.Unlock() + + t.removingSubscriber() + + t.subscribersMu.Lock() + if t.flightRecorder == nil { + t.subscribersMu.Unlock() + return fmt.Errorf("attempt to unsubscribe missing flight recorder") + } + t.flightRecorder = nil + t.subscribersMu.Unlock() + + t.removedSubscriber() + return nil +} + +func (t *traceMultiplexer) subscribeTraceStartWriter(w io.Writer) error { + t.Lock() + defer t.Unlock() + + t.subscribersMu.Lock() + if t.traceStartWriter != nil { + t.subscribersMu.Unlock() + return fmt.Errorf("execution tracer already enabled") + } + t.traceStartWriter = w + t.subscribersMu.Unlock() + + if err := t.addedSubscriber(); err != nil { + t.subscribersMu.Lock() + t.traceStartWriter = nil + t.subscribersMu.Unlock() + return err + } + return nil +} + +func (t *traceMultiplexer) unsubscribeTraceStartWriter() { + t.Lock() + defer t.Unlock() + + t.removingSubscriber() + + t.subscribersMu.Lock() + if t.traceStartWriter == nil { + t.subscribersMu.Unlock() + return + } + t.traceStartWriter = nil + t.subscribersMu.Unlock() + + t.removedSubscriber() + return +} + +func (t *traceMultiplexer) addedSubscriber() error { + if t.enabled.Load() { + // This is necessary for the trace reader goroutine to pick up on the new subscriber. + runtime_traceAdvance(false) + } else { + if err := t.startLocked(); err != nil { + return err + } + } + t.subscribers++ + return nil +} + +func (t *traceMultiplexer) removingSubscriber() { + if t.subscribers == 0 { + return + } + t.subscribers-- + if t.subscribers == 0 { + runtime.StopTrace() + t.enabled.Store(false) + } else { + // This is necessary to avoid missing trace data when the system is under high load. + runtime_traceAdvance(false) + } +} + +func (t *traceMultiplexer) removedSubscriber() { + if t.subscribers > 0 { + // This is necessary for the trace reader goroutine to pick up on the new subscriber. + runtime_traceAdvance(false) + } +} + +func (t *traceMultiplexer) startLocked() error { + if err := runtime.StartTrace(); err != nil { + return err + } + + // Grab the trace reader goroutine's subscribers. + // + // We only update our subscribers if we see an end-of-generation + // signal from the runtime after this, so any new subscriptions + // or unsubscriptions must call traceAdvance to ensure the reader + // goroutine sees an end-of-generation signal. + t.subscribersMu.Lock() + flightRecorder := t.flightRecorder + traceStartWriter := t.traceStartWriter + t.subscribersMu.Unlock() + + go func() { + for { + data := runtime_readTrace() + if data == nil { + break + } + if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration { + if flightRecorder != nil { + flightRecorder.endGeneration() + } + + // Pick up any changes. + t.subscribersMu.Lock() + flightRecorder = t.flightRecorder + traceStartWriter = t.traceStartWriter + t.subscribersMu.Unlock() + } else { + if traceStartWriter != nil { + traceStartWriter.Write(data) + } + if flightRecorder != nil { + flightRecorder.Write(data) + } + } + } + }() + t.enabled.Store(true) + return nil +} + +//go:linkname runtime_readTrace +func runtime_readTrace() (buf []byte) diff --git a/src/runtime/trace/trace.go b/src/runtime/trace/trace.go index 935d222f02..a858d1b101 100644 --- a/src/runtime/trace/trace.go +++ b/src/runtime/trace/trace.go @@ -110,45 +110,17 @@ package trace import ( "io" - "runtime" - "sync" - "sync/atomic" ) // Start enables tracing for the current program. // While tracing, the trace will be buffered and written to w. // Start returns an error if tracing is already enabled. func Start(w io.Writer) error { - tracing.Lock() - defer tracing.Unlock() - - if err := runtime.StartTrace(); err != nil { - return err - } - go func() { - for { - data := runtime.ReadTrace() - if data == nil { - break - } - w.Write(data) - } - }() - tracing.enabled.Store(true) - return nil + return tracing.subscribeTraceStartWriter(w) } // Stop stops the current tracing, if any. // Stop only returns after all the writes for the trace have completed. func Stop() { - tracing.Lock() - defer tracing.Unlock() - tracing.enabled.Store(false) - - runtime.StopTrace() -} - -var tracing struct { - sync.Mutex // gate mutators (Start, Stop) - enabled atomic.Bool + tracing.unsubscribeTraceStartWriter() } diff --git a/src/runtime/traceregion.go b/src/runtime/traceregion.go index 2fb27e6e01..eb19294f1b 100644 --- a/src/runtime/traceregion.go +++ b/src/runtime/traceregion.go @@ -59,37 +59,43 @@ func (a *traceRegionAlloc) alloc(n uintptr) *notInHeap { } // Try to install a new block. - lock(&a.lock) + var x *notInHeap + systemstack(func() { + // Acquire a.lock on the systemstack to avoid stack growth + // and accidentally entering the tracer again. + lock(&a.lock) - // Check block again under the lock. Someone may - // have gotten here first. - block = (*traceRegionAllocBlock)(a.current.Load()) - if block != nil { - r := block.off.Add(n) - if r <= uintptr(len(block.data)) { - unlock(&a.lock) - return (*notInHeap)(unsafe.Pointer(&block.data[r-n])) + // Check block again under the lock. Someone may + // have gotten here first. + block = (*traceRegionAllocBlock)(a.current.Load()) + if block != nil { + r := block.off.Add(n) + if r <= uintptr(len(block.data)) { + unlock(&a.lock) + x = (*notInHeap)(unsafe.Pointer(&block.data[r-n])) + return + } + + // Add the existing block to the full list. + block.next = a.full + a.full = block } - // Add the existing block to the full list. - block.next = a.full - a.full = block - } + // Allocate a new block. + block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc")) + if block == nil { + throw("traceRegion: out of memory") + } - // Allocate a new block. - block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc")) - if block == nil { - throw("traceRegion: out of memory") - } + // Allocate space for our current request, so we always make + // progress. + block.off.Store(n) + x = (*notInHeap)(unsafe.Pointer(&block.data[0])) - // Allocate space for our current request, so we always make - // progress. - block.off.Store(n) - x := (*notInHeap)(unsafe.Pointer(&block.data[0])) - - // Publish the new block. - a.current.Store(unsafe.Pointer(block)) - unlock(&a.lock) + // Publish the new block. + a.current.Store(unsafe.Pointer(block)) + unlock(&a.lock) + }) return x } diff --git a/src/runtime/tracetime.go b/src/runtime/tracetime.go index df52f00ad4..7ffab79bad 100644 --- a/src/runtime/tracetime.go +++ b/src/runtime/tracetime.go @@ -64,6 +64,8 @@ func traceClockNow() traceTime { // traceClockUnitsPerSecond estimates the number of trace clock units per // second that elapse. +// +//go:linkname traceClockUnitsPerSecond runtime/trace.runtime_traceClockUnitsPerSecond func traceClockUnitsPerSecond() uint64 { if osHasLowResClock { // We're using cputicks as our clock, so we need a real estimate.