#!/usr/bin/env python3
# vim:tabstop=4:softtabstop=4:shiftwidth=4:textwidth=160:smarttab:expandtab
"""msp430-etv - MSP430 EnergyTrace Visualizer.

Takes EnergyTrace measurements by running the external ``energytrace``
utility (or loads a previously saved log), then prints statistics, performs
threshold-based peak analysis, and/or plots power over time.

Each data line of a log consists of four space-separated numbers which this
script reads as timestamp, current, voltage and total energy. Empty lines and
lines starting with '#' are ignored.
"""

import getopt
import itertools
import os
import re
import subprocess
import sys
import tempfile
from shutil import which

import numpy as np

# Command-line options, filled by main(). Keys are the option names without
# the leading '--'.
opt = dict()


def measure_data(filename, time):
    """Run energytrace for ``time`` seconds and return its output as a string.

    Parameters:
        filename: path the raw output is saved to, or None to use an
            anonymous temporary file which is discarded afterwards.
        time: measurement duration, passed to energytrace as its only
            argument.

    Returns:
        Everything energytrace wrote to stdout.
    """
    # libmsp430.so must be available
    if 'LD_LIBRARY_PATH' not in os.environ:
        os.environ['LD_LIBRARY_PATH'] = '{}/var/projects/msp430/MSP430Flasher_1.3.15'.format(os.environ['HOME'])

    # https://github.com/carrotIndustries/energytrace-util must be available
    energytrace_cmd = 'energytrace'
    if which(energytrace_cmd) is None:
        energytrace_cmd = '{}/var/source/energytrace-util/energytrace64'.format(os.environ['HOME'])

    if filename is not None:
        output_handle = open(filename, 'w+')
    else:
        output_handle = tempfile.TemporaryFile('w+')

    # The context manager closes the handle even if energytrace cannot be started.
    with output_handle:
        subprocess.run([energytrace_cmd, str(time)], stdout=output_handle, universal_newlines=True)
        # The child wrote through the shared file descriptor; rewind to read it back.
        output_handle.seek(0)
        return output_handle.read()


def show_help():
    """Print the usage / help text to stdout."""
    print('''msp430-etv - MSP430 EnergyTrace Visualizer

USAGE

msp430-etv [--load <file> | <duration>] [--save <file>] [--skip <count>] [--threshold <power>] [--plot] [--stat]

DESCRIPTION

msp430-etv takes energy measurements from an MSP430 Launchpad or similar device
using MSP430 EnergyTrace technology. Measurements can be taken directly (by
specifying <duration> in seconds) or loaded from a logfile using --load <file>.
Data can be plotted or aggregated on stdout.

OPTIONS

  --load <file>
    Load data from <file>
  --save <file>
    Save measurement data in <file>
  --skip <count>
    Skip <count> data samples. This is useful to avoid startup code
    influencing the results of a long-running measurement
  --threshold <power>|mean
    Partition data into points with mean power >= <power> and points with
    mean power < <power>, and print some statistics. Higher power is handled
    as peaks, whereas low-power measurements constitute the baseline.
    If the threshold is set to "mean", the mean power of all measurements
    will be used
  --threshold-peakcount <num>
    Automatically determine a threshold so that there are exactly <num> peaks.
    A peak is a group of consecutive measurements with mean power >= threshold.
    WARNING: In general, there is more than one threshold value leading to
    exactly <num> peaks. If the difference between baseline and peak power
    is sufficiently high, this option should do what you mean[tm]
  --plot
    Draw power/time plot
  --stat
    Print mean voltage, current, and power as well as total energy consumption.

DEPENDENCIES

For data measurements (i.e., any invocation not using --load),
energytrace-util must be available in $PATH
and libmsp430.so must be located in the LD library search path
(e.g. LD_LIBRARY_PATH=../MSP430Flasher).
''')


def _count_peaks(data, threshold):
    """Return the number of runs of consecutive values in data that are >= threshold."""
    runs = itertools.groupby(data, lambda x: x >= threshold)
    return sum(1 for is_peak, _ in runs if is_peak)


def peak_search(data, lower, upper, direction_function):
    """Bisect the interval [lower, upper] for a threshold accepted by direction_function.

    direction_function(peakcount, threshold) must return 0 to accept the
    threshold, 1 to continue in the upper half, and anything else to continue
    in the lower half.

    Returns the accepted threshold, or None once the interval has shrunk to
    1e-6 or less without a match.
    """
    while upper - lower > 1e-6:
        bs_test = np.mean([lower, upper])
        direction = direction_function(_count_peaks(data, bs_test), bs_test)
        if direction == 0:
            return bs_test
        if direction == 1:
            lower = bs_test
        else:
            upper = bs_test
    return None


