diff options
Diffstat (limited to 'lib/runner.py')
-rw-r--r-- | lib/runner.py | 82 |
1 files changed, 51 insertions, 31 deletions
diff --git a/lib/runner.py b/lib/runner.py index 1525b56..32eb950 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -18,6 +18,7 @@ import subprocess import sys import time + class SerialReader(serial.threaded.Protocol): """ Character- to line-wise data buffer for serial interfaces. @@ -25,7 +26,8 @@ class SerialReader(serial.threaded.Protocol): Reads in new data whenever it becomes available and exposes a line-based interface to applications. """ - def __init__(self, callback = None): + + def __init__(self, callback=None): """Create a new SerialReader object.""" self.callback = callback self.recv_buf = '' @@ -53,7 +55,7 @@ class SerialReader(serial.threaded.Protocol): except UnicodeDecodeError: pass - #sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data)) + # sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data)) def get_lines(self) -> list: """ @@ -79,10 +81,11 @@ class SerialReader(serial.threaded.Protocol): return ret return None + class SerialMonitor: """SerialMonitor captures serial output for a specific amount of time.""" - def __init__(self, port: str, baud: int, callback = None): + def __init__(self, port: str, baud: int, callback=None): """ Create a new SerialMonitor connected to port at the specified baud rate. @@ -101,7 +104,7 @@ class SerialMonitor: sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e)) sys.exit(1) - self.reader = SerialReader(callback = callback) + self.reader = SerialReader(callback=callback) self.worker = serial.threaded.ReaderThread(self.ser, self.reader) self.worker.start() @@ -128,10 +131,17 @@ class SerialMonitor: self.worker.stop() self.ser.close() +# TODO Optionale Kalibrierung mit bekannten Widerständen an GPIOs am Anfang +# TODO Sync per LED? -> Vor und ggf nach jeder Transition kurz pulsen +# TODO Für Verbraucher mit wenig Energiebedarf: Versorgung direkt per GPIO +# -> Zu Beginn der Messung ganz ausknipsen + + class EnergyTraceMonitor(SerialMonitor): """EnergyTraceMonitor captures serial timing output and EnergyTrace energy data.""" - def __init__(self, port: str, baud: int, callback = None, voltage = 3.3): - super().__init__(port = port, baud = baud, callback = callback) + + def __init__(self, port: str, baud: int, callback=None, voltage=3.3): + super().__init__(port=port, baud=baud, callback=callback) self._voltage = voltage self._output = time.strftime('%Y%m%d-%H%M%S.etlog') self._start_energytrace() @@ -139,26 +149,28 @@ class EnergyTraceMonitor(SerialMonitor): def _start_energytrace(self): cmd = ['msp430-etv', '--save', self._output, '0'] self._logger = subprocess.Popen(cmd, - stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) def close(self): super().close() self._logger.send_signal(subprocess.signal.SIGINT) - stdout, stderr = self._logger.communicate(timeout = 15) + stdout, stderr = self._logger.communicate(timeout=15) def get_files(self) -> list: return [self._output] def get_config(self) -> dict: return { - 'voltage' : self._voltage, + 'voltage': self._voltage, } + class MIMOSAMonitor(SerialMonitor): """MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time.""" - def __init__(self, port: str, baud: int, callback = None, offset = 130, shunt = 330, voltage = 3.3): - super().__init__(port = port, baud = baud, callback = callback) + + def __init__(self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3): + super().__init__(port=port, baud=baud, callback=callback) self._offset = offset self._shunt = shunt self._voltage = voltage @@ -188,9 +200,9 @@ class MIMOSAMonitor(SerialMonitor): self._mimosacmd(['--parameter', 'voltage', str(self._voltage)]) self._mimosacmd(['--mimosa-start']) time.sleep(2) - self._mimosactl('1k') # 987 ohm + self._mimosactl('1k') # 987 ohm time.sleep(2) - self._mimosactl('100k') # 99.3 kohm + self._mimosactl('100k') # 99.3 kohm time.sleep(2) self._mimosactl('connect') @@ -205,7 +217,7 @@ class MIMOSAMonitor(SerialMonitor): # belong to the current measurements. This ensures that older .mim # files lying around in the directory will not confuse our # heuristic. - for filename in sorted(os.listdir(), reverse = True): + for filename in sorted(os.listdir(), reverse=True): if re.search(r'[.]mim$', filename): mim_file = filename break @@ -226,14 +238,16 @@ class MIMOSAMonitor(SerialMonitor): def get_config(self) -> dict: return { - 'offset' : self._offset, - 'shunt' : self._shunt, - 'voltage' : self._voltage, + 'offset': self._offset, + 'shunt': self._shunt, + 'voltage': self._voltage, } + class ShellMonitor: """SerialMonitor runs a program and captures its output for a specific amount of time.""" - def __init__(self, script: str, callback = None): + + def __init__(self, script: str, callback=None): """ Create a new ShellMonitor object. @@ -251,8 +265,8 @@ class ShellMonitor: if type(timeout) != int: raise ValueError('timeout argument must be int') res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script], - stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) if self.callback: for line in res.stdout.split('\n'): self.callback(line) @@ -269,30 +283,33 @@ class ShellMonitor: """ pass -def build(arch, app, opts = []): + +def build(arch, app, opts=[]): command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'clean'] command.extend(opts) - res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) if res.returncode != 0: raise RuntimeError('Build failure: ' + res.stderr) command = ['make', '-B', 'arch={}'.format(arch), 'app={}'.format(app)] command.extend(opts) - res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) if res.returncode != 0: raise RuntimeError('Build failure: ' + res.stderr) return command -def flash(arch, app, opts = []): + +def flash(arch, app, opts=[]): command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'program'] command.extend(opts) - res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) if res.returncode != 0: raise RuntimeError('Flash failure') return command + def get_info(arch, opts: list = []) -> list: """ Return multipass "make info" output. @@ -301,12 +318,13 @@ def get_info(arch, opts: list = []) -> list: """ command = ['make', 'arch={}'.format(arch), 'info'] command.extend(opts) - res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, - universal_newlines = True) + res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) if res.returncode != 0: raise RuntimeError('make info Failure') return res.stdout.split('\n') + def get_monitor(arch: str, **kwargs) -> object: """ Return an appropriate monitor for arch, depending on "make info" output. @@ -334,6 +352,7 @@ def get_monitor(arch: str, **kwargs) -> object: return SerialMonitor(port, arg, **kwargs) raise RuntimeError('Monitor failure') + def get_counter_limits(arch: str) -> tuple: """Return multipass max counter and max overflow value for arch.""" for line in get_info(arch): @@ -344,6 +363,7 @@ def get_counter_limits(arch: str) -> tuple: return overflow_value, max_overflow raise RuntimeError('Did not find Counter Overflow limits') + def get_counter_limits_us(arch: str) -> tuple: """Return duration of one counter step and one counter overflow in us.""" cpu_freq = 0 |