
Asynchronous reception of data hangs after MSMQ service restart


Morten Herman Langkjaer

Mar 3, 2008, 4:32:03 AM
When using asynchronous reception of data from a message queue, restarting
the MSMQ service causes the application to hang in the EndReceive call. This
does not occur with synchronous reception.

I have created two test programs to demonstrate the error.

To reproduce:
1: Start the MessageQueueingTestReceiver application.
2: Start the MessageQueueingTestSender application to verify that the
receiver works.
3: Restart the Message Queuing service from the service manager.
4: Start MessageQueueingTestSender again to see that the receiver has stopped
working.


/*********** MessageQueueingTestReceiver ***********/
using System;
using System.IO;
using System.Messaging;
using System.Threading;

namespace MessageQueueingTestReceiver
{
    class Program
    {
        static void Main(string[] args)
        {
            MessageQueue queue;
            if( MessageQueue.Exists(".\\private$\\testErroneousMessageQueueing") )
            {
                queue = new MessageQueue(".\\private$\\testErroneousMessageQueueing");
            }
            else
            {
                queue = MessageQueue.Create(".\\private$\\testErroneousMessageQueueing");
            }

            while( true )
            {
                Message m = null;
                try
                {
                    IAsyncResult res = queue.BeginReceive();
                    m = queue.EndReceive(res);
                }
                catch( Exception ){}

                if( m != null )
                {
                    using( StreamReader r = new StreamReader(m.BodyStream) )
                    {
                        Console.Out.WriteLine(r.ReadToEnd());
                    }
                }
                Thread.Sleep(TimeSpan.FromSeconds(5));
            }
        }
    }
}

/*********** MessageQueueingTestReceiver ***********/

/*********** MessageQueueingTestSender ***********/
using System.Messaging;

namespace MessageQueueingTestSender
{
    class Program
    {
        static void Main(string[] args)
        {
            MessageQueue queue;
            if( MessageQueue.Exists(".\\private$\\testErroneousMessageQueueing") )
            {
                queue = new MessageQueue(".\\private$\\testErroneousMessageQueueing");
            }
            else
            {
                queue = MessageQueue.Create(".\\private$\\testErroneousMessageQueueing");
            }

            queue.Send("test");
            queue.Dispose();
        }
    }
}
/*********** MessageQueueingTestSender ***********/

Yoel Arnon

Mar 4, 2008, 11:21:12 AM
Hello Morten,
Actually your code looks a little weird - like an attempt to implement
synchronous receive using asynchronous calls... Normally if you call
BeginReceive you either define a receive function and set ReceiveCompleted
event handler, or wait on IAsyncResult.AsyncWaitHandle - which allows you to
wait on multiple queues, or on message received alongside other events.

Anyway, the problem you encountered is a real one - I can even say that it
looks like a bug in MSMQ. Here is what happened:

Under the hood, System.Messaging has to open a queue for send or receive
before accessing it. However, this operation is hidden from the programmer
and happens automatically in the first send or receive (actually if you send
to a queue after you received from it or vice versa, System.Messaging will
first close the queue and then re-open it in the new mode).

You can follow these actions by looking at the queue's ReadHandle or
WriteHandle.

When MSMQ service re-starts, all the handles become invalid and thus all the
queues have to be re-opened.

Here is the bug that you seem to have found:

When you call MessageQueue.Receive using an invalid handle (like a handle
from an earlier run of MSMQ service), System.Messaging will automatically
re-open the queue. You can see that by comparing queue.ReadHandle before and
after the call.

However, when you call MessageQueue.BeginReceive using an invalid handle, it
does not realize that the handle is invalid and the ReadHandle stays the
same.

What can be done to work around the problem?

1. Handle exceptions and call queue.Close() inside the exception handler.
This would help if MSMQ stops while you are in EndReceive - because in that
case you will get an exception, and if the queue is closed, the subsequent
BeginReceive will attempt to re-open it.

2. Wait on the AsyncWaitHandle before calling EndReceive, and use a timeout
(use WaitHandle.WaitAny, for example, with a timeout). If the timeout
expires, call queue.Close().

3. Always call queue.Close() before calling BeginReceive. This is not
recommended since it may cause performance problems, especially if you are
using public queues.

4. Check validity of ReadHandle before calling BeginReceive. This is a
little tricky because you will probably have to use C++ code... I will try
to work on this and re-post.
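
Workarounds 1 and 2 can be combined in a single receive loop. What follows is only a sketch and has not been tested against a restarting MSMQ service. The 30-second timeout and the helper name SignaledWithin are assumptions made for illustration; the queue path is the one from the test programs earlier in the thread. It relies on the behaviour described above, namely that a BeginReceive issued after queue.Close() re-opens the queue.

```csharp
using System;
using System.IO;
using System.Messaging;
using System.Threading;

public static class ReceiverSketch
{
    // Hypothetical helper: true if the handle was signaled before the timeout.
    public static bool SignaledWithin(WaitHandle handle, TimeSpan timeout)
    {
        int index = WaitHandle.WaitAny(new WaitHandle[] { handle }, timeout, false);
        return index != WaitHandle.WaitTimeout;
    }

    public static void Main()
    {
        MessageQueue queue =
            new MessageQueue(".\\private$\\testErroneousMessageQueueing");
        TimeSpan timeout = TimeSpan.FromSeconds(30); // assumed value

        while (true)
        {
            try
            {
                IAsyncResult res = queue.BeginReceive();

                // Workaround 2: bounded wait instead of blocking in EndReceive.
                if (!SignaledWithin(res.AsyncWaitHandle, timeout))
                {
                    // No completion in time: the read handle may be stale.
                    // Close so that the next BeginReceive re-opens the queue.
                    queue.Close();
                    continue;
                }

                using (Message m = queue.EndReceive(res))
                using (StreamReader r = new StreamReader(m.BodyStream))
                {
                    Console.Out.WriteLine(r.ReadToEnd());
                }
            }
            catch (MessageQueueException)
            {
                // Workaround 1: close on failure so the queue is re-opened.
                queue.Close();
            }
        }
    }
}
```

On an idle queue the timeout also expires, so this loop closes and re-opens the queue every 30 seconds even when MSMQ is healthy. Given the performance concern raised in workaround 3, the timeout probably should not be made very short.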

Hope that helps,
Yoel
www.msmq.biz

Morten Herman Langkjaer

Mar 5, 2008, 3:05:06 AM
Hi Yoel, thanks for the quick reply.

The code I wrote is purely sample code, for explanatory purposes only.

Due to requirements on modifiability, we have wrapped System.Messaging in
our own component, which enables us to use multiple communication
technologies through the same interface. Due to other requirements, e.g.
Win2000 support, we cannot just switch to WCF. Given this prerequisite, we
are in a situation where we need to be able to do synchronous reception of
data on both transactional and non-transactional queues.

When running against non-transactional queues everything works fine, since
we use the synchronous Receive method for data reception, and the handle is
correctly recreated as you point out.

We also have another requirement that says we have to be able to abort
reception of data. When using synchronous reception on non-transactional
queues, we use the Receive method, and disposing the MessageQueue stops the
blocking call. However, when using the synchronous Receive method with a
transaction, we are not able to abort the call by disposing the
MessageQueue, so queue.Receive(transaction) hangs until a message is sent to
the transactional queue. Therefore we use BeginPeek/EndPeek, in the way I
showed in the sample code, to detect messages while still being able to
abort, since the EndPeek method can be aborted by disposing the
MessageQueue. With this approach, our application hangs in the EndPeek
method when the Message Queuing service is restarted. I simply tried to
reproduce the error using Begin/EndReceive, for explanatory reasons only.

If you can solve my problem of aborting synchronous transactional
reception, I could just use that approach instead, and then have the read
handle recreated if it has been invalidated.

Sincerely, Morten.

Yoel Arnon

Mar 6, 2008, 7:20:26 AM
Hello Morten,
Yes, it is true that MSMQ does not support asynchronous receive for
transactional messages, and using asynchronous peek seems to be a good way
to work around this problem. However, I would like to suggest an alternative
approach:

- Define a global ManualResetEvent (let's call it StopReceiving) that you
  can set from another thread whenever you want to stop the MSMQ receive
  (instead of disposing the queue).

- In your main loop:
  - Call BeginPeek and obtain an IAsyncResult.
  - Wait on both StopReceiving and the async result's wait handle.
  - If StopReceiving was triggered, quit.
  - Otherwise, do a transactional receive with zero timeout. Note that
    you can still get a timeout exception in case someone grabbed the
    message after the wait handle was triggered.

The code should be something like:
On the Program class level, define:
static ManualResetEvent StopReceiving = new ManualResetEvent(false);

This event can be set outside the receiving thread to stop the receive.

The main loop will be something like:

while (true)
{
    Message m = null;
    try
    {
        IAsyncResult res = queue.BeginPeek();
        int waitRes = WaitHandle.WaitAny(new WaitHandle[] {StopReceiving,
            res.AsyncWaitHandle });
        if (waitRes == 0) // StopReceiving was signaled
        {
            break;
        }

        using (MessageQueueTransaction trans = new
            MessageQueueTransaction())
        {
            trans.Begin();
            m = queue.Receive(new TimeSpan(0), trans);
            //
            // Do something
            //
            trans.Commit();
        }
    }
    catch (MessageQueueException ex)
    {
        if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
        {
            // This is OK, just continue with the loop
            continue;
        }
        queue.Close();
    }

    if (m != null)
    {
        using (StreamReader r = new StreamReader(m.BodyStream))
        {
            Console.Out.WriteLine(r.ReadToEnd());
        }
    }
}
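
The stop signal used in the loop above can be tried out without MSMQ. The sketch below is an illustration only: peekCompleted is a stand-in for res.AsyncWaitHandle and is never signaled here, and the helper name WaitForStopOrPeek and the 500 ms delay are assumptions. It shows that WaitHandle.WaitAny returns index 0 once another thread sets StopReceiving.

```csharp
using System;
using System.Threading;

public static class StopSignalSketch
{
    // Same role as the StopReceiving event declared above.
    static readonly ManualResetEvent StopReceiving = new ManualResetEvent(false);

    // Hypothetical helper: returns 0 if the stop event was signaled,
    // 1 if the (stand-in) peek handle was signaled.
    public static int WaitForStopOrPeek(WaitHandle stop, WaitHandle peekCompleted)
    {
        return WaitHandle.WaitAny(new WaitHandle[] { stop, peekCompleted });
    }

    public static void Main()
    {
        // Stand-in for res.AsyncWaitHandle; no message ever "arrives".
        ManualResetEvent peekCompleted = new ManualResetEvent(false);

        // A second thread asks the receiver to stop after a short delay.
        Thread controller = new Thread(() =>
        {
            Thread.Sleep(500);
            StopReceiving.Set();
        });
        controller.Start();

        int waitRes = WaitForStopOrPeek(StopReceiving, peekCompleted);
        Console.WriteLine(waitRes == 0 ? "stop requested" : "message available");

        controller.Join();
    }
}
```

Because only StopReceiving is ever signaled in this sketch, WaitAny returns 0 and the program prints "stop requested". In the real loop, index 1 would mean that the peek completed and a transactional receive can be attempted.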

Hope that helps,
Yoel
www.msmq.biz


Frank Boyne

Mar 10, 2008, 10:34:06 PM
"Morten Herman Langkjaer"
<MortenHerm...@discussions.microsoft.com> wrote in message
news:49077049-4E0B-42F2...@microsoft.com...

> If you can solve my problem on aborting synchronius transaction
> handled
> reception, i could just use that approach instead, and then have the
> read
> handle recomputed if it has been invalidated.

One way to abort an outstanding Receive (even a transactional one) is to
close the underlying ReadHandle -- which you can do using the native
function MQCloseQueue. For example...

using System.Runtime.InteropServices;

. . .

[DllImport("mqrt.dll", SetLastError = false)]
public static extern uint MQCloseQueue (IntPtr qh);

// thread 1 . . .

myQueue.Receive (myTx);

// thread 2 . . .

uint hresult = MQCloseQueue (myQueue.ReadHandle);

