diff options
Diffstat (limited to 'lib/dfatool.py')
-rw-r--r-- | lib/dfatool.py | 117 |
1 files changed, 40 insertions, 77 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py index 0da8cc9..ef3bac7 100644 --- a/lib/dfatool.py +++ b/lib/dfatool.py @@ -3,6 +3,7 @@ import csv import io import json +import logging import numpy as np import os import re @@ -16,7 +17,6 @@ from .functions import analytic from .functions import AnalyticFunction from .parameters import ParamStats from .utils import ( - vprint, is_numeric, soft_cast_int, param_slice_eq, @@ -24,6 +24,8 @@ from .utils import ( ) from .utils import by_name_to_by_param, match_parameter_values, running_mean +logger = logging.getLogger(__name__) + try: from .pubcode import Code128 import zbar @@ -250,7 +252,7 @@ class CrossValidator: arguments: model_class -- model class/type used for model synthesis, e.g. PTAModel or AnalyticModel. model_class must have a - constructor accepting (by_name, parameters, arg_count, verbose = False) + constructor accepting (by_name, parameters, arg_count) and provide an `assess` method. by_name -- measurements aggregated by state/transition/function/... name. Layout: by_name[name][attribute] = list of data. Additionally, @@ -422,13 +424,9 @@ class CrossValidator: for idx in validation_subset: validation[name]["param"].append(self.by_name[name]["param"][idx]) - training_data = self.model_class( - training, self.parameters, self.arg_count, verbose=False - ) + training_data = self.model_class(training, self.parameters, self.arg_count) training_model = model_getter(training_data) - validation_data = self.model_class( - validation, self.parameters, self.arg_count, verbose=False - ) + validation_data = self.model_class(validation, self.parameters, self.arg_count) return validation_data.assess(training_model) @@ -573,14 +571,13 @@ class TimingData: self.traces_by_fileno.extend(log_data["traces"]) self._concatenate_analyzed_traces() - def get_preprocessed_data(self, verbose=True): + def get_preprocessed_data(self): """ Return a list of DFA traces annotated with timing and parameter data. Suitable for the PTAModel constructor. See PTAModel(...) docstring for format details. """ - self.verbose = verbose if self.preprocessed: return self.traces if self.version == 0: @@ -1133,7 +1130,7 @@ class RawData: trace["id"] = i return trace_output - def get_preprocessed_data(self, verbose=True): + def get_preprocessed_data(self): """ Return a list of DFA traces annotated with energy, timing, and parameter data. The list is cached on disk, unless the constructor was called with `with_traces` set. @@ -1186,7 +1183,6 @@ class RawData: * `args`: List of arguments the corresponding function call was called with. args entries are strings which are not necessarily numeric * `code`: List of function name (first entry) and arguments (remaining entries) of the corresponding function call """ - self.verbose = verbose if self.preprocessed: return self.traces if self.version == 0: @@ -1375,9 +1371,8 @@ class RawData: for measurement in measurements: if "energy_trace" not in measurement: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logging.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e="; ".join(measurement["datasource_errors"]), @@ -1398,9 +1393,8 @@ class RawData: self._merge_online_and_offline(measurement) num_valid += 1 else: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logging.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e=measurement["error"], @@ -1411,17 +1405,15 @@ class RawData: self._merge_online_and_etlog(measurement) num_valid += 1 else: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logging.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e=measurement["error"], ), ) - vprint( - self.verbose, - "[I] {num_valid:d}/{num_total:d} measurements are valid".format( + logging.info( + "{num_valid:d}/{num_total:d} measurements are valid".format( num_valid=num_valid, num_total=len(measurements) ), ) @@ -1676,7 +1668,7 @@ def _num_args_from_by_name(by_name): return num_args -def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = None): +def get_fit_result(results, name, attribute, param_filter: dict = None): """ Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'. @@ -1685,7 +1677,6 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = :param results: fit results as returned by `paramfit.results` :param name: state/transition/... name, e.g. 'TX' :param attribute: model attribute, e.g. 'duration' - :param verbose: print debug message to stdout when deliberately not using a determined fit function :param param_filter: :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} } """ @@ -1701,31 +1692,29 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = if this_result["best_rmsd"] >= min( this_result["mean_rmsd"], this_result["median_rmsd"] ): - vprint( - verbose, - "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( + logger.debug( + "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( name, attribute, result["key"][2], this_result["best_rmsd"], this_result["mean_rmsd"], this_result["median_rmsd"], - ), + ) ) # See notes on depends_on_param elif this_result["best_rmsd"] >= 0.8 * min( this_result["mean_rmsd"], this_result["median_rmsd"] ): - vprint( - verbose, - "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( + logger.debug( + "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( name, attribute, result["key"][2], this_result["best_rmsd"], this_result["mean_rmsd"], this_result["median_rmsd"], - ), + ) ) else: fit_result[result["key"][2]] = this_result @@ -1780,7 +1769,6 @@ class AnalyticModel: parameters, arg_count=None, function_override=dict(), - verbose=True, use_corrcoef=False, ): """ @@ -1817,7 +1805,6 @@ class AnalyticModel: there for the required format. Note that this happens regardless of parameter dependency detection: The provided analytic function will be assigned even if it seems like the model attribute is static / parameter-independent. - :param verbose: Print debug/info output while generating the model? :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter """ self.cache = dict() @@ -1826,7 +1813,6 @@ class AnalyticModel: self.names = sorted(by_name.keys()) self.parameters = sorted(parameters) self.function_override = function_override.copy() - self.verbose = verbose self._use_corrcoef = use_corrcoef self._num_args = arg_count if self._num_args is None: @@ -1837,7 +1823,6 @@ class AnalyticModel: self.by_param, self.parameters, self._num_args, - verbose=verbose, use_corrcoef=use_corrcoef, ) @@ -1849,12 +1834,9 @@ class AnalyticModel: try: model[name][key] = model_function(elem[key]) except RuntimeWarning: - vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) + logging.warning("Got no data for {} {}".format(name, key)) except FloatingPointError as fpe: - vprint( - self.verbose, - "[W] Got no data for {} {}: {}".format(name, key, fpe), - ) + logging.warning("Got no data for {} {}: {}".format(name, key, fpe),) return model def param_index(self, param_name): @@ -1947,9 +1929,7 @@ class AnalyticModel: if name in self._num_args: num_args = self._num_args[name] for attribute in self.by_name[name]["attributes"]: - fit_result = get_fit_result( - paramfit.results, name, attribute, self.verbose - ) + fit_result = get_fit_result(paramfit.results, name, attribute) if (name, attribute) in self.function_override: function_str = self.function_override[(name, attribute)] @@ -2173,7 +2153,6 @@ class PTAModel: ignore_trace_indexes=[], discard_outliers=None, function_override={}, - verbose=True, use_corrcoef=False, pta=None, ): @@ -2202,7 +2181,6 @@ class PTAModel: there for the required format. Note that this happens regardless of parameter dependency detection: The provided analytic function will be assigned even if it seems like the model attribute is static / parameter-independent. - verbose -- print informative output, e.g. when removing an outlier use_corrcoef -- use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter pta -- hardware model as `PTA` object @@ -2219,13 +2197,11 @@ class PTAModel: self._parameter_names, self._num_args, self._use_corrcoef, - verbose=verbose, ) self.cache = {} np.seterr("raise") self._outlier_threshold = discard_outliers self.function_override = function_override.copy() - self.verbose = verbose self.pta = pta self.ignore_trace_indexes = ignore_trace_indexes self._aggregate_to_ndarray(self.by_name) @@ -2254,12 +2230,9 @@ class PTAModel: try: model[name][key] = model_function(elem[key]) except RuntimeWarning: - vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) + logging.warning("Got no data for {} {}".format(name, key)) except FloatingPointError as fpe: - vprint( - self.verbose, - "[W] Got no data for {} {}: {}".format(name, key, fpe), - ) + logging.warning("Got no data for {} {}: {}".format(name, key, fpe),) return model def get_static(self, use_mean=False): @@ -2383,7 +2356,7 @@ class PTAModel: num_args = self._num_args[state_or_tran] for model_attribute in self.by_name[state_or_tran]["attributes"]: fit_results = get_fit_result( - paramfit.results, state_or_tran, model_attribute, self.verbose + paramfit.results, state_or_tran, model_attribute ) for parameter_name in self._parameter_names: @@ -2700,7 +2673,6 @@ class EnergyTraceLog: self.state_duration = state_duration * 1e-3 self.transition_names = transition_names self.with_traces = with_traces - self.verbose = False self.errors = list() # TODO auto-detect @@ -2758,8 +2730,7 @@ class EnergyTraceLog: self.sample_rate = data_count / (m_duration_us * 1e-6) - vprint( - self.verbose, + logging.debug( "got {} samples with {} seconds of log data ({} Hz)".format( data_count, m_duration_us * 1e-6, self.sample_rate ), @@ -2869,19 +2840,16 @@ class EnergyTraceLog: print('[!!!] did not find transition "{}"'.format(name)) break next_barcode = end + self.state_duration + duration - vprint( - self.verbose, + logging.debug( '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format( offline_index, bc, start, stop, end ), ) if bc != name: - vprint( - self.verbose, + logging.debug( '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc), ) - vprint( - self.verbose, + logging.debug( "{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format( offline_index, end, end + duration ), @@ -2894,8 +2862,7 @@ class EnergyTraceLog: self.ts_to_index(end + duration + self.state_duration) + 1 ) - vprint( - self.verbose, + logging.debug( "{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds".format( offline_index, transition_start_index / self.sample_rate, @@ -2995,8 +2962,7 @@ class EnergyTraceLog: + self.led_power / 3 ) - vprint( - self.verbose, + logging.debug( "looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format( start_ts, sync_threshold_power * 1e3 ), @@ -3030,8 +2996,7 @@ class EnergyTraceLog: barcode_data = self.interval_power[sync_area_start:sync_area_end] - vprint( - self.verbose, + logging.debug( "barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format( sync_start_ts, sync_end_ts, len(barcode_data) ), @@ -3109,7 +3074,7 @@ class EnergyTraceLog: return content, sym_start, sym_end, padding_bits else: - vprint(self.verbose, "unable to find barcode") + logging.warning("unable to find barcode") return None, None, None, None @@ -3129,17 +3094,15 @@ class MIMOSA: Resulting data is a list of state/transition/state/transition/... measurements. """ - def __init__(self, voltage: float, shunt: int, verbose=True, with_traces=False): + def __init__(self, voltage: float, shunt: int, with_traces=False): """ Initialize MIMOSA loader for a specific voltage and shunt setting. :param voltage: MIMOSA DUT supply voltage (V) :para mshunt: MIMOSA Shunt (Ohms) - :param verbose: print notices about invalid data on STDOUT? """ self.voltage = voltage self.shunt = shunt - self.verbose = verbose self.with_traces = with_traces self.r1 = 984 # "1k" self.r2 = 99013 # "100k" @@ -3337,7 +3300,7 @@ class MIMOSA: if cal_r2_mean > cal_0_mean: b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean) else: - vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2)) + logging.warning("0 uA == %.f uA during calibration" % (ua_r2)) b_lower = 0 b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean) @@ -3509,7 +3472,7 @@ class MIMOSA: data["substates"] = substates ssum = np.sum(list(map(lambda x: x["duration"], substates["states"]))) if ssum != data["us"]: - vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum)) + logging.warning("duration %d vs %d" % (data["us"], ssum)) if isa == "transition": # subtract average power of previous state |