Jay Conrod has uploaded this change for review.
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).
workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.
Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.
Also,
* Increased timeouts to deflake tests. Opened b/183200291 to add flags
to make them more deterministic.
* Fixed missing newline in log message.
Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
---
M src/cmd/go/testdata/script/test_fuzz_chatty.txt
A src/cmd/go/testdata/script/test_fuzz_io_error.txt
M src/cmd/go/testdata/script/test_fuzz_match.txt
M src/internal/fuzz/fuzz.go
M src/internal/fuzz/worker.go
M src/testing/fuzz.go
6 files changed, 237 insertions(+), 168 deletions(-)
diff --git a/src/cmd/go/testdata/script/test_fuzz_chatty.txt b/src/cmd/go/testdata/script/test_fuzz_chatty.txt
index ea81bc3..1784ee0 100644
--- a/src/cmd/go/testdata/script/test_fuzz_chatty.txt
+++ b/src/cmd/go/testdata/script/test_fuzz_chatty.txt
@@ -35,7 +35,7 @@
! stdout FAIL
# Fuzz successful chatty fuzz target that includes a separate unit test.
-go test -v chatty_with_test_fuzz_test.go -fuzz=Fuzz -fuzztime=1s
+go test -v chatty_with_test_fuzz_test.go -fuzz=Fuzz -fuzztime=5s
stdout ok
stdout PASS
! stdout FAIL
diff --git a/src/cmd/go/testdata/script/test_fuzz_io_error.txt b/src/cmd/go/testdata/script/test_fuzz_io_error.txt
new file mode 100644
index 0000000..b9eafd5
--- /dev/null
+++ b/src/cmd/go/testdata/script/test_fuzz_io_error.txt
@@ -0,0 +1,68 @@
+# Test that when the coordinator experiences an I/O error communicating
+# with a worker, the coordinator stops the worker and reports the error.
+# The coordinator should not record a crasher.
+#
+# We simulate an I/O error by closing the fuzz_in / fuzz_out pipe that
+# the coordinator and worker use for RPCs.
+# TODO(jayconrod): test on Windows, too.
+[short] skip
+[!darwin] [!linux] skip
+
+# If the I/O error occurs before F.Fuzz is called, the coordinator should
+# stop the worker and say that.
+! go test -fuzz=FuzzClosePipeBefore
+stdout '^\s*worker terminated without fuzzing$'
+! stdout 'communicating with worker: EOF'
+! exists testdata
+
+# If the I/O error occurs after F.Fuzz is called (unlikely), just exit.
+# It's hard to distinguish this case from the worker being interrupted by ^C
+# or exiting with status 0 (which it should do when interrupted by ^C).
+go test -fuzz=FuzzClosePipeAfter
+
+-- go.mod --
+module test
+
+go 1.17
+-- close_pipe_test.go --
+package close_pipe
+
+import (
+ "flag"
+ "syscall"
+ "testing"
+ "time"
+)
+
+func isWorker() bool {
+ f := flag.Lookup("test.fuzzworker")
+ if f == nil {
+ return false
+ }
+ get, ok := f.Value.(flag.Getter)
+ if !ok {
+ return false
+ }
+ return get.Get() == interface{}(true)
+}
+
+func closePipeAndWait() {
+ syscall.Close(3)
+ syscall.Close(4)
+ time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+}
+
+func FuzzClosePipeBefore(f *testing.F) {
+ if isWorker() {
+ closePipeAndWait()
+ }
+ f.Fuzz(func(*testing.T, []byte) {})
+}
+
+func FuzzClosePipeAfter(f *testing.F) {
+ f.Fuzz(func(*testing.T, []byte) {
+ if isWorker() {
+ closePipeAndWait()
+ }
+ })
+}
diff --git a/src/cmd/go/testdata/script/test_fuzz_match.txt b/src/cmd/go/testdata/script/test_fuzz_match.txt
index 7b2216f..3f1db28 100644
--- a/src/cmd/go/testdata/script/test_fuzz_match.txt
+++ b/src/cmd/go/testdata/script/test_fuzz_match.txt
@@ -7,12 +7,12 @@
stdout '^ok'
# Matches only for fuzzing.
-go test -fuzz Fuzz -fuzztime 2s -parallel 4 standalone_fuzz_test.go
+go test -fuzz Fuzz -fuzztime 5s -parallel 4 standalone_fuzz_test.go
! stdout '^ok.*\[no tests to run\]'
stdout '^ok'
# Matches none for fuzzing but will run the fuzz target as a test.
-go test -fuzz ThisWillNotMatch -fuzztime 2s -parallel 4 standalone_fuzz_test.go
+go test -fuzz ThisWillNotMatch -fuzztime 5s -parallel 4 standalone_fuzz_test.go
! stdout '^ok.*\[no tests to run\]'
stdout '^ok'
stdout '\[no targets to fuzz\]'
@@ -30,7 +30,7 @@
! stdout '\[no targets to fuzz\]'
# Matches more than one fuzz target for fuzzing.
-go test -fuzz Fuzz -fuzztime 2s -parallel 4 multiple_fuzz_test.go
+go test -fuzz Fuzz -fuzztime 5s -parallel 4 multiple_fuzz_test.go
# The tests should run, but not be fuzzed
! stdout '\[no tests to run\]'
! stdout '\[no targets to fuzz\]'
diff --git a/src/internal/fuzz/fuzz.go b/src/internal/fuzz/fuzz.go
index 9ae1ead..7edac27 100644
--- a/src/internal/fuzz/fuzz.go
+++ b/src/internal/fuzz/fuzz.go
@@ -77,13 +77,13 @@
env := os.Environ() // same as self
c := &coordinator{
- doneC: make(chan struct{}),
inputC: make(chan CorpusEntry),
interestingC: make(chan CorpusEntry),
crasherC: make(chan crasherEntry),
}
errC := make(chan error)
+ // newWorker creates a worker but doesn't start it yet.
newWorker := func() (*worker, error) {
mem, err := sharedMemTempFile(sharedMemSize)
if err != nil {
@@ -101,17 +101,28 @@
}, nil
}
+ // fuzzCtx is used to stop workers, for example, after finding a crasher.
+ fuzzCtx, cancelWorkers := context.WithCancel(ctx)
+ defer cancelWorkers()
+
+ // stop is called when a worker encounters a fatal error.
var fuzzErr error
stopping := false
stop := func(err error) {
- if fuzzErr == nil || fuzzErr == ctx.Err() {
+ if err == fuzzCtx.Err() || isInterruptError(err) {
+ // Suppress cancellation errors and terminations due to SIGINT.
+ // The messages are not helpful since either the user triggered the error
+ // (with ^C) or another more helpful message will be printed (a crasher).
+ err = nil
+ }
+ if err != nil && (fuzzErr == nil || fuzzErr == ctx.Err()) {
fuzzErr = err
}
if stopping {
return
}
stopping = true
- close(c.doneC)
+ cancelWorkers()
}
// Start workers.
@@ -126,7 +137,7 @@
for i := range workers {
w := workers[i]
go func() {
- err := w.runFuzzing()
+ err := w.runFuzzing(fuzzCtx)
cleanErr := w.cleanup()
if err == nil {
err = cleanErr
@@ -137,7 +148,7 @@
// Main event loop.
// Do not return until all workers have terminated. We avoid a deadlock by
- // receiving messages from workers even after closing c.doneC.
+ // receiving messages from workers even after ctx is canceled.
activeWorkers := len(workers)
i := 0
for {
@@ -250,11 +261,6 @@
// coordinator holds channels that workers can use to communicate with
// the coordinator.
type coordinator struct {
- // doneC is closed to indicate fuzzing is done and workers should stop.
- // doneC may be closed due to a time limit expiring or a fatal error in
- // a worker.
- doneC chan struct{}
-
// inputC is sent values to fuzz by the coordinator. Any worker may receive
// values from this channel.
inputC chan CorpusEntry
diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go
index 3fe5aeb..de15adb 100644
--- a/src/internal/fuzz/worker.go
+++ b/src/internal/fuzz/worker.go
@@ -65,10 +65,10 @@
// runFuzzing runs the test binary to perform fuzzing.
//
-// This function loops until w.coordinator.doneC is closed or some
-// fatal error is encountered. It receives inputs from w.coordinator.inputC,
-// then passes those on to the worker process.
-func (w *worker) runFuzzing() error {
+// runFuzzing loops until ctx is canceled or a fatal error is encountered. While
+// looping, runFuzzing receives inputs from w.coordinator.inputC, then passes
+// those on to the worker process.
+func (w *worker) runFuzzing(ctx context.Context) error {
// Start the process.
if err := w.start(); err != nil {
// We couldn't start the worker process. We can't do anything, and it's
@@ -76,125 +76,98 @@
return err
}
- // inputC is set to w.coordinator.inputC when the worker is able to process
- // input. It's nil at other times, so its case won't be selected in the
- // event loop below.
- var inputC chan CorpusEntry
-
- // A value is sent to fuzzC to tell the worker to prepare to process an input
- // by setting inputC.
- fuzzC := make(chan struct{}, 1)
-
// Send the worker a message to make sure it can respond.
// Errors that occur before we get a response likely indicate that
// the worker did not call F.Fuzz or called F.Fail first.
// We don't record crashers for these errors.
- pinged := false
- go func() {
- err := w.client.ping()
- if err != nil {
- w.stop() // trigger termC case below
- return
- }
- pinged = true
- fuzzC <- struct{}{}
- }()
+ if err := w.client.ping(ctx); err != nil {
+ w.stop()
+ return fmt.Errorf("worker terminated without fuzzing")
+ // TODO(jayconrod,katiehockman): record and return stderr.
+ }
// Main event loop.
for {
select {
- case <-w.coordinator.doneC:
- // All workers were told to stop.
+ case <-ctx.Done():
+ // Worker was told to stop.
err := w.stop()
- if isInterruptError(err) {
- // Worker interrupted by SIGINT. This can happen if the worker receives
- // SIGINT before installing the signal handler. That's likely if
- // TestMain or the fuzz target setup takes a long time.
- return nil
+ if err != nil {
+ return err
}
- return err
+ return ctx.Err()
case <-w.termC:
- // Worker process terminated unexpectedly.
- if !pinged {
- w.stop()
- return fmt.Errorf("worker terminated without fuzzing")
- // TODO(jayconrod,katiehockman): record and return stderr.
- }
- if isInterruptError(w.waitErr) {
- // Worker interrupted by SIGINT. See comment in doneC case.
- w.stop()
- return nil
- }
+ // Worker process terminated unexpectedly while waiting for input.
+ w.stop()
if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
- w.stop()
return fmt.Errorf("worker exited unexpectedly due to an internal failure")
// TODO(jayconrod,katiehockman): record and return stderr.
}
-
- // Unexpected termination. Inform the coordinator about the crash.
- // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
- mem := <-w.memMu
- value := mem.valueCopy()
- w.memMu <- mem
- message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr)
- crasher := crasherEntry{
- CorpusEntry: CorpusEntry{Data: value},
- errMsg: message,
+ if w.waitErr == nil || isInterruptError(w.waitErr) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal. We can't tell whether we sent the signal
+ // in w.stop above, or the worker terminated for some other reason
+ // like the user pressing ^C.
+ //
+ // When the user presses ^C, on POSIX platforms, SIGINT is delivered
+ // to all processes in the group concurrently, and the worker may
+ // see it first. The worker should exit 0 gracefully (in theory).
+ //
+ // In any case, this condition is probably intentional, so don't
+ // report an error.
+ return nil
}
- w.coordinator.crasherC <- crasher
- return w.stop()
+ if w.waitErr != nil && !isInterruptError(w.waitErr) {
+ return fmt.Errorf("worker exited unexpectedly: %w", w.waitErr)
+ }
+ return w.waitErr
- case input := <-inputC:
+ case input := <-w.coordinator.inputC:
// Received input from coordinator.
- inputC = nil // block new inputs until we finish with this one.
- go func() {
- args := fuzzArgs{Duration: workerFuzzDuration}
- value, resp, err := w.client.fuzz(input.Data, args)
- if err != nil {
- // Error communicating with worker.
- select {
- case <-w.termC:
- // Worker terminated, perhaps unexpectedly.
- // We expect I/O errors due to partially sent or received RPCs,
- // so ignore this error.
- case <-w.coordinator.doneC:
- // Timeout or interruption. Worker may also be interrupted.
- // Again, ignore I/O errors.
- default:
- // TODO(jayconrod): if we get an error here, something failed between
- // main and the call to testing.F.Fuzz. The error here won't
- // be useful. Collect stderr, clean it up, and return that.
- // TODO(jayconrod): we can get EPIPE if w.stop is called concurrently
- // and it kills the worker process. Suppress this message in
- // that case.
- fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
- }
- // TODO(jayconrod): what happens if testing.F.Fuzz is never called?
- // TODO(jayconrod): time out if the test process hangs.
- } else if resp.Crashed {
- // The worker found a crasher. Inform the coordinator.
- crasher := crasherEntry{
- CorpusEntry: CorpusEntry{Data: value},
- errMsg: resp.Err,
- }
- w.coordinator.crasherC <- crasher
- } else {
- // Inform the coordinator that fuzzing found something
- // interesting (i.e. new coverage).
- if resp.Interesting {
- w.coordinator.interestingC <- CorpusEntry{Data: value}
- }
-
- // Continue fuzzing.
- fuzzC <- struct{}{}
+ args := fuzzArgs{Duration: workerFuzzDuration}
+ value, resp, err := w.client.fuzz(ctx, input.Data, args)
+ if err != nil {
+ // Error communicating with worker.
+ w.stop()
+ if ctx.Err() != nil {
+ // Timeout or interruption.
+ return ctx.Err()
}
- // TODO(jayconrod,katiehockman): gather statistics.
- }()
+ if w.waitErr == nil || isInterruptError(w.waitErr) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal. See comment in termC case above.
+ //
+ // Since we expect I/O errors around interrupts, and we don't expect
+ // and can't distinguish I/O errors at other times, ignore this error.
+ return nil
+ }
- case <-fuzzC:
- // Worker finished fuzzing and nothing new happened.
- inputC = w.coordinator.inputC // unblock new inputs
+ // Unexpected termination. Inform the coordinator about the crash.
+ // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
+ mem := <-w.memMu
+ value := mem.valueCopy()
+ w.memMu <- mem
+ message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr)
+ crasher := crasherEntry{
+ CorpusEntry: CorpusEntry{Data: value},
+ errMsg: message,
+ }
+ w.coordinator.crasherC <- crasher
+ return w.waitErr
+ } else if resp.Crashed {
+ // The worker found a crasher. Inform the coordinator.
+ crasher := crasherEntry{
+ CorpusEntry: CorpusEntry{Data: value},
+ errMsg: resp.Err,
+ }
+ w.coordinator.crasherC <- crasher
+ } else if resp.Interesting {
+ // Inform the coordinator that fuzzing found something
+ // interesting (i.e. new coverage).
+ w.coordinator.interestingC <- CorpusEntry{Data: value}
+ }
+ // TODO(jayconrod,katiehockman): gather statistics.
}
}
}
@@ -442,49 +415,57 @@
// does not return errors from method calls; those are passed through serialized
// responses.
func (ws *workerServer) serve(ctx context.Context) error {
- // Stop handling messages when ctx.Done() is closed. This normally happens
- // when the worker process receives a SIGINT signal, which on POSIX platforms
- // is sent to the process group when ^C is pressed.
- //
- // Ordinarily, the coordinator process may stop a worker by closing fuzz_in.
- // We simulate that and interrupt a blocked read here.
- doneC := make(chan struct{})
- defer func() { close(doneC) }()
+ errC := make(chan error, 1)
go func() {
- select {
- case <-ctx.Done():
- ws.fuzzIn.Close()
- case <-doneC:
+ enc := json.NewEncoder(ws.fuzzOut)
+ dec := json.NewDecoder(ws.fuzzIn)
+ for {
+ if ctx.Err() != nil {
+ return
+ }
+
+ var c call
+ if err := dec.Decode(&c); err == io.EOF {
+ return
+ } else if err != nil {
+ errC <- err
+ return
+ }
+ if ctx.Err() != nil {
+ return
+ }
+
+ var resp interface{}
+ switch {
+ case c.Fuzz != nil:
+ resp = ws.fuzz(ctx, *c.Fuzz)
+ case c.Ping != nil:
+ resp = ws.ping(ctx, *c.Ping)
+ default:
+ errC <- errors.New("no arguments provided for any call")
+ return
+ }
+
+ if err := enc.Encode(resp); err != nil {
+ errC <- err
+ return
+ }
}
}()
- enc := json.NewEncoder(ws.fuzzOut)
- dec := json.NewDecoder(ws.fuzzIn)
- for {
- var c call
- if err := dec.Decode(&c); err != nil {
- if ctx.Err() != nil {
- return ctx.Err()
- } else if err == io.EOF {
- return nil
- } else {
- return err
- }
- }
-
- var resp interface{}
- switch {
- case c.Fuzz != nil:
- resp = ws.fuzz(ctx, *c.Fuzz)
- case c.Ping != nil:
- resp = ws.ping(ctx, *c.Ping)
- default:
- return errors.New("no arguments provided for any call")
- }
-
- if err := enc.Encode(resp); err != nil {
- return err
- }
+ select {
+ case <-ctx.Done():
+ // Stop handling messages when ctx.Done() is closed. This normally happens
+ // when the worker process receives a SIGINT signal, which on POSIX platforms
+ // is sent to the process group when ^C is pressed.
+ //
+ // TODO(jayconrod): the goroutine above will still be blocked until the
+ // other process closes the pipes (possibly by exiting). There's no
+ // cross-platform way to interrupt it because the underlying read/write
+ // calls will still block, even after this side is closed.
+ return ctx.Err()
+ case err := <-errC:
+ return err
}
}
@@ -686,7 +667,7 @@
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
-func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
+func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
@@ -698,11 +679,7 @@
wc.memMu <- mem
c := call{Fuzz: &args}
- if err := wc.enc.Encode(c); err != nil {
- return nil, fuzzResponse{}, err
- }
- err = wc.dec.Decode(&resp)
-
+ err = wc.call(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
@@ -714,14 +691,32 @@
}
// ping tells the worker to call the ping method. See workerServer.ping.
-func (wc *workerClient) ping() error {
+func (wc *workerClient) ping(ctx context.Context) error {
c := call{Ping: &pingArgs{}}
- if err := wc.enc.Encode(c); err != nil {
- return err
- }
var resp pingResponse
- if err := wc.dec.Decode(&resp); err != nil {
+ return wc.call(ctx, c, &resp)
+}
+
+// call sends an RPC from the coordinator to the worker process and waits for
+// the response. The call may be canceled with ctx.
+func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
+ errC := make(chan error, 1)
+ go func() {
+ if err := wc.enc.Encode(c); err != nil {
+ errC <- err
+ return
+ }
+ errC <- wc.dec.Decode(resp)
+ }()
+
+ select {
+ case <-ctx.Done():
+ // TODO(jayconrod): the goroutine above will still be blocked until the
+ // other process closes the pipes (possibly by exiting). There's no
+ // cross-platform way to interrupt it because the underlying read/write
+ // calls will still block, even after this side is closed.
+ return ctx.Err()
+ case err := <-errC:
return err
}
- return nil
}
diff --git a/src/testing/fuzz.go b/src/testing/fuzz.go
index 2a0754f..73ac59c 100644
--- a/src/testing/fuzz.go
+++ b/src/testing/fuzz.go
@@ -362,7 +362,7 @@
if err != nil {
f.result = FuzzResult{Error: err}
f.Fail()
- fmt.Fprintf(f.w, "%v", err)
+ fmt.Fprintf(f.w, "%v\n", err)
if crashErr, ok := err.(fuzzCrashError); ok {
crashName := crashErr.CrashName()
fmt.Fprintf(f.w, "Crash written to %s\n", filepath.Join("testdata/corpus", f.name, crashName))
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Katie Hockman.
Patch set 5: Run-TryBot +1, Trust +1
Attention is currently required from: Jay Conrod.
4 comments:
Patchset:
First pass. I still need to look at worker.go more closely. There is some code duplication (i.e., some of the same checks but slightly altered) that makes it a little hard to follow. I can give more concrete examples of this tomorrow when I can look more closely.
File src/internal/fuzz/worker.go:
Patch Set #5, Line 100: !w.interrupted && isInterruptError(err)
Should this be !isInterruptError(err) ?
Or for consistency with below:
if !w.interrupted && (err == nil || isInterruptError(err))
If not, then this is confusing to me. It seems to meet the condition if it's not interrupted, or if it is an interrupt error, which seems contradictory. (i.e., I read this as "if !interrupted && interrupted")
if !w.interrupted && (w.waitErr == nil || isInterruptError(w.waitErr)) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator).
//
// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
// all processes in the group concurrently, and the worker may see it
// before the coordinator. The worker should exit 0 gracefully (in
// theory).
//
// In any case, this condition is probably intended by the user, so
// don't report an error.
return nil
}
if w.waitErr != nil && !isInterruptError(w.waitErr) {
return fmt.Errorf("fuzzing process terminated unexpectedly: %w", w.waitErr)
}
Alternatively, for readability
unexpectedProcessExit := w.waitErr != nil && !isInterruptError(w.waitErr)
if unexpectedProcessExit {
return fmt.Errorf("fuzzing process terminated...
} else if !w.interrupted {
// Worker stopped, either by exiting with status 0...
...
}
Or something similar. You could maybe even do this right before the select, then just use it throughout, since you do a similar but slightly different check at 141.
Patch Set #5, Line 313: w.interrupted = true
maybe move this after the comment
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Jay Conrod.
1 comment:
File src/internal/fuzz/worker.go:
Patch Set #5, Line 100: !w.interrupted && isInterruptError(err)
Should this be !isInterruptError(err) ? […]
The second suggestion I gave won't work, so ignore that one 😊
But I am still wondering if this should be !isInterruptError(err) ?
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Jay Conrod.
Patch set 5: Code-Review +2, Trust +1
2 comments:
Patchset:
First pass. I still need to look at worker.go more closely. There is some code duplication (ie. […]
Got some rest, looked at it again, and it makes more sense this time around :) I think a few helper variables in the places I described would be nice, but otherwise this looks good to me!
File src/internal/fuzz/worker.go:
if !w.interrupted && (w.waitErr == nil || isInterruptError(w.waitErr)) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator).
//
// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
// all processes in the group concurrently, and the worker may see it
// before the coordinator. The worker should exit 0 gracefully (in
// theory).
//
// In any case, this condition is probably intended by the user, so
// don't report an error.
return nil
}
if w.waitErr != nil && !isInterruptError(w.waitErr) {
return fmt.Errorf("fuzzing process terminated unexpectedly: %w", w.waitErr)
}
Alternatively, for readability […]
I suppose it can't be defined before w.stop() is called, since that could impact w.waitErr's value, IIUC. But still think a variable here might help make it a bit clearer what's going on.
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Jay Conrod.
Jay Conrod uploaded patch set #6 to this change.
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).
workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.
Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.
Also, fixed missing newline in log message.
Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
---
A src/cmd/go/testdata/script/test_fuzz_io_error.txt
M src/internal/fuzz/fuzz.go
M src/internal/fuzz/sys_windows.go
M src/internal/fuzz/worker.go
M src/testing/fuzz.go
5 files changed, 286 insertions(+), 176 deletions(-)
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 6: Run-TryBot +1, Trust +1
3 comments:
File src/internal/fuzz/worker.go:
Patch Set #5, Line 100: !w.interrupted && isInterruptError(err)
The second suggestion I gave won't work, so ignore that one 😊 […]
You're right, there should be a ! in there.
if !w.interrupted && (w.waitErr == nil || isInterruptError(w.waitErr)) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator).
//
// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
// all processes in the group concurrently, and the worker may see it
// before the coordinator. The worker should exit 0 gracefully (in
// theory).
//
// In any case, this condition is probably intended by the user, so
// don't report an error.
return nil
}
if w.waitErr != nil && !isInterruptError(w.waitErr) {
return fmt.Errorf("fuzzing process terminated unexpectedly: %w", w.waitErr)
}
I suppose it can't be defined before w.stop() is called, since that could impact w. […]
I think I simplified this a bit, see what you think.
It was definitely more complicated than it needed to be. w.interrupted should always be false for this case, since w.stop() shouldn't send an interrupt signal after the process has terminated, so I added an assertion for that at the top.
Patch Set #5, Line 313: w.interrupted = true
maybe move this after the comment
Done
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Jay Conrod uploaded patch set #7 to this change.
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).
workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.
Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.
Also fixed missing newline in log message.
Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
---
A src/cmd/go/testdata/script/test_fuzz_io_error.txt
M src/internal/fuzz/fuzz.go
M src/internal/fuzz/sys_windows.go
M src/internal/fuzz/worker.go
M src/testing/fuzz.go
5 files changed, 293 insertions(+), 176 deletions(-)
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 7: Run-TryBot +1, Trust +1
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Jay Conrod.
Patch set 7: Run-TryBot +1, Code-Review +2, Trust +1
1 comment:
File src/internal/fuzz/worker.go:
if !w.interrupted && (w.waitErr == nil || isInterruptError(w.waitErr)) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator). See comment in
// termC case above.
//
// Since we expect I/O errors around interrupts, ignore this error.
return nil
}
if w.interrupted {
// Some other communication error before we stopped the worker.
// Report an error, but don't record a crasher.
return fmt.Errorf("communicating with fuzzing process: %v", err)
}
Consider simplifying this a bit to something like the following:
if w.interrupted {
return fmt.Errorf("communicating with fuzzing process: %v", err)
} else if w.waitErr == nil || isInterruptError(w.waitErr) {
return nil
}
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Jay Conrod.
Jay Conrod uploaded patch set #8 to this change.
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).
workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.
Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.
Also fixed missing newline in log message.
Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
---
A src/cmd/go/testdata/script/test_fuzz_io_error.txt
M src/internal/fuzz/fuzz.go
M src/internal/fuzz/sys_windows.go
M src/internal/fuzz/worker.go
M src/testing/fuzz.go
5 files changed, 293 insertions(+), 176 deletions(-)
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 8: Run-TryBot +1, Trust +1
1 comment:
File src/internal/fuzz/worker.go:
if !w.interrupted && (w.waitErr == nil || isInterruptError(w.waitErr)) {
// Worker stopped, either by exiting with status 0 or after being
// interrupted with a signal (not sent by coordinator). See comment in
// termC case above.
//
// Since we expect I/O errors around interrupts, ignore this error.
return nil
}
if w.interrupted {
// Some other communication error before we stopped the worker.
// Report an error, but don't record a crasher.
return fmt.Errorf("communicating with fuzzing process: %v", err)
}
Consider simplifying this a bit to something like the following: […]
Done
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
1 comment:
Patchset:
The TryBot failure is a timeout that is also seen on master, so it is unrelated to this change.
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.
Jay Conrod submitted this change.
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).
workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.
Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.
Also fixed missing newline in log message.
Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
Reviewed-on: https://go-review.googlesource.com/c/go/+/303429
Trust: Jay Conrod <jayc...@google.com>
Trust: Katie Hockman <ka...@golang.org>
Run-TryBot: Jay Conrod <jayc...@google.com>
Reviewed-by: Katie Hockman <ka...@golang.org>
---
A src/cmd/go/testdata/script/test_fuzz_io_error.txt
M src/internal/fuzz/fuzz.go
M src/internal/fuzz/sys_windows.go
M src/internal/fuzz/worker.go
M src/testing/fuzz.go
5 files changed, 293 insertions(+), 176 deletions(-)
diff --git a/src/cmd/go/testdata/script/test_fuzz_io_error.txt b/src/cmd/go/testdata/script/test_fuzz_io_error.txt
new file mode 100644
index 0000000..4c7ab4c
--- /dev/null
+++ b/src/cmd/go/testdata/script/test_fuzz_io_error.txt
@@ -0,0 +1,101 @@
+# Test that when the coordinator experiences an I/O error communicating
+# with a worker, the coordinator stops the worker and reports the error.
+# The coordinator should not record a crasher.
+#
+# We simulate an I/O error in the test by writing garbage to fuzz_out.
+# This is unlikely, but possible. It's difficult to simulate interruptions
+# due to ^C and EOF errors which are more common. We don't report those.
+[short] skip
+[!darwin] [!linux] [!windows] skip
+
+# If the I/O error occurs before F.Fuzz is called, the coordinator should
+# stop the worker and say that.
+! go test -fuzz=FuzzClosePipeBefore -parallel=1
+stdout '\s*fuzzing process terminated without fuzzing:'
+! stdout 'communicating with fuzzing process'
+! exists testdata
+
+# If the I/O error occurs after F.Fuzz is called (unlikely), report the error and exit without recording a crasher.
+# It's hard to distinguish this case from the worker being interrupted by ^C
+# or exiting with status 0 (which it should do when interrupted by ^C).
+! go test -fuzz=FuzzClosePipeAfter -parallel=1
+stdout '^\s*communicating with fuzzing process: invalid character ''!'' looking for beginning of value$'
+! exists testdata
+
+-- go.mod --
+module test
+
+go 1.17
+-- io_error_test.go --
+package io_error
+
+import (
+ "flag"
+ "testing"
+ "time"
+)
+
+func isWorker() bool {
+ f := flag.Lookup("test.fuzzworker")
+ if f == nil {
+ return false
+ }
+ get, ok := f.Value.(flag.Getter)
+ if !ok {
+ return false
+ }
+ return get.Get() == interface{}(true)
+}
+
+func FuzzClosePipeBefore(f *testing.F) {
+ if isWorker() {
+ sendGarbageToCoordinator(f)
+ time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+ }
+ f.Fuzz(func(*testing.T, []byte) {})
+}
+
+func FuzzClosePipeAfter(f *testing.F) {
+ f.Fuzz(func(t *testing.T, _ []byte) {
+ if isWorker() {
+ sendGarbageToCoordinator(t)
+ time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+ }
+ })
+}
+-- io_error_windows_test.go --
+package io_error
+
+import (
+ "fmt"
+ "os"
+ "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+ v := os.Getenv("GO_TEST_FUZZ_WORKER_HANDLES")
+ var fuzzInFD, fuzzOutFD uintptr
+ if _, err := fmt.Sscanf(v, "%x,%x", &fuzzInFD, &fuzzOutFD); err != nil {
+ tb.Fatalf("parsing GO_TEST_FUZZ_WORKER_HANDLES: %v", err)
+ }
+ f := os.NewFile(fuzzOutFD, "fuzz_out")
+ if _, err := f.Write([]byte("!!")); err != nil {
+ tb.Fatalf("writing fuzz_out: %v", err)
+ }
+}
+-- io_error_notwindows_test.go --
+// +build !windows
+
+package io_error
+
+import (
+ "os"
+ "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+ f := os.NewFile(4, "fuzz_out")
+ if _, err := f.Write([]byte("!!")); err != nil {
+ tb.Fatalf("writing fuzz_out: %v", err)
+ }
+}
diff --git a/src/internal/fuzz/fuzz.go b/src/internal/fuzz/fuzz.go
index 293cb48..5fa265f 100644
--- a/src/internal/fuzz/fuzz.go
+++ b/src/internal/fuzz/fuzz.go
@@ -86,13 +86,13 @@
env := os.Environ() // same as self
c := &coordinator{
- doneC: make(chan struct{}),
inputC: make(chan CorpusEntry),
interestingC: make(chan CorpusEntry),
crasherC: make(chan crasherEntry),
}
errC := make(chan error)
+ // newWorker creates a worker but doesn't start it yet.
newWorker := func() (*worker, error) {
mem, err := sharedMemTempFile(workerSharedMemSize)
if err != nil {
@@ -110,17 +110,30 @@
}, nil
}
+ // fuzzCtx is used to stop workers, for example, after finding a crasher.
+ fuzzCtx, cancelWorkers := context.WithCancel(ctx)
+ defer cancelWorkers()
+ doneC := ctx.Done()
+
+ // stop is called when a worker encounters a fatal error.
var fuzzErr error
stopping := false
stop := func(err error) {
- if fuzzErr == nil || fuzzErr == ctx.Err() {
+ if err == fuzzCtx.Err() || isInterruptError(err) {
+ // Suppress cancellation errors and terminations due to SIGINT.
+ // The messages are not helpful since either the user triggered the error
+ // (with ^C) or another more helpful message will be printed (a crasher).
+ err = nil
+ }
+ if err != nil && (fuzzErr == nil || fuzzErr == ctx.Err()) {
fuzzErr = err
}
if stopping {
return
}
stopping = true
- close(c.doneC)
+ cancelWorkers()
+ doneC = nil
}
// Start workers.
@@ -135,7 +148,7 @@
for i := range workers {
w := workers[i]
go func() {
- err := w.runFuzzing()
+ err := w.coordinate(fuzzCtx)
cleanErr := w.cleanup()
if err == nil {
err = cleanErr
@@ -146,17 +159,14 @@
// Main event loop.
// Do not return until all workers have terminated. We avoid a deadlock by
- // receiving messages from workers even after closing c.doneC.
+ // receiving messages from workers even after ctx is cancelled.
activeWorkers := len(workers)
i := 0
for {
select {
- case <-ctx.Done():
+ case <-doneC:
// Interrupted, cancelled, or timed out.
- // TODO(jayconrod,katiehockman): On Windows, ^C only interrupts 'go test',
- // not the coordinator or worker processes. 'go test' will stop running
- // actions, but it won't interrupt its child processes. This makes it
- // difficult to stop fuzzing on Windows without a timeout.
+ // stop sets doneC to nil so we don't busy wait here.
stop(ctx.Err())
case crasher := <-c.crasherC:
@@ -259,11 +269,6 @@
// coordinator holds channels that workers can use to communicate with
// the coordinator.
type coordinator struct {
- // doneC is closed to indicate fuzzing is done and workers should stop.
- // doneC may be closed due to a time limit expiring or a fatal error in
- // a worker.
- doneC chan struct{}
-
// inputC is sent values to fuzz by the coordinator. Any worker may receive
// values from this channel.
inputC chan CorpusEntry
diff --git a/src/internal/fuzz/sys_windows.go b/src/internal/fuzz/sys_windows.go
index e1734af..de6af81 100644
--- a/src/internal/fuzz/sys_windows.go
+++ b/src/internal/fuzz/sys_windows.go
@@ -135,6 +135,7 @@
}
func isInterruptError(err error) bool {
- // TODO(jayconrod): implement
+ // On Windows, we can't tell from the error returned by Wait whether the
+ // process was interrupted. It looks like an ExitError with status 1.
return false
}
diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go
index 506a485..2c4cc1f 100644
--- a/src/internal/fuzz/worker.go
+++ b/src/internal/fuzz/worker.go
@@ -51,10 +51,11 @@
memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
- cmd *exec.Cmd // current worker process
- client *workerClient // used to communicate with worker process
- waitErr error // last error returned by wait, set before termC is closed.
- termC chan struct{} // closed by wait when worker process terminates
+ cmd *exec.Cmd // current worker process
+ client *workerClient // used to communicate with worker process
+ waitErr error // last error returned by wait, set before termC is closed.
+ interrupted bool // true after stop interrupts a running worker.
+ termC chan struct{} // closed by wait when worker process terminates
}
// cleanup releases persistent resources associated with the worker.
@@ -67,12 +68,12 @@
return mem.Close()
}
-// runFuzzing runs the test binary to perform fuzzing.
+// coordinate runs the test binary to perform fuzzing.
//
-// This function loops until w.coordinator.doneC is closed or some
-// fatal error is encountered. It receives inputs from w.coordinator.inputC,
-// then passes those on to the worker process.
-func (w *worker) runFuzzing() error {
+// coordinate loops until ctx is cancelled or a fatal error is encountered. While
+// looping, coordinate receives inputs from w.coordinator.inputC, then passes
+// those on to the worker process.
+func (w *worker) coordinate(ctx context.Context) error {
// Start the process.
if err := w.start(); err != nil {
// We couldn't start the worker process. We can't do anything, and it's
@@ -80,125 +81,113 @@
return err
}
- // inputC is set to w.coordinator.inputC when the worker is able to process
- // input. It's nil at other times, so its case won't be selected in the
- // event loop below.
- var inputC chan CorpusEntry
-
- // A value is sent to fuzzC to tell the worker to prepare to process an input
- // by setting inputC.
- fuzzC := make(chan struct{}, 1)
-
// Send the worker a message to make sure it can respond.
// Errors that occur before we get a response likely indicate that
// the worker did not call F.Fuzz or called F.Fail first.
// We don't record crashers for these errors.
- pinged := false
- go func() {
- err := w.client.ping()
- if err != nil {
- w.stop() // trigger termC case below
- return
+ if err := w.client.ping(ctx); err != nil {
+ w.stop()
+ if ctx.Err() != nil {
+ return ctx.Err()
}
- pinged = true
- fuzzC <- struct{}{}
- }()
+ if isInterruptError(err) {
+ // User may have pressed ^C before worker responded.
+ return nil
+ }
+ return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
+ // TODO(jayconrod,katiehockman): record and return stderr.
+ }
// Main event loop.
for {
select {
- case <-w.coordinator.doneC:
- // All workers were told to stop.
+ case <-ctx.Done():
+ // Worker was told to stop.
err := w.stop()
- if isInterruptError(err) {
- // Worker interrupted by SIGINT. This can happen if the worker receives
- // SIGINT before installing the signal handler. That's likely if
- // TestMain or the fuzz target setup takes a long time.
- return nil
+ if err != nil && !w.interrupted && !isInterruptError(err) {
+ return err
}
- return err
+ return ctx.Err()
case <-w.termC:
- // Worker process terminated unexpectedly.
- if !pinged {
- w.stop()
- return fmt.Errorf("worker terminated without fuzzing")
- // TODO(jayconrod,katiehockman): record and return stderr.
+ // Worker process terminated unexpectedly while waiting for input.
+ err := w.stop()
+ if w.interrupted {
+ panic("worker interrupted after unexpected termination")
}
- if isInterruptError(w.waitErr) {
- // Worker interrupted by SIGINT. See comment in doneC case.
- w.stop()
+ if err == nil || isInterruptError(err) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal that was not sent by the coordinator.
+ //
+ // When the user presses ^C, on POSIX platforms, SIGINT is delivered to
+ // all processes in the group concurrently, and the worker may see it
+ // before the coordinator. The worker should exit 0 gracefully (in
+ // theory).
+ //
+ // This condition is probably intended by the user, so suppress
+ // the error.
return nil
}
- if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
- w.stop()
- return fmt.Errorf("worker exited unexpectedly due to an internal failure")
- // TODO(jayconrod,katiehockman): record and return stderr.
+ if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
+ // Worker exited with a code indicating F.Fuzz was not called correctly,
+ // for example, F.Fail was called first.
+ return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
}
+ // Worker exited non-zero or was terminated by a non-interrupt signal
+ // (for example, SIGSEGV).
+ return fmt.Errorf("fuzzing process terminated unexpectedly: %w", err)
+ // TODO(jayconrod,katiehockman): record and return stderr.
- // Unexpected termination. Inform the coordinator about the crash.
- // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
- mem := <-w.memMu
- value := mem.valueCopy()
- w.memMu <- mem
- message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr)
- crasher := crasherEntry{
- CorpusEntry: CorpusEntry{Data: value},
- errMsg: message,
- }
- w.coordinator.crasherC <- crasher
- return w.stop()
-
+ if w.interrupted {
+ // Communication error before we stopped the worker.
+ // Report an error, but don't record a crasher.
+ return fmt.Errorf("communicating with fuzzing process: %v", err)
+ }
+ if w.waitErr == nil || isInterruptError(w.waitErr) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal (not sent by coordinator). See comment in
+ // termC case above.
+ //
+ // Since we expect I/O errors around interrupts, ignore this error.
@@ -218,6 +207,7 @@
panic("worker already started")
}
w.waitErr = nil
+ w.interrupted = false
w.termC = nil
cmd := exec.Command(w.binPath, w.args...)
@@ -332,6 +322,7 @@
case <-t.C:
// Timer fired before worker terminated.
+ w.interrupted = true
switch sig {
case os.Interrupt:
// Try to stop the worker with SIGINT and wait a little longer.
@@ -347,7 +338,7 @@
case nil:
// Still waiting. Print a message to let the user know why.
- fmt.Fprintf(os.Stderr, "go: waiting for fuzz worker to terminate...\n")
+ fmt.Fprintf(os.Stderr, "go: waiting for fuzzing process to terminate...\n")
}
}
}
@@ -446,49 +437,55 @@
// does not return errors from method calls; those are passed through serialized
// responses.
func (ws *workerServer) serve(ctx context.Context) error {
- // Stop handling messages when ctx.Done() is closed. This normally happens
- // when the worker process receives a SIGINT signal, which on POSIX platforms
- // is sent to the process group when ^C is pressed.
- //
- // Ordinarily, the coordinator process may stop a worker by closing fuzz_in.
- // We simulate that and interrupt a blocked read here.
- doneC := make(chan struct{})
- defer func() { close(doneC) }()
+ // This goroutine may stay blocked after serve returns because the underlying
+ // read blocks, even after the file descriptor in this process is closed. The
+ // pipe must be closed by the client, too.
+ return ctx.Err()
+ case err := <-errC:
+ return err
}
}
@@ -691,7 +688,7 @@
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
-func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
+func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
@@ -703,11 +700,7 @@
wc.memMu <- mem
c := call{Fuzz: &args}
- if err := wc.enc.Encode(c); err != nil {
- return nil, fuzzResponse{}, err
- }
- err = wc.dec.Decode(&resp)
-
+ err = wc.call(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
@@ -719,14 +712,31 @@
}
// ping tells the worker to call the ping method. See workerServer.ping.
-func (wc *workerClient) ping() error {
+func (wc *workerClient) ping(ctx context.Context) error {
c := call{Ping: &pingArgs{}}
- if err := wc.enc.Encode(c); err != nil {
- return err
- }
var resp pingResponse
- if err := wc.dec.Decode(&resp); err != nil {
+ return wc.call(ctx, c, &resp)
+}
+
+// call sends an RPC from the coordinator to the worker process and waits for
+// the response. The call may be cancelled with ctx.
+func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
+ // This goroutine may stay blocked after call returns because the underlying
+ // read blocks, even after the file descriptor in this process is closed. The
+ // pipe must be closed by the server, too.
+ errC := make(chan error, 1)
+ go func() {
+ if err := wc.enc.Encode(c); err != nil {
+ errC <- err
+ return
+ }
+ errC <- wc.dec.Decode(resp)
+ }()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-errC:
return err
}
- return nil
}
diff --git a/src/testing/fuzz.go b/src/testing/fuzz.go
index 2a0754f..73ac59c 100644
--- a/src/testing/fuzz.go
+++ b/src/testing/fuzz.go
@@ -362,7 +362,7 @@
if err != nil {
f.result = FuzzResult{Error: err}
f.Fail()
- fmt.Fprintf(f.w, "%v", err)
+ fmt.Fprintf(f.w, "%v\n", err)
if crashErr, ok := err.(fuzzCrashError); ok {
crashName := crashErr.CrashName()
fmt.Fprintf(f.w, "Crash written to %s\n", filepath.Join("testdata/corpus", f.name, crashName))
To view, visit change 303429. To unsubscribe, or for help writing mail filters, visit settings.