From 278d1386f748bc477cfb542662780497515ca102 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Sun, 7 Nov 2021 06:53:52 +0100 Subject: kaxxxxp-viewer -> korad-logger; add related projects --- README.md | 31 ++- bin/kaxxxxp-viewer | 602 ---------------------------------------------------- bin/korad-logger | 607 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 629 insertions(+), 611 deletions(-) delete mode 100755 bin/kaxxxxp-viewer create mode 100755 bin/korad-logger diff --git a/README.md b/README.md index d655042..0e542e2 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,14 @@ -# kaxxxxp-viewer - Data Logger and Viewer for KAxxxxP power supplies +# korad-logger - Data Logger and Controller for KAxxxxP power supplies -**kaxxxxp-viewer** acquires and visualizes voltage and current data from +**korad-logger** acquires and visualizes voltage and current data from KAxxxxP power supplies. These are sold under brand names such as Korad or RND Lab and equipped with a USB serial port, allowing them to be used as simple, low-resolution measurement devices. See the [sigrok wiki](https://sigrok.org/wiki/Korad_KAxxxxP_series) for details. +korad-logger is capable of performing simple control tasks, such as stepping +through voltage/current slopes for automated I-V curve measurements. + It has been successfully used with "RND 320-KA3005P" (single-channel) and "RND 320-KA3305P" (dual-channel) supplies. Observed attributes: @@ -18,16 +21,26 @@ It has been successfully used with "RND 320-KA3005P" (single-channel) and "RND Voltage and current range depend on the device, the resolutions are probably identical. -**Warning:** In addition to reading voltage and current values, the serial -interface also supports *setting* them. This is not supported by -kaxxxxp-viewer. However, be warned that communication errors or bugs may cause -the power supply to receive a write command with an arbitrary voltage or -current value, which may result in damaged equipment, fire, or other harm. By -using this software, you acknowledge that you are aware of these risks. +**Warning:** The KAxxxxP serial interface supports both reading current/voltage +data and writing current/voltage limits. korad-logger uses these to change +PSU attributes at runtime, if requested. The serial protocol does not use +checksums or similar mechanisms, so communication errors or bugs may cause the +power supply to receive a write command with an arbitrary voltage or current +value. This may result in damaged equipment, fire, or other harm. By using +this software, you acknowledge that you are aware of these risks. -See `bin/kaxxxxp-viewer --help` for usage details. +See `bin/korad-logger --help` for usage details. ## Dependencies * Python 3 with the following modules: numpy, serial * Data Visualization (--plot): python3-matplotlib + +## Related work + +This is by far not the first project for korad PSU logging and control. See also: + +* [libsigrok](https://sigrok.org/wiki/Korad_KAxxxxP_series) +* [koradctl](https://github.com/attie/koradctl) +* [KoradControl](https://github.com/maximaximal/KoradControl) +* [Korad-KA6003P-Software](https://github.com/Tamagotono/Korad-KA6003P-Software) diff --git a/bin/kaxxxxp-viewer b/bin/kaxxxxp-viewer deleted file mode 100755 index 742d6af..0000000 --- a/bin/kaxxxxp-viewer +++ /dev/null @@ -1,602 +0,0 @@ -#!/usr/bin/env python3 -# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160 -# -# Copyright (C) 2021 Daniel Friesel -# -# SPDX-License-Identifier: GPL-2.0-or-later - -"""kaxxxxp-viewer - Data Logger and Viewer for KAxxxxP power supplies - -DESCRIPTION - -kaxxxxp-viewer logs voltage and current readings provided by a KAxxxxP power supply -with serial/USB interface, sold under brands such as Korad or RND Lab. -Measurements can be taken directly (by specifying in seconds) -or loaded from a logfile using --load . Data can be plotted or aggregated on stdout. - -WARNING - -The power supply's serial communication protocol is supports both read and write -operations. Communication errors or bugs may cause the power supply to set an -incompatible voltage or current limit, which may result in damaged equipment or -fire. By using this software, you acknowledge that you are aware of these risks -and the following disclaimer. - -This software is provided by the copyright holders and contributors "as is" and -any express or implied warranties, including, but not limited to, the implied -warranties of merchantability and fitness for a particular purpose are -disclaimed. In no event shall the copyright holder or contributors be liable -for any direct, indirect, incidental, special, exemplary, or consequential -damages (including, but not limited to, procurement of substitute goods or -services; loss of use, data, or profits; or business interruption) however -caused and on any theory of liability, whether in contract, strict liability, -or tort (including negligence or otherwise) arising in any way out of the use -of this software, even if advised of the possibility of such damage. - -OPTIONS -""" - -import argparse -import numpy as np -import serial -import serial.threaded -import signal -import sys -import tempfile -import time - -terminate_measurement = False - - -def running_mean(x: np.ndarray, N: int) -> np.ndarray: - """ - Compute `N` elements wide running average over `x`. - - :param x: 1-Dimensional NumPy array - :param N: how many items to average. Should be even for optimal results. - """ - - # to ensure that output.shape == input.shape, we need to insert data - # at the boundaries - boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) - boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) - - return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") - - -class SerialReader(serial.threaded.Protocol): - def __init__(self): - self.remaining_chars = 0 - self.read_complete = False - self.expect_binary = False - self.recv_buf = "" - self.lines = [] - - def expect(self, num_chars, binary=False): - self.recv_buf = "" - self.remaining_chars = num_chars - self.read_complete = False - self.expect_binary = binary - - def __call__(self): - return self - - def data_received(self, data): - if self.expect_binary: - self.lines.extend(list(data)) - self.remaining_chars -= len(data) - if self.remaining_chars <= 0: - self.read_complete = True - return - - try: - str_data = data.decode("UTF-8") - self.recv_buf += str_data - except UnicodeDecodeError: - sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) - return - - self.remaining_chars -= len(str_data) - - if self.remaining_chars <= 0: - self.lines.append(self.recv_buf) - self.read_complete = True - - def get_expected_line(self): - if len(self.lines): - if self.expect_binary: - ret = self.lines - else: - ret = self.lines[0] - self.lines = list() - return ret - return None - - def get_line(self): - if len(self.lines): - ret = self.lines[-1] - self.lines = [] - return ret - return None - - -class KoradStatus: - # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. - # Or they're the wrong bits altogether. - # and - # - # don't agree on how to parse the status byte. - def __init__(self, status_bytes): - status_byte = status_bytes[0] - self.over_current_protection_enabled = bool(status_byte & 0x20) - self.output_enabled = bool(status_byte & 0x40) - self.over_voltage_protection_enabled = bool(status_byte & 0x80) - - def __repr__(self): - return f"KoradStatus" - - -class KA320: - def __init__(self, port, channel=1): - self.ser = serial.serial_for_url(port, do_not_open=True) - self.ser.baudrate = 9600 - self.ser.parity = "N" - self.ser.rtscts = False - self.ser.xonxoff = False - - try: - self.ser.open() - except serial.SerialException as e: - sys.stderr.write( - "Could not open serial port {}: {}\n".format(self.ser.name, e) - ) - sys.exit(1) - - self.channel = channel - - self.reader = SerialReader() - self.worker = serial.threaded.ReaderThread(self.ser, self.reader) - self.worker.start() - - def rw(self, cmd, num_chars, exact=False, binary=False): - self.reader.expect(num_chars, binary=binary) - self.ser.write(cmd) - timeout = 20 - while not self.reader.read_complete and not timeout == 0: - time.sleep(0.02) - timeout -= 1 - if exact: - return self.reader.get_expected_line() - elif self.reader.read_complete: - return self.reader.get_line() - else: - return self.reader.recv_buf - - # See for supported commands - - def connect(self): - # Device ID length is unknown - return self.rw(b"*IDN?", 32, exact=False) - - def get_status(self): - return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) - - def ovp(self, enable=True): - enable_bit = int(enable) - self.ser.write(f"OVP{enable_bit}".encode()) - time.sleep(0.1) - # assert self.get_status().over_voltage_protection_enabled == enable - - def ocp(self, enable=True): - enable_bit = int(enable) - self.ser.write(f"OCP{enable_bit}".encode()) - time.sleep(0.1) - # assert self.get_status().over_current_protection_enabled == enable - - def set_max_voltage(self, max_voltage): - self.ser.write(f"VSET{self.channel:d}:{max_voltage:05.2f}".encode()) - time.sleep(0.1) - - def set_max_current(self, max_current): - self.ser.write(f"ISET{self.channel:d}:{max_current:05.3f}".encode()) - time.sleep(0.1) - - def get_max_voltage(self): - return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) - - def get_max_current(self): - return float(self.rw(f"ISET{self.channel:d}?".encode(), 5, True)) - - def get_voltage(self): - try: - return float(self.rw(f"VOUT{self.channel:d}?".encode(), 5, True)) - except TypeError: - return None - - def get_current(self): - try: - return float(self.rw(f"IOUT{self.channel:d}?".encode(), 5, True)) - except TypeError: - return None - - def set_output(self, enable): - if enable: - self.ser.write(b"OUT1") - else: - self.ser.write(b"OUT0") - time.sleep(0.1) - - def disconnect(self): - self.worker.stop() - self.ser.close() - - -def graceful_exit(sig, frame): - global terminate_measurement - terminate_measurement = True - - -def measure_data( - port, - filename, - duration, - channel=1, - ocp=False, - ovp=False, - max_voltage=None, - max_current=None, - voltage_range=(None, None, None), - current_range=(None, None, None), - on_off=False, -): - global terminate_measurement - - voltage_start, voltage_stop, voltage_step = voltage_range - current_start, current_stop, current_step = current_range - last_range_step = 0 - - if voltage_start is not None: - max_voltage = voltage_start - if current_start is not None: - max_current = current_start - - signal.signal(signal.SIGINT, graceful_exit) - signal.signal(signal.SIGTERM, graceful_exit) - signal.signal(signal.SIGQUIT, graceful_exit) - korad = KA320(port, channel) - - start_ts = time.time() - - if filename is not None: - output_handle = open(filename, "w+") - else: - output_handle = tempfile.TemporaryFile("w+") - - if max_voltage is not None or max_current is not None: - # turn off output before setting current and voltage limits - print("Turning off outputs") - korad.set_output(False) - - if max_voltage is not None: - print(f"Setting voltage limit to {max_voltage:5.2f} V") - korad.set_max_voltage(max_voltage) - - if max_current is not None: - print(f"Setting current limit to {max_current:5.3f} A") - korad.set_max_current(max_current) - - if ovp: - print("Enabling over-voltage protection") - korad.ovp(True) - - if ocp: - print("Enabling over-current protection") - korad.ocp(True) - - if max_voltage is not None or max_current is not None or on_off: - print("Turning on outputs") - korad.set_output(True) - - if duration: - print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") - else: - print(f"Starting data acquisition. Press Ctrl+C to stop.") - - print("# Device: " + korad.connect(), file=output_handle) - print("# Timestamp Voltage Current", file=output_handle) - while not terminate_measurement: - ts = time.time() - current = korad.get_current() - voltage = korad.get_voltage() - if voltage is not None and current is not None: - print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle) - elif voltage is not None: - print(f"{ts:.3f} {voltage:5.2f} NaN", file=output_handle) - elif current is not None: - print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle) - else: - print(f"{ts:.3f} NaN NaN", file=output_handle) - time.sleep(0.1) - - if int(ts - start_ts) > last_range_step: - last_range_step = int(ts - start_ts) - if voltage_step: - max_voltage = voltage_start + last_range_step * voltage_step - if (voltage_step > 0 and max_voltage <= voltage_stop) or ( - voltage_step < 0 and max_voltage >= voltage_stop - ): - print(f"Setting voltage limit to {max_voltage:5.2f} V") - korad.set_max_voltage(max_voltage) - if current_step: - max_current = current_start + last_range_step * current_step - if (current_step > 0 and max_current <= current_stop) or ( - current_step < 0 and max_current >= current_stop - ): - print(f"Setting current limit to {max_current:5.3f} A") - korad.set_max_current(max_current) - - if duration and ts - start_ts > duration: - terminate_measurement = True - - if on_off: - print("Turning off outputs") - korad.set_output(False) - - korad.disconnect() - - output_handle.seek(0) - output = output_handle.read() - output_handle.close() - - return output - - -def plot_data(data, mode): - import matplotlib.pyplot as plt - - if mode == "U": - (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1) - (meanhandle,) = plt.plot( - data[:, 0], - running_mean(data[:, 1], 10), - "r-", - label="mean(U, 10)", - markersize=1, - ) - plt.legend(handles=[datahandle, meanhandle]) - plt.xlabel("Time [s]") - plt.ylabel("Voltage [V]") - - elif mode == "I": - (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1) - (meanhandle,) = plt.plot( - data[:, 0], - running_mean(data[:, 2], 10), - "r-", - label="mean(I, 10)", - markersize=1, - ) - plt.legend(handles=[datahandle, meanhandle]) - plt.xlabel("Time [s]") - plt.ylabel("Current [A]") - - elif mode == "P": - (datahandle,) = plt.plot( - data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1 - ) - (meanhandle,) = plt.plot( - data[:, 0], - running_mean(data[:, 1] * data[:, 2], 10), - "r-", - label="mean(P, 10)", - markersize=1, - ) - plt.legend(handles=[datahandle, meanhandle]) - plt.xlabel("Time [s]") - plt.ylabel("Power [W]") - - elif mode == "UI": - plt.plot(data[:, 1], data[:, 2], "bs", markersize=2) - plt.xlabel("Voltage [V]") - plt.ylabel("Current [A]") - - elif mode == "UP": - plt.plot(data[:, 1], data[:, 1] * data[:, 2], "bs", markersize=2) - plt.xlabel("Voltage [V]") - plt.ylabel("Power [W]") - - elif mode == "IU": - plt.plot(data[:, 2], data[:, 1], "bs", markersize=2) - plt.xlabel("Current [A]") - plt.ylabel("Voltage [V]") - - elif mode == "IP": - plt.plot(data[:, 2], data[:, 1] * data[:, 2], "bs", markersize=2) - plt.xlabel("Current [A]") - plt.ylabel("Power [W]") - - plt.show() - - -def parse_data(log_data, skip=None, limit=None): - lines = log_data.split("\n") - data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) - data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) - - data = np.empty((data_count, 3)) - skip_index = 0 - limit_index = data_count - - for i, line in enumerate(data_lines): - fields = line.split() - if len(fields) == 3: - timestamp, voltage, current = map(float, fields) - else: - raise RuntimeError('cannot parse line "{}"'.format(line)) - - if i == 0: - first_timestamp = timestamp - - timestamp = timestamp - first_timestamp - - if skip is not None and timestamp < skip: - skip_index = i + 1 - continue - - if limit is not None and timestamp > limit: - limit_index = i - 1 - break - - data[i] = [timestamp, voltage, current] - - data = data[skip_index:limit_index] - - return data - - -def parse_range(range_spec): - if range_spec is None: - return None, None, None - - start, stop, step = list(map(float, range_spec.split())) - - if start < 0 or stop < 0: - print( - f"Range specification '{range_spec}' is invalid: start and stop must be positive", - file=sys.stderr, - ) - sys.exit(1) - - if step == 0: - print( - f"Range specification '{range_spec}' is invalid: step must be ≠ 0", - file=sys.stderr, - ) - sys.exit(1) - - if (start < stop and step < 0) or (start > stop and step > 0): - step = -step - - return start, stop, step - - -def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ - ) - parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE") - parser.add_argument( - "--port", - metavar="PORT", - type=str, - default="/dev/ttyACM0", - help="Set PSU serial port", - ) - parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") - parser.add_argument( - "--over-current-protection", - "--ocp", - action="store_true", - help="Enable over-current protection", - ) - parser.add_argument( - "--over-voltage-protection", - "--ovp", - action="store_true", - help="Enable over-voltage protection", - ) - parser.add_argument( - "--voltage-limit", - type=float, - metavar="VOLTAGE", - help="Set voltage limit", - ) - parser.add_argument( - "--voltage-range", - type=str, - metavar="START STOP STEP", - help="Vary voltage limit from START to STOP over the course of the measurement. Adjust by STEP V per second.", - ) - parser.add_argument( - "--current-limit", - type=float, - help="Set current limit", - ) - parser.add_argument( - "--current-range", - type=str, - metavar="START STOP STEP", - help="Vary current limit from START to STOP over the course of the measurement. Adjust by STEP A per second.", - ) - parser.add_argument( - "--on-off", - action="store_true", - help="Enable output after starting the measurement; disable it after stopping it", - ) - parser.add_argument( - "--save", metavar="FILE", type=str, help="Save measurement data in FILE" - ) - parser.add_argument( - "--skip", - metavar="N", - type=float, - default=0, - help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement", - ) - parser.add_argument( - "--limit", - type=float, - metavar="N", - help="Limit analysis to the first N seconds of data", - ) - parser.add_argument( - "--plot", - metavar="UNIT", - choices=["U", "I", "P", "UI", "UP", "IU", "IP"], - help="Plot voltage / current / power over time or voltage vs current / current vs voltage", - ) - parser.add_argument( - "duration", type=int, nargs="?", help="Measurement duration in seconds" - ) - - args = parser.parse_args() - - if args.load is None and args.duration is None: - print("Either --load or duration must be specified", file=sys.stderr) - sys.exit(1) - - current_range = parse_range(args.current_range) - voltage_range = parse_range(args.voltage_range) - - if args.load: - if args.load.endswith(".xz"): - import lzma - - with lzma.open(args.load, "rt") as f: - log_data = f.read() - else: - with open(args.load, "r") as f: - log_data = f.read() - else: - log_data = measure_data( - args.port, - args.save, - args.duration, - channel=args.channel, - ocp=args.over_current_protection, - ovp=args.over_voltage_protection, - max_voltage=args.voltage_limit, - max_current=args.current_limit, - voltage_range=voltage_range, - current_range=current_range, - on_off=args.on_off, - ) - - data = parse_data(log_data, skip=args.skip, limit=args.limit) - - if args.plot: - plot_data(data, args.plot) - - -if __name__ == "__main__": - main() diff --git a/bin/korad-logger b/bin/korad-logger new file mode 100755 index 0000000..84d9483 --- /dev/null +++ b/bin/korad-logger @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 +# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160 +# +# Copyright (C) 2021 Daniel Friesel +# +# SPDX-License-Identifier: GPL-2.0-or-later + +"""korad-logger - Data Logger and Controller for Korad KAxxxxP power supplies + +DESCRIPTION + +korad-logger logs voltage and current readings provided by a KAxxxxP power +supply with serial/USB interface, sold under brands such as Korad or RND Lab. +It is also capable of performing simple control tasks, such as stepping through +voltage/current slopes for automated I-V curve measurements. Measurements can +be taken directly (by specifying in seconds) or loaded +from a logfile using --load . Data can be plotted or aggregated on stdout. + +WARNING + +The KAxxxxP serial interface supports both reading current/voltage +data and writing current/voltage limits. korad-logger uses these to change +PSU attributes at runtime, if requested. The serial protocol does not use +checksums or similar mechanisms, so communication errors or bugs may cause the +power supply to receive a write command with an arbitrary voltage or current +value. This may result in damaged equipment, fire, or other harm. By using +this software, you acknowledge that you are aware of these risks and the +following disclaimer. + +This software is provided by the copyright holders and contributors "as is" and +any express or implied warranties, including, but not limited to, the implied +warranties of merchantability and fitness for a particular purpose are +disclaimed. In no event shall the copyright holder or contributors be liable +for any direct, indirect, incidental, special, exemplary, or consequential +damages (including, but not limited to, procurement of substitute goods or +services; loss of use, data, or profits; or business interruption) however +caused and on any theory of liability, whether in contract, strict liability, +or tort (including negligence or otherwise) arising in any way out of the use +of this software, even if advised of the possibility of such damage. + +OPTIONS +""" + +import argparse +import numpy as np +import serial +import serial.threaded +import signal +import sys +import tempfile +import time + +terminate_measurement = False + + +def running_mean(x: np.ndarray, N: int) -> np.ndarray: + """ + Compute `N` elements wide running average over `x`. + + :param x: 1-Dimensional NumPy array + :param N: how many items to average. Should be even for optimal results. + """ + + # to ensure that output.shape == input.shape, we need to insert data + # at the boundaries + boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) + boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) + + return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") + + +class SerialReader(serial.threaded.Protocol): + def __init__(self): + self.remaining_chars = 0 + self.read_complete = False + self.expect_binary = False + self.recv_buf = "" + self.lines = [] + + def expect(self, num_chars, binary=False): + self.recv_buf = "" + self.remaining_chars = num_chars + self.read_complete = False + self.expect_binary = binary + + def __call__(self): + return self + + def data_received(self, data): + if self.expect_binary: + self.lines.extend(list(data)) + self.remaining_chars -= len(data) + if self.remaining_chars <= 0: + self.read_complete = True + return + + try: + str_data = data.decode("UTF-8") + self.recv_buf += str_data + except UnicodeDecodeError: + sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) + return + + self.remaining_chars -= len(str_data) + + if self.remaining_chars <= 0: + self.lines.append(self.recv_buf) + self.read_complete = True + + def get_expected_line(self): + if len(self.lines): + if self.expect_binary: + ret = self.lines + else: + ret = self.lines[0] + self.lines = list() + return ret + return None + + def get_line(self): + if len(self.lines): + ret = self.lines[-1] + self.lines = [] + return ret + return None + + +class KoradStatus: + # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. + # Or they're the wrong bits altogether. + # and + # + # don't agree on how to parse the status byte. + def __init__(self, status_bytes): + status_byte = status_bytes[0] + self.over_current_protection_enabled = bool(status_byte & 0x20) + self.output_enabled = bool(status_byte & 0x40) + self.over_voltage_protection_enabled = bool(status_byte & 0x80) + + def __repr__(self): + return f"KoradStatus" + + +class KA320: + def __init__(self, port, channel=1): + self.ser = serial.serial_for_url(port, do_not_open=True) + self.ser.baudrate = 9600 + self.ser.parity = "N" + self.ser.rtscts = False + self.ser.xonxoff = False + + try: + self.ser.open() + except serial.SerialException as e: + sys.stderr.write( + "Could not open serial port {}: {}\n".format(self.ser.name, e) + ) + sys.exit(1) + + self.channel = channel + + self.reader = SerialReader() + self.worker = serial.threaded.ReaderThread(self.ser, self.reader) + self.worker.start() + + def rw(self, cmd, num_chars, exact=False, binary=False): + self.reader.expect(num_chars, binary=binary) + self.ser.write(cmd) + timeout = 20 + while not self.reader.read_complete and not timeout == 0: + time.sleep(0.02) + timeout -= 1 + if exact: + return self.reader.get_expected_line() + elif self.reader.read_complete: + return self.reader.get_line() + else: + return self.reader.recv_buf + + # See for supported commands + + def connect(self): + # Device ID length is unknown + return self.rw(b"*IDN?", 32, exact=False) + + def get_status(self): + return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) + + def ovp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OVP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_voltage_protection_enabled == enable + + def ocp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OCP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_current_protection_enabled == enable + + def set_max_voltage(self, max_voltage): + self.ser.write(f"VSET{self.channel:d}:{max_voltage:05.2f}".encode()) + time.sleep(0.1) + + def set_max_current(self, max_current): + self.ser.write(f"ISET{self.channel:d}:{max_current:05.3f}".encode()) + time.sleep(0.1) + + def get_max_voltage(self): + return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) + + def get_max_current(self): + return float(self.rw(f"ISET{self.channel:d}?".encode(), 5, True)) + + def get_voltage(self): + try: + return float(self.rw(f"VOUT{self.channel:d}?".encode(), 5, True)) + except TypeError: + return None + + def get_current(self): + try: + return float(self.rw(f"IOUT{self.channel:d}?".encode(), 5, True)) + except TypeError: + return None + + def set_output(self, enable): + if enable: + self.ser.write(b"OUT1") + else: + self.ser.write(b"OUT0") + time.sleep(0.1) + + def disconnect(self): + self.worker.stop() + self.ser.close() + + +def graceful_exit(sig, frame): + global terminate_measurement + terminate_measurement = True + + +def measure_data( + port, + filename, + duration, + channel=1, + ocp=False, + ovp=False, + max_voltage=None, + max_current=None, + voltage_range=(None, None, None), + current_range=(None, None, None), + on_off=False, +): + global terminate_measurement + + voltage_start, voltage_stop, voltage_step = voltage_range + current_start, current_stop, current_step = current_range + last_range_step = 0 + + if voltage_start is not None: + max_voltage = voltage_start + if current_start is not None: + max_current = current_start + + signal.signal(signal.SIGINT, graceful_exit) + signal.signal(signal.SIGTERM, graceful_exit) + signal.signal(signal.SIGQUIT, graceful_exit) + korad = KA320(port, channel) + + start_ts = time.time() + + if filename is not None: + output_handle = open(filename, "w+") + else: + output_handle = tempfile.TemporaryFile("w+") + + if max_voltage is not None or max_current is not None: + # turn off output before setting current and voltage limits + print("Turning off outputs") + korad.set_output(False) + + if max_voltage is not None: + print(f"Setting voltage limit to {max_voltage:5.2f} V") + korad.set_max_voltage(max_voltage) + + if max_current is not None: + print(f"Setting current limit to {max_current:5.3f} A") + korad.set_max_current(max_current) + + if ovp: + print("Enabling over-voltage protection") + korad.ovp(True) + + if ocp: + print("Enabling over-current protection") + korad.ocp(True) + + if max_voltage is not None or max_current is not None or on_off: + print("Turning on outputs") + korad.set_output(True) + + if duration: + print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") + else: + print(f"Starting data acquisition. Press Ctrl+C to stop.") + + print("# Device: " + korad.connect(), file=output_handle) + print("# Timestamp Voltage Current", file=output_handle) + while not terminate_measurement: + ts = time.time() + current = korad.get_current() + voltage = korad.get_voltage() + if voltage is not None and current is not None: + print(f"{ts:.3f} {voltage:5.2f} {current:5.3f}", file=output_handle) + elif voltage is not None: + print(f"{ts:.3f} {voltage:5.2f} NaN", file=output_handle) + elif current is not None: + print(f"{ts:.3f} NaN {current:5.3f}", file=output_handle) + else: + print(f"{ts:.3f} NaN NaN", file=output_handle) + time.sleep(0.1) + + if int(ts - start_ts) > last_range_step: + last_range_step = int(ts - start_ts) + if voltage_step: + max_voltage = voltage_start + last_range_step * voltage_step + if (voltage_step > 0 and max_voltage <= voltage_stop) or ( + voltage_step < 0 and max_voltage >= voltage_stop + ): + print(f"Setting voltage limit to {max_voltage:5.2f} V") + korad.set_max_voltage(max_voltage) + if current_step: + max_current = current_start + last_range_step * current_step + if (current_step > 0 and max_current <= current_stop) or ( + current_step < 0 and max_current >= current_stop + ): + print(f"Setting current limit to {max_current:5.3f} A") + korad.set_max_current(max_current) + + if duration and ts - start_ts > duration: + terminate_measurement = True + + if on_off: + print("Turning off outputs") + korad.set_output(False) + + korad.disconnect() + + output_handle.seek(0) + output = output_handle.read() + output_handle.close() + + return output + + +def plot_data(data, mode): + import matplotlib.pyplot as plt + + if mode == "U": + (datahandle,) = plt.plot(data[:, 0], data[:, 1], "b-", label="U", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1], 10), + "r-", + label="mean(U, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Voltage [V]") + + elif mode == "I": + (datahandle,) = plt.plot(data[:, 0], data[:, 2], "b-", label="I", markersize=1) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 2], 10), + "r-", + label="mean(I, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Current [A]") + + elif mode == "P": + (datahandle,) = plt.plot( + data[:, 0], data[:, 1] * data[:, 2], "b-", label="P", markersize=1 + ) + (meanhandle,) = plt.plot( + data[:, 0], + running_mean(data[:, 1] * data[:, 2], 10), + "r-", + label="mean(P, 10)", + markersize=1, + ) + plt.legend(handles=[datahandle, meanhandle]) + plt.xlabel("Time [s]") + plt.ylabel("Power [W]") + + elif mode == "UI": + plt.plot(data[:, 1], data[:, 2], "bs", markersize=2) + plt.xlabel("Voltage [V]") + plt.ylabel("Current [A]") + + elif mode == "UP": + plt.plot(data[:, 1], data[:, 1] * data[:, 2], "bs", markersize=2) + plt.xlabel("Voltage [V]") + plt.ylabel("Power [W]") + + elif mode == "IU": + plt.plot(data[:, 2], data[:, 1], "bs", markersize=2) + plt.xlabel("Current [A]") + plt.ylabel("Voltage [V]") + + elif mode == "IP": + plt.plot(data[:, 2], data[:, 1] * data[:, 2], "bs", markersize=2) + plt.xlabel("Current [A]") + plt.ylabel("Power [W]") + + plt.show() + + +def parse_data(log_data, skip=None, limit=None): + lines = log_data.split("\n") + data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) + data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) + + data = np.empty((data_count, 3)) + skip_index = 0 + limit_index = data_count + + for i, line in enumerate(data_lines): + fields = line.split() + if len(fields) == 3: + timestamp, voltage, current = map(float, fields) + else: + raise RuntimeError('cannot parse line "{}"'.format(line)) + + if i == 0: + first_timestamp = timestamp + + timestamp = timestamp - first_timestamp + + if skip is not None and timestamp < skip: + skip_index = i + 1 + continue + + if limit is not None and timestamp > limit: + limit_index = i - 1 + break + + data[i] = [timestamp, voltage, current] + + data = data[skip_index:limit_index] + + return data + + +def parse_range(range_spec): + if range_spec is None: + return None, None, None + + start, stop, step = list(map(float, range_spec.split())) + + if start < 0 or stop < 0: + print( + f"Range specification '{range_spec}' is invalid: start and stop must be positive", + file=sys.stderr, + ) + sys.exit(1) + + if step == 0: + print( + f"Range specification '{range_spec}' is invalid: step must be ≠ 0", + file=sys.stderr, + ) + sys.exit(1) + + if (start < stop and step < 0) or (start > stop and step > 0): + step = -step + + return start, stop, step + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ + ) + parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE") + parser.add_argument( + "--port", + metavar="PORT", + type=str, + default="/dev/ttyACM0", + help="Set PSU serial port", + ) + parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") + parser.add_argument( + "--over-current-protection", + "--ocp", + action="store_true", + help="Enable over-current protection", + ) + parser.add_argument( + "--over-voltage-protection", + "--ovp", + action="store_true", + help="Enable over-voltage protection", + ) + parser.add_argument( + "--voltage-limit", + type=float, + metavar="VOLTAGE", + help="Set voltage limit", + ) + parser.add_argument( + "--voltage-range", + type=str, + metavar="START STOP STEP", + help="Vary voltage limit from START to STOP over the course of the measurement. Adjust by STEP V per second.", + ) + parser.add_argument( + "--current-limit", + type=float, + help="Set current limit", + ) + parser.add_argument( + "--current-range", + type=str, + metavar="START STOP STEP", + help="Vary current limit from START to STOP over the course of the measurement. Adjust by STEP A per second.", + ) + parser.add_argument( + "--on-off", + action="store_true", + help="Enable output after starting the measurement; disable it after stopping it", + ) + parser.add_argument( + "--save", metavar="FILE", type=str, help="Save measurement data in FILE" + ) + parser.add_argument( + "--skip", + metavar="N", + type=float, + default=0, + help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement", + ) + parser.add_argument( + "--limit", + type=float, + metavar="N", + help="Limit analysis to the first N seconds of data", + ) + parser.add_argument( + "--plot", + metavar="UNIT", + choices=["U", "I", "P", "UI", "UP", "IU", "IP"], + help="Plot voltage / current / power over time or voltage vs current / current vs voltage", + ) + parser.add_argument( + "duration", type=int, nargs="?", help="Measurement duration in seconds" + ) + + args = parser.parse_args() + + if args.load is None and args.duration is None: + print("Either --load or duration must be specified", file=sys.stderr) + sys.exit(1) + + current_range = parse_range(args.current_range) + voltage_range = parse_range(args.voltage_range) + + if args.load: + if args.load.endswith(".xz"): + import lzma + + with lzma.open(args.load, "rt") as f: + log_data = f.read() + else: + with open(args.load, "r") as f: + log_data = f.read() + else: + log_data = measure_data( + args.port, + args.save, + args.duration, + channel=args.channel, + ocp=args.over_current_protection, + ovp=args.over_voltage_protection, + max_voltage=args.voltage_limit, + max_current=args.current_limit, + voltage_range=voltage_range, + current_range=current_range, + on_off=args.on_off, + ) + + data = parse_data(log_data, skip=args.skip, limit=args.limit) + + if args.plot: + plot_data(data, args.plot) + + +if __name__ == "__main__": + main() -- cgit v1.2.3