Deserializing ActorRefs after a JVM restart: will they point to the correct actor?


Kostas kougios

Nov 5, 2015, 7:25:14 AM
to Akka User List
Hi, after my program restarts, it deserializes remote ActorRefs, but they don't seem to point to the actor anymore.

So I've got a cluster of, say, 4 servers (simulated by 4 ActorSystems running on different ports).

Each server serializes ActorRefs, both its own and those of other servers. When a server restarts, it deserializes them, but they don't seem to be valid anymore.

Is there a way to deserialize them so that they are valid? My ActorRefs all have valid paths, e.g. .../databaseServer/index

Kostas kougios

Nov 5, 2015, 9:02:09 AM
to Akka User List
I could of course do an actorSelection, but is there an easier way?

Martynas Mickevičius

Nov 5, 2015, 10:41:28 AM
to akka...@googlegroups.com
Hi Kostas,

A serialized ActorRef contains a UID, which identifies a particular incarnation of the actor, e.g.:

res8: akka.actor.ActorRef = Actor[akka://repl/user/$a#1452319393]

When you restart your actor system, newly created actors have the same path but a different UID, so a message sent to such a deserialized ActorRef ends up in dead letters.
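To make the UID point concrete, here is a toy model in plain Java. It does not use Akka; the class names `IncarnationRef` and `ToyActorSystem` are made up for illustration. A reference remembers both the path and the UID, so after a simulated restart the same path holds a new incarnation and the old reference no longer matches.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Akka's API: a reference is a path plus the UID of one incarnation.
final class IncarnationRef {
    final String path;
    final long uid;

    IncarnationRef(String path, long uid) {
        this.path = path;
        this.uid = uid;
    }

    @Override
    public String toString() {
        return path + "#" + uid;   // same shape as Actor[akka://repl/user/$a#1452319393]
    }
}

final class ToyActorSystem {
    // path -> UID of the incarnation currently living at that path
    private final Map<String, Long> live = new HashMap<>();
    private long nextUid = 1;

    IncarnationRef actorOf(String path) {
        long uid = nextUid++;            // every incarnation gets a fresh UID
        live.put(path, uid);
        return new IncarnationRef(path, uid);
    }

    // A restart recreates every actor at the same path, but with a new UID.
    void restart() {
        for (Map.Entry<String, Long> entry : live.entrySet()) {
            entry.setValue(nextUid++);
        }
    }

    // Delivery needs both the path and the UID to match the live incarnation.
    String tell(IncarnationRef target, String message) {
        Long current = live.get(target.path);
        boolean sameIncarnation = current != null && current.longValue() == target.uid;
        return sameIncarnation ? "delivered: " + message : "dead letters: " + message;
    }
}
```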

What you have to do is resolve the new UIDs that have been assigned to the new actors. You can do that, as you correctly hinted, by using actor selection.

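implicit val timeout: Timeout = Timeout(5.seconds) // resolveOne() needs an implicit Timeout (akka.util.Timeout, scala.concurrent.duration._); 5 seconds is an arbitrary example
val f: Future[ActorRef] = system.actorSelection(oldRef.path).resolveOne()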
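If many old references have to be re-resolved at once (the thread mentions thousands of ActorRefs), the lookups can be started concurrently instead of one after the other. A minimal sketch in plain Java, assuming a hypothetical `resolver` function that stands in for `actorSelection(path).resolveOne(...)`; in Akka that call returns a Scala `Future`, and converting it to a `CompletableFuture` is not shown. The final `join()` blocks only to keep the example simple; inside an actor you would not block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class BulkResolver {
    // Starts every lookup at once, waits for all of them, and keeps the successful ones.
    // Paths whose lookup fails (e.g. nothing lives there yet) are left out of the result.
    static <R> Map<String, R> resolveAll(List<String> paths,
                                         Function<String, CompletableFuture<R>> resolver) {
        Map<String, R> resolved = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String path : paths) {
            pending.add(resolver.apply(path)
                    .thenAccept(ref -> resolved.put(path, ref))
                    .exceptionally(error -> null));   // swallow the failure for this path only
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return resolved;
    }
}
```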


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Martynas Mickevičius
Typesafe
Reactive Apps on the JVM

Konstantinos Kougios

Nov 6, 2015, 7:10:40 AM
to akka...@googlegroups.com
Thanks Martynas, I was afraid that this was the case. I would prefer to view actor paths as endpoints of a service: same path, same service. I might serialize thousands of ActorRefs, and I suppose an actorSelection for each of them will take time, as it requires network communication.

So it is not really a serialization issue. It is rather an actor-restart issue: actorOf() gives the recreated actor a new UID. In my cluster I have thousands of actors, and their ActorRefs are cached (in memory or serialized, it doesn't matter) on each server. If one of the cluster members goes down and restarts, the ActorRef cache is no longer valid. This makes me think that I should either cache the paths instead (but then actorSelection will take more time than just actorRef ! X) or not cache anything at all, which will complicate the implementation and increase network overhead considerably.
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/n8HlIIkZhCs/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Kostas kougios

Nov 6, 2015, 7:30:11 AM
to Akka User List
OK, so basically I am going to change my code to serialize endpoints (as strings) and then use a resolveOne(..) cache (probably caching refs for a short time, a few seconds), unless someone can recommend a better way.
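A sketch of such a short-lived cache in plain Java, under stated assumptions: `TtlRefCache` is a hypothetical name, the type parameter `R` stands in for `ActorRef`, and `resolve` stands in for a lookup that returns a ready reference. Inside a real actor, blocking on `resolveOne` is best avoided, so an actual implementation would more likely cache the pending `Future`. The clock is injected so expiry can be tested.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical TTL cache: path -> resolved reference, re-resolved after ttlMillis.
final class TtlRefCache<R> {
    private static final class Entry<V> {
        final V ref;
        final long expiresAtMillis;

        Entry(V ref, long expiresAtMillis) {
            this.ref = ref;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry<R>> entries = new HashMap<>();
    private final Function<String, R> resolve;   // stands in for resolveOne
    private final long ttlMillis;
    private final LongSupplier clock;            // injectable for testing

    TtlRefCache(Function<String, R> resolve, long ttlMillis, LongSupplier clock) {
        this.resolve = resolve;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached reference, resolving again if it is missing or expired.
    synchronized R get(String path) {
        long now = clock.getAsLong();
        Entry<R> entry = entries.get(path);
        if (entry == null || now >= entry.expiresAtMillis) {
            entry = new Entry<>(resolve.apply(path), now + ttlMillis);
            entries.put(path, entry);
        }
        return entry.ref;
    }

    // Drops an entry early, e.g. when a message to it is known to have failed.
    synchronized void invalidate(String path) {
        entries.remove(path);
    }
}
```

Short TTLs bound how long a stale reference survives after a restart, at the cost of one lookup per path per TTL window.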


Martynas Mickevičius

Nov 6, 2015, 8:04:22 AM
to akka...@googlegroups.com
You can send messages to an ActorSelection without resolving it to an ActorRef; ActorSelection has ! and tell methods.
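Why a selection survives a restart can be shown with another toy model (plain Java; `Registry` and `ToySelection` are hypothetical names, not Akka classes): the selection keeps only the path and looks up whichever actor currently lives there on every send, whereas an ActorRef is bound to one incarnation. The model illustrates the semantics only and says nothing about the relative cost of the two in Akka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy registry: path -> mailbox of whichever incarnation currently lives there.
final class Registry {
    private final Map<String, List<String>> mailboxes = new HashMap<>();
    final List<String> deadLetters = new ArrayList<>();

    List<String> spawn(String path) {
        List<String> mailbox = new ArrayList<>();
        mailboxes.put(path, mailbox);      // a new incarnation replaces the old one
        return mailbox;
    }

    void stop(String path) {
        mailboxes.remove(path);
    }

    void deliverByPath(String path, String message) {
        List<String> mailbox = mailboxes.get(path);
        if (mailbox != null) {
            mailbox.add(message);
        } else {
            deadLetters.add(message);      // nobody at that path right now
        }
    }
}

// Toy selection: remembers only the path and looks it up again on every send.
final class ToySelection {
    private final Registry registry;
    private final String path;

    ToySelection(Registry registry, String path) {
        this.registry = registry;
        this.path = path;
    }

    void tell(String message) {
        registry.deliverByPath(path, message);
    }
}
```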


Konstantinos Kougios

Nov 6, 2015, 8:28:05 AM
to akka...@googlegroups.com
This will be helpful. Is there any performance penalty compared to actorRef.tell? If I cache an ActorSelection (for, say, 10 seconds), will it perform better and also avoid the server-restart issue? I see it internally has an anchor ActorRef, so it might be better.

Kostas kougios

Nov 6, 2015, 9:03:09 AM
to Akka User List
Hmm, I think actor paths will always be local at the time of actor creation (all actors are created locally and then sent over the wire). actor.path.address has a host and port of None, so there is no point sending that over the wire. The address part of the path won't automatically be fixed during serialization, will it?

Kostas kougios

Nov 6, 2015, 11:15:24 AM
to Akka User List
Well, I refactored my code to cache ActorPaths, but indeed, at creation time all actor paths are local, and when they are transmitted over the wire they remain local, so they point to the wrong place.

I've manually modified the address part of the path so that it contains the host and port of the server. But the process seems awkward; am I missing something?
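A sketch of the manual fix described above, in plain Java: it rewrites a local path string into one that carries the system's host and port. The method name, the `akka.tcp` protocol string (the classic Netty TCP transport of that era) and the example host and port are assumptions. Before hand-rolling this, it may be worth checking the docs for your Akka version for `ActorPath.toStringWithAddress(Address)` and `akka.serialization.Serialization.serializedActorPath(ActorRef)`, which exist to produce path strings that include the remote address.

```java
final class PathAddresses {
    // Turns a local path such as "akka://db/user/index" into a remote one such as
    // "akka.tcp://db@10.0.0.5:2552/user/index". Paths that already carry a host
    // (an '@' in the system part) are returned unchanged.
    static String withRemoteAddress(String localPath, String protocol, String host, int port) {
        int schemeEnd = localPath.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("not an actor path: " + localPath);
        }
        String rest = localPath.substring(schemeEnd + 3);             // "db/user/index"
        int slash = rest.indexOf('/');
        String system = slash < 0 ? rest : rest.substring(0, slash);  // "db"
        String elements = slash < 0 ? "" : rest.substring(slash);     // "/user/index"
        if (system.contains("@")) {
            return localPath;                                         // already remote
        }
        return protocol + "://" + system + "@" + host + ":" + port + elements;
    }
}
```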

Guido Medina

Nov 9, 2015, 6:26:02 PM
to Akka User List
Hi Kostas,

I have a few micro-services and some supervisor actors which inherit from an AbstractSupervisor template that uses local caches per micro-service with an optimistic approach. Here is an example, with requirements:

1) There is a uniform pattern where each micro-service is an independent category, and each category can be either a round-robin supervisor only or a supervisor with children (sharding).
2) Each category informs the other categories, using cluster events, when a supervisor and/or any of its children comes up.
3) If a supervisor dies, its children die too, which means that supervisors in other categories are informed and the cache for that supervisor is invalidated.
4) Each child has an Integer ID, say, category account and accounts 1, 2, ..., 10, so I don't bother giving a path to children. I know a child of account is ../account/$blah blah incarnation, so what I do is inform the caches of the other micro-services that Account ID X = ActorRef Y.

All this would suggest that I use cluster sharding, but no: it is very similar, but I have my own implementation, constrained more to DDD.

I've probably confused you more than I've helped, but all I wanted to say is that caching a key with a corresponding ActorRef is not a bad idea. Of course, if one of your micro-services goes down you need to invalidate caches. Rather than making actor selection calls, which I would discourage, do an inventory of your events and try to react to them (this is not free; I had to put a lot of work into contemplating different scenarios).

I designed the application to build the caches using cluster events and the template pattern (preStart and postStop of some types of actors inform the other caches).
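A minimal sketch of the event-driven idea, assuming children publish hypothetical `ChildCreated`/`ChildStopped` events from `preStart`/`postStop` and that other nodes apply them to a per-category cache. Strings stand in for ActorRefs so the example runs without Akka. Removing an entry only when the stopped reference still matches guards against a late stop event from an old incarnation wiping out its replacement.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical life-cycle events a child would publish from preStart/postStop.
abstract class ChildEvent {
    final int id;      // entity ID, e.g. an account number
    final String ref;  // stands in for the child's ActorRef

    ChildEvent(int id, String ref) {
        this.id = id;
        this.ref = ref;
    }
}

final class ChildCreated extends ChildEvent {
    ChildCreated(int id, String ref) { super(id, ref); }
}

final class ChildStopped extends ChildEvent {
    ChildStopped(int id, String ref) { super(id, ref); }
}

// Another node's view of one category: entity ID -> current child reference.
final class CategoryCache {
    private final Map<Integer, String> refsById = new ConcurrentHashMap<>();

    void apply(ChildEvent event) {
        if (event instanceof ChildCreated) {
            refsById.put(event.id, event.ref);
        } else if (event instanceof ChildStopped) {
            // Only remove if the entry still belongs to the incarnation that stopped.
            refsById.remove(event.id, event.ref);
        }
    }

    String lookup(int id) {
        return refsById.get(id);   // null when no live child is known
    }
}
```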

Having a uniform hierarchy also helps, like supervisor -> children, so that you can add or invalidate a whole branch (I do that too with my mini-framework).

Hope all these crazy ideas can help you.

Best regards,

Guido.

Konstantinos Kougios

Nov 10, 2015, 7:27:02 AM
to akka...@googlegroups.com
Hi Guido, mine is a similar use case. One difference is that I store the ActorRefs in files on disk, as I am expecting to have a lot of them and don't want to keep them in memory. I expect to have, say, 1000 servers with 1000 ActorRefs per server for one "index" of mine (index = database index, as I am building a database). This means batch updates of the ActorRefs are costly. So what I've ended up doing so far is serializing the actor path (after injecting the real server RootAddress into it). Then I resolveOne the ActorRef, keep that ref in memory for 10 seconds, and then resolveOne again. This means my servers don't have to communicate up/down status, and it also works if servers crash or restart. The whole trick is achieved by replacing ActorRef with EndpointActorRef (a class of mine), which does the ActorPath manipulation and takes care of serializing the ActorPath instead of the ActorRef.
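A sketch of the general idea behind such a path-based wrapper; it is not Kostas's actual EndpointActorRef, which isn't shown in the thread. Only the full path string is serialized, while the resolved reference lives in `transient` fields, so a deserialized copy always starts empty and re-resolves. Plain Java; `EndpointRef`, the 10-second TTL constant and the `resolve` function are assumptions, with `R` standing in for ActorRef.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical path-based reference: only the path is serialized, the resolved ref is transient.
final class EndpointRef<R> implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final long TTL_MILLIS = 10_000;   // the 10 seconds mentioned above

    private final String path;       // full path including host:port of the owning server
    private transient R cached;      // not serialized: null again after deserialization
    private transient long expiresAtMillis;

    EndpointRef(String path) {
        this.path = path;
    }

    String path() {
        return path;
    }

    // Returns the cached reference, resolving it again if absent or expired.
    synchronized R get(Function<String, R> resolve, long nowMillis) {
        if (cached == null || nowMillis >= expiresAtMillis) {
            cached = resolve.apply(path);
            expiresAtMillis = nowMillis + TTL_MILLIS;
        }
        return cached;
    }
}
```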

Overall, I might end up serializing the ActorPath even for actors in the same JVM, if the actor is inactive for a long time. This means actors might come and go quite frequently even while the servers are up.

What happens to your cache if a server crashes? Does the cache remove the invalid ActorRefs somehow?

Cheers

Guido Medina

Nov 10, 2015, 8:43:25 AM
to Akka User List
Each server is subscribed to cluster events, and each event is received by the supervisor on that server. When I say server, think micro-service; when I say supervisor, think the supervisor for that micro-service's category, like the Account supervisor or the Order supervisor. Each server event message is dealt with accordingly (other things happen too, of course). If a member joins the cluster, it notifies the other members: "here is my supervisor ActorRef", and the other nodes send their caches to that ActorRef. The beauty of it is that an ActorRef can be used as a Map key, so if a node goes down, I locate the key and remove its children's keys. Here is partial, working production Java 8 code:

import akka.actor.*;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Router;
// company-specific imports removed
import com.zaxxer.hikari.HikariDataSource;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;

import javax.persistence.EntityManagerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentMap;

import static akka.actor.ActorRef.noSender;
import static akka.cluster.ClusterEvent.initialStateAsEvents;
// company-specific imports removed

/**
 * @author Guido Medina, created on 17/03/15.
 */
public abstract class SupervisorAbstractActor<
  K extends Serializable,
  T extends GenericEntity<K, T>,
  S extends SupervisorAbstractActor<K, T, S, C>,
  C extends ChildAbstractActor<K, T, S, C>> extends DefaultAbstractActor {

  public static final String ENGINE_DISPATCHER = "engine.dispatcher";
  public static final String DEFAULT_DISPATCHER = "akka.actor.default-dispatcher";

  public final SupervisorKey key;
  public final HikariDataSource ds;
  public final EntityManagerFactory emf;
  public final LoggingAdapter log;

  final ConcurrentMap<SupervisorType, Router> routers = newConcurrentMap();
  final ComputingConcurrentMap<SupervisorType, ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>>>
    actors = newComputingConcurrentMap(supervisorType -> newComputingConcurrentMap(supervisorKey -> newConcurrentMap()));

  public SupervisorAbstractActor(SupervisorType type) {
    final ActorSystem system = context().system();
    log = Logging.getLogger(system, this);
    key = new SupervisorKey(type, self());
    try {
      ds = createDataSource(system.settings().config());
      emf = createEntityManagerFactory(ds);
    } catch (Exception e) {
      log.error("Error starting supervisor " + key, e);
      system.terminate();
      throw e;
    }
    Cluster.get(system).subscribe(key.supervisorRef, initialStateAsEvents(), MemberEvent.class);
  }

  protected abstract ChildActorCreator<K, T, S, C> creator(T entity);

  protected ActorRef createChildActor(ActorContext context, T entity) {
    return context.actorOf(Props.create(creator(entity)).withDispatcher(DEFAULT_DISPATCHER));
  }

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return ALWAYS_RESUME_STRATEGY;
  }

  @Override
  public void postStop() {
    emf.close();
    ds.close();
  }

  private void createSupervisor(SupervisorCommand<ConcurrentMap<Integer, ActorRef>> command) {
    actors.computeIfAbsent(command.key.type).compute(command.key, (supervisor, children) -> {
      if (children == null) {
        if (command.key.type.distributionType.isRoundRobin) {
          routers.compute(command.key.type, (type, router) ->
            (router == null ? new Router(new RoundRobinRoutingLogic()) : router).addRoutee(command.key.supervisorRef));
        }
        return command.value != null ? command.value : newConcurrentMap();
      } else {
        if (command.value != null) {
          children.putAll(command.value);
        }
        return children;
      }
    });
  }

  private void registerKey(Member member) {
    final Set<String> roles = member.getRoles();
    if (!roles.isEmpty()) {
      final String role = roles.iterator().next();
      if (SupervisorType.findBy(role) != key.type) {
        context().actorSelection(member.address() + "/user/" + role).
          tell(SupervisorCommand.registerKey(), key.supervisorRef);
      }
    }
  }

  private void registerKey(SupervisorCommand<SupervisorKey> supervisorCommand) {
    createSupervisor(SupervisorCommand.createSupervisor(supervisorCommand.key, null));
    final ConcurrentMap<Integer, ActorRef> cache = findCacheBy(key);
    supervisorCommand.key.supervisorRef.
      tell(SupervisorCommand.createSupervisor(key, cache.isEmpty() ? null : cache), noSender());
  }

  private void removeSupervisor(Member member) {
    final Set<String> roles = member.getRoles();
    if (!roles.isEmpty()) {
      final SupervisorType supervisorType = SupervisorType.findBy(roles.iterator().next());
      if (supervisorType != key.type) {
        final String address = member.address().toString();
        final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> cache = actors.computeIfAbsent(supervisorType);
        cache.keySet().stream().
          filter(supervisor -> supervisor.supervisorRef.path().address().toString().startsWith(address)).
          forEach(supervisor -> {
            final Router oldRouter = routers.computeIfPresent(supervisor.type,
              (type, router) -> router.removeRoutee(supervisor.supervisorRef));
            if (oldRouter != null && oldRouter.routees().isEmpty()) {
              routers.remove(supervisor.type);
            }
            cache.remove(supervisor);
          });
      }
    }
  }

  @SuppressWarnings("unchecked")
  private void createChild(SupervisorCommand<?> command) {
    final SupervisorCommand<ChildKey> childCommand;
    if (command.key == null) {
      childCommand = SupervisorCommand.createChild(key, (ChildKey) command.value);
      actors.entrySet().stream().
        filter(entry -> entry.getKey() != key.type).
        forEach(entry -> entry.getValue().keySet().stream().
          forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
    } else {
      childCommand = (SupervisorCommand<ChildKey>) command;
    }
    actors.computeIfAbsent(childCommand.key.type).
      computeIfAbsent(childCommand.key).
      put(childCommand.value.id, childCommand.value.childRef);
  }

  @SuppressWarnings("unchecked")
  private void removeChild(SupervisorCommand<?> command) {
    final SupervisorCommand<ChildKey> childCommand;
    if (command.key == null) {
      childCommand = SupervisorCommand.removeChild(key, (ChildKey) command.value);
      actors.entrySet().stream().
        filter(entry -> entry.getKey() != key.type).
        forEach(entry -> entry.getValue().keySet().
          forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
    } else {
      childCommand = (SupervisorCommand<ChildKey>) command;
    }
    actors.computeIfAbsent(childCommand.key.type).
      computeIfAbsent(childCommand.key).
      remove(childCommand.value.id);
  }

  public ConcurrentMap<Integer, ActorRef> findCacheBy(SupervisorKey supervisorKey) {
    return actors.computeIfAbsent(supervisorKey.type).computeIfAbsent(supervisorKey);
  }

  public Router findRouterBy(SupervisorType type) {
    return routers.get(type);
  }

  public Integers findActorIdsBy(SupervisorType type) {
    final Integers ids = new Integers();
    actors.computeIfAbsent(type).values().forEach(actorIds -> ids.addAll(actorIds.keySet()));
    return ids;
  }

  public ActorRef findActorBy(SupervisorType type, Integer id) {
    for (ConcurrentMap<Integer, ActorRef> actors : this.actors.computeIfAbsent(type).values()) {
      final ActorRef childRef = actors.get(id);
      if (childRef != null) {
        return childRef;
      }
    }
    return null;
  }

  public SupervisorKey findFirstSupervisorBy(SupervisorType type) {
    final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> supervisors = actors.computeIfAbsent(type);
    return supervisors.isEmpty() ? null : supervisors.keySet().iterator().next();
  }

  @SuppressWarnings("unchecked")
  @Override
  public final void onReceive(Object message) {
    if (message instanceof SupervisorCommand<?>) {
      final SupervisorCommand<?> command = (SupervisorCommand<?>) message;
      switch (command.command) {
        case REGISTER_KEY:
          if (command.key == null) {
            sender().tell(SupervisorCommand.registerKey(key), noSender());
          } else {
            registerKey((SupervisorCommand<SupervisorKey>) message);
          }
          break;
        case CREATE_SUPERVISOR:
          createSupervisor((SupervisorCommand<ConcurrentMap<Integer, ActorRef>>) command);
          break;
        case CREATE_CHILD:
          createChild(command);
          break;
        case REMOVE_CHILD:
          removeChild(command);
          break;
        case RELOAD_CONFIG: {
          emf.getCache().evictAll();
          reloadCaches();
          final List<Future<Boolean>> futures = new ArrayList<>();
          final SupervisorCommand<Integers> configCommand = (SupervisorCommand<Integers>) command;
          final Integers currentActors = findActorIdsBy(key.type);
          currentActors.minus(configCommand.value).
            forEach(id -> {
              final ActorRef child = findActorBy(key.type, id);
              if (child != null) {
                futures.add(Patterns.gracefulStop(child, FIFTEEN_SECONDS));
              }
            });

          final ActorContext context = context();
          final ExecutionContextExecutor dispatcher = context.system().dispatcher();
          Futures.sequence(futures, dispatcher).onComplete(
            new OnComplete<Iterable<Boolean>>() {
              @Override
              public void onComplete(Throwable failure, Iterable<Boolean> success) {
                try (CloseableEntityManager em = new CloseableEntityManager(emf.createEntityManager())) {
                  key.type.findEnabledEntitiesBy(em.em, configCommand.value.minus(currentActors)).
                    forEach(entity -> createChildActor(context, (T) entity));
                }
              }
            }, dispatcher);
          break;
        }
        case RELOAD_CACHES:
          emf.getCache().evictAll();
          reloadCaches();
          break;
      }
      log.info(command.toString());
    } else if (message instanceof MemberEvent) {
      if (message instanceof MemberUp) {
        registerKey(((MemberUp) message).member());
      } else {
        removeSupervisor(((MemberEvent) message).member());
      }
    } else {
      processMessage(message);
    }
  }

  protected void reloadCaches() {
  }

  public void processMessage(Object message) {
    unhandled(message);
  }
}



Note: I pasted this code stripped of its industry context because I'm not allowed to share the full code; I hope it helps. By the way, you can have hundreds of thousands of ActorRefs in memory; that's no problem.

Guido Medina

Nov 10, 2015, 11:04:31 AM
to Akka User List
Ah, in a way the code I sent before is for a set of actor systems connected to each other, all belonging to the same cluster. I'm not sure how Akka Cluster will perform with 1000 nodes; why so many nodes? To answer your question: if a node leaves the cluster, under any circumstances, an event is generated on all other nodes, so I remove that branch of actors from each node's cache. I only use actor selection once, when a node comes up, to locate its supervisor actor remotely, because that one is always at /user/$role, where $role is one of ["account", "order", "signal", ..., "http"], etc.

Also, child actors extend an abstract template class which basically notifies the other, remote supervisors of its life-cycle (created, destroyed). One node going down is a special case: children are not removed one by one; instead their whole branch is removed, because they belong to a tree, a hierarchy of supervisors with children.
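Removing a whole branch when a node leaves can be sketched like this (plain Java, hypothetical `BranchCache`; strings stand in for addresses and ActorRefs). As a simplification, branches are keyed by the member's full address string rather than by a supervisor ActorRef, so one member-removed event drops all of that node's children at once. Comparing addresses for equality, rather than with a prefix match, avoids e.g. `...:255` matching `...:2552`.

```java
import java.util.HashMap;
import java.util.Map;

final class BranchCache {
    // node address -> (child ID -> child reference); one branch per remote supervisor
    private final Map<String, Map<Integer, String>> branches = new HashMap<>();

    void putChild(String nodeAddress, int childId, String childRef) {
        branches.computeIfAbsent(nodeAddress, address -> new HashMap<>()).put(childId, childRef);
    }

    // A member-removed event drops the whole branch at once; returns how many children went with it.
    int removeNode(String nodeAddress) {
        Map<Integer, String> removed = branches.remove(nodeAddress);
        return removed == null ? 0 : removed.size();
    }

    String findChild(int childId) {
        for (Map<Integer, String> children : branches.values()) {
            String ref = children.get(childId);
            if (ref != null) {
                return ref;
            }
        }
        return null;
    }
}
```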

There are special supervisors that have no children, especially round-robin supervisors, say, a supervisor that processes stateless requests. Every other node holds a router reference to such a category instead of a single reference to one supervisor, so instead of sending a message to a supervisor I send it to a round-robin router.
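A toy round-robin over the supervisors of one category, in plain Java. `RoundRobin` is a hypothetical class; Akka's own `Router` with `RoundRobinRoutingLogic`, as used in the code above, is immutable and returns a new Router from `addRoutee`/`removeRoutee`. Here, removing a routee when its node leaves keeps the rotation position stable, and `select()` returns null once no supervisor is left, which loosely mirrors the code above dropping an empty router.

```java
import java.util.ArrayList;
import java.util.List;

// Toy round-robin over the supervisor references of one category (not Akka's Router).
final class RoundRobin<R> {
    private final List<R> routees = new ArrayList<>();
    private int next = 0;   // index of the routee that receives the next message

    synchronized void add(R routee) {
        if (!routees.contains(routee)) {
            routees.add(routee);
        }
    }

    synchronized void remove(R routee) {
        int index = routees.indexOf(routee);
        if (index < 0) {
            return;
        }
        routees.remove(index);
        if (index < next) {
            next--;                     // keep pointing at the same upcoming routee
        }
        if (next >= routees.size()) {
            next = 0;                   // wrap around (also covers the empty case)
        }
    }

    synchronized boolean isEmpty() {
        return routees.isEmpty();
    }

    // Returns the routee for the next message, or null if none is registered.
    synchronized R select() {
        if (routees.isEmpty()) {
            return null;
        }
        R chosen = routees.get(next);
        next = (next + 1) % routees.size();
        return chosen;
    }
}
```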

Cheers,

Guido.