summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/lennart/DataInterface.py24
-rw-r--r--lib/lennart/DataProcessor.py825
-rw-r--r--lib/lennart/EnergyInterface.py89
-rw-r--r--lib/lennart/SigrokAPIInterface.py150
-rw-r--r--lib/lennart/SigrokCLIInterface.py2
-rw-r--r--lib/lennart/SigrokInterface.py5
6 files changed, 5 insertions, 1090 deletions
diff --git a/lib/lennart/DataInterface.py b/lib/lennart/DataInterface.py
deleted file mode 100644
index 4495db2..0000000
--- a/lib/lennart/DataInterface.py
+++ /dev/null
@@ -1,24 +0,0 @@
-class DataInterface:
- def runMeasure(self):
- """
- Implemented in subclasses.
-
- Starts the measurement
- """
- raise NotImplementedError("The method not implemented")
-
- def getData(self):
- """
- Implemented in subclasses
-
- :returns: gathered data
- """
- raise NotImplementedError("The method not implemented")
-
- def forceStopMeasure(self):
- """
- Implemented in subclasses
-
- Force stops the measurement
- """
- raise NotImplementedError("The method not implemented")
diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py
deleted file mode 100644
index c211beb..0000000
--- a/lib/lennart/DataProcessor.py
+++ /dev/null
@@ -1,825 +0,0 @@
-#!/usr/bin/env python3
-
-# XXX deprecated and unused
-import numpy as np
-import logging
-import os
-import scipy
-from bisect import bisect_left, bisect_right
-
-logger = logging.getLogger(__name__)
-
-
-class DataProcessor:
- def __init__(
- self,
- sync_data,
- et_timestamps,
- et_power,
- hw_statechange_indexes=list(),
- offline_index=None,
- ):
- """
- Creates DataProcessor object.
-
- :param sync_data: input timestamps (SigrokResult)
- :param energy_data: List of EnergyTrace datapoints
- """
- self.raw_sync_timestamps = []
- # high-precision LA/Timer timestamps at synchronization events
- self.sync_timestamps = []
- # low-precision energytrace timestamps
- self.et_timestamps = et_timestamps
- # energytrace power values
- self.et_power_values = et_power
- self.hw_statechange_indexes = hw_statechange_indexes
- self.offline_index = offline_index
- self.sync_data = sync_data
- self.start_offset = 0
-
- # TODO determine automatically based on minimum (or p1) power draw over measurement area + X
- # use 0.02 for HFXT runs
- self.power_sync_watt = 0.011
- self.power_sync_len = 0.7
- self.power_sync_max_outliers = 2
-
- def run(self):
- """
- Main Function to remove unwanted data, get synchronization points, add the offset and add drift.
- :return: None
- """
-
- # Remove bogus data before / after the measurement
-
- time_stamp_data = self.sync_data
- for x in range(1, len(time_stamp_data)):
- if time_stamp_data[x] - time_stamp_data[x - 1] > 1.3:
- time_stamp_data = time_stamp_data[x:]
- break
-
- for x in reversed(range(1, len(time_stamp_data))):
- if time_stamp_data[x] - time_stamp_data[x - 1] > 1.3:
- time_stamp_data = time_stamp_data[:x]
- break
-
- # Each synchronization pulse consists of two LogicAnalyzer pulses, so four
- # entries in time_stamp_data (rising edge, falling edge, rising edge, falling edge).
- # If we have less then twelve entries, we observed no transitions and don't even have
- # valid synchronization data. In this case, we bail out.
- if len(time_stamp_data) < 12:
- raise RuntimeError(
- f"LogicAnalyzer sync data has length {len(time_stamp_data)}, expected >= 12"
- )
-
- self.raw_sync_timestamps = time_stamp_data
-
- # NEW
- datasync_timestamps = []
- sync_start = 0
- outliers = 0
- pre_outliers_ts = None
- # TODO only consider the first few and the last few seconds for sync points
- for i, timestamp in enumerate(self.et_timestamps):
- power = self.et_power_values[i]
- if power > 0:
- if power > self.power_sync_watt:
- if sync_start is None:
- sync_start = timestamp
- outliers = 0
- else:
- # Sync point over or outliers
- if outliers == 0:
- pre_outliers_ts = timestamp
- outliers += 1
- if outliers > self.power_sync_max_outliers:
- if sync_start is not None:
- if (pre_outliers_ts - sync_start) > self.power_sync_len:
- datasync_timestamps.append(
- (sync_start, pre_outliers_ts)
- )
- sync_start = None
-
- if power > self.power_sync_watt:
- if (self.et_timestamps[-1] - sync_start) > self.power_sync_len:
- datasync_timestamps.append((sync_start, pre_outliers_ts))
-
- # time_stamp_data contains an entry for each level change on the Logic Analyzer input.
- # So, time_stamp_data[0] is the first low-to-high transition, time_stamp_data[2] the second, etc.
- # -> time_stamp_data[2] is the low-to-high transition indicating the end of the first sync pulse
- # -> time_stamp_data[-8] is the low-to-high transition indicating the start of the first after-measurement sync pulse
-
- start_timestamp = datasync_timestamps[0][1]
- start_offset = start_timestamp - time_stamp_data[2]
-
- end_timestamp = datasync_timestamps[-2][0]
- end_offset = end_timestamp - (time_stamp_data[-8] + start_offset)
- logger.debug(
- f"Iteration #{self.offline_index}: Measurement area: ET timestamp range [{start_timestamp}, {end_timestamp}]"
- )
- logger.debug(
- f"Iteration #{self.offline_index}: Measurement area: LA timestamp range [{time_stamp_data[2]}, {time_stamp_data[-8]}]"
- )
- logger.debug(
- f"Iteration #{self.offline_index}: Start/End offsets: {start_offset} / {end_offset}"
- )
-
- if abs(end_offset) > 10:
- raise RuntimeError(
- f"Iteration #{self.offline_index}: synchronization end_offset == {end_offset}. It should be no more than a few seconds."
- )
-
- # adjust start offset
- with_offset = np.array(time_stamp_data) + start_offset
- logger.debug(
- f"Iteration #{self.offline_index}: Measurement area with offset: LA timestamp range [{with_offset[2]}, {with_offset[-8]}]"
- )
-
- # adjust stop offset (may be different from start offset due to drift caused by
- # random temperature fluctuations)
- with_drift = self.addDrift(
- with_offset, end_timestamp, end_offset, start_timestamp
- )
- logger.debug(
- f"Iteration #{self.offline_index}: Measurement area with drift: LA timestamp range [{with_drift[2]}, {with_drift[-8]}]"
- )
-
- self.sync_timestamps = with_drift
-
- # adjust intermediate timestamps. There is a small error between consecutive measurements,
- # again due to drift caused by random temperature fluctuation. The error increases with
- # increased distance from synchronization points: It is negligible at the start and end
- # of the measurement and may be quite high around the middle. That's just the bounds, though --
- # you may also have a low error in the middle and error peaks elsewhere.
- # As the start and stop timestamps have already been synchronized, we only adjust
- # actual transition timestamps here.
- if os.getenv("DFATOOL_COMPENSATE_DRIFT"):
- if len(self.hw_statechange_indexes):
- # measurement was performed with EnergyTrace++
- # (i.e., with cpu state annotations)
- with_drift_compensation = self.compensateDriftPlusplus(with_drift[4:-8])
- else:
- with_drift_compensation = self.compensateDrift(with_drift[4:-8])
- try:
- self.sync_timestamps[4:-8] = with_drift_compensation
- except ValueError:
- logger.error(
- f"Iteration #{self.offline_index}: drift-compensated sequence is too short ({len(with_drift_compensation)}/{len(self.sync_timestamps[4:-8])-1})"
- )
- raise
-
- def addDrift(self, input_timestamps, end_timestamp, end_offset, start_timestamp):
- """
- Add drift to datapoints
-
- :param input_timestamps: List of timestamps (float list)
- :param end_timestamp: Timestamp of first EnergyTrace datapoint at the second-to-last sync point
- :param end_offset: the time between end_timestamp and the timestamp of synchronisation signal
- :param start_timestamp: Timestamp of last EnergyTrace datapoint at the first sync point
- :return: List of modified timestamps (float list)
- """
- endFactor = 1 + (end_offset / ((end_timestamp - end_offset) - start_timestamp))
- # endFactor assumes that the end of the first sync pulse is at timestamp 0.
- # Then, timestamps with drift := timestamps * endFactor.
- # As this is not the case (the first sync pulse ends at start_timestamp > 0), we shift the data by first
- # removing start_timestamp, then multiplying with endFactor, and then re-adding the start_timestamp.
- sync_timestamps_with_drift = (
- input_timestamps - start_timestamp
- ) * endFactor + start_timestamp
- return sync_timestamps_with_drift
-
- def compensateDriftPlusplus(self, sync_timestamps):
- """Use hardware state changes reported by EnergyTrace++ to determine transition timestamps."""
- expected_transition_start_timestamps = sync_timestamps[::2]
- compensated_timestamps = list()
- drift = 0
- for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
- expected_end_ts = sync_timestamps[i * 2 + 1]
- et_timestamps_start = bisect_left(
- self.et_timestamps, expected_start_ts - 5e-3
- )
- et_timestamps_end = bisect_right(
- self.et_timestamps, expected_start_ts + 5e-3
- )
-
- candidate_indexes = list()
- for index in self.hw_statechange_indexes:
- if et_timestamps_start <= index <= et_timestamps_end:
- candidate_indexes.append(index)
-
- if len(candidate_indexes) == 2:
- drift = self.et_timestamps[candidate_indexes[0]] - expected_start_ts
-
- compensated_timestamps.append(expected_start_ts + drift)
- compensated_timestamps.append(expected_end_ts + drift)
-
- return compensated_timestamps
-
- def compensateDrift(self, sync_timestamps):
- """Use ruptures (e.g. Pelt, Dynp) to determine transition timestamps."""
- from dfatool.pelt import PELT
-
- # "rbf" und "l2" scheinen ähnlich gut zu funktionieren, l2 ist schneller. l1 ist wohl noch besser.
- # PELT does not find changepoints for transitions which span just four or five data points (i.e., transitions shorter than ~2ms).
- # Workaround: Double the data rate passed to PELT by interpolation ("stretch=2")
- pelt = PELT(with_multiprocessing=False, stretch=2, min_dist=1)
- expected_transition_start_timestamps = sync_timestamps[::2]
- transition_start_candidate_weights = list()
- drift = 0
-
- # TODO auch Kandidatenbestimmung per Ableitung probieren
- # (-> Umgebungsvariable zur Auswahl)
-
- pelt_traces = list()
- timestamps = list()
- candidate_weights = list()
-
- for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
- expected_end_ts = sync_timestamps[2 * i + 1]
- # assumption: maximum deviation between expected and actual timestamps is 5ms.
- # We use ±10ms to have some contetx for PELT
- et_timestamps_start = bisect_left(
- self.et_timestamps, expected_start_ts - 10e-3
- )
- et_timestamps_end = bisect_right(
- self.et_timestamps, expected_end_ts + 10e-3
- )
- timestamps.append(
- self.et_timestamps[et_timestamps_start : et_timestamps_end + 1]
- )
- pelt_traces.append(
- self.et_power_values[et_timestamps_start : et_timestamps_end + 1]
- )
-
- # TODO for greedy mode, perform changepoint detection between greedy steps
- # (-> the expected changepoint area is well-known, Dynp with 1/2 changepoints
- # should work much better than "somewhere in these 20ms there should be a transition")
-
- if os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY"):
- penalties = (int(os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY")),)
- else:
- penalties = (1, 2, 5, 10, 15, 20)
- for penalty in penalties:
- changepoints_by_transition = pelt.get_changepoints(
- pelt_traces, penalty=penalty
- )
- for i in range(len(expected_transition_start_timestamps)):
- candidate_weights.append(dict())
- for changepoint in changepoints_by_transition[i]:
- if changepoint in candidate_weights[i]:
- candidate_weights[i][changepoint] += 1
- else:
- candidate_weights[i][changepoint] = 1
-
- for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
-
- # TODO ist expected_start_ts wirklich eine gute Referenz? Wenn vor einer Transition ein UART-Dump
- # liegt, dürfte expected_end_ts besser sein, dann muss allerdings bei der compensation wieder auf
- # start_ts zurückgerechnet werden.
- transition_start_candidate_weights.append(
- list(
- map(
- lambda k: (
- timestamps[i][k] - expected_start_ts,
- timestamps[i][k] - expected_end_ts,
- candidate_weights[i][k],
- ),
- sorted(candidate_weights[i].keys()),
- )
- )
- )
-
- if os.getenv("DFATOOL_COMPENSATE_DRIFT_GREEDY"):
- return self.compensate_drift_greedy(
- sync_timestamps, transition_start_candidate_weights
- )
-
- return self.compensate_drift_graph(
- sync_timestamps, transition_start_candidate_weights
- )
-
- def compensate_drift_graph(
- self, sync_timestamps, transition_start_candidate_weights
- ):
- # Algorithm: Obtain the shortest path in a layered graph made up from
- # transition candidates. Each node represents a transition candidate timestamp, and each layer represents a transition.
- # Each node in layer i contains a directed edge to each node in layer i+1.
- # The edge weight is the drift delta between the two nodes. So, if,
- # node X (transition i, candidate a) has a drift of 5, and node Y
- # (transition i+1, candidate b) has a drift of -2, the weight is 7.
- # The first and last layer of the graph consists of a single node
- # with a drift of 0, representing the start / end synchronization pulse, respectively.
-
- prev_nodes = [0]
- prev_drifts = [0]
- node_drifts = [0]
- edge_srcs = list()
- edge_dsts = list()
- csr_weights = list()
-
- # (transition index) -> [candidate 0/start node, candidate 0/end node, candidate 1/start node, ...]
- nodes_by_transition_index = dict()
-
- # (node number) -> (transition index, candidate index, is_end)
- # (-> transition_start_candidate_weights[transition index][candidate index][is_end])
- transition_by_node = dict()
-
- compensated_timestamps = list()
-
- # default: up to two nodes may be skipped
- max_skip_count = 2
-
- if os.getenv("DFATOOL_DC_MAX_SKIP"):
- max_skip_count = int(os.getenv("DFATOOL_DC_MAX_SKIP"))
-
- for transition_index, candidates in enumerate(
- transition_start_candidate_weights
- ):
- new_nodes = list()
- new_drifts = list()
- i_offset = prev_nodes[-1] + 1
- nodes_by_transition_index[transition_index] = list()
- for new_node_i, (new_drift_start, new_drift_end, _) in enumerate(
- candidates
- ):
- for is_end, new_drift in enumerate((new_drift_start, new_drift_end)):
- new_node = i_offset + new_node_i * 2 + is_end
- nodes_by_transition_index[transition_index].append(new_node)
- transition_by_node[new_node] = (
- transition_index,
- new_node_i,
- is_end,
- )
- new_nodes.append(new_node)
- new_drifts.append(new_drift)
- node_drifts.append(new_drift)
- for prev_node_i, prev_node in enumerate(prev_nodes):
- prev_drift = prev_drifts[prev_node_i]
-
- edge_srcs.append(prev_node)
- edge_dsts.append(new_node)
-
- delta_drift = np.abs(prev_drift - new_drift)
- # TODO evaluate "delta_drift ** 2" or similar nonlinear
- # weights -> further penalize large drift deltas
- csr_weights.append(delta_drift)
-
- # a transition's candidate list may be empty
- if len(new_nodes):
- prev_nodes = new_nodes
- prev_drifts = new_drifts
-
- # add an end node for shortest path search
- # (end node == final sync, so drift == 0)
- new_node = prev_nodes[-1] + 1
- for prev_node_i, prev_node in enumerate(prev_nodes):
- prev_drift = prev_drifts[prev_node_i]
- edge_srcs.append(prev_node)
- edge_dsts.append(new_node)
- csr_weights.append(np.abs(prev_drift))
-
- # Add "skip" edges spanning from transition i to transition i+n (n > 1).
- # These avoid synchronization errors caused by transitions wich are
- # not found by changepiont detection, as long as they are sufficiently rare.
- for transition_index, candidates in enumerate(
- transition_start_candidate_weights
- ):
- for skip_count in range(2, max_skip_count + 2):
- if transition_index < skip_count:
- continue
- for from_node in nodes_by_transition_index[
- transition_index - skip_count
- ]:
- for to_node in nodes_by_transition_index[transition_index]:
-
- (
- from_trans_i,
- from_candidate_i,
- from_is_end,
- ) = transition_by_node[from_node]
- to_trans_i, to_candidate_i, to_is_end = transition_by_node[
- to_node
- ]
-
- assert transition_index - skip_count == from_trans_i
- assert transition_index == to_trans_i
-
- from_drift = transition_start_candidate_weights[from_trans_i][
- from_candidate_i
- ][from_is_end]
- to_drift = transition_start_candidate_weights[to_trans_i][
- to_candidate_i
- ][to_is_end]
-
- edge_srcs.append(from_node)
- edge_dsts.append(to_node)
- csr_weights.append(
- np.abs(from_drift - to_drift) + (skip_count - 1) * 270e-6
- )
-
- sm = scipy.sparse.csr_matrix(
- (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1)
- )
- dm, predecessors = scipy.sparse.csgraph.shortest_path(
- sm, return_predecessors=True, indices=0
- )
-
- nodes = list()
- pred = predecessors[-1]
- while pred > 0:
- nodes.append(pred)
- pred = predecessors[pred]
-
- nodes = list(reversed(nodes))
-
- # first and last node are not included in "nodes" as they represent
- # the start/stop sync pulse (and not a transition with sync candidates)
-
- prev_transition = -1
- for i, node in enumerate(nodes):
- transition, _, _ = transition_by_node[node]
- drift = node_drifts[node]
-
- while transition - prev_transition > 1:
- prev_drift = node_drifts[nodes[i - 1]]
- prev_transition += 1
- expected_start_ts = sync_timestamps[prev_transition * 2] + prev_drift
- expected_end_ts = sync_timestamps[prev_transition * 2 + 1] + prev_drift
- compensated_timestamps.append(expected_start_ts)
- compensated_timestamps.append(expected_end_ts)
-
- if os.getenv("DFATOOL_PLOT_LASYNC") and self.offline_index == int(
- os.getenv("DFATOOL_PLOT_LASYNC")
- ):
- print(
- f"trans {transition:3d}: raw ({sync_timestamps[transition * 2]:.6f}, {sync_timestamps[transition * 2 + 1]:.6f}), candidate {transition_by_node[node]}"
- )
- print(
- f"trans {transition:3d} -> ({sync_timestamps[transition * 2] + drift:.6f}, {sync_timestamps[transition * 2 + 1] + drift:.6f})"
- )
-
- expected_start_ts = sync_timestamps[transition * 2] + drift
- expected_end_ts = sync_timestamps[transition * 2 + 1] + drift
- compensated_timestamps.append(expected_start_ts)
- compensated_timestamps.append(expected_end_ts)
- prev_transition = transition
-
- # handle skips over the last few transitions, if any
- transition = len(transition_start_candidate_weights) - 1
- while transition - prev_transition > 0:
- prev_drift = node_drifts[nodes[-1]]
- prev_transition += 1
- expected_start_ts = sync_timestamps[prev_transition * 2] + prev_drift
- expected_end_ts = sync_timestamps[prev_transition * 2 + 1] + prev_drift
- compensated_timestamps.append(expected_start_ts)
- compensated_timestamps.append(expected_end_ts)
-
- if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
- import json
- from dfatool.utils import NpEncoder
-
- expected_transition_start_timestamps = sync_timestamps[::2]
- filename = os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION")
- filename = f"{filename}.{self.offline_index}"
-
- with open(filename, "w") as f:
- json.dump(
- [
- expected_transition_start_timestamps,
- transition_start_candidate_weights,
- ],
- f,
- cls=NpEncoder,
- )
-
- return compensated_timestamps
-
- def compensate_drift_greedy(
- self, sync_timestamps, transition_start_candidate_weights
- ):
- drift = 0
- expected_transition_start_timestamps = sync_timestamps[::2]
- compensated_timestamps = list()
-
- for i, expected_start_ts in enumerate(expected_transition_start_timestamps):
- candidates = sorted(
- map(
- lambda x: x[0] + expected_start_ts,
- transition_start_candidate_weights[i],
- )
- )
- expected_start_ts += drift
- expected_end_ts = sync_timestamps[2 * i + 1] + drift
-
- # choose the next candidates around the expected sync point.
- start_right_sync = bisect_left(candidates, expected_start_ts)
- start_left_sync = start_right_sync - 1
-
- end_right_sync = bisect_left(candidates, expected_end_ts)
- end_left_sync = end_right_sync - 1
-
- if start_right_sync >= 0:
- start_left_diff = expected_start_ts - candidates[start_left_sync]
- else:
- start_left_diff = np.inf
-
- if start_right_sync < len(candidates):
- start_right_diff = candidates[start_right_sync] - expected_start_ts
- else:
- start_right_diff = np.inf
-
- if end_left_sync >= 0:
- end_left_diff = expected_end_ts - candidates[end_left_sync]
- else:
- end_left_diff = np.inf
-
- if end_right_sync < len(candidates):
- end_right_diff = candidates[end_right_sync] - expected_end_ts
- else:
- end_right_diff = np.inf
-
- drift_candidates = (
- start_left_diff,
- start_right_diff,
- end_left_diff,
- end_right_diff,
- )
- min_drift_i = np.argmin(drift_candidates)
- min_drift = min(drift_candidates)
-
- if min_drift < 5e-4:
- if min_drift_i % 2 == 0:
- # left
- compensated_timestamps.append(expected_start_ts - min_drift)
- compensated_timestamps.append(expected_end_ts - min_drift)
- drift -= min_drift
- else:
- # right
- compensated_timestamps.append(expected_start_ts + min_drift)
- compensated_timestamps.append(expected_end_ts + min_drift)
- drift += min_drift
-
- else:
- compensated_timestamps.append(expected_start_ts)
- compensated_timestamps.append(expected_end_ts)
-
- if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"):
- import json
- from dfatool.utils import NpEncoder
-
- expected_transition_start_timestamps = sync_timestamps[::2]
-
- with open(os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"), "w") as f:
- json.dump(
- [
- expected_transition_start_timestamps,
- transition_start_candidate_weights,
- ],
- f,
- cls=NpEncoder,
- )
-
- return compensated_timestamps
-
- def export_sync(self):
- # [1st trans start, 1st trans stop, 2nd trans start, 2nd trans stop, ...]
- sync_timestamps = list()
-
- for i in range(4, len(self.sync_timestamps) - 8, 2):
- sync_timestamps.append(
- (self.sync_timestamps[i], self.sync_timestamps[i + 1])
- )
-
- # EnergyTrace timestamps
- timestamps = self.et_timestamps
-
- # EnergyTrace power values
- power = self.et_power_values
-
- return {"sync": sync_timestamps, "timestamps": timestamps, "power": power}
-
- def plot(self, annotateData=None):
- """
- Plots the power usage and the timestamps by logic analyzer
-
- :param annotateData: List of Strings with labels, only needed if annotated plots are wished
- :return: None
- """
-
- def calculateRectangleCurve(timestamps, min_value=0, max_value=0.160):
- import numpy as np
-
- data = []
- for ts in timestamps:
- data.append(ts)
- data.append(ts)
-
- a = np.empty((len(data),))
- a[0::4] = min_value
- a[1::4] = max_value
- a[2::4] = max_value
- a[3::4] = min_value
- return data, a # plotting by columns
-
- import matplotlib.pyplot as plt
-
- fig, ax = plt.subplots()
-
- if annotateData:
- annot = ax.annotate(
- "",
- xy=(0, 0),
- xytext=(20, 20),
- textcoords="offset points",
- bbox=dict(boxstyle="round", fc="w"),
- arrowprops=dict(arrowstyle="->"),
- )
- annot.set_visible(True)
-
- rectCurve_with_drift = calculateRectangleCurve(
- self.sync_timestamps, max_value=max(self.et_power_values)
- )
-
- plt.plot(self.et_timestamps, self.et_power_values, label="Leistung")
- plt.plot(self.et_timestamps, np.gradient(self.et_power_values), label="dP/dt")
-
- plt.plot(
- rectCurve_with_drift[0],
- rectCurve_with_drift[1],
- "-g",
- label="Synchronisationsignale mit Driftfaktor",
- )
-
- plt.xlabel("Zeit von EnergyTrace [s]")
- plt.ylabel("Leistung [W]")
- leg = plt.legend()
-
- def getDataText(x):
- # print(x)
- dl = len(annotateData)
- for i, xt in enumerate(self.sync_timestamps):
- if xt > x and i >= 4 and i - 5 < dl:
- return f"SoT: {annotateData[i - 5]}"
-
- def update_annot(x, y, name):
- annot.xy = (x, y)
- text = name
-
- annot.set_text(text)
- annot.get_bbox_patch().set_alpha(0.4)
-
- def hover(event):
- if event.xdata and event.ydata:
- annot.set_visible(False)
- update_annot(event.xdata, event.ydata, getDataText(event.xdata))
- annot.set_visible(True)
- fig.canvas.draw_idle()
-
- if annotateData:
- fig.canvas.mpl_connect("motion_notify_event", hover)
-
- plt.show()
-
- def getStatesdfatool(self, state_sleep, with_traces=False, algorithm=False):
- """
- Calculates the length and energy usage of the states
-
- :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle
- :param algorithm: possible usage of accuracy algorithm / not implemented yet
- :returns: returns list of states and transitions, starting with a transition and ending with astate
- Each element is a dict containing:
- * `isa`: 'state' or 'transition'
- * `W_mean`: Mittelwert der Leistungsaufnahme
- * `W_std`: Standardabweichung der Leistungsaufnahme
- * `s`: Dauer
- """
- if algorithm:
- raise NotImplementedError
- end_transition_ts = None
- timestamps_sync_start = 0
- energy_trace_new = list()
-
- # sync_timestamps[3] is the start of the first (UNINITIALIZED) state (and the end of the benchmark-start sync pulse)
- # sync_timestamps[-8] is the end of the final state and the corresponding UART dump (and the start of the benchmark-end sync pulses)
- self.trigger_high_precision_timestamps = self.sync_timestamps[3:-7]
-
- self.trigger_edges = list()
- for ts in self.trigger_high_precision_timestamps:
- # Let ts be the trigger timestamp corresponding to the end of a transition.
- # We are looking for an index i such that et_timestamps[i-1] <= ts < et_timestamps[i].
- # Then, et_power_values[i] (the mean power in the interval et_timestamps[i-1] .. et_timestamps[i]) is affected by the transition and
- # et_power_values[i+1] is not affected by it.
- #
- # bisect_right does just what we need; bisect_left would correspond to et_timestamps[i-1] < ts <= et_timestamps[i].
- # Not that this is a moot point in practice, as ts ≠ et_timestamps[j] for almost all j. Also, the resolution of
- # et_timestamps is several decades lower than the resolution of trigger_high_precision_timestamps.
- self.trigger_edges.append(bisect_right(self.et_timestamps, ts))
-
- # Loop over transitions. We start at the end of the first transition and handle the transition and the following state.
- # We then proceed to the end of the second transition, etc.
- for i in range(2, len(self.trigger_high_precision_timestamps), 2):
- prev_state_start_index = self.trigger_edges[i - 2]
- prev_state_stop_index = self.trigger_edges[i - 1]
- transition_start_index = self.trigger_edges[i - 1]
- transition_stop_index = self.trigger_edges[i]
- state_start_index = self.trigger_edges[i]
- state_stop_index = self.trigger_edges[i + 1]
-
- # If a transition takes less time than the energytrace measurement interval, its start and stop index may be the same.
- # In this case, et_power_values[transition_start_index] is the only data point affected by the transition.
- # We use the et_power_values slice [transition_start_index, transition_stop_index) to determine the mean power, so we need
- # to increment transition_stop_index by 1 to end at et_power_values[transition_start_index]
- # (as et_power_values[transition_start_index : transition_start_index+1 ] == [et_power_values[transition_start_index])
- if transition_stop_index == transition_start_index:
- transition_stop_index += 1
-
- prev_state_duration = (
- self.trigger_high_precision_timestamps[i + 1]
- - self.trigger_high_precision_timestamps[i]
- )
- transition_duration = (
- self.trigger_high_precision_timestamps[i]
- - self.trigger_high_precision_timestamps[i - 1]
- )
- state_duration = (
- self.trigger_high_precision_timestamps[i + 1]
- - self.trigger_high_precision_timestamps[i]
- )
-
- # some states are followed by a UART dump of log data. This causes an increase in CPU energy
- # consumption and is not part of the peripheral behaviour, so it should not be part of the benchmark results.
- # If a case is followed by a UART dump, its duration is longer than the sleep duration between two transitions.
- # In this case, we re-calculate the stop index, and calculate the state duration from coarse energytrace data
- # instead of high-precision sync data
- if (
- self.et_timestamps[prev_state_stop_index]
- - self.et_timestamps[prev_state_start_index]
- > state_sleep
- ):
- prev_state_stop_index = bisect_right(
- self.et_timestamps,
- self.et_timestamps[prev_state_start_index] + state_sleep,
- )
- prev_state_duration = (
- self.et_timestamps[prev_state_stop_index]
- - self.et_timestamps[prev_state_start_index]
- )
-
- if (
- self.et_timestamps[state_stop_index]
- - self.et_timestamps[state_start_index]
- > state_sleep
- ):
- state_stop_index = bisect_right(
- self.et_timestamps,
- self.et_timestamps[state_start_index] + state_sleep,
- )
- state_duration = (
- self.et_timestamps[state_stop_index]
- - self.et_timestamps[state_start_index]
- )
-
- prev_state_power = self.et_power_values[
- prev_state_start_index:prev_state_stop_index
- ]
-
- transition_timestamps = self.et_timestamps[
- transition_start_index:transition_stop_index
- ]
- transition_power = self.et_power_values[
- transition_start_index:transition_stop_index
- ]
-
- state_timestamps = self.et_timestamps[state_start_index:state_stop_index]
- state_power = self.et_power_values[state_start_index:state_stop_index]
-
- transition = {
- "isa": "transition",
- "W_mean": np.mean(transition_power),
- "W_std": np.std(transition_power),
- "s": transition_duration,
- "count_dp": len(transition_power),
- }
- if with_traces:
- transition["plot"] = (
- transition_timestamps - transition_timestamps[0],
- transition_power,
- )
-
- state = {
- "isa": "state",
- "W_mean": np.mean(state_power),
- "W_std": np.std(state_power),
- "s": state_duration,
- }
- if with_traces:
- state["plot"] = (state_timestamps - state_timestamps[0], state_power)
-
- transition["W_mean_delta_prev"] = transition["W_mean"] - np.mean(
- prev_state_power
- )
- transition["W_mean_delta_next"] = transition["W_mean"] - state["W_mean"]
-
- energy_trace_new.append(transition)
- energy_trace_new.append(state)
-
- return energy_trace_new
diff --git a/lib/lennart/EnergyInterface.py b/lib/lennart/EnergyInterface.py
deleted file mode 100644
index 55bf7c1..0000000
--- a/lib/lennart/EnergyInterface.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import re
-import subprocess
-
-from dfatool.lennart.DataInterface import DataInterface
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class EnergyInterface(DataInterface):
- def __init__(
- self,
- duration_seconds=10,
- console_output=False,
- temp_file="temp/energytrace.log",
- fake=False,
- ):
- """
- class is not used in embedded into dfatool.
-
- :param duration_seconds: seconds the EnergyTrace should be running
- :param console_output: if EnergyTrace output should be printed to the user
- :param temp_file: file path for temporary file
- :param fake: if already existing file should be used
- """
- self.energytrace = None
- self.duration_seconds = duration_seconds
- self.console_output = console_output
- self.temp_file = temp_file
- self.fake = fake
-
- def runMeasure(self):
- """
- starts the measurement, with waiting for done
- """
- if self.fake:
- return
- self.runMeasureAsynchronously()
- self.waitForAsynchronousMeasure()
-
- def runMeasureAsynchronously(self):
- """
- starts the measurement, not waiting for done
- """
- if self.fake:
- return
- self.energytrace = subprocess.Popen(
- "msp430-etv --save %s %s %s"
- % (
- self.temp_file,
- self.duration_seconds,
- "" if self.console_output else "> /dev/null",
- ),
- shell=True,
- )
- print(
- "msp430-etv --save %s %s %s"
- % (
- self.temp_file,
- self.duration_seconds,
- "" if self.console_output else "> /dev/null",
- )
- )
-
- def waitForAsynchronousMeasure(self):
- """
- Wait until is command call is done
- """
- if self.fake:
- return
- self.energytrace.wait()
-
- def setFile(self, path):
- """
- changeing the temporary file
-
- :param path: file path of new temp file
- :return: None
- """
- self.temp_file = path
- pass
-
- def forceStopMeasure(self):
- """
- force stops the Measurement, with signals
- :return: None
- """
- self.energytrace.send_signal(subprocess.signal.SIGINT)
- stdout, stderr = self.energytrace.communicate(timeout=15)
diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py
deleted file mode 100644
index 44da678..0000000
--- a/lib/lennart/SigrokAPIInterface.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import time
-
-from dfatool.lennart.SigrokInterface import SigrokInterface
-
-import sigrok.core as sr
-from sigrok.core.classes import *
-
-from util.ByteHelper import ByteHelper
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SigrokAPIInterface(SigrokInterface):
- def datafeed_changes(self, device, packet):
- """
- Callback type with changes analysis
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- for x in data[1::2]:
- self.analyzeData(x)
-
- def datafeed_in_all(self, device, packet):
- """
- Callback type which writes all data into the array
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- self.all_data += data[1::2]
-
- def datafeed_file(self, device, packet):
- """
- Callback type which writes all data into a file
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- for x in data[1::2]:
- self.file.write(str(x) + "\n")
-
- def __init__(
- self,
- driver="fx2lafw",
- sample_rate=100_000,
- debug_output=False,
- used_datafeed=datafeed_changes,
- fake=False,
- ):
- """
-
- :param driver: Driver that should be used
- :param sample_rate: The sample rate of the Logic analyzer
- :param debug_output: Should be true if output should be displayed to user
- :param used_datafeed: one of the datafeeds above, user later as callback.
- :param fake:
- """
- super(SigrokAPIInterface, self).__init__(sample_rate)
- if fake:
- raise NotImplementedError("Not implemented!")
- self.used_datafeed = used_datafeed
-
- self.debug_output = debug_output
- self.session = None
-
- def forceStopMeasure(self):
- """
- Force stopping the measurement
- :return: None
- """
- self.session.stop()
-
- def runMeasure(self):
- """
- Start the Measurement and set all settings
- """
- context = sr.Context_create()
-
- devs = context.drivers[self.driver].scan()
- # print(devs)
- if len(devs) == 0:
- raise RuntimeError("No device with that driver found!")
- sigrokDevice = devs[0]
- if len(devs) > 1:
- raise Warning(
- "Attention! Multiple devices with that driver found! Using ",
- sigrokDevice.connection_id(),
- )
-
- sigrokDevice.open()
- sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)
-
- enabled_channels = ["D1"]
- for channel in sigrokDevice.channels:
- channel.enabled = channel.name in enabled_channels
-
- self.session = context.create_session()
- self.session.add_device(sigrokDevice)
- self.session.start()
-
- self.output = context.output_formats["binary"].create_output(sigrokDevice)
-
- print(context.output_formats)
- self.all_data = b""
-
- def datafeed(device, packet):
- self.used_datafeed(self, device, packet)
-
- self.session.add_datafeed_callback(datafeed)
- time_running = time.time()
- self.session.run()
- total_time = time.time() - time_running
- print(
- "Used time: ",
- total_time * 1_000_000,
- "µs",
- )
- self.session.stop()
-
- if self.debug_output:
- # if self.used_datafeed == self.datafeed_in_change:
- if True:
- changes = [x / self.sample_rate for x in self.changes]
- print(changes)
- is_on = self.start == 0xFF
- print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
- for x in range(len(changes) - 1):
- is_on = not is_on
- print(
- changes[x],
- " - ",
- changes[x + 1],
- " / ",
- changes[x + 1] - changes[x],
- " # Pin ",
- "HIGH" if is_on else "LOW",
- )
- elif self.used_datafeed == self.datafeed_in_all:
- print(self.all_data)
diff --git a/lib/lennart/SigrokCLIInterface.py b/lib/lennart/SigrokCLIInterface.py
index b28a8a9..600c00f 100644
--- a/lib/lennart/SigrokCLIInterface.py
+++ b/lib/lennart/SigrokCLIInterface.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import subprocess
import time
diff --git a/lib/lennart/SigrokInterface.py b/lib/lennart/SigrokInterface.py
index 32e8fe2..a8b392f 100644
--- a/lib/lennart/SigrokInterface.py
+++ b/lib/lennart/SigrokInterface.py
@@ -1,7 +1,8 @@
+#!/usr/bin/env python3
+
import json
import numpy as np
-from dfatool.lennart.DataInterface import DataInterface
import logging
logger = logging.getLogger(__name__)
@@ -64,7 +65,7 @@ class SigrokResult:
pass
-class SigrokInterface(DataInterface):
+class SigrokInterface:
def __init__(self, sample_rate, driver="fx2lafw", filename="temp/sigrok.log"):
"""