summaryrefslogtreecommitdiff
path: root/lib/pelt.py
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2020-11-02 13:28:40 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2020-11-02 13:28:40 +0100
commitc7a7b48c6739e193ba24eec4d41082271df164ce (patch)
treeca01ada66ac02395999310f84bf8f4a61d1aa049 /lib/pelt.py
parentc179546f74807882f4dff47c8a969741f2dba1a7 (diff)
Simplify PELT usage. remove kneedle, refactor code
Diffstat (limited to 'lib/pelt.py')
-rw-r--r--lib/pelt.py340
1 files changed, 67 insertions, 273 deletions
diff --git a/lib/pelt.py b/lib/pelt.py
index 7f2e922..8966d56 100644
--- a/lib/pelt.py
+++ b/lib/pelt.py
@@ -1,80 +1,17 @@
-#!/usr/bin/env python3
-
import numpy as np
-import ruptures
from multiprocessing import Pool
-# returns the found changepoints by algo for the specific penalty pen.
-# algo should be the return value of Pelt(...).fit(signal)
-# Also puts a token in container q to let the progressmeter know the changepoints for penalty pen
-# have been calculated.
-# used for parallel calculation of changepoints vs penalty
-def _get_breakpoints(algo, pen):
- return pen, len(algo.predict(pen=pen))
-
-
-def find_knee_point(data_x, data_y, S=1.0, curve="convex", direction="decreasing"):
- kneedle = kneed.KneeLocator(data_x, data_y, S=S, curve=curve, direction=direction)
- kneepoint = (kneedle.knee, kneedle.knee_y)
- return kneepoint
-
-
-def norm_signal(signal, scaler=25):
- max_val = max(signal)
- normed_signal = np.zeros(shape=len(signal))
- for i, signal_i in enumerate(signal):
- normed_signal[i] = signal_i / max_val
- normed_signal[i] = normed_signal[i] * scaler
- return normed_signal
-
-
-# Scheint Einfluss auf die gefundene Anzahl CHangepoints zu haben. Hrmpf.
-# Kann aber auch shitty downsampling sein. Diese Funktion ist _sehr_ quick&dirty.
-def coarse_signal(signal, divisor=10):
- ret = list()
- for i in range((len(signal) // divisor)):
- ret.append(np.mean(signal[i * divisor : (i + 1) * divisor]))
- return np.array(ret)
-
-# returns the changepoints found on signal with penalty penalty.
-# model, jump and min_dist are directly passed to PELT
-def calc_pelt(signal, penalty, model="l1", jump=5, min_dist=2, plotting=False):
- # default params in Function
- if model is None:
- model = "l1"
- if jump is None:
- jump = 5
- if min_dist is None:
- min_dist = 2
- if plotting is None:
- plotting = False
- # change point detection. best fit seemingly with l1. rbf prods. RuntimeErr for pen > 30
- # https://ctruong.perso.math.cnrs.fr/ruptures-docs/build/html/costs/index.html
- # model = "l1" #"l1" # "l2", "rbf"
- algo = ruptures.Pelt(model=model, jump=jump, min_size=min_dist).fit(signal)
-
- if penalty is not None:
- bkps = algo.predict(pen=penalty)
- if plotting:
- fig, ax = ruptures.display(signal, bkps)
- plt.show()
- return bkps
-
- print_error("No Penalty specified.")
- sys.exit(-1)
+def PELT_get_changepoints(algo, penalty):
+ res = (penalty, algo.predict(pen=penalty))
+ return res
# calculates the raw_states for measurement measurement. num_measurement is used to identify the
# return value
# penalty, model and jump are directly passed to pelt
-def calc_raw_states_func(num_measurement, measurement, penalty, model, jump):
- # extract signal
- signal = np.array(measurement)
- # norm signal to remove dependency on absolute values
- normed_signal = norm_signal(signal)
- # calculate the breakpoints
- bkpts = calc_pelt(normed_signal, penalty, model=model, jump=jump)
+def PELT_get_raw_states(num_measurement, algo, signal, penalty):
+ bkpts = algo.predict(pen=penalty)
calced_states = list()
start_time = 0
end_time = 0
@@ -111,19 +48,13 @@ def calc_raw_states_func(num_measurement, measurement, penalty, model, jump):
return num_measurement, calced_states, new_avg_std, change_avg_std
-# parallelize calc over all measurements
-def calc_raw_states(arg_list):
- with Pool() as pool:
- # collect results from pool
- result = pool.starmap(calc_raw_states_func, arg_list)
- return result
-
-
class PELT:
def __init__(self, **kwargs):
- # Defaults von Janis
+ self.model = "l1"
self.jump = 1
- self.refinement_threshold = 100
+ self.min_dist = 10
+ self.num_samples = None
+ self.refinement_threshold = 200e-6 # µW
self.range_min = 0
self.range_max = 100
self.__dict__.update(kwargs)
@@ -132,7 +63,8 @@ class PELT:
def needs_refinement(self, signals):
count = 0
for signal in signals:
- p1, median, p99 = np.percentile(signal, (1, 50, 99))
+ # test
+ p1, median, p99 = np.percentile(signal[5:-5], (1, 50, 99))
if median - p1 > self.refinement_threshold:
count += 1
@@ -141,70 +73,90 @@ class PELT:
refinement_ratio = count / len(signals)
return refinement_ratio > 0.3
- def get_penalty_value(
- self, signals, model="l1", min_dist=2, range_min=0, range_max=100, S=1.0
- ):
- # Janis macht hier noch kein norm_signal. Mit sieht es aber genau so brauchbar aus.
- # TODO vor der Penaltybestimmung die Auflösung der Daten auf z.B. 1 kHz
- # verringern. Dann geht's deutlich schneller und superkurze
- # Substates interessieren uns ohnehin weniger
- signal = coarse_signal(norm_signal(signals[0]))
- algo = ruptures.Pelt(model=model, jump=self.jump, min_size=min_dist).fit(signal)
+ def norm_signal(self, signal, scaler=25):
+ max_val = max(np.abs(signal))
+ normed_signal = np.zeros(shape=len(signal))
+ for i, signal_i in enumerate(signal):
+ normed_signal[i] = signal_i / max_val
+ normed_signal[i] = normed_signal[i] * scaler
+ return normed_signal
+
+ def get_penalty_and_changepoints(self, signal):
+ # imported here as ruptures is only used for changepoint detection.
+ # This way, dfatool can be used without having ruptures installed as
+ # long as --pelt isn't active.
+ import ruptures
+
+ if self.num_samples is not None:
+ self.jump = len(signal) // int(self.num_samples)
+ else:
+ self.jump = 1
+
+ algo = ruptures.Pelt(
+ model=self.model, jump=self.jump, min_size=self.min_dist
+ ).fit(self.norm_signal(signal))
queue = list()
- for i in range(range_min, range_max + 1):
+ for i in range(0, 100):
queue.append((algo, i))
with Pool() as pool:
- results = pool.starmap(_get_breakpoints, queue)
- pen_val = [x[0] for x in results]
- changepoint_counts = [x[1] for x in results]
- # Scheint unnötig zu sein, da wir ohnehin plateau detection durchführen
- # knee = find_knee_point(pen_val, changepoint_counts, S=S)
- knee = (0,)
+ changepoints = pool.starmap(PELT_get_changepoints, queue)
+ changepoints_by_penalty = dict()
+ for res in changepoints:
+ if len(res[1]) > 0 and res[1][-1] == len(signal):
+ res[1].pop()
+ changepoints_by_penalty[res[0]] = res[1]
+ num_changepoints = list()
+ for i in range(0, 100):
+ num_changepoints.append(len(changepoints_by_penalty[i]))
start_index = -1
end_index = -1
longest_start = -1
longest_end = -1
prev_val = -1
- for i, num_bkpts in enumerate(changepoint_counts[knee[0] :]):
+ for i, num_bkpts in enumerate(num_changepoints):
if num_bkpts != prev_val:
end_index = i - 1
if end_index - start_index > longest_end - longest_start:
- # currently found sequence is the longest found yet
longest_start = start_index
longest_end = end_index
start_index = i
- if i == len(changepoint_counts[knee[0] :]) - 1:
- # end sequence with last value
+ if i == len(num_changepoints) - 1:
end_index = i
- # # since it is not guaranteed that this is the end of the plateau, assume the mid
- # # of the plateau was hit.
- # size = end_index - start_index
- # end_index = end_index + size
- # However this is not the clean solution. Better if search interval is widened
- # with range_min and range_max
if end_index - start_index > longest_end - longest_start:
- # last found sequence is the longest found yet
longest_start = start_index
longest_end = end_index
start_index = i
prev_val = num_bkpts
- mid_of_plat = longest_start + (longest_end - longest_start) // 2
- knee = (mid_of_plat + knee[0], changepoint_counts[mid_of_plat + knee[0]])
+ middle_of_plateau = longest_start + (longest_start - longest_start) // 2
+ changepoints = np.array(changepoints_by_penalty[middle_of_plateau])
+ return middle_of_plateau, changepoints
+
+ def get_changepoints(self, signal):
+ _, changepoints = self.get_penalty_and_changepoints(signal)
+ return changepoints
- # modify knee according to options. Defaults to 1 * knee
- knee = (knee[0] * 1, knee[1])
- return knee
+ def get_penalty(self, signal):
+ penalty, _ = self.get_penalty_and_changepoints(signal)
+ return penalty
def calc_raw_states(self, signals, penalty, opt_model=None):
+ # imported here as ruptures is only used for changepoint detection.
+ # This way, dfatool can be used without having ruptures installed as
+ # long as --pelt isn't active.
+ import ruptures
+
raw_states_calc_args = list()
for num_measurement, measurement in enumerate(signals):
- raw_states_calc_args.append(
- (num_measurement, measurement, penalty, opt_model, self.jump)
- )
+ normed_signal = self.norm_signal(measurement)
+ algo = ruptures.Pelt(
+ model=self.model, jump=self.jump, min_size=self.min_dist
+ ).fit(normed_signal)
+ raw_states_calc_args.append((num_measurement, algo, normed_signal, penalty))
raw_states_list = [None] * len(signals)
- raw_states_res = calc_raw_states(raw_states_calc_args)
+ with Pool() as pool:
+ raw_states_res = pool.starmap(PELT_get_raw_states, raw_states_calc_args)
# extracting result and putting it in correct order -> index of raw_states_list
# entry still corresponds with index of measurement in measurements_by_states
@@ -233,161 +185,3 @@ class PELT:
# l_bkpts = [s[1] for s in raw_states]
# fig, ax = rpt.display(np.array(l_signal), l_bkpts)
# plt.show()
-
- """
- # calculates and returns the necessary penalty for signal. Parallel execution with num_processes many processes
- # jump, min_dist are passed directly to PELT. S is directly passed to kneedle.
- # pen_modifier is used as a factor on the resulting penalty.
- # the interval [range_min, range_max] is used for searching.
- def calculate_penalty_value(
- signal,
- model="l1",
- jump=5,
- min_dist=2,
- range_min=0,
- range_max=100,
- S=1.0,
- pen_modifier=None,
- show_plots=False,
- ):
- # default params in Function
- if model is None:
- model = "l1"
- if jump is None:
- jump = 5
- if min_dist is None:
- min_dist = 2
- if range_min is None:
- range_min = 0
- if range_max is None:
- range_max = 50
- if S is None:
- S = 1.0
- if pen_modifier is None:
- pen_modifier = 1
- # change point detection. best fit seemingly with l1. rbf prods. RuntimeErr for pen > 30
- # https://ctruong.perso.math.cnrs.fr/ruptures-docs/build/html/costs/index.html
- # model = "l1" #"l1" # "l2", "rbf"
- algo = ruptures.Pelt(model=model, jump=jump, min_size=min_dist).fit(signal)
-
- ### CALC BKPS WITH DIFF PENALTYS
- if range_max != range_min:
- # building args array for parallelizing
- args = []
- # for displaying progression
- m = Manager()
- q = m.Queue()
-
- for i in range(range_min, range_max + 1):
- # same calculation for all except other penalty
- args.append((algo, i, q))
-
- print_info("starting kneepoint calculation.")
- # init Pool with num_proesses
- with Pool() as p:
- # collect results from pool
- result = p.starmap_async(get_bkps, args)
- # monitor loop
- percentage = -100 # Force display of 0%
- i = 0
- while True:
- if result.ready():
- break
-
- size = q.qsize()
- last_percentage = percentage
- percentage = round(size / (range_max - range_min) * 100, 2)
- if percentage >= last_percentage + 2 or i >= refresh_thresh:
- print_info("Current progress: " + str(percentage) + "%")
- i = 0
- else:
- i += 1
- time.sleep(refresh_delay)
- res = result.get()
- print_info("Finished kneepoint calculation.")
- # DECIDE WHICH PENALTY VALUE TO CHOOSE ACCORDING TO ELBOW/KNEE APPROACH
- # split x and y coords to pass to kneedle
- pen_val = [x[0] for x in res]
- fitted_bkps_val = [x[1] for x in res]
- # # plot to look at res
- knee = find_knee_point(pen_val, fitted_bkps_val, S=S)
-
- # TODO: Find plateau on pen_val vs fitted_bkps_val
- # scipy.find_peaks() does not find plateaus if they extend through the end of the data.
- # to counter that, add one extremely large value to the right side of the data
- # after negating it is extremely small -> Almost certainly smaller than the
- # found plateau therefore the plateau does not extend through the border
- # -> scipy.find_peaks finds it. Choose value from within that plateau.
- # fitted_bkps_val.append(100000000)
- # TODO: Approaching over find_peaks might not work if the initial decrease step to the
- # "correct" number of changepoints and additional decrease steps e.g. underfitting
- # take place within the given penalty interval. find_peak only finds plateaus
- # of peaks. If the number of chpts decreases after the wanted plateau the condition
- # for local peaks is not satisfied anymore. Therefore this approach will only work
- # if the plateau extends over the right border of the penalty interval.
- # peaks, peak_plateaus = find_peaks(- np.array(fitted_bkps_val), plateau_size=1)
- # Since the data is monotonously decreasing only one plateau can be found.
-
- # assuming the plateau is constant, i.e. no noise. OK to assume this here, since num_bkpts
- # is monotonously decreasing. If the number of bkpts decreases inside a considered
- # plateau, it means that the stable configuration is not yet met. -> Search further
- start_index = -1
- end_index = -1
- longest_start = -1
- longest_end = -1
- prev_val = -1
- for i, num_bkpts in enumerate(fitted_bkps_val[knee[0] :]):
- if num_bkpts != prev_val:
- end_index = i - 1
- if end_index - start_index > longest_end - longest_start:
- # currently found sequence is the longest found yet
- longest_start = start_index
- longest_end = end_index
- start_index = i
- if i == len(fitted_bkps_val[knee[0] :]) - 1:
- # end sequence with last value
- end_index = i
- # # since it is not guaranteed that this is the end of the plateau, assume the mid
- # # of the plateau was hit.
- # size = end_index - start_index
- # end_index = end_index + size
- # However this is not the clean solution. Better if search interval is widened
- # with range_min and range_max
- if end_index - start_index > longest_end - longest_start:
- # last found sequence is the longest found yet
- longest_start = start_index
- longest_end = end_index
- start_index = i
- prev_val = num_bkpts
- if show_plots:
- plt.xlabel("Penalty")
- plt.ylabel("Number of Changepoints")
- plt.plot(pen_val, fitted_bkps_val)
- plt.vlines(
- longest_start + knee[0], 0, max(fitted_bkps_val), linestyles="dashed"
- )
- plt.vlines(
- longest_end + knee[0], 0, max(fitted_bkps_val), linestyles="dashed"
- )
- plt.show()
- # choosing pen from plateau
- mid_of_plat = longest_start + (longest_end - longest_start) // 2
- knee = (mid_of_plat + knee[0], fitted_bkps_val[mid_of_plat + knee[0]])
-
- # modify knee according to options. Defaults to 1 * knee
- knee = (knee[0] * pen_modifier, knee[1])
-
- else:
- # range_min == range_max. has the same effect as pen_override
- knee = (range_min, None)
- print_info(str(knee[0]) + " has been selected as penalty.")
- if knee[0] is not None:
- return knee
-
- print_error(
- "With the current thresh-hold S="
- + str(S)
- + " it is not possible to select a penalty value."
- )
- sys.exit(-1)
- """