diff options
Diffstat (limited to 'bin/korad-logger')
-rwxr-xr-x | bin/korad-logger | 607 |
1 files changed, 607 insertions, 0 deletions
diff --git a/bin/korad-logger b/bin/korad-logger new file mode 100755 index 0000000..84d9483 --- /dev/null +++ b/bin/korad-logger @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 +# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160 +# +# Copyright (C) 2021 Daniel Friesel +# +# SPDX-License-Identifier: GPL-2.0-or-later + +"""korad-logger - Data Logger and Controller for Korad KAxxxxP power supplies + +DESCRIPTION + +korad-logger logs voltage and current readings provided by a KAxxxxP power +supply with serial/USB interface, sold under brands such as Korad or RND Lab. +It is also capable of performing simple control tasks, such as stepping through +voltage/current slopes for automated I-V curve measurements. Measurements can +be taken directly (by specifying <measurement duration> in seconds) or loaded +from a logfile using --load <file>. Data can be plotted or aggregated on stdout. + +WARNING + +The KAxxxxP serial interface supports both reading current/voltage +data and writing current/voltage limits. korad-logger uses these to change +PSU attributes at runtime, if requested. The serial protocol does not use +checksums or similar mechanisms, so communication errors or bugs may cause the +power supply to receive a write command with an arbitrary voltage or current +value. This may result in damaged equipment, fire, or other harm. By using +this software, you acknowledge that you are aware of these risks and the +following disclaimer. + +This software is provided by the copyright holders and contributors "as is" and +any express or implied warranties, including, but not limited to, the implied +warranties of merchantability and fitness for a particular purpose are +disclaimed. In no event shall the copyright holder or contributors be liable +for any direct, indirect, incidental, special, exemplary, or consequential +damages (including, but not limited to, procurement of substitute goods or +services; loss of use, data, or profits; or business interruption) however +caused and on any theory of liability, whether in contract, strict liability, +or tort (including negligence or otherwise) arising in any way out of the use +of this software, even if advised of the possibility of such damage. + +OPTIONS +""" + +import argparse +import numpy as np +import serial +import serial.threaded +import signal +import sys +import tempfile +import time + +terminate_measurement = False + + +def running_mean(x: np.ndarray, N: int) -> np.ndarray: + """ + Compute `N` elements wide running average over `x`. + + :param x: 1-Dimensional NumPy array + :param N: how many items to average. Should be even for optimal results. + """ + + # to ensure that output.shape == input.shape, we need to insert data + # at the boundaries + boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) + boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) + + return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") + + +class SerialReader(serial.threaded.Protocol): + def __init__(self): + self.remaining_chars = 0 + self.read_complete = False + self.expect_binary = False + self.recv_buf = "" + self.lines = [] + + def expect(self, num_chars, binary=False): + self.recv_buf = "" + self.remaining_chars = num_chars + self.read_complete = False + self.expect_binary = binary + + def __call__(self): + return self + + def data_received(self, data): + if self.expect_binary: + self.lines.extend(list(data)) + self.remaining_chars -= len(data) + if self.remaining_chars <= 0: + self.read_complete = True + return + + try: + str_data = data.decode("UTF-8") + self.recv_buf += str_data + except UnicodeDecodeError: + sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) + return + + self.remaining_chars -= len(str_data) + + if self.remaining_chars <= 0: + self.lines.append(self.recv_buf) + self.read_complete = True + + def get_expected_line(self): + if len(self.lines): + if self.expect_binary: + ret = self.lines + else: + ret = self.lines[0] + self.lines = list() + return ret + return None + + def get_line(self): + if len(self.lines): + ret = self.lines[-1] + self.lines = [] + return ret + return None + + +class KoradStatus: + # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. + # Or they're the wrong bits altogether. + # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and + # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/> + # don't agree on how to parse the status byte. + def __init__(self, status_bytes): + status_byte = status_bytes[0] + self.over_current_protection_enabled = bool(status_byte & 0x20) + self.output_enabled = bool(status_byte & 0x40) + self.over_voltage_protection_enabled = bool(status_byte & 0x80) + + def __repr__(self): + return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>" + + +class KA320: + def __init__(self, port, channel=1): + self.ser = serial.serial_for_url(port, do_not_open=True) + self.ser.baudrate = 9600 + self.ser.parity = "N" + self.ser.rtscts = False + self.ser.xonxoff = False + + try: + self.ser.open() + except serial.SerialException as e: + sys.stderr.write( + "Could not open serial port {}: {}\n".format(self.ser.name, e) + ) + sys.exit(1) + + self.channel = channel + + self.reader = SerialReader() + self.worker = serial.threaded.ReaderThread(self.ser, self.reader) + self.worker.start() + + def rw(self, cmd, num_chars, exact=False, binary=False): + self.reader.expect(num_chars, binary=binary) + self.ser.write(cmd) + timeout = 20 + while not self.reader.read_complete and not timeout == 0: + time.sleep(0.02) + timeout -= 1 + if exact: + return self.reader.get_expected_line() + elif self.reader.read_complete: + return self.reader.get_line() + else: + return self.reader.recv_buf + + # See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands + + def connect(self): + # Device ID length is unknown + return self.rw(b"*IDN?", 32, exact=False) + + def get_status(self): + return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) + + def ovp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OVP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_voltage_protection_enabled == enable + + def ocp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OCP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_current_protection_enabled == enable + + def set_max_voltage(self, max_voltage): + self.ser.write(f"VSET{self.channel:d}:{max_voltage:05.2f}".encode()) + time.sleep(0.1) + + def set_max_current(self, max_current): + self.ser.write(f"ISET{self.channel:d}:{max_current:05.3f}".encode()) + time.sleep(0.1) + + def get_max_voltage(self): + return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) + + def get_max_current(self): + return float(self.rw(f"ISET{self.channel:d}?".encode(), 5, True)) + + def get_voltage(self): + try: + return float(self.rw(f"VOUT{self.channel:d}?".encode(), 5, True)) + except TypeError: + return None + + def get_current(self): + try: + return float(self.rw(f"IOUT{self.channel:d}?".encode(), 5, True)) + except TypeError: + return None + + def set_output(self, enable): + if enable: + self.ser.write(b"OUT1") + else: + self.ser.write(b"OUT0") + time.sleep(0.1) + + def disconnect(self): + self.worker.stop() + self.ser.close() + + +def graceful_exit(sig, frame): + global terminate_measurement + terminate_measurement = True + + +def measure_data( + port, + filename, + duration, + channel=1, + ocp=False, + ovp=False, + max_voltage=None, + max_current=None, + voltage_range=(None, None, None), + current_range=(None, None, None), + on_off=False, +): + global terminate_measurement + + voltage_start, voltage_stop, voltage_step = voltage_range + current_start, current_stop, current_step = current_range + last_range_step = 0 + + if voltage_start is not None: + max_voltage = voltage_start + if current_start is not None: + max_current = current_start + + signal.signal(signal.SIGINT, graceful_exit) + signal.signal(signal.SIGTERM, graceful_exit) + signal.signal(signal.SIGQUIT, graceful_exit) + korad = KA320(port, channel) + + start_ts = time.time() + + if filename is not None: + output_handle = open(filename, "w+") + else: + output_handle = tempfile.TemporaryFile("w+") + + if max_voltage is not None or max_current is not None: + # turn off output before setting current and voltage limits + print("Turning off outputs") + korad.set_output(False) + + if max_voltage is not None: + print(f"Setting voltage limit to {max_voltage:5.2f} V") + korad.set_max_voltage(max_voltage) + + if max_current is not None: + print(f"Setting current limit to {max_current:5.3f} A") + korad.set_max_current(max_current) + + if ovp: + print("Enabling over-voltage protection") + korad.ovp(True) + + if ocp: + print("Enabling over-current protection") + korad.ocp(True) + + if max_voltage is not None or max_current is not None or on_off: + print("Turning on outputs") + korad.set_output(True) + + if duration: + print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") + else: + print(f"Starting data acquisition. Press Ctrl+C to stop.") + + print("# Device: " + korad.connect(), file=output_handle) + print("# Timestamp Voltage Current", file=output_handle) + while not terminate_measurement: + ts = time.time() + current = korad.get_current() + voltage = korad.get_voltage() + if voltage is not None and current is not None: + print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle) + elif voltage is not None: + print(f"{ts:.3f} {voltage:5.2f} NaN", file=output_handle) + elif current is not None: + print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle) + else: + print(f"{ts:.3f} NaN NaN", file=output_handle) + time.sleep(0.1) + + if int(ts - start_ts) > last_range_step: + last_range_step = int(ts - start_ts) + if voltage_step: + max_voltage = voltage_start + last_range_step * voltage_step + if (voltage_step > 0 and max_voltage <= voltage_stop) or ( + voltage_step < 0 and max_voltage >= voltage_stop + ): + print(f"Setting voltage limit to {max_voltage:5.2f} V") + korad.set_max_voltage(max_voltage) + if current_step: + max_current = current_start + last_range_step * current_step + if (current_step > 0 and max_current <= current_stop) or ( + current_step < 0 and max_current >= current_stop + ): + print(f"Setting current limit to {max_current:5.3f} A") + korad.set_max_current(max_current) + + if duration and ts - start_ts > duration: + terminate_measurement = True + + if on_off: + print("Turning off outputs") + korad.set_output(False) + + korad.disconnect() + + output_handle.seek(0) + output = output_handle.read() + output_handle.close() + + return output + + +def plot_data(data, mode): + import matplotlib.pyplot as plt + + if mode == "U": + (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1], 10), + "r-", + label="mean(U, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Voltage [V]") + + elif mode == "I": + (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 2], 10), + "r-", + label="mean(I, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Current [A]") + + elif mode == "P": + (datahandle,) = plt.plot( + data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1 + ) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1] * data[:, 2], 10), + "r-", + label="mean(P, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Power [W]") + + elif mode == "UI": + plt.plot(data[:, 1], data[:, 2], "bs", markersize=2) + plt.xlabel("Voltage [V]") + plt.ylabel("Current [A]") + + elif mode == "UP": + plt.plot(data[:, 1], data[:, 1] * data[:, 2], "bs", markersize=2) + plt.xlabel("Voltage [V]") + plt.ylabel("Power [W]") + + elif mode == "IU": + plt.plot(data[:, 2], data[:, 1], "bs", markersize=2) + plt.xlabel("Current [A]") + plt.ylabel("Voltage [V]") + + elif mode == "IP": + plt.plot(data[:, 2], data[:, 1] * data[:, 2], "bs", markersize=2) + plt.xlabel("Current [A]") + plt.ylabel("Power [W]") + + plt.show() + + +def parse_data(log_data, skip=None, limit=None): + lines = log_data.split("\n") + data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) + data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) + + data = np.empty((data_count, 3)) + skip_index = 0 + limit_index = data_count + + for i, line in enumerate(data_lines): + fields = line.split() + if len(fields) == 3: + timestamp, voltage, current = map(float, fields) + else: + raise RuntimeError('cannot parse line "{}"'.format(line)) + + if i == 0: + first_timestamp = timestamp + + timestamp = timestamp - first_timestamp + + if skip is not None and timestamp < skip: + skip_index = i + 1 + continue + + if limit is not None and timestamp > limit: + limit_index = i - 1 + break + + data[i] = [timestamp, voltage, current] + + data = data[skip_index:limit_index] + + return data + + +def parse_range(range_spec): + if range_spec is None: + return None, None, None + + start, stop, step = list(map(float, range_spec.split())) + + if start < 0 or stop < 0: + print( + f"Range specification '{range_spec}' is invalid: start and stop must be positive", + file=sys.stderr, + ) + sys.exit(1) + + if step == 0: + print( + f"Range specification '{range_spec}' is invalid: step must be ≠ 0", + file=sys.stderr, + ) + sys.exit(1) + + if (start < stop and step < 0) or (start > stop and step > 0): + step = -step + + return start, stop, step + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ + ) + parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE") + parser.add_argument( + "--port", + metavar="PORT", + type=str, + default="/dev/ttyACM0", + help="Set PSU serial port", + ) + parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") + parser.add_argument( + "--over-current-protection", + "--ocp", + action="store_true", + help="Enable over-current protection", + ) + parser.add_argument( + "--over-voltage-protection", + "--ovp", + action="store_true", + help="Enable over-voltage protection", + ) + parser.add_argument( + "--voltage-limit", + type=float, + metavar="VOLTAGE", + help="Set voltage limit", + ) + parser.add_argument( + "--voltage-range", + type=str, + metavar="START STOP STEP", + help="Vary voltage limit from START to STOP over the course of the measurement. Adjust by STEP V per second.", + ) + parser.add_argument( + "--current-limit", + type=float, + help="Set current limit", + ) + parser.add_argument( + "--current-range", + type=str, + metavar="START STOP STEP", + help="Vary current limit from START to STOP over the course of the measurement. Adjust by STEP A per second.", + ) + parser.add_argument( + "--on-off", + action="store_true", + help="Enable output after starting the measurement; disable it after stopping it", + ) + parser.add_argument( + "--save", metavar="FILE", type=str, help="Save measurement data in FILE" + ) + parser.add_argument( + "--skip", + metavar="N", + type=float, + default=0, + help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement", + ) + parser.add_argument( + "--limit", + type=float, + metavar="N", + help="Limit analysis to the first N seconds of data", + ) + parser.add_argument( + "--plot", + metavar="UNIT", + choices=["U", "I", "P", "UI", "UP", "IU", "IP"], + help="Plot voltage / current / power over time or voltage vs current / current vs voltage", + ) + parser.add_argument( + "duration", type=int, nargs="?", help="Measurement duration in seconds" + ) + + args = parser.parse_args() + + if args.load is None and args.duration is None: + print("Either --load or duration must be specified", file=sys.stderr) + sys.exit(1) + + current_range = parse_range(args.current_range) + voltage_range = parse_range(args.voltage_range) + + if args.load: + if args.load.endswith(".xz"): + import lzma + + with lzma.open(args.load, "rt") as f: + log_data = f.read() + else: + with open(args.load, "r") as f: + log_data = f.read() + else: + log_data = measure_data( + args.port, + args.save, + args.duration, + channel=args.channel, + ocp=args.over_current_protection, + ovp=args.over_voltage_protection, + max_voltage=args.voltage_limit, + max_current=args.current_limit, + voltage_range=voltage_range, + current_range=current_range, + on_off=args.on_off, + ) + + data = parse_data(log_data, skip=args.skip, limit=args.limit) + + if args.plot: + plot_data(data, args.plot) + + +if __name__ == "__main__": + main() |