#!/usr/bin/env python3
"""
analyze-archive - generate PTA energy model from dfatool benchmark traces

analyze-archive generates a PTA energy model from one or more annotated
traces generated by dfatool. By default, it does nothing else.

Cross-Validation help:
    If <method> is "montecarlo": Randomly divide data into 2/3 training and
    1/3 validation, <count> times. Reported model quality is the average of
    all validation runs. Data is partitioned without regard for parameter
    values, so a specific parameter combination may be present in both
    training and validation sets or just one of them.

    If <method> is "kfold": Perform k-fold cross validation with k=<count>.
    Divide data into 1-1/k training and 1/k validation, <count> times.
    In the first set, items 0, k, 2k, ... are used for validation,
    in the second set, items 1, k+1, 2k+1, ... and so on.
    Reported model quality is the average of all validation runs. Data is
    partitioned without regard for parameter values, so a specific parameter
    combination may be present in both training and validation sets or just
    one of them.

Trace Export:
    Each JSON file lists all occurrences of the corresponding state/transition
    in the benchmark's PTA trace. Each occurrence contains the corresponding
    PTA parameters (if any) in 'parameter' and measurement results in
    'offline'. As measurements are typically run repeatedly, 'offline' is in
    turn a list of measurements: offline[0]['uW'] is the power trace of the
    first measurement of this state/transition, offline[1]['uW'] corresponds
    to the second measurement, etc. Values are provided in microwatts.
    For example, TX.json[0].offline[0].uW corresponds to the first measurement
    of the first TX state in the benchmark, and TX.json[5].offline[2].uW
    corresponds to the third measurement of the sixth TX state in the
    benchmark.
    WARNING: Several GB of RAM and disk space are required for complex
    measurements.
(JSON files may grow very large -- we trade efficiency for easy handling) """ import argparse import json import logging import random import re import sys from dfatool import plotter from dfatool.loader import RawData, pta_trace_to_aggregate from dfatool.functions import ( gplearn_to_function, SplitFunction, AnalyticFunction, SubstateFunction, StaticFunction, ) from dfatool.model import PTAModel from dfatool.validation import CrossValidator from dfatool.utils import ( filter_aggregate_by_param, detect_outliers_in_aggregate, NpEncoder, ) from dfatool.automata import PTA def print_model_quality(results): for state_or_tran in results.keys(): print() for key, result in results[state_or_tran].items(): if "smape" in result: print( "{:20s} {:15s} {:.2f}% / {:.0f}".format( state_or_tran, key, result["smape"], result["mae"] ) ) else: print("{:20s} {:15s} {:.0f}".format(state_or_tran, key, result["mae"])) def format_quality_measures(result): if "smape" in result: return "{:6.2f}% / {:9.0f}".format(result["smape"], result["mae"]) else: return "{:6} {:9.0f}".format("", result["mae"]) def model_quality_table(header, result_lists, info_list): print( "{:20s} {:15s} {:19s} {:19s} {:19s}".format( "key", "attribute", header[0].center(19), header[1].center(19), header[2].center(19), ) ) for state_or_tran in result_lists[0].keys(): for key in result_lists[0][state_or_tran].keys(): buf = "{:20s} {:15s}".format(state_or_tran, key) for i, results in enumerate(result_lists): info = info_list[i] buf += " ||| " if ( info is None or ( key != "energy_Pt" and type(info(state_or_tran, key)) is not StaticFunction ) or ( key == "energy_Pt" and ( type(info(state_or_tran, "power")) is not StaticFunction or type(info(state_or_tran, "duration")) is not StaticFunction ) ) ): result = results[state_or_tran][key] buf += format_quality_measures(result) else: buf += "{:7}----{:8}".format("", "") print(buf) def model_summary_table(result_list): buf = "transition duration" for results in result_list: if 
len(buf): buf += " ||| " buf += format_quality_measures(results["duration_by_trace"]) print(buf) buf = "total energy " for results in result_list: if len(buf): buf += " ||| " buf += format_quality_measures(results["energy_by_trace"]) print(buf) buf = "rel total energy " for results in result_list: if len(buf): buf += " ||| " buf += format_quality_measures(results["rel_energy_by_trace"]) print(buf) buf = "state-only energy " for results in result_list: if len(buf): buf += " ||| " buf += format_quality_measures(results["state_energy_by_trace"]) print(buf) buf = "transition timeout " for results in result_list: if len(buf): buf += " ||| " buf += format_quality_measures(results["timeout_by_trace"]) print(buf) def print_text_model_data(model, pm, pq, lm, lq, am, ai, aq): print("") print(r"key attribute $1 - \frac{\sigma_X}{...}$") for state_or_tran in model.by_name.keys(): for attribute in model.attributes(state_or_tran): print( "{} {} {:.8f}".format( state_or_tran, attribute, model.attr_by_name[state_or_tran][ attr_by_name ].stats.generic_param_dependence_ratio(), ) ) print("") print(r"key attribute parameter $1 - \frac{...}{...}$") for state_or_tran in model.by_name.keys(): for attribute in model.attributes(state_or_tran): for param in model.parameters: print( "{} {} {} {:.8f}".format( state_or_tran, attribute, param, model.attr_by_name[state_or_tran][ attribute ].stats.param_dependence_ratio(param), ) ) if state_or_tran in model._num_args: for arg_index in range(model._num_args[state_or_tran]): print( "{} {} {:d} {:.8f}".format( state_or_tran, attribute, arg_index, model.attr_by_name[state_or_tran][ attribute ].stats.arg_dependence_ratio(arg_index), ) ) def print_html_model_data(raw_data, model, pm, pq, lm, lq, am, ai, aq): state_attributes = model.attributes(model.states[0]) trans_attributes = model.attributes(model.transitions[0]) print("# Setup") print("* Input files: `", " ".join(raw_data.filenames), "`") print( f"""* Number of usable / performed measurements: 
{raw_data.preprocessing_stats["num_valid"]}/{raw_data.preprocessing_stats["num_runs"]}""" ) print(f"""* State duration: {raw_data.setup_by_fileno[0]["state_duration"]} ms""") print() print("# States") for state in model.states: print() print(f"## {state}") print() for param in model.parameters: print( "* {} ∈ {}".format( param, model.attr_by_name[state][ "power" ].stats.distinct_values_by_param_name[param], ) ) for attribute in state_attributes: unit = "" if attribute == "power": unit = "µW" static_quality = pq[state][attribute]["smape"] print( f"* {attribute} mean: {pm(state, attribute):.0f} {unit} (± {static_quality:.1f}%)" ) if ai(state, attribute): analytic_quality = aq[state][attribute]["smape"] fstr = ai(state, attribute)["function"].model_function fstr = fstr.replace("0 + ", "", 1) for i, marg in enumerate(ai(state, attribute)["function"].model_args): fstr = fstr.replace(f"regression_arg({i})", str(marg)) fstr = fstr.replace("+ -", "-") print(f"* {attribute} function: {fstr} (± {analytic_quality:.1f}%)") print() print("# Transitions") for trans in model.transitions: print() print(f"## {trans}") print() for param in model.parameters: print( "* {} ∈ {}".format( param, model.attr_by_name[trans][ "duration" ].stats.distinct_values_by_param_name[param], ) ) for attribute in trans_attributes: unit = "" if attribute == "duration": unit = "µs" elif attribute in ["energy", "rel_energy_prev"]: unit = "pJ" static_quality = pq[trans][attribute]["smape"] print( f"* {attribute} mean: {pm(trans, attribute):.0f} {unit} (± {static_quality:.1f}%)" ) if ai(trans, attribute): analytic_quality = aq[trans][attribute]["smape"] fstr = ai(trans, attribute)["function"].model_function fstr = fstr.replace("0 + ", "", 1) for i, marg in enumerate(ai(trans, attribute)["function"].model_args): fstr = fstr.replace(f"regression_arg({i})", str(marg)) fstr = fstr.replace("+ -", "-") print(f"* {attribute} function: {fstr} (± {analytic_quality:.1f}%)") print( "" ) for state in model.states: 
print("", end="") print("".format(state), end="") for attribute in state_attributes: unit = "" if attribute == "power": unit = "µW" print( "".format( pm(state, attribute), unit, pq[state][attribute]["smape"] ), end="", ) print("") print("
state" + "".join(state_attributes) + "
{}{:.0f} {} ({:.1f}%)
") trans_attributes = model.attributes(model.transitions[0]) if "rel_energy_prev" in trans_attributes: trans_attributes.remove("rel_energy_next") print( "" ) for trans in model.transitions: print("", end="") print("".format(trans), end="") for attribute in trans_attributes: unit = "" if attribute == "duration": unit = "µs" elif attribute in ["energy", "rel_energy_prev"]: unit = "pJ" print( "".format( pm(trans, attribute), unit, pq[trans][attribute]["smape"] ), end="", ) print("") print("
transition" + "".join(trans_attributes) + "
{}{:.0f} {} ({:.1f}%)
") def plot_traces(preprocessed_data, sot_name): traces = list() timestamps = list() for trace in preprocessed_data: for state_or_transition in trace["trace"]: if state_or_transition["name"] == sot_name: timestamps.extend( map(lambda x: x["plot"][0], state_or_transition["offline"]) ) traces.extend( map(lambda x: x["plot"][1], state_or_transition["offline"]) ) if len(traces) == 0: print( f"""Did not find traces for state or transition {sot_name}. Abort.""", file=sys.stderr, ) sys.exit(2) if len(traces) > 40: print(f"""Truncating plot to 40 of {len(traces)} traces (random sample)""") indexes = random.sample(range(len(traces)), 40) timestamps = [timestamps[i] for i in indexes] traces = [traces[i] for i in indexes] plotter.plot_xy( timestamps, traces, xlabel="t [s]", ylabel="P [W]", title=sot_name, family=True ) def print_static(model, static_model, name, attribute): unit = " " if attribute == "power": unit = "µW" elif attribute == "duration": unit = "µs" elif attribute == "substate_count": unit = "su" print( "{:10s}: {:.0f} {:s} ({:.2f})".format( name, static_model(name, attribute), unit, model.attr_by_name[name][attribute].stats.generic_param_dependence_ratio(), ) ) for param in model.parameters: print( "{:10s} dependence on {:15s}: {:.2f}".format( "", param, model.attr_by_name[name][attribute].stats.param_dependence_ratio(param), ) ) def print_analyticinfo(prefix, info): empty = "" print(f"{prefix}: {info.model_function}") print(f"{empty:{len(prefix)}s} {info.model_args}") def print_splitinfo(param_names, info, prefix=""): if type(info) is SplitFunction: for k, v in info.child.items(): if info.param_index < len(param_names): param_name = param_names[info.param_index] else: param_name = f"arg{info.param_index - len(param_names)}" print_splitinfo(param_names, v, f"{prefix} {param_name}={k}") elif type(info) is AnalyticFunction: print_analyticinfo(prefix, info) elif type(info) is StaticFunction: print(f"{prefix}: {info.value}") else: print(f"{prefix}: UNKNOWN") def 
_mogrify(function_str, parameter_names, regression_args): for i in range(len(regression_args)): function_str = function_str.replace( f"regression_arg({i})", str(regression_args[i]) ) for parameter_name in parameter_names: function_str = function_str.replace( f"parameter({parameter_name})", f"""param["{parameter_name}"]""" ) for arg_num in range(10): function_str = function_str.replace( f"function_arg({arg_num})", f"args[{arg_num}]" ) function_str = function_str.replace("np.", "Math.") return "#![modelfunction]" + function_str def mogrify(model): if type(model) is dict: if "functionStr" in model: model["function"] = _mogrify( model["functionStr"], model["parameterNames"], model["regressionModel"] ) model.pop("functionStr") model.pop("parameterNames") model.pop("regressionModel") else: for k, v in model.items(): model[k] = mogrify(v) elif type(model) is list: for i, elem in enumerate(model): model[i] = mogrify(model[i]) return model if __name__ == "__main__": ignored_trace_indexes = [] safe_functions_enabled = False function_override = {} show_models = [] show_quality = [] pta = None energymodel_export_file = None trace_export_dir = None xv_method = None xv_count = 10 parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ ) parser.add_argument( "--info", action="store_true", help="Show state duration and (for each state and transition) number of measurements and parameter values)", ) parser.add_argument( "--no-cache", action="store_true", help="Do not load cached measurement results" ) parser.add_argument( "--plot-unparam", metavar="::[;::