/** Entry point of one cluster node taking part in the sharded `Consumer` test. */
object Node {

  /**
   * Creates the "ClusterSystem" actor system from [[AkkaConfig]], starts the
   * shard region and then schedules the periodic test messages.
   *
   * @param args command-line arguments; `args(0)` is parsed as the remoting port
   */
  def main(args: Array[String]): Unit = {
    // Give a readable message instead of a bare ArrayIndexOutOfBoundsException.
    require(args.nonEmpty, "usage: Node <port>")
    val port = args(0).toInt
    val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(AkkaConfig.config(port)))
    consumerStart(system)
    scheduleMessages(system)
  }

  /**
   * Schedules `SomeMessage(0)` and `SomeMessage(1)` to be sent to the
   * `Consumer` shard region immediately and then every 5 seconds.
   *
   * @param system actor system whose scheduler and dispatcher are used
   */
  def scheduleMessages(system: ActorSystem): Unit = {
    implicit val dispatcher: scala.concurrent.ExecutionContext = system.dispatcher
    // The region reference does not depend on the loop variable, so look it up once.
    val region = ClusterSharding(system).shardRegion(Consumer.shardName)
    (0 to 1).foreach { i =>
      system.scheduler.schedule(0.millis, 5.seconds, region, SomeMessage(i))
    }
  }

  /**
   * Creates a `Watcher` actor and registers the `Consumer` entity type with
   * cluster sharding, passing the watcher to every consumer.
   *
   * @param system actor system in which the watcher and shard region are started
   */
  def consumerStart(system: ActorSystem): Unit = {
    val watcher: ActorRef = system.actorOf(Props(new Watcher()))
    ClusterSharding(system).start(
      typeName = Consumer.shardName,
      Consumer.props(watcher),
      settings = ClusterShardingSettings(system),
      extractEntityId = Consumer.entityExtractor,
      extractShardId = Consumer.shardExtractor,
      allocationStrategy = new LeastShardAllocationStrategy(2, 1),
      // Message delivered to entities on hand-off; Consumer handles it in receive.
      handOffStopMessage = Stop
    )
  }
}
/** Companion of the `Consumer` actor: message protocol, factory and shard extractors. */
object Consumer {

  /** Message sent through the shard region; `id` determines entity and shard. */
  case class SomeMessage(id: Int)

  /** Type name under which the consumer shard region is started and looked up. */
  val shardName = "testShard"

  /**
   * Builds the `Props` for a consumer.
   *
   * @param watcher actor the consumer registers itself with on start
   */
  def props(watcher: ActorRef): Props = Props(new Consumer(watcher))

  /** Maps a `SomeMessage` to (entity id, message); the entity id is the message id as text. */
  val entityExtractor: ExtractEntityId = {
    case message: SomeMessage => (message.id.toString, message)
  }

  /** Maps a `SomeMessage` to its shard id, which is also the message id as text. */
  val shardExtractor: ExtractShardId = {
    case message: SomeMessage => message.id.toString
  }
}
/**
 * Sharded entity actor that logs its lifecycle and every message it receives.
 *
 * @param watcher actor that is sent a `WatchMe` for this consumer on start
 */
class Consumer(watcher: ActorRef) extends Actor {

  /** Logs the start and registers this actor (by path name) with the watcher. */
  // Declared with `()` to match the side-effecting lifecycle hook it overrides.
  override def preStart(): Unit = {
    println(s"Starting consumer for ${self.path.name}")
    watcher ! WatchMe(self, self.path.name)
  }

  /** Logs that this consumer has stopped. */
  override def postStop(): Unit = {
    println(s"Stopping consumer for ${self.path.name}")
  }

  /** Logs `SomeMessage`s; on `Stop` logs and terminates itself via `PoisonPill`. */
  override def receive: Receive = {
    case SomeMessage(id) =>
      println(s"Received message for ${id}")
    case Stop =>
      println(s"Received stop message for ${self.path.name}")
      // PoisonPill is enqueued like a normal message, so anything already in the mailbox is handled first.
      self ! PoisonPill
  }
}
/** Message protocol understood by the `Watcher` actor. */
object Watcher {

  /**
   * Asks the watcher to start watching an actor.
   *
   * @param actorRef actor to be watched
   * @param id       identifier printed alongside the reference
   */
  case class WatchMe(
    actorRef: ActorRef,
    id: String
  )
}
/** Actor that death-watches every actor announced via `WatchMe` and logs terminations. */
class Watcher extends Actor {

  /** Logs and watches on `WatchMe`; logs the reference on `Terminated`. */
  override def receive: Receive = {
    case WatchMe(target, entityId) =>
      println(s"Watching received! Ref: ${target} Id: ${entityId}")
      context.watch(target)

    case Terminated(stopped) =>
      println(s"Terminated received! ${stopped}")
  }
}
/** HOCON configuration for a local cluster node of the sharding test. */
object AkkaConfig {

  /**
   * Renders the node configuration for the given remoting port.
   *
   * The seed-node addresses name the actor system "ClusterSystem"; they must
   * match the name the system is created with, otherwise nodes cannot join.
   * The margin lines are indented only for readability: `stripMargin` removes
   * everything up to and including the leading `|`.
   */
  val config: Int => String = port =>
    s"""akka {
       | actor {
       | provider = "akka.cluster.ClusterActorRefProvider"
       | }
       | remote {
       | log-remote-lifecycle-events = off
       | netty.tcp {
       | hostname = "127.0.0.1"
       | port = ${port}
       | }
       | }
       |
       | persistence {
       | journal.plugin = "cassandra-journal"
       | snapshot-store.plugin = "cassandra-snapshot-store"
       | }
       |
       | cluster {
       | min-nr-of-members=1
       | seed-nodes = [
       | "akka.tcp://ClusterSystem@127.0.0.1:2551",
       | "akka.tcp://ClusterSystem@127.0.0.1:2552"
       | ]
       | sharding {
       | remember-entities = on
       | journal-plugin-id = "cassandra-journal"
       | snapshot-plugin-id = "cassandra-snapshot-store"
       | rebalance-interval = 10 s
       | }
       | auto-down-unreachable-after = 10s
       | }
       |}
       |
       |cassandra-journal {
       | contact-points = [cassandra01.weave.local]
       |}
       |cassandra-snapshot-store {
       | contact-points = [cassandra01.weave.local]
       |}
       |""".stripMargin
}
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-1013061113] Id: 1
Received message for 1
Received message for 1
Received message for 1
Received message for 1
Starting consumer for 1
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#1034416006] Id: 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#26936794] Id: 1
Received message for 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0
Starting consumer for 0
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-2140659337] Id: 1
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496] Id: 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
[INFO] [11/06/2015 10:05:45.051] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [11/06/2015 10:05:45.424] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 1
Received message for 0
Received stop message for 0
Stopping consumer for 0
Terminated received! Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496]
Received message for 1
Received message for 1
Received message for 1
Received message for 1
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#-930183150] Id: 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0
var shards = Map.empty[ShardId, ActorRef]var shardsByRef = Map.empty[ActorRef, ShardId]
case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender())
case msg @ HandOff(shard) ⇒ log.debug("HandOff shard [{}]", shard) if (shardBuffers.contains(shard)) { shardBuffers -= shard loggedFullBufferWarning = false } if (shards.contains(shard)) { handingOff += shards(shard) shards(shard) forward msg } else sender() ! ShardStopped(shard)
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Patrik Nordwall
Typesafe - Reactive apps on the JVM
Twitter: @patriknw