diff options
author | Daniel Friesel <daniel.friesel@uos.de> | 2020-10-16 13:58:20 +0200 |
---|---|---|
committer | Daniel Friesel <daniel.friesel@uos.de> | 2020-10-16 13:58:20 +0200 |
commit | 81082d00444d3bfd644f2417ef5c1a34d9569a28 (patch) | |
tree | 613ddc91d6aeaa551062260e907e47f813d18bff /lib/lennart/SigrokAPIInterface.py | |
parent | f5125e191ab1db62e4167b00f809dba20bc54b6f (diff) |
Code aus Lennarts BA-repo
Diffstat (limited to 'lib/lennart/SigrokAPIInterface.py')
-rw-r--r-- | lib/lennart/SigrokAPIInterface.py | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py new file mode 100644 index 0000000..a8765ba --- /dev/null +++ b/lib/lennart/SigrokAPIInterface.py @@ -0,0 +1,152 @@ +import time + +from dfatool.lennart.SigrokInterface import SigrokInterface + +import sigrok.core as sr +from sigrok.core.classes import * + +from util.ByteHelper import ByteHelper + + +class SigrokAPIInterface(SigrokInterface): + def datafeed_changes(self, device, packet): + """ + Callback type with changes analysis + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + for x in data[1::2]: + self.analyzeData(x) + + def datafeed_in_all(self, device, packet): + """ + Callback type which writes all data into the array + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + self.all_data += data[1::2] + + def datafeed_file(self, device, packet): + """ + Callback type which writes all data into a file + :param device: device object + :param packet: data (String with binary data) + """ + data = ByteHelper.rawbytes(self.output.receive(packet)) + if data: + # only using every second byte, + # because only every second contains the useful information. + for x in data[1::2]: + self.file.write(str(x) + "\n") + + def __init__( + self, + driver="fx2lafw", + sample_rate=100_000, + sample_count=1_000_000, + debug_output=False, + used_datafeed=datafeed_changes, + fake=False, + ): + """ + + :param driver: Driver that should be used + :param sample_rate: The sample rate of the Logic analyzer + :param sample_count: The sample count of the Logic analyzer + :param debug_output: Should be true if output should be displayed to user + :param used_datafeed: one of the datafeeds above, user later as callback. + :param fake: + """ + super(SigrokAPIInterface, self).__init__(sample_rate, sample_count) + if fake: + raise NotImplementedError("Not implemented!") + self.used_datafeed = used_datafeed + + self.debug_output = debug_output + self.session = None + + def forceStopMeasure(self): + """ + Force stopping the measurement + :return: None + """ + self.session.stop() + + def runMeasure(self): + """ + Start the Measurement and set all settings + """ + context = sr.Context_create() + + devs = context.drivers[self.driver].scan() + # print(devs) + if len(devs) == 0: + raise RuntimeError("No device with that driver found!") + sigrokDevice = devs[0] + if len(devs) > 1: + raise Warning( + "Attention! Multiple devices with that driver found! Using ", + sigrokDevice.connection_id(), + ) + + sigrokDevice.open() + sigrokDevice.config_set(ConfigKey.LIMIT_SAMPLES, self.sample_count) + sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate) + + enabled_channels = ["D1"] + for channel in sigrokDevice.channels: + channel.enabled = channel.name in enabled_channels + + self.session = context.create_session() + self.session.add_device(sigrokDevice) + self.session.start() + + self.output = context.output_formats["binary"].create_output(sigrokDevice) + + print(context.output_formats) + self.all_data = b"" + + def datafeed(device, packet): + self.used_datafeed(self, device, packet) + + self.session.add_datafeed_callback(datafeed) + time_running = time.time() + self.session.run() + total_time = time.time() - time_running + print( + "Used time: ", + total_time * 1_000_000, + "µs | sample/s: ", + self.sample_count / (total_time), + "Hz ", + ) + self.session.stop() + + if self.debug_output: + # if self.used_datafeed == self.datafeed_in_change: + if True: + changes = [x / self.sample_rate for x in self.changes] + print(changes) + is_on = self.start == 0xFF + print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW") + for x in range(len(changes) - 1): + is_on = not is_on + print( + changes[x], + " - ", + changes[x + 1], + " / ", + changes[x + 1] - changes[x], + " # Pin ", + "HIGH" if is_on else "LOW", + ) + elif self.used_datafeed == self.datafeed_in_all: + print(self.all_data) |