summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2020-10-16 13:58:20 +0200
committerDaniel Friesel <daniel.friesel@uos.de>2020-10-16 13:58:20 +0200
commit81082d00444d3bfd644f2417ef5c1a34d9569a28 (patch)
tree613ddc91d6aeaa551062260e907e47f813d18bff /lib
parentf5125e191ab1db62e4167b00f809dba20bc54b6f (diff)
Code aus Lennarts BA-repo
Diffstat (limited to 'lib')
-rw-r--r--lib/lennart/DataInterface.py24
-rw-r--r--lib/lennart/DataProcessor.py374
-rw-r--r--lib/lennart/EnergyInterface.py119
-rw-r--r--lib/lennart/SigrokAPIInterface.py152
-rw-r--r--lib/lennart/SigrokCLIInterface.py78
-rw-r--r--lib/lennart/SigrokInterface.py165
-rw-r--r--lib/lennart/__init__.py0
-rw-r--r--lib/loader.py12
-rw-r--r--lib/runner.py2
9 files changed, 919 insertions, 7 deletions
diff --git a/lib/lennart/DataInterface.py b/lib/lennart/DataInterface.py
new file mode 100644
index 0000000..4495db2
--- /dev/null
+++ b/lib/lennart/DataInterface.py
@@ -0,0 +1,24 @@
+class DataInterface:
+ def runMeasure(self):
+ """
+ Implemented in subclasses.
+
+ Starts the measurement
+ """
+ raise NotImplementedError("The method not implemented")
+
+ def getData(self):
+ """
+ Implemented in subclasses
+
+ :returns: gathered data
+ """
+ raise NotImplementedError("The method not implemented")
+
+ def forceStopMeasure(self):
+ """
+ Implemented in subclasses
+
+ Force stops the measurement
+ """
+ raise NotImplementedError("The method not implemented")
diff --git a/lib/lennart/DataProcessor.py b/lib/lennart/DataProcessor.py
new file mode 100644
index 0000000..df3f41c
--- /dev/null
+++ b/lib/lennart/DataProcessor.py
@@ -0,0 +1,374 @@
+class DataProcessor:
+ def __init__(self, sync_data, energy_data):
+ """
+ Creates DataProcessor object.
+
+ :param sync_data: input timestamps (SigrokResult)
+ :param energy_data: List of EnergyTrace datapoints
+ """
+ self.reduced_timestamps = []
+ self.modified_timestamps = []
+ self.plot_data_x = []
+ self.plot_data_y = []
+ self.sync_data = sync_data
+ self.energy_data = energy_data
+ self.start_offset = 0
+
+ self.power_sync_watt = 0.011
+ self.power_sync_len = 0.7
+ self.power_sync_max_outliers = 2
+
+ def run(self):
+ """
+ Main Function to remove unwanted data, get synchronization points, add the offset and add drift.
+ :return: None
+ """
+ # remove Dirty Data from previously running program (happens if logic Analyzer Measurement starts earlier than
+ # the HW Reset from energytrace)
+ use_data_after_index = 0
+ for x in range(1, len(self.sync_data.timestamps)):
+ if self.sync_data.timestamps[x] - self.sync_data.timestamps[x - 1] > 1.3:
+ use_data_after_index = x
+ break
+
+ time_stamp_data = self.sync_data.timestamps[use_data_after_index:]
+
+ last_data = [0, 0, 0, 0]
+
+ # clean timestamp data, if at the end strange ts got added somehow
+ time_stamp_data = self.removeTooFarDatasets(time_stamp_data)
+
+ self.reduced_timestamps = time_stamp_data
+
+ # NEW
+ datasync_timestamps = []
+ sync_start = 0
+ outliers = 0
+ pre_outliers_ts = None
+ for i, energytrace_dataset in enumerate(self.energy_data):
+ usedtime = energytrace_dataset[0] - last_data[0] # in microseconds
+ timestamp = energytrace_dataset[0]
+ usedenergy = energytrace_dataset[3] - last_data[3]
+ power = usedenergy / usedtime * 10 ** -3 # in watts
+ if power > 0:
+ if power > self.power_sync_watt:
+ if sync_start is None:
+ sync_start = timestamp
+ outliers = 0
+ else:
+ # Sync point over or outliers
+ if outliers == 0:
+ pre_outliers_ts = timestamp
+ outliers += 1
+ if outliers > self.power_sync_max_outliers:
+ if sync_start is not None:
+ if (
+ pre_outliers_ts - sync_start
+ ) / 1_000_000 > self.power_sync_len:
+ datasync_timestamps.append(
+ (
+ sync_start / 1_000_000,
+ pre_outliers_ts / 1_000_000,
+ )
+ )
+ sync_start = None
+
+ last_data = energytrace_dataset
+
+ self.plot_data_x.append(energytrace_dataset[0] / 1_000_000)
+ self.plot_data_y.append(power)
+
+ if power > self.power_sync_watt:
+ if (self.energy_data[-1][0] - sync_start) / 1_000_000 > self.power_sync_len:
+ datasync_timestamps.append(
+ (sync_start / 1_000_000, pre_outliers_ts / 1_000_000)
+ )
+
+ # print("SYNC SPOTS: ", datasync_timestamps)
+ # print(time_stamp_data[2])
+
+ start_offset = datasync_timestamps[0][1] - time_stamp_data[2]
+ start_timestamp = datasync_timestamps[0][1]
+
+ end_offset = datasync_timestamps[-2][0] - (time_stamp_data[-8] + start_offset)
+ end_timestamp = datasync_timestamps[-2][0]
+ print(start_timestamp, end_timestamp)
+
+ # print(start_offset, start_timestamp, end_offset, end_timestamp)
+
+ with_offset = self.addOffset(time_stamp_data, start_offset)
+
+ with_drift = self.addDrift(
+ with_offset, end_timestamp, end_offset, start_timestamp
+ )
+
+ self.modified_timestamps = with_drift
+
+ def addOffset(self, input_timestamps, start_offset):
+ """
+ Add begin offset at start
+
+ :param input_timestamps: List of timestamps (float list)
+ :param start_offset: Timestamp of last EnergyTrace datapoint at the first sync point
+ :return: List of modified timestamps (float list)
+ """
+ modified_timestamps_with_offset = []
+ for x in input_timestamps:
+ if x + start_offset >= 0:
+ modified_timestamps_with_offset.append(x + start_offset)
+ return modified_timestamps_with_offset
+
+ def removeTooFarDatasets(self, input_timestamps):
+ """
+ Removing datasets, that are to far away at ethe end
+
+ :param input_timestamps: List of timestamps (float list)
+ :return: List of modified timestamps (float list)
+ """
+ modified_timestamps = []
+ for i, x in enumerate(input_timestamps):
+ # print(x - input_timestamps[i - 1], x - input_timestamps[i - 1] < 2.5)
+ if x - input_timestamps[i - 1] < 1.6:
+ modified_timestamps.append(x)
+ else:
+ break
+ return modified_timestamps
+
+ def addDrift(self, input_timestamps, end_timestamp, end_offset, start_timestamp):
+ """
+ Add drift to datapoints
+
+ :param input_timestamps: List of timestamps (float list)
+ :param end_timestamp: Timestamp of first EnergyTrace datapoint at the second last sync point
+ :param end_offset: the time between end_timestamp and the timestamp of synchronisation signal
+ :param start_timestamp: Timestamp of last EnergyTrace datapoint at the first sync point
+ :return: List of modified timestamps (float list)
+ """
+ endFactor = (end_timestamp + end_offset - start_timestamp) / (
+ end_timestamp - start_timestamp
+ )
+ modified_timestamps_with_drift = []
+ for x in input_timestamps:
+ modified_timestamps_with_drift.append(
+ ((x - start_timestamp) * endFactor) + start_timestamp
+ )
+
+ return modified_timestamps_with_drift
+
+ def plot(self, annotateData=None):
+ """
+ Plots the power usage and the timestamps by logic analyzer
+
+ :param annotateData: List of Strings with labels, only needed if annotated plots are wished
+ :return: None
+ """
+
+ def calculateRectangleCurve(timestamps, min_value=0, max_value=0.160):
+ import numpy as np
+
+ data = []
+ for ts in timestamps:
+ data.append(ts)
+ data.append(ts)
+
+ a = np.empty((len(data),))
+ a[1::4] = max_value
+ a[2::4] = max_value
+ a[3::4] = min_value
+ a[4::4] = min_value
+ return data, a # plotting by columns
+
+ import matplotlib.pyplot as plt
+
+ fig, ax = plt.subplots()
+
+ if annotateData:
+ annot = ax.annotate(
+ "",
+ xy=(0, 0),
+ xytext=(20, 20),
+ textcoords="offset points",
+ bbox=dict(boxstyle="round", fc="w"),
+ arrowprops=dict(arrowstyle="->"),
+ )
+ annot.set_visible(True)
+
+ rectCurve_with_drift = calculateRectangleCurve(
+ self.modified_timestamps, max_value=max(self.plot_data_y)
+ )
+
+ plt.plot(self.plot_data_x, self.plot_data_y, label="Leistung")
+
+ plt.plot(
+ rectCurve_with_drift[0],
+ rectCurve_with_drift[1],
+ "-g",
+ label="Synchronisationsignale mit Driftfaktor",
+ )
+
+ plt.xlabel("Zeit [s]")
+ plt.ylabel("Leistung [W]")
+ leg = plt.legend()
+
+ def getDataText(x):
+ # print(x)
+ for i, xt in enumerate(self.modified_timestamps):
+ if xt > x:
+ return "Value: %s" % annotateData[i - 5]
+
+ def update_annot(x, y, name):
+ annot.xy = (x, y)
+ text = name
+
+ annot.set_text(text)
+ annot.get_bbox_patch().set_alpha(0.4)
+
+ def hover(event):
+ if event.xdata and event.ydata:
+ annot.set_visible(False)
+ update_annot(event.xdata, event.ydata, getDataText(event.xdata))
+ annot.set_visible(True)
+ fig.canvas.draw_idle()
+
+ if annotateData:
+ fig.canvas.mpl_connect("motion_notify_event", hover)
+
+ plt.show()
+
+ def getPowerBetween(self, start, end, state_sleep): # 0.001469
+ """
+ calculates the average powerusage in interval
+ NOT SIDE EFFECT FREE, DON'T USE IT EVERYWHERE
+
+ :param start: Start timestamp of interval
+ :param end: End timestamp of interval
+ :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle
+ :return: float with average power usage
+ """
+ first_index = 0
+ all_power = []
+ for ind in range(self.start_offset, len(self.plot_data_x)):
+ first_index = ind
+ if self.plot_data_x[ind] > start:
+ break
+
+ nextIndAfterIndex = None
+ for ind in range(first_index, len(self.plot_data_x)):
+ nextIndAfterIndex = ind
+ if (
+ self.plot_data_x[ind] > end
+ or self.plot_data_x[ind] > start + state_sleep
+ ):
+ self.start_offset = ind - 1
+ break
+ all_power.append(self.plot_data_y[ind])
+
+ # TODO Idea remove datapoints that are too far away
+ def removeSD_Mean_Values(arr):
+ import numpy
+
+ elements = numpy.array(arr)
+
+ mean = numpy.mean(elements, axis=0)
+ sd = numpy.std(elements, axis=0)
+
+ return [x for x in arr if (mean - 1 * sd < x < mean + 1.5 * sd)]
+
+ if len(all_power) > 10:
+ # all_power = removeSD_Mean_Values(all_power)
+ pass
+ # TODO algorithm relocate datapoint
+
+ pre_fix_len = len(all_power)
+ if len(all_power) == 0:
+ # print("PROBLEM")
+ all_power.append(self.plot_data_y[nextIndAfterIndex])
+ elif len(all_power) == 1:
+ # print("OKAY")
+ pass
+ return pre_fix_len, sum(all_power) / len(all_power)
+
+ def getStatesdfatool(self, state_sleep, algorithm=False):
+ """
+ Calculates the length and energy usage of the states
+
+ :param state_sleep: Length in seconds of one state, needed for cutting out the UART Sending cycle
+ :param algorithm: possible usage of accuracy algorithm / not implemented yet
+ :returns: returns list of states and transitions, starting with a transition and ending with astate
+ Each element is a dict containing:
+ * `isa`: 'state' or 'transition'
+ * `W_mean`: Mittelwert der Leistungsaufnahme
+ * `W_std`: Standardabweichung der Leistungsaufnahme
+ * `s`: Dauer
+ """
+ if algorithm:
+ raise NotImplementedError
+ end_transition_ts = None
+ timestamps_sync_start = 0
+ energy_trace_new = list()
+
+ for ts_index in range(
+ 0 + timestamps_sync_start, int(len(self.modified_timestamps) / 2)
+ ):
+ start_transition_ts = self.modified_timestamps[ts_index * 2]
+ start_transition_ts_timing = self.reduced_timestamps[ts_index * 2]
+
+ if end_transition_ts is not None:
+ count_dp, power = self.getPowerBetween(
+ end_transition_ts, start_transition_ts, state_sleep
+ )
+
+ # print("STATE", end_transition_ts * 10 ** 6, start_transition_ts * 10 ** 6, (start_transition_ts - end_transition_ts) * 10 ** 6, power)
+ if (
+ (start_transition_ts - end_transition_ts) * 10 ** 6 > 900_000
+ and power > self.power_sync_watt * 0.9
+ and ts_index > 10
+ ):
+ # remove last transition and stop (upcoming data only sync)
+ del energy_trace_new[-1]
+ break
+ pass
+
+ state = {
+ "isa": "state",
+ "W_mean": power,
+ "W_std": 0.0001,
+ "s": (
+ start_transition_ts_timing - end_transition_ts_timing
+ ), # * 10 ** 6,
+ }
+ energy_trace_new.append(state)
+
+ energy_trace_new[-2]["W_mean_delta_next"] = (
+ energy_trace_new[-2]["W_mean"] - energy_trace_new[-1]["W_mean"]
+ )
+
+ # get energy end_transition_ts
+ end_transition_ts = self.modified_timestamps[ts_index * 2 + 1]
+ count_dp, power = self.getPowerBetween(
+ start_transition_ts, end_transition_ts, state_sleep
+ )
+
+ # print("TRANS", start_transition_ts * 10 ** 6, end_transition_ts * 10 ** 6, (end_transition_ts - start_transition_ts) * 10 ** 6, power)
+ end_transition_ts_timing = self.reduced_timestamps[ts_index * 2 + 1]
+
+ transition = {
+ "isa": "transition",
+ "W_mean": power,
+ "W_std": 0.0001,
+ "s": (
+ end_transition_ts_timing - start_transition_ts_timing
+ ), # * 10 ** 6,
+ "count_dp": count_dp,
+ }
+
+ if (end_transition_ts - start_transition_ts) * 10 ** 6 > 2_000_000:
+ # TODO Last data set corrupted? HOT FIX!!!!!!!!!!!! REMOVE LATER
+ # for x in range(4):
+ # del energy_trace_new[-1]
+ # break
+ pass
+
+ energy_trace_new.append(transition)
+ # print(start_transition_ts, "-", end_transition_ts, "-", end_transition_ts - start_transition_ts)
+ return energy_trace_new
diff --git a/lib/lennart/EnergyInterface.py b/lib/lennart/EnergyInterface.py
new file mode 100644
index 0000000..2b23667
--- /dev/null
+++ b/lib/lennart/EnergyInterface.py
@@ -0,0 +1,119 @@
+import re
+import subprocess
+
+from dfatool.lennart.DataInterface import DataInterface
+
+
+class EnergyInterface(DataInterface):
+ def __init__(
+ self,
+ duration_seconds=10,
+ console_output=False,
+ temp_file="temp/energytrace.log",
+ fake=False,
+ ):
+ """
+ class is not used in embedded into dfatool.
+
+ :param duration_seconds: seconds the EnergyTrace should be running
+ :param console_output: if EnergyTrace output should be printed to the user
+ :param temp_file: file path for temporary file
+ :param fake: if already existing file should be used
+ """
+ self.energytrace = None
+ self.duration_seconds = duration_seconds
+ self.console_output = console_output
+ self.temp_file = temp_file
+ self.fake = fake
+
+ def runMeasure(self):
+ """
+ starts the measurement, with waiting for done
+ """
+ if self.fake:
+ return
+ self.runMeasureAsynchronously()
+ self.waitForAsynchronousMeasure()
+
+ def runMeasureAsynchronously(self):
+ """
+ starts the measurement, not waiting for done
+ """
+ if self.fake:
+ return
+ self.energytrace = subprocess.Popen(
+ "msp430-etv --save %s %s %s"
+ % (
+ self.temp_file,
+ self.duration_seconds,
+ "" if self.console_output else "> /dev/null",
+ ),
+ shell=True,
+ )
+ print(
+ "msp430-etv --save %s %s %s"
+ % (
+ self.temp_file,
+ self.duration_seconds,
+ "" if self.console_output else "> /dev/null",
+ )
+ )
+
+ def waitForAsynchronousMeasure(self):
+ """
+ Wait until is command call is done
+ """
+ if self.fake:
+ return
+ self.energytrace.wait()
+
+ def getData(self):
+ """
+ cleans the string data and creates int list
+ :return: list of data, in format [[int,int,int,int], [int,int,int,int], ... ]
+ """
+ energytrace_log = open(self.temp_file)
+ lines = energytrace_log.readlines()[21:]
+ data = []
+ for line in lines:
+ if "MSP430_DisableEnergyTrace" in line:
+ break
+ else:
+ data.append([int(i) for i in line.split()])
+ return data
+
+ @classmethod
+ def getDataFromString(cls, string, delimiter="\\n"):
+ """
+ Parsing the data from string
+
+ :param string: input string which will be parsed
+ :param delimiter: for normal file its \n
+ :return: list of data, in format [[int,int,int,int], [int,int,int,int], ... ]
+ """
+ lines = string.split(delimiter)[21:]
+ data = []
+ for line in lines:
+ if "MSP430_DisableEnergyTrace" in line:
+ break
+ else:
+ data.append([int(i) for i in line.split()])
+ return data
+
+ def setFile(self, path):
+ """
+ changeing the temporary file
+
+ :param path: file path of new temp file
+ :return: None
+ """
+ self.temp_file = path
+ pass
+
+ def forceStopMeasure(self):
+ """
+ force stops the Measurement, with signals
+ :return: None
+ """
+ self.energytrace.send_signal(subprocess.signal.SIGINT)
+ stdout, stderr = self.energytrace.communicate(timeout=15)
diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py
new file mode 100644
index 0000000..a8765ba
--- /dev/null
+++ b/lib/lennart/SigrokAPIInterface.py
@@ -0,0 +1,152 @@
+import time
+
+from dfatool.lennart.SigrokInterface import SigrokInterface
+
+import sigrok.core as sr
+from sigrok.core.classes import *
+
+from util.ByteHelper import ByteHelper
+
+
+class SigrokAPIInterface(SigrokInterface):
+ def datafeed_changes(self, device, packet):
+ """
+ Callback type with changes analysis
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ for x in data[1::2]:
+ self.analyzeData(x)
+
+ def datafeed_in_all(self, device, packet):
+ """
+ Callback type which writes all data into the array
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ self.all_data += data[1::2]
+
+ def datafeed_file(self, device, packet):
+ """
+ Callback type which writes all data into a file
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ for x in data[1::2]:
+ self.file.write(str(x) + "\n")
+
+ def __init__(
+ self,
+ driver="fx2lafw",
+ sample_rate=100_000,
+ sample_count=1_000_000,
+ debug_output=False,
+ used_datafeed=datafeed_changes,
+ fake=False,
+ ):
+ """
+
+ :param driver: Driver that should be used
+ :param sample_rate: The sample rate of the Logic analyzer
+ :param sample_count: The sample count of the Logic analyzer
+ :param debug_output: Should be true if output should be displayed to user
+ :param used_datafeed: one of the datafeeds above, user later as callback.
+ :param fake:
+ """
+ super(SigrokAPIInterface, self).__init__(sample_rate, sample_count)
+ if fake:
+ raise NotImplementedError("Not implemented!")
+ self.used_datafeed = used_datafeed
+
+ self.debug_output = debug_output
+ self.session = None
+
+ def forceStopMeasure(self):
+ """
+ Force stopping the measurement
+ :return: None
+ """
+ self.session.stop()
+
+ def runMeasure(self):
+ """
+ Start the Measurement and set all settings
+ """
+ context = sr.Context_create()
+
+ devs = context.drivers[self.driver].scan()
+ # print(devs)
+ if len(devs) == 0:
+ raise RuntimeError("No device with that driver found!")
+ sigrokDevice = devs[0]
+ if len(devs) > 1:
+ raise Warning(
+ "Attention! Multiple devices with that driver found! Using ",
+ sigrokDevice.connection_id(),
+ )
+
+ sigrokDevice.open()
+ sigrokDevice.config_set(ConfigKey.LIMIT_SAMPLES, self.sample_count)
+ sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)
+
+ enabled_channels = ["D1"]
+ for channel in sigrokDevice.channels:
+ channel.enabled = channel.name in enabled_channels
+
+ self.session = context.create_session()
+ self.session.add_device(sigrokDevice)
+ self.session.start()
+
+ self.output = context.output_formats["binary"].create_output(sigrokDevice)
+
+ print(context.output_formats)
+ self.all_data = b""
+
+ def datafeed(device, packet):
+ self.used_datafeed(self, device, packet)
+
+ self.session.add_datafeed_callback(datafeed)
+ time_running = time.time()
+ self.session.run()
+ total_time = time.time() - time_running
+ print(
+ "Used time: ",
+ total_time * 1_000_000,
+ "µs | sample/s: ",
+ self.sample_count / (total_time),
+ "Hz ",
+ )
+ self.session.stop()
+
+ if self.debug_output:
+ # if self.used_datafeed == self.datafeed_in_change:
+ if True:
+ changes = [x / self.sample_rate for x in self.changes]
+ print(changes)
+ is_on = self.start == 0xFF
+ print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
+ for x in range(len(changes) - 1):
+ is_on = not is_on
+ print(
+ changes[x],
+ " - ",
+ changes[x + 1],
+ " / ",
+ changes[x + 1] - changes[x],
+ " # Pin ",
+ "HIGH" if is_on else "LOW",
+ )
+ elif self.used_datafeed == self.datafeed_in_all:
+ print(self.all_data)
diff --git a/lib/lennart/SigrokCLIInterface.py b/lib/lennart/SigrokCLIInterface.py
new file mode 100644
index 0000000..873b226
--- /dev/null
+++ b/lib/lennart/SigrokCLIInterface.py
@@ -0,0 +1,78 @@
+import subprocess
+import time
+
+from dfatool.lennart.SigrokInterface import SigrokInterface
+
+
+class SigrokCLIInterface(SigrokInterface):
+ def __init__(
+ self,
+ bin_temp_file="temp/out.bin",
+ sample_rate=100_000,
+ sample_count=1_000_000,
+ fake=False,
+ ):
+ """
+ creates SigrokCLIInterface object. Uses the CLI Interface (Command: sigrok-cli)
+
+ :param bin_temp_file: temporary file for binary output
+ :param sample_rate: The sample rate of the Logic analyzer
+ :param sample_count: The sample count of the Logic analyzer
+ :param fake: if it should use existing data
+ """
+ super(SigrokCLIInterface, self).__init__(sample_rate, sample_count)
+ self.fake = fake
+ self.bin_temp_file = bin_temp_file
+ self.sigrok_cli_thread = None
+
+ def forceStopMeasure(self):
+ """
+ Force stopping measure, sometimes needs pkill for killing definitly
+ :return: None
+ """
+ self.sigrok_cli_thread.send_signal(subprocess.signal.SIGKILL)
+ stdout, stderr = self.sigrok_cli_thread.communicate(timeout=15)
+ time.sleep(5)
+ # TODO not nice solution, make better
+ import os
+
+ os.system("pkill -f sigrok")
+ self.runOpenAnalyze()
+
+ def runMeasure(self):
+ """
+ starts the measurement, with waiting for done
+ """
+ if not self.fake:
+ self.runMeasureAsynchronous()
+ self.waitForAsynchronousMeasure()
+
+ def runMeasureAsynchronous(self):
+ """
+ starts the measurement, not waiting for done
+ """
+ shellcommand = (
+ 'sigrok-cli --output-file %s --output-format binary --samples %s -d %s --config "samplerate=%s Hz"'
+ % (self.bin_temp_file, self.sample_count, self.driver, self.sample_rate)
+ )
+ self.sigrok_cli_thread = subprocess.Popen(shellcommand, shell=True)
+
+ def waitForAsynchronousMeasure(self):
+ """
+ Wait until is command call is done
+ """
+ if not self.fake:
+ self.sigrok_cli_thread.wait()
+ self.runOpenAnalyze()
+
+ def runOpenAnalyze(self):
+ """
+ Opens the generated binary file and parses it byte by byte
+
+ """
+ in_file = open(self.bin_temp_file, "rb") # opening for [r]eading as [b]inary
+ data = in_file.read() # if you only wanted to read 512 bytes, do .read(512)
+ in_file.close()
+
+ for x in data:
+ self.analyzeData(x)
diff --git a/lib/lennart/SigrokInterface.py b/lib/lennart/SigrokInterface.py
new file mode 100644
index 0000000..555a25f
--- /dev/null
+++ b/lib/lennart/SigrokInterface.py
@@ -0,0 +1,165 @@
+import json
+
+from dfatool.lennart.DataInterface import DataInterface
+
+
+# Adding additional parsing functionality
+class SigrokResult:
+ def __init__(self, timestamps, onbeforefirstchange):
+ """
+ Creates SigrokResult object, struct for timestamps and onBeforeFirstChange.
+
+ :param timestamps: list of changing timestamps
+ :param onbeforefirstchange: if the state before the first change is already on / should always be off, just to be data correct
+ """
+ self.timestamps = timestamps
+ self.onBeforeFirstChange = onbeforefirstchange
+
+ def __str__(self):
+ """
+ :return: string representation of object
+ """
+ return "<Sigrok Result onBeforeFirstChange=%s timestamps=%s>" % (
+ self.onBeforeFirstChange,
+ self.timestamps,
+ )
+
+ def getDict(self):
+ """
+ :return: dict representation of object
+ """
+ data = {
+ "onBeforeFirstChange": self.onBeforeFirstChange,
+ "timestamps": self.timestamps,
+ }
+ return data
+
+ @classmethod
+ def fromFile(cls, path):
+ """
+ Generates SigrokResult from json_file
+
+ :param path: file path
+ :return: SigrokResult object
+ """
+ with open(path) as json_file:
+ data = json.load(json_file)
+ return SigrokResult(data["timestamps"], data["onBeforeFirstChange"])
+ pass
+
+ @classmethod
+ def fromString(cls, string):
+ """
+ Generates SigrokResult from string
+
+ :param string: string
+ :return: SigrokResult object
+ """
+ data = json.loads(string)
+ return SigrokResult(data["timestamps"], data["onBeforeFirstChange"])
+ pass
+
+ @classmethod
+ def fromTraces(cls, traces):
+ """
+ Generates SigrokResult from ptalog.json traces
+
+ :param traces: traces from dfatool ptalog.json
+ :return: SigrokResult object
+ """
+ timestamps = [0]
+ for tr in traces:
+ for t in tr["trace"]:
+ # print(t['online_aggregates']['duration'][0])
+ timestamps.append(
+ timestamps[-1] + (t["online_aggregates"]["duration"][0] * 10 ** -6)
+ )
+
+ # print(timestamps)
+ # prepend FAKE Sync point
+ t_neu = [0.0, 0.0000001, 1.0, 1.00000001]
+ for i, x in enumerate(timestamps):
+ t_neu.append(
+ round(float(x) + t_neu[3] + 0.20, 6)
+ ) # list(map(float, t_ist.split(",")[:i+1]))
+
+ # append FAKE Sync point / eine überschneidung
+ # [30.403632, 30.403639, 31.407265, 31.407271]
+ # appendData = [29.144855,30.148495,30.148502,30.403632,30.403639,31.407265,31.407271,]
+ appendData = [0, 1.000001, 1.000002, 1.25, 1.2500001]
+
+ # TODO future work here, why does the sync not work completely
+ t_neu[-1] = (
+ t_neu[-2] + (t_neu[-1] - t_neu[-2]) * 0.9
+ ) # Weird offset failure with UART stuff
+
+ offset = t_neu[-1] - appendData[0]
+ for x in appendData:
+ t_neu.append(x + offset)
+
+ # print(t_neu)
+ print(len(t_neu))
+ return SigrokResult(t_neu, False)
+
+
+class SigrokInterface(DataInterface):
+ def __init__(
+ self, sample_rate, sample_count, driver="fx2lafw", filename="temp/sigrok.log"
+ ):
+ """
+
+ :param sample_rate: Samplerate of the Logic Analyzer
+ :param sample_count: Count of samples
+ :param driver: for many Logic Analyzer from Saleae the "fx2lafw" should be working
+ :param filename: temporary file name
+ """
+ # options
+ self.sample_count = sample_count
+ self.sample_rate = sample_rate
+ self.file = open(filename, "w+")
+ self.driver = driver
+
+ # internal data
+ self.changes = []
+ self.start = None
+ self.last_val = None
+ self.index = 0
+
+ def runMeasure(self):
+ """
+ Not implemented because implemented in subclasses
+ :return: None
+ """
+ raise NotImplementedError("The method not implemented")
+
+ def forceStopMeasure(self):
+ """
+ Not implemented because implemented in subclasses
+ :return: None
+ """
+ raise NotImplementedError("The method not implemented")
+
+ def getData(self):
+ """
+
+ :return:
+ """
+ # return sigrok_energy_api_result(self.changes, True if self.start == 0xff else False)
+ return SigrokResult(
+ [x / self.sample_rate for x in self.changes],
+ True if self.start == 0xFF else False,
+ )
+
+ def analyzeData(self, byte):
+ """
+ analyze one byte if it differs from the last byte, it will be appended to changes.
+
+ :param byte: one byte to analyze
+ """
+ if self.start is None:
+ self.start = byte
+ self.last_val = byte
+ if byte != self.last_val:
+ self.changes.append(self.index)
+ self.last_val = byte
+ self.index += 1
diff --git a/lib/lennart/__init__.py b/lib/lennart/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lennart/__init__.py
diff --git a/lib/loader.py b/lib/loader.py
index fcd5490..568c98c 100644
--- a/lib/loader.py
+++ b/lib/loader.py
@@ -1638,8 +1638,8 @@ class EnergyTraceWithLogicAnalyzer:
self.errors = list()
def load_data(self, log_data):
- from data.timing.SigrokInterface import SigrokResult
- from data.energy.EnergyInterface import EnergyInterface
+ from dfatool.lennart.SigrokInterface import SigrokResult
+ from dfatool.lennart.EnergyInterface import EnergyInterface
# Daten laden
self.sync_data = SigrokResult.fromString(log_data[0])
@@ -1677,7 +1677,7 @@ class EnergyTraceWithLogicAnalyzer:
for state_or_transition in trace["trace"]:
names.append(state_or_transition["name"])
# print(names[:15])
- from data.processing.DataProcessor import DataProcessor
+ from dfatool.lennart.DataProcessor import DataProcessor
dp = DataProcessor(sync_data=self.sync_data, energy_data=self.energy_data)
dp.run()
@@ -1741,8 +1741,8 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
super().__init__(voltage, state_duration, transition_names, with_traces)
def load_data(self, log_data):
- from data.timing.SigrokInterface import SigrokResult
- from data.energy.EnergyInterface import EnergyInterface
+ from dfatool.lennart.SigrokInterface import SigrokResult
+ from dfatool.lennart.EnergyInterface import EnergyInterface
# Daten laden
self.sync_data = None
@@ -1751,7 +1751,7 @@ class EnergyTraceWithTimer(EnergyTraceWithLogicAnalyzer):
pass
def analyze_states(self, traces, offline_index: int):
- from data.timing.SigrokInterface import SigrokResult
+ from dfatool.lennart.SigrokInterface import SigrokResult
self.sync_data = SigrokResult.fromTraces(traces)
return super().analyze_states(traces, offline_index)
diff --git a/lib/runner.py b/lib/runner.py
index 96627cf..0f44e97 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -17,7 +17,7 @@ import serial.threaded
import subprocess
import sys
import time
-from data.timing.SigrokCLIInterface import SigrokCLIInterface
+from dfatool.lennart.SigrokCLIInterface import SigrokCLIInterface
class SerialReader(serial.threaded.Protocol):