diff options
-rwxr-xr-x | bin/test_automata.py | 180 | ||||
-rwxr-xr-x | lib/automata.py | 38 |
2 files changed, 132 insertions, 86 deletions
diff --git a/bin/test_automata.py b/bin/test_automata.py index c21a5a1..fd3f462 100755 --- a/bin/test_automata.py +++ b/bin/test_automata.py @@ -3,6 +3,82 @@ from automata import PTA import unittest +example_json_1 = { + 'parameters' : ['datarate', 'txbytes', 'txpower'], + 'initial_param_values' : [None, None], + 'states' : { + 'IDLE' : { + 'power' : { + 'static' : 5, + } + }, + 'TX' : { + 'power' : { + 'static' : 100, + 'function' : { + 'raw' : 'regression_arg(0) + regression_arg(1)' + ' * parameter(txpower)', + 'regression_args' : [ 100, 2 ] + }, + } + }, + }, + 'transitions' : [ + { + 'name' : 'init', + 'origin' : 'UNINITIALIZED', + 'destination' : 'IDLE', + 'duration' : { + 'static' : 50000, + }, + }, + { + 'name' : 'setTxPower', + 'origin' : 'IDLE', + 'destination' : 'IDLE', + 'duration' : { 'static' : 120 }, + 'energy ' : { 'static' : 10000 }, + 'arg_to_param_map' : { 'txpower' : 0 }, + }, + { + 'name' : 'send', + 'origin' : 'IDLE', + 'destination' : 'TX', + 'duration' : { + 'static' : 10, + 'function' : { + 'raw' : 'regression_arg(0) + regression_arg(1)' + ' * function_arg(1)', + 'regression_args' : [48, 8], + }, + }, + 'energy' : { + 'static' : 3, + 'function' : { + 'raw' : 'regression_arg(0) + regression_arg(1)' + ' * function_arg(1)', + 'regression_args' : [3, 5], + }, + }, + 'arg_to_param_map' : { 'txbytes' : 1 }, + }, + { + 'name' : 'txComplete', + 'origin' : 'TX', + 'destination' : 'IDLE', + 'is_interrupt' : 1, + 'timeout' : { + 'static' : 2000, + 'function' : { + 'raw' : 'regression_arg(0) + regression_arg(1)' + ' * parameter(txbytes)', + 'regression_args' : [ 500, 16 ], + }, + }, + } + ], +} + class TestPTA(unittest.TestCase): def test_dfs(self): pta = PTA(['IDLE', 'TX']) @@ -19,95 +95,31 @@ class TestPTA(unittest.TestCase): pta.add_transition('IDLE', 'IDLE', 'set1') pta.add_transition('IDLE', 'IDLE', 'set2') self.assertEqual(list(pta.dfs(0)), [['init']]) - self.assertEqual(list(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']]) - self.assertEqual(list(pta.dfs(2)), [['init', 'set1', 'set1'], + self.assertEqual(sorted(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']]) + self.assertEqual(sorted(pta.dfs(2)), [['init', 'set1', 'set1'], ['init', 'set1', 'set2'], ['init', 'set2', 'set1'], ['init', 'set2', 'set2']]) def test_from_json(self): - json_input = { - 'parameters' : ['datarate', 'txbytes', 'txpower'], - 'initial_param_values' : [None, None], - 'states' : { - 'IDLE' : { - 'power' : { - 'static' : 5, - } - }, - 'TX' : { - 'power' : { - 'static' : 100, - 'function' : { - 'raw' : '0 + regression_arg(0) + regression_arg(1)' - ' * parameter(txpower) + regression_arg(2)' - ' * 1 / parameter(datarate) + regression_arg(3)' - ' * parameter(txpower) * 1 / parameter(datarate)', - 'regression_args' : [ - 13918.0985307854, - 245.185518812381, - 8368807.4712942, - 271527.656040595 - ], - }, - } - }, - }, - 'transitions' : [ - { - 'name' : 'init', - 'origin' : 'UNINITIALIZED', - 'destination' : 'IDLE', - 'duration' : { - 'static' : 50000, - }, - }, - { - 'name' : 'setTxPower', - 'origin' : 'IDLE', - 'destination' : 'IDLE', - 'duration' : { 'static' : 120 }, - 'energy ' : { 'static' : 10000 }, - 'arg_to_param_map' : [ 'txpower' ], - }, - { - 'name' : 'send', - 'origin' : 'IDLE', - 'destination' : 'TX', - 'duration' : { - 'static' : 10, - 'function' : { - 'raw' : 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args' : [48, 8], - }, - }, - 'energy' : { - 'static' : 3, - 'function' : { - 'raw' : 'regression_arg(0) + regression_arg(1)' - ' * function_arg(1)', - 'regression_args' : [3, 5], - }, - }, - 'arg_to_param_map' : [ None, 'txbytes' ], - }, - { - 'name' : 'txComplete', - 'origin' : 'TX', - 'destination' : 'IDLE', - 'is_interrupt' : 1, - 'timeout' : { - 'static' : 2000, - 'function' : { - 'raw' : 'regression_arg(0) + regression_arg(1)' - ' * parameter(txbytes)', - 'regression_args' : [ 500, 16 ], - }, - }, - } - ], - } + pta = PTA.from_json(example_json_1) + self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower']) + self.assertEqual(pta.states['UNINITIALIZED'].name, 'UNINITIALIZED') + self.assertEqual(pta.states['IDLE'].name, 'IDLE') + self.assertEqual(pta.states['TX'].name, 'TX') + self.assertEqual(len(pta.transitions), 4) + self.assertEqual(pta.transitions[0].name, 'init') + self.assertEqual(pta.transitions[1].name, 'setTxPower') + self.assertEqual(pta.transitions[2].name, 'send') + self.assertEqual(pta.transitions[3].name, 'txComplete') + + def test_from_json_dfs(self): + pta = PTA.from_json(example_json_1) + self.assertEqual(sorted(pta.dfs(1)), [['init', 'send'], ['init', 'setTxPower']]) + + def test_from_json_function(self): + pta = PTA.from_json(example_json_1) + self.assertEqual(pta.states['TX'].get_energy(1000, {'datarate' : 10, 'txbytes' : 6, 'txpower' : 10 }), 1000 * (100 + 2 * 10)) def test_simulation(self): pta = PTA() diff --git a/lib/automata.py b/lib/automata.py index ff32bf7..4e76581 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -104,6 +104,17 @@ class State: new_suffix.extend(suffix) yield new_suffix +def _json_function_to_analytic_function(base, attribute, parameters): + if attribute in base and 'function' in base[attribute]: + base = base[attribute]['function'] + return AnalyticFunction(base['raw'], parameters, 0, regression_args = base['regression_args']) + return None + +def _json_get_static(base, attribute): + if attribute in base: + return base[attribute]['static'] + return 0 + class PTA: def __init__(self, state_names = [], parameters = [], initial_param_values = None): self.states = dict([[state_name, State(state_name)] for state_name in state_names]) @@ -117,8 +128,31 @@ class PTA: if not 'UNINITIALIZED' in state_names: self.states['UNINITIALIZED'] = State('UNINITIALIZED') + @classmethod + def from_json(cls, json_input): + kwargs = {} + for key in ('state_names', 'parameters', 'initial_param_values'): + if key in json_input: + kwargs[key] = json_input[key] + pta = cls(**kwargs) + for name, state in json_input['states'].items(): + power_function = _json_function_to_analytic_function(state, 'power', pta.parameters) + pta.add_state(name, power = _json_get_static(state, 'power'), power_function = power_function) + for transition in json_input['transitions']: + duration_function = _json_function_to_analytic_function(transition, 'duration', pta.parameters) + energy_function = _json_function_to_analytic_function(transition, 'energy', pta.parameters) + timeout_function = _json_function_to_analytic_function(transition, 'timeout', pta.parameters) + pta.add_transition(transition['origin'], transition['destination'], + transition['name'], + duration = _json_get_static(transition, 'duration'), + energy = _json_get_static(transition, 'energy'), + timeout = _json_get_static(transition, 'timeout') + ) + + return pta + def add_state(self, state_name, **kwargs): - if 'power_function' in kwargs: + if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction: kwargs['power_function'] = AnalyticFunction(kwargs['power_function'], self.parameters, 0) self.states[state_name] = State(state_name, **kwargs) @@ -127,7 +161,7 @@ class PTA: orig_state = self.states[orig_state] dest_state = self.states[dest_state] for key in ('duration_function', 'energy_function', 'timeout_function'): - if key in kwargs: + if key in kwargs and type(kwargs[key]) != AnalyticFunction: kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) new_transition = Transition(orig_state, dest_state, function_name, **kwargs) |