ZeroMQ simple echo server


Svetlozar Argirov

Sep 26, 2014, 6:06:00 AM
to reactor-...@googlegroups.com
Hi,

I am trying the new ZeroMQ integration with a simple echo server like this (using Spring Boot to fire it up):

    @Bean
    public Pipe[] zmqStart(Environment env, CountDownLatch latch) {
        ZeroMQ<Buffer> zmq = new ZeroMQ<>(env);

        Pipe taskQ = new Pipe();
        zmq.reply("tcp://*:5555")
                .onSuccess(ch -> {
                    //ch.when(Exception.class, err ->{
                    //    logger.severe(err.toString());
                    //});
                    ch.on().close(new Runnable() {
                        @Override
                        public void run() {
                            logger.warning("Channel closed");
                        }
                    });
                    //taskQ.in = ch;
                    //logger.info("tasQ.in = " + ch);
                })
                .consume(ch -> {
                    Stream<Buffer> in = ch.in();
                    in.consume(req -> {
                        logger.warning("Consumed " + new String(req.asBytes()));
                        ch.send(req)
                                .onComplete(res ->{
//                                    logger.warning(res.toString());
                                });
                    });
                });
        return new Pipe[]{taskQ};
    }


And a simple Python client to send messages:

#!/usr/bin/env python3
import zmq

context = zmq.Context()

# Socket to talk to server
print("Connecting to hello world server")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

# Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s " % request)
    socket.send(b"Hello")

    # Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))


What happens is that I get only two messages in the server:

2014-09-26 13:03:17.256  INFO 22690 --- [  zmq-server-16] reactor.net.zmq.tcp.ZeroMQTcpServer      : BIND: starting ZeroMQ REP socket on tcp://*:5555
2014-09-26 13:03:17.485  INFO 22690 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2014-09-26 13:03:17.549  INFO 22690 --- [           main] z.c.c.xmlrpc.server.SimpleBroker         : Started SimpleBroker in 4.654 seconds (JVM running for 5.152)
2014-09-26 13:03:29.730  WARN 22690 --- [   ringBuffer-9] ZMQ Echo                                 : Consumed Hello
2014-09-26 13:03:29.736  WARN 22690 --- [   ringBuffer-9] ZMQ Echo                                 : Consumed Hello


and only two replies in the client:

Connecting to hello world server
Sending request 0
Received reply 0 [ b'Hello' ]
Sending request 1
Received reply 1 [ b'Hello' ]
Sending request 2

At that point the client is just waiting for the 3rd reply, but no further message is received by the server. Also, if I interrupt the client and run it again without restarting the server, the same thing happens: I get only 2 messages.
Does anybody have a clue why I get only 2 messages, and where the 3rd goes? Is there any way to catch errors that Reactor has probably just ignored?

Thanks,
Zaro

Jon Brisbin

Sep 26, 2014, 11:08:31 AM
to reactor-...@googlegroups.com, Svetlozar Argirov, smal...@pivotal.io
Looks like we might have a bug. If you pass SynchronousDispatcher.INSTANCE as the second argument to the ZeroMQ constructor, things seem to flow fine. This works for me:

@Test
public void pythonClientTest() throws InterruptedException {
    ZeroMQ<Buffer> zmq = new ZeroMQ<>(getServerEnvironment(), SynchronousDispatcher.INSTANCE);

    zmq.reply("tcp://*:5555")
       .onSuccess(ch -> {
           ch.on().close(() -> log.info("channel closed: {}", ch));

           BatchConsumer<Buffer> out = ch.out();
           ch.in()
             .map(Buffer::asString)
             .consume(s -> {
                 log.info("recv'd msg: {}", s);
                 out.accept(Buffer.wrap("Hello World!"));
             });
       });

    while (true) {
        Thread.sleep(5000);
    }
}


Thanks!

Jon Brisbin
Reactor Project Lead
@j_brisbin | @ProjectReactor

Jon Brisbin

Sep 26, 2014, 11:13:56 AM
to reactor-...@googlegroups.com, Svetlozar Argirov, smal...@pivotal.io
FWIW it appears to be because using the RingBufferDispatcher means you’re trying to write to the socket from a non-ZMQ thread. We need to make sure that any writes that happen get scheduled in the right thread. That’s why sync works fine.
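
The general idea of scheduling every write on the thread that owns the socket can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not Reactor's or ZeroMQ's actual code: ConfinedSocket, ConfinedSocketDemo and the thread name "socket-owner" are hypothetical, and the "socket" is just an in-memory list standing in for a resource that is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a socket that only its owner thread may touch. */
final class ConfinedSocket {
    // Single thread that "owns" the socket, analogous to a ZMQ worker thread.
    private final ExecutorService owner =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "socket-owner"));
    // Only ever accessed on the owner thread, so no locking is needed.
    private final List<String> sent = new ArrayList<>();

    /** Safe to call from any thread: the write is queued for the owner thread. */
    CompletableFuture<String> send(String msg) {
        return CompletableFuture.supplyAsync(() -> {
            sent.add(msg);
            return Thread.currentThread().getName(); // thread that did the write
        }, owner);
    }

    void close() {
        owner.shutdown();
    }
}

final class ConfinedSocketDemo {
    /** Calls send() from several dispatcher threads; returns the writing thread's name per message. */
    static List<String> writerThreads(int senders) {
        ConfinedSocket socket = new ConfinedSocket();
        ExecutorService dispatcher = Executors.newFixedThreadPool(senders);
        try {
            List<CompletableFuture<String>> results = new ArrayList<>();
            for (int i = 0; i < senders; i++) {
                final String msg = "Hello " + i;
                // The lambda runs on a dispatcher thread, but the write itself
                // is executed on the socket's owner thread.
                results.add(CompletableFuture.supplyAsync(
                        () -> socket.send(msg).join(), dispatcher));
            }
            List<String> names = new ArrayList<>();
            for (CompletableFuture<String> r : results) {
                names.add(r.join());
            }
            return names;
        } finally {
            dispatcher.shutdown();
            socket.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(writerThreads(4));
    }
}
```

Every entry in the returned list is "socket-owner", regardless of which dispatcher thread called send(). A synchronous dispatcher sidesteps the hand-off entirely because the consumer already runs on the thread that received the message.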

Thanks!


Svetlozar Argirov

Sep 26, 2014, 12:49:25 PM
to reactor-...@googlegroups.com
Thanks for the fast response, it solved the issue :)

I have one more question, though. For my real app I want to use a ROUTER socket, but the API exposes only a server-side ROUTER socket. For server sockets I can create any type with createServer, but createClient is private, which makes it impossible to create a ROUTER client socket, even though according to the ZMQ docs this is perfectly valid.

Is it intentional to forbid using createClient directly?

Cheers,
Zaro

Jon Brisbin

Sep 26, 2014, 1:11:50 PM
to reactor-...@googlegroups.com, Svetlozar Argirov
Unfortunately I didn’t have time to fully flesh out the client part of ROUTER/DEALER like I really wanted to. We can make changes and additions in this area and I welcome any feedback in that regard.

It might be as simple for the time being as exposing that method. We just need to create some test cases for that.


Thanks!


Svetlozar Argirov

Sep 29, 2014, 8:27:32 AM
to reactor-...@googlegroups.com, zar...@gmail.com
I was playing a bit more with the PUSH/PULL sockets, just trying to get something like a pipe running: reading from a PULL socket and writing the message to a PUSH socket. The thing is, these are both server sockets, so it roughly looks like this:

        zmq.pull("tcp://*:5560")
                .onSuccess(ch -> {
                    respQ.in = ch;
                    logger.info("respQ.in = " + ch);

                })
                .consume(ch -> {
                    Stream<Buffer> in = ch.in();
                    //respQ.out.send(in);
                    in.consume(req ->{
                        logger.info("respQ.in < "+req.asString());
                        if (respQ.out != null)
                            respQ.out.send(req);
                        else
                            logger.warning("No respQ.out, ignoring message:"+ req.asString());
                    });
                });

        zmq.createServer("tcp://*:5561", ZMQ.PUSH)
                .onSuccess(ch -> {
                    respQ.out = ch;
                    logger.info("respQ.out = " + ch);

                }).onComplete(r -> logger.warning("** " + r)).consume(r -> logger.warning("*** " + r));


The onSuccess/onComplete callbacks on zmq.createServer("tcp://*:5561", ZMQ.PUSH) never fire. I couldn't really understand why; a PUSH server socket is perfectly valid, and I am using one in C++ code.
So I dug a bit into the Reactor ZMQ code.
I couldn't find the reason for this, as I don't understand everything in Reactor, but I found something about the design that is very strange. It seems that for every ZMQ socket, server or client, a worker thread is started, and each worker thread has its own ZLoop that polls for events on that socket. Doesn't this defeat the purpose of the event loop, given that ZMQ has a configurable number of I/O threads? Maybe it is required because of how Reactor works, I have no idea, but I would rather have only one ZLoop that polls all ZMQ sockets, as it would be more efficient.
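
To make the "one loop polling all sockets" idea concrete, here is a rough analogy using only JDK NIO rather than ZMQ. It is a sketch under stated assumptions: the in-process java.nio.channels.Pipe objects stand in for sockets, SingleLoopDemo and pollAll are hypothetical names, and a real ZMQ version would use a ZMQ poller instead of a Selector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SingleLoopDemo {
    /** Polls `count` pipes with ONE selector on the calling thread; returns what was read. */
    static List<String> pollAll(int count) {
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < count; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                // Register every readable end with the same selector.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ, "pipe-" + i);
                // Queue one small message per pipe so there is something to poll.
                pipe.sink().write(ByteBuffer.wrap(("msg-" + i).getBytes(StandardCharsets.UTF_8)));
            }
            List<String> received = new ArrayList<>();
            ByteBuffer buf = ByteBuffer.allocate(64);
            long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
            while (received.size() < count && System.nanoTime() < deadline) {
                selector.select(500); // one blocking poll covers all registered channels
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    buf.clear();
                    int n = ((ReadableByteChannel) key.channel()).read(buf);
                    if (n > 0) {
                        String body = new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                        received.add(key.attachment() + ":" + body);
                    }
                }
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe p : pipes) {
                closeQuietly(p.sink());
                closeQuietly(p.source());
            }
        }
    }

    private static void closeQuietly(Channel c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // best-effort cleanup in a demo
        }
    }

    public static void main(String[] args) {
        System.out.println(pollAll(3));
    }
}
```

The order in which the pipes are reported is not guaranteed. The point is only that a single thread and a single poll call service all channels, instead of one thread and one loop per channel.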

Cheers,
Zaro
