[dev.fuzz] internal/fuzz: improve cancellation in worker event loops

worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).

workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.

Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.

Also fixed missing newline in log message.

Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
Reviewed-on: https://go-review.googlesource.com/c/go/+/303429
Trust: Jay Conrod <jayconrod@google.com>
Trust: Katie Hockman <katie@golang.org>
Run-TryBot: Jay Conrod <jayconrod@google.com>
Reviewed-by: Katie Hockman <katie@golang.org>
This commit is contained in:
Jay Conrod 2021-03-19 15:11:29 -04:00
parent b178a81e1f
commit 4cde035a72
5 changed files with 293 additions and 176 deletions

View File

@ -0,0 +1,101 @@
# Test that when the coordinator experiences an I/O error communicating
# with a worker, the coordinator stops the worker and reports the error.
# The coordinator should not record a crasher.
#
# We simulate an I/O error in the test by writing garbage to fuzz_out.
# This is unlikely, but possible. It's difficult to simulate interruptions
# due to ^C and EOF errors which are more common. We don't report those.
[short] skip
[!darwin] [!linux] [!windows] skip
# If the I/O error occurs before F.Fuzz is called, the coordinator should
# stop the worker and say that.
! go test -fuzz=FuzzClosePipeBefore -parallel=1
stdout '\s*fuzzing process terminated without fuzzing:'
! stdout 'communicating with fuzzing process'
! exists testdata
# If the I/O error occurs after F.Fuzz is called (unlikely), just exit.
# It's hard to distinguish this case from the worker being interrupted by ^C
# or exiting with status 0 (which it should do when interrupted by ^C).
! go test -fuzz=FuzzClosePipeAfter -parallel=1
stdout '^\s*communicating with fuzzing process: invalid character ''!'' looking for beginning of value$'
! exists testdata
-- go.mod --
module test
go 1.17
-- io_error_test.go --
package io_error
import (
"flag"
"testing"
"time"
)
func isWorker() bool {
f := flag.Lookup("test.fuzzworker")
if f == nil {
return false
}
get, ok := f.Value.(flag.Getter)
if !ok {
return false
}
return get.Get() == interface{}(true)
}
func FuzzClosePipeBefore(f *testing.F) {
if isWorker() {
sendGarbageToCoordinator(f)
time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
}
f.Fuzz(func(*testing.T, []byte) {})
}
func FuzzClosePipeAfter(f *testing.F) {
f.Fuzz(func(t *testing.T, _ []byte) {
if isWorker() {
sendGarbageToCoordinator(t)
time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
}
})
}
-- io_error_windows_test.go --
package io_error
import (
"fmt"
"os"
"testing"
)
func sendGarbageToCoordinator(tb testing.TB) {
v := os.Getenv("GO_TEST_FUZZ_WORKER_HANDLES")
var fuzzInFD, fuzzOutFD uintptr
if _, err := fmt.Sscanf(v, "%x,%x", &fuzzInFD, &fuzzOutFD); err != nil {
tb.Fatalf("parsing GO_TEST_FUZZ_WORKER_HANDLES: %v", err)
}
f := os.NewFile(fuzzOutFD, "fuzz_out")
if _, err := f.Write([]byte("!!")); err != nil {
tb.Fatalf("writing fuzz_out: %v", err)
}
}
-- io_error_notwindows_test.go --
// +build !windows
package io_error
import (
"os"
"testing"
)
func sendGarbageToCoordinator(tb testing.TB) {
f := os.NewFile(4, "fuzz_out")
if _, err := f.Write([]byte("!!")); err != nil {
tb.Fatalf("writing fuzz_out: %v", err)
}
}

View File

