Is this a viable paradigm for PerRequest actors on Synchronous system?

71 views
Skip to first unread message

kraythe

unread,
Jun 28, 2016, 12:48:23 PM6/28/16
to Akka User List
Greetings, 

I am trying to glue a play controller together with an Actor System in a large legacy codebase. Reimplementing the whole codebase as a raw actor system is not viable in the short term so it will have to take over piece by piece. What I am wondering is if the following paradigm is viable or if there is some issue with the concept. 

package actors;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.concurrent.CompletableFuture;

public class PerRequestActor extends UntypedActor {
   
private final CompletableFuture<FinalResponse> future;
   
private ResponseA a;
   
private ResponseB b;
   
private ResponseC c;

   
public static Props props(final CompletableFuture<FinalResponse> future) {
       
return Props.create(PerRequestActor.class, future);
   
}

   
public static CompletableFuture<FinalResponse> invoke(final ActorSystem system) {
       
final CompletableFuture<FinalResponse> future = new CompletableFuture<>();
        system
.actorOf(props(future));
       
return future;
   
}

   
public PerRequestActor(final CompletableFuture<FinalResponse> future) {
       
this.future = future;
   
}

   
@Override
    public void preStart() throws Exception {
        context
().system().actorSelection("/user/A").tell(new RequestA(), getSelf());
        context
().system().actorSelection("/user/B").tell(new RequestB(), getSelf());
        context
().system().actorSelection("/user/C").tell(new RequestC(), getSelf());
   
}

   
@Override
    public void onReceive(final Object message) throws Exception {
       
if (message instanceof ResponseA) {
           
a = (ResponseA) message;
            completeIfPossible
();
       
} else if (message instanceof ResponseB) {
           
b = (ResponseB) message;
            completeIfPossible
();
       
} else if (message instanceof ResponseC) {
           
c = (ResponseC) message;
            completeIfPossible
();
       
} else {
            unhandled
(message);
       
}
   
}

   
public void completeIfPossible() {
       
if (a != null && b != null && c != null) future.complete(new FinalResponse(a, b, c));
        getContext
().stop(getSelf());
   
}

   
public static class FinalResponse {
       
public FinalResponse(final ResponseA a, final ResponseB b, final ResponseC c) {
           
// Code Here
        }
   
}

   
public static class RequestA { }
   
public static class ResponseA { }
   
public static class RequestB { }
   
public static class ResponseB { }
   
public static class RequestC { }
   
public static class ResponseC { }
}



The idea is that the user calls invoke synchronously and then that spins up the actor system passing the future to the actor. In the pre-start the messages are sent to other actors via tell and then when the actor collects all of the information it needs to finish the task, it collects the information, creates the final response and completes the future.

The question is if this is a viable concept. 

Thanks for your advice.

Mark Hatton

unread,
Jun 29, 2016, 5:27:08 AM6/29/16
to Akka User List
Hi Kraythe

In situations such as this where the only state of your actor is a set of future values, I prefer future composition over actors to improve readability and avoid reinventing the wheel... afaict the sole responsibility of your actor is to compose the result of 3 futures.

Regarding your specific implementation you should of course be mindful of timeouts (I would recommend getContext().setReceiveTimeout(...)) and your completeIfPossible function looks to have a bug whereby it is stopping the actor early :)

Regards

Mark

kraythe

unread,
Jun 29, 2016, 10:25:23 AM6/29/16
to Akka User List
Thanks for the reply Mark. I understand where you are comming from but the actual implementation in proprietary code is quit a bit more complex. Future a response drives data needed for future b and c requests. The actual code with completable futures is significantly more complex. I would rather be able to do an ask and collect three responses before returning from the ask but I don't think there is a way to do that without encoding asks into the actor itself which seems worse than this. Currently the implementation works. I did add a receive timeout though. If there is a better way or something wrong with this way then I would change it

Guido Medina

unread,
Jun 29, 2016, 11:16:28 AM6/29/16
to Akka User List
But it can be improved; instead of creating and destroying actors per Future why not just create a pool of actors (or workers if you will),
the recommended amount of concurrent actors you want is usually a factor of CPUs, say CPUs x N where N in [1..4], of course,
and if these actors use legacy code and have a chance to block create them on a separate dispatcher.

