-rw-r--r--   .gitignore                |   1
-rw-r--r--   lib/drift.py              | 339
-rw-r--r--   lib/functions.py          |   1
-rw-r--r--   lib/loader/energytrace.py |  42
-rw-r--r--   lib/loader/generic.py     | 115
-rw-r--r--   lib/loader/keysight.py    |   3
6 files changed, 443 insertions(+), 58 deletions(-)
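All of the behaviour added below is opt-in and controlled via environment variables. The following sketch collects the knobs that appear in this diff; it is an illustration rather than part of the commit, and the values are examples:

import os

# enable drift compensation of intermediate timestamps (generic.py)
os.environ["DFATOOL_COMPENSATE_DRIFT"] = "1"
# use the greedy algorithm instead of the layered-graph shortest path (drift.py)
# os.environ["DFATOOL_COMPENSATE_DRIFT_GREEDY"] = "1"
# fix a single PELT penalty instead of sweeping 1, 2, 5, 10, 15, 20 (drift.py)
os.environ["DFATOOL_DRIFT_COMPENSATION_PENALTY"] = "10"
# maximum number of consecutive undetected transitions bridged by skip edges (default: 2)
os.environ["DFATOOL_DC_MAX_SKIP"] = "2"
# debug dumps: changepoint candidates (drift.py) and sync timestamps (generic.py)
os.environ["DFATOOL_EXPORT_DRIFT_COMPENSATION"] = "/tmp/drift-debug"
os.environ["DFATOOL_EXPORT_LASYNC"] = "/tmp/lasync-debug"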
diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 /lib/__pycache__/
 *.pyc
 *.swp
+/cache/
 /htmlcov/
 /.coverage*
 .idea/
diff --git a/lib/drift.py b/lib/drift.py
new file mode 100644
index 0000000..cb769f4
--- /dev/null
+++ b/lib/drift.py
@@ -0,0 +1,339 @@
+#!/usr/bin/env python3
+
+import numpy as np
+import os
+import scipy.sparse
+import scipy.sparse.csgraph
+from bisect import bisect_left, bisect_right
+
+
+def compensate(data, timestamps, event_timestamps, offline_index=None):
+    """Use ruptures (e.g. Pelt, Dynp) to determine transition timestamps."""
+    from dfatool.pelt import PELT
+
+    # "rbf" and "l2" appear to work similarly well; l2 is faster. l1 is probably even better.
+    # PELT does not find changepoints for transitions which span just four or
+    # five data points (i.e., transitions shorter than ~2ms).
+    # Workaround: double the data rate passed to PELT by interpolation ("stretch=2")
+    pelt = PELT(with_multiprocessing=False, stretch=2, min_dist=1, cache_dir=None)
+    expected_transition_start_timestamps = event_timestamps[::2]
+    transition_start_candidate_weights = list()
+    drift = 0
+
+    # TODO also try determining candidates from the derivative
+    # (-> select via an environment variable)
+
+    pelt_traces = list()
+    range_timestamps = list()
+    candidate_weights = list()
+
+    for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+        expected_end_ts = event_timestamps[2 * i + 1]
+        # assumption: the maximum deviation between expected and actual timestamps is 5ms.
+        # We use ±10ms to have some context for PELT.
+        et_timestamps_start = bisect_left(timestamps, expected_start_ts - 10e-3)
+        et_timestamps_end = bisect_right(timestamps, expected_end_ts + 10e-3)
+        range_timestamps.append(timestamps[et_timestamps_start : et_timestamps_end + 1])
+        pelt_traces.append(data[et_timestamps_start : et_timestamps_end + 1])
+
+        # TODO for greedy mode, perform changepoint detection between greedy steps
+        # (-> the expected changepoint area is well-known, Dynp with 1/2 changepoints
+        # should work much better than "somewhere in these 20ms there should be a transition")
+
+    if os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY"):
+        penalties = (int(os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY")),)
+    else:
+        penalties = (1, 2, 5, 10, 15, 20)
+    for penalty in penalties:
+        changepoints_by_transition = pelt.get_changepoints(pelt_traces, penalty=penalty)
+        for i in range(len(expected_transition_start_timestamps)):
+            candidate_weights.append(dict())
+            for changepoint in changepoints_by_transition[i]:
+                if changepoint in candidate_weights[i]:
+                    candidate_weights[i][changepoint] += 1
+                else:
+                    candidate_weights[i][changepoint] = 1
+
+    for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+        expected_end_ts = event_timestamps[2 * i + 1]
+
+        # TODO is expected_start_ts really a good reference? If a transition is
+        # preceded by a UART dump, expected_end_ts is probably the better choice;
+        # the compensation step then has to map the result back to start_ts, though.
+        transition_start_candidate_weights.append(
+            list(
+                map(
+                    lambda k: (
+                        range_timestamps[i][k] - expected_start_ts,
+                        range_timestamps[i][k] - expected_end_ts,
+                        candidate_weights[i][k],
+                    ),
+                    sorted(candidate_weights[i].keys()),
+                )
+            )
+        )
+
+    if os.getenv("DFATOOL_COMPENSATE_DRIFT_GREEDY"):
+        return compensate_drift_greedy(
+            event_timestamps, transition_start_candidate_weights
+        )
+
+    return compensate_drift_graph(
+        event_timestamps,
+        transition_start_candidate_weights,
+        offline_index=offline_index,
+    )
+
+
+def compensate_drift_graph(
+    event_timestamps, transition_start_candidate_weights, offline_index=None
+):
+    # Algorithm: obtain the shortest path in a layered graph made up from
+    # transition candidates. Each node represents a transition candidate
+    # timestamp, and each layer represents a transition. Each node in layer i
+    # has a directed edge to each node in layer i+1. The edge weight is the
+    # drift delta between the two nodes. So, if node X (transition i,
+    # candidate a) has a drift of 5 and node Y (transition i+1, candidate b)
+    # has a drift of -2, the edge weight is 7.
+    # The first and last layer of the graph each consist of a single node
+    # with a drift of 0, representing the start and end synchronization
+    # pulse, respectively.
+
+    prev_nodes = [0]
+    prev_drifts = [0]
+    node_drifts = [0]
+    edge_srcs = list()
+    edge_dsts = list()
+    csr_weights = list()
+
+    # (transition index) -> [candidate 0/start node, candidate 0/end node, candidate 1/start node, ...]
+    nodes_by_transition_index = dict()
+
+    # (node number) -> (transition index, candidate index, is_end)
+    # (-> transition_start_candidate_weights[transition index][candidate index][is_end])
+    transition_by_node = dict()
+
+    compensated_timestamps = list()
+
+    # default: up to two nodes may be skipped
+    max_skip_count = 2
+
+    if os.getenv("DFATOOL_DC_MAX_SKIP"):
+        max_skip_count = int(os.getenv("DFATOOL_DC_MAX_SKIP"))
+
+    for transition_index, candidates in enumerate(transition_start_candidate_weights):
+        new_nodes = list()
+        new_drifts = list()
+        i_offset = prev_nodes[-1] + 1
+        nodes_by_transition_index[transition_index] = list()
+        for new_node_i, (new_drift_start, new_drift_end, _) in enumerate(candidates):
+            for is_end, new_drift in enumerate((new_drift_start, new_drift_end)):
+                new_node = i_offset + new_node_i * 2 + is_end
+                nodes_by_transition_index[transition_index].append(new_node)
+                transition_by_node[new_node] = (transition_index, new_node_i, is_end)
+                new_nodes.append(new_node)
+                new_drifts.append(new_drift)
+                node_drifts.append(new_drift)
+                for prev_node_i, prev_node in enumerate(prev_nodes):
+                    prev_drift = prev_drifts[prev_node_i]
+
+                    edge_srcs.append(prev_node)
+                    edge_dsts.append(new_node)
+
+                    delta_drift = np.abs(prev_drift - new_drift)
+                    # TODO evaluate "delta_drift ** 2" or similar nonlinear
+                    # weights -> further penalize large drift deltas
+                    csr_weights.append(delta_drift)
+
+        # a transition's candidate list may be empty
+        if len(new_nodes):
+            prev_nodes = new_nodes
+            prev_drifts = new_drifts
+
+    # add an end node for the shortest path search
+    # (end node == final sync, so drift == 0)
+    new_node = prev_nodes[-1] + 1
+    for prev_node_i, prev_node in enumerate(prev_nodes):
+        prev_drift = prev_drifts[prev_node_i]
+        edge_srcs.append(prev_node)
+        edge_dsts.append(new_node)
+        csr_weights.append(np.abs(prev_drift))
+
+    # Add "skip" edges spanning from transition i to transition i+n (n > 1).
+    # These avoid synchronization errors caused by transitions which are
+    # not found by changepoint detection, as long as they are sufficiently rare.
+    for transition_index, candidates in enumerate(transition_start_candidate_weights):
+        for skip_count in range(2, max_skip_count + 2):
+            if transition_index < skip_count:
+                continue
+            for from_node in nodes_by_transition_index[transition_index - skip_count]:
+                for to_node in nodes_by_transition_index[transition_index]:
+
+                    (from_trans_i, from_candidate_i, from_is_end) = transition_by_node[
+                        from_node
+                    ]
+                    to_trans_i, to_candidate_i, to_is_end = transition_by_node[to_node]
+
+                    assert transition_index - skip_count == from_trans_i
+                    assert transition_index == to_trans_i
+
+                    from_drift = transition_start_candidate_weights[from_trans_i][
+                        from_candidate_i
+                    ][from_is_end]
+                    to_drift = transition_start_candidate_weights[to_trans_i][
+                        to_candidate_i
+                    ][to_is_end]
+
+                    edge_srcs.append(from_node)
+                    edge_dsts.append(to_node)
+                    csr_weights.append(
+                        np.abs(from_drift - to_drift) + (skip_count - 1) * 270e-6
+                    )
+
+    sm = scipy.sparse.csr_matrix(
+        (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1)
+    )
+    dm, predecessors = scipy.sparse.csgraph.shortest_path(
+        sm, return_predecessors=True, indices=0
+    )
+
+    nodes = list()
+    pred = predecessors[-1]
+    while pred > 0:
+        nodes.append(pred)
+        pred = predecessors[pred]
+
+    nodes = list(reversed(nodes))
+
+    # The first and last node are not included in "nodes" as they represent
+    # the start/stop sync pulse (and not a transition with sync candidates).
+
+    prev_transition = -1
+    for i, node in enumerate(nodes):
+        transition, _, _ = transition_by_node[node]
+        drift = node_drifts[node]
+
+        while transition - prev_transition > 1:
+            prev_drift = node_drifts[nodes[i - 1]]
+            prev_transition += 1
+            expected_start_ts = event_timestamps[prev_transition * 2] + prev_drift
+            expected_end_ts = event_timestamps[prev_transition * 2 + 1] + prev_drift
+            compensated_timestamps.append(expected_start_ts)
+            compensated_timestamps.append(expected_end_ts)
+
+        expected_start_ts = event_timestamps[transition * 2] + drift
+        expected_end_ts = event_timestamps[transition * 2 + 1] + drift
+        compensated_timestamps.append(expected_start_ts)
+        compensated_timestamps.append(expected_end_ts)
+        prev_transition = transition
+
+    # handle skips over the last few transitions, if any
+    transition = len(transition_start_candidate_weights) - 1
+    while transition - prev_transition > 0:
+        prev_drift = node_drifts[nodes[-1]]
+        prev_transition += 1
+        expected_start_ts = event_timestamps[prev_transition * 2] + prev_drift
+        expected_end_ts = event_timestamps[prev_transition * 2 + 1] + prev_drift
+        compensated_timestamps.append(expected_start_ts)
+        compensated_timestamps.append(expected_end_ts)
+
+    if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
+        import json
+        from dfatool.utils import NpEncoder
+
+        expected_transition_start_timestamps = event_timestamps[::2]
+        filename = os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION")
+        filename = f"{filename}.{offline_index}"
+
+        with open(filename, "w") as f:
+            json.dump(
+                [
+                    expected_transition_start_timestamps,
+                    transition_start_candidate_weights,
+                ],
+                f,
+                cls=NpEncoder,
+            )
+
+    return compensated_timestamps
+
+
+def compensate_drift_greedy(event_timestamps, transition_start_candidate_weights):
+    drift = 0
+    expected_transition_start_timestamps = event_timestamps[::2]
+    compensated_timestamps = list()
+
+    for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+        candidates = sorted(
+            map(
+                lambda x: x[0] + expected_start_ts,
+                transition_start_candidate_weights[i],
+            )
+        )
+        expected_start_ts += drift
+        expected_end_ts = event_timestamps[2 * i + 1] + drift
+
+        # choose the nearest candidates around the expected sync points
+        start_right_sync = bisect_left(candidates, expected_start_ts)
+        start_left_sync = start_right_sync - 1
+
+        end_right_sync = bisect_left(candidates, expected_end_ts)
+        end_left_sync = end_right_sync - 1
+
+        if start_left_sync >= 0:
+            start_left_diff = expected_start_ts - candidates[start_left_sync]
+        else:
+            start_left_diff = np.inf
+
+        if start_right_sync < len(candidates):
+            start_right_diff = candidates[start_right_sync] - expected_start_ts
+        else:
+            start_right_diff = np.inf
+
+        if end_left_sync >= 0:
+            end_left_diff = expected_end_ts - candidates[end_left_sync]
+        else:
+            end_left_diff = np.inf
+
+        if end_right_sync < len(candidates):
+            end_right_diff = candidates[end_right_sync] - expected_end_ts
+        else:
+            end_right_diff = np.inf
+
+        drift_candidates = (
+            start_left_diff,
+            start_right_diff,
+            end_left_diff,
+            end_right_diff,
+        )
+        min_drift_i = np.argmin(drift_candidates)
+        min_drift = min(drift_candidates)
+
+        if min_drift < 5e-4:
+            if min_drift_i % 2 == 0:
+                # left
+                compensated_timestamps.append(expected_start_ts - min_drift)
+                compensated_timestamps.append(expected_end_ts - min_drift)
+                drift -= min_drift
+            else:
+                # right
+                compensated_timestamps.append(expected_start_ts + min_drift)
+                compensated_timestamps.append(expected_end_ts + min_drift)
+                drift += min_drift
+        else:
+            compensated_timestamps.append(expected_start_ts)
+            compensated_timestamps.append(expected_end_ts)
+
+    if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
+        import json
+        from dfatool.utils import NpEncoder
+
+        expected_transition_start_timestamps = event_timestamps[::2]
+
+        with open(os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"), "w") as f:
+            json.dump(
+                [
+                    expected_transition_start_timestamps,
+                    transition_start_candidate_weights,
+                ],
+                f,
+                cls=NpEncoder,
+            )
+
+    return compensated_timestamps
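compensate_drift_graph() reduces drift compensation to a single-source shortest-path query over a layered candidate graph stored as a scipy CSR matrix, followed by predecessor backtracking. A self-contained toy version of that technique; the graph layout and weights are made up for illustration:

import scipy.sparse
import scipy.sparse.csgraph

# Layered toy graph: node 0 = start pulse, nodes 1-2 = candidates for transition 0,
# nodes 3-4 = candidates for transition 1, node 5 = end pulse.
edge_srcs = [0, 0, 1, 1, 2, 2, 3, 4]
edge_dsts = [1, 2, 3, 4, 3, 4, 5, 5]
csr_weights = [0.1, 0.4, 0.2, 0.5, 0.3, 0.1, 0.2, 0.6]  # |drift delta| per edge

sm = scipy.sparse.csr_matrix((csr_weights, (edge_srcs, edge_dsts)), shape=(6, 6))
dist, predecessors = scipy.sparse.csgraph.shortest_path(
    sm, return_predecessors=True, indices=0
)

# backtrack from the end node, as in compensate_drift_graph()
nodes = list()
pred = predecessors[-1]
while pred > 0:
    nodes.append(pred)
    pred = predecessors[pred]
nodes = list(reversed(nodes))

print(nodes, dist[-1])  # [1, 3] 0.5: candidate 1 for transition 0, candidate 3 for transition 1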
diff --git a/lib/functions.py b/lib/functions.py
index 7e4a999..5c2e8ff 100644
--- a/lib/functions.py
+++ b/lib/functions.py
@@ -284,6 +284,7 @@ class SplitFunction(ModelFunction):
             {
                 "type": "split",
                 "paramIndex": self.param_index,
+                # TODO also provide paramName
                 "child": dict([[k, v.to_json()] for k, v in self.child.items()]),
             }
         )
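For context, a serialized split node has roughly the following shape. This is a sketch: base-class fields are omitted, the child entries depend on the serialized sub-models, and "paramName" is merely what the new TODO proposes:

sub_model = {"type": "static", "value": 1.2e-3}  # hypothetical serialized child
split_node = {
    "type": "split",
    "paramIndex": 0,  # position of the decision parameter in the parameter tuple
    # "paramName": "txpower",  # hypothetical, per the TODO
    "child": {"10": sub_model, "20": sub_model},  # one sub-model per observed value
}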
diff --git a/lib/loader/energytrace.py b/lib/loader/energytrace.py
index 103a4d6..d7f3f88 100644
--- a/lib/loader/energytrace.py
+++ b/lib/loader/energytrace.py
@@ -6,6 +6,7 @@ import numpy as np
 import os
 import re
 
+from dfatool.loader.generic import ExternalTimerSync
 from dfatool.utils import NpEncoder, soft_cast_int
 
 logger = logging.getLogger(__name__)
@@ -724,7 +725,7 @@ class EnergyTraceWithLogicAnalyzer:
         return energy_trace_new
 
 
-class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
+class EnergyTraceWithTimer(ExternalTimerSync):
     def __init__(
         self,
         voltage: float,
@@ -748,7 +749,10 @@
         self.with_traces = with_traces
         self.errors = list()
 
-        super().__init__(voltage, state_duration, transition_names, with_traces)
+        self.sync_min_duration = 0.7
+        self.sync_min_low_count = 3
+        self.sync_min_high_count = 1
+        self.sync_power = 0.011
 
     def load_data(self, log_data):
         self.sync_data = None
@@ -760,34 +764,6 @@
             self.hw_statechange_indexes,
         ) = _load_energytrace(log_data[0])
 
-    def analyze_states(self, traces, offline_index: int):
-
-        # Start "Synchronization pulse"
-        timestamps = [0, 10, 1e6, 1e6 + 10]
-
-        # The first trace doesn't start immediately; append the offset saved by OnboardTimerHarness
-        timestamps.append(timestamps[-1] + traces[0]["start_offset"][offline_index])
-        for tr in traces:
-            for t in tr["trace"]:
-                # print(t["online_aggregates"]["duration"][offline_index])
-                try:
-                    timestamps.append(
-                        timestamps[-1]
-                        + t["online_aggregates"]["duration"][offline_index]
-                    )
-                except IndexError:
-                    self.errors.append(
-                        f"""offline_index {offline_index} missing in trace {tr["id"]}"""
-                    )
-                    return list()
-
-        # Stop "Synchronization pulses". The first one has already started.
-        timestamps.extend(np.array([10, 1e6, 1e6 + 10]) + timestamps[-1])
-        timestamps.extend(np.array([0, 10, 1e6, 1e6 + 10]) + 250e3 + timestamps[-1])
-
-        timestamps = list(np.array(timestamps) * 1e-6)
-
-        from dfatool.lennart.SigrokInterface import SigrokResult
-
-        self.sync_data = SigrokResult(timestamps, False)
-        return super().analyze_states(traces, offline_index)
+        # for analyze_states
+        self.timestamps = self.interval_start_timestamp
+        self.data = self.interval_power
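EnergyTraceWithTimer now inherits analyze_states() from ExternalTimerSync (see lib/loader/generic.py below) and only supplies measurement data plus sync-pulse parameters. A minimal hypothetical subclass illustrating this contract; the class name and values are invented, the attribute names are those documented in generic.py:

from dfatool.loader.generic import ExternalTimerSync

class MyDataLogLoader(ExternalTimerSync):
    def __init__(self, timestamps, power):
        self.timestamps = timestamps  # sample timestamps [s], same length as power
        self.data = power  # power readings [W]
        self.errors = list()  # analyze_states() reports sync problems here

        self.sync_power = 0.011  # threshold separating sync pulse from idle [W]
        self.sync_min_duration = 0.7  # minimum sync pulse duration [s]
        self.sync_min_high_count = 1  # samples above threshold to accept a rising edge
        self.sync_min_low_count = 3  # samples below threshold to accept a falling edge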
diff --git a/lib/loader/generic.py b/lib/loader/generic.py
index fce823c..a34f486 100644
--- a/lib/loader/generic.py
+++ b/lib/loader/generic.py
@@ -1,39 +1,71 @@
 #!/usr/bin/env python3
 
+import json
+import logging
 import numpy as np
 import os
 
 from bisect import bisect_right
 
+from dfatool.utils import NpEncoder
+
+logger = logging.getLogger(__name__)
+
 
 class ExternalTimerSync:
     def __init__(self):
         raise NotImplementedError("must be implemented in sub-class")
 
+    def assert_sync_areas(self, sync_areas):
+        # may be implemented in sub-class
+        pass
+
+    def compensate_drift(self, data, timestamps, event_timestamps, offline_index=None):
+        # Adjust intermediate timestamps. There is a small error between consecutive
+        # measurements, again due to drift caused by random temperature fluctuation.
+        # The error increases with distance from the synchronization points: it is
+        # negligible at the start and end of the measurement and may be quite high
+        # around the middle. That is just the bound, though -- the error may also be
+        # low in the middle, with peaks elsewhere.
+        # As the start and stop timestamps have already been synchronized, only
+        # actual transition timestamps are adjusted here.
+        if os.getenv("DFATOOL_COMPENSATE_DRIFT"):
+            import dfatool.drift
+
+            return dfatool.drift.compensate(
+                data, timestamps, event_timestamps, offline_index=offline_index
+            )
+        return event_timestamps
+
     # very similar to DataProcessor.getStatesdfatool
     # requires:
     # * self.data (e.g. power readings)
     # * self.timestamps (timestamps in seconds)
+    # * self.sync_min_high_count, self.sync_min_low_count: outlier handling in synchronization pulse detection
     # * self.sync_power, self.sync_min_duration: synchronization pulse parameters.
     #   one pulse before the measurement, two pulses afterwards
     # expected_trace must contain online timestamps
     def analyze_states(self, expected_trace, repeat_id):
         sync_start = None
         sync_timestamps = list()
-        above_count = 0
-        below_count = 0
+        high_count = 0
+        low_count = 0
+        high_ts = None
+        low_ts = None
         for i, timestamp in enumerate(self.timestamps):
             power = self.data[i]
             if power > self.sync_power:
-                above_count += 1
-                below_count = 0
+                if high_count == 0:
+                    high_ts = timestamp
+                high_count += 1
+                low_count = 0
             else:
-                above_count = 0
-                below_count += 1
-
-            if above_count > 2 and sync_start is None:
-                sync_start = timestamp
-            elif below_count > 2 and sync_start is not None:
-                if timestamp - sync_start > self.sync_min_duration:
-                    sync_end = timestamp
+                if low_count == 0:
+                    low_ts = timestamp
+                high_count = 0
+                low_count += 1
+
+            if high_count >= self.sync_min_high_count and sync_start is None:
+                sync_start = high_ts
+            elif low_count >= self.sync_min_low_count and sync_start is not None:
+                if low_ts - sync_start > self.sync_min_duration:
+                    sync_end = low_ts
                     sync_timestamps.append((sync_start, sync_end))
                     sync_start = None
         print(sync_timestamps)
@@ -45,6 +77,8 @@ class ExternalTimerSync:
             self.errors.append(f"Synchronization pulses == {sync_timestamps}")
             return list()
 
+        self.assert_sync_areas(sync_timestamps)
+
         start_ts = sync_timestamps[0][1]
         end_ts = sync_timestamps[1][0]
@@ -52,16 +86,30 @@
         online_timestamps = [0, expected_trace[0]["start_offset"][repeat_id]]
 
         # remaining events from the end of the first transition (start of second state) to the end of the last observed state
-        for trace in expected_trace:
-            for word in trace["trace"]:
-                online_timestamps.append(
-                    online_timestamps[-1]
-                    + word["online_aggregates"]["duration"][repeat_id]
-                )
+        try:
+            for trace in expected_trace:
+                for word in trace["trace"]:
+                    online_timestamps.append(
+                        online_timestamps[-1]
+                        + word["online_aggregates"]["duration"][repeat_id]
+                    )
+        except IndexError:
+            self.errors.append(
+                f"""offline_index {repeat_id} missing in trace {trace["id"]}"""
+            )
+            return list()
 
         online_timestamps = np.array(online_timestamps) * 1e-6
         online_timestamps = (
-            online_timestamps * ((end_ts - start_ts) / online_timestamps[-1]) + start_ts
+            online_timestamps
+            * ((end_ts - start_ts) / (online_timestamps[-1] - online_timestamps[0]))
+            + start_ts
         )
+
+        # Drift compensation works on transition boundaries. Exclude the start of the first
+        # state and the end of the last state: those are defined to have zero drift anyway.
+        online_timestamps[1:-1] = self.compensate_drift(
+            self.data, self.timestamps, online_timestamps[1:-1], repeat_id
+        )
 
         trigger_edges = list()
@@ -166,9 +214,31 @@ class ExternalTimerSync:
         ):
             self.plot_sync(online_timestamps)  # <- plot traces with sync annotations
             # self.plot_sync(names)  # <- plot annotated traces (with state/transition names)
 
+        # TODO LASYNC -> SYNC
+        if os.getenv("DFATOOL_EXPORT_LASYNC") is not None:
+            filename = os.getenv("DFATOOL_EXPORT_LASYNC") + f"_{repeat_id}.json"
+            with open(filename, "w") as f:
+                json.dump(self._export_sync(online_timestamps), f, cls=NpEncoder)
+            logger.info(f"Exported sync timestamps to {filename}")
+
         return energy_trace
 
+    def _export_sync(self, online_timestamps):
+        # [(1st trans start, 1st trans stop), (2nd trans start, 2nd trans stop), ...]
+        sync_timestamps = list()
+
+        for i in range(1, len(online_timestamps) - 1, 2):
+            sync_timestamps.append((online_timestamps[i], online_timestamps[i + 1]))
+
+        # input timestamps
+        timestamps = self.timestamps
+
+        # input data, e.g. power
+        data = self.data
+
+        # TODO "power" -> "data"
+        return {"sync": sync_timestamps, "timestamps": timestamps, "power": data}
+
     def plot_sync(self, event_timestamps, annotateData=None):
         """
         Plots the power usage and the timestamps by logic analyzer
@@ -214,12 +284,7 @@ class ExternalTimerSync:
 
         plt.plot(self.timestamps, self.data, label="Leistung")
         plt.plot(self.timestamps, np.gradient(self.data), label="dP/dt")
-        plt.plot(
-            rectCurve_with_drift[0],
-            rectCurve_with_drift[1],
-            "-g",
-            label="Events",
-        )
+        plt.plot(rectCurve_with_drift[0], rectCurve_with_drift[1], "-g", label="Events")
 
         plt.xlabel("Zeit [s]")
         plt.ylabel("Leistung [W]")
diff --git a/lib/loader/keysight.py b/lib/loader/keysight.py
index 2243361..77330ad 100644
--- a/lib/loader/keysight.py
+++ b/lib/loader/keysight.py
@@ -66,6 +66,9 @@ class DLog(ExternalTimerSync):
         self.errors = list()
 
         self.sync_min_duration = 0.7
+        self.sync_min_low_count = 3
+        self.sync_min_high_count = 3
+        # TODO auto-detect
         self.sync_power = 10e-3
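The PELT wrapper used by lib/drift.py (dfatool.pelt.PELT) is not part of this diff. As a rough sketch of the underlying changepoint detection, here is the same penalty sweep and candidate weighting implemented directly with the ruptures library mentioned in compensate()'s docstring; the wrapper's stretch, min_dist, and caching behaviour is omitted:

import numpy as np
import ruptures

# Synthetic step signal; PELT's penalty is scale-dependent, hence the large step.
signal = np.concatenate([np.full(50, 1.0), np.full(50, 9.0)])
algo = ruptures.Pelt(model="l2", min_size=1).fit(signal)

candidate_weights = dict()
for penalty in (1, 2, 5, 10, 15, 20):  # same sweep as compensate()
    for changepoint in algo.predict(pen=penalty)[:-1]:  # last entry is just len(signal)
        candidate_weights[changepoint] = candidate_weights.get(changepoint, 0) + 1

print(candidate_weights)  # {50: 6}: sample index 50 is found at every penalty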