1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
import time
from dfatool.lennart.SigrokInterface import SigrokInterface
import sigrok.core as sr
from sigrok.core.classes import *
from util.ByteHelper import ByteHelper
import logging
logger = logging.getLogger(__name__)
class SigrokAPIInterface(SigrokInterface):
def datafeed_changes(self, device, packet):
"""
Callback type with changes analysis
:param device: device object
:param packet: data (String with binary data)
"""
data = ByteHelper.rawbytes(self.output.receive(packet))
if data:
# only using every second byte,
# because only every second contains the useful information.
for x in data[1::2]:
self.analyzeData(x)
def datafeed_in_all(self, device, packet):
"""
Callback type which writes all data into the array
:param device: device object
:param packet: data (String with binary data)
"""
data = ByteHelper.rawbytes(self.output.receive(packet))
if data:
# only using every second byte,
# because only every second contains the useful information.
self.all_data += data[1::2]
def datafeed_file(self, device, packet):
"""
Callback type which writes all data into a file
:param device: device object
:param packet: data (String with binary data)
"""
data = ByteHelper.rawbytes(self.output.receive(packet))
if data:
# only using every second byte,
# because only every second contains the useful information.
for x in data[1::2]:
self.file.write(str(x) + "\n")
def __init__(
self,
driver="fx2lafw",
sample_rate=100_000,
debug_output=False,
used_datafeed=datafeed_changes,
fake=False,
):
"""
:param driver: Driver that should be used
:param sample_rate: The sample rate of the Logic analyzer
:param debug_output: Should be true if output should be displayed to user
:param used_datafeed: one of the datafeeds above, user later as callback.
:param fake:
"""
super(SigrokAPIInterface, self).__init__(sample_rate)
if fake:
raise NotImplementedError("Not implemented!")
self.used_datafeed = used_datafeed
self.debug_output = debug_output
self.session = None
def forceStopMeasure(self):
"""
Force stopping the measurement
:return: None
"""
self.session.stop()
def runMeasure(self):
"""
Start the Measurement and set all settings
"""
context = sr.Context_create()
devs = context.drivers[self.driver].scan()
# print(devs)
if len(devs) == 0:
raise RuntimeError("No device with that driver found!")
sigrokDevice = devs[0]
if len(devs) > 1:
raise Warning(
"Attention! Multiple devices with that driver found! Using ",
sigrokDevice.connection_id(),
)
sigrokDevice.open()
sigrokDevice.config_set(ConfigKey.SAMPLERATE, self.sample_rate)
enabled_channels = ["D1"]
for channel in sigrokDevice.channels:
channel.enabled = channel.name in enabled_channels
self.session = context.create_session()
self.session.add_device(sigrokDevice)
self.session.start()
self.output = context.output_formats["binary"].create_output(sigrokDevice)
print(context.output_formats)
self.all_data = b""
def datafeed(device, packet):
self.used_datafeed(self, device, packet)
self.session.add_datafeed_callback(datafeed)
time_running = time.time()
self.session.run()
total_time = time.time() - time_running
print(
"Used time: ",
total_time * 1_000_000,
"µs",
)
self.session.stop()
if self.debug_output:
# if self.used_datafeed == self.datafeed_in_change:
if True:
changes = [x / self.sample_rate for x in self.changes]
print(changes)
is_on = self.start == 0xFF
print("0", " - ", changes[0], " # Pin ", "HIGH" if is_on else "LOW")
for x in range(len(changes) - 1):
is_on = not is_on
print(
changes[x],
" - ",
changes[x + 1],
" / ",
changes[x + 1] - changes[x],
" # Pin ",
"HIGH" if is_on else "LOW",
)
elif self.used_datafeed == self.datafeed_in_all:
print(self.all_data)
|