Hi,
I'm trying to implement a polygenic scoring pipeline in Nextflow. The basic idea of the pipeline is to:
1. Split a file of summary statistics by chromosome (input: VCF, output: 22 VCFs)
2. For each of the 22 VCFs from step 1, calculate posterior effect sizes using a polygenic scoring model (additional inputs: chromosome-wise LD matrices and PLINK genotype files).
3. ...continue with scoring and finally merge the per-sample scores
I'm struggling a bit with the logic for matching each chromosome's VCF to its LD matrix and PLINK genotype file.
In a previous WDL implementation, I read the paths to the LD matrices from an input .json file that the user could specify on the command line. I parsed it into a dictionary keyed by chromosome name, and then used the chromosome name from a process output to index that dictionary and stage the right file for the next process.
However, this does not seem to be possible in Nextflow.
My approach so far looks like this:
Reading the file list in .json format:
import groovy.json.JsonSlurper

// parse the user-supplied JSON that maps chromosome -> LD matrix path
def prscs_ld_dict = new JsonSlurper().parseText(new File(params.prscs_ld_files).text)
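One thing I noticed while debugging (may or may not be part of the problem): JsonSlurper gives back string keys, so an integer chromosome number coming from Channel.of(1..22) would not match:

```groovy
// keys come back from JsonSlurper as strings
assert prscs_ld_dict["22"] == "ldblk_1kg_chr22.hdf5"
assert prscs_ld_dict[22] == null   // an integer key misses
```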
where the .json file looks like this:
{
"1" : "ldblk_1kg_chr1.hdf5",
"2" : "ldblk_1kg_chr2.hdf5",
"3" : "ldblk_1kg_chr3.hdf5",
"4" : "ldblk_1kg_chr4.hdf5",
"5" : "ldblk_1kg_chr5.hdf5",
"6" : "ldblk_1kg_chr6.hdf5",
"7" : "ldblk_1kg_chr7.hdf5",
"8" : "ldblk_1kg_chr8.hdf5",
"9" : "ldblk_1kg_chr9.hdf5",
"10" : "ldblk_1kg_chr10.hdf5",
"11" : "ldblk_1kg_chr11.hdf5",
"12" : "ldblk_1kg_chr12.hdf5",
"13" : "ldblk_1kg_chr13.hdf5",
"14" : "ldblk_1kg_chr14.hdf5",
"15" : "ldblk_1kg_chr15.hdf5",
"16" : "ldblk_1kg_chr16.hdf5",
"17" : "ldblk_1kg_chr17.hdf5",
"18" : "ldblk_1kg_chr18.hdf5",
"19" : "ldblk_1kg_chr19.hdf5",
"20" : "ldblk_1kg_chr20.hdf5",
"21" : "ldblk_1kg_chr21.hdf5",
"22" : "ldblk_1kg_chr22.hdf5"
}
The processes:
splitting the GWAS file:
process split_for_prscs {
    input:
    val chr
    val traitName
    path gwas
    val N
    val method

    output:
    val chr
    path "${traitName}_${method}_chr${chr}.txt"

    script:
    """
    python /bin/split_gwas_vcf.py --vcf $gwas --chromosome $chr --format $method
    """
}
calculating posteriors:
process calc_posteriors_prscs {
    input:
    val chr
    path gwas_chr
    val N
    path ld_mat
    tuple val(bfile), path(plink_files)

    output:
    path "${out_prefix}_prscs_chr${chr}.snpRes"

    script:
    // the command is still echo'd while I debug the wiring
    """
    echo "python /bin/PRScs.py --ref_dir=$ld_mat \
        --sst_file=$gwas_chr \
        --bim_prefix=$bfile \
        --n_gwas=$N \
        --chrom=$chr \
        --out_dir=$workDir"
    """
}
with the workflow calls:
workflow {
    split_for_prscs(Channel.of(1..22),
                    params.trait,
                    params.ref,
                    params.N,
                    "prscs")
    calc_posteriors_prscs(split_for_prscs.out[0],
                          split_for_prscs.out[1],
                          params.N,
                          prscs_ld_dict.get(split_for_prscs.out[0]),
                          Channel.fromFilePairs("${params.bfile}.{bed,bim,fam}", size: 3, checkIfExists: true))
}
However, the input prscs_ld_dict.get(split_for_prscs.out[0]) errors out: at that point split_for_prscs.out[0] is a channel rather than an actual chromosome value, so the lookup returns nothing and no file gets staged for the process.
There is possibly a better Nextflow way of doing this; I may be trying to force WDL scatter/gather logic onto Nextflow syntax. Any feedback is appreciated.
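For reference, the direction I've been experimenting with (untested, so treat the names as a sketch) is to key every per-chromosome channel by its chromosome number and let the join operator do the matching, instead of indexing a Groovy map from workflow code:

```groovy
// Sketch, not working code: turn the parsed JSON map into a channel of
// (chr, ld_file) tuples. The JSON keys are strings, so convert them to
// integers to match Channel.of(1..22).
ld_ch = Channel.fromList(
    prscs_ld_dict.collect { chr, f -> tuple(chr.toInteger(), file(f)) }
)

// If split_for_prscs instead emitted a single `tuple val(chr), path(...)`
// output, the per-chromosome pieces could be matched on the first element:
prscs_in = split_for_prscs.out.join(ld_ch)   // -> (chr, gwas_chr, ld_file)
```

calc_posteriors_prscs would then declare one `tuple val(chr), path(gwas_chr), path(ld_mat)` input, but I'm not sure this is the intended pattern.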
Cheers,
Vivek.