summaryrefslogtreecommitdiff
path: root/bin/korad-logger
diff options
context:
space:
mode:
Diffstat (limited to 'bin/korad-logger')
-rwxr-xr-xbin/korad-logger607
1 files changed, 607 insertions, 0 deletions
diff --git a/bin/korad-logger b/bin/korad-logger
new file mode 100755
index 0000000..84d9483
--- /dev/null
+++ b/bin/korad-logger
@@ -0,0 +1,607 @@
+#!/usr/bin/env python3
+# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
+#
+# Copyright (C) 2021 Daniel Friesel
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+"""korad-logger - Data Logger and Controller for Korad KAxxxxP power supplies
+
+DESCRIPTION
+
+korad-logger logs voltage and current readings provided by a KAxxxxP power
+supply with serial/USB interface, sold under brands such as Korad or RND Lab.
+It is also capable of performing simple control tasks, such as stepping through
+voltage/current slopes for automated I-V curve measurements. Measurements can
+be taken directly (by specifying <measurement duration> in seconds) or loaded
+from a logfile using --load <file>. Data can be plotted or aggregated on stdout.
+
+WARNING
+
+The KAxxxxP serial interface supports both reading current/voltage
+data and writing current/voltage limits. korad-logger uses these to change
+PSU attributes at runtime, if requested. The serial protocol does not use
+checksums or similar mechanisms, so communication errors or bugs may cause the
+power supply to receive a write command with an arbitrary voltage or current
+value. This may result in damaged equipment, fire, or other harm. By using
+this software, you acknowledge that you are aware of these risks and the
+following disclaimer.
+
+This software is provided by the copyright holders and contributors "as is" and
+any express or implied warranties, including, but not limited to, the implied
+warranties of merchantability and fitness for a particular purpose are
+disclaimed. In no event shall the copyright holder or contributors be liable
+for any direct, indirect, incidental, special, exemplary, or consequential
+damages (including, but not limited to, procurement of substitute goods or
+services; loss of use, data, or profits; or business interruption) however
+caused and on any theory of liability, whether in contract, strict liability,
+or tort (including negligence or otherwise) arising in any way out of the use
+of this software, even if advised of the possibility of such damage.
+
+OPTIONS
+"""
+
+import argparse
+import numpy as np
+import serial
+import serial.threaded
+import signal
+import sys
+import tempfile
+import time
+
+terminate_measurement = False
+
+
+def running_mean(x: np.ndarray, N: int) -> np.ndarray:
+ """
+ Compute `N` elements wide running average over `x`.
+
+ :param x: 1-Dimensional NumPy array
+ :param N: how many items to average. Should be even for optimal results.
+ """
+
+ # to ensure that output.shape == input.shape, we need to insert data
+ # at the boundaries
+ boundary_array = np.insert(x, 0, np.full((N // 2), x[0]))
+ boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1]))
+
+ return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid")
+
+
+class SerialReader(serial.threaded.Protocol):
+ def __init__(self):
+ self.remaining_chars = 0
+ self.read_complete = False
+ self.expect_binary = False
+ self.recv_buf = ""
+ self.lines = []
+
+ def expect(self, num_chars, binary=False):
+ self.recv_buf = ""
+ self.remaining_chars = num_chars
+ self.read_complete = False
+ self.expect_binary = binary
+
+ def __call__(self):
+ return self
+
+ def data_received(self, data):
+ if self.expect_binary:
+ self.lines.extend(list(data))
+ self.remaining_chars -= len(data)
+ if self.remaining_chars <= 0:
+ self.read_complete = True
+ return
+
+ try:
+ str_data = data.decode("UTF-8")
+ self.recv_buf += str_data
+ except UnicodeDecodeError:
+ sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data))
+ return
+
+ self.remaining_chars -= len(str_data)
+
+ if self.remaining_chars <= 0:
+ self.lines.append(self.recv_buf)
+ self.read_complete = True
+
+ def get_expected_line(self):
+ if len(self.lines):
+ if self.expect_binary:
+ ret = self.lines
+ else:
+ ret = self.lines[0]
+ self.lines = list()
+ return ret
+ return None
+
+ def get_line(self):
+ if len(self.lines):
+ ret = self.lines[-1]
+ self.lines = []
+ return ret
+ return None
+
+
+class KoradStatus:
+ # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits.
+ # Or they're the wrong bits altogether.
+ # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and
+ # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/>
+ # don't agree on how to parse the status byte.
+ def __init__(self, status_bytes):
+ status_byte = status_bytes[0]
+ self.over_current_protection_enabled = bool(status_byte & 0x20)
+ self.output_enabled = bool(status_byte & 0x40)
+ self.over_voltage_protection_enabled = bool(status_byte & 0x80)
+
+ def __repr__(self):
+ return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>"
+
+
+class KA320:
+ def __init__(self, port, channel=1):
+ self.ser = serial.serial_for_url(port, do_not_open=True)
+ self.ser.baudrate = 9600
+ self.ser.parity = "N"
+ self.ser.rtscts = False
+ self.ser.xonxoff = False
+
+ try:
+ self.ser.open()
+ except serial.SerialException as e:
+ sys.stderr.write(
+ "Could not open serial port {}: {}\n".format(self.ser.name, e)
+ )
+ sys.exit(1)
+
+ self.channel = channel
+
+ self.reader = SerialReader()
+ self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
+ self.worker.start()
+
+ def rw(self, cmd, num_chars, exact=False, binary=False):
+ self.reader.expect(num_chars, binary=binary)
+ self.ser.write(cmd)
+ timeout = 20
+ while not self.reader.read_complete and not timeout == 0:
+ time.sleep(0.02)
+ timeout -= 1
+ if exact:
+ return self.reader.get_expected_line()
+ elif self.reader.read_complete:
+ return self.reader.get_line()
+ else:
+ return self.reader.recv_buf
+
+ # See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands
+
+ def connect(self):
+ # Device ID length is unknown
+ return self.rw(b"*IDN?", 32, exact=False)
+
+ def get_status(self):
+ return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True))
+
+ def ovp(self, enable=True):
+ enable_bit = int(enable)
+ self.ser.write(f"OVP{enable_bit}".encode())
+ time.sleep(0.1)
+ # assert self.get_status().over_voltage_protection_enabled == enable
+
+ def ocp(self, enable=True):
+ enable_bit = int(enable)
+ self.ser.write(f"OCP{enable_bit}".encode())
+ time.sleep(0.1)
+ # assert self.get_status().over_current_protection_enabled == enable
+
+ def set_max_voltage(self, max_voltage):
+ self.ser.write(f"VSET{self.channel:d}:{max_voltage:05.2f}".encode())
+ time.sleep(0.1)
+
+ def set_max_current(self, max_current):
+ self.ser.write(f"ISET{self.channel:d}:{max_current:05.3f}".encode())
+ time.sleep(0.1)
+
+ def get_max_voltage(self):
+ return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True))
+
+ def get_max_current(self):
+ return float(self.rw(f"ISET{self.channel:d}?".encode(), 5, True))
+
+ def get_voltage(self):
+ try:
+ return float(self.rw(f"VOUT{self.channel:d}?".encode(), 5, True))
+ except TypeError:
+ return None
+
+ def get_current(self):
+ try:
+ return float(self.rw(f"IOUT{self.channel:d}?".encode(), 5, True))
+ except TypeError:
+ return None
+
+ def set_output(self, enable):
+ if enable:
+ self.ser.write(b"OUT1")
+ else:
+ self.ser.write(b"OUT0")
+ time.sleep(0.1)
+
+ def disconnect(self):
+ self.worker.stop()
+ self.ser.close()
+
+
+def graceful_exit(sig, frame):
+ global terminate_measurement
+ terminate_measurement = True
+
+
+def measure_data(
+ port,
+ filename,
+ duration,
+ channel=1,
+ ocp=False,
+ ovp=False,
+ max_voltage=None,
+ max_current=None,
+ voltage_range=(None, None, None),
+ current_range=(None, None, None),
+ on_off=False,
+):
+ global terminate_measurement
+
+ voltage_start, voltage_stop, voltage_step = voltage_range
+ current_start, current_stop, current_step = current_range
+ last_range_step = 0
+
+ if voltage_start is not None:
+ max_voltage = voltage_start
+ if current_start is not None:
+ max_current = current_start
+
+ signal.signal(signal.SIGINT, graceful_exit)
+ signal.signal(signal.SIGTERM, graceful_exit)
+ signal.signal(signal.SIGQUIT, graceful_exit)
+ korad = KA320(port, channel)
+
+ start_ts = time.time()
+
+ if filename is not None:
+ output_handle = open(filename, "w+")
+ else:
+ output_handle = tempfile.TemporaryFile("w+")
+
+ if max_voltage is not None or max_current is not None:
+ # turn off output before setting current and voltage limits
+ print("Turning off outputs")
+ korad.set_output(False)
+
+ if max_voltage is not None:
+ print(f"Setting voltage limit to {max_voltage:5.2f} V")
+ korad.set_max_voltage(max_voltage)
+
+ if max_current is not None:
+ print(f"Setting current limit to {max_current:5.3f} A")
+ korad.set_max_current(max_current)
+
+ if ovp:
+ print("Enabling over-voltage protection")
+ korad.ovp(True)
+
+ if ocp:
+ print("Enabling over-current protection")
+ korad.ocp(True)
+
+ if max_voltage is not None or max_current is not None or on_off:
+ print("Turning on outputs")
+ korad.set_output(True)
+
+ if duration:
+ print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.")
+ else:
+ print(f"Starting data acquisition. Press Ctrl+C to stop.")
+
+ print("# Device: " + korad.connect(), file=output_handle)
+ print("# Timestamp Voltage Current", file=output_handle)
+ while not terminate_measurement:
+ ts = time.time()
+ current = korad.get_current()
+ voltage = korad.get_voltage()
+ if voltage is not None and current is not None:
+ print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle)
+ elif voltage is not None:
+ print(f"{ts:.3f} {voltage:5.2f} NaN", file=output_handle)
+ elif current is not None:
+ print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle)
+ else:
+ print(f"{ts:.3f} NaN NaN", file=output_handle)
+ time.sleep(0.1)
+
+ if int(ts - start_ts) > last_range_step:
+ last_range_step = int(ts - start_ts)
+ if voltage_step:
+ max_voltage = voltage_start + last_range_step * voltage_step
+ if (voltage_step > 0 and max_voltage <= voltage_stop) or (
+ voltage_step < 0 and max_voltage >= voltage_stop
+ ):
+ print(f"Setting voltage limit to {max_voltage:5.2f} V")
+ korad.set_max_voltage(max_voltage)
+ if current_step:
+ max_current = current_start + last_range_step * current_step
+ if (current_step > 0 and max_current <= current_stop) or (
+ current_step < 0 and max_current >= current_stop
+ ):
+ print(f"Setting current limit to {max_current:5.3f} A")
+ korad.set_max_current(max_current)
+
+ if duration and ts - start_ts > duration:
+ terminate_measurement = True
+
+ if on_off:
+ print("Turning off outputs")
+ korad.set_output(False)
+
+ korad.disconnect()
+
+ output_handle.seek(0)
+ output = output_handle.read()
+ output_handle.close()
+
+ return output
+
+
+def plot_data(data, mode):
+ import matplotlib.pyplot as plt
+
+ if mode == "U":
+ (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1)
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 1], 10),
+ "r-",
+ label="mean(U, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.xlabel("Time [s]")
+ plt.ylabel("Voltage [V]")
+
+ elif mode == "I":
+ (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1)
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 2], 10),
+ "r-",
+ label="mean(I, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.xlabel("Time [s]")
+ plt.ylabel("Current [A]")
+
+ elif mode == "P":
+ (datahandle,) = plt.plot(
+ data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1
+ )
+ (meanhandle,) = plt.plot(
+ data[:, 0],
+ running_mean(data[:, 1] * data[:, 2], 10),
+ "r-",
+ label="mean(P, 10)",
+ markersize=1,
+ )
+ plt.legend(handles=[datahandle, meanhandle])
+ plt.xlabel("Time [s]")
+ plt.ylabel("Power [W]")
+
+ elif mode == "UI":
+ plt.plot(data[:, 1], data[:, 2], "bs", markersize=2)
+ plt.xlabel("Voltage [V]")
+ plt.ylabel("Current [A]")
+
+ elif mode == "UP":
+ plt.plot(data[:, 1], data[:, 1] * data[:, 2], "bs", markersize=2)
+ plt.xlabel("Voltage [V]")
+ plt.ylabel("Power [W]")
+
+ elif mode == "IU":
+ plt.plot(data[:, 2], data[:, 1], "bs", markersize=2)
+ plt.xlabel("Current [A]")
+ plt.ylabel("Voltage [V]")
+
+ elif mode == "IP":
+ plt.plot(data[:, 2], data[:, 1] * data[:, 2], "bs", markersize=2)
+ plt.xlabel("Current [A]")
+ plt.ylabel("Power [W]")
+
+ plt.show()
+
+
+def parse_data(log_data, skip=None, limit=None):
+ lines = log_data.split("\n")
+ data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines))
+ data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines)
+
+ data = np.empty((data_count, 3))
+ skip_index = 0
+ limit_index = data_count
+
+ for i, line in enumerate(data_lines):
+ fields = line.split()
+ if len(fields) == 3:
+ timestamp, voltage, current = map(float, fields)
+ else:
+ raise RuntimeError('cannot parse line "{}"'.format(line))
+
+ if i == 0:
+ first_timestamp = timestamp
+
+ timestamp = timestamp - first_timestamp
+
+ if skip is not None and timestamp < skip:
+ skip_index = i + 1
+ continue
+
+ if limit is not None and timestamp > limit:
+ limit_index = i - 1
+ break
+
+ data[i] = [timestamp, voltage, current]
+
+ data = data[skip_index:limit_index]
+
+ return data
+
+
+def parse_range(range_spec):
+ if range_spec is None:
+ return None, None, None
+
+ start, stop, step = list(map(float, range_spec.split()))
+
+ if start < 0 or stop < 0:
+ print(
+ f"Range specification '{range_spec}' is invalid: start and stop must be positive",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ if step == 0:
+ print(
+ f"Range specification '{range_spec}' is invalid: step must be ≠ 0",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ if (start < stop and step < 0) or (start > stop and step > 0):
+ step = -step
+
+ return start, stop, step
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__
+ )
+ parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE")
+ parser.add_argument(
+ "--port",
+ metavar="PORT",
+ type=str,
+ default="/dev/ttyACM0",
+ help="Set PSU serial port",
+ )
+ parser.add_argument("--channel", type=int, default=1, help="Measurement Channel")
+ parser.add_argument(
+ "--over-current-protection",
+ "--ocp",
+ action="store_true",
+ help="Enable over-current protection",
+ )
+ parser.add_argument(
+ "--over-voltage-protection",
+ "--ovp",
+ action="store_true",
+ help="Enable over-voltage protection",
+ )
+ parser.add_argument(
+ "--voltage-limit",
+ type=float,
+ metavar="VOLTAGE",
+ help="Set voltage limit",
+ )
+ parser.add_argument(
+ "--voltage-range",
+ type=str,
+ metavar="START STOP STEP",
+ help="Vary voltage limit from START to STOP over the course of the measurement. Adjust by STEP V per second.",
+ )
+ parser.add_argument(
+ "--current-limit",
+ type=float,
+ help="Set current limit",
+ )
+ parser.add_argument(
+ "--current-range",
+ type=str,
+ metavar="START STOP STEP",
+ help="Vary current limit from START to STOP over the course of the measurement. Adjust by STEP A per second.",
+ )
+ parser.add_argument(
+ "--on-off",
+ action="store_true",
+ help="Enable output after starting the measurement; disable it after stopping it",
+ )
+ parser.add_argument(
+ "--save", metavar="FILE", type=str, help="Save measurement data in FILE"
+ )
+ parser.add_argument(
+ "--skip",
+ metavar="N",
+ type=float,
+ default=0,
+ help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement",
+ )
+ parser.add_argument(
+ "--limit",
+ type=float,
+ metavar="N",
+ help="Limit analysis to the first N seconds of data",
+ )
+ parser.add_argument(
+ "--plot",
+ metavar="UNIT",
+ choices=["U", "I", "P", "UI", "UP", "IU", "IP"],
+ help="Plot voltage / current / power over time or voltage vs current / current vs voltage",
+ )
+ parser.add_argument(
+ "duration", type=int, nargs="?", help="Measurement duration in seconds"
+ )
+
+ args = parser.parse_args()
+
+ if args.load is None and args.duration is None:
+ print("Either --load or duration must be specified", file=sys.stderr)
+ sys.exit(1)
+
+ current_range = parse_range(args.current_range)
+ voltage_range = parse_range(args.voltage_range)
+
+ if args.load:
+ if args.load.endswith(".xz"):
+ import lzma
+
+ with lzma.open(args.load, "rt") as f:
+ log_data = f.read()
+ else:
+ with open(args.load, "r") as f:
+ log_data = f.read()
+ else:
+ log_data = measure_data(
+ args.port,
+ args.save,
+ args.duration,
+ channel=args.channel,
+ ocp=args.over_current_protection,
+ ovp=args.over_voltage_protection,
+ max_voltage=args.voltage_limit,
+ max_current=args.current_limit,
+ voltage_range=voltage_range,
+ current_range=current_range,
+ on_off=args.on_off,
+ )
+
+ data = parse_data(log_data, skip=args.skip, limit=args.limit)
+
+ if args.plot:
+ plot_data(data, args.plot)
+
+
+if __name__ == "__main__":
+ main()