# labtools, Copyright (C) 2017 Jerry Fowler and Paul Scheet.
# This program comes with ABSOLUTELY NO WARRANTY. It is licensed under
# GNU GPL Version 3. License and warranty may be viewed in the manual.
'''
Routines for dealing with time and time deltas.
I'm a fetishist about printing time, so that's what's emphasized.
'''
import datetime
import functools
import logging
import os
import sys

from labtools import const
from labtools import misc
from labtools import reflection
from labtools.labexceptions import LabtoolsWarning
# Unit conversion factors.  Each name reads "<target>_PER_<source>": multiplying
# a quantity expressed in <source> units by the constant gives <target> units.
# All of them are floats, so arithmetic that uses them yields floats.
#
# Small unit -> large unit (reciprocals); MINUTES_PER_HOUR is the one
# large -> small factor that lives in this group.
WEEKS_PER_DAY = 1/7.0
DAYS_PER_HOUR = 1/24.0
HOURS_PER_MINUTE = 1/60.0
MINUTES_PER_HOUR = 60.0
HOURS_PER_SECOND = 1/3600.0
MINUTES_PER_SECOND = 1/60.0
SECONDS_PER_MICROSECOND = 1/1000000.0
SECONDS_PER_MILLISECOND = 1/1000.0
MILLIS_PER_MICROSECOND = 1/1000.0
# Nominal month length in days.
DAYS_PER_MOSTMONTHS = 30.0
# Large unit -> small unit.
# NOTE(review): the trailing "#:" looks like a Sphinx autodoc marker that
# exposes the constant in the generated docs -- confirm before removing.
DAYS_PER_WEEK = 7.0 #:
HOURS_PER_DAY = 24.0 #:
SECONDS_PER_HOUR = 3600.0 #:
SECONDS_PER_MINUTE = 60.0 #:
MICROSECONDS_PER_SECOND = 1000000.0 #:
MICROSECONDS_PER_MILLISECOND = 1000.0 #:
MILLISECONDS_PER_SECOND = 1000.0 #:
# One-character field symbols used in comparable_delta() patterns and as the
# keys of the term dictionary that pretty_delta() hands to display_pretty().
SYM_WEEKS = 'W'
SYM_DAYS = 'D'
SYM_HOURS = 'H'
SYM_MINUTES = 'M'
SYM_SECONDS = 'S'
SYM_MILLIS = 'm'
SYM_MICROS = 'u'
# Every legal pattern symbol, ordered from the largest unit to the smallest;
# display_pretty() relies on this order when joining terms.
VALID_SYMS = [SYM_WEEKS, SYM_DAYS, SYM_HOURS, SYM_MINUTES, SYM_SECONDS, SYM_MILLIS, SYM_MICROS]
# Example strings in the comparable-delta notation.
SAMPLE_DELTAS = ['W00D01H00:00:00', 'M10:00']
# Pattern used by comparable_delta() when the caller supplies none:
# hours, minutes, seconds and milliseconds.
COMPARABLE_DEFAULT = 'HMSm'
# Separator between human-readable terms.
JOINER = ', '
def convert_delta(timeunit):
    '''
    Return *timeunit* as a datetime.timedelta.

    A datetime.timedelta is handed back untouched; an int or a float is
    interpreted as a number of seconds.  Strings are deliberately not
    accepted, because the string parsers in this module could otherwise
    recurse back into this function.

    Raises LabtoolsWarning when the value is neither a timedelta nor exactly
    an int or a float.
    '''
    reflection.typecheck(timeunit, datetime.timedelta, int, float)
    if isinstance(timeunit, datetime.timedelta):
        return timeunit
    # Exact type test on purpose: subclasses of int/float are rejected.
    if type(timeunit) not in [int, float]:
        raise LabtoolsWarning('%s: requires a datetime.timedelta (Got a %s)' %
                              (reflection.my_callername(), type(timeunit)))
    return datetime.timedelta(seconds=timeunit)
