A generic lock-free implementation of the "Token-Bucket" algorithm

Tomás Senart

Feb 3, 2014, 9:30:34 AM
to golan...@googlegroups.com
This package provides a generic lock-free implementation of the "Token bucket" algorithm where the handling of non-conformity is left to the user. Read more about it on Wikipedia: http://en.wikipedia.org/wiki/Token_bucket
I would love to get some constructive feedback, please! https://github.com/tsenart/tb

Tomás

Donovan Hide

Feb 3, 2014, 10:10:05 AM
to Tomás Senart, golang-nuts
Interesting take on rate limiting per connection. I imagine real-world scenarios would involve expiring connections that haven't been used in a while. Perhaps this could be added to your API? It would also be interesting to benchmark a goroutine per connection filling the bucket against a single goroutine managing the filling of all buckets, for a large number of connections.





Tomás Senart

Feb 3, 2014, 11:51:12 AM
to golan...@googlegroups.com, Tomás Senart
If you look closer, you can see that this package is completely decoupled from the concept of connections. The Take and Throttle methods take arbitrary quantities (messages, connections, bytes, packets, whatever) as the in argument, thus the "generic" part of the package description.

Nick Craig-Wood

Feb 3, 2014, 12:40:06 PM
to Tomás Senart, golan...@googlegroups.com
On 03/02/14 14:30, Tomás Senart wrote:
> This package provides a generic lock-free implementation of the "Token
> bucket" algorithm where the handling of non-conformity is left to the
> user. Read more about in this Wikipedia page
> <http://en.wikipedia.org/wiki/Token_bucket>.
> I would love to get some constructive feedback,
> please! https://github.com/tsenart/tb

Looks very useful! Here are some notes I made on a quick code review...

....

I don't like your example which uses a global variable DefaultThrottler
- I think you should get rid of that and get the user to make their own.

....


IMHO locks look nicer when anonymous

type Throttler struct {
    mu      sync.RWMutex
    buckets map[string]*Bucket
}

t.mu.RLock()
b := t.buckets[key]
t.mu.RUnlock()

Becomes

type Throttler struct {
    sync.RWMutex
    buckets map[string]*Bucket
}

t.RLock()
b := t.buckets[key]
t.RUnlock()

....

Your Stop() methods are leaking go routines - from the docs
http://golang.org/pkg/time/#Ticker.Stop

Stop turns off a ticker. After Stop, no more ticks will be sent. Stop
does not close the channel, to prevent a read from the channel
succeeding incorrectly.

So you'll want to close the channel too in order to stop the go routines
probably. The documentation should state that a go routine is started
and you'll need to call Stop() to stop it.

....

It isn't clear to me from the docs exactly what happens when there
aren't enough tokens in the bucket. Do those tokens get taken out the
bucket or not? I would hope not, but I'm not sure from this.

// Take will take n tokens out of the bucket. If there aren't enough
// tokens, the difference is returned. Otherwise n is returned.
// This method is thread-safe.
func (b *Bucket) Take(n int64) int64 {

I would have thought a better API would be something like this

// Take will attempt to take n tokens out of the bucket. If there
// aren't enough in the bucket it returns false, otherwise it removes
// the tokens from the bucket and returns true.
// This method is thread-safe.
func (b *Bucket) Take(n int64) bool {


--
Nick Craig-Wood <ni...@craig-wood.com> -- http://www.craig-wood.com/nick

Tomás Senart

Feb 3, 2014, 12:56:37 PM
to golan...@googlegroups.com, Tomás Senart
> IMHO locks look nicer when anonymous

I generally agree with this whenever I am not writing a library, but that would expose the lock publicly, which I don't want.


....

> Your Stop() methods are leaking go routines - from the docs
> http://golang.org/pkg/time/#Ticker.Stop
>
> Stop turns off a ticker. After Stop, no more ticks will be sent. Stop
> does not close the channel, to prevent a read from the channel
> succeeding incorrectly.
>
> So you'll want to close the channel too in order to stop the go routines
> probably. The documentation should state that a go routine is started
> and you'll need to call Stop() to stop it.

Ohhh... Thanks for this one. I thought Stop closed the channel.
I will change and add the documentation you suggested.
 

....

> It isn't clear to me from the docs exactly what happens when there
> aren't enough tokens in the bucket. Do those tokens get taken out the
> bucket or not? I would hope not, but I'm not sure from this.

If you have 0 tokens in the Bucket and you try to "take" any number of tokens, 0 tokens will be taken.
If you have 10 tokens in the Bucket and you "take" 5 tokens then 5 tokens will be taken.
If you have 10 tokens in the Bucket and you try to "take" 15 then only 10 tokens will be taken.

How do you suggest the documentation to reflect this better?


> // Take will take n tokens out of the bucket. If there aren't enough
> // tokens, the difference is returned. Otherwise n is returned.
> // This method is thread-safe.
> func (b *Bucket) Take(n int64) int64 {
>
> I would have thought a better API would be something like this

This wouldn't allow for partial success, as described above. If we're processing 1000 items and only 500 are allowed through, then the caller wants to process the first 500 differently from the last 500.
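
A minimal sketch of the partial-success case described above, assuming only that Take returns the number of tokens actually taken. The `taker` interface, `stubBucket`, and `split` are hypothetical names used for illustration and are not part of the tb package.

```go
package main

import "fmt"

// taker is the only behaviour this sketch needs from a bucket.
type taker interface {
	Take(n int64) int64
}

// stubBucket is a hypothetical, non-concurrent stand-in that never refills.
// It exists only to make the example runnable.
type stubBucket struct{ tokens int64 }

// Take removes up to n tokens and returns how many were removed.
func (s *stubBucket) Take(n int64) int64 {
	if n > s.tokens {
		n = s.tokens
	}
	s.tokens -= n
	return n
}

// split asks for one token per item and returns the items that were
// admitted and the items that exceeded the limit.
func split(b taker, items []string) (admitted, rejected []string) {
	n := b.Take(int64(len(items)))
	return items[:n], items[n:]
}

func main() {
	b := &stubBucket{tokens: 2}
	admitted, rejected := split(b, []string{"a", "b", "c"})
	fmt.Println(admitted, rejected) // prints "[a b] [c]"
}
```

With a boolean return value the caller would only learn that the whole batch did not fit, not where to cut it.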
 

Tomás Senart

Feb 3, 2014, 1:06:08 PM
to golan...@googlegroups.com, Tomás Senart
It turns out that the provided channel is a receive only channel because there aren't actually any go-routines launched for the ticker.

Donovan Hide

Feb 3, 2014, 3:01:45 PM
to Tomás Senart, golang-nuts
Slightly cleaner Throttle method:

func (t *Throttler) Throttle(key string, in, rate int64) (out int64) {
    t.mu.RLock()
    defer t.mu.RUnlock()
    b, ok := t.buckets[key]
    if !ok {
        b = NewBucket(rate)
        t.buckets[key] = b
    }
    return b.Take(in)
}



Donovan Hide

Feb 3, 2014, 3:03:00 PM
to Tomás Senart, golang-nuts
Oops, ignore! You need the write lock...
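
One way the write-lock problem could be handled is sketched below: look the bucket up under the read lock, and only take the write lock (re-checking the map) when the key is missing. This is an illustration and not the package's actual code. The `bucket` helper and `NewThrottler` are hypothetical, and `Bucket` here is a mutex-based stand-in that starts full and never refills.

```go
package main

import (
	"fmt"
	"sync"
)

// Bucket is a simplified stand-in: it starts with rate tokens and never refills.
type Bucket struct {
	mu     sync.Mutex
	tokens int64
}

func NewBucket(rate int64) *Bucket { return &Bucket{tokens: rate} }

// Take removes up to n tokens and returns how many were removed.
func (b *Bucket) Take(n int64) int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	if n > b.tokens {
		n = b.tokens
	}
	b.tokens -= n
	return n
}

type Throttler struct {
	mu      sync.RWMutex
	buckets map[string]*Bucket
}

func NewThrottler() *Throttler {
	return &Throttler{buckets: make(map[string]*Bucket)}
}

// bucket returns the bucket for key, creating it under the write lock if needed.
func (t *Throttler) bucket(key string, rate int64) *Bucket {
	t.mu.RLock()
	b, ok := t.buckets[key]
	t.mu.RUnlock()
	if ok {
		return b // common case: only the read lock was needed
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	// Re-check: another goroutine may have created the bucket in between.
	if b, ok = t.buckets[key]; !ok {
		b = NewBucket(rate)
		t.buckets[key] = b
	}
	return b
}

func (t *Throttler) Throttle(key string, in, rate int64) (out int64) {
	return t.bucket(key, rate).Take(in)
}

func main() {
	th := NewThrottler()
	fmt.Println(th.Throttle("alice", 3, 10))  // 3
	fmt.Println(th.Throttle("alice", 10, 10)) // 7, the bucket only had 7 left
	fmt.Println(th.Throttle("bob", 4, 10))    // 4, separate bucket
}
```

The map is never written while only the read lock is held, which is the issue the retracted snippet ran into.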

Benjamin Measures

Feb 3, 2014, 3:50:52 PM
to golan...@googlegroups.com
On Monday, 3 February 2014 14:30:34 UTC, Tomás Senart wrote:
> This package provides a generic lock-free implementation of the "Token bucket" algorithm [...]
> I would love to get some constructive feedback, please! https://github.com/tsenart/tb

I'm curious as to why you used sync/atomic to implement this instead of channels. For example (not mine): https://gist.github.com/bgentry/3876453

Tomás Senart

Feb 3, 2014, 4:12:38 PM
to golan...@googlegroups.com
My solution allows arbitrary flows that can be expressed as a number (messages, bytes, packets, etc) to be throttled in a user defined way, instead of actually sleeping inside the library.
That is what the word "generic" in the package description refers to.
The "lock-free" part of the description refers to the use of lock-free concurrent programming techniques (CAS loop). http://preshing.com/20120612/an-introduction-to-lock-free-programming/
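
For readers unfamiliar with the technique, here is a minimal sketch of a CAS loop applied to a token counter. It is an illustration under the Take semantics described earlier in the thread (take as many tokens as are available, up to n), not the package's actual implementation. The `Bucket` type here models only the counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Bucket is a hypothetical stand-in; only the token counter is modelled.
type Bucket struct {
	tokens int64
}

// Take removes up to n tokens and returns how many were removed.
// No mutex is held: the update is retried until the compare-and-swap
// succeeds, i.e. until no other goroutine changed the counter in between.
func (b *Bucket) Take(n int64) int64 {
	for {
		have := atomic.LoadInt64(&b.tokens)
		taken := n
		if have < taken {
			taken = have // partial success: hand out whatever is left
		}
		if taken <= 0 {
			return 0
		}
		if atomic.CompareAndSwapInt64(&b.tokens, have, have-taken) {
			return taken
		}
		// The counter changed under us; reload and try again.
	}
}

func main() {
	b := &Bucket{tokens: 10}
	fmt.Println(b.Take(5))  // 5
	fmt.Println(b.Take(15)) // 5, only five tokens were left
	fmt.Println(b.Take(1))  // 0, the bucket is empty
}
```

Nothing in Take blocks or sleeps, so deciding what to do about a shortfall stays with the caller.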

Tomás Senart

Feb 3, 2014, 8:26:01 PM
to golan...@googlegroups.com

Nick Craig-Wood

Feb 4, 2014, 3:45:19 AM
to Tomás Senart, golan...@googlegroups.com
On 03/02/14 18:06, Tomás Senart wrote:
> On Monday, February 3, 2014 5:56:37 PM UTC, Tomás Senart wrote:
> On Monday, February 3, 2014 5:40:06 PM UTC, Nick Craig-Wood wrote:
> IMHO locks look nicer when anonymous
> I generally agree with this whenever I am not writing a library, but
> that would expose the lock publicly which I don't want.

Good point

> Your Stop() methods are leaking go routines - from the docs
> http://golang.org/pkg/time/#Ticker.Stop
> <http://golang.org/pkg/time/#Ticker.Stop>
>
> Stop turns off a ticker. After Stop, no more ticks will be sent.
> Stop
> does not close the channel, to prevent a read from the channel
> succeeding incorrectly.
>
> So you'll want to close the channel too in order to stop the go
> routines
> probably. The documentation should state that a go routine is
> started
> and you'll need to call Stop() to stop it.
>
>
> Ohhh... Thanks for this one. I thought Stop closed the channel.
> I will change and add the documentation you suggested.
>
>
> It turns out that the provided channel is a receive only channel because
> there aren't actually any go-routines launched for the ticker.
> http://golang.org/src/pkg/time/tick.go#L20

I was talking about the goroutine you start which runs this

func (b *Bucket) fill(capacity int64) {
    for _ = range b.ticker.C {
        if tokens := atomic.LoadInt64(&b.tokens); tokens < capacity {
            atomic.AddInt64(&b.tokens, 1)
        }
    }
}

This go routine is never stopped - the for{} loop never exits. I was
thinking you could close the ticker to stop it but obviously not
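
A sketch of one common way to let such a loop exit: select on the ticker and on a separate channel that is closed when the bucket is shut down. The `closing` and `done` fields, `NewBucket`'s signature, and the `Close` method are assumptions made for this illustration and may differ from what the package ended up doing.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Bucket is a simplified stand-in; closing and done are hypothetical fields.
type Bucket struct {
	tokens  int64
	ticker  *time.Ticker
	closing chan struct{} // closed by Close to ask fill to return
	done    chan struct{} // closed by fill once it has returned
}

// NewBucket starts one fill goroutine, which must be ended with Close.
func NewBucket(capacity int64, interval time.Duration) *Bucket {
	b := &Bucket{
		ticker:  time.NewTicker(interval),
		closing: make(chan struct{}),
		done:    make(chan struct{}),
	}
	go b.fill(capacity)
	return b
}

// fill adds one token per tick until the bucket is closed.
func (b *Bucket) fill(capacity int64) {
	defer close(b.done)
	for {
		select {
		case <-b.ticker.C:
			if atomic.LoadInt64(&b.tokens) < capacity {
				atomic.AddInt64(&b.tokens, 1)
			}
		case <-b.closing:
			return
		}
	}
}

// Close stops the ticker and waits for the fill goroutine to exit.
// It must be called at most once.
func (b *Bucket) Close() {
	b.ticker.Stop()
	close(b.closing)
	<-b.done
}

func main() {
	b := NewBucket(5, time.Millisecond)
	time.Sleep(20 * time.Millisecond)
	b.Close()
	fmt.Println("tokens at close:", atomic.LoadInt64(&b.tokens))
}
```

Stopping the ticker alone is not enough, because Stop leaves the ticker's channel open and a plain range over it would block forever.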

> It isn't clear to me from the docs exactly what happens when there
> aren't enough tokens in the bucket. Do those tokens get taken
> out the
> bucket or not? I would hope not, but I'm not sure from this.
>
>
> If you have 0 tokens in the Bucket and you try to "take" any number
> of tokens, 0 tokens will be taken.
> If you have 10 tokens in the Bucket and you "take" 5 tokens then 5
> tokens will be taken.
> If you have 10 tokens in the Bucket and you try to "take" 15 then
> only 10 tokens will be taken.
>
> How do you suggest the documentation to reflect this better?

Why don't you put exactly what you wrote above in? It is a nice
explanation!

> // Take will take n tokens out of the bucket. If there aren't
> enough
> // tokens, the difference is returned. Otherwise n is returned.
> // This method is thread-safe.
> func (b *Bucket) Take(n int64) int64 {
>
> I would have though a better API would be something like this
>
>
> This wouldn't allow for partial success, as described above. If
> we're processing 1000 items and only 500 are allowed through, then
> the caller wants to process the first 500 differently from the last 500.
>
>
>
> // Take will attempt to take n tokens out of the bucket. If there
> // aren't enough in the bucket it returns false, otherwise it
> removes
> // the tokens from the bucket and returns true.
> // This method is thread-safe.
> func (b *Bucket) Take(n int64) bool {

Ah I was perhaps thinking of a different use. Say I have a channel of
1000 bytes per second, so I put 1000 tokens per second in to the bucket.
When I come to send a packet of 100 bytes say I try to get 100 tokens
from the bucket. I want to be able to send all the packet or none of it,
having a number of tokens < 100 returned is no use in that case.

Tomás Senart

Feb 4, 2014, 7:15:47 AM
to golan...@googlegroups.com, Tomás Senart
You're right, I have to implement some sort of closing mechanism for short-lived buckets. Thanks for the insight.

You can easily handle your case by comparing the number of tokens taken with the number of bytes you have to send.
In case there aren't enough tokens, you wait for more tokens to be put into the bucket. Does this make sense?

bs := make([]byte, 100)
for {
    // Take returns the number of tokens actually taken.
    if out := b.Take(int64(len(bs))); out < int64(len(bs)) {
        time.Sleep(time.Duration(1e9 / (int64(len(bs)) - out)))
        continue
    }
    n, err := w.Write(bs)
    // ...

Tomás Senart

Feb 4, 2014, 8:01:46 AM
to golan...@googlegroups.com, Tomás Senart
Scratch that, as you'd waste tokens. I think you can accumulate tokens until you have enough of them.
Please validate this idea! :)
You could then rewrite the above example as:

b := tb.NewBucket(1000)
bs := make([]byte, 100)

var tokens int64
for {
    tokens += b.Take(int64(len(bs))-tokens)
    if tokens < int64(len(bs)) {
        time.Sleep(time.Duration(1e9/(1000 - tokens)))
        continue
    }
    break
} 

Tomás Senart

Feb 4, 2014, 8:42:57 AM
to golan...@googlegroups.com, Tomás Senart

Nick Craig-Wood

Feb 4, 2014, 12:00:18 PM
to Tomás Senart, golan...@googlegroups.com
On 04/02/14 13:01, Tomás Senart wrote:
> On Tuesday, February 4, 2014 12:15:47 PM UTC, Tomás Senart wrote:
>
> On Tuesday, February 4, 2014 8:45:19 AM UTC, Nick Craig-Wood wrote:
> Ah I was perhaps thinking of a different use. Say I have a
> channel of
> 1000 bytes per second, so I put 1000 tokens per second in to the
> bucket.
> When I come to send a packet of 100 bytes say I try to get 100
> tokens
> from the bucket. I want to be able to send all the packet or
> none of it,
> having a number of tokens < 100 returned is no use in that case.
>
> Scratch that, as you'd waste tokens. I think you can accumulate tokens
> until you have enough of them.
> Please validate this idea! :)
> You could then rewrite the above example as:
>
> b := tb.NewBucket(1000)
> bs := make([]byte, 100)
>
> var tokens int64
> for {
> tokens += b.Take(int64(len(bs))-tokens)
> if tokens < int64(len(bs)) {
> time.Sleep(time.Duration(1e9/(1000 - tokens)))
> continue
> }
> break
> }
> n, err := w.Write(bs)
> // ...

That would work very well provided you wanted to delay the packet until
you could send it. However if you wanted to drop the packet then I
don't see a way of doing that without leaking tokens. Maybe you could
make a Put() method to put some tokens back into the bucket?
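
A rough sketch of how a Put method could support the drop case: take what is available and, if it falls short, hand it back. The `Put` signature, the `capacity` field, and the `takeAllOrNone` helper are hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Bucket is a simplified stand-in with a fixed capacity and no refill loop.
type Bucket struct {
	tokens   int64
	capacity int64
}

// Take removes up to n tokens and returns how many were removed.
func (b *Bucket) Take(n int64) int64 {
	for {
		have := atomic.LoadInt64(&b.tokens)
		taken := n
		if have < taken {
			taken = have
		}
		if taken <= 0 {
			return 0
		}
		if atomic.CompareAndSwapInt64(&b.tokens, have, have-taken) {
			return taken
		}
	}
}

// Put returns up to n tokens to the bucket without exceeding capacity
// and reports how many were added.
func (b *Bucket) Put(n int64) int64 {
	for {
		have := atomic.LoadInt64(&b.tokens)
		added := n
		if room := b.capacity - have; added > room {
			added = room
		}
		if added <= 0 {
			return 0
		}
		if atomic.CompareAndSwapInt64(&b.tokens, have, have+added) {
			return added
		}
	}
}

// takeAllOrNone reports whether n tokens could be taken. On a shortfall
// the partial take is put back, so a dropped packet does not leak tokens.
func takeAllOrNone(b *Bucket, n int64) bool {
	got := b.Take(n)
	if got == n {
		return true
	}
	b.Put(got)
	return false
}

func main() {
	b := &Bucket{tokens: 50, capacity: 1000}
	ok := takeAllOrNone(b, 100)
	fmt.Println(ok, atomic.LoadInt64(&b.tokens)) // false 50
	ok = takeAllOrNone(b, 30)
	fmt.Println(ok, atomic.LoadInt64(&b.tokens)) // true 20
}
```

Between the Take and the Put, other goroutines briefly see fewer tokens than are really available, so this is not an atomic all-or-nothing operation.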

Nick Craig-Wood

Feb 4, 2014, 12:00:45 PM
to Tomás Senart, golan...@googlegroups.com
On 04/02/14 13:42, Tomás Senart wrote:
> Here's the fix for the leaking
> go-routines: https://github.com/tsenart/tb/commit/3fbc89c979cf909d788da352e3f1a5681927bc05

Looks great :-)

Tomás Senart

Feb 5, 2014, 1:40:35 PM
to Nick Craig-Wood, golan...@googlegroups.com
After thinking a bit about it, I am pretty sure that having concurrent Take -> Puts would preserve the overall throttling rate over time, despite the spikes. I will add the Put method.

Tomás Senart

Feb 6, 2014, 1:54:54 PM
to golan...@googlegroups.com, Nick Craig-Wood
Besides adding the Put method, I parameterised the frequency with which the filler go-routines tick.
In Throttler, all buckets share a single tick loop, which avoids pegging the CPU when there are large numbers of Buckets.
Anyone can create their own kind of Bucket aggregator with a single tick loop by passing 0 as each bucket's hz parameter.
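
The shared tick loop idea could look roughly like the sketch below: buckets do not run their own goroutine, and one ticker refills all of them. The `Filler` type, its methods, and the `capacity` field are hypothetical names for illustration. The package's real API (including the hz parameter mentioned above) is documented at the godoc link that follows.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Bucket is a simplified stand-in with no goroutine of its own.
type Bucket struct {
	tokens   int64
	capacity int64
}

// Filler is a hypothetical aggregator: one goroutine refills every bucket.
type Filler struct {
	mu      sync.RWMutex
	buckets []*Bucket
}

// Add registers a new, empty bucket with the given capacity.
func (f *Filler) Add(capacity int64) *Bucket {
	b := &Bucket{capacity: capacity}
	f.mu.Lock()
	f.buckets = append(f.buckets, b)
	f.mu.Unlock()
	return b
}

// tick adds one token to every bucket that is below capacity.
func (f *Filler) tick() {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, b := range f.buckets {
		if atomic.LoadInt64(&b.tokens) < b.capacity {
			atomic.AddInt64(&b.tokens, 1)
		}
	}
}

// Run drives tick from a single ticker until stop is closed.
func (f *Filler) Run(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			f.tick()
		case <-stop:
			return
		}
	}
}

func main() {
	f := &Filler{}
	small, large := f.Add(2), f.Add(100)
	stop := make(chan struct{})
	go f.Run(time.Millisecond, stop)
	time.Sleep(50 * time.Millisecond)
	close(stop)
	fmt.Println(atomic.LoadInt64(&small.tokens), atomic.LoadInt64(&large.tokens))
}
```

The trade-off is one goroutine and one timer in total, at the cost of walking every bucket on each tick.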

http://godoc.org/github.com/tsenart/tb

Tomás Senart

Feb 6, 2014, 7:13:05 PM
to golan...@googlegroups.com, Tomás Senart