author | Daniel Friesel &lt;daniel.friesel@uos.de&gt; | 2022-01-27 10:20:17 +0100
---|---|---
committer | Daniel Friesel &lt;daniel.friesel@uos.de&gt; | 2022-01-27 10:20:17 +0100
commit | 937bcec1ed1bd379c226aea5eb8ce5ec95264703 (patch) |
tree | d9da5b2102481e1916808031ce31d28fbe8980d5 /lib/lineartree/_classes.py |
parent | e149c6bc24935ff8383471759c8775d3174ec29d (diff) |
add LMT support via https://github.com/cerlymarco/linear-tree
Diffstat (limited to 'lib/lineartree/_classes.py')
-rw-r--r-- | lib/lineartree/_classes.py | 1246 |
1 file changed, 1246 insertions, 0 deletions
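The new `lib/lineartree/_classes.py` vendors the base classes of cerlymarco's linear-tree library (`_LinearTree`, `_LinearBoosting`, `_LinearForest`). For orientation, here is a minimal usage sketch of the linear model tree (LMT) support this commit enables. The `LinearTreeRegressor` wrapper and its import path are assumptions (only `_classes.py` is part of this diff); upstream, it is the public class built on top of these base classes:

```python
# Illustrative sketch only. The concrete wrapper class (LinearTreeRegressor) and
# the import path are assumptions -- this commit adds lib/lineartree/_classes.py,
# which contains the base classes (_LinearTree, _LinearBoosting, _LinearForest).
import numpy as np
from sklearn.linear_model import LinearRegression
from lineartree import LinearTreeRegressor  # assumed import path

# toy piecewise-linear data: two regimes with different slopes
rng = np.random.default_rng(0)
X = rng.uniform(0, 10, size=(500, 1))
y = np.where(X[:, 0] < 5, 2.0 * X[:, 0], 20.0 - 1.5 * X[:, 0]) + rng.normal(0, 0.1, 500)

# a linear model tree: split on X, fit a LinearRegression in each leaf
lmt = LinearTreeRegressor(base_estimator=LinearRegression(),
                          max_depth=3, min_samples_leaf=30)
lmt.fit(X, y)

print(lmt.summary())       # per-node split columns, thresholds, losses
print(lmt.apply(X[:5]))    # leaf index each sample lands in
print(lmt.predict(X[:5]))  # piecewise-linear predictions
```

Note on cost: at every candidate split, `_parallel_binning_fit` below fits one linear model per threshold per child and keeps the split with the lowest weighted loss, so `max_bins`, `min_samples_leaf`, and the number of split features dominate the fitting time.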
diff --git a/lib/lineartree/_classes.py b/lib/lineartree/_classes.py new file mode 100644 index 0000000..83385c3 --- /dev/null +++ b/lib/lineartree/_classes.py @@ -0,0 +1,1246 @@ +#!/usr/bin/env python3 +# Copyright (c) 2021 Marco Cerliani, MIT License <https://github.com/cerlymarco/linear-tree> + +import numbers +import numpy as np +import scipy.sparse as sp + +from copy import deepcopy +from joblib import Parallel, effective_n_jobs # , delayed + +from sklearn.dummy import DummyClassifier +from sklearn.tree import DecisionTreeRegressor +from sklearn.ensemble import RandomForestRegressor + +from sklearn.base import is_regressor +from sklearn.base import BaseEstimator, TransformerMixin + +from sklearn.utils import check_array +from sklearn.utils.validation import has_fit_parameter, check_is_fitted + +from ._criterion import SCORING +from ._criterion import mse, rmse, mae, poisson +from ._criterion import hamming, crossentropy + +import sklearn + +_sklearn_v1 = eval(sklearn.__version__.split(".")[0]) > 0 + + +CRITERIA = { + "mse": mse, + "rmse": rmse, + "mae": mae, + "poisson": poisson, + "hamming": hamming, + "crossentropy": crossentropy, +} + + +######################################################################### +### remove when https://github.com/joblib/joblib/issues/1071 is fixed ### +######################################################################### +from sklearn import get_config, config_context +from functools import update_wrapper +import functools + +# from sklearn.utils.fixes +def delayed(function): + """Decorator used to capture the arguments of a function.""" + + @functools.wraps(function) + def delayed_function(*args, **kwargs): + return _FuncWrapper(function), args, kwargs + + return delayed_function + + +# from sklearn.utils.fixes +class _FuncWrapper: + """ "Load the global configuration before calling the function.""" + + def __init__(self, function): + self.function = function + self.config = get_config() + update_wrapper(self, self.function) + + def __call__(self, *args, **kwargs): + with config_context(**self.config): + return self.function(*args, **kwargs) + + +######################################################################### +######################################################################### +######################################################################### + + +def _partition_columns(columns, n_jobs): + """Private function to partition columns splitting between jobs.""" + # Compute the number of jobs + n_columns = len(columns) + n_jobs = min(effective_n_jobs(n_jobs), n_columns) + + # Partition columns between jobs + n_columns_per_job = np.full(n_jobs, n_columns // n_jobs, dtype=int) + n_columns_per_job[: n_columns % n_jobs] += 1 + columns_per_job = np.cumsum(n_columns_per_job) + columns_per_job = np.split(columns, columns_per_job) + columns_per_job = columns_per_job[:-1] + + return n_jobs, columns_per_job + + +def _parallel_binning_fit( + split_feat, _self, X, y, weights, support_sample_weight, bins, loss +): + """Private function to find the best column splittings within a job.""" + n_sample, n_feat = X.shape + feval = CRITERIA[_self.criterion] + + split_t = None + split_col = None + left_node = (None, None, None, None) + right_node = (None, None, None, None) + largs_left = {"classes": None} + largs_right = {"classes": None} + + if n_sample < _self._min_samples_split: + return loss, split_t, split_col, left_node, right_node + + for col, _bin in zip(split_feat, bins): + + for q in _bin: + + # create 1D bool mask for right/left 
children + mask = X[:, col] > q + + n_left, n_right = (~mask).sum(), mask.sum() + + if n_left < _self._min_samples_leaf or n_right < _self._min_samples_leaf: + continue + + # create 2D bool mask for right/left children + left_mesh = np.ix_(~mask, _self._linear_features) + right_mesh = np.ix_(mask, _self._linear_features) + + model_left = deepcopy(_self.base_estimator) + model_right = deepcopy(_self.base_estimator) + + if hasattr(_self, "classes_"): + largs_left["classes"] = np.unique(y[~mask]) + largs_right["classes"] = np.unique(y[mask]) + if len(largs_left["classes"]) == 1: + model_left = DummyClassifier(strategy="most_frequent") + if len(largs_right["classes"]) == 1: + model_right = DummyClassifier(strategy="most_frequent") + + if weights is None: + + model_left.fit(X[left_mesh], y[~mask]) + loss_left = feval(model_left, X[left_mesh], y[~mask], **largs_left) + wloss_left = loss_left * (n_left / n_sample) + + model_right.fit(X[right_mesh], y[mask]) + loss_right = feval(model_right, X[right_mesh], y[mask], **largs_right) + wloss_right = loss_right * (n_right / n_sample) + + else: + + if support_sample_weight: + + model_left.fit(X[left_mesh], y[~mask], sample_weight=weights[~mask]) + + model_right.fit(X[right_mesh], y[mask], sample_weight=weights[mask]) + + else: + + model_left.fit(X[left_mesh], y[~mask]) + + model_right.fit(X[right_mesh], y[mask]) + + loss_left = feval( + model_left, + X[left_mesh], + y[~mask], + weights=weights[~mask], + **largs_left + ) + wloss_left = loss_left * (weights[~mask].sum() / weights.sum()) + + loss_right = feval( + model_right, + X[right_mesh], + y[mask], + weights=weights[mask], + **largs_right + ) + wloss_right = loss_right * (weights[mask].sum() / weights.sum()) + + total_loss = wloss_left + wloss_right + + # store if best + if total_loss < loss: + split_t = q + split_col = col + loss = total_loss + left_node = ( + model_left, + loss_left, + wloss_left, + n_left, + largs_left["classes"], + ) + right_node = ( + model_right, + loss_right, + wloss_right, + n_right, + largs_right["classes"], + ) + + return loss, split_t, split_col, left_node, right_node + + +def _map_node(X, feat, direction, split): + """Utility to map samples to nodes""" + if direction == "L": + mask = X[:, feat] <= split + else: + mask = X[:, feat] > split + + return mask + + +def _predict_branch(X, branch_history, mask=None): + """Utility to map samples to branches""" + + if mask is None: + mask = np.repeat(True, X.shape[0]) + + for node in branch_history: + mask = np.logical_and(_map_node(X, *node), mask) + + return mask + + +class Node: + def __init__( + self, + id=None, + threshold=[], + parent=None, + children=None, + n_samples=None, + w_loss=None, + loss=None, + model=None, + classes=None, + ): + self.id = id + self.threshold = threshold + self.parent = parent + self.children = children + self.n_samples = n_samples + self.w_loss = w_loss + self.loss = loss + self.model = model + self.classes = classes + + +class _LinearTree(BaseEstimator): + """Base class for Linear Tree meta-estimator. + + Warning: This class should not be used directly. Use derived classes + instead. 
+ """ + + def __init__( + self, + base_estimator, + *, + criterion, + max_depth, + min_samples_split, + min_samples_leaf, + max_bins, + categorical_features, + split_features, + linear_features, + n_jobs + ): + + self.base_estimator = base_estimator + self.criterion = criterion + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.max_bins = max_bins + self.categorical_features = categorical_features + self.split_features = split_features + self.linear_features = linear_features + self.n_jobs = n_jobs + + def _parallel_args(self): + return {} + + def _split(self, X, y, bins, support_sample_weight, weights=None, loss=None): + """Evaluate optimal splits in a given node (in a specific partition of + X and y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + + y : array-like of shape (n_samples, ) + The target values (class labels in classification, real numbers in + regression). + + bins : array-like of shape (max_bins - 2, ) + The bins to use to find an optimal split. Expressed as percentiles. + + support_sample_weight : bool + Whether the estimator's fit method supports sample_weight. + + weights : array-like of shape (n_samples, ), default=None + Sample weights. If None, then samples are equally weighted. + Note that if the base estimator does not support sample weighting, + the sample weights are still used to evaluate the splits. + + loss : float, default=None + The loss of the parent node. A split is computed if the weighted + loss sum of the two children is lower than the loss of the parent. + A None value implies the first fit on all the data to evaluate + the benefits of possible future splits. + + Returns + ------- + self : object + """ + # Parallel loops + n_jobs, split_feat = _partition_columns(self._split_features, self.n_jobs) + + # partition columns splittings between jobs + all_results = Parallel(n_jobs=n_jobs, verbose=0, **self._parallel_args())( + delayed(_parallel_binning_fit)( + feat, + self, + X, + y, + weights, + support_sample_weight, + [bins[i] for i in feat], + loss, + ) + for feat in split_feat + ) + + # extract results from parallel loops + _losses, split_t, split_col = [], [], [] + left_node, right_node = [], [] + for job_res in all_results: + _losses.append(job_res[0]) + split_t.append(job_res[1]) + split_col.append(job_res[2]) + left_node.append(job_res[3]) + right_node.append(job_res[4]) + + # select best results + _id_best = np.argmin(_losses) + if _losses[_id_best] < loss: + split_t = split_t[_id_best] + split_col = split_col[_id_best] + left_node = left_node[_id_best] + right_node = right_node[_id_best] + else: + split_t = None + split_col = None + left_node = (None, None, None, None, None) + right_node = (None, None, None, None, None) + + return split_t, split_col, left_node, right_node + + def _grow(self, X, y, weights=None): + """Grow and prune a Linear Tree from the training set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + + y : array-like of shape (n_samples, ) + The target values (class labels in classification, real numbers in + regression). + + weights : array-like of shape (n_samples, ), default=None + Sample weights. If None, then samples are equally weighted. + Note that if the base estimator does not support sample weighting, + the sample weights are still used to evaluate the splits. 
+ + Returns + ------- + self : object + """ + n_sample, self.n_features_in_ = X.shape + self.feature_importances_ = np.zeros((self.n_features_in_,)) + + # extract quantiles + bins = np.linspace(0, 1, self.max_bins)[1:-1] + bins = np.quantile(X, bins, axis=0, interpolation="midpoint") + bins = list(bins.T) + bins = [ + np.unique(X[:, c]) if c in self._categorical_features else np.unique(q) + for c, q in enumerate(bins) + ] + + # check if base_estimator supports fitting with sample_weights + support_sample_weight = has_fit_parameter(self.base_estimator, "sample_weight") + + queue = [""] # queue of the nodes to evaluate for splitting + # store the results of each node in dicts + self._nodes = {} + self._leaves = {} + + # initialize first fit + largs = {"classes": None} + model = deepcopy(self.base_estimator) + if weights is None or not support_sample_weight: + model.fit(X[:, self._linear_features], y) + else: + model.fit(X[:, self._linear_features], y, sample_weight=weights) + + if hasattr(self, "classes_"): + largs["classes"] = self.classes_ + + loss = CRITERIA[self.criterion]( + model, X[:, self._linear_features], y, weights=weights, **largs + ) + + self._nodes[""] = Node( + id=0, n_samples=n_sample, model=model, loss=loss, classes=largs["classes"] + ) + + # in the beginning consider all the samples + start = np.repeat(True, n_sample) + mask = start.copy() + + i = 1 + while len(queue) > 0: + + if weights is None: + split_t, split_col, left_node, right_node = self._split( + X[mask], y[mask], bins, support_sample_weight, loss=loss + ) + else: + split_t, split_col, left_node, right_node = self._split( + X[mask], + y[mask], + bins, + support_sample_weight, + weights[mask], + loss=loss, + ) + + # no utility in splitting + if split_col is None or len(queue[-1]) >= self.max_depth: + self._leaves[queue[-1]] = self._nodes[queue[-1]] + del self._nodes[queue[-1]] + queue.pop() + + else: + + model_left, loss_left, wloss_left, n_left, class_left = left_node + model_right, loss_right, wloss_right, n_right, class_right = right_node + self.feature_importances_[split_col] += loss - wloss_left - wloss_right + + self._nodes[queue[-1] + "L"] = Node( + id=i, + parent=queue[-1], + model=model_left, + loss=loss_left, + w_loss=wloss_left, + n_samples=n_left, + threshold=self._nodes[queue[-1]].threshold[:] + + [(split_col, "L", split_t)], + ) + + self._nodes[queue[-1] + "R"] = Node( + id=i + 1, + parent=queue[-1], + model=model_right, + loss=loss_right, + w_loss=wloss_right, + n_samples=n_right, + threshold=self._nodes[queue[-1]].threshold[:] + + [(split_col, "R", split_t)], + ) + + if hasattr(self, "classes_"): + self._nodes[queue[-1] + "L"].classes = class_left + self._nodes[queue[-1] + "R"].classes = class_right + + self._nodes[queue[-1]].children = (queue[-1] + "L", queue[-1] + "R") + + i += 2 + q = queue[-1] + queue.pop() + queue.extend([q + "R", q + "L"]) + + if len(queue) > 0: + loss = self._nodes[queue[-1]].loss + mask = _predict_branch( + X, self._nodes[queue[-1]].threshold, start.copy() + ) + + self.node_count = i + + return self + + def _fit(self, X, y, sample_weight=None): + """Build a Linear Tree of a linear estimator from the training + set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + + y : array-like of shape (n_samples, ) or also (n_samples, n_targets) for + multitarget regression. + The target values (class labels in classification, real numbers in + regression). 
+ + sample_weight : array-like of shape (n_samples, ), default=None + Sample weights. If None, then samples are equally weighted. + Note that if the base estimator does not support sample weighting, + the sample weights are still used to evaluate the splits. + + Returns + ------- + self : object + """ + n_sample, n_feat = X.shape + + if isinstance(self.min_samples_split, numbers.Integral): + if self.min_samples_split < 6: + raise ValueError( + "min_samples_split must be an integer greater than 5 or " + "a float in (0.0, 1.0); got the integer {}".format( + self.min_samples_split + ) + ) + self._min_samples_split = self.min_samples_split + else: + if not 0.0 < self.min_samples_split < 1.0: + raise ValueError( + "min_samples_split must be an integer greater than 5 or " + "a float in (0.0, 1.0); got the float {}".format( + self.min_samples_split + ) + ) + + self._min_samples_split = int(np.ceil(self.min_samples_split * n_sample)) + self._min_samples_split = max(6, self._min_samples_split) + + if isinstance(self.min_samples_leaf, numbers.Integral): + if self.min_samples_leaf < 3: + raise ValueError( + "min_samples_leaf must be an integer greater than 2 or " + "a float in (0.0, 1.0); got the integer {}".format( + self.min_samples_leaf + ) + ) + self._min_samples_leaf = self.min_samples_leaf + else: + if not 0.0 < self.min_samples_leaf < 1.0: + raise ValueError( + "min_samples_leaf must be an integer greater than 2 or " + "a float in (0.0, 1.0); got the float {}".format( + self.min_samples_leaf + ) + ) + + self._min_samples_leaf = int(np.ceil(self.min_samples_leaf * n_sample)) + self._min_samples_leaf = max(3, self._min_samples_leaf) + + if not 1 <= self.max_depth <= 20: + raise ValueError("max_depth must be an integer in [1, 20].") + + if not 10 <= self.max_bins <= 120: + raise ValueError("max_bins must be an integer in [10, 120].") + + if not hasattr(self.base_estimator, "fit_intercept"): + raise ValueError( + "Only linear models are accepted as base_estimator. " + "Select one from linear_model class of scikit-learn." + ) + + if self.categorical_features is not None: + cat_features = np.unique(self.categorical_features) + + if not issubclass(cat_features.dtype.type, numbers.Integral): + raise ValueError( + "No valid specification of categorical columns. " + "Only a scalar, list or array-like of integers is allowed." + ) + + if (cat_features < 0).any() or (cat_features >= n_feat).any(): + raise ValueError( + "Categorical features must be in [0, {}].".format(n_feat - 1) + ) + + if len(cat_features) == n_feat: + raise ValueError( + "Only categorical features detected. " + "No features available for fitting." + ) + else: + cat_features = [] + self._categorical_features = cat_features + + if self.split_features is not None: + split_features = np.unique(self.split_features) + + if not issubclass(split_features.dtype.type, numbers.Integral): + raise ValueError( + "No valid specification of split_features. " + "Only a scalar, list or array-like of integers is allowed." + ) + + if (split_features < 0).any() or (split_features >= n_feat).any(): + raise ValueError( + "Splitting features must be in [0, {}].".format(n_feat - 1) + ) + else: + split_features = np.arange(n_feat) + self._split_features = split_features + + if self.linear_features is not None: + linear_features = np.unique(self.linear_features) + + if not issubclass(linear_features.dtype.type, numbers.Integral): + raise ValueError( + "No valid specification of linear_features. " + "Only a scalar, list or array-like of integers is allowed." 
+ ) + + if (linear_features < 0).any() or (linear_features >= n_feat).any(): + raise ValueError( + "Linear features must be in [0, {}].".format(n_feat - 1) + ) + + if np.isin(linear_features, cat_features).any(): + raise ValueError("Linear features cannot be categorical features.") + else: + linear_features = np.setdiff1d(np.arange(n_feat), cat_features) + self._linear_features = linear_features + + self._grow(X, y, sample_weight) + + normalizer = np.sum(self.feature_importances_) + if normalizer > 0: + self.feature_importances_ /= normalizer + + return self + + def summary(self, feature_names=None, only_leaves=False, max_depth=None): + """Return a summary of nodes created from model fitting. + + Parameters + ---------- + feature_names : array-like of shape (n_features, ), default=None + Names of each of the features. If None, generic names + will be used (“X[0]”, “X[1]”, …). + + only_leaves : bool, default=False + Store only information of leaf nodes. + + max_depth : int, default=None + The maximum depth of the representation. If None, the tree + is fully generated. + + Returns + ------- + summary : nested dict + The keys are the integer map of each node. + The values are dicts containing information for that node: + + - 'col' (^): column used for splitting; + - 'th' (^): threshold value used for splitting in the + selected column; + - 'loss': loss computed at node level. Weighted sum of + children' losses if it is a splitting node; + - 'samples': number of samples in the node. Sum of children' + samples if it is a split node; + - 'children' (^): integer mapping of possible children nodes; + - 'models': fitted linear models built in each split. + Single model if it is leaf node; + - 'classes' (^^): target classes detected in the split. + Available only for LinearTreeClassifier. + + (^): Only for split nodes. + (^^): Only for leaf nodes. + """ + check_is_fitted(self, attributes="_nodes") + + if max_depth is None: + max_depth = 20 + if max_depth < 1: + raise ValueError("max_depth must be > 0, got {}".format(max_depth)) + + summary = {} + + if len(self._nodes) > 0 and not only_leaves: + + if feature_names is not None and len(feature_names) != self.n_features_in_: + raise ValueError( + "feature_names must contain {} elements, got {}".format( + self.n_features_in_, len(feature_names) + ) + ) + + if feature_names is None: + feature_names = np.arange(self.n_features_in_) + + for n, N in self._nodes.items(): + + if len(n) >= max_depth: + continue + + cl, cr = N.children + Cl = self._nodes[cl] if cl in self._nodes else self._leaves[cl] + Cr = self._nodes[cr] if cr in self._nodes else self._leaves[cr] + + summary[N.id] = { + "col": feature_names[Cl.threshold[-1][0]], + "th": round(Cl.threshold[-1][-1], 4), + "loss": round(Cl.w_loss + Cr.w_loss, 4), + "samples": Cl.n_samples + Cr.n_samples, + "children": (Cl.id, Cr.id), + "models": (Cl.model, Cr.model), + } + + for l, L in self._leaves.items(): + + if len(l) > max_depth: + continue + + summary[L.id] = { + "loss": round(L.loss, 4), + "samples": L.n_samples, + "models": L.model, + } + + if hasattr(self, "classes_"): + summary[L.id]["classes"] = L.classes + + return summary + + def apply(self, X): + """Return the index of the leaf that each sample is predicted as. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Samples. + + Returns + ------- + X_leaves : array-like of shape (n_samples, ) + For each datapoint x in X, return the index of the leaf x + ends up in. 
Leaves are numbered within + ``[0; n_nodes)``, possibly with gaps in the + numbering. + """ + check_is_fitted(self, attributes="_nodes") + + X = check_array(X, accept_sparse=False, dtype=None, force_all_finite=False) + self._check_n_features(X, reset=False) + + X_leaves = np.zeros(X.shape[0], dtype="int64") + + for L in self._leaves.values(): + + mask = _predict_branch(X, L.threshold) + if (~mask).all(): + continue + + X_leaves[mask] = L.id + + return X_leaves + + def decision_path(self, X): + """Return the decision path in the tree. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Samples. + + Returns + ------- + indicator : sparse matrix of shape (n_samples, n_nodes) + Return a node indicator CSR matrix where non zero elements + indicates that the samples goes through the nodes. + """ + check_is_fitted(self, attributes="_nodes") + + X = check_array(X, accept_sparse=False, dtype=None, force_all_finite=False) + self._check_n_features(X, reset=False) + + indicator = np.zeros((X.shape[0], self.node_count), dtype="int64") + + for L in self._leaves.values(): + + mask = _predict_branch(X, L.threshold) + if (~mask).all(): + continue + + n = L.id + p = L.parent + paths_id = [n] + + while p is not None: + n = self._nodes[p].id + p = self._nodes[p].parent + paths_id.append(n) + + indicator[np.ix_(mask, paths_id)] = 1 + + return sp.csr_matrix(indicator) + + def model_to_dot(self, feature_names=None, max_depth=None): + """Convert a fitted Linear Tree model to dot format. + It results in ModuleNotFoundError if graphviz or pydot are not available. + When installing graphviz make sure to add it to the system path. + + Parameters + ---------- + feature_names : array-like of shape (n_features, ), default=None + Names of each of the features. If None, generic names + will be used (“X[0]”, “X[1]”, …). + + max_depth : int, default=None + The maximum depth of the representation. If None, the tree + is fully generated. + + Returns + ------- + graph : pydot.Dot instance + Return an instance representing the Linear Tree. Splitting nodes have + a rectangular shape while leaf nodes have a circular one. + """ + import pydot + + summary = self.summary(feature_names=feature_names, max_depth=max_depth) + graph = pydot.Dot("linear_tree", graph_type="graph") + + # create nodes + for n in summary: + if "col" in summary[n]: + if isinstance(summary[n]["col"], str): + msg = "id_node: {}\n{} <= {}\nloss: {:.4f}\nsamples: {}" + else: + msg = "id_node: {}\nX[{}] <= {}\nloss: {:.4f}\nsamples: {}" + + msg = msg.format( + n, + summary[n]["col"], + summary[n]["th"], + summary[n]["loss"], + summary[n]["samples"], + ) + graph.add_node(pydot.Node(n, label=msg, shape="rectangle")) + + for c in summary[n]["children"]: + if c not in summary: + graph.add_node(pydot.Node(c, label="...", shape="rectangle")) + + else: + msg = "id_node: {}\nloss: {:.4f}\nsamples: {}".format( + n, summary[n]["loss"], summary[n]["samples"] + ) + graph.add_node(pydot.Node(n, label=msg)) + + # add edges + for n in summary: + if "children" in summary[n]: + for c in summary[n]["children"]: + graph.add_edge(pydot.Edge(n, c)) + + return graph + + def plot_model(self, feature_names=None, max_depth=None): + """Convert a fitted Linear Tree model to dot format and display it. + It results in ModuleNotFoundError if graphviz or pydot are not available. + When installing graphviz make sure to add it to the system path. 
+ + Parameters + ---------- + feature_names : array-like of shape (n_features, ), default=None + Names of each of the features. If None, generic names + will be used (“X[0]”, “X[1]”, …). + + max_depth : int, default=None + The maximum depth of the representation. If None, the tree + is fully generated. + + Returns + ------- + A Jupyter notebook Image object if Jupyter is installed. + This enables in-line display of the model plots in notebooks. + Splitting nodes have a rectangular shape while leaf nodes + have a circular one. + """ + from IPython.display import Image + + graph = self.model_to_dot(feature_names=feature_names, max_depth=max_depth) + + return Image(graph.create_png()) + + +class _LinearBoosting(TransformerMixin, BaseEstimator): + """Base class for Linear Boosting meta-estimator. + + Warning: This class should not be used directly. Use derived classes + instead. + """ + + def __init__( + self, + base_estimator, + *, + loss, + n_estimators, + max_depth, + min_samples_split, + min_samples_leaf, + min_weight_fraction_leaf, + max_features, + random_state, + max_leaf_nodes, + min_impurity_decrease, + ccp_alpha + ): + + self.base_estimator = base_estimator + self.loss = loss + self.n_estimators = n_estimators + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.min_weight_fraction_leaf = min_weight_fraction_leaf + self.max_features = max_features + self.random_state = random_state + self.max_leaf_nodes = max_leaf_nodes + self.min_impurity_decrease = min_impurity_decrease + self.ccp_alpha = ccp_alpha + + def _fit(self, X, y, sample_weight=None): + """Build a Linear Boosting from the training set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + + y : array-like of shape (n_samples, ) or also (n_samples, n_targets) for + multitarget regression. + The target values (class labels in classification, real numbers in + regression). + + sample_weight : array-like of shape (n_samples, ), default=None + Sample weights. + + Returns + ------- + self : object + """ + if not hasattr(self.base_estimator, "fit_intercept"): + raise ValueError( + "Only linear models are accepted as base_estimator. " + "Select one from linear_model class of scikit-learn." 
+ ) + + if self.n_estimators <= 0: + raise ValueError( + "n_estimators must be an integer greater than 0 but " + "got {}".format(self.n_estimators) + ) + + n_sample, self.n_features_in_ = X.shape + + self._trees = [] + self._leaves = [] + + for i in range(self.n_estimators): + + estimator = deepcopy(self.base_estimator) + estimator.fit(X, y, sample_weight=sample_weight) + + if self.loss == "entropy": + pred = estimator.predict_proba(X) + else: + pred = estimator.predict(X) + + if hasattr(self, "classes_"): + resid = SCORING[self.loss](y, pred, self.classes_) + else: + resid = SCORING[self.loss](y, pred) + + if resid.ndim > 1: + resid = resid.mean(1) + + criterion = "squared_error" if _sklearn_v1 else "mse" + + tree = DecisionTreeRegressor( + criterion=criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + min_weight_fraction_leaf=self.min_weight_fraction_leaf, + max_features=self.max_features, + random_state=self.random_state, + max_leaf_nodes=self.max_leaf_nodes, + min_impurity_decrease=self.min_impurity_decrease, + ccp_alpha=self.ccp_alpha, + ) + + tree.fit(X, resid, sample_weight=sample_weight, check_input=False) + self._trees.append(tree) + + pred_tree = np.abs(tree.predict(X, check_input=False)) + worst_pred = np.max(pred_tree) + self._leaves.append(worst_pred) + + pred_tree = (pred_tree == worst_pred).astype(np.float32) + pred_tree = pred_tree.reshape(-1, 1) + X = np.concatenate([X, pred_tree], axis=1) + + self.base_estimator_ = deepcopy(self.base_estimator) + self.base_estimator_.fit(X, y, sample_weight=sample_weight) + + if hasattr(self.base_estimator_, "coef_"): + self.coef_ = self.base_estimator_.coef_ + + if hasattr(self.base_estimator_, "intercept_"): + self.intercept_ = self.base_estimator_.intercept_ + + self.n_features_out_ = X.shape[1] + + return self + + def transform(self, X): + """Transform dataset. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Input data to be transformed. Use ``dtype=np.float32`` for maximum + efficiency. + + Returns + ------- + X_transformed : ndarray of shape (n_samples, n_out) + Transformed dataset. + `n_out` is equal to `n_features` + `n_estimators` + """ + check_is_fitted(self, attributes="base_estimator_") + X = check_array(X, dtype=np.float32, accept_sparse=False) + self._check_n_features(X, reset=False) + + for tree, leaf in zip(self._trees, self._leaves): + pred_tree = np.abs(tree.predict(X, check_input=False)) + pred_tree = (pred_tree == leaf).astype(np.float32) + pred_tree = pred_tree.reshape(-1, 1) + X = np.concatenate([X, pred_tree], axis=1) + + return X + + +class _LinearForest(BaseEstimator): + """Base class for Linear Forest meta-estimator. + + Warning: This class should not be used directly. Use derived classes + instead. 
+ """ + + def __init__( + self, + base_estimator, + *, + n_estimators, + max_depth, + min_samples_split, + min_samples_leaf, + min_weight_fraction_leaf, + max_features, + max_leaf_nodes, + min_impurity_decrease, + bootstrap, + oob_score, + n_jobs, + random_state, + ccp_alpha, + max_samples + ): + + self.base_estimator = base_estimator + self.n_estimators = n_estimators + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.min_weight_fraction_leaf = min_weight_fraction_leaf + self.max_features = max_features + self.max_leaf_nodes = max_leaf_nodes + self.min_impurity_decrease = min_impurity_decrease + self.bootstrap = bootstrap + self.oob_score = oob_score + self.n_jobs = n_jobs + self.random_state = random_state + self.ccp_alpha = ccp_alpha + self.max_samples = max_samples + + def _sigmoid(self, y): + """Expit function (a.k.a. logistic sigmoid). + + Parameters + ---------- + y : array-like of shape (n_samples, ) + The array to apply expit to element-wise. + + Returns + ------- + y : array-like of shape (n_samples, ) + Expits. + """ + return np.exp(y) / (1 + np.exp(y)) + + def _inv_sigmoid(self, y): + """Logit function. + + Parameters + ---------- + y : array-like of shape (n_samples, ) + The array to apply logit to element-wise. + + Returns + ------- + y : array-like of shape (n_samples, ) + Logits. + """ + y = y.clip(1e-3, 1 - 1e-3) + + return np.log(y / (1 - y)) + + def _fit(self, X, y, sample_weight=None): + """Build a Linear Boosting from the training set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + + y : array-like of shape (n_samples, ) or also (n_samples, n_targets) for + multitarget regression. + The target values (class labels in classification, real numbers in + regression). + + sample_weight : array-like of shape (n_samples, ), default=None + Sample weights. + + Returns + ------- + self : object + """ + if not hasattr(self.base_estimator, "fit_intercept"): + raise ValueError( + "Only linear models are accepted as base_estimator. " + "Select one from linear_model class of scikit-learn." 
+ ) + + if not is_regressor(self.base_estimator): + raise ValueError("Select a regressor linear model as base_estimator.") + + n_sample, self.n_features_in_ = X.shape + + if hasattr(self, "classes_"): + class_to_int = dict(map(reversed, enumerate(self.classes_))) + y = np.array([class_to_int[i] for i in y]) + y = self._inv_sigmoid(y) + + self.base_estimator_ = deepcopy(self.base_estimator) + self.base_estimator_.fit(X, y, sample_weight) + resid = y - self.base_estimator_.predict(X) + + criterion = "squared_error" if _sklearn_v1 else "mse" + + self.forest_estimator_ = RandomForestRegressor( + n_estimators=self.n_estimators, + criterion=criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + min_weight_fraction_leaf=self.min_weight_fraction_leaf, + max_features=self.max_features, + max_leaf_nodes=self.max_leaf_nodes, + min_impurity_decrease=self.min_impurity_decrease, + bootstrap=self.bootstrap, + oob_score=self.oob_score, + n_jobs=self.n_jobs, + random_state=self.random_state, + ccp_alpha=self.ccp_alpha, + max_samples=self.max_samples, + ) + self.forest_estimator_.fit(X, resid, sample_weight) + + if hasattr(self.base_estimator_, "coef_"): + self.coef_ = self.base_estimator_.coef_ + + if hasattr(self.base_estimator_, "intercept_"): + self.intercept_ = self.base_estimator_.intercept_ + + self.feature_importances_ = self.forest_estimator_.feature_importances_ + + return self + + def apply(self, X): + """Apply trees in the forest to X, return leaf indices. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples. + + Returns + ------- + X_leaves : ndarray of shape (n_samples, n_estimators) + For each datapoint x in X and for each tree in the forest, + return the index of the leaf x ends up in. + """ + check_is_fitted(self, attributes="base_estimator_") + + return self.forest_estimator_.apply(X) + + def decision_path(self, X): + """Return the decision path in the forest. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples. + + Returns + ------- + indicator : sparse matrix of shape (n_samples, n_nodes) + Return a node indicator matrix where non zero elements indicates + that the samples goes through the nodes. The matrix is of CSR + format. + + n_nodes_ptr : ndarray of shape (n_estimators + 1, ) + The columns from indicator[n_nodes_ptr[i]:n_nodes_ptr[i+1]] + gives the indicator value for the i-th estimator. + """ + check_is_fitted(self, attributes="base_estimator_") + + return self.forest_estimator_.decision_path(X) |