XPublisherSocket : A non-blocking socket operation could not be completed immediately


Johan Lindberg

Nov 30, 2015, 8:45:45 AM
to netmq-dev
I'm still having trouble with my server.
I get loads of the following error:

A non-blocking socket operation could not be completed immediately

NetMQ.Core.Mailbox.TryRecv : Line = 0 Column 0 : offset 339 
NetMQ.Core.SocketBase.ProcessCommands : Line = 0 Column 0 : offset 147 
NetMQ.Core.SocketBase.GetSocketOption : Line = 0 Column 0 : offset 81 
NetMQ.Core.Utils.Selector.Select : Line = 0 Column 0 : offset 557 
NetMQ.Poller.PollWhile : Line = 0 Column 0 : offset 555

On some occasions this error gets logged in the event log and the service crashes:

Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.Net.Sockets.SocketException
Stack:
   at NetMQ.Core.Mailbox.TryRecv(Int32, NetMQ.Core.Command ByRef)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.Core.Utils.Selector.Select(NetMQ.Core.Utils.SelectItem[], Int32, Int64)
   at NetMQ.Poller.PollWhile(System.Func`1<Boolean>)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()

I'm using NetMQ 3.3.2.1.
Running on Windows Server 2008 R2.
The service is built for x64.

I have two sockets on the server side, one pub/sub and one request/response, implemented as below.
Pub/Sub

    Task.Factory.StartNew(() =>
    {
        Thread.CurrentThread.Name = "NetMQ - Publish/Subscribe";

        using (var server = _Context.CreateXPublisherSocket())
        using (var poller = new Poller())
        {
            server.Options.SendBuffer = ZMQConstants.MegaByte * 100; // 100 megabyte
            server.Options.SendHighWatermark = 10000;
            server.Options.ReceiveHighWatermark = 10000;
            server.Options.Linger = TimeSpan.FromSeconds(1);
            server.Bind(Address);

            server.ReceiveReady += server_ReceiveReady;
            poller.AddSocket(server);
            poller.PollTillCancelledNonBlocking();

            while (true)
            {
                try
                {
                    var m = MessageQueue.Dequeue(_Cancel.Token);
                    if (m == null)
                    {
                        poller.CancelAndJoin();
                        poller.Dispose();
                        server.Close();
                        break;
                    }
                    server.SendMoreFrame(m.Key);
                    server.SendFrame(m.AsBytes);
                }
                catch (OperationCanceledException)
                {
                    poller.CancelAndJoin();
                    poller.Dispose();
                    server.Close();
                    break;
                }
                catch (Exception e)
                {
                    ConnectionManager.Log(e);
                }
            }
        }
    }, TaskCreationOptions.LongRunning);

    void server_ReceiveReady(object sender, NetMQSocketEventArgs e)
    {
        try
        {
            var message = e.Socket.ReceiveFrameString();
            //Messagelogic
        }
        catch (Exception ex)
        {
            ConnectionManager.Log(ex);
        }
    }

And here's the request/response socket:

    Task.Factory.StartNew(() =>
    {
        using (var server = _Context.CreateResponseSocket())
        {
            Thread.CurrentThread.Name = "NetMQ - Request/Response";
            server.Bind(Address);
            while (true)
            {
                try
                {
                    bool more;
                    var msg = server.ReceiveFrameBytes(out more);
                    if (more)
                        throw new NotImplementedException();
                    //msglogic
                    server.SendFrame(handleMessageFunc(msg).AsBytes);
                }
                catch (Exception e)
                {
                    ConnectionManager.Log(LogType.Error, e.Message, "Response", e.StackTrace);
                }
            }
        }
    }, TaskCreationOptions.LongRunning).ContinueWith(v => { ConnectionManager.Log(v.Exception); }, TaskContinuationOptions.OnlyOnFaulted);

Are there any mistakes in the implementation that could cause this error?

Johan Lindberg

Dec 1, 2015, 2:54:53 AM
to netmq-dev
I can reproduce the error with one client by doing the following.

The client subscribes to all the messages in the queue.

Loop
--------------------------------------------
Block the transmit thread while filling the message queue with 10000 messages.
Stop filling the queue, unblock the transmit thread, and let it send as fast as it can until the queue is empty.
--------------------------------------------

I get the following errors:

1 A non-blocking socket operation could not be completed immediately
> System.dll!System.Net.Sockets.Socket.Receive(byte[] buffer, int offset, int size, System.Net.Sockets.SocketFlags socketFlags) Unknown
  NetMQ.dll!NetMQ.Core.Mailbox.TryRecv(int timeout, out NetMQ.Core.Command command) Unknown
  NetMQ.dll!NetMQ.Core.SocketBase.ProcessCommands(int timeout, bool throttle) Unknown
  NetMQ.dll!NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption option) Unknown
  NetMQ.dll!NetMQ.Core.Utils.Selector.Select(NetMQ.Core.Utils.SelectItem[] items, int itemsCount, long timeout) Unknown
  NetMQ.dll!NetMQ.Poller.PollWhile(System.Func<bool> condition) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) Unknown
  mscorlib.dll!System.Threading.ThreadHelper.ThreadStart() Unknown
2 Null Exception
> NetMQ.dll!NetMQ.Core.SocketBase.ProcessCommands(int timeout, bool throttle) Unknown
  NetMQ.dll!NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption option) Unknown
  NetMQ.dll!NetMQ.Core.Utils.Selector.Select(NetMQ.Core.Utils.SelectItem[] items, int itemsCount, long timeout) Unknown
  NetMQ.dll!NetMQ.Poller.PollWhile(System.Func<bool> condition) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) Unknown
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) Unknown
  mscorlib.dll!System.Threading.ThreadHelper.ThreadStart() Unknown

Doron Somech

Dec 1, 2015, 2:59:02 AM
to Johan Lindberg, netmq-dev
You are using the server socket from multiple threads, which is not supported.

In your first code example you add the server to the poller, run the poller on a new thread, and then send on the server socket from a different thread.

Try NetMQQueue, which lets you use a queue with the poller. Also take a look at NetMQActor, which helps with starting a thread and disposing of it.
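
A rough sketch of the queue-with-poller idea, for illustration only. It is written against the NetMQ 4 style API (XPublisherSocket, NetMQQueue<T>, NetMQPoller); the equivalents in 3.3.2.1 may be named or constructed differently. The Outgoing type, the bind address and the sample message are made-up placeholders, not code from this thread. The point is that only the poller thread ever touches the socket, while other threads just enqueue.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

static class PublisherSketch
{
    // Hypothetical message type standing in for the items in MessageQueue.
    sealed class Outgoing
    {
        public string Key;
        public byte[] Body;
    }

    static void Main()
    {
        using (var server = new XPublisherSocket())
        using (var outbox = new NetMQQueue<Outgoing>())
        using (var poller = new NetMQPoller { server, outbox })
        {
            server.Bind("tcp://127.0.0.1:5556"); // placeholder address

            // Runs on the poller thread: subscription frames sent by clients.
            server.ReceiveReady += (s, e) =>
            {
                byte[] subscription = e.Socket.ReceiveFrameBytes();
                // First byte is 1 for subscribe, 0 for unsubscribe.
            };

            // Also runs on the poller thread, so sending here is safe.
            outbox.ReceiveReady += (s, e) =>
            {
                Outgoing m;
                while (e.Queue.TryDequeue(out m, TimeSpan.Zero))
                {
                    server.SendMoreFrame(m.Key).SendFrame(m.Body);
                }
            };

            poller.RunAsync(); // starts the poller on its own thread

            // Any other thread may enqueue; it never uses the socket directly.
            outbox.Enqueue(new Outgoing { Key = "topic", Body = new byte[] { 1, 2, 3 } });

            Console.ReadLine(); // keep the sketch alive until Enter is pressed
            poller.Stop();
        }
    }
}
```

With this shape the separate while (true) send loop goes away: producers call Enqueue, and the poller thread does both the receiving and the sending.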


Johan Lindberg

Dec 1, 2015, 3:09:49 AM
to netmq-dev, badge...@gmail.com
Ah, at last I believe I grasp it: the sending should also be done through the poller.
I will look into NetMQQueue.

Thanks. 