author | Daniel Friesel <derf@finalrewind.org> | 2018-02-20 12:29:09 +0100 |
---|---|---|
committer | Daniel Friesel <derf@finalrewind.org> | 2018-02-20 12:29:09 +0100 |
commit | 0bd0e95ede23063b3a18eb15888b61c73fa3a25d (patch) | |
tree | 9d6d7d3d8895b0a9edac842a887da68c3e7c6069 | |
parent | aa2e27cce9c4d70240408f9d5a85bbec36ee017d (diff) | |
run fit checks in parallel
-rwxr-xr-x | lib/dfatool.py | 244 |
1 file changed, 157 insertions, 87 deletions
```diff
diff --git a/lib/dfatool.py b/lib/dfatool.py
index 56eac4b..59446da 100755
--- a/lib/dfatool.py
+++ b/lib/dfatool.py
@@ -13,6 +13,8 @@ import sys
 import tarfile
 from multiprocessing import Pool
 
+arg_support_enabled = False
+
 def running_mean(x, N):
     cumsum = np.cumsum(np.insert(x, 0, 0))
     return (cumsum[N:] - cumsum[:-N]) / N
@@ -42,11 +44,17 @@ def float_or_nan(n):
     except ValueError:
         return np.nan
 
-def _param_dict_to_list(param_dict):
+def _elem_param_and_arg_list(elem):
+    param_dict = elem['parameter']
     paramkeys = sorted(param_dict.keys())
     paramvalue = [_soft_cast_int(param_dict[x]) for x in paramkeys]
+    if arg_support_enabled and 'args' in elem:
+        paramvalue.extend(map(_soft_cast_int, elem['args']))
     return paramvalue
 
+def _arg_name(arg_index):
+    return '~arg{:02}'.format(arg_index)
+
 def append_if_set(aggregate, data, key):
     if key in data:
         aggregate.append(data[key])
@@ -256,6 +264,12 @@ class RawData:
             paramkeys = sorted(online_trace_part['parameter'].keys())
             paramvalue = [_soft_cast_int(online_trace_part['parameter'][x]) for x in paramkeys]
 
+            # NB: Unscheduled transitions do not have an 'args' field set.
+            # However, they should only be caused by interrupts, and
+            # interrupts don't have args anyways.
+            if arg_support_enabled and 'args' in online_trace_part:
+                paramvalue.extend(map(_soft_cast_int, online_trace_part['args']))
+
             if not 'offline_aggregates' in online_trace_part:
                 online_trace_part['offline_aggregates'] = {
                     'power' : [],
@@ -269,7 +283,6 @@ class RawData:
                     online_trace_part['offline_aggregates']['timeout'] = []
                     online_trace_part['offline_aggregates']['rel_energy_prev'] = []
                     online_trace_part['offline_aggregates']['rel_energy_next'] = []
-                    online_trace_part['offline_aggregates']['args'] = []
 
             # Note: All state/transitions are 20us "too long" due to injected
             # active wait states. These are needed to work around MIMOSA's
@@ -291,13 +304,6 @@ class RawData:
                     offline_trace_part['uW_mean_delta_prev'] * (offline_trace_part['us'] - 20))
                 online_trace_part['offline_aggregates']['rel_energy_next'].append(
                     offline_trace_part['uW_mean_delta_next'] * (offline_trace_part['us'] - 20))
-                # Unscheduled transitions do not have an 'args' field set.
-                # However, they should only be caused by interrupts, and
-                # interrupts don't have args anyways.
-                if 'args' in online_trace_part:
-                    online_trace_part['offline_aggregates']['args'].append(online_trace_part['args'])
-                else:
-                    online_trace_part['offline_aggregates']['args'].append([])
 
     def _concatenate_analyzed_traces(self):
         self.traces = []
@@ -377,19 +383,23 @@ class ParamFunction:
 
 class AnalyticFunction:
 
-    def __init__(self, function_str, num_vars, parameters):
+    def __init__(self, function_str, num_vars, parameters, num_args):
         self._parameter_names = parameters
         self._model_str = function_str
         rawfunction = function_str
-        self._dependson = [False] * len(parameters)
+        self._dependson = [False] * (len(parameters) + num_args)
+
+        for i in range(0, num_args):
+            if rawfunction.find('parameter({})'.format(_arg_name(i))) >= 0:
+                rawfunction = rawfunction.replace('parameter({})'.format(_arg_name(i)), 'function_arg({:d})'.format(i))
 
         for i in range(len(parameters)):
             if rawfunction.find('parameter({})'.format(parameters[i])) >= 0:
                 self._dependson[i] = True
                 rawfunction = rawfunction.replace('parameter({})'.format(parameters[i]), 'model_param[{:d}]'.format(i))
-        for i in range(0, 0):
+        for i in range(0, num_args):
             if rawfunction.find('function_arg({:d})'.format(i)) >= 0:
-                self._dependson[i] = True
+                self._dependson[len(parameters) + i] = True
                 rawfunction = rawfunction.replace('function_arg({:d})'.format(i), 'model_param[{:d}]'.format(len(parameters) + i))
         for i in range(num_vars):
             rawfunction = rawfunction.replace('regression_arg({:d})'.format(i), 'reg_param[{:d}]'.format(i))
@@ -539,7 +549,7 @@ class analytic:
             return 'np.sqrt({})'.format(ref_str)
         return 'analytic._{}({})'.format(function_type, ref_str)
 
-    def function_powerset(function_descriptions, parameter_names):
+    def function_powerset(function_descriptions, parameter_names, num_args):
         buf = '0'
         arg_idx = 0
         for combination in powerset(function_descriptions.items()):
@@ -547,7 +557,7 @@ class analytic:
             arg_idx += 1
             for function_item in combination:
                 buf += ' * {}'.format(analytic._fmap('parameter', function_item[0], function_item[1]['best']))
-        return AnalyticFunction(buf, arg_idx, parameter_names)
+        return AnalyticFunction(buf, arg_idx, parameter_names, num_args)
 
     #def function_powerset(function_descriptions):
     #    function_buffer = lambda param, arg: 0
@@ -560,6 +570,74 @@ class analytic:
    #            new_function = lambda param, arg: param[param_idx] *
    #        function_buffer = lambda param, arg: function_buffer(param, arg)
 
+
+def _try_fits_parallel(arg):
+    return {
+        'key' : arg['key'],
+        'result' : _try_fits(*arg['args'])
+    }
+
+
+def _try_fits(by_param, state_or_tran, model_attribute, param_index):
+    functions = analytic.functions()
+
+
+    for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
+        # We might remove elements from 'functions' while iterating over
+        # its keys. A generator will not allow this, so we need to
+        # convert to a list.
+        function_names = list(functions.keys())
+        for function_name in function_names:
+            function_object = functions[function_name]
+            if is_numeric(param_key[1][param_index]) and not function_object.is_valid(param_key[1][param_index]):
+                functions.pop(function_name, None)
+
+    raw_results = {}
+    results = {}
+
+    for param_key in filter(lambda x: x[0] == state_or_tran, by_param.keys()):
+        X = []
+        Y = []
+        num_valid = 0
+        num_total = 0
+        for k, v in by_param.items():
+            if _param_slice_eq(k, param_key, param_index):
+                num_total += 1
+                if is_numeric(k[1][param_index]):
+                    num_valid += 1
+                    X.extend([float(k[1][param_index])] * len(v[model_attribute]))
+                    Y.extend(v[model_attribute])
+
+        if num_valid > 2:
+            X = np.array(X)
+            Y = np.array(Y)
+            for function_name, param_function in functions.items():
+                raw_results[function_name] = {}
+                error_function = param_function.error_function
+                res = optimize.least_squares(error_function, [0, 1], args=(X, Y), xtol=2e-15)
+                measures = regression_measures(param_function.eval(res.x, X), Y)
+                for measure, error_rate in measures.items():
+                    if not measure in raw_results[function_name]:
+                        raw_results[function_name][measure] = []
+                    raw_results[function_name][measure].append(error_rate)
+                #print(function_name, res, measures)
+
+    best_fit_val = np.inf
+    best_fit_name = None
+    for function_name, result in raw_results.items():
+        if len(result) > 0:
+            results[function_name] = {}
+            for measure in result.keys():
+                results[function_name][measure] = np.mean(result[measure])
+            rmsd = results[function_name]['rmsd']
+            if rmsd < best_fit_val:
+                best_fit_val = rmsd
+                best_fit_name = function_name
+
+    return {
+        'best' : best_fit_name,
+        'results' : results
+    }
+
 class EnergyModel:
 
     def __init__(self, preprocessed_data):
@@ -570,77 +648,37 @@ class EnergyModel:
         self.stats = {}
         np.seterr('raise')
         self._parameter_names = sorted(self.traces[0]['trace'][0]['parameter'].keys())
+        self._num_args = {}
         for runidx, run in enumerate(self.traces):
             # if opts['ignore-trace-idx'] != runidx
             for i, elem in enumerate(run['trace']):
                 if elem['name'] != 'UNINITIALIZED':
                     self._load_run_elem(i, elem)
+                    if elem['isa'] == 'transition' and not elem['name'] in self._num_args and 'args' in elem:
+                        self._num_args[elem['name']] = len(elem['args'])
         self._aggregate_to_ndarray(self.by_name)
         for state_or_trans in self.by_name.keys():
             for key in ['power', 'energy', 'duration', 'timeout', 'rel_energy_prev', 'rel_energy_next']:
                 if key in self.by_name[state_or_trans]:
                     self._compute_param_statistics(state_or_trans, key)
 
-    def _try_fits(self, state_or_tran, model_attribute, param_index):
-        functions = analytic.functions()
-
-
-        for param_key in filter(lambda x: x[0] == state_or_tran, self.by_param.keys()):
-            # We might remove elements from 'functions' while iterating over
-            # its keys. A generator will not allow this, so we need to
-            # convert to a list.
-            function_names = list(functions.keys())
-            for function_name in function_names:
-                function_object = functions[function_name]
-                if is_numeric(param_key[1][param_index]) and not function_object.is_valid(param_key[1][param_index]):
-                    functions.pop(function_name, None)
-
-        raw_results = {}
-        results = {}
-
-        for param_key in filter(lambda x: x[0] == state_or_tran, self.by_param.keys()):
-            X = []
-            Y = []
-            num_valid = 0
-            num_total = 0
-            for k, v in self.by_param.items():
-                if _param_slice_eq(k, param_key, param_index):
-                    num_total += 1
-                    if is_numeric(k[1][param_index]):
-                        num_valid += 1
-                        X.extend([float(k[1][param_index])] * len(v[model_attribute]))
-                        Y.extend(v[model_attribute])
-
-            if num_valid > 2:
-                X = np.array(X)
-                Y = np.array(Y)
-                for function_name, param_function in functions.items():
-                    raw_results[function_name] = {}
-                    error_function = param_function.error_function
-                    res = optimize.least_squares(error_function, [0, 1], args=(X, Y), xtol=2e-15)
-                    measures = regression_measures(param_function.eval(res.x, X), Y)
-                    for measure, error_rate in measures.items():
-                        if not measure in raw_results[function_name]:
-                            raw_results[function_name][measure] = []
-                        raw_results[function_name][measure].append(error_rate)
-                    #print(function_name, res, measures)
-
-        best_fit_val = np.inf
-        best_fit_name = None
-        for function_name, result in raw_results.items():
-            if len(result) > 0:
-                results[function_name] = {}
-                for measure in result.keys():
-                    results[function_name][measure] = np.mean(result[measure])
-                rmsd = results[function_name]['rmsd']
-                if rmsd < best_fit_val:
-                    best_fit_val = rmsd
-                    best_fit_name = function_name
-
-        return {
-            'best' : best_fit_name,
-            'results' : results
-        }
+    @classmethod
+    def from_model(self, model_data, parameter_names):
+        self.by_name = {}
+        self.by_param = {}
+        self.stats = {}
+        np.seterr('raise')
+        self._parameter_names = parameter_names
+        for state_or_tran, values in model_data.items():
+            for elem in values:
+                self._load_agg_elem(state_or_tran, elem)
+                #if elem['isa'] == 'transition' and not state_or_tran in self._num_args and 'args' in elem:
+                #    self._num_args = len(elem['args'])
+        self._aggregate_to_ndarray(self.by_name)
+        for state_or_trans in self.by_name.keys():
+            for key in ['power', 'energy', 'duration', 'timeout', 'rel_energy_prev', 'rel_energy_next']:
+                if key in self.by_name[state_or_trans]:
+                    self._compute_param_statistics(state_or_trans, key)
 
     def _aggregate_to_ndarray(self, aggregate):
         for elem in aggregate.values():
@@ -660,18 +698,12 @@ class EnergyModel:
                     aggregate[key][datakey].extend(dataval)
 
     def _load_agg_elem(self, name, elem):
-        args = []
-        if 'args' in elem:
-            args = elem['args']
         self._add_data_to_aggregate(self.by_name, name, elem)
-        self._add_data_to_aggregate(self.by_param, (name, tuple(elem['param']), tuple(args)), elem)
+        self._add_data_to_aggregate(self.by_param, (name, tuple(elem['param'])), elem)
 
    def _load_run_elem(self, i, elem):
-        args = []
-        if 'args' in elem:
-            args = elem['args']
         self._add_data_to_aggregate(self.by_name, elem['name'], elem)
-        self._add_data_to_aggregate(self.by_param, (elem['name'], tuple(_param_dict_to_list(elem['parameter'])), tuple(args)), elem)
+        self._add_data_to_aggregate(self.by_param, (elem['name'], tuple(_elem_param_and_arg_list(elem))), elem)
 
     def _compute_param_statistics(self, state_or_trans, key):
         if not state_or_trans in self.stats:
@@ -689,6 +721,9 @@ class EnergyModel:
         for param_idx, param in enumerate(self._parameter_names):
             self.stats[state_or_trans][key]['std_by_param'][param] = \
                 self._mean_std_by_param(state_or_trans, key, param_idx)
+        if arg_support_enabled and self.by_name[state_or_trans]['isa'] == 'transition':
+            for arg_index in range(self._num_args[state_or_trans]):
+                self.stats[state_or_trans][key]['std_by_param'][_arg_name(arg_index)] = self._mean_std_by_param(state_or_trans, key, len(self._parameter_names) + arg_index)
 
     # returns the mean standard deviation of all measurements of 'what'
     # (e.g. power consumption or timeout) for state/transition 'name' where
@@ -761,7 +796,8 @@ class EnergyModel:
         lut_model = self._get_model_from_dict(self.by_param, np.median)
 
         def lut_median_getter(name, key, param, arg = [], **kwargs):
-            return lut_model[(name, tuple(param), tuple(arg))][key]
+            param.extend(map(_soft_cast_int, arg))
+            return lut_model[(name, tuple(param))][key]
 
         return lut_median_getter
@@ -771,7 +807,9 @@ class EnergyModel:
     def get_fitted(self):
         static_model = self._get_model_from_dict(self.by_name, np.median)
         param_model = dict([[state_or_tran, {}] for state_or_tran in self.by_name.keys()])
+        fit_queue = []
         for state_or_tran in self.by_name.keys():
+            num_args = 0
             if self.by_name[state_or_tran]['isa'] == 'state':
                 attributes = ['power']
             else:
@@ -780,10 +818,42 @@ class EnergyModel:
                 fit_results = {}
                 for parameter_index, parameter_name in enumerate(self._parameter_names):
                     if self.param_dependence_ratio(state_or_tran, model_attribute, parameter_name) > 0.5:
-                        fit_results[parameter_name] = self._try_fits(state_or_tran, model_attribute, parameter_index)
+                        fit_queue.append({
+                            'key' : [state_or_tran, model_attribute, parameter_name],
+                            'args' : [self.by_param, state_or_tran, model_attribute, parameter_index]
+                        })
+                        #fit_results[parameter_name] = _try_fits(self.by_param, state_or_tran, model_attribute, parameter_index)
                         #print('{} {} is {}'.format(state_or_tran, parameter_name, fit_results[parameter_name]['best']))
+                if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition':
+                    num_args = self._num_args[state_or_tran]
+                    for arg_index in range(self._num_args[state_or_tran]):
+                        if self.param_dependence_ratio(state_or_tran, model_attribute, _arg_name(arg_index)) > 0.5:
+                            fit_queue.append({
+                                'key' : [state_or_tran, model_attribute, _arg_name(arg_index)],
+                                'args' : [self.by_param, state_or_tran, model_attribute, len(self._parameter_names) + arg_index]
+                            })
+                            #fit_results[_arg_name(arg_index)] = _try_fits(self.by_param, state_or_tran, model_attribute, len(self._parameter_names) + arg_index)
+                #if 'args' in self.by_name[state_or_tran]:
+                #    for i, arg in range(len(self.by_name
+
+        with Pool() as pool:
+            all_fit_results = pool.map(_try_fits_parallel, fit_queue)
+
+        for state_or_tran in self.by_name.keys():
+            num_args = 0
+            if arg_support_enabled and self.by_name[state_or_tran]['isa'] == 'transition':
+                num_args = self._num_args[state_or_tran]
+            if self.by_name[state_or_tran]['isa'] == 'state':
+                attributes = ['power']
+            else:
+                attributes = ['energy', 'duration', 'timeout', 'rel_energy_prev', 'rel_energy_next']
+            for model_attribute in attributes:
+                fit_results = {}
+                for result in all_fit_results:
+                    if result['key'][0] == state_or_tran and result['key'][1] == model_attribute:
+                        fit_results[result['key'][2]] = result['result']
                 if len(fit_results.keys()):
-                    x = analytic.function_powerset(fit_results, self._parameter_names)
+                    x = analytic.function_powerset(fit_results, self._parameter_names, num_args)
                     x.fit(self.by_param, state_or_tran, model_attribute)
                     param_model[state_or_tran][model_attribute] = {
                        'fit_result': fit_results,
@@ -825,7 +895,7 @@ class EnergyModel:
                 results[name]['power'] = measures
             else:
                 for key in ['duration', 'energy', 'rel_energy_prev', 'rel_energy_next', 'timeout']:
-                    predicted_data = np.array(list(map(lambda i: model_function(name, key, param=elem['param'][i], arg=elem['args'][i]), range(len(elem[key])))))
+                    predicted_data = np.array(list(map(lambda i: model_function(name, key, param=elem['param'][i]), range(len(elem[key])))))
                     measures = regression_measures(predicted_data, elem[key])
                     results[name][key] = measures
         return results
```
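
The gist of the change: instead of calling `self._try_fits()` inline, `get_fitted()` now collects one job dict per candidate fit (`'key'` naming the state/transition, model attribute, and parameter; `'args'` holding the arguments for `_try_fits()`) and dispatches the whole queue via `multiprocessing.Pool.map()` to the module-level worker `_try_fits_parallel()`. The sketch below shows that dispatch pattern in isolation; `_do_fit()`, `_fit_worker()`, and the example queue entries are placeholders invented for illustration and are not part of dfatool.

```python
# Standalone sketch of the queue-and-Pool dispatch used in this commit.
# _do_fit() and the queue contents are placeholders; the job layout
# ({'key': ..., 'args': ...}), the module-level worker, and pool.map()
# mirror _try_fits_parallel() / get_fitted() in lib/dfatool.py.
from multiprocessing import Pool


def _do_fit(xs, ys):
    # Placeholder for the real least-squares fit performed by _try_fits().
    return sum(ys) / len(ys)


def _fit_worker(job):
    # Like _try_fits_parallel(): run the fit and return it together with
    # its key, so the caller can sort the results back into its own tables.
    return {'key': job['key'], 'result': _do_fit(*job['args'])}


if __name__ == '__main__':
    fit_queue = [
        {'key': ['TX', 'energy', 'txpower'], 'args': ([1, 2, 3], [10.0, 19.5, 30.2])},
        {'key': ['TX', 'duration', 'txpower'], 'args': ([1, 2, 3], [5.0, 5.1, 4.9])},
    ]
    with Pool() as pool:
        all_fit_results = pool.map(_fit_worker, fit_queue)
    for result in all_fit_results:
        print(result['key'], '->', result['result'])
```

Keeping the worker at module level means `pool.map()` only has to pickle plain job dicts and a free function rather than a bound method together with the whole `EnergyModel` object, which is presumably why `_try_fits` was moved out of the class in this commit.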