Re: [go-nuts] Multiple goroutines with quit channels, some messages are lost

Kyle Lemons

Feb 14, 2013, 1:36:38 PM
to Mika Boström, golang-nuts
A few things:

1) Quit channels are typically implemented with a single channel given out to all listeners; then the "quit" message is sent by closing the channel.
2) Have you sent SIGQUIT (Ctrl+\) to the app while it is hung? You will get a nice stack trace showing which goroutines are alive, where each one is blocked, and on what.
3) I suspect you have a communication loop: a goroutine is trying to send a message to the goroutine that is sending the quits, so they are blocked sending to each other.

The most direct way to fix it is probably something like:
func primary() (err error) {
  quit := make(chan bool)
  data := make(chan []string)
  errch := make(chan error, NumSecondary) // each child can send at most one error

  var wg sync.WaitGroup
  for i := 0; i < NumSecondary; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      secondary(data, errch, quit)
    }()
  }
  go func() {
    wg.Wait()
    close(data)
  }()

  for {
    select{
    case s, ok := <-data:
      if !ok {
        return err
      }
      process(s)
    case err = <-errch: // got an error, signal children to exit
      log.Printf("error: %s", err)
      close(quit)
      errch = nil // don't get any more errors (would cause double close)
    }
  }
  // The for loop never exits normally, so nothing is needed here (Go 1.0 required a trailing panic("unreachable")).
}

func secondary(data chan []string, errch chan error, quit chan bool) {
  ready := time.Tick(1*time.Second) // just used for demonstration
  for {
    select {
    case <-quit:
      return
    case <-ready:
      s, err := getData() // just used for demonstration
      if err != nil {
        errch <- err
        return
      }
      data <- s
    }
  }
}

It looks like you have lots of readers pumping into a single writer, though, which doesn't really match your description. In a setup like that, all of the readers should generally fail if any of them fails, so you don't need the quit channel in the first place. Combining a blocking operation with an asynchronous quit signal doesn't really work: a goroutine blocked on a plain send or receive cannot notice the quit until that operation completes.


On Thu, Feb 14, 2013 at 9:29 AM, Mika Boström <limb...@gmail.com> wrote:
I'm seeing some odd behaviour when I try to signal multiple (eternally looping) goroutines to quit. When I tell a number of goroutines to quit, only some of them receive the message. The "quit" signal is sent through dedicated channels, one per goroutine.

Some background. First of all, this is my "learn some Go" project. I'm feeding data to carbon (the Graphite backend), which simply listens on a socket and accepts lines of text. There is no other protocol; the data is just sucked in silently. When logs are rotated, carbon gets rehashed, and later writes to the existing connection either time out or fail with a broken pipe error.

I want to react to the failed write, shut down collector routines, reconnect and put the routines back to work.

What happens in reality:
1) remote end shuts down
2) app sees a failed write
3) app sends a message over all the quit channels, by calling mass_quit() function
4) some of the goroutines receive the message and exit gracefully, but not all; those that do exit decrement a sync.WaitGroup counter
5) the app hangs indefinitely in mass_quit(), apparently blocking on the channel send

Code says more than any description, so I have put a slightly reduced version at http://pastie.org/6165938 (buggy.go). With that code I can reproduce the hang at the third goroutine. While I do want to find a solution for this, I would also _really_ like to understand why this happens in the first place.

Easiest way to reproduce with buggy.go: run "nc -l -p 2009" in one terminal, buggy in another. After a couple of received bursts, kill nc. Buggy should react in a few seconds and exhibit the behaviour I've outlined above.

Any help and hints welcome. (Yes, including "redesign, that's just wrong[tm]".)


steve wang

Feb 14, 2013, 1:38:26 PM
to golan...@googlegroups.com
I guess some of your goroutines are blocked while trying to send a []string over the channel, in this line of buggy.go:
line 28: c <- stuff

My suggestion is that you change the implementation of do_stuff to something like:
func do_stuff(c chan []string, quit chan int, wg *sync.WaitGroup) {
  for {
    n := 1 + (rand.Int() % 6)
    stamp := time.Now().Unix()
    var stuff = make([]string, n)
    for i := range stuff {
      stuff[i] = fmt.Sprintf("chan-id=%x, (#%d), timestamp = %d",
        quit, i+1, stamp)
    }
    // Wait for whichever comes first: a quit signal or a receiver for stuff.
    // A plain "c <- stuff" would block forever once nobody reads c.
    select {
    case <-quit:
      log.Printf("Ach, 'tis time to go.")
      wg.Done()
      return
    case c <- stuff:
      time.Sleep(5 * time.Second) // quit is only noticed after this sleep
    }
  }
}

bryanturley

Feb 14, 2013, 1:39:49 PM
to golan...@googlegroups.com
Hmmm...

I ported it to the Go playground:
http://play.golang.org/p/hHZu2IfHNW

But it seems to work there?

I replaced the TCP connection with a chan []byte and made the write error random, since I am fairly certain the net package doesn't work on play.golang.org.

What is your GOMAXPROCS? I think play.golang.org is set to 1.

I'm not sure you need the WaitGroup if you also have a quit chan int that isn't buffered: an unbuffered send completes only once the receiving goroutine has taken the value.


Instead of having something like

func launch(routine func(chan []string, chan int, *sync.WaitGroup), ch chan []string, wg *sync.WaitGroup) chan int {


you could define a type

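type R struct {
  ch   chan []string
  quit chan int
  wg   *sync.WaitGroup
}

func NewR(ch chan []string, wg *sync.WaitGroup) *R {
  r := &R{ch: ch, quit: make(chan int), wg: wg}
  // ... any further setup of r ...
  go r.Run()
  return r
}

func (r *R) Run() {
  // ... the worker loop (the body of do_stuff), using r.ch, r.quit and r.wg ...
}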

This might be easier to read. Just be aware that r is used inside that goroutine and possibly elsewhere in your code, so coordinate through channels or other race-free mechanisms, which you already seem to be doing.

Now launch is just NewR()

Peter

Feb 14, 2013, 1:46:04 PM
to golan...@googlegroups.com
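I think that when you call mass_quit you stop receiving from comm. A do_stuff goroutine can then get stuck blocking on its send to comm, so it never receives its quit message, and mass_quit blocks sending on that goroutine's quit channel.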

steve wang

Feb 14, 2013, 1:54:18 PM
to golan...@googlegroups.com
Agreed; that's what I thought too.
I have given an alternative implementation of do_stuff in my previous post.

John Nagle

Feb 15, 2013, 2:12:29 AM
to golan...@googlegroups.com
On 2/14/2013 10:54 AM, steve wang wrote:
> Agreed. Just the same as I thought.
> I have given an alternative implementation of do_stuff in my previous post.

Here's a simpler version that uses a single quit channel, which is only ever closed.

http://play.golang.org/p/qkBKKcrgOz

Close is the only broadcast operation on a channel, so if you need a one-to-many send that happens once, it's the way to go. There are no worries about deadlock because close never blocks (just make sure the channel is closed exactly once; closing it twice panics).

This is a fairly standard cleanup pattern - channel close signals
shutdown, and a wait group counts down until everybody has finished.

John Nagle



Mika Boström

Feb 15, 2013, 7:38:59 AM
to golan...@googlegroups.com
Okay, that was a nice solution. The goroutines were indeed blocked on send, which the SIGQUIT dump confirmed (thanks, Kyle).


On Thursday, 14 February 2013 20:38:26 UTC+2, steve wang wrote:
My suggestion is that you change the implementation of do_stuff, which may be:
func do_stuff(c chan []string, quit chan int, wg *sync.WaitGroup) {
  for {
    n := 1 + (rand.Int() % 6)
    stamp := time.Now().Unix()
    var stuff = make([]string, n)
    for i := range stuff {
      stuff[i] = fmt.Sprintf("chan-id=%x, (#%d), timestamp = %d",
        quit, i+1, stamp)
    }
    select {
    case <-quit:
      log.Printf("Ach, 'tis time to go.")
      wg.Done()
      return
    case c <- stuff:
      time.Sleep(5 * time.Second)

That is a beautiful trick. I wasn't aware we could select on a "data sent" condition. The other surprising bit was that there is no need for a 'default' case. Amusingly, an empty default block caused the program to busy-loop, which in hindsight makes sense: "There's nothing to do, so let's do it again!"

These changes were enough to make a graceful exit + restart work as expected. I'll try out all the other suggestions too. Refactoring collectors behind structs sounds like a nice approach and might make the code more manageable in the long run. But first I'll see if I can get the thing working with a single shared quit channel. That would be a decent simplification.

Thank you, guys.
 

Mika Boström

Feb 15, 2013, 7:54:05 AM
to golan...@googlegroups.com
On Thursday, 14 February 2013 20:39:49 UTC+2, bryanturley wrote:
what is your GOMAXPROCS?  I think play.golang.org is set to 1.

I hadn't thought of that. All my runs have been on SMP systems, so I tried again with runtime.GOMAXPROCS(1) and still got the same hang. Reorganising the worker logic solved the problem.

I'll probably refactor my code to use your "worker-struct" approach in near future, though.