diff --git a/src/cmd/go/testdata/script/test_fuzz_io_error.txt b/src/cmd/go/testdata/script/test_fuzz_io_error.txt new file mode 100644 index 0000000000..4c7ab4c152 --- /dev/null +++ b/src/cmd/go/testdata/script/test_fuzz_io_error.txt @@ -0,0 +1,101 @@ +# Test that when the coordinator experiences an I/O error communicating +# with a worker, the coordinator stops the worker and reports the error. +# The coordinator should not record a crasher. +# +# We simulate an I/O error in the test by writing garbage to fuzz_out. +# This is unlikely, but possible. It's difficult to simulate interruptions +# due to ^C and EOF errors which are more common. We don't report those. +[short] skip +[!darwin] [!linux] [!windows] skip + +# If the I/O error occurs before F.Fuzz is called, the coordinator should +# stop the worker and say that. +! go test -fuzz=FuzzClosePipeBefore -parallel=1 +stdout '\s*fuzzing process terminated without fuzzing:' +! stdout 'communicating with fuzzing process' +! exists testdata + +# If the I/O error occurs after F.Fuzz is called (unlikely), just exit. +# It's hard to distinguish this case from the worker being interrupted by ^C +# or exiting with status 0 (which it should do when interrupted by ^C). +! go test -fuzz=FuzzClosePipeAfter -parallel=1 +stdout '^\s*communicating with fuzzing process: invalid character ''!'' looking for beginning of value$' +! exists testdata + +-- go.mod -- +module test + +go 1.17 +-- io_error_test.go -- +package io_error + +import ( + "flag" + "testing" + "time" +) + +func isWorker() bool { + f := flag.Lookup("test.fuzzworker") + if f == nil { + return false + } + get, ok := f.Value.(flag.Getter) + if !ok { + return false + } + return get.Get() == interface{}(true) +} + +func FuzzClosePipeBefore(f *testing.F) { + if isWorker() { + sendGarbageToCoordinator(f) + time.Sleep(3600 * time.Second) // pause until coordinator terminates the process + } + f.Fuzz(func(*testing.T, []byte) {}) +} + +func FuzzClosePipeAfter(f *testing.F) { + f.Fuzz(func(t *testing.T, _ []byte) { + if isWorker() { + sendGarbageToCoordinator(t) + time.Sleep(3600 * time.Second) // pause until coordinator terminates the process + } + }) +} +-- io_error_windows_test.go -- +package io_error + +import ( + "fmt" + "os" + "testing" +) + +func sendGarbageToCoordinator(tb testing.TB) { + v := os.Getenv("GO_TEST_FUZZ_WORKER_HANDLES") + var fuzzInFD, fuzzOutFD uintptr + if _, err := fmt.Sscanf(v, "%x,%x", &fuzzInFD, &fuzzOutFD); err != nil { + tb.Fatalf("parsing GO_TEST_FUZZ_WORKER_HANDLES: %v", err) + } + f := os.NewFile(fuzzOutFD, "fuzz_out") + if _, err := f.Write([]byte("!!")); err != nil { + tb.Fatalf("writing fuzz_out: %v", err) + } +} +-- io_error_notwindows_test.go -- +// +build !windows + +package io_error + +import ( + "os" + "testing" +) + +func sendGarbageToCoordinator(tb testing.TB) { + f := os.NewFile(4, "fuzz_out") + if _, err := f.Write([]byte("!!")); err != nil { + tb.Fatalf("writing fuzz_out: %v", err) + } +} diff --git a/src/internal/fuzz/fuzz.go b/src/internal/fuzz/fuzz.go index 293cb48d4d..5fa265f8c5 100644 --- a/src/internal/fuzz/fuzz.go +++ b/src/internal/fuzz/fuzz.go @@ -86,13 +86,13 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int, env := os.Environ() // same as self c := &coordinator{ - doneC: make(chan struct{}), inputC: make(chan CorpusEntry), interestingC: make(chan CorpusEntry), crasherC: make(chan crasherEntry), } errC := make(chan error) + // newWorker creates a worker but doesn't start it yet. newWorker := func() (*worker, error) { mem, err := sharedMemTempFile(workerSharedMemSize) if err != nil { @@ -110,17 +110,30 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int, }, nil } + // fuzzCtx is used to stop workers, for example, after finding a crasher. + fuzzCtx, cancelWorkers := context.WithCancel(ctx) + defer cancelWorkers() + doneC := ctx.Done() + + // stop is called when a worker encounters a fatal error. var fuzzErr error stopping := false stop := func(err error) { - if fuzzErr == nil || fuzzErr == ctx.Err() { + if err == fuzzCtx.Err() || isInterruptError(err) { + // Suppress cancellation errors and terminations due to SIGINT. + // The messages are not helpful since either the user triggered the error + // (with ^C) or another more helpful message will be printed (a crasher). + err = nil + } + if err != nil && (fuzzErr == nil || fuzzErr == ctx.Err()) { fuzzErr = err } if stopping { return } stopping = true - close(c.doneC) + cancelWorkers() + doneC = nil } // Start workers. @@ -135,7 +148,7 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int, for i := range workers { w := workers[i] go func() { - err := w.runFuzzing() + err := w.coordinate(fuzzCtx) cleanErr := w.cleanup() if err == nil { err = cleanErr @@ -146,17 +159,14 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int, // Main event loop. // Do not return until all workers have terminated. We avoid a deadlock by - // receiving messages from workers even after closing c.doneC. + // receiving messages from workers even after ctx is cancelled. activeWorkers := len(workers) i := 0 for { select { - case <-ctx.Done(): + case <-doneC: // Interrupted, cancelled, or timed out. - // TODO(jayconrod,katiehockman): On Windows, ^C only interrupts 'go test', - // not the coordinator or worker processes. 'go test' will stop running - // actions, but it won't interrupt its child processes. This makes it - // difficult to stop fuzzing on Windows without a timeout. + // stop sets doneC to nil so we don't busy wait here. stop(ctx.Err()) case crasher := <-c.crasherC: @@ -259,11 +269,6 @@ type crasherEntry struct { // coordinator holds channels that workers can use to communicate with // the coordinator. type coordinator struct { - // doneC is closed to indicate fuzzing is done and workers should stop. - // doneC may be closed due to a time limit expiring or a fatal error in - // a worker. - doneC chan struct{} - // inputC is sent values to fuzz by the coordinator. Any worker may receive // values from this channel. inputC chan CorpusEntry diff --git a/src/internal/fuzz/sys_windows.go b/src/internal/fuzz/sys_windows.go index e1734af53c..de6af81d94 100644 --- a/src/internal/fuzz/sys_windows.go +++ b/src/internal/fuzz/sys_windows.go @@ -135,6 +135,7 @@ func getWorkerComm() (comm workerComm, err error) { } func isInterruptError(err error) bool { - // TODO(jayconrod): implement + // On Windows, we can't tell whether the process was interrupted by the error + // returned by Wait. It looks like an ExitError with status 1. return false } diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go index 506a485f24..2c4cc1f82b 100644 --- a/src/internal/fuzz/worker.go +++ b/src/internal/fuzz/worker.go @@ -51,10 +51,11 @@ type worker struct { memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes. - cmd *exec.Cmd // current worker process - client *workerClient // used to communicate with worker process - waitErr error // last error returned by wait, set before termC is closed. - termC chan struct{} // closed by wait when worker process terminates + cmd *exec.Cmd // current worker process + client *workerClient // used to communicate with worker process + waitErr error // last error returned by wait, set before termC is closed. + interrupted bool // true after stop interrupts a running worker. + termC chan struct{} // closed by wait when worker process terminates } // cleanup releases persistent resources associated with the worker. @@ -67,12 +68,12 @@ func (w *worker) cleanup() error { return mem.Close() } -// runFuzzing runs the test binary to perform fuzzing. +// coordinate runs the test binary to perform fuzzing. // -// This function loops until w.coordinator.doneC is closed or some -// fatal error is encountered. It receives inputs from w.coordinator.inputC, -// then passes those on to the worker process. -func (w *worker) runFuzzing() error { +// coordinate loops until ctx is cancelled or a fatal error is encountered. While +// looping, coordinate receives inputs from w.coordinator.inputC, then passes +// those on to the worker process. +func (w *worker) coordinate(ctx context.Context) error { // Start the process. if err := w.start(); err != nil { // We couldn't start the worker process. We can't do anything, and it's @@ -80,125 +81,113 @@ func (w *worker) runFuzzing() error { return err } - // inputC is set to w.coordinator.inputC when the worker is able to process - // input. It's nil at other times, so its case won't be selected in the - // event loop below. - var inputC chan CorpusEntry - - // A value is sent to fuzzC to tell the worker to prepare to process an input - // by setting inputC. - fuzzC := make(chan struct{}, 1) - // Send the worker a message to make sure it can respond. // Errors that occur before we get a response likely indicate that // the worker did not call F.Fuzz or called F.Fail first. // We don't record crashers for these errors. - pinged := false - go func() { - err := w.client.ping() - if err != nil { - w.stop() // trigger termC case below - return + if err := w.client.ping(ctx); err != nil { + w.stop() + if ctx.Err() != nil { + return ctx.Err() } - pinged = true - fuzzC <- struct{}{} - }() + if isInterruptError(err) { + // User may have pressed ^C before worker responded. + return nil + } + return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err) + // TODO(jayconrod,katiehockman): record and return stderr. + } // Main event loop. for { select { - case <-w.coordinator.doneC: - // All workers were told to stop. + case <-ctx.Done(): + // Worker was told to stop. err := w.stop() - if isInterruptError(err) { - // Worker interrupted by SIGINT. This can happen if the worker receives - // SIGINT before installing the signal handler. That's likely if - // TestMain or the fuzz target setup takes a long time. - return nil + if err != nil && !w.interrupted && !isInterruptError(err) { + return err } - return err + return ctx.Err() case <-w.termC: - // Worker process terminated unexpectedly. - if !pinged { - w.stop() - return fmt.Errorf("worker terminated without fuzzing") - // TODO(jayconrod,katiehockman): record and return stderr. + // Worker process terminated unexpectedly while waiting for input. + err := w.stop() + if w.interrupted { + panic("worker interrupted after unexpected termination") } - if isInterruptError(w.waitErr) { - // Worker interrupted by SIGINT. See comment in doneC case. - w.stop() + if err == nil || isInterruptError(err) { + // Worker stopped, either by exiting with status 0 or after being + // interrupted with a signal that was not sent by the coordinator. + // + // When the user presses ^C, on POSIX platforms, SIGINT is delivered to + // all processes in the group concurrently, and the worker may see it + // before the coordinator. The worker should exit 0 gracefully (in + // theory). + // + // This condition is probably intended by the user, so suppress + // the error. return nil } - if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode { - w.stop() - return fmt.Errorf("worker exited unexpectedly due to an internal failure") - // TODO(jayconrod,katiehockman): record and return stderr. + if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode { + // Worker exited with a code indicating F.Fuzz was not called correctly, + // for example, F.Fail was called first. + return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err) } + // Worker exited non-zero or was terminated by a non-interrupt signal + // (for example, SIGSEGV). + return fmt.Errorf("fuzzing process terminated unexpectedly: %w", err) + // TODO(jayconrod,katiehockman): record and return stderr. - // Unexpected termination. Inform the coordinator about the crash. - // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker. - mem := <-w.memMu - value := mem.valueCopy() - w.memMu <- mem - message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr) - crasher := crasherEntry{ - CorpusEntry: CorpusEntry{Data: value}, - errMsg: message, - } - w.coordinator.crasherC <- crasher - return w.stop() - - case input := <-inputC: + case input := <-w.coordinator.inputC: // Received input from coordinator. - inputC = nil // block new inputs until we finish with this one. - go func() { - args := fuzzArgs{Duration: workerFuzzDuration} - value, resp, err := w.client.fuzz(input.Data, args) - if err != nil { - // Error communicating with worker. - select { - case <-w.termC: - // Worker terminated, perhaps unexpectedly. - // We expect I/O errors due to partially sent or received RPCs, - // so ignore this error. - case <-w.coordinator.doneC: - // Timeout or interruption. Worker may also be interrupted. - // Again, ignore I/O errors. - default: - // TODO(jayconrod): if we get an error here, something failed between - // main and the call to testing.F.Fuzz. The error here won't - // be useful. Collect stderr, clean it up, and return that. - // TODO(jayconrod): we can get EPIPE if w.stop is called concurrently - // and it kills the worker process. Suppress this message in - // that case. - fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err) - } - // TODO(jayconrod): what happens if testing.F.Fuzz is never called? - // TODO(jayconrod): time out if the test process hangs. - } else if resp.Crashed { - // The worker found a crasher. Inform the coordinator. - crasher := crasherEntry{ - CorpusEntry: CorpusEntry{Data: value}, - errMsg: resp.Err, - } - w.coordinator.crasherC <- crasher - } else { - // Inform the coordinator that fuzzing found something - // interesting (i.e. new coverage). - if resp.Interesting { - w.coordinator.interestingC <- CorpusEntry{Data: value} - } - - // Continue fuzzing. - fuzzC <- struct{}{} + args := fuzzArgs{Duration: workerFuzzDuration} + value, resp, err := w.client.fuzz(ctx, input.Data, args) + if err != nil { + // Error communicating with worker. + w.stop() + if ctx.Err() != nil { + // Timeout or interruption. + return ctx.Err() + } + if w.interrupted { + // Communication error before we stopped the worker. + // Report an error, but don't record a crasher. + return fmt.Errorf("communicating with fuzzing process: %v", err) + } + if w.waitErr == nil || isInterruptError(w.waitErr) { + // Worker stopped, either by exiting with status 0 or after being + // interrupted with a signal (not sent by coordinator). See comment in + // termC case above. + // + // Since we expect I/O errors around interrupts, ignore this error. + return nil } - // TODO(jayconrod,katiehockman): gather statistics. - }() - case <-fuzzC: - // Worker finished fuzzing and nothing new happened. - inputC = w.coordinator.inputC // unblock new inputs + // Unexpected termination. Inform the coordinator about the crash. + // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker. + mem := <-w.memMu + value := mem.valueCopy() + w.memMu <- mem + message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr) + crasher := crasherEntry{ + CorpusEntry: CorpusEntry{Data: value}, + errMsg: message, + } + w.coordinator.crasherC <- crasher + return w.waitErr + } else if resp.Crashed { + // The worker found a crasher. Inform the coordinator. + crasher := crasherEntry{ + CorpusEntry: CorpusEntry{Data: value}, + errMsg: resp.Err, + } + w.coordinator.crasherC <- crasher + } else if resp.Interesting { + // Inform the coordinator that fuzzing found something + // interesting (i.e. new coverage). + w.coordinator.interestingC <- CorpusEntry{Data: value} + } + // TODO(jayconrod,katiehockman): gather statistics. } } } @@ -218,6 +207,7 @@ func (w *worker) start() (err error) { panic("worker already started") } w.waitErr = nil + w.interrupted = false w.termC = nil cmd := exec.Command(w.binPath, w.args...) @@ -332,6 +322,7 @@ func (w *worker) stop() error { case <-t.C: // Timer fired before worker terminated. + w.interrupted = true switch sig { case os.Interrupt: // Try to stop the worker with SIGINT and wait a little longer. @@ -347,7 +338,7 @@ func (w *worker) stop() error { case nil: // Still waiting. Print a message to let the user know why. - fmt.Fprintf(os.Stderr, "go: waiting for fuzz worker to terminate...\n") + fmt.Fprintf(os.Stderr, "go: waiting for fuzzing process to terminate...\n") } } } @@ -446,49 +437,55 @@ type workerServer struct { // does not return errors from method calls; those are passed through serialized // responses. func (ws *workerServer) serve(ctx context.Context) error { - // Stop handling messages when ctx.Done() is closed. This normally happens - // when the worker process receives a SIGINT signal, which on POSIX platforms - // is sent to the process group when ^C is pressed. - // - // Ordinarily, the coordinator process may stop a worker by closing fuzz_in. - // We simulate that and interrupt a blocked read here. - doneC := make(chan struct{}) - defer func() { close(doneC) }() + // This goroutine may stay blocked after serve returns because the underlying + // read blocks, even after the file descriptor in this process is closed. The + // pipe must be closed by the client, too. + errC := make(chan error, 1) go func() { - select { - case <-ctx.Done(): - ws.fuzzIn.Close() - case <-doneC: + enc := json.NewEncoder(ws.fuzzOut) + dec := json.NewDecoder(ws.fuzzIn) + for { + if ctx.Err() != nil { + return + } + + var c call + if err := dec.Decode(&c); err == io.EOF { + return + } else if err != nil { + errC <- err + return + } + if ctx.Err() != nil { + return + } + + var resp interface{} + switch { + case c.Fuzz != nil: + resp = ws.fuzz(ctx, *c.Fuzz) + case c.Ping != nil: + resp = ws.ping(ctx, *c.Ping) + default: + errC <- errors.New("no arguments provided for any call") + return + } + + if err := enc.Encode(resp); err != nil { + errC <- err + return + } } }() - enc := json.NewEncoder(ws.fuzzOut) - dec := json.NewDecoder(ws.fuzzIn) - for { - var c call - if err := dec.Decode(&c); err != nil { - if ctx.Err() != nil { - return ctx.Err() - } else if err == io.EOF { - return nil - } else { - return err - } - } - - var resp interface{} - switch { - case c.Fuzz != nil: - resp = ws.fuzz(ctx, *c.Fuzz) - case c.Ping != nil: - resp = ws.ping(ctx, *c.Ping) - default: - return errors.New("no arguments provided for any call") - } - - if err := enc.Encode(resp); err != nil { - return err - } + select { + case <-ctx.Done(): + // Stop handling messages when ctx.Done() is closed. This normally happens + // when the worker process receives a SIGINT signal, which on POSIX platforms + // is sent to the process group when ^C is pressed. + return ctx.Err() + case err := <-errC: + return err } } @@ -691,7 +688,7 @@ func (wc *workerClient) Close() error { var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped") // fuzz tells the worker to call the fuzz method. See workerServer.fuzz. -func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) { +func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) { wc.mu.Lock() defer wc.mu.Unlock() @@ -703,11 +700,7 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re wc.memMu <- mem c := call{Fuzz: &args} - if err := wc.enc.Encode(c); err != nil { - return nil, fuzzResponse{}, err - } - err = wc.dec.Decode(&resp) - + err = wc.call(ctx, c, &resp) mem, ok = <-wc.memMu if !ok { return nil, fuzzResponse{}, errSharedMemClosed @@ -719,14 +712,31 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re } // ping tells the worker to call the ping method. See workerServer.ping. -func (wc *workerClient) ping() error { +func (wc *workerClient) ping(ctx context.Context) error { c := call{Ping: &pingArgs{}} - if err := wc.enc.Encode(c); err != nil { - return err - } var resp pingResponse - if err := wc.dec.Decode(&resp); err != nil { + return wc.call(ctx, c, &resp) +} + +// call sends an RPC from the coordinator to the worker process and waits for +// the response. The call may be cancelled with ctx. +func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) { + // This goroutine may stay blocked after call returns because the underlying + // read blocks, even after the file descriptor in this process is closed. The + // pipe must be closed by the server, too. + errC := make(chan error, 1) + go func() { + if err := wc.enc.Encode(c); err != nil { + errC <- err + return + } + errC <- wc.dec.Decode(resp) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: return err } - return nil } diff --git a/src/testing/fuzz.go b/src/testing/fuzz.go index 2a0754fdd7..73ac59cfb4 100644 --- a/src/testing/fuzz.go +++ b/src/testing/fuzz.go @@ -362,7 +362,7 @@ func (f *F) Fuzz(ff interface{}) { if err != nil { f.result = FuzzResult{Error: err} f.Fail() - fmt.Fprintf(f.w, "%v", err) + fmt.Fprintf(f.w, "%v\n", err) if crashErr, ok := err.(fuzzCrashError); ok { crashName := crashErr.CrashName() fmt.Fprintf(f.w, "Crash written to %s\n", filepath.Join("testdata/corpus", f.name, crashName))