Suggestions on parallelized and distributed version of TASSEL


Faheem Abrar

Sep 12, 2017, 5:31:23 PM
to TASSEL - Trait Analysis by Association, Evolution and Linkage

Hello everyone,


I am trying to adapt TASSEL to a distributed architecture, so that it can process data and run plugins on multiple machines simultaneously. I have run into some issues along the way, and I am sharing them here to get feedback and suggestions from the maintainers of the package as well as other enthusiasts.


As far as I’ve seen, TASSEL uses Java parallel streams and Java threads to achieve single-machine parallelism. That might be enough for someone with a very powerful machine (say, 100s of cores and gigabytes of RAM), but the application is not scalable: it is not built to work on a cluster of machines. I’m trying to modify TASSEL so that it works with existing big data frameworks (such as Spark and Hadoop), but I’ve encountered some problems.
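To illustrate what single-machine parallelism with a parallel stream looks like, here is a minimal sketch that is not taken from TASSEL: the class and method names are hypothetical, and the per-marker minor allele frequencies are assumed to be already computed. All of the work is spread over the cores of one JVM, which is the scalability limit described above.

```java
import java.util.Arrays;

// Hypothetical example, not TASSEL code: count markers whose minor allele
// frequency is at least minFreq. parallel() splits the array across the
// fork/join common pool, i.e. across the cores of this one JVM only.
final class SingleMachineParallelism {

    private SingleMachineParallelism() {}

    static long countCommonMarkers(double[] mafs, double minFreq) {
        return Arrays.stream(mafs)
                .parallel()
                .filter(maf -> maf >= minFreq)
                .count();
    }

    // Rough upper bound on how many of those chunks can run at the same time.
    static int coresAvailable() {
        return Runtime.getRuntime().availableProcessors();
    }
}
```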


  • In the TASSEL pipeline, the required plugins are parsed and created from the command-line arguments. Those plugins are then submitted to a thread pool controlled by a Java ExecutorService. These threads do a nice job of parallelizing tasks across multiple cores. However, I saw an opportunity to parallelize data processing inside each plugin. Take, for instance, the Weighted MLM plugin. It has multiple iterators, over which the same set of operations is run repeatedly. I saw this as a good opportunity to split the dataset and run those distinct tasks on multiple machines. To achieve that, I decided to use Apache Spark. I created a Spark context object and built an RDD from the iterators. When I tried to run map operations with the distinct tasks I had identified, I encountered a java.util.ConcurrentModificationException.
  • The problem lies in Spark’s attempt at serialization. The block of code I tried to parallelize with a map operation holds references to objects that are in use by running threads. Spark has to traverse those objects in order to serialize them while they may still be changing, hence the concurrent modification exception.
  • So it seems the problem lies in TASSEL’s thread-based architecture. The only solution I could think of was to deconstruct the architecture so that the plugins run sequentially instead of in separate threads. However, the entire system seems deeply embedded in this threading model. All plugins implement the Plugin interface, which in turn extends the Runnable interface, and the output data of plugins is returned through thread listeners. It seems to me that converting the plugins from a thread-based system to a sequential one might be a very complicated affair.
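The exception itself comes from the fail-fast iterators of collections such as java.util.ArrayList. The sketch below (hypothetical class name, deliberately single-threaded so the result is deterministic) shows the general mechanism and one common workaround, iterating over a snapshot copy. It is an illustration only, not a diagnosis of the Spark failure; across threads, the JDK does not guarantee that the exception is thrown at all.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Single-threaded illustration of fail-fast iteration: a collection is
// structurally modified while something (here a for-each loop; in the
// multi-threaded case, e.g. a serializer on another thread) iterates over it.
final class FailFastDemo {

    private FailFastDemo() {}

    /** Returns true if appending to the list during iteration throws CME. */
    static boolean modifyWhileIterating() {
        List<String> markers = new ArrayList<>(List.of("m1", "m2", "m3"));
        try {
            for (String m : markers) {
                if (m.equals("m1")) {
                    markers.add("m4");   // structural change mid-iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Iterating over an immutable snapshot lets the original list change safely. */
    static int modifyWhileIteratingSnapshot() {
        List<String> markers = new ArrayList<>(List.of("m1", "m2", "m3"));
        for (String m : List.copyOf(markers)) {
            if (m.equals("m1")) {
                markers.add("m4");
            }
        }
        return markers.size();
    }
}
```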


I should mention that before posting here, I looked through the research literature for attempts at a distributed implementation of TASSEL, without much success. So I am posting here to get feedback from the maintainers of this package about my proposed solution and the problems I am encountering. I would much appreciate it if anyone could share ideas and suggestions on how to parallelize TASSEL plugins across multiple machines without running into these problems. I would also welcome any comments on shortcomings in my approach.

Peter Bradbury

Sep 13, 2017, 12:16:54 PM
to TASSEL - Trait Analysis by Association, Evolution and Linkage
Faheem,

Many of the TASSEL methods are not good candidates for running in a distributed environment. For example, PCA relies on a singular value decomposition of the entire dataset, and the SVD is a single method call to an external library. As you suggest, MLM would be a good candidate for parallelization. In fact, the method is not even multi-threaded yet; that is on our to-do list, though not near the top. The best approach to parallelizing TASSEL methods would be to split the input data before providing it to TASSEL and to run a separate instance of TASSEL on each node. That way, the data provided to each instance of TASSEL would be independent.
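A minimal sketch of the "split the input first" approach, assuming the split is done by marker and that every chunk is analyzed with the same phenotype, population-structure and kinship inputs. InputSplitter and split are hypothetical names; writing each chunk back out in a format TASSEL reads (for example HapMap) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split an ordered list (e.g. marker IDs) into at most
// `parts` contiguous, non-overlapping, non-empty chunks. Each chunk could then
// be written to its own input file and handed to a separate TASSEL instance.
final class InputSplitter {

    private InputSplitter() {}

    static <T> List<List<T>> split(List<T> items, int parts) {
        if (parts < 1) {
            throw new IllegalArgumentException("parts must be >= 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        int n = items.size();
        int base = n / parts;     // minimum chunk size
        int extra = n % parts;    // the first `extra` chunks get one more item
        int start = 0;
        for (int i = 0; i < parts && start < n; i++) {
            int size = base + (i < extra ? 1 : 0);
            // Copy so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(start, start + size)));
            start += size;
        }
        return chunks;
    }
}
```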

It would be the responsibility of the software managing the parallelization (Spark, for instance) to accumulate the results from the different nodes. We have thought about using Spark with TASSEL in that way in our lab and do not see any reason why it should not work, but have not had any real need to do it.

Without knowing the details of how you are using Spark with TASSEL, it is impossible to say where the problem is coming from. It could be that you are trying to write the results to a common data structure instead of having each node record its own data and then merging the results when they come back to the driver. Getting the parts properly isolated with Spark can be a bit tricky.
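The isolation pattern described here can be sketched locally with a plain ExecutorService standing in for Spark executors. This is an analogy, not Spark code; the class name and the placeholder "result" strings are hypothetical. Each task fills a list it owns and returns it, and only the calling thread, playing the role of the driver, merges the lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local analogy: tasks never touch a shared result structure. Each returns
// its own list, and the caller merges them in submission order.
final class IsolatedResults {

    private IsolatedResults() {}

    static List<String> runAndMerge(List<List<String>> chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<String> chunk : chunks) {
                Callable<List<String>> task = () -> {
                    List<String> local = new ArrayList<>();   // private to this task
                    for (String marker : chunk) {
                        local.add(marker + ":tested");        // placeholder "result"
                    }
                    return local;
                };
                futures.add(pool.submit(task));
            }
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                merged.addAll(f.get());   // merging happens only on the caller's thread
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

In Spark terms this roughly corresponds to returning results from the function passed to map or mapPartitions and combining them on the driver with collect() or a reduce, rather than mutating an object captured in the closure.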

Peter

Faheem Abrar

Nov 1, 2017, 5:49:57 PM
to TASSEL - Trait Analysis by Association, Evolution and Linkage
Thank you very much for your suggestions. I followed your advice and am running a separate instance of TASSEL on each node using Spark. Now I have a problem on which I need some help.

The problem is related to threading. I am running the following TASSEL command:

./run_pipeline.pl \
  -fork1 -h /tassel-5-source/data/mdp_genotype.hmp.txt -filterAlign -filterAlignMinFreq 0.05 \
  -fork2 -r /tassel-5-source/data/mdp_traits.txt \
  -fork3 -q /tassel-5-source/data/mdp_population_structure.txt -excludeLastTrait \
  -fork4 -k /tassel-5-source/data/mdp_kinship.txt \
  -combine5 -input1 -input2 -input3 -intersect \
  -combine6 -input5 -input4 -mlm -export /tassel-5-source/mlm_output_tutorial \
  -runfork1 -runfork2 -runfork3 -runfork4

As you can see, the command runs a Weighted MLM Plugin at the end of the pipeline. 
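One way to guarantee that each instance is independent is to launch run_pipeline.pl as a separate operating-system process per chunk. The sketch below only rebuilds the command shown above from per-chunk paths and runs it with java.lang.ProcessBuilder; the class and method names, and the idea of one log file per chunk, are assumptions for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: build the MLM pipeline command for one chunk so each
// worker can launch its own TASSEL process with its own inputs and output prefix.
final class TasselCommand {

    private TasselCommand() {}

    static List<String> mlmCommand(String script, String genotype, String traits,
                                   String structure, String kinship, String outPrefix) {
        List<String> cmd = new ArrayList<>();
        cmd.add(script);
        cmd.addAll(List.of("-fork1", "-h", genotype, "-filterAlign", "-filterAlignMinFreq", "0.05"));
        cmd.addAll(List.of("-fork2", "-r", traits));
        cmd.addAll(List.of("-fork3", "-q", structure, "-excludeLastTrait"));
        cmd.addAll(List.of("-fork4", "-k", kinship));
        cmd.addAll(List.of("-combine5", "-input1", "-input2", "-input3", "-intersect"));
        cmd.addAll(List.of("-combine6", "-input5", "-input4", "-mlm", "-export", outPrefix));
        cmd.addAll(List.of("-runfork1", "-runfork2", "-runfork3", "-runfork4"));
        return cmd;
    }

    /** Runs the command in its own OS process and returns its exit code. */
    static int run(List<String> cmd, File log) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd)
                .redirectErrorStream(true)   // merge stderr into stdout
                .redirectOutput(log)         // keep each chunk's output separate
                .start();
        return p.waitFor();
    }
}
```

Running TASSEL in its own process also means its internal thread pool lives in a separate JVM from the Spark executor, which may sidestep interactions between the two; this has not been tested against the setup described here.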

The problem happens when I try to execute TASSEL inside a Spark map operation. During the run, the files load properly and the filter plugin runs correctly. So far, so good. But whenever the MLM plugin starts, the Spark operation shuts down.

After a detailed analysis, I believe the abrupt shutdown of the Spark operation happens because Spark does not recognize that a new thread is running the MLM plugin. This would explain why the previous plugins work fine, and why Spark closes the operation shortly after the MLM plugin starts.

This is indeed a weird problem. Why would Spark not recognize the thread running the MLM plugin when it recognizes the previous threads?

Looking at the execution log, I noticed a slight difference between the FileLoadPlugin and WeightedMLMPlugin entries. Below is an excerpt from the FileLoadPlugin log:

[pool-17-thread-3] INFO net.maizegenetics.plugindef.AbstractPlugin - Finished net.maizegenetics.analysis.data.FileLoadPlugin: time: Oct 26, 2017 14:00:58
[pool-17-thread-3] INFO net.maizegenetics.pipeline.TasselPipeline - net.maizegenetics.analysis.data.FileLoadPlugin: time: Oct 26, 2017 14:00:58: progress: 100%

Next is an excerpt from the WeightedMLMPlugin log, which marks the start of the MLM plugin but ends abruptly when Spark shuts down.

[Thread-29] INFO net.maizegenetics.plugindef.AbstractPlugin - Starting net.maizegenetics.analysis.association.WeightedMLMPlugin: time: Oct 26, 2017 14:00:58

It seems that all the plugins except the MLM plugin run in a thread pool managed by an executor service defined in the TasselPipeline file. My only guess is that this subtle difference might be the reason for the odd error. But to confirm this theory, I need to find where in the code the thread that executes the MLM plugin is instantiated.
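The two thread-name patterns in the log match the JDK's defaults: threads created by the default thread factory in java.util.concurrent.Executors are named pool-N-thread-M, while a thread created directly with new Thread(...) is named Thread-N. A minimal check, with a hypothetical class name:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Reports the name of the thread a task actually runs on, once via a pool
// built by Executors (default thread factory) and once via a bare Thread.
final class ThreadNames {

    private ThreadNames() {}

    static String pooledThreadName() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> name = pool.submit(task);
            return name.get();   // matches pool-<N>-thread-<M>
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    static String plainThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        Thread t = new Thread(() -> name.set(Thread.currentThread().getName()));
        t.start();
        try {
            t.join();            // wait so the name is set before reading it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();       // matches Thread-<N>
    }
}
```

This only suggests that the MLM plugin runs on a thread created outside the pool; it does not show where TASSEL creates that thread.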

So far, I have not been able to find where the MLM plugin thread is instantiated. I can see that the following code excerpt from the TasselPipeline file submits the thread jobs to a pool and waits for them using Future:

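List<Future<?>> futures = new ArrayList<>();

// Submit every pipeline thread to the executor pool
myThreads.stream().forEach((current) -> {
    futures.add(pool.submit(current));
});

// Block until each submitted job has finished (failures surface as ExecutionException)
for (Future<?> future : futures) {
    future.get();
}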

But I could not find any reference to the MLM plugin in the myThreads list; I only found listeners for the FileLoad plugins. Because the program has a pipeline structure, with each pipeline having multiple forks, I assume that the thread with the MLM plugin is executed somewhere along the line.
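If the MLM plugin's thread is started from inside a task that the pool runs, rather than being submitted to the pool itself, it would not appear among the futures, and the future.get() loop quoted above would not wait for it. That is an assumption about TASSEL's structure, not something established here; the sketch below (hypothetical class name) only demonstrates the general java.util.concurrent behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// A pooled task starts its own Thread and returns without joining it.
// Future.get() tracks only the pooled task, not the thread it spawned.
final class UntrackedThreadDemo {

    private UntrackedThreadDemo() {}

    /** Returns true if the inner thread was still alive when future.get() returned. */
    static boolean getReturnsBeforeInnerThread() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> inner = new AtomicReference<>();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> future = pool.submit(() -> {
                Thread t = new Thread(() -> {
                    try {
                        release.await();   // stays alive until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                t.start();
                inner.set(t);              // the task returns without joining t
            });
            future.get();                  // waits for the pooled task only
            boolean stillRunning = inner.get().isAlive();
            release.countDown();
            inner.get().join();
            return stillRunning;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            release.countDown();           // never leave the inner thread blocked
            pool.shutdown();
        }
    }
}
```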

My questions are:

1. In which part of the code is the thread containing the MLM plugin executed? As it is not part of a thread pool, I assume it is an ordinary thread with a run() method. Where in the source code is this thread started?

2. This question is not as important as 1, but I would still like to ask, on the off chance that it gives me a clue. What might be the reason for Spark acknowledging threads in a pool, but not a single thread outside the pool? I have not been able to find any explanation online, but if you have any idea or clue at all, please let me know. Even a small idea may lead to a working solution.

I hope I have explained my predicament properly. I believe solving this problem would go a long way toward parallelizing TASSEL, at least on a small scale. Thank you very much for your patience; I look forward to your answer.

Terry Casstevens

Nov 1, 2017, 6:04:17 PM
to Tassel User Group
Does this command run correctly outside of Spark?

Terry Casstevens

Nov 1, 2017, 6:25:29 PM
to Tassel User Group
Will you send all the logging?

Faheem Abrar

Nov 1, 2017, 6:36:08 PM
to TASSEL - Trait Analysis by Association, Evolution and Linkage
The command does run correctly outside Spark. Not only that, simple TASSEL pipelines, such as importing and exporting a HapMap file or running the intersection plugin, also work inside a Spark map operation. The error happens only when I try to run the Weighted MLM plugin.

I have attached a YARN log file for your perusal. This log describes the Spark operation that I attempted.
application_log_unsuccessful.txt

Terry Casstevens

Nov 1, 2017, 7:03:04 PM
to Tassel User Group
Would you mind running this with the -debug flag and sending that log?

./run_pipeline.pl -debug ...

Faheem Abrar

Nov 1, 2017, 7:12:55 PM
to TASSEL - Trait Analysis by Association, Evolution and Linkage
I had already run this with the debug flag; I've attached the corresponding log.
Please note that, to identify the source of the error, I made some textual modifications in the debug functions, so you will see messages like "Running blablabla for BC160".
application_log_with_debug.txt

Terry Casstevens

Feb 20, 2018, 7:23:42 PM
to Tassel User Group
This may be fixed now.

In TasselPipeline

I changed
pool.shutdownNow();
to
pool.shutdown();

I don't have a setup like yours to test.
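For reference, the difference between the two calls: shutdown() stops accepting new tasks but lets tasks that are already running finish, whereas shutdownNow() also interrupts the threads actively executing tasks (and returns the tasks that never started). A minimal sketch, with a hypothetical class name and a sleep standing in for real work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Contrasts shutdown() and shutdownNow() on a task that is already running.
final class ShutdownDemo {

    private ShutdownDemo() {}

    /** Returns true if the running task finished its work without being interrupted. */
    static boolean taskCompleted(boolean useShutdownNow) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean(false);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(200);                   // stands in for a long computation
                completed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // interrupted: work abandoned
            }
        });
        try {
            started.await();                         // ensure the task is running
            if (useShutdownNow) {
                pool.shutdownNow();                  // interrupts the running worker
            } else {
                pool.shutdown();                     // lets the running task finish
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```

Here taskCompleted(false) returns true and taskCompleted(true) returns false: after shutdownNow(), the sleeping task receives an InterruptedException and abandons its work.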

Terry Casstevens

Feb 20, 2018, 7:25:31 PM
to Tassel User Group
Although the fix isn't in the latest build yet.