diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/functions.py | 22 | ||||
-rw-r--r-- | lib/lennart/DataProcessor.py | 4 | ||||
-rw-r--r-- | lib/loader/energytrace.py | 82 | ||||
-rw-r--r-- | lib/loader/generic.py | 43 |
4 files changed, 73 insertions, 78 deletions
diff --git a/lib/functions.py b/lib/functions.py index 5c2e8ff..33ecaca 100644 --- a/lib/functions.py +++ b/lib/functions.py @@ -153,6 +153,21 @@ class NormalizationFunction: class ModelFunction: + """ + Encapsulates the behaviour of a single model attribute, e.g. TX power or write duration. + + The behaviour may be constant or depend on a number of factors. Modelfunction is a virtual base class, + individuel decendents describe actual behaviour. + + Common attributes: + :param value: median data value + :type value: float + :param value_error: static model value error + :type value_error: dict, optional + :param function_error: model error + :type value_error: dict, optional + """ + def __init__(self, value): # a model always has a static (median/mean) value. For StaticFunction, it's the only data point. # For more complex models, it's usede both as fallback in case the model cannot predict the current @@ -171,11 +186,13 @@ class ModelFunction: raise NotImplementedError def eval_mae(self, param_list): + """Return model Mean Absolute Error (MAE) for `param_list`.""" if self.is_predictable(param_list): return self.function_error["mae"] return self.value_error["mae"] def to_json(self): + """Convert model to JSON.""" ret = { "value": self.value, "valueError": self.value_error, @@ -185,6 +202,11 @@ class ModelFunction: @classmethod def from_json(cls, data): + """ + Create ModelFunction instance from JSON. + + Delegates to StaticFunction, SplitFunction, etc. as appropriate. + """ if data["type"] == "static": mf = StaticFunction.from_json(data) elif data["type"] == "split": diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py index 6df813e..c211beb 100644 --- a/lib/lennart/DataProcessor.py +++ b/lib/lennart/DataProcessor.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 + +# XXX deprecated and unused import numpy as np import logging import os @@ -49,7 +51,7 @@ class DataProcessor: # Remove bogus data before / after the measurement - time_stamp_data = self.sync_data.timestamps + time_stamp_data = self.sync_data for x in range(1, len(time_stamp_data)): if time_stamp_data[x] - time_stamp_data[x - 1] > 1.3: time_stamp_data = time_stamp_data[x:] diff --git a/lib/loader/energytrace.py b/lib/loader/energytrace.py index d7f3f88..0ffaac7 100644 --- a/lib/loader/energytrace.py +++ b/lib/loader/energytrace.py @@ -627,7 +627,7 @@ class EnergyTraceWithBarcode: return None, None, None, None -class EnergyTraceWithLogicAnalyzer: +class EnergyTraceWithLogicAnalyzer(ExternalTimerSync): def __init__( self, voltage: float, @@ -650,12 +650,15 @@ class EnergyTraceWithLogicAnalyzer: self.with_traces = with_traces self.errors = list() + self.sync_min_duration = 0.7 + self.sync_min_low_count = 3 + self.sync_min_high_count = 1 + self.sync_power = 0.011 + def load_data(self, log_data): - from dfatool.lennart.SigrokInterface import SigrokResult - from dfatool.lennart.EnergyInterface import EnergyInterface + la_data = json.loads(log_data[0]) + self.sync_data = la_data["timestamps"] - # Daten laden - self.sync_data = SigrokResult.fromString(log_data[0]) ( self.interval_start_timestamp, self.interval_duration, @@ -664,65 +667,26 @@ class EnergyTraceWithLogicAnalyzer: self.hw_statechange_indexes, ) = _load_energytrace(log_data[1]) - def analyze_states(self, traces, offline_index: int): - """ - Split log data into states and transitions and return duration, energy, and mean power for each element. - - :param traces: expected traces, needed to synchronize with the measurement. - traces is a list of runs, traces[*]['trace'] is a single run - (i.e. a list of states and transitions, starting with a transition - and ending with a state). - :param offline_index: This function uses traces[*]['trace'][*]['online_aggregates']['duration'][offline_index] to find sync codes + self.timestamps = self.interval_start_timestamp + self.data = self.interval_power - :param charges: raw charges (each element describes the charge in pJ transferred during 10 µs) - :param trigidx: "charges" indexes corresponding to a trigger edge, see `trigger_edges` - :param ua_func: charge(pJ) -> current(µA) function as returned by `calibration_function` + for x in range(1, len(self.sync_data)): + if self.sync_data[x] - self.sync_data[x - 1] > 1.3: + self.sync_data = self.sync_data[x:] + break - :returns: returns list of states and transitions, starting with a transition and ending with astate - Each element is a dict containing: - * `isa`: 'state' or 'transition' - * `W_mean`: Mittelwert der Leistungsaufnahme - * `W_std`: Standardabweichung der Leistungsaufnahme - * `s`: Dauer - if isa == 'transition, it also contains: - * `W_mean_delta_prev`: Differenz zwischen W_mean und W_mean des vorherigen Zustands - * `W_mean_delta_next`: Differenz zwischen W_mean und W_mean des Folgezustands - """ + for x in reversed(range(1, len(self.sync_data))): + if self.sync_data[x] - self.sync_data[x - 1] > 1.3: + self.sync_data = self.sync_data[:x] + break - names = [] - for trace_number, trace in enumerate(traces): - for state_or_transition in trace["trace"]: - names.append(state_or_transition["name"]) - # print(names[:15]) - from dfatool.lennart.DataProcessor import DataProcessor - - dp = DataProcessor( - sync_data=self.sync_data, - et_timestamps=self.interval_start_timestamp, - et_power=self.interval_power, - hw_statechange_indexes=self.hw_statechange_indexes, - offline_index=offline_index, + self.online_timestamps = self.sync_data[2:3] + self.sync_data[4:-7] + self.online_timestamps = ( + np.array(self.online_timestamps) - self.online_timestamps[0] ) - dp.run() - energy_trace_new = dp.getStatesdfatool( - state_sleep=self.state_duration, with_traces=self.with_traces - ) - # Uncomment to plot traces - if os.getenv("DFATOOL_PLOT_LASYNC") is not None and offline_index == int( - os.getenv("DFATOOL_PLOT_LASYNC") - ): - dp.plot() # <- plot traces with sync annotatons - # dp.plot(names) # <- plot annotated traces (with state/transition names) - if os.getenv("DFATOOL_EXPORT_LASYNC") is not None: - filename = os.getenv("DFATOOL_EXPORT_LASYNC") + f"_{offline_index}.json" - with open(filename, "w") as f: - json.dump(dp.export_sync(), f, cls=NpEncoder) - logger.info("Exported data and LA sync timestamps to {filename}") - - energy_trace = list() - expected_transitions = list() - return energy_trace_new + def analyze_states(self, expected_trace, repeat_id): + return super().analyze_states(expected_trace, repeat_id, self.online_timestamps) class EnergyTraceWithTimer(ExternalTimerSync): diff --git a/lib/loader/generic.py b/lib/loader/generic.py index 984cf06..78305e0 100644 --- a/lib/loader/generic.py +++ b/lib/loader/generic.py @@ -41,7 +41,10 @@ class ExternalTimerSync: # * self.sync_min_high_count, self.sync_min_low_count: outlier handling in synchronization pulse detection # * self.sync_power, self.sync_min_duration: synchronization pulse parameters. one pulse before the measurement, two pulses afterwards # expected_trace must contain online timestamps - def analyze_states(self, expected_trace, repeat_id): + def analyze_states(self, expected_trace, repeat_id, online_timestamps=None): + """ + :param online_timestamps: must start at 0, if set + """ sync_start = None sync_timestamps = list() high_count = 0 @@ -81,24 +84,28 @@ class ExternalTimerSync: start_ts = sync_timestamps[0][1] end_ts = sync_timestamps[1][0] - # start and end of first state - online_timestamps = [0, expected_trace[0]["start_offset"][repeat_id]] - - # remaining events from the end of the first transition (start of second state) to the end of the last observed state - try: - for trace in expected_trace: - for word in trace["trace"]: - online_timestamps.append( - online_timestamps[-1] - + word["online_aggregates"]["duration"][repeat_id] - ) - except IndexError: - self.errors.append( - f"""offline_index {repeat_id} missing in trace {trace["id"]}""" - ) - return list() + if online_timestamps is None: + # start and end of first state + online_timestamps = [0, expected_trace[0]["start_offset"][repeat_id]] + + # remaining events from the end of the first transition (start of second state) to the end of the last observed state + try: + for trace in expected_trace: + for word in trace["trace"]: + online_timestamps.append( + online_timestamps[-1] + + word["online_aggregates"]["duration"][repeat_id] + ) + except IndexError: + self.errors.append( + f"""offline_index {repeat_id} missing in trace {trace["id"]}""" + ) + return list() + + online_timestamps = np.array(online_timestamps) * 1e-6 + else: + online_timestamps = np.array(online_timestamps) - online_timestamps = np.array(online_timestamps) * 1e-6 online_timestamps = ( online_timestamps * ((end_ts - start_ts) / (online_timestamps[-1] - online_timestamps[0])) |