1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
#!/usr/bin/env python3
import logging
import numpy as np
from multiprocessing import Pool
from scipy import optimize
from .functions import analytic
from .utils import (
is_numeric,
param_slice_eq,
remove_index_from_tuple,
match_parameter_values,
aggregate_measures,
regression_measures,
)
logger = logging.getLogger(__name__)
class ParallelParamFit:
"""
Fit a set of functions on parameterized measurements.
One parameter is variale, all others are fixed. Reports the best-fitting
function type for each parameter.
"""
def __init__(self):
"""Create a new ParallelParamFit object."""
self.fit_queue = list()
def enqueue(self, key, param, args):
"""
Add state_or_tran/attribute/param_name to fit queue.
This causes fit() to compute the best-fitting function for this model part.
:param key: arbitrary key used to retrieve param result in `get_result`. Typically (state/transition name, model attribute).
Different parameter names may have the same key. Identical parameter names must have different keys.
:param param: parameter name
:param args: [by_param, param_index, safe_functions_enabled, param_filter]
by_param[(param 1, param2, ...)] holds measurements.
"""
self.fit_queue.append({"key": (key, param), "args": args})
def fit(self):
"""
Fit functions on previously enqueue data.
Fitting is one in parallel with one process per core.
Results can be accessed using the public ParallelParamFit.results object.
"""
with Pool() as pool:
self.results = pool.map(_try_fits_parallel, self.fit_queue)
def get_result(self, key):
"""
Parse and sanitize fit results.
Filters out results where the best function is worse (or not much better than) static mean/median estimates.
:param key: arbitrary key used in `enqueue`. Typically (state/transition name, model attribute).
:param param: parameter name
:returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
"""
fit_result = dict()
for result in self.results:
if (
result["key"][0] == key and result["result"]["best"] is not None
): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
this_result = result["result"]
if this_result["best_rmsd"] >= min(
this_result["mean_rmsd"], this_result["median_rmsd"]
):
logger.debug(
"Not modeling {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
result["key"][0],
result["key"][1],
this_result["best_rmsd"],
this_result["mean_rmsd"],
this_result["median_rmsd"],
)
)
# See notes on depends_on_param
elif this_result["best_rmsd"] >= 0.8 * min(
this_result["mean_rmsd"], this_result["median_rmsd"]
):
logger.debug(
"Not modeling {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
result["key"][0],
result["key"][1],
this_result["best_rmsd"],
this_result["mean_rmsd"],
this_result["median_rmsd"],
)
)
else:
fit_result[result["key"][1]] = this_result
return fit_result
def _try_fits_parallel(arg):
"""
Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
Must be a global function as it is called from a multiprocessing Pool.
"""
return {"key": arg["key"], "result": _try_fits(*arg["args"])}
def _try_fits(
n_by_param, param_index, safe_functions_enabled=False, param_filter: dict = None
):
"""
Determine goodness-of-fit for prediction of `n_by_param[(param1_value, param2_value, ...)]` dependence on `param_index` using various functions.
This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
Fitting is only performed if at least three distinct parameter values exist in `by_param[*]`.
:returns: a dictionary with the following elements:
best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
:param n_by_param: measurements of a specific model attribute partitioned by parameter values.
Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}`
:param param_index: index of the parameter used as model input
:param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
:param param_filter: Only use measurements whose parameters match param_filter for fitting.
"""
functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
for param_key in n_by_param.keys():
# We might remove elements from 'functions' while iterating over
# its keys. A generator will not allow this, so we need to
# convert to a list.
function_names = list(functions.keys())
for function_name in function_names:
function_object = functions[function_name]
if is_numeric(param_key[param_index]) and not function_object.is_valid(
param_key[param_index]
):
functions.pop(function_name, None)
raw_results = dict()
raw_results_by_param = dict()
ref_results = {"mean": list(), "median": list()}
results = dict()
results_by_param = dict()
seen_parameter_combinations = set()
# for each parameter combination:
for param_key in filter(
lambda x: remove_index_from_tuple(x, param_index)
not in seen_parameter_combinations
and len(n_by_param[x])
and match_parameter_values(n_by_param[x][0], param_filter),
n_by_param.keys(),
):
X = []
Y = []
num_valid = 0
num_total = 0
# Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
# the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
seen_parameter_combinations.add(remove_index_from_tuple(param_key, param_index))
# for each value of the parameter denoted by param_index (all other parameters remain the same):
for k, v in filter(
lambda kv: param_slice_eq(kv[0], param_key, param_index), n_by_param.items()
):
num_total += 1
if is_numeric(k[param_index]):
num_valid += 1
X.extend([float(k[param_index])] * len(v))
Y.extend(v)
if num_valid > 2:
X = np.array(X)
Y = np.array(Y)
other_parameters = remove_index_from_tuple(k, param_index)
raw_results_by_param[other_parameters] = dict()
results_by_param[other_parameters] = dict()
for function_name, param_function in functions.items():
if function_name not in raw_results:
raw_results[function_name] = dict()
error_function = param_function.error_function
res = optimize.least_squares(
error_function, [0, 1], args=(X, Y), xtol=2e-15
)
measures = regression_measures(param_function.eval(res.x, X), Y)
raw_results_by_param[other_parameters][function_name] = measures
for measure, error_rate in measures.items():
if measure not in raw_results[function_name]:
raw_results[function_name][measure] = list()
raw_results[function_name][measure].append(error_rate)
# print(function_name, res, measures)
mean_measures = aggregate_measures(np.mean(Y), Y)
ref_results["mean"].append(mean_measures["rmsd"])
raw_results_by_param[other_parameters]["mean"] = mean_measures
median_measures = aggregate_measures(np.median(Y), Y)
ref_results["median"].append(median_measures["rmsd"])
raw_results_by_param[other_parameters]["median"] = median_measures
if not len(ref_results["mean"]):
# Insufficient data for fitting
# print('[W] Insufficient data for fitting {}'.format(param_index))
return {"best": None, "best_rmsd": np.inf, "results": results}
for (
other_parameter_combination,
other_parameter_results,
) in raw_results_by_param.items():
best_fit_val = np.inf
best_fit_name = None
results = dict()
for function_name, result in other_parameter_results.items():
if len(result) > 0:
results[function_name] = result
rmsd = result["rmsd"]
if rmsd < best_fit_val:
best_fit_val = rmsd
best_fit_name = function_name
results_by_param[other_parameter_combination] = {
"best": best_fit_name,
"best_rmsd": best_fit_val,
"mean_rmsd": results["mean"]["rmsd"],
"median_rmsd": results["median"]["rmsd"],
"results": results,
}
best_fit_val = np.inf
best_fit_name = None
results = dict()
for function_name, result in raw_results.items():
if len(result) > 0:
results[function_name] = {}
for measure in result.keys():
results[function_name][measure] = np.mean(result[measure])
rmsd = results[function_name]["rmsd"]
if rmsd < best_fit_val:
best_fit_val = rmsd
best_fit_name = function_name
return {
"best": best_fit_name,
"best_rmsd": best_fit_val,
"mean_rmsd": np.mean(ref_results["mean"]),
"median_rmsd": np.mean(ref_results["median"]),
"results": results,
"results_by_other_param": results_by_param,
}
|