summaryrefslogtreecommitdiff
path: root/lib/lennart/SigrokAPIInterface.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/lennart/SigrokAPIInterface.py')
-rw-r--r--lib/lennart/SigrokAPIInterface.py150
1 files changed, 0 insertions, 150 deletions
diff --git a/lib/lennart/SigrokAPIInterface.py b/lib/lennart/SigrokAPIInterface.py
deleted file mode 100644
index 44da678..0000000
--- a/lib/lennart/SigrokAPIInterface.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import time
-
-from dfatool.lennart.SigrokInterface import SigrokInterface
-
-import sigrok.core as sr
-from sigrok.core.classes import *
-
-from util.ByteHelper import ByteHelper
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SigrokAPIInterface(SigrokInterface):
- def datafeed_changes(self, device, packet):
- """
- Callback type with changes analysis
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- for x in data[1::2]:
- self.analyzeData(x)
-
- def datafeed_in_all(self, device, packet):
- """
- Callback type which writes all data into the array
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- self.all_data += data[1::2]
-
- def datafeed_file(self, device, packet):
- """
- Callback type which writes all data into a file
- :param device: device object
- :param packet: data (String with binary data)
- """
- data = ByteHelper.rawbytes(self.output.receive(packet))
- if data:
- # only using every second byte,
- # because only every second contains the useful information.
- for x in data[1::2]:
- self.file.write(str(x) + "\n")
-
- def __init__(
- self,
- driver="fx2lafw",
- sample_rate=100_000,
- debug_output=False,
- used_datafeed=datafeed_changes,
- fake=False,
- ):
- """
-
- :param driver: Driver that should be used
- :param sample_rate: The sample rate of the Logic analyzer
- :param debug_output: Should be true if output should be displayed to user
- :param used_datafeed: one of the datafeeds above, user later as callback.
- :param fake:
- """
- super(SigrokAPIInterface, self).__init__(sample_rate)
- if fake:
- raise NotImplementedError("Not implemented!")
- self.used_datafeed = used_datafeed
-
- self.debug_output = debug_output
- self.session = None
-
- def forceStopMeasure(self):
- """
- Force stopping the measurement
- :return: None
- """
- self.session.stop()
-
- def runMeasure(self):
- """
- Start the Measurement and set all settings
- """
- context = sr.Context_create()
-
- devs = context.drivers[self.driver].scan()
- # print(devs)
- if len(devs) == 0:
- raise RuntimeError("No device with that driver found!")
- sigrokDevice = devs[0]
- if len(devs) > 1:
- raise Warning(
- "Attention! Multiple devices with that driver found! Using ",
- sigrokDevice.connection_id(),
- )
-
- sigrokDevice.open()
- sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)
-
- enabled_channels = ["D1"]
- for channel in sigrokDevice.channels:
- channel.enabled = channel.name in enabled_channels
-
- self.session = context.create_session()
- self.session.add_device(sigrokDevice)
- self.session.start()
-
- self.output = context.output_formats["binary"].create_output(sigrokDevice)
-
- print(context.output_formats)
- self.all_data = b""
-
- def datafeed(device, packet):
- self.used_datafeed(self, device, packet)
-
- self.session.add_datafeed_callback(datafeed)
- time_running = time.time()
- self.session.run()
- total_time = time.time() - time_running
- print(
- "Used time: ",
- total_time * 1_000_000,
- "µs",
- )
- self.session.stop()
-
- if self.debug_output:
- # if self.used_datafeed == self.datafeed_in_change:
- if True:
- changes = [x / self.sample_rate for x in self.changes]
- print(changes)
- is_on = self.start == 0xFF
- print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
- for x in range(len(changes) - 1):
- is_on = not is_on
- print(
- changes[x],
- " - ",
- changes[x + 1],
- " / ",
- changes[x + 1] - changes[x],
- " # Pin ",
- "HIGH" if is_on else "LOW",
- )
- elif self.used_datafeed == self.datafeed_in_all:
- print(self.all_data)