summaryrefslogtreecommitdiff
path: root/lib/automata.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/automata.py')
-rwxr-xr-xlib/automata.py705
1 files changed, 475 insertions, 230 deletions
diff --git a/lib/automata.py b/lib/automata.py
index b7668c5..b3318e0 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -28,7 +28,15 @@ class SimulationResult:
:param mean_power: mean power during run in W
"""
- def __init__(self, duration: float, energy: float, end_state, parameters, duration_mae: float = None, energy_mae: float = None):
+ def __init__(
+ self,
+ duration: float,
+ energy: float,
+ end_state,
+ parameters,
+ duration_mae: float = None,
+ energy_mae: float = None,
+ ):
u"""
Create a new SimulationResult.
@@ -77,7 +85,13 @@ class PTAAttribute:
:param function_error: mean absolute error of function (optional)
"""
- def __init__(self, value: float = 0, function: AnalyticFunction = None, value_error=None, function_error=None):
+ def __init__(
+ self,
+ value: float = 0,
+ function: AnalyticFunction = None,
+ value_error=None,
+ function_error=None,
+ ):
self.value = value
self.function = function
self.value_error = value_error
@@ -85,8 +99,10 @@ class PTAAttribute:
def __repr__(self):
if self.function is not None:
- return 'PTAATtribute<{:.0f}, {}>'.format(self.value, self.function._model_str)
- return 'PTAATtribute<{:.0f}, None>'.format(self.value)
+ return "PTAATtribute<{:.0f}, {}>".format(
+ self.value, self.function._model_str
+ )
+ return "PTAATtribute<{:.0f}, None>".format(self.value)
def eval(self, param_dict=dict(), args=list()):
"""
@@ -108,33 +124,38 @@ class PTAAttribute:
"""
param_list = _dict_to_list(param_dict)
if self.function and self.function.is_predictable(param_list):
- return self.function_error['mae']
- return self.value_error['mae']
+ return self.function_error["mae"]
+ return self.value_error["mae"]
def to_json(self):
ret = {
- 'static': self.value,
- 'static_error': self.value_error,
+ "static": self.value,
+ "static_error": self.value_error,
}
if self.function:
- ret['function'] = {
- 'raw': self.function._model_str,
- 'regression_args': list(self.function._regression_args)
+ ret["function"] = {
+ "raw": self.function._model_str,
+ "regression_args": list(self.function._regression_args),
}
- ret['function_error'] = self.function_error
+ ret["function_error"] = self.function_error
return ret
@classmethod
def from_json(cls, json_input: dict, parameters: dict):
ret = cls()
- if 'static' in json_input:
- ret.value = json_input['static']
- if 'static_error' in json_input:
- ret.value_error = json_input['static_error']
- if 'function' in json_input:
- ret.function = AnalyticFunction(json_input['function']['raw'], parameters, 0, regression_args=json_input['function']['regression_args'])
- if 'function_error' in json_input:
- ret.function_error = json_input['function_error']
+ if "static" in json_input:
+ ret.value = json_input["static"]
+ if "static_error" in json_input:
+ ret.value_error = json_input["static_error"]
+ if "function" in json_input:
+ ret.function = AnalyticFunction(
+ json_input["function"]["raw"],
+ parameters,
+ 0,
+ regression_args=json_input["function"]["regression_args"],
+ )
+ if "function_error" in json_input:
+ ret.function_error = json_input["function_error"]
return ret
@classmethod
@@ -145,16 +166,23 @@ class PTAAttribute:
def _json_function_to_analytic_function(base, attribute: str, parameters: list):
- if attribute in base and 'function' in base[attribute]:
- base = base[attribute]['function']
- return AnalyticFunction(base['raw'], parameters, 0, regression_args=base['regression_args'])
+ if attribute in base and "function" in base[attribute]:
+ base = base[attribute]["function"]
+ return AnalyticFunction(
+ base["raw"], parameters, 0, regression_args=base["regression_args"]
+ )
return None
class State:
"""A single PTA state."""
- def __init__(self, name: str, power: PTAAttribute = PTAAttribute(), power_function: AnalyticFunction = None):
+ def __init__(
+ self,
+ name: str,
+ power: PTAAttribute = PTAAttribute(),
+ power_function: AnalyticFunction = None,
+ ):
u"""
Create a new PTA state.
@@ -173,10 +201,10 @@ class State:
if type(power_function) is AnalyticFunction:
self.power.function = power_function
else:
- raise ValueError('power_function must be None or AnalyticFunction')
+ raise ValueError("power_function must be None or AnalyticFunction")
def __repr__(self):
- return 'State<{:s}, {}>'.format(self.name, self.power)
+ return "State<{:s}, {}>".format(self.name, self.power)
def add_outgoing_transition(self, new_transition: object):
"""Add a new outgoing transition."""
@@ -206,7 +234,11 @@ class State:
try:
return self.outgoing_transitions[transition_name]
except KeyError:
- raise ValueError('State {} has no outgoing transition called {}'.format(self.name, transition_name)) from None
+ raise ValueError(
+ "State {} has no outgoing transition called {}".format(
+ self.name, transition_name
+ )
+ ) from None
def has_interrupt_transitions(self) -> bool:
"""Return whether this state has any outgoing interrupt transitions."""
@@ -224,7 +256,9 @@ class State:
:param parameters: current parameter values
:returns: Transition object
"""
- interrupts = filter(lambda x: x.is_interrupt, self.outgoing_transitions.values())
+ interrupts = filter(
+ lambda x: x.is_interrupt, self.outgoing_transitions.values()
+ )
interrupts = sorted(interrupts, key=lambda x: x.get_timeout(parameters))
return interrupts[0]
@@ -246,15 +280,32 @@ class State:
# TODO parametergewahrer Trace-Filter, z.B. "setHeaterDuration nur wenn bme680 power mode => FORCED und GAS_ENABLED"
# A '$' entry in trace_filter indicates that the trace should (successfully) terminate here regardless of `depth`.
- if trace_filter is not None and next(filter(lambda x: x == '$', map(lambda x: x[0], trace_filter)), None) is not None:
+ if (
+ trace_filter is not None
+ and next(
+ filter(lambda x: x == "$", map(lambda x: x[0], trace_filter)), None
+ )
+ is not None
+ ):
yield []
# there may be other entries in trace_filter that still yield results.
if depth == 0:
for trans in self.outgoing_transitions.values():
- if trace_filter is not None and len(list(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)))) == 0:
+ if (
+ trace_filter is not None
+ and len(
+ list(
+ filter(
+ lambda x: x == trans.name,
+ map(lambda x: x[0], trace_filter),
+ )
+ )
+ )
+ == 0
+ ):
continue
if with_arguments:
- if trans.argument_combination == 'cartesian':
+ if trans.argument_combination == "cartesian":
for args in itertools.product(*trans.argument_values):
if sleep:
yield [(None, sleep), (trans, args)]
@@ -273,18 +324,35 @@ class State:
yield [(trans,)]
else:
for trans in self.outgoing_transitions.values():
- if trace_filter is not None and next(filter(lambda x: x == trans.name, map(lambda x: x[0], trace_filter)), None) is None:
+ if (
+ trace_filter is not None
+ and next(
+ filter(
+ lambda x: x == trans.name, map(lambda x: x[0], trace_filter)
+ ),
+ None,
+ )
+ is None
+ ):
continue
if trace_filter is not None:
- new_trace_filter = map(lambda x: x[1:], filter(lambda x: x[0] == trans.name, trace_filter))
+ new_trace_filter = map(
+ lambda x: x[1:],
+ filter(lambda x: x[0] == trans.name, trace_filter),
+ )
new_trace_filter = list(filter(len, new_trace_filter))
if len(new_trace_filter) == 0:
new_trace_filter = None
else:
new_trace_filter = None
- for suffix in trans.destination.dfs(depth - 1, with_arguments=with_arguments, trace_filter=new_trace_filter, sleep=sleep):
+ for suffix in trans.destination.dfs(
+ depth - 1,
+ with_arguments=with_arguments,
+ trace_filter=new_trace_filter,
+ sleep=sleep,
+ ):
if with_arguments:
- if trans.argument_combination == 'cartesian':
+ if trans.argument_combination == "cartesian":
for args in itertools.product(*trans.argument_values):
if sleep:
new_suffix = [(None, sleep), (trans, args)]
@@ -314,10 +382,7 @@ class State:
def to_json(self) -> dict:
"""Return JSON encoding of this state object."""
- ret = {
- 'name': self.name,
- 'power': self.power.to_json()
- }
+ ret = {"name": self.name, "power": self.power.to_json()}
return ret
@@ -345,19 +410,27 @@ class Transition:
:param codegen: todo
"""
- def __init__(self, orig_state: State, dest_state: State, name: str,
- energy: PTAAttribute = PTAAttribute(), energy_function: AnalyticFunction = None,
- duration: PTAAttribute = PTAAttribute(), duration_function: AnalyticFunction = None,
- timeout: PTAAttribute = PTAAttribute(), timeout_function: AnalyticFunction = None,
- is_interrupt: bool = False,
- arguments: list = [],
- argument_values: list = [],
- argument_combination: str = 'cartesian', # or 'zip'
- param_update_function=None,
- arg_to_param_map: dict = None,
- set_param=None,
- return_value_handlers: list = [],
- codegen=dict()):
+ def __init__(
+ self,
+ orig_state: State,
+ dest_state: State,
+ name: str,
+ energy: PTAAttribute = PTAAttribute(),
+ energy_function: AnalyticFunction = None,
+ duration: PTAAttribute = PTAAttribute(),
+ duration_function: AnalyticFunction = None,
+ timeout: PTAAttribute = PTAAttribute(),
+ timeout_function: AnalyticFunction = None,
+ is_interrupt: bool = False,
+ arguments: list = [],
+ argument_values: list = [],
+ argument_combination: str = "cartesian", # or 'zip'
+ param_update_function=None,
+ arg_to_param_map: dict = None,
+ set_param=None,
+ return_value_handlers: list = [],
+ codegen=dict(),
+ ):
"""
Create a new transition between two PTA states.
@@ -400,8 +473,8 @@ class Transition:
self.timeout.function = timeout_function
for handler in self.return_value_handlers:
- if 'formula' in handler:
- handler['formula'] = NormalizationFunction(handler['formula'])
+ if "formula" in handler:
+ handler["formula"] = NormalizationFunction(handler["formula"])
def get_duration(self, param_dict: dict = {}, args: list = []) -> float:
u"""
@@ -465,25 +538,25 @@ class Transition:
def to_json(self) -> dict:
"""Return JSON encoding of this transition object."""
ret = {
- 'name': self.name,
- 'origin': self.origin.name,
- 'destination': self.destination.name,
- 'is_interrupt': self.is_interrupt,
- 'arguments': self.arguments,
- 'argument_values': self.argument_values,
- 'argument_combination': self.argument_combination,
- 'arg_to_param_map': self.arg_to_param_map,
- 'set_param': self.set_param,
- 'duration': self.duration.to_json(),
- 'energy': self.energy.to_json(),
- 'timeout': self.timeout.to_json()
+ "name": self.name,
+ "origin": self.origin.name,
+ "destination": self.destination.name,
+ "is_interrupt": self.is_interrupt,
+ "arguments": self.arguments,
+ "argument_values": self.argument_values,
+ "argument_combination": self.argument_combination,
+ "arg_to_param_map": self.arg_to_param_map,
+ "set_param": self.set_param,
+ "duration": self.duration.to_json(),
+ "energy": self.energy.to_json(),
+ "timeout": self.timeout.to_json(),
}
return ret
def _json_get_static(base, attribute: str):
if attribute in base:
- return base[attribute]['static']
+ return base[attribute]["static"]
return 0
@@ -505,10 +578,15 @@ class PTA:
:param transitions: list of `Transition` objects
"""
- def __init__(self, state_names: list = [],
- accepting_states: list = None,
- parameters: list = [], initial_param_values: list = None,
- codegen: dict = {}, parameter_normalization: dict = None):
+ def __init__(
+ self,
+ state_names: list = [],
+ accepting_states: list = None,
+ parameters: list = [],
+ initial_param_values: list = None,
+ codegen: dict = {},
+ parameter_normalization: dict = None,
+ ):
"""
Return a new PTA object.
@@ -524,7 +602,9 @@ class PTA:
`enum`: maps enum descriptors (keys) to parameter values. Note that the mapping is not required to correspond to the driver API.
`formula`: maps an argument or return value (passed as `param`) to a parameter value. Must be a string describing a valid python lambda function. NumPy is available as `np`.
"""
- self.state = dict([[state_name, State(state_name)] for state_name in state_names])
+ self.state = dict(
+ [[state_name, State(state_name)] for state_name in state_names]
+ )
self.accepting_states = accepting_states.copy() if accepting_states else None
self.parameters = parameters.copy()
self.parameter_normalization = parameter_normalization
@@ -535,13 +615,15 @@ class PTA:
self.initial_param_values = [None for x in self.parameters]
self.transitions = []
- if 'UNINITIALIZED' not in state_names:
- self.state['UNINITIALIZED'] = State('UNINITIALIZED')
+ if "UNINITIALIZED" not in state_names:
+ self.state["UNINITIALIZED"] = State("UNINITIALIZED")
if self.parameter_normalization:
for normalization_spec in self.parameter_normalization.values():
- if 'formula' in normalization_spec:
- normalization_spec['formula'] = NormalizationFunction(normalization_spec['formula'])
+ if "formula" in normalization_spec:
+ normalization_spec["formula"] = NormalizationFunction(
+ normalization_spec["formula"]
+ )
def normalize_parameter(self, parameter_name: str, parameter_value) -> float:
"""
@@ -553,11 +635,23 @@ class PTA:
:param parameter_name: parameter name.
:param parameter_value: parameter value.
"""
- if parameter_value is not None and self.parameter_normalization is not None and parameter_name in self.parameter_normalization:
- if 'enum' in self.parameter_normalization[parameter_name] and parameter_value in self.parameter_normalization[parameter_name]['enum']:
- return self.parameter_normalization[parameter_name]['enum'][parameter_value]
- if 'formula' in self.parameter_normalization[parameter_name]:
- normalization_formula = self.parameter_normalization[parameter_name]['formula']
+ if (
+ parameter_value is not None
+ and self.parameter_normalization is not None
+ and parameter_name in self.parameter_normalization
+ ):
+ if (
+ "enum" in self.parameter_normalization[parameter_name]
+ and parameter_value
+ in self.parameter_normalization[parameter_name]["enum"]
+ ):
+ return self.parameter_normalization[parameter_name]["enum"][
+ parameter_value
+ ]
+ if "formula" in self.parameter_normalization[parameter_name]:
+ normalization_formula = self.parameter_normalization[parameter_name][
+ "formula"
+ ]
return normalization_formula.eval(parameter_value)
return parameter_value
@@ -580,8 +674,8 @@ class PTA:
@classmethod
def from_file(cls, model_file: str):
"""Return PTA loaded from the provided JSON or YAML file."""
- with open(model_file, 'r') as f:
- if '.json' in model_file:
+ with open(model_file, "r") as f:
+ if ".json" in model_file:
return cls.from_json(json.load(f))
else:
return cls.from_yaml(yaml.safe_load(f))
@@ -593,36 +687,58 @@ class PTA:
Compatible with the to_json method.
"""
- if 'transition' in json_input:
+ if "transition" in json_input:
return cls.from_legacy_json(json_input)
kwargs = dict()
- for key in ('state_names', 'parameters', 'initial_param_values', 'accepting_states'):
+ for key in (
+ "state_names",
+ "parameters",
+ "initial_param_values",
+ "accepting_states",
+ ):
if key in json_input:
kwargs[key] = json_input[key]
pta = cls(**kwargs)
- for name, state in json_input['state'].items():
- pta.add_state(name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters))
- for transition in json_input['transitions']:
+ for name, state in json_input["state"].items():
+ pta.add_state(
+ name, power=PTAAttribute.from_json_maybe(state, "power", pta.parameters)
+ )
+ for transition in json_input["transitions"]:
kwargs = dict()
- for key in ['arguments', 'argument_values', 'argument_combination', 'is_interrupt', 'set_param']:
+ for key in [
+ "arguments",
+ "argument_values",
+ "argument_combination",
+ "is_interrupt",
+ "set_param",
+ ]:
if key in transition:
kwargs[key] = transition[key]
# arg_to_param_map uses integer indices. This is not supported by JSON
- if 'arg_to_param_map' in transition:
- kwargs['arg_to_param_map'] = dict()
- for arg_index, param_name in transition['arg_to_param_map'].items():
- kwargs['arg_to_param_map'][int(arg_index)] = param_name
- origins = transition['origin']
+ if "arg_to_param_map" in transition:
+ kwargs["arg_to_param_map"] = dict()
+ for arg_index, param_name in transition["arg_to_param_map"].items():
+ kwargs["arg_to_param_map"][int(arg_index)] = param_name
+ origins = transition["origin"]
if type(origins) != list:
origins = [origins]
for origin in origins:
- pta.add_transition(origin, transition['destination'],
- transition['name'],
- duration=PTAAttribute.from_json_maybe(transition, 'duration', pta.parameters),
- energy=PTAAttribute.from_json_maybe(transition, 'energy', pta.parameters),
- timeout=PTAAttribute.from_json_maybe(transition, 'timeout', pta.parameters),
- **kwargs)
+ pta.add_transition(
+ origin,
+ transition["destination"],
+ transition["name"],
+ duration=PTAAttribute.from_json_maybe(
+ transition, "duration", pta.parameters
+ ),
+ energy=PTAAttribute.from_json_maybe(
+ transition, "energy", pta.parameters
+ ),
+ timeout=PTAAttribute.from_json_maybe(
+ transition, "timeout", pta.parameters
+ ),
+ **kwargs
+ )
return pta
@@ -634,37 +750,45 @@ class PTA:
Compatible with the legacy dfatool/perl format.
"""
kwargs = {
- 'parameters': list(),
- 'initial_param_values': list(),
+ "parameters": list(),
+ "initial_param_values": list(),
}
- for param in sorted(json_input['parameter'].keys()):
- kwargs['parameters'].append(param)
- kwargs['initial_param_values'].append(json_input['parameter'][param]['default'])
+ for param in sorted(json_input["parameter"].keys()):
+ kwargs["parameters"].append(param)
+ kwargs["initial_param_values"].append(
+ json_input["parameter"][param]["default"]
+ )
pta = cls(**kwargs)
- for name, state in json_input['state'].items():
- pta.add_state(name, power=PTAAttribute(value=float(state['power']['static'])))
+ for name, state in json_input["state"].items():
+ pta.add_state(
+ name, power=PTAAttribute(value=float(state["power"]["static"]))
+ )
- for trans_name in sorted(json_input['transition'].keys()):
- transition = json_input['transition'][trans_name]
- destination = transition['destination']
+ for trans_name in sorted(json_input["transition"].keys()):
+ transition = json_input["transition"][trans_name]
+ destination = transition["destination"]
arguments = list()
argument_values = list()
is_interrupt = False
- if transition['level'] == 'epilogue':
+ if transition["level"] == "epilogue":
is_interrupt = True
if type(destination) == list:
destination = destination[0]
- for arg in transition['parameters']:
- arguments.append(arg['name'])
- argument_values.append(arg['values'])
- for origin in transition['origins']:
- pta.add_transition(origin, destination, trans_name,
- arguments=arguments,
- argument_values=argument_values,
- is_interrupt=is_interrupt)
+ for arg in transition["parameters"]:
+ arguments.append(arg["name"])
+ argument_values.append(arg["values"])
+ for origin in transition["origins"]:
+ pta.add_transition(
+ origin,
+ destination,
+ trans_name,
+ arguments=arguments,
+ argument_values=argument_values,
+ is_interrupt=is_interrupt,
+ )
return pta
@@ -674,68 +798,79 @@ class PTA:
kwargs = dict()
- if 'parameters' in yaml_input:
- kwargs['parameters'] = yaml_input['parameters']
+ if "parameters" in yaml_input:
+ kwargs["parameters"] = yaml_input["parameters"]
- if 'initial_param_values' in yaml_input:
- kwargs['initial_param_values'] = yaml_input['initial_param_values']
+ if "initial_param_values" in yaml_input:
+ kwargs["initial_param_values"] = yaml_input["initial_param_values"]
- if 'states' in yaml_input:
- kwargs['state_names'] = yaml_input['states']
+ if "states" in yaml_input:
+ kwargs["state_names"] = yaml_input["states"]
# else: set to UNINITIALIZED by class constructor
- if 'codegen' in yaml_input:
- kwargs['codegen'] = yaml_input['codegen']
+ if "codegen" in yaml_input:
+ kwargs["codegen"] = yaml_input["codegen"]
- if 'parameter_normalization' in yaml_input:
- kwargs['parameter_normalization'] = yaml_input['parameter_normalization']
+ if "parameter_normalization" in yaml_input:
+ kwargs["parameter_normalization"] = yaml_input["parameter_normalization"]
pta = cls(**kwargs)
- if 'state' in yaml_input:
- for state_name, state in yaml_input['state'].items():
- pta.add_state(state_name, power=PTAAttribute.from_json_maybe(state, 'power', pta.parameters))
+ if "state" in yaml_input:
+ for state_name, state in yaml_input["state"].items():
+ pta.add_state(
+ state_name,
+ power=PTAAttribute.from_json_maybe(state, "power", pta.parameters),
+ )
- for trans_name in sorted(yaml_input['transition'].keys()):
+ for trans_name in sorted(yaml_input["transition"].keys()):
kwargs = dict()
- transition = yaml_input['transition'][trans_name]
+ transition = yaml_input["transition"][trans_name]
arguments = list()
argument_values = list()
arg_to_param_map = dict()
- if 'arguments' in transition:
- for i, argument in enumerate(transition['arguments']):
- arguments.append(argument['name'])
- argument_values.append(argument['values'])
- if 'parameter' in argument:
- arg_to_param_map[i] = argument['parameter']
- if 'argument_combination' in transition:
- kwargs['argument_combination'] = transition['argument_combination']
- if 'set_param' in transition:
- kwargs['set_param'] = transition['set_param']
- if 'is_interrupt' in transition:
- kwargs['is_interrupt'] = transition['is_interrupt']
- if 'return_value' in transition:
- kwargs['return_value_handlers'] = transition['return_value']
- if 'codegen' in transition:
- kwargs['codegen'] = transition['codegen']
- if 'loop' in transition:
- for state_name in transition['loop']:
- pta.add_transition(state_name, state_name, trans_name,
- arguments=arguments,
- argument_values=argument_values,
- arg_to_param_map=arg_to_param_map,
- **kwargs)
+ if "arguments" in transition:
+ for i, argument in enumerate(transition["arguments"]):
+ arguments.append(argument["name"])
+ argument_values.append(argument["values"])
+ if "parameter" in argument:
+ arg_to_param_map[i] = argument["parameter"]
+ if "argument_combination" in transition:
+ kwargs["argument_combination"] = transition["argument_combination"]
+ if "set_param" in transition:
+ kwargs["set_param"] = transition["set_param"]
+ if "is_interrupt" in transition:
+ kwargs["is_interrupt"] = transition["is_interrupt"]
+ if "return_value" in transition:
+ kwargs["return_value_handlers"] = transition["return_value"]
+ if "codegen" in transition:
+ kwargs["codegen"] = transition["codegen"]
+ if "loop" in transition:
+ for state_name in transition["loop"]:
+ pta.add_transition(
+ state_name,
+ state_name,
+ trans_name,
+ arguments=arguments,
+ argument_values=argument_values,
+ arg_to_param_map=arg_to_param_map,
+ **kwargs
+ )
else:
- if 'src' not in transition:
- transition['src'] = ['UNINITIALIZED']
- if 'dst' not in transition:
- transition['dst'] = 'UNINITIALIZED'
- for origin in transition['src']:
- pta.add_transition(origin, transition['dst'], trans_name,
- arguments=arguments,
- argument_values=argument_values,
- arg_to_param_map=arg_to_param_map,
- **kwargs)
+ if "src" not in transition:
+ transition["src"] = ["UNINITIALIZED"]
+ if "dst" not in transition:
+ transition["dst"] = "UNINITIALIZED"
+ for origin in transition["src"]:
+ pta.add_transition(
+ origin,
+ transition["dst"],
+ trans_name,
+ arguments=arguments,
+ argument_values=argument_values,
+ arg_to_param_map=arg_to_param_map,
+ **kwargs
+ )
return pta
@@ -746,11 +881,13 @@ class PTA:
Compatible with the from_json method.
"""
ret = {
- 'parameters': self.parameters,
- 'initial_param_values': self.initial_param_values,
- 'state': dict([[state.name, state.to_json()] for state in self.state.values()]),
- 'transitions': [trans.to_json() for trans in self.transitions],
- 'accepting_states': self.accepting_states,
+ "parameters": self.parameters,
+ "initial_param_values": self.initial_param_values,
+ "state": dict(
+ [[state.name, state.to_json()] for state in self.state.values()]
+ ),
+ "transitions": [trans.to_json() for trans in self.transitions],
+ "accepting_states": self.accepting_states,
}
return ret
@@ -760,12 +897,19 @@ class PTA:
See the State() documentation for acceptable arguments.
"""
- if 'power_function' in kwargs and type(kwargs['power_function']) != AnalyticFunction and kwargs['power_function'] is not None:
- kwargs['power_function'] = AnalyticFunction(kwargs['power_function'],
- self.parameters, 0)
+ if (
+ "power_function" in kwargs
+ and type(kwargs["power_function"]) != AnalyticFunction
+ and kwargs["power_function"] is not None
+ ):
+ kwargs["power_function"] = AnalyticFunction(
+ kwargs["power_function"], self.parameters, 0
+ )
self.state[state_name] = State(state_name, **kwargs)
- def add_transition(self, orig_state: str, dest_state: str, function_name: str, **kwargs):
+ def add_transition(
+ self, orig_state: str, dest_state: str, function_name: str, **kwargs
+ ):
"""
Add function_name as new transition from orig_state to dest_state.
@@ -776,8 +920,12 @@ class PTA:
"""
orig_state = self.state[orig_state]
dest_state = self.state[dest_state]
- for key in ('duration_function', 'energy_function', 'timeout_function'):
- if key in kwargs and kwargs[key] is not None and type(kwargs[key]) != AnalyticFunction:
+ for key in ("duration_function", "energy_function", "timeout_function"):
+ if (
+ key in kwargs
+ and kwargs[key] is not None
+ and type(kwargs[key]) != AnalyticFunction
+ ):
kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0)
new_transition = Transition(orig_state, dest_state, function_name, **kwargs)
@@ -824,7 +972,12 @@ class PTA:
return self.get_unique_transitions().index(transition)
def get_initial_param_dict(self):
- return dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))])
+ return dict(
+ [
+ [self.parameters[i], self.initial_param_values[i]]
+ for i in range(len(self.parameters))
+ ]
+ )
def set_random_energy_model(self, static_model=True):
u"""
@@ -841,18 +994,24 @@ class PTA:
def get_most_expensive_state(self):
max_state = None
for state in self.state.values():
- if state.name != 'UNINITIALIZED' and (max_state is None or state.power.value > max_state.power.value):
+ if state.name != "UNINITIALIZED" and (
+ max_state is None or state.power.value > max_state.power.value
+ ):
max_state = state
return max_state
def get_least_expensive_state(self):
min_state = None
for state in self.state.values():
- if state.name != 'UNINITIALIZED' and (min_state is None or state.power.value < min_state.power.value):
+ if state.name != "UNINITIALIZED" and (
+ min_state is None or state.power.value < min_state.power.value
+ ):
min_state = state
return min_state
- def min_duration_until_energy_overflow(self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1):
+ def min_duration_until_energy_overflow(
+ self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1
+ ):
"""
Return minimum duration (in s) until energy counter overflow during online accounting.
@@ -862,7 +1021,9 @@ class PTA:
max_power_state = self.get_most_expensive_state()
if max_power_state.has_interrupt_transitions():
- raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate')
+ raise RuntimeWarning(
+ "state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate"
+ )
# convert from µW to W
max_power = max_power_state.power.value * 1e-6
@@ -870,7 +1031,9 @@ class PTA:
min_duration = max_energy_value * energy_granularity / max_power
return min_duration
- def max_duration_until_energy_overflow(self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1):
+ def max_duration_until_energy_overflow(
+ self, energy_granularity=1e-12, max_energy_value=2 ** 32 - 1
+ ):
"""
Return maximum duration (in s) until energy counter overflow during online accounting.
@@ -880,7 +1043,9 @@ class PTA:
min_power_state = self.get_least_expensive_state()
if min_power_state.has_interrupt_transitions():
- raise RuntimeWarning('state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate')
+ raise RuntimeWarning(
+ "state with maximum power consumption has outgoing interrupt transitions, results will be inaccurate"
+ )
# convert from µW to W
min_power = min_power_state.power.value * 1e-6
@@ -904,14 +1069,19 @@ class PTA:
for i, argument in enumerate(transition.arguments):
if len(transition.argument_values[i]) <= 2:
continue
- if transition.argument_combination == 'zip':
+ if transition.argument_combination == "zip":
continue
values_are_numeric = True
for value in transition.argument_values[i]:
- if not is_numeric(self.normalize_parameter(transition.arg_to_param_map[i], value)):
+ if not is_numeric(
+ self.normalize_parameter(transition.arg_to_param_map[i], value)
+ ):
values_are_numeric = False
if values_are_numeric and len(transition.argument_values[i]) > 2:
- transition.argument_values[i] = [transition.argument_values[i][0], transition.argument_values[i][-1]]
+ transition.argument_values[i] = [
+ transition.argument_values[i][0],
+ transition.argument_values[i][-1],
+ ]
def _dfs_with_param(self, generator, param_dict):
for trace in generator:
@@ -921,13 +1091,23 @@ class PTA:
transition, arguments = elem
if transition is not None:
param = transition.get_params_after_transition(param, arguments)
- ret.append((transition, arguments, self.normalize_parameters(param)))
+ ret.append(
+ (transition, arguments, self.normalize_parameters(param))
+ )
else:
# parameters have already been normalized
ret.append((transition, arguments, param))
yield ret
- def bfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, transition_filter=None, state_filter=None):
+ def bfs(
+ self,
+ depth: int = 10,
+ orig_state: str = "UNINITIALIZED",
+ param_dict: dict = None,
+ with_parameters: bool = False,
+ transition_filter=None,
+ state_filter=None,
+ ):
"""
Return a generator object for breadth-first search of traces starting at orig_state.
@@ -968,7 +1148,14 @@ class PTA:
yield new_trace
state_queue.put((new_trace, transition.destination))
- def dfs(self, depth: int = 10, orig_state: str = 'UNINITIALIZED', param_dict: dict = None, with_parameters: bool = False, **kwargs):
+ def dfs(
+ self,
+ depth: int = 10,
+ orig_state: str = "UNINITIALIZED",
+ param_dict: dict = None,
+ with_parameters: bool = False,
+ **kwargs
+ ):
"""
Return a generator object for depth-first search starting at orig_state.
@@ -994,12 +1181,14 @@ class PTA:
if with_parameters and not param_dict:
param_dict = self.get_initial_param_dict()
- if with_parameters and 'with_arguments' not in kwargs:
+ if with_parameters and "with_arguments" not in kwargs:
raise ValueError("with_parameters = True requires with_arguments = True")
if self.accepting_states:
- generator = filter(lambda x: x[-1][0].destination.name in self.accepting_states,
- self.state[orig_state].dfs(depth, **kwargs))
+ generator = filter(
+ lambda x: x[-1][0].destination.name in self.accepting_states,
+ self.state[orig_state].dfs(depth, **kwargs),
+ )
else:
generator = self.state[orig_state].dfs(depth, **kwargs)
@@ -1008,7 +1197,13 @@ class PTA:
else:
return generator
- def simulate(self, trace: list, orig_state: str = 'UNINITIALIZED', orig_param=None, accounting=None):
+ def simulate(
+ self,
+ trace: list,
+ orig_state: str = "UNINITIALIZED",
+ orig_param=None,
+ accounting=None,
+ ):
u"""
Simulate a single run through the PTA and return total energy, duration, final state, and resulting parameters.
@@ -1021,10 +1216,10 @@ class PTA:
:returns: SimulationResult with duration in s, total energy in J, end state, and final parameters
"""
- total_duration = 0.
- total_duration_mae = 0.
- total_energy = 0.
- total_energy_error = 0.
+ total_duration = 0.0
+ total_duration_mae = 0.0
+ total_energy = 0.0
+ total_energy_error = 0.0
if type(orig_state) is State:
state = orig_state
else:
@@ -1032,7 +1227,12 @@ class PTA:
if orig_param:
param_dict = orig_param.copy()
else:
- param_dict = dict([[self.parameters[i], self.initial_param_values[i]] for i in range(len(self.parameters))])
+ param_dict = dict(
+ [
+ [self.parameters[i], self.initial_param_values[i]]
+ for i in range(len(self.parameters))
+ ]
+ )
for function in trace:
if isinstance(function[0], Transition):
function_name = function[0].name
@@ -1040,11 +1240,13 @@ class PTA:
else:
function_name = function[0]
function_args = function[1:]
- if function_name is None or function_name == '_':
+ if function_name is None or function_name == "_":
duration = function_args[0]
total_energy += state.get_energy(duration, param_dict)
if state.power.value_error is not None:
- total_energy_error += (duration * state.power.eval_mae(param_dict, function_args))**2
+ total_energy_error += (
+ duration * state.power.eval_mae(param_dict, function_args)
+ ) ** 2
total_duration += duration
# assumption: sleep is near-exact and does not contribute to the duration error
if accounting is not None:
@@ -1053,15 +1255,21 @@ class PTA:
transition = state.get_transition(function_name)
total_duration += transition.duration.eval(param_dict, function_args)
if transition.duration.value_error is not None:
- total_duration_mae += transition.duration.eval_mae(param_dict, function_args)**2
+ total_duration_mae += (
+ transition.duration.eval_mae(param_dict, function_args) ** 2
+ )
total_energy += transition.get_energy(param_dict, function_args)
if transition.energy.value_error is not None:
- total_energy_error += transition.energy.eval_mae(param_dict, function_args)**2
- param_dict = transition.get_params_after_transition(param_dict, function_args)
+ total_energy_error += (
+ transition.energy.eval_mae(param_dict, function_args) ** 2
+ )
+ param_dict = transition.get_params_after_transition(
+ param_dict, function_args
+ )
state = transition.destination
if accounting is not None:
accounting.pass_transition(transition)
- while (state.has_interrupt_transitions()):
+ while state.has_interrupt_transitions():
transition = state.get_next_interrupt(param_dict)
duration = transition.get_timeout(param_dict)
total_duration += duration
@@ -1072,45 +1280,82 @@ class PTA:
param_dict = transition.get_params_after_transition(param_dict)
state = transition.destination
- return SimulationResult(total_duration, total_energy, state, param_dict, duration_mae=np.sqrt(total_duration_mae), energy_mae=np.sqrt(total_energy_error))
+ return SimulationResult(
+ total_duration,
+ total_energy,
+ state,
+ param_dict,
+ duration_mae=np.sqrt(total_duration_mae),
+ energy_mae=np.sqrt(total_energy_error),
+ )
def update(self, static_model, param_model, static_error=None, analytic_error=None):
for state in self.state.values():
- if state.name != 'UNINITIALIZED':
+ if state.name != "UNINITIALIZED":
try:
- state.power.value = static_model(state.name, 'power')
+ state.power.value = static_model(state.name, "power")
if static_error is not None:
- state.power.value_error = static_error[state.name]['power']
- if param_model(state.name, 'power'):
- state.power.function = param_model(state.name, 'power')['function']
+ state.power.value_error = static_error[state.name]["power"]
+ if param_model(state.name, "power"):
+ state.power.function = param_model(state.name, "power")[
+ "function"
+ ]
if analytic_error is not None:
- state.power.function_error = analytic_error[state.name]['power']
+ state.power.function_error = analytic_error[state.name][
+ "power"
+ ]
except KeyError:
- print('[W] skipping model update of state {} due to missing data'.format(state.name))
+ print(
+ "[W] skipping model update of state {} due to missing data".format(
+ state.name
+ )
+ )
pass
for transition in self.transitions:
try:
- transition.duration.value = static_model(transition.name, 'duration')
- if param_model(transition.name, 'duration'):
- transition.duration.function = param_model(transition.name, 'duration')['function']
+ transition.duration.value = static_model(transition.name, "duration")
+ if param_model(transition.name, "duration"):
+ transition.duration.function = param_model(
+ transition.name, "duration"
+ )["function"]
if analytic_error is not None:
- transition.duration.function_error = analytic_error[transition.name]['duration']
- transition.energy.value = static_model(transition.name, 'energy')
- if param_model(transition.name, 'energy'):
- transition.energy.function = param_model(transition.name, 'energy')['function']
+ transition.duration.function_error = analytic_error[
+ transition.name
+ ]["duration"]
+ transition.energy.value = static_model(transition.name, "energy")
+ if param_model(transition.name, "energy"):
+ transition.energy.function = param_model(transition.name, "energy")[
+ "function"
+ ]
if analytic_error is not None:
- transition.energy.function_error = analytic_error[transition.name]['energy']
+ transition.energy.function_error = analytic_error[
+ transition.name
+ ]["energy"]
if transition.is_interrupt:
- transition.timeout.value = static_model(transition.name, 'timeout')
- if param_model(transition.name, 'timeout'):
- transition.timeout.function = param_model(transition.name, 'timeout')['function']
+ transition.timeout.value = static_model(transition.name, "timeout")
+ if param_model(transition.name, "timeout"):
+ transition.timeout.function = param_model(
+ transition.name, "timeout"
+ )["function"]
if analytic_error is not None:
- transition.timeout.function_error = analytic_error[transition.name]['timeout']
+ transition.timeout.function_error = analytic_error[
+ transition.name
+ ]["timeout"]
if static_error is not None:
- transition.duration.value_error = static_error[transition.name]['duration']
- transition.energy.value_error = static_error[transition.name]['energy']
- transition.timeout.value_error = static_error[transition.name]['timeout']
+ transition.duration.value_error = static_error[transition.name][
+ "duration"
+ ]
+ transition.energy.value_error = static_error[transition.name][
+ "energy"
+ ]
+ transition.timeout.value_error = static_error[transition.name][
+ "timeout"
+ ]
except KeyError:
- print('[W] skipping model update of transition {} due to missing data'.format(transition.name))
+ print(
+ "[W] skipping model update of transition {} due to missing data".format(
+ transition.name
+ )
+ )
pass