From 30d7cc1b5d75f98eaa0ea5ae97f4f42eef57ad72 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Wed, 20 May 2020 08:32:12 +0200 Subject: autoformat the code (using black(1)) --- bin/msp430-etv | 253 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 160 insertions(+), 93 deletions(-) diff --git a/bin/msp430-etv b/bin/msp430-etv index 5356f01..624f91f 100755 --- a/bin/msp430-etv +++ b/bin/msp430-etv @@ -15,6 +15,7 @@ import time opt = dict() + def running_mean(x: np.ndarray, N: int) -> np.ndarray: """ Compute `N` elements wide running average over `x`. @@ -31,28 +32,35 @@ def running_mean(x: np.ndarray, N: int) -> np.ndarray: cumsum = np.cumsum(boundary_array) return (cumsum[N:] - cumsum[:-N]) / N + def measure_data(filename, duration): # libmsp430.so must be available - if not 'LD_LIBRARY_PATH' in os.environ: - os.environ['LD_LIBRARY_PATH'] = '{}/var/projects/msp430/MSP430Flasher_1.3.15'.format(os.environ['HOME']) + if not "LD_LIBRARY_PATH" in os.environ: + os.environ[ + "LD_LIBRARY_PATH" + ] = "{}/var/projects/msp430/MSP430Flasher_1.3.15".format(os.environ["HOME"]) # https://github.com/carrotIndustries/energytrace-util must be available - energytrace_cmd = 'energytrace' + energytrace_cmd = "energytrace" if which(energytrace_cmd) is None: - energytrace_cmd = '{}/var/source/energytrace-util/energytrace64'.format(os.environ['HOME']) + energytrace_cmd = "{}/var/source/energytrace-util/energytrace64".format( + os.environ["HOME"] + ) if filename is not None: - output_handle = open(filename, 'w+') + output_handle = open(filename, "w+") else: - output_handle = tempfile.TemporaryFile('w+') + output_handle = tempfile.TemporaryFile("w+") - energytrace = subprocess.Popen([energytrace_cmd, str(duration)], stdout = output_handle, universal_newlines = True) + energytrace = subprocess.Popen( + [energytrace_cmd, str(duration)], stdout=output_handle, universal_newlines=True + ) try: if duration: time.sleep(duration) else: - print('Press Ctrl+C to stop measurement') + print("Press Ctrl+C to stop measurement") while True: time.sleep(3600) except KeyboardInterrupt: @@ -66,8 +74,10 @@ def measure_data(filename, duration): return output + def show_help(): - print('''msp430-etv - MSP430 EnergyTrace Visualizer + print( + """msp430-etv - MSP430 EnergyTrace Visualizer USAGE @@ -117,7 +127,9 @@ For data measurements (i.e., any invocation not using --load), energytrace-util must be available in $PATH and libmsp430.so must be located in the LD library search path (e.g. LD_LIBRARY_PATH=../MSP430Flasher). - ''') + """ + ) + def peak_search(data, lower, upper, direction_function): while upper - lower > 1e-6: @@ -134,6 +146,7 @@ def peak_search(data, lower, upper, direction_function): upper = bs_test return None + def peak_search2(data, lower, upper, check_function): for power in np.arange(lower, upper, 1e-6): peakcount = itertools.groupby(data, lambda x: x >= power) @@ -143,61 +156,62 @@ def peak_search2(data, lower, upper, check_function): return power return None -if __name__ == '__main__': + +if __name__ == "__main__": try: - optspec = ('help load= save= skip= threshold= threshold-peakcount= plot stat histogram=') - raw_opts, args = getopt.getopt(sys.argv[1:], "", optspec.split(' ')) + optspec = "help load= save= skip= threshold= threshold-peakcount= plot stat histogram=" + raw_opts, args = getopt.getopt(sys.argv[1:], "", optspec.split(" ")) for option, parameter in raw_opts: - optname = re.sub(r'^--', '', option) + optname = re.sub(r"^--", "", option) opt[optname] = parameter - if 'help' in opt: + if "help" in opt: show_help() sys.exit(0) - if not 'load' in opt: + if not "load" in opt: duration = int(args[0]) - if not 'save' in opt: - opt['save'] = None + if not "save" in opt: + opt["save"] = None - if 'skip' in opt: - opt['skip'] = int(opt['skip']) + if "skip" in opt: + opt["skip"] = int(opt["skip"]) else: - opt['skip'] = 0 + opt["skip"] = 0 - if 'threshold' in opt and opt['threshold'] != 'mean': - opt['threshold'] = float(opt['threshold']) + if "threshold" in opt and opt["threshold"] != "mean": + opt["threshold"] = float(opt["threshold"]) - if 'threshold-peakcount' in opt: - opt['threshold-peakcount'] = int(opt['threshold-peakcount']) + if "threshold-peakcount" in opt: + opt["threshold-peakcount"] = int(opt["threshold-peakcount"]) except getopt.GetoptError as err: print(err) sys.exit(2) except IndexError: - print('Usage: msp430-etv ') + print("Usage: msp430-etv ") sys.exit(2) except ValueError: - print('Error: duration or skip is not a number') + print("Error: duration or skip is not a number") sys.exit(2) - if 'load' in opt: - with open(opt['load'], 'r') as f: + if "load" in opt: + with open(opt["load"], "r") as f: log_data = f.read() else: - log_data = measure_data(opt['save'], duration) + log_data = measure_data(opt["save"], duration) - lines = log_data.split('\n') - data_count = sum(map(lambda x: len(x) > 0 and x[0] != '#', lines)) - data_lines = filter(lambda x: len(x) > 0 and x[0] != '#', lines) + lines = log_data.split("\n") + data_count = sum(map(lambda x: len(x) > 0 and x[0] != "#", lines)) + data_lines = filter(lambda x: len(x) > 0 and x[0] != "#", lines) - data = np.empty((data_count - opt['skip'], 4)) + data = np.empty((data_count - opt["skip"], 4)) for i, line in enumerate(data_lines): - if i >= opt['skip']: - fields = line.split(' ') + if i >= opt["skip"]: + fields = line.split(" ") if len(fields) == 4: timestamp, current, voltage, total_energy = map(int, fields) elif len(fields) == 5: @@ -205,24 +219,28 @@ if __name__ == '__main__': timestamp, current, voltage, total_energy = map(int, fields[1:]) else: raise RuntimeError('cannot parse line "{}"'.format(line)) - data[i - opt['skip']] = [timestamp, current, voltage, total_energy] + data[i - opt["skip"]] = [timestamp, current, voltage, total_energy] m_duration_us = data[-1, 0] - data[0, 0] m_energy_nj = data[-1, 3] - data[0, 3] # mV * nA * us = aJ (1e-18 J) -> use factor 1e-6 to get pJ (1e-12 J) - print('{:d} measurements in {:.2f} s = {:.0f} Hz sample rate'.format( - data_count, m_duration_us * 1e-6, data_count / (m_duration_us * 1e-6))) + print( + "{:d} measurements in {:.2f} s = {:.0f} Hz sample rate".format( + data_count, m_duration_us * 1e-6, data_count / (m_duration_us * 1e-6) + ) + ) - print('Reported energy: E = {:f} J'.format(m_energy_nj * 1e-9)) + print("Reported energy: E = {:f} J".format(m_energy_nj * 1e-9)) # nJ / us = mW -> (nJ * 1e-9) / (us * 1e-6) = W # Do not use power = data[:, 1] * data[:, 2] * 1e-12 here: nA values provided by the EnergyTrace library in data[:, 1] are heavily filtered and mostly # useless for visualization and calculation. They often do not agree with the nJ values in data[:, 3]. - power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ((data[1:, 0] - data[:-1, 0]) * 1e-6) - + power = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ( + (data[1:, 0] - data[:-1, 0]) * 1e-6 + ) - if 'threshold-peakcount' in opt: + if "threshold-peakcount" in opt: bs_mean = np.mean(power) # Finding the correct threshold is tricky. If #peaks < peakcont, our @@ -238,85 +256,134 @@ if __name__ == '__main__': # #peaks != peakcount and threshold >= mean, we go down. # If that doesn't work, we fall back to a linear search in 1 µW steps def direction_function(peakcount, power): - if peakcount == opt['threshold-peakcount']: - return 0; + if peakcount == opt["threshold-peakcount"]: + return 0 if power < bs_mean: - return 1; - return -1; + return 1 + return -1 + threshold = peak_search(power, np.min(power), np.max(power), direction_function) if threshold == None: - threshold = peak_search2(power, np.min(power), np.max(power), direction_function) + threshold = peak_search2( + power, np.min(power), np.max(power), direction_function + ) if threshold != None: - print('Threshold set to {:.0f} µW : {:.9f}'.format(threshold * 1e6, threshold)) - opt['threshold'] = threshold + print( + "Threshold set to {:.0f} µW : {:.9f}".format( + threshold * 1e6, threshold + ) + ) + opt["threshold"] = threshold else: - print('Found no working threshold') + print("Found no working threshold") - if 'threshold' in opt: - if opt['threshold'] == 'mean': - opt['threshold'] = np.mean(power) - print('Threshold set to {:.0f} µW : {:.9f}'.format(opt['threshold'] * 1e6, opt['threshold'])) + if "threshold" in opt: + if opt["threshold"] == "mean": + opt["threshold"] = np.mean(power) + print( + "Threshold set to {:.0f} µW : {:.9f}".format( + opt["threshold"] * 1e6, opt["threshold"] + ) + ) baseline_mean = 0 - if np.any(power < opt['threshold']): - baseline_mean = np.mean(power[power < opt['threshold']]) - print('Baseline mean: {:.0f} µW : {:.9f}'.format( - baseline_mean * 1e6, baseline_mean)) - if np.any(power >= opt['threshold']): - print('Peak mean: {:.0f} µW : {:.9f}'.format( - np.mean(power[power >= opt['threshold']]) * 1e6, - np.mean(power[power >= opt['threshold']]))) + if np.any(power < opt["threshold"]): + baseline_mean = np.mean(power[power < opt["threshold"]]) + print( + "Baseline mean: {:.0f} µW : {:.9f}".format( + baseline_mean * 1e6, baseline_mean + ) + ) + if np.any(power >= opt["threshold"]): + print( + "Peak mean: {:.0f} µW : {:.9f}".format( + np.mean(power[power >= opt["threshold"]]) * 1e6, + np.mean(power[power >= opt["threshold"]]), + ) + ) peaks = [] peak_start = -1 for i, dp in enumerate(power): - if dp >= opt['threshold'] and peak_start == -1: + if dp >= opt["threshold"] and peak_start == -1: peak_start = i - elif dp < opt['threshold'] and peak_start != -1: + elif dp < opt["threshold"] and peak_start != -1: peaks.append((peak_start, i)) peak_start = -1 total_energy = 0 delta_energy = 0 for peak in peaks: - duration = data[peak[1]-1, 0] - data[peak[0], 0] + duration = data[peak[1] - 1, 0] - data[peak[0], 0] total_energy += np.mean(power[peak[0] : peak[1]]) * duration - delta_energy += (np.mean(power[peak[0] : peak[1]]) - baseline_mean) * duration - print('{:.2f}ms peak ({:f} -> {:f})'.format(duration * 1000, - data[peak[0], 0], data[peak[1]-1, 0])) - print(' {:f} µJ / mean {:f} µW'.format( - np.mean(power[peak[0] : peak[1]]) * duration * 1e6, - np.mean(power[peak[0] : peak[1]]) * 1e6 )) - print('Peak energy mean: {:.0f} µJ : {:.9f}'.format( - total_energy * 1e6 / len(peaks), total_energy / len(peaks))) - print('Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}'.format( - delta_energy * 1e6 / len(peaks), delta_energy / len(peaks))) - - power_from_energy = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ((data[1:, 0] - data[:-1, 0]) * 1e-6) + delta_energy += ( + np.mean(power[peak[0] : peak[1]]) - baseline_mean + ) * duration + print( + "{:.2f}ms peak ({:f} -> {:f})".format( + duration * 1000, data[peak[0], 0], data[peak[1] - 1, 0] + ) + ) + print( + " {:f} µJ / mean {:f} µW".format( + np.mean(power[peak[0] : peak[1]]) * duration * 1e6, + np.mean(power[peak[0] : peak[1]]) * 1e6, + ) + ) + print( + "Peak energy mean: {:.0f} µJ : {:.9f}".format( + total_energy * 1e6 / len(peaks), total_energy / len(peaks) + ) + ) + print( + "Average per-peak energy (delta over baseline): {:.0f} µJ : {:.9f}".format( + delta_energy * 1e6 / len(peaks), delta_energy / len(peaks) + ) + ) + + power_from_energy = ((data[1:, 3] - data[:-1, 3]) * 1e-9) / ( + (data[1:, 0] - data[:-1, 0]) * 1e-6 + ) mean_power = running_mean(power_from_energy, 10) - if 'stat' in opt: + if "stat" in opt: mean_voltage = np.mean(data[:, 2] * 1e-3) mean_current = np.mean(data[:, 1] * 1e-9) mean_power = np.mean(data[:, 1] * data[:, 2] * 1e-12) - print('Mean voltage: {:.2f} V : {:.9f}'.format(mean_voltage, mean_voltage)) - print('Mean current: {:.0f} µA : {:.9f}'.format(mean_current * 1e6, mean_current)) - print('Mean power: {:.0f} µW : {:.9f}'.format(mean_power * 1e6, mean_power)) - print('Total energy: {:f} J : {:.9f}'.format(m_energy_nj * 1e-9, m_energy_nj * 1e-9)) - - if 'plot' in opt: + print( + "Mean voltage: {:.2f} V : {:.9f}".format(mean_voltage, mean_voltage) + ) + print( + "Mean current: {:.0f} µA : {:.9f}".format( + mean_current * 1e6, mean_current + ) + ) + print( + "Mean power: {:.0f} µW : {:.9f}".format(mean_power * 1e6, mean_power) + ) + print( + "Total energy: {:f} J : {:.9f}".format( + m_energy_nj * 1e-9, m_energy_nj * 1e-9 + ) + ) + + if "plot" in opt: # nA * mV = pW - energyhandle, = plt.plot(data[1:, 0] * 1e-6, power_from_energy, 'b-', label='P=ΔE/Δt', markersize=1) - meanhandle, = plt.plot(data[1:, 0] * 1e-6, mean_power, 'r-', label='mean(P, 10)', markersize=1) + (energyhandle,) = plt.plot( + data[1:, 0] * 1e-6, power_from_energy, "b-", label="P=ΔE/Δt", markersize=1 + ) + (meanhandle,) = plt.plot( + data[1:, 0] * 1e-6, mean_power, "r-", label="mean(P, 10)", markersize=1 + ) plt.legend(handles=[energyhandle, meanhandle]) - plt.xlabel('Time [s]') - plt.ylabel('Power [W]') + plt.xlabel("Time [s]") + plt.ylabel("Power [W]") plt.grid(True) - if 'load' in opt: - plt.title(opt['load']) + if "load" in opt: + plt.title(opt["load"]) plt.show() - if 'histogram' in opt: - plt.hist((data[1:, 3] - data[:-1, 3]) * 1e-9, bins=int(opt['histogram'])) + if "histogram" in opt: + plt.hist((data[1:, 3] - data[:-1, 3]) * 1e-9, bins=int(opt["histogram"])) plt.show() -- cgit v1.2.3