summary | refs | log | tree | commit | diff
path: root/lib/dfatool.py
diff options
context:
space:
mode:
author Daniel Friesel <daniel.friesel@uos.de> 2020-07-06 11:47:05 +0200
committer Daniel Friesel <daniel.friesel@uos.de> 2020-07-06 11:47:05 +0200
commit d7ca9acbb668d4c73f07eddf0278c08bbdae7be7 (patch)
tree 655b6aac65e5a553c9e0228778fe8f83c305ec04 /lib/dfatool.py
parent 1406e32aaa0466f5e43d270b0b10e54702210769 (diff)
Move ParamFit, PTAModel, AnalyticModel to model.py module
Diffstat (limited to 'lib/dfatool.py')
-rw-r--r-- lib/dfatool.py 1171
1 files changed, 0 insertions, 1171 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 392f5a6..20e198d 100644
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -92,77 +92,6 @@ def mean_or_none(arr):
return -1
-def aggregate_measures(aggregate: float, actual: list) -> dict:
- """
- Calculate error measures for model value on data list.
-
- arguments:
- aggregate -- model value (float or int)
- actual -- real-world / reference values (list of float or int)
-
- return value:
- See regression_measures
- """
- aggregate_array = np.array([aggregate] * len(actual))
- return regression_measures(aggregate_array, np.array(actual))
-
-
-def regression_measures(predicted: np.ndarray, actual: np.ndarray):
- """
- Calculate error measures by comparing model values to reference values.
-
- arguments:
- predicted -- model values (np.ndarray)
- actual -- real-world / reference values (np.ndarray)
-
- Returns a dict containing the following measures:
- mae -- Mean Absolute Error
- mape -- Mean Absolute Percentage Error,
- if all items in actual are non-zero (NaN otherwise)
- smape -- Symmetric Mean Absolute Percentage Error,
- if no 0,0-pairs are present in actual and predicted (NaN otherwise)
- msd -- Mean Square Deviation
- rmsd -- Root Mean Square Deviation
- ssr -- Sum of Squared Residuals
- rsq -- R^2 measure, see sklearn.metrics.r2_score
- count -- Number of values
- """
- if type(predicted) != np.ndarray:
- raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
- if type(actual) != np.ndarray:
- raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
- deviations = predicted - actual
- # mean = np.mean(actual)
- if len(deviations) == 0:
- return {}
- measures = {
- "mae": np.mean(np.abs(deviations), dtype=np.float64),
- "msd": np.mean(deviations ** 2, dtype=np.float64),
- "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
- "ssr": np.sum(deviations ** 2, dtype=np.float64),
- "rsq": r2_score(actual, predicted),
- "count": len(actual),
- }
-
- # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
-
- if np.all(actual != 0):
- measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
- else:
- measures["mape"] = np.nan
- if np.all(np.abs(predicted) + np.abs(actual) != 0):
- measures["smape"] = (
- np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
- * 100
- )
- else:
- measures["smape"] = np.nan
- # if np.all(rsq_quotient != 0):
- # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
-
- return measures
-
-
class KeysightCSV:
"""Simple loader for Keysight CSV data, as exported by the windows software."""
@@ -1191,581 +1120,6 @@ class RawData:
}
-class ParallelParamFit:
- """
- Fit a set of functions on parameterized measurements.
-
- One parameter is variale, all others are fixed. Reports the best-fitting
- function type for each parameter.
- """
-
- def __init__(self, by_param):
- """Create a new ParallelParamFit object."""
- self.fit_queue = []
- self.by_param = by_param
-
- def enqueue(
- self,
- state_or_tran,
- attribute,
- param_index,
- param_name,
- safe_functions_enabled=False,
- param_filter=None,
- ):
- """
- Add state_or_tran/attribute/param_name to fit queue.
-
- This causes fit() to compute the best-fitting function for this model part.
- """
- self.fit_queue.append(
- {
- "key": [state_or_tran, attribute, param_name, param_filter],
- "args": [
- self.by_param,
- state_or_tran,
- attribute,
- param_index,
- safe_functions_enabled,
- param_filter,
- ],
- }
- )
-
- def fit(self):
- """
- Fit functions on previously enqueue data.
-
- Fitting is one in parallel with one process per core.
-
- Results can be accessed using the public ParallelParamFit.results object.
- """
- with Pool() as pool:
- self.results = pool.map(_try_fits_parallel, self.fit_queue)
-
- def get_result(self, name, attribute, param_filter: dict = None):
- """
- Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
-
- Filters out results where the best function is worse (or not much better than) static mean/median estimates.
-
- :param name: state/transition/... name, e.g. 'TX'
- :param attribute: model attribute, e.g. 'duration'
- :param param_filter:
- :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
- """
- fit_result = dict()
- for result in self.results:
- if (
- result["key"][0] == name
- and result["key"][1] == attribute
- and result["key"][3] == param_filter
- and result["result"]["best"] is not None
- ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
- this_result = result["result"]
- if this_result["best_rmsd"] >= min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- # See notes on depends_on_param
- elif this_result["best_rmsd"] >= 0.8 * min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- else:
- fit_result[result["key"][2]] = this_result
- return fit_result
-
-
-def _try_fits_parallel(arg):
- """
- Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
-
- Must be a global function as it is called from a multiprocessing Pool.
- """
- return {"key": arg["key"], "result": _try_fits(*arg["args"])}
-
-
-def _try_fits(
- by_param,
- state_or_tran,
- model_attribute,
- param_index,
- safe_functions_enabled=False,
- param_filter: dict = None,
-):
- """
- Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
-
- This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
- The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
- Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
- Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
-
- :returns: a dictionary with the following elements:
- best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
- best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
- mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
- median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
- results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
-
- :param by_param: measurements partitioned by state/transition/... name and parameter values.
- Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
-
- :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
- Example: `'foo'`
-
- :param model_attribute: attribute for which goodness-of-fit will be calculated.
- Example: `'bar'`
-
- :param param_index: index of the parameter used as model input
- :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
- :param param_filter: Only use measurements whose parameters match param_filter for fitting.
- """
-
- functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
-
- for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
- # We might remove elements from 'functions' while iterating over
- # its keys. A generator will not allow this, so we need to
- # convert to a list.
- function_names = list(functions.keys())
- for function_name in function_names:
- function_object = functions[function_name]
- if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
- param_key[1][param_index]
- ):
- functions.pop(function_name, None)
-
- raw_results = dict()
- raw_results_by_param = dict()
- ref_results = {"mean": list(), "median": list()}
- results = dict()
- results_by_param = dict()
-
- seen_parameter_combinations = set()
-
- # for each parameter combination:
- for param_key in filter(
- lambda x: x[0] == state_or_tran
- and remove_index_from_tuple(x[1], param_index)
- not in seen_parameter_combinations
- and len(by_param[x]["param"])
- and match_parameter_values(by_param[x]["param"][0], param_filter),
- by_param.keys(),
- ):
- X = []
- Y = []
- num_valid = 0
- num_total = 0
-
- # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
- # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
- seen_parameter_combinations.add(
- remove_index_from_tuple(param_key[1], param_index)
- )
-
- # for each value of the parameter denoted by param_index (all other parameters remain the same):
- for k, v in filter(
- lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
- ):
- num_total += 1
- if is_numeric(k[1][param_index]):
- num_valid += 1
- X.extend([float(k[1][param_index])] * len(v[model_attribute]))
- Y.extend(v[model_attribute])
-
- if num_valid > 2:
- X = np.array(X)
- Y = np.array(Y)
- other_parameters = remove_index_from_tuple(k[1], param_index)
- raw_results_by_param[other_parameters] = dict()
- results_by_param[other_parameters] = dict()
- for function_name, param_function in functions.items():
- if function_name not in raw_results:
- raw_results[function_name] = dict()
- error_function = param_function.error_function
- res = optimize.least_squares(
- error_function, [0, 1], args=(X, Y), xtol=2e-15
- )
- measures = regression_measures(param_function.eval(res.x, X), Y)
- raw_results_by_param[other_parameters][function_name] = measures
- for measure, error_rate in measures.items():
- if measure not in raw_results[function_name]:
- raw_results[function_name][measure] = list()
- raw_results[function_name][measure].append(error_rate)
- # print(function_name, res, measures)
- mean_measures = aggregate_measures(np.mean(Y), Y)
- ref_results["mean"].append(mean_measures["rmsd"])
- raw_results_by_param[other_parameters]["mean"] = mean_measures
- median_measures = aggregate_measures(np.median(Y), Y)
- ref_results["median"].append(median_measures["rmsd"])
- raw_results_by_param[other_parameters]["median"] = median_measures
-
- if not len(ref_results["mean"]):
- # Insufficient data for fitting
- # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
- return {"best": None, "best_rmsd": np.inf, "results": results}
-
- for (
- other_parameter_combination,
- other_parameter_results,
- ) in raw_results_by_param.items():
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in other_parameter_results.items():
- if len(result) > 0:
- results[function_name] = result
- rmsd = result["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
- results_by_param[other_parameter_combination] = {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": results["mean"]["rmsd"],
- "median_rmsd": results["median"]["rmsd"],
- "results": results,
- }
-
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in raw_results.items():
- if len(result) > 0:
- results[function_name] = {}
- for measure in result.keys():
- results[function_name][measure] = np.mean(result[measure])
- rmsd = results[function_name]["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
-
- return {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": np.mean(ref_results["mean"]),
- "median_rmsd": np.mean(ref_results["median"]),
- "results": results,
- "results_by_other_param": results_by_param,
- }
-
-
-def _num_args_from_by_name(by_name):
- num_args = dict()
- for key, value in by_name.items():
- if "args" in value:
- num_args[key] = len(value["args"][0])
- return num_args
-
-
-class AnalyticModel:
- u"""
- Parameter-aware analytic energy/data size/... model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- These provide measurements aggregated by (function/state/...) name
- and (for by_param) parameter values. Layout:
- dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'bar' : [5, 6, 7],
- 'attributes' : ['foo', 'bar'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- }
-
- methods:
- get_static -- return static (parameter-unaware) model.
- get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
- get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
-
- variables:
- names -- function/state/... names (i.e., the keys of by_name)
- parameters -- parameter names
- stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
- assess -- calculate model quality
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count=None,
- function_override=dict(),
- use_corrcoef=False,
- ):
- """
- Create a new AnalyticModel and compute parameter statistics.
-
- :param by_name: measurements aggregated by (function/state/...) name.
- Layout: dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'duration' : [5, 6, 7],
- 'attributes' : ['foo', 'duration'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- # foo_count-^ ^-irrelevant
- }
- :param parameters: List of parameter names
- :param function_override: dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
- """
- self.cache = dict()
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self.names = sorted(by_name.keys())
- self.parameters = sorted(parameters)
- self.function_override = function_override.copy()
- self._use_corrcoef = use_corrcoef
- self._num_args = arg_count
- if self._num_args is None:
- self._num_args = _num_args_from_by_name(by_name)
-
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self.parameters,
- self._num_args,
- use_corrcoef=use_corrcoef,
- )
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- logger.warning("Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
- return model
-
- def param_index(self, param_name):
- if param_name in self.parameters:
- return self.parameters.index(param_name)
- return len(self.parameters) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self.parameters):
- return self.parameters[param_index]
- return str(param_index)
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get paramete-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict([[name, {}] for name in self.by_name.keys()])
- paramfit = ParallelParamFit(self.by_param)
-
- for name in self.by_name.keys():
- for attribute in self.by_name[name]["attributes"]:
- for param_index, param in enumerate(self.parameters):
- if self.stats.depends_on_param(name, attribute, param):
- paramfit.enqueue(name, attribute, param_index, param, False)
- if arg_support_enabled and name in self._num_args:
- for arg_index in range(self._num_args[name]):
- if self.stats.depends_on_arg(name, attribute, arg_index):
- paramfit.enqueue(
- name,
- attribute,
- len(self.parameters) + arg_index,
- arg_index,
- False,
- )
-
- paramfit.fit()
-
- for name in self.by_name.keys():
- num_args = 0
- if name in self._num_args:
- num_args = self._num_args[name]
- for attribute in self.by_name[name]["attributes"]:
- fit_result = paramfit.get_result(name, attribute)
-
- if (name, attribute) in self.function_override:
- function_str = self.function_override[(name, attribute)]
- x = AnalyticFunction(function_str, self.parameters, num_args)
- x.fit(self.by_param, name, attribute)
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
- elif len(fit_result.keys()):
- x = analytic.function_powerset(
- fit_result, self.parameters, num_args
- )
- x.fit(self.by_param, name, attribute)
-
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this AnalyticModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for attribute in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(
- name, attribute, param=elem["param"][i]
- ),
- range(len(elem[attribute])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[attribute])
- detailed_results[name][attribute] = measures
-
- return {"by_name": detailed_results}
-
- def to_json(self):
- # TODO
- pass
-
-
def _add_trace_data_to_aggregate(aggregate, key, element):
# Only cares about element['isa'], element['offline_aggregates'], and
# element['plan']['level']
@@ -1867,531 +1221,6 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]):
return by_name, parameter_names, arg_count
-class PTAModel:
- u"""
- Parameter-aware PTA-based energy model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- The model heavily relies on two internal data structures:
- PTAModel.by_name and PTAModel.by_param.
-
- These provide measurements aggregated by state/transition name
- and (for by_param) parameter values. Layout:
- dictionary with one key per state/transition ('send', 'TX', ...) or
- one key per state/transition and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
- For by_param, parameter values are ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - isa: 'state' or 'transition'
- - power: list of mean power measurements in µW
- - duration: list of durations in µs
- - power_std: list of stddev of power per state/transition
- - energy: consumed energy (power*duration) in pJ
- - paramkeys: list of parameter names in each measurement (-> list of lists)
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- additionally, only if isa == 'transition':
- - timeout: list of duration of previous state in µs
- - rel_energy_prev: transition energy relative to previous state mean power in pJ
- - rel_energy_next: transition energy relative to next state mean power in pJ
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count,
- traces=[],
- ignore_trace_indexes=[],
- discard_outliers=None,
- function_override={},
- use_corrcoef=False,
- pta=None,
- ):
- """
- Prepare a new PTA energy model.
-
- Actual model generation is done on-demand by calling the respective functions.
-
- arguments:
- by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate.
- parameters -- list of parameter names, as returned by pta_trace_to_aggregate
- arg_count -- function arguments, as returned by pta_trace_to_aggregate
- traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data()
- ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored.
- discard_outliers -- currently not supported: threshold for outlier detection and removel (float).
- Outlier detection is performed individually for each state/transition in each trace,
- so it only works if the benchmark ran several times.
- Given "data" (a set of measurements of the same thing, e.g. TX duration in the third benchmark trace),
- "m" (the median of all attribute measurements with the same parameters, which may include data from other traces),
- a data point X is considered an outlier if
- | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers .
- function_override -- dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- use_corrcoef -- use correlation coefficient instead of stddev comparison
- to detect whether a model attribute depends on a parameter
- pta -- hardware model as `PTA` object
- """
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self._parameter_names = sorted(parameters)
- self._num_args = arg_count
- self._use_corrcoef = use_corrcoef
- self.traces = traces
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self._parameter_names,
- self._num_args,
- self._use_corrcoef,
- )
- self.cache = {}
- np.seterr("raise")
- self._outlier_threshold = discard_outliers
- self.function_override = function_override.copy()
- self.pta = pta
- self.ignore_trace_indexes = ignore_trace_indexes
- self._aggregate_to_ndarray(self.by_name)
-
- def _aggregate_to_ndarray(self, aggregate):
- for elem in aggregate.values():
- for key in elem["attributes"]:
- elem[key] = np.array(elem[key])
-
- # This heuristic is very similar to the "function is not much better than
- # median" checks in get_fitted. So far, doing it here as well is mostly
- # a performance and not an algorithm quality decision.
- # --df, 2018-04-18
- def depends_on_param(self, state_or_trans, key, param):
- return self.stats.depends_on_param(state_or_trans, key, param)
-
- # See notes on depends_on_param
- def depends_on_arg(self, state_or_trans, key, param):
- return self.stats.depends_on_arg(state_or_trans, key, param)
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- logger.warning("Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
- return model
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling, unless `use_mean` is set.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def param_index(self, param_name):
- if param_name in self._parameter_names:
- return self._parameter_names.index(param_name)
- return len(self._parameter_names) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self._parameter_names):
- return self._parameter_names[param_index]
- return str(param_index)
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get parameter-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict(
- [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
- )
- paramfit = ParallelParamFit(self.by_param)
- for state_or_tran in self.by_name.keys():
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = {}
- for parameter_index, parameter_name in enumerate(self._parameter_names):
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- )
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- codependent_param_dict,
- )
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- for arg_index in range(self._num_args[state_or_tran]):
- if self.depends_on_arg(
- state_or_tran, model_attribute, arg_index
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- len(self._parameter_names) + arg_index,
- arg_index,
- safe_functions_enabled,
- )
- paramfit.fit()
-
- for state_or_tran in self.by_name.keys():
- num_args = 0
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- num_args = self._num_args[state_or_tran]
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = paramfit.get_result(state_or_tran, model_attribute)
-
- for parameter_name in self._parameter_names:
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- pass
- # FIXME paramfit.get_result hat ja gar keinen Parameter als Argument...
-
- if (state_or_tran, model_attribute) in self.function_override:
- function_str = self.function_override[
- (state_or_tran, model_attribute)
- ]
- x = AnalyticFunction(function_str, self._parameter_names, num_args)
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
- elif len(fit_results.keys()):
- x = analytic.function_powerset(
- fit_results, self._parameter_names, num_args
- )
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def to_json(self):
- static_model = self.get_static()
- static_quality = self.assess(static_model)
- param_model, param_info = self.get_fitted()
- analytic_quality = self.assess(param_model)
- self.pta.update(
- static_model,
- param_info,
- static_error=static_quality["by_name"],
- analytic_error=analytic_quality["by_name"],
- )
- return self.pta.to_json()
-
- def states(self):
- """Return sorted list of state names."""
- return sorted(
- list(
- filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
- )
- )
-
- def transitions(self):
- """Return sorted list of transition names."""
- return sorted(
- list(
- filter(
- lambda k: self.by_name[k]["isa"] == "transition",
- self.by_name.keys(),
- )
- )
- )
-
- def states_and_transitions(self):
- """Return list of states and transition names."""
- ret = self.states()
- ret.extend(self.transitions())
- return ret
-
- def parameters(self):
- return self._parameter_names
-
- def attributes(self, state_or_trans):
- return self.by_name[state_or_trans]["attributes"]
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this PTAModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for key in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(name, key, param=elem["param"][i]),
- range(len(elem[key])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[key])
- detailed_results[name][key] = measures
-
- return {"by_name": detailed_results}
-
- def assess_states(
- self, model_function, model_attribute="power", distribution: dict = None
- ):
- """
- Calculate overall model error assuming equal distribution of states
- """
- # TODO calculate mean power draw for distribution and use it to
- # calculate relative error from MAE combination
- model_quality = self.assess(model_function)
- num_states = len(self.states())
- if distribution is None:
- distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
-
- if not np.isclose(sum(distribution.values()), 1):
- raise ValueError(
- "distribution must be a probability distribution with sum 1"
- )
-
- # total_value = None
- # try:
- # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states()))
- # except KeyError:
- # pass
-
- total_error = np.sqrt(
- sum(
- map(
- lambda x: np.square(
- model_quality["by_name"][x][model_attribute]["mae"]
- * distribution[x]
- ),
- self.states(),
- )
- )
- )
- return total_error
-
- def assess_on_traces(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance.
-
- :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`.
- Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the
- traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states
- """
- model_energy_list = []
- real_energy_list = []
- model_rel_energy_list = []
- model_state_energy_list = []
- model_duration_list = []
- real_duration_list = []
- model_timeout_list = []
- real_timeout_list = []
-
- for trace in self.traces:
- if trace["id"] not in self.ignore_trace_indexes:
- for rep_id in range(len(trace["trace"][0]["offline"])):
- model_energy = 0.0
- real_energy = 0.0
- model_rel_energy = 0.0
- model_state_energy = 0.0
- model_duration = 0.0
- real_duration = 0.0
- model_timeout = 0.0
- real_timeout = 0.0
- for i, trace_part in enumerate(trace["trace"]):
- name = trace_part["name"]
- prev_name = trace["trace"][i - 1]["name"]
- isa = trace_part["isa"]
- if name != "UNINITIALIZED":
- try:
- param = trace_part["offline_aggregates"]["param"][
- rep_id
- ]
- prev_param = trace["trace"][i - 1][
- "offline_aggregates"
- ]["param"][rep_id]
- power = trace_part["offline"][rep_id]["uW_mean"]
- duration = trace_part["offline"][rep_id]["us"]
- prev_duration = trace["trace"][i - 1]["offline"][
- rep_id
- ]["us"]
- real_energy += power * duration
- if isa == "state":
- model_energy += (
- model_function(name, "power", param=param)
- * duration
- )
- else:
- model_energy += model_function(
- name, "energy", param=param
- )
- # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
- if i == 1:
- model_rel_energy += model_function(
- name, "energy", param=param
- )
- else:
- model_rel_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_state_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_rel_energy += model_function(
- name, "rel_energy_prev", param=param
- )
- real_duration += duration
- model_duration += model_function(
- name, "duration", param=param
- )
- if (
- "plan" in trace_part
- and trace_part["plan"]["level"] == "epilogue"
- ):
- real_timeout += trace_part["offline"][rep_id][
- "timeout"
- ]
- model_timeout += model_function(
- name, "timeout", param=param
- )
- except KeyError:
- # if states/transitions have been removed via --filter-param, this is harmless
- pass
- real_energy_list.append(real_energy)
- model_energy_list.append(model_energy)
- model_rel_energy_list.append(model_rel_energy)
- model_state_energy_list.append(model_state_energy)
- real_duration_list.append(real_duration)
- model_duration_list.append(model_duration)
- real_timeout_list.append(real_timeout)
- model_timeout_list.append(model_timeout)
-
- return {
- "duration_by_trace": regression_measures(
- np.array(model_duration_list), np.array(real_duration_list)
- ),
- "energy_by_trace": regression_measures(
- np.array(model_energy_list), np.array(real_energy_list)
- ),
- "timeout_by_trace": regression_measures(
- np.array(model_timeout_list), np.array(real_timeout_list)
- ),
- "rel_energy_by_trace": regression_measures(
- np.array(model_rel_energy_list), np.array(real_energy_list)
- ),
- "state_energy_by_trace": regression_measures(
- np.array(model_state_energy_list), np.array(real_energy_list)
- ),
- }
-
-
class EnergyTraceLog:
"""
EnergyTrace log loader for DFA traces.