# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Small genomics methods packaged for re-use.
genome_data logic has been imported from packages/graphics/.../genome_scale.py and
should eventually be inherited back there.
'''
import os
import sys
from collections import OrderedDict
from labtools import const
from labtools import misc
HG19 = 'ucsc.hg19'
CRCh38 = 'GRCh38.d1.vd1'
HG38 = CRCh38
CHROMOSOME = 'chromosome'
CHRM = 'chrM'
CHRX = 'chrX'
CHRY = 'chrY'
NON_AUTOSOMAL = [CHRM, CHRX, CHRY]
CHROMOSOME_BED_FMT = 'chromosomes.{0}.{1}.bed'
SORTING = 'sorting'
KIND = 'kind'
LABEL = 'label'
START = 'start'
END = 'end'
OFFSET = 'offset'
WHOLE = 'whole'
ARMS = 'by_arms'
LEXSORT = 'lexsort'
NUMSORT = 'numsort'
[docs]def fai_dict(failines):
fai = [ll.split(const.TAB) for ll in failines]
keys = []
offsets = []
length = 1
for i in range(len(fai)):
keys.append(fai[i][0])
offsets.append(length)
length += int(fai[i][1])
return OrderedDict(zip(keys, offsets))
FAI_SUFFIXES = ('.fa.idx', '.fai', '.fa.fai')
[docs]def read_genome_fai(genome='GRCh38.d1.vd1', location=None):
'''
Read a fasta index and make displayable-genome offsets.
Default genome is GRCh38.d1.vd1
'''
given = location is not None
if not given:
location = os.path.join(os.environ['TEAM_ROOT'], 'reference', 'genomics', 'Hsap', genome)
gpaths = [os.path.join(location, misc.suffixed_name(genome, sfx)) for sfx in FAI_SUFFIXES]
gpaths = [gpath for gpath in gpaths if misc.is_valid_filepath(gpath)[0]]
if not gpaths:
raise UserWarning('Genome index ending in one of %r not found in %s location %s' %
(list((misc.suffixed_name(genome, sfx) for sfx in FAI_SUFFIXES)),
'given' if given else 'default',
location))
with open(os.path.join(gpaths[0])) as fai:
elements = OrderedDict()
for line in fai.readlines():
chrom = line.strip().split(const.TAB, 1)[0]
quit = any([chrom.startswith(e+const.UNDERBAR) for e in elements.keys()])
elements[chrom] = line.strip()
if quit:
# Put the first weird one in as a boundary
break
return const.NEWLINE.join(list(elements.values()))
[docs]def read_genome_bed(genome='GRCh38.d1.vd1', kind=WHOLE, sorting=LEXSORT, location=None):
'''
Read a bedfile and make displayable-genome offsets.
Default genome is GRCh38.d1.vd1, default kind is whole, choices: [whole, arms]
Default sorting is lexsort, choices: [lexsort, numsort]
'''
given = location is not None
if not given:
location = os.path.join(os.environ['TEAM_ROOT'], 'reference', 'genomics', 'Hsap', genome)
gpath = os.path.join(location, misc.suffixed_name(genome, CHROMOSOME_BED_FMT.format(kind, sorting)))
if not misc.is_valid_filepath(gpath):
raise UserWarning('Genome %s bedfile "%s" not found in %s location %s' %
(genome, os.path.basename(gpath), 'given' if given else 'default', location))
elements = OrderedDict()
with open(gpath) as bed:
hdridx = misc.line_to_dict(bed.readline().replace('#CHR', CHROMOSOME), None)
for line in bed.readlines():
linedict = misc.line_to_dict(line, hdridx)
linedict[START] = int(linedict[START])
linedict[END] = int(linedict[END])
if kind == WHOLE:
elements[linedict[CHROMOSOME]] = linedict
else:
elements[linedict[LABEL]] = linedict
return elements
CHR = 'chr'
POS = 'pos'
LOCUS = 'Locus'
_nullcounter = 0
def _null_ctr():
'''static method to fill out non-chromosomal entries'''
global _nullcounter
_nullcounter += 1
return _nullcounter
[docs]def sortable_coord(chrom, position=0, split=False, chronly=False):
"""
Return a string that allows for an ASCII sort, zero-padded so that chromosome 2 follows 1 and not 19,
and M, X, and Y follow 22.
>>> sortable_coord('chr1:1234', split=True)
>>> sortable_coord('chr1', 1234)
>>> sortable_coord(line_of_a_vcf.split(const.TAB)[:2]
"""
prefixed = not isinstance(chrom, int) and chrom.lower().startswith(CHR)
if prefixed:
chrom = chrom[3:]
if split:
chrom, position = chrom.split(const.COLON)
if isinstance(chrom, int) or chrom.isdigit():
chrom = '%02d' % (int(chrom))
else:
chrom = chrom.upper()
chrom = const.UNDERBAR + chrom if len(chrom) == 1 else chrom
if prefixed:
chrom = CHR + chrom
if chronly:
return chrom
elif isinstance(position, int) or position.isdigit():
return '%s:%010d' % (chrom, int(position))
else:
return '%s%2s:%010d' % (CHR if prefixed else const.EMPTY, '_Z', _null_ctr())
[docs]def trim_chrom(chrom):
if chrom[3:].isdigit():
c = str(int(chrom[3:]))
else:
c = chrom[3:].replace(const.UNDERBAR, const.EMPTY)
return chrom[:3]+c
[docs]def sortable_coord_to_vcf_coord(locus):
chrom, loc = locus.split(const.COLON,1)
return [trim_chrom(chrom), str(int(loc))]
[docs]def sortable_coord_to_bedfile_coord(locus):
chrom, loc = sortable_coord_to_vcf_coord(locus)
return [chrom, str(int(loc)-1), loc]
[docs]def sortable_line_to_vcfline(line, sep=const.TAB):
line = line.split(sep)
return sep.join(sortable_coord_to_vcf_coord(line[0]) + line[1:])
[docs]def sortable_line_to_bedline(line, sep=const.TAB):
line = line.split(sep)
return sep.join(sortable_coord_to_bedfile_coord(line[0]) + line[1:])
[docs]def normalize_chr_name(chrom):
"""
Return a string that "normalizes" *chrom* so that the string can be used
to act as a key in the CHROMOSOMES dictionary.
"""
if isinstance(chrom, int):
chrom = 'chr%d' % (chrom)
elif chrom.lower().startswith('chromosome'):
chrom = 'chr%s' % (chrom[10:])
elif chrom.lower().startswith(CHR):
chrom = trim_chrom(chrom)
else:
chrom = 'chr%s' % (chrom)
return chrom
[docs]def hundredK_roundup(val):
"""
Return *val* rounded up to the nearest 100K.
"""
hundredK = 100000
return hundredK*(1+val//hundredK)
[docs]def chromosome_chunks(chrom, size=3000000000, overlap=0, autosomes=False, build=HG38, location=None):
"""
Return a list of chunks of *chrom* of size roughly *size+overlap*, suitable
for use with *samtools -r* or *bedtools intersect*.
If the given *chromosome* is smaller than the desired *size+overlap*, simply return the
whole chromosome.
If the last chunk would be relatively small compared to *size* (less than 1/3),
append it to the previous chunk instead.
*autosomes* return empty lists for non-autosomal chromosomes
"""
genome = genome_data(build, location=location)
chrom = normalize_chr_name(chrom)
if autosomes and chrom in NON_AUTOSOMAL:
return []
chromsize = genome.genome[chrom][END]
start = 0
if size >= chromsize:
return([chrom])
regions = []
while start < chromsize:
if chromsize-(start+size+overlap) < 3*size//10:
regions.append('%s:%d-%d' % (chrom, start+1, hundredK_roundup(chromsize)))
break
regions.append('%s:%d-%d' % (chrom, start+1, start+size+overlap))
start += size
return regions
[docs]class genome_data():
'''
An object that holds chromosome lengths and lengths by arm, and admits of several simple access functions
to view those data. Default build is HG38 (GRCh38.d1.vd1, in point of fact). Expects the data to reside
in $TEAM_ROOT/reference/genomics/Hsap/<BUILDNAME>. Easy enough to fit HG19, but I'd have to get the pq arm
data into the correct format.
'''
def __init__(self, genome=HG38, for_spacing=False, location=None):
self.build = genome
self.genome = read_genome_bed(genome, WHOLE, sorting=NUMSORT, location=location)
offset = 0
for chrom in self.genome:
self.genome[chrom][OFFSET] = offset
offset += self.genome[chrom][END]
if for_spacing:
self.genome['chrZ'] = OrderedDict([('chromosome', 'chrZ'), ('start', 0),
('end', 0), ('label', 'chrZ'),
('offset', offset)])
self.arms = read_genome_bed(genome, ARMS, sorting=NUMSORT)
offset = 0
for arm in self.arms:
self.arms[arm][OFFSET] = offset + self.arms[arm][START]
if arm.endswith('q'):
offset += self.genome[arm[:-1]][END]
if for_spacing:
self.genome['chrZp'] = OrderedDict([('chromosome', 'chrZp'), ('start', 0),
('end', 0), ('label', 'chrZp'),
('offset', offset)])
[docs] def fai_locus(self, loc, pos=0, raise_error=False):
'''
Report offset between 0 and genome length of a chromosome or
chromosome arm locus plus a position, optionally raising an
error if position is off the end of the given segment.
>>> 0 == fai_locus('chr1', 0)
>>> x = fai_locus('chr1q', 100000)
'''
if isinstance(loc, int):
loc = str(loc)
if not loc.startswith('chr'):
loc = 'chr' + loc
if loc in self.genome:
map = self.genome
else:
if loc in self.arms:
map = self.arms
else:
if raise_error:
raise LabtoolsWarning('Locus "%s" not found among chromosomes or arms' %
(loc))
else:
return None
locus = map[loc][OFFSET] + pos
if raise_error and locus > map[loc][END]:
raise LabtoolsWarning('Locus %s:%d exceeds %s boundary' %
(loc, pos, 'arm' if map == self.arms else 'chromosome'))
return min(locus, map[loc][OFFSET] + map[loc][END])
[docs] def chromosome_length(self, chrom):
'''
Report the length of requested chromosome
'''
if chrom not in self.genome:
raise LabtoolsWarning('Chromosome %s not found among %s' %
(chrom, misc.elide_list(list(self.genome.keys()))))
return self.genome[chrom][END]
[docs] def arm(self, arm):
if arm not in self.arms:
raise LabtoolsWarning('Chromosome arm %s not found among %s' %
(arm, misc.elide_list(list(self.arms.keys()))))
thisarm = self.arms[arm]
return [thisarm[CHROMOSOME], thisarm[START]-1, thisarm[END]]
[docs] def arm_length(self, arm):
'''
Report the length of requested chromosome arm
'''
if arm not in self.arms:
return arm[:-1]
thisarm = self.arms[arm]
return thisarm[END] - thisarm[START] + 1
#print(arm('1p'))
#print(arm('1q'))
#print(arm('13p'))
#print(arm('Xq'))