Graceful node shutdown. Is there a batter way?

260 прегледа
Пређи на прву непрочитану поруку

kraythe

непрочитано,
16. 11. 2016. 13:57:5016.11.16.
– Akka User List
Greetings, I have a node with a number of top level actors and cluster shards that I want to shut down gracefully. I have added the redacted code before for the startup and shutdown via a SystemManagerActor but I am staring at it wondering if there is a better way. A number of questions occur. 

1. Should I have this SystemManagerActor hold onto the refs of the actors it starts and when shutting down send the refs Poison pills, or restart them if the system is not shutting down? 
2. If I go with option 1, is there any way that I can make the Cluster Shard actors children of the SystemManagerActor? I could easily start the other actors as children of the manager. What I am not sure about is the cluster singleton and the cluster shards. It seems to me there is no way I know to create them as children of other actors.
3. If question 2 is impossible, is it at all appropriate for the actor to watch actors that aren't its children? 
4. Any other suggestions? 

Thanks in advance
package actors.system;

public class
SystemManagerActor extends UntypedActor {
   
private static final Logger.ALogger log = Logger.of(SystemManagerActor.class); // play framework logger
    private static ActorSystem system = MyApp.getActorSystem();
   
private static Cluster cluster = Cluster.get(system);

   
private final CompletableFuture<Void> future;

   
// -- Shard regions for cluster sharded actors.
    private final List<ActorRef> shardRegions = Stream.of(
           
ShardedActorA.shardRegion(),
           
ShardedActorB.shardRegion()
   
).collect(toList());

   
// -- Actors that are not sharded.
    private final List<ActorSelection> actors = Stream.of(
           
SimpleActorA.actorSelection(),
           
SimpleActorB.actorSelection(),
           
SingletonActor.actorSelection()
   
).collect(toList());

   
public SystemManagerActor(final CompletableFuture<Void> future) {
       
this.future = future;
       
log.info("Shutting down MyApp ActorSystem actors");
       
// wait 5 seconds for load balancer.
        context().system().scheduler().scheduleOnce(Duration.create(5, SECONDS), self(),
               
SignalMessages.BEGIN_SHUTDOWN, context().dispatcher(), self());
   
}

   
@Override
    public void onReceive(final Object message) throws Throwable {
       
if (message instanceof SignalMessages && SignalMessages.BEGIN_SHUTDOWN.equals(message)) {
           
shardRegions.forEach(ref -> {
                context
().watch(ref);
               
ref.tell(ShardRegion.gracefulShutdownInstance(), getSelf());
           
});
       
} else if (message instanceof Terminated && !shardRegions.isEmpty()) {
           
shardRegions.remove(((Terminated) message).getActor());
           
if (!shardRegions.isEmpty()) return;
           
actors.forEach(actorSelection -> actorSelection.tell(PoisonPill.getInstance(), getSelf()));
           
log.info("Actors Shut down.");
           
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsSnapshot(), ClusterEvent.MemberRemoved.class);
           
cluster.leave(cluster.selfAddress());
       
} else if (message instanceof ClusterEvent.MemberRemoved) {
           
// wait 10 seconds for graceful stop.
            context().system().scheduler().scheduleOnce(Duration.create(10, SECONDS), self(),
                   
SignalMessages.STOP_ACTOR_SYSTEM, context().dispatcher(), self());
       
} else if (message instanceof SignalMessages && SignalMessages.STOP_ACTOR_SYSTEM.equals(message)) {
           
system.terminate();
           
future.complete(null);
       
} else {
            unhandled
(message);
       
}
   
}

    public static CompletableFuture<Void> onStop() {
       
final CompletableFuture<Void> future = new CompletableFuture<>();
       
MyApp.getActorSystem().actorOf(Props.create(SystemManagerActor.class, future));
       
return future;
   
}

    public static void onStart() {
       
system.actorOf(SimpleActorA.props(), SimpleActorA.ACTOR_NAME);
       
system.actorOf(SimpleActorB.props(), SimpleActorB.ACTOR_NAME);
       
SingletonActor.register();
       
ShardedActorA.startShardRegion();
       
ShardedActorB.startShardRegion();
    }

    private enum SignalMessages {
       
BEGIN_SHUTDOWN, STOP_ACTOR_SYSTEM;
   
}
}


kraythe

непрочитано,
17. 11. 2016. 13:52:0917.11.16.
– Akka User List
Well clearly there is a problem. Wet new roll with this we get errors like the following. Any ideas? 

