[dev.fuzz] internal/fuzz: guard concurrent access to shared memory

This change moves the worker's *sharedMem into a buffered chan that
acts as a mutex. The mutex can be locked by receiving from the chan;
it can be unlocked by sending *sharedMem back to the chan. Multiple
objects (like worker, workerClient, workerServer) may have references
to the chan and may hold the lock across several operations.

This is intended to fix a segfault that occurred when
workerClient.fuzz accessed shared memory after it was already closed
and unmapped by the worker's goroutine. workerClient.fuzz is executed
in a separate goroutine so the worker can still receive messages from
the coordinator (like being told to stop and clean up).

Change-Id: I4eb9079ba9e5bfcfacfecd0fc8ad9bed17b33bba
Reviewed-on: https://go-review.googlesource.com/c/go/+/285054
Run-TryBot: Jay Conrod <jayconrod@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Katie Hockman <katie@golang.org>
Trust: Jay Conrod <jayconrod@google.com>
This commit is contained in:
Jay Conrod 2021-01-20 17:09:59 -05:00
parent 8e0584c327
commit 671dba6c89
4 changed files with 57 additions and 19 deletions

View File

@ -75,13 +75,15 @@ func CoordinateFuzzing(ctx context.Context, parallel int, seed [][]byte, corpusD
if err != nil {
return nil, err
}
memMu := make(chan *sharedMem, 1)
memMu <- mem
return &worker{
dir: dir,
binPath: binPath,
args: args,
env: env,
coordinator: c,
mem: mem,
memMu: memMu,
}, nil
}

View File

