Receiving messages using Camel Integration and threading behavior


galen...@gmail.com

Nov 21, 2013, 2:18:32 PM
to camunda-...@googlegroups.com
Hi,

I'm running into a few issues with camel integration:

1) When receiving messages back from camel via a receive task, the subsequent tasks in the flow execute on a particular thread, even if those subsequent tasks are async=false, exclusive=true (for example the NOOP tasks in my example). Why are separate threads being used here? I mean shouldn't exclusive=true, async=false mean that they can't be executed at the same time? You can see them interleaved in the log below, and on separate threads.

2) This (I think) leads to Optimistic locking exceptions at the joining parallel gateway.

3) No jobs get put into the ACT_RU_JOB table for retrying later upon the locking exception.

4) Due to #3, I can't attempt the "sacrificial" locking 'null' task approach as suggested here:
https://groups.google.com/forum/#!msg/camunda-bpm-users/j-kMGHZcXjw/2C1t6Zyagr8J
How can I force the process back into a single-threaded/synchronous mode before the parallel join?

5) After both receive tasks receive messages and move on in the flow, I would think camel's role would be completely over. However, if a locking exception occurs at the end, then I get the "org.apache.camel.processor.DefaultErrorHandler - Failed delivery for ..." error (see log below).
Why is camel still involved at this point? Is it because of the way Camunda's camel integration is implemented? Perhaps the "InOut" nature is expecting some sort of hand-shake at the receive task that it never receives?


Here is my process:
http://www.camunda.org/share/#/process/14ac4245-8959-4618-b16b-d663983c9cba


NOTE: if I put a sleep in one of the camel routes, then I avoid the locking exception, since the tokens arrive at different times. However, this is just a "band-aid" solution. In more complex processes, the tokens could still potentially arrive at the gateway at the same time.


Please forgive me if I'm beating a dead horse here (I know similar things have been discussed on other posts), but I still feel like this is somewhat unresolved. I do have a feeling that once these two tickets are implemented, all of these problems will be gone :)

https://app.camunda.com/jira/browse/CAM-1531
https://app.camunda.com/jira/browse/CAM-1532

Thanks,
Galen


+++++++++++++++++++++++++++++++++++++++++++++++
LOG:

11:07:40,237 [http-8080-1] INFO controller.ProcessController - **************** STARTED process 'cam_camel_exception' ID = 8301
11:07:40,237 [http-8080-1] INFO service.camunda.CamundaExecutionService - Waiting for Camunda process instance '8301' to finish...
11:07:40,254 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
Thread[pool-1-thread-1,5,main]2: FORWARDING TO runStuff2
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: SLEEPING FOR 13497 ms...
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: DONE WITH SLEEP
Thread[pool-1-thread-1,5,main]1: FORWARDING TO runStuff1
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: SLEEPING FOR 12659 ms...
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: DONE WITH SLEEP
msg 2 received.
11:07:40,362 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - SleepTask xtor...
11:07:40,363 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.CamTask - execute()...
11:07:40,364 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - in impl...
11:07:40,364 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #0)
11:07:40,365 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #1)
11:07:40,367 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #2)
11:07:40,368 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #3)
msg 1 received.
11:07:40,368 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - SleepTask xtor...
11:07:40,368 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.CamTask - execute()...
11:07:40,369 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - in impl...
11:07:40,369 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #0)
11:07:40,369 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #4)
11:07:40,370 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #1)
11:07:40,370 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #5)
11:07:40,371 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #2)
11:07:40,371 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #6)
11:07:40,372 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #3)
11:07:40,373 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #7)
11:07:40,373 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #4)
11:07:40,374 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #8)
11:07:40,374 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #5)
11:07:40,375 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #9)
11:07:40,376 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #6)
11:07:40,377 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #7)
11:07:40,378 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #8)
11:07:40,379 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.camunda.SleepTask - sleep for 1 milliseconds... (iter #9)
Nov 21, 2013 11:07:40 AM org.camunda.bpm.engine.impl.interceptor.CommandContext close
SEVERE: Error while closing command context
org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[8301] was updated by another transaction concurrently
at org.camunda.bpm.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:704)
at org.camunda.bpm.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:500)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.close(CommandContext.java:154)
at org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:49)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor$1.doInTransaction(SpringTransactionInterceptor.java:42)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:40)
at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:32)
at org.camunda.bpm.engine.impl.RuntimeServiceImpl.messageEventReceived(RuntimeServiceImpl.java:266)
at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:100)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:291)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:200)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:147)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
11:07:40,432 [Camel (camelContext) thread #5 - seda://runStuff1] ERROR org.apache.camel.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-ghollins-09204131306- 54065-1385060838967-0-4 on ExchangeId: ID-ghollins-09204131306- 54065-1385060838967-0-6). Exhausted after delivery attempt: 1 caught: org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[8301] was updated by another transaction concurrently

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route2 ] [route2 ] [seda://runStuff1 ] [ 112]
[route1 ] [process1 ] [ routes.camunda.CamundaCamelRoutes$1@270ad7c0 ] [ 0]
[route1 ] [to1 ] [seda:runStuff1?waitForTaskToComplete=Never ] [ 0]
[route2 ] [process2 ] [ routes.camunda.CamundaCamelRoutes$2@5e1387c6 ] [ 11]
[route2 ] [to3 ] [camunda-bpm:message?messageName=camel.done1 ] [ 100]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-ghollins-09204131306- 54065-1385060838967-0-6
ExchangePattern InOut
Headers {breadcrumbId=ID-ghollins-09204131306- 54065-1385060838967-0-4, CamelRedelivered=false, CamelRedeliveryCounter=0}
BodyType java.util.HashMap
Body {}
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[8301] was updated by another transaction concurrently
at org.camunda.bpm.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:704)
at org.camunda.bpm.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:500)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.close(CommandContext.java:154)
at org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:49)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor$1.doInTransaction(SpringTransactionInterceptor.java:42)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:40)
at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:32)
at org.camunda.bpm.engine.impl.RuntimeServiceImpl.messageEventReceived(RuntimeServiceImpl.java:266)
at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:100)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:291)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:200)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:147)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
11:07:40,434 [Camel (camelContext) thread #5 - seda://runStuff1] WARN org.apache.camel.component.seda.SedaConsumer - Error processing exchange. Exchange[Message: {}]. Caused by: [org.camunda.bpm.engine.OptimisticLockingException - ExecutionEntity[8301] was updated by another transaction concurrently]
org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[8301] was updated by another transaction concurrently
at org.camunda.bpm.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:704)
at org.camunda.bpm.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:500)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:211)
at org.camunda.bpm.engine.impl.interceptor.CommandContext.close(CommandContext.java:154)
at org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:49)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor$1.doInTransaction(SpringTransactionInterceptor.java:42)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:40)
at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:32)
at org.camunda.bpm.engine.impl.RuntimeServiceImpl.messageEventReceived(RuntimeServiceImpl.java:266)
at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:100)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:291)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:200)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:147)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
11:07:42,262 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
11:07:44,271 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
11:07:46,279 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
11:07:48,287 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
11:07:50,294 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
11:07:52,301 [http-8080-1] INFO service.camunda.CamundaExecutionService - PROCESS[8301] (suspended=false, ended=false)
...
... THESE LOG MESSAGES CONTINUE INDEFINITELY...
...

galen...@gmail.com

Nov 21, 2013, 2:53:54 PM
to camunda-...@googlegroups.com, galen...@gmail.com
By the way, I acknowledge that you can successfully accomplish parallelism by not using camel, and by using async=true, exclusive=false service tasks.

For example:
http://www.camunda.org/share/#/process/e4b2c8e6-68fc-45bb-8123-90e57cca5da7

This forum post is just my questions/issues with camel usage inside parallel gateways.

Thanks,
Galen

Bernd Rücker (camunda)

Nov 22, 2013, 2:36:26 AM
to camunda-...@googlegroups.com

Hi Galen.

 

Not sure where exactly you are, but maybe it makes sense to organize a workshop at your site where we cover all the questions you have? That would maybe speed things up. The questions you ask below are covered perfectly in our advanced training. Anyway, see answers inline.

 

Cheers

Bernd

 

-----Original Message-----
From: camunda-...@googlegroups.com [mailto:camunda-...@googlegroups.com] On Behalf Of galen...@gmail.com
Sent: Thursday, November 21, 2013 20:19
To: camunda-...@googlegroups.com
Subject: [camunda-bpm-users] Receiving messages using Camel Integration and threading behavior

 

Hi,

 

   I'm running into a few issues with camel integration:

 

1)  When receiving messages back from camel via a receive task, the subsequent tasks in the flow execute on a particular thread, even if those subsequent tasks are async=false, exclusive=true (for example the NOOP tasks in my example).  Why are separate threads being used here?  I mean shouldn't exclusive=true, async=false mean that they can't be executed at the same time?  You can see them interleaved in the log below, and on separate threads.

 

If async=false, the exclusive setting does not have any effect, as it is only taken into account for async continuations. See http://docs.camunda.org/latest/guides/user-guide/#process-engine-transactions-in-processes for some background on how we do threading.

 

2)  This (I think) leads to Optimistic locking exceptions at the joining parallel gateway.

 

3)  No jobs get put into the ACT_RU_JOB table for retrying later upon the locking exception.

 

Exactly. Without async we use the client thread directly to advance the process flow, so no job is created. In this case the client thread is the Camel thread that triggered the message event.
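
To make the "client thread" point concrete, here is a minimal plain-Java sketch. It is only a toy model and does not use the Camunda or Camel APIs: the class name, the method names, and the thread names ("camel-thread-5", "job-executor-1") are all made up for illustration. It shows that a synchronous continuation runs on whatever thread delivered the message, while an asynchronous one is handed to a separate pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy model (not the Camunda API): which thread runs the steps after a wait state. */
final class ClientThreadSketch {

    /** Hypothetical stand-in for the engine's job executor pool. */
    private static final ExecutorService JOB_EXECUTOR =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "job-executor-1");
                t.setDaemon(true); // do not keep the JVM alive for this demo pool
                return t;
            });

    /** The work that follows the receive task; returns the name of the thread that ran it. */
    static String continueProcess() {
        return Thread.currentThread().getName();
    }

    /** Models async=false: the caller's own thread keeps going. */
    static String signalSync() {
        return continueProcess();
    }

    /** Models async=true: the continuation is handed off as a job and the caller returns. */
    static Future<String> signalAsync() {
        Callable<String> job = ClientThreadSketch::continueProcess;
        return JOB_EXECUTOR.submit(job);
    }

    public static void main(String[] args) throws Exception {
        // A thread playing the role of the Camel consumer that delivers the message.
        Thread camel = new Thread(() -> {
            System.out.println("sync ran on:  " + signalSync());
            try {
                System.out.println("async ran on: " + signalAsync().get());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }, "camel-thread-5");
        camel.start();
        camel.join();
        JOB_EXECUTOR.shutdown();
    }
}
```

In this model the synchronous call reports the caller's thread name and the asynchronous call reports the pool thread, which mirrors why the NOOP tasks in the log show up on the two seda consumer threads.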

 

4)  Due to #3, I can't attempt the "sacrificial" locking 'null' task approach as suggested here:

https://groups.google.com/forum/#!msg/camunda-bpm-users/j-kMGHZcXjw/2C1t6Zyagr8J

How can I force the process back into a single-threaded/synchronous mode before the parallel join?

 

After the joining parallel gateway you have only one outgoing sequence flow, so only one thread can continue from there. That is assured. If the tokens arrive there one after the other, the thread that arrives second is the one that continues. The OptimisticLockingException is only a “problem” if both arrive at exactly the same time.
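
As a rough illustration of why simultaneous arrival at the join fails, here is a plain-Java sketch of revision-based optimistic locking. It is not Camunda's implementation: the class and method names are hypothetical, and an in-memory counter stands in for the revision stored with the parent execution in the database.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of revision-based optimistic locking; not Camunda's implementation. */
final class RevisionSketch {

    /** Stands in for the revision stored with the parent execution (assumed layout). */
    private final AtomicInteger revision = new AtomicInteger(1);

    /** A transaction remembers the revision it saw when it read the row. */
    int read() {
        return revision.get();
    }

    /**
     * The flush succeeds only if nobody else committed since our read,
     * similar in spirit to an UPDATE that checks the old revision in its WHERE clause.
     */
    boolean flush(int revisionSeenAtRead) {
        return revision.compareAndSet(revisionSeenAtRead, revisionSeenAtRead + 1);
    }

    public static void main(String[] args) {
        RevisionSketch parentExecution = new RevisionSketch();
        int seenByBranch1 = parentExecution.read();
        int seenByBranch2 = parentExecution.read(); // both branches read before either commits
        System.out.println("branch 2 flush ok: " + parentExecution.flush(seenByBranch2)); // true
        System.out.println("branch 1 flush ok: " + parentExecution.flush(seenByBranch1)); // false
    }
}
```

The branch that flushes second fails because the revision it read is stale. In the engine that failure surfaces as the OptimisticLockingException seen in the log. If the branches arrive one after the other, the later one reads the already-updated revision and its flush goes through.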

 

5)  After both receive tasks receive messages and move on in the flow, I would think camel's role would be completely over.  However, if a locking exception occurs at the end, then I get the "org.apache.camel.processor.DefaultErrorHandler  - Failed delivery for ..." error (see log below).

Why is camel still involved at this point?  Is it because of the way Camunda's camel integration is implemented?  Perhaps the "InOut" nature is expecting some sort of hand-shake at the receive task that it never receives?

 

See above: we use the client thread from Camel. If you want to avoid this (so that Camel's role is over once the message is delivered), you have to use async=true.

 

Here is my process:

http://www.camunda.org/share/#/process/14ac4245-8959-4618-b16b-d663983c9cba

 

 

NOTE:  if I put a sleep in one of the camel routes, then I avoid the locking exception, since the tokens arrive at different times.  However, this is just a "band-aid" solution.  In more complex processes, the tokens could still potentially arrive at the gateway at the same time.

 

If you have the right asyncs configured, this boils down to a window of very few milliseconds in which both answers would have to arrive. In an integration use case this is really unlikely, because remote communication always consumes some time. So in reality I haven’t yet seen this to be a big deal. And you could even add a redelivery attempt in Camel in case you experience an OptimisticLockingException; then it gets retried and everything is fine.
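
The retry idea can be sketched in plain Java as follows. This is only an illustration under assumptions: `RetrySketch`, `withRetry`, and the local `OptimisticLockingException` class are hypothetical stand-ins (the real exception is `org.camunda.bpm.engine.OptimisticLockingException`), and in an actual route you would more likely configure Camel's own redelivery handling than write a loop like this.

```java
import java.util.concurrent.Callable;

/** Plain-Java sketch of "retry when the optimistic lock fails"; names are hypothetical. */
final class RetrySketch {

    /** Local stand-in for org.camunda.bpm.engine.OptimisticLockingException. */
    static final class OptimisticLockingException extends RuntimeException {
        OptimisticLockingException(String message) {
            super(message);
        }
    }

    /** Runs the action, retrying up to maxAttempts times on lock conflicts only. */
    static <T> T withRetry(int maxAttempts, long delayMillis, Callable<T> action) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        OptimisticLockingException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (OptimisticLockingException e) {
                last = e; // another transaction won the race; try again
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last; // all attempts hit a conflict
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withRetry(3, 10, () -> {
            if (++calls[0] == 1) { // simulate one conflict, then success
                throw new OptimisticLockingException("updated by another transaction concurrently");
            }
            return "message delivered on attempt " + calls[0];
        });
        System.out.println(result); // message delivered on attempt 2
    }
}
```

Only the lock conflict is retried; any other exception propagates immediately, so real delivery errors are not hidden by the retry.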

--

You received this message because you are subscribed to the Google Groups "camunda BPM users" group.

To unsubscribe from this group and stop receiving emails from it, send an email to camunda-bpm-us...@googlegroups.com.

To post to this group, send email to camunda-...@googlegroups.com.

To view this discussion on the web visit https://groups.google.com/d/msgid/camunda-bpm-users/a610f4d6-51fe-41bf-b10a-2b4e9487ec21%40googlegroups.com.

For more options, visit https://groups.google.com/groups/opt_out.

galen...@gmail.com

Nov 22, 2013, 12:16:31 PM
to camunda-...@googlegroups.com
Hi Bernd,

Thanks for your answers. I think I'm understanding the behavior pretty well now. It makes sense after reading the docs that camunda is "borrowing the client thread", which in this case is the Camel thread, since async=false.

By the way your documentation is really good/helpful.

We may want to take advantage of an advanced training course at some point down the road, but I'm not sure yet. Out of curiosity, do you do the advanced training in the US (we are in California)?

Thanks,
Galen

Bernd Rücker (camunda)

Nov 23, 2013, 3:05:31 AM
to camunda-...@googlegroups.com
Hi Galen.

We have no classroom training in the US (yet) - but we can do onsite
trainings/workshops. Or we could arrange some WebEx/remote consulting
to discuss the most urgent questions. If you are interested, it is best
to send me a private mail.

Cheers
Bernd
