Because of the size of the data set, I'm wondering if it's safe to try
an approach where I have two goroutines writing/reading from a
temporary file, where the temporary file is acting as a buffer.
Are there gotchas w/re to opening a temp file (i.e., os.TempFile) and
trying to share it across two goroutines? Is that a bad idea? If it
is, do I have any alternative to the lock-step approach of (a) read all
data from the server, then (b) kick off goroutines to process the data?
Implementing a semaphore using channels could be an interesting task. Or if you're not in the mood, you can search this list--I'm sure semaphore implementations have been posted here before.
No need to keep a list of workers or anything as far as I can see
If order does matter, you could still do this: just write each incoming batch of data to its own temp file and send the file name down the channel to the workers.

This may be impractical if you get many tiny batches instead of a few big ones. In that case, I'd add one more goroutine:

The first goroutine downloads the data and writes it to the end of the temp file. It sends the file offset & length of the data down a channel to the second goroutine.

The second goroutine reads from the temp file and distributes the data to worker goroutines via a second channel.
From the OP it follows that there are about 40 million data units to be processed, and downloading is much faster than processing. You'll end up with tens of millions of offset/length pairs in your channel. I'll be surprised if it works.
On Wed, Nov 10, 2010 at 11:51 AM, SnakE <snake...@gmail.com> wrote:
> From the OP it follows that there are about 40 million data units to be
> processed, and downloading is much faster than processing. You'll end up
> with tens of millions of offset/length pairs in your channel. I'll be
> surprised if it works.

Missed that; well, the first channel is just a convenience. The first goroutine writes lines at the end, the second one reads starting from the beginning. Without the channel it has to pay some attention to what it is reading, but that's not a big deal. The rest of the design still works.
On Wed, Nov 10, 2010 at 12:59 PM, SnakE <snake...@gmail.com> wrote:
> 2010/11/10 Daniel Smith <dan...@lukenine45.net>
>> Missed that; well, the first channel is just a convenience. First
>> goroutine writes lines at the end, second one reads starting from the
>> beginning. Without the channel it has to pay some attention to what it
>> is reading but that's not a big deal. The rest of the design still works.
>
> Please explain how to read new lines of data from a huge file as they
> arrive without sitting in a busy loop.
Processing is super slow compared to writing to the file, so that won't be a problem--you'll never catch up and have to wait.
i wrote a package that implements the buffering part.
http://code.google.com/p/rog-go/source/browse/breader/breader.go
then you can write the actual processing code in
a more natural style, something like this, perhaps:
package main

import (
	"rog-go.googlecode.com/hg/breader"
	"os"
	"log"
	"bufio"
	"fmt"
)

const nProcs = 4

func main() {
	br, err := breader.NewReader(os.Stdin, "/tmp", "breader")
	if err != nil {
		log.Exitln("newreader", err)
	}
	lineIn := make(chan string)
	lineOut := make(chan string)
	procDone := make(chan bool)
	outDone := make(chan bool)
	go output(lineOut, outDone)
	for i := 0; i < nProcs; i++ {
		go processor(lineIn, lineOut, procDone)
	}
	bufr := bufio.NewReader(br)
	for {
		s, err := bufr.ReadString('\n')
		if len(s) > 0 {
			lineIn <- s
		}
		if err != nil {
			break
		}
	}
	close(lineIn)
	for i := 0; i < nProcs; i++ {
		<-procDone
	}
	close(lineOut)
	<-outDone
}

func processor(lineIn <-chan string, lineOut chan<- string, done chan<- bool) {
	for {
		// NB can't use range because there are multiple
		// readers on the same channel.
		s := <-lineIn
		if len(s) == 0 {
			break
		}
		// process the line
		lineOut <- fmt.Sprintf("%d\n", len(s))
	}
	done <- true
}

