summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2019-11-27 09:04:12 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2019-11-27 09:10:15 +0100
commitd285be422c90851019a31bfe6c28aed3f3eb55fb (patch)
treee03683ac061b1fc73fdc4800fc5e5fd12fc839e0
parent88f991dc873084701f60736ae92695cf04939586 (diff)
PTA: Add breadth-first search
-rwxr-xr-xlib/automata.py43
-rwxr-xr-xtest/test_pta.py24
2 files changed, 65 insertions, 2 deletions
diff --git a/lib/automata.py b/lib/automata.py
index 35be2fd..2c21389 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -5,6 +5,7 @@ from utils import is_numeric
import itertools
import numpy as np
import json
+import queue
import yaml
@@ -783,11 +784,49 @@ class PTA:
ret.append((transition, arguments, param))
yield ret
+ def bfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', transition_filter=None, state_filter=None):
+ """
+ Return a generator object for breadth-first search of traces starting at orig_state.
+
+ Each trace consists of a list of
+ tuples describing the corresponding transition and (if enabled)
+ arguments and parameters. When both with_arguments and with_parameters
+ are True, each transition is a (Transition object, argument list, parameter dict) tuple.
+
+ Note that the parameter dict refers to parameter values _after_
+ passing the corresponding transition. Although this may seem odd at
+ first, it is useful when analyzing measurements: Properties of
+ the state following this transition may be affected by the parameters
+ set by the transition, so it is useful to have those readily available.
+
+ A trace is (at the moment) a list of alternating states and transition, both starting and ending with a state.
+ Does not yield the no-operation trace consisting only of `orig_state`. If `orig_state` has no outgoing transitions, the output is empty.
+
+ :param orig_state: initial state for breadth-first search
+ :param depth: search depth, default 10
+ :param transition_filter: If set, only follow a transition if transition_filter(transition object) returns true. Default None.
+ :param state_iflter: If set, only follow a state if state_filter(state_object) returns true. Default None.
+ """
+ state_queue = queue.Queue()
+ state_queue.put((list(), self.state[orig_state]))
+
+ while not state_queue.empty():
+ trace, state = state_queue.get()
+ if len(trace) > depth:
+ return
+ if state_filter is None or state_filter(state):
+ for transition in state.outgoing_transitions.values():
+ if transition_filter is None or transition_filter(transition):
+ new_trace = trace.copy()
+ new_trace.append((transition,))
+ yield new_trace
+ state_queue.put((new_trace, transition.destination))
+
def dfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, **kwargs):
"""
Return a generator object for depth-first search starting at orig_state.
- :param depth: search depth
+ :param depth: search depth, default 10
:param orig_state: initial state for depth-first search
:param param_dict: initial parameter values
:param with_arguments: perform dfs with argument values
@@ -795,7 +834,7 @@ class PTA:
:param trace_filter: list of lists. Each sub-list is a trace. Only traces matching one of the provided sub-lists are returned.
:param sleep: sleep duration between states in us. If None or 0, no sleep pseudo-transitions will be included in the trace.
- The returned generator emits traces. Each trace consts of a list of
+ Each trace consists of a list of
tuples describing the corresponding transition and (if enabled)
arguments and parameters. When both with_arguments and with_parameters
are True, each transition is a (Transition object, argument list, parameter dict) tuple.
diff --git a/test/test_pta.py b/test/test_pta.py
index 7a93087..2ad4605 100755
--- a/test/test_pta.py
+++ b/test/test_pta.py
@@ -348,6 +348,30 @@ class TestPTA(unittest.TestCase):
self.assertEqual(pta.get_transition_id(trace[3][0]), 1)
self.assertEqual(pta.get_transition_id(trace[5][0]), 2)
+ def test_bfs(self):
+ pta = PTA(['IDLE', 'TX'])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init')
+ pta.add_transition('IDLE', 'TX', 'send')
+ pta.add_transition('TX', 'IDLE', 'txComplete')
+ self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']])
+ self.assertEqual(dfs_tran_to_name(pta.bfs(1), False), [['init'], ['init', 'send']])
+ self.assertEqual(dfs_tran_to_name(pta.bfs(2), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete']])
+ self.assertEqual(dfs_tran_to_name(pta.bfs(3), False), [['init'], ['init', 'send'], ['init', 'send', 'txComplete'], ['init', 'send', 'txComplete', 'send']])
+
+ pta = PTA(['IDLE'])
+ pta.add_transition('UNINITIALIZED', 'IDLE', 'init')
+ pta.add_transition('IDLE', 'IDLE', 'set1')
+ pta.add_transition('IDLE', 'IDLE', 'set2')
+ self.assertEqual(dfs_tran_to_name(pta.bfs(0), False), [['init']])
+ self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(1), False)), [['init'], ['init', 'set1'], ['init', 'set2']])
+ self.assertEqual(sorted(dfs_tran_to_name(pta.bfs(2), False)), [['init'],
+ ['init', 'set1'],
+ ['init', 'set1', 'set1'],
+ ['init', 'set1', 'set2'],
+ ['init', 'set2'],
+ ['init', 'set2', 'set1'],
+ ['init', 'set2', 'set2']])
+
def test_from_json(self):
pta = PTA.from_json(example_json_1)
self.assertEqual(pta.parameters, ['datarate', 'txbytes', 'txpower'])