diff options
Diffstat (limited to 'lib/runner.py')
-rw-r--r-- | lib/runner.py | 156 |
1 files changed, 87 insertions, 69 deletions
diff --git a/lib/runner.py b/lib/runner.py index 85df2b6..16f0a29 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -30,7 +30,7 @@ class SerialReader(serial.threaded.Protocol): def __init__(self, callback=None): """Create a new SerialReader object.""" self.callback = callback - self.recv_buf = '' + self.recv_buf = "" self.lines = [] def __call__(self): @@ -39,13 +39,13 @@ class SerialReader(serial.threaded.Protocol): def data_received(self, data): """Append newly received serial data to the line buffer.""" try: - str_data = data.decode('UTF-8') + str_data = data.decode("UTF-8") self.recv_buf += str_data # We may get anything between \r\n, \n\r and simple \n newlines. # We assume that \n is always present and use str.strip to remove leading/trailing \r symbols # Note: Do not call str.strip on lines[-1]! Otherwise, lines may be mangled - lines = self.recv_buf.split('\n') + lines = self.recv_buf.split("\n") if len(lines) > 1: self.lines.extend(map(str.strip, lines[:-1])) self.recv_buf = lines[-1] @@ -94,14 +94,16 @@ class SerialMonitor: """ self.ser = serial.serial_for_url(port, do_not_open=True) self.ser.baudrate = baud - self.ser.parity = 'N' + self.ser.parity = "N" self.ser.rtscts = False self.ser.xonxoff = False try: self.ser.open() except serial.SerialException as e: - sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e)) + sys.stderr.write( + "Could not open serial port {}: {}\n".format(self.ser.name, e) + ) sys.exit(1) self.reader = SerialReader(callback=callback) @@ -131,6 +133,7 @@ class SerialMonitor: self.worker.stop() self.ser.close() + # TODO Optionale Kalibrierung mit bekannten Widerständen an GPIOs am Anfang # TODO Sync per LED? -> Vor und ggf nach jeder Transition kurz pulsen # TODO Für Verbraucher mit wenig Energiebedarf: Versorgung direkt per GPIO @@ -143,14 +146,14 @@ class EnergyTraceMonitor(SerialMonitor): def __init__(self, port: str, baud: int, callback=None, voltage=3.3): super().__init__(port=port, baud=baud, callback=callback) self._voltage = voltage - self._output = time.strftime('%Y%m%d-%H%M%S.etlog') + self._output = time.strftime("%Y%m%d-%H%M%S.etlog") self._start_energytrace() def _start_energytrace(self): - cmd = ['msp430-etv', '--save', self._output, '0'] - self._logger = subprocess.Popen(cmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + cmd = ["msp430-etv", "--save", self._output, "0"] + self._logger = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True + ) def close(self): super().close() @@ -162,14 +165,16 @@ class EnergyTraceMonitor(SerialMonitor): def get_config(self) -> dict: return { - 'voltage': self._voltage, + "voltage": self._voltage, } class MIMOSAMonitor(SerialMonitor): """MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time.""" - def __init__(self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3): + def __init__( + self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3 + ): super().__init__(port=port, baud=baud, callback=callback) self._offset = offset self._shunt = shunt @@ -177,39 +182,41 @@ class MIMOSAMonitor(SerialMonitor): self._start_mimosa() def _mimosactl(self, subcommand): - cmd = ['mimosactl'] + cmd = ["mimosactl"] cmd.append(subcommand) res = subprocess.run(cmd) if res.returncode != 0: res = subprocess.run(cmd) if res.returncode != 0: - raise RuntimeError('{} returned {}'.format(' '.join(cmd), res.returncode)) + raise RuntimeError( + "{} returned {}".format(" ".join(cmd), res.returncode) + ) def _mimosacmd(self, opts): - cmd = ['MimosaCMD'] + cmd = ["MimosaCMD"] cmd.extend(opts) res = subprocess.run(cmd) if res.returncode != 0: - raise RuntimeError('{} returned {}'.format(' '.join(cmd), res.returncode)) + raise RuntimeError("{} returned {}".format(" ".join(cmd), res.returncode)) def _start_mimosa(self): - self._mimosactl('disconnect') - self._mimosacmd(['--start']) - self._mimosacmd(['--parameter', 'offset', str(self._offset)]) - self._mimosacmd(['--parameter', 'shunt', str(self._shunt)]) - self._mimosacmd(['--parameter', 'voltage', str(self._voltage)]) - self._mimosacmd(['--mimosa-start']) + self._mimosactl("disconnect") + self._mimosacmd(["--start"]) + self._mimosacmd(["--parameter", "offset", str(self._offset)]) + self._mimosacmd(["--parameter", "shunt", str(self._shunt)]) + self._mimosacmd(["--parameter", "voltage", str(self._voltage)]) + self._mimosacmd(["--mimosa-start"]) time.sleep(2) - self._mimosactl('1k') # 987 ohm + self._mimosactl("1k") # 987 ohm time.sleep(2) - self._mimosactl('100k') # 99.3 kohm + self._mimosactl("100k") # 99.3 kohm time.sleep(2) - self._mimosactl('connect') + self._mimosactl("connect") def _stop_mimosa(self): # Make sure the MIMOSA daemon has gathered all needed data time.sleep(2) - self._mimosacmd(['--mimosa-stop']) + self._mimosacmd(["--mimosa-stop"]) mtime_changed = True mim_file = None time.sleep(1) @@ -218,7 +225,7 @@ class MIMOSAMonitor(SerialMonitor): # files lying around in the directory will not confuse our # heuristic. for filename in sorted(os.listdir(), reverse=True): - if re.search(r'[.]mim$', filename): + if re.search(r"[.]mim$", filename): mim_file = filename break while mtime_changed: @@ -226,7 +233,7 @@ class MIMOSAMonitor(SerialMonitor): if time.time() - os.stat(mim_file).st_mtime < 3: mtime_changed = True time.sleep(1) - self._mimosacmd(['--stop']) + self._mimosacmd(["--stop"]) return mim_file def close(self): @@ -238,9 +245,9 @@ class MIMOSAMonitor(SerialMonitor): def get_config(self) -> dict: return { - 'offset': self._offset, - 'shunt': self._shunt, - 'voltage': self._voltage, + "offset": self._offset, + "shunt": self._shunt, + "voltage": self._voltage, } @@ -263,14 +270,17 @@ class ShellMonitor: stderr and return status are discarded at the moment. """ if type(timeout) != int: - raise ValueError('timeout argument must be int') - res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + raise ValueError("timeout argument must be int") + res = subprocess.run( + ["timeout", "{:d}s".format(timeout), self.script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) if self.callback: - for line in res.stdout.split('\n'): + for line in res.stdout.split("\n"): self.callback(line) - return res.stdout.split('\n') + return res.stdout.split("\n") def monitor(self): raise NotImplementedError @@ -285,28 +295,35 @@ class ShellMonitor: def build(arch, app, opts=[]): - command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'clean'] + command = ["make", "arch={}".format(arch), "app={}".format(app), "clean"] command.extend(opts) - res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + res = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True + ) if res.returncode != 0: - raise RuntimeError('Build failure, executing {}:\n'.format(command) + res.stderr) - command = ['make', '-B', 'arch={}'.format(arch), 'app={}'.format(app)] + raise RuntimeError( + "Build failure, executing {}:\n".format(command) + res.stderr + ) + command = ["make", "-B", "arch={}".format(arch), "app={}".format(app)] command.extend(opts) - res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + res = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True + ) if res.returncode != 0: - raise RuntimeError('Build failure, executing {}:\n '.format(command) + res.stderr) + raise RuntimeError( + "Build failure, executing {}:\n ".format(command) + res.stderr + ) return command def flash(arch, app, opts=[]): - command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'program'] + command = ["make", "arch={}".format(arch), "app={}".format(app), "program"] command.extend(opts) - res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + res = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True + ) if res.returncode != 0: - raise RuntimeError('Flash failure') + raise RuntimeError("Flash failure") return command @@ -316,13 +333,14 @@ def get_info(arch, opts: list = []) -> list: Returns a list. """ - command = ['make', 'arch={}'.format(arch), 'info'] + command = ["make", "arch={}".format(arch), "info"] command.extend(opts) - res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) + res = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True + ) if res.returncode != 0: - raise RuntimeError('make info Failure') - return res.stdout.split('\n') + raise RuntimeError("make info Failure") + return res.stdout.split("\n") def get_monitor(arch: str, **kwargs) -> object: @@ -336,32 +354,32 @@ def get_monitor(arch: str, **kwargs) -> object: :param mimosa: `MIMOSAMonitor` options. Returns a MIMOSA monitor if not None. """ for line in get_info(arch): - if 'Monitor:' in line: - _, port, arg = line.split(' ') - if port == 'run': + if "Monitor:" in line: + _, port, arg = line.split(" ") + if port == "run": return ShellMonitor(arg, **kwargs) - elif 'mimosa' in kwargs and kwargs['mimosa'] is not None: - mimosa_kwargs = kwargs.pop('mimosa') + elif "mimosa" in kwargs and kwargs["mimosa"] is not None: + mimosa_kwargs = kwargs.pop("mimosa") return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs) - elif 'energytrace' in kwargs and kwargs['energytrace'] is not None: - energytrace_kwargs = kwargs.pop('energytrace') + elif "energytrace" in kwargs and kwargs["energytrace"] is not None: + energytrace_kwargs = kwargs.pop("energytrace") return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs) else: - kwargs.pop('energytrace', None) - kwargs.pop('mimosa', None) + kwargs.pop("energytrace", None) + kwargs.pop("mimosa", None) return SerialMonitor(port, arg, **kwargs) - raise RuntimeError('Monitor failure') + raise RuntimeError("Monitor failure") def get_counter_limits(arch: str) -> tuple: """Return multipass max counter and max overflow value for arch.""" for line in get_info(arch): - match = re.match('Counter Overflow: ([^/]*)/(.*)', line) + match = re.match("Counter Overflow: ([^/]*)/(.*)", line) if match: overflow_value = int(match.group(1)) max_overflow = int(match.group(2)) return overflow_value, max_overflow - raise RuntimeError('Did not find Counter Overflow limits') + raise RuntimeError("Did not find Counter Overflow limits") def get_counter_limits_us(arch: str) -> tuple: @@ -370,13 +388,13 @@ def get_counter_limits_us(arch: str) -> tuple: overflow_value = 0 max_overflow = 0 for line in get_info(arch): - match = re.match(r'CPU\s+Freq:\s+(.*)\s+Hz', line) + match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line) if match: cpu_freq = int(match.group(1)) - match = re.match(r'Counter Overflow:\s+([^/]*)/(.*)', line) + match = re.match(r"Counter Overflow:\s+([^/]*)/(.*)", line) if match: overflow_value = int(match.group(1)) max_overflow = int(match.group(2)) if cpu_freq and overflow_value: return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow - raise RuntimeError('Did not find Counter Overflow limits') + raise RuntimeError("Did not find Counter Overflow limits") |