summaryrefslogtreecommitdiff
path: root/lib/parameters.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/parameters.py')
-rw-r--r--lib/parameters.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/lib/parameters.py b/lib/parameters.py
index d216de9..cf9713c 100644
--- a/lib/parameters.py
+++ b/lib/parameters.py
@@ -1,10 +1,12 @@
import itertools
import logging
import numpy as np
+import os
import warnings
from collections import OrderedDict
from copy import deepcopy
from multiprocessing import Pool
+import dfatool.functions as df
from .utils import remove_index_from_tuple, is_numeric
from .utils import filter_aggregate_by_param, partition_by_param
@@ -561,3 +563,239 @@ class ParamStats:
def depends_on_arg(self, arg_index):
"""Return whether attribute of state_or_trans depens on arg_index."""
return self._depends_on_arg[arg_index]
+
+
+class ModelAttribute:
+ def __init__(self, name, attr, data, param_values, param_names, arg_count=0):
+ self.name = name
+ self.attr = attr
+ self.data = np.array(data)
+ self.param_values = param_values
+ self.param_names = sorted(param_names)
+ self.arg_count = arg_count
+ self.by_param = None # set via ParallelParamStats
+ self.function_override = None
+ self.param_model = None
+ self.split = None
+
+ def __repr__(self):
+ mean = np.mean(self.data)
+ return f"ModelAttribute<{self.name}, {self.attr}, mean={mean}>"
+
+ def get_static(self, use_mean=False):
+ if use_mean:
+ return np.mean(self.data)
+ return np.median(self.data)
+
+ def get_lut(self, param, use_mean=False):
+ if use_mean:
+ return np.mean(self.by_param[param])
+ return np.median(self.by_param[param])
+
+ def build_dtree(self):
+ split_param_index = self.get_split_param_index()
+ if split_param_index is None:
+ return
+
+ distinct_values = self.stats.distinct_values_by_param_index[split_param_index]
+ tt1 = list(
+ map(
+ lambda i: self.param_values[i][split_param_index] == distinct_values[0],
+ range(len(self.param_values)),
+ )
+ )
+ tt2 = np.invert(tt1)
+
+ pv1 = list()
+ pv2 = list()
+
+ for i, param_tuple in enumerate(self.param_values):
+ if tt1[i]:
+ pv1.append(param_tuple)
+ else:
+ pv2.append(param_tuple)
+
+ # print(
+ # f">>> split {self.name} {self.attr} by param #{split_param_index}"
+ # )
+
+ child1 = ModelAttribute(
+ self.name, self.attr, self.data[tt1], pv1, self.param_names, self.arg_count
+ )
+ child2 = ModelAttribute(
+ self.name, self.attr, self.data[tt2], pv2, self.param_names, self.arg_count
+ )
+
+ ParamStats.compute_for_attr(child1)
+ ParamStats.compute_for_attr(child2)
+
+ child1.build_dtree()
+ child2.build_dtree()
+
+ self.split = (
+ split_param_index,
+ {distinct_values[0]: child1, distinct_values[1]: child2},
+ )
+
+ # print(
+ # f"<<< split {self.name} {self.attr} by param #{split_param_index}"
+ # )
+
+ # None -> kein split notwendig
+ # andernfalls: Parameter-Index, anhand dessen eine Decision Tree-Ebene aufgespannt wird
+ # (Kinder sind wiederum ModelAttributes, in denen dieser Parameter konstant ist)
+ def get_split_param_index(self):
+ if not self.param_names:
+ return None
+ std_by_param = list()
+ for param_index, param_name in enumerate(self.param_names):
+ distinct_values = self.stats.distinct_values_by_param_index[param_index]
+ if self.stats.depends_on_param(param_name) and len(distinct_values) == 2:
+ val1 = list(
+ map(
+ lambda i: self.param_values[i][param_index]
+ == distinct_values[0],
+ range(len(self.param_values)),
+ )
+ )
+ val2 = np.invert(val1)
+ val1_std = np.std(self.data[val1])
+ val2_std = np.std(self.data[val2])
+ std_by_param.append(np.mean([val1_std, val2_std]))
+ else:
+ std_by_param.append(np.inf)
+ for arg_index in range(self.arg_count):
+ distinct_values = self.stats.distinct_values_by_param_index[
+ len(self.param_names) + arg_index
+ ]
+ if self.stats.depends_on_arg(arg_index) and len(distinct_values) == 2:
+ val1 = list(
+ map(
+ lambda i: self.param_values[i][
+ len(self.param_names) + arg_index
+ ]
+ == distinct_values[0],
+ range(len(self.param_values)),
+ )
+ )
+ val2 = np.invert(val1)
+ val1_std = np.std(self.data[val1])
+ val2_std = np.std(self.data[val2])
+ std_by_param.append(np.mean([val1_std, val2_std]))
+ else:
+ std_by_param.append(np.inf)
+ split_param_index = np.argmin(std_by_param)
+ split_std = std_by_param[split_param_index]
+ if split_std == np.inf:
+ return None
+ return split_param_index
+
+ def get_data_for_paramfit(self, safe_functions_enabled=False):
+ if self.split:
+ return self.get_data_for_paramfit_split(
+ safe_functions_enabled=safe_functions_enabled
+ )
+ else:
+ return self.get_data_for_paramfit_this(
+ safe_functions_enabled=safe_functions_enabled
+ )
+
+ def get_data_for_paramfit_split(self, safe_functions_enabled=False):
+ split_param_index, child_by_param_value = self.split
+ ret = list()
+ for param_value, child in child_by_param_value.items():
+ child_ret = child.get_data_for_paramfit(
+ safe_functions_enabled=safe_functions_enabled
+ )
+ for key, param, val in child_ret:
+ ret.append((key[:2] + (param_value,) + key[2:], param, val))
+ return ret
+
+ def get_data_for_paramfit_this(self, safe_functions_enabled=False):
+ ret = list()
+ for param_index, param_name in enumerate(self.param_names):
+ if self.stats.depends_on_param(param_name):
+ ret.append(
+ (
+ (self.name, self.attr),
+ param_name,
+ (self.by_param, param_index, safe_functions_enabled),
+ )
+ )
+ if self.arg_count:
+ for arg_index in range(self.arg_count):
+ if self.stats.depends_on_arg(arg_index):
+ ret.append(
+ (
+ (self.name, self.attr),
+ arg_index,
+ (
+ self.by_param,
+ len(self.param_names) + arg_index,
+ safe_functions_enabled,
+ ),
+ )
+ )
+
+ return ret
+
+ def set_data_from_paramfit(self, paramfit, prefix=tuple()):
+ if self.split:
+ self.set_data_from_paramfit_split(paramfit, prefix)
+ else:
+ self.set_data_from_paramfit_this(paramfit, prefix)
+
+ def set_data_from_paramfit_split(self, paramfit, prefix):
+ split_param_index, child_by_param_value = self.split
+ function_map = {
+ "split_by": split_param_index,
+ "child": dict(),
+ "child_static": dict(),
+ }
+ function_child = dict()
+ info_child = dict()
+ for param_value, child in child_by_param_value.items():
+ child.set_data_from_paramfit(paramfit, prefix + (param_value,))
+ function_child[param_value], info_child[param_value] = child.get_fitted()
+ function_map = df.SplitFunction(split_param_index, function_child)
+ info_map = df.SplitInfo(split_param_index, info_child)
+
+ self.param_model = function_map, info_map
+
+ def set_data_from_paramfit_this(self, paramfit, prefix):
+ fit_result = paramfit.get_result((self.name, self.attr) + prefix)
+ param_model = (
+ df.StaticFunction(np.median(self.data)),
+ df.StaticInfo(self.data),
+ )
+ if self.function_override is not None:
+ function_str = self.function_override
+ x = df.AnalyticFunction(function_str, self.param_names, self.arg_count)
+ x.fit(self.by_param)
+ if x.fit_success:
+ param_model = (x, df.AnalyticInfo(fit_result, x))
+ elif os.getenv("DFATOOL_NO_PARAM"):
+ pass
+ elif len(fit_result.keys()):
+ x = df.analytic.function_powerset(
+ fit_result, self.param_names, self.arg_count
+ )
+ x.fit(self.by_param)
+
+ if x.fit_success:
+ param_model = (x, df.AnalyticInfo(fit_result, x))
+
+ self.param_model = param_model
+
+ def get_fitted(self):
+ """
+ Get paramete-aware model function and model information function.
+ They must have been set via get_data_for_paramfit -> ParallelParamFit -> set-data_from_paramfit first.
+
+ Returns a tuple (function, info):
+ function -> AnalyticFunction for model. function(param=parameter values) -> model value.
+ info -> {'fit_result' : ..., 'function' : ... }
+
+ Returns (None, None) if fitting failed. Returns None if ParamFit has not been performed yet.
+ """
+ return self.param_model