# summary | refs | log | tree | commit | diff
# path: root/lib/lennart/SigrokAPIInterface.py
# blob: a8765ba8ad0f539b571aede7a96b6f10c3e24b20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import time

from dfatool.lennart.SigrokInterface import SigrokInterface

import sigrok.core as sr
from sigrok.core.classes import *

from util.ByteHelper import ByteHelper


class SigrokAPIInterface(SigrokInterface):
    """Capture logic-analyzer samples through the sigrok Python bindings.

    The class offers three datafeed callbacks (``datafeed_changes``,
    ``datafeed_in_all``, ``datafeed_file``); one of them is selected at
    construction time and invoked for every packet delivered by the sigrok
    session started in :meth:`runMeasure`.
    """

    def datafeed_changes(self, device, packet):
        """
        Callback type with changes analysis
        :param device: device object
        :param packet: data (String with binary data)
        """
        data = ByteHelper.rawbytes(self.output.receive(packet))
        if data:
            # only using every second byte,
            # because only every second contains the useful information.
            for x in data[1::2]:
                self.analyzeData(x)

    def datafeed_in_all(self, device, packet):
        """
        Callback type which writes all data into the array
        :param device: device object
        :param packet: data (String with binary data)
        """
        data = ByteHelper.rawbytes(self.output.receive(packet))
        if data:
            # only using every second byte,
            # because only every second contains the useful information.
            self.all_data += data[1::2]

    def datafeed_file(self, device, packet):
        """
        Callback type which writes all data into a file
        :param device: device object
        :param packet: data (String with binary data)
        """
        data = ByteHelper.rawbytes(self.output.receive(packet))
        if data:
            # only using every second byte,
            # because only every second contains the useful information.
            # One value per line; writelines avoids a write() call per sample.
            self.file.writelines(f"{x}\n" for x in data[1::2])

    def __init__(
        self,
        driver="fx2lafw",
        sample_rate=100_000,
        sample_count=1_000_000,
        debug_output=False,
        used_datafeed=datafeed_changes,
        fake=False,
    ):
        """
        Configure the interface; no device is touched until runMeasure().

        :param driver: Driver that should be used
        :param sample_rate: The sample rate of the Logic analyzer
        :param sample_count: The sample count of the Logic analyzer
        :param debug_output: Should be true if output should be displayed to user
        :param used_datafeed: one of the datafeeds above, used later as callback.
            It must be the plain function taken from the class (e.g.
            ``SigrokAPIInterface.datafeed_in_all``), because it is called
            with the instance passed explicitly.
        :param fake: not supported; any truthy value raises
        :raises NotImplementedError: if ``fake`` is truthy
        """
        super().__init__(sample_rate, sample_count)
        if fake:
            raise NotImplementedError("Not implemented!")
        # runMeasure() looks the driver up via self.driver, so the requested
        # driver has to be stored here; otherwise the argument is ignored.
        self.driver = driver
        self.used_datafeed = used_datafeed

        self.debug_output = debug_output
        self.session = None
        # Both are (re)initialised by runMeasure(); defined here so that the
        # attributes always exist.
        self.output = None
        self.all_data = b""

    def forceStopMeasure(self):
        """
        Force stopping the measurement.

        Does nothing when no session has been created yet (i.e. runMeasure()
        has not been called).
        :return: None
        """
        if self.session is not None:
            self.session.stop()

    def runMeasure(self):
        """
        Start the Measurement and set all settings

        Scans for devices of the configured driver, configures the first one
        found, runs a sigrok session until it finishes and prints timing
        information. With ``debug_output`` enabled the collected data is
        printed afterwards.

        :raises RuntimeError: if no device is found for the driver
        """
        import warnings

        context = sr.Context_create()

        devs = context.drivers[self.driver].scan()
        # print(devs)
        if len(devs) == 0:
            raise RuntimeError("No device with that driver found!")
        sigrokDevice = devs[0]
        if len(devs) > 1:
            # Only a notice: the measurement continues with the first device,
            # exactly as the message announces.
            warnings.warn(
                "Attention! Multiple devices with that driver found! Using "
                + str(sigrokDevice.connection_id())
            )

        sigrokDevice.open()
        sigrokDevice.config_set(ConfigKey.LIMIT_SAMPLES, self.sample_count)
        sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)

        # Only channel D1 is captured; every other channel is switched off.
        enabled_channels = ["D1"]
        for channel in sigrokDevice.channels:
            channel.enabled = channel.name in enabled_channels

        self.session = context.create_session()
        self.session.add_device(sigrokDevice)
        self.session.start()

        self.output = context.output_formats["binary"].create_output(sigrokDevice)

        print(context.output_formats)
        self.all_data = b""

        def datafeed(device, packet):
            # used_datafeed is stored on the instance as a plain function, so
            # it is not bound and the instance has to be passed explicitly.
            self.used_datafeed(self, device, packet)

        self.session.add_datafeed_callback(datafeed)
        time_running = time.time()
        self.session.run()
        total_time = time.time() - time_running
        print(
            "Used time: ",
            total_time * 1_000_000,
            "µs | sample/s: ",
            self.sample_count / (total_time),
            "Hz ",
        )
        self.session.stop()

        if self.debug_output:
            # Compare against the class attribute (plain function): that is
            # what callers pass in, a bound method would never be identical.
            if self.used_datafeed is SigrokAPIInterface.datafeed_in_all:
                print(self.all_data)
            else:
                self._print_changes()

    def _print_changes(self):
        """Print self.changes scaled by the sample rate, plus the pin level per interval."""
        # NOTE(review): self.changes / self.start are not set in this class;
        # presumably they are maintained by analyzeData() in the base class.
        changes = [x / self.sample_rate for x in self.changes]
        print(changes)
        if not changes:
            # Nothing recorded: there is no first interval to report.
            return
        is_on = self.start == 0xFF
        print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
        # Each further entry toggles the pin level relative to the previous one.
        for begin, end in zip(changes, changes[1:]):
            is_on = not is_on
            print(
                begin,
                " - ",
                end,
                " / ",
                end - begin,
                " # Pin ",
                "HIGH" if is_on else "LOW",
            )