Channel from io.Reader

4,538 views
Skip to first unread message

Tarmigan

unread,
Mar 8, 2011, 1:56:37 PM3/8/11
to golang-nuts
Hi,

I often want to wait for data from multiple io.Readers or a mix of
io.Readers and channels. The go way to do this seems to do a select
on a set of channels which means I need to convert an io.Reader to a
channel. Are there any helper functions to do this?

Here's a snippet that I find myself using:

func byteReader(r io.Reader, size int) (<-chan []byte, <-chan
os.Error) {
if size <= 0 {
size = 2048
}

ch := make(chan([]byte))
errCh := make(chan(os.Error))

go func() {
for {
buf := make([]byte, size)
s := 0
inner:
for {
n, err := r.Read(buf[s:])
if n > 0 {
ch <- buf[s:s+n]
s += n
}
if err != nil {
errCh <- err
return
}
if s >= len(buf) {
break inner;
}
}
}
}()

return ch, errCh
}



Would something like that be acceptable in io/ioutils?

Thanks,
Tarmigan

David Symonds

unread,
Mar 8, 2011, 1:57:57 PM3/8/11
to Tarmigan, golang-nuts
On Tue, Mar 8, 2011 at 10:56 AM, Tarmigan <tarm...@gmail.com> wrote:

> I often want to wait for data from multiple io.Readers or a mix of
> io.Readers and channels.  The go way to do this seems to do a select
> on a set of channels which means I need to convert an io.Reader to a
> channel.  Are there any helper functions to do this?

If it's just multiple io.Readers, try io.MultiReader.


Dave.

Tarmigan

unread,
Mar 8, 2011, 2:07:56 PM3/8/11
to golang-nuts
On Mar 8, 10:57 am, David Symonds <dsymo...@golang.org> wrote:
> On Tue, Mar 8, 2011 at 10:56 AM, Tarmigan <tarmi...@gmail.com> wrote:
> > I often want to wait for data from multiple io.Readers or a mix of
> > io.Readers and channels.  The go way to do this seems to do a select
> > on a set of channels which means I need to convert an io.Reader to a
> > channel.  Are there any helper functions to do this?
>
> If it's just multiple io.Readers, try io.MultiReader.

Thanks, I had forgotten about that.

Even io.MultiReader doesn't do what I am interested in though because
it reads all of the readers sequentially. For this application, I
want to read them simultaneously (think network io.Readers instead of
file io.Readers).

Thanks,
Tarmigan

Florian Weimer

unread,
Mar 8, 2011, 2:40:12 PM3/8/11
to Tarmigan, golang-nuts
> Even io.MultiReader doesn't do what I am interested in though because
> it reads all of the readers sequentially. For this application, I
> want to read them simultaneously (think network io.Readers instead of
> file io.Readers).

The io.Reader interface is geared towards streams of bytes, and it
will be difficult to interleave them with useful results. What kinds
of data are you trying to process?

Tarmigan

unread,
Mar 8, 2011, 3:02:14 PM3/8/11
to golang-nuts
Agreed. My data could not be interleaved.

For a bit more context, the similarities in the times I have needed
this are when I have been muxing or demuxing a set of data streams (a
server type application for example). For example in one application
data was read from a single io.ReadWriteCloser and sent on to several
io.ReadWriters. All of the io.ReadWriters could then send data back
to the single io.ReadWriteCloser. I could potentially have done this
with a bunch of independent goroutines that each did an io.Copy or
something, but because no one goroutine owns the single
io.ReadWriteCloser that would pose problems with intermixed writing
(same problem as you mentioned with the reading) and with error
handling/closing/reconnecting.

To be clear, my program is working, but I have found myself using this
same pattern a few times and I am wondering if a) there is a different
way I should be doing it or b) if converting from an io.Reader to a
set of channels would be useful to other people too.

Thanks,
Tarmigan

roman.m...@googlemail.com

unread,
Jan 25, 2019, 7:44:28 PM1/25/19
to golang-nuts
Hello Tarmigan,

I am new to go and I find myself asking the same question.
Did you come to a conclusion about this?

Many thanks,
Roman

simon place

unread,
Feb 3, 2019, 3:52:54 PM2/3/19
to golang-nuts
seems to me...

channels are for language-handled concurrency which is able to then, transparently, be parallel, readers are a single threaded thing that mimics concurrency, they do it explicitly.

so you need to 'drive' the readers by calling them all in a loop, handling any none zero length, the channels can be 'selected' because the language handles calling the right piece of code as other, possible threaded code, drops something on a channels.

roman.m...@googlemail.com

unread,
Feb 8, 2019, 5:52:15 PM2/8/19
to golang-nuts
Hello Simon,

thanks for the reply, I think I got this now.
Here is an example that I am working on. I thought initially one could make the echo function listen to a 'close-channel' at the same time as reading from the connection. Obviously this is not possible. Is this example a good/common way to address such a question?

package main

import (
"io"
"log"
"net"
"os"
"os/signal"
"syscall"
)

type myconn struct {
net.Conn
enforced bool
}

func awaitSignal(sigchan <-chan os.Signal, connections map[*myconn]*myconn) {
for {
<-sigchan
log.Println("received signal, closing all connections...")
// you would not do this inside the signal handler usually
for conn := range connections {
conn.enforced = true
conn.Close()
delete(connections, conn)
}
}
}

func echo(conn *myconn) {
     // Echo all incoming data.
io.Copy(conn, conn)
// Shut down the connection.
log.Println("lost connection")
if !conn.enforced {
log.Println("doing cleanup work")
}
    conn.Close()
      return
}

func main() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGHUP)
connections := make(map[*myconn]*myconn)
go awaitSignal(c, connections)
l, err := net.Listen("tcp", ":2000")
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
// Wait for a connection.
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
log.Println("new connection")
newconn := &myconn{conn, false}
connections[newconn] = newconn
go echo(newconn)
}
}

If the connection was closed from inside the code, then the 'enforced' flag is set to true, so that the echo function knows it does not need to perform the cleanup activity.
Thanks!
Roman
Reply all
Reply to author
Forward
0 new messages