# Source code for timeutil

# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Routines for dealing with time and time deltas.
I'm a fetishist about printing time, so that's what's emphasized.
'''



import datetime
import functools
import logging
import os
import sys

from labtools import const
from labtools import misc
from labtools import reflection
from labtools.labexceptions import LabtoolsWarning

# Unit conversion factors, all floats. A name X_PER_Y is the number of X in
# one Y, so multiplying a quantity of Y by X_PER_Y yields a quantity of X.

# Reciprocal factors (small unit -> large unit).
WEEKS_PER_DAY = 1/7.0
DAYS_PER_HOUR = 1/24.0
HOURS_PER_MINUTE = 1/60.0
MINUTES_PER_HOUR = 60.0
HOURS_PER_SECOND = 1/3600.0
MINUTES_PER_SECOND = 1/60.0
SECONDS_PER_MICROSECOND = 1/1000000.0
SECONDS_PER_MILLISECOND = 1/1000.0
MILLIS_PER_MICROSECOND = 1/1000.0
# Approximate month length; not referenced by the code visible in this module.
DAYS_PER_MOSTMONTHS = 30.0
# Direct factors (large unit -> small unit).
DAYS_PER_WEEK = 7.0 #: days in one week
HOURS_PER_DAY = 24.0 #: hours in one day
SECONDS_PER_HOUR = 3600.0 #: seconds in one hour
SECONDS_PER_MINUTE = 60.0 #: seconds in one minute
MICROSECONDS_PER_SECOND = 1000000.0 #: microseconds in one second
MICROSECONDS_PER_MILLISECOND = 1000.0 #: microseconds in one millisecond
MILLISECONDS_PER_SECOND = 1000.0 #: milliseconds in one second

# One-letter symbols naming each time unit. They are the characters accepted
# in a comparable_delta() pattern and the keys used for pretty_delta() terms.
SYM_WEEKS = 'W'
SYM_DAYS = 'D'
SYM_HOURS = 'H'
SYM_MINUTES = 'M'
SYM_SECONDS = 'S'
SYM_MILLIS = 'm'
SYM_MICROS = 'u'

# Every valid symbol, ordered from the largest unit to the smallest.
VALID_SYMS = [SYM_WEEKS, SYM_DAYS, SYM_HOURS, SYM_MINUTES, SYM_SECONDS, SYM_MILLIS, SYM_MICROS]

# Sample delta strings; not referenced by the code visible in this module.
SAMPLE_DELTAS = ['W00D01H00:00:00', 'M10:00']

# Pattern used by comparable_delta() when none is given:
# hours, minutes, seconds and milliseconds.
COMPARABLE_DEFAULT = 'HMSm'

# Separator string: a comma followed by a space.
JOINER = ', '

def convert_delta(timeunit):
    '''
    Return *timeunit* as a datetime.timedelta.

    A datetime.timedelta is returned unchanged; an int or float is
    interpreted as a number of seconds.

    Strings are not permitted right now because there's a method further
    down this module that would end up recursing infinitely through this one.
    TODO: A default parameter could fix that.

    Raises LabtoolsWarning when *timeunit* is none of the accepted types.
    reflection.typecheck() is consulted first and presumably rejects bad
    types itself -- confirm against its implementation.
    '''
    reflection.typecheck(timeunit, datetime.timedelta, int, float)
    if isinstance(timeunit, datetime.timedelta):
        return timeunit
    # isinstance() rather than an exact type() match, so that subclasses of
    # int and float are converted as well.
    if isinstance(timeunit, (int, float)):
        return datetime.timedelta(seconds=timeunit)
    raise LabtoolsWarning('%s: requires a datetime.timedelta (Got a %s)' %
                          (reflection.my_callername(), type(timeunit)))
def comparable_delta(delta, pattern=COMPARABLE_DEFAULT, nohours=False):
    '''
    Format a timedelta so that it will sort lexically in ASCII.

    *pattern* selects the fields to show, using the unit symbols.
    A complete pattern would be 'WDHMSm' or 'WDHMSu', where
        weeks = SYM_WEEKS ('W')
            Specifying weeks implies days.
        days = SYM_DAYS ('D')
        hours = SYM_HOURS ('H')
            Implied when days are shown together with minutes or seconds.
        minutes = SYM_MINUTES ('M')
        seconds = SYM_SECONDS ('S')
            Hours alone or seconds alone shows one decimal place.
        milliseconds = SYM_MILLIS ('m')
        microseconds = SYM_MICROS ('u')
            Either one implies seconds; microseconds overrides milliseconds
            if both are provided.
    An empty *pattern* means COMPARABLE_DEFAULT ('HMSm').

    *nohours* = True omits the leading SYM_HOURS letter of the hours field.

    *delta* can be either a datetime.timedelta or an interval in seconds.

    Raises LabtoolsWarning if *pattern* contains a character that is not in
    VALID_SYMS.
    '''
    delta = convert_delta(delta)
    if not pattern:
        pattern = COMPARABLE_DEFAULT
    elif not all(char in VALID_SYMS for char in pattern):
        raise LabtoolsWarning("'%s' is not a valid pattern for comparable_delta: %s" %
                              (pattern, VALID_SYMS))

    # Work out which fields are shown, including the implied ones.
    weeks = SYM_WEEKS in pattern
    days = SYM_DAYS in pattern or weeks
    millis = SYM_MILLIS in pattern
    micros = SYM_MICROS in pattern
    fractions = millis or micros
    seconds = SYM_SECONDS in pattern or fractions
    minutes = SYM_MINUTES in pattern
    hours = SYM_HOURS in pattern or ((minutes or seconds) and days)

    pieces = []
    if weeks:
        pieces.append('W%02d' % (delta.days/DAYS_PER_WEEK))
    if days:
        # With weeks shown, only the days left over after whole weeks.
        pieces.append('D%02d' % (delta.days%DAYS_PER_WEEK if weeks else delta.days))

    secs = delta.seconds
    if not days:
        # No day field (weeks imply days), so whole days are folded into
        # the remaining, smaller units.
        secs += delta.days*SECONDS_PER_HOUR*HOURS_PER_DAY

    if hours:
        hformat = '' if nohours else SYM_HOURS
        # Hours as the only field get one decimal place.
        hformat += '%05.1f' if SYM_HOURS == pattern else '%02d'
        pieces.append(hformat % (secs*HOURS_PER_SECOND))
        secs = secs%SECONDS_PER_HOUR

    if minutes:
        mformat = const.COLON if hours else SYM_MINUTES
        # Minutes with neither hours nor seconds get one decimal place.
        mformat += '%02d' if seconds or hours else '%04.1f'
        pieces.append(mformat % (secs*MINUTES_PER_SECOND))
        secs = secs%SECONDS_PER_MINUTE

    if seconds:
        prefix = const.COLON if minutes else SYM_SECONDS
        # Width of the integer part; widened below to make room for the
        # decimal point and the fractional digits.
        width = 2 if minutes else 4
        if not fractions:
            if hours or days:
                conversion = 'd'
            else:
                conversion = '.1f'
                width += 2
        elif micros:
            conversion = '.6f'
            width += 7
        else:
            conversion = '.3f'
            width += 4
        sformat = '%s%%0%d%s' % (prefix, width, conversion)
        pieces.append(sformat % (secs+delta.microseconds*SECONDS_PER_MICROSECOND))

    return const.EMPTY.join(pieces)


# Thresholds used by pretty_delta(). IGNORABLE_DAYS is compared directly with
# a day count; the others are handed to _ignorable() as its *ignorable* ratio.
IGNORABLE_DAYS = 4*DAYS_PER_WEEK
IGNORABLE_HOURS = 3*HOURS_PER_DAY
IGNORABLE_MINUTES = 3*MINUTES_PER_HOUR
IGNORABLE_SECONDS = 3*SECONDS_PER_MINUTE
IGNORABLE_MILLISECONDS = 300
IGNORABLE_MICROSECONDS = 300


def _ignorable(term, bigunit, smallunit, factor, ignorable):
    '''
    Test whether the small unit is enough smaller that we should ignore it.
    Or, if it is so big that the difference between it and the
    bigunit/smallunit is small enough.

    *term* is a label for the small unit; it plays no part in the result.
    *bigunit* and *smallunit* are the counts of the larger and the smaller
    unit, *factor* is the number of small units in one big unit, and
    *ignorable* is the threshold ratio.
    '''
    # First test: the big quantity, expressed in small units, outweighs the
    # small quantity by at least the threshold.
    # Second test: the small quantity is so close to a whole big unit that
    # the shortfall is outweighed in the same way.
    return (bigunit*factor >= (smallunit+1)*ignorable or
            (bigunit+1)*factor >= (factor-smallunit)*ignorable)
def pretty_delta(delta, weeksonly=False, nofractions=False):
    '''
    Format a timedelta reasonably nicely, so that it doesn't print with
    absurd precision, i.e., no '1 month, 2 seconds' or '15.000010 seconds'.

    Small units are dropped, or rounded into the next larger unit, when
    _ignorable() judges them insignificant next to the larger one. Deltas of
    more than IGNORABLE_DAYS days are reported only as a rounded number of
    weeks; no month approximation is made here.

    *weeksonly* is accepted for compatibility but is not used.
    *nofractions* = True will not print fractional seconds.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    '''
    delta = convert_delta(delta)

    # Split the delta into whole units.
    days = delta.days
    hours = int(delta.seconds/SECONDS_PER_HOUR)
    seconds = delta.seconds%SECONDS_PER_HOUR
    minutes = int(seconds/SECONDS_PER_MINUTE)
    seconds = seconds%SECONDS_PER_MINUTE
    milliseconds = int(delta.microseconds/MICROSECONDS_PER_MILLISECOND)
    microseconds = delta.microseconds%MICROSECONDS_PER_MILLISECOND

    # Decide which units are too small to show. Each flag also becomes truthy
    # when a unit two or more steps larger is present. The flags are only
    # tested for truth; they may hold a number rather than a bool.
    ignore_days = days>IGNORABLE_DAYS
    ignore_hours = ignore_days or _ignorable('hours', days, hours,
                                             HOURS_PER_DAY, IGNORABLE_HOURS)
    ignore_minutes = days or ignore_hours or _ignorable('minutes', hours, minutes,
                                                        MINUTES_PER_HOUR, IGNORABLE_MINUTES)
    ignore_seconds = days or hours or ignore_minutes or _ignorable('seconds', minutes, seconds,
                                                                   SECONDS_PER_MINUTE, IGNORABLE_SECONDS)
    ignore_milliseconds = (days or hours or minutes or ignore_seconds or
                           _ignorable('ms', seconds, milliseconds,
                                      MILLISECONDS_PER_SECOND, IGNORABLE_MILLISECONDS))
    ignore_microseconds = (days or hours or minutes or seconds or ignore_milliseconds or
                           _ignorable('us', milliseconds, microseconds,
                                      MICROSECONDS_PER_MILLISECOND, IGNORABLE_MICROSECONDS))

    if ignore_days:
        return '%d weeks' % (days/DAYS_PER_WEEK+round((days%DAYS_PER_WEEK)/DAYS_PER_WEEK,0))

    pretty = dict()
    if days:
        if ignore_hours:
            # Hours will not be shown, so round them into the day count.
            # When hours ARE shown they must not be counted into the days as
            # well, and the minutes are rounded into the hours further down,
            # exactly once.
            days += round(hours/HOURS_PER_DAY, 0)
        pretty[SYM_DAYS] = '%d %s' % (days, misc.plural(days, 'day'))
    if ignore_hours:
        return display_pretty(pretty)

    if hours:
        if ignore_minutes:
            # Minutes (and seconds) will not be shown: round them into hours.
            hours += round((minutes+round(seconds/SECONDS_PER_MINUTE, 0))/MINUTES_PER_HOUR, 0)
            pretty[SYM_HOURS] = '%d %s' % (hours, misc.plural(hours, 'hour'))
        else:
            pretty[SYM_HOURS] = '%d hr' % (hours)
    if ignore_minutes:
        return display_pretty(pretty)

    if minutes:
        # ignore_milliseconds is truthy whenever minutes is non-zero, so the
        # seconds are always considered for rounding here.
        if ignore_seconds or ignore_milliseconds:
            fracminutes = round((seconds+round(milliseconds/MILLISECONDS_PER_SECOND, 0))/
                                SECONDS_PER_MINUTE, 0)
            if fracminutes:
                # The seconds rounded up to a whole minute: absorb them.
                ignore_seconds = True
                minutes += fracminutes
        pretty[SYM_MINUTES] = '%d min' % (minutes)
    if ignore_seconds:
        return display_pretty(pretty)

    if seconds:
        if nofractions or ignore_milliseconds:
            fracseconds = round((milliseconds+round(microseconds/MICROSECONDS_PER_MILLISECOND, 0))/
                                MILLISECONDS_PER_SECOND, 0)
            if fracseconds:
                # The fraction rounded up to a whole second: absorb it.
                ignore_milliseconds = True
                seconds += fracseconds
        pretty[SYM_SECONDS] = '%d sec' % (seconds)
    if nofractions or ignore_milliseconds:
        return display_pretty(pretty)

    if milliseconds:
        if seconds:
            # Replaces the whole-second term set above with a decimal one.
            pretty[SYM_SECONDS] = '%.1f sec' % (seconds + milliseconds/MILLISECONDS_PER_SECOND)
        elif nofractions or ignore_microseconds:
            milliseconds += round(microseconds/MICROSECONDS_PER_MILLISECOND, 0)
            pretty[SYM_MILLIS] = '%d ms' % (milliseconds)
        else:
            pretty[SYM_MILLIS] = '%.1f ms' % (delta.microseconds/MICROSECONDS_PER_MILLISECOND)
    else:
        pretty[SYM_MICROS] = '%d usec' % (microseconds)
    return display_pretty(pretty)
def display_pretty(terms):
    '''
    Join the formatted terms of a pretty delta into a single string.

    *terms* maps unit symbols to their already formatted text. The terms are
    emitted in the order of VALID_SYMS (largest unit first), separated by
    JOINER; symbols that are missing from *terms* are skipped, and keys that
    are not in VALID_SYMS are never shown.
    '''
    return JOINER.join('%s' % (terms[sym]) for sym in VALID_SYMS if sym in terms)
def walltime(delta):
    '''
    Format a timedelta as H:MM:SS, or as M:SS when it is under one hour.

    The result is coarsened as the delta grows; values are cut off, not
    rounded:
      - seconds show as 00 once there is at least one hour, or more than
        five minutes;
      - minutes show as 00 once there are more than five hours.
    Whole days are counted into the hours.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    '''
    delta = convert_delta(delta)
    hours = int(delta.days*HOURS_PER_DAY + delta.seconds/SECONDS_PER_HOUR)
    within_hour = delta.seconds%SECONDS_PER_HOUR
    minutes = int(within_hour/SECONDS_PER_MINUTE)
    if hours or minutes > 5:
        seconds = 0
    else:
        seconds = within_hour%SECONDS_PER_MINUTE
    if hours > 5:
        minutes = 0
    if hours:
        return '%d:%02d:%02d' % (hours, minutes, seconds)
    return '%d:%02d' % (minutes, seconds)
def rounded_seconds_in_delta(delta):
    '''
    Return the seconds in datetime.timedelta, *delta*, as an int rounded by
    the microseconds: a fraction of more than half a second rounds up, and
    exactly half a second rounds down.

    Note, this differs from timedelta.total_seconds(), which returns a float
    that still carries the fraction.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    '''
    delta = convert_delta(delta)
    # Build the whole seconds from the integer fields instead of truncating
    # total_seconds(). A timedelta keeps seconds and microseconds
    # non-negative even when the delta is negative, so this is the floor of
    # the total, and adding the microsecond carry rounds correctly on both
    # sides of zero. (Truncating -1.3s gives -1 and the carry made it 0.)
    whole_seconds = delta.days*int(HOURS_PER_DAY*SECONDS_PER_HOUR) + delta.seconds
    carry = 1 if delta.microseconds > 500000 else 0
    return whole_seconds + carry
def parse_comparable_delta(comparable):
    '''
    Parse a comparable_delta string to return a timedelta.

    Fields are read in the order weeks, days, hours, minutes, seconds. Each
    one is a one-character prefix followed by exactly two digits: the unit
    symbol, or a colon for minutes and seconds. A colon directly after the
    hours is read as minutes. Anything left after the seconds is read as a
    fraction of a second, e.g. the '.500' of 'H00:00:01.500'.

    Raises ValueError if a field or the trailing fraction is not numeric.
    '''
    reflection.typecheck(comparable, str)
    days = 0
    if comparable.startswith(SYM_WEEKS):
        days += int(DAYS_PER_WEEK)*int(comparable[1:3])
        comparable = comparable[3:]
    if comparable.startswith(SYM_DAYS):
        days += int(comparable[1:3])
        comparable = comparable[3:]

    seconds = 0
    if comparable.startswith(SYM_HOURS):
        seconds += int(comparable[1:3])*int(SECONDS_PER_HOUR)
        comparable = comparable[3:]
    if comparable.startswith((SYM_MINUTES, const.COLON)):
        seconds += int(comparable[1:3])*int(SECONDS_PER_MINUTE)
        comparable = comparable[3:]
    if comparable.startswith((SYM_SECONDS, const.COLON)):
        seconds += int(comparable[1:3])
        comparable = comparable[3:]

    # The remainder is the fractional part of the seconds field, so it has
    # to be scaled up to microseconds rather than used as microseconds.
    if comparable:
        micros = int(round(float(comparable)*MICROSECONDS_PER_SECOND))
    else:
        micros = 0
    return datetime.timedelta(days=days, seconds=seconds, microseconds=micros)
# strftime/strptime formats shared by timestamp() and parse_timestamp().
# The printed form separates hours and minutes with a colon.
PRINTED_TIMESTAMP_FORMAT = '%Y-%m-%d_%H:%M'
# The filesystem form uses a dash instead of the colon.
FILESYSTEM_TIMESTAMP_FORMAT = '%Y-%m-%d_%H-%M'
def timestamp(when=None, seconds=False, microseconds=False, printed=False):
    """
    Return the string representation of the date controlled by a string
    format.

    *when* is the datetime to format; the current local time is used when it
    is not given.
    *seconds* = True appends the seconds, always as '-SS', for the printed
    form as well.
    *microseconds* = True appends '.ffffff' and implies *seconds*.
    *printed* = True uses PRINTED_TIMESTAMP_FORMAT instead of
    FILESYSTEM_TIMESTAMP_FORMAT.
    """
    if microseconds:
        seconds = True
    fmt = PRINTED_TIMESTAMP_FORMAT if printed else FILESYSTEM_TIMESTAMP_FORMAT
    if seconds:
        fmt += '-%S'
    if microseconds:
        fmt += '.%f'
    if not when:
        when = datetime.datetime.today()
    return when.strftime(fmt)
def parse_timestamp(atime, printed=False, seconds=False):
    """
    Parse a timestamp string and return a datetime.datetime.

    *atime* is the text to parse; bytes are decoded first, anything else is
    converted with str(). Surrounding whitespace is ignored.
    *printed* = True expects PRINTED_TIMESTAMP_FORMAT instead of
    FILESYSTEM_TIMESTAMP_FORMAT.
    *seconds* = True expects a trailing seconds field, written either as
    ':SS' or as '-SS' (the latter is what timestamp() writes).
    Microseconds are not supported.

    Raises LabtoolsWarning if the text does not match the format.
    """
    base = PRINTED_TIMESTAMP_FORMAT if printed else FILESYSTEM_TIMESTAMP_FORMAT
    # str() of a bytes object yields its repr ("b'...'"), which can never
    # match the format, so bytes have to be decoded instead.
    if isinstance(atime, bytes):
        atime = atime.decode()
    else:
        atime = str(atime)
    if seconds:
        formats = [base + ':%S', base + '-%S']
    else:
        formats = [base]

    text = atime.strip()
    failure = None
    for fmt in formats:
        try:
            return datetime.datetime.strptime(text, fmt)
        except ValueError as err:
            # Report the failure of the primary format if every one fails.
            if failure is None:
                failure = (fmt, err)
    fmt, err = failure
    raise LabtoolsWarning('Format %s could not parse %s: %r' % (fmt, atime, err.args[0]))
def parse_colon_delta(deltastring):
    '''
    Parse a colon-separated delta such as '10:00' and return a timedelta.

    Assume no more than hours specified: the string is read as 'SS',
    'MM:SS' or 'HH:MM:SS'. If more than three terms are given, the first is
    taken as hours and the last two as minutes and seconds; the terms in
    between are disregarded.

    Raises ValueError if a term that is used is not an integer.
    '''
    terms = deltastring.split(const.COLON)
    seconds = int(terms[-1])
    minutes = int(terms[-2]) if len(terms) > 1 else 0
    hours = int(terms[0]) if len(terms) > 2 else 0
    return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
def start_timer():
    """
    Return the current time, to be used as the first parameter to
    *report_timer()*.
    """
    return datetime.datetime.now()
def report_timer(start, label, count=None, unit='unit', logger=None, stream=sys.stderr):
    """
    Report the time elapsed since *start*, as returned by *start_timer()*.

    *label* names what was timed.
    If *count* is given and non-zero, the time per *unit* is added to the
    report.
    If *stream* supports the .write() method, the message
    "Running '<label>' took <time>" is written to it and nothing is
    returned. Otherwise the message "<label>: took <time>" is returned.
    If *logger* is given, the same complete message is logged as well.
    """
    delta = datetime.datetime.now() - start
    writable = hasattr(stream, 'write')
    if writable:
        prefix = "Running '%s'" % (label)
        report = ' took %s' % (pretty_delta(delta))
    else:
        prefix = ''
        report = '%s: took %s' % (label, pretty_delta(delta))
    if count:
        report += ' ({0}/{2} for {1} {3})'.format(pretty_delta(delta/count),
                                                   count, unit, misc.plural(count, unit))
    if logger:
        # Log the whole message; the label would otherwise be missing from
        # the log whenever the report also goes to a stream.
        logger.info(prefix + report)
    if writable:
        stream.write(prefix + report + const.NEWLINE)
        return None
    return report
def time_method(func, count=None, unit='unit', logger=None, stream=sys.stderr):
    """
    Return a function which can be invoked to report the time it takes to
    execute the function *func*.

    The returned function passes its arguments on to *func*, reports the
    elapsed time through *report_timer()* (using *count*, *unit*, *logger*
    and *stream*), and returns whatever *func* returned.
    """
    # wraps() keeps the name and docstring of *func* on the wrapper.
    @functools.wraps(func)
    def _time_method(*args, **kwargs):
        start = start_timer()
        result = func(*args, **kwargs)
        report_timer(start, func.__name__, count=count, unit=unit,
                     logger=logger, stream=stream)
        return result
    return _time_method
def report_timed_loop(func, unit='iteration', logger=None, stream=sys.stderr):
    """
    Return a function that times an iterative function.

    The returned function calls *func* with its own arguments over and over
    until *func* returns a false value, then passes the elapsed time and the
    number of calls made to *report_timer()* and returns what that returns.
    *func* is always called at least once.
    """
    def _time_method(*args, **kwargs):
        start = start_timer()
        # The call that returns a false value is counted as well.
        count = 1
        while func(*args, **kwargs):
            count += 1
        return report_timer(start, '%s()' % func.__name__, count=count, unit=unit,
                            logger=logger, stream=stream)
    return _time_method
# Moment from which printable_elapsed_stamp() measures elapsed time.
# It starts out false (0) and is replaced by a datetime on the first call,
# or whenever the timer is reset.
_stamp_start_time = 0
def stampit(message, stream=None, logfile=None, timezero=False):
    '''
    Print *message* with a sortable leading timestamp of elapsed time.

    *stream* is passed to print() as its file; None means standard output.
    If *logfile* is given, the same line is also appended to that file.
    If *timezero* is True, restart the timer (and thus print 00 seconds
    elapsed).

    Raises LabtoolsWarning if the log file cannot be written.
    '''
    elapsed_stamp = printable_elapsed_stamp(timezero)
    pmessage = misc.spaced_list([elapsed_stamp, message])
    print(pmessage, file=stream)
    if not logfile:
        return
    try:
        with open(logfile, 'a') as log:
            log.write(pmessage + const.NEWLINE)
    except Exception:
        raise LabtoolsWarning('%s() called from %s() with bad logfile name (%s)' %
                              (reflection.my_methodname(), reflection.my_callername(), logfile))
def printable_elapsed_stamp(timezero=False):
    '''
    Return a string showing elapsed time in standard sortable form.

    Although this is "exposed" as a "public" method, it is chiefly a utility
    function for *stampit()*.

    The first invocation will return 0 elapsed time.
    *timezero* resets elapsed time to zero if true.
    '''
    global _stamp_start_time
    now = datetime.datetime.now()
    if timezero or not _stamp_start_time:
        # First call, or an explicit reset: measure from this moment on.
        _stamp_start_time = now
    return comparable_delta(now - _stamp_start_time)