diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2021-02-18 12:07:52 +0100 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2021-02-18 12:07:52 +0100 |
commit | c68c4a2bc617dd1356d5d0d2c3ee0ff9754261ab (patch) | |
tree | 20446fcd7ed1f8f4a88d112ef5f8573debf0ccd6 /lib | |
parent | 45310b5f95dba00b1b6e2191309961c98ba9980c (diff) |
refactor model generation from Analytic/PTAModel into ModelAttribute class
Iteration over states/transitions and model attributes is no longer hardcoded
into most model generation code. This should make support for decision trees
and sub-states much easier.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/functions.py | 57 | ||||
-rw-r--r-- | lib/model.py | 519 | ||||
-rw-r--r-- | lib/parameters.py | 386 | ||||
-rw-r--r-- | lib/utils.py | 10 |
4 files changed, 395 insertions, 577 deletions
diff --git a/lib/functions.py b/lib/functions.py index 9d799c7..0bdea45 100644 --- a/lib/functions.py +++ b/lib/functions.py @@ -222,21 +222,18 @@ class AnalyticFunction: else: self.model_args = [] - def get_fit_data(self, by_param, state_or_tran, model_attribute): + def get_fit_data(self, by_param): """ Return training data suitable for scipy.optimize.least_squares. - :param by_param: measurement data, partitioned by state/transition name and parameter/arg values. - This function only uses by_param[(state_or_tran, *)][model_attribute], - which must be a list or 1-D NumPy array containing the ground truth. - The parameter values in (state_or_tran, *) must be numeric for + :param by_param: measurement data, partitioned by parameter/arg values. + by_param[*] must be a list or 1-D NumPy array containing the ground truth. + The parameter values (dict keys) must be numeric for all parameters this function depends on -- otherwise, the corresponding data will be left out. Parameter values must be ordered according to the order of parameter names used in the ParamFunction constructor. Argument values (if any) always come after parameters, in the order of their index in the function signature. - :param state_or_tran: state or transition name, e.g. "TX" or "send" - :param model_attribute: model attribute name, e.g. "power" or "duration" :return: (X, Y, num_valid, num_total): X -- 2-D NumPy array of parameter combinations (model input). @@ -255,48 +252,44 @@ class AnalyticFunction: num_total = 0 for key, val in by_param.items(): - if key[0] == state_or_tran and len(key[1]) == dimension: + if len(key) == dimension: valid = True num_total += 1 for i in range(dimension): - if self._dependson[i] and not is_numeric(key[1][i]): + if self._dependson[i] and not is_numeric(key[i]): valid = False if valid: num_valid += 1 - Y.extend(val[model_attribute]) + Y.extend(val) for i in range(dimension): if self._dependson[i]: - X[i].extend([float(key[1][i])] * len(val[model_attribute])) + X[i].extend([float(key[i])] * len(val)) else: - X[i].extend([np.nan] * len(val[model_attribute])) - elif key[0] == state_or_tran and len(key[1]) != dimension: + X[i].extend([np.nan] * len(val)) + else: logger.warning( - "Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.".format( - state_or_tran, model_attribute, len(key[1]), dimension - ), + "Invalid parameter key length while gathering fit data. is {}, want {}.".format( + len(key), dimension + ) ) X = np.array(X) Y = np.array(Y) return X, Y, num_valid, num_total - def fit(self, by_param, state_or_tran, model_attribute): + def fit(self, by_param): """ Fit the function on measurements via least squares regression. - :param by_param: measurement data, partitioned by state/transition name and parameter/arg values - :param state_or_tran: state or transition name, e.g. "TX" or "send" - :param model_attribute: model attribute name, e.g. "power" or "duration" + :param by_param: measurement data, partitioned by parameter/arg values - The ground truth is read from by_param[(state_or_tran, *)][model_attribute], + The ground truth is read from by_param[*], which must be a list or 1-D NumPy array. Parameter values must be ordered according to the parameter names in the constructor. If argument values are present, they must come after parameter values in the order of their appearance in the function signature. """ - X, Y, num_valid, num_total = self.get_fit_data( - by_param, state_or_tran, model_attribute - ) + X, Y, num_valid, num_total = self.get_fit_data(by_param) if num_valid > 2: error_function = lambda P, X, y: self._function(P, X) - y try: @@ -304,27 +297,17 @@ class AnalyticFunction: error_function, self.model_args, args=(X, Y), xtol=2e-15 ) except ValueError as err: - logger.warning( - "Fit failed for {}/{}: {} (function: {})".format( - state_or_tran, model_attribute, err, self.model_function - ), - ) + logger.warning(f"Fit failed: {err} (function: {self.model_function})") return if res.status > 0: self.model_args = res.x self.fit_success = True else: logger.warning( - "Fit failed for {}/{}: {} (function: {})".format( - state_or_tran, model_attribute, res.message, self.model_function - ), + f"Fit failed: {res.message} (function: {self.model_function})" ) else: - logger.warning( - "Insufficient amount of valid parameter keys, cannot fit {}/{}".format( - state_or_tran, model_attribute - ), - ) + logger.warning("Insufficient amount of valid parameter keys, cannot fit") def is_predictable(self, param_list): """ diff --git a/lib/model.py b/lib/model.py index 518566c..ce73a02 100644 --- a/lib/model.py +++ b/lib/model.py @@ -8,9 +8,14 @@ from multiprocessing import Pool from .automata import PTA from .functions import analytic from .functions import AnalyticFunction -from .parameters import ParamStats +from .parameters import ParallelParamStats from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple -from .utils import by_name_to_by_param, by_param_to_by_name, match_parameter_values +from .utils import ( + by_name_to_by_param, + by_param_to_by_name, + match_parameter_values, + partition_by_param, +) logger = logging.getLogger(__name__) arg_support_enabled = True @@ -95,38 +100,21 @@ class ParallelParamFit: function type for each parameter. """ - def __init__(self, by_param): + def __init__(self): """Create a new ParallelParamFit object.""" - self.fit_queue = [] - self.by_param = by_param + self.fit_queue = list() - def enqueue( - self, - state_or_tran, - attribute, - param_index, - param_name, - safe_functions_enabled=False, - param_filter=None, - ): + def enqueue(self, key, args): """ Add state_or_tran/attribute/param_name to fit queue. This causes fit() to compute the best-fitting function for this model part. + + :param key: (state/transition name, model attribute, parameter name) + :param args: [by_param, param_index, safe_functions_enabled, param_filter] + by_param[(param 1, param2, ...)] holds measurements. """ - # Transform by_param[(state_or_tran, param_value)][attribute] = ... - # into n_by_param[param_value] = ... - # (param_value is dynamic, the rest is fixed) - n_by_param = dict() - for k, v in self.by_param.items(): - if k[0] == state_or_tran: - n_by_param[k[1]] = v[attribute] - self.fit_queue.append( - { - "key": [state_or_tran, attribute, param_name, param_filter], - "args": [n_by_param, param_index, safe_functions_enabled, param_filter], - } - ) + self.fit_queue.append({"key": key, "args": args}) def fit(self): """ @@ -139,14 +127,12 @@ class ParallelParamFit: with Pool() as pool: self.results = pool.map(_try_fits_parallel, self.fit_queue) - def get_result(self, name, attribute, param_filter: dict = None): + def get_result(self, name, attr): """ - Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'. + Parse and sanitize fit results. Filters out results where the best function is worse (or not much better than) static mean/median estimates. - :param name: state/transition/... name, e.g. 'TX' - :param attribute: model attribute, e.g. 'duration' :param param_filter: :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} } """ @@ -154,8 +140,7 @@ class ParallelParamFit: for result in self.results: if ( result["key"][0] == name - and result["key"][1] == attribute - and result["key"][3] == param_filter + and result["key"][1] == attr and result["result"]["best"] is not None ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? this_result = result["result"] @@ -163,9 +148,7 @@ class ParallelParamFit: this_result["mean_rmsd"], this_result["median_rmsd"] ): logger.debug( - "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( - name, - attribute, + "Not modeling as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( result["key"][2], this_result["best_rmsd"], this_result["mean_rmsd"], @@ -177,9 +160,7 @@ class ParallelParamFit: this_result["mean_rmsd"], this_result["median_rmsd"] ): logger.debug( - "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( - name, - attribute, + "Not modeling as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( result["key"][2], this_result["best_rmsd"], this_result["mean_rmsd"], @@ -360,6 +341,81 @@ def _num_args_from_by_name(by_name): return num_args +class ModelAttribute: + def __init__(self, name, attr, data, param_values, param_names, arg_count=0): + self.name = name + self.attr = attr + self.data = data + self.param_values = param_values + self.param_names = sorted(param_names) + self.arg_count = arg_count + self.by_param = None # set via ParallelParamStats + self.function_override = None + self.param_model = None + + def get_static(self, use_mean=False): + if use_mean: + return np.mean(self.data) + return np.median(self.data) + + def get_lut(self, param, use_mean=False): + if use_mean: + return np.mean(self.by_param[param]) + return np.median(self.by_param[param]) + + def get_data_for_paramfit(self, safe_functions_enabled=False): + ret = list() + for param_index, param_name in enumerate(self.param_names): + if self.stats.depends_on_param(param_name): + ret.append( + (param_name, (self.by_param, param_index, safe_functions_enabled)) + ) + if self.arg_count: + for arg_index in range(self.arg_count): + if self.stats.depends_on_arg(arg_index): + ret.append( + ( + arg_index, + ( + self.by_param, + len(self.param_names) + arg_index, + safe_functions_enabled, + ), + ) + ) + return ret + + def set_data_from_paramfit(self, fit_result): + param_model = (None, None) + if self.function_override is not None: + function_str = self.function_override + x = AnalyticFunction(function_str, self.param_names, self.arg_count) + x.fit(self.by_param) + if x.fit_success: + param_model = (x, fit_result) + elif len(fit_result.keys()): + x = analytic.function_powerset(fit_result, self.param_names, self.arg_count) + x.fit(self.by_param) + + if x.fit_success: + param_model = (x, fit_result) + + self.param_model = param_model + + def get_fitted(self): + """ + Get paramete-aware model function and model information function. + They must have been set via get_data_for_paramfit -> ParallelParamFit -> set-data_from_paramfit first. + + Returns a tuple (function, info): + function -> AnalyticFunction for model. function(param=parameter values) -> model value. + info -> {'fit_result' : ..., 'function' : ... } + + Returns (None, None) if fitting failed. Returns None if ParamFit has not been performed yet. + """ + return self.param_model + + class AnalyticModel: """ Parameter-aware analytic energy/data size/... model. @@ -447,8 +503,8 @@ class AnalyticModel: :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter """ self.cache = dict() - self.by_name = by_name - self.by_param = by_name_to_by_param(by_name) + self.by_name = by_name # no longer required? + self.attr_by_name = dict() self.names = sorted(by_name.keys()) self.parameters = sorted(parameters) self.function_override = function_override.copy() @@ -457,26 +513,33 @@ class AnalyticModel: if self._num_args is None: self._num_args = _num_args_from_by_name(by_name) - self.stats = ParamStats( - self.by_name, - self.by_param, - self.parameters, - self._num_args, - use_corrcoef=use_corrcoef, - ) + self.fit_done = False - def _get_model_from_dict(self, model_dict, model_function): - model = {} - for name, elem in model_dict.items(): - model[name] = {} - for key in elem["attributes"]: - try: - model[name][key] = model_function(elem[key]) - except RuntimeWarning: - logger.warning("Got no data for {} {}".format(name, key)) - except FloatingPointError as fpe: - logger.warning("Got no data for {} {}: {}".format(name, key, fpe)) - return model + self._compute_stats(by_name) + + def _compute_stats(self, by_name): + paramstats = ParallelParamStats() + + for name, data in by_name.items(): + self.attr_by_name[name] = dict() + for attr in data["attributes"]: + model_attr = ModelAttribute( + name, + attr, + data[attr], + data["param"], + self.parameters, + self._num_args.get(name, 0), + ) + self.attr_by_name[name][attr] = model_attr + paramstats.enqueue((name, attr), model_attr) + if (name, attr) in self.function_override: + model_attr.function_override = self.function_override[(name, attr)] + + paramstats.compute() + + def attributes(self, name): + return self.attr_by_name[name].keys() def param_index(self, param_name): if param_name in self.parameters: @@ -492,21 +555,20 @@ class AnalyticModel: """ Get static model function: name, attribute -> model value. - Uses the median of by_name for modeling. + Uses the median of by_name for modeling, unless `use_mean` is set. """ - getter_function = np.median - - if use_mean: - getter_function = np.mean - - static_model = self._get_model_from_dict(self.by_name, getter_function) + model = dict() + for name, attr in self.attr_by_name.items(): + model[name] = dict() + for k, v in attr.items(): + model[name][k] = v.get_static(use_mean=use_mean) def static_model_getter(name, key, **kwargs): - return static_model[name][key] + return model[name][key] return static_model_getter - def get_param_lut(self, fallback=False): + def get_param_lut(self, use_mean=False, fallback=False): """ Get parameter-look-up-table model function: name, attribute, parameter values -> model value. @@ -516,13 +578,22 @@ class AnalyticModel: arguments: fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values """ - static_model = self._get_model_from_dict(self.by_name, np.median) - lut_model = self._get_model_from_dict(self.by_param, np.median) - - def lut_median_getter(name, key, param, arg=[], **kwargs): + static_model = dict() + lut_model = dict() + for name, attr in self.attr_by_name.items(): + static_model[name] = dict() + lut_model[name] = dict() + for k, v in attr.items(): + static_model[name][k] = v.get_static(use_mean=use_mean) + lut_model[name][k] = dict() + for param, model_value in v.by_param.items(): + lut_model[name][k][param] = v.get_lut(param, use_mean=use_mean) + + def lut_median_getter(name, key, param, arg=list(), **kwargs): param.extend(map(soft_cast_int, arg)) + param = tuple(param) try: - return lut_model[(name, tuple(param))][key] + return lut_model[name][key][param] except KeyError: if fallback: return static_model[name][key] @@ -530,84 +601,67 @@ class AnalyticModel: return lut_median_getter - def get_fitted(self, safe_functions_enabled=False): + def get_fitted(self, use_mean=False, safe_functions_enabled=False): """ - Get paramete-aware model function and model information function. + Get parameter-aware model function and model information function. Returns two functions: model_function(name, attribute, param=parameter values) -> model value. model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None """ - if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: - return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] - - static_model = self._get_model_from_dict(self.by_name, np.median) - param_model = dict([[name, {}] for name in self.by_name.keys()]) - paramfit = ParallelParamFit(self.by_param) - - for name in self.by_name.keys(): - for attribute in self.by_name[name]["attributes"]: - for param_index, param in enumerate(self.parameters): - if self.stats.depends_on_param(name, attribute, param): - paramfit.enqueue(name, attribute, param_index, param, False) - if arg_support_enabled and name in self._num_args: - for arg_index in range(self._num_args[name]): - if self.stats.depends_on_arg(name, attribute, arg_index): - paramfit.enqueue( - name, - attribute, - len(self.parameters) + arg_index, - arg_index, - False, - ) - paramfit.fit() - - for name in self.by_name.keys(): - num_args = 0 - if name in self._num_args: - num_args = self._num_args[name] - for attribute in self.by_name[name]["attributes"]: - fit_result = paramfit.get_result(name, attribute) - - if (name, attribute) in self.function_override: - function_str = self.function_override[(name, attribute)] - x = AnalyticFunction(function_str, self.parameters, num_args) - x.fit(self.by_param, name, attribute) - if x.fit_success: - param_model[name][attribute] = { - "fit_result": fit_result, - "function": x, - } - elif len(fit_result.keys()): - x = analytic.function_powerset( - fit_result, self.parameters, num_args + if not self.fit_done: + + paramfit = ParallelParamFit() + + for name in self.names: + for attr in self.attr_by_name[name].keys(): + for key, args in self.attr_by_name[name][ + attr + ].get_data_for_paramfit( + safe_functions_enabled=safe_functions_enabled + ): + key = (name, attr, key) + paramfit.enqueue(key, args) + + paramfit.fit() + + for name in self.names: + for attr in self.attr_by_name[name].keys(): + self.attr_by_name[name][attr].set_data_from_paramfit( + paramfit.get_result(name, attr) ) - x.fit(self.by_param, name, attribute) - if x.fit_success: - param_model[name][attribute] = { - "fit_result": fit_result, - "function": x, - } + self.fit_done = True + + static_model = dict() + for name, attr in self.attr_by_name.items(): + static_model[name] = dict() + for k, v in attr.items(): + static_model[name][k] = v.get_static(use_mean=use_mean) def model_getter(name, key, **kwargs): + param_function, _ = self.attr_by_name[name][key].get_fitted() + + if param_function is None: + return static_model[name][key] + if "arg" in kwargs and "param" in kwargs: kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) - if key in param_model[name]: - param_list = kwargs["param"] - param_function = param_model[name][key]["function"] - if param_function.is_predictable(param_list): - return param_function.eval(param_list) + + if param_function.is_predictable(kwargs["param"]): + return param_function.eval(kwargs["param"]) + return static_model[name][key] def info_getter(name, key): - if key in param_model[name]: - return param_model[name][key] - return None - - self.cache["fitted_model_getter"] = model_getter - self.cache["fitted_info_getter"] = info_getter + try: + model_function, fit_result = self.attr_by_name[name][key].get_fitted() + except KeyError: + return None + if model_function is None: + return None + return {"function": model_function, "fit_result": fit_result} return model_getter, info_getter @@ -625,20 +679,22 @@ class AnalyticModel: overfitting cannot be detected. """ detailed_results = {} - for name, elem in sorted(self.by_name.items()): + for name in self.names: detailed_results[name] = {} - for attribute in elem["attributes"]: + for attribute in self.attr_by_name[name].keys(): + data = self.attr_by_name[name][attribute].data + param_values = self.attr_by_name[name][attribute].param_values predicted_data = np.array( list( map( lambda i: model_function( - name, attribute, param=elem["param"][i] + name, attribute, param=param_values[i] ), - range(len(elem[attribute])), + range(len(data)), ) ) ) - measures = regression_measures(predicted_data, elem[attribute]) + measures = regression_measures(predicted_data, data) detailed_results[name][attribute] = measures return {"by_name": detailed_results} @@ -648,7 +704,7 @@ class AnalyticModel: pass -class PTAModel: +class PTAModel(AnalyticModel): """ Parameter-aware PTA-based energy model. @@ -718,11 +774,18 @@ class PTAModel: pelt -- perform sub-state detection via PELT and model sub-states as well. Requires traces to be set. """ self.by_name = by_name + self.attr_by_name = dict() self.by_param = by_name_to_by_param(by_name) + self.names = sorted(by_name.keys()) self._parameter_names = sorted(parameters) + self.parameters = sorted(parameters) self._num_args = arg_count self._use_corrcoef = use_corrcoef self.traces = traces + self.function_override = function_override.copy() + + self.fit_done = False + if traces is not None and pelt is not None: from .pelt import PELT @@ -730,19 +793,14 @@ class PTAModel: self.find_substates() else: self.pelt = None - self.stats = ParamStats( - self.by_name, - self.by_param, - self._parameter_names, - self._num_args, - self._use_corrcoef, - ) - self.cache = {} + + self._aggregate_to_ndarray(self.by_name) + + self._compute_stats(by_name) + np.seterr("raise") - self.function_override = function_override.copy() self.pta = pta self.ignore_trace_indexes = ignore_trace_indexes - self._aggregate_to_ndarray(self.by_name) def _aggregate_to_ndarray(self, aggregate): for elem in aggregate.values(): @@ -773,157 +831,6 @@ class PTAModel: logger.warning("Got no data for {} {}: {}".format(name, key, fpe)) return model - def get_static(self, use_mean=False): - """ - Get static model function: name, attribute -> model value. - - Uses the median of by_name for modeling, unless `use_mean` is set. - """ - getter_function = np.median - - if use_mean: - getter_function = np.mean - - static_model = self._get_model_from_dict(self.by_name, getter_function) - - def static_model_getter(name, key, **kwargs): - return static_model[name][key] - - return static_model_getter - - def get_param_lut(self, fallback=False): - """ - Get parameter-look-up-table model function: name, attribute, parameter values -> model value. - - The function can only give model values for parameter combinations - present in by_param. By default, it raises KeyError for other values. - - arguments: - fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values - """ - static_model = self._get_model_from_dict(self.by_name, np.median) - lut_model = self._get_model_from_dict(self.by_param, np.median) - - def lut_median_getter(name, key, param, arg=[], **kwargs): - param.extend(map(soft_cast_int, arg)) - try: - return lut_model[(name, tuple(param))][key] - except KeyError: - if fallback: - return static_model[name][key] - raise - - return lut_median_getter - - def param_index(self, param_name): - if param_name in self._parameter_names: - return self._parameter_names.index(param_name) - return len(self._parameter_names) + int(param_name) - - def param_name(self, param_index): - if param_index < len(self._parameter_names): - return self._parameter_names[param_index] - return str(param_index) - - def get_fitted(self, safe_functions_enabled=False): - """ - Get parameter-aware model function and model information function. - - Returns two functions: - model_function(name, attribute, param=parameter values) -> model value. - model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None - """ - if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache: - return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"] - - static_model = self._get_model_from_dict(self.by_name, np.median) - param_model = dict( - [[state_or_tran, {}] for state_or_tran in self.by_name.keys()] - ) - paramfit = ParallelParamFit(self.by_param) - for state_or_tran in self.by_name.keys(): - for model_attribute in self.by_name[state_or_tran]["attributes"]: - fit_results = {} - for parameter_index, parameter_name in enumerate(self._parameter_names): - if self.depends_on_param( - state_or_tran, model_attribute, parameter_name - ): - paramfit.enqueue( - state_or_tran, - model_attribute, - parameter_index, - parameter_name, - safe_functions_enabled, - ) - if ( - arg_support_enabled - and self.by_name[state_or_tran]["isa"] == "transition" - ): - for arg_index in range(self._num_args[state_or_tran]): - if self.depends_on_arg( - state_or_tran, model_attribute, arg_index - ): - paramfit.enqueue( - state_or_tran, - model_attribute, - len(self._parameter_names) + arg_index, - arg_index, - safe_functions_enabled, - ) - paramfit.fit() - - for state_or_tran in self.by_name.keys(): - num_args = 0 - if ( - arg_support_enabled - and self.by_name[state_or_tran]["isa"] == "transition" - ): - num_args = self._num_args[state_or_tran] - for model_attribute in self.by_name[state_or_tran]["attributes"]: - fit_results = paramfit.get_result(state_or_tran, model_attribute) - - if (state_or_tran, model_attribute) in self.function_override: - function_str = self.function_override[ - (state_or_tran, model_attribute) - ] - x = AnalyticFunction(function_str, self._parameter_names, num_args) - x.fit(self.by_param, state_or_tran, model_attribute) - if x.fit_success: - param_model[state_or_tran][model_attribute] = { - "fit_result": fit_results, - "function": x, - } - elif len(fit_results.keys()): - x = analytic.function_powerset( - fit_results, self._parameter_names, num_args - ) - x.fit(self.by_param, state_or_tran, model_attribute) - if x.fit_success: - param_model[state_or_tran][model_attribute] = { - "fit_result": fit_results, - "function": x, - } - - def model_getter(name, key, **kwargs): - if "arg" in kwargs and "param" in kwargs: - kwargs["param"].extend(map(soft_cast_int, kwargs["arg"])) - if key in param_model[name]: - param_list = kwargs["param"] - param_function = param_model[name][key]["function"] - if param_function.is_predictable(param_list): - return param_function.eval(param_list) - return static_model[name][key] - - def info_getter(name, key): - if key in param_model[name]: - return param_model[name][key] - return None - - self.cache["fitted_model_getter"] = model_getter - self.cache["fitted_info_getter"] = info_getter - - return model_getter, info_getter - def pelt_refine(self, by_param_key): logger.debug(f"PELT: {by_param_key} needs refinement") @@ -1112,12 +1019,6 @@ class PTAModel: ret.extend(self.transitions()) return ret - def parameters(self): - return self._parameter_names - - def attributes(self, state_or_trans): - return self.by_name[state_or_trans]["attributes"] - def assess(self, model_function, ref=None): """ Calculate MAE, SMAPE, etc. of model_function for each by_name entry. diff --git a/lib/parameters.py b/lib/parameters.py index bad39dd..e8347a3 100644 --- a/lib/parameters.py +++ b/lib/parameters.py @@ -6,16 +6,16 @@ from collections import OrderedDict from copy import deepcopy from multiprocessing import Pool from .utils import remove_index_from_tuple, is_numeric -from .utils import filter_aggregate_by_param, by_name_to_by_param +from .utils import filter_aggregate_by_param, partition_by_param logger = logging.getLogger(__name__) -def distinct_param_values(by_name, state_or_tran): +def distinct_param_values(param_tuples): """ - Return the distinct values of each parameter in by_name[state_or_tran]. + Return the distinct values of each parameter in param_tuples. - E.g. if by_name[state_or_tran]['param'] contains the distinct entries (1, 1), (1, 2), (1, 3), (0, 3), + E.g. if param_tuples contains the distinct entries (1, 1), (1, 2), (1, 3), (0, 3), this function returns [[1, 0], [1, 2, 3]]. Note that this function deliberately also consider None @@ -25,10 +25,8 @@ def distinct_param_values(by_name, state_or_tran): write() or similar has not been called yet. Other parameters should always be initialized when leaving UNINITIALIZED. """ - distinct_values = [ - OrderedDict() for i in range(len(by_name[state_or_tran]["param"][0])) - ] - for param_tuple in by_name[state_or_tran]["param"]: + distinct_values = [OrderedDict() for i in range(len(param_tuples[0]))] + for param_tuple in param_tuples: for i in range(len(param_tuple)): distinct_values[i][param_tuple[i]] = True @@ -82,17 +80,15 @@ def _reduce_param_matrix(matrix: np.ndarray, parameter_names: list) -> list: return list() -def _std_by_param(n_by_param, all_param_values, state_or_tran, attribute, param_index): +def _std_by_param(n_by_param, all_param_values, param_index): u""" Calculate standard deviations for a static model where all parameters but `param_index` are constant. :param n_by_param: measurements of a specific model attribute partitioned by parameter values. Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}` - :param all_param_values: distinct values of each parameter in `state_or_tran`. + :param all_param_values: distinct values of each parameter. E.g. for two parameters, the first being None, FOO, or BAR, and the second being 1, 2, 3, or 4, the argument is `[[None, 'FOO', 'BAR'], [1, 2, 3, 4]]`. - :param state_or_tran: state or transition name for debugging - :param attribute: model attribute for debugging, e.g. 'power' or 'duration' :param param_index: index of variable parameter :returns: (stddev matrix, mean stddev, LUT matrix) *stddev matrix* is an ((number of parameters)-1)-dimensional matrix giving the standard deviation of each individual parameter variation partition. @@ -100,8 +96,7 @@ def _std_by_param(n_by_param, all_param_values, state_or_tran, attribute, param_ measurements with param0 == all_param_values[0][a], param1 == all_param_values[1][b], param2 variable, and param3 == all_param_values[3][d]. - *mean stddev* is the mean standard deviation of all measurements of `attribute` - for `state_or_tran` where parameter `param_index` is dynamic and all other parameters are fixed. + *mean stddev* is the mean standard deviation of all measurements where parameter `param_index` is dynamic and all other parameters are fixed. E.g., if parameters are a, b, c ∈ {1,2,3} and 'index' corresponds to b, then this function returns the mean of the standard deviations of (a=1, b=*, c=1), (a=1, b=*, c=2), and so on. @@ -146,8 +141,8 @@ def _std_by_param(n_by_param, all_param_values, state_or_tran, attribute, param_ if np.all(np.isnan(stddev_matrix)): warnings.warn( - "{}/{} parameter #{} has no data partitions. stddev_matrix = {}".format( - state_or_tran, attribute, param_index, stddev_matrix + "parameter #{} has no data partitions. stddev_matrix = {}".format( + param_index, stddev_matrix ) ) return stddev_matrix, 0.0 @@ -167,9 +162,9 @@ def _corr_by_param(attribute_data, param_values, param_index): If any value of `param_index` is not numeric (i.e., can not be parsed as float), this function returns 0. - :param attribute_data: list or 1-D numpy array taken from by_name[state_or_trans][attribute]. - :param param_values: list of parameter values taken from by_name[state_or_trans]["param"]. - :param param_index: index of parameter in `by_name[state_or_trans]['param']` + :param attribute_data: list or 1-D numpy array of measurements + :param param_values: list of parameter values + :param param_index: index of parameter in `by_name[*]['param']` """ if _all_params_are_numeric(param_values, param_index): param_values = np.array(list((map(lambda x: x[param_index], param_values)))) @@ -195,121 +190,113 @@ def _corr_by_param(attribute_data, param_values, param_index): def _compute_param_statistics( - attribute_data, - param_values, - n_by_param, - parameter_names, - arg_count, - state_or_trans, - attribute, - distinct_values, - distinct_values_by_param_index, + data, param_names, param_tuples, arg_count=None, use_corrcoef=False ): """ - Compute standard deviation and correlation coefficient for various data partitions. + Compute standard deviation and correlation coefficient on parameterized data partitions. - It is strongly recommended to vary all parameter values evenly across partitions. + It is strongly recommended to vary all parameter values evenly. For instance, given two parameters, providing only the combinations (1, 1), (5, 1), (7, 1,) (10, 1), (1, 2), (1, 6) will lead to bogus results. It is better to provide (1, 1), (5, 1), (1, 2), (5, 2), ... (i.e. a cross product of all individual parameter values) - :param attribute_data: list or 1-D numpy array taken from by_name[state_or_trans][attribute] - (ground truth partitioned by state/transition name). - :param param_values: list of parameter values - corresponding to the ground truth, e.g. [[1, 2, 3], ...] if the - first ground truth element has the (lexically) first parameter set to 1, - the second to 2 and the third to 3. Taken from by_name[state_or_trans]["param"]. - :param n_by_param: measurements of a specific model attribute partitioned by parameter values. - Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}` - :param parameter_names: list of parameter names, must have the same order as the parameter - values in by_param (lexical sorting is recommended). - :param arg_count: dict providing the number of functions args ("local parameters") for each function. - :param state_or_trans: state or transition name, e.g. 'send' or 'TX' - :param attribute: model attribute, e.g. 'power' or 'duration' + arguments: + data -- measurement data (ground truth). Must be a list or 1-D numpy array. + param_names -- list of parameter names + param_tuples -- list of parameter values corresponding to the order in param_names + arg_count -- dict providing the number of functions args ("local parameters") for each function. + use_corrcoef -- use correlation coefficient instead of stddev heuristic for parameter detection :returns: a dict with the following content: - std_static -- static parameter-unaware model error: stddev of by_name[state_or_trans][attribute] - std_param_lut -- static parameter-aware model error: mean stddev of n_by_param[*] + std_static -- static parameter-unaware model error: stddev of data + std_param_lut -- static parameter-aware model error: mean stddev of data[*] std_by_param -- static parameter-aware model error ignoring a single parameter. dictionary with one key per parameter. The value is the mean stddev of measurements where all other parameters are fixed and the parameter in question is variable. E.g. std_by_param['X'] is the mean stddev of n_by_param[(X=*, Y=..., Z=...)]. std_by_arg -- same, but ignoring a single function argument - Only set if state_or_trans appears in arg_count, empty dict otherwise. + Only set if arg_count is non-zero, empty list otherwise. corr_by_param -- correlation coefficient corr_by_arg -- same, but ignoring a single function argument - Only set if state_or_trans appears in arg_count, empty dict otherwise. + Only set if arg_count is non-zero, empty list otherwise. depends_on_param -- dict(parameter_name -> Bool). True if /attribute/ behaviour probably depends on /parameter_name/ depends_on_arg -- list(bool). Same, but for function arguments, if any. """ - ret = { - "std_static": np.std(attribute_data), - # TODO Gewichtung? Parameterkombinationen mit wenig verfügbaren Messdaten werden - # genau so behandelt wie welchemit vielen verfügbaren Messdaten, in - # std_static haben sie dagegen weniger Gewicht - "std_param_lut": np.mean([np.std(n_by_param[x]) for x in n_by_param.keys()]), - "std_by_param": {}, - "std_by_param_values": {}, - "lut_by_param_values": {}, - "std_by_arg": [], - "std_by_arg_values": [], - "lut_by_arg_values": [], - "corr_by_param": {}, - "corr_by_arg": [], - "depends_on_param": {}, - "depends_on_arg": [], - } + ret = dict() + + ret["by_param"] = by_param = partition_by_param(data, param_tuples) + + ret["use_corrcoef"] = use_corrcoef + ret["_parameter_names"] = param_names + + ret["distinct_values_by_param_index"] = distinct_param_values(param_tuples) + + ret["distinct_values_by_param_name"] = dict() + for i, param in enumerate(param_names): + ret["distinct_values_by_param_name"][param] = ret[ + "distinct_values_by_param_index" + ][i] + + ret["std_static"] = np.std(data) + # TODO Gewichtung? Parameterkombinationen mit wenig verfügbaren Messdaten werden + # genau so behandelt wie welchemit vielen verfügbaren Messdaten, in + # std_static haben sie dagegen weniger Gewicht + ret["std_param_lut"] = np.mean([np.std(v) for v in by_param.values()]) + + ret["std_by_param"] = dict() + ret["std_by_param_values"] = dict() + ret["lut_by_param_values"] = dict() + + ret["std_by_arg"] = list() + ret["std_by_arg_values"] = list() + ret["lut_by_arg_values"] = list() + + ret["corr_by_param"] = dict() + ret["corr_by_arg"] = list() + + ret["_depends_on_param"] = dict() + ret["_depends_on_arg"] = list() np.seterr("raise") - for param_idx, param in enumerate(parameter_names): + for param_idx, param in enumerate(param_names): std_matrix, mean_std, lut_matrix = _std_by_param( - n_by_param, - distinct_values_by_param_index, - state_or_trans, - attribute, - param_idx, + by_param, ret["distinct_values_by_param_index"], param_idx ) ret["std_by_param"][param] = mean_std ret["std_by_param_values"][param] = std_matrix ret["lut_by_param_values"][param] = lut_matrix - ret["corr_by_param"][param] = _corr_by_param( - attribute_data, param_values, param_idx - ) + ret["corr_by_param"][param] = _corr_by_param(data, param_tuples, param_idx) - ret["depends_on_param"][param] = _depends_on_param( + ret["_depends_on_param"][param] = _depends_on_param( ret["corr_by_param"][param], ret["std_by_param"][param], ret["std_param_lut"], ) - if state_or_trans in arg_count: - for arg_index in range(arg_count[state_or_trans]): + if arg_count: + for arg_index in range(arg_count): std_matrix, mean_std, lut_matrix = _std_by_param( - n_by_param, - distinct_values_by_param_index, - state_or_trans, - attribute, - len(parameter_names) + arg_index, + by_param, + ret["distinct_values_by_param_index"], + len(param_names) + arg_index, ) ret["std_by_arg"].append(mean_std) ret["std_by_arg_values"].append(std_matrix) ret["lut_by_arg_values"].append(lut_matrix) ret["corr_by_arg"].append( - _corr_by_param( - attribute_data, param_values, len(parameter_names) + arg_index - ) + _corr_by_param(data, param_tuples, len(param_names) + arg_index) ) if False: - ret["depends_on_arg"].append(ret["corr_by_arg"][arg_index] > 0.1) + ret["_depends_on_arg"].append(ret["corr_by_arg"][arg_index] > 0.1) elif ret["std_by_arg"][arg_index] == 0: # In general, std_param_lut < std_by_arg. So, if std_by_arg == 0, std_param_lut == 0 follows. # This means that the variation of arg does not affect the model quality -> no influence - ret["depends_on_arg"].append(False) + ret["_depends_on_arg"].append(False) else: - ret["depends_on_arg"].append( + ret["_depends_on_arg"].append( ret["std_param_lut"] / ret["std_by_arg"][arg_index] < 0.5 ) @@ -317,7 +304,7 @@ def _compute_param_statistics( def _compute_param_statistics_parallel(arg): - return {"key": arg["key"], "result": _compute_param_statistics(*arg["args"])} + return {"key": arg["key"], "dict": _compute_param_statistics(*arg["args"])} def _all_params_are_numeric(data, param_idx): @@ -413,213 +400,150 @@ def remove_parameters_by_indices(by_name, parameter_names, parameter_indices_to_ parameter_names.pop(parameter_index) -class ParamStats: - """ - :param stats: `stats[state_or_tran][attribute]` = std_static, std_param_lut, ... (see `compute_param_statistics`) - :param distinct_values: `distinct_values[state_or_tran][param]` = [distinct values in aggregate] - :param distinct_values_by_param_index: `distinct_values[state_or_tran][i]` = [distinct values in aggregate] - """ +class ParallelParamStats: + def __init__(self): + self.queue = list() + self.map = dict() + + def enqueue(self, key, attr): + self.queue.append( + { + "key": key, + "args": [ + attr.data, + attr.param_names, + attr.param_values, + attr.arg_count, + ], + } + ) + self.map[key] = attr - def __init__( - self, by_name, by_param, parameter_names, arg_count, use_corrcoef=False - ): + def compute(self): """ - Compute standard deviation and correlation coefficient on parameterized data partitions. - - It is strongly recommended to vary all parameter values evenly. - For instance, given two parameters, providing only the combinations - (1, 1), (5, 1), (7, 1,) (10, 1), (1, 2), (1, 6) will lead to bogus results. - It is better to provide (1, 1), (5, 1), (1, 2), (5, 2), ... (i.e. a cross product of all individual parameter values) - - arguments: - by_name -- ground truth partitioned by state/transition name. - by_name[state_or_trans][attribute] must be a list or 1-D numpy array. - by_name[state_or_trans]['param'] must be a list of parameter values - corresponding to the ground truth, e.g. [[1, 2, 3], ...] if the - first ground truth element has the (lexically) first parameter set to 1, - the second to 2 and the third to 3. - by_param -- ground truth partitioned by state/transition name and parameters. - by_name[(state_or_trans, *)][attribute] must be a list or 1-D numpy array. - parameter_names -- list of parameter names, must have the same order as the parameter - values in by_param (lexical sorting is recommended). - arg_count -- dict providing the number of functions args ("local parameters") for each function. - use_corrcoef -- use correlation coefficient instead of stddev heuristic for parameter detection - """ - self.stats = dict() - self.distinct_values = dict() - self.distinct_values_by_param_index = dict() - self.use_corrcoef = use_corrcoef - self._parameter_names = parameter_names - - stats_queue = list() - - for state_or_tran in by_name.keys(): - self.stats[state_or_tran] = dict() - self.distinct_values_by_param_index[state_or_tran] = distinct_param_values( - by_name, state_or_tran - ) - self.distinct_values[state_or_tran] = dict() - for i, param in enumerate(parameter_names): - self.distinct_values[state_or_tran][ - param - ] = self.distinct_values_by_param_index[state_or_tran][i] - for attribute in by_name[state_or_tran]["attributes"]: - n_by_param = dict() - for k, v in by_param.items(): - if k[0] == state_or_tran: - n_by_param[k[1]] = v[attribute] - stats_queue.append( - { - "key": [state_or_tran, attribute], - "args": [ - by_name[state_or_tran][attribute], - by_name[state_or_tran]["param"], - n_by_param, - parameter_names, - arg_count, - state_or_tran, - attribute, - self.distinct_values[state_or_tran], - self.distinct_values_by_param_index[state_or_tran], - ], - } - ) + Fit functions on previously enqueue data. - # Fails if an object is > 2 GB in size. This happens when using - # --plot-traces or --pelt, which cause by_param and by_name to contain - # "power_traces" data with raw traces + Fitting is one in parallel with one process per core. + + Results can be accessed using the public ParallelParamFit.results object. + """ with Pool() as pool: - stats_results = pool.map(_compute_param_statistics_parallel, stats_queue) + results = pool.map(_compute_param_statistics_parallel, self.queue) - for stats in stats_results: - state_or_tran, attribute = stats["key"] - self.stats[state_or_tran][attribute] = stats["result"] + for result in results: + self.map[result["key"]].by_param = result["dict"].pop("by_param") + self.map[result["key"]].stats = ParamStats(result["dict"]) - def can_be_fitted(self, state_or_tran=None) -> bool: + +class ParamStats: + def __init__(self, data): + self.__dict__.update(data) + + def can_be_fitted(self) -> bool: """ Return whether a sufficient amount of distinct numeric parameter values is available, allowing a parameter-aware model to be generated. - - :param state_or_tran: state or transition. If unset, returns whether any state or transition can be fitted. """ - if state_or_tran is None: - keys = self.stats.keys() - else: - keys = [state_or_tran] - - for key in keys: - for param in self._parameter_names: - if ( - len( + for param in self._parameter_names: + if ( + len( + list( + filter( + lambda n: is_numeric(n), + self.distinct_values_by_param_name[param], + ) + ) + ) + > 2 + ): + logger.debug( + "can be fitted for param {} on {}".format( + param, list( filter( lambda n: is_numeric(n), - self.distinct_values[key][param], + self.distinct_values_by_param_name[param], ) - ) + ), ) - > 2 - ): - logger.debug( - "{} can be fitted for param {} on {}".format( - key, - param, - list( - filter( - lambda n: is_numeric(n), - self.distinct_values[key][param], - ) - ), - ) - ) - return True + ) + return True return False - def _generic_param_independence_ratio(self, state_or_trans, attribute): + def _generic_param_independence_ratio(self): """ - Return the heuristic ratio of parameter independence for state_or_trans and attribute. + Return the heuristic ratio of parameter independence. This is not supported if the correlation coefficient is used. A value close to 1 means no influence, a value close to 0 means high probability of influence. """ - statistics = self.stats[state_or_trans][attribute] if self.use_corrcoef: # not supported raise ValueError - if statistics["std_static"] == 0: + if self.std_static == 0: return 0 - return statistics["std_param_lut"] / statistics["std_static"] + return self.std_param_lut / self.std_static - def generic_param_dependence_ratio(self, state_or_trans, attribute): + def generic_param_dependence_ratio(self): """ - Return the heuristic ratio of parameter dependence for state_or_trans and attribute. + Return the heuristic ratio of parameter dependence. This is not supported if the correlation coefficient is used. A value close to 0 means no influence, a value close to 1 means high probability of influence. """ - return 1 - self._generic_param_independence_ratio(state_or_trans, attribute) + return 1 - self._generic_param_independence_ratio() - def _param_independence_ratio( - self, state_or_trans: str, attribute: str, param: str - ) -> float: + def _param_independence_ratio(self, param: str) -> float: """ - Return the heuristic ratio of parameter independence for state_or_trans, attribute, and param. + Return the heuristic ratio of parameter independence for param. A value close to 1 means no influence, a value close to 0 means high probability of influence. """ - statistics = self.stats[state_or_trans][attribute] if self.use_corrcoef: - return 1 - np.abs(statistics["corr_by_param"][param]) - if statistics["std_by_param"][param] == 0: - if statistics["std_param_lut"] != 0: + return 1 - np.abs(self.corr_by_param[param]) + if self.std_by_param[param] == 0: + if self.std_param_lut != 0: raise RuntimeError("wat") # In general, std_param_lut < std_by_param. So, if std_by_param == 0, std_param_lut == 0 follows. # This means that the variation of param does not affect the model quality -> no influence, return 1 return 1.0 - return statistics["std_param_lut"] / statistics["std_by_param"][param] + return self.std_param_lut / self.std_by_param[param] - def param_dependence_ratio( - self, state_or_trans: str, attribute: str, param: str - ) -> float: + def param_dependence_ratio(self, param: str) -> float: """ - Return the heuristic ratio of parameter dependence for state_or_trans, attribute, and param. + Return the heuristic ratio of parameter dependence for param. A value close to 0 means no influence, a value close to 1 means high probability of influence. - :param state_or_trans: state or transition name - :param attribute: model attribute :param param: parameter name :returns: parameter dependence (float between 0 == no influence and 1 == high probability of influence) """ - return 1 - self._param_independence_ratio(state_or_trans, attribute, param) + return 1 - self._param_independence_ratio(param) - def _arg_independence_ratio(self, state_or_trans, attribute, arg_index): - statistics = self.stats[state_or_trans][attribute] + def _arg_independence_ratio(self, arg_index): if self.use_corrcoef: - return 1 - np.abs(statistics["corr_by_arg"][arg_index]) - if statistics["std_by_arg"][arg_index] == 0: - if statistics["std_param_lut"] != 0: + return 1 - np.abs(self.corr_by_arg[arg_index]) + if self.std_by_arg[arg_index] == 0: + if self.std_param_lut != 0: raise RuntimeError("wat") # In general, std_param_lut < std_by_arg. So, if std_by_arg == 0, std_param_lut == 0 follows. # This means that the variation of arg does not affect the model quality -> no influence, return 1 return 1 - return statistics["std_param_lut"] / statistics["std_by_arg"][arg_index] + return self.std_param_lut / self.std_by_arg[arg_index] - def arg_dependence_ratio( - self, state_or_trans: str, attribute: str, arg_index: int - ) -> float: - return 1 - self._arg_independence_ratio(state_or_trans, attribute, arg_index) + def arg_dependence_ratio(self, arg_index: int) -> float: + return 1 - self._arg_independence_ratio(arg_index) # This heuristic is very similar to the "function is not much better than # median" checks in get_fitted. So far, doing it here as well is mostly # a performance and not an algorithm quality decision. # --df, 2018-04-18 - def depends_on_param(self, state_or_trans, attribute, param): + def depends_on_param(self, param): """Return whether attribute of state_or_trans depens on param.""" - return self.stats[state_or_trans][attribute]["depends_on_param"][param] + return self._depends_on_param[param] # See notes on depends_on_param - def depends_on_arg(self, state_or_trans, attribute, arg_index): + def depends_on_arg(self, arg_index): """Return whether attribute of state_or_trans depens on arg_index.""" - return self.stats[state_or_trans][attribute]["depends_on_arg"][arg_index] + return self._depends_on_arg[arg_index] diff --git a/lib/utils.py b/lib/utils.py index 560ab79..b38a359 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -173,6 +173,16 @@ def match_parameter_values(input_param: dict, match_param: dict): return True +def partition_by_param(data, param_values): + ret = dict() + for i, parameters in enumerate(param_values): + param_key = tuple(parameters) + if param_key not in ret: + ret[param_key] = list() + ret[param_key].append(data[i]) + return ret + + def by_name_to_by_param(by_name: dict): """ Convert aggregation by name to aggregation by name and parameter values. |