From 9bf7d10f3310147c7e85330a79da655b9f7a5bad Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Thu, 4 Mar 2021 13:32:36 +0100 Subject: PTA State/Transition: Use ModelFunction instead of PTAAttribute --- lib/automata.py | 196 ++++++---------------------------- lib/codegen.py | 4 +- lib/functions.py | 82 ++++++++++++-- lib/modular_arithmetic.py | 3 +- lib/parameters.py | 22 +++- lib/protocol_benchmarks.py | 34 +++--- lib/runner.py | 11 +- lib/size_to_radio_energy.py | 28 +---- test/test_codegen.py | 57 +++++----- test/test_pta.py | 253 ++++++++++++++++++-------------------------- 10 files changed, 288 insertions(+), 402 deletions(-) diff --git a/lib/automata.py b/lib/automata.py index 1e47596..b1e5623 100755 --- a/lib/automata.py +++ b/lib/automata.py @@ -1,7 +1,12 @@ #!/usr/bin/env python3 """Classes and helper functions for PTA and other automata.""" -from .functions import AnalyticFunction, NormalizationFunction +from .functions import ( + AnalyticFunction, + NormalizationFunction, + ModelFunction, + StaticFunction, +) from .parameters import ModelAttribute from .utils import is_numeric import itertools @@ -14,7 +19,7 @@ import yaml logger = logging.getLogger(__name__) -def _dict_to_list(input_dict: dict) -> list: +def dict_to_list(input_dict: dict) -> list: return [input_dict[x] for x in sorted(input_dict.keys())] @@ -74,122 +79,15 @@ class SimulationResult: self.mean_power = 0 -class PTAAttribute: - u""" - A single PTA attribute (e.g. power, duration). - - A PTA attribute can be described by a static value and an analytic - function (depending on parameters and function arguments). - - It is not specified how value_error and function_error are determined -- - at the moment, they do not use cross validation. - - :param value: static value, typically in µW/µs/pJ - :param value_error: mean absolute error of value (optional) - :param function: AnalyticFunction for parameter-aware prediction (optional) - :param function_error: mean absolute error of function (optional) - """ - - def __init__( - self, - value: float = 0, - function: AnalyticFunction = None, - value_error=None, - function_error=None, - ): - self.value = value - self.function = function - self.value_error = value_error - self.function_error = function_error - - def __repr__(self): - if self.function is not None: - return "PTAATtribute<{:.0f}, {}>".format( - self.value, self.function.model_function - ) - return "PTAATtribute<{:.0f}, None>".format(self.value) - - def eval(self, param_dict=dict(), args=list()): - """ - Return attribute for given `param_dict` and `args` value. - - Uses `function` if set and usable for the given `param_dict` and - `value` otherwise. - """ - param_list = _dict_to_list(param_dict) - if self.function and self.function.is_predictable(param_list): - return self.function.eval(param_list, args) - return self.value - - def eval_mae(self, param_dict=dict(), args=list()): - """ - Return attribute mean absolute error for given `param_dict` and `args` value. - - Uses `function_error` if `function` is set and usable for the given `param_dict` and `value_error` otherwise. - """ - param_list = _dict_to_list(param_dict) - if self.function and self.function.is_predictable(param_list): - return self.function_error["mae"] - return self.value_error["mae"] - - def to_json(self): - ret = {"static": self.value, "static_error": self.value_error} - if self.function: - ret["function"] = { - "raw": self.function.model_function, - "regression_args": list(self.function.model_args), - } - ret["function_error"] = self.function_error - return ret - - @classmethod - def from_json(cls, json_input: dict, parameters: dict): - ret = cls() - if "static" in json_input: - ret.value = json_input["static"] - if "static_error" in json_input: - ret.value_error = json_input["static_error"] - if "function" in json_input: - ret.function = AnalyticFunction( - json_input["function"]["raw"], - parameters, - 0, - regression_args=json_input["function"]["regression_args"], - ) - if "function_error" in json_input: - ret.function_error = json_input["function_error"] - return ret - - @classmethod - def from_json_maybe(cls, json_wrapped: dict, attribute: str, parameters: dict): - if type(json_wrapped) is dict and attribute in json_wrapped: - return cls.from_json(json_wrapped[attribute], parameters) - return cls() - - -def _json_function_to_analytic_function(base, attribute: str, parameters: list): - if attribute in base and "function" in base[attribute]: - base = base[attribute]["function"] - return AnalyticFunction( - base["raw"], parameters, 0, regression_args=base["regression_args"] - ) - return None - - class State: """A single PTA state.""" - def __init__( - self, - name: str, - power: PTAAttribute = PTAAttribute(), - power_function: AnalyticFunction = None, - ): + def __init__(self, name: str, power: ModelFunction = StaticFunction(0)): u""" Create a new PTA state. :param name: state name - :param power: state power PTAAttribute in µW, default static 0 / parameterized None + :param power: state power ModelFunction in µW, default static StaticFunction(0) :param power_function: Legacy support """ self.name = name @@ -197,13 +95,7 @@ class State: self.outgoing_transitions = {} if type(self.power) is float or type(self.power) is int: - self.power = PTAAttribute(self.power) - - if power_function is not None: - if type(power_function) is AnalyticFunction: - self.power.function = power_function - else: - raise ValueError("power_function must be None or AnalyticFunction") + self.power = StaticFunction(self.power) def __repr__(self): return "State<{:s}, {}>".format(self.name, self.power) @@ -220,7 +112,7 @@ class State: :param param_dict: current parameters :returns: energy spent in pJ """ - return self.power.eval(param_dict) * duration + return self.power.eval(dict_to_list(param_dict)) * duration def set_random_energy_model(self, static_model=True): u"""Set a random static state power between 0 µW and 50 mW.""" @@ -417,12 +309,9 @@ class Transition: orig_state: State, dest_state: State, name: str, - energy: PTAAttribute = PTAAttribute(), - energy_function: AnalyticFunction = None, - duration: PTAAttribute = PTAAttribute(), - duration_function: AnalyticFunction = None, - timeout: PTAAttribute = PTAAttribute(), - timeout_function: AnalyticFunction = None, + energy: ModelFunction = StaticFunction(0), + duration: ModelFunction = StaticFunction(0), + timeout: ModelFunction = StaticFunction(0), is_interrupt: bool = False, arguments: list = [], argument_values: list = [], @@ -457,22 +346,13 @@ class Transition: self.codegen = codegen if type(self.energy) is float or type(self.energy) is int: - self.energy = PTAAttribute(self.energy) - if energy_function is not None: - if type(energy_function) is AnalyticFunction: - self.energy.function = energy_function + self.energy = StaticFunction(self.energy) if type(self.duration) is float or type(self.duration) is int: - self.duration = PTAAttribute(self.duration) - if duration_function is not None: - if type(duration_function) is AnalyticFunction: - self.duration.function = duration_function + self.duration = StaticFunction(self.duration) if type(self.timeout) is float or type(self.timeout) is int: - self.timeout = PTAAttribute(self.timeout) - if timeout_function is not None: - if type(timeout_function) is AnalyticFunction: - self.timeout.function = timeout_function + self.timeout = StaticFunction(self.timeout) for handler in self.return_value_handlers: if "formula" in handler: @@ -487,7 +367,7 @@ class Transition: :returns: transition duration in µs """ - return self.duration.eval(param_dict, args) + return self.duration.eval(dict_to_list(param_dict) + args) def get_energy(self, param_dict: dict = {}, args: list = []) -> float: u""" @@ -496,15 +376,14 @@ class Transition: :param param_dict: current parameter values :param args: function arguments """ - return self.energy.eval(param_dict, args) + return self.energy.eval(dict_to_list(param_dict) + args) def set_random_energy_model(self, static_model=True): self.energy.value = int(np.random.sample() * 50000) self.duration.value = int(np.random.sample() * 50000) - if self.is_interrupt: - self.timeout.value = int(np.random.sample() * 50000) + self.timeout.value = int(np.random.sample() * 50000) - def get_timeout(self, param_dict: dict = {}) -> float: + def get_timeout(self, param_dict: dict = {}, args: list = list()) -> float: u""" Return transition timeout in µs. @@ -513,7 +392,7 @@ class Transition: :param param_dict: current parameter values :param args: function arguments """ - return self.timeout.eval(param_dict) + return self.timeout.eval(dict_to_list(param_dict) + args) def get_params_after_transition(self, param_dict: dict, args: list = []) -> dict: """ @@ -703,9 +582,7 @@ class PTA: kwargs[key] = json_input[key] pta = cls(**kwargs) for name, state in json_input["state"].items(): - pta.add_state( - name, power=PTAAttribute.from_json_maybe(state, "power", pta.parameters) - ) + pta.add_state(name, power=ModelFunction.from_json_maybe(state, "power")) for transition in json_input["transitions"]: kwargs = dict() for key in [ @@ -730,15 +607,9 @@ class PTA: origin, transition["destination"], transition["name"], - duration=PTAAttribute.from_json_maybe( - transition, "duration", pta.parameters - ), - energy=PTAAttribute.from_json_maybe( - transition, "energy", pta.parameters - ), - timeout=PTAAttribute.from_json_maybe( - transition, "timeout", pta.parameters - ), + duration=ModelFunction.from_json_maybe(transition, "duration"), + energy=ModelFunction.from_json_maybe(transition, "energy"), + timeout=ModelFunction.from_json_maybe(transition, "timeout"), **kwargs ) @@ -762,9 +633,7 @@ class PTA: pta = cls(**kwargs) for name, state in json_input["state"].items(): - pta.add_state( - name, power=PTAAttribute(value=float(state["power"]["static"])) - ) + pta.add_state(name, power=StaticFunction(float(state["power"]["static"]))) for trans_name in sorted(json_input["transition"].keys()): transition = json_input["transition"][trans_name] @@ -818,8 +687,7 @@ class PTA: if "state" in yaml_input: for state_name, state in yaml_input["state"].items(): pta.add_state( - state_name, - power=PTAAttribute.from_json_maybe(state, "power", pta.parameters), + state_name, power=ModelFunction.from_json_maybe(state, "power") ) for trans_name in sorted(yaml_input["transition"].keys()): @@ -902,7 +770,7 @@ class PTA: and kwargs["power_function"] is not None ): kwargs["power_function"] = AnalyticFunction( - kwargs["power_function"], self.parameters, 0 + None, kwargs["power_function"], self.parameters, 0 ) self.state[state_name] = State(state_name, **kwargs) @@ -925,7 +793,7 @@ class PTA: and kwargs[key] is not None and type(kwargs[key]) != AnalyticFunction ): - kwargs[key] = AnalyticFunction(kwargs[key], self.parameters, 0) + kwargs[key] = AnalyticFunction(None, kwargs[key], self.parameters, 0) new_transition = Transition(orig_state, dest_state, function_name, **kwargs) self.transitions.append(new_transition) @@ -1252,7 +1120,9 @@ class PTA: accounting.sleep(duration) else: transition = state.get_transition(function_name) - total_duration += transition.duration.eval(param_dict, function_args) + total_duration += transition.duration.eval( + dict_to_list(param_dict) + function_args + ) if transition.duration.value_error is not None: total_duration_mae += ( transition.duration.eval_mae(param_dict, function_args) ** 2 diff --git a/lib/codegen.py b/lib/codegen.py index 62776fd..d224a01 100644 --- a/lib/codegen.py +++ b/lib/codegen.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 """Code generators for multipass dummy drivers for online model evaluation.""" from .automata import PTA, Transition @@ -227,11 +228,8 @@ class SimulatedStaticAccountingImmediateCalculation(SimulatedAccountingMethod): def sleep(self, duration_us): time = self._sleep_duration(duration_us) - print("sleep duration is {}".format(time)) power = int(self.current_state.power.value) - print("power is {}".format(power)) energy = self._energy_from_power_and_time(time, power) - print("energy is {}".format(energy)) self.energy += energy def pass_transition(self, transition: Transition): diff --git a/lib/functions.py b/lib/functions.py index 663b65e..7950b5a 100644 --- a/lib/functions.py +++ b/lib/functions.py @@ -154,8 +154,15 @@ class NormalizationFunction: class ModelFunction: - def __init__(self): - pass + def __init__(self, value): + # a model always has a static (median/mean) value. For StaticFunction, it's the only data point. + # For more complex models, it's usede both as fallback in case the model cannot predict the current + # parameter combination, and for use cases requiring static models + self.value = value + + # Legacy(?) attributes for PTA + self.value_error = None + self.function_error = None def is_predictable(self, param_list): raise NotImplementedError @@ -163,11 +170,28 @@ class ModelFunction: def eval(self, param_list, arg_list): raise NotImplementedError + def to_json(self): + raise NotImplementedError + + @classmethod + def from_json(cls, data): + if data["type"] == "static": + return StaticFunction.from_json(data) + if data["type"] == "split": + return SplitFunction.from_json(data) + if data["type"] == "analytic": + return AnalyticFunction.from_json(data) + raise ValueError("Unknown ModelFunction type: " + data["type"]) + + @classmethod + def from_json_maybe(cls, json_wrapped: dict, attribute: str): + # Legacy Code for PTA / tests. Do not use. + if type(json_wrapped) is dict and attribute in json_wrapped: + return cls.from_json(json_wrapped[attribute]) + return StaticFunction(0) -class StaticFunction(ModelFunction): - def __init__(self, value): - self.value = value +class StaticFunction(ModelFunction): def is_predictable(self, param_list=None): """ Return whether the model function can be evaluated on the given parameter values. @@ -188,9 +212,18 @@ class StaticFunction(ModelFunction): def to_json(self): return {"type": "static", "value": self.value} + @classmethod + def from_json(cls, data): + assert data["type"] == "static" + return cls(data["value"]) + + def __repr__(self): + return f"StaticFunction({self.value})" + class SplitFunction(ModelFunction): - def __init__(self, param_index, child): + def __init__(self, value, param_index, child): + super().__init__(value) self.param_index = param_index self.child = child @@ -216,10 +249,22 @@ class SplitFunction(ModelFunction): def to_json(self): return { "type": "split", + "value": self.value, "paramIndex": self.param_index, "child": dict([[k, v.to_json()] for k, v in self.child.items()]), } + @classmethod + def from_json(cls, data): + assert data["type"] == "split" + self = cls(data["value"], data["paramIndex"], dict()) + + for k, v in data["child"].items(): + self.child[k] = ModelFunction.from_json(v) + + def __repr__(self): + return f"SplitFunction<{self.value}, param_index={self.param_index}>" + class AnalyticFunction(ModelFunction): """ @@ -232,9 +277,10 @@ class AnalyticFunction(ModelFunction): def __init__( self, + value, function_str, parameters, - num_args, + num_args=0, regression_args=None, fit_by_param=None, ): @@ -256,6 +302,7 @@ class AnalyticFunction(ModelFunction): both for function usage and least squares optimization. If unset, defaults to [1, 1, 1, ...] """ + super().__init__(value) self._parameter_names = parameters self._num_args = num_args self.model_function = function_str @@ -416,11 +463,28 @@ class AnalyticFunction(ModelFunction): def to_json(self): return { "type": "analytic", + "value": self.value, "functionStr": self.model_function, - "dependsOnParam": self._dependson, + "argCount": self._num_args, + "parameterNames": self._parameter_names, "regressionModel": list(self.model_args), } + @classmethod + def from_json(cls, data): + assert data["type"] == "analytic" + + return cls( + data["value"], + data["functionStr"], + data["parameterNames"], + data["argCount"], + data["regressionModel"], + ) + + def __repr__(self): + return f"AnalyticFunction<{self.value}, {self.model_function}>" + class analytic: """ @@ -617,5 +681,5 @@ class analytic: ) ) return AnalyticFunction( - buf, parameter_names, num_args, fit_by_param=fit_results + None, buf, parameter_names, num_args, fit_by_param=fit_results ) diff --git a/lib/modular_arithmetic.py b/lib/modular_arithmetic.py index c5ed1aa..0a3eaab 100644 --- a/lib/modular_arithmetic.py +++ b/lib/modular_arithmetic.py @@ -66,7 +66,8 @@ class Mod: return self # The unary plus operator does nothing. def __abs__(self): - return self # The value is always kept non-negative, so the abs function should do nothing. + # The value is always kept non-negative, so the abs function should do nothing. + return self # Helper functions to build common operands based on a template. diff --git a/lib/parameters.py b/lib/parameters.py index 1cad7a5..aedb6cd 100644 --- a/lib/parameters.py +++ b/lib/parameters.py @@ -607,6 +607,17 @@ class ModelAttribute: } return ret + @staticmethod + def from_json(cls, name, attr, data): + param_names = data["paramNames"] + arg_count = data["argCount"] + + self = cls(name, attr, None, None, param_names, arg_count) + + self.model_function = df.ModelFunction.from_json(data["modelFunction"]) + + return self + def get_static(self, use_mean=False): if use_mean: return self.mean @@ -782,7 +793,9 @@ class ModelAttribute: for param_value, child in child_by_param_value.items(): child.set_data_from_paramfit(paramfit, prefix + (param_value,)) function_child[param_value] = child.model_function - self.model_function = df.SplitFunction(split_param_index, function_child) + self.model_function = df.SplitFunction( + self.median, split_param_index, function_child + ) def set_data_from_paramfit_this(self, paramfit, prefix): fit_result = paramfit.get_result((self.name, self.attr) + prefix) @@ -790,7 +803,11 @@ class ModelAttribute: if self.function_override is not None: function_str = self.function_override x = df.AnalyticFunction( - function_str, self.param_names, self.arg_count, fit_by_param=fit_result + self.median, + function_str, + self.param_names, + self.arg_count, + fit_by_param=fit_result, ) x.fit(self.by_param) if x.fit_success: @@ -801,6 +818,7 @@ class ModelAttribute: x = df.analytic.function_powerset( fit_result, self.param_names, self.arg_count ) + x.value = self.median x.fit(self.by_param) if x.fit_success: diff --git a/lib/protocol_benchmarks.py b/lib/protocol_benchmarks.py index 7f3e2f2..cc47b38 100755 --- a/lib/protocol_benchmarks.py +++ b/lib/protocol_benchmarks.py @@ -328,10 +328,8 @@ class ArduinoJSON(DummyProtocol): child = enc_node + "l" while child in self.children: child += "_" - self.enc_buf += ( - "ArduinoJson::JsonArray& {} = {}.createNestedArray();\n".format( - child, enc_node - ) + self.enc_buf += "ArduinoJson::JsonArray& {} = {}.createNestedArray();\n".format( + child, enc_node ) self.children.add(child) self.from_json(value, child) @@ -340,10 +338,8 @@ class ArduinoJSON(DummyProtocol): child = enc_node + "o" while child in self.children: child += "_" - self.enc_buf += ( - "ArduinoJson::JsonObject& {} = {}.createNestedObject();\n".format( - child, enc_node - ) + self.enc_buf += "ArduinoJson::JsonObject& {} = {}.createNestedObject();\n".format( + child, enc_node ) self.children.add(child) self.from_json(value, child) @@ -620,15 +616,11 @@ class CapnProtoC(DummyProtocol): [len(value)], ) for i, elem in enumerate(value): - self.enc_buf += ( - "capn_set{:d}({}.{}, {:d}, capn_from_f{:d}({:f}));\n".format( - self.float_bits, self.name, key, i, self.float_bits, elem - ) + self.enc_buf += "capn_set{:d}({}.{}, {:d}, capn_from_f{:d}({:f}));\n".format( + self.float_bits, self.name, key, i, self.float_bits, elem ) - self.dec_buf += ( - "kout << capn_to_f{:d}(capn_get{:d}({}.{}, {:d}));\n".format( - self.float_bits, self.float_bits, self.name, key, i - ) + self.dec_buf += "kout << capn_to_f{:d}(capn_get{:d}({}.{}, {:d}));\n".format( + self.float_bits, self.float_bits, self.name, key, i ) self.assign_and_kout( self.float_type, @@ -1204,10 +1196,8 @@ class NanoPB(DummyProtocol): self.cc_encoders += ( "if (!pb_encode_tag_for_field(stream, field)) return false;\n" ) - self.cc_encoders += ( - 'return pb_encode_string(stream, (uint8_t*)"{}", {:d});\n'.format( - value, len(value) - ) + self.cc_encoders += 'return pb_encode_string(stream, (uint8_t*)"{}", {:d});\n'.format( + value, len(value) ) self.cc_encoders += "}\n" self.enc_buf += "msg.{}{}.funcs.encode = encode_{};\n".format( @@ -1388,7 +1378,9 @@ class UBJ(DummyProtocol): ret = 'kout << dec << "dec:";\n' ret += self.dec_buf ret += "kout << endl;\n" - ret += "ubjr_cleanup_dynamic(&dynamic_root);\n" # This causes the data (including all strings) to be free'd + ret += ( + "ubjr_cleanup_dynamic(&dynamic_root);\n" + ) # This causes the data (including all strings) to be free'd ret += "ubjr_close_context(ctx);\n" return ret diff --git a/lib/runner.py b/lib/runner.py index 59f69c4..f718180 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -206,8 +206,7 @@ class EnergyTraceLogicAnalyzerMonitor(EnergyTraceMonitor): # Initialization of Interfaces self.sig = SigrokCLIInterface( - sample_rate=options["sample_rate"], - fake=options["fake"], + sample_rate=options["sample_rate"], fake=options["fake"] ) # Start Measurements @@ -482,7 +481,7 @@ class Arch: match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line) if match: cpu_freq = int(match.group(1)) - if cpu_freq is not None and cpu_freq > 8000000: + if cpu_freq is not None and cpu_freq > 8_000_000: max_sleep = 250 else: max_sleep = 500 @@ -509,5 +508,9 @@ class Arch: overflow_value = int(match.group(1)) max_overflow = int(match.group(2)) if cpu_freq and overflow_value: - return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow + return ( + 1_000_000 / cpu_freq, + overflow_value * 1_000_000 / cpu_freq, + max_overflow, + ) raise RuntimeError("Did not find Counter Overflow limits") diff --git a/lib/size_to_radio_energy.py b/lib/size_to_radio_energy.py index 10de1a3..7f70144 100644 --- a/lib/size_to_radio_energy.py +++ b/lib/size_to_radio_energy.py @@ -40,10 +40,7 @@ class CC1200tx: "txbytes": [], "txpower": [10, 20, 30, 40, 47], # dBm = f(txpower) } - default_params = { - "symbolrate": 100, - "txpower": 47, - } + default_params = {"symbolrate": 100, "txpower": 47} @staticmethod def get_energy(params: dict): @@ -137,10 +134,7 @@ class CC1200rx: "txbytes": [], "txpower": [10, 20, 30, 40, 47], # dBm = f(txpower) } - default_params = { - "symbolrate": 100, - "txpower": 47, - } + default_params = {"symbolrate": 100, "txpower": 47} @staticmethod def get_energy(params): @@ -171,11 +165,7 @@ class NRF24L01rx: "txpower": [-18, -12, -6, 0], # dBm "voltage": [1.9, 3.6], } - default_params = { - "datarate": 1000, - "txpower": -6, - "voltage": 3, - } + default_params = {"datarate": 1000, "txpower": -6, "voltage": 3} @staticmethod def get_energy_per_byte(params): @@ -202,11 +192,7 @@ class NRF24L01tx: "txpower": [-18, -12, -6, 0], # dBm "voltage": [1.9, 3.6], } - default_params = { - "datarate": 1000, - "txpower": -6, - "voltage": 3, - } + default_params = {"datarate": 1000, "txpower": -6, "voltage": 3} # AEMR: # TX power / energy: @@ -270,11 +256,7 @@ class NRF24L01dtx: "txpower": [-18, -12, -6, 0], # dBm "voltage": [1.9, 3.6], } - default_params = { - "datarate": 1000, - "txpower": -6, - "voltage": 3, - } + default_params = {"datarate": 1000, "txpower": -6, "voltage": 3} # 130 us RX settling: 8.9 mE # 130 us TX settling: 8 mA diff --git a/test/test_codegen.py b/test/test_codegen.py index ce565d6..c957c30 100755 --- a/test/test_codegen.py +++ b/test/test_codegen.py @@ -8,15 +8,15 @@ example_json_1 = { "parameters": ["datarate", "txbytes", "txpower"], "initial_param_values": [None, None, None], "state": { - "IDLE": {"power": {"static": 5,}}, + "IDLE": {"power": {"type": "static", "value": 5}}, "TX": { "power": { - "static": 100, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" - " * parameter(txpower)", - "regression_args": [100, 2], - }, + "type": "analytic", + "value": 100, + "functionStr": "regression_arg(0) + regression_arg(1) * parameter(txpower)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [100, 2], } }, }, @@ -25,15 +25,15 @@ example_json_1 = { "name": "init", "origin": ["UNINITIALIZED", "IDLE"], "destination": "IDLE", - "duration": {"static": 50000,}, + "duration": {"type": "static", "value": 50000}, "set_param": {"txpower": 10}, }, { "name": "setTxPower", "origin": "IDLE", "destination": "IDLE", - "duration": {"static": 120}, - "energy ": {"static": 10000}, + "duration": {"type": "static", "value": 120}, + "energy ": {"type": "static", "value": 10000}, "arg_to_param_map": {0: "txpower"}, "argument_values": [[10, 20, 30]], }, @@ -42,18 +42,22 @@ example_json_1 = { "origin": "IDLE", "destination": "TX", "duration": { - "static": 10, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", - "regression_args": [48, 8], - }, + "type": "analytic", + "value": 10, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * function_arg(1)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [48, 8], }, "energy": { - "static": 3, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", - "regression_args": [3, 5], - }, + "type": "analytic", + "value": 3, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * function_arg(1)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [3, 5], }, "arg_to_param_map": {1: "txbytes"}, "argument_values": [['"foo"', '"hodor"'], [3, 5]], @@ -65,12 +69,13 @@ example_json_1 = { "destination": "IDLE", "is_interrupt": 1, "timeout": { - "static": 2000, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" - " * parameter(txbytes)", - "regression_args": [500, 16], - }, + "type": "analytic", + "value": 2000, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * parameter(txbytes)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [500, 16], }, }, ], diff --git a/test/test_pta.py b/test/test_pta.py index d43e702..00fa19a 100755 --- a/test/test_pta.py +++ b/test/test_pta.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from dfatool.automata import PTA +from dfatool.functions import AnalyticFunction import unittest import yaml @@ -8,15 +9,15 @@ example_json_1 = { "parameters": ["datarate", "txbytes", "txpower"], "initial_param_values": [None, None, None], "state": { - "IDLE": {"power": {"static": 5,}}, + "IDLE": {"power": {"type": "static", "value": 5}}, "TX": { "power": { - "static": 10000, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" - " * parameter(txpower)", - "regression_args": [10000, 2], - }, + "type": "analytic", + "value": 100, + "functionStr": "regression_arg(0) + regression_arg(1) * parameter(txpower)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [10000, 2], } }, }, @@ -25,15 +26,15 @@ example_json_1 = { "name": "init", "origin": ["UNINITIALIZED", "IDLE"], "destination": "IDLE", - "duration": {"static": 50000,}, + "duration": {"type": "static", "value": 50000}, "set_param": {"txpower": 10}, }, { "name": "setTxPower", "origin": "IDLE", "destination": "IDLE", - "duration": {"static": 120}, - "energy ": {"static": 10000}, + "duration": {"type": "static", "value": 120}, + "energy ": {"type": "static", "value": 10000}, "arg_to_param_map": {0: "txpower"}, "argument_values": [[10, 20, 30]], }, @@ -42,18 +43,22 @@ example_json_1 = { "origin": "IDLE", "destination": "TX", "duration": { - "static": 10, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", - "regression_args": [48, 8], - }, + "type": "analytic", + "value": 10, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * function_arg(1)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [48, 8], }, "energy": { - "static": 3, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" " * function_arg(1)", - "regression_args": [3, 5], - }, + "type": "analytic", + "value": 3, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * function_arg(1)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [3, 5], }, "arg_to_param_map": {1: "txbytes"}, "argument_values": [['"foo"', '"hodor"'], [3, 5]], @@ -65,12 +70,13 @@ example_json_1 = { "destination": "IDLE", "is_interrupt": 1, "timeout": { - "static": 2000, - "function": { - "raw": "regression_arg(0) + regression_arg(1)" - " * parameter(txbytes)", - "regression_args": [500, 16], - }, + "type": "analytic", + "value": 2000, + "functionStr": "regression_arg(0) + regression_arg(1)" + " * parameter(txbytes)", + "parameterNames": ["datarate", "txbytes", "txpower"], + "argCount": 0, + "regressionModel": [500, 16], }, }, ], @@ -470,36 +476,12 @@ class TestPTA(unittest.TestCase): def test_from_json_dfs_param(self): pta = PTA.from_json(example_json_1) - no_param = { - "datarate": None, - "txbytes": None, - "txpower": 10, - } - param_tx3 = { - "datarate": None, - "txbytes": 3, - "txpower": 10, - } - param_tx5 = { - "datarate": None, - "txbytes": 5, - "txpower": 10, - } - param_txp10 = { - "datarate": None, - "txbytes": None, - "txpower": 10, - } - param_txp20 = { - "datarate": None, - "txbytes": None, - "txpower": 20, - } - param_txp30 = { - "datarate": None, - "txbytes": None, - "txpower": 30, - } + no_param = {"datarate": None, "txbytes": None, "txpower": 10} + param_tx3 = {"datarate": None, "txbytes": 3, "txpower": 10} + param_tx5 = {"datarate": None, "txbytes": 5, "txpower": 10} + param_txp10 = {"datarate": None, "txbytes": None, "txpower": 10} + param_txp20 = {"datarate": None, "txbytes": None, "txpower": 20} + param_txp30 = {"datarate": None, "txbytes": None, "txpower": 30} self.assertEqual( sorted( dfs_tran_to_name( @@ -533,36 +515,12 @@ class TestPTA(unittest.TestCase): def test_from_yaml_dfs_param(self): pta = PTA.from_yaml(example_yaml_1) - no_param = { - "datarate": None, - "txbytes": None, - "txpower": None, - } - param_tx3 = { - "datarate": None, - "txbytes": 3, - "txpower": None, - } - param_tx5 = { - "datarate": None, - "txbytes": 5, - "txpower": None, - } - param_txp10 = { - "datarate": None, - "txbytes": None, - "txpower": 10, - } - param_txp20 = { - "datarate": None, - "txbytes": None, - "txpower": 20, - } - param_txp30 = { - "datarate": None, - "txbytes": None, - "txpower": 30, - } + no_param = {"datarate": None, "txbytes": None, "txpower": None} + param_tx3 = {"datarate": None, "txbytes": 3, "txpower": None} + param_tx5 = {"datarate": None, "txbytes": 5, "txpower": None} + param_txp10 = {"datarate": None, "txbytes": None, "txpower": 10} + param_txp20 = {"datarate": None, "txbytes": None, "txpower": 20} + param_txp30 = {"datarate": None, "txbytes": None, "txpower": 30} self.assertEqual( sorted( dfs_tran_to_name( @@ -581,36 +539,12 @@ class TestPTA(unittest.TestCase): def test_normalization(self): pta = PTA.from_yaml(example_yaml_2) - no_param = { - "datarate": None, - "txbytes": None, - "txpower": None, - } - param_tx3 = { - "datarate": None, - "txbytes": 3, - "txpower": None, - } - param_tx6 = { - "datarate": None, - "txbytes": 6, - "txpower": None, - } - param_txp10 = { - "datarate": None, - "txbytes": None, - "txpower": -6, - } - param_txp20 = { - "datarate": None, - "txbytes": None, - "txpower": 4, - } - param_txp30 = { - "datarate": None, - "txbytes": None, - "txpower": 14, - } + no_param = {"datarate": None, "txbytes": None, "txpower": None} + param_tx3 = {"datarate": None, "txbytes": 3, "txpower": None} + param_tx6 = {"datarate": None, "txbytes": 6, "txpower": None} + param_txp10 = {"datarate": None, "txbytes": None, "txpower": -6} + param_txp20 = {"datarate": None, "txbytes": None, "txpower": 4} + param_txp30 = {"datarate": None, "txbytes": None, "txpower": 14} self.assertEqual( sorted( dfs_tran_to_name( @@ -690,9 +624,7 @@ class TestPTA(unittest.TestCase): ) pta.add_transition("IDLE", "TX", "send", energy=3, duration=10) pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) - trace = [ - ["init"], - ] + trace = [["init"]] expected_energy = 500000 expected_duration = 50000 result = pta.simulate(trace) @@ -765,9 +697,7 @@ class TestPTA(unittest.TestCase): duration=50000, set_param={"txpower": 10}, ) - trace = [ - ["init"], - ] + trace = [["init"]] expected_energy = 500000 expected_duration = 50000 result = pta.simulate(trace) @@ -795,17 +725,23 @@ class TestPTA(unittest.TestCase): "IDLE", "TX", "send", - energy=3, - duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], - duration_function=lambda param, arg: 48 + 8 * arg[1], + energy=AnalyticFunction( + 3, + "regression_arg(0) + regression_arg(1) * function_arg(1)", + ["txpower", "length"], + regression_args=[3.0, 5], + num_args=2, + ), + duration=AnalyticFunction( + 10, + "regression_arg(0) + regression_arg(1) * function_arg(1)", + ["txpower", "length"], + regression_args=[48, 8], + num_args=2, + ), ) pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) - trace = [ - ["init"], - ["setTxPower", 10], - ["send", "foo", 3], - ] + trace = [["init"], ["setTxPower", 10], ["send", "foo", 3]] expected_energy = 500000 + 10000 + (3 + 5 * 3) + (2000 * 100) expected_duration = 50000 + 120 + (48 + 8 * 3) + 2000 result = pta.simulate(trace) @@ -832,17 +768,23 @@ class TestPTA(unittest.TestCase): "IDLE", "TX", "send", - energy=3, - duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], - duration_function=lambda param, arg: 48 + 8 * arg[1], + energy=AnalyticFunction( + 3, + "regression_arg(0) + regression_arg(1) * function_arg(1)", + ["txpower", "length"], + regression_args=[3, 5], + num_args=2, + ), + duration=AnalyticFunction( + 10, + "regression_arg(0) + regression_arg(1) * function_arg(1)", + ["txpower", "length"], + regression_args=[48, 8], + num_args=2, + ), ) pta.add_transition("TX", "IDLE", "txComplete", timeout=2000, is_interrupt=True) - trace = [ - ["init"], - ["setTxPower", 10], - ["send", "foobar", 6], - ] + trace = [["init"], ["setTxPower", 10], ["send", "foobar", 6]] expected_energy = 500000 + 10000 + (3 + 5 * 6) + (2000 * 100) expected_duration = 50000 + 120 + (48 + 8 * 6) + 2000 result = pta.simulate(trace) @@ -855,7 +797,13 @@ class TestPTA(unittest.TestCase): pta = PTA(parameters=["length", "txpower"]) pta.add_state("IDLE", power=5) pta.add_state( - "TX", power=100, power_function=lambda param, arg: 1000 + 2 * param[1] + "TX", + power=AnalyticFunction( + 100, + "regression_arg(0) + regression_arg(1) * parameter(txpower)", + ["length", "txpower"], + regression_args=[1000, 2], + ), ) pta.add_transition( "UNINITIALIZED", "IDLE", "init", energy=500000, duration=50000 @@ -872,24 +820,29 @@ class TestPTA(unittest.TestCase): "IDLE", "TX", "send", - energy=3, + energy=AnalyticFunction( + 3, + "regression_arg(0) + regression_arg(1) * function_arg(1)", + ["length", "txpower"], + regression_args=[3, 5], + num_args=2, + ), duration=10, - energy_function=lambda param, arg: 3 + 5 * arg[1], param_update_function=lambda param, arg: {**param, "length": arg[1]}, ) pta.add_transition( "TX", "IDLE", "txComplete", - timeout=2000, is_interrupt=True, - timeout_function=lambda param, arg: 500 + 16 * param[0], + timeout=AnalyticFunction( + 2000, + "regression_arg(0) + regression_arg(1) * parameter(length)", + ["length", "txpower"], + regression_args=[500, 16], + ), ) - trace = [ - ["init"], - ["setTxPower", 10], - ["send", "foo", 3], - ] + trace = [["init"], ["setTxPower", 10], ["send", "foo", 3]] expected_energy = ( 500000 + 10000 + (3 + 5 * 3) + (1000 + 2 * 10) * (500 + 16 * 3) ) -- cgit v1.2.3