summaryrefslogtreecommitdiff
path: root/lib/dfatool.py
diff options
context:
space:
mode:
authorDaniel Friesel <derf@finalrewind.org>2018-04-26 10:02:10 +0200
committerDaniel Friesel <derf@finalrewind.org>2018-04-26 10:02:10 +0200
commit3992ec39ee64460e3555e9ba157932dffb74042b (patch)
treea8065bd97ce5a8323a94ae2b3d06d99fb0ab42b1 /lib/dfatool.py
parent4c220e1b4e029d2130789c16b814143e4dcc00b8 (diff)
refactor function code into separate file to solve circular import
Diffstat (limited to 'lib/dfatool.py')
-rwxr-xr-xlib/dfatool.py267
1 files changed, 10 insertions, 257 deletions
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 896dc12..0b2260d 100755
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
import csv
-from itertools import chain, combinations
import io
import json
import numpy as np
@@ -13,6 +12,9 @@ import struct
import sys
import tarfile
from multiprocessing import Pool
+from automata import PTA
+from functions import analytic
+from utils import is_numeric
arg_support_enabled = True
@@ -20,15 +22,6 @@ def running_mean(x, N):
cumsum = np.cumsum(np.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / N
-def is_numeric(n):
- if n == None:
- return False
- try:
- int(n)
- return True
- except ValueError:
- return False
-
def soft_cast_int(n):
if n == None or n == '':
return None
@@ -145,10 +138,6 @@ def regression_measures(predicted, actual):
return measures
-def powerset(iterable):
- s = list(iterable)
- return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
-
class Keysight:
def __init__(self):
@@ -448,248 +437,6 @@ def _param_slice_eq(a, b, index):
return True
return False
-class ParamFunction:
-
- def __init__(self, param_function, validation_function, num_vars):
- self._param_function = param_function
- self._validation_function = validation_function
- self._num_variables = num_vars
-
- def is_valid(self, arg):
- return self._validation_function(arg)
-
- def eval(self, param, args):
- return self._param_function(param, args)
-
- def error_function(self, P, X, y):
- return self._param_function(P, X) - y
-
-class AnalyticFunction:
-
- def __init__(self, function_str, parameters, num_args, verbose = True, regression_args = None):
- self._parameter_names = parameters
- self._num_args = num_args
- self._model_str = function_str
- rawfunction = function_str
- self._dependson = [False] * (len(parameters) + num_args)
- self.fit_success = False
- self.verbose = verbose
-
- if type(function_str) == str:
- num_vars_re = re.compile(r'regression_arg\(([0-9]+)\)')
- num_vars = max(map(int, num_vars_re.findall(function_str))) + 1
- for i in range(len(parameters)):
- if rawfunction.find('parameter({})'.format(parameters[i])) >= 0:
- self._dependson[i] = True
- rawfunction = rawfunction.replace('parameter({})'.format(parameters[i]), 'model_param[{:d}]'.format(i))
- for i in range(0, num_args):
- if rawfunction.find('function_arg({:d})'.format(i)) >= 0:
- self._dependson[len(parameters) + i] = True
- rawfunction = rawfunction.replace('function_arg({:d})'.format(i), 'model_param[{:d}]'.format(len(parameters) + i))
- for i in range(num_vars):
- rawfunction = rawfunction.replace('regression_arg({:d})'.format(i), 'reg_param[{:d}]'.format(i))
- self._function_str = rawfunction
- self._function = eval('lambda reg_param, model_param: ' + rawfunction)
- else:
- self._function_str = 'raise ValueError'
- self._function = function_str
-
- if regression_args:
- self._regression_args = regression_args.copy()
- self._fit_success = True
- elif type(function_str) == str:
- self._regression_args = list(np.ones((num_vars)))
- else:
- self._regression_args = None
-
- def get_fit_data(self, by_param, state_or_tran, model_attribute):
- dimension = len(self._parameter_names) + self._num_args
- X = [[] for i in range(dimension)]
- Y = []
-
- num_valid = 0
- num_total = 0
-
- for key, val in by_param.items():
- if key[0] == state_or_tran and len(key[1]) == dimension:
- valid = True
- num_total += 1
- for i in range(dimension):
- if self._dependson[i] and not is_numeric(key[1][i]):
- valid = False
- if valid:
- num_valid += 1
- Y.extend(val[model_attribute])
- for i in range(dimension):
- if self._dependson[i]:
- X[i].extend([float(key[1][i])] * len(val[model_attribute]))
- else:
- X[i].extend([np.nan] * len(val[model_attribute]))
- elif key[0] == state_or_tran and len(key[1]) != dimension:
- vprint(self.verbose, '[W] Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.'.format(state_or_tran, model_attribute, len(key[1]), dimension))
- X = np.array(X)
- Y = np.array(Y)
-
- return X, Y, num_valid, num_total
-
- def fit(self, by_param, state_or_tran, model_attribute):
- X, Y, num_valid, num_total = self.get_fit_data(by_param, state_or_tran, model_attribute)
- if num_valid > 2:
- error_function = lambda P, X, y: self._function(P, X) - y
- try:
- res = optimize.least_squares(error_function, self._regression_args, args=(X, Y), xtol=2e-15)
- except ValueError as err:
- vprint(self.verbose, '[W] Fit failed for {}/{}: {} (function: {})'.format(state_or_tran, model_attribute, err, self._model_str))
- return
- if res.status > 0:
- self._regression_args = res.x
- self.fit_success = True
- else:
- vprint(self.verbose, '[W] Fit failed for {}/{}: {} (function: {})'.format(state_or_tran, model_attribute, res.message, self._model_str))
- else:
- vprint(self.verbose, '[W] Insufficient amount of valid parameter keys, cannot fit {}/{}'.format(state_or_tran, model_attribute))
-
- def is_predictable(self, param_list):
- for i, param in enumerate(param_list):
- if self._dependson[i] and not is_numeric(param):
- return False
- return True
-
- def eval(self, param_list, arg_list = []):
- if self._regression_args == None:
- return self._function(param_list, arg_list)
- return self._function(self._regression_args, param_list)
-
-class analytic:
- _num0_8 = np.vectorize(lambda x: 8 - bin(int(x)).count("1"))
- _num0_16 = np.vectorize(lambda x: 16 - bin(int(x)).count("1"))
- _num1 = np.vectorize(lambda x: bin(int(x)).count("1"))
- _safe_log = np.vectorize(lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 1.)
- _safe_inv = np.vectorize(lambda x: 1 / x if np.abs(x) > 0.001 else 1.)
- _safe_sqrt = np.vectorize(lambda x: np.sqrt(np.abs(x)))
-
- _function_map = {
- 'linear' : lambda x: x,
- 'logarithmic' : np.log,
- 'logarithmic1' : lambda x: np.log(x + 1),
- 'exponential' : np.exp,
- 'square' : lambda x : x ** 2,
- 'inverse' : lambda x : 1 / x,
- 'sqrt' : lambda x: np.sqrt(np.abs(x)),
- 'num0_8' : _num0_8,
- 'num0_16' : _num0_16,
- 'num1' : _num1,
- 'safe_log' : lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 1.,
- 'safe_inv' : lambda x: 1 / x if np.abs(x) > 0.001 else 1.,
- 'safe_sqrt': lambda x: np.sqrt(np.abs(x)),
- }
-
- def functions(safe_functions_enabled = False):
- functions = {
- 'linear' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param,
- lambda model_param: True,
- 2
- ),
- 'logarithmic' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.log(model_param),
- lambda model_param: model_param > 0,
- 2
- ),
- 'logarithmic1' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.log(model_param + 1),
- lambda model_param: model_param > -1,
- 2
- ),
- 'exponential' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.exp(model_param),
- lambda model_param: model_param <= 64,
- 2
- ),
- #'polynomial' : lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param + reg_param[2] * model_param ** 2,
- 'square' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * model_param ** 2,
- lambda model_param: True,
- 2
- ),
- 'inverse' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] / model_param,
- lambda model_param: model_param != 0,
- 2
- ),
- 'sqrt' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * np.sqrt(model_param),
- lambda model_param: model_param >= 0,
- 2
- ),
- 'num0_8' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num0_8(model_param),
- lambda model_param: True,
- 2
- ),
- 'num0_16' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num0_16(model_param),
- lambda model_param: True,
- 2
- ),
- 'num1' : ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._num1(model_param),
- lambda model_param: True,
- 2
- ),
- }
-
- if safe_functions_enabled:
- functions['safe_log'] = ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_log(model_param),
- lambda model_param: True,
- 2
- )
- functions['safe_inv'] = ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_inv(model_param),
- lambda model_param: True,
- 2
- )
- functions['safe_sqrt'] = ParamFunction(
- lambda reg_param, model_param: reg_param[0] + reg_param[1] * analytic._safe_sqrt(model_param),
- lambda model_param: True,
- 2
- )
-
- return functions
-
- def _fmap(reference_type, reference_name, function_type):
- ref_str = '{}({})'.format(reference_type,reference_name)
- if function_type == 'linear':
- return ref_str
- if function_type == 'logarithmic':
- return 'np.log({})'.format(ref_str)
- if function_type == 'logarithmic1':
- return 'np.log({} + 1)'.format(ref_str)
- if function_type == 'exponential':
- return 'np.exp({})'.format(ref_str)
- if function_type == 'exponential':
- return 'np.exp({})'.format(ref_str)
- if function_type == 'square':
- return '({})**2'.format(ref_str)
- if function_type == 'inverse':
- return '1/({})'.format(ref_str)
- if function_type == 'sqrt':
- return 'np.sqrt({})'.format(ref_str)
- return 'analytic._{}({})'.format(function_type, ref_str)
-
- def function_powerset(function_descriptions, parameter_names, num_args):
- buf = '0'
- arg_idx = 0
- for combination in powerset(function_descriptions.items()):
- buf += ' + regression_arg({:d})'.format(arg_idx)
- arg_idx += 1
- for function_item in combination:
- if arg_support_enabled and is_numeric(function_item[0]):
- buf += ' * {}'.format(analytic._fmap('function_arg', function_item[0], function_item[1]['best']))
- else:
- buf += ' * {}'.format(analytic._fmap('parameter', function_item[0], function_item[1]['best']))
- return AnalyticFunction(buf, parameter_names, num_args)
def _try_fits_parallel(arg):
return {
@@ -837,7 +584,7 @@ def _corr_by_param(by_name, state_or_trans, key, param_index):
class EnergyModel:
- def __init__(self, preprocessed_data, ignore_trace_indexes = None, discard_outliers = None, function_override = {}, verbose = True, use_corrcoef = False):
+ def __init__(self, preprocessed_data, ignore_trace_indexes = None, discard_outliers = None, function_override = {}, verbose = True, use_corrcoef = False, hwmodel = None):
self.traces = preprocessed_data
self.by_name = {}
self.by_param = {}
@@ -851,6 +598,7 @@ class EnergyModel:
self._use_corrcoef = use_corrcoef
self.function_override = function_override
self.verbose = verbose
+ self.hwmodel = hwmodel
if discard_outliers != None:
self._compute_outlier_stats(ignore_trace_indexes, discard_outliers)
for run in self.traces:
@@ -1158,6 +906,11 @@ class EnergyModel:
return model_getter, info_getter
+ def to_json(self):
+ _, param_info = self.get_fitted()
+ pta = PTA.from_json(self.hwmodel)
+ pta.update(param_info)
+ return pta.to_json()
def states(self):
return sorted(list(filter(lambda k: self.by_name[k]['isa'] == 'state', self.by_name.keys())))