summaryrefslogtreecommitdiff
path: root/lib/pelt.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pelt.py')
-rw-r--r--lib/pelt.py130
1 files changed, 60 insertions, 70 deletions
diff --git a/lib/pelt.py b/lib/pelt.py
index a215b28..518bef7 100644
--- a/lib/pelt.py
+++ b/lib/pelt.py
@@ -1,6 +1,9 @@
+import logging
import numpy as np
from multiprocessing import Pool
+logger = logging.getLogger(__name__)
+
def PELT_get_changepoints(algo, penalty):
res = (penalty, algo.predict(pen=penalty))
@@ -10,42 +13,21 @@ def PELT_get_changepoints(algo, penalty):
# calculates the raw_states for measurement measurement. num_measurement is used to identify the
# return value
# penalty, model and jump are directly passed to pelt
-def PELT_get_raw_states(num_measurement, algo, signal, penalty):
- bkpts = algo.predict(pen=penalty)
- calced_states = list()
- start_time = 0
- end_time = 0
+def PELT_get_raw_states(num_measurement, algo, penalty):
+ changepoints = algo.predict(pen=penalty)
+ substates = list()
+ start_index = 0
+ end_index = 0
# calc metrics for all states
- for bkpt in bkpts:
- # start_time of state is end_time of previous one
+ for changepoint in changepoints:
+ # start_index of state is end_index of previous one
# (Transitions are instantaneous)
- start_time = end_time
- end_time = bkpt
- power_vals = signal[start_time:end_time]
- mean_power = np.mean(power_vals)
- std_dev = np.std(power_vals)
- calced_state = (start_time, end_time, mean_power, std_dev)
- calced_states.append(calced_state)
- num = 0
- new_avg_std = 0
- # calc avg std for all states from this measurement
- for s in calced_states:
- # print_info("State " + str(num) + " starts at t=" + str(s[0])
- # + " and ends at t=" + str(s[1])
- # + " while using " + str(s[2])
- # + "uW with sigma=" + str(s[3]))
- num = num + 1
- new_avg_std = new_avg_std + s[3]
- # check case if no state has been found to avoid crashing
- if len(calced_states) != 0:
- new_avg_std = new_avg_std / len(calced_states)
- else:
- new_avg_std = 0
- change_avg_std = None # measurement["uW_std"] - new_avg_std
- # print_info("The average standard deviation for the newly found states is "
- # + str(new_avg_std))
- # print_info("That is a reduction of " + str(change_avg_std))
- return num_measurement, calced_states, new_avg_std, change_avg_std
+ start_index = end_index
+ end_index = changepoint - 1
+ substate = (start_index, end_index)
+ substates.append(substate)
+
+ return num_measurement, substates
class PELT:
@@ -54,7 +36,7 @@ class PELT:
self.jump = 1
self.min_dist = 10
self.num_samples = None
- self.refinement_threshold = 200e-6 # µW
+ self.refinement_threshold = 200e-6 # 200 µW
self.range_min = 0
self.range_max = 100
self.__dict__.update(kwargs)
@@ -89,7 +71,6 @@ class PELT:
if self.num_samples is not None and len(signal) > self.num_samples:
self.jump = len(signal) // int(self.num_samples)
- print(f"jump = {self.jump}")
else:
self.jump = 1
@@ -106,29 +87,29 @@ class PELT:
if len(res[1]) > 0 and res[1][-1] == len(signal):
res[1].pop()
changepoints_by_penalty[res[0]] = res[1]
- num_changepoints = list()
+ changepoint_counts = list()
for i in range(0, 100):
- num_changepoints.append(len(changepoints_by_penalty[i]))
+ changepoint_counts.append(len(changepoints_by_penalty[i]))
start_index = -1
end_index = -1
longest_start = -1
longest_end = -1
prev_val = -1
- for i, num_bkpts in enumerate(num_changepoints):
- if num_bkpts != prev_val:
+ for i, num_changepoints in enumerate(changepoint_counts):
+ if num_changepoints != prev_val:
end_index = i - 1
if end_index - start_index > longest_end - longest_start:
longest_start = start_index
longest_end = end_index
start_index = i
- if i == len(num_changepoints) - 1:
+ if i == len(changepoint_counts) - 1:
end_index = i
if end_index - start_index > longest_end - longest_start:
longest_start = start_index
longest_end = end_index
start_index = i
- prev_val = num_bkpts
+ prev_val = num_changepoints
middle_of_plateau = longest_start + (longest_start - longest_start) // 2
changepoints = np.array(changepoints_by_penalty[middle_of_plateau])
return middle_of_plateau, changepoints
@@ -141,48 +122,57 @@ class PELT:
penalty, _ = self.get_penalty_and_changepoints(signal)
return penalty
- def calc_raw_states(self, signals, penalty, opt_model=None):
+ def calc_raw_states(self, timestamps, signals, penalty, opt_model=None):
+ """
+ Calculate substates for signals (assumed to be long to a single parameter configuration).
+
+ :returns: List of substates with duration and mean power: [(substate 1 duration, substate 1 power), ...]
+ """
+
# imported here as ruptures is only used for changepoint detection.
# This way, dfatool can be used without having ruptures installed as
# long as --pelt isn't active.
import ruptures
+ substate_data = list()
+
raw_states_calc_args = list()
for num_measurement, measurement in enumerate(signals):
normed_signal = self.norm_signal(measurement)
algo = ruptures.Pelt(
model=self.model, jump=self.jump, min_size=self.min_dist
).fit(normed_signal)
- raw_states_calc_args.append((num_measurement, algo, normed_signal, penalty))
+ raw_states_calc_args.append((num_measurement, algo, penalty))
raw_states_list = [None] * len(signals)
with Pool() as pool:
raw_states_res = pool.starmap(PELT_get_raw_states, raw_states_calc_args)
- # extracting result and putting it in correct order -> index of raw_states_list
- # entry still corresponds with index of measurement in measurements_by_states
- # -> If measurements are discarded the used ones are easily recognized
- for ret_val in raw_states_res:
- num_measurement = ret_val[0]
- raw_states = ret_val[1]
- avg_std = ret_val[2]
- change_avg_std = ret_val[3]
- # FIXME: Wieso gibt mir meine IDE hier eine Warning aus? Der Index müsste doch
- # int sein oder nicht? Es scheint auch vernünftig zu klappen...
- raw_states_list[num_measurement] = raw_states
- # print(
- # "The average standard deviation for the newly found states in "
- # + "measurement No. "
- # + str(num_measurement)
- # + " is "
- # + str(avg_std)
- # )
- # print("That is a reduction of " + str(change_avg_std))
- for i, raw_state in enumerate(raw_states):
- print(
- f"Measurement #{num_measurement} sub-state #{i}: {raw_state[0]} -> {raw_state[1]}, mean {raw_state[2]}"
+ substate_counts = list(map(lambda x: len(x[1]), raw_states_res))
+ expected_substate_count = np.argmax(np.bincount(substate_counts))
+ usable_measurements = list(
+ filter(lambda x: len(x[1]) == expected_substate_count, raw_states_res)
+ )
+ logger.debug(
+ f" There are {expected_substate_count} substates (std = {np.std(substate_counts)}, {len(usable_measurements)}/{len(raw_states_res)} results are usable)"
+ )
+
+ for i in range(expected_substate_count):
+ substate_data.append(
+ {"duration": list(), "power": list(), "power_std": list()}
+ )
+
+ for num_measurement, substates in usable_measurements:
+ for i, substate in enumerate(substates):
+ power_trace = signals[num_measurement][substate[0] : substate[1]]
+ mean_power = np.mean(power_trace)
+ std_power = np.std(power_trace)
+ duration = (
+ timestamps[num_measurement][substate[1]]
+ - timestamps[num_measurement][substate[0]]
)
- # l_signal = measurements_by_config['offline'][num_measurement]['uW']
- # l_bkpts = [s[1] for s in raw_states]
- # fig, ax = rpt.display(np.array(l_signal), l_bkpts)
- # plt.show()
+ substate_data[i]["duration"].append(duration)
+ substate_data[i]["power"].append(mean_power)
+ substate_data[i]["power_std"].append(std_power)
+
+ return substate_counts, substate_data