Looking for some tips on using the Queue component

Dick de Reus

Jun 6, 2017, 7:41:15 AM
to netmq-dev
Hi

I'm just getting started with NetMQ and it looks like it is just what I need.

The 'Queue' example on the docs site looks like just what I need: https://netmq.readthedocs.io/en/latest/queue/


using (var queue = new NetMQQueue<ICommand>())
using (var poller = new NetMQPoller { queue })
{
    queue.ReceiveReady += (sender, args) => ProcessCommand(queue.Dequeue());

    poller.RunAsync();

    // Then, from various threads...
    queue.Enqueue(new DoSomethingCommand());
    queue.Enqueue(new DoSomethingElseCommand());
}


One program puts messages in a queue and the other picks them up.

But I can't quite figure out how to start putting the pieces together.

How do you tell the programs the address to send messages to and listen on?

The other patterns have a way to bind or connect:

pubSocket.Bind("tcp://localhost:12345");

I can't find that for the Queue.


Edoardo Ciotti

Jun 7, 2017, 10:55:56 AM
to netmq-dev
Some code snippets to help you figure it out.
It's VB, but I think you can follow it easily.

OBJECT DECLARATIONS
  Public OutgoingBuffer As NetMQQueue(Of RRMessage)
  Private WithEvents DSocket As Sockets.DealerSocket = Nothing
  Private WithEvents poller As NetMQPoller = Nothing

CONNECT SUB:
  '-- Create socket
  DSocket = New Sockets.DealerSocket
  DSocket.Connect("tcp://" & prefs.ServerIP(SID) & ":" & prefs.ServerReqRepPort(SID))

  '-- Create poller
  OutgoingBuffer = New NetMQQueue(Of RRMessage)
  poller = New NetMQPoller
  poller.Add(DSocket)
  poller.Add(OutgoingBuffer)
  poller.RunAsync()

  '-- Hook NetMQ events (raised by the poller) to routines
  AddHandler DSocket.ReceiveReady, AddressOf ReceiveServerReply
  AddHandler OutgoingBuffer.ReceiveReady, AddressOf ConsumeOutgoingBuffer

  ...
End Sub


INCOMING MESSAGES SUB
Private Sub ReceiveServerReply(sender As Object, e As NetMQSocketEventArgs)
  ...
End Sub

OUTGOING MESSAGES SUB
Private Sub ConsumeOutgoingBuffer(sender As Object, e As NetMQQueueEventArgs(Of RRMessage))
  Dim RRM As RRMessage = Nothing
  '-- Drain everything currently queued; this runs on the poller thread
  While e.Queue.TryDequeue(RRM, TimeSpan.FromMilliseconds(20))
    ...
  End While
End Sub

Regards

Edoardo
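
For C# readers, the wiring in the VB snippets above might look roughly like the sketch below. It is not Edoardo's actual code: the endpoint, the class name, and the plain string payload (standing in for RRMessage) are assumptions made for illustration. It also shows why the queue has no Bind or Connect: NetMQQueue is an in-process, thread-safe queue that the poller watches, and the address belongs to the socket that the queue handler writes to.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

class QueueToSocketSketch
{
    static void Main()
    {
        // Assumed endpoint; replace with the real server address.
        const string endpoint = "tcp://127.0.0.1:5556";

        using (var dealer = new DealerSocket())
        using (var outgoing = new NetMQQueue<string>())
        using (var poller = new NetMQPoller { dealer, outgoing })
        {
            dealer.Connect(endpoint);

            // Replies from the server arrive on the socket.
            dealer.ReceiveReady += (sender, e) =>
            {
                NetMQMessage reply = e.Socket.ReceiveMultipartMessage();
                Console.WriteLine("Reply with {0} frame(s)", reply.FrameCount);
            };

            // Raised on the poller thread, so the socket is only used from that thread.
            outgoing.ReceiveReady += (sender, e) =>
            {
                string text;
                while (e.Queue.TryDequeue(out text, TimeSpan.Zero))
                {
                    dealer.SendFrame(text);
                }
            };

            poller.RunAsync();

            // Any thread may enqueue; the queue itself has no network address.
            outgoing.Enqueue("hello");

            Console.ReadLine();
            poller.Stop();
        }
    }
}
```

The program on the other end would bind a socket (for example a RouterSocket) on the same endpoint and receive the frames there; it does not need a NetMQQueue at all unless it also wants to hand work between its own threads.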

jabak

Aug 14, 2017, 4:11:53 PM
to netmq-dev
The following example is a bit bloated, but it will give you an idea. It uses the Hyperion serializer to serialize a MyMessage object into a MemoryStream, whose contents are fed to the outgoing queue as a byte array. I also use FluentScheduler to send messages asynchronously every 5 seconds.

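using System;
using System.IO;
using System.Text;
using FluentScheduler;
using NetMQ;
using NetMQ.Sockets;

namespace NetMQClient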
{
    class MyMessage
    {
        public string PropertyName { get; set; }
    }
    class Program
    {

        static NetMQQueue<byte[]> OutgoingQueue = new NetMQQueue<byte[]>();
        volatile static int failed = 0;
        static void Main(string[] args)
        {
            var SocketDemo = new SocketDemo();
        }

        public class SocketDemo
        {
            
            DealerSocket client = null;

            public SocketDemo()
            {
                client = new DealerSocket();
                client.Options.Identity = Encoding.Unicode.GetBytes(Guid.NewGuid().ToString());
                client.Options.SendHighWatermark = 1;
                client.Options.Linger = new TimeSpan(0, 0, 10);
                client.Connect("tcp://127.0.0.1:5556");

                var registry = new Registry();
                registry.Schedule(() =>
                {
                    var serializer = new Hyperion.Serializer(new Hyperion.SerializerOptions(versionTolerance: false, preserveObjectReferences: true));
                    var valserializer = serializer.GetSerializerByType(typeof(MyMessage));
                    for (int i = 0; i < 100; i++)
                    {
                        var ms = new MemoryStream();
                        valserializer.WriteValue(ms,new MyMessage { PropertyName = "abc"}, new Hyperion.SerializerSession(serializer));
                        ms.Position = 0;
                        OutgoingQueue.Enqueue(ms.ToArray()); // ToArray copies only the bytes written; GetBuffer returns the whole internal buffer
                        ms.Close();

                        //Thread.Sleep(delay);
                    }
                }).ToRunEvery(5).Seconds();
                JobManager.Initialize(registry);

                var poller = new NetMQPoller { OutgoingQueue };
                
                client.ReceiveReady += Client_ReceiveReady;
                OutgoingQueue.ReceiveReady += (sender, args) => OutgoingData(OutgoingQueue.Dequeue());

                poller.Add(client);
                poller.RunAsync();
                Console.ReadLine();
                poller.StopAsync();
                poller.Dispose();
                client.Dispose();
            }

            private void OutgoingData(byte[] BytesToSendOut)
            {
                var messageToServer = new NetMQMessage();
                messageToServer.AppendEmptyFrame();
                messageToServer.Append(BytesToSendOut);
                var res = client.TrySendMultipartMessage(new TimeSpan(0,0,1), messageToServer);
                if (res == false)
                {
                    failed++;
                    Console.WriteLine(failed);
                }
                messageToServer.Clear();
            }

            private void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
            {
                var ms = e.Socket.ReceiveMultipartMessage();
                Console.WriteLine("REPLY {0}", ms[1].ConvertToString());
            }
        }
    }
}
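
One detail worth checking when a MemoryStream's contents are handed to the queue as a byte array: GetBuffer returns the stream's whole internal buffer, which is usually larger than what was written, while ToArray returns a copy of only the written bytes. A small standalone sketch (the class name is made up for illustration, and it does not depend on NetMQ or Hyperion):

```csharp
using System;
using System.IO;

class BufferLengthSketch
{
    static void Main()
    {
        using (var ms = new MemoryStream())
        {
            ms.Write(new byte[] { 1, 2, 3 }, 0, 3);

            byte[] exact = ms.ToArray();   // copy of the 3 bytes written
            byte[] raw = ms.GetBuffer();   // internal buffer, at least 3 bytes long

            Console.WriteLine("ToArray: {0} bytes, GetBuffer: {1} bytes", exact.Length, raw.Length);
        }
    }
}
```

If the receiving side expects exactly the serialized bytes, ToArray is the safer choice; with GetBuffer, the length would have to be sent or tracked separately.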