diff options
-rw-r--r-- | lib/runner.py | 87 |
1 files changed, 73 insertions, 14 deletions
diff --git a/lib/runner.py b/lib/runner.py index 99061a3..cb3486b 100644 --- a/lib/runner.py +++ b/lib/runner.py @@ -1,7 +1,13 @@ """ Utilities for running benchmarks. -Foo. +Classes: + SerialMonitor -- captures serial output for a specific amount of time + ShellMonitor -- captures UNIX program output for a specific amount of time + +Functions: + get_monitor -- return Monitor class suitable for the selected multipass arch + get_counter_limits -- return arch-specific multipass counter limits (max value, max overflow) """ import re @@ -11,17 +17,22 @@ import subprocess import time class SerialReader(serial.threaded.Protocol): + """ + Character- to line-wise data buffer for serial interfaces. + + Reads in new data whenever it becomes available and exposes a line-based + interface to applications. + """ def __init__(self): + """Create a new SerialReader object.""" self.recv_buf = '' self.lines = [] - def expect(self, num_chars): - self.recv_buf = '' - def __call__(self): return self def data_received(self, data): + """Append newly received serial data to the line buffer.""" try: str_data = data.decode('UTF-8') self.recv_buf += str_data @@ -35,12 +46,24 @@ class SerialReader(serial.threaded.Protocol): pass #sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data)) - def get_lines(self): + def get_lines(self) -> list: + """ + Return the latest batch of complete lines. + + The return value is a list and may be empty. + + Empties the internal line buffer to ensure that no line is returned twice. + """ ret = self.lines self.lines = [] return ret - def get_line(self): + def get_line(self) -> str: + """ + Return the latest complete line, or None. + + Empties the entire internal line buffer to ensure that no line is returned twice. + """ if len(self.lines): ret = self.lines[-1] self.lines = [] @@ -48,13 +71,20 @@ class SerialReader(serial.threaded.Protocol): return None class SerialMonitor: - def __init__(self, port, baud): + """SerialMonitor captures serial output for a specific amount of time.""" + + def __init__(self, port: str, baud: int): + """ + Create a new SerialMonitor connected to port at the specified baud rate. + + Communication uses no parity, no flow control, and one stop bit. + Data collection starts immediately. + """ self.ser = serial.serial_for_url(port, do_not_open=True) self.ser.baudrate = baud self.ser.parity = 'N' self.ser.rtscts = False self.ser.xonxoff = False - self.check_command = None try: self.ser.open() @@ -66,28 +96,55 @@ class SerialMonitor: self.worker = serial.threaded.ReaderThread(self.ser, self.reader) self.worker.start() - def run(self, timeout = 10): + def run(self, timeout: int = 10) -> list: + """ + Collect serial output for timeout seconds and return a list of all output lines. + + Blocks until data collection is complete. + """ time.sleep(timeout) return self.reader.get_lines() def close(self): + """Close serial connection.""" self.worker.stop() self.ser.close() class ShellMonitor: - def __init__(self, script): + """SerialMonitor runs a program and captures its output for a specific amount of time.""" + def __init__(self, script: str): + """ + Create a new ShellMonitor object. + + Does not start execution and monitoring yet. + """ self.script = script - def run(self, timeout = 4): + def run(self, timeout: int = 4) -> list: + """ + Run program for timeout seconds and return a list of its stdout lines. + + stderr and return status are discarded at the moment. + """ res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script], stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True) return res.stdout.split('\n') def close(self): + """ + Do nothing, successfully. + + Intended for compatibility with SerialMonitor. + """ pass -def get_info(arch, opts = []): +def get_info(arch, opts: list = []) -> list: + """ + Return multipass "make info" output. + + Returns a list. + """ command = ['make', 'arch={}'.format(arch), 'info'] command.extend(opts) res = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, @@ -96,7 +153,8 @@ def get_info(arch, opts = []): raise RuntimeError('make info Failure') return res.stdout.split('\n') -def get_monitor(arch): +def get_monitor(arch: str) -> object: + """Return a SerialMonitor or ShellMonitor.""" for line in get_info(arch): if 'Monitor:' in line: _, port, arg = line.split(' ') @@ -106,7 +164,8 @@ def get_monitor(arch): return SerialMonitor(port, arg) raise RuntimeError('Monitor failure') -def get_counter_limits(arch): +def get_counter_limits(arch: str) -> tuple: + """Return multipass max counter and max overflow value for arch.""" for line in get_info(arch): match = re.match('Counter Overflow: ([^/]*)/(.*)', line) if match: |