import akka.actor.*;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Router;
-- removed company imports;
import com.zaxxer.hikari.HikariDataSource;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import javax.persistence.EntityManagerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import static akka.actor.ActorRef.noSender;
import static akka.cluster.ClusterEvent.initialStateAsEvents;
-- removed company imports;
/**
 * Base actor for a cluster-aware supervisor.
 * <p>
 * On construction it creates a {@link HikariDataSource} and an {@link EntityManagerFactory}
 * (both closed in {@code postStop()}) and subscribes itself to cluster {@link MemberEvent}s.
 * It keeps a registry of known supervisors, grouped by {@code SupervisorType} and
 * {@code SupervisorKey}, each with a map of child actor references keyed by integer id, plus a
 * round-robin {@link Router} per supervisor type whose distribution type is round-robin.
 *
 * @param <K> key type of the entity (must be {@link Serializable})
 * @param <T> entity type, a {@code GenericEntity<K, T>}
 * @param <S> concrete supervisor type (self-referencing bound)
 * @param <C> concrete child actor type
 * @author Guido Medina, created on 17/03/15.
 */
public abstract class SupervisorAbstractActor<
K extends Serializable,
T extends GenericEntity<K, T>,
S extends SupervisorAbstractActor<K, T, S, C>,
C extends ChildAbstractActor<K, T, S, C>> extends DefaultAbstractActor {
// Dispatcher configuration id; not referenced anywhere else inside this class.
public static final String ENGINE_DISPATCHER = "engine.dispatcher";
// Dispatcher id applied to child actors created by createChildActor(...).
public static final String DEFAULT_DISPATCHER = "akka.actor.default-dispatcher";
// Identity of this supervisor: built in the constructor from its type and self().
public final SupervisorKey key;
// Connection pool created in the constructor and closed in postStop().
public final HikariDataSource ds;
// JPA factory built on top of ds; closed in postStop().
public final EntityManagerFactory emf;
// Akka logger bound to this actor.
public final LoggingAdapter log;
// One router per supervisor type; populated only for round-robin types (see createSupervisor).
final ConcurrentMap<SupervisorType, Router> routers = newConcurrentMap();
// supervisor type -> supervisor key -> (child id -> child ActorRef).
// The single-argument computeIfAbsent(key) calls used throughout rely on the factory lambdas given here.
final ComputingConcurrentMap<SupervisorType, ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>>>
actors = newComputingConcurrentMap(supervisorType -> newComputingConcurrentMap(supervisorKey -> newConcurrentMap()));
/**
 * Creates the supervisor: sets up logging, builds this supervisor's key, creates the data
 * source and the entity manager factory, and subscribes to cluster member events (the current
 * cluster state is replayed as events).
 * <p>
 * If the data source or the entity manager factory cannot be created, the failure is logged
 * with its stack trace, an already created data source is closed, the actor system is
 * terminated and the original exception is rethrown.
 *
 * @param type the type of this supervisor; stored in {@link #key}
 */
public SupervisorAbstractActor(SupervisorType type) {
    final ActorSystem system = context().system();
    log = Logging.getLogger(system, this);
    key = new SupervisorKey(type, self());
    // Held in a local first so the catch block can release it; the final field cannot be read there.
    HikariDataSource dataSource = null;
    try {
        dataSource = createDataSource(system.settings().config());
        emf = createEntityManagerFactory(dataSource);
    } catch (Exception e) {
        // Throwable goes first: error(String, Object) would treat the exception as a template
        // argument and lose the stack trace.
        log.error(e, "Error starting supervisor " + key);
        if (dataSource != null) {
            try {
                // The pool was created but the EntityManagerFactory failed: do not leak it.
                dataSource.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        system.terminate();
        throw e;
    }
    ds = dataSource;
    Cluster.get(system).subscribe(key.supervisorRef, initialStateAsEvents(), MemberEvent.class);
}
/**
 * Supplies the creator used to build the {@link Props} of the child actor for an entity.
 * Called by {@code createChildActor(ActorContext, T)}.
 *
 * @param entity the entity the child actor is created for
 * @return the creator passed to {@code Props.create(...)}
 */
protected abstract ChildActorCreator<K, T, S, C> creator(T entity);
/**
 * Starts a child actor for the given entity in the supplied context.
 * The child's props come from {@link #creator(GenericEntity)} and run on {@link #DEFAULT_DISPATCHER}.
 *
 * @param context the actor context the child is created in
 * @param entity  the entity the child represents
 * @return the reference of the newly created child
 */
protected ActorRef createChildActor(ActorContext context, T entity) {
    final Props childProps = Props.create(creator(entity)).withDispatcher(DEFAULT_DISPATCHER);
    return context.actorOf(childProps);
}
/**
 * Supervision strategy applied to this actor's children.
 *
 * @return the shared {@code ALWAYS_RESUME_STRATEGY} constant (declared outside this class)
 */
@Override
public SupervisorStrategy supervisorStrategy() {
return ALWAYS_RESUME_STRATEGY;
}
/**
 * Releases the persistence resources owned by this supervisor: first the entity manager
 * factory, then the connection pool. The pool is closed even if closing the factory throws.
 */
@Override
public void postStop() {
    try {
        emf.close();
    } finally {
        // Always release the pool, otherwise a failing emf.close() would leak its connections.
        ds.close();
    }
}
/**
 * Registers the supervisor named by {@code command.key}, or merges children into an already
 * registered one.
 * <p>
 * For a supervisor seen for the first time: if its type is round-robin distributed, its ref is
 * added as a routee to that type's router (created on demand); its child map becomes
 * {@code command.value}, or a fresh empty map when the value is {@code null}.
 * For a known supervisor, a non-null {@code command.value} is merged into the existing child map.
 *
 * @param command carries the supervisor key and, optionally, its children keyed by id
 */
private void createSupervisor(SupervisorCommand<ConcurrentMap<Integer, ActorRef>> command) {
    final SupervisorKey supervisorKey = command.key;
    final SupervisorType supervisorType = supervisorKey.type;
    actors.computeIfAbsent(supervisorType).compute(supervisorKey, (k, children) -> {
        if (children != null) {
            // Already registered: only merge whatever children were sent along.
            if (command.value != null) {
                children.putAll(command.value);
            }
            return children;
        }
        if (supervisorType.distributionType.isRoundRobin) {
            routers.compute(supervisorType, (t, router) -> {
                final Router base = router != null ? router : new Router(new RoundRobinRoutingLogic());
                return base.addRoutee(supervisorKey.supervisorRef);
            });
        }
        if (command.value != null) {
            return command.value;
        }
        return newConcurrentMap();
    });
}
/**
 * Reacts to a cluster member coming up: when the member's first role maps to a supervisor type
 * different from this supervisor's, an empty register-key command is sent to the actor at
 * {@code <member address>/user/<role>} with this supervisor as the sender.
 * Members without roles are ignored.
 *
 * @param member the cluster member that came up
 */
private void registerKey(Member member) {
    final Iterator<String> roleIterator = member.getRoles().iterator();
    if (!roleIterator.hasNext()) {
        return;
    }
    final String role = roleIterator.next();
    if (SupervisorType.findBy(role) == key.type) {
        return;
    }
    final String supervisorPath = member.address() + "/user/" + role;
    context().actorSelection(supervisorPath).tell(SupervisorCommand.registerKey(), key.supervisorRef);
}
/**
 * Handles a register-key command that carries another supervisor's key: registers that
 * supervisor locally (without children) and answers it with a create-supervisor command holding
 * this supervisor's key and its own child map, or {@code null} when that map is empty.
 *
 * @param supervisorCommand the command whose {@code key} identifies the other supervisor
 */
private void registerKey(SupervisorCommand<SupervisorKey> supervisorCommand) {
    final SupervisorKey remoteKey = supervisorCommand.key;
    createSupervisor(SupervisorCommand.createSupervisor(remoteKey, null));
    final ConcurrentMap<Integer, ActorRef> ownChildren = findCacheBy(key);
    final ConcurrentMap<Integer, ActorRef> payload;
    if (ownChildren.isEmpty()) {
        payload = null;
    } else {
        payload = ownChildren;
    }
    remoteKey.supervisorRef.tell(SupervisorCommand.createSupervisor(key, payload), noSender());
}
/**
 * Forgets every supervisor hosted on the given cluster member.
 * <p>
 * Only applies when the member's first role maps to a supervisor type other than this
 * supervisor's. Each registered supervisor of that type whose actor path address equals the
 * member's address is removed from the registry and from its type's router; a router left
 * without routees is dropped.
 *
 * @param member the cluster member the event refers to
 */
private void removeSupervisor(Member member) {
    final Set<String> roles = member.getRoles();
    if (roles.isEmpty()) {
        return;
    }
    final SupervisorType supervisorType = SupervisorType.findBy(roles.iterator().next());
    if (supervisorType == key.type) {
        return;
    }
    final Address memberAddress = member.address();
    final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> cache =
        actors.computeIfAbsent(supervisorType);
    cache.keySet().stream().
        // Compare addresses structurally: a string prefix test would also match a different
        // node whose port merely starts with the same digits (e.g. :255 vs :2551).
        filter(supervisor -> memberAddress.equals(supervisor.supervisorRef.path().address())).
        forEach(supervisor -> {
            // Remove the routee and, in the same atomic step, drop the router once it is empty.
            routers.computeIfPresent(supervisor.type, (type, router) -> {
                final Router remaining = router.removeRoutee(supervisor.supervisorRef);
                return remaining.routees().isEmpty() ? null : remaining;
            });
            cache.remove(supervisor);
        });
}
/**
 * Handles a CREATE_CHILD command.
 * <p>
 * When {@code command.key} is {@code null}, the command is rebuilt with this supervisor's key
 * and the value cast to {@code ChildKey}, and sent (without a sender) to every supervisor
 * registered under a type other than this supervisor's. Otherwise the command is used as received.
 * Finally the child map for {@code childCommand.key} is looked up in the registry.
 * <p>
 * NOTE(review): the last statement ends with a dangling '.' in this copy of the file, so the
 * operation applied to the child map is not visible here (presumably it stores the child) - confirm
 * against the original source.
 *
 * @param command the incoming command; its value is expected to be a {@code ChildKey} when the key is null
 */
@SuppressWarnings("unchecked")
private void createChild(SupervisorCommand<?> command) {
final SupervisorCommand<ChildKey> childCommand;
if (command.key == null) {
// No key on the command: stamp it with our own key and broadcast it to supervisors of other types.
childCommand = SupervisorCommand.createChild(key, (ChildKey) command.value);
actors.entrySet().stream().
filter(entry -> entry.getKey() != key.type).
forEach(entry -> entry.getValue().keySet().stream().
forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
} else {
// Command already carries a supervisor key: use it as is, no re-broadcast.
childCommand = (SupervisorCommand<ChildKey>) command;
}
actors.computeIfAbsent(childCommand.key.type).
computeIfAbsent(childCommand.key).
}
/**
 * Handles a REMOVE_CHILD command.
 * <p>
 * When {@code command.key} is {@code null}, the command is rebuilt with this supervisor's key
 * and the value cast to {@code ChildKey}, and sent (without a sender) to every supervisor
 * registered under a type other than this supervisor's. Otherwise the command is used as received.
 * Finally the child map for {@code childCommand.key} is looked up in the registry.
 * <p>
 * NOTE(review): the last statement ends with a dangling '.' in this copy of the file, so the
 * operation applied to the child map is not visible here (presumably it removes the child) - confirm
 * against the original source.
 *
 * @param command the incoming command; its value is expected to be a {@code ChildKey} when the key is null
 */
@SuppressWarnings("unchecked")
private void removeChild(SupervisorCommand<?> command) {
final SupervisorCommand<ChildKey> childCommand;
if (command.key == null) {
// No key on the command: stamp it with our own key and broadcast it to supervisors of other types.
childCommand = SupervisorCommand.removeChild(key, (ChildKey) command.value);
actors.entrySet().stream().
filter(entry -> entry.getKey() != key.type).
forEach(entry -> entry.getValue().keySet().
forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
} else {
// Command already carries a supervisor key: use it as is, no re-broadcast.
childCommand = (SupervisorCommand<ChildKey>) command;
}
actors.computeIfAbsent(childCommand.key.type).
computeIfAbsent(childCommand.key).
}
/**
 * Returns the child map (child id to actor reference) registered for a supervisor.
 * Missing registry entries for the supervisor's type or key are created on the fly.
 *
 * @param supervisorKey the supervisor whose children are requested
 * @return the live child map of that supervisor
 */
public ConcurrentMap<Integer, ActorRef> findCacheBy(SupervisorKey supervisorKey) {
    final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> supervisorsOfType =
        actors.computeIfAbsent(supervisorKey.type);
    return supervisorsOfType.computeIfAbsent(supervisorKey);
}
/**
 * Looks up the router registered for a supervisor type.
 *
 * @param type the supervisor type
 * @return the router for that type, or {@code null} when none is registered
 */
public Router findRouterBy(SupervisorType type) {
return routers.get(type);
}
/**
 * Collects the ids of all child actors registered under any supervisor of the given type.
 *
 * @param type the supervisor type
 * @return a new {@code Integers} holding every known child id for that type
 */
public Integers findActorIdsBy(SupervisorType type) {
    final Integers ids = new Integers();
    for (ConcurrentMap<Integer, ActorRef> children : actors.computeIfAbsent(type).values()) {
        ids.addAll(children.keySet());
    }
    return ids;
}
/**
 * Finds the child actor with the given id among all supervisors of a type.
 *
 * @param type the supervisor type to search in
 * @param id   the child id
 * @return the first matching child reference, or {@code null} when no supervisor of that type knows the id
 */
public ActorRef findActorBy(SupervisorType type, Integer id) {
    return actors.computeIfAbsent(type).values().stream()
        .map(children -> children.get(id))
        .filter(Objects::nonNull)
        .findFirst()
        .orElse(null);
}
/**
 * Returns the key of an arbitrary (first iterated) supervisor registered for the given type.
 * The lookup does not create a registry entry for a type that has not been seen yet.
 *
 * @param type the supervisor type
 * @return a registered supervisor key of that type, or {@code null} when none is known
 */
public SupervisorKey findFirstSupervisorBy(SupervisorType type) {
    final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> supervisors = actors.get(type);
    if (supervisors == null) {
        // Type never registered: get() does not compute a default, so guard against null.
        return null;
    }
    // Single iterator instead of isEmpty()+next(), so a concurrent removal cannot make next() throw.
    final Iterator<SupervisorKey> iterator = supervisors.keySet().iterator();
    return iterator.hasNext() ? iterator.next() : null;
}
/**
 * Message entry point of the supervisor.
 * <ul>
 * <li>{@code SupervisorCommand}: dispatched on its command constant (register key, create
 * supervisor, create/remove child, reload configuration, reload caches); commands with any
 * other constant are ignored.</li>
 * <li>{@link MemberEvent}: {@link MemberUp} triggers key registration with the new member;
 * every other member event removes the supervisors hosted on that member.</li>
 * <li>Anything else is forwarded to {@link #processMessage(Object)}.</li>
 * </ul>
 *
 * @param message the received message
 */
@SuppressWarnings("unchecked")
@Override
public final void onReceive(Object message) {
    if (message instanceof SupervisorCommand<?>) {
        final SupervisorCommand<?> command = (SupervisorCommand<?>) message;
        switch (command.command) {
            case REGISTER_KEY:
                if (command.key == null) {
                    // Key-less request: answer the sender with our own key.
                    sender().tell(SupervisorCommand.registerKey(key), noSender());
                } else {
                    registerKey((SupervisorCommand<SupervisorKey>) message);
                }
                break;
            case CREATE_SUPERVISOR:
                createSupervisor((SupervisorCommand<ConcurrentMap<Integer, ActorRef>>) command);
                break;
            case CREATE_CHILD:
                createChild(command);
                break;
            case REMOVE_CHILD:
                removeChild(command);
                break;
            case RELOAD_CONFIG:
                handleReloadConfig((SupervisorCommand<Integers>) command);
                break;
            case RELOAD_CACHES:
                emf.getCache().evictAll();
                reloadCaches();
                break;
        }
    } else if (message instanceof MemberEvent) {
        if (message instanceof MemberUp) {
            registerKey(((MemberUp) message).member());
        } else {
            removeSupervisor(((MemberEvent) message).member());
        }
    } else {
        processMessage(message);
    }
}

/**
 * Applies a RELOAD_CONFIG command: evicts the JPA cache, reloads caches, gracefully stops the
 * children selected by {@code currentIds.minus(configuredIds)} and, once all stop futures have
 * completed, creates a child for every enabled entity selected by
 * {@code configuredIds.minus(currentIds)}.
 *
 * @param configCommand command whose value holds the configured child ids
 */
@SuppressWarnings("unchecked")
private void handleReloadConfig(SupervisorCommand<Integers> configCommand) {
    emf.getCache().evictAll();
    reloadCaches();
    final List<Future<Boolean>> futures = new ArrayList<>();
    final Integers currentActors = findActorIdsBy(key.type);
    // presumably minus() is a set difference: ids currently running but no longer configured - confirm
    currentActors.minus(configCommand.value).
        forEach(id -> {
            final ActorRef child = findActorBy(key.type, id);
            if (child != null) {
                futures.add(Patterns.gracefulStop(child, FIFTEEN_SECONDS));
            }
        });
    final ActorContext context = context();
    final ExecutionContextExecutor dispatcher = context.system().dispatcher();
    // NOTE(review): the callback runs on the system dispatcher, outside this actor's message
    // processing, yet it uses the captured ActorContext to create children. Akka documents
    // ActorContext as not thread-safe; consider sending a message to self instead.
    Futures.sequence(futures, dispatcher).onComplete(
        new OnComplete<Iterable<Boolean>>() {
            @Override
            public void onComplete(Throwable failure, Iterable<Boolean> success) {
                if (failure != null) {
                    // Previously dropped silently; creation of new children still proceeds as before.
                    log.error(failure, "Error stopping children of supervisor " + key + " during configuration reload");
                }
                try (CloseableEntityManager em = new CloseableEntityManager(emf.createEntityManager())) {
                    key.type.findEnabledEntitiesBy(em.em, configCommand.value.minus(currentActors)).
                        forEach(entity -> createChildActor(context, (T) entity));
                }
            }
        }, dispatcher);
}
/**
 * Hook invoked after the JPA cache has been evicted on RELOAD_CONFIG and RELOAD_CACHES commands.
 * The default implementation does nothing; subclasses may override it.
 */
protected void reloadCaches() {
}
/**
 * Receives every message that is neither a {@code SupervisorCommand} nor a cluster
 * {@code MemberEvent}. The default implementation marks the message as unhandled.
 *
 * @param message the message not handled by {@code onReceive}
 */
public void processMessage(Object message) {
unhandled(message);
}
}