summaryrefslogtreecommitdiff
path: root/lib/runner.py
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2019-11-21 09:17:34 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2019-11-21 09:17:34 +0100
commit0c7da8eb44c0f3162829413baff0e9162566202d (patch)
tree0d468ebfb73974029938ab86c708e384ba1a6d48 /lib/runner.py
parentbffa9cba304c5ff1a2a11e1ea3a9b1fede1cacfb (diff)
autopep8 / flake8
Diffstat (limited to 'lib/runner.py')
-rw-r--r--lib/runner.py82
1 files changed, 51 insertions, 31 deletions
diff --git a/lib/runner.py b/lib/runner.py
index 1525b56..32eb950 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -18,6 +18,7 @@ import subprocess
import sys
import time
+
class SerialReader(serial.threaded.Protocol):
"""
Character- to line-wise data buffer for serial interfaces.
@@ -25,7 +26,8 @@ class SerialReader(serial.threaded.Protocol):
Reads in new data whenever it becomes available and exposes a line-based
interface to applications.
"""
- def __init__(self, callback = None):
+
+ def __init__(self, callback=None):
"""Create a new SerialReader object."""
self.callback = callback
self.recv_buf = ''
@@ -53,7 +55,7 @@ class SerialReader(serial.threaded.Protocol):
except UnicodeDecodeError:
pass
- #sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data))
+ # sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data))
def get_lines(self) -> list:
"""
@@ -79,10 +81,11 @@ class SerialReader(serial.threaded.Protocol):
return ret
return None
+
class SerialMonitor:
"""SerialMonitor captures serial output for a specific amount of time."""
- def __init__(self, port: str, baud: int, callback = None):
+ def __init__(self, port: str, baud: int, callback=None):
"""
Create a new SerialMonitor connected to port at the specified baud rate.
@@ -101,7 +104,7 @@ class SerialMonitor:
sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e))
sys.exit(1)
- self.reader = SerialReader(callback = callback)
+ self.reader = SerialReader(callback=callback)
self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
self.worker.start()
@@ -128,10 +131,17 @@ class SerialMonitor:
self.worker.stop()
self.ser.close()
+# TODO Optionale Kalibrierung mit bekannten Widerständen an GPIOs am Anfang
+# TODO Sync per LED? -> Vor und ggf nach jeder Transition kurz pulsen
+# TODO Für Verbraucher mit wenig Energiebedarf: Versorgung direkt per GPIO
+# -> Zu Beginn der Messung ganz ausknipsen
+
+
class EnergyTraceMonitor(SerialMonitor):
"""EnergyTraceMonitor captures serial timing output and EnergyTrace energy data."""
- def __init__(self, port: str, baud: int, callback = None, voltage = 3.3):
- super().__init__(port = port, baud = baud, callback = callback)
+
+ def __init__(self, port: str, baud: int, callback=None, voltage=3.3):
+ super().__init__(port=port, baud=baud, callback=callback)
self._voltage = voltage
self._output = time.strftime('%Y%m%d-%H%M%S.etlog')
self._start_energytrace()
@@ -139,26 +149,28 @@ class EnergyTraceMonitor(SerialMonitor):
def _start_energytrace(self):
cmd = ['msp430-etv', '--save', self._output, '0']
self._logger = subprocess.Popen(cmd,
- stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
def close(self):
super().close()
self._logger.send_signal(subprocess.signal.SIGINT)
- stdout, stderr = self._logger.communicate(timeout = 15)
+ stdout, stderr = self._logger.communicate(timeout=15)
def get_files(self) -> list:
return [self._output]
def get_config(self) -> dict:
return {
- 'voltage' : self._voltage,
+ 'voltage': self._voltage,
}
+
class MIMOSAMonitor(SerialMonitor):
"""MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time."""
- def __init__(self, port: str, baud: int, callback = None, offset = 130, shunt = 330, voltage = 3.3):
- super().__init__(port = port, baud = baud, callback = callback)
+
+ def __init__(self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3):
+ super().__init__(port=port, baud=baud, callback=callback)
self._offset = offset
self._shunt = shunt
self._voltage = voltage
@@ -188,9 +200,9 @@ class MIMOSAMonitor(SerialMonitor):
self._mimosacmd(['--parameter', 'voltage', str(self._voltage)])
self._mimosacmd(['--mimosa-start'])
time.sleep(2)
- self._mimosactl('1k') # 987 ohm
+ self._mimosactl('1k') # 987 ohm
time.sleep(2)
- self._mimosactl('100k') # 99.3 kohm
+ self._mimosactl('100k') # 99.3 kohm
time.sleep(2)
self._mimosactl('connect')
@@ -205,7 +217,7 @@ class MIMOSAMonitor(SerialMonitor):
# belong to the current measurements. This ensures that older .mim
# files lying around in the directory will not confuse our
# heuristic.
- for filename in sorted(os.listdir(), reverse = True):
+ for filename in sorted(os.listdir(), reverse=True):
if re.search(r'[.]mim$', filename):
mim_file = filename
break
@@ -226,14 +238,16 @@ class MIMOSAMonitor(SerialMonitor):
def get_config(self) -> dict:
return {
- 'offset' : self._offset,
- 'shunt' : self._shunt,
- 'voltage' : self._voltage,
+ 'offset': self._offset,
+ 'shunt': self._shunt,
+ 'voltage': self._voltage,
}
+
class ShellMonitor:
"""SerialMonitor runs a program and captures its output for a specific amount of time."""
- def __init__(self, script: str, callback = None):
+
+ def __init__(self, script: str, callback=None):
"""
Create a new ShellMonitor object.
@@ -251,8 +265,8 @@ class ShellMonitor:
if type(timeout) != int:
raise ValueError('timeout argument must be int')
res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script],
- stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
if self.callback:
for line in res.stdout.split('\n'):
self.callback(line)
@@ -269,30 +283,33 @@ class ShellMonitor:
"""
pass
-def build(arch, app, opts = []):
+
+def build(arch, app, opts=[]):
command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'clean']
command.extend(opts)
- res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
if res.returncode != 0:
raise RuntimeError('Build failure: ' + res.stderr)
command = ['make', '-B', 'arch={}'.format(arch), 'app={}'.format(app)]
command.extend(opts)
- res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
if res.returncode != 0:
raise RuntimeError('Build failure: ' + res.stderr)
return command
-def flash(arch, app, opts = []):
+
+def flash(arch, app, opts=[]):
command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'program']
command.extend(opts)
- res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
if res.returncode != 0:
raise RuntimeError('Flash failure')
return command
+
def get_info(arch, opts: list = []) -> list:
"""
Return multipass "make info" output.
@@ -301,12 +318,13 @@ def get_info(arch, opts: list = []) -> list:
"""
command = ['make', 'arch={}'.format(arch), 'info']
command.extend(opts)
- res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
- universal_newlines = True)
+ res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
if res.returncode != 0:
raise RuntimeError('make info Failure')
return res.stdout.split('\n')
+
def get_monitor(arch: str, **kwargs) -> object:
"""
Return an appropriate monitor for arch, depending on "make info" output.
@@ -334,6 +352,7 @@ def get_monitor(arch: str, **kwargs) -> object:
return SerialMonitor(port, arg, **kwargs)
raise RuntimeError('Monitor failure')
+
def get_counter_limits(arch: str) -> tuple:
"""Return multipass max counter and max overflow value for arch."""
for line in get_info(arch):
@@ -344,6 +363,7 @@ def get_counter_limits(arch: str) -> tuple:
return overflow_value, max_overflow
raise RuntimeError('Did not find Counter Overflow limits')
+
def get_counter_limits_us(arch: str) -> tuple:
"""Return duration of one counter step and one counter overflow in us."""
cpu_freq = 0