best way to send files between goroutines


Brian Slesinsky

Aug 20, 2012, 9:27:05 PM
to golan...@googlegroups.com
I'd like to read from multiple files, with a goroutine on each file for maximum I/O concurrency. It's unclear to me how best to arrange communication and buffering for sending a possibly large file between goroutines. Should I just use a chan []byte or is it possible to use io.Pipe for this? It looks like a naive copy from bufio.Reader to an io.PipeWriter won't do what I want, since it won't be reading when blocked on the pipe, but it should only block on I/O or a full buffer.

It seems like I must have somehow overlooked a one-liner for a buffered pipe.

- Brian

Jesse McNelis

Aug 20, 2012, 11:51:52 PM
to Brian Slesinsky, golan...@googlegroups.com
I'm not sure what you're trying to do.
If you're processing the file in sections across a number of
goroutines then you'd want a chan []byte to queue the sections for
processing.
If you've got lots of goroutines that each want to read their own
sections in turn then you can just pass the *os.File and they can read
from it and pass it on.
If you want to avoid making too many Read() syscalls then you want
a bufio.Reader to wrap that *os.File.

--
=====================
http://jessta.id.au

Brian Slesinsky

Aug 21, 2012, 1:40:42 AM
to golan...@googlegroups.com, Brian Slesinsky, jes...@jessta.id.au


On Monday, August 20, 2012 8:51:52 PM UTC-7, Jesse McNelis wrote:
On Tue, Aug 21, 2012 at 11:27 AM, Brian Slesinsky <bsles...@gmail.com> wrote:
> I'd like to read from multiple files, with a goroutine on each file for
> maximum I/O concurrency. It's unclear to me how best to arrange
> communication and buffering for sending a possibly large file between
> goroutines. Should I just use a chan []byte or is it possible to use io.Pipe
> for this? It looks like a naive copy from bufio.Reader to an io.PipeWriter
> won't do what I want, since it won't be reading when blocked on the pipe,
> but it should only block on I/O or a full buffer.
>
> It seems like I must have somehow overlooked a one-liner for a buffered
> pipe.

> I'm not sure what you're trying to do.

Okay, let me try to make the problem more concrete.

Let's say we want to write cat(1). Of course we don't really need to use concurrency at all; the program could just loop over the filenames and copy each one.

However, suppose that a few of the files in the middle of the list are on very slow network filesystems. Then we probably don't want to open the files sequentially; instead we could start some goroutines and open them all in parallel. Each reader goroutine could open the file, read the data, and send it somehow to a writer goroutine that writes the file contents in the proper order. But there should be some back-pressure so we don't run out of memory.

This might become an interesting optimization problem: how much buffer memory should we allocate to each reader to finish as quickly as possible? But leaving that aside and just using a fixed-size buffer for each input file, I'm just wondering about the most convenient and idiomatic way to stream data between goroutines so that the readers only block for good reason (blocked on I/O or buffer memory).

- Brian

yy

Aug 21, 2012, 6:08:16 AM
to Brian Slesinsky, golan...@googlegroups.com, jes...@jessta.id.au
On 21 August 2012 07:40, Brian Slesinsky <bsles...@gmail.com> wrote:
> Let's say we want to write cat(1). Of course we don't really need to use
> concurrency at all; the program could just loop over the filenames and copy
> each one.
>
> However, suppose that a few of the files in the middle of the list are on
> very slow network filesystems. Then we probably don't want to open the files
> sequentially; instead we could start some goroutines and open them all in
> parallel. Each reader goroutine could open the file, read the data, and send
> it somehow to a writer goroutine that writes the file contents in the proper
> order. But there should be some back-pressure so we don't run out of memory.

If your use case really is like cat, and the output is written to an
io.Writer in a specific order, you don't need to send the data.
Instead, read into a buffer in each goroutine and pass the io.Writer to
the goroutine working on the next file when you are done.

If you want to store the data for later, write it to a buffer. If
you know the size of the files, allocate one big buffer for the
result in advance and pass slices of it to the different
goroutines, to use as little memory as possible.


--
- yiyus || JGL .

Brian Slesinsky

Aug 21, 2012, 2:51:33 PM
to golan...@googlegroups.com, Brian Slesinsky, jes...@jessta.id.au
It's not actually like cat because I'm decompressing the input and compressing the output. It seems like separate goroutines should be doing these things. That's why a buffered pipe (as in Unix) would be useful, and I'm surprised that it's not a one-liner. I suppose a "chan byte" is the closest equivalent, but that seems unlikely to be efficient. I'd rather have something like io.Pipe, but buffered.

- Brian

yy

Aug 21, 2012, 3:08:20 PM
to Brian Slesinsky, golan...@googlegroups.com, jes...@jessta.id.au
On 21 August 2012 20:51, Brian Slesinsky <bsles...@gmail.com> wrote:
> It's not actually like cat because I'm decompressing the input and
> compressing the output.

The decompressors and compressors implement io.Reader and io.Writer,
so it doesn't make any difference.

andrey mirtchovski

Aug 21, 2012, 3:58:41 PM
to golan...@googlegroups.com
> these things. That's why a buffered pipe (like with Unix) would be useful,
> and I'm surprised that it's not a one-liner.

I think it's reasonably simple. Since everything is a Reader or a Writer,
you can stack the functionality with abandon. If, for example, the
following is a relatively straightforward (unbuffered) io.Pipe
implementation of a decompress->compress pipeline:

http://play.golang.org/p/z_jxMTZriS

then the fully buffered pipe would be:

http://play.golang.org/p/WZbxXspcFT

(of course you can buffer only the write side of the pipe). Try it
with "gzip < t.go | ./t | gunzip".

In a test with a 100 MiB compressed file, the buffered and unbuffered
versions completed in identical times. Definitely more time is saved
by increasing GOMAXPROCS to 2 than by worrying about buffering the
pipe :)

roger peppe

Aug 21, 2012, 4:08:46 PM
to Brian Slesinsky, golan...@googlegroups.com, jes...@jessta.id.au
You could use os.Pipe.

I did actually write something a while ago that can be used quite
reasonably as a buffered pipe. It's reasonably efficient, though I don't
know how its performance would compare to using an OS pipe.

http://go.pkgdoc.org/code.google.com/p/rog-go/loopback

Brian Slesinsky

Aug 21, 2012, 6:24:33 PM
to roger peppe, golan...@googlegroups.com, jes...@jessta.id.au
os.Pipe is what I was missing. (Easy to miss since it's not actually listed under "P", but under "F" for File.) Cross-references between os.Pipe and io.Pipe would have helped.

- Brian