def peak_search2(data, lower, upper, check_function):
    """Scan [lower, upper) in steps of 1e-6 for a threshold accepted by check_function.

    check_function(peakcount, threshold) must return 0 to accept the threshold.

    Returns the first accepted threshold, or None if there is none.
    """
    for power in np.arange(lower, upper, 1e-6):
        if check_function(_count_peaks(data, power), power) == 0:
            return power
    return None


def _parse_arguments(argv):
    """Parse argv into the global opt dict and return the measurement duration.

    The duration is None when --load was given. Exits with status 0 after
    printing the help text, and with status 2 on invalid arguments.
    """
    opt.clear()
    duration = None

    try:
        optspec = 'help load= save= skip= threshold= threshold-peakcount= plot stat'
        raw_opts, args = getopt.getopt(argv, "", optspec.split(' '))

        for option, parameter in raw_opts:
            optname = re.sub(r'^--', '', option)
            opt[optname] = parameter

        if 'help' in opt:
            show_help()
            sys.exit(0)

        if 'load' not in opt:
            duration = int(args[0])

        if 'save' not in opt:
            opt['save'] = None

        if 'skip' in opt:
            opt['skip'] = int(opt['skip'])
        else:
            opt['skip'] = 0

        if 'threshold' in opt and opt['threshold'] != 'mean':
            opt['threshold'] = float(opt['threshold'])

        if 'threshold-peakcount' in opt:
            opt['threshold-peakcount'] = int(opt['threshold-peakcount'])

    except getopt.GetoptError as err:
        print(err)
        sys.exit(2)
    except IndexError:
        print('Usage: msp430-etv <duration>')
        sys.exit(2)
    except ValueError:
        print('Error: duration or skip is not a number')
        sys.exit(2)

    # A negative skip would leave uninitialised rows in the data array.
    if opt['skip'] < 0:
        print('Error: skip must not be negative')
        sys.exit(2)

    return duration


def _parse_log(log_data, skip):
    """Convert log text into an array with one row per sample.

    Returns (data, data_count): data has the columns timestamp, current,
    voltage, total energy and omits the first ``skip`` samples; data_count is
    the number of data lines in the log including the skipped ones.

    Exits with status 2 if no sample is left after skipping.
    """
    data_lines = [line for line in log_data.split('\n') if len(line) > 0 and line[0] != '#']
    data_count = len(data_lines)

    if data_count <= skip:
        print('Error: no measurement data left after skipping {:d} of {:d} samples'.format(skip, data_count))
        sys.exit(2)

    data = np.empty((data_count - skip, 4))
    for i, line in enumerate(data_lines[skip:]):
        # Unpacking into four names rejects lines with a different number of fields.
        timestamp, current, voltage, total_energy = map(float, line.split(' '))
        data[i] = [timestamp, current, voltage, total_energy]

    return data, data_count


def _find_peaks(power, threshold):
    """Return (start, end) index pairs of runs with power >= threshold; end is exclusive.

    A run which is still open at the last sample is not reported.
    """
    peaks = []
    peak_start = -1
    for i, dp in enumerate(power):
        if dp >= threshold and peak_start == -1:
            peak_start = i
        elif dp < threshold and peak_start != -1:
            peaks.append((peak_start, i))
            peak_start = -1
    return peaks


def _determine_threshold(power):
    """Set opt['threshold'] so that power contains opt['threshold-peakcount'] peaks, if possible."""
    bs_mean = np.mean(power)

    # Finding the correct threshold is tricky. If #peaks < peakcount, our
    # current threshold may be too low (extreme case: a single peak
    # containing all measurements), but it may also be too high (extreme
    # case: a single peak containing just one data point). Similarly,
    # #peaks > peakcount may be due to baseline noise causing lots of
    # small peaks, or due to peak noise (if the threshold is already rather
    # high).
    # For now, we first try a simple binary search:
    # The threshold is probably somewhere around the mean, so if
    # #peaks != peakcount and threshold < mean, we go up, and if
    # #peaks != peakcount and threshold >= mean, we go down.
    # If that doesn't work, we fall back to a linear search in 1 µW steps
    def direction_function(peakcount, candidate):
        if peakcount == opt['threshold-peakcount']:
            return 0
        if candidate < bs_mean:
            return 1
        return -1

    threshold = peak_search(power, np.min(power), np.max(power), direction_function)
    if threshold is None:
        threshold = peak_search2(power, np.min(power), np.max(power), direction_function)

    if threshold is not None:
        print('Threshold set to {:.0f} µW : {:.9f}'.format(threshold * 1e6, threshold))
        opt['threshold'] = threshold
    else:
        print('Found no working threshold')


