summaryrefslogtreecommitdiff
path: root/lib/dfatool.py
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2020-05-28 12:04:37 +0200
committerDaniel Friesel <daniel.friesel@uos.de>2020-05-28 12:04:37 +0200
commitc69331e4d925658b2bf26dcb387981f6530d7b9e (patch)
treed19c7f9b0bf51f68c104057e013630e009835268 /lib/dfatool.py
parent23927051ac3e64cabbaa6c30e8356dfe90ebfa6c (diff)
use black(1) for uniform code formatting
Diffstat (limited to 'lib/dfatool.py')
-rw-r--r--lib/dfatool.py1797
1 files changed, 1169 insertions, 628 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 8fb41a5..56f0f2d 100644
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -15,12 +15,19 @@ from multiprocessing import Pool
from .functions import analytic
from .functions import AnalyticFunction
from .parameters import ParamStats
-from .utils import vprint, is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple
+from .utils import (
+ vprint,
+ is_numeric,
+ soft_cast_int,
+ param_slice_eq,
+ remove_index_from_tuple,
+)
from .utils import by_name_to_by_param, match_parameter_values, running_mean
try:
from .pubcode import Code128
import zbar
+
zbar_available = True
except ImportError:
zbar_available = False
@@ -47,25 +54,25 @@ def gplearn_to_function(function_str: str):
inv -- 1 / x if |x| > 0.001, otherwise 0
"""
eval_globals = {
- 'add': lambda x, y: x + y,
- 'sub': lambda x, y: x - y,
- 'mul': lambda x, y: x * y,
- 'div': lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.,
- 'sqrt': lambda x: np.sqrt(np.abs(x)),
- 'log': lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.,
- 'inv': lambda x: 1. / x if np.abs(x) > 0.001 else 0.,
+ "add": lambda x, y: x + y,
+ "sub": lambda x, y: x - y,
+ "mul": lambda x, y: x * y,
+ "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0,
+ "sqrt": lambda x: np.sqrt(np.abs(x)),
+ "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0,
+ "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0,
}
last_arg_index = 0
for i in range(0, 100):
- if function_str.find('X{:d}'.format(i)) >= 0:
+ if function_str.find("X{:d}".format(i)) >= 0:
last_arg_index = i
arg_list = []
for i in range(0, last_arg_index + 1):
- arg_list.append('X{:d}'.format(i))
+ arg_list.append("X{:d}".format(i))
- eval_str = 'lambda {}, *whatever: {}'.format(','.join(arg_list), function_str)
+ eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str)
print(eval_str)
return eval(eval_str, eval_globals)
@@ -123,32 +130,35 @@ def regression_measures(predicted: np.ndarray, actual: np.ndarray):
count -- Number of values
"""
if type(predicted) != np.ndarray:
- raise ValueError('first arg must be ndarray, is {}'.format(type(predicted)))
+ raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
if type(actual) != np.ndarray:
- raise ValueError('second arg must be ndarray, is {}'.format(type(actual)))
+ raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
deviations = predicted - actual
# mean = np.mean(actual)
if len(deviations) == 0:
return {}
measures = {
- 'mae': np.mean(np.abs(deviations), dtype=np.float64),
- 'msd': np.mean(deviations**2, dtype=np.float64),
- 'rmsd': np.sqrt(np.mean(deviations**2), dtype=np.float64),
- 'ssr': np.sum(deviations**2, dtype=np.float64),
- 'rsq': r2_score(actual, predicted),
- 'count': len(actual),
+ "mae": np.mean(np.abs(deviations), dtype=np.float64),
+ "msd": np.mean(deviations ** 2, dtype=np.float64),
+ "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
+ "ssr": np.sum(deviations ** 2, dtype=np.float64),
+ "rsq": r2_score(actual, predicted),
+ "count": len(actual),
}
# rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
if np.all(actual != 0):
- measures['mape'] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
+ measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
else:
- measures['mape'] = np.nan
+ measures["mape"] = np.nan
if np.all(np.abs(predicted) + np.abs(actual) != 0):
- measures['smape'] = np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) * 100
+ measures["smape"] = (
+ np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
+ * 100
+ )
else:
- measures['smape'] = np.nan
+ measures["smape"] = np.nan
# if np.all(rsq_quotient != 0):
# measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
@@ -177,7 +187,7 @@ class KeysightCSV:
with open(filename) as f:
for _ in range(4):
next(f)
- reader = csv.reader(f, delimiter=',')
+ reader = csv.reader(f, delimiter=",")
for i, row in enumerate(reader):
timestamps[i] = float(row[0])
currents[i] = float(row[2]) * -1
@@ -266,29 +276,35 @@ class CrossValidator:
}
}
"""
- ret = {
- 'by_name': dict()
- }
+ ret = {"by_name": dict()}
for name in self.names:
- ret['by_name'][name] = dict()
- for attribute in self.by_name[name]['attributes']:
- ret['by_name'][name][attribute] = {
- 'mae_list': list(),
- 'smape_list': list()
+ ret["by_name"][name] = dict()
+ for attribute in self.by_name[name]["attributes"]:
+ ret["by_name"][name][attribute] = {
+ "mae_list": list(),
+ "smape_list": list(),
}
for _ in range(count):
res = self._single_montecarlo(model_getter)
for name in self.names:
- for attribute in self.by_name[name]['attributes']:
- ret['by_name'][name][attribute]['mae_list'].append(res['by_name'][name][attribute]['mae'])
- ret['by_name'][name][attribute]['smape_list'].append(res['by_name'][name][attribute]['smape'])
+ for attribute in self.by_name[name]["attributes"]:
+ ret["by_name"][name][attribute]["mae_list"].append(
+ res["by_name"][name][attribute]["mae"]
+ )
+ ret["by_name"][name][attribute]["smape_list"].append(
+ res["by_name"][name][attribute]["smape"]
+ )
for name in self.names:
- for attribute in self.by_name[name]['attributes']:
- ret['by_name'][name][attribute]['mae'] = np.mean(ret['by_name'][name][attribute]['mae_list'])
- ret['by_name'][name][attribute]['smape'] = np.mean(ret['by_name'][name][attribute]['smape_list'])
+ for attribute in self.by_name[name]["attributes"]:
+ ret["by_name"][name][attribute]["mae"] = np.mean(
+ ret["by_name"][name][attribute]["mae_list"]
+ )
+ ret["by_name"][name][attribute]["smape"] = np.mean(
+ ret["by_name"][name][attribute]["smape_list"]
+ )
return ret
@@ -296,77 +312,87 @@ class CrossValidator:
training = dict()
validation = dict()
for name in self.names:
- training[name] = {
- 'attributes': self.by_name[name]['attributes']
- }
- validation[name] = {
- 'attributes': self.by_name[name]['attributes']
- }
+ training[name] = {"attributes": self.by_name[name]["attributes"]}
+ validation[name] = {"attributes": self.by_name[name]["attributes"]}
- if 'isa' in self.by_name[name]:
- training[name]['isa'] = self.by_name[name]['isa']
- validation[name]['isa'] = self.by_name[name]['isa']
+ if "isa" in self.by_name[name]:
+ training[name]["isa"] = self.by_name[name]["isa"]
+ validation[name]["isa"] = self.by_name[name]["isa"]
- data_count = len(self.by_name[name]['param'])
+ data_count = len(self.by_name[name]["param"])
training_subset, validation_subset = _xv_partition_montecarlo(data_count)
- for attribute in self.by_name[name]['attributes']:
+ for attribute in self.by_name[name]["attributes"]:
self.by_name[name][attribute] = np.array(self.by_name[name][attribute])
- training[name][attribute] = self.by_name[name][attribute][training_subset]
- validation[name][attribute] = self.by_name[name][attribute][validation_subset]
+ training[name][attribute] = self.by_name[name][attribute][
+ training_subset
+ ]
+ validation[name][attribute] = self.by_name[name][attribute][
+ validation_subset
+ ]
# We can't use slice syntax for 'param', which may contain strings and other odd values
- training[name]['param'] = list()
- validation[name]['param'] = list()
+ training[name]["param"] = list()
+ validation[name]["param"] = list()
for idx in training_subset:
- training[name]['param'].append(self.by_name[name]['param'][idx])
+ training[name]["param"].append(self.by_name[name]["param"][idx])
for idx in validation_subset:
- validation[name]['param'].append(self.by_name[name]['param'][idx])
+ validation[name]["param"].append(self.by_name[name]["param"][idx])
- training_data = self.model_class(training, self.parameters, self.arg_count, verbose=False)
+ training_data = self.model_class(
+ training, self.parameters, self.arg_count, verbose=False
+ )
training_model = model_getter(training_data)
- validation_data = self.model_class(validation, self.parameters, self.arg_count, verbose=False)
+ validation_data = self.model_class(
+ validation, self.parameters, self.arg_count, verbose=False
+ )
return validation_data.assess(training_model)
def _preprocess_mimosa(measurement):
- setup = measurement['setup']
- mim = MIMOSA(float(setup['mimosa_voltage']), int(setup['mimosa_shunt']), with_traces=measurement['with_traces'])
+ setup = measurement["setup"]
+ mim = MIMOSA(
+ float(setup["mimosa_voltage"]),
+ int(setup["mimosa_shunt"]),
+ with_traces=measurement["with_traces"],
+ )
try:
- charges, triggers = mim.load_data(measurement['content'])
+ charges, triggers = mim.load_data(measurement["content"])
trigidx = mim.trigger_edges(triggers)
except EOFError as e:
- mim.errors.append('MIMOSA logfile error: {}'.format(e))
+ mim.errors.append("MIMOSA logfile error: {}".format(e))
trigidx = list()
if len(trigidx) == 0:
- mim.errors.append('MIMOSA log has no triggers')
+ mim.errors.append("MIMOSA log has no triggers")
return {
- 'fileno': measurement['fileno'],
- 'info': measurement['info'],
- 'has_datasource_error': len(mim.errors) > 0,
- 'datasource_errors': mim.errors,
- 'expected_trace': measurement['expected_trace'],
- 'repeat_id': measurement['repeat_id'],
+ "fileno": measurement["fileno"],
+ "info": measurement["info"],
+ "has_datasource_error": len(mim.errors) > 0,
+ "datasource_errors": mim.errors,
+ "expected_trace": measurement["expected_trace"],
+ "repeat_id": measurement["repeat_id"],
}
- cal_edges = mim.calibration_edges(running_mean(mim.currents_nocal(charges[0:trigidx[0]]), 10))
+ cal_edges = mim.calibration_edges(
+ running_mean(mim.currents_nocal(charges[0 : trigidx[0]]), 10)
+ )
calfunc, caldata = mim.calibration_function(charges, cal_edges)
vcalfunc = np.vectorize(calfunc, otypes=[np.float64])
processed_data = {
- 'fileno': measurement['fileno'],
- 'info': measurement['info'],
- 'triggers': len(trigidx),
- 'first_trig': trigidx[0] * 10,
- 'calibration': caldata,
- 'energy_trace': mim.analyze_states(charges, trigidx, vcalfunc),
- 'has_datasource_error': len(mim.errors) > 0,
- 'datasource_errors': mim.errors,
+ "fileno": measurement["fileno"],
+ "info": measurement["info"],
+ "triggers": len(trigidx),
+ "first_trig": trigidx[0] * 10,
+ "calibration": caldata,
+ "energy_trace": mim.analyze_states(charges, trigidx, vcalfunc),
+ "has_datasource_error": len(mim.errors) > 0,
+ "datasource_errors": mim.errors,
}
- for key in ['expected_trace', 'repeat_id']:
+ for key in ["expected_trace", "repeat_id"]:
if key in measurement:
processed_data[key] = measurement[key]
@@ -374,22 +400,28 @@ def _preprocess_mimosa(measurement):
def _preprocess_etlog(measurement):
- setup = measurement['setup']
- etlog = EnergyTraceLog(float(setup['voltage']), int(setup['state_duration']), measurement['transition_names'])
+ setup = measurement["setup"]
+ etlog = EnergyTraceLog(
+ float(setup["voltage"]),
+ int(setup["state_duration"]),
+ measurement["transition_names"],
+ )
try:
- etlog.load_data(measurement['content'])
- states_and_transitions = etlog.analyze_states(measurement['expected_trace'], measurement['repeat_id'])
+ etlog.load_data(measurement["content"])
+ states_and_transitions = etlog.analyze_states(
+ measurement["expected_trace"], measurement["repeat_id"]
+ )
except EOFError as e:
- etlog.errors.append('EnergyTrace logfile error: {}'.format(e))
+ etlog.errors.append("EnergyTrace logfile error: {}".format(e))
processed_data = {
- 'fileno': measurement['fileno'],
- 'repeat_id': measurement['repeat_id'],
- 'info': measurement['info'],
- 'expected_trace': measurement['expected_trace'],
- 'energy_trace': states_and_transitions,
- 'has_datasource_error': len(etlog.errors) > 0,
- 'datasource_errors': etlog.errors,
+ "fileno": measurement["fileno"],
+ "repeat_id": measurement["repeat_id"],
+ "info": measurement["info"],
+ "expected_trace": measurement["expected_trace"],
+ "energy_trace": states_and_transitions,
+ "has_datasource_error": len(etlog.errors) > 0,
+ "datasource_errors": etlog.errors,
}
return processed_data
@@ -421,35 +453,40 @@ class TimingData:
for trace_group in self.traces_by_fileno:
for trace in trace_group:
# TimingHarness logs states, but does not aggregate any data for them at the moment -> throw all states away
- transitions = list(filter(lambda x: x['isa'] == 'transition', trace['trace']))
- self.traces.append({
- 'id': trace['id'],
- 'trace': transitions,
- })
+ transitions = list(
+ filter(lambda x: x["isa"] == "transition", trace["trace"])
+ )
+ self.traces.append(
+ {"id": trace["id"], "trace": transitions,}
+ )
for i, trace in enumerate(self.traces):
- trace['orig_id'] = trace['id']
- trace['id'] = i
- for log_entry in trace['trace']:
- paramkeys = sorted(log_entry['parameter'].keys())
- if 'param' not in log_entry['offline_aggregates']:
- log_entry['offline_aggregates']['param'] = list()
- if 'duration' in log_entry['offline_aggregates']:
- for i in range(len(log_entry['offline_aggregates']['duration'])):
+ trace["orig_id"] = trace["id"]
+ trace["id"] = i
+ for log_entry in trace["trace"]:
+ paramkeys = sorted(log_entry["parameter"].keys())
+ if "param" not in log_entry["offline_aggregates"]:
+ log_entry["offline_aggregates"]["param"] = list()
+ if "duration" in log_entry["offline_aggregates"]:
+ for i in range(len(log_entry["offline_aggregates"]["duration"])):
paramvalues = list()
for paramkey in paramkeys:
- if type(log_entry['parameter'][paramkey]) is list:
- paramvalues.append(soft_cast_int(log_entry['parameter'][paramkey][i]))
+ if type(log_entry["parameter"][paramkey]) is list:
+ paramvalues.append(
+ soft_cast_int(log_entry["parameter"][paramkey][i])
+ )
else:
- paramvalues.append(soft_cast_int(log_entry['parameter'][paramkey]))
- if arg_support_enabled and 'args' in log_entry:
- paramvalues.extend(map(soft_cast_int, log_entry['args']))
- log_entry['offline_aggregates']['param'].append(paramvalues)
+ paramvalues.append(
+ soft_cast_int(log_entry["parameter"][paramkey])
+ )
+ if arg_support_enabled and "args" in log_entry:
+ paramvalues.extend(map(soft_cast_int, log_entry["args"]))
+ log_entry["offline_aggregates"]["param"].append(paramvalues)
def _preprocess_0(self):
for filename in self.filenames:
- with open(filename, 'r') as f:
+ with open(filename, "r") as f:
log_data = json.load(f)
- self.traces_by_fileno.extend(log_data['traces'])
+ self.traces_by_fileno.extend(log_data["traces"])
self._concatenate_analyzed_traces()
def get_preprocessed_data(self, verbose=True):
@@ -470,17 +507,25 @@ class TimingData:
def sanity_check_aggregate(aggregate):
for key in aggregate:
- if 'param' not in aggregate[key]:
- raise RuntimeError('aggregate[{}][param] does not exist'.format(key))
- if 'attributes' not in aggregate[key]:
- raise RuntimeError('aggregate[{}][attributes] does not exist'.format(key))
- for attribute in aggregate[key]['attributes']:
+ if "param" not in aggregate[key]:
+ raise RuntimeError("aggregate[{}][param] does not exist".format(key))
+ if "attributes" not in aggregate[key]:
+ raise RuntimeError("aggregate[{}][attributes] does not exist".format(key))
+ for attribute in aggregate[key]["attributes"]:
if attribute not in aggregate[key]:
- raise RuntimeError('aggregate[{}][{}] does not exist, even though it is contained in aggregate[{}][attributes]'.format(key, attribute, key))
- param_len = len(aggregate[key]['param'])
+ raise RuntimeError(
+ "aggregate[{}][{}] does not exist, even though it is contained in aggregate[{}][attributes]".format(
+ key, attribute, key
+ )
+ )
+ param_len = len(aggregate[key]["param"])
attr_len = len(aggregate[key][attribute])
if param_len != attr_len:
- raise RuntimeError('parameter mismatch: len(aggregate[{}][param]) == {} != len(aggregate[{}][{}]) == {}'.format(key, param_len, key, attribute, attr_len))
+ raise RuntimeError(
+ "parameter mismatch: len(aggregate[{}][param]) == {} != len(aggregate[{}][{}]) == {}".format(
+ key, param_len, key, attribute, attr_len
+ )
+ )
class RawData:
@@ -559,11 +604,11 @@ class RawData:
with tarfile.open(filenames[0]) as tf:
for member in tf.getmembers():
- if member.name == 'ptalog.json' and self.version == 0:
+ if member.name == "ptalog.json" and self.version == 0:
self.version = 1
# might also be version 2
# depends on whether *.etlog exists or not
- elif '.etlog' in member.name:
+ elif ".etlog" in member.name:
self.version = 2
break
@@ -572,18 +617,18 @@ class RawData:
self.load_cache()
def set_cache_file(self):
- cache_key = hashlib.sha256('!'.join(self.filenames).encode()).hexdigest()
- self.cache_dir = os.path.dirname(self.filenames[0]) + '/cache'
- self.cache_file = '{}/{}.json'.format(self.cache_dir, cache_key)
+ cache_key = hashlib.sha256("!".join(self.filenames).encode()).hexdigest()
+ self.cache_dir = os.path.dirname(self.filenames[0]) + "/cache"
+ self.cache_file = "{}/{}.json".format(self.cache_dir, cache_key)
def load_cache(self):
if os.path.exists(self.cache_file):
- with open(self.cache_file, 'r') as f:
+ with open(self.cache_file, "r") as f:
cache_data = json.load(f)
- self.traces = cache_data['traces']
- self.preprocessing_stats = cache_data['preprocessing_stats']
- if 'pta' in cache_data:
- self.pta = cache_data['pta']
+ self.traces = cache_data["traces"]
+ self.preprocessing_stats = cache_data["preprocessing_stats"]
+ if "pta" in cache_data:
+ self.pta = cache_data["pta"]
self.preprocessed = True
def save_cache(self):
@@ -593,30 +638,30 @@ class RawData:
os.mkdir(self.cache_dir)
except FileExistsError:
pass
- with open(self.cache_file, 'w') as f:
+ with open(self.cache_file, "w") as f:
cache_data = {
- 'traces': self.traces,
- 'preprocessing_stats': self.preprocessing_stats,
- 'pta': self.pta,
+ "traces": self.traces,
+ "preprocessing_stats": self.preprocessing_stats,
+ "pta": self.pta,
}
json.dump(cache_data, f)
def _state_is_too_short(self, online, offline, state_duration, next_transition):
# We cannot control when an interrupt causes a state to be left
- if next_transition['plan']['level'] == 'epilogue':
+ if next_transition["plan"]["level"] == "epilogue":
return False
# Note: state_duration is stored as ms, not us
- return offline['us'] < state_duration * 500
+ return offline["us"] < state_duration * 500
def _state_is_too_long(self, online, offline, state_duration, prev_transition):
# If the previous state was left by an interrupt, we may have some
# waiting time left over. So it's okay if the current state is longer
# than expected.
- if prev_transition['plan']['level'] == 'epilogue':
+ if prev_transition["plan"]["level"] == "epilogue":
return False
# state_duration is stored as ms, not us
- return offline['us'] > state_duration * 1500
+ return offline["us"] > state_duration * 1500
def _measurement_is_valid_2(self, processed_data):
"""
@@ -642,8 +687,8 @@ class RawData:
"""
# Check for low-level parser errors
- if processed_data['has_datasource_error']:
- processed_data['error'] = '; '.join(processed_data['datasource_errors'])
+ if processed_data["has_datasource_error"]:
+ processed_data["error"] = "; ".join(processed_data["datasource_errors"])
return False
# Note that the low-level parser (EnergyTraceLog) already checks
@@ -680,26 +725,27 @@ class RawData:
- uW_mean_delta_prev: Differenz zwischen uW_mean und uW_mean des vorherigen Zustands
- uW_mean_delta_next: Differenz zwischen uW_mean und uW_mean des Folgezustands
"""
- setup = self.setup_by_fileno[processed_data['fileno']]
- if 'expected_trace' in processed_data:
- traces = processed_data['expected_trace']
+ setup = self.setup_by_fileno[processed_data["fileno"]]
+ if "expected_trace" in processed_data:
+ traces = processed_data["expected_trace"]
else:
- traces = self.traces_by_fileno[processed_data['fileno']]
- state_duration = setup['state_duration']
+ traces = self.traces_by_fileno[processed_data["fileno"]]
+ state_duration = setup["state_duration"]
# Check MIMOSA error
- if processed_data['has_datasource_error']:
- processed_data['error'] = '; '.join(processed_data['datasource_errors'])
+ if processed_data["has_datasource_error"]:
+ processed_data["error"] = "; ".join(processed_data["datasource_errors"])
return False
# Check trigger count
sched_trigger_count = 0
for run in traces:
- sched_trigger_count += len(run['trace'])
- if sched_trigger_count != processed_data['triggers']:
- processed_data['error'] = 'got {got:d} trigger edges, expected {exp:d}'.format(
- got=processed_data['triggers'],
- exp=sched_trigger_count
+ sched_trigger_count += len(run["trace"])
+ if sched_trigger_count != processed_data["triggers"]:
+ processed_data[
+ "error"
+ ] = "got {got:d} trigger edges, expected {exp:d}".format(
+ got=processed_data["triggers"], exp=sched_trigger_count
)
return False
# Check state durations. Very short or long states can indicate a
@@ -707,62 +753,102 @@ class RawData:
# triggers elsewhere
online_datapoints = []
for run_idx, run in enumerate(traces):
- for trace_part_idx in range(len(run['trace'])):
+ for trace_part_idx in range(len(run["trace"])):
online_datapoints.append((run_idx, trace_part_idx))
for offline_idx, online_ref in enumerate(online_datapoints):
online_run_idx, online_trace_part_idx = online_ref
- offline_trace_part = processed_data['energy_trace'][offline_idx]
- online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx]
+ offline_trace_part = processed_data["energy_trace"][offline_idx]
+ online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx]
if self._parameter_names is None:
- self._parameter_names = sorted(online_trace_part['parameter'].keys())
-
- if sorted(online_trace_part['parameter'].keys()) != self._parameter_names:
- processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want:s}, is {param_is:s}'.format(
- off_idx=offline_idx, on_idx=online_run_idx,
+ self._parameter_names = sorted(online_trace_part["parameter"].keys())
+
+ if sorted(online_trace_part["parameter"].keys()) != self._parameter_names:
+ processed_data[
+ "error"
+ ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want:s}, is {param_is:s}".format(
+ off_idx=offline_idx,
+ on_idx=online_run_idx,
on_sub=online_trace_part_idx,
- on_name=online_trace_part['name'],
+ on_name=online_trace_part["name"],
param_want=self._parameter_names,
- param_is=sorted(online_trace_part['parameter'].keys())
+ param_is=sorted(online_trace_part["parameter"].keys()),
)
- if online_trace_part['isa'] != offline_trace_part['isa']:
- processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) claims to be {off_isa:s}, but should be {on_isa:s}'.format(
- off_idx=offline_idx, on_idx=online_run_idx,
+ if online_trace_part["isa"] != offline_trace_part["isa"]:
+ processed_data[
+ "error"
+ ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) claims to be {off_isa:s}, but should be {on_isa:s}".format(
+ off_idx=offline_idx,
+ on_idx=online_run_idx,
on_sub=online_trace_part_idx,
- on_name=online_trace_part['name'],
- off_isa=offline_trace_part['isa'],
- on_isa=online_trace_part['isa'])
+ on_name=online_trace_part["name"],
+ off_isa=offline_trace_part["isa"],
+ on_isa=online_trace_part["isa"],
+ )
return False
# Clipping in UNINITIALIZED (offline_idx == 0) can happen during
# calibration and is handled by MIMOSA
- if offline_idx != 0 and offline_trace_part['clip_rate'] != 0 and not self.ignore_clipping:
- processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) was clipping {clip:f}% of the time'.format(
- off_idx=offline_idx, on_idx=online_run_idx,
+ if (
+ offline_idx != 0
+ and offline_trace_part["clip_rate"] != 0
+ and not self.ignore_clipping
+ ):
+ processed_data[
+ "error"
+ ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) was clipping {clip:f}% of the time".format(
+ off_idx=offline_idx,
+ on_idx=online_run_idx,
on_sub=online_trace_part_idx,
- on_name=online_trace_part['name'],
- clip=offline_trace_part['clip_rate'] * 100,
+ on_name=online_trace_part["name"],
+ clip=offline_trace_part["clip_rate"] * 100,
)
return False
- if online_trace_part['isa'] == 'state' and online_trace_part['name'] != 'UNINITIALIZED' and len(traces[online_run_idx]['trace']) > online_trace_part_idx + 1:
- online_prev_transition = traces[online_run_idx]['trace'][online_trace_part_idx - 1]
- online_next_transition = traces[online_run_idx]['trace'][online_trace_part_idx + 1]
+ if (
+ online_trace_part["isa"] == "state"
+ and online_trace_part["name"] != "UNINITIALIZED"
+ and len(traces[online_run_idx]["trace"]) > online_trace_part_idx + 1
+ ):
+ online_prev_transition = traces[online_run_idx]["trace"][
+ online_trace_part_idx - 1
+ ]
+ online_next_transition = traces[online_run_idx]["trace"][
+ online_trace_part_idx + 1
+ ]
try:
- if self._state_is_too_short(online_trace_part, offline_trace_part, state_duration, online_next_transition):
- processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too short (duration = {dur:d} us)'.format(
- off_idx=offline_idx, on_idx=online_run_idx,
+ if self._state_is_too_short(
+ online_trace_part,
+ offline_trace_part,
+ state_duration,
+ online_next_transition,
+ ):
+ processed_data[
+ "error"
+ ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too short (duration = {dur:d} us)".format(
+ off_idx=offline_idx,
+ on_idx=online_run_idx,
on_sub=online_trace_part_idx,
- on_name=online_trace_part['name'],
- dur=offline_trace_part['us'])
+ on_name=online_trace_part["name"],
+ dur=offline_trace_part["us"],
+ )
return False
- if self._state_is_too_long(online_trace_part, offline_trace_part, state_duration, online_prev_transition):
- processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too long (duration = {dur:d} us)'.format(
- off_idx=offline_idx, on_idx=online_run_idx,
+ if self._state_is_too_long(
+ online_trace_part,
+ offline_trace_part,
+ state_duration,
+ online_prev_transition,
+ ):
+ processed_data[
+ "error"
+ ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too long (duration = {dur:d} us)".format(
+ off_idx=offline_idx,
+ on_idx=online_run_idx,
on_sub=online_trace_part_idx,
- on_name=online_trace_part['name'],
- dur=offline_trace_part['us'])
+ on_name=online_trace_part["name"],
+ dur=offline_trace_part["us"],
+ )
return False
except KeyError:
pass
@@ -775,136 +861,169 @@ class RawData:
# (appends data from measurement['energy_trace'])
# If measurement['expected_trace'] exists, it is edited in place instead
online_datapoints = []
- if 'expected_trace' in measurement:
- traces = measurement['expected_trace']
- traces = self.traces_by_fileno[measurement['fileno']]
+ if "expected_trace" in measurement:
+ traces = measurement["expected_trace"]
+ traces = self.traces_by_fileno[measurement["fileno"]]
else:
- traces = self.traces_by_fileno[measurement['fileno']]
+ traces = self.traces_by_fileno[measurement["fileno"]]
for run_idx, run in enumerate(traces):
- for trace_part_idx in range(len(run['trace'])):
+ for trace_part_idx in range(len(run["trace"])):
online_datapoints.append((run_idx, trace_part_idx))
for offline_idx, online_ref in enumerate(online_datapoints):
online_run_idx, online_trace_part_idx = online_ref
- offline_trace_part = measurement['energy_trace'][offline_idx]
- online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx]
+ offline_trace_part = measurement["energy_trace"][offline_idx]
+ online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx]
- if 'offline' not in online_trace_part:
- online_trace_part['offline'] = [offline_trace_part]
+ if "offline" not in online_trace_part:
+ online_trace_part["offline"] = [offline_trace_part]
else:
- online_trace_part['offline'].append(offline_trace_part)
+ online_trace_part["offline"].append(offline_trace_part)
- paramkeys = sorted(online_trace_part['parameter'].keys())
+ paramkeys = sorted(online_trace_part["parameter"].keys())
paramvalues = list()
for paramkey in paramkeys:
- if type(online_trace_part['parameter'][paramkey]) is list:
- paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey][measurement['repeat_id']]))
+ if type(online_trace_part["parameter"][paramkey]) is list:
+ paramvalues.append(
+ soft_cast_int(
+ online_trace_part["parameter"][paramkey][
+ measurement["repeat_id"]
+ ]
+ )
+ )
else:
- paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey]))
+ paramvalues.append(
+ soft_cast_int(online_trace_part["parameter"][paramkey])
+ )
# NB: Unscheduled transitions do not have an 'args' field set.
# However, they should only be caused by interrupts, and
# interrupts don't have args anyways.
- if arg_support_enabled and 'args' in online_trace_part:
- paramvalues.extend(map(soft_cast_int, online_trace_part['args']))
-
- if 'offline_aggregates' not in online_trace_part:
- online_trace_part['offline_attributes'] = ['power', 'duration', 'energy']
- online_trace_part['offline_aggregates'] = {
- 'power': [],
- 'duration': [],
- 'power_std': [],
- 'energy': [],
- 'paramkeys': [],
- 'param': [],
+ if arg_support_enabled and "args" in online_trace_part:
+ paramvalues.extend(map(soft_cast_int, online_trace_part["args"]))
+
+ if "offline_aggregates" not in online_trace_part:
+ online_trace_part["offline_attributes"] = [
+ "power",
+ "duration",
+ "energy",
+ ]
+ online_trace_part["offline_aggregates"] = {
+ "power": [],
+ "duration": [],
+ "power_std": [],
+ "energy": [],
+ "paramkeys": [],
+ "param": [],
}
- if online_trace_part['isa'] == 'transition':
- online_trace_part['offline_attributes'].extend(['rel_energy_prev', 'rel_energy_next', 'timeout'])
- online_trace_part['offline_aggregates']['rel_energy_prev'] = []
- online_trace_part['offline_aggregates']['rel_energy_next'] = []
- online_trace_part['offline_aggregates']['timeout'] = []
+ if online_trace_part["isa"] == "transition":
+ online_trace_part["offline_attributes"].extend(
+ ["rel_energy_prev", "rel_energy_next", "timeout"]
+ )
+ online_trace_part["offline_aggregates"]["rel_energy_prev"] = []
+ online_trace_part["offline_aggregates"]["rel_energy_next"] = []
+ online_trace_part["offline_aggregates"]["timeout"] = []
# Note: All state/transitions are 20us "too long" due to injected
# active wait states. These are needed to work around MIMOSA's
# relatively low sample rate of 100 kHz (10us) and removed here.
- online_trace_part['offline_aggregates']['power'].append(
- offline_trace_part['uW_mean'])
- online_trace_part['offline_aggregates']['duration'].append(
- offline_trace_part['us'] - 20)
- online_trace_part['offline_aggregates']['power_std'].append(
- offline_trace_part['uW_std'])
- online_trace_part['offline_aggregates']['energy'].append(
- offline_trace_part['uW_mean'] * (offline_trace_part['us'] - 20))
- online_trace_part['offline_aggregates']['paramkeys'].append(paramkeys)
- online_trace_part['offline_aggregates']['param'].append(paramvalues)
- if online_trace_part['isa'] == 'transition':
- online_trace_part['offline_aggregates']['rel_energy_prev'].append(
- offline_trace_part['uW_mean_delta_prev'] * (offline_trace_part['us'] - 20))
- online_trace_part['offline_aggregates']['rel_energy_next'].append(
- offline_trace_part['uW_mean_delta_next'] * (offline_trace_part['us'] - 20))
- online_trace_part['offline_aggregates']['timeout'].append(
- offline_trace_part['timeout'])
+ online_trace_part["offline_aggregates"]["power"].append(
+ offline_trace_part["uW_mean"]
+ )
+ online_trace_part["offline_aggregates"]["duration"].append(
+ offline_trace_part["us"] - 20
+ )
+ online_trace_part["offline_aggregates"]["power_std"].append(
+ offline_trace_part["uW_std"]
+ )
+ online_trace_part["offline_aggregates"]["energy"].append(
+ offline_trace_part["uW_mean"] * (offline_trace_part["us"] - 20)
+ )
+ online_trace_part["offline_aggregates"]["paramkeys"].append(paramkeys)
+ online_trace_part["offline_aggregates"]["param"].append(paramvalues)
+ if online_trace_part["isa"] == "transition":
+ online_trace_part["offline_aggregates"]["rel_energy_prev"].append(
+ offline_trace_part["uW_mean_delta_prev"]
+ * (offline_trace_part["us"] - 20)
+ )
+ online_trace_part["offline_aggregates"]["rel_energy_next"].append(
+ offline_trace_part["uW_mean_delta_next"]
+ * (offline_trace_part["us"] - 20)
+ )
+ online_trace_part["offline_aggregates"]["timeout"].append(
+ offline_trace_part["timeout"]
+ )
def _merge_online_and_etlog(self, measurement):
# Edits self.traces_by_fileno[measurement['fileno']][*]['trace'][*]['offline']
# and self.traces_by_fileno[measurement['fileno']][*]['trace'][*]['offline_aggregates'] in place
# (appends data from measurement['energy_trace'])
online_datapoints = []
- traces = self.traces_by_fileno[measurement['fileno']]
+ traces = self.traces_by_fileno[measurement["fileno"]]
for run_idx, run in enumerate(traces):
- for trace_part_idx in range(len(run['trace'])):
+ for trace_part_idx in range(len(run["trace"])):
online_datapoints.append((run_idx, trace_part_idx))
for offline_idx, online_ref in enumerate(online_datapoints):
online_run_idx, online_trace_part_idx = online_ref
- offline_trace_part = measurement['energy_trace'][offline_idx]
- online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx]
+ offline_trace_part = measurement["energy_trace"][offline_idx]
+ online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx]
- if 'offline' not in online_trace_part:
- online_trace_part['offline'] = [offline_trace_part]
+ if "offline" not in online_trace_part:
+ online_trace_part["offline"] = [offline_trace_part]
else:
- online_trace_part['offline'].append(offline_trace_part)
+ online_trace_part["offline"].append(offline_trace_part)
- paramkeys = sorted(online_trace_part['parameter'].keys())
+ paramkeys = sorted(online_trace_part["parameter"].keys())
paramvalues = list()
for paramkey in paramkeys:
- if type(online_trace_part['parameter'][paramkey]) is list:
- paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey][measurement['repeat_id']]))
+ if type(online_trace_part["parameter"][paramkey]) is list:
+ paramvalues.append(
+ soft_cast_int(
+ online_trace_part["parameter"][paramkey][
+ measurement["repeat_id"]
+ ]
+ )
+ )
else:
- paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey]))
+ paramvalues.append(
+ soft_cast_int(online_trace_part["parameter"][paramkey])
+ )
# NB: Unscheduled transitions do not have an 'args' field set.
# However, they should only be caused by interrupts, and
# interrupts don't have args anyways.
- if arg_support_enabled and 'args' in online_trace_part:
- paramvalues.extend(map(soft_cast_int, online_trace_part['args']))
-
- if 'offline_aggregates' not in online_trace_part:
- online_trace_part['offline_aggregates'] = {
- 'offline_attributes': ['power', 'duration', 'energy'],
- 'duration': list(),
- 'power': list(),
- 'power_std': list(),
- 'energy': list(),
- 'paramkeys': list(),
- 'param': list()
+ if arg_support_enabled and "args" in online_trace_part:
+ paramvalues.extend(map(soft_cast_int, online_trace_part["args"]))
+
+ if "offline_aggregates" not in online_trace_part:
+ online_trace_part["offline_aggregates"] = {
+ "offline_attributes": ["power", "duration", "energy"],
+ "duration": list(),
+ "power": list(),
+ "power_std": list(),
+ "energy": list(),
+ "paramkeys": list(),
+ "param": list(),
}
- offline_aggregates = online_trace_part['offline_aggregates']
+ offline_aggregates = online_trace_part["offline_aggregates"]
# if online_trace_part['isa'] == 'transitions':
# online_trace_part['offline_attributes'].extend(['rel_energy_prev', 'rel_energy_next'])
# offline_aggregates['rel_energy_prev'] = list()
# offline_aggregates['rel_energy_next'] = list()
- offline_aggregates['duration'].append(offline_trace_part['s'] * 1e6)
- offline_aggregates['power'].append(offline_trace_part['W_mean'] * 1e6)
- offline_aggregates['power_std'].append(offline_trace_part['W_std'] * 1e6)
- offline_aggregates['energy'].append(offline_trace_part['W_mean'] * offline_trace_part['s'] * 1e12)
- offline_aggregates['paramkeys'].append(paramkeys)
- offline_aggregates['param'].append(paramvalues)
+ offline_aggregates["duration"].append(offline_trace_part["s"] * 1e6)
+ offline_aggregates["power"].append(offline_trace_part["W_mean"] * 1e6)
+ offline_aggregates["power_std"].append(offline_trace_part["W_std"] * 1e6)
+ offline_aggregates["energy"].append(
+ offline_trace_part["W_mean"] * offline_trace_part["s"] * 1e12
+ )
+ offline_aggregates["paramkeys"].append(paramkeys)
+ offline_aggregates["param"].append(paramvalues)
# if online_trace_part['isa'] == 'transition':
# offline_aggregates['rel_energy_prev'].append(offline_trace_part['W_mean_delta_prev'] * offline_trace_part['s'] * 1e12)
@@ -922,8 +1041,8 @@ class RawData:
for trace in list_of_traces:
trace_output.extend(trace.copy())
for i, trace in enumerate(trace_output):
- trace['orig_id'] = trace['id']
- trace['id'] = i
+ trace["orig_id"] = trace["id"]
+ trace["id"] = i
return trace_output
def get_preprocessed_data(self, verbose=True):
@@ -1000,25 +1119,29 @@ class RawData:
if version == 0:
with tarfile.open(filename) as tf:
- self.setup_by_fileno.append(json.load(tf.extractfile('setup.json')))
- self.traces_by_fileno.append(json.load(tf.extractfile('src/apps/DriverEval/DriverLog.json')))
+ self.setup_by_fileno.append(json.load(tf.extractfile("setup.json")))
+ self.traces_by_fileno.append(
+ json.load(tf.extractfile("src/apps/DriverEval/DriverLog.json"))
+ )
for member in tf.getmembers():
_, extension = os.path.splitext(member.name)
- if extension == '.mim':
- offline_data.append({
- 'content': tf.extractfile(member).read(),
- 'fileno': i,
- 'info': member,
- 'setup': self.setup_by_fileno[i],
- 'with_traces': self.with_traces,
- })
+ if extension == ".mim":
+ offline_data.append(
+ {
+ "content": tf.extractfile(member).read(),
+ "fileno": i,
+ "info": member,
+ "setup": self.setup_by_fileno[i],
+ "with_traces": self.with_traces,
+ }
+ )
elif version == 1:
new_filenames = list()
with tarfile.open(filename) as tf:
- ptalog = json.load(tf.extractfile(tf.getmember('ptalog.json')))
- self.pta = ptalog['pta']
+ ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json")))
+ self.pta = ptalog["pta"]
# Benchmark code may be too large to be executed in a single
# run, so benchmarks (a benchmark is basically a list of DFA runs)
@@ -1043,33 +1166,37 @@ class RawData:
# ptalog['files'][0][0] is its first iteration/repetition,
# ptalog['files'][0][1] the second, etc.
- for j, traces in enumerate(ptalog['traces']):
- new_filenames.append('{}#{}'.format(filename, j))
+ for j, traces in enumerate(ptalog["traces"]):
+ new_filenames.append("{}#{}".format(filename, j))
self.traces_by_fileno.append(traces)
- self.setup_by_fileno.append({
- 'mimosa_voltage': ptalog['configs'][j]['voltage'],
- 'mimosa_shunt': ptalog['configs'][j]['shunt'],
- 'state_duration': ptalog['opt']['sleep'],
- })
- for repeat_id, mim_file in enumerate(ptalog['files'][j]):
+ self.setup_by_fileno.append(
+ {
+ "mimosa_voltage": ptalog["configs"][j]["voltage"],
+ "mimosa_shunt": ptalog["configs"][j]["shunt"],
+ "state_duration": ptalog["opt"]["sleep"],
+ }
+ )
+ for repeat_id, mim_file in enumerate(ptalog["files"][j]):
member = tf.getmember(mim_file)
- offline_data.append({
- 'content': tf.extractfile(member).read(),
- 'fileno': j,
- 'info': member,
- 'setup': self.setup_by_fileno[j],
- 'repeat_id': repeat_id,
- 'expected_trace': ptalog['traces'][j],
- 'with_traces': self.with_traces,
- })
+ offline_data.append(
+ {
+ "content": tf.extractfile(member).read(),
+ "fileno": j,
+ "info": member,
+ "setup": self.setup_by_fileno[j],
+ "repeat_id": repeat_id,
+ "expected_trace": ptalog["traces"][j],
+ "with_traces": self.with_traces,
+ }
+ )
self.filenames = new_filenames
elif version == 2:
new_filenames = list()
with tarfile.open(filename) as tf:
- ptalog = json.load(tf.extractfile(tf.getmember('ptalog.json')))
- self.pta = ptalog['pta']
+ ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json")))
+ self.pta = ptalog["pta"]
# Benchmark code may be too large to be executed in a single
# run, so benchmarks (a benchmark is basically a list of DFA runs)
@@ -1103,32 +1230,45 @@ class RawData:
# to an invalid measurement and thus power[b] corresponding
# to duration[C]. At the moment, this is harmless, but in the
# future it might not be.
- if 'offline_aggregates' in ptalog['traces'][0][0]['trace'][0]:
- for trace_group in ptalog['traces']:
+ if "offline_aggregates" in ptalog["traces"][0][0]["trace"][0]:
+ for trace_group in ptalog["traces"]:
for trace in trace_group:
- for state_or_transition in trace['trace']:
- offline_aggregates = state_or_transition.pop('offline_aggregates', None)
+ for state_or_transition in trace["trace"]:
+ offline_aggregates = state_or_transition.pop(
+ "offline_aggregates", None
+ )
if offline_aggregates:
- state_or_transition['online_aggregates'] = offline_aggregates
+ state_or_transition[
+ "online_aggregates"
+ ] = offline_aggregates
- for j, traces in enumerate(ptalog['traces']):
- new_filenames.append('{}#{}'.format(filename, j))
+ for j, traces in enumerate(ptalog["traces"]):
+ new_filenames.append("{}#{}".format(filename, j))
self.traces_by_fileno.append(traces)
- self.setup_by_fileno.append({
- 'voltage': ptalog['configs'][j]['voltage'],
- 'state_duration': ptalog['opt']['sleep'],
- })
- for repeat_id, etlog_file in enumerate(ptalog['files'][j]):
+ self.setup_by_fileno.append(
+ {
+ "voltage": ptalog["configs"][j]["voltage"],
+ "state_duration": ptalog["opt"]["sleep"],
+ }
+ )
+ for repeat_id, etlog_file in enumerate(ptalog["files"][j]):
member = tf.getmember(etlog_file)
- offline_data.append({
- 'content': tf.extractfile(member).read(),
- 'fileno': j,
- 'info': member,
- 'setup': self.setup_by_fileno[j],
- 'repeat_id': repeat_id,
- 'expected_trace': ptalog['traces'][j],
- 'transition_names': list(map(lambda x: x['name'], ptalog['pta']['transitions']))
- })
+ offline_data.append(
+ {
+ "content": tf.extractfile(member).read(),
+ "fileno": j,
+ "info": member,
+ "setup": self.setup_by_fileno[j],
+ "repeat_id": repeat_id,
+ "expected_trace": ptalog["traces"][j],
+ "transition_names": list(
+ map(
+ lambda x: x["name"],
+ ptalog["pta"]["transitions"],
+ )
+ ),
+ }
+ )
self.filenames = new_filenames
# TODO remove 'offline_aggregates' from pre-parse data and place
# it under 'online_aggregates' or similar instead. This way, if
@@ -1145,52 +1285,69 @@ class RawData:
num_valid = 0
for measurement in measurements:
- if 'energy_trace' not in measurement:
- vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format(
- ar=self.filenames[measurement['fileno']],
- m=measurement['info'].name,
- e='; '.join(measurement['datasource_errors'])))
+ if "energy_trace" not in measurement:
+ vprint(
+ self.verbose,
+ "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ ar=self.filenames[measurement["fileno"]],
+ m=measurement["info"].name,
+ e="; ".join(measurement["datasource_errors"]),
+ ),
+ )
continue
if version == 0:
# Strip the last state (it is not part of the scheduled measurement)
- measurement['energy_trace'].pop()
+ measurement["energy_trace"].pop()
elif version == 1:
# The first online measurement is the UNINITIALIZED state. In v1,
# it is not part of the expected PTA trace -> remove it.
- measurement['energy_trace'].pop(0)
+ measurement["energy_trace"].pop(0)
if version == 0 or version == 1:
if self._measurement_is_valid_01(measurement):
self._merge_online_and_offline(measurement)
num_valid += 1
else:
- vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format(
- ar=self.filenames[measurement['fileno']],
- m=measurement['info'].name,
- e=measurement['error']))
+ vprint(
+ self.verbose,
+ "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ ar=self.filenames[measurement["fileno"]],
+ m=measurement["info"].name,
+ e=measurement["error"],
+ ),
+ )
elif version == 2:
if self._measurement_is_valid_2(measurement):
self._merge_online_and_etlog(measurement)
num_valid += 1
else:
- vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format(
- ar=self.filenames[measurement['fileno']],
- m=measurement['info'].name,
- e=measurement['error']))
- vprint(self.verbose, '[I] {num_valid:d}/{num_total:d} measurements are valid'.format(
- num_valid=num_valid,
- num_total=len(measurements)))
+ vprint(
+ self.verbose,
+ "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ ar=self.filenames[measurement["fileno"]],
+ m=measurement["info"].name,
+ e=measurement["error"],
+ ),
+ )
+ vprint(
+ self.verbose,
+ "[I] {num_valid:d}/{num_total:d} measurements are valid".format(
+ num_valid=num_valid, num_total=len(measurements)
+ ),
+ )
if version == 0:
self.traces = self._concatenate_traces(self.traces_by_fileno)
elif version == 1:
- self.traces = self._concatenate_traces(map(lambda x: x['expected_trace'], measurements))
+ self.traces = self._concatenate_traces(
+ map(lambda x: x["expected_trace"], measurements)
+ )
self.traces = self._concatenate_traces(self.traces_by_fileno)
elif version == 2:
self.traces = self._concatenate_traces(self.traces_by_fileno)
self.preprocessing_stats = {
- 'num_runs': len(measurements),
- 'num_valid': num_valid
+ "num_runs": len(measurements),
+ "num_valid": num_valid,
}
@@ -1207,16 +1364,33 @@ class ParallelParamFit:
self.fit_queue = []
self.by_param = by_param
- def enqueue(self, state_or_tran, attribute, param_index, param_name, safe_functions_enabled=False, param_filter=None):
+ def enqueue(
+ self,
+ state_or_tran,
+ attribute,
+ param_index,
+ param_name,
+ safe_functions_enabled=False,
+ param_filter=None,
+ ):
"""
Add state_or_tran/attribute/param_name to fit queue.
This causes fit() to compute the best-fitting function for this model part.
"""
- self.fit_queue.append({
- 'key': [state_or_tran, attribute, param_name, param_filter],
- 'args': [self.by_param, state_or_tran, attribute, param_index, safe_functions_enabled, param_filter]
- })
+ self.fit_queue.append(
+ {
+ "key": [state_or_tran, attribute, param_name, param_filter],
+ "args": [
+ self.by_param,
+ state_or_tran,
+ attribute,
+ param_index,
+ safe_functions_enabled,
+ param_filter,
+ ],
+ }
+ )
def fit(self):
"""
@@ -1236,13 +1410,17 @@ def _try_fits_parallel(arg):
Must be a global function as it is called from a multiprocessing Pool.
"""
- return {
- 'key': arg['key'],
- 'result': _try_fits(*arg['args'])
- }
+ return {"key": arg["key"], "result": _try_fits(*arg["args"])}
-def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functions_enabled=False, param_filter: dict = None):
+def _try_fits(
+ by_param,
+ state_or_tran,
+ model_attribute,
+ param_index,
+ safe_functions_enabled=False,
+ param_filter: dict = None,
+):
"""
Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
@@ -1281,22 +1459,28 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi
function_names = list(functions.keys())
for function_name in function_names:
function_object = functions[function_name]
- if is_numeric(param_key[1][param_index]) and not function_object.is_valid(param_key[1][param_index]):
+ if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
+ param_key[1][param_index]
+ ):
functions.pop(function_name, None)
raw_results = dict()
raw_results_by_param = dict()
- ref_results = {
- 'mean': list(),
- 'median': list()
- }
+ ref_results = {"mean": list(), "median": list()}
results = dict()
results_by_param = dict()
seen_parameter_combinations = set()
# for each parameter combination:
- for param_key in filter(lambda x: x[0] == state_or_tran and remove_index_from_tuple(x[1], param_index) not in seen_parameter_combinations and len(by_param[x]['param']) and match_parameter_values(by_param[x]['param'][0], param_filter), by_param.keys()):
+ for param_key in filter(
+ lambda x: x[0] == state_or_tran
+ and remove_index_from_tuple(x[1], param_index)
+ not in seen_parameter_combinations
+ and len(by_param[x]["param"])
+ and match_parameter_values(by_param[x]["param"][0], param_filter),
+ by_param.keys(),
+ ):
X = []
Y = []
num_valid = 0
@@ -1304,10 +1488,14 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi
# Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
# the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
- seen_parameter_combinations.add(remove_index_from_tuple(param_key[1], param_index))
+ seen_parameter_combinations.add(
+ remove_index_from_tuple(param_key[1], param_index)
+ )
# for each value of the parameter denoted by param_index (all other parameters remain the same):
- for k, v in filter(lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()):
+ for k, v in filter(
+ lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
+ ):
num_total += 1
if is_numeric(k[1][param_index]):
num_valid += 1
@@ -1324,7 +1512,9 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi
if function_name not in raw_results:
raw_results[function_name] = dict()
error_function = param_function.error_function
- res = optimize.least_squares(error_function, [0, 1], args=(X, Y), xtol=2e-15)
+ res = optimize.least_squares(
+ error_function, [0, 1], args=(X, Y), xtol=2e-15
+ )
measures = regression_measures(param_function.eval(res.x, X), Y)
raw_results_by_param[other_parameters][function_name] = measures
for measure, error_rate in measures.items():
@@ -1333,38 +1523,37 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi
raw_results[function_name][measure].append(error_rate)
# print(function_name, res, measures)
mean_measures = aggregate_measures(np.mean(Y), Y)
- ref_results['mean'].append(mean_measures['rmsd'])
- raw_results_by_param[other_parameters]['mean'] = mean_measures
+ ref_results["mean"].append(mean_measures["rmsd"])
+ raw_results_by_param[other_parameters]["mean"] = mean_measures
median_measures = aggregate_measures(np.median(Y), Y)
- ref_results['median'].append(median_measures['rmsd'])
- raw_results_by_param[other_parameters]['median'] = median_measures
+ ref_results["median"].append(median_measures["rmsd"])
+ raw_results_by_param[other_parameters]["median"] = median_measures
- if not len(ref_results['mean']):
+ if not len(ref_results["mean"]):
# Insufficient data for fitting
# print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
- return {
- 'best': None,
- 'best_rmsd': np.inf,
- 'results': results
- }
+ return {"best": None, "best_rmsd": np.inf, "results": results}
- for other_parameter_combination, other_parameter_results in raw_results_by_param.items():
+ for (
+ other_parameter_combination,
+ other_parameter_results,
+ ) in raw_results_by_param.items():
best_fit_val = np.inf
best_fit_name = None
results = dict()
for function_name, result in other_parameter_results.items():
if len(result) > 0:
results[function_name] = result
- rmsd = result['rmsd']
+ rmsd = result["rmsd"]
if rmsd < best_fit_val:
best_fit_val = rmsd
best_fit_name = function_name
results_by_param[other_parameter_combination] = {
- 'best': best_fit_name,
- 'best_rmsd': best_fit_val,
- 'mean_rmsd': results['mean']['rmsd'],
- 'median_rmsd': results['median']['rmsd'],
- 'results': results
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": results["mean"]["rmsd"],
+ "median_rmsd": results["median"]["rmsd"],
+ "results": results,
}
best_fit_val = np.inf
@@ -1375,26 +1564,26 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi
results[function_name] = {}
for measure in result.keys():
results[function_name][measure] = np.mean(result[measure])
- rmsd = results[function_name]['rmsd']
+ rmsd = results[function_name]["rmsd"]
if rmsd < best_fit_val:
best_fit_val = rmsd
best_fit_name = function_name
return {
- 'best': best_fit_name,
- 'best_rmsd': best_fit_val,
- 'mean_rmsd': np.mean(ref_results['mean']),
- 'median_rmsd': np.mean(ref_results['median']),
- 'results': results,
- 'results_by_other_param': results_by_param
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": np.mean(ref_results["mean"]),
+ "median_rmsd": np.mean(ref_results["median"]),
+ "results": results,
+ "results_by_other_param": results_by_param,
}
def _num_args_from_by_name(by_name):
num_args = dict()
for key, value in by_name.items():
- if 'args' in value:
- num_args[key] = len(value['args'][0])
+ if "args" in value:
+ num_args[key] = len(value["args"][0])
return num_args
@@ -1413,19 +1602,44 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict =
"""
fit_result = dict()
for result in results:
- if result['key'][0] == name and result['key'][1] == attribute and result['key'][3] == param_filter and result['result']['best'] is not None: # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
- this_result = result['result']
- if this_result['best_rmsd'] >= min(this_result['mean_rmsd'], this_result['median_rmsd']):
- vprint(verbose, '[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})'.format(
- name, attribute, result['key'][2], this_result['best_rmsd'],
- this_result['mean_rmsd'], this_result['median_rmsd']))
+ if (
+ result["key"][0] == name
+ and result["key"][1] == attribute
+ and result["key"][3] == param_filter
+ and result["result"]["best"] is not None
+ ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
+ this_result = result["result"]
+ if this_result["best_rmsd"] >= min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ vprint(
+ verbose,
+ "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ ),
+ )
# See notes on depends_on_param
- elif this_result['best_rmsd'] >= 0.8 * min(this_result['mean_rmsd'], this_result['median_rmsd']):
- vprint(verbose, '[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})'.format(
- name, attribute, result['key'][2], this_result['best_rmsd'],
- this_result['mean_rmsd'], this_result['median_rmsd']))
+ elif this_result["best_rmsd"] >= 0.8 * min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ vprint(
+ verbose,
+ "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ ),
+ )
else:
- fit_result[result['key'][2]] = this_result
+ fit_result[result["key"][2]] = this_result
return fit_result
@@ -1471,7 +1685,15 @@ class AnalyticModel:
assess -- calculate model quality
"""
- def __init__(self, by_name, parameters, arg_count=None, function_override=dict(), verbose=True, use_corrcoef=False):
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count=None,
+ function_override=dict(),
+ verbose=True,
+ use_corrcoef=False,
+ ):
"""
Create a new AnalyticModel and compute parameter statistics.
@@ -1521,19 +1743,29 @@ class AnalyticModel:
if self._num_args is None:
self._num_args = _num_args_from_by_name(by_name)
- self.stats = ParamStats(self.by_name, self.by_param, self.parameters, self._num_args, verbose=verbose, use_corrcoef=use_corrcoef)
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self.parameters,
+ self._num_args,
+ verbose=verbose,
+ use_corrcoef=use_corrcoef,
+ )
def _get_model_from_dict(self, model_dict, model_function):
model = {}
for name, elem in model_dict.items():
model[name] = {}
- for key in elem['attributes']:
+ for key in elem["attributes"]:
try:
model[name][key] = model_function(elem[key])
except RuntimeWarning:
- vprint(self.verbose, '[W] Got no data for {} {}'.format(name, key))
+ vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
except FloatingPointError as fpe:
- vprint(self.verbose, '[W] Got no data for {} {}: {}'.format(name, key, fpe))
+ vprint(
+ self.verbose,
+ "[W] Got no data for {} {}: {}".format(name, key, fpe),
+ )
return model
def param_index(self, param_name):
@@ -1596,22 +1828,28 @@ class AnalyticModel:
model_function(name, attribute, param=parameter values) -> model value.
model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
"""
- if 'fitted_model_getter' in self.cache and 'fitted_info_getter' in self.cache:
- return self.cache['fitted_model_getter'], self.cache['fitted_info_getter']
+ if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+ return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
static_model = self._get_model_from_dict(self.by_name, np.median)
param_model = dict([[name, {}] for name in self.by_name.keys()])
paramfit = ParallelParamFit(self.by_param)
for name in self.by_name.keys():
- for attribute in self.by_name[name]['attributes']:
+ for attribute in self.by_name[name]["attributes"]:
for param_index, param in enumerate(self.parameters):
if self.stats.depends_on_param(name, attribute, param):
paramfit.enqueue(name, attribute, param_index, param, False)
if arg_support_enabled and name in self._num_args:
for arg_index in range(self._num_args[name]):
if self.stats.depends_on_arg(name, attribute, arg_index):
- paramfit.enqueue(name, attribute, len(self.parameters) + arg_index, arg_index, False)
+ paramfit.enqueue(
+ name,
+ attribute,
+ len(self.parameters) + arg_index,
+ arg_index,
+ False,
+ )
paramfit.fit()
@@ -1619,8 +1857,10 @@ class AnalyticModel:
num_args = 0
if name in self._num_args:
num_args = self._num_args[name]
- for attribute in self.by_name[name]['attributes']:
- fit_result = get_fit_result(paramfit.results, name, attribute, self.verbose)
+ for attribute in self.by_name[name]["attributes"]:
+ fit_result = get_fit_result(
+ paramfit.results, name, attribute, self.verbose
+ )
if (name, attribute) in self.function_override:
function_str = self.function_override[(name, attribute)]
@@ -1628,25 +1868,27 @@ class AnalyticModel:
x.fit(self.by_param, name, attribute)
if x.fit_success:
param_model[name][attribute] = {
- 'fit_result': fit_result,
- 'function': x
+ "fit_result": fit_result,
+ "function": x,
}
elif len(fit_result.keys()):
- x = analytic.function_powerset(fit_result, self.parameters, num_args)
+ x = analytic.function_powerset(
+ fit_result, self.parameters, num_args
+ )
x.fit(self.by_param, name, attribute)
if x.fit_success:
param_model[name][attribute] = {
- 'fit_result': fit_result,
- 'function': x
+ "fit_result": fit_result,
+ "function": x,
}
def model_getter(name, key, **kwargs):
- if 'arg' in kwargs and 'param' in kwargs:
- kwargs['param'].extend(map(soft_cast_int, kwargs['arg']))
+ if "arg" in kwargs and "param" in kwargs:
+ kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
if key in param_model[name]:
- param_list = kwargs['param']
- param_function = param_model[name][key]['function']
+ param_list = kwargs["param"]
+ param_function = param_model[name][key]["function"]
if param_function.is_predictable(param_list):
return param_function.eval(param_list)
return static_model[name][key]
@@ -1656,8 +1898,8 @@ class AnalyticModel:
return param_model[name][key]
return None
- self.cache['fitted_model_getter'] = model_getter
- self.cache['fitted_info_getter'] = info_getter
+ self.cache["fitted_model_getter"] = model_getter
+ self.cache["fitted_info_getter"] = info_getter
return model_getter, info_getter
@@ -1677,13 +1919,22 @@ class AnalyticModel:
detailed_results = {}
for name, elem in sorted(self.by_name.items()):
detailed_results[name] = {}
- for attribute in elem['attributes']:
- predicted_data = np.array(list(map(lambda i: model_function(name, attribute, param=elem['param'][i]), range(len(elem[attribute])))))
+ for attribute in elem["attributes"]:
+ predicted_data = np.array(
+ list(
+ map(
+ lambda i: model_function(
+ name, attribute, param=elem["param"][i]
+ ),
+ range(len(elem[attribute])),
+ )
+ )
+ )
measures = regression_measures(predicted_data, elem[attribute])
detailed_results[name][attribute] = measures
return {
- 'by_name': detailed_results,
+ "by_name": detailed_results,
}
def to_json(self):
@@ -1695,25 +1946,28 @@ def _add_trace_data_to_aggregate(aggregate, key, element):
# Only cares about element['isa'], element['offline_aggregates'], and
# element['plan']['level']
if key not in aggregate:
- aggregate[key] = {
- 'isa': element['isa']
- }
- for datakey in element['offline_aggregates'].keys():
+ aggregate[key] = {"isa": element["isa"]}
+ for datakey in element["offline_aggregates"].keys():
aggregate[key][datakey] = []
- if element['isa'] == 'state':
- aggregate[key]['attributes'] = ['power']
+ if element["isa"] == "state":
+ aggregate[key]["attributes"] = ["power"]
else:
# TODO do not hardcode values
- aggregate[key]['attributes'] = ['duration', 'energy', 'rel_energy_prev', 'rel_energy_next']
+ aggregate[key]["attributes"] = [
+ "duration",
+ "energy",
+ "rel_energy_prev",
+ "rel_energy_next",
+ ]
# Uncomment this line if you also want to analyze mean transition power
# aggrgate[key]['attributes'].append('power')
- if 'plan' in element and element['plan']['level'] == 'epilogue':
- aggregate[key]['attributes'].insert(0, 'timeout')
- attributes = aggregate[key]['attributes'].copy()
+ if "plan" in element and element["plan"]["level"] == "epilogue":
+ aggregate[key]["attributes"].insert(0, "timeout")
+ attributes = aggregate[key]["attributes"].copy()
for attribute in attributes:
- if attribute not in element['offline_aggregates']:
- aggregate[key]['attributes'].remove(attribute)
- for datakey, dataval in element['offline_aggregates'].items():
+ if attribute not in element["offline_aggregates"]:
+ aggregate[key]["attributes"].remove(attribute)
+ for datakey, dataval in element["offline_aggregates"].items():
aggregate[key][datakey].extend(dataval)
@@ -1771,16 +2025,20 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]):
"""
arg_count = dict()
by_name = dict()
- parameter_names = sorted(traces[0]['trace'][0]['parameter'].keys())
+ parameter_names = sorted(traces[0]["trace"][0]["parameter"].keys())
for run in traces:
- if run['id'] not in ignore_trace_indexes:
- for elem in run['trace']:
- if elem['isa'] == 'transition' and not elem['name'] in arg_count and 'args' in elem:
- arg_count[elem['name']] = len(elem['args'])
- if elem['name'] != 'UNINITIALIZED':
- _add_trace_data_to_aggregate(by_name, elem['name'], elem)
+ if run["id"] not in ignore_trace_indexes:
+ for elem in run["trace"]:
+ if (
+ elem["isa"] == "transition"
+ and not elem["name"] in arg_count
+ and "args" in elem
+ ):
+ arg_count[elem["name"]] = len(elem["args"])
+ if elem["name"] != "UNINITIALIZED":
+ _add_trace_data_to_aggregate(by_name, elem["name"], elem)
for elem in by_name.values():
- for key in elem['attributes']:
+ for key in elem["attributes"]:
elem[key] = np.array(elem[key])
return by_name, parameter_names, arg_count
@@ -1817,7 +2075,19 @@ class PTAModel:
- rel_energy_next: transition energy relative to next state mean power in pJ
"""
- def __init__(self, by_name, parameters, arg_count, traces=[], ignore_trace_indexes=[], discard_outliers=None, function_override={}, verbose=True, use_corrcoef=False, pta=None):
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count,
+ traces=[],
+ ignore_trace_indexes=[],
+ discard_outliers=None,
+ function_override={},
+ verbose=True,
+ use_corrcoef=False,
+ pta=None,
+ ):
"""
Prepare a new PTA energy model.
@@ -1854,9 +2124,16 @@ class PTAModel:
self._num_args = arg_count
self._use_corrcoef = use_corrcoef
self.traces = traces
- self.stats = ParamStats(self.by_name, self.by_param, self._parameter_names, self._num_args, self._use_corrcoef, verbose=verbose)
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self._parameter_names,
+ self._num_args,
+ self._use_corrcoef,
+ verbose=verbose,
+ )
self.cache = {}
- np.seterr('raise')
+ np.seterr("raise")
self._outlier_threshold = discard_outliers
self.function_override = function_override.copy()
self.verbose = verbose
@@ -1866,7 +2143,7 @@ class PTAModel:
def _aggregate_to_ndarray(self, aggregate):
for elem in aggregate.values():
- for key in elem['attributes']:
+ for key in elem["attributes"]:
elem[key] = np.array(elem[key])
# This heuristic is very similar to the "function is not much better than
@@ -1884,13 +2161,16 @@ class PTAModel:
model = {}
for name, elem in model_dict.items():
model[name] = {}
- for key in elem['attributes']:
+ for key in elem["attributes"]:
try:
model[name][key] = model_function(elem[key])
except RuntimeWarning:
- vprint(self.verbose, '[W] Got no data for {} {}'.format(name, key))
+ vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
except FloatingPointError as fpe:
- vprint(self.verbose, '[W] Got no data for {} {}: {}'.format(name, key, fpe))
+ vprint(
+ self.verbose,
+ "[W] Got no data for {} {}: {}".format(name, key, fpe),
+ )
return model
def get_static(self, use_mean=False):
@@ -1953,63 +2233,110 @@ class PTAModel:
model_function(name, attribute, param=parameter values) -> model value.
model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
"""
- if 'fitted_model_getter' in self.cache and 'fitted_info_getter' in self.cache:
- return self.cache['fitted_model_getter'], self.cache['fitted_info_getter']
+ if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+ return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict([[state_or_tran, {}] for state_or_tran in self.by_name.keys()])
+ param_model = dict(
+ [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
+ )
paramfit = ParallelParamFit(self.by_param)
for state_or_tran in self.by_name.keys():
- for model_attribute in self.by_name[state_or_tran]['attributes']:
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
fit_results = {}
for parameter_index, parameter_name in enumerate(self._parameter_names):
- if self.depends_on_param(state_or_tran, model_attribute, parameter_name):
- paramfit.enqueue(state_or_tran, model_attribute, parameter_index, parameter_name, safe_functions_enabled)
- for codependent_param_dict in self.stats.codependent_parameter_value_dicts(state_or_tran, model_attribute, parameter_name):
- paramfit.enqueue(state_or_tran, model_attribute, parameter_index, parameter_name, safe_functions_enabled, codependent_param_dict)
- if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition':
+ if self.depends_on_param(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ parameter_index,
+ parameter_name,
+ safe_functions_enabled,
+ )
+ for (
+ codependent_param_dict
+ ) in self.stats.codependent_parameter_value_dicts(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ parameter_index,
+ parameter_name,
+ safe_functions_enabled,
+ codependent_param_dict,
+ )
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
for arg_index in range(self._num_args[state_or_tran]):
- if self.depends_on_arg(state_or_tran, model_attribute, arg_index):
- paramfit.enqueue(state_or_tran, model_attribute, len(self._parameter_names) + arg_index, arg_index, safe_functions_enabled)
+ if self.depends_on_arg(
+ state_or_tran, model_attribute, arg_index
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ len(self._parameter_names) + arg_index,
+ arg_index,
+ safe_functions_enabled,
+ )
paramfit.fit()
for state_or_tran in self.by_name.keys():
num_args = 0
- if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition':
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
num_args = self._num_args[state_or_tran]
- for model_attribute in self.by_name[state_or_tran]['attributes']:
- fit_results = get_fit_result(paramfit.results, state_or_tran, model_attribute, self.verbose)
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
+ fit_results = get_fit_result(
+ paramfit.results, state_or_tran, model_attribute, self.verbose
+ )
for parameter_name in self._parameter_names:
- if self.depends_on_param(state_or_tran, model_attribute, parameter_name):
- for codependent_param_dict in self.stats.codependent_parameter_value_dicts(state_or_tran, model_attribute, parameter_name):
+ if self.depends_on_param(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ for (
+ codependent_param_dict
+ ) in self.stats.codependent_parameter_value_dicts(
+ state_or_tran, model_attribute, parameter_name
+ ):
pass
# FIXME get_fit_result hat ja gar keinen Parameter als Argument...
if (state_or_tran, model_attribute) in self.function_override:
- function_str = self.function_override[(state_or_tran, model_attribute)]
+ function_str = self.function_override[
+ (state_or_tran, model_attribute)
+ ]
x = AnalyticFunction(function_str, self._parameter_names, num_args)
x.fit(self.by_param, state_or_tran, model_attribute)
if x.fit_success:
param_model[state_or_tran][model_attribute] = {
- 'fit_result': fit_results,
- 'function': x
+ "fit_result": fit_results,
+ "function": x,
}
elif len(fit_results.keys()):
- x = analytic.function_powerset(fit_results, self._parameter_names, num_args)
+ x = analytic.function_powerset(
+ fit_results, self._parameter_names, num_args
+ )
x.fit(self.by_param, state_or_tran, model_attribute)
if x.fit_success:
param_model[state_or_tran][model_attribute] = {
- 'fit_result': fit_results,
- 'function': x
+ "fit_result": fit_results,
+ "function": x,
}
def model_getter(name, key, **kwargs):
- if 'arg' in kwargs and 'param' in kwargs:
- kwargs['param'].extend(map(soft_cast_int, kwargs['arg']))
+ if "arg" in kwargs and "param" in kwargs:
+ kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
if key in param_model[name]:
- param_list = kwargs['param']
- param_function = param_model[name][key]['function']
+ param_list = kwargs["param"]
+ param_function = param_model[name][key]["function"]
if param_function.is_predictable(param_list):
return param_function.eval(param_list)
return static_model[name][key]
@@ -2019,8 +2346,8 @@ class PTAModel:
return param_model[name][key]
return None
- self.cache['fitted_model_getter'] = model_getter
- self.cache['fitted_info_getter'] = info_getter
+ self.cache["fitted_model_getter"] = model_getter
+ self.cache["fitted_info_getter"] = info_getter
return model_getter, info_getter
@@ -2029,16 +2356,32 @@ class PTAModel:
static_quality = self.assess(static_model)
param_model, param_info = self.get_fitted()
analytic_quality = self.assess(param_model)
- self.pta.update(static_model, param_info, static_error=static_quality['by_name'], analytic_error=analytic_quality['by_name'])
+ self.pta.update(
+ static_model,
+ param_info,
+ static_error=static_quality["by_name"],
+ analytic_error=analytic_quality["by_name"],
+ )
return self.pta.to_json()
def states(self):
"""Return sorted list of state names."""
- return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'state', self.by_name.keys())))
+ return sorted(
+ list(
+ filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
+ )
+ )
def transitions(self):
"""Return sorted list of transition names."""
- return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'transition', self.by_name.keys())))
+ return sorted(
+ list(
+ filter(
+ lambda k: self.by_name[k]["isa"] == "transition",
+ self.by_name.keys(),
+ )
+ )
+ )
def states_and_transitions(self):
"""Return list of states and transition names."""
@@ -2050,7 +2393,7 @@ class PTAModel:
return self._parameter_names
def attributes(self, state_or_trans):
- return self.by_name[state_or_trans]['attributes']
+ return self.by_name[state_or_trans]["attributes"]
def assess(self, model_function):
"""
@@ -2068,16 +2411,23 @@ class PTAModel:
detailed_results = {}
for name, elem in sorted(self.by_name.items()):
detailed_results[name] = {}
- for key in elem['attributes']:
- predicted_data = np.array(list(map(lambda i: model_function(name, key, param=elem['param'][i]), range(len(elem[key])))))
+ for key in elem["attributes"]:
+ predicted_data = np.array(
+ list(
+ map(
+ lambda i: model_function(name, key, param=elem["param"][i]),
+ range(len(elem[key])),
+ )
+ )
+ )
measures = regression_measures(predicted_data, elem[key])
detailed_results[name][key] = measures
- return {
- 'by_name': detailed_results
- }
+ return {"by_name": detailed_results}
- def assess_states(self, model_function, model_attribute='power', distribution: dict = None):
+ def assess_states(
+ self, model_function, model_attribute="power", distribution: dict = None
+ ):
"""
Calculate overall model error assuming equal distribution of states
"""
@@ -2089,7 +2439,9 @@ class PTAModel:
distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
if not np.isclose(sum(distribution.values()), 1):
- raise ValueError('distribution must be a probability distribution with sum 1')
+ raise ValueError(
+ "distribution must be a probability distribution with sum 1"
+ )
# total_value = None
# try:
@@ -2097,7 +2449,17 @@ class PTAModel:
# except KeyError:
# pass
- total_error = np.sqrt(sum(map(lambda x: np.square(model_quality['by_name'][x][model_attribute]['mae'] * distribution[x]), self.states())))
+ total_error = np.sqrt(
+ sum(
+ map(
+ lambda x: np.square(
+ model_quality["by_name"][x][model_attribute]["mae"]
+ * distribution[x]
+ ),
+ self.states(),
+ )
+ )
+ )
return total_error
def assess_on_traces(self, model_function):
@@ -2118,44 +2480,72 @@ class PTAModel:
real_timeout_list = []
for trace in self.traces:
- if trace['id'] not in self.ignore_trace_indexes:
- for rep_id in range(len(trace['trace'][0]['offline'])):
- model_energy = 0.
- real_energy = 0.
- model_rel_energy = 0.
- model_state_energy = 0.
- model_duration = 0.
- real_duration = 0.
- model_timeout = 0.
- real_timeout = 0.
- for i, trace_part in enumerate(trace['trace']):
- name = trace_part['name']
- prev_name = trace['trace'][i - 1]['name']
- isa = trace_part['isa']
- if name != 'UNINITIALIZED':
+ if trace["id"] not in self.ignore_trace_indexes:
+ for rep_id in range(len(trace["trace"][0]["offline"])):
+ model_energy = 0.0
+ real_energy = 0.0
+ model_rel_energy = 0.0
+ model_state_energy = 0.0
+ model_duration = 0.0
+ real_duration = 0.0
+ model_timeout = 0.0
+ real_timeout = 0.0
+ for i, trace_part in enumerate(trace["trace"]):
+ name = trace_part["name"]
+ prev_name = trace["trace"][i - 1]["name"]
+ isa = trace_part["isa"]
+ if name != "UNINITIALIZED":
try:
- param = trace_part['offline_aggregates']['param'][rep_id]
- prev_param = trace['trace'][i - 1]['offline_aggregates']['param'][rep_id]
- power = trace_part['offline'][rep_id]['uW_mean']
- duration = trace_part['offline'][rep_id]['us']
- prev_duration = trace['trace'][i - 1]['offline'][rep_id]['us']
+ param = trace_part["offline_aggregates"]["param"][
+ rep_id
+ ]
+ prev_param = trace["trace"][i - 1][
+ "offline_aggregates"
+ ]["param"][rep_id]
+ power = trace_part["offline"][rep_id]["uW_mean"]
+ duration = trace_part["offline"][rep_id]["us"]
+ prev_duration = trace["trace"][i - 1]["offline"][
+ rep_id
+ ]["us"]
real_energy += power * duration
- if isa == 'state':
- model_energy += model_function(name, 'power', param=param) * duration
+ if isa == "state":
+ model_energy += (
+ model_function(name, "power", param=param)
+ * duration
+ )
else:
- model_energy += model_function(name, 'energy', param=param)
+ model_energy += model_function(
+ name, "energy", param=param
+ )
# If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
if i == 1:
- model_rel_energy += model_function(name, 'energy', param=param)
+ model_rel_energy += model_function(
+ name, "energy", param=param
+ )
else:
- model_rel_energy += model_function(prev_name, 'power', param=prev_param) * (prev_duration + duration)
- model_state_energy += model_function(prev_name, 'power', param=prev_param) * (prev_duration + duration)
- model_rel_energy += model_function(name, 'rel_energy_prev', param=param)
+ model_rel_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_state_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_rel_energy += model_function(
+ name, "rel_energy_prev", param=param
+ )
real_duration += duration
- model_duration += model_function(name, 'duration', param=param)
- if 'plan' in trace_part and trace_part['plan']['level'] == 'epilogue':
- real_timeout += trace_part['offline'][rep_id]['timeout']
- model_timeout += model_function(name, 'timeout', param=param)
+ model_duration += model_function(
+ name, "duration", param=param
+ )
+ if (
+ "plan" in trace_part
+ and trace_part["plan"]["level"] == "epilogue"
+ ):
+ real_timeout += trace_part["offline"][rep_id][
+ "timeout"
+ ]
+ model_timeout += model_function(
+ name, "timeout", param=param
+ )
except KeyError:
# if states/transitions have been removed via --filter-param, this is harmless
pass
@@ -2169,11 +2559,21 @@ class PTAModel:
model_timeout_list.append(model_timeout)
return {
- 'duration_by_trace': regression_measures(np.array(model_duration_list), np.array(real_duration_list)),
- 'energy_by_trace': regression_measures(np.array(model_energy_list), np.array(real_energy_list)),
- 'timeout_by_trace': regression_measures(np.array(model_timeout_list), np.array(real_timeout_list)),
- 'rel_energy_by_trace': regression_measures(np.array(model_rel_energy_list), np.array(real_energy_list)),
- 'state_energy_by_trace': regression_measures(np.array(model_state_energy_list), np.array(real_energy_list)),
+ "duration_by_trace": regression_measures(
+ np.array(model_duration_list), np.array(real_duration_list)
+ ),
+ "energy_by_trace": regression_measures(
+ np.array(model_energy_list), np.array(real_energy_list)
+ ),
+ "timeout_by_trace": regression_measures(
+ np.array(model_timeout_list), np.array(real_timeout_list)
+ ),
+ "rel_energy_by_trace": regression_measures(
+ np.array(model_rel_energy_list), np.array(real_energy_list)
+ ),
+ "state_energy_by_trace": regression_measures(
+ np.array(model_state_energy_list), np.array(real_energy_list)
+ ),
}
@@ -2230,17 +2630,19 @@ class EnergyTraceLog:
"""
if not zbar_available:
- self.errors.append('zbar module is not available. Try "apt install python3-zbar"')
+ self.errors.append(
+ 'zbar module is not available. Try "apt install python3-zbar"'
+ )
return list()
- lines = log_data.decode('ascii').split('\n')
- data_count = sum(map(lambda x: len(x) > 0 and x[0] != '#', lines))
- data_lines = filter(lambda x: len(x) > 0 and x[0] != '#', lines)
+ lines = log_data.decode("ascii").split("\n")
+ data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines))
+ data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines)
data = np.empty((data_count, 4))
for i, line in enumerate(data_lines):
- fields = line.split(' ')
+ fields = line.split(" ")
if len(fields) == 4:
timestamp, current, voltage, total_energy = map(int, fields)
elif len(fields) == 5:
@@ -2252,15 +2654,26 @@ class EnergyTraceLog:
self.interval_start_timestamp = data[:-1, 0] * 1e-6
self.interval_duration = (data[1:, 0] - data[:-1, 0]) * 1e-6
- self.interval_power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ((data[1:, 0] - data[:-1, 0]) * 1e-6)
+ self.interval_power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / (
+ (data[1:, 0] - data[:-1, 0]) * 1e-6
+ )
m_duration_us = data[-1, 0] - data[0, 0]
self.sample_rate = data_count / (m_duration_us * 1e-6)
- vprint(self.verbose, 'got {} samples with {} seconds of log data ({} Hz)'.format(data_count, m_duration_us * 1e-6, self.sample_rate))
+ vprint(
+ self.verbose,
+ "got {} samples with {} seconds of log data ({} Hz)".format(
+ data_count, m_duration_us * 1e-6, self.sample_rate
+ ),
+ )
- return self.interval_start_timestamp, self.interval_duration, self.interval_power
+ return (
+ self.interval_start_timestamp,
+ self.interval_duration,
+ self.interval_power,
+ )
def ts_to_index(self, timestamp):
"""
@@ -2279,7 +2692,12 @@ class EnergyTraceLog:
mid_index = left_index + (right_index - left_index) // 2
# I'm feeling lucky
- if timestamp > self.interval_start_timestamp[mid_index] and timestamp <= self.interval_start_timestamp[mid_index] + self.interval_duration[mid_index]:
+ if (
+ timestamp > self.interval_start_timestamp[mid_index]
+ and timestamp
+ <= self.interval_start_timestamp[mid_index]
+ + self.interval_duration[mid_index]
+ ):
return mid_index
if timestamp <= self.interval_start_timestamp[mid_index]:
@@ -2322,16 +2740,29 @@ class EnergyTraceLog:
expected_transitions = list()
for trace_number, trace in enumerate(traces):
- for state_or_transition_number, state_or_transition in enumerate(trace['trace']):
- if state_or_transition['isa'] == 'transition':
+ for state_or_transition_number, state_or_transition in enumerate(
+ trace["trace"]
+ ):
+ if state_or_transition["isa"] == "transition":
try:
- expected_transitions.append((
- state_or_transition['name'],
- state_or_transition['online_aggregates']['duration'][offline_index] * 1e-6
- ))
+ expected_transitions.append(
+ (
+ state_or_transition["name"],
+ state_or_transition["online_aggregates"]["duration"][
+ offline_index
+ ]
+ * 1e-6,
+ )
+ )
except IndexError:
- self.errors.append('Entry #{} ("{}") in trace #{} has no duration entry for offline_index/repeat_id {}'.format(
- state_or_transition_number, state_or_transition['name'], trace_number, offline_index))
+ self.errors.append(
+ 'Entry #{} ("{}") in trace #{} has no duration entry for offline_index/repeat_id {}'.format(
+ state_or_transition_number,
+ state_or_transition["name"],
+ trace_number,
+ offline_index,
+ )
+ )
return energy_trace
next_barcode = first_sync
@@ -2342,51 +2773,101 @@ class EnergyTraceLog:
print('[!!!] did not find transition "{}"'.format(name))
break
next_barcode = end + self.state_duration + duration
- vprint(self.verbose, '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format(offline_index, bc, start, stop, end))
+ vprint(
+ self.verbose,
+ '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format(
+ offline_index, bc, start, stop, end
+ ),
+ )
if bc != name:
- vprint(self.verbose, '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc))
- vprint(self.verbose, '{} estimated transition area: {:0.3f} .. {:0.3f} seconds'.format(offline_index, end, end + duration))
+ vprint(
+ self.verbose,
+ '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc),
+ )
+ vprint(
+ self.verbose,
+ "{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format(
+ offline_index, end, end + duration
+ ),
+ )
transition_start_index = self.ts_to_index(end)
transition_done_index = self.ts_to_index(end + duration) + 1
state_start_index = transition_done_index
- state_done_index = self.ts_to_index(end + duration + self.state_duration) + 1
-
- vprint(self.verbose, '{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds'.format(offline_index, transition_start_index / self.sample_rate, transition_done_index / self.sample_rate))
+ state_done_index = (
+ self.ts_to_index(end + duration + self.state_duration) + 1
+ )
- energy_trace.append({
- 'isa': 'transition',
- 'W_mean': np.mean(self.interval_power[transition_start_index: transition_done_index]),
- 'W_std': np.std(self.interval_power[transition_start_index: transition_done_index]),
- 's': duration,
- 's_coarse': self.interval_start_timestamp[transition_done_index] - self.interval_start_timestamp[transition_start_index]
+ vprint(
+ self.verbose,
+ "{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds".format(
+ offline_index,
+ transition_start_index / self.sample_rate,
+ transition_done_index / self.sample_rate,
+ ),
+ )
- })
+ energy_trace.append(
+ {
+ "isa": "transition",
+ "W_mean": np.mean(
+ self.interval_power[
+ transition_start_index:transition_done_index
+ ]
+ ),
+ "W_std": np.std(
+ self.interval_power[
+ transition_start_index:transition_done_index
+ ]
+ ),
+ "s": duration,
+ "s_coarse": self.interval_start_timestamp[transition_done_index]
+ - self.interval_start_timestamp[transition_start_index],
+ }
+ )
if len(energy_trace) > 1:
- energy_trace[-1]['W_mean_delta_prev'] = energy_trace[-1]['W_mean'] - energy_trace[-2]['W_mean']
+ energy_trace[-1]["W_mean_delta_prev"] = (
+ energy_trace[-1]["W_mean"] - energy_trace[-2]["W_mean"]
+ )
- energy_trace.append({
- 'isa': 'state',
- 'W_mean': np.mean(self.interval_power[state_start_index: state_done_index]),
- 'W_std': np.std(self.interval_power[state_start_index: state_done_index]),
- 's': self.state_duration,
- 's_coarse': self.interval_start_timestamp[state_done_index] - self.interval_start_timestamp[state_start_index]
- })
+ energy_trace.append(
+ {
+ "isa": "state",
+ "W_mean": np.mean(
+ self.interval_power[state_start_index:state_done_index]
+ ),
+ "W_std": np.std(
+ self.interval_power[state_start_index:state_done_index]
+ ),
+ "s": self.state_duration,
+ "s_coarse": self.interval_start_timestamp[state_done_index]
+ - self.interval_start_timestamp[state_start_index],
+ }
+ )
- energy_trace[-2]['W_mean_delta_next'] = energy_trace[-2]['W_mean'] - energy_trace[-1]['W_mean']
+ energy_trace[-2]["W_mean_delta_next"] = (
+ energy_trace[-2]["W_mean"] - energy_trace[-1]["W_mean"]
+ )
expected_transition_count = len(expected_transitions)
recovered_transition_ount = len(energy_trace) // 2
if expected_transition_count != recovered_transition_ount:
- self.errors.append('Expected {:d} transitions, got {:d}'.format(expected_transition_count, recovered_transition_ount))
+ self.errors.append(
+ "Expected {:d} transitions, got {:d}".format(
+ expected_transition_count, recovered_transition_ount
+ )
+ )
return energy_trace
def find_first_sync(self):
# LED Power is approx. self.led_power W, use self.led_power/2 W above surrounding median as threshold
- sync_threshold_power = np.median(self.interval_power[: int(3 * self.sample_rate)]) + self.led_power / 3
+ sync_threshold_power = (
+ np.median(self.interval_power[: int(3 * self.sample_rate)])
+ + self.led_power / 3
+ )
for i, ts in enumerate(self.interval_start_timestamp):
if ts > 2 and self.interval_power[i] > sync_threshold_power:
return self.interval_start_timestamp[i - 300]
@@ -2410,26 +2891,56 @@ class EnergyTraceLog:
lookaround = int(0.1 * self.sample_rate)
# LED Power is approx. self.led_power W, use self.led_power/2 W above surrounding median as threshold
- sync_threshold_power = np.median(self.interval_power[start_position - lookaround: start_position + lookaround]) + self.led_power / 3
+ sync_threshold_power = (
+ np.median(
+ self.interval_power[
+ start_position - lookaround : start_position + lookaround
+ ]
+ )
+ + self.led_power / 3
+ )
- vprint(self.verbose, 'looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW'.format(start_ts, sync_threshold_power * 1e3))
+ vprint(
+ self.verbose,
+ "looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format(
+ start_ts, sync_threshold_power * 1e3
+ ),
+ )
sync_area_start = None
sync_start_ts = None
sync_area_end = None
sync_end_ts = None
for i, ts in enumerate(self.interval_start_timestamp):
- if sync_area_start is None and ts >= start_ts and self.interval_power[i] > sync_threshold_power:
+ if (
+ sync_area_start is None
+ and ts >= start_ts
+ and self.interval_power[i] > sync_threshold_power
+ ):
sync_area_start = i - 300
sync_start_ts = ts
- if sync_area_start is not None and sync_area_end is None and ts > sync_start_ts + self.min_barcode_duration and (ts > sync_start_ts + self.max_barcode_duration or abs(sync_threshold_power - self.interval_power[i]) > self.led_power):
+ if (
+ sync_area_start is not None
+ and sync_area_end is None
+ and ts > sync_start_ts + self.min_barcode_duration
+ and (
+ ts > sync_start_ts + self.max_barcode_duration
+ or abs(sync_threshold_power - self.interval_power[i])
+ > self.led_power
+ )
+ ):
sync_area_end = i
sync_end_ts = ts
break
- barcode_data = self.interval_power[sync_area_start: sync_area_end]
+ barcode_data = self.interval_power[sync_area_start:sync_area_end]
- vprint(self.verbose, 'barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)'.format(sync_start_ts, sync_end_ts, len(barcode_data)))
+ vprint(
+ self.verbose,
+ "barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format(
+ sync_start_ts, sync_end_ts, len(barcode_data)
+ ),
+ )
bc, start, stop, padding_bits = self.find_barcode_in_power_data(barcode_data)
@@ -2439,7 +2950,9 @@ class EnergyTraceLog:
start_ts = self.interval_start_timestamp[sync_area_start + start]
stop_ts = self.interval_start_timestamp[sync_area_start + stop]
- end_ts = stop_ts + self.module_duration * padding_bits + self.quiet_zone_duration
+ end_ts = (
+ stop_ts + self.module_duration * padding_bits + self.quiet_zone_duration
+ )
# barcode content, barcode start timestamp, barcode stop timestamp, barcode end (stop + padding) timestamp
return bc, start_ts, stop_ts, end_ts
@@ -2455,7 +2968,9 @@ class EnergyTraceLog:
# -> Create a black and white (not grayscale) image to avoid this.
# Unfortunately, this decreases resilience against background noise
# (e.g. a not-exactly-idle peripheral device or CPU interrupts).
- image_data = np.around(1 - ((barcode_data - min_power) / (max_power - min_power)))
+ image_data = np.around(
+ 1 - ((barcode_data - min_power) / (max_power - min_power))
+ )
image_data *= 255
# zbar only returns the complete barcode position if it is at least
@@ -2469,12 +2984,12 @@ class EnergyTraceLog:
# img = Image.frombytes('L', (width, height), image_data).resize((width, 100))
# img.save('/tmp/test-{}.png'.format(os.getpid()))
- zbimg = zbar.Image(width, height, 'Y800', image_data)
+ zbimg = zbar.Image(width, height, "Y800", image_data)
scanner = zbar.ImageScanner()
- scanner.parse_config('enable')
+ scanner.parse_config("enable")
if scanner.scan(zbimg):
- sym, = zbimg.symbols
+ (sym,) = zbimg.symbols
content = sym.data
try:
sym_start = sym.location[1][0]
@@ -2482,7 +2997,7 @@ class EnergyTraceLog:
sym_start = 0
sym_end = sym.location[0][0]
- match = re.fullmatch(r'T(\d+)', content)
+ match = re.fullmatch(r"T(\d+)", content)
if match:
content = self.transition_names[int(match.group(1))]
@@ -2490,7 +3005,7 @@ class EnergyTraceLog:
# additional non-barcode padding (encoded as LED off / image white).
# Calculate the amount of extra bits to determine the offset until
# the transition starts.
- padding_bits = len(Code128(sym.data, charset='B').modules) % 8
+ padding_bits = len(Code128(sym.data, charset="B").modules) % 8
# sym_start leaves out the first two bars, but we don't do anything about that here
# sym_end leaves out the last three bars, each of which is one padding bit long.
@@ -2499,7 +3014,7 @@ class EnergyTraceLog:
return content, sym_start, sym_end, padding_bits
else:
- vprint(self.verbose, 'unable to find barcode')
+ vprint(self.verbose, "unable to find barcode")
return None, None, None, None
@@ -2555,15 +3070,15 @@ class MIMOSA:
:returns: (numpy array of charges (pJ per 10µs), numpy array of triggers (0/1 int, per 10µs))
"""
- num_bytes = tf.getmember('/tmp/mimosa//mimosa_scale_1.tmp').size
+ num_bytes = tf.getmember("/tmp/mimosa//mimosa_scale_1.tmp").size
charges = np.ndarray(shape=(int(num_bytes / 4)), dtype=np.int32)
triggers = np.ndarray(shape=(int(num_bytes / 4)), dtype=np.int8)
- with tf.extractfile('/tmp/mimosa//mimosa_scale_1.tmp') as f:
+ with tf.extractfile("/tmp/mimosa//mimosa_scale_1.tmp") as f:
content = f.read()
- iterator = struct.iter_unpack('<I', content)
+ iterator = struct.iter_unpack("<I", content)
i = 0
for word in iterator:
- charges[i] = (word[0] >> 4)
+ charges[i] = word[0] >> 4
triggers[i] = (word[0] & 0x08) >> 3
i += 1
return charges, triggers
@@ -2616,7 +3131,7 @@ class MIMOSA:
trigidx = []
if len(triggers) < 1000000:
- self.errors.append('MIMOSA log is too short')
+ self.errors.append("MIMOSA log is too short")
return trigidx
prevtrig = triggers[999999]
@@ -2625,13 +3140,17 @@ class MIMOSA:
# something went wrong and are unable to determine when the first
# transition starts.
if prevtrig != 0:
- self.errors.append('Unable to find start of first transition (log starts with trigger == {} != 0)'.format(prevtrig))
+ self.errors.append(
+ "Unable to find start of first transition (log starts with trigger == {} != 0)".format(
+ prevtrig
+ )
+ )
# if the last trigger is high (i.e., trigger/buzzer pin is active when the benchmark ends),
# it terminated in the middle of a transition -- meaning that it was not
# measured in its entirety.
if triggers[-1] != 0:
- self.errors.append('Log ends during a transition'.format(prevtrig))
+ self.errors.append("Log ends during a transition".format(prevtrig))
# the device is reset for MIMOSA calibration in the first 10s and may
# send bogus interrupts -> bogus triggers
@@ -2663,11 +3182,23 @@ class MIMOSA:
for i in range(100000, len(currents)):
if r1idx == 0 and currents[i] > ua_r1 * 0.6:
r1idx = i
- elif r1idx != 0 and r2idx == 0 and i > (r1idx + 180000) and currents[i] < ua_r1 * 0.4:
+ elif (
+ r1idx != 0
+ and r2idx == 0
+ and i > (r1idx + 180000)
+ and currents[i] < ua_r1 * 0.4
+ ):
r2idx = i
# 2s disconnected, 2s r1, 2s r2 with r1 < r2 -> ua_r1 > ua_r2
# allow 5ms buffer in both directions to account for bouncing relais contacts
- return r1idx - 180500, r1idx - 500, r1idx + 500, r2idx - 500, r2idx + 500, r2idx + 180500
+ return (
+ r1idx - 180500,
+ r1idx - 500,
+ r1idx + 500,
+ r2idx - 500,
+ r2idx + 500,
+ r2idx + 180500,
+ )
def calibration_function(self, charges, cal_edges):
u"""
@@ -2711,7 +3242,7 @@ class MIMOSA:
if cal_r2_mean > cal_0_mean:
b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean)
else:
- vprint(self.verbose, '[W] 0 uA == %.f uA during calibration' % (ua_r2))
+ vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2))
b_lower = 0
b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean)
@@ -2726,7 +3257,9 @@ class MIMOSA:
return 0
else:
return charge * b_lower + a_lower
+
else:
+
def calfunc(charge):
if charge < cal_0_mean:
return 0
@@ -2736,19 +3269,19 @@ class MIMOSA:
return charge * b_upper + a_upper + ua_r2
caldata = {
- 'edges': [x * 10 for x in cal_edges],
- 'offset': cal_0_mean,
- 'offset2': cal_r2_mean,
- 'slope_low': b_lower,
- 'slope_high': b_upper,
- 'add_low': a_lower,
- 'add_high': a_upper,
- 'r0_err_uW': np.mean(self.currents_nocal(chg_r0)) * self.voltage,
- 'r0_std_uW': np.std(self.currents_nocal(chg_r0)) * self.voltage,
- 'r1_err_uW': (np.mean(self.currents_nocal(chg_r1)) - ua_r1) * self.voltage,
- 'r1_std_uW': np.std(self.currents_nocal(chg_r1)) * self.voltage,
- 'r2_err_uW': (np.mean(self.currents_nocal(chg_r2)) - ua_r2) * self.voltage,
- 'r2_std_uW': np.std(self.currents_nocal(chg_r2)) * self.voltage,
+ "edges": [x * 10 for x in cal_edges],
+ "offset": cal_0_mean,
+ "offset2": cal_r2_mean,
+ "slope_low": b_lower,
+ "slope_high": b_upper,
+ "add_low": a_lower,
+ "add_high": a_upper,
+ "r0_err_uW": np.mean(self.currents_nocal(chg_r0)) * self.voltage,
+ "r0_std_uW": np.std(self.currents_nocal(chg_r0)) * self.voltage,
+ "r1_err_uW": (np.mean(self.currents_nocal(chg_r1)) - ua_r1) * self.voltage,
+ "r1_std_uW": np.std(self.currents_nocal(chg_r1)) * self.voltage,
+ "r2_err_uW": (np.mean(self.currents_nocal(chg_r2)) - ua_r2) * self.voltage,
+ "r2_std_uW": np.std(self.currents_nocal(chg_r2)) * self.voltage,
}
# print("if charge < %f : return 0" % cal_0_mean)
@@ -2843,51 +3376,59 @@ class MIMOSA:
statelist = []
prevsubidx = 0
for subidx in subst:
- statelist.append({
- 'duration': (subidx - prevsubidx) * 10,
- 'uW_mean': np.mean(range_ua[prevsubidx: subidx] * self.voltage),
- 'uW_std': np.std(range_ua[prevsubidx: subidx] * self.voltage),
- })
+ statelist.append(
+ {
+ "duration": (subidx - prevsubidx) * 10,
+ "uW_mean": np.mean(
+ range_ua[prevsubidx:subidx] * self.voltage
+ ),
+ "uW_std": np.std(
+ range_ua[prevsubidx:subidx] * self.voltage
+ ),
+ }
+ )
prevsubidx = subidx
substates = {
- 'threshold': thr,
- 'states': statelist,
+ "threshold": thr,
+ "states": statelist,
}
- isa = 'state'
+ isa = "state"
if not is_state:
- isa = 'transition'
+ isa = "transition"
data = {
- 'isa': isa,
- 'clip_rate': np.mean(range_raw == 65535),
- 'raw_mean': np.mean(range_raw),
- 'raw_std': np.std(range_raw),
- 'uW_mean': np.mean(range_ua * self.voltage),
- 'uW_std': np.std(range_ua * self.voltage),
- 'us': (idx - previdx) * 10,
+ "isa": isa,
+ "clip_rate": np.mean(range_raw == 65535),
+ "raw_mean": np.mean(range_raw),
+ "raw_std": np.std(range_raw),
+ "uW_mean": np.mean(range_ua * self.voltage),
+ "uW_std": np.std(range_ua * self.voltage),
+ "us": (idx - previdx) * 10,
}
if self.with_traces:
- data['uW'] = range_ua * self.voltage
+ data["uW"] = range_ua * self.voltage
- if 'states' in substates:
- data['substates'] = substates
- ssum = np.sum(list(map(lambda x: x['duration'], substates['states'])))
- if ssum != data['us']:
- vprint(self.verbose, "ERR: duration %d vs %d" % (data['us'], ssum))
+ if "states" in substates:
+ data["substates"] = substates
+ ssum = np.sum(list(map(lambda x: x["duration"], substates["states"])))
+ if ssum != data["us"]:
+ vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum))
- if isa == 'transition':
+ if isa == "transition":
# subtract average power of previous state
# (that is, the state from which this transition originates)
- data['uW_mean_delta_prev'] = data['uW_mean'] - iterdata[-1]['uW_mean']
+ data["uW_mean_delta_prev"] = data["uW_mean"] - iterdata[-1]["uW_mean"]
# placeholder to avoid extra cases in the analysis
- data['uW_mean_delta_next'] = data['uW_mean']
- data['timeout'] = iterdata[-1]['us']
+ data["uW_mean_delta_next"] = data["uW_mean"]
+ data["timeout"] = iterdata[-1]["us"]
elif len(iterdata) > 0:
# subtract average power of next state
# (the state into which this transition leads)
- iterdata[-1]['uW_mean_delta_next'] = iterdata[-1]['uW_mean'] - data['uW_mean']
+ iterdata[-1]["uW_mean_delta_next"] = (
+ iterdata[-1]["uW_mean"] - data["uW_mean"]
+ )
iterdata.append(data)