Source code for skutil.h2o.split

from __future__ import division, print_function, absolute_import
import numbers
import warnings
from abc import ABCMeta, abstractmethod
import numpy as np
from .base import check_frame
from skutil.base import overrides
from sklearn.externals import six
from sklearn.base import _pprint
from sklearn.utils.fixes import signature, bincount
from sklearn.utils import check_random_state
from math import ceil, floor

# NOTE(review): H2OEstimator is looked up at the h2o package root first and
# then at h2o.estimators.estimator_base -- presumably its location differs
# between h2o releases; confirm against the supported h2o versions.
try:
    from h2o import H2OEstimator
except ImportError:
    from h2o.estimators.estimator_base import H2OEstimator

# SK18 records which KFold was imported: True when it came from
# sklearn.model_selection, False when the sklearn.cross_validation fallback
# was used. Code further down branches on this flag to pick the call style.
try:
    from sklearn.model_selection import KFold
    SK18 = True
except ImportError:
    from sklearn.cross_validation import KFold
    SK18 = False

# Names exported by ``from skutil.h2o.split import *``.
__all__ = [
    'check_cv',
    'h2o_train_test_split',
    'H2OKFold',
    'H2OShuffleSplit',
    'H2OStratifiedKFold',
    'H2OStratifiedShuffleSplit'
]


def _build_repr(self):
    """Build a ``ClassName(param=value, ...)`` representation of ``self``.

    The parameter names are taken from the signature of the class's
    ``__init__`` (or its ``deprecated_original`` attribute when present),
    excluding ``self`` and any ``**kwargs`` parameter. Each value is read
    from the attribute of the same name on ``self``; a missing attribute is
    shown as ``None``. A parameter is left out when the first warning
    recorded while reading its attribute is a ``DeprecationWarning``.

    Parameters
    ----------
    self : object
        The instance to represent.

    Returns
    -------
    repr : str
        The class name followed by the pretty-printed parameters.
    """
    # XXX This is copied from sklearn.BaseEstimator's get_params
    cls = self.__class__
    init = getattr(cls.__init__, 'deprecated_original', cls.__init__)

    init_signature = signature(init)

    if init is object.__init__:
        args = []
    else:
        args = sorted([p.name for p in init_signature.parameters.values()
                       if p.name != 'self' and p.kind != p.VAR_KEYWORD])

    class_name = self.__class__.__name__
    params = dict()
    for key in args:
        # Install the "always" filter *inside* catch_warnings so the global
        # filter list is restored on exit. Popping warnings.filters[0] by
        # hand could drop a filter the caller had registered themselves.
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always", DeprecationWarning)
            value = getattr(self, key, None)
        if len(w) and w[0].category == DeprecationWarning:
            continue
        params[key] = value

    return '%s(%s)' % (class_name, _pprint(params, offset=len(class_name)))


def check_cv(cv=3):
    """Normalize the ``cv`` argument into an H2OBaseCrossValidator.

    Parameters
    ----------
    cv : int, H2OBaseCrossValidator or None, optional (default=3)
        Either a number of folds or an already constructed cross
        validator. ``None`` is treated the same as ``3``.

    Returns
    -------
    cv : H2OBaseCrossValidator
        An ``H2OKFold`` built from the fold count when an integer (or
        ``None``) was given, otherwise the instance that was passed in.

    Raises
    ------
    ValueError
        If ``cv`` is neither an integer nor an H2OBaseCrossValidator.
    """
    candidate = 3 if cv is None else cv

    # an integral value is interpreted as the number of folds
    if isinstance(candidate, numbers.Integral):
        return H2OKFold(candidate)

    if isinstance(candidate, H2OBaseCrossValidator):
        return candidate

    raise ValueError('expected int or instance of '
                     'H2OBaseCrossValidator but got %s'
                     % type(candidate))
