diff options
| -rwxr-xr-x | bin/test_automata.py | 265 | ||||
| -rwxr-xr-x | lib/automata.py | 76 | 
2 files changed, 319 insertions, 22 deletions
| diff --git a/bin/test_automata.py b/bin/test_automata.py new file mode 100755 index 0000000..5a62931 --- /dev/null +++ b/bin/test_automata.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 + +from automata import PTA +import unittest + +class TestPTA(unittest.TestCase): +    def test_dfs(self): +        pta = PTA(['IDLE', 'TX']) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init') +        pta.add_transition('IDLE', 'TX', 'send') +        pta.add_transition('TX', 'IDLE', 'txComplete') +        self.assertEqual(list(pta.dfs(0)), [['init']]) +        self.assertEqual(list(pta.dfs(1)), [['init', 'send']]) +        self.assertEqual(list(pta.dfs(2)), [['init', 'send', 'txComplete']]) +        self.assertEqual(list(pta.dfs(3)), [['init', 'send', 'txComplete', 'send']]) + +        pta = PTA(['IDLE']) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init') +        pta.add_transition('IDLE', 'IDLE', 'set1') +        pta.add_transition('IDLE', 'IDLE', 'set2') +        self.assertEqual(list(pta.dfs(0)), [['init']]) +        self.assertEqual(list(pta.dfs(1)), [['init', 'set1'], ['init', 'set2']]) +        self.assertEqual(list(pta.dfs(2)), [['init', 'set1', 'set1'], +            ['init', 'set1', 'set2'], +            ['init', 'set2', 'set1'], +            ['init', 'set2', 'set2']]) + +    def test_from_json(self): +        json_input = { +            'parameters' : ['datarate', 'txbytes', 'txpower'], +            'initial_param_values' : [None, None], +            'states' : { +                'IDLE' : { +                    'power' : { +                        'static' : 5, +                    } +                }, +                'TX' : { +                    'power' : { +                        'static' : 100, +                        'function' : { +                            'raw' : '0 + regression_arg(0) + regression_arg(1)' +                                ' * parameter(txpower) + regression_arg(2)' +                                ' * 1 / parameter(datarate) + regression_arg(3)' +                                ' * parameter(txpower) * 1 / parameter(datarate)', +                            'regression_args' : [ +                                13918.0985307854, +                                245.185518812381, +                                8368807.4712942, +                                271527.656040595 +                            ], +                        }, +                    } +                }, +            }, +            'transitions' : [ +                { +                    'name' : 'init', +                    'origin' : 'UNINITIALIZED', +                    'destination' : 'IDLE', +                    'duration' : { +                        'static' : 50000, +                    }, +                }, +                { +                    'name' : 'setTxPower', +                    'origin' : 'IDLE', +                    'destination' : 'IDLE', +                    'duration' : { 'static' : 120 }, +                    'energy ' : { 'static' : 10000 }, +                    'arg_to_param_map' : [ 'txpower' ], +                }, +                { +                    'name' : 'send', +                    'origin' : 'IDLE', +                    'destination' : 'TX', +                    'duration' : { +                        'static' : 10, +                        'function' : { +                            'raw' : 'regression_arg(0) + regression_arg(1)' +                                ' * function_arg(1)', +                            'regression_args' : [48, 8], +                        }, +                    }, +                    'energy' : { +                        'static' : 3, +                        'function' : { +                            'raw' : 'regression_arg(0) + regression_arg(1)' +                                ' * function_arg(1)', +                            'regression_args' : [3, 5], +                        }, +                    }, +                    'arg_to_param_map' : [ None, 'txbytes' ], +                }, +                { +                    'name' : 'txComplete', +                    'origin' : 'TX', +                    'destination' : 'IDLE', +                    'is_interrupt' : 1, +                    'timeout' : { +                        'static' : 2000, +                        'function' : { +                            'raw' : 'regression_arg(0) + regression_arg(1)' +                                ' * parameter(txbytes)', +                            'regression_args' : [ 500, 16 ], +                        }, +                    }, +                } +            ], +        } + +    def test_simulation(self): +        pta = PTA() +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', duration = 50000) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True) +        trace = [ +            ['init'], +            ['sleep', 10000000], +            ['send', 'foo', 3], +            ['sleep', 5000000], +            ['send', 'foo', 3] +        ] +        expected_energy = 5. * 10000000 + 3 + 100 * 2000 + 5 * 5000000 + 3 + 100 * 2000 +        expected_duration = 50000 + 10000000 + 10 + 2000 + 5000000 + 10 + 2000 +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, {}) + +    def test_simulation_param_none(self): +        pta = PTA(parameters = ['txpower', 'length']) +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True) +        trace = [ +            ['init'], +        ] +        expected_energy = 500000 +        expected_duration = 50000 +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, { +            'txpower' : None, +            'length' : None +        }) + +    def test_simulation_param_set(self): +        pta = PTA(parameters = ['txpower', 'length']) +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000) +        pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120, +            param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]}) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True) +        trace = [ +            ['init'], +            ['setTxPower', 10] +        ] +        expected_energy = 510000 +        expected_duration = 50120 +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, { +            'txpower' : 10, +            'length' : None +        }) + +    def test_simulation_arg_function(self): +        pta = PTA(parameters = ['txpower', 'length']) +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000) +        pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120, +            param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]}) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10, +            energy_function = lambda param, arg: 3 + 5 * arg[1], +            duration_function = lambda param, arg: 48 + 8 * arg[1]) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True) +        trace = [ +            ['init'], +            ['setTxPower', 10], +            ['send', 'foo', 3], +        ] +        expected_energy = 500000 + 10000 + (3 + 5 * 3) + (2000 * 100) +        expected_duration = 50000 + 120 + (48 + 8 * 3) + 2000 +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, { +            'txpower' : 10, +            'length' : None +        }) + +        pta = PTA(parameters = ['txpower', 'length']) +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000) +        pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120, +            param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]}) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10, +            energy_function = lambda param, arg: 3 + 5 * arg[1], +            duration_function = lambda param, arg: 48 + 8 * arg[1]) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True) +        trace = [ +            ['init'], +            ['setTxPower', 10], +            ['send', 'foobar', 6], +        ] +        expected_energy = 500000 + 10000 + (3 + 5 * 6) + (2000 * 100) +        expected_duration = 50000 + 120 + (48 + 8 * 6) + 2000 +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, { +            'txpower' : 10, +            'length' : None +        }) + + +    def test_simulation_param_function(self): +        pta = PTA(parameters = ['length', 'txpower']) +        pta.add_state('IDLE', power = 5) +        pta.add_state('TX', power = 100, +            power_function = lambda param, arg: 1000 + 2 * param[1]) +        pta.add_transition('UNINITIALIZED', 'IDLE', 'init', energy = 500000, duration = 50000) +        pta.add_transition('IDLE', 'IDLE', 'setTxPower', energy = 10000, duration = 120, +            param_update_function = lambda param, arg: {**param, 'txpower' : arg[0]}) +        pta.add_transition('IDLE', 'TX', 'send', energy = 3, duration = 10, +            energy_function = lambda param, arg: 3 + 5 * arg[1], +            param_update_function = lambda param, arg: {**param, 'length' : arg[1]}) +        pta.add_transition('TX', 'IDLE', 'txComplete', timeout = 2000, is_interrupt = True, +            timeout_function = lambda param, arg: 500 + 16 * param[0]) +        trace = [ +            ['init'], +            ['setTxPower', 10], +            ['send', 'foo', 3], +        ] +        expected_energy = 500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3) +        expected_duration = 50000 + 120 + 10 + (500 + 16 * 3) +        power, duration, state, parameters = pta.simulate(trace) +        self.assertEqual(power, expected_energy) +        self.assertEqual(duration, expected_duration) +        self.assertEqual(state.name, 'IDLE') +        self.assertEqual(parameters, { +            'txpower' : 10, +            'length' : 3 +        }) + + + +if __name__ == '__main__': +    unittest.main() diff --git a/lib/automata.py b/lib/automata.py index c61b065..0948cee 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -1,10 +1,23 @@ +from dfatool import AnalyticFunction + +def _parse_function(input_function): +    if type('input_function') == 'str': +        raise NotImplemented +    if type('input_function') == 'function': +        return 'raise ValueError', input_function +    raise ValueError('Function description must be provided as string or function') + +def _dict_to_list(input_dict): +    return [input_dict[x] for x in sorted(input_dict.keys())] +  class Transition:      def __init__(self, orig_state, dest_state, name,              energy = 0, energy_function = None,              duration = 0, duration_function = None,              timeout = 0, timeout_function = None,              is_interrupt = False, -            arguments = [], param_update_function = None): +            arguments = [], param_update_function = None, +            arg_to_param_map = None):          self.name = name          self.origin = orig_state          self.destination = dest_state @@ -17,26 +30,31 @@ class Transition:          self.is_interrupt = is_interrupt          self.arguments = arguments.copy()          self.param_update_function = param_update_function +        self.arg_to_param_map = arg_to_param_map -    def get_duration(self, parameters = [], args = []): +    def get_duration(self, param_dict = {}, args = []):          if self.duration_function: -            return self.duration_function(parameters, args) +            return self.duration_function.eval(_dict_to_list(param_dict), args)          return self.duration -    def get_energy(self, parameters = [], args = []): +    def get_energy(self, param_dict = {}, args = []):          if self.energy_function: -            return self.energy_function(parameters, args) +            return self.energy_function.eval(_dict_to_list(param_dict), args)          return self.energy -    def get_timeout(self, parameters = []): +    def get_timeout(self, param_dict = {}):          if self.timeout_function: -            return self.timeout_function(parameters) +            return self.timeout_function.eval(_dict_to_list(param_dict))          return self.timeout -    def get_params_after_transition(self, parameters, args = []): +    def get_params_after_transition(self, param_dict, args = []):          if self.param_update_function: -            return self.param_update_function(parameters, args) -        return parameters +            return self.param_update_function(param_dict, args) +        if self.arg_to_param_map: +            ret = {} +            #for arg_index in range(self.arg_to_param_map): +            #    if self.arg_to_param_map[arg_index] +        return param_dict  class State:      def __init__(self, name, power = 0, power_function = None): @@ -45,12 +63,19 @@ class State:          self.power_function = power_function          self.outgoing_transitions = {} +    """@classmethod +    def from_json(cls, serialized_state): +        if 'power' in serialized_state: +            cls.power = serialized_state['power']['static'] +            if 'function' in serialized_state: +                cls.power_function = """ +      def add_outgoing_transition(self, new_transition):          self.outgoing_transitions[new_transition.name] = new_transition -    def get_energy(self, duration, parameters = []): +    def get_energy(self, duration, param_dict = {}):          if self.power_function: -            return self.power_function(parameters) * duration +            return self.power_function.eval(_dict_to_list(param_dict)) * duration          return self.power * duration      def get_transition(self, transition_name): @@ -92,11 +117,18 @@ class PTA:              self.states['UNINITIALIZED'] = State('UNINITIALIZED')      def add_state(self, state_name, **kwargs): +        if 'power_function' in kwargs: +            kwargs['power_function'] = AnalyticFunction(kwargs['power_function'], +                self.parameters, 0)          self.states[state_name] = State(state_name, **kwargs)      def add_transition(self, orig_state, dest_state, function_name, **kwargs):          orig_state = self.states[orig_state]          dest_state = self.states[dest_state] +        for key in ('duration_function', 'energy_function', 'timeout_function'): +            if key in kwargs: +                kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) +          new_transition = Transition(orig_state, dest_state, function_name, **kwargs)          self.transitions.append(new_transition)          orig_state.add_outgoing_transition(new_transition) @@ -108,26 +140,26 @@ class PTA:          total_duration = 0.          total_energy = 0.          state = self.states[orig_state] -        parameters = self.initial_param_values +        param_dict = dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))])          for function in trace:              function_name = function[0]              function_args = function[1 : ]              if function_name == 'sleep':                  duration = function_args[0] -                total_energy += state.get_energy(duration, parameters) +                total_energy += state.get_energy(duration, param_dict)                  total_duration += duration              else:                  transition = state.get_transition(function_name) -                total_duration += transition.get_duration(parameters, function_args) -                total_energy += transition.get_energy(parameters, function_args) -                parameters = transition.get_params_after_transition(parameters, function_args) +                total_duration += transition.get_duration(param_dict, function_args) +                total_energy += transition.get_energy(param_dict, function_args) +                param_dict = transition.get_params_after_transition(param_dict, function_args)                  state = transition.destination                  while (state.has_interrupt_transitions()): -                    transition = state.get_next_interrupt(parameters) -                    duration = transition.get_timeout(parameters) +                    transition = state.get_next_interrupt(param_dict) +                    duration = transition.get_timeout(param_dict)                      total_duration += duration -                    total_energy += state.get_energy(duration, parameters) -                    parameters = transition.get_params_after_transition(parameters) +                    total_energy += state.get_energy(duration, param_dict) +                    param_dict = transition.get_params_after_transition(param_dict)                      state = transition.destination -        return total_energy, total_duration, state, parameters +        return total_energy, total_duration, state, param_dict | 
