summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Friesel <daniel.friesel@uos.de>2021-03-02 14:05:43 +0100
committerDaniel Friesel <daniel.friesel@uos.de>2021-03-02 14:05:43 +0100
commit6e999370180f4060d1c9cb4cef4500570cb6d787 (patch)
treeb43f8e8463bff8bbff6813cf451503c371660c75
parent6383ef088e91273c68cd77e8215ad3920dc50ea1 (diff)
Move ParallelParamFit to a separate module
-rw-r--r--lib/model.py325
-rw-r--r--lib/paramfit.py261
-rw-r--r--lib/utils.py74
3 files changed, 338 insertions, 322 deletions
diff --git a/lib/model.py b/lib/model.py
index 24d5bc1..e97cfbb 100644
--- a/lib/model.py
+++ b/lib/model.py
@@ -3,332 +3,13 @@
import logging
import numpy as np
import os
-from scipy import optimize
-from sklearn.metrics import r2_score
-from multiprocessing import Pool
from .automata import PTA, ModelAttribute
-from .functions import analytic, StaticInfo
+from .functions import StaticInfo
from .parameters import ParallelParamStats
-from .utils import is_numeric, soft_cast_int, param_slice_eq, remove_index_from_tuple
-from .utils import by_name_to_by_param, match_parameter_values
+from .paramfit import ParallelParamFit
+from .utils import soft_cast_int, by_name_to_by_param, regression_measures
logger = logging.getLogger(__name__)
-arg_support_enabled = True
-
-
-def aggregate_measures(aggregate: float, actual: list) -> dict:
- """
- Calculate error measures for model value on data list.
-
- arguments:
- aggregate -- model value (float or int)
- actual -- real-world / reference values (list of float or int)
-
- return value:
- See regression_measures
- """
- aggregate_array = np.array([aggregate] * len(actual))
- return regression_measures(aggregate_array, np.array(actual))
-
-
-def regression_measures(predicted: np.ndarray, actual: np.ndarray):
- """
- Calculate error measures by comparing model values to reference values.
-
- arguments:
- predicted -- model values (np.ndarray)
- actual -- real-world / reference values (np.ndarray)
-
- Returns a dict containing the following measures:
- mae -- Mean Absolute Error
- mape -- Mean Absolute Percentage Error,
- if all items in actual are non-zero (NaN otherwise)
- smape -- Symmetric Mean Absolute Percentage Error,
- if no 0,0-pairs are present in actual and predicted (NaN otherwise)
- msd -- Mean Square Deviation
- rmsd -- Root Mean Square Deviation
- ssr -- Sum of Squared Residuals
- rsq -- R^2 measure, see sklearn.metrics.r2_score
- count -- Number of values
- """
- if type(predicted) != np.ndarray:
- raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
- if type(actual) != np.ndarray:
- raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
- deviations = predicted - actual
- # mean = np.mean(actual)
- if len(deviations) == 0:
- return {}
- measures = {
- "mae": np.mean(np.abs(deviations), dtype=np.float64),
- "msd": np.mean(deviations ** 2, dtype=np.float64),
- "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
- "ssr": np.sum(deviations ** 2, dtype=np.float64),
- "rsq": r2_score(actual, predicted),
- "count": len(actual),
- }
-
- # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
-
- if np.all(actual != 0):
- measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
- else:
- measures["mape"] = np.nan
- if np.all(np.abs(predicted) + np.abs(actual) != 0):
- measures["smape"] = (
- np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
- * 100
- )
- else:
- measures["smape"] = np.nan
- # if np.all(rsq_quotient != 0):
- # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
-
- return measures
-
-
-class ParallelParamFit:
- """
- Fit a set of functions on parameterized measurements.
-
- One parameter is variale, all others are fixed. Reports the best-fitting
- function type for each parameter.
- """
-
- def __init__(self):
- """Create a new ParallelParamFit object."""
- self.fit_queue = list()
-
- def enqueue(self, key, param, args):
- """
- Add state_or_tran/attribute/param_name to fit queue.
-
- This causes fit() to compute the best-fitting function for this model part.
-
- :param key: arbitrary key used to retrieve param result in `get_result`. Typically (state/transition name, model attribute).
- Different parameter names may have the same key. Identical parameter names must have different keys.
- :param param: parameter name
- :param args: [by_param, param_index, safe_functions_enabled, param_filter]
- by_param[(param 1, param2, ...)] holds measurements.
- """
- self.fit_queue.append({"key": (key, param), "args": args})
-
- def fit(self):
- """
- Fit functions on previously enqueue data.
-
- Fitting is one in parallel with one process per core.
-
- Results can be accessed using the public ParallelParamFit.results object.
- """
- with Pool() as pool:
- self.results = pool.map(_try_fits_parallel, self.fit_queue)
-
- def get_result(self, key):
- """
- Parse and sanitize fit results.
-
- Filters out results where the best function is worse (or not much better than) static mean/median estimates.
-
- :param key: arbitrary key used in `enqueue`. Typically (state/transition name, model attribute).
- :param param: parameter name
- :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
- """
- fit_result = dict()
- for result in self.results:
- if (
- result["key"][0] == key and result["result"]["best"] is not None
- ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
- this_result = result["result"]
- if this_result["best_rmsd"] >= min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
- result["key"][0],
- result["key"][1],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- # See notes on depends_on_param
- elif this_result["best_rmsd"] >= 0.8 * min(
- this_result["mean_rmsd"], this_result["median_rmsd"]
- ):
- logger.debug(
- "Not modeling {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
- result["key"][0],
- result["key"][1],
- this_result["best_rmsd"],
- this_result["mean_rmsd"],
- this_result["median_rmsd"],
- )
- )
- else:
- fit_result[result["key"][1]] = this_result
- return fit_result
-
-
-def _try_fits_parallel(arg):
- """
- Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
-
- Must be a global function as it is called from a multiprocessing Pool.
- """
- return {"key": arg["key"], "result": _try_fits(*arg["args"])}
-
-
-def _try_fits(
- n_by_param, param_index, safe_functions_enabled=False, param_filter: dict = None
-):
- """
- Determine goodness-of-fit for prediction of `n_by_param[(param1_value, param2_value, ...)]` dependence on `param_index` using various functions.
-
- This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
- The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
- Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
- Fitting is only performed if at least three distinct parameter values exist in `by_param[*]`.
-
- :returns: a dictionary with the following elements:
- best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
- best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
- mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
- median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
- results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
-
- :param n_by_param: measurements of a specific model attribute partitioned by parameter values.
- Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}`
-
- :param param_index: index of the parameter used as model input
- :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
- :param param_filter: Only use measurements whose parameters match param_filter for fitting.
- """
-
- functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
-
- for param_key in n_by_param.keys():
- # We might remove elements from 'functions' while iterating over
- # its keys. A generator will not allow this, so we need to
- # convert to a list.
- function_names = list(functions.keys())
- for function_name in function_names:
- function_object = functions[function_name]
- if is_numeric(param_key[param_index]) and not function_object.is_valid(
- param_key[param_index]
- ):
- functions.pop(function_name, None)
-
- raw_results = dict()
- raw_results_by_param = dict()
- ref_results = {"mean": list(), "median": list()}
- results = dict()
- results_by_param = dict()
-
- seen_parameter_combinations = set()
-
- # for each parameter combination:
- for param_key in filter(
- lambda x: remove_index_from_tuple(x, param_index)
- not in seen_parameter_combinations
- and len(n_by_param[x])
- and match_parameter_values(n_by_param[x][0], param_filter),
- n_by_param.keys(),
- ):
- X = []
- Y = []
- num_valid = 0
- num_total = 0
-
- # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
- # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
- seen_parameter_combinations.add(remove_index_from_tuple(param_key, param_index))
-
- # for each value of the parameter denoted by param_index (all other parameters remain the same):
- for k, v in filter(
- lambda kv: param_slice_eq(kv[0], param_key, param_index), n_by_param.items()
- ):
- num_total += 1
- if is_numeric(k[param_index]):
- num_valid += 1
- X.extend([float(k[param_index])] * len(v))
- Y.extend(v)
-
- if num_valid > 2:
- X = np.array(X)
- Y = np.array(Y)
- other_parameters = remove_index_from_tuple(k, param_index)
- raw_results_by_param[other_parameters] = dict()
- results_by_param[other_parameters] = dict()
- for function_name, param_function in functions.items():
- if function_name not in raw_results:
- raw_results[function_name] = dict()
- error_function = param_function.error_function
- res = optimize.least_squares(
- error_function, [0, 1], args=(X, Y), xtol=2e-15
- )
- measures = regression_measures(param_function.eval(res.x, X), Y)
- raw_results_by_param[other_parameters][function_name] = measures
- for measure, error_rate in measures.items():
- if measure not in raw_results[function_name]:
- raw_results[function_name][measure] = list()
- raw_results[function_name][measure].append(error_rate)
- # print(function_name, res, measures)
- mean_measures = aggregate_measures(np.mean(Y), Y)
- ref_results["mean"].append(mean_measures["rmsd"])
- raw_results_by_param[other_parameters]["mean"] = mean_measures
- median_measures = aggregate_measures(np.median(Y), Y)
- ref_results["median"].append(median_measures["rmsd"])
- raw_results_by_param[other_parameters]["median"] = median_measures
-
- if not len(ref_results["mean"]):
- # Insufficient data for fitting
- # print('[W] Insufficient data for fitting {}'.format(param_index))
- return {"best": None, "best_rmsd": np.inf, "results": results}
-
- for (
- other_parameter_combination,
- other_parameter_results,
- ) in raw_results_by_param.items():
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in other_parameter_results.items():
- if len(result) > 0:
- results[function_name] = result
- rmsd = result["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
- results_by_param[other_parameter_combination] = {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": results["mean"]["rmsd"],
- "median_rmsd": results["median"]["rmsd"],
- "results": results,
- }
-
- best_fit_val = np.inf
- best_fit_name = None
- results = dict()
- for function_name, result in raw_results.items():
- if len(result) > 0:
- results[function_name] = {}
- for measure in result.keys():
- results[function_name][measure] = np.mean(result[measure])
- rmsd = results[function_name]["rmsd"]
- if rmsd < best_fit_val:
- best_fit_val = rmsd
- best_fit_name = function_name
-
- return {
- "best": best_fit_name,
- "best_rmsd": best_fit_val,
- "mean_rmsd": np.mean(ref_results["mean"]),
- "median_rmsd": np.mean(ref_results["median"]),
- "results": results,
- "results_by_other_param": results_by_param,
- }
def _num_args_from_by_name(by_name):
diff --git a/lib/paramfit.py b/lib/paramfit.py
new file mode 100644
index 0000000..eed8eed
--- /dev/null
+++ b/lib/paramfit.py
@@ -0,0 +1,261 @@
+#!/usr/bin/env python3
+
+import logging
+import numpy as np
+from multiprocessing import Pool
+from scipy import optimize
+from .functions import analytic
+from .utils import (
+ is_numeric,
+ param_slice_eq,
+ remove_index_from_tuple,
+ match_parameter_values,
+ aggregate_measures,
+ regression_measures,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class ParallelParamFit:
+ """
+ Fit a set of functions on parameterized measurements.
+
+ One parameter is variale, all others are fixed. Reports the best-fitting
+ function type for each parameter.
+ """
+
+ def __init__(self):
+ """Create a new ParallelParamFit object."""
+ self.fit_queue = list()
+
+ def enqueue(self, key, param, args):
+ """
+ Add state_or_tran/attribute/param_name to fit queue.
+
+ This causes fit() to compute the best-fitting function for this model part.
+
+ :param key: arbitrary key used to retrieve param result in `get_result`. Typically (state/transition name, model attribute).
+ Different parameter names may have the same key. Identical parameter names must have different keys.
+ :param param: parameter name
+ :param args: [by_param, param_index, safe_functions_enabled, param_filter]
+ by_param[(param 1, param2, ...)] holds measurements.
+ """
+ self.fit_queue.append({"key": (key, param), "args": args})
+
+ def fit(self):
+ """
+ Fit functions on previously enqueue data.
+
+ Fitting is one in parallel with one process per core.
+
+ Results can be accessed using the public ParallelParamFit.results object.
+ """
+ with Pool() as pool:
+ self.results = pool.map(_try_fits_parallel, self.fit_queue)
+
+ def get_result(self, key):
+ """
+ Parse and sanitize fit results.
+
+ Filters out results where the best function is worse (or not much better than) static mean/median estimates.
+
+ :param key: arbitrary key used in `enqueue`. Typically (state/transition name, model attribute).
+ :param param: parameter name
+ :returns: dict with fit result (see `_try_fits`) for each successfully fitted parameter. E.g. {'param 1': {'best' : 'function name', ...} }
+ """
+ fit_result = dict()
+ for result in self.results:
+ if (
+ result["key"][0] == key and result["result"]["best"] is not None
+ ): # dürfte an ['best'] != None liegen-> Fit für gefilterten Kram schlägt fehl?
+ this_result = result["result"]
+ if this_result["best_rmsd"] >= min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} as function of {}: best ({:.0f}) is worse than ref ({:.0f}, {:.0f})".format(
+ result["key"][0],
+ result["key"][1],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ # See notes on depends_on_param
+ elif this_result["best_rmsd"] >= 0.8 * min(
+ this_result["mean_rmsd"], this_result["median_rmsd"]
+ ):
+ logger.debug(
+ "Not modeling {} as function of {}: best ({:.0f}) is not much better than ref ({:.0f}, {:.0f})".format(
+ result["key"][0],
+ result["key"][1],
+ this_result["best_rmsd"],
+ this_result["mean_rmsd"],
+ this_result["median_rmsd"],
+ )
+ )
+ else:
+ fit_result[result["key"][1]] = this_result
+ return fit_result
+
+
+def _try_fits_parallel(arg):
+ """
+ Call _try_fits(*arg['args']) and return arg['key'] and the _try_fits result.
+
+ Must be a global function as it is called from a multiprocessing Pool.
+ """
+ return {"key": arg["key"], "result": _try_fits(*arg["args"])}
+
+
+def _try_fits(
+ n_by_param, param_index, safe_functions_enabled=False, param_filter: dict = None
+):
+ """
+ Determine goodness-of-fit for prediction of `n_by_param[(param1_value, param2_value, ...)]` dependence on `param_index` using various functions.
+
+ This is done by varying `param_index` while keeping all other parameters constant and doing one least squares optimization for each function and for each combination of the remaining parameters.
+ The value of the parameter corresponding to `param_index` (e.g. txpower or packet length) is the sole input to the model function.
+ Only numeric parameter values (as determined by `utils.is_numeric`) are used for fitting, non-numeric values such as None or enum strings are ignored.
+ Fitting is only performed if at least three distinct parameter values exist in `by_param[*]`.
+
+ :returns: a dictionary with the following elements:
+ best -- name of the best-fitting function (see `analytic.functions`). `None` in case of insufficient data.
+ best_rmsd -- mean Root Mean Square Deviation of best-fitting function over all combinations of the remaining parameters
+ mean_rmsd -- mean Root Mean Square Deviation of a reference model using the mean of its respective input data as model value
+ median_rmsd -- mean Root Mean Square Deviation of a reference model using the median of its respective input data as model value
+ results -- mean goodness-of-fit measures for the individual functions. See `analytic.functions` for keys and `aggregate_measures` for values
+
+ :param n_by_param: measurements of a specific model attribute partitioned by parameter values.
+ Example: `{(0, 2): [2], (0, 4): [4], (0, 6): [6]}`
+
+ :param param_index: index of the parameter used as model input
+ :param safe_functions_enabled: Include "safe" variants of functions with limited argument range.
+ :param param_filter: Only use measurements whose parameters match param_filter for fitting.
+ """
+
+ functions = analytic.functions(safe_functions_enabled=safe_functions_enabled)
+
+ for param_key in n_by_param.keys():
+ # We might remove elements from 'functions' while iterating over
+ # its keys. A generator will not allow this, so we need to
+ # convert to a list.
+ function_names = list(functions.keys())
+ for function_name in function_names:
+ function_object = functions[function_name]
+ if is_numeric(param_key[param_index]) and not function_object.is_valid(
+ param_key[param_index]
+ ):
+ functions.pop(function_name, None)
+
+ raw_results = dict()
+ raw_results_by_param = dict()
+ ref_results = {"mean": list(), "median": list()}
+ results = dict()
+ results_by_param = dict()
+
+ seen_parameter_combinations = set()
+
+ # for each parameter combination:
+ for param_key in filter(
+ lambda x: remove_index_from_tuple(x, param_index)
+ not in seen_parameter_combinations
+ and len(n_by_param[x])
+ and match_parameter_values(n_by_param[x][0], param_filter),
+ n_by_param.keys(),
+ ):
+ X = []
+ Y = []
+ num_valid = 0
+ num_total = 0
+
+ # Ensure that each parameter combination is only optimized once. Otherwise, with parameters (1, 2, 5), (1, 3, 5), (1, 4, 5) and param_index == 1,
+ # the parameter combination (1, *, 5) would be optimized three times, both wasting time and biasing results towards more frequently occuring combinations of non-param_index parameters
+ seen_parameter_combinations.add(remove_index_from_tuple(param_key, param_index))
+
+ # for each value of the parameter denoted by param_index (all other parameters remain the same):
+ for k, v in filter(
+ lambda kv: param_slice_eq(kv[0], param_key, param_index), n_by_param.items()
+ ):
+ num_total += 1
+ if is_numeric(k[param_index]):
+ num_valid += 1
+ X.extend([float(k[param_index])] * len(v))
+ Y.extend(v)
+
+ if num_valid > 2:
+ X = np.array(X)
+ Y = np.array(Y)
+ other_parameters = remove_index_from_tuple(k, param_index)
+ raw_results_by_param[other_parameters] = dict()
+ results_by_param[other_parameters] = dict()
+ for function_name, param_function in functions.items():
+ if function_name not in raw_results:
+ raw_results[function_name] = dict()
+ error_function = param_function.error_function
+ res = optimize.least_squares(
+ error_function, [0, 1], args=(X, Y), xtol=2e-15
+ )
+ measures = regression_measures(param_function.eval(res.x, X), Y)
+ raw_results_by_param[other_parameters][function_name] = measures
+ for measure, error_rate in measures.items():
+ if measure not in raw_results[function_name]:
+ raw_results[function_name][measure] = list()
+ raw_results[function_name][measure].append(error_rate)
+ # print(function_name, res, measures)
+ mean_measures = aggregate_measures(np.mean(Y), Y)
+ ref_results["mean"].append(mean_measures["rmsd"])
+ raw_results_by_param[other_parameters]["mean"] = mean_measures
+ median_measures = aggregate_measures(np.median(Y), Y)
+ ref_results["median"].append(median_measures["rmsd"])
+ raw_results_by_param[other_parameters]["median"] = median_measures
+
+ if not len(ref_results["mean"]):
+ # Insufficient data for fitting
+ # print('[W] Insufficient data for fitting {}'.format(param_index))
+ return {"best": None, "best_rmsd": np.inf, "results": results}
+
+ for (
+ other_parameter_combination,
+ other_parameter_results,
+ ) in raw_results_by_param.items():
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in other_parameter_results.items():
+ if len(result) > 0:
+ results[function_name] = result
+ rmsd = result["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+ results_by_param[other_parameter_combination] = {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": results["mean"]["rmsd"],
+ "median_rmsd": results["median"]["rmsd"],
+ "results": results,
+ }
+
+ best_fit_val = np.inf
+ best_fit_name = None
+ results = dict()
+ for function_name, result in raw_results.items():
+ if len(result) > 0:
+ results[function_name] = {}
+ for measure in result.keys():
+ results[function_name][measure] = np.mean(result[measure])
+ rmsd = results[function_name]["rmsd"]
+ if rmsd < best_fit_val:
+ best_fit_val = rmsd
+ best_fit_name = function_name
+
+ return {
+ "best": best_fit_name,
+ "best_rmsd": best_fit_val,
+ "mean_rmsd": np.mean(ref_results["mean"]),
+ "median_rmsd": np.mean(ref_results["median"]),
+ "results": results,
+ "results_by_other_param": results_by_param,
+ }
diff --git a/lib/utils.py b/lib/utils.py
index b38a359..d3334c9 100644
--- a/lib/utils.py
+++ b/lib/utils.py
@@ -1,7 +1,10 @@
+#!/usr/bin/env python3
+
import json
import numpy as np
import re
import logging
+from sklearn.metrics import r2_score
arg_support_enabled = True
logger = logging.getLogger(__name__)
@@ -318,6 +321,77 @@ def detect_outliers_in_aggregate(aggregate, z_limit=10, remove_outliers=False):
)
+def aggregate_measures(aggregate: float, actual: list) -> dict:
+ """
+ Calculate error measures for model value on data list.
+
+ arguments:
+ aggregate -- model value (float or int)
+ actual -- real-world / reference values (list of float or int)
+
+ return value:
+ See regression_measures
+ """
+ aggregate_array = np.array([aggregate] * len(actual))
+ return regression_measures(aggregate_array, np.array(actual))
+
+
+def regression_measures(predicted: np.ndarray, actual: np.ndarray):
+ """
+ Calculate error measures by comparing model values to reference values.
+
+ arguments:
+ predicted -- model values (np.ndarray)
+ actual -- real-world / reference values (np.ndarray)
+
+ Returns a dict containing the following measures:
+ mae -- Mean Absolute Error
+ mape -- Mean Absolute Percentage Error,
+ if all items in actual are non-zero (NaN otherwise)
+ smape -- Symmetric Mean Absolute Percentage Error,
+ if no 0,0-pairs are present in actual and predicted (NaN otherwise)
+ msd -- Mean Square Deviation
+ rmsd -- Root Mean Square Deviation
+ ssr -- Sum of Squared Residuals
+ rsq -- R^2 measure, see sklearn.metrics.r2_score
+ count -- Number of values
+ """
+ if type(predicted) != np.ndarray:
+ raise ValueError("first arg must be ndarray, is {}".format(type(predicted)))
+ if type(actual) != np.ndarray:
+ raise ValueError("second arg must be ndarray, is {}".format(type(actual)))
+ deviations = predicted - actual
+ # mean = np.mean(actual)
+ if len(deviations) == 0:
+ return {}
+ measures = {
+ "mae": np.mean(np.abs(deviations), dtype=np.float64),
+ "msd": np.mean(deviations ** 2, dtype=np.float64),
+ "rmsd": np.sqrt(np.mean(deviations ** 2), dtype=np.float64),
+ "ssr": np.sum(deviations ** 2, dtype=np.float64),
+ "rsq": r2_score(actual, predicted),
+ "count": len(actual),
+ }
+
+ # rsq_quotient = np.sum((actual - mean)**2, dtype=np.float64) * np.sum((predicted - mean)**2, dtype=np.float64)
+
+ if np.all(actual != 0):
+ measures["mape"] = np.mean(np.abs(deviations / actual)) * 100 # bad measure
+ else:
+ measures["mape"] = np.nan
+ if np.all(np.abs(predicted) + np.abs(actual) != 0):
+ measures["smape"] = (
+ np.mean(np.abs(deviations) / ((np.abs(predicted) + np.abs(actual)) / 2))
+ * 100
+ )
+ else:
+ measures["smape"] = np.nan
+ # if np.all(rsq_quotient != 0):
+ # measures['rsq'] = (np.sum((actual - mean) * (predicted - mean), dtype=np.float64)**2) / rsq_quotient
+
+ return measures
+
+
class OptionalTimingAnalysis:
def __init__(self, enabled=True):
self.enabled = enabled