-rw-r--r--  .gitignore                  1
-rw-r--r--  lib/drift.py              339
-rw-r--r--  lib/functions.py            1
-rw-r--r--  lib/loader/energytrace.py   42
-rw-r--r--  lib/loader/generic.py      115
-rw-r--r--  lib/loader/keysight.py       3
6 files changed, 443 insertions, 58 deletions
diff --git a/.gitignore b/.gitignore
index b25e92e..cacf91c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
/lib/__pycache__/
*.pyc
*.swp
+/cache/
/htmlcov/
/.coverage*
.idea/
diff --git a/lib/drift.py b/lib/drift.py
new file mode 100644
index 0000000..cb769f4
--- /dev/null
+++ b/lib/drift.py
@@ -0,0 +1,339 @@
+#!/usr/bin/env python3
+
+import numpy as np
+import os
+import scipy.sparse
+import scipy.sparse.csgraph
+from bisect import bisect_left, bisect_right
+
+
+def compensate(data, timestamps, event_timestamps, offline_index=None):
+ """Use ruptures (e.g. Pelt, Dynp) to determine transition timestamps."""
+ from dfatool.pelt import PELT
+
+ # "rbf" und "l2" scheinen ähnlich gut zu funktionieren, l2 ist schneller. l1 ist wohl noch besser.
+ # PELT does not find changepoints for transitions which span just four or five data points (i.e., transitions shorter than ~2ms).
+ # Workaround: Double the data rate passed to PELT by interpolation ("stretch=2")
+ pelt = PELT(with_multiprocessing=False, stretch=2, min_dist=1, cache_dir=None)
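+    # Illustrative sketch of the stretch=2 workaround (assumed behaviour; the actual
+    # interpolation lives in dfatool.pelt): a trace such as [a, b, c] is resampled to
+    # roughly [a, (a+b)/2, b, (b+c)/2, c], so that short transitions cover enough
+    # data points for changepoint detection.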
+ expected_transition_start_timestamps = event_timestamps[::2]
+ transition_start_candidate_weights = list()
+
+    # TODO also try determining candidates via the derivative
+    # (-> add an environment variable to choose between the two approaches)
+
+ pelt_traces = list()
+ range_timestamps = list()
+ candidate_weights = list()
+
+ for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+ expected_end_ts = event_timestamps[2 * i + 1]
+ # assumption: maximum deviation between expected and actual timestamps is 5ms.
+        # We use ±10ms to have some context for PELT
+ et_timestamps_start = bisect_left(timestamps, expected_start_ts - 10e-3)
+ et_timestamps_end = bisect_right(timestamps, expected_end_ts + 10e-3)
+ range_timestamps.append(timestamps[et_timestamps_start : et_timestamps_end + 1])
+ pelt_traces.append(data[et_timestamps_start : et_timestamps_end + 1])
+
+ # TODO for greedy mode, perform changepoint detection between greedy steps
+ # (-> the expected changepoint area is well-known, Dynp with 1/2 changepoints
+ # should work much better than "somewhere in these 20ms there should be a transition")
+
+ if os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY"):
+ penalties = (int(os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY")),)
+ else:
+ penalties = (1, 2, 5, 10, 15, 20)
+    # one weight dict per transition; weights from all penalty runs accumulate here
+    for i in range(len(expected_transition_start_timestamps)):
+        candidate_weights.append(dict())
+
+    for penalty in penalties:
+        changepoints_by_transition = pelt.get_changepoints(pelt_traces, penalty=penalty)
+        for i in range(len(expected_transition_start_timestamps)):
+            for changepoint in changepoints_by_transition[i]:
+                if changepoint in candidate_weights[i]:
+                    candidate_weights[i][changepoint] += 1
+                else:
+                    candidate_weights[i][changepoint] = 1
+
+ for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+
+        # TODO is expected_start_ts really a good reference? If a UART dump precedes
+        # a transition, expected_end_ts is probably the better choice; the compensation
+        # step then has to convert back to start_ts, though.
+ transition_start_candidate_weights.append(
+ list(
+ map(
+ lambda k: (
+ range_timestamps[i][k] - expected_start_ts,
+ range_timestamps[i][k] - expected_end_ts,
+ candidate_weights[i][k],
+ ),
+ sorted(candidate_weights[i].keys()),
+ )
+ )
+ )
+
+ if os.getenv("DFATOOL_COMPENSATE_DRIFT_GREEDY"):
+ return compensate_drift_greedy(
+ event_timestamps, transition_start_candidate_weights
+ )
+
+ return compensate_drift_graph(
+ event_timestamps,
+ transition_start_candidate_weights,
+ offline_index=offline_index,
+ )
+
+
+def compensate_drift_graph(
+ event_timestamps, transition_start_candidate_weights, offline_index=None
+):
+    # Algorithm: Obtain the shortest path in a layered graph made up of
+    # transition candidates. Each node represents a transition candidate timestamp,
+    # and each layer represents a transition.
+    # Each node in layer i has a directed edge to each node in layer i+1.
+    # The edge weight is the drift delta between the two nodes: if node X
+    # (transition i, candidate a) has a drift of 5 and node Y (transition i+1,
+    # candidate b) has a drift of -2, the edge weight is 7.
+    # The first and last layers of the graph each consist of a single node
+    # with a drift of 0, representing the start and end synchronization pulse, respectively.
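+    #
+    # Worked example (hypothetical drift values): layer i has candidates with
+    # drifts [5, -1], layer i+1 has candidates with drifts [-2, 0]. The edge
+    # weights are |5-(-2)| = 7, |5-0| = 5, |-1-(-2)| = 1, and |-1-0| = 1, so the
+    # shortest path prefers candidate sequences whose drift changes slowly.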
+
+ prev_nodes = [0]
+ prev_drifts = [0]
+ node_drifts = [0]
+ edge_srcs = list()
+ edge_dsts = list()
+ csr_weights = list()
+
+ # (transition index) -> [candidate 0/start node, candidate 0/end node, candidate 1/start node, ...]
+ nodes_by_transition_index = dict()
+
+ # (node number) -> (transition index, candidate index, is_end)
+ # (-> transition_start_candidate_weights[transition index][candidate index][is_end])
+ transition_by_node = dict()
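+    # e.g. (illustrative values) transition_by_node[5] == (1, 0, 1) would mean:
+    # node 5 is the end-timestamp candidate (is_end == 1) of candidate 0 of transition 1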
+
+ compensated_timestamps = list()
+
+ # default: up to two nodes may be skipped
+ max_skip_count = 2
+
+ if os.getenv("DFATOOL_DC_MAX_SKIP"):
+ max_skip_count = int(os.getenv("DFATOOL_DC_MAX_SKIP"))
+
+ for transition_index, candidates in enumerate(transition_start_candidate_weights):
+ new_nodes = list()
+ new_drifts = list()
+ i_offset = prev_nodes[-1] + 1
+ nodes_by_transition_index[transition_index] = list()
+ for new_node_i, (new_drift_start, new_drift_end, _) in enumerate(candidates):
+ for is_end, new_drift in enumerate((new_drift_start, new_drift_end)):
+ new_node = i_offset + new_node_i * 2 + is_end
+ nodes_by_transition_index[transition_index].append(new_node)
+ transition_by_node[new_node] = (transition_index, new_node_i, is_end)
+ new_nodes.append(new_node)
+ new_drifts.append(new_drift)
+ node_drifts.append(new_drift)
+ for prev_node_i, prev_node in enumerate(prev_nodes):
+ prev_drift = prev_drifts[prev_node_i]
+
+ edge_srcs.append(prev_node)
+ edge_dsts.append(new_node)
+
+ delta_drift = np.abs(prev_drift - new_drift)
+ # TODO evaluate "delta_drift ** 2" or similar nonlinear
+ # weights -> further penalize large drift deltas
+ csr_weights.append(delta_drift)
+
+ # a transition's candidate list may be empty
+ if len(new_nodes):
+ prev_nodes = new_nodes
+ prev_drifts = new_drifts
+
+ # add an end node for shortest path search
+ # (end node == final sync, so drift == 0)
+ new_node = prev_nodes[-1] + 1
+ for prev_node_i, prev_node in enumerate(prev_nodes):
+ prev_drift = prev_drifts[prev_node_i]
+ edge_srcs.append(prev_node)
+ edge_dsts.append(new_node)
+ csr_weights.append(np.abs(prev_drift))
+
+ # Add "skip" edges spanning from transition i to transition i+n (n > 1).
+    # These avoid synchronization errors caused by transitions which are
+    # not found by changepoint detection, as long as they are sufficiently rare.
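+    # With the default max_skip_count = 2, skip edges span two or three layers,
+    # i.e. they bypass one or two transitions. Each skipped transition adds a flat
+    # 270 µs penalty to the edge weight, so skips are only taken when no plausible
+    # candidate chain exists in between.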
+ for transition_index, candidates in enumerate(transition_start_candidate_weights):
+ for skip_count in range(2, max_skip_count + 2):
+ if transition_index < skip_count:
+ continue
+ for from_node in nodes_by_transition_index[transition_index - skip_count]:
+ for to_node in nodes_by_transition_index[transition_index]:
+
+ (from_trans_i, from_candidate_i, from_is_end) = transition_by_node[
+ from_node
+ ]
+ to_trans_i, to_candidate_i, to_is_end = transition_by_node[to_node]
+
+ assert transition_index - skip_count == from_trans_i
+ assert transition_index == to_trans_i
+
+ from_drift = transition_start_candidate_weights[from_trans_i][
+ from_candidate_i
+ ][from_is_end]
+ to_drift = transition_start_candidate_weights[to_trans_i][
+ to_candidate_i
+ ][to_is_end]
+
+ edge_srcs.append(from_node)
+ edge_dsts.append(to_node)
+ csr_weights.append(
+ np.abs(from_drift - to_drift) + (skip_count - 1) * 270e-6
+ )
+
+ sm = scipy.sparse.csr_matrix(
+ (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1)
+ )
+ dm, predecessors = scipy.sparse.csgraph.shortest_path(
+ sm, return_predecessors=True, indices=0
+ )
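+    # dm holds the shortest distances from node 0 to every node; predecessors[-1]
+    # is the predecessor of the end node on that path. Walking the predecessor
+    # chain back to node 0 recovers the chosen candidate for each transition.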
+
+ nodes = list()
+ pred = predecessors[-1]
+ while pred > 0:
+ nodes.append(pred)
+ pred = predecessors[pred]
+
+ nodes = list(reversed(nodes))
+
+ # first and last node are not included in "nodes" as they represent
+ # the start/stop sync pulse (and not a transition with sync candidates)
+
+ prev_transition = -1
+ for i, node in enumerate(nodes):
+ transition, _, _ = transition_by_node[node]
+ drift = node_drifts[node]
+
+ while transition - prev_transition > 1:
+ prev_drift = node_drifts[nodes[i - 1]]
+ prev_transition += 1
+ expected_start_ts = event_timestamps[prev_transition * 2] + prev_drift
+ expected_end_ts = event_timestamps[prev_transition * 2 + 1] + prev_drift
+ compensated_timestamps.append(expected_start_ts)
+ compensated_timestamps.append(expected_end_ts)
+
+ expected_start_ts = event_timestamps[transition * 2] + drift
+ expected_end_ts = event_timestamps[transition * 2 + 1] + drift
+ compensated_timestamps.append(expected_start_ts)
+ compensated_timestamps.append(expected_end_ts)
+ prev_transition = transition
+
+ # handle skips over the last few transitions, if any
+ transition = len(transition_start_candidate_weights) - 1
+ while transition - prev_transition > 0:
+ prev_drift = node_drifts[nodes[-1]]
+ prev_transition += 1
+ expected_start_ts = event_timestamps[prev_transition * 2] + prev_drift
+ expected_end_ts = event_timestamps[prev_transition * 2 + 1] + prev_drift
+ compensated_timestamps.append(expected_start_ts)
+ compensated_timestamps.append(expected_end_ts)
+
+ if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
+ import json
+ from dfatool.utils import NpEncoder
+
+ expected_transition_start_timestamps = event_timestamps[::2]
+ filename = os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION")
+ filename = f"{filename}.{offline_index}"
+
+ with open(filename, "w") as f:
+ json.dump(
+ [
+ expected_transition_start_timestamps,
+ transition_start_candidate_weights,
+ ],
+ f,
+ cls=NpEncoder,
+ )
+
+ return compensated_timestamps
+
+
+def compensate_drift_greedy(event_timestamps, transition_start_candidate_weights):
+ drift = 0
+ expected_transition_start_timestamps = event_timestamps[::2]
+ compensated_timestamps = list()
+
+ for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
+ candidates = sorted(
+ map(
+ lambda x: x[0] + expected_start_ts,
+ transition_start_candidate_weights[i],
+ )
+ )
+ expected_start_ts += drift
+ expected_end_ts = event_timestamps[2 * i + 1] + drift
+
+ # choose the next candidates around the expected sync point.
+ start_right_sync = bisect_left(candidates, expected_start_ts)
+ start_left_sync = start_right_sync - 1
+
+ end_right_sync = bisect_left(candidates, expected_end_ts)
+ end_left_sync = end_right_sync - 1
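+        # e.g. (illustrative) candidates == [0.9993, 1.0004] and expected_start_ts == 1.0
+        # yields start_right_sync == 1 (candidate 1.0004) and start_left_sync == 0 (candidate 0.9993)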
+
+        if start_left_sync >= 0:
+ start_left_diff = expected_start_ts - candidates[start_left_sync]
+ else:
+ start_left_diff = np.inf
+
+ if start_right_sync < len(candidates):
+ start_right_diff = candidates[start_right_sync] - expected_start_ts
+ else:
+ start_right_diff = np.inf
+
+ if end_left_sync >= 0:
+ end_left_diff = expected_end_ts - candidates[end_left_sync]
+ else:
+ end_left_diff = np.inf
+
+ if end_right_sync < len(candidates):
+ end_right_diff = candidates[end_right_sync] - expected_end_ts
+ else:
+ end_right_diff = np.inf
+
+ drift_candidates = (
+ start_left_diff,
+ start_right_diff,
+ end_left_diff,
+ end_right_diff,
+ )
+ min_drift_i = np.argmin(drift_candidates)
+ min_drift = min(drift_candidates)
+
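+        # compensate only if the nearest candidate is within 500 µs of an expected
+        # timestamp; larger deviations are treated as missing changepoints, and the
+        # expected (drift-adjusted) timestamps are used as-is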
+ if min_drift < 5e-4:
+ if min_drift_i % 2 == 0:
+ # left
+ compensated_timestamps.append(expected_start_ts - min_drift)
+ compensated_timestamps.append(expected_end_ts - min_drift)
+ drift -= min_drift
+ else:
+ # right
+ compensated_timestamps.append(expected_start_ts + min_drift)
+ compensated_timestamps.append(expected_end_ts + min_drift)
+ drift += min_drift
+
+ else:
+ compensated_timestamps.append(expected_start_ts)
+ compensated_timestamps.append(expected_end_ts)
+
+ if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
+ import json
+ from dfatool.utils import NpEncoder
+
+ expected_transition_start_timestamps = event_timestamps[::2]
+
+ with open(os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"), "w") as f:
+ json.dump(
+ [
+ expected_transition_start_timestamps,
+ transition_start_candidate_weights,
+ ],
+ f,
+ cls=NpEncoder,
+ )
+
+ return compensated_timestamps
diff --git a/lib/functions.py b/lib/functions.py
index 7e4a999..5c2e8ff 100644
--- a/lib/functions.py
+++ b/lib/functions.py
@@ -284,6 +284,7 @@ class SplitFunction(ModelFunction):
{
"type": "split",
"paramIndex": self.param_index,
+                # TODO additionally store paramName
"child": dict([[k, v.to_json()] for k, v in self.child.items()]),
}
)
diff --git a/lib/loader/energytrace.py b/lib/loader/energytrace.py
index 103a4d6..d7f3f88 100644
--- a/lib/loader/energytrace.py
+++ b/lib/loader/energytrace.py
@@ -6,6 +6,7 @@ import numpy as np
import os
import re
+from dfatool.loader.generic import ExternalTimerSync
from dfatool.utils import NpEncoder, soft_cast_int
logger = logging.getLogger(__name__)
@@ -724,7 +725,7 @@ class EnergyTraceWithLogicAnalyzer:
return energy_trace_new
-class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
+class EnergyTraceWithTimer(ExternalTimerSync):
def __init__(
self,
voltage: float,
@@ -748,7 +749,10 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
self.with_traces = with_traces
self.errors = list()
- super().__init__(voltage, state_duration, transition_names, with_traces)
+ self.sync_min_duration = 0.7
+ self.sync_min_low_count = 3
+ self.sync_min_high_count = 1
+ self.sync_power = 0.011
def load_data(self, log_data):
self.sync_data = None
@@ -760,34 +764,6 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
self.hw_statechange_indexes,
) = _load_energytrace(log_data[0])
- def analyze_states(self, traces, offline_index: int):
-
- # Start "Synchronization pulse"
- timestamps = [0, 10, 1e6, 1e6 + 10]
-
- # The first trace doesn't start immediately, append offset saved by OnboarTimerHarness
- timestamps.append(timestamps[-1] + traces[0]["start_offset"][offline_index])
- for tr in traces:
- for t in tr["trace"]:
- # print(t["online_aggregates"]["duration"][offline_index])
- try:
- timestamps.append(
- timestamps[-1]
- + t["online_aggregates"]["duration"][offline_index]
- )
- except IndexError:
- self.errors.append(
- f"""offline_index {offline_index} missing in trace {tr["id"]}"""
- )
- return list()
-
- # Stop "Synchronization pulses". The first one has already started.
- timestamps.extend(np.array([10, 1e6, 1e6 + 10]) + timestamps[-1])
- timestamps.extend(np.array([0, 10, 1e6, 1e6 + 10]) + 250e3 + timestamps[-1])
-
- timestamps = list(np.array(timestamps) * 1e-6)
-
- from dfatool.lennart.SigrokInterface import SigrokResult
-
- self.sync_data = SigrokResult(timestamps, False)
- return super().analyze_states(traces, offline_index)
+ # for analyze_states
+ self.timestamps = self.interval_start_timestamp
+ self.data = self.interval_power
diff --git a/lib/loader/generic.py b/lib/loader/generic.py
index fce823c..a34f486 100644
--- a/lib/loader/generic.py
+++ b/lib/loader/generic.py
@@ -1,39 +1,71 @@
#!/usr/bin/env python3
+import json
+import logging
import numpy as np
import os
from bisect import bisect_right
+from dfatool.utils import NpEncoder
+
+logger = logging.getLogger(__name__)
class ExternalTimerSync:
def __init__(self):
raise NotImplementedError("must be implemented in sub-class")
+ def assert_sync_areas(self, sync_areas):
+ # may be implemented in sub-class
+ pass
+
+    def compensate_drift(self, data, timestamps, event_timestamps, offline_index=None):
+        # Adjust intermediate timestamps. There is a small error between consecutive
+        # measurements due to clock drift caused by random temperature fluctuation.
+        # The error bound grows with the distance from the synchronization points:
+        # it is negligible at the start and end of the measurement and may be quite
+        # high around the middle. These are only bounds, though; the error may just as
+        # well be low in the middle, with peaks elsewhere.
+        # As the start and stop timestamps have already been synchronized, only the
+        # actual transition timestamps are adjusted here.
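+        # Usage sketch: set the DFATOOL_COMPENSATE_DRIFT environment variable to
+        # enable changepoint-based compensation; otherwise, event_timestamps are
+        # returned unchanged.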
+ if os.getenv("DFATOOL_COMPENSATE_DRIFT"):
+ import dfatool.drift
+
+ return dfatool.drift.compensate(
+ data, timestamps, event_timestamps, offline_index=offline_index
+ )
+ return event_timestamps
+
# very similar to DataProcessor.getStatesdfatool
# requires:
# * self.data (e.g. power readings)
# * self.timestamps (timestamps in seconds)
+ # * self.sync_min_high_count, self.sync_min_low_count: outlier handling in synchronization pulse detection
# * self.sync_power, self.sync_min_duration: synchronization pulse parameters (one pulse before the measurement, two pulses afterwards)
# expected_trace must contain online timestamps
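+    # Illustrative sketch of the expected_trace layout assumed by this method
+    # (not a formal spec; durations and offsets are in µs, lists indexed by repeat_id):
+    #   expected_trace = [
+    #       {"id": ..., "start_offset": [...],
+    #        "trace": [{"online_aggregates": {"duration": [...]}}, ...]},
+    #       ...
+    #   ]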
def analyze_states(self, expected_trace, repeat_id):
sync_start = None
sync_timestamps = list()
- above_count = 0
- below_count = 0
+ high_count = 0
+ low_count = 0
+ high_ts = None
+ low_ts = None
for i, timestamp in enumerate(self.timestamps):
power = self.data[i]
if power > self.sync_power:
- above_count += 1
- below_count = 0
+ if high_count == 0:
+ high_ts = timestamp
+ high_count += 1
+ low_count = 0
else:
- above_count = 0
- below_count += 1
-
- if above_count > 2 and sync_start is None:
- sync_start = timestamp
- elif below_count > 2 and sync_start is not None:
- if timestamp - sync_start > self.sync_min_duration:
- sync_end = timestamp
+ if low_count == 0:
+ low_ts = timestamp
+ high_count = 0
+ low_count += 1
+
+ if high_count >= self.sync_min_high_count and sync_start is None:
+ sync_start = high_ts
+ elif low_count >= self.sync_min_low_count and sync_start is not None:
+ if low_ts - sync_start > self.sync_min_duration:
+ sync_end = low_ts
sync_timestamps.append((sync_start, sync_end))
sync_start = None
print(sync_timestamps)
@@ -45,6 +77,8 @@ class ExternalTimerSync:
self.errors.append(f"Synchronization pulses == {sync_timestamps}")
return list()
+ self.assert_sync_areas(sync_timestamps)
+
start_ts = sync_timestamps[0][1]
end_ts = sync_timestamps[1][0]
@@ -52,16 +86,30 @@ class ExternalTimerSync:
online_timestamps = [0, expected_trace[0]["start_offset"][repeat_id]]
# remaining events from the end of the first transition (start of second state) to the end of the last observed state
- for trace in expected_trace:
- for word in trace["trace"]:
- online_timestamps.append(
- online_timestamps[-1]
- + word["online_aggregates"]["duration"][repeat_id]
- )
+ try:
+ for trace in expected_trace:
+ for word in trace["trace"]:
+ online_timestamps.append(
+ online_timestamps[-1]
+ + word["online_aggregates"]["duration"][repeat_id]
+ )
+ except IndexError:
+ self.errors.append(
+ f"""offline_index {repeat_id} missing in trace {trace["id"]}"""
+ )
+ return list()
online_timestamps = np.array(online_timestamps) * 1e-6
online_timestamps = (
- online_timestamps * ((end_ts - start_ts) / online_timestamps[-1]) + start_ts
+ online_timestamps
+ * ((end_ts - start_ts) / (online_timestamps[-1] - online_timestamps[0]))
+ + start_ts
+ )
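+    # Linear rescaling: the first online timestamp is mapped onto start_ts (end of
+    # the first sync pulse) and the last one onto end_ts (start of the second sync
+    # pulse); everything in between is scaled accordingly.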
+
+ # drift compensation works on transition boundaries. Exclude start of first state and end of last state.
+ # Those are defined to have zero drift anyways.
+ online_timestamps[1:-1] = self.compensate_drift(
+ self.data, self.timestamps, online_timestamps[1:-1], repeat_id
)
trigger_edges = list()
@@ -166,9 +214,31 @@ class ExternalTimerSync:
):
self.plot_sync(online_timestamps) # <- plot traces with sync annotations
# self.plot_sync(names) # <- plot annotated traces (with state/transition names)
+ # TODO LASYNC -> SYNC
+ if os.getenv("DFATOOL_EXPORT_LASYNC") is not None:
+ filename = os.getenv("DFATOOL_EXPORT_LASYNC") + f"_{repeat_id}.json"
+ with open(filename, "w") as f:
+ json.dump(self._export_sync(online_timestamps), f, cls=NpEncoder)
+            logger.info(f"Exported sync timestamps to {filename}")
return energy_trace
+ def _export_sync(self, online_timestamps):
+ # [(1st trans start, 1st trans stop), (2nd trans start, 2nd trans stop), ...]
+ sync_timestamps = list()
+
+ for i in range(1, len(online_timestamps) - 1, 2):
+ sync_timestamps.append((online_timestamps[i], online_timestamps[i + 1]))
+
+ # input timestamps
+ timestamps = self.timestamps
+
+ # input data, e.g. power
+ data = self.data
+
+ # TODO "power" -> "data"
+ return {"sync": sync_timestamps, "timestamps": timestamps, "power": data}
+
def plot_sync(self, event_timestamps, annotateData=None):
"""
Plots the power usage and the timestamps by logic analyzer
@@ -214,12 +284,7 @@ class ExternalTimerSync:
plt.plot(self.timestamps, self.data, label="Leistung")
plt.plot(self.timestamps, np.gradient(self.data), label="dP/dt")
- plt.plot(
- rectCurve_with_drift[0],
- rectCurve_with_drift[1],
- "-g",
- label="Events",
- )
+ plt.plot(rectCurve_with_drift[0], rectCurve_with_drift[1], "-g", label="Events")
plt.xlabel("Zeit [s]")
plt.ylabel("Leistung [W]")
diff --git a/lib/loader/keysight.py b/lib/loader/keysight.py
index 2243361..77330ad 100644
--- a/lib/loader/keysight.py
+++ b/lib/loader/keysight.py
@@ -66,6 +66,9 @@ class DLog(ExternalTimerSync):
self.errors = list()
self.sync_min_duration = 0.7
+ self.sync_min_low_count = 3
+ self.sync_min_high_count = 3
+
# TODO auto-detect
self.sync_power = 10e-3