[dev.fuzz] internal/fuzz: refactor in preparation for GOFUZZCACHE

Several small changes, most related to GOFUZZCACHE.

* Use separate channels to send crashers and interesting values to the
  coordinator.
* Add a new type, crasherEntry, which is a corpusEntry with an
  error message.
* Workers now send fatal errors to the coordinator via errC instead of
  returning or closing doneC.
* In CoordinateFuzzing, defer code that closes doneC and waits for
  workers to stop. This is the only place where doneC is closed.
* In workerServer and workerClient, always pass input values through
  shared memory instead of RPC messages or arguments to avoid
  confusion.
* Rename sharedMem.value to valueRef and add valueCopy to make it
  clearer whether a reference or copy is needed.
* mutate now operates on shared memory directly.
* mutate will not panic on empty input.

Change-Id: I6e57354875508f0ac4483ed2728f3ba18dc938c4
Reviewed-on: https://go-review.googlesource.com/c/go/+/275533
Run-TryBot: Jay Conrod <jayconrod@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Jay Conrod <jayconrod@google.com>
Reviewed-by: Katie Hockman <katie@golang.org>
This commit is contained in:
Jay Conrod 2020-12-04 18:07:20 -05:00
parent c3dc225988
commit 97df3ba792
4 changed files with 130 additions and 98 deletions

View File

