Cluster Sharding - watching for termination of sharded actors.


Marcin Sośnicki

Nov 6, 2015, 5:24:41 AM
to Akka User List
Hi,

We are using the Akka Sharding extension to distribute message-broker consumer actors over our cluster. Our in-house consumer library uses death watch on the consumers to manage its state (when one of the actors connecting to the broker is moved to another node, the supervisor no longer has to worry about handling things like reconnections).
The problem we're seeing is that sometimes the Terminated message is not received from the sharded actor. That corrupts our library's internal state and causes problems.
Of course the library itself could easily be fixed, but I wonder whether death watch in general should be avoided with sharded actors, since it may cause problems?

Below is a simplified code snippet that reproduces the problem:
Akka version: 2.4.0 (akka-actor, akka-cluster, akka-cluster-sharding)
akka-persistence-cassandra: 0.4


// imports for the whole snippet
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ExtractEntityId, ExtractShardId}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import Consumer.{SomeMessage, Stop}
import Watcher.WatchMe

object Node {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(AkkaConfig.config(args(0).toInt)))
    consumerStart(system)
    scheduleMessages(system)
  }

  def scheduleMessages(system: ActorSystem): Unit = {
    implicit val dispatcher = system.dispatcher
    (0 to 1) foreach { i =>
      system.scheduler.schedule(0.millis, 5.seconds,
        ClusterSharding.get(system).shardRegion(Consumer.shardName), SomeMessage(i))
    }
  }

  def consumerStart(system: ActorSystem): Unit = {
    val watcher: ActorRef = system.actorOf(Props(new Watcher()))
    ClusterSharding(system).start(
      typeName = Consumer.shardName,
      entityProps = Consumer.props(watcher),
      settings = ClusterShardingSettings(system),
      extractEntityId = Consumer.entityExtractor,
      extractShardId = Consumer.shardExtractor,
      allocationStrategy = new LeastShardAllocationStrategy(2, 1),
      handOffStopMessage = Stop
    )
  }
}

object Consumer {

  def props(watcher: ActorRef): Props = Props(new Consumer(watcher))

  case class SomeMessage(id: Int)

  case object Stop // hand-off stop message

  val shardName = "testShard"

  val entityExtractor: ExtractEntityId = {
    case msg @ SomeMessage(id) => (id.toString, msg)
  }

  val shardExtractor: ExtractShardId = {
    case SomeMessage(id) => id.toString
  }
}

class Consumer(watcher: ActorRef) extends Actor {

  override def preStart(): Unit = {
    println(s"Starting consumer for ${self.path.name}")
    watcher ! WatchMe(self, self.path.name)
  }

  override def postStop(): Unit = {
    println(s"Stopping consumer for ${self.path.name}")
  }

  override def receive: Receive = {
    case SomeMessage(id) =>
      println(s"Received message for $id")
    case Stop =>
      println(s"Received stop message for ${self.path.name}")
      self ! PoisonPill
  }
}

object Watcher {

  case class WatchMe(actorRef: ActorRef, id: String)
}

class Watcher extends Actor {

  override def receive: Receive = {
    case WatchMe(ref, id) =>
      println(s"Watching received! Ref: $ref Id: $id")
      context.watch(ref)
    case Terminated(ref) =>
      println(s"Terminated received! $ref")
  }
}

object AkkaConfig {

  val config: Int => String = port =>
    s"""akka {
       |  actor {
       |    provider = "akka.cluster.ClusterActorRefProvider"
       |  }
       |  remote {
       |    log-remote-lifecycle-events = off
       |    netty.tcp {
       |      hostname = "127.0.0.1"
       |      port = ${port}
       |    }
       |  }
       |
       |  persistence {
       |    journal.plugin = "cassandra-journal"
       |    snapshot-store.plugin = "cassandra-snapshot-store"
       |  }
       |
       |  cluster {
       |    min-nr-of-members = 1
       |    seed-nodes = [
       |      "akka.tcp://ClusterSystem@127.0.0.1:2551",
       |      "akka.tcp://ClusterSystem@127.0.0.1:2552"
       |    ]
       |    sharding {
       |      remember-entities = on
       |      journal-plugin-id = "cassandra-journal"
       |      snapshot-plugin-id = "cassandra-snapshot-store"
       |      rebalance-interval = 10 s
       |    }
       |    auto-down-unreachable-after = 10s
       |  }
       |}
       |
       |cassandra-journal {
       |  contact-points = [cassandra01.weave.local]
       |}
       |cassandra-snapshot-store {
       |  contact-points = [cassandra01.weave.local]
       |}
     """.stripMargin
}
 


After running this example twice simultaneously, once with 2551 as the argument and once with 2552, I'm seeing the following logs:
Node 2551
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-1013061113] Id: 1
Received message for 1
Received message for 1
Received message for 1
Received message for 1
Node 2552
Starting consumer for 1
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#1034416006] Id: 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#26936794] Id: 1
Received message for 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0

As you can see, no termination messages are received.

If, however, I start the first node, give it some time to come up, and only then start the second one, I see:
Node1
Starting consumer for 0
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-2140659337] Id: 1
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496] Id: 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
[INFO] [11/06/2015 10:05:45.051] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://Cluste...@127.0.0.1:2551] - Node [akka.tcp://Cluste...@127.0.0.1:2552] is JOINING, roles []
[INFO] [11/06/2015 10:05:45.424] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://Cluste...@127.0.0.1:2551] - Leader is moving node [akka.tcp://Cluste...@127.0.0.1:2552] to [Up]
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 1
Received message for 0
Received stop message for 0
Stopping consumer for 0
Terminated received! Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496]
Received message for 1
Received message for 1
Received message for 1
Received message for 1

Node2
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#-930183150] Id: 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0

In this scenario the deathwatch works fine.

I don't know whether starting the nodes together is the only way to reproduce the problem; I've seen it occur a few times in our OAT environment, and the example above is the only way I could reproduce it locally.

I would appreciate any help with this. Maybe I'm missing something obvious, or maybe death watch is simply not guaranteed to work with the Sharding extension, but I haven't seen that mentioned anywhere in the documentation.

Thanks a lot,
Marcin

Marcin Sośnicki

Nov 12, 2015, 9:34:17 AM
to Akka User List
Hi,

I have done some deeper digging, and it seems the issue is not that the Terminated message is not received: the sharded actor keeps running on both nodes of the cluster!
The shard region correctly resolves where to send messages, so this is hard to spot as long as you communicate with the actor only through the ShardRegion (the stale instance just sits idle on the node it was supposed to be removed from).
When you rely on something like death watch, however, it won't work as expected, because the actor is still alive.
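The masking effect can be sketched without any Akka machinery. This is an illustration only (the node names and routing map below are made up, not Akka's code): two nodes both hold an instance of entity "1", but the region routes every message to the single owner it knows about, so the stale instance never sees traffic and its continued liveness goes unnoticed:

```scala
// Illustration, not Akka code: a duplicate entity hides behind the router.
class Entity(val node: String) { var received = 0 }

// Both nodes ended up with an instance of entity "1" (the bug):
val instances = Map(
  ("node-2551", "1") -> new Entity("node-2551"), // stale copy left behind
  ("node-2552", "1") -> new Entity("node-2552")  // the copy the region routes to
)

// The region's view: exactly one owner per entity id.
val owner = Map("1" -> "node-2552")

def sendViaRegion(id: String): Unit = {
  instances((owner(id), id)).received += 1
}

(1 to 5).foreach(_ => sendViaRegion("1"))

// All traffic reaches the routed copy; the stale copy idles but stays alive,
// which is exactly what breaks death-watch-based bookkeeping.
assert(instances(("node-2552", "1")).received == 5)
assert(instances(("node-2551", "1")).received == 0)
```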

It looks like the issue lies in the Remember Entities feature that was added in 2.4.
Depending on that setting, either the Shard class (remember-entities=off) or PersistentShard (remember-entities=on) is used.

The ShardRegion class has two properties:
var shards = Map.empty[ShardId, ActorRef]
var shardsByRef = Map.empty[ActorRef, ShardId]
I assume there are two of them just to enable fast lookup by either ShardId or ActorRef as the key. But I also assume they are supposed to stay consistent, and they do not.
What happens in the scenario where the bug manifests (remember-entities=on, and therefore the PersistentShard class) is that the shards map is updated only after the persistent actor's recovery:
case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender())
This introduces a gap where the state is inconsistent: shardsByRef holds entries that shards does not, and decisions are made based on that inconsistent state. For example, the sharded actors might not get stopped while recovery is still in progress (the actor is already started, though, so it will be up eventually):
    case msg @ HandOff(shard) ⇒
      log.debug("HandOff shard [{}]", shard)
      if (shardBuffers.contains(shard)) {
        shardBuffers -= shard
        loggedFullBufferWarning = false
      }
      if (shards.contains(shard)) {
        handingOff += shards(shard)
        shards(shard) forward msg
      } else
        sender() ! ShardStopped(shard)



Is there a reason why shards and shardsByRef are not updated atomically? For example, the getShard method updates only the shardsByRef property, whereas it seems natural to update shards in the same place as well (rather than only after recovery completes).
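The inconsistency window can be demonstrated without Akka. The following is a hypothetical simplification of the bookkeeping described above (not the actual ShardRegion code, and plain Strings stand in for ShardId and ActorRef): registration populates shardsByRef immediately, shards waits for ShardInitialized, and a HandOff-style decision that consults only shards wrongly concludes the shard is already stopped:

```scala
// Hypothetical simplification of ShardRegion's two maps (not Akka's code).
var shards      = Map.empty[String, String] // filled only after ShardInitialized
var shardsByRef = Map.empty[String, String] // filled as soon as the actor starts

// With remember-entities=on, the shard actor is created and tracked by ref,
// but the `shards` entry is deferred until recovery completes.
def startShard(id: String, ref: String): Unit = {
  shardsByRef += ref -> id
  // shards += id -> ref   // deferred until ShardInitialized arrives
}

// A HandOff-style decision that consults only `shards`:
def handOff(id: String): String =
  if (shards.contains(id)) "forward HandOff" else "reply ShardStopped"

startShard("1", "ref-1")

// The shard actor exists (shardsByRef tracks it), yet hand-off concludes
// it is already stopped -- so the entity keeps running on the old node.
assert(handOff("1") == "reply ShardStopped")
assert(shardsByRef.contains("ref-1"))
```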

I hope this post is not too confusing; it's easier to show the problem directly in the code than to post just a few fragments.

Thanks a lot,
Marcin

Patrik Nordwall

Nov 16, 2015, 9:39:41 AM
to akka...@googlegroups.com
That sounds serious. Please create an issue, and we can continue the investigation there.
Thanks for reporting.
/Patrik




--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Marcin Sośnicki

Nov 16, 2015, 4:40:14 PM
to Akka User List
Thanks for the response Patrik, I have created an issue here.

Marcin