public class Client extends Thread {
@Override
public void run() {
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
socket.connect ("tcp://localhost:5555");
ZMQ.Socket synclient = context.socket(ZMQ.REQ);
synclient.connect("tcp://localhost:5562");
//send a synchronization request
synclient.send("".getBytes(),0);
//wait for synchronization reply
synclient.recv(0);
// Do 10 requests, waiting each time for a response
for(int request_nbr = 0; request_nbr < 100; request_nbr++) {
String requestString = " Hello World ";
byte[] request = requestString.getBytes();
request[request.length-1]=0; //Sets the last byte to 0
socket.send(request, 0);
System.out.println("prova 1");
byte[] reply = socket.recv(0);
final String value = new String(reply);
System.out.println("resposta " +value);
}
socket.close();
}
}
---------------------------------------------------------------------
public class Server extends Thread {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
protected int CLIENTS_EXPECTED = 1;
private Consumer consumer;
@Override
public void run() {
socket.bind("tcp://*:5555");
ZMQ.Socket socketsyn = context.socket(ZMQ.REP);
socketsyn.bind("tcp://*:5562");
int subscribers =0 ;
while(subscribers< CLIENTS_EXPECTED){
//wait for synchronization request
socketsyn.recv(0);
//send synchronization reply
socketsyn.send("".getBytes(), 0);
subscribers++;
}
byte[] request;
//Wait for next request from client
//We will wait for a 0-terminated string (C string) from the client
request = socket.recv (0);
final String value = new String(request);
Producer producer = new Producer();
producer.requestToRingBuffer(value);
}
-----------------------------------------------------------------------------------------------
The class Producer is storing the request from the Client in a RingBuffer
public class Producer {
private static final int size = 128;
private final ExecutorService executorService;
private final Disruptor<SimpleEvent> disruptor1;
private final RingBuffer<SimpleEvent> ringBuffer1;
Producer(){
executorService = Executors.newCachedThreadPool(); // will use to execute the consumer threads
disruptor1 = new Disruptor<SimpleEvent>(SimpleEvent.FACTORY,size, executorService);
disruptor1.handleEventsWith(new SimpleEventHandler());
disruptor1.start();
ringBuffer1 = disruptor1.getRingBuffer();
}
public void requestToRingBuffer(String request) { //our producer method
final long sequence = ringBuffer1.next(); //claim a slot
SimpleEvent simpleEvent = ringBuffer1.get(sequence);
simpleEvent.time = System.currentTimeMillis();
simpleEvent.text = request;
simpleEvent.level = 0;
ringBuffer1.publish(sequence); //then copy our value into that slot’s entry and finally publish the slot
long cursor = ringBuffer1.getCursor(); //Get the current sequence that is published to the RingBuffer
System.out.println("cursor: " +cursor);
}
public void stop(){ //takes care of waiting until all consumers have processed all available entries
disruptor1.shutdown();
executorService.shutdownNow();
}
}
Then SimpleEventHandler sends the event to another class which only reverse the request from the Client and stores it in another ring buffer and the consumer will read from the second ring buffer
public class Consumer {
String reply;
public void main(String [] args){
}
public void setReply(String reply){
this.reply=reply;
}
}
Until here everything is working!
My problem is how I can send the reply back to the Server? without create a new Server in the Consumer class neither make the Server static,
is there any option with the Disruptor to do this?Thanks
Anna.