Streaming buffered copy (proposal-ish)


Péter Szilágyi

Jan 28, 2015, 3:18:52 AM
to golang-nuts
Hi all,

  I've hit an interesting problem, and I was a bit surprised that there isn't anything in the standard libs that could have solved it easily. It isn't too complicated to write, but it isn't trivial either. If by any chance it's already in the libs, please enlighten me :), otherwise would anyone be interested in including it?

  The thing I was solving is fairly trivial: download a file from the internet, and stream-upload it somewhere else (Google Cloud Storage specifically, but it doesn't really matter). The naive solution is pretty straightforward: wire together the downloader's reader with the uploader's writer, and voila, magic... until you look at the network usage: x1 secs download, y1 secs upload, x2 secs download, y2 secs upload.

  The problem is that the uploader reads some fixed amount of data, buffers it up and then starts the upload. But while the upload is in progress, it doesn't read any more data from the reader, essentially pausing the download until the upload finishes. By that time the reader could have filled up the next buffer to send, but alas, it was blocked, so it didn't download anything.

  Note, buffered readers/writers won't really solve this issue: even though they have buffers in place to store arriving data, those buffers cannot be filled and flushed simultaneously. As far as I can tell, the only way to solve this streaming problem is to have two concurrent goroutines, one reading and another writing, with a data buffer in between.

  If there is indeed no such thing, would it be worthwhile to add something like this?

bufio.Copy(dst io.Writer, src io.Reader, buffer int) (written int64, err error)

  Which essentially does what io.Copy does, but starts up a separate writer goroutine and passes everything through a user-definable buffer. Then it could handle both data bursts as well as batching readers/writers.

  Comments/feedback? :)

Cheers,
  Peter

Jan Mercl

Jan 28, 2015, 3:41:02 AM
to Péter Szilágyi, golang-nuts
On Wed Jan 28 2015 at 9:18:47 Péter Szilágyi <pet...@gmail.com> wrote:

> until you look at the network usage: x1 secs download, y1 secs upload,
> x2 secs download, y2 secs upload.

Run the upload and download in separate goroutines, i.e. concurrently. Feed the upload by reading chunks of data from a channel that the download writes to.

It's simple. Wrapping it into a stdlib fn is not worth it, IMO. Additionally, such a wrapper has no chance to handle network errors in any intelligent/task-specific way*, which the roll-your-own dozen-or-so lines can.

(*) In this case, things like reestablishing a broken network connection and continuing from the last good R/W position on either/both side(s) (DL/UL).

-j

Péter Szilágyi

Jan 28, 2015, 3:45:25 AM
to Jan Mercl, golang-nuts
That is indeed my current solution; that's why I said it's not *too* complicated to write. But I still miss the functionality from the libs (though this is a personal opinion, so I can accept not including it). The dozen-or-so is rather a hundred-or-so, but your point still stands :)

Lastly, of course you could always add specialized intelligent error handling, but it would be nice to have an easy/dumb way of doing it and work up from there if it's not enough (yes, you could argue that the dumb way is io.Copy :P).

Péter Szilágyi

Jan 28, 2015, 3:47:06 AM
to Jan Mercl, golang-nuts
PS: Sorry, the hundred-or-so I mentioned also contains some specific error handling and uploader logic, so the truth is probably somewhere in between :)

roger peppe

Jan 28, 2015, 4:21:32 AM
to Péter Szilágyi, golang-nuts
One simple solution:

func bufferedCopy(w io.Writer, r io.Reader) (int64, error) {
	pr, pw, err := os.Pipe()
	if err != nil {
		return 0, err
	}
	done := make(chan error)
	go func() {
		_, err := io.Copy(pw, r)
		pw.Close()
		done <- err
	}()
	n, err0 := io.Copy(w, pr)
	err1 := <-done
	if err0 != nil {
		return n, err0
	}
	return n, err1
}

This would be better with a Go-implemented buffered pipe
(I'm sure I saw an implementation recently, but I can't remember
where it was).

Péter Szilágyi

Jan 28, 2015, 5:20:53 AM
to roger peppe, golang-nuts
Indeed, but since Linux pipe buffers are hard-coded to 64K, it will block the same way, just with 64K of slack. I need buffers in the range of 64MB to keep both downloader and uploader happy, which cannot be done using classical pipes.

Donovan Hide

Jan 28, 2015, 7:53:14 AM
to Péter Szilágyi, roger peppe, golang-nuts
Is this not the intended use case of io.Pipe()?
http://golang.org/pkg/io/#Pipe

Nick Craig-Wood