@ -86,13 +86,13 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
env := os.Environ() // same as self
c := &coordinator{
doneC: make(chan struct{}),
inputC: make(chan CorpusEntry),
interestingC: make(chan CorpusEntry),
crasherC: make(chan crasherEntry),
}
errC := make(chan error)
// newWorker creates a worker but doesn't start it yet.
newWorker := func() (*worker, error) {
mem, err := sharedMemTempFile(workerSharedMemSize)
if err != nil {
@ -110,17 +110,30 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
}, nil
}
// fuzzCtx is used to stop workers, for example, after finding a crasher.
fuzzCtx, cancelWorkers := context.WithCancel(ctx)
defer cancelWorkers()
doneC := ctx.Done()
// stop is called when a worker encounters a fatal error.
var fuzzErr error
stopping := false
stop := func(err error) {
if fuzzErr == nil || fuzzErr == ctx.Err() {
if err == fuzzCtx.Err() || isInterruptError(err) {
// Suppress cancellation errors and terminations due to SIGINT.
// The messages are not helpful since either the user triggered the error
// (with ^C) or another more helpful message will be printed (a crasher).
err = nil
}
if err != nil && (fuzzErr == nil || fuzzErr == ctx.Err()) {
fuzzErr = err
}
if stopping {
return
}
stopping = true
close(c.doneC)
cancelWorkers()
doneC = nil
}
// Start workers.
@ -135,7 +148,7 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
for i := range workers {
w := workers[i]
go func() {
err := w.runFuzzing()
err := w.coordinate(fuzzCtx)
cleanErr := w.cleanup()
if err == nil {
err = cleanErr
@ -146,17 +159,14 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
// Main event loop.
// Do not return until all workers have terminated. We avoid a deadlock by
// receiving messages from workers even after closing c.doneC.
// receiving messages from workers even after ctx is cancelled.
activeWorkers := len(workers)
i := 0
for {
select {
case <-ctx.Done():
case <-doneC:
// Interrupted, cancelled, or timed out.
// TODO(jayconrod,katiehockman): On Windows, ^C only interrupts 'go test',
// not the coordinator or worker processes. 'go test' will stop running
// actions, but it won't interrupt its child processes. This makes it
// difficult to stop fuzzing on Windows without a timeout.
// stop sets doneC to nil so we don't busy wait here.
stop(ctx.Err())
case crasher := <-c.crasherC:
@ -259,11 +269,6 @@ type crasherEntry struct {
// coordinator holds channels that workers can use to communicate with
// the coordinator.
type coordinator struct {
// doneC is closed to indicate fuzzing is done and workers should stop.
// doneC may be closed due to a time limit expiring or a fatal error in
// a worker.
doneC chan struct{}
// inputC is sent values to fuzz by the coordinator. Any worker may receive
// values from this channel.
inputC chan CorpusEntry

View File

@ -135,6 +135,7 @@ func getWorkerComm() (comm workerComm, err error) {
}
func isInterruptError(err error) bool {
// TODO(jayconrod): implement
// On Windows, we can't tell whether the process was interrupted by the error
// returned by Wait. It looks like an ExitError with status 1.
return false
}

View File

@ -51,10 +51,11 @@ type worker struct {
memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
cmd *exec.Cmd // current worker process
client *workerClient // used to communicate with worker process
waitErr error // last error returned by wait, set before termC is closed.
termC chan struct{} // closed by wait when worker process terminates
cmd *exec.Cmd // current worker process
client *workerClient // used to communicate with worker process
waitErr error // last error returned by wait, set before termC is closed.
interrupted bool // true after stop interrupts a running worker.
termC chan struct{} // closed by wait when worker process terminates
}
// cleanup releases persistent resources associated with the worker.
@ -67,12 +68,12 @@ func (w *worker) cleanup() error {
return mem.Close()
}
// runFuzzing runs the test binary to perform fuzzing.
// coordinate runs the test binary to perform fuzzing.
//
// This function loops until w.coordinator.doneC is closed or some
// fatal error is encountered. It receives inputs from w.coordinator.inputC,
// then passes those on to the worker process.
func (w *worker) runFuzzing() error {
// coordinate loops until ctx is cancelled or a fatal error is encountered. While
// looping, coordinate receives inputs from w.coordinator.inputC, then passes
// those on to the worker process.
func (w *worker) coordinate(ctx context.Context) error {
// Start the process.
if err := w.start(); err != nil {
// We couldn't start the worker process. We can't do anything, and it's
@ -80,125 +81,113 @@ func (w *worker) runFuzzing() error {
return err
}
// inputC is set to w.coordinator.inputC when the worker is able to process
// input. It's nil at other times, so its case won't be selected in the
// event loop below.
var inputC chan CorpusEntry
// A value is sent to fuzzC to tell the worker to prepare to process an input
// by setting inputC.
fuzzC := make(chan struct{}, 1)
// Send the worker a message to make sure it can respond.
// Errors that occur before we get a response likely indicate that
// the worker did not call F.Fuzz or called F.Fail first.
// We don't record crashers for these errors.
pinged := false
go func() {
err := w.client.ping()
if err != nil {
w.stop() // trigger termC case below
return
if err := w.client.ping(ctx); err != nil {
w.stop()
if ctx.Err() != nil {
return ctx.Err()
}
pinged = true
fuzzC <- struct{}{}
}()
if isInterruptError(err) {
// User may have pressed ^C before worker responded.
return nil
}
return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
// TODO(jayconrod,katiehockman): record and return stderr.
}
// Main event loop.
for {
select {
case <-w.coordinator.doneC:
// All workers were told to stop.
case <-ctx.Done():
// Worker was told to stop.
err := w.stop()
if isInterruptError(err) {
// Worker interrupted by SIGINT. This can happen if the worker receives
// SIGINT before installing the signal handler. That's likely if
// TestMain or the fuzz target setup takes a long time.
return nil
if err != nil && !w.interrupted && !isInterruptError(err) {
return err
}
return err
return ctx.Err()
case <-w.termC:
// Worker process terminated unexpectedly.
if !pinged {
w.stop()
return fmt.Errorf("worker terminated without fuzzing")
// TODO(jayconrod,katiehockman): record and return stderr.
// Worker process terminated unexpectedly while waiting for input.
err := w.stop()
if w.interrupted {
panic("worker interrupted after unexpected termination")
}
if isInterruptError(w.waitErr) {
// Worker interrupted by SIGINT. See comment in doneC case.
w.stop()
if err == nil || isInterruptError(err) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal that was not sent by the coordinator.
//
// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
// all processes in the group concurrently, and the worker may see it
// before the coordinator. The worker should exit 0 gracefully (in
// theory).
//
// This condition is probably intended by the user, so suppress
// the error.
return nil
}
if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
w.stop()
return fmt.Errorf("worker exited unexpectedly due to an internal failure")
// TODO(jayconrod,katiehockman): record and return stderr.
if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
// Worker exited with a code indicating F.Fuzz was not called correctly,
// for example, F.Fail was called first.
return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
}
// Worker exited non-zero or was terminated by a non-interrupt signal
// (for example, SIGSEGV).
return fmt.Errorf("fuzzing process terminated unexpectedly: %w", err)
// TODO(jayconrod,katiehockman): record and return stderr.
// Unexpected termination. Inform the coordinator about the crash.
// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
mem := <-w.memMu
value := mem.valueCopy()
w.memMu <- mem
message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr)
crasher := crasherEntry{
CorpusEntry: CorpusEntry{Data: value},
errMsg: message,
}
w.coordinator.crasherC <- crasher
return w.stop()
case input := <-inputC:
case input := <-w.coordinator.inputC:
// Received input from coordinator.
inputC = nil // block new inputs until we finish with this one.
go func() {
args := fuzzArgs{Duration: workerFuzzDuration}
value, resp, err := w.client.fuzz(input.Data, args)
if err != nil {
// Error communicating with worker.
select {
case <-w.termC:
// Worker terminated, perhaps unexpectedly.
// We expect I/O errors due to partially sent or received RPCs,
// so ignore this error.
case <-w.coordinator.doneC:
// Timeout or interruption. Worker may also be interrupted.
// Again, ignore I/O errors.
default:
// TODO(jayconrod): if we get an error here, something failed between
// main and the call to testing.F.Fuzz. The error here won't
// be useful. Collect stderr, clean it up, and return that.
// TODO(jayconrod): we can get EPIPE if w.stop is called concurrently
// and it kills the worker process. Suppress this message in
// that case.
fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
}
// TODO(jayconrod): what happens if testing.F.Fuzz is never called?
// TODO(jayconrod): time out if the test process hangs.
} else if resp.Crashed {
// The worker found a crasher. Inform the coordinator.
crasher := crasherEntry{
CorpusEntry: CorpusEntry{Data: value},
errMsg: resp.Err,
}
w.coordinator.crasherC <- crasher
} else {
// Inform the coordinator that fuzzing found something
// interesting (i.e. new coverage).
if resp.Interesting {
w.coordinator.interestingC <- CorpusEntry{Data: value}
}
// Continue fuzzing.
fuzzC <- struct{}{}
args := fuzzArgs{Duration: workerFuzzDuration}
value, resp, err := w.client.fuzz(ctx, input.Data, args)
if err != nil {
// Error communicating with worker.
w.stop()
if ctx.Err() != nil {
// Timeout or interruption.
return ctx.Err()
}
if w.interrupted {
// Communication error before we stopped the worker.
// Report an error, but don't record a crasher.
return fmt.Errorf("communicating with fuzzing process: %v", err)
}
if w.waitErr == nil || isInterruptError(w.waitErr) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator). See comment in
// termC case above.
//
// Since we expect I/O errors around interrupts, ignore this error.
return nil
}
// TODO(jayconrod,katiehockman): gather statistics.
}()
case <-fuzzC:
// Worker finished fuzzing and nothing new happened.
inputC = w.coordinator.inputC // unblock new inputs
// Unexpected termination. Inform the coordinator about the crash.
// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
mem := <-w.memMu
value := mem.valueCopy()
w.memMu <- mem
message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr)
crasher := crasherEntry{
CorpusEntry: CorpusEntry{Data: value},
errMsg: message,
}
w.coordinator.crasherC <- crasher
return w.waitErr
} else if resp.Crashed {
// The worker found a crasher. Inform the coordinator.
crasher := crasherEntry{
CorpusEntry: CorpusEntry{Data: value},
errMsg: resp.Err,
}
w.coordinator.crasherC <- crasher
} else if resp.Interesting {
// Inform the coordinator that fuzzing found something
// interesting (i.e. new coverage).
w.coordinator.interestingC <- CorpusEntry{Data: value}
}
// TODO(jayconrod,katiehockman): gather statistics.
}
}
}
@ -218,6 +207,7 @@ func (w *worker) start() (err error) {
panic("worker already started")
}
w.waitErr = nil
w.interrupted = false
w.termC = nil
cmd := exec.Command(w.binPath, w.args...)
@ -332,6 +322,7 @@ func (w *worker) stop() error {
case <-t.C:
// Timer fired before worker terminated.
w.interrupted = true
switch sig {
case os.Interrupt:
// Try to stop the worker with SIGINT and wait a little longer.
@ -347,7 +338,7 @@ func (w *worker) stop() error {
case nil:
// Still waiting. Print a message to let the user know why.
fmt.Fprintf(os.Stderr, "go: waiting for fuzz worker to terminate...\n")
fmt.Fprintf(os.Stderr, "go: waiting for fuzzing process to terminate...\n")
}
}
}
@ -446,49 +437,55 @@ type workerServer struct {
// does not return errors from method calls; those are passed through serialized
// responses.
func (ws *workerServer) serve(ctx context.Context) error {
// Stop handling messages when ctx.Done() is closed. This normally happens
// when the worker process receives a SIGINT signal, which on POSIX platforms
// is sent to the process group when ^C is pressed.
//
// Ordinarily, the coordinator process may stop a worker by closing fuzz_in.
// We simulate that and interrupt a blocked read here.
doneC := make(chan struct{})
defer func() { close(doneC) }()
// This goroutine may stay blocked after serve returns because the underlying
// read blocks, even after the file descriptor in this process is closed. The
// pipe must be closed by the client, too.
errC := make(chan error, 1)
go func() {
select {
case <-ctx.Done():
ws.fuzzIn.Close()
case <-doneC:
enc := json.NewEncoder(ws.fuzzOut)
dec := json.NewDecoder(ws.fuzzIn)
for {
if ctx.Err() != nil {
return
}
var c call
if err := dec.Decode(&c); err == io.EOF {
return
} else if err != nil {
errC <- err
return
}
if ctx.Err() != nil {
return
}
var resp interface{}
switch {
case c.Fuzz != nil:
resp = ws.fuzz(ctx, *c.Fuzz)
case c.Ping != nil:
resp = ws.ping(ctx, *c.Ping)
default:
errC <- errors.New("no arguments provided for any call")
return
}
if err := enc.Encode(resp); err != nil {
errC <- err
return
}
}
}()
enc := json.NewEncoder(ws.fuzzOut)
dec := json.NewDecoder(ws.fuzzIn)
for {
var c call
if err := dec.Decode(&c); err != nil {
if ctx.Err() != nil {
return ctx.Err()
} else if err == io.EOF {
return nil
} else {
return err
}
}
var resp interface{}
switch {
case c.Fuzz != nil:
resp = ws.fuzz(ctx, *c.Fuzz)
case c.Ping != nil:
resp = ws.ping(ctx, *c.Ping)
default:
return errors.New("no arguments provided for any call")
}
if err := enc.Encode(resp); err != nil {
return err
}
select {
case <-ctx.Done():
// Stop handling messages when ctx.Done() is closed. This normally happens
// when the worker process receives a SIGINT signal, which on POSIX platforms
// is sent to the process group when ^C is pressed.
return ctx.Err()
case err := <-errC:
return err
}
}
@ -691,7 +688,7 @@ func (wc *workerClient) Close() error {
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
@ -703,11 +700,7 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re
wc.memMu <- mem
c := call{Fuzz: &args}
if err := wc.enc.Encode(c); err != nil {
return nil, fuzzResponse{}, err
}
err = wc.dec.Decode(&resp)
err = wc.call(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
@ -719,14 +712,31 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re
}
// ping tells the worker to call the ping method. See workerServer.ping.
func (wc *workerClient) ping() error {
func (wc *workerClient) ping(ctx context.Context) error {
c := call{Ping: &pingArgs{}}
if err := wc.enc.Encode(c); err != nil {
return err
}
var resp pingResponse
if err := wc.dec.Decode(&resp); err != nil {
return wc.call(ctx, c, &resp)
}
// call sends an RPC from the coordinator to the worker process and waits for
// the response. The call may be cancelled with ctx.
func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
// This goroutine may stay blocked after call returns because the underlying
// read blocks, even after the file descriptor in this process is closed. The
// pipe must be closed by the server, too.
errC := make(chan error, 1)
go func() {
if err := wc.enc.Encode(c); err != nil {
errC <- err
return
}
errC <- wc.dec.Decode(resp)
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
}
return nil
}

View File

@ -362,7 +362,7 @@ func (f *F) Fuzz(ff interface{}) {
if err != nil {
f.result = FuzzResult{Error: err}
f.Fail()
fmt.Fprintf(f.w, "%v", err)
fmt.Fprintf(f.w, "%v\n", err)
if crashErr, ok := err.(fuzzCrashError); ok {
crashName := crashErr.CrashName()
fmt.Fprintf(f.w, "Crash written to %s\n", filepath.Join("testdata/corpus", f.name, crashName))