[go] net: implement readv/io.ReaderFrom for Buffers


Justin Nuß (Gerrit)

Oct 2, 2016, 5:20:44 AM
to Ian Lance Taylor, golang-co...@googlegroups.com
Justin Nuß uploaded a change:
https://go-review.googlesource.com/30102

net: implement readv/io.ReaderFrom for Buffers

CL 29951 (issue #13451) introduced net.Buffers as a way to
use OS-optimized batch writes and implemented it for Unix
systems via writev. This CL implements a similar optimization
for reads by using readv on Unix systems. (There is currently
no fast path for other systems.)

The implementation and tests are based on Brad's code for the
WriterTo/writev logic.

Unlike the write logic, which "consumes" the Buffers
(reslicing until the Buffers is empty), the read logic does
not consume the slice, as otherwise users would need to keep
a reference to the original Buffers.

Calling ReadFrom or Write multiple times will overwrite the
data in the Buffers. This avoids having to maintain a
pointer/offset to the last byte read/written (which would
mean changing the type of Buffers). To support this,
readv_unix.go, which is otherwise just a minimal adaptation
of writev_unix.go, must keep its own offset into the
Buffers.

Change-Id: I0dbe0d887e71685e47e6519103d8f2b828946c72
---
M src/net/fd_unix.go
M src/net/net.go
A src/net/readv_test.go
A src/net/readv_unix.go
4 files changed, 298 insertions(+), 7 deletions(-)



diff --git a/src/net/fd_unix.go b/src/net/fd_unix.go
index 1296bc5..c92f0d4 100644
--- a/src/net/fd_unix.go
+++ b/src/net/fd_unix.go
@@ -29,7 +29,7 @@
laddr Addr
raddr Addr

- // writev cache.
+ // readv/writev cache.
iovecs *[]syscall.Iovec

// wait server
diff --git a/src/net/net.go b/src/net/net.go
index 8ab952a..c66a1ee 100644
--- a/src/net/net.go
+++ b/src/net/net.go
@@ -605,6 +605,16 @@
<-threadLimit
}

+// buffersReader is the interface implemented by Conns that support a
+// "readv"-like batch read optimization.
+// readBuffers should fully fill all chunks from the
+// provided Buffers, else it should report a non-nil error.
+type buffersReader interface {
+ readBuffers(*Buffers) (int64, error)
+}
+
+var testHookDidReadv = func(read int) {}
+
// buffersWriter is the interface implemented by Conns that support a
// "writev"-like batch write optimization.
// writeBuffers should fully consume and write all chunks from the
@@ -615,17 +625,34 @@

var testHookDidWritev = func(wrote int) {}

-// Buffers contains zero or more runs of bytes to write.
+// Buffers contains zero or more runs of bytes to read into or write.
//
-// On certain machines, for certain types of connections, this is
-// optimized into an OS-specific batch write operation (such as
-// "writev").
+// On certain machines, for certain types of connections, these are
+// optimized into OS-specific batch read or write operations (such as
+// "readv" or "writev").
type Buffers [][]byte

var (
- _ io.WriterTo = (*Buffers)(nil)
- _ io.Reader = (*Buffers)(nil)
+ _ io.Writer = (*Buffers)(nil)
+ _ io.WriterTo = (*Buffers)(nil)
+ _ io.Reader = (*Buffers)(nil)
+ _ io.ReaderFrom = (*Buffers)(nil)
)
+
+func (v *Buffers) Write(p []byte) (n int, err error) {
+ for _, b := range *v {
+ n0 := copy(b, p)
+ p = p[n0:]
+ n += n0
+ if n0 < len(b) || len(p) == 0 {
+ break
+ }
+ }
+ if len(p) > 0 {
+ err = io.ErrShortWrite
+ }
+ return
+}

