#!/usr/bin/env python3 # Copyright (C) 2020 Daniel Friesel # # SPDX-License-Identifier: GPL-2.0-or-later # vim:tabstop=4:softtabstop=4:shiftwidth=4:textwidth=160:smarttab:expandtab """msp430-etv - MSP430 EnergyTrace Visualizer DESCRIPTION msp430-etv takes energy measurements from an MSP430 Launchpad or similar device using MSP430 EnergyTrace technology. Measurements can be taken directly (by specifying in seconds) or loaded from a logfile using --load . Data can be plotted or aggregated on stdout. This program is not affiliated with Texas Instruments. Use at your own risk. DEPENDENCIES For data measurements (i.e., any invocation not using --load), energytrace-util must be available in $PATH and libmsp430.so must be located in the LD library search path (e.g. LD_LIBRARY_PATH=../MSP430Flasher). OPTIONS """ import argparse import itertools import json import numpy as np import os from shutil import which import subprocess import sys import tempfile import time opt = dict() def running_mean(x: np.ndarray, N: int) -> np.ndarray: """ Compute `N` elements wide running average over `x`. :param x: 1-Dimensional NumPy array :param N: how many items to average. Should be even for optimal results. """ # to ensure that output.shape == input.shape, we need to insert data # at the boundaries boundary_array = np.insert(x, 0, np.full((N // 2), x[0])) boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2 - 1), x[-1])) return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid") class NpEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() else: return super(NpEncoder, self).default(obj) def PELT_get_changepoints(algo, penalty): res = (penalty, algo.predict(pen=penalty)) return res class PELT: def __init__(self, signal, num_samples=None): self.signal = signal self.model = "l1" self.jump = 1 self.min_dist = 46 if num_samples is not None: self.ds_factor = len(signal) // num_samples else: self.ds_factor = 1 self.jump = self.ds_factor def norm_signal(self, signal, scaler=25): max_val = max(np.abs(signal)) normed_signal = np.zeros(shape=len(signal)) for i, signal_i in enumerate(signal): normed_signal[i] = signal_i / max_val normed_signal[i] = normed_signal[i] * scaler return normed_signal def get_changepoints(self): # imported here as multiprocessing and ruptures are only used for changepoint detection from multiprocessing import Pool import ruptures algo = ruptures.Pelt( model=self.model, jump=self.jump, min_size=self.min_dist ).fit(self.norm_signal(self.signal)) queue = list() for i in range(0, 100): queue.append((algo, i)) with Pool() as pool: changepoints = pool.starmap(PELT_get_changepoints, queue) changepoints_by_penalty = dict() for res in changepoints: changepoints_by_penalty[res[0]] = res[1] num_changepoints = list() for i in range(0, 100): num_changepoints.append(len(changepoints_by_penalty[i])) # Find plateau start_index = -1 end_index = -1 longest_start = -1 longest_end = -1 prev_val = -1 for i, num_bkpts in enumerate(num_changepoints): if num_bkpts != prev_val: end_index = i - 1 if end_index - start_index > longest_end - longest_start: # currently found sequence is the longest found yet longest_start = start_index longest_end = end_index start_index = i if i == len(num_changepoints) - 1: # end sequence with last value end_index = i # # since it is not guaranteed that this is the end of the plateau, assume the mid # # of the plateau was hit. # size = end_index - start_index # end_index = end_index + size # However this is not the clean solution. Better if search interval is widened # with range_min and range_max if end_index - start_index > longest_end - longest_start: # last found sequence is the longest found yet longest_start = start_index longest_end = end_index start_index = i prev_val = num_bkpts middle_of_plateau = longest_start + (longest_start - longest_start) // 2 changepoints = np.array(changepoints_by_penalty[middle_of_plateau]) return changepoints def measure_data(filename, duration): # libmsp430.so must be available if not "LD_LIBRARY_PATH" in os.environ: os.environ[ "LD_LIBRARY_PATH" ] = "{}/var/projects/msp430/MSP430Flasher_1.3.15".format(os.environ["HOME"]) # https://ess.cs.uos.de/git/df/energytrace-util must be available energytrace_cmd = "energytrace" if which(energytrace_cmd) is None: energytrace_cmd = "{}/var/source/energytrace-util/energytrace64".format( os.environ["HOME"] ) if filename is not None: output_handle = open(filename, "w+") else: output_handle = tempfile.TemporaryFile("w+") energytrace = subprocess.Popen( [energytrace_cmd, str(duration)], stdout=output_handle, universal_newlines=True ) try: if duration: time.sleep(duration) else: print("Press Ctrl+C to stop measurement") while True: time.sleep(3600) except KeyboardInterrupt: energytrace.send_signal(subprocess.signal.SIGTERM) energytrace.communicate(timeout=5) output_handle.seek(0) output = output_handle.read() output_handle.close() return output def export_json(filename, data=dict()): with open(filename, "w") as f: json.dump(data, f, cls=NpEncoder) def detect_changepoints(timestamps, trace, num_samples): pelt = PELT(running_mean(trace, 10), num_samples=num_samples) changepoints = pelt.get_changepoints() prev = 0 ret = list() for cp in changepoints: cp = cp - 1 ret.append( { "interval": [timestamps[prev], timestamps[cp]], "mean": np.mean(trace[prev:cp]), } ) prev = cp return ret def peak_search(data, lower, upper, direction_function): while upper - lower > 1e-6: bs_test = np.mean([lower, upper]) peakcount = itertools.groupby(data, lambda x: x >= bs_test) peakcount = filter(lambda x: x[0] == True, peakcount) peakcount = sum(1 for i in peakcount) direction = direction_function(peakcount, bs_test) if direction == 0: return bs_test elif direction == 1: lower = bs_test else: upper = bs_test return None def peak_search2(data, lower, upper, check_function): for power in np.arange(lower, upper, 1e-6): peakcount = itertools.groupby(data, lambda x: x >= power) peakcount = filter(lambda x: x[0] == True, peakcount) peakcount = sum(1 for i in peakcount) if check_function(peakcount, power) == 0: return power return None def plot_changepoints_vlines(changepoints): X = list() for cp in changepoints: X.append(cp["interval"][1]) return X def main(): parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__ ) parser.add_argument("--load", metavar="FILE", type=str, help="Load data from FILE") parser.add_argument( "--save", metavar="FILE", type=str, help="Save measurement data in FILE" ) parser.add_argument( "--json-export", metavar="FILENAME", type=str, help="Export analysis results (e.g. changepoints) to JSON file", ) parser.add_argument( "--skip", metavar="N", type=float, default=0, help="Skip the first N seconds of data. This is useful to avoid startup code influencing the results of a long-running measurement", ) parser.add_argument( "--limit", type=float, metavar="N", help="Limit analysis to the first N seconds of data", ) parser.add_argument( "--pelt", metavar="FREQ", type=int, help="Perform changepoint detection with FREQ Hz", ) parser.add_argument( "--threshold", metavar="WATTS", type=str, help="Partition data into points with mean power >= WATTS and points with mean power < WATTS, and print some statistics. higher power is handled as peaks, whereas low-power measurements constitute the baseline. If WATTS is 'mean', the mean power of all measurements will be used", ) parser.add_argument( "--threshold-peakcount", metavar="NUM", type=int, help="Automatically determine a threshold so that there are exactly NUM peaks. A peaks is a group of consecutive measurements with mean power >= threshold. WARNING: In general, there is more than one threshold value leading to exactly NUM peaks. If the difference between baseline and peak power is sufficiently high, this option should do what you mean[tm]", ) parser.add_argument( "--plot", metavar="UNIT", choices=["U", "I", "P", "P/U"], help="Plot voltage / current / power over time", ) parser.add_argument( "--stat", action="store_true", help="Print mean voltage, current, and power as well as total energy consumption", ) parser.add_argument( "--histogram", metavar="N", type=int, help="Draw histograms of reported energy values per measurement interval (i.e., the differences between each pair of consecutive total energy readings), measurement interval duration, and mean power values per measurement interval (calculated from energy difference and duration). Each histogram uses N buckets", ) parser.add_argument( "duration", type=int, nargs="?", help="Measurement duration in seconds" ) args = parser.parse_args() if args.load is None and args.duration is None: print("Either --load or duration must be specified", file=sys.stderr) sys.exit(1) if args.threshold is not None and args.threshold != "mean": args.threshold = float(args.threshold) if args.load: if args.load.endswith(".xz"): import lzma with lzma.open(args.load, "rt") as f: log_data = f.read() else: with open(args.load, "r") as f: log_data = f.read() else: log_data = measure_data(args.save, args.duration) lines = log_data.split("\n") data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) data = np.empty((data_count, 4)) skip_offset = 0 limit_index = data_count energy_overflow_count = 0 prev_total_energy = 0 for i, line in enumerate(data_lines): fields = line.split(" ") if len(fields) == 4: timestamp, current, voltage, total_energy = map(int, fields) elif len(fields) == 5: cpustate = fields[0] timestamp, current, voltage, total_energy = map(int, fields[1:]) else: raise RuntimeError('cannot parse line "{}"'.format(line)) if total_energy < 0 and prev_total_energy > 0: energy_overflow_count += 1 prev_total_energy = total_energy total_energy += energy_overflow_count * (2 ** 32) if args.skip is not None and timestamp * 1e-6 < args.skip: skip_offset = i + 1 continue if args.limit is not None and timestamp * 1e-6 > args.limit: limit_index = i - 1 break data[i] = [timestamp, current, voltage, total_energy] data = data[skip_offset:limit_index] m_duration_us = data[-1, 0] - data[0, 0] m_energy_nj = data[-1, 3] - data[0, 3] print( "{:d} measurements in {:.2f} s = {:.0f} Hz sample rate".format( len(data), m_duration_us * 1e-6, len(data) / (m_duration_us * 1e-6) ) ) print("Reported energy: E = {:f} J".format(m_energy_nj * 1e-9)) # nJ / us = mW -> (nJ * 1e-9) / (us * 1e-6) = W # Do not use power = data[:, 1] * data[:, 2] * 1e-12 here: nA values provided by the EnergyTrace library in data[:, 1] are heavily filtered and mostly # useless for visualization and calculation. They often do not agree with the nJ values in data[:, 3]. power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ( (data[1:, 0] - data[:-1, 0]) * 1e-6 ) if args.threshold_peakcount: bs_mean = np.mean(power) # Finding the correct threshold is tricky. If #peaks < peakcont, our # current threshold may be too low (extreme case: a single peaks # containing all measurements), but it may also be too high (extreme # case: a single peak containing just one data point). Similarly, # #peaks > peakcount may be due to baseline noise causing lots of # small peaks, or due to peak noise (if the threshold is already rather # high). # For now, we first try a simple binary search: # The threshold is probably somewhere around the mean, so if # #peaks != peakcount and threshold < mean, we go up, and if # #peaks != peakcount and threshold >= mean, we go down. # If that doesn't work, we fall back to a linear search in 1 µW steps def direction_function(peakcount, power): if peakcount == args.threshold - peakcount: return 0 if power < bs_mean: return 1 return -1 threshold = peak_search(power, np.min(power), np.max(power), direction_function) if threshold == None: threshold = peak_search2( power, np.min(power), np.max(power), direction_function ) if threshold != None: print( "Threshold set to {:.0f} µW : {:.9f}".format( threshold * 1e6, threshold ) ) args.threshold = threshold else: print("Found no working threshold") if args.threshold: if args.threshold == "mean": args.threshold = np.mean(power) print( "Threshold set to {:.0f} µW : {:.9f}".format( args.threshold * 1e6, args.threshold ) ) baseline_mean = 0 if np.any(power < args.threshold): baseline_mean = np.mean(power[power < args.threshold]) print( "Baseline mean: {:.0f} µW : {:.9f}".format( baseline_mean * 1e6, baseline_mean ) ) if np.any(power >= args.threshold): print( "Peak mean: {:.0f} µW : {:.9f}".format( np.mean(power[power >= args.threshold]) * 1e6, np.mean(power[power >= args.threshold]), ) ) peaks = [] peak_start = -1 for i, dp in enumerate(power): if dp >= args.threshold and peak_start == -1: peak_start = i elif dp < args.threshold and peak_start != -1: peaks.append((peak_start, i)) peak_start = -1 total_energy = 0 delta_energy = 0 for peak in peaks: duration = data[peak[1] - 1, 0] - data[peak[0], 0] total_energy += np.mean(power[peak[0] : peak[1]]) * duration delta_energy += ( np.mean(power[peak[0] : peak[1]]) - baseline_mean ) * duration print( "{:.2f}ms peak ({:f} -> {:f})".format( duration * 1000, data[peak[0], 0], data[peak[1] - 1, 0] ) ) print( " {:f} µJ / mean {:f} µW".format( np.mean(power[peak[0] : peak[1]]) * duration * 1e6, np.mean(power[peak[0] : peak[1]]) * 1e6, ) ) print( "Peak energy mean: {:.0f} µJ : {:.9f}".format( total_energy * 1e6 / len(peaks), total_energy / len(peaks) ) ) print( "Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}".format( delta_energy * 1e6 / len(peaks), delta_energy / len(peaks) ) ) power_from_energy = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ( (data[1:, 0] - data[:-1, 0]) * 1e-6 ) smooth_power = running_mean(power_from_energy, 10) if args.pelt: power_changepoints = detect_changepoints( data[1:, 0] * 1e-6, power_from_energy, num_samples=args.pelt ) print(f"Found {len(power_changepoints)} changepoints for power") current_changepoints = detect_changepoints( data[1:, 0] * 1e-6, power_from_energy / (data[1:, 2] * 1e-3), num_samples=args.pelt, ) print(f"Found {len(current_changepoints)} changepoints for current") if args.stat: mean_voltage = np.mean(data[:, 2] * 1e-3) mean_power = np.mean(power_from_energy) current = power_from_energy / (data[1:, 2] * 1e-3) mean_current = np.mean(current) print( "Mean voltage: {:.2f} V : {:.9f}".format(mean_voltage, mean_voltage) ) print( "Mean current: {:.0f} µA : {:.9f}".format( mean_current * 1e6, mean_current ) ) print( "Current prediction error: {:.0f} µA ({:.2f}%)".format( np.mean(np.abs(mean_current - current)) * 1e6, np.mean( np.abs(mean_current - current) / ((np.abs(current) + np.abs(mean_current)) / 2) ) * 100, ) ) print( "Mean power: {:.0f} µW : {:.9f}".format(mean_power * 1e6, mean_power) ) print( "Total energy: {:f} J : {:.9f}".format( m_energy_nj * 1e-9, m_energy_nj * 1e-9 ) ) if args.json_export: extra_data = dict() if args.pelt: extra_data["power_changepoints"] = power_changepoints extra_data["current_changepoints"] = current_changepoints export_json(args.json_export, extra_data) if args.plot: import matplotlib.pyplot as plt if args.plot == "U": # mV (energyhandle,) = plt.plot( data[1:, 0] * 1e-6, data[1:, 2] * 1e-3, "b-", label="U", markersize=1 ) (meanhandle,) = plt.plot( data[1:, 0] * 1e-6, running_mean(data[1:, 2], 10) * 1e-3, "r-", label="mean(U, 10)", markersize=1, ) plt.legend(handles=[energyhandle, meanhandle]) plt.ylabel("Voltage [V]") elif args.plot == "I": print( "Warning: The current reported by energytrace is aggressively smoothed and often inaccurate." ) # nA (energyhandle,) = plt.plot( data[1:, 0] * 1e-6, data[1:, 1] * 1e-9, "b-", label="I", markersize=1 ) (meanhandle,) = plt.plot( data[1:, 0] * 1e-6, running_mean(data[1:, 1], 10) * 1e-9, "r-", label="mean(I, 10)", markersize=1, ) plt.legend(handles=[energyhandle, meanhandle]) plt.ylabel("Current [A]") elif args.plot == "P/U": X = data[1:, 0] * 1e-6 Y = power_from_energy / (data[1:, 2] * 1e-3) (energyhandle,) = plt.plot( X, Y, "b-", label="I=ΔE/(Δt·U)", markersize=1, ) (meanhandle,) = plt.plot( X, smooth_power / (data[1:, 2] * 1e-3), "r-", label="mean(I, 10)", markersize=1, ) if args.pelt: plt.vlines( plot_changepoints_vlines(current_changepoints), np.min(Y), np.max(Y), "g", label="changepoints(I)", ) plt.legend(handles=[energyhandle, meanhandle]) plt.ylabel("Current [A]") else: X = data[1:, 0] * 1e-6 Y = power_from_energy (energyhandle,) = plt.plot( X, Y, "b-", label="P=ΔE/Δt", markersize=1, ) (meanhandle,) = plt.plot( X, smooth_power, "r-", label="mean(P, 10)", markersize=1, ) if args.pelt: plt.vlines( plot_changepoints_vlines(power_changepoints), np.min(Y), np.max(Y), "g", label="changepoints(I)", ) plt.legend(handles=[energyhandle, meanhandle]) plt.ylabel("Power [W]") plt.xlabel("Time [s]") plt.grid(True) if args.load: plt.title(args.load) plt.show() if args.histogram: import matplotlib.pyplot as plt bin_count = args.histogram plt.title("EnergyTrace Data Analysis") plt.xlabel("Reported Energy per Measurement Interval [J]") plt.ylabel("Count") plt.hist((data[1:, 3] - data[:-1, 3]) * 1e-9, bins=bin_count) plt.show() plt.title("EnergyTrace Data Analysis") plt.xlabel("Measurement Interval Duration [s]") plt.ylabel("Count") plt.hist((data[1:, 0] - data[:-1, 0]) * 1e-6, bins=bin_count) plt.show() plt.title("EnergyTrace Data Analysis") plt.xlabel("Mean Power per Measurement Interval [W]") plt.ylabel("Count") plt.hist(power_from_energy, bins=bin_count) plt.show() plt.title("Postprocessing via Running average (window size=10)") plt.xlabel("Mean Power per Measurement Interval [W]") plt.ylabel("Count") plt.hist(smooth_power, bins=bin_count) plt.show() if __name__ == "__main__": main()