Case for Cond.WaitTimeout()

343 views
Skip to first unread message

Roman Leventov

unread,
Apr 13, 2021, 6:53:01 AM4/13/21
to golang-nuts
I'm trying to implement a simple Connection type:

type Connection {
  // Puts a batch of frames into a queue which can later be read.
  // This method is called only from a single goroutine.
  PutFrames(frames []*Frame)
  // Returns a frame from the queue if present, with timeout.
  // This method is called only from a single goroutine.
  Read(timeout time.Duration) *Frame
}

I cannot change this interface.

The simplest solution that I've come up with is the following:

```
type Frame struct{}

type Connection struct {
sem    *semaphore.Weighted // from http://golang.org/x/sync/semaphore
mu     sync.Mutex
frames []*Frame
}

func NewConnection() *Connection {
return &Connection{
sem: semaphore.NewWeighted(1),
}
}

func (c *Connection) getFrames() []*Frame {
c.mu.Lock()
defer c.mu.Unlock()
return c.frames
}

// Can be called only from one goroutine.
func (c *Connection) Read(timeout time.Duration) *Frame {
if timeout < 0 {
return nil
}
start := time.Now()
c.mu.Lock()
if len(c.frames) > 0 {
f := c.frames[0]
c.frames = c.frames[1:]
if len(c.frames) == 0 {
// Drain the permit if present.
_ = c.sem.TryAcquire(1)
}
c.mu.Unlock()
return f
}
c.mu.Unlock()
if !c.awaitFrames(timeout) {
return nil
}
return c.Read(timeout - time.Since(start))
}

func (c *Connection) awaitFrames(timeout time.Duration) bool {
done := make(chan struct{})
go func() {
_ = c.sem.Acquire(context.Background(), 1)
close(done)
}()
select {
case <-time.After(timeout):
return false
case <-done:
return true
}
}

// Can be called only from one goroutine.
func (c *Connection) PutFrames(frames []*Frame) {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.frames) == 0 {
c.sem.Release(1)
}
c.frames = append(c.frames, frames...)
}
```

The problem with this solution is that it's very complex (let alone inefficient, but I don't care about performance in this case). Also, note that this solution only works because PutFrames() and Read() are only ever called from single goroutines. I'm afraid I would need to hack into Semaphore implementation to make it work for more concurrent callers. 

If sync.Cond.WaitTimeout() method existed, the code could be made much simpler.

I don't understand what Andrew Gerrard suggested in this comment: https://github.com/golang/go/issues/9578#issuecomment-69700942 and how it is supposed to work. In his snippet, cond.Wait() is called when the mutex is not locked.

Questions:
 - Are there any bugs in my solution?
 - Are there are ways to make my solution simpler?
 - Is it a good case for justifying adding Cond.WaitTimeout()?

Jesper Louis Andersen

unread,
Apr 13, 2021, 7:28:13 AM4/13/21
to Roman Leventov, golang-nuts
On Tue, Apr 13, 2021 at 12:53 PM Roman Leventov <leven...@gmail.com> wrote:
type Frame struct{}

type Connection struct {
sem    *semaphore.Weighted // from http://golang.org/x/sync/semaphore
mu     sync.Mutex
frames []*Frame
}


My immediate hunch when reading your code was:

type Connection struct {
    frames chan *Frame
}

However, I'm also pretty sure it breaks down in your case. But it might be good as an exploratory naive question: why wouldn't a channel work in your case? A Put is to send to the channel. A read is to receive from the channel. The timeout is handled by a select { ... }, like in your code, more or less. Granted, you'll have to drain the channel etc, but you explicitly said the efficiency wasn't that important. And channels tend to be easier to reason about. My hunch is also this is easier to get a good semantics for when there are more than 2 goroutines involved.

Brian Candler

unread,
Apr 13, 2021, 8:43:13 AM4/13/21
to golang-nuts
Bryan Mill's presentation on go concurrency patterns is well worth watching all the way through:

There are lots of code snippets which are directly relevant to what you're trying to do, e.g. from 20:48 is a queue with Get, and 21:59 a queue with GetMany.  He starts with the traditional semaphore approach and then shows a much cleaner and simpler way of doing it.

The biggest revelation I got was: instead of having a data structure protected by a mutex, you can stuff a data structure into a one-element buffered channel.  When you need to use it: pop it out, modify it, push it back in.  Very easy to reason about.

Roman Leventov

unread,
Apr 13, 2021, 9:00:33 AM4/13/21
to Brian Candler, golang-nuts
Jesper, a single channel works if I provision it with big enough capacity (1000), but this feels like an antipattern which can break. Still, this is probably the most practical option.

Brian, thanks for sharing, yes this pattern solves my problem much simpler than I have. Although it doesn't feel right that achieving something that feels like basic concurrent programming turns into Koan solving.

--
You received this message because you are subscribed to a topic in the Google Groups "golang-nuts" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/golang-nuts/bNF_KXUiKHs/unsubscribe.
To unsubscribe from this group and all its topics, send an email to golang-nuts...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/7b198a7b-daaf-435b-930f-bb318dd838e3n%40googlegroups.com.

Robert Engels

unread,
Apr 13, 2021, 9:21:17 AM4/13/21
to Roman Leventov, Brian Candler, golang-nuts
Have the channel hold a slice of objects and process them as a batch. Batch in - batch out. 

Typically unless your traffic is bursty - a faster producer than consumer will either cause blocking, data loss (drops) or failure (oom). 

On Apr 13, 2021, at 8:00 AM, Roman Leventov <leven...@gmail.com> wrote:


You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/CAAMLo%3DYqkBjgz7tL13GE9_hetP79JG-q-PW5ze6UmXVE%3DYU%3DQQ%40mail.gmail.com.

Jesper Louis Andersen

unread,
Apr 14, 2021, 6:12:55 AM4/14/21
to Roman Leventov, Brian Candler, golang-nuts
On Tue, Apr 13, 2021 at 3:00 PM Roman Leventov <leven...@gmail.com> wrote:
Jesper, a single channel works if I provision it with big enough capacity (1000), but this feels like an antipattern which can break.

Yes.

On the flip side though, unbounded channels are also dangerous if the producer(s) outpaces consumers. In a real system, you'll have to decide on an in-flight limit anyway, if you want your system to be stable. If you don't want a bound, just set the capacity at 1_000_000_000_000 or some number that's larger than the available RAM for the foreseeable future.

Personally, I'd probably go with the flow control pattern.

Also, the idea of using a checkout/checkin pattern on a 1-capacity channel circumvents most of these considerations.

Bryan C. Mills

unread,
Apr 15, 2021, 10:17:04 AM4/15/21
to golang-nuts
For what it's worth, I would argue that the 1-buffered channel pattern in Go is “basic concurrent programming”. 1-buffered channels are used as basic building blocks throughout the standard library — consider time.Ticker or signal.Notify — and for good reason. A 1-buffered channel is a very simple and versatile concurrent container.

For a good analogy, you can think of it like a physical transaction drawer or secure package receiver or lock box. The container may have a value in it (say, a bank deposit bag), but in order to access that value one party (perhaps the teller) needs to open the container and remove its contents, and only one party may do so at a time. That party may update the contents (perhaps removing the deposit from the bag), and then put the updated value (the empty deposit bag) back in the container. The nature of the container ensures that only one party can manipulate the item at a time.
Reply all
Reply to author
Forward
0 new messages