func (v *Buffers) WriteTo(w io.Writer) (n int64, err error) {
if wv, ok := w.(buffersWriter); ok {
@@ -656,6 +683,23 @@
return
}

+func (v *Buffers) ReadFrom(r io.Reader) (n int64, err error) {
+ if rv, ok := r.(buffersReader); ok {
+ return rv.readBuffers(v)
+ }
+ for _, b := range *v {
+ nb, err := r.Read(b)
+ n += int64(nb)
+ if err != nil {
+ return n, err
+ }
+ if nb < len(b) {
+ break
+ }
+ }
+ return n, nil
+}
+
func (v *Buffers) consume(n int64) {
for len(*v) > 0 {
ln0 := int64(len((*v)[0]))
diff --git a/src/net/readv_test.go b/src/net/readv_test.go
new file mode 100644
index 0000000..040c013
--- /dev/null
+++ b/src/net/readv_test.go
@@ -0,0 +1,146 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "runtime"
+ "sync"
+ "testing"
+)
+
+func TestBuffers_write(t *testing.T) {
+ var story = []byte("once upon a time in Gopherland ... ")
+ var buffers Buffers
+ for range story {
+ buffers = append(buffers, make([]byte, 1))
+ }
+ n, err := buffers.Write(story)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if n != len(story) {
+ t.Errorf("n = %d; want %d", n, len(story))
+ }
+ if got := bytes.Join([][]byte(buffers), nil); !bytes.Equal(got, story) {
+ t.Errorf("read %q; want %q", string(got), string(story))
+ }
+ if len(buffers) != len(story) {
+ t.Errorf("len(buffers) = %d; want %d", len(buffers), len(story))
+ }
+}
+
+func TestBuffers_ReadFrom(t *testing.T) {
+ for _, name := range []string{"ReadFrom", "Copy"} {
+ for _, size := range []int{0, 10, 1023, 1024, 1025} {
+ t.Run(fmt.Sprintf("%s/%d", name, size), func(t *testing.T) {
+ testBuffer_readFrom(t, size, false, name == "Copy")
+ })
+ t.Run(fmt.Sprintf("%s/%d+1", name, size), func(t *testing.T) {
+ testBuffer_readFrom(t, size, true, name == "Copy")
+ })
+ }
+ }
+}
+
+func testBuffer_readFrom(t *testing.T, chunks int, extraByte, useCopy bool) {
+ oldHook := testHookDidReadv
+ defer func() { testHookDidReadv = oldHook }()
+ var readLog struct {
+ sync.Mutex
+ log []int
+ }
+ testHookDidReadv = func(size int) {
+ readLog.Lock()
+ readLog.log = append(readLog.log, size)
+ readLog.Unlock()
+ }
+ var want bytes.Buffer
+ for i := 0; i < chunks; i++ {
+ want.WriteByte(byte(i))
+ }
+ if extraByte { // write extra byte that doesn't fit into the Buffers
+ want.WriteByte(byte(chunks))
+ }
+
+ readDone := make(chan struct{})
+
+ withTCPConnPair(t, func(c *TCPConn) error {
+ defer close(readDone)
+ buffers := make(Buffers, chunks)
+ for i := range buffers {
+ buffers[i] = make([]byte, 1)
+ }
+ var n int64
+ var err error
+ if useCopy {
+ n, err = io.Copy(&buffers, c)
+ } else {
+ n, err = buffers.ReadFrom(c)
+ }
+ if err != nil {
+ return err
+ }
+ if n != int64(chunks) {
+ return fmt.Errorf("Buffers.ReadFrom returned %d; want %d", n, chunks)
+ }
+ if extraByte {
+ n, err := io.Copy(ioutil.Discard, c)
+ if err != nil {
+ return err
+ }
+ if n != 1 {
+ return fmt.Errorf("found %d extra bytes; want 1", n)
+ }
+ }
+ return nil
+ }, func(c *TCPConn) error {
+ var n int
+ for i := 0; i < want.Len(); i++ {
+ n0, err := c.Write(want.Bytes()[i : i+1])
+ n += n0
+ if err != nil {
+ return err
+ }
+ }
+ if err := c.CloseWrite(); err != nil {
+ return err
+ }
+ if n != want.Len() {
+ return fmt.Errorf("client wrote %d; want %d", n, want.Len())
+ }
+ <-readDone // wait for the data to be fully read
+ readLog.Lock() // no need to unlock (or even lock, but let's be safe)
+ var gotSum int
+ for _, v := range readLog.log {
+ gotSum += v
+ }
+
+ var wantSum int
+ var wantMinCalls int
+ switch runtime.GOOS {
+ case "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd":
+ wantSum = want.Len()
+ if extraByte {
+ wantSum--
+ }
+ v := chunks
+ for v > 0 {
+ wantMinCalls++
+ v -= 1024
+ }
+ }
+ if len(readLog.log) < wantMinCalls {
+ t.Errorf("readv calls = %v < wanted min %v", len(readLog.log), wantMinCalls)
+ }
+ if gotSum != wantSum {
+ t.Errorf("readv call sum = %v; want %v", gotSum, wantSum)
+ }
+ return nil
+ })
+}
diff --git a/src/net/readv_unix.go b/src/net/readv_unix.go
new file mode 100644
index 0000000..9b4a448
--- /dev/null
+++ b/src/net/readv_unix.go
@@ -0,0 +1,101 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build darwin dragonfly freebsd linux netbsd openbsd
+
+package net
+
+import (
+ "os"
+ "syscall"
+ "unsafe"
+)
+
+func (c *conn) readBuffers(v *Buffers) (int64, error) {
+ if !c.ok() {
+ return 0, syscall.EINVAL
+ }
+ n, err := c.fd.readBuffers(v)
+ if err != nil {
+ return n, &OpError{Op: "readv", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
+ }
+ return n, nil
+}
+
+func (fd *netFD) readBuffers(v *Buffers) (n int64, err error) {
+ if err := fd.readLock(); err != nil {
+ return 0, err
+ }
+ defer fd.readUnlock()
+ if err := fd.pd.prepareRead(); err != nil {
+ return 0, err
+ }
+
+ var iovecs []syscall.Iovec
+ if fd.iovecs != nil {
+ iovecs = *fd.iovecs
+ }
+ // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is
+ // 1024 and this seems conservative enough for now. Darwin's
+ // UIO_MAXIOV also seems to be 1024.
+ maxVec := 1024
+
+ vc := *v
+ pos := 0
+
+ for len(vc) > 0 {
+ iovecs = iovecs[:0]
+ for i, chunk := range vc {
+ if len(chunk) == 0 {
+ continue
+ }
+ if i == 0 { // subtract pos bytes for first chunk
+ iovecs = append(iovecs, syscall.Iovec{Base: &chunk[pos]})
+ iovecs[len(iovecs)-1].SetLen(len(chunk) - pos)
+ } else {
+ iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
+ iovecs[len(iovecs)-1].SetLen(len(chunk))
+ }
+ if len(iovecs) == maxVec {
+ break
+ }
+ }
+ if len(iovecs) == 0 {
+ break
+ }
+ fd.iovecs = &iovecs // cache
+
+ read, _, e0 := syscall.Syscall(syscall.SYS_READV,
+ uintptr(fd.sysfd),
+ uintptr(unsafe.Pointer(&iovecs[0])),
+ uintptr(len(iovecs)))
+ if read < 0 || read == ^uintptr(0) {
+ read = 0
+ }
+ testHookDidReadv(int(read))
+ n += int64(read)
+ for read > 0 {
+ if rem := uintptr(len(vc[0]) - pos); rem <= read {
+ pos, read = 0, read-rem
+ vc = vc[1:]
+ } else {
+ pos, read = pos+int(read), 0
+ }
+ }
+ if e0 == syscall.EAGAIN {
+ if err = fd.pd.waitRead(); err == nil {
+ continue
+ }
+ } else if e0 != 0 {
+ err = syscall.Errno(e0)
+ }
+ if err != nil {
+ break
+ }
+ }
+ if _, ok := err.(syscall.Errno); ok {
+ err = os.NewSyscallError("readv", err)
+ }
+ return n, err
+}

--
https://go-review.googlesource.com/30102

Ian Lance Taylor (Gerrit)

Oct 3, 2016, 2:56:49 PM
to Justin Nuß, golang-co...@googlegroups.com
Ian Lance Taylor has posted comments on this change.

net: implement readv/io.ReaderFrom for Buffers

Patch Set 1:

In a language like Go readv seems much less interesting than writev. It
seems easy enough to read into a slice and then use slice expressions to
pass around different versions to different places.

Do you have an application for readv?

--
https://go-review.googlesource.com/30102
Gerrit-HasComments: No

Justin Nuß (Gerrit)

Oct 3, 2016, 3:05:21 PM
to golang-co...@googlegroups.com
Justin Nuß has posted comments on this change.

net: implement readv/io.ReaderFrom for Buffers

Patch Set 1:

> Patch Set 1:

> In a language like Go readv seems much less interesting than writev. It
> seems easy enough to read into a slice and then use slice expressions to
> pass around different versions to different places.

> Do you have an application for readv?

I agree, and I don't currently have any real application for this. I
implemented it mostly for fun and to see what performance I could get out
of it, then mailed it in case anyone would like to have it (and also
because I thought that would be better than just letting the code rot on
my disk).

--
https://go-review.googlesource.com/30102
Gerrit-Reviewer: Justin Nuß <nuss....@gmail.com>
Gerrit-HasComments: No

Ian Lance Taylor (Gerrit)

Oct 3, 2016, 3:10:59 PM
to Justin Nuß, golang-co...@googlegroups.com
Ian Lance Taylor has abandoned this change. (
https://go-review.googlesource.com/30102 )

Change subject: net: implement readv/io.ReaderFrom for Buffers
......................................................................


Abandoned

Abandoning until there is an application.