Need help with python subprocess and hadoop streaming

154 views
Skip to first unread message

Shalini Ravishankar

unread,
Jan 24, 2015, 3:15:55 PM1/24/15
to mr...@googlegroups.com
Hello Everyone,


I have a python program (methratio.py) which reads and write files from local. I have modified the read and write so that it can read/ write to/from hdfs. Bur I am having issues. The logic of the program is correct, it is bioinformatics algorithm . But he problem is with read/write files from hdfs. Can some one please tell me what I am doing wrong here


I have modified the methratio.py so that it can read files from hdfs. But I am getting error.  Can someone tell me what I am doing wrong here

The Original code :

 import sys, time, os, array, optparse
usage = "usage: %prog [options] BSMAP_MAPPING_FILES"
parser = optparse.OptionParser(usage=usage)

parser.add_option("-o", "--out", dest="outfile", metavar="FILE", help="output file name. (required)", default="")
parser.add_option("-d", "--ref", dest="reffile", metavar="FILE", help="reference genome fasta file. (required)", default="")
parser.add_option("-c", "--chr", dest="chroms", metavar="CHR", help="process only specified chromosomes, separated by ','. [default: all]\nexample: --chroms=chr1,chr2", default=[])
parser.add_option("-s", "--sam-path", dest="sam_path", metavar="PATH", help="path to samtools. [default: none]", default='')
parser.add_option("-u", "--unique", action="store_true", dest="unique", help="process only unique mappings/pairs.", default=False)                                    
parser.add_option("-p", "--pair", action="store_true", dest="pair", help="process only properly paired mappings.", default=False)
parser.add_option("-z", "--zero-meth", action="store_true", dest="meth0", help="report loci with zero methylation ratios.", default=False)
parser.add_option("-q", "--quiet", action="store_true", dest="quiet", help="don't print progress on stderr.", default=False)
parser.add_option("-r", "--remove-duplicate", action="store_true", dest="rm_dup", help="remove duplicated reads.", default=False)
parser.add_option("-t", "--trim-fillin", dest="trim_fillin", type="int", metavar='N', help="trim N end-repairing fill-in nucleotides. [default: 2]", default=2)
parser.add_option("-g", "--combine-CpG", action="store_true", dest="combine_CpG", help="combine CpG methylaion ratios on both strands.", default=False)
parser.add_option("-m", "--min-depth", dest="min_depth", type="int", metavar='FOLD', help="report loci with sequencing depth>=FOLD. [default: 1]", default=1)
parser.add_option("-n", "--no-header", action="store_true", dest="no_header", help="don't print a header line", default=False)
parser.add_option("-i", "--ct-snp", dest="CT_SNP", help='how to handle CT SNP ("no-action", "correct", "skip"), default: "correct".', default="correct")

options, infiles = parser.parse_args()

if len(options.reffile) == 0: parser.error("Missing reference file, use -d or --ref option.")
if len(options.outfile) == 0: parser.error("Missing output file name, use -o or --out option.")
if len(infiles) == 0: parser.error("Require at least one BSMAP_MAPPING_FILE.") 
if any(options.chroms): options.chroms = options.chroms.split(',')
CT_SNP_val = {"no-action": 0, "correct": 1, "skip": 2}
try: options.CT_SNP = CT_SNP_val[options.CT_SNP.lower()]
except: parser.error('Invalid -i value, select "no-action", "correct" or "skip"')
if options.min_depth <= 0: parser.error('Invalid -m value, must >= 1')
if options.trim_fillin < 0: parser.error('Invalid -t value, must >= 0')

if any(options.sam_path): 
    if options.sam_path[-1] != '/': options.sam_path += '/'

def disp(txt, nt=0):
    if not options.quiet: print >> sys.stderr, ''.join(['\t' for i in xrange(nt)]+['@ ',time.asctime(),': ',txt])

