# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Putting syntactic sugar on a few frequently used things.
Some might argue that this is hiding something that should
be exposed, but I think it renders simpler-looking code.
'''
import os
import sys
import re
import socket
import subprocess
from labtools import const
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection
# Absolute path of the current directory, captured once when this module is imported.
CWD = os.path.abspath(os.curdir)
def localpath(*args):
    '''
    Join *args* onto the current directory and return the result as an
    absolute path.
    '''
    relative = os.path.join(os.curdir, *args)
    return os.path.abspath(relative)
def progdir():
    """
    Return the directory from which the running program was invoked.

    If sys.argv[0] carries a directory component, the real path of that
    directory is returned.  Otherwise each entry of PATH (or os.defpath when
    PATH is not set) is searched for an executable file of that name and the
    resolved directory of the first hit is returned.  If nothing matches, a
    complaint is printed to stderr and os.curdir is returned.
    """
    invocation = sys.argv[0]
    invocation_dir = os.path.dirname(invocation)
    if invocation_dir:
        return os.path.realpath(invocation_dir)
    # A missing PATH must not crash the lookup; fall back to the platform default.
    search_path = os.environ.get('PATH', os.defpath)
    for entry in search_path.split(os.pathsep):
        # realpath('') resolves to the current directory, so empty entries search there.
        candidate = os.path.realpath(entry)
        program = os.path.join(candidate, invocation)
        # isfile() keeps a directory that happens to share the name from matching.
        if os.path.isfile(program) and os.access(program, os.X_OK):
            return candidate
    # Report only the search path rather than dumping the whole environment.
    print('How on earth was this program invoked? (%s in %s)' % (invocation, search_path),
          file=sys.stderr)
    return os.curdir
def program_path(*args):
    '''
    Join *args* onto the directory reported by progdir() and return the
    absolute path, giving program-relative file names.
    '''
    base = progdir()
    joined = os.path.join(base, *args)
    return os.path.abspath(joined)
def dotted_list(args, separator=const.DOT):
    '''
    Join the strings in *args* into one string, placing *separator*
    (const.DOT unless overridden) between consecutive elements.
    '''
    joined = separator.join(args)
    return joined
def spaced_list(args):
    """
    Join the strings in *args* into one string with const.SPACE between
    consecutive elements.
    """
    return const.SPACE.join(args)
def nameroot(name):
    '''
    (Emulates tcsh :r)
    Strip the rightmost extension from the file *name* and return what is
    left; a name without const.DOT is returned whole, and None stays None.
    '''
    if name is None:
        return None
    head, dot, _tail = name.rpartition(const.DOT)
    if not dot:
        # no separator present, so there is no extension to remove
        return name
    return head
def namesuffix(name):
    '''
    (Emulates bash :e)
    Return the (rightmost) extension of the file *name*, without the dot.

    Returns None when *name* is None and an empty string when *name*
    contains no const.DOT at all.
    '''
    if name is None:
        return None
    # Decide on the presence of the separator rather than comparing the suffix
    # with the root: names such as 'txt.txt' have an extension equal to the root.
    _root, dot, sfx = name.rpartition(const.DOT)
    return sfx if dot else ''
def suffixed_name(base, suffix='txt', separator=const.DOT):
    '''
    Return *base* and *suffix* joined by *separator*.

    A *suffix* that already begins with *separator* has that leading
    separator removed first, so the separator is not doubled.
    '''
    # Strip the whole separator, not just one character, and leave the suffix
    # untouched when the separator is empty (startswith('') is always True).
    if separator and suffix.startswith(separator):
        suffix = suffix[len(separator):]
    return dotted_list([base, suffix], separator=separator)
def first_line(multiliner):
    """
    Return everything in *multiliner* that precedes its first const.NEWLINE
    (the whole string when it contains none).
    """
    head, _newline, _rest = multiliner.partition(const.NEWLINE)
    return head
def hostname():
    '''
    Return the host name as reported by socket.gethostname().
    '''
    name = socket.gethostname()
    return name
def username():
    '''
    Return the USER environment variable when it is set, otherwise the
    numeric uid of the process rendered as a string.
    '''
    try:
        return os.environ['USER']
    except KeyError:
        return str(os.getuid())
def plural(count, singular=const.EMPTY, plural=None):
    '''
    Choose between a singular and a plural form according to *count*.

    *singular* is returned when *count* equals 1.  Otherwise the plural form
    is returned: when *plural* is None, 's' or 'es' it is treated as an ending
    appended to *singular* (None meaning 's'); any other value is used as is.
    >>> plural(1, singular='house') == 'house'
    >>> plural(2, singular='house') == 'houses'
    >>> plural(2, singular='house', plural='hice') == 'hice'
    '''
    ending = 's' if plural is None else plural
    if ending in ('s', 'es'):
        many = singular + ending
    else:
        many = ending
    if count == 1:
        return singular
    return many
def file_or_gz(file, option='r', test=False):
    '''
    Forward the call to labtools.filetools.file_or_gz and return its result.
    filetools is imported inside the function, at call time.
    '''
    from labtools import filetools as _filetools
    delegate = _filetools.file_or_gz
    return delegate(file, option, test)
# Make sure the environment carries an OS entry: when it is missing, fill it
# with the first field of os.uname().  The value is then cached in the
# module-level name OS.
if 'OS' not in os.environ:
    os.environ['OS'] = os.uname()[0]
OS = os.environ['OS']
def is_MacOSX():
    '''
    Return True when the module-level OS value is 'Darwin', False otherwise.
    '''
    on_mac = (OS == 'Darwin')
    return on_mac
def host_class():
    '''
    Return the class of machine for software differentiation purposes.

    The host name is compared against a fixed list of prefixes, in order; the
    label of the first matching prefix is returned, and the module-level OS
    value is returned when none matches.
    '''
    prefix_labels = (
        ('cnode', 'RIScluster'),
        ('chms', 'HMScluster'),
        ('d1prphaplotype', 'haps'),
        ('mdarisepi', 'cedric'),
    )
    host = hostname()
    for prefix, label in prefix_labels:
        if host.startswith(prefix):
            return label
    return OS
def read_attribute_pair(line, separator=const.EQUALS):
    '''
    Split *line* at the first *separator* into a (key, value) pair, each
    stripped of surrounding whitespace.

    Blank lines and lines starting with const.HASH yield (None, None).
    Raises LabtoolsWarning when the line does not contain *separator*.
    '''
    text = line.strip()
    if not text or text.startswith(const.HASH):
        return None, None
    fields = text.split(separator, 1)
    if len(fields) != 2:
        raise LabtoolsWarning("%s (in %s): %s not found in line '%s'" % (reflection.my_methodname(),
                                                                       reflection.my_callername(),
                                                                       separator, text))
    key, value = fields
    return key.strip(), value.strip()
def get_property(line, key, separator=const.EQUALS):
    """
    Parse *line* as a key-value pair split on *separator*; return the value
    when the parsed key equals *key*, and const.EMPTY when it does not.

    A LabtoolsWarning from the parse is replaced by one raised from here.
    """
    try:
        name, value = read_attribute_pair(line, separator=separator)
    except LabtoolsWarning:
        raise LabtoolsWarning('%s (in %s): %s not found in %s' % (reflection.my_methodname(),
                                                                 reflection.my_callername(),
                                                                 separator, line))
    if key == name:
        return value
    return const.EMPTY
def shell_glob_filenames(globstring):
    '''
    Choose filenames from a directory based on a unix shell glob rather than
    a python regex. Return a list of file paths.
    This came from syqada, where I chose this approach to globbing
    rather than filtering os.listdir() output,
    because I wanted args.namepattern to look like a unix shell glob so
    that the user would not need to learn python regex.
    *globstring* the string to match with unix shell glob

    Raises LabtoolsWarning when the ls command exits with a failure status.
    '''
    # NOTE(review): globstring is interpolated unquoted so that the shell can
    # expand it; it must therefore never come from an untrusted source.
    try:
        listing = subprocess.check_output('ls -1d %s' % (globstring), shell=True,
                                          stderr=subprocess.DEVNULL).decode()
    except subprocess.CalledProcessError as cpe:
        # keep the original failure attached as the cause for debugging
        raise LabtoolsWarning('No files found matching pattern %s' % (globstring)) from cpe
    return listing.strip().split(const.NEWLINE)
def elide_list(longlist, showthismany=2, joiner=const.SPACE, ellipses=const.DOT*3, countem=True):
    '''
    Elide the middle elements of a "long" list, showing `showthismany` elements, split on either side.
    When showthismany is odd, the extra element is shown at the front.
    A list with no more than showthismany elements is joined unchanged.
    joiner is a space by default, but const.NEWLINE could also be popular
    ellipses is three dots by default
    countem will put the number of missing elements in parentheses in the middle
    '''
    front = showthismany//2 + showthismany%2
    end = showthismany//2
    missing = len(longlist) - showthismany
    if missing < 1:
        return joiner.join(longlist)
    thecount = ('{1}({0}){1}'.format(missing, ellipses)) if countem else ellipses
    # Work on a list copy so tuples (or other sequences) can be concatenated.
    items = list(longlist)
    # Slice from an explicit start index: items[-end:] with end == 0 would be
    # items[0:], i.e. the whole list instead of an empty tail.
    tail = items[len(items) - end:]
    return joiner.join(items[:front] + [thecount] + tail)
def is_valid_filepath(file, test_writable=False, accept_directory=False):
    '''
    Check *file* and return a (valid, error) tuple: (True, None) when the
    path is acceptable, otherwise (False, message) describing the problem.

    test_writable=True implies accept a non-existent file in a writable directory.
    accept_directory=True implies an existing directory is accepted as valid.
    Without test_writable, *file* must exist as a readable regular file
    (or be a directory when accept_directory is set).
    '''
    if test_writable:
        if not os.access(file, os.W_OK):
            # Walk up the parent directories: the first writable ancestor makes
            # the path acceptable, the first existing but unwritable one rejects it.
            _dir = os.path.dirname(file) or const.DOT
            while True:
                if os.access(_dir, os.W_OK):
                    return True, None
                if os.path.isdir(_dir):
                    return False, ("directory '%s' is not writable" % (_dir))
                if _dir in (const.DOT, const.SLASH):
                    break
                _dir = os.path.dirname(_dir) or const.DOT
        return True, None
    if not os.path.isfile(file):
        if os.path.isdir(file):
            if accept_directory:
                return True, None
            error = "'%s' is not a file" % (file)
        else:
            error = "File '%s' does not exist" % (file)
        return False, error
    if not os.access(file, os.R_OK):
        # Two placeholders for two arguments; the original single %s raised TypeError.
        return False, ("%s '%s' is not readable" %
                       ('directory' if os.path.isdir(file) else 'file', file))
    return True, None
def line_to_dict(line, hdrdict, separator=const.TAB):
    '''
    Poor man's pandas usage.
    Split *line* on *separator* (after removing trailing const.NEWLINE) and
    return an OrderedDict built from the fields.

    If hdrdict is None, the line is treated as a header: each field maps to
    its column index.  Otherwise the keys of hdrdict are paired, in order,
    with the fields of the line; a line shorter than hdrdict is padded with
    const.EMPTY values.
    '''
    from collections import OrderedDict
    fields = line.rstrip(const.NEWLINE).split(separator)
    if hdrdict is None:
        return OrderedDict((name, position) for position, name in enumerate(fields))
    # guard against short lines
    shortfall = len(hdrdict) - len(fields)
    if shortfall > 0:
        fields.extend([const.EMPTY] * shortfall)
    return OrderedDict(zip(hdrdict.keys(), fields))
def validate_columns(reference, columns, listnames=True, complain=False):
    '''
    Resolve a possibly heterogeneous list of column names, integers, or
    integer strings against *reference* (a list of names, or a dict mapping
    names to indices).

    Negative integers count from the end.  With listnames=True the result is
    the list of column names, otherwise the list of column indices.
    Entries that cannot be resolved or are out of range are skipped, unless
    *complain* is set, in which case a ValueError naming the offending
    position and entry is raised.
    '''
    rlist = []
    isdict = isinstance(reference, dict)
    for position, item in enumerate(columns):
        # Keep the caller's entry in *item*; *c* is rebound while resolving it.
        c = item
        try:
            if isinstance(c, str):
                if c in reference:
                    c = reference[c] if isdict else reference.index(c)
                else:
                    c = int(c)  # fall through for range check
            if isinstance(c, int):
                newc = len(reference) + c if c < 0 else c
                if not 0 <= newc < len(reference):
                    raise ValueError('out of range')
                c = newc
            rlist.append(c)
        except Exception as e:
            if complain:
                # Report the original entry and its real position; looking up the
                # rebound value with columns.index() could itself fail or mislead.
                raise ValueError('%dth element "%r": %r' % (position, item, e)) from e
    if not listnames:
        return rlist
    if isdict:
        # A dict reference maps name -> index, so names are found by inverting it.
        names = {index: name for name, index in reference.items()}
        return [names[c] for c in rlist]
    return [reference[c] for c in rlist]
def expand_env_vars(term, extra=None, locationmsg='input'):
    '''
    Find $terms in a string and substitute them first from extra if it is defined,
    and then from the environment, throwing an error if any term is not found.

    Substituted values are rescanned, so a value may itself contain $terms.
    Raises UserWarning when a variable is undefined (naming *locationmsg*),
    or when a '$' is not followed by a variable name.
    '''
    # TODO fix const.DOLLAR in labtools.const
    save = term
    while '$' in term:
        m = re.search(r'\$(\w+)', term)
        if not m:
            # The original referenced an undefined name here and died with NameError.
            raise UserWarning("Unexpected use of $ seems invalid in '%s'" % (save))
        possible = m.group(1)
        if extra and possible in extra:
            value = extra[possible]
        elif possible in os.environ:
            value = os.environ[possible]
        else:
            raise UserWarning('''Environment variable '{}' found in {} is undefined.'''.
                              format(possible, locationmsg))
        # Splice in only the matched occurrence: a global str.replace of '$FOO'
        # would also rewrite the front of a longer name such as '$FOOBAR'.
        term = term[:m.start()] + value + term[m.end():]
    return term
def find_files(dir, depth=20):
    '''
    Collect the paths of all non-directory entries found in *dir* and in its
    subdirectories, and return them as a list.
    A leading './' is removed from each path.
    depth limits how many levels of subdirectories are descended into
    (depth=0 looks at *dir* only).
    '''
    found = []
    for entry in os.listdir(dir):
        candidate = os.path.join(dir, entry)
        if candidate.startswith('./'):
            candidate = candidate[2:]
        if not os.path.isdir(candidate):
            found.append(candidate)
        elif depth > 0:
            found += find_files(candidate, depth-1)
    return found
def host_and_path(file):
    '''
    Return 'host:path', where host comes from hostname() and path is the
    absolute path of *file*.
    '''
    host = hostname()
    full_path = os.path.abspath(file)
    return '%s:%s' % (host, full_path)
def zero_div(num, denom, warn=sys.stderr, fail=False):
    '''
    Divide *num* by *denom*, returning 0 instead of failing when the divisor is zero.

    On a zero divisor, a message naming the caller is built when *warn* or
    *fail* is set: with *fail* it is raised as a LabtoolsWarning, otherwise it
    is written to *warn* followed by const.NEWLINE.
    '''
    if denom == 0.0:
        if warn or fail:
            # integers are shown with %d, anything else with one decimal place
            num_spec = '%d' if isinstance(num, int) else '%.1f'
            denom_spec = '%d' if isinstance(denom, int) else '%.1f'
            template = 'Zero divisor ({}/{}) found in %s'.format(num_spec, denom_spec)
            msg = template % (num, denom, reflection.my_callername())
            if fail:
                raise LabtoolsWarning(msg)
            warn.write(msg+const.NEWLINE)
        return 0
    quotient = num/denom
    return quotient
def silent_zero(count, label, format='%3d %s,'):
    '''
    Render *count* and *label* through *format* (count in 3 places by
    default).  When *count* is zero (or otherwise falsy), return a run of
    const.SPACE of the same length as the rendered text instead.
    '''
    rendered = format % (count, label)
    if count:
        return rendered
    return const.SPACE*len(rendered)
def pct(num, denom, decimals=1, symbol=const.PERCENT, warn=None, fail=False):
    '''
    Return num/denom as a percentage string with *decimals* decimal places,
    followed by *symbol*.

    Division by zero yields 0 and is ignored by default; pass a writable
    *warn* stream to have it reported, or fail=True to have zero_div raise.
    '''
    # Forward warn/fail: without this, zero_div fell back to its own default and
    # wrote to stderr even though this function promises silence by default.
    ratio = zero_div(num, denom, warn=warn, fail=fail)
    return ('%.{}f%s'.format(decimals) % (100*ratio, symbol))
def spaced_row(columnlist, size=7, aligned=None, joiner=const.SPACE):
    '''
    Format the elements of *columnlist* as fixed-width cells joined by *joiner*.

    size is the minimum width of each cell, default 7.
    aligned is a string with one code per column: 'l' left-justifies the
    cell, any other code right-justifies it.  It defaults to 'l', and its
    last code is repeated for any remaining columns.
    Trailing whitespace is removed from the finished row.
    '''
    alignment = aligned if aligned else 'l'
    alignment += alignment[-1]*(len(columnlist)-len(alignment))
    cells = []
    for code, column in zip(alignment, columnlist):
        flag = const.DASH if code == 'l' else const.EMPTY
        cells.append('%{}{}s'.format(flag, size) % str(column))
    return joiner.join(cells).rstrip()
def one_is_true(list):
    '''
    Return max() of *list*; for a list of booleans that is True as soon as
    one element is True.
    '''
    largest = max(list)
    return largest
def all_are_true(list):
    '''
    Return min() of *list*; for a list of booleans that is True only when
    every element is True.
    '''
    smallest = min(list)
    return smallest
def frame_label(string, frame, width=80):
    '''
    Surround *string* with a const.SPACE on each side and then with repeated
    copies of *frame* on the left and right, sized from *width*.
    For a string of odd length, one fewer copy is used on the right.
    '''
    half_width = width//2
    half_label = len(string)//2
    left = half_width - 1 - half_label
    if len(string)%2:
        right = int(half_width) - 2 - half_label
    else:
        right = half_width - 1 - half_label
    padded = const.SPACE + string + const.SPACE
    return frame*left + padded + frame*right
def item_selector(items, columns=2, interactive=True, stdout=sys.stdout, stdin=sys.stdin):
    '''
    Display the items (a sequence object) in a numbered list in *columns* columns.
    If interactive, prompt for a number and return the index (an int) of the
    selected item; return -1 if 'q' is given, if no valid number arrives
    within five prompts, or if not interactive.
    stdout and stdin can be fed as StringIO for special purposes, including testing.
    '''
    width = 80//columns
    # number of rows needed; numbering runs down each column, not across
    offset = len(items)//columns+(1 if len(items)%columns else 0)
    cell_format = '%-{}s'.format(width)
    for idx in range(offset):
        line = const.EMPTY
        for ix in range(columns):
            jdx = ix*offset+idx
            element = ('(%2d)\t%s'%(jdx, items[jdx]) if jdx<len(items) else const.EMPTY)
            line += (cell_format % (element))
        stdout.write(line.rstrip() + const.NEWLINE)
    interactions = 0
    while interactive:
        interactions += 1
        if interactions > 5:
            print("We seem not to be communicating. I wanted numeric digits.", file=stdout)
            break
        stdout.write('Select the number between 0 and %d corresponding to your choice ... ' % (len(items)-1))
        stdout.flush()
        line = stdin.readline().strip()
        if line == 'q':
            interactive = False
        # isdecimal(), unlike isdigit(), accepts only characters that int() can
        # convert, so input such as a superscript digit cannot crash the prompt.
        elif line.isdecimal() and 0 <= int(line) < len(items):
            return int(line)
        else:
            stdout.write("(Please select a number from 0 to %d or 'q' to quit) " % (len(items)-1))
            stdout.flush()
    return -1