diff --git a/src/runtime/trace/subscribe.go b/src/runtime/trace/subscribe.go index 45320cee36..7e22b6abdb 100644 --- a/src/runtime/trace/subscribe.go +++ b/src/runtime/trace/subscribe.go @@ -155,6 +155,14 @@ func (t *traceMultiplexer) startLocked() error { t.subscribersMu.Unlock() go func() { + header := runtime_readTrace() + if traceStartWriter != nil { + traceStartWriter.Write(header) + } + if flightRecorder != nil { + flightRecorder.Write(header) + } + for { data := runtime_readTrace() if data == nil { @@ -167,9 +175,18 @@ func (t *traceMultiplexer) startLocked() error { // Pick up any changes. t.subscribersMu.Lock() + frIsNew := flightRecorder != t.flightRecorder && t.flightRecorder != nil + trIsNew := traceStartWriter != t.traceStartWriter && t.traceStartWriter != nil flightRecorder = t.flightRecorder traceStartWriter = t.traceStartWriter t.subscribersMu.Unlock() + + if trIsNew { + traceStartWriter.Write(header) + } + if frIsNew { + flightRecorder.Write(header) + } } else { if traceStartWriter != nil { traceStartWriter.Write(data) diff --git a/src/runtime/trace/subscribe_test.go b/src/runtime/trace/subscribe_test.go new file mode 100644 index 0000000000..0e6c57cbc6 --- /dev/null +++ b/src/runtime/trace/subscribe_test.go @@ -0,0 +1,153 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace_test + +import ( + "bytes" + inttrace "internal/trace" + "internal/trace/testtrace" + "io" + "runtime" + "runtime/trace" + "slices" + "testing" +) + +func TestSubscribers(t *testing.T) { + validate := func(t *testing.T, source string, tr io.Reader) { + // Prepare to read the trace snapshot. + r, err := inttrace.NewReader(tr) + if err != nil { + t.Fatalf("unexpected error creating trace reader for %s: %v", source, err) + return + } + + v := testtrace.NewValidator() + // These platforms can't guarantee a monotonically increasing clock reading in a short trace. + if runtime.GOOS == "windows" || runtime.GOARCH == "wasm" { + v.SkipClockSnapshotChecks() + } + // Make sure there are Sync events: at the start and end. + var syncs []int + evs := 0 + for { + ev, err := r.ReadEvent() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("unexpected error reading trace for %s: %v", source, err) + } + if err := v.Event(ev); err != nil { + t.Fatalf("event validation failed: %s", err) + } + if ev.Kind() == inttrace.EventSync { + syncs = append(syncs, evs) + } + evs++ + } + ends := []int{syncs[0], syncs[len(syncs)-1]} + if wantEnds := []int{0, evs - 1}; !slices.Equal(wantEnds, ends) { + t.Errorf("expected a sync event at each end of the trace, found sync events at %d instead of %d for %s", + ends, wantEnds, source) + } + } + + validateTraces := func(t *testing.T, tReader, frReader io.Reader) { + validate(t, "tracer", tReader) + validate(t, "flightRecorder", frReader) + } + startFlightRecorder := func(t *testing.T) *trace.FlightRecorder { + fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{}) + if err := fr.Start(); err != nil { + t.Fatalf("unexpected error creating flight recorder: %v", err) + } + return fr + } + startTrace := func(t *testing.T, w io.Writer) { + if err := trace.Start(w); err != nil { + t.Fatalf("unexpected error starting flight recorder: %v", err) + } + } + stopFlightRecorder := func(t *testing.T, fr *trace.FlightRecorder, w io.Writer) { + if _, err := fr.WriteTo(w); err != nil { + t.Fatalf("unexpected error writing trace from flight recorder: %v", err) + } + fr.Stop() + } + stopTrace := func() { + trace.Stop() + } + t.Run("start(flight)_start(trace)_stop(trace)_stop(flight)", func(t *testing.T) { + if trace.IsEnabled() { + t.Skip("skipping because trace is already enabled") + } + frBuf := new(bytes.Buffer) + tBuf := new(bytes.Buffer) + fr := startFlightRecorder(t) + defer fr.Stop() + startTrace(t, tBuf) + defer trace.Stop() + stopTrace() + stopFlightRecorder(t, fr, frBuf) + validateTraces(t, tBuf, frBuf) + }) + t.Run("start(trace)_start(flight)_stop(trace)_stop(flight)", func(t *testing.T) { + if trace.IsEnabled() { + t.Skip("skipping because trace is already enabled") + } + frBuf := new(bytes.Buffer) + tBuf := new(bytes.Buffer) + startTrace(t, tBuf) + defer trace.Stop() + fr := startFlightRecorder(t) + defer fr.Stop() + stopTrace() + stopFlightRecorder(t, fr, frBuf) + validateTraces(t, tBuf, frBuf) + }) + t.Run("start(flight)_stop(flight)_start(trace)_stop(trace)", func(t *testing.T) { + if trace.IsEnabled() { + t.Skip("skipping because trace is already enabled") + } + frBuf := new(bytes.Buffer) + tBuf := new(bytes.Buffer) + fr := startFlightRecorder(t) + defer fr.Stop() + stopFlightRecorder(t, fr, frBuf) + startTrace(t, tBuf) + defer trace.Stop() + stopTrace() + validateTraces(t, tBuf, frBuf) + }) + t.Run("start(flight)_stop(flight)_start(trace)_stop(trace)", func(t *testing.T) { + if trace.IsEnabled() { + t.Skip("skipping because trace is already enabled") + } + frBuf := new(bytes.Buffer) + tBuf := new(bytes.Buffer) + fr := startFlightRecorder(t) + defer fr.Stop() + stopFlightRecorder(t, fr, frBuf) + startTrace(t, tBuf) + defer trace.Stop() + stopTrace() + validateTraces(t, tBuf, frBuf) + }) + t.Run("start(flight)_start(trace)_stop(flight)_stop(trace)", func(t *testing.T) { + if trace.IsEnabled() { + t.Skip("skipping because trace is already enabled") + } + frBuf := new(bytes.Buffer) + tBuf := new(bytes.Buffer) + fr := startFlightRecorder(t) + defer fr.Stop() + startTrace(t, tBuf) + defer trace.Stop() + stopFlightRecorder(t, fr, frBuf) + stopTrace() + validateTraces(t, tBuf, frBuf) + }) +}