def get_alignment(line):
    col = line.split('\t')
    if sam_format:
        flag = col[1] 
        if 'u' in flag: return []
        if options.unique and 's' in flag: return []
        if options.pair and 'P' not in flag: return []
        cr, pos, cigar, seq, strand, insert = col[2], int(col[3])-1, col[5], col[9], '', int(col[8])
        if cr not in options.chroms: return []
        for aux in col[11:]:
            if aux[:5] == 'ZS:Z:': 
                strand = aux[5:7]
                break
        assert strand, 'missing strand information "ZS:Z:xx"'
        gap_pos, gap_size = 0, 0
        while 'I' in cigar or 'D' in cigar:
            for sep in 'MID':
                try: gap_size = int(cigar.split(sep, 1)[0])
                except ValueError: continue
                break
            if sep == 'M': gap_pos += gap_size
            elif sep == 'I': seq = seq[:gap_pos] + seq[gap_pos+gap_size:]
            elif sep == 'D': 
                seq = seq[:gap_pos] + '-' * gap_size + seq[gap_pos:]                        
                gap_pos += gap_size
            cigar = cigar[cigar.index(sep)+1:]
    else:
        flag = col[3][:2]
        if flag == 'NM' or flag == 'QC': return []
        if options.unique and flag != 'UM': return []
        if options.pair and col[7] == '0': return []
        seq, strand, cr, pos, insert, mm = col[1], col[6], col[4], int(col[5])-1, int(col[7]), col[9]
        if cr not in options.chroms: return []
        if ':' in mm:
            tmp = mm.split(':')
            gap_pos, gap_size = int(tmp[1]), int(tmp[2])
            if gap_size < 0: seq = seq[:gap_pos] + seq[gap_pos-gap_size:]  # insertion on reference
            else: seq = seq[:gap_pos] + '-' * gap_size + seq[gap_pos:]
    if pos + len(seq) >= len(ref[cr]): return []
    if options.rm_dup:  # remove duplicate hits
        if strand == '+-' or strand == '-+': frag_end, direction = pos+len(seq), 2
        else: frag_end, direction = pos, 1
        if coverage[cr][frag_end] & direction: return []
        coverage[cr][frag_end] |= direction
    if options.trim_fillin > 0: # trim fill in nucleotides
        if strand == '+-': seq = seq[:-options.trim_fillin]
        elif strand == '--': seq, pos = seq[options.trim_fillin:], pos+options.trim_fillin
        elif insert != 0 and len(seq) > abs(insert) - options.trim_fillin:
            trim_nt = len(seq) - (abs(insert) - options.trim_fillin)
            if strand == '++': seq = seq[:-trim_nt]
            elif strand == '-+': seq, pos =seq[trim_nt:], pos+trim_nt
    if sam_format and insert > 0: seq = seq[:int(col[7])-1-pos] # remove overlapped regions in paired hits, SAM format only
    return (seq, strand[0], cr, pos)

ref, cr, seq = {}, '', ''
disp('reading reference %s ...' % options.reffile)
for line in open(options.reffile):
    if line[0] == '>': 
        if any(cr): 
            if len(options.chroms) == 0 or cr in options.chroms: ref[cr] = seq.upper()
        cr, seq = line[1:-1].split()[0], ''
    else: seq += line.strip()

if len(options.chroms) == 0 or cr in options.chroms: ref[cr] = seq.upper()
del seq

meth, depth, coverage, meth1, depth1 = {}, {}, {}, {}, {}
for cr in ref:
    meth[cr] = array.array('H', [0]) * len(ref[cr])
    depth[cr] = array.array('H', [0]) * len(ref[cr])
    if options.rm_dup: coverage[cr] = array.array('B', [0]) * len(ref[cr])
    if options.CT_SNP > 0:
        meth1[cr] = array.array('H', [0]) * len(ref[cr])
        depth1[cr] = array.array('H', [0]) * len(ref[cr])

options.chroms = set(ref.keys())