@ -50,7 +50,10 @@ func (m *sharedMem) Close() error {
// setWorkerComm configures communciation channels on the cmd that will
// run a worker process.
func setWorkerComm(cmd *exec.Cmd, comm workerComm) {
cmd.ExtraFiles = []*os.File{comm.fuzzIn, comm.fuzzOut, comm.mem.f}
mem := <-comm.memMu
memFile := mem.f
comm.memMu <- mem
cmd.ExtraFiles = []*os.File{comm.fuzzIn, comm.fuzzOut, memFile}
}
// getWorkerComm returns communication channels in the worker process.
@ -71,7 +74,9 @@ func getWorkerComm() (comm workerComm, err error) {
if err != nil {
return workerComm{}, err
}
return workerComm{fuzzIn: fuzzIn, fuzzOut: fuzzOut, mem: mem}, nil
memMu := make(chan *sharedMem, 1)
memMu <- mem
return workerComm{fuzzIn: fuzzIn, fuzzOut: fuzzOut, memMu: memMu}, nil
}
// isInterruptError returns whether an error was returned by a process that

View File

@ -85,10 +85,13 @@ func (m *sharedMem) Close() error {
// setWorkerComm configures communciation channels on the cmd that will
// run a worker process.
func setWorkerComm(cmd *exec.Cmd, comm workerComm) {
mem := <-comm.memMu
memFD := mem.f.Fd()
comm.memMu <- mem
syscall.SetHandleInformation(syscall.Handle(comm.fuzzIn.Fd()), syscall.HANDLE_FLAG_INHERIT, 1)
syscall.SetHandleInformation(syscall.Handle(comm.fuzzOut.Fd()), syscall.HANDLE_FLAG_INHERIT, 1)
syscall.SetHandleInformation(syscall.Handle(comm.mem.f.Fd()), syscall.HANDLE_FLAG_INHERIT, 1)
cmd.Env = append(cmd.Env, fmt.Sprintf("GO_TEST_FUZZ_WORKER_HANDLES=%x,%x,%x", comm.fuzzIn.Fd(), comm.fuzzOut.Fd(), comm.mem.f.Fd()))
syscall.SetHandleInformation(syscall.Handle(memFD), syscall.HANDLE_FLAG_INHERIT, 1)
cmd.Env = append(cmd.Env, fmt.Sprintf("GO_TEST_FUZZ_WORKER_HANDLES=%x,%x,%x", comm.fuzzIn.Fd(), comm.fuzzOut.Fd(), memFD))
}
// getWorkerComm returns communication channels in the worker process.
@ -128,8 +131,10 @@ func getWorkerComm() (comm workerComm, err error) {
if err != nil {
return workerComm{}, err
}
memMu := make(chan *sharedMem, 1)
memMu <- mem
return workerComm{fuzzIn: fuzzIn, fuzzOut: fuzzOut, mem: mem}, nil
return workerComm{fuzzIn: fuzzIn, fuzzOut: fuzzOut, memMu: memMu}, nil
}
func isInterruptError(err error) bool {

View File

@ -40,7 +40,7 @@ type worker struct {
coordinator *coordinator
mem *sharedMem // shared memory with worker; persists across processes.
memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
cmd *exec.Cmd // current worker process
client *workerClient // used to communicate with worker process
@ -50,12 +50,12 @@ type worker struct {
// cleanup releases persistent resources associated with the worker.
func (w *worker) cleanup() error {
if w.mem == nil {
mem := <-w.memMu
if mem == nil {
return nil
}
err := w.mem.Close()
w.mem = nil
return err
close(w.memMu)
return mem.Close()
}
// runFuzzing runs the test binary to perform fuzzing.
@ -99,7 +99,9 @@ func (w *worker) runFuzzing() error {
// Unexpected termination. Inform the coordinator about the crash.
// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
value := w.mem.valueCopy()
mem := <-w.memMu
value := mem.valueCopy()
w.memMu <- mem
message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr)
crasher := crasherEntry{
corpusEntry: corpusEntry{b: value},
@ -215,7 +217,7 @@ func (w *worker) start() (err error) {
return err
}
defer fuzzOutW.Close()
setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, mem: w.mem})
setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
// Start the worker process.
if err := cmd.Start(); err != nil {
@ -229,7 +231,7 @@ func (w *worker) start() (err error) {
// called later by stop.
w.cmd = cmd
w.termC = make(chan struct{})
w.client = newWorkerClient(workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, mem: w.mem})
w.client = newWorkerClient(workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu})
go func() {
w.waitErr = w.cmd.Wait()
@ -369,7 +371,7 @@ type fuzzResponse struct {
// (coordinator) has exclusive access.
type workerComm struct {
fuzzIn, fuzzOut *os.File
mem *sharedMem
memMu chan *sharedMem // mutex guarding shared memory
}
// workerServer is a minimalist RPC server, run by fuzz worker processes.
@ -447,6 +449,8 @@ func (ws *workerServer) serve(ctx context.Context) error {
func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) fuzzResponse {
ctx, cancel := context.WithTimeout(ctx, args.Duration)
defer cancel()
mem := <-ws.memMu
defer func() { ws.memMu <- mem }()
for {
select {
@ -455,9 +459,9 @@ func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) fuzzResponse {
// real heuristic once we have one.
return fuzzResponse{Interesting: true}
default:
b := ws.mem.valueRef()
b := mem.valueRef()
ws.m.mutate(&b)
ws.mem.setValueLen(len(b))
mem.setValueLen(len(b))
if err := ws.fuzzFn(b); err != nil {
return fuzzResponse{Err: err.Error()}
}
@ -509,17 +513,39 @@ func (wc *workerClient) Close() error {
return wc.fuzzOut.Close()
}
// errSharedMemClosed is returned by workerClient methods that cannot access
// shared memory because it was closed and unmapped by another goroutine. That
// can happen when worker.cleanup is called in the worker goroutine while a
// workerClient.fuzz call runs concurrently.
//
// This error should not be reported. It indicates the operation was
// interrupted.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
wc.mem.setValue(valueIn)
mem, ok := <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
}
mem.setValue(valueIn)
wc.memMu <- mem
c := call{Fuzz: &args}
if err := wc.enc.Encode(c); err != nil {
return nil, fuzzResponse{}, err
}
err = wc.dec.Decode(&resp)
valueOut = wc.mem.valueCopy()
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
}
valueOut = mem.valueCopy()
wc.memMu <- mem
return valueOut, resp, err
}