summaryrefslogtreecommitdiff
path: root/lib/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/runner.py')
-rw-r--r--lib/runner.py156
1 files changed, 87 insertions, 69 deletions
diff --git a/lib/runner.py b/lib/runner.py
index 85df2b6..16f0a29 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -30,7 +30,7 @@ class SerialReader(serial.threaded.Protocol):
def __init__(self, callback=None):
"""Create a new SerialReader object."""
self.callback = callback
- self.recv_buf = ''
+ self.recv_buf = ""
self.lines = []
def __call__(self):
@@ -39,13 +39,13 @@ class SerialReader(serial.threaded.Protocol):
def data_received(self, data):
"""Append newly received serial data to the line buffer."""
try:
- str_data = data.decode('UTF-8')
+ str_data = data.decode("UTF-8")
self.recv_buf += str_data
# We may get anything between \r\n, \n\r and simple \n newlines.
# We assume that \n is always present and use str.strip to remove leading/trailing \r symbols
# Note: Do not call str.strip on lines[-1]! Otherwise, lines may be mangled
- lines = self.recv_buf.split('\n')
+ lines = self.recv_buf.split("\n")
if len(lines) > 1:
self.lines.extend(map(str.strip, lines[:-1]))
self.recv_buf = lines[-1]
@@ -94,14 +94,16 @@ class SerialMonitor:
"""
self.ser = serial.serial_for_url(port, do_not_open=True)
self.ser.baudrate = baud
- self.ser.parity = 'N'
+ self.ser.parity = "N"
self.ser.rtscts = False
self.ser.xonxoff = False
try:
self.ser.open()
except serial.SerialException as e:
- sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e))
+ sys.stderr.write(
+ "Could not open serial port {}: {}\n".format(self.ser.name, e)
+ )
sys.exit(1)
self.reader = SerialReader(callback=callback)
@@ -131,6 +133,7 @@ class SerialMonitor:
self.worker.stop()
self.ser.close()
+
# TODO Optionale Kalibrierung mit bekannten Widerständen an GPIOs am Anfang
# TODO Sync per LED? -> Vor und ggf nach jeder Transition kurz pulsen
# TODO Für Verbraucher mit wenig Energiebedarf: Versorgung direkt per GPIO
@@ -143,14 +146,14 @@ class EnergyTraceMonitor(SerialMonitor):
def __init__(self, port: str, baud: int, callback=None, voltage=3.3):
super().__init__(port=port, baud=baud, callback=callback)
self._voltage = voltage
- self._output = time.strftime('%Y%m%d-%H%M%S.etlog')
+ self._output = time.strftime("%Y%m%d-%H%M%S.etlog")
self._start_energytrace()
def _start_energytrace(self):
- cmd = ['msp430-etv', '--save', self._output, '0']
- self._logger = subprocess.Popen(cmd,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ cmd = ["msp430-etv", "--save", self._output, "0"]
+ self._logger = subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
+ )
def close(self):
super().close()
@@ -162,14 +165,16 @@ class EnergyTraceMonitor(SerialMonitor):
def get_config(self) -> dict:
return {
- 'voltage': self._voltage,
+ "voltage": self._voltage,
}
class MIMOSAMonitor(SerialMonitor):
"""MIMOSAMonitor captures serial output and MIMOSA energy data for a specific amount of time."""
- def __init__(self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3):
+ def __init__(
+ self, port: str, baud: int, callback=None, offset=130, shunt=330, voltage=3.3
+ ):
super().__init__(port=port, baud=baud, callback=callback)
self._offset = offset
self._shunt = shunt
@@ -177,39 +182,41 @@ class MIMOSAMonitor(SerialMonitor):
self._start_mimosa()
def _mimosactl(self, subcommand):
- cmd = ['mimosactl']
+ cmd = ["mimosactl"]
cmd.append(subcommand)
res = subprocess.run(cmd)
if res.returncode != 0:
res = subprocess.run(cmd)
if res.returncode != 0:
- raise RuntimeError('{} returned {}'.format(' '.join(cmd), res.returncode))
+ raise RuntimeError(
+ "{} returned {}".format(" ".join(cmd), res.returncode)
+ )
def _mimosacmd(self, opts):
- cmd = ['MimosaCMD']
+ cmd = ["MimosaCMD"]
cmd.extend(opts)
res = subprocess.run(cmd)
if res.returncode != 0:
- raise RuntimeError('{} returned {}'.format(' '.join(cmd), res.returncode))
+ raise RuntimeError("{} returned {}".format(" ".join(cmd), res.returncode))
def _start_mimosa(self):
- self._mimosactl('disconnect')
- self._mimosacmd(['--start'])
- self._mimosacmd(['--parameter', 'offset', str(self._offset)])
- self._mimosacmd(['--parameter', 'shunt', str(self._shunt)])
- self._mimosacmd(['--parameter', 'voltage', str(self._voltage)])
- self._mimosacmd(['--mimosa-start'])
+ self._mimosactl("disconnect")
+ self._mimosacmd(["--start"])
+ self._mimosacmd(["--parameter", "offset", str(self._offset)])
+ self._mimosacmd(["--parameter", "shunt", str(self._shunt)])
+ self._mimosacmd(["--parameter", "voltage", str(self._voltage)])
+ self._mimosacmd(["--mimosa-start"])
time.sleep(2)
- self._mimosactl('1k') # 987 ohm
+ self._mimosactl("1k") # 987 ohm
time.sleep(2)
- self._mimosactl('100k') # 99.3 kohm
+ self._mimosactl("100k") # 99.3 kohm
time.sleep(2)
- self._mimosactl('connect')
+ self._mimosactl("connect")
def _stop_mimosa(self):
# Make sure the MIMOSA daemon has gathered all needed data
time.sleep(2)
- self._mimosacmd(['--mimosa-stop'])
+ self._mimosacmd(["--mimosa-stop"])
mtime_changed = True
mim_file = None
time.sleep(1)
@@ -218,7 +225,7 @@ class MIMOSAMonitor(SerialMonitor):
# files lying around in the directory will not confuse our
# heuristic.
for filename in sorted(os.listdir(), reverse=True):
- if re.search(r'[.]mim$', filename):
+ if re.search(r"[.]mim$", filename):
mim_file = filename
break
while mtime_changed:
@@ -226,7 +233,7 @@ class MIMOSAMonitor(SerialMonitor):
if time.time() - os.stat(mim_file).st_mtime < 3:
mtime_changed = True
time.sleep(1)
- self._mimosacmd(['--stop'])
+ self._mimosacmd(["--stop"])
return mim_file
def close(self):
@@ -238,9 +245,9 @@ class MIMOSAMonitor(SerialMonitor):
def get_config(self) -> dict:
return {
- 'offset': self._offset,
- 'shunt': self._shunt,
- 'voltage': self._voltage,
+ "offset": self._offset,
+ "shunt": self._shunt,
+ "voltage": self._voltage,
}
@@ -263,14 +270,17 @@ class ShellMonitor:
stderr and return status are discarded at the moment.
"""
if type(timeout) != int:
- raise ValueError('timeout argument must be int')
- res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ raise ValueError("timeout argument must be int")
+ res = subprocess.run(
+ ["timeout", "{:d}s".format(timeout), self.script],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ )
if self.callback:
- for line in res.stdout.split('\n'):
+ for line in res.stdout.split("\n"):
self.callback(line)
- return res.stdout.split('\n')
+ return res.stdout.split("\n")
def monitor(self):
raise NotImplementedError
@@ -285,28 +295,35 @@ class ShellMonitor:
def build(arch, app, opts=[]):
- command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'clean']
+ command = ["make", "arch={}".format(arch), "app={}".format(app), "clean"]
command.extend(opts)
- res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ res = subprocess.run(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
+ )
if res.returncode != 0:
- raise RuntimeError('Build failure, executing {}:\n'.format(command) + res.stderr)
- command = ['make', '-B', 'arch={}'.format(arch), 'app={}'.format(app)]
+ raise RuntimeError(
+ "Build failure, executing {}:\n".format(command) + res.stderr
+ )
+ command = ["make", "-B", "arch={}".format(arch), "app={}".format(app)]
command.extend(opts)
- res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ res = subprocess.run(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
+ )
if res.returncode != 0:
- raise RuntimeError('Build failure, executing {}:\n '.format(command) + res.stderr)
+ raise RuntimeError(
+ "Build failure, executing {}:\n ".format(command) + res.stderr
+ )
return command
def flash(arch, app, opts=[]):
- command = ['make', 'arch={}'.format(arch), 'app={}'.format(app), 'program']
+ command = ["make", "arch={}".format(arch), "app={}".format(app), "program"]
command.extend(opts)
- res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ res = subprocess.run(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
+ )
if res.returncode != 0:
- raise RuntimeError('Flash failure')
+ raise RuntimeError("Flash failure")
return command
@@ -316,13 +333,14 @@ def get_info(arch, opts: list = []) -> list:
Returns a list.
"""
- command = ['make', 'arch={}'.format(arch), 'info']
+ command = ["make", "arch={}".format(arch), "info"]
command.extend(opts)
- res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=True)
+ res = subprocess.run(
+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
+ )
if res.returncode != 0:
- raise RuntimeError('make info Failure')
- return res.stdout.split('\n')
+ raise RuntimeError("make info Failure")
+ return res.stdout.split("\n")
def get_monitor(arch: str, **kwargs) -> object:
@@ -336,32 +354,32 @@ def get_monitor(arch: str, **kwargs) -> object:
:param mimosa: `MIMOSAMonitor` options. Returns a MIMOSA monitor if not None.
"""
for line in get_info(arch):
- if 'Monitor:' in line:
- _, port, arg = line.split(' ')
- if port == 'run':
+ if "Monitor:" in line:
+ _, port, arg = line.split(" ")
+ if port == "run":
return ShellMonitor(arg, **kwargs)
- elif 'mimosa' in kwargs and kwargs['mimosa'] is not None:
- mimosa_kwargs = kwargs.pop('mimosa')
+ elif "mimosa" in kwargs and kwargs["mimosa"] is not None:
+ mimosa_kwargs = kwargs.pop("mimosa")
return MIMOSAMonitor(port, arg, **mimosa_kwargs, **kwargs)
- elif 'energytrace' in kwargs and kwargs['energytrace'] is not None:
- energytrace_kwargs = kwargs.pop('energytrace')
+ elif "energytrace" in kwargs and kwargs["energytrace"] is not None:
+ energytrace_kwargs = kwargs.pop("energytrace")
return EnergyTraceMonitor(port, arg, **energytrace_kwargs, **kwargs)
else:
- kwargs.pop('energytrace', None)
- kwargs.pop('mimosa', None)
+ kwargs.pop("energytrace", None)
+ kwargs.pop("mimosa", None)
return SerialMonitor(port, arg, **kwargs)
- raise RuntimeError('Monitor failure')
+ raise RuntimeError("Monitor failure")
def get_counter_limits(arch: str) -> tuple:
"""Return multipass max counter and max overflow value for arch."""
for line in get_info(arch):
- match = re.match('Counter Overflow: ([^/]*)/(.*)', line)
+ match = re.match("Counter Overflow: ([^/]*)/(.*)", line)
if match:
overflow_value = int(match.group(1))
max_overflow = int(match.group(2))
return overflow_value, max_overflow
- raise RuntimeError('Did not find Counter Overflow limits')
+ raise RuntimeError("Did not find Counter Overflow limits")
def get_counter_limits_us(arch: str) -> tuple:
@@ -370,13 +388,13 @@ def get_counter_limits_us(arch: str) -> tuple:
overflow_value = 0
max_overflow = 0
for line in get_info(arch):
- match = re.match(r'CPU\s+Freq:\s+(.*)\s+Hz', line)
+ match = re.match(r"CPU\s+Freq:\s+(.*)\s+Hz", line)
if match:
cpu_freq = int(match.group(1))
- match = re.match(r'Counter Overflow:\s+([^/]*)/(.*)', line)
+ match = re.match(r"Counter Overflow:\s+([^/]*)/(.*)", line)
if match:
overflow_value = int(match.group(1))
max_overflow = int(match.group(2))
if cpu_freq and overflow_value:
return 1000000 / cpu_freq, overflow_value * 1000000 / cpu_freq, max_overflow
- raise RuntimeError('Did not find Counter Overflow limits')
+ raise RuntimeError("Did not find Counter Overflow limits")