diff options
Diffstat (limited to 'lib/model.py')
-rw-r--r-- | lib/model.py | 253 |
1 files changed, 6 insertions, 247 deletions
diff --git a/lib/model.py b/lib/model.py index 5093bae..24d5bc1 100644 --- a/lib/model.py +++ b/lib/model.py @@ -6,16 +6,11 @@ import os from scipy import optimize from sklearn.metrics import r2_score from multiprocessing import Pool -from .automata import PTA -import dfatool.functions as df -from .parameters import ParallelParamStats, ParamStats +from .automata import PTA, ModelAttribute +from .functions import analytic, StaticInfo +from .parameters import ParallelParamStats from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple -from .utils import ( - by_name_to_by_param, - by_param_to_by_name, - match_parameter_values, - partition_by_param, -) +from .utils import by_name_to_by_param, match_parameter_values logger = logging.getLogger(__name__) arg_support_enabled = True @@ -210,7 +205,7 @@ def _try_fits( :param param_filter: Only use measurements whose parameters match param_filter for fitting. """ - functions = df.analytic.functions(safe_functions_enabled=safe_functions_enabled) + functions = analytic.functions(safe_functions_enabled=safe_functions_enabled) for param_key in n_by_param.keys(): # We might remove elements from 'functions' while iterating over @@ -344,242 +339,6 @@ def _num_args_from_by_name(by_name): return num_args -class ModelAttribute: - def __init__(self, name, attr, data, param_values, param_names, arg_count=0): - self.name = name - self.attr = attr - self.data = np.array(data) - self.param_values = param_values - self.param_names = sorted(param_names) - self.arg_count = arg_count - self.by_param = None # set via ParallelParamStats - self.function_override = None - self.param_model = None - self.split = None - - def __repr__(self): - mean = np.mean(self.data) - return f"ModelAttribute<{self.name}, {self.attr}, mean={mean}>" - - def get_static(self, use_mean=False): - if use_mean: - return np.mean(self.data) - return np.median(self.data) - - def get_lut(self, param, use_mean=False): - if use_mean: - return np.mean(self.by_param[param]) - return np.median(self.by_param[param]) - - def build_dtree(self): - split_param_index = self.get_split_param_index() - if split_param_index is None: - return - - distinct_values = self.stats.distinct_values_by_param_index[split_param_index] - tt1 = list( - map( - lambda i: self.param_values[i][split_param_index] == distinct_values[0], - range(len(self.param_values)), - ) - ) - tt2 = np.invert(tt1) - - pv1 = list() - pv2 = list() - - for i, param_tuple in enumerate(self.param_values): - if tt1[i]: - pv1.append(param_tuple) - else: - pv2.append(param_tuple) - - # print( - # f">>> split {self.name} {self.attr} by param #{split_param_index}" - # ) - - child1 = ModelAttribute( - self.name, self.attr, self.data[tt1], pv1, self.param_names, self.arg_count - ) - child2 = ModelAttribute( - self.name, self.attr, self.data[tt2], pv2, self.param_names, self.arg_count - ) - - ParamStats.compute_for_attr(child1) - ParamStats.compute_for_attr(child2) - - child1.build_dtree() - child2.build_dtree() - - self.split = ( - split_param_index, - {distinct_values[0]: child1, distinct_values[1]: child2}, - ) - - # print( - # f"<<< split {self.name} {self.attr} by param #{split_param_index}" - # ) - - # None -> kein split notwendig - # andernfalls: Parameter-Index, anhand dessen eine Decision Tree-Ebene aufgespannt wird - # (Kinder sind wiederum ModelAttributes, in denen dieser Parameter konstant ist) - def get_split_param_index(self): - if not self.param_names: - return None - std_by_param = list() - for param_index, param_name in enumerate(self.param_names): - distinct_values = self.stats.distinct_values_by_param_index[param_index] - if self.stats.depends_on_param(param_name) and len(distinct_values) == 2: - val1 = list( - map( - lambda i: self.param_values[i][param_index] - == distinct_values[0], - range(len(self.param_values)), - ) - ) - val2 = np.invert(val1) - val1_std = np.std(self.data[val1]) - val2_std = np.std(self.data[val2]) - std_by_param.append(np.mean([val1_std, val2_std])) - else: - std_by_param.append(np.inf) - for arg_index in range(self.arg_count): - distinct_values = self.stats.distinct_values_by_param_index[ - len(self.param_names) + arg_index - ] - if self.stats.depends_on_arg(arg_index) and len(distinct_values) == 2: - val1 = list( - map( - lambda i: self.param_values[i][ - len(self.param_names) + arg_index - ] - == distinct_values[0], - range(len(self.param_values)), - ) - ) - val2 = np.invert(val1) - val1_std = np.std(self.data[val1]) - val2_std = np.std(self.data[val2]) - std_by_param.append(np.mean([val1_std, val2_std])) - else: - std_by_param.append(np.inf) - split_param_index = np.argmin(std_by_param) - split_std = std_by_param[split_param_index] - if split_std == np.inf: - return None - return split_param_index - - def get_data_for_paramfit(self, safe_functions_enabled=False): - if self.split: - return self.get_data_for_paramfit_split( - safe_functions_enabled=safe_functions_enabled - ) - else: - return self.get_data_for_paramfit_this( - safe_functions_enabled=safe_functions_enabled - ) - - def get_data_for_paramfit_split(self, safe_functions_enabled=False): - split_param_index, child_by_param_value = self.split - ret = list() - for param_value, child in child_by_param_value.items(): - child_ret = child.get_data_for_paramfit( - safe_functions_enabled=safe_functions_enabled - ) - for key, param, val in child_ret: - ret.append((key[:2] + (param_value,) + key[2:], param, val)) - return ret - - def get_data_for_paramfit_this(self, safe_functions_enabled=False): - ret = list() - for param_index, param_name in enumerate(self.param_names): - if self.stats.depends_on_param(param_name): - ret.append( - ( - (self.name, self.attr), - param_name, - (self.by_param, param_index, safe_functions_enabled), - ) - ) - if self.arg_count: - for arg_index in range(self.arg_count): - if self.stats.depends_on_arg(arg_index): - ret.append( - ( - (self.name, self.attr), - arg_index, - ( - self.by_param, - len(self.param_names) + arg_index, - safe_functions_enabled, - ), - ) - ) - - return ret - - def set_data_from_paramfit(self, paramfit, prefix=tuple()): - if self.split: - self.set_data_from_paramfit_split(paramfit, prefix) - else: - self.set_data_from_paramfit_this(paramfit, prefix) - - def set_data_from_paramfit_split(self, paramfit, prefix): - split_param_index, child_by_param_value = self.split - function_map = { - "split_by": split_param_index, - "child": dict(), - "child_static": dict(), - } - function_child = dict() - info_child = dict() - for param_value, child in child_by_param_value.items(): - child.set_data_from_paramfit(paramfit, prefix + (param_value,)) - function_child[param_value], info_child[param_value] = child.get_fitted() - function_map = df.SplitFunction(split_param_index, function_child) - info_map = df.SplitInfo(split_param_index, info_child) - - self.param_model = function_map, info_map - - def set_data_from_paramfit_this(self, paramfit, prefix): - fit_result = paramfit.get_result((self.name, self.attr) + prefix) - param_model = ( - df.StaticFunction(np.median(self.data)), - df.StaticInfo(self.data), - ) - if self.function_override is not None: - function_str = self.function_override - x = df.AnalyticFunction(function_str, self.param_names, self.arg_count) - x.fit(self.by_param) - if x.fit_success: - param_model = (x, df.AnalyticInfo(fit_result, x)) - elif os.getenv("DFATOOL_NO_PARAM"): - pass - elif len(fit_result.keys()): - x = df.analytic.function_powerset( - fit_result, self.param_names, self.arg_count - ) - x.fit(self.by_param) - - if x.fit_success: - param_model = (x, df.AnalyticInfo(fit_result, x)) - - self.param_model = param_model - - def get_fitted(self): - """ - Get paramete-aware model function and model information function. - They must have been set via get_data_for_paramfit -> ParallelParamFit -> set-data_from_paramfit first. - - Returns a tuple (function, info): - function -> AnalyticFunction for model. function(param=parameter values) -> model value. - info -> {'fit_result' : ..., 'function' : ... } - - Returns (None, None) if fitting failed. Returns None if ParamFit has not been performed yet. - """ - return self.param_model - - class AnalyticModel: """ Parameter-aware analytic energy/data size/... model. @@ -816,7 +575,7 @@ class AnalyticModel: def model_getter(name, key, **kwargs): param_function, param_info = self.attr_by_name[name][key].get_fitted() - if type(param_info) is df.StaticInfo: + if type(param_info) is StaticInfo: return static_model[name][key] if "arg" in kwargs and "param" in kwargs: |