diff options
-rwxr-xr-x | bin/analyze-trace.py | 37 | ||||
-rw-r--r-- | lib/behaviour.py | 79 |
2 files changed, 80 insertions, 36 deletions
diff --git a/bin/analyze-trace.py b/bin/analyze-trace.py index f721876..1cc3b89 100755 --- a/bin/analyze-trace.py +++ b/bin/analyze-trace.py @@ -82,47 +82,12 @@ def main(): return "∧".join(map(lambda kv: f"{kv[0]}={kv[1]}", guard)) for name in sorted(delta_by_name.keys()): - delta_cond = dict() for t_from, t_to_set in delta_by_name[name].items(): i_to_transition = dict() delta_param_sets = list() to_names = list() transition_guard = dict() - if len(t_to_set) > 1: - am_tt_by_name = { - name: { - "attributes": [t_from], - "param": list(), - t_from: list(), - }, - } - for i, t_to in enumerate(sorted(t_to_set)): - for param in delta_param_by_name[name][(t_from, t_to)]: - am_tt_by_name[name]["param"].append( - dfatool.utils.param_dict_to_list( - dfatool.utils.param_str_to_dict(param), - am_tt_param_names, - ) - ) - am_tt_by_name[name][t_from].append(i) - i_to_transition[i] = t_to - am = AnalyticModel(am_tt_by_name, am_tt_param_names, force_tree=True) - model, info = am.get_fitted() - if type(info(name, t_from)) is df.SplitFunction: - flat_model = info(name, t_from).flatten() - else: - flat_model = list() - logging.warning( - f"Model for {name} {t_from} is {info(name, t_from)}, expected SplitFunction" - ) - - for prefix, output in flat_model: - transition_name = i_to_transition[int(output)] - if transition_name not in transition_guard: - transition_guard[transition_name] = list() - transition_guard[transition_name].append(prefix) - for t_to in sorted(t_to_set): delta_params = delta_param_by_name[name][(t_from, t_to)] delta_param_sets.append(delta_params) @@ -134,7 +99,7 @@ def main(): print(f"{name} {t_from} → {t_to} →") else: print( - f"{name} {t_from} → {t_to} ({' ∨ '.join(map(format_guard, transition_guard.get(t_to, list()))) or '⊤'})" + f"{name} {t_from} → {t_to} ({' ∨ '.join(map(format_guard, bm.transition_guard[t_from].get(t_to, list()))) or '⊤'})" ) for i in range(len(delta_param_sets)): diff --git a/lib/behaviour.py b/lib/behaviour.py index 156da5f..a0ceb95 100644 --- a/lib/behaviour.py +++ b/lib/behaviour.py @@ -2,6 +2,8 @@ import logging from . import utils +from .model import AnalyticModel +from . import functions as df logger = logging.getLogger(__name__) @@ -35,6 +37,83 @@ class SDKBehaviourModel: self.meta_observations = meta_observations self.is_loop = is_loop + self.build_transition_guards() + + def build_transition_guards(self): + self.transition_guard = dict() + for name in sorted(self.delta_by_name.keys()): + for t_from, t_to_set in self.delta_by_name[name].items(): + i_to_transition = dict() + delta_param_sets = list() + to_names = list() + transition_guard = dict() + + if len(t_to_set) > 1: + am_tt_by_name = { + name: { + "attributes": [t_from], + "param": list(), + t_from: list(), + }, + } + for i, t_to in enumerate(sorted(t_to_set)): + for param in self.delta_param_by_name[name][(t_from, t_to)]: + am_tt_by_name[name]["param"].append( + utils.param_dict_to_list( + utils.param_str_to_dict(param), + self.am_tt_param_names, + ) + ) + am_tt_by_name[name][t_from].append(i) + i_to_transition[i] = t_to + am = AnalyticModel( + am_tt_by_name, self.am_tt_param_names, force_tree=True + ) + model, info = am.get_fitted() + if type(info(name, t_from)) is df.SplitFunction: + flat_model = info(name, t_from).flatten() + else: + flat_model = list() + logging.warning( + f"Model for {name} {t_from} is {info(name, t_from)}, expected SplitFunction" + ) + + for prefix, output in flat_model: + transition_name = i_to_transition[int(output)] + if transition_name not in transition_guard: + transition_guard[transition_name] = list() + transition_guard[transition_name].append(prefix) + + self.transition_guard[t_from] = transition_guard + + def get_trace(self, name, param_dict): + delta = self.delta_by_name[name] + current_state = "__init__" + trace = [current_state] + states_seen = set() + while current_state != "__end__": + next_states = delta[current_state] + + next_states = list(filter(lambda q: q not in states_seen, next_states)) + + if len(next_states) == 0: + raise RuntimeError( + f"get_trace({name}, {param_dict}): found infinite loop at {trace}" + ) + if len(next_states) > 1: + raise RuntimeError( + f"get_trace({name}, {param_dict}): found non-deterministic outbound transitions {next_states} at {trace}" + ) + + (next_state,) = next_states + + trace.append(next_state) + states_seen.add(current_state) + current_state = next_state + + print(trace) + return trace + def learn_pta(self, observations, annotation, delta=dict(), delta_param=dict()): prev_i = annotation.start.offset prev = "__init__" |