author    Daniel Friesel <daniel.friesel@uos.de>  2020-10-28 13:53:19 +0100
committer Daniel Friesel <daniel.friesel@uos.de>  2020-10-28 13:54:40 +0100
commit    d274e4592385f5d601d32986f8980ca60d82c24b (patch)
tree      2860253c8204dba5db8fbd63dbb06c6b6c720fa6
parent    beac1d86844b03535684f9b5348422606986d91a (diff)
add changepoint detection and optional load limit
-rwxr-xr-x  bin/dlog-viewer  190
1 file changed, 185 insertions(+), 5 deletions(-)
diff --git a/bin/dlog-viewer b/bin/dlog-viewer
index 2018622..df32ab5 100755
--- a/bin/dlog-viewer
+++ b/bin/dlog-viewer
@@ -17,6 +17,7 @@ OPTIONS
import argparse
import csv
+import json
import matplotlib.pyplot as plt
import numpy as np
import os
@@ -24,6 +25,8 @@ import struct
import sys
import xml.etree.ElementTree as ET
+from multiprocessing import Pool
+
def running_mean(x: np.ndarray, N: int) -> np.ndarray:
"""
@@ -41,6 +44,109 @@ def running_mean(x: np.ndarray, N: int) -> np.ndarray:
return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid")
+def downsample(x: np.ndarray, N: int) -> np.ndarray:
+    """Downsample `x` by factor `N`, padding with the last element if needed."""
+    fill = x.shape[0] % N
+    if fill:
+        # pad to a multiple of N so that the reshape below works
+        x = np.concatenate([x, np.full(N - fill, x[-1], dtype=x.dtype)])
+    return x.reshape(-1, N).mean(axis=1)
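+
+# Behavior sketch (doctest-style, illustrative):
+#     >>> downsample(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), 2)
+#     array([1.5, 3.5, 5. ])
+# The final element is repeated once so the length becomes a multiple of N.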
+
+
+class NpEncoder(json.JSONEncoder):
+    """JSON encoder with support for numpy scalar and array types."""
+
+    def default(self, obj):
+        if isinstance(obj, np.integer):
+            return int(obj)
+        if isinstance(obj, np.floating):
+            return float(obj)
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        return super().default(obj)
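+
+# Usage sketch (illustrative): numpy values that json.dump would otherwise
+# reject with a TypeError are converted to native Python types:
+#     >>> json.dumps({"mean": np.float32(0.5), "v": np.arange(3)}, cls=NpEncoder)
+#     '{"mean": 0.5, "v": [0, 1, 2]}'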
+
+
+def PELT_get_changepoints(algo, penalty):
+    # top-level function rather than a method so that it can be pickled
+    # and passed to multiprocessing.Pool.starmap
+    return penalty, algo.predict(pen=penalty)
+
+
+class PELT:
+    def __init__(self, signal, num_samples=None):
+        self.signal = signal
+        self.model = "l1"
+        self.min_dist = 500
+
+        if num_samples is not None:
+            # keep the factor at 1 or above, even if the signal is shorter
+            # than the requested number of samples
+            self.ds_factor = max(1, len(signal) // num_samples)
+        else:
+            self.ds_factor = 1
+
+        # ruptures only considers every jump-th sample as a changepoint
+        # candidate, so this trades accuracy for speed on long signals
+        self.jump = self.ds_factor
+        # if self.ds_factor > 1:
+        #     self.signal = downsample(self.signal, self.ds_factor)
+        #     print(f"ds from {len(signal)} to {len(self.signal)}")
+
+    def norm_signal(self, signal, scaler=25):
+        # normalize to [-1, 1], then scale to [-scaler, scaler]
+        return signal / np.max(np.abs(signal)) * scaler
+
+ def get_changepoints(self):
+ # imported here as ruptures is only used for changepoint detection
+ import ruptures
+
+ algo = ruptures.Pelt(
+ model=self.model, jump=self.jump, min_size=self.min_dist
+ ).fit(self.norm_signal(self.signal))
+        queue = [(algo, penalty) for penalty in range(0, 100)]
+        with Pool() as pool:
+            changepoints = pool.starmap(PELT_get_changepoints, queue)
+        changepoints_by_penalty = dict(changepoints)
+        num_changepoints = [
+            len(changepoints_by_penalty[penalty]) for penalty in range(0, 100)
+        ]
+
+ # Find plateau
+ start_index = -1
+ end_index = -1
+ longest_start = -1
+ longest_end = -1
+ prev_val = -1
+ for i, num_bkpts in enumerate(num_changepoints):
+ if num_bkpts != prev_val:
+ end_index = i - 1
+ if end_index - start_index > longest_end - longest_start:
+ # currently found sequence is the longest found yet
+ longest_start = start_index
+ longest_end = end_index
+ start_index = i
+ if i == len(num_changepoints) - 1:
+ # end sequence with last value
+ end_index = i
+ # # since it is not guaranteed that this is the end of the plateau, assume the mid
+ # # of the plateau was hit.
+ # size = end_index - start_index
+ # end_index = end_index + size
+                # However, this is not a clean solution; it would be better
+                # to widen the search interval using range_min and range_max.
+ if end_index - start_index > longest_end - longest_start:
+ # last found sequence is the longest found yet
+ longest_start = start_index
+ longest_end = end_index
+ start_index = i
+ prev_val = num_bkpts
+        # select the penalty in the middle of the longest plateau
+        middle_of_plateau = longest_start + (longest_end - longest_start) // 2
+ changepoints = np.array(changepoints_by_penalty[middle_of_plateau])
+ return changepoints
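+
+# Usage sketch (illustrative; requires the ruptures package, and should run
+# under an `if __name__ == "__main__":` guard since get_changepoints spawns
+# a multiprocessing Pool):
+#     signal = np.concatenate([np.zeros(3000), np.ones(3000)])
+#     pelt = PELT(signal, num_samples=1000)
+#     changepoints = pelt.get_changepoints()  # expected near sample 3000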
+
+
class DLogChannel:
def __init__(self, desc_tuple):
self.slot = desc_tuple[0]
@@ -53,7 +159,8 @@ class DLogChannel:
class DLog:
- def __init__(self, filename):
+ def __init__(self, filename, limit=None):
+ self.limit_duration = limit
self.load_dlog(filename)
def load_dlog(self, filename):
@@ -99,6 +206,19 @@ class DLog:
0, self.observed_duration, num=int(len(raw_data) / (4 * num_channels))
)
+ if (
+ self.limit_duration is not None
+ and self.observed_duration > self.limit_duration
+ ):
+ stop_offset = len(self.timestamps) - 1
+ for i, ts in enumerate(self.timestamps):
+ if ts > self.limit_duration:
+ stop_offset = i
+ break
+ self.timestamps = self.timestamps[:stop_offset]
+ self.observed_duration = self.timestamps[-1]
+ raw_data = raw_data[: stop_offset * 4 * num_channels]
+
self.data = np.ndarray(
shape=(num_channels, int(len(raw_data) / (4 * num_channels))),
dtype=np.float32,
@@ -149,17 +269,40 @@ class DLog:
return True
+def detect_changepoints(dlog, num_samples):
+ ret_slots = list()
+ for slot in dlog.slots:
+ ret_slot = dict()
+ for unit in slot:
+ print(f"changepoint detection for {slot} {unit}")
+ pelt = PELT(running_mean(slot[unit].data, 10), num_samples=num_samples)
+ changepoints = pelt.get_changepoints()
+ prev = 0
+ ret_slot[unit] = list()
+            for cp in changepoints:
+                # ruptures reports the first index past each segment, so
+                # shift by one to get a valid index into the timestamps
+                cp = cp - 1
+ ret_slot[unit].append(
+ {
+ "interval": [dlog.timestamps[prev], dlog.timestamps[cp]],
+ "mean": np.mean(slot[unit].data[prev:cp]),
+ }
+ )
+ prev = cp
+ ret_slots.append(ret_slot)
+ return ret_slots
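+
+# Shape of the returned structure, sketched for one slot with a single unit
+# (numbers illustrative):
+#     [{"W": [{"interval": [0.0, 1.4], "mean": 0.042},
+#             {"interval": [1.4, 3.0], "mean": 0.197}]}]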
+
+
def print_stats(dlog):
if dlog.observed_duration_equals_expectation():
print(
"Measurement duration: {:d} seconds at {:f} µs per sample".format(
- dlog.planned_duration, dlog.interval * 1000000
+ dlog.planned_duration, dlog.interval * 1_000_000
)
)
else:
print(
"Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format(
- dlog.observed_duration, dlog.planned_duration, dlog.interval * 1000000
+ dlog.observed_duration, dlog.planned_duration, dlog.interval * 1_000_000
)
)
@@ -303,12 +446,40 @@ def export_csv(dlog, filename):
writer.writerow([dlog.timestamps[row]] + list(dlog.data[:, row]))
+def export_json(dlog, filename, extra_data=None):
+    json_channels = list()
+    for channel in dlog.channels:
+        json_channels.append({"smu": channel.smu, "unit": channel.unit})
+    json_data = {"channels": json_channels}
+    if extra_data:
+        json_data.update(extra_data)
+    with open(filename, "w") as f:
+        json.dump(json_data, f, cls=NpEncoder)
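+
+# Illustrative call (variable names as in main() below):
+#     export_json(dlog, "out.json", extra_data={"changepoints": changepoints})
+# This writes {"channels": [...], "changepoints": [...]} to out.json.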
+
+
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__
)
parser.add_argument(
- "--csv-export", type=str, help="Export measurements to CSV file"
+ "--csv-export",
+ metavar="FILENAME",
+ type=str,
+ help="Export measurements to CSV file",
+ )
+ parser.add_argument(
+ "--json-export",
+ metavar="FILENAME",
+ type=str,
+ help="Export analysis results (e.g. changepoints) to JSON file",
+ )
+ parser.add_argument(
+ "--limit",
+ type=float,
+ metavar="N",
+ help="Limit analysis to the first N seconds of data",
+ )
+ parser.add_argument(
+ "--pelt", metavar="JUMP", type=int, help="Perform changepoint detection"
)
parser.add_argument(
"--plot",
@@ -324,7 +495,7 @@ def main():
args = parser.parse_args()
- dlog = DLog(args.dlog_file)
+ dlog = DLog(args.dlog_file, args.limit)
if args.stat:
print_stats(dlog)
@@ -332,6 +503,15 @@ def main():
if args.csv_export:
export_csv(dlog, args.csv_export)
+ if args.pelt:
+ changepoints = detect_changepoints(dlog, num_samples=args.pelt)
+
+ if args.json_export:
+ extra_data = dict()
+ if args.pelt:
+ extra_data["changepoints"] = changepoints
+ export_json(dlog, args.json_export, extra_data)
+
if args.plot:
if args.plot == "P" and dlog.all_data_slots_have_power():
show_power_plot(dlog)
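
An illustrative invocation combining the new options (file names hypothetical):

    ./bin/dlog-viewer --limit 30 --pelt 1000 --json-export analysis.json capture.dlog

This restricts analysis to the first 30 seconds of capture.dlog, runs changepoint
detection on roughly 1000 samples per channel, and writes the detected intervals
and their mean values to analysis.json.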