summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/analyze-trace.py37
-rw-r--r--lib/behaviour.py79
2 files changed, 80 insertions, 36 deletions
diff --git a/bin/analyze-trace.py b/bin/analyze-trace.py
index f721876..1cc3b89 100755
--- a/bin/analyze-trace.py
+++ b/bin/analyze-trace.py
@@ -82,47 +82,12 @@ def main():
return "∧".join(map(lambda kv: f"{kv[0]}={kv[1]}", guard))
for name in sorted(delta_by_name.keys()):
- delta_cond = dict()
for t_from, t_to_set in delta_by_name[name].items():
i_to_transition = dict()
delta_param_sets = list()
to_names = list()
transition_guard = dict()
- if len(t_to_set) > 1:
- am_tt_by_name = {
- name: {
- "attributes": [t_from],
- "param": list(),
- t_from: list(),
- },
- }
- for i, t_to in enumerate(sorted(t_to_set)):
- for param in delta_param_by_name[name][(t_from, t_to)]:
- am_tt_by_name[name]["param"].append(
- dfatool.utils.param_dict_to_list(
- dfatool.utils.param_str_to_dict(param),
- am_tt_param_names,
- )
- )
- am_tt_by_name[name][t_from].append(i)
- i_to_transition[i] = t_to
- am = AnalyticModel(am_tt_by_name, am_tt_param_names, force_tree=True)
- model, info = am.get_fitted()
- if type(info(name, t_from)) is df.SplitFunction:
- flat_model = info(name, t_from).flatten()
- else:
- flat_model = list()
- logging.warning(
- f"Model for {name} {t_from} is {info(name, t_from)}, expected SplitFunction"
- )
-
- for prefix, output in flat_model:
- transition_name = i_to_transition[int(output)]
- if transition_name not in transition_guard:
- transition_guard[transition_name] = list()
- transition_guard[transition_name].append(prefix)
-
for t_to in sorted(t_to_set):
delta_params = delta_param_by_name[name][(t_from, t_to)]
delta_param_sets.append(delta_params)
@@ -134,7 +99,7 @@ def main():
print(f"{name} {t_from} → {t_to} →")
else:
print(
- f"{name} {t_from} → {t_to} ({' ∨ '.join(map(format_guard, transition_guard.get(t_to, list()))) or '⊤'})"
+ f"{name} {t_from} → {t_to} ({' ∨ '.join(map(format_guard, bm.transition_guard[t_from].get(t_to, list()))) or '⊤'})"
)
for i in range(len(delta_param_sets)):
diff --git a/lib/behaviour.py b/lib/behaviour.py
index 156da5f..a0ceb95 100644
--- a/lib/behaviour.py
+++ b/lib/behaviour.py
@@ -2,6 +2,8 @@
import logging
from . import utils
+from .model import AnalyticModel
+from . import functions as df
logger = logging.getLogger(__name__)
@@ -35,6 +37,83 @@ class SDKBehaviourModel:
self.meta_observations = meta_observations
self.is_loop = is_loop
+ self.build_transition_guards()
+
+ def build_transition_guards(self):
+ self.transition_guard = dict()
+ for name in sorted(self.delta_by_name.keys()):
+ for t_from, t_to_set in self.delta_by_name[name].items():
+ i_to_transition = dict()
+ delta_param_sets = list()
+ to_names = list()
+ transition_guard = dict()
+
+ if len(t_to_set) > 1:
+ am_tt_by_name = {
+ name: {
+ "attributes": [t_from],
+ "param": list(),
+ t_from: list(),
+ },
+ }
+ for i, t_to in enumerate(sorted(t_to_set)):
+ for param in self.delta_param_by_name[name][(t_from, t_to)]:
+ am_tt_by_name[name]["param"].append(
+ utils.param_dict_to_list(
+ utils.param_str_to_dict(param),
+ self.am_tt_param_names,
+ )
+ )
+ am_tt_by_name[name][t_from].append(i)
+ i_to_transition[i] = t_to
+ am = AnalyticModel(
+ am_tt_by_name, self.am_tt_param_names, force_tree=True
+ )
+ model, info = am.get_fitted()
+ if type(info(name, t_from)) is df.SplitFunction:
+ flat_model = info(name, t_from).flatten()
+ else:
+ flat_model = list()
+ logging.warning(
+ f"Model for {name} {t_from} is {info(name, t_from)}, expected SplitFunction"
+ )
+
+ for prefix, output in flat_model:
+ transition_name = i_to_transition[int(output)]
+ if transition_name not in transition_guard:
+ transition_guard[transition_name] = list()
+ transition_guard[transition_name].append(prefix)
+
+ self.transition_guard[t_from] = transition_guard
+
+ def get_trace(self, name, param_dict):
+ delta = self.delta_by_name[name]
+ current_state = "__init__"
+ trace = [current_state]
+ states_seen = set()
+ while current_state != "__end__":
+ next_states = delta[current_state]
+
+ next_states = list(filter(lambda q: q not in states_seen, next_states))
+
+ if len(next_states) == 0:
+ raise RuntimeError(
+ f"get_trace({name}, {param_dict}): found infinite loop at {trace}"
+ )
+ if len(next_states) > 1:
+ raise RuntimeError(
+ f"get_trace({name}, {param_dict}): found non-deterministic outbound transitions {next_states} at {trace}"
+ )
+
+ (next_state,) = next_states
+
+ trace.append(next_state)
+ states_seen.add(current_state)
+ current_state = next_state
+
+ print(trace)
+ return trace
+
def learn_pta(self, observations, annotation, delta=dict(), delta_param=dict()):
prev_i = annotation.start.offset
prev = "__init__"