Single publisher, multiple consumers push/pull. How to block on publish side

Chris Whiten

Jan 14, 2014, 4:33:01 PM
to netm...@googlegroups.com
I have a single publisher pushing messages and multiple consumers competing for those messages to parallelize some work. I'm running into an issue where at times of high data volume, the publisher is pushing much more quickly than the consumers can process the data, so I'd like to block the send if the number of unclaimed messages is too large (and I do not want those unclaimed messages to be thrown away).  How can this be achieved? I had a look at the high watermark but setting this just seems to dictate how many messages to buffer before throwing away future messages.

Here is the test I was running:

// publisher process
public static void NetMQPublisher()
{
    using (NetMQContext ctx = NetMQContext.Create())
    {
        using (var publisher = ctx.CreatePushSocket())
        {
            publisher.Bind("tcp://127.0.0.1:4557");

            int i = 0;
            while (true)
            {
                // Push an ever-increasing counter as fast as the socket accepts it.
                publisher.Send(i.ToString());
                Console.WriteLine(i.ToString());
                i++;
            }
        }
    }
}

// multiple consumer processes, all running this code
public static void NetMQConsumer()
{
    using (NetMQContext ctx = NetMQContext.Create())
    {
        using (var consumer = ctx.CreatePullSocket())
        {
            consumer.Connect("tcp://127.0.0.1:4557");
            while (true)
            {
                string msg = consumer.ReceiveString();
                Console.WriteLine(msg);
                Thread.Sleep(1000); // simulate slow processing: one message per second
            }
        }
    }
}

Thanks for any guidance on this.  I can't seem to find any NetMQ documentation exploring this topic.

Doron Somech

Jan 15, 2014, 4:41:01 AM
to Chris Whiten, netm...@googlegroups.com
Hi Chris,

First, because NetMQ and ZeroMQ are essentially the same, you can explore the ZeroMQ documentation as well. I also suggest posting the question on the ZeroMQ mailing list (zeromq-dev@lists.zeromq.org).

Regarding your question, there is no out-of-the-box solution for this. Setting the HighWatermark won't help, because push-pull is asynchronous and the pushing side doesn't know that the processing is taking time.

I think there are some patterns in the ZeroMQ guide (http://zguide.zeromq.org), but I don't remember which one addresses this issue.

Bottom line: post the question on the ZeroMQ mailing list. I'm sure they can help you better.

Doron





Chris Whiten

Jan 15, 2014, 2:35:40 PM
to netm...@googlegroups.com, Chris Whiten
Hi Doron,

I posted on the ZeroMQ mailing list (http://permalink.gmane.org/gmane.network.zeromq.devel/22351) and it seems that this push/pull pattern is supposed to block when the pullers are busy, which is what I had originally thought.  

Is there a fundamental misunderstanding here, and the code provided is actually using a different pattern? Or has NetMQ simply missed this part of the ZeroMQ spec?

Doron Somech

Jan 16, 2014, 8:28:30 AM
to Chris Whiten, netm...@googlegroups.com
Hi Chris,

Try setting a low watermark (maybe even 1) on both the push socket and the pull socket (the send watermark and the receive watermark, respectively).

The behavior should be exactly the same as ZeroMQ, so I believe you will get the same behavior whether you use clrzmq with ZeroMQ or NetMQ.
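
For reference, a minimal sketch of the low-watermark suggestion, reusing the socket setup from the test code earlier in the thread. It assumes the watermarks are exposed as `Options.SendHighWatermark` and `Options.ReceiveHighWatermark` (these property names are not quoted anywhere in this thread, so check them against your NetMQ version) and that, as in ZeroMQ, they should be set before `Bind`/`Connect` to apply to that connection. The class and method names are made up for illustration.

```csharp
using System;
using System.Threading;
using NetMQ;

public static class LowWatermarkSketch
{
    // Push side: queue at most one outgoing message per connected peer.
    public static void Publisher()
    {
        using (NetMQContext ctx = NetMQContext.Create())
        using (var publisher = ctx.CreatePushSocket())
        {
            // Assumed property name; set before Bind so it applies to new connections.
            publisher.Options.SendHighWatermark = 1;
            publisher.Bind("tcp://127.0.0.1:4557");

            int i = 0;
            while (true)
            {
                // A plain Send is a blocking call: it waits when nothing more can be queued.
                publisher.Send(i.ToString());
                i++;
            }
        }
    }

    // Pull side: buffer at most one incoming message inside the socket.
    public static void Consumer()
    {
        using (NetMQContext ctx = NetMQContext.Create())
        using (var consumer = ctx.CreatePullSocket())
        {
            // Assumed property name; set before Connect.
            consumer.Options.ReceiveHighWatermark = 1;
            consumer.Connect("tcp://127.0.0.1:4557");

            while (true)
            {
                string msg = consumer.ReceiveString();
                Console.WriteLine(msg);
                Thread.Sleep(1000); // simulate slow processing
            }
        }
    }
}
```

Note that the watermarks only limit the queues inside the sockets. Data already handed to the operating system's TCP buffers is not counted, so the publisher can still run some way ahead of the consumers before a send blocks.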

Chris Whiten

Jan 16, 2014, 8:46:06 AM
to netm...@googlegroups.com, Chris Whiten
Hi Doron,

I believe this is what I've done.  Perhaps my setup is incorrect, could you please have a look?  The test code is here: http://pastebin.com/4aNfPS8h

The behaviour that I'm seeing is that once the consumer connects to the port bound by the publisher, the publisher rushes through hundreds (thousands?) of messages a second, even though the consumer is only accepting one message per second (1 second sleep after each receive).

Thanks,
Chris

Doron Somech

Jan 16, 2014, 8:56:48 AM
to Chris Whiten, netm...@googlegroups.com
The code looks OK. I have two questions:

* Can you check this code with clrzmq and see if the behavior is different?
* Do you lose messages? I believe that once the TCP buffer is full, the pushing will stop.
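
A rough back-of-the-envelope estimate of why filling the TCP buffers can take a while with messages this small. Every number here is an assumption for illustration only: 64 KiB of socket buffer on each side (the real size depends on the OS and its configuration) and about 7 bytes on the wire per message (a 5-digit counter plus roughly 2 bytes of framing). NetMQ's own internal batching is ignored, and `BufferEstimate` is a made-up name.

```csharp
using System;

public static class BufferEstimate
{
    // Number of whole messages that fit in the combined send and receive buffers.
    public static long MessagesInFlight(long sendBufferBytes, long receiveBufferBytes, long bytesPerMessage)
    {
        if (bytesPerMessage <= 0)
            throw new ArgumentOutOfRangeException("bytesPerMessage");

        // Integer division: a partially fitting message does not count.
        return (sendBufferBytes + receiveBufferBytes) / bytesPerMessage;
    }

    public static void Main()
    {
        const long assumedBufferBytes = 64 * 1024; // assumption, OS dependent
        const long assumedMessageBytes = 5 + 2;    // assumption: payload + framing

        Console.WriteLine(MessagesInFlight(assumedBufferBytes, assumedBufferBytes, assumedMessageBytes));
    }
}
```

With these assumed numbers the estimate is 18724 messages, so a consumer that takes one message per second lets the publisher run far ahead before a send finally blocks.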

Chris Whiten

Jan 16, 2014, 12:46:17 PM
to netm...@googlegroups.com, Chris Whiten
You're right. Thanks, Doron. I just wasn't waiting long enough for the TCP buffer to fill up, so I never saw it pause.

There does seem to be an issue with this though.  I am occasionally getting a socket exception from the push socket's .Send() method.

System.Net.Sockets.SocketException (0x80004005): A non-blocking socket operation could not be completed immediately
   at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
   at System.Net.Sockets.Socket.Receive(Byte[] buffer)
   at NetMQ.zmq.Signaler.Recv()
   at NetMQ.zmq.Mailbox.Recv(Int32 timeout)
   at NetMQ.zmq.SocketBase.ProcessCommands(Int32 timeout, Boolean throttle)
   at NetMQ.zmq.SocketBase.Send(Msg msg, SendReceiveOptions flags)
   at NetMQ.zmq.ZMQ.SendMsg(SocketBase s, Msg msg, SendReceiveOptions flags)
   at NetMQ.zmq.ZMQ.Send(SocketBase s, Msg msg, SendReceiveOptions flags)
   at NetMQ.NetMQSocket.Send(Byte[] data, Int32 length, SendReceiveOptions options)
   at NetMQ.NetMQSocket.Send(Byte[] data, Int32 length, Boolean dontWait, Boolean sendMore)
   at NetMQ.OutgoingSocketExtensions.Send(IOutgoingSocket socket, String message, Boolean dontWait, Boolean sendMore)

The error code is 10035.  Is this something you've seen?

Thanks,
Chris

Doron Somech

Jan 16, 2014, 3:15:30 PM
to Chris Whiten, netm...@googlegroups.com
Are you by any chance using the NetMQ socket from two different threads?
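
The question matters because NetMQ sockets, like ZeroMQ sockets, are not safe to share between threads. One way to rule that out is a small guard that records the thread that created it and throws if it is later checked from another thread. This is only a diagnostic sketch: `SingleThreadGuard` is a hypothetical helper, not part of NetMQ.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of NetMQ: remembers the thread that created it
// and throws if Check() is later called from a different thread.
public sealed class SingleThreadGuard
{
    private readonly int ownerThreadId = Thread.CurrentThread.ManagedThreadId;

    public void Check()
    {
        int current = Thread.CurrentThread.ManagedThreadId;
        if (current != ownerThreadId)
        {
            throw new InvalidOperationException(
                "Socket used from thread " + current +
                " but created on thread " + ownerThreadId);
        }
    }
}
```

To use it, create the guard on the same thread and at the same point as the socket, then call `guard.Check()` immediately before each `publisher.Send(...)`. If the guard never throws, cross-thread use of that socket through that code path can be ruled out.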

Chris Whiten

Jan 21, 2014, 3:05:43 PM
to netm...@googlegroups.com, Chris Whiten
Sorry for the delayed response.  No, there is only a single point of entry into a function which is just publishing longs continuously.  This error shows up seemingly randomly.  Sometimes it does not appear for hours, sometimes it appears within a few minutes of application start.  In my currently running example, there are 10 worker processes reading and processing messages from the queue, but still only one publishing messages to it.

Doron Somech

Jan 22, 2014, 5:24:15 AM
to Chris Whiten, netm...@googlegroups.com
Can you run in debug mode and get the line numbers for each of the methods as well?

Thanks