func output(lineOut <-chan string, done chan<- bool) {
	for s := range lineOut {
		fmt.Print(s)
	}
	done <- true
}
> On Nov 11, 4:06 am, roger peppe <rogpe...@gmail.com> wrote:
>>
>> func processor(lineIn <-chan string, lineOut chan<-string, done chan<- bool) {
>> for {
>> // NB can't use range because there are multiple readers on the same channel.
>> s := <-lineIn
>> if len(s) == 0 {
>> break
>> }
>> // process the line
>> lineOut <- fmt.Sprintf("%d\n", len(s))
>> }
>> done <- true
>>
>> }
>
> Could someone expand on the warning in this code snippet about using
> range when there are multiple readers? I assume a race condition
> problem exists, but I don't follow what that is based on documentation
> for range:
>
> http://golang.org/doc/go_spec.html#Select_statements
>
> 4. For channels, the iteration values produced are the successive
> values sent on the channel until the channel is closed; it does
> not produce the zero value sent before the channel is closed
> (§close and closed).
Range loops terminate when the channel closes, but if multiple readers exist, there is a race between two readers seeing the channel close and getting the last value. For instance, one reader could get a zero value caused by a closed channel but believe it's a legitimate value because *its* call to closed() said 'false'.
> I'd also like to ask if someone could clarify the interaction between
> the code above and the close(lineIn) from main:
>
> bufr := bufio.NewReader(br)
> for {
> s, err := bufr.ReadString('\n')
> if len(s) > 0 {
> lineIn <- s
> }
> if err != nil {
> break
> }
> }
> close(lineIn)
>
> If there is small amount of input data, only a few lines, and for some
> reason the execution spun off many threads, won't this potentially
> cause a panic as many of threads end up testing
>
> s := <-lineIn
>
> after the channel has been closed?
The len(s) test does the job. If it's zero, that means the channel is closed. This avoids the race.
-rob
Ah, ok. That makes sense, thank you.
>> If there is small amount of input data, only a few lines, and for some
>> reason the execution spun off many threads, won't this potentially
>> cause a panic as many of threads end up testing
>>
>> s := <-lineIn
>>
>> after the channel has been closed?
>
> The len(s) test does the job. If it's zero, that means the channel is closed. This avoids the race.
If the number of threads is wildly out of proportion to the number of
lines of input, the program below terminates with a "too many
operations on a closed channel" panic on the

s := <-c1

So while 510 threads work, 511 threads fail. I assume this is
because we're reading c1 over and over as the threads check whether or
not it's closed (when, in fact, it is closed).
While I can certainly also check that somebody didn't specify an
idiotic number of threads, I'm wondering if I might ever hit a
situation where I really can't know whether or not it's safe to limit
the threads, and might have to deal with having more threads than I
have input for.
So should this be running a closed(c1) test *before* reading c1, and
then checking (s == "")?
-- Jim
package main

import (
	"fmt"
	"strconv"
)

func main() {
	threads := 511 // thread count: 511 crashes, 510 works.
	inputs := 100  // number of ints to send
	c1 := make(chan string)
	c2 := make(chan string)
	d1 := make(chan bool)
	d2 := make(chan bool)
	go output(c2, d2)
	for i := 0; i < threads; i++ {
		go worker(c1, c2, d1)
	}
	for i := 1; i <= inputs; i++ {
		c1 <- strconv.Itoa(i)
	}
	close(c1)
	for i := 0; i < threads; i++ {
		<-d1
	}
	close(c2)
	<-d2
}

func worker(c1 <-chan string, c2 chan<- string, done chan<- bool) {
	for {
		s := <-c1
		if len(s) == 0 {
			break
		}
		c2 <- s
	}
	done <- true
}

func output(c <-chan string, done chan<- bool) {
	for s := range c {
		fmt.Println(s)
	}
	done <- true
}
> On Fri, Nov 12, 2010 at 15:53, Rob 'Commander' Pike <r...@google.com> wrote:
>>
>> Range loops terminate when the channel closes, but if multiple
>> readers exist, there is a race between two readers seeing the
>> channel close and getting the last value. For instance, one reader
>> could get a zero value caused by a closed channel but believe it's a
>> legitimate value because *its* call to closed() said 'false'.
>
> Ah, ok. That makes sense, thank you.
>
>>> If there is small amount of input data, only a few lines, and for some
>>> reason the execution spun off many threads, won't this potentially
>>> cause a panic as many of threads end up testing
>>>
>>> s := <-lineIn
>>>
>>> after the channel has been closed?
>>
>> The len(s) test does the job. If it's zero, that means the channel is closed. This avoids the race.
>
>
> If the number of threads is wildly out of proportion of the lines of
> input, the program below terminates with a "too many operations on a
> closed channel" on the
>
> s := <- c1
>
> So while 510 threads works, 511 threads fails. I assume this is
> because we're reading c1 over and over as the threads see whether or
> not it's closed (when, in fact, it is closed).
the loop should probably check for closed(). the way it's written, using the zero as the termination condition, that would just be an optimization, but perhaps an important one.
-rob
That is not a fix. It doesn't even help.
> Here, there is no race in the `range` expression because the channel is
> never closed. Any number of workers can be closed correctly because the
> invalid value is kept in the channel for those who come and check later.
> One gotcha here: you use unbuffered, synchronous channels. This means that
> the last worker will block on the `c1 <- s` line forever because nobody else
> is reading the channel. This is easily avoided: just make the c1 a buffered
> channel with a capacity of one:
> c1 := make(chan string, 1)
> Hope this helps.
> PS: I think that a race in the `range` over a channel is unacceptable. It
> should just work.
Indeed, but it can't with acceptable efficiency. Synchronization is
like that sometimes.
-rob
On Fri, Nov 12, 2010 at 6:04 PM, SnakE <snake...@gmail.com> wrote:
> That is not a fix. It doesn't even help.
> One way of optimizing this safely is, instead of closing the channel, put an
> invalid value into it:
> c1 <- ""
> Then in the worker, terminate if `len(s) == 0` as it is now, *but* return
> that invalid value into the channel before termination:
> func worker(c1 chan string, c2 chan<- string, done chan<- bool) {
> for s := range c1 {
> if len(s) == 0 {
> c1 <- s
> break
> }
> c2 <- s
> }
> done <- true
> }
>
> PS: I think that a race in the `range` over a channel is unacceptable. It
> should just work.

