summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/test_automata.py265
-rwxr-xr-xlib/automata.py76
2 files changed, 319 insertions, 22 deletions
diff --git a/bin/test_automata.py b/bin/test_automata.py
new file mode 100755
index 0000000..5a62931
--- /dev/null
+++ b/bin/test_automata.py
@@ -0,0 +1,265 @@
+#!/usr/bin/env python3
+
+from automata import PTA
+import unittest
+
+class TestPTA(unittest.TestCase):
+ def test_dfs(self):
+ pta = PTA(['IDLE', 'TX'])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init')
+ pta.add_transition('IDLE', 'TX', 'send')
+ pta.add_transition('TX', 'IDLE', 'txComplete')
+ self.assertEqual(list(pta.dfs(0)), [['init']])
+ self.assertEqual(list(pta.dfs(1)), [['init', 'send']])
+ self.assertEqual(list(pta.dfs(2)), [['init', 'send', 'txComplete']])
+ self.assertEqual(list(pta.dfs(3)), [['init', 'send', 'txComplete', 'send']])
+
+ pta = PTA(['IDLE'])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init')
+ pta.add_transition('IDLE', 'IDLE', 'set1')
+ pta.add_transition('IDLE', 'IDLE', 'set2')
+ self.assertEqual(list(pta.dfs(0)), [['init']])
+ self.assertEqual(list(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']])
+ self.assertEqual(list(pta.dfs(2)), [['init', 'set1', 'set1'],
+ ['init', 'set1', 'set2'],
+ ['init', 'set2', 'set1'],
+ ['init', 'set2', 'set2']])
+
+ def test_from_json(self):
+ json_input = {
+ 'parameters' : ['datarate', 'txbytes', 'txpower'],
+ 'initial_param_values' : [None, None],
+ 'states' : {
+ 'IDLE' : {
+ 'power' : {
+ 'static' : 5,
+ }
+ },
+ 'TX' : {
+ 'power' : {
+ 'static' : 100,
+ 'function' : {
+ 'raw' : '0 + regression_arg(0) + regression_arg(1)'
+ ' * parameter(txpower) + regression_arg(2)'
+ ' * 1 / parameter(datarate) + regression_arg(3)'
+ ' * parameter(txpower) * 1 / parameter(datarate)',
+ 'regression_args' : [
+ 13918.0985307854,
+ 245.185518812381,
+ 8368807.4712942,
+ 271527.656040595
+ ],
+ },
+ }
+ },
+ },
+ 'transitions' : [
+ {
+ 'name' : 'init',
+ 'origin' : 'UNINITIALIZED',
+ 'destination' : 'IDLE',
+ 'duration' : {
+ 'static' : 50000,
+ },
+ },
+ {
+ 'name' : 'setTxPower',
+ 'origin' : 'IDLE',
+ 'destination' : 'IDLE',
+ 'duration' : { 'static' : 120 },
+ 'energy ' : { 'static' : 10000 },
+ 'arg_to_param_map' : [ 'txpower' ],
+ },
+ {
+ 'name' : 'send',
+ 'origin' : 'IDLE',
+ 'destination' : 'TX',
+ 'duration' : {
+ 'static' : 10,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * function_arg(1)',
+ 'regression_args' : [48, 8],
+ },
+ },
+ 'energy' : {
+ 'static' : 3,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * function_arg(1)',
+ 'regression_args' : [3, 5],
+ },
+ },
+ 'arg_to_param_map' : [ None, 'txbytes' ],
+ },
+ {
+ 'name' : 'txComplete',
+ 'origin' : 'TX',
+ 'destination' : 'IDLE',
+ 'is_interrupt' : 1,
+ 'timeout' : {
+ 'static' : 2000,
+ 'function' : {
+ 'raw' : 'regression_arg(0) + regression_arg(1)'
+ ' * parameter(txbytes)',
+ 'regression_args' : [ 500, 16 ],
+ },
+ },
+ }
+ ],
+ }
+
+ def test_simulation(self):
+ pta = PTA()
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100)
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', duration = 50000)
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10)
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True)
+ trace = [
+ ['init'],
+ ['sleep', 10000000],
+ ['send', 'foo', 3],
+ ['sleep', 5000000],
+ ['send', 'foo', 3]
+ ]
+ expected_energy = 5. * 10000000 + 3 + 100 * 2000 + 5 * 5000000 + 3 + 100 * 2000
+ expected_duration = 50000 + 10000000 + 10 + 2000 + 5000000 + 10 + 2000
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {})
+
+ def test_simulation_param_none(self):
+ pta = PTA(parameters = ['txpower', 'length'])
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100)
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000)
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10)
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True)
+ trace = [
+ ['init'],
+ ]
+ expected_energy = 500000
+ expected_duration = 50000
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {
+ 'txpower' : None,
+ 'length' : None
+ })
+
+ def test_simulation_param_set(self):
+ pta = PTA(parameters = ['txpower', 'length'])
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100)
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000)
+ pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120,
+ param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]})
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10)
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True)
+ trace = [
+ ['init'],
+ ['setTxPower', 10]
+ ]
+ expected_energy = 510000
+ expected_duration = 50120
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {
+ 'txpower' : 10,
+ 'length' : None
+ })
+
+ def test_simulation_arg_function(self):
+ pta = PTA(parameters = ['txpower', 'length'])
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100)
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000)
+ pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120,
+ param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]})
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10,
+ energy_function = lambda param, arg: 3 + 5 * arg[1],
+ duration_function = lambda param, arg: 48 + 8 * arg[1])
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True)
+ trace = [
+ ['init'],
+ ['setTxPower', 10],
+ ['send', 'foo', 3],
+ ]
+ expected_energy = 500000 + 10000 + (3 + 5 * 3) + (2000 * 100)
+ expected_duration = 50000 + 120 + (48 + 8 * 3) + 2000
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {
+ 'txpower' : 10,
+ 'length' : None
+ })
+
+ pta = PTA(parameters = ['txpower', 'length'])
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100)
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000)
+ pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120,
+ param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]})
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10,
+ energy_function = lambda param, arg: 3 + 5 * arg[1],
+ duration_function = lambda param, arg: 48 + 8 * arg[1])
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True)
+ trace = [
+ ['init'],
+ ['setTxPower', 10],
+ ['send', 'foobar', 6],
+ ]
+ expected_energy = 500000 + 10000 + (3 + 5 * 6) + (2000 * 100)
+ expected_duration = 50000 + 120 + (48 + 8 * 6) + 2000
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {
+ 'txpower' : 10,
+ 'length' : None
+ })
+
+
+ def test_simulation_param_function(self):
+ pta = PTA(parameters = ['length', 'txpower'])
+ pta.add_state('IDLE', power = 5)
+ pta.add_state('TX', power = 100,
+ power_function = lambda param, arg: 1000 + 2 * param[1])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000)
+ pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120,
+ param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]})
+ pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10,
+ energy_function = lambda param, arg: 3 + 5 * arg[1],
+ param_update_function = lambda param, arg: {**param, 'length' : arg[1]})
+ pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True,
+ timeout_function = lambda param, arg: 500 + 16 * param[0])
+ trace = [
+ ['init'],
+ ['setTxPower', 10],
+ ['send', 'foo', 3],
+ ]
+ expected_energy = 500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3)
+ expected_duration = 50000 + 120 + 10 + (500 + 16 * 3)
+ power, duration, state, parameters = pta.simulate(trace)
+ self.assertEqual(power, expected_energy)
+ self.assertEqual(duration, expected_duration)
+ self.assertEqual(state.name, 'IDLE')
+ self.assertEqual(parameters, {
+ 'txpower' : 10,
+ 'length' : 3
+ })
+
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/lib/automata.py b/lib/automata.py
index c61b065..0948cee 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -1,10 +1,23 @@
+from dfatool import AnalyticFunction
+
+def _parse_function(input_function):
+ if type('input_function') == 'str':
+ raise NotImplemented
+ if type('input_function') == 'function':
+ return 'raise ValueError', input_function
+ raise ValueError('Function description must be provided as string or function')
+
+def _dict_to_list(input_dict):
+ return [input_dict[x] for x in sorted(input_dict.keys())]
+
class Transition:
def __init__(self, orig_state, dest_state, name,
energy = 0, energy_function = None,
duration = 0, duration_function = None,
timeout = 0, timeout_function = None,
is_interrupt = False,
- arguments = [], param_update_function = None):
+ arguments = [], param_update_function = None,
+ arg_to_param_map = None):
self.name = name
self.origin = orig_state
self.destination = dest_state
@@ -17,26 +30,31 @@ class Transition:
self.is_interrupt = is_interrupt
self.arguments = arguments.copy()
self.param_update_function = param_update_function
+ self.arg_to_param_map = arg_to_param_map
- def get_duration(self, parameters = [], args = []):
+ def get_duration(self, param_dict = {}, args = []):
if self.duration_function:
- return self.duration_function(parameters, args)
+ return self.duration_function.eval(_dict_to_list(param_dict), args)
return self.duration
- def get_energy(self, parameters = [], args = []):
+ def get_energy(self, param_dict = {}, args = []):
if self.energy_function:
- return self.energy_function(parameters, args)
+ return self.energy_function.eval(_dict_to_list(param_dict), args)
return self.energy
- def get_timeout(self, parameters = []):
+ def get_timeout(self, param_dict = {}):
if self.timeout_function:
- return self.timeout_function(parameters)
+ return self.timeout_function.eval(_dict_to_list(param_dict))
return self.timeout
- def get_params_after_transition(self, parameters, args = []):
+ def get_params_after_transition(self, param_dict, args = []):
if self.param_update_function:
- return self.param_update_function(parameters, args)
- return parameters
+ return self.param_update_function(param_dict, args)
+ if self.arg_to_param_map:
+ ret = {}
+ #for arg_index in range(self.arg_to_param_map):
+ # if self.arg_to_param_map[arg_index]
+ return param_dict
class State:
def __init__(self, name, power = 0, power_function = None):
@@ -45,12 +63,19 @@ class State:
self.power_function = power_function
self.outgoing_transitions = {}
+ """@classmethod
+ def from_json(cls, serialized_state):
+ if 'power' in serialized_state:
+ cls.power = serialized_state['power']['static']
+ if 'function' in serialized_state:
+ cls.power_function = """
+
def add_outgoing_transition(self, new_transition):
self.outgoing_transitions[new_transition.name] = new_transition
- def get_energy(self, duration, parameters = []):
+ def get_energy(self, duration, param_dict = {}):
if self.power_function:
- return self.power_function(parameters) * duration
+ return self.power_function.eval(_dict_to_list(param_dict)) * duration
return self.power * duration
def get_transition(self, transition_name):
@@ -92,11 +117,18 @@ class PTA:
self.states['UNINITIALIZED'] = State('UNINITIALIZED')
def add_state(self, state_name, **kwargs):
+ if 'power_function' in kwargs:
+ kwargs['power_function'] = AnalyticFunction(kwargs['power_function'],
+ self.parameters, 0)
self.states[state_name] = State(state_name, **kwargs)
def add_transition(self, orig_state, dest_state, function_name, **kwargs):
orig_state = self.states[orig_state]
dest_state = self.states[dest_state]
+ for key in ('duration_function', 'energy_function', 'timeout_function'):
+ if key in kwargs:
+ kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0)
+
new_transition = Transition(orig_state, dest_state, function_name, **kwargs)
self.transitions.append(new_transition)
orig_state.add_outgoing_transition(new_transition)
@@ -108,26 +140,26 @@ class PTA:
total_duration = 0.
total_energy = 0.
state = self.states[orig_state]
- parameters = self.initial_param_values
+ param_dict = dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))])
for function in trace:
function_name = function[0]
function_args = function[1 : ]
if function_name == 'sleep':
duration = function_args[0]
- total_energy += state.get_energy(duration, parameters)
+ total_energy += state.get_energy(duration, param_dict)
total_duration += duration
else:
transition = state.get_transition(function_name)
- total_duration += transition.get_duration(parameters, function_args)
- total_energy += transition.get_energy(parameters, function_args)
- parameters = transition.get_params_after_transition(parameters, function_args)
+ total_duration += transition.get_duration(param_dict, function_args)
+ total_energy += transition.get_energy(param_dict, function_args)
+ param_dict = transition.get_params_after_transition(param_dict, function_args)
state = transition.destination
while (state.has_interrupt_transitions()):
- transition = state.get_next_interrupt(parameters)
- duration = transition.get_timeout(parameters)
+ transition = state.get_next_interrupt(param_dict)
+ duration = transition.get_timeout(param_dict)
total_duration += duration
- total_energy += state.get_energy(duration, parameters)
- parameters = transition.get_params_after_transition(parameters)
+ total_energy += state.get_energy(duration, param_dict)
+ param_dict = transition.get_params_after_transition(param_dict)
state = transition.destination
- return total_energy, total_duration, state, parameters
+ return total_energy, total_duration, state, param_dict