path: root/lib/model.py
Diffstat (limited to 'lib/model.py')
-rw-r--r--  lib/model.py  253
1 file changed, 6 insertions, 247 deletions
diff --git a/lib/model.py b/lib/model.py
index 5093bae..24d5bc1 100644
--- a/lib/model.py
+++ b/lib/model.py
@@ -6,16 +6,11 @@ import os
from scipy import optimize
from sklearn.metrics import r2_score
from multiprocessing import Pool
-from .automata import PTA
-import dfatool.functions as df
-from .parameters import ParallelParamStats, ParamStats
+from .automata import PTA, ModelAttribute
+from .functions import analytic, StaticInfo
+from .parameters import ParallelParamStats
from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple
-from .utils import (
- by_name_to_by_param,
- by_param_to_by_name,
- match_parameter_values,
- partition_by_param,
-)
+from .utils import by_name_to_by_param, match_parameter_values
logger = logging.getLogger(__name__)
arg_support_enabled = True
@@ -210,7 +205,7 @@ def _try_fits(
:param param_filter: Only use measurements whose parameters match param_filter for fitting.
"""
- functions = df.analytic.functions(safe_functions_enabled=safe_functions_enabled)
+ functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
for param_key in n_by_param.keys():
# We might remove elements from 'functions' while iterating over
@@ -344,242 +339,6 @@ def _num_args_from_by_name(by_name):
return num_args
-class ModelAttribute:
- def __init__(self, name, attr, data, param_values, param_names, arg_count=0):
- self.name = name
- self.attr = attr
- self.data = np.array(data)
- self.param_values = param_values
- self.param_names = sorted(param_names)
- self.arg_count = arg_count
- self.by_param = None # set via ParallelParamStats
- self.function_override = None
- self.param_model = None
- self.split = None
-
- def __repr__(self):
- mean = np.mean(self.data)
- return f"ModelAttribute<{self.name}, {self.attr}, mean={mean}>"
-
- def get_static(self, use_mean=False):
- if use_mean:
- return np.mean(self.data)
- return np.median(self.data)
-
- def get_lut(self, param, use_mean=False):
- if use_mean:
- return np.mean(self.by_param[param])
- return np.median(self.by_param[param])
-
- def build_dtree(self):
- split_param_index = self.get_split_param_index()
- if split_param_index is None:
- return
-
- distinct_values = self.stats.distinct_values_by_param_index[split_param_index]
- tt1 = list(
- map(
- lambda i: self.param_values[i][split_param_index] == distinct_values[0],
- range(len(self.param_values)),
- )
- )
- tt2 = np.invert(tt1)
-
- pv1 = list()
- pv2 = list()
-
- for i, param_tuple in enumerate(self.param_values):
- if tt1[i]:
- pv1.append(param_tuple)
- else:
- pv2.append(param_tuple)
-
- # print(
- # f">>> split {self.name} {self.attr} by param #{split_param_index}"
- # )
-
- child1 = ModelAttribute(
- self.name, self.attr, self.data[tt1], pv1, self.param_names, self.arg_count
- )
- child2 = ModelAttribute(
- self.name, self.attr, self.data[tt2], pv2, self.param_names, self.arg_count
- )
-
- ParamStats.compute_for_attr(child1)
- ParamStats.compute_for_attr(child2)
-
- child1.build_dtree()
- child2.build_dtree()
-
- self.split = (
- split_param_index,
- {distinct_values[0]: child1, distinct_values[1]: child2},
- )
-
- # print(
- # f"<<< split {self.name} {self.attr} by param #{split_param_index}"
- # )
-
- # None -> no split necessary
- # otherwise: index of the parameter along which one decision-tree level is spanned
- # (the children are again ModelAttributes in which this parameter is constant)
- def get_split_param_index(self):
- if not self.param_names:
- return None
- std_by_param = list()
- for param_index, param_name in enumerate(self.param_names):
- distinct_values = self.stats.distinct_values_by_param_index[param_index]
- if self.stats.depends_on_param(param_name) and len(distinct_values) == 2:
- val1 = list(
- map(
- lambda i: self.param_values[i][param_index]
- == distinct_values[0],
- range(len(self.param_values)),
- )
- )
- val2 = np.invert(val1)
- val1_std = np.std(self.data[val1])
- val2_std = np.std(self.data[val2])
- std_by_param.append(np.mean([val1_std, val2_std]))
- else:
- std_by_param.append(np.inf)
- for arg_index in range(self.arg_count):
- distinct_values = self.stats.distinct_values_by_param_index[
- len(self.param_names) + arg_index
- ]
- if self.stats.depends_on_arg(arg_index) and len(distinct_values) == 2:
- val1 = list(
- map(
- lambda i: self.param_values[i][
- len(self.param_names) + arg_index
- ]
- == distinct_values[0],
- range(len(self.param_values)),
- )
- )
- val2 = np.invert(val1)
- val1_std = np.std(self.data[val1])
- val2_std = np.std(self.data[val2])
- std_by_param.append(np.mean([val1_std, val2_std]))
- else:
- std_by_param.append(np.inf)
- split_param_index = np.argmin(std_by_param)
- split_std = std_by_param[split_param_index]
- if split_std == np.inf:
- return None
- return split_param_index
-
- def get_data_for_paramfit(self, safe_functions_enabled=False):
- if self.split:
- return self.get_data_for_paramfit_split(
- safe_functions_enabled=safe_functions_enabled
- )
- else:
- return self.get_data_for_paramfit_this(
- safe_functions_enabled=safe_functions_enabled
- )
-
- def get_data_for_paramfit_split(self, safe_functions_enabled=False):
- split_param_index, child_by_param_value = self.split
- ret = list()
- for param_value, child in child_by_param_value.items():
- child_ret = child.get_data_for_paramfit(
- safe_functions_enabled=safe_functions_enabled
- )
- for key, param, val in child_ret:
- ret.append((key[:2] + (param_value,) + key[2:], param, val))
- return ret
-
- def get_data_for_paramfit_this(self, safe_functions_enabled=False):
- ret = list()
- for param_index, param_name in enumerate(self.param_names):
- if self.stats.depends_on_param(param_name):
- ret.append(
- (
- (self.name, self.attr),
- param_name,
- (self.by_param, param_index, safe_functions_enabled),
- )
- )
- if self.arg_count:
- for arg_index in range(self.arg_count):
- if self.stats.depends_on_arg(arg_index):
- ret.append(
- (
- (self.name, self.attr),
- arg_index,
- (
- self.by_param,
- len(self.param_names) + arg_index,
- safe_functions_enabled,
- ),
- )
- )
-
- return ret
-
- def set_data_from_paramfit(self, paramfit, prefix=tuple()):
- if self.split:
- self.set_data_from_paramfit_split(paramfit, prefix)
- else:
- self.set_data_from_paramfit_this(paramfit, prefix)
-
- def set_data_from_paramfit_split(self, paramfit, prefix):
- split_param_index, child_by_param_value = self.split
- function_map = {
- "split_by": split_param_index,
- "child": dict(),
- "child_static": dict(),
- }
- function_child = dict()
- info_child = dict()
- for param_value, child in child_by_param_value.items():
- child.set_data_from_paramfit(paramfit, prefix + (param_value,))
- function_child[param_value], info_child[param_value] = child.get_fitted()
- function_map = df.SplitFunction(split_param_index, function_child)
- info_map = df.SplitInfo(split_param_index, info_child)
-
- self.param_model = function_map, info_map
-
- def set_data_from_paramfit_this(self, paramfit, prefix):
- fit_result = paramfit.get_result((self.name, self.attr) + prefix)
- param_model = (
- df.StaticFunction(np.median(self.data)),
- df.StaticInfo(self.data),
- )
- if self.function_override is not None:
- function_str = self.function_override
- x = df.AnalyticFunction(function_str, self.param_names, self.arg_count)
- x.fit(self.by_param)
- if x.fit_success:
- param_model = (x, df.AnalyticInfo(fit_result, x))
- elif os.getenv("DFATOOL_NO_PARAM"):
- pass
- elif len(fit_result.keys()):
- x = df.analytic.function_powerset(
- fit_result, self.param_names, self.arg_count
- )
- x.fit(self.by_param)
-
- if x.fit_success:
- param_model = (x, df.AnalyticInfo(fit_result, x))
-
- self.param_model = param_model
-
- def get_fitted(self):
- """
- Get parameter-aware model function and model information function.
- They must have been set via get_data_for_paramfit -> ParallelParamFit -> set_data_from_paramfit first.
-
- Returns a tuple (function, info):
- function -> AnalyticFunction for model. function(param=parameter values) -> model value.
- info -> {'fit_result' : ..., 'function' : ... }
-
- Returns (None, None) if fitting failed. Returns None if ParamFit has not been performed yet.
- """
- return self.param_model
-
-
class AnalyticModel:
"""
Parameter-aware analytic energy/data size/... model.
@@ -816,7 +575,7 @@ class AnalyticModel:
def model_getter(name, key, **kwargs):
param_function, param_info = self.attr_by_name[name][key].get_fitted()
- if type(param_info) is df.StaticInfo:
+ if type(param_info) is StaticInfo:
return static_model[name][key]
if "arg" in kwargs and "param" in kwargs: