Diffstat (limited to 'lib/parameters.py')
-rw-r--r-- | lib/parameters.py | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/lib/parameters.py b/lib/parameters.py
index d216de9..cf9713c 100644
--- a/lib/parameters.py
+++ b/lib/parameters.py
@@ -1,10 +1,12 @@
 import itertools
 import logging
 import numpy as np
+import os
 import warnings
 from collections import OrderedDict
 from copy import deepcopy
 from multiprocessing import Pool
 
+import dfatool.functions as df
 from .utils import remove_index_from_tuple, is_numeric
 from .utils import filter_aggregate_by_param, partition_by_param
@@ -561,3 +563,239 @@ class ParamStats:
     def depends_on_arg(self, arg_index):
         """Return whether attribute of state_or_trans depends on arg_index."""
         return self._depends_on_arg[arg_index]
+
+
+class ModelAttribute:
+    def __init__(self, name, attr, data, param_values, param_names, arg_count=0):
+        self.name = name
+        self.attr = attr
+        self.data = np.array(data)
+        self.param_values = param_values
+        self.param_names = sorted(param_names)
+        self.arg_count = arg_count
+        self.by_param = None  # set via ParallelParamStats
+        self.function_override = None
+        self.param_model = None
+        self.split = None
+
+    def __repr__(self):
+        mean = np.mean(self.data)
+        return f"ModelAttribute<{self.name}, {self.attr}, mean={mean}>"
+
+    def get_static(self, use_mean=False):
+        if use_mean:
+            return np.mean(self.data)
+        return np.median(self.data)
+
+    def get_lut(self, param, use_mean=False):
+        if use_mean:
+            return np.mean(self.by_param[param])
+        return np.median(self.by_param[param])
+
+    def build_dtree(self):
+        split_param_index = self.get_split_param_index()
+        if split_param_index is None:
+            return
+
+        distinct_values = self.stats.distinct_values_by_param_index[split_param_index]
+        tt1 = list(
+            map(
+                lambda i: self.param_values[i][split_param_index] == distinct_values[0],
+                range(len(self.param_values)),
+            )
+        )
+        tt2 = np.invert(tt1)
+
+        pv1 = list()
+        pv2 = list()
+
+        for i, param_tuple in enumerate(self.param_values):
+            if tt1[i]:
+                pv1.append(param_tuple)
+            else:
+                pv2.append(param_tuple)
+
+        # print(
+        #     f">>> split {self.name} {self.attr} by param #{split_param_index}"
+        # )
+
+        child1 = ModelAttribute(
+            self.name, self.attr, self.data[tt1], pv1, self.param_names, self.arg_count
+        )
+        child2 = ModelAttribute(
+            self.name, self.attr, self.data[tt2], pv2, self.param_names, self.arg_count
+        )
+
+        ParamStats.compute_for_attr(child1)
+        ParamStats.compute_for_attr(child2)
+
+        child1.build_dtree()
+        child2.build_dtree()
+
+        self.split = (
+            split_param_index,
+            {distinct_values[0]: child1, distinct_values[1]: child2},
+        )
+
+        # print(
+        #     f"<<< split {self.name} {self.attr} by param #{split_param_index}"
+        # )
+
+    # None -> no split necessary
+    # otherwise: index of the parameter used to build one decision-tree level
+    # (the children are again ModelAttributes in which this parameter is constant)
+    def get_split_param_index(self):
+        if not self.param_names:
+            return None
+        std_by_param = list()
+        for param_index, param_name in enumerate(self.param_names):
+            distinct_values = self.stats.distinct_values_by_param_index[param_index]
+            if self.stats.depends_on_param(param_name) and len(distinct_values) == 2:
+                val1 = list(
+                    map(
+                        lambda i: self.param_values[i][param_index]
+                        == distinct_values[0],
+                        range(len(self.param_values)),
+                    )
+                )
+                val2 = np.invert(val1)
+                val1_std = np.std(self.data[val1])
+                val2_std = np.std(self.data[val2])
+                std_by_param.append(np.mean([val1_std, val2_std]))
+            else:
+                std_by_param.append(np.inf)
+        for arg_index in range(self.arg_count):
+            distinct_values = self.stats.distinct_values_by_param_index[
+                len(self.param_names) + arg_index
+            ]
+            if self.stats.depends_on_arg(arg_index) and len(distinct_values) == 2:
+                val1 = list(
+                    map(
+                        lambda i: self.param_values[i][
+                            len(self.param_names) + arg_index
+                        ]
+                        == distinct_values[0],
+                        range(len(self.param_values)),
+                    )
+                )
+                val2 = np.invert(val1)
+                val1_std = np.std(self.data[val1])
+                val2_std = np.std(self.data[val2])
+                std_by_param.append(np.mean([val1_std, val2_std]))
+            else:
+                std_by_param.append(np.inf)
+        split_param_index = np.argmin(std_by_param)
+        split_std = std_by_param[split_param_index]
+        if split_std == np.inf:
+            return None
+        return split_param_index
+
+    def get_data_for_paramfit(self, safe_functions_enabled=False):
+        if self.split:
+            return self.get_data_for_paramfit_split(
+                safe_functions_enabled=safe_functions_enabled
+            )
+        else:
+            return self.get_data_for_paramfit_this(
+                safe_functions_enabled=safe_functions_enabled
+            )
+
+    def get_data_for_paramfit_split(self, safe_functions_enabled=False):
+        split_param_index, child_by_param_value = self.split
+        ret = list()
+        for param_value, child in child_by_param_value.items():
+            child_ret = child.get_data_for_paramfit(
+                safe_functions_enabled=safe_functions_enabled
+            )
+            for key, param, val in child_ret:
+                ret.append((key[:2] + (param_value,) + key[2:], param, val))
+        return ret
+
+    def get_data_for_paramfit_this(self, safe_functions_enabled=False):
+        ret = list()
+        for param_index, param_name in enumerate(self.param_names):
+            if self.stats.depends_on_param(param_name):
+                ret.append(
+                    (
+                        (self.name, self.attr),
+                        param_name,
+                        (self.by_param, param_index, safe_functions_enabled),
+                    )
+                )
+        if self.arg_count:
+            for arg_index in range(self.arg_count):
+                if self.stats.depends_on_arg(arg_index):
+                    ret.append(
+                        (
+                            (self.name, self.attr),
+                            arg_index,
+                            (
+                                self.by_param,
+                                len(self.param_names) + arg_index,
+                                safe_functions_enabled,
+                            ),
+                        )
+                    )
+
+        return ret
+
+    def set_data_from_paramfit(self, paramfit, prefix=tuple()):
+        if self.split:
+            self.set_data_from_paramfit_split(paramfit, prefix)
+        else:
+            self.set_data_from_paramfit_this(paramfit, prefix)
+
+    def set_data_from_paramfit_split(self, paramfit, prefix):
+        split_param_index, child_by_param_value = self.split
+        function_map = {
+            "split_by": split_param_index,
+            "child": dict(),
+            "child_static": dict(),
+        }
+        function_child = dict()
+        info_child = dict()
+        for param_value, child in child_by_param_value.items():
+            child.set_data_from_paramfit(paramfit, prefix + (param_value,))
+            function_child[param_value], info_child[param_value] = child.get_fitted()
+        function_map = df.SplitFunction(split_param_index, function_child)
+        info_map = df.SplitInfo(split_param_index, info_child)
+
+        self.param_model = function_map, info_map
+
+    def set_data_from_paramfit_this(self, paramfit, prefix):
+        fit_result = paramfit.get_result((self.name, self.attr) + prefix)
+        param_model = (
+            df.StaticFunction(np.median(self.data)),
+            df.StaticInfo(self.data),
+        )
+        if self.function_override is not None:
+            function_str = self.function_override
+            x = df.AnalyticFunction(function_str, self.param_names, self.arg_count)
+            x.fit(self.by_param)
+            if x.fit_success:
+                param_model = (x, df.AnalyticInfo(fit_result, x))
+        elif os.getenv("DFATOOL_NO_PARAM"):
+            pass
+        elif len(fit_result.keys()):
+            x = df.analytic.function_powerset(
+                fit_result, self.param_names, self.arg_count
+            )
+            x.fit(self.by_param)
+
+            if x.fit_success:
+                param_model = (x, df.AnalyticInfo(fit_result, x))
+
+        self.param_model = param_model
+
+    def get_fitted(self):
+        """
+        Get parameter-aware model function and model information function.
+        They must have been set via get_data_for_paramfit -> ParallelParamFit -> set_data_from_paramfit first.
+
+        Returns a tuple (function, info):
+        function -> AnalyticFunction for model. function(param=parameter values) -> model value.
+        info -> {'fit_result' : ..., 'function' : ... }
+
+        Returns (None, None) if fitting failed. Returns None if ParamFit has not been performed yet.
+        """
+        return self.param_model
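
For context on the splitting heuristic introduced in this commit: get_split_param_index only considers parameters with exactly two distinct values (and which ParamStats marks as relevant), partitions the measurements by those two values, and picks the parameter whose partitions have the lowest mean standard deviation; if no parameter qualifies, no decision-tree level is built. The following stand-alone sketch mirrors that criterion with plain numpy. It is illustrative only: the helper name pick_split_param and the sample data are hypothetical, and the ParamStats dependence checks of the actual class are omitted.

import numpy as np

def pick_split_param(data, param_values, n_params):
    # For each parameter with exactly two distinct values, partition the
    # measurements by value and record the mean standard deviation of the
    # two partitions. Parameters that do not qualify get infinity.
    std_by_param = []
    for index in range(n_params):
        values = sorted(set(pv[index] for pv in param_values))
        if len(values) != 2:
            std_by_param.append(np.inf)
            continue
        mask = np.array([pv[index] == values[0] for pv in param_values])
        std_by_param.append(np.mean([np.std(data[mask]), np.std(data[~mask])]))
    best = int(np.argmin(std_by_param))
    # No suitable binary parameter -> no split (mirrors the None return above).
    return None if std_by_param[best] == np.inf else best

# Hypothetical measurements: parameter 0 (values 0/1) shifts the mean,
# parameter 1 ("a"/"b") has no effect, so index 0 should be chosen.
params = [(0, "a"), (0, "b"), (1, "a"), (1, "b")] * 5
data = np.array([10.0 if p[0] == 0 else 50.0 for p in params])
data = data + np.random.default_rng(0).normal(0.0, 1.0, len(data))
print(pick_split_param(data, params, n_params=2))  # expected: 0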