def comparable_delta(delta, pattern=COMPARABLE_DEFAULT, nohours=False):
    '''
    Format a timedelta as a zero-padded string that sorts lexically in ASCII.

    *delta* can be either a datetime.timedelta or an interval in seconds.

    *pattern* selects the fields to print, using the symbols in VALID_SYMS.
    A complete pattern would be 'WDHMSm' or 'WDHMSu':
        SYM_WEEKS weeks (specifying weeks implies days), SYM_DAYS days,
        SYM_HOURS hours, SYM_MINUTES minutes, SYM_SECONDS seconds,
        SYM_MILLIS milliseconds, SYM_MICROS microseconds.
    Either fraction symbol implies seconds, and microseconds overrides
    milliseconds if both are given.  Hours are also printed whenever days
    are combined with minutes or seconds.  An empty pattern falls back to
    COMPARABLE_DEFAULT ('HMSm').

    Hours printed alone, minutes printed without hours or seconds, and
    seconds printed without hours, days or fractions show one decimal place.
    When no day field is printed, whole days are folded into the largest
    field that is printed.

    *nohours* = True omits the SYM_HOURS prefix in front of the hours field.

    Raises LabtoolsWarning if *pattern* contains a symbol not in VALID_SYMS.
    '''
    delta = convert_delta(delta)
    if not pattern:
        pattern = COMPARABLE_DEFAULT
    elif not all(char in VALID_SYMS for char in pattern):
        raise LabtoolsWarning("'%s' is not a valid pattern for comparable_delta: %s" %
                              (pattern, VALID_SYMS))
    # Work out which fields are printed, including the implied ones.
    weeks = SYM_WEEKS in pattern
    days = SYM_DAYS in pattern or weeks
    millis = SYM_MILLIS in pattern
    micros = SYM_MICROS in pattern
    fractions = millis or micros
    seconds = SYM_SECONDS in pattern or fractions
    minutes = SYM_MINUTES in pattern
    hours = SYM_HOURS in pattern or ((minutes or seconds) and days)

    comparable = []
    if weeks:
        comparable.append('W%02d' % (delta.days/DAYS_PER_WEEK))
    if days:
        comparable.append('D%02d' % (delta.days%DAYS_PER_WEEK if weeks else delta.days))

    secs = delta.seconds
    if not (weeks or days):
        # No day field: carry the days in the time-of-day fields instead.
        secs += delta.days*SECONDS_PER_HOUR*HOURS_PER_DAY
    if hours:
        hstring = '' if nohours else SYM_HOURS
        hstring += '%05.1f' if SYM_HOURS == pattern else '%02d'
        comparable.append(hstring % (secs*HOURS_PER_SECOND))
        secs = secs%SECONDS_PER_HOUR
    if minutes:
        # After an hours field the separator is a colon, otherwise the symbol.
        mstring = const.COLON if hours else SYM_MINUTES
        mstring += '%02d' if seconds or hours else '%04.1f'
        comparable.append(mstring % (secs*MINUTES_PER_SECOND))
        secs = secs%SECONDS_PER_MINUTE
    if seconds:
        prefix = const.COLON if minutes else SYM_SECONDS
        # Width of the whole field, decimal point and decimals included.
        chars = 2 if minutes else 4
        if not fractions:
            if hours or days:
                fstring = 'd'
            else:
                fstring = '.1f'
                chars += 2
        elif micros:
            fstring = '.6f'
            chars += 7
        else:
            fstring = '.3f'
            chars += 4
        sstring = '%s%%0%d%s' % (prefix, chars, fstring)
        comparable.append(sstring % (secs+delta.microseconds*SECONDS_PER_MICROSECOND))
    return const.EMPTY.join(comparable)