BS_conversion = {'+': ('C','T','G','A'), '-': ('G','A','C','T')}
nmap = 0
for infile in infiles:
    nline = 0
    disp('reading %s ...' % infile)
    if infile[-4:].upper() == '.SAM': sam_format, fin = True, os.popen('%ssamtools view -XS %s' % (options.sam_path, infile)) 
    elif infile[-4:].upper() == '.BAM': sam_format, fin = True, os.popen('%ssamtools view -X %s' % (options.sam_path, infile))
    else: sam_format, fin = False, open(infile)
    for line in fin:
        nline += 1
        if nline % 10000000 == 0: disp('read %d lines' % nline, nt=1)
        map_info = get_alignment(line)
        if len(map_info) == 0: continue
        seq, strand, cr, pos = map_info
        depthcr = depth[cr]
        if pos + len(seq) > len(depthcr): continue
        nmap += 1
        methcr = meth[cr]
        refseq = ref[cr][pos:pos+len(seq)]
        match, convert, rc_match, rc_convert = BS_conversion[strand]
        index = refseq.find(match)
        while index >= 0:
            if depthcr[pos+index] < 65535:
                if seq[index] == convert: depthcr[pos+index] += 1
                elif seq[index] == match: 
                    methcr[pos+index] += 1
                    depthcr[pos+index] += 1
            index = refseq.find(match, index+1)
        if options.CT_SNP == 0: continue
        methcr1 = meth1[cr]
        depthcr1 = depth1[cr]
        index = refseq.find(rc_match)
        while index >= 0:
            if depthcr1[pos+index] < 65535:
                if seq[index] == rc_convert: depthcr1[pos+index] += 1
                if seq[index] == rc_match: 
                    methcr1[pos+index] += 1
                    depthcr1[pos+index] += 1
            index = refseq.find(rc_match, index+1)

    fin.close()

if options.combine_CpG:
    disp('combining CpG methylation from both strands ...')
    for cr in depth:
        depthcr, methcr, refcr = depth[cr], meth[cr], ref[cr]
        if options.CT_SNP > 0: depthcr1, methcr1 = depth1[cr], meth1[cr]
        pos = refcr.find('CG')
        while pos >= 0:
            if depthcr[pos] + depthcr[pos+1] <= 65535:
                depthcr[pos] += depthcr[pos+1]
                methcr[pos] += methcr[pos+1]
            else:
                depthcr[pos] = (depthcr[pos] + depthcr[pos+1]) / 2
                methcr[pos] = (methcr[pos] + methcr[pos+1]) / 2
            depthcr[pos+1] = 0
            methcr[pos+1] = 0
            if options.CT_SNP > 0:
                if depthcr1[pos] + depthcr1[pos+1] <= 65535:
                    depthcr1[pos] += depthcr1[pos+1]
                    methcr1[pos] += methcr1[pos+1]
                else:
                    depthcr1[pos] = (depthcr1[pos] + depthcr1[pos+1]) / 2
                    methcr1[pos] = (methcr1[pos] + methcr1[pos+1]) / 2
            pos = refcr.find('CG', pos+2)

disp('writing %s ...' % options.outfile)
ss = {'C': '+', 'G': '-'}
fout = open(options.outfile, 'w')
if not options.no_header: 
    fout.write('chr\tpos\tstrand\tcontext\tratio\teff_CT_count\tC_count\tCT_count\trev_G_count\trev_GA_count\tCI_lower\tCI_upper\n')
