diff options
author | Daniel Friesel <derf@finalrewind.org> | 2018-04-26 10:02:10 +0200 |
---|---|---|
committer | Daniel Friesel <derf@finalrewind.org> | 2018-04-26 10:02:10 +0200 |
commit | 3992ec39ee64460e3555e9ba157932dffb74042b (patch) | |
tree | a8065bd97ce5a8323a94ae2b3d06d99fb0ab42b1 /lib/dfatool.py | |
parent | 4c220e1b4e029d2130789c16b814143e4dcc00b8 (diff) |
refactor function code into separate file to solve circular import
Diffstat (limited to 'lib/dfatool.py')
-rwxr-xr-x | lib/dfatool.py | 267 |
1 files changed, 10 insertions, 257 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py index 896dc12..0b2260d 100755 --- a/lib/dfatool.py +++ b/lib/dfatool.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import csv -from itertools import chain, combinations import io import json import numpy as np @@ -13,6 +12,9 @@ import struct import sys import tarfile from multiprocessing import Pool +from automata import PTA +from functions import analytic +from utils import is_numeric arg_support_enabled = True @@ -20,15 +22,6 @@ def running_mean(x, N): cumsum = np.cumsum(np.insert(x, 0, 0)) return (cumsum[N:] - cumsum[:-N]) / N -def is_numeric(n): - if n == None: - return False - try: - int(n) - return True - except ValueError: - return False - def soft_cast_int(n): if n == None or n == '': return None @@ -145,10 +138,6 @@ def regression_measures(predicted, actual): return measures -def powerset(iterable): - s = list(iterable) - return chain.from_iterable(combinations(s, r) for r in range(len(s)+1)) - class Keysight: def __init__(self): @@ -448,248 +437,6 @@ def _param_slice_eq(a, b, index): return True return False -class ParamFunction: - - def __init__(self, param_function, validation_function, num_vars): - self._param_function = param_function - self._validation_function = validation_function - self._num_variables = num_vars - - def is_valid(self, arg): - return self._validation_function(arg) - - def eval(self, param, args): - return self._param_function(param, args) - - def error_function(self, P, X, y): - return self._param_function(P, X) - y - -class AnalyticFunction: - - def __init__(self, function_str, parameters, num_args, verbose = True, regression_args = None): - self._parameter_names = parameters - self._num_args = num_args - self._model_str = function_str - rawfunction = function_str - self._dependson = [False] * (len(parameters) + num_args) - self.fit_success = False - self.verbose = verbose - - if type(function_str) == str: - num_vars_re = re.compile(r'regression_arg\(([0-9]+)\)') - num_vars = max(map(int, num_vars_re.findall(function_str))) + 1 - for i in range(len(parameters)): - if rawfunction.find('parameter({})'.format(parameters[i])) >= 0: - self._dependson[i] = True - rawfunction = rawfunction.replace('parameter({})'.format(parameters[i]), 'model_param[{:d}]'.format(i)) - for i in range(0, num_args): - if rawfunction.find('function_arg({:d})'.format(i)) >= 0: - self._dependson[len(parameters) + i] = True - rawfunction = rawfunction.replace('function_arg({:d})'.format(i), 'model_param[{:d}]'.format(len(parameters) + i)) - for i in range(num_vars): - rawfunction = rawfunction.replace('regression_arg({:d})'.format(i), 'reg_param[{:d}]'.format(i)) - self._function_str = rawfunction - self._function = eval('lambda reg_param, model_param: ' + rawfunction) - else: - self._function_str = 'raise ValueError' - self._function = function_str - - if regression_args: - self._regression_args = regression_args.copy() - self._fit_success = True - elif type(function_str) == str: - self._regression_args = list(np.ones((num_vars))) - else: - self._regression_args = None - - def get_fit_data(self, by_param, state_or_tran, model_attribute): - dimension = len(self._parameter_names) + self._num_args - X = [[] for i in range(dimension)] - Y = [] - - num_valid = 0 - num_total = 0 - - for key, val in by_param.items(): - if key[0] == state_or_tran and len(key[1]) == dimension: - valid = True - num_total += 1 - for i in range(dimension): - if self._dependson[i] and not is_numeric(key[1][i]): - valid = False - if valid: - num_valid += 1 - Y.extend(val[model_attribute]) - for i in range(dimension): - if self._dependson[i]: - X[i].extend([float(key[1][i])] * len(val[model_attribute])) - else: - X[i].extend([np.nan] * len(val[model_attribute])) - elif key[0] == state_or_tran and len(key[1]) != dimension: - vprint(self.verbose, '[W] Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.'.format(state_or_tran, model_attribute, len(key[1]), dimension)) - X = np.array(X) - Y = np.array(Y) - - return X, Y, num_valid, num_total - - def fit(self, by_param, state_or_tran, model_attribute): - X, Y, num_valid, num_total = self.get_fit_data(by_param, state_or_tran, model_attribute) - if num_valid > 2: - error_function = lambda P, X, y: self._function(P, X) - y - try: - res = optimize.least_squares(error_function, self._regression_args, args=(X, Y), xtol=2e-15) - except ValueError as err: - vprint(self.verbose, '[W] Fit failed for {}/{}: {} (function: {})'.format(state_or_tran, model_attribute, err, self._model_str)) - return - if res.status > 0: - self._regression_args = res.x - self.fit_success = True - else: - vprint(self.verbose, '[W] Fit failed for {}/{}: {} (function: {})'.format(state_or_tran, model_attribute, res.message, self._model_str)) - else: - vprint(self.verbose, '[W] Insufficient amount of valid parameter keys, cannot fit {}/{}'.format(state_or_tran, model_attribute)) - - def is_predictable(self, param_list): - for i, param in enumerate(param_list): - if self._dependson[i] and not is_numeric(param): - return False - return True - - def eval(self, param_list, arg_list = []): - if self._regression_args == None: - return self._function(param_list, arg_list) - return self._function(self._regression_args, param_list) - -class analytic: - _num0_8 = np.vectorize(lambda x: 8 - bin(int(x)).count("1")) - _num0_16 = np.vectorize(lambda x: 16 - bin(int(x)).count("1")) - _num1 = np.vectorize(lambda x: bin(int(x)).count("1")) - _safe_log = np.vectorize(lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 1.) - _safe_inv = np.vectorize(lambda x: 1 / x if np.abs(x) > 0.001 else 1.) - _safe_sqrt = np.vectorize(lambda x: np.sqrt(np.abs(x))) - - _function_map = { - 'linear' : lambda x: x, - 'logarithmic' : np.log, - 'logarithmic1' : lambda x: np.log(x + 1), - 'exponential' : np.exp, - 'square' : lambda x : x ** 2, - 'inverse' : lambda x : 1 / x, - 'sqrt' : lambda x: np.sqrt(np.abs(x)), - 'num0_8' : _num0_8, - 'num0_16' : _num0_16, - 'num1' : _num1, - 'safe_log' : lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 1., - 'safe_inv' : lambda x: 1 / x if np.abs(x) > 0.001 else 1., - 'safe_sqrt': lambda x: np.sqrt(np.abs(x)), - } - - def functions(safe_functions_enabled = False): - functions = { - 'linear' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param, - lambda model_param: True, - 2 - ), - 'logarithmic' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.log(model_param), - lambda model_param: model_param > 0, - 2 - ), - 'logarithmic1' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.log(model_param + 1), - lambda model_param: model_param > -1, - 2 - ), - 'exponential' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.exp(model_param), - lambda model_param: model_param <= 64, - 2 - ), - #'polynomial' : lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param + reg_param[2] * model_param ** 2, - 'square' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param ** 2, - lambda model_param: True, - 2 - ), - 'inverse' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] / model_param, - lambda model_param: model_param != 0, - 2 - ), - 'sqrt' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.sqrt(model_param), - lambda model_param: model_param >= 0, - 2 - ), - 'num0_8' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num0_8(model_param), - lambda model_param: True, - 2 - ), - 'num0_16' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num0_16(model_param), - lambda model_param: True, - 2 - ), - 'num1' : ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num1(model_param), - lambda model_param: True, - 2 - ), - } - - if safe_functions_enabled: - functions['safe_log'] = ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_log(model_param), - lambda model_param: True, - 2 - ) - functions['safe_inv'] = ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_inv(model_param), - lambda model_param: True, - 2 - ) - functions['safe_sqrt'] = ParamFunction( - lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_sqrt(model_param), - lambda model_param: True, - 2 - ) - - return functions - - def _fmap(reference_type, reference_name, function_type): - ref_str = '{}({})'.format(reference_type,reference_name) - if function_type == 'linear': - return ref_str - if function_type == 'logarithmic': - return 'np.log({})'.format(ref_str) - if function_type == 'logarithmic1': - return 'np.log({} + 1)'.format(ref_str) - if function_type == 'exponential': - return 'np.exp({})'.format(ref_str) - if function_type == 'exponential': - return 'np.exp({})'.format(ref_str) - if function_type == 'square': - return '({})**2'.format(ref_str) - if function_type == 'inverse': - return '1/({})'.format(ref_str) - if function_type == 'sqrt': - return 'np.sqrt({})'.format(ref_str) - return 'analytic._{}({})'.format(function_type, ref_str) - - def function_powerset(function_descriptions, parameter_names, num_args): - buf = '0' - arg_idx = 0 - for combination in powerset(function_descriptions.items()): - buf += ' + regression_arg({:d})'.format(arg_idx) - arg_idx += 1 - for function_item in combination: - if arg_support_enabled and is_numeric(function_item[0]): - buf += ' * {}'.format(analytic._fmap('function_arg', function_item[0], function_item[1]['best'])) - else: - buf += ' * {}'.format(analytic._fmap('parameter', function_item[0], function_item[1]['best'])) - return AnalyticFunction(buf, parameter_names, num_args) def _try_fits_parallel(arg): return { @@ -837,7 +584,7 @@ def _corr_by_param(by_name, state_or_trans, key, param_index): class EnergyModel: - def __init__(self, preprocessed_data, ignore_trace_indexes = None, discard_outliers = None, function_override = {}, verbose = True, use_corrcoef = False): + def __init__(self, preprocessed_data, ignore_trace_indexes = None, discard_outliers = None, function_override = {}, verbose = True, use_corrcoef = False, hwmodel = None): self.traces = preprocessed_data self.by_name = {} self.by_param = {} @@ -851,6 +598,7 @@ class EnergyModel: self._use_corrcoef = use_corrcoef self.function_override = function_override self.verbose = verbose + self.hwmodel = hwmodel if discard_outliers != None: self._compute_outlier_stats(ignore_trace_indexes, discard_outliers) for run in self.traces: @@ -1158,6 +906,11 @@ class EnergyModel: return model_getter, info_getter + def to_json(self): + _, param_info = self.get_fitted() + pta = PTA.from_json(self.hwmodel) + pta.update(param_info) + return pta.to_json() def states(self): return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'state', self.by_name.keys()))) |