# Thresholds that pretty_delta() uses to decide when a smaller unit is not
# worth printing.  IGNORABLE_DAYS is compared directly with the day count
# (longer spans are reported in weeks); the others are passed to _ignorable()
# as its *ignorable* argument for the unit pair named in each comment.
IGNORABLE_DAYS = 4*DAYS_PER_WEEK
IGNORABLE_HOURS = 3*HOURS_PER_DAY  # days vs. hours
IGNORABLE_MINUTES = 3*MINUTES_PER_HOUR  # hours vs. minutes
IGNORABLE_SECONDS = 3*SECONDS_PER_MINUTE  # minutes vs. seconds
IGNORABLE_MILLISECONDS = 300  # seconds vs. milliseconds
IGNORABLE_MICROSECONDS = 300  # milliseconds vs. microseconds
def _ignorable(term, bigunit, smallunit, factor, ignorable):
'''
Test whether the small unit is enough smaller that we should ignore it.
Or, if it is so big that the difference between it and the bigunit/smallunit is small enough.
'''
test = (bigunit*factor >= (smallunit+1)*ignorable or
(bigunit+1)*factor >= (factor-smallunit)*ignorable)
#logging.debug('%s %r: %d*%d >= min(%d+1,%d-%d)*%d' % (term, test, bigunit, factor, smallunit,
# factor, smallunit,ignorable))
return test
def pretty_delta(delta, weeksonly=False, nofractions=False):
    '''
    Format a timedelta reasonably nicely, so that it doesn't print with
    absurd precision, i.e., no '1 month, 2 seconds' or '15.000010 seconds'.

    Only the one or two most significant units are printed; a smaller unit
    is rounded into the one above it when _ignorable() says it is negligible.
    Spans longer than IGNORABLE_DAYS days are reported in whole weeks.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    *weeksonly* is accepted for compatibility but is not consulted here.
    *nofractions* = True will not print fractional seconds; note that a
    sub-second delta then produces an empty string.

    Returns the terms joined by display_pretty().
    '''
    delta = convert_delta(delta)
    # Split the delta into whole units.
    days = delta.days
    hours = int(delta.seconds/SECONDS_PER_HOUR)
    seconds = delta.seconds%SECONDS_PER_HOUR
    minutes = int(seconds/SECONDS_PER_MINUTE)
    seconds = seconds%SECONDS_PER_MINUTE
    milliseconds = int(delta.microseconds/MICROSECONDS_PER_MILLISECOND)
    microseconds = delta.microseconds%MICROSECONDS_PER_MILLISECOND

    # A unit is ignored when any larger unit already makes it irrelevant.
    ignore_days = days>IGNORABLE_DAYS
    ignore_hours = ignore_days or _ignorable('hours', days, hours, HOURS_PER_DAY, IGNORABLE_HOURS)
    ignore_minutes = days or ignore_hours or _ignorable('minutes', hours, minutes, MINUTES_PER_HOUR, IGNORABLE_MINUTES)
    ignore_seconds = days or hours or ignore_minutes or _ignorable('seconds', minutes, seconds, SECONDS_PER_MINUTE, IGNORABLE_SECONDS)
    ignore_milliseconds = (days or hours or minutes or ignore_seconds or
                           _ignorable('ms', seconds, milliseconds, MILLISECONDS_PER_SECOND,
                                      IGNORABLE_MILLISECONDS))
    ignore_microseconds = (days or hours or minutes or seconds or ignore_milliseconds or
                           _ignorable('us', milliseconds, microseconds, MICROSECONDS_PER_MILLISECOND,
                                      IGNORABLE_MICROSECONDS))

    if ignore_days:
        return '%d weeks' % (days/DAYS_PER_WEEK+round((days%DAYS_PER_WEEK)/DAYS_PER_WEEK, 0))

    pretty = dict()
    if days:
        if ignore_hours:
            # Hours will not be printed, so round them into the day count.
            # When hours ARE printed they must not also bump the days, and
            # the minutes are rounded into the hours exactly once, below.
            days += round(hours/HOURS_PER_DAY, 0)
        pretty[SYM_DAYS] = '%d %s' % (days, misc.plural(days, 'day'))
    if ignore_hours:
        return display_pretty(pretty)

    if hours:
        if ignore_minutes:
            hours += round((minutes+round(seconds/SECONDS_PER_MINUTE, 0))/MINUTES_PER_HOUR, 0)
            pretty[SYM_HOURS] = '%d %s' % (hours, misc.plural(hours, 'hour'))
        else:
            pretty[SYM_HOURS] = '%d hr' % (hours)
    if ignore_minutes:
        return display_pretty(pretty)

    if minutes:
        if ignore_seconds or ignore_milliseconds:
            fracminutes = round((seconds+round(milliseconds/MILLISECONDS_PER_SECOND, 0))/
                                SECONDS_PER_MINUTE, 0)
            if fracminutes:
                # The seconds were rounded up into the minutes: stop here.
                ignore_seconds = True
                minutes += fracminutes
        pretty[SYM_MINUTES] = '%d min' % (minutes)
    if ignore_seconds:
        return display_pretty(pretty)

    if seconds:
        if nofractions or ignore_milliseconds:
            fracseconds = round((milliseconds+round(microseconds/MICROSECONDS_PER_MILLISECOND, 0))/
                                MILLISECONDS_PER_SECOND, 0)
            if fracseconds:
                # The fraction was rounded up into the seconds: stop here.
                ignore_milliseconds = True
                seconds += fracseconds
        pretty[SYM_SECONDS] = '%d sec' % (seconds)
    if nofractions or ignore_milliseconds:
        return display_pretty(pretty)

    if milliseconds:
        if seconds:
            # Replace the whole-second term with a one-decimal figure.
            pretty[SYM_SECONDS] = '%.1f sec' % (seconds + milliseconds/MILLISECONDS_PER_SECOND)
        elif nofractions or ignore_microseconds:
            milliseconds += round(microseconds/MICROSECONDS_PER_MILLISECOND, 0)
            pretty[SYM_MILLIS] = '%d ms' % (milliseconds)
        else:
            pretty[SYM_MILLIS] = '%.1f ms' % (delta.microseconds/MICROSECONDS_PER_MILLISECOND)
    else:
        pretty[SYM_MICROS] = '%d usec' % (microseconds)
    return display_pretty(pretty)
def display_pretty(terms):
    '''
    Join the formatted pieces in *terms* into one comma-separated string.

    *terms* maps unit symbols (the entries of VALID_SYMS) to already
    formatted text.  Pieces are emitted in VALID_SYMS order; symbols that
    are missing from *terms* are skipped.
    '''
    pieces = []
    for sym in VALID_SYMS:
        if sym in terms:
            pieces.append('%s' % (terms[sym]))
    return ', '.join(pieces)
def walltime(delta):
    '''
    Format a timedelta as H:MM:SS, or as M:SS when it is under an hour.

    Days are converted to hours.  Precision is reduced on purpose for long
    spans: seconds are zeroed once there are any hours or more than five
    minutes, and minutes are zeroed once there are more than five hours.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    '''
    delta = convert_delta(delta)
    whole_hours = int(delta.days*HOURS_PER_DAY + delta.seconds/SECONDS_PER_HOUR)
    within_hour = delta.seconds%SECONDS_PER_HOUR
    whole_minutes = int(within_hour/SECONDS_PER_MINUTE)
    if whole_hours or whole_minutes > 5:
        shown_seconds = 0
    else:
        shown_seconds = within_hour%SECONDS_PER_MINUTE
    if whole_hours > 5:
        whole_minutes = 0
    if not whole_hours:
        return '%d:%02d' % (whole_minutes, shown_seconds)
    return '%d:%02d:%02d' % (whole_hours, whole_minutes, shown_seconds)
def rounded_seconds_in_delta(delta):
    '''
    Return the number of whole seconds in *delta*, rounded by its microseconds.

    Unlike timedelta.total_seconds(), which returns a float including the
    fraction, the result is an int.  A fraction of more than half a second
    rounds up; exactly half a second rounds down.

    *delta* can be either a datetime.timedelta or an interval in seconds.
    '''
    delta = convert_delta(delta)
    # Use the normalised integer fields rather than int(total_seconds()):
    # int() truncates toward zero, which rounds negative deltas the wrong way
    # (-0.4 s is stored as days=-1, seconds=86399, microseconds=600000).
    whole_seconds = delta.days * 24 * 3600 + delta.seconds
    return whole_seconds + (1 if delta.microseconds > 500000 else 0)
def _leading_number(text):
    '''Split *text* into its leading run of digits and decimal points, and the rest.'''
    end = 0
    while end < len(text) and (text[end].isdigit() or text[end] == '.'):
        end += 1
    return text[:end], text[end:]


