From 81082d00444d3bfd644f2417ef5c1a34d9569a28 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Fri, 16 Oct 2020 13:58:20 +0200 Subject: Code aus Lennarts BA-repo --- lib/lennart/DataInterface.py | 24 +++ lib/lennart/DataProcessor.py | 374 ++++++++++++++++++++++++++++++++++++++ lib/lennart/EnergyInterface.py | 119 ++++++++++++ lib/lennart/SigrokAPIInterface.py | 152 ++++++++++++++++ lib/lennart/SigrokCLIInterface.py | 78 ++++++++ lib/lennart/SigrokInterface.py | 165 +++++++++++++++++ lib/lennart/__init__.py | 0 lib/loader.py | 12 +- lib/runner.py | 2 +- 9 files changed, 919 insertions(+), 7 deletions(-) create mode 100644 lib/lennart/DataInterface.py create mode 100644 lib/lennart/DataProcessor.py create mode 100644 lib/lennart/EnergyInterface.py create mode 100644 lib/lennart/SigrokAPIInterface.py create mode 100644 lib/lennart/SigrokCLIInterface.py create mode 100644 lib/lennart/SigrokInterface.py create mode 100644 lib/lennart/__init__.py diff --git a/lib/lennart/DataInterface.py b/lib/lennart/DataInterface.py new file mode 100644 index 0000000..4495db2 --- /dev/null +++ b/lib/lennart/DataInterface.py @@ -0,0 +1,24 @@ +class DataInterface: + def runMeasure(self): + """ + Implemented in subclasses. + + Starts the measurement + """ + raise NotImplementedError("The method not implemented") + + def getData(self): + """ + Implemented in subclasses + + :returns: gathered data + """ + raise NotImplementedError("The method not implemented") + + def forceStopMeasure(self): + """ + Implemented in subclasses + + Force stops the measurement + """ + raise NotImplementedError("The method not implemented") diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py new file mode 100644 index 0000000..df3f41c --- /dev/null +++ b/lib/lennart/DataProcessor.py @@ -0,0 +1,374 @@ +class DataProcessor: + def __init__(self, sync_data, energy_data): + """ + Creates DataProcessor object. + + :param sync_data: input timestamps (SigrokResult) + :param energy_data: List of EnergyTrace datapoints + """ + self.reduced_timestamps = [] + self.modified_timestamps = [] + self.plot_data_x = [] + self.plot_data_y = [] + self.sync_data = sync_data + self.energy_data = energy_data + self.start_offset = 0 + + self.power_sync_watt = 0.011 + self.power_sync_len = 0.7 + self.power_sync_max_outliers = 2 + + def run(self): + """ + Main Function to remove unwanted data, get synchronization points, add the offset and add drift. + :return: None + """ + # remove Dirty Data from previously running program (happens if logic Analyzer Measurement starts earlier than + # the HW Reset from energytrace) + use_data_after_index = 0 + for x in range(1, len(self.sync_data.timestamps)): + if self.sync_data.timestamps[x] - self.sync_data.timestamps[x - 1] > 1.3: + use_data_after_index = x + break + + time_stamp_data = self.sync_data.timestamps[use_data_after_index:] + + last_data = [0, 0, 0, 0] + + # clean timestamp data, if at the end strange ts got added somehow + time_stamp_data = self.removeTooFarDatasets(time_stamp_data) + + self.reduced_timestamps = time_stamp_data + + # NEW + datasync_timestamps = [] + sync_start = 0 + outliers = 0 + pre_outliers_ts = None + for i, energytrace_dataset in enumerate(self.energy_data): + usedtime = energytrace_dataset[0] - last_data[0] # in microseconds + timestamp = energytrace_dataset[0] + usedenergy = energytrace_dataset[3] - last_data[3] + power = usedenergy / usedtime * 10 ** -3 # in watts + if power > 0: + if power > self.power_sync_watt: + if sync_start is None: + sync_start = timestamp + outliers = 0 + else: + # Sync point over or outliers + if outliers == 0: + pre_outliers_ts = timestamp + outliers += 1 + if outliers > self.power_sync_max_outliers: + if sync_start is not None: + if ( + pre_outliers_ts - sync_start + ) / 1_000_000 > self.power_sync_len: + datasync_timestamps.append( + ( + sync_start / 1_000_000, + pre_outliers_ts / 1_000_000, + ) + ) + sync_start = None + + last_data = energytrace_dataset + + self.plot_data_x.append(energytrace_dataset[0] / 1_000_000) + self.plot_data_y.append(power) + + if power > self.power_sync_watt: + if (self.energy_data[-1][0] - sync_start) / 1_000_000 > self.power_sync_len: + datasync_timestamps.append( + (sync_start / 1_000_000, pre_outliers_ts / 1_000_000) + ) + + # print("SYNC SPOTS: ", datasync_timestamps) + # print(time_stamp_data[2]) + + start_offset = datasync_timestamps[0][1] - time_stamp_data[2] + start_timestamp = datasync_timestamps[0][1] + + end_offset = datasync_timestamps[-2][0] - (time_stamp_data[-8] + start_offset) + end_timestamp = datasync_timestamps[-2][0] + print(start_timestamp, end_timestamp) + + # print(start_offset, start_timestamp, end_offset, end_timestamp) + + with_offset = self.addOffset(time_stamp_data, start_offset) + + with_drift = self.addDrift( + with_offset, end_timestamp, end_offset, start_timestamp + ) + + self.modified_timestamps = with_drift + + def addOffset(self, input_timestamps, start_offset): + """ + Add begin offset at start + + :param input_timestamps: List of timestamps (float list) + :param start_offset: Timestamp of last EnergyTrace datapoint at the first sync point + :return: List of modified timestamps (float list) + """ + modified_timestamps_with_offset = [] + for x in input_timestamps: + if x + start_offset >= 0: + modified_timestamps_with_offset.append(x + start_offset) + return modified_timestamps_with_offset + + def removeTooFarDatasets(self, input_timestamps): + """ + Removing datasets, that are to far away at ethe end + + :param input_timestamps: List of timestamps (float list) + :return: List of modified timestamps (float list) + """ + modified_timestamps = [] + for i, x in enumerate(input_timestamps): + # print(x - input_timestamps[i - 1], x - input_timestamps[i - 1] < 2.5) + if x - input_timestamps[i - 1] < 1.6: + modified_timestamps.append(x) + else: + break + return modified_timestamps + + def addDrift(self, input_timestamps, end_timestamp, end_offset, start_timestamp): + """ + Add drift to datapoints + + :param input_timestamps: List of timestamps (float list) + :param end_timestamp: Timestamp of first EnergyTrace datapoint at the second last sync point + :param end_offset: the time between end_timestamp and the timestamp of synchronisation signal + :param start_timestamp: Timestamp of last EnergyTrace datapoint at the first sync point + :return: List of modified timestamps (float list) + """ + endFactor = (end_timestamp + end_offset - start_timestamp) / ( + end_timestamp - start_timestamp + ) + modified_timestamps_with_drift = [] + for x in input_timestamps: + modified_timestamps_with_drift.append( + ((x - start_timestamp) * endFactor) + start_timestamp + ) + + return modified_timestamps_with_drift + + def plot(self, annotateData=None): + """ + Plots the power usage and the timestamps by logic analyzer + + :param annotateData: List of Strings with labels, only needed if annotated plots are wished + :return: None + """ + + def calculateRectangleCurve(timestamps, min_value=0, max_value=0.160): + import numpy as np + + data = [] + for ts in timestamps: + data.append(ts) + data.append(ts) + + a = np.empty((len(data),)) + a[1::4] = max_value + a[2::4] = max_value + a[3::4] = min_value + a[4::4] = min_value + return data, a # plotting by columns + + import matplotlib.pyplot as plt + + fig, ax = plt.subplots() + + if annotateData: + annot = ax.annotate( + "", + xy=(0, 0), + xytext=(20, 20), + textcoords="offset points", + bbox=dict(boxstyle="round", fc="w"), + arrowprops=dict(arrowstyle="->"), + ) + annot.set_visible(True) + + rectCurve_with_drift = calculateRectangleCurve( + self.modified_timestamps, max_value=max(self.plot_data_y) + ) + + plt.plot(self.plot_data_x, self.plot_data_y, label="Leistung") + + plt.plot( + rectCurve_with_drift[0], + rectCurve_with_drift[1], + "-g", + label="Synchronisationsignale mit Driftfaktor", + ) + + plt.xlabel("Zeit [s]") + plt.ylabel("Leistung [W]") + leg = plt.legend() + + def getDataText(x): + # print(x) + for i, xt in enumerate(self.modified_timestamps): + if xt > x: + return "Value: %s" % annotateData[i - 5] + + def update_annot(x, y, name): + annot.xy = (x, y) + text = name + + annot.set_text(text) + annot.get_bbox_patch().set_alpha(0.4) + + def hover(event): + if event.xdata and event.ydata: + annot.set_visible(False) + update_annot(event.xdata, event.ydata, getDataText(event.xdata)) + annot.set_visible(True) + fig.canvas.draw_idle() + + if annotateData: + fig.canvas.mpl_connect("motion_notify_event", hover) + + plt.show() + + def getPowerBetween(self, start, end, state_sleep): # 0.001469 + """ + calculates the average powerusage in interval + NOT SIDE EFFECT FREE, DON'T USE IT EVERYWHERE + + :param start: Start timestamp of interval + :param end: End timestamp of interval + :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle + :return: float with average power usage + """ + first_index = 0 + all_power = [] + for ind in range(self.start_offset, len(self.plot_data_x)): + first_index = ind + if self.plot_data_x[ind] > start: + break + + nextIndAfterIndex = None + for ind in range(first_index, len(self.plot_data_x)): + nextIndAfterIndex = ind + if ( + self.plot_data_x[ind] > end + or self.plot_data_x[ind] > start + state_sleep + ): + self.start_offset = ind - 1 + break + all_power.append(self.plot_data_y[ind]) + + # TODO Idea remove datapoints that are too far away + def removeSD_Mean_Values(arr): + import numpy + + elements = numpy.array(arr) + + mean = numpy.mean(elements, axis=0) + sd = numpy.std(elements, axis=0) + + return [x for x in arr if (mean - 1 * sd < x < mean + 1.5 * sd)] + + if len(all_power) > 10: + # all_power = removeSD_Mean_Values(all_power) + pass + # TODO algorithm relocate datapoint + + pre_fix_len = len(all_power) + if len(all_power) == 0: + # print("PROBLEM") + all_power.append(self.plot_data_y[nextIndAfterIndex]) + elif len(all_power) == 1: + # print("OKAY") + pass + return pre_fix_len, sum(all_power) / len(all_power) + + def getStatesdfatool(self, state_sleep, algorithm=False): + """ + Calculates the length and energy usage of the states + + :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle + :param algorithm: possible usage of accuracy algorithm / not implemented yet + :returns: returns list of states and transitions, starting with a transition and ending with astate + Each element is a dict containing: + * `isa`: 'state' or 'transition' + * `W_mean`: Mittelwert der Leistungsaufnahme + * `W_std`: Standardabweichung der Leistungsaufnahme + * `s`: Dauer + """ + if algorithm: + raise NotImplementedError + end_transition_ts = None + timestamps_sync_start = 0 + energy_trace_new = list() + + for ts_index in range( + 0 + timestamps_sync_start, int(len(self.modified_timestamps) / 2) + ): + start_transition_ts = self.modified_timestamps[ts_index * 2] + start_transition_ts_timing = self.reduced_timestamps[ts_index * 2] + + if end_transition_ts is not None: + count_dp, power = self.getPowerBetween( + end_transition_ts, start_transition_ts, state_sleep + ) + + # print("STATE", end_transition_ts * 10 ** 6, start_transition_ts * 10 ** 6, (start_transition_ts - end_transition_ts) * 10 ** 6, power) + if ( + (start_transition_ts - end_transition_ts) * 10 ** 6 > 900_000 + and power > self.power_sync_watt * 0.9 + and ts_index > 10 + ): + # remove last transition and stop (upcoming data only sync) + del energy_trace_new[-1] + break + pass + + state = { + "isa": "state", + "W_mean": power, + "W_std": 0.0001, + "s": ( + start_transition_ts_timing - end_transition_ts_timing + ), # * 10 ** 6, + } + energy_trace_new.append(state) + + energy_trace_new[-2]["W_mean_delta_next"] = ( + energy_trace_new[-2]["W_mean"] - energy_trace_new[-1]["W_mean"] + ) + + # get energy end_transition_ts + end_transition_ts = self.modified_timestamps[ts_index * 2 + 1] + count_dp, power = self.getPowerBetween( + start_transition_ts, end_transition_ts, state_sleep + ) + + # print("TRANS", start_transition_ts * 10 ** 6, end_transition_ts * 10 ** 6, (end_transition_ts - start_transition_ts) * 10 ** 6, power) + end_transition_ts_timing = self.reduced_timestamps[ts_index * 2 + 1] + + transition = { + "isa": "transition", + "W_mean": power, + "W_std": 0.0001, + "s": ( + end_transition_ts_timing - start_transition_ts_timing + ), # * 10 ** 6, + "count_dp": count_dp, + } + + if (end_transition_ts - start_transition_ts) * 10 ** 6 > 2_000_000: + # TODO Last data set corrupted? HOT FIX!!!!!!!!!!!! REMOVE LATER + # for x in range(4): + # del energy_trace_new[-1] + # break + pass + + energy_trace_new.append(transition) + # print(start_transition_ts, "-", end_transition_ts, "-", end_transition_ts - start_transition_ts) + return energy_trace_new diff --git a/lib/lennart/EnergyInterface.py b/lib/lennart/EnergyInterface.py new file mode 100644 index 0000000..2b23667 --- /dev/null +++ b/lib/lennart/EnergyInterface.py @@ -0,0 +1,119 @@ +import re +import subprocess + +from dfatool.lennart.DataInterface import DataInterface + + +class EnergyInterface(DataInterface): + def __init__( + self, + duration_seconds=10, + console_output=False, + temp_file="temp/energytrace.log", + fake=False, + ): + """ + class is not used in embedded into dfatool. + + :param duration_seconds: seconds the EnergyTrace should be running + :param console_output: if EnergyTrace output should be printed to the user + :param temp_file: file path for temporary file + :param fake: if already existing file should be used + """ + self.energytrace = None + self.duration_seconds = duration_seconds + self.console_output = console_output + self.temp_file = temp_file + self.fake = fake + + def runMeasure(self): + """ + starts the measurement, with waiting for done + """ + if self.fake: + return + self.runMeasureAsynchronously() + self.waitForAsynchronousMeasure() + + def runMeasureAsynchronously(self): + """ + starts the measurement, not waiting for done + """ + if self.fake: + return + self.energytrace = subprocess.Popen( + "msp430-etv --save %s %s %s" + % ( + self.temp_file, + self.duration_seconds, + "" if self.console_output else "> /dev/null", + ), + shell=True, + ) + print( + "msp430-etv --save %s %s %s" + % ( + self.temp_file, + self.duration_seconds, + "" if self.console_output else "> /dev/null", + ) + ) + + def waitForAsynchronousMeasure(self): + """ + Wait until is command call is done + """ + if self.fake: + return + self.energytrace.wait() + + def getData(self): + """ + cleans the string data and creates int list + :return: list of data, in format [[int,int,int,int], [int,int,int,int], ... ] + """ + energytrace_log = open(self.temp_file) + lines = energytrace_log.readlines()[21:] + data = [] + for line in lines: + if "MSP430_DisableEnergyTrace" in line: + break + else: + data.append([int(i) for i in line.split()]) + return data + + @classmethod + def getDataFromString(cls, string, delimiter="\\n"): + """ + Parsing the data from string + + :param string: input string which will be parsed + :param delimiter: for normal file its \n + :return: list of data, in format [[int,int,int,int], [int,int,int,int], ... ] + """ + lines = string.split(delimiter)[21:] + data = [] + for line in lines: + if "MSP430_DisableEnergyTrace" in line: + break + else: + data.append([int(i) for i in line.split()]) + return data + + def setFile(self, path): + """ + changeing the temporary file + + :param path: file path of new temp file + :return: None + """ + self.temp_file = path + pass + + def forceStopMeasure(self): + """ + force stops the Measurement, with signals + :return: None + """ + self.energytrace.send_signal(subprocess.signal.SIGINT) + stdout, stderr = self.energytrace.communicate(timeout=15) diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py new file mode 100644 index 0000000..a8765ba --- /dev/null +++ b/lib/lennart/SigrokAPIInterface.py @@ -0,0 +1,152 @@ +import time + +from dfatool.lennart.SigrokInterface import SigrokInterface + +import sigrok.core as sr +from sigrok.core.classes import * + +from util.ByteHelper import ByteHelper + + +class SigrokAPIInterface(SigrokInterface): + def datafeed_changes(self, device, packet): + """ + Callback type with changes analysis + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + for x in data[1::2]: + self.analyzeData(x) + + def datafeed_in_all(self, device, packet): + """ + Callback type which writes all data into the array + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + self.all_data += data[1::2] + + def datafeed_file(self, device, packet): + """ + Callback type which writes all data into a file + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + for x in data[1::2]: + self.file.write(str(x) + "\n") + + def __init__( + self, + driver="fx2lafw", + sample_rate=100_000, + sample_count=1_000_000, + debug_output=False, + used_datafeed=datafeed_changes, + fake=False, + ): + """ + + :param driver: Driver that should be used + :param sample_rate: The sample rate of the Logic analyzer + :param sample_count: The sample count of the Logic analyzer + :param debug_output: Should be true if output should be displayed to user + :param used_datafeed: one of the datafeeds above, user later as callback. + :param fake: + """ + super(SigrokAPIInterface, self).__init__(sample_rate, sample_count) + if fake: + raise NotImplementedError("Not implemented!") + self.used_datafeed = used_datafeed + + self.debug_output = debug_output + self.session = None + + def forceStopMeasure(self): + """ + Force stopping the measurement + :return: None + """ + self.session.stop() + + def runMeasure(self): + """ + Start the Measurement and set all settings + """ + context = sr.Context_create() + + devs = context.drivers[self.driver].scan() + # print(devs) + if len(devs) == 0: + raise RuntimeError("No device with that driver found!") + sigrokDevice = devs[0] + if len(devs) > 1: + raise Warning( + "Attention! Multiple devices with that driver found! Using ", + sigrokDevice.connection_id(), + ) + + sigrokDevice.open() + sigrokDevice.config_set(ConfigKey.LIMIT_SAMPLES, self.sample_count) + sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate) + + enabled_channels = ["D1"] + for channel in sigrokDevice.channels: + channel.enabled = channel.name in enabled_channels + + self.session = context.create_session() + self.session.add_device(sigrokDevice) + self.session.start() + + self.output = context.output_formats["binary"].create_output(sigrokDevice) + + print(context.output_formats) + self.all_data = b"" + + def datafeed(device, packet): + self.used_datafeed(self, device, packet) + + self.session.add_datafeed_callback(datafeed) + time_running = time.time() + self.session.run() + total_time = time.time() - time_running + print( + "Used time: ", + total_time * 1_000_000, + "µs | sample/s: ", + self.sample_count / (total_time), + "Hz ", + ) + self.session.stop() + + if self.debug_output: + # if self.used_datafeed == self.datafeed_in_change: + if True: + changes = [x / self.sample_rate for x in self.changes] + print(changes) + is_on = self.start == 0xFF + print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW") + for x in range(len(changes) - 1): + is_on = not is_on + print( + changes[x], + " - ", + changes[x + 1], + " / ", + changes[x + 1] - changes[x], + " # Pin ", + "HIGH" if is_on else "LOW", + ) + elif self.used_datafeed == self.datafeed_in_all: + print(self.all_data) diff --git a/lib/lennart/SigrokCLIInterface.py b/lib/lennart/SigrokCLIInterface.py new file mode 100644 index 0000000..873b226 --- /dev/null +++ b/lib/lennart/SigrokCLIInterface.py @@ -0,0 +1,78 @@ +import subprocess +import time + +from dfatool.lennart.SigrokInterface import SigrokInterface + + +class SigrokCLIInterface(SigrokInterface): + def __init__( + self, + bin_temp_file="temp/out.bin", + sample_rate=100_000, + sample_count=1_000_000, + fake=False, + ): + """ + creates SigrokCLIInterface object. Uses the CLI Interface (Command: sigrok-cli) + + :param bin_temp_file: temporary file for binary output + :param sample_rate: The sample rate of the Logic analyzer + :param sample_count: The sample count of the Logic analyzer + :param fake: if it should use existing data + """ + super(SigrokCLIInterface, self).__init__(sample_rate, sample_count) + self.fake = fake + self.bin_temp_file = bin_temp_file + self.sigrok_cli_thread = None + + def forceStopMeasure(self): + """ + Force stopping measure, sometimes needs pkill for killing definitly + :return: None + """ + self.sigrok_cli_thread.send_signal(subprocess.signal.SIGKILL) + stdout, stderr = self.sigrok_cli_thread.communicate(timeout=15) + time.sleep(5) + # TODO not nice solution, make better + import os + + os.system("pkill -f sigrok") + self.runOpenAnalyze() + + def runMeasure(self): + """ + starts the measurement, with waiting for done + """ + if not self.fake: + self.runMeasureAsynchronous() + self.waitForAsynchronousMeasure() + + def runMeasureAsynchronous(self): + """ + starts the measurement, not waiting for done + """ + shellcommand = ( + 'sigrok-cli --output-file %s --output-format binary --samples %s -d %s --config "samplerate=%s Hz"' + % (self.bin_temp_file, self.sample_count, self.driver, self.sample_rate) + ) + self.sigrok_cli_thread = subprocess.Popen(shellcommand, shell=True) + + def waitForAsynchronousMeasure(self): + """ + Wait until is command call is done + """ + if not self.fake: + self.sigrok_cli_thread.wait() + self.runOpenAnalyze() + + def runOpenAnalyze(self): + """ + Opens the generated binary file and parses it byte by byte + + """ + in_file = open(self.bin_temp_file, "rb") # opening for [r]eading as [b]inary + data = in_file.read() # if you only wanted to read 512 bytes, do .read(512) + in_file.close() + + for x in data: + self.analyzeData(x) diff --git a/lib/lennart/SigrokInterface.py b/lib/lennart/SigrokInterface.py new file mode 100644 index 0000000..555a25f --- /dev/null +++ b/lib/lennart/SigrokInterface.py @@ -0,0 +1,165 @@ +import json + +from dfatool.lennart.DataInterface import DataInterface + + +# Adding additional parsing functionality +class SigrokResult: + def __init__(self, timestamps, onbeforefirstchange): + """ + Creates SigrokResult object, struct for timestamps and onBeforeFirstChange. + + :param timestamps: list of changing timestamps + :param onbeforefirstchange: if the state before the first change is already on / should always be off, just to be data correct + """ + self.timestamps = timestamps + self.onBeforeFirstChange = onbeforefirstchange + + def __str__(self): + """ + :return: string representation of object + """ + return "" % ( + self.onBeforeFirstChange, + self.timestamps, + ) + + def getDict(self): + """ + :return: dict representation of object + """ + data = { + "onBeforeFirstChange": self.onBeforeFirstChange, + "timestamps": self.timestamps, + } + return data + + @classmethod + def fromFile(cls, path): + """ + Generates SigrokResult from json_file + + :param path: file path + :return: SigrokResult object + """ + with open(path) as json_file: + data = json.load(json_file) + return SigrokResult(data["timestamps"], data["onBeforeFirstChange"]) + pass + + @classmethod + def fromString(cls, string): + """ + Generates SigrokResult from string + + :param string: string + :return: SigrokResult object + """ + data = json.loads(string) + return SigrokResult(data["timestamps"], data["onBeforeFirstChange"]) + pass + + @classmethod + def fromTraces(cls, traces): + """ + Generates SigrokResult from ptalog.json traces + + :param traces: traces from dfatool ptalog.json + :return: SigrokResult object + """ + timestamps = [0] + for tr in traces: + for t in tr["trace"]: + # print(t['online_aggregates']['duration'][0]) + timestamps.append( + timestamps[-1] + (t["online_aggregates"]["duration"][0] * 10 ** -6) + ) + + # print(timestamps) + # prepend FAKE Sync point + t_neu = [0.0, 0.0000001, 1.0, 1.00000001] + for i, x in enumerate(timestamps): + t_neu.append( + round(float(x) + t_neu[3] + 0.20, 6) + ) # list(map(float, t_ist.split(",")[:i+1])) + + # append FAKE Sync point / eine überschneidung + # [30.403632, 30.403639, 31.407265, 31.407271] + # appendData = [29.144855,30.148495,30.148502,30.403632,30.403639,31.407265,31.407271,] + appendData = [0, 1.000001, 1.000002, 1.25, 1.2500001] + + # TODO future work here, why does the sync not work completely + t_neu[-1] = ( + t_neu[-2] + (t_neu[-1] - t_neu[-2]) * 0.9 + ) # Weird offset failure with UART stuff + + offset = t_neu[-1] - appendData[0] + for x in appendData: + t_neu.append(x + offset) + + # print(t_neu) + print(len(t_neu)) + return SigrokResult(t_neu, False) + + +class SigrokInterface(DataInterface): + def __init__( + self, sample_rate, sample_count, driver="fx2lafw", filename="temp/sigrok.log" + ): + """ + + :param sample_rate: Samplerate of the Logic Analyzer + :param sample_count: Count of samples + :param driver: for many Logic Analyzer from Saleae the "fx2lafw" should be working + :param filename: temporary file name + """ + # options + self.sample_count = sample_count + self.sample_rate = sample_rate + self.file = open(filename, "w+") + self.driver = driver + + # internal data + self.changes = [] + self.start = None + self.last_val = None + self.index = 0 + + def runMeasure(self): + """ + Not implemented because implemented in subclasses + :return: None + """ + raise NotImplementedError("The method not implemented") + + def forceStopMeasure(self): + """ + Not implemented because implemented in subclasses + :return: None + """ + raise NotImplementedError("The method not implemented") + + def getData(self): + """ + + :return: + """ + # return sigrok_energy_api_result(self.changes, True if self.start == 0xff else False) + return SigrokResult( + [x / self.sample_rate for x in self.changes], + True if self.start == 0xFF else False, + ) + + def analyzeData(self, byte): + """ + analyze one byte if it differs from the last byte, it will be appended to changes. + + :param byte: one byte to analyze + """ + if self.start is None: + self.start = byte + self.last_val = byte + if byte != self.last_val: + self.changes.append(self.index) + self.last_val = byte + self.index += 1 diff --git a/lib/lennart/__init__.py b/lib/lennart/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/loader.py b/lib/loader.py index fcd5490..568c98c 100644 --- a/lib/loader.py +++ b/lib/loader.py @@ -1638,8 +1638,8 @@ class EnergyTraceWithLogicAnalyzer: self.errors = list() def load_data(self, log_data): - from data.timing.SigrokInterface import SigrokResult - from data.energy.EnergyInterface import EnergyInterface + from dfatool.lennart.SigrokInterface import SigrokResult + from dfatool.lennart.EnergyInterface import EnergyInterface # Daten laden self.sync_data = SigrokResult.fromString(log_data[0]) @@ -1677,7 +1677,7 @@ class EnergyTraceWithLogicAnalyzer: for state_or_transition in trace["trace"]: names.append(state_or_transition["name"]) # print(names[:15]) - from data.processing.DataProcessor import DataProcessor + from dfatool.lennart.DataProcessor import DataProcessor dp = DataProcessor(sync_data=self.sync_data, energy_data=self.energy_data) dp.run() @@ -1741,8 +1741,8 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer): super().__init__(voltage, state_duration, transition_names, with_traces) def load_data(self, log_data): - from data.timing.SigrokInterface import SigrokResult - from data.energy.EnergyInterface import EnergyInterface + from dfatool.lennart.SigrokInterface import SigrokResult + from dfatool.lennart.EnergyInterface import EnergyInterface # Daten laden self.sync_data = None @@ -1751,7 +1751,7 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer): pass def analyze_states(self, traces, offline_index: int): - from data.timing.SigrokInterface import SigrokResult + from dfatool.lennart.SigrokInterface import SigrokResult self.sync_data = SigrokResult.fromTraces(traces) return super().analyze_states(traces, offline_index) diff --git a/lib/runner.py b/lib/runner.py index 96627cf..0f44e97 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -17,7 +17,7 @@ import serial.threaded import subprocess import sys import time -from data.timing.SigrokCLIInterface import SigrokCLIInterface +from dfatool.lennart.SigrokCLIInterface import SigrokCLIInterface class SerialReader(serial.threaded.Protocol): -- cgit v1.2.3