summaryrefslogtreecommitdiff
path: root/lib/lennart/SigrokInterface.py
blob: a8b392f8e4ab9316e33137ec4e08081a2393e911 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3

import json
import numpy as np

import logging

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


# Result container for a measurement, with JSON parsing helpers (file / string)
class SigrokResult:
    """
    Plain data holder for the outcome of one measurement.

    It stores the list of timestamps at which the observed signal changed and
    a flag telling whether the signal was already "on" before the first
    change. Instances can be turned into a dict (:meth:`getDict`) and rebuilt
    from JSON that uses the same two keys (:meth:`fromFile`,
    :meth:`fromString`).
    """

    def __init__(self, timestamps, onbeforefirstchange):
        """
        Creates SigrokResult object, struct for timestamps and onBeforeFirstChange.

        :param timestamps: list of changing timestamps
        :param onbeforefirstchange: if the state before the first change is already on / should always be off, just to be data correct
        """
        self.timestamps = timestamps
        self.onBeforeFirstChange = onbeforefirstchange

    def __str__(self):
        """
        :return: string representation of object
        """
        return "<Sigrok Result onBeforeFirstChange=%s timestamps=%s>" % (
            self.onBeforeFirstChange,
            self.timestamps,
        )

    def getDict(self):
        """
        :return: dict representation of object, with the keys
                 "onBeforeFirstChange" and "timestamps"
        """
        return {
            "onBeforeFirstChange": self.onBeforeFirstChange,
            "timestamps": self.timestamps,
        }

    @classmethod
    def fromFile(cls, path):
        """
        Generates a result object from a JSON file.

        The file must contain a JSON object with the keys "timestamps" and
        "onBeforeFirstChange" (the layout produced by :meth:`getDict`).

        :param path: file path
        :return: instance of ``cls`` (SigrokResult or the subclass it was called on)
        :raises KeyError: if one of the two required keys is missing
        """
        with open(path) as json_file:
            data = json.load(json_file)
        # The file is only needed for parsing, so build the object after it
        # has been closed. Use cls (not the hard-coded class name) so that
        # subclasses get an instance of their own type.
        return cls(data["timestamps"], data["onBeforeFirstChange"])

    @classmethod
    def fromString(cls, string):
        """
        Generates a result object from a JSON string.

        The string must encode a JSON object with the keys "timestamps" and
        "onBeforeFirstChange" (the layout produced by :meth:`getDict`).

        :param string: JSON document as string
        :return: instance of ``cls`` (SigrokResult or the subclass it was called on)
        :raises KeyError: if one of the two required keys is missing
        """
        data = json.loads(string)
        return cls(data["timestamps"], data["onBeforeFirstChange"])


class SigrokInterface:
    """
    Base class that collects signal changes from a stream of sampled bytes.

    Samples are fed one at a time into :meth:`analyzeData`; the sample indices
    at which the value changed are stored in ``changes`` and converted to
    timestamps by :meth:`getData`. Starting and stopping a measurement
    (:meth:`runMeasure`, :meth:`forceStopMeasure`) is left to subclasses.

    The constructor opens ``filename``; call :meth:`close` or use the object
    as a context manager (``with SigrokInterface(...) as s:``) to release it.
    """

    def __init__(self, sample_rate, driver="fx2lafw", filename="temp/sigrok.log"):
        """
        Stores the options, opens the temporary file and resets the change tracking.

        :param sample_rate: Samplerate of the Logic Analyzer
        :param driver: for many Logic Analyzer from Saleae the "fx2lafw" should be working
        :param filename: temporary file name; opened with mode "w+", so an
                         existing file is truncated. Its directory must exist.
        """
        # options
        self.sample_rate = sample_rate
        # Not used by this base class itself; presumably written by
        # subclasses -- verify against the implementations.
        self.file = open(filename, "w+")
        self.driver = driver

        # internal data
        self.changes = []  # sample indices at which the value changed
        self.start = None  # very first sample seen by analyzeData
        self.last_val = None  # most recent sample seen by analyzeData
        self.index = 0  # number of samples analyzed so far

    def close(self):
        """
        Closes the temporary file opened in the constructor.

        Calling it more than once is harmless.

        :return: None
        """
        self.file.close()

    def __enter__(self):
        """
        :return: the object itself, for use in a ``with`` statement
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Closes the temporary file when the ``with`` block is left.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def runMeasure(self):
        """
        Not implemented because implemented in subclasses
        :return: None
        """
        raise NotImplementedError("The method not implemented")

    def forceStopMeasure(self):
        """
        Not implemented because implemented in subclasses
        :return: None
        """
        raise NotImplementedError("The method not implemented")

    def getData(self):
        """
        Builds the result object from the changes recorded so far.

        Every recorded sample index is divided by ``sample_rate`` to obtain a
        timestamp (seconds, assuming the sample rate is given in Hz).
        ``onBeforeFirstChange`` is True exactly when the first analyzed sample
        was 0xFF.

        :return: SigrokResult with the change timestamps
        """
        rate = self.sample_rate
        timestamps = [sample_index / rate for sample_index in self.changes]
        return SigrokResult(timestamps, bool(self.start == 0xFF))

    def analyzeData(self, byte):
        """
        analyze one byte if it differs from the last byte, it will be appended to changes.

        The first byte ever passed in is remembered as ``start`` and never
        counts as a change itself.

        :param byte: one byte to analyze
        """
        if self.start is None:
            # First sample: remember the initial state as reference.
            self.start = byte
            self.last_val = byte
        if byte != self.last_val:
            self.changes.append(self.index)
            self.last_val = byte
        # Count every sample, changed or not, so indices map to sample time.
        self.index += 1