def parse_comparable_delta(comparable):
    '''
    Parse a comparable_delta() string and return a datetime.timedelta.

    Fields are recognised in order by their prefix: SYM_WEEKS, SYM_DAYS,
    SYM_HOURS, then minutes (SYM_MINUTES or a colon) and seconds
    (SYM_SECONDS or a colon).  Each field may be any number of digits, and
    hours, minutes and seconds may carry a decimal fraction, so the
    fractional seconds written by the millisecond and microsecond patterns
    are preserved.  Strings produced with nohours=True (no hours prefix)
    are not supported.

    Raises ValueError if a field has no number or if text is left over.
    '''
    reflection.typecheck(comparable, str)
    fields = (
        ('weeks', (SYM_WEEKS,), int),
        ('days', (SYM_DAYS,), int),
        ('hours', (SYM_HOURS,), float),
        ('minutes', (SYM_MINUTES, const.COLON), float),
        ('seconds', (SYM_SECONDS, const.COLON), float),
    )
    amounts = {}
    rest = comparable
    for unit, prefixes, convert in fields:
        for prefix in prefixes:
            if rest.startswith(prefix):
                number, rest = _leading_number(rest[len(prefix):])
                # int('') / float('') raise ValueError for a missing number.
                amounts[unit] = convert(number)
                break
    if rest:
        raise ValueError('unparsed text %r in comparable delta %r' % (rest, comparable))
    # timedelta accepts float hours/minutes/seconds and rounds to microseconds.
    return datetime.timedelta(**amounts)
# strftime layouts for a minute-resolution timestamp: the printed form uses a
# colon between hours and minutes, the filesystem form a dash.
PRINTED_TIMESTAMP_FORMAT = '%Y-%m-%d_%H:%M'
FILESYSTEM_TIMESTAMP_FORMAT = '%Y-%m-%d_%H-%M'


def timestamp(when=None, seconds=False, microseconds=False, printed=False):
    """
    Return *when* formatted as a timestamp string.

    *when* is a datetime.datetime; the current local time is used when it is
    omitted.
    *printed* = True selects PRINTED_TIMESTAMP_FORMAT instead of the default
    FILESYSTEM_TIMESTAMP_FORMAT.
    *seconds* = True appends '-SS' to the chosen format.
    *microseconds* = True appends '-SS.ffffff' (it implies *seconds*).
    """
    layout = [PRINTED_TIMESTAMP_FORMAT if printed else FILESYSTEM_TIMESTAMP_FORMAT]
    if seconds or microseconds:
        layout.append('-%S')
    if microseconds:
        layout.append('.%f')
    moment = when if when else datetime.datetime.today()
    return moment.strftime(''.join(layout))
def parse_timestamp(atime, printed=False, seconds=False):
    '''
    Parse a timestamp string and return a datetime.datetime.

    *atime* is the text to parse; bytes are decoded first and any other
    object is converted with str().  Surrounding whitespace is tolerated.
    *printed* = True expects PRINTED_TIMESTAMP_FORMAT instead of the default
    FILESYSTEM_TIMESTAMP_FORMAT.
    *seconds* = True expects a trailing seconds field, separated either by a
    colon or by the dash that timestamp() writes.

    Raises LabtoolsWarning if the text matches none of the expected layouts.
    '''
    fmt = PRINTED_TIMESTAMP_FORMAT if printed else FILESYSTEM_TIMESTAMP_FORMAT
    if isinstance(atime, bytes):
        # str(b'...') would give "b'...'", which can never be parsed.
        atime = atime.decode()
    else:
        atime = str(atime)
    if seconds:
        # ':%S' is the historical layout; '-%S' is what timestamp() emits.
        layouts = [fmt + ':%S', fmt + '-%S']
    else:
        layouts = [fmt]
    failure = None
    for candidate in (atime, atime.strip()):
        for layout in layouts:
            try:
                return datetime.datetime.strptime(candidate, layout)
            except ValueError as v:
                failure = v
    raise LabtoolsWarning('Format %s could not parse %s: %r' %
                          (layouts[0], atime, failure.args[0])) from failure
def parse_colon_delta(deltastring):
    '''
    Parse a colon-separated duration such as '10:00' or '1:30:00'.

    The last term is seconds, the one before it minutes, and, when there are
    at least three terms, the first is hours.  Nothing larger than hours is
    expected in the string.  Returns a datetime.timedelta.
    '''
    terms = deltastring.split(const.COLON)
    nterms = len(terms)
    seconds = int(terms[-1])
    minutes = 0
    if nterms > 1:
        minutes = int(terms[-2])
    hours = 0
    if nterms > 2:
        hours = int(terms[0])
    return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