z95, z95sq = 1.96, 1.96 * 1.96
nc, nd, dep0 = 0, 0, options.min_depth
for cr in sorted(depth.keys()):
    depthcr, methcr, refcr = depth[cr], meth[cr], ref[cr]
    if options.CT_SNP > 0: depthcr1, methcr1 = depth1[cr], meth1[cr]
    for i, dd in enumerate(depthcr):
        if dd < dep0: continue
        if options.CT_SNP > 0: 
            m1, d1 = methcr1[i], depthcr1[i]
            if m1 != d1:
                if options.CT_SNP == 2: continue
                d = float(dd) * m1 / d1
            else: d = float(dd)
        else: d = float(dd)
        nc += 1
        nd += d
        m = methcr[i]
        if m == 0 and not options.meth0: continue
        seq = refcr[i-2:i+3]
        try: strand = ss[refcr[i]]
        except KeyError: continue
        try: ratio = float(min(m,d)) / d
        except ZeroDivisionError:
            if options.CT_SNP:
                fout.write('%s\t%d\t%c\t%s\tNA\t%.2f\t%d\t%d\t%d\t%d\tNA\tNA\n' % (cr, i+1, strand, seq, d, m, dd, m1, d1))
            else:
                fout.write('%s\t%d\t%c\t%s\tNA\t%.2f\t%d\t%d\tNA\tNA\tNA\tNA\n' % (cr, i+1, strand, seq, d, m, dd))
            continue     
        pmid = ratio + z95sq / (2 * d)
        sd = z95 * ((ratio*(1-ratio)/d + z95sq/(4*d*d)) ** 0.5)
        norminator = 1 + z95sq / d
        CIl, CIu = (pmid - sd) / norminator, (pmid + sd) / norminator
        if options.CT_SNP:
            fout.write('%s\t%d\t%c\t%s\t%.3f\t%.2f\t%d\t%d\t%d\t%d\t%.3f\t%.3f\n' 
                % (cr, i+1, strand, seq, ratio, d, m, dd, m1, d1, CIl, CIu))
        else:
            fout.write('%s\t%d\t%c\t%s\t%.3f\t%.2f\t%d\t%d\tNA\tNA\t%.3f\t%.3f\n'
                % (cr, i+1, strand, seq, ratio, d, m, dd, CIl, CIu))

fout.close()
disp('done.')
if nc > 0: print 'total %d valid mappings, %d covered cytosines, average coverage: %.2f fold.' % (nmap, nc, float(nd)/nc)



The code with changes so that the python code can read and write files to hdfs:



import sys, time, os, array, optparse, subprocess
usage = "usage: %prog [options] BSMAP_MAPPING_FILES"
parser = optparse.OptionParser(usage=usage)

parser.add_option("-o", "--out", dest="outfile", metavar="FILE", help="output file name. (required)", default="")
parser.add_option("-d", "--ref", dest="reffile", metavar="FILE", help="reference genome fasta file. (required)", default="")
parser.add_option("-c", "--chr", dest="chroms", metavar="CHR", help="process only specified chromosomes, separated by ','. [default: all]\nexample: --chroms=chr1,chr2", default=[])
parser.add_option("-s", "--sam-path", dest="sam_path", metavar="PATH", help="path to samtools. [default: none]", default='')
parser.add_option("-u", "--unique", action="store_true", dest="unique", help="process only unique mappings/pairs.", default=False)                                    
parser.add_option("-p", "--pair", action="store_true", dest="pair", help="process only properly paired mappings.", default=False)
parser.add_option("-z", "--zero-meth", action="store_true", dest="meth0", help="report loci with zero methylation ratios.", default=False)
parser.add_option("-q", "--quiet", action="store_true", dest="quiet", help="don't print progress on stderr.", default=False)
parser.add_option("-r", "--remove-duplicate", action="store_true", dest="rm_dup", help="remove duplicated reads.", default=False)
parser.add_option("-t", "--trim-fillin", dest="trim_fillin", type="int", metavar='N', help="trim N end-repairing fill-in nucleotides. [default: 2]", default=2)
parser.add_option("-g", "--combine-CpG", action="store_true", dest="combine_CpG", help="combine CpG methylaion ratios on both strands.", default=False)
parser.add_option("-m", "--min-depth", dest="min_depth", type="int", metavar='FOLD', help="report loci with sequencing depth>=FOLD. [default: 1]", default=1)
parser.add_option("-n", "--no-header", action="store_true", dest="no_header", help="don't print a header line", default=False)
parser.add_option("-i", "--ct-snp", dest="CT_SNP", help='how to handle CT SNP ("no-action", "correct", "skip"), default: "correct".', default="correct")

options, infiles = parser.parse_args()

if len(options.reffile) == 0: parser.error("Missing reference file, use -d or --ref option.")
if len(options.outfile) == 0: parser.error("Missing output file name, use -o or --out option.")
if len(infiles) == 0: parser.error("Require at least one BSMAP_MAPPING_FILE.") 
if any(options.chroms): options.chroms = options.chroms.split(',')
CT_SNP_val = {"no-action": 0, "correct": 1, "skip": 2}
try: options.CT_SNP = CT_SNP_val[options.CT_SNP.lower()]
except: parser.error('Invalid -i value, select "no-action", "correct" or "skip"')
if options.min_depth <= 0: parser.error('Invalid -m value, must >= 1')
if options.trim_fillin < 0: parser.error('Invalid -t value, must >= 0')

