diff options
Diffstat (limited to 'bin/kaxxxxp-viewer')
-rwxr-xr-x | bin/kaxxxxp-viewer | 93 |
1 files changed, 83 insertions, 10 deletions
diff --git a/bin/kaxxxxp-viewer b/bin/kaxxxxp-viewer index dac5520..277fbb4 100755 --- a/bin/kaxxxxp-viewer +++ b/bin/kaxxxxp-viewer @@ -68,23 +68,33 @@ class SerialReader(serial.threaded.Protocol): def __init__(self): self.remaining_chars = 0 self.read_complete = False + self.expect_binary = False self.recv_buf = "" self.lines = [] - def expect(self, num_chars): + def expect(self, num_chars, binary=False): self.recv_buf = "" self.remaining_chars = num_chars self.read_complete = False + self.expect_binary = binary def __call__(self): return self def data_received(self, data): + if self.expect_binary: + self.lines.extend(list(data)) + self.remaining_chars -= len(data) + if self.remaining_chars <= 0: + self.read_complete = True + return + try: str_data = data.decode("UTF-8") self.recv_buf += str_data except UnicodeDecodeError: sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) + return self.remaining_chars -= len(str_data) @@ -94,8 +104,11 @@ class SerialReader(serial.threaded.Protocol): def get_expected_line(self): if len(self.lines): - ret = self.lines[0] - self.lines = [] + if self.expect_binary: + ret = self.lines + else: + ret = self.lines[0] + self.lines = list() return ret return None @@ -107,6 +120,22 @@ class SerialReader(serial.threaded.Protocol): return None +class KoradStatus: + # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. + # Or they're the wrong bits altogether. + # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and + # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/> + # don't agree on how to parse the status byte. + def __init__(self, status_bytes): + status_byte = status_bytes[0] + self.over_current_protection_enabled = bool(status_byte & 0x20) + self.output_enabled = bool(status_byte & 0x40) + self.over_voltage_protection_enabled = bool(status_byte & 0x80) + + def __repr__(self): + return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>" + + class KA320: def __init__(self, port, channel=1): self.ser = serial.serial_for_url(port, do_not_open=True) @@ -129,21 +158,40 @@ class KA320: self.worker = serial.threaded.ReaderThread(self.ser, self.reader) self.worker.start() - def rw(self, cmd, num_chars, exact=False): - self.reader.expect(num_chars) + def rw(self, cmd, num_chars, exact=False, binary=False): + self.reader.expect(num_chars, binary=binary) self.ser.write(cmd) - timeout = 10 + timeout = 20 while not self.reader.read_complete and not timeout == 0: time.sleep(0.02) timeout -= 1 if exact: return self.reader.get_expected_line() - return self.reader.get_line() + elif self.reader.read_complete: + return self.reader.get_line() + else: + return self.reader.recv_buf # See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands def connect(self): - return self.rw(b"*IDN?", 16) + # Device ID length is unknown + return self.rw(b"*IDN?", 32, exact=False) + + def get_status(self): + return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) + + def ovp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OVP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_voltage_protection_enabled == enable + + def ocp(self, enable=True): + enable_bit = int(enable) + self.ser.write(f"OCP{enable_bit}".encode()) + time.sleep(0.1) + # assert self.get_status().over_current_protection_enabled == enable def get_max_voltage(self): return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) @@ -180,7 +228,7 @@ def graceful_exit(sig, frame): terminate_measurement = True -def measure_data(port, filename, duration, channel=1): +def measure_data(port, filename, duration, channel=1, ocp=False, ovp=False): global terminate_measurement signal.signal(signal.SIGINT, graceful_exit) @@ -195,6 +243,14 @@ def measure_data(port, filename, duration, channel=1): else: output_handle = tempfile.TemporaryFile("w+") + if ovp: + print("Enabling over-voltage protection") + korad.ovp(True) + + if ocp: + print("Enabling over-current protection") + korad.ocp(True) + if duration: print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") else: @@ -322,6 +378,18 @@ def main(): ) parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") parser.add_argument( + "--over-current-protection", + "--ocp", + action="store_true", + help="Enable over-current protection", + ) + parser.add_argument( + "--over-voltage-protection", + "--ovp", + action="store_true", + help="Enable over-voltage protection", + ) + parser.add_argument( "--save", metavar="FILE", type=str, help="Save measurement data in FILE" ) parser.add_argument( @@ -364,7 +432,12 @@ def main(): log_data = f.read() else: log_data = measure_data( - args.port, args.save, args.duration, channel=args.channel + args.port, + args.save, + args.duration, + channel=args.channel, + ocp=args.over_current_protection, + ovp=args.over_voltage_protection, ) data = parse_data(log_data, skip=args.skip, limit=args.limit) |