def start_timer():
    """
    Capture the current local time as a datetime.datetime.

    Pass the returned value as the first parameter of *report_timer()*.
    """
    started = datetime.datetime.now()
    return started
def report_timer(start, label, count=None, unit='unit', logger=None, stream=sys.stderr):
    """
    Report how long the work labelled *label* took since *start*.

    *start* is the datetime returned by *start_timer()*.
    If *count* is given and non-zero, the time per *unit* is appended.
    If *logger* is given, the message "<label>: took ..." is logged at INFO.
    If *stream* supports the .write() method, "Running '<label>' took ..."
    is written to it and None is returned; otherwise the message
    "<label>: took ..." is returned.
    """
    delta = datetime.datetime.now() - start
    timing = 'took %s' % (pretty_delta(delta))
    if count:
        timing += ' ({0}/{2} for {1} {3})'.format(pretty_delta(delta/count), count, unit,
                                                  misc.plural(count, unit))
    labelled = '%s: %s' % (label, timing)
    if logger:
        # Always log the labelled form so the entry says what was timed.
        logger.info(labelled)
    if not hasattr(stream, 'write'):
        return labelled
    stream.write("Running '%s' %s%s" % (label, timing, const.NEWLINE))
def time_method(func, count=None, unit='unit', logger=None, stream=sys.stderr):
    """
    Return a wrapper around *func* that reports how long each call takes.

    The wrapper passes its arguments to *func*, reports the elapsed time
    through *report_timer()* (forwarding *count*, *unit*, *logger* and
    *stream*), and returns whatever *func* returned.  The wrapper keeps the
    name and docstring of *func*.
    """
    @functools.wraps(func)
    def _time_method(*args, **kwargs):
        start = start_timer()
        result = func(*args, **kwargs)
        report_timer(start, func.__name__, count=count, unit=unit, logger=logger, stream=stream)
        return result
    return _time_method
def report_timed_loop(func, unit='iteration', logger=None, stream=sys.stderr):
    """
    Return a wrapper that calls *func* repeatedly and reports the total time.

    The wrapper calls *func* with its own arguments until *func* returns a
    false value (so *func* is always called at least once), then hands the
    start time, the label '<name>()', the number of calls and *unit*,
    *logger* and *stream* to *report_timer()* and returns its result.
    The wrapper keeps the name and docstring of *func*.
    """
    @functools.wraps(func)
    def _time_method(*args, **kwargs):
        start = start_timer()
        count = 1
        while func(*args, **kwargs):
            count += 1
        return report_timer(start, '%s()' % func.__name__, count=count, unit=unit,
                            logger=logger, stream=stream)
    return _time_method
# Reference time for elapsed stamps; stays 0 (falsy) until
# printable_elapsed_stamp() sets it on first use or on a reset.
_stamp_start_time = 0
def stampit(message, stream=None, logfile=None, timezero=False):
    '''
    Print *message* prefixed with a sortable stamp of the elapsed time.

    The line is printed to *stream* (print()'s default output when None).
    If *logfile* is given, the same line is also appended to that file.
    If *timezero* is True, the timer restarts (so zero elapsed time is shown).

    Raises LabtoolsWarning if the log file cannot be written.
    '''
    stamp = printable_elapsed_stamp(timezero)
    line = misc.spaced_list([stamp, message])
    print(line, file=stream)
    if not logfile:
        return
    try:
        with open(logfile, 'a') as handle:
            handle.write(line + const.NEWLINE)
    except Exception:
        raise LabtoolsWarning('%s() called from %s() with bad logfile name (%s)' %
                              (reflection.my_methodname(), reflection.my_callername(), logfile))
def printable_elapsed_stamp(timezero=False):
    '''
    Return the time elapsed since the stamp timer started, formatted by
    comparable_delta() so that successive stamps sort lexically.

    The timer starts on the first call, which therefore reports zero elapsed
    time.  *timezero* = True restarts the timer at the current moment.
    This is chiefly a utility function for *stampit()*.
    '''
    global _stamp_start_time
    current = datetime.datetime.now()
    must_restart = timezero or not _stamp_start_time
    if must_restart:
        _stamp_start_time = current
    elapsed = current - _stamp_start_time
    return comparable_delta(elapsed)