How does Nextflow check that a process is complete?


Brandon

Jul 26, 2016, 2:59:27 AM
to Nextflow

For example, I finished all the processes of my workflow using Nextflow.

But as shown below, -resume does not work:

$ nextflow run main.nf -with-trace -with-timeline -with-dag flowchar.png -resume

...
[b0/e23c6c] Cached process > miraligner (4)
[20/956bce] Cached process > miraligner (5)
[95/07abe3] Cached process > miraligner (6)
[fd/3c6731] Cached process > miraligner (1)
[c2/2b0975] Cached process > miraligner (2)
[0e/3d70c5] Cached process > miraligner (7)
[6d/ac9ba8] Cached process > miraligner (3)
[d8/1691c0] Cached process > miraligner (8)
[ff/bacc41] Submitted process > seqcluster_prepare (1)
[2b/0607bc] Cached process > qc (2)
[a2/17a100] Cached process > qc (4)
[a1/ff25dc] Cached process > qc (6)
....



I'm confused.

My guess is that all of the processes should be cached.


Paolo Di Tommaso

Jul 26, 2016, 9:36:54 AM
to nextflow
Nextflow creates a unique key for each task by hashing the script to be executed together with all input values and input file metadata (i.e. file name, size, last update time, etc.).

Then it checks that the expected outputs exist for each task; otherwise, a new run is executed.
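Conceptually, the resume check works something like the sketch below. This is a minimal Python illustration of the idea only, with a hypothetical task_hash helper; Nextflow's real implementation uses its own internal hashing machinery.

```python
# Simplified sketch of how a resume cache key could be derived from the
# task script plus input-file metadata. Illustration only, NOT Nextflow's
# actual implementation.
import hashlib
import os

def task_hash(script, input_files):
    h = hashlib.md5()
    h.update(script.encode())
    for path in sorted(input_files):
        st = os.stat(path)
        # name, size, and last-modified time all feed the key: rewriting
        # or touching an input file therefore invalidates the cached task
        h.update(path.encode())
        h.update(str(st.st_size).encode())
        h.update(str(int(st.st_mtime)).encode())
    return h.hexdigest()
```

If the key matches a previous run and the expected outputs are still present in that task's work directory, the task is reported as Cached; any change to the script or inputs yields a new key and a fresh submission.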


Cheers,
Paolo




Brandon

Jul 27, 2016, 4:18:22 AM
to Nextflow
This is the process:

process seqcluster_prepare {
        beforeScript 'export PATH=/BiO/BioTools/miniconda3/envs/smrna/bin/:$PATH'

        publishDir "${params.outdir}/seqcluster/prepare";

        input:
        val(prefix) from crp_prepare.toList()
        file(reads) from crf_prepare.toList()

        output:
        file 'seqs.ma' into seqs_ma
        file 'seqs.fastq' into seqs_fastq
        file 'stats_prepare.tsv' into stats_prepare

        exec:
        prefix = prefix.join(",")
        reads = reads.toString().tokenize().join(",")

        shell:
        """
        python ${params.script_dir}/prepare_data.py ${prefix} ${reads}
        """
}

I don't know why this step always needs a rerun.

Could you check this and give me any suggestions, please?


On Tuesday, July 26, 2016 at 10:36:54 PM UTC+9, Paolo Di Tommaso wrote:

Paolo Di Tommaso

Jul 27, 2016, 4:55:49 AM
to nextflow
Not sure that's the problem, but exec and shell are alternative statements; you cannot use both in the same process.

Try this instead: 


process seqcluster_prepare {

        publishDir "${params.outdir}/seqcluster/prepare";

        input:
        val(prefix) from crp_prepare.toList()
        file(reads) from crf_prepare.toList()

        output:
        file 'seqs.ma' into seqs_ma
        file 'seqs.fastq' into seqs_fastq
        file 'stats_prepare.tsv' into stats_prepare

        shell:
        def prefix = prefix.join(",")
        def reads = reads.toString().tokenize().join(",")
        """
        python ${params.script_dir}/prepare_data.py ${prefix} ${reads}
        """
}



Also, the best practice for environment variables is to define them in the Nextflow config file. For example:


env {
   PATH='/BiO/BioTools/miniconda3/envs/smrna/bin/:$PATH'
}



Hope it helps 


Cheers,
Paolo

Brandon

Jul 27, 2016, 11:14:56 PM
to Nextflow
Thank you very much!

I updated my workflow using your suggestion.

On Wednesday, July 27, 2016 at 5:55:49 PM UTC+9, Paolo Di Tommaso wrote:

Brandon

Jul 28, 2016, 5:39:08 AM
to Nextflow
As before, I don't know why this process is submitted again even after I modified the script following your suggestion.

In this case, do I need to keep an eye on the Python script?



On Thursday, July 28, 2016 at 12:14:56 PM UTC+9, Brandon wrote:

Paolo Di Tommaso

Jul 28, 2016, 6:37:25 AM
to nextflow
OK, I think the problem in that step is the following input: 

  val(prefix) from crp_prepare.toList()

I guess the channel crp_prepare is the result of an upstream parallel process, thus the order of the values in the returned list may change, and this invalidates the cache entry for the process.

You may fix this problem by changing that input as follows:

 val(prefix) from crp_prepare.toList().map { new nextflow.util.ArrayBag(it) } 

This will transform the list into a "bag", for which the order of the items is not taken into account by the process cache hashing.
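The difference between hashing a list and hashing a bag can be illustrated with a small Python sketch. The helper names here are hypothetical, and nextflow.util.ArrayBag works differently internally, but the underlying idea is the same:

```python
# Toy illustration of why a plain list is order-sensitive for caching
# while a "bag" is not. Hypothetical helpers, NOT Nextflow's actual code.
import hashlib

def hash_as_list(items):
    # order matters: [a, b] and [b, a] yield different keys
    h = hashlib.md5()
    for x in items:
        h.update(repr(x).encode())
    return h.hexdigest()

def hash_as_bag(items):
    # combine the per-item hashes in a canonical (sorted) order, so a
    # shuffled list from an upstream parallel process gives the same key
    parts = sorted(hashlib.md5(repr(x).encode()).hexdigest() for x in items)
    return hashlib.md5("".join(parts).encode()).hexdigest()
```

With the bag-style key, reordered but otherwise identical inputs hash the same, which is exactly what -resume needs when upstream tasks complete in a nondeterministic order.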


Thanks for pointing out this problem. I will try to fix it in a future release.


Cheers,
Paolo

Brandon

Jul 28, 2016, 9:23:43 PM
to Nextflow
Thanks for your support.

I solved it with your suggestion, but I have the same issue with another process.

process seqcluster_cluster {

        publishDir "${params.outdir}/seqcluster/cluster";

        input:
        file '*' from seqcluster_cluster_seqs_ma
        file (bam:'*') from star_bam_for_cluster

        output:
        file 'seqcluster.json' into seqcluster_json

        """
        seqcluster cluster -o ./ -m seqs.ma -a seqs.bam -r ${params.ref_fasta} -g ${params.gtf_srna_transcripts}
        """
}

Full code is here.


The flowchar.png is attached.




On Thursday, July 28, 2016 at 7:37:25 PM UTC+9, Paolo Di Tommaso wrote:
flowchar.png

Paolo Di Tommaso

Aug 1, 2016, 4:20:00 AM
to nextflow
Hi, 

I don't see any problem here. Do you have a Docker container for this pipeline that I can use to run it locally?


Cheers, 
Paolo

Brandon

Aug 1, 2016, 9:53:45 PM
to Nextflow
I have no experience with Docker, so I'll need some time to prepare a Docker container.

In the meantime, I'll test it in another environment (on the other server system).

Thanks,
Hyunmin 



On Monday, August 1, 2016 at 5:20:00 PM UTC+9, Paolo Di Tommaso wrote: