diff options
-rwxr-xr-x | lib/automata.py | 43 | ||||
-rwxr-xr-x | test/test_pta.py | 24 |
2 files changed, 65 insertions, 2 deletions
diff --git a/lib/automata.py b/lib/automata.py index 35be2fd..2c21389 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -5,6 +5,7 @@ from utils import is_numeric import itertools import numpy as np import json +import queue import yaml @@ -783,11 +784,49 @@ class PTA: ret.append((transition, arguments, param)) yield ret + def bfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', transition_filter=None, state_filter=None): + """ + Return a generator object for breadth-first search of traces starting at orig_state. + + Each trace consists of a list of + tuples describing the corresponding transition and (if enabled) + arguments and parameters. When both with_arguments and with_parameters + are True, each transition is a (Transition object, argument list, parameter dict) tuple. + + Note that the parameter dict refers to parameter values _after_ + passing the corresponding transition. Although this may seem odd at + first, it is useful when analyzing measurements: Properties of + the state following this transition may be affected by the parameters + set by the transition, so it is useful to have those readily available. + + A trace is (at the moment) a list of alternating states and transition, both starting and ending with a state. + Does not yield the no-operation trace consisting only of `orig_state`. If `orig_state` has no outgoing transitions, the output is empty. + + :param orig_state: initial state for breadth-first search + :param depth: search depth, default 10 + :param transition_filter: If set, only follow a transition if transition_filter(transition object) returns true. Default None. + :param state_iflter: If set, only follow a state if state_filter(state_object) returns true. Default None. + """ + state_queue = queue.Queue() + state_queue.put((list(), self.state[orig_state])) + + while not state_queue.empty(): + trace, state = state_queue.get() + if len(trace) > depth: + return + if state_filter is None or state_filter(state): + for transition in state.outgoing_transitions.values(): + if transition_filter is None or transition_filter(transition): + new_trace = trace.copy() + new_trace.append((transition,)) + yield new_trace + state_queue.put((new_trace, transition.destination)) + def dfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, **kwargs): """ Return a generator object for depth-first search starting at orig_state. - :param depth: search depth + :param depth: search depth, default 10 :param orig_state: initial state for depth-first search :param param_dict: initial parameter values :param with_arguments: perform dfs with argument values @@ -795,7 +834,7 @@ class PTA: :param trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned. :param sleep: sleep duration between states in us. If None or 0, no sleep pseudo-transitions will be included in the trace. - The returned generator emits traces. Each trace consts of a list of + Each trace consists of a list of tuples describing the corresponding transition and (if enabled) arguments and parameters. When both with_arguments and with_parameters are True, each transition is a (Transition object, argument list, parameter dict) tuple. diff --git a/test/test_pta.py b/test/test_pta.py index 7a93087..2ad4605 100755 --- a/test/test_pta.py +++ b/test/test_pta.py @@ -348,6 +348,30 @@ class TestPTA(unittest.TestCase): self.assertEqual(pta.get_transition_id(trace[3][0]), 1) self.assertEqual(pta.get_transition_id(trace[5][0]), 2) + def test_bfs(self): + pta = PTA(['IDLE', 'TX']) + pta.add_transition('UNINITIALIZED', 'IDLE', 'init') + pta.add_transition('IDLE', 'TX', 'send') + pta.add_transition('TX', 'IDLE', 'txComplete') + self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']]) + self.assertEqual(dfs_tran_to_name(pta.bfs(1), False), [['init'], ['init', 'send']]) + self.assertEqual(dfs_tran_to_name(pta.bfs(2), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete']]) + self.assertEqual(dfs_tran_to_name(pta.bfs(3), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete'], ['init', 'send', 'txComplete', 'send']]) + + pta = PTA(['IDLE']) + pta.add_transition('UNINITIALIZED', 'IDLE', 'init') + pta.add_transition('IDLE', 'IDLE', 'set1') + pta.add_transition('IDLE', 'IDLE', 'set2') + self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']]) + self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(1), False)), [['init'], ['init', 'set1'], ['init', 'set2']]) + self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(2), False)), [['init'], + ['init', 'set1'], + ['init', 'set1', 'set1'], + ['init', 'set1', 'set2'], + ['init', 'set2'], + ['init', 'set2', 'set1'], + ['init', 'set2', 'set2']]) + def test_from_json(self): pta = PTA.from_json(example_json_1) self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower']) |