Source code for phylogenetics

# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Tools to deal with phylogenetic stuff
'''

import os
import sys

from collections import OrderedDict, defaultdict
import subprocess

from labtools import const
from labtools import misc
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection
from labtools import genomics
from labtools import set_operations as setops
from labtools import scipy_operations as setops2
from labtools.set_operations import MEET_ONLY, FULL_GRAPH

DISTANCE = 'distance'
OUTTREE = 'outtree'
TREE = 'tree'
OUTFILE = 'outfile'
ANNOTATIONS = 'annotations'
PHYLIP = 'phylip'

STANDARD_MAPPING = dict({
        'meet' : 'normal',
        'join' : 'aberrant'
    })
STANDARD = 'standard'

class DistanceSet():
    '''
    A class that takes a dataframe full of values (all presumed one or zero), uses the columns as
    arrays to be fed nXn to a distance function. default function is jaccard distance.

    >>> j = DistanceSet('setname', dataframe, options...)

    That distance function is then used to run phylip and build 
    an unrooted categorization tree...

    >>> j.write_matrix()
    >>> j.run_phylip(['fitch' | 'kitsch' | 'neighbor'], options...)

    and finally render the tree as a png using the ETE tree renderer...

    >>> j.write_annotation(options...)
    >>> png = j.draw_tree(options...)

    meetjoin can be empty or one or both of {}

    distance_function can either be a function of two arrays that returns a float or the name of
    a method found in the scipy.distance module. Default is scipy.distance.jaccard

    phylipdir is the working directory for running phylip (which has its own peculiar assumptions
    about input, so must be isolated)

    phylipbase is the path to the phylip installation

    mean is used to normalize circle sizes in tree drawing. Default if None is to calculate
    the mean of the counts in the tree set (this is not nearly as useful as I had imagined).

    circlesize is the size of the circle expressing an average count
    '''.format(FULL_GRAPH)

    def __init__(self, treename, dataframe, meetjoin=(setops.MEET,),
                 distance_function=None, phylipdir=PHYLIP, phylipbase=None,
                 mean=None, circlesize=20, nan_value=0):
        '''
        Build a distance matrix using the columns of the given
        dataframe as entity names and the values as vectors to be
        pairwise compared

        distance_function is jaccard if None

        meetjoin is a tuple containing either 'meet' or 'join' or both.

        The class supports operations including printing, running
        phylip, and drawing a tree.
        '''
        raise UserWarning('labtools.phylogenetics has not been ported to python 3.0')

        self.treename = treename
        self.dataframe = dataframe
        self.meetjoin = meetjoin
        self.phylipdir = phylipdir
        self.distance_function = distance_function
        if phylipbase is None:
            if 'PHYLIP_BASE' not in os.environ:
                raise UserWarning('Either define path to phylip installation as PHYLIP_BASE in environment or pass it as parameter "phylipbase"')
        self.phylipbase = (phylipbase if phylipbase
                           else os.environ['PHYLIP_BASE'])
        self.circlesize = circlesize
        self.mean = mean
        self.members = sorted(dataframe.columns.values.tolist())
        self.files = defaultdict(str)
        self.variants = OrderedDict()
        for member in self.members:
            self.variants[member] = dataframe[member].values
        self.distance = setops2.compute_distance(self.variants,
                                                 self.distance_function,
                                                 nan_value,
                                                 self.meetjoin)

    def write_matrix(self, mapping=STANDARD, phylip=True):
        '''
        write the distance matrix needed by phylip from the distance array computed during init()

        if mapping is provided, use that mapping to rename the members of the set in the matrix file.
        (defaults to mapping MEET to 'normal' and JOIN to 'aberrant'; mapping=None allows no change.

        phylip=False will print the whole name of the sample instead of truncating to 10 characters
        to satisfy phylip.
        '''
        if mapping == STANDARD:
            mapping = STANDARD_MAPPING
        if not os.path.exists(self.phylipdir):
            os.makedirs(self.phylipdir)
        self.files[DISTANCE] = os.path.join(self.phylipdir,
                                            misc.suffixed_name(self.treename, DISTANCE))
        with open(self.files[DISTANCE], const.WRITE) as stream:
            setops.write_distance_matrix(self.variants, self.distance,
                                         stream, mapping, phylip, self.meetjoin)

    def run_phylip(self, tree_caller, distance_runner=None, debug=False):
        '''
        Run the phylip wrapper that does all sorts of niggly little preparation
        to run the phylip executable.

        tree_caller is the method used to arrange the tree, one of fitch, kitsch, or neighbor.

        distance_runner is the name of the phylip wrapper, normally found in VARIANT_BIN in the TEAM_ROOT
        '''
        if distance_runner is None:
            if 'VARIANT_BIN' not in os.environ:
                raise UserWarning('Either define path to directory containing phylip_runner.py as VARIANT_BIN in environment or pass the executable as parameter "distance_runner"')
            distance_runner = os.path.join(os.environ['VARIANT_BIN'], 'phylip_runner.py')
        prefix = os.path.join(self.phylipdir, self.treename)
        if not os.path.exists(prefix):
            os.makedirs(prefix)
        PHYLIP = '''{distance_runner} \
        --phylip_base {phylip_base} \
        --phylip_command {tree_caller} \
        --stdin Y \
        --rundir {outprefix} \
        --infile {outprefix}.distance \
        --outprefix {outprefix} \
        --outputs outfile outtree \
        --same_system'''.format(distance_runner=distance_runner,
                                phylip_base=self.phylipbase,
                                tree_caller=tree_caller,
                                treename=self.treename,
                                outprefix=prefix).split()
        if debug:
            PHYLIP.append('--nocleanup')
        if debug:
            print(const.SPACE.join(PHYLIP))
        try:
            stdout = subprocess.check_output(PHYLIP, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as cpe:
            print(cpe)
        self.files[OUTFILE] = misc.suffixed_name(prefix, OUTFILE)
        self.files[OUTTREE] = misc.suffixed_name(prefix, OUTTREE)
        return self.files[OUTTREE]


    def write_annotation(self, annodict=None, typingfunc=None, mean=None, dir=None, circlesize=12):
        '''
        Prepare an annotation file that can be used to enhance the tree drawing. See the static method for details.
        '''
        if typingfunc is None:
            def typingfunc(term):
                # this works for Kadara FieldEffect samples, with form AIR_NNN-Type
                return term[8]
        dir = dir if dir else self.phylipdir
        file = write_annotation(self.treename, self.members, self.variants, typingfunc,
                                annodict, mean, dir, circlesize)
        self.files[ANNOTATIONS] = file


    def draw_tree(self, annotations=None, dir=None, fileprefix=None, debug=False):
        '''
        Use the ETE tree drawing package, thanks to Jaime Huerta-Cepas.
        '''
        annofile = self.files[ANNOTATIONS] if annotations is None else annotations
        if not os.path.exists(annofile):
            raise UserWarning('"%s" specified %s is not a file.%s' % 
                              (annofile, 'on command line' if annotations else 'in DistanceSet object',
                               const.EMPTY if annotations else ' Run DistanceSet.write_annotations()?'))
        dir = dir if dir else self.phylipdir
        title = self.treename if not fileprefix else const.DASH.join([fileprefix, self.treename])
        prefix = os.path.join(dir, title)
        return draw_tree(title, self.members, self.files[OUTTREE], annofile,
                         
                         circlesize=self.circlesize, dir=dir, debug=debug)


[docs]def write_annotation(treename, members, variants, typingfunc, annodict=None, mean=None, dir=const.DOT, circlesize=12): ''' Prepare an annotation file that can be used to enhance the tree drawing. annodict is a dictionary of annotations to be fed to the ETE tree renderer by draw_tree, see that method. The default annotation dictionary is hopelessly associated with the Kadara lung field effect project. typingfunc is function that classifies samples into classes for coloring, etc. mean is used to normalize sample numbers, defaults to calculating the mean of the samples in the set. dir is the place to write the annotation file, default cwd. circlesize is the default size of a sphere/circle/square ''' import math import numpy NODESIZE = 'nodesize' TEXT = 'text' SAMPLE = 'sample' if annodict is None: ANNOTATION_HDR = 'sample nodecolor nodetype text nodesize textcolor alpha'.split() annodict = { 'T' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'crimson', 'sphere', None, None, 'white', .6]))), 'C' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'fuchsia', 'sphere', None, None, 'white', .5]))), 'S' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'royalblue', 'sphere', None, None, 'white', .4]))), 'L' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'deepskyblue', 'sphere', None, None, 'white', .5]))), 'N' : OrderedDict(list(zip(ANNOTATION_HDR, [None, 'seagreen', 'sphere', None, None, 'white', .5]))), } if annodict: ANNOTATION_HDR = list(annodict.values())[0].keys() if not os.path.exists(dir): os.makedirs(dir) filename = os.path.join(dir, misc.suffixed_name(treename, ANNOTATIONS)) sizeidx = NODESIZE if NODESIZE in ANNOTATION_HDR else None textidx = TEXT if TEXT in ANNOTATION_HDR else None if mean is None: mean = numpy.mean([variants[member].sum() for member in members]) with open(filename, const.WRITE) as stream: stream.write('%s\n' % const.TAB.join(ANNOTATION_HDR)) for member in members: mdict = annodict[typingfunc(member)] mdict[SAMPLE] = member vars = variants[member].sum() if textidx is not None: mdict[textidx] = str(int(round(vars))) if sizeidx is not None: mdict[sizeidx] = circlesize*math.sqrt(max(1,vars)/mean) stream.write('%s\n' % const.TAB.join([str(v) for v in list(mdict.values())])) return filename
[docs]def draw_tree(title, members, treefile, annofile, circlesize=12, dir=const.DOT, debug=False): ''' Curry up arguments to drive a python script that uses the ETE tree drawing package, many thanks to Jaime Huerta-Cepas and SciPy 2015. title is used as a label for the image and a prefix for the filename members is the set of sample names treefile is the phylip output, or any newick-format tree file. annofile is the file of optional node annotations circlesize is the default size of a sphere/circle/square dir is the place to write the annotation file, default cwd. ''' if not os.path.exists(annofile): raise UserWarning('"%s" specified %s is not a file.%s' % (annofile, 'on command line' if annotations else 'in DistanceSet object', const.EMPTY if annotations else ' Run DistanceSet.write_annotations()?')) if not os.path.exists(dir): os.makedirs(dir) prefix = os.path.join(dir, title) formatdict = dict({ 'graphics_bin' : os.environ['GRAPHICS_BIN'], 'title' : title, 'prefix' : prefix, 'sample' : const.SPACE.join(members), 'nodecolor' : 'steelblue', 'nodesize' : str(circlesize), 'textcolor' : 'gray', 'fontsize' : '14', 'treefile' : treefile, 'annotationfile' : annofile }) ETE = '''{graphics_bin}/draw_phylogenetic_tree.py --title {title} --kind tree --output {prefix}.png --sample {sample} --alpha 0.5 --nodecolor steelblue --nodesize {nodesize} --textcolor {textcolor} --fontsize {fontsize} --treefile {treefile} --annotation {annotationfile}''' command = ETE.format(**formatdict).split() if debug: print(const.SPACE.join(command)) tree = misc.suffixed_name(prefix, 'png') try: stdout = subprocess.check_output(command) return tree except subprocess.CalledProcessError as cpe: print('Failure:\n'+const.SPACE.join(command)) return 'Failed %s' % (tree)