package actors.system;
/**
 * Drives the shutdown sequence of the MyApp actor system on this node.
 *
 * <p>The sequence, as implemented in {@link #onReceive(Object)}:
 * <ol>
 *   <li>On construction, schedule {@code BEGIN_SHUTDOWN} to self after 5 seconds.</li>
 *   <li>On {@code BEGIN_SHUTDOWN}, watch every shard region and send it the
 *       graceful-shutdown message.</li>
 *   <li>Once a {@code Terminated} has been received for every shard region, send a
 *       {@code PoisonPill} to the non-sharded actors, subscribe to {@code MemberRemoved}
 *       and leave the cluster.</li>
 *   <li>When this node's own {@code MemberRemoved} arrives, schedule
 *       {@code STOP_ACTOR_SYSTEM} to self after 10 seconds.</li>
 *   <li>On {@code STOP_ACTOR_SYSTEM}, terminate the actor system; the future handed to
 *       the constructor is completed once termination has finished.</li>
 * </ol>
 */
public class SystemManagerActor extends UntypedActor {
    private static final Logger.ALogger log = Logger.of(SystemManagerActor.class); // play framework logger

    // Cluster extension of the actor system this actor runs in.
    private final Cluster cluster;

    // Completed (with null) once the actor system has terminated.
    private final CompletableFuture<Void> future;

    // -- Shard regions for cluster sharded actors.
    // Entries are removed as their Terminated messages arrive, so this list must be mutable.
    // NOTE(review): Collectors.toList() makes no mutability guarantee — confirm this is acceptable.
    private final List<ActorRef> shardRegions = Stream.of(
            ShardedActorA.shardRegion(),
            ShardedActorB.shardRegion()
    ).collect(toList());

    // -- Actors that are not sharded.
    private final List<ActorSelection> actors = Stream.of(
            SimpleActorA.actorSelection(),
            SimpleActorB.actorSelection(),
            SingletonActor.actorSelection()
    ).collect(toList());

    /**
     * Creates the actor and schedules the start of the shutdown sequence.
     *
     * @param future completed with {@code null} after the actor system has terminated
     */
    public SystemManagerActor(final CompletableFuture<Void> future) {
        this.future = future;
        // Resolve the cluster from the system this actor actually lives in instead of
        // relying on static state captured at class-load time.
        this.cluster = Cluster.get(context().system());
        log.info("Shutting down MyApp ActorSystem actors");
        // wait 5 seconds for load balancer.
        context().system().scheduler().scheduleOnce(Duration.create(5, SECONDS), self(),
                SignalMessages.BEGIN_SHUTDOWN, context().dispatcher(), self());
    }

    /**
     * Handles the shutdown signals, shard-region terminations and cluster events that
     * make up the shutdown sequence; anything else is passed to {@code unhandled}.
     *
     * @param message the received message
     */
    @Override
    public void onReceive(final Object message) throws Throwable {
        if (message == SignalMessages.BEGIN_SHUTDOWN) {
            // Watch each region first so its Terminated is delivered to this actor.
            shardRegions.forEach(ref -> {
                context().watch(ref);
                ref.tell(ShardRegion.gracefulShutdownInstance(), getSelf());
            });
        } else if (message instanceof Terminated && !shardRegions.isEmpty()) {
            shardRegions.remove(((Terminated) message).getActor());
            if (!shardRegions.isEmpty()) {
                return; // still waiting for other shard regions to stop
            }
            actors.forEach(actorSelection -> actorSelection.tell(PoisonPill.getInstance(), getSelf()));
            log.info("Actors Shut down.");
            cluster.subscribe(getSelf(), ClusterEvent.initialStateAsSnapshot(), ClusterEvent.MemberRemoved.class);
            cluster.leave(cluster.selfAddress());
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            final ClusterEvent.MemberRemoved removed = (ClusterEvent.MemberRemoved) message;
            // Only this node's own removal may stop this actor system; removals of other
            // members must be ignored, and this also avoids scheduling the stop repeatedly.
            if (cluster.selfAddress().equals(removed.member().address())) {
                // wait 10 seconds for graceful stop.
                context().system().scheduler().scheduleOnce(Duration.create(10, SECONDS), self(),
                        SignalMessages.STOP_ACTOR_SYSTEM, context().dispatcher(), self());
            }
        } else if (message instanceof ClusterEvent.CurrentClusterState) {
            // Initial snapshot requested by the subscription above; nothing to do with it.
        } else if (message == SignalMessages.STOP_ACTOR_SYSTEM) {
            final ActorSystem actorSystem = context().system();
            // terminate() is asynchronous: signal completion only once the system is really
            // down. Block lambda on purpose so it can only be resolved as a Runnable.
            actorSystem.registerOnTermination(() -> {
                future.complete(null);
            });
            actorSystem.terminate();
        } else {
            unhandled(message);
        }
    }

