summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
author jfalkenhagen <jfalkenhagen@uos.de> 2020-07-16 16:39:19 +0200
committer jfalkenhagen <jfalkenhagen@uos.de> 2020-07-16 16:39:19 +0200
commit 98d23807e35cc211415c7e0c887f1b1b502f10e5 (patch)
tree ebb649c585166e546dda704990ed4c5eeb95519f /lib
parent a00ffc0e32ddc72a8faceec4344432cdbf3b90c7 (diff)
parent af4cc108b5c5132a991a2b83d258ed55e985936f (diff)
Merge branch 'master' into janis
Diffstat (limited to 'lib')
-rwxr-xr-x lib/automata.py 17
-rw-r--r-- lib/data_parameters.py 17
-rw-r--r-- lib/functions.py 91
-rw-r--r-- lib/harness.py 4
-rwxr-xr-x lib/keysightdlog.py 164
-rw-r--r-- lib/lex.py 9
-rw-r--r-- lib/loader.py (renamed from lib/dfatool.py) 1603
-rw-r--r-- lib/model.py 1156
-rw-r--r-- lib/parameters.py 252
-rwxr-xr-x lib/protocol_benchmarks.py 7
-rw-r--r-- lib/runner.py 50
-rw-r--r-- lib/utils.py 14
-rw-r--r-- lib/validation.py 238
13 files changed, 1616 insertions, 2006 deletions
diff --git a/lib/automata.py b/lib/automata.py
index b3318e0..ebe1871 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -3,11 +3,14 @@
from .functions import AnalyticFunction, NormalizationFunction
from .utils import is_numeric
import itertools
+import logging
import numpy as np
import json
import queue
import yaml
+logger = logging.getLogger(__name__)
+
def _dict_to_list(input_dict: dict) -> list:
return [input_dict[x] for x in sorted(input_dict.keys())]
@@ -100,7 +103,7 @@ class PTAAttribute:
def __repr__(self):
if self.function is not None:
return "PTAATtribute<{:.0f}, {}>".format(
- self.value, self.function._model_str
+ self.value, self.function.model_function
)
return "PTAATtribute<{:.0f}, None>".format(self.value)
@@ -134,8 +137,8 @@ class PTAAttribute:
}
if self.function:
ret["function"] = {
- "raw": self.function._model_str,
- "regression_args": list(self.function._regression_args),
+ "raw": self.function.model_function,
+ "regression_args": list(self.function.model_args),
}
ret["function_error"] = self.function_error
return ret
@@ -1305,8 +1308,8 @@ class PTA:
"power"
]
except KeyError:
- print(
- "[W] skipping model update of state {} due to missing data".format(
+ logger.warning(
+ "skipping model update of state {} due to missing data".format(
state.name
)
)
@@ -1353,8 +1356,8 @@ class PTA:
"timeout"
]
except KeyError:
- print(
- "[W] skipping model update of transition {} due to missing data".format(
+ logger.warning(
+ "skipping model update of transition {} due to missing data".format(
transition.name
)
)
diff --git a/lib/data_parameters.py b/lib/data_parameters.py
index 1150b71..84eacfd 100644
--- a/lib/data_parameters.py
+++ b/lib/data_parameters.py
@@ -7,9 +7,12 @@ length of lists, ane more.
from .protocol_benchmarks import codegen_for_lib
from . import cycles_to_energy, size_to_radio_energy, utils
+import logging
import numpy as np
import ubjson
+logger = logging.getLogger(__name__)
+
def _string_value_length(json):
if type(json) == str:
@@ -224,7 +227,7 @@ class Protolog:
except KeyError:
pass
except TypeError as e:
- print(
+ logger.error(
"TypeError in {} {} {} {}: {} -> {}".format(
arch_lib,
benchmark,
@@ -395,7 +398,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_enc is NaN for {} -> {} -> {}".format(
arch, lib, key
)
@@ -410,7 +413,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_ser is NaN for {} -> {} -> {}".format(
arch, lib, key
)
@@ -425,7 +428,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_encser is NaN for {} -> {} -> {}".format(
arch, lib, key
)
@@ -440,7 +443,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_des is NaN for {} -> {} -> {}".format(
arch, lib, key
)
@@ -455,7 +458,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_dec is NaN for {} -> {} -> {}".format(
arch, lib, key
)
@@ -470,7 +473,7 @@ class Protolog:
except KeyError:
pass
except ValueError:
- print(
+ logger.warning(
"cycles_desdec is NaN for {} -> {} -> {}".format(
arch, lib, key
)
diff --git a/lib/functions.py b/lib/functions.py
index 6d8daa4..94b1aaf 100644
--- a/lib/functions.py
+++ b/lib/functions.py
@@ -5,12 +5,14 @@ This module provides classes and helper functions useful for least-squares
regression and general handling of model functions.
"""
from itertools import chain, combinations
+import logging
import numpy as np
import re
from scipy import optimize
-from .utils import is_numeric, vprint
+from .utils import is_numeric
arg_support_enabled = True
+logger = logging.getLogger(__name__)
def powerset(iterable):
@@ -23,6 +25,47 @@ def powerset(iterable):
return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
+def gplearn_to_function(function_str: str):
+ """
+ Convert gplearn-style function string to Python function.
+
+ Takes a function string like "mul(add(X0, X1), X2)" and returns
+ a Python function implementing the specified behaviour,
+ e.g. "lambda x, y, z: (x + y) * z".
+
+ The returned lambda takes X0 .. Xn as positional arguments, where n is
+ the highest argument index (searched in 0 .. 99) found in function_str.
+ Any further positional arguments are accepted and ignored.
+
+ Supported functions:
+ add -- x + y
+ sub -- x - y
+ mul -- x * y
+ div -- x / y if |y| > 0.001, otherwise 1
+ sqrt -- sqrt(|x|)
+ log -- log(|x|) if |x| > 0.001, otherwise 0
+ inv -- 1 / x if |x| > 0.001, otherwise 0
+
+ NOTE(review): function_str is handed to eval(); presumably it only ever
+ originates from trusted model data -- confirm against callers.
+ """
+ eval_globals = {
+ "add": lambda x, y: x + y,
+ "sub": lambda x, y: x - y,
+ "mul": lambda x, y: x * y,
+ "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0,
+ "sqrt": lambda x: np.sqrt(np.abs(x)),
+ "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0,
+ "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0,
+ }
+
+ # Determine the highest argument index referenced in function_str.
+ # Only X0 .. X99 are looked for; stays 0 if no argument is found.
+ last_arg_index = 0
+ for i in range(0, 100):
+ if function_str.find("X{:d}".format(i)) >= 0:
+ last_arg_index = i
+
+ # Declare every argument X0 .. X<last_arg_index>, even those not used.
+ arg_list = []
+ for i in range(0, last_arg_index + 1):
+ arg_list.append("X{:d}".format(i))
+
+ # "*whatever" swallows surplus positional arguments passed by the caller.
+ eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str)
+ logger.debug(eval_str)
+ return eval(eval_str, eval_globals)
+
+
class ParamFunction:
"""
A one-dimensional model function, ready for least squares optimization and similar.
@@ -118,9 +161,7 @@ class AnalyticFunction:
packet length.
"""
- def __init__(
- self, function_str, parameters, num_args, verbose=True, regression_args=None
- ):
+ def __init__(self, function_str, parameters, num_args, regression_args=None):
"""
Create a new AnalyticFunction object from a function string.
@@ -135,18 +176,16 @@ class AnalyticFunction:
:param num_args: number of local function arguments, if any. Set to 0 if
the model attribute does not belong to a function or if function
arguments are not included in the model.
- :param verbose: complain about odd events
:param regression_args: Initial regression variable values,
both for function usage and least squares optimization.
If unset, defaults to [1, 1, 1, ...]
"""
self._parameter_names = parameters
self._num_args = num_args
- self._model_str = function_str
+ self.model_function = function_str
rawfunction = function_str
self._dependson = [False] * (len(parameters) + num_args)
self.fit_success = False
- self.verbose = verbose
if type(function_str) == str:
num_vars_re = re.compile(r"regression_arg\(([0-9]+)\)")
@@ -176,12 +215,12 @@ class AnalyticFunction:
self._function = function_str
if regression_args:
- self._regression_args = regression_args.copy()
+ self.model_args = regression_args.copy()
self._fit_success = True
elif type(function_str) == str:
- self._regression_args = list(np.ones((num_vars)))
+ self.model_args = list(np.ones((num_vars)))
else:
- self._regression_args = []
+ self.model_args = []
def get_fit_data(self, by_param, state_or_tran, model_attribute):
"""
@@ -231,9 +270,8 @@ class AnalyticFunction:
else:
X[i].extend([np.nan] * len(val[model_attribute]))
elif key[0] == state_or_tran and len(key[1]) != dimension:
- vprint(
- self.verbose,
- "[W] Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.".format(
+ logger.warning(
+ "Invalid parameter key length while gathering fit data for {}/{}. is {}, want {}.".format(
state_or_tran, model_attribute, len(key[1]), dimension
),
)
@@ -263,30 +301,27 @@ class AnalyticFunction:
error_function = lambda P, X, y: self._function(P, X) - y
try:
res = optimize.least_squares(
- error_function, self._regression_args, args=(X, Y), xtol=2e-15
+ error_function, self.model_args, args=(X, Y), xtol=2e-15
)
except ValueError as err:
- vprint(
- self.verbose,
- "[W] Fit failed for {}/{}: {} (function: {})".format(
- state_or_tran, model_attribute, err, self._model_str
+ logger.warning(
+ "Fit failed for {}/{}: {} (function: {})".format(
+ state_or_tran, model_attribute, err, self.model_function
),
)
return
if res.status > 0:
- self._regression_args = res.x
+ self.model_args = res.x
self.fit_success = True
else:
- vprint(
- self.verbose,
- "[W] Fit failed for {}/{}: {} (function: {})".format(
- state_or_tran, model_attribute, res.message, self._model_str
+ logger.warning(
+ "Fit failed for {}/{}: {} (function: {})".format(
+ state_or_tran, model_attribute, res.message, self.model_function
),
)
else:
- vprint(
- self.verbose,
- "[W] Insufficient amount of valid parameter keys, cannot fit {}/{}".format(
+ logger.warning(
+ "Insufficient amount of valid parameter keys, cannot fit {}/{}".format(
state_or_tran, model_attribute
),
)
@@ -314,9 +349,9 @@ class AnalyticFunction:
corresponds to lexically first parameter, etc.
:param arg_list: argument values (list of float), if arguments are used.
"""
- if len(self._regression_args) == 0:
+ if len(self.model_args) == 0:
return self._function(param_list, arg_list)
- return self._function(self._regression_args, param_list)
+ return self._function(self.model_args, param_list)
class analytic:
diff --git a/lib/harness.py b/lib/harness.py
index 3b279c0..ae9c28c 100644
--- a/lib/harness.py
+++ b/lib/harness.py
@@ -21,7 +21,7 @@ class TransitionHarness:
* `name`: state or transition name
* `parameter`: currently valid parameter values. If normalization is used, they are already normalized. Each parameter value is either a primitive
int/float/str value (-> constant for each iteration) or a list of
- primitive values (-> set by the return value of the current run, not necessarily constan)
+ primitive values (-> set by the return value of the current run, not necessarily constant)
* `args`: function arguments, if isa == 'transition'
"""
@@ -229,6 +229,7 @@ class TransitionHarness:
log_data_target["parameter"][parameter_name] = list()
log_data_target["parameter"][parameter_name].append(parameter_value)
+ # Here Be Dragons
def parser_cb(self, line):
# print('[HARNESS] got line {}'.format(line))
if re.match(r"\[PTA\] benchmark stop", line):
@@ -440,6 +441,7 @@ class OnboardTimerHarness(TransitionHarness):
log_data_target["parameter"][parameter_name] = list()
log_data_target["parameter"][parameter_name].append(parameter_value)
+ # Here Be Dragons
def parser_cb(self, line):
# print('[HARNESS] got line {}'.format(line))
res = re.match(r"\[PTA\] nop=(\S+)/(\S+)", line)
diff --git a/lib/keysightdlog.py b/lib/keysightdlog.py
deleted file mode 100755
index 89264b9..0000000
--- a/lib/keysightdlog.py
+++ /dev/null
@@ -1,164 +0,0 @@
-#!/usr/bin/env python3
-
-import lzma
-import matplotlib.pyplot as plt
-import numpy as np
-import os
-import struct
-import sys
-import xml.etree.ElementTree as ET
-
-
-def plot_y(Y, **kwargs):
- plot_xy(np.arange(len(Y)), Y, **kwargs)
-
-
-def plot_xy(X, Y, xlabel=None, ylabel=None, title=None, output=None):
- fig, ax1 = plt.subplots(figsize=(10, 6))
- if title != None:
- fig.canvas.set_window_title(title)
- if xlabel != None:
- ax1.set_xlabel(xlabel)
- if ylabel != None:
- ax1.set_ylabel(ylabel)
- plt.subplots_adjust(left=0.1, bottom=0.1, right=0.99, top=0.99)
- plt.plot(X, Y, "bo", markersize=2)
- if output:
- plt.savefig(output)
- with open("{}.txt".format(output), "w") as f:
- print("X Y", file=f)
- for i in range(len(X)):
- print("{} {}".format(X[i], Y[i]), file=f)
- else:
- plt.show()
-
-
-filename = sys.argv[1]
-
-with open(filename, "rb") as logfile:
- lines = []
- line = ""
-
- if ".xz" in filename:
- f = lzma.open(logfile)
- else:
- f = logfile
-
- while line != "</dlog>\n":
- line = f.readline().decode()
- lines.append(line)
- xml_header = "".join(lines)
- raw_header = f.read(8)
- data_offset = f.tell()
- raw_data = f.read()
-
- xml_header = xml_header.replace("1ua>", "X1ua>")
- xml_header = xml_header.replace("2ua>", "X2ua>")
- dlog = ET.fromstring(xml_header)
- channels = []
- for channel in dlog.findall("channel"):
- channel_id = int(channel.get("id"))
- sense_curr = channel.find("sense_curr").text
- sense_volt = channel.find("sense_volt").text
- model = channel.find("ident").find("model").text
- if sense_volt == "1":
- channels.append((channel_id, model, "V"))
- if sense_curr == "1":
- channels.append((channel_id, model, "A"))
-
- num_channels = len(channels)
- duration = int(dlog.find("frame").find("time").text)
- interval = float(dlog.find("frame").find("tint").text)
- real_duration = interval * int(len(raw_data) / (4 * num_channels))
-
- data = np.ndarray(
- shape=(num_channels, int(len(raw_data) / (4 * num_channels))), dtype=np.float32
- )
-
- iterator = struct.iter_unpack(">f", raw_data)
- channel_offset = 0
- measurement_offset = 0
- for value in iterator:
- data[channel_offset, measurement_offset] = value[0]
- if channel_offset + 1 == num_channels:
- channel_offset = 0
- measurement_offset += 1
- else:
- channel_offset += 1
-
-if int(real_duration) != duration:
- print(
- "Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format(
- real_duration, duration, interval * 1000000
- )
- )
-else:
- print(
- "Measurement duration: {:d} seconds at {:f} µs per sample".format(
- duration, interval * 1000000
- )
- )
-
-for i, channel in enumerate(channels):
- channel_id, channel_model, channel_type = channel
- print(
- "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} {:s}".format(
- channel_id,
- channel_model,
- np.min(data[i]),
- np.max(data[i]),
- np.mean(data[i]),
- channel_type,
- )
- )
-
- if (
- i > 0
- and channel_type == "A"
- and channels[i - 1][2] == "V"
- and channel_id == channels[i - 1][0]
- ):
- power = data[i - 1] * data[i]
- power = 3.6 * data[i]
- print(
- "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} W".format(
- channel_id, channel_model, np.min(power), np.max(power), np.mean(power)
- )
- )
- min_power = np.min(power)
- max_power = np.max(power)
- power_border = np.mean([min_power, max_power])
- low_power = power[power < power_border]
- high_power = power[power >= power_border]
- plot_y(power)
- print(
- " avg low / high power (delta): {:f} / {:f} ({:f}) W".format(
- np.mean(low_power),
- np.mean(high_power),
- np.mean(high_power) - np.mean(low_power),
- )
- )
- # plot_y(low_power)
- # plot_y(high_power)
- high_power_durations = []
- current_high_power_duration = 0
- for is_hpe in power >= power_border:
- if is_hpe:
- current_high_power_duration += interval
- else:
- if current_high_power_duration > 0:
- high_power_durations.append(current_high_power_duration)
- current_high_power_duration = 0
- print(
- " avg high-power duration: {:f} µs".format(
- np.mean(high_power_durations) * 1000000
- )
- )
-
-# print(xml_header)
-# print(raw_header)
-# print(channels)
-# print(data)
-# print(np.mean(data[0]))
-# print(np.mean(data[1]))
-# print(np.mean(data[0] * data[1]))
diff --git a/lib/lex.py b/lib/lex.py
index 7bb3760..f698e8c 100644
--- a/lib/lex.py
+++ b/lib/lex.py
@@ -1,4 +1,7 @@
from .sly import Lexer, Parser
+import logging
+
+logger = logging.getLogger(__name__)
class TimedWordLexer(Lexer):
@@ -38,7 +41,7 @@ class TimedSequenceLexer(Lexer):
FUNCTIONSEP = r";"
def error(self, t):
- print("Illegal character '%s'" % t.value[0])
+ logger.error("Illegal character '%s'" % t.value[0])
if t.value[0] == "{" and t.value.find("}"):
self.index += 1 + t.value.find("}")
else:
@@ -153,11 +156,11 @@ class TimedSequenceParser(Parser):
def error(self, p):
if p:
- print("Syntax error at token", p.type)
+ # logging formats lazily via "msg % args", so p.type needs a placeholder.
+ logger.error("Syntax error at token %s", p.type)
# Just discard the token and tell the parser it's okay.
self.errok()
else:
- print("Syntax error at EOF")
+ logger.error("Syntax error at EOF")
class TimedWord:
diff --git a/lib/dfatool.py b/lib/loader.py
index 63639d3..4e07c92 100644
--- a/lib/dfatool.py
+++ b/lib/loader.py
@@ -3,26 +3,17 @@
import csv
import io
import json
+import logging
import numpy as np
import os
import re
-from scipy import optimize
-from sklearn.metrics import r2_score
import struct
import tarfile
import hashlib
from multiprocessing import Pool
-from .functions import analytic
-from .functions import AnalyticFunction
-from .parameters import ParamStats
-from .utils import (
- vprint,
- is_numeric,
- soft_cast_int,
- param_slice_eq,
- remove_index_from_tuple,
-)
-from .utils import by_name_to_by_param, match_parameter_values, running_mean
+from .utils import running_mean, soft_cast_int
+
+logger = logging.getLogger(__name__)
try:
from .pubcode import Code128
@@ -36,135 +27,6 @@ except ImportError:
arg_support_enabled = True
-def gplearn_to_function(function_str: str):
- """
- Convert gplearn-style function string to Python function.
-
- Takes a function string like "mul(add(X0, X1), X2)" and returns
- a Python function implementing the specified behaviour,
- e.g. "lambda x, y, z: (x + y) * z".
-
- Supported functions:
- add -- x + y
- sub -- x - y
- mul -- x * y
- div -- x / y if |y| > 0.001, otherwise 1
- sqrt -- sqrt(|x|)
- log -- log(|x|) if |x| > 0.001, otherwise 0
- inv -- 1 / x if |x| > 0.001, otherwise 0
- """
- eval_globals = {
- "add": lambda x, y: x + y,
- "sub": lambda x, y: x - y,
- "mul": lambda x, y: x * y,
- "div": lambda x, y: np.divide(x, y) if np.abs(y) > 0.001 else 1.0,
- "sqrt": lambda x: np.sqrt(np.abs(x)),
- "log": lambda x: np.log(np.abs(x)) if np.abs(x) > 0.001 else 0.0,
- "inv": lambda x: 1.0 / x if np.abs(x) > 0.001 else 0.0,
- }
-
- last_arg_index = 0
- for i in range(0, 100):
- if function_str.find("X{:d}".format(i)) >= 0:
- last_arg_index = i
-
- arg_list = []
- for i in range(0, last_arg_index + 1):
- arg_list.append("X{:d}".format(i))
-
- eval_str = "lambda {}, *whatever: {}".format(",".join(arg_list), function_str)
- print(eval_str)
- return eval(eval_str, eval_globals)
-
-
-def append_if_set(aggregate: dict, data: dict, key: str):
- """Append data[key] to aggregate if key in data."""
- if key in data:
- aggregate.append(data[key])
-
-
-def mean_or_none(arr):
- """
- Compute mean of NumPy array `arr`, return -1 if empty.
-
- :param arr: 1-Dimensional NumPy array
- """
- if len(arr):
- return np.mean(arr)
- return -1
-
-
-def aggregate_measures(aggregate: float, actual: list) -> dict:
- """
- Calculate error measures for model value on data list.
-
- arguments:
- aggregate -- model value (float or int)
- actual -- real-world / reference values (list of float or int)
-
- return value:
- See regression_measures
- """
- aggregate_array = np.array([aggregate] * len(actual))
- return regression_measures(aggregate_array, np.array(actual))
-
-
-def regression_measures(predicted: np.ndarray, actual: np.ndarray):
- """
- Calculate error measures by comparing model values to reference values.
-
- arguments:
- predicted -- model values (np.ndarray)
- actual -- real-world / reference values (np.ndarray)
-
- Returns a dict containing the following measures:
- mae -- Mean Absolute Error
- mape -- Mean Absolute Percentage Error,
- if all items in actual are non-zero (NaN otherwise)
- smape -- Symmetric Mean Absolute Percentage Error,
- if no 0,0-pairs are present in actual and predicted (NaN otherwise)
- msd -- Mean Square Deviation
- rmsd -- Root Mean Square Deviation
- ssr -- Sum of Squared Residuals
- rsq -- R^2 measure, see sklearn.metrics.r2_score
- count -- Number of values
- """
- if type(predicted) != np.ndarray:
- raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
- if type(actual) != np.ndarray:
- raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
- deviations = predicted - actual
- # mean = np.mean(actual)
- if len(deviations) == 0:
- return {}
- measures = {
- "mae": np.mean(np.abs(deviations), dtype=np.float64),
- "msd": np.mean(deviations ** 2, dtype=np.float64),
- "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
- "ssr": np.sum(deviations ** 2, dtype=np.float64),
- "rsq": r2_score(actual, predicted),
- "count": len(actual),
- }
-
- # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
-
- if np.all(actual != 0):
- measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
- else:
- measures["mape"] = np.nan
- if np.all(np.abs(predicted) + np.abs(actual) != 0):
- measures["smape"] = (
- np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
- * 100
- )
- else:
- measures["smape"] = np.nan
- # if np.all(rsq_quotient != 0):
- # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
-
- return measures
-
-
class KeysightCSV:
"""Simple loader for Keysight CSV data, as exported by the windows software."""
@@ -194,162 +56,6 @@ class KeysightCSV:
return timestamps, currents
-def _xv_partitions_kfold(length, num_slices):
- pairs = []
- indexes = np.arange(length)
- for i in range(0, num_slices):
- training = np.delete(indexes, slice(i, None, num_slices))
- validation = indexes[i::num_slices]
- pairs.append((training, validation))
- return pairs
-
-
-def _xv_partition_montecarlo(length):
- shuffled = np.random.permutation(np.arange(length))
- border = int(length * float(2) / 3)
- training = shuffled[:border]
- validation = shuffled[border:]
- return (training, validation)
-
-
-class CrossValidator:
- """
- Cross-Validation helper for model generation.
-
- Given a set of measurements and a model class, it will partition the
- data into training and validation sets, train the model on the training
- set, and assess its quality on the validation set. This is repeated
- several times depending on cross-validation algorithm and configuration.
- Reports the mean model error over all cross-validation runs.
- """
-
- def __init__(self, model_class, by_name, parameters, arg_count):
- """
- Create a new CrossValidator object.
-
- Does not perform cross-validation yet.
-
- arguments:
- model_class -- model class/type used for model synthesis,
- e.g. PTAModel or AnalyticModel. model_class must have a
- constructor accepting (by_name, parameters, arg_count, verbose = False)
- and provide an assess method.
- by_name -- measurements aggregated by state/transition/function/... name.
- Layout: by_name[name][attribute] = list of data. Additionally,
- by_name[name]['attributes'] must be set to the list of attributes,
- e.g. ['power'] or ['duration', 'energy'].
- """
- self.model_class = model_class
- self.by_name = by_name
- self.names = sorted(by_name.keys())
- self.parameters = sorted(parameters)
- self.arg_count = arg_count
-
- def montecarlo(self, model_getter, count=200):
- """
- Perform Monte Carlo cross-validation and return average model quality.
-
- The by_name data is randomly divided into 2/3 training and 1/3
- validation. After creating a model for the training set, the
- model type returned by model_getter is evaluated on the validation set.
- This is repeated count times (defaulting to 200); the average of all
- measures is returned to the user.
-
- arguments:
- model_getter -- function with signature (model_object) -> model,
- e.g. lambda m: m.get_fitted()[0] to evaluate the parameter-aware
- model with automatic parameter detection.
- count -- number of validation runs to perform, defaults to 200
-
- return value:
- dict of model quality measures.
- {
- 'by_name' : {
- for each name: {
- for each attribute: {
- 'mae' : mean of all mean absolute errors
- 'mae_list' : list of the individual MAE values encountered during cross-validation
- 'smape' : mean of all symmetric mean absolute percentage errors
- 'smape_list' : list of the individual SMAPE values encountered during cross-validation
- }
- }
- }
- }
- """
- ret = {"by_name": dict()}
-
- for name in self.names:
- ret["by_name"][name] = dict()
- for attribute in self.by_name[name]["attributes"]:
- ret["by_name"][name][attribute] = {
- "mae_list": list(),
- "smape_list": list(),
- }
-
- for _ in range(count):
- res = self._single_montecarlo(model_getter)
- for name in self.names:
- for attribute in self.by_name[name]["attributes"]:
- ret["by_name"][name][attribute]["mae_list"].append(
- res["by_name"][name][attribute]["mae"]
- )
- ret["by_name"][name][attribute]["smape_list"].append(
- res["by_name"][name][attribute]["smape"]
- )
-
- for name in self.names:
- for attribute in self.by_name[name]["attributes"]:
- ret["by_name"][name][attribute]["mae"] = np.mean(
- ret["by_name"][name][attribute]["mae_list"]
- )
- ret["by_name"][name][attribute]["smape"] = np.mean(
- ret["by_name"][name][attribute]["smape_list"]
- )
-
- return ret
-
- def _single_montecarlo(self, model_getter):
- training = dict()
- validation = dict()
- for name in self.names:
- training[name] = {"attributes": self.by_name[name]["attributes"]}
- validation[name] = {"attributes": self.by_name[name]["attributes"]}
-
- if "isa" in self.by_name[name]:
- training[name]["isa"] = self.by_name[name]["isa"]
- validation[name]["isa"] = self.by_name[name]["isa"]
-
- data_count = len(self.by_name[name]["param"])
- training_subset, validation_subset = _xv_partition_montecarlo(data_count)
-
- for attribute in self.by_name[name]["attributes"]:
- self.by_name[name][attribute] = np.array(self.by_name[name][attribute])
- training[name][attribute] = self.by_name[name][attribute][
- training_subset
- ]
- validation[name][attribute] = self.by_name[name][attribute][
- validation_subset
- ]
-
- # We can't use slice syntax for 'param', which may contain strings and other odd values
- training[name]["param"] = list()
- validation[name]["param"] = list()
- for idx in training_subset:
- training[name]["param"].append(self.by_name[name]["param"][idx])
- for idx in validation_subset:
- validation[name]["param"].append(self.by_name[name]["param"][idx])
-
- training_data = self.model_class(
- training, self.parameters, self.arg_count, verbose=False
- )
- training_model = model_getter(training_data)
- validation_data = self.model_class(
- validation, self.parameters, self.arg_count, verbose=False
- )
-
- return validation_data.assess(training_model)
-
-
def _preprocess_mimosa(measurement):
setup = measurement["setup"]
mim = MIMOSA(
@@ -457,9 +163,7 @@ class TimingData:
transitions = list(
filter(lambda x: x["isa"] == "transition", trace["trace"])
)
- self.traces.append(
- {"id": trace["id"], "trace": transitions,}
- )
+ self.traces.append({"id": trace["id"], "trace": transitions})
for i, trace in enumerate(self.traces):
trace["orig_id"] = trace["id"]
trace["id"] = i
@@ -490,14 +194,13 @@ class TimingData:
self.traces_by_fileno.extend(log_data["traces"])
self._concatenate_analyzed_traces()
- def get_preprocessed_data(self, verbose=True):
+ def get_preprocessed_data(self):
"""
Return a list of DFA traces annotated with timing and parameter data.
Suitable for the PTAModel constructor.
See PTAModel(...) docstring for format details.
"""
- self.verbose = verbose
if self.preprocessed:
return self.traces
if self.version == 0:
@@ -539,7 +242,7 @@ class RawData:
file system, making subsequent loads near-instant.
"""
- def __init__(self, filenames, with_traces=False):
+ def __init__(self, filenames, with_traces=False, skip_cache=False):
"""
Create a new RawData object.
@@ -602,6 +305,7 @@ class RawData:
self._parameter_names = None
self.ignore_clipping = False
self.pta = None
+ self.ptalog = None
with tarfile.open(filenames[0]) as tf:
for member in tf.getmembers():
@@ -612,9 +316,12 @@ class RawData:
elif ".etlog" in member.name:
self.version = 2
break
+ if self.version >= 1:
+ self.ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json")))
+ self.pta = self.ptalog["pta"]
self.set_cache_file()
- if not with_traces:
+ if not with_traces and not skip_cache:
self.load_cache()
def set_cache_file(self):
@@ -631,6 +338,8 @@ class RawData:
self.preprocessing_stats = cache_data["preprocessing_stats"]
if "pta" in cache_data:
self.pta = cache_data["pta"]
+ if "ptalog" in cache_data:
+ self.ptalog = cache_data["ptalog"]
self.setup_by_fileno = cache_data["setup_by_fileno"]
self.preprocessed = True
@@ -647,6 +356,7 @@ class RawData:
"traces": self.traces,
"preprocessing_stats": self.preprocessing_stats,
"pta": self.pta,
+ "ptalog": self.ptalog,
"setup_by_fileno": self.setup_by_fileno,
}
json.dump(cache_data, f)
@@ -1050,7 +760,7 @@ class RawData:
trace["id"] = i
return trace_output
- def get_preprocessed_data(self, verbose=True):
+ def get_preprocessed_data(self):
"""
Return a list of DFA traces annotated with energy, timing, and parameter data.
The list is cached on disk, unless the constructor was called with `with_traces` set.
@@ -1103,7 +813,6 @@ class RawData:
* `args`: List of arguments the corresponding function call was called with. args entries are strings which are not necessarily numeric
* `code`: List of function name (first entry) and arguments (remaining entries) of the corresponding function call
"""
- self.verbose = verbose
if self.preprocessed:
return self.traces
if self.version == 0:
@@ -1145,8 +854,7 @@ class RawData:
new_filenames = list()
with tarfile.open(filename) as tf:
- ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json")))
- self.pta = ptalog["pta"]
+ ptalog = self.ptalog
# Benchmark code may be too large to be executed in a single
# run, so benchmarks (a benchmark is basically a list of DFA runs)
@@ -1200,8 +908,7 @@ class RawData:
new_filenames = list()
with tarfile.open(filename) as tf:
- ptalog = json.load(tf.extractfile(tf.getmember("ptalog.json")))
- self.pta = ptalog["pta"]
+ ptalog = self.ptalog
# Benchmark code may be too large to be executed in a single
# run, so benchmarks (a benchmark is basically a list of DFA runs)
@@ -1292,13 +999,12 @@ class RawData:
for measurement in measurements:
if "energy_trace" not in measurement:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e="; ".join(measurement["datasource_errors"]),
- ),
+ )
)
continue
@@ -1315,32 +1021,29 @@ class RawData:
self._merge_online_and_offline(measurement)
num_valid += 1
else:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e=measurement["error"],
- ),
+ )
)
elif version == 2:
if self._measurement_is_valid_2(measurement):
self._merge_online_and_etlog(measurement)
num_valid += 1
else:
- vprint(
- self.verbose,
- "[W] Skipping {ar:s}/{m:s}: {e:s}".format(
+ logger.warning(
+ "Skipping {ar:s}/{m:s}: {e:s}".format(
ar=self.filenames[measurement["fileno"]],
m=measurement["info"].name,
e=measurement["error"],
- ),
+ )
)
- vprint(
- self.verbose,
- "[I] {num_valid:d}/{num_total:d} measurements are valid".format(
+ logger.info(
+ "{num_valid:d}/{num_total:d} measurements are valid".format(
num_valid=num_valid, num_total=len(measurements)
- ),
+ )
)
if version == 0:
self.traces = self._concatenate_traces(self.traces_by_fileno)
@@ -1357,597 +1060,6 @@ class RawData:
}
-class ParallelParamFit:
- """
- Fit a set of functions on parameterized measurements.
-
- One parameter is variale, all others are fixed. Reports the best-fitting
- function type for each parameter.
- """
-
- def __init__(self, by_param):
- """Create a new ParallelParamFit object."""
- self.fit_queue = []
- self.by_param = by_param
-
- def enqueue(
- self,
- state_or_tran,
- attribute,
- param_index,
- param_name,
- safe_functions_enabled=False,
- param_filter=None,
- ):
- """
- Add state_or_tran/attribute/param_name to fit queue.
-
- This causes fit() to compute the best-fitting function for this model part.
- """
- self.fit_queue.append(
- {
- "key": [state_or_tran, attribute, param_name, param_filter],
- "args": [
- self.by_param,
- state_or_tran,
- attribute,
- param_index,
- safe_functions_enabled,
- param_filter,
- ],
- }
- )
-
- def fit(self):
- """
- Fit functions on previously enqueue data.
-
- Fitting is one in parallel with one process per core.
-
- Results can be accessed using the public ParallelParamFit.results object.
- """
- with Pool() as pool:
- self.results = pool.map(_try_fits_parallel, self.fit_queue)
-
-
-def _try_fits_parallel(arg):
- """
- Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
-
- Must be a global function as it is called from a multiprocessing Pool.
- """
- return {"key": arg["key"], "result": _try_fits(*arg["args"])}
-
-
-def _try_fits(
- by_param,
- state_or_tran,
- model_attribute,
- param_index,
- safe_functions_enabled=False,
- param_filter: dict = None,
-):
- """
- Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
-
- This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
- The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
- Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
- Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
-
- :returns: a dictionary with the following elements:
- best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
- best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
- mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
- median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
- results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
-
- :param by_param: measurements partitioned by state/transition/... name and parameter values.
- Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
-
- :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
- Example: `'foo'`
-
- :param model_attribute: attribute for which goodness-of-fit will be calculated.
- Example: `'bar'`
-
- :param param_index: index of the parameter used as model input
- :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
- :param param_filter: Only use measurements whose parameters match param_filter for fitting.
- """
-
- functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
-
- for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
- # We might remove elements from 'functions' while iterating over
- # its keys. A generator will not allow this, so we need to
- # convert to a list.
- function_names = list(functions.keys())
- for function_name in function_names:
- function_object = functions[function_name]
- if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
- param_key[1][param_index]
- ):
- functions.pop(function_name, None)
-
- raw_results = dict()
- raw_results_by_param = dict()
- ref_results = {"mean": list(), "median": list()}
- results = dict()
- results_by_param = dict()
-
- seen_parameter_combinations = set()
-
- # for each parameter combination:
- for param_key in filter(
- lambda x: x[0] == state_or_tran
- and remove_index_from_tuple(x[1], param_index)
- not in seen_parameter_combinations
- and len(by_param[x]["param"])
- and match_parameter_values(by_param[x]["param"][0], param_filter),
- by_param.keys(),
- ):
- X = []
- Y = []
- num_valid = 0
- num_total = 0
-
- # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
- # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
- seen_parameter_combinations.add(
- remove_index_from_tuple(param_key[1], param_index)
- )
-
- # for each value of the parameter denoted by param_index (all other parameters remain the same):
- for k, v in filter(
- lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
- ):
- num_total += 1
- if is_numeric(k[1][param_index]):
- num_valid += 1
- X.extend([float(k[1][param_index])] * len(v[model_attribute]))
- Y.extend(v[model_attribute])
-
- if num_valid > 2:
- X = np.array(X)
- Y = np.array(Y)
- other_parameters = remove_index_from_tuple(k[1], param_index)
- raw_results_by_param[other_parameters] = dict()
- results_by_param[other_parameters] = dict()
- for function_name, param_function in functions.items():
- if function_name not in raw_results:
- raw_results[function_name] = dict()
- error_function = param_function.error_function
- res = optimize.least_squares(
- error_function, [0, 1], args=(X, Y), xtol=2e-15
- )
- measures = regression_measures(param_function.eval(res.x, X), Y)
- raw_results_by_param[other_parameters][function_name] = measures
- for measure, error_rate in measures.items():
- if measure not in raw_results[function_name]:
- raw_results[function_name][measure] = list()
- raw_results[function_name][measure].append(error_rate)
- # print(function_name, res, measures)
- mean_measures = aggregate_measures(np.mean(Y), Y)
- ref_results["mean"].append(mean_measures["rmsd"])
- raw_results_by_param[other_parameters]["mean"] = mean_measures
- median_measures = aggregate_measures(np.median(Y), Y)
- ref_results["median"].append(median_measures["rmsd"])
- raw_results_by_param[other_parameters]["median"] = median_measures
-
- if not len(ref_results["mean"]):
- # Insufficient data for fitting
- # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
- return {"best": None, "best_rmsd": np.inf, "results": results}
-
- for (
- other_parameter_combination,
- other_parameter_results,
- ) in raw_results_by_param.items():
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in other_parameter_results.items():
- if len(result) > 0:
- results[function_name] = result
- rmsd = result["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
- results_by_param[other_parameter_combination] = {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": results["mean"]["rmsd"],
- "median_rmsd": results["median"]["rmsd"],
- "results": results,
- }
-
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in raw_results.items():
- if len(result) > 0:
- results[function_name] = {}
- for measure in result.keys():
- results[function_name][measure] = np.mean(result[measure])
- rmsd = results[function_name]["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
-
- return {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": np.mean(ref_results["mean"]),
- "median_rmsd": np.mean(ref_results["median"]),
- "results": results,
- "results_by_other_param": results_by_param,
- }
-
-
-def _num_args_from_by_name(by_name):
- num_args = dict()
- for key, value in by_name.items():
- if "args" in value:
- num_args[key] = len(value["args"][0])
- return num_args
-
-
-def get_fit_result(results, name, attribute, verbose=False, param_filter: dict = None):
- """
- Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
-
- Filters out results where the best function is worse (or not much better than) static mean/median estimates.
-
- :param results: fit results as returned by `paramfit.results`
- :param name: state/transition/... name, e.g. 'TX'
- :param attribute: model attribute, e.g. 'duration'
- :param verbose: print debug message to stdout when deliberately not using a determined fit function
- :param param_filter:
- :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
- """
- fit_result = dict()
- for result in results:
- if (
- result["key"][0] == name
- and result["key"][1] == attribute
- and result["key"][3] == param_filter
- and result["result"]["best"] is not None
- ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
- this_result = result["result"]
- if this_result["best_rmsd"] >= min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- vprint(
- verbose,
- "[I] Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- ),
- )
- # See notes on depends_on_param
- elif this_result["best_rmsd"] >= 0.8 * min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- vprint(
- verbose,
- "[I] Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
- name,
- attribute,
- result["key"][2],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- ),
- )
- else:
- fit_result[result["key"][2]] = this_result
- return fit_result
-
-
-class AnalyticModel:
- u"""
- Parameter-aware analytic energy/data size/... model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- These provide measurements aggregated by (function/state/...) name
- and (for by_param) parameter values. Layout:
- dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'bar' : [5, 6, 7],
- 'attributes' : ['foo', 'bar'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- }
-
- methods:
- get_static -- return static (parameter-unaware) model.
- get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
- get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
-
- variables:
- names -- function/state/... names (i.e., the keys of by_name)
- parameters -- parameter names
- stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
- assess -- calculate model quality
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count=None,
- function_override=dict(),
- verbose=True,
- use_corrcoef=False,
- ):
- """
- Create a new AnalyticModel and compute parameter statistics.
-
- :param by_name: measurements aggregated by (function/state/...) name.
- Layout: dictionary with one key per name ('send', 'TX', ...) or
- one key per name and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
-
- Parameter values must be ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- - for each attribute mentioned in 'attributes': A list with measurements.
- All list except for 'attributes' must have the same length.
-
- For example:
- parameters = ['foo_count', 'irrelevant']
- by_name = {
- 'foo' : [1, 1, 2],
- 'duration' : [5, 6, 7],
- 'attributes' : ['foo', 'duration'],
- 'param' : [[1, 0], [1, 0], [2, 0]]
- # foo_count-^ ^-irrelevant
- }
- :param parameters: List of parameter names
- :param function_override: dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- :param verbose: Print debug/info output while generating the model?
- :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
- """
- self.cache = dict()
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self.names = sorted(by_name.keys())
- self.parameters = sorted(parameters)
- self.function_override = function_override.copy()
- self.verbose = verbose
- self._use_corrcoef = use_corrcoef
- self._num_args = arg_count
- if self._num_args is None:
- self._num_args = _num_args_from_by_name(by_name)
-
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self.parameters,
- self._num_args,
- verbose=verbose,
- use_corrcoef=use_corrcoef,
- )
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- vprint(
- self.verbose,
- "[W] Got no data for {} {}: {}".format(name, key, fpe),
- )
- return model
-
- def param_index(self, param_name):
- if param_name in self.parameters:
- return self.parameters.index(param_name)
- return len(self.parameters) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self.parameters):
- return self.parameters[param_index]
- return str(param_index)
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get paramete-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict([[name, {}] for name in self.by_name.keys()])
- paramfit = ParallelParamFit(self.by_param)
-
- for name in self.by_name.keys():
- for attribute in self.by_name[name]["attributes"]:
- for param_index, param in enumerate(self.parameters):
- if self.stats.depends_on_param(name, attribute, param):
- paramfit.enqueue(name, attribute, param_index, param, False)
- if arg_support_enabled and name in self._num_args:
- for arg_index in range(self._num_args[name]):
- if self.stats.depends_on_arg(name, attribute, arg_index):
- paramfit.enqueue(
- name,
- attribute,
- len(self.parameters) + arg_index,
- arg_index,
- False,
- )
-
- paramfit.fit()
-
- for name in self.by_name.keys():
- num_args = 0
- if name in self._num_args:
- num_args = self._num_args[name]
- for attribute in self.by_name[name]["attributes"]:
- fit_result = get_fit_result(
- paramfit.results, name, attribute, self.verbose
- )
-
- if (name, attribute) in self.function_override:
- function_str = self.function_override[(name, attribute)]
- x = AnalyticFunction(function_str, self.parameters, num_args)
- x.fit(self.by_param, name, attribute)
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
- elif len(fit_result.keys()):
- x = analytic.function_powerset(
- fit_result, self.parameters, num_args
- )
- x.fit(self.by_param, name, attribute)
-
- if x.fit_success:
- param_model[name][attribute] = {
- "fit_result": fit_result,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this AnalyticModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for attribute in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(
- name, attribute, param=elem["param"][i]
- ),
- range(len(elem[attribute])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[attribute])
- detailed_results[name][attribute] = measures
-
- return {
- "by_name": detailed_results,
- }
-
- def to_json(self):
- # TODO
- pass
-
-
def _add_trace_data_to_aggregate(aggregate, key, element):
# Only cares about element['isa'], element['offline_aggregates'], and
# element['plan']['level']
@@ -2049,540 +1161,6 @@ def pta_trace_to_aggregate(traces, ignore_trace_indexes=[]):
return by_name, parameter_names, arg_count
-class PTAModel:
- u"""
- Parameter-aware PTA-based energy model.
-
- Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
-
- The model heavily relies on two internal data structures:
- PTAModel.by_name and PTAModel.by_param.
-
- These provide measurements aggregated by state/transition name
- and (for by_param) parameter values. Layout:
- dictionary with one key per state/transition ('send', 'TX', ...) or
- one key per state/transition and parameter combination
- (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
- For by_param, parameter values are ordered corresponding to the lexically sorted parameter names.
-
- Each element is in turn a dict with the following elements:
- - isa: 'state' or 'transition'
- - power: list of mean power measurements in µW
- - duration: list of durations in µs
- - power_std: list of stddev of power per state/transition
- - energy: consumed energy (power*duration) in pJ
- - paramkeys: list of parameter names in each measurement (-> list of lists)
- - param: list of parameter values in each measurement (-> list of lists)
- - attributes: list of keys that should be analyzed,
- e.g. ['power', 'duration']
- additionally, only if isa == 'transition':
- - timeout: list of duration of previous state in µs
- - rel_energy_prev: transition energy relative to previous state mean power in pJ
- - rel_energy_next: transition energy relative to next state mean power in pJ
- """
-
- def __init__(
- self,
- by_name,
- parameters,
- arg_count,
- traces=[],
- ignore_trace_indexes=[],
- discard_outliers=None,
- function_override={},
- verbose=True,
- use_corrcoef=False,
- pta=None,
- ):
- """
- Prepare a new PTA energy model.
-
- Actual model generation is done on-demand by calling the respective functions.
-
- arguments:
- by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate.
- parameters -- list of parameter names, as returned by pta_trace_to_aggregate
- arg_count -- function arguments, as returned by pta_trace_to_aggregate
- traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data()
- ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored.
- discard_outliers -- currently not supported: threshold for outlier detection and removel (float).
- Outlier detection is performed individually for each state/transition in each trace,
- so it only works if the benchmark ran several times.
- Given "data" (a set of measurements of the same thing, e.g. TX duration in the third benchmark trace),
- "m" (the median of all attribute measurements with the same parameters, which may include data from other traces),
- a data point X is considered an outlier if
- | 0.6745 * (X - m) / median(|data - m|) | > discard_outliers .
- function_override -- dict of overrides for automatic parameter function generation.
- If (state or transition name, model attribute) is present in function_override,
- the corresponding text string is the function used for analytic (parameter-aware/fitted)
- modeling of this attribute. It is passed to AnalyticFunction, see
- there for the required format. Note that this happens regardless of
- parameter dependency detection: The provided analytic function will be assigned
- even if it seems like the model attribute is static / parameter-independent.
- verbose -- print informative output, e.g. when removing an outlier
- use_corrcoef -- use correlation coefficient instead of stddev comparison
- to detect whether a model attribute depends on a parameter
- pta -- hardware model as `PTA` object
- """
- self.by_name = by_name
- self.by_param = by_name_to_by_param(by_name)
- self._parameter_names = sorted(parameters)
- self._num_args = arg_count
- self._use_corrcoef = use_corrcoef
- self.traces = traces
- self.stats = ParamStats(
- self.by_name,
- self.by_param,
- self._parameter_names,
- self._num_args,
- self._use_corrcoef,
- verbose=verbose,
- )
- self.cache = {}
- np.seterr("raise")
- self._outlier_threshold = discard_outliers
- self.function_override = function_override.copy()
- self.verbose = verbose
- self.pta = pta
- self.ignore_trace_indexes = ignore_trace_indexes
- self._aggregate_to_ndarray(self.by_name)
-
- def _aggregate_to_ndarray(self, aggregate):
- for elem in aggregate.values():
- for key in elem["attributes"]:
- elem[key] = np.array(elem[key])
-
- # This heuristic is very similar to the "function is not much better than
- # median" checks in get_fitted. So far, doing it here as well is mostly
- # a performance and not an algorithm quality decision.
- # --df, 2018-04-18
- def depends_on_param(self, state_or_trans, key, param):
- return self.stats.depends_on_param(state_or_trans, key, param)
-
- # See notes on depends_on_param
- def depends_on_arg(self, state_or_trans, key, param):
- return self.stats.depends_on_arg(state_or_trans, key, param)
-
- def _get_model_from_dict(self, model_dict, model_function):
- model = {}
- for name, elem in model_dict.items():
- model[name] = {}
- for key in elem["attributes"]:
- try:
- model[name][key] = model_function(elem[key])
- except RuntimeWarning:
- vprint(self.verbose, "[W] Got no data for {} {}".format(name, key))
- except FloatingPointError as fpe:
- vprint(
- self.verbose,
- "[W] Got no data for {} {}: {}".format(name, key, fpe),
- )
- return model
-
- def get_static(self, use_mean=False):
- """
- Get static model function: name, attribute -> model value.
-
- Uses the median of by_name for modeling, unless `use_mean` is set.
- """
- getter_function = np.median
-
- if use_mean:
- getter_function = np.mean
-
- static_model = self._get_model_from_dict(self.by_name, getter_function)
-
- def static_model_getter(name, key, **kwargs):
- return static_model[name][key]
-
- return static_model_getter
-
- def get_param_lut(self, fallback=False):
- """
- Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
-
- The function can only give model values for parameter combinations
- present in by_param. By default, it raises KeyError for other values.
-
- arguments:
- fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
- """
- static_model = self._get_model_from_dict(self.by_name, np.median)
- lut_model = self._get_model_from_dict(self.by_param, np.median)
-
- def lut_median_getter(name, key, param, arg=[], **kwargs):
- param.extend(map(soft_cast_int, arg))
- try:
- return lut_model[(name, tuple(param))][key]
- except KeyError:
- if fallback:
- return static_model[name][key]
- raise
-
- return lut_median_getter
-
- def param_index(self, param_name):
- if param_name in self._parameter_names:
- return self._parameter_names.index(param_name)
- return len(self._parameter_names) + int(param_name)
-
- def param_name(self, param_index):
- if param_index < len(self._parameter_names):
- return self._parameter_names[param_index]
- return str(param_index)
-
- def get_fitted(self, safe_functions_enabled=False):
- """
- Get parameter-aware model function and model information function.
-
- Returns two functions:
- model_function(name, attribute, param=parameter values) -> model value.
- model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
- """
- if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
- return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
-
- static_model = self._get_model_from_dict(self.by_name, np.median)
- param_model = dict(
- [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
- )
- paramfit = ParallelParamFit(self.by_param)
- for state_or_tran in self.by_name.keys():
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = {}
- for parameter_index, parameter_name in enumerate(self._parameter_names):
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- )
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- parameter_index,
- parameter_name,
- safe_functions_enabled,
- codependent_param_dict,
- )
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- for arg_index in range(self._num_args[state_or_tran]):
- if self.depends_on_arg(
- state_or_tran, model_attribute, arg_index
- ):
- paramfit.enqueue(
- state_or_tran,
- model_attribute,
- len(self._parameter_names) + arg_index,
- arg_index,
- safe_functions_enabled,
- )
- paramfit.fit()
-
- for state_or_tran in self.by_name.keys():
- num_args = 0
- if (
- arg_support_enabled
- and self.by_name[state_or_tran]["isa"] == "transition"
- ):
- num_args = self._num_args[state_or_tran]
- for model_attribute in self.by_name[state_or_tran]["attributes"]:
- fit_results = get_fit_result(
- paramfit.results, state_or_tran, model_attribute, self.verbose
- )
-
- for parameter_name in self._parameter_names:
- if self.depends_on_param(
- state_or_tran, model_attribute, parameter_name
- ):
- for (
- codependent_param_dict
- ) in self.stats.codependent_parameter_value_dicts(
- state_or_tran, model_attribute, parameter_name
- ):
- pass
- # FIXME get_fit_result hat ja gar keinen Parameter als Argument...
-
- if (state_or_tran, model_attribute) in self.function_override:
- function_str = self.function_override[
- (state_or_tran, model_attribute)
- ]
- x = AnalyticFunction(function_str, self._parameter_names, num_args)
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
- elif len(fit_results.keys()):
- x = analytic.function_powerset(
- fit_results, self._parameter_names, num_args
- )
- x.fit(self.by_param, state_or_tran, model_attribute)
- if x.fit_success:
- param_model[state_or_tran][model_attribute] = {
- "fit_result": fit_results,
- "function": x,
- }
-
- def model_getter(name, key, **kwargs):
- if "arg" in kwargs and "param" in kwargs:
- kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
- if key in param_model[name]:
- param_list = kwargs["param"]
- param_function = param_model[name][key]["function"]
- if param_function.is_predictable(param_list):
- return param_function.eval(param_list)
- return static_model[name][key]
-
- def info_getter(name, key):
- if key in param_model[name]:
- return param_model[name][key]
- return None
-
- self.cache["fitted_model_getter"] = model_getter
- self.cache["fitted_info_getter"] = info_getter
-
- return model_getter, info_getter
-
- def to_json(self):
- static_model = self.get_static()
- static_quality = self.assess(static_model)
- param_model, param_info = self.get_fitted()
- analytic_quality = self.assess(param_model)
- self.pta.update(
- static_model,
- param_info,
- static_error=static_quality["by_name"],
- analytic_error=analytic_quality["by_name"],
- )
- return self.pta.to_json()
-
- def states(self):
- """Return sorted list of state names."""
- return sorted(
- list(
- filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
- )
- )
-
- def transitions(self):
- """Return sorted list of transition names."""
- return sorted(
- list(
- filter(
- lambda k: self.by_name[k]["isa"] == "transition",
- self.by_name.keys(),
- )
- )
- )
-
- def states_and_transitions(self):
- """Return list of states and transition names."""
- ret = self.states()
- ret.extend(self.transitions())
- return ret
-
- def parameters(self):
- return self._parameter_names
-
- def attributes(self, state_or_trans):
- return self.by_name[state_or_trans]["attributes"]
-
- def assess(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
-
- state/transition/... name and parameter values are fed into model_function.
- The by_name entries of this PTAModel are used as ground truth and
- compared with the values predicted by model_function.
-
- For proper model assessments, the data used to generate model_function
- and the data fed into this AnalyticModel instance must be mutually
- exclusive (e.g. by performing cross validation). Otherwise,
- overfitting cannot be detected.
- """
- detailed_results = {}
- for name, elem in sorted(self.by_name.items()):
- detailed_results[name] = {}
- for key in elem["attributes"]:
- predicted_data = np.array(
- list(
- map(
- lambda i: model_function(name, key, param=elem["param"][i]),
- range(len(elem[key])),
- )
- )
- )
- measures = regression_measures(predicted_data, elem[key])
- detailed_results[name][key] = measures
-
- return {"by_name": detailed_results}
-
- def assess_states(
- self, model_function, model_attribute="power", distribution: dict = None
- ):
- """
- Calculate overall model error assuming equal distribution of states
- """
- # TODO calculate mean power draw for distribution and use it to
- # calculate relative error from MAE combination
- model_quality = self.assess(model_function)
- num_states = len(self.states())
- if distribution is None:
- distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
-
- if not np.isclose(sum(distribution.values()), 1):
- raise ValueError(
- "distribution must be a probability distribution with sum 1"
- )
-
- # total_value = None
- # try:
- # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states()))
- # except KeyError:
- # pass
-
- total_error = np.sqrt(
- sum(
- map(
- lambda x: np.square(
- model_quality["by_name"][x][model_attribute]["mae"]
- * distribution[x]
- ),
- self.states(),
- )
- )
- )
- return total_error
-
- def assess_on_traces(self, model_function):
- """
- Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance.
-
- :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`.
- Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the
- traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states
- """
- model_energy_list = []
- real_energy_list = []
- model_rel_energy_list = []
- model_state_energy_list = []
- model_duration_list = []
- real_duration_list = []
- model_timeout_list = []
- real_timeout_list = []
-
- for trace in self.traces:
- if trace["id"] not in self.ignore_trace_indexes:
- for rep_id in range(len(trace["trace"][0]["offline"])):
- model_energy = 0.0
- real_energy = 0.0
- model_rel_energy = 0.0
- model_state_energy = 0.0
- model_duration = 0.0
- real_duration = 0.0
- model_timeout = 0.0
- real_timeout = 0.0
- for i, trace_part in enumerate(trace["trace"]):
- name = trace_part["name"]
- prev_name = trace["trace"][i - 1]["name"]
- isa = trace_part["isa"]
- if name != "UNINITIALIZED":
- try:
- param = trace_part["offline_aggregates"]["param"][
- rep_id
- ]
- prev_param = trace["trace"][i - 1][
- "offline_aggregates"
- ]["param"][rep_id]
- power = trace_part["offline"][rep_id]["uW_mean"]
- duration = trace_part["offline"][rep_id]["us"]
- prev_duration = trace["trace"][i - 1]["offline"][
- rep_id
- ]["us"]
- real_energy += power * duration
- if isa == "state":
- model_energy += (
- model_function(name, "power", param=param)
- * duration
- )
- else:
- model_energy += model_function(
- name, "energy", param=param
- )
- # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
- if i == 1:
- model_rel_energy += model_function(
- name, "energy", param=param
- )
- else:
- model_rel_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_state_energy += model_function(
- prev_name, "power", param=prev_param
- ) * (prev_duration + duration)
- model_rel_energy += model_function(
- name, "rel_energy_prev", param=param
- )
- real_duration += duration
- model_duration += model_function(
- name, "duration", param=param
- )
- if (
- "plan" in trace_part
- and trace_part["plan"]["level"] == "epilogue"
- ):
- real_timeout += trace_part["offline"][rep_id][
- "timeout"
- ]
- model_timeout += model_function(
- name, "timeout", param=param
- )
- except KeyError:
- # if states/transitions have been removed via --filter-param, this is harmless
- pass
- real_energy_list.append(real_energy)
- model_energy_list.append(model_energy)
- model_rel_energy_list.append(model_rel_energy)
- model_state_energy_list.append(model_state_energy)
- real_duration_list.append(real_duration)
- model_duration_list.append(model_duration)
- real_timeout_list.append(real_timeout)
- model_timeout_list.append(model_timeout)
-
- return {
- "duration_by_trace": regression_measures(
- np.array(model_duration_list), np.array(real_duration_list)
- ),
- "energy_by_trace": regression_measures(
- np.array(model_energy_list), np.array(real_energy_list)
- ),
- "timeout_by_trace": regression_measures(
- np.array(model_timeout_list), np.array(real_timeout_list)
- ),
- "rel_energy_by_trace": regression_measures(
- np.array(model_rel_energy_list), np.array(real_energy_list)
- ),
- "state_energy_by_trace": regression_measures(
- np.array(model_state_energy_list), np.array(real_energy_list)
- ),
- }
-
-
class EnergyTraceLog:
"""
EnergyTrace log loader for DFA traces.
@@ -2617,7 +1195,6 @@ class EnergyTraceLog:
self.state_duration = state_duration * 1e-3
self.transition_names = transition_names
self.with_traces = with_traces
- self.verbose = False
self.errors = list()
# TODO auto-detect
@@ -2643,6 +1220,7 @@ class EnergyTraceLog:
"""
if not zbar_available:
+ logger.error("zbar module is not available")
self.errors.append(
'zbar module is not available. Try "apt install python3-zbar"'
)
@@ -2675,11 +1253,10 @@ class EnergyTraceLog:
self.sample_rate = data_count / (m_duration_us * 1e-6)
- vprint(
- self.verbose,
+ logger.debug(
"got {} samples with {} seconds of log data ({} Hz)".format(
data_count, m_duration_us * 1e-6, self.sample_rate
- ),
+ )
)
return (
@@ -2783,25 +1360,20 @@ class EnergyTraceLog:
for name, duration in expected_transitions:
bc, start, stop, end = self.find_barcode(next_barcode)
if bc is None:
- print('[!!!] did not find transition "{}"'.format(name))
+ logger.error('did not find transition "{}"'.format(name))
break
next_barcode = end + self.state_duration + duration
- vprint(
- self.verbose,
+ logger.debug(
'{} barcode "{}" area: {:0.2f} .. {:0.2f} / {:0.2f} seconds'.format(
offline_index, bc, start, stop, end
- ),
+ )
)
if bc != name:
- vprint(
- self.verbose,
- '[!!!] mismatch: expected "{}", got "{}"'.format(name, bc),
- )
- vprint(
- self.verbose,
+ logger.error('mismatch: expected "{}", got "{}"'.format(name, bc))
+ logger.debug(
"{} estimated transition area: {:0.3f} .. {:0.3f} seconds".format(
offline_index, end, end + duration
- ),
+ )
)
transition_start_index = self.ts_to_index(end)
@@ -2811,13 +1383,12 @@ class EnergyTraceLog:
self.ts_to_index(end + duration + self.state_duration) + 1
)
- vprint(
- self.verbose,
+ logger.debug(
"{} estimated transitionindex: {:0.3f} .. {:0.3f} seconds".format(
offline_index,
transition_start_index / self.sample_rate,
transition_done_index / self.sample_rate,
- ),
+ )
)
transition_power_W = self.interval_power[
@@ -2912,11 +1483,10 @@ class EnergyTraceLog:
+ self.led_power / 3
)
- vprint(
- self.verbose,
+ logger.debug(
"looking for barcode starting at {:0.2f} s, threshold is {:0.1f} mW".format(
start_ts, sync_threshold_power * 1e3
- ),
+ )
)
sync_area_start = None
@@ -2947,11 +1517,10 @@ class EnergyTraceLog:
barcode_data = self.interval_power[sync_area_start:sync_area_end]
- vprint(
- self.verbose,
+ logger.debug(
"barcode search area: {:0.2f} .. {:0.2f} seconds ({} samples)".format(
sync_start_ts, sync_end_ts, len(barcode_data)
- ),
+ )
)
bc, start, stop, padding_bits = self.find_barcode_in_power_data(barcode_data)
@@ -3026,7 +1595,7 @@ class EnergyTraceLog:
return content, sym_start, sym_end, padding_bits
else:
- vprint(self.verbose, "unable to find barcode")
+ logger.warning("unable to find barcode")
return None, None, None, None
@@ -3046,17 +1615,15 @@ class MIMOSA:
Resulting data is a list of state/transition/state/transition/... measurements.
"""
- def __init__(self, voltage: float, shunt: int, verbose=True, with_traces=False):
+ def __init__(self, voltage: float, shunt: int, with_traces=False):
"""
Initialize MIMOSA loader for a specific voltage and shunt setting.
:param voltage: MIMOSA DUT supply voltage (V)
:para mshunt: MIMOSA Shunt (Ohms)
- :param verbose: print notices about invalid data on STDOUT?
"""
self.voltage = voltage
self.shunt = shunt
- self.verbose = verbose
self.with_traces = with_traces
self.r1 = 984 # "1k"
self.r2 = 99013 # "100k"
@@ -3254,7 +1821,7 @@ class MIMOSA:
if cal_r2_mean > cal_0_mean:
b_lower = (ua_r2 - 0) / (cal_r2_mean - cal_0_mean)
else:
- vprint(self.verbose, "[W] 0 uA == %.f uA during calibration" % (ua_r2))
+ logger.warning("0 uA == %.f uA during calibration" % (ua_r2))
b_lower = 0
b_upper = (ua_r1 - ua_r2) / (cal_r1_mean - cal_r2_mean)
@@ -3302,50 +1869,6 @@ class MIMOSA:
return calfunc, caldata
- """
- def calcgrad(self, currents, threshold):
- grad = np.gradient(running_mean(currents * self.voltage, 10))
- # len(grad) == len(currents) - 9
- subst = []
- lastgrad = 0
- for i in range(len(grad)):
- # minimum substate duration: 10ms
- if np.abs(grad[i]) > threshold and i - lastgrad > 50:
- # account for skew introduced by running_mean and current
- # ramp slope (parasitic capacitors etc.)
- subst.append(i+10)
- lastgrad = i
- if lastgrad != i:
- subst.append(i+10)
- return subst
-
- # TODO konfigurierbare min/max threshold und len(gradidx) > X, binaere
- # Sache nach noetiger threshold. postprocessing mit
- # "zwei benachbarte substates haben sehr aehnliche werte / niedrige stddev" -> mergen
- # ... min/max muessen nicht vorgegeben werden, sind ja bekannt (0 / np.max(grad))
- # TODO bei substates / index foo den offset durch running_mean beachten
- # TODO ggf. clustering der 'abs(grad) > threshold' und bestimmung interessanter
- # uebergaenge dadurch?
- def gradfoo(self, currents):
- gradients = np.abs(np.gradient(running_mean(currents * self.voltage, 10)))
- gradmin = np.min(gradients)
- gradmax = np.max(gradients)
- threshold = np.mean([gradmin, gradmax])
- gradidx = self.calcgrad(currents, threshold)
- num_substates = 2
- while len(gradidx) != num_substates:
- if gradmax - gradmin < 0.1:
- # We did our best
- return threshold, gradidx
- if len(gradidx) > num_substates:
- gradmin = threshold
- else:
- gradmax = threshold
- threshold = np.mean([gradmin, gradmax])
- gradidx = self.calcgrad(currents, threshold)
- return threshold, gradidx
- """
-
def analyze_states(self, charges, trigidx, ua_func):
u"""
Split log data into states and transitions and return duration, energy, and mean power for each element.
@@ -3380,30 +1903,6 @@ class MIMOSA:
for idx in trigger_indices:
range_raw = charges[previdx:idx]
range_ua = ua_func(range_raw)
- substates = {}
-
- if previdx != 0 and idx - previdx > 200:
- thr, subst = 0, [] # self.gradfoo(range_ua)
- if len(subst):
- statelist = []
- prevsubidx = 0
- for subidx in subst:
- statelist.append(
- {
- "duration": (subidx - prevsubidx) * 10,
- "uW_mean": np.mean(
- range_ua[prevsubidx:subidx] * self.voltage
- ),
- "uW_std": np.std(
- range_ua[prevsubidx:subidx] * self.voltage
- ),
- }
- )
- prevsubidx = subidx
- substates = {
- "threshold": thr,
- "states": statelist,
- }
isa = "state"
if not is_state:
@@ -3422,12 +1921,6 @@ class MIMOSA:
if self.with_traces:
data["uW"] = range_ua * self.voltage
- if "states" in substates:
- data["substates"] = substates
- ssum = np.sum(list(map(lambda x: x["duration"], substates["states"])))
- if ssum != data["us"]:
- vprint(self.verbose, "ERR: duration %d vs %d" % (data["us"], ssum))
-
if isa == "transition":
# subtract average power of previous state
# (that is, the state from which this transition originates)
diff --git a/lib/model.py b/lib/model.py
new file mode 100644
index 0000000..bb4a45b
--- /dev/null
+++ b/lib/model.py
@@ -0,0 +1,1156 @@
+#!/usr/bin/env python3
+
+import logging
+import numpy as np
+from scipy import optimize
+from sklearn.metrics import r2_score
+from multiprocessing import Pool
+from .automata import PTA
+from .functions import analytic
+from .functions import AnalyticFunction
+from .parameters import ParamStats
+from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple
+from .utils import by_name_to_by_param, match_parameter_values
+
+logger = logging.getLogger(__name__)
+arg_support_enabled = True
+
+
+def aggregate_measures(aggregate: float, actual: list) -> dict:
+ """
+ Calculate error measures for model value on data list.
+
+ arguments:
+ aggregate -- model value (float or int)
+ actual -- real-world / reference values (list of float or int)
+
+ return value:
+ See regression_measures
+ """
+ aggregate_array = np.array([aggregate] * len(actual))
+ return regression_measures(aggregate_array, np.array(actual))
+
+
+def regression_measures(predicted: np.ndarray, actual: np.ndarray):
+ """
+ Calculate error measures by comparing model values to reference values.
+
+ arguments:
+ predicted -- model values (np.ndarray)
+ actual -- real-world / reference values (np.ndarray)
+
+ Returns a dict containing the following measures:
+ mae -- Mean Absolute Error
+ mape -- Mean Absolute Percentage Error,
+ if all items in actual are non-zero (NaN otherwise)
+ smape -- Symmetric Mean Absolute Percentage Error,
+ if no 0,0-pairs are present in actual and predicted (NaN otherwise)
+ msd -- Mean Square Deviation
+ rmsd -- Root Mean Square Deviation
+ ssr -- Sum of Squared Residuals
+ rsq -- R^2 measure, see sklearn.metrics.r2_score
+ count -- Number of values
+ """
+ if type(predicted) != np.ndarray:
+ raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
+ if type(actual) != np.ndarray:
+ raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
+ deviations = predicted - actual
+ # mean = np.mean(actual)
+ if len(deviations) == 0:
+ return {}
+ measures = {
+ "mae": np.mean(np.abs(deviations), dtype=np.float64),
+ "msd": np.mean(deviations ** 2, dtype=np.float64),
+ "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
+ "ssr": np.sum(deviations ** 2, dtype=np.float64),
+ "rsq": r2_score(actual, predicted),
+ "count": len(actual),
+ }
+
+ # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
+
+ if np.all(actual != 0):
+ measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
+ else:
+ measures["mape"] = np.nan
+ if np.all(np.abs(predicted) + np.abs(actual) != 0):
+ measures["smape"] = (
+ np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
+ * 100
+ )
+ else:
+ measures["smape"] = np.nan
+ # if np.all(rsq_quotient != 0):
+ # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
+
+ return measures
+
+
+class ParallelParamFit:
+ """
+ Fit a set of functions on parameterized measurements.
+
+ One parameter is variable, all others are fixed. Reports the best-fitting
+ function type for each parameter.
+ """
+
+ def __init__(self, by_param):
+ """Create a new ParallelParamFit object."""
+ self.fit_queue = []
+ self.by_param = by_param
+
+ def enqueue(
+ self,
+ state_or_tran,
+ attribute,
+ param_index,
+ param_name,
+ safe_functions_enabled=False,
+ param_filter=None,
+ ):
+ """
+ Add state_or_tran/attribute/param_name to fit queue.
+
+ This causes fit() to compute the best-fitting function for this model part.
+ """
+ self.fit_queue.append(
+ {
+ "key": [state_or_tran, attribute, param_name, param_filter],
+ "args": [
+ self.by_param,
+ state_or_tran,
+ attribute,
+ param_index,
+ safe_functions_enabled,
+ param_filter,
+ ],
+ }
+ )
+
+ def fit(self):
+ """
+ Fit functions on previously enqueued data.
+
+ Fitting is done in parallel with one process per core.
+
+ Results can be accessed using the public ParallelParamFit.results object.
+ """
+ with Pool() as pool:
+ self.results = pool.map(_try_fits_parallel, self.fit_queue)
+
+ def get_result(self, name, attribute, param_filter: dict = None):
+ """
+ Parse and sanitize fit results for state/transition/... 'name' and model attribute 'attribute'.
+
+ Filters out results where the best function is worse (or not much better than) static mean/median estimates.
+
+ :param name: state/transition/... name, e.g. 'TX'
+ :param attribute: model attribute, e.g. 'duration'
+ :param param_filter:
+ :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
+ """
+ fit_result = dict()
+ for result in self.results:
+ if (
+ result["key"][0] == name
+ and result["key"][1] == attribute
+ and result["key"][3] == param_filter
+ and result["result"]["best"] is not None
+ ): # probably caused by the ['best'] != None check -> does the fit fail for filtered data?
+ this_result = result["result"]
+ if this_result["best_rmsd"] >= min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ # See notes on depends_on_param
+ elif this_result["best_rmsd"] >= 0.8 * min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+ name,
+ attribute,
+ result["key"][2],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ else:
+ fit_result[result["key"][2]] = this_result
+ return fit_result
+
+
+def _try_fits_parallel(arg):
+ """
+ Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
+
+ Must be a global function as it is called from a multiprocessing Pool.
+ """
+ return {"key": arg["key"], "result": _try_fits(*arg["args"])}
+
+
+def _try_fits(
+ by_param,
+ state_or_tran,
+ model_attribute,
+ param_index,
+ safe_functions_enabled=False,
+ param_filter: dict = None,
+):
+ """
+ Determine goodness-of-fit for prediction of `by_param[(state_or_tran, *)][model_attribute]` dependence on `param_index` using various functions.
+
+ This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
+ The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
+ Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
+ Fitting is only performed if at least three distinct parameter values exist in `by_param[(state_or_tran, *)]`.
+
+ :returns: a dictionary with the following elements:
+ best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
+ best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
+ mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
+ median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
+ results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
+
+ :param by_param: measurements partitioned by state/transition/... name and parameter values.
+ Example: `{('foo', (0, 2)): {'bar': [2]}, ('foo', (0, 4)): {'bar': [4]}, ('foo', (0, 6)): {'bar': [6]}}`
+
+ :param state_or_tran: state/transition/... name for which goodness-of-fit will be calculated (first element of by_param key tuple).
+ Example: `'foo'`
+
+ :param model_attribute: attribute for which goodness-of-fit will be calculated.
+ Example: `'bar'`
+
+ :param param_index: index of the parameter used as model input
+ :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
+ :param param_filter: Only use measurements whose parameters match param_filter for fitting.
+ """
+
+ functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
+
+ for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
+ # We might remove elements from 'functions' while iterating over
+ # its keys. A generator will not allow this, so we need to
+ # convert to a list.
+ function_names = list(functions.keys())
+ for function_name in function_names:
+ function_object = functions[function_name]
+ if is_numeric(param_key[1][param_index]) and not function_object.is_valid(
+ param_key[1][param_index]
+ ):
+ functions.pop(function_name, None)
+
+ raw_results = dict()
+ raw_results_by_param = dict()
+ ref_results = {"mean": list(), "median": list()}
+ results = dict()
+ results_by_param = dict()
+
+ seen_parameter_combinations = set()
+
+ # for each parameter combination:
+ for param_key in filter(
+ lambda x: x[0] == state_or_tran
+ and remove_index_from_tuple(x[1], param_index)
+ not in seen_parameter_combinations
+ and len(by_param[x]["param"])
+ and match_parameter_values(by_param[x]["param"][0], param_filter),
+ by_param.keys(),
+ ):
+ X = []
+ Y = []
+ num_valid = 0
+ num_total = 0
+
+ # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
+ # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occurring combinations of non-param_index parameters
+ seen_parameter_combinations.add(
+ remove_index_from_tuple(param_key[1], param_index)
+ )
+
+ # for each value of the parameter denoted by param_index (all other parameters remain the same):
+ for k, v in filter(
+ lambda kv: param_slice_eq(kv[0], param_key, param_index), by_param.items()
+ ):
+ num_total += 1
+ if is_numeric(k[1][param_index]):
+ num_valid += 1
+ X.extend([float(k[1][param_index])] * len(v[model_attribute]))
+ Y.extend(v[model_attribute])
+
+ if num_valid > 2:
+ X = np.array(X)
+ Y = np.array(Y)
+ other_parameters = remove_index_from_tuple(k[1], param_index)
+ raw_results_by_param[other_parameters] = dict()
+ results_by_param[other_parameters] = dict()
+ for function_name, param_function in functions.items():
+ if function_name not in raw_results:
+ raw_results[function_name] = dict()
+ error_function = param_function.error_function
+ res = optimize.least_squares(
+ error_function, [0, 1], args=(X, Y), xtol=2e-15
+ )
+ measures = regression_measures(param_function.eval(res.x, X), Y)
+ raw_results_by_param[other_parameters][function_name] = measures
+ for measure, error_rate in measures.items():
+ if measure not in raw_results[function_name]:
+ raw_results[function_name][measure] = list()
+ raw_results[function_name][measure].append(error_rate)
+ # print(function_name, res, measures)
+ mean_measures = aggregate_measures(np.mean(Y), Y)
+ ref_results["mean"].append(mean_measures["rmsd"])
+ raw_results_by_param[other_parameters]["mean"] = mean_measures
+ median_measures = aggregate_measures(np.median(Y), Y)
+ ref_results["median"].append(median_measures["rmsd"])
+ raw_results_by_param[other_parameters]["median"] = median_measures
+
+ if not len(ref_results["mean"]):
+ # Insufficient data for fitting
+ # print('[W] Insufficient data for fitting {}/{}/{}'.format(state_or_tran, model_attribute, param_index))
+ return {"best": None, "best_rmsd": np.inf, "results": results}
+
+ for (
+ other_parameter_combination,
+ other_parameter_results,
+ ) in raw_results_by_param.items():
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in other_parameter_results.items():
+ if len(result) > 0:
+ results[function_name] = result
+ rmsd = result["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+ results_by_param[other_parameter_combination] = {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": results["mean"]["rmsd"],
+ "median_rmsd": results["median"]["rmsd"],
+ "results": results,
+ }
+
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in raw_results.items():
+ if len(result) > 0:
+ results[function_name] = {}
+ for measure in result.keys():
+ results[function_name][measure] = np.mean(result[measure])
+ rmsd = results[function_name]["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+
+ return {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": np.mean(ref_results["mean"]),
+ "median_rmsd": np.mean(ref_results["median"]),
+ "results": results,
+ "results_by_other_param": results_by_param,
+ }
+
+
+def _num_args_from_by_name(by_name):
+ num_args = dict()
+ for key, value in by_name.items():
+ if "args" in value:
+ num_args[key] = len(value["args"][0])
+ return num_args
+
+
+class AnalyticModel:
+ u"""
+ Parameter-aware analytic energy/data size/... model.
+
+ Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
+
+ These provide measurements aggregated by (function/state/...) name
+ and (for by_param) parameter values. Layout:
+ dictionary with one key per name ('send', 'TX', ...) or
+ one key per name and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+ Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ - for each attribute mentioned in 'attributes': A list with measurements.
+ All lists except for 'attributes' must have the same length.
+
+ For example:
+ parameters = ['foo_count', 'irrelevant']
+ by_name = {
+ 'foo' : [1, 1, 2],
+ 'bar' : [5, 6, 7],
+ 'attributes' : ['foo', 'bar'],
+ 'param' : [[1, 0], [1, 0], [2, 0]]
+ }
+
+ methods:
+ get_static -- return static (parameter-unaware) model.
+ get_param_lut -- return parameter-aware look-up-table model. Cannot model parameter combinations not present in by_param.
+ get_fitted -- return parameter-aware model using fitted functions for behaviour prediction.
+
+ variables:
+ names -- function/state/... names (i.e., the keys of by_name)
+ parameters -- parameter names
+ stats -- ParamStats object providing parameter-dependency statistics for each name and attribute
+ assess -- calculate model quality
+ """
+
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count=None,
+ function_override=dict(),
+ use_corrcoef=False,
+ ):
+ """
+ Create a new AnalyticModel and compute parameter statistics.
+
+ :param by_name: measurements aggregated by (function/state/...) name.
+ Layout: dictionary with one key per name ('send', 'TX', ...) or
+ one key per name and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+
+ Parameter values must be ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ - for each attribute mentioned in 'attributes': A list with measurements.
+ All lists except for 'attributes' must have the same length.
+
+ For example:
+ parameters = ['foo_count', 'irrelevant']
+ by_name = {
+ 'foo' : [1, 1, 2],
+ 'duration' : [5, 6, 7],
+ 'attributes' : ['foo', 'duration'],
+ 'param' : [[1, 0], [1, 0], [2, 0]]
+ # foo_count-^ ^-irrelevant
+ }
+ :param parameters: List of parameter names
+ :param function_override: dict of overrides for automatic parameter function generation.
+ If (state or transition name, model attribute) is present in function_override,
+ the corresponding text string is the function used for analytic (parameter-aware/fitted)
+ modeling of this attribute. It is passed to AnalyticFunction, see
+ there for the required format. Note that this happens regardless of
+ parameter dependency detection: The provided analytic function will be assigned
+ even if it seems like the model attribute is static / parameter-independent.
+ :param use_corrcoef: use correlation coefficient instead of stddev comparison to detect whether a model attribute depends on a parameter
+ """
+ self.cache = dict()
+ self.by_name = by_name
+ self.by_param = by_name_to_by_param(by_name)
+ self.names = sorted(by_name.keys())
+ self.parameters = sorted(parameters)
+ self.function_override = function_override.copy()
+ self._use_corrcoef = use_corrcoef
+ self._num_args = arg_count
+ if self._num_args is None:
+ self._num_args = _num_args_from_by_name(by_name)
+
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self.parameters,
+ self._num_args,
+ use_corrcoef=use_corrcoef,
+ )
+
+ def _get_model_from_dict(self, model_dict, model_function):
+ model = {}
+ for name, elem in model_dict.items():
+ model[name] = {}
+ for key in elem["attributes"]:
+ try:
+ model[name][key] = model_function(elem[key])
+ except RuntimeWarning:
+ logger.warning("Got no data for {} {}".format(name, key))
+ except FloatingPointError as fpe:
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
+ return model
+
+ def param_index(self, param_name):
+ if param_name in self.parameters:
+ return self.parameters.index(param_name)
+ return len(self.parameters) + int(param_name)
+
+ def param_name(self, param_index):
+ if param_index < len(self.parameters):
+ return self.parameters[param_index]
+ return str(param_index)
+
+ def get_static(self, use_mean=False):
+ """
+ Get static model function: name, attribute -> model value.
+
+ Uses the median of by_name for modeling.
+ """
+ getter_function = np.median
+
+ if use_mean:
+ getter_function = np.mean
+
+ static_model = self._get_model_from_dict(self.by_name, getter_function)
+
+ def static_model_getter(name, key, **kwargs):
+ return static_model[name][key]
+
+ return static_model_getter
+
def get_param_lut(self, fallback=False):
    """
    Get parameter-look-up-table model function: name, attribute, parameter values -> model value.

    The function can only give model values for parameter combinations
    present in by_param. By default, it raises KeyError for other values.

    arguments:
    fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
    """
    static_model = self._get_model_from_dict(self.by_name, np.median)
    lut_model = self._get_model_from_dict(self.by_param, np.median)

    def lut_median_getter(name, key, param, arg=(), **kwargs):
        # Build the look-up key from a fresh tuple. Extending `param` in
        # place would modify the caller's list, so repeated calls with the
        # same list would keep growing it.
        lut_key = (name, tuple(param) + tuple(map(soft_cast_int, arg)))
        try:
            return lut_model[lut_key][key]
        except KeyError:
            if fallback:
                return static_model[name][key]
            raise

    return lut_median_getter
+
def get_fitted(self, safe_functions_enabled=False):
    """
    Get parameter-aware model function and model information function.

    Returns two functions:
    model_function(name, attribute, param=parameter values) -> model value.
    model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None

    Both functions are stored in self.cache; later calls return the cached
    pair without fitting again.
    """
    if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
        return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]

    static_model = self._get_model_from_dict(self.by_name, np.median)
    param_model = {name: {} for name in self.by_name.keys()}
    paramfit = ParallelParamFit(self.by_param)

    # NOTE(review): safe_functions_enabled is accepted but not forwarded;
    # every enqueue call below passes False. Confirm whether this is intended.
    for name in self.by_name.keys():
        for attribute in self.by_name[name]["attributes"]:
            for param_index, param in enumerate(self.parameters):
                if self.stats.depends_on_param(name, attribute, param):
                    paramfit.enqueue(name, attribute, param_index, param, False)
            if arg_support_enabled and name in self._num_args:
                for arg_index in range(self._num_args[name]):
                    if self.stats.depends_on_arg(name, attribute, arg_index):
                        # Function arguments are indexed after the named parameters.
                        paramfit.enqueue(
                            name,
                            attribute,
                            len(self.parameters) + arg_index,
                            arg_index,
                            False,
                        )

    paramfit.fit()

    for name in self.by_name.keys():
        num_args = 0
        if name in self._num_args:
            num_args = self._num_args[name]
        for attribute in self.by_name[name]["attributes"]:
            fit_result = paramfit.get_result(name, attribute)

            if (name, attribute) in self.function_override:
                # A user-provided function is used regardless of fit results.
                function_str = self.function_override[(name, attribute)]
                x = AnalyticFunction(function_str, self.parameters, num_args)
            elif len(fit_result.keys()):
                x = analytic.function_powerset(
                    fit_result, self.parameters, num_args
                )
            else:
                # No override and no parameter dependence: static model only.
                continue

            x.fit(self.by_param, name, attribute)
            if x.fit_success:
                param_model[name][attribute] = {
                    "fit_result": fit_result,
                    "function": x,
                }

    def model_getter(name, key, **kwargs):
        if key in param_model[name]:
            # Work on a copy: the caller's parameter list must not be
            # extended with function arguments as a side effect.
            param_list = list(kwargs["param"])
            if "arg" in kwargs:
                param_list.extend(map(soft_cast_int, kwargs["arg"]))
            param_function = param_model[name][key]["function"]
            if param_function.is_predictable(param_list):
                return param_function.eval(param_list)
        return static_model[name][key]

    def info_getter(name, key):
        if key in param_model[name]:
            return param_model[name][key]
        return None

    self.cache["fitted_model_getter"] = model_getter
    self.cache["fitted_info_getter"] = info_getter

    return model_getter, info_getter
+
def assess(self, model_function):
    """
    Calculate MAE, SMAPE, etc. of model_function for each by_name entry.

    Each by_name entry's name and per-measurement parameter values are fed
    into model_function; its predictions are compared against the by_name
    measurements of this AnalyticModel, which serve as ground truth.

    For a meaningful assessment, the data used to build model_function and
    the data held by this AnalyticModel instance must be mutually exclusive
    (e.g. via cross validation). Otherwise, overfitting cannot be detected.

    :returns: {"by_name": name -> attribute -> regression measures}
    """
    detailed_results = {}
    for name, elem in sorted(self.by_name.items()):
        per_attribute = {}
        detailed_results[name] = per_attribute
        for attribute in elem["attributes"]:
            predictions = [
                model_function(name, attribute, param=elem["param"][i])
                for i in range(len(elem[attribute]))
            ]
            per_attribute[attribute] = regression_measures(
                np.array(predictions), elem[attribute]
            )

    return {"by_name": detailed_results}
+
def to_json(self):
    """Placeholder for JSON export; not implemented yet, returns None."""
    # TODO
    return None
+
+
+class PTAModel:
+ u"""
+ Parameter-aware PTA-based energy model.
+
+ Supports both static and parameter-based model attributes, and automatic detection of parameter-dependence.
+
+ The model heavily relies on two internal data structures:
+ PTAModel.by_name and PTAModel.by_param.
+
+ These provide measurements aggregated by state/transition name
+ and (for by_param) parameter values. Layout:
+ dictionary with one key per state/transition ('send', 'TX', ...) or
+ one key per state/transition and parameter combination
+ (('send', (1, 2)), ('send', (2, 3)), ('TX', (1, 2)), ('TX', (2, 3)), ...).
+ For by_param, parameter values are ordered corresponding to the lexically sorted parameter names.
+
+ Each element is in turn a dict with the following elements:
+ - isa: 'state' or 'transition'
+ - power: list of mean power measurements in µW
+ - duration: list of durations in µs
+ - power_std: list of stddev of power per state/transition
+ - energy: consumed energy (power*duration) in pJ
+ - paramkeys: list of parameter names in each measurement (-> list of lists)
+ - param: list of parameter values in each measurement (-> list of lists)
+ - attributes: list of keys that should be analyzed,
+ e.g. ['power', 'duration']
+ additionally, only if isa == 'transition':
+ - timeout: list of duration of previous state in µs
+ - rel_energy_prev: transition energy relative to previous state mean power in pJ
+ - rel_energy_next: transition energy relative to next state mean power in pJ
+ """
+
+ def __init__(
+ self,
+ by_name,
+ parameters,
+ arg_count,
+ traces=[],
+ ignore_trace_indexes=[],
+ function_override={},
+ use_corrcoef=False,
+ pta=None,
+ ):
+ """
+ Prepare a new PTA energy model.
+
+ Actual model generation is done on-demand by calling the respective functions.
+
+ arguments:
+ by_name -- state/transition measurements aggregated by name, as returned by pta_trace_to_aggregate.
+ parameters -- list of parameter names, as returned by pta_trace_to_aggregate
+ arg_count -- function arguments, as returned by pta_trace_to_aggregate
+ traces -- list of preprocessed DFA traces, as returned by RawData.get_preprocessed_data()
+ ignore_trace_indexes -- list of trace indexes. The corresponding traces will be ignored.
+ function_override -- dict of overrides for automatic parameter function generation.
+ If (state or transition name, model attribute) is present in function_override,
+ the corresponding text string is the function used for analytic (parameter-aware/fitted)
+ modeling of this attribute. It is passed to AnalyticFunction, see
+ there for the required format. Note that this happens regardless of
+ parameter dependency detection: The provided analytic function will be assigned
+ even if it seems like the model attribute is static / parameter-independent.
+ use_corrcoef -- use correlation coefficient instead of stddev comparison
+ to detect whether a model attribute depends on a parameter
+ pta -- hardware model as `PTA` object
+ """
+ self.by_name = by_name
+ self.by_param = by_name_to_by_param(by_name)
+ self._parameter_names = sorted(parameters)
+ self._num_args = arg_count
+ self._use_corrcoef = use_corrcoef
+ self.traces = traces
+ self.stats = ParamStats(
+ self.by_name,
+ self.by_param,
+ self._parameter_names,
+ self._num_args,
+ self._use_corrcoef,
+ )
+ self.cache = {}
+ np.seterr("raise")
+ self.function_override = function_override.copy()
+ self.pta = pta
+ self.ignore_trace_indexes = ignore_trace_indexes
+ self._aggregate_to_ndarray(self.by_name)
+
+ def _aggregate_to_ndarray(self, aggregate):
+ for elem in aggregate.values():
+ for key in elem["attributes"]:
+ elem[key] = np.array(elem[key])
+
+ # This heuristic is very similar to the "function is not much better than
+ # median" checks in get_fitted. So far, doing it here as well is mostly
+ # a performance and not an algorithm quality decision.
+ # --df, 2018-04-18
+ def depends_on_param(self, state_or_trans, key, param):
+ return self.stats.depends_on_param(state_or_trans, key, param)
+
+ # See notes on depends_on_param
+ def depends_on_arg(self, state_or_trans, key, param):
+ return self.stats.depends_on_arg(state_or_trans, key, param)
+
+ def _get_model_from_dict(self, model_dict, model_function):
+ model = {}
+ for name, elem in model_dict.items():
+ model[name] = {}
+ for key in elem["attributes"]:
+ try:
+ model[name][key] = model_function(elem[key])
+ except RuntimeWarning:
+ logger.warning("Got no data for {} {}".format(name, key))
+ except FloatingPointError as fpe:
+ logger.warning("Got no data for {} {}: {}".format(name, key, fpe))
+ return model
+
+ def get_static(self, use_mean=False):
+ """
+ Get static model function: name, attribute -> model value.
+
+ Uses the median of by_name for modeling, unless `use_mean` is set.
+ """
+ getter_function = np.median
+
+ if use_mean:
+ getter_function = np.mean
+
+ static_model = self._get_model_from_dict(self.by_name, getter_function)
+
+ def static_model_getter(name, key, **kwargs):
+ return static_model[name][key]
+
+ return static_model_getter
+
+ def get_param_lut(self, fallback=False):
+ """
+ Get parameter-look-up-table model function: name, attribute, parameter values -> model value.
+
+ The function can only give model values for parameter combinations
+ present in by_param. By default, it raises KeyError for other values.
+
+ arguments:
+ fallback -- Fall back to the (non-parameter-aware) static model when encountering unknown parameter values
+ """
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ lut_model = self._get_model_from_dict(self.by_param, np.median)
+
+ def lut_median_getter(name, key, param, arg=[], **kwargs):
+ param.extend(map(soft_cast_int, arg))
+ try:
+ return lut_model[(name, tuple(param))][key]
+ except KeyError:
+ if fallback:
+ return static_model[name][key]
+ raise
+
+ return lut_median_getter
+
+ def param_index(self, param_name):
+ if param_name in self._parameter_names:
+ return self._parameter_names.index(param_name)
+ return len(self._parameter_names) + int(param_name)
+
+ def param_name(self, param_index):
+ if param_index < len(self._parameter_names):
+ return self._parameter_names[param_index]
+ return str(param_index)
+
+ def get_fitted(self, safe_functions_enabled=False):
+ """
+ Get parameter-aware model function and model information function.
+
+ Returns two functions:
+ model_function(name, attribute, param=parameter values) -> model value.
+ model_info(name, attribute) -> {'fit_result' : ..., 'function' : ... } or None
+ """
+ if "fitted_model_getter" in self.cache and "fitted_info_getter" in self.cache:
+ return self.cache["fitted_model_getter"], self.cache["fitted_info_getter"]
+
+ static_model = self._get_model_from_dict(self.by_name, np.median)
+ param_model = dict(
+ [[state_or_tran, {}] for state_or_tran in self.by_name.keys()]
+ )
+ paramfit = ParallelParamFit(self.by_param)
+ for state_or_tran in self.by_name.keys():
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
+ fit_results = {}
+ for parameter_index, parameter_name in enumerate(self._parameter_names):
+ if self.depends_on_param(
+ state_or_tran, model_attribute, parameter_name
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ parameter_index,
+ parameter_name,
+ safe_functions_enabled,
+ )
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
+ for arg_index in range(self._num_args[state_or_tran]):
+ if self.depends_on_arg(
+ state_or_tran, model_attribute, arg_index
+ ):
+ paramfit.enqueue(
+ state_or_tran,
+ model_attribute,
+ len(self._parameter_names) + arg_index,
+ arg_index,
+ safe_functions_enabled,
+ )
+ paramfit.fit()
+
+ for state_or_tran in self.by_name.keys():
+ num_args = 0
+ if (
+ arg_support_enabled
+ and self.by_name[state_or_tran]["isa"] == "transition"
+ ):
+ num_args = self._num_args[state_or_tran]
+ for model_attribute in self.by_name[state_or_tran]["attributes"]:
+ fit_results = paramfit.get_result(state_or_tran, model_attribute)
+
+ if (state_or_tran, model_attribute) in self.function_override:
+ function_str = self.function_override[
+ (state_or_tran, model_attribute)
+ ]
+ x = AnalyticFunction(function_str, self._parameter_names, num_args)
+ x.fit(self.by_param, state_or_tran, model_attribute)
+ if x.fit_success:
+ param_model[state_or_tran][model_attribute] = {
+ "fit_result": fit_results,
+ "function": x,
+ }
+ elif len(fit_results.keys()):
+ x = analytic.function_powerset(
+ fit_results, self._parameter_names, num_args
+ )
+ x.fit(self.by_param, state_or_tran, model_attribute)
+ if x.fit_success:
+ param_model[state_or_tran][model_attribute] = {
+ "fit_result": fit_results,
+ "function": x,
+ }
+
+ def model_getter(name, key, **kwargs):
+ if "arg" in kwargs and "param" in kwargs:
+ kwargs["param"].extend(map(soft_cast_int, kwargs["arg"]))
+ if key in param_model[name]:
+ param_list = kwargs["param"]
+ param_function = param_model[name][key]["function"]
+ if param_function.is_predictable(param_list):
+ return param_function.eval(param_list)
+ return static_model[name][key]
+
+ def info_getter(name, key):
+ if key in param_model[name]:
+ return param_model[name][key]
+ return None
+
+ self.cache["fitted_model_getter"] = model_getter
+ self.cache["fitted_info_getter"] = info_getter
+
+ return model_getter, info_getter
+
+ def to_json(self):
+ static_model = self.get_static()
+ static_quality = self.assess(static_model)
+ param_model, param_info = self.get_fitted()
+ analytic_quality = self.assess(param_model)
+ pta = self.pta
+ if pta is None:
+ pta = PTA(self.states(), parameters=self._parameter_names)
+ pta.update(
+ static_model,
+ param_info,
+ static_error=static_quality["by_name"],
+ analytic_error=analytic_quality["by_name"],
+ )
+ return pta.to_json()
+
+ def states(self):
+ """Return sorted list of state names."""
+ return sorted(
+ list(
+ filter(lambda k: self.by_name[k]["isa"] == "state", self.by_name.keys())
+ )
+ )
+
+ def transitions(self):
+ """Return sorted list of transition names."""
+ return sorted(
+ list(
+ filter(
+ lambda k: self.by_name[k]["isa"] == "transition",
+ self.by_name.keys(),
+ )
+ )
+ )
+
+ def states_and_transitions(self):
+ """Return list of states and transition names."""
+ ret = self.states()
+ ret.extend(self.transitions())
+ return ret
+
+ def parameters(self):
+ return self._parameter_names
+
+ def attributes(self, state_or_trans):
+ return self.by_name[state_or_trans]["attributes"]
+
+ def assess(self, model_function):
+ """
+ Calculate MAE, SMAPE, etc. of model_function for each by_name entry.
+
+ state/transition/... name and parameter values are fed into model_function.
+ The by_name entries of this PTAModel are used as ground truth and
+ compared with the values predicted by model_function.
+
+ For proper model assessments, the data used to generate model_function
+ and the data fed into this AnalyticModel instance must be mutually
+ exclusive (e.g. by performing cross validation). Otherwise,
+ overfitting cannot be detected.
+ """
+ detailed_results = {}
+ for name, elem in sorted(self.by_name.items()):
+ detailed_results[name] = {}
+ for key in elem["attributes"]:
+ predicted_data = np.array(
+ list(
+ map(
+ lambda i: model_function(name, key, param=elem["param"][i]),
+ range(len(elem[key])),
+ )
+ )
+ )
+ measures = regression_measures(predicted_data, elem[key])
+ detailed_results[name][key] = measures
+
+ return {"by_name": detailed_results}
+
+ def assess_states(
+ self, model_function, model_attribute="power", distribution: dict = None
+ ):
+ """
+ Calculate overall model error assuming equal distribution of states
+ """
+ # TODO calculate mean power draw for distribution and use it to
+ # calculate relative error from MAE combination
+ model_quality = self.assess(model_function)
+ num_states = len(self.states())
+ if distribution is None:
+ distribution = dict(map(lambda x: [x, 1 / num_states], self.states()))
+
+ if not np.isclose(sum(distribution.values()), 1):
+ raise ValueError(
+ "distribution must be a probability distribution with sum 1"
+ )
+
+ # total_value = None
+ # try:
+ # total_value = sum(map(lambda x: model_function(x, model_attribute) * distribution[x], self.states()))
+ # except KeyError:
+ # pass
+
+ total_error = np.sqrt(
+ sum(
+ map(
+ lambda x: np.square(
+ model_quality["by_name"][x][model_attribute]["mae"]
+ * distribution[x]
+ ),
+ self.states(),
+ )
+ )
+ )
+ return total_error
+
+ def assess_on_traces(self, model_function):
+ """
+ Calculate MAE, SMAPE, etc. of model_function for each trace known to this PTAModel instance.
+
+ :returns: dict of `duration_by_trace`, `energy_by_trace`, `timeout_by_trace`, `rel_energy_by_trace` and `state_energy_by_trace`.
+ Each entry holds regression measures for the corresponding measure. Note that the determined model quality heavily depends on the
+ traces: small-ish absolute errors in states which frequently occur may have more effect than large absolute errors in rarely occuring states
+ """
+ model_energy_list = []
+ real_energy_list = []
+ model_rel_energy_list = []
+ model_state_energy_list = []
+ model_duration_list = []
+ real_duration_list = []
+ model_timeout_list = []
+ real_timeout_list = []
+
+ for trace in self.traces:
+ if trace["id"] not in self.ignore_trace_indexes:
+ for rep_id in range(len(trace["trace"][0]["offline"])):
+ model_energy = 0.0
+ real_energy = 0.0
+ model_rel_energy = 0.0
+ model_state_energy = 0.0
+ model_duration = 0.0
+ real_duration = 0.0
+ model_timeout = 0.0
+ real_timeout = 0.0
+ for i, trace_part in enumerate(trace["trace"]):
+ name = trace_part["name"]
+ prev_name = trace["trace"][i - 1]["name"]
+ isa = trace_part["isa"]
+ if name != "UNINITIALIZED":
+ try:
+ param = trace_part["offline_aggregates"]["param"][
+ rep_id
+ ]
+ prev_param = trace["trace"][i - 1][
+ "offline_aggregates"
+ ]["param"][rep_id]
+ power = trace_part["offline"][rep_id]["uW_mean"]
+ duration = trace_part["offline"][rep_id]["us"]
+ prev_duration = trace["trace"][i - 1]["offline"][
+ rep_id
+ ]["us"]
+ real_energy += power * duration
+ if isa == "state":
+ model_energy += (
+ model_function(name, "power", param=param)
+ * duration
+ )
+ else:
+ model_energy += model_function(
+ name, "energy", param=param
+ )
+ # If i == 1, the previous state was UNINITIALIZED, for which we do not have model data
+ if i == 1:
+ model_rel_energy += model_function(
+ name, "energy", param=param
+ )
+ else:
+ model_rel_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_state_energy += model_function(
+ prev_name, "power", param=prev_param
+ ) * (prev_duration + duration)
+ model_rel_energy += model_function(
+ name, "rel_energy_prev", param=param
+ )
+ real_duration += duration
+ model_duration += model_function(
+ name, "duration", param=param
+ )
+ if (
+ "plan" in trace_part
+ and trace_part["plan"]["level"] == "epilogue"
+ ):
+ real_timeout += trace_part["offline"][rep_id][
+ "timeout"
+ ]
+ model_timeout += model_function(
+ name, "timeout", param=param
+ )
+ except KeyError:
+ # if states/transitions have been removed via --filter-param, this is harmless
+ pass
+ real_energy_list.append(real_energy)
+ model_energy_list.append(model_energy)
+ model_rel_energy_list.append(model_rel_energy)
+ model_state_energy_list.append(model_state_energy)
+ real_duration_list.append(real_duration)
+ model_duration_list.append(model_duration)
+ real_timeout_list.append(real_timeout)
+ model_timeout_list.append(model_timeout)
+
+ return {
+ "duration_by_trace": regression_measures(
+ np.array(model_duration_list), np.array(real_duration_list)
+ ),
+ "energy_by_trace": regression_measures(
+ np.array(model_energy_list), np.array(real_energy_list)
+ ),
+ "timeout_by_trace": regression_measures(
+ np.array(model_timeout_list), np.array(real_timeout_list)
+ ),
+ "rel_energy_by_trace": regression_measures(
+ np.array(model_rel_energy_list), np.array(real_energy_list)
+ ),
+ "state_energy_by_trace": regression_measures(
+ np.array(model_state_energy_list), np.array(real_energy_list)
+ ),
+ }
diff --git a/lib/parameters.py b/lib/parameters.py
index 8b562b6..5c6b978 100644
--- a/lib/parameters.py
+++ b/lib/parameters.py
@@ -1,11 +1,15 @@
import itertools
+import logging
import numpy as np
+import warnings
from collections import OrderedDict
from copy import deepcopy
from multiprocessing import Pool
from .utils import remove_index_from_tuple, is_numeric
from .utils import filter_aggregate_by_param, by_name_to_by_param
+logger = logging.getLogger(__name__)
+
def distinct_param_values(by_name, state_or_tran):
"""
@@ -78,25 +82,7 @@ def _reduce_param_matrix(matrix: np.ndarray, parameter_names: list) -> list:
return list()
-def _codependent_parameters(param, lut_by_param_values, std_by_param_values):
- """
- Return list of parameters which affect whether a parameter affects a model attribute or not.
- """
- return list()
- safe_div = np.vectorize(lambda x, y: 0.0 if x == 0 else 1 - x / y)
- ratio_by_value = safe_div(lut_by_param_values, std_by_param_values)
- err_mode = np.seterr("ignore")
- dep_by_value = ratio_by_value > 0.5
- np.seterr(**err_mode)
-
- other_param_list = list(filter(lambda x: x != param, self._parameter_names))
- influencer_parameters = _reduce_param_matrix(dep_by_value, other_param_list)
- return influencer_parameters
-
-
-def _std_by_param(
- by_param, all_param_values, state_or_tran, attribute, param_index, verbose=False
-):
+def _std_by_param(by_param, all_param_values, state_or_tran, attribute, param_index):
u"""
Calculate standard deviations for a static model where all parameters but `param_index` are constant.
@@ -162,12 +148,11 @@ def _std_by_param(
# vprint(verbose, '[W] parameter value partition for {} is empty'.format(param_value))
if np.all(np.isnan(stddev_matrix)):
- print(
- "[W] {}/{} parameter #{} has no data partitions -- how did this even happen?".format(
- state_or_tran, attribute, param_index
+ warnings.warn(
+ "{}/{} parameter #{} has no data partitions. stddev_matrix = {}".format(
+ state_or_tran, attribute, param_index, stddev_matrix
)
)
- print("stddev_matrix = {}".format(stddev_matrix))
return stddev_matrix, 0.0
return (
@@ -202,13 +187,13 @@ def _corr_by_param(by_name, state_or_trans, attribute, param_index):
# -> assume no correlation
return 0.0
except ValueError:
- print(
- "[!] Exception in _corr_by_param(by_name, state_or_trans={}, attribute={}, param_index={})".format(
+ logger.error(
+ "ValueError in _corr_by_param(by_name, state_or_trans={}, attribute={}, param_index={})".format(
state_or_trans, attribute, param_index
)
)
- print(
- "[!] while executing np.corrcoef(by_name[{}][{}]={}, {}))".format(
+ logger.error(
+ "while executing np.corrcoef(by_name[{}][{}]={}, {}))".format(
state_or_trans,
attribute,
by_name[state_or_trans][attribute],
@@ -229,7 +214,6 @@ def _compute_param_statistics(
attribute,
distinct_values,
distinct_values_by_param_index,
- verbose=False,
):
"""
Compute standard deviation and correlation coefficient for various data partitions.
@@ -252,7 +236,6 @@ def _compute_param_statistics(
:param arg_count: dict providing the number of functions args ("local parameters") for each function.
:param state_or_trans: state or transition name, e.g. 'send' or 'TX'
:param attribute: model attribute, e.g. 'power' or 'duration'
- :param verbose: print warning if some parameter partitions are too small for fitting
:returns: a dict with the following content:
std_static -- static parameter-unaware model error: stddev of by_name[state_or_trans][attribute]
@@ -267,6 +250,8 @@ def _compute_param_statistics(
corr_by_param -- correlation coefficient
corr_by_arg -- same, but ignoring a single function argument
Only set if state_or_trans appears in arg_count, empty dict otherwise.
+ depends_on_param -- dict(parameter_name -> Bool). True if /attribute/ behaviour probably depends on /parameter_name/
+ depends_on_arg -- list(bool). Same, but for function arguments, if any.
"""
ret = {
"std_static": np.std(by_name[state_or_trans][attribute]),
@@ -287,7 +272,6 @@ def _compute_param_statistics(
"corr_by_arg": [],
"depends_on_param": {},
"depends_on_arg": [],
- "param_data": {},
}
np.seterr("raise")
@@ -299,7 +283,6 @@ def _compute_param_statistics(
state_or_trans,
attribute,
param_idx,
- verbose,
)
ret["std_by_param"][param] = mean_std
ret["std_by_param_values"][param] = std_matrix
@@ -314,49 +297,6 @@ def _compute_param_statistics(
ret["std_param_lut"],
)
- if ret["depends_on_param"][param]:
- ret["param_data"][param] = {
- "codependent_parameters": _codependent_parameters(
- param, lut_matrix, std_matrix
- ),
- "depends_for_codependent_value": dict(),
- }
-
- # calculate parameter dependence for individual values of codependent parameters
- codependent_param_values = list()
- for codependent_param in ret["param_data"][param]["codependent_parameters"]:
- codependent_param_values.append(distinct_values[codependent_param])
- for combi in itertools.product(*codependent_param_values):
- by_name_part = deepcopy(by_name)
- filter_list = list(
- zip(ret["param_data"][param]["codependent_parameters"], combi)
- )
- filter_aggregate_by_param(by_name_part, parameter_names, filter_list)
- by_param_part = by_name_to_by_param(by_name_part)
- # there may be no data for this specific parameter value combination
- if state_or_trans in by_name_part:
- part_corr = _corr_by_param(
- by_name_part, state_or_trans, attribute, param_idx
- )
- part_std_lut = np.mean(
- [
- np.std(by_param_part[x][attribute])
- for x in by_param_part.keys()
- if x[0] == state_or_trans
- ]
- )
- _, part_std_param, _ = _std_by_param(
- by_param_part,
- distinct_values_by_param_index,
- state_or_trans,
- attribute,
- param_idx,
- verbose,
- )
- ret["param_data"][param]["depends_for_codependent_value"][
- combi
- ] = _depends_on_param(part_corr, part_std_param, part_std_lut)
-
if state_or_trans in arg_count:
for arg_index in range(arg_count[state_or_trans]):
std_matrix, mean_std, lut_matrix = _std_by_param(
@@ -365,7 +305,6 @@ def _compute_param_statistics(
state_or_trans,
attribute,
len(parameter_names) + arg_index,
- verbose,
)
ret["std_by_arg"].append(mean_std)
ret["std_by_arg_values"].append(std_matrix)
@@ -447,8 +386,8 @@ def prune_dependent_parameters(by_name, parameter_names, correlation_threshold=0
correlation != np.nan
and np.abs(correlation) > correlation_threshold
):
- print(
- "[!] Parameters {} <-> {} are correlated with coefficcient {}".format(
+ logger.debug(
+ "Parameters {} <-> {} are correlated with coefficcient {}".format(
parameter_names[index_1],
parameter_names[index_2],
correlation,
@@ -458,7 +397,7 @@ def prune_dependent_parameters(by_name, parameter_names, correlation_threshold=0
index_to_remove = index_1
else:
index_to_remove = index_2
- print(
+ logger.debug(
" Removing parameter {}".format(
parameter_names[index_to_remove]
)
@@ -495,13 +434,7 @@ class ParamStats:
"""
def __init__(
- self,
- by_name,
- by_param,
- parameter_names,
- arg_count,
- use_corrcoef=False,
- verbose=False,
+ self, by_name, by_param, parameter_names, arg_count, use_corrcoef=False,
):
"""
Compute standard deviation and correlation coefficient on parameterized data partitions.
@@ -556,7 +489,6 @@ class ParamStats:
attribute,
self.distinct_values[state_or_tran],
self.distinct_values_by_param_index[state_or_tran],
- verbose,
],
}
)
@@ -592,147 +524,21 @@ class ParamStats:
)
> 2
):
- print(
- key,
- param,
- list(
- filter(
- lambda n: is_numeric(n),
- self.distinct_values[key][param],
- )
- ),
+ logger.debug(
+ "{} can be fitted for param {} on {}".format(
+ key,
+ param,
+ list(
+ filter(
+ lambda n: is_numeric(n),
+ self.distinct_values[key][param],
+ )
+ ),
+ )
)
return True
return False
- def static_submodel_params(self, state_or_tran, attribute):
- """
- Return the union of all parameter values which decide whether another parameter influences the model or not.
-
- I.e., the returned list of dicts contains one entry for each parameter value combination which (probably) does not have any parameter influencing the model.
- If the current parameters matches one of these, a static sub-model built based on this subset of parameters can likely be used.
- """
- # TODO
- pass
-
- def has_codependent_parameters(
- self, state_or_tran: str, attribute: str, param: str
- ) -> bool:
- """
- Return whether there are parameters which determine whether `param` influences `state_or_tran` `attribute` or not.
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- :param param: parameter name
- """
- if len(self.codependent_parameters(state_or_tran, attribute, param)):
- return True
- return False
-
- def codependent_parameters(
- self, state_or_tran: str, attribute: str, param: str
- ) -> list:
- """
- Return list of parameters which determine whether `param` influences `state_or_tran` `attribute` or not.
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- :param param: parameter name
- """
- if self.stats[state_or_tran][attribute]["depends_on_param"][param]:
- return self.stats[state_or_tran][attribute]["param_data"][param][
- "codependent_parameters"
- ]
- return list()
-
- def has_codependent_parameters_union(
- self, state_or_tran: str, attribute: str
- ) -> bool:
- """
- Return whether there is a subset of parameters which decides whether `state_or_tran` `attribute` is static or parameter-dependent
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- """
- depends_on_a_parameter = False
- for param in self._parameter_names:
- if self.stats[state_or_tran][attribute]["depends_on_param"][param]:
- print("{}/{} depends on {}".format(state_or_tran, attribute, param))
- depends_on_a_parameter = True
- if (
- len(self.codependent_parameters(state_or_tran, attribute, param))
- == 0
- ):
- print("has no codependent parameters")
- # Always depends on this parameter, regardless of other parameters' values
- return False
- return depends_on_a_parameter
-
- def codependent_parameters_union(self, state_or_tran: str, attribute: str) -> list:
- """
- Return list of parameters which determine whether any parameter influences `state_or_tran` `attribute`.
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- """
- codependent_parameters = set()
- for param in self._parameter_names:
- if self.stats[state_or_tran][attribute]["depends_on_param"][param]:
- if (
- len(self.codependent_parameters(state_or_tran, attribute, param))
- == 0
- ):
- return list(self._parameter_names)
- for codependent_param in self.codependent_parameters(
- state_or_tran, attribute, param
- ):
- codependent_parameters.add(codependent_param)
- return sorted(codependent_parameters)
-
- def codependence_by_codependent_param_values(
- self, state_or_tran: str, attribute: str, param: str
- ) -> dict:
- """
- Return dict mapping codependent parameter values to a boolean indicating whether `param` influences `state_or_tran` `attribute`.
-
- If a dict value is true, `attribute` depends on `param` for the corresponding codependent parameter values, otherwise it does not.
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- :param param: parameter name
- """
- if self.stats[state_or_tran][attribute]["depends_on_param"][param]:
- return self.stats[state_or_tran][attribute]["param_data"][param][
- "depends_for_codependent_value"
- ]
- return dict()
-
- def codependent_parameter_value_dicts(
- self, state_or_tran: str, attribute: str, param: str, kind="dynamic"
- ):
- """
- Return dicts of codependent parameter key-value mappings for which `param` influences (or does not influence) `state_or_tran` `attribute`.
-
- :param state_or_tran: model state or transition
- :param attribute: model attribute
- :param param: parameter name:
- :param kind: 'static' or 'dynamic'. If 'dynamic' (the default), returns codependent parameter values for which `param` influences `attribute`. If 'static', returns codependent parameter values for which `param` does not influence `attribute`
- """
- codependent_parameters = self.stats[state_or_tran][attribute]["param_data"][
- param
- ]["codependent_parameters"]
- codependence_info = self.stats[state_or_tran][attribute]["param_data"][param][
- "depends_for_codependent_value"
- ]
- if len(codependent_parameters) == 0:
- return
- else:
- for param_values, is_dynamic in codependence_info.items():
- if (is_dynamic and kind == "dynamic") or (
- not is_dynamic and kind == "static"
- ):
- yield dict(zip(codependent_parameters, param_values))
-
def _generic_param_independence_ratio(self, state_or_trans, attribute):
"""
Return the heuristic ratio of parameter independence for state_or_trans and attribute.
diff --git a/lib/protocol_benchmarks.py b/lib/protocol_benchmarks.py
index b42e821..d41979f 100755
--- a/lib/protocol_benchmarks.py
+++ b/lib/protocol_benchmarks.py
@@ -16,8 +16,11 @@ import io
import os
import re
import time
+import logging
from filelock import FileLock
+logger = logging.getLogger(__name__)
+
class DummyProtocol:
def __init__(self):
@@ -1838,14 +1841,14 @@ class Benchmark:
this_result["data"] = data
if value != None:
this_result[key] = {"v": value, "ts": int(time.time())}
- print(
+ logger.debug(
"{} {} {} ({}) :: {} -> {}".format(
libkey, bench_name, bench_index, data, key, value
)
)
else:
this_result[key] = {"e": error, "ts": int(time.time())}
- print(
+ logger.debug(
"{} {} {} ({}) :: {} -> [E] {}".format(
libkey, bench_name, bench_index, data, key, error[:500]
)
diff --git a/lib/runner.py b/lib/runner.py
index 16f0a29..77b7c68 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -31,7 +31,8 @@ class SerialReader(serial.threaded.Protocol):
"""Create a new SerialReader object."""
self.callback = callback
self.recv_buf = ""
- self.lines = []
+ self.lines = list()
+ self.all_lines = list()
def __call__(self):
return self
@@ -47,7 +48,9 @@ class SerialReader(serial.threaded.Protocol):
# Note: Do not call str.strip on lines[-1]! Otherwise, lines may be mangled
lines = self.recv_buf.split("\n")
if len(lines) > 1:
- self.lines.extend(map(str.strip, lines[:-1]))
+ new_lines = list(map(str.strip, lines[:-1]))
+ self.lines.extend(new_lines)
+ self.all_lines.extend(new_lines)
self.recv_buf = lines[-1]
if self.callback:
for line in lines[:-1]:
@@ -120,7 +123,7 @@ class SerialMonitor:
return self.reader.get_lines()
def get_lines(self) -> list:
- return self.reader.get_lines()
+ return self.reader.all_lines
def get_files(self) -> list:
return list()
@@ -143,6 +146,9 @@ class SerialMonitor:
class EnergyTraceMonitor(SerialMonitor):
"""EnergyTraceMonitor captures serial timing output and EnergyTrace energy data."""
+ # Additional key-value arguments passed via generate-dfa-benchmark.py --energytrace=... end up here
+ # (e.g. --energytrace=var1=bar,somecount=2 => EnergyTraceMonitor(..., var1="bar", somecount="2")).
+ # The measurement starts as soon as the EnergyTraceMonitor object is created (i.e., here: msp430-etv is launched).
def __init__(self, port: str, baud: int, callback=None, voltage=3.3):
super().__init__(port=port, baud=baud, callback=callback)
self._voltage = voltage
@@ -155,20 +161,31 @@ class EnergyTraceMonitor(SerialMonitor):
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
+ # Benchmark finished -> terminate external helper programs
def close(self):
super().close()
self._logger.send_signal(subprocess.signal.SIGINT)
stdout, stderr = self._logger.communicate(timeout=15)
+ # Additional files which should be stored alongside the benchmark log and plan
+ # (here: the log files generated by msp430-etv)
def get_files(self) -> list:
return [self._output]
+ #
def get_config(self) -> dict:
return {
"voltage": self._voltage,
}
+class EnergyTraceLogicAnalyzerMonitor(EnergyTraceMonitor):
+ """EnergyTraceLogicAnalyzerMonitor captures EnergyTrace energy data and LogicAnalyzer timing output."""
+
+ def __init__(self, port: str, baud: int, callback=None, voltage=3.3):
+ super().__init__(port=port, baud=baud, callback=callback, voltage=voltage)
+
+
class MIMOSAMonitor(SerialMonitor):
"""MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time."""
@@ -362,8 +379,14 @@ def get_monitor(arch: str, **kwargs) -> object:
mimosa_kwargs = kwargs.pop("mimosa")
return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs)
elif "energytrace" in kwargs and kwargs["energytrace"] is not None:
- energytrace_kwargs = kwargs.pop("energytrace")
- return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs)
+ energytrace_kwargs = kwargs.pop("energytrace").copy()
+ sync_mode = energytrace_kwargs.pop("sync")
+ if sync_mode == "la":
+ return EnergyTraceLogicAnalyzerMonitor(
+ port, arg, **energytrace_kwargs, **kwargs
+ )
+ else:
+ return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs)
else:
kwargs.pop("energytrace", None)
kwargs.pop("mimosa", None)
@@ -382,6 +405,23 @@ def get_counter_limits(arch: str) -> tuple:
raise RuntimeError("Did not find Counter Overflow limits")
+def sleep_ms(duration: int, arch: str, cpu_freq: int = None) -> str:
+ max_sleep = None
+ if "msp430fr" in arch:
+ if cpu_freq is not None and cpu_freq > 8000000:
+ max_sleep = 250
+ else:
+ max_sleep = 500
+ if max_sleep is not None and duration > max_sleep:
+ sub_sleep_count = duration // max_sleep
+ tail_sleep = duration % max_sleep
+ ret = f"for (unsigned char i = 0; i < {sub_sleep_count}; i++) {{ arch.sleep_ms({max_sleep}); }}\n"
+ if tail_sleep > 0:
+ ret += f"arch.sleep_ms({tail_sleep});\n"
+ return ret
+ return "arch.sleep_ms({duration});\n"
+
+
def get_counter_limits_us(arch: str) -> tuple:
"""Return duration of one counter step and one counter overflow in us."""
cpu_freq = 0
diff --git a/lib/utils.py b/lib/utils.py
index 91dded0..d28ecda 100644
--- a/lib/utils.py
+++ b/lib/utils.py
@@ -1,17 +1,9 @@
import numpy as np
import re
+import logging
arg_support_enabled = True
-
-
-def vprint(verbose, string):
- """
- Print `string` if `verbose`.
-
- Prints string if verbose is a True value
- """
- if verbose:
- print(string)
+logger = logging.getLogger(__name__)
def running_mean(x: np.ndarray, N: int) -> np.ndarray:
@@ -222,7 +214,7 @@ def filter_aggregate_by_param(aggregate, parameters, parameter_filter):
)
)
if len(indices_to_keep) == 0:
- print("??? {}->{}".format(parameter_filter, name))
+ logger.debug("??? {}->{}".format(parameter_filter, name))
names_to_remove.add(name)
else:
for attribute in aggregate[name]["attributes"]:
diff --git a/lib/validation.py b/lib/validation.py
new file mode 100644
index 0000000..ee147fe
--- /dev/null
+++ b/lib/validation.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python3
+
+import logging
+import numpy as np
+
+logger = logging.getLogger(__name__)
+
+
+def _xv_partitions_kfold(length, k=10):
+ """
+ Return k pairs of training and validation sets for k-fold cross-validation on `length` items.
+
+ In k-fold cross-validation, every k-th item is used for validation and the remainder is used for training.
+ As there are k ways to do this (items 0, k, 2k, ... vs. items 1, k+1, 2k+1, ... etc), this function returns k pairs of training and validation set.
+
+ Note that this function operates on indices, not data.
+ """
+ pairs = []
+ num_slices = k
+ indexes = np.arange(length)
+ for i in range(num_slices):
+ training = np.delete(indexes, slice(i, None, num_slices))
+ validation = indexes[i::num_slices]
+ pairs.append((training, validation))
+ return pairs
+
+
+def _xv_partition_montecarlo(length):
+ """
+ Return training and validation set for Monte Carlo cross-validation on `length` items.
+
+ This function operates on indices, not data. It randomly partitions range(length) into a list of training indices and a list of validation indices.
+
+ The training set contains 2/3 of all indices; the validation set consists of the remaining 1/3.
+
+ Example: 9 items -> training = [7, 3, 8, 0, 4, 2], validation = [1, 6, 5]
+ """
+ shuffled = np.random.permutation(np.arange(length))
+ border = int(length * float(2) / 3)
+ training = shuffled[:border]
+ validation = shuffled[border:]
+ return (training, validation)
+
+
+class CrossValidator:
+ """
+ Cross-Validation helper for model generation.
+
+ Given a set of measurements and a model class, it will partition the
+ data into training and validation sets, train the model on the training
+ set, and assess its quality on the validation set. This is repeated
+ several times depending on cross-validation algorithm and configuration.
+ Reports the mean model error over all cross-validation runs.
+ """
+
+ def __init__(self, model_class, by_name, parameters, arg_count):
+ """
+ Create a new CrossValidator object.
+
+ Does not perform cross-validation yet.
+
+ arguments:
+ model_class -- model class/type used for model synthesis,
+ e.g. PTAModel or AnalyticModel. model_class must have a
+ constructor accepting (by_name, parameters, arg_count)
+ and provide an `assess` method.
+ by_name -- measurements aggregated by state/transition/function/... name.
+ Layout: by_name[name][attribute] = list of data. Additionally,
+ by_name[name]['attributes'] must be set to the list of attributes,
+ e.g. ['power'] or ['duration', 'energy'].
+ """
+ self.model_class = model_class
+ self.by_name = by_name
+ self.names = sorted(by_name.keys())
+ self.parameters = sorted(parameters)
+ self.arg_count = arg_count
+
+ def kfold(self, model_getter, k=10):
+ """
+ Perform k-fold cross-validation and return average model quality.
+
+ The by_name data is divided into 1-1/k training and 1/k validation in a deterministic manner.
+ After creating a model for the training set, the
+ model type returned by model_getter is evaluated on the validation set.
+ This is repeated k times; the average of all measures is returned to the user.
+
+ arguments:
+ model_getter -- function with signature (model_object) -> model,
+ e.g. lambda m: m.get_fitted()[0] to evaluate the parameter-aware
+ model with automatic parameter detection.
+ k -- step size for k-fold cross-validation. The validation set contains 100/k % of data.
+
+ return value:
+ dict of model quality measures.
+ {
+ 'by_name' : {
+ for each name: {
+ for each attribute: {
+ 'mae' : mean of all mean absolute errors
+ 'mae_list' : list of the individual MAE values encountered during cross-validation
+ 'smape' : mean of all symmetric mean absolute percentage errors
+ 'smape_list' : list of the individual SMAPE values encountered during cross-validation
+ }
+ }
+ }
+ }
+ """
+
+ # training / validation subsets for each state and transition
+ subsets_by_name = dict()
+ training_and_validation_sets = list()
+
+ for name in self.names:
+ sample_count = len(self.by_name[name]["param"])
+ subsets_by_name[name] = list()
+ subsets_by_name[name] = _xv_partitions_kfold(sample_count, k)
+
+ for i in range(k):
+ training_and_validation_sets.append(dict())
+ for name in self.names:
+ training_and_validation_sets[i][name] = subsets_by_name[name][i]
+
+ return self._generic_xv(model_getter, training_and_validation_sets)
+
+ def montecarlo(self, model_getter, count=200):
+ """
+ Perform Monte Carlo cross-validation and return average model quality.
+
+ The by_name data is randomly divided into 2/3 training and 1/3
+ validation. After creating a model for the training set, the
+ model type returned by model_getter is evaluated on the validation set.
+ This is repeated count times (defaulting to 200); the average of all
+ measures is returned to the user.
+
+ arguments:
+ model_getter -- function with signature (model_object) -> model,
+ e.g. lambda m: m.get_fitted()[0] to evaluate the parameter-aware
+ model with automatic parameter detection.
+ count -- number of validation runs to perform, defaults to 200
+
+ return value:
+ dict of model quality measures.
+ {
+ 'by_name' : {
+ for each name: {
+ for each attribute: {
+ 'mae' : mean of all mean absolute errors
+ 'mae_list' : list of the individual MAE values encountered during cross-validation
+ 'smape' : mean of all symmetric mean absolute percentage errors
+ 'smape_list' : list of the individual SMAPE values encountered during cross-validation
+ }
+ }
+ }
+ }
+ """
+
+ # training / validation subsets for each state and transition
+ subsets_by_name = dict()
+ training_and_validation_sets = list()
+
+ for name in self.names:
+ sample_count = len(self.by_name[name]["param"])
+ subsets_by_name[name] = list()
+ for _ in range(count):
+ subsets_by_name[name].append(_xv_partition_montecarlo(sample_count))
+
+ for i in range(count):
+ training_and_validation_sets.append(dict())
+ for name in self.names:
+ training_and_validation_sets[i][name] = subsets_by_name[name][i]
+
+ return self._generic_xv(model_getter, training_and_validation_sets)
+
+ def _generic_xv(self, model_getter, training_and_validation_sets):
+ ret = {"by_name": dict()}
+
+ for name in self.names:
+ ret["by_name"][name] = dict()
+ for attribute in self.by_name[name]["attributes"]:
+ ret["by_name"][name][attribute] = {
+ "mae_list": list(),
+ "rmsd_list": list(),
+ "smape_list": list(),
+ }
+
+ for training_and_validation_by_name in training_and_validation_sets:
+ res = self._single_xv(model_getter, training_and_validation_by_name)
+ for name in self.names:
+ for attribute in self.by_name[name]["attributes"]:
+ for measure in ("mae", "rmsd", "smape"):
+ ret["by_name"][name][attribute][f"{measure}_list"].append(
+ res["by_name"][name][attribute][measure]
+ )
+
+ for name in self.names:
+ for attribute in self.by_name[name]["attributes"]:
+ for measure in ("mae", "rmsd", "smape"):
+ ret["by_name"][name][attribute][measure] = np.mean(
+ ret["by_name"][name][attribute][f"{measure}_list"]
+ )
+
+ return ret
+
+ def _single_xv(self, model_getter, tv_set_dict):
+ training = dict()
+ validation = dict()
+ for name in self.names:
+ training[name] = {"attributes": self.by_name[name]["attributes"]}
+ validation[name] = {"attributes": self.by_name[name]["attributes"]}
+
+ if "isa" in self.by_name[name]:
+ training[name]["isa"] = self.by_name[name]["isa"]
+ validation[name]["isa"] = self.by_name[name]["isa"]
+
+ training_subset, validation_subset = tv_set_dict[name]
+
+ for attribute in self.by_name[name]["attributes"]:
+ self.by_name[name][attribute] = np.array(self.by_name[name][attribute])
+ training[name][attribute] = self.by_name[name][attribute][
+ training_subset
+ ]
+ validation[name][attribute] = self.by_name[name][attribute][
+ validation_subset
+ ]
+
+ # We can't use slice syntax for 'param', which may contain strings and other odd values
+ training[name]["param"] = list()
+ validation[name]["param"] = list()
+ for idx in training_subset:
+ training[name]["param"].append(self.by_name[name]["param"][idx])
+ for idx in validation_subset:
+ validation[name]["param"].append(self.by_name[name]["param"][idx])
+
+ training_data = self.model_class(training, self.parameters, self.arg_count)
+ training_model = model_getter(training_data)
+ validation_data = self.model_class(validation, self.parameters, self.arg_count)
+
+ return validation_data.assess(training_model)