2016-11-17 18:11:40 +0000 - [INFO] - [MigrationManager] [10.77.21.35]:5701 [vtest] [3.7.2] Re-partitioning cluster data... Migration queue size: 271
2016-11-17 18:11:42 +0000 - [INFO] - [MigrationThread] [10.77.21.35]:5701 [vtest] [3.7.2] All migration tasks have been completed, queues are empty.
2016-11-17 18:12:02 +0000 - [INFO] - [JenkinsMonitorActor] PING from Jenkins...
2016-11-17 18:12:58 +0000 - [INFO] - [application] Stopping global actors
2016-11-17 18:12:58 +0000 - [INFO] - [SystemManagerActor] Shutting down MyApp ActorSystem actors
2016-11-17 18:13:03 +0000 - [INFO] - [SystemManagerActor] Actors Shut down.
2016-11-17 18:13:03 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) -  Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Marked address [akka.tcp://my...@10.77.21.35:2551] as [Leaving]
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '1-1019' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '3-750' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [MySQLConnection] Received an error message -> ErrorMessage(1062,#23000,Duplicate entry '2-730' for key 'PRIMARY')
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator/singleton/coordinator -  Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionTerminated] with sequence number [750] for persistenceId [/sharding/UserActivityActorCoordinator].
com
.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '3-750' for key 'PRIMARY'
   at com
.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
   at com
.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
   at io
.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
   at io
.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
   at io
.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
   at io
.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
   at io
.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
   at java
.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator/singleton/coordinator -  Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionTerminated] with sequence number [730] for persistenceId [/sharding/ProductActorCoordinator].
com
.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '2-730' for key 'PRIMARY'
   at com
.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
   at com
.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
   at io
.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
   at io
.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
   at io
.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
   at io
.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
   at io
.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
   at java
.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:03 +0000 - [ERROR] - [PersistentShardCoordinator] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator/singleton/coordinator -  Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardHomeDeallocated] with sequence number [1019] for persistenceId [/sharding/actors.auth.UserAuthenticationActorCoordinator].
com
.github.mauricio.async.db.mysql.exceptions.MySQLException: Error 1062 - #23000 - Duplicate entry '1-1019' for key 'PRIMARY'
   at com
.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:124)
   at com
.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.scala:105)
   at io
.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
   at io
.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
   at io
.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
   at io
.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
   at io
.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
   at io
.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:611)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:552)
   at io
.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:466)
   at io
.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
   at io
.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
   at java
.lang.Thread.run(Thread.java:745)
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) -  Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Leader is moving node [akka.tcp://my...@10.77.21.35:2551] to [Exiting]
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) -  Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Shutting down...
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator -  Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator -  Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [Cluster(akka://myapp)] akka.cluster.Cluster(akka://myapp) -  Cluster Node [akka.tcp://my...@10.77.21.35:2551] - Successfully shut down
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator -  Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator -  Exited [akka.tcp://my...@10.77.21.35:2551]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/actors.auth.UserAuthenticationActorCoordinator -  Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator -  Oldest observed OldestChanged: [akka.tcp://my...@10.77.21.35:2551 -> None]
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/publisher -  Message [akka.cluster.InternalClusterAction$Unsubscribe] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/publisher#-855855895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/daemon/heartbeatSender -  Message [akka.cluster.ClusterEvent$MemberRemoved] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/daemon/heartbeatSender#1873687371] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [LocalActorRef] akka://myapp/system/cluster/core/daemon -  Message [akka.cluster.InternalClusterAction$Unsubscribe] from Actor[akka://myapp/deadLetters] to Actor[akka://myapp/system/cluster/core/daemon#1632944597] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/UserActivityActorCoordinator -  ClusterSingletonManager state change [Oldest -> WasOldest]
2016-11-17 18:13:04 +0000 - [INFO] - [ClusterSingletonManager] akka.tcp://my...@10.77.21.35:2551/system/sharding/ProductActorCoordinator -  ClusterSingletonManager state change [Oldest -> WasOldest]

Guido Medina

непрочитано,
19. 11. 2016. 12:25:2119.11.16.
– Akka User List
I have the following shutdown hook (plus what you have on that actor), it is designed for Ctrl + C on a standalone JVM or kill , if running on a webapp, you have to first call clusler.leave(cluster.selfAddress()) and then system.terminate()
I was having an issue because my shutdown hook was a JVM shutdown hook instead of a SignalIntBarrier making Akka cluster not shutdown properly:

I don't know though if in the case of Akka Sharding will help you, all I can say is don't relay JVM shutdown hooks which was creating me problems:

  // I call this utility method from my main method once akka system is created and my main actor is up,
 
// then it will block until SIGTERM is received, for a webapp you cannot relay on SIGTERM but on some webapp shutdown hook (NOT a JVM hook)
 
public static void registerShutdownHook(ActorSystem system) {
   
final CountDownLatch latch = new CountDownLatch(1);
    system
.registerOnTermination(() -> {
     
try {
       
LogManager.shutdown();
     
} finally {
        latch
.countDown();
     
}
   
});
   
// Waits for Ctrl + C or SIGTERM
   
new SigIntBarrier().await();
   
// Leaves the cluster once SIGTERM is received, MemberRemoved will be received by the actor subscribed which should call system.terminate() immediately.
   
final Cluster cluster = Cluster.get(system);
    cluster
.leave(cluster.selfAddress());
   
try {
      latch
.await();
   
} catch (InterruptedException ignored) {
   
} finally {
      system
.terminate();
   
}
 
}

HTH,
Guido.

Guido Medina

непрочитано,
19. 11. 2016. 15:29:0419.11.16.
– Akka User List
Also, don't forget to verify that the MemberRemoved is you, see in the following example I created for some other test:
https://gist.github.com/guidomedina/fffeaa10d1017cc528a6817f2c4941be

HTH,
Guido.
Одговори свима
Одговори аутору
Проследи
0 нових порука