hoe to get data back from the consumer

106 views
Skip to first unread message

Anna

unread,
Oct 11, 2012, 4:43:12 AM10/11/12
to lmax-di...@googlegroups.com
Hi! I'm working with Disruptor and ZMQ Server-Client, my classes are in the same package:

App class it has two threads one for the ZMQ Server and the other for the ZMQ Client.

public class Client extends Thread {

@Override

public void run() {

// Prepare our context and socket

        ZMQ.Context context = ZMQ.context(1);

        ZMQ.Socket socket = context.socket(ZMQ.REQ);

        socket.connect ("tcp://localhost:5555");

        ZMQ.Socket synclient = context.socket(ZMQ.REQ);

synclient.connect("tcp://localhost:5562");

//send a synchronization request

synclient.send("".getBytes(),0);

//wait for synchronization reply

synclient.recv(0);


        // Do 10 requests, waiting each time for a response

        for(int request_nbr = 0; request_nbr < 100; request_nbr++) {

            

            String requestString = " Hello World ";

            byte[] request = requestString.getBytes();

            request[request.length-1]=0; //Sets the last byte to 0

  

            socket.send(request, 0);

            System.out.println("prova 1");

            

            byte[] reply = socket.recv(0);

            final String value = new String(reply);

            System.out.println("resposta " +value);

           

        }

        socket.close();

}


}

---------------------------------------------------------------------

public class Server extends Thread {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket socket = context.socket(ZMQ.REP);

protected int CLIENTS_EXPECTED = 1;

private Consumer consumer;

@Override

public void run() {

     socket.bind("tcp://*:5555");

     ZMQ.Socket socketsyn = context.socket(ZMQ.REP);

     socketsyn.bind("tcp://*:5562");

    

     int subscribers =0 ;

while(subscribers< CLIENTS_EXPECTED){

   //wait for synchronization request

   socketsyn.recv(0);

//send synchronization reply

socketsyn.send("".getBytes(), 0);

subscribers++;

}

    

     byte[] request;

     //Wait for next request from client

        //We will wait for a 0-terminated string (C string) from the client

     request = socket.recv (0);

     final String value = new String(request); 

                  

     Producer producer = new Producer();

producer.requestToRingBuffer(value);

    


}

-----------------------------------------------------------------------------------------------

The class Producer is storing the request from the Client in a RingBuffer


public class Producer {

private static final int size = 128;

private final ExecutorService executorService;

private final Disruptor<SimpleEvent> disruptor1;

private final RingBuffer<SimpleEvent> ringBuffer1;

Producer(){

        executorService = Executors.newCachedThreadPool(); // will use to execute the consumer threads

        disruptor1 = new Disruptor<SimpleEvent>(SimpleEvent.FACTORY,sizeexecutorService);

        disruptor1.handleEventsWith(new SimpleEventHandler());

        disruptor1.start();

        ringBuffer1 = disruptor1.getRingBuffer();

    }

 public void requestToRingBuffer(String request) { //our producer method


      final long sequence = ringBuffer1.next(); //claim a slot

      SimpleEvent simpleEvent = ringBuffer1.get(sequence);   

     

      simpleEvent.time = System.currentTimeMillis();

        simpleEvent.text = request;

        simpleEvent.level = 0;

         

      ringBuffer1.publish(sequence); //then copy our value into that slot’s entry and finally publish the slot

      long cursor = ringBuffer1.getCursor(); //Get the current sequence that is published to the RingBuffer

      System.out.println("cursor: " +cursor);

     

    }

     public void stop(){ //takes care of waiting until all consumers have processed all available entries

     

      disruptor1.shutdown();

      executorService.shutdownNow();

     }


}

Then SimpleEventHandler sends the event to another class which only reverse the request from the Client and stores it in another ring buffer and the consumer will read from the second ring buffer

public class Consumer {

String reply;

public void main(String [] args){

}

public void setReply(String reply){

this.reply=reply;

}

}


Until here everything is working! 

My problem is how I can send the reply back to the Server? without create a new Server in the Consumer class neither make the Server static,

is there any option with the Disruptor to do this?Thanks


Anna.


Michael Barker

unread,
Oct 17, 2012, 1:42:04 AM10/17/12
to lmax-di...@googlegroups.com
There is nothing specific in the Disruptor that will solve your design
problem for you. Remember the Disruptor is just a library to pass
messages between threads. Typically in we have 2 ring buffers in each
service, one that the business logic writes to to send messages out on
the network and a second that a receiver writes to when it receives
messages from the network. We would have business logic thread(s)
that read from the receiver's ring buffer and write to the senders
one.

Mike.
> --
>
>
Reply all
Reply to author
Forward
0 new messages