diff options
Diffstat (limited to 'lib/model.py')
-rw-r--r-- | lib/model.py | 1186 |
1 files changed, 1186 insertions, 0 deletions
diff --git a/lib/model.py b/lib/model.py new file mode 100644 index 0000000..d83c12c --- /dev/null +++ b/lib/model.py @@ -0,0 +1,1186 @@ +#!/usr/bin/env python3 + +import logging +import numpy as np +from scipy import optimize +from sklearn.metrics import r2_score +from multiprocessing import Pool +from .functions import analytic +from .functions import AnalyticFunction +from .parameters import ParamStats +from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple +from .utils import by_name_to_by_param, match_parameter_values + +logger = logging.getLogger(__name__) +arg_support_enabled = True + + +def aggregate_measures(aggregate: float, actual: list) -> dict: + """ + Calculate error measures for model value on data list. + + arguments: + aggregate -- model value (float or int) + actual -- real-world / reference values (list of float or int) + + return value: + See regression_measures + """ + aggregate_array = np.array([aggregate] * len(actual)) + return regression_measures(aggregate_array, np.array(actual)) + + +def regression_measures(predicted: np.ndarray, actual: np.ndarray): + """ + Calculate error measures by comparing model values to reference values. + + arguments: + predicted -- model values (np.ndarray) + actual -- real-world / reference values (np.ndarray) + + Returns a dict containing the following measures: + mae -- Mean Absolute Error + mape -- Mean Absolute Percentage Error, + if all items in actual are non-zero (NaN otherwise) + smape -- Symmetric Mean Absolute Percentage Error, + if no 0,0-pairs are present in actual and predicted (NaN otherwise) + msd -- Mean Square Deviation + rmsd -- Root Mean Square Deviation + ssr -- Sum of Squared Residuals + rsq -- R^2 measure, see sklearn.metrics.r2_score + count -- Number of values + """ + if type(predicted) != np.ndarray: + raise ValueError("first arg must be ndarray, is {}".format(type(predicted))) + if type(actual) != np.ndarray: + raise ValueError("second arg must be ndarray, is {}".format(type(actual))) + deviations = predicted - actual + # mean = np.mean(actual) + if len(deviations) == 0: + return {} + measures = { + "mae": np.mean(np.abs(deviations), dtype=np.float64), + "msd": np.mean(deviations ** 2, dtype=np.float64), + "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64), + "ssr": np.sum(deviations ** 2, dtype=np.float64), + "rsq": r2_score(actual, predicted), + "count": len(actual), + } + + # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64) + + if np.all(actual != 0): + measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure + else: + measures["mape"] = np.nan + if np.all(np.abs(predicted) + np.abs(actual) != 0): + measures["smape"] = ( + np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) + * 100 + ) + else: + measures["smape"] = np.nan + # if np.all(rsq_quotient != 0): + # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient + + return measures + + +class ParallelParamFit: + """ + Fit a set of functions on parameterized measurements. + + One parameter is variale, all others are fixed. Reports the best-fitting + function type for each parameter. + """ + + def __init__(self, by_param): + """Create a new ParallelParamFit object.""" + self.fit_queue = [] + self.by_param = by_param + + def enqueue( + self, + state_or_tran, + attribute, + param_index, + param_name, + safe_functions_enabled=False, + param_filter=None, + ): + """ + Add state_or_tran/attribute/param_name to fit queue. + + This causes fit() to compute the best-fitting function for this model part. + """ + self.fit_queue.append( + { + "key": [state_or_tran, attribute, param_name, param_filter], + "args": [ + self.by_param, + state_or_tran, + attribute, + param_index, + safe_functions_enabled, + param_filter, + ], + } + ) + + def fit(self): + """ + Fit functions on previously enqueue data. + + Fitting is one in parallel with one process per core. + + Results can be accessed using the public ParallelParamFit.results object. + """ + with Pool() as pool: + self.results = pool.map(_try_fits_parallel, self.fit_queue) + + def get_result(self, name, attribute, param_filter: dict = None): + """ + Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'. + + Filters out results where the best function is worse (or not much better than) static mean/median estimates. + + :param name: state/transition/... name, e.g. 'TX' + :param attribute: model attribute, e.g. 'duration' + :param param_filter: + :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} } + """ + fit_result = dict() + for result in self.results: + if ( + result["key"][0] == name + and result["key"][1] == attribute + and result["key"][3] == param_filter + and result["result"]["best"] is not None + ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? + this_result = result["result"] + if this_result["best_rmsd"] >= min( + this_result["mean_rmsd"], this_result["median_rmsd"] + ): + logger.debug( + "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( + name, + attribute, + result["key"][2], + this_result["best_rmsd"], + this_result["mean_rmsd"], + this_result["median_rmsd"], + ) + ) + # See notes on depends_on_param + elif this_result["best_rmsd"] >= 0.8 * min( + this_result["mean_rmsd"], this_result["median_rmsd"] + ): + logger.debug( + "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( + name, + attribute, + result["key"][2], + this_result["best_rmsd"], + this_result["mean_rmsd"], + this_result["median_rmsd"], + ) + ) + else: + fit_result[result["key"][2]] = this_result + return fit_result + + +def _try_fits_parallel(arg): + """ + Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result. + + Must be a global function as it is called from a multiprocessing Pool. + """ + return {"key": arg["key"], "result": _try_fits(*arg["args"])} + + +def _try_fits( + by_param, + state_or_tran, + model_attribute, + param_index, + safe_functions_enabled=False, + param_filter: dict = None, +): + """ + Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions. + + This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters. + The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function. + Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored. + Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`. + + :returns: a dictionary with the following elements: + best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data. + best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters + mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value + median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value + results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values + + :param by_param: measurements partitioned by state/transition/... name and parameter values. + Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}` + + :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple). + Example: `'foo'` + + :param model_attribute: attribute for which goodness-of-fit will be calculated. + Example: `'bar'` + + :param param_index: index of the parameter used as model input + :param safe_functions_enabled: Include "safe" variants of functions with limited argument range. + :param param_filter: Only use measurements whose parameters match param_filter for fitting. + """ + + functions = analytic.functions(safe_functions_enabled=safe_functions_enabled) + + for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()): + # We might remove elements from 'functions' while iterating over + # its keys. A generator will not allow this, so we need to + # convert to a list. + function_names = list(functions.keys()) + for function_name in function_names: + function_object = functions[function_name] + if is_numeric(param_key[1][param_index]) and not function_object.is_valid( + param_key[1][param_index] + ): + functions.pop(function_name, None) + + raw_results = dict() + raw_results_by_param = dict() + ref_results = {"mean": list(), "median": list()} + results = dict() + results_by_param = dict() + + seen_parameter_combinations = set() + + # for each parameter combination: + for param_key in filter( + lambda x: x[0] == state_or_tran + and remove_index_from_tuple(x[1], param_index) + not in seen_parameter_combinations + and len(by_param[x]["param"]) + and match_parameter_values(by_param[x]["param"][0], param_filter), + by_param.keys(), + ): + X = [] + Y = [] + num_valid = 0 + num_total = 0 + + # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1, + # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters + seen_parameter_combinations.add( + remove_index_from_tuple(param_key[1], param_index) + ) + + # for each value of the parameter denoted by param_index (all other parameters remain the same): + for k, v in filter( + lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items() + ): + num_total += 1 + if is_numeric(k[1][param_index]): + num_valid += 1 + X.extend([float(k[1][param_index])] * len(v[model_attribute])) + Y.extend(v[model_attribute]) + + if num_valid > 2: + X = np.array(X) + Y = np.array(Y) + other_parameters = remove_index_from_tuple(k[1], param_index) + raw_results_by_param[other_parameters] = dict() + results_by_param[other_parameters] = dict() + for function_name, param_function in functions.items(): + if function_name not in raw_results: + raw_results[function_name] = dict() + error_function = param_function.error_function + res = optimize.least_squares( + error_function, [0, 1], args=(X, Y), xtol=2e-15 + ) + measures = regression_measures(param_function.eval(res.x, X), Y) + raw_results_by_param[other_parameters][function_name] = measures + for measure, error_rate in measures.items(): + if measure not in raw_results[function_name]: + raw_results[function_name][measure] = list() + raw_results[function_name][measure].append(error_rate) + # print(function_name, res, measures) + mean_measures = aggregate_measures(np.mean(Y), Y) + ref_results["mean"].append(mean_measures["rmsd"]) + raw_results_by_param[other_parameters]["mean"] = mean_measures + median_measures = aggregate_measures(np.median(Y), Y) + ref_results["median"].append(median_measures["rmsd"]) + raw_results_by_param[other_parameters]["median"] = median_measures + + if not len(ref_results["mean"]): + # Insufficient data for fitting + # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index)) + return {"best": None, "best_rmsd": np.inf, "results": results} + + for ( + other_parameter_combination, + other_parameter_results, + ) in raw_results_by_param.items(): + best_fit_val = np.inf + best_fit_name = None + results = dict() + for function_name, result in other_parameter_results.items(): + if len(result) > 0: + results[function_name] = result + rmsd = result["rmsd"] + if rmsd < best_fit_val: + best_fit_val = rmsd + best_fit_name = function_name + results_by_param[other_parameter_combination] = { + "best": best_fit_name, + "best_rmsd": best_fit_val, + "mean_rmsd": results["mean"]["rmsd"], + "median_rmsd": results["median"]["rmsd"], + "results": results, + } + + best_fit_val = np.inf + best_fit_name = None + results = dict() + for function_name, result in raw_results.items(): + if len(result) > 0: + results[function_name] = {} + for measure in result.keys(): + results[function_name][measure] = np.mean(result[measure]) + rmsd = results[function_name]["rmsd"] + if rmsd < best_fit_val: + best_fit_val = rmsd + best_fit_name = function_name + + return { + "best": best_fit_name, + "best_rmsd": best_fit_val, + "mean_rmsd": np.mean(ref_results["mean"]), + "median_rmsd": np.mean(ref_results["median"]), + "results": results, + "results_by_other_param": results_by_param, + } + + +def _num_args_from_by_name(by_name): + num_args = dict() + for key, value in by_name.items(): + if "args" in value: + num_args[key] = len(value["args"][0]) + return num_args + + +class AnalyticModel: + u""" + Parameter-aware analytic energy/data size/... model. + + Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence. + + These provide measurements aggregated by (function/state/...) name + and (for by_param) parameter values. Layout: + dictionary with one key per name ('send', 'TX', ...) or + one key per name and parameter combination + (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). + + Parameter values must be ordered corresponding to the lexically sorted parameter names. + + Each element is in turn a dict with the following elements: + - param: list of parameter values in each measurement (-> list of lists) + - attributes: list of keys that should be analyzed, + e.g. ['power', 'duration'] + - for each attribute mentioned in 'attributes': A list with measurements. + All list except for 'attributes' must have the same length. + + For example: + parameters = ['foo_count', 'irrelevant'] + by_name = { + 'foo' : [1, 1, 2], + 'bar' : [5, 6, 7], + 'attributes' : ['foo', 'bar'], + 'param' : [[1, 0], [1, 0], [2, 0]] + } + + methods: + get_static -- return static (parameter-unaware) model. + get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param. + get_fitted -- return parameter-aware model using fitted functions for behaviour prediction. + + variables: + names -- function/state/... names (i.e., the keys of by_name) + parameters -- parameter names + stats -- ParamStats object providing parameter-dependency statistics for each name and attribute + assess -- calculate model quality + """ + + def __init__( + self, + by_name, + parameters, + arg_count=None, + function_override=dict(), + use_corrcoef=False, + ): + """ + Create a new AnalyticModel and compute parameter statistics. + + :param by_name: measurements aggregated by (function/state/...) name. + Layout: dictionary with one key per name ('send', 'TX', ...) or + one key per name and parameter combination + (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). + + Parameter values must be ordered corresponding to the lexically sorted parameter names. + + Each element is in turn a dict with the following elements: + - param: list of parameter values in each measurement (-> list of lists) + - attributes: list of keys that should be analyzed, + e.g. ['power', 'duration'] + - for each attribute mentioned in 'attributes': A list with measurements. + All list except for 'attributes' must have the same length. + + For example: + parameters = ['foo_count', 'irrelevant'] + by_name = { + 'foo' : [1, 1, 2], + 'duration' : [5, 6, 7], + 'attributes' : ['foo', 'duration'], + 'param' : [[1, 0], [1, 0], [2, 0]] + # foo_count-^ ^-irrelevant + } + :param parameters: List of parameter names + :param function_override: dict of overrides for automatic parameter function generation. + If (state or transition name, model attribute) is present in function_override, + the corresponding text string is the function used for analytic (parameter-aware/fitted) + modeling of this attribute. It is passed to AnalyticFunction, see + there for the required format. Note that this happens regardless of + parameter dependency detection: The provided analytic function will be assigned + even if it seems like the model attribute is static / parameter-independent. + :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter + """ + self.cache = dict() + self.by_name = by_name + self.by_param = by_name_to_by_param(by_name) + self.names = sorted(by_name.keys()) + self.parameters = sorted(parameters) + self.function_override = function_override.copy() + self._use_corrcoef = use_corrcoef + self._num_args = arg_count + if self._num_args is None: + self._num_args = _num_args_from_by_name(by_name) + + self.stats = ParamStats( + self.by_name, + self.by_param, + self.parameters, + self._num_args, + use_corrcoef=use_corrcoef, + ) + + def _get_model_from_dict(self, model_dict, model_function): + model = {} + for name, elem in model_dict.items(): + model[name] = {} + for key in elem["attributes"]: + try: + model[name][key] = model_function(elem[key]) + except RuntimeWarning: + logger.warning("Got no data for {} {}".format(name, key)) + except FloatingPointError as fpe: + logger.warning("Got no data for {} {}: {}".format(name, key, fpe)) + return model + + def param_index(self, param_name): + if param_name in self.parameters: + return self.parameters.index(param_name) + return len(self.parameters) + int(param_name) + + def param_name(self, param_index): + if param_index < len(self.parameters): + return self.parameters[param_index] + return str(param_index) + + def get_static(self, use_mean=False): + """ + Get static model function: name, attribute -> model value. + + Uses the median of by_name for modeling. + """ + getter_function = np.median + + if use_mean: + getter_function = np.mean + + static_model = self._get_model_from_dict(self.by_name, getter_function) + + def static_model_getter(name, key, **kwargs): + return static_model[name][key] + + return static_model_getter + + def get_param_lut(self, fallback=False): + """ + Get parameter-look-up-table model function: name, attribute, parameter values -> model value. + + The function can only give model values for parameter combinations + present in by_param. By default, it raises KeyError for other values. + + arguments: + fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values + """ + static_model = self._get_model_from_dict(self.by_name, np.median) + lut_model = self._get_model_from_dict(self.by_param, np.median) + + def lut_median_getter(name, key, param, arg=[], **kwargs): + param.extend(map(soft_cast_int, arg)) + try: + return lut_model[(name, tuple(param))][key] + except KeyError: + if fallback: + return static_model[name][key] + raise + + return lut_median_getter + + def get_fitted(self, safe_functions_enabled=False): + """ + Get paramete-aware model function and model information function. + + Returns two functions: + model_function(name, attribute, param=parameter values) -> model value. + model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None + """ + if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: + return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] + + static_model = self._get_model_from_dict(self.by_name, np.median) + param_model = dict([[name, {}] for name in self.by_name.keys()]) + paramfit = ParallelParamFit(self.by_param) + + for name in self.by_name.keys(): + for attribute in self.by_name[name]["attributes"]: + for param_index, param in enumerate(self.parameters): + if self.stats.depends_on_param(name, attribute, param): + paramfit.enqueue(name, attribute, param_index, param, False) + if arg_support_enabled and name in self._num_args: + for arg_index in range(self._num_args[name]): + if self.stats.depends_on_arg(name, attribute, arg_index): + paramfit.enqueue( + name, + attribute, + len(self.parameters) + arg_index, + arg_index, + False, + ) + + paramfit.fit() + + for name in self.by_name.keys(): + num_args = 0 + if name in self._num_args: + num_args = self._num_args[name] + for attribute in self.by_name[name]["attributes"]: + fit_result = paramfit.get_result(name, attribute) + + if (name, attribute) in self.function_override: + function_str = self.function_override[(name, attribute)] + x = AnalyticFunction(function_str, self.parameters, num_args) + x.fit(self.by_param, name, attribute) + if x.fit_success: + param_model[name][attribute] = { + "fit_result": fit_result, + "function": x, + } + elif len(fit_result.keys()): + x = analytic.function_powerset( + fit_result, self.parameters, num_args + ) + x.fit(self.by_param, name, attribute) + + if x.fit_success: + param_model[name][attribute] = { + "fit_result": fit_result, + "function": x, + } + + def model_getter(name, key, **kwargs): + if "arg" in kwargs and "param" in kwargs: + kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) + if key in param_model[name]: + param_list = kwargs["param"] + param_function = param_model[name][key]["function"] + if param_function.is_predictable(param_list): + return param_function.eval(param_list) + return static_model[name][key] + + def info_getter(name, key): + if key in param_model[name]: + return param_model[name][key] + return None + + self.cache["fitted_model_getter"] = model_getter + self.cache["fitted_info_getter"] = info_getter + + return model_getter, info_getter + + def assess(self, model_function): + """ + Calculate MAE, SMAPE, etc. of model_function for each by_name entry. + + state/transition/... name and parameter values are fed into model_function. + The by_name entries of this AnalyticModel are used as ground truth and + compared with the values predicted by model_function. + + For proper model assessments, the data used to generate model_function + and the data fed into this AnalyticModel instance must be mutually + exclusive (e.g. by performing cross validation). Otherwise, + overfitting cannot be detected. + """ + detailed_results = {} + for name, elem in sorted(self.by_name.items()): + detailed_results[name] = {} + for attribute in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function( + name, attribute, param=elem["param"][i] + ), + range(len(elem[attribute])), + ) + ) + ) + measures = regression_measures(predicted_data, elem[attribute]) + detailed_results[name][attribute] = measures + + return {"by_name": detailed_results} + + def to_json(self): + # TODO + pass + + +class PTAModel: + u""" + Parameter-aware PTA-based energy model. + + Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence. + + The model heavily relies on two internal data structures: + PTAModel.by_name and PTAModel.by_param. + + These provide measurements aggregated by state/transition name + and (for by_param) parameter values. Layout: + dictionary with one key per state/transition ('send', 'TX', ...) or + one key per state/transition and parameter combination + (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...). + For by_param, parameter values are ordered corresponding to the lexically sorted parameter names. + + Each element is in turn a dict with the following elements: + - isa: 'state' or 'transition' + - power: list of mean power measurements in µW + - duration: list of durations in µs + - power_std: list of stddev of power per state/transition + - energy: consumed energy (power*duration) in pJ + - paramkeys: list of parameter names in each measurement (-> list of lists) + - param: list of parameter values in each measurement (-> list of lists) + - attributes: list of keys that should be analyzed, + e.g. ['power', 'duration'] + additionally, only if isa == 'transition': + - timeout: list of duration of previous state in µs + - rel_energy_prev: transition energy relative to previous state mean power in pJ + - rel_energy_next: transition energy relative to next state mean power in pJ + """ + + def __init__( + self, + by_name, + parameters, + arg_count, + traces=[], + ignore_trace_indexes=[], + discard_outliers=None, + function_override={}, + use_corrcoef=False, + pta=None, + ): + """ + Prepare a new PTA energy model. + + Actual model generation is done on-demand by calling the respective functions. + + arguments: + by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate. + parameters -- list of parameter names, as returned by pta_trace_to_aggregate + arg_count -- function arguments, as returned by pta_trace_to_aggregate + traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data() + ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored. + discard_outliers -- currently not supported: threshold for outlier detection and removel (float). + Outlier detection is performed individually for each state/transition in each trace, + so it only works if the benchmark ran several times. + Given "data" (a set of measurements of the same thing, e.g. TX duration in the third benchmark trace), + "m" (the median of all attribute measurements with the same parameters, which may include data from other traces), + a data point X is considered an outlier if + | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers . + function_override -- dict of overrides for automatic parameter function generation. + If (state or transition name, model attribute) is present in function_override, + the corresponding text string is the function used for analytic (parameter-aware/fitted) + modeling of this attribute. It is passed to AnalyticFunction, see + there for the required format. Note that this happens regardless of + parameter dependency detection: The provided analytic function will be assigned + even if it seems like the model attribute is static / parameter-independent. + use_corrcoef -- use correlation coefficient instead of stddev comparison + to detect whether a model attribute depends on a parameter + pta -- hardware model as `PTA` object + """ + self.by_name = by_name + self.by_param = by_name_to_by_param(by_name) + self._parameter_names = sorted(parameters) + self._num_args = arg_count + self._use_corrcoef = use_corrcoef + self.traces = traces + self.stats = ParamStats( + self.by_name, + self.by_param, + self._parameter_names, + self._num_args, + self._use_corrcoef, + ) + self.cache = {} + np.seterr("raise") + self._outlier_threshold = discard_outliers + self.function_override = function_override.copy() + self.pta = pta + self.ignore_trace_indexes = ignore_trace_indexes + self._aggregate_to_ndarray(self.by_name) + + def _aggregate_to_ndarray(self, aggregate): + for elem in aggregate.values(): + for key in elem["attributes"]: + elem[key] = np.array(elem[key]) + + # This heuristic is very similar to the "function is not much better than + # median" checks in get_fitted. So far, doing it here as well is mostly + # a performance and not an algorithm quality decision. + # --df, 2018-04-18 + def depends_on_param(self, state_or_trans, key, param): + return self.stats.depends_on_param(state_or_trans, key, param) + + # See notes on depends_on_param + def depends_on_arg(self, state_or_trans, key, param): + return self.stats.depends_on_arg(state_or_trans, key, param) + + def _get_model_from_dict(self, model_dict, model_function): + model = {} + for name, elem in model_dict.items(): + model[name] = {} + for key in elem["attributes"]: + try: + model[name][key] = model_function(elem[key]) + except RuntimeWarning: + logger.warning("Got no data for {} {}".format(name, key)) + except FloatingPointError as fpe: + logger.warning("Got no data for {} {}: {}".format(name, key, fpe)) + return model + + def get_static(self, use_mean=False): + """ + Get static model function: name, attribute -> model value. + + Uses the median of by_name for modeling, unless `use_mean` is set. + """ + getter_function = np.median + + if use_mean: + getter_function = np.mean + + static_model = self._get_model_from_dict(self.by_name, getter_function) + + def static_model_getter(name, key, **kwargs): + return static_model[name][key] + + return static_model_getter + + def get_param_lut(self, fallback=False): + """ + Get parameter-look-up-table model function: name, attribute, parameter values -> model value. + + The function can only give model values for parameter combinations + present in by_param. By default, it raises KeyError for other values. + + arguments: + fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values + """ + static_model = self._get_model_from_dict(self.by_name, np.median) + lut_model = self._get_model_from_dict(self.by_param, np.median) + + def lut_median_getter(name, key, param, arg=[], **kwargs): + param.extend(map(soft_cast_int, arg)) + try: + return lut_model[(name, tuple(param))][key] + except KeyError: + if fallback: + return static_model[name][key] + raise + + return lut_median_getter + + def param_index(self, param_name): + if param_name in self._parameter_names: + return self._parameter_names.index(param_name) + return len(self._parameter_names) + int(param_name) + + def param_name(self, param_index): + if param_index < len(self._parameter_names): + return self._parameter_names[param_index] + return str(param_index) + + def get_fitted(self, safe_functions_enabled=False): + """ + Get parameter-aware model function and model information function. + + Returns two functions: + model_function(name, attribute, param=parameter values) -> model value. + model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None + """ + if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: + return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] + + static_model = self._get_model_from_dict(self.by_name, np.median) + param_model = dict( + [[state_or_tran, {}] for state_or_tran in self.by_name.keys()] + ) + paramfit = ParallelParamFit(self.by_param) + for state_or_tran in self.by_name.keys(): + for model_attribute in self.by_name[state_or_tran]["attributes"]: + fit_results = {} + for parameter_index, parameter_name in enumerate(self._parameter_names): + if self.depends_on_param( + state_or_tran, model_attribute, parameter_name + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + parameter_index, + parameter_name, + safe_functions_enabled, + ) + for ( + codependent_param_dict + ) in self.stats.codependent_parameter_value_dicts( + state_or_tran, model_attribute, parameter_name + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + parameter_index, + parameter_name, + safe_functions_enabled, + codependent_param_dict, + ) + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): + for arg_index in range(self._num_args[state_or_tran]): + if self.depends_on_arg( + state_or_tran, model_attribute, arg_index + ): + paramfit.enqueue( + state_or_tran, + model_attribute, + len(self._parameter_names) + arg_index, + arg_index, + safe_functions_enabled, + ) + paramfit.fit() + + for state_or_tran in self.by_name.keys(): + num_args = 0 + if ( + arg_support_enabled + and self.by_name[state_or_tran]["isa"] == "transition" + ): + num_args = self._num_args[state_or_tran] + for model_attribute in self.by_name[state_or_tran]["attributes"]: + fit_results = paramfit.get_result(state_or_tran, model_attribute) + + for parameter_name in self._parameter_names: + if self.depends_on_param( + state_or_tran, model_attribute, parameter_name + ): + for ( + codependent_param_dict + ) in self.stats.codependent_parameter_value_dicts( + state_or_tran, model_attribute, parameter_name + ): + pass + # FIXME paramfit.get_result hat ja gar keinen Parameter als Argument... + + if (state_or_tran, model_attribute) in self.function_override: + function_str = self.function_override[ + (state_or_tran, model_attribute) + ] + x = AnalyticFunction(function_str, self._parameter_names, num_args) + x.fit(self.by_param, state_or_tran, model_attribute) + if x.fit_success: + param_model[state_or_tran][model_attribute] = { + "fit_result": fit_results, + "function": x, + } + elif len(fit_results.keys()): + x = analytic.function_powerset( + fit_results, self._parameter_names, num_args + ) + x.fit(self.by_param, state_or_tran, model_attribute) + if x.fit_success: + param_model[state_or_tran][model_attribute] = { + "fit_result": fit_results, + "function": x, + } + + def model_getter(name, key, **kwargs): + if "arg" in kwargs and "param" in kwargs: + kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) + if key in param_model[name]: + param_list = kwargs["param"] + param_function = param_model[name][key]["function"] + if param_function.is_predictable(param_list): + return param_function.eval(param_list) + return static_model[name][key] + + def info_getter(name, key): + if key in param_model[name]: + return param_model[name][key] + return None + + self.cache["fitted_model_getter"] = model_getter + self.cache["fitted_info_getter"] = info_getter + + return model_getter, info_getter + + def to_json(self): + static_model = self.get_static() + static_quality = self.assess(static_model) + param_model, param_info = self.get_fitted() + analytic_quality = self.assess(param_model) + self.pta.update( + static_model, + param_info, + static_error=static_quality["by_name"], + analytic_error=analytic_quality["by_name"], + ) + return self.pta.to_json() + + def states(self): + """Return sorted list of state names.""" + return sorted( + list( + filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys()) + ) + ) + + def transitions(self): + """Return sorted list of transition names.""" + return sorted( + list( + filter( + lambda k: self.by_name[k]["isa"] == "transition", + self.by_name.keys(), + ) + ) + ) + + def states_and_transitions(self): + """Return list of states and transition names.""" + ret = self.states() + ret.extend(self.transitions()) + return ret + + def parameters(self): + return self._parameter_names + + def attributes(self, state_or_trans): + return self.by_name[state_or_trans]["attributes"] + + def assess(self, model_function): + """ + Calculate MAE, SMAPE, etc. of model_function for each by_name entry. + + state/transition/... name and parameter values are fed into model_function. + The by_name entries of this PTAModel are used as ground truth and + compared with the values predicted by model_function. + + For proper model assessments, the data used to generate model_function + and the data fed into this AnalyticModel instance must be mutually + exclusive (e.g. by performing cross validation). Otherwise, + overfitting cannot be detected. + """ + detailed_results = {} + for name, elem in sorted(self.by_name.items()): + detailed_results[name] = {} + for key in elem["attributes"]: + predicted_data = np.array( + list( + map( + lambda i: model_function(name, key, param=elem["param"][i]), + range(len(elem[key])), + ) + ) + ) + measures = regression_measures(predicted_data, elem[key]) + detailed_results[name][key] = measures + + return {"by_name": detailed_results} + + def assess_states( + self, model_function, model_attribute="power", distribution: dict = None + ): + """ + Calculate overall model error assuming equal distribution of states + """ + # TODO calculate mean power draw for distribution and use it to + # calculate relative error from MAE combination + model_quality = self.assess(model_function) + num_states = len(self.states()) + if distribution is None: + distribution = dict(map(lambda x: [x, 1 / num_states], self.states())) + + if not np.isclose(sum(distribution.values()), 1): + raise ValueError( + "distribution must be a probability distribution with sum 1" + ) + + # total_value = None + # try: + # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states())) + # except KeyError: + # pass + + total_error = np.sqrt( + sum( + map( + lambda x: np.square( + model_quality["by_name"][x][model_attribute]["mae"] + * distribution[x] + ), + self.states(), + ) + ) + ) + return total_error + + def assess_on_traces(self, model_function): + """ + Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance. + + :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`. + Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the + traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states + """ + model_energy_list = [] + real_energy_list = [] + model_rel_energy_list = [] + model_state_energy_list = [] + model_duration_list = [] + real_duration_list = [] + model_timeout_list = [] + real_timeout_list = [] + + for trace in self.traces: + if trace["id"] not in self.ignore_trace_indexes: + for rep_id in range(len(trace["trace"][0]["offline"])): + model_energy = 0.0 + real_energy = 0.0 + model_rel_energy = 0.0 + model_state_energy = 0.0 + model_duration = 0.0 + real_duration = 0.0 + model_timeout = 0.0 + real_timeout = 0.0 + for i, trace_part in enumerate(trace["trace"]): + name = trace_part["name"] + prev_name = trace["trace"][i - 1]["name"] + isa = trace_part["isa"] + if name != "UNINITIALIZED": + try: + param = trace_part["offline_aggregates"]["param"][ + rep_id + ] + prev_param = trace["trace"][i - 1][ + "offline_aggregates" + ]["param"][rep_id] + power = trace_part["offline"][rep_id]["uW_mean"] + duration = trace_part["offline"][rep_id]["us"] + prev_duration = trace["trace"][i - 1]["offline"][ + rep_id + ]["us"] + real_energy += power * duration + if isa == "state": + model_energy += ( + model_function(name, "power", param=param) + * duration + ) + else: + model_energy += model_function( + name, "energy", param=param + ) + # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data + if i == 1: + model_rel_energy += model_function( + name, "energy", param=param + ) + else: + model_rel_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_state_energy += model_function( + prev_name, "power", param=prev_param + ) * (prev_duration + duration) + model_rel_energy += model_function( + name, "rel_energy_prev", param=param + ) + real_duration += duration + model_duration += model_function( + name, "duration", param=param + ) + if ( + "plan" in trace_part + and trace_part["plan"]["level"] == "epilogue" + ): + real_timeout += trace_part["offline"][rep_id][ + "timeout" + ] + model_timeout += model_function( + name, "timeout", param=param + ) + except KeyError: + # if states/transitions have been removed via --filter-param, this is harmless + pass + real_energy_list.append(real_energy) + model_energy_list.append(model_energy) + model_rel_energy_list.append(model_rel_energy) + model_state_energy_list.append(model_state_energy) + real_duration_list.append(real_duration) + model_duration_list.append(model_duration) + real_timeout_list.append(real_timeout) + model_timeout_list.append(model_timeout) + + return { + "duration_by_trace": regression_measures( + np.array(model_duration_list), np.array(real_duration_list) + ), + "energy_by_trace": regression_measures( + np.array(model_energy_list), np.array(real_energy_list) + ), + "timeout_by_trace": regression_measures( + np.array(model_timeout_list), np.array(real_timeout_list) + ), + "rel_energy_by_trace": regression_measures( + np.array(model_rel_energy_list), np.array(real_energy_list) + ), + "state_energy_by_trace": regression_measures( + np.array(model_state_energy_list), np.array(real_energy_list) + ), + } |