summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/runner.py87
1 files changed, 73 insertions, 14 deletions
diff --git a/lib/runner.py b/lib/runner.py
index 99061a3..cb3486b 100644
--- a/lib/runner.py
+++ b/lib/runner.py
@@ -1,7 +1,13 @@
"""
Utilities for running benchmarks.
-Foo.
+Classes:
+ SerialMonitor -- captures serial output for a specific amount of time
+ ShellMonitor -- captures UNIX program output for a specific amount of time
+
+Functions:
+ get_monitor -- return Monitor class suitable for the selected multipass arch
+ get_counter_limits -- return arch-specific multipass counter limits (max value, max overflow)
"""
import re
@@ -11,17 +17,22 @@ import subprocess
import time
class SerialReader(serial.threaded.Protocol):
+ """
+ Character- to line-wise data buffer for serial interfaces.
+
+ Reads in new data whenever it becomes available and exposes a line-based
+ interface to applications.
+ """
def __init__(self):
+ """Create a new SerialReader object."""
self.recv_buf = ''
self.lines = []
- def expect(self, num_chars):
- self.recv_buf = ''
-
def __call__(self):
return self
def data_received(self, data):
+ """Append newly received serial data to the line buffer."""
try:
str_data = data.decode('UTF-8')
self.recv_buf += str_data
@@ -35,12 +46,24 @@ class SerialReader(serial.threaded.Protocol):
pass
#sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data))
- def get_lines(self):
+ def get_lines(self) -> list:
+ """
+ Return the latest batch of complete lines.
+
+ The return value is a list and may be empty.
+
+ Empties the internal line buffer to ensure that no line is returned twice.
+ """
ret = self.lines
self.lines = []
return ret
- def get_line(self):
+ def get_line(self) -> str:
+ """
+ Return the latest complete line, or None.
+
+ Empties the entire internal line buffer to ensure that no line is returned twice.
+ """
if len(self.lines):
ret = self.lines[-1]
self.lines = []
@@ -48,13 +71,20 @@ class SerialReader(serial.threaded.Protocol):
return None
class SerialMonitor:
- def __init__(self, port, baud):
+ """SerialMonitor captures serial output for a specific amount of time."""
+
+ def __init__(self, port: str, baud: int):
+ """
+ Create a new SerialMonitor connected to port at the specified baud rate.
+
+ Communication uses no parity, no flow control, and one stop bit.
+ Data collection starts immediately.
+ """
self.ser = serial.serial_for_url(port, do_not_open=True)
self.ser.baudrate = baud
self.ser.parity = 'N'
self.ser.rtscts = False
self.ser.xonxoff = False
- self.check_command = None
try:
self.ser.open()
@@ -66,28 +96,55 @@ class SerialMonitor:
self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
self.worker.start()
- def run(self, timeout = 10):
+ def run(self, timeout: int = 10) -> list:
+ """
+ Collect serial output for timeout seconds and return a list of all output lines.
+
+ Blocks until data collection is complete.
+ """
time.sleep(timeout)
return self.reader.get_lines()
def close(self):
+ """Close serial connection."""
self.worker.stop()
self.ser.close()
class ShellMonitor:
- def __init__(self, script):
+ """SerialMonitor runs a program and captures its output for a specific amount of time."""
+ def __init__(self, script: str):
+ """
+ Create a new ShellMonitor object.
+
+ Does not start execution and monitoring yet.
+ """
self.script = script
- def run(self, timeout = 4):
+ def run(self, timeout: int = 4) -> list:
+ """
+ Run program for timeout seconds and return a list of its stdout lines.
+
+ stderr and return status are discarded at the moment.
+ """
res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script],
stdout = subprocess.PIPE, stderr = subprocess.PIPE,
universal_newlines = True)
return res.stdout.split('\n')
def close(self):
+ """
+ Do nothing, successfully.
+
+ Intended for compatibility with SerialMonitor.
+ """
pass
-def get_info(arch, opts = []):
+def get_info(arch, opts: list = []) -> list:
+ """
+ Return multipass "make info" output.
+
+ Returns a list.
+ """
command = ['make', 'arch={}'.format(arch), 'info']
command.extend(opts)
res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
@@ -96,7 +153,8 @@ def get_info(arch, opts = []):
raise RuntimeError('make info Failure')
return res.stdout.split('\n')
-def get_monitor(arch):
+def get_monitor(arch: str) -> object:
+ """Return a SerialMonitor or ShellMonitor."""
for line in get_info(arch):
if 'Monitor:' in line:
_, port, arg = line.split(' ')
@@ -106,7 +164,8 @@ def get_monitor(arch):
return SerialMonitor(port, arg)
raise RuntimeError('Monitor failure')
-def get_counter_limits(arch):
+def get_counter_limits(arch: str) -> tuple:
+ """Return multipass max counter and max overflow value for arch."""
for line in get_info(arch):
match = re.match('Counter Overflow: ([^/]*)/(.*)', line)
if match: