package com.akka.UDP;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import com.akka.project.Pi.Worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.routing.RoundRobinRouter;
public class UDPServer {
public static void main(String args[]) throws Exception
{
DatagramSocket serverSocket = new DatagramSocket(10001);
byte[] receiveData = new byte[1024];
byte[] sendData = new byte[1024];
System.out.println("################ SERVER STARTED ######################## ");
while(true)
{
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
serverSocket.receive(receivePacket);
String sentence = new String( receivePacket.getData());
System.out.println("RECEIVED: " + sentence);
if(receivePacket!=null && receivePacket.getData().length > 0){
UDPServer server = new UDPServer();
String message = new String(receivePacket.getData());
server.processMessage(4, message);
}
}
}
public void processMessage(final int numberOfWorkers, final String message){
final ActorSystem system = ActorSystem.create("UDPMessageProcessing");
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new Master(numberOfWorkers, message, system);
}
}), "master");
master.tell(message, master);
}
static class Master extends UntypedActor{
private final int numberOfWorker;
private String message;
private final ActorRef workerRouter;
public Master(int numberOfWorker, String message, ActorSystem system){
this.numberOfWorker = numberOfWorker;
this.message = message;
workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(new RoundRobinRouter(numberOfWorker)));
}
@Override
public void onReceive(Object arg0) throws Exception {
if(arg0 instanceof String){
String message = (String)arg0;
System.out.println("############## MESSAGE RECEIEVED BY MASTER - "+ getSender().toString());
workerRouter.tell(new Work(message), getSender());
}else if(arg0 instanceof Work){
System.out.println("############ GOT THE RESPONSE FROM WORKER AND PROCESSED MESSAGE --- " + ((Work)arg0).getMessage());
}
else{
getContext().stop(getSelf());
System.out.println("#### IS ALIVE -- " + getSelf().isTerminated());
unhandled(arg0);
}
}
}
public static class Worker extends UntypedActor {
/* public Worker(){
System.out.println("########################## WORKER " + getSender().toString());
}*/
@Override
public void onReceive(Object arg0) throws Exception {
if(arg0 instanceof Work){
Work work = (Work) arg0;
System.out.println("########################## WORKER " + getSelf().toString());
System.out.println("###################### WORKER RECEIVED MESSAGE -- " + work.getMessage());
getSender().tell(work, getSelf());
}
else{
getContext().stop(getSelf());
System.out.println("#### IS WORKER ALIVE -- " + getSelf().isTerminated());
unhandled(arg0);
}
}
}
static class Work{
private String message;
public Work(String message){
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
}
UDP Client :
package com.akka.UDP;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
public class UDPClient {
public static void main(String args[]) throws Exception
{
BufferedReader inFromUser =
new BufferedReader(new InputStreamReader(System.in));
DatagramSocket clientSocket = new DatagramSocket();
InetAddress IPAddress = InetAddress.getByName("localhost");
byte[] sendData = null;
byte[] receiveData = new byte[1024];
String sentence = inFromUser.readLine();
sendData = sentence.getBytes();
for (int i=1 ; i<10;i++){
sendData = new byte[1024];
String message = "Message "+i;
sendData = message.getBytes();
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, IPAddress, 10001);
clientSocket.send(sendPacket);
sendData = null;
}
clientSocket.close();
}
}
Above mentioned code is error free but I am unable to understand one thing which prints out when I run the application.
Whenever System.out.println("########################## WORKER " + getSelf.toString()); this line is executed it always prints "UDPMessageProcessing/user/$a$a"
I am expecting it to print something like " "UDPMessageProcessing/user/$a$b"
" which shows each time a different routee is executing the process?
Can anyone please let me know what wrong I am doing in this ?