diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2019-11-21 08:17:04 +0100 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2019-11-21 08:17:04 +0100 |
commit | 0d4616db3975919c46b73cfbd4c6054b94e55aa6 (patch) | |
tree | ce3b4eaadb62583d155b804113008794ab2535fd /lib/automata.py | |
parent | 68c54f846d941ab2bb7367c5ac5dad091b5fde9f (diff) |
flake8
Diffstat (limited to 'lib/automata.py')
-rwxr-xr-x | lib/automata.py | 164 |
1 files changed, 86 insertions, 78 deletions
diff --git a/lib/automata.py b/lib/automata.py index 73c0225..4aa9a97 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -4,27 +4,30 @@ from functions import AnalyticFunction, NormalizationFunction from utils import is_numeric import itertools import numpy as np -import json, yaml +import json +import yaml + def _dict_to_list(input_dict: dict) -> list: return [input_dict[x] for x in sorted(input_dict.keys())] + def _attribute_to_json(static_value: float, param_function: AnalyticFunction) -> dict: ret = { - 'static' : static_value + 'static': static_value } if param_function: ret['function'] = { - 'raw' : param_function._model_str, - 'regression_args' : list(param_function._regression_args) + 'raw': param_function._model_str, + 'regression_args': list(param_function._regression_args) } return ret + class State: """A single PTA state.""" - def __init__(self, name: str, power: float = 0, - power_function: AnalyticFunction = None): + def __init__(self, name: str, power: float = 0, power_function: AnalyticFunction = None): u""" Create a new PTA state. @@ -53,7 +56,7 @@ class State: return self.power_function.eval(_dict_to_list(param_dict)) * duration return self.power * duration - def set_random_energy_model(self, static_model = True): + def set_random_energy_model(self, static_model=True): """Set a random static energy value.""" self.power = int(np.random.sample() * 50000) @@ -83,10 +86,10 @@ class State: :returns: Transition object """ interrupts = filter(lambda x: x.is_interrupt, self.outgoing_transitions.values()) - interrupts = sorted(interrupts, key = lambda x: x.get_timeout(parameters)) + interrupts = sorted(interrupts, key=lambda x: x.get_timeout(parameters)) return interrupts[0] - def dfs(self, depth: int, with_arguments: bool = False, trace_filter = None, sleep: int = 0): + def dfs(self, depth: int, with_arguments=False, trace_filter=None, sleep=0): """ Return a generator object for depth-first search over all outgoing transitions. @@ -140,7 +143,7 @@ class State: new_trace_filter = None else: new_trace_filter = None - for suffix in trans.destination.dfs(depth - 1, with_arguments = with_arguments, trace_filter = new_trace_filter, sleep = sleep): + for suffix in trans.destination.dfs(depth - 1, with_arguments=with_arguments, trace_filter=new_trace_filter, sleep=sleep): if with_arguments: if trans.argument_combination == 'cartesian': for args in itertools.product(*trans.argument_values): @@ -173,11 +176,12 @@ class State: def to_json(self) -> dict: """Return JSON encoding of this state object.""" ret = { - 'name' : self.name, - 'power' : _attribute_to_json(self.power, self.power_function) + 'name': self.name, + 'power': _attribute_to_json(self.power, self.power_function) } return ret + class Transition: u""" A single PTA transition with one origin and one destination state. @@ -203,18 +207,18 @@ class Transition: """ def __init__(self, orig_state: State, dest_state: State, name: str, - energy: float = 0, energy_function: AnalyticFunction = None, - duration: float = 0, duration_function: AnalyticFunction = None, - timeout: float = 0, timeout_function: AnalyticFunction = None, - is_interrupt: bool = False, - arguments: list = [], - argument_values: list = [], - argument_combination: str = 'cartesian', # or 'zip' - param_update_function = None, - arg_to_param_map: dict = None, - set_param = None, - return_value_handlers: list = [], - codegen = dict()): + energy: float = 0, energy_function: AnalyticFunction = None, + duration: float = 0, duration_function: AnalyticFunction = None, + timeout: float = 0, timeout_function: AnalyticFunction = None, + is_interrupt: bool = False, + arguments: list = [], + argument_values: list = [], + argument_combination: str = 'cartesian', # or 'zip' + param_update_function=None, + arg_to_param_map: dict = None, + set_param=None, + return_value_handlers: list = [], + codegen=dict()): """ Create a new transition between two PTA states. @@ -245,7 +249,6 @@ class Transition: if 'formula' in handler: handler['formula'] = NormalizationFunction(handler['formula']) - def get_duration(self, param_dict: dict = {}, args: list = []) -> float: u""" Return transition duration in µs. @@ -270,7 +273,7 @@ class Transition: return self.energy_function.eval(_dict_to_list(param_dict), args) return self.energy - def set_random_energy_model(self, static_model = True): + def set_random_energy_model(self, static_model=True): self.energy = int(np.random.sample() * 50000) def get_timeout(self, param_dict: dict = {}) -> float: @@ -309,32 +312,35 @@ class Transition: def to_json(self) -> dict: """Return JSON encoding of this transition object.""" ret = { - 'name' : self.name, - 'origin' : self.origin.name, - 'destination' : self.destination.name, - 'is_interrupt' : self.is_interrupt, - 'arguments' : self.arguments, - 'argument_values' : self.argument_values, - 'argument_combination' : self.argument_combination, - 'arg_to_param_map' : self.arg_to_param_map, - 'set_param' : self.set_param, - 'duration' : _attribute_to_json(self.duration, self.duration_function), - 'energy' : _attribute_to_json(self.energy, self.energy_function), - 'timeout' : _attribute_to_json(self.timeout, self.timeout_function), + 'name': self.name, + 'origin': self.origin.name, + 'destination': self.destination.name, + 'is_interrupt': self.is_interrupt, + 'arguments': self.arguments, + 'argument_values': self.argument_values, + 'argument_combination': self.argument_combination, + 'arg_to_param_map': self.arg_to_param_map, + 'set_param': self.set_param, + 'duration': _attribute_to_json(self.duration, self.duration_function), + 'energy': _attribute_to_json(self.energy, self.energy_function), + 'timeout': _attribute_to_json(self.timeout, self.timeout_function), } return ret + def _json_function_to_analytic_function(base, attribute: str, parameters: list): if attribute in base and 'function' in base[attribute]: base = base[attribute]['function'] - return AnalyticFunction(base['raw'], parameters, 0, regression_args = base['regression_args']) + return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args']) return None + def _json_get_static(base, attribute: str): if attribute in base: return base[attribute]['static'] return 0 + class PTA: """ A parameterized priced timed automaton. All states are accepting. @@ -354,9 +360,9 @@ class PTA: """ def __init__(self, state_names: list = [], - accepting_states: list = None, - parameters: list = [], initial_param_values: list = None, - codegen: dict = {}, parameter_normalization: dict = None): + accepting_states: list = None, + parameters: list = [], initial_param_values: list = None, + codegen: dict = {}, parameter_normalization: dict = None): """ Return a new PTA object. @@ -383,7 +389,7 @@ class PTA: self.initial_param_values = [None for x in self.parameters] self.transitions = [] - if not 'UNINITIALIZED' in state_names: + if 'UNINITIALIZED' not in state_names: self.state['UNINITIALIZED'] = State('UNINITIALIZED') if self.parameter_normalization: @@ -451,7 +457,7 @@ class PTA: pta = cls(**kwargs) for name, state in json_input['state'].items(): power_function = _json_function_to_analytic_function(state, 'power', pta.parameters) - pta.add_state(name, power = _json_get_static(state, 'power'), power_function = power_function) + pta.add_state(name, power=_json_get_static(state, 'power'), power_function=power_function) for transition in json_input['transitions']: duration_function = _json_function_to_analytic_function(transition, 'duration', pta.parameters) energy_function = _json_function_to_analytic_function(transition, 'energy', pta.parameters) @@ -465,15 +471,14 @@ class PTA: origins = [origins] for origin in origins: pta.add_transition(origin, transition['destination'], - transition['name'], - duration = _json_get_static(transition, 'duration'), - duration_function = duration_function, - energy = _json_get_static(transition, 'energy'), - energy_function = energy_function, - timeout = _json_get_static(transition, 'timeout'), - timeout_function = timeout_function, - **kwargs - ) + transition['name'], + duration=_json_get_static(transition, 'duration'), + duration_function=duration_function, + energy=_json_get_static(transition, 'energy'), + energy_function=energy_function, + timeout=_json_get_static(transition, 'timeout'), + timeout_function=timeout_function, + **kwargs) return pta @@ -485,7 +490,7 @@ class PTA: Compatible with the legacy dfatool/perl format. """ kwargs = { - 'parameters' : list(), + 'parameters': list(), 'initial_param_values': list(), } @@ -496,7 +501,7 @@ class PTA: pta = cls(**kwargs) for name, state in json_input['state'].items(): - pta.add_state(name, power = float(state['power']['static'])) + pta.add_state(name, power=float(state['power']['static'])) for trans_name in sorted(json_input['transition'].keys()): transition = json_input['transition'][trans_name] @@ -513,8 +518,9 @@ class PTA: argument_values.append(arg['values']) for origin in transition['origins']: pta.add_transition(origin, destination, trans_name, - arguments = arguments, argument_values = argument_values, - is_interrupt = is_interrupt) + arguments=arguments, + argument_values=argument_values, + is_interrupt=is_interrupt) return pta @@ -565,19 +571,21 @@ class PTA: if 'loop' in transition: for state_name in transition['loop']: pta.add_transition(state_name, state_name, trans_name, - arguments = arguments, argument_values = argument_values, - arg_to_param_map = arg_to_param_map, - **kwargs) + arguments=arguments, + argument_values=argument_values, + arg_to_param_map=arg_to_param_map, + **kwargs) else: - if not 'src' in transition: + if 'src' not in transition: transition['src'] = ['UNINITIALIZED'] - if not 'dst' in transition: + if 'dst' not in transition: transition['dst'] = 'UNINITIALIZED' for origin in transition['src']: pta.add_transition(origin, transition['dst'], trans_name, - arguments = arguments, argument_values = argument_values, - arg_to_param_map = arg_to_param_map, - **kwargs) + arguments=arguments, + argument_values=argument_values, + arg_to_param_map=arg_to_param_map, + **kwargs) return pta @@ -588,11 +596,11 @@ class PTA: Compatible with the from_json method. """ ret = { - 'parameters' : self.parameters, - 'initial_param_values' : self.initial_param_values, - 'state' : dict([[state.name, state.to_json()] for state in self.state.values()]), - 'transitions' : [trans.to_json() for trans in self.transitions], - 'accepting_states' : self.accepting_states, + 'parameters': self.parameters, + 'initial_param_values': self.initial_param_values, + 'state': dict([[state.name, state.to_json()] for state in self.state.values()]), + 'transitions': [trans.to_json() for trans in self.transitions], + 'accepting_states': self.accepting_states, } return ret @@ -602,9 +610,9 @@ class PTA: See the State() documentation for acceptable arguments. """ - if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction and kwargs['power_function'] != None: + if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction and kwargs['power_function'] is not None: kwargs['power_function'] = AnalyticFunction(kwargs['power_function'], - self.parameters, 0) + self.parameters, 0) self.state[state_name] = State(state_name, **kwargs) def add_transition(self, orig_state: str, dest_state: str, function_name: str, **kwargs): @@ -620,7 +628,7 @@ class PTA: orig_state = self.state[orig_state] dest_state = self.state[dest_state] for key in ('duration_function', 'energy_function', 'timeout_function'): - if key in kwargs and kwargs[key] != None and type(kwargs[key]) != AnalyticFunction: + if key in kwargs and kwargs[key] is not None and type(kwargs[key]) != AnalyticFunction: kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) new_transition = Transition(orig_state, dest_state, function_name, **kwargs) @@ -669,7 +677,7 @@ class PTA: def get_initial_param_dict(self): return dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))]) - def set_random_energy_model(self, static_model = True): + def set_random_energy_model(self, static_model=True): for state in self.state.values(): state.set_random_energy_model(static_model) for transition in self.transitions: @@ -740,12 +748,12 @@ class PTA: if with_parameters and not param_dict: param_dict = self.get_initial_param_dict() - if with_parameters and not 'with_arguments' in kwargs: + if with_parameters and 'with_arguments' not in kwargs: raise ValueError("with_parameters = True requires with_arguments = True") if self.accepting_states: generator = filter(lambda x: x[-1][0].destination.name in self.accepting_states, - self.state[orig_state].dfs(depth, **kwargs)) + self.state[orig_state].dfs(depth, **kwargs)) else: generator = self.state[orig_state].dfs(depth, **kwargs) @@ -754,7 +762,7 @@ class PTA: else: return generator - def simulate(self, trace: list, orig_state: str = 'UNINITIALIZED', accounting = None): + def simulate(self, trace: list, orig_state: str = 'UNINITIALIZED', accounting=None): u""" Simulate a single run through the PTA and return total energy, duration, final state, and resulting parameters. @@ -775,7 +783,7 @@ class PTA: function_args = function[1] else: function_name = function[0] - function_args = function[1 : ] + function_args = function[1:] if function_name is None: duration = function_args[0] total_energy += state.get_energy(duration, param_dict) |