# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Tools to help with vcf manipulation in pandas
'''
import os
import sys
from collections import OrderedDict, defaultdict
import subprocess
import pandas as pd
from labtools import const
from labtools import misc
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection
from labtools import genomics
from labtools import phylogenetics as phyl
from labtools.phylogenetics import DistanceSet, MEET_ONLY, FULL_GRAPH
def pd_version():
return pd.__version__
def pd_format(precision=7, width=10, rows=6):
if precision: pd.set_option('precision', precision)
if width: pd.set_option('max_colwidth', width)
if rows: pd.set_option('max_rows', rows)
UTR3 = 'UTR3'
UTR5 = 'UTR5'
EXONIC = 'exonic'
INTRONIC = 'intronic'
NCRNA_INTRONIC = 'ncRNA_intronic'
SPLICING = 'splicing'
NONCODING = const.DOT
INTERGENIC = 'intergenic'
UPSTREAM = 'upstream'
DOWNSTREAM = 'downstream'
NONSYNONYMOUS = 'nonsynonymous SNV'
STOPGAIN = 'stopgain'
STOPLOSS = 'stoploss'
SYNONYMOUS = 'synonymous SNV'
UNKNOWN = 'unknown'
C_OTHER = 'Intron'
C_SYNONYMOUS = 'Synonym'
C_NONSYNONYMOUS = 'Nonsynonym'
def variant_characterization(muttype=None, loctype=None):
if loctype != EXONIC:
return C_OTHER
if muttype in [NONSYNONYMOUS, STOPGAIN, STOPLOSS]:
return C_NONSYNONYMOUS
if muttype in [SYNONYMOUS]:
return C_SYNONYMOUS
return C_OTHER
A = 'A'
C = 'C'
G = 'G'
T = 'T'
BASES = [A, C, G, T]
_TRANSPOSE = {
A : T,
T : A,
C : G,
G : C,
}
[docs]def collect_mapped_items(mlist, mapdict, sep=const.DASH):
'''
Build a *sep*-separated string of the sorted unique mappings from *mapdict*
for the members of *mlist*
sep defaults to a DASH
'''
map = [mapdict[m] if m in mapdict else m for m in mlist]
return sep.join(sorted(set(map)))
[docs]def boolean_profile(blist):
'''
Given a list of values that can be interpreted as booleans,
return a string of 1's and 0's, one per value.
'''
return const.EMPTY.join(['1' if b else '0' for b in blist])
[docs]def transversion_to_apply(row):
'''
Return the transition/transversion for a variant as pulled from
a pandas vcf dataframe, for use in
>>> df.apply(thismodule.transversion_to_apply, axis=1)
'''
if not min([row[b] in BASES for b in ('REF', 'ALT')]):
return const.DOT
if row['REF'] in [G, T]:
return const.EMPTY.join([_TRANSPOSE[b] for b in [row['REF'], row['ALT']]])
else:
return row['REF']+row['ALT']
[docs]def idx_merge(df, df2, **kwargs):
'''
Curry out the virtually always used indexing and sorting from dataframe.merge()
'''
return df.merge(df2, left_index=True, right_index=True, sort=True, **kwargs)
[docs]def stable_sort_dataframe(df, clist=None, ascending=True):
'''
Stable sort of df on the requested columns, all by default.
mergesort is the only stable sort available in pandas.
'''
if clist is None:
clist = df.columns.values.tolist()
if pd_version() >= '0.17':
df.sort_values(by=clist, inplace=True,
kind='mergesort', ascending=ascending)
else:
df.sort(by=clist, inplace=True,
kind='mergesort', ascending=ascending)
[docs]def convert_chrpos_to_locus(df, indexing=None, verify=True):
'''
Take a dataframe with CHR and POS columns and change it to a CHR:00POS
Locus format, reindexing to include Locus and other columns if desired.
indexing=[] indexes by Locus. Otherwise, indexing is a list of column
names to add to the Locus for indexing.
'''
cols = df.columns.values.tolist()
if indexing is not None:
# try-block repeated below, sorry, but a numeric index could change during transformation
# to Locus-format
try:
indexing = misc.validate_columns(cols, indexing, complain=True)
except ValueError as e:
raise LabtoolsWarning('%s(): Your indexing columns are probably wrong - %s' %
(reflection.my_methodname(), e.message))
if genomics.POS.upper() not in [c.upper() for c in cols]:
raise LabtoolsWarning('%s wants %s or %s in dataframe column list' %
(reflection.my_methodname(),
genomics.POS, genomics.POS.upper()))
poscol = (cols.index(genomics.POS.upper())
if genomics.POS.upper() in cols else cols.index(genomics.POS))
chrcol = poscol - 1
chr = cols[chrcol]
pos = cols[poscol]
locusfunc = lambda r: genomics.sortable_coord(r[chr], r[pos])
locus = df.apply(locusfunc, axis=1)
df.loc[:,genomics.LOCUS] = locus
cols = df.columns.values.tolist()
df = df[[cols[-1]]+cols[:-1]]
df = df.drop([chr, pos], axis=1)
try:
stable_sort_dataframe(df, clist=[genomics.LOCUS] + (indexing if indexing else []))
except ValueError as e:
if 'REF' in e.message:
msg = ('%s(): In this pandas you cannot index by REF/ALT - %s' %
(reflection.my_methodname(), e.message))
else:
msg = ('%s(): Probably trying to index by "categorical" columns - %s' %
(reflection.my_methodname(), e.message))
raise LabtoolsWarning(msg)
if indexing is not None:
try:
indexing = misc.validate_columns(cols, indexing, complain=True)
df.set_index([genomics.LOCUS]+indexing, inplace=True, verify_integrity=verify)
except ValueError as e:
raise LabtoolsWarning('%s(): I suspect your indexing columns are wrong - %s' %
(reflection.my_methodname(), e.args[0]))
return df
[docs]def convert_locus_to_chrpos(df, indexing=None, chromosome=genomics.CHR, verify=True):
'''
Take a dataframe with a CHR:00POS Locus format and change it to
CHR and POS columns, reindexing if desired.
By default (indexing=None) the returned dataframe has only its numeric index.
indexing=[] indexes by Locus. Otherwise, indexing is a list of column
names or indices to add to the Locus for indexing.
'''
cols = df.columns.values.tolist()
if indexing is not None:
# try-block repeated below, sorry, but a numeric index could change during transformation
# from Locus-format
try:
indexing = misc.validate_columns(cols, indexing, complain=True)
except ValueError as e:
raise LabtoolsWarning('%s(): Your indexing columns are probably wrong - %s' %
(reflection.my_methodname(), e.message))
if genomics.LOCUS.upper() in cols:
locuscol = cols.index(genomics.LOCUS.upper())
elif genomics.LOCUS.capitalize() in cols:
locuscol = cols.index(genomics.LOCUS.capitalize())
elif genomics.LOCUS in cols:
locuscol = cols.index(genomics.LOCUS)
else:
raise LabtoolsWarning('%s wants %s, %s or %s in dataframe column list' %
(reflection.my_methodname(),
genomics.LOCUS, genomics.LOCUS.upper(),
genomics.LOCUS.capitalize()))
locusstr = cols[locuscol]
locusfunc = lambda r: genomics.sortable_coord_to_vcf_coord(r[locusstr])
locus = df.apply(locusfunc, axis=1)
df.loc[:,chromosome] = [cp[0] for cp in locus]
df.loc[:,genomics.POS.upper()] = [cp[1] for cp in locus]
cols = df.columns.values.tolist()
df = df[cols[-2:]+cols[:-2]]
df = df.drop([locusstr], axis=1)
try:
stable_sort_dataframe(df, clist=[chromosome, genomics.POS.upper()] + (indexing if indexing else []))
except ValueError as e:
if 'REF' in e.message:
msg = ('%s(): In this pandas you cannot index by REF/ALT - %s' %
(reflection.my_methodname(), e.message))
else:
msg = ('%s(): Probably trying to index by "categorical" columns - %s' %
(reflection.my_methodname(), e.message))
raise LabtoolsWarning(msg)
if indexing is not None:
try:
df.set_index([chromosome, genomics.POS.upper()]+indexing,
inplace=True, verify_integrity=verify)
except ValueError as e:
raise LabtoolsWarning('%s(): I suspect your indexing columns are wrong - %s' %
(reflection.my_methodname(), e.message))
return df
[docs]def rows_showing_value(df, filter=lambda x: x > 0, toplimit=0, bottomlimit=None, label='Filtered'):
'''
Return a dataframe of the row-labels of df that meet *filter* on the columns
between toplimit and bottomlimit, with *label* used as the column name
filter is a boolean function that defaults to returning values greater than zero
toplimit defaults to the top (first) row
bottomlimit defaults to the bottom (last) row
label defaults to 'Filtered'
'''
raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' %
(reflection.my_methodname()))
id_values = df.index.values.tolist()
if toplimit and not str(toplimit).isdigit():
if toplimit in id_values:
toplimit = id_values.index(toplimit)
else:
raise LabtoolsWarning('toplimit "%s" not found in row index' %(toplimit))
if bottomlimit and not str(bottomlimit).isdigit():
if bottomlimit in id_values:
bottomlimit = id_values.index(bottomlimit)+1
else:
raise LabtoolsWarning('bottomlimit "%s" not found in row index' %(bottomlimit))
def _rsv_inner_func(row):
return [id_values[r] for r in range(toplimit, len(row[:bottomlimit])) if list(filter(row[r]))]
return pd.DataFrame(pd.Series(df.apply(_rsv_inner_func), name=label))
[docs]def columns_showing_value(df, filter=lambda x: x > 0, leftlimit=0, rightlimit=None, label='Filtered'):
'''
Return a dataframe of the column-labels of df that meet *filter*.
This is (no longer) simply a transpose of rows_showing_value, but see it for identical details,
modulo c/top/left/ c/bottom/right/.
The reason it differs is that transposing a multi-index causes a misbehavior that leads to an
exception. (which is probably a pandas bug, but as usual I feel disinclined to delve).
'''
raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' %
(reflection.my_methodname()))
if len(df.index.names) > 1:
raise LabtoolsWarning("Cannot handle DataFrame's multi-index. Pandas bug? Index is %r" %
(list(df.index.names)))
id_values = df.columns.values.tolist()
if leftlimit and not str(leftlimit).isdigit():
if leftlimit in id_values:
leftlimit = id_values.index(leftlimit)
else:
raise LabtoolsWarning('leftlimit "%s" not found in column labels' %(leftlimit))
if rightlimit and not str(rightlimit).isdigit():
if rightlimit in id_values:
rightlimit = id_values.index(rightlimit)+1
else:
raise LabtoolsWarning('rightlimit "%s" not found in column labels' %(rightlimit))
def _csv_inner_func(col):
return [id_values[r] for r in range(leftlimit, len(col[:rightlimit])) if list(filter(col[r]))]
appl = df.apply(_csv_inner_func, reduce=True, axis=1)
s = pd.Series(appl, name=label)
return pd.DataFrame(s)
[docs]def boolean_group_profile(df, grouping, l_limit=0, r_limit=None, label=None):
'''
Group the columns of dataframe df between l_limit and r_limit
using *grouping* and return a boolean_profile of the grouping on
those columns for each row.
grouping is a function on the values of the columns
l_limit and r_limit default to the leftmost and rightmost columns, ie, all
if *label* is given, a dataframe with *label* as column name is returned.
Otherwise, a Series is returned.
'''
def _bgp_inner_func(row):
id_values = df.columns.values.tolist()
groups = sorted(set([grouping(id_values[g])
for g in range(l_limit, len(row[:r_limit]))]))
gprofile = [max([row[j]
for j in range(l_limit, len(row[:r_limit]))
if g == grouping(id_values[j])])
for g in groups]
return boolean_profile(gprofile)
result = df.apply(_bgp_inner_func, axis=1)
return pd.DataFrame(pd.Series(result, name=label)) if label else result
[docs]def one_row_per_locus(df, last=False, selector=None, axis=1):
'''
Choose exactly one row for each locus, defaulting to the first,
but allowing for a selector function to choose based on some other
criteria (again defaulting to first if more than one match the criterion)
last: whether to select the last matching row instead of the first
selector: a boolean function given to df.apply(axis=1) for each row of the dataframe, to do a more refined selection
axis: I can't figure out how it would make sense to apply on axis=0, but if you can, go for it.
'''
raise LabtoolsWarning('labtools.pandas_tools.%s has not been ported to python 3.0' %
(reflection.my_methodname()))
onedf = None
for g in sorted(set(df.index.values)):
if isinstance(df.loc[g], pd.core.series.Series):
dfrow = df.loc[[g]]
else:
if selector:
indices = list(range(df.loc[[g]].shape[0]))
if not last:
# reverse so that the last row chosen by selector
# will be the smallest matching index
indices.reverse()
dfrow = None
print('Shape', df.loc[[g]].shape)
booles = df.loc[[g]].apply(selector, axis=1)
for idx in indices:
print(booles[idx], df.loc[[g]].iloc[idx][:3])
if booles[idx]:
dfrow = df.loc[[g]].iloc[idx]
if dfrow is None:
dfrow = df.loc[[g]].iloc[idx]
else:
dfrow = df.loc[g][-1:] if last else df.loc[g][0:1]
if onedf is None:
onedf = pd.DataFrame(dfrow)
else:
onedf = onedf.append(dfrow)
return onedf
[docs]def rename_column(df, fromname, toname):
'''
Change a column header from fromname to toname
'''
dfcols = df.columns.values.tolist()
fromcol = misc.validate_columns(dfcols, [fromname], listnames=False, complain=True)[0]
df.columns = dfcols[0:fromcol] + [toname] + dfcols[fromcol+1:]
[docs]def rearrange_column(df, fromcol, tocol=0):
'''
Move the column indexed at fromcol to the index at tocol, default first
'''
dfcols = df.columns.values.tolist()
fromcol, tocol = misc.validate_columns(dfcols, [fromcol, tocol],
listnames=False, complain=True)
if fromcol > tocol:
newcols = (dfcols[0:tocol] + dfcols[fromcol:fromcol+1] +
dfcols[tocol:fromcol] + dfcols[fromcol+1:])
else:
newcols = (dfcols[0:fromcol] + dfcols[fromcol+1:tocol] +
dfcols[fromcol:fromcol+1] + dfcols[tocol:])
return df[newcols]
[docs]def condense_dataframe(df, matchlist, match_rows=True, match_columns=True):
'''
Given a set of row/column names, reduce df to contain only the terms found in matchlist.
By default, reduces both rows and columns.
match_rows=False and match_columns=False do the obvious thing. Both false gives a warning.
'''
if isinstance(matchlist, pd.Series):
matchlist = list(matchlist)
if not matchlist or not (match_rows or match_columns):
raise LabtoolsWarning(reflection.my_methodname() + 'must have a useful input')
if match_rows:
if len(df.axes[0].shape) > 1:
raise LabtoolsWarning(reflection.my_methodname() + "can't handle multi-indexes yet")
filtered = [v for v in df.axes[0].values if v in matchlist]
df = df.loc[filtered,:]
if match_columns:
if len(df.axes[1].shape) > 1:
raise LabtoolsWarning(reflection.my_methodname() + "can't handle multi-indexes yet")
filtered = [v for v in df.axes[1].values if v in matchlist]
df = df.loc[:,filtered]
return df
class ScikitNaiveBayes():
def __init__(self, skscore, category_columns, classifier_column, training_fraction=1, random_seed=None):
'''
train a naive bayesian classifier on the attributes in a dataframe
return the classifier and the unclassified subset if there is one
'''
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer
import random
random.seed(random_seed)
self.cvec = CountVectorizer()
self.classifier_column = (classifier_column if isinstance(classifier_column, int)
else skscore.columns.values.tolist().index(classifier_column))
self.category_columns = category_columns
if misc.one_is_true([isinstance(c, int) for c in category_columns]):
sktrain = skscore.iloc[:,self.category_columns]
else:
sktrain = skscore.loc[:,self.category_columns]
self.actual_result = skscore.iloc[:,self.classifier_column].values
self.trainable = sktrain.apply(lambda r: const.TAB.join(r.values), axis=1).values
set_size = len(self.trainable)
tset_size = int(training_fraction*set_size)
indices = list(range(set_size))
random.shuffle(indices)
self.training_indices = sorted(indices[:tset_size])
self.training_results = [self.actual_result[idx] for idx in self.training_indices]
#print('TRAIN %d %r' % (len(self.trainable), self.trainable))
try:
training_counts = self.cvec.fit_transform([self.trainable[idx] for idx in self.training_indices])
except ValueError as v:
raise LabtoolsWarning('Probably empty training set: %s' % v)
items_left = sorted(indices[tset_size:])
self.remainder_set = [self.trainable[idx] for idx in items_left]
self.remainder_results = [self.actual_result[idx] for idx in items_left]
self.clf = MultinomialNB().fit(training_counts, self.training_results)
def predict(self, testset=None):
if testset is None:
testset = self.remainder_set
test_counts = self.cvec.transform(testset)
predicted = self.clf.predict(test_counts)
return predicted
def compare(self, predicted, scoremap=None, dump=False):
'''
Do a Pearson correlation of predicted vs actual.
'''
pdf = pd.DataFrame(pd.Series(self.remainder_results, name='Actual'))
pdf['Prediction'] = predicted
scored = pdf.applymap(lambda r: scoremap[r] if scoremap else r)
return scored if dump else scored.corr()