[mule-user] Threading and pooling confusion

Cuball

Jan 29, 2008, 3:32:23 AM
to us...@mule.codehaus.org

I have a use case where I need to poll a db table for new records
(getIMPrintRequestsBETA and getIMPrintRequestsBETA.ack). The order of
receiving and processing is important: events must go out to the JMS queue
in the same order in which they came in. I've been trying lots of
combinations, and I think it finally works now, but I don't really
understand how or why. What I've done is:

set the threading-profile on my connector to maxThreadsActive=2,
and on my component I've set the pooling-profile to maxActive=1

A few questions I have:
* Setting maxThreadsActive=1 doesn't work.
* What's the difference between a threading-profile set on the connector and
a threading-profile set on my component?
* How are threading-profile and pooling-profile related at the component level?
If I limit threading in my component to 1 thread, will the pooling-profile
no longer have any effect?
* Why does setting doThreading=false in the threading-profile on my connector
not result in ordered processing of the messages received from my db?
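
As a side illustration of the ordering concern behind these questions (this is not Mule code): a minimal Java sketch, assuming a plain java.util.concurrent thread pool stands in for the worker threads that pick up the polled records. The class name OrderingSketch and its methods are hypothetical, and the lambdas need Java 8 or later. A single worker drains its FIFO queue one task at a time, so results come out in submission order. With two workers, nothing guarantees which task finishes first, so the outcome can vary from run to run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderingSketch {

    // Submits record ids 0..count-1 in ascending order and returns the order
    // in which the workers "published" them (appended them to the result list).
    public static List<Integer> process(int count, int workerThreads) {
        List<Integer> published = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        for (int i = 0; i < count; i++) {
            final int docNbr = i; // stands in for DOCNBR from the polled table
            pool.submit(() -> published.add(docNbr));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return published;
    }

    // True if every element is >= the one before it.
    public static boolean isAscending(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("1 worker, in order:  " + isAscending(process(1000, 1)));
        // May print true or false depending on scheduling.
        System.out.println("2 workers, in order: " + isAscending(process(1000, 2)));
    }
}
```

The sketch only shows why more than one active thread anywhere between the poll and the dispatch can reorder events. It says nothing about how a particular Mule release maps threading-profile and pooling-profile onto its pools.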

here's my config file (Mule 1.4.3):

<mule-environment-properties recoverableMode="true">
    <queue-profile maxOutstandingMessages="1000" persistent="true"/>
</mule-environment-properties>

<connector name="jdbcBETAConnector"
           className="org.mule.providers.jdbc.JdbcConnector">
    <properties>
        <container-property name="dataSource" reference="jdbcBETADataSource"/>
        <map name="queries">
            <property name="getIMPrintRequestsBETA"
                      value="SELECT * FROM PIMSCRIPT WHERE PRINTTIMESTAMP IS NULL ORDER BY DOCNBR ASC"/>
            <property name="getIMPrintRequestsBETA.ack"
                      value="UPDATE PIMSCRIPT SET PRINTTIMESTAMP = ${NOW} WHERE DOCNBR = ${docnbr}"/>
            <property name="writeIMPrintRequestsBETA"
                      value="UPDATE PIMSCRIPT SET FINISHEDTIMESTAMP = ${NOW} WHERE DOCNBR = ${printJobMetaData.documentNumber}"/>
        </map>
    </properties>
    <threading-profile maxThreadsActive="2"/>
</connector>

<mule-descriptor name="IMPrintComponent" singleton="true"
                 implementation="be.xxx.IMPrintComponent">

    <inbound-router>
        <endpoint address="jdbc://getIMPrintRequestsBETA"
                  connector="jdbcBETAConnector">
        </endpoint>
    </inbound-router>

    <outbound-router matchAll="true">
        <catch-all-strategy
                className="org.mule.routing.ForwardingCatchAllStrategy">
            <endpoint address="jms://errors"
                      transformers="ExceptionMessageToPrintJobError ObjectToJMSMessage"/>
        </catch-all-strategy>

        <router
                className="org.mule.routing.outbound.FilteringListMessageSplitter">
            <endpoint address="jms://printJobs">
                <filter className="org.mule.routing.filters.PayloadTypeFilter"
                        expectedType="be.oz.mule.domain.PrintJob"/>
            </endpoint>
            <filter className="org.mule.routing.filters.PayloadTypeFilter"
                    expectedType="java.util.List"/>
        </router>

        <!-- every message is also updated with a finishedtimestamp -->
        <router
                className="org.mule.routing.outbound.FilteringListMessageSplitter">
            <endpoint address="jdbc://writeIMPrintRequestsBETA"
                      connector="jdbcBETAConnector">
                <filter className="org.mule.routing.filters.PayloadTypeFilter"
                        expectedType="be.oz.mule.domain.PrintJob"/>
            </endpoint>
            <filter className="org.mule.routing.filters.PayloadTypeFilter"
                    expectedType="java.util.List"/>
        </router>
        <!-- end updating finished timestamp -->

        <!-- every message is also sent to a backup queue -->
        <router
                className="org.mule.routing.outbound.FilteringListMessageSplitter">
            <endpoint address="jms://backup">
                <filter className="org.mule.routing.filters.PayloadTypeFilter"
                        expectedType="be.oz.mule.domain.PrintJob"/>
            </endpoint>
            <filter className="org.mule.routing.filters.PayloadTypeFilter"
                    expectedType="java.util.List"/>
        </router>
        <!-- end sending to scripturaBackup queue -->

    </outbound-router>

    <pooling-profile initialisationPolicy="INITIALISE_ALL" maxActive="1"
                     exhaustedAction="WAIT"/>

    <exception-strategy
            className="org.mule.impl.DefaultComponentExceptionStrategy">
        <endpoint address="jms://errors"
                  transformers="ExceptionMessageToPrintJobError ObjectToJMSMessage"/>
    </exception-strategy>

</mule-descriptor>

--
View this message in context: http://www.nabble.com/Threading-and-pooling-confusion-tp15154703p15154703.html
Sent from the Mule - User mailing list archive at Nabble.com.



Cuball

Jan 29, 2008, 4:12:28 AM
to us...@mule.codehaus.org


Cuball wrote:
>
> , I think it finally works now but I don't really understand how/why.
> What I've done is :
>
> set the threading-profile on my connector to maxThreadsActive=2
> and on my component i've set the pooling-profile to maxActive=1
>
>

I've looked a bit closer, and even with these settings some messages still
get out of order (of 1000 records read from the db, about 2% are put on my
queue out of order).

Any ideas?


Andrew Perepelytsya

Jan 29, 2008, 12:21:21 PM
to us...@mule.codehaus.org
1.4.4 features some fixes in this area. Do you observe the same behavior with the 1.4.4 snapshot?

Andrew

Cuball

Jan 30, 2008, 6:04:23 AM
to us...@mule.codehaus.org

Changing to the 1.4.4-SNAPSHOT gives me other errors... With the same
configuration my components don't work anymore... I really don't know
how I can do this simple task of reading, processing and dispatching
to a queue in an ordered manner via Mule... This looks like a trivial use
case to me :(

Here's the log from the 1.4.4 snapshot:


DEBUG 2008-01-30 11:53:12,954 [jdbcBETAConnector.receiver.2]
org.mule.providers.jdbc.JdbcConnector: Retrieving new connection from data
source
DEBUG 2008-01-30 11:53:13,094 [jdbcBETAConnector.receiver.2]
org.mule.providers.jdbc.JdbcMessageReceiver: Message Received from:
jdbc://getIMPrintRequestsBETA
DEBUG 2008-01-30 11:53:13,125 [jdbcBETAConnector.receiver.2]
org.mule.impl.MuleSession: There is no session id on the request using key:
ID. Generating new session id: 8d9e3c24-cf21-11dc-b3e7-b91bf72803f3
DEBUG 2008-01-30 11:53:13,125 [jdbcBETAConnector.receiver.2]
org.mule.impl.MuleSession: dispatching event to component: IMPrintComponent,
event is: Event: 8da08615-cf21-11dc-b3e7-b91bf72803f3, sync=false, stop
processing=false, MuleEndpoint{endpointUri=jdbc://getIMPrintRequestsBETA,
connector=JdbcConnector{this=389922, started=true, initialised=true,
name='jdbcBETAConnector', disposed=false,
numberOfConcurrentTransactedReceivers=4,
createMultipleTransactedReceivers=true, connected=true,
supportedProtocols=[jdbc], serviceOverrides=null}, transformer=null,
name='endpoint.jdbc.getIMPrintRequestsBETA', type='receiver', properties={},
transactionConfig=Transaction{factory=null, action=NONE, timeout=30000},
filter=null, deleteUnacceptedMessages=false, initialised=true,
securityFilter=null, synchronous=null, initialState=started,
createConnector=0, remoteSync=false, remoteSyncTimeout=null,
endpointEncoding=null}
DEBUG 2008-01-30 11:53:13,141 [jdbcBETAConnector.receiver.2]
org.mule.impl.model.seda.SedaComponent: Component: IMPrintComponent has
received asynchronous event on: jdbc://getIMPrintRequestsBETA
DEBUG 2008-01-30 11:53:13,141 [jdbcBETAConnector.receiver.2]
org.mule.impl.model.seda.SedaComponent: Component: IMPrintComponent has
received asynchronous event on: jdbc://getIMPrintRequestsBETA
DEBUG 2008-01-30 11:53:13,719 [jdbcBETAConnector.receiver.2]
org.mule.impl.MuleMessage: new copy of message for
Thread[jdbcBETAConnector.receiver.2,5,main]
DEBUG 2008-01-30 11:53:14,047 [jdbcBETAConnector.receiver.3]
org.mule.providers.jdbc.JdbcConnector: Retrieving new connection from data
source
DEBUG 2008-01-30 11:53:14,079 [IMPrintComponent.1] org.mule.impl.MuleEvent:
Failed to load transformers from stream
java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.mule.impl.MuleEvent.unmarshallTransformers(MuleEvent.java:652)
at org.mule.impl.MuleEvent.readObject(MuleEvent.java:676)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at
org.mule.util.queue.FilePersistenceStrategy.load(FilePersistenceStrategy.java:103)
at
org.mule.util.queue.CachingPersistenceStrategy.load(CachingPersistenceStrategy.java:47)
at
org.mule.util.queue.TransactionalQueueManager.doLoad(TransactionalQueueManager.java:263)
at
org.mule.util.queue.TransactionalQueueSession$QueueImpl.poll(TransactionalQueueSession.java:115)
at org.mule.impl.model.seda.SedaComponent.dequeue(SedaComponent.java:601)
at org.mule.impl.model.seda.SedaComponent.run(SedaComponent.java:488)
at org.mule.impl.work.WorkerContext.run(WorkerContext.java:310)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
at java.lang.Thread.run(Unknown Source)
DEBUG 2008-01-30 11:53:14,079 [IMPrintComponent.1]
org.mule.providers.service.TransportServiceDescriptor: Loading endpointUri
resolver: org.mule.providers.jdbc.JdbcEndpointBuilder
DEBUG 2008-01-30 11:53:14,094 [IMPrintComponent.1]
org.mule.impl.model.seda.SedaComponent: Component: IMPrintComponent dequeued
event on: jdbc://getIMPrintRequestsBETA
DEBUG 2008-01-30 11:53:14,047 [jdbcBETAConnector.receiver.1]
org.mule.providers.jdbc.JdbcConnector: Retrieving new connection from data
source
DEBUG 2008-01-30 11:53:14,157 [IMPrintComponent.2]
org.mule.providers.jdbc.JdbcConnector: Borrowing a dispatcher for endpoint:
jdbc://getIMPrintRequestsBETA
DEBUG 2008-01-30 11:53:14,250 [IMPrintComponent.2]
org.mule.providers.jdbc.JdbcConnector: Borrowed dispatcher:
JdbcMessageDispatcher{this=174e78a, endpoint=jdbc://getIMPrintRequestsBETA}
DEBUG 2008-01-30 11:53:14,250 [IMPrintComponent.2]
org.mule.impl.MuleMessage: new copy of message for
Thread[IMPrintComponent.2,5,main]
DEBUG 2008-01-30 11:53:14,250 [IMPrintComponent.2]
org.mule.providers.jdbc.JdbcConnector: Returning dispatcher for endpoint:
jdbc://getIMPrintRequestsBETA = JdbcMessageDispatcher{this=174e78a,
endpoint=jdbc://getIMPrintRequestsBETA}
ERROR 2008-01-30 11:53:14,266 [IMPrintComponent.2]
org.mule.impl.model.seda.SedaComponent: Work caused exception on
'workCompleted'. Work being executed was: proxy for: name=IMPrintComponent,
outbound endpoint=null, send transformer=null, inbound endpointUri=null,
receive transformer=null, encoding=null
ERROR 2008-01-30 11:53:14,266 [IMPrintComponent.2]
org.mule.impl.DefaultComponentExceptionStrategy: Caught exception in
Exception Strategy: null
java.lang.NullPointerException
at org.mule.impl.model.DefaultMuleProxy.run(DefaultMuleProxy.java:502)
at org.mule.impl.work.WorkerContext.run(WorkerContext.java:310)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
at java.lang.Thread.run(Unknown Source)
INFO 2008-01-30 11:53:14,266 [IMPrintComponent.2]
org.mule.impl.DefaultComponentExceptionStrategy: There is no current event
available, routing Null message with the exception
ERROR 2008-01-30 11:53:14,282 [IMPrintComponent.2]
org.mule.impl.DefaultComponentExceptionStrategy: Message being processed is:
org.mule.providers.DefaultMessageAdapter/org.mule.providers.DefaultMessageAdapter@a33d00{id=8e511169-cf21-11dc-b3e7-b91bf72803f3,
payload=org.mule.providers.NullPayload, correlationId=null,
correlationGroup=-1, correlationSeq=-1, encoding=UTF-8,
exceptionPayload=null, properties={}}
DEBUG 2008-01-30 11:53:14,297 [IMPrintComponent.2]
org.mule.impl.MuleSession: There is no session id on the request using key:
ID. Generating new session id: 8e535b5b-cf21-11dc-b3e7-b91bf72803f3
DEBUG 2008-01-30 11:53:14,297 [IMPrintComponent.2]
org.mule.impl.RequestContext: copying event
java.lang.Exception
at org.mule.impl.RequestContext.noteUse(RequestContext.java:196)
at org.mule.impl.RequestContext.newEvent(RequestContext.java:206)
at org.mule.impl.RequestContext.setEvent(RequestContext.java:73)
at
org.mule.impl.AbstractExceptionListener.routeException(AbstractExceptionListener.java:244)
at
org.mule.impl.DefaultComponentExceptionStrategy.routeException(DefaultComponentExceptionStrategy.java:102)
at
org.mule.impl.DefaultExceptionStrategy.handleMessagingException(DefaultExceptionStrategy.java:29)
at
org.mule.impl.DefaultExceptionStrategy.handleStandardException(DefaultExceptionStrategy.java:57)
at
org.mule.impl.AbstractExceptionListener.exceptionThrown(AbstractExceptionListener.java:126)
at
org.mule.impl.model.AbstractComponent.handleException(AbstractComponent.java:465)
at
org.mule.impl.model.seda.SedaComponent.handleWorkException(SedaComponent.java:647)
at
org.mule.impl.model.seda.SedaComponent.workCompleted(SedaComponent.java:621)
at org.mule.impl.work.WorkerContext.run(WorkerContext.java:367)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
at
edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
at java.lang.Thread.run(Unknown Source)
DEBUG 2008-01-30 11:53:14,297 [IMPrintComponent.2]
org.mule.impl.MuleMessage: new copy of message for
Thread[IMPrintComponent.2,5,main]
DEBUG 2008-01-30 11:53:14,297 [IMPrintComponent.2]
org.mule.providers.jms.activemq.ActiveMqJmsConnector: Borrowing a dispatcher
for endpoint: jms://Errors
DEBUG 2008-01-30 11:53:14,344 [IMPrintComponent.2]
org.mule.providers.jms.activemq.ActiveMqJmsConnector: Borrowed dispatcher:
JmsMessageDispatcher{this=1ca9f78, endpoint=jms://Errors}
DEBUG 2008-01-30 11:53:14,344 [IMPrintComponent.2]
org.mule.providers.jms.JmsMessageDispatcher: Connecting:
JmsMessageDispatcher{this=1ca9f78, endpoint=jms://Errors}
INFO 2008-01-30 11:53:14,344 [IMPrintComponent.2]
org.mule.providers.jms.JmsMessageDispatcher: Connected:
JmsMessageDispatcher{this=1ca9f78, endpoint=jms://Errors}
DEBUG 2008-01-30 11:53:14,344 [IMPrintComponent.2]
org.mule.providers.jms.JmsMessageDispatcher: dispatching on endpoint:
jms://Errors. Event id is: 8e535b5c-cf21-11dc-b3e7-b91bf72803f3
DEBUG 2008-01-30 11:53:14,344 [IMPrintComponent.2]
org.mule.providers.jms.activemq.ActiveMqJmsConnector: Retrieving new jms
session from connection: topic=false, transacted=false, ack mode=1,
nolocal=false
DEBUG 2008-01-30 11:53:14,532 [IMPrintComponent.2]
be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer: Applying
transformer ExceptionMessageToPrintJobError
(be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer)
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer: Object
before transform: ExceptionMessage{message={NullPayload},
context={}exception=java.lang.NullPointerException, componentName='Unknown',
endpointUri=null, timeStamp=Wed Jan 30 11:53:14 CET 2008}
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer: Object
after transform: be.xxx.mule.domain.PrintJobError@1166179
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer: The
transformed object is of expected type. Type is: PrintJobError
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
be.xxx.mule.transformer.ExceptionMessageToPrintJobErrorTransformer:
Following transformer in the chain is ObjectToJMSMessage
(org.mule.providers.jms.transformers.ObjectToJMSMessage)
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Applying transformer
ObjectToJMSMessage (org.mule.providers.jms.transformers.ObjectToJMSMessage)
DEBUG 2008-01-30 11:53:14,547 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Object before
transform: be.xxx.mule.domain.PrintJobError@1166179
DEBUG 2008-01-30 11:53:14,563 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Source object is
PrintJobError
DEBUG 2008-01-30 11:53:14,563 [IMPrintComponent.2]
org.mule.providers.jms.activemq.ActiveMqJmsConnector: Retrieving new jms
session from connection: topic=false, transacted=false, ack mode=1,
nolocal=false
DEBUG 2008-01-30 11:53:14,641 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Closing
non-transacted jms session: ActiveMQSession
{id=ID:s501-040-2295-1201690352203-3:0:2,started=true}
DEBUG 2008-01-30 11:53:14,657 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Resulting object is
ActiveMQObjectMessage
DEBUG 2008-01-30 11:53:14,672 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: Object after
transform: ActiveMQObjectMessage {commandId = 0, responseRequired = false,
messageId = null, originalDestination = null, originalTransactionId = null,
producerId = null, destination = null, transactionId = null,...
DEBUG 2008-01-30 11:53:14,688 [IMPrintComponent.2]
org.mule.providers.jms.transformers.ObjectToJMSMessage: The transformed
object is of expected type. Type is: ActiveMQObjectMessage
DEBUG 2008-01-30 11:53:14,688 [IMPrintComponent.2]
org.mule.providers.jms.JmsMessageDispatcher: Sending message of type
ActiveMQObjectMessage
DEBUG 2008-01-30 11:53:14,735 [IMPrintComponent.2]
org.mule.providers.jms.activemq.ActiveMqJmsConnector: Returning dispatcher
for endpoint: jms://Errors = JmsMessageDispatcher{this=1ca9f78,
endpoint=jms://Errors}
DEBUG 2008-01-30 11:53:14,735 [IMPrintComponent.2]
org.mule.impl.DefaultComponentExceptionStrategy: routed Exception message
via MuleEndpoint{endpointUri=jms://Errors,
connector=ActiveMqJmsConnector{this=82b865, started=true, initialised=true,
name='jmsConnector', disposed=false,
numberOfConcurrentTransactedReceivers=4,
createMultipleTransactedReceivers=true, connected=true,
supportedProtocols=[jms], serviceOverrides=null},
transformer=ExceptionMessageToPrintJobErrorTransformer{this=11c7f8d,
name='ExceptionMessageToPrintJobError', ignoreBadInput=false,
returnClass=class java.lang.Object, sourceTypes=[]},
name='endpoint.jms.Errors', type='sender', properties={},
transactionConfig=Transaction{factory=null, action=NONE, timeout=30000},
filter=null, deleteUnacceptedMessages=false, initialised=true,
securityFilter=null, synchronous=null, initialState=started,
createConnector=0, remoteSync=false, remoteSyncTimeout=10000,
endpointEncoding=null}
DEBUG 2008-01-30 11:53:14,891 [jdbcBETAConnector.receiver.1]
org.mule.providers.jdbc.JdbcConnector: Retrieving new connection from data
source



Andrew Perepelytsya

Jan 30, 2008, 8:29:26 AM
to us...@mule.codehaus.org
The only error I see here is the NPE in DefaultMuleProxy, which is odd. I bet you may have had persistent VM queues from the old config. Could you purge them and retry? We can look into analyzing this NPE then.

Andrew

Cuball

Jan 30, 2008, 9:08:09 AM
to us...@mule.codehaus.org

Andrew Perepelytsya wrote:
>
> I bet you may have had persistent VM queues from the old config. Could you
> purge them and re-try, we can look into analyzing this NPE then.
>
> Andrew
>

I have indeed set persistent="true" in the queue profile for my Mule
environment. I didn't know that messages could be loaded from another
config file. How do I remove the messages from the VM queue?

I looked at /bin/.mule/queuestore but it's empty. I guess that's not where
they would be loaded from, since I installed Mule 1.4.4 in another directory
anyway.
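
For reference, one way to clear persisted events by hand could look like the following. It is a rough Java sketch, assuming the persistent queue keeps its events as ordinary files under a .mule/queuestore directory relative to wherever Mule was started. That location is an assumption based on the path mentioned above, not something confirmed in this thread. The class QueueStorePurge and its purge method are hypothetical names. The sketch empties the directory but keeps the directory itself, and should only be run while Mule is stopped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class QueueStorePurge {

    // Deletes everything below storeDir but keeps storeDir itself.
    // Returns the number of entries (files and directories) removed.
    public static int purge(Path storeDir) {
        if (!Files.isDirectory(storeDir)) {
            return 0; // nothing persisted at this location
        }
        try (Stream<Path> walk = Files.walk(storeDir)) {
            List<Path> entries = walk
                    .filter(p -> !p.equals(storeDir))
                    .sorted(Comparator.reverseOrder()) // children before their parents
                    .collect(Collectors.toList());
            for (Path entry : entries) {
                Files.delete(entry);
            }
            return entries.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed default location; pass the real directory as the first argument.
        Path storeDir = Paths.get(args.length > 0 ? args[0] : ".mule/queuestore");
        System.out.println("Removed " + purge(storeDir) + " entries from "
                + storeDir.toAbsolutePath());
    }
}
```

Running it against the working directory of each installation (old and new) would show whether any stale events were left behind. A count of 0 means nothing was found at that location.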