def h2o_train_test_split(frame, test_size=None, train_size=None, random_state=None, stratify=None):
    """Split an H2OFrame into a random training frame and testing frame.

    Parameters
    ----------
    frame : H2OFrame
        The h2o frame to split.

    test_size : float, int, or None (default=None)
        A float is taken as the proportion of rows to place in the test
        split; an int as the absolute number of test rows. When None, the
        complement of ``train_size`` is used. If ``train_size`` is None as
        well, ``test_size`` is set to 0.25.

    train_size : float, int, or None (default=None)
        A float is taken as the proportion of rows to place in the train
        split; an int as the absolute number of train rows. When None, the
        complement of ``test_size`` is used.

    random_state : int or RandomState
        Pseudo-random number generator state used for random sampling.

    stratify : str or None (default=None)
        The name of the target column on which to stratify the sampling.
        When None, no stratification is performed.

    Returns
    -------
    out : tuple, shape=(2,)
        training_frame : H2OFrame
            The training fold split.

        testing_frame : H2OFrame
            The testing fold split.
    """
    frame = check_frame(frame, copy=False)
    if test_size is None and train_size is None:
        test_size = 0.25

    # a stratification column selects the stratified splitter
    if stratify is None:
        splitter_cls = H2OShuffleSplit
    else:
        splitter_cls = H2OStratifiedShuffleSplit

    splitter = splitter_cls(n_splits=2,
                            test_size=test_size,
                            train_size=train_size,
                            random_state=random_state)

    # every split is generated, but only the first one is used
    generated = list(splitter.split(frame, stratify))
    first_train, first_test = generated[0]

    # h2o "doesn't reorder rows" so the row indices are kept sorted
    train_rows = sorted(first_train)
    test_rows = sorted(first_test)

    return frame[train_rows, :], frame[test_rows, :]
# Avoid a pb with nosetests...
h2o_train_test_split.__test__ = False


def _val_y(y):
    """Validate the name of the target (stratification) column.

    Parameters
    ----------
    y : str or None
        The column name to validate.

    Returns
    -------
    y : str or None
        ``str(y)`` when ``y`` is a string, or ``None`` when ``y`` is None.

    Raises
    ------
    TypeError
        If ``y`` is neither a string nor None.
    """
    if isinstance(y, six.string_types):
        return str(y)
    elif y is None:
        return y
    raise TypeError('y must be a string. Got %s' % y)


