Source code for genomics

# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Small genomics methods packaged for re-use.

genome_data logic has been imported from packages/graphics/.../genome_scale.py and
should eventually be inherited back there.
'''

import os
import sys

from collections import OrderedDict

from labtools import const
from labtools import misc

HG19 = 'ucsc.hg19'
CRCh38 = 'GRCh38.d1.vd1'
HG38 = CRCh38

CHROMOSOME = 'chromosome'

CHRM = 'chrM'
CHRX = 'chrX'
CHRY = 'chrY'
NON_AUTOSOMAL = [CHRM, CHRX, CHRY]

CHROMOSOME_BED_FMT = 'chromosomes.{0}.{1}.bed'
SORTING = 'sorting'
KIND = 'kind'
LABEL = 'label'
START = 'start'
END = 'end'
OFFSET = 'offset'


WHOLE = 'whole'
ARMS = 'by_arms'
LEXSORT = 'lexsort'
NUMSORT = 'numsort'

[docs]def fai_dict(failines): fai = [ll.split(const.TAB) for ll in failines] keys = [] offsets = [] length = 1 for i in range(len(fai)): keys.append(fai[i][0]) offsets.append(length) length += int(fai[i][1]) return OrderedDict(zip(keys, offsets))
FAI_SUFFIXES = ('.fa.idx', '.fai', '.fa.fai')
[docs]def read_genome_fai(genome='GRCh38.d1.vd1', location=None): ''' Read a fasta index and make displayable-genome offsets. Default genome is GRCh38.d1.vd1 ''' given = location is not None if not given: location = os.path.join(os.environ['TEAM_ROOT'], 'reference', 'genomics', 'Hsap', genome) gpaths = [os.path.join(location, misc.suffixed_name(genome, sfx)) for sfx in FAI_SUFFIXES] gpaths = [gpath for gpath in gpaths if misc.is_valid_filepath(gpath)[0]] if not gpaths: raise UserWarning('Genome index ending in one of %r not found in %s location %s' % (list((misc.suffixed_name(genome, sfx) for sfx in FAI_SUFFIXES)), 'given' if given else 'default', location)) with open(os.path.join(gpaths[0])) as fai: elements = OrderedDict() for line in fai.readlines(): chrom = line.strip().split(const.TAB, 1)[0] quit = any([chrom.startswith(e+const.UNDERBAR) for e in elements.keys()]) elements[chrom] = line.strip() if quit: # Put the first weird one in as a boundary break return const.NEWLINE.join(list(elements.values()))
[docs]def read_genome_bed(genome='GRCh38.d1.vd1', kind=WHOLE, sorting=LEXSORT, location=None): ''' Read a bedfile and make displayable-genome offsets. Default genome is GRCh38.d1.vd1, default kind is whole, choices: [whole, arms] Default sorting is lexsort, choices: [lexsort, numsort] ''' given = location is not None if not given: location = os.path.join(os.environ['TEAM_ROOT'], 'reference', 'genomics', 'Hsap', genome) gpath = os.path.join(location, misc.suffixed_name(genome, CHROMOSOME_BED_FMT.format(kind, sorting))) if not misc.is_valid_filepath(gpath): raise UserWarning('Genome %s bedfile "%s" not found in %s location %s' % (genome, os.path.basename(gpath), 'given' if given else 'default', location)) elements = OrderedDict() with open(gpath) as bed: hdridx = misc.line_to_dict(bed.readline().replace('#CHR', CHROMOSOME), None) for line in bed.readlines(): linedict = misc.line_to_dict(line, hdridx) linedict[START] = int(linedict[START]) linedict[END] = int(linedict[END]) if kind == WHOLE: elements[linedict[CHROMOSOME]] = linedict else: elements[linedict[LABEL]] = linedict return elements
CHR = 'chr' POS = 'pos' LOCUS = 'Locus' _nullcounter = 0 def _null_ctr(): '''static method to fill out non-chromosomal entries''' global _nullcounter _nullcounter += 1 return _nullcounter
[docs]def sortable_coord(chrom, position=0, split=False, chronly=False): """ Return a string that allows for an ASCII sort, zero-padded so that chromosome 2 follows 1 and not 19, and M, X, and Y follow 22. >>> sortable_coord('chr1:1234', split=True) >>> sortable_coord('chr1', 1234) >>> sortable_coord(line_of_a_vcf.split(const.TAB)[:2] """ prefixed = not isinstance(chrom, int) and chrom.lower().startswith(CHR) if prefixed: chrom = chrom[3:] if split: chrom, position = chrom.split(const.COLON) if isinstance(chrom, int) or chrom.isdigit(): chrom = '%02d' % (int(chrom)) else: chrom = chrom.upper() chrom = const.UNDERBAR + chrom if len(chrom) == 1 else chrom if prefixed: chrom = CHR + chrom if chronly: return chrom elif isinstance(position, int) or position.isdigit(): return '%s:%010d' % (chrom, int(position)) else: return '%s%2s:%010d' % (CHR if prefixed else const.EMPTY, '_Z', _null_ctr())
[docs]def trim_chrom(chrom): if chrom[3:].isdigit(): c = str(int(chrom[3:])) else: c = chrom[3:].replace(const.UNDERBAR, const.EMPTY) return chrom[:3]+c
[docs]def sortable_coord_to_vcf_coord(locus): chrom, loc = locus.split(const.COLON,1) return [trim_chrom(chrom), str(int(loc))]
[docs]def sortable_coord_to_bedfile_coord(locus): chrom, loc = sortable_coord_to_vcf_coord(locus) return [chrom, str(int(loc)-1), loc]
[docs]def sortable_line_to_vcfline(line, sep=const.TAB): line = line.split(sep) return sep.join(sortable_coord_to_vcf_coord(line[0]) + line[1:])
[docs]def sortable_line_to_bedline(line, sep=const.TAB): line = line.split(sep) return sep.join(sortable_coord_to_bedfile_coord(line[0]) + line[1:])
[docs]def normalize_chr_name(chrom): """ Return a string that "normalizes" *chrom* so that the string can be used to act as a key in the CHROMOSOMES dictionary. """ if isinstance(chrom, int): chrom = 'chr%d' % (chrom) elif chrom.lower().startswith('chromosome'): chrom = 'chr%s' % (chrom[10:]) elif chrom.lower().startswith(CHR): chrom = trim_chrom(chrom) else: chrom = 'chr%s' % (chrom) return chrom
[docs]def hundredK_roundup(val): """ Return *val* rounded up to the nearest 100K. """ hundredK = 100000 return hundredK*(1+val//hundredK)
[docs]def chromosome_chunks(chrom, size=3000000000, overlap=0, autosomes=False, build=HG38, location=None): """ Return a list of chunks of *chrom* of size roughly *size+overlap*, suitable for use with *samtools -r* or *bedtools intersect*. If the given *chromosome* is smaller than the desired *size+overlap*, simply return the whole chromosome. If the last chunk would be relatively small compared to *size* (less than 1/3), append it to the previous chunk instead. *autosomes* return empty lists for non-autosomal chromosomes """ genome = genome_data(build, location=location) chrom = normalize_chr_name(chrom) if autosomes and chrom in NON_AUTOSOMAL: return [] chromsize = genome.genome[chrom][END] start = 0 if size >= chromsize: return([chrom]) regions = [] while start < chromsize: if chromsize-(start+size+overlap) < 3*size//10: regions.append('%s:%d-%d' % (chrom, start+1, hundredK_roundup(chromsize))) break regions.append('%s:%d-%d' % (chrom, start+1, start+size+overlap)) start += size return regions
[docs]class genome_data(): ''' An object that holds chromosome lengths and lengths by arm, and admits of several simple access functions to view those data. Default build is HG38 (GRCh38.d1.vd1, in point of fact). Expects the data to reside in $TEAM_ROOT/reference/genomics/Hsap/<BUILDNAME>. Easy enough to fit HG19, but I'd have to get the pq arm data into the correct format. ''' def __init__(self, genome=HG38, for_spacing=False, location=None): self.build = genome self.genome = read_genome_bed(genome, WHOLE, sorting=NUMSORT, location=location) offset = 0 for chrom in self.genome: self.genome[chrom][OFFSET] = offset offset += self.genome[chrom][END] if for_spacing: self.genome['chrZ'] = OrderedDict([('chromosome', 'chrZ'), ('start', 0), ('end', 0), ('label', 'chrZ'), ('offset', offset)]) self.arms = read_genome_bed(genome, ARMS, sorting=NUMSORT) offset = 0 for arm in self.arms: self.arms[arm][OFFSET] = offset + self.arms[arm][START] if arm.endswith('q'): offset += self.genome[arm[:-1]][END] if for_spacing: self.genome['chrZp'] = OrderedDict([('chromosome', 'chrZp'), ('start', 0), ('end', 0), ('label', 'chrZp'), ('offset', offset)])
[docs] def fai_locus(self, loc, pos=0, raise_error=False): ''' Report offset between 0 and genome length of a chromosome or chromosome arm locus plus a position, optionally raising an error if position is off the end of the given segment. >>> 0 == fai_locus('chr1', 0) >>> x = fai_locus('chr1q', 100000) ''' if isinstance(loc, int): loc = str(loc) if not loc.startswith('chr'): loc = 'chr' + loc if loc in self.genome: map = self.genome else: if loc in self.arms: map = self.arms else: if raise_error: raise LabtoolsWarning('Locus "%s" not found among chromosomes or arms' % (loc)) else: return None locus = map[loc][OFFSET] + pos if raise_error and locus > map[loc][END]: raise LabtoolsWarning('Locus %s:%d exceeds %s boundary' % (loc, pos, 'arm' if map == self.arms else 'chromosome')) return min(locus, map[loc][OFFSET] + map[loc][END])
[docs] def chromosome_length(self, chrom): ''' Report the length of requested chromosome ''' if chrom not in self.genome: raise LabtoolsWarning('Chromosome %s not found among %s' % (chrom, misc.elide_list(list(self.genome.keys())))) return self.genome[chrom][END]
[docs] def arm(self, arm): if arm not in self.arms: raise LabtoolsWarning('Chromosome arm %s not found among %s' % (arm, misc.elide_list(list(self.arms.keys())))) thisarm = self.arms[arm] return [thisarm[CHROMOSOME], thisarm[START]-1, thisarm[END]]
[docs] def arm_length(self, arm): ''' Report the length of requested chromosome arm ''' if arm not in self.arms: return arm[:-1] thisarm = self.arms[arm] return thisarm[END] - thisarm[START] + 1
#print(arm('1p')) #print(arm('1q')) #print(arm('13p')) #print(arm('Xq'))