summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xlib/automata.py55
-rwxr-xr-xtest/test_pta.py19
2 files changed, 57 insertions, 17 deletions
diff --git a/lib/automata.py b/lib/automata.py
index 1ad80bb..3bb7878 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -79,7 +79,7 @@ class State:
interrupts = sorted(interrupts, key = lambda x: x.get_timeout(parameters))
return interrupts[0]
- def dfs(self, depth: int, with_arguments: bool = False, trace_filter = None):
+ def dfs(self, depth: int, with_arguments: bool = False, trace_filter = None, sleep: int = 0):
"""
Return a generator object for depth-first search over all outgoing transitions.
@@ -88,7 +88,9 @@ class State:
with_arguments -- perform dfs with function+argument transitions instead of just function transitions.
trace_filter -- list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned.
E.g. trace_filter = [['init', 'foo'], ['init', 'bar']] will only return traces with init as first and foo or bar as second element.
- trace_filter = [['init', 'foo', '$'], ['init', 'bar'], '$'] will only return the traces ['init', 'foo'] and ['init', 'bar'].
+ trace_filter = [['init', 'foo', '$'], ['init', 'bar', '$']] will only return the traces ['init', 'foo'] and ['init', 'bar'].
+ sleep -- if set and non-zero: include sleep pseudo-states with <sleep> us duration
+ For the [['init', 'foo', '$'], ['init', 'bar', '$']] example above, sleep=10 results in [(None, 10), 'init', (None, 10), 'foo'] and [(None, 10), 'init', (None, 10), 'bar']
"""
# A '$' entry in trace_filter indicates that the trace should (successfully) terminate here regardless of `depth`.
@@ -102,12 +104,21 @@ class State:
if with_arguments:
if trans.argument_combination == 'cartesian':
for args in itertools.product(*trans.argument_values):
- yield [(trans, args)]
+ if sleep:
+ yield [(None, sleep), (trans, args)]
+ else:
+ yield [(trans, args)]
else:
for args in zip(*trans.argument_values):
- yield [(trans, args)]
+ if sleep:
+ yield [(None, sleep), (trans, args)]
+ else:
+ yield [(trans, args)]
else:
- yield [(trans,)]
+ if sleep:
+ yield [(None, sleep), (trans,)]
+ else:
+ yield [(trans,)]
else:
for trans in self.outgoing_transitions.values():
if trace_filter is not None and next(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)), None) is None:
@@ -119,11 +130,14 @@ class State:
new_trace_filter = None
else:
new_trace_filter = None
- for suffix in trans.destination.dfs(depth - 1, with_arguments = with_arguments, trace_filter = new_trace_filter):
+ for suffix in trans.destination.dfs(depth - 1, with_arguments = with_arguments, trace_filter = new_trace_filter, sleep = sleep):
if with_arguments:
if trans.argument_combination == 'cartesian':
for args in itertools.product(*trans.argument_values):
- new_suffix = [(trans, args)]
+ if sleep:
+ new_suffix = [(None, sleep), (trans, args)]
+ else:
+ new_suffix = [(trans, args)]
new_suffix.extend(suffix)
yield new_suffix
else:
@@ -132,11 +146,17 @@ class State:
else:
arg_values = [tuple()]
for args in arg_values:
- new_suffix = [(trans, args)]
+ if sleep:
+ new_suffix = [(None, sleep), (trans, args)]
+ else:
+ new_suffix = [(trans, args)]
new_suffix.extend(suffix)
yield new_suffix
else:
- new_suffix = [(trans,)]
+ if sleep:
+ new_suffix = [(None, sleep), (trans,)]
+ else:
+ new_suffix = [(trans,)]
new_suffix.extend(suffix)
yield new_suffix
@@ -579,7 +599,8 @@ class PTA:
ret = list()
for elem in trace:
transition, arguments = elem
- param = transition.get_params_after_transition(param, arguments)
+ if transition is not None:
+ param = transition.get_params_after_transition(param, arguments)
ret.append((transition, arguments, self.normalize_parameters(param)))
yield ret
@@ -587,13 +608,13 @@ class PTA:
"""
Return a generator object for depth-first search starting at orig_state.
- arguments:
- depth: search depth
- orig_state: initial state for depth-first search
- param_dict: initial parameter values
- with_arguments: perform dfs with argument values
- with_parameters: include parameters in trace?
- trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned.
+ :param depth: search depth
+ :param orig_state: initial state for depth-first search
+ :param param_dict: initial parameter values
+ :param with_arguments: perform dfs with argument values
+ :param with_parameters: include parameters in trace?
+ :param trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned.
+ :param sleep: sleep duration between states in us. If None or 0, no sleep pseudo-transitions will be included in the trace.
The returned generator emits traces. Each trace consts of a list of
tuples describing the corresponding transition and (if enabled)
diff --git a/test/test_pta.py b/test/test_pta.py
index e8de399..9bd6362 100755
--- a/test/test_pta.py
+++ b/test/test_pta.py
@@ -191,6 +191,25 @@ class TestPTA(unittest.TestCase):
self.assertEqual(pta.get_transition_id(trace[1][0]), 1)
self.assertEqual(pta.get_transition_id(trace[2][0]), 2)
+ def test_dfs_with_sleep(self):
+ pta = PTA(['IDLE', 'TX'])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init')
+ pta.add_transition('IDLE', 'TX', 'send')
+ pta.add_transition('TX', 'IDLE', 'txComplete')
+ traces = list(pta.dfs(2, sleep = 10))
+ self.assertEqual(len(traces), 1)
+ trace = traces[0]
+ self.assertEqual(len(trace), 6)
+ self.assertIsNone(trace[0][0])
+ self.assertEqual(trace[1][0].name, 'init')
+ self.assertIsNone(trace[2][0])
+ self.assertEqual(trace[3][0].name, 'send')
+ self.assertIsNone(trace[4][0])
+ self.assertEqual(trace[5][0].name, 'txComplete')
+ self.assertEqual(pta.get_transition_id(trace[1][0]), 0)
+ self.assertEqual(pta.get_transition_id(trace[3][0]), 1)
+ self.assertEqual(pta.get_transition_id(trace[5][0]), 2)
+
def test_from_json(self):
pta = PTA.from_json(example_json_1)
self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower'])