From bd3da8396a3ee4e1768507827158b7cdce317603 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Tue, 10 Dec 2019 13:26:32 +0100 Subject: PTA: Use PTAAttribute helper class for static+dynamic attributes --- lib/automata.py | 180 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 109 insertions(+), 71 deletions(-) (limited to 'lib') diff --git a/lib/automata.py b/lib/automata.py index a80796c..38a2645 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -13,36 +13,81 @@ def _dict_to_list(input_dict: dict) -> list: return [input_dict[x] for x in sorted(input_dict.keys())] -def _attribute_to_json(static_value: float, param_function: AnalyticFunction) -> dict: - ret = { - 'static': static_value - } - if param_function: - ret['function'] = { - 'raw': param_function._model_str, - 'regression_args': list(param_function._regression_args) +class PTAAttribute: + def __init__(self, value: float = 0, function: AnalyticFunction = None): + self.value = value + self.function = function + + def __repr__(self): + if self.function is not None: + return 'PTAATtribute<{:.0f}, {}>'.format(self.value, self.function._model_str) + return 'PTAATtribute<{:.0f}, None>'.format(self.value) + + def eval(self, param_dict=dict(), args=list()): + if self.function: + return self.function.eval(_dict_to_list(param_dict), args) + return self.value + + def to_json(self): + ret = { + 'static': self.value } - return ret + if self.function: + ret['function'] = { + 'raw': self.function._model_str, + 'regression_args': list(self.function._regression_args) + } + return ret + + @classmethod + def from_json(cls, json_input: dict, parameters: dict): + ret = cls() + if 'static' in json_input: + ret.value = json_input['static'] + if 'function' in json_input: + ret.function = AnalyticFunction(json_input['function']['raw'], parameters, 0, regression_args=json_input['function']['regression_args']) + return ret + + @classmethod + def from_json_maybe(cls, json_wrapped: dict, attribute: str, parameters: dict): + if attribute in json_wrapped: + return cls.from_json(json_wrapped[attribute], parameters) + return cls() + + +def _json_function_to_analytic_function(base, attribute: str, parameters: list): + if attribute in base and 'function' in base[attribute]: + base = base[attribute]['function'] + return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args']) + return None class State: """A single PTA state.""" - def __init__(self, name: str, power: float = 0, power_function: AnalyticFunction = None): + def __init__(self, name: str, power: PTAAttribute = PTAAttribute(), power_function: AnalyticFunction = None): u""" Create a new PTA state. :param name: state name - :param power: static state power in µW, default 0 - :param power_function: parameterized state power in µW, default None + :param power: state power PTAAttribute in µW, default static 0 / parameterized None + :param power_function: Legacy support """ self.name = name self.power = power - self.power_function = power_function self.outgoing_transitions = {} + if type(self.power) is float or type(self.power) is int: + self.power = PTAAttribute(self.power) + + if power_function is not None: + if type(power_function) is AnalyticFunction: + self.power.function = power_function + else: + raise ValueError('power_function must be None or AnalyticFunction') + def __repr__(self): - return 'State<{:s}, {:.0f}, {}>'.format(self.name, self.power, self.power_function) + return 'State<{:s}, {}>'.format(self.name, self.power) def add_outgoing_transition(self, new_transition: object): """Add a new outgoing transition.""" @@ -56,13 +101,11 @@ class State: :param param_dict: current parameters :returns: energy spent in pJ """ - if self.power_function: - return self.power_function.eval(_dict_to_list(param_dict)) * duration - return self.power * duration + return self.power.eval(param_dict) * duration def set_random_energy_model(self, static_model=True): u"""Set a random static state power between 0 µW and 50 mW.""" - self.power = int(np.random.sample() * 50000) + self.power.value = int(np.random.sample() * 50000) def get_transition(self, transition_name: str) -> object: """ @@ -181,7 +224,7 @@ class State: """Return JSON encoding of this state object.""" ret = { 'name': self.name, - 'power': _attribute_to_json(self.power, self.power_function) + 'power': self.power.to_json() } return ret @@ -211,9 +254,9 @@ class Transition: """ def __init__(self, orig_state: State, dest_state: State, name: str, - energy: float = 0, energy_function: AnalyticFunction = None, - duration: float = 0, duration_function: AnalyticFunction = None, - timeout: float = 0, timeout_function: AnalyticFunction = None, + energy: PTAAttribute = PTAAttribute(), energy_function: AnalyticFunction = None, + duration: PTAAttribute = PTAAttribute(), duration_function: AnalyticFunction = None, + timeout: PTAAttribute = PTAAttribute(), timeout_function: AnalyticFunction = None, is_interrupt: bool = False, arguments: list = [], argument_values: list = [], @@ -234,11 +277,8 @@ class Transition: self.origin = orig_state self.destination = dest_state self.energy = energy - self.energy_function = energy_function self.duration = duration - self.duration_function = duration_function self.timeout = timeout - self.timeout_function = timeout_function self.is_interrupt = is_interrupt self.arguments = arguments.copy() self.argument_values = argument_values.copy() @@ -249,6 +289,24 @@ class Transition: self.return_value_handlers = return_value_handlers self.codegen = codegen + if type(self.energy) is float or type(self.energy) is int: + self.energy = PTAAttribute(self.energy) + if energy_function is not None: + if type(energy_function) is AnalyticFunction: + self.energy.function = energy_function + + if type(self.duration) is float or type(self.duration) is int: + self.duration = PTAAttribute(self.duration) + if duration_function is not None: + if type(duration_function) is AnalyticFunction: + self.duration.function = duration_function + + if type(self.timeout) is float or type(self.timeout) is int: + self.timeout = PTAAttribute(self.timeout) + if timeout_function is not None: + if type(timeout_function) is AnalyticFunction: + self.timeout.function = timeout_function + for handler in self.return_value_handlers: if 'formula' in handler: handler['formula'] = NormalizationFunction(handler['formula']) @@ -262,9 +320,7 @@ class Transition: :returns: transition duration in µs """ - if self.duration_function: - return self.duration_function.eval(_dict_to_list(param_dict), args) - return self.duration + return self.duration.eval(param_dict, args) def get_energy(self, param_dict: dict = {}, args: list = []) -> float: u""" @@ -273,15 +329,13 @@ class Transition: :param param_dict: current parameter values :param args: function arguments """ - if self.energy_function: - return self.energy_function.eval(_dict_to_list(param_dict), args) - return self.energy + return self.energy.eval(param_dict, args) def set_random_energy_model(self, static_model=True): - self.energy = int(np.random.sample() * 50000) - self.duration = int(np.random.sample() * 50000) + self.energy.value = int(np.random.sample() * 50000) + self.duration.value = int(np.random.sample() * 50000) if self.is_interrupt: - self.timeout = int(np.random.sample() * 50000) + self.timeout.value = int(np.random.sample() * 50000) def get_timeout(self, param_dict: dict = {}) -> float: u""" @@ -292,9 +346,7 @@ class Transition: :param param_dict: current parameter values :param args: function arguments """ - if self.timeout_function: - return self.timeout_function.eval(_dict_to_list(param_dict)) - return self.timeout + return self.timeout.eval(param_dict) def get_params_after_transition(self, param_dict: dict, args: list = []) -> dict: """ @@ -328,20 +380,13 @@ class Transition: 'argument_combination': self.argument_combination, 'arg_to_param_map': self.arg_to_param_map, 'set_param': self.set_param, - 'duration': _attribute_to_json(self.duration, self.duration_function), - 'energy': _attribute_to_json(self.energy, self.energy_function), - 'timeout': _attribute_to_json(self.timeout, self.timeout_function), + 'duration': self.duration.to_json(), + 'energy': self.energy.to_json(), + 'timeout': self.timeout.to_json() } return ret -def _json_function_to_analytic_function(base, attribute: str, parameters: list): - if attribute in base and 'function' in base[attribute]: - base = base[attribute]['function'] - return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args']) - return None - - def _json_get_static(base, attribute: str): if attribute in base: return base[attribute]['static'] @@ -463,12 +508,8 @@ class PTA: kwargs[key] = json_input[key] pta = cls(**kwargs) for name, state in json_input['state'].items(): - power_function = _json_function_to_analytic_function(state, 'power', pta.parameters) - pta.add_state(name, power=_json_get_static(state, 'power'), power_function=power_function) + pta.add_state(name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters)) for transition in json_input['transitions']: - duration_function = _json_function_to_analytic_function(transition, 'duration', pta.parameters) - energy_function = _json_function_to_analytic_function(transition, 'energy', pta.parameters) - timeout_function = _json_function_to_analytic_function(transition, 'timeout', pta.parameters) kwargs = dict() for key in ['arguments', 'arg_to_param_map', 'argument_values', 'argument_combination', 'is_interrupt', 'set_param']: if key in transition: @@ -479,12 +520,9 @@ class PTA: for origin in origins: pta.add_transition(origin, transition['destination'], transition['name'], - duration=_json_get_static(transition, 'duration'), - duration_function=duration_function, - energy=_json_get_static(transition, 'energy'), - energy_function=energy_function, - timeout=_json_get_static(transition, 'timeout'), - timeout_function=timeout_function, + duration=PTAAttribute.from_json_maybe(transition, 'duration', pta.parameters), + energy=PTAAttribute.from_json_maybe(transition, 'energy', pta.parameters), + timeout=PTAAttribute.from_json_maybe(transition, 'timeout', pta.parameters), **kwargs) return pta @@ -508,7 +546,7 @@ class PTA: pta = cls(**kwargs) for name, state in json_input['state'].items(): - pta.add_state(name, power=float(state['power']['static'])) + pta.add_state(name, power=PTAAttribute(value=float(state['power']['static']))) for trans_name in sorted(json_input['transition'].keys()): transition = json_input['transition'][trans_name] @@ -700,14 +738,14 @@ class PTA: def get_most_expensive_state(self): max_state = None for state in self.state.values(): - if state.name != 'UNINITIALIZED' and (max_state is None or state.power > max_state.power): + if state.name != 'UNINITIALIZED' and (max_state is None or state.power.value > max_state.power.value): max_state = state return max_state def get_least_expensive_state(self): min_state = None for state in self.state.values(): - if state.name != 'UNINITIALIZED' and (min_state is None or state.power < min_state.power): + if state.name != 'UNINITIALIZED' and (min_state is None or state.power.value < min_state.power.value): min_state = state return min_state @@ -724,7 +762,7 @@ class PTA: raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate') # convert from µW to W - max_power = max_power_state.power * 1e-6 + max_power = max_power_state.power.value * 1e-6 min_duration = max_energy_value * energy_granularity / max_power return min_duration @@ -742,7 +780,7 @@ class PTA: raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate') # convert from µW to W - min_power = min_power_state.power * 1e-6 + min_power = min_power_state.power.value * 1e-6 max_duration = max_energy_value * energy_granularity / min_power return max_duration @@ -920,24 +958,24 @@ class PTA: for state in self.state.values(): if state.name != 'UNINITIALIZED': try: - state.power = static_model(state.name, 'power') + state.power.value = static_model(state.name, 'power') if param_model(state.name, 'power'): - state.power_function = param_model(state.name, 'power')['function'] + state.power.function = param_model(state.name, 'power')['function'] except KeyError: print('[W] skipping model update of state {} due to missing data'.format(state.name)) pass for transition in self.transitions: try: - transition.duration = static_model(transition.name, 'duration') + transition.duration.value = static_model(transition.name, 'duration') if param_model(transition.name, 'duration'): - transition.duration_function = param_model(transition.name, 'duration')['function'] - transition.energy = static_model(transition.name, 'energy') + transition.duration.function = param_model(transition.name, 'duration')['function'] + transition.energy.value = static_model(transition.name, 'energy') if param_model(transition.name, 'energy'): - transition.energy_function = param_model(transition.name, 'energy')['function'] + transition.energy.function = param_model(transition.name, 'energy')['function'] if transition.is_interrupt: - transition.timeout = static_model(transition.name, 'timeout') + transition.timeout.value = static_model(transition.name, 'timeout') if param_model(transition.name, 'timeout'): - transition.timeout_function = param_model(transition.name, 'timeout')['function'] + transition.timeout.function = param_model(transition.name, 'timeout')['function'] except KeyError: print('[W] skipping model update of transition {} due to missing data'.format(state.name)) pass -- cgit v1.2.3