# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Tools to deal with phylogenetic stuff
'''
import os
import sys
from collections import OrderedDict, defaultdict
import subprocess
from labtools import const
from labtools import misc
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection
from labtools import genomics
from labtools import set_operations as setops
from labtools import scipy_operations as setops2
from labtools.set_operations import MEET_ONLY, FULL_GRAPH
DISTANCE = 'distance'
OUTTREE = 'outtree'
TREE = 'tree'
OUTFILE = 'outfile'
ANNOTATIONS = 'annotations'
PHYLIP = 'phylip'
STANDARD_MAPPING = dict({
'meet' : 'normal',
'join' : 'aberrant'
})
STANDARD = 'standard'
class DistanceSet():
'''
A class that takes a dataframe full of values (all presumed one or zero), uses the columns as
arrays to be fed nXn to a distance function. default function is jaccard distance.
>>> j = DistanceSet('setname', dataframe, options...)
That distance function is then used to run phylip and build
an unrooted categorization tree...
>>> j.write_matrix()
>>> j.run_phylip(['fitch' | 'kitsch' | 'neighbor'], options...)
and finally render the tree as a png using the ETE tree renderer...
>>> j.write_annotation(options...)
>>> png = j.draw_tree(options...)
meetjoin can be empty or one or both of {}
distance_function can either be a function of two arrays that returns a float or the name of
a method found in the scipy.distance module. Default is scipy.distance.jaccard
phylipdir is the working directory for running phylip (which has its own peculiar assumptions
about input, so must be isolated)
phylipbase is the path to the phylip installation
mean is used to normalize circle sizes in tree drawing. Default if None is to calculate
the mean of the counts in the tree set (this is not nearly as useful as I had imagined).
circlesize is the size of the circle expressing an average count
'''.format(FULL_GRAPH)
def __init__(self, treename, dataframe, meetjoin=(setops.MEET,),
distance_function=None, phylipdir=PHYLIP, phylipbase=None,
mean=None, circlesize=20, nan_value=0):
'''
Build a distance matrix using the columns of the given
dataframe as entity names and the values as vectors to be
pairwise compared
distance_function is jaccard if None
meetjoin is a tuple containing either 'meet' or 'join' or both.
The class supports operations including printing, running
phylip, and drawing a tree.
'''
raise UserWarning('labtools.phylogenetics has not been ported to python 3.0')
self.treename = treename
self.dataframe = dataframe
self.meetjoin = meetjoin
self.phylipdir = phylipdir
self.distance_function = distance_function
if phylipbase is None:
if 'PHYLIP_BASE' not in os.environ:
raise UserWarning('Either define path to phylip installation as PHYLIP_BASE in environment or pass it as parameter "phylipbase"')
self.phylipbase = (phylipbase if phylipbase
else os.environ['PHYLIP_BASE'])
self.circlesize = circlesize
self.mean = mean
self.members = sorted(dataframe.columns.values.tolist())
self.files = defaultdict(str)
self.variants = OrderedDict()
for member in self.members:
self.variants[member] = dataframe[member].values
self.distance = setops2.compute_distance(self.variants,
self.distance_function,
nan_value,
self.meetjoin)
def write_matrix(self, mapping=STANDARD, phylip=True):
'''
write the distance matrix needed by phylip from the distance array computed during init()
if mapping is provided, use that mapping to rename the members of the set in the matrix file.
(defaults to mapping MEET to 'normal' and JOIN to 'aberrant'; mapping=None allows no change.
phylip=False will print the whole name of the sample instead of truncating to 10 characters
to satisfy phylip.
'''
if mapping == STANDARD:
mapping = STANDARD_MAPPING
if not os.path.exists(self.phylipdir):
os.makedirs(self.phylipdir)
self.files[DISTANCE] = os.path.join(self.phylipdir,
misc.suffixed_name(self.treename, DISTANCE))
with open(self.files[DISTANCE], const.WRITE) as stream:
setops.write_distance_matrix(self.variants, self.distance,
stream, mapping, phylip, self.meetjoin)
def run_phylip(self, tree_caller, distance_runner=None, debug=False):
'''
Run the phylip wrapper that does all sorts of niggly little preparation
to run the phylip executable.
tree_caller is the method used to arrange the tree, one of fitch, kitsch, or neighbor.
distance_runner is the name of the phylip wrapper, normally found in VARIANT_BIN in the TEAM_ROOT
'''
if distance_runner is None:
if 'VARIANT_BIN' not in os.environ:
raise UserWarning('Either define path to directory containing phylip_runner.py as VARIANT_BIN in environment or pass the executable as parameter "distance_runner"')
distance_runner = os.path.join(os.environ['VARIANT_BIN'], 'phylip_runner.py')
prefix = os.path.join(self.phylipdir, self.treename)
if not os.path.exists(prefix):
os.makedirs(prefix)
PHYLIP = '''{distance_runner} \
--phylip_base {phylip_base} \
--phylip_command {tree_caller} \
--stdin Y \
--rundir {outprefix} \
--infile {outprefix}.distance \
--outprefix {outprefix} \
--outputs outfile outtree \
--same_system'''.format(distance_runner=distance_runner,
phylip_base=self.phylipbase,
tree_caller=tree_caller,
treename=self.treename,
outprefix=prefix).split()
if debug:
PHYLIP.append('--nocleanup')
if debug:
print(const.SPACE.join(PHYLIP))
try:
stdout = subprocess.check_output(PHYLIP, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as cpe:
print(cpe)
self.files[OUTFILE] = misc.suffixed_name(prefix, OUTFILE)
self.files[OUTTREE] = misc.suffixed_name(prefix, OUTTREE)
return self.files[OUTTREE]
def write_annotation(self, annodict=None, typingfunc=None, mean=None, dir=None, circlesize=12):
'''
Prepare an annotation file that can be used to enhance the tree drawing. See the static method for details.
'''
if typingfunc is None:
def typingfunc(term):
# this works for Kadara FieldEffect samples, with form AIR_NNN-Type
return term[8]
dir = dir if dir else self.phylipdir
file = write_annotation(self.treename, self.members, self.variants, typingfunc,
annodict, mean, dir, circlesize)
self.files[ANNOTATIONS] = file
def draw_tree(self, annotations=None, dir=None, fileprefix=None, debug=False):
'''
Use the ETE tree drawing package, thanks to Jaime Huerta-Cepas.
'''
annofile = self.files[ANNOTATIONS] if annotations is None else annotations
if not os.path.exists(annofile):
raise UserWarning('"%s" specified %s is not a file.%s' %
(annofile, 'on command line' if annotations else 'in DistanceSet object',
const.EMPTY if annotations else ' Run DistanceSet.write_annotations()?'))
dir = dir if dir else self.phylipdir
title = self.treename if not fileprefix else const.DASH.join([fileprefix, self.treename])
prefix = os.path.join(dir, title)
return draw_tree(title, self.members, self.files[OUTTREE], annofile,
circlesize=self.circlesize, dir=dir, debug=debug)
[docs]def write_annotation(treename, members, variants, typingfunc,
annodict=None, mean=None, dir=const.DOT, circlesize=12):
'''
Prepare an annotation file that can be used to enhance the tree drawing.
annodict is a dictionary of annotations to be fed to the ETE tree
renderer by draw_tree, see that method. The default annotation
dictionary is hopelessly associated with the Kadara lung field
effect project.
typingfunc is function that classifies samples into classes for coloring, etc.
mean is used to normalize sample numbers, defaults to calculating
the mean of the samples in the set.
dir is the place to write the annotation file, default cwd.
circlesize is the default size of a sphere/circle/square
'''
import math
import numpy
NODESIZE = 'nodesize'
TEXT = 'text'
SAMPLE = 'sample'
if annodict is None:
ANNOTATION_HDR = 'sample nodecolor nodetype text nodesize textcolor alpha'.split()
annodict = {
'T' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'crimson', 'sphere', None, None, 'white', .6]))),
'C' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'fuchsia', 'sphere', None, None, 'white', .5]))),
'S' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'royalblue', 'sphere', None, None, 'white', .4]))),
'L' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'deepskyblue', 'sphere', None, None, 'white', .5]))),
'N' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'seagreen', 'sphere', None, None, 'white', .5]))),
}
if annodict:
ANNOTATION_HDR = list(annodict.values())[0].keys()
if not os.path.exists(dir):
os.makedirs(dir)
filename = os.path.join(dir, misc.suffixed_name(treename, ANNOTATIONS))
sizeidx = NODESIZE if NODESIZE in ANNOTATION_HDR else None
textidx = TEXT if TEXT in ANNOTATION_HDR else None
if mean is None:
mean = numpy.mean([variants[member].sum() for member in members])
with open(filename, const.WRITE) as stream:
stream.write('%s\n' % const.TAB.join(ANNOTATION_HDR))
for member in members:
mdict = annodict[typingfunc(member)]
mdict[SAMPLE] = member
vars = variants[member].sum()
if textidx is not None:
mdict[textidx] = str(int(round(vars)))
if sizeidx is not None:
mdict[sizeidx] = circlesize*math.sqrt(max(1,vars)/mean)
stream.write('%s\n' % const.TAB.join([str(v) for v in list(mdict.values())]))
return filename
[docs]def draw_tree(title, members, treefile, annofile,
circlesize=12, dir=const.DOT, debug=False):
'''
Curry up arguments to drive a python script that uses the ETE tree
drawing package, many thanks to Jaime Huerta-Cepas and SciPy 2015.
title is used as a label for the image and a prefix for the filename
members is the set of sample names
treefile is the phylip output, or any newick-format tree file.
annofile is the file of optional node annotations
circlesize is the default size of a sphere/circle/square
dir is the place to write the annotation file, default cwd.
'''
if not os.path.exists(annofile):
raise UserWarning('"%s" specified %s is not a file.%s' %
(annofile, 'on command line' if annotations else 'in DistanceSet object',
const.EMPTY if annotations else ' Run DistanceSet.write_annotations()?'))
if not os.path.exists(dir):
os.makedirs(dir)
prefix = os.path.join(dir, title)
formatdict = dict({
'graphics_bin' : os.environ['GRAPHICS_BIN'],
'title' : title,
'prefix' : prefix,
'sample' : const.SPACE.join(members),
'nodecolor' : 'steelblue',
'nodesize' : str(circlesize),
'textcolor' : 'gray',
'fontsize' : '14',
'treefile' : treefile,
'annotationfile' : annofile
})
ETE = '''{graphics_bin}/draw_phylogenetic_tree.py
--title {title}
--kind tree
--output {prefix}.png
--sample {sample}
--alpha 0.5
--nodecolor steelblue
--nodesize {nodesize}
--textcolor {textcolor}
--fontsize {fontsize}
--treefile {treefile}
--annotation {annotationfile}'''
command = ETE.format(**formatdict).split()
if debug:
print(const.SPACE.join(command))
tree = misc.suffixed_name(prefix, 'png')
try:
stdout = subprocess.check_output(command)
return tree
except subprocess.CalledProcessError as cpe:
print('Failure:\n'+const.SPACE.join(command))
return 'Failed %s' % (tree)