def _report_threshold(data, power):
    """Print baseline / peak statistics for the threshold stored in opt['threshold']."""
    if opt['threshold'] == 'mean':
        opt['threshold'] = np.mean(power)
        print('Threshold set to {:.0f} µW : {:.9f}'.format(opt['threshold'] * 1e6, opt['threshold']))
    threshold = opt['threshold']

    baseline_mean = 0
    below = power < threshold
    if np.any(below):
        baseline_mean = np.mean(power[below])
        print('Baseline mean: {:.0f} µW : {:.9f}'.format(baseline_mean * 1e6, baseline_mean))

    above = power >= threshold
    if np.any(above):
        peak_power_mean = np.mean(power[above])
        print('Peak mean: {:.0f} µW : {:.9f}'.format(peak_power_mean * 1e6, peak_power_mean))

    peaks = _find_peaks(power, threshold)

    total_energy = 0
    delta_energy = 0
    for start, end in peaks:
        peak_duration = data[end - 1, 0] - data[start, 0]
        peak_mean = np.mean(power[start:end])
        total_energy += peak_mean * peak_duration
        delta_energy += (peak_mean - baseline_mean) * peak_duration
        print('{:.2f}ms peak ({:f} -> {:f})'.format(peak_duration * 1000, data[start, 0], data[end - 1, 0]))
        print(' {:f} µJ / mean {:f} µW'.format(peak_mean * peak_duration * 1e6, peak_mean * 1e6))

    # Without this guard, the per-peak averages divide by zero.
    if not peaks:
        print('No peaks found')
        return

    print('Peak energy mean: {:.0f} µJ : {:.9f}'.format(
        total_energy * 1e6 / len(peaks), total_energy / len(peaks)))
    print('Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}'.format(
        delta_energy * 1e6 / len(peaks), delta_energy / len(peaks)))


def _plot_power(data, power):
    """Show a power-over-time plot."""
    # Imported here so that matplotlib is only required when --plot is used.
    import matplotlib.pyplot as plt

    pwrhandle, = plt.plot(data[:, 0], power, 'b-', label='U*I', markersize=1)
    # Alternative trace derived from the reported energy column (disabled):
    # energyhandle, = plt.plot(data[1:, 0], (data[1:, 3] - data[:-1, 3]) / (data[1:, 0] - data[:-1, 0]), 'r-', label='E/Δt', markersize=1)
    plt.legend(handles=[pwrhandle])
    plt.xlabel('Time [s]')
    plt.ylabel('Power [W]')
    plt.grid(True)
    plt.show()


def main(argv=None):
    """Run msp430-etv with the given argument list (defaults to sys.argv[1:])."""
    if argv is None:
        argv = sys.argv[1:]

    duration = _parse_arguments(argv)

    if 'load' in opt:
        with open(opt['load'], 'r') as f:
            log_data = f.read()
    else:
        log_data = measure_data(opt['save'], duration)

    data, data_count = _parse_log(log_data, opt['skip'])

    m_duration = data[-1, 0] - data[0, 0]
    m_energy = data[-1, 3] - data[0, 3]
    # Energy recomputed from the samples: current * voltage * time since the previous sample.
    m_calc_energy = np.sum(data[1:, 1] * data[1:, 2] * (data[1:, 0] - data[:-1, 0]))
    m_energy_deviation = np.abs(m_energy - m_calc_energy) / np.max([m_energy, m_calc_energy])

    # NOTE: data_count includes skipped samples, whereas m_duration does not.
    print('{:d} measurements in {:.2f} s = {:.0f} Hz sample rate'.format(
        data_count, m_duration, data_count / m_duration))
    print('Reported energy: E = {:f} J'.format(m_energy))
    print('Calculated energy: U*I*t = {:f} J'.format(m_calc_energy))
    print('Energy deviation: {:.1f}%'.format(m_energy_deviation * 100))

    power = data[:, 1] * data[:, 2]

    if 'threshold-peakcount' in opt:
        _determine_threshold(power)

    if 'threshold' in opt:
        _report_threshold(data, power)

    if 'stat' in opt:
        mean_voltage = np.mean(data[:, 2])
        mean_current = np.mean(data[:, 1])
        mean_power = np.mean(power)
        print('Mean voltage: {:.2f} V : {:.9f}'.format(mean_voltage, mean_voltage))
        print('Mean current: {:.0f} µA : {:.9f}'.format(mean_current * 1e6, mean_current))
        print('Mean power: {:.0f} µW : {:.9f}'.format(mean_power * 1e6, mean_power))
        print('Total energy: {:f} J : {:.9f}'.format(m_energy, m_energy))

    if 'plot' in opt:
        _plot_power(data, power)


if __name__ == '__main__':
    main()