From 9aeb58cb7c64446d76c597641acbfe81e5156e47 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Wed, 26 Aug 2020 18:10:20 +0200 Subject: Initial Commit --- bin/dlog-viewer | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100755 bin/dlog-viewer (limited to 'bin') diff --git a/bin/dlog-viewer b/bin/dlog-viewer new file mode 100755 index 0000000..98773ae --- /dev/null +++ b/bin/dlog-viewer @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +# vim:tabstop=4:softtabstop=4:shiftwidth=4:textwidth=160:smarttab:expandtab + +"""dlog-viewer - View and Convert Keysight .dlog Files + +USAGE + +dlog-viewer [--csv-export ] [--plot] [--stat] + +DESCRIPTION + +dlog-viewer loads voltage, current, and/or power measurements from .dlog files +produced by devices such as the Keysight N6705B DC Power Analyzer. +Measurements can be exported to CSV or plotted on-screen. + +This program is not affiliated with Keysight and has not been thoroughly +tested yet. Use at your own risk. + +OPTIONS + + --csv-export + Export measurements as CSV to + --plot + Draw plots of voltage/current/power over time + --stat + Print mean voltage, current, and power + +""" + +import argparse +import csv +import lzma +import matplotlib.pyplot as plt +import numpy as np +import os +import struct +import sys +import xml.etree.ElementTree as ET + + +def running_mean(x: np.ndarray, N: int) -> np.ndarray: + """ + Compute `N` elements wide running average over `x`. + + :param x: 1-Dimensional NumPy array + :param N: how many items to average. Should be even for optimal results. + """ + + # to ensure that output.shape == input.shape, we need to insert data + # at the boundaries + boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) + boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) + + return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") + + +class DLogChannel: + def __init__(self, desc_tuple): + self.slot = desc_tuple[0] + self.smu = desc_tuple[1] + self.unit = desc_tuple[2] + self.data = None + + def __repr__(self): + return f"""""" + + +class DLog: + def __init__(self, filename): + self.load_dlog(filename) + + def load_dlog(self, filename): + lines = [] + line = "" + + with open(filename, "rb") as f: + if ".xz" in filename: + f = lzma.open(f) + + while line != "\n": + line = f.readline().decode() + lines.append(line) + xml_header = "".join(lines) + raw_header = f.read(8) + data_offset = f.tell() + raw_data = f.read() + + xml_header = xml_header.replace("1ua>", "X1ua>") + xml_header = xml_header.replace("2ua>", "X2ua>") + dlog = ET.fromstring(xml_header) + channels = [] + for channel in dlog.findall("channel"): + channel_id = int(channel.get("id")) + sense_curr = channel.find("sense_curr").text + sense_volt = channel.find("sense_volt").text + model = channel.find("ident").find("model").text + if sense_volt == "1": + channels.append((channel_id, model, "V")) + if sense_curr == "1": + channels.append((channel_id, model, "A")) + + num_channels = len(channels) + + self.channels = list(map(DLogChannel, channels)) + self.interval = float(dlog.find("frame").find("tint").text) + self.planned_duration = int(dlog.find("frame").find("time").text) + self.observed_duration = self.interval * int(len(raw_data) / (4 * num_channels)) + + self.timestamps = np.linspace( + 0, self.observed_duration, num=int(len(raw_data) / (4 * num_channels)) + ) + + if int(self.observed_duration) != self.planned_duration: + self.duration_deviates = True + else: + self.duration_deviates = False + + self.data = np.ndarray( + shape=(num_channels, int(len(raw_data) / (4 * num_channels))), + dtype=np.float32, + ) + + iterator = struct.iter_unpack(">f", raw_data) + channel_offset = 0 + measurement_offset = 0 + for value in iterator: + self.data[channel_offset, measurement_offset] = value[0] + if channel_offset + 1 == num_channels: + channel_offset = 0 + measurement_offset += 1 + else: + channel_offset += 1 + + # An SMU has four slots + self.slots = [dict(), dict(), dict(), dict()] + + for i, channel in enumerate(self.channels): + channel.data = self.data[i] + self.slots[channel.slot - 1][channel.unit] = channel + + def slot_has_data(self, slot): + return len(self.slots[slot - 1]) > 0 + + def slot_has_power(self, slot): + slot_data = self.slots[slot - 1] + if "W" in slot_data: + return True + if "V" in slot_data and "A" in slot_data: + return True + return False + + def all_data_slots_have_power(self): + for slot in range(4): + if self.slot_has_data(slot) and not self.slot_has_power(slot): + return False + return True + + +def print_stats(dlog): + for channel in dlog.channels: + min_data = np.min(channel.data) + max_data = np.max(channel.data) + mean_data = np.mean(channel.data) + if channel.unit == "V": + precision = 3 + else: + precision = 6 + print(f"Slot {channel.slot} ({channel.smu}):") + print(f" Min {min_data:.{precision}f} {channel.unit}") + print(f" Mean {mean_data:.{precision}f} {channel.unit}") + print(f" Max {max_data:.{precision}f} {channel.unit}") + print() + + +def show_power_plot(dlog): + + handles = list() + + for slot in dlog.slots: + if "W" in slot: + (handle,) = plt.plot( + dlog.timestamps, slot["W"].data, "b-", label="P", markersize=1 + ) + handles.append(handle) + (handle,) = plt.plot( + dlog.timestamps, + running_mean(slot["W"].data, 10), + "r-", + label="mean(P, 10)", + markersize=1, + ) + handles.append(handle) + elif "V" in slot and "A" in slot: + (handle,) = plt.plot( + dlog.timestamps, + slot["V"].data * slot["A"].data, + "b-", + label="P = U * I", + markersize=1, + ) + handles.append(handle) + (handle,) = plt.plot( + dlog.timestamps, + running_mean(slot["V"].data * slot["A"].data, 10), + "r-", + label="mean(P, 10)", + markersize=1, + ) + handles.append(handle) + + plt.legend(handles=handles) + plt.xlabel("Time [s]") + plt.ylabel("Power [W]") + plt.grid(True) + plt.show() + + +def show_raw_plot(dlog): + handles = list() + + for channel in dlog.channels: + label = f"{channel.slot} / {channel.smu} {channel.unit}" + (handle,) = plt.plot( + dlog.timestamps, channel.data, "b-", label=label, markersize=1 + ) + handles.append(handle) + (handle,) = plt.plot( + dlog.timestamps, + running_mean(channel.data, 10), + "r-", + label=f"mean({label}, 10)", + markersize=1, + ) + handles.append(handle) + + plt.legend(handles=handles) + plt.xlabel("Time [s]") + plt.ylabel("Voltage [V] / Current [A] / Power [W]") + plt.grid(True) + plt.show() + + +def export_csv(dlog, filename): + cols, rows = dlog.data.shape + with open(filename, "w", newline="") as f: + writer = csv.writer(f) + channel_header = list( + map(lambda x: f"Slot {x.slot} {x.unit} ({x.smu})", dlog.channels) + ) + writer.writerow(["Timestamp [s]"] + channel_header) + for row in range(rows): + writer.writerow([dlog.timestamps[row]] + list(dlog.data[:, row])) + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ + ) + parser.add_argument( + "--csv-export", type=str, help="Export measurements to CSV file" + ) + parser.add_argument( + "--plot", + help="Draw plots of voltage/current/power overtime", + action="store_true", + ) + parser.add_argument( + "--stat", help="Print mean voltage, current, and power", action="store_true" + ) + parser.add_argument( + "dlog_file", type=str, help="Input filename in Keysight dlog format" + ) + + args = parser.parse_args() + + print(args) + + dlog = DLog(args.dlog_file) + + if dlog.duration_deviates: + print( + "Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format( + dlog.observed_duration, dlog.planned_duration, dlog.interval * 1000000 + ) + ) + else: + print( + "Measurement duration: {:d} seconds at {:f} µs per sample".format( + dlog.planned_duration, dlog.interval * 1000000 + ) + ) + + if args.stat: + print_stats(dlog) + + if args.csv_export: + export_csv(dlog, args.csv_export) + + if args.plot: + if dlog.all_data_slots_have_power() and False: + show_power_plot(dlog) + else: + show_raw_plot(dlog) + + +if __name__ == "__main__": + main() -- cgit v1.2.3