diff options
-rw-r--r-- | lib/runner.py | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/lib/runner.py b/lib/runner.py new file mode 100644 index 0000000..9a6022d --- /dev/null +++ b/lib/runner.py @@ -0,0 +1,88 @@ +""" +Utilities for running benchmarks. + +Foo. +""" + +import serial +import serial.threaded +import subprocess +import time + +class SerialReader(serial.threaded.Protocol): + def __init__(self): + self.recv_buf = '' + self.lines = [] + + def expect(self, num_chars): + self.recv_buf = '' + + def __call__(self): + return self + + def data_received(self, data): + try: + str_data = data.decode('UTF-8') + self.recv_buf += str_data + + lines = self.recv_buf.split("\n\r") + if len(lines) > 1: + self.lines.extend(lines[:-1]) + self.recv_buf = lines[-1] + + except UnicodeDecodeError: + pass + #sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data)) + + def get_lines(self): + ret = self.lines + self.lines = [] + return ret + + def get_line(self): + if len(self.lines): + ret = self.lines[-1] + self.lines = [] + return ret + return None + +class SerialMonitor: + def __init__(self, port, baud): + self.ser = serial.serial_for_url(port, do_not_open=True) + self.ser.baudrate = baud + self.ser.parity = 'N' + self.ser.rtscts = False + self.ser.xonxoff = False + self.check_command = None + + try: + self.ser.open() + except serial.SerialException as e: + sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e)) + sys.exit(1) + + self.reader = SerialReader() + self.worker = serial.threaded.ReaderThread(self.ser, self.reader) + self.worker.start() + + def run(self, timeout = 10): + time.sleep(timeout) + return self.reader.get_lines() + + def close(self): + self.worker.stop() + self.ser.close() + +class ShellMonitor: + def __init__(self, script): + self.script = script + + def run(self, timeout = 4): + res = subprocess.run(['timeout', '{:d}s'.format(timeout), self.script], + stdout = subprocess.PIPE, stderr = subprocess.PIPE, + universal_newlines = True) + return res.stdout.split('\n') + + def close(self): + pass + |