diff options
Diffstat (limited to 'lib/pelt.py')
-rw-r--r-- | lib/pelt.py | 130 |
1 files changed, 60 insertions, 70 deletions
diff --git a/lib/pelt.py b/lib/pelt.py index a215b28..518bef7 100644 --- a/lib/pelt.py +++ b/lib/pelt.py @@ -1,6 +1,9 @@ +import logging import numpy as np from multiprocessing import Pool +logger = logging.getLogger(__name__) + def PELT_get_changepoints(algo, penalty): res = (penalty, algo.predict(pen=penalty)) @@ -10,42 +13,21 @@ def PELT_get_changepoints(algo, penalty): # calculates the raw_states for measurement measurement. num_measurement is used to identify the # return value # penalty, model and jump are directly passed to pelt -def PELT_get_raw_states(num_measurement, algo, signal, penalty): - bkpts = algo.predict(pen=penalty) - calced_states = list() - start_time = 0 - end_time = 0 +def PELT_get_raw_states(num_measurement, algo, penalty): + changepoints = algo.predict(pen=penalty) + substates = list() + start_index = 0 + end_index = 0 # calc metrics for all states - for bkpt in bkpts: - # start_time of state is end_time of previous one + for changepoint in changepoints: + # start_index of state is end_index of previous one # (Transitions are instantaneous) - start_time = end_time - end_time = bkpt - power_vals = signal[start_time:end_time] - mean_power = np.mean(power_vals) - std_dev = np.std(power_vals) - calced_state = (start_time, end_time, mean_power, std_dev) - calced_states.append(calced_state) - num = 0 - new_avg_std = 0 - # calc avg std for all states from this measurement - for s in calced_states: - # print_info("State " + str(num) + " starts at t=" + str(s[0]) - # + " and ends at t=" + str(s[1]) - # + " while using " + str(s[2]) - # + "uW with sigma=" + str(s[3])) - num = num + 1 - new_avg_std = new_avg_std + s[3] - # check case if no state has been found to avoid crashing - if len(calced_states) != 0: - new_avg_std = new_avg_std / len(calced_states) - else: - new_avg_std = 0 - change_avg_std = None # measurement["uW_std"] - new_avg_std - # print_info("The average standard deviation for the newly found states is " - # + str(new_avg_std)) - # print_info("That is a reduction of " + str(change_avg_std)) - return num_measurement, calced_states, new_avg_std, change_avg_std + start_index = end_index + end_index = changepoint - 1 + substate = (start_index, end_index) + substates.append(substate) + + return num_measurement, substates class PELT: @@ -54,7 +36,7 @@ class PELT: self.jump = 1 self.min_dist = 10 self.num_samples = None - self.refinement_threshold = 200e-6 # µW + self.refinement_threshold = 200e-6 # 200 µW self.range_min = 0 self.range_max = 100 self.__dict__.update(kwargs) @@ -89,7 +71,6 @@ class PELT: if self.num_samples is not None and len(signal) > self.num_samples: self.jump = len(signal) // int(self.num_samples) - print(f"jump = {self.jump}") else: self.jump = 1 @@ -106,29 +87,29 @@ class PELT: if len(res[1]) > 0 and res[1][-1] == len(signal): res[1].pop() changepoints_by_penalty[res[0]] = res[1] - num_changepoints = list() + changepoint_counts = list() for i in range(0, 100): - num_changepoints.append(len(changepoints_by_penalty[i])) + changepoint_counts.append(len(changepoints_by_penalty[i])) start_index = -1 end_index = -1 longest_start = -1 longest_end = -1 prev_val = -1 - for i, num_bkpts in enumerate(num_changepoints): - if num_bkpts != prev_val: + for i, num_changepoints in enumerate(changepoint_counts): + if num_changepoints != prev_val: end_index = i - 1 if end_index - start_index > longest_end - longest_start: longest_start = start_index longest_end = end_index start_index = i - if i == len(num_changepoints) - 1: + if i == len(changepoint_counts) - 1: end_index = i if end_index - start_index > longest_end - longest_start: longest_start = start_index longest_end = end_index start_index = i - prev_val = num_bkpts + prev_val = num_changepoints middle_of_plateau = longest_start + (longest_start - longest_start) // 2 changepoints = np.array(changepoints_by_penalty[middle_of_plateau]) return middle_of_plateau, changepoints @@ -141,48 +122,57 @@ class PELT: penalty, _ = self.get_penalty_and_changepoints(signal) return penalty - def calc_raw_states(self, signals, penalty, opt_model=None): + def calc_raw_states(self, timestamps, signals, penalty, opt_model=None): + """ + Calculate substates for signals (assumed to be long to a single parameter configuration). + + :returns: List of substates with duration and mean power: [(substate 1 duration, substate 1 power), ...] + """ + # imported here as ruptures is only used for changepoint detection. # This way, dfatool can be used without having ruptures installed as # long as --pelt isn't active. import ruptures + substate_data = list() + raw_states_calc_args = list() for num_measurement, measurement in enumerate(signals): normed_signal = self.norm_signal(measurement) algo = ruptures.Pelt( model=self.model, jump=self.jump, min_size=self.min_dist ).fit(normed_signal) - raw_states_calc_args.append((num_measurement, algo, normed_signal, penalty)) + raw_states_calc_args.append((num_measurement, algo, penalty)) raw_states_list = [None] * len(signals) with Pool() as pool: raw_states_res = pool.starmap(PELT_get_raw_states, raw_states_calc_args) - # extracting result and putting it in correct order -> index of raw_states_list - # entry still corresponds with index of measurement in measurements_by_states - # -> If measurements are discarded the used ones are easily recognized - for ret_val in raw_states_res: - num_measurement = ret_val[0] - raw_states = ret_val[1] - avg_std = ret_val[2] - change_avg_std = ret_val[3] - # FIXME: Wieso gibt mir meine IDE hier eine Warning aus? Der Index müsste doch - # int sein oder nicht? Es scheint auch vernünftig zu klappen... - raw_states_list[num_measurement] = raw_states - # print( - # "The average standard deviation for the newly found states in " - # + "measurement No. " - # + str(num_measurement) - # + " is " - # + str(avg_std) - # ) - # print("That is a reduction of " + str(change_avg_std)) - for i, raw_state in enumerate(raw_states): - print( - f"Measurement #{num_measurement} sub-state #{i}: {raw_state[0]} -> {raw_state[1]}, mean {raw_state[2]}" + substate_counts = list(map(lambda x: len(x[1]), raw_states_res)) + expected_substate_count = np.argmax(np.bincount(substate_counts)) + usable_measurements = list( + filter(lambda x: len(x[1]) == expected_substate_count, raw_states_res) + ) + logger.debug( + f" There are {expected_substate_count} substates (std = {np.std(substate_counts)}, {len(usable_measurements)}/{len(raw_states_res)} results are usable)" + ) + + for i in range(expected_substate_count): + substate_data.append( + {"duration": list(), "power": list(), "power_std": list()} + ) + + for num_measurement, substates in usable_measurements: + for i, substate in enumerate(substates): + power_trace = signals[num_measurement][substate[0] : substate[1]] + mean_power = np.mean(power_trace) + std_power = np.std(power_trace) + duration = ( + timestamps[num_measurement][substate[1]] + - timestamps[num_measurement][substate[0]] ) - # l_signal = measurements_by_config['offline'][num_measurement]['uW'] - # l_bkpts = [s[1] for s in raw_states] - # fig, ax = rpt.display(np.array(l_signal), l_bkpts) - # plt.show() + substate_data[i]["duration"].append(duration) + substate_data[i]["power"].append(mean_power) + substate_data[i]["power_std"].append(std_power) + + return substate_counts, substate_data |