Source code for pandas_tools

# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Tools to help with vcf manipulation in pandas
'''



import os
import sys

from collections import OrderedDict, defaultdict
import subprocess

import pandas as pd

from labtools import const
from labtools import misc
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection
from labtools import genomics
from labtools import phylogenetics as phyl
from labtools.phylogenetics import DistanceSet, MEET_ONLY, FULL_GRAPH

def pd_version():
    return pd.__version__

def pd_format(precision=7, width=10, rows=6):
    if precision: pd.set_option('precision', precision)
    if width: pd.set_option('max_colwidth', width)
    if rows: pd.set_option('max_rows', rows)


UTR3 = 'UTR3'
UTR5 = 'UTR5'
EXONIC = 'exonic'
INTRONIC = 'intronic'
NCRNA_INTRONIC = 'ncRNA_intronic'
SPLICING = 'splicing'
NONCODING = const.DOT
INTERGENIC = 'intergenic'
UPSTREAM = 'upstream'
DOWNSTREAM = 'downstream'

NONSYNONYMOUS = 'nonsynonymous SNV'
STOPGAIN = 'stopgain'
STOPLOSS = 'stoploss'
SYNONYMOUS = 'synonymous SNV'
UNKNOWN = 'unknown'

C_OTHER = 'Intron'
C_SYNONYMOUS = 'Synonym'
C_NONSYNONYMOUS = 'Nonsynonym'

def variant_characterization(muttype=None, loctype=None):
    if loctype != EXONIC:
        return C_OTHER

    if muttype in [NONSYNONYMOUS, STOPGAIN, STOPLOSS]:
        return C_NONSYNONYMOUS
    if muttype in [SYNONYMOUS]:
        return C_SYNONYMOUS
    return C_OTHER

A = 'A'
C = 'C'
G = 'G'
T = 'T'
BASES = [A, C, G, T]

_TRANSPOSE = {
    A : T,
    T : A,
    C : G,
    G : C,
    }

[docs]def collect_mapped_items(mlist, mapdict, sep=const.DASH): ''' Build a *sep*-separated string of the sorted unique mappings from *mapdict* for the members of *mlist* sep defaults to a DASH ''' map = [mapdict[m] if m in mapdict else m for m in mlist] return sep.join(sorted(set(map)))
[docs]def boolean_profile(blist): ''' Given a list of values that can be interpreted as booleans, return a string of 1's and 0's, one per value. ''' return const.EMPTY.join(['1' if b else '0' for b in blist])
[docs]def transversion_to_apply(row): ''' Return the transition/transversion for a variant as pulled from a pandas vcf dataframe, for use in >>> df.apply(thismodule.transversion_to_apply, axis=1) ''' if not min([row[b] in BASES for b in ('REF', 'ALT')]): return const.DOT if row['REF'] in [G, T]: return const.EMPTY.join([_TRANSPOSE[b] for b in [row['REF'], row['ALT']]]) else: return row['REF']+row['ALT']
[docs]def idx_merge(df, df2, **kwargs): ''' Curry out the virtually always used indexing and sorting from dataframe.merge() ''' return df.merge(df2, left_index=True, right_index=True, sort=True, **kwargs)
[docs]def stable_sort_dataframe(df, clist=None, ascending=True): ''' Stable sort of df on the requested columns, all by default. mergesort is the only stable sort available in pandas. ''' if clist is None: clist = df.columns.values.tolist() if pd_version() >= '0.17': df.sort_values(by=clist, inplace=True, kind='mergesort', ascending=ascending) else: df.sort(by=clist, inplace=True, kind='mergesort', ascending=ascending)
[docs]def convert_chrpos_to_locus(df, indexing=None, verify=True): ''' Take a dataframe with CHR and POS columns and change it to a CHR:00POS Locus format, reindexing to include Locus and other columns if desired. indexing=[] indexes by Locus. Otherwise, indexing is a list of column names to add to the Locus for indexing. ''' cols = df.columns.values.tolist() if indexing is not None: # try-block repeated below, sorry, but a numeric index could change during transformation # to Locus-format try: indexing = misc.validate_columns(cols, indexing, complain=True) except ValueError as e: raise LabtoolsWarning('%s(): Your indexing columns are probably wrong - %s' % (reflection.my_methodname(), e.message)) if genomics.POS.upper() not in [c.upper() for c in cols]: raise LabtoolsWarning('%s wants %s or %s in dataframe column list' % (reflection.my_methodname(), genomics.POS, genomics.POS.upper())) poscol = (cols.index(genomics.POS.upper()) if genomics.POS.upper() in cols else cols.index(genomics.POS)) chrcol = poscol - 1 chr = cols[chrcol] pos = cols[poscol] locusfunc = lambda r: genomics.sortable_coord(r[chr], r[pos]) locus = df.apply(locusfunc, axis=1) df.loc[:,genomics.LOCUS] = locus cols = df.columns.values.tolist() df = df[[cols[-1]]+cols[:-1]] df = df.drop([chr, pos], axis=1) try: stable_sort_dataframe(df, clist=[genomics.LOCUS] + (indexing if indexing else [])) except ValueError as e: if 'REF' in e.message: msg = ('%s(): In this pandas you cannot index by REF/ALT - %s' % (reflection.my_methodname(), e.message)) else: msg = ('%s(): Probably trying to index by "categorical" columns - %s' % (reflection.my_methodname(), e.message)) raise LabtoolsWarning(msg) if indexing is not None: try: indexing = misc.validate_columns(cols, indexing, complain=True) df.set_index([genomics.LOCUS]+indexing, inplace=True, verify_integrity=verify) except ValueError as e: raise LabtoolsWarning('%s(): I suspect your indexing columns are wrong - %s' % (reflection.my_methodname(), e.args[0])) return df
[docs]def convert_locus_to_chrpos(df, indexing=None, chromosome=genomics.CHR, verify=True): ''' Take a dataframe with a CHR:00POS Locus format and change it to CHR and POS columns, reindexing if desired. By default (indexing=None) the returned dataframe has only its numeric index. indexing=[] indexes by Locus. Otherwise, indexing is a list of column names or indices to add to the Locus for indexing. ''' cols = df.columns.values.tolist() if indexing is not None: # try-block repeated below, sorry, but a numeric index could change during transformation # from Locus-format try: indexing = misc.validate_columns(cols, indexing, complain=True) except ValueError as e: raise LabtoolsWarning('%s(): Your indexing columns are probably wrong - %s' % (reflection.my_methodname(), e.message)) if genomics.LOCUS.upper() in cols: locuscol = cols.index(genomics.LOCUS.upper()) elif genomics.LOCUS.capitalize() in cols: locuscol = cols.index(genomics.LOCUS.capitalize()) elif genomics.LOCUS in cols: locuscol = cols.index(genomics.LOCUS) else: raise LabtoolsWarning('%s wants %s, %s or %s in dataframe column list' % (reflection.my_methodname(), genomics.LOCUS, genomics.LOCUS.upper(), genomics.LOCUS.capitalize())) locusstr = cols[locuscol] locusfunc = lambda r: genomics.sortable_coord_to_vcf_coord(r[locusstr]) locus = df.apply(locusfunc, axis=1) df.loc[:,chromosome] = [cp[0] for cp in locus] df.loc[:,genomics.POS.upper()] = [cp[1] for cp in locus] cols = df.columns.values.tolist() df = df[cols[-2:]+cols[:-2]] df = df.drop([locusstr], axis=1) try: stable_sort_dataframe(df, clist=[chromosome, genomics.POS.upper()] + (indexing if indexing else [])) except ValueError as e: if 'REF' in e.message: msg = ('%s(): In this pandas you cannot index by REF/ALT - %s' % (reflection.my_methodname(), e.message)) else: msg = ('%s(): Probably trying to index by "categorical" columns - %s' % (reflection.my_methodname(), e.message)) raise LabtoolsWarning(msg) if indexing is not None: try: df.set_index([chromosome, genomics.POS.upper()]+indexing, inplace=True, verify_integrity=verify) except ValueError as e: raise LabtoolsWarning('%s(): I suspect your indexing columns are wrong - %s' % (reflection.my_methodname(), e.message)) return df
[docs]def rows_showing_value(df, filter=lambda x: x > 0, toplimit=0, bottomlimit=None, label='Filtered'): ''' Return a dataframe of the row-labels of df that meet *filter* on the columns between toplimit and bottomlimit, with *label* used as the column name filter is a boolean function that defaults to returning values greater than zero toplimit defaults to the top (first) row bottomlimit defaults to the bottom (last) row label defaults to 'Filtered' ''' raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' % (reflection.my_methodname())) id_values = df.index.values.tolist() if toplimit and not str(toplimit).isdigit(): if toplimit in id_values: toplimit = id_values.index(toplimit) else: raise LabtoolsWarning('toplimit "%s" not found in row index' %(toplimit)) if bottomlimit and not str(bottomlimit).isdigit(): if bottomlimit in id_values: bottomlimit = id_values.index(bottomlimit)+1 else: raise LabtoolsWarning('bottomlimit "%s" not found in row index' %(bottomlimit)) def _rsv_inner_func(row): return [id_values[r] for r in range(toplimit, len(row[:bottomlimit])) if list(filter(row[r]))] return pd.DataFrame(pd.Series(df.apply(_rsv_inner_func), name=label))
[docs]def columns_showing_value(df, filter=lambda x: x > 0, leftlimit=0, rightlimit=None, label='Filtered'): ''' Return a dataframe of the column-labels of df that meet *filter*. This is (no longer) simply a transpose of rows_showing_value, but see it for identical details, modulo c/top/left/ c/bottom/right/. The reason it differs is that transposing a multi-index causes a misbehavior that leads to an exception. (which is probably a pandas bug, but as usual I feel disinclined to delve). ''' raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' % (reflection.my_methodname())) if len(df.index.names) > 1: raise LabtoolsWarning("Cannot handle DataFrame's multi-index. Pandas bug? Index is %r" % (list(df.index.names))) id_values = df.columns.values.tolist() if leftlimit and not str(leftlimit).isdigit(): if leftlimit in id_values: leftlimit = id_values.index(leftlimit) else: raise LabtoolsWarning('leftlimit "%s" not found in column labels' %(leftlimit)) if rightlimit and not str(rightlimit).isdigit(): if rightlimit in id_values: rightlimit = id_values.index(rightlimit)+1 else: raise LabtoolsWarning('rightlimit "%s" not found in column labels' %(rightlimit)) def _csv_inner_func(col): return [id_values[r] for r in range(leftlimit, len(col[:rightlimit])) if list(filter(col[r]))] appl = df.apply(_csv_inner_func, reduce=True, axis=1) s = pd.Series(appl, name=label) return pd.DataFrame(s)
[docs]def boolean_group_profile(df, grouping, l_limit=0, r_limit=None, label=None): ''' Group the columns of dataframe df between l_limit and r_limit using *grouping* and return a boolean_profile of the grouping on those columns for each row. grouping is a function on the values of the columns l_limit and r_limit default to the leftmost and rightmost columns, ie, all if *label* is given, a dataframe with *label* as column name is returned. Otherwise, a Series is returned. ''' def _bgp_inner_func(row): id_values = df.columns.values.tolist() groups = sorted(set([grouping(id_values[g]) for g in range(l_limit, len(row[:r_limit]))])) gprofile = [max([row[j] for j in range(l_limit, len(row[:r_limit])) if g == grouping(id_values[j])]) for g in groups] return boolean_profile(gprofile) result = df.apply(_bgp_inner_func, axis=1) return pd.DataFrame(pd.Series(result, name=label)) if label else result
[docs]def one_row_per_locus(df, last=False, selector=None, axis=1): ''' Choose exactly one row for each locus, defaulting to the first, but allowing for a selector function to choose based on some other criteria (again defaulting to first if more than one match the criterion) last: whether to select the last matching row instead of the first selector: a boolean function given to df.apply(axis=1) for each row of the dataframe, to do a more refined selection axis: I can't figure out how it would make sense to apply on axis=0, but if you can, go for it. ''' raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' % (reflection.my_methodname())) onedf = None for g in sorted(set(df.index.values)): if isinstance(df.loc[g], pd.core.series.Series): dfrow = df.loc[[g]] else: if selector: indices = list(range(df.loc[[g]].shape[0])) if not last: # reverse so that the last row chosen by selector # will be the smallest matching index indices.reverse() dfrow = None print('Shape', df.loc[[g]].shape) booles = df.loc[[g]].apply(selector, axis=1) for idx in indices: print(booles[idx], df.loc[[g]].iloc[idx][:3]) if booles[idx]: dfrow = df.loc[[g]].iloc[idx] if dfrow is None: dfrow = df.loc[[g]].iloc[idx] else: dfrow = df.loc[g][-1:] if last else df.loc[g][0:1] if onedf is None: onedf = pd.DataFrame(dfrow) else: onedf = onedf.append(dfrow) return onedf
[docs]def rename_column(df, fromname, toname): ''' Change a column header from fromname to toname ''' dfcols = df.columns.values.tolist() fromcol = misc.validate_columns(dfcols, [fromname], listnames=False, complain=True)[0] df.columns = dfcols[0:fromcol] + [toname] + dfcols[fromcol+1:]
[docs]def rearrange_column(df, fromcol, tocol=0): ''' Move the column indexed at fromcol to the index at tocol, default first ''' dfcols = df.columns.values.tolist() fromcol, tocol = misc.validate_columns(dfcols, [fromcol, tocol], listnames=False, complain=True) if fromcol > tocol: newcols = (dfcols[0:tocol] + dfcols[fromcol:fromcol+1] + dfcols[tocol:fromcol] + dfcols[fromcol+1:]) else: newcols = (dfcols[0:fromcol] + dfcols[fromcol+1:tocol] + dfcols[fromcol:fromcol+1] + dfcols[tocol:]) return df[newcols]
[docs]def import_vtools_dataframe(dataframe, discriminant_columns=None, locus_columns=None, attribute_columns=None, sample_columns=None): ''' Assumes dataframe is a vtools-generated vcf-like file from the standard lab vtools annotation suite. No provision is made to determine the version that generated the vcf-like file. This could give you a subtle confusion some time in the future. Returns a pair of dataframes indexed by sortable_coord, the first with sample data, the second with attribute data. *discriminant_columns* is a list of additional columns to put in a multi-index. This really can't be more than whatever the first column of the dataframe is because of indexing/sorting constraints. Default none. *locus_columns* columns in which to find the chr,pos values -- default, based on Kadara workflow, is 0-based cols 1 and 2. *attribute_columns* columns to put into the second returned dataframe, default 0-based 4-22 *sample_columns* columns of interest, default 23 to the end. ''' tmp = pd.read_table(dataframe,header=0) header = tmp.columns.values.tolist() if discriminant_columns: discriminant_columns = misc.validate_columns(header, discriminant_columns) if locus_columns is None: locus_columns = list(range(1,3)) locus_columns = misc.validate_columns(header, locus_columns) if attribute_columns is None: attribute_columns = list(range(header.index('ID'),header.index('FORMAT'))) attribute_columns = misc.validate_columns(header, attribute_columns) if sample_columns is None: if discriminant_columns is not None and not len(discriminant_columns): # Hack. For files formed from multiple vtools inputs, there's a lead column. # Consequence of this being developed for Kadara lung technology paper. sample_columns = (list(range(min(misc.validate_columns(header, locus_columns, listnames=False)))) + list(range(23, len(header)))) else: sample_columns = list(range(23,len(header))) sample_columns = misc.validate_columns(header, sample_columns) new_columns = (locus_columns + (discriminant_columns if discriminant_columns else []) + attribute_columns + sample_columns) tmp = convert_chrpos_to_locus(tmp[new_columns], indexing=discriminant_columns, verify=False) return tmp[sample_columns], one_row_per_locus(tmp[attribute_columns])
[docs]def condense_dataframe(df, matchlist, match_rows=True, match_columns=True): ''' Given a set of row/column names, reduce df to contain only the terms found in matchlist. By default, reduces both rows and columns. match_rows=False and match_columns=False do the obvious thing. Both false gives a warning. ''' if isinstance(matchlist, pd.Series): matchlist = list(matchlist) if not matchlist or not (match_rows or match_columns): raise LabtoolsWarning(reflection.my_methodname() + 'must have a useful input') if match_rows: if len(df.axes[0].shape) > 1: raise LabtoolsWarning(reflection.my_methodname() + "can't handle multi-indexes yet") filtered = [v for v in df.axes[0].values if v in matchlist] df = df.loc[filtered,:] if match_columns: if len(df.axes[1].shape) > 1: raise LabtoolsWarning(reflection.my_methodname() + "can't handle multi-indexes yet") filtered = [v for v in df.axes[1].values if v in matchlist] df = df.loc[:,filtered] return df
class ScikitNaiveBayes(): def __init__(self, skscore, category_columns, classifier_column, training_fraction=1, random_seed=None): ''' train a naive bayesian classifier on the attributes in a dataframe return the classifier and the unclassified subset if there is one ''' from sklearn.naive_bayes import MultinomialNB from sklearn.feature_extraction.text import CountVectorizer import random random.seed(random_seed) self.cvec = CountVectorizer() self.classifier_column = (classifier_column if isinstance(classifier_column, int) else skscore.columns.values.tolist().index(classifier_column)) self.category_columns = category_columns if misc.one_is_true([isinstance(c, int) for c in category_columns]): sktrain = skscore.iloc[:,self.category_columns] else: sktrain = skscore.loc[:,self.category_columns] self.actual_result = skscore.iloc[:,self.classifier_column].values self.trainable = sktrain.apply(lambda r: const.TAB.join(r.values), axis=1).values set_size = len(self.trainable) tset_size = int(training_fraction*set_size) indices = list(range(set_size)) random.shuffle(indices) self.training_indices = sorted(indices[:tset_size]) self.training_results = [self.actual_result[idx] for idx in self.training_indices] #print('TRAIN %d %r' % (len(self.trainable), self.trainable)) try: training_counts = self.cvec.fit_transform([self.trainable[idx] for idx in self.training_indices]) except ValueError as v: raise LabtoolsWarning('Probably empty training set: %s' % v) items_left = sorted(indices[tset_size:]) self.remainder_set = [self.trainable[idx] for idx in items_left] self.remainder_results = [self.actual_result[idx] for idx in items_left] self.clf = MultinomialNB().fit(training_counts, self.training_results) def predict(self, testset=None): if testset is None: testset = self.remainder_set test_counts = self.cvec.transform(testset) predicted = self.clf.predict(test_counts) return predicted def compare(self, predicted, scoremap=None, dump=False): ''' Do a Pearson correlation of predicted vs actual. ''' pdf = pd.DataFrame(pd.Series(self.remainder_results, name='Actual')) pdf['Prediction'] = predicted scored = pdf.applymap(lambda r: scoremap[r] if scoremap else r) return scored if dump else scored.corr()