summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/test_automata.py180
-rwxr-xr-xlib/automata.py38
2 files changed, 132 insertions, 86 deletions
diff --git a/bin/test_automata.py b/bin/test_automata.py
index c21a5a1..fd3f462 100755
--- a/bin/test_automata.py
+++ b/bin/test_automata.py
@@ -3,6 +3,82 @@
from automata import PTA
import unittest
+example_json_1 = {
+ 'parameters' : ['datarate', 'txbytes', 'txpower'],
+ 'initial_param_values' : [None, None],
+ 'states' : {
+ 'IDLE' : {
+ 'power' : {
+ 'static' : 5,
+ }
+ },
+ 'TX' : {
+ 'power' : {
+ 'static' : 100,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * parameter(txpower)',
+ 'regression_args' : [ 100, 2 ]
+ },
+ }
+ },
+ },
+ 'transitions' : [
+ {
+ 'name' : 'init',
+ 'origin' : 'UNINITIALIZED',
+ 'destination' : 'IDLE',
+ 'duration' : {
+ 'static' : 50000,
+ },
+ },
+ {
+ 'name' : 'setTxPower',
+ 'origin' : 'IDLE',
+ 'destination' : 'IDLE',
+ 'duration' : { 'static' : 120 },
+ 'energy ' : { 'static' : 10000 },
+ 'arg_to_param_map' : { 'txpower' : 0 },
+ },
+ {
+ 'name' : 'send',
+ 'origin' : 'IDLE',
+ 'destination' : 'TX',
+ 'duration' : {
+ 'static' : 10,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * function_arg(1)',
+ 'regression_args' : [48, 8],
+ },
+ },
+ 'energy' : {
+ 'static' : 3,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * function_arg(1)',
+ 'regression_args' : [3, 5],
+ },
+ },
+ 'arg_to_param_map' : { 'txbytes' : 1 },
+ },
+ {
+ 'name' : 'txComplete',
+ 'origin' : 'TX',
+ 'destination' : 'IDLE',
+ 'is_interrupt' : 1,
+ 'timeout' : {
+ 'static' : 2000,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * parameter(txbytes)',
+ 'regression_args' : [ 500, 16 ],
+ },
+ },
+ }
+ ],
+}
+
class TestPTA(unittest.TestCase):
def test_dfs(self):
pta = PTA(['IDLE', 'TX'])
@@ -19,95 +95,31 @@ class TestPTA(unittest.TestCase):
pta.add_transition('IDLE', 'IDLE', 'set1')
pta.add_transition('IDLE', 'IDLE', 'set2')
self.assertEqual(list(pta.dfs(0)), [['init']])
- self.assertEqual(list(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']])
- self.assertEqual(list(pta.dfs(2)), [['init', 'set1', 'set1'],
+ self.assertEqual(sorted(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']])
+ self.assertEqual(sorted(pta.dfs(2)), [['init', 'set1', 'set1'],
['init', 'set1', 'set2'],
['init', 'set2', 'set1'],
['init', 'set2', 'set2']])
def test_from_json(self):
- json_input = {
- 'parameters' : ['datarate', 'txbytes', 'txpower'],
- 'initial_param_values' : [None, None],
- 'states' : {
- 'IDLE' : {
- 'power' : {
- 'static' : 5,
- }
- },
- 'TX' : {
- 'power' : {
- 'static' : 100,
- 'function' : {
- 'raw' : '0 + regression_arg(0) + regression_arg(1)'
- ' * parameter(txpower) + regression_arg(2)'
- ' * 1 / parameter(datarate) + regression_arg(3)'
- ' * parameter(txpower) * 1 / parameter(datarate)',
- 'regression_args' : [
- 13918.0985307854,
- 245.185518812381,
- 8368807.4712942,
- 271527.656040595
- ],
- },
- }
- },
- },
- 'transitions' : [
- {
- 'name' : 'init',
- 'origin' : 'UNINITIALIZED',
- 'destination' : 'IDLE',
- 'duration' : {
- 'static' : 50000,
- },
- },
- {
- 'name' : 'setTxPower',
- 'origin' : 'IDLE',
- 'destination' : 'IDLE',
- 'duration' : { 'static' : 120 },
- 'energy ' : { 'static' : 10000 },
- 'arg_to_param_map' : [ 'txpower' ],
- },
- {
- 'name' : 'send',
- 'origin' : 'IDLE',
- 'destination' : 'TX',
- 'duration' : {
- 'static' : 10,
- 'function' : {
- 'raw' : 'regression_arg(0) + regression_arg(1)'
- ' * function_arg(1)',
- 'regression_args' : [48, 8],
- },
- },
- 'energy' : {
- 'static' : 3,
- 'function' : {
- 'raw' : 'regression_arg(0) + regression_arg(1)'
- ' * function_arg(1)',
- 'regression_args' : [3, 5],
- },
- },
- 'arg_to_param_map' : [ None, 'txbytes' ],
- },
- {
- 'name' : 'txComplete',
- 'origin' : 'TX',
- 'destination' : 'IDLE',
- 'is_interrupt' : 1,
- 'timeout' : {
- 'static' : 2000,
- 'function' : {
- 'raw' : 'regression_arg(0) + regression_arg(1)'
- ' * parameter(txbytes)',
- 'regression_args' : [ 500, 16 ],
- },
- },
- }
- ],
- }
+ pta = PTA.from_json(example_json_1)
+ self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower'])
+ self.assertEqual(pta.states['UNINITIALIZED'].name, 'UNINITIALIZED')
+ self.assertEqual(pta.states['IDLE'].name, 'IDLE')
+ self.assertEqual(pta.states['TX'].name, 'TX')
+ self.assertEqual(len(pta.transitions), 4)
+ self.assertEqual(pta.transitions[0].name, 'init')
+ self.assertEqual(pta.transitions[1].name, 'setTxPower')
+ self.assertEqual(pta.transitions[2].name, 'send')
+ self.assertEqual(pta.transitions[3].name, 'txComplete')
+
+ def test_from_json_dfs(self):
+ pta = PTA.from_json(example_json_1)
+ self.assertEqual(sorted(pta.dfs(1)), [['init', 'send'], ['init', 'setTxPower']])
+
+ def test_from_json_function(self):
+ pta = PTA.from_json(example_json_1)
+ self.assertEqual(pta.states['TX'].get_energy(1000, {'datarate' : 10, 'txbytes' : 6, 'txpower' : 10 }), 1000 * (100 + 2 * 10))
def test_simulation(self):
pta = PTA()
diff --git a/lib/automata.py b/lib/automata.py
index ff32bf7..4e76581 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -104,6 +104,17 @@ class State:
new_suffix.extend(suffix)
yield new_suffix
+def _json_function_to_analytic_function(base, attribute, parameters):
+ if attribute in base and 'function' in base[attribute]:
+ base = base[attribute]['function']
+ return AnalyticFunction(base['raw'], parameters, 0, regression_args = base['regression_args'])
+ return None
+
+def _json_get_static(base, attribute):
+ if attribute in base:
+ return base[attribute]['static']
+ return 0
+
class PTA:
def __init__(self, state_names = [], parameters = [], initial_param_values = None):
self.states = dict([[state_name, State(state_name)] for state_name in state_names])
@@ -117,8 +128,31 @@ class PTA:
if not 'UNINITIALIZED' in state_names:
self.states['UNINITIALIZED'] = State('UNINITIALIZED')
+ @classmethod
+ def from_json(cls, json_input):
+ kwargs = {}
+ for key in ('state_names', 'parameters', 'initial_param_values'):
+ if key in json_input:
+ kwargs[key] = json_input[key]
+ pta = cls(**kwargs)
+ for name, state in json_input['states'].items():
+ power_function = _json_function_to_analytic_function(state, 'power', pta.parameters)
+ pta.add_state(name, power = _json_get_static(state, 'power'), power_function = power_function)
+ for transition in json_input['transitions']:
+ duration_function = _json_function_to_analytic_function(transition, 'duration', pta.parameters)
+ energy_function = _json_function_to_analytic_function(transition, 'energy', pta.parameters)
+ timeout_function = _json_function_to_analytic_function(transition, 'timeout', pta.parameters)
+ pta.add_transition(transition['origin'], transition['destination'],
+ transition['name'],
+ duration = _json_get_static(transition, 'duration'),
+ energy = _json_get_static(transition, 'energy'),
+ timeout = _json_get_static(transition, 'timeout')
+ )
+
+ return pta
+
def add_state(self, state_name, **kwargs):
- if 'power_function' in kwargs:
+ if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction:
kwargs['power_function'] = AnalyticFunction(kwargs['power_function'],
self.parameters, 0)
self.states[state_name] = State(state_name, **kwargs)
@@ -127,7 +161,7 @@ class PTA:
orig_state = self.states[orig_state]
dest_state = self.states[dest_state]
for key in ('duration_function', 'energy_function', 'timeout_function'):
- if key in kwargs:
+ if key in kwargs and type(kwargs[key]) != AnalyticFunction:
kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0)
new_transition = Transition(orig_state, dest_state, function_name, **kwargs)