    /**
     * Starts the shutdown sequence by creating a {@code SystemManagerActor} in the MyApp
     * actor system.
     *
     * @return a future completed with {@code null} after the actor system has terminated
     */
    public static CompletableFuture<Void> onStop() {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        MyApp.getActorSystem().actorOf(Props.create(SystemManagerActor.class, future));
        return future;
    }

    /**
     * Creates the non-sharded actors, registers the singleton actor and starts the shard
     * regions in the MyApp actor system.
     */
    public static void onStart() {
        final ActorSystem actorSystem = MyApp.getActorSystem();
        actorSystem.actorOf(SimpleActorA.props(), SimpleActorA.ACTOR_NAME);
        actorSystem.actorOf(SimpleActorB.props(), SimpleActorB.ACTOR_NAME);
        SingletonActor.register();
        ShardedActorA.startShardRegion();
        ShardedActorB.startShardRegion();
    }

    /** Internal signals this actor schedules to itself to advance the shutdown sequence. */
    private enum SignalMessages {
        BEGIN_SHUTDOWN, STOP_ACTOR_SYSTEM;
    }
}
2016-11-17 18:11:40 +0000 - [INFO] - [MigrationManager] [10.77.21.35]:5701 [vtest] [3.7.2] Re-partitioning cluster data... Migration queue size: 271
2016-11-17 18:11:42 +0000 - [INFO] - [MigrationThread] [10.77.21.35]:5701 [vtest] [3.7.2] All migration tasks have been completed, queues are empty.
2016-11-17 18:12:02 +0000 - [INFO] - [JenkinsMonitorActor] PING from Jenkins...
2016-11-17 18:12:58 +0000 - [INFO] - [application] Stopping global actors
2016-11-17 18:12:58 +0000 - [INFO] - [SystemManagerActor] Shutting down MyApp ActorSystem actors
2016-11-17 18:13:03 +0000 - [INFO] - [SystemManagerActor] Actors Shut down.
2016-11-17 18:13:03 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) - Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Marked address [akka.tcp://my...@10.77.21.35:2551] as [Leaving]
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '1-1019' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '3-750' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '2-730' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator/singleton/coordinator - Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionTerminated] with sequence number [750] for persistenceId [/sharding/UserActivityActorCoordinator].
com.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '3-750' for key 'PRIMARY'
at com.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
at com.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at java.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator/singleton/coordinator - Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionTerminated] with sequence number [730] for persistenceId [/sharding/ProductActorCoordinator].
com.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '2-730' for key 'PRIMARY'
at com.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
at com.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at java.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator/singleton/coordinator - Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardHomeDeallocated] with sequence number [1019] for persistenceId [/sharding/actors.auth.UserAuthenticationActorCoordinator].
com.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '1-1019' for key 'PRIMARY'
at com.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
at com.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at java.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) - Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Leader is moving node [akka.tcp://my...@10.77.21.35:2551] to [Exiting]
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) - Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Shutting down...
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator - Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator - Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) - Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Successfully shut down
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator - Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator - Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator - Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator - Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/publisher - Message [akka.cluster.InternalClusterAction$Unsubscribe] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/publisher#-855855895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/daemon/heartbeatSender - Message [akka.cluster.ClusterEvent$MemberRemoved] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/daemon/heartbeatSender#1873687371] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/daemon - Message [akka.cluster.InternalClusterAction$Unsubscribe] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/daemon#1632944597] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator - ClusterSingletonManager state change [Oldest -> WasOldest]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator - ClusterSingletonManager state change [Oldest -> WasOldest]
// I call this utility method from my main method once the Akka system is created and my main actor is up;
// it then blocks until SIGTERM is received. For a webapp you cannot rely on SIGTERM, only on some webapp shutdown hook (NOT a JVM hook).
/**
 * Blocks the calling thread until a shutdown signal arrives, then leaves the cluster and
 * waits for the given actor system to terminate.
 *
 * <p>A termination callback is registered that shuts down the logging system and releases
 * the latch this method waits on. {@code system.terminate()} is always invoked before
 * returning, including when the wait is interrupted.
 *
 * @param system the actor system whose shutdown is being coordinated
 */
public static void registerShutdownHook(ActorSystem system) {
    final CountDownLatch latch = new CountDownLatch(1);
    system.registerOnTermination(() -> {
        try {
            LogManager.shutdown();
        } finally {
            // Release the waiting thread even if shutting down logging fails.
            latch.countDown();
        }
    });
    // Waits for Ctrl + C or SIGTERM
    new SigIntBarrier().await();
    // Leaves the cluster once SIGTERM is received, MemberRemoved will be received by the actor subscribed which should call system.terminate() immediately.
    final Cluster cluster = Cluster.get(system);
    cluster.leave(cluster.selfAddress());
    try {
        latch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can still observe the interruption;
        // the finally block below still forces termination.
        Thread.currentThread().interrupt();
    } finally {
        system.terminate();
    }
}