if any(options.sam_path): 
    if options.sam_path[-1] != '/': options.sam_path += '/'

def disp(txt, nt=0):
    if not options.quiet: print >> sys.stderr, ''.join(['\t' for i in xrange(nt)]+['@ ',time.asctime(),': ',txt])

def get_alignment(line):
    col = line.split('\t')
    if sam_format:
        flag = col[1] 
        if 'u' in flag: return []
        if options.unique and 's' in flag: return []
        if options.pair and 'P' not in flag: return []
        cr, pos, cigar, seq, strand, insert = col[2], int(col[3])-1, col[5], col[9], '', int(col[8])
        if cr not in options.chroms: return []
        for aux in col[11:]:
            if aux[:5] == 'ZS:Z:': 
                strand = aux[5:7]
                break
        assert strand, 'missing strand information "ZS:Z:xx"'
        gap_pos, gap_size = 0, 0
        while 'I' in cigar or 'D' in cigar:
            for sep in 'MID':
                try: gap_size = int(cigar.split(sep, 1)[0])
                except ValueError: continue
                break
            if sep == 'M': gap_pos += gap_size
            elif sep == 'I': seq = seq[:gap_pos] + seq[gap_pos+gap_size:]
            elif sep == 'D': 
                seq = seq[:gap_pos] + '-' * gap_size + seq[gap_pos:]                        
                gap_pos += gap_size
            cigar = cigar[cigar.index(sep)+1:]
    else:
        flag = col[3][:2]
        if flag == 'NM' or flag == 'QC': return []
        if options.unique and flag != 'UM': return []
        if options.pair and col[7] == '0': return []
        seq, strand, cr, pos, insert, mm = col[1], col[6], col[4], int(col[5])-1, int(col[7]), col[9]
        if cr not in options.chroms: return []
        if ':' in mm:
            tmp = mm.split(':')
            gap_pos, gap_size = int(tmp[1]), int(tmp[2])
            if gap_size < 0: seq = seq[:gap_pos] + seq[gap_pos-gap_size:]  # insertion on reference
            else: seq = seq[:gap_pos] + '-' * gap_size + seq[gap_pos:]
    if pos + len(seq) >= len(ref[cr]): return []
    if options.rm_dup:  # remove duplicate hits
        if strand == '+-' or strand == '-+': frag_end, direction = pos+len(seq), 2
        else: frag_end, direction = pos, 1
        if coverage[cr][frag_end] & direction: return []
        coverage[cr][frag_end] |= direction
    if options.trim_fillin > 0: # trim fill in nucleotides
        if strand == '+-': seq = seq[:-options.trim_fillin]
        elif strand == '--': seq, pos = seq[options.trim_fillin:], pos+options.trim_fillin
        elif insert != 0 and len(seq) > abs(insert) - options.trim_fillin:
            trim_nt = len(seq) - (abs(insert) - options.trim_fillin)
            if strand == '++': seq = seq[:-trim_nt]
            elif strand == '-+': seq, pos =seq[trim_nt:], pos+trim_nt
    if sam_format and insert > 0: seq = seq[:int(col[7])-1-pos] # remove overlapped regions in paired hits, SAM format only
    return (seq, strand[0], cr, pos)

ref, cr, seq = {}, '', ''
disp('reading reference %s ...' % options.reffile)
rfile = subprocess.Popen(["hadoop", "fs", "-cat", "/usr/bin/hadoop/"+options.reffile], stdout=subprocess.PIPE)
for line in rfile.stdout:
#    print line
#for line in open(options.reffile):
    if line[0] == '>': 
        if any(cr): 
            if len(options.chroms) == 0 or cr in options.chroms: ref[cr] = seq.upper()
        cr, seq = line[1:-1].split()[0], ''
    else: seq += line.strip()

