Source code for misc

# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Putting syntactic sugar on a few frequently used things.
Some might argue that this is hiding something that should
be exposed, but I think it renders simpler-looking code.
'''


import os
import sys

import re
import socket
import subprocess

from labtools import const
from labtools.labexceptions import LabtoolsWarning
from labtools import reflection

# Absolute path of the current directory, evaluated once when this module is imported.
CWD = os.path.abspath(os.curdir)

def localpath(*args):
    '''
    Join the path components in *args* onto the current working
    directory and return the result as an absolute path.
    '''
    relative = os.path.join(os.curdir, *args)
    return os.path.abspath(relative)
def progdir():
    '''
    Return the directory from which the running program was invoked.

    If sys.argv[0] carries a directory component, the real path of that
    directory is returned.  Otherwise each entry of the PATH environment
    variable is searched for an executable file of that name, and the real
    path of the first matching entry is returned.

    If nothing matches (or PATH is not set), a diagnostic is written to
    stderr and os.curdir is returned.
    '''
    invocation = sys.argv[0]
    invocation_dir = os.path.dirname(invocation)
    if invocation_dir:
        return os.path.realpath(invocation_dir)

    # A missing PATH must not blow up with KeyError; it simply means
    # there is nothing to search.
    search_path = os.environ.get('PATH')
    entries = search_path.split(const.COLON) if search_path is not None else []
    for entry in entries:
        candidate = os.path.realpath(entry)
        program = os.path.join(candidate, invocation)
        # Directories are "executable" too, so insist on a regular file;
        # otherwise an empty argv[0] matches the first PATH entry.
        if os.path.isfile(program) and os.access(program, os.X_OK):
            return candidate

    # Report only the search path: dumping the whole environment to stderr
    # is noisy and can leak credentials into logs.
    print('How on earth was this program invoked? (%s in %s)'
          % (invocation, search_path), file=sys.stderr)
    return os.curdir
def program_path(*args):
    '''
    Build a program-relative name: join *args* onto the directory
    reported by progdir() and return the absolute path.
    '''
    base = progdir()
    joined = os.path.join(base, *args)
    return os.path.abspath(joined)
def dotted_list(args, separator=const.DOT):
    '''
    Join the strings in *args* into one string, placing *separator*
    (const.DOT unless specified) between consecutive elements.
    '''
    joined = separator.join(args)
    return joined
def spaced_list(args):
    '''
    Join every element of *args* into a single string, using
    const.SPACE as the separator.
    '''
    return dotted_list(args, const.SPACE)
def nameroot(name):
    '''
    (Emulates tcsh :r)
    Strip the rightmost const.DOT-separated extension from *name* and
    return what remains.  None is passed through as None.
    '''
    if name is not None:
        root, *_ = name.rsplit(const.DOT, 1)
        return root
    return None
def namesuffix(name):
    '''
    (Emulates bash :e)
    Return the (rightmost) extension of the file *name*, without the
    separator.  Returns '' when *name* contains no const.DOT, and None
    when *name* is None.

    The presence of the separator is tested directly rather than by
    comparing the suffix with the root, so names whose root equals their
    extension (e.g. 'gz.gz') report the extension correctly.
    '''
    if name is None:
        return None
    _root, found_separator, sfx = name.rpartition(const.DOT)
    return sfx if found_separator else ''
def suffixed_name(base, suffix='txt', separator=const.DOT):
    '''
    Return *base* and *suffix* joined with *separator* between them.

    A *suffix* that already begins with *separator* has that one leading
    separator removed first, so the result never has it doubled.

    *base*       the leading part of the name
    *suffix*     the trailing part (default 'txt')
    *separator*  the joining string (default const.DOT)
    '''
    # Strip exactly the separator, whatever its length.  The guard on an
    # empty separator matters: every string "starts with" '', which would
    # otherwise chop a real character off the suffix.
    if separator and suffix.startswith(separator):
        suffix = suffix[len(separator):]
    return dotted_list([base, suffix], separator=separator)
def first_line(multiliner):
    '''
    Return the text of *multiliner* that precedes its first
    const.NEWLINE (the whole string if there is none).
    '''
    head, _newline, _rest = multiliner.partition(const.NEWLINE)
    return head
def hostname():
    '''
    Return the host name as reported by socket.gethostname().
    '''
    name = socket.gethostname()
    return name
def username():
    '''
    Return the USER environment variable when it is set; otherwise
    return the numeric uid of the process as a string.
    '''
    try:
        return os.environ['USER']
    except KeyError:
        return str(os.getuid())
def plural(count, singular=const.EMPTY, plural=None):
    '''
    Choose between a singular and a plural form according to *count*.

    Returns *singular* when *count* is 1, otherwise the plural form.
    When *plural* is omitted it is treated as 's'; a *plural* of 's' or
    'es' is appended to *singular*, and any other value is used verbatim.

        plural(1, singular='house')                 gives 'house'
        plural(2, singular='house')                 gives 'houses'
        plural(2, singular='house', plural='hice')  gives 'hice'
    '''
    ending = 's' if plural is None else plural
    if ending in ['s', 'es']:
        many = singular + ending
    else:
        many = ending
    if count == 1:
        return singular
    return many
def file_or_gz(file, option='r', test=False):
    '''
    Thin forwarder to labtools.filetools.file_or_gz(); the import is
    deferred until the function is called.
    '''
    from labtools import filetools as _filetools
    return _filetools.file_or_gz(file, option, test)


# Make sure the OS environment variable is populated (from uname when it is
# absent), and keep the value in a module constant.
try:
    OS = os.environ['OS']
except KeyError:
    OS = os.environ['OS'] = os.uname()[0]
def is_MacOSX():
    '''
    Return True when the module-level OS value is 'Darwin', False otherwise.
    '''
    running_on_mac = (OS == 'Darwin')
    return running_on_mac
def host_class():
    '''
    Return the class of machine for software differentiation purposes.

    The host name is matched against a list of known prefixes, in order;
    when none matches, the module-level OS value is returned.
    '''
    prefix_classes = (
        ('cnode', 'RIScluster'),
        ('chms', 'HMScluster'),
        ('d1prphaplotype', 'haps'),
        ('mdarisepi', 'cedric'),
    )
    host = hostname()
    for prefix, machine_class in prefix_classes:
        if host.startswith(prefix):
            return machine_class
    return OS
def read_attribute_pair(line, separator=const.EQUALS):
    '''
    Split *line* at the first *separator* into a (key, value) pair,
    with surrounding whitespace stripped from both parts.

    Blank lines and lines starting with const.HASH yield (None, None).
    Raises LabtoolsWarning when *separator* does not occur in the line.
    '''
    stripped = line.strip()
    if not stripped or stripped.startswith(const.HASH):
        return None, None
    fields = [field.strip() for field in stripped.split(separator, 1)]
    if len(fields) == 2:
        key, value = fields
        return key, value
    raise LabtoolsWarning("%s (in %s): %s not found in line '%s'" %
                          (reflection.my_methodname(),
                           reflection.my_callername(),
                           separator, stripped))
def get_property(line, key, separator=const.EQUALS):
    '''
    Read *line* as a key/value pair split on *separator* and return the
    value when the line's key equals *key*; otherwise return const.EMPTY.

    Raises LabtoolsWarning when the line cannot be split into a pair.
    '''
    try:
        found_key, value = read_attribute_pair(line, separator=separator)
    except LabtoolsWarning:
        raise LabtoolsWarning('%s (in %s): %s not found in %s' %
                              (reflection.my_methodname(),
                               reflection.my_callername(),
                               separator, line))
    if key == found_key:
        return value
    return const.EMPTY
def shell_glob_filenames(globstring):
    '''
    Choose filenames from a directory based on a unix shell glob
    rather than a python regex.  Return a list of file paths.

    This came from syqada, where I chose this approach to globbing rather
    than filtering os.listdir() output, because I wanted args.namepattern
    to look like a unix shell glob so that the user would not need to
    learn python regex.

    *globstring* the string to match with unix shell glob

    Raises LabtoolsWarning when the listing command fails (for instance
    because nothing matches the pattern).
    '''
    # NOTE(security): the pattern has to be expanded by the shell, so it is
    # interpolated unquoted into a shell=True command line.  Never pass a
    # globstring that comes from an untrusted source.
    try:
        listing = subprocess.check_output('ls -1d %s' % (globstring),
                                          shell=True,
                                          stderr=subprocess.DEVNULL).decode()
    except subprocess.CalledProcessError as cpe:
        raise LabtoolsWarning('No files found matching pattern %s' % (globstring)) from cpe
    return listing.strip().split(const.NEWLINE)
def elide_list(longlist, showthismany=2, joiner=const.SPACE, ellipses=const.DOT*3, countem=True):
    '''
    Elide the middle elements of a "long" list, showing `showthismany`
    elements, split on either side (the front gets the extra one when
    `showthismany` is odd).

    longlist is a sequence of strings; it is returned joined in full when
    it has no more than `showthismany` elements.
    joiner is a space by default, but const.NEWLINE could also be popular
    ellipses is three dots by default
    countem will put the number of missing elements in parentheses
    in the middle
    '''
    showthismany = max(showthismany, 0)
    items = list(longlist)      # so that tuples can be sliced and concatenated too
    missing = len(items) - showthismany
    if missing < 1:
        return joiner.join(items)
    front = showthismany//2 + showthismany%2
    end = showthismany//2
    thecount = ('{1}({0}){1}'.format(missing, ellipses)) if countem else ellipses
    # Slice the tail from an explicit start index: items[-end:] with end == 0
    # would be items[0:], i.e. the entire list instead of nothing.
    tail = items[len(items)-end:]
    return joiner.join(items[:front] + [thecount] + tail)
def is_valid_filepath(file, test_writable=False, accept_directory=False):
    '''
    Check whether *file* is usable and return a tuple (valid, error):
    (True, None) on success, or (False, message) explaining the problem.

    By default *file* must be an existing, readable regular file.

    test_writable=True implies accept a non-existent file in a
    writable directory: if *file* itself is not writable, its parent
    directories are examined, nearest first, and the first one that
    exists decides the outcome.

    accept_directory=True implies an existing directory is accepted
    as well.
    '''
    if test_writable:
        if not os.access(file, os.W_OK):
            _dir = os.path.dirname(file) or const.DOT
            while True:
                if os.access(_dir, os.W_OK):
                    return True, None
                if os.path.isdir(_dir):
                    return False, ("directory '%s' is not writable" % (_dir))
                # _dir does not exist yet; stop once there is no parent left
                if _dir in (const.DOT, const.SLASH):
                    break
                _dir = os.path.dirname(_dir) or const.DOT
        return True, None

    if not os.path.isfile(file):
        if os.path.isdir(file):
            if accept_directory:
                return True, None
            error = "'%s' is not a file" % (file)
        else:
            error = "File '%s' does not exist" % (file)
        return False, error

    if not os.access(file, os.R_OK):
        # Only regular files reach this point.  The format string must
        # consume the file name as well; previously it had one %s for two
        # arguments and raised TypeError instead of reporting the problem.
        return False, ("file '%s' is not readable" % (file))
    return True, None
def headerdict(line, separator=const.TAB):
    '''
    Obsolete: use line_to_dict() instead.

    This used to build a dict of the terms in a (header) line and their
    column offsets, returned together with the list of terms.  It is now
    hard-disabled: every call raises DeprecationWarning and nothing is
    returned.
    '''
    message = '%s is obsolete, use line_to_dict() instead' % (reflection.my_methodname())
    raise DeprecationWarning(message)
def line_to_dict(line, hdrdict, separator=const.TAB):
    '''
    Poor man's pandas usage.

    Split *line* on *separator* (after removing trailing const.NEWLINE
    characters) and return an OrderedDict.

    If hdrdict is None, the line is treated as a header: the keys are its
    terms and the values are their column offsets, to be used as indices
    into the columns of the rows that follow.

    Otherwise the keys are those of *hdrdict*, in order, and the values
    are the corresponding fields of the line.  Short lines are padded with
    const.EMPTY; fields beyond the number of keys are dropped.

    This is meant to replace headerdict above.
    '''
    from collections import OrderedDict
    values = line.rstrip(const.NEWLINE).split(separator)
    if hdrdict is None:
        return OrderedDict((term, offset) for offset, term in enumerate(values))
    # guard against short lines
    shortfall = len(hdrdict) - len(values)
    if shortfall > 0:
        values.extend([const.EMPTY] * shortfall)
    return OrderedDict(zip(hdrdict.keys(), values))
def validate_columns(reference, columns, listnames=True, complain=False):
    '''
    Resolve a possibly heterogeneous list of column names, integers, or
    integer strings against *reference* (list or dict).

    Each element of *columns* is resolved as follows:
      - a string found in *reference* becomes its index (reference.index()
        for a list, the mapped value for a dict);
      - any other string is converted with int();
      - an integer is range-checked against len(reference), negative
        values counting from the end.

    Returns the list of resolved indices when *listnames* is false;
    otherwise returns reference[index] for each resolved index.
    NOTE(review): with a dict *reference* and listnames=True this looks the
    indices up as dict keys -- confirm that is what callers expect.

    if *complain*, and a column cannot be resolved, then fail with a
    ValueError naming the position and the offending element; otherwise
    such elements are silently skipped.
    '''
    rlist = []
    isdict = isinstance(reference, dict)
    for position, column in enumerate(columns):
        c = column
        try:
            if isinstance(c, str):
                if c in reference:
                    c = reference[c] if isdict else reference.index(c)
                else:
                    c = int(c)
                # fall through for range check
            if isinstance(c, int):
                newc = len(reference)+c if c < 0 else c
                if not 0 <= newc < len(reference):
                    raise ValueError('out of range')
                c = newc
            rlist.append(c)
        except Exception as e:
            if complain:
                # Report the element as given and where it sits.  Looking the
                # converted value up with columns.index() used to fail with
                # an unrelated "is not in list" error.
                raise ValueError('%dth element "%r": %r' % (position, column, e)) from e
    return [reference[c] for c in rlist] if listnames else rlist
def expand_env_vars(term, extra=None, locationmsg='input'):
    '''
    Find $terms in a string and substitute them first from extra if it is
    defined, and then from the environment, throwing an error if any term
    is not found.

    *term*         the string to expand
    *extra*        optional dict consulted before os.environ
    *locationmsg*  description of where *term* came from, used in the
                   error message for an undefined variable

    Substituted values are scanned again, so a value may itself contain
    $terms.  Raises UserWarning for an undefined variable, and for a '$'
    that is not followed by a variable name.
    '''
    # TODO fix const.DOLLAR in labtools.const
    save = term
    while '$' in term:
        m = re.search(r'(\$[\w\d]+)', term)
        if not m:
            # There is no error collector in scope here, so report a stray
            # '$' the same way as the other failure in this function.
            raise UserWarning("Unexpected use of $ seems invalid in '%s'" % (save))
        possible = m.group(1)[1:]
        if extra and possible in extra:
            value = extra[possible]
        elif possible in os.environ:
            value = os.environ[possible]
        else:
            raise UserWarning('''Environment variable '{}' found in {} is undefined.'''.
                              format(possible, locationmsg))
        # Replace only the matched span.  A blanket str.replace() of '$A'
        # would also eat the front of a longer name such as '$A_B'.
        term = term[:m.start(1)] + value + term[m.end(1):]
    return term
def find_files(dir, depth=20):
    '''
    Recursively collect the paths of everything that is not a directory
    in and below *dir*, and return them as a list.  A leading './' is
    removed from each path.

    *depth* limits how many levels of subdirectory are descended into
    (depth=0 to check only this directory).
    '''
    found = []
    for entry in os.listdir(dir):
        path = os.path.join(dir, entry)
        if path.startswith('./'):
            path = path[2:]
        if not os.path.isdir(path):
            found.append(path)
        elif depth > 0:
            found.extend(find_files(path, depth-1))
    return found
def host_and_path(file):
    '''
    Identify *file* by host and location: return 'host:absolute_path'.
    '''
    host = hostname()
    location = os.path.abspath(file)
    return '%s:%s' % (host, location)
def zero_div(num, denom, warn=sys.stderr, fail=False):
    '''
    Divide *num* by *denom*, tolerating a zero divisor.

    With a non-zero *denom* the quotient is returned.  With a zero
    *denom*:
      - if *fail* is true, LabtoolsWarning is raised;
      - otherwise, if *warn* is truthy, a message naming the caller is
        written to it;
      - in both non-failing cases 0 is returned.
    '''
    if not denom == 0.0:
        return (num/denom)
    if warn or fail:
        num_spec = '%d' if isinstance(num, int) else '%.1f'
        denom_spec = '%d' if isinstance(denom, int) else '%.1f'
        template = 'Zero divisor ({}/{}) found in %s'.format(num_spec, denom_spec)
        msg = template % (num, denom, reflection.my_callername())
        if fail:
            raise LabtoolsWarning(msg)
        warn.write(msg+const.NEWLINE)
    return 0
def silent_zero(count, label, format='%3d %s,'):
    '''
    Format *count* and *label* with *format* (count to 3 places by
    default).  When *count* is zero, return a run of const.SPACE of the
    same length as the formatted text instead.
    '''
    output = format % (count, label)
    if count:
        return output
    return const.SPACE*len(output)
def pct(num, denom, decimals=1, symbol=const.PERCENT, warn=None, fail=False):
    '''
    Return num/denom as a percentage string, ignoring div by zero by default.

    *decimals*  number of decimal places shown (default 1)
    *symbol*    text appended after the number (default const.PERCENT)
    *warn*      passed to zero_div(): a writable stream that receives a
                message on a zero divisor; None (the default) keeps quiet
    *fail*      passed to zero_div(): when true, a zero divisor raises
                instead of yielding 0
    '''
    # warn and fail used to be accepted but never forwarded, so zero_div()
    # fell back to its own defaults and always wrote a warning to stderr.
    ratio = zero_div(num, denom, warn=warn, fail=fail)
    return ('%.{}f%s'.format(decimals) % (100*ratio, symbol))
def spaced_row(columnlist, size=7, aligned=None, joiner=const.SPACE):
    '''
    Format the elements of *columnlist* as one row of fixed-width columns
    and return it with trailing whitespace removed.

    size determines the (minimum) width of each column, default 7
    aligned is a string with one character per column: 'l' left-justifies
    that column, anything else right-justifies it.  When it is shorter
    than the row, its last character applies to the remaining columns;
    when omitted, every column is left-justified.
    joiner is a string to connect the columns, const.SPACE by default.
    '''
    if not aligned:
        aligned = 'l'
    aligned += aligned[-1]*(len(columnlist)-len(aligned))
    cells = []
    for idx, column in enumerate(columnlist):
        flag = const.DASH if aligned[idx] == 'l' else const.EMPTY
        cells.append('%{}{}s'.format(flag, size) % str(column))
    return joiner.join(cells).rstrip()
def one_is_true(list):
    '''
    Syntactic sugar: return max() of *list*, which for a list of booleans
    is True when at least one element is True.
    '''
    largest = max(list)
    return largest
def all_are_true(list):
    '''
    Syntactic sugar: return min() of *list*, which for a list of booleans
    is True only when every element is True.
    '''
    smallest = min(list)
    return smallest
def frame_label(string, frame, width=80):
    '''
    Centre *string*, padded with one const.SPACE on each side, between
    repeated copies of *frame* sized to fill about *width* characters.
    '''
    half_width = width//2
    half_label = len(string)//2
    left_count = half_width - 1 - half_label
    if len(string) % 2:
        # odd-length labels take one more column, so shorten the right side
        right_count = int(half_width) - 2 - half_label
    else:
        right_count = half_width - 1 - half_label
    padded = const.SPACE + string + const.SPACE
    return frame*left_count + padded + frame*right_count
def item_selector(items, columns=2, interactive=True, stdout=sys.stdout, stdin=sys.stdin):
    '''
    Display the items (a sequence object) as a numbered list laid out in
    *columns* columns, numbered down each column in turn.

    If interactive, prompt for a number and return the selected index.
    Returns -1 if 'q' is given, if five prompts pass without a valid
    number, or if not interactive.

    stdout and stdin can be fed as StringIO for special purposes,
    including testing.
    '''
    width = 80//columns
    rows, leftover = divmod(len(items), columns)
    if leftover:
        rows += 1
    cell_format = '%-{}s'.format(width)

    for row in range(rows):
        cells = []
        for col in range(columns):
            number = col*rows + row
            if number < len(items):
                cells.append('(%2d)\t%s' % (number, items[number]))
            else:
                cells.append(const.EMPTY)
        text = const.EMPTY.join(cell_format % (cell) for cell in cells)
        stdout.write(text.rstrip() + const.NEWLINE)

    if not interactive:
        return -1

    highest = len(items)-1
    for _attempt in range(5):
        stdout.write('Select the number between 0 and %d corresponding to your choice ... ' % (highest))
        stdout.flush()
        answer = stdin.readline().strip()
        if answer == 'q':
            return -1
        if answer.isdigit() and 0 <= int(answer) < len(items):
            return int(answer)
        stdout.write("(Please select a number from 0 to %d or 'q' to quit) " % (highest))
        stdout.flush()

    # five prompts without a usable answer
    print("We seem not to be communicating. I wanted numeric digits.", file=stdout)
    return -1