package actors;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.concurrent.CompletableFuture;
public class PerRequestActor extends UntypedActor {
private final CompletableFuture<FinalResponse> future;
private ResponseA a;
private ResponseB b;
private ResponseC c;
public static Props props(final CompletableFuture<FinalResponse> future) {
return Props.create(PerRequestActor.class, future);
}
public static CompletableFuture<FinalResponse> invoke(final ActorSystem system) {
final CompletableFuture<FinalResponse> future = new CompletableFuture<>();
system.actorOf(props(future));
return future;
}
public PerRequestActor(final CompletableFuture<FinalResponse> future) {
this.future = future;
}
@Override
public void preStart() throws Exception {
context().system().actorSelection("/user/A").tell(new RequestA(), getSelf());
context().system().actorSelection("/user/B").tell(new RequestB(), getSelf());
context().system().actorSelection("/user/C").tell(new RequestC(), getSelf());
}
@Override
public void onReceive(final Object message) throws Exception {
if (message instanceof ResponseA) {
a = (ResponseA) message;
completeIfPossible();
} else if (message instanceof ResponseB) {
b = (ResponseB) message;
completeIfPossible();
} else if (message instanceof ResponseC) {
c = (ResponseC) message;
completeIfPossible();
} else {
unhandled(message);
}
}
public void completeIfPossible() {
if (a != null && b != null && c != null) future.complete(new FinalResponse(a, b, c));
getContext().stop(getSelf());
}
public static class FinalResponse {
public FinalResponse(final ResponseA a, final ResponseB b, final ResponseC c) {
// Code Here
}
}
public static class RequestA { }
public static class ResponseA { }
public static class RequestB { }
public static class ResponseB { }
public static class RequestC { }
public static class ResponseC { }
}
package actors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Procedure;
import akka.pattern.PatternsCS;
import java.util.concurrent.CompletableFuture;
@SuppressWarnings("WeakerAccess")
public class RequestHandlerActor extends UntypedActor {
private ResponseA a;
private ResponseB b;
private ResponseC c;
private ActorRef originalSender;
private Procedure<Object> idle = new Procedure<Object>() {
@Override
public void apply(final Object message) throws Exception {
if (message instanceof StartProcessRequest) {
originalSender = sender();
getContext().become(busy);
context().system().actorSelection("/user/A").tell(new RequestA(), getSelf());
} else {
unhandled(message);
}
}
};
private Procedure<Object> busy = new Procedure<Object>() {
@Override
public void apply(final Object message) throws Exception {
if (message instanceof ResponseA) {
a = (ResponseA) message;
context().system().actorSelection("/user/B").tell(new RequestB(a.id), getSelf());
context().system().actorSelection("/user/C").tell(new RequestC(a.id), getSelf());
} else if (message instanceof ResponseB) {
b = (ResponseB) message;
} else if (message instanceof ResponseC) {
c = (ResponseC) message;
} else {
unhandled(message);
}
if ((a != null && b != null && c != null)) {
originalSender.tell(new FinalResponse(a, b, c), getSelf());
originalSender = null;
getContext().become(idle);
}
}
};
public static Props props(final CompletableFuture<FinalResponse> future) {
return Props.create(RequestHandlerActor.class, future);
}
public static CompletableFuture<FinalResponse> invoke(final ActorSystem system) {
return PatternsCS.ask(system.actorSelection("/user/myRouter"), new StartProcessRequest(), 60000)
.thenApply(v -> (FinalResponse) v)
.toCompletableFuture();
}
public RequestHandlerActor() {
}
@Override
public void preStart() throws Exception {
getContext().become(idle);
}
@Override
public void onReceive(final Object message) throws Exception {
unhandled(message);
}
public static class FinalResponse {
public FinalResponse(final ResponseA a, final ResponseB b, final ResponseC c) {
// Code Here
}
}
public static class RequestA {
}
public static class ResponseA {
public final int id;
public ResponseA(final int id) {
this.id = id;
}
}
public static class RequestB {
public final int id;
public RequestB(final int id) {
this.id = id;
}
}
public static class ResponseB {
}
public static class RequestC {
public final int id;
public RequestC(final int id) {
this.id = id;
}
}
public static class ResponseC {
}
public static class StartProcessRequest {
}
}
akka.routing.SmallestMailboxRoutingLogic
final List<ActorRef> actors = ...Router router = new Router(new RoundRobinRoutingLogic(), actors.stream().map(ActorRefRoutee::new). collect(toCollection(() -> new ArrayList<>(value.size())));
Here is how to create your Router programmatically:
final List<ActorRef> actors = ...Router router = new Router(new RoundRobinRoutingLogic(), actors.stream().map(ActorRefRoutee::new).
collect(toCollection(() -> new ArrayList<>(actors.size())));
In this case I'm using RoundRobinRoutingLogic, you can use any of the pre-defined ones or you could implement your own.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Patrik Nordwall
Akka Tech Lead
Lightbend - Reactive apps on the JVM
Twitter: @patriknw