author     Birte Kristina Friesel <birte.friesel@uos.de>  2024-03-06 08:05:12 +0100
committer  Birte Kristina Friesel <birte.friesel@uos.de>  2024-03-06 08:05:12 +0100
commit     08636303e6bef99ea5f997a8ba4a41256aabee6b (patch)
tree       b8480c4d5289dedb06b3e3338953666900a849ee /ext/lightgbm/dask.py
parent     112ebbe21ff330c68faf883b125ba4932e007544 (diff)
import lightgbm
Diffstat (limited to 'ext/lightgbm/dask.py')
-rw-r--r--  ext/lightgbm/dask.py  1671
1 files changed, 1671 insertions, 0 deletions
diff --git a/ext/lightgbm/dask.py b/ext/lightgbm/dask.py
new file mode 100644
index 0000000..88e4779
--- /dev/null
+++ b/ext/lightgbm/dask.py
@@ -0,0 +1,1671 @@
+# coding: utf-8
+"""Distributed training with LightGBM and dask.distributed.
+
+This module enables you to perform distributed training with LightGBM on
+dask.Array and dask.DataFrame collections.
+
+It is based on dask-lightgbm, which was based on dask-xgboost.
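+
+A minimal usage sketch (illustrative only; it assumes LightGBM was installed with
+its Dask dependencies and that a small local cluster is acceptable for testing)::
+
+    import dask.array as da
+    from distributed import Client, LocalCluster
+    from lightgbm import DaskLGBMRegressor
+
+    cluster = LocalCluster(n_workers=2)
+    client = Client(cluster)
+
+    # illustrative sketch: shapes, chunking and worker count are arbitrary
+    X = da.random.random((1000, 10), chunks=(100, 10))
+    y = da.random.random((1000,), chunks=(100,))
+
+    model = DaskLGBMRegressor(client=client, n_estimators=10)
+    model.fit(X, y)
+    preds = model.predict(X)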
+"""
+import operator
+import socket
+from collections import defaultdict
+from copy import deepcopy
+from enum import Enum, auto
+from functools import partial
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
+from urllib.parse import urlparse
+
+import numpy as np
+import scipy.sparse as ss
+
+from .basic import LightGBMError, _choose_param_value, _ConfigAliases, _log_info, _log_warning
+from .compat import (DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED, Client, Future, LGBMNotFittedError, concat,
+ dask_Array, dask_array_from_delayed, dask_bag_from_delayed, dask_DataFrame, dask_Series,
+ default_client, delayed, pd_DataFrame, pd_Series, wait)
+from .sklearn import (LGBMClassifier, LGBMModel, LGBMRanker, LGBMRegressor, _LGBM_ScikitCustomObjectiveFunction,
+ _LGBM_ScikitEvalMetricType, _lgbmmodel_doc_custom_eval_note, _lgbmmodel_doc_fit,
+ _lgbmmodel_doc_predict)
+
+__all__ = [
+ 'DaskLGBMClassifier',
+ 'DaskLGBMRanker',
+ 'DaskLGBMRegressor',
+]
+
+_DaskCollection = Union[dask_Array, dask_DataFrame, dask_Series]
+_DaskMatrixLike = Union[dask_Array, dask_DataFrame]
+_DaskVectorLike = Union[dask_Array, dask_Series]
+_DaskPart = Union[np.ndarray, pd_DataFrame, pd_Series, ss.spmatrix]
+_PredictionDtype = Union[Type[np.float32], Type[np.float64], Type[np.int32], Type[np.int64]]
+
+
+class _RemoteSocket:
+ def acquire(self) -> int:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind(('', 0))
+ return self.socket.getsockname()[1]
+
+ def release(self) -> None:
+ self.socket.close()
+
+
+def _acquire_port() -> Tuple[_RemoteSocket, int]:
+ s = _RemoteSocket()
+ port = s.acquire()
+ return s, port
+
+
+class _DatasetNames(Enum):
+ """Placeholder names used by lightgbm.dask internals to say 'also evaluate the training data'.
+
+ Avoid duplicating the training data when the validation set refers to elements of training data.
+ """
+
+ TRAINSET = auto()
+ SAMPLE_WEIGHT = auto()
+ INIT_SCORE = auto()
+ GROUP = auto()
+
+
+def _get_dask_client(client: Optional[Client]) -> Client:
+ """Choose a Dask client to use.
+
+ Parameters
+ ----------
+ client : dask.distributed.Client or None
+ Dask client.
+
+ Returns
+ -------
+ client : dask.distributed.Client
+ A Dask client.
+ """
+ if client is None:
+ return default_client()
+ else:
+ return client
+
+
+def _assign_open_ports_to_workers(
+ client: Client,
+ workers: List[str],
+) -> Tuple[Dict[str, Future], Dict[str, int]]:
+ """Assign an open port to each worker.
+
+ Returns
+ -------
+ worker_to_socket_future: dict
+ mapping from worker address to a future pointing to the remote socket.
+ worker_to_port: dict
+ mapping from worker address to an open port in the worker's host.
+ """
+ # Acquire port in worker
+ worker_to_future = {}
+ for worker in workers:
+ worker_to_future[worker] = client.submit(
+ _acquire_port,
+ workers=[worker],
+ allow_other_workers=False,
+ pure=False,
+ )
+
+ # schedule futures to retrieve each element of the tuple
+ worker_to_socket_future = {}
+ worker_to_port_future = {}
+ for worker, socket_future in worker_to_future.items():
+ worker_to_socket_future[worker] = client.submit(operator.itemgetter(0), socket_future)
+ worker_to_port_future[worker] = client.submit(operator.itemgetter(1), socket_future)
+
+ # retrieve ports
+ worker_to_port = client.gather(worker_to_port_future)
+
+ return worker_to_socket_future, worker_to_port
+
+
+def _concat(seq: List[_DaskPart]) -> _DaskPart:
+ if isinstance(seq[0], np.ndarray):
+ return np.concatenate(seq, axis=0)
+ elif isinstance(seq[0], (pd_DataFrame, pd_Series)):
+ return concat(seq, axis=0)
+ elif isinstance(seq[0], ss.spmatrix):
+ return ss.vstack(seq, format='csr')
+ else:
+ raise TypeError(f'Data must be one of: numpy arrays, pandas dataframes, sparse matrices (from scipy). Got {type(seq[0]).__name__}.')
+
+
+def _remove_list_padding(*args: Any) -> List[List[Any]]:
+ return [[z for z in arg if z is not None] for arg in args]
+
+
+def _pad_eval_names(lgbm_model: LGBMModel, required_names: List[str]) -> LGBMModel:
+ """Append missing (key, value) pairs to a LightGBM model's evals_result_ and best_score_ OrderedDict attrs based on a set of required eval_set names.
+
+ Allows users to rely on expected eval_set names being present when fitting DaskLGBM estimators with ``eval_set``.
+ """
+ for eval_name in required_names:
+ if eval_name not in lgbm_model.evals_result_:
+ lgbm_model.evals_result_[eval_name] = {}
+ if eval_name not in lgbm_model.best_score_:
+ lgbm_model.best_score_[eval_name] = {}
+
+ return lgbm_model
+
+
+def _train_part(
+ params: Dict[str, Any],
+ model_factory: Type[LGBMModel],
+ list_of_parts: List[Dict[str, _DaskPart]],
+ machines: str,
+ local_listen_port: int,
+ num_machines: int,
+ return_model: bool,
+ time_out: int,
+ remote_socket: _RemoteSocket,
+ **kwargs: Any
+) -> Optional[LGBMModel]:
+ network_params = {
+ 'machines': machines,
+ 'local_listen_port': local_listen_port,
+ 'time_out': time_out,
+ 'num_machines': num_machines
+ }
+ params.update(network_params)
+
+ is_ranker = issubclass(model_factory, LGBMRanker)
+
+ # Concatenate many parts into one
+ data = _concat([x['data'] for x in list_of_parts])
+ label = _concat([x['label'] for x in list_of_parts])
+
+ if 'weight' in list_of_parts[0]:
+ weight = _concat([x['weight'] for x in list_of_parts])
+ else:
+ weight = None
+
+ if 'group' in list_of_parts[0]:
+ group = _concat([x['group'] for x in list_of_parts])
+ else:
+ group = None
+
+ if 'init_score' in list_of_parts[0]:
+ init_score = _concat([x['init_score'] for x in list_of_parts])
+ else:
+ init_score = None
+
+ # construct local eval_set data.
+ n_evals = max(len(x.get('eval_set', [])) for x in list_of_parts)
+ eval_names = kwargs.pop('eval_names', None)
+ eval_class_weight = kwargs.get('eval_class_weight')
+ local_eval_set = None
+ local_eval_names = None
+ local_eval_sample_weight = None
+ local_eval_init_score = None
+ local_eval_group = None
+
+ if n_evals:
+ has_eval_sample_weight = any(x.get('eval_sample_weight') is not None for x in list_of_parts)
+ has_eval_init_score = any(x.get('eval_init_score') is not None for x in list_of_parts)
+
+ local_eval_set = []
+ evals_result_names = []
+ if has_eval_sample_weight:
+ local_eval_sample_weight = []
+ if has_eval_init_score:
+ local_eval_init_score = []
+ if is_ranker:
+ local_eval_group = []
+
+ # store indices of eval_set components that were not contained within local parts.
+ missing_eval_component_idx = []
+
+ # consolidate parts of each individual eval component.
+ for i in range(n_evals):
+ x_e = []
+ y_e = []
+ w_e = []
+ init_score_e = []
+ g_e = []
+ for part in list_of_parts:
+ if not part.get('eval_set'):
+ continue
+
+                # require that eval_name exists in the evaluated result data in case it was dropped due to padding.
+                # in distributed training the 'training' eval_set is not auto-detected, so it will have the name 'valid_<index>'.
+ if eval_names:
+ evals_result_name = eval_names[i]
+ else:
+ evals_result_name = f'valid_{i}'
+
+ eval_set = part['eval_set'][i]
+ if eval_set is _DatasetNames.TRAINSET:
+ x_e.append(part['data'])
+ y_e.append(part['label'])
+ else:
+ x_e.extend(eval_set[0])
+ y_e.extend(eval_set[1])
+
+ if evals_result_name not in evals_result_names:
+ evals_result_names.append(evals_result_name)
+
+ eval_weight = part.get('eval_sample_weight')
+ if eval_weight:
+ if eval_weight[i] is _DatasetNames.SAMPLE_WEIGHT:
+ w_e.append(part['weight'])
+ else:
+ w_e.extend(eval_weight[i])
+
+ eval_init_score = part.get('eval_init_score')
+ if eval_init_score:
+ if eval_init_score[i] is _DatasetNames.INIT_SCORE:
+ init_score_e.append(part['init_score'])
+ else:
+ init_score_e.extend(eval_init_score[i])
+
+ eval_group = part.get('eval_group')
+ if eval_group:
+ if eval_group[i] is _DatasetNames.GROUP:
+ g_e.append(part['group'])
+ else:
+ g_e.extend(eval_group[i])
+
+ # filter padding from eval parts then _concat each eval_set component.
+ x_e, y_e, w_e, init_score_e, g_e = _remove_list_padding(x_e, y_e, w_e, init_score_e, g_e)
+ if x_e:
+ local_eval_set.append((_concat(x_e), _concat(y_e)))
+ else:
+ missing_eval_component_idx.append(i)
+ continue
+
+ if w_e:
+ local_eval_sample_weight.append(_concat(w_e))
+ if init_score_e:
+ local_eval_init_score.append(_concat(init_score_e))
+ if g_e:
+ local_eval_group.append(_concat(g_e))
+
+ # reconstruct eval_set fit args/kwargs depending on which components of eval_set are on worker.
+ eval_component_idx = [i for i in range(n_evals) if i not in missing_eval_component_idx]
+ if eval_names:
+ local_eval_names = [eval_names[i] for i in eval_component_idx]
+ if eval_class_weight:
+ kwargs['eval_class_weight'] = [eval_class_weight[i] for i in eval_component_idx]
+
+ model = model_factory(**params)
+ if remote_socket is not None:
+ remote_socket.release()
+ try:
+ if is_ranker:
+ model.fit(
+ data,
+ label,
+ sample_weight=weight,
+ init_score=init_score,
+ group=group,
+ eval_set=local_eval_set,
+ eval_sample_weight=local_eval_sample_weight,
+ eval_init_score=local_eval_init_score,
+ eval_group=local_eval_group,
+ eval_names=local_eval_names,
+ **kwargs
+ )
+ else:
+ model.fit(
+ data,
+ label,
+ sample_weight=weight,
+ init_score=init_score,
+ eval_set=local_eval_set,
+ eval_sample_weight=local_eval_sample_weight,
+ eval_init_score=local_eval_init_score,
+ eval_names=local_eval_names,
+ **kwargs
+ )
+
+ finally:
+ if getattr(model, "fitted_", False):
+ model.booster_.free_network()
+
+ if n_evals:
+ # ensure that expected keys for evals_result_ and best_score_ exist regardless of padding.
+ model = _pad_eval_names(model, required_names=evals_result_names)
+
+ return model if return_model else None
+
+
+def _split_to_parts(data: _DaskCollection, is_matrix: bool) -> List[_DaskPart]:
+ parts = data.to_delayed()
+ if isinstance(parts, np.ndarray):
+ if is_matrix:
+ assert parts.shape[1] == 1
+ else:
+ assert parts.ndim == 1 or parts.shape[1] == 1
+ parts = parts.flatten().tolist()
+ return parts
+
+
+def _machines_to_worker_map(machines: str, worker_addresses: Iterable[str]) -> Dict[str, int]:
+ """Create a worker_map from machines list.
+
+ Given ``machines`` and a list of Dask worker addresses, return a mapping where the keys are
+ ``worker_addresses`` and the values are ports from ``machines``.
+
+ Parameters
+ ----------
+ machines : str
+ A comma-delimited list of workers, of the form ``ip1:port,ip2:port``.
+ worker_addresses : list of str
+ An iterable of Dask worker addresses, of the form ``{protocol}{hostname}:{port}``, where ``port`` is the port Dask's scheduler uses to talk to that worker.
+
+ Returns
+ -------
+ result : Dict[str, int]
+        Dictionary where keys are worker addresses in the form expected by Dask and values are ports for LightGBM to use.
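+
+    Examples
+    --------
+    Illustrative only; the host names and ports below are hypothetical.
+
+    >>> _machines_to_worker_map(
+    ...     machines="10.0.0.1:12400,10.0.0.2:12400",
+    ...     worker_addresses=["tcp://10.0.0.1:36201", "tcp://10.0.0.2:36201"],
+    ... )
+    {'tcp://10.0.0.1:36201': 12400, 'tcp://10.0.0.2:36201': 12400}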
+ """
+ machine_addresses = machines.split(",")
+
+ if len(set(machine_addresses)) != len(machine_addresses):
+ raise ValueError(f"Found duplicates in 'machines' ({machines}). Each entry in 'machines' must be a unique IP-port combination.")
+
+ machine_to_port = defaultdict(set)
+ for address in machine_addresses:
+ host, port = address.split(":")
+ machine_to_port[host].add(int(port))
+
+ out = {}
+ for address in worker_addresses:
+ worker_host = urlparse(address).hostname
+ if not worker_host:
+ raise ValueError(f"Could not parse host name from worker address '{address}'")
+ out[address] = machine_to_port[worker_host].pop()
+
+ return out
+
+
+def _train(
+ client: Client,
+ data: _DaskMatrixLike,
+ label: _DaskCollection,
+ params: Dict[str, Any],
+ model_factory: Type[LGBMModel],
+ sample_weight: Optional[_DaskVectorLike] = None,
+ init_score: Optional[_DaskCollection] = None,
+ group: Optional[_DaskVectorLike] = None,
+ eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
+ eval_names: Optional[List[str]] = None,
+ eval_sample_weight: Optional[List[_DaskVectorLike]] = None,
+ eval_class_weight: Optional[List[Union[dict, str]]] = None,
+ eval_init_score: Optional[List[_DaskCollection]] = None,
+ eval_group: Optional[List[_DaskVectorLike]] = None,
+ eval_metric: Optional[_LGBM_ScikitEvalMetricType] = None,
+ eval_at: Optional[Union[List[int], Tuple[int, ...]]] = None,
+ **kwargs: Any
+) -> LGBMModel:
+ """Inner train routine.
+
+ Parameters
+ ----------
+ client : dask.distributed.Client
+ Dask client.
+ data : Dask Array or Dask DataFrame of shape = [n_samples, n_features]
+ Input feature matrix.
+ label : Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]
+ The target values (class labels in classification, real numbers in regression).
+ params : dict
+ Parameters passed to constructor of the local underlying model.
+ model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
+ Class of the local underlying model.
+ sample_weight : Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)
+ Weights of training data. Weights should be non-negative.
+ init_score : Dask Array or Dask Series of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task), or Dask Array or Dask DataFrame of shape = [n_samples, n_classes] (for multi-class task), or None, optional (default=None)
+ Init score of training data.
+ group : Dask Array or Dask Series or None, optional (default=None)
+ Group/query data.
+ Only used in the learning-to-rank task.
+ sum(group) = n_samples.
+ For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups,
+ where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc.
+ eval_set : list of (X, y) tuples of Dask data collections, or None, optional (default=None)
+ List of (X, y) tuple pairs to use as validation sets.
+        Note that not all workers may receive chunks of every eval set within ``eval_set``. When the returned
+ lightgbm estimator is not trained using any chunks of a particular eval set, its corresponding component
+ of ``evals_result_`` and ``best_score_`` will be empty dictionaries.
+ eval_names : list of str, or None, optional (default=None)
+ Names of eval_set.
+ eval_sample_weight : list of Dask Array or Dask Series, or None, optional (default=None)
+ Weights for each validation set in eval_set. Weights should be non-negative.
+ eval_class_weight : list of dict or str, or None, optional (default=None)
+ Class weights, one dict or str for each validation set in eval_set.
+ eval_init_score : list of Dask Array, Dask Series or Dask DataFrame (for multi-class task), or None, optional (default=None)
+ Initial model score for each validation set in eval_set.
+ eval_group : list of Dask Array or Dask Series, or None, optional (default=None)
+ Group/query for each validation set in eval_set.
+ eval_metric : str, callable, list or None, optional (default=None)
+ If str, it should be a built-in evaluation metric to use.
+ If callable, it should be a custom evaluation metric, see note below for more details.
+ If list, it can be a list of built-in metrics, a list of custom evaluation metrics, or a mix of both.
+ In either case, the ``metric`` from the Dask model parameters (or inferred from the objective) will be evaluated and used as well.
+ Default: 'l2' for DaskLGBMRegressor, 'binary(multi)_logloss' for DaskLGBMClassifier, 'ndcg' for DaskLGBMRanker.
+ eval_at : list or tuple of int, optional (default=None)
+ The evaluation positions of the specified ranking metric.
+ **kwargs
+ Other parameters passed to ``fit`` method of the local underlying model.
+
+ Returns
+ -------
+ model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
+ Returns fitted underlying model.
+
+ Note
+ ----
+
+ This method handles setting up the following network parameters based on information
+ about the Dask cluster referenced by ``client``.
+
+ * ``local_listen_port``: port that each LightGBM worker opens a listening socket on,
+ to accept connections from other workers. This can differ from LightGBM worker
+ to LightGBM worker, but does not have to.
+ * ``machines``: a comma-delimited list of all workers in the cluster, in the
+ form ``ip:port,ip:port``. If running multiple Dask workers on the same host, use different
+ ports for each worker. For example, for ``LocalCluster(n_workers=3)``, you might
+ pass ``"127.0.0.1:12400,127.0.0.1:12401,127.0.0.1:12402"``.
+ * ``num_machines``: number of LightGBM workers.
+ * ``timeout``: time in minutes to wait before closing unused sockets.
+
+ The default behavior of this function is to generate ``machines`` from the list of
+ Dask workers which hold some piece of the training data, and to search for an open
+ port on each worker to be used as ``local_listen_port``.
+
+ If ``machines`` is provided explicitly in ``params``, this function uses the hosts
+ and ports in that list directly, and does not do any searching. This means that if
+ any of the Dask workers are missing from the list or any of those ports are not free
+ when training starts, training will fail.
+
+ If ``local_listen_port`` is provided in ``params`` and ``machines`` is not, this function
+ constructs ``machines`` from the list of Dask workers which hold some piece of the
+ training data, assuming that each one will use the same ``local_listen_port``.
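+
+    For example, a caller who wants full control over ports could pass ``machines``
+    through the estimator parameters (a sketch only; the addresses below are
+    hypothetical and must match the hosts on which the Dask workers actually run)::
+
+        dask_model = DaskLGBMRegressor(
+            client=client,
+            # hypothetical hosts/ports; replace with the machines in your cluster
+            machines="10.0.0.1:12400,10.0.0.2:12400,10.0.0.3:12400",
+        )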
+ """
+ params = deepcopy(params)
+
+ # capture whether local_listen_port or its aliases were provided
+ listen_port_in_params = any(
+ alias in params for alias in _ConfigAliases.get("local_listen_port")
+ )
+
+ # capture whether machines or its aliases were provided
+ machines_in_params = any(
+ alias in params for alias in _ConfigAliases.get("machines")
+ )
+
+ params = _choose_param_value(
+ main_param_name="tree_learner",
+ params=params,
+ default_value="data"
+ )
+ allowed_tree_learners = {
+ 'data',
+ 'data_parallel',
+ 'feature',
+ 'feature_parallel',
+ 'voting',
+ 'voting_parallel'
+ }
+ if params["tree_learner"] not in allowed_tree_learners:
+ _log_warning(f'Parameter tree_learner set to {params["tree_learner"]}, which is not allowed. Using "data" as default')
+ params['tree_learner'] = 'data'
+
+ # Some passed-in parameters can be removed:
+ # * 'num_machines': set automatically from Dask worker list
+ # * 'num_threads': overridden to match nthreads on each Dask process
+ for param_alias in _ConfigAliases.get('num_machines', 'num_threads'):
+ if param_alias in params:
+ _log_warning(f"Parameter {param_alias} will be ignored.")
+ params.pop(param_alias)
+
+ # Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality
+ data_parts = _split_to_parts(data=data, is_matrix=True)
+ label_parts = _split_to_parts(data=label, is_matrix=False)
+ parts = [{'data': x, 'label': y} for (x, y) in zip(data_parts, label_parts)]
+ n_parts = len(parts)
+
+ if sample_weight is not None:
+ weight_parts = _split_to_parts(data=sample_weight, is_matrix=False)
+ for i in range(n_parts):
+ parts[i]['weight'] = weight_parts[i]
+
+ if group is not None:
+ group_parts = _split_to_parts(data=group, is_matrix=False)
+ for i in range(n_parts):
+ parts[i]['group'] = group_parts[i]
+
+ if init_score is not None:
+ init_score_parts = _split_to_parts(data=init_score, is_matrix=False)
+ for i in range(n_parts):
+ parts[i]['init_score'] = init_score_parts[i]
+
+    # eval_set will be re-constructed into smaller lists of (X, y) tuples, where
+    # X and y are each delayed sub-lists of the original eval Dask collections.
+ if eval_set:
+ # find maximum number of parts in an individual eval set so that we can
+ # pad eval sets when they come in different sizes.
+ n_largest_eval_parts = max(x[0].npartitions for x in eval_set)
+
+ eval_sets: Dict[
+ int,
+ List[
+ Union[
+ _DatasetNames,
+ Tuple[
+ List[Optional[_DaskMatrixLike]],
+ List[Optional[_DaskVectorLike]]
+ ]
+ ]
+ ]
+ ] = defaultdict(list)
+ if eval_sample_weight:
+ eval_sample_weights: Dict[
+ int,
+ List[
+ Union[
+ _DatasetNames,
+ List[Optional[_DaskVectorLike]]
+ ]
+ ]
+ ] = defaultdict(list)
+ if eval_group:
+ eval_groups: Dict[
+ int,
+ List[
+ Union[
+ _DatasetNames,
+ List[Optional[_DaskVectorLike]]
+ ]
+ ]
+ ] = defaultdict(list)
+ if eval_init_score:
+ eval_init_scores: Dict[
+ int,
+ List[
+ Union[
+ _DatasetNames,
+ List[Optional[_DaskMatrixLike]]
+ ]
+ ]
+ ] = defaultdict(list)
+
+ for i, (X_eval, y_eval) in enumerate(eval_set):
+ n_this_eval_parts = X_eval.npartitions
+
+ # when individual eval set is equivalent to training data, skip recomputing parts.
+ if X_eval is data and y_eval is label:
+ for parts_idx in range(n_parts):
+ eval_sets[parts_idx].append(_DatasetNames.TRAINSET)
+ else:
+ eval_x_parts = _split_to_parts(data=X_eval, is_matrix=True)
+ eval_y_parts = _split_to_parts(data=y_eval, is_matrix=False)
+ for j in range(n_largest_eval_parts):
+ parts_idx = j % n_parts
+
+ # add None-padding for individual eval_set member if it is smaller than the largest member.
+ if j < n_this_eval_parts:
+ x_e = eval_x_parts[j]
+ y_e = eval_y_parts[j]
+ else:
+ x_e = None
+ y_e = None
+
+ if j < n_parts:
+ # first time a chunk of this eval set is added to this part.
+ eval_sets[parts_idx].append(([x_e], [y_e]))
+ else:
+ # append additional chunks of this eval set to this part.
+ eval_sets[parts_idx][-1][0].append(x_e) # type: ignore[index, union-attr]
+ eval_sets[parts_idx][-1][1].append(y_e) # type: ignore[index, union-attr]
+
+ if eval_sample_weight:
+ if eval_sample_weight[i] is sample_weight:
+ for parts_idx in range(n_parts):
+ eval_sample_weights[parts_idx].append(_DatasetNames.SAMPLE_WEIGHT)
+ else:
+ eval_w_parts = _split_to_parts(data=eval_sample_weight[i], is_matrix=False)
+
+ # ensure that all evaluation parts map uniquely to one part.
+ for j in range(n_largest_eval_parts):
+ if j < n_this_eval_parts:
+ w_e = eval_w_parts[j]
+ else:
+ w_e = None
+
+ parts_idx = j % n_parts
+ if j < n_parts:
+ eval_sample_weights[parts_idx].append([w_e])
+ else:
+ eval_sample_weights[parts_idx][-1].append(w_e) # type: ignore[union-attr]
+
+ if eval_init_score:
+ if eval_init_score[i] is init_score:
+ for parts_idx in range(n_parts):
+ eval_init_scores[parts_idx].append(_DatasetNames.INIT_SCORE)
+ else:
+ eval_init_score_parts = _split_to_parts(data=eval_init_score[i], is_matrix=False)
+ for j in range(n_largest_eval_parts):
+ if j < n_this_eval_parts:
+ init_score_e = eval_init_score_parts[j]
+ else:
+ init_score_e = None
+
+ parts_idx = j % n_parts
+ if j < n_parts:
+ eval_init_scores[parts_idx].append([init_score_e])
+ else:
+ eval_init_scores[parts_idx][-1].append(init_score_e) # type: ignore[union-attr]
+
+ if eval_group:
+ if eval_group[i] is group:
+ for parts_idx in range(n_parts):
+ eval_groups[parts_idx].append(_DatasetNames.GROUP)
+ else:
+ eval_g_parts = _split_to_parts(data=eval_group[i], is_matrix=False)
+ for j in range(n_largest_eval_parts):
+ if j < n_this_eval_parts:
+ g_e = eval_g_parts[j]
+ else:
+ g_e = None
+
+ parts_idx = j % n_parts
+ if j < n_parts:
+ eval_groups[parts_idx].append([g_e])
+ else:
+ eval_groups[parts_idx][-1].append(g_e) # type: ignore[union-attr]
+
+ # assign sub-eval_set components to worker parts.
+ for parts_idx, e_set in eval_sets.items():
+ parts[parts_idx]['eval_set'] = e_set
+ if eval_sample_weight:
+ parts[parts_idx]['eval_sample_weight'] = eval_sample_weights[parts_idx]
+ if eval_init_score:
+ parts[parts_idx]['eval_init_score'] = eval_init_scores[parts_idx]
+ if eval_group:
+ parts[parts_idx]['eval_group'] = eval_groups[parts_idx]
+
+ # Start computation in the background
+ parts = list(map(delayed, parts))
+ parts = client.compute(parts)
+ wait(parts)
+
+ for part in parts:
+ if part.status == 'error': # type: ignore
+ # trigger error locally
+ return part # type: ignore[return-value]
+
+ # Find locations of all parts and map them to particular Dask workers
+ key_to_part_dict = {part.key: part for part in parts} # type: ignore
+ who_has = client.who_has(parts)
+ worker_map = defaultdict(list)
+ for key, workers in who_has.items():
+ worker_map[next(iter(workers))].append(key_to_part_dict[key])
+
+    # Check that all workers were provided some of eval_set. Otherwise warn the user that validation
+    # data artifacts may not be populated, depending on which worker returns the final estimator.
+ if eval_set:
+ for worker in worker_map:
+ has_eval_set = False
+ for part in worker_map[worker]:
+ if 'eval_set' in part.result(): # type: ignore[attr-defined]
+ has_eval_set = True
+ break
+
+ if not has_eval_set:
+ _log_warning(
+ f"Worker {worker} was not allocated eval_set data. Therefore evals_result_ and best_score_ data may be unreliable. "
+ "Try rebalancing data across workers."
+ )
+
+ # assign general validation set settings to fit kwargs.
+ if eval_names:
+ kwargs['eval_names'] = eval_names
+ if eval_class_weight:
+ kwargs['eval_class_weight'] = eval_class_weight
+ if eval_metric:
+ kwargs['eval_metric'] = eval_metric
+ if eval_at:
+ kwargs['eval_at'] = eval_at
+
+ master_worker = next(iter(worker_map))
+ worker_ncores = client.ncores()
+
+ # resolve aliases for network parameters and pop the result off params.
+ # these values are added back in calls to `_train_part()`
+ params = _choose_param_value(
+ main_param_name="local_listen_port",
+ params=params,
+ default_value=12400
+ )
+ local_listen_port = params.pop("local_listen_port")
+
+ params = _choose_param_value(
+ main_param_name="machines",
+ params=params,
+ default_value=None
+ )
+ machines = params.pop("machines")
+
+ # figure out network params
+ worker_to_socket_future: Dict[str, Future] = {}
+ worker_addresses = worker_map.keys()
+ if machines is not None:
+ _log_info("Using passed-in 'machines' parameter")
+ worker_address_to_port = _machines_to_worker_map(
+ machines=machines,
+ worker_addresses=worker_addresses
+ )
+ else:
+ if listen_port_in_params:
+ _log_info("Using passed-in 'local_listen_port' for all workers")
+ unique_hosts = {urlparse(a).hostname for a in worker_addresses}
+ if len(unique_hosts) < len(worker_addresses):
+ msg = (
+ "'local_listen_port' was provided in Dask training parameters, but at least one "
+ "machine in the cluster has multiple Dask worker processes running on it. Please omit "
+ "'local_listen_port' or pass 'machines'."
+ )
+ raise LightGBMError(msg)
+
+ worker_address_to_port = {
+ address: local_listen_port
+ for address in worker_addresses
+ }
+ else:
+ _log_info("Finding random open ports for workers")
+ worker_to_socket_future, worker_address_to_port = _assign_open_ports_to_workers(client, list(worker_map.keys()))
+
+ machines = ','.join([
+ f'{urlparse(worker_address).hostname}:{port}'
+ for worker_address, port
+ in worker_address_to_port.items()
+ ])
+
+ num_machines = len(worker_address_to_port)
+
+ # Tell each worker to train on the parts that it has locally
+ #
+ # This code treats ``_train_part()`` calls as not "pure" because:
+ # 1. there is randomness in the training process unless parameters ``seed``
+ # and ``deterministic`` are set
+ # 2. even with those parameters set, the output of one ``_train_part()`` call
+ # relies on global state (it and all the other LightGBM training processes
+ # coordinate with each other)
+ futures_classifiers = [
+ client.submit(
+ _train_part,
+ model_factory=model_factory,
+ params={**params, 'num_threads': worker_ncores[worker]},
+ list_of_parts=list_of_parts,
+ machines=machines,
+ local_listen_port=worker_address_to_port[worker],
+ num_machines=num_machines,
+ time_out=params.get('time_out', 120),
+ remote_socket=worker_to_socket_future.get(worker, None),
+ return_model=(worker == master_worker),
+ workers=[worker],
+ allow_other_workers=False,
+ pure=False,
+ **kwargs
+ )
+ for worker, list_of_parts in worker_map.items()
+ ]
+
+ results = client.gather(futures_classifiers)
+ results = [v for v in results if v]
+ model = results[0]
+
+ # if network parameters were changed during training, remove them from the
+ # returned model so that they're generated dynamically on every run based
+ # on the Dask cluster you're connected to and which workers have pieces of
+ # the training data
+ if not listen_port_in_params:
+ for param in _ConfigAliases.get('local_listen_port'):
+ model._other_params.pop(param, None)
+
+ if not machines_in_params:
+ for param in _ConfigAliases.get('machines'):
+ model._other_params.pop(param, None)
+
+ for param in _ConfigAliases.get('num_machines', 'timeout'):
+ model._other_params.pop(param, None)
+
+ return model
+
+
+def _predict_part(
+ part: _DaskPart,
+ model: LGBMModel,
+ raw_score: bool,
+ pred_proba: bool,
+ pred_leaf: bool,
+ pred_contrib: bool,
+ **kwargs: Any
+) -> _DaskPart:
+
+ result: _DaskPart
+ if part.shape[0] == 0:
+ result = np.array([])
+ elif pred_proba:
+ result = model.predict_proba(
+ part,
+ raw_score=raw_score,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ **kwargs
+ )
+ else:
+ result = model.predict(
+ part,
+ raw_score=raw_score,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ **kwargs
+ )
+
+ # dask.DataFrame.map_partitions() expects each call to return a pandas DataFrame or Series
+ if isinstance(part, pd_DataFrame):
+ if len(result.shape) == 2:
+ result = pd_DataFrame(result, index=part.index)
+ else:
+ result = pd_Series(result, index=part.index, name='predictions')
+
+ return result
+
+
+def _predict(
+ model: LGBMModel,
+ data: _DaskMatrixLike,
+ client: Client,
+ raw_score: bool = False,
+ pred_proba: bool = False,
+ pred_leaf: bool = False,
+ pred_contrib: bool = False,
+ dtype: _PredictionDtype = np.float32,
+ **kwargs: Any
+) -> Union[dask_Array, List[dask_Array]]:
+ """Inner predict routine.
+
+ Parameters
+ ----------
+ model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
+ Fitted underlying model.
+ data : Dask Array or Dask DataFrame of shape = [n_samples, n_features]
+ Input feature matrix.
+ raw_score : bool, optional (default=False)
+ Whether to predict raw scores.
+ pred_proba : bool, optional (default=False)
+ Should method return results of ``predict_proba`` (``pred_proba=True``) or ``predict`` (``pred_proba=False``).
+ pred_leaf : bool, optional (default=False)
+ Whether to predict leaf index.
+ pred_contrib : bool, optional (default=False)
+ Whether to predict feature contributions.
+ dtype : np.dtype, optional (default=np.float32)
+ Dtype of the output.
+ **kwargs
+ Other parameters passed to ``predict`` or ``predict_proba`` method.
+
+ Returns
+ -------
+ predicted_result : Dask Array of shape = [n_samples] or shape = [n_samples, n_classes]
+ The predicted values.
+ X_leaves : Dask Array of shape = [n_samples, n_trees] or shape = [n_samples, n_trees * n_classes]
+ If ``pred_leaf=True``, the predicted leaf of every tree for each sample.
+ X_SHAP_values : Dask Array of shape = [n_samples, n_features + 1] or shape = [n_samples, (n_features + 1) * n_classes] or (if multi-class and using sparse inputs) a list of ``n_classes`` Dask Arrays of shape = [n_samples, n_features + 1]
+ If ``pred_contrib=True``, the feature contributions for each sample.
+ """
+ if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)):
+ raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask')
+ if isinstance(data, dask_DataFrame):
+ return data.map_partitions(
+ _predict_part,
+ model=model,
+ raw_score=raw_score,
+ pred_proba=pred_proba,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ **kwargs
+ ).values
+ elif isinstance(data, dask_Array):
+ # for multi-class classification with sparse matrices, pred_contrib predictions
+ # are returned as a list of sparse matrices (one per class)
+ num_classes = model._n_classes
+
+ if (
+ num_classes > 2
+ and pred_contrib
+ and isinstance(data._meta, ss.spmatrix)
+ ):
+
+ predict_function = partial(
+ _predict_part,
+ model=model,
+ raw_score=False,
+ pred_proba=pred_proba,
+ pred_leaf=False,
+ pred_contrib=True,
+ **kwargs
+ )
+
+ delayed_chunks = data.to_delayed()
+ bag = dask_bag_from_delayed(delayed_chunks[:, 0])
+
+ @delayed
+ def _extract(items: List[Any], i: int) -> Any:
+ return items[i]
+
+ preds = bag.map_partitions(predict_function)
+
+ # pred_contrib output will have one column per feature,
+ # plus one more for the base value
+ num_cols = model.n_features_ + 1
+
+ nrows_per_chunk = data.chunks[0]
+ out: List[List[dask_Array]] = [[] for _ in range(num_classes)]
+
+ # need to tell Dask the expected type and shape of individual preds
+ pred_meta = data._meta
+
+ for j, partition in enumerate(preds.to_delayed()):
+ for i in range(num_classes):
+ part = dask_array_from_delayed(
+ value=_extract(partition, i),
+ shape=(nrows_per_chunk[j], num_cols),
+ meta=pred_meta
+ )
+ out[i].append(part)
+
+ # by default, dask.array.concatenate() concatenates sparse arrays into a COO matrix
+            # the code below is used instead to ensure that the sparse type is preserved during concatenation
+ if isinstance(pred_meta, ss.csr_matrix):
+ concat_fn = partial(ss.vstack, format='csr')
+ elif isinstance(pred_meta, ss.csc_matrix):
+ concat_fn = partial(ss.vstack, format='csc')
+ else:
+ concat_fn = ss.vstack
+
+ # At this point, `out` is a list of lists of delayeds (each of which points to a matrix).
+ # Concatenate them to return a list of Dask Arrays.
+ out_arrays: List[dask_Array] = []
+ for i in range(num_classes):
+ out_arrays.append(
+ dask_array_from_delayed(
+ value=delayed(concat_fn)(out[i]),
+ shape=(data.shape[0], num_cols),
+ meta=pred_meta
+ )
+ )
+
+ return out_arrays
+
+ data_row = client.compute(data[[0]]).result()
+ predict_fn = partial(
+ _predict_part,
+ model=model,
+ raw_score=raw_score,
+ pred_proba=pred_proba,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ **kwargs,
+ )
+ pred_row = predict_fn(data_row)
+ chunks: Tuple[int, ...] = (data.chunks[0],)
+ map_blocks_kwargs = {}
+ if len(pred_row.shape) > 1:
+ chunks += (pred_row.shape[1],)
+ else:
+ map_blocks_kwargs['drop_axis'] = 1
+ return data.map_blocks(
+ predict_fn,
+ chunks=chunks,
+ meta=pred_row,
+ dtype=dtype,
+ **map_blocks_kwargs,
+ )
+ else:
+ raise TypeError(f'Data must be either Dask Array or Dask DataFrame. Got {type(data).__name__}.')
+
+
+class _DaskLGBMModel:
+
+ @property
+ def client_(self) -> Client:
+ """:obj:`dask.distributed.Client`: Dask client.
+
+        The client can be passed in the constructor or updated
+        with ``model.set_params(client=client)``.
+ """
+ if not getattr(self, "fitted_", False):
+ raise LGBMNotFittedError('Cannot access property client_ before calling fit().')
+
+ return _get_dask_client(client=self.client)
+
+ def _lgb_dask_getstate(self) -> Dict[Any, Any]:
+ """Remove un-picklable attributes before serialization."""
+ client = self.__dict__.pop("client", None)
+ self._other_params.pop("client", None) # type: ignore[attr-defined]
+ out = deepcopy(self.__dict__)
+ out.update({"client": None})
+ self.client = client
+ return out
+
+ def _lgb_dask_fit(
+ self,
+ model_factory: Type[LGBMModel],
+ X: _DaskMatrixLike,
+ y: _DaskCollection,
+ sample_weight: Optional[_DaskVectorLike] = None,
+ init_score: Optional[_DaskCollection] = None,
+ group: Optional[_DaskVectorLike] = None,
+ eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
+ eval_names: Optional[List[str]] = None,
+ eval_sample_weight: Optional[List[_DaskVectorLike]] = None,
+ eval_class_weight: Optional[List[Union[dict, str]]] = None,
+ eval_init_score: Optional[List[_DaskCollection]] = None,
+ eval_group: Optional[List[_DaskVectorLike]] = None,
+ eval_metric: Optional[_LGBM_ScikitEvalMetricType] = None,
+ eval_at: Optional[Union[List[int], Tuple[int, ...]]] = None,
+ **kwargs: Any
+ ) -> "_DaskLGBMModel":
+ if not DASK_INSTALLED:
+ raise LightGBMError('dask is required for lightgbm.dask')
+ if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)):
+ raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask')
+
+ params = self.get_params(True) # type: ignore[attr-defined]
+ params.pop("client", None)
+
+ model = _train(
+ client=_get_dask_client(self.client),
+ data=X,
+ label=y,
+ params=params,
+ model_factory=model_factory,
+ sample_weight=sample_weight,
+ init_score=init_score,
+ group=group,
+ eval_set=eval_set,
+ eval_names=eval_names,
+ eval_sample_weight=eval_sample_weight,
+ eval_class_weight=eval_class_weight,
+ eval_init_score=eval_init_score,
+ eval_group=eval_group,
+ eval_metric=eval_metric,
+ eval_at=eval_at,
+ **kwargs
+ )
+
+ self.set_params(**model.get_params()) # type: ignore[attr-defined]
+ self._lgb_dask_copy_extra_params(model, self) # type: ignore[attr-defined]
+
+ return self
+
+ def _lgb_dask_to_local(self, model_factory: Type[LGBMModel]) -> LGBMModel:
+ params = self.get_params() # type: ignore[attr-defined]
+ params.pop("client", None)
+ model = model_factory(**params)
+ self._lgb_dask_copy_extra_params(self, model)
+ model._other_params.pop("client", None)
+ return model
+
+ @staticmethod
+ def _lgb_dask_copy_extra_params(source: Union["_DaskLGBMModel", LGBMModel], dest: Union["_DaskLGBMModel", LGBMModel]) -> None:
+ params = source.get_params() # type: ignore[union-attr]
+ attributes = source.__dict__
+ extra_param_names = set(attributes.keys()).difference(params.keys())
+ for name in extra_param_names:
+ setattr(dest, name, attributes[name])
+
+
+class DaskLGBMClassifier(LGBMClassifier, _DaskLGBMModel):
+ """Distributed version of lightgbm.LGBMClassifier."""
+
+ def __init__(
+ self,
+ boosting_type: str = 'gbdt',
+ num_leaves: int = 31,
+ max_depth: int = -1,
+ learning_rate: float = 0.1,
+ n_estimators: int = 100,
+ subsample_for_bin: int = 200000,
+ objective: Optional[Union[str, _LGBM_ScikitCustomObjectiveFunction]] = None,
+ class_weight: Optional[Union[dict, str]] = None,
+ min_split_gain: float = 0.,
+ min_child_weight: float = 1e-3,
+ min_child_samples: int = 20,
+ subsample: float = 1.,
+ subsample_freq: int = 0,
+ colsample_bytree: float = 1.,
+ reg_alpha: float = 0.,
+ reg_lambda: float = 0.,
+ random_state: Optional[Union[int, np.random.RandomState, 'np.random.Generator']] = None,
+ n_jobs: Optional[int] = None,
+ importance_type: str = 'split',
+ client: Optional[Client] = None,
+ **kwargs: Any
+ ):
+ """Docstring is inherited from the lightgbm.LGBMClassifier.__init__."""
+ self.client = client
+ super().__init__(
+ boosting_type=boosting_type,
+ num_leaves=num_leaves,
+ max_depth=max_depth,
+ learning_rate=learning_rate,
+ n_estimators=n_estimators,
+ subsample_for_bin=subsample_for_bin,
+ objective=objective,
+ class_weight=class_weight,
+ min_split_gain=min_split_gain,
+ min_child_weight=min_child_weight,
+ min_child_samples=min_child_samples,
+ subsample=subsample,
+ subsample_freq=subsample_freq,
+ colsample_bytree=colsample_bytree,
+ reg_alpha=reg_alpha,
+ reg_lambda=reg_lambda,
+ random_state=random_state,
+ n_jobs=n_jobs,
+ importance_type=importance_type,
+ **kwargs
+ )
+
+ _base_doc = LGBMClassifier.__init__.__doc__
+ _before_kwargs, _kwargs, _after_kwargs = _base_doc.partition('**kwargs') # type: ignore
+ __init__.__doc__ = f"""
+ {_before_kwargs}client : dask.distributed.Client or None, optional (default=None)
+ {' ':4}Dask client. If ``None``, ``distributed.default_client()`` will be used at runtime. The Dask client used by this class will not be saved if the model object is pickled.
+ {_kwargs}{_after_kwargs}
+ """
+
+ def __getstate__(self) -> Dict[Any, Any]:
+ return self._lgb_dask_getstate()
+
+ def fit( # type: ignore[override]
+ self,
+ X: _DaskMatrixLike,
+ y: _DaskCollection,
+ sample_weight: Optional[_DaskVectorLike] = None,
+ init_score: Optional[_DaskCollection] = None,
+ eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
+ eval_names: Optional[List[str]] = None,
+ eval_sample_weight: Optional[List[_DaskVectorLike]] = None,
+ eval_class_weight: Optional[List[Union[dict, str]]] = None,
+ eval_init_score: Optional[List[_DaskCollection]] = None,
+ eval_metric: Optional[_LGBM_ScikitEvalMetricType] = None,
+ **kwargs: Any
+ ) -> "DaskLGBMClassifier":
+ """Docstring is inherited from the lightgbm.LGBMClassifier.fit."""
+ self._lgb_dask_fit(
+ model_factory=LGBMClassifier,
+ X=X,
+ y=y,
+ sample_weight=sample_weight,
+ init_score=init_score,
+ eval_set=eval_set,
+ eval_names=eval_names,
+ eval_sample_weight=eval_sample_weight,
+ eval_class_weight=eval_class_weight,
+ eval_init_score=eval_init_score,
+ eval_metric=eval_metric,
+ **kwargs
+ )
+ return self
+
+ _base_doc = _lgbmmodel_doc_fit.format(
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
+ sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
+ init_score_shape="Dask Array or Dask Series of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task), or Dask Array or Dask DataFrame of shape = [n_samples, n_classes] (for multi-class task), or None, optional (default=None)",
+ group_shape="Dask Array or Dask Series or None, optional (default=None)",
+ eval_sample_weight_shape="list of Dask Array or Dask Series, or None, optional (default=None)",
+ eval_init_score_shape="list of Dask Array, Dask Series or Dask DataFrame (for multi-class task), or None, optional (default=None)",
+ eval_group_shape="list of Dask Array or Dask Series, or None, optional (default=None)"
+ )
+
+ # DaskLGBMClassifier does not support group, eval_group.
+ _base_doc = (_base_doc[:_base_doc.find('group :')]
+ + _base_doc[_base_doc.find('eval_set :'):])
+
+ _base_doc = (_base_doc[:_base_doc.find('eval_group :')]
+ + _base_doc[_base_doc.find('eval_metric :'):])
+
+ # DaskLGBMClassifier support for callbacks and init_model is not tested
+ fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
+ Other parameters passed through to ``LGBMClassifier.fit()``.
+
+ Returns
+ -------
+ self : lightgbm.DaskLGBMClassifier
+ Returns self.
+
+ {_lgbmmodel_doc_custom_eval_note}
+ """
+
+ def predict(
+ self,
+ X: _DaskMatrixLike, # type: ignore[override]
+ raw_score: bool = False,
+ start_iteration: int = 0,
+ num_iteration: Optional[int] = None,
+ pred_leaf: bool = False,
+ pred_contrib: bool = False,
+ validate_features: bool = False,
+ **kwargs: Any
+ ) -> dask_Array:
+ """Docstring is inherited from the lightgbm.LGBMClassifier.predict."""
+ return _predict(
+ model=self.to_local(),
+ data=X,
+ dtype=self.classes_.dtype,
+ client=_get_dask_client(self.client),
+ raw_score=raw_score,
+ start_iteration=start_iteration,
+ num_iteration=num_iteration,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ validate_features=validate_features,
+ **kwargs
+ )
+
+ predict.__doc__ = _lgbmmodel_doc_predict.format(
+ description="Return the predicted value for each sample.",
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ output_name="predicted_result",
+ predicted_result_shape="Dask Array of shape = [n_samples] or shape = [n_samples, n_classes]",
+ X_leaves_shape="Dask Array of shape = [n_samples, n_trees] or shape = [n_samples, n_trees * n_classes]",
+ X_SHAP_values_shape="Dask Array of shape = [n_samples, n_features + 1] or shape = [n_samples, (n_features + 1) * n_classes] or (if multi-class and using sparse inputs) a list of ``n_classes`` Dask Arrays of shape = [n_samples, n_features + 1]"
+ )
+
+ def predict_proba(
+ self,
+ X: _DaskMatrixLike, # type: ignore[override]
+ raw_score: bool = False,
+ start_iteration: int = 0,
+ num_iteration: Optional[int] = None,
+ pred_leaf: bool = False,
+ pred_contrib: bool = False,
+ validate_features: bool = False,
+ **kwargs: Any
+ ) -> dask_Array:
+ """Docstring is inherited from the lightgbm.LGBMClassifier.predict_proba."""
+ return _predict(
+ model=self.to_local(),
+ data=X,
+ pred_proba=True,
+ client=_get_dask_client(self.client),
+ raw_score=raw_score,
+ start_iteration=start_iteration,
+ num_iteration=num_iteration,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ validate_features=validate_features,
+ **kwargs
+ )
+
+ predict_proba.__doc__ = _lgbmmodel_doc_predict.format(
+ description="Return the predicted probability for each class for each sample.",
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ output_name="predicted_probability",
+ predicted_result_shape="Dask Array of shape = [n_samples] or shape = [n_samples, n_classes]",
+ X_leaves_shape="Dask Array of shape = [n_samples, n_trees] or shape = [n_samples, n_trees * n_classes]",
+ X_SHAP_values_shape="Dask Array of shape = [n_samples, n_features + 1] or shape = [n_samples, (n_features + 1) * n_classes] or (if multi-class and using sparse inputs) a list of ``n_classes`` Dask Arrays of shape = [n_samples, n_features + 1]"
+ )
+
+ def to_local(self) -> LGBMClassifier:
+ """Create regular version of lightgbm.LGBMClassifier from the distributed version.
+
+ Returns
+ -------
+ model : lightgbm.LGBMClassifier
+ Local underlying model.
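+
+        A sketch of typical usage (illustrative; ``dask_model`` is assumed to be a
+        fitted ``DaskLGBMClassifier`` and ``X_local`` a hypothetical in-memory
+        feature matrix)::
+
+            local_model = dask_model.to_local()
+            # the local model no longer needs a Dask cluster and can be
+            # pickled or used for prediction on non-Dask data
+            preds = local_model.predict(X_local)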
+ """
+ return self._lgb_dask_to_local(LGBMClassifier)
+
+
+class DaskLGBMRegressor(LGBMRegressor, _DaskLGBMModel):
+ """Distributed version of lightgbm.LGBMRegressor."""
+
+ def __init__(
+ self,
+ boosting_type: str = 'gbdt',
+ num_leaves: int = 31,
+ max_depth: int = -1,
+ learning_rate: float = 0.1,
+ n_estimators: int = 100,
+ subsample_for_bin: int = 200000,
+ objective: Optional[Union[str, _LGBM_ScikitCustomObjectiveFunction]] = None,
+ class_weight: Optional[Union[dict, str]] = None,
+ min_split_gain: float = 0.,
+ min_child_weight: float = 1e-3,
+ min_child_samples: int = 20,
+ subsample: float = 1.,
+ subsample_freq: int = 0,
+ colsample_bytree: float = 1.,
+ reg_alpha: float = 0.,
+ reg_lambda: float = 0.,
+ random_state: Optional[Union[int, np.random.RandomState, 'np.random.Generator']] = None,
+ n_jobs: Optional[int] = None,
+ importance_type: str = 'split',
+ client: Optional[Client] = None,
+ **kwargs: Any
+ ):
+ """Docstring is inherited from the lightgbm.LGBMRegressor.__init__."""
+ self.client = client
+ super().__init__(
+ boosting_type=boosting_type,
+ num_leaves=num_leaves,
+ max_depth=max_depth,
+ learning_rate=learning_rate,
+ n_estimators=n_estimators,
+ subsample_for_bin=subsample_for_bin,
+ objective=objective,
+ class_weight=class_weight,
+ min_split_gain=min_split_gain,
+ min_child_weight=min_child_weight,
+ min_child_samples=min_child_samples,
+ subsample=subsample,
+ subsample_freq=subsample_freq,
+ colsample_bytree=colsample_bytree,
+ reg_alpha=reg_alpha,
+ reg_lambda=reg_lambda,
+ random_state=random_state,
+ n_jobs=n_jobs,
+ importance_type=importance_type,
+ **kwargs
+ )
+
+ _base_doc = LGBMRegressor.__init__.__doc__
+ _before_kwargs, _kwargs, _after_kwargs = _base_doc.partition('**kwargs') # type: ignore
+ __init__.__doc__ = f"""
+ {_before_kwargs}client : dask.distributed.Client or None, optional (default=None)
+ {' ':4}Dask client. If ``None``, ``distributed.default_client()`` will be used at runtime. The Dask client used by this class will not be saved if the model object is pickled.
+ {_kwargs}{_after_kwargs}
+ """
+
+ def __getstate__(self) -> Dict[Any, Any]:
+ return self._lgb_dask_getstate()
+
+ def fit( # type: ignore[override]
+ self,
+ X: _DaskMatrixLike,
+ y: _DaskCollection,
+ sample_weight: Optional[_DaskVectorLike] = None,
+ init_score: Optional[_DaskVectorLike] = None,
+ eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
+ eval_names: Optional[List[str]] = None,
+ eval_sample_weight: Optional[List[_DaskVectorLike]] = None,
+ eval_init_score: Optional[List[_DaskVectorLike]] = None,
+ eval_metric: Optional[_LGBM_ScikitEvalMetricType] = None,
+ **kwargs: Any
+ ) -> "DaskLGBMRegressor":
+ """Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
+ self._lgb_dask_fit(
+ model_factory=LGBMRegressor,
+ X=X,
+ y=y,
+ sample_weight=sample_weight,
+ init_score=init_score,
+ eval_set=eval_set,
+ eval_names=eval_names,
+ eval_sample_weight=eval_sample_weight,
+ eval_init_score=eval_init_score,
+ eval_metric=eval_metric,
+ **kwargs
+ )
+ return self
+
+ _base_doc = _lgbmmodel_doc_fit.format(
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
+ sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
+ init_score_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
+ group_shape="Dask Array or Dask Series or None, optional (default=None)",
+ eval_sample_weight_shape="list of Dask Array or Dask Series, or None, optional (default=None)",
+ eval_init_score_shape="list of Dask Array or Dask Series, or None, optional (default=None)",
+ eval_group_shape="list of Dask Array or Dask Series, or None, optional (default=None)"
+ )
+
+ # DaskLGBMRegressor does not support group, eval_class_weight, eval_group.
+ _base_doc = (_base_doc[:_base_doc.find('group :')]
+ + _base_doc[_base_doc.find('eval_set :'):])
+
+ _base_doc = (_base_doc[:_base_doc.find('eval_class_weight :')]
+ + _base_doc[_base_doc.find('eval_init_score :'):])
+
+ _base_doc = (_base_doc[:_base_doc.find('eval_group :')]
+ + _base_doc[_base_doc.find('eval_metric :'):])
+
+ # DaskLGBMRegressor support for callbacks and init_model is not tested
+ fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
+ Other parameters passed through to ``LGBMRegressor.fit()``.
+
+ Returns
+ -------
+ self : lightgbm.DaskLGBMRegressor
+ Returns self.
+
+ {_lgbmmodel_doc_custom_eval_note}
+ """
+
+ def predict(
+ self,
+ X: _DaskMatrixLike, # type: ignore[override]
+ raw_score: bool = False,
+ start_iteration: int = 0,
+ num_iteration: Optional[int] = None,
+ pred_leaf: bool = False,
+ pred_contrib: bool = False,
+ validate_features: bool = False,
+ **kwargs: Any
+ ) -> dask_Array:
+ """Docstring is inherited from the lightgbm.LGBMRegressor.predict."""
+ return _predict(
+ model=self.to_local(),
+ data=X,
+ client=_get_dask_client(self.client),
+ raw_score=raw_score,
+ start_iteration=start_iteration,
+ num_iteration=num_iteration,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ validate_features=validate_features,
+ **kwargs
+ )
+
+ predict.__doc__ = _lgbmmodel_doc_predict.format(
+ description="Return the predicted value for each sample.",
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ output_name="predicted_result",
+ predicted_result_shape="Dask Array of shape = [n_samples]",
+ X_leaves_shape="Dask Array of shape = [n_samples, n_trees]",
+ X_SHAP_values_shape="Dask Array of shape = [n_samples, n_features + 1]"
+ )
+
+ def to_local(self) -> LGBMRegressor:
+ """Create regular version of lightgbm.LGBMRegressor from the distributed version.
+
+ Returns
+ -------
+ model : lightgbm.LGBMRegressor
+ Local underlying model.
+ """
+ return self._lgb_dask_to_local(LGBMRegressor)
+
+
+class DaskLGBMRanker(LGBMRanker, _DaskLGBMModel):
+ """Distributed version of lightgbm.LGBMRanker."""
+
+ def __init__(
+ self,
+ boosting_type: str = 'gbdt',
+ num_leaves: int = 31,
+ max_depth: int = -1,
+ learning_rate: float = 0.1,
+ n_estimators: int = 100,
+ subsample_for_bin: int = 200000,
+ objective: Optional[Union[str, _LGBM_ScikitCustomObjectiveFunction]] = None,
+ class_weight: Optional[Union[dict, str]] = None,
+ min_split_gain: float = 0.,
+ min_child_weight: float = 1e-3,
+ min_child_samples: int = 20,
+ subsample: float = 1.,
+ subsample_freq: int = 0,
+ colsample_bytree: float = 1.,
+ reg_alpha: float = 0.,
+ reg_lambda: float = 0.,
+ random_state: Optional[Union[int, np.random.RandomState, 'np.random.Generator']] = None,
+ n_jobs: Optional[int] = None,
+ importance_type: str = 'split',
+ client: Optional[Client] = None,
+ **kwargs: Any
+ ):
+ """Docstring is inherited from the lightgbm.LGBMRanker.__init__."""
+ self.client = client
+ super().__init__(
+ boosting_type=boosting_type,
+ num_leaves=num_leaves,
+ max_depth=max_depth,
+ learning_rate=learning_rate,
+ n_estimators=n_estimators,
+ subsample_for_bin=subsample_for_bin,
+ objective=objective,
+ class_weight=class_weight,
+ min_split_gain=min_split_gain,
+ min_child_weight=min_child_weight,
+ min_child_samples=min_child_samples,
+ subsample=subsample,
+ subsample_freq=subsample_freq,
+ colsample_bytree=colsample_bytree,
+ reg_alpha=reg_alpha,
+ reg_lambda=reg_lambda,
+ random_state=random_state,
+ n_jobs=n_jobs,
+ importance_type=importance_type,
+ **kwargs
+ )
+
+ _base_doc = LGBMRanker.__init__.__doc__
+ _before_kwargs, _kwargs, _after_kwargs = _base_doc.partition('**kwargs') # type: ignore
+ __init__.__doc__ = f"""
+ {_before_kwargs}client : dask.distributed.Client or None, optional (default=None)
+ {' ':4}Dask client. If ``None``, ``distributed.default_client()`` will be used at runtime. The Dask client used by this class will not be saved if the model object is pickled.
+ {_kwargs}{_after_kwargs}
+ """
+
+ def __getstate__(self) -> Dict[Any, Any]:
+ return self._lgb_dask_getstate()
+
+ def fit( # type: ignore[override]
+ self,
+ X: _DaskMatrixLike,
+ y: _DaskCollection,
+ sample_weight: Optional[_DaskVectorLike] = None,
+ init_score: Optional[_DaskVectorLike] = None,
+ group: Optional[_DaskVectorLike] = None,
+ eval_set: Optional[List[Tuple[_DaskMatrixLike, _DaskCollection]]] = None,
+ eval_names: Optional[List[str]] = None,
+ eval_sample_weight: Optional[List[_DaskVectorLike]] = None,
+ eval_init_score: Optional[List[_DaskVectorLike]] = None,
+ eval_group: Optional[List[_DaskVectorLike]] = None,
+ eval_metric: Optional[_LGBM_ScikitEvalMetricType] = None,
+ eval_at: Union[List[int], Tuple[int, ...]] = (1, 2, 3, 4, 5),
+ **kwargs: Any
+ ) -> "DaskLGBMRanker":
+ """Docstring is inherited from the lightgbm.LGBMRanker.fit."""
+ self._lgb_dask_fit(
+ model_factory=LGBMRanker,
+ X=X,
+ y=y,
+ sample_weight=sample_weight,
+ init_score=init_score,
+ group=group,
+ eval_set=eval_set,
+ eval_names=eval_names,
+ eval_sample_weight=eval_sample_weight,
+ eval_init_score=eval_init_score,
+ eval_group=eval_group,
+ eval_metric=eval_metric,
+ eval_at=eval_at,
+ **kwargs
+ )
+ return self
+
+ _base_doc = _lgbmmodel_doc_fit.format(
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]",
+ sample_weight_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
+ init_score_shape="Dask Array or Dask Series of shape = [n_samples] or None, optional (default=None)",
+ group_shape="Dask Array or Dask Series or None, optional (default=None)",
+ eval_sample_weight_shape="list of Dask Array or Dask Series, or None, optional (default=None)",
+ eval_init_score_shape="list of Dask Array or Dask Series, or None, optional (default=None)",
+ eval_group_shape="list of Dask Array or Dask Series, or None, optional (default=None)"
+ )
+
+ # DaskLGBMRanker does not support eval_class_weight or early stopping
+ _base_doc = (_base_doc[:_base_doc.find('eval_class_weight :')]
+ + _base_doc[_base_doc.find('eval_init_score :'):])
+
+ _base_doc = (_base_doc[:_base_doc.find('feature_name :')]
+ + "eval_at : list or tuple of int, optional (default=(1, 2, 3, 4, 5))\n"
+ + f"{' ':8}The evaluation positions of the specified metric.\n"
+ + f"{' ':4}{_base_doc[_base_doc.find('feature_name :'):]}")
+
+ # DaskLGBMRanker support for callbacks and init_model is not tested
+ fit.__doc__ = f"""{_base_doc[:_base_doc.find('callbacks :')]}**kwargs
+ Other parameters passed through to ``LGBMRanker.fit()``.
+
+ Returns
+ -------
+ self : lightgbm.DaskLGBMRanker
+ Returns self.
+
+ {_lgbmmodel_doc_custom_eval_note}
+ """
+
+ def predict(
+ self,
+ X: _DaskMatrixLike, # type: ignore[override]
+ raw_score: bool = False,
+ start_iteration: int = 0,
+ num_iteration: Optional[int] = None,
+ pred_leaf: bool = False,
+ pred_contrib: bool = False,
+ validate_features: bool = False,
+ **kwargs: Any
+ ) -> dask_Array:
+ """Docstring is inherited from the lightgbm.LGBMRanker.predict."""
+ return _predict(
+ model=self.to_local(),
+ data=X,
+ client=_get_dask_client(self.client),
+ raw_score=raw_score,
+ start_iteration=start_iteration,
+ num_iteration=num_iteration,
+ pred_leaf=pred_leaf,
+ pred_contrib=pred_contrib,
+ validate_features=validate_features,
+ **kwargs
+ )
+
+ predict.__doc__ = _lgbmmodel_doc_predict.format(
+ description="Return the predicted value for each sample.",
+ X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]",
+ output_name="predicted_result",
+ predicted_result_shape="Dask Array of shape = [n_samples]",
+ X_leaves_shape="Dask Array of shape = [n_samples, n_trees]",
+ X_SHAP_values_shape="Dask Array of shape = [n_samples, n_features + 1]"
+ )
+
+ def to_local(self) -> LGBMRanker:
+ """Create regular version of lightgbm.LGBMRanker from the distributed version.
+
+ Returns
+ -------
+ model : lightgbm.LGBMRanker
+ Local underlying model.
+ """
+ return self._lgb_dask_to_local(LGBMRanker)