On Thu, Aug 27, 2015, 15:50 Enric Lafont <enr...@gmail.com> wrote:
I need to know the number of goroutines available in a WaitGroup, but there is no Count() method or any other way to obtain this information from the WaitGroup structure.
WaitGroup has no Count method because, in the general case, there's no non-racy way to provide that number: it may be invalidated the moment you know it. A racy value obtained in any other way is therefore not very useful.
-j
Thanks for answering me.
The solution you propose is quite similar to the one I'm using, except that you keep a manual count, which must equal the number wg.Count() would return.
What I'm doing is processing a badly organized directory on our NAS server; it has 60 million files and 350,000 subdirectories.
The program I'm refactoring is an old PHP script that takes more than 4 days to complete. It's badly written and runs slowly, and since the problem is highly concurrent by nature, Go seems a perfect fit for me. Well, I would have rewritten it in Go even if the problem were not concurrent ... :)
The main problem is that a goroutine for every file overloads the machine, and in the end Go complains about too many open files, with "fs.file-max = 4033344". When the number of goroutines approaches a million, the program crashes.
I cannot overload the machine: it's a server running business processes, and the same goes for our NAS, so I need to limit the number of simultaneous goroutines. But when I limit that number, I find that some subdirectories are nested deeper than the maximum number of goroutines I'm allowed to run; the feeder channel fills up and the program stops working. I needed a way to keep the goroutines in order, and that way is to limit their number but allow more when the depth of the tree structure requires it, something like:
I:
	for { // Repeat until fullFile name has space in dirch
		select {
		case dirch <- fullFile:
			fmt.Printf("Queue: %3d, %3d File: %v \r", len(dirch), wg.Count(), fullFile)
			if wg.Count() < uint32(len(dirch)) { // len(dirch) is 256 now
				go readDirectory(dirch, wr, wg) // The goroutine accepts an input read channel, an output write channel, and a waitgroup
				runtime.Gosched()               // Give time to other goroutines
			}
			break I
		default: // Done when channel capacity is full
			runtime.Gosched() // Give time to reduce channel pressure
			if len(dirch) == cap(dirch) { // if none of the existing routines solved the problem, then
				go readDirectory(dirch, wr, wg) // run extra routine to relieve channel capacity
			}
		}
	}
This currently works. Being able to count the goroutines lets me start more of them when the need arises, and the goroutines die when the queue (dirch) becomes empty, so I can keep the load on the system as low as possible. With this approach I see a peak of ~900 goroutines for a while and an average of 150 goroutines running concurrently; the load on the system is about 40%.
Thanks again for your time.
On Fri, Aug 28, 2015, 11:04 Enric Lafont <enricl@gmail.com> wrote:
If the need for Count() is actually just a need to limit a number of things, for example goroutines, then use a buffered channel.
limiter := make(chan struct{}, limit)
Before creating a new thing (starting a new goroutine)
limiter <- struct{}{}
Before destroying a thing (returning from a goroutine)
<-limiter
Such a limiter is synchronized and safe for concurrent use.
-j
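import (
	"sync"
	"sync/atomic"
)

type CountWG struct {
	sync.WaitGroup
	Count atomic.Int64 // May be stale by the time it is read; only for info logging. Needs Go 1.19+.
}

// Increment on Add. Pointer receiver: a value receiver would update a copy of the struct.
func (cg *CountWG) Add(delta int) {
	cg.Count.Add(int64(delta))
	cg.WaitGroup.Add(delta)
}

// Decrement on Done
func (cg *CountWG) Done() {
	cg.Count.Add(-1)
	cg.WaitGroup.Done()
}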
I too wanted a way to see the number and was OK with a count that is not reliable.
I just wanted to log the count from time to time, and on SIGTERM ("Signal received, waiting on %d jobs.").