summaryrefslogtreecommitdiff
path: root/bin/kaxxxxp-viewer
diff options
context:
space:
mode:
Diffstat (limited to 'bin/kaxxxxp-viewer')
-rwxr-xr-xbin/kaxxxxp-viewer93
1 files changed, 83 insertions, 10 deletions
diff --git a/bin/kaxxxxp-viewer b/bin/kaxxxxp-viewer
index dac5520..277fbb4 100755
--- a/bin/kaxxxxp-viewer
+++ b/bin/kaxxxxp-viewer
@@ -68,23 +68,33 @@ class SerialReader(serial.threaded.Protocol):
def __init__(self):
self.remaining_chars = 0
self.read_complete = False
+ self.expect_binary = False
self.recv_buf = ""
self.lines = []
- def expect(self, num_chars):
+ def expect(self, num_chars, binary=False):
self.recv_buf = ""
self.remaining_chars = num_chars
self.read_complete = False
+ self.expect_binary = binary
def __call__(self):
return self
def data_received(self, data):
+ if self.expect_binary:
+ self.lines.extend(list(data))
+ self.remaining_chars -= len(data)
+ if self.remaining_chars <= 0:
+ self.read_complete = True
+ return
+
try:
str_data = data.decode("UTF-8")
self.recv_buf += str_data
except UnicodeDecodeError:
sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data))
+ return
self.remaining_chars -= len(str_data)
@@ -94,8 +104,11 @@ class SerialReader(serial.threaded.Protocol):
def get_expected_line(self):
if len(self.lines):
- ret = self.lines[0]
- self.lines = []
+ if self.expect_binary:
+ ret = self.lines
+ else:
+ ret = self.lines[0]
+ self.lines = list()
return ret
return None
@@ -107,6 +120,22 @@ class SerialReader(serial.threaded.Protocol):
return None
+class KoradStatus:
+ # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits.
+ # Or they're the wrong bits altogether.
+ # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and
+ # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/>
+ # don't agree on how to parse the status byte.
+ def __init__(self, status_bytes):
+ status_byte = status_bytes[0]
+ self.over_current_protection_enabled = bool(status_byte & 0x20)
+ self.output_enabled = bool(status_byte & 0x40)
+ self.over_voltage_protection_enabled = bool(status_byte & 0x80)
+
+ def __repr__(self):
+ return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>"
+
+
class KA320:
def __init__(self, port, channel=1):
self.ser = serial.serial_for_url(port, do_not_open=True)
@@ -129,21 +158,40 @@ class KA320:
self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
self.worker.start()
- def rw(self, cmd, num_chars, exact=False):
- self.reader.expect(num_chars)
+ def rw(self, cmd, num_chars, exact=False, binary=False):
+ self.reader.expect(num_chars, binary=binary)
self.ser.write(cmd)
- timeout = 10
+ timeout = 20
while not self.reader.read_complete and not timeout == 0:
time.sleep(0.02)
timeout -= 1
if exact:
return self.reader.get_expected_line()
- return self.reader.get_line()
+ elif self.reader.read_complete:
+ return self.reader.get_line()
+ else:
+ return self.reader.recv_buf
# See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands
def connect(self):
- return self.rw(b"*IDN?", 16)
+ # Device ID length is unknown
+ return self.rw(b"*IDN?", 32, exact=False)
+
+ def get_status(self):
+ return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True))
+
+ def ovp(self, enable=True):
+ enable_bit = int(enable)
+ self.ser.write(f"OVP{enable_bit}".encode())
+ time.sleep(0.1)
+ # assert self.get_status().over_voltage_protection_enabled == enable
+
+ def ocp(self, enable=True):
+ enable_bit = int(enable)
+ self.ser.write(f"OCP{enable_bit}".encode())
+ time.sleep(0.1)
+ # assert self.get_status().over_current_protection_enabled == enable
def get_max_voltage(self):
return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True))
@@ -180,7 +228,7 @@ def graceful_exit(sig, frame):
terminate_measurement = True
-def measure_data(port, filename, duration, channel=1):
+def measure_data(port, filename, duration, channel=1, ocp=False, ovp=False):
global terminate_measurement
signal.signal(signal.SIGINT, graceful_exit)
@@ -195,6 +243,14 @@ def measure_data(port, filename, duration, channel=1):
else:
output_handle = tempfile.TemporaryFile("w+")
+ if ovp:
+ print("Enabling over-voltage protection")
+ korad.ovp(True)
+
+ if ocp:
+ print("Enabling over-current protection")
+ korad.ocp(True)
+
if duration:
print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.")
else:
@@ -322,6 +378,18 @@ def main():
)
parser.add_argument("--channel", type=int, default=1, help="Measurement Channel")
parser.add_argument(
+ "--over-current-protection",
+ "--ocp",
+ action="store_true",
+ help="Enable over-current protection",
+ )
+ parser.add_argument(
+ "--over-voltage-protection",
+ "--ovp",
+ action="store_true",
+ help="Enable over-voltage protection",
+ )
+ parser.add_argument(
"--save", metavar="FILE", type=str, help="Save measurement data in FILE"
)
parser.add_argument(
@@ -364,7 +432,12 @@ def main():
log_data = f.read()
else:
log_data = measure_data(
- args.port, args.save, args.duration, channel=args.channel
+ args.port,
+ args.save,
+ args.duration,
+ channel=args.channel,
+ ocp=args.over_current_protection,
+ ovp=args.over_voltage_protection,
)
data = parse_data(log_data, skip=args.skip, limit=args.limit)