if len(options.chroms) == 0 or cr in options.chroms: ref[cr] = seq.upper()
del seq

meth, depth, coverage, meth1, depth1 = {}, {}, {}, {}, {}
for cr in ref:
    meth[cr] = array.array('H', [0]) * len(ref[cr])
    depth[cr] = array.array('H', [0]) * len(ref[cr])
    if options.rm_dup: coverage[cr] = array.array('B', [0]) * len(ref[cr])
    if options.CT_SNP > 0:
        meth1[cr] = array.array('H', [0]) * len(ref[cr])
        depth1[cr] = array.array('H', [0]) * len(ref[cr])

options.chroms = set(ref.keys())

BS_conversion = {'+': ('C','T','G','A'), '-': ('G','A','C','T')}
nmap = 0
print 'before hadoop'
hadoop = subprocess.Popen(["hadoop", "fs", "-cat", "/usr/bin/hadoop/"+infiles], stdout=subprocess.PIPE)
print 'after hadoop'
for infile in hadoop.stdout:
    print 'inside for-loop, infile is:', infile
    nline = 0
    if infile[-4:].upper() == '.SAM': 
        sam_format = True
        cmd = '%ssamtools view -XS %s' % (options.sam_path, infile)
        print 'calling os.popen, cmd is:', cmd
        fin = os.popen(cmd) 
    elif infile[-4:].upper() == '.BAM': 
        sam_format = True
        cmd = '%ssamtools view -X %s' % (options.sam_path, infile)
        print 'calling os.popen, cmd is:', cmd
        fin = os.popen(cmd)
    else: 
        sam_format = False
        print 'before calling hadoop second time'
        fin = subprocess.Popen(["hadoop", "fs", "-cat", "/usr/bin/hadoop/"+infiles], stdout=subprocess.PIPE)
        print 'after calling hadoop second time'
        for line in fin:
            print 'inside inner for-loop, line is:', line
        nline += 1
        if nline % 10000000 == 0: disp('read %d lines' % nline, nt=1)
        map_info = get_alignment(line)
        if len(map_info) == 0: continue
        seq, strand, cr, pos = map_info
        depthcr = depth[cr]
        if pos + len(seq) > len(depthcr): continue
        nmap += 1
        methcr = meth[cr]
        refseq = ref[cr][pos:pos+len(seq)]
        match, convert, rc_match, rc_convert = BS_conversion[strand]
        index = refseq.find(match)
        while index >= 0:
            if depthcr[pos+index] < 65535:
                if seq[index] == convert: depthcr[pos+index] += 1
                elif seq[index] == match: 
                    methcr[pos+index] += 1
                    depthcr[pos+index] += 1
            index = refseq.find(match, index+1)
        if options.CT_SNP == 0: continue
        methcr1 = meth1[cr]
        depthcr1 = depth1[cr]
        index = refseq.find(rc_match)
        while index >= 0:
            if depthcr1[pos+index] < 65535:
                if seq[index] == rc_convert: depthcr1[pos+index] += 1
                if seq[index] == rc_match: 
                    methcr1[pos+index] += 1
                    depthcr1[pos+index] += 1
            index = refseq.find(rc_match, index+1)

    fin.close()

if options.combine_CpG:
    disp('combining CpG methylation from both strands ...')
    for cr in depth:
        depthcr, methcr, refcr = depth[cr], meth[cr], ref[cr]
        if options.CT_SNP > 0: depthcr1, methcr1 = depth1[cr], meth1[cr]
        pos = refcr.find('CG')
        while pos >= 0:
            if depthcr[pos] + depthcr[pos+1] <= 65535:
                depthcr[pos] += depthcr[pos+1]
                methcr[pos] += methcr[pos+1]
            else:
                depthcr[pos] = (depthcr[pos] + depthcr[pos+1]) / 2
                methcr[pos] = (methcr[pos] + methcr[pos+1]) / 2
            depthcr[pos+1] = 0
            methcr[pos+1] = 0
            if options.CT_SNP > 0:
                if depthcr1[pos] + depthcr1[pos+1] <= 65535:
                    depthcr1[pos] += depthcr1[pos+1]
                    methcr1[pos] += methcr1[pos+1]
                else:
                    depthcr1[pos] = (depthcr1[pos] + depthcr1[pos+1]) / 2
                    methcr1[pos] = (methcr1[pos] + methcr1[pos+1]) / 2
            pos = refcr.find('CG', pos+2)

