summaryrefslogtreecommitdiff
path: root/bin/Proof_Of_Concept_PELT.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/Proof_Of_Concept_PELT.py')
-rw-r--r--bin/Proof_Of_Concept_PELT.py523
1 files changed, 341 insertions, 182 deletions
diff --git a/bin/Proof_Of_Concept_PELT.py b/bin/Proof_Of_Concept_PELT.py
index 688c5a7..f0ecfc2 100644
--- a/bin/Proof_Of_Concept_PELT.py
+++ b/bin/Proof_Of_Concept_PELT.py
@@ -27,10 +27,10 @@ from dfatool.validation import CrossValidator
# helper functions. Not used
def plot_data_from_json(filename, trace_num, x_axis, y_axis):
- with open(filename, 'r') as file:
+ with open(filename, "r") as file:
tx_data = json.load(file)
- print(tx_data[trace_num]['parameter'])
- plt.plot(tx_data[trace_num]['offline'][0]['uW'])
+ print(tx_data[trace_num]["parameter"])
+ plt.plot(tx_data[trace_num]["offline"][0]["uW"])
plt.xlabel(x_axis)
plt.ylabel(y_axis)
plt.show()
@@ -49,12 +49,12 @@ def plot_data_vs_data_vs_means(signal1, signal2, x_axis, y_axis):
plt.plot(signal1)
lens = max(len(signal1), len(signal2))
average = np.mean(signal1)
- plt.hlines(average, 0, lens, color='red')
- plt.vlines(len(signal1), 0, 100000, color='red', linestyles='dashed')
+ plt.hlines(average, 0, lens, color="red")
+ plt.vlines(len(signal1), 0, 100000, color="red", linestyles="dashed")
plt.plot(signal2)
average = np.mean(signal2)
- plt.hlines(average, 0, lens, color='green')
- plt.vlines(len(signal2), 0, 100000, color='green', linestyles='dashed')
+ plt.hlines(average, 0, lens, color="green")
+ plt.vlines(len(signal2), 0, 100000, color="green", linestyles="dashed")
plt.xlabel(x_axis)
plt.ylabel(y_axis)
plt.show()
@@ -72,7 +72,7 @@ def get_bkps(algo, pen, q):
# Wrapper for kneedle
-def find_knee_point(data_x, data_y, S=1.0, curve='convex', direction='decreasing'):
+def find_knee_point(data_x, data_y, S=1.0, curve="convex", direction="decreasing"):
kneedle = KneeLocator(data_x, data_y, S=S, curve=curve, direction=direction)
kneepoint = (kneedle.knee, kneedle.knee_y)
return kneepoint
@@ -111,9 +111,20 @@ def calc_pelt(signal, penalty, model="l1", jump=5, min_dist=2, plotting=False):
# pen_modifier is used as a factor on the resulting penalty.
# the interval [range_min, range_max] is used for searching.
# refresh_delay and refresh_thresh are used to configure the progress "bar".
-def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0, range_max=50,
- num_processes=8, refresh_delay=1, refresh_thresh=5, S=1.0,
- pen_modifier=None, show_plots=False):
+def calculate_penalty_value(
+ signal,
+ model="l1",
+ jump=5,
+ min_dist=2,
+ range_min=0,
+ range_max=50,
+ num_processes=8,
+ refresh_delay=1,
+ refresh_thresh=5,
+ S=1.0,
+ pen_modifier=None,
+ show_plots=False,
+):
# default params in Function
if model is None:
model = "l1"
@@ -206,7 +217,7 @@ def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0,
longest_start = -1
longest_end = -1
prev_val = -1
- for i, num_bkpts in enumerate(fitted_bkps_val[knee[0]:]):
+ for i, num_bkpts in enumerate(fitted_bkps_val[knee[0] :]):
if num_bkpts != prev_val:
end_index = i - 1
if end_index - start_index > longest_end - longest_start:
@@ -214,7 +225,7 @@ def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0,
longest_start = start_index
longest_end = end_index
start_index = i
- if i == len(fitted_bkps_val[knee[0]:]) - 1:
+ if i == len(fitted_bkps_val[knee[0] :]) - 1:
# end sequence with last value
end_index = i
# # since it is not guaranteed that this is the end of the plateau, assume the mid
@@ -230,11 +241,15 @@ def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0,
start_index = i
prev_val = num_bkpts
if show_plots:
- plt.xlabel('Penalty')
- plt.ylabel('Number of Changepoints')
+ plt.xlabel("Penalty")
+ plt.ylabel("Number of Changepoints")
plt.plot(pen_val, fitted_bkps_val)
- plt.vlines(longest_start + knee[0], 0, max(fitted_bkps_val), linestyles='dashed')
- plt.vlines(longest_end + knee[0], 0, max(fitted_bkps_val), linestyles='dashed')
+ plt.vlines(
+ longest_start + knee[0], 0, max(fitted_bkps_val), linestyles="dashed"
+ )
+ plt.vlines(
+ longest_end + knee[0], 0, max(fitted_bkps_val), linestyles="dashed"
+ )
plt.show()
# choosing pen from plateau
mid_of_plat = longest_start + (longest_end - longest_start) // 2
@@ -250,8 +265,11 @@ def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0,
if knee[0] is not None:
return knee
- print_error("With the current thresh-hold S=" + str(S)
- + " it is not possible to select a penalty value.")
+ print_error(
+ "With the current thresh-hold S="
+ + str(S)
+ + " it is not possible to select a penalty value."
+ )
sys.exit(-1)
@@ -260,7 +278,7 @@ def calculate_penalty_value(signal, model="l1", jump=5, min_dist=2, range_min=0,
# penalty, model and jump are directly passed to pelt
def calc_raw_states_func(num_measurement, measurement, penalty, model, jump):
# extract signal
- signal = np.array(measurement['uW'])
+ signal = np.array(measurement["uW"])
# norm signal to remove dependency on absolute values
normed_signal = norm_signal(signal)
# calculate the breakpoints
@@ -274,7 +292,7 @@ def calc_raw_states_func(num_measurement, measurement, penalty, model, jump):
# (Transitions are instantaneous)
start_time = end_time
end_time = bkpt
- power_vals = signal[start_time: end_time]
+ power_vals = signal[start_time:end_time]
mean_power = np.mean(power_vals)
std_dev = np.std(power_vals)
calced_state = (start_time, end_time, mean_power, std_dev)
@@ -294,7 +312,7 @@ def calc_raw_states_func(num_measurement, measurement, penalty, model, jump):
new_avg_std = new_avg_std / len(calced_states)
else:
new_avg_std = 0
- change_avg_std = measurement['uW_std'] - new_avg_std
+ change_avg_std = measurement["uW_std"] - new_avg_std
# print_info("The average standard deviation for the newly found states is "
# + str(new_avg_std))
# print_info("That is a reduction of " + str(change_avg_std))
@@ -318,7 +336,9 @@ def needs_refinement(signal, thresh):
percentile_size = int()
percentile_size = length_of_signal // 100
lower_percentile = sorted_signal[0:percentile_size]
- upper_percentile = sorted_signal[length_of_signal - percentile_size: length_of_signal]
+ upper_percentile = sorted_signal[
+ length_of_signal - percentile_size : length_of_signal
+ ]
lower_percentile_mean = np.mean(lower_percentile)
upper_percentile_mean = np.mean(upper_percentile)
median = np.median(sorted_signal)
@@ -330,22 +350,23 @@ def needs_refinement(signal, thresh):
return True
return False
+
# helper functions for user output
# TODO: maybe switch with python logging feature
def print_info(str_to_prt):
- str_lst = str_to_prt.split(sep='\n')
+ str_lst = str_to_prt.split(sep="\n")
for str_prt in str_lst:
print("[INFO]" + str_prt)
def print_warning(str_to_prt):
- str_lst = str_to_prt.split(sep='\n')
+ str_lst = str_to_prt.split(sep="\n")
for str_prt in str_lst:
print("[WARNING]" + str_prt)
def print_error(str_to_prt):
- str_lst = str_to_prt.split(sep='\n')
+ str_lst = str_to_prt.split(sep="\n")
for str_prt in str_lst:
print("[ERROR]" + str_prt, file=sys.stderr)
@@ -383,7 +404,7 @@ def get_state_num(state_name, distinct_states):
return -1
-if __name__ == '__main__':
+if __name__ == "__main__":
# OPTION RECOGNITION
opt = dict()
@@ -429,87 +450,87 @@ if __name__ == '__main__':
optname = re.sub(r"^--", "", option)
opt[optname] = parameter
- if 'filename' not in opt:
+ if "filename" not in opt:
print_error("No file specified!")
sys.exit(-1)
else:
- opt_filename = opt['filename']
- if 'v' in opt:
+ opt_filename = opt["filename"]
+ if "v" in opt:
opt_verbose = True
opt_plotting = True
- if 'model' in opt:
- opt_model = opt['model']
- if 'jump' in opt:
+ if "model" in opt:
+ opt_model = opt["model"]
+ if "jump" in opt:
try:
- opt_jump = int(opt['jump'])
+ opt_jump = int(opt["jump"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'min_dist' in opt:
+ if "min_dist" in opt:
try:
- opt_min_dist = int(opt['min_dist'])
+ opt_min_dist = int(opt["min_dist"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'range_min' in opt:
+ if "range_min" in opt:
try:
- opt_range_min = int(opt['range_min'])
+ opt_range_min = int(opt["range_min"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'range_max' in opt:
+ if "range_max" in opt:
try:
- opt_range_max = int(opt['range_max'])
+ opt_range_max = int(opt["range_max"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'num_processes' in opt:
+ if "num_processes" in opt:
try:
- opt_num_processes = int(opt['num_processes'])
+ opt_num_processes = int(opt["num_processes"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'refresh_delay' in opt:
+ if "refresh_delay" in opt:
try:
- opt_refresh_delay = int(opt['refresh_delay'])
+ opt_refresh_delay = int(opt["refresh_delay"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'refresh_thresh' in opt:
+ if "refresh_thresh" in opt:
try:
- opt_refresh_thresh = int(opt['refresh_thresh'])
+ opt_refresh_thresh = int(opt["refresh_thresh"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'S' in opt:
+ if "S" in opt:
try:
- opt_S = float(opt['S'])
+ opt_S = float(opt["S"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'pen_override' in opt:
+ if "pen_override" in opt:
try:
- opt_pen_override = int(opt['pen_override'])
+ opt_pen_override = int(opt["pen_override"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'pen_modifier' in opt:
+ if "pen_modifier" in opt:
try:
- opt_pen_modifier = float(opt['pen_modifier'])
+ opt_pen_modifier = float(opt["pen_modifier"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'refinement_thresh' in opt:
+ if "refinement_thresh" in opt:
try:
- opt_refinement_thresh = int(opt['refinement_thresh'])
+ opt_refinement_thresh = int(opt["refinement_thresh"])
except ValueError as verr:
print(verr, file=sys.stderr)
sys.exit(-1)
- if 'cache_dicts' in opt:
- if 'cache_loc' in opt:
- opt_cache_loc = opt['cache_loc']
+ if "cache_dicts" in opt:
+ if "cache_loc" in opt:
+ opt_cache_loc = opt["cache_loc"]
else:
- print_error("If \"cache_dicts\" is set, \"cache_loc\" must be provided.")
+ print_error('If "cache_dicts" is set, "cache_loc" must be provided.')
sys.exit(-1)
except getopt.GetoptError as err:
print(err, file=sys.stderr)
@@ -519,8 +540,11 @@ if __name__ == '__main__':
if ".json" in opt_filename:
# open file with trace data from json
print_info(
- "Will only refine the state which is present in " + opt_filename + " if necessary.")
- with open(opt_filename, 'r') as f:
+ "Will only refine the state which is present in "
+ + opt_filename
+ + " if necessary."
+ )
+ with open(opt_filename, "r") as f:
configurations = json.load(f)
# for i in range(0, 7):
@@ -562,57 +586,78 @@ if __name__ == '__main__':
if flag:
print_info("The cache will be build.")
else:
- print_warning("THE OPTION \"cache_dicts\" IS FOR DEBUGGING PURPOSES ONLY! "
- "\nDO NOT USE FOR REGULAR APPLICATIONS!"
- "\nThis will possibly not be maintained in further development.")
+ print_warning(
+ 'THE OPTION "cache_dicts" IS FOR DEBUGGING PURPOSES ONLY! '
+ "\nDO NOT USE FOR REGULAR APPLICATIONS!"
+ "\nThis will possibly not be maintained in further development."
+ )
from_cache = True
- big_state_name = configurations[0]['name']
+ big_state_name = configurations[0]["name"]
if None in (by_param_file, by_name_file, param_names_file):
state_durations_by_config = []
state_consumptions_by_config = []
# loop through all traces check if refinement is necessary and if necessary refine it.
for num_config, measurements_by_config in enumerate(configurations):
# loop through all occurrences of the looked at state
- print_info("Looking at state '" + measurements_by_config['name'] + "' with params: "
- + str(measurements_by_config['parameter']) + "(" + str(
- num_config + 1) + "/"
- + str(len(configurations)) + ")")
+ print_info(
+ "Looking at state '"
+ + measurements_by_config["name"]
+ + "' with params: "
+ + str(measurements_by_config["parameter"])
+ + "("
+ + str(num_config + 1)
+ + "/"
+ + str(len(configurations))
+ + ")"
+ )
num_needs_refine = 0
print_info("Checking if refinement is necessary...")
- for measurement in measurements_by_config['offline']:
+ for measurement in measurements_by_config["offline"]:
# loop through measurements of particular state
# an check if state needs refinement
- signal = measurement['uW']
+ signal = measurement["uW"]
# mean = measurement['uW_mean']
if needs_refinement(signal, opt_refinement_thresh):
num_needs_refine = num_needs_refine + 1
if num_needs_refine == 0:
print_info(
- "No refinement necessary for state '" + measurements_by_config['name']
- + "' with params: " + str(measurements_by_config['parameter']))
- elif num_needs_refine < len(measurements_by_config['offline']) / 2:
+ "No refinement necessary for state '"
+ + measurements_by_config["name"]
+ + "' with params: "
+ + str(measurements_by_config["parameter"])
+ )
+ elif num_needs_refine < len(measurements_by_config["offline"]) / 2:
print_info(
- "No refinement necessary for state '" + measurements_by_config['name']
- + "' with params: " + str(measurements_by_config['parameter']))
+ "No refinement necessary for state '"
+ + measurements_by_config["name"]
+ + "' with params: "
+ + str(measurements_by_config["parameter"])
+ )
print_warning(
"However this decision was not unanimously. This could hint a poor"
- "measurement quality.")
+ "measurement quality."
+ )
else:
- if num_needs_refine != len(measurements_by_config['parameter']):
+ if num_needs_refine != len(measurements_by_config["parameter"]):
print_warning(
"However this decision was not unanimously. This could hint a poor"
- "measurement quality.")
+ "measurement quality."
+ )
# assume that all measurements of the same param configuration are fundamentally
# similar -> calculate penalty for first measurement, use it for all
if opt_pen_override is None:
- signal = np.array(measurements_by_config['offline'][0]['uW'])
+ signal = np.array(measurements_by_config["offline"][0]["uW"])
normed_signal = norm_signal(signal)
- penalty = calculate_penalty_value(normed_signal, model=opt_model,
- range_min=opt_range_min,
- range_max=opt_range_max,
- num_processes=opt_num_processes,
- jump=opt_jump, S=opt_S,
- pen_modifier=opt_pen_modifier)
+ penalty = calculate_penalty_value(
+ normed_signal,
+ model=opt_model,
+ range_min=opt_range_min,
+ range_max=opt_range_max,
+ num_processes=opt_num_processes,
+ jump=opt_jump,
+ S=opt_S,
+ pen_modifier=opt_pen_modifier,
+ )
penalty = penalty[0]
else:
penalty = opt_pen_override
@@ -620,12 +665,16 @@ if __name__ == '__main__':
print_info("Starting raw_states calculation.")
raw_states_calc_args = []
for num_measurement, measurement in enumerate(
- measurements_by_config['offline']):
- raw_states_calc_args.append((num_measurement, measurement, penalty,
- opt_model, opt_jump))
+ measurements_by_config["offline"]
+ ):
+ raw_states_calc_args.append(
+ (num_measurement, measurement, penalty, opt_model, opt_jump)
+ )
- raw_states_list = [None] * len(measurements_by_config['offline'])
- raw_states_res = calc_raw_states(raw_states_calc_args, opt_num_processes)
+ raw_states_list = [None] * len(measurements_by_config["offline"])
+ raw_states_res = calc_raw_states(
+ raw_states_calc_args, opt_num_processes
+ )
# extracting result and putting it in correct order -> index of raw_states_list
# entry still corresponds with index of measurement in measurements_by_states
# -> If measurements are discarded the used ones are easily recognized
@@ -637,9 +686,13 @@ if __name__ == '__main__':
# FIXME: Wieso gibt mir meine IDE hier eine Warning aus? Der Index müsste doch
# int sein oder nicht? Es scheint auch vernünftig zu klappen...
raw_states_list[num_measurement] = raw_states
- print_info("The average standard deviation for the newly found states in "
- + "measurement No. " + str(num_measurement) + " is "
- + str(avg_std))
+ print_info(
+ "The average standard deviation for the newly found states in "
+ + "measurement No. "
+ + str(num_measurement)
+ + " is "
+ + str(avg_std)
+ )
print_info("That is a reduction of " + str(change_avg_std))
# l_signal = measurements_by_config['offline'][num_measurement]['uW']
# l_bkpts = [s[1] for s in raw_states]
@@ -652,15 +705,21 @@ if __name__ == '__main__':
num_states_array[i] = len(x)
avg_num_states = np.mean(num_states_array)
num_states_dev = np.std(num_states_array)
- print_info("On average " + str(avg_num_states)
- + " States have been found. The standard deviation"
- + " is " + str(num_states_dev))
+ print_info(
+ "On average "
+ + str(avg_num_states)
+ + " States have been found. The standard deviation"
+ + " is "
+ + str(num_states_dev)
+ )
# TODO: MAGIC NUMBER
if num_states_dev > 1:
- print_warning("The number of states varies strongly across measurements."
- " Consider choosing a larger range for penalty detection."
- " It is also possible, that the processed data is not accurate"
- " enough to produce proper results.")
+ print_warning(
+ "The number of states varies strongly across measurements."
+ " Consider choosing a larger range for penalty detection."
+ " It is also possible, that the processed data is not accurate"
+ " enough to produce proper results."
+ )
time.sleep(5)
# TODO: Wie bekomme ich da jetzt raus, was die Wahrheit ist?
# Einfach Durchschnitt nehmen?
@@ -668,11 +727,14 @@ if __name__ == '__main__':
# frequent state count
counts = np.bincount(num_states_array)
num_raw_states = np.argmax(counts)
- print_info("Choose " + str(num_raw_states) + " as number of raw_states.")
+ print_info(
+ "Choose " + str(num_raw_states) + " as number of raw_states."
+ )
if num_raw_states == 1:
print_info(
"Upon further inspection it is clear that no refinement is necessary."
- " The macromodel is usable for this configuration.")
+ " The macromodel is usable for this configuration."
+ )
continue
# iterate through all found breakpoints and determine start and end points as well
# as power consumption
@@ -687,8 +749,12 @@ if __name__ == '__main__':
if len(raw_states) == num_raw_states:
num_used_measurements = num_used_measurements + 1
for num_state, s in enumerate(raw_states):
- states_duration_list[num_state][num_measurement] = s[1] - s[0]
- states_consumption_list[num_state][num_measurement] = s[2]
+ states_duration_list[num_state][num_measurement] = (
+ s[1] - s[0]
+ )
+ states_consumption_list[num_state][num_measurement] = s[
+ 2
+ ]
# calced_state = (start_time, end_time, mean_power, std_dev)
# for num_state, s in enumerate(raw_states):
# state_duration = s[1] - s[0]
@@ -698,9 +764,12 @@ if __name__ == '__main__':
# states_consumption_list[num_state] = \
# states_consumption_list[num_state] + state_consumption
else:
- print_info("Discarding measurement No. " + str(num_measurement)
- + " because it did not recognize the number of "
- "raw_states correctly.")
+ print_info(
+ "Discarding measurement No. "
+ + str(num_measurement)
+ + " because it did not recognize the number of "
+ "raw_states correctly."
+ )
# l_signal = measurements_by_config['offline'][num_measurement]['uW']
# l_bkpts = [s[1] for s in raw_states]
# fig, ax = rpt.display(np.array(l_signal), l_bkpts)
@@ -711,65 +780,88 @@ if __name__ == '__main__':
# states_consumption_list[i] = x / num_used_measurements
if num_used_measurements != len(raw_states_list):
if num_used_measurements / len(raw_states_list) <= 0.5:
- print_warning("Only used " + str(num_used_measurements) + "/"
- + str(len(raw_states_list))
- + " Measurements for refinement. "
- + "Others did not recognize number of states correctly."
- + "\nYou should verify the integrity of the measurements.")
+ print_warning(
+ "Only used "
+ + str(num_used_measurements)
+ + "/"
+ + str(len(raw_states_list))
+ + " Measurements for refinement. "
+ + "Others did not recognize number of states correctly."
+ + "\nYou should verify the integrity of the measurements."
+ )
else:
- print_info("Used " + str(num_used_measurements) + "/"
- + str(len(raw_states_list)) + " Measurements for refinement."
- + " Others did not recognize number of states correctly.")
+ print_info(
+ "Used "
+ + str(num_used_measurements)
+ + "/"
+ + str(len(raw_states_list))
+ + " Measurements for refinement."
+ + " Others did not recognize number of states correctly."
+ )
num_used_measurements = i
else:
print_info("Used all available measurements.")
state_durations_by_config.append((num_config, states_duration_list))
- state_consumptions_by_config.append((num_config, states_consumption_list))
+ state_consumptions_by_config.append(
+ (num_config, states_consumption_list)
+ )
# combine all state durations and consumptions to parametrized model
if len(state_durations_by_config) == 0:
- print("No refinement necessary for this state. The macromodel is usable.")
+ print(
+ "No refinement necessary for this state. The macromodel is usable."
+ )
sys.exit(1)
- if len(state_durations_by_config) / len(configurations) > 1 / 2 \
- and len(state_durations_by_config) != len(configurations):
+ if len(state_durations_by_config) / len(configurations) > 1 / 2 and len(
+ state_durations_by_config
+ ) != len(configurations):
print_warning(
"Some measurements(>50%) need to be refined, however that is not true for"
" all measurements. This hints a correlation between the structure of"
" the underlying automaton and parameters. Only the ones which need to"
" be refined will be refined. THE RESULT WILL NOT ACCURATELY DEPICT "
- " THE REAL WORLD.")
+ " THE REAL WORLD."
+ )
not_accurate = True
if len(state_durations_by_config) / len(configurations) < 1 / 2:
print_warning(
"Some measurements(<50%) need to be refined, however that is not true for"
" all measurements. This hints a correlation between the structure of"
" the underlying automaton and parameters. Or a poor quality of measurements."
- " No Refinement will be done.")
+ " No Refinement will be done."
+ )
sys.exit(-1)
# this is only necessary because at this state only linear automatons can be modeled.
num_states_array = [int()] * len(state_consumptions_by_config)
- for i, (_, states_consumption_list) in enumerate(state_consumptions_by_config):
+ for i, (_, states_consumption_list) in enumerate(
+ state_consumptions_by_config
+ ):
num_states_array[i] = len(states_consumption_list)
counts = np.bincount(num_states_array)
num_raw_states = np.argmax(counts)
usable_configs = len(state_consumptions_by_config)
# param_list identical for each raw_state
param_list = []
- param_names = configurations[0]['offline_aggregates']['paramkeys'][0]
+ param_names = configurations[0]["offline_aggregates"]["paramkeys"][0]
print_info("param_names: " + str(param_names))
for num_config, states_consumption_list in state_consumptions_by_config:
if len(states_consumption_list) != num_raw_states:
print_warning(
- "Config No." + str(num_config) + " not usable yet due to different "
+ "Config No."
+ + str(num_config)
+ + " not usable yet due to different "
+ "number of states. This hints a correlation between parameters and "
+ "the structure of the resulting automaton. This will be possibly"
+ " supported in a future version of this tool. HOWEVER AT THE MOMENT"
- " THIS WILL LEAD TO INACCURATE RESULTS!")
+ " THIS WILL LEAD TO INACCURATE RESULTS!"
+ )
not_accurate = True
usable_configs = usable_configs - 1
else:
- param_list.extend(configurations[num_config]['offline_aggregates']['param'])
+ param_list.extend(
+ configurations[num_config]["offline_aggregates"]["param"]
+ )
print_info("param_list: " + str(param_list))
if usable_configs == len(state_consumptions_by_config):
@@ -782,7 +874,9 @@ if __name__ == '__main__':
for i in range(num_raw_states):
consumptions_for_state = []
durations_for_state = []
- for j, (_, states_consumption_list) in enumerate(state_consumptions_by_config):
+ for j, (_, states_consumption_list) in enumerate(
+ state_consumptions_by_config
+ ):
if len(states_consumption_list) == num_raw_states:
consumptions_for_state.extend(states_consumption_list[i])
durations_for_state.extend(state_durations_by_config[j][1][i])
@@ -790,8 +884,10 @@ if __name__ == '__main__':
not_accurate = True
usable_configs_2 = usable_configs_2 - 1
if usable_configs_2 != usable_configs:
- print_error("an zwei unterschiedlichen Stellen wurden unterschiedlich viele "
- "Messungen rausgeworfen. Bei Janis beschweren.")
+ print_error(
+ "an zwei unterschiedlichen Stellen wurden unterschiedlich viele "
+ "Messungen rausgeworfen. Bei Janis beschweren."
+ )
state_name = "state_" + str(i)
state_dict = {
"param": param_list,
@@ -799,7 +895,7 @@ if __name__ == '__main__':
"duration": durations_for_state,
"attributes": ["power", "duration"],
# Da kein "richtiger" Automat generiert wird, gibt es auch keine Transitionen
- "isa": "state"
+ "isa": "state",
}
by_name[state_name] = state_dict
by_param = by_name_to_by_param(by_name)
@@ -842,12 +938,22 @@ if __name__ == '__main__':
paramfit.enqueue(state_name, "power", num_param, param_name)
if stats.depends_on_param(state_name, "duration", param_name):
paramfit.enqueue(state_name, "duration", num_param, param_name)
- print_info("State " + state_name + "s power depends on param " + param_name + ":" +
- str(stats.depends_on_param(state_name, "power", param_name))
- )
- print_info("State " + state_name + "s duration depends on param " + param_name + ":"
- + str(stats.depends_on_param(state_name, "duration", param_name))
- )
+ print_info(
+ "State "
+ + state_name
+ + "s power depends on param "
+ + param_name
+ + ":"
+ + str(stats.depends_on_param(state_name, "power", param_name))
+ )
+ print_info(
+ "State "
+ + state_name
+ + "s duration depends on param "
+ + param_name
+ + ":"
+ + str(stats.depends_on_param(state_name, "duration", param_name))
+ )
paramfit.fit()
fit_res_dur_dict = {}
fit_res_pow_dict = {}
@@ -856,13 +962,19 @@ if __name__ == '__main__':
fit_power = paramfit.get_result(state_name, "power")
fit_duration = paramfit.get_result(state_name, "duration")
combined_fit_power = analytic.function_powerset(fit_power, param_names, 0)
- combined_fit_duration = analytic.function_powerset(fit_duration, param_names, 0)
+ combined_fit_duration = analytic.function_powerset(
+ fit_duration, param_names, 0
+ )
combined_fit_power.fit(by_param, state_name, "power")
if not combined_fit_power.fit_success:
- print_warning("Fitting(power) for state " + state_name + " was not succesful!")
+ print_warning(
+ "Fitting(power) for state " + state_name + " was not succesful!"
+ )
combined_fit_duration.fit(by_param, state_name, "duration")
if not combined_fit_duration.fit_success:
- print_warning("Fitting(duration) for state " + state_name + " was not succesful!")
+ print_warning(
+ "Fitting(duration) for state " + state_name + " was not succesful!"
+ )
fit_res_pow_dict[state_name] = combined_fit_power
fit_res_dur_dict[state_name] = combined_fit_duration
# only raw_states with the same number of function parameters can be similar
@@ -876,8 +988,7 @@ if __name__ == '__main__':
for num_arg, arg in enumerate(model_args):
replace_string = "regression_arg(" + str(num_arg) + ")"
model_function = model_function.replace(replace_string, str(arg))
- print_info("Power-Function for state " + state_name + ": "
- + model_function)
+ print_info("Power-Function for state " + state_name + ": " + model_function)
for state_name in by_name.keys():
model_function = str(fit_res_dur_dict[state_name].model_function)
model_args = fit_res_dur_dict[state_name].model_args
@@ -885,8 +996,9 @@ if __name__ == '__main__':
for num_arg, arg in enumerate(model_args):
replace_string = "regression_arg(" + str(num_arg) + ")"
model_function = model_function.replace(replace_string, str(arg))
- print_info("Duration-Function for state " + state_name + ": "
- + model_function)
+ print_info(
+ "Duration-Function for state " + state_name + ": " + model_function
+ )
# sort states in buckets for clustering
similar_raw_state_buckets = {}
for state_name in by_name.keys():
@@ -900,15 +1012,21 @@ if __name__ == '__main__':
# cluster for each Key-Tuple using the function parameters
distinct_states = []
for key_tuple in similar_raw_state_buckets.keys():
- print_info("Key-Tuple " + str(key_tuple) + ": "
- + str(similar_raw_state_buckets[key_tuple]))
+ print_info(
+ "Key-Tuple "
+ + str(key_tuple)
+ + ": "
+ + str(similar_raw_state_buckets[key_tuple])
+ )
similar_states = similar_raw_state_buckets[key_tuple]
if len(similar_states) > 1:
# only necessary to cluster if more than one raw_state has the same function
# configuration
# functions are identical -> num_params and used params are identical
- num_params = num_param_dur_dict[similar_states[0]] + num_param_pow_dict[
- similar_states[0]]
+ num_params = (
+ num_param_dur_dict[similar_states[0]]
+ + num_param_pow_dict[similar_states[0]]
+ )
values_to_cluster = np.zeros((len(similar_states), num_params))
for num_state, state_name in enumerate(similar_states):
dur_params = fit_res_dur_dict[state_name].model_args
@@ -921,11 +1039,14 @@ if __name__ == '__main__':
values_to_cluster[num_state][j] = param
j = j + 1
normed_vals_to_cluster = norm_values_to_cluster(values_to_cluster)
- cluster = AgglomerativeClustering(n_clusters=None, compute_full_tree=True,
- affinity='euclidean',
- linkage='ward',
- # TODO: Magic Number. Beim Evaluieren finetunen
- distance_threshold=1)
+ cluster = AgglomerativeClustering(
+ n_clusters=None,
+ compute_full_tree=True,
+ affinity="euclidean",
+ linkage="ward",
+ # TODO: Magic Number. Beim Evaluieren finetunen
+ distance_threshold=1,
+ )
cluster.fit_predict(values_to_cluster)
cluster_labels = cluster.labels_
print_info("Cluster labels:\n" + str(cluster_labels))
@@ -952,8 +1073,11 @@ if __name__ == '__main__':
state_name = "state_" + str(i)
state_num = get_state_num(state_name, distinct_states)
if state_num == -1:
- print_error("Critical Error when creating the resulting sequence. raw_state state_"
- + str(i) + " could not be mapped to a state.")
+ print_error(
+ "Critical Error when creating the resulting sequence. raw_state state_"
+ + str(i)
+ + " could not be mapped to a state."
+ )
sys.exit(-1)
resulting_sequence[i] = state_num
print("Resulting sequence is: " + str(resulting_sequence))
@@ -981,11 +1105,13 @@ if __name__ == '__main__':
"duration": durations_for_state,
"attributes": ["power", "duration"],
# Da kein richtiger Automat generiert wird, gibt es auch keine Transitionen
- "isa": "state"
+ "isa": "state",
}
new_by_name[state_name] = new_state_dict
new_by_param = by_name_to_by_param(new_by_name)
- new_stats = parameters.ParamStats(new_by_name, new_by_param, param_names, dict())
+ new_stats = parameters.ParamStats(
+ new_by_name, new_by_param, param_names, dict()
+ )
new_paramfit = ParallelParamFit(new_by_param)
for state_name in new_by_name.keys():
for num_param, param_name in enumerate(param_names):
@@ -993,12 +1119,24 @@ if __name__ == '__main__':
new_paramfit.enqueue(state_name, "power", num_param, param_name)
if new_stats.depends_on_param(state_name, "duration", param_name):
new_paramfit.enqueue(state_name, "duration", num_param, param_name)
- print_info("State " + state_name + "s power depends on param " + param_name + ":" +
- str(new_stats.depends_on_param(state_name, "power", param_name))
- )
- print_info("State " + state_name + "s duration depends on param " + param_name + ":"
- + str(new_stats.depends_on_param(state_name, "duration", param_name))
- )
+ print_info(
+ "State "
+ + state_name
+ + "s power depends on param "
+ + param_name
+ + ":"
+ + str(new_stats.depends_on_param(state_name, "power", param_name))
+ )
+ print_info(
+ "State "
+ + state_name
+ + "s duration depends on param "
+ + param_name
+ + ":"
+ + str(
+ new_stats.depends_on_param(state_name, "duration", param_name)
+ )
+ )
new_paramfit.fit()
new_fit_res_dur_dict = {}
new_fit_res_pow_dict = {}
@@ -1006,13 +1144,19 @@ if __name__ == '__main__':
fit_power = new_paramfit.get_result(state_name, "power")
fit_duration = new_paramfit.get_result(state_name, "duration")
combined_fit_power = analytic.function_powerset(fit_power, param_names, 0)
- combined_fit_duration = analytic.function_powerset(fit_duration, param_names, 0)
+ combined_fit_duration = analytic.function_powerset(
+ fit_duration, param_names, 0
+ )
combined_fit_power.fit(new_by_param, state_name, "power")
if not combined_fit_power.fit_success:
- print_warning("Fitting(power) for state " + state_name + " was not succesful!")
+ print_warning(
+ "Fitting(power) for state " + state_name + " was not succesful!"
+ )
combined_fit_duration.fit(new_by_param, state_name, "duration")
if not combined_fit_duration.fit_success:
- print_warning("Fitting(duration) for state " + state_name + " was not succesful!")
+ print_warning(
+ "Fitting(duration) for state " + state_name + " was not succesful!"
+ )
new_fit_res_pow_dict[state_name] = combined_fit_power
new_fit_res_dur_dict[state_name] = combined_fit_duration
# output results
@@ -1026,10 +1170,14 @@ if __name__ == '__main__':
for num_arg, arg in enumerate(model_args):
replace_string = "regression_arg(" + str(num_arg) + ")"
model_function = model_function.replace(replace_string, str(arg))
- print("Power-Function for state " + state_name + ": "
- + model_function)
- f.write("Power-Function for state " + state_name + ": "
- + model_function + "\n")
+ print("Power-Function for state " + state_name + ": " + model_function)
+ f.write(
+ "Power-Function for state "
+ + state_name
+ + ": "
+ + model_function
+ + "\n"
+ )
f.write("\n\n")
for state_name in new_by_name.keys():
model_function = str(new_fit_res_dur_dict[state_name].model_function)
@@ -1037,16 +1185,25 @@ if __name__ == '__main__':
for num_arg, arg in enumerate(model_args):
replace_string = "regression_arg(" + str(num_arg) + ")"
model_function = model_function.replace(replace_string, str(arg))
- print("Duration-Function for state " + state_name + ": "
- + model_function)
- f.write("Duration-Function for state " + state_name + ": "
- + model_function + "\n")
+ print(
+ "Duration-Function for state " + state_name + ": " + model_function
+ )
+ f.write(
+ "Duration-Function for state "
+ + state_name
+ + ": "
+ + model_function
+ + "\n"
+ )
if not_accurate:
print_warning(
"THIS RESULT IS NOT ACCURATE. SEE WARNINGLOG TO GET A BETTER UNDERSTANDING"
- " WHY.")
- f.write("THIS RESULT IS NOT ACCURATE. SEE WARNINGLOG TO GET A BETTER UNDERSTANDING"
- " WHY.")
+ " WHY."
+ )
+ f.write(
+ "THIS RESULT IS NOT ACCURATE. SEE WARNINGLOG TO GET A BETTER UNDERSTANDING"
+ " WHY."
+ )
# Removed clustering at this point, since it provided too much difficulties
# at the current state. Clustering is still used, but at another point of execution.
@@ -1227,16 +1384,18 @@ if __name__ == '__main__':
# open with dfatool
raw_data_args = list()
raw_data_args.append(opt_filename)
- raw_data = RawData(
- raw_data_args, with_traces=True
+ raw_data = RawData(raw_data_args, with_traces=True)
+ print_info(
+ "Preprocessing file. Depending on its size, this could take a while."
)
- print_info("Preprocessing file. Depending on its size, this could take a while.")
preprocessed_data = raw_data.get_preprocessed_data()
print_info("File fully preprocessed")
# TODO: Mal schauen, wie ich das mache. Erstmal nur mit json. Ist erstmal raus. Wird nicht
# umgesetzt.
- print_error("Not implemented yet. Please generate .json files first with dfatool and use"
- " those.")
+ print_error(
+ "Not implemented yet. Please generate .json files first with dfatool and use"
+ " those."
+ )
else:
print_error("Unknown dataformat")
- sys.exit(-1) \ No newline at end of file
+ sys.exit(-1)