diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2020-09-07 14:57:39 +0200 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2020-09-07 14:57:39 +0200 |
commit | 8b969f4945e97d811b7a5b27c99b76cf2dd2840b (patch) | |
tree | 68b402ae63953d51d5d10308003b7ce0ea04db71 | |
parent | 160546a8b11a26c1c56f26b8eff68e455fa9ca1e (diff) | |
parent | ab33810fa92f8a262695077ae9504c836cd3c1a2 (diff) |
Merge branch 'master' into decisiontrees
-rwxr-xr-x | bin/analyze-archive.py | 60 | ||||
-rwxr-xr-x | bin/generate-dfa-benchmark.py | 43 | ||||
-rw-r--r-- | lib/loader.py | 6 | ||||
-rw-r--r-- | lib/model.py | 8 | ||||
-rw-r--r-- | lib/runner.py | 242 | ||||
-rw-r--r-- | lib/validation.py | 21 | ||||
-rwxr-xr-x | test/test_ptamodel.py | 465 |
7 files changed, 687 insertions, 158 deletions
diff --git a/bin/analyze-archive.py b/bin/analyze-archive.py index 4c442af..5a6b8f0 100755 --- a/bin/analyze-archive.py +++ b/bin/analyze-archive.py @@ -101,6 +101,9 @@ Options: --export-energymodel=<model.json> Export energy model. Works out of the box for v1 and v2 logfiles. Requires --hwmodel for v0 logfiles. + +--no-cache + Do not load cached measurement results """ import getopt @@ -142,6 +145,15 @@ def format_quality_measures(result): def model_quality_table(result_lists, info_list): + print( + "{:20s} {:15s} {:19s} {:19s} {:19s}".format( + "key", + "attribute", + "static".center(19), + "parameterized".center(19), + "LUT".center(19), + ) + ) for state_or_tran in result_lists[0]["by_name"].keys(): for key in result_lists[0]["by_name"][state_or_tran].keys(): buf = "{:20s} {:15s}".format(state_or_tran, key) @@ -152,7 +164,7 @@ def model_quality_table(result_lists, info_list): result = results["by_name"][state_or_tran][key] buf += format_quality_measures(result) else: - buf += "{:6}----{:9}".format("", "") + buf += "{:7}----{:8}".format("", "") print(buf) @@ -300,7 +312,7 @@ if __name__ == "__main__": try: optspec = ( - "info " + "info no-cache " "plot-unparam= plot-param= plot-traces= show-models= show-quality= " "ignored-trace-indexes= function-override= " "export-traces= " @@ -362,11 +374,18 @@ if __name__ == "__main__": sys.exit(2) raw_data = RawData( - args, with_traces=("export-traces" in opt or "plot-traces" in opt) + args, + with_traces=("export-traces" in opt or "plot-traces" in opt), + skip_cache=("no-cache" in opt), ) if "info" in opt: print(" ".join(raw_data.filenames) + ":") + if raw_data.ptalog: + options = " --".join( + map(lambda kv: f"{kv[0]}={str(kv[1])}", raw_data.ptalog["opt"].items()) + ) + print(f" Options: --{options}") if raw_data.version <= 1: data_source = "MIMOSA" elif raw_data.version == 2: @@ -420,7 +439,7 @@ if __name__ == "__main__": ) sys.exit(2) - if len(traces) > 20: + if len(traces) > 40: print(f"""Truncating plot to 40 of {len(traces)} traces (random sample)""") traces = random.sample(traces, 40) @@ -693,7 +712,7 @@ if __name__ == "__main__": ) if "overall" in show_quality or "all" in show_quality: - print("overall static/param/lut MAE assuming equal state distribution:") + print("overall state static/param/lut MAE assuming equal state distribution:") print( " {:6.1f} / {:6.1f} / {:6.1f} µW".format( model.assess_states(static_model), @@ -701,15 +720,30 @@ if __name__ == "__main__": model.assess_states(lut_model), ) ) - print("overall static/param/lut MAE assuming 95% STANDBY1:") - distrib = {"STANDBY1": 0.95, "POWERDOWN": 0.03, "TX": 0.01, "RX": 0.01} - print( - " {:6.1f} / {:6.1f} / {:6.1f} µW".format( - model.assess_states(static_model, distribution=distrib), - model.assess_states(param_model, distribution=distrib), - model.assess_states(lut_model, distribution=distrib), + distrib = dict() + num_states = len(model.states()) + p95_state = None + for state in model.states(): + distrib[state] = 1.0 / num_states + + if "STANDBY1" in model.states(): + p95_state = "STANDBY1" + elif "SLEEP" in model.states(): + p95_state = "SLEEP" + + if p95_state is not None: + for state in distrib.keys(): + distrib[state] = 0.05 / (num_states - 1) + distrib[p95_state] = 0.95 + + print(f"overall state static/param/lut MAE assuming 95% {p95_state}:") + print( + " {:6.1f} / {:6.1f} / {:6.1f} µW".format( + model.assess_states(static_model, distribution=distrib), + model.assess_states(param_model, distribution=distrib), + model.assess_states(lut_model, distribution=distrib), + ) ) - ) if "summary" in show_quality or "all" in show_quality: model_summary_table( diff --git a/bin/generate-dfa-benchmark.py b/bin/generate-dfa-benchmark.py index 1410c28..64f8f73 100755 --- a/bin/generate-dfa-benchmark.py +++ b/bin/generate-dfa-benchmark.py @@ -223,17 +223,11 @@ def benchmark_from_runs( ) elif opt["sleep"]: if "energytrace" in opt: - outbuf.write( - "arch.sleep_ms({:d}); // {}\n".format( - opt["sleep"], transition.destination.name - ) - ) + outbuf.write(f"// -> {transition.destination.name}\n") + outbuf.write(target.sleep_ms(opt["sleep"])) else: - outbuf.write( - "arch.delay_ms({:d}); // {}\n".format( - opt["sleep"], transition.destination.name - ) - ) + outbuf.write(f"// -> {transition.destination.name}\n") + outbuf.write("arch.delay_ms({:d});\n".format(opt["sleep"])) outbuf.write(harness.stop_run(num_traces)) if dummy: @@ -289,7 +283,7 @@ def run_benchmark( needs_split = True else: try: - runner.build(arch, app, run_args) + target.build(app, run_args) except RuntimeError: if len(runs) > 50: # Application is too large -> split up runs @@ -342,14 +336,14 @@ def run_benchmark( i = 0 while i < opt["repeat"]: print(f"""[RUN] flashing benchmark {i+1}/{opt["repeat"]}""") - runner.flash(arch, app, run_args) + target.flash(app, run_args) if "mimosa" in opt: - monitor = runner.get_monitor( - arch, callback=harness.parser_cb, mimosa=opt["mimosa"] + monitor = target.get_monitor( + callback=harness.parser_cb, mimosa=opt["mimosa"] ) elif "energytrace" in opt: - monitor = runner.get_monitor( - arch, callback=harness.parser_cb, energytrace=opt["energytrace"] + monitor = target.get_monitor( + callback=harness.parser_cb, energytrace=opt["energytrace"] ) sync_error = False @@ -400,8 +394,8 @@ def run_benchmark( return [(runs, harness, monitor, files)] else: - runner.flash(arch, app, run_args) - monitor = runner.get_monitor(arch, callback=harness.parser_cb) + target.flash(app, run_args) + monitor = target.get_monitor(callback=harness.parser_cb) if arch == "posix": print("[RUN] Will run benchmark for {:.0f} seconds".format(run_timeout)) @@ -518,6 +512,11 @@ if __name__ == "__main__": print(err) sys.exit(2) + if "msp430fr" in opt["arch"]: + target = runner.Arch(opt["arch"], ["cpu_freq=8000000"]) + else: + target = runner.Arch(opt["arch"]) + modelfile = args[0] pta = PTA.from_file(modelfile) @@ -594,8 +593,8 @@ if __name__ == "__main__": if "codegen" in driver_definition and "flags" in driver_definition["codegen"]: if run_flags is None: run_flags = driver_definition["codegen"]["flags"] - if run_flags is None: - run_flags = opt["run"].split() + if "run" in opt: + run_flags.extend(opt["run"].split()) runs = list( pta.dfs( @@ -642,7 +641,7 @@ if __name__ == "__main__": gpio_pin=timer_pin, gpio_mode=gpio_mode, pta=pta, - counter_limits=runner.get_counter_limits_us(opt["arch"]), + counter_limits=target.get_counter_limits_us(run_flags), log_return_values=need_return_values, repeat=1, ) @@ -650,7 +649,7 @@ if __name__ == "__main__": harness = OnboardTimerHarness( gpio_pin=timer_pin, pta=pta, - counter_limits=runner.get_counter_limits_us(opt["arch"]), + counter_limits=target.get_counter_limits_us(run_flags), log_return_values=need_return_values, repeat=opt["repeat"], ) diff --git a/lib/loader.py b/lib/loader.py index c35eb4c..57b3d30 100644 --- a/lib/loader.py +++ b/lib/loader.py @@ -242,7 +242,7 @@ class RawData: file system, making subsequent loads near-instant. """ - def __init__(self, filenames, with_traces=False): + def __init__(self, filenames, with_traces=False, skip_cache=False): """ Create a new RawData object. @@ -321,7 +321,7 @@ class RawData: self.pta = self.ptalog["pta"] self.set_cache_file() - if not with_traces: + if not with_traces and not skip_cache: self.load_cache() def set_cache_file(self): @@ -481,7 +481,7 @@ class RawData: if sorted(online_trace_part["parameter"].keys()) != self._parameter_names: processed_data[ "error" - ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want:s}, is {param_is:s}".format( + ] = "Offline #{off_idx:d} (online {on_name:s} @ {on_idx:d}/{on_sub:d}) has inconsistent parameter set: should be {param_want}, is {param_is}".format( off_idx=offline_idx, on_idx=online_run_idx, on_sub=online_trace_part_idx, diff --git a/lib/model.py b/lib/model.py index f53f645..41cf726 100644 --- a/lib/model.py +++ b/lib/model.py @@ -5,6 +5,7 @@ import numpy as np from scipy import optimize from sklearn.metrics import r2_score from multiprocessing import Pool +from .automata import PTA from .functions import analytic from .functions import AnalyticFunction from .parameters import ParamStats @@ -1111,13 +1112,16 @@ class PTAModel: static_quality = self.assess(static_model) param_model, param_info = self.get_fitted() analytic_quality = self.assess(param_model) - self.pta.update( + pta = self.pta + if pta is None: + pta = PTA(self.states(), parameters=self._parameter_names) + pta.update( static_model, param_info, static_error=static_quality["by_name"], analytic_error=analytic_quality["by_name"], ) - return self.pta.to_json() + return pta.to_json() def states(self): """Return sorted list of state names.""" diff --git a/lib/runner.py b/lib/runner.py index aeb8600..71ca799 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -311,113 +311,157 @@ class ShellMonitor: pass -def build(arch, app, opts=[]): - command = ["make", "arch={}".format(arch), "app={}".format(app), "clean"] - command.extend(opts) - res = subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True - ) - if res.returncode != 0: - raise RuntimeError( - "Build failure, executing {}:\n".format(command) + res.stderr +class Arch: + def __init__(self, name, opts=list()): + self.name = name + self.opts = opts + self.info = self.get_info() + + def build(self, app, opts=list()): + command = ["make", "arch={}".format(self.name), "app={}".format(app), "clean"] + command.extend(self.opts) + command.extend(opts) + res = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, ) - command = ["make", "-B", "arch={}".format(arch), "app={}".format(app)] - command.extend(opts) - res = subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True - ) - if res.returncode != 0: - raise RuntimeError( - "Build failure, executing {}:\n ".format(command) + res.stderr + if res.returncode != 0: + raise RuntimeError( + "Build failure, executing {}:\n".format(command) + res.stderr + ) + command = ["make", "-B", "arch={}".format(self.name), "app={}".format(app)] + command.extend(self.opts) + command.extend(opts) + res = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, ) - return command - - -def flash(arch, app, opts=[]): - command = ["make", "arch={}".format(arch), "app={}".format(app), "program"] - command.extend(opts) - res = subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True - ) - if res.returncode != 0: - raise RuntimeError("Flash failure") - return command + if res.returncode != 0: + raise RuntimeError( + "Build failure, executing {}:\n ".format(command) + res.stderr + ) + return command + def flash(self, app, opts=list()): + command = ["make", "arch={}".format(self.name), "app={}".format(app), "program"] + command.extend(self.opts) + command.extend(opts) + res = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + if res.returncode != 0: + raise RuntimeError("Flash failure") + return command -def get_info(arch, opts: list = []) -> list: - """ - Return multipass "make info" output. + def get_info(self, opts=list()) -> list: + """ + Return multipass "make info" output. - Returns a list. - """ - command = ["make", "arch={}".format(arch), "info"] - command.extend(opts) - res = subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True - ) - if res.returncode != 0: - raise RuntimeError("make info Failure") - return res.stdout.split("\n") + Returns a list. + """ + command = ["make", "arch={}".format(self.name), "info"] + command.extend(self.opts) + command.extend(opts) + res = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + if res.returncode != 0: + raise RuntimeError("make info Failure") + return res.stdout.split("\n") + def _cached_info(self, opts=list()) -> list: + if len(opts): + return self.get_info(opts) + return self.info -def get_monitor(arch: str, **kwargs) -> object: - """ - Return an appropriate monitor for arch, depending on "make info" output. + def get_monitor(self, **kwargs) -> object: + """ + Return an appropriate monitor for arch, depending on "make info" output. - Port and Baud rate are taken from "make info". + Port and Baud rate are taken from "make info". - :param arch: architecture name, e.g. 'msp430fr5994lp' or 'posix' - :param energytrace: `EnergyTraceMonitor` options. Returns an EnergyTrace monitor if not None. - :param mimosa: `MIMOSAMonitor` options. Returns a MIMOSA monitor if not None. - """ - for line in get_info(arch): - if "Monitor:" in line: - _, port, arg = line.split(" ") - if port == "run": - return ShellMonitor(arg, **kwargs) - elif "mimosa" in kwargs and kwargs["mimosa"] is not None: - mimosa_kwargs = kwargs.pop("mimosa") - return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs) - elif "energytrace" in kwargs and kwargs["energytrace"] is not None: - energytrace_kwargs = kwargs.pop("energytrace").copy() - sync_mode = energytrace_kwargs.pop("sync") - if sync_mode == "la": - return EnergyTraceLogicAnalyzerMonitor( - port, arg, **energytrace_kwargs, **kwargs - ) + :param energytrace: `EnergyTraceMonitor` options. Returns an EnergyTrace monitor if not None. + :param mimosa: `MIMOSAMonitor` options. Returns a MIMOSA monitor if not None. + """ + for line in self.info: + if "Monitor:" in line: + _, port, arg = line.split(" ") + if port == "run": + return ShellMonitor(arg, **kwargs) + elif "mimosa" in kwargs and kwargs["mimosa"] is not None: + mimosa_kwargs = kwargs.pop("mimosa") + return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs) + elif "energytrace" in kwargs and kwargs["energytrace"] is not None: + energytrace_kwargs = kwargs.pop("energytrace").copy() + sync_mode = energytrace_kwargs.pop("sync") + if sync_mode == "la": + return EnergyTraceLogicAnalyzerMonitor( + port, arg, **energytrace_kwargs, **kwargs + ) + else: + return EnergyTraceMonitor( + port, arg, **energytrace_kwargs, **kwargs + ) else: - return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs) + kwargs.pop("energytrace", None) + kwargs.pop("mimosa", None) + return SerialMonitor(port, arg, **kwargs) + raise RuntimeError("Monitor failure") + + def get_counter_limits(self, opts=list()) -> tuple: + """Return multipass max counter and max overflow value for arch.""" + for line in self._cached_info(opts): + match = re.match("Counter Overflow: ([^/]*)/(.*)", line) + if match: + overflow_value = int(match.group(1)) + max_overflow = int(match.group(2)) + return overflow_value, max_overflow + raise RuntimeError("Did not find Counter Overflow limits") + + def sleep_ms(self, duration: int, opts=list()) -> str: + max_sleep = None + if "msp430fr" in self.name: + cpu_freq = None + for line in self._cached_info(opts): + match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line) + if match: + cpu_freq = int(match.group(1)) + if cpu_freq is not None and cpu_freq > 8000000: + max_sleep = 250 else: - kwargs.pop("energytrace", None) - kwargs.pop("mimosa", None) - return SerialMonitor(port, arg, **kwargs) - raise RuntimeError("Monitor failure") - - -def get_counter_limits(arch: str) -> tuple: - """Return multipass max counter and max overflow value for arch.""" - for line in get_info(arch): - match = re.match("Counter Overflow: ([^/]*)/(.*)", line) - if match: - overflow_value = int(match.group(1)) - max_overflow = int(match.group(2)) - return overflow_value, max_overflow - raise RuntimeError("Did not find Counter Overflow limits") - - -def get_counter_limits_us(arch: str) -> tuple: - """Return duration of one counter step and one counter overflow in us.""" - cpu_freq = 0 - overflow_value = 0 - max_overflow = 0 - for line in get_info(arch): - match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line) - if match: - cpu_freq = int(match.group(1)) - match = re.match(r"Counter Overflow:\s+([^/]*)/(.*)", line) - if match: - overflow_value = int(match.group(1)) - max_overflow = int(match.group(2)) - if cpu_freq and overflow_value: - return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow - raise RuntimeError("Did not find Counter Overflow limits") + max_sleep = 500 + if max_sleep is not None and duration > max_sleep: + sub_sleep_count = duration // max_sleep + tail_sleep = duration % max_sleep + ret = f"for (unsigned char i = 0; i < {sub_sleep_count}; i++) {{ arch.sleep_ms({max_sleep}); }}\n" + if tail_sleep > 0: + ret += f"arch.sleep_ms({tail_sleep});\n" + return ret + return f"arch.sleep_ms({duration});\n" + + def get_counter_limits_us(self, opts=list()) -> tuple: + """Return duration of one counter step and one counter overflow in us.""" + cpu_freq = 0 + overflow_value = 0 + max_overflow = 0 + for line in self._cached_info(opts): + match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line) + if match: + cpu_freq = int(match.group(1)) + match = re.match(r"Counter Overflow:\s+([^/]*)/(.*)", line) + if match: + overflow_value = int(match.group(1)) + max_overflow = int(match.group(2)) + if cpu_freq and overflow_value: + return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow + raise RuntimeError("Did not find Counter Overflow limits") diff --git a/lib/validation.py b/lib/validation.py index 98d49c1..ee147fe 100644 --- a/lib/validation.py +++ b/lib/validation.py @@ -179,6 +179,7 @@ class CrossValidator: for attribute in self.by_name[name]["attributes"]: ret["by_name"][name][attribute] = { "mae_list": list(), + "rmsd_list": list(), "smape_list": list(), } @@ -186,21 +187,17 @@ class CrossValidator: res = self._single_xv(model_getter, training_and_validation_by_name) for name in self.names: for attribute in self.by_name[name]["attributes"]: - ret["by_name"][name][attribute]["mae_list"].append( - res["by_name"][name][attribute]["mae"] - ) - ret["by_name"][name][attribute]["smape_list"].append( - res["by_name"][name][attribute]["smape"] - ) + for measure in ("mae", "rmsd", "smape"): + ret["by_name"][name][attribute][f"{measure}_list"].append( + res["by_name"][name][attribute][measure] + ) for name in self.names: for attribute in self.by_name[name]["attributes"]: - ret["by_name"][name][attribute]["mae"] = np.mean( - ret["by_name"][name][attribute]["mae_list"] - ) - ret["by_name"][name][attribute]["smape"] = np.mean( - ret["by_name"][name][attribute]["smape_list"] - ) + for measure in ("mae", "rmsd", "smape"): + ret["by_name"][name][attribute][measure] = np.mean( + ret["by_name"][name][attribute][f"{measure}_list"] + ) return ret diff --git a/test/test_ptamodel.py b/test/test_ptamodel.py index 94ee842..e8905b1 100755 --- a/test/test_ptamodel.py +++ b/test/test_ptamodel.py @@ -2,13 +2,464 @@ from dfatool.loader import RawData, pta_trace_to_aggregate from dfatool.model import PTAModel +from dfatool.utils import by_name_to_by_param +from dfatool.validation import CrossValidator import os import unittest import pytest +import numpy as np -class TestModels(unittest.TestCase): - def test_model_singlefile_rf24(self): + +class TestSynthetic(unittest.TestCase): + def test_model_validation(self): + # rng = np.random.default_rng(seed=1312) # requiresy NumPy >= 1.17 + np.random.seed(1312) + X = np.arange(500) % 50 + parameter_names = ["p_mod5", "p_linear"] + + s1_duration_base = 70 + s1_duration_scale = 2 + s1_power_base = 50 + s1_power_scale = 7 + s2_duration_base = 700 + s2_duration_scale = 1 + s2_power_base = 1500 + s2_power_scale = 10 + + by_name = { + "raw_state_1": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s1_duration_base + + np.random.normal(size=X.size, scale=s1_duration_scale), + "power": s1_power_base + + X + + np.random.normal(size=X.size, scale=s1_power_scale), + "attributes": ["duration", "power"], + }, + "raw_state_2": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s2_duration_base + - 2 * X + + np.random.normal(size=X.size, scale=s2_duration_scale), + "power": s2_power_base + + X + + np.random.normal(size=X.size, scale=s2_power_scale), + "attributes": ["duration", "power"], + }, + } + by_param = by_name_to_by_param(by_name) + model = PTAModel(by_name, parameter_names, dict()) + static_model = model.get_static() + + # x ∈ [0, 50] -> mean(X) is 25 + self.assertAlmostEqual( + static_model("raw_state_1", "duration"), s1_duration_base, places=0 + ) + self.assertAlmostEqual( + static_model("raw_state_1", "power"), s1_power_base + 25, delta=7 + ) + self.assertAlmostEqual( + static_model("raw_state_2", "duration"), s2_duration_base - 2 * 25, delta=2 + ) + self.assertAlmostEqual( + static_model("raw_state_2", "power"), s2_power_base + 25, delta=7 + ) + + param_model, param_info = model.get_fitted() + + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 10]), + s1_duration_base, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 50]), + s1_duration_base, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 70]), + s1_duration_base, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 10]), + s1_power_base + 10, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 50]), + s1_power_base + 50, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 70]), + s1_power_base + 70, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 10]), + s2_duration_base - 2 * 10, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 50]), + s2_duration_base - 2 * 50, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 70]), + s2_duration_base - 2 * 70, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 10]), + s2_power_base + 10, + delta=50, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 50]), + s2_power_base + 50, + delta=50, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 70]), + s2_power_base + 70, + delta=50, + ) + + static_quality = model.assess(static_model) + param_quality = model.assess(param_model) + + # static quality reflects normal distribution scale for non-parameterized data + + # the Root Mean Square Deviation must not be greater the scale (i.e., standard deviation) of the normal distribution + # Low Mean Absolute Error (< 2) + self.assertTrue(static_quality["by_name"]["raw_state_1"]["duration"]["mae"] < 2) + # Low Root Mean Square Deviation (< scale == 2) + self.assertTrue( + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"] < 2 + ) + # Relatively low error percentage (~~ MAE * 100% / s1_duration_base) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["mape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + + # static error is high for parameterized data + + # MAE == mean(abs(actual value - model value)) + # parameter range is [0, 50) -> mean 25, deviation range is [0, 25) -> mean deviation is 12.5 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mae"], 12.5, delta=1 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["rmsd"], 16, delta=2 + ) + # high percentage error due to low s1_power_base + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mape"], 19, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["smape"], 19, delta=2 + ) + + # parameter range is [0, 100) -> mean deviation is 25 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mae"], 25, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], 30, delta=2 + ) + + # low percentage error due to high s2_duration_base (~~ 3.5 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mae"], 12.5, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["rmsd"], 17, delta=2 + ) + + # low percentage error due to high s2_power_base (~~ 1.7 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mape"], + 25 * 100 / s2_power_base, + delta=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["smape"], + 25 * 100 / s2_power_base, + delta=1, + ) + + # raw_state_1/duration does not depend on parameters and delegates to the static model + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mae"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mape"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + ) + + # fitted param-model quality reflects normal distribution scale for all data + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["mape"], 0.9, places=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["mae"] < s1_power_scale + ) + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["rmsd"] < s1_power_scale + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["mape"], 7.5, delta=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["smape"], 7.5, delta=1 + ) + + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mae"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mape"], + 0.12, + delta=0.01, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 0.12, + delta=0.01, + ) + + # ... unless the signal-to-noise ratio (parameter range = [0 .. 50] vs. scale = 10) is bad, leading to + # increased regression errors + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["mae"] < 15) + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["rmsd"] < 18) + + # still: low percentage error due to high s2_power_base + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["mape"], 0.9, places=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + def test_model_crossvalidation_10fold(self): + # rng = np.random.default_rng(seed=1312) # requiresy NumPy >= 1.17 + np.random.seed(1312) + X = np.arange(500) % 50 + parameter_names = ["p_mod5", "p_linear"] + + s1_duration_base = 70 + s1_duration_scale = 2 + s1_power_base = 50 + s1_power_scale = 7 + s2_duration_base = 700 + s2_duration_scale = 1 + s2_power_base = 1500 + s2_power_scale = 10 + + by_name = { + "raw_state_1": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s1_duration_base + + np.random.normal(size=X.size, scale=s1_duration_scale), + "power": s1_power_base + + X + + np.random.normal(size=X.size, scale=s1_power_scale), + "attributes": ["duration", "power"], + }, + "raw_state_2": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s2_duration_base + - 2 * X + + np.random.normal(size=X.size, scale=s2_duration_scale), + "power": s2_power_base + + X + + np.random.normal(size=X.size, scale=s2_power_scale), + "attributes": ["duration", "power"], + }, + } + by_param = by_name_to_by_param(by_name) + arg_count = dict() + model = PTAModel(by_name, parameter_names, arg_count) + validator = CrossValidator(PTAModel, by_name, parameter_names, arg_count) + + static_quality = validator.kfold(lambda m: m.get_static(), 10) + param_quality = validator.kfold(lambda m: m.get_fitted()[0], 10) + + print(static_quality) + + # static quality reflects normal distribution scale for non-parameterized data + + # the Root Mean Square Deviation must not be greater the scale (i.e., standard deviation) of the normal distribution + # Low Mean Absolute Error (< 2) + self.assertTrue(static_quality["by_name"]["raw_state_1"]["duration"]["mae"] < 2) + # Low Root Mean Square Deviation (< scale == 2) + self.assertTrue( + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"] < 2 + ) + # Relatively low error percentage (~~ MAE * 100% / s1_duration_base) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + + # static error is high for parameterized data + + # MAE == mean(abs(actual value - model value)) + # parameter range is [0, 50) -> mean 25, deviation range is [0, 25) -> mean deviation is 12.5 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mae"], 12.5, delta=1 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["rmsd"], 16, delta=2 + ) + # high percentage error due to low s1_power_base + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["smape"], 19, delta=2 + ) + + # parameter range is [0, 100) -> mean deviation is 25 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mae"], 25, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], 30, delta=2 + ) + + # low percentage error due to high s2_duration_base (~~ 3.5 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mae"], 12.5, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["rmsd"], 17, delta=2 + ) + + # low percentage error due to high s2_power_base (~~ 1.7 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["smape"], + 25 * 100 / s2_power_base, + delta=1, + ) + + # raw_state_1/duration does not depend on parameters and delegates to the static model + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mae"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + ) + + # fitted param-model quality reflects normal distribution scale for all data + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["mae"] < s1_power_scale + ) + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["rmsd"] < s1_power_scale + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["smape"], 7.5, delta=1 + ) + + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mae"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 0.12, + delta=0.01, + ) + + # ... unless the signal-to-noise ratio (parameter range = [0 .. 50] vs. scale = 10) is bad, leading to + # increased regression errors + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["mae"] < 15) + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["rmsd"] < 18) + + # still: low percentage error due to high s2_power_base + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + +class TestFromFile(unittest.TestCase): + def test_singlefile_rf24(self): raw_data = RawData(["test-data/20170220_164723_RF24_int_A.tar"]) preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) @@ -162,7 +613,7 @@ class TestModels(unittest.TestCase): param_model("RX", "power", param=[1, None, None]), 48647, places=-1 ) - def test_model_singlefile_mmparam(self): + def test_singlefile_mmparam(self): raw_data = RawData(["test-data/20161221_123347_mmparam.tar"]) preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) @@ -201,7 +652,7 @@ class TestModels(unittest.TestCase): param_lut_model("ON", "power", param=[None, None]), 17866, places=0 ) - def test_model_multifile_lm75x(self): + def test_multifile_lm75x(self): testfiles = [ "test-data/20170116_124500_LM75x.tar", "test-data/20170116_131306_LM75x.tar", @@ -243,7 +694,7 @@ class TestModels(unittest.TestCase): self.assertAlmostEqual(static_model("shutdown", "duration"), 6980, places=0) self.assertAlmostEqual(static_model("start", "duration"), 6980, places=0) - def test_model_multifile_sharp(self): + def test_multifile_sharp(self): testfiles = [ "test-data/20170116_145420_sharpLS013B4DN.tar", "test-data/20170116_151348_sharpLS013B4DN.tar", @@ -285,7 +736,7 @@ class TestModels(unittest.TestCase): self.assertAlmostEqual(static_model("sendLine", "duration"), 180, places=0) self.assertAlmostEqual(static_model("toggleVCOM", "duration"), 30, places=0) - def test_model_multifile_mmstatic(self): + def test_multifile_mmstatic(self): testfiles = [ "test-data/20170116_143516_mmstatic.tar", "test-data/20170116_142654_mmstatic.tar", @@ -325,7 +776,7 @@ class TestModels(unittest.TestCase): @pytest.mark.skipif( "TEST_SLOW" not in os.environ, reason="slow test, set TEST_SLOW=1 to run" ) - def test_model_multifile_cc1200(self): + def test_multifile_cc1200(self): testfiles = [ "test-data/20170125_125433_cc1200.tar", "test-data/20170125_142420_cc1200.tar", |