summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2021-03-04 13:32:36 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2021-03-04 13:32:36 +0100
commit9bf7d10f3310147c7e85330a79da655b9f7a5bad (patch)
tree00283ccf137cd3a87e1d5d08869717a8ffddd4cc
parentf33c69dcaf24ecc7e039dec83a4a5c74908da52f (diff)
PTA State/Transition: Use ModelFunction instead of PTAAttribute
-rwxr-xr-xlib/automata.py196
-rw-r--r--lib/codegen.py4
-rw-r--r--lib/functions.py82
-rw-r--r--lib/modular_arithmetic.py3
-rw-r--r--lib/parameters.py22
-rwxr-xr-xlib/protocol_benchmarks.py34
-rw-r--r--lib/runner.py11
-rw-r--r--lib/size_to_radio_energy.py28
-rwxr-xr-xtest/test_codegen.py57
-rwxr-xr-xtest/test_pta.py253
10 files changed, 288 insertions, 402 deletions
diff --git a/lib/automata.py b/lib/automata.py
index 1e47596..b1e5623 100755
--- a/lib/automata.py
+++ b/lib/automata.py
@@ -1,7 +1,12 @@
#!/usr/bin/env python3
"""Classes and helper functions for PTA and other automata."""
-from .functions import AnalyticFunction, NormalizationFunction
+from .functions import (
+ AnalyticFunction,
+ NormalizationFunction,
+ ModelFunction,
+ StaticFunction,
+)
from .parameters import ModelAttribute
from .utils import is_numeric
import itertools
@@ -14,7 +19,7 @@ import yaml
logger = logging.getLogger(__name__)
-def _dict_to_list(input_dict: dict) -> list:
+def dict_to_list(input_dict: dict) -> list:
return [input_dict[x] for x in sorted(input_dict.keys())]
@@ -74,122 +79,15 @@ class SimulationResult:
self.mean_power = 0
-class PTAAttribute:
- u"""
- A single PTA attribute (e.g. power, duration).
-
- A PTA attribute can be described by a static value and an analytic
- function (depending on parameters and function arguments).
-
- It is not specified how value_error and function_error are determined --
- at the moment, they do not use cross validation.
-
- :param value: static value, typically in µW/µs/pJ
- :param value_error: mean absolute error of value (optional)
- :param function: AnalyticFunction for parameter-aware prediction (optional)
- :param function_error: mean absolute error of function (optional)
- """
-
- def __init__(
- self,
- value: float = 0,
- function: AnalyticFunction = None,
- value_error=None,
- function_error=None,
- ):
- self.value = value
- self.function = function
- self.value_error = value_error
- self.function_error = function_error
-
- def __repr__(self):
- if self.function is not None:
- return "PTAATtribute<{:.0f}, {}>".format(
- self.value, self.function.model_function
- )
- return "PTAATtribute<{:.0f}, None>".format(self.value)
-
- def eval(self, param_dict=dict(), args=list()):
- """
- Return attribute for given `param_dict` and `args` value.
-
- Uses `function` if set and usable for the given `param_dict` and
- `value` otherwise.
- """
- param_list = _dict_to_list(param_dict)
- if self.function and self.function.is_predictable(param_list):
- return self.function.eval(param_list, args)
- return self.value
-
- def eval_mae(self, param_dict=dict(), args=list()):
- """
- Return attribute mean absolute error for given `param_dict` and `args` value.
-
- Uses `function_error` if `function` is set and usable for the given `param_dict` and `value_error` otherwise.
- """
- param_list = _dict_to_list(param_dict)
- if self.function and self.function.is_predictable(param_list):
- return self.function_error["mae"]
- return self.value_error["mae"]
-
- def to_json(self):
- ret = {"static": self.value, "static_error": self.value_error}
- if self.function:
- ret["function"] = {
- "raw": self.function.model_function,
- "regression_args": list(self.function.model_args),
- }
- ret["function_error"] = self.function_error
- return ret
-
- @classmethod
- def from_json(cls, json_input: dict, parameters: dict):
- ret = cls()
- if "static" in json_input:
- ret.value = json_input["static"]
- if "static_error" in json_input:
- ret.value_error = json_input["static_error"]
- if "function" in json_input:
- ret.function = AnalyticFunction(
- json_input["function"]["raw"],
- parameters,
- 0,
- regression_args=json_input["function"]["regression_args"],
- )
- if "function_error" in json_input:
- ret.function_error = json_input["function_error"]
- return ret
-
- @classmethod
- def from_json_maybe(cls, json_wrapped: dict, attribute: str, parameters: dict):
- if type(json_wrapped) is dict and attribute in json_wrapped:
- return cls.from_json(json_wrapped[attribute], parameters)
- return cls()
-
-
-def _json_function_to_analytic_function(base, attribute: str, parameters: list):
- if attribute in base and "function" in base[attribute]:
- base = base[attribute]["function"]
- return AnalyticFunction(
- base["raw"], parameters, 0, regression_args=base["regression_args"]
- )
- return None
-
-
class State:
"""A single PTA state."""
- def __init__(
- self,
- name: str,
- power: PTAAttribute = PTAAttribute(),
- power_function: AnalyticFunction = None,
- ):
+ def __init__(self, name: str, power: ModelFunction = StaticFunction(0)):
u"""
Create a new PTA state.
:param name: state name
- :param power: state power PTAAttribute in µW, default static 0 / parameterized None
+ :param power: state power ModelFunction in µW, default static StaticFunction(0)
:param power_function: Legacy support
"""
self.name = name
@@ -197,13 +95,7 @@ class State:
self.outgoing_transitions = {}
if type(self.power) is float or type(self.power) is int:
- self.power = PTAAttribute(self.power)
-
- if power_function is not None:
- if type(power_function) is AnalyticFunction:
- self.power.function = power_function
- else:
- raise ValueError("power_function must be None or AnalyticFunction")
+ self.power = StaticFunction(self.power)
def __repr__(self):
return "State<{:s}, {}>".format(self.name, self.power)
@@ -220,7 +112,7 @@ class State:
:param param_dict: current parameters
:returns: energy spent in pJ
"""
- return self.power.eval(param_dict) * duration
+ return self.power.eval(dict_to_list(param_dict)) * duration
def set_random_energy_model(self, static_model=True):
u"""Set a random static state power between 0 µW and 50 mW."""
@@ -417,12 +309,9 @@ class Transition:
orig_state: State,
dest_state: State,
name: str,
- energy: PTAAttribute = PTAAttribute(),
- energy_function: AnalyticFunction = None,
- duration: PTAAttribute = PTAAttribute(),
- duration_function: AnalyticFunction = None,
- timeout: PTAAttribute = PTAAttribute(),
- timeout_function: AnalyticFunction = None,
+ energy: ModelFunction = StaticFunction(0),
+ duration: ModelFunction = StaticFunction(0),
+ timeout: ModelFunction = StaticFunction(0),
is_interrupt: bool = False,
arguments: list = [],
argument_values: list = [],
@@ -457,22 +346,13 @@ class Transition:
self.codegen = codegen
if type(self.energy) is float or type(self.energy) is int:
- self.energy = PTAAttribute(self.energy)
- if energy_function is not None:
- if type(energy_function) is AnalyticFunction:
- self.energy.function = energy_function
+ self.energy = StaticFunction(self.energy)
if type(self.duration) is float or type(self.duration) is int:
- self.duration = PTAAttribute(self.duration)
- if duration_function is not None:
- if type(duration_function) is AnalyticFunction:
- self.duration.function = duration_function
+ self.duration = StaticFunction(self.duration)
if type(self.timeout) is float or type(self.timeout) is int:
- self.timeout = PTAAttribute(self.timeout)
- if timeout_function is not None:
- if type(timeout_function) is AnalyticFunction:
- self.timeout.function = timeout_function
+ self.timeout = StaticFunction(self.timeout)
for handler in self.return_value_handlers:
if "formula" in handler:
@@ -487,7 +367,7 @@ class Transition:
:returns: transition duration in µs
"""
- return self.duration.eval(param_dict, args)
+ return self.duration.eval(dict_to_list(param_dict) + args)
def get_energy(self, param_dict: dict = {}, args: list = []) -> float:
u"""
@@ -496,15 +376,14 @@ class Transition:
:param param_dict: current parameter values
:param args: function arguments
"""
- return self.energy.eval(param_dict, args)
+ return self.energy.eval(dict_to_list(param_dict) + args)
def set_random_energy_model(self, static_model=True):
self.energy.value = int(np.random.sample() * 50000)
self.duration.value = int(np.random.sample() * 50000)
- if self.is_interrupt:
- self.timeout.value = int(np.random.sample() * 50000)
+ self.timeout.value = int(np.random.sample() * 50000)
- def get_timeout(self, param_dict: dict = {}) -> float:
+ def get_timeout(self, param_dict: dict = {}, args: list = list()) -> float:
u"""
Return transition timeout in µs.
@@ -513,7 +392,7 @@ class Transition:
:param param_dict: current parameter values
:param args: function arguments
"""
- return self.timeout.eval(param_dict)
+ return self.timeout.eval(dict_to_list(param_dict) + args)
def get_params_after_transition(self, param_dict: dict, args: list = []) -> dict:
"""
@@ -703,9 +582,7 @@ class PTA:
kwargs[key] = json_input[key]
pta = cls(**kwargs)
for name, state in json_input["state"].items():
- pta.add_state(
- name, power=PTAAttribute.from_json_maybe(state, "power", pta.parameters)
- )
+ pta.add_state(name, power=ModelFunction.from_json_maybe(state, "power"))
for transition in json_input["transitions"]:
kwargs = dict()
for key in [
@@ -730,15 +607,9 @@ class PTA:
origin,
transition["destination"],
transition["name"],
- duration=PTAAttribute.from_json_maybe(
- transition, "duration", pta.parameters
- ),
- energy=PTAAttribute.from_json_maybe(
- transition, "energy", pta.parameters
- ),
- timeout=PTAAttribute.from_json_maybe(
- transition, "timeout", pta.parameters
- ),
+ duration=ModelFunction.from_json_maybe(transition, "duration"),
+ energy=ModelFunction.from_json_maybe(transition, "energy"),
+ timeout=ModelFunction.from_json_maybe(transition, "timeout"),
**kwargs
)
@@ -762,9 +633,7 @@ class PTA:
pta = cls(**kwargs)
for name, state in json_input["state"].items():
- pta.add_state(
- name, power=PTAAttribute(value=float(state["power"]["static"]))
- )
+ pta.add_state(name, power=StaticFunction(float(state["power"]["static"])))
for trans_name in sorted(json_input["transition"].keys()):
transition = json_input["transition"][trans_name]
@@ -818,8 +687,7 @@ class PTA:
if "state" in yaml_input:
for state_name, state in yaml_input["state"].items():
pta.add_state(
- state_name,
- power=PTAAttribute.from_json_maybe(state, "power", pta.parameters),
+ state_name, power=ModelFunction.from_json_maybe(state, "power")
)
for trans_name in sorted(yaml_input["transition"].keys()):
@@ -902,7 +770,7 @@ class PTA:
and kwargs["power_function"] is not None
):
kwargs["power_function"] = AnalyticFunction(
- kwargs["power_function"], self.parameters, 0
+ None, kwargs["power_function"], self.parameters, 0
)
self.state[state_name] = State(state_name, **kwargs)
@@ -925,7 +793,7 @@ class PTA:
and kwargs[key] is not None
and type(kwargs[key]) != AnalyticFunction
):
- kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0)
+ kwargs[key] = AnalyticFunction(None, kwargs[key], self.parameters, 0)
new_transition = Transition(orig_state, dest_state, function_name, **kwargs)
self.transitions.append(new_transition)
@@ -1252,7 +1120,9 @@ class PTA:
accounting.sleep(duration)
else:
transition = state.get_transition(function_name)
- total_duration += transition.duration.eval(param_dict, function_args)
+ total_duration += transition.duration.eval(
+ dict_to_list(param_dict) + function_args
+ )
if transition.duration.value_error is not None:
total_duration_mae += (
transition.duration.eval_mae(param_dict, function_args) ** 2
diff --git a/lib/codegen.py b/lib/codegen.py
index 62776fd..d224a01 100644
--- a/lib/codegen.py
+++ b/lib/codegen.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
"""Code generators for multipass dummy drivers for online model evaluation."""
from .automata import PTA, Transition
@@ -227,11 +228,8 @@ class SimulatedStaticAccountingImmediateCalculation(SimulatedAccountingMethod):
def sleep(self, duration_us):
time = self._sleep_duration(duration_us)
- print("sleep duration is {}".format(time))
power = int(self.current_state.power.value)
- print("power is {}".format(power))
energy = self._energy_from_power_and_time(time, power)
- print("energy is {}".format(energy))
self.energy += energy
def pass_transition(self, transition: Transition):
diff --git a/lib/functions.py b/lib/functions.py
index 663b65e..7950b5a 100644
--- a/lib/functions.py
+++ b/lib/functions.py
@@ -154,8 +154,15 @@ class NormalizationFunction:
class ModelFunction:
- def __init__(self):
- pass
+ def __init__(self, value):
+ # a model always has a static (median/mean) value. For StaticFunction, it's the only data point.
+ # For more complex models, it's usede both as fallback in case the model cannot predict the current
+ # parameter combination, and for use cases requiring static models
+ self.value = value
+
+ # Legacy(?) attributes for PTA
+ self.value_error = None
+ self.function_error = None
def is_predictable(self, param_list):
raise NotImplementedError
@@ -163,11 +170,28 @@ class ModelFunction:
def eval(self, param_list, arg_list):
raise NotImplementedError
+ def to_json(self):
+ raise NotImplementedError
+
+ @classmethod
+ def from_json(cls, data):
+ if data["type"] == "static":
+ return StaticFunction.from_json(data)
+ if data["type"] == "split":
+ return SplitFunction.from_json(data)
+ if data["type"] == "analytic":
+ return AnalyticFunction.from_json(data)
+ raise ValueError("Unknown ModelFunction type: " + data["type"])
+
+ @classmethod
+ def from_json_maybe(cls, json_wrapped: dict, attribute: str):
+ # Legacy Code for PTA / tests. Do not use.
+ if type(json_wrapped) is dict and attribute in json_wrapped:
+ return cls.from_json(json_wrapped[attribute])
+ return StaticFunction(0)
-class StaticFunction(ModelFunction):
- def __init__(self, value):
- self.value = value
+class StaticFunction(ModelFunction):
def is_predictable(self, param_list=None):
"""
Return whether the model function can be evaluated on the given parameter values.
@@ -188,9 +212,18 @@ class StaticFunction(ModelFunction):
def to_json(self):
return {"type": "static", "value": self.value}
+ @classmethod
+ def from_json(cls, data):
+ assert data["type"] == "static"
+ return cls(data["value"])
+
+ def __repr__(self):
+ return f"StaticFunction({self.value})"
+
class SplitFunction(ModelFunction):
- def __init__(self, param_index, child):
+ def __init__(self, value, param_index, child):
+ super().__init__(value)
self.param_index = param_index
self.child = child
@@ -216,10 +249,22 @@ class SplitFunction(ModelFunction):
def to_json(self):
return {
"type": "split",
+ "value": self.value,
"paramIndex": self.param_index,
"child": dict([[k, v.to_json()] for k, v in self.child.items()]),
}
+ @classmethod
+ def from_json(cls, data):
+ assert data["type"] == "split"
+ self = cls(data["value"], data["paramIndex"], dict())
+
+ for k, v in data["child"].items():
+ self.child[k] = ModelFunction.from_json(v)
+
+ def __repr__(self):
+ return f"SplitFunction<{self.value}, param_index={self.param_index}>"
+
class AnalyticFunction(ModelFunction):
"""
@@ -232,9 +277,10 @@ class AnalyticFunction(ModelFunction):
def __init__(
self,
+ value,
function_str,
parameters,
- num_args,
+ num_args=0,
regression_args=None,
fit_by_param=None,
):
@@ -256,6 +302,7 @@ class AnalyticFunction(ModelFunction):
both for function usage and least squares optimization.
If unset, defaults to [1, 1, 1, ...]
"""
+ super().__init__(value)
self._parameter_names = parameters
self._num_args = num_args
self.model_function = function_str
@@ -416,11 +463,28 @@ class AnalyticFunction(ModelFunction):
def to_json(self):
return {
"type": "analytic",
+ "value": self.value,
"functionStr": self.model_function,
- "dependsOnParam": self._dependson,
+ "argCount": self._num_args,
+ "parameterNames": self._parameter_names,
"regressionModel": list(self.model_args),
}
+ @classmethod
+ def from_json(cls, data):
+ assert data["type"] == "analytic"
+
+ return cls(
+ data["value"],
+ data["functionStr"],
+ data["parameterNames"],
+ data["argCount"],
+ data["regressionModel"],
+ )
+
+ def __repr__(self):
+ return f"AnalyticFunction<{self.value}, {self.model_function}>"
+
class analytic:
"""
@@ -617,5 +681,5 @@ class analytic:
)
)
return AnalyticFunction(
- buf, parameter_names, num_args, fit_by_param=fit_results
+ None, buf, parameter_names, num_args, fit_by_param=fit_results
)
diff --git a/lib/modular_arithmetic.py b/lib/modular_arithmetic.py
index c5ed1aa..0a3eaab 100644
--- a/lib/modular_arithmetic.py
+++ b/lib/modular_arithmetic.py
@@ -66,7 +66,8 @@ class Mod:
return self # The unary plus operator does nothing.
def __abs__(self):
- return self # The value is always kept non-negative, so the abs function should do nothing.
+ # The value is always kept non-negative, so the abs function should do nothing.
+ return self
# Helper functions to build common operands based on a template.
diff --git a/lib/parameters.py b/lib/parameters.py
index 1cad7a5..aedb6cd 100644
--- a/lib/parameters.py
+++ b/lib/parameters.py
@@ -607,6 +607,17 @@ class ModelAttribute:
}
return ret
+ @staticmethod
+ def from_json(cls, name, attr, data):
+ param_names = data["paramNames"]
+ arg_count = data["argCount"]
+
+ self = cls(name, attr, None, None, param_names, arg_count)
+
+ self.model_function = df.ModelFunction.from_json(data["modelFunction"])
+
+ return self
+
def get_static(self, use_mean=False):
if use_mean:
return self.mean
@@ -782,7 +793,9 @@ class ModelAttribute:
for param_value, child in child_by_param_value.items():
child.set_data_from_paramfit(paramfit, prefix + (param_value,))
function_child[param_value] = child.model_function
- self.model_function = df.SplitFunction(split_param_index, function_child)
+ self.model_function = df.SplitFunction(
+ self.median, split_param_index, function_child
+ )
def set_data_from_paramfit_this(self, paramfit, prefix):
fit_result = paramfit.get_result((self.name, self.attr) + prefix)
@@ -790,7 +803,11 @@ class ModelAttribute:
if self.function_override is not None:
function_str = self.function_override
x = df.AnalyticFunction(
- function_str, self.param_names, self.arg_count, fit_by_param=fit_result
+ self.median,
+ function_str,
+ self.param_names,
+ self.arg_count,
+ fit_by_param=fit_result,
)
x.fit(self.by_param)
if x.fit_success:
@@ -801,6 +818,7 @@ class ModelAttribute:
x = df.analytic.function_powerset(
fit_result, self.param_names, self.arg_count
)
+ x.value = self.median
x.fit(self.by_param)
if x.fit_success:
diff --git a/lib/protocol_benchmarks.py b/lib/protocol_benchmarks.py
index 7f3e2f2..cc47b38 100755
--- a/lib/protocol_benchmarks.py
+++ b/lib/protocol_benchmarks.py
@@ -328,10 +328,8 @@ class ArduinoJSON(DummyProtocol):
child = enc_node + "l"
while child in self.children:
child += "_"
- self.enc_buf += (
- "ArduinoJson::JsonArray& {} = {}.createNestedArray();\n".format(
- child, enc_node
- )
+ self.enc_buf += "ArduinoJson::JsonArray& {} = {}.createNestedArray();\n".format(
+ child, enc_node
)
self.children.add(child)
self.from_json(value, child)
@@ -340,10 +338,8 @@ class ArduinoJSON(DummyProtocol):
child = enc_node + "o"
while child in self.children:
child += "_"
- self.enc_buf += (
- "ArduinoJson::JsonObject& {} = {}.createNestedObject();\n".format(
- child, enc_node
- )
+ self.enc_buf += "ArduinoJson::JsonObject& {} = {}.createNestedObject();\n".format(
+ child, enc_node
)
self.children.add(child)
self.from_json(value, child)
@@ -620,15 +616,11 @@ class CapnProtoC(DummyProtocol):
[len(value)],
)
for i, elem in enumerate(value):
- self.enc_buf += (
- "capn_set{:d}({}.{}, {:d}, capn_from_f{:d}({:f}));\n".format(
- self.float_bits, self.name, key, i, self.float_bits, elem
- )
+ self.enc_buf += "capn_set{:d}({}.{}, {:d}, capn_from_f{:d}({:f}));\n".format(
+ self.float_bits, self.name, key, i, self.float_bits, elem
)
- self.dec_buf += (
- "kout << capn_to_f{:d}(capn_get{:d}({}.{}, {:d}));\n".format(
- self.float_bits, self.float_bits, self.name, key, i
- )
+ self.dec_buf += "kout << capn_to_f{:d}(capn_get{:d}({}.{}, {:d}));\n".format(
+ self.float_bits, self.float_bits, self.name, key, i
)
self.assign_and_kout(
self.float_type,
@@ -1204,10 +1196,8 @@ class NanoPB(DummyProtocol):
self.cc_encoders += (
"if (!pb_encode_tag_for_field(stream, field)) return false;\n"
)
- self.cc_encoders += (
- 'return pb_encode_string(stream, (uint8_t*)"{}", {:d});\n'.format(
- value, len(value)
- )
+ self.cc_encoders += 'return pb_encode_string(stream, (uint8_t*)"{}", {:d});\n'.format(
+ value, len(value)
)
self.cc_encoders += "}\n"
self.enc_buf += "msg.{}{}.funcs.encode = encode_{};\n".format(
@@ -1388,7 +1378,9 @@ class UBJ(DummyProtocol):
ret = 'kout << dec << "dec:";\n'
ret += self.dec_buf
ret += "kout << endl;\n"
- ret += "ubjr_cleanup_dynamic(&dynamic_root);\n" # This causes the data (including all strings) to be free'd
+ ret += (
+ "ubjr_cleanup_dynamic(&dynamic_root);\n"
+ ) # This causes the data (including all strings) to be free'd
ret += "ubjr_close_context(ctx);\n"
return ret
diff --git a/lib/runner.py b/lib/runner.py
index 59f69c4..f718180 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -206,8 +206,7 @@ class EnergyTraceLogicAnalyzerMonitor(EnergyTraceMonitor):
# Initialization of Interfaces
self.sig = SigrokCLIInterface(
- sample_rate=options["sample_rate"],
- fake=options["fake"],
+ sample_rate=options["sample_rate"], fake=options["fake"]
)
# Start Measurements
@@ -482,7 +481,7 @@ class Arch:
match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line)
if match:
cpu_freq = int(match.group(1))
- if cpu_freq is not None and cpu_freq > 8000000:
+ if cpu_freq is not None and cpu_freq > 8_000_000:
max_sleep = 250
else:
max_sleep = 500
@@ -509,5 +508,9 @@ class Arch:
overflow_value = int(match.group(1))
max_overflow = int(match.group(2))
if cpu_freq and overflow_value:
- return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow
+ return (
+ 1_000_000 / cpu_freq,
+ overflow_value * 1_000_000 / cpu_freq,
+ max_overflow,
+ )
raise RuntimeError("Did not find Counter Overflow limits")
diff --git a/lib/size_to_radio_energy.py b/lib/size_to_radio_energy.py
index 10de1a3..7f70144 100644
--- a/lib/size_to_radio_energy.py
+++ b/lib/size_to_radio_energy.py
@@ -40,10 +40,7 @@ class CC1200tx:
"txbytes": [],
"txpower": [10, 20, 30, 40, 47], # dBm = f(txpower)
}
- default_params = {
- "symbolrate": 100,
- "txpower": 47,
- }
+ default_params = {"symbolrate": 100, "txpower": 47}
@staticmethod
def get_energy(params: dict):
@@ -137,10 +134,7 @@ class CC1200rx:
"txbytes": [],
"txpower": [10, 20, 30, 40, 47], # dBm = f(txpower)
}
- default_params = {
- "symbolrate": 100,
- "txpower": 47,
- }
+ default_params = {"symbolrate": 100, "txpower": 47}
@staticmethod
def get_energy(params):
@@ -171,11 +165,7 @@ class NRF24L01rx:
"txpower": [-18, -12, -6, 0], # dBm
"voltage": [1.9, 3.6],
}
- default_params = {
- "datarate": 1000,
- "txpower": -6,
- "voltage": 3,
- }
+ default_params = {"datarate": 1000, "txpower": -6, "voltage": 3}
@staticmethod
def get_energy_per_byte(params):
@@ -202,11 +192,7 @@ class NRF24L01tx:
"txpower": [-18, -12, -6, 0], # dBm
"voltage": [1.9, 3.6],
}
- default_params = {
- "datarate": 1000,
- "txpower": -6,
- "voltage": 3,
- }
+ default_params = {"datarate": 1000, "txpower": -6, "voltage": 3}
# AEMR:
# TX power / energy:
@@ -270,11 +256,7 @@ class NRF24L01dtx:
"txpower": [-18, -12, -6, 0], # dBm
"voltage": [1.9, 3.6],
}
- default_params = {
- "datarate": 1000,
- "txpower": -6,
- "voltage": 3,
- }
+ default_params = {"datarate": 1000, "txpower": -6, "voltage": 3}
# 130 us RX settling: 8.9 mE
# 130 us TX settling: 8 mA
diff --git a/test/test_codegen.py b/test/test_codegen.py
index ce565d6..c957c30 100755
--- a/test/test_codegen.py
+++ b/test/test_codegen.py
@@ -8,15 +8,15 @@ example_json_1 = {
"parameters": ["datarate", "txbytes", "txpower"],
"initial_param_values": [None, None, None],
"state": {
- "IDLE": {"power": {"static": 5,}},
+ "IDLE": {"power": {"type": "static", "value": 5}},
"TX": {
"power": {
- "static": 100,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)"
- " * parameter(txpower)",
- "regression_args": [100, 2],
- },
+ "type": "analytic",
+ "value": 100,
+ "functionStr": "regression_arg(0) + regression_arg(1) * parameter(txpower)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [100, 2],
}
},
},
@@ -25,15 +25,15 @@ example_json_1 = {
"name": "init",
"origin": ["UNINITIALIZED", "IDLE"],
"destination": "IDLE",
- "duration": {"static": 50000,},
+ "duration": {"type": "static", "value": 50000},
"set_param": {"txpower": 10},
},
{
"name": "setTxPower",
"origin": "IDLE",
"destination": "IDLE",
- "duration": {"static": 120},
- "energy ": {"static": 10000},
+ "duration": {"type": "static", "value": 120},
+ "energy ": {"type": "static", "value": 10000},
"arg_to_param_map": {0: "txpower"},
"argument_values": [[10, 20, 30]],
},
@@ -42,18 +42,22 @@ example_json_1 = {
"origin": "IDLE",
"destination": "TX",
"duration": {
- "static": 10,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)",
- "regression_args": [48, 8],
- },
+ "type": "analytic",
+ "value": 10,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * function_arg(1)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [48, 8],
},
"energy": {
- "static": 3,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)",
- "regression_args": [3, 5],
- },
+ "type": "analytic",
+ "value": 3,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * function_arg(1)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [3, 5],
},
"arg_to_param_map": {1: "txbytes"},
"argument_values": [['"foo"', '"hodor"'], [3, 5]],
@@ -65,12 +69,13 @@ example_json_1 = {
"destination": "IDLE",
"is_interrupt": 1,
"timeout": {
- "static": 2000,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)"
- " * parameter(txbytes)",
- "regression_args": [500, 16],
- },
+ "type": "analytic",
+ "value": 2000,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * parameter(txbytes)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [500, 16],
},
},
],
diff --git a/test/test_pta.py b/test/test_pta.py
index d43e702..00fa19a 100755
--- a/test/test_pta.py
+++ b/test/test_pta.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
from dfatool.automata import PTA
+from dfatool.functions import AnalyticFunction
import unittest
import yaml
@@ -8,15 +9,15 @@ example_json_1 = {
"parameters": ["datarate", "txbytes", "txpower"],
"initial_param_values": [None, None, None],
"state": {
- "IDLE": {"power": {"static": 5,}},
+ "IDLE": {"power": {"type": "static", "value": 5}},
"TX": {
"power": {
- "static": 10000,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)"
- " * parameter(txpower)",
- "regression_args": [10000, 2],
- },
+ "type": "analytic",
+ "value": 100,
+ "functionStr": "regression_arg(0) + regression_arg(1) * parameter(txpower)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [10000, 2],
}
},
},
@@ -25,15 +26,15 @@ example_json_1 = {
"name": "init",
"origin": ["UNINITIALIZED", "IDLE"],
"destination": "IDLE",
- "duration": {"static": 50000,},
+ "duration": {"type": "static", "value": 50000},
"set_param": {"txpower": 10},
},
{
"name": "setTxPower",
"origin": "IDLE",
"destination": "IDLE",
- "duration": {"static": 120},
- "energy ": {"static": 10000},
+ "duration": {"type": "static", "value": 120},
+ "energy ": {"type": "static", "value": 10000},
"arg_to_param_map": {0: "txpower"},
"argument_values": [[10, 20, 30]],
},
@@ -42,18 +43,22 @@ example_json_1 = {
"origin": "IDLE",
"destination": "TX",
"duration": {
- "static": 10,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)",
- "regression_args": [48, 8],
- },
+ "type": "analytic",
+ "value": 10,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * function_arg(1)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [48, 8],
},
"energy": {
- "static": 3,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)",
- "regression_args": [3, 5],
- },
+ "type": "analytic",
+ "value": 3,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * function_arg(1)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [3, 5],
},
"arg_to_param_map": {1: "txbytes"},
"argument_values": [['"foo"', '"hodor"'], [3, 5]],
@@ -65,12 +70,13 @@ example_json_1 = {
"destination": "IDLE",
"is_interrupt": 1,
"timeout": {
- "static": 2000,
- "function": {
- "raw": "regression_arg(0) + regression_arg(1)"
- " * parameter(txbytes)",
- "regression_args": [500, 16],
- },
+ "type": "analytic",
+ "value": 2000,
+ "functionStr": "regression_arg(0) + regression_arg(1)"
+ " * parameter(txbytes)",
+ "parameterNames": ["datarate", "txbytes", "txpower"],
+ "argCount": 0,
+ "regressionModel": [500, 16],
},
},
],
@@ -470,36 +476,12 @@ class TestPTA(unittest.TestCase):
def test_from_json_dfs_param(self):
pta = PTA.from_json(example_json_1)
- no_param = {
- "datarate": None,
- "txbytes": None,
- "txpower": 10,
- }
- param_tx3 = {
- "datarate": None,
- "txbytes": 3,
- "txpower": 10,
- }
- param_tx5 = {
- "datarate": None,
- "txbytes": 5,
- "txpower": 10,
- }
- param_txp10 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 10,
- }
- param_txp20 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 20,
- }
- param_txp30 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 30,
- }
+ no_param = {"datarate": None, "txbytes": None, "txpower": 10}
+ param_tx3 = {"datarate": None, "txbytes": 3, "txpower": 10}
+ param_tx5 = {"datarate": None, "txbytes": 5, "txpower": 10}
+ param_txp10 = {"datarate": None, "txbytes": None, "txpower": 10}
+ param_txp20 = {"datarate": None, "txbytes": None, "txpower": 20}
+ param_txp30 = {"datarate": None, "txbytes": None, "txpower": 30}
self.assertEqual(
sorted(
dfs_tran_to_name(
@@ -533,36 +515,12 @@ class TestPTA(unittest.TestCase):
def test_from_yaml_dfs_param(self):
pta = PTA.from_yaml(example_yaml_1)
- no_param = {
- "datarate": None,
- "txbytes": None,
- "txpower": None,
- }
- param_tx3 = {
- "datarate": None,
- "txbytes": 3,
- "txpower": None,
- }
- param_tx5 = {
- "datarate": None,
- "txbytes": 5,
- "txpower": None,
- }
- param_txp10 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 10,
- }
- param_txp20 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 20,
- }
- param_txp30 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 30,
- }
+ no_param = {"datarate": None, "txbytes": None, "txpower": None}
+ param_tx3 = {"datarate": None, "txbytes": 3, "txpower": None}
+ param_tx5 = {"datarate": None, "txbytes": 5, "txpower": None}
+ param_txp10 = {"datarate": None, "txbytes": None, "txpower": 10}
+ param_txp20 = {"datarate": None, "txbytes": None, "txpower": 20}
+ param_txp30 = {"datarate": None, "txbytes": None, "txpower": 30}
self.assertEqual(
sorted(
dfs_tran_to_name(
@@ -581,36 +539,12 @@ class TestPTA(unittest.TestCase):
def test_normalization(self):
pta = PTA.from_yaml(example_yaml_2)
- no_param = {
- "datarate": None,
- "txbytes": None,
- "txpower": None,
- }
- param_tx3 = {
- "datarate": None,
- "txbytes": 3,
- "txpower": None,
- }
- param_tx6 = {
- "datarate": None,
- "txbytes": 6,
- "txpower": None,
- }
- param_txp10 = {
- "datarate": None,
- "txbytes": None,
- "txpower": -6,
- }
- param_txp20 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 4,
- }
- param_txp30 = {
- "datarate": None,
- "txbytes": None,
- "txpower": 14,
- }
+ no_param = {"datarate": None, "txbytes": None, "txpower": None}
+ param_tx3 = {"datarate": None, "txbytes": 3, "txpower": None}
+ param_tx6 = {"datarate": None, "txbytes": 6, "txpower": None}
+ param_txp10 = {"datarate": None, "txbytes": None, "txpower": -6}
+ param_txp20 = {"datarate": None, "txbytes": None, "txpower": 4}
+ param_txp30 = {"datarate": None, "txbytes": None, "txpower": 14}
self.assertEqual(
sorted(
dfs_tran_to_name(
@@ -690,9 +624,7 @@ class TestPTA(unittest.TestCase):
)
pta.add_transition("IDLE", "TX", "send", energy=3, duration=10)
pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True)
- trace = [
- ["init"],
- ]
+ trace = [["init"]]
expected_energy = 500000
expected_duration = 50000
result = pta.simulate(trace)
@@ -765,9 +697,7 @@ class TestPTA(unittest.TestCase):
duration=50000,
set_param={"txpower": 10},
)
- trace = [
- ["init"],
- ]
+ trace = [["init"]]
expected_energy = 500000
expected_duration = 50000
result = pta.simulate(trace)
@@ -795,17 +725,23 @@ class TestPTA(unittest.TestCase):
"IDLE",
"TX",
"send",
- energy=3,
- duration=10,
- energy_function=lambda param, arg: 3 + 5 * arg[1],
- duration_function=lambda param, arg: 48 + 8 * arg[1],
+ energy=AnalyticFunction(
+ 3,
+ "regression_arg(0) + regression_arg(1) * function_arg(1)",
+ ["txpower", "length"],
+ regression_args=[3.0, 5],
+ num_args=2,
+ ),
+ duration=AnalyticFunction(
+ 10,
+ "regression_arg(0) + regression_arg(1) * function_arg(1)",
+ ["txpower", "length"],
+ regression_args=[48, 8],
+ num_args=2,
+ ),
)
pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True)
- trace = [
- ["init"],
- ["setTxPower", 10],
- ["send", "foo", 3],
- ]
+ trace = [["init"], ["setTxPower", 10], ["send", "foo", 3]]
expected_energy = 500000 + 10000 + (3 + 5 * 3) + (2000 * 100)
expected_duration = 50000 + 120 + (48 + 8 * 3) + 2000
result = pta.simulate(trace)
@@ -832,17 +768,23 @@ class TestPTA(unittest.TestCase):
"IDLE",
"TX",
"send",
- energy=3,
- duration=10,
- energy_function=lambda param, arg: 3 + 5 * arg[1],
- duration_function=lambda param, arg: 48 + 8 * arg[1],
+ energy=AnalyticFunction(
+ 3,
+ "regression_arg(0) + regression_arg(1) * function_arg(1)",
+ ["txpower", "length"],
+ regression_args=[3, 5],
+ num_args=2,
+ ),
+ duration=AnalyticFunction(
+ 10,
+ "regression_arg(0) + regression_arg(1) * function_arg(1)",
+ ["txpower", "length"],
+ regression_args=[48, 8],
+ num_args=2,
+ ),
)
pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True)
- trace = [
- ["init"],
- ["setTxPower", 10],
- ["send", "foobar", 6],
- ]
+ trace = [["init"], ["setTxPower", 10], ["send", "foobar", 6]]
expected_energy = 500000 + 10000 + (3 + 5 * 6) + (2000 * 100)
expected_duration = 50000 + 120 + (48 + 8 * 6) + 2000
result = pta.simulate(trace)
@@ -855,7 +797,13 @@ class TestPTA(unittest.TestCase):
pta = PTA(parameters=["length", "txpower"])
pta.add_state("IDLE", power=5)
pta.add_state(
- "TX", power=100, power_function=lambda param, arg: 1000 + 2 * param[1]
+ "TX",
+ power=AnalyticFunction(
+ 100,
+ "regression_arg(0) + regression_arg(1) * parameter(txpower)",
+ ["length", "txpower"],
+ regression_args=[1000, 2],
+ ),
)
pta.add_transition(
"UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000
@@ -872,24 +820,29 @@ class TestPTA(unittest.TestCase):
"IDLE",
"TX",
"send",
- energy=3,
+ energy=AnalyticFunction(
+ 3,
+ "regression_arg(0) + regression_arg(1) * function_arg(1)",
+ ["length", "txpower"],
+ regression_args=[3, 5],
+ num_args=2,
+ ),
duration=10,
- energy_function=lambda param, arg: 3 + 5 * arg[1],
param_update_function=lambda param, arg: {**param, "length": arg[1]},
)
pta.add_transition(
"TX",
"IDLE",
"txComplete",
- timeout=2000,
is_interrupt=True,
- timeout_function=lambda param, arg: 500 + 16 * param[0],
+ timeout=AnalyticFunction(
+ 2000,
+ "regression_arg(0) + regression_arg(1) * parameter(length)",
+ ["length", "txpower"],
+ regression_args=[500, 16],
+ ),
)
- trace = [
- ["init"],
- ["setTxPower", 10],
- ["send", "foo", 3],
- ]
+ trace = [["init"], ["setTxPower", 10], ["send", "foo", 3]]
expected_energy = (
500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3)
)