From e22b809bccd1cde36f6111e17c00ce094e687cfd Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Fri, 9 Jul 2021 20:01:12 +0200 Subject: Initial commit --- bin/kaxxxxp-viewer | 358 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 358 insertions(+) create mode 100755 bin/kaxxxxp-viewer diff --git a/bin/kaxxxxp-viewer b/bin/kaxxxxp-viewer new file mode 100755 index 0000000..afceb57 --- /dev/null +++ b/bin/kaxxxxp-viewer @@ -0,0 +1,358 @@ +#!/usr/bin/env python3 +# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160 +# +# Copyright (C) 2021 Daniel Friesel +# +# SPDX-License-Identifier: GPL-2.0-or-later + +"""kaxxxxp-viewer - Data Logger and Viewer for KAxxxxP power supplies + +DESCRIPTION + +kaxxxxp-viewer logs voltage and current readings provided by a KAxxxxP power supply. +Measurements can be taken directly (by specifying in seconds) +or loaded from a logfile using --load . Data can be plotted or aggregated on stdout. + +WARNING + +The power supply's serial communication protocol is supports both read and write +operations. Communication errors or bugs may cause the power supply to set an +incompatible voltage or current limit, which may result in damaged equipment or +fire. By using this software, you acknowledge that you are aware of these risks +and the following disclaimer. + +This software is provided by the copyright holders and contributors "as is" and +any express or implied warranties, including, but not limited to, the implied +warranties of merchantability and fitness for a particular purpose are +disclaimed. In no event shall the copyright holder or contributors be liable +for any direct, indirect, incidental, special, exemplary, or consequential +damages (including, but not limited to, procurement of substitute goods or +services; loss of use, data, or profits; or business interruption) however +caused and on any theory of liability, whether in contract, strict liability, +or tort (including negligence or otherwise) arising in any way out of the use +of this software, even if advised of the possibility of such damage. + +OPTIONS +""" + +import argparse +import numpy as np +import serial +import serial.threaded +import signal +import sys +import tempfile +import time + +terminate_measurement = False + + +def running_mean(x: np.ndarray, N: int) -> np.ndarray: + """ + Compute `N` elements wide running average over `x`. + + :param x: 1-Dimensional NumPy array + :param N: how many items to average. Should be even for optimal results. + """ + + # to ensure that output.shape == input.shape, we need to insert data + # at the boundaries + boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) + boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) + + return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") + + +class SerialReader(serial.threaded.Protocol): + def __init__(self): + self.remaining_chars = 0 + self.recv_buf = "" + self.lines = [] + + def expect(self, num_chars): + self.recv_buf = "" + self.remaining_chars = num_chars + + def __call__(self): + return self + + def data_received(self, data): + try: + str_data = data.decode("UTF-8") + self.recv_buf += str_data + except UnicodeDecodeError: + sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) + + self.remaining_chars -= len(str_data) + + if self.remaining_chars <= 0: + self.lines.append(self.recv_buf) + + def get_expected_line(self): + if len(self.lines): + ret = self.lines[0] + self.lines = [] + return ret + return None + + def get_line(self): + if len(self.lines): + ret = self.lines[-1] + self.lines = [] + return ret + return None + + +class KA320: + def __init__(self, port): + self.ser = serial.serial_for_url(port, do_not_open=True) + self.ser.baudrate = 9600 + self.ser.parity = "N" + self.ser.rtscts = False + self.ser.xonxoff = False + + try: + self.ser.open() + except serial.SerialException as e: + sys.stderr.write( + "Could not open serial port {}: {}\n".format(self.ser.name, e) + ) + sys.exit(1) + + self.reader = SerialReader() + self.worker = serial.threaded.ReaderThread(self.ser, self.reader) + self.worker.start() + + def rw(self, cmd, num_chars, exact=False): + self.reader.expect(num_chars) + self.ser.write(cmd) + time.sleep(0.1) + if exact: + return self.reader.get_expected_line() + return self.reader.get_line() + + def connect(self): + return self.rw(b"*IDN?", 16) + + def get_max_voltage(self): + return float(self.rw(b"VSET1?", 5)) + + def get_max_current(self): + return float(self.rw(b"ISET1?", 5, True)) + + def get_voltage(self): + try: + return float(self.rw(b"VOUT1?", 5)) + except TypeError: + return None + + def get_current(self): + try: + return float(self.rw(b"IOUT1?", 5, True)) + except TypeError: + return None + + def set_output(self, enable): + if enable: + self.ser.write(b"OUT1") + else: + self.ser.write(b"OUT0") + time.sleep(0.1) + + def disconnect(self): + self.worker.stop() + self.ser.close() + + +def graceful_exit(sig, frame): + global terminate_measurement + terminate_measurement = True + + +def measure_data(port, filename, duration): + global terminate_measurement + + signal.signal(signal.SIGINT, graceful_exit) + signal.signal(signal.SIGTERM, graceful_exit) + signal.signal(signal.SIGQUIT, graceful_exit) + korad = KA320(port) + + start_ts = time.time() + + if filename is not None: + output_handle = open(filename, "w+") + else: + output_handle = tempfile.TemporaryFile("w+") + + print("# Device: " + korad.connect(), file=output_handle) + print("# Timestamp Voltage Current", file=output_handle) + while not terminate_measurement: + ts = time.time() + current = korad.get_current() + voltage = korad.get_voltage() + if voltage is not None and current is not None: + print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle) + elif voltage is not None: + print(f"{ts:.3f} {volvoltage:5.2f} NaN", file=output_handle) + elif current is not None: + print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle) + else: + print(f"{ts:.3f} NaN NaN", file=output_handle) + time.sleep(0.1) + + if duration and ts - start_ts > duration: + terminate_measurement = True + + korad.disconnect() + + output_handle.seek(0) + output = output_handle.read() + output_handle.close() + + return output + + +def plot_data(data, mode): + import matplotlib.pyplot as plt + + if mode == "U": + (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1], 10), + "r-", + label="mean(U, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.ylabel("Voltage [V]") + + elif mode == "I": + (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 2], 10), + "r-", + label="mean(I, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.ylabel("Current [A]") + + elif mode == "P": + (datahandle,) = plt.plot( + data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1 + ) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1] * data[:, 2], 10), + "r-", + label="mean(P, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.ylabel("Power [W]") + + plt.show() + + +def parse_data(log_data, skip=None, limit=None): + lines = log_data.split("\n") + data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) + data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) + + data = np.empty((data_count, 3)) + skip_index = 0 + limit_index = data_count + + for i, line in enumerate(data_lines): + fields = line.split() + if len(fields) == 3: + timestamp, voltage, current = map(float, fields) + else: + raise RuntimeError('cannot parse line "{}"'.format(line)) + + if i == 0: + first_timestamp = timestamp + + timestamp = timestamp - first_timestamp + + if skip is not None and timestamp < skip: + skip_index = i + 1 + continue + + if limit is not None and timestamp > limit: + limit_index = i - 1 + break + + data[i] = [timestamp, voltage, current] + + data = data[skip_index:limit_index] + + return data + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ + ) + parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE") + parser.add_argument( + "--port", + metavar="PORT", + type=str, + default="/dev/ttyACM0", + help="Set PSU serial port", + ) + parser.add_argument( + "--save", metavar="FILE", type=str, help="Save measurement data in FILE" + ) + parser.add_argument( + "--skip", + metavar="N", + type=float, + default=0, + help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement", + ) + parser.add_argument( + "--limit", + type=float, + metavar="N", + help="Limit analysis to the first N seconds of data", + ) + parser.add_argument( + "--plot", + metavar="UNIT", + choices=["U", "I", "P"], + help="Plot voltage / current / power over time", + ) + parser.add_argument( + "duration", type=int, nargs="?", help="Measurement duration in seconds" + ) + + args = parser.parse_args() + + if args.load is None and args.duration is None: + print("Either --load or duration must be specified", file=sys.stderr) + sys.exit(1) + + if args.load: + if args.load.endswith(".xz"): + import lzma + + with lzma.open(args.load, "rt") as f: + log_data = f.read() + else: + with open(args.load, "r") as f: + log_data = f.read() + else: + log_data = measure_data(args.port, args.save, args.duration) + + data = parse_data(log_data, skip=args.skip, limit=args.limit) + + if args.plot: + plot_data(data, args.plot) + + +if __name__ == "__main__": + main() -- cgit v1.2.3