@ -35,7 +35,7 @@ import (
//
// If a crash occurs, the function will return an error containing information
// about the crash, which can be reported to the user.
func CoordinateFuzzing(parallel int, seed [][]byte, crashDir string) error {
func CoordinateFuzzing(parallel int, seed [][]byte, crashDir string) (err error) {
if parallel == 0 {
parallel = runtime.GOMAXPROCS(0)
}
@ -69,7 +69,9 @@ func CoordinateFuzzing(parallel int, seed [][]byte, crashDir string) error {
c := &coordinator{
doneC: make(chan struct{}),
inputC: make(chan corpusEntry),
interestingC: make(chan fuzzResponse),
interestingC: make(chan corpusEntry),
crasherC: make(chan crasherEntry),
errC: make(chan error),
}
newWorker := func() (*worker, error) {
@ -110,58 +112,56 @@ func CoordinateFuzzing(parallel int, seed [][]byte, crashDir string) error {
}(i)
}
// Before returning, signal workers to stop, wait for them to actually stop,
// and gather any errors they encountered.
defer func() {
close(c.doneC)
wg.Wait()
if err == nil {
for _, err = range workerErrs {
if err != nil {
// Return the first error found.
return
}
}
}
}()
// Main event loop.
stopC := time.After(duration)
i := 0
for {
select {
// TODO(jayconrod): handle interruptions like SIGINT.
// TODO(jayconrod,katiehockman): receive crashers and new corpus values
// from workers.
case <-stopC:
// Time's up.
close(c.doneC)
case <-c.doneC:
// Wait for workers to stop and return.
wg.Wait()
for _, err := range workerErrs {
if err != nil {
return err
}
}
return nil
case resp := <-c.interestingC:
// Some interesting input arrived from a worker.
if resp.Err != "" {
// This is a crasher, which should be written to testdata and
// reported to the user.
fileName, err := writeToCorpus(resp.Value, crashDir)
if err == nil {
err = fmt.Errorf(" Crash written to: %s\n%s", fileName, resp.Err)
}
// TODO(jayconrod,katiehockman): if -keepfuzzing, don't stop all
// of the workers, but still report to the user.
// Stop the rest of the workers and wait until they have
// stopped before returning this error.
close(c.doneC)
wg.Wait()
return err
} else if len(resp.Value) > 0 {
// This is not a crasher, but something interesting that should
// be added to the on disk corpus and prioritized for future
// workers to fuzz.
corpus.entries = append(corpus.entries, corpusEntry{b: resp.Value})
// TODO(jayconrod, katiehockman): Add this to the on disk corpus
// TODO(jayconrod, katiehockman): Prioritize fuzzing these values which expanded coverage
case crasher := <-c.crasherC:
// A worker found a crasher. Write it to testdata and return it.
fileName, err := writeToCorpus(crasher.b, crashDir)
if err == nil {
err = fmt.Errorf(" Crash written to %s\n%s", fileName, crasher.errMsg)
}
// TODO(jayconrod,katiehockman): if -keepfuzzing, report the error to
// the user and restart the crashed worker.
return err
case entry := <-c.interestingC:
// Some interesting input arrived from a worker.
// This is not a crasher, but something interesting that should
// be added to the on disk corpus and prioritized for future
// workers to fuzz.
// TODO(jayconrod, katiehockman): Prioritize fuzzing these values which expanded coverage
corpus.entries = append(corpus.entries, entry)
case err := <-c.errC:
// A worker encountered a fatal error.
return err
case c.inputC <- corpus.entries[i]:
// Sent the next input to any worker.
// Send the next input to any worker.
// TODO(jayconrod,katiehockman): need a scheduling algorithm that chooses
// which corpus value to send next (or generates something new).
i = (i + 1) % len(corpus.entries)
@ -183,6 +183,11 @@ type corpusEntry struct {
b []byte
}
type crasherEntry struct {
corpusEntry
errMsg string
}
// coordinator holds channels that workers can use to communicate with
// the coordinator.
type coordinator struct {
@ -196,9 +201,18 @@ type coordinator struct {
inputC chan corpusEntry
// interestingC is sent interesting values by the worker, which is received
// by the coordinator. The interesting value could be a crash or some
// value that increased coverage.
interestingC chan fuzzResponse
// by the coordinator. Values are usually interesting because they
// increase coverage.
interestingC chan corpusEntry
// crasherC is sent values that crashed the code being fuzzed. These values
// should be saved in the corpus, and we may want to stop fuzzing after
// receiving one.
crasherC chan crasherEntry
// errC is sent internal errors encountered by workers. When the coordinator
// receives an error, it closes doneC and returns.
errC chan error
}
// ReadCorpus reads the corpus from the testdata directory in this target's

View File

@ -17,11 +17,9 @@ import (
//
// When fuzzing, the coordinator creates a sharedMem from a temporary file for
// each worker. This buffer is used to pass values to fuzz between processes.
//
// Care must be taken to synchronize access to shared memory across processes.
// For example, workerClient and workerServer use an RPC protocol over pipes:
// workerServer may access shared memory when handling an RPC; workerClient may
// access shared memory at other times.
// Care must be taken to manage access to shared memory across processes;
// sharedMem provides no synchronization on its own. See workerComm for an
// explanation.
type sharedMem struct {
// f is the file mapped into memory.
f *os.File
@ -81,19 +79,27 @@ func (m *sharedMem) header() *sharedMemHeader {
return (*sharedMemHeader)(unsafe.Pointer(&m.region[0]))
}
// value returns the value currently stored in shared memory. The returned slice
// points to shared memory; it is not a copy.
func (m *sharedMem) value() []byte {
// valueRef returns the value currently stored in shared memory. The returned
// slice points to shared memory; it is not a copy.
func (m *sharedMem) valueRef() []byte {
length := m.header().length
valueOffset := int(unsafe.Sizeof(sharedMemHeader{}))
return m.region[valueOffset : valueOffset+length]
}
// valueCopy returns a copy of the value stored in shared memory.
func (m *sharedMem) valueCopy() []byte {
ref := m.valueRef()
b := make([]byte, len(ref))
copy(b, ref)
return b
}
// setValue copies the data in b into the shared memory buffer and sets
// the length. len(b) must be less than or equal to the capacity of the buffer
// (as returned by cap(m.value())).
func (m *sharedMem) setValue(b []byte) {
v := m.value()
v := m.valueRef()
if len(b) > cap(v) {
panic(fmt.Sprintf("value length %d larger than shared memory capacity %d", len(b), cap(v)))
}

View File

@ -6,13 +6,12 @@ package fuzz
import "math/rand"
func mutate(b []byte) []byte {
mutated := make([]byte, len(b))
copy(mutated, b)
func mutate(b []byte) {
if len(b) == 0 {
return
}
// Mutate a byte in a random position.
pos := rand.Intn(len(mutated))
mutated[pos] = byte(rand.Intn(256))
return mutated
pos := rand.Intn(len(b))
b[pos] = byte(rand.Intn(256))
}

View File

@ -67,7 +67,7 @@ func (w *worker) runFuzzing() error {
if err := w.start(); err != nil {
// We couldn't start the worker process. We can't do anything, and it's
// likely that other workers can't either, so give up.
close(w.coordinator.doneC)
w.coordinator.errC <- err
return err
}
@ -84,11 +84,12 @@ func (w *worker) runFuzzing() error {
case <-w.termC:
// Worker process terminated unexpectedly, so inform the coordinator
// that a crash occurred.
b := w.mem.value() // These are the bytes that caused the crash.
resB := make([]byte, len(b))
copy(resB, b)
resp := fuzzResponse{Value: resB, Err: "fuzzing process crashed unexpectedly"}
w.coordinator.interestingC <- resp
value := w.mem.valueCopy()
crasher := crasherEntry{
corpusEntry: corpusEntry{b: value},
errMsg: "fuzzing process crashed unexpectedly",
}
w.coordinator.crasherC <- crasher
// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
err := w.stop()
@ -102,7 +103,7 @@ func (w *worker) runFuzzing() error {
inputC = nil // block new inputs until we finish with this one.
go func() {
args := fuzzArgs{Duration: workerFuzzDuration}
resp, err := w.client.fuzz(input.b, args)
value, resp, err := w.client.fuzz(input.b, args)
if err != nil {
// TODO(jayconrod): if we get an error here, something failed between
// main and the call to testing.F.Fuzz. The error here won't
@ -113,22 +114,22 @@ func (w *worker) runFuzzing() error {
// TODO(jayconrod): what happens if testing.F.Fuzz is never called?
// TODO(jayconrod): time out if the test process hangs.
fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
} else if resp.Err != "" {
// The worker found a crasher. Inform the coordinator.
crasher := crasherEntry{
corpusEntry: corpusEntry{b: value},
errMsg: resp.Err,
}
w.coordinator.crasherC <- crasher
} else {
// TODO(jayconrod, katiehockman): Right now, this will just
// send an empty fuzzResponse{} if nothing interesting came
// up. Probably want to only pass to interestingC if fuzzing
// found something interesting.
// Inform the coordinator that fuzzing found something
// interesting (ie. a crash or new coverage).
w.coordinator.interestingC <- resp
if resp.Err == "" {
// Only unblock to allow more fuzzing to occur if
// everything was successful with the last fuzzing
// attempt.
fuzzC <- struct{}{}
if resp.Interesting {
w.coordinator.interestingC <- corpusEntry{b: value}
}
// Continue fuzzing.
fuzzC <- struct{}{}
}
// TODO(jayconrod,katiehockman): gather statistics.
}()
@ -316,17 +317,31 @@ type call struct {
Fuzz *fuzzArgs
}
// fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is
// passed in shared memory.
type fuzzArgs struct {
Duration time.Duration
}
// fuzzResponse contains results from workerServer.fuzz.
type fuzzResponse struct {
Value []byte // The bytes that yielded the response.
Err string // The error if the bytes resulted in a crash, nil otherwise.
// Interesting indicates the value in shared memory may be interesting to
// the coordinator (for example, because it expanded coverage).
Interesting bool
// Err is set if the value in shared memory caused a crash.
Err string
}
// workerComm holds pipes and shared memory used for communication
// between the coordinator process (client) and a worker process (server).
// These values are unique to each worker; they are shared only with the
// coordinator, not with other workers.
//
// Access to shared memory is synchronized implicitly over the RPC protocol
// implemented in workerServer and workerClient. During a call, the client
// (worker) has exclusive access to shared memory; at other times, the server
// (coordinator) has exclusive access.
type workerComm struct {
fuzzIn, fuzzOut *os.File
mem *sharedMem
@ -369,8 +384,7 @@ func (ws *workerServer) serve() error {
var resp interface{}
switch {
case c.Fuzz != nil:
value := ws.mem.value()
resp = ws.fuzz(value, *c.Fuzz)
resp = ws.fuzz(*c.Fuzz)
default:
return errors.New("no arguments provided for any call")
}
@ -384,20 +398,21 @@ func (ws *workerServer) serve() error {
// fuzz runs the test function on random variations of a given input value for
// a given amount of time. fuzz returns early if it finds an input that crashes
// the fuzz function or an input that expands coverage.
func (ws *workerServer) fuzz(value []byte, args fuzzArgs) fuzzResponse {
func (ws *workerServer) fuzz(args fuzzArgs) fuzzResponse {
t := time.NewTimer(args.Duration)
for {
select {
case <-t.C:
return fuzzResponse{}
// TODO(jayconrod,katiehockman): this value is not interesting. Use a
// real heuristic once we have one.
return fuzzResponse{Interesting: true}
default:
b := mutate(value)
ws.mem.setValue(b) // Write the value to memory so it can be recovered it if the process dies
if err := ws.fuzzFn(b); err != nil {
return fuzzResponse{Value: b, Err: err.Error()}
mutate(ws.mem.valueRef())
if err := ws.fuzzFn(ws.mem.valueRef()); err != nil {
return fuzzResponse{Err: err.Error()}
}
// TODO(jayconrod,katiehockman): return early if coverage is expanded
// by returning a fuzzResponse with the Value set but a nil Err.
// TODO(jayconrod,katiehockman): return early if we find an
// interesting value.
}
}
}
@ -445,18 +460,16 @@ func (wc *workerClient) Close() error {
}
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
func (wc *workerClient) fuzz(value []byte, args fuzzArgs) (fuzzResponse, error) {
func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
wc.mem.setValue(value)
wc.mem.setValue(valueIn)
c := call{Fuzz: &args}
if err := wc.enc.Encode(c); err != nil {
return fuzzResponse{}, err
return nil, fuzzResponse{}, err
}
var resp fuzzResponse
if err := wc.dec.Decode(&resp); err != nil {
return fuzzResponse{}, err
}
return resp, nil
err = wc.dec.Decode(&resp)
valueOut = wc.mem.valueCopy()
return valueOut, resp, err
}