summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2020-07-06 11:47:05 +0200
committerDaniel Friesel <daniel.friesel@uos.de>2020-07-06 11:47:05 +0200
commitd7ca9acbb668d4c73f07eddf0278c08bbdae7be7 (patch)
tree655b6aac65e5a553c9e0228778fe8f83c305ec04
parent1406e32aaa0466f5e43d270b0b10e54702210769 (diff)
Move ParamFit, PTAModel, AnalyticModel to model.py module
-rwxr-xr-xbin/analyze-archive.py3
-rwxr-xr-xbin/analyze-timing.py3
-rwxr-xr-xbin/eval-online-model-accuracy.py2
-rwxr-xr-xbin/eval-outlier-removal.py3
-rwxr-xr-xbin/eval-rel-energy.py3
-rwxr-xr-xbin/gptest.py3
-rwxr-xr-xbin/mimosa-etv163
-rwxr-xr-xbin/test_corrcoef.py3
-rw-r--r--lib/dfatool.py1171
-rw-r--r--lib/model.py1186
-rwxr-xr-xtest/test_parameters.py6
-rwxr-xr-xtest/test_ptamodel.py3
-rwxr-xr-xtest/test_timingharness.py3
13 files changed, 1310 insertions, 1242 deletions
diff --git a/bin/analyze-archive.py b/bin/analyze-archive.py
index bf9e511..e9d70f6 100755
--- a/bin/analyze-archive.py
+++ b/bin/analyze-archive.py
@@ -113,8 +113,9 @@ import random
import re
import sys
from dfatool import plotter
-from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate
+from dfatool.dfatool import RawData, pta_trace_to_aggregate
from dfatool.dfatool import gplearn_to_function
+from dfatool.model import PTAModel
from dfatool.validation import CrossValidator
from dfatool.utils import filter_aggregate_by_param
from dfatool.automata import PTA
diff --git a/bin/analyze-timing.py b/bin/analyze-timing.py
index 8c7ee5b..9271787 100755
--- a/bin/analyze-timing.py
+++ b/bin/analyze-timing.py
@@ -78,8 +78,9 @@ import json
import re
import sys
from dfatool import plotter
-from dfatool.dfatool import AnalyticModel, TimingData, pta_trace_to_aggregate
+from dfatool.dfatool import TimingData, pta_trace_to_aggregate
from dfatool.dfatool import gplearn_to_function
+from dfatool.model import AnalyticModel
from dfatool.validation import CrossValidator
from dfatool.utils import filter_aggregate_by_param
from dfatool.parameters import prune_dependent_parameters
diff --git a/bin/eval-online-model-accuracy.py b/bin/eval-online-model-accuracy.py
index 202ac28..97fd8e2 100755
--- a/bin/eval-online-model-accuracy.py
+++ b/bin/eval-online-model-accuracy.py
@@ -28,7 +28,7 @@ import itertools
import yaml
from dfatool.automata import PTA
from dfatool.codegen import get_simulated_accountingmethod
-from dfatool.dfatool import regression_measures
+from dfatool.model import regression_measures
import numpy as np
opt = dict()
diff --git a/bin/eval-outlier-removal.py b/bin/eval-outlier-removal.py
index b091ea4..b81c33a 100755
--- a/bin/eval-outlier-removal.py
+++ b/bin/eval-outlier-removal.py
@@ -3,7 +3,8 @@
import getopt
import re
import sys
-from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate
+from dfatool.dfatool import RawData, pta_trace_to_aggregate
+from dfatool.model import PTAModel
opt = dict()
diff --git a/bin/eval-rel-energy.py b/bin/eval-rel-energy.py
index 8a2be13..2af2cff 100755
--- a/bin/eval-rel-energy.py
+++ b/bin/eval-rel-energy.py
@@ -3,7 +3,8 @@
import getopt
import re
import sys
-from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate
+from dfatool.dfatool import RawData, pta_trace_to_aggregate
+from dfatool.model import PTAModel
opt = dict()
diff --git a/bin/gptest.py b/bin/gptest.py
index 82b4575..bcfb7aa 100755
--- a/bin/gptest.py
+++ b/bin/gptest.py
@@ -3,11 +3,10 @@
import sys
import numpy as np
from dfatool.dfatool import (
- PTAModel,
RawData,
- regression_measures,
pta_trace_to_aggregate,
)
+from dfatool.model import PTAModel, regression_measures
from gplearn.genetic import SymbolicRegressor
from multiprocessing import Pool
diff --git a/bin/mimosa-etv b/bin/mimosa-etv
index e23b46c..431e275 100755
--- a/bin/mimosa-etv
+++ b/bin/mimosa-etv
@@ -8,13 +8,16 @@ import numpy as np
import os
import re
import sys
-from dfatool.dfatool import aggregate_measures, MIMOSA
+from dfatool.dfatool import MIMOSA
+from dfatool.model import aggregate_measures
from dfatool.utils import running_mean
opt = dict()
+
def show_help():
- print('''mimosa-etv - MIMOSA Analyzer and Visualizer
+ print(
+ """mimosa-etv - MIMOSA Analyzer and Visualizer
USAGE
@@ -41,7 +44,9 @@ OPTIONS
Show power/time plot
--stat
Show mean voltage, current, and power as well as total energy consumption.
- ''')
+ """
+ )
+
def peak_search(data, lower, upper, direction_function):
while upper - lower > 1e-6:
@@ -58,6 +63,7 @@ def peak_search(data, lower, upper, direction_function):
upper = bs_test
return None
+
def peak_search2(data, lower, upper, check_function):
for power in np.arange(lower, upper, 1e-6):
peakcount = itertools.groupby(data, lambda x: x >= power)
@@ -67,38 +73,39 @@ def peak_search2(data, lower, upper, check_function):
return power
return None
-if __name__ == '__main__':
+
+if __name__ == "__main__":
try:
- optspec = ('help skip= threshold= threshold-peakcount= plot stat')
- raw_opts, args = getopt.getopt(sys.argv[1:], "", optspec.split(' '))
+ optspec = "help skip= threshold= threshold-peakcount= plot stat"
+ raw_opts, args = getopt.getopt(sys.argv[1:], "", optspec.split(" "))
for option, parameter in raw_opts:
- optname = re.sub(r'^--', '', option)
+ optname = re.sub(r"^--", "", option)
opt[optname] = parameter
- if 'help' in opt:
+ if "help" in opt:
show_help()
sys.exit(0)
- if 'skip' in opt:
- opt['skip'] = int(opt['skip'])
+ if "skip" in opt:
+ opt["skip"] = int(opt["skip"])
else:
- opt['skip'] = 0
+ opt["skip"] = 0
- if 'threshold' in opt and opt['threshold'] != 'mean':
- opt['threshold'] = float(opt['threshold'])
+ if "threshold" in opt and opt["threshold"] != "mean":
+ opt["threshold"] = float(opt["threshold"])
- if 'threshold-peakcount' in opt:
- opt['threshold-peakcount'] = int(opt['threshold-peakcount'])
+ if "threshold-peakcount" in opt:
+ opt["threshold-peakcount"] = int(opt["threshold-peakcount"])
except getopt.GetoptError as err:
print(err)
sys.exit(2)
except IndexError:
- print('Usage: mimosa-etv <duration>')
+ print("Usage: mimosa-etv <duration>")
sys.exit(2)
except ValueError:
- print('Error: duration or skip is not a number')
+ print("Error: duration or skip is not a number")
sys.exit(2)
voltage, shunt, inputfile = args
@@ -110,7 +117,7 @@ if __name__ == '__main__':
currents = mim.charge_to_current_nocal(charges) * 1e-6
powers = currents * voltage
- if 'threshold-peakcount' in opt:
+ if "threshold-peakcount" in opt:
bs_mean = np.mean(powers)
# Finding the correct threshold is tricky. If #peaks < peakcont, our
@@ -126,42 +133,59 @@ if __name__ == '__main__':
# #peaks != peakcount and threshold >= mean, we go down.
# If that doesn't work, we fall back to a linear search in 1 µW steps
def direction_function(peakcount, power):
- if peakcount == opt['threshold-peakcount']:
+ if peakcount == opt["threshold-peakcount"]:
return 0
if power < bs_mean:
return 1
return -1
+
threshold = peak_search(power, np.min(power), np.max(power), direction_function)
if threshold == None:
- threshold = peak_search2(power, np.min(power), np.max(power), direction_function)
+ threshold = peak_search2(
+ power, np.min(power), np.max(power), direction_function
+ )
if threshold != None:
- print('Threshold set to {:.0f} µW : {:.9f}'.format(threshold * 1e6, threshold))
- opt['threshold'] = threshold
+ print(
+ "Threshold set to {:.0f} µW : {:.9f}".format(
+ threshold * 1e6, threshold
+ )
+ )
+ opt["threshold"] = threshold
else:
- print('Found no working threshold')
+ print("Found no working threshold")
- if 'threshold' in opt:
- if opt['threshold'] == 'mean':
- opt['threshold'] = np.mean(powers)
- print('Threshold set to {:.0f} µW : {:.9f}'.format(opt['threshold'] * 1e6, opt['threshold']))
+ if "threshold" in opt:
+ if opt["threshold"] == "mean":
+ opt["threshold"] = np.mean(powers)
+ print(
+ "Threshold set to {:.0f} µW : {:.9f}".format(
+ opt["threshold"] * 1e6, opt["threshold"]
+ )
+ )
baseline_mean = 0
- if np.any(powers < opt['threshold']):
- baseline_mean = np.mean(powers[powers < opt['threshold']])
- print('Baseline mean: {:.0f} µW : {:.9f}'.format(
- baseline_mean * 1e6, baseline_mean))
- if np.any(powers >= opt['threshold']):
- print('Peak mean: {:.0f} µW : {:.9f}'.format(
- np.mean(powers[powers >= opt['threshold']]) * 1e6,
- np.mean(powers[powers >= opt['threshold']])))
+ if np.any(powers < opt["threshold"]):
+ baseline_mean = np.mean(powers[powers < opt["threshold"]])
+ print(
+ "Baseline mean: {:.0f} µW : {:.9f}".format(
+ baseline_mean * 1e6, baseline_mean
+ )
+ )
+ if np.any(powers >= opt["threshold"]):
+ print(
+ "Peak mean: {:.0f} µW : {:.9f}".format(
+ np.mean(powers[powers >= opt["threshold"]]) * 1e6,
+ np.mean(powers[powers >= opt["threshold"]]),
+ )
+ )
peaks = []
peak_start = -1
for i, dp in enumerate(powers):
- if dp >= opt['threshold'] and peak_start == -1:
+ if dp >= opt["threshold"] and peak_start == -1:
peak_start = i
- elif dp < opt['threshold'] and peak_start != -1:
+ elif dp < opt["threshold"] and peak_start != -1:
peaks.append((peak_start, i))
peak_start = -1
@@ -170,32 +194,55 @@ if __name__ == '__main__':
for peak in peaks:
duration = (peak[1] - peak[0]) * 1e-5
total_energy += np.mean(powers[peak[0] : peak[1]]) * duration
- delta_energy += (np.mean(powers[peak[0] : peak[1]]) - baseline_mean) * duration
+ delta_energy += (
+ np.mean(powers[peak[0] : peak[1]]) - baseline_mean
+ ) * duration
delta_powers = powers[peak[0] : peak[1]] - baseline_mean
- print('{:.2f}ms peak ({:f} -> {:f})'.format(duration * 1000,
- peak[0], peak[1]))
- print(' {:f} µJ / mean {:f} µW'.format(
- np.mean(powers[peak[0] : peak[1]]) * duration * 1e6,
- np.mean(powers[peak[0] : peak[1]]) * 1e6 ))
+ print(
+ "{:.2f}ms peak ({:f} -> {:f})".format(duration * 1000, peak[0], peak[1])
+ )
+ print(
+ " {:f} µJ / mean {:f} µW".format(
+ np.mean(powers[peak[0] : peak[1]]) * duration * 1e6,
+ np.mean(powers[peak[0] : peak[1]]) * 1e6,
+ )
+ )
measures = aggregate_measures(np.mean(delta_powers), delta_powers)
- print(' {:f} µW delta mean = {:0.1f}% / {:f} µW error'.format(np.mean(delta_powers) * 1e6, measures['smape'], measures['rmsd'] * 1e6 ))
- print('Peak energy mean: {:.0f} µJ : {:.9f}'.format(
- total_energy * 1e6 / len(peaks), total_energy / len(peaks)))
- print('Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}'.format(
- delta_energy * 1e6 / len(peaks), delta_energy / len(peaks)))
-
-
- if 'stat' in opt:
+ print(
+ " {:f} µW delta mean = {:0.1f}% / {:f} µW error".format(
+ np.mean(delta_powers) * 1e6,
+ measures["smape"],
+ measures["rmsd"] * 1e6,
+ )
+ )
+ print(
+ "Peak energy mean: {:.0f} µJ : {:.9f}".format(
+ total_energy * 1e6 / len(peaks), total_energy / len(peaks)
+ )
+ )
+ print(
+ "Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}".format(
+ delta_energy * 1e6 / len(peaks), delta_energy / len(peaks)
+ )
+ )
+
+ if "stat" in opt:
mean_current = np.mean(currents)
mean_power = np.mean(powers)
- print('Mean current: {:.0f} µA : {:.9f}'.format(mean_current * 1e6, mean_current))
- print('Mean power: {:.0f} µW : {:.9f}'.format(mean_power * 1e6, mean_power))
-
- if 'plot' in opt:
+ print(
+ "Mean current: {:.0f} µA : {:.9f}".format(
+ mean_current * 1e6, mean_current
+ )
+ )
+ print(
+ "Mean power: {:.0f} µW : {:.9f}".format(mean_power * 1e6, mean_power)
+ )
+
+ if "plot" in opt:
timestamps = np.arange(len(powers)) * 1e-5
- pwrhandle, = plt.plot(timestamps, powers, 'b-', label='U*I', markersize=1)
+ (pwrhandle,) = plt.plot(timestamps, powers, "b-", label="U*I", markersize=1)
plt.legend(handles=[pwrhandle])
- plt.xlabel('Time [s]')
- plt.ylabel('Power [W]')
+ plt.xlabel("Time [s]")
+ plt.ylabel("Power [W]")
plt.grid(True)
plt.show()
diff --git a/bin/test_corrcoef.py b/bin/test_corrcoef.py
index 0b1ca54..75b5d0d 100755
--- a/bin/test_corrcoef.py
+++ b/bin/test_corrcoef.py
@@ -4,8 +4,9 @@ import getopt
import re
import sys
from dfatool import plotter
-from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate
+from dfatool.dfatool import RawData, pta_trace_to_aggregate
from dfatool.dfatool import gplearn_to_function
+from dfatool.model import PTAModel
opt = dict()
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 392f5a6..20e198d 100644
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -92,77 +92,6 @@ def mean_or_none(arr):
return -1
-def aggregate_measures(aggregate: float, actual: list) -> dict:
- """
- Calculate error measures for model value on data list.
-
- arguments:
- aggregate -- model value (float or int)
- actual -- real-world / reference values (list of float or int)
-
- return value:
- See regression_measures
- """
- aggregate_array = np.array([aggregate] * len(actual))
- return regression_measures(aggregate_array, np.array(actual))
-
-
-def regression_measures(predicted: np.ndarray, actual: np.ndarray):
- """
- Calculate error measures by comparing model values to reference values.
-
- arguments:
- predicted -- model values (np.ndarray)
- actual -- real-world / reference values (np.ndarray)
-
- Returns a dict containing the following measures:
- mae -- Mean Absolute Error
- mape -- Mean Absolute Percentage Error,
- if all items in actual are non-zero (NaN otherwise)
- smape -- Symmetric Mean Absolute Percentage Error,
- if no 0,0-pairs are present in actual and predicted (NaN otherwise)
- msd -- Mean Square Deviation
- rmsd -- Root Mean Square Deviation
- ssr -- Sum of Squared Residuals
- rsq -- R^2 measure, see sklearn.metrics.r2_score
- count -- Number of values
- """
- if type(predicted) != np.ndarray:
- raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
- if type(actual) != np.ndarray:
- raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
- deviations = predicted - actual
- # mean = np.mean(actual)
- if len(deviations) == 0:
- return {}
- measures = {
- "mae": np.mean(np.abs(deviations), dtype=np.float64),
- "msd": np.mean(deviations ** 2, dtype=np.float64),
- "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
- "ssr": np.sum(deviations ** 2, dtype=np.float64),
- "rsq": r2_score(actual, predicted),
- "count": len(actual),
- }
-
- # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
-
- if np.all(actual != 0):
- measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
- else:
- measures["mape"] = np.nan
- if np.all(np.abs(predicted) + np.abs(actual) != 0):
- measures["smape"] = (
- np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
- * 100
- )
- else:
- measures["smape"] = np.nan
- # if np.all(rsq_quotient != 0):
- # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
-
- return measures
-
-
class KeysightCSV:
"""Simple loader for Keysight CSV data, as exported by the windows software."""
@@ -1191,581 +1120,6 @@ class RawData:
}
-class ParallelParamFit:
- """
- Fit a set of functions on parameterized measurements.
-
- One parameter is variale, all others are fixed. Reports the best-fitting
- function type for each parameter.
- """
-
- def __init__(self, by_param):
- """Create a new ParallelParamFit object."""
- self.fit_queue = []
- self.by_param = by_param
-
- def enqueue(
- self,
- state_or_tran,
- attribute,
- param_index,
- param_name,
- safe_functions_enabled=False,
- param_filter=None,
- ):
- """
- Add state_or_tran/attribute/param_name to fit queue.
-
- This causes fit() to compute the best-fitting function for this model part.
- """
- self.fit_queue.append(
- {
- "key": [state_or_tran, attribute, param_name, param_filter],
- "args": [
- self.by_param,
- state_or_tran,
- attribute,
- param_index,
- safe_functions_enabled,
- param_filter,
- ],
- }
- )
-
- def fit(self):
- """
- Fit functions on previously enqueue data.
-
- Fitting is one in parallel with one process per core.
-
- Results can be accessed using the public ParallelParamFit.results object.
- """
- with Pool() as pool:
- self.results = pool.map(_try_fits_parallel, self.fit_queue)
-
- def get_result(self, name, attribute, param_filter: dict = None):
- """
- Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
-
- Filters out results where the best function is worse (or not much better than) static mean/median estimates.
-
- :param name: state/transition/... name, e.g. 'TX'
- :param attribute: model attribute, e.g. 'duration'
- :param param_filter:
- :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
- """
- fit_result = dict()
- for result in self.results:
- if (
- result["key"][0] == name
- and result["key"][1] == attribute
- and result["key"][3] == param_filter
- and result["result"]["best"] is not None
- ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
- this_result = result["result"]
- if this_result["best_rmsd"] >= min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- # See notes on depends_on_param
- elif this_result["best_rmsd"] >= 0.8 * min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- else:
- fit_result[result["key"][2]] = this_result
- return fit_result
-
-
-def _try_fits_parallel(arg):
- """
- Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
-
- Must be a global function as it is called from a multiprocessing Pool.
- """
- return {"key": arg["key"], "result": _try_fits(*arg["args"])}
-
-
-def _try_fits(
- by_param,
- state_or_tran,
- model_attribute,
- param_index,
- safe_functions_enabled=False,
- param_filter: dict = None,
-):
- """
- Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
-
- This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
- The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
- Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
- Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
-
- :returns: a dictionary with the following elements:
- best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
- best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
- mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
- median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
- results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
-
- :param by_param: measurements partitioned by state/transition/... name and parameter values.
- Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
-
- :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
- Example: `'foo'`
-
- :param model_attribute: attribute for which goodness-of-fit will be calculated.
- Example: `'bar'`
-
- :param param_index: index of the parameter used as model input
- :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
- :param param_filter: Only use measurements whose parameters match param_filter for fitting.
- """
-
- functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
-
- for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
- # We might remove elements from 'functions' while iterating over
- # its keys. A generator will not allow this, so we need to
- # convert to a list.
- function_names = list(functions.keys())
- for function_name in function_names:
- function_object = functions[function_name]
- if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
- param_key[1][param_index]
- ):
- functions.pop(function_name, None)
-
- raw_results = dict()
- raw_results_by_param = dict()
- ref_results = {"mean": list(), "median": list()}
- results = dict()
- results_by_param = dict()
-
- seen_parameter_combinations = set()
-
- # for each parameter combination:
- for param_key in filter(
- lambda x: x[0] == state_or_tran
- and remove_index_from_tuple(x[1], param_index)
- not in seen_parameter_combinations
- and len(by_param[x]["param"])
- and match_parameter_values(by_param[x]["param"][0], param_filter),
- by_param.keys(),
- ):
- X = []
- Y = []
- num_valid = 0
- num_total = 0
-
- # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
- # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
- seen_parameter_combinations.add(
- remove_index_from_tuple(param_key[1], param_index)
- )
-
- # for each value of the parameter denoted by param_index (all other parameters remain the same):
- for k, v in filter(
- lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
- ):
- num_total += 1
- if is_numeric(k[1][param_index]):
- num_valid += 1
- X.extend([float(k[1][param_index])] * len(v[model_attribute]))
- Y.extend(v[model_attribute])
-
- if num_valid > 2:
- X = np.array(X)
- Y = np.array(Y)
- other_parameters = remove_index_from_tuple(k[1], param_index)
- raw_results_by_param[other_parameters] = dict()
- results_by_param[other_parameters] = dict()
- for function_name, param_function in functions.items():
- if function_name not in raw_results:
- raw_results[function_name] = dict()
- error_function = param_function.error_function
- res = optimize.least_squares(
- error_function, [0, 1], args=(X, Y), xtol=2e-15
- )
- measures = regression_measures(param_function.eval(res.x, X), Y)
- raw_results_by_param[other_parameters][function_name] = measures
- for measure, error_rate in measures.items():
- if measure not in raw_results[function_name]:
- raw_results[function_name][measure] = list()
- raw_results[function_name][measure].append(error_rate)
- # print(function_name, res, measures)
- mean_measures = aggregate_measures(np.mean(Y), Y)
- ref_results["mean"].append(mean_measures["rmsd"])
- raw_results_by_param[other_parameters]["mean"] = mean_measures
- median_measures = aggregate_measures(np.median(Y), Y)
- ref_results["median"].append(median_measures["rmsd"])
- raw_results_by_param[other_parameters]["median"] = median_measures
-
- if not len(ref_results["mean"]):
- # Insufficient data for fitting
- # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
- return {"best": None, "best_rmsd": np.inf, "results": results}
-
- for (
- other_parameter_combination,
- other_parameter_results,
- ) in raw_results_by_param.items():
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in other_parameter_results.items():
- if len(result) > 0:
- results[function_name] = result
- rmsd = result["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
- results_by_param[other_parameter_combination] = {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": results["mean"]["rmsd"],
- "median_rmsd": results["median"]["rmsd"],
- "results": results,
- }
-
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in raw_results.items():
- if len(result) > 0:
- results[function_name] = {}
- for measure in result.keys():
- results[function_name][measure] = np.mean(result[measure])
- rmsd = results[function_name]["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
-
- return {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": np.mean(ref_results["mean"]),
- "median_rmsd": np.mean(ref_results["median"]),
- "results": results,
- "results_by_other_param": results_by_param,
- }
-
-
-def _num_args_from_by_name(by_name):
- num_args = dict()
- for key, value in by_name.items():
- if "args" in value:
- num_args[key] = len(value["args"][0])
- return num_args
-
-
-class AnalyticModel:
- u"""
- Parameter-aware analytic energy/data size/... model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- These provide measurements aggregated by (function/state/...) name
- and (for by_param) parameter values. Layout:
- dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'bar' : [5, 6, 7],
- 'attributes' : ['foo', 'bar'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- }
-
- methods:
- get_static -- return static (parameter-unaware) model.
- get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
- get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
-
- variables:
- names -- function/state/... names (i.e., the keys of by_name)
- parameters -- parameter names
- stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
- assess -- calculate model quality
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count=None,
- function_override=dict(),
- use_corrcoef=False,
- ):
- """
- Create a new AnalyticModel and compute parameter statistics.
-
- :param by_name: measurements aggregated by (function/state/...) name.
- Layout: dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'duration' : [5, 6, 7],
- 'attributes' : ['foo', 'duration'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- # foo_count-^ ^-irrelevant
- }
- :param parameters: List of parameter names
- :param function_override: dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
- """
- self.cache = dict()
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self.names = sorted(by_name.keys())
- self.parameters = sorted(parameters)
- self.function_override = function_override.copy()
- self._use_corrcoef = use_corrcoef
- self._num_args = arg_count
- if self._num_args is None:
- self._num_args = _num_args_from_by_name(by_name)
-
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self.parameters,
- self._num_args,
- use_corrcoef=use_corrcoef,
- )
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- logger.warning("Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
- return model
-
- def param_index(self, param_name):
- if param_name in self.parameters:
- return self.parameters.index(param_name)
- return len(self.parameters) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self.parameters):
- return self.parameters[param_index]
- return str(param_index)
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get paramete-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict([[name, {}] for name in self.by_name.keys()])
- paramfit = ParallelParamFit(self.by_param)
-
- for name in self.by_name.keys():
- for attribute in self.by_name[name]["attributes"]:
- for param_index, param in enumerate(self.parameters):
- if self.stats.depends_on_param(name, attribute, param):
- paramfit.enqueue(name, attribute, param_index, param, False)
- if arg_support_enabled and name in self._num_args:
- for arg_index in range(self._num_args[name]):
- if self.stats.depends_on_arg(name, attribute, arg_index):
- paramfit.enqueue(
- name,
- attribute,
- len(self.parameters) + arg_index,
- arg_index,
- False,
- )
-
- paramfit.fit()
-
- for name in self.by_name.keys():
- num_args = 0
- if name in self._num_args:
- num_args = self._num_args[name]
- for attribute in self.by_name[name]["attributes"]:
- fit_result = paramfit.get_result(name, attribute)
-
- if (name, attribute) in self.function_override:
- function_str = self.function_override[(name, attribute)]
- x = AnalyticFunction(function_str, self.parameters, num_args)
- x.fit(self.by_param, name, attribute)
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
- elif len(fit_result.keys()):
- x = analytic.function_powerset(
- fit_result, self.parameters, num_args
- )
- x.fit(self.by_param, name, attribute)
-
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this AnalyticModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for attribute in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(
- name, attribute, param=elem["param"][i]
- ),
- range(len(elem[attribute])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[attribute])
- detailed_results[name][attribute] = measures
-
- return {"by_name": detailed_results}
-
- def to_json(self):
- # TODO
- pass
-
-
def _add_trace_data_to_aggregate(aggregate, key, element):
# Only cares about element['isa'], element['offline_aggregates'], and
# element['plan']['level']
@@ -1867,531 +1221,6 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]):
return by_name, parameter_names, arg_count
-class PTAModel:
- u"""
- Parameter-aware PTA-based energy model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- The model heavily relies on two internal data structures:
- PTAModel.by_name and PTAModel.by_param.
-
- These provide measurements aggregated by state/transition name
- and (for by_param) parameter values. Layout:
- dictionary with one key per state/transition ('send', 'TX', ...) or
- one key per state/transition and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
- For by_param, parameter values are ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - isa: 'state' or 'transition'
- - power: list of mean power measurements in µW
- - duration: list of durations in µs
- - power_std: list of stddev of power per state/transition
- - energy: consumed energy (power*duration) in pJ
- - paramkeys: list of parameter names in each measurement (-> list of lists)
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- additionally, only if isa == 'transition':
- - timeout: list of duration of previous state in µs
- - rel_energy_prev: transition energy relative to previous state mean power in pJ
- - rel_energy_next: transition energy relative to next state mean power in pJ
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count,
- traces=[],
- ignore_trace_indexes=[],
- discard_outliers=None,
- function_override={},
- use_corrcoef=False,
- pta=None,
- ):
- """
- Prepare a new PTA energy model.
-
- Actual model generation is done on-demand by calling the respective functions.
-
- arguments:
- by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate.
- parameters -- list of parameter names, as returned by pta_trace_to_aggregate
- arg_count -- function arguments, as returned by pta_trace_to_aggregate
- traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data()
- ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored.
- discard_outliers -- currently not supported: threshold for outlier detection and removel (float).
- Outlier detection is performed individually for each state/transition in each trace,
- so it only works if the benchmark ran several times.
- Given "data" (a set of measurements of the same thing, e.g. TX duration in the third benchmark trace),
- "m" (the median of all attribute measurements with the same parameters, which may include data from other traces),
- a data point X is considered an outlier if
- | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers .
- function_override -- dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- use_corrcoef -- use correlation coefficient instead of stddev comparison
- to detect whether a model attribute depends on a parameter
- pta -- hardware model as `PTA` object
- """
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self._parameter_names = sorted(parameters)
- self._num_args = arg_count
- self._use_corrcoef = use_corrcoef
- self.traces = traces
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self._parameter_names,
- self._num_args,
- self._use_corrcoef,
- )
- self.cache = {}
- np.seterr("raise")
- self._outlier_threshold = discard_outliers
- self.function_override = function_override.copy()
- self.pta = pta
- self.ignore_trace_indexes = ignore_trace_indexes
- self._aggregate_to_ndarray(self.by_name)
-
- def _aggregate_to_ndarray(self, aggregate):
- for elem in aggregate.values():
- for key in elem["attributes"]:
- elem[key] = np.array(elem[key])
-
- # This heuristic is very similar to the "function is not much better than
- # median" checks in get_fitted. So far, doing it here as well is mostly
- # a performance and not an algorithm quality decision.
- # --df, 2018-04-18
- def depends_on_param(self, state_or_trans, key, param):
- return self.stats.depends_on_param(state_or_trans, key, param)
-
- # See notes on depends_on_param
- def depends_on_arg(self, state_or_trans, key, param):
- return self.stats.depends_on_arg(state_or_trans, key, param)
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- logger.warning("Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
- return model
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling, unless `use_mean` is set.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def param_index(self, param_name):
- if param_name in self._parameter_names:
- return self._parameter_names.index(param_name)
- return len(self._parameter_names) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self._parameter_names):
- return self._parameter_names[param_index]
- return str(param_index)
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get parameter-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict(
- [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
- )
- paramfit = ParallelParamFit(self.by_param)
- for state_or_tran in self.by_name.keys():
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = {}
- for parameter_index, parameter_name in enumerate(self._parameter_names):
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- )
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- codependent_param_dict,
- )
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- for arg_index in range(self._num_args[state_or_tran]):
- if self.depends_on_arg(
- state_or_tran, model_attribute, arg_index
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- len(self._parameter_names) + arg_index,
- arg_index,
- safe_functions_enabled,
- )
- paramfit.fit()
-
- for state_or_tran in self.by_name.keys():
- num_args = 0
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- num_args = self._num_args[state_or_tran]
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = paramfit.get_result(state_or_tran, model_attribute)
-
- for parameter_name in self._parameter_names:
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- pass
- # FIXME paramfit.get_result hat ja gar keinen Parameter als Argument...
-
- if (state_or_tran, model_attribute) in self.function_override:
- function_str = self.function_override[
- (state_or_tran, model_attribute)
- ]
- x = AnalyticFunction(function_str, self._parameter_names, num_args)
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
- elif len(fit_results.keys()):
- x = analytic.function_powerset(
- fit_results, self._parameter_names, num_args
- )
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def to_json(self):
- static_model = self.get_static()
- static_quality = self.assess(static_model)
- param_model, param_info = self.get_fitted()
- analytic_quality = self.assess(param_model)
- self.pta.update(
- static_model,
- param_info,
- static_error=static_quality["by_name"],
- analytic_error=analytic_quality["by_name"],
- )
- return self.pta.to_json()
-
- def states(self):
- """Return sorted list of state names."""
- return sorted(
- list(
- filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
- )
- )
-
- def transitions(self):
- """Return sorted list of transition names."""
- return sorted(
- list(
- filter(
- lambda k: self.by_name[k]["isa"] == "transition",
- self.by_name.keys(),
- )
- )
- )
-
- def states_and_transitions(self):
- """Return list of states and transition names."""
- ret = self.states()
- ret.extend(self.transitions())
- return ret
-
- def parameters(self):
- return self._parameter_names
-
- def attributes(self, state_or_trans):
- return self.by_name[state_or_trans]["attributes"]
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this PTAModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for key in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(name, key, param=elem["param"][i]),
- range(len(elem[key])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[key])
- detailed_results[name][key] = measures
-
- return {"by_name": detailed_results}
-
- def assess_states(
- self, model_function, model_attribute="power", distribution: dict = None
- ):
- """
- Calculate overall model error assuming equal distribution of states
- """
- # TODO calculate mean power draw for distribution and use it to
- # calculate relative error from MAE combination
- model_quality = self.assess(model_function)
- num_states = len(self.states())
- if distribution is None:
- distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
-
- if not np.isclose(sum(distribution.values()), 1):
- raise ValueError(
- "distribution must be a probability distribution with sum 1"
- )
-
- # total_value = None
- # try:
- # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states()))
- # except KeyError:
- # pass
-
- total_error = np.sqrt(
- sum(
- map(
- lambda x: np.square(
- model_quality["by_name"][x][model_attribute]["mae"]
- * distribution[x]
- ),
- self.states(),
- )
- )
- )
- return total_error
-
- def assess_on_traces(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance.
-
- :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`.
- Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the
- traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states
- """
- model_energy_list = []
- real_energy_list = []
- model_rel_energy_list = []
- model_state_energy_list = []
- model_duration_list = []
- real_duration_list = []
- model_timeout_list = []
- real_timeout_list = []
-
- for trace in self.traces:
- if trace["id"] not in self.ignore_trace_indexes:
- for rep_id in range(len(trace["trace"][0]["offline"])):
- model_energy = 0.0
- real_energy = 0.0
- model_rel_energy = 0.0
- model_state_energy = 0.0
- model_duration = 0.0
- real_duration = 0.0
- model_timeout = 0.0
- real_timeout = 0.0
- for i, trace_part in enumerate(trace["trace"]):
- name = trace_part["name"]
- prev_name = trace["trace"][i - 1]["name"]
- isa = trace_part["isa"]
- if name != "UNINITIALIZED":
- try:
- param = trace_part["offline_aggregates"]["param"][
- rep_id
- ]
- prev_param = trace["trace"][i - 1][
- "offline_aggregates"
- ]["param"][rep_id]
- power = trace_part["offline"][rep_id]["uW_mean"]
- duration = trace_part["offline"][rep_id]["us"]
- prev_duration = trace["trace"][i - 1]["offline"][
- rep_id
- ]["us"]
- real_energy += power * duration
- if isa == "state":
- model_energy += (
- model_function(name, "power", param=param)
- * duration
- )
- else:
- model_energy += model_function(
- name, "energy", param=param
- )
- # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
- if i == 1:
- model_rel_energy += model_function(
- name, "energy", param=param
- )
- else:
- model_rel_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_state_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_rel_energy += model_function(
- name, "rel_energy_prev", param=param
- )
- real_duration += duration
- model_duration += model_function(
- name, "duration", param=param
- )
- if (
- "plan" in trace_part
- and trace_part["plan"]["level"] == "epilogue"
- ):
- real_timeout += trace_part["offline"][rep_id][
- "timeout"
- ]
- model_timeout += model_function(
- name, "timeout", param=param
- )
- except KeyError:
- # if states/transitions have been removed via --filter-param, this is harmless
- pass
- real_energy_list.append(real_energy)
- model_energy_list.append(model_energy)
- model_rel_energy_list.append(model_rel_energy)
- model_state_energy_list.append(model_state_energy)
- real_duration_list.append(real_duration)
- model_duration_list.append(model_duration)
- real_timeout_list.append(real_timeout)
- model_timeout_list.append(model_timeout)
-
- return {
- "duration_by_trace": regression_measures(
- np.array(model_duration_list), np.array(real_duration_list)
- ),
- "energy_by_trace": regression_measures(
- np.array(model_energy_list), np.array(real_energy_list)
- ),
- "timeout_by_trace": regression_measures(
- np.array(model_timeout_list), np.array(real_timeout_list)
- ),
- "rel_energy_by_trace": regression_measures(
- np.array(model_rel_energy_list), np.array(real_energy_list)
- ),
- "state_energy_by_trace": regression_measures(
- np.array(model_state_energy_list), np.array(real_energy_list)
- ),
- }
-
-
class EnergyTraceLog:
"""
EnergyTrace log loader for DFA traces.
diff --git a/lib/model.py b/lib/model.py
new file mode 100644
index 0000000..d83c12c
--- /dev/null
+++ b/lib/model.py
@@ -0,0 +1,1186 @@
+#!/usr/bin/env python3
+
+import logging
+import numpy as np
+from scipy import optimize
+from sklearn.metrics import r2_score
+from multiprocessing import Pool
+from .functions import analytic
+from .functions import AnalyticFunction
+from .parameters import ParamStats
+from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple
+from .utils import by_name_to_by_param, match_parameter_values
+
+logger = logging.getLogger(__name__)
+arg_support_enabled = True
+
+
+def aggregate_measures(aggregate: float, actual: list) -> dict:
+ """
+ Calculate error measures for model value on data list.
+
+ arguments:
+ aggregate -- model value (float or int)
+ actual -- real-world / reference values (list of float or int)
+
+ return value:
+ See regression_measures
+ """
+ aggregate_array = np.array([aggregate] * len(actual))
+ return regression_measures(aggregate_array, np.array(actual))
+
+
+def regression_measures(predicted: np.ndarray, actual: np.ndarray):
+ """
+ Calculate error measures by comparing model values to reference values.
+
+ arguments:
+ predicted -- model values (np.ndarray)
+ actual -- real-world / reference values (np.ndarray)
+
+ Returns a dict containing the following measures:
+ mae -- Mean Absolute Error
+ mape -- Mean Absolute Percentage Error,
+ if all items in actual are non-zero (NaN otherwise)
+ smape -- Symmetric Mean Absolute Percentage Error,
+ if no 0,0-pairs are present in actual and predicted (NaN otherwise)
+ msd -- Mean Square Deviation
+ rmsd -- Root Mean Square Deviation
+ ssr -- Sum of Squared Residuals
+ rsq -- R^2 measure, see sklearn.metrics.r2_score
+ count -- Number of values
+ """
+ if type(predicted) != np.ndarray:
+ raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
+ if type(actual) != np.ndarray:
+ raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
+ deviations = predicted - actual
+ # mean = np.mean(actual)
+ if len(deviations) == 0:
+ return {}
+ measures = {
+ "mae": np.mean(np.abs(deviations), dtype=np.float64),
+ "msd": np.mean(deviations ** 2, dtype=np.float64),
+ "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
+ "ssr": np.sum(deviations ** 2, dtype=np.float64),
+ "rsq": r2_score(actual, predicted),
+ "count": len(actual),
+ }
+
+ # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
+
+ if np.all(actual != 0):
+ measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
+ else:
+ measures["mape"] = np.nan
+ if np.all(np.abs(predicted) + np.abs(actual) != 0):
+ measures["smape"] = (
+ np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
+ * 100
+ )
+ else:
+ measures["smape"] = np.nan
+ # if np.all(rsq_quotient != 0):
+ # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
+
+ return measures
+
+
+class ParallelParamFit:
+ """
+ Fit a set of functions on parameterized measurements.
+
+ One parameter is variale, all others are fixed. Reports the best-fitting
+ function type for each parameter.
+ """
+
+ def __init__(self, by_param):
+ """Create a new ParallelParamFit object."""
+ self.fit_queue = []
+ self.by_param = by_param
+
+ def enqueue(
+ self,
+ state_or_tran,
+ attribute,
+ param_index,
+ param_name,
+ safe_functions_enabled=False,
+ param_filter=None,
+ ):
+ """
+ Add state_or_tran/attribute/param_name to fit queue.
+
+ This causes fit() to compute the best-fitting function for this model part.
+ """
+ self.fit_queue.append(
+ {
+ "key": [state_or_tran, attribute, param_name, param_filter],
+ "args": [
+ self.by_param,
+ state_or_tran,
+ attribute,
+ param_index,
+ safe_functions_enabled,
+ param_filter,
+ ],
+ }
+ )
+
+ def fit(self):
+ """
+ Fit functions on previously enqueue data.
+
+ Fitting is one in parallel with one process per core.
+
+ Results can be accessed using the public ParallelParamFit.results object.
+ """
+ with Pool() as pool:
+ self.results = pool.map(_try_fits_parallel, self.fit_queue)
+
+ def get_result(self, name, attribute, param_filter: dict = None):
+ """
+ Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
+
+ Filters out results where the best function is worse (or not much better than) static mean/median estimates.
+
+ :param name: state/transition/... name, e.g. 'TX'
+ :param attribute: model attribute, e.g. 'duration'
+ :param param_filter:
+ :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
+ """
+ fit_result = dict()
+ for result in self.results:
+ if (
+ result["key"][0] == name
+ and result["key"][1] == attribute
+ and result["key"][3] == param_filter
+ and result["result"]["best"] is not None
+ ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
+ this_result = result["result"]
+ if this_result["best_rmsd"] >= min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ # See notes on depends_on_param
+ elif this_result["best_rmsd"] >= 0.8 * min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ else:
+ fit_result[result["key"][2]] = this_result
+ return fit_result
+
+
+def _try_fits_parallel(arg):
+ """
+ Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
+
+ Must be a global function as it is called from a multiprocessing Pool.
+ """
+ return {"key": arg["key"], "result": _try_fits(*arg["args"])}
+
+
+def _try_fits(
+ by_param,
+ state_or_tran,
+ model_attribute,
+ param_index,
+ safe_functions_enabled=False,
+ param_filter: dict = None,
+):
+ """
+ Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
+
+ This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
+ The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
+ Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
+ Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
+
+ :returns: a dictionary with the following elements:
+ best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
+ best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
+ mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
+ median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
+ results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
+
+ :param by_param: measurements partitioned by state/transition/... name and parameter values.
+ Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
+
+ :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
+ Example: `'foo'`
+
+ :param model_attribute: attribute for which goodness-of-fit will be calculated.
+ Example: `'bar'`
+
+ :param param_index: index of the parameter used as model input
+ :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
+ :param param_filter: Only use measurements whose parameters match param_filter for fitting.
+ """
+
+ functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
+
+ for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
+ # We might remove elements from 'functions' while iterating over
+ # its keys. A generator will not allow this, so we need to
+ # convert to a list.
+ function_names = list(functions.keys())
+ for function_name in function_names:
+ function_object = functions[function_name]
+ if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
+ param_key[1][param_index]
+ ):
+ functions.pop(function_name, None)
+
+ raw_results = dict()
+ raw_results_by_param = dict()
+ ref_results = {"mean": list(), "median": list()}
+ results = dict()
+ results_by_param = dict()
+
+ seen_parameter_combinations = set()
+
+ # for each parameter combination:
+ for param_key in filter(
+ lambda x: x[0] == state_or_tran
+ and remove_index_from_tuple(x[1], param_index)
+ not in seen_parameter_combinations
+ and len(by_param[x]["param"])
+ and match_parameter_values(by_param[x]["param"][0], param_filter),
+ by_param.keys(),
+ ):
+ X = []
+ Y = []
+ num_valid = 0
+ num_total = 0
+
+ # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
+ # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
+ seen_parameter_combinations.add(
+ remove_index_from_tuple(param_key[1], param_index)
+ )
+
+ # for each value of the parameter denoted by param_index (all other parameters remain the same):
+ for k, v in filter(
+ lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
+ ):
+ num_total += 1
+ if is_numeric(k[1][param_index]):
+ num_valid += 1
+ X.extend([float(k[1][param_index])] * len(v[model_attribute]))
+ Y.extend(v[model_attribute])
+
+ if num_valid > 2:
+ X = np.array(X)
+ Y = np.array(Y)
+ other_parameters = remove_index_from_tuple(k[1], param_index)
+ raw_results_by_param[other_parameters] = dict()
+ results_by_param[other_parameters] = dict()
+ for function_name, param_function in functions.items():
+ if function_name not in raw_results:
+ raw_results[function_name] = dict()
+ error_function = param_function.error_function
+ res = optimize.least_squares(
+ error_function, [0, 1], args=(X, Y), xtol=2e-15
+ )
+ measures = regression_measures(param_function.eval(res.x, X), Y)
+ raw_results_by_param[other_parameters][function_name] = measures
+ for measure, error_rate in measures.items():
+ if measure not in raw_results[function_name]:
+ raw_results[function_name][measure] = list()
+ raw_results[function_name][measure].append(error_rate)
+ # print(function_name, res, measures)
+ mean_measures = aggregate_measures(np.mean(Y), Y)
+ ref_results["mean"].append(mean_measures["rmsd"])
+ raw_results_by_param[other_parameters]["mean"] = mean_measures
+ median_measures = aggregate_measures(np.median(Y), Y)
+ ref_results["median"].append(median_measures["rmsd"])
+ raw_results_by_param[other_parameters]["median"] = median_measures
+
+ if not len(ref_results["mean"]):
+ # Insufficient data for fitting
+ # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
+ return {"best": None, "best_rmsd": np.inf, "results": results}
+
+ for (
+ other_parameter_combination,
+ other_parameter_results,
+ ) in raw_results_by_param.items():
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in other_parameter_results.items():
+ if len(result) > 0:
+ results[function_name] = result
+ rmsd = result["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+ results_by_param[other_parameter_combination] = {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": results["mean"]["rmsd"],
+ "median_rmsd": results["median"]["rmsd"],
+ "results": results,
+ }
+
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in raw_results.items():
+ if len(result) > 0:
+ results[function_name] = {}
+ for measure in result.keys():
+ results[function_name][measure] = np.mean(result[measure])
+ rmsd = results[function_name]["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+
+ return {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": np.mean(ref_results["mean"]),
+ "median_rmsd": np.mean(ref_results["median"]),
+ "results": results,
+ "results_by_other_param": results_by_param,
+ }
+
+
+def _num_args_from_by_name(by_name):
+ num_args = dict()
+ for key, value in by_name.items():
+ if "args" in value:
+ num_args[key] = len(value["args"][0])
+ return num_args
+
+
+class AnalyticModel:
+ u"""
+ Parameter-aware analytic energy/data size/... model.
+
+ Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
+
+ These provide measurements aggregated by (function/state/...) name
+ and (for by_param) parameter values. Layout:
+ dictionary with one key per name ('send', 'TX', ...) or
+ one key per name and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+ Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ - for each attribute mentioned in 'attributes': A list with measurements.
+ All list except for 'attributes' must have the same length.
+
+ For example:
+ parameters = ['foo_count', 'irrelevant']
+ by_name = {
+ 'foo' : [1, 1, 2],
+ 'bar' : [5, 6, 7],
+ 'attributes' : ['foo', 'bar'],
+ 'param' : [[1, 0], [1, 0], [2, 0]]
+ }
+
+ methods:
+ get_static -- return static (parameter-unaware) model.
+ get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
+ get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
+
+ variables:
+ names -- function/state/... names (i.e., the keys of by_name)
+ parameters -- parameter names
+ stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
+ assess -- calculate model quality
+ """
+
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count=None,
+ function_override=dict(),
+ use_corrcoef=False,
+ ):
+ """
+ Create a new AnalyticModel and compute parameter statistics.
+
+ :param by_name: measurements aggregated by (function/state/...) name.
+ Layout: dictionary with one key per name ('send', 'TX', ...) or
+ one key per name and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+ Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ - for each attribute mentioned in 'attributes': A list with measurements.
+ All list except for 'attributes' must have the same length.
+
+ For example:
+ parameters = ['foo_count', 'irrelevant']
+ by_name = {
+ 'foo' : [1, 1, 2],
+ 'duration' : [5, 6, 7],
+ 'attributes' : ['foo', 'duration'],
+ 'param' : [[1, 0], [1, 0], [2, 0]]
+ # foo_count-^ ^-irrelevant
+ }
+ :param parameters: List of parameter names
+ :param function_override: dict of overrides for automatic parameter function generation.
+ If (state or transition name, model attribute) is present in function_override,
+ the corresponding text string is the function used for analytic (parameter-aware/fitted)
+ modeling of this attribute. It is passed to AnalyticFunction, see
+ there for the required format. Note that this happens regardless of
+ parameter dependency detection: The provided analytic function will be assigned
+ even if it seems like the model attribute is static / parameter-independent.
+ :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
+ """
+ self.cache = dict()
+ self.by_name = by_name
+ self.by_param = by_name_to_by_param(by_name)
+ self.names = sorted(by_name.keys())
+ self.parameters = sorted(parameters)
+ self.function_override = function_override.copy()
+ self._use_corrcoef = use_corrcoef
+ self._num_args = arg_count
+ if self._num_args is None:
+ self._num_args = _num_args_from_by_name(by_name)
+
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self.parameters,
+ self._num_args,
+ use_corrcoef=use_corrcoef,
+ )
+
+ def _get_model_from_dict(self, model_dict, model_function):
+ model = {}
+ for name, elem in model_dict.items():
+ model[name] = {}
+ for key in elem["attributes"]:
+ try:
+ model[name][key] = model_function(elem[key])
+ except RuntimeWarning:
+ logger.warning("Got no data for {} {}".format(name, key))
+ except FloatingPointError as fpe:
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
+ return model
+
+ def param_index(self, param_name):
+ if param_name in self.parameters:
+ return self.parameters.index(param_name)
+ return len(self.parameters) + int(param_name)
+
+ def param_name(self, param_index):
+ if param_index < len(self.parameters):
+ return self.parameters[param_index]
+ return str(param_index)
+
+ def get_static(self, use_mean=False):
+ """
+ Get static model function: name, attribute -> model value.
+
+ Uses the median of by_name for modeling.
+ """
+ getter_function = np.median
+
+ if use_mean:
+ getter_function = np.mean
+
+ static_model = self._get_model_from_dict(self.by_name, getter_function)
+
+ def static_model_getter(name, key, **kwargs):
+ return static_model[name][key]
+
+ return static_model_getter
+
+ def get_param_lut(self, fallback=False):
+ """
+ Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
+
+ The function can only give model values for parameter combinations
+ present in by_param. By default, it raises KeyError for other values.
+
+ arguments:
+ fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
+ """
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ lut_model = self._get_model_from_dict(self.by_param, np.median)
+
+ def lut_median_getter(name, key, param, arg=[], **kwargs):
+ param.extend(map(soft_cast_int, arg))
+ try:
+ return lut_model[(name, tuple(param))][key]
+ except KeyError:
+ if fallback:
+ return static_model[name][key]
+ raise
+
+ return lut_median_getter
+
+ def get_fitted(self, safe_functions_enabled=False):
+ """
+ Get paramete-aware model function and model information function.
+
+ Returns two functions:
+ model_function(name, attribute, param=parameter values) -> model value.
+ model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
+ """
+ if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+ return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
+
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ param_model = dict([[name, {}] for name in self.by_name.keys()])
+ paramfit = ParallelParamFit(self.by_param)
+
+ for name in self.by_name.keys():
+ for attribute in self.by_name[name]["attributes"]:
+ for param_index, param in enumerate(self.parameters):
+ if self.stats.depends_on_param(name, attribute, param):
+ paramfit.enqueue(name, attribute, param_index, param, False)
+ if arg_support_enabled and name in self._num_args:
+ for arg_index in range(self._num_args[name]):
+ if self.stats.depends_on_arg(name, attribute, arg_index):
+ paramfit.enqueue(
+ name,
+ attribute,
+ len(self.parameters) + arg_index,
+ arg_index,
+ False,
+ )
+
+ paramfit.fit()
+
+ for name in self.by_name.keys():
+ num_args = 0
+ if name in self._num_args:
+ num_args = self._num_args[name]
+ for attribute in self.by_name[name]["attributes"]:
+ fit_result = paramfit.get_result(name, attribute)
+
+ if (name, attribute) in self.function_override:
+ function_str = self.function_override[(name, attribute)]
+ x = AnalyticFunction(function_str, self.parameters, num_args)
+ x.fit(self.by_param, name, attribute)
+ if x.fit_success:
+ param_model[name][attribute] = {
+ "fit_result": fit_result,
+ "function": x,
+ }
+ elif len(fit_result.keys()):
+ x = analytic.function_powerset(
+ fit_result, self.parameters, num_args
+ )
+ x.fit(self.by_param, name, attribute)
+
+ if x.fit_success:
+ param_model[name][attribute] = {
+ "fit_result": fit_result,
+ "function": x,
+ }
+
+ def model_getter(name, key, **kwargs):
+ if "arg" in kwargs and "param" in kwargs:
+ kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
+ if key in param_model[name]:
+ param_list = kwargs["param"]
+ param_function = param_model[name][key]["function"]
+ if param_function.is_predictable(param_list):
+ return param_function.eval(param_list)
+ return static_model[name][key]
+
+ def info_getter(name, key):
+ if key in param_model[name]:
+ return param_model[name][key]
+ return None
+
+ self.cache["fitted_model_getter"] = model_getter
+ self.cache["fitted_info_getter"] = info_getter
+
+ return model_getter, info_getter
+
+ def assess(self, model_function):
+ """
+ Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
+
+ state/transition/... name and parameter values are fed into model_function.
+ The by_name entries of this AnalyticModel are used as ground truth and
+ compared with the values predicted by model_function.
+
+ For proper model assessments, the data used to generate model_function
+ and the data fed into this AnalyticModel instance must be mutually
+ exclusive (e.g. by performing cross validation). Otherwise,
+ overfitting cannot be detected.
+ """
+ detailed_results = {}
+ for name, elem in sorted(self.by_name.items()):
+ detailed_results[name] = {}
+ for attribute in elem["attributes"]:
+ predicted_data = np.array(
+ list(
+ map(
+ lambda i: model_function(
+ name, attribute, param=elem["param"][i]
+ ),
+ range(len(elem[attribute])),
+ )
+ )
+ )
+ measures = regression_measures(predicted_data, elem[attribute])
+ detailed_results[name][attribute] = measures
+
+ return {"by_name": detailed_results}
+
+ def to_json(self):
+ # TODO
+ pass
+
+
+class PTAModel:
+ u"""
+ Parameter-aware PTA-based energy model.
+
+ Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
+
+ The model heavily relies on two internal data structures:
+ PTAModel.by_name and PTAModel.by_param.
+
+ These provide measurements aggregated by state/transition name
+ and (for by_param) parameter values. Layout:
+ dictionary with one key per state/transition ('send', 'TX', ...) or
+ one key per state/transition and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+ For by_param, parameter values are ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - isa: 'state' or 'transition'
+ - power: list of mean power measurements in µW
+ - duration: list of durations in µs
+ - power_std: list of stddev of power per state/transition
+ - energy: consumed energy (power*duration) in pJ
+ - paramkeys: list of parameter names in each measurement (-> list of lists)
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ additionally, only if isa == 'transition':
+ - timeout: list of duration of previous state in µs
+ - rel_energy_prev: transition energy relative to previous state mean power in pJ
+ - rel_energy_next: transition energy relative to next state mean power in pJ
+ """
+
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count,
+ traces=[],
+ ignore_trace_indexes=[],
+ discard_outliers=None,
+ function_override={},
+ use_corrcoef=False,
+ pta=None,
+ ):
+ """
+ Prepare a new PTA energy model.
+
+ Actual model generation is done on-demand by calling the respective functions.
+
+ arguments:
+ by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate.
+ parameters -- list of parameter names, as returned by pta_trace_to_aggregate
+ arg_count -- function arguments, as returned by pta_trace_to_aggregate
+ traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data()
+ ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored.
+ discard_outliers -- currently not supported: threshold for outlier detection and removel (float).
+ Outlier detection is performed individually for each state/transition in each trace,
+ so it only works if the benchmark ran several times.
+ Given "data" (a set of measurements of the same thing, e.g. TX duration in the third benchmark trace),
+ "m" (the median of all attribute measurements with the same parameters, which may include data from other traces),
+ a data point X is considered an outlier if
+ | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers .
+ function_override -- dict of overrides for automatic parameter function generation.
+ If (state or transition name, model attribute) is present in function_override,
+ the corresponding text string is the function used for analytic (parameter-aware/fitted)
+ modeling of this attribute. It is passed to AnalyticFunction, see
+ there for the required format. Note that this happens regardless of
+ parameter dependency detection: The provided analytic function will be assigned
+ even if it seems like the model attribute is static / parameter-independent.
+ use_corrcoef -- use correlation coefficient instead of stddev comparison
+ to detect whether a model attribute depends on a parameter
+ pta -- hardware model as `PTA` object
+ """
+ self.by_name = by_name
+ self.by_param = by_name_to_by_param(by_name)
+ self._parameter_names = sorted(parameters)
+ self._num_args = arg_count
+ self._use_corrcoef = use_corrcoef
+ self.traces = traces
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self._parameter_names,
+ self._num_args,
+ self._use_corrcoef,
+ )
+ self.cache = {}
+ np.seterr("raise")
+ self._outlier_threshold = discard_outliers
+ self.function_override = function_override.copy()
+ self.pta = pta
+ self.ignore_trace_indexes = ignore_trace_indexes
+ self._aggregate_to_ndarray(self.by_name)
+
+ def _aggregate_to_ndarray(self, aggregate):
+ for elem in aggregate.values():
+ for key in elem["attributes"]:
+ elem[key] = np.array(elem[key])
+
+ # This heuristic is very similar to the "function is not much better than
+ # median" checks in get_fitted. So far, doing it here as well is mostly
+ # a performance and not an algorithm quality decision.
+ # --df, 2018-04-18
+ def depends_on_param(self, state_or_trans, key, param):
+ return self.stats.depends_on_param(state_or_trans, key, param)
+
+ # See notes on depends_on_param
+ def depends_on_arg(self, state_or_trans, key, param):
+ return self.stats.depends_on_arg(state_or_trans, key, param)
+
+ def _get_model_from_dict(self, model_dict, model_function):
+ model = {}
+ for name, elem in model_dict.items():
+ model[name] = {}
+ for key in elem["attributes"]:
+ try:
+ model[name][key] = model_function(elem[key])
+ except RuntimeWarning:
+ logger.warning("Got no data for {} {}".format(name, key))
+ except FloatingPointError as fpe:
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
+ return model
+
+ def get_static(self, use_mean=False):
+ """
+ Get static model function: name, attribute -> model value.
+
+ Uses the median of by_name for modeling, unless `use_mean` is set.
+ """
+ getter_function = np.median
+
+ if use_mean:
+ getter_function = np.mean
+
+ static_model = self._get_model_from_dict(self.by_name, getter_function)
+
+ def static_model_getter(name, key, **kwargs):
+ return static_model[name][key]
+
+ return static_model_getter
+
+ def get_param_lut(self, fallback=False):
+ """
+ Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
+
+ The function can only give model values for parameter combinations
+ present in by_param. By default, it raises KeyError for other values.
+
+ arguments:
+ fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
+ """
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ lut_model = self._get_model_from_dict(self.by_param, np.median)
+
+ def lut_median_getter(name, key, param, arg=[], **kwargs):
+ param.extend(map(soft_cast_int, arg))
+ try:
+ return lut_model[(name, tuple(param))][key]
+ except KeyError:
+ if fallback:
+ return static_model[name][key]
+ raise
+
+ return lut_median_getter
+
+ def param_index(self, param_name):
+ if param_name in self._parameter_names:
+ return self._parameter_names.index(param_name)
+ return len(self._parameter_names) + int(param_name)
+
+ def param_name(self, param_index):
+ if param_index < len(self._parameter_names):
+ return self._parameter_names[param_index]
+ return str(param_index)
+
+ def get_fitted(self, safe_functions_enabled=False):
+ """
+ Get parameter-aware model function and model information function.
+
+ Returns two functions:
+ model_function(name, attribute, param=parameter values) -> model value.
+ model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
+ """
+ if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+ return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
+
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ param_model = dict(
+ [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
+ )
+ paramfit = ParallelParamFit(self.by_param)
+ for state_or_tran in self.by_name.keys():
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
+ fit_results = {}
+ for parameter_index, parameter_name in enumerate(self._parameter_names):
+ if self.depends_on_param(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ parameter_index,
+ parameter_name,
+ safe_functions_enabled,
+ )
+ for (
+ codependent_param_dict
+ ) in self.stats.codependent_parameter_value_dicts(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ parameter_index,
+ parameter_name,
+ safe_functions_enabled,
+ codependent_param_dict,
+ )
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
+ for arg_index in range(self._num_args[state_or_tran]):
+ if self.depends_on_arg(
+ state_or_tran, model_attribute, arg_index
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ len(self._parameter_names) + arg_index,
+ arg_index,
+ safe_functions_enabled,
+ )
+ paramfit.fit()
+
+ for state_or_tran in self.by_name.keys():
+ num_args = 0
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
+ num_args = self._num_args[state_or_tran]
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
+ fit_results = paramfit.get_result(state_or_tran, model_attribute)
+
+ for parameter_name in self._parameter_names:
+ if self.depends_on_param(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ for (
+ codependent_param_dict
+ ) in self.stats.codependent_parameter_value_dicts(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ pass
+ # FIXME paramfit.get_result hat ja gar keinen Parameter als Argument...
+
+ if (state_or_tran, model_attribute) in self.function_override:
+ function_str = self.function_override[
+ (state_or_tran, model_attribute)
+ ]
+ x = AnalyticFunction(function_str, self._parameter_names, num_args)
+ x.fit(self.by_param, state_or_tran, model_attribute)
+ if x.fit_success:
+ param_model[state_or_tran][model_attribute] = {
+ "fit_result": fit_results,
+ "function": x,
+ }
+ elif len(fit_results.keys()):
+ x = analytic.function_powerset(
+ fit_results, self._parameter_names, num_args
+ )
+ x.fit(self.by_param, state_or_tran, model_attribute)
+ if x.fit_success:
+ param_model[state_or_tran][model_attribute] = {
+ "fit_result": fit_results,
+ "function": x,
+ }
+
+ def model_getter(name, key, **kwargs):
+ if "arg" in kwargs and "param" in kwargs:
+ kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
+ if key in param_model[name]:
+ param_list = kwargs["param"]
+ param_function = param_model[name][key]["function"]
+ if param_function.is_predictable(param_list):
+ return param_function.eval(param_list)
+ return static_model[name][key]
+
+ def info_getter(name, key):
+ if key in param_model[name]:
+ return param_model[name][key]
+ return None
+
+ self.cache["fitted_model_getter"] = model_getter
+ self.cache["fitted_info_getter"] = info_getter
+
+ return model_getter, info_getter
+
+ def to_json(self):
+ static_model = self.get_static()
+ static_quality = self.assess(static_model)
+ param_model, param_info = self.get_fitted()
+ analytic_quality = self.assess(param_model)
+ self.pta.update(
+ static_model,
+ param_info,
+ static_error=static_quality["by_name"],
+ analytic_error=analytic_quality["by_name"],
+ )
+ return self.pta.to_json()
+
+ def states(self):
+ """Return sorted list of state names."""
+ return sorted(
+ list(
+ filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
+ )
+ )
+
+ def transitions(self):
+ """Return sorted list of transition names."""
+ return sorted(
+ list(
+ filter(
+ lambda k: self.by_name[k]["isa"] == "transition",
+ self.by_name.keys(),
+ )
+ )
+ )
+
+ def states_and_transitions(self):
+ """Return list of states and transition names."""
+ ret = self.states()
+ ret.extend(self.transitions())
+ return ret
+
+ def parameters(self):
+ return self._parameter_names
+
+ def attributes(self, state_or_trans):
+ return self.by_name[state_or_trans]["attributes"]
+
+ def assess(self, model_function):
+ """
+ Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
+
+ state/transition/... name and parameter values are fed into model_function.
+ The by_name entries of this PTAModel are used as ground truth and
+ compared with the values predicted by model_function.
+
+ For proper model assessments, the data used to generate model_function
+ and the data fed into this AnalyticModel instance must be mutually
+ exclusive (e.g. by performing cross validation). Otherwise,
+ overfitting cannot be detected.
+ """
+ detailed_results = {}
+ for name, elem in sorted(self.by_name.items()):
+ detailed_results[name] = {}
+ for key in elem["attributes"]:
+ predicted_data = np.array(
+ list(
+ map(
+ lambda i: model_function(name, key, param=elem["param"][i]),
+ range(len(elem[key])),
+ )
+ )
+ )
+ measures = regression_measures(predicted_data, elem[key])
+ detailed_results[name][key] = measures
+
+ return {"by_name": detailed_results}
+
+ def assess_states(
+ self, model_function, model_attribute="power", distribution: dict = None
+ ):
+ """
+ Calculate overall model error assuming equal distribution of states
+ """
+ # TODO calculate mean power draw for distribution and use it to
+ # calculate relative error from MAE combination
+ model_quality = self.assess(model_function)
+ num_states = len(self.states())
+ if distribution is None:
+ distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
+
+ if not np.isclose(sum(distribution.values()), 1):
+ raise ValueError(
+ "distribution must be a probability distribution with sum 1"
+ )
+
+ # total_value = None
+ # try:
+ # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states()))
+ # except KeyError:
+ # pass
+
+ total_error = np.sqrt(
+ sum(
+ map(
+ lambda x: np.square(
+ model_quality["by_name"][x][model_attribute]["mae"]
+ * distribution[x]
+ ),
+ self.states(),
+ )
+ )
+ )
+ return total_error
+
+ def assess_on_traces(self, model_function):
+ """
+ Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance.
+
+ :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`.
+ Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the
+ traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states
+ """
+ model_energy_list = []
+ real_energy_list = []
+ model_rel_energy_list = []
+ model_state_energy_list = []
+ model_duration_list = []
+ real_duration_list = []
+ model_timeout_list = []
+ real_timeout_list = []
+
+ for trace in self.traces:
+ if trace["id"] not in self.ignore_trace_indexes:
+ for rep_id in range(len(trace["trace"][0]["offline"])):
+ model_energy = 0.0
+ real_energy = 0.0
+ model_rel_energy = 0.0
+ model_state_energy = 0.0
+ model_duration = 0.0
+ real_duration = 0.0
+ model_timeout = 0.0
+ real_timeout = 0.0
+ for i, trace_part in enumerate(trace["trace"]):
+ name = trace_part["name"]
+ prev_name = trace["trace"][i - 1]["name"]
+ isa = trace_part["isa"]
+ if name != "UNINITIALIZED":
+ try:
+ param = trace_part["offline_aggregates"]["param"][
+ rep_id
+ ]
+ prev_param = trace["trace"][i - 1][
+ "offline_aggregates"
+ ]["param"][rep_id]
+ power = trace_part["offline"][rep_id]["uW_mean"]
+ duration = trace_part["offline"][rep_id]["us"]
+ prev_duration = trace["trace"][i - 1]["offline"][
+ rep_id
+ ]["us"]
+ real_energy += power * duration
+ if isa == "state":
+ model_energy += (
+ model_function(name, "power", param=param)
+ * duration
+ )
+ else:
+ model_energy += model_function(
+ name, "energy", param=param
+ )
+ # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
+ if i == 1:
+ model_rel_energy += model_function(
+ name, "energy", param=param
+ )
+ else:
+ model_rel_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_state_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_rel_energy += model_function(
+ name, "rel_energy_prev", param=param
+ )
+ real_duration += duration
+ model_duration += model_function(
+ name, "duration", param=param
+ )
+ if (
+ "plan" in trace_part
+ and trace_part["plan"]["level"] == "epilogue"
+ ):
+ real_timeout += trace_part["offline"][rep_id][
+ "timeout"
+ ]
+ model_timeout += model_function(
+ name, "timeout", param=param
+ )
+ except KeyError:
+ # if states/transitions have been removed via --filter-param, this is harmless
+ pass
+ real_energy_list.append(real_energy)
+ model_energy_list.append(model_energy)
+ model_rel_energy_list.append(model_rel_energy)
+ model_state_energy_list.append(model_state_energy)
+ real_duration_list.append(real_duration)
+ model_duration_list.append(model_duration)
+ real_timeout_list.append(real_timeout)
+ model_timeout_list.append(model_timeout)
+
+ return {
+ "duration_by_trace": regression_measures(
+ np.array(model_duration_list), np.array(real_duration_list)
+ ),
+ "energy_by_trace": regression_measures(
+ np.array(model_energy_list), np.array(real_energy_list)
+ ),
+ "timeout_by_trace": regression_measures(
+ np.array(model_timeout_list), np.array(real_timeout_list)
+ ),
+ "rel_energy_by_trace": regression_measures(
+ np.array(model_rel_energy_list), np.array(real_energy_list)
+ ),
+ "state_energy_by_trace": regression_measures(
+ np.array(model_state_energy_list), np.array(real_energy_list)
+ ),
+ }
diff --git a/test/test_parameters.py b/test/test_parameters.py
index 22efccf..e36b1a1 100755
--- a/test/test_parameters.py
+++ b/test/test_parameters.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
-from dfatool import dfatool as dt
from dfatool import parameters
from dfatool.utils import by_name_to_by_param
from dfatool.functions import analytic
+from dfatool.model import ParallelParamFit
import unittest
import numpy as np
@@ -52,7 +52,7 @@ class TestModels(unittest.TestCase):
# Fit individual functions for each parameter (only "p_linear" in this case)
- paramfit = dt.ParallelParamFit(by_param)
+ paramfit = ParallelParamFit(by_param)
paramfit.enqueue("TX", "power", 1, "p_linear")
paramfit.fit()
@@ -133,7 +133,7 @@ class TestModels(unittest.TestCase):
self.assertEqual(stats.depends_on_param("someKey", "ll", "log_inv"), True)
self.assertEqual(stats.depends_on_param("someKey", "ll", "square_none"), False)
- paramfit = dt.ParallelParamFit(by_param)
+ paramfit = ParallelParamFit(by_param)
paramfit.enqueue("someKey", "lls", 0, "lin_lin")
paramfit.enqueue("someKey", "lls", 1, "log_inv")
paramfit.enqueue("someKey", "lls", 2, "square_none")
diff --git a/test/test_ptamodel.py b/test/test_ptamodel.py
index 9abe3c0..e153280 100755
--- a/test/test_ptamodel.py
+++ b/test/test_ptamodel.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
-from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate
+from dfatool.dfatool import RawData, pta_trace_to_aggregate
+from dfatool.model import PTAModel
import os
import unittest
import pytest
diff --git a/test/test_timingharness.py b/test/test_timingharness.py
index 61cdf4b..cc4b766 100755
--- a/test/test_timingharness.py
+++ b/test/test_timingharness.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
-from dfatool.dfatool import AnalyticModel, TimingData, pta_trace_to_aggregate
+from dfatool.dfatool import TimingData, pta_trace_to_aggregate
+from dfatool.model import AnalyticModel
from dfatool.parameters import prune_dependent_parameters
import unittest