diff options
Diffstat (limited to 'lib/harness.py')
-rw-r--r-- | lib/harness.py | 489 |
1 files changed, 338 insertions, 151 deletions
diff --git a/lib/harness.py b/lib/harness.py index 54518e3..3b279c0 100644 --- a/lib/harness.py +++ b/lib/harness.py @@ -24,7 +24,16 @@ class TransitionHarness: primitive values (-> set by the return value of the current run, not necessarily constan) * `args`: function arguments, if isa == 'transition' """ - def __init__(self, gpio_pin = None, gpio_mode = 'around', pta = None, log_return_values = False, repeat = 0, post_transition_delay_us = 0): + + def __init__( + self, + gpio_pin=None, + gpio_mode="around", + pta=None, + log_return_values=False, + repeat=0, + post_transition_delay_us=0, + ): """ Create a new TransitionHarness @@ -47,7 +56,14 @@ class TransitionHarness: self.reset() def copy(self): - new_object = __class__(gpio_pin = self.gpio_pin, gpio_mode = self.gpio_mode, pta = self.pta, log_return_values = self.log_return_values, repeat = self.repeat, post_transition_delay_us = self.post_transition_delay_us) + new_object = __class__( + gpio_pin=self.gpio_pin, + gpio_mode=self.gpio_mode, + pta=self.pta, + log_return_values=self.log_return_values, + repeat=self.repeat, + post_transition_delay_us=self.post_transition_delay_us, + ) new_object.traces = self.traces.copy() new_object.trace_id = self.trace_id return new_object @@ -62,12 +78,16 @@ class TransitionHarness: of the current benchmark iteration. Resets `done` and `synced`, """ for trace in self.traces: - for state_or_transition in trace['trace']: - if 'return_values' in state_or_transition: - state_or_transition['return_values'] = state_or_transition['return_values'][:undo_from] - for param_name in state_or_transition['parameter'].keys(): - if type(state_or_transition['parameter'][param_name]) is list: - state_or_transition['parameter'][param_name] = state_or_transition['parameter'][param_name][:undo_from] + for state_or_transition in trace["trace"]: + if "return_values" in state_or_transition: + state_or_transition["return_values"] = state_or_transition[ + "return_values" + ][:undo_from] + for param_name in state_or_transition["parameter"].keys(): + if type(state_or_transition["parameter"][param_name]) is list: + state_or_transition["parameter"][ + param_name + ] = state_or_transition["parameter"][param_name][:undo_from] def reset(self): """ @@ -95,33 +115,32 @@ class TransitionHarness: def global_code(self): """Return global (pre-`main()`) C++ code needed for tracing.""" - ret = '' + ret = "" if self.gpio_pin != None: - ret += '#define PTALOG_GPIO {}\n'.format(self.gpio_pin) - if self.gpio_mode == 'before': - ret += '#define PTALOG_GPIO_BEFORE\n' - elif self.gpio_mode == 'bar': - ret += '#define PTALOG_GPIO_BAR\n' + ret += "#define PTALOG_GPIO {}\n".format(self.gpio_pin) + if self.gpio_mode == "before": + ret += "#define PTALOG_GPIO_BEFORE\n" + elif self.gpio_mode == "bar": + ret += "#define PTALOG_GPIO_BAR\n" if self.log_return_values: - ret += '#define PTALOG_WITH_RETURNVALUES\n' - ret += 'uint16_t transition_return_value;\n' + ret += "#define PTALOG_WITH_RETURNVALUES\n" + ret += "uint16_t transition_return_value;\n" ret += '#include "object/ptalog.h"\n' if self.gpio_pin != None: - ret += 'PTALog ptalog({});\n'.format(self.gpio_pin) + ret += "PTALog ptalog({});\n".format(self.gpio_pin) else: - ret += 'PTALog ptalog;\n' + ret += "PTALog ptalog;\n" return ret - def start_benchmark(self, benchmark_id = 0): + def start_benchmark(self, benchmark_id=0): """Return C++ code to signal benchmark start to harness.""" - return 'ptalog.startBenchmark({:d});\n'.format(benchmark_id) + return "ptalog.startBenchmark({:d});\n".format(benchmark_id) def start_trace(self): """Prepare a new trace/run in the internal `.traces` structure.""" - self.traces.append({ - 'id' : self.trace_id, - 'trace' : list(), - }) + self.traces.append( + {"id": self.trace_id, "trace": list(),} + ) self.trace_id += 1 def append_state(self, state_name, param): @@ -131,13 +150,11 @@ class TransitionHarness: :param state_name: state name :param param: parameter dict """ - self.traces[-1]['trace'].append({ - 'name': state_name, - 'isa': 'state', - 'parameter': param, - }) + self.traces[-1]["trace"].append( + {"name": state_name, "isa": "state", "parameter": param,} + ) - def append_transition(self, transition_name, param, args = []): + def append_transition(self, transition_name, param, args=[]): """ Append a transition to the current run in the internal `.traces` structure. @@ -145,122 +162,188 @@ class TransitionHarness: :param param: parameter dict :param args: function arguments (optional) """ - self.traces[-1]['trace'].append({ - 'name': transition_name, - 'isa': 'transition', - 'parameter': param, - 'args' : args, - }) + self.traces[-1]["trace"].append( + { + "name": transition_name, + "isa": "transition", + "parameter": param, + "args": args, + } + ) def start_run(self): """Return C++ code used to start a new run/trace.""" - return 'ptalog.reset();\n' + return "ptalog.reset();\n" def _pass_transition_call(self, transition_id): - if self.gpio_mode == 'bar': - barcode_bits = Code128('T{}'.format(transition_id), charset='B').modules + if self.gpio_mode == "bar": + barcode_bits = Code128("T{}".format(transition_id), charset="B").modules if len(barcode_bits) % 8 != 0: barcode_bits.extend([1] * (8 - (len(barcode_bits) % 8))) - barcode_bytes = [255 - int("".join(map(str, reversed(barcode_bits[i:i+8]))), 2) for i in range(0, len(barcode_bits), 8)] - inline_array = "".join(map(lambda s: '\\x{:02x}'.format(s), barcode_bytes)) - return 'ptalog.startTransition("{}", {});\n'.format(inline_array, len(barcode_bytes)) + barcode_bytes = [ + 255 - int("".join(map(str, reversed(barcode_bits[i : i + 8]))), 2) + for i in range(0, len(barcode_bits), 8) + ] + inline_array = "".join(map(lambda s: "\\x{:02x}".format(s), barcode_bytes)) + return 'ptalog.startTransition("{}", {});\n'.format( + inline_array, len(barcode_bytes) + ) else: - return 'ptalog.startTransition();\n' + return "ptalog.startTransition();\n" - def pass_transition(self, transition_id, transition_code, transition: object = None): + def pass_transition( + self, transition_id, transition_code, transition: object = None + ): """ Return C++ code used to pass a transition, including the corresponding function call. Tracks which transition has been executed and optionally its return value. May also inject a delay, if `post_transition_delay_us` is set. """ - ret = 'ptalog.passTransition({:d});\n'.format(transition_id) + ret = "ptalog.passTransition({:d});\n".format(transition_id) ret += self._pass_transition_call(transition_id) - if self.log_return_values and transition and len(transition.return_value_handlers): - ret += 'transition_return_value = {}\n'.format(transition_code) - ret += 'ptalog.logReturn(transition_return_value);\n' + if ( + self.log_return_values + and transition + and len(transition.return_value_handlers) + ): + ret += "transition_return_value = {}\n".format(transition_code) + ret += "ptalog.logReturn(transition_return_value);\n" else: - ret += '{}\n'.format(transition_code) + ret += "{}\n".format(transition_code) if self.post_transition_delay_us: - ret += 'arch.delay_us({});\n'.format(self.post_transition_delay_us) - ret += 'ptalog.stopTransition();\n' + ret += "arch.delay_us({});\n".format(self.post_transition_delay_us) + ret += "ptalog.stopTransition();\n" return ret - def stop_run(self, num_traces = 0): - return 'ptalog.dump({:d});\n'.format(num_traces) + def stop_run(self, num_traces=0): + return "ptalog.dump({:d});\n".format(num_traces) def stop_benchmark(self): - return 'ptalog.stopBenchmark();\n' + return "ptalog.stopBenchmark();\n" - def _append_nondeterministic_parameter_value(self, log_data_target, parameter_name, parameter_value): - if log_data_target['parameter'][parameter_name] is None: - log_data_target['parameter'][parameter_name] = list() - log_data_target['parameter'][parameter_name].append(parameter_value) + def _append_nondeterministic_parameter_value( + self, log_data_target, parameter_name, parameter_value + ): + if log_data_target["parameter"][parameter_name] is None: + log_data_target["parameter"][parameter_name] = list() + log_data_target["parameter"][parameter_name].append(parameter_value) def parser_cb(self, line): - #print('[HARNESS] got line {}'.format(line)) - if re.match(r'\[PTA\] benchmark stop', line): + # print('[HARNESS] got line {}'.format(line)) + if re.match(r"\[PTA\] benchmark stop", line): self.repetitions += 1 self.synced = False if self.repeat > 0 and self.repetitions == self.repeat: self.done = True - print('[HARNESS] done') + print("[HARNESS] done") return - if re.match(r'\[PTA\] benchmark start, id=(\S+)', line): + if re.match(r"\[PTA\] benchmark start, id=(\S+)", line): self.synced = True - print('[HARNESS] synced, {}/{}'.format(self.repetitions + 1, self.repeat)) + print("[HARNESS] synced, {}/{}".format(self.repetitions + 1, self.repeat)) if self.synced: - res = re.match(r'\[PTA\] trace=(\S+) count=(\S+)', line) + res = re.match(r"\[PTA\] trace=(\S+) count=(\S+)", line) if res: self.trace_id = int(res.group(1)) self.trace_length = int(res.group(2)) self.current_transition_in_trace = 0 if self.log_return_values: - res = re.match(r'\[PTA\] transition=(\S+) return=(\S+)', line) + res = re.match(r"\[PTA\] transition=(\S+) return=(\S+)", line) else: - res = re.match(r'\[PTA\] transition=(\S+)', line) + res = re.match(r"\[PTA\] transition=(\S+)", line) if res: transition_id = int(res.group(1)) # self.traces contains transitions and states, UART output only contains transitions -> use index * 2 try: - log_data_target = self.traces[self.trace_id]['trace'][self.current_transition_in_trace * 2] + log_data_target = self.traces[self.trace_id]["trace"][ + self.current_transition_in_trace * 2 + ] except IndexError: transition_name = None if self.pta: transition_name = self.pta.transitions[transition_id].name - print('[HARNESS] benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}, name {}) is out of bounds'.format(0, self.trace_id, self.current_transition_in_trace, transition_id, transition_name)) - print(' Offending line: {}'.format(line)) + print( + "[HARNESS] benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}, name {}) is out of bounds".format( + 0, + self.trace_id, + self.current_transition_in_trace, + transition_id, + transition_name, + ) + ) + print(" Offending line: {}".format(line)) return - if log_data_target['isa'] != 'transition': + if log_data_target["isa"] != "transition": self.abort = True - raise RuntimeError('Log mismatch: Expected transition, got {:s}'.format(log_data_target['isa'])) + raise RuntimeError( + "Log mismatch: Expected transition, got {:s}".format( + log_data_target["isa"] + ) + ) if self.pta: transition = self.pta.transitions[transition_id] - if transition.name != log_data_target['name']: + if transition.name != log_data_target["name"]: self.abort = True - raise RuntimeError('Log mismatch: Expected transition {:s}, got transition {:s} -- may have been caused by preceding malformed UART output'.format(log_data_target['name'], transition.name)) + raise RuntimeError( + "Log mismatch: Expected transition {:s}, got transition {:s} -- may have been caused by preceding malformed UART output".format( + log_data_target["name"], transition.name + ) + ) if self.log_return_values and len(transition.return_value_handlers): for handler in transition.return_value_handlers: - if 'parameter' in handler: + if "parameter" in handler: parameter_value = return_value = int(res.group(2)) - if 'return_values' not in log_data_target: - log_data_target['return_values'] = list() - log_data_target['return_values'].append(return_value) - - if 'formula' in handler: - parameter_value = handler['formula'].eval(return_value) - - self._append_nondeterministic_parameter_value(log_data_target, handler['parameter'], parameter_value) - for following_log_data_target in self.traces[self.trace_id]['trace'][(self.current_transition_in_trace * 2 + 1) :]: - self._append_nondeterministic_parameter_value(following_log_data_target, handler['parameter'], parameter_value) - if 'apply_from' in handler and any(map(lambda x: x['name'] == handler['apply_from'], self.traces[self.trace_id]['trace'][: (self.current_transition_in_trace * 2 + 1)])): - for preceding_log_data_target in reversed(self.traces[self.trace_id]['trace'][: (self.current_transition_in_trace * 2)]): - self._append_nondeterministic_parameter_value(preceding_log_data_target, handler['parameter'], parameter_value) - if preceding_log_data_target['name'] == handler['apply_from']: + if "return_values" not in log_data_target: + log_data_target["return_values"] = list() + log_data_target["return_values"].append(return_value) + + if "formula" in handler: + parameter_value = handler["formula"].eval( + return_value + ) + + self._append_nondeterministic_parameter_value( + log_data_target, + handler["parameter"], + parameter_value, + ) + for following_log_data_target in self.traces[ + self.trace_id + ]["trace"][ + (self.current_transition_in_trace * 2 + 1) : + ]: + self._append_nondeterministic_parameter_value( + following_log_data_target, + handler["parameter"], + parameter_value, + ) + if "apply_from" in handler and any( + map( + lambda x: x["name"] == handler["apply_from"], + self.traces[self.trace_id]["trace"][ + : (self.current_transition_in_trace * 2 + 1) + ], + ) + ): + for preceding_log_data_target in reversed( + self.traces[self.trace_id]["trace"][ + : (self.current_transition_in_trace * 2) + ] + ): + self._append_nondeterministic_parameter_value( + preceding_log_data_target, + handler["parameter"], + parameter_value, + ) + if ( + preceding_log_data_target["name"] + == handler["apply_from"] + ): break self.current_transition_in_trace += 1 + class OnboardTimerHarness(TransitionHarness): """TODO @@ -271,13 +354,25 @@ class OnboardTimerHarness(TransitionHarness): benchmark iteration. I.e. `.traces[*]['trace'][*]['offline_aggregates']['duration'] = [..., ...]` """ + def __init__(self, counter_limits, **kwargs): super().__init__(**kwargs) self.trace_length = 0 - self.one_cycle_in_us, self.one_overflow_in_us, self.counter_max_overflow = counter_limits + ( + self.one_cycle_in_us, + self.one_overflow_in_us, + self.counter_max_overflow, + ) = counter_limits def copy(self): - new_harness = __class__((self.one_cycle_in_us, self.one_overflow_in_us, self.counter_max_overflow), gpio_pin = self.gpio_pin, gpio_mode = self.gpio_mode, pta = self.pta, log_return_values = self.log_return_values, repeat = self.repeat) + new_harness = __class__( + (self.one_cycle_in_us, self.one_overflow_in_us, self.counter_max_overflow), + gpio_pin=self.gpio_pin, + gpio_mode=self.gpio_mode, + pta=self.pta, + log_return_values=self.log_return_values, + repeat=self.repeat, + ) new_harness.traces = self.traces.copy() new_harness.trace_id = self.trace_id return new_harness @@ -293,123 +388,215 @@ class OnboardTimerHarness(TransitionHarness): """ super().undo(undo_from) for trace in self.traces: - for state_or_transition in trace['trace']: - if 'offline_aggregates' in state_or_transition: - state_or_transition['offline_aggregates']['duration'] = state_or_transition['offline_aggregates']['duration'][:undo_from] + for state_or_transition in trace["trace"]: + if "offline_aggregates" in state_or_transition: + state_or_transition["offline_aggregates"][ + "duration" + ] = state_or_transition["offline_aggregates"]["duration"][ + :undo_from + ] def global_code(self): ret = '#include "driver/counter.h"\n' - ret += '#define PTALOG_TIMING\n' + ret += "#define PTALOG_TIMING\n" ret += super().global_code() return ret - def start_benchmark(self, benchmark_id = 0): - ret = 'counter.start();\n' - ret += 'counter.stop();\n' - ret += 'ptalog.passNop(counter);\n' + def start_benchmark(self, benchmark_id=0): + ret = "counter.start();\n" + ret += "counter.stop();\n" + ret += "ptalog.passNop(counter);\n" ret += super().start_benchmark(benchmark_id) return ret - def pass_transition(self, transition_id, transition_code, transition: object = None): - ret = 'ptalog.passTransition({:d});\n'.format(transition_id) + def pass_transition( + self, transition_id, transition_code, transition: object = None + ): + ret = "ptalog.passTransition({:d});\n".format(transition_id) ret += self._pass_transition_call(transition_id) - ret += 'counter.start();\n' - if self.log_return_values and transition and len(transition.return_value_handlers): - ret += 'transition_return_value = {}\n'.format(transition_code) + ret += "counter.start();\n" + if ( + self.log_return_values + and transition + and len(transition.return_value_handlers) + ): + ret += "transition_return_value = {}\n".format(transition_code) else: - ret += '{}\n'.format(transition_code) - ret += 'counter.stop();\n' - if self.log_return_values and transition and len(transition.return_value_handlers): - ret += 'ptalog.logReturn(transition_return_value);\n' - ret += 'ptalog.stopTransition(counter);\n' + ret += "{}\n".format(transition_code) + ret += "counter.stop();\n" + if ( + self.log_return_values + and transition + and len(transition.return_value_handlers) + ): + ret += "ptalog.logReturn(transition_return_value);\n" + ret += "ptalog.stopTransition(counter);\n" return ret - def _append_nondeterministic_parameter_value(self, log_data_target, parameter_name, parameter_value): - if log_data_target['parameter'][parameter_name] is None: - log_data_target['parameter'][parameter_name] = list() - log_data_target['parameter'][parameter_name].append(parameter_value) + def _append_nondeterministic_parameter_value( + self, log_data_target, parameter_name, parameter_value + ): + if log_data_target["parameter"][parameter_name] is None: + log_data_target["parameter"][parameter_name] = list() + log_data_target["parameter"][parameter_name].append(parameter_value) def parser_cb(self, line): # print('[HARNESS] got line {}'.format(line)) - res = re.match(r'\[PTA\] nop=(\S+)/(\S+)', line) + res = re.match(r"\[PTA\] nop=(\S+)/(\S+)", line) if res: self.nop_cycles = int(res.group(1)) if int(res.group(2)): - raise RuntimeError('Counter overflow ({:d}/{:d}) during NOP test, wtf?!'.format(res.group(1), res.group(2))) - if re.match(r'\[PTA\] benchmark stop', line): + raise RuntimeError( + "Counter overflow ({:d}/{:d}) during NOP test, wtf?!".format( + res.group(1), res.group(2) + ) + ) + if re.match(r"\[PTA\] benchmark stop", line): self.repetitions += 1 self.synced = False if self.repeat > 0 and self.repetitions == self.repeat: self.done = True - print('[HARNESS] done') + print("[HARNESS] done") return # May be repeated, e.g. if the device is reset shortly after start by # EnergyTrace. - if re.match(r'\[PTA\] benchmark start, id=(\S+)', line): + if re.match(r"\[PTA\] benchmark start, id=(\S+)", line): self.synced = True - print('[HARNESS] synced, {}/{}'.format(self.repetitions + 1, self.repeat)) + print("[HARNESS] synced, {}/{}".format(self.repetitions + 1, self.repeat)) if self.synced: - res = re.match(r'\[PTA\] trace=(\S+) count=(\S+)', line) + res = re.match(r"\[PTA\] trace=(\S+) count=(\S+)", line) if res: self.trace_id = int(res.group(1)) self.trace_length = int(res.group(2)) self.current_transition_in_trace = 0 if self.log_return_values: - res = re.match(r'\[PTA\] transition=(\S+) cycles=(\S+)/(\S+) return=(\S+)', line) + res = re.match( + r"\[PTA\] transition=(\S+) cycles=(\S+)/(\S+) return=(\S+)", line + ) else: - res = re.match(r'\[PTA\] transition=(\S+) cycles=(\S+)/(\S+)', line) + res = re.match(r"\[PTA\] transition=(\S+) cycles=(\S+)/(\S+)", line) if res: transition_id = int(res.group(1)) cycles = int(res.group(2)) overflow = int(res.group(3)) if overflow >= self.counter_max_overflow: self.abort = True - raise RuntimeError('Counter overflow ({:d}/{:d}) in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d})'.format(cycles, overflow, 0, self.trace_id, self.current_transition_in_trace, transition_id)) - duration_us = cycles * self.one_cycle_in_us + overflow * self.one_overflow_in_us - self.nop_cycles * self.one_cycle_in_us + raise RuntimeError( + "Counter overflow ({:d}/{:d}) in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d})".format( + cycles, + overflow, + 0, + self.trace_id, + self.current_transition_in_trace, + transition_id, + ) + ) + duration_us = ( + cycles * self.one_cycle_in_us + + overflow * self.one_overflow_in_us + - self.nop_cycles * self.one_cycle_in_us + ) if duration_us < 0: duration_us = 0 # self.traces contains transitions and states, UART output only contains transitions -> use index * 2 try: - log_data_target = self.traces[self.trace_id]['trace'][self.current_transition_in_trace * 2] + log_data_target = self.traces[self.trace_id]["trace"][ + self.current_transition_in_trace * 2 + ] except IndexError: transition_name = None if self.pta: transition_name = self.pta.transitions[transition_id].name - print('[HARNESS] benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}, name {}) is out of bounds'.format(0, self.trace_id, self.current_transition_in_trace, transition_id, transition_name)) - print(' Offending line: {}'.format(line)) + print( + "[HARNESS] benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}, name {}) is out of bounds".format( + 0, + self.trace_id, + self.current_transition_in_trace, + transition_id, + transition_name, + ) + ) + print(" Offending line: {}".format(line)) return - if log_data_target['isa'] != 'transition': + if log_data_target["isa"] != "transition": self.abort = True - raise RuntimeError('Log mismatch in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}): Expected transition, got {:s}'.format(0, - self.trace_id, self.current_transition_in_trace, transition_id, log_data_target['isa'])) + raise RuntimeError( + "Log mismatch in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}): Expected transition, got {:s}".format( + 0, + self.trace_id, + self.current_transition_in_trace, + transition_id, + log_data_target["isa"], + ) + ) if self.pta: transition = self.pta.transitions[transition_id] - if transition.name != log_data_target['name']: + if transition.name != log_data_target["name"]: self.abort = True - raise RuntimeError('Log mismatch in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}): Expected transition {:s}, got transition {:s} -- may have been caused by preceding maformed UART output'.format(0, self.trace_id, self.current_transition_in_trace, transition_id, log_data_target['name'], transition.name, line)) + raise RuntimeError( + "Log mismatch in benchmark id={:d} trace={:d}: transition #{:d} (ID {:d}): Expected transition {:s}, got transition {:s} -- may have been caused by preceding maformed UART output".format( + 0, + self.trace_id, + self.current_transition_in_trace, + transition_id, + log_data_target["name"], + transition.name, + line, + ) + ) if self.log_return_values and len(transition.return_value_handlers): for handler in transition.return_value_handlers: - if 'parameter' in handler: + if "parameter" in handler: parameter_value = return_value = int(res.group(4)) - if 'return_values' not in log_data_target: - log_data_target['return_values'] = list() - log_data_target['return_values'].append(return_value) - - if 'formula' in handler: - parameter_value = handler['formula'].eval(return_value) - - self._append_nondeterministic_parameter_value(log_data_target, handler['parameter'], parameter_value) - for following_log_data_target in self.traces[self.trace_id]['trace'][(self.current_transition_in_trace * 2 + 1) :]: - self._append_nondeterministic_parameter_value(following_log_data_target, handler['parameter'], parameter_value) - if 'apply_from' in handler and any(map(lambda x: x['name'] == handler['apply_from'], self.traces[self.trace_id]['trace'][: (self.current_transition_in_trace * 2 + 1)])): - for preceding_log_data_target in reversed(self.traces[self.trace_id]['trace'][: (self.current_transition_in_trace * 2)]): - self._append_nondeterministic_parameter_value(preceding_log_data_target, handler['parameter'], parameter_value) - if preceding_log_data_target['name'] == handler['apply_from']: + if "return_values" not in log_data_target: + log_data_target["return_values"] = list() + log_data_target["return_values"].append(return_value) + + if "formula" in handler: + parameter_value = handler["formula"].eval( + return_value + ) + + self._append_nondeterministic_parameter_value( + log_data_target, + handler["parameter"], + parameter_value, + ) + for following_log_data_target in self.traces[ + self.trace_id + ]["trace"][ + (self.current_transition_in_trace * 2 + 1) : + ]: + self._append_nondeterministic_parameter_value( + following_log_data_target, + handler["parameter"], + parameter_value, + ) + if "apply_from" in handler and any( + map( + lambda x: x["name"] == handler["apply_from"], + self.traces[self.trace_id]["trace"][ + : (self.current_transition_in_trace * 2 + 1) + ], + ) + ): + for preceding_log_data_target in reversed( + self.traces[self.trace_id]["trace"][ + : (self.current_transition_in_trace * 2) + ] + ): + self._append_nondeterministic_parameter_value( + preceding_log_data_target, + handler["parameter"], + parameter_value, + ) + if ( + preceding_log_data_target["name"] + == handler["apply_from"] + ): break - if 'offline_aggregates' not in log_data_target: - log_data_target['offline_aggregates'] = { - 'duration' : list() - } - log_data_target['offline_aggregates']['duration'].append(duration_us) + if "offline_aggregates" not in log_data_target: + log_data_target["offline_aggregates"] = {"duration": list()} + log_data_target["offline_aggregates"]["duration"].append(duration_us) self.current_transition_in_trace += 1 |