| author | jfalkenhagen <jfalkenhagen@uos.de> | 2020-07-16 16:39:19 +0200 |
|---|---|---|
| committer | jfalkenhagen <jfalkenhagen@uos.de> | 2020-07-16 16:39:19 +0200 |
| commit | 98d23807e35cc211415c7e0c887f1b1b502f10e5 (patch) | |
| tree | ebb649c585166e546dda704990ed4c5eeb95519f /lib | |
| parent | a00ffc0e32ddc72a8faceec4344432cdbf3b90c7 (diff) | |
| parent | af4cc108b5c5132a991a2b83d258ed55e985936f (diff) | |
Merge branch 'master' into janis
Diffstat (limited to 'lib')
| mode | file | lines changed |
|---|---|---|
| -rwxr-xr-x | lib/automata.py | 17 |
| -rw-r--r-- | lib/data_parameters.py | 17 |
| -rw-r--r-- | lib/functions.py | 91 |
| -rw-r--r-- | lib/harness.py | 4 |
| -rwxr-xr-x | lib/keysightdlog.py | 164 |
| -rw-r--r-- | lib/lex.py | 9 |
| -rw-r--r-- | lib/loader.py (renamed from lib/dfatool.py) | 1603 |
| -rw-r--r-- | lib/model.py | 1156 |
| -rw-r--r-- | lib/parameters.py | 252 |
| -rwxr-xr-x | lib/protocol_benchmarks.py | 7 |
| -rw-r--r-- | lib/runner.py | 50 |
| -rw-r--r-- | lib/utils.py | 14 |
| -rw-r--r-- | lib/validation.py | 238 |
13 files changed, 1616 insertions, 2006 deletions
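The listing below is the raw per-file diff. Two changes dominate: ad-hoc print("[W] ...") and vprint() diagnostics are replaced with per-module logging.getLogger(__name__) loggers, and lib/dfatool.py becomes lib/loader.py, shedding its model-fitting and cross-validation helpers (per the diffstat, lib/model.py and lib/validation.py grow accordingly). Usage sketches for several of the affected helpers follow the listing. As a minimal sketch that is not part of the commit, a script consuming these modules would opt in to the new logger output like this:

import logging

# Surface warnings that were previously printed unconditionally, e.g.
# "skipping model update of state TX due to missing data".
logging.basicConfig(
    level=logging.WARNING,
    format="%(levelname)s:%(name)s: %(message)s",
)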
diff --git a/lib/automata.py b/lib/automata.py index b3318e0..ebe1871 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -3,11 +3,14 @@ from .functions import AnalyticFunction, NormalizationFunction from .utils import is_numeric import itertools +import logging import numpy as np import json import queue import yaml +logger = logging.getLogger(__name__) + def _dict_to_list(input_dict: dict) -> list: return [input_dict[x] for x in sorted(input_dict.keys())] @@ -100,7 +103,7 @@ class PTAAttribute: def __repr__(self): if self.function is not None: return "PTAATtribute<{:.0f}, {}>".format( - self.value, self.function._model_str + self.value, self.function.model_function ) return "PTAATtribute<{:.0f}, None>".format(self.value) @@ -134,8 +137,8 @@ class PTAAttribute: } if self.function: ret["function"] = { - "raw": self.function._model_str, - "regression_args": list(self.function._regression_args), + "raw": self.function.model_function, + "regression_args": list(self.function.model_args), } ret["function_error"] = self.function_error return ret @@ -1305,8 +1308,8 @@ class PTA: "power" ] except KeyError: - print( - "[W] skipping model update of state {} due to missing data".format( + logger.warning( + "skipping model update of state {} due to missing data".format( state.name ) ) @@ -1353,8 +1356,8 @@ class PTA: "timeout" ] except KeyError: - print( - "[W] skipping model update of transition {} due to missing data".format( + logger.warning( + "skipping model update of transition {} due to missing data".format( transition.name ) ) diff --git a/lib/data_parameters.py b/lib/data_parameters.py index 1150b71..84eacfd 100644 --- a/lib/data_parameters.py +++ b/lib/data_parameters.py @@ -7,9 +7,12 @@ length of lists, ane more. from .protocol_benchmarks import codegen_for_lib from . import cycles_to_energy, size_to_radio_energy, utils +import logging import numpy as np import ubjson +logger = logging.getLogger(__name__) + def _string_value_length(json): if type(json) == str: @@ -224,7 +227,7 @@ class Protolog: except KeyError: pass except TypeError as e: - print( + logger.error( "TypeError in {} {} {} {}: {} -> {}".format( arch_lib, benchmark, @@ -395,7 +398,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_enc is NaN for {} -> {} -> {}".format( arch, lib, key ) @@ -410,7 +413,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_ser is NaN for {} -> {} -> {}".format( arch, lib, key ) @@ -425,7 +428,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_encser is NaN for {} -> {} -> {}".format( arch, lib, key ) @@ -440,7 +443,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_des is NaN for {} -> {} -> {}".format( arch, lib, key ) @@ -455,7 +458,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_dec is NaN for {} -> {} -> {}".format( arch, lib, key ) @@ -470,7 +473,7 @@ class Protolog: except KeyError: pass except ValueError: - print( + logger.warning( "cycles_desdec is NaN for {} -> {} -> {}".format( arch, lib, key ) diff --git a/lib/functions.py b/lib/functions.py index 6d8daa4..94b1aaf 100644 --- a/lib/functions.py +++ b/lib/functions.py @@ -5,12 +5,14 @@ This module provides classes and helper functions useful for least-squares regression and general handling of model functions. 
""" from itertools import chain, combinations +import logging import numpy as np import re from scipy import optimize -from .utils import is_numeric, vprint +from .utils import is_numeric arg_support_enabled = True +logger = logging.getLogger(__name__) def powerset(iterable): @@ -23,6 +25,47 @@ def powerset(iterable): return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1)) +def gplearn_to_function(function_str: str): + """ + Convert gplearn-style function string to Python function. + + Takes a function string like "mul(add(X0, X1), X2)" and returns + a Python function implementing the specified behaviour, + e.g. "lambda x, y, z: (x + y) * z". + + Supported functions: + add -- x + y + sub -- x - y + mul -- x * y + div -- x / y if |y| > 0.001, otherwise 1 + sqrt -- sqrt(|x|) + log -- log(|x|) if |x| > 0.001, otherwise 0 + inv -- 1 / x if |x| > 0.001, otherwise 0 + """ + eval_globals = { + "add": lambda x, y: x + y, + "sub": lambda x, y: x - y, + "mul": lambda x, y: x * y, + "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0, + "sqrt": lambda x: np.sqrt(np.abs(x)), + "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0, + "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0, + } + + last_arg_index = 0 + for i in range(0, 100): + if function_str.find("X{:d}".format(i)) >= 0: + last_arg_index = i + + arg_list = [] + for i in range(0, last_arg_index + 1): + arg_list.append("X{:d}".format(i)) + + eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str) + logger.debug(eval_str) + return eval(eval_str, eval_globals) + + class ParamFunction: """ A one-dimensional model function, ready for least squares optimization and similar. @@ -118,9 +161,7 @@ class AnalyticFunction: packet length. """ - def __init__( - self, function_str, parameters, num_args, verbose=True, regression_args=None - ): + def __init__(self, function_str, parameters, num_args, regression_args=None): """ Create a new AnalyticFunction object from a function string. @@ -135,18 +176,16 @@ class AnalyticFunction: :param num_args: number of local function arguments, if any. Set to 0 if the model attribute does not belong to a function or if function arguments are not included in the model. - :param verbose: complain about odd events :param regression_args: Initial regression variable values, both for function usage and least squares optimization. If unset, defaults to [1, 1, 1, ...] """ self._parameter_names = parameters self._num_args = num_args - self._model_str = function_str + self.model_function = function_str rawfunction = function_str self._dependson = [False] * (len(parameters) + num_args) self.fit_success = False - self.verbose = verbose if type(function_str) == str: num_vars_re = re.compile(r"regression_arg\(([0-9]+)\)") @@ -176,12 +215,12 @@ class AnalyticFunction: self._function = function_str if regression_args: - self._regression_args = regression_args.copy() + self.model_args = regression_args.copy() self._fit_success = True elif type(function_str) == str: - self._regression_args = list(np.ones((num_vars))) + self.model_args = list(np.ones((num_vars))) else: - self._regression_args = [] + self.model_args = [] def get_fit_data(self, by_param, state_or_tran, model_attribute): """ @@ -231,9 +270,8 @@ class AnalyticFunction: else: X[i].extend([np.nan] * len(val[model_attribute])) elif key[0] == state_or_tran and len(key[1]) != dimension: - vprint( - self.verbose, - "[W] Invalid parameter key length while gathering fit data for {}/{}. 
is {}, want {}.".format( + logger.warning( + "Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.".format( state_or_tran, model_attribute, len(key[1]), dimension ), ) @@ -263,30 +301,27 @@ class AnalyticFunction: error_function = lambda P, X, y: self._function(P, X) - y try: res = optimize.least_squares( - error_function, self._regression_args, args=(X, Y), xtol=2e-15 + error_function, self.model_args, args=(X, Y), xtol=2e-15 ) except ValueError as err: - vprint( - self.verbose, - "[W] Fit failed for {}/{}: {} (function: {})".format( - state_or_tran, model_attribute, err, self._model_str + logger.warning( + "Fit failed for {}/{}: {} (function: {})".format( + state_or_tran, model_attribute, err, self.model_function ), ) return if res.status > 0: - self._regression_args = res.x + self.model_args = res.x self.fit_success = True else: - vprint( - self.verbose, - "[W] Fit failed for {}/{}: {} (function: {})".format( - state_or_tran, model_attribute, res.message, self._model_str + logger.warning( + "Fit failed for {}/{}: {} (function: {})".format( + state_or_tran, model_attribute, res.message, self.model_function ), ) else: - vprint( - self.verbose, - "[W] Insufficient amount of valid parameter keys, cannot fit {}/{}".format( + logger.warning( + "Insufficient amount of valid parameter keys, cannot fit {}/{}".format( state_or_tran, model_attribute ), ) @@ -314,9 +349,9 @@ class AnalyticFunction: corresponds to lexically first parameter, etc. :param arg_list: argument values (list of float), if arguments are used. """ - if len(self._regression_args) == 0: + if len(self.model_args) == 0: return self._function(param_list, arg_list) - return self._function(self._regression_args, param_list) + return self._function(self.model_args, param_list) class analytic: diff --git a/lib/harness.py b/lib/harness.py index 3b279c0..ae9c28c 100644 --- a/lib/harness.py +++ b/lib/harness.py @@ -21,7 +21,7 @@ class TransitionHarness: * `name`: state or transition name * `parameter`: currently valid parameter values. If normalization is used, they are already normalized. 
Each parameter value is either a primitive int/float/str value (-> constant for each iteration) or a list of - primitive values (-> set by the return value of the current run, not necessarily constan) + primitive values (-> set by the return value of the current run, not necessarily constant) * `args`: function arguments, if isa == 'transition' """ @@ -229,6 +229,7 @@ class TransitionHarness: log_data_target["parameter"][parameter_name] = list() log_data_target["parameter"][parameter_name].append(parameter_value) + # Here Be Dragons def parser_cb(self, line): # print('[HARNESS] got line {}'.format(line)) if re.match(r"\[PTA\] benchmark stop", line): @@ -440,6 +441,7 @@ class OnboardTimerHarness(TransitionHarness): log_data_target["parameter"][parameter_name] = list() log_data_target["parameter"][parameter_name].append(parameter_value) + # Here Be Dragons def parser_cb(self, line): # print('[HARNESS] got line {}'.format(line)) res = re.match(r"\[PTA\] nop=(\S+)/(\S+)", line) diff --git a/lib/keysightdlog.py b/lib/keysightdlog.py deleted file mode 100755 index 89264b9..0000000 --- a/lib/keysightdlog.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python3 - -import lzma -import matplotlib.pyplot as plt -import numpy as np -import os -import struct -import sys -import xml.etree.ElementTree as ET - - -def plot_y(Y, **kwargs): - plot_xy(np.arange(len(Y)), Y, **kwargs) - - -def plot_xy(X, Y, xlabel=None, ylabel=None, title=None, output=None): - fig, ax1 = plt.subplots(figsize=(10, 6)) - if title != None: - fig.canvas.set_window_title(title) - if xlabel != None: - ax1.set_xlabel(xlabel) - if ylabel != None: - ax1.set_ylabel(ylabel) - plt.subplots_adjust(left=0.1, bottom=0.1, right=0.99, top=0.99) - plt.plot(X, Y, "bo", markersize=2) - if output: - plt.savefig(output) - with open("{}.txt".format(output), "w") as f: - print("X Y", file=f) - for i in range(len(X)): - print("{} {}".format(X[i], Y[i]), file=f) - else: - plt.show() - - -filename = sys.argv[1] - -with open(filename, "rb") as logfile: - lines = [] - line = "" - - if ".xz" in filename: - f = lzma.open(logfile) - else: - f = logfile - - while line != "</dlog>\n": - line = f.readline().decode() - lines.append(line) - xml_header = "".join(lines) - raw_header = f.read(8) - data_offset = f.tell() - raw_data = f.read() - - xml_header = xml_header.replace("1ua>", "X1ua>") - xml_header = xml_header.replace("2ua>", "X2ua>") - dlog = ET.fromstring(xml_header) - channels = [] - for channel in dlog.findall("channel"): - channel_id = int(channel.get("id")) - sense_curr = channel.find("sense_curr").text - sense_volt = channel.find("sense_volt").text - model = channel.find("ident").find("model").text - if sense_volt == "1": - channels.append((channel_id, model, "V")) - if sense_curr == "1": - channels.append((channel_id, model, "A")) - - num_channels = len(channels) - duration = int(dlog.find("frame").find("time").text) - interval = float(dlog.find("frame").find("tint").text) - real_duration = interval * int(len(raw_data) / (4 * num_channels)) - - data = np.ndarray( - shape=(num_channels, int(len(raw_data) / (4 * num_channels))), dtype=np.float32 - ) - - iterator = struct.iter_unpack(">f", raw_data) - channel_offset = 0 - measurement_offset = 0 - for value in iterator: - data[channel_offset, measurement_offset] = value[0] - if channel_offset + 1 == num_channels: - channel_offset = 0 - measurement_offset += 1 - else: - channel_offset += 1 - -if int(real_duration) != duration: - print( - "Measurement duration: {:f} of {:d} seconds at {:f} µs per 
sample".format( - real_duration, duration, interval * 1000000 - ) - ) -else: - print( - "Measurement duration: {:d} seconds at {:f} µs per sample".format( - duration, interval * 1000000 - ) - ) - -for i, channel in enumerate(channels): - channel_id, channel_model, channel_type = channel - print( - "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} {:s}".format( - channel_id, - channel_model, - np.min(data[i]), - np.max(data[i]), - np.mean(data[i]), - channel_type, - ) - ) - - if ( - i > 0 - and channel_type == "A" - and channels[i - 1][2] == "V" - and channel_id == channels[i - 1][0] - ): - power = data[i - 1] * data[i] - power = 3.6 * data[i] - print( - "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} W".format( - channel_id, channel_model, np.min(power), np.max(power), np.mean(power) - ) - ) - min_power = np.min(power) - max_power = np.max(power) - power_border = np.mean([min_power, max_power]) - low_power = power[power < power_border] - high_power = power[power >= power_border] - plot_y(power) - print( - " avg low / high power (delta): {:f} / {:f} ({:f}) W".format( - np.mean(low_power), - np.mean(high_power), - np.mean(high_power) - np.mean(low_power), - ) - ) - # plot_y(low_power) - # plot_y(high_power) - high_power_durations = [] - current_high_power_duration = 0 - for is_hpe in power >= power_border: - if is_hpe: - current_high_power_duration += interval - else: - if current_high_power_duration > 0: - high_power_durations.append(current_high_power_duration) - current_high_power_duration = 0 - print( - " avg high-power duration: {:f} µs".format( - np.mean(high_power_durations) * 1000000 - ) - ) - -# print(xml_header) -# print(raw_header) -# print(channels) -# print(data) -# print(np.mean(data[0])) -# print(np.mean(data[1])) -# print(np.mean(data[0] * data[1])) @@ -1,4 +1,7 @@ from .sly import Lexer, Parser +import logging + +logger = logging.getLogger(__name__) class TimedWordLexer(Lexer): @@ -38,7 +41,7 @@ class TimedSequenceLexer(Lexer): FUNCTIONSEP = r";" def error(self, t): - print("Illegal character '%s'" % t.value[0]) + logger.error("Illegal character '%s'" % t.value[0]) if t.value[0] == "{" and t.value.find("}"): self.index += 1 + t.value.find("}") else: @@ -153,11 +156,11 @@ class TimedSequenceParser(Parser): def error(self, p): if p: - print("Syntax error at token", p.type) + logger.error("Syntax error at token", p.type) # Just discard the token and tell the parser it's okay. self.errok() else: - print("Syntax error at EOF") + logger.error("Syntax error at EOF") class TimedWord: diff --git a/lib/dfatool.py b/lib/loader.py index 63639d3..4e07c92 100644 --- a/lib/dfatool.py +++ b/lib/loader.py @@ -3,26 +3,17 @@ import csv import io import json +import logging import numpy as np import os import re -from scipy import optimize -from sklearn.metrics import r2_score import struct import tarfile import hashlib from multiprocessing import Pool -from .functions import analytic -from .functions import AnalyticFunction -from .parameters import ParamStats -from .utils import ( - vprint, - is_numeric, - soft_cast_int, - param_slice_eq, - remove_index_from_tuple, -) -from .utils import by_name_to_by_param, match_parameter_values, running_mean +from .utils import running_mean, soft_cast_int + +logger = logging.getLogger(__name__) try: from .pubcode import Code128 @@ -36,135 +27,6 @@ except ImportError: arg_support_enabled = True -def gplearn_to_function(function_str: str): - """ - Convert gplearn-style function string to Python function. 
- - Takes a function string like "mul(add(X0, X1), X2)" and returns - a Python function implementing the specified behaviour, - e.g. "lambda x, y, z: (x + y) * z". - - Supported functions: - add -- x + y - sub -- x - y - mul -- x * y - div -- x / y if |y| > 0.001, otherwise 1 - sqrt -- sqrt(|x|) - log -- log(|x|) if |x| > 0.001, otherwise 0 - inv -- 1 / x if |x| > 0.001, otherwise 0 - """ - eval_globals = { - "add": lambda x, y: x + y, - "sub": lambda x, y: x - y, - "mul": lambda x, y: x * y, - "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0, - "sqrt": lambda x: np.sqrt(np.abs(x)), - "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0, - "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0, - } - - last_arg_index = 0 - for i in range(0, 100): - if function_str.find("X{:d}".format(i)) >= 0: - last_arg_index = i - - arg_list = [] - for i in range(0, last_arg_index + 1): - arg_list.append("X{:d}".format(i)) - - eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str) - print(eval_str) - return eval(eval_str, eval_globals) - - -def append_if_set(aggregate: dict, data: dict, key: str): - """Append data[key] to aggregate if key in data.""" - if key in data: - aggregate.append(data[key]) - - -def mean_or_none(arr): - """ - Compute mean of NumPy array `arr`, return -1 if empty. - - :param arr: 1-Dimensional NumPy array - """ - if len(arr): - return np.mean(arr) - return -1 - - -def aggregate_measures(aggregate: float, actual: list) -> dict: - """ - Calculate error measures for model value on data list. - - arguments: - aggregate -- model value (float or int) - actual -- real-world / reference values (list of float or int) - - return value: - See regression_measures - """ - aggregate_array = np.array([aggregate] * len(actual)) - return regression_measures(aggregate_array, np.array(actual)) - - -def regression_measures(predicted: np.ndarray, actual: np.ndarray): - """ - Calculate error measures by comparing model values to reference values. 
- - arguments: - predicted -- model values (np.ndarray) - actual -- real-world / reference values (np.ndarray) - - Returns a dict containing the following measures: - mae -- Mean Absolute Error - mape -- Mean Absolute Percentage Error, - if all items in actual are non-zero (NaN otherwise) - smape -- Symmetric Mean Absolute Percentage Error, - if no 0,0-pairs are present in actual and predicted (NaN otherwise) - msd -- Mean Square Deviation - rmsd -- Root Mean Square Deviation - ssr -- Sum of Squared Residuals - rsq -- R^2 measure, see sklearn.metrics.r2_score - count -- Number of values - """ - if type(predicted) != np.ndarray: - raise ValueError("first arg must be ndarray, is {}".format(type(predicted))) - if type(actual) != np.ndarray: - raise ValueError("second arg must be ndarray, is {}".format(type(actual))) - deviations = predicted - actual - # mean = np.mean(actual) - if len(deviations) == 0: - return {} - measures = { - "mae": np.mean(np.abs(deviations), dtype=np.float64), - "msd": np.mean(deviations ** 2, dtype=np.float64), - "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64), - "ssr": np.sum(deviations ** 2, dtype=np.float64), - "rsq": r2_score(actual, predicted), - "count": len(actual), - } - - # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64) - - if np.all(actual != 0): - measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure - else: - measures["mape"] = np.nan - if np.all(np.abs(predicted) + np.abs(actual) != 0): - measures["smape"] = ( - np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) - * 100 - ) - else: - measures["smape"] = np.nan - # if np.all(rsq_quotient != 0): - # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient - - return measures - - class KeysightCSV: """Simple loader for Keysight CSV data, as exported by the windows software.""" @@ -194,162 +56,6 @@ class KeysightCSV: return timestamps, currents -def _xv_partitions_kfold(length, num_slices): - pairs = [] - indexes = np.arange(length) - for i in range(0, num_slices): - training = np.delete(indexes, slice(i, None, num_slices)) - validation = indexes[i::num_slices] - pairs.append((training, validation)) - return pairs - - -def _xv_partition_montecarlo(length): - shuffled = np.random.permutation(np.arange(length)) - border = int(length * float(2) / 3) - training = shuffled[:border] - validation = shuffled[border:] - return (training, validation) - - -class CrossValidator: - """ - Cross-Validation helper for model generation. - - Given a set of measurements and a model class, it will partition the - data into training and validation sets, train the model on the training - set, and assess its quality on the validation set. This is repeated - several times depending on cross-validation algorithm and configuration. - Reports the mean model error over all cross-validation runs. - """ - - def __init__(self, model_class, by_name, parameters, arg_count): - """ - Create a new CrossValidator object. - - Does not perform cross-validation yet. - - arguments: - model_class -- model class/type used for model synthesis, - e.g. PTAModel or AnalyticModel. model_class must have a - constructor accepting (by_name, parameters, arg_count, verbose = False) - and provide an assess method. - by_name -- measurements aggregated by state/transition/function/... name. - Layout: by_name[name][attribute] = list of data. 
Additionally, - by_name[name]['attributes'] must be set to the list of attributes, - e.g. ['power'] or ['duration', 'energy']. - """ - self.model_class = model_class - self.by_name = by_name - self.names = sorted(by_name.keys()) - self.parameters = sorted(parameters) - self.arg_count = arg_count - - def montecarlo(self, model_getter, count=200): - """ - Perform Monte Carlo cross-validation and return average model quality. - - The by_name data is randomly divided into 2/3 training and 1/3 - validation. After creating a model for the training set, the - model type returned by model_getter is evaluated on the validation set. - This is repeated count times (defaulting to 200); the average of all - measures is returned to the user. - - arguments: - model_getter -- function with signature (model_object) -> model, - e.g. lambda m: m.get_fitted()[0] to evaluate the parameter-aware - model with automatic parameter detection. - count -- number of validation runs to perform, defaults to 200 - - return value: - dict of model quality measures. - { - 'by_name' : { - for each name: { - for each attribute: { - 'mae' : mean of all mean absolute errors - 'mae_list' : list of the individual MAE values encountered during cross-validation - 'smape' : mean of all symmetric mean absolute percentage errors - 'smape_list' : list of the individual SMAPE values encountered during cross-validation - } - } - } - } - """ - ret = {"by_name": dict()} - - for name in self.names: - ret["by_name"][name] = dict() - for attribute in self.by_name[name]["attributes"]: - ret["by_name"][name][attribute] = { - "mae_list": list(), - "smape_list": list(), - } - - for _ in range(count): - res = self._single_montecarlo(model_getter) - for name in self.names: - for attribute in self.by_name[name]["attributes"]: - ret["by_name"][name][attribute]["mae_list"].append( - res["by_name"][name][attribute]["mae"] - ) - ret["by_name"][name][attribute]["smape_list"].append( - res["by_name"][name][attribute]["smape"] - ) - - for name in self.names: - for attribute in self.by_name[name]["attributes"]: - ret["by_name"][name][attribute]["mae"] = np.mean( - ret["by_name"][name][attribute]["mae_list"] - ) - ret["by_name"][name][attribute]["smape"] = np.mean( - ret["by_name"][name][attribute]["smape_list"] - ) - - return ret - - def _single_montecarlo(self, model_getter): - training = dict() - validation = dict() - for name in self.names: - training[name] = {"attributes": self.by_name[name]["attributes"]} - validation[name] = {"attributes": self.by_name[name]["attributes"]} - - if "isa" in self.by_name[name]: - training[name]["isa"] = self.by_name[name]["isa"] - validation[name]["isa"] = self.by_name[name]["isa"] - - data_count = len(self.by_name[name]["param"]) - training_subset, validation_subset = _xv_partition_montecarlo(data_count) - - for attribute in self.by_name[name]["attributes"]: - self.by_name[name][attribute] = np.array(self.by_name[name][attribute]) - training[name][attribute] = self.by_name[name][attribute][ - training_subset - ] - validation[name][attribute] = self.by_name[name][attribute][ - validation_subset - ] - - # We can't use slice syntax for 'param', which may contain strings and other odd values - training[name]["param"] = list() - validation[name]["param"] = list() - for idx in training_subset: - training[name]["param"].append(self.by_name[name]["param"][idx]) - for idx in validation_subset: - validation[name]["param"].append(self.by_name[name]["param"][idx]) - - training_data = self.model_class( - training, self.parameters, 
self.arg_count, verbose=False - ) - training_model = model_getter(training_data) - validation_data = self.model_class( - validation, self.parameters, self.arg_count, verbose=False - ) - - return validation_data.assess(training_model) - - def _preprocess_mimosa(measurement): setup = measurement["setup"] mim = MIMOSA( @@ -457,9 +163,7 @@ class TimingData: transitions = list( filter(lambda x: x["isa"] == "transition", trace["trace"]) ) - self.traces.append( - {"id": trace["id"], "trace": transitions,} - ) + self.traces.append({"id": trace["id"], "trace": transitions}) for i, trace in enumerate(self.traces): trace["orig_id"] = trace["id"] trace["id"] = i @@ -490,14 +194,13 @@ class TimingData: self.traces_by_fileno.extend(log_data["traces"]) self._concatenate_analyzed_traces() - def get_preprocessed_data(self, verbose=True): + def get_preprocessed_data(self): """ Return a list of DFA traces annotated with timing and parameter data. Suitable for the PTAModel constructor. See PTAModel(...) docstring for format details. """ - self.verbose = verbose if self.preprocessed: return self.traces if self.version == 0: @@ -539,7 +242,7 @@ class RawData: file system, making subsequent loads near-instant. """ - def __init__(self, filenames, with_traces=False): + def __init__(self, filenames, with_traces=False, skip_cache=False): """ Create a new RawData object. @@ -602,6 +305,7 @@ class RawData: self._parameter_names = None self.ignore_clipping = False self.pta = None + self.ptalog = None with tarfile.open(filenames[0]) as tf: for member in tf.getmembers(): @@ -612,9 +316,12 @@ class RawData: elif ".etlog" in member.name: self.version = 2 break + if self.version >= 1: + self.ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json"))) + self.pta = self.ptalog["pta"] self.set_cache_file() - if not with_traces: + if not with_traces and not skip_cache: self.load_cache() def set_cache_file(self): @@ -631,6 +338,8 @@ class RawData: self.preprocessing_stats = cache_data["preprocessing_stats"] if "pta" in cache_data: self.pta = cache_data["pta"] + if "ptalog" in cache_data: + self.ptalog = cache_data["ptalog"] self.setup_by_fileno = cache_data["setup_by_fileno"] self.preprocessed = True @@ -647,6 +356,7 @@ class RawData: "traces": self.traces, "preprocessing_stats": self.preprocessing_stats, "pta": self.pta, + "ptalog": self.ptalog, "setup_by_fileno": self.setup_by_fileno, } json.dump(cache_data, f) @@ -1050,7 +760,7 @@ class RawData: trace["id"] = i return trace_output - def get_preprocessed_data(self, verbose=True): + def get_preprocessed_data(self): """ Return a list of DFA traces annotated with energy, timing, and parameter data. The list is cached on disk, unless the constructor was called with `with_traces` set. @@ -1103,7 +813,6 @@ class RawData: * `args`: List of arguments the corresponding function call was called with. 
args entries are strings which are not necessarily numeric * `code`: List of function name (first entry) and arguments (remaining entries) of the corresponding function call """ - self.verbose = verbose if self.preprocessed: return self.traces if self.version == 0: @@ -1145,8 +854,7 @@ class RawData: new_filenames = list() with tarfile.open(filename) as tf: - ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json"))) - self.pta = ptalog["pta"] + ptalog = self.ptalog # Benchmark code may be too large to be executed in a single # run, so benchmarks (a benchmark is basically a list of DFA runs) @@ -1200,8 +908,7 @@ class RawData: new_filenames = list() with tarfile.open(filename) as tf: - ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json"))) - self.pta = ptalog["pta"] + ptalog = self.ptalog # Benchmark code may be too large to be executed in a single # run, so benchmarks (a benchmark is basically a list of DFA runs) @@ -1292,13 +999,12 @@ class RawData: for measurement in measurements: if "energy_trace" not in measurement: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logger.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e="; ".join(measurement["datasource_errors"]), - ), + ) ) continue @@ -1315,32 +1021,29 @@ class RawData: self._merge_online_and_offline(measurement) num_valid += 1 else: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logger.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e=measurement["error"], - ), + ) ) elif version == 2: if self._measurement_is_valid_2(measurement): self._merge_online_and_etlog(measurement) num_valid += 1 else: - vprint( - self.verbose, - "[W] Skipping {ar:s}/{m:s}: {e:s}".format( + logger.warning( + "Skipping {ar:s}/{m:s}: {e:s}".format( ar=self.filenames[measurement["fileno"]], m=measurement["info"].name, e=measurement["error"], - ), + ) ) - vprint( - self.verbose, - "[I] {num_valid:d}/{num_total:d} measurements are valid".format( + logger.info( + "{num_valid:d}/{num_total:d} measurements are valid".format( num_valid=num_valid, num_total=len(measurements) - ), + ) ) if version == 0: self.traces = self._concatenate_traces(self.traces_by_fileno) @@ -1357,597 +1060,6 @@ class RawData: } -class ParallelParamFit: - """ - Fit a set of functions on parameterized measurements. - - One parameter is variale, all others are fixed. Reports the best-fitting - function type for each parameter. - """ - - def __init__(self, by_param): - """Create a new ParallelParamFit object.""" - self.fit_queue = [] - self.by_param = by_param - - def enqueue( - self, - state_or_tran, - attribute, - param_index, - param_name, - safe_functions_enabled=False, - param_filter=None, - ): - """ - Add state_or_tran/attribute/param_name to fit queue. - - This causes fit() to compute the best-fitting function for this model part. - """ - self.fit_queue.append( - { - "key": [state_or_tran, attribute, param_name, param_filter], - "args": [ - self.by_param, - state_or_tran, - attribute, - param_index, - safe_functions_enabled, - param_filter, - ], - } - ) - - def fit(self): - """ - Fit functions on previously enqueue data. - - Fitting is one in parallel with one process per core. - - Results can be accessed using the public ParallelParamFit.results object. 
- """ - with Pool() as pool: - self.results = pool.map(_try_fits_parallel, self.fit_queue) - - -def _try_fits_parallel(arg): - """ - Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result. - - Must be a global function as it is called from a multiprocessing Pool. - """ - return {"key": arg["key"], "result": _try_fits(*arg["args"])} - - -def _try_fits( - by_param, - state_or_tran, - model_attribute, - param_index, - safe_functions_enabled=False, - param_filter: dict = None, -): - """ - Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions. - - This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters. - The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function. - Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored. - Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`. - - :returns: a dictionary with the following elements: - best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data. - best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters - mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value - median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value - results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values - - :param by_param: measurements partitioned by state/transition/... name and parameter values. - Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}` - - :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple). - Example: `'foo'` - - :param model_attribute: attribute for which goodness-of-fit will be calculated. - Example: `'bar'` - - :param param_index: index of the parameter used as model input - :param safe_functions_enabled: Include "safe" variants of functions with limited argument range. - :param param_filter: Only use measurements whose parameters match param_filter for fitting. - """ - - functions = analytic.functions(safe_functions_enabled=safe_functions_enabled) - - for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()): - # We might remove elements from 'functions' while iterating over - # its keys. A generator will not allow this, so we need to - # convert to a list. 
- function_names = list(functions.keys()) - for function_name in function_names: - function_object = functions[function_name] - if is_numeric(param_key[1][param_index]) and not function_object.is_valid( - param_key[1][param_index] - ): - functions.pop(function_name, None) - - raw_results = dict() - raw_results_by_param = dict() - ref_results = {"mean": list(), "median": list()} - results = dict() - results_by_param = dict() - - seen_parameter_combinations = set() - - # for each parameter combination: - for param_key in filter( - lambda x: x[0] == state_or_tran - and remove_index_from_tuple(x[1], param_index) - not in seen_parameter_combinations - and len(by_param[x]["param"]) - and match_parameter_values(by_param[x]["param"][0], param_filter), - by_param.keys(), - ): - X = [] - Y = [] - num_valid = 0 - num_total = 0 - - # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1, - # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters - seen_parameter_combinations.add( - remove_index_from_tuple(param_key[1], param_index) - ) - - # for each value of the parameter denoted by param_index (all other parameters remain the same): - for k, v in filter( - lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items() - ): - num_total += 1 - if is_numeric(k[1][param_index]): - num_valid += 1 - X.extend([float(k[1][param_index])] * len(v[model_attribute])) - Y.extend(v[model_attribute]) - - if num_valid > 2: - X = np.array(X) - Y = np.array(Y) - other_parameters = remove_index_from_tuple(k[1], param_index) - raw_results_by_param[other_parameters] = dict() - results_by_param[other_parameters] = dict() - for function_name, param_function in functions.items(): - if function_name not in raw_results: - raw_results[function_name] = dict() - error_function = param_function.error_function - res = optimize.least_squares( - error_function, [0, 1], args=(X, Y), xtol=2e-15 - ) - measures = regression_measures(param_function.eval(res.x, X), Y) - raw_results_by_param[other_parameters][function_name] = measures - for measure, error_rate in measures.items(): - if measure not in raw_results[function_name]: - raw_results[function_name][measure] = list() - raw_results[function_name][measure].append(error_rate) - # print(function_name, res, measures) - mean_measures = aggregate_measures(np.mean(Y), Y) - ref_results["mean"].append(mean_measures["rmsd"]) - raw_results_by_param[other_parameters]["mean"] = mean_measures - median_measures = aggregate_measures(np.median(Y), Y) - ref_results["median"].append(median_measures["rmsd"]) - raw_results_by_param[other_parameters]["median"] = median_measures - - if not len(ref_results["mean"]): - # Insufficient data for fitting - # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index)) - return {"best": None, "best_rmsd": np.inf, "results": results} - - for ( - other_parameter_combination, - other_parameter_results, - ) in raw_results_by_param.items(): - best_fit_val = np.inf - best_fit_name = None - results = dict() - for function_name, result in other_parameter_results.items(): - if len(result) > 0: - results[function_name] = result - rmsd = result["rmsd"] - if rmsd < best_fit_val: - best_fit_val = rmsd - best_fit_name = function_name - results_by_param[other_parameter_combination] = { - "best": best_fit_name, 
- "best_rmsd": best_fit_val, - "mean_rmsd": results["mean"]["rmsd"], - "median_rmsd": results["median"]["rmsd"], - "results": results, - } - - best_fit_val = np.inf - best_fit_name = None - results = dict() - for function_name, result in raw_results.items(): - if len(result) > 0: - results[function_name] = {} - for measure in result.keys(): - results[function_name][measure] = np.mean(result[measure]) - rmsd = results[function_name]["rmsd"] - if rmsd < best_fit_val: - best_fit_val = rmsd - best_fit_name = function_name - - return { - "best": best_fit_name, - "best_rmsd": best_fit_val, - "mean_rmsd": np.mean(ref_results["mean"]), - "median_rmsd": np.mean(ref_results["median"]), - "results": results, - "results_by_other_param": results_by_param, - } - - -def _num_args_from_by_name(by_name): - num_args = dict() - for key, value in by_name.items(): - if "args" in value: - num_args[key] = len(value["args"][0]) - return num_args - - -def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = None): - """ - Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'. - - Filters out results where the best function is worse (or not much better than) static mean/median estimates. - - :param results: fit results as returned by `paramfit.results` - :param name: state/transition/... name, e.g. 'TX' - :param attribute: model attribute, e.g. 'duration' - :param verbose: print debug message to stdout when deliberately not using a determined fit function - :param param_filter: - :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} } - """ - fit_result = dict() - for result in results: - if ( - result["key"][0] == name - and result["key"][1] == attribute - and result["key"][3] == param_filter - and result["result"]["best"] is not None - ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? - this_result = result["result"] - if this_result["best_rmsd"] >= min( - this_result["mean_rmsd"], this_result["median_rmsd"] - ): - vprint( - verbose, - "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( - name, - attribute, - result["key"][2], - this_result["best_rmsd"], - this_result["mean_rmsd"], - this_result["median_rmsd"], - ), - ) - # See notes on depends_on_param - elif this_result["best_rmsd"] >= 0.8 * min( - this_result["mean_rmsd"], this_result["median_rmsd"] - ): - vprint( - verbose, - "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( - name, - attribute, - result["key"][2], - this_result["best_rmsd"], - this_result["mean_rmsd"], - this_result["median_rmsd"], - ), - ) - else: - fit_result[result["key"][2]] = this_result - return fit_result - - -class AnalyticModel: - u""" - Parameter-aware analytic energy/data size/... model. - - Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence. - - These provide measurements aggregated by (function/state/...) name - and (for by_param) parameter values. Layout: - dictionary with one key per name ('send', 'TX', ...) or - one key per name and parameter combination - (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). - - Parameter values must be ordered corresponding to the lexically sorted parameter names. 
- - Each element is in turn a dict with the following elements: - - param: list of parameter values in each measurement (-> list of lists) - - attributes: list of keys that should be analyzed, - e.g. ['power', 'duration'] - - for each attribute mentioned in 'attributes': A list with measurements. - All list except for 'attributes' must have the same length. - - For example: - parameters = ['foo_count', 'irrelevant'] - by_name = { - 'foo' : [1, 1, 2], - 'bar' : [5, 6, 7], - 'attributes' : ['foo', 'bar'], - 'param' : [[1, 0], [1, 0], [2, 0]] - } - - methods: - get_static -- return static (parameter-unaware) model. - get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param. - get_fitted -- return parameter-aware model using fitted functions for behaviour prediction. - - variables: - names -- function/state/... names (i.e., the keys of by_name) - parameters -- parameter names - stats -- ParamStats object providing parameter-dependency statistics for each name and attribute - assess -- calculate model quality - """ - - def __init__( - self, - by_name, - parameters, - arg_count=None, - function_override=dict(), - verbose=True, - use_corrcoef=False, - ): - """ - Create a new AnalyticModel and compute parameter statistics. - - :param by_name: measurements aggregated by (function/state/...) name. - Layout: dictionary with one key per name ('send', 'TX', ...) or - one key per name and parameter combination - (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). - - Parameter values must be ordered corresponding to the lexically sorted parameter names. - - Each element is in turn a dict with the following elements: - - param: list of parameter values in each measurement (-> list of lists) - - attributes: list of keys that should be analyzed, - e.g. ['power', 'duration'] - - for each attribute mentioned in 'attributes': A list with measurements. - All list except for 'attributes' must have the same length. - - For example: - parameters = ['foo_count', 'irrelevant'] - by_name = { - 'foo' : [1, 1, 2], - 'duration' : [5, 6, 7], - 'attributes' : ['foo', 'duration'], - 'param' : [[1, 0], [1, 0], [2, 0]] - # foo_count-^ ^-irrelevant - } - :param parameters: List of parameter names - :param function_override: dict of overrides for automatic parameter function generation. - If (state or transition name, model attribute) is present in function_override, - the corresponding text string is the function used for analytic (parameter-aware/fitted) - modeling of this attribute. It is passed to AnalyticFunction, see - there for the required format. Note that this happens regardless of - parameter dependency detection: The provided analytic function will be assigned - even if it seems like the model attribute is static / parameter-independent. - :param verbose: Print debug/info output while generating the model? 
- :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter - """ - self.cache = dict() - self.by_name = by_name - self.by_param = by_name_to_by_param(by_name) - self.names = sorted(by_name.keys()) - self.parameters = sorted(parameters) - self.function_override = function_override.copy() - self.verbose = verbose - self._use_corrcoef = use_corrcoef - self._num_args = arg_count - if self._num_args is None: - self._num_args = _num_args_from_by_name(by_name) - - self.stats = ParamStats( - self.by_name, - self.by_param, - self.parameters, - self._num_args, - verbose=verbose, - use_corrcoef=use_corrcoef, - ) - - def _get_model_from_dict(self, model_dict, model_function): - model = {} - for name, elem in model_dict.items(): - model[name] = {} - for key in elem["attributes"]: - try: - model[name][key] = model_function(elem[key]) - except RuntimeWarning: - vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) - except FloatingPointError as fpe: - vprint( - self.verbose, - "[W] Got no data for {} {}: {}".format(name, key, fpe), - ) - return model - - def param_index(self, param_name): - if param_name in self.parameters: - return self.parameters.index(param_name) - return len(self.parameters) + int(param_name) - - def param_name(self, param_index): - if param_index < len(self.parameters): - return self.parameters[param_index] - return str(param_index) - - def get_static(self, use_mean=False): - """ - Get static model function: name, attribute -> model value. - - Uses the median of by_name for modeling. - """ - getter_function = np.median - - if use_mean: - getter_function = np.mean - - static_model = self._get_model_from_dict(self.by_name, getter_function) - - def static_model_getter(name, key, **kwargs): - return static_model[name][key] - - return static_model_getter - - def get_param_lut(self, fallback=False): - """ - Get parameter-look-up-table model function: name, attribute, parameter values -> model value. - - The function can only give model values for parameter combinations - present in by_param. By default, it raises KeyError for other values. - - arguments: - fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values - """ - static_model = self._get_model_from_dict(self.by_name, np.median) - lut_model = self._get_model_from_dict(self.by_param, np.median) - - def lut_median_getter(name, key, param, arg=[], **kwargs): - param.extend(map(soft_cast_int, arg)) - try: - return lut_model[(name, tuple(param))][key] - except KeyError: - if fallback: - return static_model[name][key] - raise - - return lut_median_getter - - def get_fitted(self, safe_functions_enabled=False): - """ - Get paramete-aware model function and model information function. - - Returns two functions: - model_function(name, attribute, param=parameter values) -> model value. - model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... 
} or None - """ - if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: - return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] - - static_model = self._get_model_from_dict(self.by_name, np.median) - param_model = dict([[name, {}] for name in self.by_name.keys()]) - paramfit = ParallelParamFit(self.by_param) - - for name in self.by_name.keys(): - for attribute in self.by_name[name]["attributes"]: - for param_index, param in enumerate(self.parameters): - if self.stats.depends_on_param(name, attribute, param): - paramfit.enqueue(name, attribute, param_index, param, False) - if arg_support_enabled and name in self._num_args: - for arg_index in range(self._num_args[name]): - if self.stats.depends_on_arg(name, attribute, arg_index): - paramfit.enqueue( - name, - attribute, - len(self.parameters) + arg_index, - arg_index, - False, - ) - - paramfit.fit() - - for name in self.by_name.keys(): - num_args = 0 - if name in self._num_args: - num_args = self._num_args[name] - for attribute in self.by_name[name]["attributes"]: - fit_result = get_fit_result( - paramfit.results, name, attribute, self.verbose - ) - - if (name, attribute) in self.function_override: - function_str = self.function_override[(name, attribute)] - x = AnalyticFunction(function_str, self.parameters, num_args) - x.fit(self.by_param, name, attribute) - if x.fit_success: - param_model[name][attribute] = { - "fit_result": fit_result, - "function": x, - } - elif len(fit_result.keys()): - x = analytic.function_powerset( - fit_result, self.parameters, num_args - ) - x.fit(self.by_param, name, attribute) - - if x.fit_success: - param_model[name][attribute] = { - "fit_result": fit_result, - "function": x, - } - - def model_getter(name, key, **kwargs): - if "arg" in kwargs and "param" in kwargs: - kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) - if key in param_model[name]: - param_list = kwargs["param"] - param_function = param_model[name][key]["function"] - if param_function.is_predictable(param_list): - return param_function.eval(param_list) - return static_model[name][key] - - def info_getter(name, key): - if key in param_model[name]: - return param_model[name][key] - return None - - self.cache["fitted_model_getter"] = model_getter - self.cache["fitted_info_getter"] = info_getter - - return model_getter, info_getter - - def assess(self, model_function): - """ - Calculate MAE, SMAPE, etc. of model_function for each by_name entry. - - state/transition/... name and parameter values are fed into model_function. - The by_name entries of this AnalyticModel are used as ground truth and - compared with the values predicted by model_function. - - For proper model assessments, the data used to generate model_function - and the data fed into this AnalyticModel instance must be mutually - exclusive (e.g. by performing cross validation). Otherwise, - overfitting cannot be detected. 
- """ - detailed_results = {} - for name, elem in sorted(self.by_name.items()): - detailed_results[name] = {} - for attribute in elem["attributes"]: - predicted_data = np.array( - list( - map( - lambda i: model_function( - name, attribute, param=elem["param"][i] - ), - range(len(elem[attribute])), - ) - ) - ) - measures = regression_measures(predicted_data, elem[attribute]) - detailed_results[name][attribute] = measures - - return { - "by_name": detailed_results, - } - - def to_json(self): - # TODO - pass - - def _add_trace_data_to_aggregate(aggregate, key, element): # Only cares about element['isa'], element['offline_aggregates'], and # element['plan']['level'] @@ -2049,540 +1161,6 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]): return by_name, parameter_names, arg_count -class PTAModel: - u""" - Parameter-aware PTA-based energy model. - - Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence. - - The model heavily relies on two internal data structures: - PTAModel.by_name and PTAModel.by_param. - - These provide measurements aggregated by state/transition name - and (for by_param) parameter values. Layout: - dictionary with one key per state/transition ('send', 'TX', ...) or - one key per state/transition and parameter combination - (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). - For by_param, parameter values are ordered corresponding to the lexically sorted parameter names. - - Each element is in turn a dict with the following elements: - - isa: 'state' or 'transition' - - power: list of mean power measurements in µW - - duration: list of durations in µs - - power_std: list of stddev of power per state/transition - - energy: consumed energy (power*duration) in pJ - - paramkeys: list of parameter names in each measurement (-> list of lists) - - param: list of parameter values in each measurement (-> list of lists) - - attributes: list of keys that should be analyzed, - e.g. ['power', 'duration'] - additionally, only if isa == 'transition': - - timeout: list of duration of previous state in µs - - rel_energy_prev: transition energy relative to previous state mean power in pJ - - rel_energy_next: transition energy relative to next state mean power in pJ - """ - - def __init__( - self, - by_name, - parameters, - arg_count, - traces=[], - ignore_trace_indexes=[], - discard_outliers=None, - function_override={}, - verbose=True, - use_corrcoef=False, - pta=None, - ): - """ - Prepare a new PTA energy model. - - Actual model generation is done on-demand by calling the respective functions. - - arguments: - by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate. - parameters -- list of parameter names, as returned by pta_trace_to_aggregate - arg_count -- function arguments, as returned by pta_trace_to_aggregate - traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data() - ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored. - discard_outliers -- currently not supported: threshold for outlier detection and removel (float). - Outlier detection is performed individually for each state/transition in each trace, - so it only works if the benchmark ran several times. - Given "data" (a set of measurements of the same thing, e.g. 
TX duration in the third benchmark trace), - "m" (the median of all attribute measurements with the same parameters, which may include data from other traces), - a data point X is considered an outlier if - | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers . - function_override -- dict of overrides for automatic parameter function generation. - If (state or transition name, model attribute) is present in function_override, - the corresponding text string is the function used for analytic (parameter-aware/fitted) - modeling of this attribute. It is passed to AnalyticFunction, see - there for the required format. Note that this happens regardless of - parameter dependency detection: The provided analytic function will be assigned - even if it seems like the model attribute is static / parameter-independent. - verbose -- print informative output, e.g. when removing an outlier - use_corrcoef -- use correlation coefficient instead of stddev comparison - to detect whether a model attribute depends on a parameter - pta -- hardware model as `PTA` object - """ - self.by_name = by_name - self.by_param = by_name_to_by_param(by_name) - self._parameter_names = sorted(parameters) - self._num_args = arg_count - self._use_corrcoef = use_corrcoef - self.traces = traces - self.stats = ParamStats( - self.by_name, - self.by_param, - self._parameter_names, - self._num_args, - self._use_corrcoef, - verbose=verbose, - ) - self.cache = {} - np.seterr("raise") - self._outlier_threshold = discard_outliers - self.function_override = function_override.copy() - self.verbose = verbose - self.pta = pta - self.ignore_trace_indexes = ignore_trace_indexes - self._aggregate_to_ndarray(self.by_name) - - def _aggregate_to_ndarray(self, aggregate): - for elem in aggregate.values(): - for key in elem["attributes"]: - elem[key] = np.array(elem[key]) - - # This heuristic is very similar to the "function is not much better than - # median" checks in get_fitted. So far, doing it here as well is mostly - # a performance and not an algorithm quality decision. - # --df, 2018-04-18 - def depends_on_param(self, state_or_trans, key, param): - return self.stats.depends_on_param(state_or_trans, key, param) - - # See notes on depends_on_param - def depends_on_arg(self, state_or_trans, key, param): - return self.stats.depends_on_arg(state_or_trans, key, param) - - def _get_model_from_dict(self, model_dict, model_function): - model = {} - for name, elem in model_dict.items(): - model[name] = {} - for key in elem["attributes"]: - try: - model[name][key] = model_function(elem[key]) - except RuntimeWarning: - vprint(self.verbose, "[W] Got no data for {} {}".format(name, key)) - except FloatingPointError as fpe: - vprint( - self.verbose, - "[W] Got no data for {} {}: {}".format(name, key, fpe), - ) - return model - - def get_static(self, use_mean=False): - """ - Get static model function: name, attribute -> model value. - - Uses the median of by_name for modeling, unless `use_mean` is set. - """ - getter_function = np.median - - if use_mean: - getter_function = np.mean - - static_model = self._get_model_from_dict(self.by_name, getter_function) - - def static_model_getter(name, key, **kwargs): - return static_model[name][key] - - return static_model_getter - - def get_param_lut(self, fallback=False): - """ - Get parameter-look-up-table model function: name, attribute, parameter values -> model value. - - The function can only give model values for parameter combinations - present in by_param. 
By default, it raises KeyError for other values. - - arguments: - fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values - """ - static_model = self._get_model_from_dict(self.by_name, np.median) - lut_model = self._get_model_from_dict(self.by_param, np.median) - - def lut_median_getter(name, key, param, arg=[], **kwargs): - param.extend(map(soft_cast_int, arg)) - try: - return lut_model[(name, tuple(param))][key] - except KeyError: - if fallback: - return static_model[name][key] - raise - - return lut_median_getter - - def param_index(self, param_name): - if param_name in self._parameter_names: - return self._parameter_names.index(param_name) - return len(self._parameter_names) + int(param_name) - - def param_name(self, param_index): - if param_index < len(self._parameter_names): - return self._parameter_names[param_index] - return str(param_index) - - def get_fitted(self, safe_functions_enabled=False): - """ - Get parameter-aware model function and model information function. - - Returns two functions: - model_function(name, attribute, param=parameter values) -> model value. - model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None - """ - if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: - return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] - - static_model = self._get_model_from_dict(self.by_name, np.median) - param_model = dict( - [[state_or_tran, {}] for state_or_tran in self.by_name.keys()] - ) - paramfit = ParallelParamFit(self.by_param) - for state_or_tran in self.by_name.keys(): - for model_attribute in self.by_name[state_or_tran]["attributes"]: - fit_results = {} - for parameter_index, parameter_name in enumerate(self._parameter_names): - if self.depends_on_param( - state_or_tran, model_attribute, parameter_name - ): - paramfit.enqueue( - state_or_tran, - model_attribute, - parameter_index, - parameter_name, - safe_functions_enabled, - ) - for ( - codependent_param_dict - ) in self.stats.codependent_parameter_value_dicts( - state_or_tran, model_attribute, parameter_name - ): - paramfit.enqueue( - state_or_tran, - model_attribute, - parameter_index, - parameter_name, - safe_functions_enabled, - codependent_param_dict, - ) - if ( - arg_support_enabled - and self.by_name[state_or_tran]["isa"] == "transition" - ): - for arg_index in range(self._num_args[state_or_tran]): - if self.depends_on_arg( - state_or_tran, model_attribute, arg_index - ): - paramfit.enqueue( - state_or_tran, - model_attribute, - len(self._parameter_names) + arg_index, - arg_index, - safe_functions_enabled, - ) - paramfit.fit() - - for state_or_tran in self.by_name.keys(): - num_args = 0 - if ( - arg_support_enabled - and self.by_name[state_or_tran]["isa"] == "transition" - ): - num_args = self._num_args[state_or_tran] - for model_attribute in self.by_name[state_or_tran]["attributes"]: - fit_results = get_fit_result( - paramfit.results, state_or_tran, model_attribute, self.verbose - ) - - for parameter_name in self._parameter_names: - if self.depends_on_param( - state_or_tran, model_attribute, parameter_name - ): - for ( - codependent_param_dict - ) in self.stats.codependent_parameter_value_dicts( - state_or_tran, model_attribute, parameter_name - ): - pass - # FIXME get_fit_result hat ja gar keinen Parameter als Argument... 
- - if (state_or_tran, model_attribute) in self.function_override: - function_str = self.function_override[ - (state_or_tran, model_attribute) - ] - x = AnalyticFunction(function_str, self._parameter_names, num_args) - x.fit(self.by_param, state_or_tran, model_attribute) - if x.fit_success: - param_model[state_or_tran][model_attribute] = { - "fit_result": fit_results, - "function": x, - } - elif len(fit_results.keys()): - x = analytic.function_powerset( - fit_results, self._parameter_names, num_args - ) - x.fit(self.by_param, state_or_tran, model_attribute) - if x.fit_success: - param_model[state_or_tran][model_attribute] = { - "fit_result": fit_results, - "function": x, - } - - def model_getter(name, key, **kwargs): - if "arg" in kwargs and "param" in kwargs: - kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) - if key in param_model[name]: - param_list = kwargs["param"] - param_function = param_model[name][key]["function"] - if param_function.is_predictable(param_list): - return param_function.eval(param_list) - return static_model[name][key] - - def info_getter(name, key): - if key in param_model[name]: - return param_model[name][key] - return None - - self.cache["fitted_model_getter"] = model_getter - self.cache["fitted_info_getter"] = info_getter - - return model_getter, info_getter - - def to_json(self): - static_model = self.get_static() - static_quality = self.assess(static_model) - param_model, param_info = self.get_fitted() - analytic_quality = self.assess(param_model) - self.pta.update( - static_model, - param_info, - static_error=static_quality["by_name"], - analytic_error=analytic_quality["by_name"], - ) - return self.pta.to_json() - - def states(self): - """Return sorted list of state names.""" - return sorted( - list( - filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys()) - ) - ) - - def transitions(self): - """Return sorted list of transition names.""" - return sorted( - list( - filter( - lambda k: self.by_name[k]["isa"] == "transition", - self.by_name.keys(), - ) - ) - ) - - def states_and_transitions(self): - """Return list of states and transition names.""" - ret = self.states() - ret.extend(self.transitions()) - return ret - - def parameters(self): - return self._parameter_names - - def attributes(self, state_or_trans): - return self.by_name[state_or_trans]["attributes"] - - def assess(self, model_function): - """ - Calculate MAE, SMAPE, etc. of model_function for each by_name entry. - - state/transition/... name and parameter values are fed into model_function. - The by_name entries of this PTAModel are used as ground truth and - compared with the values predicted by model_function. - - For proper model assessments, the data used to generate model_function - and the data fed into this AnalyticModel instance must be mutually - exclusive (e.g. by performing cross validation). Otherwise, - overfitting cannot be detected. 
- """ - detailed_results = {} - for name, elem in sorted(self.by_name.items()): - detailed_results[name] = {} - for key in elem["attributes"]: - predicted_data = np.array( - list( - map( - lambda i: model_function(name, key, param=elem["param"][i]), - range(len(elem[key])), - ) - ) - ) - measures = regression_measures(predicted_data, elem[key]) - detailed_results[name][key] = measures - - return {"by_name": detailed_results} - - def assess_states( - self, model_function, model_attribute="power", distribution: dict = None - ): - """ - Calculate overall model error assuming equal distribution of states - """ - # TODO calculate mean power draw for distribution and use it to - # calculate relative error from MAE combination - model_quality = self.assess(model_function) - num_states = len(self.states()) - if distribution is None: - distribution = dict(map(lambda x: [x, 1 / num_states], self.states())) - - if not np.isclose(sum(distribution.values()), 1): - raise ValueError( - "distribution must be a probability distribution with sum 1" - ) - - # total_value = None - # try: - # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states())) - # except KeyError: - # pass - - total_error = np.sqrt( - sum( - map( - lambda x: np.square( - model_quality["by_name"][x][model_attribute]["mae"] - * distribution[x] - ), - self.states(), - ) - ) - ) - return total_error - - def assess_on_traces(self, model_function): - """ - Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance. - - :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`. - Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the - traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states - """ - model_energy_list = [] - real_energy_list = [] - model_rel_energy_list = [] - model_state_energy_list = [] - model_duration_list = [] - real_duration_list = [] - model_timeout_list = [] - real_timeout_list = [] - - for trace in self.traces: - if trace["id"] not in self.ignore_trace_indexes: - for rep_id in range(len(trace["trace"][0]["offline"])): - model_energy = 0.0 - real_energy = 0.0 - model_rel_energy = 0.0 - model_state_energy = 0.0 - model_duration = 0.0 - real_duration = 0.0 - model_timeout = 0.0 - real_timeout = 0.0 - for i, trace_part in enumerate(trace["trace"]): - name = trace_part["name"] - prev_name = trace["trace"][i - 1]["name"] - isa = trace_part["isa"] - if name != "UNINITIALIZED": - try: - param = trace_part["offline_aggregates"]["param"][ - rep_id - ] - prev_param = trace["trace"][i - 1][ - "offline_aggregates" - ]["param"][rep_id] - power = trace_part["offline"][rep_id]["uW_mean"] - duration = trace_part["offline"][rep_id]["us"] - prev_duration = trace["trace"][i - 1]["offline"][ - rep_id - ]["us"] - real_energy += power * duration - if isa == "state": - model_energy += ( - model_function(name, "power", param=param) - * duration - ) - else: - model_energy += model_function( - name, "energy", param=param - ) - # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data - if i == 1: - model_rel_energy += model_function( - name, "energy", param=param - ) - else: - model_rel_energy += model_function( - prev_name, "power", param=prev_param - ) * (prev_duration + duration) - model_state_energy += 
model_function( - prev_name, "power", param=prev_param - ) * (prev_duration + duration) - model_rel_energy += model_function( - name, "rel_energy_prev", param=param - ) - real_duration += duration - model_duration += model_function( - name, "duration", param=param - ) - if ( - "plan" in trace_part - and trace_part["plan"]["level"] == "epilogue" - ): - real_timeout += trace_part["offline"][rep_id][ - "timeout" - ] - model_timeout += model_function( - name, "timeout", param=param - ) - except KeyError: - # if states/transitions have been removed via --filter-param, this is harmless - pass - real_energy_list.append(real_energy) - model_energy_list.append(model_energy) - model_rel_energy_list.append(model_rel_energy) - model_state_energy_list.append(model_state_energy) - real_duration_list.append(real_duration) - model_duration_list.append(model_duration) - real_timeout_list.append(real_timeout) - model_timeout_list.append(model_timeout) - - return { - "duration_by_trace": regression_measures( - np.array(model_duration_list), np.array(real_duration_list) - ), - "energy_by_trace": regression_measures( - np.array(model_energy_list), np.array(real_energy_list) - ), - "timeout_by_trace": regression_measures( - np.array(model_timeout_list), np.array(real_timeout_list) - ), - "rel_energy_by_trace": regression_measures( - np.array(model_rel_energy_list), np.array(real_energy_list) - ), - "state_energy_by_trace": regression_measures( - np.array(model_state_energy_list), np.array(real_energy_list) - ), - } - - class EnergyTraceLog: """ EnergyTrace log loader for DFA traces. @@ -2617,7 +1195,6 @@ class EnergyTraceLog: self.state_duration = state_duration * 1e-3 self.transition_names = transition_names self.with_traces = with_traces - self.verbose = False self.errors = list() # TODO auto-detect @@ -2643,6 +1220,7 @@ class EnergyTraceLog: """ if not zbar_available: + logger.error("zbar module is not available") self.errors.append( 'zbar module is not available. Try "apt install python3-zbar"' ) @@ -2675,11 +1253,10 @@ class EnergyTraceLog: self.sample_rate = data_count / (m_duration_us * 1e-6) - vprint( - self.verbose, + logger.debug( "got {} samples with {} seconds of log data ({} Hz)".format( data_count, m_duration_us * 1e-6, self.sample_rate - ), + ) ) return ( @@ -2783,25 +1360,20 @@ class EnergyTraceLog: for name, duration in expected_transitions: bc, start, stop, end = self.find_barcode(next_barcode) if bc is None: - print('[!!!] did not find transition "{}"'.format(name)) + logger.error('did not find transition "{}"'.format(name)) break next_barcode = end + self.state_duration + duration - vprint( - self.verbose, + logger.debug( '{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format( offline_index, bc, start, stop, end - ), + ) ) if bc != name: - vprint( - self.verbose, - '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc), - ) - vprint( - self.verbose, + logger.error('mismatch: expected "{}", got "{}"'.format(name, bc)) + logger.debug( "{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format( offline_index, end, end + duration - ), + ) ) transition_start_index = self.ts_to_index(end) @@ -2811,13 +1383,12 @@ class EnergyTraceLog: self.ts_to_index(end + duration + self.state_duration) + 1 ) - vprint( - self.verbose, + logger.debug( "{} estimated transitionindex: {:0.3f} .. 
{:0.3f} seconds".format( offline_index, transition_start_index / self.sample_rate, transition_done_index / self.sample_rate, - ), + ) ) transition_power_W = self.interval_power[ @@ -2912,11 +1483,10 @@ class EnergyTraceLog: + self.led_power / 3 ) - vprint( - self.verbose, + logger.debug( "looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format( start_ts, sync_threshold_power * 1e3 - ), + ) ) sync_area_start = None @@ -2947,11 +1517,10 @@ class EnergyTraceLog: barcode_data = self.interval_power[sync_area_start:sync_area_end] - vprint( - self.verbose, + logger.debug( "barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format( sync_start_ts, sync_end_ts, len(barcode_data) - ), + ) ) bc, start, stop, padding_bits = self.find_barcode_in_power_data(barcode_data) @@ -3026,7 +1595,7 @@ class EnergyTraceLog: return content, sym_start, sym_end, padding_bits else: - vprint(self.verbose, "unable to find barcode") + logger.warning("unable to find barcode") return None, None, None, None @@ -3046,17 +1615,15 @@ class MIMOSA: Resulting data is a list of state/transition/state/transition/... measurements. """ - def __init__(self, voltage: float, shunt: int, verbose=True, with_traces=False): + def __init__(self, voltage: float, shunt: int, with_traces=False): """ Initialize MIMOSA loader for a specific voltage and shunt setting. :param voltage: MIMOSA DUT supply voltage (V) :para mshunt: MIMOSA Shunt (Ohms) - :param verbose: print notices about invalid data on STDOUT? """ self.voltage = voltage self.shunt = shunt - self.verbose = verbose self.with_traces = with_traces self.r1 = 984 # "1k" self.r2 = 99013 # "100k" @@ -3254,7 +1821,7 @@ class MIMOSA: if cal_r2_mean > cal_0_mean: b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean) else: - vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2)) + logger.warning("0 uA == %.f uA during calibration" % (ua_r2)) b_lower = 0 b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean) @@ -3302,50 +1869,6 @@ class MIMOSA: return calfunc, caldata - """ - def calcgrad(self, currents, threshold): - grad = np.gradient(running_mean(currents * self.voltage, 10)) - # len(grad) == len(currents) - 9 - subst = [] - lastgrad = 0 - for i in range(len(grad)): - # minimum substate duration: 10ms - if np.abs(grad[i]) > threshold and i - lastgrad > 50: - # account for skew introduced by running_mean and current - # ramp slope (parasitic capacitors etc.) - subst.append(i+10) - lastgrad = i - if lastgrad != i: - subst.append(i+10) - return subst - - # TODO konfigurierbare min/max threshold und len(gradidx) > X, binaere - # Sache nach noetiger threshold. postprocessing mit - # "zwei benachbarte substates haben sehr aehnliche werte / niedrige stddev" -> mergen - # ... min/max muessen nicht vorgegeben werden, sind ja bekannt (0 / np.max(grad)) - # TODO bei substates / index foo den offset durch running_mean beachten - # TODO ggf. clustering der 'abs(grad) > threshold' und bestimmung interessanter - # uebergaenge dadurch? 
- def gradfoo(self, currents): - gradients = np.abs(np.gradient(running_mean(currents * self.voltage, 10))) - gradmin = np.min(gradients) - gradmax = np.max(gradients) - threshold = np.mean([gradmin, gradmax]) - gradidx = self.calcgrad(currents, threshold) - num_substates = 2 - while len(gradidx) != num_substates: - if gradmax - gradmin < 0.1: - # We did our best - return threshold, gradidx - if len(gradidx) > num_substates: - gradmin = threshold - else: - gradmax = threshold - threshold = np.mean([gradmin, gradmax]) - gradidx = self.calcgrad(currents, threshold) - return threshold, gradidx - """ - def analyze_states(self, charges, trigidx, ua_func): u""" Split log data into states and transitions and return duration, energy, and mean power for each element. @@ -3380,30 +1903,6 @@ class MIMOSA: for idx in trigger_indices: range_raw = charges[previdx:idx] range_ua = ua_func(range_raw) - substates = {} - - if previdx != 0 and idx - previdx > 200: - thr, subst = 0, [] # self.gradfoo(range_ua) - if len(subst): - statelist = [] - prevsubidx = 0 - for subidx in subst: - statelist.append( - { - "duration": (subidx - prevsubidx) * 10, - "uW_mean": np.mean( - range_ua[prevsubidx:subidx] * self.voltage - ), - "uW_std": np.std( - range_ua[prevsubidx:subidx] * self.voltage - ), - } - ) - prevsubidx = subidx - substates = { - "threshold": thr, - "states": statelist, - } isa = "state" if not is_state: @@ -3422,12 +1921,6 @@ class MIMOSA: if self.with_traces: data["uW"] = range_ua * self.voltage - if "states" in substates: - data["substates"] = substates - ssum = np.sum(list(map(lambda x: x["duration"], substates["states"]))) - if ssum != data["us"]: - vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum)) - if isa == "transition": # subtract average power of previous state # (that is, the state from which this transition originates) diff --git a/lib/model.py b/lib/model.py new file mode 100644 index 0000000..bb4a45b --- /dev/null +++ b/lib/model.py @@ -0,0 +1,1156 @@ +#!/usr/bin/env python3 + +import logging +import numpy as np +from scipy import optimize +from sklearn.metrics import r2_score +from multiprocessing import Pool +from .automata import PTA +from .functions import analytic +from .functions import AnalyticFunction +from .parameters import ParamStats +from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple +from .utils import by_name_to_by_param, match_parameter_values + +logger = logging.getLogger(__name__) +arg_support_enabled = True + + +def aggregate_measures(aggregate: float, actual: list) -> dict: + """ + Calculate error measures for model value on data list. + + arguments: + aggregate -- model value (float or int) + actual -- real-world / reference values (list of float or int) + + return value: + See regression_measures + """ + aggregate_array = np.array([aggregate] * len(actual)) + return regression_measures(aggregate_array, np.array(actual)) + + +def regression_measures(predicted: np.ndarray, actual: np.ndarray): + """ + Calculate error measures by comparing model values to reference values. 
+
+    arguments:
+    predicted -- model values (np.ndarray)
+    actual -- real-world / reference values (np.ndarray)
+
+    Returns a dict containing the following measures:
+    mae -- Mean Absolute Error
+    mape -- Mean Absolute Percentage Error,
+            if all items in actual are non-zero (NaN otherwise)
+    smape -- Symmetric Mean Absolute Percentage Error,
+             if no 0,0-pairs are present in actual and predicted (NaN otherwise)
+    msd -- Mean Square Deviation
+    rmsd -- Root Mean Square Deviation
+    ssr -- Sum of Squared Residuals
+    rsq -- R^2 measure, see sklearn.metrics.r2_score
+    count -- Number of values
+    """
+    if type(predicted) != np.ndarray:
+        raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
+    if type(actual) != np.ndarray:
+        raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
+    deviations = predicted - actual
+    # mean = np.mean(actual)
+    if len(deviations) == 0:
+        return {}
+    measures = {
+        "mae": np.mean(np.abs(deviations), dtype=np.float64),
+        "msd": np.mean(deviations ** 2, dtype=np.float64),
+        "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
+        "ssr": np.sum(deviations ** 2, dtype=np.float64),
+        "rsq": r2_score(actual, predicted),
+        "count": len(actual),
+    }
+
+    # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
+
+    if np.all(actual != 0):
+        measures["mape"] = np.mean(np.abs(deviations / actual)) * 100  # bad measure
+    else:
+        measures["mape"] = np.nan
+    if np.all(np.abs(predicted) + np.abs(actual) != 0):
+        measures["smape"] = (
+            np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
+            * 100
+        )
+    else:
+        measures["smape"] = np.nan
+    # if np.all(rsq_quotient != 0):
+    #     measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
+
+    return measures
+
+
+class ParallelParamFit:
+    """
+    Fit a set of functions on parameterized measurements.
+
+    One parameter is variable, all others are fixed. Reports the best-fitting
+    function type for each parameter.
+    """
+
+    def __init__(self, by_param):
+        """Create a new ParallelParamFit object."""
+        self.fit_queue = []
+        self.by_param = by_param
+
+    def enqueue(
+        self,
+        state_or_tran,
+        attribute,
+        param_index,
+        param_name,
+        safe_functions_enabled=False,
+        param_filter=None,
+    ):
+        """
+        Add state_or_tran/attribute/param_name to the fit queue.
+
+        This causes fit() to compute the best-fitting function for this model part.
+        """
+        self.fit_queue.append(
+            {
+                "key": [state_or_tran, attribute, param_name, param_filter],
+                "args": [
+                    self.by_param,
+                    state_or_tran,
+                    attribute,
+                    param_index,
+                    safe_functions_enabled,
+                    param_filter,
+                ],
+            }
+        )
+
+    def fit(self):
+        """
+        Fit functions on previously enqueued data.
+
+        Fitting is done in parallel with one process per core.
+
+        Results can be accessed using the public ParallelParamFit.results object.
+        """
+        with Pool() as pool:
+            self.results = pool.map(_try_fits_parallel, self.fit_queue)
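+
+    # Usage sketch (hypothetical data, not part of the original module): fit
+    # the 'duration' attribute of a single 'TX' entry as a function of
+    # parameter #0, here called 'txbytes'.
+    #
+    #     by_param = {
+    #         ("TX", (10,)): {"param": [[10], [10]], "duration": [100, 102]},
+    #         ("TX", (20,)): {"param": [[20], [20]], "duration": [200, 201]},
+    #         ("TX", (30,)): {"param": [[30], [30]], "duration": [300, 303]},
+    #     }
+    #     paramfit = ParallelParamFit(by_param)
+    #     paramfit.enqueue("TX", "duration", 0, "txbytes")
+    #     paramfit.fit()
+    #     fit_result = paramfit.get_result("TX", "duration")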
+
+    def get_result(self, name, attribute, param_filter: dict = None):
+        """
+        Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
+
+        Filters out results where the best function is worse (or not much better than) static mean/median estimates.
+
+        :param name: state/transition/... name, e.g. 'TX'
+        :param attribute: model attribute, e.g. 'duration'
+        :param param_filter: only consider results fitted with this param_filter (as passed to `enqueue`); None matches results fitted without a filter
+        :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
+        """
+        fit_result = dict()
+        for result in self.results:
+            if (
+                result["key"][0] == name
+                and result["key"][1] == attribute
+                and result["key"][3] == param_filter
+                and result["result"]["best"] is not None
+            ):  # probably caused by ['best'] != None -- does the fit fail for filtered data?
+                this_result = result["result"]
+                if this_result["best_rmsd"] >= min(
+                    this_result["mean_rmsd"], this_result["median_rmsd"]
+                ):
+                    logger.debug(
+                        "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+                            name,
+                            attribute,
+                            result["key"][2],
+                            this_result["best_rmsd"],
+                            this_result["mean_rmsd"],
+                            this_result["median_rmsd"],
+                        )
+                    )
+                # See notes on depends_on_param
+                elif this_result["best_rmsd"] >= 0.8 * min(
+                    this_result["mean_rmsd"], this_result["median_rmsd"]
+                ):
+                    logger.debug(
+                        "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+                            name,
+                            attribute,
+                            result["key"][2],
+                            this_result["best_rmsd"],
+                            this_result["mean_rmsd"],
+                            this_result["median_rmsd"],
+                        )
+                    )
+                else:
+                    fit_result[result["key"][2]] = this_result
+        return fit_result
+
+
+def _try_fits_parallel(arg):
+    """
+    Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
+
+    Must be a global function as it is called from a multiprocessing Pool.
+    """
+    return {"key": arg["key"], "result": _try_fits(*arg["args"])}
+
+
+def _try_fits(
+    by_param,
+    state_or_tran,
+    model_attribute,
+    param_index,
+    safe_functions_enabled=False,
+    param_filter: dict = None,
+):
+    """
+    Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
+
+    This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
+    The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
+    Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
+    Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
+
+    :returns: a dictionary with the following elements:
+        best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
+        best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
+        mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
+        median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
+        results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
+
+    :param by_param: measurements partitioned by state/transition/... name and parameter values.
+        Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
+
+    :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
+        Example: `'foo'`
+
+    :param model_attribute: attribute for which goodness-of-fit will be calculated.
+        Example: `'bar'`
+
+    :param param_index: index of the parameter used as model input
+    :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
+    :param param_filter: Only use measurements whose parameters match param_filter for fitting.
+    """
+
+    functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
+
+    for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
+        # We might remove elements from 'functions' while iterating over
+        # its keys. A generator will not allow this, so we need to
+        # convert to a list.
+        function_names = list(functions.keys())
+        for function_name in function_names:
+            function_object = functions[function_name]
+            if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
+                param_key[1][param_index]
+            ):
+                functions.pop(function_name, None)
+
+    raw_results = dict()
+    raw_results_by_param = dict()
+    ref_results = {"mean": list(), "median": list()}
+    results = dict()
+    results_by_param = dict()
+
+    seen_parameter_combinations = set()
+
+    # for each parameter combination:
+    for param_key in filter(
+        lambda x: x[0] == state_or_tran
+        and remove_index_from_tuple(x[1], param_index)
+        not in seen_parameter_combinations
+        and len(by_param[x]["param"])
+        and match_parameter_values(by_param[x]["param"][0], param_filter),
+        by_param.keys(),
+    ):
+        X = []
+        Y = []
+        num_valid = 0
+        num_total = 0
+
+        # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
+        # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occurring combinations of non-param_index parameters
+        seen_parameter_combinations.add(
+            remove_index_from_tuple(param_key[1], param_index)
+        )
+
+        # for each value of the parameter denoted by param_index (all other parameters remain the same):
+        for k, v in filter(
+            lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
+        ):
+            num_total += 1
+            if is_numeric(k[1][param_index]):
+                num_valid += 1
+                X.extend([float(k[1][param_index])] * len(v[model_attribute]))
+                Y.extend(v[model_attribute])
+
+        if num_valid > 2:
+            X = np.array(X)
+            Y = np.array(Y)
+            other_parameters = remove_index_from_tuple(k[1], param_index)
+            raw_results_by_param[other_parameters] = dict()
+            results_by_param[other_parameters] = dict()
+            for function_name, param_function in functions.items():
+                if function_name not in raw_results:
+                    raw_results[function_name] = dict()
+                error_function = param_function.error_function
+                res = optimize.least_squares(
+                    error_function, [0, 1], args=(X, Y), xtol=2e-15
+                )
+                measures = regression_measures(param_function.eval(res.x, X), Y)
+                raw_results_by_param[other_parameters][function_name] = measures
+                for measure, error_rate in measures.items():
+                    if measure not in raw_results[function_name]:
+                        raw_results[function_name][measure] = list()
+                    raw_results[function_name][measure].append(error_rate)
+                # print(function_name, res, measures)
+            mean_measures = aggregate_measures(np.mean(Y), Y)
+            ref_results["mean"].append(mean_measures["rmsd"])
+            raw_results_by_param[other_parameters]["mean"] = mean_measures
+            median_measures = aggregate_measures(np.median(Y), Y)
+            ref_results["median"].append(median_measures["rmsd"])
+            raw_results_by_param[other_parameters]["median"] = median_measures
+
+    if not len(ref_results["mean"]):
+        # Insufficient data for fitting
+        # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
+        return {"best": None, "best_rmsd": np.inf, "results": results}
+
+    for (
+        other_parameter_combination,
+        other_parameter_results,
+    ) in raw_results_by_param.items():
+        best_fit_val = np.inf
+        best_fit_name = None
+        results = dict()
+        for function_name, result in other_parameter_results.items():
+            if len(result) > 0:
+                results[function_name] = result
+                rmsd = result["rmsd"]
+                if rmsd < best_fit_val:
+                    best_fit_val = rmsd
+                    best_fit_name = function_name
+        results_by_param[other_parameter_combination] = {
+            "best": best_fit_name,
+            "best_rmsd": best_fit_val,
+            "mean_rmsd": results["mean"]["rmsd"],
+            "median_rmsd": results["median"]["rmsd"],
+            "results": results,
+        }
+
+    best_fit_val = np.inf
+    best_fit_name = None
+    results = dict()
+    for function_name, result in raw_results.items():
+        if len(result) > 0:
+            results[function_name] = {}
+            for measure in result.keys():
+                results[function_name][measure] = np.mean(result[measure])
+            rmsd = results[function_name]["rmsd"]
+            if rmsd < best_fit_val:
+                best_fit_val = rmsd
+                best_fit_name = function_name
+
+    return {
+        "best": best_fit_name,
+        "best_rmsd": best_fit_val,
+        "mean_rmsd": np.mean(ref_results["mean"]),
+        "median_rmsd": np.mean(ref_results["median"]),
+        "results": results,
+        "results_by_other_param": results_by_param,
+    }
+
+
+def _num_args_from_by_name(by_name):
+    num_args = dict()
+    for key, value in by_name.items():
+        if "args" in value:
+            num_args[key] = len(value["args"][0])
+    return num_args
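+
+
+# Usage sketch for AnalyticModel (hypothetical measurement data; the names
+# "TX" and "txbytes" are illustrative, not part of this module):
+#
+#     by_name = {
+#         "TX": {
+#             "duration": [120, 226, 333],
+#             "attributes": ["duration"],
+#             "param": [[1], [2], [3]],
+#         }
+#     }
+#     model = AnalyticModel(by_name, ["txbytes"])
+#     static_model = model.get_static()
+#     static_model("TX", "duration")  # -> 226 (median of all measurements)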
+
+
+class AnalyticModel:
+    u"""
+    Parameter-aware analytic energy/data size/... model.
+
+    Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
+
+    The model relies on two internal data structures, by_name and by_param.
+    These provide measurements aggregated by (function/state/...) name
+    and (for by_param) parameter values. Layout:
+    dictionary with one key per name ('send', 'TX', ...) or
+    one key per name and parameter combination
+    (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+    Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+    Each element is in turn a dict with the following elements:
+    - param: list of parameter values in each measurement (-> list of lists)
+    - attributes: list of keys that should be analyzed,
+      e.g. ['power', 'duration']
+    - for each attribute mentioned in 'attributes': A list with measurements.
+      All lists except for 'attributes' must have the same length.
+
+    For example:
+    parameters = ['foo_count', 'irrelevant']
+    by_name = {
+        'send': {
+            'foo' : [1, 1, 2],
+            'bar' : [5, 6, 7],
+            'attributes' : ['foo', 'bar'],
+            'param' : [[1, 0], [1, 0], [2, 0]]
+        }
+    }
+
+    methods:
+    get_static -- return static (parameter-unaware) model.
+    get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
+    get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
+    assess -- calculate model quality
+
+    variables:
+    names -- function/state/... names (i.e., the keys of by_name)
+    parameters -- parameter names
+    stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
+    """
+
+    def __init__(
+        self,
+        by_name,
+        parameters,
+        arg_count=None,
+        function_override=dict(),
+        use_corrcoef=False,
+    ):
+        """
+        Create a new AnalyticModel and compute parameter statistics.
+
+        :param by_name: measurements aggregated by (function/state/...) name.
+            Layout: dictionary with one key per name ('send', 'TX', ...) or
+            one key per name and parameter combination
+            (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+            Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+            Each element is in turn a dict with the following elements:
+            - param: list of parameter values in each measurement (-> list of lists)
+            - attributes: list of keys that should be analyzed,
+              e.g. ['power', 'duration']
+            - for each attribute mentioned in 'attributes': A list with measurements.
+              All lists except for 'attributes' must have the same length.
+
+            For example:
+            parameters = ['foo_count', 'irrelevant']
+            by_name = {
+                'send': {
+                    'foo' : [1, 1, 2],
+                    'duration' : [5, 6, 7],
+                    'attributes' : ['foo', 'duration'],
+                    'param' : [[1, 0], [1, 0], [2, 0]]
+                    # foo_count-^  ^-irrelevant
+                }
+            }
+        :param parameters: List of parameter names
+        :param arg_count: dict mapping name -> number of function arguments, if any.
+            If None (the default), it is derived from each by_name entry's 'args' element, where present.
+        :param function_override: dict of overrides for automatic parameter function generation.
+            If (state or transition name, model attribute) is present in function_override,
+            the corresponding text string is the function used for analytic (parameter-aware/fitted)
+            modeling of this attribute. It is passed to AnalyticFunction, see
+            there for the required format. Note that this happens regardless of
+            parameter dependency detection: The provided analytic function will be assigned
+            even if it seems like the model attribute is static / parameter-independent.
+        :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
+        """
+        self.cache = dict()
+        self.by_name = by_name
+        self.by_param = by_name_to_by_param(by_name)
+        self.names = sorted(by_name.keys())
+        self.parameters = sorted(parameters)
+        self.function_override = function_override.copy()
+        self._use_corrcoef = use_corrcoef
+        self._num_args = arg_count
+        if self._num_args is None:
+            self._num_args = _num_args_from_by_name(by_name)
+
+        self.stats = ParamStats(
+            self.by_name,
+            self.by_param,
+            self.parameters,
+            self._num_args,
+            use_corrcoef=use_corrcoef,
+        )
+
+    def _get_model_from_dict(self, model_dict, model_function):
+        model = {}
+        for name, elem in model_dict.items():
+            model[name] = {}
+            for key in elem["attributes"]:
+                try:
+                    model[name][key] = model_function(elem[key])
+                except RuntimeWarning:
+                    logger.warning("Got no data for {} {}".format(name, key))
+                except FloatingPointError as fpe:
+                    logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
+        return model
+
+    def param_index(self, param_name):
+        if param_name in self.parameters:
+            return self.parameters.index(param_name)
+        return len(self.parameters) + int(param_name)
+
+    def param_name(self, param_index):
+        if param_index < len(self.parameters):
+            return self.parameters[param_index]
+        return str(param_index)
+
+    def get_static(self, use_mean=False):
+        """
+        Get static model function: name, attribute -> model value.
+
+        Uses the median of by_name for modeling, unless `use_mean` is set.
+        """
+        getter_function = np.median
+
+        if use_mean:
+            getter_function = np.mean
+
+        static_model = self._get_model_from_dict(self.by_name, getter_function)
+
+        def static_model_getter(name, key, **kwargs):
+            return static_model[name][key]
+
+        return static_model_getter
+
+    def get_param_lut(self, fallback=False):
+        """
+        Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
+
+        The function can only give model values for parameter combinations
+        present in by_param. By default, it raises KeyError for other values.
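+
+        Example (hypothetical usage; the names 'TX', 'power' and the parameter
+        values are illustrative, not part of this module):
+
+            lut = model.get_param_lut()
+            lut('TX', 'power', param=[19, 23])  # median of all matching measurements
+            lut('TX', 'power', param=[19, 42])  # KeyError if combination is unknown
+            lut = model.get_param_lut(fallback=True)
+            lut('TX', 'power', param=[19, 42])  # falls back to the static model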
+
+        arguments:
+        fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
+        """
+        static_model = self._get_model_from_dict(self.by_name, np.median)
+        lut_model = self._get_model_from_dict(self.by_param, np.median)
+
+        def lut_median_getter(name, key, param, arg=[], **kwargs):
+            param.extend(map(soft_cast_int, arg))
+            try:
+                return lut_model[(name, tuple(param))][key]
+            except KeyError:
+                if fallback:
+                    return static_model[name][key]
+                raise
+
+        return lut_median_getter
+
+    def get_fitted(self, safe_functions_enabled=False):
+        """
+        Get parameter-aware model function and model information function.
+
+        Returns two functions:
+        model_function(name, attribute, param=parameter values) -> model value.
+        model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
+        """
+        if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+            return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
+
+        static_model = self._get_model_from_dict(self.by_name, np.median)
+        param_model = dict([[name, {}] for name in self.by_name.keys()])
+        paramfit = ParallelParamFit(self.by_param)
+
+        for name in self.by_name.keys():
+            for attribute in self.by_name[name]["attributes"]:
+                for param_index, param in enumerate(self.parameters):
+                    if self.stats.depends_on_param(name, attribute, param):
+                        paramfit.enqueue(name, attribute, param_index, param, False)
+                if arg_support_enabled and name in self._num_args:
+                    for arg_index in range(self._num_args[name]):
+                        if self.stats.depends_on_arg(name, attribute, arg_index):
+                            paramfit.enqueue(
+                                name,
+                                attribute,
+                                len(self.parameters) + arg_index,
+                                arg_index,
+                                False,
+                            )
+
+        paramfit.fit()
+
+        for name in self.by_name.keys():
+            num_args = 0
+            if name in self._num_args:
+                num_args = self._num_args[name]
+            for attribute in self.by_name[name]["attributes"]:
+                fit_result = paramfit.get_result(name, attribute)
+
+                if (name, attribute) in self.function_override:
+                    function_str = self.function_override[(name, attribute)]
+                    x = AnalyticFunction(function_str, self.parameters, num_args)
+                    x.fit(self.by_param, name, attribute)
+                    if x.fit_success:
+                        param_model[name][attribute] = {
+                            "fit_result": fit_result,
+                            "function": x,
+                        }
+                elif len(fit_result.keys()):
+                    x = analytic.function_powerset(
+                        fit_result, self.parameters, num_args
+                    )
+                    x.fit(self.by_param, name, attribute)
+
+                    if x.fit_success:
+                        param_model[name][attribute] = {
+                            "fit_result": fit_result,
+                            "function": x,
+                        }
+
+        def model_getter(name, key, **kwargs):
+            if "arg" in kwargs and "param" in kwargs:
+                kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
+            if key in param_model[name]:
+                param_list = kwargs["param"]
+                param_function = param_model[name][key]["function"]
+                if param_function.is_predictable(param_list):
+                    return param_function.eval(param_list)
+            return static_model[name][key]
+
+        def info_getter(name, key):
+            if key in param_model[name]:
+                return param_model[name][key]
+            return None
+
+        self.cache["fitted_model_getter"] = model_getter
+        self.cache["fitted_info_getter"] = info_getter
+
+        return model_getter, info_getter
+
+    def assess(self, model_function):
+        """
+        Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
+
+        state/transition/... name and parameter values are fed into model_function.
+        The by_name entries of this AnalyticModel are used as ground truth and
+        compared with the values predicted by model_function.
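+
+        Example (hypothetical; assumes an AnalyticModel instance `model` with a
+        'TX' entry and a 'power' attribute, as in the sketches above):
+
+            param_model, param_info = model.get_fitted()
+            errors = model.assess(param_model)
+            errors['by_name']['TX']['power']['smape']  # symmetric % error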
+ + For proper model assessments, the data used to generate model_function + and the data fed into this AnalyticModel instance must be mutually + exclusive (e.g. by performing cross validation). Otherwise, + overfitting cannot be detected. + """ + detailed_results = {} + for name, elem in sorted(self.by_name.items()): + detailed_results[name] = {} + for attribute in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function( + name, attribute, param=elem["param"][i] + ), + range(len(elem[attribute])), + ) + ) + ) + measures = regression_measures(predicted_data, elem[attribute]) + detailed_results[name][attribute] = measures + + return {"by_name": detailed_results} + + def to_json(self): + # TODO + pass + + +class PTAModel: + u""" + Parameter-aware PTA-based energy model. + + Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence. + + The model heavily relies on two internal data structures: + PTAModel.by_name and PTAModel.by_param. + + These provide measurements aggregated by state/transition name + and (for by_param) parameter values. Layout: + dictionary with one key per state/transition ('send', 'TX', ...) or + one key per state/transition and parameter combination + (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). + For by_param, parameter values are ordered corresponding to the lexically sorted parameter names. + + Each element is in turn a dict with the following elements: + - isa: 'state' or 'transition' + - power: list of mean power measurements in µW + - duration: list of durations in µs + - power_std: list of stddev of power per state/transition + - energy: consumed energy (power*duration) in pJ + - paramkeys: list of parameter names in each measurement (-> list of lists) + - param: list of parameter values in each measurement (-> list of lists) + - attributes: list of keys that should be analyzed, + e.g. ['power', 'duration'] + additionally, only if isa == 'transition': + - timeout: list of duration of previous state in µs + - rel_energy_prev: transition energy relative to previous state mean power in pJ + - rel_energy_next: transition energy relative to next state mean power in pJ + """ + + def __init__( + self, + by_name, + parameters, + arg_count, + traces=[], + ignore_trace_indexes=[], + function_override={}, + use_corrcoef=False, + pta=None, + ): + """ + Prepare a new PTA energy model. + + Actual model generation is done on-demand by calling the respective functions. + + arguments: + by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate. + parameters -- list of parameter names, as returned by pta_trace_to_aggregate + arg_count -- function arguments, as returned by pta_trace_to_aggregate + traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data() + ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored. + function_override -- dict of overrides for automatic parameter function generation. + If (state or transition name, model attribute) is present in function_override, + the corresponding text string is the function used for analytic (parameter-aware/fitted) + modeling of this attribute. It is passed to AnalyticFunction, see + there for the required format. Note that this happens regardless of + parameter dependency detection: The provided analytic function will be assigned + even if it seems like the model attribute is static / parameter-independent. 
+ use_corrcoef -- use correlation coefficient instead of stddev comparison + to detect whether a model attribute depends on a parameter + pta -- hardware model as `PTA` object + """ + self.by_name = by_name + self.by_param = by_name_to_by_param(by_name) + self._parameter_names = sorted(parameters) + self._num_args = arg_count + self._use_corrcoef = use_corrcoef + self.traces = traces + self.stats = ParamStats( + self.by_name, + self.by_param, + self._parameter_names, + self._num_args, + self._use_corrcoef, + ) + self.cache = {} + np.seterr("raise") + self.function_override = function_override.copy() + self.pta = pta + self.ignore_trace_indexes = ignore_trace_indexes + self._aggregate_to_ndarray(self.by_name) + + def _aggregate_to_ndarray(self, aggregate): + for elem in aggregate.values(): + for key in elem["attributes"]: + elem[key] = np.array(elem[key]) + + # This heuristic is very similar to the "function is not much better than + # median" checks in get_fitted. So far, doing it here as well is mostly + # a performance and not an algorithm quality decision. + # --df, 2018-04-18 + def depends_on_param(self, state_or_trans, key, param): + return self.stats.depends_on_param(state_or_trans, key, param) + + # See notes on depends_on_param + def depends_on_arg(self, state_or_trans, key, param): + return self.stats.depends_on_arg(state_or_trans, key, param) + + def _get_model_from_dict(self, model_dict, model_function): + model = {} + for name, elem in model_dict.items(): + model[name] = {} + for key in elem["attributes"]: + try: + model[name][key] = model_function(elem[key]) + except RuntimeWarning: + logger.warning("Got no data for {} {}".format(name, key)) + except FloatingPointError as fpe: + logger.warning("Got no data for {} {}: {}".format(name, key, fpe)) + return model + + def get_static(self, use_mean=False): + """ + Get static model function: name, attribute -> model value. + + Uses the median of by_name for modeling, unless `use_mean` is set. + """ + getter_function = np.median + + if use_mean: + getter_function = np.mean + + static_model = self._get_model_from_dict(self.by_name, getter_function) + + def static_model_getter(name, key, **kwargs): + return static_model[name][key] + + return static_model_getter + + def get_param_lut(self, fallback=False): + """ + Get parameter-look-up-table model function: name, attribute, parameter values -> model value. + + The function can only give model values for parameter combinations + present in by_param. By default, it raises KeyError for other values. + + arguments: + fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values + """ + static_model = self._get_model_from_dict(self.by_name, np.median) + lut_model = self._get_model_from_dict(self.by_param, np.median) + + def lut_median_getter(name, key, param, arg=[], **kwargs): + param.extend(map(soft_cast_int, arg)) + try: + return lut_model[(name, tuple(param))][key] + except KeyError: + if fallback: + return static_model[name][key] + raise + + return lut_median_getter + + def param_index(self, param_name): + if param_name in self._parameter_names: + return self._parameter_names.index(param_name) + return len(self._parameter_names) + int(param_name) + + def param_name(self, param_index): + if param_index < len(self._parameter_names): + return self._parameter_names[param_index] + return str(param_index) + + def get_fitted(self, safe_functions_enabled=False): + """ + Get parameter-aware model function and model information function. 
+ + Returns two functions: + model_function(name, attribute, param=parameter values) -> model value. + model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None + """ + if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: + return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] + + static_model = self._get_model_from_dict(self.by_name, np.median) + param_model = dict( + [[state_or_tran, {}] for state_or_tran in self.by_name.keys()] + ) + paramfit = ParallelParamFit(self.by_param) + for state_or_tran in self.by_name.keys(): + for model_attribute in self.by_name[state_or_tran]["attributes"]: + fit_results = {} + for parameter_index, parameter_name in enumerate(self._parameter_names): + if self.depends_on_param( + state_or_tran, model_attribute, parameter_name + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + parameter_index, + parameter_name, + safe_functions_enabled, + ) + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): + for arg_index in range(self._num_args[state_or_tran]): + if self.depends_on_arg( + state_or_tran, model_attribute, arg_index + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + len(self._parameter_names) + arg_index, + arg_index, + safe_functions_enabled, + ) + paramfit.fit() + + for state_or_tran in self.by_name.keys(): + num_args = 0 + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): + num_args = self._num_args[state_or_tran] + for model_attribute in self.by_name[state_or_tran]["attributes"]: + fit_results = paramfit.get_result(state_or_tran, model_attribute) + + if (state_or_tran, model_attribute) in self.function_override: + function_str = self.function_override[ + (state_or_tran, model_attribute) + ] + x = AnalyticFunction(function_str, self._parameter_names, num_args) + x.fit(self.by_param, state_or_tran, model_attribute) + if x.fit_success: + param_model[state_or_tran][model_attribute] = { + "fit_result": fit_results, + "function": x, + } + elif len(fit_results.keys()): + x = analytic.function_powerset( + fit_results, self._parameter_names, num_args + ) + x.fit(self.by_param, state_or_tran, model_attribute) + if x.fit_success: + param_model[state_or_tran][model_attribute] = { + "fit_result": fit_results, + "function": x, + } + + def model_getter(name, key, **kwargs): + if "arg" in kwargs and "param" in kwargs: + kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) + if key in param_model[name]: + param_list = kwargs["param"] + param_function = param_model[name][key]["function"] + if param_function.is_predictable(param_list): + return param_function.eval(param_list) + return static_model[name][key] + + def info_getter(name, key): + if key in param_model[name]: + return param_model[name][key] + return None + + self.cache["fitted_model_getter"] = model_getter + self.cache["fitted_info_getter"] = info_getter + + return model_getter, info_getter + + def to_json(self): + static_model = self.get_static() + static_quality = self.assess(static_model) + param_model, param_info = self.get_fitted() + analytic_quality = self.assess(param_model) + pta = self.pta + if pta is None: + pta = PTA(self.states(), parameters=self._parameter_names) + pta.update( + static_model, + param_info, + static_error=static_quality["by_name"], + analytic_error=analytic_quality["by_name"], + ) + return pta.to_json() + + def states(self): + """Return sorted list of state names.""" + return sorted( + list( + filter(lambda k: 
self.by_name[k]["isa"] == "state", self.by_name.keys()) + ) + ) + + def transitions(self): + """Return sorted list of transition names.""" + return sorted( + list( + filter( + lambda k: self.by_name[k]["isa"] == "transition", + self.by_name.keys(), + ) + ) + ) + + def states_and_transitions(self): + """Return list of states and transition names.""" + ret = self.states() + ret.extend(self.transitions()) + return ret + + def parameters(self): + return self._parameter_names + + def attributes(self, state_or_trans): + return self.by_name[state_or_trans]["attributes"] + + def assess(self, model_function): + """ + Calculate MAE, SMAPE, etc. of model_function for each by_name entry. + + state/transition/... name and parameter values are fed into model_function. + The by_name entries of this PTAModel are used as ground truth and + compared with the values predicted by model_function. + + For proper model assessments, the data used to generate model_function + and the data fed into this AnalyticModel instance must be mutually + exclusive (e.g. by performing cross validation). Otherwise, + overfitting cannot be detected. + """ + detailed_results = {} + for name, elem in sorted(self.by_name.items()): + detailed_results[name] = {} + for key in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function(name, key, param=elem["param"][i]), + range(len(elem[key])), + ) + ) + ) + measures = regression_measures(predicted_data, elem[key]) + detailed_results[name][key] = measures + + return {"by_name": detailed_results} + + def assess_states( + self, model_function, model_attribute="power", distribution: dict = None + ): + """ + Calculate overall model error assuming equal distribution of states + """ + # TODO calculate mean power draw for distribution and use it to + # calculate relative error from MAE combination + model_quality = self.assess(model_function) + num_states = len(self.states()) + if distribution is None: + distribution = dict(map(lambda x: [x, 1 / num_states], self.states())) + + if not np.isclose(sum(distribution.values()), 1): + raise ValueError( + "distribution must be a probability distribution with sum 1" + ) + + # total_value = None + # try: + # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states())) + # except KeyError: + # pass + + total_error = np.sqrt( + sum( + map( + lambda x: np.square( + model_quality["by_name"][x][model_attribute]["mae"] + * distribution[x] + ), + self.states(), + ) + ) + ) + return total_error + + def assess_on_traces(self, model_function): + """ + Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance. + + :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`. + Each entry holds regression measures for the corresponding measure. 
Note that the determined model quality heavily depends on the + traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states + """ + model_energy_list = [] + real_energy_list = [] + model_rel_energy_list = [] + model_state_energy_list = [] + model_duration_list = [] + real_duration_list = [] + model_timeout_list = [] + real_timeout_list = [] + + for trace in self.traces: + if trace["id"] not in self.ignore_trace_indexes: + for rep_id in range(len(trace["trace"][0]["offline"])): + model_energy = 0.0 + real_energy = 0.0 + model_rel_energy = 0.0 + model_state_energy = 0.0 + model_duration = 0.0 + real_duration = 0.0 + model_timeout = 0.0 + real_timeout = 0.0 + for i, trace_part in enumerate(trace["trace"]): + name = trace_part["name"] + prev_name = trace["trace"][i - 1]["name"] + isa = trace_part["isa"] + if name != "UNINITIALIZED": + try: + param = trace_part["offline_aggregates"]["param"][ + rep_id + ] + prev_param = trace["trace"][i - 1][ + "offline_aggregates" + ]["param"][rep_id] + power = trace_part["offline"][rep_id]["uW_mean"] + duration = trace_part["offline"][rep_id]["us"] + prev_duration = trace["trace"][i - 1]["offline"][ + rep_id + ]["us"] + real_energy += power * duration + if isa == "state": + model_energy += ( + model_function(name, "power", param=param) + * duration + ) + else: + model_energy += model_function( + name, "energy", param=param + ) + # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data + if i == 1: + model_rel_energy += model_function( + name, "energy", param=param + ) + else: + model_rel_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_state_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_rel_energy += model_function( + name, "rel_energy_prev", param=param + ) + real_duration += duration + model_duration += model_function( + name, "duration", param=param + ) + if ( + "plan" in trace_part + and trace_part["plan"]["level"] == "epilogue" + ): + real_timeout += trace_part["offline"][rep_id][ + "timeout" + ] + model_timeout += model_function( + name, "timeout", param=param + ) + except KeyError: + # if states/transitions have been removed via --filter-param, this is harmless + pass + real_energy_list.append(real_energy) + model_energy_list.append(model_energy) + model_rel_energy_list.append(model_rel_energy) + model_state_energy_list.append(model_state_energy) + real_duration_list.append(real_duration) + model_duration_list.append(model_duration) + real_timeout_list.append(real_timeout) + model_timeout_list.append(model_timeout) + + return { + "duration_by_trace": regression_measures( + np.array(model_duration_list), np.array(real_duration_list) + ), + "energy_by_trace": regression_measures( + np.array(model_energy_list), np.array(real_energy_list) + ), + "timeout_by_trace": regression_measures( + np.array(model_timeout_list), np.array(real_timeout_list) + ), + "rel_energy_by_trace": regression_measures( + np.array(model_rel_energy_list), np.array(real_energy_list) + ), + "state_energy_by_trace": regression_measures( + np.array(model_state_energy_list), np.array(real_energy_list) + ), + } diff --git a/lib/parameters.py b/lib/parameters.py index 8b562b6..5c6b978 100644 --- a/lib/parameters.py +++ b/lib/parameters.py @@ -1,11 +1,15 @@ import itertools +import logging import numpy as np +import warnings from collections import OrderedDict from 
copy import deepcopy from multiprocessing import Pool from .utils import remove_index_from_tuple, is_numeric from .utils import filter_aggregate_by_param, by_name_to_by_param +logger = logging.getLogger(__name__) + def distinct_param_values(by_name, state_or_tran): """ @@ -78,25 +82,7 @@ def _reduce_param_matrix(matrix: np.ndarray, parameter_names: list) -> list: return list() -def _codependent_parameters(param, lut_by_param_values, std_by_param_values): - """ - Return list of parameters which affect whether a parameter affects a model attribute or not. - """ - return list() - safe_div = np.vectorize(lambda x, y: 0.0 if x == 0 else 1 - x / y) - ratio_by_value = safe_div(lut_by_param_values, std_by_param_values) - err_mode = np.seterr("ignore") - dep_by_value = ratio_by_value > 0.5 - np.seterr(**err_mode) - - other_param_list = list(filter(lambda x: x != param, self._parameter_names)) - influencer_parameters = _reduce_param_matrix(dep_by_value, other_param_list) - return influencer_parameters - - -def _std_by_param( - by_param, all_param_values, state_or_tran, attribute, param_index, verbose=False -): +def _std_by_param(by_param, all_param_values, state_or_tran, attribute, param_index): u""" Calculate standard deviations for a static model where all parameters but `param_index` are constant. @@ -162,12 +148,11 @@ def _std_by_param( # vprint(verbose, '[W] parameter value partition for {} is empty'.format(param_value)) if np.all(np.isnan(stddev_matrix)): - print( - "[W] {}/{} parameter #{} has no data partitions -- how did this even happen?".format( - state_or_tran, attribute, param_index + warnings.warn( + "{}/{} parameter #{} has no data partitions. stddev_matrix = {}".format( + state_or_tran, attribute, param_index, stddev_matrix ) ) - print("stddev_matrix = {}".format(stddev_matrix)) return stddev_matrix, 0.0 return ( @@ -202,13 +187,13 @@ def _corr_by_param(by_name, state_or_trans, attribute, param_index): # -> assume no correlation return 0.0 except ValueError: - print( - "[!] Exception in _corr_by_param(by_name, state_or_trans={}, attribute={}, param_index={})".format( + logger.error( + "ValueError in _corr_by_param(by_name, state_or_trans={}, attribute={}, param_index={})".format( state_or_trans, attribute, param_index ) ) - print( - "[!] while executing np.corrcoef(by_name[{}][{}]={}, {}))".format( + logger.error( + "while executing np.corrcoef(by_name[{}][{}]={}, {}))".format( state_or_trans, attribute, by_name[state_or_trans][attribute], @@ -229,7 +214,6 @@ def _compute_param_statistics( attribute, distinct_values, distinct_values_by_param_index, - verbose=False, ): """ Compute standard deviation and correlation coefficient for various data partitions. @@ -252,7 +236,6 @@ def _compute_param_statistics( :param arg_count: dict providing the number of function args ("local parameters") for each function. :param state_or_trans: state or transition name, e.g. 'send' or 'TX' :param attribute: model attribute, e.g. 'power' or 'duration' - :param verbose: print warning if some parameter partitions are too small for fitting :returns: a dict with the following content: std_static -- static parameter-unaware model error: stddev of by_name[state_or_trans][attribute] @@ -267,6 +250,8 @@ def _compute_param_statistics( corr_by_param -- correlation coefficient corr_by_arg -- same, but ignoring a single function argument Only set if state_or_trans appears in arg_count, empty dict otherwise. + depends_on_param -- dict(parameter_name -> Bool).
True if /attribute/ behaviour probably depends on /parameter_name/ + depends_on_arg -- list(bool). Same, but for function arguments, if any. """ ret = { "std_static": np.std(by_name[state_or_trans][attribute]), @@ -287,7 +272,6 @@ def _compute_param_statistics( "corr_by_arg": [], "depends_on_param": {}, "depends_on_arg": [], - "param_data": {}, } np.seterr("raise") @@ -299,7 +283,6 @@ def _compute_param_statistics( state_or_trans, attribute, param_idx, - verbose, ) ret["std_by_param"][param] = mean_std ret["std_by_param_values"][param] = std_matrix @@ -314,49 +297,6 @@ def _compute_param_statistics( ret["std_param_lut"], ) - if ret["depends_on_param"][param]: - ret["param_data"][param] = { - "codependent_parameters": _codependent_parameters( - param, lut_matrix, std_matrix - ), - "depends_for_codependent_value": dict(), - } - - # calculate parameter dependence for individual values of codependent parameters - codependent_param_values = list() - for codependent_param in ret["param_data"][param]["codependent_parameters"]: - codependent_param_values.append(distinct_values[codependent_param]) - for combi in itertools.product(*codependent_param_values): - by_name_part = deepcopy(by_name) - filter_list = list( - zip(ret["param_data"][param]["codependent_parameters"], combi) - ) - filter_aggregate_by_param(by_name_part, parameter_names, filter_list) - by_param_part = by_name_to_by_param(by_name_part) - # there may be no data for this specific parameter value combination - if state_or_trans in by_name_part: - part_corr = _corr_by_param( - by_name_part, state_or_trans, attribute, param_idx - ) - part_std_lut = np.mean( - [ - np.std(by_param_part[x][attribute]) - for x in by_param_part.keys() - if x[0] == state_or_trans - ] - ) - _, part_std_param, _ = _std_by_param( - by_param_part, - distinct_values_by_param_index, - state_or_trans, - attribute, - param_idx, - verbose, - ) - ret["param_data"][param]["depends_for_codependent_value"][ - combi - ] = _depends_on_param(part_corr, part_std_param, part_std_lut) - if state_or_trans in arg_count: for arg_index in range(arg_count[state_or_trans]): std_matrix, mean_std, lut_matrix = _std_by_param( @@ -365,7 +305,6 @@ def _compute_param_statistics( state_or_trans, attribute, len(parameter_names) + arg_index, - verbose, ) ret["std_by_arg"].append(mean_std) ret["std_by_arg_values"].append(std_matrix) @@ -447,8 +386,8 @@ def prune_dependent_parameters(by_name, parameter_names, correlation_threshold=0 correlation != np.nan and np.abs(correlation) > correlation_threshold ): - print( - "[!] Parameters {} <-> {} are correlated with coefficcient {}".format( + logger.debug( + "Parameters {} <-> {} are correlated with coefficient {}".format( parameter_names[index_1], parameter_names[index_2], correlation, @@ -458,7 +397,7 @@ index_to_remove = index_1 else: index_to_remove = index_2 - print( + logger.debug( " Removing parameter {}".format( parameter_names[index_to_remove] ) @@ -495,13 +434,7 @@ class ParamStats: """ def __init__( - self, - by_name, - by_param, - parameter_names, - arg_count, - use_corrcoef=False, - verbose=False, + self, by_name, by_param, parameter_names, arg_count, use_corrcoef=False, ): """ Compute standard deviation and correlation coefficient on parameterized data partitions.
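For context, the check behind prune_dependent_parameters boils down to a pairwise Pearson correlation test: if the absolute correlation coefficient of two parameter columns exceeds correlation_threshold, one of the two parameters is removed (see the "Removing parameter" debug message above). A minimal sketch of that test, assuming per-parameter value columns have already been extracted from by_name; correlated_pairs and param_columns are illustrative names, not part of lib/parameters.py:

import numpy as np

# Sketch: report parameter pairs whose Pearson correlation coefficient
# exceeds the threshold, mirroring the pruning logic shown above.
def correlated_pairs(param_columns, parameter_names, correlation_threshold):
    for i in range(len(parameter_names)):
        for j in range(i + 1, len(parameter_names)):
            # np.corrcoef returns a 2x2 matrix; [0, 1] is the coefficient.
            r = np.corrcoef(param_columns[i], param_columns[j])[0, 1]
            if not np.isnan(r) and abs(r) > correlation_threshold:
                yield parameter_names[i], parameter_names[j], r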
@@ -556,7 +489,6 @@ class ParamStats: attribute, self.distinct_values[state_or_tran], self.distinct_values_by_param_index[state_or_tran], - verbose, ], } ) @@ -592,147 +524,21 @@ class ParamStats: ) > 2 ): - print( - key, - param, - list( - filter( - lambda n: is_numeric(n), - self.distinct_values[key][param], - ) - ), + logger.debug( + "{}: parameter {} can be fitted on values {}".format( + key, + param, + list( + filter( + lambda n: is_numeric(n), + self.distinct_values[key][param], + ) + ), + ) ) return True return False - def static_submodel_params(self, state_or_tran, attribute): - """ - Return the union of all parameter values which decide whether another parameter influences the model or not. - - I.e., the returned list of dicts contains one entry for each parameter value combination which (probably) does not have any parameter influencing the model. - If the current parameters matches one of these, a static sub-model built based on this subset of parameters can likely be used. - """ - # TODO - pass - - def has_codependent_parameters( - self, state_or_tran: str, attribute: str, param: str - ) -> bool: - """ - Return whether there are parameters which determine whether `param` influences `state_or_tran` `attribute` or not. - - :param state_or_tran: model state or transition - :param attribute: model attribute - :param param: parameter name - """ - if len(self.codependent_parameters(state_or_tran, attribute, param)): - return True - return False - - def codependent_parameters( - self, state_or_tran: str, attribute: str, param: str - ) -> list: - """ - Return list of parameters which determine whether `param` influences `state_or_tran` `attribute` or not. - - :param state_or_tran: model state or transition - :param attribute: model attribute - :param param: parameter name - """ - if self.stats[state_or_tran][attribute]["depends_on_param"][param]: - return self.stats[state_or_tran][attribute]["param_data"][param][ - "codependent_parameters" - ] - return list() - - def has_codependent_parameters_union( - self, state_or_tran: str, attribute: str - ) -> bool: - """ - Return whether there is a subset of parameters which decides whether `state_or_tran` `attribute` is static or parameter-dependent - - :param state_or_tran: model state or transition - :param attribute: model attribute - """ - depends_on_a_parameter = False - for param in self._parameter_names: - if self.stats[state_or_tran][attribute]["depends_on_param"][param]: - print("{}/{} depends on {}".format(state_or_tran, attribute, param)) - depends_on_a_parameter = True - if ( - len(self.codependent_parameters(state_or_tran, attribute, param)) - == 0 - ): - print("has no codependent parameters") - # Always depends on this parameter, regardless of other parameters' values - return False - return depends_on_a_parameter - - def codependent_parameters_union(self, state_or_tran: str, attribute: str) -> list: - """ - Return list of parameters which determine whether any parameter influences `state_or_tran` `attribute`.
- - :param state_or_tran: model state or transition - :param attribute: model attribute - """ - codependent_parameters = set() - for param in self._parameter_names: - if self.stats[state_or_tran][attribute]["depends_on_param"][param]: - if ( - len(self.codependent_parameters(state_or_tran, attribute, param)) - == 0 - ): - return list(self._parameter_names) - for codependent_param in self.codependent_parameters( - state_or_tran, attribute, param - ): - codependent_parameters.add(codependent_param) - return sorted(codependent_parameters) - - def codependence_by_codependent_param_values( - self, state_or_tran: str, attribute: str, param: str - ) -> dict: - """ - Return dict mapping codependent parameter values to a boolean indicating whether `param` influences `state_or_tran` `attribute`. - - If a dict value is true, `attribute` depends on `param` for the corresponding codependent parameter values, otherwise it does not. - - :param state_or_tran: model state or transition - :param attribute: model attribute - :param param: parameter name - """ - if self.stats[state_or_tran][attribute]["depends_on_param"][param]: - return self.stats[state_or_tran][attribute]["param_data"][param][ - "depends_for_codependent_value" - ] - return dict() - - def codependent_parameter_value_dicts( - self, state_or_tran: str, attribute: str, param: str, kind="dynamic" - ): - """ - Return dicts of codependent parameter key-value mappings for which `param` influences (or does not influence) `state_or_tran` `attribute`. - - :param state_or_tran: model state or transition - :param attribute: model attribute - :param param: parameter name: - :param kind: 'static' or 'dynamic'. If 'dynamic' (the default), returns codependent parameter values for which `param` influences `attribute`. If 'static', returns codependent parameter values for which `param` does not influence `attribute` - """ - codependent_parameters = self.stats[state_or_tran][attribute]["param_data"][ - param - ]["codependent_parameters"] - codependence_info = self.stats[state_or_tran][attribute]["param_data"][param][ - "depends_for_codependent_value" - ] - if len(codependent_parameters) == 0: - return - else: - for param_values, is_dynamic in codependence_info.items(): - if (is_dynamic and kind == "dynamic") or ( - not is_dynamic and kind == "static" - ): - yield dict(zip(codependent_parameters, param_values)) - def _generic_param_independence_ratio(self, state_or_trans, attribute): """ Return the heuristic ratio of parameter independence for state_or_trans and attribute. 
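With these changes, lib/parameters.py, like the other modules touched by this merge, reports diagnostics through a module-level logging.getLogger(__name__) logger instead of print() calls and the removed verbose flags, so verbosity is now controlled via standard logging configuration. A minimal sketch of how a consumer might enable the debug output, assuming the modules are importable under the lib package (the exact logger names depend on how the package is imported):

import logging

# Emit level and module name with each message; default to warnings only.
logging.basicConfig(
    format="%(levelname)s %(name)s: %(message)s", level=logging.WARNING
)

# Opt in to debug output, e.g. the parameter-correlation messages above.
logging.getLogger("lib.parameters").setLevel(logging.DEBUG)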
diff --git a/lib/protocol_benchmarks.py b/lib/protocol_benchmarks.py index b42e821..d41979f 100755 --- a/lib/protocol_benchmarks.py +++ b/lib/protocol_benchmarks.py @@ -16,8 +16,11 @@ import io import os import re import time +import logging from filelock import FileLock +logger = logging.getLogger(__name__) + class DummyProtocol: def __init__(self): @@ -1838,14 +1841,14 @@ class Benchmark: this_result["data"] = data if value != None: this_result[key] = {"v": value, "ts": int(time.time())} - print( + logger.debug( "{} {} {} ({}) :: {} -> {}".format( libkey, bench_name, bench_index, data, key, value ) ) else: this_result[key] = {"e": error, "ts": int(time.time())} - print( + logger.debug( "{} {} {} ({}) :: {} -> [E] {}".format( libkey, bench_name, bench_index, data, key, error[:500] ) diff --git a/lib/runner.py b/lib/runner.py index 16f0a29..77b7c68 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -31,7 +31,8 @@ class SerialReader(serial.threaded.Protocol): """Create a new SerialReader object.""" self.callback = callback self.recv_buf = "" - self.lines = [] + self.lines = list() + self.all_lines = list() def __call__(self): return self @@ -47,7 +48,9 @@ class SerialReader(serial.threaded.Protocol): # Note: Do not call str.strip on lines[-1]! Otherwise, lines may be mangled lines = self.recv_buf.split("\n") if len(lines) > 1: - self.lines.extend(map(str.strip, lines[:-1])) + new_lines = list(map(str.strip, lines[:-1])) + self.lines.extend(new_lines) + self.all_lines.extend(new_lines) self.recv_buf = lines[-1] if self.callback: for line in lines[:-1]: @@ -120,7 +123,7 @@ class SerialMonitor: return self.reader.get_lines() def get_lines(self) -> list: - return self.reader.get_lines() + return self.reader.all_lines def get_files(self) -> list: return list() @@ -143,6 +146,9 @@ class SerialMonitor: class EnergyTraceMonitor(SerialMonitor): """EnergyTraceMonitor captures serial timing output and EnergyTrace energy data.""" + # Additional key-value arguments passed to generate-dfa-benchmark.py --energytrace=... end up here + # (e.g. --energytrace=var1=bar,somecount=2 => EnergyTraceMonitor(..., var1="bar", somecount="2")). + # As soon as the EnergyTraceMonitor object is created, the measurement starts (i.e.
here: msp430-etv is started) def __init__(self, port: str, baud: int, callback=None, voltage=3.3): super().__init__(port=port, baud=baud, callback=callback) self._voltage = voltage @@ -155,20 +161,31 @@ class EnergyTraceMonitor(SerialMonitor): cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) + # Benchmark finished -> terminate external helper programs def close(self): super().close() self._logger.send_signal(subprocess.signal.SIGINT) stdout, stderr = self._logger.communicate(timeout=15) + # Additional files that should be stored together with the benchmark log and plan + # (here: the log files generated by msp430-etv) def get_files(self) -> list: return [self._output] + # def get_config(self) -> dict: return { "voltage": self._voltage, } +class EnergyTraceLogicAnalyzerMonitor(EnergyTraceMonitor): + """EnergyTraceLogicAnalyzerMonitor captures EnergyTrace energy data and LogicAnalyzer timing output.""" + + def __init__(self, port: str, baud: int, callback=None, voltage=3.3): + super().__init__(port=port, baud=baud, callback=callback, voltage=voltage) + + class MIMOSAMonitor(SerialMonitor): """MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time.""" @@ -362,8 +379,14 @@ def get_monitor(arch: str, **kwargs) -> object: mimosa_kwargs = kwargs.pop("mimosa") return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs) elif "energytrace" in kwargs and kwargs["energytrace"] is not None: - energytrace_kwargs = kwargs.pop("energytrace") - return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs) + energytrace_kwargs = kwargs.pop("energytrace").copy() + sync_mode = energytrace_kwargs.pop("sync") + if sync_mode == "la": + return EnergyTraceLogicAnalyzerMonitor( + port, arg, **energytrace_kwargs, **kwargs + ) + else: + return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs) else: kwargs.pop("energytrace", None) kwargs.pop("mimosa", None) @@ -382,6 +405,23 @@ def get_counter_limits(arch: str) -> tuple: raise RuntimeError("Did not find Counter Overflow limits") +def sleep_ms(duration: int, arch: str, cpu_freq: int = None) -> str: + max_sleep = None + if "msp430fr" in arch: + if cpu_freq is not None and cpu_freq > 8000000: + max_sleep = 250 + else: + max_sleep = 500 + if max_sleep is not None and duration > max_sleep: + sub_sleep_count = duration // max_sleep + tail_sleep = duration % max_sleep + ret = f"for (unsigned char i = 0; i < {sub_sleep_count}; i++) {{ arch.sleep_ms({max_sleep}); }}\n" + if tail_sleep > 0: + ret += f"arch.sleep_ms({tail_sleep});\n" + return ret + return f"arch.sleep_ms({duration});\n" + + def get_counter_limits_us(arch: str) -> tuple: """Return duration of one counter step and one counter overflow in us.""" cpu_freq = 0 diff --git a/lib/utils.py b/lib/utils.py index 91dded0..d28ecda 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1,17 +1,9 @@ import numpy as np import re +import logging arg_support_enabled = True - - -def vprint(verbose, string): - """ - Print `string` if `verbose`. - - Prints string if verbose is a True value - """ - if verbose: - print(string) +logger = logging.getLogger(__name__) def running_mean(x: np.ndarray, N: int) -> np.ndarray: @@ -222,7 +214,7 @@ def filter_aggregate_by_param(aggregate, parameters, parameter_filter): ) ) if len(indices_to_keep) == 0: - print("??? {}->{}".format(parameter_filter, name)) + logger.debug(
{}->{}".format(parameter_filter, name)) names_to_remove.add(name) else: for attribute in aggregate[name]["attributes"]: diff --git a/lib/validation.py b/lib/validation.py new file mode 100644 index 0000000..ee147fe --- /dev/null +++ b/lib/validation.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 + +import logging +import numpy as np + +logger = logging.getLogger(__name__) + + +def _xv_partitions_kfold(length, k=10): + """ + Return k pairs of training and validation sets for k-fold cross-validation on `length` items. + + In k-fold cross-validation, every k-th item is used for validation and the remainder is used for training. + As there are k ways to do this (items 0, k, 2k, ... vs. items 1, k+1, 2k+1, ... etc), this function returns k pairs of training and validation set. + + Note that this function operates on indices, not data. + """ + pairs = [] + num_slices = k + indexes = np.arange(length) + for i in range(num_slices): + training = np.delete(indexes, slice(i, None, num_slices)) + validation = indexes[i::num_slices] + pairs.append((training, validation)) + return pairs + + +def _xv_partition_montecarlo(length): + """ + Return training and validation set for Monte Carlo cross-validation on `length` items. + + This function operates on indices, not data. It randomly partitions range(length) into a list of training indices and a list of validation indices. + + The training set contains 2/3 of all indices; the validation set consits of the remaining 1/3. + + Example: 9 items -> training = [7, 3, 8, 0, 4, 2], validation = [ 1, 6, 5] + """ + shuffled = np.random.permutation(np.arange(length)) + border = int(length * float(2) / 3) + training = shuffled[:border] + validation = shuffled[border:] + return (training, validation) + + +class CrossValidator: + """ + Cross-Validation helper for model generation. + + Given a set of measurements and a model class, it will partition the + data into training and validation sets, train the model on the training + set, and assess its quality on the validation set. This is repeated + several times depending on cross-validation algorithm and configuration. + Reports the mean model error over all cross-validation runs. + """ + + def __init__(self, model_class, by_name, parameters, arg_count): + """ + Create a new CrossValidator object. + + Does not perform cross-validation yet. + + arguments: + model_class -- model class/type used for model synthesis, + e.g. PTAModel or AnalyticModel. model_class must have a + constructor accepting (by_name, parameters, arg_count) + and provide an `assess` method. + by_name -- measurements aggregated by state/transition/function/... name. + Layout: by_name[name][attribute] = list of data. Additionally, + by_name[name]['attributes'] must be set to the list of attributes, + e.g. ['power'] or ['duration', 'energy']. + """ + self.model_class = model_class + self.by_name = by_name + self.names = sorted(by_name.keys()) + self.parameters = sorted(parameters) + self.arg_count = arg_count + + def kfold(self, model_getter, k=10): + """ + Perform k-fold cross-validation and return average model quality. + + The by_name data is divided into 1-1/k training and 1/k validation in a deterministic manner. + After creating a model for the training set, the + model type returned by model_getter is evaluated on the validation set. + This is repeated k times; the average of all measures is returned to the user. + + arguments: + model_getter -- function with signature (model_object) -> model, + e.g. 
lambda m: m.get_fitted()[0] to evaluate the parameter-aware + model with automatic parameter detection. + k -- number of folds. Each validation set contains 100/k % of the data. + + return value: + dict of model quality measures. + { + 'by_name' : { + for each name: { + for each attribute: { + 'mae' : mean of all mean absolute errors + 'mae_list' : list of the individual MAE values encountered during cross-validation + 'smape' : mean of all symmetric mean absolute percentage errors + 'smape_list' : list of the individual SMAPE values encountered during cross-validation + } + } + } + } + """ + + # training / validation subsets for each state and transition + subsets_by_name = dict() + training_and_validation_sets = list() + + for name in self.names: + sample_count = len(self.by_name[name]["param"]) + subsets_by_name[name] = list() + subsets_by_name[name] = _xv_partitions_kfold(sample_count, k) + + for i in range(k): + training_and_validation_sets.append(dict()) + for name in self.names: + training_and_validation_sets[i][name] = subsets_by_name[name][i] + + return self._generic_xv(model_getter, training_and_validation_sets) + + def montecarlo(self, model_getter, count=200): + """ + Perform Monte Carlo cross-validation and return average model quality. + + The by_name data is randomly divided into 2/3 training and 1/3 + validation. After creating a model for the training set, the + model type returned by model_getter is evaluated on the validation set. + This is repeated count times (defaulting to 200); the average of all + measures is returned to the user. + + arguments: + model_getter -- function with signature (model_object) -> model, + e.g. lambda m: m.get_fitted()[0] to evaluate the parameter-aware + model with automatic parameter detection. + count -- number of validation runs to perform, defaults to 200 + + return value: + dict of model quality measures.
+ { + 'by_name' : { + for each name: { + for each attribute: { + 'mae' : mean of all mean absolute errors + 'mae_list' : list of the individual MAE values encountered during cross-validation + 'smape' : mean of all symmetric mean absolute percentage errors + 'smape_list' : list of the individual SMAPE values encountered during cross-validation + } + } + } + } + """ + + # training / validation subsets for each state and transition + subsets_by_name = dict() + training_and_validation_sets = list() + + for name in self.names: + sample_count = len(self.by_name[name]["param"]) + subsets_by_name[name] = list() + for _ in range(count): + subsets_by_name[name].append(_xv_partition_montecarlo(sample_count)) + + for i in range(count): + training_and_validation_sets.append(dict()) + for name in self.names: + training_and_validation_sets[i][name] = subsets_by_name[name][i] + + return self._generic_xv(model_getter, training_and_validation_sets) + + def _generic_xv(self, model_getter, training_and_validation_sets): + ret = {"by_name": dict()} + + for name in self.names: + ret["by_name"][name] = dict() + for attribute in self.by_name[name]["attributes"]: + ret["by_name"][name][attribute] = { + "mae_list": list(), + "rmsd_list": list(), + "smape_list": list(), + } + + for training_and_validation_by_name in training_and_validation_sets: + res = self._single_xv(model_getter, training_and_validation_by_name) + for name in self.names: + for attribute in self.by_name[name]["attributes"]: + for measure in ("mae", "rmsd", "smape"): + ret["by_name"][name][attribute][f"{measure}_list"].append( + res["by_name"][name][attribute][measure] + ) + + for name in self.names: + for attribute in self.by_name[name]["attributes"]: + for measure in ("mae", "rmsd", "smape"): + ret["by_name"][name][attribute][measure] = np.mean( + ret["by_name"][name][attribute][f"{measure}_list"] + ) + + return ret + + def _single_xv(self, model_getter, tv_set_dict): + training = dict() + validation = dict() + for name in self.names: + training[name] = {"attributes": self.by_name[name]["attributes"]} + validation[name] = {"attributes": self.by_name[name]["attributes"]} + + if "isa" in self.by_name[name]: + training[name]["isa"] = self.by_name[name]["isa"] + validation[name]["isa"] = self.by_name[name]["isa"] + + training_subset, validation_subset = tv_set_dict[name] + + for attribute in self.by_name[name]["attributes"]: + self.by_name[name][attribute] = np.array(self.by_name[name][attribute]) + training[name][attribute] = self.by_name[name][attribute][ + training_subset + ] + validation[name][attribute] = self.by_name[name][attribute][ + validation_subset + ] + + # We can't use slice syntax for 'param', which may contain strings and other odd values + training[name]["param"] = list() + validation[name]["param"] = list() + for idx in training_subset: + training[name]["param"].append(self.by_name[name]["param"][idx]) + for idx in validation_subset: + validation[name]["param"].append(self.by_name[name]["param"][idx]) + + training_data = self.model_class(training, self.parameters, self.arg_count) + training_model = model_getter(training_data) + validation_data = self.model_class(validation, self.parameters, self.arg_count) + + return validation_data.assess(training_model)
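Taken together, the new validation module is meant to be driven as follows. This is a minimal usage sketch based solely on the docstrings above: the import paths, the prepared by_name / parameter_names / arg_count inputs, and the state name "TX" are assumptions for illustration, not part of this commit.

# Hedged usage sketch for CrossValidator (paths and input data assumed).
from lib.model import AnalyticModel
from lib.validation import CrossValidator

# by_name, parameter_names, arg_count: measurement data in the layout
# documented in CrossValidator.__init__ (e.g. produced by lib/loader.py).
xv = CrossValidator(AnalyticModel, by_name, parameter_names, arg_count)

# Monte Carlo CV: 200 random 2/3 training / 1/3 validation splits,
# evaluating the fitted parameter-aware model as the docstring suggests.
mc_quality = xv.montecarlo(lambda m: m.get_fitted()[0], count=200)

# Deterministic 10-fold CV over the same data.
kf_quality = xv.kfold(lambda m: m.get_fitted()[0], k=10)

# Mean SMAPE of the "power" attribute of a hypothetical state "TX".
print(kf_quality["by_name"]["TX"]["power"]["smape"])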