#!/usr/bin/env python3 import lzma import matplotlib.pyplot as plt import numpy as np import os import struct import sys import xml.etree.ElementTree as ET def plot_y(Y, **kwargs): plot_xy(np.arange(len(Y)), Y, **kwargs) def plot_xy(X, Y, xlabel=None, ylabel=None, title=None, output=None): fig, ax1 = plt.subplots(figsize=(10, 6)) if title != None: fig.canvas.set_window_title(title) if xlabel != None: ax1.set_xlabel(xlabel) if ylabel != None: ax1.set_ylabel(ylabel) plt.subplots_adjust(left=0.1, bottom=0.1, right=0.99, top=0.99) plt.plot(X, Y, "bo", markersize=2) if output: plt.savefig(output) with open("{}.txt".format(output), "w") as f: print("X Y", file=f) for i in range(len(X)): print("{} {}".format(X[i], Y[i]), file=f) else: plt.show() filename = sys.argv[1] with open(filename, "rb") as logfile: lines = [] line = "" if ".xz" in filename: f = lzma.open(logfile) else: f = logfile while line != "\n": line = f.readline().decode() lines.append(line) xml_header = "".join(lines) raw_header = f.read(8) data_offset = f.tell() raw_data = f.read() xml_header = xml_header.replace("1ua>", "X1ua>") xml_header = xml_header.replace("2ua>", "X2ua>") dlog = ET.fromstring(xml_header) channels = [] for channel in dlog.findall("channel"): channel_id = int(channel.get("id")) sense_curr = channel.find("sense_curr").text sense_volt = channel.find("sense_volt").text model = channel.find("ident").find("model").text if sense_volt == "1": channels.append((channel_id, model, "V")) if sense_curr == "1": channels.append((channel_id, model, "A")) num_channels = len(channels) duration = int(dlog.find("frame").find("time").text) interval = float(dlog.find("frame").find("tint").text) real_duration = interval * int(len(raw_data) / (4 * num_channels)) data = np.ndarray( shape=(num_channels, int(len(raw_data) / (4 * num_channels))), dtype=np.float32 ) iterator = struct.iter_unpack(">f", raw_data) channel_offset = 0 measurement_offset = 0 for value in iterator: data[channel_offset, measurement_offset] = value[0] if channel_offset + 1 == num_channels: channel_offset = 0 measurement_offset += 1 else: channel_offset += 1 if int(real_duration) != duration: print( "Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format( real_duration, duration, interval * 1000000 ) ) else: print( "Measurement duration: {:d} seconds at {:f} µs per sample".format( duration, interval * 1000000 ) ) for i, channel in enumerate(channels): channel_id, channel_model, channel_type = channel print( "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} {:s}".format( channel_id, channel_model, np.min(data[i]), np.max(data[i]), np.mean(data[i]), channel_type, ) ) if ( i > 0 and channel_type == "A" and channels[i - 1][2] == "V" and channel_id == channels[i - 1][0] ): power = data[i - 1] * data[i] power = 3.6 * data[i] print( "channel {:d} ({:s}): min {:f}, max {:f}, mean {:f} W".format( channel_id, channel_model, np.min(power), np.max(power), np.mean(power) ) ) min_power = np.min(power) max_power = np.max(power) power_border = np.mean([min_power, max_power]) low_power = power[power < power_border] high_power = power[power >= power_border] plot_y(power) print( " avg low / high power (delta): {:f} / {:f} ({:f}) W".format( np.mean(low_power), np.mean(high_power), np.mean(high_power) - np.mean(low_power), ) ) # plot_y(low_power) # plot_y(high_power) high_power_durations = [] current_high_power_duration = 0 for is_hpe in power >= power_border: if is_hpe: current_high_power_duration += interval else: if current_high_power_duration > 0: high_power_durations.append(current_high_power_duration) current_high_power_duration = 0 print( " avg high-power duration: {:f} µs".format( np.mean(high_power_durations) * 1000000 ) ) # print(xml_header) # print(raw_header) # print(channels) # print(data) # print(np.mean(data[0])) # print(np.mean(data[1])) # print(np.mean(data[0] * data[1]))