Jan 28, 2015, 8:10:29 AM
to Péter Szilágyi, golang-nuts
On 28/01/15 08:18, Péter Szilágyi wrote:
> I've hit an interesting problem, and I was a bit surprised that there
> isn't anything in the standard libs that could have solved it easily. It
> isn't too complicated to write, but it isn't trivial either. If by any
> chance it's already in the libs, please enlighten me :), otherwise would
> anyone be interested in including it?
>
> The thing I was solving is fairly trivial: download a file from the
> internet, and stream-upload it somewhere else (Google Cloud Storage
> specifically, but it doesn't really matter). The naive solution is
> pretty straightforward: wire together the downloader's reader with the
> uploader's writer, and voila, magic... until you look at the network
> usage: x1 secs download, y1 secs upload, x2 secs download, y2 secs upload.

I came across exactly the same problem yesterday!

I was reading, gzipping and uploading, but I had exactly the same
problem - 10 seconds of 100% CPU for gzipping into a 64 MB block, then
20 seconds of upload at 0% CPU.

Here was my solution

https://github.com/Memset/snapshot-manager/blob/master/snapshot/snapshot.go#L166

Which is an annoying amount of code. The error handling is tricky too.

> bufio.Copy(dst io.Writer, src io.Reader, buffer int) (written int64, err
> error)
>
> Which essentially does what io.Copy does, but starts up a separate
> writer go routine and passes everything through a user definable buffer.
> Then it could handle both data bursts as well batching readers/writers.

I like it!

--
Nick Craig-Wood <ni...@craig-wood.com> -- http://www.craig-wood.com/nick

Péter Szilágyi

Jan 28, 2015, 1:40:00 PM
to Nick Craig-Wood, golang-nuts
An initial attempt at getting a buffered concurrent copy. It's a tad more than a dozen lines, more like 200 :) I haven't tested it too extensively, nor cleaned it up too much; it's a rather quick hack, put together to see if the concept works.


Opinions? :)

Péter Szilágyi

Jan 28, 2015, 2:04:32 PM
to Nick Craig-Wood, golang-nuts

Just to reply to Donovan, pipes are not buffered. That's the hard part of the equation, not the threading.

Matt Harden

Jan 28, 2015, 9:09:11 PM
to Péter Szilágyi, Nick Craig-Wood, golang-nuts
Why not just run two io.Copy() in two goroutines - the first copying from a bufio.Reader to an io.PipeWriter, and the second copying from an io.PipeReader to a bufio.Writer? This way you get two buffers so that reading and writing can occur simultaneously, and data will be copied from one buffer to the other whenever both buffers need flushing. Seems pretty simple; what am I missing?

Matt Harden

Jan 28, 2015, 9:27:51 PM
to Péter Szilágyi, Nick Craig-Wood, golang-nuts
http://play.golang.org/p/owO96v5See demonstrates what I was saying.


Matt Harden

Jan 28, 2015, 9:35:56 PM
to Péter Szilágyi, Nick Craig-Wood, golang-nuts
This one actually compiles http://play.golang.org/p/BivUlAT7_0.

Péter Szilágyi

Jan 29, 2015, 4:50:28 AM
to Matt Harden, Nick Craig-Wood, golang-nuts
Hi Matt,

  It's a very fancy solution, but unfortunately it doesn't work, as it encounters exactly the same problems as all previous suggestions: in Go, all existing readers and writers are synchronous and blocking. Meaning that even if a reader or writer is buffered, the buffer is only filled/flushed when it's empty/full. The issue with all the solutions is that you need to place the buffer in between the two threads, not between a thread and the reader/writer endpoint. And this is what no solution apart from my proposal solves (Jan's solution of course also does it, it just requires a bit more locking and GC).

  However, given that it seems a bit hard to grasp the exact issue, I've decided to put together a more formal proposal, with exact code snippets simulating the issue and verification that the issue indeed persists in all of the above short/easy solutions:

$ shootout

Stable input, stable output:
        io.Copy:    3.38504052s  10.666667 mbps.
 [!] bufio.Copy:    3.37012021s  10.666667 mbps.
rogerpeppe.Copy:   3.414476536s  10.666667 mbps.
mattharden.Copy:   6.368713887s   5.333333 mbps.

Stable input, bursty output:
        io.Copy:   6.251177787s   5.333333 mbps.
 [!] bufio.Copy:   3.387935437s  10.666667 mbps.
rogerpeppe.Copy:    5.98428305s   6.400000 mbps.
mattharden.Copy:   6.250739081s   5.333333 mbps.

Bursty input, stable output:
        io.Copy:    6.25889809s   5.333333 mbps.
 [!] bufio.Copy:   3.347354357s  10.666667 mbps.
rogerpeppe.Copy:   5.999921216s   6.400000 mbps.
mattharden.Copy:   3.473998412s  10.666667 mbps.

