diff options
-rwxr-xr-x | lib/automata.py | 55 | ||||
-rwxr-xr-x | test/test_pta.py | 19 |
2 files changed, 57 insertions, 17 deletions
diff --git a/lib/automata.py b/lib/automata.py index 1ad80bb..3bb7878 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -79,7 +79,7 @@ class State: interrupts = sorted(interrupts, key = lambda x: x.get_timeout(parameters)) return interrupts[0] - def dfs(self, depth: int, with_arguments: bool = False, trace_filter = None): + def dfs(self, depth: int, with_arguments: bool = False, trace_filter = None, sleep: int = 0): """ Return a generator object for depth-first search over all outgoing transitions. @@ -88,7 +88,9 @@ class State: with_arguments -- perform dfs with function+argument transitions instead of just function transitions. trace_filter -- list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned. E.g. trace_filter = [['init', 'foo'], ['init', 'bar']] will only return traces with init as first and foo or bar as second element. - trace_filter = [['init', 'foo', '$'], ['init', 'bar'], '$'] will only return the traces ['init', 'foo'] and ['init', 'bar']. + trace_filter = [['init', 'foo', '$'], ['init', 'bar', '$']] will only return the traces ['init', 'foo'] and ['init', 'bar']. + sleep -- if set and non-zero: include sleep pseudo-states with <sleep> us duration + For the [['init', 'foo', '$'], ['init', 'bar', '$']] example above, sleep=10 results in [(None, 10), 'init', (None, 10), 'foo'] and [(None, 10), 'init', (None, 10), 'bar'] """ # A '$' entry in trace_filter indicates that the trace should (successfully) terminate here regardless of `depth`. @@ -102,12 +104,21 @@ class State: if with_arguments: if trans.argument_combination == 'cartesian': for args in itertools.product(*trans.argument_values): - yield [(trans, args)] + if sleep: + yield [(None, sleep), (trans, args)] + else: + yield [(trans, args)] else: for args in zip(*trans.argument_values): - yield [(trans, args)] + if sleep: + yield [(None, sleep), (trans, args)] + else: + yield [(trans, args)] else: - yield [(trans,)] + if sleep: + yield [(None, sleep), (trans,)] + else: + yield [(trans,)] else: for trans in self.outgoing_transitions.values(): if trace_filter is not None and next(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)), None) is None: @@ -119,11 +130,14 @@ class State: new_trace_filter = None else: new_trace_filter = None - for suffix in trans.destination.dfs(depth - 1, with_arguments = with_arguments, trace_filter = new_trace_filter): + for suffix in trans.destination.dfs(depth - 1, with_arguments = with_arguments, trace_filter = new_trace_filter, sleep = sleep): if with_arguments: if trans.argument_combination == 'cartesian': for args in itertools.product(*trans.argument_values): - new_suffix = [(trans, args)] + if sleep: + new_suffix = [(None, sleep), (trans, args)] + else: + new_suffix = [(trans, args)] new_suffix.extend(suffix) yield new_suffix else: @@ -132,11 +146,17 @@ class State: else: arg_values = [tuple()] for args in arg_values: - new_suffix = [(trans, args)] + if sleep: + new_suffix = [(None, sleep), (trans, args)] + else: + new_suffix = [(trans, args)] new_suffix.extend(suffix) yield new_suffix else: - new_suffix = [(trans,)] + if sleep: + new_suffix = [(None, sleep), (trans,)] + else: + new_suffix = [(trans,)] new_suffix.extend(suffix) yield new_suffix @@ -579,7 +599,8 @@ class PTA: ret = list() for elem in trace: transition, arguments = elem - param = transition.get_params_after_transition(param, arguments) + if transition is not None: + param = transition.get_params_after_transition(param, arguments) ret.append((transition, arguments, self.normalize_parameters(param))) yield ret @@ -587,13 +608,13 @@ class PTA: """ Return a generator object for depth-first search starting at orig_state. - arguments: - depth: search depth - orig_state: initial state for depth-first search - param_dict: initial parameter values - with_arguments: perform dfs with argument values - with_parameters: include parameters in trace? - trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned. + :param depth: search depth + :param orig_state: initial state for depth-first search + :param param_dict: initial parameter values + :param with_arguments: perform dfs with argument values + :param with_parameters: include parameters in trace? + :param trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned. + :param sleep: sleep duration between states in us. If None or 0, no sleep pseudo-transitions will be included in the trace. The returned generator emits traces. Each trace consts of a list of tuples describing the corresponding transition and (if enabled) diff --git a/test/test_pta.py b/test/test_pta.py index e8de399..9bd6362 100755 --- a/test/test_pta.py +++ b/test/test_pta.py @@ -191,6 +191,25 @@ class TestPTA(unittest.TestCase): self.assertEqual(pta.get_transition_id(trace[1][0]), 1) self.assertEqual(pta.get_transition_id(trace[2][0]), 2) + def test_dfs_with_sleep(self): + pta = PTA(['IDLE', 'TX']) + pta.add_transition('UNINITIALIZED', 'IDLE', 'init') + pta.add_transition('IDLE', 'TX', 'send') + pta.add_transition('TX', 'IDLE', 'txComplete') + traces = list(pta.dfs(2, sleep = 10)) + self.assertEqual(len(traces), 1) + trace = traces[0] + self.assertEqual(len(trace), 6) + self.assertIsNone(trace[0][0]) + self.assertEqual(trace[1][0].name, 'init') + self.assertIsNone(trace[2][0]) + self.assertEqual(trace[3][0].name, 'send') + self.assertIsNone(trace[4][0]) + self.assertEqual(trace[5][0].name, 'txComplete') + self.assertEqual(pta.get_transition_id(trace[1][0]), 0) + self.assertEqual(pta.get_transition_id(trace[3][0]), 1) + self.assertEqual(pta.get_transition_id(trace[5][0]), 2) + def test_from_json(self): pta = PTA.from_json(example_json_1) self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower']) |