| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
 | #!/usr/bin/env python3
import json
import numpy as np
import logging
logger = logging.getLogger(__name__)
# Adding additional parsing functionality
class SigrokResult:
    """
    Plain data holder for a measurement result: the timestamps at which the
    signal changed and whether the signal was already on before the first change.
    """

    def __init__(self, timestamps, onbeforefirstchange):
        """
        Creates SigrokResult object, struct for timestamps and onBeforeFirstChange.

        :param timestamps: list of changing timestamps
        :param onbeforefirstchange: whether the state before the first change is
            already on (stored as the attribute ``onBeforeFirstChange``)
        """
        self.timestamps = timestamps
        self.onBeforeFirstChange = onbeforefirstchange

    def __str__(self):
        """
        :return: string representation of object
        """
        return "<Sigrok Result onBeforeFirstChange=%s timestamps=%s>" % (
            self.onBeforeFirstChange,
            self.timestamps,
        )

    def getDict(self):
        """
        :return: dict representation of object, using the same keys that
            fromFile / fromString read ("onBeforeFirstChange", "timestamps")
        """
        return {
            "onBeforeFirstChange": self.onBeforeFirstChange,
            "timestamps": self.timestamps,
        }

    @classmethod
    def fromFile(cls, path):
        """
        Generates a result object from a JSON file.

        :param path: file path of a JSON document with the keys "timestamps"
            and "onBeforeFirstChange"
        :return: instance of the class this method was called on
        :raises KeyError: if one of the two required keys is missing
        """
        with open(path) as json_file:
            data = json.load(json_file)
        # Build via cls (not the hard-coded base class) so subclasses get
        # instances of their own type from this alternate constructor.
        return cls(data["timestamps"], data["onBeforeFirstChange"])

    @classmethod
    def fromString(cls, string):
        """
        Generates a result object from a JSON string.

        :param string: JSON document with the keys "timestamps" and
            "onBeforeFirstChange"
        :return: instance of the class this method was called on
        :raises KeyError: if one of the two required keys is missing
        """
        data = json.loads(string)
        # Same reasoning as in fromFile: honour the calling class.
        return cls(data["timestamps"], data["onBeforeFirstChange"])
class SigrokInterface:
    """
    Base class for sigrok measurement interfaces.

    Holds the measurement options and the change-detection state fed by
    analyzeData. runMeasure and forceStopMeasure raise NotImplementedError
    here and are meant to be provided by subclasses.

    The constructor opens a file; call close() (or use the object as a
    context manager) to release it.
    """

    def __init__(self, sample_rate, driver="fx2lafw", filename="temp/sigrok.log"):
        """
        :param sample_rate: Samplerate of the Logic Analyzer
        :param driver: for many Logic Analyzer from Saleae the "fx2lafw" should be working
        :param filename: temporary file name; the file is opened in "w+" mode,
            so it is created or truncated (its directory must already exist)
        """
        # options
        self.sample_rate = sample_rate
        self.file = open(filename, "w+")
        self.driver = driver
        # internal data
        self.changes = []  # sample indices at which the value changed
        self.start = None  # first value passed to analyzeData
        self.last_val = None  # most recent value passed to analyzeData
        self.index = 0  # number of values analyzed so far

    def close(self):
        """
        Close the file opened by the constructor. Safe to call more than once.

        :return: None
        """
        file = getattr(self, "file", None)
        if file is not None:
            file.close()

    def __enter__(self):
        """
        :return: this object, so it can be used in a ``with`` statement
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the file on leaving the ``with`` block; exceptions are not suppressed.
        """
        self.close()
        return False

    def runMeasure(self):
        """
        Not implemented because implemented in subclasses
        :return: None
        """
        raise NotImplementedError("The method not implemented")

    def forceStopMeasure(self):
        """
        Not implemented because implemented in subclasses
        :return: None
        """
        raise NotImplementedError("The method not implemented")

    def getData(self):
        """
        Convert the recorded change indices into a result object.

        :return: SigrokResult whose timestamps are the change indices divided
            by the sample rate, and whose onBeforeFirstChange is True when the
            first analyzed value equals 0xFF
        """
        # NOTE(review): 0xFF presumably means "all channels high" - confirm
        # against the capture format used by the subclasses.
        return SigrokResult(
            [x / self.sample_rate for x in self.changes],
            bool(self.start == 0xFF),
        )

    def analyzeData(self, byte):
        """
        analyze one byte if it differs from the last byte, it will be appended to changes.

        :param byte: one byte to analyze
        """
        if self.start is None:
            # The very first value defines the initial state and is never a change.
            self.start = byte
            self.last_val = byte
        if byte != self.last_val:
            self.changes.append(self.index)
            self.last_val = byte
        self.index += 1
 |