Implementing an EventBus in Go


Kasun Vithanage

Mar 11, 2019, 12:41:25 AM
to golang-nuts
Hi all,

I have experience implementing event buses in Java.
In Java, I used a singleton where I could register callbacks to methods, fire an event from a publisher (maybe from another thread), and propagate it to subscribers.

In Go, what would be the best pattern to implement a pub/sub event bus?
Are channels a better choice than callbacks in this scenario (one-to-many event propagation)?
Or are callbacks better for this?

(I need to propagate events only to subscribers of the topic)

Regards,
Kasun

Burak Serdar

Mar 11, 2019, 12:58:17 AM
to Kasun Vithanage, golang-nuts
On Sun, Mar 10, 2019 at 10:41 PM Kasun Vithanage <alan...@gmail.com> wrote:
> In Go what would be the best pattern to implement a pub/sub event bus?
> Are channels a better choice than using callbacks in this scenario (one-to-many event propagation)?

Are you talking about the event listeners in Java, where all listeners
are called synchronously? You can't use channels for something like
this, and a list of callbacks would be the simplest solution. But you
mentioned you might be calling listeners in another thread. Do you
need a synchronous call where publish() makes sure all subscribers
get the event? If so, maybe you can use a mix of channels and
callbacks: keep a list of callbacks for listeners in the same
goroutine and a list of channels for listeners in other goroutines,
and call or write to the elements of both lists.
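
A minimal sketch of that mixed approach, under stated assumptions: the `Bus` type and the `OnCallback`, `OnChannel`, and `Publish` names are hypothetical and come from no library. Callbacks run in the publisher's goroutine, and channel subscribers are served with blocking sends, so `Publish` returns only once every subscriber has been handed the event.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a hypothetical type for this sketch. Per topic it keeps
// callbacks (called directly by Publish) and channels (read by
// subscribers running in other goroutines).
type Bus struct {
	mu        sync.RWMutex
	callbacks map[string][]func(string)
	channels  map[string][]chan string
}

func NewBus() *Bus {
	return &Bus{
		callbacks: make(map[string][]func(string)),
		channels:  make(map[string][]chan string),
	}
}

// OnCallback registers a listener that Publish calls synchronously.
func (b *Bus) OnCallback(topic string, fn func(string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.callbacks[topic] = append(b.callbacks[topic], fn)
}

// OnChannel registers a channel that Publish sends to.
func (b *Bus) OnChannel(topic string, ch chan string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.channels[topic] = append(b.channels[topic], ch)
}

// Publish returns after every callback has run and every channel
// send has been accepted. A channel subscriber that never reads
// blocks Publish forever.
func (b *Bus) Publish(topic, msg string) {
	// Copy the slice headers under the lock, then release it so a
	// callback may subscribe without deadlocking.
	b.mu.RLock()
	fns := b.callbacks[topic]
	chans := b.channels[topic]
	b.mu.RUnlock()

	for _, fn := range fns {
		fn(msg)
	}
	for _, ch := range chans {
		ch <- msg
	}
}

func main() {
	bus := NewBus()
	bus.OnCallback("greet", func(m string) { fmt.Println("callback:", m) })

	ch := make(chan string)
	done := make(chan struct{})
	bus.OnChannel("greet", ch)
	go func() {
		fmt.Println("channel:", <-ch)
		close(done)
	}()

	bus.Publish("greet", "hello")
	<-done // wait for the channel subscriber to print
}
```

The trade-off in this sketch is that a single stalled channel subscriber stalls the publisher; whether that is acceptable depends on how strict the delivery guarantee needs to be.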


Marcin Romaszewicz

Mar 11, 2019, 12:59:33 AM
to Kasun Vithanage, golang-nuts
Channels are producer/consumer queues; they don't handle one-to-many broadcasting. You'd need one channel per subscriber, and then you'd queue the message to each of them. In my opinion, channels work more nicely than callbacks, since your handler can run in its own execution context rather than in the callback invoker's. They are really nice for round robin between multiple receivers, though.
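
To illustrate the difference, here is a small sketch (the function names `sharedChannel` and `perSubscriber` are made up for this example). With one shared channel, each message is taken by exactly one of the receivers; with one channel per subscriber, the publisher sends each message to all of them.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedChannel sends n messages on ONE channel read by several
// goroutines and returns the total number of deliveries.
func sharedChannel(receivers, n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0
	for r := 0; r < receivers; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch { // each message goes to one receiver only
				mu.Lock()
				total++
				mu.Unlock()
			}
		}()
	}
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)
	wg.Wait()
	return total
}

// perSubscriber gives every subscriber its own channel and sends
// each message to all of them, which is a broadcast.
func perSubscriber(subscribers, n int) int {
	chans := make([]chan int, subscribers)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0
	for i := range chans {
		chans[i] = make(chan int)
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for range ch {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(chans[i])
	}
	for i := 0; i < n; i++ {
		for _, ch := range chans {
			ch <- i
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sharedChannel(3, 10)) // 10: one delivery per message
	fmt.Println(perSubscriber(3, 10)) // 30: every subscriber gets every message
}
```

Which of the receivers picks up a given message on the shared channel is up to the scheduler, so the distribution is not guaranteed to be evenly round robin.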

Why write it yourself? Check out the Go bindings for ZeroMQ, which supports pub/sub in exactly the way you mention. I've used it in production to route more than 200,000 pub/sub messages per second. It may be overkill if you're within a single process, though.

-- Marcin


Kasun Vithanage

Mar 11, 2019, 1:07:11 AM
to golang-nuts
Yes, when a publisher publishes to a topic, the event should be routed to all subscribers of that topic :)

Kasun Vithanage

Mar 11, 2019, 1:10:00 AM
to golang-nuts
My intention is to implement it myself. I was wondering whether to go the plain old callback way, where a subscriber subscribes to a topic and its callback is fired when the topic is updated.
I was also wondering how to use Go's channels to improve this pattern and gain performance.

My final intention is to implement a pub/sub server myself for fun :)

Randall O'Reilly

Mar 11, 2019, 1:12:44 AM
to Kasun Vithanage, golang-nuts
For reference, here is a very basic Qt-style signal/slot subscribe/publish model in Go, just using a map of receivers and their receiver callback functions:
https://github.com/goki/ki/blob/master/ki/signal.go
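
The general shape of a map-of-callbacks design can be sketched as follows. This is not the goki/ki code or its API: the `Signal` type and the `Connect`, `Disconnect`, and `Emit` names are hypothetical, and the sketch assumes single-goroutine use (it has no locking).

```go
package main

import "fmt"

// Signal is a hypothetical type for this sketch. It maps a receiver
// name to the callback registered for that receiver.
type Signal struct {
	receivers map[string]func(data interface{})
}

func NewSignal() *Signal {
	return &Signal{receivers: make(map[string]func(data interface{}))}
}

// Connect registers (or replaces) the callback for a receiver.
func (s *Signal) Connect(name string, fn func(data interface{})) {
	s.receivers[name] = fn
}

// Disconnect removes a receiver; unknown names are ignored.
func (s *Signal) Disconnect(name string) {
	delete(s.receivers, name)
}

// Emit calls every connected callback synchronously in the caller's
// goroutine and returns how many were called. Map iteration order
// is unspecified, so receivers run in no particular order.
func (s *Signal) Emit(data interface{}) int {
	for _, fn := range s.receivers {
		fn(data)
	}
	return len(s.receivers)
}

func main() {
	sig := NewSignal()
	sig.Connect("logger", func(d interface{}) { fmt.Println("logger got:", d) })
	sig.Connect("counter", func(d interface{}) { fmt.Println("counter got:", d) })

	sig.Emit("first event")

	sig.Disconnect("logger")
	sig.Emit("second event") // only the counter receives this one
}
```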

- Randy

Kasun Vithanage

Mar 11, 2019, 1:41:54 AM
to golang-nuts
Thanks, I was wondering whether we can replace callbacks with channels.
In the same way as it is done there, instead of calling a function we could send the data to a channel associated with the receiver.

Like this:

package main

import (
	"fmt"
	"time"
)

// subs maps a topic to the channels subscribed to it.
var subs = make(map[string][]chan string)

// publish sends data to every channel subscribed to topic. The sends
// run in a separate goroutine so the caller does not block.
func publish(topic string, data string) {
	go func() {
		if chans, found := subs[topic]; found {
			for _, ch := range chans {
				ch <- data
			}
		}
	}()
}

func main() {
	ch := make(chan string)
	ch2 := make(chan string)
	subs["simple"] = append(subs["simple"], ch)
	subs["simple"] = append(subs["simple"], ch2)

	go func() {
		time.Sleep(1 * time.Second)
		publish("simple", "its so simple")
	}()

	fmt.Println(<-ch)
	fmt.Println(<-ch2)
}

Kasun Vithanage

Mar 11, 2019, 2:52:53 AM
to golang-nuts
Hi, were you talking about something like this? I used channels instead of callbacks.
I quickly prototyped it. Is this approach better, or does it need improvements?

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Data is the event delivered to subscribers.
type Data struct {
	data  interface{}
	topic string
}

// ChannelSlice is the list of channels subscribed to one topic.
type ChannelSlice []chan Data

// EventBus maps each topic to its subscribers.
type EventBus struct {
	subscribers map[string]ChannelSlice
	rm          sync.RWMutex
}

// publish hands the event to a goroutine that delivers it to every
// subscriber of topic, so the publisher itself never blocks.
func (eb *EventBus) publish(topic string, data interface{}) {
	eb.rm.RLock()
	if chans, found := eb.subscribers[topic]; found {
		go broadcastEvent(Data{data: data, topic: topic}, chans)
	}
	eb.rm.RUnlock()
}

// broadcastEvent sends data to each channel in turn.
func broadcastEvent(data Data, sl ChannelSlice) {
	for _, ch := range sl {
		ch <- data
	}
}

// subscribe registers ch to receive events published to topic.
func (eb *EventBus) subscribe(topic string, ch chan Data) {
	eb.rm.Lock()
	// append works on a nil slice, so a new topic needs no special case.
	eb.subscribers[topic] = append(eb.subscribers[topic], ch)
	eb.rm.Unlock()
}

var eb = &EventBus{
	subscribers: map[string]ChannelSlice{},
}

func printData(ch string, data Data) {
	fmt.Printf("Channel: %s; Topic: %s; Data: %v\n", ch, data.topic, data.data)
}

// publishTo publishes data to topic forever, at random intervals.
func publishTo(topic string, data string) {
	for {
		eb.publish(topic, data)
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
}

func main() {
	ch1 := make(chan Data)
	ch2 := make(chan Data)
	ch3 := make(chan Data)

	eb.subscribe("topic1", ch1)
	eb.subscribe("topic2", ch2)
	eb.subscribe("topic2", ch3)

	go publishTo("topic1", "Hi topic 1")
	go publishTo("topic2", "Welcome to topic 2")

	for {
		select {
		case d := <-ch1:
			go printData("ch1", d)
		case d := <-ch2:
			go printData("ch2", d)
		case d := <-ch3:
			go printData("ch3", d)
		}
	}
}
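
One property of this prototype is that the broadcast helper sends to subscribers one after another on unbuffered channels, so a subscriber that stops reading holds up delivery to every subscriber after it in the slice. A possible variation, sketched here under assumptions and not taken from the thread, is to give subscribers buffered channels and skip any subscriber whose buffer is full. The `broadcastNonBlocking` name is hypothetical, and dropping events is only acceptable if subscribers can tolerate loss.

```go
package main

import "fmt"

// Data mirrors the event type used in the prototype above.
type Data struct {
	data  interface{}
	topic string
}

// broadcastNonBlocking is a hypothetical helper. It offers the event
// to each channel and skips any channel that cannot accept it right
// now. It returns the number of subscribers that received the event.
func broadcastNonBlocking(d Data, subs []chan Data) (delivered int) {
	for _, ch := range subs {
		select {
		case ch <- d:
			delivered++
		default:
			// This subscriber is not keeping up: drop the event
			// for it instead of blocking the others.
		}
	}
	return delivered
}

func main() {
	slow := make(chan Data, 1)
	fast := make(chan Data, 1)
	subs := []chan Data{slow, fast}

	// Simulate a slow subscriber whose buffer is already full.
	slow <- Data{topic: "t", data: "old"}

	n := broadcastNonBlocking(Data{topic: "t", data: "new"}, subs)
	fmt.Println("delivered to", n, "of", len(subs)) // delivered to 1 of 2
	fmt.Println((<-fast).data)                      // new
}
```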





Amnon Baron Cohen

Mar 11, 2019, 3:28:17 AM
to golang-nuts

Kasun Vithanage

Mar 13, 2019, 12:22:25 AM
to golang-nuts
I wrote an article about this topic. Feel free to read it.