There are three scenarios:
  • If both reader and writer produce/consume at the same speed, any solution works.
  • If the reader is stable but the writer processes in bursts, then all solutions apart from my proposal fail, as they serialize reading and writing.
  • If the reader produces in bursts and the writer consumes stably, then a buffered input can handle it (i.e. Matt's solution, not because of the threads, but because of the bufio.Reader).
I'll start a formal proposal thread on this mailing list too, as it indeed seems a non-trivial task, even though it might appear trivial at first.

Input most welcome,
  Peter

roger peppe

Jan 29, 2015, 9:46:19 AM
to Péter Szilágyi, Matt Harden, Nick Craig-Wood, golang-nuts
A buffered pipe implementation isn't hard to put together.
How about this, for example (I have only given it the
barest amount of testing)?

http://play.golang.org/p/PW4bPzaEEN

If you think that fits your needs, I could work it up into
an actual published package with some decent tests.

A further possibility, at the expense of a little computation time,
would be to factor out the ring buffer piece so that it
is possible to use file storage (for example) as backing.
It wouldn't be much more work.

cheers,
rog.

Nick Craig-Wood

Jan 29, 2015, 9:53:22 AM
to Péter Szilágyi, Matt Harden, golang-nuts
On 29/01/15 09:50, Péter Szilágyi wrote:
> It's a very fancy solution, but unfortunately it doesn't work, as it
> encounters exactly the same problems as all previous suggestions: in Go,
> all existing readers and writers are synchronous and blocking. Meaning,
> that even if a reader or writer is buffered, the buffer is only
> filled/flushed when it's empty/full. The issue with all solutions is
> that you need to place the buffer in between the two threads, not
> between a thread and the reader/writer endpoint. And this is what no
> solution apart from my proposal solves (Jan's solution of course also
> does it, just requires a bit more locking and gc).
>
> However, given that it seems a bit hard to grasp the exact issue, I've
> decided to put together a more formal proposal, with exact code snippets
> simulating the issue and verification that the issue indeed does persist
> in all above short/easy solutions:
>
> $ go get github.com/karalabe/bufioprop/shootout

Excellent example code.

I've just read through your bufio.Copy code:

The error handling is potentially racy. You are writing the err
variable from two different goroutines.

It is quite complicated - I have a feeling it could be simplified with
more buffers - I'll try to see if I can do better within your nice
framework!

Nick Craig-Wood

Jan 29, 2015, 11:58:39 AM
to Péter Szilágyi, golang-nuts
On 29/01/15 14:53, Nick Craig-Wood wrote:
> It is quite complicated - I have a feeling it could be simplified with
> more buffers - I'll try to see if I can do better within your nice
> framework!

I've sent my attempt through as a pull request

https://github.com/karalabe/bufioprop/pull/5

Bakul Shah

Jan 29, 2015, 5:17:09 PM
to Péter Szilágyi, golang-nuts
What about something like this: http://play.golang.org/p/g0-g6x-Kp_

The idea is to allocate a large buffer and read into it in
smaller chunks (at buffersize/N boundaries). For each chunk
read, the reader sends its size on a chan. The writer waits
until it gets a valid count, writes it out and then sends
back an ack on another chan. The reader can be up to N reads
ahead of the writer. On a recent MBP I get about 2.9GB/s with

dd < /dev/zero bs=1m count=10000 | bufcopy > /dev/null

The sample interface in this quick hack is:

func bufcopy(w io.Writer, r io.Reader) error

Shouldn't be hard to turn it into what you want.

[Aside: The current IO interfaces force extra copying.
Copying could've been made faster when reading from a file or
writing to one if read(n) returned a filled-in buffer and/or
writes were done lazily -- that is, instead of
writer.write(buf) one does buf := writer.alloc_buf(n) to grab
a buffer to write into, ultimately followed by a sync. See
"The Alloc Stream Facility" paper by Krieger, Stumm and
Unrau.]

Péter Szilágyi

Jan 30, 2015, 3:18:49 AM
to Bakul Shah, golang-nuts
Hey Bakul,

  I've added your implementation to the shootout, but it's currently panicking. Please investigate.

To the others too, in general: I've opened a more formal proposal thread (Subject: [proposal] bufio.Copy) in which various implementations and discussions are going on. I think we should move from this thread to that one, to keep everything under a single thread of discussion and not multiplex between two.

Cheers :),
   Peter

Bakul Shah

Jan 30, 2015, 4:35:26 AM
to Péter Szilágyi, golang-nuts
On Fri, 30 Jan 2015 10:18:38 +0200 Péter Szilágyi <pet...@gmail.com> wrote:
>
> Hey Bakul,
>
> I've added your implementation to the shootout, but it's currently
> panicing. Please investigate,

Can you show me your shootout code that panics? I ran my
version on OS X, FreeBSD and Linux with no trouble. But I have
only tried it with os.Stdin and os.Stdout.

Péter Szilágyi

Jan 30, 2015, 4:42:27 AM
to Bakul Shah, golang-nuts
The code is at https://github.com/karalabe/bufioprop

To run the shootout:
Please see this thread for details: https://groups.google.com/forum/#!topic/golang-nuts/Mwn9buVnLmY (also if possible, let's continue there)

Donovan Hide

Jan 30, 2015, 8:02:54 AM
to Péter Szilágyi, Bakul Shah, golang-nuts
Bit of a tangential suggestion, but if your HTTP source server supports byte-range requests, you could parallelise your uploads to Google Cloud Storage by dividing your initial GET request into 32 separate partial requests and creating a composite object after all the uploads have completed.

https://cloud.google.com/storage/docs/composite-objects#_Uploads