summaryrefslogtreecommitdiff
path: root/lib/lennart/SigrokAPIInterface.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/lennart/SigrokAPIInterface.py')
-rw-r--r--lib/lennart/SigrokAPIInterface.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py
new file mode 100644
index 0000000..a8765ba
--- /dev/null
+++ b/lib/lennart/SigrokAPIInterface.py
@@ -0,0 +1,152 @@
+import time
+
+from dfatool.lennart.SigrokInterface import SigrokInterface
+
+import sigrok.core as sr
+from sigrok.core.classes import *
+
+from util.ByteHelper import ByteHelper
+
+
+class SigrokAPIInterface(SigrokInterface):
+ def datafeed_changes(self, device, packet):
+ """
+ Callback type with changes analysis
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ for x in data[1::2]:
+ self.analyzeData(x)
+
+ def datafeed_in_all(self, device, packet):
+ """
+ Callback type which writes all data into the array
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ self.all_data += data[1::2]
+
+ def datafeed_file(self, device, packet):
+ """
+ Callback type which writes all data into a file
+ :param device: device object
+ :param packet: data (String with binary data)
+ """
+ data = ByteHelper.rawbytes(self.output.receive(packet))
+ if data:
+ # only using every second byte,
+ # because only every second contains the useful information.
+ for x in data[1::2]:
+ self.file.write(str(x) + "\n")
+
+ def __init__(
+ self,
+ driver="fx2lafw",
+ sample_rate=100_000,
+ sample_count=1_000_000,
+ debug_output=False,
+ used_datafeed=datafeed_changes,
+ fake=False,
+ ):
+ """
+
+ :param driver: Driver that should be used
+ :param sample_rate: The sample rate of the Logic analyzer
+ :param sample_count: The sample count of the Logic analyzer
+ :param debug_output: Should be true if output should be displayed to user
+ :param used_datafeed: one of the datafeeds above, user later as callback.
+ :param fake:
+ """
+ super(SigrokAPIInterface, self).__init__(sample_rate, sample_count)
+ if fake:
+ raise NotImplementedError("Not implemented!")
+ self.used_datafeed = used_datafeed
+
+ self.debug_output = debug_output
+ self.session = None
+
+ def forceStopMeasure(self):
+ """
+ Force stopping the measurement
+ :return: None
+ """
+ self.session.stop()
+
+ def runMeasure(self):
+ """
+ Start the Measurement and set all settings
+ """
+ context = sr.Context_create()
+
+ devs = context.drivers[self.driver].scan()
+ # print(devs)
+ if len(devs) == 0:
+ raise RuntimeError("No device with that driver found!")
+ sigrokDevice = devs[0]
+ if len(devs) > 1:
+ raise Warning(
+ "Attention! Multiple devices with that driver found! Using ",
+ sigrokDevice.connection_id(),
+ )
+
+ sigrokDevice.open()
+ sigrokDevice.config_set(ConfigKey.LIMIT_SAMPLES, self.sample_count)
+ sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)
+
+ enabled_channels = ["D1"]
+ for channel in sigrokDevice.channels:
+ channel.enabled = channel.name in enabled_channels
+
+ self.session = context.create_session()
+ self.session.add_device(sigrokDevice)
+ self.session.start()
+
+ self.output = context.output_formats["binary"].create_output(sigrokDevice)
+
+ print(context.output_formats)
+ self.all_data = b""
+
+ def datafeed(device, packet):
+ self.used_datafeed(self, device, packet)
+
+ self.session.add_datafeed_callback(datafeed)
+ time_running = time.time()
+ self.session.run()
+ total_time = time.time() - time_running
+ print(
+ "Used time: ",
+ total_time * 1_000_000,
+ "µs | sample/s: ",
+ self.sample_count / (total_time),
+ "Hz ",
+ )
+ self.session.stop()
+
+ if self.debug_output:
+ # if self.used_datafeed == self.datafeed_in_change:
+ if True:
+ changes = [x / self.sample_rate for x in self.changes]
+ print(changes)
+ is_on = self.start == 0xFF
+ print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
+ for x in range(len(changes) - 1):
+ is_on = not is_on
+ print(
+ changes[x],
+ " - ",
+ changes[x + 1],
+ " / ",
+ changes[x + 1] - changes[x],
+ " # Pin ",
+ "HIGH" if is_on else "LOW",
+ )
+ elif self.used_datafeed == self.datafeed_in_all:
+ print(self.all_data)