disp('writing %s ...' % options.outfile)
ss = {'C': '+', 'G': '-'}
fout = open(options.outfile, 'w')
if not options.no_header: 
    fout.write('chr\tpos\tstrand\tcontext\tratio\teff_CT_count\tC_count\tCT_count\trev_G_count\trev_GA_count\tCI_lower\tCI_upper\n')
z95, z95sq = 1.96, 1.96 * 1.96
nc, nd, dep0 = 0, 0, options.min_depth
for cr in sorted(depth.keys()):
    depthcr, methcr, refcr = depth[cr], meth[cr], ref[cr]
    if options.CT_SNP > 0: depthcr1, methcr1 = depth1[cr], meth1[cr]
    for i, dd in enumerate(depthcr):
        if dd < dep0: continue
        if options.CT_SNP > 0: 
            m1, d1 = methcr1[i], depthcr1[i]
            if m1 != d1:
                if options.CT_SNP == 2: continue
                d = float(dd) * m1 / d1
            else: d = float(dd)
        else: d = float(dd)
        nc += 1
        nd += d
        m = methcr[i]
        if m == 0 and not options.meth0: continue
        seq = refcr[i-2:i+3]
        try: strand = ss[refcr[i]]
        except KeyError: continue
        try: ratio = float(min(m,d)) / d
        except ZeroDivisionError:
            if options.CT_SNP:
                fout.write('%s\t%d\t%c\t%s\tNA\t%.2f\t%d\t%d\t%d\t%d\tNA\tNA\n' % (cr, i+1, strand, seq, d, m, dd, m1, d1))
            else:
                fout.write('%s\t%d\t%c\t%s\tNA\t%.2f\t%d\t%d\tNA\tNA\tNA\tNA\n' % (cr, i+1, strand, seq, d, m, dd))
            continue     
        pmid = ratio + z95sq / (2 * d)
        sd = z95 * ((ratio*(1-ratio)/d + z95sq/(4*d*d)) ** 0.5)
        norminator = 1 + z95sq / d
        CIl, CIu = (pmid - sd) / norminator, (pmid + sd) / norminator
        if options.CT_SNP:
            fout.write('%s\t%d\t%c\t%s\t%.3f\t%.2f\t%d\t%d\t%d\t%d\t%.3f\t%.3f\n' 
                % (cr, i+1, strand, seq, ratio, d, m, dd, m1, d1, CIl, CIu))
        else:
            fout.write('%s\t%d\t%c\t%s\t%.3f\t%.2f\t%d\t%d\tNA\tNA\t%.3f\t%.3f\n'
                % (cr, i+1, strand, seq, ratio, d, m, dd, CIl, CIu))

fout.close()
disp('done.')
if nc > 0: print 'total %d valid mappings, %d covered cytosines, average coverage: %.2f fold.' % (nmap, nc, float(nd)/nc)


The hadoop command to run I used :



 hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar -libjars /usr/local/hadoop-bam/hadoop-bam-7.0.0-jar-with-dependencies.jar  -D org.apache.hadoop.mapreduce.lib.input.FileInputFormat=org.seqdoop.hadoop_bam.BAMInputFormat -file ./methratio.py -file '../fadata/test.fa' -mapper 'python methratio.py --ref=../fadata/test.fa -r -g --out=bsmap_out_sample1.txt wgEncodeSydhRnaSeqK562Ifna6hPolyaAln.bam' -input ./wgEncodeSydhRnaSeqK562Ifna6hPolyaAln.bam -output ./outfile


The Error I am getting :

5/01/22 15:52:17 INFO mapreduce.Job:  map 0% reduce 0%
15/01/22 15:52:23 INFO mapreduce.Job: Task Id : attempt_1418762215449_0033_m_000016_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

Reply all
Reply to author
Forward
0 new messages