diff options
Diffstat (limited to 'lib/automata.py')
-rwxr-xr-x | lib/automata.py | 705 |
1 files changed, 475 insertions, 230 deletions
diff --git a/lib/automata.py b/lib/automata.py index b7668c5..b3318e0 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -28,7 +28,15 @@ class SimulationResult: :param mean_power: mean power during run in W """ - def __init__(self, duration: float, energy: float, end_state, parameters, duration_mae: float = None, energy_mae: float = None): + def __init__( + self, + duration: float, + energy: float, + end_state, + parameters, + duration_mae: float = None, + energy_mae: float = None, + ): u""" Create a new SimulationResult. @@ -77,7 +85,13 @@ class PTAAttribute: :param function_error: mean absolute error of function (optional) """ - def __init__(self, value: float = 0, function: AnalyticFunction = None, value_error=None, function_error=None): + def __init__( + self, + value: float = 0, + function: AnalyticFunction = None, + value_error=None, + function_error=None, + ): self.value = value self.function = function self.value_error = value_error @@ -85,8 +99,10 @@ class PTAAttribute: def __repr__(self): if self.function is not None: - return 'PTAATtribute<{:.0f}, {}>'.format(self.value, self.function._model_str) - return 'PTAATtribute<{:.0f}, None>'.format(self.value) + return "PTAATtribute<{:.0f}, {}>".format( + self.value, self.function._model_str + ) + return "PTAATtribute<{:.0f}, None>".format(self.value) def eval(self, param_dict=dict(), args=list()): """ @@ -108,33 +124,38 @@ class PTAAttribute: """ param_list = _dict_to_list(param_dict) if self.function and self.function.is_predictable(param_list): - return self.function_error['mae'] - return self.value_error['mae'] + return self.function_error["mae"] + return self.value_error["mae"] def to_json(self): ret = { - 'static': self.value, - 'static_error': self.value_error, + "static": self.value, + "static_error": self.value_error, } if self.function: - ret['function'] = { - 'raw': self.function._model_str, - 'regression_args': list(self.function._regression_args) + ret["function"] = { + "raw": self.function._model_str, + "regression_args": list(self.function._regression_args), } - ret['function_error'] = self.function_error + ret["function_error"] = self.function_error return ret @classmethod def from_json(cls, json_input: dict, parameters: dict): ret = cls() - if 'static' in json_input: - ret.value = json_input['static'] - if 'static_error' in json_input: - ret.value_error = json_input['static_error'] - if 'function' in json_input: - ret.function = AnalyticFunction(json_input['function']['raw'], parameters, 0, regression_args=json_input['function']['regression_args']) - if 'function_error' in json_input: - ret.function_error = json_input['function_error'] + if "static" in json_input: + ret.value = json_input["static"] + if "static_error" in json_input: + ret.value_error = json_input["static_error"] + if "function" in json_input: + ret.function = AnalyticFunction( + json_input["function"]["raw"], + parameters, + 0, + regression_args=json_input["function"]["regression_args"], + ) + if "function_error" in json_input: + ret.function_error = json_input["function_error"] return ret @classmethod @@ -145,16 +166,23 @@ class PTAAttribute: def _json_function_to_analytic_function(base, attribute: str, parameters: list): - if attribute in base and 'function' in base[attribute]: - base = base[attribute]['function'] - return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args']) + if attribute in base and "function" in base[attribute]: + base = base[attribute]["function"] + return AnalyticFunction( + base["raw"], parameters, 0, regression_args=base["regression_args"] + ) return None class State: """A single PTA state.""" - def __init__(self, name: str, power: PTAAttribute = PTAAttribute(), power_function: AnalyticFunction = None): + def __init__( + self, + name: str, + power: PTAAttribute = PTAAttribute(), + power_function: AnalyticFunction = None, + ): u""" Create a new PTA state. @@ -173,10 +201,10 @@ class State: if type(power_function) is AnalyticFunction: self.power.function = power_function else: - raise ValueError('power_function must be None or AnalyticFunction') + raise ValueError("power_function must be None or AnalyticFunction") def __repr__(self): - return 'State<{:s}, {}>'.format(self.name, self.power) + return "State<{:s}, {}>".format(self.name, self.power) def add_outgoing_transition(self, new_transition: object): """Add a new outgoing transition.""" @@ -206,7 +234,11 @@ class State: try: return self.outgoing_transitions[transition_name] except KeyError: - raise ValueError('State {} has no outgoing transition called {}'.format(self.name, transition_name)) from None + raise ValueError( + "State {} has no outgoing transition called {}".format( + self.name, transition_name + ) + ) from None def has_interrupt_transitions(self) -> bool: """Return whether this state has any outgoing interrupt transitions.""" @@ -224,7 +256,9 @@ class State: :param parameters: current parameter values :returns: Transition object """ - interrupts = filter(lambda x: x.is_interrupt, self.outgoing_transitions.values()) + interrupts = filter( + lambda x: x.is_interrupt, self.outgoing_transitions.values() + ) interrupts = sorted(interrupts, key=lambda x: x.get_timeout(parameters)) return interrupts[0] @@ -246,15 +280,32 @@ class State: # TODO parametergewahrer Trace-Filter, z.B. "setHeaterDuration nur wenn bme680 power mode => FORCED und GAS_ENABLED" # A '$' entry in trace_filter indicates that the trace should (successfully) terminate here regardless of `depth`. - if trace_filter is not None and next(filter(lambda x: x == '$', map(lambda x: x[0], trace_filter)), None) is not None: + if ( + trace_filter is not None + and next( + filter(lambda x: x == "$", map(lambda x: x[0], trace_filter)), None + ) + is not None + ): yield [] # there may be other entries in trace_filter that still yield results. if depth == 0: for trans in self.outgoing_transitions.values(): - if trace_filter is not None and len(list(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)))) == 0: + if ( + trace_filter is not None + and len( + list( + filter( + lambda x: x == trans.name, + map(lambda x: x[0], trace_filter), + ) + ) + ) + == 0 + ): continue if with_arguments: - if trans.argument_combination == 'cartesian': + if trans.argument_combination == "cartesian": for args in itertools.product(*trans.argument_values): if sleep: yield [(None, sleep), (trans, args)] @@ -273,18 +324,35 @@ class State: yield [(trans,)] else: for trans in self.outgoing_transitions.values(): - if trace_filter is not None and next(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)), None) is None: + if ( + trace_filter is not None + and next( + filter( + lambda x: x == trans.name, map(lambda x: x[0], trace_filter) + ), + None, + ) + is None + ): continue if trace_filter is not None: - new_trace_filter = map(lambda x: x[1:], filter(lambda x: x[0] == trans.name, trace_filter)) + new_trace_filter = map( + lambda x: x[1:], + filter(lambda x: x[0] == trans.name, trace_filter), + ) new_trace_filter = list(filter(len, new_trace_filter)) if len(new_trace_filter) == 0: new_trace_filter = None else: new_trace_filter = None - for suffix in trans.destination.dfs(depth - 1, with_arguments=with_arguments, trace_filter=new_trace_filter, sleep=sleep): + for suffix in trans.destination.dfs( + depth - 1, + with_arguments=with_arguments, + trace_filter=new_trace_filter, + sleep=sleep, + ): if with_arguments: - if trans.argument_combination == 'cartesian': + if trans.argument_combination == "cartesian": for args in itertools.product(*trans.argument_values): if sleep: new_suffix = [(None, sleep), (trans, args)] @@ -314,10 +382,7 @@ class State: def to_json(self) -> dict: """Return JSON encoding of this state object.""" - ret = { - 'name': self.name, - 'power': self.power.to_json() - } + ret = {"name": self.name, "power": self.power.to_json()} return ret @@ -345,19 +410,27 @@ class Transition: :param codegen: todo """ - def __init__(self, orig_state: State, dest_state: State, name: str, - energy: PTAAttribute = PTAAttribute(), energy_function: AnalyticFunction = None, - duration: PTAAttribute = PTAAttribute(), duration_function: AnalyticFunction = None, - timeout: PTAAttribute = PTAAttribute(), timeout_function: AnalyticFunction = None, - is_interrupt: bool = False, - arguments: list = [], - argument_values: list = [], - argument_combination: str = 'cartesian', # or 'zip' - param_update_function=None, - arg_to_param_map: dict = None, - set_param=None, - return_value_handlers: list = [], - codegen=dict()): + def __init__( + self, + orig_state: State, + dest_state: State, + name: str, + energy: PTAAttribute = PTAAttribute(), + energy_function: AnalyticFunction = None, + duration: PTAAttribute = PTAAttribute(), + duration_function: AnalyticFunction = None, + timeout: PTAAttribute = PTAAttribute(), + timeout_function: AnalyticFunction = None, + is_interrupt: bool = False, + arguments: list = [], + argument_values: list = [], + argument_combination: str = "cartesian", # or 'zip' + param_update_function=None, + arg_to_param_map: dict = None, + set_param=None, + return_value_handlers: list = [], + codegen=dict(), + ): """ Create a new transition between two PTA states. @@ -400,8 +473,8 @@ class Transition: self.timeout.function = timeout_function for handler in self.return_value_handlers: - if 'formula' in handler: - handler['formula'] = NormalizationFunction(handler['formula']) + if "formula" in handler: + handler["formula"] = NormalizationFunction(handler["formula"]) def get_duration(self, param_dict: dict = {}, args: list = []) -> float: u""" @@ -465,25 +538,25 @@ class Transition: def to_json(self) -> dict: """Return JSON encoding of this transition object.""" ret = { - 'name': self.name, - 'origin': self.origin.name, - 'destination': self.destination.name, - 'is_interrupt': self.is_interrupt, - 'arguments': self.arguments, - 'argument_values': self.argument_values, - 'argument_combination': self.argument_combination, - 'arg_to_param_map': self.arg_to_param_map, - 'set_param': self.set_param, - 'duration': self.duration.to_json(), - 'energy': self.energy.to_json(), - 'timeout': self.timeout.to_json() + "name": self.name, + "origin": self.origin.name, + "destination": self.destination.name, + "is_interrupt": self.is_interrupt, + "arguments": self.arguments, + "argument_values": self.argument_values, + "argument_combination": self.argument_combination, + "arg_to_param_map": self.arg_to_param_map, + "set_param": self.set_param, + "duration": self.duration.to_json(), + "energy": self.energy.to_json(), + "timeout": self.timeout.to_json(), } return ret def _json_get_static(base, attribute: str): if attribute in base: - return base[attribute]['static'] + return base[attribute]["static"] return 0 @@ -505,10 +578,15 @@ class PTA: :param transitions: list of `Transition` objects """ - def __init__(self, state_names: list = [], - accepting_states: list = None, - parameters: list = [], initial_param_values: list = None, - codegen: dict = {}, parameter_normalization: dict = None): + def __init__( + self, + state_names: list = [], + accepting_states: list = None, + parameters: list = [], + initial_param_values: list = None, + codegen: dict = {}, + parameter_normalization: dict = None, + ): """ Return a new PTA object. @@ -524,7 +602,9 @@ class PTA: `enum`: maps enum descriptors (keys) to parameter values. Note that the mapping is not required to correspond to the driver API. `formula`: maps an argument or return value (passed as `param`) to a parameter value. Must be a string describing a valid python lambda function. NumPy is available as `np`. """ - self.state = dict([[state_name, State(state_name)] for state_name in state_names]) + self.state = dict( + [[state_name, State(state_name)] for state_name in state_names] + ) self.accepting_states = accepting_states.copy() if accepting_states else None self.parameters = parameters.copy() self.parameter_normalization = parameter_normalization @@ -535,13 +615,15 @@ class PTA: self.initial_param_values = [None for x in self.parameters] self.transitions = [] - if 'UNINITIALIZED' not in state_names: - self.state['UNINITIALIZED'] = State('UNINITIALIZED') + if "UNINITIALIZED" not in state_names: + self.state["UNINITIALIZED"] = State("UNINITIALIZED") if self.parameter_normalization: for normalization_spec in self.parameter_normalization.values(): - if 'formula' in normalization_spec: - normalization_spec['formula'] = NormalizationFunction(normalization_spec['formula']) + if "formula" in normalization_spec: + normalization_spec["formula"] = NormalizationFunction( + normalization_spec["formula"] + ) def normalize_parameter(self, parameter_name: str, parameter_value) -> float: """ @@ -553,11 +635,23 @@ class PTA: :param parameter_name: parameter name. :param parameter_value: parameter value. """ - if parameter_value is not None and self.parameter_normalization is not None and parameter_name in self.parameter_normalization: - if 'enum' in self.parameter_normalization[parameter_name] and parameter_value in self.parameter_normalization[parameter_name]['enum']: - return self.parameter_normalization[parameter_name]['enum'][parameter_value] - if 'formula' in self.parameter_normalization[parameter_name]: - normalization_formula = self.parameter_normalization[parameter_name]['formula'] + if ( + parameter_value is not None + and self.parameter_normalization is not None + and parameter_name in self.parameter_normalization + ): + if ( + "enum" in self.parameter_normalization[parameter_name] + and parameter_value + in self.parameter_normalization[parameter_name]["enum"] + ): + return self.parameter_normalization[parameter_name]["enum"][ + parameter_value + ] + if "formula" in self.parameter_normalization[parameter_name]: + normalization_formula = self.parameter_normalization[parameter_name][ + "formula" + ] return normalization_formula.eval(parameter_value) return parameter_value @@ -580,8 +674,8 @@ class PTA: @classmethod def from_file(cls, model_file: str): """Return PTA loaded from the provided JSON or YAML file.""" - with open(model_file, 'r') as f: - if '.json' in model_file: + with open(model_file, "r") as f: + if ".json" in model_file: return cls.from_json(json.load(f)) else: return cls.from_yaml(yaml.safe_load(f)) @@ -593,36 +687,58 @@ class PTA: Compatible with the to_json method. """ - if 'transition' in json_input: + if "transition" in json_input: return cls.from_legacy_json(json_input) kwargs = dict() - for key in ('state_names', 'parameters', 'initial_param_values', 'accepting_states'): + for key in ( + "state_names", + "parameters", + "initial_param_values", + "accepting_states", + ): if key in json_input: kwargs[key] = json_input[key] pta = cls(**kwargs) - for name, state in json_input['state'].items(): - pta.add_state(name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters)) - for transition in json_input['transitions']: + for name, state in json_input["state"].items(): + pta.add_state( + name, power=PTAAttribute.from_json_maybe(state, "power", pta.parameters) + ) + for transition in json_input["transitions"]: kwargs = dict() - for key in ['arguments', 'argument_values', 'argument_combination', 'is_interrupt', 'set_param']: + for key in [ + "arguments", + "argument_values", + "argument_combination", + "is_interrupt", + "set_param", + ]: if key in transition: kwargs[key] = transition[key] # arg_to_param_map uses integer indices. This is not supported by JSON - if 'arg_to_param_map' in transition: - kwargs['arg_to_param_map'] = dict() - for arg_index, param_name in transition['arg_to_param_map'].items(): - kwargs['arg_to_param_map'][int(arg_index)] = param_name - origins = transition['origin'] + if "arg_to_param_map" in transition: + kwargs["arg_to_param_map"] = dict() + for arg_index, param_name in transition["arg_to_param_map"].items(): + kwargs["arg_to_param_map"][int(arg_index)] = param_name + origins = transition["origin"] if type(origins) != list: origins = [origins] for origin in origins: - pta.add_transition(origin, transition['destination'], - transition['name'], - duration=PTAAttribute.from_json_maybe(transition, 'duration', pta.parameters), - energy=PTAAttribute.from_json_maybe(transition, 'energy', pta.parameters), - timeout=PTAAttribute.from_json_maybe(transition, 'timeout', pta.parameters), - **kwargs) + pta.add_transition( + origin, + transition["destination"], + transition["name"], + duration=PTAAttribute.from_json_maybe( + transition, "duration", pta.parameters + ), + energy=PTAAttribute.from_json_maybe( + transition, "energy", pta.parameters + ), + timeout=PTAAttribute.from_json_maybe( + transition, "timeout", pta.parameters + ), + **kwargs + ) return pta @@ -634,37 +750,45 @@ class PTA: Compatible with the legacy dfatool/perl format. """ kwargs = { - 'parameters': list(), - 'initial_param_values': list(), + "parameters": list(), + "initial_param_values": list(), } - for param in sorted(json_input['parameter'].keys()): - kwargs['parameters'].append(param) - kwargs['initial_param_values'].append(json_input['parameter'][param]['default']) + for param in sorted(json_input["parameter"].keys()): + kwargs["parameters"].append(param) + kwargs["initial_param_values"].append( + json_input["parameter"][param]["default"] + ) pta = cls(**kwargs) - for name, state in json_input['state'].items(): - pta.add_state(name, power=PTAAttribute(value=float(state['power']['static']))) + for name, state in json_input["state"].items(): + pta.add_state( + name, power=PTAAttribute(value=float(state["power"]["static"])) + ) - for trans_name in sorted(json_input['transition'].keys()): - transition = json_input['transition'][trans_name] - destination = transition['destination'] + for trans_name in sorted(json_input["transition"].keys()): + transition = json_input["transition"][trans_name] + destination = transition["destination"] arguments = list() argument_values = list() is_interrupt = False - if transition['level'] == 'epilogue': + if transition["level"] == "epilogue": is_interrupt = True if type(destination) == list: destination = destination[0] - for arg in transition['parameters']: - arguments.append(arg['name']) - argument_values.append(arg['values']) - for origin in transition['origins']: - pta.add_transition(origin, destination, trans_name, - arguments=arguments, - argument_values=argument_values, - is_interrupt=is_interrupt) + for arg in transition["parameters"]: + arguments.append(arg["name"]) + argument_values.append(arg["values"]) + for origin in transition["origins"]: + pta.add_transition( + origin, + destination, + trans_name, + arguments=arguments, + argument_values=argument_values, + is_interrupt=is_interrupt, + ) return pta @@ -674,68 +798,79 @@ class PTA: kwargs = dict() - if 'parameters' in yaml_input: - kwargs['parameters'] = yaml_input['parameters'] + if "parameters" in yaml_input: + kwargs["parameters"] = yaml_input["parameters"] - if 'initial_param_values' in yaml_input: - kwargs['initial_param_values'] = yaml_input['initial_param_values'] + if "initial_param_values" in yaml_input: + kwargs["initial_param_values"] = yaml_input["initial_param_values"] - if 'states' in yaml_input: - kwargs['state_names'] = yaml_input['states'] + if "states" in yaml_input: + kwargs["state_names"] = yaml_input["states"] # else: set to UNINITIALIZED by class constructor - if 'codegen' in yaml_input: - kwargs['codegen'] = yaml_input['codegen'] + if "codegen" in yaml_input: + kwargs["codegen"] = yaml_input["codegen"] - if 'parameter_normalization' in yaml_input: - kwargs['parameter_normalization'] = yaml_input['parameter_normalization'] + if "parameter_normalization" in yaml_input: + kwargs["parameter_normalization"] = yaml_input["parameter_normalization"] pta = cls(**kwargs) - if 'state' in yaml_input: - for state_name, state in yaml_input['state'].items(): - pta.add_state(state_name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters)) + if "state" in yaml_input: + for state_name, state in yaml_input["state"].items(): + pta.add_state( + state_name, + power=PTAAttribute.from_json_maybe(state, "power", pta.parameters), + ) - for trans_name in sorted(yaml_input['transition'].keys()): + for trans_name in sorted(yaml_input["transition"].keys()): kwargs = dict() - transition = yaml_input['transition'][trans_name] + transition = yaml_input["transition"][trans_name] arguments = list() argument_values = list() arg_to_param_map = dict() - if 'arguments' in transition: - for i, argument in enumerate(transition['arguments']): - arguments.append(argument['name']) - argument_values.append(argument['values']) - if 'parameter' in argument: - arg_to_param_map[i] = argument['parameter'] - if 'argument_combination' in transition: - kwargs['argument_combination'] = transition['argument_combination'] - if 'set_param' in transition: - kwargs['set_param'] = transition['set_param'] - if 'is_interrupt' in transition: - kwargs['is_interrupt'] = transition['is_interrupt'] - if 'return_value' in transition: - kwargs['return_value_handlers'] = transition['return_value'] - if 'codegen' in transition: - kwargs['codegen'] = transition['codegen'] - if 'loop' in transition: - for state_name in transition['loop']: - pta.add_transition(state_name, state_name, trans_name, - arguments=arguments, - argument_values=argument_values, - arg_to_param_map=arg_to_param_map, - **kwargs) + if "arguments" in transition: + for i, argument in enumerate(transition["arguments"]): + arguments.append(argument["name"]) + argument_values.append(argument["values"]) + if "parameter" in argument: + arg_to_param_map[i] = argument["parameter"] + if "argument_combination" in transition: + kwargs["argument_combination"] = transition["argument_combination"] + if "set_param" in transition: + kwargs["set_param"] = transition["set_param"] + if "is_interrupt" in transition: + kwargs["is_interrupt"] = transition["is_interrupt"] + if "return_value" in transition: + kwargs["return_value_handlers"] = transition["return_value"] + if "codegen" in transition: + kwargs["codegen"] = transition["codegen"] + if "loop" in transition: + for state_name in transition["loop"]: + pta.add_transition( + state_name, + state_name, + trans_name, + arguments=arguments, + argument_values=argument_values, + arg_to_param_map=arg_to_param_map, + **kwargs + ) else: - if 'src' not in transition: - transition['src'] = ['UNINITIALIZED'] - if 'dst' not in transition: - transition['dst'] = 'UNINITIALIZED' - for origin in transition['src']: - pta.add_transition(origin, transition['dst'], trans_name, - arguments=arguments, - argument_values=argument_values, - arg_to_param_map=arg_to_param_map, - **kwargs) + if "src" not in transition: + transition["src"] = ["UNINITIALIZED"] + if "dst" not in transition: + transition["dst"] = "UNINITIALIZED" + for origin in transition["src"]: + pta.add_transition( + origin, + transition["dst"], + trans_name, + arguments=arguments, + argument_values=argument_values, + arg_to_param_map=arg_to_param_map, + **kwargs + ) return pta @@ -746,11 +881,13 @@ class PTA: Compatible with the from_json method. """ ret = { - 'parameters': self.parameters, - 'initial_param_values': self.initial_param_values, - 'state': dict([[state.name, state.to_json()] for state in self.state.values()]), - 'transitions': [trans.to_json() for trans in self.transitions], - 'accepting_states': self.accepting_states, + "parameters": self.parameters, + "initial_param_values": self.initial_param_values, + "state": dict( + [[state.name, state.to_json()] for state in self.state.values()] + ), + "transitions": [trans.to_json() for trans in self.transitions], + "accepting_states": self.accepting_states, } return ret @@ -760,12 +897,19 @@ class PTA: See the State() documentation for acceptable arguments. """ - if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction and kwargs['power_function'] is not None: - kwargs['power_function'] = AnalyticFunction(kwargs['power_function'], - self.parameters, 0) + if ( + "power_function" in kwargs + and type(kwargs["power_function"]) != AnalyticFunction + and kwargs["power_function"] is not None + ): + kwargs["power_function"] = AnalyticFunction( + kwargs["power_function"], self.parameters, 0 + ) self.state[state_name] = State(state_name, **kwargs) - def add_transition(self, orig_state: str, dest_state: str, function_name: str, **kwargs): + def add_transition( + self, orig_state: str, dest_state: str, function_name: str, **kwargs + ): """ Add function_name as new transition from orig_state to dest_state. @@ -776,8 +920,12 @@ class PTA: """ orig_state = self.state[orig_state] dest_state = self.state[dest_state] - for key in ('duration_function', 'energy_function', 'timeout_function'): - if key in kwargs and kwargs[key] is not None and type(kwargs[key]) != AnalyticFunction: + for key in ("duration_function", "energy_function", "timeout_function"): + if ( + key in kwargs + and kwargs[key] is not None + and type(kwargs[key]) != AnalyticFunction + ): kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) new_transition = Transition(orig_state, dest_state, function_name, **kwargs) @@ -824,7 +972,12 @@ class PTA: return self.get_unique_transitions().index(transition) def get_initial_param_dict(self): - return dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))]) + return dict( + [ + [self.parameters[i], self.initial_param_values[i]] + for i in range(len(self.parameters)) + ] + ) def set_random_energy_model(self, static_model=True): u""" @@ -841,18 +994,24 @@ class PTA: def get_most_expensive_state(self): max_state = None for state in self.state.values(): - if state.name != 'UNINITIALIZED' and (max_state is None or state.power.value > max_state.power.value): + if state.name != "UNINITIALIZED" and ( + max_state is None or state.power.value > max_state.power.value + ): max_state = state return max_state def get_least_expensive_state(self): min_state = None for state in self.state.values(): - if state.name != 'UNINITIALIZED' and (min_state is None or state.power.value < min_state.power.value): + if state.name != "UNINITIALIZED" and ( + min_state is None or state.power.value < min_state.power.value + ): min_state = state return min_state - def min_duration_until_energy_overflow(self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1): + def min_duration_until_energy_overflow( + self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1 + ): """ Return minimum duration (in s) until energy counter overflow during online accounting. @@ -862,7 +1021,9 @@ class PTA: max_power_state = self.get_most_expensive_state() if max_power_state.has_interrupt_transitions(): - raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate') + raise RuntimeWarning( + "state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate" + ) # convert from µW to W max_power = max_power_state.power.value * 1e-6 @@ -870,7 +1031,9 @@ class PTA: min_duration = max_energy_value * energy_granularity / max_power return min_duration - def max_duration_until_energy_overflow(self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1): + def max_duration_until_energy_overflow( + self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1 + ): """ Return maximum duration (in s) until energy counter overflow during online accounting. @@ -880,7 +1043,9 @@ class PTA: min_power_state = self.get_least_expensive_state() if min_power_state.has_interrupt_transitions(): - raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate') + raise RuntimeWarning( + "state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate" + ) # convert from µW to W min_power = min_power_state.power.value * 1e-6 @@ -904,14 +1069,19 @@ class PTA: for i, argument in enumerate(transition.arguments): if len(transition.argument_values[i]) <= 2: continue - if transition.argument_combination == 'zip': + if transition.argument_combination == "zip": continue values_are_numeric = True for value in transition.argument_values[i]: - if not is_numeric(self.normalize_parameter(transition.arg_to_param_map[i], value)): + if not is_numeric( + self.normalize_parameter(transition.arg_to_param_map[i], value) + ): values_are_numeric = False if values_are_numeric and len(transition.argument_values[i]) > 2: - transition.argument_values[i] = [transition.argument_values[i][0], transition.argument_values[i][-1]] + transition.argument_values[i] = [ + transition.argument_values[i][0], + transition.argument_values[i][-1], + ] def _dfs_with_param(self, generator, param_dict): for trace in generator: @@ -921,13 +1091,23 @@ class PTA: transition, arguments = elem if transition is not None: param = transition.get_params_after_transition(param, arguments) - ret.append((transition, arguments, self.normalize_parameters(param))) + ret.append( + (transition, arguments, self.normalize_parameters(param)) + ) else: # parameters have already been normalized ret.append((transition, arguments, param)) yield ret - def bfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, transition_filter=None, state_filter=None): + def bfs( + self, + depth: int = 10, + orig_state: str = "UNINITIALIZED", + param_dict: dict = None, + with_parameters: bool = False, + transition_filter=None, + state_filter=None, + ): """ Return a generator object for breadth-first search of traces starting at orig_state. @@ -968,7 +1148,14 @@ class PTA: yield new_trace state_queue.put((new_trace, transition.destination)) - def dfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, **kwargs): + def dfs( + self, + depth: int = 10, + orig_state: str = "UNINITIALIZED", + param_dict: dict = None, + with_parameters: bool = False, + **kwargs + ): """ Return a generator object for depth-first search starting at orig_state. @@ -994,12 +1181,14 @@ class PTA: if with_parameters and not param_dict: param_dict = self.get_initial_param_dict() - if with_parameters and 'with_arguments' not in kwargs: + if with_parameters and "with_arguments" not in kwargs: raise ValueError("with_parameters = True requires with_arguments = True") if self.accepting_states: - generator = filter(lambda x: x[-1][0].destination.name in self.accepting_states, - self.state[orig_state].dfs(depth, **kwargs)) + generator = filter( + lambda x: x[-1][0].destination.name in self.accepting_states, + self.state[orig_state].dfs(depth, **kwargs), + ) else: generator = self.state[orig_state].dfs(depth, **kwargs) @@ -1008,7 +1197,13 @@ class PTA: else: return generator - def simulate(self, trace: list, orig_state: str = 'UNINITIALIZED', orig_param=None, accounting=None): + def simulate( + self, + trace: list, + orig_state: str = "UNINITIALIZED", + orig_param=None, + accounting=None, + ): u""" Simulate a single run through the PTA and return total energy, duration, final state, and resulting parameters. @@ -1021,10 +1216,10 @@ class PTA: :returns: SimulationResult with duration in s, total energy in J, end state, and final parameters """ - total_duration = 0. - total_duration_mae = 0. - total_energy = 0. - total_energy_error = 0. + total_duration = 0.0 + total_duration_mae = 0.0 + total_energy = 0.0 + total_energy_error = 0.0 if type(orig_state) is State: state = orig_state else: @@ -1032,7 +1227,12 @@ class PTA: if orig_param: param_dict = orig_param.copy() else: - param_dict = dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))]) + param_dict = dict( + [ + [self.parameters[i], self.initial_param_values[i]] + for i in range(len(self.parameters)) + ] + ) for function in trace: if isinstance(function[0], Transition): function_name = function[0].name @@ -1040,11 +1240,13 @@ class PTA: else: function_name = function[0] function_args = function[1:] - if function_name is None or function_name == '_': + if function_name is None or function_name == "_": duration = function_args[0] total_energy += state.get_energy(duration, param_dict) if state.power.value_error is not None: - total_energy_error += (duration * state.power.eval_mae(param_dict, function_args))**2 + total_energy_error += ( + duration * state.power.eval_mae(param_dict, function_args) + ) ** 2 total_duration += duration # assumption: sleep is near-exact and does not contribute to the duration error if accounting is not None: @@ -1053,15 +1255,21 @@ class PTA: transition = state.get_transition(function_name) total_duration += transition.duration.eval(param_dict, function_args) if transition.duration.value_error is not None: - total_duration_mae += transition.duration.eval_mae(param_dict, function_args)**2 + total_duration_mae += ( + transition.duration.eval_mae(param_dict, function_args) ** 2 + ) total_energy += transition.get_energy(param_dict, function_args) if transition.energy.value_error is not None: - total_energy_error += transition.energy.eval_mae(param_dict, function_args)**2 - param_dict = transition.get_params_after_transition(param_dict, function_args) + total_energy_error += ( + transition.energy.eval_mae(param_dict, function_args) ** 2 + ) + param_dict = transition.get_params_after_transition( + param_dict, function_args + ) state = transition.destination if accounting is not None: accounting.pass_transition(transition) - while (state.has_interrupt_transitions()): + while state.has_interrupt_transitions(): transition = state.get_next_interrupt(param_dict) duration = transition.get_timeout(param_dict) total_duration += duration @@ -1072,45 +1280,82 @@ class PTA: param_dict = transition.get_params_after_transition(param_dict) state = transition.destination - return SimulationResult(total_duration, total_energy, state, param_dict, duration_mae=np.sqrt(total_duration_mae), energy_mae=np.sqrt(total_energy_error)) + return SimulationResult( + total_duration, + total_energy, + state, + param_dict, + duration_mae=np.sqrt(total_duration_mae), + energy_mae=np.sqrt(total_energy_error), + ) def update(self, static_model, param_model, static_error=None, analytic_error=None): for state in self.state.values(): - if state.name != 'UNINITIALIZED': + if state.name != "UNINITIALIZED": try: - state.power.value = static_model(state.name, 'power') + state.power.value = static_model(state.name, "power") if static_error is not None: - state.power.value_error = static_error[state.name]['power'] - if param_model(state.name, 'power'): - state.power.function = param_model(state.name, 'power')['function'] + state.power.value_error = static_error[state.name]["power"] + if param_model(state.name, "power"): + state.power.function = param_model(state.name, "power")[ + "function" + ] if analytic_error is not None: - state.power.function_error = analytic_error[state.name]['power'] + state.power.function_error = analytic_error[state.name][ + "power" + ] except KeyError: - print('[W] skipping model update of state {} due to missing data'.format(state.name)) + print( + "[W] skipping model update of state {} due to missing data".format( + state.name + ) + ) pass for transition in self.transitions: try: - transition.duration.value = static_model(transition.name, 'duration') - if param_model(transition.name, 'duration'): - transition.duration.function = param_model(transition.name, 'duration')['function'] + transition.duration.value = static_model(transition.name, "duration") + if param_model(transition.name, "duration"): + transition.duration.function = param_model( + transition.name, "duration" + )["function"] if analytic_error is not None: - transition.duration.function_error = analytic_error[transition.name]['duration'] - transition.energy.value = static_model(transition.name, 'energy') - if param_model(transition.name, 'energy'): - transition.energy.function = param_model(transition.name, 'energy')['function'] + transition.duration.function_error = analytic_error[ + transition.name + ]["duration"] + transition.energy.value = static_model(transition.name, "energy") + if param_model(transition.name, "energy"): + transition.energy.function = param_model(transition.name, "energy")[ + "function" + ] if analytic_error is not None: - transition.energy.function_error = analytic_error[transition.name]['energy'] + transition.energy.function_error = analytic_error[ + transition.name + ]["energy"] if transition.is_interrupt: - transition.timeout.value = static_model(transition.name, 'timeout') - if param_model(transition.name, 'timeout'): - transition.timeout.function = param_model(transition.name, 'timeout')['function'] + transition.timeout.value = static_model(transition.name, "timeout") + if param_model(transition.name, "timeout"): + transition.timeout.function = param_model( + transition.name, "timeout" + )["function"] if analytic_error is not None: - transition.timeout.function_error = analytic_error[transition.name]['timeout'] + transition.timeout.function_error = analytic_error[ + transition.name + ]["timeout"] if static_error is not None: - transition.duration.value_error = static_error[transition.name]['duration'] - transition.energy.value_error = static_error[transition.name]['energy'] - transition.timeout.value_error = static_error[transition.name]['timeout'] + transition.duration.value_error = static_error[transition.name][ + "duration" + ] + transition.energy.value_error = static_error[transition.name][ + "energy" + ] + transition.timeout.value_error = static_error[transition.name][ + "timeout" + ] except KeyError: - print('[W] skipping model update of transition {} due to missing data'.format(transition.name)) + print( + "[W] skipping model update of transition {} due to missing data".format( + transition.name + ) + ) pass |