I have two service tasks which I would like to execute in parallel (see http://camunda.org/share/#/process/7d501925-631f-4452-9bf5-7275d8926a52). By default, the job engine executes the two service tasks sequentially.
How can the job engine be configured to execute these in parallel? What are the possible downsides?
I found https://app.camunda.com/confluence/display/foxUserGuide/Advanced+Job+Executor, but setting the property maxJobsPerAcquisition had no effect.
Regards,
Tobias
Thanks for posting the links. Setting camunda:exclusive="false" leads to the behaviour described in the documentation.
The use case is theoretical.
Regards
Tobias
I'm having some issues getting serviceTasks to run in parallel. Whenever I set
<code>
camunda:async="true"
</code>
the corresponding serviceTask does not run. When I set one to "true" and one to "false", only the "false" one runs.
For example, in the following process, only ServiceTask_1 runs:
<code>
<bpmn2:process id="camunda_parallel_sleeps" isExecutable="true">
  <bpmn2:startEvent id="StartEvent_1">
    <bpmn2:outgoing>SequenceFlow_3</bpmn2:outgoing>
  </bpmn2:startEvent>
  <bpmn2:sequenceFlow id="SequenceFlow_3" name="" sourceRef="StartEvent_1" targetRef="ParallelGateway_1"/>
  <bpmn2:parallelGateway id="ParallelGateway_1">
    <bpmn2:incoming>SequenceFlow_3</bpmn2:incoming>
    <bpmn2:outgoing>SequenceFlow_4</bpmn2:outgoing>
    <bpmn2:outgoing>SequenceFlow_5</bpmn2:outgoing>
  </bpmn2:parallelGateway>
  <bpmn2:sequenceFlow id="SequenceFlow_4" name="" sourceRef="ParallelGateway_1" targetRef="ServiceTask_1"/>
  <bpmn2:serviceTask id="ServiceTask_2" camunda:class="foo.SleepTask" camunda:exclusive="false" camunda:async="true">
    <bpmn2:incoming>SequenceFlow_5</bpmn2:incoming>
    <bpmn2:outgoing>SequenceFlow_7</bpmn2:outgoing>
  </bpmn2:serviceTask>
  <bpmn2:sequenceFlow id="SequenceFlow_5" name="" sourceRef="ParallelGateway_1" targetRef="ServiceTask_2"/>
  <bpmn2:endEvent id="EndEvent_1">
    <bpmn2:incoming>SequenceFlow_1</bpmn2:incoming>
  </bpmn2:endEvent>
  <bpmn2:parallelGateway id="ParallelGateway_2">
    <bpmn2:incoming>SequenceFlow_6</bpmn2:incoming>
    <bpmn2:incoming>SequenceFlow_7</bpmn2:incoming>
    <bpmn2:outgoing>SequenceFlow_1</bpmn2:outgoing>
  </bpmn2:parallelGateway>
  <bpmn2:sequenceFlow id="SequenceFlow_7" name="" sourceRef="ServiceTask_2" targetRef="ParallelGateway_2"/>
  <bpmn2:serviceTask id="ServiceTask_1" camunda:class="foo.SleepTask" camunda:exclusive="false" camunda:async="false">
    <bpmn2:incoming>SequenceFlow_4</bpmn2:incoming>
    <bpmn2:outgoing>SequenceFlow_6</bpmn2:outgoing>
  </bpmn2:serviceTask>
  <bpmn2:sequenceFlow id="SequenceFlow_6" name="" sourceRef="ServiceTask_1" targetRef="ParallelGateway_2"/>
  <bpmn2:sequenceFlow id="SequenceFlow_1" name="" sourceRef="ParallelGateway_2" targetRef="EndEvent_1"/>
</bpmn2:process>
</code>
Is what I'm trying to do technically possible?
Thanks,
Galen
Thanks for the info. So are you saying that if I use a parallel gateway, there is only ever going to be one token of execution? In other words, does a whole branch of the gateway have to reach the converging gateway before another token is spawned to follow the other path?
I'm still unclear on why adding async=true would cause the serviceTask not to be executed. I guess what you are saying is that the splitting gateway only sends the execution along on the "main" thread, and if tasks are on the background thread, this "main" thread will never reach them.
++++++++++++++++++++++++++++++++++++++++++
Among all of the engines I've tried, implementing true parallel execution is probably one of the trickiest concepts. Each engine has a somewhat "hacky" way of allowing for it.
In jBPM it's accomplished by using asynchronous WorkItemHandlers, and a callback of "completeWorkItem".
https://community.jboss.org/thread/165545?_sscc=t
In Activiti it's accomplished via async camel seda queues http://bpmn20inaction.blogspot.in/2012/12/implement-parallel-execution-in-activiti.html
In Camunda, I'm still trying to figure out the best way to do this. So far, the best way I've come up with is to have the Java serviceTask spawn another thread, then return control to the process engine immediately. Immediately following the serviceTask in the flow is a message receive event, which picks up a message sent out at the end of the thread's execution. This approach works, but requires extra nodes in the diagram, and extra implementation to manage the threads and the message sending at the end.
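The spawn-a-thread-then-signal approach described above can be sketched in plain Java, abstracted away from the Camunda API: the "service task" kicks off background work and returns immediately, and completion is signalled later via a callback that stands in for the message send. All names here (FireAndSignal, startWork, runTwoBranches) are illustrative, not engine API.
<code>
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FireAndSignal {

    // Stands in for the service task: kick off the work and return control
    // to the caller (the process engine) right away.
    static void startWork(ExecutorService pool, Runnable work, Runnable sendMessage) {
        pool.submit(() -> {
            work.run();         // the long-running job
            sendMessage.run();  // "send the message" once the work is done
        });
    }

    // Runs two branches concurrently and waits for both completion signals,
    // like the two message receive events in the flow. Returns the number
    // of branches that signalled completion.
    static int runTwoBranches() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch join = new CountDownLatch(2); // the "joining gateway"
        AtomicInteger done = new AtomicInteger();

        startWork(pool, () -> sleep(100), () -> { done.incrementAndGet(); join.countDown(); });
        startWork(pool, () -> sleep(20),  () -> { done.incrementAndGet(); join.countDown(); });

        join.await(5, TimeUnit.SECONDS); // wait until both "messages" arrived
        pool.shutdown();
        return done.get();
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed branches: " + runTwoBranches());
    }
}
```
</code>
In the real setup, the `sendMessage` callback would be a message correlation back into the engine, and the latch corresponds to the joining gateway after the two receive events.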
+++++++++++++++++++++++++++++++++++++++++++++
As a side note, I've seen some arguments (mostly by engine implementors) that true parallelization is really overkill, or not needed. But it really depends in practice. If the tasks you are trying to run are mostly single-threaded apps that are CPU intensive, and you have a lot of cores on your machine, then you might as well be taking care of things in parallel. If the tasks are just proxies to executions that run on physically different hosts, then you also might as well be running in parallel. I understand that running on physically different hosts usually is a result of a web service call or a REST call, but in general it would be nice to just have the abstraction around parallel tasks in a single node, without having to think about it more. I think when most users see the parallel gateway, they initially think that it will be true concurrent execution.
By the way, I'm totally fine with the default behavior being serial execution -- I think that's probably the smartest default. I just think that the developer should have the flexibility to choose multi-threaded if he/she desires.
From the posts above it sounds like multi-threading can be achieved if not using gates. I would love to see my example (using a parallel gate) massaged into a working multi-threaded example if at all possible.
Thanks,
Galen
> In Activiti it's accomplished via async camel seda queues http://bpmn20inaction.blogspot.in/2012/12/implement-parallel-execution-in-activiti.html
Why not using the camunda camel component: https://github.com/camunda/camunda-bpm-camel ?
CASE 1 (async=false, exclusive=true): both serviceTasks run in a serial fashion (as expected)
CASE 2 (async=false, exclusive=false): both serviceTasks run in a serial fashion (same behavior as CASE 1; shouldn't this execute on concurrent threads?)
CASE 3 (async=true, exclusive=true): neither serviceTask ever gets executed; the process hangs forever (no exceptions)
CASE 4 (async=true, exclusive=false): neither serviceTask ever gets executed; the process hangs forever (no exceptions, same behavior as CASE 3)
So this is what I was concerned about in my above post -- the fact that serviceTasks aren't executing. I understand that there will be synchronization issues at the joining gateway if I go with non-defaults (i.e. async=true), but I'm simply not even able to get the tasks executed in the first place, so it never even gets to the joining gateway. Forgive me if I'm still not understanding this.
++++++++++++++++++++++++++++++++++++++
I totally agree with you about the dangerous aspects of spawning a thread. I'm just doing this for now as a proof-of-concept. In the future, something like a JMS queue would be used. For now I'm just evaluating various engine implementations.
++++++++++++++++++++++++++++++++++++++
In terms of one process vs. multiple processes, I'm looking to run a single process that manages multiple concurrent tasks by using a parallel gateway. It just makes for a simpler model that way. I understand that you could have several different process definitions instantiated via messages/signals, and that they can run in parallel fine, but then you would have to have sort of a "master process" "worker processes" model where there are joining message receives in the master. I would be interested in seeing some inter-process concurrency examples though. Maybe I'm thinking about this whole paradigm the wrong way :) In general, we are shooting to simplify the overall model, and hopefully keep things encapsulated in one process definition.
++++++++++++++++++++++++++++++++++++++
With regards to the camel component, thanks for bringing that up -- I will check that out further. I'm assuming the implementation of what I'm trying to do would be very similar to the Activiti post I mentioned above? In general though, I think that people who model BPMN and want to execute something concurrently don't want to think in terms of two nodes (a call and a receive, like the Activiti example). It would be cool if the engine did the camel stuff under the covers in a single node implementation. That is to say, it would take care of the seda async route, the execution, and the callback all in one node. It would sort of act like a User Task in that sense, where it would be a transaction boundary, and other stuff could execute.
Thanks,
Galen
Regarding cases 1 and 2: with async=false, both service tasks are executed sequentially in the current thread. That's why the exclusive attribute has no effect; it is only meaningful with async=true.
With async=true, the current thread reaches both service tasks and, instead of executing them, creates a job for each, i.e. persists it to the database. It then returns; the tasks are not executed immediately. That is now the responsibility of the job executor. You write that the tasks are not executed at all. This may mean that the job executor is not activated (see http://docs.camunda.org/latest/guides/user-guide/#process-engine-the-job-executor-job-executor-activation) or that there is some other issue in your setup. To help you with this, you could post some information about your setup, for example whether you are using the pre-built distribution from camunda.org, running inside a unit test, or using the embedded engine. You might also post your process engine configuration. In addition, you could take a look at the ACT_RU_JOB_ table in the database to verify that the jobs are actually created.
Best regards,
Thorben
Hi Rob,
yes, in your scenario that is the expected behavior. The cause is that we need to synchronize two transactions at the joining parallel gateway. If those transactions are executed concurrently, only one of them can complete successfully, since we use an optimistic locking scheme. The failing transaction then rolls back to the last persistent state, which in your case is Service Task A or B. If you had a sequence of multiple service tasks on each branch, with each of them having an asynchronous continuation configured, then only the last service task would be rolled back and the previous ones could be executed in a truly concurrent way.
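The failure mode described here can be sketched with a plain compare-and-swap on a revision counter, a simplified stand-in for the engine's REV_ column (ExecutionRow and tryUpdate are illustrative names, not Camunda API):
<code>
```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of optimistic locking: each writer must present the
// revision it originally read, and the update only succeeds if nobody else
// bumped the revision in the meantime (like
// "UPDATE ... SET REV_ = REV_ + 1 WHERE REV_ = <revisionRead>").
public class OptimisticLockDemo {

    static class ExecutionRow {
        final AtomicInteger revision = new AtomicInteger(1);

        boolean tryUpdate(int revisionRead) {
            return revision.compareAndSet(revisionRead, revisionRead + 1);
        }
    }

    public static void main(String[] args) {
        ExecutionRow row = new ExecutionRow();
        int rev = row.revision.get();         // both branches read REV_ = 1

        boolean branchA = row.tryUpdate(rev); // first join attempt commits
        boolean branchB = row.tryUpdate(rev); // second one fails: this is the
                                              // OptimisticLockingException case
        System.out.println("branchA=" + branchA + " branchB=" + branchB);
    }
}
```
</code>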
Maybe we should add the possibility to make the joining parallel gateway asynchronous? That way the optimistic locking exception would not roll back the service task but would only retry execution of the join?
Cheers,
Daniel Meyer
Hi Rob,
yes, thank you for that. This is currently a known issue:
https://app.camunda.com/jira/browse/CAM-428
Cheers,
Daniel Meyer
> Maybe we should add the possibility to make the joining parallel gateway asynchronous? That way the optimistic locking exception would not roll back the service task but would only retry execution of the join?
Yes, it would be awesome if you could implement this behavior; it seems like the correct thing to do in this case. But I'm not sure "asynchronous" is the best term to use on the gateway. It seems more like a "lock-wait-retry" flag (similar to the way MySQL retries for a period of time when encountering a lock before eventually giving up). It would be great if there were configurable "timeout" and "retryFrequency" values. Of course most users would just use the defaults.
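The lock-wait-retry idea could be sketched like this: instead of rolling the whole branch back on a conflict, only the join is retried until it succeeds or a timeout elapses. timeoutMillis and retryIntervalMillis mirror the proposed configurable values; all names here are hypothetical, not engine API.
<code>
```java
import java.util.function.BooleanSupplier;

public class RetryingJoin {

    // Retries `attempt` (e.g. committing the gateway join) until it returns
    // true or the timeout is exhausted; reports whether it ever succeeded.
    static boolean retryUntil(BooleanSupplier attempt, long timeoutMillis, long retryIntervalMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(retryIntervalMillis); // back off before the next try
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a join that conflicts twice before committing cleanly.
        int[] attempts = {0};
        boolean joined = retryUntil(() -> ++attempts[0] >= 3, 1000, 10);
        System.out.println("joined=" + joined + " after " + attempts[0] + " attempts");
    }
}
```
</code>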
Thanks,
Galen
Thanks for the debugging tips. When I run CASE 3, I do see two rows being created in the ACT_RU_JOB table. Here is what they look like (sorry for the formatting):
mysql> select * from ACT_RU_JOB;
+------+------+---------+----------------+-------------+------------+---------------+----------------------+----------+---------------------+----------------+----------+---------+--------------------+-------------------------+----------------+
| ID_ | REV_ | TYPE_ | LOCK_EXP_TIME_ | LOCK_OWNER_ | EXCLUSIVE_ | EXECUTION_ID_ | PROCESS_INSTANCE_ID_ | RETRIES_ | EXCEPTION_STACK_ID_ | EXCEPTION_MSG_ | DUEDATE_ | REPEAT_ | HANDLER_TYPE_ | HANDLER_CFG_ | DEPLOYMENT_ID_ |
+------+------+---------+----------------+-------------+------------+---------------+----------------------+----------+---------------------+----------------+----------+---------+--------------------+-------------------------+----------------+
| 4313 | 1 | message | NULL | NULL | 1 | 4311 | 4308 | 3 | NULL | NULL | NULL | NULL | async-continuation | transition-create-scope | 4301 |
| 4314 | 1    | message | NULL           | NULL        | 1          | 4312          | 4308                 | 3        | NULL                | NULL           | NULL     | NULL    | async-continuation | transition-create-scope | 4301           |
+------+------+---------+----------------+-------------+------------+---------------+----------------------+----------+---------------------+----------------+----------+---------+--------------------+-------------------------+----------------+
I then checked my configuration, and for some reason (maybe I copied and pasted from some example), I had:
<property name="jobExecutorActivate" value="false" />
So I changed this to "true", and restarted and now I'm seeing the expected behavior:
CASE 1 (async=false, exclusive=true): both serviceTasks run in a serial fashion (as expected)
CASE 2 (async=false, exclusive=false): both serviceTasks run in a serial fashion (as expected)
CASE 3 (async=true, exclusive=true): both serviceTasks run in a serial fashion (as expected)
CASE 4 (async=true, exclusive=false): both serviceTasks run concurrently, then I get an OptimisticLockingException
What is interesting, though, is that in CASE 4 it leaves one row in the ACT_RU_JOB table. Is this to be expected? Per other posts, I would have expected this one to be reprocessed.
Thanks,
Galen
> > In Activiti it's accomplished via async camel seda queues http://bpmn20inaction.blogspot.in/2012/12/implement-parallel-execution-in-activiti.html
> Why not using the camunda camel component: https://github.com/camunda/camunda-bpm-camel ?
I tried implementing an example using camunda-bpm-camel by:
1) Using a parallel gateway
2) 2 serviceTasks that call different camel routes
3) In the middle of the routes, some work is performed (i.e. Processor())
4) At the end of the routes, they send a message indicating the work is done.
5) 2 message receive events receive message from routes
6) Sleep tasks (java delegate) get executed after message receive
7) Finally, a joining parallel gateway
See diagram here:
http://i801.photobucket.com/albums/yy298/ghollins/ScreenShot2013-11-19at120340PM_zps8dbf68dd.png
The problem I'm seeing is that I get an OptimisticLockingException at the joining parallel gateway. I noticed that the receive tasks kick off the sleep tasks on separate threads, and this may be the root of my problem:
12:09:13,839 [Camel (camelContext) thread #5 - seda://runStuff1] INFO task.builtin.SleepTask - sleep for 13000 milliseconds...
12:09:15,057 [Camel (camelContext) thread #6 - seda://runStuff2] INFO task.builtin.SleepTask - sleep for 3000 milliseconds...
Then I get:
12:09:26,851 [Camel (camelContext) thread #5 - seda://runStuff1] ERROR org.apache.camel.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-ghollins on ExchangeId: ID-ghollins). Exhausted after delivery attempt: 1 caught: org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[1017] was updated by another transaction concurrently
It's odd, though, because they can't be reaching the gateway at the same time, since I've delayed one by 10 seconds (13000 ms vs. 3000 ms).
Any ideas?
Thanks,
Galen
> I briefly looked at the process - did you notice that you will never be able to complete the process because of a deadlock in the joining parallel gateway at the end? That is an error in the BPMN I guess (you could upload it to www.camunda.org/share/ then I could point to the problem directly).
I have shared my BPMN at:
http://www.camunda.org/share/#/process/05f8db4e-8113-49eb-8395-9b2990fa8d25
Here are my camel routes:
<code>
from("direct://syncService1")
    .onException(SampleException.class) // map exception to BPMN error
        .throwException(new BpmnError("camel.error"))
    .end()
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(Thread.currentThread() + "1: FORWARDING TO runStuff1");
        }
    })
    .to("seda:runStuff1?waitForTaskToComplete=Never");

from("seda:runStuff1")
    .onException(SampleException.class) // map exception to BPMN error
        .throwException(new BpmnError("camel.error"))
    .end()
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(Thread.currentThread() + "1: SLEEPING FOR 2 SECONDS...");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread() + "1: DONE WITH SLEEP");
        }
    })
    .to("camunda-bpm:message?messageName=camel.done1");

from("direct://syncService2")
    .onException(SampleException.class) // map exception to BPMN error
        .throwException(new BpmnError("camel.error"))
    .end()
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(Thread.currentThread() + "2: FORWARDING TO runStuff2");
        }
    })
    .to("seda:runStuff2?waitForTaskToComplete=Never");

from("seda:runStuff2")
    .onException(SampleException.class) // map exception to BPMN error
        .throwException(new BpmnError("camel.error"))
    .end()
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(Thread.currentThread() + "2: SLEEPING FOR 12 SECONDS...");
            Thread.sleep(12000);
            System.out.println(Thread.currentThread() + "2: DONE WITH SLEEP");
        }
    })
    .to("camunda-bpm:message?messageName=camel.done2");
</code>
+++++++++++++++++++++++++++++++++++
When I sleep in both camel routes for the exact same amount of time (i.e. 2 seconds in each route), I get an OptimisticLockingException at the joining gateway:
Thread[pool-1-thread-1,5,main]1: FORWARDING TO runStuff1
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: SLEEPING FOR 2 SECONDS...
Thread[pool-1-thread-1,5,main]2: FORWARDING TO runStuff2
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: SLEEPING FOR 2 SECONDS...
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: DONE WITH SLEEP
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: DONE WITH SLEEP
msg 1 received.
msg 2 received.
Nov 20, 2013 10:40:45 AM org.camunda.bpm.engine.impl.interceptor.CommandContext close
SEVERE: Error while closing command context
org.camunda.bpm.engine.OptimisticLockingException: ExecutionEntity[3501] was updated by another transaction concurrently
...
...
<<<MORE STACKTRACE -- TRUNCATED>>>
When I sleep for different amounts of time the messages are received, but the process never finishes (deadlock at the joining gateway?):
Thread[pool-1-thread-1,5,main]1: FORWARDING TO runStuff1
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: SLEEPING FOR 2 SECONDS...
Thread[pool-1-thread-1,5,main]2: FORWARDING TO runStuff2
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: SLEEPING FOR 12 SECONDS...
Thread[Camel (camelContext) thread #5 - seda://runStuff1,5,main]1: DONE WITH SLEEP
msg 1 received.
Thread[Camel (camelContext) thread #6 - seda://runStuff2,5,main]2: DONE WITH SLEEP
msg 2 received.
<<< PROCESS NEVER FINISHES>>>
+++++++++++++++++++++++++++++++++++++
NOTE: in this BPMN model there are no sleeps in Java; they are all in the Camel routes.
In the case where I stagger the arrivals at the gateway by sleeping for different amounts of time, I would expect the problem not to be one of concurrency. So I assume the deadlock is due to the fact that two different threads arrive at the gateway: each one doesn't know about the other, so each thinks that only one of the two tokens has reached the gateway.
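That intuition about the join can be modelled with a shared countdown: a correct join has to fire only on the last arrival, regardless of which thread delivers it or how far apart the arrivals are. This is a toy sketch with illustrative names, not the engine's actual implementation.
<code>
```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a parallel join: every arriving token decrements a shared
// counter, and only the token that brings it to zero may continue past the
// gateway. If each arriving thread only knew about its own token, the join
// would never fire -- which matches the hang described above.
public class ParallelJoin {

    private final AtomicInteger remaining;

    ParallelJoin(int incomingBranches) {
        this.remaining = new AtomicInteger(incomingBranches);
    }

    // Returns true only for the token that completes the join.
    boolean arrive() {
        return remaining.decrementAndGet() == 0;
    }

    public static void main(String[] args) {
        ParallelJoin join = new ParallelJoin(2);
        System.out.println("first token fires the join:  " + join.arrive());  // false
        System.out.println("second token fires the join: " + join.arrive());  // true
    }
}
```
</code>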
If you could please elaborate the problem on my shared diagram, that would be awesome. Also, proposing a solution to accomplish what I'm trying to do would be great.
Thanks!
Galen
Hi Bernd,
Thank you very much, that definitely helps. For some reason the link you posted doesn't display a diagram (I see no diagram on the screen); I've tried it in two different browsers with the same result. I think I can imagine what your solution looks like, though: you probably add two XOR gateways (similar to figure 2.30 in your Real-Life BPMN book) to merge the events. I will try this approach. It all makes sense to me now.
Thanks,
Galen