diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2020-05-28 12:04:37 +0200 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2020-05-28 12:04:37 +0200 |
commit | c69331e4d925658b2bf26dcb387981f6530d7b9e (patch) | |
tree | d19c7f9b0bf51f68c104057e013630e009835268 /lib/dfatool.py | |
parent | 23927051ac3e64cabbaa6c30e8356dfe90ebfa6c (diff) |
use black(1) for uniform code formatting
Diffstat (limited to 'lib/dfatool.py')
-rw-r--r-- | lib/dfatool.py | 1797 |
1 files changed, 1169 insertions, 628 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py index 8fb41a5..56f0f2d 100644 --- a/lib/dfatool.py +++ b/lib/dfatool.py @@ -15,12 +15,19 @@ from multiprocessing import Pool from .functions import analytic from .functions import AnalyticFunction from .parameters import ParamStats -from .utils import vprint, is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple +from .utils import ( + vprint, + is_numeric, + soft_cast_int, + param_slice_eq, + remove_index_from_tuple, +) from .utils import by_name_to_by_param, match_parameter_values, running_mean try: from .pubcode import Code128 import zbar + zbar_available = True except ImportError: zbar_available = False @@ -47,25 +54,25 @@ def gplearn_to_function(function_str: str): inv -- 1 / x if |x| > 0.001, otherwise 0 """ eval_globals = { - 'add': lambda x, y: x + y, - 'sub': lambda x, y: x - y, - 'mul': lambda x, y: x * y, - 'div': lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1., - 'sqrt': lambda x: np.sqrt(np.abs(x)), - 'log': lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0., - 'inv': lambda x: 1. / x if np.abs(x) > 0.001 else 0., + "add": lambda x, y: x + y, + "sub": lambda x, y: x - y, + "mul": lambda x, y: x * y, + "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0, + "sqrt": lambda x: np.sqrt(np.abs(x)), + "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0, + "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0, } last_arg_index = 0 for i in range(0, 100): - if function_str.find('X{:d}'.format(i)) >= 0: + if function_str.find("X{:d}".format(i)) >= 0: last_arg_index = i arg_list = [] for i in range(0, last_arg_index + 1): - arg_list.append('X{:d}'.format(i)) + arg_list.append("X{:d}".format(i)) - eval_str = 'lambda {}, *whatever: {}'.format(','.join(arg_list), function_str) + eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str) print(eval_str) return eval(eval_str, eval_globals) @@ -123,32 +130,35 @@ def regression_measures(predicted: np.ndarray, actual: np.ndarray): count -- Number of values """ if type(predicted) != np.ndarray: - raise ValueError('first arg must be ndarray, is {}'.format(type(predicted))) + raise ValueError("first arg must be ndarray, is {}".format(type(predicted))) if type(actual) != np.ndarray: - raise ValueError('second arg must be ndarray, is {}'.format(type(actual))) + raise ValueError("second arg must be ndarray, is {}".format(type(actual))) deviations = predicted - actual # mean = np.mean(actual) if len(deviations) == 0: return {} measures = { - 'mae': np.mean(np.abs(deviations), dtype=np.float64), - 'msd': np.mean(deviations**2, dtype=np.float64), - 'rmsd': np.sqrt(np.mean(deviations**2), dtype=np.float64), - 'ssr': np.sum(deviations**2, dtype=np.float64), - 'rsq': r2_score(actual, predicted), - 'count': len(actual), + "mae": np.mean(np.abs(deviations), dtype=np.float64), + "msd": np.mean(deviations ** 2, dtype=np.float64), + "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64), + "ssr": np.sum(deviations ** 2, dtype=np.float64), + "rsq": r2_score(actual, predicted), + "count": len(actual), } # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64) if np.all(actual != 0): - measures['mape'] = np.mean(np.abs(deviations / actual)) * 100 # bad measure + measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure else: - measures['mape'] = np.nan + measures["mape"] = np.nan if np.all(np.abs(predicted) + np.abs(actual) != 0): - measures['smape'] = np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) * 100 + measures["smape"] = ( + np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) + * 100 + ) else: - measures['smape'] = np.nan + measures["smape"] = np.nan # if np.all(rsq_quotient != 0): # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient @@ -177,7 +187,7 @@ class KeysightCSV: with open(filename) as f: for _ in range(4): next(f) - reader = csv.reader(f, delimiter=',') + reader = csv.reader(f, delimiter=",") for i, row in enumerate(reader): timestamps[i] = float(row[0]) currents[i] = float(row[2]) * -1 @@ -266,29 +276,35 @@ class CrossValidator: } } """ - ret = { - 'by_name': dict() - } + ret = {"by_name": dict()} for name in self.names: - ret['by_name'][name] = dict() - for attribute in self.by_name[name]['attributes']: - ret['by_name'][name][attribute] = { - 'mae_list': list(), - 'smape_list': list() + ret["by_name"][name] = dict() + for attribute in self.by_name[name]["attributes"]: + ret["by_name"][name][attribute] = { + "mae_list": list(), + "smape_list": list(), } for _ in range(count): res = self._single_montecarlo(model_getter) for name in self.names: - for attribute in self.by_name[name]['attributes']: - ret['by_name'][name][attribute]['mae_list'].append(res['by_name'][name][attribute]['mae']) - ret['by_name'][name][attribute]['smape_list'].append(res['by_name'][name][attribute]['smape']) + for attribute in self.by_name[name]["attributes"]: + ret["by_name"][name][attribute]["mae_list"].append( + res["by_name"][name][attribute]["mae"] + ) + ret["by_name"][name][attribute]["smape_list"].append( + res["by_name"][name][attribute]["smape"] + ) for name in self.names: - for attribute in self.by_name[name]['attributes']: - ret['by_name'][name][attribute]['mae'] = np.mean(ret['by_name'][name][attribute]['mae_list']) - ret['by_name'][name][attribute]['smape'] = np.mean(ret['by_name'][name][attribute]['smape_list']) + for attribute in self.by_name[name]["attributes"]: + ret["by_name"][name][attribute]["mae"] = np.mean( + ret["by_name"][name][attribute]["mae_list"] + ) + ret["by_name"][name][attribute]["smape"] = np.mean( + ret["by_name"][name][attribute]["smape_list"] + ) return ret @@ -296,77 +312,87 @@ class CrossValidator: training = dict() validation = dict() for name in self.names: - training[name] = { - 'attributes': self.by_name[name]['attributes'] - } - validation[name] = { - 'attributes': self.by_name[name]['attributes'] - } + training[name] = {"attributes": self.by_name[name]["attributes"]} + validation[name] = {"attributes": self.by_name[name]["attributes"]} - if 'isa' in self.by_name[name]: - training[name]['isa'] = self.by_name[name]['isa'] - validation[name]['isa'] = self.by_name[name]['isa'] + if "isa" in self.by_name[name]: + training[name]["isa"] = self.by_name[name]["isa"] + validation[name]["isa"] = self.by_name[name]["isa"] - data_count = len(self.by_name[name]['param']) + data_count = len(self.by_name[name]["param"]) training_subset, validation_subset = _xv_partition_montecarlo(data_count) - for attribute in self.by_name[name]['attributes']: + for attribute in self.by_name[name]["attributes"]: self.by_name[name][attribute] = np.array(self.by_name[name][attribute]) - training[name][attribute] = self.by_name[name][attribute][training_subset] - validation[name][attribute] = self.by_name[name][attribute][validation_subset] + training[name][attribute] = self.by_name[name][attribute][ + training_subset + ] + validation[name][attribute] = self.by_name[name][attribute][ + validation_subset + ] # We can't use slice syntax for 'param', which may contain strings and other odd values - training[name]['param'] = list() - validation[name]['param'] = list() + training[name]["param"] = list() + validation[name]["param"] = list() for idx in training_subset: - training[name]['param'].append(self.by_name[name]['param'][idx]) + training[name]["param"].append(self.by_name[name]["param"][idx]) for idx in validation_subset: - validation[name]['param'].append(self.by_name[name]['param'][idx]) + validation[name]["param"].append(self.by_name[name]["param"][idx]) - training_data = self.model_class(training, self.parameters, self.arg_count, verbose=False) + training_data = self.model_class( + training, self.parameters, self.arg_count, verbose=False + ) training_model = model_getter(training_data) - validation_data = self.model_class(validation, self.parameters, self.arg_count, verbose=False) + validation_data = self.model_class( + validation, self.parameters, self.arg_count, verbose=False + ) return validation_data.assess(training_model) def _preprocess_mimosa(measurement): - setup = measurement['setup'] - mim = MIMOSA(float(setup['mimosa_voltage']), int(setup['mimosa_shunt']), with_traces=measurement['with_traces']) + setup = measurement["setup"] + mim = MIMOSA( + float(setup["mimosa_voltage"]), + int(setup["mimosa_shunt"]), + with_traces=measurement["with_traces"], + ) try: - charges, triggers = mim.load_data(measurement['content']) + charges, triggers = mim.load_data(measurement["content"]) trigidx = mim.trigger_edges(triggers) except EOFError as e: - mim.errors.append('MIMOSA logfile error: {}'.format(e)) + mim.errors.append("MIMOSA logfile error: {}".format(e)) trigidx = list() if len(trigidx) == 0: - mim.errors.append('MIMOSA log has no triggers') + mim.errors.append("MIMOSA log has no triggers") return { - 'fileno': measurement['fileno'], - 'info': measurement['info'], - 'has_datasource_error': len(mim.errors) > 0, - 'datasource_errors': mim.errors, - 'expected_trace': measurement['expected_trace'], - 'repeat_id': measurement['repeat_id'], + "fileno": measurement["fileno"], + "info": measurement["info"], + "has_datasource_error": len(mim.errors) > 0, + "datasource_errors": mim.errors, + "expected_trace": measurement["expected_trace"], + "repeat_id": measurement["repeat_id"], } - cal_edges = mim.calibration_edges(running_mean(mim.currents_nocal(charges[0:trigidx[0]]), 10)) + cal_edges = mim.calibration_edges( + running_mean(mim.currents_nocal(charges[0 : trigidx[0]]), 10) + ) calfunc, caldata = mim.calibration_function(charges, cal_edges) vcalfunc = np.vectorize(calfunc, otypes=[np.float64]) processed_data = { - 'fileno': measurement['fileno'], - 'info': measurement['info'], - 'triggers': len(trigidx), - 'first_trig': trigidx[0] * 10, - 'calibration': caldata, - 'energy_trace': mim.analyze_states(charges, trigidx, vcalfunc), - 'has_datasource_error': len(mim.errors) > 0, - 'datasource_errors': mim.errors, + "fileno": measurement["fileno"], + "info": measurement["info"], + "triggers": len(trigidx), + "first_trig": trigidx[0] * 10, + "calibration": caldata, + "energy_trace": mim.analyze_states(charges, trigidx, vcalfunc), + "has_datasource_error": len(mim.errors) > 0, + "datasource_errors": mim.errors, } - for key in ['expected_trace', 'repeat_id']: + for key in ["expected_trace", "repeat_id"]: if key in measurement: processed_data[key] = measurement[key] @@ -374,22 +400,28 @@ def _preprocess_mimosa(measurement): def _preprocess_etlog(measurement): - setup = measurement['setup'] - etlog = EnergyTraceLog(float(setup['voltage']), int(setup['state_duration']), measurement['transition_names']) + setup = measurement["setup"] + etlog = EnergyTraceLog( + float(setup["voltage"]), + int(setup["state_duration"]), + measurement["transition_names"], + ) try: - etlog.load_data(measurement['content']) - states_and_transitions = etlog.analyze_states(measurement['expected_trace'], measurement['repeat_id']) + etlog.load_data(measurement["content"]) + states_and_transitions = etlog.analyze_states( + measurement["expected_trace"], measurement["repeat_id"] + ) except EOFError as e: - etlog.errors.append('EnergyTrace logfile error: {}'.format(e)) + etlog.errors.append("EnergyTrace logfile error: {}".format(e)) processed_data = { - 'fileno': measurement['fileno'], - 'repeat_id': measurement['repeat_id'], - 'info': measurement['info'], - 'expected_trace': measurement['expected_trace'], - 'energy_trace': states_and_transitions, - 'has_datasource_error': len(etlog.errors) > 0, - 'datasource_errors': etlog.errors, + "fileno": measurement["fileno"], + "repeat_id": measurement["repeat_id"], + "info": measurement["info"], + "expected_trace": measurement["expected_trace"], + "energy_trace": states_and_transitions, + "has_datasource_error": len(etlog.errors) > 0, + "datasource_errors": etlog.errors, } return processed_data @@ -421,35 +453,40 @@ class TimingData: for trace_group in self.traces_by_fileno: for trace in trace_group: # TimingHarness logs states, but does not aggregate any data for them at the moment -> throw all states away - transitions = list(filter(lambda x: x['isa'] == 'transition', trace['trace'])) - self.traces.append({ - 'id': trace['id'], - 'trace': transitions, - }) + transitions = list( + filter(lambda x: x["isa"] == "transition", trace["trace"]) + ) + self.traces.append( + {"id": trace["id"], "trace": transitions,} + ) for i, trace in enumerate(self.traces): - trace['orig_id'] = trace['id'] - trace['id'] = i - for log_entry in trace['trace']: - paramkeys = sorted(log_entry['parameter'].keys()) - if 'param' not in log_entry['offline_aggregates']: - log_entry['offline_aggregates']['param'] = list() - if 'duration' in log_entry['offline_aggregates']: - for i in range(len(log_entry['offline_aggregates']['duration'])): + trace["orig_id"] = trace["id"] + trace["id"] = i + for log_entry in trace["trace"]: + paramkeys = sorted(log_entry["parameter"].keys()) + if "param" not in log_entry["offline_aggregates"]: + log_entry["offline_aggregates"]["param"] = list() + if "duration" in log_entry["offline_aggregates"]: + for i in range(len(log_entry["offline_aggregates"]["duration"])): paramvalues = list() for paramkey in paramkeys: - if type(log_entry['parameter'][paramkey]) is list: - paramvalues.append(soft_cast_int(log_entry['parameter'][paramkey][i])) + if type(log_entry["parameter"][paramkey]) is list: + paramvalues.append( + soft_cast_int(log_entry["parameter"][paramkey][i]) + ) else: - paramvalues.append(soft_cast_int(log_entry['parameter'][paramkey])) - if arg_support_enabled and 'args' in log_entry: - paramvalues.extend(map(soft_cast_int, log_entry['args'])) - log_entry['offline_aggregates']['param'].append(paramvalues) + paramvalues.append( + soft_cast_int(log_entry["parameter"][paramkey]) + ) + if arg_support_enabled and "args" in log_entry: + paramvalues.extend(map(soft_cast_int, log_entry["args"])) + log_entry["offline_aggregates"]["param"].append(paramvalues) def _preprocess_0(self): for filename in self.filenames: - with open(filename, 'r') as f: + with open(filename, "r") as f: log_data = json.load(f) - self.traces_by_fileno.extend(log_data['traces']) + self.traces_by_fileno.extend(log_data["traces"]) self._concatenate_analyzed_traces() def get_preprocessed_data(self, verbose=True): @@ -470,17 +507,25 @@ class TimingData: def sanity_check_aggregate(aggregate): for key in aggregate: - if 'param' not in aggregate[key]: - raise RuntimeError('aggregate[{}][param] does not exist'.format(key)) - if 'attributes' not in aggregate[key]: - raise RuntimeError('aggregate[{}][attributes] does not exist'.format(key)) - for attribute in aggregate[key]['attributes']: + if "param" not in aggregate[key]: + raise RuntimeError("aggregate[{}][param] does not exist".format(key)) + if "attributes" not in aggregate[key]: + raise RuntimeError("aggregate[{}][attributes] does not exist".format(key)) + for attribute in aggregate[key]["attributes"]: if attribute not in aggregate[key]: - raise RuntimeError('aggregate[{}][{}] does not exist, even though it is contained in aggregate[{}][attributes]'.format(key, attribute, key)) - param_len = len(aggregate[key]['param']) + raise RuntimeError( + "aggregate[{}][{}] does not exist, even though it is contained in aggregate[{}][attributes]".format( + key, attribute, key + ) + ) + param_len = len(aggregate[key]["param"]) attr_len = len(aggregate[key][attribute]) if param_len != attr_len: - raise RuntimeError('parameter mismatch: len(aggregate[{}][param]) == {} != len(aggregate[{}][{}]) == {}'.format(key, param_len, key, attribute, attr_len)) + raise RuntimeError( + "parameter mismatch: len(aggregate[{}][param]) == {} != len(aggregate[{}][{}]) == {}".format( + key, param_len, key, attribute, attr_len + ) + ) class RawData: @@ -559,11 +604,11 @@ class RawData: with tarfile.open(filenames[0]) as tf: for member in tf.getmembers(): - if member.name == 'ptalog.json' and self.version == 0: + if member.name == "ptalog.json" and self.version == 0: self.version = 1 # might also be version 2 # depends on whether *.etlog exists or not - elif '.etlog' in member.name: + elif ".etlog" in member.name: self.version = 2 break @@ -572,18 +617,18 @@ class RawData: self.load_cache() def set_cache_file(self): - cache_key = hashlib.sha256('!'.join(self.filenames).encode()).hexdigest() - self.cache_dir = os.path.dirname(self.filenames[0]) + '/cache' - self.cache_file = '{}/{}.json'.format(self.cache_dir, cache_key) + cache_key = hashlib.sha256("!".join(self.filenames).encode()).hexdigest() + self.cache_dir = os.path.dirname(self.filenames[0]) + "/cache" + self.cache_file = "{}/{}.json".format(self.cache_dir, cache_key) def load_cache(self): if os.path.exists(self.cache_file): - with open(self.cache_file, 'r') as f: + with open(self.cache_file, "r") as f: cache_data = json.load(f) - self.traces = cache_data['traces'] - self.preprocessing_stats = cache_data['preprocessing_stats'] - if 'pta' in cache_data: - self.pta = cache_data['pta'] + self.traces = cache_data["traces"] + self.preprocessing_stats = cache_data["preprocessing_stats"] + if "pta" in cache_data: + self.pta = cache_data["pta"] self.preprocessed = True def save_cache(self): @@ -593,30 +638,30 @@ class RawData: os.mkdir(self.cache_dir) except FileExistsError: pass - with open(self.cache_file, 'w') as f: + with open(self.cache_file, "w") as f: cache_data = { - 'traces': self.traces, - 'preprocessing_stats': self.preprocessing_stats, - 'pta': self.pta, + "traces": self.traces, + "preprocessing_stats": self.preprocessing_stats, + "pta": self.pta, } json.dump(cache_data, f) def _state_is_too_short(self, online, offline, state_duration, next_transition): # We cannot control when an interrupt causes a state to be left - if next_transition['plan']['level'] == 'epilogue': + if next_transition["plan"]["level"] == "epilogue": return False # Note: state_duration is stored as ms, not us - return offline['us'] < state_duration * 500 + return offline["us"] < state_duration * 500 def _state_is_too_long(self, online, offline, state_duration, prev_transition): # If the previous state was left by an interrupt, we may have some # waiting time left over. So it's okay if the current state is longer # than expected. - if prev_transition['plan']['level'] == 'epilogue': + if prev_transition["plan"]["level"] == "epilogue": return False # state_duration is stored as ms, not us - return offline['us'] > state_duration * 1500 + return offline["us"] > state_duration * 1500 def _measurement_is_valid_2(self, processed_data): """ @@ -642,8 +687,8 @@ class RawData: """ # Check for low-level parser errors - if processed_data['has_datasource_error']: - processed_data['error'] = '; '.join(processed_data['datasource_errors']) + if processed_data["has_datasource_error"]: + processed_data["error"] = "; ".join(processed_data["datasource_errors"]) return False # Note that the low-level parser (EnergyTraceLog) already checks @@ -680,26 +725,27 @@ class RawData: - uW_mean_delta_prev: Differenz zwischen uW_mean und uW_mean des vorherigen Zustands - uW_mean_delta_next: Differenz zwischen uW_mean und uW_mean des Folgezustands """ - setup = self.setup_by_fileno[processed_data['fileno']] - if 'expected_trace' in processed_data: - traces = processed_data['expected_trace'] + setup = self.setup_by_fileno[processed_data["fileno"]] + if "expected_trace" in processed_data: + traces = processed_data["expected_trace"] else: - traces = self.traces_by_fileno[processed_data['fileno']] - state_duration = setup['state_duration'] + traces = self.traces_by_fileno[processed_data["fileno"]] + state_duration = setup["state_duration"] # Check MIMOSA error - if processed_data['has_datasource_error']: - processed_data['error'] = '; '.join(processed_data['datasource_errors']) + if processed_data["has_datasource_error"]: + processed_data["error"] = "; ".join(processed_data["datasource_errors"]) return False # Check trigger count sched_trigger_count = 0 for run in traces: - sched_trigger_count += len(run['trace']) - if sched_trigger_count != processed_data['triggers']: - processed_data['error'] = 'got {got:d} trigger edges, expected {exp:d}'.format( - got=processed_data['triggers'], - exp=sched_trigger_count + sched_trigger_count += len(run["trace"]) + if sched_trigger_count != processed_data["triggers"]: + processed_data[ + "error" + ] = "got {got:d} trigger edges, expected {exp:d}".format( + got=processed_data["triggers"], exp=sched_trigger_count ) return False # Check state durations. Very short or long states can indicate a @@ -707,62 +753,102 @@ class RawData: # triggers elsewhere online_datapoints = [] for run_idx, run in enumerate(traces): - for trace_part_idx in range(len(run['trace'])): + for trace_part_idx in range(len(run["trace"])): online_datapoints.append((run_idx, trace_part_idx)) for offline_idx, online_ref in enumerate(online_datapoints): online_run_idx, online_trace_part_idx = online_ref - offline_trace_part = processed_data['energy_trace'][offline_idx] - online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx] + offline_trace_part = processed_data["energy_trace"][offline_idx] + online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx] if self._parameter_names is None: - self._parameter_names = sorted(online_trace_part['parameter'].keys()) - - if sorted(online_trace_part['parameter'].keys()) != self._parameter_names: - processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want:s}, is {param_is:s}'.format( - off_idx=offline_idx, on_idx=online_run_idx, + self._parameter_names = sorted(online_trace_part["parameter"].keys()) + + if sorted(online_trace_part["parameter"].keys()) != self._parameter_names: + processed_data[ + "error" + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want:s}, is {param_is:s}".format( + off_idx=offline_idx, + on_idx=online_run_idx, on_sub=online_trace_part_idx, - on_name=online_trace_part['name'], + on_name=online_trace_part["name"], param_want=self._parameter_names, - param_is=sorted(online_trace_part['parameter'].keys()) + param_is=sorted(online_trace_part["parameter"].keys()), ) - if online_trace_part['isa'] != offline_trace_part['isa']: - processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) claims to be {off_isa:s}, but should be {on_isa:s}'.format( - off_idx=offline_idx, on_idx=online_run_idx, + if online_trace_part["isa"] != offline_trace_part["isa"]: + processed_data[ + "error" + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) claims to be {off_isa:s}, but should be {on_isa:s}".format( + off_idx=offline_idx, + on_idx=online_run_idx, on_sub=online_trace_part_idx, - on_name=online_trace_part['name'], - off_isa=offline_trace_part['isa'], - on_isa=online_trace_part['isa']) + on_name=online_trace_part["name"], + off_isa=offline_trace_part["isa"], + on_isa=online_trace_part["isa"], + ) return False # Clipping in UNINITIALIZED (offline_idx == 0) can happen during # calibration and is handled by MIMOSA - if offline_idx != 0 and offline_trace_part['clip_rate'] != 0 and not self.ignore_clipping: - processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) was clipping {clip:f}% of the time'.format( - off_idx=offline_idx, on_idx=online_run_idx, + if ( + offline_idx != 0 + and offline_trace_part["clip_rate"] != 0 + and not self.ignore_clipping + ): + processed_data[ + "error" + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) was clipping {clip:f}% of the time".format( + off_idx=offline_idx, + on_idx=online_run_idx, on_sub=online_trace_part_idx, - on_name=online_trace_part['name'], - clip=offline_trace_part['clip_rate'] * 100, + on_name=online_trace_part["name"], + clip=offline_trace_part["clip_rate"] * 100, ) return False - if online_trace_part['isa'] == 'state' and online_trace_part['name'] != 'UNINITIALIZED' and len(traces[online_run_idx]['trace']) > online_trace_part_idx + 1: - online_prev_transition = traces[online_run_idx]['trace'][online_trace_part_idx - 1] - online_next_transition = traces[online_run_idx]['trace'][online_trace_part_idx + 1] + if ( + online_trace_part["isa"] == "state" + and online_trace_part["name"] != "UNINITIALIZED" + and len(traces[online_run_idx]["trace"]) > online_trace_part_idx + 1 + ): + online_prev_transition = traces[online_run_idx]["trace"][ + online_trace_part_idx - 1 + ] + online_next_transition = traces[online_run_idx]["trace"][ + online_trace_part_idx + 1 + ] try: - if self._state_is_too_short(online_trace_part, offline_trace_part, state_duration, online_next_transition): - processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too short (duration = {dur:d} us)'.format( - off_idx=offline_idx, on_idx=online_run_idx, + if self._state_is_too_short( + online_trace_part, + offline_trace_part, + state_duration, + online_next_transition, + ): + processed_data[ + "error" + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too short (duration = {dur:d} us)".format( + off_idx=offline_idx, + on_idx=online_run_idx, on_sub=online_trace_part_idx, - on_name=online_trace_part['name'], - dur=offline_trace_part['us']) + on_name=online_trace_part["name"], + dur=offline_trace_part["us"], + ) return False - if self._state_is_too_long(online_trace_part, offline_trace_part, state_duration, online_prev_transition): - processed_data['error'] = 'Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too long (duration = {dur:d} us)'.format( - off_idx=offline_idx, on_idx=online_run_idx, + if self._state_is_too_long( + online_trace_part, + offline_trace_part, + state_duration, + online_prev_transition, + ): + processed_data[ + "error" + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) is too long (duration = {dur:d} us)".format( + off_idx=offline_idx, + on_idx=online_run_idx, on_sub=online_trace_part_idx, - on_name=online_trace_part['name'], - dur=offline_trace_part['us']) + on_name=online_trace_part["name"], + dur=offline_trace_part["us"], + ) return False except KeyError: pass @@ -775,136 +861,169 @@ class RawData: # (appends data from measurement['energy_trace']) # If measurement['expected_trace'] exists, it is edited in place instead online_datapoints = [] - if 'expected_trace' in measurement: - traces = measurement['expected_trace'] - traces = self.traces_by_fileno[measurement['fileno']] + if "expected_trace" in measurement: + traces = measurement["expected_trace"] + traces = self.traces_by_fileno[measurement["fileno"]] else: - traces = self.traces_by_fileno[measurement['fileno']] + traces = self.traces_by_fileno[measurement["fileno"]] for run_idx, run in enumerate(traces): - for trace_part_idx in range(len(run['trace'])): + for trace_part_idx in range(len(run["trace"])): online_datapoints.append((run_idx, trace_part_idx)) for offline_idx, online_ref in enumerate(online_datapoints): online_run_idx, online_trace_part_idx = online_ref - offline_trace_part = measurement['energy_trace'][offline_idx] - online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx] + offline_trace_part = measurement["energy_trace"][offline_idx] + online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx] - if 'offline' not in online_trace_part: - online_trace_part['offline'] = [offline_trace_part] + if "offline" not in online_trace_part: + online_trace_part["offline"] = [offline_trace_part] else: - online_trace_part['offline'].append(offline_trace_part) + online_trace_part["offline"].append(offline_trace_part) - paramkeys = sorted(online_trace_part['parameter'].keys()) + paramkeys = sorted(online_trace_part["parameter"].keys()) paramvalues = list() for paramkey in paramkeys: - if type(online_trace_part['parameter'][paramkey]) is list: - paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey][measurement['repeat_id']])) + if type(online_trace_part["parameter"][paramkey]) is list: + paramvalues.append( + soft_cast_int( + online_trace_part["parameter"][paramkey][ + measurement["repeat_id"] + ] + ) + ) else: - paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey])) + paramvalues.append( + soft_cast_int(online_trace_part["parameter"][paramkey]) + ) # NB: Unscheduled transitions do not have an 'args' field set. # However, they should only be caused by interrupts, and # interrupts don't have args anyways. - if arg_support_enabled and 'args' in online_trace_part: - paramvalues.extend(map(soft_cast_int, online_trace_part['args'])) - - if 'offline_aggregates' not in online_trace_part: - online_trace_part['offline_attributes'] = ['power', 'duration', 'energy'] - online_trace_part['offline_aggregates'] = { - 'power': [], - 'duration': [], - 'power_std': [], - 'energy': [], - 'paramkeys': [], - 'param': [], + if arg_support_enabled and "args" in online_trace_part: + paramvalues.extend(map(soft_cast_int, online_trace_part["args"])) + + if "offline_aggregates" not in online_trace_part: + online_trace_part["offline_attributes"] = [ + "power", + "duration", + "energy", + ] + online_trace_part["offline_aggregates"] = { + "power": [], + "duration": [], + "power_std": [], + "energy": [], + "paramkeys": [], + "param": [], } - if online_trace_part['isa'] == 'transition': - online_trace_part['offline_attributes'].extend(['rel_energy_prev', 'rel_energy_next', 'timeout']) - online_trace_part['offline_aggregates']['rel_energy_prev'] = [] - online_trace_part['offline_aggregates']['rel_energy_next'] = [] - online_trace_part['offline_aggregates']['timeout'] = [] + if online_trace_part["isa"] == "transition": + online_trace_part["offline_attributes"].extend( + ["rel_energy_prev", "rel_energy_next", "timeout"] + ) + online_trace_part["offline_aggregates"]["rel_energy_prev"] = [] + online_trace_part["offline_aggregates"]["rel_energy_next"] = [] + online_trace_part["offline_aggregates"]["timeout"] = [] # Note: All state/transitions are 20us "too long" due to injected # active wait states. These are needed to work around MIMOSA's # relatively low sample rate of 100 kHz (10us) and removed here. - online_trace_part['offline_aggregates']['power'].append( - offline_trace_part['uW_mean']) - online_trace_part['offline_aggregates']['duration'].append( - offline_trace_part['us'] - 20) - online_trace_part['offline_aggregates']['power_std'].append( - offline_trace_part['uW_std']) - online_trace_part['offline_aggregates']['energy'].append( - offline_trace_part['uW_mean'] * (offline_trace_part['us'] - 20)) - online_trace_part['offline_aggregates']['paramkeys'].append(paramkeys) - online_trace_part['offline_aggregates']['param'].append(paramvalues) - if online_trace_part['isa'] == 'transition': - online_trace_part['offline_aggregates']['rel_energy_prev'].append( - offline_trace_part['uW_mean_delta_prev'] * (offline_trace_part['us'] - 20)) - online_trace_part['offline_aggregates']['rel_energy_next'].append( - offline_trace_part['uW_mean_delta_next'] * (offline_trace_part['us'] - 20)) - online_trace_part['offline_aggregates']['timeout'].append( - offline_trace_part['timeout']) + online_trace_part["offline_aggregates"]["power"].append( + offline_trace_part["uW_mean"] + ) + online_trace_part["offline_aggregates"]["duration"].append( + offline_trace_part["us"] - 20 + ) + online_trace_part["offline_aggregates"]["power_std"].append( + offline_trace_part["uW_std"] + ) + online_trace_part["offline_aggregates"]["energy"].append( + offline_trace_part["uW_mean"] * (offline_trace_part["us"] - 20) + ) + online_trace_part["offline_aggregates"]["paramkeys"].append(paramkeys) + online_trace_part["offline_aggregates"]["param"].append(paramvalues) + if online_trace_part["isa"] == "transition": + online_trace_part["offline_aggregates"]["rel_energy_prev"].append( + offline_trace_part["uW_mean_delta_prev"] + * (offline_trace_part["us"] - 20) + ) + online_trace_part["offline_aggregates"]["rel_energy_next"].append( + offline_trace_part["uW_mean_delta_next"] + * (offline_trace_part["us"] - 20) + ) + online_trace_part["offline_aggregates"]["timeout"].append( + offline_trace_part["timeout"] + ) def _merge_online_and_etlog(self, measurement): # Edits self.traces_by_fileno[measurement['fileno']][*]['trace'][*]['offline'] # and self.traces_by_fileno[measurement['fileno']][*]['trace'][*]['offline_aggregates'] in place # (appends data from measurement['energy_trace']) online_datapoints = [] - traces = self.traces_by_fileno[measurement['fileno']] + traces = self.traces_by_fileno[measurement["fileno"]] for run_idx, run in enumerate(traces): - for trace_part_idx in range(len(run['trace'])): + for trace_part_idx in range(len(run["trace"])): online_datapoints.append((run_idx, trace_part_idx)) for offline_idx, online_ref in enumerate(online_datapoints): online_run_idx, online_trace_part_idx = online_ref - offline_trace_part = measurement['energy_trace'][offline_idx] - online_trace_part = traces[online_run_idx]['trace'][online_trace_part_idx] + offline_trace_part = measurement["energy_trace"][offline_idx] + online_trace_part = traces[online_run_idx]["trace"][online_trace_part_idx] - if 'offline' not in online_trace_part: - online_trace_part['offline'] = [offline_trace_part] + if "offline" not in online_trace_part: + online_trace_part["offline"] = [offline_trace_part] else: - online_trace_part['offline'].append(offline_trace_part) + online_trace_part["offline"].append(offline_trace_part) - paramkeys = sorted(online_trace_part['parameter'].keys()) + paramkeys = sorted(online_trace_part["parameter"].keys()) paramvalues = list() for paramkey in paramkeys: - if type(online_trace_part['parameter'][paramkey]) is list: - paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey][measurement['repeat_id']])) + if type(online_trace_part["parameter"][paramkey]) is list: + paramvalues.append( + soft_cast_int( + online_trace_part["parameter"][paramkey][ + measurement["repeat_id"] + ] + ) + ) else: - paramvalues.append(soft_cast_int(online_trace_part['parameter'][paramkey])) + paramvalues.append( + soft_cast_int(online_trace_part["parameter"][paramkey]) + ) # NB: Unscheduled transitions do not have an 'args' field set. # However, they should only be caused by interrupts, and # interrupts don't have args anyways. - if arg_support_enabled and 'args' in online_trace_part: - paramvalues.extend(map(soft_cast_int, online_trace_part['args'])) - - if 'offline_aggregates' not in online_trace_part: - online_trace_part['offline_aggregates'] = { - 'offline_attributes': ['power', 'duration', 'energy'], - 'duration': list(), - 'power': list(), - 'power_std': list(), - 'energy': list(), - 'paramkeys': list(), - 'param': list() + if arg_support_enabled and "args" in online_trace_part: + paramvalues.extend(map(soft_cast_int, online_trace_part["args"])) + + if "offline_aggregates" not in online_trace_part: + online_trace_part["offline_aggregates"] = { + "offline_attributes": ["power", "duration", "energy"], + "duration": list(), + "power": list(), + "power_std": list(), + "energy": list(), + "paramkeys": list(), + "param": list(), } - offline_aggregates = online_trace_part['offline_aggregates'] + offline_aggregates = online_trace_part["offline_aggregates"] # if online_trace_part['isa'] == 'transitions': # online_trace_part['offline_attributes'].extend(['rel_energy_prev', 'rel_energy_next']) # offline_aggregates['rel_energy_prev'] = list() # offline_aggregates['rel_energy_next'] = list() - offline_aggregates['duration'].append(offline_trace_part['s'] * 1e6) - offline_aggregates['power'].append(offline_trace_part['W_mean'] * 1e6) - offline_aggregates['power_std'].append(offline_trace_part['W_std'] * 1e6) - offline_aggregates['energy'].append(offline_trace_part['W_mean'] * offline_trace_part['s'] * 1e12) - offline_aggregates['paramkeys'].append(paramkeys) - offline_aggregates['param'].append(paramvalues) + offline_aggregates["duration"].append(offline_trace_part["s"] * 1e6) + offline_aggregates["power"].append(offline_trace_part["W_mean"] * 1e6) + offline_aggregates["power_std"].append(offline_trace_part["W_std"] * 1e6) + offline_aggregates["energy"].append( + offline_trace_part["W_mean"] * offline_trace_part["s"] * 1e12 + ) + offline_aggregates["paramkeys"].append(paramkeys) + offline_aggregates["param"].append(paramvalues) # if online_trace_part['isa'] == 'transition': # offline_aggregates['rel_energy_prev'].append(offline_trace_part['W_mean_delta_prev'] * offline_trace_part['s'] * 1e12) @@ -922,8 +1041,8 @@ class RawData: for trace in list_of_traces: trace_output.extend(trace.copy()) for i, trace in enumerate(trace_output): - trace['orig_id'] = trace['id'] - trace['id'] = i + trace["orig_id"] = trace["id"] + trace["id"] = i return trace_output def get_preprocessed_data(self, verbose=True): @@ -1000,25 +1119,29 @@ class RawData: if version == 0: with tarfile.open(filename) as tf: - self.setup_by_fileno.append(json.load(tf.extractfile('setup.json'))) - self.traces_by_fileno.append(json.load(tf.extractfile('src/apps/DriverEval/DriverLog.json'))) + self.setup_by_fileno.append(json.load(tf.extractfile("setup.json"))) + self.traces_by_fileno.append( + json.load(tf.extractfile("src/apps/DriverEval/DriverLog.json")) + ) for member in tf.getmembers(): _, extension = os.path.splitext(member.name) - if extension == '.mim': - offline_data.append({ - 'content': tf.extractfile(member).read(), - 'fileno': i, - 'info': member, - 'setup': self.setup_by_fileno[i], - 'with_traces': self.with_traces, - }) + if extension == ".mim": + offline_data.append( + { + "content": tf.extractfile(member).read(), + "fileno": i, + "info": member, + "setup": self.setup_by_fileno[i], + "with_traces": self.with_traces, + } + ) elif version == 1: new_filenames = list() with tarfile.open(filename) as tf: - ptalog = json.load(tf.extractfile(tf.getmember('ptalog.json'))) - self.pta = ptalog['pta'] + ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json"))) + self.pta = ptalog["pta"] # Benchmark code may be too large to be executed in a single # run, so benchmarks (a benchmark is basically a list of DFA runs) @@ -1043,33 +1166,37 @@ class RawData: # ptalog['files'][0][0] is its first iteration/repetition, # ptalog['files'][0][1] the second, etc. - for j, traces in enumerate(ptalog['traces']): - new_filenames.append('{}#{}'.format(filename, j)) + for j, traces in enumerate(ptalog["traces"]): + new_filenames.append("{}#{}".format(filename, j)) self.traces_by_fileno.append(traces) - self.setup_by_fileno.append({ - 'mimosa_voltage': ptalog['configs'][j]['voltage'], - 'mimosa_shunt': ptalog['configs'][j]['shunt'], - 'state_duration': ptalog['opt']['sleep'], - }) - for repeat_id, mim_file in enumerate(ptalog['files'][j]): + self.setup_by_fileno.append( + { + "mimosa_voltage": ptalog["configs"][j]["voltage"], + "mimosa_shunt": ptalog["configs"][j]["shunt"], + "state_duration": ptalog["opt"]["sleep"], + } + ) + for repeat_id, mim_file in enumerate(ptalog["files"][j]): member = tf.getmember(mim_file) - offline_data.append({ - 'content': tf.extractfile(member).read(), - 'fileno': j, - 'info': member, - 'setup': self.setup_by_fileno[j], - 'repeat_id': repeat_id, - 'expected_trace': ptalog['traces'][j], - 'with_traces': self.with_traces, - }) + offline_data.append( + { + "content": tf.extractfile(member).read(), + "fileno": j, + "info": member, + "setup": self.setup_by_fileno[j], + "repeat_id": repeat_id, + "expected_trace": ptalog["traces"][j], + "with_traces": self.with_traces, + } + ) self.filenames = new_filenames elif version == 2: new_filenames = list() with tarfile.open(filename) as tf: - ptalog = json.load(tf.extractfile(tf.getmember('ptalog.json'))) - self.pta = ptalog['pta'] + ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json"))) + self.pta = ptalog["pta"] # Benchmark code may be too large to be executed in a single # run, so benchmarks (a benchmark is basically a list of DFA runs) @@ -1103,32 +1230,45 @@ class RawData: # to an invalid measurement and thus power[b] corresponding # to duration[C]. At the moment, this is harmless, but in the # future it might not be. - if 'offline_aggregates' in ptalog['traces'][0][0]['trace'][0]: - for trace_group in ptalog['traces']: + if "offline_aggregates" in ptalog["traces"][0][0]["trace"][0]: + for trace_group in ptalog["traces"]: for trace in trace_group: - for state_or_transition in trace['trace']: - offline_aggregates = state_or_transition.pop('offline_aggregates', None) + for state_or_transition in trace["trace"]: + offline_aggregates = state_or_transition.pop( + "offline_aggregates", None + ) if offline_aggregates: - state_or_transition['online_aggregates'] = offline_aggregates + state_or_transition[ + "online_aggregates" + ] = offline_aggregates - for j, traces in enumerate(ptalog['traces']): - new_filenames.append('{}#{}'.format(filename, j)) + for j, traces in enumerate(ptalog["traces"]): + new_filenames.append("{}#{}".format(filename, j)) self.traces_by_fileno.append(traces) - self.setup_by_fileno.append({ - 'voltage': ptalog['configs'][j]['voltage'], - 'state_duration': ptalog['opt']['sleep'], - }) - for repeat_id, etlog_file in enumerate(ptalog['files'][j]): + self.setup_by_fileno.append( + { + "voltage": ptalog["configs"][j]["voltage"], + "state_duration": ptalog["opt"]["sleep"], + } + ) + for repeat_id, etlog_file in enumerate(ptalog["files"][j]): member = tf.getmember(etlog_file) - offline_data.append({ - 'content': tf.extractfile(member).read(), - 'fileno': j, - 'info': member, - 'setup': self.setup_by_fileno[j], - 'repeat_id': repeat_id, - 'expected_trace': ptalog['traces'][j], - 'transition_names': list(map(lambda x: x['name'], ptalog['pta']['transitions'])) - }) + offline_data.append( + { + "content": tf.extractfile(member).read(), + "fileno": j, + "info": member, + "setup": self.setup_by_fileno[j], + "repeat_id": repeat_id, + "expected_trace": ptalog["traces"][j], + "transition_names": list( + map( + lambda x: x["name"], + ptalog["pta"]["transitions"], + ) + ), + } + ) self.filenames = new_filenames # TODO remove 'offline_aggregates' from pre-parse data and place # it under 'online_aggregates' or similar instead. This way, if @@ -1145,52 +1285,69 @@ class RawData: num_valid = 0 for measurement in measurements: - if 'energy_trace' not in measurement: - vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format( - ar=self.filenames[measurement['fileno']], - m=measurement['info'].name, - e='; '.join(measurement['datasource_errors']))) + if "energy_trace" not in measurement: + vprint( + self.verbose, + "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + ar=self.filenames[measurement["fileno"]], + m=measurement["info"].name, + e="; ".join(measurement["datasource_errors"]), + ), + ) continue if version == 0: # Strip the last state (it is not part of the scheduled measurement) - measurement['energy_trace'].pop() + measurement["energy_trace"].pop() elif version == 1: # The first online measurement is the UNINITIALIZED state. In v1, # it is not part of the expected PTA trace -> remove it. - measurement['energy_trace'].pop(0) + measurement["energy_trace"].pop(0) if version == 0 or version == 1: if self._measurement_is_valid_01(measurement): self._merge_online_and_offline(measurement) num_valid += 1 else: - vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format( - ar=self.filenames[measurement['fileno']], - m=measurement['info'].name, - e=measurement['error'])) + vprint( + self.verbose, + "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + ar=self.filenames[measurement["fileno"]], + m=measurement["info"].name, + e=measurement["error"], + ), + ) elif version == 2: if self._measurement_is_valid_2(measurement): self._merge_online_and_etlog(measurement) num_valid += 1 else: - vprint(self.verbose, '[W] Skipping {ar:s}/{m:s}: {e:s}'.format( - ar=self.filenames[measurement['fileno']], - m=measurement['info'].name, - e=measurement['error'])) - vprint(self.verbose, '[I] {num_valid:d}/{num_total:d} measurements are valid'.format( - num_valid=num_valid, - num_total=len(measurements))) + vprint( + self.verbose, + "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + ar=self.filenames[measurement["fileno"]], + m=measurement["info"].name, + e=measurement["error"], + ), + ) + vprint( + self.verbose, + "[I] {num_valid:d}/{num_total:d} measurements are valid".format( + num_valid=num_valid, num_total=len(measurements) + ), + ) if version == 0: self.traces = self._concatenate_traces(self.traces_by_fileno) elif version == 1: - self.traces = self._concatenate_traces(map(lambda x: x['expected_trace'], measurements)) + self.traces = self._concatenate_traces( + map(lambda x: x["expected_trace"], measurements) + ) self.traces = self._concatenate_traces(self.traces_by_fileno) elif version == 2: self.traces = self._concatenate_traces(self.traces_by_fileno) self.preprocessing_stats = { - 'num_runs': len(measurements), - 'num_valid': num_valid + "num_runs": len(measurements), + "num_valid": num_valid, } @@ -1207,16 +1364,33 @@ class ParallelParamFit: self.fit_queue = [] self.by_param = by_param - def enqueue(self, state_or_tran, attribute, param_index, param_name, safe_functions_enabled=False, param_filter=None): + def enqueue( + self, + state_or_tran, + attribute, + param_index, + param_name, + safe_functions_enabled=False, + param_filter=None, + ): """ Add state_or_tran/attribute/param_name to fit queue. This causes fit() to compute the best-fitting function for this model part. """ - self.fit_queue.append({ - 'key': [state_or_tran, attribute, param_name, param_filter], - 'args': [self.by_param, state_or_tran, attribute, param_index, safe_functions_enabled, param_filter] - }) + self.fit_queue.append( + { + "key": [state_or_tran, attribute, param_name, param_filter], + "args": [ + self.by_param, + state_or_tran, + attribute, + param_index, + safe_functions_enabled, + param_filter, + ], + } + ) def fit(self): """ @@ -1236,13 +1410,17 @@ def _try_fits_parallel(arg): Must be a global function as it is called from a multiprocessing Pool. """ - return { - 'key': arg['key'], - 'result': _try_fits(*arg['args']) - } + return {"key": arg["key"], "result": _try_fits(*arg["args"])} -def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functions_enabled=False, param_filter: dict = None): +def _try_fits( + by_param, + state_or_tran, + model_attribute, + param_index, + safe_functions_enabled=False, + param_filter: dict = None, +): """ Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions. @@ -1281,22 +1459,28 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi function_names = list(functions.keys()) for function_name in function_names: function_object = functions[function_name] - if is_numeric(param_key[1][param_index]) and not function_object.is_valid(param_key[1][param_index]): + if is_numeric(param_key[1][param_index]) and not function_object.is_valid( + param_key[1][param_index] + ): functions.pop(function_name, None) raw_results = dict() raw_results_by_param = dict() - ref_results = { - 'mean': list(), - 'median': list() - } + ref_results = {"mean": list(), "median": list()} results = dict() results_by_param = dict() seen_parameter_combinations = set() # for each parameter combination: - for param_key in filter(lambda x: x[0] == state_or_tran and remove_index_from_tuple(x[1], param_index) not in seen_parameter_combinations and len(by_param[x]['param']) and match_parameter_values(by_param[x]['param'][0], param_filter), by_param.keys()): + for param_key in filter( + lambda x: x[0] == state_or_tran + and remove_index_from_tuple(x[1], param_index) + not in seen_parameter_combinations + and len(by_param[x]["param"]) + and match_parameter_values(by_param[x]["param"][0], param_filter), + by_param.keys(), + ): X = [] Y = [] num_valid = 0 @@ -1304,10 +1488,14 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1, # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters - seen_parameter_combinations.add(remove_index_from_tuple(param_key[1], param_index)) + seen_parameter_combinations.add( + remove_index_from_tuple(param_key[1], param_index) + ) # for each value of the parameter denoted by param_index (all other parameters remain the same): - for k, v in filter(lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()): + for k, v in filter( + lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items() + ): num_total += 1 if is_numeric(k[1][param_index]): num_valid += 1 @@ -1324,7 +1512,9 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi if function_name not in raw_results: raw_results[function_name] = dict() error_function = param_function.error_function - res = optimize.least_squares(error_function, [0, 1], args=(X, Y), xtol=2e-15) + res = optimize.least_squares( + error_function, [0, 1], args=(X, Y), xtol=2e-15 + ) measures = regression_measures(param_function.eval(res.x, X), Y) raw_results_by_param[other_parameters][function_name] = measures for measure, error_rate in measures.items(): @@ -1333,38 +1523,37 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi raw_results[function_name][measure].append(error_rate) # print(function_name, res, measures) mean_measures = aggregate_measures(np.mean(Y), Y) - ref_results['mean'].append(mean_measures['rmsd']) - raw_results_by_param[other_parameters]['mean'] = mean_measures + ref_results["mean"].append(mean_measures["rmsd"]) + raw_results_by_param[other_parameters]["mean"] = mean_measures median_measures = aggregate_measures(np.median(Y), Y) - ref_results['median'].append(median_measures['rmsd']) - raw_results_by_param[other_parameters]['median'] = median_measures + ref_results["median"].append(median_measures["rmsd"]) + raw_results_by_param[other_parameters]["median"] = median_measures - if not len(ref_results['mean']): + if not len(ref_results["mean"]): # Insufficient data for fitting # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index)) - return { - 'best': None, - 'best_rmsd': np.inf, - 'results': results - } + return {"best": None, "best_rmsd": np.inf, "results": results} - for other_parameter_combination, other_parameter_results in raw_results_by_param.items(): + for ( + other_parameter_combination, + other_parameter_results, + ) in raw_results_by_param.items(): best_fit_val = np.inf best_fit_name = None results = dict() for function_name, result in other_parameter_results.items(): if len(result) > 0: results[function_name] = result - rmsd = result['rmsd'] + rmsd = result["rmsd"] if rmsd < best_fit_val: best_fit_val = rmsd best_fit_name = function_name results_by_param[other_parameter_combination] = { - 'best': best_fit_name, - 'best_rmsd': best_fit_val, - 'mean_rmsd': results['mean']['rmsd'], - 'median_rmsd': results['median']['rmsd'], - 'results': results + "best": best_fit_name, + "best_rmsd": best_fit_val, + "mean_rmsd": results["mean"]["rmsd"], + "median_rmsd": results["median"]["rmsd"], + "results": results, } best_fit_val = np.inf @@ -1375,26 +1564,26 @@ def _try_fits(by_param, state_or_tran, model_attribute, param_index, safe_functi results[function_name] = {} for measure in result.keys(): results[function_name][measure] = np.mean(result[measure]) - rmsd = results[function_name]['rmsd'] + rmsd = results[function_name]["rmsd"] if rmsd < best_fit_val: best_fit_val = rmsd best_fit_name = function_name return { - 'best': best_fit_name, - 'best_rmsd': best_fit_val, - 'mean_rmsd': np.mean(ref_results['mean']), - 'median_rmsd': np.mean(ref_results['median']), - 'results': results, - 'results_by_other_param': results_by_param + "best": best_fit_name, + "best_rmsd": best_fit_val, + "mean_rmsd": np.mean(ref_results["mean"]), + "median_rmsd": np.mean(ref_results["median"]), + "results": results, + "results_by_other_param": results_by_param, } def _num_args_from_by_name(by_name): num_args = dict() for key, value in by_name.items(): - if 'args' in value: - num_args[key] = len(value['args'][0]) + if "args" in value: + num_args[key] = len(value["args"][0]) return num_args @@ -1413,19 +1602,44 @@ def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = """ fit_result = dict() for result in results: - if result['key'][0] == name and result['key'][1] == attribute and result['key'][3] == param_filter and result['result']['best'] is not None: # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? - this_result = result['result'] - if this_result['best_rmsd'] >= min(this_result['mean_rmsd'], this_result['median_rmsd']): - vprint(verbose, '[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})'.format( - name, attribute, result['key'][2], this_result['best_rmsd'], - this_result['mean_rmsd'], this_result['median_rmsd'])) + if ( + result["key"][0] == name + and result["key"][1] == attribute + and result["key"][3] == param_filter + and result["result"]["best"] is not None + ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? + this_result = result["result"] + if this_result["best_rmsd"] >= min( + this_result["mean_rmsd"], this_result["median_rmsd"] + ): + vprint( + verbose, + "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( + name, + attribute, + result["key"][2], + this_result["best_rmsd"], + this_result["mean_rmsd"], + this_result["median_rmsd"], + ), + ) # See notes on depends_on_param - elif this_result['best_rmsd'] >= 0.8 * min(this_result['mean_rmsd'], this_result['median_rmsd']): - vprint(verbose, '[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})'.format( - name, attribute, result['key'][2], this_result['best_rmsd'], - this_result['mean_rmsd'], this_result['median_rmsd'])) + elif this_result["best_rmsd"] >= 0.8 * min( + this_result["mean_rmsd"], this_result["median_rmsd"] + ): + vprint( + verbose, + "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( + name, + attribute, + result["key"][2], + this_result["best_rmsd"], + this_result["mean_rmsd"], + this_result["median_rmsd"], + ), + ) else: - fit_result[result['key'][2]] = this_result + fit_result[result["key"][2]] = this_result return fit_result @@ -1471,7 +1685,15 @@ class AnalyticModel: assess -- calculate model quality """ - def __init__(self, by_name, parameters, arg_count=None, function_override=dict(), verbose=True, use_corrcoef=False): + def __init__( + self, + by_name, + parameters, + arg_count=None, + function_override=dict(), + verbose=True, + use_corrcoef=False, + ): """ Create a new AnalyticModel and compute parameter statistics. @@ -1521,19 +1743,29 @@ class AnalyticModel: if self._num_args is None: self._num_args = _num_args_from_by_name(by_name) - self.stats = ParamStats(self.by_name, self.by_param, self.parameters, self._num_args, verbose=verbose, use_corrcoef=use_corrcoef) + self.stats = ParamStats( + self.by_name, + self.by_param, + self.parameters, + self._num_args, + verbose=verbose, + use_corrcoef=use_corrcoef, + ) def _get_model_from_dict(self, model_dict, model_function): model = {} for name, elem in model_dict.items(): model[name] = {} - for key in elem['attributes']: + for key in elem["attributes"]: try: model[name][key] = model_function(elem[key]) except RuntimeWarning: - vprint(self.verbose, '[W] Got no data for {} {}'.format(name, key)) + vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) except FloatingPointError as fpe: - vprint(self.verbose, '[W] Got no data for {} {}: {}'.format(name, key, fpe)) + vprint( + self.verbose, + "[W] Got no data for {} {}: {}".format(name, key, fpe), + ) return model def param_index(self, param_name): @@ -1596,22 +1828,28 @@ class AnalyticModel: model_function(name, attribute, param=parameter values) -> model value. model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None """ - if 'fitted_model_getter' in self.cache and 'fitted_info_getter' in self.cache: - return self.cache['fitted_model_getter'], self.cache['fitted_info_getter'] + if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: + return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] static_model = self._get_model_from_dict(self.by_name, np.median) param_model = dict([[name, {}] for name in self.by_name.keys()]) paramfit = ParallelParamFit(self.by_param) for name in self.by_name.keys(): - for attribute in self.by_name[name]['attributes']: + for attribute in self.by_name[name]["attributes"]: for param_index, param in enumerate(self.parameters): if self.stats.depends_on_param(name, attribute, param): paramfit.enqueue(name, attribute, param_index, param, False) if arg_support_enabled and name in self._num_args: for arg_index in range(self._num_args[name]): if self.stats.depends_on_arg(name, attribute, arg_index): - paramfit.enqueue(name, attribute, len(self.parameters) + arg_index, arg_index, False) + paramfit.enqueue( + name, + attribute, + len(self.parameters) + arg_index, + arg_index, + False, + ) paramfit.fit() @@ -1619,8 +1857,10 @@ class AnalyticModel: num_args = 0 if name in self._num_args: num_args = self._num_args[name] - for attribute in self.by_name[name]['attributes']: - fit_result = get_fit_result(paramfit.results, name, attribute, self.verbose) + for attribute in self.by_name[name]["attributes"]: + fit_result = get_fit_result( + paramfit.results, name, attribute, self.verbose + ) if (name, attribute) in self.function_override: function_str = self.function_override[(name, attribute)] @@ -1628,25 +1868,27 @@ class AnalyticModel: x.fit(self.by_param, name, attribute) if x.fit_success: param_model[name][attribute] = { - 'fit_result': fit_result, - 'function': x + "fit_result": fit_result, + "function": x, } elif len(fit_result.keys()): - x = analytic.function_powerset(fit_result, self.parameters, num_args) + x = analytic.function_powerset( + fit_result, self.parameters, num_args + ) x.fit(self.by_param, name, attribute) if x.fit_success: param_model[name][attribute] = { - 'fit_result': fit_result, - 'function': x + "fit_result": fit_result, + "function": x, } def model_getter(name, key, **kwargs): - if 'arg' in kwargs and 'param' in kwargs: - kwargs['param'].extend(map(soft_cast_int, kwargs['arg'])) + if "arg" in kwargs and "param" in kwargs: + kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) if key in param_model[name]: - param_list = kwargs['param'] - param_function = param_model[name][key]['function'] + param_list = kwargs["param"] + param_function = param_model[name][key]["function"] if param_function.is_predictable(param_list): return param_function.eval(param_list) return static_model[name][key] @@ -1656,8 +1898,8 @@ class AnalyticModel: return param_model[name][key] return None - self.cache['fitted_model_getter'] = model_getter - self.cache['fitted_info_getter'] = info_getter + self.cache["fitted_model_getter"] = model_getter + self.cache["fitted_info_getter"] = info_getter return model_getter, info_getter @@ -1677,13 +1919,22 @@ class AnalyticModel: detailed_results = {} for name, elem in sorted(self.by_name.items()): detailed_results[name] = {} - for attribute in elem['attributes']: - predicted_data = np.array(list(map(lambda i: model_function(name, attribute, param=elem['param'][i]), range(len(elem[attribute]))))) + for attribute in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function( + name, attribute, param=elem["param"][i] + ), + range(len(elem[attribute])), + ) + ) + ) measures = regression_measures(predicted_data, elem[attribute]) detailed_results[name][attribute] = measures return { - 'by_name': detailed_results, + "by_name": detailed_results, } def to_json(self): @@ -1695,25 +1946,28 @@ def _add_trace_data_to_aggregate(aggregate, key, element): # Only cares about element['isa'], element['offline_aggregates'], and # element['plan']['level'] if key not in aggregate: - aggregate[key] = { - 'isa': element['isa'] - } - for datakey in element['offline_aggregates'].keys(): + aggregate[key] = {"isa": element["isa"]} + for datakey in element["offline_aggregates"].keys(): aggregate[key][datakey] = [] - if element['isa'] == 'state': - aggregate[key]['attributes'] = ['power'] + if element["isa"] == "state": + aggregate[key]["attributes"] = ["power"] else: # TODO do not hardcode values - aggregate[key]['attributes'] = ['duration', 'energy', 'rel_energy_prev', 'rel_energy_next'] + aggregate[key]["attributes"] = [ + "duration", + "energy", + "rel_energy_prev", + "rel_energy_next", + ] # Uncomment this line if you also want to analyze mean transition power # aggrgate[key]['attributes'].append('power') - if 'plan' in element and element['plan']['level'] == 'epilogue': - aggregate[key]['attributes'].insert(0, 'timeout') - attributes = aggregate[key]['attributes'].copy() + if "plan" in element and element["plan"]["level"] == "epilogue": + aggregate[key]["attributes"].insert(0, "timeout") + attributes = aggregate[key]["attributes"].copy() for attribute in attributes: - if attribute not in element['offline_aggregates']: - aggregate[key]['attributes'].remove(attribute) - for datakey, dataval in element['offline_aggregates'].items(): + if attribute not in element["offline_aggregates"]: + aggregate[key]["attributes"].remove(attribute) + for datakey, dataval in element["offline_aggregates"].items(): aggregate[key][datakey].extend(dataval) @@ -1771,16 +2025,20 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]): """ arg_count = dict() by_name = dict() - parameter_names = sorted(traces[0]['trace'][0]['parameter'].keys()) + parameter_names = sorted(traces[0]["trace"][0]["parameter"].keys()) for run in traces: - if run['id'] not in ignore_trace_indexes: - for elem in run['trace']: - if elem['isa'] == 'transition' and not elem['name'] in arg_count and 'args' in elem: - arg_count[elem['name']] = len(elem['args']) - if elem['name'] != 'UNINITIALIZED': - _add_trace_data_to_aggregate(by_name, elem['name'], elem) + if run["id"] not in ignore_trace_indexes: + for elem in run["trace"]: + if ( + elem["isa"] == "transition" + and not elem["name"] in arg_count + and "args" in elem + ): + arg_count[elem["name"]] = len(elem["args"]) + if elem["name"] != "UNINITIALIZED": + _add_trace_data_to_aggregate(by_name, elem["name"], elem) for elem in by_name.values(): - for key in elem['attributes']: + for key in elem["attributes"]: elem[key] = np.array(elem[key]) return by_name, parameter_names, arg_count @@ -1817,7 +2075,19 @@ class PTAModel: - rel_energy_next: transition energy relative to next state mean power in pJ """ - def __init__(self, by_name, parameters, arg_count, traces=[], ignore_trace_indexes=[], discard_outliers=None, function_override={}, verbose=True, use_corrcoef=False, pta=None): + def __init__( + self, + by_name, + parameters, + arg_count, + traces=[], + ignore_trace_indexes=[], + discard_outliers=None, + function_override={}, + verbose=True, + use_corrcoef=False, + pta=None, + ): """ Prepare a new PTA energy model. @@ -1854,9 +2124,16 @@ class PTAModel: self._num_args = arg_count self._use_corrcoef = use_corrcoef self.traces = traces - self.stats = ParamStats(self.by_name, self.by_param, self._parameter_names, self._num_args, self._use_corrcoef, verbose=verbose) + self.stats = ParamStats( + self.by_name, + self.by_param, + self._parameter_names, + self._num_args, + self._use_corrcoef, + verbose=verbose, + ) self.cache = {} - np.seterr('raise') + np.seterr("raise") self._outlier_threshold = discard_outliers self.function_override = function_override.copy() self.verbose = verbose @@ -1866,7 +2143,7 @@ class PTAModel: def _aggregate_to_ndarray(self, aggregate): for elem in aggregate.values(): - for key in elem['attributes']: + for key in elem["attributes"]: elem[key] = np.array(elem[key]) # This heuristic is very similar to the "function is not much better than @@ -1884,13 +2161,16 @@ class PTAModel: model = {} for name, elem in model_dict.items(): model[name] = {} - for key in elem['attributes']: + for key in elem["attributes"]: try: model[name][key] = model_function(elem[key]) except RuntimeWarning: - vprint(self.verbose, '[W] Got no data for {} {}'.format(name, key)) + vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) except FloatingPointError as fpe: - vprint(self.verbose, '[W] Got no data for {} {}: {}'.format(name, key, fpe)) + vprint( + self.verbose, + "[W] Got no data for {} {}: {}".format(name, key, fpe), + ) return model def get_static(self, use_mean=False): @@ -1953,63 +2233,110 @@ class PTAModel: model_function(name, attribute, param=parameter values) -> model value. model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None """ - if 'fitted_model_getter' in self.cache and 'fitted_info_getter' in self.cache: - return self.cache['fitted_model_getter'], self.cache['fitted_info_getter'] + if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: + return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] static_model = self._get_model_from_dict(self.by_name, np.median) - param_model = dict([[state_or_tran, {}] for state_or_tran in self.by_name.keys()]) + param_model = dict( + [[state_or_tran, {}] for state_or_tran in self.by_name.keys()] + ) paramfit = ParallelParamFit(self.by_param) for state_or_tran in self.by_name.keys(): - for model_attribute in self.by_name[state_or_tran]['attributes']: + for model_attribute in self.by_name[state_or_tran]["attributes"]: fit_results = {} for parameter_index, parameter_name in enumerate(self._parameter_names): - if self.depends_on_param(state_or_tran, model_attribute, parameter_name): - paramfit.enqueue(state_or_tran, model_attribute, parameter_index, parameter_name, safe_functions_enabled) - for codependent_param_dict in self.stats.codependent_parameter_value_dicts(state_or_tran, model_attribute, parameter_name): - paramfit.enqueue(state_or_tran, model_attribute, parameter_index, parameter_name, safe_functions_enabled, codependent_param_dict) - if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition': + if self.depends_on_param( + state_or_tran, model_attribute, parameter_name + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + parameter_index, + parameter_name, + safe_functions_enabled, + ) + for ( + codependent_param_dict + ) in self.stats.codependent_parameter_value_dicts( + state_or_tran, model_attribute, parameter_name + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + parameter_index, + parameter_name, + safe_functions_enabled, + codependent_param_dict, + ) + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): for arg_index in range(self._num_args[state_or_tran]): - if self.depends_on_arg(state_or_tran, model_attribute, arg_index): - paramfit.enqueue(state_or_tran, model_attribute, len(self._parameter_names) + arg_index, arg_index, safe_functions_enabled) + if self.depends_on_arg( + state_or_tran, model_attribute, arg_index + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + len(self._parameter_names) + arg_index, + arg_index, + safe_functions_enabled, + ) paramfit.fit() for state_or_tran in self.by_name.keys(): num_args = 0 - if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition': + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): num_args = self._num_args[state_or_tran] - for model_attribute in self.by_name[state_or_tran]['attributes']: - fit_results = get_fit_result(paramfit.results, state_or_tran, model_attribute, self.verbose) + for model_attribute in self.by_name[state_or_tran]["attributes"]: + fit_results = get_fit_result( + paramfit.results, state_or_tran, model_attribute, self.verbose + ) for parameter_name in self._parameter_names: - if self.depends_on_param(state_or_tran, model_attribute, parameter_name): - for codependent_param_dict in self.stats.codependent_parameter_value_dicts(state_or_tran, model_attribute, parameter_name): + if self.depends_on_param( + state_or_tran, model_attribute, parameter_name + ): + for ( + codependent_param_dict + ) in self.stats.codependent_parameter_value_dicts( + state_or_tran, model_attribute, parameter_name + ): pass # FIXME get_fit_result hat ja gar keinen Parameter als Argument... if (state_or_tran, model_attribute) in self.function_override: - function_str = self.function_override[(state_or_tran, model_attribute)] + function_str = self.function_override[ + (state_or_tran, model_attribute) + ] x = AnalyticFunction(function_str, self._parameter_names, num_args) x.fit(self.by_param, state_or_tran, model_attribute) if x.fit_success: param_model[state_or_tran][model_attribute] = { - 'fit_result': fit_results, - 'function': x + "fit_result": fit_results, + "function": x, } elif len(fit_results.keys()): - x = analytic.function_powerset(fit_results, self._parameter_names, num_args) + x = analytic.function_powerset( + fit_results, self._parameter_names, num_args + ) x.fit(self.by_param, state_or_tran, model_attribute) if x.fit_success: param_model[state_or_tran][model_attribute] = { - 'fit_result': fit_results, - 'function': x + "fit_result": fit_results, + "function": x, } def model_getter(name, key, **kwargs): - if 'arg' in kwargs and 'param' in kwargs: - kwargs['param'].extend(map(soft_cast_int, kwargs['arg'])) + if "arg" in kwargs and "param" in kwargs: + kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) if key in param_model[name]: - param_list = kwargs['param'] - param_function = param_model[name][key]['function'] + param_list = kwargs["param"] + param_function = param_model[name][key]["function"] if param_function.is_predictable(param_list): return param_function.eval(param_list) return static_model[name][key] @@ -2019,8 +2346,8 @@ class PTAModel: return param_model[name][key] return None - self.cache['fitted_model_getter'] = model_getter - self.cache['fitted_info_getter'] = info_getter + self.cache["fitted_model_getter"] = model_getter + self.cache["fitted_info_getter"] = info_getter return model_getter, info_getter @@ -2029,16 +2356,32 @@ class PTAModel: static_quality = self.assess(static_model) param_model, param_info = self.get_fitted() analytic_quality = self.assess(param_model) - self.pta.update(static_model, param_info, static_error=static_quality['by_name'], analytic_error=analytic_quality['by_name']) + self.pta.update( + static_model, + param_info, + static_error=static_quality["by_name"], + analytic_error=analytic_quality["by_name"], + ) return self.pta.to_json() def states(self): """Return sorted list of state names.""" - return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'state', self.by_name.keys()))) + return sorted( + list( + filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys()) + ) + ) def transitions(self): """Return sorted list of transition names.""" - return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'transition', self.by_name.keys()))) + return sorted( + list( + filter( + lambda k: self.by_name[k]["isa"] == "transition", + self.by_name.keys(), + ) + ) + ) def states_and_transitions(self): """Return list of states and transition names.""" @@ -2050,7 +2393,7 @@ class PTAModel: return self._parameter_names def attributes(self, state_or_trans): - return self.by_name[state_or_trans]['attributes'] + return self.by_name[state_or_trans]["attributes"] def assess(self, model_function): """ @@ -2068,16 +2411,23 @@ class PTAModel: detailed_results = {} for name, elem in sorted(self.by_name.items()): detailed_results[name] = {} - for key in elem['attributes']: - predicted_data = np.array(list(map(lambda i: model_function(name, key, param=elem['param'][i]), range(len(elem[key]))))) + for key in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function(name, key, param=elem["param"][i]), + range(len(elem[key])), + ) + ) + ) measures = regression_measures(predicted_data, elem[key]) detailed_results[name][key] = measures - return { - 'by_name': detailed_results - } + return {"by_name": detailed_results} - def assess_states(self, model_function, model_attribute='power', distribution: dict = None): + def assess_states( + self, model_function, model_attribute="power", distribution: dict = None + ): """ Calculate overall model error assuming equal distribution of states """ @@ -2089,7 +2439,9 @@ class PTAModel: distribution = dict(map(lambda x: [x, 1 / num_states], self.states())) if not np.isclose(sum(distribution.values()), 1): - raise ValueError('distribution must be a probability distribution with sum 1') + raise ValueError( + "distribution must be a probability distribution with sum 1" + ) # total_value = None # try: @@ -2097,7 +2449,17 @@ class PTAModel: # except KeyError: # pass - total_error = np.sqrt(sum(map(lambda x: np.square(model_quality['by_name'][x][model_attribute]['mae'] * distribution[x]), self.states()))) + total_error = np.sqrt( + sum( + map( + lambda x: np.square( + model_quality["by_name"][x][model_attribute]["mae"] + * distribution[x] + ), + self.states(), + ) + ) + ) return total_error def assess_on_traces(self, model_function): @@ -2118,44 +2480,72 @@ class PTAModel: real_timeout_list = [] for trace in self.traces: - if trace['id'] not in self.ignore_trace_indexes: - for rep_id in range(len(trace['trace'][0]['offline'])): - model_energy = 0. - real_energy = 0. - model_rel_energy = 0. - model_state_energy = 0. - model_duration = 0. - real_duration = 0. - model_timeout = 0. - real_timeout = 0. - for i, trace_part in enumerate(trace['trace']): - name = trace_part['name'] - prev_name = trace['trace'][i - 1]['name'] - isa = trace_part['isa'] - if name != 'UNINITIALIZED': + if trace["id"] not in self.ignore_trace_indexes: + for rep_id in range(len(trace["trace"][0]["offline"])): + model_energy = 0.0 + real_energy = 0.0 + model_rel_energy = 0.0 + model_state_energy = 0.0 + model_duration = 0.0 + real_duration = 0.0 + model_timeout = 0.0 + real_timeout = 0.0 + for i, trace_part in enumerate(trace["trace"]): + name = trace_part["name"] + prev_name = trace["trace"][i - 1]["name"] + isa = trace_part["isa"] + if name != "UNINITIALIZED": try: - param = trace_part['offline_aggregates']['param'][rep_id] - prev_param = trace['trace'][i - 1]['offline_aggregates']['param'][rep_id] - power = trace_part['offline'][rep_id]['uW_mean'] - duration = trace_part['offline'][rep_id]['us'] - prev_duration = trace['trace'][i - 1]['offline'][rep_id]['us'] + param = trace_part["offline_aggregates"]["param"][ + rep_id + ] + prev_param = trace["trace"][i - 1][ + "offline_aggregates" + ]["param"][rep_id] + power = trace_part["offline"][rep_id]["uW_mean"] + duration = trace_part["offline"][rep_id]["us"] + prev_duration = trace["trace"][i - 1]["offline"][ + rep_id + ]["us"] real_energy += power * duration - if isa == 'state': - model_energy += model_function(name, 'power', param=param) * duration + if isa == "state": + model_energy += ( + model_function(name, "power", param=param) + * duration + ) else: - model_energy += model_function(name, 'energy', param=param) + model_energy += model_function( + name, "energy", param=param + ) # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data if i == 1: - model_rel_energy += model_function(name, 'energy', param=param) + model_rel_energy += model_function( + name, "energy", param=param + ) else: - model_rel_energy += model_function(prev_name, 'power', param=prev_param) * (prev_duration + duration) - model_state_energy += model_function(prev_name, 'power', param=prev_param) * (prev_duration + duration) - model_rel_energy += model_function(name, 'rel_energy_prev', param=param) + model_rel_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_state_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_rel_energy += model_function( + name, "rel_energy_prev", param=param + ) real_duration += duration - model_duration += model_function(name, 'duration', param=param) - if 'plan' in trace_part and trace_part['plan']['level'] == 'epilogue': - real_timeout += trace_part['offline'][rep_id]['timeout'] - model_timeout += model_function(name, 'timeout', param=param) + model_duration += model_function( + name, "duration", param=param + ) + if ( + "plan" in trace_part + and trace_part["plan"]["level"] == "epilogue" + ): + real_timeout += trace_part["offline"][rep_id][ + "timeout" + ] + model_timeout += model_function( + name, "timeout", param=param + ) except KeyError: # if states/transitions have been removed via --filter-param, this is harmless pass @@ -2169,11 +2559,21 @@ class PTAModel: model_timeout_list.append(model_timeout) return { - 'duration_by_trace': regression_measures(np.array(model_duration_list), np.array(real_duration_list)), - 'energy_by_trace': regression_measures(np.array(model_energy_list), np.array(real_energy_list)), - 'timeout_by_trace': regression_measures(np.array(model_timeout_list), np.array(real_timeout_list)), - 'rel_energy_by_trace': regression_measures(np.array(model_rel_energy_list), np.array(real_energy_list)), - 'state_energy_by_trace': regression_measures(np.array(model_state_energy_list), np.array(real_energy_list)), + "duration_by_trace": regression_measures( + np.array(model_duration_list), np.array(real_duration_list) + ), + "energy_by_trace": regression_measures( + np.array(model_energy_list), np.array(real_energy_list) + ), + "timeout_by_trace": regression_measures( + np.array(model_timeout_list), np.array(real_timeout_list) + ), + "rel_energy_by_trace": regression_measures( + np.array(model_rel_energy_list), np.array(real_energy_list) + ), + "state_energy_by_trace": regression_measures( + np.array(model_state_energy_list), np.array(real_energy_list) + ), } @@ -2230,17 +2630,19 @@ class EnergyTraceLog: """ if not zbar_available: - self.errors.append('zbar module is not available. Try "apt install python3-zbar"') + self.errors.append( + 'zbar module is not available. Try "apt install python3-zbar"' + ) return list() - lines = log_data.decode('ascii').split('\n') - data_count = sum(map(lambda x: len(x) > 0 and x[0] != '#', lines)) - data_lines = filter(lambda x: len(x) > 0 and x[0] != '#', lines) + lines = log_data.decode("ascii").split("\n") + data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) + data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) data = np.empty((data_count, 4)) for i, line in enumerate(data_lines): - fields = line.split(' ') + fields = line.split(" ") if len(fields) == 4: timestamp, current, voltage, total_energy = map(int, fields) elif len(fields) == 5: @@ -2252,15 +2654,26 @@ class EnergyTraceLog: self.interval_start_timestamp = data[:-1, 0] * 1e-6 self.interval_duration = (data[1:, 0] - data[:-1, 0]) * 1e-6 - self.interval_power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ((data[1:, 0] - data[:-1, 0]) * 1e-6) + self.interval_power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ( + (data[1:, 0] - data[:-1, 0]) * 1e-6 + ) m_duration_us = data[-1, 0] - data[0, 0] self.sample_rate = data_count / (m_duration_us * 1e-6) - vprint(self.verbose, 'got {} samples with {} seconds of log data ({} Hz)'.format(data_count, m_duration_us * 1e-6, self.sample_rate)) + vprint( + self.verbose, + "got {} samples with {} seconds of log data ({} Hz)".format( + data_count, m_duration_us * 1e-6, self.sample_rate + ), + ) - return self.interval_start_timestamp, self.interval_duration, self.interval_power + return ( + self.interval_start_timestamp, + self.interval_duration, + self.interval_power, + ) def ts_to_index(self, timestamp): """ @@ -2279,7 +2692,12 @@ class EnergyTraceLog: mid_index = left_index + (right_index - left_index) // 2 # I'm feeling lucky - if timestamp > self.interval_start_timestamp[mid_index] and timestamp <= self.interval_start_timestamp[mid_index] + self.interval_duration[mid_index]: + if ( + timestamp > self.interval_start_timestamp[mid_index] + and timestamp + <= self.interval_start_timestamp[mid_index] + + self.interval_duration[mid_index] + ): return mid_index if timestamp <= self.interval_start_timestamp[mid_index]: @@ -2322,16 +2740,29 @@ class EnergyTraceLog: expected_transitions = list() for trace_number, trace in enumerate(traces): - for state_or_transition_number, state_or_transition in enumerate(trace['trace']): - if state_or_transition['isa'] == 'transition': + for state_or_transition_number, state_or_transition in enumerate( + trace["trace"] + ): + if state_or_transition["isa"] == "transition": try: - expected_transitions.append(( - state_or_transition['name'], - state_or_transition['online_aggregates']['duration'][offline_index] * 1e-6 - )) + expected_transitions.append( + ( + state_or_transition["name"], + state_or_transition["online_aggregates"]["duration"][ + offline_index + ] + * 1e-6, + ) + ) except IndexError: - self.errors.append('Entry #{} ("{}") in trace #{} has no duration entry for offline_index/repeat_id {}'.format( - state_or_transition_number, state_or_transition['name'], trace_number, offline_index)) + self.errors.append( + 'Entry #{} ("{}") in trace #{} has no duration entry for offline_index/repeat_id {}'.format( + state_or_transition_number, + state_or_transition["name"], + trace_number, + offline_index, + ) + ) return energy_trace next_barcode = first_sync @@ -2342,51 +2773,101 @@ class EnergyTraceLog: print('[!!!] did not find transition "{}"'.format(name)) break next_barcode = end + self.state_duration + duration - vprint(self.verbose, '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format(offline_index, bc, start, stop, end)) + vprint( + self.verbose, + '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format( + offline_index, bc, start, stop, end + ), + ) if bc != name: - vprint(self.verbose, '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc)) - vprint(self.verbose, '{} estimated transition area: {:0.3f} .. {:0.3f} seconds'.format(offline_index, end, end + duration)) + vprint( + self.verbose, + '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc), + ) + vprint( + self.verbose, + "{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format( + offline_index, end, end + duration + ), + ) transition_start_index = self.ts_to_index(end) transition_done_index = self.ts_to_index(end + duration) + 1 state_start_index = transition_done_index - state_done_index = self.ts_to_index(end + duration + self.state_duration) + 1 - - vprint(self.verbose, '{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds'.format(offline_index, transition_start_index / self.sample_rate, transition_done_index / self.sample_rate)) + state_done_index = ( + self.ts_to_index(end + duration + self.state_duration) + 1 + ) - energy_trace.append({ - 'isa': 'transition', - 'W_mean': np.mean(self.interval_power[transition_start_index: transition_done_index]), - 'W_std': np.std(self.interval_power[transition_start_index: transition_done_index]), - 's': duration, - 's_coarse': self.interval_start_timestamp[transition_done_index] - self.interval_start_timestamp[transition_start_index] + vprint( + self.verbose, + "{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds".format( + offline_index, + transition_start_index / self.sample_rate, + transition_done_index / self.sample_rate, + ), + ) - }) + energy_trace.append( + { + "isa": "transition", + "W_mean": np.mean( + self.interval_power[ + transition_start_index:transition_done_index + ] + ), + "W_std": np.std( + self.interval_power[ + transition_start_index:transition_done_index + ] + ), + "s": duration, + "s_coarse": self.interval_start_timestamp[transition_done_index] + - self.interval_start_timestamp[transition_start_index], + } + ) if len(energy_trace) > 1: - energy_trace[-1]['W_mean_delta_prev'] = energy_trace[-1]['W_mean'] - energy_trace[-2]['W_mean'] + energy_trace[-1]["W_mean_delta_prev"] = ( + energy_trace[-1]["W_mean"] - energy_trace[-2]["W_mean"] + ) - energy_trace.append({ - 'isa': 'state', - 'W_mean': np.mean(self.interval_power[state_start_index: state_done_index]), - 'W_std': np.std(self.interval_power[state_start_index: state_done_index]), - 's': self.state_duration, - 's_coarse': self.interval_start_timestamp[state_done_index] - self.interval_start_timestamp[state_start_index] - }) + energy_trace.append( + { + "isa": "state", + "W_mean": np.mean( + self.interval_power[state_start_index:state_done_index] + ), + "W_std": np.std( + self.interval_power[state_start_index:state_done_index] + ), + "s": self.state_duration, + "s_coarse": self.interval_start_timestamp[state_done_index] + - self.interval_start_timestamp[state_start_index], + } + ) - energy_trace[-2]['W_mean_delta_next'] = energy_trace[-2]['W_mean'] - energy_trace[-1]['W_mean'] + energy_trace[-2]["W_mean_delta_next"] = ( + energy_trace[-2]["W_mean"] - energy_trace[-1]["W_mean"] + ) expected_transition_count = len(expected_transitions) recovered_transition_ount = len(energy_trace) // 2 if expected_transition_count != recovered_transition_ount: - self.errors.append('Expected {:d} transitions, got {:d}'.format(expected_transition_count, recovered_transition_ount)) + self.errors.append( + "Expected {:d} transitions, got {:d}".format( + expected_transition_count, recovered_transition_ount + ) + ) return energy_trace def find_first_sync(self): # LED Power is approx. self.led_power W, use self.led_power/2 W above surrounding median as threshold - sync_threshold_power = np.median(self.interval_power[: int(3 * self.sample_rate)]) + self.led_power / 3 + sync_threshold_power = ( + np.median(self.interval_power[: int(3 * self.sample_rate)]) + + self.led_power / 3 + ) for i, ts in enumerate(self.interval_start_timestamp): if ts > 2 and self.interval_power[i] > sync_threshold_power: return self.interval_start_timestamp[i - 300] @@ -2410,26 +2891,56 @@ class EnergyTraceLog: lookaround = int(0.1 * self.sample_rate) # LED Power is approx. self.led_power W, use self.led_power/2 W above surrounding median as threshold - sync_threshold_power = np.median(self.interval_power[start_position - lookaround: start_position + lookaround]) + self.led_power / 3 + sync_threshold_power = ( + np.median( + self.interval_power[ + start_position - lookaround : start_position + lookaround + ] + ) + + self.led_power / 3 + ) - vprint(self.verbose, 'looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW'.format(start_ts, sync_threshold_power * 1e3)) + vprint( + self.verbose, + "looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format( + start_ts, sync_threshold_power * 1e3 + ), + ) sync_area_start = None sync_start_ts = None sync_area_end = None sync_end_ts = None for i, ts in enumerate(self.interval_start_timestamp): - if sync_area_start is None and ts >= start_ts and self.interval_power[i] > sync_threshold_power: + if ( + sync_area_start is None + and ts >= start_ts + and self.interval_power[i] > sync_threshold_power + ): sync_area_start = i - 300 sync_start_ts = ts - if sync_area_start is not None and sync_area_end is None and ts > sync_start_ts + self.min_barcode_duration and (ts > sync_start_ts + self.max_barcode_duration or abs(sync_threshold_power - self.interval_power[i]) > self.led_power): + if ( + sync_area_start is not None + and sync_area_end is None + and ts > sync_start_ts + self.min_barcode_duration + and ( + ts > sync_start_ts + self.max_barcode_duration + or abs(sync_threshold_power - self.interval_power[i]) + > self.led_power + ) + ): sync_area_end = i sync_end_ts = ts break - barcode_data = self.interval_power[sync_area_start: sync_area_end] + barcode_data = self.interval_power[sync_area_start:sync_area_end] - vprint(self.verbose, 'barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)'.format(sync_start_ts, sync_end_ts, len(barcode_data))) + vprint( + self.verbose, + "barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format( + sync_start_ts, sync_end_ts, len(barcode_data) + ), + ) bc, start, stop, padding_bits = self.find_barcode_in_power_data(barcode_data) @@ -2439,7 +2950,9 @@ class EnergyTraceLog: start_ts = self.interval_start_timestamp[sync_area_start + start] stop_ts = self.interval_start_timestamp[sync_area_start + stop] - end_ts = stop_ts + self.module_duration * padding_bits + self.quiet_zone_duration + end_ts = ( + stop_ts + self.module_duration * padding_bits + self.quiet_zone_duration + ) # barcode content, barcode start timestamp, barcode stop timestamp, barcode end (stop + padding) timestamp return bc, start_ts, stop_ts, end_ts @@ -2455,7 +2968,9 @@ class EnergyTraceLog: # -> Create a black and white (not grayscale) image to avoid this. # Unfortunately, this decreases resilience against background noise # (e.g. a not-exactly-idle peripheral device or CPU interrupts). - image_data = np.around(1 - ((barcode_data - min_power) / (max_power - min_power))) + image_data = np.around( + 1 - ((barcode_data - min_power) / (max_power - min_power)) + ) image_data *= 255 # zbar only returns the complete barcode position if it is at least @@ -2469,12 +2984,12 @@ class EnergyTraceLog: # img = Image.frombytes('L', (width, height), image_data).resize((width, 100)) # img.save('/tmp/test-{}.png'.format(os.getpid())) - zbimg = zbar.Image(width, height, 'Y800', image_data) + zbimg = zbar.Image(width, height, "Y800", image_data) scanner = zbar.ImageScanner() - scanner.parse_config('enable') + scanner.parse_config("enable") if scanner.scan(zbimg): - sym, = zbimg.symbols + (sym,) = zbimg.symbols content = sym.data try: sym_start = sym.location[1][0] @@ -2482,7 +2997,7 @@ class EnergyTraceLog: sym_start = 0 sym_end = sym.location[0][0] - match = re.fullmatch(r'T(\d+)', content) + match = re.fullmatch(r"T(\d+)", content) if match: content = self.transition_names[int(match.group(1))] @@ -2490,7 +3005,7 @@ class EnergyTraceLog: # additional non-barcode padding (encoded as LED off / image white). # Calculate the amount of extra bits to determine the offset until # the transition starts. - padding_bits = len(Code128(sym.data, charset='B').modules) % 8 + padding_bits = len(Code128(sym.data, charset="B").modules) % 8 # sym_start leaves out the first two bars, but we don't do anything about that here # sym_end leaves out the last three bars, each of which is one padding bit long. @@ -2499,7 +3014,7 @@ class EnergyTraceLog: return content, sym_start, sym_end, padding_bits else: - vprint(self.verbose, 'unable to find barcode') + vprint(self.verbose, "unable to find barcode") return None, None, None, None @@ -2555,15 +3070,15 @@ class MIMOSA: :returns: (numpy array of charges (pJ per 10µs), numpy array of triggers (0/1 int, per 10µs)) """ - num_bytes = tf.getmember('/tmp/mimosa//mimosa_scale_1.tmp').size + num_bytes = tf.getmember("/tmp/mimosa//mimosa_scale_1.tmp").size charges = np.ndarray(shape=(int(num_bytes / 4)), dtype=np.int32) triggers = np.ndarray(shape=(int(num_bytes / 4)), dtype=np.int8) - with tf.extractfile('/tmp/mimosa//mimosa_scale_1.tmp') as f: + with tf.extractfile("/tmp/mimosa//mimosa_scale_1.tmp") as f: content = f.read() - iterator = struct.iter_unpack('<I', content) + iterator = struct.iter_unpack("<I", content) i = 0 for word in iterator: - charges[i] = (word[0] >> 4) + charges[i] = word[0] >> 4 triggers[i] = (word[0] & 0x08) >> 3 i += 1 return charges, triggers @@ -2616,7 +3131,7 @@ class MIMOSA: trigidx = [] if len(triggers) < 1000000: - self.errors.append('MIMOSA log is too short') + self.errors.append("MIMOSA log is too short") return trigidx prevtrig = triggers[999999] @@ -2625,13 +3140,17 @@ class MIMOSA: # something went wrong and are unable to determine when the first # transition starts. if prevtrig != 0: - self.errors.append('Unable to find start of first transition (log starts with trigger == {} != 0)'.format(prevtrig)) + self.errors.append( + "Unable to find start of first transition (log starts with trigger == {} != 0)".format( + prevtrig + ) + ) # if the last trigger is high (i.e., trigger/buzzer pin is active when the benchmark ends), # it terminated in the middle of a transition -- meaning that it was not # measured in its entirety. if triggers[-1] != 0: - self.errors.append('Log ends during a transition'.format(prevtrig)) + self.errors.append("Log ends during a transition".format(prevtrig)) # the device is reset for MIMOSA calibration in the first 10s and may # send bogus interrupts -> bogus triggers @@ -2663,11 +3182,23 @@ class MIMOSA: for i in range(100000, len(currents)): if r1idx == 0 and currents[i] > ua_r1 * 0.6: r1idx = i - elif r1idx != 0 and r2idx == 0 and i > (r1idx + 180000) and currents[i] < ua_r1 * 0.4: + elif ( + r1idx != 0 + and r2idx == 0 + and i > (r1idx + 180000) + and currents[i] < ua_r1 * 0.4 + ): r2idx = i # 2s disconnected, 2s r1, 2s r2 with r1 < r2 -> ua_r1 > ua_r2 # allow 5ms buffer in both directions to account for bouncing relais contacts - return r1idx - 180500, r1idx - 500, r1idx + 500, r2idx - 500, r2idx + 500, r2idx + 180500 + return ( + r1idx - 180500, + r1idx - 500, + r1idx + 500, + r2idx - 500, + r2idx + 500, + r2idx + 180500, + ) def calibration_function(self, charges, cal_edges): u""" @@ -2711,7 +3242,7 @@ class MIMOSA: if cal_r2_mean > cal_0_mean: b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean) else: - vprint(self.verbose, '[W] 0 uA == %.f uA during calibration' % (ua_r2)) + vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2)) b_lower = 0 b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean) @@ -2726,7 +3257,9 @@ class MIMOSA: return 0 else: return charge * b_lower + a_lower + else: + def calfunc(charge): if charge < cal_0_mean: return 0 @@ -2736,19 +3269,19 @@ class MIMOSA: return charge * b_upper + a_upper + ua_r2 caldata = { - 'edges': [x * 10 for x in cal_edges], - 'offset': cal_0_mean, - 'offset2': cal_r2_mean, - 'slope_low': b_lower, - 'slope_high': b_upper, - 'add_low': a_lower, - 'add_high': a_upper, - 'r0_err_uW': np.mean(self.currents_nocal(chg_r0)) * self.voltage, - 'r0_std_uW': np.std(self.currents_nocal(chg_r0)) * self.voltage, - 'r1_err_uW': (np.mean(self.currents_nocal(chg_r1)) - ua_r1) * self.voltage, - 'r1_std_uW': np.std(self.currents_nocal(chg_r1)) * self.voltage, - 'r2_err_uW': (np.mean(self.currents_nocal(chg_r2)) - ua_r2) * self.voltage, - 'r2_std_uW': np.std(self.currents_nocal(chg_r2)) * self.voltage, + "edges": [x * 10 for x in cal_edges], + "offset": cal_0_mean, + "offset2": cal_r2_mean, + "slope_low": b_lower, + "slope_high": b_upper, + "add_low": a_lower, + "add_high": a_upper, + "r0_err_uW": np.mean(self.currents_nocal(chg_r0)) * self.voltage, + "r0_std_uW": np.std(self.currents_nocal(chg_r0)) * self.voltage, + "r1_err_uW": (np.mean(self.currents_nocal(chg_r1)) - ua_r1) * self.voltage, + "r1_std_uW": np.std(self.currents_nocal(chg_r1)) * self.voltage, + "r2_err_uW": (np.mean(self.currents_nocal(chg_r2)) - ua_r2) * self.voltage, + "r2_std_uW": np.std(self.currents_nocal(chg_r2)) * self.voltage, } # print("if charge < %f : return 0" % cal_0_mean) @@ -2843,51 +3376,59 @@ class MIMOSA: statelist = [] prevsubidx = 0 for subidx in subst: - statelist.append({ - 'duration': (subidx - prevsubidx) * 10, - 'uW_mean': np.mean(range_ua[prevsubidx: subidx] * self.voltage), - 'uW_std': np.std(range_ua[prevsubidx: subidx] * self.voltage), - }) + statelist.append( + { + "duration": (subidx - prevsubidx) * 10, + "uW_mean": np.mean( + range_ua[prevsubidx:subidx] * self.voltage + ), + "uW_std": np.std( + range_ua[prevsubidx:subidx] * self.voltage + ), + } + ) prevsubidx = subidx substates = { - 'threshold': thr, - 'states': statelist, + "threshold": thr, + "states": statelist, } - isa = 'state' + isa = "state" if not is_state: - isa = 'transition' + isa = "transition" data = { - 'isa': isa, - 'clip_rate': np.mean(range_raw == 65535), - 'raw_mean': np.mean(range_raw), - 'raw_std': np.std(range_raw), - 'uW_mean': np.mean(range_ua * self.voltage), - 'uW_std': np.std(range_ua * self.voltage), - 'us': (idx - previdx) * 10, + "isa": isa, + "clip_rate": np.mean(range_raw == 65535), + "raw_mean": np.mean(range_raw), + "raw_std": np.std(range_raw), + "uW_mean": np.mean(range_ua * self.voltage), + "uW_std": np.std(range_ua * self.voltage), + "us": (idx - previdx) * 10, } if self.with_traces: - data['uW'] = range_ua * self.voltage + data["uW"] = range_ua * self.voltage - if 'states' in substates: - data['substates'] = substates - ssum = np.sum(list(map(lambda x: x['duration'], substates['states']))) - if ssum != data['us']: - vprint(self.verbose, "ERR: duration %d vs %d" % (data['us'], ssum)) + if "states" in substates: + data["substates"] = substates + ssum = np.sum(list(map(lambda x: x["duration"], substates["states"]))) + if ssum != data["us"]: + vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum)) - if isa == 'transition': + if isa == "transition": # subtract average power of previous state # (that is, the state from which this transition originates) - data['uW_mean_delta_prev'] = data['uW_mean'] - iterdata[-1]['uW_mean'] + data["uW_mean_delta_prev"] = data["uW_mean"] - iterdata[-1]["uW_mean"] # placeholder to avoid extra cases in the analysis - data['uW_mean_delta_next'] = data['uW_mean'] - data['timeout'] = iterdata[-1]['us'] + data["uW_mean_delta_next"] = data["uW_mean"] + data["timeout"] = iterdata[-1]["us"] elif len(iterdata) > 0: # subtract average power of next state # (the state into which this transition leads) - iterdata[-1]['uW_mean_delta_next'] = iterdata[-1]['uW_mean'] - data['uW_mean'] + iterdata[-1]["uW_mean_delta_next"] = ( + iterdata[-1]["uW_mean"] - data["uW_mean"] + ) iterdata.append(data) |