class H2OBaseCrossValidator(six.with_metaclass(ABCMeta)):
    """Base class for H2O cross validation operations.
    All implementing subclasses should override ``get_n_splits``
    and ``_iter_test_indices``.
    """

    def __init__(self):
        pass

    def split(self, frame, y=None):
        """Generate indices to split data into training and test.

        Parameters
        ----------
        frame : ``H2OFrame``
            The h2o frame to split

        y : str, optional (default=None)
            The name of the column to stratify, if applicable.

        Yields
        ------
        train : list
            The training set indices for the split

        test : list
            The testing set indices for that split
        """
        frame = check_frame(frame, copy=False)
        indices = np.arange(frame.shape[0])
        for test_index in self._iter_test_masks(frame, y):
            # the training rows are everything outside the test mask
            train_index = indices[np.logical_not(test_index)]
            test_index = indices[test_index]

            # h2o can't handle anything but lists...
            yield list(train_index), list(test_index)

    def _iter_test_masks(self, frame, y=None):
        """Generates boolean masks corresponding to the tests set.

        Parameters
        ----------
        frame : H2OFrame
            The h2o frame to split

        y : string, optional (default=None)
            The column to stratify.

        Yields
        ------
        test_mask : np.ndarray, shape=(n_samples,)
            Boolean mask that is True at the test positions of the split
        """
        for test_index in self._iter_test_indices(frame, y):
            # builtin ``bool`` is used because the ``np.bool`` alias no
            # longer exists in current NumPy releases
            test_mask = np.zeros(frame.shape[0], dtype=bool)
            test_mask[test_index] = True
            yield test_mask

    def _iter_test_indices(self, frame, y=None):
        """Generate the test indices of each split; subclasses implement this.

        Parameters
        ----------
        frame : H2OFrame
            The h2o frame to split

        y : string, optional (default=None)
            The column to stratify.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError('this method must be implemented by a subclass')

    @abstractmethod
    def get_n_splits(self):
        """Get the number of splits or folds for this instance
        of the cross validator.
        """
        pass

    def __repr__(self):
        return _build_repr(self)


def _validate_shuffle_split_init(test_size, train_size):
    """Validation helper to check the test_size and train_size at init.

    Parameters
    ----------
    test_size : float, int or None
        The requested test size. A float must be smaller than 1.0.

    train_size : float, int or None
        The requested train size. A float must be smaller than 1.0, and
        when ``test_size`` is also a float their sum must not exceed 1.0.

    Raises
    ------
    ValueError
        If both sizes are None, if a size is neither a float nor an
        integer, or if a float size violates the bounds described above.
    """
    if test_size is None and train_size is None:
        raise ValueError('test_size and train_size can not both be None')

    if test_size is not None:
        if np.asarray(test_size).dtype.kind == 'f':
            if test_size >= 1.:
                raise ValueError(
                    'test_size=%f should be smaller '
                    'than 1.0 or be an integer' % test_size)
        elif np.asarray(test_size).dtype.kind != 'i':
            raise ValueError('Invalid value for test_size: %r' % test_size)

    if train_size is not None:
        if np.asarray(train_size).dtype.kind == 'f':
            if train_size >= 1.:
                # report the offending train_size (formatting test_size here
                # was wrong, and failed outright when test_size was None)
                raise ValueError(
                    'train_size=%f should be smaller '
                    'than 1.0 or be an integer' % train_size)
            elif (np.asarray(test_size).dtype.kind == 'f' and
                    (train_size + test_size) > 1.):
                raise ValueError('The sum of test_size and train_size = %f '
                                 'should be smaller than 1.0. Reduce test_size '
                                 'and/or train_size.' % (train_size + test_size))
        elif np.asarray(train_size).dtype.kind != 'i':
            raise ValueError('Invalid value for train_size: %r' % train_size)


def _validate_shuffle_split(n_samples, test_size, train_size):
    """Resolve ``test_size`` and ``train_size`` into absolute row counts.

    Float sizes are treated as proportions of ``n_samples`` (the test count
    is rounded up, the train count is rounded down); integer sizes are used
    as they are. A size that is None becomes the complement of the other.

    Parameters
    ----------
    n_samples : int
        The number of rows available.

    test_size : float, int or None
        The requested test size.

    train_size : float, int or None
        The requested train size.

    Returns
    -------
    n_train : int
        The number of training rows.

    n_test : int
        The number of testing rows.

    Raises
    ------
    ValueError
        If both sizes are None, if an integer size is not smaller than
        ``n_samples``, or if the two counts together exceed ``n_samples``.
    """
    # without this guard the code below would fail on an unbound ``n_test``
    if test_size is None and train_size is None:
        raise ValueError('test_size and train_size can not both be None')

    if test_size is not None and np.asarray(test_size).dtype.kind == 'i' \
            and test_size >= n_samples:
        raise ValueError('test_size=%d should be smaller '
                         'than the number of samples %d' % (test_size, n_samples))

    if train_size is not None and np.asarray(train_size).dtype.kind == 'i' \
            and train_size >= n_samples:
        raise ValueError('train_size=%d should be smaller '
                         'than the number of samples %d' % (train_size, n_samples))

    if np.asarray(test_size).dtype.kind == 'f':
        n_test = ceil(test_size * n_samples)
    elif np.asarray(test_size).dtype.kind == 'i':
        n_test = float(test_size)

    if train_size is None:
        n_train = n_samples - n_test
    elif np.asarray(train_size).dtype.kind == 'f':
        n_train = floor(train_size * n_samples)
    else:
        n_train = float(train_size)

    if test_size is None:
        n_test = n_samples - n_train

    if n_train + n_test > n_samples:
        raise ValueError('The sum of train_size and test_size=%d, '
                         'should be smaller than the number of '
                         'samples %d. Reduce test_size and/or '
                         'train_size.' % (n_train + n_test, n_samples))

    return int(n_train), int(n_test)


class H2OBaseShuffleSplit(six.with_metaclass(ABCMeta)):
    """Base class for H2OShuffleSplit and H2OStratifiedShuffleSplit. This
    is used for ``h2o_train_test_split`` in strategic train/test splits of
    H2OFrames. Implementing subclasses should override ``_iter_indices``.

    Parameters
    ----------
    n_splits : int, optional (default=2)
        The number of folds or splits in the split

    test_size : float or int, optional (default=0.1)
        The ratio of observations for the test fold

    train_size : float or int, optional (default=None)
        The ratio of observations for the train fold

    random_state : int or RandomState, optional (default=None)
        The random state for duplicative purposes.
    """

    def __init__(self, n_splits=2, test_size=0.1, train_size=None, random_state=None):
        # the sizes are validated before anything is stored on the instance
        _validate_shuffle_split_init(test_size, train_size)
        self.n_splits = n_splits
        self.test_size = test_size
        self.train_size = train_size
        self.random_state = random_state

    def split(self, frame, y=None):
        """Split the frame.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string, optional (default=None)
            The column to stratify.

        Yields
        ------
        train, test
            The train and test indices produced by ``_iter_indices``
            for each split.
        """
        for train, test in self._iter_indices(frame, y):
            yield train, test

    @abstractmethod
    def _iter_indices(self, frame, y):
        """Abstract method for iterating the indices.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string, optional (default=None)
            The column to stratify.
        """
        pass

    def get_n_splits(self):
        """Get the number of splits or folds for this instance
        of the shuffle split.
        """
        return self.n_splits

    def __repr__(self):
        return _build_repr(self)
class H2OShuffleSplit(H2OBaseShuffleSplit):
    """Plain (non-stratified) shuffle splitter, used by
    ``h2o_train_test_split`` when no stratification column is given.
    Row indices are shuffled and then cut into a test part and a
    train part for each requested split.
    """

    def _iter_indices(self, frame, y=None):
        """Yield shuffled train/test index arrays, one pair per split.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string, optional (default=None)
            Accepted for interface compatibility only; this class does
            not stratify, so ``y`` is unused.

        Yields
        ------
        ind_train : np.ndarray
            The train indices

        ind_test : np.ndarray
            The test indices
        """
        n_rows = frame.shape[0]
        train_count, test_count = _validate_shuffle_split(
            n_rows, self.test_size, self.train_size)
        generator = check_random_state(self.random_state)

        # the test rows come first in each permutation, then the train rows
        train_stop = test_count + train_count

        for _ in range(self.n_splits):
            shuffled = generator.permutation(n_rows)
            held_out = shuffled[:test_count]
            kept = shuffled[test_count:train_stop]
            yield kept, held_out
class H2OStratifiedShuffleSplit(H2OBaseShuffleSplit):
    """Shuffle splitter used for ``h2o_train_test_split`` when stratified
    option is specified. This shuffle split class will perform stratification.
    """

    def _iter_indices(self, frame, y):
        """Iterate the indices with stratification.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string
            The column to stratify.

        Yields
        ------
        train : np.ndarray
            The train indices

        test : np.ndarray
            The test indices

        Raises
        ------
        TypeError
            If ``y`` is neither a string nor None.

        ValueError
            If ``y`` is None, if any class has fewer than 2 members, or if
            the train or test count is smaller than the number of classes.
        """
        n_samples = frame.shape[0]
        n_train, n_test = _validate_shuffle_split(n_samples,
                                                  self.test_size,
                                                  self.train_size)

        # need to validate y...
        y = _val_y(y)
        if y is None:
            # fail with a clear message instead of an opaque error when the
            # frame is indexed with None below
            raise ValueError('H2OStratifiedShuffleSplit requires a '
                             'target name (got None)')

        target = np.asarray(frame[y].as_data_frame(use_pandas=True)[y].tolist())

        classes, y_indices = np.unique(target, return_inverse=True)
        n_classes = classes.shape[0]

        class_counts = bincount(y_indices)
        if np.min(class_counts) < 2:
            raise ValueError('The least populated class in y has only 1 '
                             'member, which is too few. The minimum number of labels '
                             'for any class cannot be less than 2.')

        if n_train < n_classes:
            raise ValueError('The train_size=%d should be greater than or '
                             'equal to the number of classes=%d' % (n_train, n_classes))

        if n_test < n_classes:
            raise ValueError('The test_size=%d should be greater than or '
                             'equal to the number of classes=%d' % (n_test, n_classes))

        rng = check_random_state(self.random_state)

        # per-class allocation: n_i rows go to train and t_i rows to test
        p_i = class_counts / float(n_samples)
        n_i = np.round(n_train * p_i).astype(int)
        t_i = np.minimum(class_counts - n_i, np.round(n_test * p_i).astype(int))

        # the row positions of each class do not change between splits, so
        # they are located once rather than on every iteration
        class_positions = [np.where(target == class_i)[0] for class_i in classes]

        for _ in range(self.n_splits):
            train = []
            test = []

            for i, positions in enumerate(class_positions):
                permutation = rng.permutation(class_counts[i])
                perm_indices_class_i = positions[permutation]

                train.extend(perm_indices_class_i[:n_i[i]])
                test.extend(perm_indices_class_i[n_i[i]:n_i[i] + t_i[i]])

            # Might end up here with less samples in train and test than we asked
            # for, due to rounding errors.
            if len(train) + len(test) < n_train + n_test:
                # rows that were assigned to neither train nor test
                missing_indices = np.where(bincount(train + test, minlength=len(target)) == 0)[0]
                missing_indices = rng.permutation(missing_indices)
                n_missing_train = n_train - len(train)
                n_missing_test = n_test - len(test)

                if n_missing_train > 0:
                    train.extend(missing_indices[:n_missing_train])
                if n_missing_test > 0:
                    test.extend(missing_indices[-n_missing_test:])

            train = rng.permutation(train)
            test = rng.permutation(test)

            yield train, test

    def split(self, frame, y):
        """Split the frame with stratification.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string
            The column to stratify.
        """
        return super(H2OStratifiedShuffleSplit, self).split(frame, y)
class _H2OBaseKFold(six.with_metaclass(ABCMeta, H2OBaseCrossValidator)):
    """Shared base for the plain and the stratified K-fold validators.

    Parameters
    ----------
    n_folds : int
        The number of splits

    shuffle : bool
        Whether to shuffle indices

    random_state : int or RandomState
        The random state for the split
    """

    @abstractmethod
    def __init__(self, n_folds, shuffle, random_state):
        is_integral = isinstance(n_folds, numbers.Integral)
        if not is_integral:
            raise ValueError('n_folds must be of Integral type. '
                             '%s of type %s was passed' % (n_folds, type(n_folds)))

        fold_count = int(n_folds)
        if fold_count < 2:
            raise ValueError('k-fold cross-validation requires at least one '
                             'train/test split by setting n_folds=2 or more')

        if shuffle not in (True, False):
            raise TypeError('shuffle must be True or False. Got %s (type=%s)'
                            % (str(shuffle), type(shuffle)))

        self.n_folds = fold_count
        self.shuffle = shuffle
        self.random_state = random_state

    @overrides(H2OBaseCrossValidator)
    def split(self, frame, y=None):
        """Generate the train/test index lists for each fold.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string, optional (default=None)
            The column to stratify.
        """
        frame = check_frame(frame, copy=False)

        # there cannot be more folds than there are rows
        row_count = frame.shape[0]
        if row_count < self.n_folds:
            raise ValueError('Cannot have n_folds greater than n_obs')

        parent_splits = super(_H2OBaseKFold, self).split(frame, y)
        for train_part, test_part in parent_splits:
            yield train_part, test_part

    @overrides(H2OBaseCrossValidator)
    def get_n_splits(self):
        """Return how many folds this validator produces.

        Returns
        -------
        n_folds : int
            The number of folds
        """
        return self.n_folds
class H2OKFold(_H2OBaseKFold):
    """K-folds cross-validator for an H2OFrame.

    Parameters
    ----------
    n_folds : int, optional (default=3)
        The number of splits

    shuffle : bool, optional (default=False)
        Whether to shuffle indices

    random_state : int or RandomState, optional (default=None)
        The random state for the split
    """

    def __init__(self, n_folds=3, shuffle=False, random_state=None):
        super(H2OKFold, self).__init__(n_folds, shuffle, random_state)

    @overrides(_H2OBaseKFold)
    def _iter_test_indices(self, frame, y=None):
        """Yield the test indices of each fold as consecutive slices of the
        (optionally shuffled) row indices.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string, optional (default=None)
            Unused by this implementation.

        Yields
        ------
        test_indices : np.ndarray
            The row indices that make up the test part of one fold.
        """
        n_obs = frame.shape[0]
        indices = np.arange(n_obs)
        if self.shuffle:
            check_random_state(self.random_state).shuffle(indices)

        n_folds = self.n_folds
        # builtin ``int`` is used because the ``np.int`` alias no longer
        # exists in current NumPy releases
        fold_sizes = np.full(n_folds, n_obs // n_folds, dtype=int)
        # the first ``n_obs % n_folds`` folds take one extra row each so
        # that every row lands in exactly one fold
        fold_sizes[:n_obs % n_folds] += 1

        current = 0
        for fold_size in fold_sizes:
            start, stop = current, current + fold_size
            yield indices[start:stop]
            current = stop
class H2OStratifiedKFold(_H2OBaseKFold):
    """K-folds cross-validator for an H2OFrame with
    stratified splits.

    Parameters
    ----------
    n_folds : int, optional (default=3)
        The number of splits

    shuffle : bool, optional (default=False)
        Whether to shuffle indices

    random_state : int or RandomState, optional (default=None)
        The random state for the split
    """

    def __init__(self, n_folds=3, shuffle=False, random_state=None):
        super(H2OStratifiedKFold, self).__init__(n_folds, shuffle, random_state)

    def split(self, frame, y):
        """Split the frame with stratification.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string
            The column to stratify.
        """
        return super(H2OStratifiedKFold, self).split(frame, y)

    def _iter_test_masks(self, frame, y):
        """Yield one boolean test mask per fold.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string
            The column to stratify.

        Yields
        ------
        test_mask : np.ndarray, shape=(n_samples,)
            True where the row was assigned to the current fold.
        """
        test_folds = self._make_test_folds(frame, y)
        for i in range(self.n_folds):
            yield test_folds == i

    def _make_test_folds(self, frame, y):
        """Assign every row of ``frame`` to a test fold, class by class.

        Parameters
        ----------
        frame : H2OFrame
            The frame to split

        y : string
            The column to stratify.

        Returns
        -------
        test_folds : np.ndarray, shape=(n_samples,)
            The index of the test fold each row belongs to.

        Raises
        ------
        TypeError
            If ``y`` is neither a string nor None.

        ValueError
            If ``y`` is None, or if every class has fewer members than
            ``n_folds``.
        """
        if self.shuffle:
            rng = check_random_state(self.random_state)
        else:
            rng = self.random_state

        # validate that it's a string
        y = _val_y(y)  # gets a string back or None
        if y is None:
            raise ValueError('H2OStratifiedKFold requires a target name (got None)')

        target = frame[y].as_data_frame(use_pandas=True)[y].values
        n_samples = target.shape[0]
        unique_y, y_inversed = np.unique(target, return_inverse=True)
        y_counts = bincount(y_inversed)
        min_labels = np.min(y_counts)

        if np.all(self.n_folds > y_counts):
            # only the message is passed: a stray ``Warning`` argument used
            # to turn the exception text into a tuple
            raise ValueError('All the n_labels for individual classes'
                             ' are less than %d folds.'
                             % self.n_folds)
        if self.n_folds > min_labels:
            warnings.warn(('The least populated class in y has only %d'
                           ' members, which is too few. The minimum'
                           ' number of labels for any class cannot'
                           ' be less than n_folds=%d.'
                           % (min_labels, self.n_folds)), Warning)

        # NOTE FROM SKLEARN:

        # pre-assign each sample to a test fold index using individual KFold
        # splitting strategies for each class so as to respect the balance of
        # classes
        # NOTE: Passing the data corresponding to ith class say X[y==class_i]
        # will break when the data is not 100% stratifiable for all classes.
        # So we pass np.zeroes(max(c, n_folds)) as data to the KFold.

        # Remember, however that we might be using the old-fold KFold which doesn't
        # have a split method...
        if SK18:
            per_cls_cvs = [
                KFold(self.n_folds,  # using sklearn's KFold here
                      shuffle=self.shuffle,
                      random_state=rng).split(np.zeros(max(count, self.n_folds)))
                for count in y_counts
            ]
        else:
            per_cls_cvs = [
                KFold(max(count, self.n_folds),  # using sklearn's KFold here
                      self.n_folds,
                      shuffle=self.shuffle,
                      random_state=rng)
                for count in y_counts
            ]

        # builtin ``int`` is used because the ``np.int`` alias no longer
        # exists in current NumPy releases
        test_folds = np.zeros(n_samples, dtype=int)
        for test_fold_indices, per_cls_splits in enumerate(zip(*per_cls_cvs)):
            for cls, (_, test_split) in zip(unique_y, per_cls_splits):
                cls_test_folds = test_folds[target == cls]

                # the test split can be too big because we used
                # KFold(...).split(X[:max(c, n_folds)]) when data is not 100%
                # stratifiable for all the classes
                # (we use a warning instead of raising an exception)
                # If this is the case, let's trim it:
                test_split = test_split[test_split < len(cls_test_folds)]
                cls_test_folds[test_split] = test_fold_indices
                test_folds[target == cls] = cls_test_folds

        return test_folds