diff options
Diffstat (limited to 'lib/lennart')
-rw-r--r-- | lib/lennart/DataInterface.py | 24 | ||||
-rw-r--r-- | lib/lennart/DataProcessor.py | 825 | ||||
-rw-r--r-- | lib/lennart/EnergyInterface.py | 89 | ||||
-rw-r--r-- | lib/lennart/SigrokAPIInterface.py | 150 | ||||
-rw-r--r-- | lib/lennart/SigrokCLIInterface.py | 2 | ||||
-rw-r--r-- | lib/lennart/SigrokInterface.py | 5 |
6 files changed, 5 insertions, 1090 deletions
diff --git a/lib/lennart/DataInterface.py b/lib/lennart/DataInterface.py deleted file mode 100644 index 4495db2..0000000 --- a/lib/lennart/DataInterface.py +++ /dev/null @@ -1,24 +0,0 @@ -class DataInterface: - def runMeasure(self): - """ - Implemented in subclasses. - - Starts the measurement - """ - raise NotImplementedError("The method not implemented") - - def getData(self): - """ - Implemented in subclasses - - :returns: gathered data - """ - raise NotImplementedError("The method not implemented") - - def forceStopMeasure(self): - """ - Implemented in subclasses - - Force stops the measurement - """ - raise NotImplementedError("The method not implemented") diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py deleted file mode 100644 index c211beb..0000000 --- a/lib/lennart/DataProcessor.py +++ /dev/null @@ -1,825 +0,0 @@ -#!/usr/bin/env python3 - -# XXX deprecated and unused -import numpy as np -import logging -import os -import scipy -from bisect import bisect_left, bisect_right - -logger = logging.getLogger(__name__) - - -class DataProcessor: - def __init__( - self, - sync_data, - et_timestamps, - et_power, - hw_statechange_indexes=list(), - offline_index=None, - ): - """ - Creates DataProcessor object. - - :param sync_data: input timestamps (SigrokResult) - :param energy_data: List of EnergyTrace datapoints - """ - self.raw_sync_timestamps = [] - # high-precision LA/Timer timestamps at synchronization events - self.sync_timestamps = [] - # low-precision energytrace timestamps - self.et_timestamps = et_timestamps - # energytrace power values - self.et_power_values = et_power - self.hw_statechange_indexes = hw_statechange_indexes - self.offline_index = offline_index - self.sync_data = sync_data - self.start_offset = 0 - - # TODO determine automatically based on minimum (or p1) power draw over measurement area + X - # use 0.02 for HFXT runs - self.power_sync_watt = 0.011 - self.power_sync_len = 0.7 - self.power_sync_max_outliers = 2 - - def run(self): - """ - Main Function to remove unwanted data, get synchronization points, add the offset and add drift. - :return: None - """ - - # Remove bogus data before / after the measurement - - time_stamp_data = self.sync_data - for x in range(1, len(time_stamp_data)): - if time_stamp_data[x] - time_stamp_data[x - 1] > 1.3: - time_stamp_data = time_stamp_data[x:] - break - - for x in reversed(range(1, len(time_stamp_data))): - if time_stamp_data[x] - time_stamp_data[x - 1] > 1.3: - time_stamp_data = time_stamp_data[:x] - break - - # Each synchronization pulse consists of two LogicAnalyzer pulses, so four - # entries in time_stamp_data (rising edge, falling edge, rising edge, falling edge). - # If we have less then twelve entries, we observed no transitions and don't even have - # valid synchronization data. In this case, we bail out. - if len(time_stamp_data) < 12: - raise RuntimeError( - f"LogicAnalyzer sync data has length {len(time_stamp_data)}, expected >= 12" - ) - - self.raw_sync_timestamps = time_stamp_data - - # NEW - datasync_timestamps = [] - sync_start = 0 - outliers = 0 - pre_outliers_ts = None - # TODO only consider the first few and the last few seconds for sync points - for i, timestamp in enumerate(self.et_timestamps): - power = self.et_power_values[i] - if power > 0: - if power > self.power_sync_watt: - if sync_start is None: - sync_start = timestamp - outliers = 0 - else: - # Sync point over or outliers - if outliers == 0: - pre_outliers_ts = timestamp - outliers += 1 - if outliers > self.power_sync_max_outliers: - if sync_start is not None: - if (pre_outliers_ts - sync_start) > self.power_sync_len: - datasync_timestamps.append( - (sync_start, pre_outliers_ts) - ) - sync_start = None - - if power > self.power_sync_watt: - if (self.et_timestamps[-1] - sync_start) > self.power_sync_len: - datasync_timestamps.append((sync_start, pre_outliers_ts)) - - # time_stamp_data contains an entry for each level change on the Logic Analyzer input. - # So, time_stamp_data[0] is the first low-to-high transition, time_stamp_data[2] the second, etc. - # -> time_stamp_data[2] is the low-to-high transition indicating the end of the first sync pulse - # -> time_stamp_data[-8] is the low-to-high transition indicating the start of the first after-measurement sync pulse - - start_timestamp = datasync_timestamps[0][1] - start_offset = start_timestamp - time_stamp_data[2] - - end_timestamp = datasync_timestamps[-2][0] - end_offset = end_timestamp - (time_stamp_data[-8] + start_offset) - logger.debug( - f"Iteration #{self.offline_index}: Measurement area: ET timestamp range [{start_timestamp}, {end_timestamp}]" - ) - logger.debug( - f"Iteration #{self.offline_index}: Measurement area: LA timestamp range [{time_stamp_data[2]}, {time_stamp_data[-8]}]" - ) - logger.debug( - f"Iteration #{self.offline_index}: Start/End offsets: {start_offset} / {end_offset}" - ) - - if abs(end_offset) > 10: - raise RuntimeError( - f"Iteration #{self.offline_index}: synchronization end_offset == {end_offset}. It should be no more than a few seconds." - ) - - # adjust start offset - with_offset = np.array(time_stamp_data) + start_offset - logger.debug( - f"Iteration #{self.offline_index}: Measurement area with offset: LA timestamp range [{with_offset[2]}, {with_offset[-8]}]" - ) - - # adjust stop offset (may be different from start offset due to drift caused by - # random temperature fluctuations) - with_drift = self.addDrift( - with_offset, end_timestamp, end_offset, start_timestamp - ) - logger.debug( - f"Iteration #{self.offline_index}: Measurement area with drift: LA timestamp range [{with_drift[2]}, {with_drift[-8]}]" - ) - - self.sync_timestamps = with_drift - - # adjust intermediate timestamps. There is a small error between consecutive measurements, - # again due to drift caused by random temperature fluctuation. The error increases with - # increased distance from synchronization points: It is negligible at the start and end - # of the measurement and may be quite high around the middle. That's just the bounds, though -- - # you may also have a low error in the middle and error peaks elsewhere. - # As the start and stop timestamps have already been synchronized, we only adjust - # actual transition timestamps here. - if os.getenv("DFATOOL_COMPENSATE_DRIFT"): - if len(self.hw_statechange_indexes): - # measurement was performed with EnergyTrace++ - # (i.e., with cpu state annotations) - with_drift_compensation = self.compensateDriftPlusplus(with_drift[4:-8]) - else: - with_drift_compensation = self.compensateDrift(with_drift[4:-8]) - try: - self.sync_timestamps[4:-8] = with_drift_compensation - except ValueError: - logger.error( - f"Iteration #{self.offline_index}: drift-compensated sequence is too short ({len(with_drift_compensation)}/{len(self.sync_timestamps[4:-8])-1})" - ) - raise - - def addDrift(self, input_timestamps, end_timestamp, end_offset, start_timestamp): - """ - Add drift to datapoints - - :param input_timestamps: List of timestamps (float list) - :param end_timestamp: Timestamp of first EnergyTrace datapoint at the second-to-last sync point - :param end_offset: the time between end_timestamp and the timestamp of synchronisation signal - :param start_timestamp: Timestamp of last EnergyTrace datapoint at the first sync point - :return: List of modified timestamps (float list) - """ - endFactor = 1 + (end_offset / ((end_timestamp - end_offset) - start_timestamp)) - # endFactor assumes that the end of the first sync pulse is at timestamp 0. - # Then, timestamps with drift := timestamps * endFactor. - # As this is not the case (the first sync pulse ends at start_timestamp > 0), we shift the data by first - # removing start_timestamp, then multiplying with endFactor, and then re-adding the start_timestamp. - sync_timestamps_with_drift = ( - input_timestamps - start_timestamp - ) * endFactor + start_timestamp - return sync_timestamps_with_drift - - def compensateDriftPlusplus(self, sync_timestamps): - """Use hardware state changes reported by EnergyTrace++ to determine transition timestamps.""" - expected_transition_start_timestamps = sync_timestamps[::2] - compensated_timestamps = list() - drift = 0 - for i, expected_start_ts in enumerate(expected_transition_start_timestamps): - expected_end_ts = sync_timestamps[i * 2 + 1] - et_timestamps_start = bisect_left( - self.et_timestamps, expected_start_ts - 5e-3 - ) - et_timestamps_end = bisect_right( - self.et_timestamps, expected_start_ts + 5e-3 - ) - - candidate_indexes = list() - for index in self.hw_statechange_indexes: - if et_timestamps_start <= index <= et_timestamps_end: - candidate_indexes.append(index) - - if len(candidate_indexes) == 2: - drift = self.et_timestamps[candidate_indexes[0]] - expected_start_ts - - compensated_timestamps.append(expected_start_ts + drift) - compensated_timestamps.append(expected_end_ts + drift) - - return compensated_timestamps - - def compensateDrift(self, sync_timestamps): - """Use ruptures (e.g. Pelt, Dynp) to determine transition timestamps.""" - from dfatool.pelt import PELT - - # "rbf" und "l2" scheinen ähnlich gut zu funktionieren, l2 ist schneller. l1 ist wohl noch besser. - # PELT does not find changepoints for transitions which span just four or five data points (i.e., transitions shorter than ~2ms). - # Workaround: Double the data rate passed to PELT by interpolation ("stretch=2") - pelt = PELT(with_multiprocessing=False, stretch=2, min_dist=1) - expected_transition_start_timestamps = sync_timestamps[::2] - transition_start_candidate_weights = list() - drift = 0 - - # TODO auch Kandidatenbestimmung per Ableitung probieren - # (-> Umgebungsvariable zur Auswahl) - - pelt_traces = list() - timestamps = list() - candidate_weights = list() - - for i, expected_start_ts in enumerate(expected_transition_start_timestamps): - expected_end_ts = sync_timestamps[2 * i + 1] - # assumption: maximum deviation between expected and actual timestamps is 5ms. - # We use ±10ms to have some contetx for PELT - et_timestamps_start = bisect_left( - self.et_timestamps, expected_start_ts - 10e-3 - ) - et_timestamps_end = bisect_right( - self.et_timestamps, expected_end_ts + 10e-3 - ) - timestamps.append( - self.et_timestamps[et_timestamps_start : et_timestamps_end + 1] - ) - pelt_traces.append( - self.et_power_values[et_timestamps_start : et_timestamps_end + 1] - ) - - # TODO for greedy mode, perform changepoint detection between greedy steps - # (-> the expected changepoint area is well-known, Dynp with 1/2 changepoints - # should work much better than "somewhere in these 20ms there should be a transition") - - if os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY"): - penalties = (int(os.getenv("DFATOOL_DRIFT_COMPENSATION_PENALTY")),) - else: - penalties = (1, 2, 5, 10, 15, 20) - for penalty in penalties: - changepoints_by_transition = pelt.get_changepoints( - pelt_traces, penalty=penalty - ) - for i in range(len(expected_transition_start_timestamps)): - candidate_weights.append(dict()) - for changepoint in changepoints_by_transition[i]: - if changepoint in candidate_weights[i]: - candidate_weights[i][changepoint] += 1 - else: - candidate_weights[i][changepoint] = 1 - - for i, expected_start_ts in enumerate(expected_transition_start_timestamps): - - # TODO ist expected_start_ts wirklich eine gute Referenz? Wenn vor einer Transition ein UART-Dump - # liegt, dürfte expected_end_ts besser sein, dann muss allerdings bei der compensation wieder auf - # start_ts zurückgerechnet werden. - transition_start_candidate_weights.append( - list( - map( - lambda k: ( - timestamps[i][k] - expected_start_ts, - timestamps[i][k] - expected_end_ts, - candidate_weights[i][k], - ), - sorted(candidate_weights[i].keys()), - ) - ) - ) - - if os.getenv("DFATOOL_COMPENSATE_DRIFT_GREEDY"): - return self.compensate_drift_greedy( - sync_timestamps, transition_start_candidate_weights - ) - - return self.compensate_drift_graph( - sync_timestamps, transition_start_candidate_weights - ) - - def compensate_drift_graph( - self, sync_timestamps, transition_start_candidate_weights - ): - # Algorithm: Obtain the shortest path in a layered graph made up from - # transition candidates. Each node represents a transition candidate timestamp, and each layer represents a transition. - # Each node in layer i contains a directed edge to each node in layer i+1. - # The edge weight is the drift delta between the two nodes. So, if, - # node X (transition i, candidate a) has a drift of 5, and node Y - # (transition i+1, candidate b) has a drift of -2, the weight is 7. - # The first and last layer of the graph consists of a single node - # with a drift of 0, representing the start / end synchronization pulse, respectively. - - prev_nodes = [0] - prev_drifts = [0] - node_drifts = [0] - edge_srcs = list() - edge_dsts = list() - csr_weights = list() - - # (transition index) -> [candidate 0/start node, candidate 0/end node, candidate 1/start node, ...] - nodes_by_transition_index = dict() - - # (node number) -> (transition index, candidate index, is_end) - # (-> transition_start_candidate_weights[transition index][candidate index][is_end]) - transition_by_node = dict() - - compensated_timestamps = list() - - # default: up to two nodes may be skipped - max_skip_count = 2 - - if os.getenv("DFATOOL_DC_MAX_SKIP"): - max_skip_count = int(os.getenv("DFATOOL_DC_MAX_SKIP")) - - for transition_index, candidates in enumerate( - transition_start_candidate_weights - ): - new_nodes = list() - new_drifts = list() - i_offset = prev_nodes[-1] + 1 - nodes_by_transition_index[transition_index] = list() - for new_node_i, (new_drift_start, new_drift_end, _) in enumerate( - candidates - ): - for is_end, new_drift in enumerate((new_drift_start, new_drift_end)): - new_node = i_offset + new_node_i * 2 + is_end - nodes_by_transition_index[transition_index].append(new_node) - transition_by_node[new_node] = ( - transition_index, - new_node_i, - is_end, - ) - new_nodes.append(new_node) - new_drifts.append(new_drift) - node_drifts.append(new_drift) - for prev_node_i, prev_node in enumerate(prev_nodes): - prev_drift = prev_drifts[prev_node_i] - - edge_srcs.append(prev_node) - edge_dsts.append(new_node) - - delta_drift = np.abs(prev_drift - new_drift) - # TODO evaluate "delta_drift ** 2" or similar nonlinear - # weights -> further penalize large drift deltas - csr_weights.append(delta_drift) - - # a transition's candidate list may be empty - if len(new_nodes): - prev_nodes = new_nodes - prev_drifts = new_drifts - - # add an end node for shortest path search - # (end node == final sync, so drift == 0) - new_node = prev_nodes[-1] + 1 - for prev_node_i, prev_node in enumerate(prev_nodes): - prev_drift = prev_drifts[prev_node_i] - edge_srcs.append(prev_node) - edge_dsts.append(new_node) - csr_weights.append(np.abs(prev_drift)) - - # Add "skip" edges spanning from transition i to transition i+n (n > 1). - # These avoid synchronization errors caused by transitions wich are - # not found by changepiont detection, as long as they are sufficiently rare. - for transition_index, candidates in enumerate( - transition_start_candidate_weights - ): - for skip_count in range(2, max_skip_count + 2): - if transition_index < skip_count: - continue - for from_node in nodes_by_transition_index[ - transition_index - skip_count - ]: - for to_node in nodes_by_transition_index[transition_index]: - - ( - from_trans_i, - from_candidate_i, - from_is_end, - ) = transition_by_node[from_node] - to_trans_i, to_candidate_i, to_is_end = transition_by_node[ - to_node - ] - - assert transition_index - skip_count == from_trans_i - assert transition_index == to_trans_i - - from_drift = transition_start_candidate_weights[from_trans_i][ - from_candidate_i - ][from_is_end] - to_drift = transition_start_candidate_weights[to_trans_i][ - to_candidate_i - ][to_is_end] - - edge_srcs.append(from_node) - edge_dsts.append(to_node) - csr_weights.append( - np.abs(from_drift - to_drift) + (skip_count - 1) * 270e-6 - ) - - sm = scipy.sparse.csr_matrix( - (csr_weights, (edge_srcs, edge_dsts)), shape=(new_node + 1, new_node + 1) - ) - dm, predecessors = scipy.sparse.csgraph.shortest_path( - sm, return_predecessors=True, indices=0 - ) - - nodes = list() - pred = predecessors[-1] - while pred > 0: - nodes.append(pred) - pred = predecessors[pred] - - nodes = list(reversed(nodes)) - - # first and last node are not included in "nodes" as they represent - # the start/stop sync pulse (and not a transition with sync candidates) - - prev_transition = -1 - for i, node in enumerate(nodes): - transition, _, _ = transition_by_node[node] - drift = node_drifts[node] - - while transition - prev_transition > 1: - prev_drift = node_drifts[nodes[i - 1]] - prev_transition += 1 - expected_start_ts = sync_timestamps[prev_transition * 2] + prev_drift - expected_end_ts = sync_timestamps[prev_transition * 2 + 1] + prev_drift - compensated_timestamps.append(expected_start_ts) - compensated_timestamps.append(expected_end_ts) - - if os.getenv("DFATOOL_PLOT_LASYNC") and self.offline_index == int( - os.getenv("DFATOOL_PLOT_LASYNC") - ): - print( - f"trans {transition:3d}: raw ({sync_timestamps[transition * 2]:.6f}, {sync_timestamps[transition * 2 + 1]:.6f}), candidate {transition_by_node[node]}" - ) - print( - f"trans {transition:3d} -> ({sync_timestamps[transition * 2] + drift:.6f}, {sync_timestamps[transition * 2 + 1] + drift:.6f})" - ) - - expected_start_ts = sync_timestamps[transition * 2] + drift - expected_end_ts = sync_timestamps[transition * 2 + 1] + drift - compensated_timestamps.append(expected_start_ts) - compensated_timestamps.append(expected_end_ts) - prev_transition = transition - - # handle skips over the last few transitions, if any - transition = len(transition_start_candidate_weights) - 1 - while transition - prev_transition > 0: - prev_drift = node_drifts[nodes[-1]] - prev_transition += 1 - expected_start_ts = sync_timestamps[prev_transition * 2] + prev_drift - expected_end_ts = sync_timestamps[prev_transition * 2 + 1] + prev_drift - compensated_timestamps.append(expected_start_ts) - compensated_timestamps.append(expected_end_ts) - - if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"): - import json - from dfatool.utils import NpEncoder - - expected_transition_start_timestamps = sync_timestamps[::2] - filename = os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION") - filename = f"{filename}.{self.offline_index}" - - with open(filename, "w") as f: - json.dump( - [ - expected_transition_start_timestamps, - transition_start_candidate_weights, - ], - f, - cls=NpEncoder, - ) - - return compensated_timestamps - - def compensate_drift_greedy( - self, sync_timestamps, transition_start_candidate_weights - ): - drift = 0 - expected_transition_start_timestamps = sync_timestamps[::2] - compensated_timestamps = list() - - for i, expected_start_ts in enumerate(expected_transition_start_timestamps): - candidates = sorted( - map( - lambda x: x[0] + expected_start_ts, - transition_start_candidate_weights[i], - ) - ) - expected_start_ts += drift - expected_end_ts = sync_timestamps[2 * i + 1] + drift - - # choose the next candidates around the expected sync point. - start_right_sync = bisect_left(candidates, expected_start_ts) - start_left_sync = start_right_sync - 1 - - end_right_sync = bisect_left(candidates, expected_end_ts) - end_left_sync = end_right_sync - 1 - - if start_right_sync >= 0: - start_left_diff = expected_start_ts - candidates[start_left_sync] - else: - start_left_diff = np.inf - - if start_right_sync < len(candidates): - start_right_diff = candidates[start_right_sync] - expected_start_ts - else: - start_right_diff = np.inf - - if end_left_sync >= 0: - end_left_diff = expected_end_ts - candidates[end_left_sync] - else: - end_left_diff = np.inf - - if end_right_sync < len(candidates): - end_right_diff = candidates[end_right_sync] - expected_end_ts - else: - end_right_diff = np.inf - - drift_candidates = ( - start_left_diff, - start_right_diff, - end_left_diff, - end_right_diff, - ) - min_drift_i = np.argmin(drift_candidates) - min_drift = min(drift_candidates) - - if min_drift < 5e-4: - if min_drift_i % 2 == 0: - # left - compensated_timestamps.append(expected_start_ts - min_drift) - compensated_timestamps.append(expected_end_ts - min_drift) - drift -= min_drift - else: - # right - compensated_timestamps.append(expected_start_ts + min_drift) - compensated_timestamps.append(expected_end_ts + min_drift) - drift += min_drift - - else: - compensated_timestamps.append(expected_start_ts) - compensated_timestamps.append(expected_end_ts) - - if os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"): - import json - from dfatool.utils import NpEncoder - - expected_transition_start_timestamps = sync_timestamps[::2] - - with open(os.getenv("DFATOOL_EXPORT_DRIFT_COMPENSATION"), "w") as f: - json.dump( - [ - expected_transition_start_timestamps, - transition_start_candidate_weights, - ], - f, - cls=NpEncoder, - ) - - return compensated_timestamps - - def export_sync(self): - # [1st trans start, 1st trans stop, 2nd trans start, 2nd trans stop, ...] - sync_timestamps = list() - - for i in range(4, len(self.sync_timestamps) - 8, 2): - sync_timestamps.append( - (self.sync_timestamps[i], self.sync_timestamps[i + 1]) - ) - - # EnergyTrace timestamps - timestamps = self.et_timestamps - - # EnergyTrace power values - power = self.et_power_values - - return {"sync": sync_timestamps, "timestamps": timestamps, "power": power} - - def plot(self, annotateData=None): - """ - Plots the power usage and the timestamps by logic analyzer - - :param annotateData: List of Strings with labels, only needed if annotated plots are wished - :return: None - """ - - def calculateRectangleCurve(timestamps, min_value=0, max_value=0.160): - import numpy as np - - data = [] - for ts in timestamps: - data.append(ts) - data.append(ts) - - a = np.empty((len(data),)) - a[0::4] = min_value - a[1::4] = max_value - a[2::4] = max_value - a[3::4] = min_value - return data, a # plotting by columns - - import matplotlib.pyplot as plt - - fig, ax = plt.subplots() - - if annotateData: - annot = ax.annotate( - "", - xy=(0, 0), - xytext=(20, 20), - textcoords="offset points", - bbox=dict(boxstyle="round", fc="w"), - arrowprops=dict(arrowstyle="->"), - ) - annot.set_visible(True) - - rectCurve_with_drift = calculateRectangleCurve( - self.sync_timestamps, max_value=max(self.et_power_values) - ) - - plt.plot(self.et_timestamps, self.et_power_values, label="Leistung") - plt.plot(self.et_timestamps, np.gradient(self.et_power_values), label="dP/dt") - - plt.plot( - rectCurve_with_drift[0], - rectCurve_with_drift[1], - "-g", - label="Synchronisationsignale mit Driftfaktor", - ) - - plt.xlabel("Zeit von EnergyTrace [s]") - plt.ylabel("Leistung [W]") - leg = plt.legend() - - def getDataText(x): - # print(x) - dl = len(annotateData) - for i, xt in enumerate(self.sync_timestamps): - if xt > x and i >= 4 and i - 5 < dl: - return f"SoT: {annotateData[i - 5]}" - - def update_annot(x, y, name): - annot.xy = (x, y) - text = name - - annot.set_text(text) - annot.get_bbox_patch().set_alpha(0.4) - - def hover(event): - if event.xdata and event.ydata: - annot.set_visible(False) - update_annot(event.xdata, event.ydata, getDataText(event.xdata)) - annot.set_visible(True) - fig.canvas.draw_idle() - - if annotateData: - fig.canvas.mpl_connect("motion_notify_event", hover) - - plt.show() - - def getStatesdfatool(self, state_sleep, with_traces=False, algorithm=False): - """ - Calculates the length and energy usage of the states - - :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle - :param algorithm: possible usage of accuracy algorithm / not implemented yet - :returns: returns list of states and transitions, starting with a transition and ending with astate - Each element is a dict containing: - * `isa`: 'state' or 'transition' - * `W_mean`: Mittelwert der Leistungsaufnahme - * `W_std`: Standardabweichung der Leistungsaufnahme - * `s`: Dauer - """ - if algorithm: - raise NotImplementedError - end_transition_ts = None - timestamps_sync_start = 0 - energy_trace_new = list() - - # sync_timestamps[3] is the start of the first (UNINITIALIZED) state (and the end of the benchmark-start sync pulse) - # sync_timestamps[-8] is the end of the final state and the corresponding UART dump (and the start of the benchmark-end sync pulses) - self.trigger_high_precision_timestamps = self.sync_timestamps[3:-7] - - self.trigger_edges = list() - for ts in self.trigger_high_precision_timestamps: - # Let ts be the trigger timestamp corresponding to the end of a transition. - # We are looking for an index i such that et_timestamps[i-1] <= ts < et_timestamps[i]. - # Then, et_power_values[i] (the mean power in the interval et_timestamps[i-1] .. et_timestamps[i]) is affected by the transition and - # et_power_values[i+1] is not affected by it. - # - # bisect_right does just what we need; bisect_left would correspond to et_timestamps[i-1] < ts <= et_timestamps[i]. - # Not that this is a moot point in practice, as ts ≠ et_timestamps[j] for almost all j. Also, the resolution of - # et_timestamps is several decades lower than the resolution of trigger_high_precision_timestamps. - self.trigger_edges.append(bisect_right(self.et_timestamps, ts)) - - # Loop over transitions. We start at the end of the first transition and handle the transition and the following state. - # We then proceed to the end of the second transition, etc. - for i in range(2, len(self.trigger_high_precision_timestamps), 2): - prev_state_start_index = self.trigger_edges[i - 2] - prev_state_stop_index = self.trigger_edges[i - 1] - transition_start_index = self.trigger_edges[i - 1] - transition_stop_index = self.trigger_edges[i] - state_start_index = self.trigger_edges[i] - state_stop_index = self.trigger_edges[i + 1] - - # If a transition takes less time than the energytrace measurement interval, its start and stop index may be the same. - # In this case, et_power_values[transition_start_index] is the only data point affected by the transition. - # We use the et_power_values slice [transition_start_index, transition_stop_index) to determine the mean power, so we need - # to increment transition_stop_index by 1 to end at et_power_values[transition_start_index] - # (as et_power_values[transition_start_index : transition_start_index+1 ] == [et_power_values[transition_start_index]) - if transition_stop_index == transition_start_index: - transition_stop_index += 1 - - prev_state_duration = ( - self.trigger_high_precision_timestamps[i + 1] - - self.trigger_high_precision_timestamps[i] - ) - transition_duration = ( - self.trigger_high_precision_timestamps[i] - - self.trigger_high_precision_timestamps[i - 1] - ) - state_duration = ( - self.trigger_high_precision_timestamps[i + 1] - - self.trigger_high_precision_timestamps[i] - ) - - # some states are followed by a UART dump of log data. This causes an increase in CPU energy - # consumption and is not part of the peripheral behaviour, so it should not be part of the benchmark results. - # If a case is followed by a UART dump, its duration is longer than the sleep duration between two transitions. - # In this case, we re-calculate the stop index, and calculate the state duration from coarse energytrace data - # instead of high-precision sync data - if ( - self.et_timestamps[prev_state_stop_index] - - self.et_timestamps[prev_state_start_index] - > state_sleep - ): - prev_state_stop_index = bisect_right( - self.et_timestamps, - self.et_timestamps[prev_state_start_index] + state_sleep, - ) - prev_state_duration = ( - self.et_timestamps[prev_state_stop_index] - - self.et_timestamps[prev_state_start_index] - ) - - if ( - self.et_timestamps[state_stop_index] - - self.et_timestamps[state_start_index] - > state_sleep - ): - state_stop_index = bisect_right( - self.et_timestamps, - self.et_timestamps[state_start_index] + state_sleep, - ) - state_duration = ( - self.et_timestamps[state_stop_index] - - self.et_timestamps[state_start_index] - ) - - prev_state_power = self.et_power_values[ - prev_state_start_index:prev_state_stop_index - ] - - transition_timestamps = self.et_timestamps[ - transition_start_index:transition_stop_index - ] - transition_power = self.et_power_values[ - transition_start_index:transition_stop_index - ] - - state_timestamps = self.et_timestamps[state_start_index:state_stop_index] - state_power = self.et_power_values[state_start_index:state_stop_index] - - transition = { - "isa": "transition", - "W_mean": np.mean(transition_power), - "W_std": np.std(transition_power), - "s": transition_duration, - "count_dp": len(transition_power), - } - if with_traces: - transition["plot"] = ( - transition_timestamps - transition_timestamps[0], - transition_power, - ) - - state = { - "isa": "state", - "W_mean": np.mean(state_power), - "W_std": np.std(state_power), - "s": state_duration, - } - if with_traces: - state["plot"] = (state_timestamps - state_timestamps[0], state_power) - - transition["W_mean_delta_prev"] = transition["W_mean"] - np.mean( - prev_state_power - ) - transition["W_mean_delta_next"] = transition["W_mean"] - state["W_mean"] - - energy_trace_new.append(transition) - energy_trace_new.append(state) - - return energy_trace_new diff --git a/lib/lennart/EnergyInterface.py b/lib/lennart/EnergyInterface.py deleted file mode 100644 index 55bf7c1..0000000 --- a/lib/lennart/EnergyInterface.py +++ /dev/null @@ -1,89 +0,0 @@ -import re -import subprocess - -from dfatool.lennart.DataInterface import DataInterface -import logging - -logger = logging.getLogger(__name__) - - -class EnergyInterface(DataInterface): - def __init__( - self, - duration_seconds=10, - console_output=False, - temp_file="temp/energytrace.log", - fake=False, - ): - """ - class is not used in embedded into dfatool. - - :param duration_seconds: seconds the EnergyTrace should be running - :param console_output: if EnergyTrace output should be printed to the user - :param temp_file: file path for temporary file - :param fake: if already existing file should be used - """ - self.energytrace = None - self.duration_seconds = duration_seconds - self.console_output = console_output - self.temp_file = temp_file - self.fake = fake - - def runMeasure(self): - """ - starts the measurement, with waiting for done - """ - if self.fake: - return - self.runMeasureAsynchronously() - self.waitForAsynchronousMeasure() - - def runMeasureAsynchronously(self): - """ - starts the measurement, not waiting for done - """ - if self.fake: - return - self.energytrace = subprocess.Popen( - "msp430-etv --save %s %s %s" - % ( - self.temp_file, - self.duration_seconds, - "" if self.console_output else "> /dev/null", - ), - shell=True, - ) - print( - "msp430-etv --save %s %s %s" - % ( - self.temp_file, - self.duration_seconds, - "" if self.console_output else "> /dev/null", - ) - ) - - def waitForAsynchronousMeasure(self): - """ - Wait until is command call is done - """ - if self.fake: - return - self.energytrace.wait() - - def setFile(self, path): - """ - changeing the temporary file - - :param path: file path of new temp file - :return: None - """ - self.temp_file = path - pass - - def forceStopMeasure(self): - """ - force stops the Measurement, with signals - :return: None - """ - self.energytrace.send_signal(subprocess.signal.SIGINT) - stdout, stderr = self.energytrace.communicate(timeout=15) diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py deleted file mode 100644 index 44da678..0000000 --- a/lib/lennart/SigrokAPIInterface.py +++ /dev/null @@ -1,150 +0,0 @@ -import time - -from dfatool.lennart.SigrokInterface import SigrokInterface - -import sigrok.core as sr -from sigrok.core.classes import * - -from util.ByteHelper import ByteHelper -import logging - -logger = logging.getLogger(__name__) - - -class SigrokAPIInterface(SigrokInterface): - def datafeed_changes(self, device, packet): - """ - Callback type with changes analysis - :param device: device object - :param packet: data (String with binary data) - """ - data = ByteHelper.rawbytes(self.output.receive(packet)) - if data: - # only using every second byte, - # because only every second contains the useful information. - for x in data[1::2]: - self.analyzeData(x) - - def datafeed_in_all(self, device, packet): - """ - Callback type which writes all data into the array - :param device: device object - :param packet: data (String with binary data) - """ - data = ByteHelper.rawbytes(self.output.receive(packet)) - if data: - # only using every second byte, - # because only every second contains the useful information. - self.all_data += data[1::2] - - def datafeed_file(self, device, packet): - """ - Callback type which writes all data into a file - :param device: device object - :param packet: data (String with binary data) - """ - data = ByteHelper.rawbytes(self.output.receive(packet)) - if data: - # only using every second byte, - # because only every second contains the useful information. - for x in data[1::2]: - self.file.write(str(x) + "\n") - - def __init__( - self, - driver="fx2lafw", - sample_rate=100_000, - debug_output=False, - used_datafeed=datafeed_changes, - fake=False, - ): - """ - - :param driver: Driver that should be used - :param sample_rate: The sample rate of the Logic analyzer - :param debug_output: Should be true if output should be displayed to user - :param used_datafeed: one of the datafeeds above, user later as callback. - :param fake: - """ - super(SigrokAPIInterface, self).__init__(sample_rate) - if fake: - raise NotImplementedError("Not implemented!") - self.used_datafeed = used_datafeed - - self.debug_output = debug_output - self.session = None - - def forceStopMeasure(self): - """ - Force stopping the measurement - :return: None - """ - self.session.stop() - - def runMeasure(self): - """ - Start the Measurement and set all settings - """ - context = sr.Context_create() - - devs = context.drivers[self.driver].scan() - # print(devs) - if len(devs) == 0: - raise RuntimeError("No device with that driver found!") - sigrokDevice = devs[0] - if len(devs) > 1: - raise Warning( - "Attention! Multiple devices with that driver found! Using ", - sigrokDevice.connection_id(), - ) - - sigrokDevice.open() - sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate) - - enabled_channels = ["D1"] - for channel in sigrokDevice.channels: - channel.enabled = channel.name in enabled_channels - - self.session = context.create_session() - self.session.add_device(sigrokDevice) - self.session.start() - - self.output = context.output_formats["binary"].create_output(sigrokDevice) - - print(context.output_formats) - self.all_data = b"" - - def datafeed(device, packet): - self.used_datafeed(self, device, packet) - - self.session.add_datafeed_callback(datafeed) - time_running = time.time() - self.session.run() - total_time = time.time() - time_running - print( - "Used time: ", - total_time * 1_000_000, - "µs", - ) - self.session.stop() - - if self.debug_output: - # if self.used_datafeed == self.datafeed_in_change: - if True: - changes = [x / self.sample_rate for x in self.changes] - print(changes) - is_on = self.start == 0xFF - print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW") - for x in range(len(changes) - 1): - is_on = not is_on - print( - changes[x], - " - ", - changes[x + 1], - " / ", - changes[x + 1] - changes[x], - " # Pin ", - "HIGH" if is_on else "LOW", - ) - elif self.used_datafeed == self.datafeed_in_all: - print(self.all_data) diff --git a/lib/lennart/SigrokCLIInterface.py b/lib/lennart/SigrokCLIInterface.py index b28a8a9..600c00f 100644 --- a/lib/lennart/SigrokCLIInterface.py +++ b/lib/lennart/SigrokCLIInterface.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import subprocess import time diff --git a/lib/lennart/SigrokInterface.py b/lib/lennart/SigrokInterface.py index 32e8fe2..a8b392f 100644 --- a/lib/lennart/SigrokInterface.py +++ b/lib/lennart/SigrokInterface.py @@ -1,7 +1,8 @@ +#!/usr/bin/env python3 + import json import numpy as np -from dfatool.lennart.DataInterface import DataInterface import logging logger = logging.getLogger(__name__) @@ -64,7 +65,7 @@ class SigrokResult: pass -class SigrokInterface(DataInterface): +class SigrokInterface: def __init__(self, sample_rate, driver="fx2lafw", filename="temp/sigrok.log"): """ |