Indeed, but it can't with acceptable efficiency. Synchronization is
like that sometimes.
On Nov 13, 2:26 am, SnakE <snake.sc...@gmail.com> wrote:
> > > PS: I think that a race in the `range` over a channel is
> > > unacceptable. It should just work.
> >
> > Indeed, but it can't with acceptable efficiency. Synchronization
> > is like that sometimes.
>
> Maybe it's just me, but I expect every language feature to work as
> specified. If it doesn't, it's an implementation bug. Nowhere in
> the specs is the race mentioned. So either the specs should be
> fixed or the implementation. It may actually be worth it to redesign
> the close()/closed() concept.

Well, that's not really fair. The close/closed functions are working
as specified:

For channels, the iteration values produced are the successive values sent on the channel until the channel is closed; it does not produce the zero value sent before the channel is closed.

This is just like someone writing a concurrent program where two threads
are dealing with checking and inserting into a map, right?
i don't think this would help at all. checking for closed() doesn't
stop the channel receive, so 511 threads would still fail regardless.
if i'd wanted the program to work for arbitrary numbers of threads,
the only current solution is either to send each eof value explicitly
(not closing the channel) or to have another channel to signify eof
and have the receivers select.
> > PS: I think that a race in the `range` over a channel is unacceptable. It
> > should just work.
>
> Indeed, but it can't with acceptable efficiency. Synchronization is
> like that sometimes.
is there really no way of providing an efficient means of simultaneously
receiving on a channel and checking for closed()?
it doesn't seem like a difficult problem, and that's all that's needed
to avoid the race.
> is there really no way of providing an efficient means of simultaneously
> receiving on a channel and checking for closed()?
Yes, we could invent an internal facility only available for the range
clause to make this work. The internal facility would return two
values: the value read from the channel and a boolean indicating whether
the channel is closed. If the channel is closed, then the value should
not be used.
I think that would provide this guarantee: multiple goroutines could use
range over a channel correctly, where correctly is defined as "each
value goes to exactly one goroutine, each value sent on the channel is
received exactly once, and no values not sent on the channel are
received."
I don't know whether this is a good idea, but I think it's worth opening
an issue for it.
Ian
that's not quite right, because after close, zero values are received
that are not sent on the channel, but i like the general thrust.
> I don't know whether this is a good idea, but I think it's worth opening
> an issue for it.
i don't like the idea of it being internal and accessible only to range
statements, because it's often useful to transform
for i := range c {
}

into

for {
	select {
	case i := <-c:
		if closed(c) {
			return
		}
	... other cases
	}
}
it's currently a mechanical translation and often
essential when you need to read on more than
one channel but still get range-like semantics.
i've said this a few times already (i don't think
it was my idea originally) but if the semantics
of multiple-assignment receive were changed
so that the second value represented closed rather
than "was received", then this would all fall out
quite naturally.
i, closed := <-c
so range on a channel would be equivalent to:
for {
	i, closed := <-c
	if closed {
		break
	}
	...
}
it'd be a significant change to the language, and not easy
to automate the code transition, but still perhaps worth considering.
But now how do you distinguish between blocking and non-blocking receives?
Allow <- to return three values, quietly dropping on the floor those that
are not used.
Or add the <-- or <<- or <=- operator for "the other" receive operation.
Chris
--
Chris "allusive" Dollin
add a "default" clause to select - if such a clause is present, then
the select becomes non-blocking. it's syntactically heavier,
but non-blocking channel operations are rare enough that that
probably does not matter.
you could do this, but you'd have to make it also valid for <- to return two
values (the second value must be "closed") otherwise it's not possible
to do a blocking receive that also returns closed.
I like this idea. However, I think it would be better if the second
value meant "open" instead of "closed". It would be more consistent
with maps and the comma-ok idiom, and then you could close channels
with:
ch <- nil, false
as we do to delete map entries. This way we could get rid of both
built-ins, close() and closed().
--
- yiyus || JGL . 4l77.com