1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
#!/usr/bin/env python3
import json
import numpy as np
import logging
logger = logging.getLogger(__name__)
# Adding additional parsing functionality
class SigrokResult:
    """Plain container for a measurement result.

    Holds the list of change timestamps together with the flag telling
    whether the signal was already "on" before the first recorded change.
    """

    def __init__(self, timestamps, onbeforefirstchange):
        """
        Creates SigrokResult object, struct for timestamps and onBeforeFirstChange.

        :param timestamps: list of timestamps at which the signal changed
        :param onbeforefirstchange: whether the state before the first change
            is already on (expected to always be off; kept so the data is
            complete)
        """
        self.timestamps = timestamps
        self.onBeforeFirstChange = onbeforefirstchange

    def __str__(self):
        """
        :return: string representation of object
        """
        return "<Sigrok Result onBeforeFirstChange=%s timestamps=%s>" % (
            self.onBeforeFirstChange,
            self.timestamps,
        )

    def getDict(self):
        """
        :return: dict representation of object, using the same keys that
            fromFile / fromString read ("onBeforeFirstChange", "timestamps")
        """
        return {
            "onBeforeFirstChange": self.onBeforeFirstChange,
            "timestamps": self.timestamps,
        }

    @classmethod
    def fromFile(cls, path):
        """
        Generates a result object from a JSON file.

        The file must contain a JSON object with the keys "timestamps" and
        "onBeforeFirstChange".

        :param path: file path
        :return: instance of the class this method was called on
        :raises KeyError: if one of the required keys is missing
        """
        with open(path) as json_file:
            data = json.load(json_file)
        # Build via cls (not the hard-coded base class) so subclasses that
        # call this constructor get an instance of their own type.
        return cls(data["timestamps"], data["onBeforeFirstChange"])

    @classmethod
    def fromString(cls, string):
        """
        Generates a result object from a JSON string.

        The string must encode a JSON object with the keys "timestamps" and
        "onBeforeFirstChange".

        :param string: JSON document as string
        :return: instance of the class this method was called on
        :raises KeyError: if one of the required keys is missing
        """
        data = json.loads(string)
        # Build via cls so subclasses are respected (see fromFile).
        return cls(data["timestamps"], data["onBeforeFirstChange"])
class SigrokInterface:
    """Base class that turns a stream of sampled bytes into change indices.

    Subclasses provide the actual measurement (runMeasure / forceStopMeasure)
    and feed every sample to analyzeData; getData converts the collected
    sample indices into a SigrokResult.
    """

    def __init__(self, sample_rate, driver="fx2lafw", filename="temp/sigrok.log"):
        """
        :param sample_rate: Samplerate of the Logic Analyzer
        :param driver: for many Logic Analyzer from Saleae the "fx2lafw" should be working
        :param filename: temporary file name
        """
        # --- configuration ---
        self.sample_rate = sample_rate
        # Opened for read/write ("w+" truncates an existing file). This class
        # never closes the handle itself.
        # NOTE(review): presumably subclasses use and close it -- confirm.
        self.file = open(filename, "w+")
        self.driver = driver

        # --- analysis state ---
        self.changes = []  # sample indices at which the value changed
        self.start = self.last_val = None  # first / most recent byte seen
        self.index = 0  # number of samples analyzed so far

    def runMeasure(self):
        """
        Start a measurement; must be provided by subclasses.

        :return: None
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("The method not implemented")

    def forceStopMeasure(self):
        """
        Abort a running measurement; must be provided by subclasses.

        :return: None
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("The method not implemented")

    def getData(self):
        """
        Package the recorded changes as a SigrokResult.

        Every recorded sample index is divided by the sample rate; the
        onBeforeFirstChange flag is True when the first analyzed byte
        was 0xFF.

        :return: SigrokResult object
        """
        change_times = [
            sample_index / self.sample_rate for sample_index in self.changes
        ]
        started_on = bool(self.start == 0xFF)
        return SigrokResult(change_times, started_on)

    def analyzeData(self, byte):
        """
        Analyze one byte; if it differs from the previous byte, the current
        sample index is appended to changes.

        :param byte: one byte to analyze
        """
        if self.start is None:
            # Very first sample: remember it as the initial state.
            self.start = self.last_val = byte

        value_changed = byte != self.last_val
        if value_changed:
            self.changes.append(self.index)
            self.last_val = byte

        # The index counts every sample, changed or not.
        self.index += 1
|