Congratulations on opening your first change. Thank you for your contribution!
Next steps:
Within the next week or so, a maintainer will review your change and provide
feedback. See https://golang.org/doc/contribute.html#review for more info and
tips to get your patch through code review.
Most changes in the Go project go through a few rounds of revision. This can be
surprising to people new to the project. The careful, iterative review process
is our way of helping mentor contributors and ensuring that their contributions
have a lasting impact.
During May-July and Nov-Jan the Go project is in a code freeze, during which
little code gets reviewed or merged. If a reviewer responds with a comment like
R=go1.11, it means that this CL will be reviewed as part of the next development
cycle. See https://golang.org/s/release for more details.
To view, visit change 107715. To unsubscribe, or for help writing mail filters, visit settings.
Andrei Tudor Călin has uploaded this change for review.
net: add support for splice(2) in (*TCPConn).ReadFrom on Linux
This change adds support for the splice system call on Linux,
for the purpose of optimizing (*TCPConn).ReadFrom by reducing
copies of data from and to userspace. It does so by creating a
temporary pipe and splicing data from the source connection to the
pipe, then from the pipe to the destination connection. The pipe
serves as an in-kernel buffer for the data transfer.
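The two-stage transfer described above can be sketched roughly as follows. This is not the code from this CL: it uses blocking descriptors and no network poller, and the names spliceViaPipe and roundTrip are made up for illustration. Unix socket pairs stand in for TCP connections only to keep the sketch self-contained. It assumes Linux.

```go
package main

import (
	"fmt"
	"syscall"
)

// spliceViaPipe moves up to limit bytes from srcFd to dstFd through a
// temporary pipe. Illustrative sketch only: blocking descriptors, no poller.
func spliceViaPipe(dstFd, srcFd, limit int) (int, error) {
	var p [2]int
	if err := syscall.Pipe2(p[:], syscall.O_CLOEXEC); err != nil {
		return 0, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])

	// Stage 1: source socket -> write end of the pipe.
	in, err := syscall.Splice(srcFd, nil, p[1], nil, limit, 0)
	if err != nil {
		return 0, err
	}
	// Stage 2: read end of the pipe -> destination socket, repeated
	// until everything buffered in stage 1 has been moved onward.
	moved := 0
	for remain := int(in); remain > 0; {
		out, err := syscall.Splice(p[0], nil, dstFd, nil, remain, 0)
		if err != nil {
			return moved, err
		}
		if out == 0 {
			break // not expected while this function holds the write end
		}
		moved += int(out)
		remain -= int(out)
	}
	return moved, nil
}

// roundTrip writes msg into one socket pair, splices it into a second
// pair, and returns what arrives at the far end.
func roundTrip(msg string) (string, error) {
	up, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return "", err
	}
	defer syscall.Close(up[0])
	defer syscall.Close(up[1])
	down, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return "", err
	}
	defer syscall.Close(down[0])
	defer syscall.Close(down[1])

	if _, err := syscall.Write(up[0], []byte(msg)); err != nil {
		return "", err
	}
	n, err := spliceViaPipe(down[0], up[1], len(msg))
	if err != nil {
		return "", err
	}
	buf := make([]byte, n)
	m, err := syscall.Read(down[1], buf)
	if err != nil {
		return "", err
	}
	return string(buf[:m]), nil
}

func main() {
	got, err := roundTrip("splice test")
	fmt.Printf("received %q, err = %v\n", got, err)
}
```

The payload never enters a userspace buffer between the two sockets; only the final Read in roundTrip copies it out, and that is there purely to check the result.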
No new API is added to package net, but a new Splice function is
added to package internal/poll, because using splice requires help
from the network poller. Users of the net package should benefit
from the change transparently.
This change only enables the optimization if the Reader in ReadFrom
is a TCP connection. Since splice is a more general interface, it
could, in theory, also be enabled if the Reader were a unix socket,
or the read half of a pipe.
However, benchmarks show that enabling it for unix sockets is most
likely not a net performance gain. The tcp <- unix case is also
fairly unlikely to be used very much by users of package net.
Enabling the optimization for pipes is also problematic from an
implementation perspective, since package net cannot easily get at
the *poll.FD of an *os.File. A possible solution to this would be
to dup the pipe file descriptor, register the duped descriptor with
the network poller, and work on that *poll.FD instead of the original.
However, this seems too intrusive, so it has not been done. If there
were a clean way to do it, it would probably be worth doing, since
splicing from a pipe to a socket can be done directly.
Therefore, this patch only enables the optimization for what is likely
the most common use case: tcp <- tcp.
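The gating on the Reader's concrete type can be illustrated with a small sketch. The helper names spliceSource, describe and examples are hypothetical; the sketch only mirrors the type assertions and performs no I/O. A typed nil *net.TCPConn is used so that no connection has to be opened.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// untilEOF mirrors the "copy until EOF" default used in this CL.
const untilEOF int64 = 1 << 62

// spliceSource reports whether r qualifies for the optimized path and how
// many bytes may be moved. An *io.LimitedReader is unwrapped first.
func spliceSource(r io.Reader) (src *net.TCPConn, remain int64, ok bool) {
	remain = untilEOF
	if lr, limited := r.(*io.LimitedReader); limited {
		remain, r = lr.N, lr.R
	}
	src, ok = r.(*net.TCPConn)
	return src, remain, ok
}

// onlyReader hides the concrete type of the wrapped reader, in the same
// way as the CopyNoSplice helper in the tests.
type onlyReader struct{ io.Reader }

// describe names the code path that would be chosen for r.
func describe(r io.Reader) string {
	_, remain, ok := spliceSource(r)
	switch {
	case !ok:
		return "generic copy"
	case remain == untilEOF:
		return "splice until EOF"
	default:
		return fmt.Sprintf("splice at most %d bytes", remain)
	}
}

// examples runs describe on three representative readers.
func examples() []string {
	// A typed nil pointer is enough to exercise the type assertions;
	// it is never read from.
	var conn *net.TCPConn
	return []string{
		describe(conn),
		describe(&io.LimitedReader{R: conn, N: 128}),
		describe(onlyReader{conn}),
	}
}

func main() {
	for _, s := range examples() {
		fmt.Println(s)
	}
}
```

Wrapping the source in any other type, as onlyReader does, is enough to fall back to the generic copy.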
The following benchmark compares the performance of the previous
userspace genericReadFrom code path to the new optimized code path.
The sub-benchmarks represent chunk sizes used by the writer on the
other end of the Reader passed to ReadFrom.
benchmark                          old ns/op     new ns/op     delta
BenchmarkTCPReadFrom/1024-4        4727          4954          +4.80%
BenchmarkTCPReadFrom/2048-4        4389          4301          -2.01%
BenchmarkTCPReadFrom/4096-4        4606          4534          -1.56%
BenchmarkTCPReadFrom/8192-4        5219          4779          -8.43%
BenchmarkTCPReadFrom/16384-4       8708          8008          -8.04%
BenchmarkTCPReadFrom/32768-4       16349         14973         -8.42%
BenchmarkTCPReadFrom/65536-4       35246         27406         -22.24%
BenchmarkTCPReadFrom/131072-4      72920         52382         -28.17%
BenchmarkTCPReadFrom/262144-4      149311        95094         -36.31%
BenchmarkTCPReadFrom/524288-4      306704        181856        -40.71%
BenchmarkTCPReadFrom/1048576-4     674174        357406        -46.99%
benchmark                          old MB/s     new MB/s     speedup
BenchmarkTCPReadFrom/1024-4        216.62       206.69       0.95x
BenchmarkTCPReadFrom/2048-4        466.61       476.08       1.02x
BenchmarkTCPReadFrom/4096-4        889.09       903.31       1.02x
BenchmarkTCPReadFrom/8192-4        1569.40      1714.06      1.09x
BenchmarkTCPReadFrom/16384-4       1881.42      2045.84      1.09x
BenchmarkTCPReadFrom/32768-4       2004.18      2188.41      1.09x
BenchmarkTCPReadFrom/65536-4       1859.38      2391.25      1.29x
BenchmarkTCPReadFrom/131072-4      1797.46      2502.21      1.39x
BenchmarkTCPReadFrom/262144-4      1755.69      2756.68      1.57x
BenchmarkTCPReadFrom/524288-4      1709.42      2882.98      1.69x
BenchmarkTCPReadFrom/1048576-4     1555.35      2933.84      1.89x
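For readers who want to check how the delta and MB/s columns relate to ns/op, here is a small sketch of the arithmetic. The function names are made up for illustration, and 1 MB is taken as 1e6 bytes, which is what the testing package uses. The results agree with the tables only up to rounding, since the published ns/op values are themselves rounded.

```go
package main

import "fmt"

// deltaPercent is the relative change from oldNs to newNs, in percent.
// Negative values mean the new code path is faster.
func deltaPercent(oldNs, newNs float64) float64 {
	return (newNs - oldNs) / oldNs * 100
}

// throughputMBps converts bytes moved per operation and ns/op into MB/s.
// bytes/ns times 1e9 gives bytes/s; dividing by 1e6 leaves a factor of 1e3.
func throughputMBps(bytesPerOp int, nsPerOp float64) float64 {
	return float64(bytesPerOp) / nsPerOp * 1e3
}

func main() {
	const chunk = 1 << 20 // the 1048576-byte sub-benchmark
	oldNs, newNs := 674174.0, 357406.0

	oldMB := throughputMBps(chunk, oldNs)
	newMB := throughputMBps(chunk, newNs)
	fmt.Printf("delta:   %+.2f%%\n", deltaPercent(oldNs, newNs))
	fmt.Printf("old:     %.2f MB/s\n", oldMB)
	fmt.Printf("new:     %.2f MB/s\n", newMB)
	fmt.Printf("speedup: %.2fx\n", newMB/oldMB)
}
```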
Fixes #10948
Change-Id: I3ce27f21f7adda8b696afdc48a91149998ae16a5
---
A src/internal/poll/splice_linux.go
A src/net/splice_linux.go
A src/net/splice_stub.go
A src/net/splice_test.go
M src/net/tcpsock_posix.go
5 files changed, 643 insertions(+), 0 deletions(-)
diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go
new file mode 100644
index 0000000..289064c
--- /dev/null
+++ b/src/internal/poll/splice_linux.go
@@ -0,0 +1,195 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+import "syscall"
+
+const (
+ // spliceMove hints to the kernel that it should try to move pages
+ // instead of copying them. As of Linux version 2.6.21, this is a
+ // no-op, so it is unused.
+ spliceMove = 0x1
+
+ // spliceNonblock makes calls to splice(2) non-blocking.
+ spliceNonblock = 0x2
+
+ // spliceMore acts like TCP_CORK, when the destination file descriptor
+ // refers to a socket. This option is currently unused, since it
+ // potentially introduces latency. If users nevertheless desire this
+ // behavior, they can cork the TCP connection themselves, using
+ // syscall.RawConn.
+ //
+ // TODO(acln): verify that the "introduces latency" bit is true.
+ // The following comment seems to suggest so:
+ // https://github.com/golang/go/issues/10948#issuecomment-167559277
+ spliceMore = 0x4
+
+ // spliceGift is specific to vmsplice(2), and is unused.
+ spliceGift = 0x8
+)
+
+// maxSpliceSize is the maximum amount of data Splice asks the kernel to move
+// in a single call to splice(2).
+const maxSpliceSize = 4 << 20
+
+// Splice transfers data from src to dst, using the splice system call to
+// minimize copies of data from and to userspace. Splice creates a temporary
+// pipe, to serve as a buffer for the data transfer.
+//
+// src and dst must both be stream-oriented sockets.
+func Splice(dst, src *FD, remain int64) (int64, error) {
+ prfd, pwfd, err := newTempPipe()
+ if err != nil {
+ return 0, err
+ }
+ defer destroyTempPipe(prfd, pwfd)
+ if err := src.readLock(); err != nil {
+ return 0, err
+ }
+ defer src.readUnlock()
+ if err := dst.writeLock(); err != nil {
+ return 0, err
+ }
+ defer dst.writeUnlock()
+ if err := src.pd.prepareRead(src.isFile); err != nil {
+ return 0, err
+ }
+ if err := dst.pd.prepareWrite(dst.isFile); err != nil {
+ return 0, err
+ }
+ var (
+ written int64
+ n int
+ )
+ for err == nil && remain > 0 {
+ max := maxSpliceSize
+ if int64(max) > remain {
+ max = int(remain)
+ }
+ n, err = spliceDrain(pwfd, src, max)
+ // spliceDrain should never return EAGAIN, so if err != nil,
+ // Splice cannot continue. If n == 0 && err == nil, src is
+ // at EOF, and the transfer is complete.
+ if err != nil || n == 0 {
+ break
+ }
+ n, err = splicePump(dst, prfd, n)
+ if n > 0 {
+ written += int64(n)
+ remain -= int64(n)
+ }
+ }
+ return written, err
+}
+
+// spliceDrain moves data from a socket to a pipe.
+//
+// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
+// initial state, or splicePump has emptied it previously.
+//
+// Given this, spliceDrain can reasonably assume that the pipe is ready for
+// writing, so if splice returns EAGAIN, it must be because the socket is not
+// ready for reading.
+//
+// If spliceDrain returns (0, nil), src is at EOF.
+func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
+ if max == 0 {
+ return 0, nil
+ }
+ for {
+ n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
+ if err != syscall.EAGAIN {
+ return n, err
+ }
+ if err := sock.pd.waitRead(sock.isFile); err != nil {
+ return n, err
+ }
+ }
+}
+
+// splicePump moves exactly remain bytes from a pipe to a socket.
+//
+// Invariant: when entering splicePump, there is at least some
+// data in the pipe, from a previous call to spliceDrain.
+// By analogy to the condition from spliceDrain, splicePump
+// only needs to poll the socket for readiness, if splice returns
+// EAGAIN.
+//
+// If splicePump cannot move all the data in a single call to
+// splice(2), it loops over the buffered data until it has written
+// all of it to the socket. This behavior is similar to the Write
+// step of an io.Copy in userspace.
+func splicePump(sock *FD, pipefd int, remain int) (int, error) {
+ if remain == 0 {
+ return 0, nil
+ }
+ written := 0
+ for remain > 0 {
+ n, err := splice(sock.Sysfd, pipefd, remain, spliceNonblock)
+ // Here, the condition n == 0 && err == nil should never be
+ // observed, since Splice controls the write side of the pipe.
+ if n > 0 {
+ remain -= n
+ written += n
+ continue
+ }
+ if err != syscall.EAGAIN {
+ return written, err
+ }
+ if err := sock.pd.waitWrite(sock.isFile); err != nil {
+ return written, err
+ }
+ }
+ return written, nil
+}
+
+// splice calls splice(2), changing the order of the arguments to the
+// more natural one. Since the current implementation only uses splice on
+// sockets and pipes, the offset arguments are unused. splice returns int
+// instead of int64, because callers never ask it to move more data in a
+// single call than can fit in an int32.
+func splice(out int, in int, max int, flags int) (int, error) {
+ n, err := syscall.Splice(in, nil, out, nil, max, flags)
+ return int(n), err
+}
+
+// newTempPipe sets up a temporary pipe for a splice operation.
+func newTempPipe() (prfd, pwfd int, err error) {
+ var fds [2]int
+ const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
+ if err := syscall.Pipe2(fds[:], flags); err != nil {
+ if err == syscall.ENOSYS {
+ return newTempPipeFallback(fds[:])
+ }
+ return -1, -1, err
+ }
+ return fds[0], fds[1], nil
+}
+
+// newTempPipeFallback is a fallback for newTempPipe, for systems
+// which do not support pipe2 (linux versions 2.6.23 up to 2.6.27).
+func newTempPipeFallback(fds []int) (prfd, pwfd int, err error) {
+ syscall.ForkLock.RLock()
+ defer syscall.ForkLock.RUnlock()
+ if err := syscall.Pipe(fds); err != nil {
+ return -1, -1, err
+ }
+ prfd, pwfd = fds[0], fds[1]
+ syscall.CloseOnExec(prfd)
+ syscall.CloseOnExec(pwfd)
+ syscall.SetNonblock(prfd, true)
+ syscall.SetNonblock(pwfd, true)
+ return prfd, pwfd, nil
+}
+
+// destroyTempPipe destroys a temporary pipe.
+func destroyTempPipe(prfd, pwfd int) error {
+ err := CloseFunc(prfd)
+ err1 := CloseFunc(pwfd)
+ if err == nil {
+ return err1
+ }
+ return err
+}
diff --git a/src/net/splice_linux.go b/src/net/splice_linux.go
new file mode 100644
index 0000000..2c295a0
--- /dev/null
+++ b/src/net/splice_linux.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "internal/poll"
+ "io"
+)
+
+// splice transfers data from r to c using the splice system call to minimize
+// copies from and to userspace. c must be a TCP connection. Currently, splice
+// is only enabled if r is also a TCP connection.
+//
+// If splice returns handled == false, it has performed no work.
+func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) {
+ var remain int64 = 1 << 62 // by default, copy until EOF
+ lr, ok := r.(*io.LimitedReader)
+ if ok {
+ remain, r = lr.N, lr.R
+ if remain <= 0 {
+ return 0, nil, true
+ }
+ }
+ s, ok := r.(*TCPConn)
+ if !ok {
+ return 0, nil, false
+ }
+ written, err = poll.Splice(&c.pfd, &s.fd.pfd, remain)
+ if lr != nil {
+ lr.N -= written
+ }
+ // TODO(acln): is written > 0 the right condition?
+ return written, err, written > 0
+}
diff --git a/src/net/splice_stub.go b/src/net/splice_stub.go
new file mode 100644
index 0000000..9106cb2
--- /dev/null
+++ b/src/net/splice_stub.go
@@ -0,0 +1,13 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !linux
+
+package net
+
+import "io"
+
+func splice(c *netFD, r io.Reader) (int64, error, bool) {
+ return 0, nil, false
+}
diff --git a/src/net/splice_test.go b/src/net/splice_test.go
new file mode 100644
index 0000000..97807c9
--- /dev/null
+++ b/src/net/splice_test.go
@@ -0,0 +1,396 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "testing"
+)
+
+func TestSplice(t *testing.T) {
+ t.Run("simple", testSpliceSimple)
+ t.Run("multiple write", testSpliceMultipleWrite)
+ t.Run("big", testSpliceBig)
+ t.Run("honors LimitedReader", testSpliceHonorsLimitedReader)
+}
+
+func testSpliceSimple(t *testing.T) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ copyDone := srv.Copy()
+ msg := []byte("splice test")
+ if _, err := srv.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ got := make([]byte, len(msg))
+ if _, err := io.ReadFull(srv, got); err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, msg) {
+ t.Errorf("got %q, wrote %q", got, msg)
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceMultipleWrite(t *testing.T) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ copyDone := srv.Copy()
+ msg1 := []byte("splice test part 1 ")
+ msg2 := []byte(" splice test part 2")
+ if _, err := srv.Write(msg1); err != nil {
+ t.Fatalf("Write: %v", err)
+ }
+ if _, err := srv.Write(msg2); err != nil {
+ t.Fatal(err)
+ }
+ got := make([]byte, len(msg1)+len(msg2))
+ if _, err := io.ReadFull(srv, got); err != nil {
+ t.Fatal(err)
+ }
+ want := append(msg1, msg2...)
+ if !bytes.Equal(got, want) {
+ t.Errorf("got %q, wrote %q", got, want)
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceBig(t *testing.T) {
+ size := 1<<31 - 1
+ if testing.Short() {
+ size = 1 << 25
+ }
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ big := make([]byte, size)
+ copyDone := srv.Copy()
+ type readResult struct {
+ b []byte
+ err error
+ }
+ readDone := make(chan readResult)
+ go func() {
+ got := make([]byte, len(big))
+ _, err := io.ReadFull(srv, got)
+ readDone <- readResult{got, err}
+ }()
+ if _, err := srv.Write(big); err != nil {
+ t.Fatal(err)
+ }
+ res := <-readDone
+ if res.err != nil {
+ t.Fatal(res.err)
+ }
+ got := res.b
+ if !bytes.Equal(got, big) {
+ t.Errorf("input and output differ")
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceHonorsLimitedReader(t *testing.T) {
+ t.Run("stops after N", testSpliceStopsAfterN)
+ t.Run("updates LimitedReader N", testSpliceUpdatesN)
+}
+
+func testSpliceStopsAfterN(t *testing.T) {
+ clientUp, serverUp, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientUp.Close()
+ defer serverUp.Close()
+ clientDown, serverDown, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientDown.Close()
+ defer serverDown.Close()
+ count := 128
+ copyDone := make(chan error)
+ lr := &io.LimitedReader{
+ N: int64(count),
+ R: serverUp,
+ }
+ go func() {
+ _, err := io.Copy(serverDown, lr)
+ serverDown.Close()
+ copyDone <- err
+ }()
+ msg := make([]byte, 2*count)
+ if _, err := clientUp.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ clientUp.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, clientDown); err != nil {
+ t.Fatal(err)
+ }
+ if buf.Len() != count {
+ t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count)
+ }
+ clientDown.Close()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceUpdatesN(t *testing.T) {
+ clientUp, serverUp, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientUp.Close()
+ defer serverUp.Close()
+ clientDown, serverDown, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientDown.Close()
+ defer serverDown.Close()
+ count := 128
+ copyDone := make(chan error)
+ lr := &io.LimitedReader{
+ N: int64(100 + count),
+ R: serverUp,
+ }
+ go func() {
+ _, err := io.Copy(serverDown, lr)
+ copyDone <- err
+ }()
+ msg := make([]byte, count)
+ if _, err := clientUp.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ clientUp.Close()
+ got := make([]byte, count)
+ if _, err := io.ReadFull(clientDown, got); err != nil {
+ t.Fatal(err)
+ }
+ clientDown.Close()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+ wantN := int64(100)
+ if lr.N != wantN {
+ t.Errorf("lr.N = %d, want %d", lr.N, wantN)
+ }
+}
+
+func BenchmarkTCPReadFrom(b *testing.B) {
+ testHookUninstaller.Do(uninstallTestHooks)
+
+ var chunkSizes []int
+ for i := uint(10); i <= 20; i++ {
+ chunkSizes = append(chunkSizes, 1<<i)
+ }
+ // To benchmark the genericReadFrom code path, set this to false.
+ useSplice := true
+ for _, chunkSize := range chunkSizes {
+ b.Run(fmt.Sprint(chunkSize), func(b *testing.B) {
+ benchmarkSplice(b, chunkSize, useSplice)
+ })
+ }
+}
+
+func benchmarkSplice(b *testing.B, chunkSize int, useSplice bool) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer srv.Close()
+ var copyDone <-chan error
+ if useSplice {
+ copyDone = srv.Copy()
+ } else {
+ copyDone = srv.CopyNoSplice()
+ }
+ chunk := make([]byte, chunkSize)
+ discardDone := make(chan struct{})
+ go func() {
+ for {
+ buf := make([]byte, chunkSize)
+ _, err := srv.Read(buf)
+ if err != nil {
+ break
+ }
+ }
+ discardDone <- struct{}{}
+ }()
+ b.SetBytes(int64(chunkSize))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ srv.Write(chunk)
+ }
+ srv.CloseWrite()
+ <-copyDone
+ srv.CloseRead()
+ <-discardDone
+}
+
+type spliceTestServer struct {
+ clientUp io.WriteCloser
+ clientDown io.ReadCloser
+ serverUp io.ReadCloser
+ serverDown io.WriteCloser
+}
+
+func newSpliceTestServer() (*spliceTestServer, error) {
+ // For now, both networks are hard-coded to TCP.
+ // If splice is enabled for non-tcp upstream connections,
+ // newSpliceTestServer will need to take a network parameter.
+ clientUp, serverUp, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ return nil, err
+ }
+ clientDown, serverDown, err := spliceTestSocketPair("tcp", ":0")
+ if err != nil {
+ clientUp.Close()
+ serverUp.Close()
+ return nil, err
+ }
+ return &spliceTestServer{clientUp, clientDown, serverUp, serverDown}, nil
+}
+
+// Read reads from the downstream connection.
+func (srv *spliceTestServer) Read(b []byte) (int, error) {
+ return srv.clientDown.Read(b)
+}
+
+// Write writes to the upstream connection.
+func (srv *spliceTestServer) Write(b []byte) (int, error) {
+ return srv.clientUp.Write(b)
+}
+
+// Close closes the server.
+func (srv *spliceTestServer) Close() error {
+ err := srv.closeUp()
+ err1 := srv.closeDown()
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+// CloseWrite closes the client side of the upstream connection.
+func (srv *spliceTestServer) CloseWrite() error {
+ return srv.clientUp.Close()
+}
+
+// CloseRead closes the client side of the downstream connection.
+func (srv *spliceTestServer) CloseRead() error {
+ return srv.clientDown.Close()
+}
+
+// Copy copies from the server side of the upstream connection
+// to the server side of the downstream connection, in a separate
+// goroutine. Copy is done when the first send on the returned
+// channel succeeds.
+func (srv *spliceTestServer) Copy() <-chan error {
+ ch := make(chan error)
+ go func() {
+ _, err := io.Copy(srv.serverDown, srv.serverUp)
+ ch <- err
+ close(ch)
+ }()
+ return ch
+}
+
+// CopyNoSplice is like Copy, but ensures that the splice code path
+// is not reached.
+func (srv *spliceTestServer) CopyNoSplice() <-chan error {
+ type onlyReader struct {
+ io.Reader
+ }
+ ch := make(chan error)
+ go func() {
+ _, err := io.Copy(srv.serverDown, onlyReader{srv.serverUp})
+ ch <- err
+ close(ch)
+ }()
+ return ch
+}
+
+func (srv *spliceTestServer) closeUp() error {
+ var err, err1 error
+ if srv.serverUp != nil {
+ err = srv.serverUp.Close()
+ }
+ if srv.clientUp != nil {
+ err1 = srv.clientUp.Close()
+ }
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+func (srv *spliceTestServer) closeDown() error {
+ var err, err1 error
+ if srv.serverDown != nil {
+ err = srv.serverDown.Close()
+ }
+ if srv.clientDown != nil {
+ err1 = srv.clientDown.Close()
+ }
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+// TODO(acln): is this necessary? Do the tests in package net have something
+// similar that could be used instead?
+func spliceTestSocketPair(net string, addr string) (client, server Conn, err error) {
+ ln, err := Listen(net, addr)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer ln.Close()
+ var cerr, serr error
+ acceptDone := make(chan struct{})
+ go func() {
+ server, serr = ln.Accept()
+ acceptDone <- struct{}{}
+ }()
+ client, cerr = Dial(ln.Addr().Network(), ln.Addr().String())
+ <-acceptDone
+ if cerr != nil {
+ if server != nil {
+ server.Close()
+ }
+ return nil, nil, cerr
+ }
+ if serr != nil {
+ if client != nil {
+ client.Close()
+ }
+ return nil, nil, serr
+ }
+ return client, server, nil
+}
diff --git a/src/net/tcpsock_posix.go b/src/net/tcpsock_posix.go
index 58c7e49..f6fd931 100644
--- a/src/net/tcpsock_posix.go
+++ b/src/net/tcpsock_posix.go
@@ -45,6 +45,9 @@
}
func (c *TCPConn) readFrom(r io.Reader) (int64, error) {
+ if n, err, handled := splice(c.fd, r); handled {
+ return n, err
+ }
if n, err, handled := sendFile(c.fd, r); handled {
return n, err
}
Looks nice. Thanks.
7 comments:
File src/internal/poll/splice_linux.go:
Patch Set #1, Line 13: spliceMove = 0x1
Let's not bother to define the flags we aren't going to use. They aren't exported and aren't used, so the only possible result is that the documentation will go out of date over time.
Patch Set #1, Line 45: return 0, err
I think this function needs to return both the error and a string for the system call that failed, as in `func accept` in this package. Otherwise we may report an error returned by the pipe system call as an error from the splice system call.
Patch Set #1, Line 92: // Given this, spliceDrain can reasonably assume that the pipe is ready for
Normal pipes have a buffer size. When using splice, is it possible for the pipe buffer to fill up such that the kernel returns EAGAIN?
Patch Set #1, Line 114: // Invariant: when entering splicePump, there is at least some
Since you are checking `remain`, I think the invariant is more like "there are at least remain bytes in the pipe."
Patch Set #1, Line 148: changing
Minor, but I'm not sure which order is the natural one. I think you can drop this whole clause.
Patch Set #1, Line 163: if err == syscall.ENOSYS {
Add a comment here referring to kernel version numbers, like the one in os/pipe_linux.go.
File src/net/splice_linux.go:
Patch Set #1, Line 34: // TODO(acln): is written > 0 the right condition?
Perhaps poll.Splice should return some indication of whether it did the right thing, so that we don't have any confusion between an empty input and an unhandled input.
RELNOTE=yes
Thank you for the review. Before I proceed with the other points, please advise on the comment below.
1 comment:
File src/internal/poll/splice_linux.go:
Patch Set #1, Line 45: return 0, err
I think this function needs to return both the error and a string for the system call that failed, a […]
I overlooked this detail. Now that I think of it, perhaps a better way would be to create the pipe in package net and pass it in to poll.Splice, instead of creating it here? This way, net.splice could return the right error straight away if creating the pipe fails, and poll.Splice could deal with splicing only.
1 comment:
Patch Set #1, Line 45: return 0, err
I overlooked this detail. […]
Apart from the conjunction of system calls, it's fine to add a package-specific error type, such as net.OpError, for conveying a system call name and the error value.
1 comment:
Patch Set #1, Line 45: return 0, err
Apart from the conjunction of system calls, it's fine to add a package-specific error type, such as net […]
I would slightly prefer to create the pipe here. Using a pipe seems like a Linux-specific detail to the general concept.
1 comment:
Patch Set #1, Line 45: return 0, err
I would slightly prefer to create the pipe here. […]
Note that we have os.NewSyscallError for a system call plus error, and the net package already uses that where appropriate.
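A minimal sketch of the pattern under discussion: the lower layer reports which system call failed, and the caller wraps the pair with os.NewSyscallError. The function transfer is a hypothetical stand-in for a multi-step operation like poll.Splice, and errDemo is a placeholder error; neither is taken from this CL.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errDemo is a placeholder for an errno returned by the kernel.
var errDemo = errors.New("demo failure")

// transfer is a hypothetical multi-step operation. It returns the name of
// the system call that failed alongside the error, so that a pipe2 failure
// is not reported as a splice failure.
func transfer(failAt string) (sc string, err error) {
	for _, step := range []string{"pipe2", "splice"} {
		if step == failAt {
			return step, errDemo
		}
	}
	return "", nil
}

// report runs transfer and renders the outcome as a string.
func report(failAt string) string {
	sc, err := transfer(failAt)
	// os.NewSyscallError returns nil when err is nil, so the success
	// path needs no special case before wrapping.
	if wrapped := os.NewSyscallError(sc, err); wrapped != nil {
		return wrapped.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(report("pipe2"))
	fmt.Println(report("splice"))
	fmt.Println(report(""))
}
```

The wrapped error prints as the system call name followed by the underlying message, which is what lets a reader of the error tell the two failure sites apart.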
1 comment:
Patch Set #1, Line 45: return 0, err
Note that we have os. […]
I completely forgot about os.NewSyscallError. Yes, please use it if necessary.
Andrei Tudor Călin uploaded patch set #2 to this change.
5 files changed, 662 insertions(+), 0 deletions(-)
I've uploaded a new patch set. Please take a look.
7 comments:
File src/internal/poll/splice_linux.go:
Patch Set #1, Line 13: // maxSpliceSize is the maximum amount of data Splice asks
Let's not bother to define the flags we aren't going to use. […]
Done
Patch Set #1, Line 45: if err := dst.pd.prepareWrite(dst.isFile); err != nil {
I completely forgot about os.NewSyscallError. Yes, please use it if necessary.
Done
Normal pipes have a buffer size. […]
I don't think that's possible, due to the fact that the pipe is always empty when entering spliceDrain. If the socket is ready, I believe the kernel will move as much data as it can (or wants to) in the one call to splice, which will return a nil error. spliceDrain is not called again until the pipe is empty, so it should not observe EAGAIN on the pipe. Please correct me if I'm wrong.
Patch Set #1, Line 114: if n > 0 {
Since you are checking `remain`, I think the invariant is more like "there are at least remain bytes in […]
Done
Minor, but I'm not sure which order is the natural one. I think you can drop this whole clause.
Done
Patch Set #1, Line 163: syscall.CloseOnExec(pwfd)
Add a comment here referring to kernel version numbers, like the one in os/pipe_linux.go.
Done
Patch Set #1, Line 34: return written, wrapSyscallError(sc, err), handled
Perhaps poll. […]
Done
1 comment:
File src/internal/poll/splice_linux.go:
I don't think that's possible, due to the fact that the pipe is always empty when entering spliceDra […]
When I look at the kernel sources, it seems to me that every pipe has a maximum number of buffers. This can be controlled by fcntl(pfd, F_SETPIPE_SZ, val), as documented in `man 2 fcntl`. When splicing, the kernel will only add that many buffers to the pipe. If the socket holds more data than that, and SPLICE_F_NONBLOCK was passed, the splice call will return EAGAIN. So as far as I can tell, splice can return EAGAIN either because the socket is empty, or because the pipe is full. And this code doesn't handle the case in which the pipe is full. I think that when you see EAGAIN, if you've written data to the pipe, you need to call splicePump to move the data onward. You can only safely wait for the socket if splice returns EAGAIN when there is no data in the pipe.
1 comment:
When I look at the kernel sources, it seems to me that every pipe has a maximum number of buffers. […]
Please bear with me. I don't understand why the code needs to worry about EAGAIN on the pipe.
When entering spliceDrain, the pipe is completely empty. If spliceDrain sees anything but EAGAIN, it returns immediately. This includes the case where it is successful. In a single invocation of spliceDrain, exactly one successful splice from the socket to the pipe is attempted, and at most one successful such splice can occur. If spliceDrain has written anything to the pipe, it returns, and no other writes to the pipe are attempted until the pipe is empty again.
Given that the pipe is empty, even if there are more buffers in the socket than can fit in the pipe, I believe the kernel should move as much as it can to the pipe without returning EAGAIN, otherwise no progress can ever be made.
As far as I can tell, spliceDrain can never attempt to splice to the pipe if there is any data in it, because as soon as it writes to the pipe, it returns, leaving splicePump to move the data onward, as you suggested it should do.
As for safely waiting for the socket if splice returns EAGAIN, I believe the current code covers that as well, since the pipe is empty when entering spliceDrain, and spliceDrain only wants to do a single successful splice before transferring control to splicePump.
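The argument above can be made concrete with a toy model. The sketch below is not kernel code and does not call splice(2); it encodes the assumption made in this thread, namely that a non-blocking splice into a pipe moves as much as fits and fails with EAGAIN only when nothing can be moved. All names (model, drain, pump, transfer, drainTwice) are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

var errAgain = errors.New("EAGAIN (simulated)")

// model is a toy stand-in for the kernel-side state.
type model struct {
	sock    int // bytes still queued in the source socket
	pipe    int // bytes currently buffered in the pipe
	pipeCap int // pipe capacity in bytes
	dst     int // bytes delivered to the destination socket
}

// drain models one non-blocking splice from the socket into the pipe.
func (m *model) drain(limit int) (int, error) {
	if m.sock == 0 {
		return 0, nil // treated as EOF in this model
	}
	n := limit
	if m.sock < n {
		n = m.sock
	}
	if free := m.pipeCap - m.pipe; free < n {
		n = free
	}
	if n == 0 {
		return 0, errAgain // the pipe is full
	}
	m.sock -= n
	m.pipe += n
	return n, nil
}

// pump models moving n buffered bytes from the pipe to the destination.
func (m *model) pump(n int) {
	m.pipe -= n
	m.dst += n
}

// transfer alternates drain and pump, so the pipe is empty on every entry
// to drain. It returns the bytes delivered and the number of rounds.
func transfer(total, pipeCap int) (int, int, error) {
	m := &model{sock: total, pipeCap: pipeCap}
	rounds := 0
	for {
		n, err := m.drain(total)
		if err != nil || n == 0 {
			return m.dst, rounds, err
		}
		m.pump(n)
		rounds++
	}
}

// drainTwice skips the pump step, which breaks the invariant: the second
// drain finds a full pipe.
func drainTwice(total, pipeCap int) error {
	m := &model{sock: total, pipeCap: pipeCap}
	if _, err := m.drain(total); err != nil {
		return err
	}
	_, err := m.drain(total)
	return err
}

func main() {
	moved, rounds, err := transfer(1000, 64)
	fmt.Println("with pump between drains:", moved, rounds, err)
	fmt.Println("two drains in a row:", drainTwice(1000, 64))
}
```

In this model, the pipe-full EAGAIN only appears when drain is called twice without an intervening pump, which is the situation the drain/pump alternation in Splice rules out.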
Patch set 2:Run-TryBot +1
1 comment:
Please bear with me. I don't understand why the code needs to worry about EAGAIN on the pipe. […]
OK, now I get it. Thanks. Sorry for the confusion.
Build is still in progress...
This change failed on nacl-amd64p32:
See https://storage.googleapis.com/go-build-log/d75b943f/nacl-amd64p32_8b1d97a6.log
Consult https://build.golang.org/ to see whether it's a new failure. Other builds still in progress; subsequent failure notices suppressed until final report.
6 of 17 TryBots failed:
Failed on nacl-amd64p32: https://storage.googleapis.com/go-build-log/d75b943f/nacl-amd64p32_8b1d97a6.log
Failed on nacl-386: https://storage.googleapis.com/go-build-log/d75b943f/nacl-386_5bf8e022.log
Failed on freebsd-amd64-11_1: https://storage.googleapis.com/go-build-log/d75b943f/freebsd-amd64-11_1_b8d22341.log
Failed on windows-386-2008: https://storage.googleapis.com/go-build-log/d75b943f/windows-386-2008_2d279cf0.log
Failed on openbsd-amd64-62: https://storage.googleapis.com/go-build-log/d75b943f/openbsd-amd64-62_a5a5fc03.log
Failed on windows-amd64-2016: https://storage.googleapis.com/go-build-log/d75b943f/windows-amd64-2016_e4ea92fb.log
Consult https://build.golang.org/ to see whether they are new failures.
Patch set 2:TryBot-Result -1
Sorry. The failing test expects to see splice return handled == true if the source reader is at EOF, but the splice stub returns handled == false without touching the reader.
Andrei Tudor Călin uploaded patch set #3 to this change.
5 files changed, 663 insertions(+), 0 deletions(-)
Fixed the tests to only run on Linux. Also removed an ineffective assignment in poll.Splice.
Is the test code fine in its current form?
Could someone with access to a fast machine with a beefy network card please try the patch on a more real workload? I am dissatisfied with the fact that the benchmarks are pretty artificial in their current form.
1 comment:
OK, now I get it. Thanks. Sorry for the confusion.
Done
3 comments:
Patch Set #3, Line 21: src at EOF
there is no convention for subtest names, but perhaps it's better not to include spaces; I'm not sure whether it works with escaping or quoting, like go test -run src\ at\ EOF or -run "src at EOF"
do you want to invoke built-in firewall stuff of consumer oses even in short-mode test?
Patch Set #3, Line 402: Listen(net, addr)
use newLocalListener instead
Andrei Tudor Călin uploaded patch set #4 to this change.
5 files changed, 661 insertions(+), 0 deletions(-)
I've sent a new patch set. Please take a look. Thank you.
3 comments:
Patch Set #3, Line 21: readerAtEO
there is no convention for subtest names but perhaps it's better not to include spaces, though i'm n […]
Done
do you want to invoke built-in firewall stuff of consumer oses even in short-mode test?
newLocalListener is used everywhere now. Does this fix this issue?
Patch Set #3, Line 402: , nil, err
use newLocalListener instead
Done
Patch set 4:Run-TryBot +1
TryBots are happy.
Patch set 4:TryBot-Result +1
1 comment:
Does this fix this issue?
yup
1 comment:
> Does this fix this issue? […]
Done
Thanks!
Patch set 4:Code-Review +2
Ian Lance Taylor merged this change.
Reviewed-on: https://go-review.googlesource.com/107715
Run-TryBot: Brad Fitzpatrick <brad...@golang.org>
Run-TryBot: Ian Lance Taylor <ia...@golang.org>
TryBot-Result: Gobot Gobot <go...@golang.org>
Reviewed-by: Ian Lance Taylor <ia...@golang.org>
---
A src/internal/poll/splice_linux.go
A src/net/splice_linux.go
A src/net/splice_stub.go
A src/net/splice_test.go
M src/net/tcpsock_posix.go
5 files changed, 661 insertions(+), 0 deletions(-)
diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go
new file mode 100644
index 0000000..7ebd548
--- /dev/null
+++ b/src/internal/poll/splice_linux.go
@@ -0,0 +1,184 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+import "syscall"
+
+const (
+ // spliceNonblock makes calls to splice(2) non-blocking.
+ spliceNonblock = 0x2
+
+ // maxSpliceSize is the maximum amount of data Splice asks
+ // the kernel to move in a single call to splice(2).
+ maxSpliceSize = 4 << 20
+)
+
+// Splice transfers at most remain bytes of data from src to dst, using the
+// splice system call to minimize copies of data from and to userspace.
+//
+// Splice creates a temporary pipe, to serve as a buffer for the data transfer.
+// src and dst must both be stream-oriented sockets.
+//
+// If err != nil, sc is the system call which caused the error.
+func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
+ prfd, pwfd, sc, err := newTempPipe()
+ if err != nil {
+ return 0, false, sc, err
+ }
+ defer destroyTempPipe(prfd, pwfd)
+ // From here on, the operation should be considered handled,
+ // even if Splice doesn't transfer any data.
+ if err := src.readLock(); err != nil {
+ return 0, true, "splice", err
+ }
+ defer src.readUnlock()
+ if err := dst.writeLock(); err != nil {
+ return 0, true, "splice", err
+ }
+ defer dst.writeUnlock()
+ if err := src.pd.prepareRead(src.isFile); err != nil {
+ return 0, true, "splice", err
+ }
+ if err := dst.pd.prepareWrite(dst.isFile); err != nil {
+ return 0, true, "splice", err
+ }
+ var inPipe, n int
+ for err == nil && remain > 0 {
+ max := maxSpliceSize
+ if int64(max) > remain {
+ max = int(remain)
+ }
+ inPipe, err = spliceDrain(pwfd, src, max)
+ // spliceDrain should never return EAGAIN, so if err != nil,
+ // Splice cannot continue. If inPipe == 0 && err == nil,
+ // src is at EOF, and the transfer is complete.
+ if err != nil || (inPipe == 0 && err == nil) {
+ break
+ }
+ n, err = splicePump(dst, prfd, inPipe)
+ if n > 0 {
+ written += int64(n)
+ remain -= int64(n)
+ }
+ }
+ if err != nil {
+ return written, true, "splice", err
+ }
+ return written, true, "", nil
+}
+
+// spliceDrain moves data from a socket to a pipe.
+//
+// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
+// initial state, or splicePump has emptied it previously.
+//
+// Given this, spliceDrain can reasonably assume that the pipe is ready for
+// writing, so if splice returns EAGAIN, it must be because the socket is not
+// ready for reading.
+//
+// If spliceDrain returns (0, nil), src is at EOF.
+func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
+ for {
+ n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
+ if err != syscall.EAGAIN {
+ return n, err
+ }
+ if err := sock.pd.waitRead(sock.isFile); err != nil {
+ return n, err
+ }
+ }
+}
+
+// splicePump moves all the buffered data from a pipe to a socket.
+//
+// Invariant: when entering splicePump, there are exactly inPipe
+// bytes of data in the pipe, from a previous call to spliceDrain.
+//
+// By analogy to the condition from spliceDrain, splicePump
+// only needs to poll the socket for readiness, if splice returns
+// EAGAIN.
+//
+// If splicePump cannot move all the data in a single call to
+// splice(2), it loops over the buffered data until it has written
+// all of it to the socket. This behavior is similar to the Write
+// step of an io.Copy in userspace.
+func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
+ written := 0
+ for inPipe > 0 {
+ n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
+ // Here, the condition n == 0 && err == nil should never be
+ // observed, since Splice controls the write side of the pipe.
+ if n > 0 {
+ inPipe -= n
+ written += n
+ continue
+ }
+ if err != syscall.EAGAIN {
+ return written, err
+ }
+ if err := sock.pd.waitWrite(sock.isFile); err != nil {
+ return written, err
+ }
+ }
+ return written, nil
+}
+
+// splice wraps the splice system call. Since the current implementation
+// only uses splice on sockets and pipes, the offset arguments are unused.
+// splice returns int instead of int64, because callers never ask it to
+// move more data in a single call than can fit in an int32.
+func splice(out int, in int, max int, flags int) (int, error) {
+ n, err := syscall.Splice(in, nil, out, nil, max, flags)
+ return int(n), err
+}
+
+// newTempPipe sets up a temporary pipe for a splice operation.
+func newTempPipe() (prfd, pwfd int, sc string, err error) {
+ var fds [2]int
+ const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
+ if err := syscall.Pipe2(fds[:], flags); err != nil {
+ // pipe2 was added in 2.6.27 and our minimum requirement
+ // is 2.6.23, so it might not be implemented.
+ if err == syscall.ENOSYS {
+ return newTempPipeFallback(fds[:])
+ }
+ return -1, -1, "pipe2", err
+ }
+ return fds[0], fds[1], "", nil
+}
+
+// newTempPipeFallback is a fallback for newTempPipe, for systems
+// which do not support pipe2.
+func newTempPipeFallback(fds []int) (prfd, pwfd int, sc string, err error) {
+ syscall.ForkLock.RLock()
+ defer syscall.ForkLock.RUnlock()
+ if err := syscall.Pipe(fds); err != nil {
+ return -1, -1, "pipe", err
+ }
+ prfd, pwfd = fds[0], fds[1]
+ syscall.CloseOnExec(prfd)
+ syscall.CloseOnExec(pwfd)
+ if err := syscall.SetNonblock(prfd, true); err != nil {
+ CloseFunc(prfd)
+ CloseFunc(pwfd)
+ return -1, -1, "setnonblock", err
+ }
+ if err := syscall.SetNonblock(pwfd, true); err != nil {
+ CloseFunc(prfd)
+ CloseFunc(pwfd)
+ return -1, -1, "setnonblock", err
+ }
+ return prfd, pwfd, "", nil
+}
+
+// destroyTempPipe destroys a temporary pipe.
+func destroyTempPipe(prfd, pwfd int) error {
+ err := CloseFunc(prfd)
+ err1 := CloseFunc(pwfd)
+ if err == nil {
+ return err1
+ }
+ return err
+}
diff --git a/src/net/splice_linux.go b/src/net/splice_linux.go
new file mode 100644
index 0000000..b055f93
--- /dev/null
+++ b/src/net/splice_linux.go
@@ -0,0 +1,35 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "internal/poll"
+ "io"
+)
+
+// splice transfers data from r to c using the splice system call to minimize
+// copies from and to userspace. c must be a TCP connection. Currently, splice
+// is only enabled if r is also a TCP connection.
+//
+// If splice returns handled == false, it has performed no work.
+func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) {
+ var remain int64 = 1 << 62 // by default, copy until EOF
+ lr, ok := r.(*io.LimitedReader)
+ if ok {
+ remain, r = lr.N, lr.R
+ if remain <= 0 {
+ return 0, nil, true
+ }
+ }
+ s, ok := r.(*TCPConn)
+ if !ok {
+ return 0, nil, false
+ }
+ written, handled, sc, err := poll.Splice(&c.pfd, &s.fd.pfd, remain)
+ if lr != nil {
+ lr.N -= written
+ }
+ return written, wrapSyscallError(sc, err), handled
+}
diff --git a/src/net/splice_stub.go b/src/net/splice_stub.go
new file mode 100644
index 0000000..9106cb2
--- /dev/null
+++ b/src/net/splice_stub.go
@@ -0,0 +1,13 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !linux
+
+package net
+
+import "io"
+
+func splice(c *netFD, r io.Reader) (int64, error, bool) {
+ return 0, nil, false
+}
diff --git a/src/net/splice_test.go b/src/net/splice_test.go
new file mode 100644
index 0000000..483a9e5
--- /dev/null
+++ b/src/net/splice_test.go
@@ -0,0 +1,426 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build linux
+
+package net
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "testing"
+)
+
+func TestSplice(t *testing.T) {
+ t.Run("simple", testSpliceSimple)
+ t.Run("multipleWrite", testSpliceMultipleWrite)
+ t.Run("big", testSpliceBig)
+ t.Run("honorsLimitedReader", testSpliceHonorsLimitedReader)
+ t.Run("readerAtEOF", testSpliceReaderAtEOF)
+}
+
+func testSpliceSimple(t *testing.T) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ copyDone := srv.Copy()
+ msg := []byte("splice test")
+ if _, err := srv.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ got := make([]byte, len(msg))
+ if _, err := io.ReadFull(srv, got); err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, msg) {
+ t.Errorf("got %q, wrote %q", got, msg)
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceMultipleWrite(t *testing.T) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ copyDone := srv.Copy()
+ msg1 := []byte("splice test part 1 ")
+ msg2 := []byte(" splice test part 2")
+ if _, err := srv.Write(msg1); err != nil {
+ t.Fatalf("Write: %v", err)
+ }
+ if _, err := srv.Write(msg2); err != nil {
+ t.Fatal(err)
+ }
+ got := make([]byte, len(msg1)+len(msg2))
+ if _, err := io.ReadFull(srv, got); err != nil {
+ t.Fatal(err)
+ }
+ want := append(msg1, msg2...)
+ if !bytes.Equal(got, want) {
+ t.Errorf("got %q, wrote %q", got, want)
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceBig(t *testing.T) {
+ size := 1<<31 - 1
+ if testing.Short() {
+ size = 1 << 25
+ }
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Close()
+ big := make([]byte, size)
+ copyDone := srv.Copy()
+ type readResult struct {
+ b []byte
+ err error
+ }
+ readDone := make(chan readResult)
+ go func() {
+ got := make([]byte, len(big))
+ _, err := io.ReadFull(srv, got)
+ readDone <- readResult{got, err}
+ }()
+ if _, err := srv.Write(big); err != nil {
+ t.Fatal(err)
+ }
+ res := <-readDone
+ if res.err != nil {
+ t.Fatal(res.err)
+ }
+ got := res.b
+ if !bytes.Equal(got, big) {
+ t.Errorf("input and output differ")
+ }
+ srv.CloseWrite()
+ srv.CloseRead()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceHonorsLimitedReader(t *testing.T) {
+ t.Run("stopsAfterN", testSpliceStopsAfterN)
+ t.Run("updatesN", testSpliceUpdatesN)
+}
+
+func testSpliceStopsAfterN(t *testing.T) {
+ clientUp, serverUp, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientUp.Close()
+ defer serverUp.Close()
+ clientDown, serverDown, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientDown.Close()
+ defer serverDown.Close()
+ count := 128
+ copyDone := make(chan error)
+ lr := &io.LimitedReader{
+ N: int64(count),
+ R: serverUp,
+ }
+ go func() {
+ _, err := io.Copy(serverDown, lr)
+ serverDown.Close()
+ copyDone <- err
+ }()
+ msg := make([]byte, 2*count)
+ if _, err := clientUp.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ clientUp.Close()
+ var buf bytes.Buffer
+ if _, err := io.Copy(&buf, clientDown); err != nil {
+ t.Fatal(err)
+ }
+ if buf.Len() != count {
+ t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count)
+ }
+ clientDown.Close()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+}
+
+func testSpliceUpdatesN(t *testing.T) {
+ clientUp, serverUp, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientUp.Close()
+ defer serverUp.Close()
+ clientDown, serverDown, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientDown.Close()
+ defer serverDown.Close()
+ count := 128
+ copyDone := make(chan error)
+ lr := &io.LimitedReader{
+ N: int64(100 + count),
+ R: serverUp,
+ }
+ go func() {
+ _, err := io.Copy(serverDown, lr)
+ copyDone <- err
+ }()
+ msg := make([]byte, count)
+ if _, err := clientUp.Write(msg); err != nil {
+ t.Fatal(err)
+ }
+ clientUp.Close()
+ got := make([]byte, count)
+ if _, err := io.ReadFull(clientDown, got); err != nil {
+ t.Fatal(err)
+ }
+ clientDown.Close()
+ if err := <-copyDone; err != nil {
+ t.Errorf("splice: %v", err)
+ }
+ wantN := int64(100)
+ if lr.N != wantN {
+ t.Errorf("lr.N = %d, want %d", lr.N, wantN)
+ }
+}
+
+func testSpliceReaderAtEOF(t *testing.T) {
+ clientUp, serverUp, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientUp.Close()
+ defer serverUp.Close()
+ clientDown, serverDown, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clientDown.Close()
+ defer serverDown.Close()
+
+ serverUp.Close()
+ _, err, handled := splice(serverDown.(*TCPConn).fd, serverUp)
+ if !handled {
+ t.Errorf("closed connection: got err = %v, handled = %t, want handled = true", err, handled)
+ }
+ lr := &io.LimitedReader{
+ N: 0,
+ R: serverUp,
+ }
+ _, err, handled = splice(serverDown.(*TCPConn).fd, lr)
+ if !handled {
+ t.Errorf("exhausted LimitedReader: got err = %v, handled = %t, want handled = true", err, handled)
+ }
+}
+
+func BenchmarkTCPReadFrom(b *testing.B) {
+ testHookUninstaller.Do(uninstallTestHooks)
+
+ var chunkSizes []int
+ for i := uint(10); i <= 20; i++ {
+ chunkSizes = append(chunkSizes, 1<<i)
+ }
+ // To benchmark the genericReadFrom code path, set this to false.
+ useSplice := true
+ for _, chunkSize := range chunkSizes {
+ b.Run(fmt.Sprint(chunkSize), func(b *testing.B) {
+ benchmarkSplice(b, chunkSize, useSplice)
+ })
+ }
+}
+
+func benchmarkSplice(b *testing.B, chunkSize int, useSplice bool) {
+ srv, err := newSpliceTestServer()
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer srv.Close()
+ var copyDone <-chan error
+ if useSplice {
+ copyDone = srv.Copy()
+ } else {
+ copyDone = srv.CopyNoSplice()
+ }
+ chunk := make([]byte, chunkSize)
+ discardDone := make(chan struct{})
+ go func() {
+ for {
+ buf := make([]byte, chunkSize)
+ _, err := srv.Read(buf)
+ if err != nil {
+ break
+ }
+ }
+ discardDone <- struct{}{}
+ }()
+ b.SetBytes(int64(chunkSize))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ srv.Write(chunk)
+ }
+ srv.CloseWrite()
+ <-copyDone
+ srv.CloseRead()
+ <-discardDone
+}
+
+type spliceTestServer struct {
+ clientUp io.WriteCloser
+ clientDown io.ReadCloser
+ serverUp io.ReadCloser
+ serverDown io.WriteCloser
+}
+
+func newSpliceTestServer() (*spliceTestServer, error) {
+ // For now, both networks are hard-coded to TCP.
+ // If splice is enabled for non-tcp upstream connections,
+ // newSpliceTestServer will need to take a network parameter.
+ clientUp, serverUp, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ return nil, err
+ }
+ clientDown, serverDown, err := spliceTestSocketPair("tcp")
+ if err != nil {
+ clientUp.Close()
+ serverUp.Close()
+ return nil, err
+ }
+ return &spliceTestServer{clientUp, clientDown, serverUp, serverDown}, nil
+}
+
+// Read reads from the downstream connection.
+func (srv *spliceTestServer) Read(b []byte) (int, error) {
+ return srv.clientDown.Read(b)
+}
+
+// Write writes to the upstream connection.
+func (srv *spliceTestServer) Write(b []byte) (int, error) {
+ return srv.clientUp.Write(b)
+}
+
+// Close closes the server.
+func (srv *spliceTestServer) Close() error {
+ err := srv.closeUp()
+ err1 := srv.closeDown()
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+// CloseWrite closes the client side of the upstream connection.
+func (srv *spliceTestServer) CloseWrite() error {
+ return srv.clientUp.Close()
+}
+
+// CloseRead closes the client side of the downstream connection.
+func (srv *spliceTestServer) CloseRead() error {
+ return srv.clientDown.Close()
+}
+
+// Copy copies from the server side of the upstream connection
+// to the server side of the downstream connection, in a separate
+// goroutine. Copy is done when the first send on the returned
+// channel succeeds.
+func (srv *spliceTestServer) Copy() <-chan error {
+ ch := make(chan error)
+ go func() {
+ _, err := io.Copy(srv.serverDown, srv.serverUp)
+ ch <- err
+ close(ch)
+ }()
+ return ch
+}
+
+// CopyNoSplice is like Copy, but ensures that the splice code path
+// is not reached.
+func (srv *spliceTestServer) CopyNoSplice() <-chan error {
+ type onlyReader struct {
+ io.Reader
+ }
+ ch := make(chan error)
+ go func() {
+ _, err := io.Copy(srv.serverDown, onlyReader{srv.serverUp})
+ ch <- err
+ close(ch)
+ }()
+ return ch
+}
+
+func (srv *spliceTestServer) closeUp() error {
+ var err, err1 error
+ if srv.serverUp != nil {
+ err = srv.serverUp.Close()
+ }
+ if srv.clientUp != nil {
+ err1 = srv.clientUp.Close()
+ }
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+func (srv *spliceTestServer) closeDown() error {
+ var err, err1 error
+ if srv.serverDown != nil {
+ err = srv.serverDown.Close()
+ }
+ if srv.clientDown != nil {
+ err1 = srv.clientDown.Close()
+ }
+ if err == nil {
+ return err1
+ }
+ return err
+}
+
+func spliceTestSocketPair(net string) (client, server Conn, err error) {
+ ln, err := newLocalListener(net)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer ln.Close()
+ var cerr, serr error
+ acceptDone := make(chan struct{})
+ go func() {
+ server, serr = ln.Accept()
+ acceptDone <- struct{}{}
+ }()
+ client, cerr = Dial(ln.Addr().Network(), ln.Addr().String())
+ <-acceptDone
+ if cerr != nil {
+ if server != nil {
+ server.Close()
+ }
+ return nil, nil, cerr
+ }
+ if serr != nil {
+ if client != nil {
+ client.Close()
+ }
+ return nil, nil, serr
+ }
+ return client, server, nil
+}
diff --git a/src/net/tcpsock_posix.go b/src/net/tcpsock_posix.go
index 58c7e49..f6fd931 100644
--- a/src/net/tcpsock_posix.go
+++ b/src/net/tcpsock_posix.go
@@ -45,6 +45,9 @@
}
func (c *TCPConn) readFrom(r io.Reader) (int64, error) {
+ if n, err, handled := splice(c.fd, r); handled {
+ return n, err
+ }
if n, err, handled := sendFile(c.fd, r); handled {
return n, err
}
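As a rough illustration of the transparent benefit described in the commit message, the sketch below relays data between two loopback TCP connections with a plain io.Copy. It is not part of the CL: tcpPair and relay are hypothetical helpers written for this example. On Linux with a Go release that includes this change, the copy is expected to go through (*TCPConn).ReadFrom and the splice path; elsewhere it should fall back to the generic copy, with identical results from the caller's point of view.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// tcpPair returns both ends of a loopback TCP connection.
// It is a hypothetical helper for this sketch.
func tcpPair() (client, server net.Conn, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, nil, err
	}
	defer ln.Close()
	type result struct {
		c   net.Conn
		err error
	}
	accepted := make(chan result, 1)
	go func() {
		c, err := ln.Accept()
		accepted <- result{c, err}
	}()
	client, err = net.Dial("tcp", ln.Addr().String())
	if err != nil {
		ln.Close() // unblock Accept before waiting on it
		if res := <-accepted; res.c != nil {
			res.c.Close()
		}
		return nil, nil, err
	}
	res := <-accepted
	if res.err != nil {
		client.Close()
		return nil, nil, res.err
	}
	return client, res.c, nil
}

// relay writes msg upstream, copies it server-side from the upstream
// connection to the downstream one, and returns what arrives downstream.
func relay(msg []byte) ([]byte, error) {
	clientUp, serverUp, err := tcpPair()
	if err != nil {
		return nil, err
	}
	defer clientUp.Close()
	defer serverUp.Close()
	clientDown, serverDown, err := tcpPair()
	if err != nil {
		return nil, err
	}
	defer clientDown.Close()
	defer serverDown.Close()

	copyDone := make(chan error, 1)
	go func() {
		// Both ends are TCP connections, so io.Copy can use the
		// destination's ReadFrom method instead of a userspace buffer.
		_, err := io.Copy(serverDown, serverUp)
		serverDown.Close() // signal EOF to the downstream reader
		copyDone <- err
	}()
	writeDone := make(chan error, 1)
	go func() {
		_, err := clientUp.Write(msg)
		clientUp.Close() // signal EOF to the copy goroutine
		writeDone <- err
	}()

	got, err := io.ReadAll(clientDown)
	if err != nil {
		return nil, err
	}
	if err := <-writeDone; err != nil {
		return nil, err
	}
	if err := <-copyDone; err != nil {
		return nil, err
	}
	return got, nil
}

func main() {
	got, err := relay([]byte("splice test"))
	fmt.Printf("%q %v\n", got, err)
}
```

The caller never refers to splice directly, which matches the note that no new API is added to package net.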
This CL may have broken the linux-mips builder: https://build.golang.org
It has been failing on the net tests since this CL was merged.
Thanks for pointing that out. Sent https://golang.org/cl/109316 to fix the problem.
However, benchmarks show that enabling it for unix sockets is most
likely not a net performance gain. The tcp <- unix case is also
fairly unlikely to be used very much by users of package net.
I may be reading this wrong, but it seems that splice(2) support for reading from AF_UNIX sockets was introduced [1] in Linux 4.2.0. Before that, there was no support whatsoever for splice(2) with AF_UNIX, so this patch would indeed not provide any gains for the tcp <- unix case on kernels < 4.2.0.
There's no information about the kernel version on which the benchmarks cited in this commit were run, so I wouldn't dismiss potential gains from this patch in Go 1.11 for the AF_UNIX io.Copy case on more recent kernels.
P.
1. https://github.com/torvalds/linux/commit/2b514574f7e88c8498027ee366fd6e7aae5aa4b5