Once you create these actors put them in a RoundRobin router so that they alternate (who will receive the next future) and send the Future to your router.
You can do all that programmatically, these actors can be created with a Creator if you need to pass constructor parameter to wire them up with other things,
I will post later sample code of a router with workers if you haven't figured it out, let me know if you do or don't in a couple of hours.

HTH,

Guido.

kraythe

unread,
Jun 29, 2016, 11:58:10 AM6/29/16
to Akka User List
Interesting idea, of course that violates the principle of not sending mutable state to an actor. I think it would only be necessary if there was significant overhead in creating and stopping the actors and Akka says that isn't the case. I think the only way this could work is if you sent an ask to the router and then the actor changed state, saved the original sender and then finished the ask when all data has been collected. So this idea realized would look something like the following. 

package actors;

import akka.actor.ActorRef;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Procedure;
import akka.pattern.PatternsCS;

import java.util.concurrent.CompletableFuture;

@SuppressWarnings("WeakerAccess")
public class RequestHandlerActor extends UntypedActor {

   
private ResponseA a;
   
private ResponseB b;
   
private ResponseC c;

   
private ActorRef originalSender;

   
private Procedure<Object> idle = new Procedure<Object>() {
       
@Override
        public void apply(final Object message) throws Exception {
           
if (message instanceof StartProcessRequest) {
               
originalSender = sender();
                getContext
().become(busy);

                context
().system().actorSelection("/user/A").tell(new RequestA(), getSelf());

           
} else {
                unhandled
(message);
           
}
       
}
   
};

   
private Procedure<Object> busy = new Procedure<Object>() {
       
@Override
        public void apply(final Object message) throws Exception {

           
if (message instanceof ResponseA) {
               
a = (ResponseA) message;

                context
().system().actorSelection("/user/B").tell(new RequestB(a.id), getSelf());
                context
().system().actorSelection("/user/C").tell(new RequestC(a.id), getSelf());

           
} else if (message instanceof ResponseB) {
               
b = (ResponseB) message;

           
} else if (message instanceof ResponseC) {
               
c = (ResponseC) message;

           
} else {
                unhandled
(message);
           
}
           
if ((a != null && b != null && c != null)) {
               
originalSender.tell(new FinalResponse(a, b, c), getSelf());
               
originalSender = null;
                getContext
().become(idle);

           
}
       
}
   
};

   
public static Props props(final CompletableFuture<FinalResponse> future) {

       
return Props.create(RequestHandlerActor.class, future);

   
}

   
public static CompletableFuture<FinalResponse> invoke(final ActorSystem system) {

       
return PatternsCS.ask(system.actorSelection("/user/myRouter"), new StartProcessRequest(), 60000)
               
.thenApply(v -> (FinalResponse) v)
               
.toCompletableFuture();
   
}

   
public RequestHandlerActor() {

   
}

   
@Override
    public void preStart() throws Exception {

        getContext
().become(idle);

   
}

   
@Override
    public void onReceive(final Object message) throws Exception {

        unhandled
(message);

   
}

   
public static class FinalResponse {
       
public FinalResponse(final ResponseA a, final ResponseB b, final ResponseC c) {
           
// Code Here
        }
   
}

   
public static class RequestA {
   
}

   
public static class ResponseA {

       
public final int id;

       
public ResponseA(final int id) {
           
this.id = id;
       
}
   
}

   
public static class RequestB {
       
public final int id;

       
public RequestB(final int id) {
           
this.id = id;

       
}
   
}

   
public static class ResponseB {
   
}

   
public static class RequestC {

       
public final int id;

       
public RequestC(final int id) {
           
this.id = id;
       
}
   
}

   
public static class ResponseC {
   
}

   
public static class StartProcessRequest {
   
}
}



The only issue that I see with this is that the router is going to need to know which actors are busy and which aren't and I don't see how they can do that without sending a message to the actor itself which would be heavyweight if you have to poll all the actors to figure out which is available. Another thought would be to make the router just be a least mailbox router and then have the messages for processing a new request be stashed so they queue up to be handled next. 

The benefit of this is that we can control the actor pool size rather than have it grow according to load.

Ideas? Comments? Flames? 

Guido Medina

