summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2019-12-10 13:26:32 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2019-12-10 13:26:32 +0100
commitbd3da8396a3ee4e1768507827158b7cdce317603 (patch)
treeb1565639faeeb23d699d9e49f5028c9fa06c1b47
parentf09f89618b24916cb2e0b2c20bb9464df8e5de2c (diff)
PTA: Use PTAAttribute helper class for static+dynamic attributes
-rwxr-xr-xlib/automata.py180
1 files changed, 109 insertions, 71 deletions
diff --git a/lib/automata.py b/lib/automata.py
index a80796c..38a2645 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -13,36 +13,81 @@ def _dict_to_list(input_dict: dict) -> list:
return [input_dict[x] for x in sorted(input_dict.keys())]
-def _attribute_to_json(static_value: float, param_function: AnalyticFunction) -> dict:
- ret = {
- 'static': static_value
- }
- if param_function:
- ret['function'] = {
- 'raw': param_function._model_str,
- 'regression_args': list(param_function._regression_args)
+class PTAAttribute:
+ def __init__(self, value: float = 0, function: AnalyticFunction = None):
+ self.value = value
+ self.function = function
+
+ def __repr__(self):
+ if self.function is not None:
+ return 'PTAATtribute<{:.0f}, {}>'.format(self.value, self.function._model_str)
+ return 'PTAATtribute<{:.0f}, None>'.format(self.value)
+
+ def eval(self, param_dict=dict(), args=list()):
+ if self.function:
+ return self.function.eval(_dict_to_list(param_dict), args)
+ return self.value
+
+ def to_json(self):
+ ret = {
+ 'static': self.value
}
- return ret
+ if self.function:
+ ret['function'] = {
+ 'raw': self.function._model_str,
+ 'regression_args': list(self.function._regression_args)
+ }
+ return ret
+
+ @classmethod
+ def from_json(cls, json_input: dict, parameters: dict):
+ ret = cls()
+ if 'static' in json_input:
+ ret.value = json_input['static']
+ if 'function' in json_input:
+ ret.function = AnalyticFunction(json_input['function']['raw'], parameters, 0, regression_args=json_input['function']['regression_args'])
+ return ret
+
+ @classmethod
+ def from_json_maybe(cls, json_wrapped: dict, attribute: str, parameters: dict):
+ if attribute in json_wrapped:
+ return cls.from_json(json_wrapped[attribute], parameters)
+ return cls()
+
+
+def _json_function_to_analytic_function(base, attribute: str, parameters: list):
+ if attribute in base and 'function' in base[attribute]:
+ base = base[attribute]['function']
+ return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args'])
+ return None
class State:
"""A single PTA state."""
- def __init__(self, name: str, power: float = 0, power_function: AnalyticFunction = None):
+ def __init__(self, name: str, power: PTAAttribute = PTAAttribute(), power_function: AnalyticFunction = None):
u"""
Create a new PTA state.
:param name: state name
- :param power: static state power in µW, default 0
- :param power_function: parameterized state power in µW, default None
+ :param power: state power PTAAttribute in µW, default static 0 / parameterized None
+ :param power_function: Legacy support
"""
self.name = name
self.power = power
- self.power_function = power_function
self.outgoing_transitions = {}
+ if type(self.power) is float or type(self.power) is int:
+ self.power = PTAAttribute(self.power)
+
+ if power_function is not None:
+ if type(power_function) is AnalyticFunction:
+ self.power.function = power_function
+ else:
+ raise ValueError('power_function must be None or AnalyticFunction')
+
def __repr__(self):
- return 'State<{:s}, {:.0f}, {}>'.format(self.name, self.power, self.power_function)
+ return 'State<{:s}, {}>'.format(self.name, self.power)
def add_outgoing_transition(self, new_transition: object):
"""Add a new outgoing transition."""
@@ -56,13 +101,11 @@ class State:
:param param_dict: current parameters
:returns: energy spent in pJ
"""
- if self.power_function:
- return self.power_function.eval(_dict_to_list(param_dict)) * duration
- return self.power * duration
+ return self.power.eval(param_dict) * duration
def set_random_energy_model(self, static_model=True):
u"""Set a random static state power between 0 µW and 50 mW."""
- self.power = int(np.random.sample() * 50000)
+ self.power.value = int(np.random.sample() * 50000)
def get_transition(self, transition_name: str) -> object:
"""
@@ -181,7 +224,7 @@ class State:
"""Return JSON encoding of this state object."""
ret = {
'name': self.name,
- 'power': _attribute_to_json(self.power, self.power_function)
+ 'power': self.power.to_json()
}
return ret
@@ -211,9 +254,9 @@ class Transition:
"""
def __init__(self, orig_state: State, dest_state: State, name: str,
- energy: float = 0, energy_function: AnalyticFunction = None,
- duration: float = 0, duration_function: AnalyticFunction = None,
- timeout: float = 0, timeout_function: AnalyticFunction = None,
+ energy: PTAAttribute = PTAAttribute(), energy_function: AnalyticFunction = None,
+ duration: PTAAttribute = PTAAttribute(), duration_function: AnalyticFunction = None,
+ timeout: PTAAttribute = PTAAttribute(), timeout_function: AnalyticFunction = None,
is_interrupt: bool = False,
arguments: list = [],
argument_values: list = [],
@@ -234,11 +277,8 @@ class Transition:
self.origin = orig_state
self.destination = dest_state
self.energy = energy
- self.energy_function = energy_function
self.duration = duration
- self.duration_function = duration_function
self.timeout = timeout
- self.timeout_function = timeout_function
self.is_interrupt = is_interrupt
self.arguments = arguments.copy()
self.argument_values = argument_values.copy()
@@ -249,6 +289,24 @@ class Transition:
self.return_value_handlers = return_value_handlers
self.codegen = codegen
+ if type(self.energy) is float or type(self.energy) is int:
+ self.energy = PTAAttribute(self.energy)
+ if energy_function is not None:
+ if type(energy_function) is AnalyticFunction:
+ self.energy.function = energy_function
+
+ if type(self.duration) is float or type(self.duration) is int:
+ self.duration = PTAAttribute(self.duration)
+ if duration_function is not None:
+ if type(duration_function) is AnalyticFunction:
+ self.duration.function = duration_function
+
+ if type(self.timeout) is float or type(self.timeout) is int:
+ self.timeout = PTAAttribute(self.timeout)
+ if timeout_function is not None:
+ if type(timeout_function) is AnalyticFunction:
+ self.timeout.function = timeout_function
+
for handler in self.return_value_handlers:
if 'formula' in handler:
handler['formula'] = NormalizationFunction(handler['formula'])
@@ -262,9 +320,7 @@ class Transition:
:returns: transition duration in µs
"""
- if self.duration_function:
- return self.duration_function.eval(_dict_to_list(param_dict), args)
- return self.duration
+ return self.duration.eval(param_dict, args)
def get_energy(self, param_dict: dict = {}, args: list = []) -> float:
u"""
@@ -273,15 +329,13 @@ class Transition:
:param param_dict: current parameter values
:param args: function arguments
"""
- if self.energy_function:
- return self.energy_function.eval(_dict_to_list(param_dict), args)
- return self.energy
+ return self.energy.eval(param_dict, args)
def set_random_energy_model(self, static_model=True):
- self.energy = int(np.random.sample() * 50000)
- self.duration = int(np.random.sample() * 50000)
+ self.energy.value = int(np.random.sample() * 50000)
+ self.duration.value = int(np.random.sample() * 50000)
if self.is_interrupt:
- self.timeout = int(np.random.sample() * 50000)
+ self.timeout.value = int(np.random.sample() * 50000)
def get_timeout(self, param_dict: dict = {}) -> float:
u"""
@@ -292,9 +346,7 @@ class Transition:
:param param_dict: current parameter values
:param args: function arguments
"""
- if self.timeout_function:
- return self.timeout_function.eval(_dict_to_list(param_dict))
- return self.timeout
+ return self.timeout.eval(param_dict)
def get_params_after_transition(self, param_dict: dict, args: list = []) -> dict:
"""
@@ -328,20 +380,13 @@ class Transition:
'argument_combination': self.argument_combination,
'arg_to_param_map': self.arg_to_param_map,
'set_param': self.set_param,
- 'duration': _attribute_to_json(self.duration, self.duration_function),
- 'energy': _attribute_to_json(self.energy, self.energy_function),
- 'timeout': _attribute_to_json(self.timeout, self.timeout_function),
+ 'duration': self.duration.to_json(),
+ 'energy': self.energy.to_json(),
+ 'timeout': self.timeout.to_json()
}
return ret
-def _json_function_to_analytic_function(base, attribute: str, parameters: list):
- if attribute in base and 'function' in base[attribute]:
- base = base[attribute]['function']
- return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args'])
- return None
-
-
def _json_get_static(base, attribute: str):
if attribute in base:
return base[attribute]['static']
@@ -463,12 +508,8 @@ class PTA:
kwargs[key] = json_input[key]
pta = cls(**kwargs)
for name, state in json_input['state'].items():
- power_function = _json_function_to_analytic_function(state, 'power', pta.parameters)
- pta.add_state(name, power=_json_get_static(state, 'power'), power_function=power_function)
+ pta.add_state(name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters))
for transition in json_input['transitions']:
- duration_function = _json_function_to_analytic_function(transition, 'duration', pta.parameters)
- energy_function = _json_function_to_analytic_function(transition, 'energy', pta.parameters)
- timeout_function = _json_function_to_analytic_function(transition, 'timeout', pta.parameters)
kwargs = dict()
for key in ['arguments', 'arg_to_param_map', 'argument_values', 'argument_combination', 'is_interrupt', 'set_param']:
if key in transition:
@@ -479,12 +520,9 @@ class PTA:
for origin in origins:
pta.add_transition(origin, transition['destination'],
transition['name'],
- duration=_json_get_static(transition, 'duration'),
- duration_function=duration_function,
- energy=_json_get_static(transition, 'energy'),
- energy_function=energy_function,
- timeout=_json_get_static(transition, 'timeout'),
- timeout_function=timeout_function,
+ duration=PTAAttribute.from_json_maybe(transition, 'duration', pta.parameters),
+ energy=PTAAttribute.from_json_maybe(transition, 'energy', pta.parameters),
+ timeout=PTAAttribute.from_json_maybe(transition, 'timeout', pta.parameters),
**kwargs)
return pta
@@ -508,7 +546,7 @@ class PTA:
pta = cls(**kwargs)
for name, state in json_input['state'].items():
- pta.add_state(name, power=float(state['power']['static']))
+ pta.add_state(name, power=PTAAttribute(value=float(state['power']['static'])))
for trans_name in sorted(json_input['transition'].keys()):
transition = json_input['transition'][trans_name]
@@ -700,14 +738,14 @@ class PTA:
def get_most_expensive_state(self):
max_state = None
for state in self.state.values():
- if state.name != 'UNINITIALIZED' and (max_state is None or state.power > max_state.power):
+ if state.name != 'UNINITIALIZED' and (max_state is None or state.power.value > max_state.power.value):
max_state = state
return max_state
def get_least_expensive_state(self):
min_state = None
for state in self.state.values():
- if state.name != 'UNINITIALIZED' and (min_state is None or state.power < min_state.power):
+ if state.name != 'UNINITIALIZED' and (min_state is None or state.power.value < min_state.power.value):
min_state = state
return min_state
@@ -724,7 +762,7 @@ class PTA:
raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate')
# convert from µW to W
- max_power = max_power_state.power * 1e-6
+ max_power = max_power_state.power.value * 1e-6
min_duration = max_energy_value * energy_granularity / max_power
return min_duration
@@ -742,7 +780,7 @@ class PTA:
raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate')
# convert from µW to W
- min_power = min_power_state.power * 1e-6
+ min_power = min_power_state.power.value * 1e-6
max_duration = max_energy_value * energy_granularity / min_power
return max_duration
@@ -920,24 +958,24 @@ class PTA:
for state in self.state.values():
if state.name != 'UNINITIALIZED':
try:
- state.power = static_model(state.name, 'power')
+ state.power.value = static_model(state.name, 'power')
if param_model(state.name, 'power'):
- state.power_function = param_model(state.name, 'power')['function']
+ state.power.function = param_model(state.name, 'power')['function']
except KeyError:
print('[W] skipping model update of state {} due to missing data'.format(state.name))
pass
for transition in self.transitions:
try:
- transition.duration = static_model(transition.name, 'duration')
+ transition.duration.value = static_model(transition.name, 'duration')
if param_model(transition.name, 'duration'):
- transition.duration_function = param_model(transition.name, 'duration')['function']
- transition.energy = static_model(transition.name, 'energy')
+ transition.duration.function = param_model(transition.name, 'duration')['function']
+ transition.energy.value = static_model(transition.name, 'energy')
if param_model(transition.name, 'energy'):
- transition.energy_function = param_model(transition.name, 'energy')['function']
+ transition.energy.function = param_model(transition.name, 'energy')['function']
if transition.is_interrupt:
- transition.timeout = static_model(transition.name, 'timeout')
+ transition.timeout.value = static_model(transition.name, 'timeout')
if param_model(transition.name, 'timeout'):
- transition.timeout_function = param_model(transition.name, 'timeout')['function']
+ transition.timeout.function = param_model(transition.name, 'timeout')['function']
except KeyError:
print('[W] skipping model update of transition {} due to missing data'.format(state.name))
pass