summaryrefslogtreecommitdiff
path: root/lib/behaviour.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/behaviour.py')
-rw-r--r--lib/behaviour.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/lib/behaviour.py b/lib/behaviour.py
index 156da5f..a0ceb95 100644
--- a/lib/behaviour.py
+++ b/lib/behaviour.py
@@ -2,6 +2,8 @@
import logging
from . import utils
+from .model import AnalyticModel
+from . import functions as df
logger = logging.getLogger(__name__)
@@ -35,6 +37,83 @@ class SDKBehaviourModel:
self.meta_observations = meta_observations
self.is_loop = is_loop
+ self.build_transition_guards()
+
+ def build_transition_guards(self):
+ self.transition_guard = dict()
+ for name in sorted(self.delta_by_name.keys()):
+ for t_from, t_to_set in self.delta_by_name[name].items():
+ i_to_transition = dict()
+ delta_param_sets = list()
+ to_names = list()
+ transition_guard = dict()
+
+ if len(t_to_set) > 1:
+ am_tt_by_name = {
+ name: {
+ "attributes": [t_from],
+ "param": list(),
+ t_from: list(),
+ },
+ }
+ for i, t_to in enumerate(sorted(t_to_set)):
+ for param in self.delta_param_by_name[name][(t_from, t_to)]:
+ am_tt_by_name[name]["param"].append(
+ utils.param_dict_to_list(
+ utils.param_str_to_dict(param),
+ self.am_tt_param_names,
+ )
+ )
+ am_tt_by_name[name][t_from].append(i)
+ i_to_transition[i] = t_to
+ am = AnalyticModel(
+ am_tt_by_name, self.am_tt_param_names, force_tree=True
+ )
+ model, info = am.get_fitted()
+ if type(info(name, t_from)) is df.SplitFunction:
+ flat_model = info(name, t_from).flatten()
+ else:
+ flat_model = list()
+ logging.warning(
+ f"Model for {name} {t_from} is {info(name, t_from)}, expected SplitFunction"
+ )
+
+ for prefix, output in flat_model:
+ transition_name = i_to_transition[int(output)]
+ if transition_name not in transition_guard:
+ transition_guard[transition_name] = list()
+ transition_guard[transition_name].append(prefix)
+
+ self.transition_guard[t_from] = transition_guard
+
+ def get_trace(self, name, param_dict):
+ delta = self.delta_by_name[name]
+ current_state = "__init__"
+ trace = [current_state]
+ states_seen = set()
+ while current_state != "__end__":
+ next_states = delta[current_state]
+
+ next_states = list(filter(lambda q: q not in states_seen, next_states))
+
+ if len(next_states) == 0:
+ raise RuntimeError(
+ f"get_trace({name}, {param_dict}): found infinite loop at {trace}"
+ )
+ if len(next_states) > 1:
+ raise RuntimeError(
+ f"get_trace({name}, {param_dict}): found non-deterministic outbound transitions {next_states} at {trace}"
+ )
+
+ (next_state,) = next_states
+
+ trace.append(next_state)
+ states_seen.add(current_state)
+ current_state = next_state
+
+ print(trace)
+ return trace
+
def learn_pta(self, observations, annotation, delta=dict(), delta_param=dict()):
prev_i = annotation.start.offset
prev = "__init__"