unread,
Jun 29, 2016, 5:33:08 PM6/29/16
to Akka User List
akka.routing.SmallestMailboxRoutingLogic

Source: http://doc.akka.io/docs/akka/2.4.7/java/routing.html

Guido Medina

unread,
Jun 29, 2016, 5:40:20 PM6/29/16
to Akka User List
Here is how to create your Router programmatically:

final List<ActorRef> actors = ...
Router router = new Router(new RoundRobinRoutingLogic(), actors.stream().map(ActorRefRoutee::new).
            collect(toCollection(() -> new ArrayList<>(value.size())));

In this case I'm using RoundRobinRoutingLogic, you can use any of the pre-defined ones or you could implement your own.
You want the size of that list to be something like:

Runtime.getRuntime().availableProcessors() * N where N in [1..4]

HTH,

Guido.

Guido Medina

unread,
Jun 29, 2016, 5:41:52 PM6/29/16
to Akka User List
Correction: is actors.size(), not value.size(), value came from the original code which iterates over a Map, something else, which is my DDD shard distributor:


On Wednesday, June 29, 2016 at 10:40:20 PM UTC+1, Guido Medina wrote:
Here is how to create your Router programmatically:

final List<ActorRef> actors = ...
Router router = new Router(new RoundRobinRoutingLogic(), actors.stream().map(ActorRefRoutee::new).
            collect(toCollection(() -> new ArrayList<>(actors.size())));

In this case I'm using RoundRobinRoutingLogic, you can use any of the pre-defined ones or you could implement your own.

Guido Medina

unread,
Jun 29, 2016, 5:50:55 PM6/29/16
to Akka User List
Even if the cost of creating new actors is low, there is still a cost, its lifecycle for example has to be ran per actor creation,
a mailbox has to be created and that adds up to the GC, you could even use specific mailboxes for these set of actors that are inside a router,
for example, I use two types of "faster" and "cheaper" mailboxes for worker's like actors, like:


That way my workers are always ready to process whatever.
Just my opinion though,

Guido.

Patrik Nordwall

unread,
Jun 30, 2016, 2:27:41 AM6/30/16
to akka...@googlegroups.com
I agree that there is a performance cost involved in creating actors, but if you don't have extreme performance requirements PerRequest actors can make the code more clear and easier to reason about. I would start simple and only introduce a more complex solution when I have measured and confirmed that it is needed. 

Cheers,
Patrik

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Guido Medina

unread,
Jun 30, 2016, 4:20:43 AM6/30/16
to Akka User List
I totally agree with Patrik, simplicity is going to be the key.

The approach I have shown you wasn't the first instance of my code when I started,
it is just the current state where my application has evolved to, you will eventually evolve it when the need arises.

Also I was trying to show you the choices you will have in the future if you need to go one way or the other,
the concept of using workers is well known and has proven to scale even with slower frameworks than Akka

-that also says that I consider Akka a hell of a good toolkit to work with; it gives you many ways of solving one problem and each way does it well-

You are prototyping and coding at the same time, I believe the more choices you are aware of the better you will build your application.

HTH,

Guido.

kraythe

unread,
Jun 30, 2016, 9:52:06 AM6/30/16
to Akka User List
Thanks for all the feedback. To be clear you seem to think my second approach is the better of the two ideas and I souls just create a bunch of these in a router? My only concern there is that if a single actor fails requests can queue up in that actors mailbox. When the router restarts the failed actor, will unprocessed messages in the mailbox be lost or will it go processing as normal?

Guido Medina

unread,
Jun 30, 2016, 4:17:35 PM6/30/16
to Akka User List
That depends on the supervisor strategy, funny thing that for one my clients with lots of legacy code I had to use a CompletableFuture because the framework I'm introducing there has no of ask or await.
And that's an idea you can use, send a Java's CompletableFuture to your actor, and in such future set either the exception or result and let the consumer of such failure deal with it:


HTH,

Guido.

Guido Medina

unread,
Jun 30, 2016, 4:24:11 PM6/30/16
to Akka User List
You can complete such futures with a Throwable or an actual result, that's what I meant,
in another thread you can future.get() (blocking) and as soon as the actor executes the other thread will be able to proceed,
I know that's half actor half blocking but that's one of the hassle when dealing with legacy code.
Reply all
Reply to author
Forward
0 new messages