Matching input files during parallel execution in Nextflow


Vivek Appadurai

Aug 19, 2021, 4:59:18 AM
to Nextflow
Hi,

I'm trying to implement a polygenic scoring pipeline in Nextflow. The basic idea of the pipeline is to:

1. Split a file of summary statistics by chromosome (input: VCF; output: 22 VCFs).
2. For each of the 22 VCFs from step 1, calculate posterior effect sizes using a polygenic scoring model (additional inputs: chromosome-wise LD matrices and PLINK genotype files).
3. ...continue with scoring and finally merge the per-sample scores.

I'm struggling a bit with the logic for matching the right chromosome's VCF to its LD matrix and PLINK genotype file.

In a previous WDL implementation, I read the paths to the LD matrices from an input .json file, which the user could specify on the command line, into a dictionary keyed by chromosome name. I could then use the chromosome name from the process output to index this dictionary and stage the file for the process.

However, this does not seem possible in Nextflow. 

My approach so far looks like this:

Reading the file list in .json format:

import groovy.json.JsonSlurper

def jsonSlurper_prscs   = new JsonSlurper()
def prscs_ld_json       = new File(params.prscs_ld_files)
String prscs_ld_files   = prscs_ld_json.text
def prscs_ld_dict       = jsonSlurper_prscs.parseText(prscs_ld_files) 

where the .json file looks like this:

{
    "1"  : "ldblk_1kg_chr1.hdf5",
    "2"  : "ldblk_1kg_chr2.hdf5",
    "3"  : "ldblk_1kg_chr3.hdf5",
    "4"  : "ldblk_1kg_chr4.hdf5",
    "5"  : "ldblk_1kg_chr5.hdf5",
    "6"  : "ldblk_1kg_chr6.hdf5",
    "7"  : "ldblk_1kg_chr7.hdf5",
    "8"  : "ldblk_1kg_chr8.hdf5",
    "9"  : "ldblk_1kg_chr9.hdf5",
    "10" : "ldblk_1kg_chr10.hdf5",
    "11" : "ldblk_1kg_chr11.hdf5",
    "12" : "ldblk_1kg_chr12.hdf5",
    "13" : "ldblk_1kg_chr13.hdf5",
    "14" : "ldblk_1kg_chr14.hdf5",
    "15" : "ldblk_1kg_chr15.hdf5",
    "16" : "ldblk_1kg_chr16.hdf5",
    "17" : "ldblk_1kg_chr17.hdf5",
    "18" : "ldblk_1kg_chr18.hdf5",
    "19" : "ldblk_1kg_chr19.hdf5",
    "20" : "ldblk_1kg_chr20.hdf5",
    "21" : "ldblk_1kg_chr21.hdf5",
    "22" : "ldblk_1kg_chr22.hdf5"
}


The processes:

splitting the gwas file:

process split_reformat_gwas {
    input:
    val chr
    val traitName
    path gwas
    val N
    val method

    output:
    val chr
    path "${traitName}_${method}_chr${chr}.txt"

    script:
    """
    python /bin/split_gwas_vcf.py --vcf $gwas --chromosome $chr --format $method
    """
}

calculating posteriors:

process calc_posteriors_prscs {
    input:
    val chr
    path gwas_chr
    val N
    path ld_mat
    tuple val(bfile), path(plink_files)

    output:
    path "${out_prefix}_prscs_chr${chr}.snpRes"

    script:
    """
    echo "python /bin/PRScs.py --ref_dir=$ld_mat \
    --sst_file=$gwas_chr \
    --bim_prefix=$bfile \
    --n_gwas=$N \
    --chrom=$chr \
    --out_dir=$workDir"
    """
}

with the workflow calls:

include { split_reformat_gwas as split_for_prscs } from './modules/split_reformat_gwas.nf'
include { calc_posteriors_prscs } from './modules/calc_posteriors_prscs.nf'

workflow {
    split_for_prscs(Channel.of(1..22),
                    params.trait,
                    params.ref,
                    params.N,
                    "prscs")

    calc_posteriors_prscs(split_for_prscs.out[0],
                          split_for_prscs.out[1],
                          params.N,
                          prscs_ld_dict.get(split_for_prscs.out[0]),
                          Channel.fromFilePairs("${params.bfile}.{bed,bim,fam}", checkIfExists: true))
}

However, the input `prscs_ld_dict.get(split_for_prscs.out[0])` errors out: `split_for_prscs.out[0]` is a channel, not a plain value, so the dictionary lookup has nothing to resolve and the file cannot be staged.
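[Editor's note: one Nextflow-idiomatic way around this, sketched here and not tested against this pipeline, is to perform the dictionary lookup inside a `map` operator, where each emitted value is in scope; the key must be cast to a string because JSON keys are always strings. The channel name `prscs_ld_ch` is illustrative.]

```nextflow
// Sketch: pair each chromosome with its LD matrix file inside the channel.
// A channel cannot be indexed directly, but map{} sees each emitted value.
// toString() is needed because the parsed JSON dictionary has string keys.
prscs_ld_ch = split_for_prscs.out[0] \
    | map { chr -> tuple(chr, file(prscs_ld_dict.get(chr.toString()))) }
```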

There is possibly a better Nextflow way of doing this and I might be trying to fit Nextflow syntax into the wdl scatter/gather logic, so any feedback is appreciated.

Cheers,
Vivek.

Sam

Aug 19, 2021, 11:51:03 AM
to Nextflow
Given your JSON format, I think it might be easier to build the channel directly in code instead of using the JSON:

ld = Channel.of(1..22) \
    | map { a -> [a, "ldblk_1kg_chr${a}.hdf5"] }

Then you can do



process split_reformat_gwas {
    input:
    tuple val(chr),
          val(traitName),
          path(gwas),
          val(N),
          val(method)

    output:
    tuple val(chr),
          path("${traitName}_${method}_chr${chr}.txt")

    script:
    """
    python /bin/split_gwas_vcf.py --vcf $gwas --chromosome $chr --format $method
    """
}
process calc_posteriors_prscs {
    input:
    tuple val(chr),
          path(gwas_chr),
          val(N),
          path(ld_mat),
          path(bed),
          path(bim),
          path(fam)

    output:
    path "${out_prefix}_prscs_chr${chr}.snpRes"

    script:
    """
    echo "python /bin/PRScs.py --ref_dir=$ld_mat \
    --sst_file=$gwas_chr \
    --bim_prefix=$bim \
    --n_gwas=$N \
    --chrom=$chr \
    --out_dir=$workDir"
    """
}


workflow {
    Channel.of(1..22) \
        | combine(Channel.of(params.trait)) \
        | combine(Channel.fromPath(params.ref)) \
        | combine(Channel.of(params.N)) \
        | combine(Channel.of("prscs")) \
        | split_reformat_gwas \
        | combine(Channel.of(params.N)) \
        | combine(ld, by: 0) \
        | combine(Channel.fromFilePairs("${params.bfile}.{bed,bim,fam}", checkIfExists: true)) \
        | calc_posteriors_prscs
}

which I find more elegant.

Sam

Aug 20, 2021, 10:34:44 AM
to Nextflow

An edit to the map:


ld = Channel.of(1..22) \
    | map { a -> [a, file("ldblk_1kg_chr${a}.hdf5")] }

to indicate that the second element should be staged as a file.

Vivek Appadurai

Aug 20, 2021, 11:45:02 AM
to Nextflow
This workflow is indeed more elegant and gives me a frame to build on.

The reason I want to accept the inputs as JSON is to avoid hardcoding the filenames and paths in the pipeline script. There are use cases where we might use reference GWAS from Finnish or even Inuit populations, and in those contexts I want to encourage people to build their own LD matrices rather than run the pipeline with the European (or other) LD matrices provided by PRS-CS.

I ended up creating the map like this:

prscs_ld_ch = Channel.of(1..22) | map { a -> [a, prscs_ld_dict.get(a.toString())] }

I got stuck for a while with the map returning null until I realized that JSON keys are always strings, so I needed to typecast the channel value to a string.
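[Editor's note: to wire this per-chromosome channel into the rest of the workflow, a minimal sketch, assuming the tuple-based process signatures from Sam's reply, is to join the two channels on the chromosome key.]

```nextflow
// Sketch: match each split GWAS output to its LD matrix by chromosome
// (element 0 of both tuples), then feed the merged tuples downstream.
split_for_prscs.out \
    | combine(prscs_ld_ch, by: 0) \
    | calc_posteriors_prscs
```

The input declaration of `calc_posteriors_prscs` would need to match the shape of the combined tuples for this to run.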

Cheers,
Vivek.
