diff options
Diffstat (limited to 'test')
-rwxr-xr-x | test/test_codegen.py | 170 | ||||
-rwxr-xr-x | test/test_parameters.py | 228 | ||||
-rwxr-xr-x | test/test_pta.py | 917 | ||||
-rwxr-xr-x | test/test_ptamodel.py | 1003 | ||||
-rwxr-xr-x | test/test_timingharness.py | 200 |
5 files changed, 1785 insertions, 733 deletions
diff --git a/test/test_codegen.py b/test/test_codegen.py index 981117b..ce565d6 100755 --- a/test/test_codegen.py +++ b/test/test_codegen.py @@ -5,84 +5,74 @@ from dfatool.codegen import get_simulated_accountingmethod import unittest example_json_1 = { - 'parameters': ['datarate', 'txbytes', 'txpower'], - 'initial_param_values': [None, None, None], - 'state': { - 'IDLE': { - 'power': { - 'static': 5, - } - }, - 'TX': { - 'power': { - 'static': 100, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * parameter(txpower)', - 'regression_args': [100, 2] + "parameters": ["datarate", "txbytes", "txpower"], + "initial_param_values": [None, None, None], + "state": { + "IDLE": {"power": {"static": 5,}}, + "TX": { + "power": { + "static": 100, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" + " * parameter(txpower)", + "regression_args": [100, 2], }, } }, }, - 'transitions': [ + "transitions": [ { - 'name': 'init', - 'origin': ['UNINITIALIZED', 'IDLE'], - 'destination': 'IDLE', - 'duration': { - 'static': 50000, - }, - 'set_param': { - 'txpower': 10 - }, + "name": "init", + "origin": ["UNINITIALIZED", "IDLE"], + "destination": "IDLE", + "duration": {"static": 50000,}, + "set_param": {"txpower": 10}, }, { - 'name': 'setTxPower', - 'origin': 'IDLE', - 'destination': 'IDLE', - 'duration': {'static': 120}, - 'energy ': {'static': 10000}, - 'arg_to_param_map': {0: 'txpower'}, - 'argument_values': [[10, 20, 30]], + "name": "setTxPower", + "origin": "IDLE", + "destination": "IDLE", + "duration": {"static": 120}, + "energy ": {"static": 10000}, + "arg_to_param_map": {0: "txpower"}, + "argument_values": [[10, 20, 30]], }, { - 'name': 'send', - 'origin': 'IDLE', - 'destination': 'TX', - 'duration': { - 'static': 10, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args': [48, 8], + "name": "send", + "origin": "IDLE", + "destination": "TX", + "duration": { + "static": 10, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", + "regression_args": [48, 8], }, }, - 'energy': { - 'static': 3, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args': [3, 5], + "energy": { + "static": 3, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", + "regression_args": [3, 5], }, }, - 'arg_to_param_map': {1: 'txbytes'}, - 'argument_values': [['"foo"', '"hodor"'], [3, 5]], - 'argument_combination': 'zip', + "arg_to_param_map": {1: "txbytes"}, + "argument_values": [['"foo"', '"hodor"'], [3, 5]], + "argument_combination": "zip", }, { - 'name': 'txComplete', - 'origin': 'TX', - 'destination': 'IDLE', - 'is_interrupt': 1, - 'timeout': { - 'static': 2000, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * parameter(txbytes)', - 'regression_args': [500, 16], + "name": "txComplete", + "origin": "TX", + "destination": "IDLE", + "is_interrupt": 1, + "timeout": { + "static": 2000, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" + " * parameter(txbytes)", + "regression_args": [500, 16], }, }, - } + }, ], } @@ -91,9 +81,11 @@ class TestCG(unittest.TestCase): def test_statetransition_immediate(self): pta = PTA.from_json(example_json_1) pta.set_random_energy_model() - pta.state['IDLE'].power.value = 9 - cg = get_simulated_accountingmethod('static_statetransition_immediate')(pta, 1000000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t') - cg.current_state = pta.state['IDLE'] + pta.state["IDLE"].power.value = 9 + cg = get_simulated_accountingmethod("static_statetransition_immediate")( + pta, 1000000, "uint8_t", "uint8_t", "uint8_t", "uint8_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 9 * 7) pta.transitions[1].energy.value = 123 @@ -102,8 +94,10 @@ class TestCG(unittest.TestCase): cg.pass_transition(pta.transitions[1]) self.assertEqual(cg.get_energy(), (9 * 7 + 123 + 123) % 256) - cg = get_simulated_accountingmethod('static_statetransition_immediate')(pta, 100000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t') - cg.current_state = pta.state['IDLE'] + cg = get_simulated_accountingmethod("static_statetransition_immediate")( + pta, 100000, "uint8_t", "uint8_t", "uint8_t", "uint8_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 0) cg.sleep(15) @@ -111,8 +105,10 @@ class TestCG(unittest.TestCase): cg.sleep(90) self.assertEqual(cg.get_energy(), 900 % 256) - cg = get_simulated_accountingmethod('static_statetransition_immediate')(pta, 100000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint16_t') - cg.current_state = pta.state['IDLE'] + cg = get_simulated_accountingmethod("static_statetransition_immediate")( + pta, 100000, "uint8_t", "uint8_t", "uint8_t", "uint16_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 0) cg.sleep(15) @@ -120,10 +116,12 @@ class TestCG(unittest.TestCase): cg.sleep(90) self.assertEqual(cg.get_energy(), 900) - pta.state['IDLE'].power.value = 9 # -> 90 uW + pta.state["IDLE"].power.value = 9 # -> 90 uW pta.transitions[1].energy.value = 1 # -> 100 pJ - cg = get_simulated_accountingmethod('static_statetransition_immediate')(pta, 1000000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t', 1e-5, 1e-5, 1e-10) - cg.current_state = pta.state['IDLE'] + cg = get_simulated_accountingmethod("static_statetransition_immediate")( + pta, 1000000, "uint8_t", "uint8_t", "uint8_t", "uint8_t", 1e-5, 1e-5, 1e-10 + ) + cg.current_state = pta.state["IDLE"] cg.sleep(10) # 10 us self.assertEqual(cg.get_energy(), 90 * 10) cg.pass_transition(pta.transitions[1]) @@ -134,9 +132,11 @@ class TestCG(unittest.TestCase): def test_statetransition(self): pta = PTA.from_json(example_json_1) pta.set_random_energy_model() - pta.state['IDLE'].power.value = 9 - cg = get_simulated_accountingmethod('static_statetransition')(pta, 1000000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t') - cg.current_state = pta.state['IDLE'] + pta.state["IDLE"].power.value = 9 + cg = get_simulated_accountingmethod("static_statetransition")( + pta, 1000000, "uint8_t", "uint8_t", "uint8_t", "uint8_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 9 * 7) pta.transitions[1].energy.value = 123 @@ -148,9 +148,11 @@ class TestCG(unittest.TestCase): def test_state_immediate(self): pta = PTA.from_json(example_json_1) pta.set_random_energy_model() - pta.state['IDLE'].power.value = 9 - cg = get_simulated_accountingmethod('static_state_immediate')(pta, 1000000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t') - cg.current_state = pta.state['IDLE'] + pta.state["IDLE"].power.value = 9 + cg = get_simulated_accountingmethod("static_state_immediate")( + pta, 1000000, "uint8_t", "uint8_t", "uint8_t", "uint8_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 9 * 7) pta.transitions[1].energy.value = 123 @@ -162,9 +164,11 @@ class TestCG(unittest.TestCase): def test_state(self): pta = PTA.from_json(example_json_1) pta.set_random_energy_model() - pta.state['IDLE'].power.value = 9 - cg = get_simulated_accountingmethod('static_state')(pta, 1000000, 'uint8_t', 'uint8_t', 'uint8_t', 'uint8_t') - cg.current_state = pta.state['IDLE'] + pta.state["IDLE"].power.value = 9 + cg = get_simulated_accountingmethod("static_state")( + pta, 1000000, "uint8_t", "uint8_t", "uint8_t", "uint8_t" + ) + cg.current_state = pta.state["IDLE"] cg.sleep(7) self.assertEqual(cg.get_energy(), 9 * 7) pta.transitions[1].energy.value = 123 @@ -173,8 +177,10 @@ class TestCG(unittest.TestCase): cg.pass_transition(pta.transitions[1]) self.assertEqual(cg.get_energy(), 9 * 7) - cg = get_simulated_accountingmethod('static_state')(pta, 1000000, 'uint8_t', 'uint16_t', 'uint16_t', 'uint16_t') + cg = get_simulated_accountingmethod("static_state")( + pta, 1000000, "uint8_t", "uint16_t", "uint16_t", "uint16_t" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_parameters.py b/test/test_parameters.py new file mode 100755 index 0000000..e36b1a1 --- /dev/null +++ b/test/test_parameters.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 + +from dfatool import parameters +from dfatool.utils import by_name_to_by_param +from dfatool.functions import analytic +from dfatool.model import ParallelParamFit +import unittest + +import numpy as np + + +class TestModels(unittest.TestCase): + def test_distinct_param_values(self): + X = np.arange(35) + by_name = { + "TX": { + "param": [(x % 5, x % 7) for x in X], + "power": X, + "attributes": ["power"], + } + } + self.assertEqual( + parameters.distinct_param_values(by_name, "TX"), + [list(range(5)), list(range(7))], + ) + + def test_parameter_detection_linear(self): + # rng = np.random.default_rng(seed=1312) # requiresy NumPy >= 1.17 + np.random.seed(1312) + X = np.arange(200) % 50 + # Y = X + rng.normal(size=X.size) # requiry NumPy >= 1.17 + Y = X + np.random.normal(size=X.size) + parameter_names = ["p_mod5", "p_linear"] + + # Test input data: + # * param[0] ("p_mod5") == X % 5 (bogus data to test detection of non-influence) + # * param[1] ("p_linear") == X + # * TX power == X ± gaussian noise + # -> TX power depends linearly on "p_linear" + by_name = { + "TX": { + "param": [(x % 5, x) for x in X], + "power": Y, + "attributes": ["power"], + } + } + by_param = by_name_to_by_param(by_name) + stats = parameters.ParamStats(by_name, by_param, parameter_names, dict()) + + self.assertEqual(stats.depends_on_param("TX", "power", "p_mod5"), False) + self.assertEqual(stats.depends_on_param("TX", "power", "p_linear"), True) + + # Fit individual functions for each parameter (only "p_linear" in this case) + + paramfit = ParallelParamFit(by_param) + paramfit.enqueue("TX", "power", 1, "p_linear") + paramfit.fit() + + fit_result = paramfit.get_result("TX", "power") + self.assertEqual(fit_result["p_linear"]["best"], "linear") + self.assertEqual("p_mod5" not in fit_result, True) + + # Fit a single function for all parameters (still only "p_linear" in this case) + + combined_fit = analytic.function_powerset(fit_result, parameter_names, 0) + + self.assertEqual( + combined_fit.model_function, + "0 + regression_arg(0) + regression_arg(1) * parameter(p_linear)", + ) + self.assertEqual( + combined_fit._function_str, + "0 + reg_param[0] + reg_param[1] * model_param[1]", + ) + + combined_fit.fit(by_param, "TX", "power") + + self.assertEqual(combined_fit.fit_success, True) + + self.assertEqual(combined_fit.is_predictable([None, None]), False) + self.assertEqual(combined_fit.is_predictable([None, 0]), True) + self.assertEqual(combined_fit.is_predictable([None, 50]), True) + self.assertEqual(combined_fit.is_predictable([0, None]), False) + self.assertEqual(combined_fit.is_predictable([50, None]), False) + self.assertEqual(combined_fit.is_predictable([0, 0]), True) + self.assertEqual(combined_fit.is_predictable([0, 50]), True) + self.assertEqual(combined_fit.is_predictable([50, 0]), True) + self.assertEqual(combined_fit.is_predictable([50, 50]), True) + + # The function should be linear without offset or skew + for i in range(100): + self.assertAlmostEqual(combined_fit.eval([None, i]), i, places=0) + + def test_parameter_detection_multi_dimensional(self): + # rng = np.random.default_rng(seed=1312) # requires NumPy >= 1.17 + np.random.seed(1312) + # vary each parameter from 1 to 10 + Xi = (np.arange(50) % 10) + 1 + # Three parameters -> Build input array [[1, 1, 1], [1, 1, 2], ..., [10, 10, 10]] + X = np.array(np.meshgrid(Xi, Xi, Xi)).T.reshape(-1, 3) + + f_lls = np.vectorize( + lambda x: 42 + 7 * x[0] + 10 * np.log(x[1]) - 0.5 * x[2] * x[2], + signature="(n)->()", + ) + f_ll = np.vectorize( + lambda x: 23 + 5 * x[0] - 3 * x[0] / x[1], signature="(n)->()" + ) + + # Y_lls = f_lls(X) + rng.normal(size=X.shape[0]) # requires NumPy >= 1.17 + # Y_ll = f_ll(X) + rng.normal(size=X.shape[0]) # requires NumPy >= 1.17 + Y_lls = f_lls(X) + np.random.normal(size=X.shape[0]) + Y_ll = f_ll(X) + np.random.normal(size=X.shape[0]) + + parameter_names = ["lin_lin", "log_inv", "square_none"] + + by_name = { + "someKey": { + "param": X, + "lls": Y_lls, + "ll": Y_ll, + "attributes": ["lls", "ll"], + } + } + by_param = by_name_to_by_param(by_name) + stats = parameters.ParamStats(by_name, by_param, parameter_names, dict()) + + self.assertEqual(stats.depends_on_param("someKey", "lls", "lin_lin"), True) + self.assertEqual(stats.depends_on_param("someKey", "lls", "log_inv"), True) + self.assertEqual(stats.depends_on_param("someKey", "lls", "square_none"), True) + + self.assertEqual(stats.depends_on_param("someKey", "ll", "lin_lin"), True) + self.assertEqual(stats.depends_on_param("someKey", "ll", "log_inv"), True) + self.assertEqual(stats.depends_on_param("someKey", "ll", "square_none"), False) + + paramfit = ParallelParamFit(by_param) + paramfit.enqueue("someKey", "lls", 0, "lin_lin") + paramfit.enqueue("someKey", "lls", 1, "log_inv") + paramfit.enqueue("someKey", "lls", 2, "square_none") + paramfit.enqueue("someKey", "ll", 0, "lin_lin") + paramfit.enqueue("someKey", "ll", 1, "log_inv") + paramfit.fit() + + fit_lls = paramfit.get_result("someKey", "lls") + self.assertEqual(fit_lls["lin_lin"]["best"], "linear") + self.assertEqual(fit_lls["log_inv"]["best"], "logarithmic") + self.assertEqual(fit_lls["square_none"]["best"], "square") + + combined_fit_lls = analytic.function_powerset(fit_lls, parameter_names, 0) + + self.assertEqual( + combined_fit_lls.model_function, + "0 + regression_arg(0) + regression_arg(1) * parameter(lin_lin)" + " + regression_arg(2) * np.log(parameter(log_inv))" + " + regression_arg(3) * (parameter(square_none))**2" + " + regression_arg(4) * parameter(lin_lin) * np.log(parameter(log_inv))" + " + regression_arg(5) * parameter(lin_lin) * (parameter(square_none))**2" + " + regression_arg(6) * np.log(parameter(log_inv)) * (parameter(square_none))**2" + " + regression_arg(7) * parameter(lin_lin) * np.log(parameter(log_inv)) * (parameter(square_none))**2", + ) + + combined_fit_lls.fit(by_param, "someKey", "lls") + + self.assertEqual(combined_fit_lls.fit_success, True) + + # Verify that f_lls parameters have been found + self.assertAlmostEqual(combined_fit_lls.model_args[0], 42, places=0) + self.assertAlmostEqual(combined_fit_lls.model_args[1], 7, places=0) + self.assertAlmostEqual(combined_fit_lls.model_args[2], 10, places=0) + self.assertAlmostEqual(combined_fit_lls.model_args[3], -0.5, places=1) + self.assertAlmostEqual(combined_fit_lls.model_args[4], 0, places=2) + self.assertAlmostEqual(combined_fit_lls.model_args[5], 0, places=2) + self.assertAlmostEqual(combined_fit_lls.model_args[6], 0, places=2) + self.assertAlmostEqual(combined_fit_lls.model_args[7], 0, places=2) + + self.assertEqual(combined_fit_lls.is_predictable([None, None, None]), False) + self.assertEqual(combined_fit_lls.is_predictable([None, None, 11]), False) + self.assertEqual(combined_fit_lls.is_predictable([None, 11, None]), False) + self.assertEqual(combined_fit_lls.is_predictable([None, 11, 11]), False) + self.assertEqual(combined_fit_lls.is_predictable([11, None, None]), False) + self.assertEqual(combined_fit_lls.is_predictable([11, None, 11]), False) + self.assertEqual(combined_fit_lls.is_predictable([11, 11, None]), False) + self.assertEqual(combined_fit_lls.is_predictable([11, 11, 11]), True) + + # Verify that fitted function behaves like input function + for i, x in enumerate(X): + self.assertAlmostEqual(combined_fit_lls.eval(x), f_lls(x), places=0) + + fit_ll = paramfit.get_result("someKey", "ll") + self.assertEqual(fit_ll["lin_lin"]["best"], "linear") + self.assertEqual(fit_ll["log_inv"]["best"], "inverse") + self.assertEqual("quare_none" not in fit_ll, True) + + combined_fit_ll = analytic.function_powerset(fit_ll, parameter_names, 0) + + self.assertEqual( + combined_fit_ll.model_function, + "0 + regression_arg(0) + regression_arg(1) * parameter(lin_lin)" + " + regression_arg(2) * 1/(parameter(log_inv))" + " + regression_arg(3) * parameter(lin_lin) * 1/(parameter(log_inv))", + ) + + combined_fit_ll.fit(by_param, "someKey", "ll") + + self.assertEqual(combined_fit_ll.fit_success, True) + + # Verify that f_ll parameters have been found + self.assertAlmostEqual(combined_fit_ll.model_args[0], 23, places=0) + self.assertAlmostEqual(combined_fit_ll.model_args[1], 5, places=0) + self.assertAlmostEqual(combined_fit_ll.model_args[2], 0, places=1) + self.assertAlmostEqual(combined_fit_ll.model_args[3], -3, places=0) + + self.assertEqual(combined_fit_ll.is_predictable([None, None, None]), False) + self.assertEqual(combined_fit_ll.is_predictable([None, None, 11]), False) + self.assertEqual(combined_fit_ll.is_predictable([None, 11, None]), False) + self.assertEqual(combined_fit_ll.is_predictable([None, 11, 11]), False) + self.assertEqual(combined_fit_ll.is_predictable([11, None, None]), False) + self.assertEqual(combined_fit_ll.is_predictable([11, None, 11]), False) + self.assertEqual(combined_fit_ll.is_predictable([11, 11, None]), True) + self.assertEqual(combined_fit_ll.is_predictable([11, 11, 11]), True) + + # Verify that fitted function behaves like input function + for i, x in enumerate(X): + self.assertAlmostEqual(combined_fit_ll.eval(x), f_ll(x), places=0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_pta.py b/test/test_pta.py index 9f0778d..d43e702 100755 --- a/test/test_pta.py +++ b/test/test_pta.py @@ -5,88 +5,79 @@ import unittest import yaml example_json_1 = { - 'parameters': ['datarate', 'txbytes', 'txpower'], - 'initial_param_values': [None, None, None], - 'state': { - 'IDLE': { - 'power': { - 'static': 5, - } - }, - 'TX': { - 'power': { - 'static': 10000, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * parameter(txpower)', - 'regression_args': [10000, 2] + "parameters": ["datarate", "txbytes", "txpower"], + "initial_param_values": [None, None, None], + "state": { + "IDLE": {"power": {"static": 5,}}, + "TX": { + "power": { + "static": 10000, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" + " * parameter(txpower)", + "regression_args": [10000, 2], }, } }, }, - 'transitions': [ + "transitions": [ { - 'name': 'init', - 'origin': ['UNINITIALIZED', 'IDLE'], - 'destination': 'IDLE', - 'duration': { - 'static': 50000, - }, - 'set_param': { - 'txpower': 10 - }, + "name": "init", + "origin": ["UNINITIALIZED", "IDLE"], + "destination": "IDLE", + "duration": {"static": 50000,}, + "set_param": {"txpower": 10}, }, { - 'name': 'setTxPower', - 'origin': 'IDLE', - 'destination': 'IDLE', - 'duration': {'static': 120}, - 'energy ': {'static': 10000}, - 'arg_to_param_map': {0: 'txpower'}, - 'argument_values': [[10, 20, 30]], + "name": "setTxPower", + "origin": "IDLE", + "destination": "IDLE", + "duration": {"static": 120}, + "energy ": {"static": 10000}, + "arg_to_param_map": {0: "txpower"}, + "argument_values": [[10, 20, 30]], }, { - 'name': 'send', - 'origin': 'IDLE', - 'destination': 'TX', - 'duration': { - 'static': 10, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args': [48, 8], + "name": "send", + "origin": "IDLE", + "destination": "TX", + "duration": { + "static": 10, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", + "regression_args": [48, 8], }, }, - 'energy': { - 'static': 3, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args': [3, 5], + "energy": { + "static": 3, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", + "regression_args": [3, 5], }, }, - 'arg_to_param_map': {1: 'txbytes'}, - 'argument_values': [['"foo"', '"hodor"'], [3, 5]], - 'argument_combination': 'zip', + "arg_to_param_map": {1: "txbytes"}, + "argument_values": [['"foo"', '"hodor"'], [3, 5]], + "argument_combination": "zip", }, { - 'name': 'txComplete', - 'origin': 'TX', - 'destination': 'IDLE', - 'is_interrupt': 1, - 'timeout': { - 'static': 2000, - 'function': { - 'raw': 'regression_arg(0) + regression_arg(1)' - ' * parameter(txbytes)', - 'regression_args': [500, 16], + "name": "txComplete", + "origin": "TX", + "destination": "IDLE", + "is_interrupt": 1, + "timeout": { + "static": 2000, + "function": { + "raw": "regression_arg(0) + regression_arg(1)" + " * parameter(txbytes)", + "regression_args": [500, 16], }, }, - } + }, ], } -example_yaml_1 = yaml.safe_load(""" +example_yaml_1 = yaml.safe_load( + """ codegen: instance: cc1200 @@ -124,9 +115,11 @@ transition: src: [TX] dst: IDLE is_interrupt: true -""") +""" +) -example_yaml_2 = yaml.safe_load(""" +example_yaml_2 = yaml.safe_load( + """ codegen: instance: cc1200 @@ -169,9 +162,11 @@ transition: src: [TX] dst: IDLE is_interrupt: true -""") +""" +) -example_yaml_3 = yaml.safe_load(""" +example_yaml_3 = yaml.safe_load( + """ codegen: instance: nrf24l01 includes: ['driver/nrf24l01.h'] @@ -260,12 +255,17 @@ transition: - name: blocking values: [1, 1, 1, 1, 1, 1] argument_combination: zip -""") +""" +) -def dfs_tran_to_name(runs: list, with_args: bool = False, with_param: bool = False) -> list: +def dfs_tran_to_name( + runs: list, with_args: bool = False, with_param: bool = False +) -> list: if with_param: - return list(map(lambda run: list(map(lambda x: (x[0].name, x[1], x[2]), run)), runs)) + return list( + map(lambda run: list(map(lambda x: (x[0].name, x[1], x[2]), run)), runs) + ) if with_args: return list(map(lambda run: list(map(lambda x: (x[0].name, x[1]), run)), runs)) return list(map(lambda run: list(map(lambda x: (x[0].name), run)), runs)) @@ -273,117 +273,175 @@ def dfs_tran_to_name(runs: list, with_args: bool = False, with_param: bool = Fal class TestPTA(unittest.TestCase): def test_dfs(self): - pta = PTA(['IDLE', 'TX']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'TX', 'send') - pta.add_transition('TX', 'IDLE', 'txComplete') - self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [['init']]) - self.assertEqual(dfs_tran_to_name(pta.dfs(1), False), [['init', 'send']]) - self.assertEqual(dfs_tran_to_name(pta.dfs(2), False), [['init', 'send', 'txComplete']]) - self.assertEqual(dfs_tran_to_name(pta.dfs(3), False), [['init', 'send', 'txComplete', 'send']]) - - pta = PTA(['IDLE']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'IDLE', 'set1') - pta.add_transition('IDLE', 'IDLE', 'set2') - self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [['init']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1), False)), [['init', 'set1'], ['init', 'set2']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(2), False)), [['init', 'set1', 'set1'], - ['init', 'set1', 'set2'], - ['init', 'set2', 'set1'], - ['init', 'set2', 'set2']]) + pta = PTA(["IDLE", "TX"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "TX", "send") + pta.add_transition("TX", "IDLE", "txComplete") + self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [["init"]]) + self.assertEqual(dfs_tran_to_name(pta.dfs(1), False), [["init", "send"]]) + self.assertEqual( + dfs_tran_to_name(pta.dfs(2), False), [["init", "send", "txComplete"]] + ) + self.assertEqual( + dfs_tran_to_name(pta.dfs(3), False), + [["init", "send", "txComplete", "send"]], + ) + + pta = PTA(["IDLE"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "IDLE", "set1") + pta.add_transition("IDLE", "IDLE", "set2") + self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [["init"]]) + self.assertEqual( + sorted(dfs_tran_to_name(pta.dfs(1), False)), + [["init", "set1"], ["init", "set2"]], + ) + self.assertEqual( + sorted(dfs_tran_to_name(pta.dfs(2), False)), + [ + ["init", "set1", "set1"], + ["init", "set1", "set2"], + ["init", "set2", "set1"], + ["init", "set2", "set2"], + ], + ) def test_dfs_trace_filter(self): - pta = PTA(['IDLE']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'IDLE', 'set1') - pta.add_transition('IDLE', 'IDLE', 'set2') - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(2, trace_filter=[['init', 'set1', 'set2'], ['init', 'set2', 'set1']]), False)), - [['init', 'set1', 'set2'], ['init', 'set2', 'set1']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(2, trace_filter=[['init', 'set1', '$'], ['init', 'set2', '$']]), False)), - [['init', 'set1'], ['init', 'set2']]) + pta = PTA(["IDLE"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "IDLE", "set1") + pta.add_transition("IDLE", "IDLE", "set2") + self.assertEqual( + sorted( + dfs_tran_to_name( + pta.dfs( + 2, + trace_filter=[ + ["init", "set1", "set2"], + ["init", "set2", "set1"], + ], + ), + False, + ) + ), + [["init", "set1", "set2"], ["init", "set2", "set1"]], + ) + self.assertEqual( + sorted( + dfs_tran_to_name( + pta.dfs( + 2, trace_filter=[["init", "set1", "$"], ["init", "set2", "$"]] + ), + False, + ) + ), + [["init", "set1"], ["init", "set2"]], + ) def test_dfs_accepting(self): - pta = PTA(['IDLE', 'TX'], accepting_states=['IDLE']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'TX', 'send') - pta.add_transition('TX', 'IDLE', 'txComplete') - self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [['init']]) + pta = PTA(["IDLE", "TX"], accepting_states=["IDLE"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "TX", "send") + pta.add_transition("TX", "IDLE", "txComplete") + self.assertEqual(dfs_tran_to_name(pta.dfs(0), False), [["init"]]) self.assertEqual(dfs_tran_to_name(pta.dfs(1), False), []) - self.assertEqual(dfs_tran_to_name(pta.dfs(2), False), [['init', 'send', 'txComplete']]) + self.assertEqual( + dfs_tran_to_name(pta.dfs(2), False), [["init", "send", "txComplete"]] + ) self.assertEqual(dfs_tran_to_name(pta.dfs(3), False), []) def test_dfs_objects(self): - pta = PTA(['IDLE', 'TX']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'TX', 'send') - pta.add_transition('TX', 'IDLE', 'txComplete') + pta = PTA(["IDLE", "TX"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "TX", "send") + pta.add_transition("TX", "IDLE", "txComplete") traces = list(pta.dfs(2)) self.assertEqual(len(traces), 1) trace = traces[0] self.assertEqual(len(trace), 3) - self.assertEqual(trace[0][0].name, 'init') - self.assertEqual(trace[1][0].name, 'send') - self.assertEqual(trace[2][0].name, 'txComplete') + self.assertEqual(trace[0][0].name, "init") + self.assertEqual(trace[1][0].name, "send") + self.assertEqual(trace[2][0].name, "txComplete") self.assertEqual(pta.get_transition_id(trace[0][0]), 0) self.assertEqual(pta.get_transition_id(trace[1][0]), 1) self.assertEqual(pta.get_transition_id(trace[2][0]), 2) def test_dfs_with_sleep(self): - pta = PTA(['IDLE', 'TX']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'TX', 'send') - pta.add_transition('TX', 'IDLE', 'txComplete') + pta = PTA(["IDLE", "TX"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "TX", "send") + pta.add_transition("TX", "IDLE", "txComplete") traces = list(pta.dfs(2, sleep=10)) self.assertEqual(len(traces), 1) trace = traces[0] self.assertEqual(len(trace), 6) self.assertIsNone(trace[0][0]) - self.assertEqual(trace[1][0].name, 'init') + self.assertEqual(trace[1][0].name, "init") self.assertIsNone(trace[2][0]) - self.assertEqual(trace[3][0].name, 'send') + self.assertEqual(trace[3][0].name, "send") self.assertIsNone(trace[4][0]) - self.assertEqual(trace[5][0].name, 'txComplete') + self.assertEqual(trace[5][0].name, "txComplete") self.assertEqual(pta.get_transition_id(trace[1][0]), 0) self.assertEqual(pta.get_transition_id(trace[3][0]), 1) self.assertEqual(pta.get_transition_id(trace[5][0]), 2) def test_bfs(self): - pta = PTA(['IDLE', 'TX']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'TX', 'send') - pta.add_transition('TX', 'IDLE', 'txComplete') - self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']]) - self.assertEqual(dfs_tran_to_name(pta.bfs(1), False), [['init'], ['init', 'send']]) - self.assertEqual(dfs_tran_to_name(pta.bfs(2), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete']]) - self.assertEqual(dfs_tran_to_name(pta.bfs(3), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete'], ['init', 'send', 'txComplete', 'send']]) - - pta = PTA(['IDLE']) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init') - pta.add_transition('IDLE', 'IDLE', 'set1') - pta.add_transition('IDLE', 'IDLE', 'set2') - self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(1), False)), [['init'], ['init', 'set1'], ['init', 'set2']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(2), False)), [['init'], - ['init', 'set1'], - ['init', 'set1', 'set1'], - ['init', 'set1', 'set2'], - ['init', 'set2'], - ['init', 'set2', 'set1'], - ['init', 'set2', 'set2']]) + pta = PTA(["IDLE", "TX"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "TX", "send") + pta.add_transition("TX", "IDLE", "txComplete") + self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [["init"]]) + self.assertEqual( + dfs_tran_to_name(pta.bfs(1), False), [["init"], ["init", "send"]] + ) + self.assertEqual( + dfs_tran_to_name(pta.bfs(2), False), + [["init"], ["init", "send"], ["init", "send", "txComplete"]], + ) + self.assertEqual( + dfs_tran_to_name(pta.bfs(3), False), + [ + ["init"], + ["init", "send"], + ["init", "send", "txComplete"], + ["init", "send", "txComplete", "send"], + ], + ) + + pta = PTA(["IDLE"]) + pta.add_transition("UNINITIALIZED", "IDLE", "init") + pta.add_transition("IDLE", "IDLE", "set1") + pta.add_transition("IDLE", "IDLE", "set2") + self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [["init"]]) + self.assertEqual( + sorted(dfs_tran_to_name(pta.bfs(1), False)), + [["init"], ["init", "set1"], ["init", "set2"]], + ) + self.assertEqual( + sorted(dfs_tran_to_name(pta.bfs(2), False)), + [ + ["init"], + ["init", "set1"], + ["init", "set1", "set1"], + ["init", "set1", "set2"], + ["init", "set2"], + ["init", "set2", "set1"], + ["init", "set2", "set2"], + ], + ) def test_from_json(self): pta = PTA.from_json(example_json_1) - self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower']) - self.assertEqual(pta.state['UNINITIALIZED'].name, 'UNINITIALIZED') - self.assertEqual(pta.state['IDLE'].name, 'IDLE') - self.assertEqual(pta.state['TX'].name, 'TX') + self.assertEqual(pta.parameters, ["datarate", "txbytes", "txpower"]) + self.assertEqual(pta.state["UNINITIALIZED"].name, "UNINITIALIZED") + self.assertEqual(pta.state["IDLE"].name, "IDLE") + self.assertEqual(pta.state["TX"].name, "TX") self.assertEqual(len(pta.transitions), 5) - self.assertEqual(pta.transitions[0].name, 'init') - self.assertEqual(pta.transitions[1].name, 'init') - self.assertEqual(pta.transitions[2].name, 'setTxPower') - self.assertEqual(pta.transitions[3].name, 'send') - self.assertEqual(pta.transitions[4].name, 'txComplete') + self.assertEqual(pta.transitions[0].name, "init") + self.assertEqual(pta.transitions[1].name, "init") + self.assertEqual(pta.transitions[2].name, "setTxPower") + self.assertEqual(pta.transitions[3].name, "send") + self.assertEqual(pta.transitions[4].name, "txComplete") # def test_to_json(self): # pta = PTA.from_json(example_json_1) @@ -394,368 +452,471 @@ class TestPTA(unittest.TestCase): def test_from_json_dfs_arg(self): pta = PTA.from_json(example_json_1) - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1), False)), [['init', 'init'], ['init', 'send'], ['init', 'setTxPower']]) - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1, with_arguments=True), True)), - [ - [('init', ()), ('init', ())], - [('init', ()), ('send', ('"foo"', 3))], - [('init', ()), ('send', ('"hodor"', 5))], - [('init', ()), ('setTxPower', (10,))], - [('init', ()), ('setTxPower', (20,))], - [('init', ()), ('setTxPower', (30,))], - ] + self.assertEqual( + sorted(dfs_tran_to_name(pta.dfs(1), False)), + [["init", "init"], ["init", "send"], ["init", "setTxPower"]], + ) + self.assertEqual( + sorted(dfs_tran_to_name(pta.dfs(1, with_arguments=True), True)), + [ + [("init", ()), ("init", ())], + [("init", ()), ("send", ('"foo"', 3))], + [("init", ()), ("send", ('"hodor"', 5))], + [("init", ()), ("setTxPower", (10,))], + [("init", ()), ("setTxPower", (20,))], + [("init", ()), ("setTxPower", (30,))], + ], ) def test_from_json_dfs_param(self): pta = PTA.from_json(example_json_1) no_param = { - 'datarate': None, - 'txbytes': None, - 'txpower': 10, + "datarate": None, + "txbytes": None, + "txpower": 10, } param_tx3 = { - 'datarate': None, - 'txbytes': 3, - 'txpower': 10, + "datarate": None, + "txbytes": 3, + "txpower": 10, } param_tx5 = { - 'datarate': None, - 'txbytes': 5, - 'txpower': 10, + "datarate": None, + "txbytes": 5, + "txpower": 10, } param_txp10 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 10, + "datarate": None, + "txbytes": None, + "txpower": 10, } param_txp20 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 20, + "datarate": None, + "txbytes": None, + "txpower": 20, } param_txp30 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 30, + "datarate": None, + "txbytes": None, + "txpower": 30, } - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1, with_arguments=True, with_parameters=True), True, True)), - [ - [('init', (), no_param), ('init', (), no_param)], - [('init', (), no_param), ('send', ('"foo"', 3), param_tx3)], - [('init', (), no_param), ('send', ('"hodor"', 5), param_tx5)], - [('init', (), no_param), ('setTxPower', (10,), param_txp10)], - [('init', (), no_param), ('setTxPower', (20,), param_txp20)], - [('init', (), no_param), ('setTxPower', (30,), param_txp30)], - ] + self.assertEqual( + sorted( + dfs_tran_to_name( + pta.dfs(1, with_arguments=True, with_parameters=True), True, True + ) + ), + [ + [("init", (), no_param), ("init", (), no_param)], + [("init", (), no_param), ("send", ('"foo"', 3), param_tx3)], + [("init", (), no_param), ("send", ('"hodor"', 5), param_tx5)], + [("init", (), no_param), ("setTxPower", (10,), param_txp10)], + [("init", (), no_param), ("setTxPower", (20,), param_txp20)], + [("init", (), no_param), ("setTxPower", (30,), param_txp30)], + ], ) def test_from_json_function(self): pta = PTA.from_json(example_json_1) - self.assertEqual(pta.state['TX'].get_energy(1000, {'datarate': 10, 'txbytes': 6, 'txpower': 10}), 1000 * (10000 + 2 * 10)) - self.assertEqual(pta.transitions[4].get_timeout({'datarate': 10, 'txbytes': 6, 'txpower': 10}), 500 + 16 * 6) + self.assertEqual( + pta.state["TX"].get_energy( + 1000, {"datarate": 10, "txbytes": 6, "txpower": 10} + ), + 1000 * (10000 + 2 * 10), + ) + self.assertEqual( + pta.transitions[4].get_timeout( + {"datarate": 10, "txbytes": 6, "txpower": 10} + ), + 500 + 16 * 6, + ) def test_from_yaml_dfs_param(self): pta = PTA.from_yaml(example_yaml_1) no_param = { - 'datarate': None, - 'txbytes': None, - 'txpower': None, + "datarate": None, + "txbytes": None, + "txpower": None, } param_tx3 = { - 'datarate': None, - 'txbytes': 3, - 'txpower': None, + "datarate": None, + "txbytes": 3, + "txpower": None, } param_tx5 = { - 'datarate': None, - 'txbytes': 5, - 'txpower': None, + "datarate": None, + "txbytes": 5, + "txpower": None, } param_txp10 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 10, + "datarate": None, + "txbytes": None, + "txpower": 10, } param_txp20 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 20, + "datarate": None, + "txbytes": None, + "txpower": 20, } param_txp30 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 30, + "datarate": None, + "txbytes": None, + "txpower": 30, } - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1, with_arguments=True, with_parameters=True), True, True)), - [ - [('init', (), no_param), ('init', (), no_param)], - [('init', (), no_param), ('send', ('"foo"', 3), param_tx3)], - [('init', (), no_param), ('send', ('"hodor"', 5), param_tx5)], - [('init', (), no_param), ('setTxPower', (10,), param_txp10)], - [('init', (), no_param), ('setTxPower', (20,), param_txp20)], - [('init', (), no_param), ('setTxPower', (30,), param_txp30)], - ] + self.assertEqual( + sorted( + dfs_tran_to_name( + pta.dfs(1, with_arguments=True, with_parameters=True), True, True + ) + ), + [ + [("init", (), no_param), ("init", (), no_param)], + [("init", (), no_param), ("send", ('"foo"', 3), param_tx3)], + [("init", (), no_param), ("send", ('"hodor"', 5), param_tx5)], + [("init", (), no_param), ("setTxPower", (10,), param_txp10)], + [("init", (), no_param), ("setTxPower", (20,), param_txp20)], + [("init", (), no_param), ("setTxPower", (30,), param_txp30)], + ], ) def test_normalization(self): pta = PTA.from_yaml(example_yaml_2) no_param = { - 'datarate': None, - 'txbytes': None, - 'txpower': None, + "datarate": None, + "txbytes": None, + "txpower": None, } param_tx3 = { - 'datarate': None, - 'txbytes': 3, - 'txpower': None, + "datarate": None, + "txbytes": 3, + "txpower": None, } param_tx6 = { - 'datarate': None, - 'txbytes': 6, - 'txpower': None, + "datarate": None, + "txbytes": 6, + "txpower": None, } param_txp10 = { - 'datarate': None, - 'txbytes': None, - 'txpower': -6, + "datarate": None, + "txbytes": None, + "txpower": -6, } param_txp20 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 4, + "datarate": None, + "txbytes": None, + "txpower": 4, } param_txp30 = { - 'datarate': None, - 'txbytes': None, - 'txpower': 14, + "datarate": None, + "txbytes": None, + "txpower": 14, } - self.assertEqual(sorted(dfs_tran_to_name(pta.dfs(1, with_arguments=True, with_parameters=True), True, True)), - [ - [('init', (), no_param), ('init', (), no_param)], - [('init', (), no_param), ('send', ('FOO',), param_tx3)], - [('init', (), no_param), ('send', ('LONGER',), param_tx6)], - [('init', (), no_param), ('setTxPower', (10,), param_txp10)], - [('init', (), no_param), ('setTxPower', (20,), param_txp20)], - [('init', (), no_param), ('setTxPower', (30,), param_txp30)], - ] + self.assertEqual( + sorted( + dfs_tran_to_name( + pta.dfs(1, with_arguments=True, with_parameters=True), True, True + ) + ), + [ + [("init", (), no_param), ("init", (), no_param)], + [("init", (), no_param), ("send", ("FOO",), param_tx3)], + [("init", (), no_param), ("send", ("LONGER",), param_tx6)], + [("init", (), no_param), ("setTxPower", (10,), param_txp10)], + [("init", (), no_param), ("setTxPower", (20,), param_txp20)], + [("init", (), no_param), ("setTxPower", (30,), param_txp30)], + ], ) def test_shrink(self): pta = PTA.from_yaml(example_yaml_3) pta.shrink_argument_values() - self.assertEqual(pta.transitions[0].name, 'setAutoAck') - self.assertEqual(pta.transitions[1].name, 'setPALevel') - self.assertEqual(pta.transitions[2].name, 'setRetries') - self.assertEqual(pta.transitions[3].name, 'setup') - self.assertEqual(pta.transitions[4].name, 'setup') - self.assertEqual(pta.transitions[5].name, 'write') + self.assertEqual(pta.transitions[0].name, "setAutoAck") + self.assertEqual(pta.transitions[1].name, "setPALevel") + self.assertEqual(pta.transitions[2].name, "setRetries") + self.assertEqual(pta.transitions[3].name, "setup") + self.assertEqual(pta.transitions[4].name, "setup") + self.assertEqual(pta.transitions[5].name, "write") self.assertEqual(pta.transitions[0].argument_values, [[0, 1]]) - self.assertEqual(pta.transitions[1].argument_values, [['Nrf24l01::RF24_PA_MIN', 'Nrf24l01::RF24_PA_MAX']]) + self.assertEqual( + pta.transitions[1].argument_values, + [["Nrf24l01::RF24_PA_MIN", "Nrf24l01::RF24_PA_MAX"]], + ) self.assertEqual(pta.transitions[2].argument_values, [[0, 15], [0, 15]]) - self.assertEqual(pta.transitions[5].argument_values, [['"foo"', '"foo"', '"foofoofoo"', '"foofoofoo"', '"123456789012345678901234567890"', - '"123456789012345678901234567890"'], [3, 3, 9, 9, 30, 30], [0, 1, 0, 1, 0, 1], [1, 1, 1, 1, 1, 1]]) + self.assertEqual( + pta.transitions[5].argument_values, + [ + [ + '"foo"', + '"foo"', + '"foofoofoo"', + '"foofoofoo"', + '"123456789012345678901234567890"', + '"123456789012345678901234567890"', + ], + [3, 3, 9, 9, 30, 30], + [0, 1, 0, 1, 0, 1], + [1, 1, 1, 1, 1, 1], + ], + ) def test_simulation(self): pta = PTA() - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', duration=50000) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition("UNINITIALIZED", "IDLE", "init", duration=50000) + pta.add_transition("IDLE", "TX", "send", energy=3, duration=10) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) trace = [ - ['init'], + ["init"], [None, 10000000], - ['send', 'foo', 3], + ["send", "foo", 3], [None, 5000000], - ['send', 'foo', 3] + ["send", "foo", 3], ] - expected_energy = 5. * 10000000 + 3 + 100 * 2000 + 5 * 5000000 + 3 + 100 * 2000 + expected_energy = 5.0 * 10000000 + 3 + 100 * 2000 + 5 * 5000000 + 3 + 100 * 2000 expected_duration = 50000 + 10000000 + 10 + 2000 + 5000000 + 10 + 2000 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') + self.assertEqual(result.end_state.name, "IDLE") self.assertEqual(result.parameters, {}) def test_simulation_param_none(self): - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition("IDLE", "TX", "send", energy=3, duration=10) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) trace = [ - ['init'], + ["init"], ] expected_energy = 500000 expected_duration = 50000 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': None, - 'length': None - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": None, "length": None}) def test_simulation_param_update_function(self): - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy=10000, duration=120, - param_update_function=lambda param, arg: {**param, 'txpower': arg[0]}) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) - trace = [ - ['init'], - ['setTxPower', 10] - ] + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition( + "IDLE", + "IDLE", + "setTxPower", + energy=10000, + duration=120, + param_update_function=lambda param, arg: {**param, "txpower": arg[0]}, + ) + pta.add_transition("IDLE", "TX", "send", energy=3, duration=10) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) + trace = [["init"], ["setTxPower", 10]] expected_energy = 510000 expected_duration = 50120 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': None - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": None}) def test_simulation_arg_to_param_map(self): - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy=10000, duration=120, - arg_to_param_map={0: 'txpower'}) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) - trace = [ - ['init'], - ['setTxPower', 10] - ] + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition( + "IDLE", + "IDLE", + "setTxPower", + energy=10000, + duration=120, + arg_to_param_map={0: "txpower"}, + ) + pta.add_transition("IDLE", "TX", "send", energy=3, duration=10) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) + trace = [["init"], ["setTxPower", 10]] expected_energy = 510000 expected_duration = 50120 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': None - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": None}) def test_simulation_set_param(self): - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000, set_param={'txpower': 10}) + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", + "IDLE", + "init", + energy=500000, + duration=50000, + set_param={"txpower": 10}, + ) trace = [ - ['init'], + ["init"], ] expected_energy = 500000 expected_duration = 50000 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': None - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": None}) def test_simulation_arg_function(self): - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy=10000, duration=120, - param_update_function=lambda param, arg: {**param, 'txpower': arg[0]}) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], - duration_function=lambda param, arg: 48 + 8 * arg[1]) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition( + "IDLE", + "IDLE", + "setTxPower", + energy=10000, + duration=120, + param_update_function=lambda param, arg: {**param, "txpower": arg[0]}, + ) + pta.add_transition( + "IDLE", + "TX", + "send", + energy=3, + duration=10, + energy_function=lambda param, arg: 3 + 5 * arg[1], + duration_function=lambda param, arg: 48 + 8 * arg[1], + ) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) trace = [ - ['init'], - ['setTxPower', 10], - ['send', 'foo', 3], + ["init"], + ["setTxPower", 10], + ["send", "foo", 3], ] expected_energy = 500000 + 10000 + (3 + 5 * 3) + (2000 * 100) expected_duration = 50000 + 120 + (48 + 8 * 3) + 2000 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': None - }) - - pta = PTA(parameters=['txpower', 'length']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy=10000, duration=120, - param_update_function=lambda param, arg: {**param, 'txpower': arg[0]}) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], - duration_function=lambda param, arg: 48 + 8 * arg[1]) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": None}) + + pta = PTA(parameters=["txpower", "length"]) + pta.add_state("IDLE", power=5) + pta.add_state("TX", power=100) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition( + "IDLE", + "IDLE", + "setTxPower", + energy=10000, + duration=120, + param_update_function=lambda param, arg: {**param, "txpower": arg[0]}, + ) + pta.add_transition( + "IDLE", + "TX", + "send", + energy=3, + duration=10, + energy_function=lambda param, arg: 3 + 5 * arg[1], + duration_function=lambda param, arg: 48 + 8 * arg[1], + ) + pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) trace = [ - ['init'], - ['setTxPower', 10], - ['send', 'foobar', 6], + ["init"], + ["setTxPower", 10], + ["send", "foobar", 6], ] expected_energy = 500000 + 10000 + (3 + 5 * 6) + (2000 * 100) expected_duration = 50000 + 120 + (48 + 8 * 6) + 2000 result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': None - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": None}) def test_simulation_param_function(self): - pta = PTA(parameters=['length', 'txpower']) - pta.add_state('IDLE', power=5) - pta.add_state('TX', power=100, - power_function=lambda param, arg: 1000 + 2 * param[1]) - pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy=500000, duration=50000) - pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy=10000, duration=120, - param_update_function=lambda param, arg: {**param, 'txpower': arg[0]}) - pta.add_transition('IDLE', 'TX', 'send', energy=3, duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], - param_update_function=lambda param, arg: {**param, 'length': arg[1]}) - pta.add_transition('TX', 'IDLE', 'txComplete', timeout=2000, is_interrupt=True, - timeout_function=lambda param, arg: 500 + 16 * param[0]) + pta = PTA(parameters=["length", "txpower"]) + pta.add_state("IDLE", power=5) + pta.add_state( + "TX", power=100, power_function=lambda param, arg: 1000 + 2 * param[1] + ) + pta.add_transition( + "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 + ) + pta.add_transition( + "IDLE", + "IDLE", + "setTxPower", + energy=10000, + duration=120, + param_update_function=lambda param, arg: {**param, "txpower": arg[0]}, + ) + pta.add_transition( + "IDLE", + "TX", + "send", + energy=3, + duration=10, + energy_function=lambda param, arg: 3 + 5 * arg[1], + param_update_function=lambda param, arg: {**param, "length": arg[1]}, + ) + pta.add_transition( + "TX", + "IDLE", + "txComplete", + timeout=2000, + is_interrupt=True, + timeout_function=lambda param, arg: 500 + 16 * param[0], + ) trace = [ - ['init'], - ['setTxPower', 10], - ['send', 'foo', 3], + ["init"], + ["setTxPower", 10], + ["send", "foo", 3], ] - expected_energy = 500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3) + expected_energy = ( + 500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3) + ) expected_duration = 50000 + 120 + 10 + (500 + 16 * 3) result = pta.simulate(trace) self.assertAlmostEqual(result.energy, expected_energy * 1e-12, places=12) self.assertAlmostEqual(result.duration, expected_duration * 1e-6, places=6) - self.assertEqual(result.end_state.name, 'IDLE') - self.assertEqual(result.parameters, { - 'txpower': 10, - 'length': 3 - }) + self.assertEqual(result.end_state.name, "IDLE") + self.assertEqual(result.parameters, {"txpower": 10, "length": 3}) def test_get_X_expensive_state(self): pta = PTA.from_json(example_json_1) - self.assertEqual(pta.get_least_expensive_state(), pta.state['IDLE']) - self.assertEqual(pta.get_most_expensive_state(), pta.state['TX']) + self.assertEqual(pta.get_least_expensive_state(), pta.state["IDLE"]) + self.assertEqual(pta.get_most_expensive_state(), pta.state["TX"]) # self.assertAlmostEqual(pta.min_duration_until_energy_overflow(), (2**32 - 1) * 1e-12 / 10e-3, places=9) # self.assertAlmostEqual(pta.min_duration_until_energy_overflow(energy_granularity=1e-9), (2**32 - 1) * 1e-9 / 10e-3, places=9) - self.assertAlmostEqual(pta.max_duration_until_energy_overflow(), (2**32 - 1) * 1e-12 / 5e-6, places=9) - self.assertAlmostEqual(pta.max_duration_until_energy_overflow(energy_granularity=1e-9), (2**32 - 1) * 1e-9 / 5e-6, places=9) + self.assertAlmostEqual( + pta.max_duration_until_energy_overflow(), + (2 ** 32 - 1) * 1e-12 / 5e-6, + places=9, + ) + self.assertAlmostEqual( + pta.max_duration_until_energy_overflow(energy_granularity=1e-9), + (2 ** 32 - 1) * 1e-9 / 5e-6, + places=9, + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_ptamodel.py b/test/test_ptamodel.py index 7d501e6..e8905b1 100755 --- a/test/test_ptamodel.py +++ b/test/test_ptamodel.py @@ -1,248 +1,843 @@ #!/usr/bin/env python3 -from dfatool.dfatool import PTAModel, RawData, pta_trace_to_aggregate +from dfatool.loader import RawData, pta_trace_to_aggregate +from dfatool.model import PTAModel +from dfatool.utils import by_name_to_by_param +from dfatool.validation import CrossValidator import os import unittest import pytest +import numpy as np -class TestModels(unittest.TestCase): - def test_model_singlefile_rf24(self): - raw_data = RawData(['test-data/20170220_164723_RF24_int_A.tar']) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + +class TestSynthetic(unittest.TestCase): + def test_model_validation(self): + # rng = np.random.default_rng(seed=1312) # requiresy NumPy >= 1.17 + np.random.seed(1312) + X = np.arange(500) % 50 + parameter_names = ["p_mod5", "p_linear"] + + s1_duration_base = 70 + s1_duration_scale = 2 + s1_power_base = 50 + s1_power_scale = 7 + s2_duration_base = 700 + s2_duration_scale = 1 + s2_power_base = 1500 + s2_power_scale = 10 + + by_name = { + "raw_state_1": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s1_duration_base + + np.random.normal(size=X.size, scale=s1_duration_scale), + "power": s1_power_base + + X + + np.random.normal(size=X.size, scale=s1_power_scale), + "attributes": ["duration", "power"], + }, + "raw_state_2": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s2_duration_base + - 2 * X + + np.random.normal(size=X.size, scale=s2_duration_scale), + "power": s2_power_base + + X + + np.random.normal(size=X.size, scale=s2_power_scale), + "attributes": ["duration", "power"], + }, + } + by_param = by_name_to_by_param(by_name) + model = PTAModel(by_name, parameter_names, dict()) + static_model = model.get_static() + + # x ∈ [0, 50] -> mean(X) is 25 + self.assertAlmostEqual( + static_model("raw_state_1", "duration"), s1_duration_base, places=0 + ) + self.assertAlmostEqual( + static_model("raw_state_1", "power"), s1_power_base + 25, delta=7 + ) + self.assertAlmostEqual( + static_model("raw_state_2", "duration"), s2_duration_base - 2 * 25, delta=2 + ) + self.assertAlmostEqual( + static_model("raw_state_2", "power"), s2_power_base + 25, delta=7 + ) + + param_model, param_info = model.get_fitted() + + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 10]), + s1_duration_base, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 50]), + s1_duration_base, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "duration", param=[0, 70]), + s1_duration_base, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 10]), + s1_power_base + 10, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 50]), + s1_power_base + 50, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_1", "power", param=[0, 70]), + s1_power_base + 70, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 10]), + s2_duration_base - 2 * 10, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 50]), + s2_duration_base - 2 * 50, + places=0, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "duration", param=[0, 70]), + s2_duration_base - 2 * 70, + places=0, + ) + + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 10]), + s2_power_base + 10, + delta=50, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 50]), + s2_power_base + 50, + delta=50, + ) + self.assertAlmostEqual( + param_model("raw_state_2", "power", param=[0, 70]), + s2_power_base + 70, + delta=50, + ) + + static_quality = model.assess(static_model) + param_quality = model.assess(param_model) + + # static quality reflects normal distribution scale for non-parameterized data + + # the Root Mean Square Deviation must not be greater the scale (i.e., standard deviation) of the normal distribution + # Low Mean Absolute Error (< 2) + self.assertTrue(static_quality["by_name"]["raw_state_1"]["duration"]["mae"] < 2) + # Low Root Mean Square Deviation (< scale == 2) + self.assertTrue( + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"] < 2 + ) + # Relatively low error percentage (~~ MAE * 100% / s1_duration_base) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["mape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + + # static error is high for parameterized data + + # MAE == mean(abs(actual value - model value)) + # parameter range is [0, 50) -> mean 25, deviation range is [0, 25) -> mean deviation is 12.5 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mae"], 12.5, delta=1 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["rmsd"], 16, delta=2 + ) + # high percentage error due to low s1_power_base + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mape"], 19, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["smape"], 19, delta=2 + ) + + # parameter range is [0, 100) -> mean deviation is 25 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mae"], 25, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], 30, delta=2 + ) + + # low percentage error due to high s2_duration_base (~~ 3.5 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mae"], 12.5, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["rmsd"], 17, delta=2 + ) + + # low percentage error due to high s2_power_base (~~ 1.7 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mape"], + 25 * 100 / s2_power_base, + delta=1, + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["smape"], + 25 * 100 / s2_power_base, + delta=1, + ) + + # raw_state_1/duration does not depend on parameters and delegates to the static model + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mae"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mape"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + ) + + # fitted param-model quality reflects normal distribution scale for all data + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["mape"], 0.9, places=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["mae"] < s1_power_scale + ) + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["rmsd"] < s1_power_scale + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["mape"], 7.5, delta=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["smape"], 7.5, delta=1 + ) + + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mae"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mape"], + 0.12, + delta=0.01, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 0.12, + delta=0.01, + ) + + # ... unless the signal-to-noise ratio (parameter range = [0 .. 50] vs. scale = 10) is bad, leading to + # increased regression errors + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["mae"] < 15) + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["rmsd"] < 18) + + # still: low percentage error due to high s2_power_base + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["mape"], 0.9, places=1 + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + def test_model_crossvalidation_10fold(self): + # rng = np.random.default_rng(seed=1312) # requiresy NumPy >= 1.17 + np.random.seed(1312) + X = np.arange(500) % 50 + parameter_names = ["p_mod5", "p_linear"] + + s1_duration_base = 70 + s1_duration_scale = 2 + s1_power_base = 50 + s1_power_scale = 7 + s2_duration_base = 700 + s2_duration_scale = 1 + s2_power_base = 1500 + s2_power_scale = 10 + + by_name = { + "raw_state_1": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s1_duration_base + + np.random.normal(size=X.size, scale=s1_duration_scale), + "power": s1_power_base + + X + + np.random.normal(size=X.size, scale=s1_power_scale), + "attributes": ["duration", "power"], + }, + "raw_state_2": { + "isa": "state", + "param": [(x % 5, x) for x in X], + "duration": s2_duration_base + - 2 * X + + np.random.normal(size=X.size, scale=s2_duration_scale), + "power": s2_power_base + + X + + np.random.normal(size=X.size, scale=s2_power_scale), + "attributes": ["duration", "power"], + }, + } + by_param = by_name_to_by_param(by_name) + arg_count = dict() + model = PTAModel(by_name, parameter_names, arg_count) + validator = CrossValidator(PTAModel, by_name, parameter_names, arg_count) + + static_quality = validator.kfold(lambda m: m.get_static(), 10) + param_quality = validator.kfold(lambda m: m.get_fitted()[0], 10) + + print(static_quality) + + # static quality reflects normal distribution scale for non-parameterized data + + # the Root Mean Square Deviation must not be greater the scale (i.e., standard deviation) of the normal distribution + # Low Mean Absolute Error (< 2) + self.assertTrue(static_quality["by_name"]["raw_state_1"]["duration"]["mae"] < 2) + # Low Root Mean Square Deviation (< scale == 2) + self.assertTrue( + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"] < 2 + ) + # Relatively low error percentage (~~ MAE * 100% / s1_duration_base) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"] + * 100 + / s1_duration_base, + places=1, + ) + + # static error is high for parameterized data + + # MAE == mean(abs(actual value - model value)) + # parameter range is [0, 50) -> mean 25, deviation range is [0, 25) -> mean deviation is 12.5 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["mae"], 12.5, delta=1 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["rmsd"], 16, delta=2 + ) + # high percentage error due to low s1_power_base + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_1"]["power"]["smape"], 19, delta=2 + ) + + # parameter range is [0, 100) -> mean deviation is 25 ± gauss scale + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["mae"], 25, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], 30, delta=2 + ) + + # low percentage error due to high s2_duration_base (~~ 3.5 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 25 * 100 / s2_duration_base, + delta=1, + ) + + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["mae"], 12.5, delta=2 + ) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["rmsd"], 17, delta=2 + ) + + # low percentage error due to high s2_power_base (~~ 1.7 %) + self.assertAlmostEqual( + static_quality["by_name"]["raw_state_2"]["power"]["smape"], + 25 * 100 / s2_power_base, + delta=1, + ) + + # raw_state_1/duration does not depend on parameters and delegates to the static model + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["mae"], + static_quality["by_name"]["raw_state_1"]["duration"]["mae"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + static_quality["by_name"]["raw_state_1"]["duration"]["rmsd"], + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["duration"]["smape"], + static_quality["by_name"]["raw_state_1"]["duration"]["smape"], + ) + + # fitted param-model quality reflects normal distribution scale for all data + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["mae"] < s1_power_scale + ) + self.assertTrue( + param_quality["by_name"]["raw_state_1"]["power"]["rmsd"] < s1_power_scale + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_1"]["power"]["smape"], 7.5, delta=1 + ) + + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["mae"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["rmsd"], + s2_duration_scale, + delta=0.2, + ) + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["duration"]["smape"], + 0.12, + delta=0.01, + ) + + # ... unless the signal-to-noise ratio (parameter range = [0 .. 50] vs. scale = 10) is bad, leading to + # increased regression errors + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["mae"] < 15) + self.assertTrue(param_quality["by_name"]["raw_state_2"]["power"]["rmsd"] < 18) + + # still: low percentage error due to high s2_power_base + self.assertAlmostEqual( + param_quality["by_name"]["raw_state_2"]["power"]["smape"], 0.9, places=1 + ) + + +class TestFromFile(unittest.TestCase): + def test_singlefile_rf24(self): + raw_data = RawData(["test-data/20170220_164723_RF24_int_A.tar"]) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'POWERDOWN RX STANDBY1 TX'.split(' ')) - self.assertEqual(model.transitions(), 'begin epilogue powerDown powerUp setDataRate_num setPALevel_num startListening stopListening write_nb'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual(model.states(), "POWERDOWN RX STANDBY1 TX".split(" ")) + self.assertEqual( + model.transitions(), + "begin epilogue powerDown powerUp setDataRate_num setPALevel_num startListening stopListening write_nb".split( + " " + ), + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('POWERDOWN', 'power'), 0, places=0) - self.assertAlmostEqual(static_model('RX', 'power'), 52254, places=0) - self.assertAlmostEqual(static_model('STANDBY1', 'power'), 7, places=0) - self.assertAlmostEqual(static_model('TX', 'power'), 18414, places=0) - self.assertAlmostEqual(static_model('begin', 'energy'), 1652249, places=0) - self.assertAlmostEqual(static_model('epilogue', 'energy'), 15449, places=0) - self.assertAlmostEqual(static_model('powerDown', 'energy'), 4547, places=0) - self.assertAlmostEqual(static_model('powerUp', 'energy'), 1641765, places=0) - self.assertAlmostEqual(static_model('setDataRate_num', 'energy'), 7749, places=0) - self.assertAlmostEqual(static_model('setPALevel_num', 'energy'), 4700, places=0) - self.assertAlmostEqual(static_model('startListening', 'energy'), 4309602, places=0) - self.assertAlmostEqual(static_model('stopListening', 'energy'), 193775, places=0) - self.assertAlmostEqual(static_model('write_nb', 'energy'), 218339, places=0) - self.assertAlmostEqual(static_model('begin', 'rel_energy_prev'), 1649571, places=0) - self.assertAlmostEqual(static_model('epilogue', 'rel_energy_prev'), -744114, places=0) - self.assertAlmostEqual(static_model('powerDown', 'rel_energy_prev'), 3854, places=0) - self.assertAlmostEqual(static_model('powerUp', 'rel_energy_prev'), 1641381, places=0) - self.assertAlmostEqual(static_model('setDataRate_num', 'rel_energy_prev'), 6777, places=0) - self.assertAlmostEqual(static_model('setPALevel_num', 'rel_energy_prev'), 3728, places=0) - self.assertAlmostEqual(static_model('startListening', 'rel_energy_prev'), 4307769, places=0) - self.assertAlmostEqual(static_model('stopListening', 'rel_energy_prev'), -13533693, places=0) - self.assertAlmostEqual(static_model('write_nb', 'rel_energy_prev'), 214618, places=0) - self.assertAlmostEqual(static_model('begin', 'duration'), 19830, places=0) - self.assertAlmostEqual(static_model('epilogue', 'duration'), 40, places=0) - self.assertAlmostEqual(static_model('powerDown', 'duration'), 90, places=0) - self.assertAlmostEqual(static_model('powerUp', 'duration'), 10030, places=0) - self.assertAlmostEqual(static_model('setDataRate_num', 'duration'), 140, places=0) - self.assertAlmostEqual(static_model('setPALevel_num', 'duration'), 90, places=0) - self.assertAlmostEqual(static_model('startListening', 'duration'), 260, places=0) - self.assertAlmostEqual(static_model('stopListening', 'duration'), 260, places=0) - self.assertAlmostEqual(static_model('write_nb', 'duration'), 510, places=0) - - self.assertAlmostEqual(model.stats.param_dependence_ratio('POWERDOWN', 'power', 'datarate'), 0, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('POWERDOWN', 'power', 'txbytes'), 0, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('POWERDOWN', 'power', 'txpower'), 0, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('RX', 'power', 'datarate'), 0.99, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('RX', 'power', 'txbytes'), 0, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('RX', 'power', 'txpower'), 0.01, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('STANDBY1', 'power', 'datarate'), 0.04, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('STANDBY1', 'power', 'txbytes'), 0.35, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('STANDBY1', 'power', 'txpower'), 0.32, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('TX', 'power', 'datarate'), 1, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('TX', 'power', 'txbytes'), 0.09, places=2) - self.assertAlmostEqual(model.stats.param_dependence_ratio('TX', 'power', 'txpower'), 1, places=2) + self.assertAlmostEqual(static_model("POWERDOWN", "power"), 0, places=0) + self.assertAlmostEqual(static_model("RX", "power"), 52254, places=0) + self.assertAlmostEqual(static_model("STANDBY1", "power"), 7, places=0) + self.assertAlmostEqual(static_model("TX", "power"), 18414, places=0) + self.assertAlmostEqual(static_model("begin", "energy"), 1652249, places=0) + self.assertAlmostEqual(static_model("epilogue", "energy"), 15449, places=0) + self.assertAlmostEqual(static_model("powerDown", "energy"), 4547, places=0) + self.assertAlmostEqual(static_model("powerUp", "energy"), 1641765, places=0) + self.assertAlmostEqual( + static_model("setDataRate_num", "energy"), 7749, places=0 + ) + self.assertAlmostEqual(static_model("setPALevel_num", "energy"), 4700, places=0) + self.assertAlmostEqual( + static_model("startListening", "energy"), 4309602, places=0 + ) + self.assertAlmostEqual( + static_model("stopListening", "energy"), 193775, places=0 + ) + self.assertAlmostEqual(static_model("write_nb", "energy"), 218339, places=0) + self.assertAlmostEqual( + static_model("begin", "rel_energy_prev"), 1649571, places=0 + ) + self.assertAlmostEqual( + static_model("epilogue", "rel_energy_prev"), -744114, places=0 + ) + self.assertAlmostEqual( + static_model("powerDown", "rel_energy_prev"), 3854, places=0 + ) + self.assertAlmostEqual( + static_model("powerUp", "rel_energy_prev"), 1641381, places=0 + ) + self.assertAlmostEqual( + static_model("setDataRate_num", "rel_energy_prev"), 6777, places=0 + ) + self.assertAlmostEqual( + static_model("setPALevel_num", "rel_energy_prev"), 3728, places=0 + ) + self.assertAlmostEqual( + static_model("startListening", "rel_energy_prev"), 4307769, places=0 + ) + self.assertAlmostEqual( + static_model("stopListening", "rel_energy_prev"), -13533693, places=0 + ) + self.assertAlmostEqual( + static_model("write_nb", "rel_energy_prev"), 214618, places=0 + ) + self.assertAlmostEqual(static_model("begin", "duration"), 19830, places=0) + self.assertAlmostEqual(static_model("epilogue", "duration"), 40, places=0) + self.assertAlmostEqual(static_model("powerDown", "duration"), 90, places=0) + self.assertAlmostEqual(static_model("powerUp", "duration"), 10030, places=0) + self.assertAlmostEqual( + static_model("setDataRate_num", "duration"), 140, places=0 + ) + self.assertAlmostEqual(static_model("setPALevel_num", "duration"), 90, places=0) + self.assertAlmostEqual( + static_model("startListening", "duration"), 260, places=0 + ) + self.assertAlmostEqual(static_model("stopListening", "duration"), 260, places=0) + self.assertAlmostEqual(static_model("write_nb", "duration"), 510, places=0) + + self.assertAlmostEqual( + model.stats.param_dependence_ratio("POWERDOWN", "power", "datarate"), + 0, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("POWERDOWN", "power", "txbytes"), + 0, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("POWERDOWN", "power", "txpower"), + 0, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("RX", "power", "datarate"), + 0.99, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("RX", "power", "txbytes"), 0, places=2 + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("RX", "power", "txpower"), 0.01, places=2 + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("STANDBY1", "power", "datarate"), + 0.04, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("STANDBY1", "power", "txbytes"), + 0.35, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("STANDBY1", "power", "txpower"), + 0.32, + places=2, + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("TX", "power", "datarate"), 1, places=2 + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("TX", "power", "txbytes"), 0.09, places=2 + ) + self.assertAlmostEqual( + model.stats.param_dependence_ratio("TX", "power", "txpower"), 1, places=2 + ) param_model, param_info = model.get_fitted() - self.assertEqual(param_info('POWERDOWN', 'power'), None) - self.assertEqual(param_info('RX', 'power')['function']._model_str, - '0 + regression_arg(0) + regression_arg(1) * np.sqrt(parameter(datarate))') - self.assertAlmostEqual(param_info('RX', 'power')['function']._regression_args[0], 48530.7, places=0) - self.assertAlmostEqual(param_info('RX', 'power')['function']._regression_args[1], 117, places=0) - self.assertEqual(param_info('STANDBY1', 'power'), None) - self.assertEqual(param_info('TX', 'power')['function']._model_str, - '0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate)) + regression_arg(2) * parameter(txpower) + regression_arg(3) * 1/(parameter(datarate)) * parameter(txpower)') - self.assertEqual(param_info('epilogue', 'timeout')['function']._model_str, - '0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate))') - self.assertEqual(param_info('stopListening', 'duration')['function']._model_str, - '0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate))') - - self.assertAlmostEqual(param_model('RX', 'power', param=[1, None, None]), 48647, places=-1) - - def test_model_singlefile_mmparam(self): - raw_data = RawData(['test-data/20161221_123347_mmparam.tar']) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + self.assertEqual(param_info("POWERDOWN", "power"), None) + self.assertEqual( + param_info("RX", "power")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * np.sqrt(parameter(datarate))", + ) + self.assertAlmostEqual( + param_info("RX", "power")["function"].model_args[0], 48530.7, places=0 + ) + self.assertAlmostEqual( + param_info("RX", "power")["function"].model_args[1], 117, places=0 + ) + self.assertEqual(param_info("STANDBY1", "power"), None) + self.assertEqual( + param_info("TX", "power")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate)) + regression_arg(2) * parameter(txpower) + regression_arg(3) * 1/(parameter(datarate)) * parameter(txpower)", + ) + self.assertEqual( + param_info("epilogue", "timeout")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate))", + ) + self.assertEqual( + param_info("stopListening", "duration")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * 1/(parameter(datarate))", + ) + + self.assertAlmostEqual( + param_model("RX", "power", param=[1, None, None]), 48647, places=-1 + ) + + def test_singlefile_mmparam(self): + raw_data = RawData(["test-data/20161221_123347_mmparam.tar"]) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'OFF ON'.split(' ')) - self.assertEqual(model.transitions(), 'off setBrightness'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual(model.states(), "OFF ON".split(" ")) + self.assertEqual(model.transitions(), "off setBrightness".split(" ")) static_model = model.get_static() - self.assertAlmostEqual(static_model('OFF', 'power'), 7124, places=0) - self.assertAlmostEqual(static_model('ON', 'power'), 17866, places=0) - self.assertAlmostEqual(static_model('off', 'energy'), 268079197, places=0) - self.assertAlmostEqual(static_model('setBrightness', 'energy'), 168912773, places=0) - self.assertAlmostEqual(static_model('off', 'rel_energy_prev'), 105040198, places=0) - self.assertAlmostEqual(static_model('setBrightness', 'rel_energy_prev'), 103745586, places=0) - self.assertAlmostEqual(static_model('off', 'duration'), 9130, places=0) - self.assertAlmostEqual(static_model('setBrightness', 'duration'), 9130, places=0) + self.assertAlmostEqual(static_model("OFF", "power"), 7124, places=0) + self.assertAlmostEqual(static_model("ON", "power"), 17866, places=0) + self.assertAlmostEqual(static_model("off", "energy"), 268079197, places=0) + self.assertAlmostEqual( + static_model("setBrightness", "energy"), 168912773, places=0 + ) + self.assertAlmostEqual( + static_model("off", "rel_energy_prev"), 105040198, places=0 + ) + self.assertAlmostEqual( + static_model("setBrightness", "rel_energy_prev"), 103745586, places=0 + ) + self.assertAlmostEqual(static_model("off", "duration"), 9130, places=0) + self.assertAlmostEqual( + static_model("setBrightness", "duration"), 9130, places=0 + ) param_lut_model = model.get_param_lut() - self.assertAlmostEqual(param_lut_model('OFF', 'power', param=[None, None]), 7124, places=0) + self.assertAlmostEqual( + param_lut_model("OFF", "power", param=[None, None]), 7124, places=0 + ) with self.assertRaises(KeyError): - param_lut_model('ON', 'power', param=[None, None]) - param_lut_model('ON', 'power', param=['a']) - param_lut_model('ON', 'power', param=[0]) - self.assertTrue(param_lut_model('ON', 'power', param=[0, 0])) + param_lut_model("ON", "power", param=[None, None]) + param_lut_model("ON", "power", param=["a"]) + param_lut_model("ON", "power", param=[0]) + self.assertTrue(param_lut_model("ON", "power", param=[0, 0])) param_lut_model = model.get_param_lut(fallback=True) - self.assertAlmostEqual(param_lut_model('ON', 'power', param=[None, None]), 17866, places=0) + self.assertAlmostEqual( + param_lut_model("ON", "power", param=[None, None]), 17866, places=0 + ) - def test_model_multifile_lm75x(self): + def test_multifile_lm75x(self): testfiles = [ - 'test-data/20170116_124500_LM75x.tar', - 'test-data/20170116_131306_LM75x.tar', + "test-data/20170116_124500_LM75x.tar", + "test-data/20170116_131306_LM75x.tar", ] raw_data = RawData(testfiles) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'ACTIVE POWEROFF'.split(' ')) - self.assertEqual(model.transitions(), 'getTemp setHyst setOS shutdown start'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual(model.states(), "ACTIVE POWEROFF".split(" ")) + self.assertEqual( + model.transitions(), "getTemp setHyst setOS shutdown start".split(" ") + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('ACTIVE', 'power'), 332, places=0) - self.assertAlmostEqual(static_model('POWEROFF', 'power'), 7, places=0) - self.assertAlmostEqual(static_model('getTemp', 'energy'), 26016748, places=0) - self.assertAlmostEqual(static_model('setHyst', 'energy'), 22082226, places=0) - self.assertAlmostEqual(static_model('setOS', 'energy'), 21774238, places=0) - self.assertAlmostEqual(static_model('shutdown', 'energy'), 11808160, places=0) - self.assertAlmostEqual(static_model('start', 'energy'), 12445302, places=0) - self.assertAlmostEqual(static_model('getTemp', 'rel_energy_prev'), 21722720, places=0) - self.assertAlmostEqual(static_model('setHyst', 'rel_energy_prev'), 19001499, places=0) - self.assertAlmostEqual(static_model('setOS', 'rel_energy_prev'), 18693283, places=0) - self.assertAlmostEqual(static_model('shutdown', 'rel_energy_prev'), 11746224, places=0) - self.assertAlmostEqual(static_model('start', 'rel_energy_prev'), 12391462, places=0) - self.assertAlmostEqual(static_model('getTemp', 'duration'), 12740, places=0) - self.assertAlmostEqual(static_model('setHyst', 'duration'), 9140, places=0) - self.assertAlmostEqual(static_model('setOS', 'duration'), 9140, places=0) - self.assertAlmostEqual(static_model('shutdown', 'duration'), 6980, places=0) - self.assertAlmostEqual(static_model('start', 'duration'), 6980, places=0) - - def test_model_multifile_sharp(self): + self.assertAlmostEqual(static_model("ACTIVE", "power"), 332, places=0) + self.assertAlmostEqual(static_model("POWEROFF", "power"), 7, places=0) + self.assertAlmostEqual(static_model("getTemp", "energy"), 26016748, places=0) + self.assertAlmostEqual(static_model("setHyst", "energy"), 22082226, places=0) + self.assertAlmostEqual(static_model("setOS", "energy"), 21774238, places=0) + self.assertAlmostEqual(static_model("shutdown", "energy"), 11808160, places=0) + self.assertAlmostEqual(static_model("start", "energy"), 12445302, places=0) + self.assertAlmostEqual( + static_model("getTemp", "rel_energy_prev"), 21722720, places=0 + ) + self.assertAlmostEqual( + static_model("setHyst", "rel_energy_prev"), 19001499, places=0 + ) + self.assertAlmostEqual( + static_model("setOS", "rel_energy_prev"), 18693283, places=0 + ) + self.assertAlmostEqual( + static_model("shutdown", "rel_energy_prev"), 11746224, places=0 + ) + self.assertAlmostEqual( + static_model("start", "rel_energy_prev"), 12391462, places=0 + ) + self.assertAlmostEqual(static_model("getTemp", "duration"), 12740, places=0) + self.assertAlmostEqual(static_model("setHyst", "duration"), 9140, places=0) + self.assertAlmostEqual(static_model("setOS", "duration"), 9140, places=0) + self.assertAlmostEqual(static_model("shutdown", "duration"), 6980, places=0) + self.assertAlmostEqual(static_model("start", "duration"), 6980, places=0) + + def test_multifile_sharp(self): testfiles = [ - 'test-data/20170116_145420_sharpLS013B4DN.tar', - 'test-data/20170116_151348_sharpLS013B4DN.tar', + "test-data/20170116_145420_sharpLS013B4DN.tar", + "test-data/20170116_151348_sharpLS013B4DN.tar", ] raw_data = RawData(testfiles) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'DISABLED ENABLED'.split(' ')) - self.assertEqual(model.transitions(), 'clear disable enable ioInit sendLine toggleVCOM'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual(model.states(), "DISABLED ENABLED".split(" ")) + self.assertEqual( + model.transitions(), + "clear disable enable ioInit sendLine toggleVCOM".split(" "), + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('DISABLED', 'power'), 22, places=0) - self.assertAlmostEqual(static_model('ENABLED', 'power'), 24, places=0) - self.assertAlmostEqual(static_model('clear', 'energy'), 14059, places=0) - self.assertAlmostEqual(static_model('disable', 'energy'), 0, places=0) - self.assertAlmostEqual(static_model('enable', 'energy'), 0, places=0) - self.assertAlmostEqual(static_model('ioInit', 'energy'), 0, places=0) - self.assertAlmostEqual(static_model('sendLine', 'energy'), 37874, places=0) - self.assertAlmostEqual(static_model('toggleVCOM', 'energy'), 30991, places=0) - self.assertAlmostEqual(static_model('clear', 'rel_energy_prev'), 13329, places=0) - self.assertAlmostEqual(static_model('disable', 'rel_energy_prev'), 0, places=0) - self.assertAlmostEqual(static_model('enable', 'rel_energy_prev'), 0, places=0) - self.assertAlmostEqual(static_model('ioInit', 'rel_energy_prev'), 0, places=0) - self.assertAlmostEqual(static_model('sendLine', 'rel_energy_prev'), 33447, places=0) - self.assertAlmostEqual(static_model('toggleVCOM', 'rel_energy_prev'), 30242, places=0) - self.assertAlmostEqual(static_model('clear', 'duration'), 30, places=0) - self.assertAlmostEqual(static_model('disable', 'duration'), 0, places=0) - self.assertAlmostEqual(static_model('enable', 'duration'), 0, places=0) - self.assertAlmostEqual(static_model('ioInit', 'duration'), 0, places=0) - self.assertAlmostEqual(static_model('sendLine', 'duration'), 180, places=0) - self.assertAlmostEqual(static_model('toggleVCOM', 'duration'), 30, places=0) - - def test_model_multifile_mmstatic(self): + self.assertAlmostEqual(static_model("DISABLED", "power"), 22, places=0) + self.assertAlmostEqual(static_model("ENABLED", "power"), 24, places=0) + self.assertAlmostEqual(static_model("clear", "energy"), 14059, places=0) + self.assertAlmostEqual(static_model("disable", "energy"), 0, places=0) + self.assertAlmostEqual(static_model("enable", "energy"), 0, places=0) + self.assertAlmostEqual(static_model("ioInit", "energy"), 0, places=0) + self.assertAlmostEqual(static_model("sendLine", "energy"), 37874, places=0) + self.assertAlmostEqual(static_model("toggleVCOM", "energy"), 30991, places=0) + self.assertAlmostEqual( + static_model("clear", "rel_energy_prev"), 13329, places=0 + ) + self.assertAlmostEqual(static_model("disable", "rel_energy_prev"), 0, places=0) + self.assertAlmostEqual(static_model("enable", "rel_energy_prev"), 0, places=0) + self.assertAlmostEqual(static_model("ioInit", "rel_energy_prev"), 0, places=0) + self.assertAlmostEqual( + static_model("sendLine", "rel_energy_prev"), 33447, places=0 + ) + self.assertAlmostEqual( + static_model("toggleVCOM", "rel_energy_prev"), 30242, places=0 + ) + self.assertAlmostEqual(static_model("clear", "duration"), 30, places=0) + self.assertAlmostEqual(static_model("disable", "duration"), 0, places=0) + self.assertAlmostEqual(static_model("enable", "duration"), 0, places=0) + self.assertAlmostEqual(static_model("ioInit", "duration"), 0, places=0) + self.assertAlmostEqual(static_model("sendLine", "duration"), 180, places=0) + self.assertAlmostEqual(static_model("toggleVCOM", "duration"), 30, places=0) + + def test_multifile_mmstatic(self): testfiles = [ - 'test-data/20170116_143516_mmstatic.tar', - 'test-data/20170116_142654_mmstatic.tar', + "test-data/20170116_143516_mmstatic.tar", + "test-data/20170116_142654_mmstatic.tar", ] raw_data = RawData(testfiles) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'B G OFF R'.split(' ')) - self.assertEqual(model.transitions(), 'blue green off red'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual(model.states(), "B G OFF R".split(" ")) + self.assertEqual(model.transitions(), "blue green off red".split(" ")) static_model = model.get_static() - self.assertAlmostEqual(static_model('B', 'power'), 29443, places=0) - self.assertAlmostEqual(static_model('G', 'power'), 29432, places=0) - self.assertAlmostEqual(static_model('OFF', 'power'), 7057, places=0) - self.assertAlmostEqual(static_model('R', 'power'), 49068, places=0) - self.assertAlmostEqual(static_model('blue', 'energy'), 374440955, places=0) - self.assertAlmostEqual(static_model('green', 'energy'), 372026027, places=0) - self.assertAlmostEqual(static_model('off', 'energy'), 372999554, places=0) - self.assertAlmostEqual(static_model('red', 'energy'), 378936634, places=0) - self.assertAlmostEqual(static_model('blue', 'rel_energy_prev'), 105535587, places=0) - self.assertAlmostEqual(static_model('green', 'rel_energy_prev'), 102999371, places=0) - self.assertAlmostEqual(static_model('off', 'rel_energy_prev'), 103613698, places=0) - self.assertAlmostEqual(static_model('red', 'rel_energy_prev'), 110474331, places=0) - self.assertAlmostEqual(static_model('blue', 'duration'), 9140, places=0) - self.assertAlmostEqual(static_model('green', 'duration'), 9140, places=0) - self.assertAlmostEqual(static_model('off', 'duration'), 9140, places=0) - self.assertAlmostEqual(static_model('red', 'duration'), 9140, places=0) - - @pytest.mark.skipif('TEST_SLOW' not in os.environ, reason="slow test, set TEST_SLOW=1 to run") - def test_model_multifile_cc1200(self): + self.assertAlmostEqual(static_model("B", "power"), 29443, places=0) + self.assertAlmostEqual(static_model("G", "power"), 29432, places=0) + self.assertAlmostEqual(static_model("OFF", "power"), 7057, places=0) + self.assertAlmostEqual(static_model("R", "power"), 49068, places=0) + self.assertAlmostEqual(static_model("blue", "energy"), 374440955, places=0) + self.assertAlmostEqual(static_model("green", "energy"), 372026027, places=0) + self.assertAlmostEqual(static_model("off", "energy"), 372999554, places=0) + self.assertAlmostEqual(static_model("red", "energy"), 378936634, places=0) + self.assertAlmostEqual( + static_model("blue", "rel_energy_prev"), 105535587, places=0 + ) + self.assertAlmostEqual( + static_model("green", "rel_energy_prev"), 102999371, places=0 + ) + self.assertAlmostEqual( + static_model("off", "rel_energy_prev"), 103613698, places=0 + ) + self.assertAlmostEqual( + static_model("red", "rel_energy_prev"), 110474331, places=0 + ) + self.assertAlmostEqual(static_model("blue", "duration"), 9140, places=0) + self.assertAlmostEqual(static_model("green", "duration"), 9140, places=0) + self.assertAlmostEqual(static_model("off", "duration"), 9140, places=0) + self.assertAlmostEqual(static_model("red", "duration"), 9140, places=0) + + @pytest.mark.skipif( + "TEST_SLOW" not in os.environ, reason="slow test, set TEST_SLOW=1 to run" + ) + def test_multifile_cc1200(self): testfiles = [ - 'test-data/20170125_125433_cc1200.tar', - 'test-data/20170125_142420_cc1200.tar', - 'test-data/20170125_144957_cc1200.tar', - 'test-data/20170125_151149_cc1200.tar', - 'test-data/20170125_151824_cc1200.tar', - 'test-data/20170125_154019_cc1200.tar', + "test-data/20170125_125433_cc1200.tar", + "test-data/20170125_142420_cc1200.tar", + "test-data/20170125_144957_cc1200.tar", + "test-data/20170125_151149_cc1200.tar", + "test-data/20170125_151824_cc1200.tar", + "test-data/20170125_154019_cc1200.tar", ] raw_data = RawData(testfiles) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = PTAModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.states(), 'IDLE RX SLEEP SLEEP_EWOR SYNTH_ON TX XOFF'.split(' ')) - self.assertEqual(model.transitions(), 'crystal_off eWOR idle init prepare_xmit receive send setSymbolRate setTxPower sleep txDone'.split(' ')) + model = PTAModel(by_name, parameters, arg_count) + self.assertEqual( + model.states(), "IDLE RX SLEEP SLEEP_EWOR SYNTH_ON TX XOFF".split(" ") + ) + self.assertEqual( + model.transitions(), + "crystal_off eWOR idle init prepare_xmit receive send setSymbolRate setTxPower sleep txDone".split( + " " + ), + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('IDLE', 'power'), 9500, places=0) - self.assertAlmostEqual(static_model('RX', 'power'), 85177, places=0) - self.assertAlmostEqual(static_model('SLEEP', 'power'), 143, places=0) - self.assertAlmostEqual(static_model('SLEEP_EWOR', 'power'), 81801, places=0) - self.assertAlmostEqual(static_model('SYNTH_ON', 'power'), 60036, places=0) - self.assertAlmostEqual(static_model('TX', 'power'), 92461, places=0) - self.assertAlmostEqual(static_model('XOFF', 'power'), 780, places=0) - self.assertAlmostEqual(static_model('crystal_off', 'energy'), 114658, places=0) - self.assertAlmostEqual(static_model('eWOR', 'energy'), 317556, places=0) - self.assertAlmostEqual(static_model('idle', 'energy'), 717713, places=0) - self.assertAlmostEqual(static_model('init', 'energy'), 23028941, places=0) - self.assertAlmostEqual(static_model('prepare_xmit', 'energy'), 378552, places=0) - self.assertAlmostEqual(static_model('receive', 'energy'), 380335, places=0) - self.assertAlmostEqual(static_model('send', 'energy'), 4282597, places=0) - self.assertAlmostEqual(static_model('setSymbolRate', 'energy'), 962060, places=0) - self.assertAlmostEqual(static_model('setTxPower', 'energy'), 288701, places=0) - self.assertAlmostEqual(static_model('sleep', 'energy'), 104445, places=0) - self.assertEqual(static_model('txDone', 'energy'), 0) + self.assertAlmostEqual(static_model("IDLE", "power"), 9500, places=0) + self.assertAlmostEqual(static_model("RX", "power"), 85177, places=0) + self.assertAlmostEqual(static_model("SLEEP", "power"), 143, places=0) + self.assertAlmostEqual(static_model("SLEEP_EWOR", "power"), 81801, places=0) + self.assertAlmostEqual(static_model("SYNTH_ON", "power"), 60036, places=0) + self.assertAlmostEqual(static_model("TX", "power"), 92461, places=0) + self.assertAlmostEqual(static_model("XOFF", "power"), 780, places=0) + self.assertAlmostEqual(static_model("crystal_off", "energy"), 114658, places=0) + self.assertAlmostEqual(static_model("eWOR", "energy"), 317556, places=0) + self.assertAlmostEqual(static_model("idle", "energy"), 717713, places=0) + self.assertAlmostEqual(static_model("init", "energy"), 23028941, places=0) + self.assertAlmostEqual(static_model("prepare_xmit", "energy"), 378552, places=0) + self.assertAlmostEqual(static_model("receive", "energy"), 380335, places=0) + self.assertAlmostEqual(static_model("send", "energy"), 4282597, places=0) + self.assertAlmostEqual( + static_model("setSymbolRate", "energy"), 962060, places=0 + ) + self.assertAlmostEqual(static_model("setTxPower", "energy"), 288701, places=0) + self.assertAlmostEqual(static_model("sleep", "energy"), 104445, places=0) + self.assertEqual(static_model("txDone", "energy"), 0) param_model, param_info = model.get_fitted() - self.assertEqual(param_info('IDLE', 'power'), None) - self.assertEqual(param_info('RX', 'power')['function']._model_str, - '0 + regression_arg(0) + regression_arg(1) * np.log(parameter(symbolrate) + 1)') - self.assertEqual(param_info('SLEEP', 'power'), None) - self.assertEqual(param_info('SLEEP_EWOR', 'power'), None) - self.assertEqual(param_info('SYNTH_ON', 'power'), None) - self.assertEqual(param_info('XOFF', 'power'), None) + self.assertEqual(param_info("IDLE", "power"), None) + self.assertEqual( + param_info("RX", "power")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * np.log(parameter(symbolrate) + 1)", + ) + self.assertEqual(param_info("SLEEP", "power"), None) + self.assertEqual(param_info("SLEEP_EWOR", "power"), None) + self.assertEqual(param_info("SYNTH_ON", "power"), None) + self.assertEqual(param_info("XOFF", "power"), None) - self.assertAlmostEqual(param_info('RX', 'power')['function']._regression_args[0], 84415, places=0) - self.assertAlmostEqual(param_info('RX', 'power')['function']._regression_args[1], 206, places=0) + self.assertAlmostEqual( + param_info("RX", "power")["function"].model_args[0], 84415, places=0 + ) + self.assertAlmostEqual( + param_info("RX", "power")["function"].model_args[1], 206, places=0 + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_timingharness.py b/test/test_timingharness.py index c8a422c..917e4e2 100755 --- a/test/test_timingharness.py +++ b/test/test_timingharness.py @@ -1,95 +1,157 @@ #!/usr/bin/env python3 -from dfatool.dfatool import AnalyticModel, TimingData, pta_trace_to_aggregate +from dfatool.loader import TimingData, pta_trace_to_aggregate +from dfatool.model import AnalyticModel from dfatool.parameters import prune_dependent_parameters import unittest class TestModels(unittest.TestCase): def test_model_singlefile_rf24(self): - raw_data = TimingData(['test-data/20190815_111745_nRF24_no-rx.json']) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + raw_data = TimingData(["test-data/20190815_111745_nRF24_no-rx.json"]) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = AnalyticModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.names, 'setPALevel setRetries setup write'.split(' ')) + model = AnalyticModel(by_name, parameters, arg_count) + self.assertEqual(model.names, "setPALevel setRetries setup write".split(" ")) static_model = model.get_static() - self.assertAlmostEqual(static_model('setPALevel', 'duration'), 146, places=0) - self.assertAlmostEqual(static_model('setRetries', 'duration'), 73, places=0) - self.assertAlmostEqual(static_model('setup', 'duration'), 6533, places=0) - self.assertAlmostEqual(static_model('write', 'duration'), 12634, places=0) - - for transition in 'setPALevel setRetries setup write'.split(' '): - self.assertAlmostEqual(model.stats.param_dependence_ratio(transition, 'duration', 'channel'), 0, places=2) + self.assertAlmostEqual(static_model("setPALevel", "duration"), 146, places=0) + self.assertAlmostEqual(static_model("setRetries", "duration"), 73, places=0) + self.assertAlmostEqual(static_model("setup", "duration"), 6533, places=0) + self.assertAlmostEqual(static_model("write", "duration"), 12634, places=0) + + for transition in "setPALevel setRetries setup write".split(" "): + self.assertAlmostEqual( + model.stats.param_dependence_ratio(transition, "duration", "channel"), + 0, + places=2, + ) param_model, param_info = model.get_fitted() - self.assertEqual(param_info('setPALevel', 'duration'), None) - self.assertEqual(param_info('setRetries', 'duration'), None) - self.assertEqual(param_info('setup', 'duration'), None) - self.assertEqual(param_info('write', 'duration')['function']._model_str, '0 + regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay)') - - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[0], 1163, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[1], 464, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[2], 1, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[3], 1, places=0) + self.assertEqual(param_info("setPALevel", "duration"), None) + self.assertEqual(param_info("setRetries", "duration"), None) + self.assertEqual(param_info("setup", "duration"), None) + self.assertEqual( + param_info("write", "duration")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay)", + ) + + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[0], 1163, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[1], 464, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[2], 1, places=0 + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[3], 1, places=0 + ) def test_dependent_parameter_pruning(self): - raw_data = TimingData(['test-data/20190815_103347_nRF24_no-rx.json']) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + raw_data = TimingData(["test-data/20190815_103347_nRF24_no-rx.json"]) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) prune_dependent_parameters(by_name, parameters) - model = AnalyticModel(by_name, parameters, arg_count, verbose=False) - self.assertEqual(model.names, 'getObserveTx setPALevel setRetries setup write'.split(' ')) + model = AnalyticModel(by_name, parameters, arg_count) + self.assertEqual( + model.names, "getObserveTx setPALevel setRetries setup write".split(" ") + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('getObserveTx', 'duration'), 75, places=0) - self.assertAlmostEqual(static_model('setPALevel', 'duration'), 146, places=0) - self.assertAlmostEqual(static_model('setRetries', 'duration'), 73, places=0) - self.assertAlmostEqual(static_model('setup', 'duration'), 6533, places=0) - self.assertAlmostEqual(static_model('write', 'duration'), 12634, places=0) - - for transition in 'getObserveTx setPALevel setRetries setup write'.split(' '): - self.assertAlmostEqual(model.stats.param_dependence_ratio(transition, 'duration', 'channel'), 0, places=2) + self.assertAlmostEqual(static_model("getObserveTx", "duration"), 75, places=0) + self.assertAlmostEqual(static_model("setPALevel", "duration"), 146, places=0) + self.assertAlmostEqual(static_model("setRetries", "duration"), 73, places=0) + self.assertAlmostEqual(static_model("setup", "duration"), 6533, places=0) + self.assertAlmostEqual(static_model("write", "duration"), 12634, places=0) + + for transition in "getObserveTx setPALevel setRetries setup write".split(" "): + self.assertAlmostEqual( + model.stats.param_dependence_ratio(transition, "duration", "channel"), + 0, + places=2, + ) param_model, param_info = model.get_fitted() - self.assertEqual(param_info('getObserveTx', 'duration'), None) - self.assertEqual(param_info('setPALevel', 'duration'), None) - self.assertEqual(param_info('setRetries', 'duration'), None) - self.assertEqual(param_info('setup', 'duration'), None) - self.assertEqual(param_info('write', 'duration')['function']._model_str, '0 + regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay)') - - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[0], 1163, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[1], 464, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[2], 1, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[3], 1, places=0) + self.assertEqual(param_info("getObserveTx", "duration"), None) + self.assertEqual(param_info("setPALevel", "duration"), None) + self.assertEqual(param_info("setRetries", "duration"), None) + self.assertEqual(param_info("setup", "duration"), None) + self.assertEqual( + param_info("write", "duration")["function"].model_function, + "0 + regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay)", + ) + + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[0], 1163, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[1], 464, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[2], 1, places=0 + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[3], 1, places=0 + ) def test_function_override(self): - raw_data = TimingData(['test-data/20190815_122531_nRF24_no-rx.json']) - preprocessed_data = raw_data.get_preprocessed_data(verbose=False) + raw_data = TimingData(["test-data/20190815_122531_nRF24_no-rx.json"]) + preprocessed_data = raw_data.get_preprocessed_data() by_name, parameters, arg_count = pta_trace_to_aggregate(preprocessed_data) - model = AnalyticModel(by_name, parameters, arg_count, verbose=False, function_override={('write', 'duration'): '(parameter(auto_ack!) * (regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay))) + ((1 - parameter(auto_ack!)) * regression_arg(4))'}) - self.assertEqual(model.names, 'setAutoAck setPALevel setRetries setup write'.split(' ')) + model = AnalyticModel( + by_name, + parameters, + arg_count, + function_override={ + ( + "write", + "duration", + ): "(parameter(auto_ack!) * (regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay))) + ((1 - parameter(auto_ack!)) * regression_arg(4))" + }, + ) + self.assertEqual( + model.names, "setAutoAck setPALevel setRetries setup write".split(" ") + ) static_model = model.get_static() - self.assertAlmostEqual(static_model('setAutoAck', 'duration'), 72, places=0) - self.assertAlmostEqual(static_model('setPALevel', 'duration'), 146, places=0) - self.assertAlmostEqual(static_model('setRetries', 'duration'), 73, places=0) - self.assertAlmostEqual(static_model('setup', 'duration'), 6533, places=0) - self.assertAlmostEqual(static_model('write', 'duration'), 1181, places=0) - - for transition in 'setAutoAck setPALevel setRetries setup write'.split(' '): - self.assertAlmostEqual(model.stats.param_dependence_ratio(transition, 'duration', 'channel'), 0, places=2) + self.assertAlmostEqual(static_model("setAutoAck", "duration"), 72, places=0) + self.assertAlmostEqual(static_model("setPALevel", "duration"), 146, places=0) + self.assertAlmostEqual(static_model("setRetries", "duration"), 73, places=0) + self.assertAlmostEqual(static_model("setup", "duration"), 6533, places=0) + self.assertAlmostEqual(static_model("write", "duration"), 1181, places=0) + + for transition in "setAutoAck setPALevel setRetries setup write".split(" "): + self.assertAlmostEqual( + model.stats.param_dependence_ratio(transition, "duration", "channel"), + 0, + places=2, + ) param_model, param_info = model.get_fitted() - self.assertEqual(param_info('setAutoAck', 'duration'), None) - self.assertEqual(param_info('setPALevel', 'duration'), None) - self.assertEqual(param_info('setRetries', 'duration'), None) - self.assertEqual(param_info('setup', 'duration'), None) - self.assertEqual(param_info('write', 'duration')['function']._model_str, '(parameter(auto_ack!) * (regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay))) + ((1 - parameter(auto_ack!)) * regression_arg(4))') - - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[0], 1162, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[1], 464, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[2], 1, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[3], 1, places=0) - self.assertAlmostEqual(param_info('write', 'duration')['function']._regression_args[4], 1086, places=0) - - -if __name__ == '__main__': + self.assertEqual(param_info("setAutoAck", "duration"), None) + self.assertEqual(param_info("setPALevel", "duration"), None) + self.assertEqual(param_info("setRetries", "duration"), None) + self.assertEqual(param_info("setup", "duration"), None) + self.assertEqual( + param_info("write", "duration")["function"].model_function, + "(parameter(auto_ack!) * (regression_arg(0) + regression_arg(1) * parameter(max_retry_count) + regression_arg(2) * parameter(retry_delay) + regression_arg(3) * parameter(max_retry_count) * parameter(retry_delay))) + ((1 - parameter(auto_ack!)) * regression_arg(4))", + ) + + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[0], 1162, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[1], 464, places=0, + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[2], 1, places=0 + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[3], 1, places=0 + ) + self.assertAlmostEqual( + param_info("write", "duration")["function"].model_args[4], 1086, places=0, + ) + + +if __name__ == "__main__": unittest.main() |