diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2020-10-28 13:53:19 +0100 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2020-10-28 13:54:40 +0100 |
commit | d274e4592385f5d601d32986f8980ca60d82c24b (patch) | |
tree | 2860253c8204dba5db8fbd63dbb06c6b6c720fa6 | |
parent | beac1d86844b03535684f9b5348422606986d91a (diff) |
add changepoint detection and optional load limit
-rwxr-xr-x | bin/dlog-viewer | 190 |
1 files changed, 185 insertions, 5 deletions
diff --git a/bin/dlog-viewer b/bin/dlog-viewer index 2018622..df32ab5 100755 --- a/bin/dlog-viewer +++ b/bin/dlog-viewer @@ -17,6 +17,7 @@ OPTIONS import argparse import csv +import json import matplotlib.pyplot as plt import numpy as np import os @@ -24,6 +25,8 @@ import struct import sys import xml.etree.ElementTree as ET +from multiprocessing import Pool + def running_mean(x: np.ndarray, N: int) -> np.ndarray: """ @@ -41,6 +44,109 @@ def running_mean(x: np.ndarray, N: int) -> np.ndarray: return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") +def downsample(x: np.ndarray, N: int) -> np.ndarray: + """Downsample `x` by factor `N`.""" + fill = x.shape[0] % N + if fill: + x = np.array(list(x) + [x[-1] for i in range(N - fill)]) + return x.reshape(-1, N).mean(axis=1) + + +class NpEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + else: + return super(NpEncoder, self).default(obj) + + +def PELT_get_changepoints(algo, penalty): + res = (penalty, algo.predict(pen=penalty)) + return res + + +class PELT: + def __init__(self, signal, num_samples=None): + self.signal = signal + self.model = "l1" + self.jump = 1 + self.min_dist = 500 + + if num_samples is not None: + self.ds_factor = len(signal) // num_samples + else: + self.ds_factor = 1 + + self.jump = self.ds_factor + # if self.ds_factor > 1: + # self.signal = downsample(self.signal, self.ds_factor) + # print(f"ds from {len(signal)} to {len(self.signal)}") + + def norm_signal(self, signal, scaler=25): + max_val = max(np.abs(signal)) + normed_signal = np.zeros(shape=len(signal)) + for i, signal_i in enumerate(signal): + normed_signal[i] = signal_i / max_val + normed_signal[i] = normed_signal[i] * scaler + return normed_signal + + def get_changepoints(self): + # imported here as ruptures is only used for changepoint detection + import ruptures + + algo = ruptures.Pelt( + model=self.model, jump=self.jump, min_size=self.min_dist + ).fit(self.norm_signal(self.signal)) + queue = list() + for i in range(0, 100): + queue.append((algo, i)) + with Pool() as pool: + changepoints = pool.starmap(PELT_get_changepoints, queue) + changepoints_by_penalty = dict() + for res in changepoints: + changepoints_by_penalty[res[0]] = res[1] + num_changepoints = list() + for i in range(0, 100): + num_changepoints.append(len(changepoints_by_penalty[i])) + + # Find plateau + start_index = -1 + end_index = -1 + longest_start = -1 + longest_end = -1 + prev_val = -1 + for i, num_bkpts in enumerate(num_changepoints): + if num_bkpts != prev_val: + end_index = i - 1 + if end_index - start_index > longest_end - longest_start: + # currently found sequence is the longest found yet + longest_start = start_index + longest_end = end_index + start_index = i + if i == len(num_changepoints) - 1: + # end sequence with last value + end_index = i + # # since it is not guaranteed that this is the end of the plateau, assume the mid + # # of the plateau was hit. + # size = end_index - start_index + # end_index = end_index + size + # However this is not the clean solution. Better if search interval is widened + # with range_min and range_max + if end_index - start_index > longest_end - longest_start: + # last found sequence is the longest found yet + longest_start = start_index + longest_end = end_index + start_index = i + prev_val = num_bkpts + middle_of_plateau = longest_start + (longest_start - longest_start) // 2 + changepoints = np.array(changepoints_by_penalty[middle_of_plateau]) + return changepoints + + class DLogChannel: def __init__(self, desc_tuple): self.slot = desc_tuple[0] @@ -53,7 +159,8 @@ class DLogChannel: class DLog: - def __init__(self, filename): + def __init__(self, filename, limit=None): + self.limit_duration = limit self.load_dlog(filename) def load_dlog(self, filename): @@ -99,6 +206,19 @@ class DLog: 0, self.observed_duration, num=int(len(raw_data) / (4 * num_channels)) ) + if ( + self.limit_duration is not None + and self.observed_duration > self.limit_duration + ): + stop_offset = len(self.timestamps) - 1 + for i, ts in enumerate(self.timestamps): + if ts > self.limit_duration: + stop_offset = i + break + self.timestamps = self.timestamps[:stop_offset] + self.observed_duration = self.timestamps[-1] + raw_data = raw_data[: stop_offset * 4 * num_channels] + self.data = np.ndarray( shape=(num_channels, int(len(raw_data) / (4 * num_channels))), dtype=np.float32, @@ -149,17 +269,40 @@ class DLog: return True +def detect_changepoints(dlog, num_samples): + ret_slots = list() + for slot in dlog.slots: + ret_slot = dict() + for unit in slot: + print(f"changepoint detection for {slot} {unit}") + pelt = PELT(running_mean(slot[unit].data, 10), num_samples=num_samples) + changepoints = pelt.get_changepoints() + prev = 0 + ret_slot[unit] = list() + for cp in changepoints: + cp = cp - 1 + ret_slot[unit].append( + { + "interval": [dlog.timestamps[prev], dlog.timestamps[cp]], + "mean": np.mean(slot[unit].data[prev:cp]), + } + ) + prev = cp + ret_slots.append(ret_slot) + return ret_slots + + def print_stats(dlog): if dlog.observed_duration_equals_expectation(): print( "Measurement duration: {:d} seconds at {:f} µs per sample".format( - dlog.planned_duration, dlog.interval * 1000000 + dlog.planned_duration, dlog.interval * 1_000_000 ) ) else: print( "Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format( - dlog.observed_duration, dlog.planned_duration, dlog.interval * 1000000 + dlog.observed_duration, dlog.planned_duration, dlog.interval * 1_000_000 ) ) @@ -303,12 +446,40 @@ def export_csv(dlog, filename): writer.writerow([dlog.timestamps[row]] + list(dlog.data[:, row])) +def export_json(dlog, filename, extra_data=dict()): + json_channels = list() + for channel in dlog.channels: + json_channels.append({"smu": channel.smu, "unit": channel.unit}) + json_data = {"channels": json_channels} + json_data.update(extra_data) + with open(filename, "w") as f: + json.dump(json_data, f, cls=NpEncoder) + + def main(): parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ ) parser.add_argument( - "--csv-export", type=str, help="Export measurements to CSV file" + "--csv-export", + metavar="FILENAME", + type=str, + help="Export measurements to CSV file", + ) + parser.add_argument( + "--json-export", + metavar="FILENAME", + type=str, + help="Export analysis results (e.g. changepoints) to JSON file", + ) + parser.add_argument( + "--limit", + type=float, + metavar="N", + help="Limit analysis to the first N seconds of data", + ) + parser.add_argument( + "--pelt", metavar="JUMP", type=int, help="Perform changepoint detection" ) parser.add_argument( "--plot", @@ -324,7 +495,7 @@ def main(): args = parser.parse_args() - dlog = DLog(args.dlog_file) + dlog = DLog(args.dlog_file, args.limit) if args.stat: print_stats(dlog) @@ -332,6 +503,15 @@ def main(): if args.csv_export: export_csv(dlog, args.csv_export) + if args.pelt: + changepoints = detect_changepoints(dlog, num_samples=args.pelt) + + if args.json_export: + extra_data = dict() + if args.pelt: + extra_data["changepoints"] = changepoints + export_json(dlog, args.json_export, extra_data) + if args.plot: if args.plot == "P" and dlog.all_data_slots_have_power(): show_power_plot(dlog) |