[mule-user] How to make JMS zero message loss

jam zhou

Dec 6, 2011, 10:18:47 PM
to us...@mule.codehaus.org
Hi all,

I configured a VM inbound endpoint and a JMS outbound endpoint. When I shut down the JMS server and restart it, a few messages are lost.
I sent 500 messages to the VM endpoint, but only 498 were put into JMS.
What can I do? Any ideas?
Thank you.

-----
*mule-config.xml*
-----
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ftp="http://www.mulesoft.org/schema/mule/ftp"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:cxf="http://www.mulesoft.org/schema/mule/cxf"
xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ftp
http://www.mulesoft.org/schema/mule/ftp/3.2/mule-ftp.xsd
http://www.mulesoft.org/schema/mule/file
http://www.mulesoft.org/schema/mule/file/3.2/mule-file.xsd
http://www.mulesoft.org/schema/mule/cxf
http://www.mulesoft.org/schema/mule/cxf/3.2/mule-cxf.xsd
http://www.mulesoft.org/schema/mule/jms
http://www.mulesoft.org/schema/mule/jms/3.2/mule-jms.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.mulesoft.org/schema/mule/vm
http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
http://www.mulesoft.org/schema/mule/jbossts
http://www.mulesoft.org/schema/mule/jbossts/3.2/mule-jbossts.xsd">

    <vm:connector name="vmConnector">
        <receiver-threading-profile doThreading="false" />
        <dispatcher-threading-profile doThreading="false" />
        <vm:queue-profile>
            <file-queue-store />
        </vm:queue-profile>
    </vm:connector>

    <jbossts:transaction-manager/>

    <payload-type-filter name="stringFilter" expectedType="java.lang.String" />

    <flow name="clientService">
        <inbound-endpoint address="http://localhost:12378/clientService" exchange-pattern="request-response">
            <cxf:jaxws-service serviceClass="com.csn.telex.esb.webservice.ClientService" />
        </inbound-endpoint>
        <component>
            <singleton-object class="com.csn.telex.esb.webservice.ClientServiceImpl" />
        </component>
        <vm:outbound-endpoint path="to-mq" exchange-pattern="one-way" connector-ref="vmConnector" />
    </flow>

    <flow name="MqOutBound">
        <vm:inbound-endpoint path="to-mq" connector-ref="vmConnector">
            <xa-transaction action="ALWAYS_BEGIN" />
        </vm:inbound-endpoint>
        <jms:outbound-endpoint name="mqOutmmm" queue="Telex.Test" connector-ref="jmsConnectormmm">
            <xa-transaction action="ALWAYS_JOIN"/>
        </jms:outbound-endpoint>
    </flow>

    <jms:activemq-xa-connector name="jmsConnectormmm" brokerURL="tcp://10.89.220.208:61616" createMultipleTransactedReceivers="false" >
        <dispatcher-threading-profile doThreading="false" />
        <reconnect-forever frequency="2000" />
    </jms:activemq-xa-connector>
</mule>

-----
*Log of Exception*
-----
2011-12-07 10:47:41,390 WARN com.arjuna.ats.jta - ARJUNA16036: commit on < formatId=131076, gtrid_length=39, bqual_length=28, tx_uid=0:ffff0a59dc41:4dd:4edf43ea:ffa, node_name=Arjuna:1245, branch_uid=0:ffff0a59dc41:4dd:4edf43ea:ffe, eis_name=unknown eis name > (TransactionContext{transactionId=null}) failed with exception $XAException.XAER_RMFAIL
javax.transaction.xa.XAException: Connection reset
at org.apache.activemq.TransactionContext.toXAException(TransactionContext.java:738)
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:551)
at org.mule.transport.jms.xa.XAResourceWrapper.commit(XAResourceWrapper.java:87)
at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelCommit(XAResourceRecord.java:443)
at com.arjuna.ats.arjuna.coordinator.BasicAction.doCommit(BasicAction.java:2748)
at com.arjuna.ats.arjuna.coordinator.BasicAction.doCommit(BasicAction.java:2664)
at com.arjuna.ats.arjuna.coordinator.BasicAction.phase2Commit(BasicAction.java:1799)
at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1492)
at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:99)
at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:159)
at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1158)
at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:119)
at org.mule.transaction.XaTransaction.doCommit(XaTransaction.java:117)
at org.mule.transaction.AbstractTransaction.commit(AbstractTransaction.java:82)
at org.mule.transaction.TransactionTemplate.resolveTransaction(TransactionTemplate.java:147)
at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:117)
at org.mule.transport.TransactedPollingMessageReceiver.poll(TransactedPollingMessageReceiver.java:138)
at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:219)
at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:85)
at org.mule.transport.ContinuousPollingReceiverWorker.poll(ContinuousPollingReceiverWorker.java:36)
at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:53)
at org.mule.work.WorkerContext.run(WorkerContext.java:310)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: javax.jms.JMSException: Connection reset
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1295)
at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:707)
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:526)
... 23 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:576)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:561)
at java.io.DataInputStream.readInt(Unknown Source)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:227)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:219)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
... 1 more
2011-12-07 10:47:41,390 ERROR org.mule.exception.DefaultSystemExceptionStrategy -
********************************************************************************
Message : Connection reset (javax.jms.JMSException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. Connection reset (java.net.SocketException)
java.net.SocketInputStream:-1 (null)
2. Connection reset(JMS Code: null) (javax.jms.JMSException)
org.apache.activemq.util.JMSExceptionSupport:49 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/JMSException.html)
3. Connection reset (javax.jms.JMSException) (org.mule.transport.ConnectException)
org.mule.transport.jms.JmsConnector:481 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/ConnectException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:576)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

2011-12-07 10:47:41,390 WARN org.mule.transport.jms.activemq.ActiveMQXAJmsConnector - Exception cleaning up JMS connection: null
2011-12-07 10:47:41,390 WARN org.mule.transport.jms.activemq.ActiveMQXAJmsConnector - Exception closing JMS connection: Connection reset
2011-12-07 10:47:41,406 ERROR org.mule.exception.DefaultMessagingExceptionStrategy -
********************************************************************************
Message : Cannot process event as "jmsConnectormmm" is stopped
Code : MULE_ERROR-70167
--------------------------------------------------------------------------------
Exception stack is:
1. Cannot process event as "jmsConnectormmm" is stopped (org.mule.api.lifecycle.LifecycleException)
org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor:42 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.api.lifecycle.LifecycleException: Cannot process event as "jmsConnectormmm" is stopped
at org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor.handleUnaccepted(ProcessIfStartedMessageProcessor.java:42)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:45)
at org.mule.endpoint.DefaultOutboundEndpoint.process(DefaultOutboundEndpoint.java:95)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

-----


David Dossot

Dec 6, 2011, 10:26:31 PM
to us...@mule.codehaus.org
Put the VM and JMS endpoints in one XA transaction, or use until-successful around the JMS endpoint.
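
A minimal sketch of the until-successful option, assuming Mule 3.2's `until-successful` router. The bean id `retryStore`, the connector name `jmsConnector` (assumed to be a plain, non-XA JMS connector) and the retry values are illustrative and do not come from the thread. The in-memory store shown here does not survive a Mule restart, so a persistent object store would probably be needed for real zero-loss behaviour.

```xml
<!-- Hypothetical object store holding messages that are waiting to be retried.
     SimpleMemoryObjectStore is in-memory only, so pending retries are lost on restart. -->
<spring:bean id="retryStore" class="org.mule.util.store.SimpleMemoryObjectStore" />

<flow name="MqOutBound">
    <vm:inbound-endpoint path="to-mq" exchange-pattern="one-way" connector-ref="vmConnector" />
    <!-- Retry the JMS send until it succeeds; the values below are assumptions, not recommendations -->
    <until-successful objectStore-ref="retryStore" maxRetries="100" secondsBetweenRetries="10">
        <!-- "jmsConnector" is a hypothetical non-XA connector -->
        <jms:outbound-endpoint queue="Telex.Test" connector-ref="jmsConnector" />
    </until-successful>
</flow>
```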

jam zhou

Dec 6, 2011, 10:33:32 PM
to us...@mule.codehaus.org
David, thanks for your reply.
I have already put them in the same XA transaction, but it does not work. See:

<flow name="MqOutBound">
    <vm:inbound-endpoint path="to-mq" connector-ref="vmConnector">
        <xa-transaction action="ALWAYS_BEGIN" />
    </vm:inbound-endpoint>
    <jms:outbound-endpoint name="mqOutmmm" queue="Telex.Test" connector-ref="jmsConnectormmm">
        <xa-transaction action="ALWAYS_JOIN"/>
    </jms:outbound-endpoint>
</flow>

It still loses messages.
Am I missing some config?

David Dossot

Dec 6, 2011, 10:50:01 PM
to us...@mule.codehaus.org
Duh! Sorry I didn't see you already posted all your config :(

The user guide says to specify: exchange-pattern="one-way" on the vm:inbound-endpoint to be sure it is transactional. Can you try?
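
As a sketch, the suggestion amounts to adding the attribute to the inbound endpoint already posted in the thread; nothing else is changed here, and whether it fixes the loss is not confirmed at this point:

```xml
<flow name="MqOutBound">
    <!-- exchange-pattern="one-way" added so the VM endpoint is queued and can join the XA transaction -->
    <vm:inbound-endpoint path="to-mq" exchange-pattern="one-way" connector-ref="vmConnector">
        <xa-transaction action="ALWAYS_BEGIN" />
    </vm:inbound-endpoint>
    <jms:outbound-endpoint name="mqOutmmm" queue="Telex.Test" connector-ref="jmsConnectormmm">
        <xa-transaction action="ALWAYS_JOIN" />
    </jms:outbound-endpoint>
</flow>
```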

Otherwise, I don't see anything bad here. I'm unsure about the dispatcher-threading-profile doThreading="false" in your ActiveMQ config, not sure if this could impact something.

D.

jam zhou

Dec 7, 2011, 2:56:28 AM
to us...@mule.codehaus.org
Hi, David.
I specified exchange-pattern="one-way" on the VM inbound endpoint and deleted the dispatcher-threading-profile,
but it still loses messages.

I have no idea what is wrong.

Why can't Mule roll back when the "Connection reset" exception occurs (I shut down the JMS server and restart it)?
Can anyone help?

Andrew Perepelytsya

Dec 7, 2011, 7:53:55 AM
to us...@mule.codehaus.org

This is not about Mule. By using one-way endpoint you introduce a point of failure into the flow. Did you try changing it as David suggested above?

Andrew

David Dossot

Dec 7, 2011, 12:06:46 PM
to us...@mule.codehaus.org
Mmmh, the user guide strictly requires that the VM inbound endpoint be one-way when using transactions (to activate queuing, it seems). I really don't get why the rollback doesn't happen, since the log shows that the transaction manager is aware of the issue.

Jam, could you try adding these attributes on activemq-xa-connector:

            persistentDelivery="true" specification="1.1"
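
Applied to the connector from the original post, this would look roughly as follows (a sketch; only the two attributes are new):

```xml
<jms:activemq-xa-connector name="jmsConnectormmm"
        brokerURL="tcp://10.89.220.208:61616"
        persistentDelivery="true"
        specification="1.1"
        createMultipleTransactedReceivers="false">
    <reconnect-forever frequency="2000" />
</jms:activemq-xa-connector>
```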

Cheers,
D.

jam zhou

Dec 7, 2011, 9:30:28 PM
to us...@mule.codehaus.org
Hi, Andrew.

Thanks for your reply.

I tried what David suggested and set the exchange-pattern to "one-way", but messages are still lost.

-----


<flow name="clientService">
    <inbound-endpoint address="http://localhost:12378/clientService" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.csn.telex.esb.webservice.ClientService" />
    </inbound-endpoint>
    <component>
        <singleton-object class="com.csn.telex.esb.webservice.ClientServiceImpl" />
    </component>
    <vm:outbound-endpoint path="to-mq" exchange-pattern="one-way" connector-ref="vmConnector" />
</flow>

-----

I shut down the JMS server several times while the application was running.
The messages are handled by the component; I can see on the console that it handled 500 messages.
But when I check JMS, there are only 499 messages.


*By using one-way endpoint you introduce a point of failure into the flow.---Andrew*
In your opinion, I should not use "one-way" because it is multithreaded?

Appreciate.
Jam

jam zhou

Dec 7, 2011, 9:46:45 PM
to us...@mule.codehaus.org
Hi, David.

Thanks for your reply again.

I have added the persistentDelivery and specification attributes to the activemq-xa-connector,
but messages are still lost.

I have attached the log file.

forgon-portal.log

Dirk Olmes

Dec 8, 2011, 12:54:59 AM
to us...@mule.codehaus.org
> jam zhou wrote:

> -----
> <flow name="clientService">
> <inbound-endpoint address="http://localhost:12378/clientService" exchange-pattern="request-response">
> <cxf:jaxws-service serviceClass="com.csn.telex.esb.webservice.ClientService" />
> </inbound-endpoint>
> <component>
> <singleton-object class="com.csn.telex.esb.webservice.ClientServiceImpl" />
> </component>
> <vm:outbound-endpoint path="to-mq" exchange-pattern="one-way" connector-ref="vmConnector" />
> </flow>
> -----
> *By using one-way endpoint you introduce a point of failure into the flow.---Andrew*
> In your opinion, I should not use the "one-way",because it's multithreading?

You don't use a transaction element on the vm endpoint so you're effectively leaving the thread that processes the incoming HTTP request after the component. The send to JMS will be handled on a different thread. If you change the EP to request-response, you will at least see the exception from an unsuccessful JMS send on the HTTP side. Or you use a transaction and the EP will be ignored and everything will be forced to be processed on the same thread.

-dirk

jam zhou

Dec 8, 2011, 1:28:41 AM
to us...@mule.codehaus.org
Hi, Dirk.

Thanks for your reply.

Yes, if I change the EP to request-response, the messages stay in the VM queue and are not sent to JMS.

The application did not throw any exception, which is weird.


Now I have changed the EP back to one-way, and the problem is still not fixed; it still loses messages.

I appreciate your help, Dirk.

Cheers.
Jam

David Dossot

Dec 8, 2011, 12:36:07 PM
to us...@mule.codehaus.org
I am at a total loss here. The user guide says:

one-way VM queues can take part in distributed XA Transactions.

Notice the emphasis on "one-way". So it should work then, no?

Jam, as a side test, could you try using a transient in-memory ActiveMQ broker in Mule instead of VM? The XA transaction would then encompass a local AMQ and a remote AMQ.
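
A sketch of what that side test could look like, assuming ActiveMQ's `vm://` transport is used to start an embedded, non-persistent broker inside the Mule JVM. The connector name `jmsLocal` and the queue name `Telex.Temp` are illustrative choices here, not part of the suggestion itself:

```xml
<!-- Hypothetical local connector: the vm:// URL starts an in-JVM broker,
     and broker.persistent=false keeps it transient (nothing is written to disk) -->
<jms:activemq-xa-connector name="jmsLocal"
        brokerURL="vm://localhost?broker.persistent=false"
        specification="1.1" />

<flow name="MqOutBound">
    <!-- Local AMQ queue replaces the VM queue as the start of the XA transaction -->
    <jms:inbound-endpoint queue="Telex.Temp" connector-ref="jmsLocal">
        <xa-transaction action="ALWAYS_BEGIN" />
    </jms:inbound-endpoint>
    <jms:outbound-endpoint queue="Telex.Test" connector-ref="jmsConnectormmm">
        <xa-transaction action="ALWAYS_JOIN" />
    </jms:outbound-endpoint>
</flow>
```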

D.

jam zhou

Dec 9, 2011, 2:35:52 AM
to us...@mule.codehaus.org
Hi, David.

I removed the VM endpoint and switched to a local JMS broker.
But when I shut down and restart my remote JMS server, messages are no longer read from the local JMS inbound endpoint: the component "com.Test" is not executed. I am sure the messages are all in the local JMS queue.

After I restart my Mule application, it recovers, and the messages stored in the local JMS start to be transported to the remote JMS again. That is not what I want.
It is not good to have to restart my application whenever the remote JMS server is restarted.
I have set up a reconnection strategy, but it still does not work; it seems to block.
One message is also lost in the end.

Here is my Mule config:
-----

<jbossts:transaction-manager/>

<flow name="clientService" processingStrategy="synchronous">
    <inbound-endpoint address="http://localhost:12378/clientService" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.csn.telex.esb.webservice.ClientService" />
    </inbound-endpoint>
    <component>
        <singleton-object class="com.csn.telex.esb.webservice.ClientServiceImpl" />
    </component>
    <jms:outbound-endpoint name="mqOuttest" queue="Telex.Temp" connector-ref="jmsTemp" />
</flow>

<flow name="MqOutBound">
    <jms:inbound-endpoint queue="Telex.Temp" connector-ref="jmsTemp">
        <xa-transaction action="ALWAYS_BEGIN"/>
    </jms:inbound-endpoint>
    <component>
        <singleton-object class="com.Test" />
    </component>
    <jms:outbound-endpoint queue="Telex.Test" connector-ref="jmsConnectormmm">
        <xa-transaction action="ALWAYS_JOIN"/>
    </jms:outbound-endpoint>
</flow>

<jms:activemq-xa-connector name="jmsConnectormmm" brokerURL="tcp://10.89.220.208:61616" persistentDelivery="true" specification="1.1" createMultipleTransactedReceivers="false" >
    <reconnect-forever frequency="2000" />
</jms:activemq-xa-connector>

<jms:activemq-xa-connector name="jmsTemp" brokerURL="tcp://0.0.0.0:61616" persistentDelivery="true" maxRedelivery="9999" createMultipleTransactedReceivers="false" >
    <reconnect-forever frequency="2000"/>
</jms:activemq-xa-connector>
</mule>
-----

My goal is that when the web service receives a message, it is reliably delivered to the remote JMS server.
Because the remote JMS server may be shut down, I added the VM queue between HTTP and JMS.

jam zhou

Dec 9, 2011, 3:19:01 AM
to us...@mule.codehaus.org
Using the local JMS broker,
I found one message in the ActiveMQ.DLQ queue after I shut down and restarted the remote JMS server.
So one message is lost.

David Dossot

Dec 9, 2011, 12:31:56 PM
to us...@mule.codehaus.org
If it's in the DLQ, it's not lost, you can reprocess it.

But, before going there, you need to configure the redelivery policy of your AMQ client so it tries more times / longer before dropping the message in the DLQ.
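
One possible way to do this, sketched under the assumption that the ActiveMQ client accepts `jms.redeliveryPolicy.*` options on the broker URL. The values are illustrative: `-1` is meant as "no redelivery limit" and the delay is in milliseconds. Note the `&amp;` needed inside an XML attribute:

```xml
<!-- Redelivery policy passed to the ActiveMQ client through the connection URL (values are assumptions) -->
<jms:activemq-xa-connector name="jmsTemp"
        brokerURL="tcp://0.0.0.0:61616?jms.redeliveryPolicy.maximumRedeliveries=-1&amp;jms.redeliveryPolicy.initialRedeliveryDelay=2000"
        persistentDelivery="true"
        createMultipleTransactedReceivers="false">
    <reconnect-forever frequency="2000" />
</jms:activemq-xa-connector>
```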

D.

jam zhou

Dec 12, 2011, 1:24:53 AM
to us...@mule.codehaus.org
<jms:activemq-xa-connector name="jmsTemp" brokerURL="tcp://0.0.0.0:61616" persistentDelivery="true"
        maxRedelivery="9999" createMultipleTransactedReceivers="false" >
    <reconnect-forever frequency="2000"/>
</jms:activemq-xa-connector>

I had specified maxRedelivery, but the application does not get messages from the local JMS inbound endpoint once I restart the remote JMS server.
All the messages are blocked in the local JMS queue.
Is there some config that I did not consider?

David Dossot

Dec 12, 2011, 11:36:18 AM
to us...@mule.codehaus.org
Yes, there is some ActiveMQ specific config to deal with. Look at an example of how to configure ActiveMQ redelivery and DLQ policies here: http://www.mulesoft.org/mule-activemq-integration-examples  

jam zhou

Dec 16, 2011, 1:53:35 AM
to us...@mule.codehaus.org
Hi, David.

Thank you for your reply again.

I fixed the bug by using the ActiveMQ connection factory with redelivery.
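
The final configuration was not posted, so the following is only a guess at its general shape: an ActiveMQ XA connection factory defined as a Spring bean with a redelivery policy, wired into the Mule connector through `connectionFactory-ref`. The bean ids `amqRedeliveryPolicy` and `amqXaFactory` and all values are hypothetical.

```xml
<!-- Hypothetical redelivery policy: retry without limit, starting 2 s after the first failure -->
<spring:bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="-1" />
    <spring:property name="initialRedeliveryDelay" value="2000" />
</spring:bean>

<!-- Hypothetical XA connection factory carrying the policy above -->
<spring:bean id="amqXaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <spring:property name="brokerURL" value="tcp://0.0.0.0:61616" />
    <spring:property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
</spring:bean>

<!-- The connector takes the factory by reference instead of a brokerURL attribute -->
<jms:activemq-xa-connector name="jmsTemp"
        connectionFactory-ref="amqXaFactory"
        persistentDelivery="true"
        specification="1.1"
        createMultipleTransactedReceivers="false">
    <reconnect-forever frequency="2000" />
</jms:activemq-xa-connector>
```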

I appreciate your help, guys. Thank you.
