diff options
Diffstat (limited to 'lib/model.py')
-rw-r--r-- | lib/model.py | 325 |
1 files changed, 3 insertions, 322 deletions
diff --git a/lib/model.py b/lib/model.py index 24d5bc1..e97cfbb 100644 --- a/lib/model.py +++ b/lib/model.py @@ -3,332 +3,13 @@ import logging import numpy as np import os -from scipy import optimize -from sklearn.metrics import r2_score -from multiprocessing import Pool from .automata import PTA, ModelAttribute -from .functions import analytic, StaticInfo +from .functions import StaticInfo from .parameters import ParallelParamStats -from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple -from .utils import by_name_to_by_param, match_parameter_values +from .paramfit import ParallelParamFit +from .utils import soft_cast_int, by_name_to_by_param, regression_measures logger = logging.getLogger(__name__) -arg_support_enabled = True - - -def aggregate_measures(aggregate: float, actual: list) -> dict: - """ - Calculate error measures for model value on data list. - - arguments: - aggregate -- model value (float or int) - actual -- real-world / reference values (list of float or int) - - return value: - See regression_measures - """ - aggregate_array = np.array([aggregate] * len(actual)) - return regression_measures(aggregate_array, np.array(actual)) - - -def regression_measures(predicted: np.ndarray, actual: np.ndarray): - """ - Calculate error measures by comparing model values to reference values. - - arguments: - predicted -- model values (np.ndarray) - actual -- real-world / reference values (np.ndarray) - - Returns a dict containing the following measures: - mae -- Mean Absolute Error - mape -- Mean Absolute Percentage Error, - if all items in actual are non-zero (NaN otherwise) - smape -- Symmetric Mean Absolute Percentage Error, - if no 0,0-pairs are present in actual and predicted (NaN otherwise) - msd -- Mean Square Deviation - rmsd -- Root Mean Square Deviation - ssr -- Sum of Squared Residuals - rsq -- R^2 measure, see sklearn.metrics.r2_score - count -- Number of values - """ - if type(predicted) != np.ndarray: - raise ValueError("first arg must be ndarray, is {}".format(type(predicted))) - if type(actual) != np.ndarray: - raise ValueError("second arg must be ndarray, is {}".format(type(actual))) - deviations = predicted - actual - # mean = np.mean(actual) - if len(deviations) == 0: - return {} - measures = { - "mae": np.mean(np.abs(deviations), dtype=np.float64), - "msd": np.mean(deviations ** 2, dtype=np.float64), - "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64), - "ssr": np.sum(deviations ** 2, dtype=np.float64), - "rsq": r2_score(actual, predicted), - "count": len(actual), - } - - # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64) - - if np.all(actual != 0): - measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure - else: - measures["mape"] = np.nan - if np.all(np.abs(predicted) + np.abs(actual) != 0): - measures["smape"] = ( - np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2)) - * 100 - ) - else: - measures["smape"] = np.nan - # if np.all(rsq_quotient != 0): - # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient - - return measures - - -class ParallelParamFit: - """ - Fit a set of functions on parameterized measurements. - - One parameter is variale, all others are fixed. Reports the best-fitting - function type for each parameter. - """ - - def __init__(self): - """Create a new ParallelParamFit object.""" - self.fit_queue = list() - - def enqueue(self, key, param, args): - """ - Add state_or_tran/attribute/param_name to fit queue. - - This causes fit() to compute the best-fitting function for this model part. - - :param key: arbitrary key used to retrieve param result in `get_result`. Typically (state/transition name, model attribute). - Different parameter names may have the same key. Identical parameter names must have different keys. - :param param: parameter name - :param args: [by_param, param_index, safe_functions_enabled, param_filter] - by_param[(param 1, param2, ...)] holds measurements. - """ - self.fit_queue.append({"key": (key, param), "args": args}) - - def fit(self): - """ - Fit functions on previously enqueue data. - - Fitting is one in parallel with one process per core. - - Results can be accessed using the public ParallelParamFit.results object. - """ - with Pool() as pool: - self.results = pool.map(_try_fits_parallel, self.fit_queue) - - def get_result(self, key): - """ - Parse and sanitize fit results. - - Filters out results where the best function is worse (or not much better than) static mean/median estimates. - - :param key: arbitrary key used in `enqueue`. Typically (state/transition name, model attribute). - :param param: parameter name - :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} } - """ - fit_result = dict() - for result in self.results: - if ( - result["key"][0] == key and result["result"]["best"] is not None - ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl? - this_result = result["result"] - if this_result["best_rmsd"] >= min( - this_result["mean_rmsd"], this_result["median_rmsd"] - ): - logger.debug( - "Not modeling {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format( - result["key"][0], - result["key"][1], - this_result["best_rmsd"], - this_result["mean_rmsd"], - this_result["median_rmsd"], - ) - ) - # See notes on depends_on_param - elif this_result["best_rmsd"] >= 0.8 * min( - this_result["mean_rmsd"], this_result["median_rmsd"] - ): - logger.debug( - "Not modeling {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format( - result["key"][0], - result["key"][1], - this_result["best_rmsd"], - this_result["mean_rmsd"], - this_result["median_rmsd"], - ) - ) - else: - fit_result[result["key"][1]] = this_result - return fit_result - - -def _try_fits_parallel(arg): - """ - Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result. - - Must be a global function as it is called from a multiprocessing Pool. - """ - return {"key": arg["key"], "result": _try_fits(*arg["args"])} - - -def _try_fits( - n_by_param, param_index, safe_functions_enabled=False, param_filter: dict = None -): - """ - Determine goodness-of-fit for prediction of `n_by_param[(param1_value, param2_value, ...)]` dependence on `param_index` using various functions. - - This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters. - The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function. - Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored. - Fitting is only performed if at least three distinct parameter values exist in `by_param[*]`. - - :returns: a dictionary with the following elements: - best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data. - best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters - mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value - median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value - results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values - - :param n_by_param: measurements of a specific model attribute partitioned by parameter values. - Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}` - - :param param_index: index of the parameter used as model input - :param safe_functions_enabled: Include "safe" variants of functions with limited argument range. - :param param_filter: Only use measurements whose parameters match param_filter for fitting. - """ - - functions = analytic.functions(safe_functions_enabled=safe_functions_enabled) - - for param_key in n_by_param.keys(): - # We might remove elements from 'functions' while iterating over - # its keys. A generator will not allow this, so we need to - # convert to a list. - function_names = list(functions.keys()) - for function_name in function_names: - function_object = functions[function_name] - if is_numeric(param_key[param_index]) and not function_object.is_valid( - param_key[param_index] - ): - functions.pop(function_name, None) - - raw_results = dict() - raw_results_by_param = dict() - ref_results = {"mean": list(), "median": list()} - results = dict() - results_by_param = dict() - - seen_parameter_combinations = set() - - # for each parameter combination: - for param_key in filter( - lambda x: remove_index_from_tuple(x, param_index) - not in seen_parameter_combinations - and len(n_by_param[x]) - and match_parameter_values(n_by_param[x][0], param_filter), - n_by_param.keys(), - ): - X = [] - Y = [] - num_valid = 0 - num_total = 0 - - # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1, - # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters - seen_parameter_combinations.add(remove_index_from_tuple(param_key, param_index)) - - # for each value of the parameter denoted by param_index (all other parameters remain the same): - for k, v in filter( - lambda kv: param_slice_eq(kv[0], param_key, param_index), n_by_param.items() - ): - num_total += 1 - if is_numeric(k[param_index]): - num_valid += 1 - X.extend([float(k[param_index])] * len(v)) - Y.extend(v) - - if num_valid > 2: - X = np.array(X) - Y = np.array(Y) - other_parameters = remove_index_from_tuple(k, param_index) - raw_results_by_param[other_parameters] = dict() - results_by_param[other_parameters] = dict() - for function_name, param_function in functions.items(): - if function_name not in raw_results: - raw_results[function_name] = dict() - error_function = param_function.error_function - res = optimize.least_squares( - error_function, [0, 1], args=(X, Y), xtol=2e-15 - ) - measures = regression_measures(param_function.eval(res.x, X), Y) - raw_results_by_param[other_parameters][function_name] = measures - for measure, error_rate in measures.items(): - if measure not in raw_results[function_name]: - raw_results[function_name][measure] = list() - raw_results[function_name][measure].append(error_rate) - # print(function_name, res, measures) - mean_measures = aggregate_measures(np.mean(Y), Y) - ref_results["mean"].append(mean_measures["rmsd"]) - raw_results_by_param[other_parameters]["mean"] = mean_measures - median_measures = aggregate_measures(np.median(Y), Y) - ref_results["median"].append(median_measures["rmsd"]) - raw_results_by_param[other_parameters]["median"] = median_measures - - if not len(ref_results["mean"]): - # Insufficient data for fitting - # print('[W] Insufficient data for fitting {}'.format(param_index)) - return {"best": None, "best_rmsd": np.inf, "results": results} - - for ( - other_parameter_combination, - other_parameter_results, - ) in raw_results_by_param.items(): - best_fit_val = np.inf - best_fit_name = None - results = dict() - for function_name, result in other_parameter_results.items(): - if len(result) > 0: - results[function_name] = result - rmsd = result["rmsd"] - if rmsd < best_fit_val: - best_fit_val = rmsd - best_fit_name = function_name - results_by_param[other_parameter_combination] = { - "best": best_fit_name, - "best_rmsd": best_fit_val, - "mean_rmsd": results["mean"]["rmsd"], - "median_rmsd": results["median"]["rmsd"], - "results": results, - } - - best_fit_val = np.inf - best_fit_name = None - results = dict() - for function_name, result in raw_results.items(): - if len(result) > 0: - results[function_name] = {} - for measure in result.keys(): - results[function_name][measure] = np.mean(result[measure]) - rmsd = results[function_name]["rmsd"] - if rmsd < best_fit_val: - best_fit_val = rmsd - best_fit_name = function_name - - return { - "best": best_fit_name, - "best_rmsd": best_fit_val, - "mean_rmsd": np.mean(ref_results["mean"]), - "median_rmsd": np.mean(ref_results["median"]), - "results": results, - "results_by_other_param": results_by_param, - } def _num_args_from_by_name(by_name): |