goroutines and os.TempFile


jimr

Nov 10, 2010, 8:19:56 AM11/10/10
to golang-nuts
Folks,

I've got an application in mind that needs to read some 40 million
lines of data from a downstream server, and then process those lines
of data.

Ideally, if one has enough cores (and GOMAXPROCS set to more than
one), I'd like to be able to have one thread buffering data from the
downstream server, while N additional threads start processing the
data.

Because of the size of the data set, I'm wondering if it's safe to try
an approach where I have two goroutines writing/reading from a
temporary file, where the temporary file is acting as a buffer.

Are there gotchas w/re to opening a temp file (i.e., os.TempFile) and
trying to share it across two goroutines? Is that a bad idea? If it
is, do I have any alternative to a lock-step (a) read all data from
server, then (b) kick off goroutines to process data?

I'm wondering if I'm going to need to use channels to communicate
the count of the lines of data written to the temp file, allowing the
2nd goroutine to block based on input from that channel.

One of my concerns is that the read from the server might end up
timing out if I don't process it quickly enough, but at the same time
it'd be really nice if the client didn't have to wait on the *entire*
response before starting to process data.

Jim

SnakE

Nov 10, 2010, 9:17:03 AM11/10/10
to jimr, golang-nuts
2010/11/10 jimr <jim.ro...@gmail.com>

Because of the size of the data set, I'm wondering if it's safe to try
an approach where I have two goroutines writing/reading from a
temporary file, where the temporary file is acting as a buffer.

Are there gotchas w/re to opening a temp file (i.e., os.TempFile) and
trying to share it across two goroutines?  Is that a bad idea?  If it
is, do I have any alternative to a lock-step (a) read all data from
server, then (b) kick off goroutines to process data?

Everything I say here is an educated IMHO, sorry.

It's usually safe to open and manipulate a file even across multiple OS processes, especially in *NIX where a file is not exclusively locked by default.

I imagine the following architecture.

Goroutine 1: opens the temporary file for writing.  Reads data from the server.  Writes the data into the file, one line at a time.  For each line, increments a semaphore.

Goroutine 2: opens the same file for reading.  It must be a different file descriptor: either open the file by name again, or duplicate the handle.  This goroutine decrements the semaphore, then reads a line from the buffer, and spawns a worker goroutine to process the input.  Maybe it limits the number of worker goroutines somehow.

Worker goroutines: process data, do not interact with the temporary file.

The hard part is the semaphore here.  If you're not familiar with it, it's a sync object based on a counter.  Any thread can increment it.  Any thread can decrement it.  But if its value is zero, decrementing it causes the decrementing thread to block until somebody increments it.  In your case the counter will be the number of unprocessed lines in the temp file.  If goroutine 1 lags behind goroutine 2, the latter blocks on the semaphore and waits until new lines are put into the temp file.

Implementing a semaphore using channels could be an interesting task.  Or if you're not in the mood you can search this list--I'm sure there were semaphore implementations somewhere around.

SnakE

Nov 10, 2010, 10:49:00 AM11/10/10
to jimr, golang-nuts
2010/11/10 SnakE <snake...@gmail.com>

Implementing a semaphore using channels could be an interesting task.  Or if you're not in the mood you can search this list--I'm sure there were semaphore implementations somewhere around.

Hmmm, the only semaphore floating around is the one using a channel of size MaxSemaphoreCount.  If downloading is much faster than processing, a channel for 40 million booleans is probably too much.

Here's my implementation.  It even seems to work.  I wonder if something like this should be in the sync package.

package semaphore

type Semaphore struct {
	count chan int  // holds the current count; acts as a one-slot protected cell
	wait  chan bool // signalled on each Inc to wake a blocked Dec
}

func NewSemaphore() *Semaphore {
	self := new(Semaphore)
	self.count = make(chan int, 1)
	self.count <- 0
	self.wait = make(chan bool, 1)
	return self
}

func (self *Semaphore) Inc() {
	self.count <- <-self.count + 1
	_ = self.wait <- true // non-blocking send (pre-Go 1 syntax): drop the signal if one is already pending
}

func (self *Semaphore) Dec() {
	for {
		if c := <-self.count; c > 0 {
			self.count <- c - 1
			return
		} else {
			self.count <- 0
			<-self.wait // block until the next Inc
		}
	}
}

André Moraes

Nov 10, 2010, 11:56:42 AM11/10/10
to golang-nuts
Maybe you can use 3 channels and only a few goroutines.

1- One goroutine downloads the data from the server
1.1- The same goroutine keeps a list of workers
1.2- When new data comes from the server, check whether any worker is idle; if one is, pass it the data along with a channel it can use to signal when it has finished the work.
1.3- If no worker is idle, write the data to a temporary file

When the workers finish their jobs, continue to dispatch the data from the server to them. After that, check whether the temporary file has unprocessed data; if it does, read the file and send it to the workers.

This way you can process the data as it arrives, and at the end any data that could not be processed "on-the-fly" will be read from the file. If the data can be processed in an order different from the order of arrival, this scheme is much simpler than the semaphore.


--
André Moraes
http://andredevchannel.blogspot.com/

Daniel Smith

Nov 10, 2010, 12:19:45 PM11/10/10
to André Moraes, golang-nuts
If order does matter, you could still do this: just write each incoming batch of data to its own temp file and send the file name down the channel to the workers.

This may be impractical if you get many tiny batches instead of a few big ones. In that case, I'd add one more goroutine:

The first goroutine downloads the data and writes to the end of the temp file. It sends the file offset & length of the data down a channel to the second goroutine.
The second goroutine reads from the temp file and distributes the data to worker goroutines via a second channel.

No need to keep a list of workers or anything as far as I can see, the second goroutine just keeps that second channel full. Once it finishes processing the temp file, it closes the channel. Worker goroutines detect this and write to a final channel to indicate they've exited. Once you've read as many values from this channel as you had workers, it's safe to exit the program.

goroutines & channels make everything easy once again. No semaphores or locks should be necessary if processing is really independent. If your db or whatever is not thread safe you can send the insert statements (or whatever you make with this data) down yet another channel to another goroutine that does the inserts.

2010/11/10 André Moraes <and...@gmail.com>

André Moraes

Nov 10, 2010, 12:35:11 PM11/10/10
to golang-nuts
No need to keep a list of workers or anything as far as I can see

The list is required only if he has a limit on the number of workers, or if he wants to reuse the workers (maybe each worker opens a connection to the database when it starts).

If neither of the points above is true, then the list is really unnecessary.

Daniel Smith

Nov 10, 2010, 12:50:48 PM11/10/10
to André Moraes, golang-nuts
Ah, when I do things like this I start N worker goroutines, which each read from (the same) channel in a loop. No need to keep track of them, except to verify that they've all shut down before you quit.

2010/11/10 André Moraes <and...@gmail.com>

SnakE

Nov 10, 2010, 12:51:35 PM11/10/10
to dan...@lukenine45.net, André Moraes, golang-nuts
2010/11/10 Daniel Smith <dan...@lukenine45.net>

If order does matter, you could still do this, just write each incoming batch of data to its own temp file and send the file name down the channel to the workers.

This may be impractical if you get many tiny batches instead of a few big ones. In that case, I'd add one more goroutine:

The first goroutine downloads the data and writes to the end of the temp file. It sends the file offset & length of the data down a channel to the second goroutine.
The second go reads from the temp file and distributes the data to worker goroutines via a second channel.

From the OP it follows that there are about 40 million data units to be processed, and downloading is much faster than processing.  You'll end up with tens of millions of offset/length pairs in your channel.  I'll be surprised if it works.

Daniel Smith

Nov 10, 2010, 12:54:25 PM11/10/10
to SnakE, André Moraes, golang-nuts


On Wed, Nov 10, 2010 at 11:51 AM, SnakE <snake...@gmail.com> wrote:
From the OP it follows that there are about 40 million data units to be processed, and downloading is much faster than processing.  You'll end up with tens of millions of offset/length pairs in your channel.  I'll be surprised if it works.

Missed that; well, the first channel is just a convenience. First goroutine writes lines at the end, second one reads starting from the beginning. Without the channel it has to pay some attention to what it is reading but that's not a big deal. The rest of the design still works.

SnakE

Nov 10, 2010, 1:59:08 PM11/10/10
to dan...@lukenine45.net, André Moraes, golang-nuts
2010/11/10 Daniel Smith <dan...@lukenine45.net>



On Wed, Nov 10, 2010 at 11:51 AM, SnakE <snake...@gmail.com> wrote:
From the OP it follows that there are about 40 million data units to be processed, and downloading is much faster than processing.  You'll end up with tens of millions of offset/length pairs in your channel.  I'll be surprised if it works.

Missed that; well, the first channel is just a convenience. First goroutine writes lines at the end, second one reads starting from the beginning. Without the channel it has to pay some attention to what it is reading but that's not a big deal. The rest of the design still works.

Please explain how to read new lines of data from a huge file as they arrive without sitting in a busy loop.

Daniel Smith

Nov 10, 2010, 2:04:09 PM11/10/10
to SnakE, André Moraes, golang-nuts
Processing is super slow compared to writing to the file, so that won't be a problem-- you'll never catch up and have to wait.

SnakE

Nov 10, 2010, 2:15:37 PM11/10/10
to dan...@lukenine45.net, André Moraes, golang-nuts
2010/11/10 Daniel Smith <dan...@lukenine45.net>
On Wed, Nov 10, 2010 at 12:59 PM, SnakE <snake...@gmail.com> wrote:
2010/11/10 Daniel Smith <dan...@lukenine45.net>


On Wed, Nov 10, 2010 at 11:51 AM, SnakE <snake...@gmail.com> wrote:
From the OP it follows that there are about 40 million data units to be processed, and downloading is much faster than processing.  You'll end up with tens of millions of offset/length pairs in your channel.  I'll be surprised if it works.

Missed that; well, the first channel is just a convenience. First goroutine writes lines at the end, second one reads starting from the beginning. Without the channel it has to pay some attention to what it is reading but that's not a big deal. The rest of the design still works.

Please explain how to read new lines of data from a huge file as they arrive without sitting in a busy loop.

Processing is super slow compared to writing to the file, so that won't be a problem-- you'll never catch up and have to wait.
 
You never know.  It's remote.  The speed may drop at any moment for unknown reasons.  I wouldn't want my processing to stop working prematurely just because the connection slowed down.

jimr

Nov 10, 2010, 10:28:23 PM11/10/10
to golang-nuts
On Nov 10, 9:19 am, Daniel Smith <dan...@lukenine45.net> wrote:
>
> No need to keep a list of workers or anything as far as I can see, the
> second goroutine just keeps that second channel full. Once it finishes
> processing the temp file, it closes the channel. Worker goroutines detect
> this and write to a final channel to indicate they've exited. Once you've
> read as many values from this channel as you had workers, it's safe to exit
> the program.
>
> goroutines & channels make everything easy once again. No semaphores or
> locks should be necessary if processing is really independent. If your db or
> whatever is not thread safe you can send the insert statements (or whatever
> you make with this data) down yet another channel to another goroutine that
> does the inserts.

I appreciate the feedback from everyone. Using the advice here, I've
got something that I think mostly follows the advice above.

I'm not positive I got the control flow correct -- I notice the
goroutine that is reading from the server and writing to the temp file
ends up taking over the program. Only when it's finished do the other
goroutines start to do their thing.

I'm having a hard time knowing for sure if this is simply an artifact
of how the scheduler works (the read from the server is quite fast),
or if I've got a logical mistake in how I set up the routines.

When I sprinkle some fmt.Println debugging into the code (example code
listed at the end of this message), I can see the program print out
that it's got to the point that it ranging over the queued sets of
data, and then I can see that items are being added to the
queues as data is read from the server, and that they are being
dispatched into the goroutines that process the blocks.

What I don't see happening is any actual processing of the data until
the final block has been dispatched. E.g., processing a lengthy set of
data (2 or 4 core machines w/ GOMAXPROCS set to 2 or 4) produces
output like this:

reading block queue...
queuing a block
dispatched block: # 1
queuing a block
dispatched block: # 2
queuing a block
dispatched block: # 3
queuing a block
dispatched block: # 4
queuing a block
dispatched block: # 5
queuing a block
dispatched block: # 6
queuing a block
dispatched block: # 7
queuing a block
dispatched block: # 8
queuing a block
dispatched block: # 9
queuing a block
dispatched block: # 10
queuing a block
dispatched block: # 11
queuing a block
dispatched block: # 12
queuing a block
dispatched block: # 13
queuing final block
processing /pnas/100/9/5022/rev/correction.6ba6618b945d87d7.atom
dispatched block: # 14
processing pnas/104/15/6182.figures-only.html
processing a block 14
... from here it starts to actually do the bulk of the processing ...

Any additional input from folks, including any advice on where they
think the code could be simplified or where they see mistakes in how I
put it together, would be most appreciated.

In the course of adding this new logic, the code went from an elegant
38 lines to an unwieldy 115 lines. I'll probably be able to refactor
it a little bit more, but criticism from those of you who have more
experience in go would be welcomed.


--Jim


Example code snippet:

...
type pathRange struct {
	offset int64
	length int64
}
...

// List all documents matching glob in the specified forests.
func (s *Server) Paths(glob string, forests []string) (paths chan string, errors chan os.Error) {
	paths, errors = make(chan string), make(chan os.Error)
	prange := make(chan pathRange)

	tempf, err := ioutil.TempFile("/tmp", "xdbc.paths.")
	if err != nil {
		errors <- err
		return
	}

	// write items to tempf, signaling prange channel when a chunk
	// of work is available.
	wbuf := bufio.NewWriter(tempf)
	go func() {
		r := pathRange{int64(0), int64(0)}

		defer func() {
			if r.length > r.offset {
				wbuf.Flush()
				fmt.Println("queuing final block")
				prange <- pathRange{r.offset, r.length}
			}
			close(prange)
			tempf.Close()
		}()

		for i := range forests {
			q := NewQuery(
				listForestPaths,
				NewParam("glob", glob),
				NewParam("forest", forests[i]))

			items, itemserr := s.EvalItr(q)
			for {
				select {
				case err := <-itemserr:
					if err != nil {
						errors <- err
						return
					}
				case item := <-items:
					if closed(items) {
						return
					}

					n, err := wbuf.Write(item.Bytes)
					if err != nil {
						errors <- err
						return
					}

					if n > 0 {
						if r.offset == r.length && r.offset != int64(0) {
							r.offset++
						}
						r.length += int64(n)
						if (r.length - r.offset) > PR_SEND_LEN {
							wbuf.Flush()
							fmt.Println("queuing a block")
							prange <- pathRange{r.offset, r.length}
							r.offset = r.length
						}
					}
				}
			}
		}
	}()

	fmt.Println("reading block queue...")

	// queue each pathRange for line processing.
	pending, done := 0, make(chan int)
	for pr := range prange {
		pending++
		fmt.Println("dispatched block: #", pending)
		go func(pr pathRange) {
			fmt.Println("processing a block")
			f, err := os.Open(tempf.Name(), os.O_RDONLY, 0)
			if err != nil {
				errors <- err
				return
			}
			defer f.Close()

			rbuf := bufio.NewReader(io.Reader(io.NewSectionReader(f, pr.offset, pr.length)))
			for {
				str, err := rbuf.ReadString('\n')
				if n := len(str); n != 0 {
					paths <- str[0 : n-1]
				}
				if err != nil {
					if err != os.EOF {
						errors <- err
					}
					break
				}
			}
			done <- 1
		}(pr)
	}

	// close paths and remove tempf when all of the prange derived
	// goroutines have signaled that they are done.
	go func() {
		for _ = range done {
			if pending--; pending == 0 {
				close(done)
				close(paths)
				os.Remove(tempf.Name())
			}
		}
	}()

	return
}

roger peppe

Nov 11, 2010, 7:06:05 AM11/11/10
to jimr, golang-nuts
i thought this was an interesting little problem.

i wrote a package that implements the buffering part.
http://code.google.com/p/rog-go/source/browse/breader/breader.go

then you can write the actual processing code in
a more natural style, something like this, perhaps:

package main

import (
	"rog-go.googlecode.com/hg/breader"
	"os"
	"log"
	"bufio"
	"fmt"
)

const nProcs = 4

func main() {
	br, err := breader.NewReader(os.Stdin, "/tmp", "breader")
	if err != nil {
		log.Exitln("newreader", err)
	}
	lineIn := make(chan string)
	lineOut := make(chan string)
	procDone := make(chan bool)
	outDone := make(chan bool)

	go output(lineOut, outDone)

	for i := 0; i < nProcs; i++ {
		go processor(lineIn, lineOut, procDone)
	}
	bufr := bufio.NewReader(br)
	for {
		s, err := bufr.ReadString('\n')
		if len(s) > 0 {
			lineIn <- s
		}
		if err != nil {
			break
		}
	}
	close(lineIn)
	for i := 0; i < nProcs; i++ {
		<-procDone
	}
	close(lineOut)
	<-outDone
}

func processor(lineIn <-chan string, lineOut chan<- string, done chan<- bool) {
	for {
		// NB can't use range because there are multiple readers on the same channel.
		s := <-lineIn
		if len(s) == 0 {
			break
		}
		// process the line
		lineOut <- fmt.Sprintf("%d\n", len(s))
	}
	done <- true
}

func output(lineOut <-chan string, done chan<- bool) {
	for s := range lineOut {
		fmt.Print(s)
	}
	done <- true
}

jimr

Nov 11, 2010, 9:13:54 AM11/11/10
to golang-nuts
On Nov 11, 4:06 am, roger peppe <rogpe...@gmail.com> wrote:
>
> i wrote a package that implements the buffering part.
> http://code.google.com/p/rog-go/source/browse/breader/breader.go

That's very elegant, thank you very much for sharing!

--Jim

jimr

Nov 12, 2010, 5:54:17 PM11/12/10
to golang-nuts
On Nov 11, 4:06 am, roger peppe <rogpe...@gmail.com> wrote:
>
> func processor(lineIn <-chan string, lineOut chan<-string, done chan<- bool) {
>         for {
>                 // NB can't use range because there are multiple readers on the same channel.
>                 s := <-lineIn
>                 if len(s) == 0 {
>                         break
>                 }
>                 // process the line
>                 lineOut <- fmt.Sprintf("%d\n", len(s))
>         }
>         done <- true
>
> }

Could someone expand on the warning in this code snippet about using
range when there are multiple readers? I assume a race condition
problem exists, but I don't follow what it is based on the documentation
for range:

http://golang.org/doc/go_spec.html#For_statements

4. For channels, the iteration values produced are the successive
values sent on the channel until the channel is closed; it does
not produce the zero value sent before the channel is closed
(§close and closed).

I'd also like to ask if someone could clarify the interaction between
the code above and the close(lineIn) from main:

bufr := bufio.NewReader(br)
for {
	s, err := bufr.ReadString('\n')
	if len(s) > 0 {
		lineIn <- s
	}
	if err != nil {
		break
	}
}
close(lineIn)

If there is a small amount of input data, only a few lines, and for some
reason the execution spun off many threads, won't this potentially
cause a panic as many threads end up testing

s := <-lineIn

after the channel has been closed?

Jim

Rob 'Commander' Pike

Nov 12, 2010, 6:53:15 PM11/12/10
to jimr, golang-nuts

On Nov 12, 2010, at 2:54 PM, jimr wrote:

> On Nov 11, 4:06 am, roger peppe <rogpe...@gmail.com> wrote:
>>
>> func processor(lineIn <-chan string, lineOut chan<- string, done chan<- bool) {
>>         for {
>>                 // NB can't use range because there are multiple readers on the same channel.
>>                 s := <-lineIn
>>                 if len(s) == 0 {
>>                         break
>>                 }
>>                 // process the line
>>                 lineOut <- fmt.Sprintf("%d\n", len(s))
>>         }
>>         done <- true
>> }
>
> Could someone expand on the warning in this code snippet about using
> range when there are multiple readers? I assume a race condition
> problem exists, but I don't follow what that is based on documentation
> for range:
>
> http://golang.org/doc/go_spec.html#Select_statements
>
> 4. For channels, the iteration values produced are the successive
> values sent on the channel until the channel is closed; it does
> not produce the zero value sent before the channel is closed
> (§close and closed).

Range loops terminate when the channel closes, but if multiple readers exist, there is a race between two readers seeing the channel close and getting the last value. For instance, one reader could get a zero value caused by a closed channel but believe it's a legitimate value because *its* call to closed() said 'false'.

> I'd also like to ask if someone could clarify the interaction between
> the code above and the close(lineIn) from main:
>
> bufr := bufio.NewReader(br)
> for {
>         s, err := bufr.ReadString('\n')
>         if len(s) > 0 {
>                 lineIn <- s
>         }
>         if err != nil {
>                 break
>         }
> }
> close(lineIn)
>
> If there is small amount of input data, only a few lines, and for some
> reason the execution spun off many threads, won't this potentially
> cause a panic as many of threads end up testing
>
> s := <-lineIn
>
> after the channel has been closed?

The len(s) test does the job. If it's zero, that means the channel is closed. This avoids the race.

-rob

James A. Robinson

Nov 12, 2010, 7:27:50 PM11/12/10
to Rob 'Commander' Pike, golang-nuts
On Fri, Nov 12, 2010 at 15:53, Rob 'Commander' Pike <r...@google.com> wrote:
>
> Range loops terminate when the channel closes, but if multiple
> readers exist, there is a race between two readers seeing the
> channel close and getting the last value.  For instance, one reader
> could get a zero value caused by a closed channel but believe it's a
> legitimate value because *its* call to closed() said 'false'.

Ah, ok. That makes sense, thank you.

>> If there is small amount of input data, only a few lines, and for some
>> reason the execution spun off many threads, won't this potentially
>> cause a panic as many of threads end up testing
>>
>>  s := <-lineIn
>>
>> after the channel has been closed?
>
> The len(s) test does the job. If it's zero, that means the channel is closed.  This avoids the race.


If the number of threads is wildly out of proportion to the lines of
input, the program below terminates with a "too many operations on a
closed channel" panic on the

s := <- c1

So while 510 threads works, 511 threads fails. I assume this is
because we're reading c1 over and over as the threads see whether or
not it's closed (when, in fact, it is closed).

While I can certainly also check that somebody didn't specify an
idiotic number of threads, I'm wondering if I might ever hit a
situation where I really can't know whether or not it's safe to limit
the threads, and might have to deal with having more threads than I
have input for.

So should this be running a closed(c1) test *before* reading c1, and
then checking (s == "")?

-- Jim


package main

import (
	"fmt"
	"strconv"
)

func main() {
	threads := 511 // threads count: 511 crashes, 510 works.
	inputs := 100  // number of ints to send

	c1 := make(chan string)
	c2 := make(chan string)
	d1 := make(chan bool)
	d2 := make(chan bool)

	go output(c2, d2)
	for i := 0; i < threads; i++ {
		go worker(c1, c2, d1)
	}

	for i := 1; i <= inputs; i++ {
		c1 <- strconv.Itoa(i)
	}

	close(c1)
	for i := 0; i < threads; i++ {
		<-d1
	}

	close(c2)
	<-d2
}

func worker(c1 <-chan string, c2 chan<- string, done chan<- bool) {
	for {
		s := <-c1
		if len(s) == 0 {
			break
		}
		c2 <- s
	}
	done <- true
}

func output(c <-chan string, done chan<- bool) {
	for s := range c {
		fmt.Println(s)
	}
	done <- true
}

Rob 'Commander' Pike

Nov 12, 2010, 7:32:30 PM11/12/10
to James A. Robinson, golang-nuts

On Nov 12, 2010, at 4:27 PM, James A. Robinson wrote:

> On Fri, Nov 12, 2010 at 15:53, Rob 'Commander' Pike <r...@google.com> wrote:
>>
>> Range loops terminate when the channel closes, but if multiple
>> readers exist, there is a race between two readers seeing the
>> channel close and getting the last value. For instance, one reader
>> could get a zero value caused by a closed channel but believe it's a
>> legitimate value because *its* call to closed() said 'false'.
>
> Ah, ok. That makes sense, thank you.
>
>>> If there is small amount of input data, only a few lines, and for some
>>> reason the execution spun off many threads, won't this potentially
>>> cause a panic as many of threads end up testing
>>>
>>> s := <-lineIn
>>>
>>> after the channel has been closed?
>>
>> The len(s) test does the job. If it's zero, that means the channel is closed. This avoids the race.
>
>
> If the number of threads is wildly out of proportion of the lines of
> input, the program below terminates with a "too many operations on a
> closed channel" on the
>
> s := <- c1
>
> So while 510 threads works, 511 threads fails. I assume this is
> because we're reading c1 over and over as the threads see whether or
> not it's closed (when, in fact, it is closed).

the loop should probably check for closed(). the way it's written, using the zero as the termination condition, that would just be an optimization, but perhaps an important one.

-rob


SnakE

Nov 12, 2010, 9:04:21 PM11/12/10
to Rob 'Commander' Pike, James A. Robinson, golang-nuts
2010/11/13 Rob 'Commander' Pike <r...@google.com>
One way of optimizing this safely is, instead of closing the channel, put an invalid value into it:

    c1 <- ""

Then in the worker, terminate if `len(s) == 0` as it is now, *but* return that invalid value into the channel before termination:

    func worker(c1 chan string, c2 chan<- string, done chan<- bool) {
        for s := range c1 {
            if len(s) == 0 {
                c1 <- s
                break
            }
            c2 <- s
        }
        done <- true
    }

Here, there is no race in the `range` expression because the channel is never closed.  Any number of workers can be shut down correctly because the invalid value is kept in the channel for those who come and check later.

One gotcha here: you use unbuffered, synchronous channels.  This means that the last worker will block on the `c1 <- s` line forever because nobody else is reading the channel.  This is easily avoided: just make the c1 a buffered channel with a capacity of one:

    c1 := make(chan string, 1)

Hope this helps.

PS: I think that a race in the `range` over a channel is unacceptable.  It should just work.

Rob 'Commander' Pike

Nov 13, 2010, 1:41:37 AM11/13/10
to SnakE, James A. Robinson, golang-nuts
On Fri, Nov 12, 2010 at 6:04 PM, SnakE <snake...@gmail.com> wrote:
> 2010/11/13 Rob 'Commander' Pike <r...@google.com>
>>
>> On Nov 12, 2010, at 4:27 PM, James A. Robinson wrote:
>> > So while 510 threads works, 511 threads fails.  I assume this is
>> > because we're reading c1 over and over as the threads see whether or
>> > not it's closed (when, in fact, it is closed).
>>
>> the loop should probably check for closed().  the way it's written, using
>> the zero as the termination condition, that would just  be an optimization,
>> but perhaps an important one.
>
> One way of optimizing this safely is, instead of closing the channel, put an
> invalid value into it:
>     c1 <- ""
> Then in the worker, terminate if `len(s) == 0` as it is now, *but* return
> that invalid value into the channel before termination:
>     func worker(c1 chan string, c2 chan<- string, done chan<- bool) {
>         for s := range c1 {
>             if len(s) == 0 {
>                 c1 <- s
>                 break
>             }
>             c2 <- s
>         }
>         done <- true
>     }
>

That is not a fix. It doesn't even help.

> Here, there is no race in the `range` expression because the channel is
> never closed.  Any number of workers can be closed correctly because the
> invalid value is kept in the channel for those who come and check later.
> One gotcha here: you use unbuffered, synchronous channels.  This means that
> the last worker will block on the `c1 <- s` line forever because nobody else
> is reading the channel.  This is easily avoided: just make the c1 a buffered
> channel with a capacity of one:
>     c1 := make(chan string, 1)
> Hope this helps.
> PS: I think that a race in the `range` over a channel is unacceptable.  It
> should just work.

Indeed, but it can't with acceptable efficiency. Synchronization is
like that sometimes.

-rob

SnakE

Nov 13, 2010, 5:26:50 AM11/13/10
to Rob 'Commander' Pike, James A. Robinson, golang-nuts
2010/11/13 Rob 'Commander' Pike <r...@google.com>
On Fri, Nov 12, 2010 at 6:04 PM, SnakE <snake...@gmail.com> wrote:
> One way of optimizing this safely is, instead of closing the channel, put an
> invalid value into it:
>     c1 <- ""
> Then in the worker, terminate if `len(s) == 0` as it is now, *but* return
> that invalid value into the channel before termination:
>     func worker(c1 <-chan string, c2 chan<- string, done chan<- bool) {
>         for s := range c1 {
>             if len(s) == 0 {
>                 c1 <- s
>                 break
>             }
>             c2 <- s
>         }
>         done <- true
>     }
>

That is not a fix. It doesn't even help.

I thought it would.  Why won't it work? 
 
> PS: I think that a race in the `range` over a channel is unacceptable.  It
> should just work.

Indeed, but it can't with acceptable efficiency.  Synchronization is
like that sometimes.

Maybe it's just me but I expect every language feature to work as specified.  If it doesn't, it's an implementation bug.  Nowhere in the specs is the race mentioned.  So either the specs should be fixed or the implementation.  It may actually be worth it to redesign the close()/closed() concept.

jimr

Nov 13, 2010, 11:44:13 AM11/13/10
to golang-nuts
On Nov 13, 2:26 am, SnakE <snake.sc...@gmail.com> wrote:
> > > PS: I think that a race in the `range` over a channel is
> > > unacceptable.  It should just work.
>
> > Indeed, but it can't with acceptable efficiency.  Synchronization
> > is like that sometimes.
>
> Maybe it's just me but I expect every language feature to work as
> specified.  If it doesn't it's an implementation bug.  Nowhere in
> the specs the race is mentioned.  So it's either the specs should be
> fixed or the implementation.  It may actually worth it to redesign
> the close()/closed() concept.

Well, that's not really fair. The close/closed functions are working
as specified, at worst the documentation fails to call out the
subtlety that

"After at least one such zero value has been received, closed(c)
returns true. "

means only *one* of any number of goroutines reading (c) need receive
the zero value before *all* goroutines checking closed(c) see
true, and the subsequent impact on range.

This is just like someone writing a concurrent program where two threads
are dealing with checking and inserting into a map, right? It's a
common mistake for people to write something like

if ( map[foo] == null ) { map[foo] = bar() }

w/o thinking about their process potentially being put to sleep
between the check and the insert. I wouldn't call that a bug in the
implementation, just a fact of life with concurrent programs.
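The check-then-insert race can be sketched in Go; the `cache` type and `fill` callback below are hypothetical names, and the mutex is one standard fix (a per-key guard or sync.Once would also work):

```go
package main

import (
	"fmt"
	"sync"
)

// cache makes the check-then-insert atomic with a mutex. Without the
// lock, two goroutines could both observe the key missing and both
// call fill() -- exactly the window described above.
type cache struct {
	mu sync.Mutex
	m  map[string]int
}

func (c *cache) get(key string, fill func() int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.m[key]; ok {
		return v // already computed by some goroutine
	}
	v := fill()
	c.m[key] = v
	return v
}

func main() {
	c := &cache{m: make(map[string]int)}
	fmt.Println(c.get("foo", func() int { return 42 })) // prints 42
	fmt.Println(c.get("foo", func() int { return 0 }))  // cached: still 42
}
```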

-- Jim

SnakE

unread,
Nov 13, 2010, 10:59:37 PM11/13/10
to jimr, golang-nuts
2010/11/13 jimr <jim.ro...@gmail.com>

On Nov 13, 2:26 am, SnakE <snake.sc...@gmail.com> wrote:
> > > PS: I think that a race in the `range` over a channel is
> > > unacceptable.  It should just work.
>
> > Indeed, but it can't with acceptable efficiency.  Synchronization
> > is like that sometimes.
>
> Maybe it's just me, but I expect every language feature to work as
> specified.  If it doesn't, it's an implementation bug.  Nowhere in
> the specs is the race mentioned.  So either the specs should be
> fixed or the implementation.  It may actually be worth it to
> redesign the close()/closed() concept.

Well, that's not really fair.  The close/closed functions are working
as specified

I'm talking about the `for range` specification:

For channels, the iteration values produced are the successive values sent on the channel until the channel is closed; it does not produce the zero value received after the channel is closed.

It guarantees to produce "successive values sent on the channel," and it guarantees to end iteration when the channel is closed.  Neither guarantee stands when there are multiple readers on the channel.

There's an easy fix: add to the specs that `for range chan` works as specified only if there is exactly one goroutine reading from that channel, otherwise the behavior is undefined.  It's not a good fix, but at least it would make the spec consistent both internally and with the current implementation.

This is just like someone writing a concurrent program where two threads
are dealing with checking and inserting into a map, right?

No.  Channels are the idiomatic means of inter-thread synchronization in Go.  Therefore I expect them to work flawlessly in a multi-threaded environment.  But currently close()/closed() are *specified* so that it's impossible to use them without races.

The operation of receiving a value from a channel must be atomic with the check of whether the channel is closed.  That's why I'm talking about a redesign.
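For the record, this atomic receive-plus-closed check is essentially the comma-ok receive that Go 1 later adopted: a minimal sketch in current Go, where the second result reports, in the same operation as the receive, whether the value was a real send (drain is an illustrative name):

```go
package main

import "fmt"

// drain receives until the channel is closed. The second result of
// the receive reports, atomically with the receive itself, whether
// the value is a real send (true) or the zero value after close
// (false), so there is no window for the race discussed above.
func drain(c <-chan int) []int {
	var got []int
	for {
		v, ok := <-c
		if !ok {
			return got // channel closed: no spurious zero values
		}
		got = append(got, v)
	}
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	c <- 3
	close(c)
	fmt.Println(drain(c)) // prints [1 2 3]
}
```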

roger peppe

unread,
Nov 15, 2010, 9:56:27 AM11/15/10
to Rob 'Commander' Pike, James A. Robinson, golang-nuts
On 13 November 2010 00:32, Rob 'Commander' Pike <r...@google.com> wrote:
>> So while 510 threads works, 511 threads fails.  I assume this is
>> because we're reading c1 over and over as the threads see whether or
>> not it's closed (when, in fact, it is closed).
>
> the loop should probably check for closed().  the way it's written, using the zero
> as the termination condition, that would just be an optimization, but perhaps an important one.

i don't think this would help at all. checking for closed() doesn't
stop the channel receive, so 511 threads would still fail regardless.

if i'd wanted the program to work for arbitrary numbers of threads,
the only current solution is either to send each eof value explicitly
(not closing the channel) or to have another channel to signify eof
and have the receivers select.
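the second alternative (a separate channel to signify eof, with the receivers selecting on both) might look like this sketch; worker and fanOutSum are illustrative names. closing the done channel acts as a broadcast, because a receive from a closed channel never blocks, so any number of receivers see the shutdown:

```go
package main

import "fmt"

// worker drains values until the separate done channel is closed,
// then reports its running total on sum. No per-worker sentinel
// value is needed: close(done) wakes every worker at once.
func worker(values <-chan int, done <-chan struct{}, sum chan<- int) {
	total := 0
	for {
		select {
		case v := <-values:
			total += v
		case <-done:
			sum <- total
			return
		}
	}
}

// fanOutSum distributes vals across `workers` goroutines and returns
// the combined total once every worker has seen eof.
func fanOutSum(workers int, vals []int) int {
	values := make(chan int) // unbuffered: each send completes on receive
	done := make(chan struct{})
	sum := make(chan int)
	for i := 0; i < workers; i++ {
		go worker(values, done, sum)
	}
	for _, v := range vals {
		values <- v
	}
	close(done) // every value above has already been received
	total := 0
	for i := 0; i < workers; i++ {
		total += <-sum
	}
	return total
}

func main() {
	fmt.Println(fanOutSum(4, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})) // prints 55
}
```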

> > PS: I think that a race in the `range` over a channel is unacceptable. It
> > should just work.
>
> Indeed, but it can't with acceptable efficiency. Synchronization is
> like that sometimes.

is there really no way of providing an efficient means of simultaneously
receiving on a channel and checking for closed()?

it doesn't seem like a difficult problem, and that's all that's needed
to avoid the race.

Ian Lance Taylor

unread,
Nov 15, 2010, 1:06:50 PM11/15/10
to roger peppe, Rob 'Commander' Pike, James A. Robinson, golang-nuts
roger peppe <rogp...@gmail.com> writes:

> is there really no way of providing an efficient means of simultaneously
> receiving on a channel and checking for closed()?

Yes, we could invent an internal facility only available for the range
clause to make this work. The internal facility would return two
values: the value read from the channel and a boolean indicating whether
the channel is closed. If the channel is closed, then the value should
not be used.

I think that would provide this guarantee: multiple goroutines could use
range over a channel correctly, where correctly is defined as "each
value goes to exactly one goroutine, each value sent on the channel is
received exactly once, and no values not sent on the channel are
received."

I don't know whether this is a good idea, but I think it's worth opening
an issue for it.
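For what it's worth, this is the guarantee that range over a channel ended up providing in Go 1: each value goes to exactly one of the ranging goroutines, and every loop terminates cleanly at close. A sketch that exercises it (consumeAll is an illustrative name):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// consumeAll starts `workers` goroutines that all range over the
// same channel, sends `n` values, closes the channel, and returns
// how many values were delivered in total across all workers.
func consumeAll(workers, n int) int64 {
	c := make(chan int)
	var received int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range c { // each value delivered to exactly one worker
				atomic.AddInt64(&received, 1)
			}
		}()
	}
	for v := 0; v < n; v++ {
		c <- v
	}
	close(c) // all ranging loops end here
	wg.Wait()
	return received
}

func main() {
	fmt.Println(consumeAll(5, 100)) // prints 100: no lost or duplicated values
}
```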

Ian

roger peppe

unread,
Nov 15, 2010, 5:16:54 PM11/15/10
to Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts
On 15 November 2010 18:06, Ian Lance Taylor <ia...@google.com> wrote:
> roger peppe <rogp...@gmail.com> writes:
>
>> is there really no way of providing an efficient means of simultaneously
>> receiving on a channel and checking for closed()?
>
> Yes, we could invent an internal facility only available for the range
> clause to make this work.  The internal facility would return two
> values: the value read from the channel and a boolean indicating whether
> the channel is closed.  If the channel is closed, then the value should
> not be used.
>
> I think that would provide this guarantee: multiple goroutines could use
> range over a channel correctly, where correctly is defined as "each
> value goes to exactly one goroutine, each value sent on the channel is
> received exactly once, and no values not sent on the channel are
> received."

that's not quite right, because after close, zero values are received
that are not sent on the channel, but i like the general thrust.

> I don't know whether this is a good idea, but I think it's worth opening
> an issue for it.

i don't like the idea of it being internal and accessible only to range
statements, because it's often useful to transform

    for i := range c {
    }

into

    for {
        select {
        case i := <-c:
            if closed(c) {
                return
            }
        ... other cases
        }
    }

it's currently a mechanical translation and often
essential when you need to read on more than
one channel but still get range-like semantics.

i've said this a few times already (i don't think
it was my idea originally) but if the semantics
of multiple-assignment receive were changed
so that the second value represented closed rather
than "was received", then this would all fall out
quite naturally.

i, closed := <-c

so range on a channel would be equivalent to:

    for {
        i, closed := <-c
        if closed {
            break
        }
        ...
    }


it'd be a significant change to the language, and not easy
to automate the code transition, but still perhaps worth considering.
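in the comma-ok form Go eventually adopted (where the second value means "received", i.e. the inverse of the `closed` flag proposed here), the range-to-select transformation stays mechanical. a sketch with illustrative names:

```go
package main

import "fmt"

// merge shows the range-to-select transformation: range-like
// semantics on c (terminate when c is closed) while also serving a
// second channel. The comma-ok receive is atomic, so there is no
// race between receiving a value and detecting close.
func merge(c <-chan int, extra <-chan int, out chan<- int) {
	for {
		select {
		case i, ok := <-c:
			if !ok {
				close(out)
				return // c closed: end of the range-like loop
			}
			out <- i
		case i := <-extra:
			out <- -i
		}
	}
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	c <- 3
	close(c)
	out := make(chan int, 4)
	extra := make(chan int) // never sends in this example
	merge(c, extra, out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	fmt.Println(got) // prints [1 2 3]
}
```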

Steven

unread,
Nov 16, 2010, 8:53:53 AM11/16/10
to roger peppe, Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts

But now how do you distinguish between blocking and non-blocking receives?

chris dollin

unread,
Nov 16, 2010, 8:59:21 AM11/16/10
to Steven, roger peppe, Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts

Allow <- to return three values, quietly dropping on the floor those that
are not used.

Or add the <-- or <<- or <=- operator for "the other" receive operation.

Chris

--
Chris "allusive" Dollin

roger peppe

unread,
Nov 16, 2010, 9:03:38 AM11/16/10
to Steven, Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts
On 16 November 2010 13:53, Steven <stev...@gmail.com> wrote:
> But now how do you distinguish between blocking and non-blocking receives?

add a "default" clause to select - if such a clause is present, then
the select becomes non-blocking. it's syntactically heavier,
but non-blocking channel operations are rare enough that that
probably does not matter.
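this is the form select ended up taking in Go. a sketch of a non-blocking receive built from a default clause (tryRecv is an illustrative name):

```go
package main

import "fmt"

// tryRecv is a non-blocking receive: the default clause makes the
// select fall through immediately when no value is ready, instead
// of blocking on the channel.
func tryRecv(c <-chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		return 0, false // nothing buffered, no sender waiting
	}
}

func main() {
	c := make(chan int, 1)
	if _, ok := tryRecv(c); !ok {
		fmt.Println("empty") // prints "empty": channel has no value yet
	}
	c <- 9
	v, _ := tryRecv(c)
	fmt.Println(v) // prints 9
}
```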

roger peppe

unread,
Nov 16, 2010, 9:46:00 AM11/16/10
to chris dollin, Steven, Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts
On 16 November 2010 13:59, chris dollin <ehog....@googlemail.com> wrote:
>> But now how do you distinguish between blocking and non-blocking receives?
>
> Allow <- to return three values, quietly drooping on the floor those that
> are not used.

you could do this, but you'd have to make it also valid for <- to return two
values (the second value being "closed"); otherwise it's not possible
to do a blocking receive that also returns closed.

yy

unread,
Nov 16, 2010, 10:21:36 AM11/16/10
to roger peppe, Steven, Ian Lance Taylor, Rob 'Commander' Pike, James A. Robinson, golang-nuts
2010/11/16 roger peppe <rogp...@gmail.com>:

I like this idea. However, I think it would be better if the second
value meant "open" instead of "closed". It would be more consistent
with maps and the comma-ok idiom, and then you could close channels
with:

ch <- nil, false

as we do to delete map entries. This way we could get rid of both
built-ins, close() and closed().

--
- yiyus || JGL . 4l77.com

SnakE

unread,
Nov 16, 2010, 7:55:45 PM11/16/10
to Ian Lance Taylor, roger peppe, Rob 'Commander' Pike, James A. Robinson, golang-nuts
2010/11/15 Ian Lance Taylor <ia...@google.com>
roger peppe <rogp...@gmail.com> writes:

> is there really no way of providing an efficient means of simultaneously
> receiving on a channel and checking for closed()?

I don't know whether this is a good idea, but I think it's worth opening
an issue for it.

Issue 397 will celebrate its anniversary soon.