Diffstat (limited to 'lib/dfatool.py')
-rw-r--r--  lib/dfatool.py | 117
1 file changed, 40 insertions(+), 77 deletions(-)
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 0da8cc9..ef3bac7 100644
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -3,6 +3,7 @@
import csv
import io
import json
+import logging
import numpy as np
import os
import re
@@ -16,7 +17,6 @@ from .functions import analytic
from .functions import AnalyticFunction
from .parameters import ParamStats
from .utils import (
- vprint,
is_numeric,
soft_cast_int,
param_slice_eq,
@@ -24,6 +24,8 @@ from .utils import (
)
from .utils import by_name_to_by_param, match_parameter_values, running_mean
+logger = logging.getLogger(__name__)
+
try:
from .pubcode import Code128
import zbar
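With the module-level logger in place (named after the module's import path via __name__), callers control verbosity through standard logging configuration instead of passing verbose flags around. A minimal sketch of what a consuming script might do; the format string is illustrative and not part of this change:

    import logging

    # emit dfatool's info/debug messages, which were previously gated by verbose=True
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s")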
@@ -250,7 +252,7 @@ class CrossValidator:
arguments:
model_class -- model class/type used for model synthesis,
e.g. PTAModel or AnalyticModel. model_class must have a
- constructor accepting (by_name, parameters, arg_count, verbose = False)
+ constructor accepting (by_name, parameters, arg_count)
and provide an `assess` method.
by_name -- measurements aggregated by state/transition/function/... name.
Layout: by_name[name][attribute] = list of data. Additionally,
@@ -422,13 +424,9 @@ class CrossValidator:
for idx in validation_subset:
validation[name]["param"].append(self.by_name[name]["param"][idx])
- training_data = self.model_class(
- training, self.parameters, self.arg_count, verbose=False
- )
+ training_data = self.model_class(training, self.parameters, self.arg_count)
training_model = model_getter(training_data)
- validation_data = self.model_class(
- validation, self.parameters, self.arg_count, verbose=False
- )
+ validation_data = self.model_class(validation, self.parameters, self.arg_count)
return validation_data.assess(training_model)
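Together with the updated docstring, the calls above fix the interface a model_class must provide after this change: a (by_name, parameters, arg_count) constructor and an assess() method, with no verbose keyword. A hypothetical minimal stand-in (e.g. for exercising the cross-validator) could look like this:

    class DummyModel:
        # Hypothetical stand-in satisfying the model_class contract used above.
        def __init__(self, by_name, parameters, arg_count):
            self.by_name = by_name
            self.parameters = parameters
            self.arg_count = arg_count

        def assess(self, model_function):
            # the real model classes return per-name / per-attribute error metrics here
            return {name: dict() for name in self.by_name}

The model_getter passed alongside would then extract whatever callable (static or fitted model function) the model instance exposes.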
@@ -573,14 +571,13 @@ class TimingData:
self.traces_by_fileno.extend(log_data["traces"])
self._concatenate_analyzed_traces()
- def get_preprocessed_data(self, verbose=True):
+ def get_preprocessed_data(self):
"""
Return a list of DFA traces annotated with timing and parameter data.
Suitable for the PTAModel constructor.
See PTAModel(...) docstring for format details.
"""
- self.verbose = verbose
if self.preprocessed:
return self.traces
if self.version == 0:
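For callers, the only visible change is the dropped verbose parameter; verbosity is now a logging concern. A before/after sketch, assuming an already constructed loader instance named raw_data:

    # before this commit:
    #     traces = raw_data.get_preprocessed_data(verbose=False)
    # after:
    traces = raw_data.get_preprocessed_data()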
@@ -1133,7 +1130,7 @@ class RawData:
trace["id"] = i
return trace_output
- def get_preprocessed_data(self, verbose=True):
+ def get_preprocessed_data(self):
"""
Return a list of DFA traces annotated with energy, timing, and parameter data.
The list is cached on disk, unless the constructor was called with `with_traces` set.
@@ -1186,7 +1183,6 @@ class RawData:
* `args`: List of arguments the corresponding function call was called with. args entries are strings which are not necessarily numeric
* `code`: List of function name (first entry) and arguments (remaining entries) of the corresponding function call
"""
- self.verbose = verbose
if self.preprocessed:
return self.traces
if self.version == 0:
@@ -1375,9 +1371,8 @@ class RawData:
for measurement in measurements:
if "energy_trace" not in measurement:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e="; ".join(measurement["datasource_errors"]),
@@ -1398,9 +1393,8 @@ class RawData:
self._merge_online_and_offline(measurement)
num_valid += 1
else:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e=measurement["error"],
@@ -1411,17 +1405,15 @@ class RawData:
self._merge_online_and_etlog(measurement)
num_valid += 1
else:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e=measurement["error"],
),
)
- vprint(
- self.verbose,
- "[I] {num_valid:d}/{num_total:d} measurements are valid".format(
+ logger.info(
+ "{num_valid:d}/{num_total:d} measurements are valid".format(
num_valid=num_valid, num_total=len(measurements)
),
)
@@ -1676,7 +1668,7 @@ def _num_args_from_by_name(by_name):
return num_args
-def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = None):
+def get_fit_result(results, name, attribute, param_filter: dict = None):
"""
Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
@@ -1685,7 +1677,6 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict =
:param results: fit results as returned by `paramfit.results`
:param name: state/transition/... name, e.g. 'TX'
:param attribute: model attribute, e.g. 'duration'
- :param verbose: print debug message to stdout when deliberately not using a determined fit function
:param param_filter:
:returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
"""
@@ -1701,31 +1692,29 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict =
if this_result["best_rmsd"] >= min(
this_result["mean_rmsd"], this_result["median_rmsd"]
):
- vprint(
- verbose,
- "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
name,
attribute,
result["key"][2],
this_result["best_rmsd"],
this_result["mean_rmsd"],
this_result["median_rmsd"],
- ),
+ )
)
# See notes on depends_on_param
elif this_result["best_rmsd"] >= 0.8 * min(
this_result["mean_rmsd"], this_result["median_rmsd"]
):
- vprint(
- verbose,
- "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
name,
attribute,
result["key"][2],
this_result["best_rmsd"],
this_result["mean_rmsd"],
this_result["median_rmsd"],
- ),
+ )
)
else:
fit_result[result["key"][2]] = this_result
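A worked example of the acceptance rule above: with best_rmsd = 70, mean_rmsd = 80 and median_rmsd = 90, the reference is min(80, 90) = 80. The first check (70 >= 80) fails, but the second (70 >= 0.8 * 80 = 64) holds, so the candidate is logged as "not much better than ref" and discarded; only a best_rmsd below 64 would let the parameter-dependent function into the model.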
@@ -1780,7 +1769,6 @@ class AnalyticModel:
parameters,
arg_count=None,
function_override=dict(),
- verbose=True,
use_corrcoef=False,
):
"""
@@ -1817,7 +1805,6 @@ class AnalyticModel:
there for the required format. Note that this happens regardless of
parameter dependency detection: The provided analytic function will be assigned
even if it seems like the model attribute is static / parameter-independent.
- :param verbose: Print debug/info output while generating the model?
:param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
"""
self.cache = dict()
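A sketch of constructing an AnalyticModel after this change, with by_name, parameters and arg_count as documented for the constructor. The override key format (name, attribute) follows the lookup in the fitting code below; the value is an analytic function string whose format is described in the (elided) docstring, and "TX"/"duration" are illustrative:

    model = AnalyticModel(
        by_name,
        parameters,
        arg_count=arg_count,
        function_override={("TX", "duration"): "..."},  # analytic function string, format not shown here
    )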
@@ -1826,7 +1813,6 @@ class AnalyticModel:
self.names = sorted(by_name.keys())
self.parameters = sorted(parameters)
self.function_override = function_override.copy()
- self.verbose = verbose
self._use_corrcoef = use_corrcoef
self._num_args = arg_count
if self._num_args is None:
@@ -1837,7 +1823,6 @@ class AnalyticModel:
self.by_param,
self.parameters,
self._num_args,
- verbose=verbose,
use_corrcoef=use_corrcoef,
)
@@ -1849,12 +1834,9 @@ class AnalyticModel:
try:
model[name][key] = model_function(elem[key])
except RuntimeWarning:
- vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
+ logger.warning("Got no data for {} {}".format(name, key))
except FloatingPointError as fpe:
- vprint(
- self.verbose,
- "[W] Got no data for {} {}: {}".format(name, key, fpe),
- )
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
return model
def param_index(self, param_name):
@@ -1947,9 +1929,7 @@ class AnalyticModel:
if name in self._num_args:
num_args = self._num_args[name]
for attribute in self.by_name[name]["attributes"]:
- fit_result = get_fit_result(
- paramfit.results, name, attribute, self.verbose
- )
+ fit_result = get_fit_result(paramfit.results, name, attribute)
if (name, attribute) in self.function_override:
function_str = self.function_override[(name, attribute)]
@@ -2173,7 +2153,6 @@ class PTAModel:
ignore_trace_indexes=[],
discard_outliers=None,
function_override={},
- verbose=True,
use_corrcoef=False,
pta=None,
):
@@ -2202,7 +2181,6 @@ class PTAModel:
there for the required format. Note that this happens regardless of
parameter dependency detection: The provided analytic function will be assigned
even if it seems like the model attribute is static / parameter-independent.
- verbose -- print informative output, e.g. when removing an outlier
use_corrcoef -- use correlation coefficient instead of stddev comparison
to detect whether a model attribute depends on a parameter
pta -- hardware model as `PTA` object
@@ -2219,13 +2197,11 @@ class PTAModel:
self._parameter_names,
self._num_args,
self._use_corrcoef,
- verbose=verbose,
)
self.cache = {}
np.seterr("raise")
self._outlier_threshold = discard_outliers
self.function_override = function_override.copy()
- self.verbose = verbose
self.pta = pta
self.ignore_trace_indexes = ignore_trace_indexes
self._aggregate_to_ndarray(self.by_name)
@@ -2254,12 +2230,9 @@ class PTAModel:
try:
model[name][key] = model_function(elem[key])
except RuntimeWarning:
- vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
+ logger.warning("Got no data for {} {}".format(name, key))
except FloatingPointError as fpe:
- vprint(
- self.verbose,
- "[W] Got no data for {} {}: {}".format(name, key, fpe),
- )
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
return model
def get_static(self, use_mean=False):
@@ -2383,7 +2356,7 @@ class PTAModel:
num_args = self._num_args[state_or_tran]
for model_attribute in self.by_name[state_or_tran]["attributes"]:
fit_results = get_fit_result(
- paramfit.results, state_or_tran, model_attribute, self.verbose
+ paramfit.results, state_or_tran, model_attribute
)
for parameter_name in self._parameter_names:
@@ -2700,7 +2673,6 @@ class EnergyTraceLog:
self.state_duration = state_duration * 1e-3
self.transition_names = transition_names
self.with_traces = with_traces
- self.verbose = False
self.errors = list()
# TODO auto-detect
@@ -2758,8 +2730,7 @@ class EnergyTraceLog:
self.sample_rate = data_count / (m_duration_us * 1e-6)
- vprint(
- self.verbose,
+ logger.debug(
"got {} samples with {} seconds of log data ({} Hz)".format(
data_count, m_duration_us * 1e-6, self.sample_rate
),
@@ -2869,19 +2840,16 @@ class EnergyTraceLog:
print('[!!!] did not find transition "{}"'.format(name))
break
next_barcode = end + self.state_duration + duration
- vprint(
- self.verbose,
+ logger.debug(
'{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format(
offline_index, bc, start, stop, end
),
)
if bc != name:
- vprint(
- self.verbose,
+ logger.debug(
'[!!!] mismatch: expected "{}", got "{}"'.format(name, bc),
)
- vprint(
- self.verbose,
+ logger.debug(
"{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format(
offline_index, end, end + duration
),
@@ -2894,8 +2862,7 @@ class EnergyTraceLog:
self.ts_to_index(end + duration + self.state_duration) + 1
)
- vprint(
- self.verbose,
+ logger.debug(
"{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds".format(
offline_index,
transition_start_index / self.sample_rate,
@@ -2995,8 +2962,7 @@ class EnergyTraceLog:
+ self.led_power / 3
)
- vprint(
- self.verbose,
+ logger.debug(
"looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format(
start_ts, sync_threshold_power * 1e3
),
@@ -3030,8 +2996,7 @@ class EnergyTraceLog:
barcode_data = self.interval_power[sync_area_start:sync_area_end]
- vprint(
- self.verbose,
+ logger.debug(
"barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format(
sync_start_ts, sync_end_ts, len(barcode_data)
),
@@ -3109,7 +3074,7 @@ class EnergyTraceLog:
return content, sym_start, sym_end, padding_bits
else:
- vprint(self.verbose, "unable to find barcode")
+ logger.warning("unable to find barcode")
return None, None, None, None
@@ -3129,17 +3094,15 @@ class MIMOSA:
Resulting data is a list of state/transition/state/transition/... measurements.
"""
- def __init__(self, voltage: float, shunt: int, verbose=True, with_traces=False):
+ def __init__(self, voltage: float, shunt: int, with_traces=False):
"""
Initialize MIMOSA loader for a specific voltage and shunt setting.
:param voltage: MIMOSA DUT supply voltage (V)
:param shunt: MIMOSA shunt resistance (Ohms)
- :param verbose: print notices about invalid data on STDOUT?
"""
self.voltage = voltage
self.shunt = shunt
- self.verbose = verbose
self.with_traces = with_traces
self.r1 = 984 # "1k"
self.r2 = 99013 # "100k"
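With verbose gone, constructing a MIMOSA loader reduces to the measurement parameters. A sketch with illustrative values (not defaults):

    mim = MIMOSA(voltage=3.3, shunt=330, with_traces=False)  # 3.3 V supply, 330 Ohm shunt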
@@ -3337,7 +3300,7 @@ class MIMOSA:
if cal_r2_mean > cal_0_mean:
b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean)
else:
- vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2))
+ logger.warning("0 uA == %.f uA during calibration" % (ua_r2))
b_lower = 0
b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean)
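For context on the surrounding calibration code: b_lower and b_upper are the slopes of a piecewise-linear calibration, current difference divided by raw-value difference between adjacent calibration points (0 uA to R2, and R2 to R1). The warning above fires when cal_r2_mean does not exceed cal_0_mean, which would make the lower slope zero or negative; b_lower is then clamped to 0.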
@@ -3509,7 +3472,7 @@ class MIMOSA:
data["substates"] = substates
ssum = np.sum(list(map(lambda x: x["duration"], substates["states"])))
if ssum != data["us"]:
- vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum))
+ logger.warning("substate duration mismatch: %d us != %d us" % (data["us"], ssum))
if isa == "transition":
# subtract average power of previous state