DistributedPubSubExtension, subscriber doesn't receive published message

Kostas kougios

Jun 23, 2015, 2:06:47 AM
to akka...@googlegroups.com
Hi, I am using Akka Cluster and DistributedPubSubExtension to create a topic.

The receiving actor (driver) subscribes to the topic:

private val mediator = DistributedPubSubExtension(system).mediator
mediator ! Subscribe(Subscriptions.IndexActorAvailability, self)

override def receive = {
  case SubscribeAck(subscribe) =>
    context become ready
  ...

The publishing actor (server) publishes a message during its startup:

private val mediator = DistributedPubSubExtension(context.system).mediator
mediator ! Publish(Subscriptions.IndexActorAvailability, IAmAvailable(index, self))

But the driver actor never receives it.

Some notes:

1. "context become ready" is certainly executed before the publishing actor publishes the msg (I've debugged it)

2. The actors' context.system instances are different (Akka Cluster), but the rest of the cluster communicates fine.

3. Everything runs under a test case in the same JVM.

Here is my config:

akka {
  log-dead-letters-during-shutdown = off
  loglevel = "INFO"
  cluster {
    gossip-interval = 100 ms
    leader-actions-interval = 100 ms
    seed-nodes = [
      "akka.tcp://testS...@127.0.0.1:2700",
      "akka.tcp://testS...@127.0.0.1:2701"
    ]
  }
  extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
}

server1 {
  akka {
    remote {
      netty.tcp {
        port = 2700
      }
    }
    cluster {
      roles = ["index-server"]
    }
  }
}

server2 {
  akka {
    remote {
      netty.tcp {
        port = 2701
      }
    }
    cluster {
      roles = ["index-server"]
    }
  }
}

driver {
  akka {
    remote {
      netty.tcp {
        port = 2702
      }
    }
    cluster {
      roles = ["driver"]
    }
  }
}

Patrik Nordwall

Jun 24, 2015, 8:03:37 AM
to akka...@googlegroups.com
On Tue, Jun 23, 2015 at 8:06 AM, Kostas kougios <kostas....@googlemail.com> wrote:
1. "context become ready" is certainly executed before the publishing actor publishes the msg (I've debugged it)

What is your definition of before, here?
The registry of subscribers is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds.

/Patrik
 

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Konstantinos Kougios

Jun 25, 2015, 2:16:59 AM
to akka...@googlegroups.com
OK, thanks, a Thread.sleep(2000) sorted it out. But now we are back to a situation similar to my other post: how do I kick-start that consistency? Currently my test waits for all nodes to join the cluster and then sleeps for 2 seconds so that DistributedPubSub becomes consistent. The good news is that all my tests now pass :)

awaitCond(
   cluster1.state.members.size == 3 && cluster2.state.members.size == 3 && driverCluster.state.members.size == 3,
   60 seconds
)

Thread.sleep(2000)

Patrik Nordwall

Jun 25, 2015, 8:04:52 AM
to akka...@googlegroups.com
Reduce the akka.contrib.cluster.pub-sub.gossip-interval setting.
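For a test, one way to lower this setting is to layer an override on top of the normal configuration before creating the actor systems. This is only a sketch: the object name FastPubSubGossip and the 200 ms value are illustrative assumptions, not something taken from this thread.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object; the name is made up for illustration.
object FastPubSubGossip {
  // 200 ms is an arbitrary example value, not a recommendation.
  val overrides: Config = ConfigFactory.parseString(
    "akka.contrib.cluster.pub-sub.gossip-interval = 200 ms")

  // The override wins; everything else comes from the usual
  // application.conf / reference.conf lookup.
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting config can then be passed to ActorSystem(name, config). A shorter interval makes subscriptions visible on other nodes sooner, at the cost of more gossip traffic, so it probably belongs in test configuration only.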



In 2.4-M1 there is a GetTopics message that could be used together with awaitAssert to query it until it has the expected topics.
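A rough sketch of that polling approach, assuming Akka 2.4 with akka-testkit on the classpath. In 2.4 the pub-sub extension lives in the akka-cluster-tools module as akka.cluster.pubsub.DistributedPubSub, which is what the imports below rely on. The helper name awaitTopicKnown, the timeouts, and the assumption that the topic is a plain String are illustrative, not from this thread.

```scala
import akka.actor.ActorSystem
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{CurrentTopics, GetTopics}
import akka.testkit.TestProbe
import scala.concurrent.duration._

// Hypothetical test helper; the name and defaults are assumptions.
object PubSubTestSupport {

  // Blocks until the mediator of `system` reports `topic`, or fails after `max`.
  def awaitTopicKnown(system: ActorSystem,
                      topic: String,
                      max: FiniteDuration = 10.seconds): Unit = {
    val probe = TestProbe()(system)
    val mediator = DistributedPubSub(system).mediator

    probe.awaitAssert({
      // Ask this node's mediator for its current view of the topics.
      probe.send(mediator, GetTopics)
      val known = probe.expectMsgType[CurrentTopics](1.second).topics
      assert(known.contains(topic), s"topic $topic not yet known, got $known")
    }, max, 200.millis)
  }
}
```

In a multi-node test this would be called once per publishing node, for example awaitTopicKnown(serverSystem, topicName), in place of a fixed Thread.sleep, before the publisher sends its first message.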

Konstantinos Kougios

Jun 26, 2015, 1:42:26 AM
to akka...@googlegroups.com
Thanks, reducing gossip-interval works. I also tried 2.4-M1, but I get a compilation error saying that DistributedPubSubExtension is not found, and I can't find the DistributedPubSubExtension class in these dependencies:

http://search.maven.org/#search|ga|1|c%3A%22DistributedPubSubExtension%22

Patrik Nordwall

Jun 26, 2015, 3:03:09 AM
to akka...@googlegroups.com
On Fri, Jun 26, 2015 at 7:42 AM, 'Konstantinos Kougios' via Akka User List <akka...@googlegroups.com> wrote:
Thanks, gossip-interval works, but also tried 2.4-M1. I get a compilation error that DistributedPubSubExtension is not found and can't find DistributedPubSubExtension class in those deps:

http://search.maven.org/#search|ga|1|c%3A%22DistributedPubSubExtension%22

Konstantinos Kougios

Jun 29, 2015, 2:07:32 AM
to akka...@googlegroups.com
OK, thanks. I updated Akka and now wait for all mediators to be aware of the topic.