summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <derf@finalrewind.org>2021-07-09 20:01:12 +0200
committerDaniel Friesel <derf@finalrewind.org>2021-07-09 20:01:12 +0200
commite22b809bccd1cde36f6111e17c00ce094e687cfd (patch)
tree808d4251297eb16035e67185052a3d00566da9f3
Initial commit
-rwxr-xr-xbin/kaxxxxp-viewer358
1 files changed, 358 insertions, 0 deletions
diff --git a/bin/kaxxxxp-viewer b/bin/kaxxxxp-viewer
new file mode 100755
index 0000000..afceb57
--- /dev/null
+++ b/bin/kaxxxxp-viewer
@@ -0,0 +1,358 @@
+#!/usr/bin/env python3
+# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
+#
+# Copyright (C) 2021 Daniel Friesel
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+"""kaxxxxp-viewer - Data Logger and Viewer for KAxxxxP power supplies
+
+DESCRIPTION
+
+kaxxxxp-viewer logs voltage and current readings provided by a KAxxxxP power supply.
+Measurements can be taken directly (by specifying <measurement duration> in seconds)
+or loaded from a logfile using --load <file>. Data can be plotted or aggregated on stdout.
+
+WARNING
+
+The power supply's serial communication protocol is supports both read and write
+operations. Communication errors or bugs may cause the power supply to set an
+incompatible voltage or current limit, which may result in damaged equipment or
+fire. By using this software, you acknowledge that you are aware of these risks
+and the following disclaimer.
+
+This software is provided by the copyright holders and contributors "as is" and
+any express or implied warranties, including, but not limited to, the implied
+warranties of merchantability and fitness for a particular purpose are
+disclaimed. In no event shall the copyright holder or contributors be liable
+for any direct, indirect, incidental, special, exemplary, or consequential
+damages (including, but not limited to, procurement of substitute goods or
+services; loss of use, data, or profits; or business interruption) however
+caused and on any theory of liability, whether in contract, strict liability,
+or tort (including negligence or otherwise) arising in any way out of the use
+of this software, even if advised of the possibility of such damage.
+
+OPTIONS
+"""
+
+import argparse
+import numpy as np
+import serial
+import serial.threaded
+import signal
+import sys
+import tempfile
+import time
+
+terminate_measurement = False
+
+
+def running_mean(x: np.ndarray, N: int) -> np.ndarray:
+ """
+ Compute `N` elements wide running average over `x`.
+
+ :param x: 1-Dimensional NumPy array
+ :param N: how many items to average. Should be even for optimal results.
+ """
+
+ # to ensure that output.shape == input.shape, we need to insert data
+ # at the boundaries
+ boundary_array = np.insert(x, 0, np.full((N // 2), x[0]))
+ boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1]))
+
+ return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid")
+
+
+class SerialReader(serial.threaded.Protocol):
+ def __init__(self):
+ self.remaining_chars = 0
+ self.recv_buf = ""
+ self.lines = []
+
+ def expect(self, num_chars):
+ self.recv_buf = ""
+ self.remaining_chars = num_chars
+
+ def __call__(self):
+ return self
+
+ def data_received(self, data):
+ try:
+ str_data = data.decode("UTF-8")
+ self.recv_buf += str_data
+ except UnicodeDecodeError:
+ sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data))
+
+ self.remaining_chars -= len(str_data)
+
+ if self.remaining_chars <= 0:
+ self.lines.append(self.recv_buf)
+
+ def get_expected_line(self):
+ if len(self.lines):
+ ret = self.lines[0]
+ self.lines = []
+ return ret
+ return None
+
+ def get_line(self):
+ if len(self.lines):
+ ret = self.lines[-1]
+ self.lines = []
+ return ret
+ return None
+
+
+class KA320:
+ def __init__(self, port):
+ self.ser = serial.serial_for_url(port, do_not_open=True)
+ self.ser.baudrate = 9600
+ self.ser.parity = "N"
+ self.ser.rtscts = False
+ self.ser.xonxoff = False
+
+ try:
+ self.ser.open()
+ except serial.SerialException as e:
+ sys.stderr.write(
+ "Could not open serial port {}: {}\n".format(self.ser.name, e)
+ )
+ sys.exit(1)
+
+ self.reader = SerialReader()
+ self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
+ self.worker.start()
+
+ def rw(self, cmd, num_chars, exact=False):
+ self.reader.expect(num_chars)
+ self.ser.write(cmd)
+ time.sleep(0.1)
+ if exact:
+ return self.reader.get_expected_line()
+ return self.reader.get_line()
+
+ def connect(self):
+ return self.rw(b"*IDN?", 16)
+
+ def get_max_voltage(self):
+ return float(self.rw(b"VSET1?", 5))
+
+ def get_max_current(self):
+ return float(self.rw(b"ISET1?", 5, True))
+
+ def get_voltage(self):
+ try:
+ return float(self.rw(b"VOUT1?", 5))
+ except TypeError:
+ return None
+
+ def get_current(self):
+ try:
+ return float(self.rw(b"IOUT1?", 5, True))
+ except TypeError:
+ return None
+
+ def set_output(self, enable):
+ if enable:
+ self.ser.write(b"OUT1")
+ else:
+ self.ser.write(b"OUT0")
+ time.sleep(0.1)
+
+ def disconnect(self):
+ self.worker.stop()
+ self.ser.close()
+
+
+def graceful_exit(sig, frame):
+ global terminate_measurement
+ terminate_measurement = True
+
+
+def measure_data(port, filename, duration):
+ global terminate_measurement
+
+ signal.signal(signal.SIGINT, graceful_exit)
+ signal.signal(signal.SIGTERM, graceful_exit)
+ signal.signal(signal.SIGQUIT, graceful_exit)
+ korad = KA320(port)
+
+ start_ts = time.time()
+
+ if filename is not None:
+ output_handle = open(filename, "w+")
+ else:
+ output_handle = tempfile.TemporaryFile("w+")
+
+ print("# Device: " + korad.connect(), file=output_handle)
+ print("# Timestamp Voltage Current", file=output_handle)
+ while not terminate_measurement:
+ ts = time.time()
+ current = korad.get_current()
+ voltage = korad.get_voltage()
+ if voltage is not None and current is not None:
+ print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle)
+ elif voltage is not None:
+ print(f"{ts:.3f} {volvoltage:5.2f} NaN", file=output_handle)
+ elif current is not None:
+ print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle)
+ else:
+ print(f"{ts:.3f} NaN NaN", file=output_handle)
+ time.sleep(0.1)
+
+ if duration and ts - start_ts > duration:
+ terminate_measurement = True
+
+ korad.disconnect()
+
+ output_handle.seek(0)
+ output = output_handle.read()
+ output_handle.close()
+
+ return output
+
+
+def plot_data(data, mode):
+ import matplotlib.pyplot as plt
+
+ if mode == "U":
+ (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1)
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 1], 10),
+ "r-",
+ label="mean(U, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.ylabel("Voltage [V]")
+
+ elif mode == "I":
+ (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1)
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 2], 10),
+ "r-",
+ label="mean(I, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.ylabel("Current [A]")
+
+ elif mode == "P":
+ (datahandle,) = plt.plot(
+ data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1
+ )
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 1] * data[:, 2], 10),
+ "r-",
+ label="mean(P, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.ylabel("Power [W]")
+
+ plt.show()
+
+
+def parse_data(log_data, skip=None, limit=None):
+ lines = log_data.split("\n")
+ data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines))
+ data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines)
+
+ data = np.empty((data_count, 3))
+ skip_index = 0
+ limit_index = data_count
+
+ for i, line in enumerate(data_lines):
+ fields = line.split()
+ if len(fields) == 3:
+ timestamp, voltage, current = map(float, fields)
+ else:
+ raise RuntimeError('cannot parse line "{}"'.format(line))
+
+ if i == 0:
+ first_timestamp = timestamp
+
+ timestamp = timestamp - first_timestamp
+
+ if skip is not None and timestamp < skip:
+ skip_index = i + 1
+ continue
+
+ if limit is not None and timestamp > limit:
+ limit_index = i - 1
+ break
+
+ data[i] = [timestamp, voltage, current]
+
+ data = data[skip_index:limit_index]
+
+ return data
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__
+ )
+ parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE")
+ parser.add_argument(
+ "--port",
+ metavar="PORT",
+ type=str,
+ default="/dev/ttyACM0",
+ help="Set PSU serial port",
+ )
+ parser.add_argument(
+ "--save", metavar="FILE", type=str, help="Save measurement data in FILE"
+ )
+ parser.add_argument(
+ "--skip",
+ metavar="N",
+ type=float,
+ default=0,
+ help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement",
+ )
+ parser.add_argument(
+ "--limit",
+ type=float,
+ metavar="N",
+ help="Limit analysis to the first N seconds of data",
+ )
+ parser.add_argument(
+ "--plot",
+ metavar="UNIT",
+ choices=["U", "I", "P"],
+ help="Plot voltage / current / power over time",
+ )
+ parser.add_argument(
+ "duration", type=int, nargs="?", help="Measurement duration in seconds"
+ )
+
+ args = parser.parse_args()
+
+ if args.load is None and args.duration is None:
+ print("Either --load or duration must be specified", file=sys.stderr)
+ sys.exit(1)
+
+ if args.load:
+ if args.load.endswith(".xz"):
+ import lzma
+
+ with lzma.open(args.load, "rt") as f:
+ log_data = f.read()
+ else:
+ with open(args.load, "r") as f:
+ log_data = f.read()
+ else:
+ log_data = measure_data(args.port, args.save, args.duration)
+
+ data = parse_data(log_data, skip=args.skip, limit=args.limit)
+
+ if args.plot:
+ plot_data(data, args.plot)
+
+
+if __name__ == "__main__":
+ main()